diff --git a/simpleTest/exampleAutoapproximate_with_cv.py b/simpleTest/exampleAutoapproximate_with_cv.py index 23c2753..af1ded7 100644 --- a/simpleTest/exampleAutoapproximate_with_cv.py +++ b/simpleTest/exampleAutoapproximate_with_cv.py @@ -3,15 +3,13 @@ # Example for approximating an periodic function import math +import os +import sys import matplotlib.pyplot as plt import numpy as np from TestFunctionPeriodic import * - -import os -import sys - src_aa = os.path.abspath(os.path.join(os.getcwd(), "src")) sys.path.insert(0, src_aa) diff --git a/simpleTest/exampleMixed.py b/simpleTest/exampleMixed.py index b201a08..32aab52 100644 --- a/simpleTest/exampleMixed.py +++ b/simpleTest/exampleMixed.py @@ -38,9 +38,9 @@ def TestFunction(x): d = 4 # dimension basis_vect = ["exp", "alg", "cos", "alg"] # basis per dimension -M = 10000 # number of training points +M = 10000 # number of training points M_test = 100000 # number of test points -max_iter = 50 # maximum number of LSQR iterations +max_iter = 50 # maximum number of LSQR iterations # there are 3 possibilities with varying degree of freedom to define the number of used frequencies ########### Variant 1: @@ -98,10 +98,10 @@ def cheb_nodes(rng, size): return (1.0 - np.cos(np.pi * rng.random(size))) / 2.0 -X = rng.random((M, d)) # initially all columns uniform in [0, 1] -X[:, 0] -= 0.5 # x₁ (exp): shift to [-0.5, 0.5) -X[:, 1] = cheb_nodes(rng, M) # x₂ (alg): Chebyshev-weight on [0, 1] -X[:, 3] = cheb_nodes(rng, M) # x₄ (alg): Chebyshev-weight on [0, 1] +X = rng.random((M, d)) # initially all columns uniform in [0, 1] +X[:, 0] -= 0.5 # x₁ (exp): shift to [-0.5, 0.5) +X[:, 1] = cheb_nodes(rng, M) # x₂ (alg): Chebyshev-weight on [0, 1] +X[:, 3] = cheb_nodes(rng, M) # x₄ (alg): Chebyshev-weight on [0, 1] y = np.array( [TestFunction(X[i, :]) for i in range(M)], dtype=complex ) # function values (complex because the mixed basis with exp uses complex coefficients) @@ -110,9 +110,7 @@ def cheb_nodes(rng, size): X_test[:, 0] -= 0.5 X_test[:, 1] = cheb_nodes(rng, M_test) X_test[:, 3] = cheb_nodes(rng, M_test) -y_test = np.array( - [TestFunction(X_test[i, :]) for i in range(M_test)], dtype=complex -) +y_test = np.array([TestFunction(X_test[i, :]) for i in range(M_test)], dtype=complex) ########################## ## Do the approximation ## diff --git a/src/pyANOVAapprox/__init__.py b/src/pyANOVAapprox/__init__.py index c41618d..f286d38 100644 --- a/src/pyANOVAapprox/__init__.py +++ b/src/pyANOVAapprox/__init__.py @@ -1,4 +1,6 @@ +import csv import math +import os import threading from math import acos, isnan @@ -8,8 +10,6 @@ from scipy.optimize import bisect from scipy.sparse.linalg import lsqr from scipy.special import erf -import csv -import os # from sklearn.metrics import roc_auc_score diff --git a/src/pyANOVAapprox/analysis.py b/src/pyANOVAapprox/analysis.py index c72d17a..aedd105 100644 --- a/src/pyANOVAapprox/analysis.py +++ b/src/pyANOVAapprox/analysis.py @@ -37,10 +37,7 @@ def get_variances(self, settingnr=None, lam=None, Dict=False): elif ( lam == None ): # get_variances( a::approx; dict::Bool = false )::Dict{Float64,Union{Vector{Float64},Dict{Vector{Int},Float64}}} - return { - l: self._variances(settingnr, l, Dict) - for l in self.lam.keys() - } + return {l: self._variances(settingnr, l, Dict) for l in self.lam.keys()} def _GSI(self, settingnr, lam, Dict): # helpfunction for get_GSI @@ -78,9 +75,7 @@ def get_GSI(self, settingnr=None, lam=None, Dict=False): ): # get_GSI(a::approx, λ::Float64; dict::Bool = false,)::Union{Vector{Float64},Dict{Vector{Int},Float64}} return self._GSI(settingnr, lam, Dict) else: # get_GSI( a::approx; dict::Bool = false )::Dict{Float64,Union{Vector{Float64},Dict{Vector{Int},Float64}}} - return { - l: self._GSI(settingnr, l, Dict) for l in self.lam.keys() - } + return {l: self._GSI(settingnr, l, Dict) for l in self.lam.keys()} def _AttributeRanking(self, settingnr, lam): # helpfunction for get_AttributeRanking @@ -120,10 +115,7 @@ def get_AttributeRanking(self, settingnr=None, lam=None): if ( lam is None ): # get_AttributeRanking( a::approx, λ::Float64 )::Dict{Float64,Vector{Float64}} - return { - l: self._AttributeRanking(settingnr, l) - for l in self.lam.keys() - } + return {l: self._AttributeRanking(settingnr, l) for l in self.lam.keys()} else: # get_AttributeRanking( a::approx, λ::Float64 )::Vector{Float64} return self._AttributeRanking(settingnr, lam) @@ -162,10 +154,7 @@ def get_ActiveSet(self, eps, settingnr=None, lam=None): if ( lam is None ): # get_ActiveSet(a::approx, eps::Vector{Float64})::Dict{Float64,Vector{Vector{Int}}} - return { - l: self._ActiveSet(eps, settingnr, l) - for l in self.lam.keys() - } + return {l: self._ActiveSet(eps, settingnr, l) for l in self.lam.keys()} else: # get_ActiveSet(a::approx, eps::Vector{Float64}, λ::Float64)::Vector{Vector{Int}} return self._ActiveSet(eps, settingnr, lam) @@ -190,9 +179,7 @@ def get_ShapleyValues(self, settingnr=None, lam=None): This function returns the Shapley values of the approximation for all reg. parameters ``\lambda``, if lam == None, as a dictionary of vectors of length `a.d`. Otherwise for the provided lam as a vector of length `a.d`. """ if lam is None: # get_ShapleyValues(a::approx)::Dict{Float64,Vector{Float64}} - return { - l: self._ShapleyValues(settingnr, l) for l in self.lam.keys() - } + return {l: self._ShapleyValues(settingnr, l) for l in self.lam.keys()} else: # get_ShapleyValues(a::approx, λ::Float64)::Vector{Float64} return self._ShapleyValues(settingnr, lam) diff --git a/src/pyANOVAapprox/approx.py b/src/pyANOVAapprox/approx.py index 616bb06..29b2895 100644 --- a/src/pyANOVAapprox/approx.py +++ b/src/pyANOVAapprox/approx.py @@ -1,8 +1,8 @@ import copy -import warnings -import os import csv import math +import os +import warnings from bisect import bisect import pyANOVAapprox.analysis as ANOVAanalysis @@ -176,7 +176,9 @@ def __init__( if basis == "mixed": if len(basis_vect) == 0: - raise ValueError("please call approx with basis_vect for a NFMT transform.") + raise ValueError( + "please call approx with basis_vect for a NFMT transform." + ) if len(basis_vect) < self.parent.X.shape[1]: raise ValueError("basis_vect must have an entry for every dimension.") @@ -225,15 +227,14 @@ def __init__( ds=None, lam={0.0}, ): - + if type(lam) is np.ndarray: lam = {float(i) for i in lam} if type(lam) is list: lam = set(lam) - self.X = X - + if X.shape[0] != y.shape[0]: raise ValueError("X and y have different lengths.") @@ -271,7 +272,7 @@ def addSetting(self, setting): self.trafo.append(None) self.fc.append({}) - def getSettingNr(self, settingnr=None, lam = None, warn = True): + def getSettingNr(self, settingnr=None, lam=None, warn=True): if lam is None: return self.aktsetting if settingnr is None else settingnr else: @@ -281,13 +282,13 @@ def getSettingNr(self, settingnr=None, lam = None, warn = True): warnings.warn("Not yet approximated for this lambda") return self.aktsetting if settingnr is None else settingnr - def getSetting(self, settingnr=None, lam = None): + def getSetting(self, settingnr=None, lam=None): return self.setting[self.getSettingNr(settingnr, lam)] - def getTrafo(self, settingnr=None, lam = None): + def getTrafo(self, settingnr=None, lam=None): return self.trafo[self.getSettingNr(settingnr, lam)] - def getFc(self, settingnr=None, lam = None): + def getFc(self, settingnr=None, lam=None): return self.fc[self.getSettingNr(settingnr, lam)] def addTrafo(self, settingnr=None): @@ -401,11 +402,11 @@ def approximate( If lam is a np.ndarray of dtype float, this function computes the approximation for the regularization parameters contained in lam. If lam is a float, this function computes the approximation for the regularization parameter lam. """ - + setting = self.getSetting(settingnr) - + if lam is None: - lam = setting.lam + lam = setting.lam else: if isinstance(lam, np.ndarray): lam = {float(l) for l in lam} @@ -461,7 +462,6 @@ def _cv_for_budget(B_curr): settingnr_temp = self.aktsetting self.addTrafo(settingnr_temp) - self._approximate( lam, settingnr=settingnr_temp, @@ -473,8 +473,16 @@ def _cv_for_budget(B_curr): ) M = self.X.shape[0] - nfreqs = np.sum([np.prod(np.array(bw[u])-1) for u in temp_setting.U])+1 - return 1/M * np.linalg.norm(self.evaluate(settingnr=settingnr_temp, lam=lam) - self.y)**2/(1-nfreqs/M)**2 + nfreqs = np.sum([np.prod(np.array(bw[u]) - 1) for u in temp_setting.U]) + 1 + return ( + 1 + / M + * np.linalg.norm( + self.evaluate(settingnr=settingnr_temp, lam=lam) - self.y + ) + ** 2 + / (1 - nfreqs / M) ** 2 + ) B_min = float(max(sum(5 ** len(u) for u in setting.U), 1)) B_max = float(max(B_min + 1.0, len(self.y) / 3.0)) @@ -533,21 +541,23 @@ def _autoapproximate( D = dict([(u, tuple([1.0] * len(u))) for u in setting.U]) t = dict([(u, tuple([1.0] * len(u))) for u in setting.U]) - - if verbosity>3: + + if verbosity > 3: if not os.path.exists("log"): os.mkdir("log") - with open('log/log.csv', 'w', newline='') as csvfile: - csv.writer(csvfile, delimiter=',').writerow(["it"] + setting.U) + with open("log/log.csv", "w", newline="") as csvfile: + csv.writer(csvfile, delimiter=",").writerow(["it"] + setting.U) for idx in range(maxiter): if verbosity > 0: print("===== Iteration ", str(idx + 1), " =====") - + # Optional: Use cross-validation to find optimal B if cross_validation: if verbosity > 0: - print("Running binary search for optimal bandwidth via cross-validation...") + print( + "Running binary search for optimal bandwidth via cross-validation..." + ) B, cv_val = self._compute_cv_error( D=D, @@ -565,7 +575,7 @@ def _autoapproximate( ) if verbosity > 0: print(f"Optimal B from CV: {B:.1f} (cv={cv_val:.6e})") - + bw = compute_bandwidth(B, D, t) if setting.N is not None: self.addSetting(setting) @@ -579,11 +589,13 @@ def _autoapproximate( if verbosity > 0: for i in setting.U: print("bw in", str(i), ":", bw[i]) - #print("bw in iteration", str(idx + 1), "are", str(bw)) + # print("bw in iteration", str(idx + 1), "are", str(bw)) print() if verbosity > 3: - with open('log/log.csv', 'a', newline='') as csvfile: - csv.writer(csvfile, delimiter=',').writerow(["bw in it"+str(idx+1)] + [str(bw[i]) for i in setting.U]) + with open("log/log.csv", "a", newline="") as csvfile: + csv.writer(csvfile, delimiter=",").writerow( + ["bw in it" + str(idx + 1)] + [str(bw[i]) for i in setting.U] + ) self.approximate( lam=lam, solver=solver, @@ -596,22 +608,33 @@ def _autoapproximate( D, t = self.estimate_rates(lam=lam, verbosity=verbosity) if verbosity > 1: for i in setting.U: - print("estimated rates for", str(i), ": D = ", str(D[i]), "and t = ", str(t[i]) ) + print( + "estimated rates for", + str(i), + ": D = ", + str(D[i]), + "and t = ", + str(t[i]), + ) print() - - #print( + + # print( # "estimated rates in iteration", # str(idx + 1), # "are D =", # str(D), # "and t =", # str(t), - #) + # ) if verbosity > 3: - with open('log/log.csv', 'a', newline='') as csvfile: - wr = csv.writer(csvfile, delimiter=',') - wr.writerow(["D in it"+str(idx+1)] + [str(D[i]) for i in setting.U]) - wr.writerow(["t in it"+str(idx+1)] + [str(t[i]) for i in setting.U]) + with open("log/log.csv", "a", newline="") as csvfile: + wr = csv.writer(csvfile, delimiter=",") + wr.writerow( + ["D in it" + str(idx + 1)] + [str(D[i]) for i in setting.U] + ) + wr.writerow( + ["t in it" + str(idx + 1)] + [str(t[i]) for i in setting.U] + ) return settingnrs def autoapproximate( @@ -632,7 +655,7 @@ def autoapproximate( ): """ Automatic approximation with optional cross-validation for bandwidth selection. - + Parameters: ----------- use_cross_validation : bool, optional @@ -673,7 +696,6 @@ def autoapproximate( ) self.lam[l] = self.lam[l] + settingnrs - def evaluate(self, settingnr=None, lam=None, X=None): """ This function evaluates the approximation with optional node matrix X and regularization lam. @@ -783,7 +805,9 @@ def evaluateSHAPterms(self, settingnr=None, X=None, lam=None): for l in self.lam.keys(): terms = self.evaluateANOVAterms(settingnr=settingnr, X=X, lam=l) - Dtype = np.complex128 if setting.basis in {"per", "mixed"} else np.float64 + Dtype = ( + np.complex128 if setting.basis in {"per", "mixed"} else np.float64 + ) values = np.zeros((M, d), dtype=Dtype) for i in range(d): diff --git a/src/pyANOVAapprox/bandwidth.py b/src/pyANOVAapprox/bandwidth.py index e9fe215..ff0b6f4 100644 --- a/src/pyANOVAapprox/bandwidth.py +++ b/src/pyANOVAapprox/bandwidth.py @@ -5,11 +5,11 @@ def getfcu(ghat, u): idx = [s.u for s in ghat.settings].index(u) bws = ghat.settings[idx].bandwidths - #fcu = ghat[u].reshape(bws[::-1] - 1) - #fcu = np.permute_dims(fcu, range(len(bws))[::-1]) + # fcu = ghat[u].reshape(bws[::-1] - 1) + # fcu = np.permute_dims(fcu, range(len(bws))[::-1]) fcu = ghat[u].reshape(bws - 1) fcu = np.permute_dims(fcu, range(len(bws))) - + return fcu @@ -37,7 +37,7 @@ def compute_bandwidth(B, D, t): us = set(D.keys()) - {()} bw = {u: [6] * len(u) for u in us} bw[()] = [] - + minfreqs = sum(math.prod((bw[u][j] - 1) for j in range(len(u))) for u in us) if B < minfreqs: raise ValueError(f"Budget too small: {B} < {minfreqs}") @@ -123,19 +123,19 @@ def estimate_rates(self, lam, settingnr=None, verbosity=0): mcl = np.exp(most_common_value(np.log(abs(self.getFc(settingnr)[lam].data)))) threshold = 100 * mcl**2 - - if verbosity>5: - if not os.path.exists(os.path.join("log","figures")): - os.mkdir(os.path.join("log","figures")) + + if verbosity > 5: + if not os.path.exists(os.path.join("log", "figures")): + os.mkdir(os.path.join("log", "figures")) num = 0 - + system = self.getTrafo(settingnr).system for u in us: if len(u) == 0: continue - if verbosity>5: + if verbosity > 5: fig, ax = plt.subplots() ax.set_xscale("log") ax.set_yscale("log") @@ -154,7 +154,7 @@ def estimate_rates(self, lam, settingnr=None, verbosity=0): None, ) - if verbosity>5: + if verbosity > 5: y = np.cumsum(axissum[::-1])[::-1] ax.plot( @@ -174,7 +174,7 @@ def estimate_rates(self, lam, settingnr=None, verbosity=0): D[u][j] = Duj t[u][j] = -tuj / 2 - if verbosity>5: + if verbosity > 5: x = np.arange(1, idx + 1) ax.plot( x, @@ -182,12 +182,18 @@ def estimate_rates(self, lam, settingnr=None, verbosity=0): linewidth=2, # color=j ) - if verbosity>5: - #plt.figure(figsize=(12, 9)) - #for ax in ps: + if verbosity > 5: + # plt.figure(figsize=(12, 9)) + # for ax in ps: # plt.sca(ax) - fig.savefig(os.path.join("log","figures", str(num).strip()+ "_rates_" + str(u).strip() + ".png")) + fig.savefig( + os.path.join( + "log", + "figures", + str(num).strip() + "_rates_" + str(u).strip() + ".png", + ) + ) num = num + 1 - plt.close(fig) + plt.close(fig) return D, t diff --git a/src/pyANOVAapprox/errors.py b/src/pyANOVAapprox/errors.py index e484203..57eedbf 100644 --- a/src/pyANOVAapprox/errors.py +++ b/src/pyANOVAapprox/errors.py @@ -37,10 +37,7 @@ def get_l2_error(self, settingnr=None, X=None, y=None, lam=None): if lam is not None: return self._l2_error(settingnr, lam, X, y) else: - return { - l: self._l2_error(settingnr, l, X, y) - for l in self.lam.keys() - } + return {l: self._l2_error(settingnr, l, X, y) for l in self.lam.keys()} def _mse(self, settingnr, lam, X, y): # helpfunction for get_mse @@ -77,9 +74,7 @@ def get_mse(self, settingnr=None, X=None, y=None, lam=None): if lam is not None: return self._mse(settingnr, lam, X, y) else: - return { - l: self._mse(settingnr, l, X, y) for l in self.lam.keys() - } + return {l: self._mse(settingnr, l, X, y) for l in self.lam.keys()} def _mad(self, settingnr, lam, X, y): # helpfunction for get_mad @@ -116,16 +111,16 @@ def get_mad(self, settingnr=None, X=None, y=None, lam=None): if lam is not None: return self._mad(settingnr, lam, X, y) else: - return { - l: self._mad(settingnr, l, X, y) for l in self.lam.keys() - } + return {l: self._mad(settingnr, l, X, y) for l in self.lam.keys()} def _L2_error(self, norm, bc_fun, settingnr, lam): # print(settingnr) if self.getSetting(settingnr, lam).basis in {"per", "cos", "cheb", "std", "mixed"}: error = norm**2 - index_set = gt.get_IndexSet(self.getTrafo(settingnr, lam).settings, self.X.shape[1]) + index_set = gt.get_IndexSet( + self.getTrafo(settingnr, lam).settings, self.X.shape[1] + ) for i in range(index_set.shape[1]): k = index_set[:, i] @@ -153,10 +148,7 @@ def get_L2_error(self, norm, bc_fun, settingnr=None, lam=None): if lam is not None: return self._L2_error(norm, bc_fun, settingnr, lam) else: - return { - l: self._L2_error(norm, bc_fun, settingnr, l) - for l in self.lam.keys() - } + return {l: self._L2_error(norm, bc_fun, settingnr, l) for l in self.lam.keys()} def _acc(self, settingnr, lam, X, y): # helpfunction for get_acc @@ -176,9 +168,7 @@ def get_acc(self, settingnr=None, X=None, y=None, lam=None): if lam is not None: return self._acc(settingnr, lam, X, y) else: - return { - l: self._acc(settingnr, l, X, y) for l in self.lam.keys() - } + return {l: self._acc(settingnr, l, X, y) for l in self.lam.keys()} def auc_score(y_true, y_pred_proba): @@ -240,9 +230,7 @@ def get_auc(self, settingnr=None, X=None, y=None, lam=None): if lam is not None: return self._auc(settingnr, lam, X, y) else: - return { - l: self._auc(settingnr, l, X, y) for l in self.lam.keys() - } + return {l: self._auc(settingnr, l, X, y) for l in self.lam.keys()} __all__ = [