Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import logging
from dataclasses import dataclass
from typing import Dict, Tuple

import numpy as np
import xarray as xr
from qualibrate import QualibrationNode

# @dataclass
# class FitResults:
# """Stores the relevant fit parameters for a single qubit pair in an RB experiment"""

# optimal_amplitude: float
# success: bool


def log_fitted_results(fit_results: Dict[str, float], log_callable=None):
"""
Logs the node-specific fitted results for all qubit pairs.

Parameters:
-----------
fit_results : Dict[str, float]
Dictionary containing floats for each qubit pair.
log_callable : callable, optional
Logger for logging the fitted results. If None, a default logger is used.
"""
if log_callable is None:
log_callable = logging.getLogger(__name__).info

for qp_name, fit_result in fit_results.items():
s_qubit = f"Results for qubit pair {qp_name}: "

s_alpha = f"\tFitted alpha: {fit_result["alpha"]:.6f} a.u."
s_fidelity = f"\tFitted fidelity: {100*fit_result["fidelity"]:.6f} %"

if fit_result["success"]:
s_qubit += "SUCCESS!\n"
else:
s_qubit += "FAIL!\n"

log_message = s_qubit + s_alpha + s_fidelity

log_callable(log_message)


def process_raw_dataset(ds: xr.Dataset, node: QualibrationNode):
"""
Process the raw dataset by adding amplitude and detuning coordinates.

Parameters:
-----------
ds : xr.Dataset
Raw dataset from the experiment
node : QualibrationNode
The calibration node containing qubit pairs information

Returns:
--------
xr.Dataset
Processed dataset with additional coordinates
"""
ds = node.results["ds_raw"]
# Assume ds is your input dataset and ds['state'] is your DataArray
state = ds["state"] # shape: (qubit, shots, sequence, depths)

# Outcome labels for 2-qubit states
labels = ["00", "01", "10", "11"]

# Create a list of DataArrays: one for each outcome
probs = [state == i for i in range(4)]

# Stack along a new outcome dimension
probs = xr.concat(probs, dim="outcome")

# Assign outcome labels
probs = probs.assign_coords(outcome=("outcome", labels))

probs_00 = probs.sel(outcome="00")
probs_00 = probs_00.rename({"shots": "average", "sequence": "repeat", "depths": "circuit_depth"})
probs_00 = probs_00.transpose("qubit_pair", "repeat", "circuit_depth", "average")

probs_00 = probs_00.astype(int)

ds_transposed = ds.rename({"shots": "average", "sequence": "repeat", "depths": "circuit_depth"})
ds_transposed = ds_transposed.transpose("qubit_pair", "repeat", "circuit_depth", "average")

return ds_transposed


# def fit_raw_data(ds: xr.Dataset, node: QualibrationNode) -> Tuple[xr.Dataset, Dict[str, FitResults]]:
# """
# Fit the CZ conditional phase data for each qubit pair.

# Parameters:
# -----------
# ds : xr.Dataset
# Dataset containing the processed data.
# node : QualibrationNode
# The calibration node containing parameters and qubit pairs.

# Returns:
# --------
# Tuple[xr.Dataset, Dict[str, FitResults]]
# Dataset with fit results and dictionary of fit results for each qubit pair.
# """
# # For RB analysis, no fitting routine is currently implemented.
# # Pass the dataset through unchanged, or implement RB-specific fitting here if needed.
# ds_fit = ds

# # Extract the relevant fitted parameters
# ds_fit, fit_results = _extract_relevant_parameters(ds_fit, node)

# return ds_fit, fit_results


# def _extract_relevant_parameters(
# ds_fit: xr.Dataset, node: QualibrationNode
# ) -> Tuple[xr.Dataset, Dict[str, FitResults]]:
# """
# Extract relevant fit parameters and create FitResults for each qubit pair.

# Parameters:
# -----------
# ds_fit : xr.Dataset
# Dataset containing the fit results from fit_routine.
# node : QualibrationNode
# The calibration node containing parameters and qubit pairs.

# Returns:
# --------
# Tuple[xr.Dataset, Dict[str, FitResults]]
# Dataset with additional metadata and dictionary of FitResults for each qubit pair.
# """
# qubit_pairs = node.namespace["qubit_pairs"]

# # Add metadata attributes to the dataset
# if "optimal_amplitude" in ds_fit.data_vars:
# ds_fit.optimal_amplitude.attrs = {"long_name": "optimal CZ amplitude", "units": "a.u."}
# if "phase_diff" in ds_fit.data_vars:
# ds_fit.phase_diff.attrs = {"long_name": "phase difference", "units": "2π"}
# if "fitted_curve" in ds_fit.data_vars:
# ds_fit.fitted_curve.attrs = {"long_name": "fitted tanh curve", "units": "2π"}

# # Create FitResults for each qubit pair
# fit_results = {}
# for qp in qubit_pairs:
# qp_name = qp.name
# qp_data = ds_fit.sel(qubit_pair=qp_name)

# fit_results[qp_name] = FitResults(
# optimal_amplitude=float(qp_data.optimal_amplitude.values),
# success=bool(qp_data.success.values),
# )

# return ds_fit, fit_results
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
from typing import List, Union

import numpy as np
from calibration_utils.two_qubit_interleaved_rb.rb_utils import EPS
from qiskit import QuantumCircuit
from qiskit.circuit import QuantumCircuit
from qiskit.converters import circuit_to_dag, dag_to_circuit

# Gate to integer mapping for single qubit gates
SINGLE_QUBIT_GATE_MAP = {"sx": 0, "x": 1, "sy": 2, "y": 3, "rz(pi/2)": 4, "rz(pi)": 5, "rz(3pi/2)": 6, "idle": 7}

# Two-qubit gate mapping
TWO_QUBIT_GATE_MAP = {"cz": 64, "idle_2q": 65} # Start at 64 to avoid overlap with single-qubit combinations


def get_gate_name(gate) -> str:
"""Extract the gate name and parameters in a standardized format."""
name = gate.name.lower()
if name == "rz":
angle = gate.params[0]
if np.isclose(angle, np.pi / 2):
return name + "(pi/2)"
elif np.isclose(angle, np.pi) or np.isclose(angle, -np.pi):
return name + "(pi)"
elif np.isclose(angle, 3 * np.pi / 2) or np.isclose(angle, -np.pi / 2):
return name + "(3pi/2)"
else:
raise ValueError(f"Unsupported angle: {angle}")
return name


def get_layer_integer(layer: Union[list[int, int], str]) -> int:
"""
Convert a layer configuration to an integer.

Args:
layer: Either a tuple of two single-qubit gate integers or a two-qubit gate name

Returns:
Integer representing the layer configuration
"""
if isinstance(layer[0], list):
# For single-qubit gate pairs, use a formula to generate unique integers
# This maps (g1, g2) to a unique integer in range [0, 63]
return layer[0][0] * len(SINGLE_QUBIT_GATE_MAP) + layer[0][1]
elif isinstance(layer[0], str):
return TWO_QUBIT_GATE_MAP[layer[0]]
else:
raise ValueError(f"Unsupported layer : {layer[0]}")


def process_circuit_to_integers(circuit: QuantumCircuit) -> List[int]:
"""
Process a quantum circuit and return a list of integers representing the circuit.

Args:
circuit: A Qiskit QuantumCircuit with 2 qubits

Returns:
List of integers representing the circuit configuration between barriers
"""
result = []
current_layer = [[7, 7]] # Initialize with idle gates for both qubits

for instruction in circuit:
if instruction.operation.name == "barrier" and current_layer != [[7, 7]]:
# Convert current layer to integer and add to result
layer_int = get_layer_integer(current_layer)
result.append(layer_int)
current_layer = [[7, 7]] # Reset to idle gates
continue

gate_name = get_gate_name(instruction.operation)

if gate_name == "cz" or gate_name == "idle_2q":
# If we have a CZ or idle_2q gate, it's a two-qubit operation
if current_layer != [[7, 7]]:
raise ValueError(f"{gate_name} gate found with single qubit gates in the layer")
else:
current_layer = [gate_name]
elif gate_name in SINGLE_QUBIT_GATE_MAP:
# Single-qubit gate
qubit_index = instruction.qubits[0]._index
current_layer[0][qubit_index] = SINGLE_QUBIT_GATE_MAP[gate_name]
else:
raise ValueError(f"Unsupported gate: {gate_name}")

# Process any remaining gates after the last barrier
if current_layer != [[7, 7]]:
layer_int = get_layer_integer(current_layer)
result.append(layer_int)

return result


def layerize_quantum_circuit(qc: QuantumCircuit) -> QuantumCircuit:

dag = circuit_to_dag(qc)
layered_qc = QuantumCircuit(qc.num_qubits)
for layer in dag.layers():
layer_as_circuit = dag_to_circuit(layer["graph"])
layer_as_circuit.barrier()
layered_qc = layered_qc.compose(layer_as_circuit)

return layered_qc


def collect_gates_as_2_qubit_layers(qc: QuantumCircuit) -> list[str]:
"""
Iterates over a qiskit quantum circuit and collects the gates as strings.
The circuit is separated by barriers. Between each barrier, the function collects the gates as strings.
The name of the gate as string should be like <name>_<qubit>. Between gates place a dash.

Args:
qc (QuantumCircuit): The quantum circuit to iterate over.

Returns:
list[str]: A list of strings representing the gates in the circuit.
"""
gate_strings = []
current_gates = []

for instruction in qc:
if instruction.name == "barrier":
if current_gates:
gate_strings.append("-".join(current_gates))
current_gates = []
else:
for qubit in instruction.qubits:
gate_str = f"{instruction.name}_{qubit.index}"
current_gates.append(gate_str)

if current_gates:
gate_strings.append("-".join(current_gates))

return gate_strings
Loading
Loading