"""Top-level package for QNET-QSD."""
__version__ = '0.1.0'
import re
import random
import os
import logging
import struct
import shlex
from textwrap import dedent
from collections import OrderedDict
from functools import partial
import subprocess as sp
from qnet.algebra.scalar_types import SCALAR_TYPES
from qnet.algebra.abstract_algebra import (
Operation, Expression, set_union, _scalar_free_symbols)
from qnet.algebra.hilbert_space_algebra import TrivialSpace, BasisNotSetError
from qnet.algebra.circuit_algebra import Circuit
from qnet.algebra.state_algebra import (
Ket, LocalKet, BasisKet, CoherentStateKet, TensorKet,
ScalarTimesKet, KetPlus
)
from qnet.algebra.operator_algebra import (
IdentityOperator, Create, Destroy, LocalOperator,
LocalSigma, ScalarTimesOperator, OperatorPlus, OperatorTimes)
from qnet.printing._ascii import QnetAsciiDefaultPrinter
from trajectorydata import TrajectoryData
try:
from sympy.printing.ccode import C89CodePrinter as CCodePrinter
except ImportError:
from sympy.printing.ccode import CCodePrinter
# max unsigned int in C/C++ when compiled the same way as python
UNSIGNED_MAXINT = 2 ** (struct.Struct('I').size * 8 - 1) - 1
[docs]class QSDCCodePrinter(CCodePrinter):
"""A printer for converting SymPy expressions to C++ code, while taking
into account pre-defined variable names for symbols"""
def __init__(self, settings={}):
self._default_settings['user_symbols'] = {}
super(QSDCCodePrinter, self).__init__(settings=settings)
self.known_symbols = dict(settings.get('user_symbols'))
def _print_Symbol(self, expr):
if expr in self.known_symbols:
return self.known_symbols[expr]
else:
return super()._print_Symbol(expr)
[docs]def local_ops(expr):
"""Given a symbolic expression, extract the set of "atomic" operators
(instances of :class:`~qnet.algebra.operator_algebra.Operator`) occurring
in that expression. The set is "atomic" in the sense that the
operators are not algebraic combinations of other operators.
"""
if isinstance(expr, SCALAR_TYPES + (str,)):
return set()
elif isinstance(expr, (LocalOperator, IdentityOperator.__class__)):
return set([expr])
elif isinstance(expr, Circuit):
slh = expr.toSLH()
Ls = slh.L.matrix.ravel().tolist()
H = slh.H
return set_union(*tuple(map(local_ops, Ls))) | local_ops(H)
elif isinstance(expr, Expression):
return set_union(*tuple(map(local_ops, expr.args)))
else:
raise TypeError(str(expr))
[docs]def find_kets(expr, cls=LocalKet):
"""Given a :class:`~qnet.algebra.state_algebra.Ket` instance,
return the set of :class:`~qnet.algebra.state_algebra.LocalKet`
instances contained in it.
"""
if isinstance(expr, SCALAR_TYPES + (str,)):
return set()
elif isinstance(expr, cls):
return set([expr])
elif isinstance(expr, Expression):
finder = partial(find_kets, cls=cls)
return set_union(*tuple(map(finder, expr.args)))
else:
raise TypeError(str(expr.__class__.__name__))
[docs]class QSDOperator(object):
"""Encapsulation of a QSD (symbolic) Operator, containing all information
required to instantiate that operator and to use it in C++ code
expressions.
All arguments set the corresponding properties.
Examples:
>>> A0 = QSDOperator('AnnihilationOperator', 'A0', '(0)')
>>> Ad0 = QSDOperator('Operator', 'Ad0', '= A0.hc()')
"""
known_types = [
'AnnihilationOperator', 'FieldTransitionOperator', 'IdentityOperator',
'Operator']
def __init__(self, qsd_type, name, instantiator):
self._type = None
self._name = None
self._instantiator = None
self.qsd_type = qsd_type.strip()
self.name = name.strip()
self.instantiator = instantiator.strip()
def __key(self):
return (self._type, self._name, self._instantiator)
def __eq__(self, other):
return self.__key() == other.__key()
def __hash__(self):
return hash(self.__key())
def __repr__(self):
return "{cls}{args}".format(cls=self.__class__.__name__,
args=str(self.__key()))
@property
def qsd_type(self):
"""QSD object type, i.e., name of the C++ class. See
:attr:`known_types` class attribute for allowed type names
"""
return self._type
@qsd_type.setter
def qsd_type(self, value):
if value not in self.known_types:
raise ValueError(
"Type '%s' must be one of %s" % (value, self.known_types))
self._type = value
@property
def name(self):
"""The name of the operator object. Must be a valid C++ variable
name.
"""
return self._name
@name.setter
def name(self, value):
if not re.match(r'\w[\w\d]+', value):
raise ValueError("Name '%s' is not a valid C++ variable name"
% value)
self._name = value
@property
def instantiator(self):
"""String that instantiates the operator object. This
must either be the constructor arguments of the operator's QSD class,
or a C++ expression (starting with an equal sign) that initializes the
object
"""
# strip out leading space for ' = ..'
return self._instantiator.strip()
@instantiator.setter
def instantiator(self, value):
if value[0] not in ['=', '(']:
raise ValueError(("Instantiator '%s' does not start with '=' "
"(assignement instantiation) or '"
"(' (constructor instantiation)") % value)
if (value.startswith('(') and not value.endswith(')')):
raise ValueError("Instantiator '%s' does not end with ')'" % value)
if value.startswith('='):
# ensure that '=' is surrounded by spaces
value = ' = ' + value[1:].strip()
self._instantiator = value
@property
def instantiation(self):
"""Complete line of C++ code that instantiates the operator
Example:
>>> A0 = QSDOperator('AnnihilationOperator', 'A0', '(0)')
>>> print(A0.instantiation)
AnnihilationOperator A0(0);
"""
return '{_type} {_name}{_instantiator};'.format(**self.__dict__)
def __len__(self):
return 3
[docs] def __iter__(self):
"""Split :obj:`QSDOperator` into a tuple.
Example:
>>> A0 = QSDOperator('AnnihilationOperator', 'A0', '(0)')
>>> qsd_type, name, instantiator = A0
"""
return iter((self.qsd_type, self.name, self.instantiator))
[docs] def __str__(self):
"""The string representation of an operator is simply its name
Example:
>>> A0 = QSDOperator('AnnihilationOperator', 'A0', '(0)')
>>> assert(str(A0) == str(A0.name))
"""
return str(self.name)
[docs]class QSDCodeGenError(Exception):
"""Exception raised for missing data in a :obj:`QSDCodeGen` instance"""
pass
[docs]class QSDCodeGen(object):
"""Class that allows to generate a QSD program for QNET expressions, and
to run the program to (accumulative) collect expectation values for
observables
Parameters:
circuit (:obj:`~qnet.algebra.circuit_algebra.SLH`): The circuit to be
simulated via QSD.
num_vals (dict of :obj:`~sympy.core.symbol.Symbol` to float)): Numeric
value for any symbol occurring in the `circuit`, or any
operator/state that may be added later on.
time_symbol (None or :obj:`~sympy.core.symbol.Symbol`): symbol to
denote the time dependence in the Hamiltonian (usually `t`). If
None, the Hamiltonian is time-independent.
Attributes:
circuit (:class:`~qnet.algebra.circuit_algebra.SLH`): see `circuit`
parameter
time_symbol (None or :obj:`~sympy.core.symbol.Symbol`): see
`time_symbol` parameter
syms (set of :obj:`~sympy.core.symbol.Symbol`): The set of symbols used
either in the circuit, any of the observables, or the initial
state, excluding `time_symbol`
num_vals (dict of :obj:`~sympy.core.symbol.Symbol` to float)): Map of
symbols to numeric value. Must specify a value for any symbol in
`syms`.
traj_data (:obj:`trajectorydata.TrajectoryData`): The
accumulated trajectory data. Every time the :meth:`run`,
respectively the :meth:`run_delayed` method is
called, the resulting trajectory data is incorporated. Thus, by
repeatedly calling :meth:`run` (followed by :meth:`run_delayed` if
``delay=True``), an arbitrary number of trajectories
may be accumulated in `traj_data`.
"""
known_steppers = ['Order4Step', 'AdaptiveStep', 'AdaptiveJump',
'AdaptiveOrthoJump']
_template = dedent(r'''
#define _USE_MATH_DEFINES
#include <cmath>
#include "Complex.h"
#include "ACG.h"
#include "CmplxRan.h"
#include "State.h"
#include "Operator.h"
#include "FieldOp.h"
#include "Traject.h"
{PARAMETERS}
{FUNCTIONS}
int main(int argc, char* argv[])
{{
unsigned int rndSeed;
if (argc != 2){{
std::cout << "Usage: " << argv[0] << " SEED" << std::endl;
std::exit(1);
}} else {{
if(sscanf(argv[1], "%u", &rndSeed) != 1){{
std::cout << "ERROR: Could not read SEED" << std::endl;
std::exit(1);
}} else {{
std::cout << "Using rnd seed: " << rndSeed << std::endl;
}}
}}
// Primary Operators
{OPERATORBASIS}
// Hamiltonian
{HAMILTONIAN}
// Lindblad operators
{LINDBLADS}
// Observables
{OBSERVABLES}
// Initial state
{INITIAL_STATE}
// Trajectory
{TRAJECTORY}
}}''').strip()
_max_op_name_length = 16
_lib_qsd = 'libqsd.a' # expected name of qsd library
_link_qsd = '-lqsd' # compiler option to link qsd
_sort_str = QnetAsciiDefaultPrinter().doprint
def __init__(self, circuit, num_vals=None, time_symbol=None):
self.circuit = circuit.toSLH()
self.time_symbol = time_symbol
self.num_vals = {}
self.traj_data = None
self._psi_initial = None
self._traj_params = {}
self._moving_params = {}
# Set of sympy.core.symbol.Symbol instances
self.syms = set(circuit.all_symbols())
self.syms.discard(self.time_symbol)
# Mapping symbol => variable name (sanitized)
self._var_names = {}
if self.time_symbol is not None:
# it is important to register the time symbol before any other
# symbols, to detect possible name clashes
self._var_names[self.time_symbol] = 't'
self._update_var_names()
# The add_observable and set_trajectories methods may later extend syms
# and _var_names
# Set of qnet.algebra.operator_algebra.Operator, all "atomic"
# operators required in the code generation
self._local_ops = local_ops(self.circuit)
# The add_observable and set_trajectories methods may later extend this
# set
self._full_space = self.circuit.space
self._local_spaces = self._full_space.local_factors
self._hilbert_space_index = {
space: index for (index, space) in enumerate(self._local_spaces)}
# Dict name => tuple(qnet.algebra.operator_algebra.Operator, outfile)
self._observables = OrderedDict()
# Managed via the add_observable method
# Mapping QNET Operator => QSDOperator for every operator in
# self._local_ops. This is kept as an ordered dictionary, such that the
# instantiation of an operator may refer to a previously defined
# operator
self._qsd_ops = OrderedDict()
self._update_qsd_ops(self._local_ops)
# The add_observable and set_trajectories methods may later extend this
# mapping
# Mapping QNET Ket => QSD state name (str) for "atomic" states
# (instances of LocalKet and TensorKet)
self._qsd_states = {}
# This is set in the _define_atomic_kets method
# The following are set by the compile method to allow for delayed (or
# remote) compilation. These are stored in "unexpanded" form, i.e.
# possibly including environment variables. These will only be expanded
# by the compilation_worker, possibly on a remote system
self._executable = None # name of the executable
self._path = None # folder for executable (unexpanded)
self._compile_cmd = None # list of command arguments (unexpanded)
self._keep_cc = None # delete C++ file after compilation?
# _executable will remain None until the compile method has finished
# without error. Thus, only _executable should be used to check whether
# the compile method has been called.
if num_vals is not None:
self.num_vals.update(num_vals)
# when the `run` method is called with `delay=True`, the `kwargs`
# dictionary is appended to the following list. A call to `run_delayed`
# may then process the whole list in parallel
self._delayed_runs_kwargs = []
# We also cache the 'seed' of each kwargs in _delayed_runs_kwargs
self._delayed_seeds = []
# for any time-dependent coefficient, we keep
# coeff => (time-function-name, time-function-placehold, is_real)
# in a dictionary
self._tfuncs = {}
@property
def observables(self):
"""Iterator over all defined observables (instances of
:obj:`~qnet.algebra.operator_algebra.Operator`)
"""
return iter([op for (op, fn) in self._observables.values()])
@property
def observable_names(self):
"""Iterator of all defined observable names (str)"""
return iter(self._observables.keys())
@property
def compile_cmd(self):
"""Command to be used for compilation (after :meth:`compile` method has
been called). Environment variables and '~' are not expanded"""
if self._executable is None:
return ''
else:
return _cmd_list_to_str(self._compile_cmd)
[docs] def get_observable(self, name):
"""Return the observable for the given name
(instance of :obj:`~qnet.algebra.operator_algebra.Operator`), according
to the mapping defined by :meth:`add_observable`"""
return self._observables[name][0]
def _update_qsd_ops(self, operators):
"""Update :attr:`self._qsd_ops` to that every operator in operators is
mapped to an appropriate :obj:`QSDOperator`. The names of the
operators are chosen automatically based on the operator type and
the index of the Hilbert space they act in. For a Hilbert space index
``k``, the operators names are chosen as follows::
IdentityOperator => Id
Create => A{k}
Destroy => Ad{k}
LocalSigma |i><j| => S{k}_{i}_{j}
Arguments:
operators (iterable of :obj:`~qnet.operator_algebra.Operator`):
list or set of operators for which to define :obj:`QSDOperator`
instances. These operators must be "atomic", i.e. they must
not be an algebraic combination of other operators. They must
be in the Hilbert space of the circuit (otherwise, a ValueError
is raised), and their Hilbert-space must be (a subspace of) the
circuit Hilbert space (otherwise, a :exc:`BasisNotSetError` is
raised)
"""
self._qsd_ops[IdentityOperator] = QSDOperator(
qsd_type='Operator',
name="Id",
instantiator='= '+'*'.join([
"Id{k}".format(k=k)
for k in range(len(self._hilbert_space_index))]))
# In order to achieve stable output, we go through the operators in an
# arbitrary, but well-defined order (sorting them according to their
# string representation)
for op in sorted(operators, key=self._sort_str):
if not op.space.is_tensor_factor_of(self._full_space):
raise ValueError(("Operator '%s' is not in the circuit's "
"Hilbert space") % str(op))
if op.space is not TrivialSpace:
if not op.space.has_basis:
raise BasisNotSetError(
"The Hilbert space of the operator %s has no defined "
"basis" % op)
if isinstance(op, IdentityOperator.__class__):
continue
elif isinstance(op, (Create, Destroy)):
a = Destroy(hs=op.space)
k = self._hilbert_space_index[op.space]
self._qsd_ops[a] = QSDOperator(
qsd_type='AnnihilationOperator',
name="A{k}".format(k=k),
instantiator=('(%d)' % k))
ad = a.dag()
self._qsd_ops[ad] = QSDOperator(
qsd_type='Operator',
name="Ad{k}".format(k=k),
instantiator=('= A{k}.hc()'.format(k=k)))
elif isinstance(op, LocalSigma):
k = self._hilbert_space_index[op.space]
i = op.index_j
j = op.index_k
self._qsd_ops[op] = QSDOperator(
qsd_type='FieldTransitionOperator',
name="S{k}_{i}_{j}".format(k=k, i=i, j=j),
instantiator='({ijk})'.format(
ijk=','.join([str(n) for n in (i, j, k)])))
else:
raise TypeError(str(op))
def _update_var_names(self):
"""Ensure that for every symbol, there is a var name in the cache"""
used_vars = set(self._var_names.values())
for sym in self.syms:
if sym not in self._var_names:
var = sanitize_varname(str(sym))
if var in used_vars:
raise ValueError("Cannot generate a unique variable name "
"for symbol '%s'" % sym)
else:
self._var_names[sym] = var
used_vars.add(var)
[docs] def add_observable(self, op, name=None):
"""Register an operator as an observable, together with a name that
will be used in the header of the table of expectation values, and on
which the name of the QSD output files will be based.
Arguments:
op (:obj:`~qnet.algebra.operator_algebra.Operator`): Observable
(does not need to be Hermitian)
name (str or ``None``): Name of of the operator, to be used in the
header of the output table. If ``None``, ``str(op)`` is used.
Raises:
ValueError: if `name` is invalid or too long, or no unique
filename can be generated from `name`
"""
logger = logging.getLogger(__name__)
if name is None:
name = str(op).strip()
if not TrajectoryData._rx['op_name'].match(name):
raise ValueError(("Operator name '%s' contains invalid "
"characters") % name)
if len(name) > self._max_op_name_length:
raise ValueError("Operator name '%s' is longer than limit %d"
% (name, self._max_op_name_length))
if name in self._observables:
logger.warn("Overwriting existing operator '%s'", name)
# It is necessary to delete the observable, otherwise the check for
# unique filenames would be tripped
del self._observables[name]
filename = sanitize_filename(name)
if (len(filename) == 0):
raise ValueError("Cannot generate filename for operator "
"%s. You must use a different name" % name)
filename = filename + '.out'
if filename in [fn for (__, fn) in self._observables.values()]:
raise ValueError("Cannot generate unique filename for operator "
"%s. You must use a different name" % name)
op_local_ops = local_ops(op)
self._local_ops.update(op_local_ops)
self._update_qsd_ops(op_local_ops)
self.syms.update(op.all_symbols())
self.syms.discard(self.time_symbol)
self._update_var_names()
self._observables[name] = (op, filename)
[docs] def set_moving_basis(self, move_dofs, delta=1e-4, width=2, move_eps=1e-4):
"""Activate the use of the moving basis, see Section 6 of the QSD
Paper.
Arguments:
move_dofs (int): degrees of freedom for which to use a moving basis
(the first 'move_dofs' freedoms are re-centered, and their
cutoffs adjusted.)
delta (float): probability threshold for the cutoff adjustment
width (int): size of the "pad" for the cutoff
move_eps (float): numerical accuracy with which to make the shift.
Cf. ``shiftAccuracy`` in QSD ``State::recenter`` method
Raises:
ValueError: if `move_dofs` is invalid
QSDCodeGenError: if requesting a moving basis for a degree of
freedom for which any operator is defined that cannot be
applied in the moving basis
"""
if move_dofs <= 0:
raise ValueError("move_dofs must be an integer >0")
elif move_dofs > len(self._local_spaces):
raise ValueError("move_dofs must not be larger than the number "
"of local Hilbert spaces")
else:
# Ensure that there are no LocalSigma operators for any of the
# degrees of freedom that are part of the moving basis (LocalSigma
# is mapped to FieldTransitionOperator in QSD, which is
# incompatible with a moving basis)
for op in self._local_ops:
if isinstance(op, LocalSigma):
k = self._hilbert_space_index[op.space]
if k < move_dofs:
# '<', not '<=', because k counts from 0
raise QSDCodeGenError((
"A moving basis cannot be used for a degree of "
"freedom that has local transition operators. "
"Conflicting operator %s acts on Hilbert "
"space %d<%d") % (op, k, move_dofs))
self._moving_params['move_dofs'] = move_dofs
self._moving_params['delta'] = delta
if move_dofs <= 0:
raise ValueError("width must be an integer >0")
self._moving_params['width'] = width
self._moving_params['move_eps'] = move_eps
[docs] def set_trajectories(
self, psi_initial, stepper, dt, nt_plot_step, n_plot_steps,
n_trajectories, traj_save=10):
"""Set the parameters that control the trajectories from which a plot
of expectation values for the registered observables will be generated.
Arguments:
psi_initial (:obj:`~qnet.algebra.state_algebra.Ket`): The initial
state
stepper (str): Name of the QSD stepper that should handle
propagation of a single time step. See :attr:`known_steppers`
for allowed values
dt (float): The duration for a single propagation step. Note that
the plot of expectation values will generally be on a coarser
grid, as controlled by the ``set_plotting`` routine
nt_plot_step (int): Number of propagation steps per plot step. That
is, expectation values of the observables will be written out
every `nt_plot_step` propagation steps
n_plot_steps (int): Number of plot steps. The total number of
propagation steps for each trajectory will be ``nt_plot_step *
n_plot_steps``, and duration T of the entire trajectory will be
``dt * nt_plot_step * n_plot_steps``
n_trajectories (int): The number of trajectories over which to
average for getting the expectation values of the observables
traj_save (int): Number of trajectories to propagate before writing
the averaged expectation values of all observables to file.
This ensures that if the program is terminated before the
calculation of ``n_trajectories`` is complete, the lost data is
at most that of the last ``traj_save`` trajectories is lost. A
value of 0 indicates that the values are to be written out
only after completing all trajectories.
"""
self._psi_initial = psi_initial
if isinstance(psi_initial, Operation):
# all non-atomic instances of Ket are also instances of Operation
psi_local_ops = local_ops(psi_initial)
self._local_ops.update(psi_local_ops)
self._update_qsd_ops(psi_local_ops)
self.syms.update(psi_initial.all_symbols())
self.syms.discard(self.time_symbol)
self._update_var_names()
if stepper not in self.known_steppers:
raise ValueError(
"stepper '%s' must be one of %s"
% (stepper, self.known_steppers))
self._traj_params['stepper'] = stepper
self._traj_params['dt'] = dt
self._traj_params['nt_plot_step'] = nt_plot_step
self._traj_params['n_plot_steps'] = n_plot_steps
self._traj_params['n_trajectories'] = n_trajectories
self._traj_params['traj_save'] = traj_save
def _ordered_tensor_operands(self, state):
"""Return the operands of the given TensorKet instance ordered by their
Hilbert space (using `self._hilbert_space_index`)
This is necessary because in QNET, state carry a label for their
Hilbert space, while in QSD they do not. Instead, in a Tensor product
in QSD the order of the operands associates them with their Hilbert
space.
"""
def sort_key(ket):
return self._hilbert_space_index[ket.space]
return sorted(state.operands, key=sort_key)
def _define_atomic_kets(self, state, reset=True):
"""Find all "atomic" kets in the given state and register them in
self._qsd_states. Return an array of lines of C++ code that defines
the state in a QSD program. If reset is True, any existing entries in
self._qsd_states are discarded.
The "atomic" kets are instances of `LocalKet` or `TensorKet`, both of
which require to be defined with their own name in QSD code
"""
if not state.space == self._full_space:
raise QSDCodeGenError(("State %s is not in the Hilbert "
"space of the Hamiltonian") % state)
lines = []
if reset:
self._qsd_states = {}
for (prfx, kets) in [
('phiL', find_kets(state, cls=LocalKet)),
('phiT', find_kets(state, cls=TensorKet))
]:
for k, ket in enumerate(sorted(kets, key=self._sort_str)):
# We go through the states in an arbitrary, but well-defined
# order by sorting them according to str
name = prfx + str(k)
self._qsd_states[ket] = name
N = ket.space.dimension
if isinstance(ket, BasisKet):
n = ket.index
instantiation = '({N:d},{n:d},FIELD)'.format(N=N, n=n)
comment = ' // HS %d' \
% self._hilbert_space_index[ket.space]
elif isinstance(ket, CoherentStateKet):
alpha = ket.ampl
if alpha in self.syms:
alpha_name = self._var_names[alpha]
else:
try:
alpha = complex(ket.ampl)
except TypeError:
raise TypeError((
"CoherentStateKet amplitude %s is neither a "
"known symbol nor a complex number") % alpha)
alpha_name = name + '_alpha'
lines.append('Complex {alpha}({re:g},{im:g});'.format(
alpha=alpha_name, re=alpha.real,
im=alpha.imag))
instantiation = '({N:d},{alpha},FIELD)'.format(
N=N, alpha=alpha_name)
comment = ' // HS %d' \
% self._hilbert_space_index[ket.space]
elif isinstance(ket, TensorKet):
operands = [self._ket_str(operand) for operand
in self._ordered_tensor_operands(ket)]
lines.append(
"State {name}List[{n}] = {{{ketlist}}};"
.format(
name=name, n=len(operands),
ketlist=", ".join(operands)))
instantiation = '({n}, {name}List)'.format(
n=len(operands), name=name)
comment = ' // ' + " * ".join([
"HS %d" % self._hilbert_space_index[o.space]
for o in self._ordered_tensor_operands(ket)])
else:
raise TypeError(
"Cannot instantiate QSD state for type %s"
% str(type(ket)))
lines.append('State '+name+instantiation+';'+comment)
return lines
def _initial_state_lines(self, indent=2):
if not isinstance(self._psi_initial, Ket):
raise TypeError("Initial state must be a Ket instance")
lines = self._define_atomic_kets(self._psi_initial)
lines.append('')
lines.append('State psiIni = '+self._ket_str(self._psi_initial)+';')
lines.append('psiIni.normalize();')
return "\n".join(_indent(lines, indent))
def _trajectory_lines(self, indent=2):
try:
self._traj_params['stepper']
except KeyError:
raise QSDCodeGenError("No trajectories set up. Ensure that "
"'set_trajectories' method has been called")
lines = [
'ACG gen(rndSeed); // random number generator',
'ComplexNormal rndm(&gen); // Complex Gaussian random numbers',
'',
'double dt = {dt};',
'int dtsperStep = {nt_plot_step};',
'int nOfSteps = {n_plot_steps};',
'int nTrajSave = {traj_save};',
'int nTrajectory = {n_trajectories};',
'int ReadFile = 0;',
'',
'{stepper} stepper(psiIni, H, nL, L);',
'Trajectory traj(psiIni, dt, stepper, &rndm);',
'']
if len(self._moving_params) > 0:
lines.extend([
'int move = {move_dofs};',
'double delta = {delta};',
'int width = {width};',
'double moveEps = {move_eps};',
'',
'traj.sumExp(nOfOut, outlist, flist , dtsperStep, nOfSteps,',
' nTrajectory, nTrajSave, ReadFile, move,',
' delta, width, moveEps);'
])
else:
lines.extend([
'traj.sumExp(nOfOut, outlist, flist , dtsperStep, nOfSteps,',
' nTrajectory, nTrajSave, ReadFile);'
])
fmt_mapping = self._traj_params.copy()
fmt_mapping.update(self._moving_params)
rendered_lines = [line.format(**fmt_mapping) for line in lines]
return "\n".join(_indent(rendered_lines, indent))
[docs] def generate_code(self):
"""Return C++ program that corresponds to the circuit as a multiline
string"""
return self._template.format(
OPERATORBASIS=self._operator_basis_lines(),
PARAMETERS=self._parameters_lines(indent=0),
FUNCTIONS=self._function_lines(
ops=[self.circuit.H, ], indent=0),
HAMILTONIAN=self._hamiltonian_lines(),
LINDBLADS=self._lindblads_lines(),
OBSERVABLES=self._observables_lines(),
INITIAL_STATE=self._initial_state_lines(),
TRAJECTORY=self._trajectory_lines())
[docs] def write(self, outfile):
"""Write C++ program that corresponds to the circuit"""
with open(outfile, 'w') as out_fh:
out_fh.write(self.generate_code())
out_fh.write("\n")
[docs] def compile(
self, qsd_lib, qsd_headers, executable='qsd_run',
path='.', compiler='g++', compile_options='-O2', delay=False,
keep_cc=False, remote_apply=None):
"""Compile into an executable
Arguments:
qsd_lib (str): full path to the file ``libqsd.a`` containing the
statically compiled QSD library. May reference environment
variables the home directory ('~')
qsd_headers (str): path to the folder containing the QSD header
files. May reference environment variables the home directory
('~')
executable (str): name of executable to which the QSD program
should be compiled. Must consist only of letters, numbers,
dashes, and underscores only
path (str): The path to the folder where executable will be
generated. May reference environment variables the home
directory ('~')
compiler (str): compiler executable
compile_options (str): options to pass to the compiler
delay (bool): Deprecated, must be False
keep_cc (bool): If True, keep the C++ code from which the
executable was compiled. It will have the same name as the
executable, with an added '.cc' file extension.
remote_apply (callable or None): If not None,
``remote_apply(compilation_worker, kwargs)`` must call
:func:`compilation_worker` on any remote node.
Typically, this might point to the `apply` method of an
``ipyparallel`` View instance. The `remote_apply` argument
should only be given if :meth:`run_delayed` will be called with
an argument `map` that will push the calculation of a
trajectory to a remote node.
Raises:
ValueError: if `executable` name or `qsd_lib` are invalid
subprocess.CalledProcessError: if compilation fails
"""
if delay:
raise DeprecationWarning(
"`delay` will be removed in future versions")
logger = logging.getLogger(__name__)
executable = str(executable)
self._path = str(path)
self._keep_cc = keep_cc
if not re.match(r'^[\w-]{1,128}$', executable):
if len(executable) > 128:
raise ValueError("Executable name too long")
else:
raise ValueError("Invalid executable name '%s'" % executable)
self._compile_cmd = self._build_compile_cmd(
qsd_lib, qsd_headers, executable, self._path, compiler,
compile_options)
kwargs = {
'executable': executable, 'path': self._path,
'cc_code': self.generate_code(), 'keep_cc': self._keep_cc,
'cmd': self._compile_cmd}
if remote_apply is None:
if not os.path.isdir(_full_expand(qsd_headers)):
logger.warn("Header directory "+qsd_headers+" does not exist")
if not os.path.isfile(_full_expand(qsd_lib)):
logger.warn("File "+qsd_lib+" does not exist")
try:
compilation_worker(kwargs)
except sp.CalledProcessError as exc_info:
logger.error(
"command '{cmd:s}' failed with code {code:d}"
.format(
cmd=self._compile_cmd, code=int(exc_info.returncode)))
raise
else:
remote_apply(compilation_worker, kwargs)
# We set the executable only at the very end so that we can use it as
# an indicator whether the compile method is complete
self._executable = executable
def _build_compile_cmd(
self, qsd_lib, qsd_headers, executable, path, compiler,
compile_options):
# For debugging purposes, it can be useful to call
# _cmd_list_to_str(_build_compile_cmd(...))
# instead of the compile method
link_dir, libqsd_a = os.path.split(qsd_lib)
cc_file = executable + '.cc'
if not libqsd_a == self._lib_qsd:
raise ValueError("qsd_lib "+qsd_lib+" does not point to a "
"file of the name "+self._lib_qsd)
return (
[compiler, ] + shlex.split(compile_options) +
['-I%s' % qsd_headers, '-o', executable, cc_file] +
['-L%s' % link_dir, self._link_qsd])
[docs] def run(self, seed=None, workdir=None, keep=False, delay=False):
"""Run the QSD program. The :meth:`compile` method must have been
called before `run`. If :meth:`compile` was called with
``delay=True``, compile at this point and run the resulting program.
Otherwise, just run the existing program from the earlier compilation.
The resulting directory data is returned, and in addition the
`traj_data` attribute is updated to include the new trajectories (in
addition to any previous trajectories)
The `run` method may be called repeatedly to accumulate trajectories.
Arguments:
seed (int): Random number generator seed (unsigned integer), will
be passed to the executable as the only argument.
workdir (str or None): The directory in which to (temporarily)
create the output files. If None, a temporary directory will be
used. Otherwise, the `workdir` must exist. Environment
variables and '~' will be expanded.
keep (bool): If True, keep QSD output files inside `workdir`.
delay (bool): If True, schedule the run to be performed at a later
point in time, when the :meth:`run_delayed` routine is called.
Returns:
trajectorydata.TrajectoryData: Averaged data obtained
from the newly simulated trajectories only. None if `delay=True`.
Raises:
QSDCodeGenError: if :meth:`compile` was not called
OSError: if creating/removing files/folders fails
subprocess.CalledProcessError: if delayed compilation fails or
executable returns with non-zero exit code
ValueError: if seed is not unique
Note:
The only way to run multiple trajectories in parallel is by giving
``delay=True``. After preparing an arbitrary number of trajectories
by repeated calls to :meth:`run`. Then :meth:`run_delayed` must be
called with a `map` argument that supports parallel execution.
"""
if self._executable is None:
raise QSDCodeGenError("Call compile method first")
if self.traj_data is not None:
seed_in_record = (
(seed in self.traj_data.record_seeds) or
(seed in self._delayed_seeds))
if seed_in_record:
raise ValueError(
"Seed %d already in record or in delayed run" % seed)
if seed is None:
seed = random.randint(0, UNSIGNED_MAXINT)
# ensure we don't reuse an existing or schedules seed
seed_in_record = (
(seed in self.traj_data.record_seeds) or
(seed in self._delayed_seeds))
while seed_in_record:
seed = random.randint(0, UNSIGNED_MAXINT)
if delay:
while seed in self._delayed_seed:
seed = random.randint(0, UNSIGNED_MAXINT)
kwargs = {
'executable': self._executable, 'keep': keep,
'path': self._path, 'seed': seed, 'workdir': workdir,
'operators': OrderedDict(
[(name, fn) for (name, (__, fn))
in self._observables.items()]),
}
if delay:
self._delayed_runs_kwargs.append(kwargs)
self._delayed_seeds.append(kwargs['seed'])
else:
traj = qsd_run_worker(kwargs)
if self.traj_data is None:
self.traj_data = traj.copy()
else:
self.traj_data += traj
return traj
[docs] def run_delayed(self, map=map, n_procs_extend=1, _run_worker=None):
"""Execute all scheduled runs (see `delay` option in :meth:`run`
method), possibly in parallel.
Arguments:
map (callable): ``map(qsd_run_worker, list_of_kwargs)``
must be equivalent to
``[qsd_run_worker(kwargs) for kwargs in list_of_kwargs]``.
Defaults to the builtin `map` routine, which will process the
scheduled runs serially.
n_procs_extend (int): Number of local processes to use when
averaging over trajectories.
Raises:
TypeError: If `map` does not return a list of
:class:`~trajectory_data.TrajectoryData` instances.
Note:
Parallel execution is achieved by passing an appropriate `map`
routine. For example, ``map=multiprocessing.Pool(5).map`` would use
a local thread pool of 5 workers. Another alternative would be the
`map` method of an ``ipyparallel`` View. If (and only if) the View
connects remote IPython engines, :meth:`compile` must have been
called with an appropriate `remote_apply` argument that compiled
the QSD program on all of the remote engines.
"""
if _run_worker is None:
_run_worker = qsd_run_worker
trajs = []
try:
trajs = list(map(_run_worker, self._delayed_runs_kwargs))
self._delayed_runs_kwargs = []
self._delayed_seeds = []
if self.traj_data is None:
self.traj_data = trajs[0].copy()
if len(trajs) > 1:
self.traj_data.extend(*trajs[1:], n_procs=n_procs_extend)
else:
self.traj_data.extend(*trajs, n_procs=n_procs_extend)
except (IndexError, TypeError, AttributeError):
raise TypeError("`map ` does not return a list of TrajectoryData "
"instances.")
return trajs
def __str__(self):
return self.generate_code()
def _operator_basis_lines(self, indent=2):
"""Return a multiline string of C++ code that defines and initializes
all operators in the system"""
# QSD demands that we first define all "special" operators (instances
# of classes derived from PrimaryOperator) before an instance of the
# class Operator (algebraic combinations of other operators).
# Therefore, we collect the lines for these two cases separately.
special_op_lines = []
general_op_lines = []
for k in range(len(self._local_spaces)):
special_op_lines.append("IdentityOperator Id{k}({k});".format(k=k))
for op in self._qsd_ops:
# We assume that self._qsd_ops is an OrderedDict, so that
# instantiations may refer to earlier operators
line = self._qsd_ops[op].instantiation
if line.startswith("Operator "):
general_op_lines.append(line)
else:
special_op_lines.append(line)
return "\n".join(_indent(special_op_lines + general_op_lines, indent))
def _parameters_lines(self, indent=2):
"""Return a multiline string of C++ code that defines all numerical
constants"""
self._update_var_names() # should be superfluous, but just to be safe
lines = set() # sorting will happen at the very end
lines.add("Complex I(0.0,1.0);")
for s in list(self.syms):
var = self._var_names[s]
try:
val = self.num_vals[s]
except KeyError:
raise KeyError(("There is no value for symbol %s in "
"self.num_vals") % s)
if s.is_real is True:
if val.imag != 0:
raise ValueError(("Value %s for %s is complex, but "
"should be real") % (val, s))
lines.add("double {!s} = {:g};"
.format(var, val.real))
else:
lines.add("Complex {!s}({:g},{:g});"
.format(var, val.real, val.imag))
return "\n".join(_indent(sorted(lines), indent))
def _observables_lines(self, indent=2):
"""Return a multiline string of C++ code that defines all
observables"""
lines = []
n_of_out = len(self._observables)
lines.append('const int nOfOut = %d;' % n_of_out)
outlist_lines = []
outfiles = []
if len(self._observables) < 1:
raise QSDCodeGenError("Must register at least one observable")
for (observable, outfile) in self._observables.values():
outlist_lines.append(self._operator_str(observable))
outfiles.append(outfile)
lines.extend(
("Operator outlist[nOfOut] = {\n " + ",\n ".join(outlist_lines))
.split("\n"))
lines.append("};")
lines.append('char *flist[nOfOut] = {{{filenames}}};'
.format(filenames=", ".join(
[('"'+fn+'"') for fn in outfiles]
)))
lines.append(r'int pipe[4] = {1,2,3,4};')
return "\n".join(_indent(lines, indent))
def _operator_str(self, op):
"""For a given instance of ``qnet.algebra.operator_algebra.Operator``,
recursively generate the C++ expression that will instantiate the
operator.
"""
if isinstance(op, LocalOperator):
return str(self._qsd_ops[op])
elif isinstance(op, ScalarTimesOperator):
if op.coeff in self._tfuncs:
func_placeholder = self._tfuncs[op.coeff][1]
return "%s * (%s)" % (func_placeholder,
self._operator_str(op.term))
else:
return "(%s) * (%s)" % (self._scalar_str(op.coeff),
self._operator_str(op.term))
elif isinstance(op, OperatorPlus):
str_expr = " + ".join([self._operator_str(o) for o in op.operands])
return "(" + str_expr + ")"
elif isinstance(op, OperatorTimes):
str_expr = " * ".join([self._operator_str(o) for o in op.operands])
return "(" + str_expr + ")"
elif op is IdentityOperator:
return str(self._qsd_ops[op])
else:
raise TypeError(str(op))
def _ket_str(self, ket):
"""For a given instance of ``qnet.algebra.state_algebra.Ket``,
recursively generate the C++ expression that will instantiate the
state.
"""
if isinstance(ket, (LocalKet, TensorKet)):
return str(self._qsd_states[ket])
elif isinstance(ket, ScalarTimesKet):
return "({}) * ({})".format(
self._scalar_str(ket.coeff), self._ket_str(ket.term))
elif isinstance(ket, KetPlus):
return "({})".format(
" + ".join([self._ket_str(o) for o in ket.operands]))
else:
raise TypeError(str(ket))
def _scalar_str(self, sc, assign_to=None):
ccode = QSDCCodePrinter(settings={'user_symbols': self._var_names})
return ccode.doprint(sc, assign_to=assign_to)
def _hamiltonian_lines(self, indent=2):
H = self.circuit.H
return " "*indent + "Operator H = "+self._operator_str(H)+";"
def _lindblads_lines(self, indent=2):
lines = []
lines.append('const int nL = {nL};'.format(nL=self.circuit.cdim))
L_op_lines = []
for L_op in self.circuit.L.matrix.ravel():
L_op_lines.append(self._operator_str(L_op))
lines.extend(
("Operator L[nL]={\n " + ",\n ".join(L_op_lines) + "\n};")
.split("\n"))
return "\n".join(_indent(lines, indent))
def _function_lines(self, ops, indent=2):
if self.time_symbol is None:
return ''
func_lines = []
tfunc_counter = 0
for op in ops:
for coeff in _find_time_dependent_coeffs(op, self.time_symbol):
if coeff in self._tfuncs:
continue
is_real = coeff.is_real
if is_real:
func_type = 'double'
else:
func_type = 'Complex'
tfunc_counter += 1
func_name = "tfunc%d" % tfunc_counter
# choose a variable name for the time-dependent coefficient
u = "u%d" % tfunc_counter
func_placeholder = u
u_counter = 1
while func_placeholder in self.syms:
func_placeholder = "%s_%d" % (u, u_counter)
u_counter += 1
# remember that the _var_name for the time_symbol was set to
# 't' in the __init__ routine
func_lines.append("%s %s(double t)" % (func_type, func_name))
func_lines.append("{")
func_lines.append(" "+func_type+" "+func_placeholder+";")
func_lines.append(" "+self._scalar_str(
coeff, assign_to=func_placeholder))
func_lines.append(" return "+func_placeholder+";")
func_lines.append("}")
func_lines.append("")
self._tfuncs[coeff] = (func_name, func_placeholder, is_real)
if len(func_lines) > 0:
lines = ["", ] + func_lines + ["", ]
for coeff in self._tfuncs:
func_name, func_placeholder, is_real = self._tfuncs[coeff]
if is_real:
lines.append("RealFunction %s = %s;"
% (func_placeholder, func_name))
else:
lines.append("ComplexFunction %s = %s;"
% (func_placeholder, func_name))
else:
lines = []
return "\n".join(_indent(lines, indent))
def _find_time_dependent_coeffs(op, time_symbol):
if isinstance(op, ScalarTimesOperator):
if time_symbol in _scalar_free_symbols(op.coeff):
yield op.coeff
else:
try:
for operand in op.operands:
for coeff in _find_time_dependent_coeffs(operand, time_symbol):
yield coeff
except AttributeError:
# e.g. IdentityOperator has no attribute 'operands'
pass
def _full_expand(s):
if s is None:
return s
return os.path.expanduser(os.path.expandvars(s))
[docs]def expand_cmd(cmd):
"""Return a copy of the array `cmd`, where for each element of the `cmd`
array, environment variables and '~' are expanded"""
if isinstance(cmd, str):
raise TypeError("cmd must be a list")
cmd_expanded = []
for part in cmd:
cmd_expanded.append(_full_expand(part))
return cmd_expanded
[docs]def compilation_worker(kwargs, _runner=None):
"""Worker to perform compilation, suitable e.g. for being run on an
IPython cluster. All arguments are in the `kwargs` dictionary.
Keys:
executable (str): Name of the executable to be created. Nothing will be
expanded.
path (str): Path where the executable should be created, as absolute
path or relative to the current working directory. Environment
variables and '~' will be expanded.
cc_code (str): Multiline string that contains the entire C++ program to
be compiled
keep_cc (bool): Keep C++ file after compilation? It will have the
same name as the executable, with an added ``.cc`` file extension.
cmd (list of str): Command line arguments (see `args` in
`subprocess.check_output`). In each argument, environment
variables are expanded, and '~' is
expanded to ``$HOME``. It must meet the following requirements:
* the compiler (first argument) must be in the ``$PATH``
* Invocation of the command must compile a C++ file with the name
`executable`.cc in the current working directory to `exectuable`,
also in the current working directoy. It must *not* take into
account `path`. This is because the working directory for the
subprocess handling the command invocation will be set to `path`.
Thus, that is where the executable will be created.
Returns:
Absolute path of the compiled executable
Raises:
subprocess.CalledProcessError: if compilation fails
OSError: if creating/removing files/folder fails
"""
# we import subprocess locally, to make the routine completely
# self-contained. This is a requirement e.g. to be a worker on an IPython
# cluster. To still allow testing, we have the undocumented _runner
# parameter
import subprocess as sp
import os
if _runner is None:
_runner = sp.check_output
executable = str(kwargs['executable'])
path = _full_expand(str(kwargs['path']))
cc_code = str(kwargs['cc_code'])
keep_cc = kwargs['keep_cc']
cmd = expand_cmd(kwargs['cmd'])
cc_file = executable + '.cc'
try:
os.makedirs(path)
except OSError:
# Ignore existing directory
pass
with open(os.path.join(path, cc_file), 'w') as out_fh:
out_fh.write(cc_code)
out_fh.write("\n")
executable_abs = os.path.abspath(os.path.join(path, executable))
try:
_runner(cmd, stderr=sp.STDOUT, cwd=path)
finally:
if not keep_cc:
os.unlink(cc_file)
if os.path.isfile(executable_abs) and os.access(executable_abs, os.X_OK):
return executable_abs
else:
raise sp.CalledProcessError("Compilation did not create executable %s"
% executable_abs)
[docs]def qsd_run_worker(kwargs, _runner=None):
"""Worker to perform run of a previously compiled program (see
:func:`compilation_worker`), suitable e.g. for being run on an
IPython cluster. All arguments are in the `kwargs` dictionary.
Keys:
executable (str): Name of the executable to be run. Nothing will be
expanded. This should generally be only the name of the
executable, but it can also be a path relative to
``kwargs['path']``, or a (fully expanded) absolute path, in which
case ``kwargs['path']`` is ignored.
path (str): Path where the executable can be found, as absolute path or
relative to the current working directory. Environment variables
and '~' will be expanded.
seed (int): Seed (unsigned int) to be passed as argument to the
executable
operators(dict or OrderedDict of str to str)): Mapping of operator name
to filename, see `operators` parameter of
:meth:`~trajectory_data.TrajectoryData.from_qsd_data`
workdir (str or None): The working directory in which to execute the
executable (relative to the current working directory). The output
files defined in `operators` will be created in this folder. If
None, a temporary directory will be used. If `workdir` does not
exist yet, it will be created.
keep (bool): If True, keep the QSD output files. If False,
remove the output files as well as any parent folders that may have
been created alongside with `workdir`
Raises:
FileNotFoundError: if `executable` does not exist in `path`
Returns:
Expectation values and variances of the observables, from the newly
simulated trajectories only (instance of
:obj:`~qnet.misc.trajectory_data.TrajectoryData`)
"""
# imports *must* be local so that we can send this to an IPython engine
import subprocess as sp
import os
import shutil
import tempfile
if _runner is None:
_runner = sp.check_output
executable = str(kwargs['executable'])
path = os.path.abspath(_full_expand(str(kwargs['path'])))
seed = int(kwargs['seed'])
operators = kwargs['operators']
workdir = _full_expand(kwargs['workdir'])
if workdir is None:
workdir = tempfile.mkdtemp()
keep = kwargs['keep']
delete_folder = None
if not os.path.isdir(workdir):
# If keep is True, we want to remove not only the QSD output files, but
# also any folder that is newly created as part of workdir. Therefore,
# before creating workdir, we walk up the path to find the topmost
# nonexisting folder
folder = os.path.abspath(workdir)
while not os.path.isdir(folder):
delete_folder = folder
folder = os.path.abspath(os.path.join(folder, '..'))
try:
os.makedirs(workdir)
except OSError:
# This might happen sometimes when using multi-threading and
# another thread has created the directory since the "isdir" check
pass
local_executable = _full_expand(os.path.join(path, executable))
is_exe = lambda f: os.path.isfile(f) and os.access(f, os.X_OK)
if not is_exe(local_executable):
raise FileNotFoundError("No executable "+local_executable)
cmd = [local_executable, str(seed)]
_runner(cmd, stderr=sp.STDOUT, cwd=workdir)
traj = TrajectoryData.from_qsd_data(operators, seed, workdir=workdir)
if not keep:
for filename in operators.values():
os.unlink(os.path.join(workdir, filename))
if delete_folder is not None:
shutil.rmtree(delete_folder, ignore_errors=True)
return traj
[docs]def sanitize_name(name, allowed_letters, replacements):
"""Return a sanitized `name`, where all letters that occur as keys in
`replacements` are replaced by their corresponding values, and any letters
that do not match `allowed_letters` are dropped
Arguments:
name (str): string to be sanitized
allowed_letters (regex): compiled regular expression
that any allowed letter must match
replacement (dict of str to str): dictionary of mappings
Returns:
str: sanitized name
Example:
>>> sanitize_filename = partial(sanitize_name,
... allowed_letters=re.compile(r'[.a-zA-Z0-9_-]'),
... replacements={'^':'_', '+':'_', '*':'_', ' ':'_'})
>>> sanitize_filename.__doc__ = "Sanitize name to be used as a filename"
>>> sanitize_filename('\chi^{(1)}_1')
'chi_1_1'
"""
sanitized = ''
for letter in name:
if letter in replacements:
letter = replacements[letter]
if allowed_letters.match(letter):
sanitized += letter
return sanitized
def _indent(lines, indent=2):
indented_lines = []
for line in lines:
if len(line) > 0:
indented_lines.append(" "*indent + line)
else:
indented_lines.append(line)
return indented_lines
def _cmd_list_to_str(cmd_list):
result = ''
for part in cmd_list:
part = part.replace('"', '\\"')
if " " in part:
result += ' "%s"' % part
else:
result += ' %s' % part
return result.strip()
sanitize_filename = partial(
sanitize_name, allowed_letters=re.compile(r'[.a-zA-Z0-9_-]'),
replacements={'^': '_', '+': '_', '*': '_', ' ': '_'})
sanitize_filename.__doc__ = "Sanitize name to be used as a filename"
sanitize_varname = partial(
sanitize_name, allowed_letters=re.compile(r'[a-zA-Z0-9_]'),
replacements={'^': '_', '+': '_', '*': '_', ' ': '_', '-': '_', '.': '_'})
sanitize_filename.__doc__ = "Sanitize name to be used as a C++ variable name"