-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathassignment_bigdata_module_1.py
More file actions
179 lines (135 loc) · 6.43 KB
/
assignment_bigdata_module_1.py
File metadata and controls
179 lines (135 loc) · 6.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# -*- coding: utf-8 -*-
"""assignment_BigData_module_1.ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1F5TAoSvJ_xNE2m_lKjkvHRw81PCCfcum
# **Assignment for Big Data Module 1**
"*Implementare un interpolatore lineare a tratti su una griglia a $n$ dimensioni.*
*Supporre che sia data la struttura dati multidimensionale $S_{j_0, \cdots, j_{n - 1}}$ che contiene il valore del campione della funzione nel punto della griglia di indici $j_0, \cdots, j_{n-1}$ e che il punto in cui calcolare l'interpolazione sia dato come un vettore $x_j$ con $n$ componenti.*"
Reference used: course lecture notes *4 - Interpolation*.
"""
import numpy as np
np.random.seed(0)
from itertools import product # Cartesian product
"""Implementation of the convex decomposition of $\mathbf{x} \in [0, 1)^n$ into $n + 1$ pairs $(\mathbf{b}^j, \alpha_{\mathbf{b}^j})_{j = 0, \cdots, n}$ of:
* selected vectors `b[j]` $= \mathbf{b}^j \in \mathbb{B}$, with $\mathbb{B} = \{0, 1\}^n$
* corresponding coefficients `a[j]` $= {\alpha_{\mathbf{b}^j}} \ge 0$ s.t. ${\sum_{j = 0}^{n}} \alpha_{\mathbf{b}^j} = 1$.
"""
def my_decomposition(x):
n = len(x) # dimension
p = x.argsort() # permutation that sorts x in ascending order
B = np.concatenate( # B[j] will be the j-th vector of the basis
(
np.ones((n, n), dtype=bool), # 1st one is all 1s; up to the n-th, they will be adjusted flipping down components
np.zeros((1, n), dtype=bool), # last one is already known to be all 0s
)
)
a = np.zeros(n + 1) # alpha coefficients: a[j] is the coeff of B[j]
# decomposition algorithm
a[0] = x[p[0]]
for j in range(1, n):
B[j:, p[j - 1]] = 0 # entry j is zeroed from this vector on
a[j] = x[p[j]] - x[p[j - 1]]
a[n] = 1 - a.sum()
return p, B, a
"""Check of the correctness of the implementation: it must hold that $\mathbf{x} = {\sum_{j = 0}^{n}} \alpha_{\mathbf{b}^j} \mathbf{b}^j$.
That is, `np.dot(a, B)` must amount to `x` up to the numerical precision.
"""
n = np.random.randint(1, 1 + 5, (1,)).item() # random dimension (max 5)
x = np.random.rand(n) # random vector in [0, 1)^n
p, B, a = my_decomposition(x)
aB = np.dot(a, B) # recomposition
d = ((aB - x)**2).sum()**0.5 # Euclidean distance
# print a summary of the test
print('Vector:')
print('n = %d' % n)
print('x =', x)
print('\nDecomposition:')
print('p =', p)
print('B =')
print(B.astype(np.int))
print('a =', a)
print('\nCheck of the recomposition:')
print(' x =', x)
print('a*B =', aB)
print('\nTheoretical is 0. Numerical discrepancy: d =', d)
"""Implementation of `fhat(x, S)` $= \hat{f}(\mathbf{x}, S)$ based on the data structure `S` $= S_{j_0 \cdots, j_{n - 1}}$.
Formula: $\hat{f}(\mathbf{x}) = {\sum_{j = 0}^{n}} \alpha_{\mathbf{b}^j} S_{z + \mathbf{b}^j}$.
Pseudo-code, vectorized: `fhat_x = np.dot(a, S[z + B])`.
"""
def fhat(x, S):
n = len(x) # dimension
z = np.floor(x).astype(np.int) # vector integer part
x = x - z # vector fractional part
_, B, a = my_decomposition(x)
# z + B : matrix: point z (broadcast) + ALL decomposition vectors b^j = B[j]
# zpb : vector: point z + ONE decomposition vector b^j = B[j]
# S : the data structure f(z grid)
# S_zplusB : S(z + B): matrix of the n+1 images of z + B looked up in S
S_zplusB = np.array([S[tuple(zpb)] for zpb in z + B])
return np.dot(a, S_zplusB) # linear interpolation formula
"""## Example 1/2
* test on an integer vector
* computation on a generic point
"""
# The scalar function to be linearly interpolated
def f(x):
return np.sin(x / 2).sum()
J = (10, 10, 20, 15) # number of grid points in each dimension
n = len(J) # dimension
Zj = [np.arange(J[j]) for j in range(n)] # integer values in each dimension
grid_in_Zn = product(*Zj) # grid: "rectangular" subset of Z^n
# Cartesian product done via itertool.product
# creation of the grid of values S
S = np.zeros(J)
for z in grid_in_Zn:
S[z] = f(np.array(z))
# A) Interpolation in a random integer point (inside the grid)
x_integral = np.random.randint(low=0, high=np.array(J) - 1, size=n) # random integral part
x = x_integral
fx, fhatx = f(x), fhat(x, S) # compute exact value and interpolated value
e = fhatx - fx # interpolation error (signed)
# print summary of the test
print('Test on an integer vector:')
print(' x =', x)
print(' f(x) =', fx)
print('fhat(x) =', fhatx)
print('Interpolation error must be 0. Interpolation error (signed): e = fhat(x) - f(x) =', e)
# B) Interpolation in a random point (inside the grid)
x_integral = np.random.randint(low=0, high=np.array(J) - 1, size=n) # random integral part
x_fractional = np.random.rand(n) # random fractional part
x = x_integral + x_fractional
fx, fhatx = f(x), fhat(x, S) # compute exact value and interpolated value
e = fhatx - fx # interpolation error (signed)
print('\nInterpolation in a generic point:')
print(' x =', x)
print(' f(x) =', fx)
print('fhat(x) =', fhatx)
print('Interpolation error (signed): e = fhat(x) - f(x) =', e)
"""## Example 2/2
One-dimensional case with plot.
"""
# The scalar function to be linearly interpolated
def f(x):
t = 3.75 * 2 * np.pi
return np.sin(x) if x < t else np.sin(2.8 * x + t + np.pi * 2.8 / 2)
# dimension is n = 1
J = 42 # number of grid points
xmax = J - 2 # maximum x not to exceed the grid
grid_in_Z = np.arange(J) # grid of integers
S = np.array([f(z) for z in grid_in_Z]) # creation of the grid of values S
M = 1 + (J - 2) * 10 # number of points to interpolate in
x = np.linspace(0, xmax, M).reshape((M, 1)) # points to interpolate in
fhatx = np.array([fhat(xm, S) for xm in x]) # compute the interpolated values
# Plotting
import matplotlib.pyplot as plt
plt.figure(figsize=(20, 4))
xf = np.linspace(0, xmax, 1000) # exact values woth finer resolution also in x, for the plot
plt.plot(xf, [f(xx) for xx in xf], 'r-', label='Function')
zticks = range(0, J - 1)
plt.plot(zticks, [f(z) for z in zticks], 'ko', label='Given values')
plt.plot(x, fhatx, 'b.-', label='Linear interpolation')
plt.legend(loc='lower left', fontsize=14)
plt.xlabel('x', fontsize=16)
plt.ylabel('Value', fontsize=16)
plt.grid()