Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from qualibrate.parameters import RunnableParameters
from qualibration_libs.parameters import CommonNodeParameters
from calibration_utils.common_utils.experiment import BaseExperimentNodeParameters
from typing import Optional, List


class NodeSpecificParameters(RunnableParameters):
Expand All @@ -11,10 +12,12 @@ class NodeSpecificParameters(RunnableParameters):
"""Minimum voltage offset for the sensor gate sweep in volts. Default is -0.2 V."""
offset_max: float = 0.2
"""Maximum voltage offset for the sensor gate sweep in volts. Default is 0.2 V."""
offset_step: float = 0.005
"""Step size for the voltage offset sweep in volts. Default is 0.005 V."""
offset_step: float = 0.01
"""Step size for the voltage offset sweep in volts. Default is 0.01 V."""
duration_after_step: int = 1000
"""Wait duration after each voltage step in nanoseconds. Default is 1000 ns (1 µs)."""
sensor_names: Optional[List[str]] = None
"""The list of sensor dot names to be included in the measurement. """


class Parameters(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from qualang_tools.multi_user import qm_session
from qualang_tools.results import progress_counter
from qualang_tools.units import unit
from qualang_tools.loops import from_array

from qualibrate import QualibrationNode
from quam_config import Quam
Expand Down Expand Up @@ -41,7 +42,9 @@
"""


node = QualibrationNode[Parameters, Quam](name="05_sensor_gate_sweep", description=description, parameters=Parameters())
node = QualibrationNode[Parameters, Quam](
name="05a_sensor_gate_sweep_opx", description=description, parameters=Parameters()
)


# Any parameters that should change for debugging purposes only should go in here
Expand All @@ -54,6 +57,7 @@ def custom_param(node: QualibrationNode[Parameters, Quam]):
# node.parameters.offset_min = -0.1
# node.parameters.offset_max = 0.1
# node.parameters.offset_step = 0.01
node.parameters.simulate = True
pass


Expand All @@ -65,6 +69,63 @@ def custom_param(node: QualibrationNode[Parameters, Quam]):
@node.run_action(skip_if=node.parameters.load_data_id is not None)
def create_qua_program(node: QualibrationNode[Parameters, Quam]):
"""Create the sweep axes and generate the QUA program from the pulse sequence and the node parameters."""
# Class containing tools to help handle units and conversions.
u = unit(coerce_to_integer=True)

# Get the relevant sensor dots rom the node
node.namespace["sensors"] = sensors = get_sensors(node)

num_sensors = len(sensors)

# Extract the sweep parameters and axes from the node parameters
n_avg = node.parameters.num_shots

# The voltage offset sweep
bias_offsets = np.arange(node.parameters.offset_min, node.parameters.offset_max, node.parameters.offset_step)

# Register the sweep axes to be added to the dataset when fetching data
node.namespace["sweep_axes"] = {
"sensors": xr.DataArray(sensors.get_names()),
"bias_offsets": xr.DataArray(bias_offsets, attrs={"long_name": "Sensor bias offset", "units": "V"}),
}

# The QUA program stored in the node namespace to be transfer to the simulation and execution run_actions
with program() as node.namespace["qua_program"]:

I, I_st, Q, Q_st, n, n_st = node.machine.declare_qua_variables(num_IQ_pairs=num_sensors)
offset = declare(fixed) # QUA variable for the readout frequency

# No qubits yet at this point in the experiment - we only have sensors, batched by multiplexing. Simultaneous operation no problem
for multiplexed_sensors in sensors.batch():
align()
with for_(n, 0, n < n_avg, n + 1):
save(n, n_st)

with for_(*from_array(offset, bias_offsets)):
for i, sensor in multiplexed_sensors.items():
# Sweep the sensor bias voltage
readout_len = sensor.readout_resonator.operations["readout"].length
# sensor.physical_channel.set_dc_offset(offset=offset)
sensor.step_to_voltages(
{sensor.name: offset}, duration=readout_len + node.parameters.duration_after_step
)
# Measure the resonator after settling the sensor bias point
sensor.readout_resonator.wait(node.parameters.duration_after_step // 4)
sensor.readout_resonator.measure("readout", qua_vars=(I[i], Q[i]))
# save data
save(I[i], I_st[i])
save(Q[i], Q_st[i])
align()

for i, sensor in multiplexed_sensors.items():
sensor.apply_compensation_pulse(ramp_to_zero=True)
align()

with stream_processing():
n_st.save("n")
for i in range(num_sensors):
I_st[i].buffer(len(bias_offsets)).average().save(f"I{i + 1}")
Q_st[i].buffer(len(bias_offsets)).average().save(f"Q{i + 1}")


# %% {Simulate}
Expand Down Expand Up @@ -140,13 +201,15 @@ def update_state(node: QualibrationNode[Parameters, Quam]):
for sensor in node.namespace["sensors"]:
if not node.results["fit_results"][sensor.name]["success"]:
continue

optimal_offset = 0.0 # find_optimal_offset(node.results["ds_fit"], sensor.name)
# TODO: replace "measure" by its enum
sensor.add_point(
"measure",
voltages={sensor.name: optimal_offset},
duration=sensor.readout_resonator.operation["readout"].duration,
)
# TODO: replace "measure" by its enum
sensor.add_point(
"measure",
voltages={sensor.name: optimal_offset},
duration=sensor.readout_resonator.operation["readout"].duration,
replace_existing_point=True,
)


# %% {Save_results}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
# %% {Imports}
import matplotlib.pyplot as plt
import numpy as np
import xarray as xr
from dataclasses import asdict

from qm.qua import *

from qualang_tools.multi_user import qm_session
from qualang_tools.results import progress_counter
from qualang_tools.units import unit
from qualang_tools.loops import from_array

from qualibrate import QualibrationNode
from quam_config import Quam
from calibration_utils.sensor_gate_sweep import Parameters
from calibration_utils.common_utils.experiment import get_sensors
from qualibration_libs.runtime import simulate_and_plot
from qualibration_libs.data import XarrayDataFetcher
from qualibration_libs.core import tracked_updates

# %% {Node initialisation}
description = """
CHARGE SENSOR GATE SWEEP with the QDAC-II
This sequence involves sweeping the voltage biasing the sensor gate using a QDAC channel connected to the DC line of
the bias-tee. For optimizing runtime, the QDAC preloads the list of voltage points and a digital marker from the OPX is
used in order to perform the sweep.
The OPX measures the response of the sensor dot via RF reflectometry, recording the I and Q quadratures of the demodulated signal.

The measurement performs a voltage sweep across a specified range with configurable step size. At each voltage point,
a readout pulse is sent to the resonator coupled to the sensor dot, and the reflected signal is demodulated and recorded.
A global average is performed (averaging on the most outer loop) and the data is extracted while the program is running
to display the sensor response with increasing SNR.

Prerequisites:
- Connect the DC line of the bias-tee connected to the sensor dot to one QDAC-II channel.
- Having initialized the Quam (quam_config/populate_quam_state_*.py).
- Having calibrated the resonators coupled to the SensorDot components.

State update:
- Update the optimal voltage bias of each sensor dot.
"""


node = QualibrationNode[Parameters, Quam](
name="05b_sensor_gate_sweep_qdac", description=description, parameters=Parameters()
)


# Any parameters that should change for debugging purposes only should go in here
# These parameters are ignored when run through the GUI or as part of a graph
@node.run_action(skip_if=node.modes.external)
def custom_param(node: QualibrationNode[Parameters, Quam]):
# You can get type hinting in your IDE by typing node.parameters.
# node.parameters.sensor_names = ["sensor_1"]
# node.parameters.num_shots = 10
# node.parameters.offset_min = -0.1
# node.parameters.offset_max = 0.1
# node.parameters.offset_step = 0.01
node.parameters.simulate = True
pass


# Instantiate the QUAM class from the state file
node.machine = Quam.load("C:\git\qua-libs\qualibration_graphs\quantum_dots\quam_config\quam_state")


# %% {Create_QUA_program}
@node.run_action(skip_if=node.parameters.load_data_id is not None)
def create_qua_program(node: QualibrationNode[Parameters, Quam]):
"""Create the sweep axes and generate the QUA program from the pulse sequence and the node parameters."""
# Class containing tools to help handle units and conversions.
u = unit(coerce_to_integer=True)

# Get the relevant sensor dots rom the node
node.namespace["sensors"] = sensors = get_sensors(node)

num_sensors = len(sensors)

# Extract the sweep parameters and axes from the node parameters
n_avg = node.parameters.num_shots

# The voltage offset sweep
bias_offsets = np.arange(node.parameters.offset_min, node.parameters.offset_max, node.parameters.offset_step)

# Set up the DC lists to load into the QDAC.
dc_lists = {}
for sensor in sensors: # TODO: Issue with qcodes driver?
dc_lists[sensor.name] = node.machine.qdac.channel(sensor.physical_channel.qdac_spec.qdac_output_port).dc_list(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the sweep going to be virtual? This goes for the charge stability node too, but perhaps we need to load multiple dc_lists for the physical outputs of the virtual channels. Here is a snippet of what I did in qua-dashboards:


    def _find_physical_dc_lists(
        self, 
        qdac_sweep_axis: VoltageSweepAxis,
    ) -> Dict[str, Union[List, np.ndarray]]: 
        """Use the sweep axis abnd scab mode to yield a dictionary of physical dc_lists to use for the Qdac"""

        axis_name = qdac_sweep_axis.name
        axis_values = qdac_sweep_axis.sweep_values_with_offset

        _, y_idxs = self.scan_mode.get_idxs(x_points = 1, y_points = len(axis_values))
        ordered_axis_values = axis_values[y_idxs]

        dc_set = self.dc_set
        full_physical_dicts = {name: [] for name in dc_set.channels.keys()}

        for value in ordered_axis_values: 
            virtual_dict = {axis_name: float(value)}
            physical_dict = dc_set.resolve_voltages(virtual_dict)

            for physical_gate in dc_set.channels.keys(): 
                full_physical_dicts[physical_gate].append(physical_dict[physical_gate])

        return {name: np.array(l) for name, l in full_physical_dicts.items()}
    
    def _prepare_qdac_sweeps(
        self,
    ) -> None: 
        """
        Prepares the DC list attributes for the QDAC channel. This function assumes the use of the 
        Qdac2 driver from qcodes_contrib_drivers. This also assumes that the VoltageGate objects have 
        their QdacSpec objects configured with the qdac_output_port and opx_trigger_out. 
        """
        if not isinstance(self.y_axis, VoltageSweepAxis): 
            raise ValueError("Qdac Sweep Axis must be mode 'Voltage'")
        
        self._last_dc_list_y_offset = self.y_axis.offset_parameter.get_latest()
        physical_dc_lists = self._find_physical_dc_lists(self.y_axis)

        for name, voltages in physical_dc_lists.items():
            dc_list = self.qdac.channel(self.dc_set.channels[name].qdac_spec.qdac_output_port).dc_list(
                voltages = voltages, 
                dwell_s = self.qdac_dwell_time_us/1e6, 
                stepped = True,
            )
            dc_list.start_on_external(trigger = self.qdac_ext_trigger_input_port)

voltages=bias_offsets,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At some point in this section, you'd probably have to do node.machine.connect_to_external_source(external_qdac = True) to instantiate machine.qdac, and if the machine has the qdac_ip in the network, it should directly connect via the qcodes driver. To be replaced with in-house in future, but can keep the api here the same.

For a more intuitive api, should we integrate the connect_to_external_source call in the machine.connect() function? It can look in its network to see if qdac_ip exists, and conditionally connect to external source

dwell_s=10e-6,
stepped=True,
)
dc_lists[sensor.name].start_on_external(trigger=sensor.physical_channel.qdac_spec.qdac_trigger_in)

# Register the sweep axes to be added to the dataset when fetching data
node.namespace["sweep_axes"] = {
"sensors": xr.DataArray(sensors.get_names()),
"bias_offsets": xr.DataArray(bias_offsets, attrs={"long_name": "Sensor bias offset", "units": "V"}),
}

# The QUA program stored in the node namespace to be transfer to the simulation and execution run_actions
with program() as node.namespace["qua_program"]:

I, I_st, Q, Q_st, n, n_st = node.machine.declare_qua_variables(num_IQ_pairs=num_sensors)
offset = declare(fixed) # QUA variable for the readout frequency

# No qubits yet at this point in the experiment - we only have sensors, batched by multiplexing. Simultaneous operation no problem
for multiplexed_sensors in sensors.batch():
align()
with for_(n, 0, n < n_avg, n + 1):
save(n, n_st)
with for_(*from_array(offset, bias_offsets)):
for i, sensor in multiplexed_sensors.items():
# Play a digital marker to update the voltage outputted by the QDAC channel
sensor.physical_channel.qdac_spec.opx_trigger_out.play("trigger")
# Wait for the voltage to settle
sensor.physical_channel.settle()
sensor.readout_resonator.align(sensor.physical_channel.name)
# Measure the resonator after settling the sensor bias point
sensor.readout_resonator.measure("readout", qua_vars=(I[i], Q[i]))
# save data
save(I[i], I_st[i])
save(Q[i], Q_st[i])
align()

with stream_processing():
n_st.save("n")
for i in range(num_sensors):
I_st[i].buffer(len(bias_offsets)).average().save(f"I{i + 1}")
Q_st[i].buffer(len(bias_offsets)).average().save(f"Q{i + 1}")


# %% {Simulate}
@node.run_action(skip_if=node.parameters.load_data_id is not None or not node.parameters.simulate)
def simulate_qua_program(node: QualibrationNode[Parameters, Quam]):
"""Connect to the QOP and simulate the QUA program"""
# Connect to the QOP
qmm = node.machine.connect()
# Get the config from the machine
config = node.machine.generate_config()
# Simulate the QUA program, generate the waveform report and plot the simulated samples
samples, fig, wf_report = simulate_and_plot(qmm, config, node.namespace["qua_program"], node.parameters)
# Store the figure, waveform report and simulated samples
node.results["simulation"] = {"figure": fig, "wf_report": wf_report, "samples": samples}


# %% {Execute}
@node.run_action(skip_if=node.parameters.load_data_id is not None or node.parameters.simulate)
def execute_qua_program(node: QualibrationNode[Parameters, Quam]):
"""Connect to the QOP, execute the QUA program and fetch the raw data and store it in a xarray dataset called "ds_raw"."""
# Connect to the QOP
qmm = node.machine.connect()
# Get the config from the machine
config = node.machine.generate_config()
# Execute the QUA program only if the quantum machine is available (this is to avoid interrupting running jobs).
with qm_session(qmm, config, timeout=node.parameters.timeout) as qm:
# The job is stored in the node namespace to be reused in the fetching_data run_action
node.namespace["job"] = job = qm.execute(node.namespace["qua_program"])
# Display the progress bar
data_fetcher = XarrayDataFetcher(job, node.namespace["sweep_axes"])
for dataset in data_fetcher:
progress_counter(
data_fetcher.get("n", 0),
node.parameters.num_shots,
start_time=data_fetcher.t_start,
)
# Display the execution report to expose possible runtime errors
node.log(job.execution_report())
# Register the raw dataset
node.results["ds_raw"] = dataset


# %% {Load_historical_data}
@node.run_action(skip_if=node.parameters.load_data_id is None)
def load_data(node: QualibrationNode[Parameters, Quam]):
"""Load a previously acquired dataset."""
load_data_id = node.parameters.load_data_id
# Load the specified dataset
node.load_from_id(node.parameters.load_data_id)
node.parameters.load_data_id = load_data_id
# Get the active sensors from the loaded node parameters
node.namespace["sensors"] = get_sensors(node)


# %% {Analyse_data}
@node.run_action(skip_if=node.parameters.simulate)
def analyse_data(node: QualibrationNode[Parameters, Quam]):
"""Analyse the raw data and store the fitted data in another xarray dataset "ds_fit" and the fitted results in the "fit_results" dictionary."""


# %% {Plot_data}
@node.run_action(skip_if=node.parameters.simulate)
def plot_data(node: QualibrationNode[Parameters, Quam]):
"""Plot the raw and fitted data."""


# %% {Update_state}
@node.run_action(skip_if=node.parameters.simulate)
def update_state(node: QualibrationNode[Parameters, Quam]):
"""Update the relevant parameters if the sensor data analysis was successful."""

with node.record_state_updates():
for sensor in node.namespace["sensors"]:
if not node.results["fit_results"][sensor.name]["success"]:
continue

optimal_offset = 0.0 # find_optimal_offset(node.results["ds_fit"], sensor.name)
sensor.physical_channel.offset_parameter(optimal_offset)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use VirtualDCSet, so that a virtual point can be used instead of setting it directly via the physical channel

machine.virtual_dc_sets[virtual_gate_set_id].set_voltages({sensor.name : optimal_offset})



# %% {Save_results}
@node.run_action()
def save_results(node: QualibrationNode[Parameters, Quam]):
node.save()
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@

from quam_builder.architecture.quantum_dots.components import ( # type: ignore[import-not-found]
VoltageGate,
)
from quam_builder.architecture.quantum_dots.components.xy_drive import ( # type: ignore[import-not-found]
XYDriveIQ,
XYDriveMW,
)
from quam_builder.architecture.quantum_dots.components.readout_resonator import ( # type: ignore[import-not-found]
ReadoutResonatorIQ,
Expand Down Expand Up @@ -224,22 +222,15 @@ def _create_minimal_machine() -> Tuple[LossDiVincenzoQuam, dict]:
}
xy_drives = {}
for i, (fem_id, port_i, port_q) in xy_port_map.items():
xy_drives[i] = XYDriveIQ(
xy_drives[i] = XYDriveMW(
id=f"Q{i}_xy",
opx_output_I=LFFEMAnalogOutputPort(
controller_id=controller,
fem_id=fem_id,
port_id=port_i,
output_mode="direct",
),
opx_output_Q=LFFEMAnalogOutputPort(
opx_output=MWFEMAnalogOutputPort(
controller_id=controller,
fem_id=fem_id,
port_id=port_q,
output_mode="direct",
),
frequency_converter_up=FrequencyConverter(
local_oscillator=LocalOscillator(frequency=0),
fem_id=mw_fem_slot,
port_id=i,
upconverter_frequency=5e9,
band=2,
full_scale_power_dbm=10,
),
intermediate_frequency=100e6,
)
Expand Down
Loading
Loading