from collections.abc import Collection, Sequence
from itertools import chain
import stim
from ..circuit_blocks.decorators import (
LogicalOperation,
LogOpCallable,
)
from ..detectors import Detectors, get_new_stab_dict_from_layout
from ..layouts.layout import Layout
from ..models import Model
from .circuit_modifications import NOISE_CHANNELS
MEAS_INSTR = [
"M",
"MR",
"MRX",
"MRY",
"MRZ",
"MX",
"MY",
"MZ",
"MXX",
"MYY",
"MZZ",
"MPP",
]
QEC_OP_TYPES = ["qec_round", "to_mid_cycle_circuit", "to_end_cycle_circuit"]
QEC_DETS_OP_TYPES = ["qec_round", "to_end_cycle_circuit"]
QEC_RESET_OP_TYPES = ["qec_round", "to_mid_cycle_circuit"]
GATE_OP_TYPES = ["sq_unitary_gate", "tq_unitary_gate"]
MEAS_OP_TYPES = ["measurement"]
RESET_OP_TYPES = ["qubit_init", "qubit_encoding"]
FAKE_OP_TYPES = ["logical_noise", "pauli_observable"]
TRUE_OP_TYPES = QEC_OP_TYPES + GATE_OP_TYPES + MEAS_OP_TYPES + RESET_OP_TYPES
VALID_OP_TYPES = TRUE_OP_TYPES + FAKE_OP_TYPES
[docs]
def merge_circuits(*circuits: stim.Circuit) -> stim.Circuit:
"""
Returns a circuit in which the given circuits have been merged
following the TICK blocks.
The number of operations between TICKs must be the same for all qubits.
The circuit must not include any measurement because if they get moved,
then the ``rec[-i]`` indexes do not work.
Parameters
----------
*circuits
Circuits to merge.
Returns
-------
merged_circuit
Circuit from merging the given circuits.
"""
if any(not isinstance(c, stim.Circuit) for c in circuits):
raise TypeError("The given circuits are not stim.Circuits.")
if len(set(c.num_ticks for c in circuits)) != 1:
raise ValueError("All the circuits must have the same number of TICKs.")
# split circuits into TICK blocks
num_ticks = circuits[0].num_ticks
blocks = [[stim.Circuit() for _ in range(num_ticks + 1)] for _ in circuits]
for k, circuit in enumerate(circuits):
block_id = 0
for instr in circuit.flattened():
if instr.name in MEAS_INSTR:
raise ValueError("Circuits cannot contain measurements.")
if instr.name == "TICK":
block_id += 1
continue
blocks[k][block_id].append(instr)
# merge instructions in blocks and into a circuit.
tick = stim.Circuit("TICK")
merged_circuit = stim.Circuit()
for n in range(num_ticks + 1):
merged_blocks = merge_operation_layers(
*[blocks[k][n] for k, _ in enumerate(circuits)]
)
merged_circuit += merged_blocks
if n != num_ticks:
merged_circuit += tick
return merged_circuit
[docs]
def merge_operation_layers(*operation_layers: stim.Circuit) -> stim.Circuit:
"""
Merges operation layers acting on different qubits to simplify
the final circuit.
It tries to merge the different blocks if they have the same sequence
of operations and noise channels, if not, blocks are stacked together.
This ensures that the output circuit has the same effect as the stacking
of all blocks.
Parameters
----------
operation_layers
Each operation layer is a ``stim.Circuit`` acting on different qubits.
A valid operation layer is a ``stim.Circuit`` in which the
qubits perform exactly one operation (without including noise channels).
Returns
-------
merged_blocks
A ``stim.Circuit`` having the same effect as stacking all the
given operation layers.
Notes
-----
The instructions in ``merged_blocks`` have been (correctly) merged so that
the lenght of the output circuit is minimal. Correctly means that the
order of the instructions has not been changed in a way that changes
the output of the circuit.
"""
# check which blocks can be merged to reduce the output circuit length
ops_blocks = [tuple(instr.name for instr in block) for block in operation_layers]
# avoid changing the order of the measurements, which happens if
# each operation layer contains the two circuit instructions corresponding to measurements.
if any(sum(names.count(n) for n in MEAS_INSTR) > 1 for names in ops_blocks):
return sum(operation_layers, start=stim.Circuit())
mergeable_blocks: dict[tuple[str, ...], list[stim.Circuit]] = {}
for block, op_block in zip(operation_layers, ops_blocks):
if op_block not in mergeable_blocks:
mergeable_blocks[op_block] = [block]
else:
mergeable_blocks[op_block].append(block)
# usually, there is only one op_block.
if len(mergeable_blocks) == 1:
merged_circuit = stim.Circuit()
for instrs in zip(*operation_layers):
for instr in instrs:
merged_circuit.append(instr)
return merged_circuit
# another common case is to have two op_blocks which
# one is the noiseless version of the other one.
if len(mergeable_blocks) == 2:
b1, b2 = list(mergeable_blocks)
b1, b2 = (b2, b1) if len(b2) > len(b1) else (b1, b2)
unique_b1 = len(b1) == len(set(b1))
if tuple(n for n in b1 if n not in NOISE_CHANNELS) == b2 and unique_b1:
lookup = {n: [] for n in b1}
for circ in operation_layers:
for instr in circ:
lookup[instr.name].append(instr)
merged_circuit = stim.Circuit()
for n in b1:
for instr in lookup[n]:
merged_circuit.append(instr)
return merged_circuit
# if there is at most a single mergeable block with measurements, we can merge the
# operations layer by layer without having an issue with measruement ordering.
# In the presence of multiple measurements, this can alter the measurement order, e.g.
# block1 = ("M", "X") and block2 = ("H", "M")
if sum(len(set(MEAS_INSTR).intersection(b)) for b in mergeable_blocks) <= 1:
max_length = len(max(ops_blocks, key=lambda x: len(x)))
merged_circuit = stim.Circuit()
for t in range(max_length):
for mblocks in mergeable_blocks.values():
for block in mblocks:
if t > len(block):
continue
# the trick with the indices ensures that the returned object
# is a stim.Circuit instead of a stim.CircuitInstruction
merged_circuit += block[t : t + 1]
return merged_circuit
# avoid problems with measurements
return sum(operation_layers, start=stim.Circuit())
def merge_iterators(
iterators: Sequence[LogicalOperation],
model: Model,
) -> stim.Circuit:
"""Merges a list of iterators that yield operation layers when initialized
with inputs ``(model, *layouts)``.
Note that it only adds the stim circuits yield by the iterators.
It does not add the appropiate detectors, logical observables, activate
or deactivate the ancilla detectors...
Parameters
----------
iterators
List of iterators to merge and the layouts used to instanciate them.
Each of the elements should be ``(LogOpCallable, Layout)`` or
``(LogOpCallable, Layout, Layout)``.
If they have different lenght, then idling is added to the corresponding qubits.
model
Noise model to use when generating and merging the circuit.
Returns
-------
circuit
Circuit containing the merged operations.
Notes
-----
This function ensures that the iterators create correct generators in the
sense that each correct operation layer is separated by a TICK.
See ``merge_operation_layers`` for more information.
"""
if any(not isinstance(i[0], LogOpCallable) for i in iterators):
raise TypeError(
"The first element for each entry in 'op_iterators' must be LogOpCallable."
)
layouts = sum([list(i[1:]) for i in iterators], start=[])
if len(layouts) != len(set(layouts)):
raise ValueError("Layouts are participating in more than one operation.")
circuit = stim.Circuit()
generators = [i[0](model, *i[1:]) for i in iterators]
tick_instr = stim.CircuitInstruction("TICK", [], [])
curr_block = [next(g, None) for g in generators]
while not all(b is None for b in curr_block):
# merge all ticks into a single tick.
# [TICK, None, None] still needs to be a single TICK
# As it is a TICK, no idling needs to be added.
tick_presence = [tick_instr in c for c in curr_block if c is not None]
if any(tick_presence) and not all(tick_presence):
raise ValueError("TICKs must appear at the same time in all iterators.")
if all(tick_presence):
circuit += merge_ticks([c for c in curr_block if c is not None])
curr_block = [next(g, None) for g in generators]
continue
# change 'None' to idling
for k, _ in enumerate(curr_block):
if curr_block[k] is None:
qubits = list(chain(*[l.qubits for l in iterators[k][1:]]))
curr_block[k] = model.idle(qubits)
if iterators[k][0].noiseless:
curr_block[k] = curr_block[k].without_noise()
circuit += merge_operation_layers(*curr_block)
curr_block = [next(g, None) for g in generators]
return circuit
[docs]
def merge_logical_operations(
op_iterators: list[LogicalOperation],
model: Model,
detectors: Detectors,
num_prev_meas: int | None = None,
anc_reset: bool | None = None,
anc_detectors: Collection[str] | None = None,
meas_to_obs: dict[int, tuple[int, ...]] | None = None,
) -> stim.Circuit:
"""
Returns a circuit in which the given logical operation iterators have been
merged and idle noise have been added if the iterators have different lenght.
Parameters
----------
op_iterators
List of logical operations to merge represented as a tuple of the operation
function iterator and the layout(s) to be applied to.
The functions need to have ``(model, *layouts)`` as signature.
There must be an entry for each layout except if it is participating
in a two-qubit gate, then there must be one entry per pair.
Each layout can only appear once, i.e. it can only perform one operation.
The TICK instructions must appear at the same time in all iterators
when iterating through them.
model
Noise model for the gates.
detectors
Detector definitions to use.
num_prev_meas
Number of logical measurements already included in the circuit.
It is used in the definition of the observables.
If ``meas_to_obs`` is ``None``, an observable is defined for
each measurement with observable index starting from ``num_prev_meas``.
anc_reset
If ``True``, ancillas are reset at the beginning of the QEC round.
anc_detectors
List of ancilla qubits for which to define the detectors.
If ``None``, adds all detectors. By default ``None``.
meas_to_obs
Dictionary with keys corresponding to the logical measurement indices and values
corresponding to the observable indices that the measurement has support on.
Note that one only needs to specify the logical measurements involved in
the given layer of operations.
By default, it defines an observable for every logical measurement.
Returns
-------
circuit
Circuit from merging the given iterators.
The circuit includes an observable for each logical measurement operation.
"""
if len(op_iterators) == 0:
return stim.Circuit()
if any(set(op[0].log_op_type) > set(VALID_OP_TYPES) for op in op_iterators):
raise TypeError("'op_iterators' must be valid operation types.")
qec_dets_ops = [
set(QEC_DETS_OP_TYPES).intersection(op[0].log_op_type) for op in op_iterators
]
qec_reset_ops = [
set(QEC_RESET_OP_TYPES).intersection(op[0].log_op_type) for op in op_iterators
]
if any(qec_dets_ops) and (not all(qec_dets_ops)):
raise ValueError(
"All logical qubits must be performing QEC cycles at the same time."
)
if any(qec_reset_ops) and (not all(qec_reset_ops)):
raise ValueError(
"All logical qubits must be performing QEC cycles at the same time."
)
if any(qec_reset_ops + qec_dets_ops) and anc_reset is None:
raise ValueError("QEC round found but 'anc_reset' is not specified.")
# change QEC round iterators to include 'anc_reset' (if needed)
# because the 'merge_iterators' only inputs Model and Layout(s), not 'anc_reset'
if any(qec_reset_ops):
if any(len(op[1:]) > 1 for op in op_iterators):
raise ValueError(
"Incorrect schedule format when specifying the QEC round iterators."
)
for k, (reset, _) in enumerate(zip(qec_reset_ops, op_iterators)):
if not reset:
continue
func = op_iterators[k][0]
func.anc_reset = anc_reset
op_iterators[k] = (func, op_iterators[k][1])
circuit = merge_iterators(op_iterators, model)
# update the detectors due to unitary gates.
# the detectors need to be updated before building them because of
# the mid-cycle logical gates.
for op in op_iterators:
func, layouts = op[0], op[1:]
if set(GATE_OP_TYPES).intersection(func.log_op_type) == set():
# detectors do not need to be updated
continue
# the only supported inter-layout operations are
# for layouts/codes that only contain a single logical qubit.
if {len(l.logical_qubits) for l in layouts} == {1}:
gate_label = func.__name__.replace("_iterator", "_")
gate_label += "_".join([l.logical_qubits[0] for l in layouts])
else:
gate_label = func.__name__.replace("_iterator", "")
new_stabs, new_stabs_inv = get_new_stab_dict_from_layout(layouts[0], gate_label)
# only single-qubit and two-qubit logical gates are supported.
if len(layouts) == 2:
new_stabs_2, new_stabs_2_inv = get_new_stab_dict_from_layout(
layouts[1], gate_label
)
new_stabs.update(new_stabs_2)
new_stabs_inv.update(new_stabs_2_inv)
detectors.update(new_stabs, new_stabs_inv)
# check if detectors need to be build because of QEC rounds.
# the detectors from the QEC rounds need to be built the ones
# from the logical measurements becuase of the 'qec_round_with_meas'.
if all(qec_dets_ops):
circuit += detectors.build_from_anc(
model.meas_target, anc_reset, anc_qubits=anc_detectors
)
# check if detectors needs to be built because of logical measurements.
meas_ops = [
k
for k, op in enumerate(op_iterators)
if set(MEAS_OP_TYPES).intersection(op[0].log_op_type)
]
if meas_ops:
if anc_reset is None:
raise ValueError(
"Logical measurement found but 'anc_reset' is not specified."
)
if num_prev_meas is None:
raise ValueError(
"Logical measurement found but 'num_prev_meas' is not specified."
)
if not isinstance(num_prev_meas, int):
raise TypeError(
f"'num_prev_meas' must be an int, but {type(num_prev_meas)} was given."
)
layouts = [op_iterators[k][1] for k in meas_ops]
rot_bases = [op_iterators[k][0].rot_basis for k in meas_ops]
# build the detectors from the logical measurement
reconstructable_stabs: list[str] = []
anc_support: dict[str, Collection[str]] = {}
for layout, rot_basis in zip(layouts, rot_bases):
stab_type = "x_type" if rot_basis else "z_type"
stabs = layout.get_qubits(role="anc", stab_type=stab_type)
reconstructable_stabs += stabs
anc_support.update(layout.get_support(stabs))
circuit += detectors.build_from_data(
model.meas_target,
anc_support,
anc_reset=anc_reset,
reconstructable_stabs=reconstructable_stabs,
anc_qubits=anc_detectors,
)
# add logical observable definitions
for layout, rot_basis in zip(layouts, rot_bases):
for log_qubit_label in layout.logical_qubits:
log_op = "log_x" if rot_basis else "log_z"
log_data_qubits: Collection[str] = layout.logical_param(
log_op, log_qubit_label
)
targets = [model.meas_target(qubit, -1) for qubit in log_data_qubits]
obs_inds = [num_prev_meas]
if meas_to_obs is not None:
obs_inds = meas_to_obs[num_prev_meas]
for obs_ind in obs_inds:
instr = stim.CircuitInstruction(
name="OBSERVABLE_INCLUDE",
targets=targets,
gate_args=[obs_ind],
)
circuit.append(instr)
num_prev_meas += 1
# deactivate (ancilla-qubit) detectors from the logical measurement.
for k in meas_ops:
detectors.deactivate_detectors(op_iterators[k][1].anc_qubits)
# check if detectors need to be activated due to the logical resets.
reset_ops = [
k
for k, i in enumerate(op_iterators)
if set(RESET_OP_TYPES).intersection(i[0].log_op_type)
]
if reset_ops:
for k in reset_ops:
# give information about gauge detectors to Detectors so that Detectors.include_gauge_detectors
# is the one specifying if gauge detectors are included or not.
# e.g. if reset in X basis, the Z stabilizers are gauge detectors
# for the case of encoding, there are no gauge stabilizers
if op_iterators[k][0].rot_basis is not None:
stab_type = "z_type" if op_iterators[k][0].rot_basis else "x_type"
gauge_dets = op_iterators[k][1].get_qubits(
role="anc", stab_type=stab_type
)
else:
gauge_dets: tuple[str, ...] = tuple()
detectors.activate_detectors(
op_iterators[k][1].anc_qubits, gauge_dets=gauge_dets
)
return circuit
[docs]
def merge_ticks(blocks: Collection[stim.Circuit]) -> stim.Circuit:
"""
Merges stim circuit containing TICK instructions and noise channels
so that only one TICK instruction is present while keeping if the noise
channels happened before of after the TICK.
It assumes that a TICK instruction is present in each block.
"""
tick_instr = stim.Circuit("TICK")[0]
circuit = stim.Circuit()
after_tick = stim.Circuit()
for block in blocks:
tick_idx = [k for k, i in enumerate(block) if i == tick_instr]
if len(tick_idx) != 1:
raise ValueError("A block from cannot have more than one TICK.")
tick_idx = tick_idx[0]
circuit += block[:tick_idx]
after_tick += block[tick_idx + 1 :]
circuit += stim.Circuit("TICK") + after_tick
return circuit
def merge_fake_operations(
log_ops: Sequence[LogicalOperation], model: Model
) -> stim.Circuit:
"""
Returns the circuit corresponding to merging all the given
iterators that do not correspond to 'true' logical operations,
in the sense that they do not apply any physcal gates to the qubits
(they only include noise, observable definitions...).
Note that it adds only a single ``TICK`` at the end of the circuit.
Parameters
----------
log_ops
Iterators to merge.
model
Noise model to use.
Note that it is only used to get the indicies of the qubits.
Returns
-------
circuit
Stim circuit corresponding to merging all the ``log_ops``.
A single ``TICK`` is added at the end of the circuit.
"""
circuit = stim.Circuit()
for log_op in log_ops:
op, layouts = log_op[0], log_op[1:]
circuit += sum(op(model, *layouts), start=stim.Circuit())
circuit += stim.Circuit("TICK")
return circuit
def group_logical_operations(
log_ops: Sequence[LogicalOperation],
) -> tuple[list[LogicalOperation], list[LogicalOperation], list[LogicalOperation]]:
"""
Splits the given list of logical operations into (1) the iterators that are
applied before the actual logical operations, (2) the actual logical operations,
and (3) the iterators that are applied after the actual logical operations.
Parameters
----------
log_ops
List of logical operations.
Each layout can only participate in a single actual logical operation, but
can participate in multiple logical noise operations.
Returns
-------
pre_fake_operations
Iterators that appear before the actual logical operations.
true_log_ops
Actual logical operations.
post_fake_operations
Iterators that appear after the actual logical operations.
"""
if not isinstance(log_ops, Sequence):
raise TypeError(f"'log_ops' must be a Sequence, but {type(log_ops)} was given.")
if any(not isinstance(i[0], LogOpCallable) for i in log_ops):
raise TypeError(
"The first element for each entry in 'log_ops' must be LogOpCallable."
)
pre_fake_operations: list[LogicalOperation] = []
true_log_ops: list[LogicalOperation] = []
post_fake_operations: list[LogicalOperation] = []
log_op_layouts: set[Layout] = set()
for log_op in log_ops:
op_types = log_op[0].log_op_type
if set(FAKE_OP_TYPES).intersection(op_types) != set():
if len(op_types) != 1:
raise TypeError(
f"'Fake' operations can only be of a single type, but '{op_types}' were found."
)
if len(log_op[1:]) != 1:
raise ValueError(
f"Only single-logical-qubit 'fake' operations are supported: {log_op[1:]}."
)
layout = log_op[1]
if layout not in log_op_layouts:
pre_fake_operations.append(log_op)
else:
post_fake_operations.append(log_op)
else:
if log_op_layouts.intersection(log_op[1:]) != set():
raise ValueError(
"Layouts must only perform one 'true' logical operation."
)
true_log_ops.append(log_op)
for layout in log_op[1:]:
log_op_layouts.add(layout)
return pre_fake_operations, true_log_ops, post_fake_operations