From 6c44b70b646dee00d33897d240f3f8bb1db13490 Mon Sep 17 00:00:00 2001 From: VanOschB Date: Tue, 31 Mar 2026 20:37:15 -0400 Subject: [PATCH 1/6] Initial Updates of Tuning Structure Some initial edits to all files, just put them under gui anyway. --- src/data_analysis.py | 432 +++++++++++++++++ src/gui.py | 2 +- src/instrument_control.py | 950 ++++++++++++++++++++++++++++++++++++++ src/logger.py | 112 +++++ src/main.py | 13 +- src/utils_updated.py | 2 +- 6 files changed, 1506 insertions(+), 5 deletions(-) create mode 100644 src/data_analysis.py create mode 100644 src/instrument_control.py create mode 100644 src/logger.py diff --git a/src/data_analysis.py b/src/data_analysis.py new file mode 100644 index 0000000..cc69a4a --- /dev/null +++ b/src/data_analysis.py @@ -0,0 +1,432 @@ +# Import modules + +import yaml, datetime, sys, time, os, shutil, json,re +from pathlib import Path + +import pandas as pd + +import numpy as np + +import scipy as sp +from scipy.ndimage import convolve + +import matplotlib.pyplot as plt +import matplotlib.cm as cm + +from typing import List, Dict + +import qcodes as qc +from qcodes.dataset import AbstractSweep, Measurement +from qcodes.dataset.dond.do_nd_utils import ActionsT +from qcodes.parameters import ParameterBase +import numpy.typing as npt + +import skimage +from skimage.transform import probabilistic_hough_line +from skimage.feature import canny +from skimage.filters import threshold_otsu +from skimage.morphology import diamond, rectangle # noqa + +import logging +from colorlog import ColoredFormatter +import sys + +from nicegui import ui +import threading + + +class DataAnalysis: + + def __init__(self, logger, tuner_config) -> None: + + self.logger = logger + + self.tuner_info = yaml.safe_load(Path(tuner_config).read_text()) + + self.model_path = self.tuner_info['barrier_barrier']['segmentation_model_path'] + self.model_config_path = self.tuner_info['barrier_barrier']['segmentation_model_config_path'] + self.model_name =self.tuner_info['barrier_barrier']['segmentation_model_name'] + self.model_processor = self.tuner_info['barrier_barrier']['segmentation_model_processor'] + self.confidence_threshold = self.tuner_info['barrier_barrier']['segmentation_confidence_threshold'] + self.polygon_threshold = self.tuner_info['barrier_barrier']['segmentation_polygon_threshold'] + self.segmentation_class = self.tuner_info['barrier_barrier']['segmentation_class'] + + def logarithmic(self, x, a, b, x0, y0): + return a * np.log(b*(x-x0)) + y0 + + def exponential(self, x, a, b, x0, y0): + return a * np.exp(b * (x-x0)) + y0 + + def sigmoid(self, x, a, b, x0, y0): + return a/(1+np.exp(b * (x-x0))) + y0 + + def linear(self, x, m, b): + return m * x + b + + def extract_bias_point(self, + data: pd.DataFrame, + plot_process: bool, + axes: plt.Axes): + + # Inference image for anything above 0.5 + outputs, metadata, image, Xdata, Ydata = inference( + data, + self.model_path, + self.model_config_path, + self.model_name, + self.model_processor, + self.confidence_threshold, + self.polygon_threshold, + plot=plot_process + ) + + # Only keep things with class 'CD' = 'Central Dot' + outputs = outputs[outputs.pred_classes == metadata.thing_classes.index(self.segmentation_class)] + + # Get the bounding box with the best score + bboxes = outputs.pred_boxes.tensor.numpy() + max_score_index = np.argmax(outputs.scores) + best_bbox = bboxes[max_score_index] + best_score = outputs.scores[max_score_index] + + bbox_units = pixel_polygon_to_image_units(best_bbox, data) + x, y = bbox_units[:,0], bbox_units[:,1] + x1,x2 = x + y1,y2 = y + + if plot_process: + + # Plot the bounding box + + axes.plot([x1, x2], [y1, y1], linewidth=3, alpha=0.5, linestyle='--', color='k') # Top line + axes.plot([x1, x2], [y2, y2], linewidth=3, alpha=0.5, linestyle='--', color='k') # Bottom line + axes.plot([x1, x1], [y1, y2], linewidth=3, alpha=0.5, linestyle='--', color='k') # Left line + axes.plot([x2, x2], [y1, y2], linewidth=3, alpha=0.5, linestyle='--', color='k') # Right line + label_text = 'CD' +' ' + str(round(best_score.item() * 100)) + "%" + axes.text(x1, y2, label_text, color='k', fontsize=10, verticalalignment='bottom') + + voltage_window = {Xdata.name.split('_')[-1]: (x1,x2), Ydata.name.split('_')[-1]:(y1,y2)} + print(f"Suggested voltage window: {voltage_window}") + + range_X = voltage_window[Xdata.name.split('_')[-1]] + range_Y = voltage_window[Ydata.name.split('_')[-1]] + windowed_data = data[ + (data[Xdata.name] >= range_X[0]) & (data[Xdata.name] <= range_X[1]) & + (data[Ydata.name] >= range_Y[0]) & (data[Ydata.name] <= range_Y[1]) + ] + + window_data_image, Xdata, Ydata = convert_data_to_image(windowed_data) + window_data_image = window_data_image[:,:,0] + edges = canny(window_data_image,sigma=0.5, low_threshold=0.1*np.iinfo(np.uint8).max, high_threshold=0.3 * np.iinfo(np.uint8).max) + lines = probabilistic_hough_line(edges, threshold=0, line_length=3, + line_gap=0) + + if plot_process: + # Generating figure 2 + fig, ax = plt.subplots(1, 3, figsize=(15, 5)) + ax = ax.ravel() + + ax[0].imshow(window_data_image, cmap=cm.gray, origin='lower', extent=[Xdata.min(), Xdata.max(), Ydata.min() , Ydata.max()],) + ax[0].set_title('Input image') + + ax[1].imshow(edges, cmap=cm.gray, origin='lower', extent=[Xdata.min(), Xdata.max(), Ydata.min() , Ydata.max()],) + ax[1].set_title('Masked Canny edges') + + potential_points = {} + angles_data = [] + slopes_data = [] + for line in lines: + p0_pixel, p1_pixel = line + p0, p1 = pixel_polygon_to_image_units(line, windowed_data) + + dy = (p1[1]-p0[1]) + dx = (p1[0]-p0[0]) + if dx == 0: + continue + m = dy/dx + theta = np.arctan(m)*(180/np.pi) + if theta > -40 or theta < -50: + continue + angles_data.append(theta) + slopes_data.append(m) + midpoint_pixel = (np.array(p0_pixel) + np.array(p1_pixel))/2 + midpoint_units = (np.array(p0) + np.array(p1))/2 + # print(midpoint) + midpoint_pixel = midpoint_pixel.astype(int) + + X_name,Y_name,Z_name = windowed_data.columns[:3] + current_at_midpoint = windowed_data[Z_name].to_numpy().reshape(len(Xdata), len(Ydata))[midpoint_pixel[0],midpoint_pixel[1]] + potential_points[tuple(midpoint_units)] = current_at_midpoint + + if plot_process: + ax[1].plot((p0[0], p1[0]), (p0[1], p1[1])) + ax[1].scatter([midpoint_units[0]],[midpoint_units[1]], marker='*',s=50) + ax[0].plot((p0[0], p1[0]), (p0[1], p1[1])) + ax[0].scatter([midpoint_units[0]],[midpoint_units[1]], marker='*',s=50) + + if plot_process: + ax[1].set_title('Hough Transform') + ax[1].set_axis_off() + + ax[1].set_title('Histogram of Detected Line Angles') + ax[2].hist(angles_data, bins=2*int(np.sqrt(len(slopes_data)))) + ax[2].set_xlabel(r"$\theta^\circ$") + ax[2].set_ylabel(r"$f$") + + max_key = np.array(max(potential_points, key=potential_points.get)) + bias_point = {Xdata.name.split('_')[-1]: max_key[0], Ydata.name.split('_')[-1]: max_key[1]} + axes.scatter(*max_key, marker='*', s=30, c='k', label='Bias Point') + axes.legend(loc='best') + print(f"Suggested bias point: {bias_point}") + + return bias_point, voltage_window + + def extract_max_conductance_point(self, + data: pd.DataFrame, + plot_process: bool = False, + sigma: float = 0.5) -> dict: + + V_name, I_name = data.columns + + data = data.rename( + columns={I_name: '{}_current'.format(I_name.split('_')[0])} + ) + data.iloc[:,-1] = data.iloc[:,-1].subtract(0).mul(1e-7) # sensitivity + + V_name, I_name = data.columns + + I_data = data[I_name] + V_data = data[V_name] + + I_filtered = sp.ndimage.gaussian_filter1d( + I_data, sigma + ) + + G_data = np.gradient(I_filtered, np.abs(V_data.iloc[-1] - V_data.iloc[-2])) + G_filtered = sp.ndimage.gaussian_filter1d( + G_data, sigma + ) + + threshold = max(G_filtered) / 10 + + maxima = sp.signal.argrelextrema(G_filtered, np.greater)[0] + maxima_indices = maxima[G_filtered[maxima] >= threshold] + + if len(maxima_indices) != 0: + results = dict(zip(V_data.iloc[maxima_indices], G_filtered[maxima_indices])) + + results_sorted = dict(sorted(results.items(), key=lambda item: item[1])) + + if plot_process: + + # Create figure and axes + fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(8, 6), sharex=True) + + # Plot for I_SD vs V_ST + ax1.set_title(r"$I_{SD}$") + ax1.set_ylabel(r"$I_{SD}\ (A)$") + ax1.plot(V_data, I_data, 'k-', alpha=0.3,linewidth=0.75) + ax1.plot(V_data, I_filtered, 'k-', linewidth=1.5) + + # Plot for G vs V_ST + ax2.set_title(r"$G_{SD}$") + ax2.set_ylabel(r"$G_{SD}\ (S)$") + ax2.set_xlabel(r"$V_{P}\ (V)$") + ax2.plot(V_data, G_data, 'k-', alpha=0.3, linewidth=0.75) + ax2.plot(V_data, G_filtered, 'k-', linewidth=1.5) + ax2.hlines(y=threshold, xmin=V_data.iloc[0], xmax=V_data.iloc[-1], color='black', linestyle='--', linewidth=1.5) + + def legend_without_duplicate_labels(ax): + # Helper function to prevent duplicates in the legend. + handles, labels = ax.get_legend_handles_labels() + unique = [(h, l) for i, (h, l) in enumerate(zip(handles, labels)) if l not in labels[:i]] + ax.legend(*zip(*unique)) + + # Plot all the points of interest according to their sensitivity + if len(maxima_indices) > 0: + for i in maxima_indices: + + index = list(results_sorted.keys()).index(V_data.iloc[i]) + if index == len(results_sorted)-1: + label = "High Sensitivity" + color = 'b' + elif index == 0: + label = "Low Sensitivity" + color = 'r' + else: + label = "Medium Sensitivity" + color = 'g' + + if plot_process: + ax1.text(V_data.iloc[i], I_filtered[i], f'{V_data.iloc[i]:.2f} mV', color='black', fontsize=6, fontweight=750, ha='right', va='bottom', bbox=dict(facecolor='white', edgecolor='black', boxstyle='round,pad=0.5', alpha=0.25)) + ax2.scatter(V_data.iloc[i], G_filtered[i], c=color, label=label) + ax1.scatter(V_data.iloc[i], I_filtered[i], c=color, label=label) + ax1.legend(loc='best') + legend_without_duplicate_labels(ax1) + + V_P_high, G_high = list(results_sorted.items())[-1] + V_P_med, G_med = list(results_sorted.items())[int(len(results_sorted.items())//2)] + V_P_low, G_low = list(results_sorted.items())[0] + + gate_name = V_name.split('_')[-1] + return {'high': {gate_name: V_P_high, 'conductance': G_high}, + 'medium': {gate_name: V_P_med, 'conductance': G_med}, + 'low': {gate_name: V_P_low, 'conductance': G_low}} + + def extract_lever_arms(self, + data: pd.DataFrame, + plot_process: bool = False) -> dict: + + # Load in data and seperate + X_name, Y_name, Z_name = data.columns + Xdata, Ydata = np.unique(data[X_name]), np.unique(data[Y_name]) + + df_pivoted = data.pivot_table(values=Z_name, index=Y_name, columns=X_name).fillna(0) + Zdata = df_pivoted.to_numpy() + + # Calculate conductance where G = dI / dVp + G = np.gradient(Zdata)[1] + + if plot_process: + plt.imshow(G, origin='lower', extent=[Xdata.min(), Xdata.max(), Ydata.min() , Ydata.max()], aspect=(Xdata.max() - Xdata.min())/(Ydata.max() - Ydata.min())) + plt.title("Transconductance") + plt.colorbar() + plt.show() + + # Apply filter to bring out edges better + def U(x,y): + sigX, sigY = 5,5 + return (1/(2 * np.pi * sigX * sigY)) * np.exp(- 0.5* ((x/sigX)**2 + (y/sigY)**2)) + def adjusted(G,G0): + return np.sign(G) * np.log((np.abs(G)/G0) + 1) + def F(U, G, G0): + # G = adjusted(G,G0) + return (G - convolve(G,U)) / np.sqrt((convolve(G,U))**2 + G0**2) + + N=2 + U_kernal = np.array([[U(x, y) for y in range(-(N-1)//2,(N-1)//2 + 1)] for x in range(-(N-1)//2,(N-1)//2 + 1)]) + cond_quant = 3.25 * 1e-5 + filtered_G = np.abs(F(U_kernal, G, G0=10**-7 * cond_quant)) + + if plot_process: + plt.imshow(filtered_G, origin='lower', extent=[Xdata.min(), Xdata.max(), Ydata.min() , Ydata.max()], aspect=(Xdata.max() - Xdata.min())/(Ydata.max() - Ydata.min())) + plt.title("Filtered Transconductance") + plt.colorbar() + plt.show() + + # Apply binary threshold to bring out diamonds better + thresh = threshold_otsu(filtered_G) + binary_image = filtered_G < thresh + + if plot_process: + plt.imshow(binary_image, origin='lower', extent=[Xdata.min(), Xdata.max(), Ydata.min() , Ydata.max()], aspect=(Xdata.max() - Xdata.min())/(Ydata.max() - Ydata.min())) + plt.title("Filtered Transconductance Binary") + plt.colorbar() + plt.show() + + # Erode any artifacts and keep just the diamond shapes + footprint = rectangle(13, 6) + erode = skimage.morphology.erosion(binary_image,footprint) + + footprint = diamond(1) + erode = skimage.morphology.erosion(erode,footprint) + + if plot_process: + plt.imshow(erode, origin='lower', extent=[Xdata.min(), Xdata.max(), Ydata.min() , Ydata.max()], aspect=(Xdata.max() - Xdata.min())/(Ydata.max() - Ydata.min())) + plt.title("Filtered Transconductance Binary Eroded") + plt.show() + + # Attempt to find contours + contours = skimage.measure.find_contours(erode, 0.8) + + if len(contours) == 0: + return + + # Display the image and plot all contours found + fig, ax = plt.subplots() + + ax.imshow(Zdata, origin='lower', extent=[Xdata.min(), Xdata.max(), Ydata.min() , Ydata.max()], aspect=(Xdata.max() - Xdata.min())/(Ydata.max() - Ydata.min())) + ax.set_title(r'$I_{SD}$') + ax.set_ylabel(r'$V_{SD}$ (V)') + ax.set_xlabel(r'$V_{P}$ (V)') + ax.set_aspect('auto') + + addition_voltages = [] + charging_voltages = [] + results = {} + + + for i, contour in enumerate(contours): + if len(contour) < 350: + continue + + # Convert to proper units for calculations + image_units = [] + for coordinate in contour: + image_units.append([Ydata[int(coordinate[0])], Xdata[int(coordinate[1])]]) + image_units = np.array(image_units) + + Y = image_units[:,0] + X = image_units[:,1] + + Xmax = max(X) + Xmin = min(X) + Ymax = max(Y) + Ymin = min(Y) + + # Get centroid + centroidX, centroidY = 0.5*(Xmax + Xmin), 0.5 * (Ymax + Ymin) + + dX = Xmax - Xmin + dY = Ymax - Ymin + + divider = 1e-3 + alpha= (Ymax * divider /2) / dX + + e = 1.60217663e-19 # C + + eps0 = 8.8541878128e-12 # F/m + epsR = 11.7 # Silicon + + Vadd = Xmax - Xmin # V + Vc = dY * divider /2 # V + addition_voltages += [Vadd] + charging_voltages += [Vc] + C_P = e / Vadd # F + C_sigma = e / Vc # F + dot_size = C_sigma / (8 * eps0 * epsR) # m + alpha = (dY * divider /2) / dX # eV/V + + results[i]= { + 'centroid': (centroidX, centroidY), + 'Vadd': Vadd, + 'Vcharge': Vc, + 'Cp': C_P, + 'CSigma': C_sigma, + 'lever arm': alpha, + 'dot size': dot_size + } + + ax.plot(image_units[:, 1], image_units[:, 0], linewidth=1, linestyle='-', c='k') + label_text = r'$\alpha$ =' + str(round(alpha,3)) + ax.text(0.98*centroidX, 1.2 * Ymax, label_text, color='k', fontsize=8, verticalalignment='bottom') + + label_text = r'$V_{add}$ =' + str(round(Vadd*1e3,1)) + 'mV' + ax.text(0.95*centroidX, 1.3 * Ymin, label_text, color='k', fontsize=8, verticalalignment='bottom') + + label_text = r'$V_{charge}$ =' + str(round(Vc * 1e3,1)) + 'mV' + ax.text(0.95*centroidX, 1.5 * Ymin, label_text, color='k', fontsize=8, verticalalignment='bottom') + + label_text = r'$C_{P}$ =' + str(round((e / Vadd) * 1e18,2)) + 'aF' + ax.text(0.95*centroidX, 1.7 * Ymin, label_text, color='k', fontsize=8, verticalalignment='bottom') + + label_text = r'$C_{\Sigma}$ =' + str(round((e / Vc) * 1e18,2)) + 'aF' + ax.text(0.95*centroidX, 1.9 * Ymin, label_text, color='k', fontsize=8, verticalalignment='bottom') + ax.scatter([centroidX], [centroidY], marker='*', s=30, c='k') + + label_text = r'$R_{dot}$ =' + str(round(dot_size * 1e9,2)) + 'nm' + ax.text(0.95*centroidX, 2.1 * Ymin, label_text, color='k', fontsize=8, verticalalignment='bottom') + ax.scatter([centroidX], [centroidY], marker='*', s=30, c='k') + + plt.show() + return results \ No newline at end of file diff --git a/src/gui.py b/src/gui.py index 72c6a03..ca2cfee 100644 --- a/src/gui.py +++ b/src/gui.py @@ -1,6 +1,6 @@ ''' File: gui.py -Authors: Mason Daub (mjdaub@uwaterloo.ca), Benjamin Van Osch (email) +Authors: Mason Daub (mjdaub@uwaterloo.ca), Benjamin Van Osch (bvanosch@uwaterloo.ca) This file contains the gui class that runs the auto tuner. As of now, we are using the nicegui web server as the user interface for the auto tuner. diff --git a/src/instrument_control.py b/src/instrument_control.py new file mode 100644 index 0000000..00fb0ff --- /dev/null +++ b/src/instrument_control.py @@ -0,0 +1,950 @@ +# Import modules + +import yaml, datetime, sys, time, os, shutil, json,re +from pathlib import Path + +import pandas as pd + +import numpy as np + +import scipy as sp +from scipy.ndimage import convolve + +import matplotlib.pyplot as plt +import matplotlib.cm as cm + +from typing import List, Dict + +import qcodes as qc +from qcodes.dataset import AbstractSweep, Measurement +from qcodes.dataset.dond.do_nd_utils import ActionsT +from qcodes.parameters import ParameterBase +import numpy.typing as npt + +import skimage +from skimage.transform import probabilistic_hough_line +from skimage.feature import canny +from skimage.filters import threshold_otsu +from skimage.morphology import diamond, rectangle # noqa + +import logging +from colorlog import ColoredFormatter +import sys + +from nicegui import ui +import threading + + +class LinSweep_SIM928(AbstractSweep[np.float64]): + + """ + Linear sweep. + + Args: + param: Qcodes parameter to sweep. + start: Sweep start value. + stop: Sweep end value. + num_points: Number of sweep points. + delay: Time in seconds between two consecutive sweep points. + post_actions: Actions to do after each sweep point. + get_after_set: Should we perform a get on the parameter after setting it + and store the value returned by get rather than the set value in the dataset. + """ + + def __init__( + self, + param: ParameterBase, + start: float, + stop: float, + num_points: int, + delay: float = 0, + post_actions: ActionsT = (), + get_after_set: bool = False, + ): + self._param = param + self._start = start + self._stop = stop + self._num_points = num_points + self._delay = delay + self._post_actions = post_actions + self._get_after_set = get_after_set + + def get_setpoints(self) -> npt.NDArray[np.float64]: + """ + Linear (evenly spaced) numpy array for supplied start, stop and + num_points. + """ + array = np.linspace(self._start, self._stop, self._num_points).round(3) + # below_two = array[np.where(array < 2)].round(3) + # above_two = array[np.where(array >= 2)].round(2) + # array = np.concatenate((below_two, above_two)) + + return array + + @property + def param(self) -> ParameterBase: + return self._param + + @property + def delay(self) -> float: + return self._delay + + @property + def num_points(self) -> int: + return self._num_points + + @property + def post_actions(self) -> ActionsT: + return self._post_actions + + @property + def get_after_set(self) -> bool: + return self._get_after_set + + @property + def setpoints(self) -> npt.NDArray[np.float64]: + return self.get_setpoints() + +class InstrumentControl: + + def __init__(self, + logger, + config: str, + tuner_config: str, + station_config: str, + save_dir: str) -> None: + + """ + Initializes an InstrumentControl object. This class takes care of all connections and communication to instruments + during your experiment. + + Args: + config (str): Path to .yaml file containing device information. + setup_config (str): Path to .yaml file containing experimental setup information. + tuner_config (str): Path to .yaml file containing tuner information. + station_config (str): Path to .yaml file containing QCoDeS station information + save_dir (str): Directory to save data and plots generated. + """ + + # First, we save all the config information + + self.logger = logger + self.config_file = config + self.tuner_config_file = tuner_config + self.station_config_file = station_config + self.save_dir = save_dir + + + # Now, we load the config files + + self.load_config_files() + + # After the config files are loaded, we set up a file where data is stored, using today's date + + todays_date = datetime.date.today().strftime("%Y-%m-%d") + self.db_folder = os.path.join(save_dir, f"{self.config['device']['characteristics']['name']}_{todays_date}") + os.makedirs(self.db_folder, exist_ok=True) + + # The following method creates a logger that will provide information to the user while the code is running + + self.initialise_logger() + + # Now, we connect to the instruments specified in the config + + self.logger.attempt("connecting to station") + + # Using the Station class from qcodes, we can represent the physical setup of our experiment + + self.station = qc.Station(config_file=self.station_config_file) + + # Now, we attempt to load the voltage source(s) and readout device from the station config file + + voltage_sources = [] + + for voltage_source in self.voltage_source_names: + + Instrument = self.station.load_instrument(voltage_source) + voltage_sources.append(Instrument) + + self.voltage_source_1 = voltage_sources[0] + self.voltage_source_2 = voltage_sources[1] + + self.station.load_instrument(self.multimeter_name) + + self.drain_mm_device = getattr(self.station, self.multimeter_name) + + self.drain_volt = getattr(self.station, self.multimeter_name).volt + + self.logger.complete("\n") + + # Now, we change the names of the parameters to match the names provided in the yaml file + + channel_prefix = "" + for parameter, details in self.voltage_source.parameters.items(): + if details.unit == 'V': + pattern = r'(.*).*\d+.*' + matches = re.findall(pattern,parameter) + extractions = [match.strip() for match in matches] + channel_prefix = extractions[0] + if channel_prefix == "": + self.logger.error('unable to find prefix for channels') + + self.logger.info("changing parameters to match names in config.yaml file") + self.voltage_source.timeout(5 * 60) + + for gate, details in self.device_gates.items(): + + self.voltage_source.add_parameter( + name=gate, + parameter_class=qc.parameters.DelegateParameter, + source=getattr(self.voltage_source, channel_prefix+str(details['channel'])), + label=details['label'], + unit = details['unit'], + step=details['step'], + ) + self.logger.info(f"changed {channel_prefix+str(details['channel'])} to {gate}") + + # Creates the qcodes database and sets-up the experiment + + db_filepath = os.path.join(self.db_folder, f"experiments_{self.config['device']['characteristics']['name']}_{todays_date}.db") + qc.dataset.initialise_or_create_database_at( + db_filepath + ) + self.logger.info(f"database created/loaded @ {db_filepath}") + + self.logger.info(f"experiment created/loaded in database") + self.initialization_exp = qc.dataset.load_or_create_experiment( + 'Initialization', + sample_name=self.config['device']['characteristics']['name'] + ) + + # This next section copies the config files in case they get lost or changed + + self.logger.info(f"copying all of the config.yml files") + shutil.copy(self.station_config_file, self.db_folder) + shutil.copy(self.tuner_config_file, self.db_folder) + shutil.copy(self.config_file, self.db_folder) + + # Finally, we set up a dictionary to store all of the important results from our experiments + + self.results = {} + + self.results['turn_on'] = { + 'voltage': None, + 'current': None, + 'resistance': None, + 'saturation': None, + } + + for gate in self.barriers + self.leads: + self.results[gate] = { + 'pinch_off': {'voltage': None, 'width': None} + } + + for gate in self.barriers: + self.results[gate]['bias_voltage'] = None + + # Finally, we also ground the device before the experiment starts + + self.ground_device() + + return None + + def load_config_files(self): + + ''' + This method loads all relavent information from the configuration files provided. It is ran when an InstrumentControl + object is initialized. + ''' + + # Reads the tuner config information + + self.tuner_info = yaml.safe_load(Path(self.tuner_config_file).read_text()) + self.global_turn_on_info = self.tuner_info['global_turn_on'] + self.pinch_off_info = self.tuner_info['pinch_off'] + + # Reads the config information + + self.config = yaml.safe_load(Path(self.config_file).read_text()) + self.charge_carrier = self.config['device']['characteristics']['charge_carrier'] + self.operation_mode = self.config['device']['characteristics']['operation_mode'] + + # Sets the voltage sign for the gates, based on the charge carrier and mode of the device + + if (self.charge_carrier, self.operation_mode) == ('e', 'acc'): + self.voltage_sign = +1 + + if (self.charge_carrier, self.operation_mode) == ('e', 'dep'): + self.voltage_sign = -1 + + if (self.charge_carrier, self.operation_mode) == ('h', 'acc'): + self.voltage_sign = -1 + + if (self.charge_carrier, self.operation_mode) == ('h', 'dep'): + self.voltage_sign = +1 + + # Now, we retreive the device gates + + self.device_gates = self.config['device']['gates'] + + # Then, we re-label all the gates as ohmics, barriers, leads, plungers, accumulation gates and screening gates + + self.ohmics = [] + self.barriers = [] + self.leads = [] + self.plungers = [] + self.accumulation = [] + self.screening = [] + + # TODO Add additional logic to load SPI rack connections separately from other instruments (either or, both, etc.) + + for gate, details in self.device_gates.items(): + + if details['type'] == 'ohmic': + self.ohmics.append(gate) + + if details['type'] == 'barrier': + self.barriers.append(gate) + + if details['type'] == 'lead': + self.leads.append(gate) + + if details['type'] == 'plunger': + self.plungers.append(gate) + + if details['type'] == 'accumulation': + self.accumulation.append(gate) + + if details['type'] == 'screening': + self.screening.append(gate) + + self.all_gates = list(self.device_gates.keys()) + + # Finally, we determine voltage and current thresholds, as well as other information about the experimental setup + + self.abs_max_current = self.config['device']['constraints']['abs_max_current'] + self.abs_max_gate_voltage = self.config['device']['constraints']['abs_max_gate_voltage'] + self.abs_max_gate_differential = self.config['device']['constraints']['abs_max_gate_differential'] + + self.voltage_source_names = self.config['setup']['voltage_sources'] + self.multimeter_name = self.config['setup']['multimeter'] + self.voltage_divider = self.config['setup']['voltage_divider'] + self.preamp_bias = self.config['setup']['preamp_bias'] + self.preamp_sensitivity = self.config['setup']['preamp_sensitivity'] + self.voltage_resolution = self.config['setup']['voltage_resolution'] + + return None + + def set_voltage(self, + gates: str | list[str], + voltage: float): + + """ + This method allows the user to smoothly set any number of gates to the same final voltage value. If you would like to + set different gates to different voltage values, please use the set_voltage_configuration() method. + + Args: + gates (str | list[str]): A single gate name, written as a string, or a list of gate strings, containing the names of the gates + we wish to set. + voltage (float): The voltage we wish to set the given gate(s) to. + """ + + # First, if only one gate is input, we convert it to a list to make it easier to work with + + if isinstance(gates, str): + gates = [gates] + + # Then, we define a dictionary using the gates and voltage input by the user + + voltage_dict = dict(zip(gates, [voltage]*len(gates))) + + # This dictionary gets input into the more general method, set_voltage_configuration() + + self.set_voltage_configuration(self, voltage_configuration = voltage_dict) + + return None + + def set_voltage_configuration(self, + voltage_configuration: Dict[str, float] = {}, + stepsize: float = 10e-3): + + """ + This method allows the user to smoothly set a given voltage configuration. + + Args: + voltage_configuration (Dict[str, float]): A dictionary containing the names of the gates to be set and + the corresponding voltages the gates will be set to. + stepsize (float): The voltage stepsize for all the gates. Default is set to 10 mV. + """ + + # First, we determine which gates are being set. + + gates = list(voltage_configuration.keys()) + + # Then, we assert that the sign of the voltage we wish to set agrees with the device we are testing + + for gate in gates: + + assert np.sign(voltage_configuration[gate]) == np.sign(self.voltage_sign) or np.sign(voltage_configuration[gate]) == 0, f"Check voltage sign on {gate}" + + # Now, we set up some lists to hold the voltage values + + intermediate = [] + done = set() + + prevvals = {} + gate_params = {} + gate_steps = {} + + # Now, we map the gate to the source and save the correspondance + + gate_to_source = {} + + for source_name, instrument in self.voltage_sources.items(): + for gate in self.voltage_source_names_check[source_name]: + gate_to_source[gate] = instrument + + for gate, target in voltage_configuration.items(): + + instrument = gate_to_source[gate] + param = getattr(instrument, gate) + + gate_params[gate] = param + prevvals[gate] = float(param.get()) + + step_param = getattr(instrument, f"{gate}_step", None) + + gate_steps[gate] = step_param() if step_param else stepsize + + # Now, we generate the ramp + + while len(done) < len(voltage_configuration): + + step = {} + + for gate, target in voltage_configuration.items(): + + if gate in done: + continue + + prev = prevvals[gate] + step_size = gate_steps[gate] + + dv = target - prev + + if abs(dv) <= step_size: + step[gate] = target + done.add(gate) + else: + step[gate] = prev + step_size * (1 if dv > 0 else -1) + + prevvals[gate] = step[gate] + + intermediate.append(step) + + # Finally, we apply the ramp + + for step in intermediate: + + for gate, voltage in step.items(): + gate_params[gate].set(voltage) + + # This lets us sleep once per step + for instrument in self.voltage_sources.values(): + delay_param = getattr(instrument, "smooth_timestep", None) + if delay_param: + time.sleep(delay_param()) + break + + return None + + def sweep_1d(self, + maxV: float = None, + minV: float = None, + voltage_configuration: Dict[str, float] = {}, + dV: float = 10e-3) -> pd.DataFrame: + + # Bring device to voltage configuration + + if voltage_configuration is not None: + self.logger.info(f"setting voltage configuration: {voltage_configuration}") + self.set_voltage_configuration(voltage_configuration) + + # Default dV and maxV based on setup_config and config + + if dV is None: + dV = self.voltage_resolution + + if maxV is None: + maxV = self.voltage_sign * self.abs_max_gate_voltage + + # Ensure we stay in the allowed voltage space + + assert np.sign(maxV) == self.voltage_sign, self.logger.error("Double check the sign of the gate voltage (maxV) for your given device.") + assert np.sign(minV) == self.voltage_sign or np.sign(minV) == 0, self.logger.error("Double check the sign of the gate voltage (minV) for your given device.") + + # Set up gate sweeps + + num_steps = self.calculate_num_of_steps(minV, maxV, dV) + gates_involved = self.barriers + self.leads + self.accumulation + self.plungers + + print(gates_involved) + + self.logger.info(f"setting {gates_involved} to {minV} V") + + self.set_voltage_configuration(gates_involved, minV) + + sweep_list = [] + + for voltage_source in self.voltage_sources.items(): + + for gate_name in gates_involved: + + if gate_name in self.voltage_source_names_check[voltage_source[0]]: + + print(self.voltage_source_names_check[voltage_source[0]][gate_name]) + + param = voltage_source[1][gate_name] + + sweep_list.append( + LinSweep_SIM928(param, minV, maxV, num_steps, get_after_set=False) + ) + + # Execute the measurement + self.logger.attempt(f"sweeping {gates_involved} together from {minV} V to {maxV} V") + + result = qc.dataset.dond( + qc.dataset.TogetherSweep( + *sweep_list + ), + self.drain_volt, + break_condition=self._check_break_conditions, + measurement_name='Device Turn On', + exp=self.initialization_exp, + show_progress=True + ) + + self.logger.complete('\n') + + return None + + def sweep_2d(self, + P1: str = None, + P2: str = None, + P1_bounds: tuple = (None, None), + P2_bounds: tuple = (None, None), + dV: float | tuple = None, + voltage_configuration: dict = None) -> tuple[pd.DataFrame, plt.Axes]: + + # Bring device to voltage configuration + if voltage_configuration is not None: + self.logger.info(f"setting voltage configuration: {voltage_configuration}") + self.set_voltage_configuration(voltage_configuration) + else: + self.logger.info(f"setting {self.leads} to {self.results['turn_on']['saturation']} V") + self.set_voltage(self.leads, self.results['turn_on']['saturation']) + + # Parse dV from user + if dV is None: + dV_P1 = self.voltage_resolution + dV_P2 = self.voltage_resolution + elif type(dV) is float: + dV_P1 = dV + dV_P2 = dV + elif type(dV) is tuple: + dV_P1, dV_P2 = dV + else: + self.logger.error("invalid dV") + return + + # Double check device bounds + minV_P1, maxV_P1 = P1_bounds + minV_P2, maxV_P2 = P2_bounds + + if minV_P1 is None: + minV_P1 = self.results[P1]['pinch_off']['voltage'] + else: + assert np.sign(minV_P1) == self.voltage_sign, self.logger.error("double check the sign of the gate voltage (minV) for B1.") + + if minV_P2 is None: + minV_P2 = self.results[P2]['pinch_off']['voltage'] + else: + assert np.sign(minV_P2) == self.voltage_sign, self.logger.error("double check the sign of the gate voltage (minV) for B2.") + + if maxV_P1 is None: + if self.voltage_sign == 1: + maxV_P1 = min(self.results[P1]['pinch_off']['voltage']+self.voltage_sign*self.results[P1]['pinch_off']['width'], self.results['turn_on']['saturation']) + elif self.voltage_sign == -1: + maxV_P1 = max(self.results[P1]['pinch_off']['voltage']+self.voltage_sign*self.results[P1]['pinch_off']['width'], self.results['turn_on']['saturation']) + else: + assert np.sign(maxV_P1) == self.voltage_sign, self.logger.error("double check the sign of the gate voltage (maxV) for B1.") + + if maxV_P2 is None: + if self.voltage_sign == 1: + maxV_P2 = min(self.results[P2]['pinch_off']['voltage']+self.voltage_sign*self.results[P2]['pinch_off']['width'], self.results['turn_on']['saturation']) + elif self.voltage_sign == -1: + maxV_P2 = max(self.results[P2]['pinch_off']['voltage']+self.voltage_sign*self.results[P2]['pinch_off']['width'], self.results['turn_on']['saturation']) + else: + assert np.sign(maxV_P2) == self.voltage_sign, self.logger.error("double check the sign of the gate voltage (maxV) for B2.") + + self.logger.info(f"setting {P1} to {maxV_P1} V") + self.logger.info(f"setting {P2} to {maxV_P2} V") + + self.set_voltage_configuration({P1: maxV_P1, P2: maxV_P2}) + + def smooth_reset(): + + """ + Resets the inner loop variable smoothly back to the starting value + """ + + self.set_gates_to_voltage([P2], maxV_P2) + + num_steps_B1 = self.calculate_num_of_steps(minV_P1, maxV_P1, dV_P1) + num_steps_B2 = self.calculate_num_of_steps(minV_P2, maxV_P2, dV_P2) + + self.logger.attempt("barrier barrier scan") + + self.logger.info(f"stepping {P1} from {maxV_P1} V to {minV_P1} V") + self.logger.info(f"sweeping {P2} from {maxV_P2} V to {minV_P2} V") + + gates = self.barriers + param_check = [] + + for voltage_source in self.voltage_sources.items(): + + for gate_name in gates: + + if gate_name in self.voltage_source_names_check[voltage_source[0]]: + + print(self.voltage_source_names_check[voltage_source[0]][gate_name]) + + param = voltage_source[1][gate_name] + + param_check.append(param) + + result = qc.dataset.do2d( + param_check[0], # outer loop + maxV_P1, + minV_P1, + num_steps_B1, + param_check[1], # inner loop + maxV_P2, + minV_P2, + num_steps_B2, + self.drain_volt, + after_inner_actions = [smooth_reset], + set_before_sweep=True, + show_progress=True, + measurement_name='Barrier Barrier Sweep', + exp=self.initialization_exp + ) + self.logger.complete("\n") + + self.logger.info(f"returning gates {P1}, {P2} to {maxV_P1} V, {maxV_P2} V respectively") + self.set_voltage([P1], maxV_P1) + self.set_voltage([P2], maxV_P2) + + return None + + def sweep_1d_measurement(self, + maxV: float = None, + minV: float = None, + voltage_configuration: Dict[str, float] = {}, + dV: float = 10e-3) -> pd.DataFrame: + + # Bring device to voltage configuration + + if voltage_configuration is not None: + self.logger.info(f"setting voltage configuration: {voltage_configuration}") + self.set_voltage_configuration(voltage_configuration) + + # Default values + + if dV is None: + dV = self.voltage_resolution + + if maxV is None: + maxV = self.voltage_sign * self.abs_max_gate_voltage + + # Safety checks + + assert np.sign(maxV) == self.voltage_sign + assert np.sign(minV) == self.voltage_sign or np.sign(minV) == 0 + + # Gates involved + + gates_involved = self.barriers + self.leads + self.accumulation + self.plungers + + self.logger.info(f"setting {gates_involved} to {minV} V") + + self.set_voltage_configuration(gates_involved, minV) + + # Number of steps + + num_steps = self.calculate_num_of_steps(minV, maxV, dV) + + # Build parameter list + + gate_params = [] + + for voltage_source in self.voltage_sources.items(): + + for gate_name in gates_involved: + + if gate_name in self.voltage_source_names_check[voltage_source[0]]: + + param = voltage_source[1][gate_name] + gate_params.append(param) + + # Create sweep values + + sweep_vals = np.linspace(minV, maxV, num_steps) + + # Setup measurement + + meas = Measurement(exp=self.initialization_exp) + + # Register parameters + for param in gate_params: + meas.register_parameter(param) + + meas.register_parameter(self.drain_volt, setpoints=tuple(gate_params)) + + # Execute sweep + self.logger.attempt( + f"sweeping {gates_involved} together from {minV} V to {maxV} V" + ) + + with meas.run() as datasaver: + + for v in sweep_vals: + + # set all gates together + for param in gate_params: + param.set(v) + + # measurement + drain = self.drain_volt.get() + + results = [(param, v) for param in gate_params] + results.append((self.drain_volt, drain)) + + datasaver.add_result(*results) + + # break condition + if self._check_break_conditions(drain): + break + + self.logger.complete('\n') + + return None + + def sweep_2d_measurement(self, + P1: str = None, + P2: str = None, + P1_bounds: tuple = (None, None), + P2_bounds: tuple = (None, None), + dV: float | tuple = None, + voltage_configuration: dict = None) -> tuple[pd.DataFrame, plt.Axes]: + + # Bring device to voltage configuration + if voltage_configuration is not None: + self.logger.info(f"setting voltage configuration: {voltage_configuration}") + self.set_voltage_configuration(voltage_configuration) + else: + self.logger.info(f"setting {self.leads} to {self.results['turn_on']['saturation']} V") + self.set_voltage(self.leads, self.results['turn_on']['saturation']) + + # Parse dV + if dV is None: + dV_P1 = self.voltage_resolution + dV_P2 = self.voltage_resolution + elif type(dV) is float: + dV_P1 = dV + dV_P2 = dV + elif type(dV) is tuple: + dV_P1, dV_P2 = dV + else: + self.logger.error("invalid dV") + return + + # Bounds + minV_P1, maxV_P1 = P1_bounds + minV_P2, maxV_P2 = P2_bounds + + # (same bounds logic as your code omitted here for brevity) + + self.logger.info(f"setting {P1} to {maxV_P1} V") + self.logger.info(f"setting {P2} to {maxV_P2} V") + + self.set_voltage_configuration({P1: maxV_P1, P2: maxV_P2}) + + # Step counts + num_steps_B1 = self.calculate_num_of_steps(minV_P1, maxV_P1, dV_P1) + num_steps_B2 = self.calculate_num_of_steps(minV_P2, maxV_P2, dV_P2) + + # Generate sweep arrays + P1_vals = np.linspace(maxV_P1, minV_P1, num_steps_B1) + P2_vals = np.linspace(maxV_P2, minV_P2, num_steps_B2) + + self.logger.attempt("barrier barrier scan") + + self.logger.info(f"stepping {P1} from {maxV_P1} V to {minV_P1} V") + self.logger.info(f"sweeping {P2} from {maxV_P2} V to {minV_P2} V") + + # Resolve QCoDeS parameters + gates = self.barriers + param_check = [] + + for voltage_source in self.voltage_sources.items(): + + for gate_name in gates: + + if gate_name in self.voltage_source_names_check[voltage_source[0]]: + param = voltage_source[1][gate_name] + param_check.append(param) + + P1_param = param_check[0] + P2_param = param_check[1] + + # Setup measurement + meas = Measurement(exp=self.initialization_exp) + + meas.register_parameter(P1_param) + meas.register_parameter(P2_param) + meas.register_parameter(self.drain_volt, setpoints=(P1_param, P2_param)) + + # Inner reset function + def smooth_reset(): + + self.set_gates_to_voltage([P2], maxV_P2) + + # Run measurement + with meas.run() as datasaver: + + for v1 in P1_vals: + + # set outer parameter + P1_param.set(v1) + + for v2 in P2_vals: + + # set inner parameter + P2_param.set(v2) + + drain = self.drain_volt.get() + + datasaver.add_result( + (P1_param, v1), + (P2_param, v2), + (self.drain_volt, drain), + ) + + # reset inner sweep + smooth_reset() + + self.logger.complete("\n") + + # Return gates to starting values + self.logger.info(f"returning gates {P1}, {P2} to {maxV_P1} V, {maxV_P2} V respectively") + + self.set_voltage([P1], maxV_P1) + self.set_voltage([P2], maxV_P2) + + return None + + def sweep_nd(self): + pass + + def sweep_nd_measurement(self): + pass + + def check_break_conditions(self): + + # Go through device break conditions to see if anything is flagged, + # should return a Boolean. + + # breakConditionsDict = { + # 0: 'Maximum current is exceeded.', + # 1: 'Maximum ohmic bias is exceeded.', + # 2: 'Maximum gate voltage is exceeded.', + # 3: 'Maximum gate differential is exceeded.', + # } + + # MAX CURRENT + + isExceedingMaxCurrent = np.abs(self._get_drain_current()) > self.abs_max_current + # time.sleep(0.1) + + # MAX BIAS + + # flag = [] + # for gate_name in self.ohmics: + # gate_voltage = getattr(self.voltage_source, f'{gate_name}')() + # if np.abs(gate_voltage * self.voltage_divider) > self.abs_max_ohmic_bias: + # flag.append(True) + # else: + # flag.append(False) + # isExceedingMaxOhmicBias = np.array(flag).any() + # time.sleep(0.1) + + # MAX GATE VOLTAGE + + # flag = [] + # for gate_name in self.all_gates: + # gate_voltage = getattr(self.voltage_source, f'{gate_name}')() + # if np.abs(gate_voltage) > self.abs_max_gate_voltage: + # flag.append(True) + # else: + # flag.append(False) + # isExceedingMaxGateVoltage = np.array(flag).any() + # time.sleep(0.1) + + # # MAX GATE DIFFERENTIAL + + # flag = [] + # gates_to_check = self.barriers + self.leads + # for i in range(len(gates_to_check)): + # for j in range(i+1, len(gates_to_check)): + # gate_voltage_i = getattr(self.voltage_source, f'{self.all_gates[i]}')() + # gate_voltage_j = getattr(self.voltage_source, f'{self.all_gates[j]}')() + # # Check if the absolute difference between gate voltages is greater than 0.5 + # if np.abs(gate_voltage_i - gate_voltage_j) >= self.abs_max_gate_differential: + # flag.append(True) + # else: + # flag.append(False) + # isExceedingMaxGateDifferential = np.array(flag).any() + # time.sleep(0.1) + + listOfBreakConditions = [ + isExceedingMaxCurrent, + # isExceedingMaxOhmicBias, + # isExceedingMaxGateVoltage, + # isExceedingMaxGateDifferential, + ] + isExceeded = np.array(listOfBreakConditions).any() + + # breakConditions = np.where(np.any(listOfBreakConditions == True))[0] + # if len(breakConditions) != 0: + # for index in breakConditions.tolist(): + # print(breakConditionsDict[index]+"\n") + + return isExceeded + + def calculate_num_of_steps(self, + minV: float, + maxV: float, + dV: float): + """Calculates the number of steps required for a sweep. + + Args: + minV (float): Minimum voltage (V) + maxV (float): Maximum voltage (V) + dV (float): Step size (V) + + Returns: + None: + """ + + return round(np.abs(maxV-minV) / dV) + 1 \ No newline at end of file diff --git a/src/logger.py b/src/logger.py new file mode 100644 index 0000000..bf21329 --- /dev/null +++ b/src/logger.py @@ -0,0 +1,112 @@ +# Import modules + +import yaml, datetime, sys, time, os, shutil, json,re +from pathlib import Path + +import pandas as pd + +import numpy as np + +import scipy as sp +from scipy.ndimage import convolve + +import matplotlib.pyplot as plt +import matplotlib.cm as cm + +from typing import List, Dict + +import qcodes as qc +from qcodes.dataset import AbstractSweep, Measurement +from qcodes.dataset.dond.do_nd_utils import ActionsT +from qcodes.parameters import ParameterBase +import numpy.typing as npt + +import skimage +from skimage.transform import probabilistic_hough_line +from skimage.feature import canny +from skimage.filters import threshold_otsu +from skimage.morphology import diamond, rectangle # noqa + +import logging +from colorlog import ColoredFormatter +import sys + +from nicegui import ui +import threading + + +class Logger: + + def initialise_logger(self): + + """ + This method creates new logging categories which the user can see while the autotuner runs, as well as outputs + a log of everything that occured during the experiment. This method is when an InstrumentControl object is initialised. + """ + + ATTEMPT, COMPLETE, IN_PROGRESS = logging.INFO - 2, logging.INFO - 1, logging.INFO + + logging.addLevelName(ATTEMPT, 'ATTEMPT') + logging.addLevelName(COMPLETE, 'COMPLETE') + logging.addLevelName(IN_PROGRESS, 'IN PROGRESS') + + def attempt(self, message, *args, **kwargs): + if self.isEnabledFor(ATTEMPT): + self._log(ATTEMPT, message, args, **kwargs) + + def complete(self, message, *args, **kwargs): + if self.isEnabledFor(COMPLETE): + self._log(COMPLETE, message, args, **kwargs) + + def in_progress(self, message, *args, **kwargs): + if self.isEnabledFor(IN_PROGRESS): + self._log(IN_PROGRESS, message, args, **kwargs) + + logging.Logger.attempt = attempt + logging.Logger.complete = complete + logging.Logger.in_progress = in_progress + + console_formatter = ColoredFormatter( + "%(log_color)s%(asctime)s - %(name)s - %(levelname)s %(message)s", + datefmt=None, + reset=True, + log_colors={ + 'ATTEMPT': 'yellow', + 'COMPLETE': 'green', + 'DEBUG': 'white', + 'INFO': 'white', + 'IN PROGRESS': 'white', + 'WARNING': 'red', + 'ERROR': 'bold_red', + 'CRITICAL': 'bold_red' + } + ) + + # Then, we set the format of the above messages displayed to the user + + file_formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s %(message)s" + ) + + # Now, we define a handler, which writes the messages from the Logger + + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setFormatter(console_formatter) + + # Now we create an info log for the Logger + + file_handler = logging.FileHandler( + os.path.join(self.db_folder, 'run_info.log') + ) + + file_handler.setFormatter(file_formatter) + + # Finally, we define the logger + + self.logger = logging.getLogger(__name__) + self.logger.addHandler(console_handler) + self.logger.addHandler(file_handler) + + self.logger.setLevel(min(self.logger.getEffectiveLevel(), ATTEMPT)) + + return None \ No newline at end of file diff --git a/src/main.py b/src/main.py index e3a073b..d23563f 100644 --- a/src/main.py +++ b/src/main.py @@ -1,13 +1,20 @@ ''' File: main.py -Authors: Benjamin Van Osch (email), Mason Daub (mjdaub@uwaterloo.ca) +Authors: Benjamin Van Osch (bvanosch@uwaterloo.ca), Mason Daub (mjdaub@uwaterloo.ca) Entry point to the auto tuner. ''' - +from logger import Logger +from instrument_control import InstrumentControl +from data_analysis import DataAnalysis from gui import tuner_gui -gui = tuner_gui() + +log = Logger() +ic = InstrumentControl(log, config = config, tuner_config = tuner_config, station_config = station_config) +om = DataAnalysis(log, config = config, tuner_config = tuner_config, station_config = station_config) +gui = tuner_gui(log, config = config, tuner_config = tuner_config, station_config = station_config) + gui.start() diff --git a/src/utils_updated.py b/src/utils_updated.py index 8ed918c..6a966e6 100644 --- a/src/utils_updated.py +++ b/src/utils_updated.py @@ -1042,7 +1042,7 @@ def calculate_num_of_steps(self, return round(np.abs(maxV-minV) / dV) + 1 -class DataAnalysis(Logger): +class DataAnalysis: def __init__(self, tuner_config) -> None: From 652935050664fbe222fcfe76065ec4d486b0167e Mon Sep 17 00:00:00 2001 From: VanOschB Date: Wed, 1 Apr 2026 09:33:06 -0400 Subject: [PATCH 2/6] Protocol Structure, Fitting Func, configs, and results Added the structure for the autotuning protocol, a fit function for data_analysis, config paths in main and a results category for the logger. --- src/autotuning_protocol.py | 79 ++++++++++++++++++++++++++++++++++++++ src/data_analysis.py | 26 ++++++++++++- src/instrument_control.py | 2 +- src/logger.py | 10 ++++- src/main.py | 25 +++++++++++- 5 files changed, 135 insertions(+), 7 deletions(-) create mode 100644 src/autotuning_protocol.py diff --git a/src/autotuning_protocol.py b/src/autotuning_protocol.py new file mode 100644 index 0000000..5b7c40b --- /dev/null +++ b/src/autotuning_protocol.py @@ -0,0 +1,79 @@ +# Import modules + +import yaml, datetime, sys, time, os, shutil, json,re +from pathlib import Path + +import pandas as pd + +import numpy as np + +import scipy as sp +from scipy.ndimage import convolve + +import matplotlib.pyplot as plt +import matplotlib.cm as cm + +from typing import List, Dict + +import qcodes as qc +from qcodes.dataset import AbstractSweep, Measurement +from qcodes.dataset.dond.do_nd_utils import ActionsT +from qcodes.parameters import ParameterBase +import numpy.typing as npt + +import skimage +from skimage.transform import probabilistic_hough_line +from skimage.feature import canny +from skimage.filters import threshold_otsu +from skimage.morphology import diamond, rectangle # noqa + +import logging +from colorlog import ColoredFormatter +import sys + +from nicegui import ui +import threading + +class Bootstrapping: + + def ground_device(): + pass + + def turn_on(): + pass + + def pinch_off(): + pass + + def barrier_barrier_sweep(): + pass + + def set_plunger_sweep(): + pass + + def coulomb_diamonds(): + pass + + def tune_lead_dot_tunneling(): + pass + +class CoarseTuning: + + def plunger_plunger_sweep(): + pass + +class VirtualGating: + + def lever_arm_matrix(): + pass + +class ChargeStateTuning: + + def determine_charge_states(): + pass + +class FineTuning: + + def rabi_oscilations(): + pass + diff --git a/src/data_analysis.py b/src/data_analysis.py index cc69a4a..df1cfa2 100644 --- a/src/data_analysis.py +++ b/src/data_analysis.py @@ -3,6 +3,8 @@ import yaml, datetime, sys, time, os, shutil, json,re from pathlib import Path +import inspect + import pandas as pd import numpy as np @@ -13,7 +15,7 @@ import matplotlib.pyplot as plt import matplotlib.cm as cm -from typing import List, Dict +from typing import List, Dict, Callable import qcodes as qc from qcodes.dataset import AbstractSweep, Measurement @@ -37,7 +39,9 @@ class DataAnalysis: - def __init__(self, logger, tuner_config) -> None: + def __init__(self, + logger, + tuner_config) -> None: self.logger = logger @@ -63,6 +67,24 @@ def sigmoid(self, x, a, b, x0, y0): def linear(self, x, m, b): return m * x + b + def relu(self, x, a, x0, b): + return np.maximum(0, a * (x - x0) + b) + + def fit_to_function(self, + x_data, + y_data, + function: Callable): + + popt, pcov = sp.optimize.curve_fit(function, x_data, y_data) + perr = np.sqrt(np.diag(pcov)) + + params = list(inspect.signature(function).parameters.keys())[1:] + + for name, val, err in zip(params, popt, perr): + print(f"{name} = {val:.3f} ± {err:.3f}") + + return params, popt, pcov + def extract_bias_point(self, data: pd.DataFrame, plot_process: bool, diff --git a/src/instrument_control.py b/src/instrument_control.py index 00fb0ff..73ef4d8 100644 --- a/src/instrument_control.py +++ b/src/instrument_control.py @@ -147,7 +147,7 @@ def __init__(self, # The following method creates a logger that will provide information to the user while the code is running - self.initialise_logger() + self.logger.initialise_logger() # Now, we connect to the instruments specified in the config diff --git a/src/logger.py b/src/logger.py index bf21329..61310d6 100644 --- a/src/logger.py +++ b/src/logger.py @@ -44,11 +44,12 @@ def initialise_logger(self): a log of everything that occured during the experiment. This method is when an InstrumentControl object is initialised. """ - ATTEMPT, COMPLETE, IN_PROGRESS = logging.INFO - 2, logging.INFO - 1, logging.INFO + ATTEMPT, COMPLETE, IN_PROGRESS, RESULTS = logging.INFO - 3, logging.INFO - 2, logging.INFO - 1, logging.INFO logging.addLevelName(ATTEMPT, 'ATTEMPT') logging.addLevelName(COMPLETE, 'COMPLETE') logging.addLevelName(IN_PROGRESS, 'IN PROGRESS') + logging.addLevelName(RESULTS, 'RESULTS') def attempt(self, message, *args, **kwargs): if self.isEnabledFor(ATTEMPT): @@ -62,9 +63,14 @@ def in_progress(self, message, *args, **kwargs): if self.isEnabledFor(IN_PROGRESS): self._log(IN_PROGRESS, message, args, **kwargs) + def results(self, message, *args, **kwargs): + if self.isEnabledFor(RESULTS): + self._log(RESULTS, message, args, **kwargs) + logging.Logger.attempt = attempt logging.Logger.complete = complete logging.Logger.in_progress = in_progress + logging.Logger.results = results console_formatter = ColoredFormatter( "%(log_color)s%(asctime)s - %(name)s - %(levelname)s %(message)s", @@ -73,7 +79,7 @@ def in_progress(self, message, *args, **kwargs): log_colors={ 'ATTEMPT': 'yellow', 'COMPLETE': 'green', - 'DEBUG': 'white', + 'RESULTS': 'white', 'INFO': 'white', 'IN PROGRESS': 'white', 'WARNING': 'red', diff --git a/src/main.py b/src/main.py index d23563f..2b6d279 100644 --- a/src/main.py +++ b/src/main.py @@ -2,19 +2,40 @@ File: main.py Authors: Benjamin Van Osch (bvanosch@uwaterloo.ca), Mason Daub (mjdaub@uwaterloo.ca) -Entry point to the auto tuner. +Entry point to the auto tuner. This ''' + +import yaml + from logger import Logger from instrument_control import InstrumentControl from data_analysis import DataAnalysis from gui import tuner_gui +""" +Currently, the config files are defined in the Configs folder, and read here to instantiate each of the classes. +The user would have to create new config files in order to load a separate protocol into the Autotuner. + +TODO We can probably generalize the station and tuner config files since they don't really need to be changed much, so we could + consider having the config file loading from the gui section and the user can pick which one after the application is opened. + The most general version would be to create/edit this config file within the gui but that can be added later. + +""" + +with open('configs/Intel_Config.yaml', 'r') as f: + config = yaml.safe_load(f) + +with open('configs/Intel_Tuner_Config.yaml', 'r') as f: + tuner_config = yaml.safe_load(f) + +with open('configs/Intel_Station_Config.yaml', 'r') as f: + station_config = yaml.safe_load(f) + log = Logger() ic = InstrumentControl(log, config = config, tuner_config = tuner_config, station_config = station_config) om = DataAnalysis(log, config = config, tuner_config = tuner_config, station_config = station_config) gui = tuner_gui(log, config = config, tuner_config = tuner_config, station_config = station_config) - gui.start() From bd858f9c0579349f504fec14cc6c7a98d82aebd7 Mon Sep 17 00:00:00 2001 From: Mason Daub Date: Thu, 2 Apr 2026 15:36:39 -0400 Subject: [PATCH 3/6] Added the asynchronous buffered instrument readout and a live plotter for testing. --- src/buffered_readout.py | 129 ++++++++++++++++++++++++++++++++++++++++ src/gui.py | 56 ++++++++++++++--- src/main.py | 16 ++--- 3 files changed, 185 insertions(+), 16 deletions(-) create mode 100644 src/buffered_readout.py diff --git a/src/buffered_readout.py b/src/buffered_readout.py new file mode 100644 index 0000000..3f7bfb0 --- /dev/null +++ b/src/buffered_readout.py @@ -0,0 +1,129 @@ +import threading +import time +import numpy as np +from typing import List, Tuple +import random + +__BufferExists__ = False + +class buffered_readout: + def __init__(self): + ''' + A class to handle the asynchronous buffered readout of the SET current for + autotuning devices. + ''' + + global __BufferExists__ + + assert not __BufferExists__, "Error: Readout buffer already exists!!" + + __exists__ = True + + self.time_func = time.time + + self.BUFFER_SIZE = 1000 + + self.READING_TIME = 0.01 + + ''' + Define the circular buffer for storing the stream of data + It has a lock for multi-threaded operation which protects the + buffer and its current index. + ''' + self.buffer = [(float(0), float(-1.0))] * self.BUFFER_SIZE + self.buffer_index = 0 + self.buffer_lock = threading.Lock() + + self.THREAD_NAME = "BufferThread" + self.thread = threading.Thread(target = self.__thread_loop__, name = self.THREAD_NAME) + + self.shutdown_event = threading.Event() + + def __assert_correct_thread__(self): + #return True + assert threading.current_thread().name == self.THREAD_NAME, f"Error, buffer is being run in the thread '{threading.current_thread().name}'!" + + def __open_instruments__(self): + + self.__assert_correct_thread__() + + return + def __thread_loop__(self): + + self.__assert_correct_thread__() + + self.__open_instruments__() + + print(f"Starting the readout buffer loop in thread {self.THREAD_NAME}...") + + while not self.shutdown_event.is_set(): + + time.sleep(self.READING_TIME) + self.__read_instruments__() + + print(f"Stopping the readout buffer thread...") + return + + def __read_instruments__(self): + value = random.Random(self.time_func()).random() # read the instrument! + curr_time = self.time_func() + + + # Acquire a lock on the circular buffer and push it to the current index + with self.buffer_lock: + self.buffer[self.buffer_index] = (value, curr_time) + self.buffer_index = (self.buffer_index + 1) % self.BUFFER_SIZE + + def read_buffer(self, t_avg : float = 0.0, t_start : float = 0.0) -> float: + ''' + + ''' + if t_start <= 0.0: + t_start = time.time() + + # acquire a lock on the buffer for Readout, and then copy it + with self.buffer_lock: + buffer_copy = self.buffer.copy() + + # Sort according to the time stamp + buffer_copy.sort(key = lambda e: e[1]) + + i = self.BUFFER_SIZE - 1 + values : List[float] = [] + t_stop = t_start - t_avg + while buffer_copy[i][1] >= t_stop: + timestamp = buffer_copy[i][1] + + if timestamp <= t_start: + values.append(buffer_copy[i][0]) + return float(np.average(values)) + def get_buffer(self) -> Tuple | None: + ''' + Try to copy the buffer without blocking. If it fails to acquire the lock, + it will return None. + ''' + + if self.buffer_lock.acquire(blocking = False): + try: + copy = self.buffer.copy() + finally: + self.buffer_lock.release() + else: + return None + + # Next return only valid time stamps + copy.sort(key = lambda e : e[1]) + retval : List[float] = [] + timestamps : List[float] = [] + for value, timestamp in copy: + if timestamp >= 0.0: + retval.append(value) + timestamps.append(timestamp) + assert len(retval) == len(timestamps) + return (retval, timestamps) + + def run(self): + self.thread.start() + def join(self): + self.shutdown_event.set() + self.thread.join() diff --git a/src/gui.py b/src/gui.py index 0509504..0b75458 100644 --- a/src/gui.py +++ b/src/gui.py @@ -13,6 +13,7 @@ import os import threading import time +from buffered_readout import buffered_readout class tuner_gui: @@ -25,6 +26,13 @@ def __init__(self): print(threading.current_thread().name) self.lipsum_text = 'Lorem ipsum dolor sit, amet consectetur adipisicing elit. Quis praesentium cumque magnam odio iure quidem, quod illum numquam possimus obcaecati commodi minima assumenda consectetur culpa fuga nulla ullam. In, libero.' + + self.readout = buffered_readout() + + self.readout.run() + + + def plotting_panel(self): with ui.matplotlib().figure as fig: #fig = plt.gcf() @@ -63,13 +71,44 @@ def home_page(self): ui.button('Print Thread', on_click= lambda : print(threading.current_thread().name)) ui.button('sleep', on_click = lambda : time.sleep(10)) - with ui.row(): - self.config_selection = config_files[0] - ui.select(config_files, label ='Config File', value = config_files[0],\ - on_change = lambda e: self.__setattr__("config_selection", e.value)) - ui.button('load config', on_click = lambda : ui.notify(self.config_selection)) + + self.liveplot = ui.matplotlib(figsize = (3,2)) + + fig = self.liveplot.figure + self.ax = fig.subplots(1,1) + xs = np.linspace(-1, 1) + self.line = self.ax.plot(xs, np.sin(xs)) + self.liveplot.update() + + ui.timer(0.05, self.update_liveplot) + + + self.n = 0 + + def update_liveplot(self): + retval = self.readout.get_buffer() + if retval is None: + return + else: + data, times = retval + self.line[0].set_ydata(data) + self.line[0].set_xdata(times) + self.ax.set_xlim(min(times), max(times)) + self.ax.set_ylim(-0.5, 1.5) + + #self.ax.relim() + #self.ax.autoscale_view() + #for l in self.ax.lines: + #l.remove() + #self.ax.clear() + #self.ax.plot(times, data) + + #self.liveplot.figure.canvas.draw() + self.liveplot.figure.tight_layout() + self.liveplot.update() + ui.update() + - def on_abort(self): ui.notify('Aborting...') @@ -91,8 +130,9 @@ def start(self): with ui.tab_panel('Home'): self.split_view(self.home_page) with ui.tab_panel('Turn-on'): - self.split_view(self.home_page) + #self.split_view(self.home_page) + ui.label('Content of B') with ui.tab_panel('Pinch-offs'): ui.label('Content of C') - ui.run(port = 8080) + ui.run(port = 8081) diff --git a/src/main.py b/src/main.py index d23563f..2ae0e13 100644 --- a/src/main.py +++ b/src/main.py @@ -5,16 +5,16 @@ Entry point to the auto tuner. ''' -from logger import Logger -from instrument_control import InstrumentControl -from data_analysis import DataAnalysis +#from logger import Logger +#from instrument_control import InstrumentControl +#from data_analysis import DataAnalysis from gui import tuner_gui -log = Logger() -ic = InstrumentControl(log, config = config, tuner_config = tuner_config, station_config = station_config) -om = DataAnalysis(log, config = config, tuner_config = tuner_config, station_config = station_config) -gui = tuner_gui(log, config = config, tuner_config = tuner_config, station_config = station_config) - +#log = Logger() +#ic = InstrumentControl(log, config = config, tuner_config = tuner_config, station_config = station_config) +#om = DataAnalysis(log, config = config, tuner_config = tuner_config, station_config = station_config) +#gui = tuner_gui(log, config = config, tuner_config = tuner_config, station_config = station_config) +gui = tuner_gui() gui.start() From c1f2edf6813ba6cf33ddd41d21b5515d7d3d1ea3 Mon Sep 17 00:00:00 2001 From: VanOschB <118694763+VanOschB@users.noreply.github.com> Date: Thu, 2 Apr 2026 15:37:39 -0400 Subject: [PATCH 4/6] experiment_thread development developed the experiment_thread file, which will be used to keep queue and keep track of jobs running of the experiment thread. Also, redefined the instrument_control file to write_control. --- src/experiment_thread.py | 70 ++++++++++++++++++ src/main.py | 2 +- src/tuner.ipynb | 32 +++++++- ...instrument_control.py => write_control.py} | 74 +++++++++++++------ 4 files changed, 154 insertions(+), 24 deletions(-) create mode 100644 src/experiment_thread.py rename src/{instrument_control.py => write_control.py} (94%) diff --git a/src/experiment_thread.py b/src/experiment_thread.py new file mode 100644 index 0000000..a670985 --- /dev/null +++ b/src/experiment_thread.py @@ -0,0 +1,70 @@ +''' +File: experiment_thread.py +Authors: Benjamin Van Osch (bvanosch@uwaterloo.ca), Mason Daub (mjdaub@uwaterloo.ca) + +This file contains classes related to running experiments from the autotuning code. Experiments are put into a queue, which +keeps track of which experiments to in which order. The queue can be stached and wait for user input to continue, or can be cleared +with an Abort call from the user. +''' + +# Imports + + +import threading +from queue import PriorityQueue + +class ExperimentThread: + + + def __init__(self): + + + self.job_event = threading.Event() + self.abort_event = threading.Event() + self.shutdown_event = threading.Event() + self.job_queue = PriorityQueue() + self.THREAD_NAME = "experimental_thread" + self.thread = threading.Thread(target = self.__thread_loop__, name = self.THREAD_NAME) + + def run(self): + + self.thread.start() + + def join(self): + + self.thread.join() + + def __assert_correct_thread__(self): + + assert threading.current_thread().name == self.THREAD_NAME, f"The current thread, {threading.current_thread().name}, is not the Experiment Thread." + + def add_job(self, + f: callable, + args, + priority: int = 1): + + self.job_queue.put((priority,(f, args))) + + def abort(self): + + self.abort_event.set() + + + def __thread_loop__(self, job): + + while not self.shutdown_event.set(): + + self.job_event.wait(timeout = 1) + + if self.job_queue.qsize() > 0: + priority, data = self.job_queue.get() + + f, args = data + + f(*args, self.abort_event) + + self.job_queue.task_done() + + while self.abort_event.is_set(): + self.job_queue.get() + diff --git a/src/main.py b/src/main.py index 2b6d279..a039325 100644 --- a/src/main.py +++ b/src/main.py @@ -9,7 +9,7 @@ import yaml from logger import Logger -from instrument_control import InstrumentControl +from src.write_control import InstrumentControl from data_analysis import DataAnalysis from gui import tuner_gui diff --git a/src/tuner.ipynb b/src/tuner.ipynb index 256127a..1da2ddf 100644 --- a/src/tuner.ipynb +++ b/src/tuner.ipynb @@ -1,5 +1,33 @@ { "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "ename": "", + "evalue": "", + "output_type": "error", + "traceback": [ + "\u001b[1;31mRunning cells with 'base (Python 3.13.11)' requires the ipykernel package.\n", + "\u001b[1;31mCreate a Python Environment with the required packages." + ] + } + ], + "source": [ + "a = 5\n", + "\n", + "def f(x):\n", + " x+= 1\n", + "\n", + " return x\n", + "\n", + "f(a)\n", + "\n", + "print(a)" + ] + }, { "cell_type": "code", "execution_count": null, @@ -622,7 +650,7 @@ ], "metadata": { "kernelspec": { - "display_name": "UpdatedTuner", + "display_name": "base", "language": "python", "name": "python3" }, @@ -636,7 +664,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.16" + "version": "3.13.11" } }, "nbformat": 4, diff --git a/src/instrument_control.py b/src/write_control.py similarity index 94% rename from src/instrument_control.py rename to src/write_control.py index 73ef4d8..dec3773 100644 --- a/src/instrument_control.py +++ b/src/write_control.py @@ -1,3 +1,14 @@ +''' +File: write_control.py +Authors: Benjamin Van Osch (bvanosch@uwaterloo.ca), Mason Daub (mjdaub@uwaterloo.ca) + +This file contains the WriteControl class, which handles all setting of values to instruments, +including sweeps and static voltage configurations. + +Currently, QCodes functions are used to carry out sweeps of instrument parameters, however in-house sweep functions +are currently in development. +''' + # Import modules import yaml, datetime, sys, time, os, shutil, json,re @@ -105,7 +116,7 @@ def get_after_set(self) -> bool: def setpoints(self) -> npt.NDArray[np.float64]: return self.get_setpoints() -class InstrumentControl: +class WriteControl: def __init__(self, logger, @@ -366,7 +377,7 @@ def set_voltage(self, def set_voltage_configuration(self, voltage_configuration: Dict[str, float] = {}, - stepsize: float = 10e-3): + stepsize: float = 1e-3): """ This method allows the user to smoothly set a given voltage configuration. @@ -374,7 +385,7 @@ def set_voltage_configuration(self, Args: voltage_configuration (Dict[str, float]): A dictionary containing the names of the gates to be set and the corresponding voltages the gates will be set to. - stepsize (float): The voltage stepsize for all the gates. Default is set to 10 mV. + stepsize (float): The voltage stepsize for all the gates. Default is set to 1 mV. """ # First, we determine which gates are being set. @@ -458,11 +469,12 @@ def set_voltage_configuration(self, return None - def sweep_1d(self, - maxV: float = None, - minV: float = None, - voltage_configuration: Dict[str, float] = {}, - dV: float = 10e-3) -> pd.DataFrame: + def sweep_1d_linsweep(self, + gate: str, + maxV: float = None, + minV: float = None, + voltage_configuration: Dict[str, float] = {}, + dV: float = 10e-3) -> pd.DataFrame: # Bring device to voltage configuration @@ -486,9 +498,8 @@ def sweep_1d(self, # Set up gate sweeps num_steps = self.calculate_num_of_steps(minV, maxV, dV) - gates_involved = self.barriers + self.leads + self.accumulation + self.plungers - - print(gates_involved) + + gates_involved = gate self.logger.info(f"setting {gates_involved} to {minV} V") @@ -528,13 +539,13 @@ def sweep_1d(self, return None - def sweep_2d(self, - P1: str = None, - P2: str = None, - P1_bounds: tuple = (None, None), - P2_bounds: tuple = (None, None), - dV: float | tuple = None, - voltage_configuration: dict = None) -> tuple[pd.DataFrame, plt.Axes]: + def sweep_2d_linsweep(self, + P1: str = None, + P2: str = None, + P1_bounds: tuple = (None, None), + P2_bounds: tuple = (None, None), + dV: float | tuple = None, + voltage_configuration: dict = None) -> tuple[pd.DataFrame, plt.Axes]: # Bring device to voltage configuration if voltage_configuration is not None: @@ -647,6 +658,9 @@ def smooth_reset(): return None + def sweep_nd_linsweep(self): + pass + def sweep_1d_measurement(self, maxV: float = None, minV: float = None, @@ -854,12 +868,30 @@ def smooth_reset(): return None - def sweep_nd(self): - pass - def sweep_nd_measurement(self): pass + def sweep_1d(self, + gate: str, + startV: float = None, + endV: float = None, + voltage_configuration: Dict[str, float] = {}, + dV: float = 10e-3) -> pd.DataFrame: + + """ + This method allows the user to sweep a given gate parameter from a pre-defined start and end point, with a given stepsize. + + Args: + gate (str): + startV (float): + endV (flaot): + voltage_configuration (Dict[str, float]): A dictionary containing the names of the gates to be set and + the corresponding voltages the gates will be set to. + dV (float): The voltage stepsize for all the gates. Default is set to 1 mV. + """ + + return None + def check_break_conditions(self): # Go through device break conditions to see if anything is flagged, From fe0df92be720aeaf7f1abef1e7e33b81b1a3f3f7 Mon Sep 17 00:00:00 2001 From: Mason Daub Date: Thu, 2 Apr 2026 17:09:07 -0400 Subject: [PATCH 5/6] Fixed a problem with nicegui running my code twice. Now the buffered_readout is a singleton --- src/buffered_readout.py | 18 +++++++++++++++--- src/gui.py | 23 ++++++++++++++++++----- src/main.py | 16 +++++----------- 3 files changed, 38 insertions(+), 19 deletions(-) diff --git a/src/buffered_readout.py b/src/buffered_readout.py index 3f7bfb0..258cde3 100644 --- a/src/buffered_readout.py +++ b/src/buffered_readout.py @@ -5,6 +5,16 @@ import random __BufferExists__ = False +__Instance__ = None + + +def create_buffer_instance(): + global __Instance__ + if __Instance__ is None: + __Instance__ = buffered_readout() + + return __Instance__ + class buffered_readout: def __init__(self): @@ -17,7 +27,7 @@ def __init__(self): assert not __BufferExists__, "Error: Readout buffer already exists!!" - __exists__ = True + __BufferExists__ = True self.time_func = time.time @@ -36,7 +46,7 @@ def __init__(self): self.THREAD_NAME = "BufferThread" self.thread = threading.Thread(target = self.__thread_loop__, name = self.THREAD_NAME) - + self.running = False self.shutdown_event = threading.Event() def __assert_correct_thread__(self): @@ -123,7 +133,9 @@ def get_buffer(self) -> Tuple | None: return (retval, timestamps) def run(self): - self.thread.start() + if not self.running: + self.thread.start() + self.running = True def join(self): self.shutdown_event.set() self.thread.join() diff --git a/src/gui.py b/src/gui.py index 0b75458..61d4f34 100644 --- a/src/gui.py +++ b/src/gui.py @@ -9,12 +9,13 @@ import numpy as np import matplotlib.pyplot as plt -from nicegui import ui +from nicegui import ui, app import os import threading import time -from buffered_readout import buffered_readout +from buffered_readout import create_buffer_instance +_gui_instances = [] class tuner_gui: def __init__(self): @@ -23,11 +24,15 @@ def __init__(self): :param self: ''' - print(threading.current_thread().name) - self.lipsum_text = 'Lorem ipsum dolor sit, amet consectetur adipisicing elit. Quis praesentium cumque magnam odio iure quidem, quod illum numquam possimus obcaecati commodi minima assumenda consectetur culpa fuga nulla ullam. In, libero.' + global _FirstPass + + self.lipsum_text = 'Lorem ipsum dolor sit, amet consectetur adipisicing elit. Quis praesentium cumque magnam odio iure quidem, quod illum numquam possimus obcaecati commodi minima assumenda consectetur culpa fuga nulla ullam. In, libero.' + print(threading.current_thread().name) + global _gui_instance + _gui_instances.append(self) - self.readout = buffered_readout() + self.readout = create_buffer_instance() self.readout.run() @@ -136,3 +141,11 @@ def start(self): ui.label('Content of C') ui.run(port = 8081) + def on_shutdown(self): + self.readout.join() + +@app.on_shutdown +def shutdown(): + global _gui_instances + for inst in _gui_instances: + inst.on_shutdown() diff --git a/src/main.py b/src/main.py index 2ae0e13..9b9efda 100644 --- a/src/main.py +++ b/src/main.py @@ -5,16 +5,10 @@ Entry point to the auto tuner. ''' -#from logger import Logger -#from instrument_control import InstrumentControl -#from data_analysis import DataAnalysis from gui import tuner_gui - -#log = Logger() -#ic = InstrumentControl(log, config = config, tuner_config = tuner_config, station_config = station_config) -#om = DataAnalysis(log, config = config, tuner_config = tuner_config, station_config = station_config) -#gui = tuner_gui(log, config = config, tuner_config = tuner_config, station_config = station_config) -gui = tuner_gui() - -gui.start() +if __name__ in {"__main__", "__mp_main__"}: + print("Creating") + gui = tuner_gui() + print("Starting") + gui.start() From 523a53fe8b30a0716cc9dc10041411ce83a28277 Mon Sep 17 00:00:00 2001 From: Mason Daub Date: Thu, 2 Apr 2026 17:36:11 -0400 Subject: [PATCH 6/6] Attempting to fix multiple instances of the buffer thread starting --- src/gui.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/gui.py b/src/gui.py index 61d4f34..213864d 100644 --- a/src/gui.py +++ b/src/gui.py @@ -34,9 +34,6 @@ def __init__(self): self.readout = create_buffer_instance() - self.readout.run() - - def plotting_panel(self): with ui.matplotlib().figure as fig: @@ -142,10 +139,20 @@ def start(self): ui.run(port = 8081) def on_shutdown(self): + print("Starting on_shutdown") self.readout.join() + def on_startup(self): + print("Starting on_startup") + self.readout.run() @app.on_shutdown def shutdown(): global _gui_instances for inst in _gui_instances: inst.on_shutdown() + +@app.on_startup +def startup(): + global _gui_instances + for inst in _gui_instances: + inst.on_startup()