"""This module implements an ALARA[1]-like chain-based transmutation solver.
[1] Wilson, P. P. H. "ALARA: Analytic Laplacian Adaptive Radioactivity
Analysis," a Ph.D. Dissertation, University of Wisconsin, Madison, WI, 1999.
"""
from __future__ import division
from pyne.utils import QA_warn
import numpy as np
from scipy import linalg
#from scipy import sparse # <-- SPARSE
from pyne import utils
from pyne import data
from pyne import rxname
from pyne import nucname
from pyne import nuc_data
from pyne.material import Material, from_atom_frac
from pyne.xs.data_source import NullDataSource, EAFDataSource
from pyne.xs.cache import XSCache
from pyne.xs.channels import sigma_a
QA_warn(__name__)
[docs]class Transmuter(object):
"""A class for transmuting materials using an ALARA-like chain solver."""
def __init__(self, t=0.0, phi=0.0, temp=300.0, tol=1e-7, rxs=None, log=None,
*args, **kwargs):
"""Parameters
----------
t : float
Transmutations time [sec].
phi : float or array of floats
Neutron flux vector [n/cm^2/sec]. Currently this must either be
a scalar or match the group structure of EAF.
temp : float, optional
Temperature [K] of material, defaults to 300.0.
tol : float
Tolerance level for chain truncation.
rxs : set of ints or strs
Reaction ids or names to use in transmutation that produce well-defined
children. This set should thus not include fission. If None, then the
reactions from EAF are used.
log : file-like or None
The log file object should be written. A None imples the log is
not desired.
args : tuple, optional
Other arguments ignored for compatibility with other Transmuters.
kwargs : dict, optional
Other keyword arguments ignored for compatibility with other Transmuters.
"""
eafds = EAFDataSource()
eafds.load(temp=temp)
gs = np.array([eafds.src_group_struct[0], eafds.src_group_struct[-1]])
eafds.dst_group_struct = gs
self.xscache = XSCache(group_struct=gs, data_sources=(eafds, NullDataSource,))
self.t = t
self._phi = None
self.phi = phi
self.temp = temp
self.log = log
self.tol = tol
if rxs is None:
rxs = ['gamma', 'gamma_1', 'gamma_2', 'p', 'p_1', 'p_2', 'd', 'd_1',
'd_2', 't', 't_1', 't_2', 'He3', 'He3_1', 'He3_2', 'a', 'a_1',
'a_2', 'z_2a', 'z_2p', 'z_2p_1', 'z_2p_2', 'z_2n', 'z_2n_1',
'z_2n_2', 'z_3n', 'z_3n_1', 'z_3n_2', 'na', 'na_1', 'na_2',
'z_2na', 'np', 'np_1', 'np_2', 'n2a', 'nd', 'nd_1', 'nd_2',
'nt', 'nt_1', 'nt_2', 'nHe3', 'nHe3_1', 'nHe3_2','z_4n',
'z_4n_1', 'n', 'n_1', 'n_2', 'z_3np']
rxs = set([rxname.id(rx) for rx in rxs])
rxs.discard(rxname.id('fission'))
self.rxs = rxs
@property
def phi(self):
return self._phi
@phi.setter
def phi(self, flux):
"""Ensures that the flux is correctly formatted."""
flux = np.asarray(flux)
if flux.ndim == 0:
_ = np.empty(175, float)
_.fill(flux / 175.0)
flux = _
elif flux.ndim == 1 and flux.shape[0] != 175:
raise ValueError("Group structure must match EAF.")
elif flux.ndim > 1:
raise ValueError("The flux vector must be 0- or 1-dimensional.")
if not np.all(flux >= 0.0):
raise ValueError("Flux entries must be non-negative.")
for ds in self.xscache.data_sources:
ds.src_phi_g = flux
self.xscache['phi_g'] = np.array([flux.sum()])
self._phi = flux
[docs] def transmute(self, x, t=None, phi=None, tol=None, log=None, *args, **kwargs):
"""Transmutes a material into its daughters.
Parameters
----------
x : Material or similar
Input material for transmutation.
t : float
Transmutations time [sec].
phi : float or array of floats
Neutron flux vector [n/cm^2/sec]. Currently this must either be
a scalar or match the group structure of EAF.
tol : float
Tolerance level for chain truncation.
log : file-like or None
The log file object should be written. A None imples the log is
not desired.
Returns
-------
y : Material
The output material post-transmutation.
"""
if not isinstance(x, Material):
x = Material(x)
if t is not None:
self.t = t
if phi is not None:
self.phi = phi
if log is not None:
self.log = log
if tol is not None:
self.tol = tol
x_atoms = x.to_atom_frac()
y_atoms = {}
for nuc, adens in x_atoms.items():
# Find output for root of unit density and scale all output by
# actual nuclide density and add to final output.
partial = self._transmute_partial(nuc)
for part_nuc, part_adens in partial.items():
y_atoms[part_nuc] = part_adens * adens + y_atoms.get(part_nuc, 0.0)
mw_x = x.molecular_mass()
y = from_atom_frac(y_atoms, atoms_per_molecule=x.atoms_per_molecule)
# even though it doesn't look like it, the following line is actually
# mass_y = MW_y * mass_x / MW_x
y.mass *= x.mass / mw_x
return y
def _transmute_partial(self, nuc):
"""Core method to transmute a material into its daughters.
This method assumes that the initial nuclide has unit density.
Parameters
----------
nuc : int
Nuclide id to be transmuted.
Returns
-------
partial : dict
A dictionary containing number densities for each nuclide after
the transmutation is carried out for the input nuclide. Keys are
nuclide ids and values are float number densities for the coupled
# what is the coupled?.
"""
dest = self._get_destruction(nuc)
# DENSE
A = np.empty((1,1), float)
A[0, 0] = -dest
#A = sparse.csr_matrix([[-dest]]) # <-- SPARSE
rootval = np.exp(-dest * self.t)
partial = {nuc: rootval}
self._traversal(nuc, A, partial)
return partial
def _get_destruction(self, nuc, decay=True):
"""Computes the destruction rate of the nuclide.
Parameters
----------
nuc : int
Name of the nuclide in question
decay : bool
True if the decay constant should be added to the returned value.
False if only destruction from neutron reactions should be considered.
Returns
-------
d : float
Destruction rate of the nuclide.
"""
xscache = self.xscache
sig_a = sigma_a(nuc, xs_cache=xscache)
d = utils.from_barns(sig_a[0], 'cm2') * xscache['phi_g'][0]
if decay and not np.isnan(data.decay_const(nuc)):
d += data.decay_const(nuc)
return d
def _grow_matrix(self, A, prod, dest):
"""Grows the given matrix by one row and one column, adding necessary
production and destruction rates.
Parameters
----------
A : NumPy 2-dimensional array
The original matrix that must be grown.
prod : float
The production rate of the next nuclide in the chain.
dest : float
The destruction rate of the next nuclide in the chain.
Returns
-------
B : NumPy 2-dimensional array
The grown matrix
"""
shape = A.shape
n = shape[0]
# DENSE
# Add row and column to current matrix
B = np.empty((n+1, n+1), dtype=float)
B[:n,:n] = A
B[n,:n-1] = 0
B[:n,n] = 0
# Update new matrix with provided data
B[n,n-1] = prod
B[n,n] = -dest
# SPARSE
#B = sparse.bmat([[A, None], [None, [[-dest]]]]).tocsr()
#B[n,n-1] = prod
return B
def _traversal(self, nuc, A, out, depth=0):
"""Nuclide transmutation traversal method.
This method will traverse the reaction tree recursively, using a DFS
algorithm. On termination, the method will return all number densities
after a given time that are a result of the starting nuclide.
Parameters
----------
nuc : int
ID of the active nuclide for the traversal.
A : NumPy 2-dimensional array
Current state of the coupled equation matrix.
out : dict
A dictionary containing the final recorded number densities for each
nuclide. Keys are nuclide names in integer id form. Values are
number densities for the coupled nuclide in float format. This is
modified in place.
depth : int
Current depth of traversal (root at 0). Should never be provided by user.
"""
t = self.t
tol = self.tol
phi = self.xscache['phi_g'][0]
temp = self.temp
xscache = self.xscache
if self.log is not None:
self._log_tree(depth, nuc, 1.0)
prod = {}
# decay info
lam = data.decay_const(nuc)
decay_branches = {} if lam == 0 else self._decay_branches(nuc)
for decay_child, branch_ratio in decay_branches.items():
prod[decay_child] = lam * branch_ratio
# reaction daughters
for rx in self.rxs:
try:
child = rxname.child(nuc, rx)
except RuntimeError:
continue
child_xs = xscache[nuc, rx, temp][0]
rr = utils.from_barns(child_xs, 'cm2') * phi # reaction rate
prod[child] = rr + prod.get(child, 0.0)
# Cycle production dictionary
for child in prod:
# Grow matrix
d = self._get_destruction(child)
B = self._grow_matrix(A, prod[child], d)
# Create initial density vector
n = B.shape[0]
N0 = np.zeros((n, 1), dtype=float)
N0[0] = 1.0
# Compute matrix exponential and dot with density vector
eB = linalg.expm(B * t)
N_final = np.dot(eB, N0) # <-- DENSE
#N_final = eB.dot(N0) # <-- SPARSE
if self.log is not None:
self._log_tree(depth+1, child, N_final[-1])
# Check against tolerance and continue traversal
if N_final[-1] > tol:
self._traversal(child, B, out, depth=depth+1)
# On recursion exit or truncation, write data from this nuclide
outval = N_final[-1,0] + out.get(child, 0.0)
if 0.0 < outval:
out[child] = outval
def _log_tree(self, depth, nuc, numdens):
"""Logging method to track path of _traversal.
Parameters
----------
depth : integer
Current depth of traversal (root at 0).
nuc : nucname
Current nuclide in traversal.
numdens : float
Current number density of nuc.
tree : File
File to write tree log to.
"""
# Don't print a zero density.
if numdens == 0.0:
return
space = ' |'
entry = "{spacing}--> {name} {numdens}\n".format(spacing=depth*space,
numdens=numdens,
name=nucname.name(nuc))
self.log.write(entry)
self.log.flush()
def _decay_branches(self, nuc):
"""Returns a dictionary that contains the decay children of nuc as keys
to the branch ratio of that child's decay process.
Parameters
----------
nuc : int
Name of parent nuclide to get decay children of.
Returns
-------
decay_branches : dictionary
Keys are decay children of nuc in zzaaam format.
Values are the branch ratio of the decay child.
"""
decay_branches = {}
children = data.decay_children(nuc)
for child in children:
decay_branches[child] = data.branch_ratio(nuc, child)
return decay_branches