code block for getting signals
### Get coil currents from ptdata ###
try:
from ptdata import fetch_ptdata, PtDataFetcher
except Exception as e:
print("could not import ptdata, got exception")
print(e)
PCS_SYS_D3 = ":/fusion/projects/codes/pcs/data/ptdata:/fusion/projects/codes/pcs/data/ptdata/uncomp:"
import numpy as np
from scipy.interpolate import interp1d
import sys
shot = int(sys.argv[1]) # full shot numver
time = int(sys.argv[2]) # time is 4 digit time
def compare_to_ptdata_diags_Bp_probes(
ptnames,
shot,
time,
coilset_in=None,
coil_grid=None,
eq=None,
get_coilset_kwargs={},
):
shot = round(shot)
bad_names = []
interpolable_ptnames = {}
print(shot)
for ptname in ptnames:
# print(ptname)
try:
fetcher = PtDataFetcher(ptname, round(shot), sys_d3=PCS_SYS_D3)
header = fetcher.header
results = fetcher.fetch()
interpolable_ptnames[ptname] = interp1d(
x=results["times"], y=results["data"], kind="nearest"
)
except Exception as e:
print(f"Shot {shot} does not have ptname {ptname}")
print(e)
bad_names.append(ptname)
bad_name_inds = np.asarray(
[i for i, name in enumerate(ptnames) if name not in bad_names]
)
probe_names_minus_bad_names = [name for name in ptnames if name not in bad_names]
print("#" * 15)
print(f"Shot {shot}")
print("#" * 15)
print(f"t = {time}")
# poloidal probes
Bps_measured = [
interpolable_ptnames[name](time) for name in probe_names_minus_bad_names
]
return Bps_measured, bad_names
import pickle
import f90nml
# get diag signal values and positions
# retrieve the flux loop and B probe info from the EFIT files
mhdin = f90nml.read("../mhdin_for_EFIT_112000_to__156014.dat")
# magnetic probes for poloidal field
efit_mp_names = mhdin["IN3"]["MPNAM2"]
for i in range(len(efit_mp_names)):
efit_mp_names[i] = efit_mp_names[i].strip()
efit_mp_measurements, bad_names = compare_to_ptdata_diags_Bp_probes(
efit_mp_names, shot, time
)
fn = "efit_mp_values.p"
with open(fn, "wb") as f:
efit_mp_names = pickle.dump(efit_mp_measurements, f)
if len(bad_names) > 0:
for i in range(len(bad_names)):
bad_names[i] = bad_names[i] + "\n"
with open("mp_probes_to_remove.txt", "w+") as f:
f.writelines(bad_names)
code block for cleaning signals given mhdin
import os
from desc.magnetic_fields import SplineMagneticField
coils = load("../g0198489.01600_coilset_DESC.h5")
if not os.path.exists("../g0198489.01600_coilset_DESC_splined_field_N500_grid.h5"):
R = np.linspace(0.5, 3, 100)
Z = np.linspace(-2, 2, 100)
phi = np.linspace(0, 2 * np.pi, 2)
splined_field = SplineMagneticField.from_field(
coils, R, phi, Z, source_grid=LinearGrid(N=500), chunk_size=100
)
splined_field.save("../g0198489.01600_coilset_DESC_splined_field_N500_grid.h5")
else:
splined_field = load("../g0198489.01600_coilset_DESC_splined_field_N500_grid.h5")
field = splined_field
# get diag signal values and positions
# retrieve the flux loop and B probe info from the EFIT files
mhdin = f90nml.read("../mhdin_197555_and_later.dat")
# magnetic probes for poloidal field
efit_mp_names = mhdin["IN3"]["MPNAM2"]
for i in range(len(efit_mp_names)):
efit_mp_names[i] = efit_mp_names[i].strip()
efit_mp_coords_rpz = np.vstack(
[mhdin["IN3"]["XMP2"], np.zeros_like(mhdin["IN3"]["XMP2"]), mhdin["IN3"]["YMP2"]]
).T
efit_mp_angles = np.asarray(mhdin["IN3"]["AMP2"])
dir_phis_Bp_probes = np.zeros_like(efit_mp_angles)
dir_Rs_Bp_probes = np.cos(efit_mp_angles / 180 * np.pi)
dir_Zs_Bp_probes = np.sin(efit_mp_angles / 180 * np.pi)
efit_mp_directions_rpz = np.vstack(
[dir_Rs_Bp_probes, dir_phis_Bp_probes, dir_Zs_Bp_probes]
).T
# from the linked mhd.in stuff
efit_probe_dict = {}
mhdin = f90nml.read("../mhdin_197555_and_later.dat")
efit_probe_names = mhdin["IN3"]["MPNAM2"]
for i in range(len(efit_probe_names)):
efit_probe_names[i] = efit_probe_names[i].strip()
# flux loops for poloidal flux
efit_flux_loop_names = mhdin["IN3"]["LPNAME"]
for i in range(len(efit_flux_loop_names)):
efit_flux_loop_names[i] = efit_flux_loop_names[i].strip()
efit_flux_loop_coords_rz = np.vstack([mhdin["IN3"]["RSI"], mhdin["IN3"]["ZSI"]]).T
flux_loops = CoilSet(
[
FourierRZCoil(current=0.0, R_n=r, Z_n=z, sym=False)
for r, z in efit_flux_loop_coords_rz
],
check_intersection=False,
)
# efit_flux_loop_dict = {}
# for name, efit_loop_data in zip(efit_flux_loop_names, efit_data_dict["flux_loops"]):
# efit_flux_loop_dict[name] = efit_loop_data
import pickle
fn = "efit_flux_loop_values.p"
with open(fn, "rb") as f:
efit_flux_loop_values_with_plasma = pickle.load(f)
# convert from Wb/rad to Wb
efit_flux_loop_values_with_plasma_in_Wb_shifted = (
(np.asarray(efit_flux_loop_values_with_plasma)) * 2 * np.pi
)
# all except first (PSF1A) are measured relative to PSF1A, so shift back to get absolute measurement
efit_flux_loop_values_with_plasma_in_Wb_shifted[
1:
] += efit_flux_loop_values_with_plasma_in_Wb_shifted[0]
import pickle
fn = "efit_mp_values.p"
with open(fn, "rb") as f:
efit_mp_values_with_plasma = pickle.load(f)
# DSL6U157 does not exist so remove that one, I know this from when I fetched the ptname and it failed to find this for 198489
with open("mp_probes_to_remove.txt", "r") as f:
mp_names_to_remove = f.readlines()
mp_inds_remove = []
for mpname in mp_names_to_remove:
mp_inds_remove.append(efit_mp_names.index(mpname.strip()))
# DSL6U157 does not exist so remove that one, I know this from when I fetched the ptname and it failed to find this for 198489
with open("mp_probes_to_remove.txt", "r") as f:
mp_names_to_remove = f.readlines()
mp_inds_remove = []
for mpname in mp_names_to_remove:
mp_inds_remove.append(efit_mp_names.index(mpname.strip()))
print(f"{mp_inds_remove=}")
print(f"efit_mp_values_with_plasma size {len(efit_mp_values_with_plasma)}")
print(f"efit mp names length: {len(efit_mp_names)}")
# further clean by removing ones ==0, and ones very large
mp_additionally_inds_remove_due_to_data = []
if np.where(np.array(efit_mp_values_with_plasma) < -0.3)[0].size > 0:
mp_additionally_inds_remove_due_to_data += [
ind for ind in np.where(np.array(efit_mp_values_with_plasma) < -0.3)[0]
]
if np.where(np.isclose(efit_mp_values_with_plasma, 0, atol=1e-6))[0].size > 0:
mp_additionally_inds_remove_due_to_data += [
ind for ind in np.where(np.isclose(efit_mp_values_with_plasma, 0, atol=1e-3))[0]
]
flux_loop_inds_remove = np.where(
np.logical_or(
np.isclose(efit_flux_loop_values_with_plasma_in_Wb_shifted, 0, atol=1e-1),
np.array(efit_flux_loop_values_with_plasma_in_Wb_shifted) < -2,
)
)[0]
with open("rogowski_plasma_current_value.p", "rb") as f:
measured_plasma_current = pickle.load(f)
if len(mp_inds_remove):
# mp_inds_remove.sort()
# mp_inds_remove.reverse()
# for ind_remove in mp_inds_remove:
mp_meas_coords = np.delete(efit_mp_coords_rpz, mp_inds_remove, axis=0)
mp_meas_dirs = np.delete(efit_mp_directions_rpz, mp_inds_remove, axis=0)
if len(mp_additionally_inds_remove_due_to_data):
mp_meas_coords = np.delete(
mp_meas_coords, mp_additionally_inds_remove_due_to_data, axis=0
)
mp_meas_dirs = np.delete(
mp_meas_dirs, mp_additionally_inds_remove_due_to_data, axis=0
)
# also remove those values from the efit stuff
efit_mp_values_with_plasma = np.delete(
efit_mp_values_with_plasma, mp_additionally_inds_remove_due_to_data
)
code block for getting signals
code block for cleaning signals given mhdin