Source code for qbiocode.learning.compute_qensemble

"""
Quantum Ensemble Learning Module
=================================

This module implements quantum ensemble learning algorithms using controlled
swap operations and quantum superposition to create ensembles of training data
arrangements. The ensemble leverages quantum superposition to evaluate multiple
training set configurations simultaneously.

Supports two ensemble construction methods:
- Fixed swap patterns: Deterministic controlled-SWAP operations
- Random unitaries: Haar-random unitary transformations

References
----------
- Macaluso et al., "A variational algorithm for quantum ensemble learning"
  IET Quantum Communication (2023)
- Rhrissorrakrai et al., "Quantum Ensembling Methods for Healthcare and Life Science"
  arXiv:2506.02213 (2025)
  https://arxiv.org/abs/2506.02213
"""

import time
import numpy as np
import scipy.stats
from typing import List, Literal

# Qiskit imports
from qiskit import QuantumCircuit, ClassicalRegister, QuantumRegister
from qiskit.circuit.library import UnitaryGate

# Local imports
from qbiocode.evaluation.model_evaluation import modeleval
from qbiocode.utils import (
    normalize_data,
    label_to_array,
    prepare_training_set,
    retrieve_probabilities,
    execute_circuit,
)


[docs] def build_cosine_classifier(train: np.ndarray, test: np.ndarray, label_train: np.ndarray) -> QuantumCircuit: """ Build quantum cosine similarity classifier circuit. Implements a quantum cosine similarity classifier using controlled swap operations and Hadamard gates to measure similarity between training and test data points. Parameters ---------- train : np.ndarray Training data point as normalized vector (length must be power of 2) test : np.ndarray Test data point as normalized vector label_train : np.ndarray Training label as normalized probability vector [p0, p1] Returns ------- QuantumCircuit Quantum circuit implementing the cosine classifier Notes ----- The circuit computes P(0) = 1/2 + 1/2 * |<train|test>|^2 """ qubits_per = int(np.log2(len(train))) c = ClassicalRegister(1, 'c') x_train = QuantumRegister(qubits_per, 'x_train') x_test = QuantumRegister(qubits_per, 'x_test') y_train = QuantumRegister(1, 'y_train') y_test = QuantumRegister(1, 'y_test') qc = QuantumCircuit(x_train, x_test, y_train, y_test, c) # Initialize states - convert to list for Qiskit compatibility qc.initialize(train.tolist() if isinstance(train, np.ndarray) else train, [x_train]) qc.initialize(test.tolist() if isinstance(test, np.ndarray) else test, [x_test]) qc.initialize(label_train.tolist() if isinstance(label_train, np.ndarray) else label_train, [y_train]) qc.barrier() # SWAP test qc.h(y_test) qc.cswap(y_test, x_train, x_test) qc.h(y_test) qc.barrier() # Label integration qc.cx(y_train, y_test) qc.measure(y_test, c) return qc
[docs] def build_ensemble_circuit(X_data: np.ndarray, Y_data: np.ndarray, x_test: List[complex], n_swap: int = 1, d: int = 2, mode: str = "balanced", ensemble_method: Literal["swap", "random_unitary"] = "swap", barriers: bool = False) -> QuantumCircuit: """ Build quantum ensemble classifier circuit. Creates a quantum ensemble learning circuit using either fixed swap operations or random unitary transformations. Parameters ---------- X_data : np.ndarray, shape (n_samples, n_features) Training data points (normalized, n_features must be power of 2) Y_data : np.ndarray, shape (n_samples, 2) Training labels as one-hot encoded vectors x_test : List[complex] Test data point to classify (normalized) n_swap : int, optional Number of swap/unitary operations per control qubit (default: 1) d : int, optional Number of control qubits, creates 2^d ensemble members (default: 2) mode : str, optional Sampling strategy: "balanced", "unbalanced", or "pair_sample" (default: "balanced") ensemble_method : {"swap", "random_unitary"}, optional Method for ensemble construction: - "swap": Fixed controlled-SWAP operations (faster, deterministic) - "random_unitary": Haar-random unitaries (more general, slower) (default: "swap") barriers : bool, optional Add barrier gates for visualization (default: False) Returns ------- QuantumCircuit Quantum circuit implementing the ensemble classifier Notes ----- Total qubits: d + 2*n_samples*log2(n_features) + n_samples + 1 (+ 1 for random_unitary) """ def cswap_obs(c, a, b): """Controlled swap of observation qubits between indices a and b.""" qubit_indices_a = train_qubit_map[a] qubit_indices_b = train_qubit_map[b] for (i, j) in zip(qubit_indices_a, qubit_indices_b): qc.cswap(control[c], data[i], data[j]) def cswap_cosine(a): """Controlled swap for cosine similarity measurement.""" qubit_indices_a = train_qubit_map[a] for (j, i) in enumerate(qubit_indices_a): qc.cswap(label_test, data[i], data_test[j]) def cswap_labels(c, a, b): """Controlled swap of label qubits.""" qc.cswap(c, labels[a], labels[b]) n_obs = len(X_data) qubits_per = int(np.log2(X_data.shape[1])) n_obs_qubits = qubits_per * n_obs n_test_qubits = qubits_per control = QuantumRegister(d, 'control') data = QuantumRegister(n_obs_qubits, 'x') labels = QuantumRegister(n_obs, 'y') data_test = QuantumRegister(n_test_qubits, 'test_data') label_test = QuantumRegister(1, 'test_label') c = ClassicalRegister(1) # For random unitary method, add prediction qubit if ensemble_method == "random_unitary": prediction = QuantumRegister(1, 'pred_qubit') qc = QuantumCircuit(control, data, labels, data_test, prediction, label_test, c) else: qc = QuantumCircuit(control, data, labels, data_test, label_test, c) # Initialize test data qc.initialize(x_test, data_test) # Initialize training data and labels train_qubit_map = {} for index in range(n_obs): indices = list(range(index * qubits_per, (index + 1) * qubits_per)) x_init = X_data[index].tolist() if isinstance(X_data[index], np.ndarray) else X_data[index] y_init = Y_data[index].tolist() if isinstance(Y_data[index], np.ndarray) else Y_data[index] qc.initialize(x_init, [data[indices]]) qc.initialize(y_init, [labels[index]]) train_qubit_map[index] = indices # Create superposition on control qubits for i in range(d): qc.h(control[i]) if barriers: qc.barrier() # Apply ensemble operations based on method if ensemble_method == "random_unitary": # Sample random unitaries from Haar measure unitary_sampler = scipy.stats.unitary_group(2**(n_obs_qubits + n_obs)) if mode == 'balanced': for i in range(d - 1): for _ in range(n_swap): U = unitary_sampler.rvs() U1 = UnitaryGate(U) CU1 = U1.control(1) qc.append(CU1, [control[i]] + [x for x in data] + [x for x in labels]) if barriers: qc.barrier() qc.x(control[i]) if barriers: qc.barrier() U = unitary_sampler.rvs() U2 = UnitaryGate(U) CU2 = U2.control(1) qc.append(CU2, [control[i]] + [x for x in data] + [x for x in labels]) if barriers: qc.barrier() # Final swap for balanced mode U = np.random.choice(range(int(n_obs / 2)), 1, replace=False) U = np.insert(U, 1, n_obs - 1) d1, d2 = U[0], U[1] cswap_obs(d - 1, d1, d2) cswap_labels(d - 1, d1, d2) qc.x(control[d - 1]) else: # Fixed swap method if mode == 'balanced': for i in range(d - 1): for j in range(n_swap): # Swap within first class U = np.random.choice(range(int(n_obs / 2)), 2, replace=False) U_b = np.random.choice(range(len(train_qubit_map[U[0]])), 1)[0] d1 = train_qubit_map[U[0]][U_b] d2 = train_qubit_map[U[1]][U_b] qc.cswap(control[i], data[d1], data[d2]) qc.cswap(control[i], labels[int(U[0])], labels[int(U[1])]) # Swap within second class U = np.random.choice(range(int(n_obs / 2), n_obs), 2, replace=False) U_b = np.random.choice(range(len(train_qubit_map[U[0]])), 1)[0] d1 = train_qubit_map[U[0]][U_b] d2 = train_qubit_map[U[1]][U_b] qc.cswap(control[i], data[d1], data[d2]) qc.cswap(control[i], labels[int(U[0])], labels[int(U[1])]) qc.x(control[i]) for j in range(n_swap): U = np.random.choice(range(int(n_obs / 2)), 2, replace=False) U_b = np.random.choice(range(len(train_qubit_map[U[0]])), 1)[0] d1 = train_qubit_map[U[0]][U_b] d2 = train_qubit_map[U[1]][U_b] qc.cswap(control[i], data[d1], data[d2]) qc.cswap(control[i], labels[int(U[0])], labels[int(U[1])]) U = np.random.choice(range(int(n_obs / 2), n_obs), 2, replace=False) U_b = np.random.choice(range(len(train_qubit_map[U[0]])), 1)[0] d1 = train_qubit_map[U[0]][U_b] d2 = train_qubit_map[U[1]][U_b] qc.cswap(control[i], data[d1], data[d2]) qc.cswap(control[i], labels[int(U[0])], labels[int(U[1])]) if barriers: qc.barrier() # Final swap for balanced mode qc.x(control[d - 1]) U = np.random.choice(range(int(n_obs / 2)), 1, replace=False) U = np.insert(U, 1, n_obs - 1) U_b = np.random.choice(range(len(train_qubit_map[U[0]])), 1)[0] d1 = train_qubit_map[U[0]][U_b] d2 = train_qubit_map[U[1]][U_b] qc.cswap(control[d - 1], data[d1], data[d2]) qc.cswap(control[d - 1], labels[int(U[0])], labels[int(U[1])]) elif mode == "unbalanced": for i in range(d): for j in range(n_swap): U = np.random.choice(range(n_obs), 2, replace=False) U_b = np.random.choice(range(len(train_qubit_map[U[0]])), 1)[0] d1 = train_qubit_map[U[0]][U_b] d2 = train_qubit_map[U[1]][U_b] qc.cswap(control[i], data[d1], data[d2]) qc.cswap(control[i], labels[int(U[0])], labels[int(U[1])]) qc.x(control[i]) for j in range(n_swap): U = np.random.choice(range(n_obs), 2, replace=False) U_b = np.random.choice(range(len(train_qubit_map[U[0]])), 1)[0] d1 = train_qubit_map[U[0]][U_b] d2 = train_qubit_map[U[1]][U_b] qc.cswap(control[i], data[d1], data[d2]) qc.cswap(control[i], labels[int(U[0])], labels[int(U[1])]) elif mode == "pair_sample": for i in range(d): for j in range(n_swap): pairs = np.random.choice(range(n_obs), n_obs, replace=False) for U in pairs.reshape(int(len(pairs) / 2), 2): U_b = np.random.choice(range(len(train_qubit_map[U[0]])), 1)[0] d1 = train_qubit_map[U[0]][U_b] d2 = train_qubit_map[U[1]][U_b] qc.cswap(control[i], data[d1], data[d2]) qc.cswap(control[i], labels[int(U[0])], labels[int(U[1])]) qc.x(control[i]) if barriers: qc.barrier() for j in range(n_swap): pairs = np.random.choice(range(n_obs), n_obs, replace=False) for U in pairs.reshape(int(len(pairs) / 2), 2): U_b = np.random.choice(range(len(train_qubit_map[U[0]])), 1)[0] d1 = train_qubit_map[U[0]][U_b] d2 = train_qubit_map[U[1]][U_b] qc.cswap(control[i], data[d1], data[d2]) qc.cswap(control[i], labels[int(U[0])], labels[int(U[1])]) if barriers: qc.barrier() if barriers: qc.barrier() # Final classification step ix_cls = n_obs - 1 qc.h(label_test) if barriers: qc.barrier() cswap_cosine(ix_cls) if barriers: qc.barrier() qc.h(label_test) qc.cx(labels[ix_cls], label_test) qc.measure(label_test, c) return qc
[docs] def compute_qensemble(X_train: np.ndarray, X_test: np.ndarray, y_train: np.ndarray, y_test: np.ndarray, args: dict, model: str = 'QEnsemble', data_key: str = '', n_train: int = 4, n_swap: int = 1, d: int = 2, mode: str = "balanced", ensemble_method: Literal["swap", "random_unitary"] = "swap", n_shots: int = 8192, seed: int = 123, device: str = 'CPU', verbose: bool = False): """ Compute quantum ensemble classifier predictions. This function implements a quantum ensemble learning algorithm using either fixed swap operations or random unitary transformations to create superpositions of different training data arrangements. Parameters ---------- X_train : np.ndarray Training feature set X_test : np.ndarray Testing feature set y_train : np.ndarray Training labels y_test : np.ndarray Testing labels args : dict Dictionary containing arguments for backend and settings model : str, optional Model type (default: 'QEnsemble') data_key : str, optional Key for the dataset (default: '') n_train : int, optional Number of training samples to use (must be even, default: 4) n_swap : int, optional Number of swap/unitary operations per control qubit (default: 1) d : int, optional Number of control qubits, creates 2^d ensemble members (default: 2) mode : str, optional Sampling strategy: "balanced", "unbalanced", or "pair_sample" (default: "balanced") ensemble_method : {"swap", "random_unitary"}, optional Method for ensemble construction: - "swap": Fixed controlled-SWAP operations (faster, deterministic) - "random_unitary": Haar-random unitaries (more general, slower) (default: "swap") n_shots : int, optional Number of measurement shots (default: 8192) seed : int, optional Random seed for reproducibility (default: 123) device : str, optional Device type: 'CPU' or 'GPU' (default: 'CPU') verbose : bool, optional Print additional information (default: False) Returns ------- dict Dictionary containing evaluation results including accuracy, runtime, model parameters, and other relevant metrics Examples -------- >>> from qbiocode.learning import compute_qensemble >>> # Fixed swap ensemble (standard) >>> results = compute_qensemble(X_train, X_test, y_train, y_test, args, ... n_train=4, d=2, ensemble_method="swap") >>> # Random unitary ensemble (advanced) >>> results = compute_qensemble(X_train, X_test, y_train, y_test, args, ... n_train=4, d=2, ensemble_method="random_unitary") """ beg_time = time.time() if verbose: method_name = "Random Unitary" if ensemble_method == "random_unitary" else "Fixed Swap" print(f"Running Quantum Ensemble Classifier ({method_name})") print(f"Training samples: {n_train}, Ensemble depth (d): {d}, Operations: {n_swap}") print(f"Mode: {mode}, Shots: {n_shots}") # Prepare training subset X_data, Y_data = prepare_training_set(X_train, y_train, n=n_train, seed=seed) # Make predictions for each test sample predictions = [] qc = None # Initialize to avoid unbound variable for x_test in X_test: x_test_norm = normalize_data(x_test) # Build and execute circuit qc = build_ensemble_circuit(X_data, Y_data, x_test_norm, n_swap=n_swap, d=d, mode=mode, ensemble_method=ensemble_method) if qc.num_qubits > 36: raise ValueError(f"Circuit has {qc.num_qubits} qubits, exceeds simulation limit of 36") counts = execute_circuit(qc, n_shots=n_shots, device=device) probs = retrieve_probabilities(counts) predictions.append(probs) # Convert probabilities to class predictions y_predicted = np.array([1 if p[1] > p[0] else 0 for p in predictions]) # Model parameters model_params = { 'n_train': n_train, 'n_swap': n_swap, 'd': d, 'mode': mode, 'ensemble_method': ensemble_method, 'n_shots': n_shots, 'seed': seed, 'n_qubits': qc.num_qubits if qc is not None else 0, 'circuit_depth': qc.depth() if qc is not None else 0 } return modeleval(y_test, y_predicted, beg_time, model_params, args, model=model, verbose=verbose)