forked from w2naf/grapeDRF_doppler_model
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgrape_fft_CWT_single_plot.py
More file actions
224 lines (181 loc) · 10.3 KB
/
grape_fft_CWT_single_plot.py
File metadata and controls
224 lines (181 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# Program to read in the metadata and IQ data from a Grape receiver in digital_rf format
# This is FFT variant experimenting with Continuous Wavelet Transform (CWT) plotting single one-minute time interval
# See https://github.com/MITHaystack/digital_rf/blob/master/docs/DigitalRF2.0.pdf for digital_rf stuff
# hamsci.org/sites/default/files/Grape/2023-09-22%20Getting%20Started%20with%20Data%20Reporting%20Using%20A%20PSWS_V7.1.pdf
# also has details.
# Data for the examples in this folder are in channels ch0_Callsign in this directory:
# ch0_G4HZX is G4HZX 29 March 2025 partial eclipse use 15 MHz
# ch1_W2NAF is W2NAF 8 April 2024 use 25 MHz
# ch2_N8GA is N8GA 26 July 2024 use 10 MHz
# Script needs four command line arguments:
# 1. Directory to process 2. Index of array of frequency to plot
# 3. Observation time in decimal hours UTC 4. Number of peaks to find
# e.g. python3 grape_fft_CWT_single_plot_QEX.py ch0_W2NAF 8 14.5 2
# Last modified 26 June 2025 for use with QEX article submission
# Gwyn Griffiths G3ZIL with thanks for Nathaniel Frissell W2NAF for metadata data_dict code
import digital_rf as drf
import numpy as np
import pylab as plt
from mpl_toolkits.axisartist.parasite_axes import HostAxes
from datetime import datetime
import pytz
import scipy
from scipy.fft import fft, fftfreq, fftshift
from scipy import signal
from scipy.signal import peak_widths, butter, filtfilt
import sys
import csv # to write csv file for plotting and comparison in Excel
import os
import load_metadata # this is a module in this directory to read digital RF metadata
# set base directory for subsequent read operations and set up digital RF reader appropriate directory
base_directory='./'
data_dir=os.path.join(base_directory,'data','psws_grapeDRF')
output_dir=os.path.join(base_directory,'output')
do = drf.DigitalRFReader(data_dir)
# check for four command line arguments
n = len(sys.argv)
if n<=4:
print ("Rerun with channel name, frequency index and spectrum time and number of peaks as four command line arguments")
exit()
if n>5:
print ("Rerun with channel name, frequency index and spectrum time and number of peaks as four command line arguments")
exit()
# assign first three command line arguments to variables, check time span and that end time > start time + one hour
channel=sys.argv[1]
freq_index=int(sys.argv[2]) # Get index from metadata frequency list e.g. use grape_digital_RF_metadata.py or inspect PSWS spectrogram
hours_offset=float(sys.argv[3]) # Start time for data input and plot
if hours_offset < 0 or hours_offset > 23.9:
print ("Time (hours) must be between 0 and 23.9")
exit()
n_peaks=int(sys.argv[4]) # how many peaks to identify
length=1 # we'll get just one minute of data at hours_offset
##########################################################
# Data processing functions
##########################################################
# local peak search: takes array index of CWF identified peak, does local search n bins either side for a true peak, returns index
def findLocalPeak (index, radius,level):
# This method finds if the true local peak is to one side or other of CWF peak, and if so returns its index
if index < 5 or index >594:
return index # This is special case at either end near -5 and +5 Hz where we cannot search. Should not happen
cwf_peak=level[index]
for i in range (index-radius,index+radius+1):
if level[i] > cwf_peak:
index=i
cwf_peak=level[i]
return index
# Interpolate between frequency bins based on the weighted linear signal level at peak and either side
def freqInterpolate (index, radius, x, level):
# This method interpolates in frequency space around true local peak returning an amplitude-weighted frequency
if index < 5 or index >594:
return x[index] # This is special case at either end near -5 and +5 Hz where we cannot search. Should not happen
sum=0
sum_weights=0
for i in range (index-radius,index+radius+1):
sum=sum+x[i]*10**(level[i]/20) # Convert dB level to linear
sum_weights=sum_weights+10**(level[i]/20)
freq_interp=sum/sum_weights # Interpolated peak frequency
return freq_interp
def remove_adjacent(L): # This function removes instances where a single peak has adjacent frequencies
return [elem for i, elem in enumerate(L) if i == 0 or L[i-1]+1 != elem]
################################################
# Get metadata then set up constants and arrays
################################################
# Call module function to read in metadata, draws on data_dict code from Nathaniel Frissell
(date,freqList,s1,s0,fs,theCallsign,grid,lat,lon) = load_metadata.load_grape_drf_metadata(data_dir,channel)
Hann_factor=1.63 # This is the energy correction factor # https://community.sw.siemens.com/s/article/window-correction-factors
time_window=60 # 60 seconds
m_samples=int(fs*time_window)
n_samples=int(length*m_samples+1) # how many samples at fs to read in
s=s0+hours_offset*3600*fs # calculate start time given command line start time offset
frequency=freqList[freq_index] # This comes from command line argument and metadata frequency list
freq_peaks=np.empty(n_peaks) # to hold candidate peaks
level_peaks=np.empty(n_peaks) # their signal level
spec_widths=np.empty(n_peaks) # and width estimate between half peak amplitude
csv_dir=os.path.join(output_dir,'csv',theCallsign)
if not os.path.exists(csv_dir):
os.makedirs(csv_dir)
csv_filename=csv_dir+'/one_FFT_data_' + "_" + str(frequency) + "MHz_" + date + ".csv" #
########################################
# digital_rf read in code
########################################
do.get_channels()
# get samples, these are i,q pairs. Starting at s
input = do.read_vector(s, n_samples, channel)
if len(freqList) > 1:
data=input[:,freq_index]
else: # single channel Grape so 1 dimensional data array
data=input[:]
unix_time=np.int64(s/10)
plot_time = datetime.fromtimestamp(unix_time,pytz.utc).strftime('%Y-%m-%dT%H%M%S')
plot_hour = datetime.fromtimestamp(unix_time,pytz.utc).strftime('%H:%M:%S')
print ("Analysis at ",plot_time)
# get samples, these are i,q pairs. Starting at s and going on for length*10*60
data = do.read_vector(s, n_samples, channel)
######################
# Calculations
######################
# generate the x axis, which is frequency here
x=fftshift(fftfreq(m_samples,1/fs))
# generate a Hann window of length m_samples (i.e. 600 samples)
window = signal.windows.hann(m_samples)
if len(freqList) > 1:
yf=fftshift(fft(data[0:m_samples,freq_index]*window,norm="forward",overwrite_x=False)*Hann_factor) # do the FFT and fftshift moves 0 Hz to centre
else:
yf=fftshift(fft(data[0:m_samples]*window,norm="forward",overwrite_x=False)*Hann_factor) # fudge, cannot understand why I need 2 indicies above, this is for single freq Grape
yf=10*np.log10(np.abs(yf)) # convert to dB
with open(csv_filename, 'w', encoding='UTF8',) as out_file: # open the csv file to write to
writer=csv.writer(out_file)
writer.writerow(["Frequency (Hz)","Spectrum Level (dB)"])
for m in range(0,m_samples):
writer.writerow([x[m],yf[m]])
######################################################################################
# Scipy find_peaks_cwt approach using continuous wavelet transform
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.find_peaks_cwt.html
#######################################################################################
peaks = signal.find_peaks_cwt(yf, widths=np.arange(2,4)) # 2,4 is empirical selection for one-hop widths
#print ("CWF", peaks)
peakind=remove_adjacent(peaks) # in case single peak shown as two adj freqs
# find the index at four successively reducing maxima: algorithm finds first max, finds freq and level at that index, then sets max that index to zero
# and iterates
for i in range(0,n_peaks):
max=np.argmax(yf[peakind])
index_max=peakind[max]
index_max_original=index_max
freq_peaks[i]=x[index_max]
level_peaks[i]=yf[index_max]
print("\nCWF peak ",i," freq=", f"{freq_peaks[i]:.3f}", " Hz at level=",f"{level_peaks[i]:.2f}"," dB at index ",index_max)
# Now call function to look either side to find true peak, revise and print output
index_max=findLocalPeak(index_max,3,yf)
freq_peaks[i]=float(x[index_max])
level_peaks[i]=yf[index_max]
print("Revised CWF peak ",i," freq=", f"{freq_peaks[i]:.3f}", " Hz at level=",f"{level_peaks[i]:.2f}", " dB at index_max ",index_max)
# Now estimate spectrum widths between half max amplitudes for each peak
tops=np.ones((1,), dtype=int)*int(index_max)
(width,width_heights,left_ips,right_ips)=peak_widths(10**(yf/20),tops,rel_height=0.5, wlen=10)
spec_widths[i]=width[0]/time_window
print("Spec width w50 ",i," = ", f"{spec_widths[i]:.3f}", " Hz" )
# Now interpolate the true peak frequency based on correlation level either side
freq_peaks[i]=freqInterpolate(index_max,2,x,yf)
print("Interpolated CWF peak ",i," freq= ", f"{freq_peaks[i]:.3f}", " Hz" )
# Need to remove the peak just found from the array list of peaks
to_remove=np.array([index_max_original])
peakind=np.setdiff1d(peakind,to_remove)
###########################################
# Plots of Spectrum and peaks
###########################################
plot_dir=os.path.join(output_dir,'plots',theCallsign) # plots go into a subdirectory by callsign
if not os.path.exists(plot_dir):
os.makedirs(plot_dir)
# plot the spectrum with the identified peaks
fig, ax= plt.subplots() #
plt.suptitle("FFT Doppler Spectrum " + theCallsign + " at " + str(frequency) + " MHz at " + plot_time, fontsize=12)
xaxis_title="Frequency (Hz)"
plt.plot(x, yf, color='black')
plt.xlabel(xaxis_title)
plt.ylabel("PSD uncalibrated (dB)")
plt.scatter(freq_peaks[0:n_peaks], level_peaks[0:n_peaks], s= 40, c = 'red') # plots red dots at the peaks
for i in range (0,n_peaks): # annotates the peak frequencies
ax.annotate('Pk (%.3f)'% (freq_peaks[i]), (freq_peaks[i], 6*(np.random.uniform()*1)+(level_peaks[i])))
plt.savefig(plot_dir +"/One_Spectrum" + "_" + str(frequency) + "MHz_" + plot_time + ".png", dpi=600)
####
plt.show()