Source code for surface_sim.util.circuit_operations

from collections.abc import Collection, Sequence
from itertools import chain

import stim

from ..circuit_blocks.decorators import (
    LogicalOperation,
    LogOpCallable,
)
from ..detectors import Detectors, get_new_stab_dict_from_layout
from ..layouts.layout import Layout
from ..models import Model
from .circuit_modifications import NOISE_CHANNELS

# Names of the stim instructions that this module treats as measurements.
# ``merge_circuits`` rejects circuits containing any of them and
# ``merge_operation_layers`` uses them to avoid reordering measurement records.
# NOTE(review): stim's ``MPAD`` also appends to the measurement record but is
# not listed here -- confirm whether that is intentional.
MEAS_INSTR = [
    "M",
    "MR",
    "MRX",
    "MRY",
    "MRZ",
    "MX",
    "MY",
    "MZ",
    "MXX",
    "MYY",
    "MZZ",
    "MPP",
]

# Labels that can appear in ``LogOpCallable.log_op_type``, grouped by how
# ``merge_logical_operations`` and ``group_logical_operations`` treat them.
# All labels related to QEC cycles.
QEC_OP_TYPES = ["qec_round", "to_mid_cycle_circuit", "to_end_cycle_circuit"]
# Operations after which detectors are built from the ancilla outcomes
# (``Detectors.build_from_anc``).
QEC_DETS_OP_TYPES = ["qec_round", "to_end_cycle_circuit"]
# Operations whose callable gets the ``anc_reset`` attribute set before iterating.
QEC_RESET_OP_TYPES = ["qec_round", "to_mid_cycle_circuit"]
# Operations that trigger an update of the detector definitions (``Detectors.update``).
GATE_OP_TYPES = ["sq_unitary_gate", "tq_unitary_gate"]
# Operations after which detectors are built from the data-qubit outcomes,
# logical observables are defined and the ancilla detectors are deactivated.
MEAS_OP_TYPES = ["measurement"]
# Operations after which the ancilla detectors of the layout are activated.
RESET_OP_TYPES = ["qubit_init", "qubit_encoding"]
# Operations that ``group_logical_operations`` separates from the 'true' ones;
# ``merge_fake_operations`` describes them as applying no physical gates.
FAKE_OP_TYPES = ["logical_noise", "pauli_observable"]
# Every operation type that is not 'fake'.
TRUE_OP_TYPES = QEC_OP_TYPES + GATE_OP_TYPES + MEAS_OP_TYPES + RESET_OP_TYPES
# Every operation type known to this module.
VALID_OP_TYPES = TRUE_OP_TYPES + FAKE_OP_TYPES


def merge_circuits(*circuits: stim.Circuit) -> stim.Circuit:
    """
    Returns a circuit in which the given circuits have been merged
    following the TICK blocks.

    The number of operations between TICKs must be the same for all qubits.
    The circuit must not include any measurement because if they get moved,
    then the ``rec[-i]`` indexes do not work.

    Parameters
    ----------
    *circuits
        Circuits to merge. If no circuit is given, an empty circuit is returned.

    Returns
    -------
    merged_circuit
        Circuit from merging the given circuits.

    Raises
    ------
    TypeError
        If any of the inputs is not a ``stim.Circuit``.
    ValueError
        If the circuits do not all have the same number of TICKs or
        if any of them contains an instruction listed in ``MEAS_INSTR``.
    """
    if any(not isinstance(c, stim.Circuit) for c in circuits):
        raise TypeError("The given circuits are not stim.Circuits.")
    # nothing to merge: return an empty circuit instead of failing the
    # "same number of TICKs" check below with a misleading message.
    if not circuits:
        return stim.Circuit()
    if len({c.num_ticks for c in circuits}) != 1:
        raise ValueError("All the circuits must have the same number of TICKs.")

    # split circuits into TICK blocks:
    # blocks[k][n] holds the instructions of circuit k found after n TICKs.
    num_ticks = circuits[0].num_ticks
    blocks = [[stim.Circuit() for _ in range(num_ticks + 1)] for _ in circuits]
    for circuit_blocks, circuit in zip(blocks, circuits):
        block_id = 0
        # flatten so that instructions inside REPEAT blocks are also inspected
        for instr in circuit.flattened():
            if instr.name in MEAS_INSTR:
                raise ValueError("Circuits cannot contain measurements.")
            if instr.name == "TICK":
                block_id += 1
                continue
            circuit_blocks[block_id].append(instr)

    # merge instructions in blocks and into a circuit.
    tick = stim.Circuit("TICK")
    merged_circuit = stim.Circuit()
    for n in range(num_ticks + 1):
        merged_circuit += merge_operation_layers(
            *[circuit_blocks[n] for circuit_blocks in blocks]
        )
        # TICKs separate the blocks, so none is added after the last one
        if n != num_ticks:
            merged_circuit += tick

    return merged_circuit
def merge_operation_layers(*operation_layers: stim.Circuit) -> stim.Circuit:
    """
    Merges operation layers acting on different qubits to simplify the final circuit.

    It tries to merge the different blocks if they have the same sequence of
    operations and noise channels, if not, blocks are stacked together.
    This ensures that the output circuit has the same effect as the stacking
    of all blocks.

    Parameters
    ----------
    operation_layers
        Each operation layer is a ``stim.Circuit`` acting on different qubits.
        A valid operation layer is a ``stim.Circuit`` in which the qubits
        perform exactly one operation (without including noise channels).
        If no layer is given, an empty circuit is returned.

    Returns
    -------
    merged_blocks
        A ``stim.Circuit`` having the same effect as stacking all the given
        operation layers.

    Notes
    -----
    The instructions in ``merged_blocks`` have been (correctly) merged so that
    the length of the output circuit is minimal. Correctly means that the order
    of the instructions has not been changed in a way that changes the output
    of the circuit.
    """
    # nothing to merge; the generic branch below would otherwise call
    # ``max`` on an empty sequence.
    if not operation_layers:
        return stim.Circuit()

    # check which blocks can be merged to reduce the output circuit length
    ops_blocks = [tuple(instr.name for instr in block) for block in operation_layers]

    # avoid changing the order of the measurements, which happens if
    # a single operation layer contains more than one measurement instruction.
    if any(sum(names.count(n) for n in MEAS_INSTR) > 1 for names in ops_blocks):
        return sum(operation_layers, start=stim.Circuit())

    # group the layers by their sequence of instruction names
    mergeable_blocks: dict[tuple[str, ...], list[stim.Circuit]] = {}
    for block, op_block in zip(operation_layers, ops_blocks):
        mergeable_blocks.setdefault(op_block, []).append(block)

    # usually, there is only one op_block: interleave the layers
    # instruction by instruction.
    if len(mergeable_blocks) == 1:
        merged_circuit = stim.Circuit()
        for instrs in zip(*operation_layers):
            for instr in instrs:
                merged_circuit.append(instr)
        return merged_circuit

    # another common case is to have two op_blocks in which
    # one is the noiseless version of the other one.
    if len(mergeable_blocks) == 2:
        b1, b2 = list(mergeable_blocks)
        # make b1 the longest sequence, i.e. the candidate noisy version
        b1, b2 = (b2, b1) if len(b2) > len(b1) else (b1, b2)
        # the lookup below is keyed by name, so names in b1 cannot repeat
        unique_b1 = len(b1) == len(set(b1))
        if tuple(n for n in b1 if n not in NOISE_CHANNELS) == b2 and unique_b1:
            lookup = {n: [] for n in b1}
            for circ in operation_layers:
                for instr in circ:
                    lookup[instr.name].append(instr)

            merged_circuit = stim.Circuit()
            for n in b1:
                for instr in lookup[n]:
                    merged_circuit.append(instr)
            return merged_circuit

    # if there is at most a single mergeable block with measurements, we can merge the
    # operations layer by layer without having an issue with measurement ordering.
    # In the presence of multiple measurements, this can alter the measurement order, e.g.
    # block1 = ("M", "X") and block2 = ("H", "M")
    if sum(len(set(MEAS_INSTR).intersection(b)) for b in mergeable_blocks) <= 1:
        max_length = max(len(names) for names in ops_blocks)
        merged_circuit = stim.Circuit()
        for t in range(max_length):
            for mblocks in mergeable_blocks.values():
                for block in mblocks:
                    # this block has no instruction at position t
                    if t >= len(block):
                        continue
                    # the trick with the indices ensures that the returned object
                    # is a stim.Circuit instead of a stim.CircuitInstruction
                    merged_circuit += block[t : t + 1]
        return merged_circuit

    # avoid problems with measurements
    return sum(operation_layers, start=stim.Circuit())
def merge_iterators(
    iterators: Sequence[LogicalOperation],
    model: Model,
) -> stim.Circuit:
    """Merges a list of iterators that yield operation layers
    when initialized with inputs ``(model, *layouts)``.

    Only the stim circuits yielded by the iterators are added. Detectors,
    logical observables and the activation or deactivation of the ancilla
    detectors are not handled here.

    Parameters
    ----------
    iterators
        List of iterators to merge and the layouts used to instantiate them.
        Each of the elements should be ``(LogOpCallable, Layout)`` or
        ``(LogOpCallable, Layout, Layout)``.
        If they have different length, then idling is added to the
        corresponding qubits.
    model
        Noise model to use when generating and merging the circuit.

    Returns
    -------
    circuit
        Circuit containing the merged operations.

    Raises
    ------
    TypeError
        If the first element of an entry is not a ``LogOpCallable``.
    ValueError
        If a layout appears in more than one entry, or if only some of the
        iterators yield a TICK at a given step.

    Notes
    -----
    This function ensures that the iterators create correct generators in the
    sense that each correct operation layer is separated by a TICK.
    See ``merge_operation_layers`` for more information.
    """
    for entry in iterators:
        if not isinstance(entry[0], LogOpCallable):
            raise TypeError(
                "The first element for each entry in 'op_iterators' must be LogOpCallable."
            )

    all_layouts = [layout for entry in iterators for layout in entry[1:]]
    if len(set(all_layouts)) != len(all_layouts):
        raise ValueError("Layouts are participating in more than one operation.")

    merged = stim.Circuit()
    generators = [entry[0](model, *entry[1:]) for entry in iterators]
    tick_instr = stim.CircuitInstruction("TICK", [], [])

    def _next_layers() -> list:
        # exhausted generators are represented by ``None``
        return [next(gen, None) for gen in generators]

    curr_block = _next_layers()
    while any(layer is not None for layer in curr_block):
        # merge all ticks into a single tick.
        # [TICK, None, None] still needs to be a single TICK
        # As it is a TICK, no idling needs to be added.
        active_layers = [layer for layer in curr_block if layer is not None]
        tick_presence = [tick_instr in layer for layer in active_layers]
        if any(tick_presence) and not all(tick_presence):
            raise ValueError("TICKs must appear at the same time in all iterators.")
        if all(tick_presence):
            merged += merge_ticks(active_layers)
            curr_block = _next_layers()
            continue

        # change 'None' to idling
        for idx, layer in enumerate(curr_block):
            if layer is not None:
                continue
            func, op_layouts = iterators[idx][0], iterators[idx][1:]
            idle_qubits = list(chain.from_iterable(l.qubits for l in op_layouts))
            idling = model.idle(idle_qubits)
            if func.noiseless:
                idling = idling.without_noise()
            curr_block[idx] = idling

        merged += merge_operation_layers(*curr_block)
        curr_block = _next_layers()

    return merged
def merge_logical_operations(
    op_iterators: list[LogicalOperation],
    model: Model,
    detectors: Detectors,
    num_prev_meas: int | None = None,
    anc_reset: bool | None = None,
    anc_detectors: Collection[str] | None = None,
    meas_to_obs: dict[int, tuple[int, ...]] | None = None,
) -> stim.Circuit:
    """
    Returns a circuit in which the given logical operation iterators have been
    merged and idle noise has been added if the iterators have different length.

    Parameters
    ----------
    op_iterators
        List of logical operations to merge represented as a tuple of the operation
        function iterator and the layout(s) to be applied to.
        The functions need to have ``(model, *layouts)`` as signature.
        There must be an entry for each layout except if it is participating
        in a two-qubit gate, then there must be one entry per pair.
        Each layout can only appear once, i.e. it can only perform one
        operation. The TICK instructions must appear at the same time
        in all iterators when iterating through them.
        For QEC-round operations, the entries of this list are rewritten in
        place and the ``anc_reset`` attribute of their callables is set.
    model
        Noise model for the gates.
    detectors
        Detector definitions to use. It is updated in place.
    num_prev_meas
        Number of logical measurements already included in the circuit.
        It is used in the definition of the observables.
        If ``meas_to_obs`` is ``None``, an observable is defined for each
        measurement with observable index starting from ``num_prev_meas``.
    anc_reset
        If ``True``, ancillas are reset at the beginning of the QEC round.
    anc_detectors
        List of ancilla qubits for which to define the detectors.
        If ``None``, adds all detectors.
        By default ``None``.
    meas_to_obs
        Dictionary with keys corresponding to the logical measurement indices
        and values corresponding to the observable indices that the measurement
        has support on. Note that one only needs to specify the logical
        measurements involved in the given layer of operations.
        By default, it defines an observable for every logical measurement.

    Returns
    -------
    circuit
        Circuit from merging the given iterators.
        The circuit includes an observable for each logical measurement operation.

    Raises
    ------
    TypeError
        If an operation has a type that is not in ``VALID_OP_TYPES`` or if
        ``num_prev_meas`` is needed but is not an ``int``.
    ValueError
        If only some operations are QEC cycles, if the QEC schedule has an
        incorrect format, or if ``anc_reset`` / ``num_prev_meas`` are needed
        but not specified.
    """
    if len(op_iterators) == 0:
        return stim.Circuit()

    # every operation type must be a known one. Note that ``set_a > set_b`` is
    # a proper-superset test, which would (almost) never flag an unknown type,
    # thus a subset test is used.
    valid_op_types = set(VALID_OP_TYPES)
    if any(not set(op[0].log_op_type) <= valid_op_types for op in op_iterators):
        raise TypeError("'op_iterators' must be valid operation types.")

    qec_dets_ops = [
        set(QEC_DETS_OP_TYPES).intersection(op[0].log_op_type) for op in op_iterators
    ]
    qec_reset_ops = [
        set(QEC_RESET_OP_TYPES).intersection(op[0].log_op_type) for op in op_iterators
    ]
    if any(qec_dets_ops) and (not all(qec_dets_ops)):
        raise ValueError(
            "All logical qubits must be performing QEC cycles at the same time."
        )
    if any(qec_reset_ops) and (not all(qec_reset_ops)):
        raise ValueError(
            "All logical qubits must be performing QEC cycles at the same time."
        )
    if any(qec_reset_ops + qec_dets_ops) and anc_reset is None:
        raise ValueError("QEC round found but 'anc_reset' is not specified.")

    # change QEC round iterators to include 'anc_reset' (if needed)
    # because the 'merge_iterators' only inputs Model and Layout(s), not 'anc_reset'
    if any(qec_reset_ops):
        if any(len(op[1:]) > 1 for op in op_iterators):
            raise ValueError(
                "Incorrect schedule format when specifying the QEC round iterators."
            )
        for k, (reset, _) in enumerate(zip(qec_reset_ops, op_iterators)):
            if not reset:
                continue
            func = op_iterators[k][0]
            func.anc_reset = anc_reset
            op_iterators[k] = (func, op_iterators[k][1])

    circuit = merge_iterators(op_iterators, model)

    # update the detectors due to unitary gates.
    # the detectors need to be updated before building them because of
    # the mid-cycle logical gates.
    for op in op_iterators:
        func, layouts = op[0], op[1:]
        if set(GATE_OP_TYPES).intersection(func.log_op_type) == set():
            # detectors do not need to be updated
            continue

        # the only supported inter-layout operations are
        # for layouts/codes that only contain a single logical qubit.
        if {len(l.logical_qubits) for l in layouts} == {1}:
            gate_label = func.__name__.replace("_iterator", "_")
            gate_label += "_".join([l.logical_qubits[0] for l in layouts])
        else:
            gate_label = func.__name__.replace("_iterator", "")

        new_stabs, new_stabs_inv = get_new_stab_dict_from_layout(layouts[0], gate_label)
        # only single-qubit and two-qubit logical gates are supported.
        if len(layouts) == 2:
            new_stabs_2, new_stabs_2_inv = get_new_stab_dict_from_layout(
                layouts[1], gate_label
            )
            new_stabs.update(new_stabs_2)
            new_stabs_inv.update(new_stabs_2_inv)

        detectors.update(new_stabs, new_stabs_inv)

    # check if detectors need to be built because of QEC rounds.
    # the detectors from the QEC rounds need to be built before the ones
    # from the logical measurements because of the 'qec_round_with_meas'.
    if all(qec_dets_ops):
        circuit += detectors.build_from_anc(
            model.meas_target, anc_reset, anc_qubits=anc_detectors
        )

    # check if detectors need to be built because of logical measurements.
    meas_ops = [
        k
        for k, op in enumerate(op_iterators)
        if set(MEAS_OP_TYPES).intersection(op[0].log_op_type)
    ]
    if meas_ops:
        if anc_reset is None:
            raise ValueError(
                "Logical measurement found but 'anc_reset' is not specified."
            )
        if num_prev_meas is None:
            raise ValueError(
                "Logical measurement found but 'num_prev_meas' is not specified."
            )
        if not isinstance(num_prev_meas, int):
            raise TypeError(
                f"'num_prev_meas' must be an int, but {type(num_prev_meas)} was given."
            )

        layouts = [op_iterators[k][1] for k in meas_ops]
        rot_bases = [op_iterators[k][0].rot_basis for k in meas_ops]

        # build the detectors from the logical measurement
        reconstructable_stabs: list[str] = []
        anc_support: dict[str, Collection[str]] = {}
        for layout, rot_basis in zip(layouts, rot_bases):
            stab_type = "x_type" if rot_basis else "z_type"
            stabs = layout.get_qubits(role="anc", stab_type=stab_type)
            reconstructable_stabs += stabs
            anc_support.update(layout.get_support(stabs))

        circuit += detectors.build_from_data(
            model.meas_target,
            anc_support,
            anc_reset=anc_reset,
            reconstructable_stabs=reconstructable_stabs,
            anc_qubits=anc_detectors,
        )

        # add logical observable definitions
        for layout, rot_basis in zip(layouts, rot_bases):
            for log_qubit_label in layout.logical_qubits:
                log_op = "log_x" if rot_basis else "log_z"
                log_data_qubits: Collection[str] = layout.logical_param(
                    log_op, log_qubit_label
                )
                targets = [model.meas_target(qubit, -1) for qubit in log_data_qubits]
                obs_inds = [num_prev_meas]
                if meas_to_obs is not None:
                    obs_inds = meas_to_obs[num_prev_meas]
                for obs_ind in obs_inds:
                    instr = stim.CircuitInstruction(
                        name="OBSERVABLE_INCLUDE",
                        targets=targets,
                        gate_args=[obs_ind],
                    )
                    circuit.append(instr)
                # one logical measurement index per measured logical qubit
                num_prev_meas += 1

        # deactivate (ancilla-qubit) detectors from the logical measurement.
        for k in meas_ops:
            detectors.deactivate_detectors(op_iterators[k][1].anc_qubits)

    # check if detectors need to be activated due to the logical resets.
    reset_ops = [
        k
        for k, i in enumerate(op_iterators)
        if set(RESET_OP_TYPES).intersection(i[0].log_op_type)
    ]
    for k in reset_ops:
        func, layout = op_iterators[k][0], op_iterators[k][1]
        # give information about gauge detectors to Detectors so that
        # Detectors.include_gauge_detectors is the one specifying if gauge
        # detectors are included or not.
        # e.g. if reset in X basis, the Z stabilizers are gauge detectors
        # for the case of encoding, there are no gauge stabilizers
        if func.rot_basis is not None:
            stab_type = "z_type" if func.rot_basis else "x_type"
            gauge_dets = layout.get_qubits(role="anc", stab_type=stab_type)
        else:
            gauge_dets = tuple()
        detectors.activate_detectors(layout.anc_qubits, gauge_dets=gauge_dets)

    return circuit
def merge_ticks(blocks: Collection[stim.Circuit]) -> stim.Circuit:
    """
    Merges stim circuits containing TICK instructions and noise channels
    so that only one TICK instruction is present while keeping whether the
    noise channels happened before or after the TICK.

    Parameters
    ----------
    blocks
        Circuits to merge. Each of them must contain exactly one TICK.

    Returns
    -------
    circuit
        Circuit with all the instructions found before the TICKs, followed by
        a single TICK, followed by all the instructions found after the TICKs.
        If ``blocks`` is empty, the circuit only contains a TICK.

    Raises
    ------
    ValueError
        If a block does not contain exactly one TICK.
    """
    tick_instr = stim.Circuit("TICK")[0]
    before_tick = stim.Circuit()
    after_tick = stim.Circuit()
    for block in blocks:
        tick_positions = [k for k, instr in enumerate(block) if instr == tick_instr]
        if len(tick_positions) != 1:
            raise ValueError(
                "Each block must contain exactly one TICK, "
                f"but {len(tick_positions)} were found."
            )
        tick_idx = tick_positions[0]
        # slicing returns stim.Circuits, which can be added together
        before_tick += block[:tick_idx]
        after_tick += block[tick_idx + 1 :]

    before_tick += stim.Circuit("TICK") + after_tick

    return before_tick
def merge_fake_operations(
    log_ops: Sequence[LogicalOperation], model: Model
) -> stim.Circuit:
    """
    Builds the circuit obtained by stacking the output of all the given
    iterators, which are expected to not be 'true' logical operations, in the
    sense that they do not apply any physical gates to the qubits
    (they only include noise, observable definitions...).
    A single ``TICK`` is appended at the end of the circuit.

    Parameters
    ----------
    log_ops
        Iterators to merge.
    model
        Noise model passed to each iterator.

    Returns
    -------
    circuit
        Stim circuit corresponding to merging all the ``log_ops``.
        A single ``TICK`` is added at the end of the circuit.
    """
    merged = stim.Circuit()
    for entry in log_ops:
        func = entry[0]
        op_layouts = entry[1:]
        # stack everything yielded by this iterator
        op_circuit = sum(func(model, *op_layouts), start=stim.Circuit())
        merged += op_circuit

    merged += stim.Circuit("TICK")
    return merged


def group_logical_operations(
    log_ops: Sequence[LogicalOperation],
) -> tuple[list[LogicalOperation], list[LogicalOperation], list[LogicalOperation]]:
    """
    Splits the given list of logical operations into three groups:
    (1) the 'fake' operations found before the layout they act on has
    performed its 'true' logical operation, (2) the 'true' logical operations,
    and (3) the 'fake' operations found after the layout they act on has
    performed its 'true' logical operation.

    Parameters
    ----------
    log_ops
        List of logical operations. Each layout can only participate in a single
        actual logical operation, but can participate in multiple 'fake'
        operations (see ``FAKE_OP_TYPES``).

    Returns
    -------
    pre_fake_operations
        Iterators that appear before the actual logical operations.
    true_log_ops
        Actual logical operations.
    post_fake_operations
        Iterators that appear after the actual logical operations.

    Raises
    ------
    TypeError
        If ``log_ops`` is not a ``Sequence``, if the first element of an entry
        is not a ``LogOpCallable``, or if a 'fake' operation has more than
        one operation type.
    ValueError
        If a 'fake' operation does not act on exactly one layout or if a layout
        performs more than one 'true' logical operation.
    """
    if not isinstance(log_ops, Sequence):
        raise TypeError(f"'log_ops' must be a Sequence, but {type(log_ops)} was given.")
    for entry in log_ops:
        if not isinstance(entry[0], LogOpCallable):
            raise TypeError(
                "The first element for each entry in 'log_ops' must be LogOpCallable."
            )

    before: list[LogicalOperation] = []
    true_ops: list[LogicalOperation] = []
    after: list[LogicalOperation] = []
    # layouts that have already performed their 'true' logical operation
    used_layouts: set[Layout] = set()

    for entry in log_ops:
        op_types = entry[0].log_op_type
        op_layouts = entry[1:]
        is_fake = not set(FAKE_OP_TYPES).isdisjoint(op_types)

        if not is_fake:
            if not used_layouts.isdisjoint(op_layouts):
                raise ValueError(
                    "Layouts must only perform one 'true' logical operation."
                )
            true_ops.append(entry)
            used_layouts.update(op_layouts)
            continue

        if len(op_types) != 1:
            raise TypeError(
                f"'Fake' operations can only be of a single type, but '{op_types}' were found."
            )
        if len(op_layouts) != 1:
            raise ValueError(
                f"Only single-logical-qubit 'fake' operations are supported: {op_layouts}."
            )

        # the position relative to the 'true' operation of the layout decides the group
        target = after if entry[1] in used_layouts else before
        target.append(entry)

    return before, true_ops, after