Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 58 additions & 78 deletions ipfn/ipfn.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
#!/usr/bin/env python
from __future__ import print_function
import numpy as np
import pandas as pd
from itertools import product
import copy


def asfloatarray(a, dtype='float64'):
""" Return `a` as ndarray of type `dtype`.
If a is already an ndarray of floating point type, a copy is returned.
This preserves single precision `a` and otherwise converts is to double precision.
"""
if isinstance(a, np.ndarray) and a.dtype.kind == 'f':
return a.copy()
return np.asarray(a, dtype=dtype)


class ipfn(object):
Expand Down Expand Up @@ -41,7 +49,7 @@ def __init__(self, original, aggregates, dimensions, weight_col='total',
self.conv_rate = convergence_rate
self.max_itr = max_iteration
if verbose not in [0, 1, 2]:
raise(ValueError(f"wrong verbose input, must be either 0, 1 or 2 but got {verbose}"))
raise ValueError(f"wrong verbose input, must be either 0, 1 or 2 but got {verbose}")
self.verbose = verbose
self.rate_tolerance = rate_tolerance

Expand Down Expand Up @@ -74,80 +82,51 @@ def ipfn_np(self, m, aggregates, dimensions, weight_col='total'):
m = IPF.iteration()
"""

# Check that the inputs are numpay arrays of floats
inc = 0
# Check that the inputs are numpy arrays of floats
assert isinstance(m, np.ndarray)
assert m.dtype.kind == 'f'
for aggregate in aggregates:
if not isinstance(aggregate, np.ndarray):
aggregate = np.array(aggregate).astype(float)
aggregates[inc] = aggregate
elif aggregate.dtype not in [float, float]:
aggregate = aggregate.astype(float)
aggregates[inc] = aggregate
inc += 1
if not isinstance(m, np.ndarray):
m = np.array(m)
elif m.dtype not in [float, float]:
m = m.astype(float)
assert isinstance(aggregate, np.ndarray)
assert aggregate.dtype.kind == 'f'
assert len(aggregates) == len(dimensions)

steps = len(aggregates)
dim = len(m.shape)
product_elem = []
tables = [m]
# TODO: do we need to persist all these dataframe? Or maybe we just need to persist the table_update and table_current
# and then update the table_current to the table_update to the latest we have. And create an empty zero dataframe for table_update (Evelyn)
for inc in range(steps - 1):
tables.append(np.array(np.zeros(m.shape)))
original = copy.copy(m)
m_new = np.zeros_like(m)

# Calculate the new weights for each dimension
for inc in range(steps):
if inc == (steps - 1):
table_update = m
table_current = tables[inc].copy()
else:
table_update = tables[inc + 1]
table_current = tables[inc]
for dimension in dimensions[inc]:
product_elem.append(range(m.shape[dimension]))
for aggregate, dimension in zip(aggregates, dimensions):
product_elem = [range(m.shape[d]) for d in dimension]
for item in product(*product_elem):
idx = self.index_axis_elem(dim, dimensions[inc], item)
table_current_slice = table_current[idx]
mijk = table_current_slice.sum()
# TODO: Directly put it as xijk = aggregates[inc][item] (Evelyn)
xijk = aggregates[inc]
xijk = xijk[item]
idx = self.index_axis_elem(dim, dimension, item)
m_slice = m[idx]
mijk = m_slice.sum()
xijk = aggregate[item]
if mijk == 0:
# table_current_slice += 1e-5
# m_slice += 1e-5
# TODO: Basically, this part would remain 0 as always right? Cause if the sum of the slice is zero, then we only have zeros in this slice.
# TODO: you could put it as table_update[idx] = table_current_slice (since multiplication on zero is still zero)
table_update[idx] = table_current_slice
# TODO: you could put it as m_new[idx] = m_slice (since multiplication on zero is still zero)
m_new[idx] = m_slice
else:
# TODO: when inc == steps - 1, this part is also directly updating the dataframe m (Evelyn)
# If we are not going to persist every table generated, we could still keep this part to directly update dataframe m
table_update[idx] = table_current_slice * 1.0 * xijk / mijk
m_new[idx] = m_slice * 1.0 * xijk / mijk
# For debug purposes
# if np.isnan(table_update).any():
# if np.isnan(m_new).any():
# print(idx)
# sys.exit(0)
product_elem = []
m, m_new = m_new, m

# Check the convergence rate for each dimension
max_conv = 0
for inc in range(steps):
# TODO: this part already generated before, we could somehow persist it. But it's not important (Evelyn)
for dimension in dimensions[inc]:
product_elem.append(range(m.shape[dimension]))
for aggregate, dimension in zip(aggregates, dimensions):
product_elem = [range(m.shape[d]) for d in dimension]
for item in product(*product_elem):
idx = self.index_axis_elem(dim, dimensions[inc], item)
ori_ijk = aggregates[inc][item]
idx = self.index_axis_elem(dim, dimension, item)
ori_ijk = aggregate[item]
m_slice = m[idx]
m_ijk = m_slice.sum()
# print('Current vs original', abs(m_ijk/ori_ijk - 1))
# print("Current vs original", abs(m_ijk/ori_ijk - 1))
if abs(m_ijk / ori_ijk - 1) > max_conv:
max_conv = abs(m_ijk / ori_ijk - 1)

product_elem = []

return m, max_conv

def ipfn_df(self, df, aggregates, dimensions, weight_col='total'):
Expand Down Expand Up @@ -186,7 +165,6 @@ def ipfn_df(self, df, aggregates, dimensions, weight_col='total'):
tables = [df]
for inc in range(steps - 1):
tables.append(df.copy())
original = df.copy()

# Calculate the new weights for each dimension
inc = 0
Expand Down Expand Up @@ -254,34 +232,36 @@ def iteration(self):
"""
Runs the ipfn algorithm. Automatically detects of working with numpy ndarray or pandas dataframes.
"""

i = 0
conv = np.inf
old_conv = -np.inf
old_conv = np.inf
conv_list = []
m = self.original
converged = 1

# If the original data input is in pandas DataFrame format
# Prepare input data
if isinstance(self.original, pd.DataFrame):
ipfn_method = self.ipfn_df
m = self.original.copy()
aggregates = self.aggregates
elif isinstance(self.original, np.ndarray):
ipfn_method = self.ipfn_np
self.original = self.original.astype('float64')
m = asfloatarray(self.original)
aggregates = [asfloatarray(aggregate) for aggregate in self.aggregates]
else:
raise(ValueError(f'Data input instance not recognized. The input matrix is not a numpy array or pandas DataFrame'))
while ((i <= self.max_itr and conv > self.conv_rate) and (i <= self.max_itr and abs(conv - old_conv) > self.rate_tolerance)):
old_conv = conv
m, conv = ipfn_method(m, self.aggregates, self.dimensions, self.weight_col)
raise ValueError(f"Data input instance not recognized. The input matrix is not a numpy array or pandas DataFrame")

# Run iterations
for i in range(self.max_itr):
m, conv = ipfn_method(m, aggregates, self.dimensions, self.weight_col)
conv_list.append(conv)
i += 1
converged = 1
if i <= self.max_itr:
if (not conv > self.conv_rate) & (self.verbose > 1):
print('ipfn converged: convergence_rate below threshold')
elif not abs(conv - old_conv) > self.rate_tolerance:
print('ipfn converged: convergence_rate not updating or below rate_tolerance')
if conv <= self.conv_rate:
if self.verbose > 1:
print("ipfn converged: convergence_rate below threshold")
break
if abs(conv - old_conv) <= self.rate_tolerance:
print("ipfn converged: convergence_rate not updating or below rate_tolerance")
break
old_conv = conv
else:
print('Maximum iterations reached')
print("Maximum iterations reached")
converged = 0

# Handle the verbose
Expand All @@ -290,6 +270,6 @@ def iteration(self):
elif self.verbose == 1:
return m, converged
elif self.verbose == 2:
return m, converged, pd.DataFrame({'iteration': range(i), 'conv': conv_list}).set_index('iteration')
return m, converged, pd.DataFrame({'iteration': range(1, i+2), 'conv': conv_list}).set_index('iteration')
else:
raise(ValueError(f'wrong verbose input, must be either 0, 1 or 2 but got {self.verbose}'))
raise ValueError(f"wrong verbose input, must be either 0, 1 or 2 but got {self.verbose}")
21 changes: 21 additions & 0 deletions tests/perftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import numpy as np
from context import ipfn

N = 12000

def print_error(m, xip, xpj):
colsum_err = np.sum((np.sum(m, 0) - xip.squeeze())**2)
rowsum_err = np.sum((np.sum(m, 1) - xpj.squeeze())**2)
print(f"Error: {colsum_err:8.3f} {rowsum_err:8.3f}")

# Construct NxN matrix with ones on the main and upper diagonal
m = np.eye(N, dtype='float32')
m.flat[1:N*N:N+1] = 1

aggregates = [np.ones(N), np.ones(N)]
dimensions = [[0], [1]]

print_error(m, *aggregates)
m, converged, history = ipfn.ipfn(m, aggregates, dimensions, max_iteration=10, verbose=2).iteration()
print_error(m, *aggregates)
print(history)