From d5d4498747fbc36b2db67cc362f4851ca279bd1d Mon Sep 17 00:00:00 2001 From: JulienCalistoTD <164006610+JulienCalistoTD@users.noreply.github.com> Date: Wed, 6 May 2026 17:15:37 +0200 Subject: [PATCH 1/4] feat: enhance IBM execution to support multiple jobs and diagonal observables --- mpqp/execution/providers/ibm.py | 210 ++++++++++++++++++++++++++++++-- mpqp/execution/runner.py | 114 +++++++++++++++-- 2 files changed, 302 insertions(+), 22 deletions(-) diff --git a/mpqp/execution/providers/ibm.py b/mpqp/execution/providers/ibm.py index e355c875..f1239a8a 100644 --- a/mpqp/execution/providers/ibm.py +++ b/mpqp/execution/providers/ibm.py @@ -3,7 +3,7 @@ import math import warnings from copy import deepcopy -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, overload import numpy as np @@ -19,7 +19,7 @@ ) from mpqp.execution.devices import AZUREDevice, IBMDevice from mpqp.execution.job import Job, JobStatus, JobType -from mpqp.execution.result import Result, Sample, StateVector +from mpqp.execution.result import Result, Sample, StateVector, BatchResult from mpqp.noise import DimensionalNoiseModel from mpqp.tools.errors import ( DeviceJobIncompatibleError, @@ -31,7 +31,6 @@ from qiskit import QuantumCircuit from qiskit.primitives import ( EstimatorResult, - PrimitiveResult, PubResult, SamplerPubResult, ) @@ -43,8 +42,13 @@ from mpqp.execution.simulated_devices import StaticIBMSimulatedDevice +@overload +def run_ibm(jobs: Job) -> Result: ... -def run_ibm(job: Job) -> Result: +@overload +def run_ibm(jobs: list[Job]) -> BatchResult: ... + +def run_ibm(jobs: Job | list[Job]) -> Result | BatchResult: """Executes the job on the right IBM Q device precised in the job in parameter. @@ -58,7 +62,21 @@ def run_ibm(job: Job) -> Result: This function is not meant to be used directly, please use :func:`~mpqp.execution.runner.run` instead. """ - return run_aer(job) if not job.device.is_remote() else run_remote_ibm(job) + if isinstance(jobs, list): + obs_jobs = [] + results: list[Result] = [] + for job in jobs: + if job.job_type == JobType.OBSERVABLE: + obs_jobs.append(job) + else: + results.append(run_aer(job) if not job.device.is_remote() else run_remote_ibm(job)) + if len(obs_jobs) != 0: + results.extend(run_aer_multiple_obs(obs_jobs)) + + return BatchResult(results) + + else: + return run_aer(jobs) if not jobs.device.is_remote() else run_remote_ibm(jobs) def compute_expectation_value( @@ -144,7 +162,7 @@ def compute_expectation_value( if TYPE_CHECKING: assert isinstance(job.device, (IBMDevice, StaticIBMSimulatedDevice)) - return extract_result(estimator_result, job, job.device) + return extract_result(estimator_result[0], job, job.device) def check_job_compatibility(job: Job): @@ -486,6 +504,105 @@ def run_aer(job: Job): job.status = JobStatus.DONE return result +def run_aer_multiple_obs(jobs: list[Job]): + from qiskit.primitives.containers import EstimatorPubLike + from qiskit.quantum_info import SparsePauliOp + from mpqp.execution.simulated_devices import StaticIBMSimulatedDevice + from qiskit_aer import AerSimulator + + pubs: list[EstimatorPubLike] = [] + job = jobs[0] # TODO: work only if same job + if isinstance(job.device, StaticIBMSimulatedDevice): + if len(job.circuit.noises) != 0: + warnings.warn( + "NoiseModel are ignored when running the circuit on a " + "SimulatedDevice" + ) + # 3M-TODO: handle case when we put NoiseModel + IBMSimulatedDevice + # (grab qiskit NoiseModel from AerSimulator generated below, and add + # to it directly) + backend_sim = job.device.to_noisy_simulator() + elif len(job.circuit.noises) != 0: + raise NotImplemented # TODO + else: + backend_sim = AerSimulator(method=job.device.value) + + if not isinstance(job.measure, ExpectationMeasure): + raise ValueError( + "Cannot compute expectation value if measure used in job is not of " + "type ExpectationMeasure" + ) + + if isinstance(job.device, StaticIBMSimulatedDevice) or job.measure.shots != 0: + from qiskit_ibm_runtime import EstimatorV2 as Runtime_Estimator + + backend = ( + job.device.value() + if isinstance(job.device, StaticIBMSimulatedDevice) + else backend_sim + ) + + options = {"default_shots": job.measure.shots} + estimator = Runtime_Estimator(mode=backend, options=options) + else: + from qiskit_aer.primitives import EstimatorV2 as Estimator + + backend_sim.set_options(shots=job.measure.shots) + options = { + "backend_options": backend_sim.options, + } + estimator = Estimator(options=options) + + for job in jobs: + check_job_compatibility(job) + + from qiskit import QuantumCircuit + from qiskit_aer import AerSimulator + + + if TYPE_CHECKING: + assert isinstance(job.device, (IBMDevice, StaticIBMSimulatedDevice)) + + if job.circuit.transpiled_circuit is None: + qiskit_circuit = job.circuit.to_other_device( + job.device, backend_sim=backend_sim + ) + else: + qiskit_circuit = job.circuit.transpiled_circuit + if TYPE_CHECKING: + assert isinstance(qiskit_circuit, QuantumCircuit) + + if job.job_type == JobType.OBSERVABLE: + + + if not isinstance(job.measure, ExpectationMeasure): + raise ValueError( + "Cannot compute expectation value if measure used in job is not of " + "type ExpectationMeasure" + ) + + qiskit_observables: list[SparsePauliOp] = [] + for obs in job.measure.observables: + if obs.pre_transpiled is None: + translated = obs.to_other_language(Language.QISKIT) + else: + translated = obs.pre_transpiled + if TYPE_CHECKING: + assert isinstance(translated, SparsePauliOp) + qiskit_observables.append(translated) + + qiskit_observables = [ + obs.apply_layout(qiskit_circuit.layout) for obs in qiskit_observables + ] + + pubs.append((qiskit_circuit, qiskit_observables)) + else: + raise ValueError(f"Job type {job.job_type} not handled.") + + + job_expectation = estimator.run(pubs) + estimator_result = job_expectation.result() + return [extract_result(result, job, job.device) for job, result in zip(jobs, estimator_result._pub_results)] def submit_remote_ibm(job: Job) -> tuple[str, "RuntimeJobV2"]: """Submits the job on the remote IBM device (quantum computer or simulator). @@ -568,6 +685,77 @@ def submit_remote_ibm(job: Job) -> tuple[str, "RuntimeJobV2"]: return job.id, ibm_job +def submit_remote_ibm_pubs(jobs: list[Job]): + from qiskit import QuantumCircuit + from qiskit_ibm_runtime import EstimatorV2 as Runtime_Estimator + from qiskit_ibm_runtime import Session + from qiskit.primitives.containers import EstimatorPubLike + + pubs: list[EstimatorPubLike] = [] + for job in jobs: + meas = job.measure + + check_job_compatibility(job) + + if TYPE_CHECKING: + assert isinstance(job.device, IBMDevice) + + if job.circuit.transpiled_circuit is None: + qiskit_circ = job.circuit.to_other_device(job.device) + else: + qiskit_circ = job.circuit.transpiled_circuit + + if TYPE_CHECKING: + assert isinstance(qiskit_circ, QuantumCircuit) + + if job.job_type == JobType.OBSERVABLE: + if TYPE_CHECKING: + assert isinstance(meas, ExpectationMeasure) + + qiskit_observables = [ + ( + obs.to_other_language(Language.QISKIT) + if obs.pre_transpiled is None + else obs.pre_transpiled + ) + for obs in meas.observables + ] + if TYPE_CHECKING: + assert all(isinstance(obs, SparsePauliOp) for obs in qiskit_observables) + + qiskit_observables = [ + obs.apply_layout(qiskit_circ.layout) for obs in qiskit_observables + ] + + pubs.append((qiskit_circ, qiskit_observables)) + + else: + raise NotImplementedError( + f"{job.job_type} not handled by remote remote IBM devices." + ) + + backend = get_backend(job.device) + job.device = IBMDevice(backend.name) + + session = Session(backend=backend) + + estimator = Runtime_Estimator(mode=session) + + # We have to disable all the twirling options and set manually the number of circuits and shots per circuits + twirling = getattr(estimator.options, "twirling", None) + if twirling is not None: + twirling.enable_gates = False + twirling.enable_measure = False + twirling.num_randomizations = 1 + twirling.shots_per_randomization = meas.shots + + setattr(estimator.options, "default_shots", meas.shots) + + ibm_job = estimator.run(pubs) + + job.id = ibm_job.job_id() + + return job.id, ibm_job def run_remote_ibm(job: Job) -> Result: """Submits the job on the right IBM remote device, precised in the job in @@ -592,7 +780,7 @@ def run_remote_ibm(job: Job) -> Result: def extract_result( - result: "QiskitResult | EstimatorResult | PrimitiveResult[PubResult | SamplerPubResult]", + result: "QiskitResult | EstimatorResult | PubResult | SamplerPubResult", job: Optional[Job], device: "IBMDevice | StaticIBMSimulatedDevice | AZUREDevice", ) -> Result: @@ -609,14 +797,14 @@ def extract_result( Returns: The ``qiskit`` result converted to our format. """ - from qiskit.primitives import EstimatorResult, PrimitiveResult from qiskit.result import Result as QiskitResult + from qiskit.primitives import PubResult, SamplerPubResult, EstimatorResult # If this is a PubResult from primitives V2 - if isinstance(result, PrimitiveResult): + if isinstance(result, (PubResult | SamplerPubResult)): # res_data is a DataBin, which means all typechecking is out of the # windows for this specific object - res_data = result[0].data + res_data = result.data if hasattr(res_data, "evs"): if job is None: @@ -630,7 +818,7 @@ def extract_result( shots = ( job.measure.shots if job.device.is_simulator() and job.measure is not None - else result[0].metadata["shots"] + else result.metadata["shots"] ) # If only one result, we directly return the expectation value diff --git a/mpqp/execution/runner.py b/mpqp/execution/runner.py index abc2bb3a..062a03f3 100644 --- a/mpqp/execution/runner.py +++ b/mpqp/execution/runner.py @@ -178,6 +178,14 @@ def _run_diagonal_observables( adapted_circuit.add(BasisMeasure(exp_measure.targets, shots=exp_measure.shots)) result = _run_single(adapted_circuit, device, values, False) + return _compute_result_diagonal_observables(result, exp_measure, observable_job) + +def _compute_result_diagonal_observables( + result: Result, + exp_measure: ExpectationMeasure, + observable_job: Job, +) -> Result: + probas = result.probabilities error = 0 if exp_measure.shots == 0 else None @@ -204,7 +212,6 @@ def _run_diagonal_observables( exp_measure.shots, ) - def _run_single( circuit: QCircuit, device: AvailableDevice, @@ -284,6 +291,80 @@ def _run_single( else: raise NotImplementedError(f"Device {device} not handled") +def _run_multiple( + circuits: list[tuple[QCircuit, "Optional[dict[Expr | str, Complex]]"]], + device: AvailableDevice, + display_breakpoints: bool = True, +) -> BatchResult: + """ + """ + from mpqp.execution.simulated_devices import ( + SimulatedDevice, + StaticIBMSimulatedDevice, + ) + + if display_breakpoints: + for (circuit, _) in circuits: + for k in range(len(circuit.breakpoints)): + display_kth_breakpoint(circuit, k, device) + + jobs = [] + run_diagonal_observables: dict[int, tuple[ExpectationMeasure, Job]] = {} + for i, (circuit, values) in enumerate(circuits): + job = generate_job(circuit, device, values) + job.status = JobStatus.INIT + + + if len(circuit.noises) != 0: + if not device.is_noisy_simulator(): + raise DeviceJobIncompatibleError( + f"Device {device} cannot simulate circuits containing NoiseModels." + ) + elif not isinstance( + device, (ATOSDevice, AWSDevice, IBMDevice, GOOGLEDevice, SimulatedDevice) + ): + raise NotImplementedError(f"Noisy simulations not supported on {device}.") + + + if len(circuit.measurements) == 1: + measure = circuit.measurements[0] + if isinstance(measure, ExpectationMeasure): + if measure.optim_diagonal and measure.only_diagonal_observables(): + adapted_circuit = circuit.without_measurements(deep_copy=False) + adapted_circuit.add(BasisMeasure(measure.targets, shots=measure.shots)) + + job_obs = generate_job(adapted_circuit, device, values) + job_obs.status = JobStatus.INIT + jobs.append(job_obs) + run_diagonal_observables[i] = (measure, job) + continue + + jobs.append(job) + + + + if isinstance(device, (IBMDevice, StaticIBMSimulatedDevice)): + result = run_ibm(jobs) + elif isinstance(device, ATOSDevice): + raise NotImplementedError(f"Device {device} not handled") + result = run_atos(jobs) # TODO + elif isinstance(device, AWSDevice): + raise NotImplementedError(f"Device {device} not handled") + result = run_braket(jobs) # TODO + elif isinstance(device, GOOGLEDevice): + raise NotImplementedError(f"Device {device} not handled") + result = run_google(jobs) # TODO + elif isinstance(device, AZUREDevice): + raise NotImplementedError(f"Device {device} not handled") + result = run_azure(jobs) # TODO + else: + raise NotImplementedError(f"Device {device} not handled") + + for i, (exp_measure, job) in run_diagonal_observables.items(): + result.results[i] = _compute_result_diagonal_observables(result[i], exp_measure, job) + + return result + @overload def run( @@ -313,7 +394,7 @@ def run( def run( - circuit: OneOrMany[QCircuit], + circuit: QCircuit | list[QCircuit | tuple[QCircuit, "Optional[dict[Expr | str, Complex]]"]], device: OneOrMany[AvailableDevice], values: "Optional[dict[Expr | str, Complex]]" = None, display_breakpoints: bool = True, @@ -394,18 +475,29 @@ def namer(circ: QCircuit, i: int): return circ if isinstance(circuit, Iterable) or isinstance(device, Iterable): - return BatchResult( - [ - _run_single( - namer(circ, i + 1), + results: list[Result] = [] + for dev in flatten(device): + + if isinstance(circuit, QCircuit): + results.append(_run_single( + namer(circuit, 1), dev, values, display_breakpoints, - ) - for i, circ in enumerate(flatten(circuit)) - for dev in flatten(device) - ] - ) + )) + else: + circ_list = [] + for i, circ in enumerate(circuit): + if isinstance(circ, QCircuit): + print(circ, i) + circ_list.append((namer(circ, i + 1), values)) + else: + (circ, values) = circ + print(circ, i) + circ_list.append((namer(circ, i + 1), values)) + results.extend(_run_multiple(circ_list, dev, display_breakpoints).results) + + return BatchResult(results) else: return _run_single(circuit, device, values, display_breakpoints) From eabc2214bd1eba65bfbe15c1b72e3700ae427828 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 6 May 2026 15:16:09 +0000 Subject: [PATCH 2/4] chore: Files formated --- mpqp/execution/providers/ibm.py | 45 +++++++++++++--------- mpqp/execution/runner.py | 66 +++++++++++++++++++-------------- 2 files changed, 66 insertions(+), 45 deletions(-) diff --git a/mpqp/execution/providers/ibm.py b/mpqp/execution/providers/ibm.py index f1239a8a..ed5637e9 100644 --- a/mpqp/execution/providers/ibm.py +++ b/mpqp/execution/providers/ibm.py @@ -42,12 +42,15 @@ from mpqp.execution.simulated_devices import StaticIBMSimulatedDevice + @overload def run_ibm(jobs: Job) -> Result: ... + @overload def run_ibm(jobs: list[Job]) -> BatchResult: ... + def run_ibm(jobs: Job | list[Job]) -> Result | BatchResult: """Executes the job on the right IBM Q device precised in the job in parameter. @@ -69,13 +72,15 @@ def run_ibm(jobs: Job | list[Job]) -> Result | BatchResult: if job.job_type == JobType.OBSERVABLE: obs_jobs.append(job) else: - results.append(run_aer(job) if not job.device.is_remote() else run_remote_ibm(job)) + results.append( + run_aer(job) if not job.device.is_remote() else run_remote_ibm(job) + ) if len(obs_jobs) != 0: results.extend(run_aer_multiple_obs(obs_jobs)) return BatchResult(results) - else: + else: return run_aer(jobs) if not jobs.device.is_remote() else run_remote_ibm(jobs) @@ -504,6 +509,7 @@ def run_aer(job: Job): job.status = JobStatus.DONE return result + def run_aer_multiple_obs(jobs: list[Job]): from qiskit.primitives.containers import EstimatorPubLike from qiskit.quantum_info import SparsePauliOp @@ -511,7 +517,7 @@ def run_aer_multiple_obs(jobs: list[Job]): from qiskit_aer import AerSimulator pubs: list[EstimatorPubLike] = [] - job = jobs[0] # TODO: work only if same job + job = jobs[0] # TODO: work only if same job if isinstance(job.device, StaticIBMSimulatedDevice): if len(job.circuit.noises) != 0: warnings.warn( @@ -523,7 +529,7 @@ def run_aer_multiple_obs(jobs: list[Job]): # to it directly) backend_sim = job.device.to_noisy_simulator() elif len(job.circuit.noises) != 0: - raise NotImplemented # TODO + raise NotImplemented # TODO else: backend_sim = AerSimulator(method=job.device.value) @@ -539,9 +545,9 @@ def run_aer_multiple_obs(jobs: list[Job]): backend = ( job.device.value() if isinstance(job.device, StaticIBMSimulatedDevice) - else backend_sim + else backend_sim ) - + options = {"default_shots": job.measure.shots} estimator = Runtime_Estimator(mode=backend, options=options) else: @@ -559,7 +565,6 @@ def run_aer_multiple_obs(jobs: list[Job]): from qiskit import QuantumCircuit from qiskit_aer import AerSimulator - if TYPE_CHECKING: assert isinstance(job.device, (IBMDevice, StaticIBMSimulatedDevice)) @@ -573,7 +578,6 @@ def run_aer_multiple_obs(jobs: list[Job]): assert isinstance(qiskit_circuit, QuantumCircuit) if job.job_type == JobType.OBSERVABLE: - if not isinstance(job.measure, ExpectationMeasure): raise ValueError( @@ -590,19 +594,22 @@ def run_aer_multiple_obs(jobs: list[Job]): if TYPE_CHECKING: assert isinstance(translated, SparsePauliOp) qiskit_observables.append(translated) - + qiskit_observables = [ obs.apply_layout(qiskit_circuit.layout) for obs in qiskit_observables ] - + pubs.append((qiskit_circuit, qiskit_observables)) else: raise ValueError(f"Job type {job.job_type} not handled.") - - + job_expectation = estimator.run(pubs) estimator_result = job_expectation.result() - return [extract_result(result, job, job.device) for job, result in zip(jobs, estimator_result._pub_results)] + return [ + extract_result(result, job, job.device) + for job, result in zip(jobs, estimator_result._pub_results) + ] + def submit_remote_ibm(job: Job) -> tuple[str, "RuntimeJobV2"]: """Submits the job on the remote IBM device (quantum computer or simulator). @@ -685,6 +692,7 @@ def submit_remote_ibm(job: Job) -> tuple[str, "RuntimeJobV2"]: return job.id, ibm_job + def submit_remote_ibm_pubs(jobs: list[Job]): from qiskit import QuantumCircuit from qiskit_ibm_runtime import EstimatorV2 as Runtime_Estimator @@ -699,7 +707,7 @@ def submit_remote_ibm_pubs(jobs: list[Job]): if TYPE_CHECKING: assert isinstance(job.device, IBMDevice) - + if job.circuit.transpiled_circuit is None: qiskit_circ = job.circuit.to_other_device(job.device) else: @@ -711,7 +719,7 @@ def submit_remote_ibm_pubs(jobs: list[Job]): if job.job_type == JobType.OBSERVABLE: if TYPE_CHECKING: assert isinstance(meas, ExpectationMeasure) - + qiskit_observables = [ ( obs.to_other_language(Language.QISKIT) @@ -733,10 +741,10 @@ def submit_remote_ibm_pubs(jobs: list[Job]): raise NotImplementedError( f"{job.job_type} not handled by remote remote IBM devices." ) - + backend = get_backend(job.device) job.device = IBMDevice(backend.name) - + session = Session(backend=backend) estimator = Runtime_Estimator(mode=session) @@ -750,13 +758,14 @@ def submit_remote_ibm_pubs(jobs: list[Job]): twirling.shots_per_randomization = meas.shots setattr(estimator.options, "default_shots", meas.shots) - + ibm_job = estimator.run(pubs) job.id = ibm_job.job_id() return job.id, ibm_job + def run_remote_ibm(job: Job) -> Result: """Submits the job on the right IBM remote device, precised in the job in parameter, and waits until the job is completed. diff --git a/mpqp/execution/runner.py b/mpqp/execution/runner.py index 062a03f3..bbc2f204 100644 --- a/mpqp/execution/runner.py +++ b/mpqp/execution/runner.py @@ -180,6 +180,7 @@ def _run_diagonal_observables( result = _run_single(adapted_circuit, device, values, False) return _compute_result_diagonal_observables(result, exp_measure, observable_job) + def _compute_result_diagonal_observables( result: Result, exp_measure: ExpectationMeasure, @@ -212,6 +213,7 @@ def _compute_result_diagonal_observables( exp_measure.shots, ) + def _run_single( circuit: QCircuit, device: AvailableDevice, @@ -291,20 +293,20 @@ def _run_single( else: raise NotImplementedError(f"Device {device} not handled") + def _run_multiple( circuits: list[tuple[QCircuit, "Optional[dict[Expr | str, Complex]]"]], device: AvailableDevice, display_breakpoints: bool = True, ) -> BatchResult: - """ - """ + """ """ from mpqp.execution.simulated_devices import ( SimulatedDevice, StaticIBMSimulatedDevice, ) if display_breakpoints: - for (circuit, _) in circuits: + for circuit, _ in circuits: for k in range(len(circuit.breakpoints)): display_kth_breakpoint(circuit, k, device) @@ -313,7 +315,6 @@ def _run_multiple( for i, (circuit, values) in enumerate(circuits): job = generate_job(circuit, device, values) job.status = JobStatus.INIT - if len(circuit.noises) != 0: if not device.is_noisy_simulator(): @@ -321,48 +322,52 @@ def _run_multiple( f"Device {device} cannot simulate circuits containing NoiseModels." ) elif not isinstance( - device, (ATOSDevice, AWSDevice, IBMDevice, GOOGLEDevice, SimulatedDevice) + device, + (ATOSDevice, AWSDevice, IBMDevice, GOOGLEDevice, SimulatedDevice), ): - raise NotImplementedError(f"Noisy simulations not supported on {device}.") - + raise NotImplementedError( + f"Noisy simulations not supported on {device}." + ) if len(circuit.measurements) == 1: measure = circuit.measurements[0] if isinstance(measure, ExpectationMeasure): if measure.optim_diagonal and measure.only_diagonal_observables(): adapted_circuit = circuit.without_measurements(deep_copy=False) - adapted_circuit.add(BasisMeasure(measure.targets, shots=measure.shots)) + adapted_circuit.add( + BasisMeasure(measure.targets, shots=measure.shots) + ) job_obs = generate_job(adapted_circuit, device, values) job_obs.status = JobStatus.INIT jobs.append(job_obs) run_diagonal_observables[i] = (measure, job) continue - - jobs.append(job) - + jobs.append(job) if isinstance(device, (IBMDevice, StaticIBMSimulatedDevice)): result = run_ibm(jobs) elif isinstance(device, ATOSDevice): raise NotImplementedError(f"Device {device} not handled") - result = run_atos(jobs) # TODO + result = run_atos(jobs) # TODO elif isinstance(device, AWSDevice): raise NotImplementedError(f"Device {device} not handled") - result = run_braket(jobs) # TODO + result = run_braket(jobs) # TODO elif isinstance(device, GOOGLEDevice): raise NotImplementedError(f"Device {device} not handled") - result = run_google(jobs) # TODO + result = run_google(jobs) # TODO elif isinstance(device, AZUREDevice): raise NotImplementedError(f"Device {device} not handled") - result = run_azure(jobs) # TODO + result = run_azure(jobs) # TODO else: raise NotImplementedError(f"Device {device} not handled") - + for i, (exp_measure, job) in run_diagonal_observables.items(): - result.results[i] = _compute_result_diagonal_observables(result[i], exp_measure, job) - + result.results[i] = _compute_result_diagonal_observables( + result[i], exp_measure, job + ) + return result @@ -394,7 +399,10 @@ def run( def run( - circuit: QCircuit | list[QCircuit | tuple[QCircuit, "Optional[dict[Expr | str, Complex]]"]], + circuit: ( + QCircuit + | list[QCircuit | tuple[QCircuit, "Optional[dict[Expr | str, Complex]]"]] + ), device: OneOrMany[AvailableDevice], values: "Optional[dict[Expr | str, Complex]]" = None, display_breakpoints: bool = True, @@ -479,12 +487,14 @@ def namer(circ: QCircuit, i: int): for dev in flatten(device): if isinstance(circuit, QCircuit): - results.append(_run_single( - namer(circuit, 1), - dev, - values, - display_breakpoints, - )) + results.append( + _run_single( + namer(circuit, 1), + dev, + values, + display_breakpoints, + ) + ) else: circ_list = [] for i, circ in enumerate(circuit): @@ -492,10 +502,12 @@ def namer(circ: QCircuit, i: int): print(circ, i) circ_list.append((namer(circ, i + 1), values)) else: - (circ, values) = circ + circ, values = circ print(circ, i) circ_list.append((namer(circ, i + 1), values)) - results.extend(_run_multiple(circ_list, dev, display_breakpoints).results) + results.extend( + _run_multiple(circ_list, dev, display_breakpoints).results + ) return BatchResult(results) else: From 84bac160cfe3763e06a5a50b94cc1ad8d983a48f Mon Sep 17 00:00:00 2001 From: JulienCalistoTD <164006610+JulienCalistoTD@users.noreply.github.com> Date: Wed, 3 Jun 2026 17:30:53 +0200 Subject: [PATCH 3/4] feat: introduce CircuitBinding class to enhance circuit management and execution Co-authored-by: Copilot --- mpqp/core/circuit.py | 29 +++++++++++++++++++++++++++++ mpqp/execution/runner.py | 38 +++++++++++++++----------------------- 2 files changed, 44 insertions(+), 23 deletions(-) diff --git a/mpqp/core/circuit.py b/mpqp/core/circuit.py index a681b988..c359bd7e 100644 --- a/mpqp/core/circuit.py +++ b/mpqp/core/circuit.py @@ -34,6 +34,7 @@ from __future__ import annotations from copy import deepcopy +from enum import Enum, auto from numbers import Complex from typing import TYPE_CHECKING, Literal, Optional, Sequence, Type, Union, overload from warnings import warn @@ -2187,3 +2188,31 @@ def variables(self) -> set[Basic]: if isinstance(param, Expr): params.update(param.free_symbols) return params + +class BindingMode(Enum): + PRODUCT = auto() + ZIP = auto() + +class CircuitBinding(): + def __init__(self, + circuit: OneOrMany[QCircuit | CircuitBinding], + value: Optional[OneOrMany[dict[Expr | str, Complex]]] = None, + expectation_measure: Optional[OneOrMany[ExpectationMeasure]] = None, + mode: BindingMode = BindingMode.PRODUCT) -> None: + self.circuit = circuit + if isinstance(circuit, QCircuit): + measures = circuit.measurements + if len(measures) != 0: + if expectation_measure is not None: + raise ValueError("your circuit already contains measurements, you cannot have multiple measurements") + elif isinstance(circuit, list): + for circ in circuit: + if isinstance(circ, QCircuit): + measures = circ.measurements + if len(measures) != 0: + if expectation_measure is not None: + raise ValueError("your circuit already contains measurements, you cannot have multiple measurements") + + self.value = value + self.expectation_measure = expectation_measure + self.mode = mode diff --git a/mpqp/execution/runner.py b/mpqp/execution/runner.py index bbc2f204..dfa66eff 100644 --- a/mpqp/execution/runner.py +++ b/mpqp/execution/runner.py @@ -24,7 +24,7 @@ import numpy as np -from mpqp.core.circuit import QCircuit +from mpqp.core.circuit import CircuitBinding, QCircuit from mpqp.core.instruction.breakpoint import Breakpoint from mpqp.core.instruction.measurement.basis_measure import BasisMeasure from mpqp.core.instruction.measurement.expectation_value import ( @@ -295,7 +295,7 @@ def _run_single( def _run_multiple( - circuits: list[tuple[QCircuit, "Optional[dict[Expr | str, Complex]]"]], + circuits: CircuitBinding, device: AvailableDevice, display_breakpoints: bool = True, ) -> BatchResult: @@ -373,7 +373,7 @@ def _run_multiple( @overload def run( - circuit: OneOrMany[QCircuit], + circuit: CircuitBinding | QCircuit, device: Sequence[AvailableDevice], values: "Optional[dict[Expr | str, Complex]]" = None, display_breakpoints: bool = True, @@ -382,7 +382,7 @@ def run( @overload def run( - circuit: Sequence[QCircuit], + circuit: CircuitBinding | QCircuit, device: OneOrMany[AvailableDevice], values: "Optional[dict[Expr | str, Complex]]" = None, display_breakpoints: bool = True, @@ -399,10 +399,7 @@ def run( def run( - circuit: ( - QCircuit - | list[QCircuit | tuple[QCircuit, "Optional[dict[Expr | str, Complex]]"]] - ), + circuit: QCircuit | CircuitBinding, device: OneOrMany[AvailableDevice], values: "Optional[dict[Expr | str, Complex]]" = None, display_breakpoints: bool = True, @@ -482,7 +479,7 @@ def namer(circ: QCircuit, i: int): circ.label = f"circuit {i}" if circ.label is None else circ.label return circ - if isinstance(circuit, Iterable) or isinstance(device, Iterable): + if isinstance(device, Iterable): results: list[Result] = [] for dev in flatten(device): @@ -496,22 +493,17 @@ def namer(circ: QCircuit, i: int): ) ) else: - circ_list = [] - for i, circ in enumerate(circuit): - if isinstance(circ, QCircuit): - print(circ, i) - circ_list.append((namer(circ, i + 1), values)) - else: - circ, values = circ - print(circ, i) - circ_list.append((namer(circ, i + 1), values)) - results.extend( - _run_multiple(circ_list, dev, display_breakpoints).results - ) - + if values is not None: + raise ValueError("values must be specified in CircuitBinding") + return _run_multiple(circuit, dev, display_breakpoints) return BatchResult(results) else: - return _run_single(circuit, device, values, display_breakpoints) + if isinstance(circuit, QCircuit): + return _run_single(circuit, device, values, display_breakpoints) + else: + if values is not None: + raise ValueError("values must be specified in CircuitBinding") + return _run_multiple(circuit, device, display_breakpoints) def submit( From de5e0fd020780c5379976689530c928d5af20041 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 3 Jun 2026 15:31:19 +0000 Subject: [PATCH 4/4] chore: Files formated --- mpqp/core/circuit.py | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/mpqp/core/circuit.py b/mpqp/core/circuit.py index c359bd7e..0b1a208f 100644 --- a/mpqp/core/circuit.py +++ b/mpqp/core/circuit.py @@ -2189,30 +2189,38 @@ def variables(self) -> set[Basic]: params.update(param.free_symbols) return params + class BindingMode(Enum): PRODUCT = auto() ZIP = auto() -class CircuitBinding(): - def __init__(self, - circuit: OneOrMany[QCircuit | CircuitBinding], - value: Optional[OneOrMany[dict[Expr | str, Complex]]] = None, - expectation_measure: Optional[OneOrMany[ExpectationMeasure]] = None, - mode: BindingMode = BindingMode.PRODUCT) -> None: + +class CircuitBinding: + def __init__( + self, + circuit: OneOrMany[QCircuit | CircuitBinding], + value: Optional[OneOrMany[dict[Expr | str, Complex]]] = None, + expectation_measure: Optional[OneOrMany[ExpectationMeasure]] = None, + mode: BindingMode = BindingMode.PRODUCT, + ) -> None: self.circuit = circuit if isinstance(circuit, QCircuit): measures = circuit.measurements if len(measures) != 0: if expectation_measure is not None: - raise ValueError("your circuit already contains measurements, you cannot have multiple measurements") + raise ValueError( + "your circuit already contains measurements, you cannot have multiple measurements" + ) elif isinstance(circuit, list): for circ in circuit: if isinstance(circ, QCircuit): measures = circ.measurements if len(measures) != 0: if expectation_measure is not None: - raise ValueError("your circuit already contains measurements, you cannot have multiple measurements") - + raise ValueError( + "your circuit already contains measurements, you cannot have multiple measurements" + ) + self.value = value self.expectation_measure = expectation_measure self.mode = mode