From 2d8e977337782e46cd189b7211bb0965c089e905 Mon Sep 17 00:00:00 2001 From: Maarten Bosmans Date: Mon, 28 Nov 2022 20:23:25 +0100 Subject: [PATCH 1/6] Some small cleanup User-visible strings should be quoted with " and ' is for internal strings. --- ipfn/ipfn.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/ipfn/ipfn.py b/ipfn/ipfn.py index 6b2181b..3d9e8b0 100755 --- a/ipfn/ipfn.py +++ b/ipfn/ipfn.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python from __future__ import print_function import numpy as np import pandas as pd @@ -41,7 +40,7 @@ def __init__(self, original, aggregates, dimensions, weight_col='total', self.conv_rate = convergence_rate self.max_itr = max_iteration if verbose not in [0, 1, 2]: - raise(ValueError(f"wrong verbose input, must be either 0, 1 or 2 but got {verbose}")) + raise ValueError(f"wrong verbose input, must be either 0, 1 or 2 but got {verbose}") self.verbose = verbose self.rate_tolerance = rate_tolerance @@ -74,7 +73,7 @@ def ipfn_np(self, m, aggregates, dimensions, weight_col='total'): m = IPF.iteration() """ - # Check that the inputs are numpay arrays of floats + # Check that the inputs are numpy arrays of floats inc = 0 for aggregate in aggregates: if not isinstance(aggregate, np.ndarray): @@ -142,7 +141,7 @@ def ipfn_np(self, m, aggregates, dimensions, weight_col='total'): ori_ijk = aggregates[inc][item] m_slice = m[idx] m_ijk = m_slice.sum() - # print('Current vs original', abs(m_ijk/ori_ijk - 1)) + # print("Current vs original", abs(m_ijk/ori_ijk - 1)) if abs(m_ijk / ori_ijk - 1) > max_conv: max_conv = abs(m_ijk / ori_ijk - 1) @@ -268,7 +267,7 @@ def iteration(self): ipfn_method = self.ipfn_np self.original = self.original.astype('float64') else: - raise(ValueError(f'Data input instance not recognized. The input matrix is not a numpy array or pandas DataFrame')) + raise ValueError(f"Data input instance not recognized. The input matrix is not a numpy array or pandas DataFrame") while ((i <= self.max_itr and conv > self.conv_rate) and (i <= self.max_itr and abs(conv - old_conv) > self.rate_tolerance)): old_conv = conv m, conv = ipfn_method(m, self.aggregates, self.dimensions, self.weight_col) @@ -277,11 +276,11 @@ def iteration(self): converged = 1 if i <= self.max_itr: if (not conv > self.conv_rate) & (self.verbose > 1): - print('ipfn converged: convergence_rate below threshold') + print("ipfn converged: convergence_rate below threshold") elif not abs(conv - old_conv) > self.rate_tolerance: - print('ipfn converged: convergence_rate not updating or below rate_tolerance') + print("ipfn converged: convergence_rate not updating or below rate_tolerance") else: - print('Maximum iterations reached') + print("Maximum iterations reached") converged = 0 # Handle the verbose @@ -292,4 +291,4 @@ def iteration(self): elif self.verbose == 2: return m, converged, pd.DataFrame({'iteration': range(i), 'conv': conv_list}).set_index('iteration') else: - raise(ValueError(f'wrong verbose input, must be either 0, 1 or 2 but got {self.verbose}')) + raise ValueError(f"wrong verbose input, must be either 0, 1 or 2 but got {self.verbose}") From ae61437248b4fa7665589e8da09212943b32b251 Mon Sep 17 00:00:00 2001 From: Maarten Bosmans Date: Mon, 28 Nov 2022 20:29:28 +0100 Subject: [PATCH 2/6] Rework iteration loop The max_itr is now more explicit and the convergence criteria are only checked in one place instead of in the while condition and after the loop. The loop now actually stops after doing max_itr iterations, where previously it would do iterations numbered 0 to max_itr, i.e. one iteration too many. --- ipfn/ipfn.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/ipfn/ipfn.py b/ipfn/ipfn.py index 3d9e8b0..db4a19a 100755 --- a/ipfn/ipfn.py +++ b/ipfn/ipfn.py @@ -253,14 +253,12 @@ def iteration(self): """ Runs the ipfn algorithm. Automatically detects of working with numpy ndarray or pandas dataframes. """ - - i = 0 - conv = np.inf - old_conv = -np.inf + old_conv = np.inf conv_list = [] + converged = 1 m = self.original - # If the original data input is in pandas DataFrame format + # Prepare input data if isinstance(self.original, pd.DataFrame): ipfn_method = self.ipfn_df elif isinstance(self.original, np.ndarray): @@ -268,17 +266,19 @@ def iteration(self): self.original = self.original.astype('float64') else: raise ValueError(f"Data input instance not recognized. The input matrix is not a numpy array or pandas DataFrame") - while ((i <= self.max_itr and conv > self.conv_rate) and (i <= self.max_itr and abs(conv - old_conv) > self.rate_tolerance)): - old_conv = conv + + # Run iterations + for i in range(self.max_itr): m, conv = ipfn_method(m, self.aggregates, self.dimensions, self.weight_col) conv_list.append(conv) - i += 1 - converged = 1 - if i <= self.max_itr: - if (not conv > self.conv_rate) & (self.verbose > 1): - print("ipfn converged: convergence_rate below threshold") - elif not abs(conv - old_conv) > self.rate_tolerance: + if conv <= self.conv_rate: + if self.verbose > 1: + print("ipfn converged: convergence_rate below threshold") + break + if abs(conv - old_conv) <= self.rate_tolerance: print("ipfn converged: convergence_rate not updating or below rate_tolerance") + break + old_conv = conv else: print("Maximum iterations reached") converged = 0 @@ -289,6 +289,6 @@ def iteration(self): elif self.verbose == 1: return m, converged elif self.verbose == 2: - return m, converged, pd.DataFrame({'iteration': range(i), 'conv': conv_list}).set_index('iteration') + return m, converged, pd.DataFrame({'iteration': range(1, i+2), 'conv': conv_list}).set_index('iteration') else: raise ValueError(f"wrong verbose input, must be either 0, 1 or 2 but got {self.verbose}") From e5d316e65eb41cd2c32bf85f8e3c4f27768a417f Mon Sep 17 00:00:00 2001 From: Maarten Bosmans Date: Mon, 28 Nov 2022 20:45:11 +0100 Subject: [PATCH 3/6] Add performance test This will construct a large matrix that converges slowly. The matrix is 12000x12000 single precision numbers, in total 576 MB. --- tests/perftest.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 tests/perftest.py diff --git a/tests/perftest.py b/tests/perftest.py new file mode 100644 index 0000000..0551d77 --- /dev/null +++ b/tests/perftest.py @@ -0,0 +1,21 @@ +import numpy as np +from context import ipfn + +N = 12000 + +def print_error(m, xip, xpj): + colsum_err = np.sum((np.sum(m, 0) - xip.squeeze())**2) + rowsum_err = np.sum((np.sum(m, 1) - xpj.squeeze())**2) + print(f"Error: {colsum_err:8.3f} {rowsum_err:8.3f}") + +# Construct NxN matrix with ones on the main and upper diagonal +m = np.eye(N, dtype='float32') +m.flat[1:N*N:N+1] = 1 + +aggregates = [np.ones(N), np.ones(N)] +dimensions = [[0], [1]] + +print_error(m, *aggregates) +m, converged, history = ipfn.ipfn(m, aggregates, dimensions, max_iteration=10, verbose=2).iteration() +print_error(m, *aggregates) +print(history) From 7bab7492a85ae1ce0e0fa4a6c0b901a548bd325b Mon Sep 17 00:00:00 2001 From: Maarten Bosmans Date: Mon, 28 Nov 2022 20:55:20 +0100 Subject: [PATCH 4/6] Avoid unnecessary copies The self.original and self.aggregates are never modified but kept the same as the original user input. The `m` and `aggregates` variables are a copy of the user input and may be modified during the iterations. The `iteration` method is responsible for converting the user input to a numpy array of floats. The `ipfn_np` function only checks whether that condition holds but does no conversion. This allows the `ipfn_np` to be used directly for user that want absolute minimum memory usage without any extra copy of the input array. Instead of always converting to float64, the input arrays are now only checked if they are of floating point type. This means that a single precision float32 array will be kept that way, allowing for a smaller memory footprint, if the problem allows the lower precision. --- ipfn/ipfn.py | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/ipfn/ipfn.py b/ipfn/ipfn.py index db4a19a..55fe274 100755 --- a/ipfn/ipfn.py +++ b/ipfn/ipfn.py @@ -2,7 +2,16 @@ import numpy as np import pandas as pd from itertools import product -import copy + + +def asfloatarray(a, dtype='float64'): + """ Return `a` as ndarray of type `dtype`. + If a is already an ndarray of floating point type, a copy is returned. + This preserves single precision `a` and otherwise converts is to double precision. + """ + if isinstance(a, np.ndarray) and a.dtype.kind == 'f': + return a.copy() + return np.asarray(a, dtype=dtype) class ipfn(object): @@ -74,19 +83,12 @@ def ipfn_np(self, m, aggregates, dimensions, weight_col='total'): """ # Check that the inputs are numpy arrays of floats - inc = 0 + assert isinstance(m, np.ndarray) + assert m.dtype.kind == 'f' for aggregate in aggregates: - if not isinstance(aggregate, np.ndarray): - aggregate = np.array(aggregate).astype(float) - aggregates[inc] = aggregate - elif aggregate.dtype not in [float, float]: - aggregate = aggregate.astype(float) - aggregates[inc] = aggregate - inc += 1 - if not isinstance(m, np.ndarray): - m = np.array(m) - elif m.dtype not in [float, float]: - m = m.astype(float) + assert isinstance(aggregate, np.ndarray) + assert aggregate.dtype.kind == 'f' + assert len(aggregates) == len(dimensions) steps = len(aggregates) dim = len(m.shape) @@ -96,7 +98,6 @@ def ipfn_np(self, m, aggregates, dimensions, weight_col='total'): # and then update the table_current to the table_update to the latest we have. And create an empty zero dataframe for table_update (Evelyn) for inc in range(steps - 1): tables.append(np.array(np.zeros(m.shape))) - original = copy.copy(m) # Calculate the new weights for each dimension for inc in range(steps): @@ -185,7 +186,6 @@ def ipfn_df(self, df, aggregates, dimensions, weight_col='total'): tables = [df] for inc in range(steps - 1): tables.append(df.copy()) - original = df.copy() # Calculate the new weights for each dimension inc = 0 @@ -256,20 +256,22 @@ def iteration(self): old_conv = np.inf conv_list = [] converged = 1 - m = self.original # Prepare input data if isinstance(self.original, pd.DataFrame): ipfn_method = self.ipfn_df + m = self.original.copy() + aggregates = self.aggregates elif isinstance(self.original, np.ndarray): ipfn_method = self.ipfn_np - self.original = self.original.astype('float64') + m = asfloatarray(self.original) + aggregates = [asfloatarray(aggregate) for aggregate in self.aggregates] else: raise ValueError(f"Data input instance not recognized. The input matrix is not a numpy array or pandas DataFrame") # Run iterations for i in range(self.max_itr): - m, conv = ipfn_method(m, self.aggregates, self.dimensions, self.weight_col) + m, conv = ipfn_method(m, aggregates, self.dimensions, self.weight_col) conv_list.append(conv) if conv <= self.conv_rate: if self.verbose > 1: From 6cfbf08e389b527b11cfa22f36089a0e73e672c4 Mon Sep 17 00:00:00 2001 From: Maarten Bosmans Date: Mon, 28 Nov 2022 21:21:31 +0100 Subject: [PATCH 5/6] Don't store the matrix for every step but just alternate between two matrices This save some memory in case of more than two steps. --- ipfn/ipfn.py | 29 +++++++++-------------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/ipfn/ipfn.py b/ipfn/ipfn.py index 55fe274..ca4538d 100755 --- a/ipfn/ipfn.py +++ b/ipfn/ipfn.py @@ -93,43 +93,32 @@ def ipfn_np(self, m, aggregates, dimensions, weight_col='total'): steps = len(aggregates) dim = len(m.shape) product_elem = [] - tables = [m] - # TODO: do we need to persist all these dataframe? Or maybe we just need to persist the table_update and table_current - # and then update the table_current to the table_update to the latest we have. And create an empty zero dataframe for table_update (Evelyn) - for inc in range(steps - 1): - tables.append(np.array(np.zeros(m.shape))) + m_new = np.zeros_like(m) # Calculate the new weights for each dimension for inc in range(steps): - if inc == (steps - 1): - table_update = m - table_current = tables[inc].copy() - else: - table_update = tables[inc + 1] - table_current = tables[inc] for dimension in dimensions[inc]: product_elem.append(range(m.shape[dimension])) for item in product(*product_elem): idx = self.index_axis_elem(dim, dimensions[inc], item) - table_current_slice = table_current[idx] - mijk = table_current_slice.sum() + m_slice = m[idx] + mijk = m_slice.sum() # TODO: Directly put it as xijk = aggregates[inc][item] (Evelyn) xijk = aggregates[inc] xijk = xijk[item] if mijk == 0: - # table_current_slice += 1e-5 + # m_slice += 1e-5 # TODO: Basically, this part would remain 0 as always right? Cause if the sum of the slice is zero, then we only have zeros in this slice. - # TODO: you could put it as table_update[idx] = table_current_slice (since multiplication on zero is still zero) - table_update[idx] = table_current_slice + # TODO: you could put it as m_new[idx] = m_slice (since multiplication on zero is still zero) + m_new[idx] = m_slice else: - # TODO: when inc == steps - 1, this part is also directly updating the dataframe m (Evelyn) - # If we are not going to persist every table generated, we could still keep this part to directly update dataframe m - table_update[idx] = table_current_slice * 1.0 * xijk / mijk + m_new[idx] = m_slice * 1.0 * xijk / mijk # For debug purposes - # if np.isnan(table_update).any(): + # if np.isnan(m_new).any(): # print(idx) # sys.exit(0) product_elem = [] + m, m_new = m_new, m # Check the convergence rate for each dimension max_conv = 0 From 242143398705243cb860895c251889d75f4d0915 Mon Sep 17 00:00:00 2001 From: Maarten Bosmans Date: Mon, 28 Nov 2022 21:38:35 +0100 Subject: [PATCH 6/6] Simplify step loop structure Since the previous commit we don't need an integer loop index `inc` anymore, so simply using zip over `aggregates` and `dimensions` is more clear. --- ipfn/ipfn.py | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/ipfn/ipfn.py b/ipfn/ipfn.py index ca4538d..3bbf658 100755 --- a/ipfn/ipfn.py +++ b/ipfn/ipfn.py @@ -90,22 +90,17 @@ def ipfn_np(self, m, aggregates, dimensions, weight_col='total'): assert aggregate.dtype.kind == 'f' assert len(aggregates) == len(dimensions) - steps = len(aggregates) dim = len(m.shape) - product_elem = [] m_new = np.zeros_like(m) # Calculate the new weights for each dimension - for inc in range(steps): - for dimension in dimensions[inc]: - product_elem.append(range(m.shape[dimension])) + for aggregate, dimension in zip(aggregates, dimensions): + product_elem = [range(m.shape[d]) for d in dimension] for item in product(*product_elem): - idx = self.index_axis_elem(dim, dimensions[inc], item) + idx = self.index_axis_elem(dim, dimension, item) m_slice = m[idx] mijk = m_slice.sum() - # TODO: Directly put it as xijk = aggregates[inc][item] (Evelyn) - xijk = aggregates[inc] - xijk = xijk[item] + xijk = aggregate[item] if mijk == 0: # m_slice += 1e-5 # TODO: Basically, this part would remain 0 as always right? Cause if the sum of the slice is zero, then we only have zeros in this slice. @@ -117,26 +112,21 @@ def ipfn_np(self, m, aggregates, dimensions, weight_col='total'): # if np.isnan(m_new).any(): # print(idx) # sys.exit(0) - product_elem = [] m, m_new = m_new, m # Check the convergence rate for each dimension max_conv = 0 - for inc in range(steps): - # TODO: this part already generated before, we could somehow persist it. But it's not important (Evelyn) - for dimension in dimensions[inc]: - product_elem.append(range(m.shape[dimension])) + for aggregate, dimension in zip(aggregates, dimensions): + product_elem = [range(m.shape[d]) for d in dimension] for item in product(*product_elem): - idx = self.index_axis_elem(dim, dimensions[inc], item) - ori_ijk = aggregates[inc][item] + idx = self.index_axis_elem(dim, dimension, item) + ori_ijk = aggregate[item] m_slice = m[idx] m_ijk = m_slice.sum() # print("Current vs original", abs(m_ijk/ori_ijk - 1)) if abs(m_ijk / ori_ijk - 1) > max_conv: max_conv = abs(m_ijk / ori_ijk - 1) - product_elem = [] - return m, max_conv def ipfn_df(self, df, aggregates, dimensions, weight_col='total'):