"""
This module contains functions for mesh-based Monte Carlo variance reduction.
"""
import numpy as np
from pyne.particle import mcnp
from .mcnp import Wwinp
from pyne.mesh import Mesh, MeshError, HAVE_PYMOAB
# The buildin zip in python3 behaves as itertools.izip as python2.
# For python2, we need to import izip as zip.
# For python3, do nothing with zip.
try:
from itertools import izip as zip
except ImportError:
pass
from warnings import warn
from pyne.utils import QA_warn
import numpy as np
QA_warn(__name__)
if HAVE_PYMOAB:
from pyne.mesh import NativeMeshTag, mesh_iterate
else:
warn("The PyMOAB optional dependency could not be imported. "
"Some aspects of the variance reduction module may be incomplete.",
ImportWarning)
[docs]def cadis(adj_flux_mesh, adj_flux_tag, q_mesh, q_tag,
ww_mesh, ww_tag, q_bias_mesh, q_bias_tag, beta=5):
"""This function reads PyNE Mesh objects tagged with adjoint fluxes and
unbiased source densities and outputs PyNE Meshes of weight window lower
bounds and biased source densities as computed by the Consistant
Adjoint-Driven Importance Sampling (CADIS) method [1]. Note that values can
be stored on the same Mesh object, all different Mesh objects, or any
combination in between. Meshes can be structured or unstructured.
Note that this function is suitable for Forward Weighted (FW) CADIS as well,
the only difference being the adjoint source used for the estimation of the
adjoint flux.
[1] Haghighat, A. and Wagner, J. C., "Monte Carlo Variance Reduction with
Deterministic Importance Functions," Progress in Nuclear Energy,
Vol. 42, No. 1, pp. 25-53, 2003.
Parameters
----------
adj_flux_mesh : PyNE Mesh object
The mesh containing the adjoint fluxes.
adj_flux_tag : string
The name of the adjoint flux tag on adj_mesh.
q_mesh : PyNE Mesh object
The mesh containing the unbiased source density.
q_tag : string
The name of the source density tag on q_mesh.
ww_mesh : PyNE Mesh object
The mesh to store the output weight window mesh.
ww_tag : string
Name of the tag to store output weight window values on ww_mesh.
q_bias_mesh : PyNE Mesh object
The mesh to store the output biased source density mesh.
q_bias_tag : PyNE Mesh object
Name of the tag to store output weight window values on q_bias_mesh.
beta : float
The ratio of the weight window upper bound to the weight window lower
bound. The default value is 5: the value used in MCNP.
"""
# find number of energy groups
e_groups = adj_flux_mesh.get_tag(
adj_flux_tag)[list(mesh_iterate(adj_flux_mesh.mesh))[0]]
e_groups = np.atleast_1d(e_groups)
num_e_groups = len(e_groups)
# verify source (q) mesh has the same number of energy groups
q_e_groups = q_mesh.get_tag(q_tag)[list(
mesh_iterate(q_mesh.mesh))[0]]
q_e_groups = np.atleast_1d(q_e_groups)
num_q_e_groups = len(q_e_groups)
if num_q_e_groups != num_e_groups:
raise TypeError("{0} on {1} and {2} on {3} "
"must be of the same dimension".format(adj_flux_mesh,
adj_flux_tag,
q_mesh, q_tag))
# create volume element (ve) iterators
adj_ves = mesh_iterate(adj_flux_mesh.mesh)
q_ves = mesh_iterate(q_mesh.mesh)
# calculate total source strength
q_tot = 0
for q_ve in q_ves:
q_tot += np.sum(q_mesh.get_tag(q_tag)[q_ve]) \
* q_mesh.elem_volume(q_ve)
q_ves.reset()
# calculate the total response per source particle (R)
R = 0
for adj_ve, q_ve in zip(adj_ves, q_ves):
adj_flux = adj_flux_mesh.get_tag(adj_flux_tag)[adj_ve]
adj_flux = np.atleast_1d(adj_flux)
q = q_mesh.get_tag(q_tag)[q_ve]
q = np.atleast_1d(q)
vol = adj_flux_mesh.elem_volume(adj_ve)
for i in range(0, num_e_groups):
R += adj_flux[i]*q[i]*vol/q_tot
# generate weight windows and biased source densities using R
ww_mesh.tag(ww_tag, np.zeros(num_e_groups, dtype=float),
'nat_mesh', size=num_e_groups, dtype=float)
tag_ww = ww_mesh.get_tag(ww_tag)
ww_ves = mesh_iterate(ww_mesh.mesh)
q_bias_mesh.tag(q_bias_tag, np.zeros(num_e_groups, dtype=float),
'nat_mesh', size=num_e_groups, dtype=float)
tag_q_bias = q_bias_mesh.get_tag(q_bias_tag)
q_bias_ves = mesh_iterate(q_bias_mesh.mesh)
# reset previously created iterators
q_ves.reset()
adj_ves.reset()
for adj_ve, q_ve, ww_ve, q_bias_ve in zip(adj_ves, q_ves,
ww_ves, q_bias_ves):
adj_flux = adj_flux_mesh.get_tag(adj_flux_tag)[adj_ve]
adj_flux = np.atleast_1d(adj_flux)
q = q_mesh.get_tag(q_tag)[q_ve]
q = np.atleast_1d(q)
tag_q_bias[q_bias_ve] = [adj_flux[i]*q[i]/q_tot/R
for i in range(num_e_groups)]
tag_ww[ww_ve] = [R/(adj_flux[i]*(beta + 1.)/2.)
if adj_flux[i] != 0.0 else 0.0
for i in range(num_e_groups)]
[docs]def magic(meshtally, tag_name, tag_name_error, **kwargs):
"""This function reads a PyNE mcnp.MeshTally object and preforms the MAGIC
algorithm and returns the resulting weight window mesh.
Parameters:
-----------
meshtally : a single PyNE mcnp.MeshTally obj
tag_name : string
The meshtally tag_name (example: n_result or n_total_result).
tag_name_error : string
The meshtally tag_name for the error associated with provided tag_name.
Example: n_rel_error
tolerance : float, optional
The maximum relative error allowable for the MAGIC algorithm to create
a weight window lower bound for for a given mesh volume element for the
intial weight window lower bound generation, or overwrite preexisting
weight window lower bounds for subsequent iterations.
null_value : float, optional
The weight window lower bound value that is assigned to mesh volume
elements where the relative error on flux exceeds the tolerance.
"""
tolerance = kwargs.get('tolerance', 0.5)
null_value = kwargs.get('null_value', 0.0)
# Convert particle name to the recognized abbreviation
particle = (meshtally.particle.capitalize())
if particle == ("Neutron" or "Photon" or "Electron"):
meshtally.particle = mcnp(particle).lower()
# Create tags for values and errors
meshtally.vals = NativeMeshTag(mesh=meshtally, name=tag_name)
meshtally.errors = NativeMeshTag(mesh=meshtally, name=tag_name_error)
# Create weight window tags
tag_size = meshtally.vals[0].size
meshtally.ww_x = NativeMeshTag(tag_size, float,
name="ww_{0}".format(meshtally.particle))
meshtally.tag("{0}_e_upper_bounds".format(meshtally.particle),
np.zeros(tag_size, dtype=float), 'nat_mesh', size=tag_size, dtype=float)
root_tag = meshtally.get_tag(
"{0}_e_upper_bounds".format(meshtally.particle))
# Determine if total energy or single energy bin or multiple energy bins
if tag_size == 1 and len(meshtally.e_bounds) > 1:
total = True
elif tag_size == 1 and len(meshtally.e_bounds) == 1:
total = True
elif tag_size > 1 and len(meshtally.e_bounds) > 1:
total = False
# Reassign arrays for total and not total case
if total:
# get value tagged on the mesh itself
root_tag[meshtally] = np.max(meshtally.e_bounds[:])
max_val = np.max(meshtally.vals[:])
vals = []
errors = []
for flux, error in zip(meshtally.vals[:], meshtally.errors[:]):
vals.append(np.atleast_1d(flux))
errors.append(np.atleast_1d(error))
vals = np.array(vals)
errors = np.array(errors)
else:
root_tag[meshtally] = meshtally.e_bounds[1:]
vals = meshtally.vals[:]
errors = meshtally.errors[:]
# Determine the max values for each energy bin
max_val = []
for i in range(tag_size):
vals_in_e = []
for ve, flux in enumerate(vals[:]):
vals_in_e.append(vals[ve][i])
max_val.append(np.max(vals_in_e))
# Apply normalization to create weight windows
ww = []
for ve, flux_list in enumerate(vals[:]):
tally_list = errors[ve]
flux = []
for i, value in enumerate(flux_list):
if tally_list[i] > tolerance:
flux.append(null_value)
else:
flux.append(value/(2.0*max_val[i]))
ww.append(flux)
# Resassign weight windows to meshtally
if total:
meshtally.ww_x[:] = np.reshape(ww, len(ww))
else:
meshtally.ww_x[:] = ww
# Create wwinp mesh
wwinp = Wwinp()
wwinp.read_mesh(meshtally.mesh)