janus.circuit.dag 源代码

"""
Janus DAG (有向无环图) 电路表示

DAG 表示便于进行电路优化和分析
"""
from typing import List, Dict, Set, Optional, Iterator, Tuple
from dataclasses import dataclass
from enum import Enum


class NodeType(Enum):
    """DAG 节点类型"""
    INPUT = "input"      # 输入节点(量子比特初始状态)
    OUTPUT = "output"    # 输出节点(量子比特最终状态)
    OP = "op"            # 操作节点(量子门)


@dataclass
class DAGNode:
    """
    DAG 节点

    Attributes:
        node_id: 节点唯一标识
        node_type: 节点类型
        qubits: 关联的量子比特
        clbits: 关联的经典比特
        op: 操作(仅 OP 类型节点)
    """
    node_id: int
    node_type: NodeType
    qubits: List[int]
    clbits: List[int] = None
    op: 'Gate' = None

    def __post_init__(self):
        if self.clbits is None:
            self.clbits = []

    @property
    def qargs(self) -> List[int]:
        """兼容性别名: qargs -> qubits"""
        return self.qubits

    @property
    def cargs(self) -> List[int]:
        """兼容性别名: cargs -> clbits"""
        return self.clbits

    @property
    def name(self) -> str:
        if self.node_type == NodeType.INPUT:
            return f"input_q{self.qubits[0]}"
        elif self.node_type == NodeType.OUTPUT:
            return f"output_q{self.qubits[0]}"
        else:
            return self.op.name if self.op else "unknown"
    
    def __repr__(self) -> str:
        if self.node_type == NodeType.OP:
            return f"DAGOpNode({self.op}, qubits={self.qubits})"
        return f"DAGNode({self.node_type.value}, qubits={self.qubits})"
    
    def __hash__(self) -> int:
        return hash(self.node_id)
    
    def __eq__(self, other) -> bool:
        if isinstance(other, DAGNode):
            return self.node_id == other.node_id
        return False


[文档] class DAGCircuit: """ DAG 电路表示 将量子电路表示为有向无环图,其中: - 节点表示量子操作 - 边表示量子比特的数据流 Attributes: n_qubits: 量子比特数 n_clbits: 经典比特数 """
[文档] def __init__(self, n_qubits: int = 0, n_clbits: int = 0, name: str = None): self._n_qubits = n_qubits self._n_clbits = n_clbits self._name = name self._global_phase = 0.0 # 全局相位 self._metadata = {} # 元数据 # 节点存储 self._nodes: Dict[int, DAGNode] = {} self._next_node_id = 0 # 边存储: node_id -> set of successor node_ids self._successors: Dict[int, Set[int]] = {} self._predecessors: Dict[int, Set[int]] = {} # 输入输出节点 self._input_nodes: Dict[int, DAGNode] = {} # qubit -> input node self._output_nodes: Dict[int, DAGNode] = {} # qubit -> output node # 每个量子比特当前的最后一个节点 self._qubit_last_node: Dict[int, int] = {} # 初始化输入输出节点 self._init_io_nodes()
def _init_io_nodes(self): """初始化输入输出节点""" for q in range(self._n_qubits): # 输入节点 input_node = self._create_node(NodeType.INPUT, [q]) self._input_nodes[q] = input_node self._qubit_last_node[q] = input_node.node_id # 输出节点 output_node = self._create_node(NodeType.OUTPUT, [q]) self._output_nodes[q] = output_node def _create_node(self, node_type: NodeType, qubits: List[int], clbits: List[int] = None, op=None) -> DAGNode: """创建新节点""" node = DAGNode( node_id=self._next_node_id, node_type=node_type, qubits=qubits, clbits=clbits, op=op ) self._nodes[node.node_id] = node self._successors[node.node_id] = set() self._predecessors[node.node_id] = set() self._next_node_id += 1 return node def _add_edge(self, from_node: int, to_node: int): """添加边""" self._successors[from_node].add(to_node) self._predecessors[to_node].add(from_node) def _remove_edge(self, from_node: int, to_node: int): """移除边""" self._successors[from_node].discard(to_node) self._predecessors[to_node].discard(from_node) @property def n_qubits(self) -> int: return self._n_qubits @property def n_clbits(self) -> int: return self._n_clbits @property def name(self) -> str: """电路名称""" return self._name @name.setter def name(self, value: str): """设置电路名称""" self._name = value @property def global_phase(self) -> float: """全局相位""" return self._global_phase @global_phase.setter def global_phase(self, value: float): """设置全局相位""" self._global_phase = value @property def metadata(self) -> dict: """元数据字典""" return self._metadata @metadata.setter def metadata(self, value: dict): """设置元数据""" self._metadata = value if value is not None else {} @property def qubits(self) -> List[int]: """返回所有量子比特列表""" return list(range(self._n_qubits))
[文档] def add_qubits(self, qubits): """添加量子比特到DAG""" for q in qubits: if isinstance(q, int): qubit_idx = q else: qubit_idx = getattr(q, '_index', self._n_qubits) if qubit_idx >= self._n_qubits: # 扩展量子比特数 old_n = self._n_qubits self._n_qubits = qubit_idx + 1 # 为新量子比特创建输入输出节点 for new_q in range(old_n, self._n_qubits): input_node = self._create_node(NodeType.INPUT, [new_q]) self._input_nodes[new_q] = input_node self._qubit_last_node[new_q] = input_node.node_id output_node = self._create_node(NodeType.OUTPUT, [new_q]) self._output_nodes[new_q] = output_node
[文档] def add_clbits(self, clbits): """添加经典比特到DAG""" for c in clbits: if isinstance(c, int): clbit_idx = c else: clbit_idx = getattr(c, '_index', self._n_clbits) if clbit_idx >= self._n_clbits: self._n_clbits = clbit_idx + 1
@property def clbits(self) -> List[int]: """返回所有经典比特列表""" return list(range(self._n_clbits)) @property def qregs(self) -> Dict: """返回量子寄存器字典(返回空字典stub)""" return {} @property def cregs(self) -> Dict: """返回经典寄存器字典(返回空字典stub)""" return {}
[文档] def add_qreg(self, qreg): """添加量子寄存器 (stub)""" pass
[文档] def add_creg(self, creg): """添加经典寄存器 (stub)""" pass
[文档] def num_qubits(self) -> int: """返回量子比特数量(方法形式)""" return self._n_qubits
[文档] def num_clbits(self) -> int: """返回经典比特数量(方法形式)""" return self._n_clbits
[文档] def apply_operation(self, op, qubits: List[int], clbits: List[int] = None) -> DAGNode: """ 添加一个操作到 DAG Args: op: 量子操作(Gate) qubits: 作用的量子比特 clbits: 作用的经典比特 Returns: 创建的 DAGNode """ # 创建操作节点 node = self._create_node(NodeType.OP, qubits, clbits, op) # 连接前驱节点 for q in qubits: last_node_id = self._qubit_last_node[q] self._add_edge(last_node_id, node.node_id) self._qubit_last_node[q] = node.node_id return node
[文档] def apply_operation_back(self, op, qubits: List[int], clbits: List[int] = None) -> DAGNode: """ 添加一个操作到 DAG (在末尾) 兼容性方法: apply_operation_back 与 apply_operation 相同 Args: op: 量子操作(Gate) qubits: 作用的量子比特 clbits: 作用的经典比特 Returns: 创建的 DAGNode """ return self.apply_operation(op, qubits, clbits)
[文档] def finalize(self): """完成 DAG 构建,连接到输出节点""" for q in range(self._n_qubits): last_node_id = self._qubit_last_node[q] output_node = self._output_nodes[q] self._add_edge(last_node_id, output_node.node_id)
[文档] def op_nodes(self) -> Iterator[DAGNode]: """迭代所有操作节点""" for node in self._nodes.values(): if node.node_type == NodeType.OP: yield node
[文档] def topological_op_nodes(self) -> Iterator[DAGNode]: """按拓扑顺序迭代操作节点""" # Kahn's algorithm in_degree = {nid: len(self._predecessors[nid]) for nid in self._nodes} queue = [nid for nid, deg in in_degree.items() if deg == 0] while queue: node_id = queue.pop(0) node = self._nodes[node_id] if node.node_type == NodeType.OP: yield node for succ_id in self._successors[node_id]: in_degree[succ_id] -= 1 if in_degree[succ_id] == 0: queue.append(succ_id)
[文档] def control_flow_op_nodes(self) -> Iterator[DAGNode]: """ 迭代控制流操作节点(for, while, if等) Returns: Iterator[DAGNode]: 控制流节点迭代器 """ # 目前Janus不支持控制流,返回空迭代器 # 如果未来支持,需要检查node.op的类型 return iter([])
[文档] def layers(self) -> List[List[DAGNode]]: """ 获取 DAG 的分层表示 Returns: 每层包含可并行执行的操作节点 """ result = [] qubit_layer = {q: -1 for q in range(self._n_qubits)} for node in self.topological_op_nodes(): # 计算该节点应该在哪一层 layer_idx = 0 for q in node.qubits: layer_idx = max(layer_idx, qubit_layer[q] + 1) # 确保有足够的层 while len(result) <= layer_idx: result.append([]) result[layer_idx].append(node) # 更新量子比特的层 for q in node.qubits: qubit_layer[q] = layer_idx return result
[文档] def depth(self, recurse: bool = False) -> int: """ 获取 DAG 深度 Args: recurse: 是否递归计算嵌套电路的深度 (暂时忽略) Returns: DAG 的深度(层数) """ return len(self.layers())
[文档] def width(self) -> int: """ 获取 DAG 宽度 Returns: 量子比特数 + 经典比特数 """ return self._n_qubits + self._n_clbits
[文档] def size(self, recurse: bool = False) -> int: """ 获取 DAG 大小(操作节点数量) Args: recurse: 是否递归计算嵌套电路的大小 (暂时忽略) Returns: 操作节点的数量 """ return sum(1 for _ in self.op_nodes())
[文档] def count_ops(self, recurse: bool = False) -> Dict[str, int]: """ 统计各类操作的数量 Args: recurse: 是否递归计算嵌套电路的操作 (暂时忽略) Returns: 操作名称到数量的映射 """ counts = {} for node in self.op_nodes(): name = node.op.name if node.op else "unknown" counts[name] = counts.get(name, 0) + 1 return counts
[文档] def num_tensor_factors(self) -> int: """ 计算 DAG 电路中的张量因子数量 张量因子是指独立的量子子系统,即没有门连接的量子比特组。 使用并查集(Union-Find)算法来找出连通的量子比特组。 Returns: 张量因子的数量 """ if self._n_qubits == 0: return 0 # 并查集数据结构 parent = list(range(self._n_qubits)) def find(x): if parent[x] != x: parent[x] = find(parent[x]) return parent[x] def union(x, y): root_x = find(x) root_y = find(y) if root_x != root_y: parent[root_x] = root_y # 遍历所有操作节点,连接相关的量子比特 for node in self.op_nodes(): if len(node.qubits) >= 2: # 多量子比特门,连接所有相关的量子比特 for i in range(len(node.qubits) - 1): union(node.qubits[i], node.qubits[i + 1]) # 计算独立的组数 return len(set(find(q) for q in range(self._n_qubits)))
[文档] def collect_runs(self, gate_names: List[str]) -> List[List[DAGNode]]: """ 收集 DAG 中指定门类型的连续运行序列 在拓扑顺序中,找出指定门类型在同一量子比特上的连续序列。 Args: gate_names: 要收集的门名称列表 Returns: 连续运行的门节点列表的列表 """ runs = [] # 对每个量子比特跟踪当前的运行序列 qubit_runs = {q: [] for q in range(self._n_qubits)} # 按拓扑顺序遍历节点 for node in self.topological_op_nodes(): # 检查是否是目标门类型 gate_name = node.op.name if node.op else "" if gate_name in gate_names and len(node.qubits) == 1: # 单量子比特门,添加到对应量子比特的运行序列 qubit = node.qubits[0] qubit_runs[qubit].append(node) else: # 不是目标门或是多量子比特门,保存并清空相关量子比特的运行序列 for q in node.qubits: if qubit_runs[q]: if len(qubit_runs[q]) > 0: runs.append(qubit_runs[q]) qubit_runs[q] = [] # 保存所有剩余的运行序列 for q in range(self._n_qubits): if qubit_runs[q]: runs.append(qubit_runs[q]) return runs
[文档] def collect_1q_runs(self) -> List[List[DAGNode]]: """ 收集单量子比特门的连续序列 Returns: 单量子比特门序列列表,每个序列是连续作用在同一量子比特上的单量子比特门 """ runs = [] qubit_runs = {q: [] for q in range(self._n_qubits)} for node in self.topological_op_nodes(): # 只处理单量子比特门 if len(node.qubits) == 1: q = node.qubits[0] qubit_runs[q].append(node) else: # 遇到多量子比特门,结束当前运行 for q in node.qubits: if qubit_runs[q]: runs.append(qubit_runs[q]) qubit_runs[q] = [] # 添加剩余的运行 for q in range(self._n_qubits): if qubit_runs[q]: runs.append(qubit_runs[q]) return runs
[文档] def collect_2q_runs(self) -> List[List[DAGNode]]: """ 收集双量子比特门块 Returns: 双量子比特门块列表,每个块包含连续的双量子比特门 """ blocks = [] current_block = [] used_qubits = set() for node in self.topological_op_nodes(): # 只处理双量子比特门 if len(node.qubits) == 2: # 检查是否与当前块有量子比特冲突 node_qubits = set(node.qubits) if not current_block or not (node_qubits & used_qubits): # 可以添加到当前块 current_block.append(node) used_qubits.update(node_qubits) else: # 开始新块 if current_block: blocks.append(current_block) current_block = [node] used_qubits = node_qubits else: # 遇到非双量子比特门,结束当前块 if current_block: blocks.append(current_block) current_block = [] used_qubits = set() # 添加最后一个块 if current_block: blocks.append(current_block) return blocks
[文档] def copy_empty_like(self) -> 'DAGCircuit': """ 创建一个空的DAG副本,保留量子比特和经典比特数量,但不包含操作节点 Returns: 空的DAGCircuit对象 """ return DAGCircuit(self._n_qubits, self._n_clbits)
[文档] def predecessors(self, node: DAGNode) -> Iterator[DAGNode]: """获取节点的前驱""" for pred_id in self._predecessors[node.node_id]: yield self._nodes[pred_id]
[文档] def successors(self, node: DAGNode) -> Iterator[DAGNode]: """获取节点的后继""" for succ_id in self._successors[node.node_id]: yield self._nodes[succ_id]
[文档] def remove_op_node(self, node: DAGNode): """ 移除一个操作节点,重新连接其前驱和后继 """ if node.node_type != NodeType.OP: raise ValueError("Can only remove OP nodes") # 对于每个量子比特,连接前驱到后继 for q in node.qubits: # 找到该量子比特上的前驱和后继 pred = None succ = None for p in self.predecessors(node): if q in p.qubits: pred = p break for s in self.successors(node): if q in s.qubits: succ = s break if pred and succ: self._add_edge(pred.node_id, succ.node_id) # 移除所有边 for pred_id in list(self._predecessors[node.node_id]): self._remove_edge(pred_id, node.node_id) for succ_id in list(self._successors[node.node_id]): self._remove_edge(node.node_id, succ_id) # 移除节点 del self._nodes[node.node_id] del self._successors[node.node_id] del self._predecessors[node.node_id]
[文档] def substitute_node(self, node: DAGNode, new_op, inplace: bool = False) -> DAGNode: """ 替换节点的操作 Args: node: 要替换的节点 new_op: 新的操作 inplace: 是否原地替换(兼容参数,总是原地替换) Returns: 替换后的节点 """ if node.node_type != NodeType.OP: raise ValueError("Can only substitute OP nodes") node.op = new_op return node
def __repr__(self) -> str: op_count = sum(1 for _ in self.op_nodes()) return f"DAGCircuit(n_qubits={self._n_qubits}, ops={op_count}, depth={self.depth()})"
[文档] def circuit_to_dag(circuit) -> DAGCircuit: """ 将 Circuit 转换为 DAGCircuit Args: circuit: Janus Circuit 或 QuantumCircuit Returns: DAGCircuit """ # 处理 QuantumCircuit 兼容性 if hasattr(circuit, 'num_qubits'): n_qubits = circuit.num_qubits n_clbits = getattr(circuit, 'num_clbits', 0) else: n_qubits = circuit.n_qubits n_clbits = circuit.n_clbits dag = DAGCircuit(n_qubits, n_clbits) # 处理指令 if hasattr(circuit, 'instructions'): instructions = circuit.instructions elif hasattr(circuit, 'data'): instructions = circuit.data else: instructions = [] for inst in instructions: if hasattr(inst, 'operation'): # Janus Circuit 格式 dag.apply_operation(inst.operation, inst.qubits, inst.clbits) else: # QuantumCircuit 格式 - 简化处理 try: operation = inst.operation if hasattr(inst, 'operation') else inst[0] qubits = [q._index for q in inst.qubits] if hasattr(inst, 'qubits') else [q._index for q in inst[1]] clbits = [c._index for c in inst.clbits] if hasattr(inst, 'clbits') else [] dag.apply_operation(operation, qubits, clbits) except: # 如果转换失败,跳过这个指令 continue dag.finalize() return dag
[文档] def dag_to_circuit(dag: DAGCircuit) -> 'Circuit': """ 将 DAGCircuit 转换为 Circuit Args: dag: DAGCircuit Returns: Janus Circuit """ from .circuit import Circuit circuit = Circuit(dag.n_qubits, dag.n_clbits) for node in dag.topological_op_nodes(): circuit.append(node.op.copy(), node.qubits, node.clbits) return circuit
# Compatibility aliases for node types class DAGOpNode(DAGNode): """Operation node in the DAG - alias for DAGNode with OP type""" def __init__(self, op, qubits, clbits=None, node_id=None): super().__init__( node_id=node_id if node_id is not None else id(self), node_type=NodeType.OP, qubits=qubits, clbits=clbits or [], op=op ) class DAGInNode(DAGNode): """Input node in the DAG - alias for DAGNode with INPUT type""" def __init__(self, qubit, node_id=None): super().__init__( node_id=node_id if node_id is not None else id(self), node_type=NodeType.INPUT, qubits=[qubit], clbits=[], op=None ) class DAGOutNode(DAGNode): """Output node in the DAG - alias for DAGNode with OUTPUT type""" def __init__(self, qubit, node_id=None): super().__init__( node_id=node_id if node_id is not None else id(self), node_type=NodeType.OUTPUT, qubits=[qubit], clbits=[], op=None )