Skip to content
2 changes: 0 additions & 2 deletions mpqp/core/circuit.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,6 @@ def _update_targets_components(self, component: Instruction | NoiseModel):
if isinstance(component, Barrier):
component.size = self.nb_qubits
component.targets = list(range(self.nb_qubits))
elif isinstance(component, ExpectationMeasure):
component._check_targets_order() # pyright: ignore[reportPrivateUsage]
elif isinstance(component, DimensionalNoiseModel):
component.check_dimension()
elif isinstance(component, BasisMeasure):
Expand Down
62 changes: 3 additions & 59 deletions mpqp/core/instruction/measurement/expectation_value.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@ class to define your observable, and a :class:`ExpectationMeasure` to perform
import copy
from numbers import Real
from typing import TYPE_CHECKING, Literal, Optional, Union, overload
from typing_extensions import Never
from warnings import warn

import numpy as np
import numpy.typing as npt
from typing_extensions import Never

from mpqp.core.instruction.gates.native_gates import SWAP
from mpqp.core.instruction.measurement.measure import Measure
from mpqp.core.instruction.measurement.pauli_string import (
CommutingTypes,
Expand All @@ -24,7 +22,6 @@ class to define your observable, and a :class:`ExpectationMeasure` to perform
)
from mpqp.core.languages import Language
from mpqp.tools.display import one_lined_repr
from mpqp.tools.errors import NumberQubitsError
from mpqp.tools.generics import Matrix
from mpqp.tools.maths import is_diagonal, is_hermitian, is_power_of_two

Expand All @@ -38,8 +35,6 @@ class to define your observable, and a :class:`ExpectationMeasure` to perform
from qiskit.quantum_info import SparsePauliOp
from sympy import Expr

from mpqp.core.instruction.gates.custom_controlled_gate import Gate


class Observable:
"""Class defining an observable, used for evaluating expectation values.
Expand Down Expand Up @@ -337,7 +332,7 @@ def to_other_language(
return QLMObservable(self.nb_qubits, matrix=self.matrix)
elif language == Language.BRAKET:
if self._pauli_string:
from braket.circuits.observables import TensorProduct, Sum
from braket.circuits.observables import Sum, TensorProduct

obs = self.pauli_string.to_other_language(Language.BRAKET)
if isinstance(obs, TensorProduct):
Expand Down Expand Up @@ -469,8 +464,7 @@ def __init__(
self.observables.append(new_obs)

if targets is None:
self.targets = list(range(observable[0].nb_qubits))
self._check_targets_order()
self.targets = list(range(self.observables[0].nb_qubits))

@property
def nb_observables(self) -> int:
Expand All @@ -480,56 +474,6 @@ def nb_observables(self) -> int:
def observables_labels(self) -> list[str]:
return [o.label for o in self.observables if o.label is not None]

def _check_targets_order(self):
"""Ensures target qubits are ordered and contiguous, rearranging them if
necessary (private)."""

if len(self.targets) == 0:
self._pre_measure: list[Gate] = []
return

if self.nb_qubits != self.observables[0].nb_qubits:
raise NumberQubitsError(
f"Target size {self.nb_qubits} doesn't match observable size "
f"{self.observables[0].nb_qubits}."
)

self._pre_measure: list[Gate] = []
"""List of Gates added before the expectation measurement to correctly swap
target qubits when their are not ordered or contiguous."""
targets_is_ordered = all(
[self.targets[i] > self.targets[i - 1] for i in range(1, len(self.targets))]
)
tweaked_tgt = copy.copy(self.targets)
if (
max(tweaked_tgt) - min(tweaked_tgt) + 1 != len(tweaked_tgt)
or not targets_is_ordered
):
warn(
"Non contiguous or non sorted observable target will introduce "
"additional CNOTs."
)

for t_index, target in enumerate(tweaked_tgt): # sort the targets
min_index = tweaked_tgt.index(min(tweaked_tgt[t_index:]))
if t_index != min_index:
self._pre_measure.append(SWAP(target, tweaked_tgt[min_index]))
tweaked_tgt[t_index] = tweaked_tgt[min_index]
tweaked_tgt[min_index] = target
for t_index, target in enumerate(tweaked_tgt): # compact the targets
if t_index == 0:
continue
if target != tweaked_tgt[t_index - 1] + 1:
self._pre_measure.append(SWAP(target, tweaked_tgt[t_index - 1] + 1))
tweaked_tgt[t_index] = tweaked_tgt[t_index - 1] + 1
self.rearranged_targets = tweaked_tgt
"""Adjusted list of target qubits when they are not initially sorted and
contiguous."""

@property
def pre_measure(self) -> list[Gate]:
return self._pre_measure

def get_pauli_grouping(self) -> list[list[PauliStringMonomial]]:
"""Return the grouped monomials of the Pauli string of the observable.
The grouping is done according to the grouping method of the expectation
Expand Down
50 changes: 50 additions & 0 deletions mpqp/core/instruction/measurement/pauli_string.py
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,56 @@ def is_diagonal(self) -> bool:

return all([all([a == pI or a == pZ for a in m.atoms]) for m in self.monomials])

def rearrange(self, targets: list[int], copy: bool = True) -> "PauliString":
"""
This function aims at reorderring a pauli string's monomials according to unordered targets.

Note: The targets must be contiguous, otherwise the algorithm won't work.
Args:
ps: The PauliString to reorder.
targets: The list of unordered targets.
copy: If set at True will deepcopy the initial string ps.

Examples:
>>> ps = pX @ pI
>>> print(ps.rearrange([1,0]))
pI@pX
>>> ps2 = pX @ pI + pI @ pX
>>> print(ps2.rearrange([1,0]))
pI@pX + pX@pI
"""
from copy import deepcopy

if copy:
pauli = deepcopy(self)
else:
pauli = self

l = len(targets)
targets = deepcopy(targets)
rearranged = sorted(targets)
for index in range(l):
if targets[index] == index:
continue
shuffled_index = rearranged.index(targets[index])
for monom in pauli.monomials:
atoms = monom.atoms
atoms[shuffled_index], atoms[rearranged[index]] = (
atoms[rearranged[index]],
atoms[shuffled_index],
)
rearranged[index], rearranged[targets[index]] = (
rearranged[targets[index]],
rearranged[index],
)

i = targets.index(index)
targets[i], targets[index] = (
targets[index],
targets[i],
)
return pauli


class PauliStringMonomial(PauliString):
"""Represents a monomial in a Pauli string, consisting of a coefficient and
Expand Down
13 changes: 8 additions & 5 deletions mpqp/execution/providers/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,14 @@ def run_braket_observable(job: Job):

if job.measure.pre_transpiled is None:
grouping = job.measure.get_pauli_grouping()
pre_measure = [
QCircuit(find_qubitwise_rotations(group)) for group in grouping
]
for circuit in pre_measure:
for instr in circuit.instructions:
instr.targets = [job.measure.targets[t] for t in instr.targets]
transpiled_pre_measures = [
QCircuit(find_qubitwise_rotations(group)).to_other_language(
Language.BRAKET
)
for group in grouping
pre_m.to_other_language(Language.BRAKET) for pre_m in pre_measure
]
eigenvalues = [
{monom.name: pauli_monomial_eigenvalues(monom) for monom in group}
Expand Down Expand Up @@ -226,7 +229,7 @@ def run_braket_observable(job: Job):
)
result = local_result.result()
assert isinstance(result, GateModelQuantumTaskResult)
length = 2**job.circuit.nb_qubits
length = 2**job.measure.nb_qubits
sorted_values: list[float] = []
for i in range(length):
binary_state = f"{bin(i)[2:].zfill(len(bin(length))- 3)}"
Expand Down
119 changes: 91 additions & 28 deletions mpqp/execution/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,56 +55,112 @@


def adjust_measure(measure: ExpectationMeasure, circuit: QCircuit):
"""We allow the measure to not span the entire circuit, but providers
"""A measure can be incomplete and not span the entire circuit, but providers
usually do not support this behavior. To make this work, we tweak the measure
this function to match the expected behavior.

In order to do this, we add identity measures on the qubits not targeted by
the measure. In addition to this, some swaps are automatically added so the
the qubits measured are ordered and contiguous (though this is done in
:func:`generate_job`)
In order to do this, we place identity operators on the qubits not targeted by
the measure. If the targets are not ordered, each observable is first reordered so that
its local qubit order matches the sorted traget order. Pauli observables are directly embedded
on their target qubits, while matrix observables are padded with identity matrices
when the targets are ordered and contiguous, and are otherwise embedded through their pauli decomposition.

Args:
measure: The expectation measure, potentially incomplete.
circuit: The circuit to which will be added the potential swaps allowing
the user to get the expectation value of the qubits in an arbitrary
order (this part is not handled by this function).
circuit: The circuit defining the full qubit register.

Returns:
The measure padded with identities before and after.
A measure targeting all circuit qubits, with observables embedded into the full register.
"""
# TODO: use this only for specific provider

if measure.targets == list(range(circuit.nb_qubits)):
return measure

tweaked_observables = []
n_before = measure.rearranged_targets[0]
n_after = circuit.nb_qubits - measure.rearranged_targets[-1] - 1
nb_qubits = circuit.nb_qubits
targets = measure.targets

targets_is_ordered = all(
[targets[i] > targets[i - 1] for i in range(1, len(targets))]
)
if not targets_is_ordered:
ordered_targets = sorted(targets)
contiguous_targets = [targets.index(t) for t in ordered_targets]
for obs in measure.observables:
if (
obs._matrix is None # pyright: ignore[reportPrivateUsage]
or measure.optimize_measurement
): # Order pauli string
from mpqp.tools.maths import rearrange_pauli_string

obs._pauli_string = ( # pyright: ignore[reportPrivateUsage]
rearrange_pauli_string(obs.pauli_string, contiguous_targets)
)
else: # Order the matrix
from mpqp.tools.maths import rearrange_matrix

obs.matrix = rearrange_matrix(obs.matrix, contiguous_targets)

targets_is_contiguous = len(targets) > 0 and (
targets[-1] - targets[0] + 1 == len(sorted(targets))
)

tweaked_observables: list[Observable] = []

for obs in measure.observables:
if obs._pauli_string is not None: # pyright: ignore[reportPrivateUsage]
from mpqp.measures import pI
from mpqp.core.instruction.measurement.pauli_string import (
PauliString,
PauliStringMonomial,
)
from mpqp.measures import pI

if (
obs._pauli_string is None # pyright: ignore[reportPrivateUsage]
and targets_is_contiguous
):
n_before = targets[0]
n_after = nb_qubits - targets[-1] - 1

full_matrix = obs.matrix

pauli = pI(n_before - 1) @ obs.pauli_string @ pI(n_after - 1)
tweaked_observables.append(Observable(pauli))
else:
Id_before = np.eye(2**n_before)
Id_after = np.eye(2**n_after)

if n_before > 0:
full_matrix = np.kron(Id_before, full_matrix)

if n_after > 0:
full_matrix = np.kron(full_matrix, Id_after)

tweaked_observables.append(
Observable(
np.kron(
np.kron(Id_before, obs.matrix), Id_after
) # pyright: ignore[reportArgumentType]
full_matrix, label=obs.label # pyright: ignore[reportArgumentType]
)
)
continue

pauli = obs.pauli_string
embedded = PauliString()

for mono in pauli.monomials:
full_register = [pI] * nb_qubits

for local_idx, target in enumerate(targets):
full_register[target] = mono.atoms[local_idx]

embedded += PauliStringMonomial(mono.coef, full_register)

tweaked_observables.append(Observable(embedded.simplify(), label=obs.label))

tweaked_measure = ExpectationMeasure(
tweaked_observables,
list(range(circuit.nb_qubits)),
measure.shots,
measure.commuting_type,
measure.grouping_method,
label=measure.label,
optimize_measurement=measure.optimize_measurement,
optim_diagonal=measure.optim_diagonal,
)
return tweaked_measure

Expand Down Expand Up @@ -145,14 +201,21 @@ def generate_job(
else:
job = Job(JobType.SAMPLE, circuit, device)
elif isinstance(measurement, ExpectationMeasure):
m = adjust_measure(measurement, circuit)
c = circuit.without_measurements(deep_copy=False)
c.add(m)
job = Job(
JobType.OBSERVABLE,
c,
device,
)
if measurement.optimize_measurement and isinstance(device, AWSDevice):
job = Job(
JobType.OBSERVABLE,
circuit,
device,
)
else:
m = adjust_measure(measurement, circuit)
c = circuit.without_measurements(deep_copy=False)
c.add(m)
job = Job(
JobType.OBSERVABLE,
c,
device,
)
else:
raise NotImplementedError(
f"Measurement type {type(measurement)} not handled"
Expand Down
Loading
Loading