diff --git a/ipfn/ipfn.py b/ipfn/ipfn.py index 6b2181b..3bbf658 100755 --- a/ipfn/ipfn.py +++ b/ipfn/ipfn.py @@ -1,9 +1,17 @@ -#!/usr/bin/env python from __future__ import print_function import numpy as np import pandas as pd from itertools import product -import copy + + +def asfloatarray(a, dtype='float64'): + """ Return `a` as ndarray of type `dtype`. + If a is already an ndarray of floating point type, a copy is returned. + This preserves single precision `a` and otherwise converts is to double precision. + """ + if isinstance(a, np.ndarray) and a.dtype.kind == 'f': + return a.copy() + return np.asarray(a, dtype=dtype) class ipfn(object): @@ -41,7 +49,7 @@ def __init__(self, original, aggregates, dimensions, weight_col='total', self.conv_rate = convergence_rate self.max_itr = max_iteration if verbose not in [0, 1, 2]: - raise(ValueError(f"wrong verbose input, must be either 0, 1 or 2 but got {verbose}")) + raise ValueError(f"wrong verbose input, must be either 0, 1 or 2 but got {verbose}") self.verbose = verbose self.rate_tolerance = rate_tolerance @@ -74,80 +82,51 @@ def ipfn_np(self, m, aggregates, dimensions, weight_col='total'): m = IPF.iteration() """ - # Check that the inputs are numpay arrays of floats - inc = 0 + # Check that the inputs are numpy arrays of floats + assert isinstance(m, np.ndarray) + assert m.dtype.kind == 'f' for aggregate in aggregates: - if not isinstance(aggregate, np.ndarray): - aggregate = np.array(aggregate).astype(float) - aggregates[inc] = aggregate - elif aggregate.dtype not in [float, float]: - aggregate = aggregate.astype(float) - aggregates[inc] = aggregate - inc += 1 - if not isinstance(m, np.ndarray): - m = np.array(m) - elif m.dtype not in [float, float]: - m = m.astype(float) + assert isinstance(aggregate, np.ndarray) + assert aggregate.dtype.kind == 'f' + assert len(aggregates) == len(dimensions) - steps = len(aggregates) dim = len(m.shape) - product_elem = [] - tables = [m] - # TODO: do we need to persist all these dataframe? Or maybe we just need to persist the table_update and table_current - # and then update the table_current to the table_update to the latest we have. And create an empty zero dataframe for table_update (Evelyn) - for inc in range(steps - 1): - tables.append(np.array(np.zeros(m.shape))) - original = copy.copy(m) + m_new = np.zeros_like(m) # Calculate the new weights for each dimension - for inc in range(steps): - if inc == (steps - 1): - table_update = m - table_current = tables[inc].copy() - else: - table_update = tables[inc + 1] - table_current = tables[inc] - for dimension in dimensions[inc]: - product_elem.append(range(m.shape[dimension])) + for aggregate, dimension in zip(aggregates, dimensions): + product_elem = [range(m.shape[d]) for d in dimension] for item in product(*product_elem): - idx = self.index_axis_elem(dim, dimensions[inc], item) - table_current_slice = table_current[idx] - mijk = table_current_slice.sum() - # TODO: Directly put it as xijk = aggregates[inc][item] (Evelyn) - xijk = aggregates[inc] - xijk = xijk[item] + idx = self.index_axis_elem(dim, dimension, item) + m_slice = m[idx] + mijk = m_slice.sum() + xijk = aggregate[item] if mijk == 0: - # table_current_slice += 1e-5 + # m_slice += 1e-5 # TODO: Basically, this part would remain 0 as always right? Cause if the sum of the slice is zero, then we only have zeros in this slice. - # TODO: you could put it as table_update[idx] = table_current_slice (since multiplication on zero is still zero) - table_update[idx] = table_current_slice + # TODO: you could put it as m_new[idx] = m_slice (since multiplication on zero is still zero) + m_new[idx] = m_slice else: - # TODO: when inc == steps - 1, this part is also directly updating the dataframe m (Evelyn) - # If we are not going to persist every table generated, we could still keep this part to directly update dataframe m - table_update[idx] = table_current_slice * 1.0 * xijk / mijk + m_new[idx] = m_slice * 1.0 * xijk / mijk # For debug purposes - # if np.isnan(table_update).any(): + # if np.isnan(m_new).any(): # print(idx) # sys.exit(0) - product_elem = [] + m, m_new = m_new, m # Check the convergence rate for each dimension max_conv = 0 - for inc in range(steps): - # TODO: this part already generated before, we could somehow persist it. But it's not important (Evelyn) - for dimension in dimensions[inc]: - product_elem.append(range(m.shape[dimension])) + for aggregate, dimension in zip(aggregates, dimensions): + product_elem = [range(m.shape[d]) for d in dimension] for item in product(*product_elem): - idx = self.index_axis_elem(dim, dimensions[inc], item) - ori_ijk = aggregates[inc][item] + idx = self.index_axis_elem(dim, dimension, item) + ori_ijk = aggregate[item] m_slice = m[idx] m_ijk = m_slice.sum() - # print('Current vs original', abs(m_ijk/ori_ijk - 1)) + # print("Current vs original", abs(m_ijk/ori_ijk - 1)) if abs(m_ijk / ori_ijk - 1) > max_conv: max_conv = abs(m_ijk / ori_ijk - 1) - product_elem = [] - return m, max_conv def ipfn_df(self, df, aggregates, dimensions, weight_col='total'): @@ -186,7 +165,6 @@ def ipfn_df(self, df, aggregates, dimensions, weight_col='total'): tables = [df] for inc in range(steps - 1): tables.append(df.copy()) - original = df.copy() # Calculate the new weights for each dimension inc = 0 @@ -254,34 +232,36 @@ def iteration(self): """ Runs the ipfn algorithm. Automatically detects of working with numpy ndarray or pandas dataframes. """ - - i = 0 - conv = np.inf - old_conv = -np.inf + old_conv = np.inf conv_list = [] - m = self.original + converged = 1 - # If the original data input is in pandas DataFrame format + # Prepare input data if isinstance(self.original, pd.DataFrame): ipfn_method = self.ipfn_df + m = self.original.copy() + aggregates = self.aggregates elif isinstance(self.original, np.ndarray): ipfn_method = self.ipfn_np - self.original = self.original.astype('float64') + m = asfloatarray(self.original) + aggregates = [asfloatarray(aggregate) for aggregate in self.aggregates] else: - raise(ValueError(f'Data input instance not recognized. The input matrix is not a numpy array or pandas DataFrame')) - while ((i <= self.max_itr and conv > self.conv_rate) and (i <= self.max_itr and abs(conv - old_conv) > self.rate_tolerance)): - old_conv = conv - m, conv = ipfn_method(m, self.aggregates, self.dimensions, self.weight_col) + raise ValueError(f"Data input instance not recognized. The input matrix is not a numpy array or pandas DataFrame") + + # Run iterations + for i in range(self.max_itr): + m, conv = ipfn_method(m, aggregates, self.dimensions, self.weight_col) conv_list.append(conv) - i += 1 - converged = 1 - if i <= self.max_itr: - if (not conv > self.conv_rate) & (self.verbose > 1): - print('ipfn converged: convergence_rate below threshold') - elif not abs(conv - old_conv) > self.rate_tolerance: - print('ipfn converged: convergence_rate not updating or below rate_tolerance') + if conv <= self.conv_rate: + if self.verbose > 1: + print("ipfn converged: convergence_rate below threshold") + break + if abs(conv - old_conv) <= self.rate_tolerance: + print("ipfn converged: convergence_rate not updating or below rate_tolerance") + break + old_conv = conv else: - print('Maximum iterations reached') + print("Maximum iterations reached") converged = 0 # Handle the verbose @@ -290,6 +270,6 @@ def iteration(self): elif self.verbose == 1: return m, converged elif self.verbose == 2: - return m, converged, pd.DataFrame({'iteration': range(i), 'conv': conv_list}).set_index('iteration') + return m, converged, pd.DataFrame({'iteration': range(1, i+2), 'conv': conv_list}).set_index('iteration') else: - raise(ValueError(f'wrong verbose input, must be either 0, 1 or 2 but got {self.verbose}')) + raise ValueError(f"wrong verbose input, must be either 0, 1 or 2 but got {self.verbose}") diff --git a/tests/perftest.py b/tests/perftest.py new file mode 100644 index 0000000..0551d77 --- /dev/null +++ b/tests/perftest.py @@ -0,0 +1,21 @@ +import numpy as np +from context import ipfn + +N = 12000 + +def print_error(m, xip, xpj): + colsum_err = np.sum((np.sum(m, 0) - xip.squeeze())**2) + rowsum_err = np.sum((np.sum(m, 1) - xpj.squeeze())**2) + print(f"Error: {colsum_err:8.3f} {rowsum_err:8.3f}") + +# Construct NxN matrix with ones on the main and upper diagonal +m = np.eye(N, dtype='float32') +m.flat[1:N*N:N+1] = 1 + +aggregates = [np.ones(N), np.ones(N)] +dimensions = [[0], [1]] + +print_error(m, *aggregates) +m, converged, history = ipfn.ipfn(m, aggregates, dimensions, max_iteration=10, verbose=2).iteration() +print_error(m, *aggregates) +print(history)