-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhilbert_transformer_phase.m
More file actions
145 lines (118 loc) · 4.83 KB
/
hilbert_transformer_phase.m
File metadata and controls
145 lines (118 loc) · 4.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
function [phase, estimate_mask, analytic] = hilbert_transformer_phase(data, buffer_len, varargin)
% hilbert_transformer_phase: estimate real-time phase using a hilbert transformer,
% using AR prediction to offset the group delay.
%
% Syntax: [phase, estimate_mask, analytic] = hilbert_transformer_phase(...
% data, buffer_len, [ht_b], [band], [Fs], [upsample]);
%
% Input:
% data: raw data at 30 kHz (single channel)
% buffer_len: number of 30 kHz samples per buffer (for 1 ms buffer)
% [ht_b]: (designed using firpm) numerator of (FIR) Hilbert
% transformer to use, at 500 Hz sample rate.
% [band]: ([4 8]) target frequency band, in Hz
% [Fs]: (30000) sample rate in Hz, must be a multiple of 500.
% [upsample]: (false) whether to upsample using linear interpolation
%
% Output:
% phase: estimated phase in radians
% estimate_mask: true for indices of source data with corresponding phase estimates.
% (should be all true if 'upsample' is true)
% analytic: analytic signal (same size as "phase" output)
assert(isvector(data), 'Data must be a vector');
% sample rates
Fs = 30000;
Fs_ds = 500;
if length(varargin) >= 3 && ~isempty(varargin{3})
Fs = varargin{3};
assert(isscalar(Fs) && isreal(Fs) && Fs > 0, 'Invalid sample rate');
assert(mod(Fs, Fs_ds) == 0, 'Sample rate must be a multiple of 500');
end
ds_factor = Fs / Fs_ds;
% bandpass filter
band = [4 8];
if length(varargin) >= 2 && ~isempty(varargin{2})
band = varargin{2};
assert(isvector(band) && isnumeric(band) && isreal(band) && length(band) == 2 ...
&& band(1) > 0 && band(2) > band(1) && band(1) < Fs_ds/2, ...
'Invalid frequency band.');
band = band(:);
end
[b, a] = butter(2, band/(Fs/2));
data_filt = filter(b, a, data);
% hilbert transformer
if ~isempty(varargin) && ~isempty(varargin{1})
ht_b = varargin{1};
ht_order = length(ht_b) - 1;
else
ht_order = 18;
ht_freq = [band(1) (Fs_ds/2)-band(1)] / (Fs_ds/2);
ht_b = firpm(ht_order, ht_freq, [1 1], 'hilbert');
end
ht_offset_samp = ht_order / 2;
f_resp = freqz(ht_b, 1, (band(1):0.1:band(2)) * 2 * pi / Fs_ds);
mag_resp = abs(f_resp);
% normalize by geometric mean of magnitude response range
scale_factor = 1/sqrt(min(mag_resp) * max(mag_resp));
% ar prediction (want to save ~1 second of data for training)
ar_order = 20;
ar_buf_len = max(Fs_ds, 2 * ar_order);
ar_buf = zeros(ar_buf_len, 1);
% output setup
estimate_mask = false(size(data));
estimate_mask(1:ds_factor:end) = true;
estimate_mask_buf = buffer(estimate_mask, buffer_len);
ht_state = zeros(ht_order, 1);
data_buf = buffer(data_filt, buffer_len);
analytic_buf = zeros(size(data_buf));
max_n_samp = ceil(buffer_len / ds_factor);
ar_out = zeros(ar_buf_len + ht_offset_samp + 1, 1);
ht_buf = zeros(max_n_samp + ht_offset_samp + 1, 1);
ht_out = ht_buf;
% for interpolation
do_upsamp = false;
if length(varargin) >= 4 && varargin{4}
do_upsamp = true;
lastcs = 0;
lastcs_offset = 1 - ds_factor;
estimate_mask = true(length(data), 1);
valid_samp = buffer(estimate_mask, buffer_len);
end
for kBuf = 1:size(data_buf, 2)
% get downsampled samples in this buffer
curr_mask = estimate_mask_buf(:, kBuf);
buf_ds = data_buf(curr_mask, kBuf);
n_samp = length(buf_ds);
ht_buf_offset = max_n_samp - n_samp;
% push new samples to ar buffer
ar_buf(1:end-n_samp) = ar_buf(1+n_samp:end);
ar_buf(end-n_samp+1:end) = buf_ds;
% extend by the hilbert transformer offset
ar_out(:) = ar_extend(ar_buf, ht_offset_samp + 1, [], ar_order);
ht_buf(ht_buf_offset + 1:end) = ar_out(end-(n_samp+ht_offset_samp+1)+1:end);
% apply hilbert transformer (2 steps)
ht_out(:) = 0;
% save state at t0
[ht_out(ht_buf_offset + 1:max_n_samp), ht_state] = filter(ht_b, 1, ht_buf(ht_buf_offset + 1:max_n_samp), ht_state);
% apply to extension w/o saving state
ht_out(max_n_samp+1:end) = filter(ht_b, 1, ht_buf(max_n_samp+1:end), ht_state);
% get analytic signal
analytic_buf(curr_mask, kBuf) = complex(buf_ds, scale_factor * ht_out(end-n_samp:end-1));
if do_upsamp
x_interp = (lastcs_offset : ds_factor : lastcs_offset + (n_samp+1)*ds_factor)';
lastcs_offset = x_interp(end-1) - buffer_len;
buf_interp = [lastcs;
analytic_buf(curr_mask, kBuf);
complex(ar_out(ar_buf_len + 1), ht_out(end))];
lastcs = buf_interp(end-1);
xq = find(valid_samp(:, kBuf));
analytic_buf(xq, kBuf) = ...
interp1(x_interp, abs(buf_interp), xq) ...
.* exp(1j * interp1(x_interp, unwrap(angle(buf_interp)), xq));
end
end
% reshape and mask phase
analytic = analytic_buf(:);
analytic = analytic(estimate_mask);
phase = angle(analytic);
end