44Features from audio files
55"""
66
7+ import pickle
8+ import struct
79from typing import Dict , Optional , Tuple , Union
810
911import librosa
1012import numpy as np
13+ from scipy .spatial .distance import mahalanobis
1114
1215from matchmaker .utils .processor import Processor
1316
@@ -168,8 +171,17 @@ def __call__(
168171
169172class CQTSpectralFluxProcessor (Processor ):
170173 """
171- CQT spectrum (88 bins, A0-C8) with optional half-wave rectified spectral flux.
172- Output shape: (n_frames, 88) or (n_frames, 89) if include_spectral_flux=True.
174+ Processor for CQT spectrum with optional Spectral Flux features.
175+
176+ Based on Nakamura et al. (2013) "Acoustic Score Following to Musical
177+ Performance with Errors and Arbitrary Repeats and Skips for Automatic Accompaniment".
178+
179+ This processor extracts CQT (Constant-Q Transform) spectrum (88 dimensions
180+ matching piano keyboard range) and optionally combines it with spectral flux
181+ features for improved onset detection and score following.
182+
183+ The CQT spectrum uses 88 bins corresponding to piano keys (MIDI note 21-108,
184+ A0 to C8), with one bin per semitone, matching the piano roll representation.
173185 """
174186
175187 def __init__ (
@@ -186,6 +198,8 @@ def __init__(
186198 self .sample_rate = sample_rate
187199 self .hop_length = hop_length
188200 self .norm = norm
201+ # Default to A0 (MIDI note 21, 27.5 Hz) to match piano range
202+ # This ensures 88 bins correspond to piano keys (A0 to C8)
189203 self .fmin = fmin if fmin is not None else librosa .note_to_hz ("A0" )
190204 self .n_bins = n_bins
191205 self .bins_per_octave = bins_per_octave
@@ -196,6 +210,9 @@ def __call__(
196210 self ,
197211 y : InputAudioSeries ,
198212 ) -> Tuple [Optional [np .ndarray ], Dict ]:
213+ # Compute CQT spectrum
214+ # CQT provides constant-Q (logarithmic) frequency resolution
215+ # with one bin per semitone, matching piano keyboard layout
199216 cqt = librosa .cqt (
200217 y = y ,
201218 sr = self .sample_rate ,
@@ -206,21 +223,340 @@ def __call__(
206223 norm = self .norm ,
207224 dtype = np .float32 ,
208225 )
209- cqt_features = np .abs (cqt ).T
226+ # Use magnitude spectrum (absolute value of complex CQT coefficients)
227+ cqt_magnitude = np .abs (cqt )
228+
229+ # Transpose to get time frames as rows: (n_frames, n_bins)
230+ cqt_features = cqt_magnitude .T
210231
211232 if self .include_spectral_flux :
233+ # Compute spectral flux (optional)
234+ # Spectral flux measures the rate of change of the spectrum
212235 if self .prev_magnitude is None :
236+ # For the first frame, use zero flux
213237 spectral_flux = np .zeros ((cqt_features .shape [0 ], 1 ), dtype = np .float32 )
214238 else :
215- diff = np .maximum (cqt_features - self .prev_magnitude , 0 )
216- spectral_flux = np .sum (diff , axis = 1 , keepdims = True )
217-
239+ # Compute difference between current and previous magnitude
240+ diff = cqt_features - self .prev_magnitude
241+ # Half-wave rectification: only positive differences
242+ diff_rectified = np .maximum (diff , 0 )
243+ # Sum across frequency bins to get spectral flux per frame
244+ spectral_flux = np .sum (diff_rectified , axis = 1 , keepdims = True )
245+
246+ # Update previous magnitude for next call
218247 self .prev_magnitude = cqt_features .copy ()
248+
249+ # Combine CQT features with spectral flux
219250 features = np .hstack ([cqt_features , spectral_flux ])
220251 else :
252+ # Return only CQT features (88 dimensions)
221253 features = cqt_features
222254
223- return features [1 :- 1 ]
255+ # Return features: (n_frames, n_features)
256+ # For streaming, we want the last frame only
257+ # But for batch processing, return all frames
258+ return features [1 :- 1 ] # Remove first and last frame (edge effects)
259+
260+
261+ class GaussianToneModel :
262+ def __init__ (
263+ self , means : np .ndarray , inv_covs : np .ndarray , const_terms : np .ndarray
264+ ):
265+ """
266+ ToneModel implementation based on C++ ToneModel.hpp
267+
268+ Parameters
269+ ----------
270+ means : ndarray (N_models x n_features)
271+ Mean vectors for each tone model (usually 88 keys + noise)
272+ inv_covs : ndarray (N_models x n_features x n_features)
273+ Inverse covariance matrices (diagonal or full)
274+ const_terms : ndarray (N_models,)
275+ Precomputed log-constant terms for Gaussian PDF
276+ """
277+ self .means = means
278+ self .inv_covs = inv_covs
279+ self .const_terms = const_terms
280+ self .n_models = means .shape [0 ]
281+ self .n_features = means .shape [1 ]
282+ # True when generated by synthetic_cqt_templates fallback
283+ self .is_synthetic = False
284+
285+ def compute_log_likelihood (self , feature : np .ndarray ) -> np .ndarray :
286+ """
287+ Calculate Log-Observation Probability for a single feature vector.
288+ Corresponds to ToneModel::calc in C++
289+
290+ Parameters
291+ ----------
292+ feature : ndarray (n_features,)
293+ Input CQT feature vector
294+
295+ Returns
296+ -------
297+ log_probs : ndarray (N_models,)
298+ Log likelihood for each tone model
299+ """
300+ log_probs = np .zeros (self .n_models )
301+
302+ # Vectorized implementation of Mahalanobis distance calculation
303+ # log_prob = const_term - 0.5 * (x - mu).T * inv_cov * (x - mu)
304+ # Note: C++ code uses full matrix multiplication
305+
306+ diff = feature - self .means # (N_models, n_features)
307+
308+ # (x-mu)^T * inv_cov * (x-mu)
309+ # - Full covariance: inv_covs shape (N, F, F)
310+ # - Diagonal covariance: inv_covs shape (N, F) representing diag elements
311+ if self .inv_covs .ndim == 3 :
312+ # 'ni,nij,nj->n' where n=models, i,j=features
313+ dist_sq = np .einsum ("ni,nij,nj->n" , diff , self .inv_covs , diff )
314+ elif self .inv_covs .ndim == 2 :
315+ # diag quadratic form: sum_k (diff_k^2 * inv_var_k)
316+ dist_sq = np .sum ((diff * diff ) * self .inv_covs , axis = 1 )
317+ else :
318+ raise ValueError (
319+ f"Invalid inv_covs ndim={ self .inv_covs .ndim } ; expected 2 or 3."
320+ )
321+
322+ log_probs = self .const_terms - 0.5 * dist_sq
323+
324+ return log_probs
325+
326+ @classmethod
327+ def load_binary_template (cls , filename : str ) -> np .ndarray :
328+ """
329+ Port of C++ read_bintemplate function.
330+ Format: [FD (short)] [num_templates (short)] [data (double) * FD * num]
331+ """
332+ with open (filename , "rb" ) as f :
333+ header_bytes = f .read (4 )
334+ if len (header_bytes ) < 4 :
335+ raise ValueError (f"File { filename } is too short." )
336+
337+ # Little-endian short ('<h')
338+ fd , num_templates = struct .unpack ("<hh" , header_bytes )
339+
340+ print (f"Loading { filename } : FD={ fd } , Num={ num_templates } " )
341+
342+ data_size = 8 * fd * num_templates
343+ data_bytes = f .read (data_size )
344+
345+ if len (data_bytes ) != data_size :
346+ raise ValueError ("File size mismatch with header info." )
347+
348+ data = np .frombuffer (data_bytes , dtype = np .float64 )
349+ return data .reshape ((num_templates , fd ))
350+
351+ @classmethod
352+ def from_templates (cls , mean_path : str , cov_path : str ):
353+ """Load binary template files and create GaussianToneModel instance."""
354+ means = cls .load_binary_template (mean_path )
355+ raw_covs = cls .load_binary_template (cov_path )
356+
357+ # Fallback to synthetic templates if binary files are corrupted
358+ maxabs = float (np .max (np .abs (means ))) if means .size else float ("inf" )
359+ if not np .isfinite (maxabs ) or maxabs > 1e6 :
360+ import warnings
361+
362+ warnings .warn (
363+ "GaussianToneModel.from_templates(): template means look corrupted "
364+ f"(max|mean|={ maxabs :.3g} ). Falling back to synthetic CQT templates." ,
365+ RuntimeWarning ,
366+ )
367+ return cls .synthetic_cqt_templates (
368+ n_bins = 88 ,
369+ n_pitches = 88 ,
370+ noise_template = True ,
371+ pitch_variance = 5e-2 ,
372+ noise_variance = 5e-4 ,
373+ sigma = 1.5 ,
374+ mix_uniform = 0.8 ,
375+ noise_log_prior_penalty = 50.0 ,
376+ )
377+
378+ N , FD = means .shape
379+
380+ inv_covs = np .zeros_like (raw_covs , dtype = np .float64 )
381+ const_terms = np .zeros (N , dtype = np .float64 )
382+
383+ for i in range (N ):
384+ safe_cov = np .maximum (np .asarray (raw_covs [i ], dtype = np .float64 ), 1e-6 )
385+ inv_covs [i ] = 1.0 / safe_cov
386+ log_det_cov = np .sum (np .log (safe_cov ))
387+ const_terms [i ] = - 0.5 * (FD * np .log (2.0 * np .pi ) + log_det_cov )
388+ if not np .isfinite (const_terms [i ]):
389+ const_terms [i ] = - 1e10
390+
391+ return cls (np .asarray (means , dtype = np .float64 ), inv_covs , const_terms )
392+
393+ @classmethod
394+ def synthetic_cqt_templates (
395+ cls ,
396+ n_bins : int = 84 ,
397+ n_pitches : int = 88 ,
398+ noise_template : bool = True ,
399+ pitch_variance : float = 5e-2 ,
400+ noise_variance : float = 5e-4 ,
401+ sigma : float = 1.5 ,
402+ mix_uniform : float = 0.8 ,
403+ noise_log_prior_penalty : float = 50.0 ,
404+ ) -> "GaussianToneModel" :
405+ """
406+ Generate a simple, numerically-stable synthetic tone model.
407+
408+ - pitch template k: Gaussian bump centered at a mapped bin index
409+ - noise template: uniform spectrum
410+ """
411+ n_models = int (n_pitches + (1 if noise_template else 0 ))
412+ means = np .zeros ((n_models , n_bins ), dtype = np .float64 )
413+
414+ # map 88 pitch indices to n_bins (84) linearly
415+ bin_pos = np .linspace (0 , n_bins - 1 , num = n_pitches )
416+ xs = np .arange (n_bins , dtype = np .float64 )
417+ uni = np .ones (n_bins , dtype = np .float64 ) / n_bins
418+ for k in range (n_pitches ):
419+ c = float (bin_pos [k ])
420+ bump = np .exp (- 0.5 * ((xs - c ) / max (sigma , 1e-6 )) ** 2 )
421+ bump = bump / (np .sum (bump ) + 1e-12 )
422+ # Mix with uniform to account for CQT harmonic leakage
423+ a = float (np .clip (mix_uniform , 0.0 , 1.0 ))
424+ means [k ] = (1.0 - a ) * bump + a * uni
425+
426+ if noise_template :
427+ means [- 1 ] = uni
428+
429+ pitch_var = float (max (pitch_variance , 1e-8 ))
430+ noise_var = float (max (noise_variance , 1e-8 ))
431+ inv_covs = np .full ((n_models , n_bins ), 1.0 / pitch_var , dtype = np .float64 )
432+ const_terms = np .full (
433+ (n_models ,),
434+ - 0.5 * (n_bins * np .log (2.0 * np .pi ) + n_bins * np .log (pitch_var )),
435+ dtype = np .float64 ,
436+ )
437+ if noise_template :
438+ inv_covs [- 1 ] = 1.0 / noise_var
439+ const_terms [- 1 ] = - 0.5 * (
440+ n_bins * np .log (2.0 * np .pi ) + n_bins * np .log (noise_var )
441+ )
442+ # Prior penalty to prevent noise template from dominating
443+ const_terms [- 1 ] -= float (max (noise_log_prior_penalty , 0.0 ))
444+ model = cls (means = means , inv_covs = inv_covs , const_terms = const_terms )
445+ model .is_synthetic = True
446+ return model
447+
448+
449+ class HMMAudioProcessor :
450+ def __init__ (
451+ self ,
452+ tone_model : GaussianToneModel ,
453+ sample_rate : int = 16000 ,
454+ hop_length : int = 320 , # 20ms at 16k
455+ n_bins : int = 88 ,
456+ accept_cqt_features : bool = False ,
457+ ):
458+ self .tone_model = tone_model
459+ self .sample_rate = sample_rate
460+ self .hop_length = hop_length
461+ self .n_bins = n_bins
462+ self .accept_cqt_features = accept_cqt_features
463+
464+ def __call__ (self , y : np .ndarray ) -> np .ndarray :
465+ """
466+ Process audio and return observation probabilities for HMM.
467+
468+ Parameters
469+ ----------
470+ y : ndarray
471+ If accept_cqt_features=False: Input audio signal (1D array, raw audio)
472+ If accept_cqt_features=True: CQT features (2D array, shape: (n_frames, n_bins))
473+ or single frame (1D array, shape: (n_bins,))
474+
475+ Returns
476+ -------
477+ obs_probs : ndarray (N_models,)
478+ Observation likelihoods (Log scale) for HMM update
479+ """
480+ if self .accept_cqt_features :
481+ # Input is already CQT features
482+ if y .ndim == 2 :
483+ # Multiple frames: take the last frame
484+ spec = y [- 1 ]
485+ else :
486+ # Single frame
487+ spec = y
488+
489+ # Flatten to 1D if needed (handle (n_bins, 1) shape)
490+ spec = spec .flatten ()
491+
492+ # Ensure it's magnitude (in case it's complex)
493+ spec = np .abs (spec )
494+
495+ # Crop to match template feature dimension (84 bins)
496+ # Template expects FD=84, but CQTProcessor outputs 88 bins
497+ template_fd = self .tone_model .means .shape [
498+ 1
499+ ] # Get feature dimension from template
500+ if len (spec ) > template_fd :
501+ # Crop to match template dimension (take first template_fd bins)
502+ spec = spec [:template_fd ]
503+ elif len (spec ) < template_fd :
504+ # Pad if needed (shouldn't happen, but handle gracefully)
505+ padding = np .zeros (template_fd - len (spec ))
506+ spec = np .concatenate ([spec , padding ])
507+
508+ # Final check: ensure 1D (handle case where spec might be (n_features, 1))
509+ spec = np .asarray (spec ).flatten ()
510+ else :
511+ # Input is raw audio: compute CQT
512+ # 1. CQT Extraction (Corresponds to Filtered_Spectrum in C++)
513+ # Note: C++ uses a custom pre-calculated filter matrix.
514+ # librosa.cqt is a good approximation.
515+ cqt = librosa .cqt (
516+ y = y ,
517+ sr = self .sample_rate ,
518+ hop_length = self .hop_length ,
519+ fmin = librosa .note_to_hz ("A0" ), # MIDI 21
520+ n_bins = self .n_bins ,
521+ bins_per_octave = 12 ,
522+ )
523+
524+ # Get magnitude spectrum for the center frame
525+ # (If processing a buffer, take the specific column)
526+ spec = np .abs (cqt ).T [- 1 ] # Take the last frame if streaming
527+
528+ # Crop to match template feature dimension (84 bins)
529+ # Template expects FD=84, but we might compute 88 bins
530+ template_fd = self .tone_model .means .shape [
531+ 1
532+ ] # Get feature dimension from template
533+ if len (spec ) > template_fd :
534+ # Crop to match template dimension (take first template_fd bins)
535+ spec = spec [:template_fd ]
536+ elif len (spec ) < template_fd :
537+ # Pad if needed (shouldn't happen, but handle gracefully)
538+ padding = np .zeros (template_fd - len (spec ))
539+ spec = np .concatenate ([spec , padding ])
540+
541+ # Final check: ensure 1D
542+ spec = np .asarray (spec ).flatten ()
543+
544+ # 2. Normalization (Matches C++ 'calc_log_obsprob')
545+ # double power=(*new_feature).sum();
546+ # (*new_feature).array()/=(*new_feature).sum();
547+ # Ensure spec is 1D before normalization
548+ spec = np .asarray (spec ).flatten ()
549+ power = np .sum (spec ) + 1e-10
550+ normalized_spec = spec / power
551+
552+ # Ensure normalized_spec is 1D (not (n_features, 1))
553+ normalized_spec = np .asarray (normalized_spec ).flatten ()
554+
555+ # 3. Tone Model Calculation
556+ # C++: dists[i].calc(new_feature, power)
557+ log_obs_probs = self .tone_model .compute_log_likelihood (normalized_spec )
558+
559+ return log_obs_probs
224560
225561
226562class MelSpectrogramProcessor (Processor ):
0 commit comments