diff --git a/simpleTest/exampleAutoapproximate_with_cv.py b/simpleTest/exampleAutoapproximate_with_cv.py index 23c2753..af1ded7 100644 --- a/simpleTest/exampleAutoapproximate_with_cv.py +++ b/simpleTest/exampleAutoapproximate_with_cv.py @@ -3,15 +3,13 @@ # Example for approximating an periodic function import math +import os +import sys import matplotlib.pyplot as plt import numpy as np from TestFunctionPeriodic import * - -import os -import sys - src_aa = os.path.abspath(os.path.join(os.getcwd(), "src")) sys.path.insert(0, src_aa) diff --git a/simpleTest/exampleMixed.py b/simpleTest/exampleMixed.py new file mode 100644 index 0000000..32aab52 --- /dev/null +++ b/simpleTest/exampleMixed.py @@ -0,0 +1,149 @@ +# pip install pyANOVAapprox + +# Example for approximating a function with a mixed basis (exp, alg, cos, alg) +# +# Test function (4-dimensional): +# +# f(x) = exp(sin(2π x₁) · x₂) + cos(π x₃) · x₄² +# + 1/10 · sin²(2π x₁) + 5 · √(x₂ x₄ + 1) +# +# Basis assignment per dimension: +# x₁ → "exp" (exponential), x₁ ∈ [-0.5, 0.5) +# x₂ → "alg" (Chebyshev / algebraic), x₂ ∈ [0, 1] +# x₃ → "cos" (cosine), x₃ ∈ [0, 1] +# x₄ → "alg" (Chebyshev / algebraic), x₄ ∈ [0, 1] + +import math + +import numpy as np + +import pyANOVAapprox as ANOVAapprox + + +def TestFunction(x): + return ( + np.exp(np.sin(2 * np.pi * x[0]) * x[1]) + + np.cos(np.pi * x[2]) * x[3] ** 2 + + 0.1 * np.sin(2 * np.pi * x[0]) ** 2 + + 5 * np.sqrt(x[1] * x[3] + 1) + ) + + +rng = np.random.default_rng(1234) + +################################## +## Definition of the parameters ## +################################## + +d = 4 # dimension +basis_vect = ["exp", "alg", "cos", "alg"] # basis per dimension + +M = 10000 # number of training points +M_test = 100000 # number of test points +max_iter = 50 # maximum number of LSQR iterations + +# there are 3 possibilities with varying degree of freedom to define the number of used frequencies +########### Variant 1: +ds = 2 # superposition dimension +num = np.sum([math.comb(d, k) for k in np.arange(1, ds + 1)]) # number of used subsets +b = M / ( + math.log10(M) * num +) # number for the number of frequencies if we use logarithmic oversampling and distribute it evenly to all subsets +bw = [ + math.floor(b / 2) * 2, + math.floor(math.sqrt(b) / 2) * 2, +] # bandwidths (use even numbers) +# Use all subsets up to ds and use bw[0] many frequencies in subsets with one element, +# bw[1]^2 many for subsets with two elements and so on. +# +########### Variant 2: +# used subsets: +# U = [(), (0,), (1,), (2,), (3,), +# (0,1), (0,2), (0,3), (1,2), (1,3), (2,3)] +# Bandwidths for these subsets: +# N = [0, 100, 100, 100, 100, +# 10, 10, 10, 10, 10, 10] +# Use the subsets U with the bandwidths N. The bandwidth N[i] corresponds to the subset U[i]. +# For subsets with more than one direction the same bandwidth is used in all directions. +# +########### Variant 3: +# used subsets: +# U = [(), (0,), (1,), (2,), (3,), +# (0,1), (0,2), (0,3), (1,2), (1,3), (2,3)] +# Bandwidths for these subsets: +# N = [(), (100,), (100,), (100,), (100,), +# (10,10), (10,10), (10,10), (10,10), (10,10), (10,10)] +# Use the subsets U with the bandwidths N. The bandwidth N[i][j] corresponds to direction U[i][j]. + +lambdas = np.array([0.0, 1.0]) # used regularisation parameters λ + +############################ +## Generation of the data ## +############################ + +# Nodes are drawn from the correct domain for each basis, +# with Chebyshev-weight sampling for the algebraic (alg) dimensions: +# +# x₁ (exp): uniform on [-0.5, 0.5) → u ∈ [0,1], shift by -0.5 +# x₂ (alg): arcsine distribution on [0, 1] → x = (1 − cos(π u)) / 2 +# x₃ (cos): uniform on [0, 1] (cosine series, no special weight needed) +# x₄ (alg): arcsine distribution on [0, 1] → x = (1 − cos(π u)) / 2 +# +# The transformation (1 − cos(πu))/2 is the analogue of sin(π(u−0.5)) +# used in exampleCheb.py, adapted to the domain [0, 1]. + + +def cheb_nodes(rng, size): + """Sample from the arcsine (Chebyshev-weight) distribution on [0, 1].""" + return (1.0 - np.cos(np.pi * rng.random(size))) / 2.0 + + +X = rng.random((M, d)) # initially all columns uniform in [0, 1] +X[:, 0] -= 0.5 # x₁ (exp): shift to [-0.5, 0.5) +X[:, 1] = cheb_nodes(rng, M) # x₂ (alg): Chebyshev-weight on [0, 1] +X[:, 3] = cheb_nodes(rng, M) # x₄ (alg): Chebyshev-weight on [0, 1] +y = np.array( + [TestFunction(X[i, :]) for i in range(M)], dtype=complex +) # function values (complex because the mixed basis with exp uses complex coefficients) + +X_test = rng.random((M_test, d)) +X_test[:, 0] -= 0.5 +X_test[:, 1] = cheb_nodes(rng, M_test) +X_test[:, 3] = cheb_nodes(rng, M_test) +y_test = np.array([TestFunction(X_test[i, :]) for i in range(M_test)], dtype=complex) + +########################## +## Do the approximation ## +########################## + +ads = ANOVAapprox.approx(X, y, ds=ds, basis="mixed", N=bw, basis_vect=basis_vect) +ads.approximate(lam=lambdas, max_iter=max_iter, solver="lsqr") + +################################ +## get approximation accuracy ## +################################ + +mse = ads.get_mse(X=X_test, y=y_test) # get mse error at the test points +λ_min = min( + mse, key=mse.get +) # get the regularisation parameter which leads to the minimal error +mse_min = mse[λ_min] + +print("mse = " + str(mse_min)) + +############################################### +## Analyze the model to improve the accuracy ## +############################################### + +# Attribute ranking: how much does each dimension contribute? +attr_ranking = ads.get_AttributeRanking(lam=λ_min) +print("Attribute ranking:") +for i, r in enumerate(attr_ranking): + print(f" x{i+1} ({basis_vect[i]}): {r:.4f}") + +# Global sensitivity indices grouped by ANOVA term +gsi = ads.get_GSI(lam=λ_min, Dict=True) +print("\nGlobal sensitivity indices (top terms):") +for u, v in sorted(gsi.items(), key=lambda item: item[1], reverse=True)[:8]: + if v > 1e-4: + print(f" u = {u}: {v:.4f}") diff --git a/src/pyANOVAapprox/__init__.py b/src/pyANOVAapprox/__init__.py index c41618d..f286d38 100644 --- a/src/pyANOVAapprox/__init__.py +++ b/src/pyANOVAapprox/__init__.py @@ -1,4 +1,6 @@ +import csv import math +import os import threading from math import acos, isnan @@ -8,8 +10,6 @@ from scipy.optimize import bisect from scipy.sparse.linalg import lsqr from scipy.special import erf -import csv -import os # from sklearn.metrics import roc_auc_score diff --git a/src/pyANOVAapprox/analysis.py b/src/pyANOVAapprox/analysis.py index c72d17a..aedd105 100644 --- a/src/pyANOVAapprox/analysis.py +++ b/src/pyANOVAapprox/analysis.py @@ -37,10 +37,7 @@ def get_variances(self, settingnr=None, lam=None, Dict=False): elif ( lam == None ): # get_variances( a::approx; dict::Bool = false )::Dict{Float64,Union{Vector{Float64},Dict{Vector{Int},Float64}}} - return { - l: self._variances(settingnr, l, Dict) - for l in self.lam.keys() - } + return {l: self._variances(settingnr, l, Dict) for l in self.lam.keys()} def _GSI(self, settingnr, lam, Dict): # helpfunction for get_GSI @@ -78,9 +75,7 @@ def get_GSI(self, settingnr=None, lam=None, Dict=False): ): # get_GSI(a::approx, λ::Float64; dict::Bool = false,)::Union{Vector{Float64},Dict{Vector{Int},Float64}} return self._GSI(settingnr, lam, Dict) else: # get_GSI( a::approx; dict::Bool = false )::Dict{Float64,Union{Vector{Float64},Dict{Vector{Int},Float64}}} - return { - l: self._GSI(settingnr, l, Dict) for l in self.lam.keys() - } + return {l: self._GSI(settingnr, l, Dict) for l in self.lam.keys()} def _AttributeRanking(self, settingnr, lam): # helpfunction for get_AttributeRanking @@ -120,10 +115,7 @@ def get_AttributeRanking(self, settingnr=None, lam=None): if ( lam is None ): # get_AttributeRanking( a::approx, λ::Float64 )::Dict{Float64,Vector{Float64}} - return { - l: self._AttributeRanking(settingnr, l) - for l in self.lam.keys() - } + return {l: self._AttributeRanking(settingnr, l) for l in self.lam.keys()} else: # get_AttributeRanking( a::approx, λ::Float64 )::Vector{Float64} return self._AttributeRanking(settingnr, lam) @@ -162,10 +154,7 @@ def get_ActiveSet(self, eps, settingnr=None, lam=None): if ( lam is None ): # get_ActiveSet(a::approx, eps::Vector{Float64})::Dict{Float64,Vector{Vector{Int}}} - return { - l: self._ActiveSet(eps, settingnr, l) - for l in self.lam.keys() - } + return {l: self._ActiveSet(eps, settingnr, l) for l in self.lam.keys()} else: # get_ActiveSet(a::approx, eps::Vector{Float64}, λ::Float64)::Vector{Vector{Int}} return self._ActiveSet(eps, settingnr, lam) @@ -190,9 +179,7 @@ def get_ShapleyValues(self, settingnr=None, lam=None): This function returns the Shapley values of the approximation for all reg. parameters ``\lambda``, if lam == None, as a dictionary of vectors of length `a.d`. Otherwise for the provided lam as a vector of length `a.d`. """ if lam is None: # get_ShapleyValues(a::approx)::Dict{Float64,Vector{Float64}} - return { - l: self._ShapleyValues(settingnr, l) for l in self.lam.keys() - } + return {l: self._ShapleyValues(settingnr, l) for l in self.lam.keys()} else: # get_ShapleyValues(a::approx, λ::Float64)::Vector{Float64} return self._ShapleyValues(settingnr, lam) diff --git a/src/pyANOVAapprox/approx.py b/src/pyANOVAapprox/approx.py index 3bf79cd..29b2895 100644 --- a/src/pyANOVAapprox/approx.py +++ b/src/pyANOVAapprox/approx.py @@ -1,8 +1,8 @@ import copy -import warnings -import os import csv import math +import os +import warnings from bisect import bisect import pyANOVAapprox.analysis as ANOVAanalysis @@ -76,6 +76,41 @@ def transformX(X, basis): return Xt +def _validate_nodes_for_basis(X, basis, basis_vect=None): + min_X = np.min(X) + max_X = np.max(X) + + if basis in {"per", "chui1", "chui2", "chui3", "chui4"}: + if min_X < -0.5 or max_X >= 0.5: + raise ValueError("Nodes need to be between -0.5 and 0.5.") + elif basis == "cos": + if min_X < 0 or max_X > 1: + raise ValueError("Nodes need to be between 0 and 1.") + elif basis == "cheb": + if min_X < -1 or max_X > 1: + raise ValueError("Nodes need to be between -1 and 1.") + elif basis == "mixed": + if basis_vect is None or len(basis_vect) != X.shape[1]: + raise ValueError("basis_vect must have an entry for every dimension.") + + for j, basis_j in enumerate(basis_vect): + col = X[:, j] + if basis_j == "exp": + if np.min(col) < -0.5 or np.max(col) >= 0.5: + raise ValueError( + f"Nodes for exp basis in dimension {j} must be in [-0.5, 0.5)." + ) + elif basis_j in {"cos", "alg"}: + if np.min(col) < 0 or np.max(col) > 1: + raise ValueError( + f"Nodes for {basis_j} basis in dimension {j} must be in [0, 1]." + ) + else: + raise ValueError( + "basis_vect entries for mixed basis must be in {'exp', 'cos', 'alg'}." + ) + + compute_bandwidth = ANOVAbandwidth.compute_bandwidth @@ -139,11 +174,13 @@ def __init__( # "Periodic functions require complex vectors, nonperiodic functions real vectors." # ) - # if basis == "mixed": - # if len(basis_vect) == 0: - # raise ValueError("please call approx with basis_vect for a NFMT transform.") - # if len(basis_vect) < max(max(u) for u in U) +1: - # raise ValueError("basis_vect must have an entry for every dimension.") + if basis == "mixed": + if len(basis_vect) == 0: + raise ValueError( + "please call approx with basis_vect for a NFMT transform." + ) + if len(basis_vect) < self.parent.X.shape[1]: + raise ValueError("basis_vect must have an entry for every dimension.") self.basis = basis self.U = U @@ -190,15 +227,14 @@ def __init__( ds=None, lam={0.0}, ): - + if type(lam) is np.ndarray: lam = {float(i) for i in lam} if type(lam) is list: lam = set(lam) - self.X = X - + if X.shape[0] != y.shape[0]: raise ValueError("X and y have different lengths.") @@ -215,18 +251,7 @@ def __init__( lam=lam, ) - min_X = np.min(X) - max_X = np.max(X) - - if setting.basis in {"per", "chui1", "chui2", "chui3", "chui4"}: - if min_X < -0.5 or max_X >= 0.5: - raise ValueError("Nodes need to be between -0.5 and 0.5.") - elif setting.basis == "cos": - if min_X < 0 or max_X > 1: - raise ValueError("Nodes need to be between 0 and 1.") - elif setting.basis == "cheb": - if min_X < -1 or max_X > 1: - raise ValueError("Nodes need to be between -1 and 1.") + _validate_nodes_for_basis(X, setting.basis, setting.basis_vect) self.y = y self.setting = [setting] @@ -247,7 +272,7 @@ def addSetting(self, setting): self.trafo.append(None) self.fc.append({}) - def getSettingNr(self, settingnr=None, lam = None, warn = True): + def getSettingNr(self, settingnr=None, lam=None, warn=True): if lam is None: return self.aktsetting if settingnr is None else settingnr else: @@ -257,13 +282,13 @@ def getSettingNr(self, settingnr=None, lam = None, warn = True): warnings.warn("Not yet approximated for this lambda") return self.aktsetting if settingnr is None else settingnr - def getSetting(self, settingnr=None, lam = None): + def getSetting(self, settingnr=None, lam=None): return self.setting[self.getSettingNr(settingnr, lam)] - def getTrafo(self, settingnr=None, lam = None): + def getTrafo(self, settingnr=None, lam=None): return self.trafo[self.getSettingNr(settingnr, lam)] - def getFc(self, settingnr=None, lam = None): + def getFc(self, settingnr=None, lam=None): return self.fc[self.getSettingNr(settingnr, lam)] def addTrafo(self, settingnr=None): @@ -377,11 +402,11 @@ def approximate( If lam is a np.ndarray of dtype float, this function computes the approximation for the regularization parameters contained in lam. If lam is a float, this function computes the approximation for the regularization parameter lam. """ - + setting = self.getSetting(settingnr) - + if lam is None: - lam = setting.lam + lam = setting.lam else: if isinstance(lam, np.ndarray): lam = {float(l) for l in lam} @@ -437,7 +462,6 @@ def _cv_for_budget(B_curr): settingnr_temp = self.aktsetting self.addTrafo(settingnr_temp) - self._approximate( lam, settingnr=settingnr_temp, @@ -449,8 +473,16 @@ def _cv_for_budget(B_curr): ) M = self.X.shape[0] - nfreqs = np.sum([np.prod(np.array(bw[u])-1) for u in temp_setting.U])+1 - return 1/M * np.linalg.norm(self.evaluate(settingnr=settingnr_temp, lam=lam) - self.y)**2/(1-nfreqs/M)**2 + nfreqs = np.sum([np.prod(np.array(bw[u]) - 1) for u in temp_setting.U]) + 1 + return ( + 1 + / M + * np.linalg.norm( + self.evaluate(settingnr=settingnr_temp, lam=lam) - self.y + ) + ** 2 + / (1 - nfreqs / M) ** 2 + ) B_min = float(max(sum(5 ** len(u) for u in setting.U), 1)) B_max = float(max(B_min + 1.0, len(self.y) / 3.0)) @@ -509,21 +541,23 @@ def _autoapproximate( D = dict([(u, tuple([1.0] * len(u))) for u in setting.U]) t = dict([(u, tuple([1.0] * len(u))) for u in setting.U]) - - if verbosity>3: + + if verbosity > 3: if not os.path.exists("log"): os.mkdir("log") - with open('log/log.csv', 'w', newline='') as csvfile: - csv.writer(csvfile, delimiter=',').writerow(["it"] + setting.U) + with open("log/log.csv", "w", newline="") as csvfile: + csv.writer(csvfile, delimiter=",").writerow(["it"] + setting.U) for idx in range(maxiter): if verbosity > 0: print("===== Iteration ", str(idx + 1), " =====") - + # Optional: Use cross-validation to find optimal B if cross_validation: if verbosity > 0: - print("Running binary search for optimal bandwidth via cross-validation...") + print( + "Running binary search for optimal bandwidth via cross-validation..." + ) B, cv_val = self._compute_cv_error( D=D, @@ -541,7 +575,7 @@ def _autoapproximate( ) if verbosity > 0: print(f"Optimal B from CV: {B:.1f} (cv={cv_val:.6e})") - + bw = compute_bandwidth(B, D, t) if setting.N is not None: self.addSetting(setting) @@ -555,11 +589,13 @@ def _autoapproximate( if verbosity > 0: for i in setting.U: print("bw in", str(i), ":", bw[i]) - #print("bw in iteration", str(idx + 1), "are", str(bw)) + # print("bw in iteration", str(idx + 1), "are", str(bw)) print() if verbosity > 3: - with open('log/log.csv', 'a', newline='') as csvfile: - csv.writer(csvfile, delimiter=',').writerow(["bw in it"+str(idx+1)] + [str(bw[i]) for i in setting.U]) + with open("log/log.csv", "a", newline="") as csvfile: + csv.writer(csvfile, delimiter=",").writerow( + ["bw in it" + str(idx + 1)] + [str(bw[i]) for i in setting.U] + ) self.approximate( lam=lam, solver=solver, @@ -572,22 +608,33 @@ def _autoapproximate( D, t = self.estimate_rates(lam=lam, verbosity=verbosity) if verbosity > 1: for i in setting.U: - print("estimated rates for", str(i), ": D = ", str(D[i]), "and t = ", str(t[i]) ) + print( + "estimated rates for", + str(i), + ": D = ", + str(D[i]), + "and t = ", + str(t[i]), + ) print() - - #print( + + # print( # "estimated rates in iteration", # str(idx + 1), # "are D =", # str(D), # "and t =", # str(t), - #) + # ) if verbosity > 3: - with open('log/log.csv', 'a', newline='') as csvfile: - wr = csv.writer(csvfile, delimiter=',') - wr.writerow(["D in it"+str(idx+1)] + [str(D[i]) for i in setting.U]) - wr.writerow(["t in it"+str(idx+1)] + [str(t[i]) for i in setting.U]) + with open("log/log.csv", "a", newline="") as csvfile: + wr = csv.writer(csvfile, delimiter=",") + wr.writerow( + ["D in it" + str(idx + 1)] + [str(D[i]) for i in setting.U] + ) + wr.writerow( + ["t in it" + str(idx + 1)] + [str(t[i]) for i in setting.U] + ) return settingnrs def autoapproximate( @@ -608,7 +655,7 @@ def autoapproximate( ): """ Automatic approximation with optional cross-validation for bandwidth selection. - + Parameters: ----------- use_cross_validation : bool, optional @@ -649,7 +696,6 @@ def autoapproximate( ) self.lam[l] = self.lam[l] + settingnrs - def evaluate(self, settingnr=None, lam=None, X=None): """ This function evaluates the approximation with optional node matrix X and regularization lam. @@ -662,12 +708,7 @@ def evaluate(self, settingnr=None, lam=None, X=None): setting = self.getSetting(settingnr, lam) if X is not None: - if setting.basis == "per" and (np.min(X) < -0.5 or np.max(X) >= 0.5): - raise ValueError("Nodes need to be between -0.5 and 0.5.") - elif setting.basis == "cos" and (np.min(X) < 0 or np.max(X) > 1): - raise ValueError("Nodes need to be between 0 and 1.") - elif setting.basis == "cheb" and (np.min(X) < -1 or np.max(X) > 1): - raise ValueError("Nodes need to be between -1 and 1.") + _validate_nodes_for_basis(X, setting.basis, setting.basis_vect) trafo = GroupedTransform( system=gt_systems[setting.basis], @@ -697,19 +738,14 @@ def evaluateANOVAterms(self, settingnr=None, X=None, lam=None): if X is None: X = self.X - if setting.basis == "per" and (np.min(X) < -0.5 or np.max(X) >= 0.5): - raise ValueError("Nodes need to be between -0.5 and 0.5.") - elif setting.basis == "cos" and (np.min(X) < 0 or np.max(X) > 1): - raise ValueError("Nodes need to be between 0 and 1.") - elif setting.basis == "cheb" and (np.min(X) < -1 or np.max(X) > 1): - raise ValueError("Nodes need to be between -1 and 1.") + _validate_nodes_for_basis(X, setting.basis, setting.basis_vect) Xt = transformX(X, setting.basis) - if self.setting[settingnr].basis == "per": - values = np.zeros((Xt.shape[0], len(self.setting[settingnr].U)), "complex") + if setting.basis in {"per", "mixed"}: + values = np.zeros((Xt.shape[0], len(setting.U)), "complex") else: - values = np.zeros((Xt.shape[0], len(self.setting[settingnr].U)), "float") + values = np.zeros((Xt.shape[0], len(setting.U)), "float") trafo = GroupedTransform( system=gt_systems[setting.basis], @@ -745,12 +781,7 @@ def evaluateSHAPterms(self, settingnr=None, X=None, lam=None): if X is None: X = self.X - if setting.basis == "per" and (np.min(X) < -0.5 or np.max(X) >= 0.5): - raise ValueError("Nodes need to be between -0.5 and 0.5.") - elif setting.basis == "cos" and (np.min(X) < 0 or np.max(X) > 1): - raise ValueError("Nodes need to be between 0 and 1.") - elif setting.basis == "cheb" and (np.min(X) < -1 or np.max(X) > 1): - raise ValueError("Nodes need to be between -1 and 1.") + _validate_nodes_for_basis(X, setting.basis, setting.basis_vect) d = X.shape[1] M = X.shape[0] @@ -758,9 +789,9 @@ def evaluateSHAPterms(self, settingnr=None, X=None, lam=None): if ( lam is not None ): # evaluateSHAPterms( a::approx; X::Matrix{Float64}, λ::Float64 )::Union{Matrix{ComplexF64},Matrix{Float64}} - terms = self.evaluateANOVAterms(X, lam) + terms = self.evaluateANOVAterms(settingnr=settingnr, X=X, lam=lam) - Dtype = np.complex128 if setting.basis == "per" else np.float64 + Dtype = np.complex128 if setting.basis in {"per", "mixed"} else np.float64 values = np.zeros((M, d), dtype=Dtype) for i in range(d): @@ -772,9 +803,11 @@ def evaluateSHAPterms(self, settingnr=None, X=None, lam=None): else: # evaluateSHAPterms( a::approx; X::Matrix{Float64} )::Dict{Float64,Union{Matrix{ComplexF64},Matrix{Float64}}} results = {} for l in self.lam.keys(): - terms = self.evaluateANOVAterms(X, l) + terms = self.evaluateANOVAterms(settingnr=settingnr, X=X, lam=l) - Dtype = np.complex128 if setting.basis == "per" else np.float64 + Dtype = ( + np.complex128 if setting.basis in {"per", "mixed"} else np.float64 + ) values = np.zeros((M, d), dtype=Dtype) for i in range(d): diff --git a/src/pyANOVAapprox/bandwidth.py b/src/pyANOVAapprox/bandwidth.py index e9fe215..ff0b6f4 100644 --- a/src/pyANOVAapprox/bandwidth.py +++ b/src/pyANOVAapprox/bandwidth.py @@ -5,11 +5,11 @@ def getfcu(ghat, u): idx = [s.u for s in ghat.settings].index(u) bws = ghat.settings[idx].bandwidths - #fcu = ghat[u].reshape(bws[::-1] - 1) - #fcu = np.permute_dims(fcu, range(len(bws))[::-1]) + # fcu = ghat[u].reshape(bws[::-1] - 1) + # fcu = np.permute_dims(fcu, range(len(bws))[::-1]) fcu = ghat[u].reshape(bws - 1) fcu = np.permute_dims(fcu, range(len(bws))) - + return fcu @@ -37,7 +37,7 @@ def compute_bandwidth(B, D, t): us = set(D.keys()) - {()} bw = {u: [6] * len(u) for u in us} bw[()] = [] - + minfreqs = sum(math.prod((bw[u][j] - 1) for j in range(len(u))) for u in us) if B < minfreqs: raise ValueError(f"Budget too small: {B} < {minfreqs}") @@ -123,19 +123,19 @@ def estimate_rates(self, lam, settingnr=None, verbosity=0): mcl = np.exp(most_common_value(np.log(abs(self.getFc(settingnr)[lam].data)))) threshold = 100 * mcl**2 - - if verbosity>5: - if not os.path.exists(os.path.join("log","figures")): - os.mkdir(os.path.join("log","figures")) + + if verbosity > 5: + if not os.path.exists(os.path.join("log", "figures")): + os.mkdir(os.path.join("log", "figures")) num = 0 - + system = self.getTrafo(settingnr).system for u in us: if len(u) == 0: continue - if verbosity>5: + if verbosity > 5: fig, ax = plt.subplots() ax.set_xscale("log") ax.set_yscale("log") @@ -154,7 +154,7 @@ def estimate_rates(self, lam, settingnr=None, verbosity=0): None, ) - if verbosity>5: + if verbosity > 5: y = np.cumsum(axissum[::-1])[::-1] ax.plot( @@ -174,7 +174,7 @@ def estimate_rates(self, lam, settingnr=None, verbosity=0): D[u][j] = Duj t[u][j] = -tuj / 2 - if verbosity>5: + if verbosity > 5: x = np.arange(1, idx + 1) ax.plot( x, @@ -182,12 +182,18 @@ def estimate_rates(self, lam, settingnr=None, verbosity=0): linewidth=2, # color=j ) - if verbosity>5: - #plt.figure(figsize=(12, 9)) - #for ax in ps: + if verbosity > 5: + # plt.figure(figsize=(12, 9)) + # for ax in ps: # plt.sca(ax) - fig.savefig(os.path.join("log","figures", str(num).strip()+ "_rates_" + str(u).strip() + ".png")) + fig.savefig( + os.path.join( + "log", + "figures", + str(num).strip() + "_rates_" + str(u).strip() + ".png", + ) + ) num = num + 1 - plt.close(fig) + plt.close(fig) return D, t diff --git a/src/pyANOVAapprox/errors.py b/src/pyANOVAapprox/errors.py index e484203..57eedbf 100644 --- a/src/pyANOVAapprox/errors.py +++ b/src/pyANOVAapprox/errors.py @@ -37,10 +37,7 @@ def get_l2_error(self, settingnr=None, X=None, y=None, lam=None): if lam is not None: return self._l2_error(settingnr, lam, X, y) else: - return { - l: self._l2_error(settingnr, l, X, y) - for l in self.lam.keys() - } + return {l: self._l2_error(settingnr, l, X, y) for l in self.lam.keys()} def _mse(self, settingnr, lam, X, y): # helpfunction for get_mse @@ -77,9 +74,7 @@ def get_mse(self, settingnr=None, X=None, y=None, lam=None): if lam is not None: return self._mse(settingnr, lam, X, y) else: - return { - l: self._mse(settingnr, l, X, y) for l in self.lam.keys() - } + return {l: self._mse(settingnr, l, X, y) for l in self.lam.keys()} def _mad(self, settingnr, lam, X, y): # helpfunction for get_mad @@ -116,16 +111,16 @@ def get_mad(self, settingnr=None, X=None, y=None, lam=None): if lam is not None: return self._mad(settingnr, lam, X, y) else: - return { - l: self._mad(settingnr, l, X, y) for l in self.lam.keys() - } + return {l: self._mad(settingnr, l, X, y) for l in self.lam.keys()} def _L2_error(self, norm, bc_fun, settingnr, lam): # print(settingnr) if self.getSetting(settingnr, lam).basis in {"per", "cos", "cheb", "std", "mixed"}: error = norm**2 - index_set = gt.get_IndexSet(self.getTrafo(settingnr, lam).settings, self.X.shape[1]) + index_set = gt.get_IndexSet( + self.getTrafo(settingnr, lam).settings, self.X.shape[1] + ) for i in range(index_set.shape[1]): k = index_set[:, i] @@ -153,10 +148,7 @@ def get_L2_error(self, norm, bc_fun, settingnr=None, lam=None): if lam is not None: return self._L2_error(norm, bc_fun, settingnr, lam) else: - return { - l: self._L2_error(norm, bc_fun, settingnr, l) - for l in self.lam.keys() - } + return {l: self._L2_error(norm, bc_fun, settingnr, l) for l in self.lam.keys()} def _acc(self, settingnr, lam, X, y): # helpfunction for get_acc @@ -176,9 +168,7 @@ def get_acc(self, settingnr=None, X=None, y=None, lam=None): if lam is not None: return self._acc(settingnr, lam, X, y) else: - return { - l: self._acc(settingnr, l, X, y) for l in self.lam.keys() - } + return {l: self._acc(settingnr, l, X, y) for l in self.lam.keys()} def auc_score(y_true, y_pred_proba): @@ -240,9 +230,7 @@ def get_auc(self, settingnr=None, X=None, y=None, lam=None): if lam is not None: return self._auc(settingnr, lam, X, y) else: - return { - l: self._auc(settingnr, l, X, y) for l in self.lam.keys() - } + return {l: self._auc(settingnr, l, X, y) for l in self.lam.keys()} __all__ = [ diff --git a/tests/mixed_lsqr.py b/tests/mixed_lsqr.py new file mode 100644 index 0000000..3be5d24 --- /dev/null +++ b/tests/mixed_lsqr.py @@ -0,0 +1,63 @@ +import os +import sys + +repo_root = os.getcwd() +src_aa = os.path.abspath(os.path.join(repo_root, "src")) +src_gt = os.path.abspath(os.path.join(repo_root, "..", "pyGroupedTransforms", "src")) +src_nfft3 = os.path.abspath(os.path.join(repo_root, "..", "pyNFFT3", "src")) + +sys.path.insert(0, src_aa) +sys.path.insert(0, src_gt) +sys.path.insert(0, src_nfft3) + +import numpy as np +from TestFunctionPeriodic import AS, f, fc, norm + +import pyANOVAapprox as ANOVAapprox + +d = 6 +ds = 2 +M = 10000 +max_iter = 50 +bw = [100, 10] +lambdas = np.array([0.0, 1.0]) + +basis_vect = ["exp", "exp", "exp", "exp", "exp", "exp"] + +rng = np.random.default_rng(1234) +X = rng.random((M, d)) / 2.0 +y = np.array([f(X[i, :]) for i in range(M)], dtype=complex) +X_test = rng.random((M, d)) / 2.0 +y_test = np.array([f(X_test[i, :]) for i in range(M)], dtype=complex) + +ads = ANOVAapprox.approx(X, y, ds=ds, N=bw, basis="mixed", basis_vect=basis_vect) +ads.approximate(lam=lambdas, solver="lsqr", max_iter=max_iter) + +attr_ranking = ads.get_AttributeRanking(lam=0.0) +print("AR: ", np.sum(attr_ranking)) +assert abs(np.sum(attr_ranking) - 1.0) < 1e-4 + +bw = ANOVAapprox.get_orderDependentBW(AS, [128, 32]) + +aU = ANOVAapprox.approx(X, y, U=AS, N=bw, basis="mixed", basis_vect=basis_vect) +aU.approximate(lam=lambdas, solver="lsqr", max_iter=max_iter) + +err_L2_ds = ads.get_L2_error(norm(), fc)[0.0] +err_L2_U = aU.get_L2_error(norm(), fc)[0.0] +err_l2_ds = ads.get_l2_error()[0.0] +err_l2_U = aU.get_l2_error()[0.0] +err_l2_rand_ds = ads.get_l2_error(X=X_test, y=y_test)[0.0] +err_l2_rand_U = aU.get_l2_error(X=X_test, y=y_test)[0.0] + +print("== MIXED LSQR ==") +print("L2 ds: ", err_L2_ds) +print("L2 U: ", err_L2_U) +print("l2 ds: ", err_l2_ds) +print("l2 U: ", err_l2_U) +print("l2 rand ds: ", err_l2_rand_ds) +print("l2 rand U: ", err_l2_rand_U) + +assert err_l2_ds < 0.01 +assert err_l2_U < 0.005 +assert err_l2_rand_ds < 0.01 +assert err_l2_rand_U < 0.005 diff --git a/tests/run_tests.py b/tests/run_tests.py index 716528b..870c775 100644 --- a/tests/run_tests.py +++ b/tests/run_tests.py @@ -17,6 +17,7 @@ def run_tests(): "tests/cheb_lsqr.py", "tests/per_fista.py", "tests/per_lsqr.py", + "tests/mixed_lsqr.py", ] for test_file in test_files: