"""
Module for calculating physical quantities from Sarkas checkpoints.
"""
import inspect
from copy import deepcopy
from IPython import get_ipython
if get_ipython().__class__.__name__ == "ZMQInteractiveShell":
from tqdm import tqdm_notebook as tqdm
else:
from tqdm import tqdm
import datetime
import matplotlib.pyplot as plt
import pandas as pd
import scipy.stats as scp_stats
import sys
from matplotlib.gridspec import GridSpec
from numba import njit
from numpy import append as np_append
from numpy import (
argsort,
array,
complex128,
concatenate,
exp,
format_float_scientific,
histogram,
interp,
isfinite,
load,
log,
ndarray,
ones,
ones_like,
pi,
real,
repeat,
roll,
savez,
sqrt,
trapz,
unique,
unwrap,
where,
zeros,
)
from numpy.polynomial import hermite_e
from numpy.random import default_rng
from os import listdir, mkdir
from os import remove as os_remove
from os.path import exists as os_path_exists
from os.path import join as os_path_join
from pandas import (
concat,
DataFrame,
HDFStore,
MultiIndex,
read_csv,
read_hdf,
Series,
to_numeric,
)
from pickle import dump
from pickle import load as pickle_load
from scipy.fft import fft, fftfreq, fftshift
from scipy.integrate import quad as scp_quad
from scipy.linalg import norm
from scipy.optimize import curve_fit
from scipy.special import erfc, factorial
from seaborn import histplot as sns_histplot
from ..utilities.io import print_to_logger
from ..utilities.maths import correlationfunction
from ..utilities.misc import add_col_to_df
from ..utilities.timing import datetime_stamp, SarkasTimer, time_stamp
from .fit_functions import exponential, gaussian
UNITS = [
# MKS Units
{
"Energy": "J",
"Heat Flux": "J/s",
"Time": "s",
"Length": "m",
"Charge": "C",
"Temperature": "K",
"ElectronVolt": "eV",
"Mass": "kg",
"Magnetic Field": "T",
"Current": "A",
"Power": "erg/s",
"Pressure": "Pa",
"Electrical Conductivity": "S/m",
"Diffusion": r"m$^2$/s",
"Bulk Viscosity": r"kg/m-s",
"Shear Viscosity": r"kg/m-s",
"Thermal Conductivity": r"J/m-s-K",
"none": "",
},
# CGS Units
{
"Energy": "erg",
"Heat Flux": "erg/s",
"Time": "s",
"Length": "m",
"Charge": "esu",
"Temperature": "K",
"ElectronVolt": "eV",
"Mass": "g",
"Magnetic Field": "G",
"Current": "esu/s",
"Power": "erg/s",
"Pressure": "Ba",
"Electrical Conductivity": "mho/m",
"Diffusion": r"m$^2$/s",
"Bulk Viscosity": r"g/cm-s",
"Shear Viscosity": r"g/cm-s",
"Thermal Conductivity": r"erg/cm-s-K",
"none": "",
},
]
PREFIXES = {
"y": 1.0e-24, # yocto
"z": 1.0e-21, # zepto
"a": 1.0e-18, # atto
"f": 1.0e-15, # femto
"p": 1.0e-12, # pico
"n": 1.0e-9, # nano
r"$\mu$": 1.0e-6, # micro
"m": 1.0e-3, # milli
"c": 1.0e-2, # centi
"": 1.0,
"k": 1e3, # kilo
"M": 1e6, # mega
"G": 1e9, # giga
"T": 1e12, # tera
"P": 1e15, # peta
"E": 1e18, # exa
"Z": 1e21, # zetta
"Y": 1e24, # yotta
}
[docs]def compute_doc(func):
func.__doc__ = """
Routine for computing the observable. See class doc for exact quantities.\n
The data of each slice is saved in hierarchical dataframes,
:py:attr:`~.dataframe_slices`. \n
The sliced averaged data is saved in other hierarchical dataframes,
:py:attr:`~.dataframe`.
Parameters
----------
calculate_acf : bool
Calculate the ACF of the observable. Default =`True` except for :class:`sarkas.tools.observables.Thermodynamics`.
"""
return func
[docs]def compute_acf_doc(func):
func.__doc__ = """
Routine for computing the observable's autocorrelation function. See class doc for exact quantities.\n
The data of each slice is saved in hierarchical dataframes,
:py:attr:`~.dataframe_acf_slices`. \n
The sliced averaged data is saved in other hierarchical dataframes,
:py:attr:`~.dataframe_acf`.
"""
return func
[docs]def calc_slices_doc(func):
func.__doc__ = """
Calculate the observable for each slice. See class doc for exact quantities.\n
The data of each slice is saved in hierarchical dataframes,
:py:attr:`~.dataframe_slices`.\n
"""
return func
[docs]def calc_acf_slices_doc(func):
func.__doc__ = """
Calculate the observable acf for each slice. See class doc for exact quantities.\n
The data of each slice is saved in hierarchical dataframes,
:py:attr:`~.dataframe_acf_slices`.\n
"""
return func
[docs]def avg_slices_doc(func):
func.__doc__ = """
Calculate the average and standard deviation of the observable from the slices dataframe.
See class doc for exact quantities. \n
The data of each slice is saved in hierarchical dataframes,
:py:attr:`~.dataframe_slices` (:py:attr:`~.dataframe_acf_slices`). \n
The sliced averaged data is saved in other hierarchical dataframes,
:py:attr:`~.dataframe` (:py:attr:`~.dataframe_acf_slices`).
"""
return func
[docs]def avg_acf_slices_doc(func):
func.__doc__ = """
Calculate the average and standard deviation of the observable autocorrelation function from the slices dataframe.
See class doc for exact quantities. \n
The data of each slice is saved in hierarchical dataframes,
:py:attr:`~.dataframe_acf_slices`. \n
The sliced averaged data is saved in other hierarchical dataframes,
:py:attr:`~.dataframe_acf`.
"""
return func
[docs]def setup_doc(func):
func.__doc__ = """
Assign attributes from simulation's parameters.
Parameters
----------
params : :class:`sarkas.core.Parameters`
Simulation's parameters.
phase : str, optional
Phase to compute. Default = 'production'.
no_slices : int, optional
Number of independent runs inside a long simulation. Default = 1.
**kwargs :
These will overwrite any :class:`sarkas.core.Parameters`
or default :class:`sarkas.tools.observables.Observable`
attributes and/or add new ones.
"""
return func
[docs]def arg_update_doc(func):
func.__doc__ = """Update observable specific attributes and call :meth:`~.update_finish` to save info."""
return func
# TODO: Divide calculation in case of multirun
# TODO: Check Velocity Distribution class.
[docs]class Observable:
"""
Parent class of all the observables.
Attributes
----------
dataframe : pandas.DataFrame
Dataframe containing the observable's data averaged over the number of slices.
dataframe_acf : pandas.DataFrame
Dataframe containing the observable's autocorrelation function data averaged over the number of slices.
dataframe_acf_slices : pandas.DataFrame
Dataframe containing the observable's autocorrelation data for each slice.
dataframe_slices : pandas.DataFrame
Dataframe containing the observable's data for each slice.
max_k_harmonics : list
Maximum number of :math:`\\mathbf{k}` harmonics to calculate along each dimension.
phase : str
Phase to analyze.
prod_no_dumps : int
Number of production phase checkpoints. Calculated from the number of files in the Production directory.
eq_no_dumps : int
Number of equilibration phase checkpoints. Calculated from the number of files in the Equilibration directory.
no_dumps : int
Number of simulation's checkpoints. Calculated from the number of files in the phase folder.
dump_dir : str
Path to correct dump directory.
dump_step : int
Correct step interval.
It is either :py:attr:`sarkas.core.Parameters.prod_dump_step` or :py:attr:`sarkas.core.Parameters.eq_dump_step`.
no_obs : int
Number of independent binary observable quantities.
It is calculated as :math:`N_s (N_s + 1) / 2` where :math:`N_s` is the number of species.
k_file : str
Path to the npz file storing the :math:`k` vector values.
nkt_hdf_file : str
Path to the npy file containing the Fourier transform of density fluctuations. :math:`n(\\mathbf k, t)`.
vkt_file : str
Path to the npz file containing the Fourier transform of velocity fluctuations. :math:`\\mathbf v(\\mathbf k, t)`.
k_space_dir : str
Directory where :math:`\\mathbf {k}` data is stored.
saving_dir : str
Path to the directory where computed data is stored.
slice_steps : int
Number of steps per slice.
dimensional_average: bool
Flag for averaging over all dimensions. Default = False.
runs: int
Number of independent MD runs. Default = 1.
multi_run_average: bool
Flag for averaging over multiple runs. Default = False.
If True, `runs` needs be specified. It will collect data from all runs and stored them in a large ndarray to
be averaged over.
"""
[docs] def __init__(self):
self.postprocessing_dir = None
self.mag_no_dumps = None
self.eq_no_dumps = None
self.prod_no_dumps = None
self.no_obs = None
self.filename_hdf_acf = None
self.species_index_start = None
self.filename_hdf_acf_slices = None
self.filename_hdf_slices = None
self.filename_hdf = None
self.__long_name__ = None
self.__name__ = None
self.saving_dir = None
self.phase = "production"
self.multi_run_average = False
self.dimensional_average = False
self.runs = 1
self.no_slices = 1
self.slice_steps = None
self.screen_output = True
self.timer = SarkasTimer()
# k observable attributes
self.k_observable = False
self.max_aa_harmonics = None
self.angle_averaging = "principal_axis"
self.max_k_harmonics = None
self.max_aa_ka_value = None
self.kw_observable = False
self.dim_labels = ["X", "Y", "Z"]
self.acf_observable = False
self.dataframe = None
self.dataframe_slices = None
self.dataframe_acf = None
self.dataframe_acf_slices = None
def __repr__(self):
sortedDict = dict(sorted(self.__dict__.items(), key=lambda x: x[0].lower()))
disp = "Observable( " + self.__class__.__name__ + "\n"
exclude_list = ["dataframe", "dataframe_slices", "dataframe_acf", "dataframe_acf_slices"]
for key, value in sortedDict.items():
if not key in exclude_list:
disp += "\t{} : {}\n".format(key, value)
disp += ")"
return disp
def __getstate__(self):
"""Copy the object's state from self.__dict__ which contains all our instance attributes.
Always use the dict.copy() method to avoid modifying the original state.
Reference: https://docs.python.org/3/library/pickle.html#handling-stateful-objects
"""
state = self.__dict__.copy()
# Remove the data that is stored already
del state["dataframe"]
del state["dataframe_slices"]
del state["dataframe_acf"]
del state["dataframe_acf_slices"]
return state
def __setstate__(self, state):
# Restore instance attributes.
self.__dict__.update(state)
# Restore the previously deleted dataframes.
self.parse()
[docs] def calc_k_data(self):
"""Calculate and save Fourier space data."""
# Do some checks
if not isinstance(self.angle_averaging, str):
raise TypeError("angle_averaging not a string. " "Choose from ['full', 'custom', 'principal_axis']")
elif self.angle_averaging not in ["full", "custom", "principal_axis"]:
raise ValueError(
"Option not available. " "Choose from ['full', 'custom', 'principal_axis']" "Note case sensitivity."
)
assert self.max_k_harmonics.all(), "max_k_harmonics not defined."
# Calculate the k arrays
self.k_list, self.k_counts, k_unique, self.k_harmonics = kspace_setup(
self.box_lengths, self.angle_averaging, self.max_k_harmonics, self.max_aa_harmonics
)
# Save the ka values
self.ka_values = 2.0 * pi * k_unique * self.a_ws
self.k_values = 2.0 * pi * k_unique
self.no_ka_values = len(self.ka_values)
# Check if the writing folder exist
if not (os_path_exists(self.k_space_dir)):
mkdir(self.k_space_dir)
# Write the npz file
savez(
self.k_file,
k_list=self.k_list,
k_harmonics=self.k_harmonics,
k_counts=self.k_counts,
k_values=self.k_values,
ka_values=self.ka_values,
angle_averaging=self.angle_averaging,
max_k_harmonics=self.max_k_harmonics,
max_aa_harmonics=self.max_aa_harmonics,
)
[docs] def calc_nkt_slices_data(self):
"""Calculate n(k,t) for each slice."""
start_slice = 0
end_slice = self.slice_steps * self.dump_step
self.nkt_dataframe_slices = DataFrame()
for isl in tqdm(
range(self.no_slices),
desc="Calculating n(k,t) for slice ",
position=0,
disable=not self.verbose,
leave=True,
):
nkt = calc_nkt(
self.dump_dir,
(start_slice, end_slice, self.slice_steps),
self.dump_step,
self.species_num,
self.k_list,
self.verbose,
)
start_slice += self.slice_steps * self.dump_step
end_slice += self.slice_steps * self.dump_step
# n(k,t).shape = [no_species, time, k vectors]
slc_column = "slice {}".format(isl + 1)
for isp, sp_name in enumerate(self.species_names):
df_columns = [
slc_column + "_{}_k = [{}, {}, {}]".format(sp_name, *self.k_harmonics[ik, :-2].astype(int))
for ik in range(len(self.k_harmonics))
]
# df_columns = [time_column, *k_columns]
self.nkt_dataframe_slices = concat(
[self.nkt_dataframe_slices, DataFrame(nkt[isp, :, :], columns=df_columns)], axis=1
)
# Example nkt_dataframe
# slices slice 1
# species H
# harmonics k = [0, 0, 1] | k = [0, 1, 0] | ...
tuples = [tuple(c.split("_")) for c in self.nkt_dataframe_slices.columns]
self.nkt_dataframe_slices.columns = MultiIndex.from_tuples(tuples, names=["slices", "species", "harmonics"])
[docs] def calc_vkt_slices_data(self):
"""Calculate v(k,t) for each slice."""
start_slice = 0
end_slice = self.slice_steps * self.dump_step
self.vkt_dataframe_slices = DataFrame()
for isl in tqdm(
range(self.no_slices),
desc="Calculating v(k,t) for slice ",
position=0,
disable=not self.verbose,
leave=True,
):
vkt, vkt_i, vkt_j, vkt_k = calc_vkt(
self.dump_dir,
(start_slice, end_slice, self.slice_steps),
self.dump_step,
self.species_num,
self.k_list,
self.verbose,
)
start_slice += self.slice_steps * self.dump_step
end_slice += self.slice_steps * self.dump_step
slc_column = "slice {}".format(isl + 1)
for isp, sp_name in enumerate(self.species_names):
df_columns = [
slc_column
+ "_Longitudinal_{}_k = [{}, {}, {}]".format(sp_name, *self.k_harmonics[ik, :-2].astype(int))
for ik in range(len(self.k_harmonics))
]
# df_columns = [time_column, *k_columns]
self.vkt_dataframe_slices = concat(
[self.vkt_dataframe_slices, DataFrame(vkt[isp, :, :], columns=df_columns)], axis=1
)
df_columns = [
slc_column
+ "_Transverse i_{}_k = [{}, {}, {}]".format(sp_name, *self.k_harmonics[ik, :-2].astype(int))
for ik in range(len(self.k_harmonics))
]
self.vkt_dataframe_slices = concat(
[self.vkt_dataframe_slices, DataFrame(vkt_i[isp, :, :], columns=df_columns)], axis=1
)
df_columns = [
slc_column
+ "_Transverse j_{}_k = [{}, {}, {}]".format(sp_name, *self.k_harmonics[ik, :-2].astype(int))
for ik in range(len(self.k_harmonics))
]
self.vkt_dataframe_slices = concat(
[self.vkt_dataframe_slices, DataFrame(vkt_j[isp, :, :], columns=df_columns)], axis=1
)
df_columns = [
slc_column
+ "_Transverse k_{}_k = [{}, {}, {}]".format(sp_name, *self.k_harmonics[ik, :-2].astype(int))
for ik in range(len(self.k_harmonics))
]
self.vkt_dataframe_slices = concat(
[self.vkt_dataframe_slices, DataFrame(vkt_k[isp, :, :], columns=df_columns)], axis=1
)
# Example vkt_dataframe
# slices slice 1
# species H
# direction Longitudinal/Transverse
# harmonics k = [0, 0, 1] | k = [0, 1, 0] | ...
# Full string: slice 1_Longitudinal_He_k = [1, 0, 0]
tuples = [tuple(c.split("_")) for c in self.vkt_dataframe_slices.columns]
self.vkt_dataframe_slices.columns = MultiIndex.from_tuples(
tuples, names=["slices", "species", "direction", "harmonics"]
)
[docs] @staticmethod
def integrate_normalized_acf_squared(time, data):
"""
Calculate the normalized correlation time as given by
"""
data_0 = data[0]
tau_2 = 2.0 * trapz((data / data_0) ** 2, x=time)
return tau_2
[docs] def calculate_corr_times(self, slices=False):
if slices:
df_acf = self.dataframe_acf_slices
else:
df_acf = self.dataframe_acf
cols = df_acf.columns
time = df_acf[cols[0]].values
index_list = [
"exponential [s]",
"gaussian [s]",
"integral [s]",
"exponential [timestep]",
"gaussian [timestep]",
"integral [timestep]",
"exponential [dump step]",
"gaussian [dump step]",
"integral [dump step]",
"exponential [plasma cycle]",
"gaussian [plasma cycle]",
"integral [plasma cycle]",
]
# This creates a df with a bunch of Nan objects
df = DataFrame(index=index_list, columns=MultiIndex.from_tuples(df_acf.columns[1:]))
t_wp = (2.0 * pi) / self.total_plasma_frequency
for col in cols[1:]:
# Normalize the ACF
acf_0 = (df_acf[col].values)[0]
acf = df_acf[col].values / acf_0
# Find a short time to avoid the noise in the tail.
# Take all the values of the ACF up to 10% from zero.
indices = where(acf > 0.1)[0]
acf_fit = acf[indices]
time_fit = time[indices]
# Fit an exponential. Use the plasma cycle as the initial guess.
popt, _ = curve_fit(exponential, time_fit, acf_fit, p0=t_wp)
# Calculate correlation time. Recall that the corr time is 2 int_0^inf [C(t)/C(0)]^2
df.loc["exponential [s]"][col] = popt[0]
df.loc["exponential [timestep]"][col] = popt[0] / self.dt
df.loc["exponential [dump step]"][col] = popt[0] / (self.dump_step * self.dt)
df.loc["exponential [plasma cycle]"][col] = popt[0] / t_wp
# Fit a gaussian
popt, _ = curve_fit(gaussian, time_fit, acf_fit, p0=t_wp)
# Calculate correlation time int (exp(- x^2/t) )^2 from 0 to inf = 0.5 * sqrt(pi/2 * t)
tau = sqrt(0.5 * pi) * popt[0] # don't forget the 2 in front of the integral
df.loc["gaussian [s]"][col] = tau
df.loc["gaussian [timestep]"][col] = tau / self.dt
df.loc["gaussian [dump step]"][col] = tau / (self.dump_step * self.dt)
df.loc["gaussian [plasma cycle]"][col] = tau / t_wp
# No fit.
tau = self.integrate_normalized_acf_squared(time, acf)
df.loc["integral [s]"][col] = tau
df.loc["integral [timestep]"][col] = tau / self.dt
df.loc["integral [dump step]"][col] = tau / (self.dump_step * self.dt)
df.loc["integral [plasma cycle]"][col] = tau / t_wp
# Need to conver to numbers because they are all objects.
for col in df.columns:
df[col] = to_numeric(df[col])
if slices:
self.correlation_times_slices = df
else:
self.correlation_times = df
[docs] def compute_kt_data(self, nkt_flag: bool = False, vkt_flag: bool = False):
"""Calculate Time dependent Fourier space quantities.
Parameters
----------
nkt_flag : bool
Flag for calculating microscopic density Fourier components :math:`n(\\mathbf k, t)`. \n
Default = False.
vkt_flag : bool
Flag for calculating microscopic velocity Fourier components, :math:`v(\\mathbf k, t)`. \n
Default = False.
"""
if nkt_flag:
tinit = self.timer.current()
self.calc_nkt_slices_data()
self.save_kt_hdf(nkt_flag=True)
tend = self.timer.current()
time_stamp(self.log_file, "n(k,t) Calculation", self.timer.time_division(tend - tinit), self.verbose)
if vkt_flag:
tinit = self.timer.current()
self.calc_vkt_slices_data()
self.save_kt_hdf(vkt_flag=True)
tend = self.timer.current()
time_stamp(self.log_file, "v(k,t) Calculation", self.timer.time_division(tend - tinit), self.verbose)
[docs] def copy_params(self, params):
for i, val in params.__dict__.items():
if not inspect.ismethod(val):
if isinstance(val, dict):
self.__dict__[i] = deepcopy(val)
elif isinstance(val, ndarray):
self.__dict__[i] = val.copy()
else:
self.__dict__[i] = val
[docs] def create_dirs_filenames(self):
"""Create the directories and filenames where to save dataframes. It also creates a log file that can be accessed at
:py:attr:`~.log_file`. Finally, it initializes the dataframes used to store data.
"""
# Saving Directory
self.setup_multirun_dirs()
# If multi_run_average self.postprocessing_dir is Simulations/Postprocessing else Job_dir/Postprocessing
saving_dir = os_path_join(self.directory_tree["postprocessing"]["path"], self.__long_name__.replace(" ", ""))
if not os_path_exists(saving_dir):
mkdir(saving_dir)
self.saving_dir = os_path_join(saving_dir, self.phase.capitalize())
if not os_path_exists(self.saving_dir):
mkdir(self.saving_dir)
# Create the log file path
fname = self.__long_name__.replace(" ", "") + "_log_file.out"
self.log_file = os_path_join(self.saving_dir, fname)
# Filenames and strings
fname = self.__long_name__.replace(" ", "") + "_" + self.job_id + ".h5"
self.filename_hdf = os_path_join(self.saving_dir, fname)
fname = self.__long_name__.replace(" ", "") + "_slices_" + self.job_id + ".h5"
self.filename_hdf_slices = os_path_join(self.saving_dir, fname)
if self.__name__ == "ccf":
fname = "Longitudinal_" + self.__long_name__.replace(" ", "") + "_" + self.job_id + ".h5"
self.filename_hdf_longitudinal = os_path_join(self.saving_dir, fname)
fname = "Longitudinal_" + self.__long_name__.replace(" ", "") + "_slices_" + self.job_id + ".h5"
self.filename_hdf_longitudinal_slices = os_path_join(self.saving_dir, fname)
fname = "Transverse_" + self.__long_name__.replace(" ", "") + "_" + self.job_id + ".h5"
self.filename_hdf_transverse = os_path_join(self.saving_dir, fname)
fname = "Transverse_" + self.__long_name__.replace(" ", "") + "_slices_" + self.job_id + ".h5"
self.filename_hdf_transverse_slices = os_path_join(self.saving_dir, fname)
self.filename_hdf = {
"Longitudinal": self.filename_hdf_longitudinal,
"Transverse": self.filename_hdf_transverse,
}
self.filename_hdf_slices = {
"Longitudinal": self.filename_hdf_longitudinal_slices,
"Transverse": self.filename_hdf_transverse_slices,
}
if self.acf_observable:
fname = self.__long_name__.replace(" ", "") + "ACF_" + self.job_id + ".h5"
self.filename_hdf_acf = os_path_join(self.saving_dir, fname)
fname = self.__long_name__.replace(" ", "") + "ACF_slices_" + self.job_id + ".h5"
self.filename_hdf_acf_slices = os_path_join(self.saving_dir, fname)
if self.k_observable:
# Create paths for files
self.k_space_dir = os_path_join(self.postprocessing_dir, "k_space_data")
self.k_file = os_path_join(self.k_space_dir, "k_arrays.npz")
self.nkt_hdf_file = os_path_join(self.k_space_dir, "nkt.h5")
self.vkt_hdf_file = os_path_join(self.k_space_dir, "vkt.h5")
[docs] def from_dict(self, input_dict: dict):
"""
Update attributes from input dictionary.
Parameters
----------
input_dict: dict
Dictionary to be copied.
"""
self.__dict__.update(input_dict)
[docs] def grab_sim_data(self, pva: str = "vel"):
"""Read in particles data into one large array.
Parameters
----------
pva : str
Key of the data to be collected. Options ["pos", "vel", "acc"], Default = "vel"
Returns
-------
time : numpy.ndarray
One dimensional array with time data.
data_all : numpy.ndarray
Array with shape (:attr:`sarkas.tools.observables.Observable.no_dumps`,
:attr:`sarkas.tools.observables.Observable.self.dim`, :attr:`sarkas.tools.observables.Observable.runs` *
:attr:`sarkas.tools.observables.Observable.inv_dim` * :attr:`sarkas.tools.observables.Observable.total_num_ptcls`).
`.dim` = 1 if :attr:`sarkas.tools.observables.Observable.dimensional_average = True` otherwise equals the number
of dimensions, (e.g. 3D : 3) `.runs` is the number of runs to be averaged over. Default = 1. `.inv_dim` is
the else option of `dim`. If `dim = 1` then `.inv_dim = .dimensions` and viceversa.
"""
# Velocity array for storing simulation data
data_all = zeros((self.no_dumps, self.dim, self.runs * self.inv_dim * self.total_num_ptcls))
time = zeros(self.no_dumps)
print("\nCollecting data from snapshots ...")
if self.dimensional_average:
# Loop over the runs
for r, dump_dir_r in enumerate(tqdm(self.dump_dirs_list, disable=(not self.verbose), desc="Runs Loop")):
# Loop over the timesteps
for it in tqdm(range(self.no_dumps), disable=(not self.verbose), desc="Timestep Loop"):
# Read data from file
dump = int(it * self.dump_step)
datap = load_from_restart(dump_dir_r, dump)
# Loop over the particles' species
for sp_indx, (sp_name, sp_num) in enumerate(zip(self.species_names, self.species_num)):
# Calculate the correct start and end index for storage
start_indx = self.species_index_start[sp_indx] + self.inv_dim * sp_num * r
end_indx = self.species_index_start[sp_indx] + self.inv_dim * sp_num * (r + 1)
# Use a mask to grab only the selected species and flatten along the first axis
# data = ( v1_x, v1_y, v1_z,
# v2_x, v2_y, v2_z,
# v3_x, v3_y, v3_z,
# ...)
# The flatten array would like this
# flattened = ( v1_x, v2_x, v3_x, ..., v1_y, v2_y, v3_y, ..., v1_z, v2_z, v3_z, ...)
mask = datap["names"] == sp_name
data_all[it, 0, start_indx:end_indx] = datap[pva][mask].flatten("F")
time[it] = datap["time"]
else: # Dimensional Average = False
# Loop over the runs
for r, dump_dir_r in enumerate(tqdm(self.dump_dirs_list, disable=(not self.verbose), desc="Runs Loop")):
# Loop over the timesteps
for it in tqdm(range(self.no_dumps), disable=(not self.verbose), desc="Timestep Loop"):
# Read data from file
dump = int(it * self.dump_step)
datap = load_from_restart(dump_dir_r, dump)
# Loop over the particles' species
for sp_indx, (sp_name, sp_num) in enumerate(zip(self.species_names, self.species_num)):
# Calculate the correct start and end index for storage
start_indx = self.species_index_start[sp_indx] + self.inv_dim * sp_num * r
end_indx = self.species_index_start[sp_indx] + self.inv_dim * sp_num * (r + 1)
# Use a mask to grab only the selected species and transpose the array to put dimensions first
for d in range(self.dimensions):
mask = datap["names"] == sp_name
data_all[it, d, start_indx:end_indx] = datap[pva][mask][:, d]
time[it] = datap["time"]
return time, data_all
[docs] def parse(self, acf_data: bool = False):
"""
Grab the pandas dataframe from the saved csv file. If file does not exist call :meth:`compute()`.
"""
if self.__name__ == "rdf":
acf_data = False
if self.k_observable:
try:
self.dataframe = read_hdf(self.filename_hdf, mode="r", index_col=False)
k_data = load(self.k_file)
self.k_list = k_data["k_list"]
self.k_counts = k_data["k_counts"]
self.ka_values = k_data["ka_values"]
except FileNotFoundError:
print("\nFile {} not found!".format(self.filename_hdf))
print("\nComputing Observable now ...")
self.compute()
else:
try:
if hasattr(self, "filename_csv"):
self.dataframe = read_csv(self.filename_csv, index_col=False)
else:
self.dataframe = read_hdf(self.filename_hdf, mode="r", index_col=False)
except FileNotFoundError:
if hasattr(self, "filename_csv"):
data_file = self.filename_csv
else:
data_file = self.filename_hdf
print("\nData file not found! \n {}".format(data_file))
print("\nComputing Observable now ...")
self.compute()
if hasattr(self, "dataframe_slices"):
self.dataframe_slices = read_hdf(self.filename_hdf_slices, mode="r", index_col=False)
if acf_data:
self.parse_acf()
[docs] def parse_acf(self):
try:
self.dataframe_acf = read_hdf(self.filename_hdf_acf, mode="r", index_col=False)
self.dataframe_acf_slices = read_hdf(self.filename_hdf_acf_slices, mode="r", index_col=False)
except FileNotFoundError:
print(f"\nFiles {self.filename_hdf_acf} not found!")
print("\nComputing Observable now ...")
self.compute_acf()
[docs] def parse_k_data(self):
"""Read in the precomputed Fourier space data. Recalculate if not correct."""
try:
k_data = load(self.k_file)
# Check for the correct number of k values
if self.angle_averaging == k_data["angle_averaging"]:
# Check for the correct max harmonics
comp = self.max_k_harmonics == k_data["max_k_harmonics"]
if comp.all():
self.k_list = k_data["k_list"]
self.k_harmonics = k_data["k_harmonics"]
self.k_counts = k_data["k_counts"]
self.k_values = k_data["k_values"]
self.ka_values = k_data["ka_values"]
self.no_ka_values = len(self.ka_values)
else:
self.calc_k_data()
else:
self.calc_k_data()
except FileNotFoundError:
self.calc_k_data()
[docs] def parse_kt_data(self, nkt_flag: bool = False, vkt_flag: bool = False):
"""
Read in the precomputed time dependent Fourier space data. Recalculate if not.
Parameters
----------
nkt_flag : bool
Flag for reading microscopic density Fourier components :math:`n(\\mathbf k, t)`. \n
Default = False.
vkt_flag : bool
Flag for reading microscopic velocity Fourier components, :math:`v(\\mathbf k, t)`. \n
Default = False.
"""
if nkt_flag:
try:
# Check that what was already calculated is correct
with HDFStore(self.nkt_hdf_file, mode="r") as nkt_hfile:
metadata = nkt_hfile.get_storer("nkt").attrs.metadata
if metadata["no_slices"] == self.no_slices:
# Check for the correct number of k values
if metadata["angle_averaging"] == self.angle_averaging:
# Check for the correct max harmonics
comp = self.max_k_harmonics == metadata["max_k_harmonics"]
if not comp.all():
self.compute_kt_data(nkt_flag=True)
else:
self.compute_kt_data(nkt_flag=True)
else:
self.compute_kt_data(nkt_flag=True)
# elif metadata['max_k_harmonics']
#
# if self.angle_averaging == nkt_data["angle_averaging"]:
#
# comp = self.max_k_harmonics == nkt_data["max_harmonics"]
# if not comp.all():
# self.compute_kt_data(nkt_flag=True)
# else:
# self.compute_kt_data(nkt_flag=True)
except OSError:
self.compute_kt_data(nkt_flag=True)
if vkt_flag:
try:
# Check that what was already calculated is correct
with HDFStore(self.vkt_hdf_file, mode="r") as vkt_hfile:
metadata = vkt_hfile.get_storer("vkt").attrs.metadata
if metadata["no_slices"] == self.no_slices:
# Check for the correct number of k values
if metadata["angle_averaging"] == self.angle_averaging:
# Check for the correct max harmonics
comp = self.max_k_harmonics == metadata["max_k_harmonics"]
if not comp.all():
self.compute_kt_data(vkt_flag=True)
else:
self.compute_kt_data(vkt_flag=True)
else:
self.compute_kt_data(vkt_flag=True)
except OSError:
self.compute_kt_data(vkt_flag=True)
[docs] def plot(self, scaling: tuple = None, acf: bool = False, figname: str = None, show: bool = False, **kwargs):
"""
Plot the observable by calling the pandas.DataFrame.plot() function and save the figure.
Parameters
----------
scaling : float, tuple
Factor by which to rescale the x and y axis.
acf : bool
Flag for renormalizing the autocorrelation functions. Default= False
figname : str
Name with which to save the file. It automatically saves it in the correct directory.
show : bool
Flag for prompting the plot to screen. Default=False
**kwargs :
Options to pass to matplotlib plotting method.
Returns
-------
axes_handle : matplotlib.axes.Axes
Axes. See `pandas` documentation for more info
"""
if acf:
plot_dataframe = self.dataframe_acf.copy()
# Autocorrelation function renormalization
for i, col in enumerate(plot_dataframe.columns[1:], 1):
plot_dataframe[col] /= plot_dataframe[col].iloc[0]
kwargs["logx"] = True
# kwargs['xlabel'] = 'Time difference'
else:
plot_dataframe = self.dataframe.copy()
# This is needed because I don't know a priori what the first column name is
first_col_name = plot_dataframe.columns[0]
if scaling:
if isinstance(scaling, tuple):
plot_dataframe[first_col_name] /= scaling[0]
plot_dataframe[kwargs["y"]] /= scaling[1]
else:
plot_dataframe[first_col_name] /= scaling
axes_handle = plot_dataframe.plot(x=plot_dataframe.columns[0], **kwargs)
fig = axes_handle.figure
fig.tight_layout()
# Saving
if figname:
fig.savefig(os_path_join(self.saving_dir, figname + "_" + self.job_id + ".png"))
else:
fig.savefig(os_path_join(self.saving_dir, "Plot_" + self.__name__ + "_" + self.job_id + ".png"))
if show:
fig.show()
return axes_handle
[docs] def pretty_print_msg(self):
"""Create the message with the basic information of every observable
Returns
-------
msg : str
Message to print.
"""
name = " " + self.__long_name__ + " "
if self.__name__ == "ccf":
msg = (
f"\n\n{name:=^70}\n"
f"Data saved in: \n {self.filename_hdf}\n"
f"Data accessible via: self.dataframe_longitudinal_slices, self.dataframe_longitudinal\n"
f"\t\tself.dataframe_transverse_slices, self.dataframe_transverse"
)
else:
msg = (
f"\n\n{name:=^70}\n"
f"Data saved in: \n {self.filename_hdf}\n"
f"Data accessible via: self.dataframe_slices, self.dataframe\n"
)
if self.__name__ == "rdf":
msg += (
f"No. bins = {self.no_bins}\n"
f"dr = {self.dr_rdf / self.a_ws:.4f} a_ws = {self.dr_rdf:.4e} {self.units_dict['length']}\n"
f"Maximum Distance (i.e. potential.rc)= {self.rc / self.a_ws:.4f} a_ws = {self.rc:.4e} {self.units_dict['length']}"
)
dtau = self.dt * self.dump_step
tau = dtau * self.slice_steps
t_wp = 2.0 * pi / self.total_plasma_frequency # Plasma period
tau_wp = int(tau / t_wp)
msg += (
f"\nTime Series Data:\n"
f"No. of slices = {self.no_slices}\n"
f"No. dumps per slice = {int(self.slice_steps)}\n"
f"Total time: T = {tau:.4e} {self.units_dict['time']} ~ {tau_wp} plasma periods\n"
f"Time interval step: dt = {dtau:.4e} ~ {dtau / t_wp:.4e} plasma period"
)
if self.acf_observable:
dtau = self.dt * self.dump_step
tau = dtau * self.acf_slice_steps
tau_wp = int(tau / t_wp)
msg += (
f"\n\nACF Data:\n"
f"If you choose to set equal_number_time_samples=True in compute_acf() then the following applies. Otherwise, the above applies."
f"No. of acf slices = {self.no_slices}\n"
f"No. dumps per slice = {int(self.acf_slice_steps)}\n"
f"Largest time lag of the autocorrelation function: tau = {tau:.4e} {self.units_dict['time']} ~ {tau_wp} plasma periods\n\n"
)
if self.k_observable:
if self.__name__ == "ccf":
kt_file = f"v(k,t) data saved in: \n {self.vkt_hdf_file}\n"
else:
kt_file = f"n(k,t) data saved in: \n {self.nkt_hdf_file}\n"
k_msg = (
f"k wave vector information saved in:\n {self.k_file}\n"
f"{kt_file}"
f"Data saved in: \n {self.filename_hdf}\n"
f"Data accessible at: self.k_list, self.k_counts, self.ka_values, self.frequencies, self.dataframe\n"
f"\nWave vector parameters:\n"
f"Smallest wave vector k_min = 2 pi / L = 3.9 / N^(1/3)\n"
f"k_min = {self.ka_values[0]:.4f} / a_ws = {self.ka_values[0] / self.a_ws:.4e} {self.units_dict['inverse length']}\n"
f"\nAngle averaging choice: {self.angle_averaging}\n"
)
if self.angle_averaging == "full":
nx, ny, nz = unwrap(self.max_aa_harmonics)
aa_k_msg = (
f"\tMaximum angle averaged k harmonics = n_x, n_y, n_z = {nx}, {ny}, {nz}\n"
f"\tLargest angle averaged k_max = k_min * sqrt( n_x^2 + n_y^2 + n_z^2)\n"
f"\tk_max = {self.max_aa_ka_value:.4f} / a_ws = {self.max_aa_ka_value / self.a_ws :1.4e} {self.units_dict['inverse length']}\n"
)
elif self.angle_averaging == "custom":
nx, ny, nz = unwrap(self.max_aa_harmonics)
knx, kny, knz = unwrap(self.max_k_harmonics)
aa_k_msg = (
f"\tMaximum angle averaged k harmonics = n_x, n_y, n_z = {nx}, {ny}, {nz}\n"
f"\tLargest angle averaged k_max = k_min * sqrt( n_x^2 + n_y^2 + n_z^2)\n"
f"\tAA k_max = {self.max_aa_ka_value:.4f} / a_ws = {self.max_aa_ka_value / self.a_ws :1.4e} {self.units_dict['inverse length']}\n"
f"\tMaximum k harmonics = n_x, n_y, n_z = {knx}, {kny}, {knz}\n"
f"\tLargest wave vector k_max = k_min * n_x\n"
f"\tk_max = {self.max_ka_value:.4f} / a_ws = {self.max_ka_value / self.a_ws:.4e} {self.units_dict['inverse length']}\n"
)
elif self.angle_averaging == "principal_axis":
knx, kny, knz = unwrap(self.max_k_harmonics)
aa_k_msg = (
f"\tMaximum k harmonics = n_x, n_y, n_z = {knx}, {kny}, {knz}\n"
f"\tLargest wave vector k_max = k_min * n_x\n"
f"\tk_max = {self.max_ka_value:.4f} / a_ws = {self.max_ka_value / self.a_ws:.4e} {self.units_dict['inverse length']}\n"
)
aa_k_msg += (
f"\nTotal number of k values to calculate = {len(self.k_list)}\n"
f"No. of unique ka values to calculate = {len(self.ka_values)}\n"
)
k_msg += aa_k_msg
if self.kw_observable:
kw_msg = (
f"\nFrequency Space Parameters:\n"
f"\tNo. of slices = {self.no_slices}\n"
f"\tNo. dumps per slice = {self.slice_steps}\n"
f"\tFrequency step dw = 2 pi /(slice_steps * dump_step * dt)\n"
f"\tdw = {self.w_min / self.total_plasma_frequency:.4f} w_p = {self.w_min:.4e} {self.units_dict['frequency']}\n"
f"\tMaximum Frequency w_max = pi /(dump_step * dt)\n"
f"\tw_max = {self.w_max / self.total_plasma_frequency:.4f} w_p = {self.w_max:.4e} {self.units_dict['frequency']}\n"
)
k_msg += kw_msg
msg += k_msg
return msg
[docs] def from_pickle(self):
"""Read the observable's info from the pickle file."""
self.filename_pickle = os_path_join(self.saving_dir, self.__long_name__.replace(" ", "") + ".pickle")
with open(self.filename_pickle, "rb") as pkl_data:
data = pickle_load(pkl_data)
self.from_dict(data.__dict__)
[docs] def save_hdf(self):
# Create the columns for the HDF df
if not self.k_observable:
if not isinstance(self.dataframe_slices.columns, MultiIndex):
self.dataframe_slices.columns = MultiIndex.from_tuples(
[tuple(c.split("_")) for c in self.dataframe_slices.columns]
)
if not isinstance(self.dataframe.columns, MultiIndex):
self.dataframe.columns = MultiIndex.from_tuples([tuple(c.split("_")) for c in self.dataframe.columns])
# Sort the index for speed
# see https://stackoverflow.com/questions/54307300/what-causes-indexing-past-lexsort-depth-warning-in-pandas
self.dataframe = self.dataframe.sort_index()
self.dataframe_slices = self.dataframe_slices.sort_index()
# TODO: Fix this hack. We should be able to add data to HDF instead of removing it and rewriting it.
# Save the data.
if os_path_exists(self.filename_hdf_slices):
os_remove(self.filename_hdf_slices)
self.dataframe_slices.to_hdf(self.filename_hdf_slices, mode="w", key=self.__name__)
if os_path_exists(self.filename_hdf):
os_remove(self.filename_hdf)
self.dataframe.to_hdf(self.filename_hdf, mode="w", key=self.__name__)
[docs] def save_acf_hdf(self):
if not isinstance(self.dataframe_acf.columns, pd.MultiIndex):
self.dataframe_acf.columns = MultiIndex.from_tuples([tuple(c.split("_")) for c in self.dataframe_acf.columns])
if not isinstance(self.dataframe_acf_slices.columns, pd.MultiIndex):
self.dataframe_acf_slices.columns = MultiIndex.from_tuples(
[tuple(c.split("_")) for c in self.dataframe_acf_slices.columns]
)
self.dataframe_acf = self.dataframe_acf.sort_index()
self.dataframe_acf_slices = self.dataframe_acf_slices.sort_index()
if os_path_exists(self.filename_hdf_acf):
os_remove(self.filename_hdf_acf)
self.dataframe_acf.to_hdf(self.filename_hdf_acf, mode="w", key=self.__name__)
if os_path_exists(self.filename_hdf_acf_slices):
os_remove(self.filename_hdf_acf_slices)
self.dataframe_acf_slices.to_hdf(self.filename_hdf_acf_slices, mode="w", key=self.__name__)
[docs] def save_kt_hdf(self, nkt_flag: bool = False, vkt_flag: bool = False):
"""
Save the :math:`n(\\mathbf{k},t)` and/or :math:`\mathbf{v}(\\mathbf{k},t)` data of each slice to disk. \n
The data is contained in :py:attr:~.nkt_dataframe_slices and :py:attr:~.vkt_dataframe_slices respectively.\n
Each dataframe is stored as a HDF5 file to disk. The location of the data is at :py:attr:~.nkt_hdf_file and :py:attr:~.vkt_hdf_file.
Parameters
----------
nkt_flag: bool
Flag for saving :math:`n(\\mathbf{k},t)` data. Default is False.
vkt_flag : bool
Flag for saving :math:`\\mathbf{v} (\\mathbf{k},t)` data. Default is False.
"""
if nkt_flag:
# Save the data and append metadata
if os_path_exists(self.nkt_hdf_file):
os_remove(self.nkt_hdf_file)
hfile = HDFStore(self.nkt_hdf_file, mode="w")
hfile.put("nkt", self.nkt_dataframe_slices)
# This metadata is needed to check if I need to recalculate
metadata = {
"no_slices": self.no_slices,
"max_k_harmonics": self.max_k_harmonics,
"angle_averaging": self.angle_averaging,
}
hfile.get_storer("nkt").attrs.metadata = metadata
hfile.close()
if vkt_flag:
if os_path_exists(self.vkt_hdf_file):
os_remove(self.vkt_hdf_file)
# Save the data and append metadata
hfile = HDFStore(self.vkt_hdf_file, mode="w")
hfile.put("vkt", self.vkt_dataframe_slices)
# This metadata is needed to check if I need to recalculate
metadata = {
"no_slices": self.no_slices,
"max_k_harmonics": self.max_k_harmonics,
"angle_averaging": self.angle_averaging,
}
hfile.get_storer("vkt").attrs.metadata = metadata
hfile.close()
[docs] def save_pickle(self):
"""Save the observable's info into a pickle file."""
self.filename_pickle = os_path_join(self.saving_dir, self.__long_name__.replace(" ", "") + ".pickle")
with open(self.filename_pickle, "wb") as pickle_file:
dump(self, pickle_file)
pickle_file.close()
[docs] def setup_init(
self,
params,
phase: str = None,
no_slices: int = None,
multi_run_average: bool = None,
dimensional_average: bool = None,
runs: int = None,
):
"""
Assign Observables attributes and copy the simulation's parameters.
Parameters
----------
runs
dimensional_average
multi_run_average
params : sarkas.core.Parameters
Simulation's parameters.
phase : str, optional
Phase to compute. Default = 'production'.
no_slices : int, optional
Number of independent runs inside a long simulation. Default = 1.
"""
if phase:
self.phase = phase.lower()
if no_slices:
self.no_slices = no_slices
if multi_run_average:
self.multi_run_average = multi_run_average
if dimensional_average:
self.dimensional_average = dimensional_average
if runs:
self.runs = runs
# The dict update could overwrite the names
name = self.__name__
long_name = self.__long_name__
self.copy_params(params)
# Get the right labels
if self.dimensions == 3:
self.dim_labels = ["X", "Y", "Z"]
elif self.dimensions == 2:
self.dim_labels = ["X", "Y"]
# Restore the correct names
self.__name__ = name
self.__long_name__ = long_name
if self.k_observable:
# Check for k space information.
if self.angle_averaging in ["full", "principal_axis"]:
if self.max_k_harmonics is None and self.max_ka_value is None:
raise AttributeError("max_ka_value and max_k_harmonics not defined.")
elif self.angle_averaging == "custom":
# if "custom" I expect that there is a portion that must be angle averaged. Hence, check for those values.
if self.max_aa_ka_value is None and self.max_aa_harmonics is None:
raise AttributeError("max_aa_harmonics and max_aa_ka_value not defined.")
# More checks on k attributes and initialization of k vectors
if self.max_k_harmonics is not None:
# Update angle averaged attributes depending on the choice of angle_averaging
# Convert max_k_harmonics to a numpy array. YAML reads in a list.
if not isinstance(self.max_k_harmonics, ndarray):
self.max_k_harmonics = array([self.max_k_harmonics for i in range(3)])
if self.dimensions < 3:
self.max_k_harmonics[2] = 0
# Calculate max_aa_harmonics based on the choice of angle averaging and inputs
if self.angle_averaging == "full":
self.max_aa_harmonics = self.max_k_harmonics.copy()
elif self.angle_averaging == "custom":
# Check if the user has defined the max_aa_harmonics
if self.max_aa_ka_value:
nx = int(self.max_aa_ka_value * self.box_lengths[0] / (2.0 * pi * self.a_ws * sqrt(3.0)))
ny = int(self.max_aa_ka_value * self.box_lengths[1] / (2.0 * pi * self.a_ws * sqrt(3.0)))
nz = int(self.max_aa_ka_value * self.box_lengths[2] / (2.0 * pi * self.a_ws * sqrt(3.0)))
self.max_aa_harmonics = array([nx, ny, nz])
if self.dimensions < 3:
self.max_aa_harmonics[2] = 0
# else max_aa_harmonics is user defined
elif self.angle_averaging == "principal_axis":
self.max_aa_harmonics = array([0, 0, 0])
# max_ka_value is still None
elif self.max_ka_value is not None:
# Update max_k_harmonics and angle average attributes based on the choice of angle_averaging
# Calculate max_k_harmonics from max_ka_value
# Check for angle_averaging choice
if self.angle_averaging == "full":
# The maximum value is calculated assuming that max nx = max ny = max nz
# ka_max = 2pi a/L sqrt( nx^2 + ny^2 + nz^2) = 2pi a/L nx sqrt(3)
nx = int(self.max_ka_value * self.box_lengths[0] / (2.0 * pi * self.a_ws * sqrt(3.0)))
ny = int(self.max_ka_value * self.box_lengths[1] / (2.0 * pi * self.a_ws * sqrt(3.0)))
nz = int(self.max_ka_value * self.box_lengths[2] / (2.0 * pi * self.a_ws * sqrt(3.0)))
self.max_k_harmonics = array([nx, ny, nz])
self.max_aa_harmonics = array([nx, ny, nz])
if self.dimensions < 3:
self.max_aa_harmonics[2] = 0
self.max_k_harmonics[2] = 0
elif self.angle_averaging == "custom":
# ka_max = 2pi a/L sqrt( nx^2 + 0 + 0) = 2pi a/L nx
nx = int(self.max_ka_value * self.box_lengths[0] / (2.0 * pi * self.a_ws))
ny = int(self.max_ka_value * self.box_lengths[1] / (2.0 * pi * self.a_ws))
nz = int(self.max_ka_value * self.box_lengths[2] / (2.0 * pi * self.a_ws))
self.max_k_harmonics = array([nx, ny, nz])
# Check if the user has defined the max_aa_harmonics
if self.max_aa_ka_value:
nx = int(self.max_aa_ka_value * self.box_lengths[0] / (2.0 * pi * self.a_ws * sqrt(3.0)))
ny = int(self.max_aa_ka_value * self.box_lengths[1] / (2.0 * pi * self.a_ws * sqrt(3.0)))
nz = int(self.max_aa_ka_value * self.box_lengths[2] / (2.0 * pi * self.a_ws * sqrt(3.0)))
self.max_aa_harmonics = array([nx, ny, nz])
# else max_aa_harmonics is user defined
elif self.angle_averaging == "principal_axis":
# ka_max = 2pi a/L sqrt( nx^2 + 0 + 0) = 2pi a/L nx
nx = int(self.max_ka_value * self.box_lengths[0] / (2.0 * pi * self.a_ws))
ny = int(self.max_ka_value * self.box_lengths[1] / (2.0 * pi * self.a_ws))
nz = int(self.max_ka_value * self.box_lengths[2] / (2.0 * pi * self.a_ws))
self.max_k_harmonics = array([nx, ny, nz])
self.max_aa_harmonics = array([0, 0, 0])
# Calculate the maximum ka value based on user's choice of angle_averaging
# Dev notes: Make sure max_ka_value, max_aa_ka_value are defined when this if is done
if self.angle_averaging == "full":
self.max_ka_value = 2.0 * pi * self.a_ws * norm(self.max_k_harmonics / self.box_lengths)
self.max_aa_ka_value = 2.0 * pi * self.a_ws * norm(self.max_k_harmonics / self.box_lengths)
elif self.angle_averaging == "principal_axis":
self.max_ka_value = 2.0 * pi * self.a_ws * self.max_k_harmonics[0] / self.box_lengths[0]
self.max_aa_ka_value = 0.0
elif self.angle_averaging == "custom":
self.max_aa_ka_value = 2.0 * pi * self.a_ws * norm(self.max_aa_harmonics / self.box_lengths)
self.max_ka_value = 2.0 * pi * self.a_ws * self.max_k_harmonics[0] / self.box_lengths[0]
# Get the number of independent observables if multi-species
self.no_obs = int(self.num_species * (self.num_species + 1) / 2)
# Get the total number of dumps by looking at the files in the directory
self.dump_dir = self.directory_tree["postprocessing"][self.phase]["dumps"]["path"]
self.prod_no_dumps = len(listdir(self.directory_tree["postprocessing"]["production"]["dumps"]["path"]))
self.eq_no_dumps = len(listdir(self.directory_tree["postprocessing"]["equilibration"]["dumps"]["path"]))
# Check for magnetized plasma options
if self.magnetized and self.electrostatic_equilibration:
self.mag_no_dumps = len(listdir(self.directory_tree["postprocessing"]["magnetization"]["dumps"]["path"]))
# Assign dumps variables based on the choice of phase
if self.phase == "equilibration":
self.no_dumps = self.eq_no_dumps
self.dump_step = self.eq_dump_step
self.no_steps = self.equilibration_steps
elif self.phase == "production":
self.no_dumps = self.prod_no_dumps
self.dump_step = self.prod_dump_step
self.no_steps = self.production_steps
elif self.phase == "magnetization":
self.no_dumps = self.mag_no_dumps
self.dump_step = self.mag_dump_step
self.no_steps = self.magnetization_steps
# Needed for preprocessing pretty print
self.slice_steps = (
int(self.no_steps / self.dump_step / self.no_slices)
if self.no_dumps < self.no_slices
else int(self.no_dumps / self.no_slices)
)
self.acf_slice_steps = (
int(self.no_steps / self.dump_step / (self.no_slices + 1))
if self.no_dumps < self.no_slices
else int(self.no_dumps / (self.no_slices + 1))
)
# Array containing the start index of each species.
self.species_index_start = array([0, *self.species_num.cumsum()], dtype=int)
[docs] def setup_multirun_dirs(self):
"""Set the attributes postprocessing_dir and dump_dirs_list.
The attribute postprocessing_dir refers to the location where to store postprocessing results.
If the attribute :py:attr:`sarkas.tools.observables.Observable.multi_run_average` is set to `True` then the
postprocessing data will be saved in the :py:attr:`sarkas.core.Parameters.md_simulations_dir` directory, i.e. where
all the runs are.
Otherwise :py:attr:`sarkas.tools.observables.Observable.postprocessing_dir` will be
in the :py:attr:`sarkas.core.Parameters.job_dir`.
The attribute :py:attr:`sarkas.tools.observables.Observable.dump_dirs_list` is a list of the refers to the locations
of the production (or other phases) dumps. If :py:attr:`sarkas.tools.observables.Observable.multi_run_average` is
`False` then the list will contain only one path, namely :py:attr:`sarkas.tools.observables.Observable.dump_dir`.
"""
self.dump_dirs_list = []
if self.multi_run_average:
for r in range(self.runs):
# Direct to the correct dumps directory
dump_dir = os_path_join(
f"run{r}", os_path_join("Simulation", os_path_join(self.phase.capitalize(), "dumps"))
)
dump_dir = os_path_join(self.md_simulations_dir, dump_dir)
self.dump_dirs_list.append(dump_dir)
# Re-path the saving directory.
# Data is saved in Simulations/PostProcessing/Observable/Phase/
self.postprocessing_dir = os_path_join(self.md_simulations_dir, "PostProcessing")
if not os_path_exists(self.postprocessing_dir):
mkdir(self.postprocessing_dir)
else:
self.dump_dirs_list = [self.dump_dir]
[docs] def update_finish(self):
"""Update the :py:attr:`~.slice_steps`, CCF's and DSF's attributes, and save pickle file with observable's info.
Notes
-----
The information is saved without the dataframe(s).
"""
self.create_dirs_filenames()
# Needed if no_slices has been passed
self.slice_steps = (
int(self.no_steps / self.dump_step / self.no_slices)
if self.no_dumps < self.no_slices
else int(self.no_dumps / self.no_slices)
)
if self.k_observable:
self.parse_k_data()
if self.kw_observable:
# These calculation are needed for the io.postprocess_info().
# This is a hack and we need to find a faster way to do it
dt_r = self.dt * self.dump_step
self.w_min = 2.0 * pi / (self.slice_steps * dt_r)
self.w_max = pi / dt_r # Half because fft calculates negative and positive frequencies
self.frequencies = 2.0 * pi * fftfreq(self.slice_steps, dt_r)
self.frequencies = fftshift(self.frequencies)
self.save_pickle()
self.initialize_hdf()
if self.__name__ == "ccf":
self.dataframe_longitudinal = DataFrame()
self.dataframe_longitudinal_slices = DataFrame()
self.dataframe_transverse = DataFrame()
self.dataframe_transverse_slices = DataFrame()
if self.acf_observable:
self.acf_slice_steps = (
int(self.no_steps / self.dump_step / (self.no_slices + 1))
if self.no_dumps < self.no_slices
else int(self.no_dumps / (self.no_slices + 1))
)
# Write log file
datetime_stamp(self.log_file)
msg = self.pretty_print_msg()
print_to_logger(msg, self.log_file, self.verbose)
[docs] def initialize_hdf(self):
self.dataframe = DataFrame()
self.dataframe_slices = DataFrame()
if self.acf_observable:
self.dataframe_acf = DataFrame()
self.dataframe_acf_slices = DataFrame()
[docs]class CurrentCorrelationFunction(Observable):
"""
Current Correlation Functions. \n
The species dependent longitudinal ccf :math:`L_{AB}(\\mathbf k, \\omega)` is defined as
.. math::
L_{AB}(\\mathbf k,\\omega) = \\int_0^\\infty dt \\,
\\left \\langle \\left [\\mathbf k \\cdot \\mathbf v_{A} ( \\mathbf k, t) \\right ]
\\left [ - \\mathbf k \\cdot \\mathbf v_{B} ( -\\mathbf k, t) \\right \\rangle \\right ]
e^{i \\omega t},
while the transverse are
.. math::
T_{AB}(\\mathbf k,\\omega) = \\int_0^\\infty dt \\,
\\left \\langle \\left [ \\mathbf k \\times \\mathbf v_{A} ( \\mathbf k, t) \\right ] \\cdot
\\left [ -\\mathbf k \\times \\mathbf v_{A} ( -\\mathbf k, t) \\right \\rangle \\right ]
e^{i \\omega t},
where the microscopic velocity of species :math:`A` with number of particles :math:`N_{A}` is given by
.. math::
\\mathbf v_{A}(\\mathbf k,t) = \\sum^{N_{A}}_{j} \\mathbf v_j(t) e^{-i \\mathbf k \\cdot \\mathbf r_j(t)} .
Attributes
----------
k_list : list
List of all possible :math:`k` vectors with their corresponding magnitudes and indexes.
k_counts : numpy.ndarray
Number of occurrences of each :math:`k` magnitude.
ka_values : numpy.ndarray
Magnitude of each allowed :math:`ka` vector.
no_ka_values: int
Length of :py:attr:`~.ka_values` array.
"""
[docs] def __init__(self):
super().__init__()
self.__name__ = "ccf"
self.__long_name__ = "Current Correlation Function"
self.k_observable = True
self.kw_observable = True
[docs] @avg_slices_doc
def average_slices_data(self, longitudinal: bool = False, transverse: bool = False):
# Now the actual dataframe
if longitudinal:
self.dataframe_longitudinal[" _ _Frequencies"] = self.frequencies
if transverse:
self.dataframe_transverse[" _ _Frequencies"] = self.frequencies
# Take the mean and std and store them into the dataframe to return
for sp1, sp1_name in enumerate(self.species_names):
for sp2, sp2_name in enumerate(self.species_names[sp1:], sp1):
comp_name = f"{sp1_name}-{sp2_name}"
# Get the k_harmonics columns over which to average
ka_columns_mean = [
comp_name + "_Mean_ka{} = {:.4f}".format(ik + 1, ka) for ik, ka in enumerate(self.ka_values)
]
# Std
ka_columns_std = [
comp_name + "_Std_ka{} = {:.4f}".format(ik + 1, ka) for ik, ka in enumerate(self.ka_values)
]
if longitudinal:
# Mean: level = 1 corresponds to averaging all the k harmonics with the same magnitude
df_mean = self.dataframe_longitudinal_slices[comp_name].groupby(level=1, axis="columns").mean()
df_mean = df_mean.rename(col_mapper(df_mean.columns, ka_columns_mean), axis=1)
df_std = self.dataframe_longitudinal_slices[comp_name].groupby(level=1, axis="columns").std()
df_std = df_std.rename(col_mapper(df_std.columns, ka_columns_std), axis=1)
self.dataframe_longitudinal = concat([self.dataframe_longitudinal, df_mean, df_std], axis=1)
if transverse:
# Mean: level = 1 corresponds to averaging all the k harmonics with the same magnitude
tdf_mean = self.dataframe_transverse_slices[comp_name].groupby(level=1, axis="columns").mean()
tdf_mean = tdf_mean.rename(col_mapper(tdf_mean.columns, ka_columns_mean), axis=1)
# Std
tdf_std = self.dataframe_transverse_slices[comp_name].groupby(level=1, axis="columns").std()
tdf_std = tdf_std.rename(col_mapper(tdf_std.columns, ka_columns_std), axis=1)
self.dataframe_transverse = concat([self.dataframe_transverse, tdf_mean, tdf_std], axis=1)
[docs] @calc_slices_doc
def calc_longitudinal_data(self):
self.parse_kt_data(nkt_flag=False, vkt_flag=True)
vkt_df = read_hdf(self.vkt_hdf_file, mode="r", key="vkt")
# Add frequencies
self.dataframe_longitudinal_slices[" _ _ _Frequencies"] = self.frequencies
# Containers
vkt = zeros((self.num_species, self.slice_steps, len(self.k_list)), dtype=complex128)
for isl in tqdm(range(self.no_slices), desc="Calculating longitudinal CCF for slice", disable=not self.verbose):
# Put data in the container to pass
for sp, sp_name in enumerate(self.species_names):
vkt[sp] = array(vkt_df[f"slice {isl + 1}"]["Longitudinal"][sp_name])
# Calculate Lkw and Tkw
Lkw = calc_Skw(vkt, self.k_list, self.species_num, self.slice_steps, self.dt, self.dump_step)
# Create the dataframe's column names
slc_column = f"slice {isl + 1}"
ka_columns = ["ka = {:.6f}".format(ka) for ka in self.ka_values]
# Save the full Lkw into a Dataframe
sp_indx = 0
for i, sp1 in enumerate(self.species_names):
for j, sp2 in enumerate(self.species_names[i:]):
# Create the list of column names
# Final string : Longitudinal_H-He_slice 1_ka = 0.123456_k = [0, 0, 1]
column_names = []
col_sp_slc = f"{sp1}-{sp2}_" + slc_column
for ik in range(len(self.k_harmonics)):
ka_value = ka_columns[int(self.k_harmonics[ik, -1])]
col_name = col_sp_slc + f"_{ka_value}"
k_harm_col = f"_k = ["
for d in range(self.dimensions):
k_harm_col += f"{self.k_harmonics[ik, d].astype(int)}, "
# The above loop added ", " at the end. I don't want it
k_harm_col = k_harm_col[:-2] + f"]"
col_name += k_harm_col
column_names.append(col_name)
self.dataframe_longitudinal_slices = concat(
[self.dataframe_longitudinal_slices, DataFrame(Lkw[sp_indx, :, :].T, columns=column_names)],
axis=1,
)
sp_indx += 1
# Create the MultiIndex
tuples = [tuple(c.split("_")) for c in self.dataframe_longitudinal_slices.columns]
self.dataframe_longitudinal_slices.columns = MultiIndex.from_tuples(
tuples, names=["species", "slices", "ka_value", "k_harmonics"]
)
[docs] @calc_slices_doc
def calc_transverse_data(self):
self.parse_kt_data(nkt_flag=False, vkt_flag=True)
# Add frequencies
self.dataframe_transverse_slices[" _ _ _Frequencies"] = self.frequencies
vkt_df = read_hdf(self.vkt_hdf_file, mode="r", key="vkt")
# Containers
vkt_d = zeros((self.num_species, self.slice_steps, len(self.k_list)), dtype=complex128)
# number of independent observables
no_skw = int(len(self.species_num) * (len(self.species_num) + 1) / 2)
Tkw = zeros((no_skw, len(self.k_list), self.slice_steps))
for isl in tqdm(
range(self.no_slices), desc="Calculating transverse CCF for slice", position=0, disable=not self.verbose
):
for d, dim in tqdm(
zip(range(self.dimensions), ["i", "j", "k"]),
desc="Dimension",
position=1,
disable=not self.verbose,
leave=False,
):
# Put data in the container to pass
for sp, sp_name in enumerate(self.species_names):
vkt_d[sp] = array(vkt_df[f"slice {isl + 1}"][f"Transverse {dim}"][sp_name])
Tkw_d = calc_Skw(vkt_d, self.k_list, self.species_num, self.slice_steps, self.dt, self.dump_step)
Tkw += Tkw_d / self.dimensions
# Create the dataframe's column names
slc_column = "slice {}".format(isl + 1)
ka_columns = ["ka = {:.6f}".format(ka) for ik, ka in enumerate(self.ka_values)]
# Save the full Lkw into a Dataframe
sp_indx = 0
for i, sp1 in enumerate(self.species_names):
for j, sp2 in enumerate(self.species_names[i:]):
# Create the list of column names
# Final string : H-He_slice 1_ka = 0.123456_k = [0, 0, 1]
column_names = []
col_sp_slc = f"{sp1}-{sp2}_" + slc_column
for ik in range(len(self.k_harmonics)):
ka_value = ka_columns[int(self.k_harmonics[ik, -1])]
col_name = col_sp_slc + f"_{ka_value}"
k_harm_col = f"_k = ["
for d in range(self.dimensions):
k_harm_col += f"{self.k_harmonics[ik, d].astype(int)}, "
# The above loop added ", " at the end. I don't want it
k_harm_col = k_harm_col[:-2] + f"]"
col_name += k_harm_col
column_names.append(col_name)
self.dataframe_transverse_slices = concat(
[self.dataframe_transverse_slices, DataFrame(Tkw[sp_indx, :, :].T, columns=column_names)], axis=1
)
sp_indx += 1
# Create the MultiIndex
tuples = [tuple(c.split("_")) for c in self.dataframe_transverse_slices.columns]
self.dataframe_transverse_slices.columns = MultiIndex.from_tuples(
tuples, names=["species", "slices", "ka_value", "k_harmonics"]
)
[docs] def compute(self, longitudinal: bool = False, transverse: bool = False):
"""
Routine for computing the current correlation function. See class doc for exact quantities.\n
The data of each slice is saved in hierarchical dataframes,
:py:attr:`~.dataframe_longitudinal_slices` or :py:attr:`~.dataframe_transverse_slices`. \n
The sliced averaged data is saved in other hierarchical dataframes,
:py:attr:`~.dataframe_longitudinal` :py:attr:`~.dataframe_transverse`.
Parameters
----------
longitudinal : bool
Flag for calculating the longitudinal CCF. Default = False.
transverse : bool
Flag for calculating the transverse CCF. Default = False.
"""
t0 = self.timer.current()
if longitudinal:
self.calc_longitudinal_data()
if transverse:
self.calc_transverse_data()
self.average_slices_data(longitudinal, transverse)
self.save_hdf(longitudinal, transverse)
tend = self.timer.current()
time_stamp(self.log_file, self.__long_name__ + " Calculation", self.timer.time_division(tend - t0), self.verbose)
[docs] def parse(self, longitudinal: bool = False, transverse: bool = False):
if longitudinal and not transverse:
try:
self.dataframe_longitudinal = read_hdf(self.filename_hdf_longitudinal, mode="r", index_col=False)
k_data = load(self.k_file)
self.k_list = k_data["k_list"]
self.k_counts = k_data["k_counts"]
self.ka_values = k_data["ka_values"]
except FileNotFoundError:
msg = f"\nFile {self.filename_hdf_longitudinal} not found!\nComputing longitudinal ccf ..."
print_to_logger(msg, self.log_file, self.verbose)
self.compute(longitudinal=longitudinal)
elif transverse and not longitudinal:
try:
self.dataframe_transverse = read_hdf(self.filename_hdf_transverse, mode="r", index_col=False)
k_data = load(self.k_file)
self.k_list = k_data["k_list"]
self.k_counts = k_data["k_counts"]
self.ka_values = k_data["ka_values"]
except FileNotFoundError:
msg = f"\nFile {self.filename_hdf_transverse} not found!\nComputing transverse ccf ..."
print_to_logger(msg, self.log_file, self.verbose)
self.compute(longitudinal=False, transverse=True)
elif longitudinal and transverse:
try:
self.dataframe_longitudinal = read_hdf(self.filename_hdf_longitudinal, mode="r", index_col=False)
self.dataframe_transverse = read_hdf(self.filename_hdf_transverse, mode="r", index_col=False)
k_data = load(self.k_file)
self.k_list = k_data["k_list"]
self.k_counts = k_data["k_counts"]
self.ka_values = k_data["ka_values"]
except FileNotFoundError:
msg = f"\nFile {self.filename_hdf_longitudinal} or {self.filename_hdf_transverse} not found!\nComputing all ccf ..."
print_to_logger(msg, self.log_file, self.verbose)
self.compute(longitudinal=True, transverse=True)
else:
msg = (
"Direction not defined. Call the method with either option set to true."
"\nlongitudinal = True/False, transverse = True/False"
)
print_to_logger(msg, self.log_file, self.verbose)
[docs] @setup_doc
def setup(self, params, phase: str = None, no_slices: int = None, **kwargs):
super().setup_init(params, phase=phase, no_slices=no_slices)
self.update_args(**kwargs)
[docs] def save_hdf(self, longitudinal: bool = False, transverse: bool = False):
# Create the columns for the HDF df
if longitudinal:
if not isinstance(self.dataframe_longitudinal_slices.columns, MultiIndex):
self.dataframe_longitudinal_slices.columns = MultiIndex.from_tuples(
[tuple(c.split("_")) for c in self.dataframe_longitudinal_slices.columns]
)
if not isinstance(self.dataframe_longitudinal.columns, MultiIndex):
self.dataframe_longitudinal.columns = MultiIndex.from_tuples(
[tuple(c.split("_")) for c in self.dataframe_longitudinal.columns]
)
# Sort the index for speed
# see https://stackoverflow.com/questions/54307300/what-causes-indexing-past-lexsort-depth-warning-in-pandas
self.dataframe_longitudinal = self.dataframe_longitudinal.sort_index()
self.dataframe_longitudinal_slices = self.dataframe_longitudinal_slices.sort_index()
# TODO: Fix this hack. We should be able to add data to HDF instead of removing it and rewriting it.
# Save the data.
if os_path_exists(self.filename_hdf_longitudinal_slices):
os_remove(self.filename_hdf_longitudinal_slices)
self.dataframe_longitudinal_slices.to_hdf(self.filename_hdf_longitudinal_slices, mode="w", key=self.__name__)
if os_path_exists(self.filename_hdf_longitudinal):
os_remove(self.filename_hdf_longitudinal)
self.dataframe_longitudinal.to_hdf(self.filename_hdf_longitudinal, mode="w", key=self.__name__)
# Create the columns for the HDF df
if transverse:
if not isinstance(self.dataframe_transverse_slices.columns, MultiIndex):
self.dataframe_transverse_slices.columns = MultiIndex.from_tuples(
[tuple(c.split("_")) for c in self.dataframe_transverse_slices.columns]
)
if not isinstance(self.dataframe_transverse.columns, MultiIndex):
self.dataframe_transverse.columns = MultiIndex.from_tuples(
[tuple(c.split("_")) for c in self.dataframe_transverse.columns]
)
# Sort the index for speed
# see https://stackoverflow.com/questions/54307300/what-causes-indexing-past-lexsort-depth-warning-in-pandas
self.dataframe_transverse = self.dataframe_transverse.sort_index()
self.dataframe_transverse_slices = self.dataframe_transverse_slices.sort_index()
# TODO: Fix this hack. We should be able to add data to HDF instead of removing it and rewriting it.
# Save the data.
if os_path_exists(self.filename_hdf_transverse_slices):
os_remove(self.filename_hdf_transverse_slices)
self.dataframe_transverse_slices.to_hdf(self.filename_hdf_transverse_slices, mode="w", key=self.__name__)
if os_path_exists(self.filename_hdf_transverse):
os_remove(self.filename_hdf_transverse)
self.dataframe_transverse.to_hdf(self.filename_hdf_transverse, mode="w", key=self.__name__)
[docs] @arg_update_doc
def update_args(self, **kwargs):
# Update the attribute with the passed arguments
self.__dict__.update(kwargs.copy())
self.update_finish()
[docs]class DiffusionFlux(Observable):
"""Diffusion Fluxes and their Auto-correlation functions.\n
The :math:`\\alpha` diffusion flux :math:`\\mathbf J_{\\alpha}(t)` is calculated from eq.~(3.5) in :cite:`Zhou1996` which
reads
.. math::
\\mathbf J_{\\alpha}(t) = \\frac{m_{\\alpha}}{m_{\\rm tot} }
\\sum_{\\beta = 1}^{M} \\left ( m_{\\rm tot} \\delta_{\\alpha\\beta} - x_{\\alpha} m_{\\beta} \\right )
\\mathbf j_{\\beta} (t)
where :math:`M` is the total number of species, :math:`m_{\\rm tot} = \\sum_{i}^M m_{i}` is the total mass of the
system, :math:`x_{\\alpha} = N_{\\alpha} / N_{\\rm tot}` is the concentration of species :math:`\\alpha`, and
.. math::
\\mathbf j_{\\alpha}(t) = \\sum_{i = 1}^{N_{\\alpha}} \\mathbf v_{i}(t)
is the microscopic velocity field of species :math:`\\alpha`.
"""
[docs] def __init__(self):
super().__init__()
self.__name__ = "diff_flux"
self.__long_name__ = "Diffusion Flux"
self.acf_observable = True
[docs] @setup_doc
def setup(self, params, phase: str = None, no_slices: int = None, **kwargs):
super().setup_init(params, phase, no_slices)
self.update_args(**kwargs)
[docs] @arg_update_doc
def update_args(self, **kwargs):
# Update the attribute with the passed arguments
self.__dict__.update(kwargs.copy())
self.no_fluxes = self.num_species - 1
self.no_fluxes_acf = int(self.no_fluxes * self.no_fluxes)
self.update_finish()
[docs] @compute_doc
def compute(self):
t0 = self.timer.current()
# for run_idx, run_id in enumerate(self.runs):
# self.dump_dir = self.dump_dirs_list[run_idx]
self.calc_slices_data()
self.average_slices_data()
self.save_hdf()
tend = self.timer.current()
time_stamp(self.log_file, self.__long_name__ + " Calculation", self.timer.time_division(tend - t0), self.verbose)
[docs] @calc_slices_doc
def calc_slices_data(self):
start_slice = 0
end_slice = self.slice_steps * self.dump_step
time = zeros(self.slice_steps)
df_str = "Diffusion Flux"
df_acf_str = "Diffusion Flux ACF"
for isl in tqdm(
range(self.no_slices),
desc=f"\nCalculating {df_str} for slice ",
disable=not self.verbose,
position=0,
):
# Parse the particles from the dump files
vel = zeros((self.dimensions, self.slice_steps, self.total_num_ptcls))
#
for it, dump in enumerate(
tqdm(
range(start_slice, end_slice, self.dump_step),
desc="Reading data",
disable=not self.verbose,
position=1,
leave=False,
)
):
datap = load_from_restart(self.dump_dir, dump)
time[it] = datap["time"]
vel[0, it, :] = datap["vel"][:, 0]
vel[1, it, :] = datap["vel"][:, 1]
vel[2, it, :] = datap["vel"][:, 2]
#
if isl == 0:
self.dataframe["Time"] = time
self.dataframe_slices["Time"] = time
self.dataframe_acf["Time"] = time
self.dataframe_acf_slices["Time"] = time
# This returns two arrays
# diff_fluxes = array of shape (no_fluxes, no_dim, no_dumps_per_slice)
# df_acf = array of shape (no_fluxes_acf, no_dim + 1, no_dumps_per_slice)
diff_fluxes, df_acf = calc_diff_flux_acf(
vel, self.species_num, self.species_concentrations, self.species_masses
)
# # Store the data
for i, flux in enumerate(diff_fluxes):
for d, dim in zip(range(self.dimensions), ["X", "Y", "Z"]):
col_name = df_str + f" {i}_{dim}_slice {isl}"
col_data = flux[d, :]
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
for i, flux_acf in enumerate(df_acf):
for d, dim in zip(range(self.dimensions), ["X", "Y", "Z"]):
col_name = df_acf_str + f" {i}_{dim}_slice {isl}"
col_data = flux_acf[d, :]
self.dataframe_acf_slices = add_col_to_df(self.dataframe_acf_slices, col_data, col_name)
col_name = df_acf_str + f" {i}_Total_slice {isl}"
col_data = flux_acf[-1, :]
self.dataframe_acf_slices = add_col_to_df(self.dataframe_acf_slices, col_data, col_name)
start_slice += self.slice_steps * self.dump_step
end_slice += self.slice_steps * self.dump_step
[docs] @avg_slices_doc
def average_slices_date(self):
df_str = "Diffusion Flux"
df_acf_str = "Diffusion Flux ACF"
# Average and std over the slices
for i in range(self.no_fluxes):
for d, dim in zip(range(self.dimensions), ["X", "Y", "Z"]):
dim_col_str = [df_str + f" {i}_{dim}_slice {isl}" for isl in range(self.no_slices)]
col_name = df_str + f" {i}_{dim}_Mean"
col_data = self.dataframe_slices[dim_col_str].mean(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
col_name = df_str + f" {i}_{dim}_Std"
col_data = self.dataframe_slices[dim_col_str].std(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Average and std over the slices
for i in range(self.no_fluxes_acf):
for d, dim in zip(range(self.dimensions), ["X", "Y", "Z"]):
dim_col_str = [df_acf_str + f" {i}_{dim}_slice {isl}" for isl in range(self.no_slices)]
col_name = df_acf_str + f" {i}_{dim}_Mean"
col_data = self.dataframe_acf_slices[dim_col_str].mean(axis=1).values
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
col_name = df_acf_str + f" {i}_{dim}_Std"
col_data = self.dataframe_acf_slices[dim_col_str].std(axis=1).values
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
tot_col_str = [df_acf_str + f" {i}_Total_slice {isl}" for isl in range(self.no_slices)]
# Average
col_name = df_acf_str + f" {i}_Total_Mean"
col_data = self.dataframe_acf_slices[tot_col_str].mean(axis=1).values
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
# STD
col_name = df_acf_str + f" {i}_Total_Std"
col_data = self.dataframe_acf_slices[tot_col_str].std(axis=1).values
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
[docs]class DynamicStructureFactor(Observable):
"""Dynamic Structure factor.
The species dependent DSF :math:`S_{AB}(k,\\omega)` is calculated from
.. math::
S_{AB}(k,\\omega) = \\int_0^\\infty dt \\,
\\left \\langle | n_{A}( \\mathbf k, t)n_{B}( -\\mathbf k, t) \\right \\rangle e^{i \\omega t},
where the microscopic density of species :math:`A` with number of particles :math:`N_{A}` is given by
.. math::
n_{A}(\\mathbf k,t) = \\sum^{N_{A}}_{j} e^{-i \\mathbf k \\cdot \\mathbf r_j(t)} .
"""
[docs] def __init__(self):
super().__init__()
self.__name__ = "dsf"
self.__long_name__ = "Dynamic Structure Factor"
self.kw_observable = True
self.k_observable = True
[docs] @setup_doc
def setup(self, params, phase: str = None, no_slices: int = 1, **kwargs):
super().setup_init(params, phase, no_slices)
self.update_args(**kwargs)
[docs] @arg_update_doc
def update_args(self, **kwargs):
# Update the attribute with the passed arguments
self.__dict__.update(kwargs.copy())
self.update_finish()
[docs] @compute_doc
def compute(self):
t0 = self.timer.current()
# for run_idx, run_id in enumerate(self.runs):
# self.dump_dir = self.dump_dirs_list[run_idx]
self.calc_slices_data()
self.average_slices_data()
self.save_hdf()
tend = self.timer.current()
time_stamp(self.log_file, self.__long_name__ + " Calculation", self.timer.time_division(tend - t0), self.verbose)
[docs] @calc_slices_doc
def calc_slices_data(self):
# Parse nkt otherwise calculate it
self.parse_kt_data(nkt_flag=True)
nkt_df = read_hdf(self.nkt_hdf_file, mode="r", key="nkt")
for isl in tqdm(range(self.no_slices), desc="Calculating DSF for slice", disable=not self.verbose):
# Initialize container
nkt = zeros((self.num_species, self.slice_steps, len(self.k_list)), dtype=complex128)
for sp, sp_name in enumerate(self.species_names):
nkt[sp] = array(nkt_df["slice {}".format(isl + 1)][sp_name])
# Calculate Skw
Skw_all = calc_Skw(nkt, self.k_list, self.species_num, self.slice_steps, self.dt, self.dump_step)
# Create the dataframe's column names
slc_column = f"slice {isl + 1}"
ka_columns = [f"ka = {ka:.8f}" for ik, ka in enumerate(self.ka_values)]
# Save the full Skw into a Dataframe
sp_indx = 0
for i, sp1 in enumerate(self.species_names):
for j, sp2 in enumerate(self.species_names[i:]):
columns = [
"{}-{}_".format(sp1, sp2)
+ slc_column
+ "_{}_k = [{}, {}, {}]".format(
ka_columns[int(self.k_harmonics[ik, -1])], *self.k_harmonics[ik, :-2].astype(int)
)
for ik in range(len(self.k_harmonics))
]
self.dataframe_slices = concat(
[self.dataframe_slices, DataFrame(Skw_all[sp_indx, :, :].T, columns=columns)], axis=1
)
sp_indx += 1
# Create the MultiIndex
tuples = [tuple(c.split("_")) for c in self.dataframe_slices.columns]
self.dataframe_slices.columns = MultiIndex.from_tuples(
tuples, names=["species", "slices", "k_index", "k_harmonics"]
)
[docs] @avg_slices_doc
def average_slices_data(self):
# Now for the actual df
self.dataframe[" _ _Frequencies"] = self.frequencies
# Take the mean and std and store them into the dataframe to return
for sp1, sp1_name in enumerate(self.species_names):
for sp2, sp2_name in enumerate(self.species_names[sp1:], sp1):
skw_name = f"{sp1_name}-{sp2_name}"
# Rename the columns with values of ka
ka_columns = [skw_name + f"_Mean_ka{ik + 1} = {ka:.4f}" for ik, ka in enumerate(self.ka_values)]
# Mean: level = 1 corresponds to averaging all the k harmonics with the same magnitude
df_mean = self.dataframe_slices[skw_name].groupby(level=1, axis=1).mean()
df_mean = df_mean.rename(col_mapper(df_mean.columns, ka_columns), axis=1)
# Std
ka_columns = [skw_name + f"_Std_ka{ik + 1} = {ka:.4f}" for ik, ka in enumerate(self.ka_values)]
df_std = self.dataframe_slices[skw_name].groupby(level=1, axis=1).std()
df_std = df_std.rename(col_mapper(df_std.columns, ka_columns), axis=1)
self.dataframe = concat([self.dataframe, df_mean, df_std], axis=1)
[docs]class ElectricCurrent(Observable):
"""Electric Current Auto-correlation function."""
[docs] def __init__(self):
super().__init__()
self.__name__ = "ec"
self.__long_name__ = "Electric Current"
self.acf_observable = True
[docs] @setup_doc
def setup(self, params, phase: str = None, no_slices: int = None, **kwargs):
super().setup_init(params, phase, no_slices)
self.update_args(**kwargs)
[docs] @arg_update_doc
def update_args(self, **kwargs):
# Update the attribute with the passed arguments. e.g time_averaging and timesteps_to_skip
self.__dict__.update(kwargs.copy())
self.update_finish()
[docs] @compute_doc
def compute(self):
# Initialize timer
t0 = self.timer.current()
self.calc_slices_data()
self.average_slices_data()
self.save_hdf()
tend = self.timer.current()
time_stamp(self.log_file, "Electric Current Calculation", self.timer.time_division(tend - t0), self.verbose)
[docs] def calc_slices_data(self):
start_slice = 0
end_slice = self.slice_steps * self.dump_step
time = zeros(self.slice_steps)
# Loop over the slices of each run
for isl in tqdm(
range(self.no_slices),
desc=f"\nCalculating {self.__long_name__} for slice ",
disable=not self.verbose,
position=0,
):
# Parse the particles from the dump files
species_current = zeros((self.num_species, self.dimensions, self.slice_steps))
for it, dump in enumerate(range(start_slice, end_slice, self.dump_step)):
datap = load_from_restart(self.dump_dir, dump)
time[it] = datap["time"]
species_current[:, :, it] = datap["species_electric_current"]
#
if isl == 0:
self.dataframe["Time"] = time.copy()
self.dataframe_acf["Time"] = time.copy()
self.dataframe_slices["Time"] = time.copy()
self.dataframe_acf_slices["Time"] = time.copy()
# species_current, total_current = calc_elec_current(vel, self.species_charges, self.species_num)
total_current = species_current.sum(axis=0)
# Store species data
for i, sp_name in enumerate(self.species_names):
sp_col_str = f"{sp_name} {self.__long_name__}"
sp_col_str_acf = f"{sp_name} {self.__long_name__} ACF"
sp_tot_acf = zeros(total_current.shape[1])
for d in range(self.dimensions):
dl = self.dim_labels[d]
col_name = sp_col_str + f"_{dl}_slice {isl}"
col_data = species_current[i, d, :]
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
# Calculate ACF
col_data = correlationfunction(species_current[i, d, :], species_current[i, d, :])
col_name = sp_col_str_acf + f"_{dl}_slice {isl}"
self.dataframe_acf_slices = add_col_to_df(self.dataframe_acf_slices, col_data, col_name)
sp_tot_acf += col_data
# Store Total ACF of single species
col_name = sp_col_str_acf + f"_Total_slice {isl}"
col_data = sp_tot_acf
self.dataframe_acf_slices = add_col_to_df(self.dataframe_acf_slices, col_data, col_name)
# Total current and its ACF
tot_acf = zeros(total_current.shape[1])
for d in range(self.dimensions):
dl = self.dim_labels[d]
col_name = f"{self.__long_name__}_{dl}_slice {isl}"
col_data = total_current[d, :]
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
# Calculate ACF
col_data = correlationfunction(total_current[d, :], total_current[d, :])
col_name = f"{self.__long_name__} ACF_{dl}_slice {isl}"
self.dataframe_acf_slices = add_col_to_df(self.dataframe_acf_slices, col_data, col_name)
tot_acf += col_data
col_data = tot_acf
col_name = f"{self.__long_name__} ACF_Total_slice {isl}"
self.dataframe_acf_slices = add_col_to_df(self.dataframe_acf_slices, col_data, col_name)
start_slice += self.slice_steps * self.dump_step
end_slice += self.slice_steps * self.dump_step
[docs] def average_slices_data(self):
"""Average and std over the slices."""
# Species data
for i, sp_name in enumerate(self.species_names):
col_str = f"{sp_name} {self.__long_name__}"
col_str_acf = f"{sp_name} {self.__long_name__} ACF"
for d in range(self.dimensions):
dl = self.dim_labels[d]
dim_col_str = [f"{col_str}_{dl}_slice {isl}" for isl in range(self.no_slices)]
col_data = self.dataframe_slices[dim_col_str].mean(axis=1).values
col_name = f"{col_str}_{dl}_Mean"
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
col_data = self.dataframe_slices[dim_col_str].std(axis=1).values
col_name = f"{col_str}_{dl}_Std"
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# ACF averages
dim_col_str_acf = [f"{col_str_acf}_{dl}_slice {isl}" for isl in range(self.no_slices)]
col_data = self.dataframe_acf_slices[dim_col_str_acf].mean(axis=1).values
col_name = f"{col_str_acf}_{dl}_Mean"
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
col_data = self.dataframe_acf_slices[dim_col_str_acf].std(axis=1).values
col_name = f"{col_str_acf}_{dl}_Std"
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
tot_col_str = [f"{col_str_acf}_Total_slice {isl}" for isl in range(self.no_slices)]
col_data = self.dataframe_acf_slices[tot_col_str].mean(axis=1).values
col_name = f"{col_str}_Total_Mean"
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
col_data = self.dataframe_acf_slices[tot_col_str].std(axis=1).values
col_name = f"{col_str}_Total_Std"
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
# Average and std over the slices
# Total Current data
for d in range(self.dimensions):
dl = self.dim_labels[d]
dim_col_str = [f"{self.__long_name__}_{dl}_slice {isl}" for isl in range(self.no_slices)]
dim_col_str_acf = [f"{self.__long_name__} ACF_{dl}_slice {isl}" for isl in range(self.no_slices)]
self.dataframe[f"{self.__long_name__}_{dl}_Mean"] = self.dataframe_slices[dim_col_str].mean(axis=1)
self.dataframe[f"{self.__long_name__}_{dl}_Std"] = self.dataframe_slices[dim_col_str].std(axis=1)
# ACF
self.dataframe_acf[f"{self.__long_name__}_{dl}_Mean"] = self.dataframe_acf_slices[dim_col_str_acf].mean(
axis=1
)
self.dataframe_acf[f"{self.__long_name__}_{dl}_Std"] = self.dataframe_acf_slices[dim_col_str_acf].std(axis=1)
# Total ACF
tot_col_str = [f"{self.__long_name__} ACF_Total_slice {isl}" for isl in range(self.no_slices)]
col_data = self.dataframe_acf_slices[tot_col_str].mean(axis=1).values
col_name = self.__long_name__ + f" ACF_Total_Mean"
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
col_data = self.dataframe_acf_slices[tot_col_str].std(axis=1).values
col_name = self.__long_name__ + f" ACF_Total_Std"
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
[docs]class HeatFlux(Observable):
"""Heat Flux."""
[docs] def __init__(self):
super().__init__()
self.__name__ = "heat_flux_species_tensor"
self.__long_name__ = "Heat Flux"
self.acf_observable = True
[docs] @setup_doc
def setup(self, params, phase: str = None, no_slices: int = None, **kwargs):
super().setup_init(params, phase, no_slices)
self.update_args(**kwargs)
[docs] @arg_update_doc
def update_args(self, **kwargs):
# Update the attribute with the passed arguments
self.__dict__.update(**kwargs)
self.update_finish()
[docs] @compute_doc
def compute(self, calculate_acf: bool = False):
t0 = self.timer.current()
self.calc_slices_data()
self.average_slices_data()
self.save_hdf()
tend = self.timer.current()
time_stamp(
self.log_file,
self.__long_name__ + " calculation",
self.timer.time_division(tend - t0),
self.verbose,
)
if calculate_acf:
self.compute_acf()
[docs] @compute_acf_doc
def compute_acf(self, equal_number_time_samples: bool = False):
t0 = self.timer.current()
self.calc_acf_slices_data(equal_number_time_samples)
self.average_acf_slices_data()
self.save_acf_hdf()
tend = self.timer.current()
time_stamp(
self.log_file,
self.__long_name__ + " ACF calculation",
self.timer.time_division(tend - t0),
self.verbose,
)
[docs] @calc_slices_doc
def calc_slices_data(self):
time = zeros(self.slice_steps)
# Let's compute
start_slice_step = 0
end_slice_step = self.slice_steps * self.dump_step
### Slices loop
for isl in tqdm(
range(self.no_slices),
desc=f"\nCalculating {self.__long_name__} for slice ",
disable=not self.verbose,
position=0,
):
# Parse the particles from the dump files
heat_flux_species_tensor = zeros((3, self.num_species, self.slice_steps))
for it, dump in enumerate(range(start_slice_step, end_slice_step, self.dump_step)):
datap = load_from_restart(self.dump_dir, dump)
time[it] = datap["time"]
heat_flux_species_tensor[:, :, it] = datap["species_heat_flux"]
if isl == 0:
time_str = f"{self.__long_name__}_Species_Axis_Time"
self.dataframe[time_str] = time.copy()
self.dataframe_slices[time_str] = time.copy()
# Store data into dataframes
for isp, sp1 in enumerate(self.species_names):
for iax, ax in enumerate(self.dim_labels):
col_data = heat_flux_species_tensor[iax, isp, :]
col_name = f"{self.__long_name__}_{sp1}_{ax}_slice {isl}"
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
start_slice_step += self.slice_steps * self.dump_step
end_slice_step += self.slice_steps * self.dump_step
# end of slice loop
[docs] @calc_acf_slices_doc
def calc_acf_slices_data(self, equal_number_time_samples: bool = False):
if equal_number_time_samples:
# In order to have the same number of timesteps products for each time lag of the ACF do the following.
# Take two slice data
start_slice_step = 0
end_slice_step = int(2 * self.acf_slice_steps * self.dump_step)
time = zeros(self.acf_slice_steps)
# Temporarily store two consecutive slice data
heat_flux_species_tensor = zeros((3, self.num_species, 2 * self.acf_slice_steps))
# Slices loop
ec_ACF = zeros(self.acf_slice_steps)
for isl in tqdm(
range(self.no_slices),
desc=f"\nCalculating {self.__long_name__} ACF for slice ",
disable=not self.verbose,
position=0,
):
# This could be
for it, dump in enumerate(range(start_slice_step, end_slice_step, self.dump_step)):
datap = load_from_restart(self.dump_dir, dump)
heat_flux_species_tensor[:, :, it] = datap["species_heat_flux"]
time[it] = datap["time"]
if isl == 0:
time_str = f"{self.__long_name__}_Species_Axis_Time"
self.dataframe_acf[time_str] = time[: self.acf_slice_steps].copy()
self.dataframe_acf_slices[time_str] = time[: self.acf_slice_steps].copy()
# Calculate the ACF and store into dataframes
total_heat_flux = zeros((self.no_obs, self.acf_slice_steps))
# These loops calculate the acf of each pair for each axis. In this order
# EC_X_11 -> EC_X_12 -> EC_X_22 -> EC_Y_11 -> ... -> EC_Z_11 -> ...
for iax, ax in enumerate(self.dim_labels):
for isp, sp1 in enumerate(self.species_names):
for isp2, sp2 in enumerate(self.species_names[isp:], isp):
# inter-species multiplier. 1 if both species are equal, 2 if not.
const = 1 * (isp == isp2) + 2 * (isp < isp2)
# Index of total_ec
k = int(self.num_species * isp - (isp - 1) * isp / 2 + (isp2 - isp))
# Auto-correlation function with the same number of time steps for each lag.
for it in tqdm(
range(self.acf_slice_steps),
desc=f"{sp1}-{sp2} ACF {ax} time lag",
disable=not self.verbose,
position=0,
leave=False,
):
ec_sp1 = heat_flux_species_tensor[iax, isp, : self.acf_slice_steps + it]
ec_sp2 = heat_flux_species_tensor[iax, isp2, : self.acf_slice_steps + it]
delta_ec_sp1 = ec_sp1 - ec_sp1.mean(axis=-1)
delta_ec_sp2 = ec_sp2 - ec_sp2.mean(axis=-1)
ec_ACF[it] = correlationfunction(delta_ec_sp1, delta_ec_sp2)[it]
else:
ec_sp1 = heat_flux_species_tensor[iax, isp, : self.acf_slice_steps]
ec_sp2 = heat_flux_species_tensor[iax, isp2, : self.acf_slice_steps]
delta_ec_sp1 = ec_sp1 - ec_sp1.mean(axis=-1)
delta_ec_sp2 = ec_sp2 - ec_sp2.mean(axis=-1)
ec_ACF = correlationfunction(delta_ec_sp1, delta_ec_sp2)
# Store in the dataframe
col_name = f"{self.__long_name__} ACF_{sp1}-{sp2}_{ax}_slice {isl}"
self.dataframe_acf_slices = add_col_to_df(self.dataframe_acf_slices, ec_ACF, col_name)
# Add to the total ACF of each species pair.
total_heat_flux[k] += const * ec_ACF
# Store the total EC_11, EC_12, EC_13, .. EC_22, EC_23, ...
for isp, sp1 in enumerate(self.species_names):
for isp2, sp2 in enumerate(self.species_names[isp:], isp):
# Index of total_ec
k = int(self.num_species * isp - (isp - 1) * isp / 2 + (isp2 - isp))
col_name = f"{self.__long_name__} ACF_{sp1}-{sp2}_Total_slice {isl}"
col_data = total_heat_flux[k, :]
self.dataframe_acf_slices = add_col_to_df(self.dataframe_acf_slices, col_data, col_name)
# Sum over the species to get the true EC of the system
if self.num_species > 1:
col_name = f"{self.__long_name__} ACF_all-all_Total_slice {isl}"
self.dataframe_acf_slices = add_col_to_df(
self.dataframe_acf_slices, total_heat_flux.sum(axis=0) / self.dimensions, col_name
)
# Advance by only one slice at a time.
start_slice_step += self.acf_slice_steps * self.dump_step
end_slice_step += self.acf_slice_steps * self.dump_step
# end of slice loop
else:
start_slice_step = 0
end_slice_step = int(self.slice_steps * self.dump_step)
time = zeros(self.slice_steps)
# Temporarily store two consecutive slice data
heat_flux_species_tensor = zeros((3, self.num_species, self.slice_steps))
# Slices loop
ec_ACF = zeros(self.slice_steps)
for isl in tqdm(
range(self.no_slices),
desc=f"\nCalculating {self.__long_name__} ACF for slice ",
disable=not self.verbose,
position=0,
):
# This could be
for it, dump in enumerate(range(start_slice_step, end_slice_step, self.dump_step)):
datap = load_from_restart(self.dump_dir, dump)
heat_flux_species_tensor[:, :, it] = datap["species_heat_flux"]
time[it] = datap["time"]
if isl == 0:
time_str = f"{self.__long_name__}_Species_Axis_Time"
self.dataframe_acf[time_str] = time[:].copy()
self.dataframe_acf_slices[time_str] = time[:].copy()
# Calculate the ACF and store into dataframes
total_heat_flux = zeros((self.no_obs, self.slice_steps))
# These loops calculate the acf of each pair for each axis. In this order
# EC_X_11 -> EC_X_12 -> EC_X_22 -> EC_Y_11 -> ... -> EC_Z_11 -> ...
for iax, ax in enumerate(self.dim_labels):
for isp, sp1 in enumerate(self.species_names):
for isp2, sp2 in enumerate(self.species_names[isp:], isp):
# inter-species multiplier. 1 if both species are equal, 2 if not.
const = 1 * (isp == isp2) + 2 * (isp < isp2)
# Index of total_ec
k = int(self.num_species * isp - (isp - 1) * isp / 2 + (isp2 - isp))
# Auto-correlation function with the same number of time steps for each lag.
ec_sp1 = heat_flux_species_tensor[iax, isp, :]
ec_sp2 = heat_flux_species_tensor[iax, isp2, :]
delta_ec_sp1 = ec_sp1 - ec_sp1.mean(axis=-1)
delta_ec_sp2 = ec_sp2 - ec_sp2.mean(axis=-1)
ec_ACF = correlationfunction(delta_ec_sp1, delta_ec_sp2)
# Store in the dataframe
col_name = f"{self.__long_name__} ACF_{sp1}-{sp2}_{ax}_slice {isl}"
self.dataframe_acf_slices = add_col_to_df(self.dataframe_acf_slices, ec_ACF, col_name)
# Add to the total ACF of each species pair.
total_heat_flux[k] += const * ec_ACF
# Store the total EC_11, EC_12, EC_13, .. EC_22, EC_23, ...
for isp, sp1 in enumerate(self.species_names):
for isp2, sp2 in enumerate(self.species_names[isp:], isp):
# Index of total_ec
k = int(self.num_species * isp - (isp - 1) * isp / 2 + (isp2 - isp))
col_name = f"{self.__long_name__} ACF_{sp1}-{sp2}_Total_slice {isl}"
col_data = total_heat_flux[k, :]
self.dataframe_acf_slices = add_col_to_df(self.dataframe_acf_slices, col_data, col_name)
# Sum over the species to get the true EC of the system
if self.num_species > 1:
col_name = f"{self.__long_name__} ACF_all-all_Total_slice {isl}"
self.dataframe_acf_slices = add_col_to_df(
self.dataframe_acf_slices, total_heat_flux.sum(axis=0) / self.dimensions, col_name
)
# Advance by only one slice at a time.
start_slice_step += self.slice_steps * self.dump_step
end_slice_step += self.slice_steps * self.dump_step
# end of slice loop
[docs] @avg_slices_doc
def average_slices_data(self):
for isp, sp1 in enumerate(self.species_names):
for _, ax in enumerate(self.dim_labels):
columns = [f"{self.__long_name__}_{sp1}_{ax}_slice {isl}" for isl in range(self.no_slices)]
col_data = self.dataframe_slices[columns].mean(axis=1)
col_name = f"{self.__long_name__}_{sp1}_{ax}_Mean"
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
col_data = self.dataframe_slices[columns].std(axis=1)
col_name = f"{self.__long_name__}_{sp1}_{ax}_Std"
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
[docs] @avg_acf_slices_doc
def average_acf_slices_data(self):
# ACF data
dim_labels = [*self.dim_labels, "Total"]
if self.num_species > 1:
species_list = [*self.species_names, "all"]
else:
species_list = self.species_names
for isp, sp1 in enumerate(species_list):
for isp2, sp2 in enumerate(species_list[isp:], isp):
for _, ax in enumerate(dim_labels):
columns = [f"{self.__long_name__} ACF_{sp1}-{sp2}_{ax}_slice {isl}" for isl in range(self.no_slices)]
# Mean
col_data = self.dataframe_acf_slices[columns].mean(axis=1)
col_name = f"{self.__long_name__} ACF_{sp1}-{sp2}_{ax}_Mean"
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
# Std
col_data = self.dataframe_acf_slices[columns].std(axis=1)
col_name = f"{self.__long_name__} ACF_{sp1}-{sp2}_{ax}_Std"
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
[docs]class PressureTensor(Observable):
"""Pressure Tensor."""
[docs] def __init__(self):
super().__init__()
self.__name__ = "pressure_tensor"
self.__long_name__ = "Pressure Tensor"
self.acf_observable = True
self.kinetic_potential_division = False
[docs] @setup_doc
def setup(self, params, phase: str = None, no_slices: int = None, **kwargs):
super().setup_init(params, phase, no_slices)
self.update_args(**kwargs)
[docs] @arg_update_doc
def update_args(self, **kwargs):
# Update the attribute with the passed arguments
self.__dict__.update(**kwargs)
self.update_finish()
[docs] def df_column_names(self):
# Dataframes' columns names
pt_str_kin = "Pressure Tensor Kinetic"
pt_str_pot = "Pressure Tensor Potential"
pt_str = "Pressure Tensor"
pt_acf_str_kin = "Pressure Tensor Kinetic ACF"
pt_acf_str_pot = "Pressure Tensor Potential ACF"
pt_acf_str_kinpot = "Pressure Tensor Kin-Pot ACF"
pt_acf_str_potkin = "Pressure Tensor Pot-Kin ACF"
pt_acf_str = "Pressure Tensor ACF"
# Slice Dataframe
slice_df_column_names = ["Time"]
for isl in range(self.no_slices):
slice_df_column_names.append(f"Pressure_slice {isl}")
slice_df_column_names.append(f"Delta Pressure_slice {isl}")
for i, ax1 in enumerate(self.dim_labels):
for j, ax2 in enumerate(self.dim_labels):
slice_df_column_names.append(pt_str_kin + f" {ax1}{ax2}_slice {isl}")
slice_df_column_names.append(pt_str_pot + f" {ax1}{ax2}_slice {isl}")
slice_df_column_names.append(pt_str + f" {ax1}{ax2}_slice {isl}")
# ACF Slice Dataframe
acf_slice_df_column_names = ["Time"]
for isl in range(self.no_slices):
acf_slice_df_column_names.append(f"Pressure ACF_slice {isl}")
acf_slice_df_column_names.append(f"Delta Pressure ACF_slice {isl}")
for ax1 in self.dim_labels:
for ax2 in self.dim_labels:
for ax3 in self.dim_labels:
for ax4 in self.dim_labels:
acf_slice_df_column_names.append(pt_acf_str_kin + f" {ax1}{ax2}{ax3}{ax4}_slice {isl}")
acf_slice_df_column_names.append(pt_acf_str_pot + f" {ax1}{ax2}{ax3}{ax4}_slice {isl}")
acf_slice_df_column_names.append(pt_acf_str_kinpot + f" {ax1}{ax2}{ax3}{ax4}_slice {isl}")
acf_slice_df_column_names.append(pt_acf_str_potkin + f" {ax1}{ax2}{ax3}{ax4}_slice {isl}")
acf_slice_df_column_names.append(pt_acf_str + f" {ax1}{ax2}{ax3}{ax4}_slice {isl}")
# Mean and std Dataframe
df_column_names = ["Time", "Pressure_Mean", "Pressure_Std", "Delta Pressure_Mean", "Delta Pressure_Std"]
for i, ax1 in enumerate(self.dim_labels):
for j, ax2 in enumerate(self.dim_labels):
df_column_names.append(pt_str_kin + f" {ax1}{ax2}_Mean")
df_column_names.append(pt_str_kin + f" {ax1}{ax2}_Std")
df_column_names.append(pt_str_pot + f" {ax1}{ax2}_Mean")
df_column_names.append(pt_str_pot + f" {ax1}{ax2}_Std")
df_column_names.append(pt_str + f" {ax1}{ax2}_Mean")
df_column_names.append(pt_str + f" {ax1}{ax2}_Std")
# Mean and std Dataframe
acf_df_column_names = [
"Time",
"Pressure ACF_Mean",
"Pressure ACF_Std",
"Delta Pressure ACF_Mean",
"Delta Pressure ACF_Std",
]
for ax1 in self.dim_labels:
for ax2 in self.dim_labels:
for ax3 in self.dim_labels:
for ax4 in self.dim_labels:
acf_df_column_names.append(pt_acf_str_kin + f" {ax1}{ax2}{ax3}{ax4}_Mean")
acf_df_column_names.append(pt_acf_str_kin + f" {ax1}{ax2}{ax3}{ax4}_Std")
acf_df_column_names.append(pt_acf_str_pot + f" {ax1}{ax2}{ax3}{ax4}_Mean")
acf_df_column_names.append(pt_acf_str_pot + f" {ax1}{ax2}{ax3}{ax4}_Std")
acf_df_column_names.append(pt_acf_str_kinpot + f" {ax1}{ax2}{ax3}{ax4}_Mean")
acf_df_column_names.append(pt_acf_str_kinpot + f" {ax1}{ax2}{ax3}{ax4}_Std")
acf_df_column_names.append(pt_acf_str_potkin + f" {ax1}{ax2}{ax3}{ax4}_Mean")
acf_df_column_names.append(pt_acf_str_potkin + f" {ax1}{ax2}{ax3}{ax4}_Std")
acf_df_column_names.append(pt_acf_str + f" {ax1}{ax2}{ax3}{ax4}_Mean")
acf_df_column_names.append(pt_acf_str + f" {ax1}{ax2}{ax3}{ax4}_Std")
self.dataframe = pd.DataFrame(columns=df_column_names)
self.dataframe_slices = pd.DataFrame(columns=slice_df_column_names)
self.dataframe_acf = pd.DataFrame(columns=acf_df_column_names)
self.dataframe_acf_slices = pd.DataFrame(columns=acf_slice_df_column_names)
[docs] @compute_doc
def compute(self, calculate_acf: bool = False, kin_pot_division: bool = False):
self.kinetic_potential_division = kin_pot_division
t0 = self.timer.current()
self.calc_slices_data()
self.average_slices_data()
self.save_hdf()
tend = self.timer.current()
time_stamp(
self.log_file,
self.__long_name__ + " calculation",
self.timer.time_division(tend - t0),
self.verbose,
)
if calculate_acf:
self.compute_acf()
[docs] @compute_acf_doc
def compute_acf(self, kin_pot_division: bool = False, equal_number_time_samples: bool = False):
self.kinetic_potential_division = kin_pot_division
t0 = self.timer.current()
self.calc_acf_slices_data(equal_number_time_samples)
self.average_acf_slices_data()
self.save_acf_hdf()
tend = self.timer.current()
time_stamp(
self.log_file,
self.__long_name__ + " ACF calculation",
self.timer.time_division(tend - t0),
self.verbose,
)
[docs] @calc_slices_doc
def calc_slices_data(self):
# Precompute the number of columns in the dataframe and make the list of names
# self.df_column_names()
# Dataframes' columns names
pt_str_kin = "Pressure Tensor Kinetic"
pt_str_pot = "Pressure Tensor Potential"
pt_str = "Pressure Tensor"
time = zeros(self.slice_steps)
# Let's compute
start_slice_step = 0
end_slice_step = self.slice_steps * self.dump_step
for isl in tqdm(
range(self.no_slices),
desc=f"\nCalculating {self.__long_name__} for slice ",
disable=not self.verbose,
position=0,
):
# Parse the particles from the dump files
pressure = zeros(self.slice_steps)
pt_kin_temp = zeros((self.dimensions, self.dimensions, self.num_species, self.slice_steps))
pt_pot_temp = zeros((self.dimensions, self.dimensions, self.num_species, self.slice_steps))
pt_temp = zeros((self.dimensions, self.dimensions, self.num_species, self.slice_steps))
species_pressure = zeros((self.num_species, self.slice_steps))
for it, dump in enumerate(
tqdm(
range(start_slice_step, end_slice_step, self.dump_step),
desc="Timestep",
position=1,
disable=not self.verbose,
leave=False,
)
):
datap = load_from_restart(self.dump_dir, dump)
time[it] = datap["time"]
pt_pot_temp[:, :, :, it] = datap["species_pressure_pot_tensor"][:, :, :]
pt_kin_temp[:, :, :, it] = datap["species_pressure_kin_tensor"][:, :, :]
pt_temp[:, :, :, it] = pt_pot_temp[:, :, :, it] + pt_kin_temp[:, :, :, it]
for isp in range(self.num_species):
species_pressure[isp, it] = (pt_temp[:, :, isp, it]).trace() / self.dimensions
# pressure[it], pt_kin_temp[:, :, it], pt_pot_temp[:, :, it], pt_temp[:, :, it] = calc_pressure_tensor(
# datap["vel"], datap["virial_species_tensor"], self.species_masses, self.species_num, self.box_volume, self.dimensions
# )
if isl == 0:
time_str = f"Species_Quantity_Time"
self.dataframe[time_str] = time.copy()
self.dataframe_slices[time_str] = time.copy()
# Add total quantities first
col_data = species_pressure[:, :].sum(axis=0)
col_name = f"Total_Pressure_slice {isl}"
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
# The reason for dividing these two loops is that I want a specific order in the dataframe.
# Pressure, Stress Tensor, All
for i, ax1 in enumerate(self.dim_labels):
for j, ax2 in enumerate(self.dim_labels):
slc_str = f"{ax1}{ax2}_slice {isl}"
if self.kinetic_potential_division:
col_name = f"Total_{pt_str_kin} {slc_str}"
col_data = pt_kin_temp[i, j, :, :].sum(axis=-2)
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
col_name = f"Total_{pt_str_pot} {slc_str}"
col_data = pt_pot_temp[i, j, :, :].sum(axis=-2)
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
# Stress Tensor P_XX, P_XY, P_XZ, P_YX, ...
col_name = f"Total_{pt_str} {slc_str}"
col_data = pt_temp[i, j, :, :].sum(axis=-2)
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
# Save data for each species to df
for isp, sp_name in enumerate(self.species_names):
col_data = species_pressure[isp, :]
col_name = f"{sp_name}_Pressure_slice {isl}"
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
# The reason for dividing these two loops is that I want a specific order in the dataframe.
# Pressure, Stress Tensor, All
for i, ax1 in enumerate(self.dim_labels):
for j, ax2 in enumerate(self.dim_labels):
slc_str = f"{ax1}{ax2}_slice {isl}"
if self.kinetic_potential_division:
col_name = f"{sp_name}_{pt_str_kin} {slc_str}"
col_data = pt_kin_temp[i, j, isp, :]
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
col_name = f"{sp_name}_{pt_str_pot} {slc_str}"
col_data = pt_pot_temp[i, j, isp, :]
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
# Stress Tensor P_XX, P_XY, P_XZ, P_YX, ...
col_name = f"{sp_name}_{pt_str} {slc_str}"
col_data = pt_temp[i, j, isp, :]
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
start_slice_step += self.slice_steps * self.dump_step
end_slice_step += self.slice_steps * self.dump_step
# end of slice loop
[docs] @calc_acf_slices_doc
def calc_acf_slices_data(self, equal_number_time_samples: bool = False):
# Dataframes' columns names
pt_acf_str_kin = "Pressure Tensor Kinetic ACF"
pt_acf_str_pot = "Pressure Tensor Potential ACF"
pt_acf_str_kinpot = "Pressure Tensor Kin-Pot ACF"
pt_acf_str_potkin = "Pressure Tensor Pot-Kin ACF"
pt_acf_str = "Pressure Tensor ACF"
# The column names will be:
# Quantity_Time, Pressure Bulk ACF_slice #,
# Pressure Tensor ACF XXXX_slice #, Pressure Tensor ACF XXXY_slice #, Pressure Tensor ACF XXXZ_slice #
# ................................ Pressure Tensor ACF XXYY_slice #, Pressure Tensor ACF XXYZ_slice #
# ................................ ................................ Pressure Tensor ACF XXZZ_slice #
# Pressure Tensor ACF XYXX_slice #, Pressure Tensor ACF XYXY_slice #, Pressure Tensor ACF XYXZ_slice #
# ................................ Pressure Tensor ACF XYYY_slice #, Pressure Tensor ACF XYYZ_slice #
# ................................ ................................ Pressure Tensor ACF XYZZ_slice #
# Pressure Tensor ACF XZXX_slice #, Pressure Tensor ACF XZXY_slice #, Pressure Tensor ACF XZXZ_slice #
# ................................ Pressure Tensor ACF XZYY_slice #, Pressure Tensor ACF XZYZ_slice #
# ................................ ................................ Pressure Tensor ACF XZZZ_slice #
# ................................ ................................ ................................
# ................................ Pressure Tensor ACF YYYY_slice #, Pressure Tensor ACF YYYZ_slice #
# ................................ ................................ Pressure Tensor ACF YYZZ_slice #
# ................................ ................................ ................................
# ................................ Pressure Tensor ACF YZYY_slice #, Pressure Tensor ACF YZYZ_slice #
# ................................ ................................ Pressure Tensor ACF YZZZ_slice #
# ................................ ................................ ................................
# ................................ ................................ ................................
# ................................ ................................ Pressure Tensor ACF ZZZZ_slice #
# These are the only calculated axes combination, the .................... indicates that those combination are not calculated.
if equal_number_time_samples:
data_storage_size = int(2 * self.acf_slice_steps)
time = zeros(data_storage_size)
# Let's compute
start_slice_step = 0
end_slice_step = int(2 * self.acf_slice_steps * self.dump_step)
# Unfortunately I haven't found a better way to grab the data than to re-read it from file.
for isl in tqdm(
range(self.no_slices),
desc=f"\nCalculating {self.__long_name__} ACF for slice ",
disable=not self.verbose,
position=0,
):
# Parse the particles from the dump files
pt_kin_temp = zeros((self.dimensions, self.dimensions, data_storage_size))
pt_pot_temp = zeros((self.dimensions, self.dimensions, data_storage_size))
pt_temp = zeros((self.dimensions, self.dimensions, data_storage_size))
pressure = zeros((data_storage_size))
for it, dump in enumerate(
tqdm(
range(start_slice_step, end_slice_step, self.dump_step),
desc="Timestep",
position=1,
disable=not self.verbose,
leave=False,
)
):
datap = load_from_restart(self.dump_dir, dump)
time[it] = datap["time"]
pt_pot_temp[:, :, it] = datap["species_pressure_pot_tensor"][:, :, :].sum(axis=-1)
pt_kin_temp[:, :, it] = datap["species_pressure_kin_tensor"][:, :, :].sum(axis=-1)
pt_temp[:, :, it] = pt_pot_temp[:, :, it] + pt_kin_temp[:, :, it]
pressure[it] = pt_temp[:, :, it].trace()
if isl == 0:
time_str = f"Quantity_Time"
self.dataframe_acf[time_str] = time[: self.acf_slice_steps].copy()
self.dataframe_acf_slices[time_str] = time[: self.acf_slice_steps].copy()
# Calculate the ACF for each species to df. I am not calculating the Sp1-Sp2 ACF
acf_data = zeros(self.acf_slice_steps)
# Auto-correlation function with the same number of time steps for each lag.
for it in tqdm(
range(self.acf_slice_steps),
desc=f"Bulk Pressure Tensor ACF calculation",
disable=not self.verbose,
position=0,
leave=False,
):
# This is needed for the bulk viscosity
col_data = pressure[: self.acf_slice_steps + it]
delta_pressure = col_data - col_data.mean()
acf_data[it] = correlationfunction(delta_pressure, delta_pressure)[it]
col_name = f"Pressure Bulk ACF_slice {isl}"
self.dataframe_acf_slices = add_col_to_df(self.dataframe_acf_slices, acf_data, col_name)
# Calculate the ACF of the thermal fluctuations of the pressure tensor elements
# Note: C_{abcd} = < sigma_{ab} sigma_{cd} >
for i, ax1 in enumerate(self.dim_labels):
for j, ax2 in enumerate(self.dim_labels[i:], i):
for k, ax3 in enumerate(self.dim_labels[i:], i):
for l, ax4 in enumerate(self.dim_labels[k:], k):
slc_str = f"{ax1}{ax2}{ax3}{ax4}_slice {isl}"
if self.kinetic_potential_division:
C_ijkl_kin = correlationfunction(pt_kin_temp[i, j, :], pt_kin_temp[k, l, :])
C_ijkl_pot = correlationfunction(pt_pot_temp[i, j, :], pt_pot_temp[k, l, :])
C_ijkl_kinpot = correlationfunction(pt_kin_temp[i, j, :], pt_pot_temp[k, l, :])
C_ijkl_potkin = correlationfunction(pt_pot_temp[i, j, :], pt_kin_temp[k, l, :])
# Kinetic
col_name = pt_acf_str_kin + slc_str
col_data = C_ijkl_kin
self.dataframe_acf_slices = add_col_to_df(
self.dataframe_acf_slices, col_data, col_name
)
# Potential
col_name = pt_acf_str_pot + slc_str
col_data = C_ijkl_pot
self.dataframe_acf_slices = add_col_to_df(
self.dataframe_acf_slices, col_data, col_name
)
# Kin-Pot
col_name = pt_acf_str_kinpot + slc_str
col_data = C_ijkl_kinpot
self.dataframe_acf_slices = add_col_to_df(
self.dataframe_acf_slices, col_data, col_name
)
# Pot-Kin
col_name = pt_acf_str_potkin + slc_str
col_data = C_ijkl_potkin
self.dataframe_acf_slices = add_col_to_df(
self.dataframe_acf_slices, col_data, col_name
)
acf_data = zeros(self.acf_slice_steps)
# Auto-correlation function with the same number of time steps for each lag.
for it in range(self.acf_slice_steps):
# desc=f"Shear {ax1}{ax2}{ax3}{ax4} ACF calculation",
# disable=not self.verbose,
# position=0,
# leave=False,
# ):
# This is needed for the bulk viscosity
col_data = pt_temp[i, j, : self.acf_slice_steps + it]
acf_data[it] = correlationfunction(col_data, col_data)[it]
# Total
col_name = f"{pt_acf_str} {slc_str}"
self.dataframe_acf_slices = add_col_to_df(self.dataframe_acf_slices, acf_data, col_name)
start_slice_step += self.acf_slice_steps * self.dump_step
end_slice_step += self.acf_slice_steps * self.dump_step
# end of slice loop
else:
data_storage_size = int(self.slice_steps)
time = zeros(data_storage_size)
# Let's compute
start_slice_step = 0
end_slice_step = int(self.acf_slice_steps * self.dump_step)
# Unfortunately I haven't found a better way to grab the data than to re-read it from file.
for isl in tqdm(
range(self.no_slices),
desc=f"\nCalculating {self.__long_name__} for slice ",
disable=not self.verbose,
position=0,
):
# Parse the particles from the dump files
pt_kin_temp = zeros((self.dimensions, self.dimensions, data_storage_size))
pt_pot_temp = zeros((self.dimensions, self.dimensions, data_storage_size))
pt_temp = zeros((self.dimensions, self.dimensions, data_storage_size))
pressure = zeros((data_storage_size))
for it, dump in enumerate(
tqdm(
range(start_slice_step, end_slice_step, self.dump_step),
desc="Timestep",
position=1,
disable=not self.verbose,
leave=False,
)
):
datap = load_from_restart(self.dump_dir, dump)
time[it] = datap["time"]
pt_pot_temp[:, :, it] = datap["species_pressure_pot_tensor"][:, :, :].sum(axis=-1)
pt_kin_temp[:, :, it] = datap["species_pressure_kin_tensor"][:, :, :].sum(axis=-1)
pt_temp[:, :, it] = pt_pot_temp[:, :, it] + pt_kin_temp[:, :, it]
pressure[it] = pt_temp[:, :, it].trace()
if isl == 0:
time_str = f"Quantity_Time"
self.dataframe_acf[time_str] = time[: self.slice_steps].copy()
self.dataframe_acf_slices[time_str] = time[: self.slice_steps].copy()
# Calculate the ACF for each species to df. I am not calculating the Sp1-Sp2 ACF
acf_data = zeros(self.slice_steps)
# This is needed for the bulk viscosity
col_data = pressure
delta_pressure = col_data - col_data.mean()
acf_data = correlationfunction(delta_pressure, delta_pressure)
col_name = f"Pressure Bulk ACF_slice {isl}"
self.dataframe_acf_slices = add_col_to_df(self.dataframe_acf_slices, acf_data, col_name)
# Calculate the ACF of the thermal fluctuations of the pressure tensor elements
# Note: C_{abcd} = < sigma_{ab} sigma_{cd} >
for i, ax1 in enumerate(self.dim_labels):
for j, ax2 in enumerate(self.dim_labels[i:], i):
for k, ax3 in enumerate(self.dim_labels[i:], i):
for l, ax4 in enumerate(self.dim_labels[k:], k):
slc_str = f"{ax1}{ax2}{ax3}{ax4}_slice {isl}"
if self.kinetic_potential_division:
C_ijkl_kin = correlationfunction(pt_kin_temp[i, j, :], pt_kin_temp[k, l, :])
C_ijkl_pot = correlationfunction(pt_pot_temp[i, j, :], pt_pot_temp[k, l, :])
C_ijkl_kinpot = correlationfunction(pt_kin_temp[i, j, :], pt_pot_temp[k, l, :])
C_ijkl_potkin = correlationfunction(pt_pot_temp[i, j, :], pt_kin_temp[k, l, :])
# Kinetic
col_name = pt_acf_str_kin + slc_str
col_data = C_ijkl_kin
self.dataframe_acf_slices = add_col_to_df(
self.dataframe_acf_slices, col_data, col_name
)
# Potential
col_name = pt_acf_str_pot + slc_str
col_data = C_ijkl_pot
self.dataframe_acf_slices = add_col_to_df(
self.dataframe_acf_slices, col_data, col_name
)
# Kin-Pot
col_name = pt_acf_str_kinpot + slc_str
col_data = C_ijkl_kinpot
self.dataframe_acf_slices = add_col_to_df(
self.dataframe_acf_slices, col_data, col_name
)
# Pot-Kin
col_name = pt_acf_str_potkin + slc_str
col_data = C_ijkl_potkin
self.dataframe_acf_slices = add_col_to_df(
self.dataframe_acf_slices, col_data, col_name
)
# acf_data = zeros(self.acf_slice_steps)
col_data = pt_temp[i, j, :]
acf_data = correlationfunction(col_data, col_data)
# Total
col_name = f"{pt_acf_str} {slc_str}"
self.dataframe_acf_slices = add_col_to_df(self.dataframe_acf_slices, acf_data, col_name)
start_slice_step += self.slice_steps * self.dump_step
end_slice_step += self.slice_steps * self.dump_step
# end of slice loop
[docs] @avg_slices_doc
def average_slices_data(self):
# Dataframes' columns names
pt_str_kin = "Pressure Tensor Kinetic"
pt_str_pot = "Pressure Tensor Potential"
pt_str = "Pressure Tensor"
col_str = [f"Total_Pressure_slice {isl}" for isl in range(self.no_slices)]
col_name = "Total_Pressure_Mean"
col_data = self.dataframe_slices[col_str].mean(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
col_name = "Total_Pressure_Std"
col_data = self.dataframe_slices[col_str].std(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Save data for each species to df
for _, sp_name in enumerate(self.species_names):
col_str = [f"{sp_name}_Pressure_slice {isl}" for isl in range(self.no_slices)]
col_name = f"{sp_name}_Pressure_Mean"
col_data = self.dataframe_slices[col_str].mean(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Std
col_name = f"{sp_name}_Pressure_Std"
col_data = self.dataframe_slices[col_str].std(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
for i, ax1 in enumerate(self.dim_labels):
for j, ax2 in enumerate(self.dim_labels):
if self.kinetic_potential_division:
# Kinetic Terms
ij_col_str = [f"{sp_name}_{pt_str_kin} {ax1}{ax2}_slice {isl}" for isl in range(self.no_slices)]
# Mean
col_name = f"{sp_name}_{pt_str_kin} {ax1}{ax2}_Mean"
col_data = self.dataframe_slices[ij_col_str].mean(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Std
col_name = f"{sp_name}_{pt_str_kin} {ax1}{ax2}_Std"
col_data = self.dataframe_slices[ij_col_str].std(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Potential Terms
ij_col_str = [f"{sp_name}_{pt_str_pot} {ax1}{ax2}_slice {isl}" for isl in range(self.no_slices)]
# Mean
col_name = f"{sp_name}_{pt_str_pot} {ax1}{ax2}_Mean"
col_data = self.dataframe_slices[ij_col_str].mean(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Std
col_name = f"{sp_name}_{pt_str_pot} {ax1}{ax2}_Std"
col_data = self.dataframe_slices[ij_col_str].std(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Full
ij_col_str = [f"{sp_name}_{pt_str} {ax1}{ax2}_slice {isl}" for isl in range(self.no_slices)]
# Mean
col_name = f"{sp_name}_{pt_str} {ax1}{ax2}_Mean"
col_data = self.dataframe_slices[ij_col_str].mean(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Std
col_name = f"{sp_name}_{pt_str} {ax1}{ax2}_Std"
col_data = self.dataframe_slices[ij_col_str].std(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
[docs] @avg_acf_slices_doc
def average_acf_slices_data(self):
pt_acf_str_kin = "Pressure Tensor Kinetic ACF"
pt_acf_str_pot = "Pressure Tensor Potential ACF"
pt_acf_str_kinpot = "Pressure Tensor Kin-Pot ACF"
pt_acf_str_potkin = "Pressure Tensor Pot-Kin ACF"
pt_acf_str = "Pressure Tensor ACF"
# The column names will be:
# Quantity_Time, Pressure Tensor ACF_Mean, Pressure Tensor ACF_Std,
# Pressure Tensor ACF XXXX_Mean/Std, Pressure Tensor ACF XXXY_Mean/Std, Pressure Tensor ACF XXXZ_Mean/Std
# ................................ Pressure Tensor ACF XXYY_Mean/Std, Pressure Tensor ACF XXYZ_Mean/Std
# ................................ ................................ Pressure Tensor ACF XXZZ_Mean/Std
# Pressure Tensor ACF XYXX_Mean/Std, Pressure Tensor ACF XYXY_Mean/Std, Pressure Tensor ACF XYXZ_Mean/Std
# ................................ Pressure Tensor ACF XYYY_Mean/Std, Pressure Tensor ACF XYYZ_Mean/Std
# ................................ ................................ Pressure Tensor ACF XYZZ_Mean/Std
# Pressure Tensor ACF XZXX_Mean/Std, Pressure Tensor ACF XZXY_Mean/Std, Pressure Tensor ACF XZXZ_Mean/Std
# ................................ Pressure Tensor ACF XZYY_Mean/Std, Pressure Tensor ACF XZYZ_Mean/Std
# ................................ ................................ Pressure Tensor ACF XZZZ_Mean/Std
# ................................ ................................ ................................
# ................................ Pressure Tensor ACF YYYY_Mean/Std, Pressure Tensor ACF YYYZ_Mean/Std
# ................................ ................................ Pressure Tensor ACF YYZZ_Mean/Std
# ................................ ................................ ................................
# ................................ Pressure Tensor ACF YZYY_Mean/Std, Pressure Tensor ACF YZYZ_Mean/Std
# ................................ ................................ Pressure Tensor ACF YZZZ_Mean/Std
# ................................ ................................ ................................
# ................................ ................................ ................................
# ................................ ................................ Pressure Tensor ACF ZZZZ_Mean/Std
# These are the only calculated axes combination, the .................... indicates that those combination are not calculated.
# Pressure ACF Mean and Std
col_str = [f"Pressure Bulk ACF_slice {isl}" for isl in range(self.no_slices)]
col_name = "Pressure Bulk ACF_Mean"
col_data = self.dataframe_acf_slices[col_str].mean(axis=1).values
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
col_name = "Pressure Bulk ACF_Std"
col_data = self.dataframe_acf_slices[col_str].std(axis=1).values
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
# Note: C_{abcd} = < sigma_{ab} sigma_{cd} >
for i, ax1 in enumerate(self.dim_labels):
for j, ax2 in enumerate(self.dim_labels[i:], i):
for k, ax3 in enumerate(self.dim_labels[i:], i):
for l, ax4 in enumerate(self.dim_labels[k:], k):
if self.kinetic_potential_division:
# Kinetic Terms
ij_col_acf_str = [
pt_acf_str_kin + f" {ax1}{ax2}{ax3}{ax4}_slice {isl}" for isl in range(self.no_slices)
]
# Mean
col_name = pt_acf_str_kin + f" {ax1}{ax2}{ax3}{ax4}_Mean"
col_data = self.dataframe_acf_slices[ij_col_acf_str].mean(axis=1).values
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
# Std
col_name = pt_acf_str_kin + f" {ax1}{ax2}{ax3}{ax4}_Std"
col_data = self.dataframe_acf_slices[ij_col_acf_str].std(axis=1).values
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
# Potential Terms
ij_col_acf_str = [
pt_acf_str_pot + f" {ax1}{ax2}{ax3}{ax4}_slice {isl}" for isl in range(self.no_slices)
]
# Mean
col_name = pt_acf_str_pot + f" {ax1}{ax2}{ax3}{ax4}_Mean"
col_data = self.dataframe_acf_slices[ij_col_acf_str].mean(axis=1).values
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
# Std
col_name = pt_acf_str_pot + f" {ax1}{ax2}{ax3}{ax4}_Std"
col_data = self.dataframe_acf_slices[ij_col_acf_str].std(axis=1).values
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
# Kinetic-Potential Terms
ij_col_acf_str = [
pt_acf_str_kinpot + f" {ax1}{ax2}{ax3}{ax4}_slice {isl}" for isl in range(self.no_slices)
]
# Mean
col_name = pt_acf_str_kinpot + f" {ax1}{ax2}{ax3}{ax4}_Mean"
col_data = self.dataframe_acf_slices[ij_col_acf_str].mean(axis=1).values
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
# Std
col_name = pt_acf_str_kinpot + f" {ax1}{ax2}{ax3}{ax4}_Std"
col_data = self.dataframe_acf_slices[ij_col_acf_str].std(axis=1).values
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
# Potential-Kinetic Terms
ij_col_acf_str = [
pt_acf_str_potkin + f" {ax1}{ax2}{ax3}{ax4}_slice {isl}" for isl in range(self.no_slices)
]
# Mean
col_name = pt_acf_str_potkin + f" {ax1}{ax2}{ax3}{ax4}_Mean"
col_data = self.dataframe_acf_slices[ij_col_acf_str].mean(axis=1).values
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
# Std
col_name = pt_acf_str_potkin + f" {ax1}{ax2}{ax3}{ax4}_Std"
col_data = self.dataframe_acf_slices[ij_col_acf_str].std(axis=1).values
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
# Full
ij_col_acf_str = [
f"{pt_acf_str} {ax1}{ax2}{ax3}{ax4}_slice {isl}" for isl in range(self.no_slices)
]
# Mean
col_name = f"{pt_acf_str} {ax1}{ax2}{ax3}{ax4}_Mean"
col_data = self.dataframe_acf_slices[ij_col_acf_str].mean(axis=1).values
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
# Std
col_name = f"{pt_acf_str} {ax1}{ax2}{ax3}{ax4}_Std"
col_data = self.dataframe_acf_slices[ij_col_acf_str].std(axis=1).values
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
[docs] def sum_rule(self, beta, rdf, potential):
r"""
Calculate the sum rule integrals from the rdf.
.. math::
:nowrap:
\begin{eqnarray}
\sigma_{zzzz} & = & \frac{n}{\beta^2} \left [ 3 +
\frac{2\beta}{15} I^{(1)} + \frac{\beta}{5} I^{(2)} \right ] , \\
\sigma_{zzxx} & =& \frac{n}{\beta^2} \left [ 1
- \frac{2\beta}{5} I^{(1)} + \frac {\beta}{15} I^{(2)} \right ] , \\
\sigma_{xyxy} & = & \frac{n}{\beta^2} \left [ 1 +
\frac{4\beta}{15} I^{(2)} + \frac {\beta}{15} I^{(2)} \right ] ,
\end{eqnarray}
where :math:`I^{(k)} = \sum_{A} \sum_{B \geq A}I_{AB}^{(\rm {Hartree}, k)} + I_{AB}^{(\rm {Corr}, k)}`
calculated from
:meth:`sarkas.tools.observables.RadialDistributionFunction.compute_sum_rule_integrals`.
Parameters
----------
beta: float
Inverse temperature factor. Grab it from :py:attr:`sarkas.tools.observables.Thermodynamics.beta`.
rdf: sarkas.tools.observables.RadialDistributionFunction
Radial Distribution function object.
potential : :class:`sarkas.potentials.core.Potential`
Potential object.
Returns
-------
sigma_zzzz : float
See above integral.
sigma_zzxx : float
See above integral.
sigma_xyxy : float
See above integral.
"""
hartrees, corrs = rdf.compute_sum_rule_integrals(potential)
# Eq. 2.3.43 -44 in Boon and Yip
I_1 = hartrees[:, 1].sum() + corrs[:, 1].sum()
I_2 = hartrees[:, 2].sum() + corrs[:, 2].sum()
nkT = self.total_num_density / beta
sigma_zzzz = 3.0 * nkT + 2.0 / 15.0 * I_1 + I_2 / 5.0
sigma_zzxx = nkT - 2.0 / 5.0 * I_1 + I_2 / 15.0
sigma_xyxy = nkT + 4.0 / 15.0 * I_1 + I_2 / 15.0
return sigma_zzzz, sigma_zzxx, sigma_xyxy
[docs]class RadialDistributionFunction(Observable):
"""
Radial Distribution Function.
Attributes
----------
no_bins : int
Number of bins.
dr_rdf : float
Size of each bin.
"""
[docs] def __init__(self):
super().__init__()
self.__name__ = "rdf"
self.__long_name__ = "Radial Distribution Function"
[docs] @setup_doc
def setup(self, params, phase: str = None, no_slices: int = None, **kwargs):
super().setup_init(params, phase=phase, no_slices=no_slices)
self.update_args(**kwargs)
[docs] @arg_update_doc
def update_args(self, **kwargs):
# Update the attribute with the passed arguments
self.__dict__.update(kwargs.copy())
# These definitions are needed for the print out.
self.rc = self.cutoff_radius
self.no_bins = self.rdf_nbins
self.dr_rdf = self.rc / self.no_bins
self.update_finish()
[docs] @compute_doc
def compute(self):
t0 = self.timer.current()
self.calc_slices_data()
self.average_slices_data()
self.save_hdf()
self.save_pickle()
tend = self.timer.current()
time_stamp(self.log_file, self.__long_name__ + " Calculation", self.timer.time_division(tend - t0), self.verbose)
[docs] def calc_slices_data(self):
# initialize temporary arrays
r_values = zeros(self.no_bins)
bin_vol = zeros(self.no_bins)
pair_density = zeros((self.num_species, self.num_species))
# This is needed to be certain the number of bins is the same.
# if not isinstance(rdf_hist, ndarray):
# # Find the last dump by looking for the largest number in the checkpoints filenames
# dumps_list = listdir(self.dump_dir)
# dumps_list.sort(key=num_sort)
# name, ext = os.path.splitext(dumps_list[-1])
# _, number = name.split('_')
datap = load_from_restart(self.dump_dir, 0)
# Make sure you are getting the right number of bins and redefine dr_rdf.
self.no_bins = datap["rdf_hist"].shape[0]
self.dr_rdf = self.rc / self.no_bins
# No. of pairs per volume
for i, sp1 in enumerate(self.species_num):
pair_density[i, i] = sp1 * (sp1 - 1) / self.box_volume
if self.num_species > 1:
for j, sp2 in enumerate(self.species_num[i + 1 :], i + 1):
pair_density[i, j] = sp1 * sp2 / self.box_volume
# Calculate the volume of each bin
# The formula for the N-dimensional sphere is
# pi^{N/2}/( factorial( N/2) )
# from https://en.wikipedia.org/wiki/N-sphere#:~:text=In%20general%2C%20the-,volume,-%2C%20in%20n%2Ddimensional
sphere_shell_const = (pi ** (self.dimensions / 2.0)) / factorial(self.dimensions / 2.0)
bin_vol[0] = sphere_shell_const * self.dr_rdf**self.dimensions
for ir in range(1, self.no_bins):
r1 = ir * self.dr_rdf
r2 = (ir + 1) * self.dr_rdf
bin_vol[ir] = sphere_shell_const * (r2**self.dimensions - r1**self.dimensions)
r_values[ir] = (ir + 0.5) * self.dr_rdf
# Save the ra values for simplicity
self.ra_values = r_values / self.a_ws
self.dataframe["Interparticle_Distance"] = r_values
self.dataframe_slices["Interparticle_Distance"] = r_values
dump_init = 0
for isl in tqdm(range(self.no_slices), desc="Calculating RDF for slice", disable=not self.verbose):
# Grab the data from the dumps. The -1 is for '0'-indexing
dump_end = (isl + 1) * (self.slice_steps - 1) * self.dump_step
data_init = load_from_restart(self.dump_dir, int(dump_init))
data_end = load_from_restart(self.dump_dir, int(dump_end))
for i, sp1 in enumerate(self.species_names):
for j, sp2 in enumerate(self.species_names[i:], i):
denom_const = pair_density[i, j] * self.slice_steps * self.dump_step
# Each slice should be considered as an independent system.
# The RDF is calculated from the difference between the last dump of the slice and the initial dump
# of the slice
rdf_hist_init = data_init["rdf_hist"][:, i, j] + data_init["rdf_hist"][:, j, i]
rdf_hist_end = data_end["rdf_hist"][:, i, j] + data_end["rdf_hist"][:, j, i]
rdf_hist_slc = rdf_hist_end - rdf_hist_init
col_name = f"{sp1}-{sp2} RDF_slice {isl}"
col_data = rdf_hist_slc / denom_const / bin_vol
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
dump_init = dump_end
[docs] @avg_slices_doc
def average_slices_data(self):
for i, sp1 in enumerate(self.species_names):
for j, sp2 in enumerate(self.species_names[i:], i):
col_str = [f"{sp1}-{sp2} RDF_slice {isl}" for isl in range(self.no_slices)]
col_name = f"{sp1}-{sp2} RDF_Mean"
col_data = self.dataframe_slices[col_str].mean(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
col_name = f"{sp1}-{sp2} RDF_Std"
col_data = self.dataframe_slices[col_str].std(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
[docs] def compute_sum_rule_integrals(self, potential):
"""
Compute integrals of the RDF used in sum rules. \n
The species dependent integrals are
.. math::
I_{AB}^{\\rm (Hartree, k)} = 2^{D - 2} \\pi n_{A} n_{B} \\int_0^{\\infty} dr \\,
r^{D - 1 + k} \\frac{d^k}{dr^k} \\phi_{AB}(r),
.. math::
I_{AB}^{\\rm (Corr, k)} = 2^{D - 2} \\pi n_{A} n_{B} \\int_0^{\\infty} dr \\,
r^{D - 1 + k} h_{AB} (r) \\frac{d^k}{dr^k} \\phi_{AB}(r),
where :math:`D` is the number of dimensions, :math:`k = {0, 1, 2}`,
and :math:`\\phi_{AB}(r)` is the potential between species :math:`A` and :math:`B`. \n
Only Coulomb and Yukawa potentials are supported at the moment.
Parameters
----------
potential : :class:`sarkas.potentials.core.Potential`
Sarkas Potential object. Needed for all its attributes.
Returns
-------
hartrees : numpy.ndarray
Hartree integrals with :math:`k = {0, 1, 2}`. \n
Shape = ( :py:attr:`sarkas.tools.observables.Observable.no_obs`, 3).
corrs : numpy.ndarray
Correlational integrals with :math:`k = {0, 1, 2}`. \n
Shape = ( :py:attr:`sarkas.tools.observables.Observable.no_obs`, 3).
"""
r = self.dataframe[self.dataframe.columns[0]].to_numpy().copy()
dims = self.dimensions
dim_const = 2.0 ** (dims - 2) * pi
if r[0] == 0.0:
r[0] = 1e-40
corrs = zeros((self.no_obs, 3))
hartrees = zeros((self.no_obs, 3))
obs_indx = 0
# TODO:Make this calculation for each slice and/or run
for sp1, sp1_name in enumerate(self.species_names):
for sp2, sp2_name in enumerate(self.species_names[sp1:], sp1):
h_r = self.dataframe[(f"{sp1_name}-{sp2_name} RDF", "Mean")].to_numpy() - 1.0
# Calculate the derivatives of the potential
u_r, dv_dr, d2v_dr2 = potential.potential_derivatives(r, potential.matrix[sp1, sp2])
densities = self.species_num_dens[sp1] * self.species_num_dens[sp2]
hartrees[obs_indx, 0] = dim_const * densities * trapz(u_r * r ** (dims - 1), x=r)
corrs[obs_indx, 0] = dim_const * densities * trapz(u_r * h_r * r ** (dims - 1), x=r)
hartrees[obs_indx, 1] = dim_const * densities * trapz(dv_dr * r**dims, x=r)
corrs[obs_indx, 1] = dim_const * densities * trapz(dv_dr * h_r * r**dims, x=r)
hartrees[obs_indx, 2] = dim_const * densities * trapz(d2v_dr2 * r ** (dims + 1), x=r)
corrs[obs_indx, 2] = dim_const * densities * trapz(d2v_dr2 * h_r * r ** (dims + 1), x=r)
obs_indx += 1
return hartrees, corrs
[docs]class StaticStructureFactor(Observable):
"""
Static Structure Factors.
The species dependent SSF :math:`S_{AB}(\\mathbf k)` is calculated from
.. math::
S_{AB}(\\mathbf k) = \\int_0^\\infty dt \\,
\\left \\langle | n_{A}( \\mathbf k, t)n_{B}( -\\mathbf k, t) \\right \\rangle,
where the microscopic density of species :math:`A` with number of particles :math:`N_{A}` is given by
.. math::
n_{A}(\\mathbf k,t) = \\sum^{N_{A}}_{j} e^{-i \\mathbf k \\cdot \\mathbf r_j(t)} .
Attributes
----------
k_list : list
List of all possible :math:`k` vectors with their corresponding magnitudes and indexes.
k_counts : numpy.ndarray
Number of occurrences of each :math:`k` magnitude.
ka_values : numpy.ndarray
Magnitude of each allowed :math:`ka` vector.
no_ka_values: int
Length of ``ka_values`` array.
"""
[docs] def __init__(self):
super().__init__()
self.__name__ = "ssf"
self.__long_name__ = "Static Structure Function"
self.k_observable = True
self.kw_observable = False
[docs] @setup_doc
def setup(self, params, phase: str = None, no_slices: int = None, **kwargs):
super().setup_init(params, phase=phase, no_slices=no_slices)
self.update_args(**kwargs)
[docs] @arg_update_doc
def update_args(self, **kwargs):
# Update the attribute with the passed arguments
self.__dict__.update(kwargs.copy())
self.update_finish()
[docs] @compute_doc
def compute(self):
tinit = self.timer.current()
self.calc_slices_data()
self.average_slices_data()
self.save_hdf()
tend = self.timer.current()
time_stamp(
self.log_file, self.__long_name__ + " Calculation", self.timer.time_division(tend - tinit), self.verbose
)
[docs] @calc_slices_doc
def calc_slices_data(self):
self.parse_kt_data(nkt_flag=True)
no_dumps_calculated = self.slice_steps * self.no_slices
Sk_avg = zeros((self.no_obs, len(self.k_counts), no_dumps_calculated))
k_column = "Wavenumber_k"
self.dataframe_slices[k_column] = self.k_values
self.dataframe[k_column] = self.k_values
nkt_df = read_hdf(self.nkt_hdf_file, mode="r", key="nkt")
for isl in tqdm(range(self.no_slices), desc="Calculating SSF for slice", disable=not self.verbose):
# Initialize container
nkt = zeros((self.num_species, self.slice_steps, len(self.k_list)), dtype=complex128)
for sp, sp_name in enumerate(self.species_names):
nkt[sp] = array(nkt_df[f"slice {isl + 1}"][sp_name])
init = isl * self.slice_steps
fin = (isl + 1) * self.slice_steps
Sk_avg[:, :, init:fin] = calc_Sk(nkt, self.k_list, self.k_counts, self.species_num, self.slice_steps)
sp_indx = 0
for i, sp1 in enumerate(self.species_names):
for j, sp2 in enumerate(self.species_names[i:]):
col_name = f"{sp1}-{sp2} SSF_slice {isl}"
col_data = Sk_avg[sp_indx, :, init:fin].mean(axis=-1)
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
sp_indx += 1
[docs] @avg_slices_doc
def average_slices_data(self):
for i, sp1 in enumerate(self.species_names):
for j, sp2 in enumerate(self.species_names[i:]):
column = [f"{sp1}-{sp2} SSF_slice {isl}" for isl in range(self.no_slices)]
col_name = f"{sp1}-{sp2} SSF_Mean"
col_data = self.dataframe_slices[column].mean(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
col_name = f"{sp1}-{sp2} SSF_Std"
col_data = self.dataframe_slices[column].std(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
[docs]class Thermodynamics(Observable):
"""
Thermodynamic functions.
"""
[docs] def __init__(self):
super().__init__()
self.__name__ = "therm"
self.__long_name__ = "Thermodynamics"
self.beta_slice = None
self.specific_heat_volume_slice = None
self.acf_observable = True
[docs] def setup(self, params, phase: str = None, no_slices: int = None, **kwargs):
"""
Assign attributes from simulation's parameters.
Parameters
----------
params : sarkas.core.Parameters
Simulation's parameters.
phase : str, optional
Phase to compute. Default = 'production'.
**kwargs :
These will overwrite any :class:`sarkas.core.Parameters`
or default :class:`sarkas.tools.observables.Observable`
attributes and/or add new ones.
"""
super().setup_init(params, phase, no_slices)
if params.load_method[:-7] == "restart":
self.restart_sim = True
else:
self.restart_sim = False
self.update_args(**kwargs)
self.grab_sim_data(phase)
[docs] @arg_update_doc
def update_args(self, **kwargs):
# Update the attribute with the passed arguments
self.__dict__.update(kwargs.copy())
self.update_finish()
[docs] def calculate_beta_slices(self, ensemble: str = "NVE"):
"""Calculate the inverse temperature by taking the mean of the temperature time series."""
if ensemble == "NVE":
self.beta_slice = zeros(self.no_slices)
for isl in range(self.no_slices):
self.beta_slice[isl] = 1.0 / (self.kB * self.dataframe_slices[("Temperature", f"slice {isl}")].mean())
else:
self.beta_slice = ones(self.no_slices) * 1.0 / (self.kB * self.T_desired)
[docs] def calculate_heat_capacity_slices(self, ensemble: str = "NVE"):
"""Calculate the specific heat capacity from the fluctuations of the energy."""
self.calculate_beta_slices(ensemble=ensemble)
if ensemble == "NVE":
self.specific_heat_volume_slice = zeros(self.no_slices)
for isl in range(self.no_slices):
kin_2 = (self.dataframe_slices[("Total Kinetic Energy", f"slice {isl}")].std()) ** 2
denom = 1 - 2.0 * self.beta_slice[isl] ** 2 * kin_2 / (self.dimensions * self.total_num_ptcls)
self.specific_heat_volume_slice[isl] = 0.5 * self.dimensions * self.kB * self.total_num_ptcls / denom
else:
self.specific_heat_volume_slice = zeros(self.no_slices)
for isl in range(self.no_slices):
deltaE_2 = (self.dataframe_slices[("Total Energy", f"slice {isl}")].std()) ** 2
self.specific_heat_volume_slice[isl] = deltaE_2 * self.beta_slice[isl] ** 2 * self.kB
[docs] def calculate_beta_simulation(self, ensemble: str = "NVE"):
"""Calculate the inverse temperature by taking the mean of the temperature time series."""
if ensemble == "NVE":
self.beta = 1.0 / (self.kB * self.simulation_dataframe["Temperature"].mean())
else:
self.beta = 1.0 / (self.kB * self.T_desired)
[docs] def calculate_heat_capacity_simulation(self, ensemble: str = "NVE"):
"""Calculate the specific heat capacity from the fluctuations of the energy."""
self.calculate_beta_simulation(ensemble=ensemble)
if ensemble == "NVE":
kin_2 = (self.simulation_dataframe["Total Kinetic Energy"].std()) ** 2
denom = 1 - 2.0 * self.beta**2 * kin_2 / (self.dimensions * self.total_num_ptcls)
self.specific_heat_volume = 0.5 * self.dimensions * self.kB * self.total_num_ptcls / denom
else:
deltaE_2 = (self.simulation_dataframe["Total Energy"].std()) ** 2
self.specific_heat_volume = deltaE_2 * self.beta**2 * self.kB
[docs] @calc_slices_doc
def calc_slices_data(self):
# data_size
data_size = int(self.slice_steps)
### Slices loop
for col_name, col_data in self.simulation_dataframe.iloc[:, 1:].items():
data_start = 0
data_end = data_size
for isl in range(self.no_slices):
if isl == 0:
time = self.simulation_dataframe["Time"][: self.slice_steps]
time_str = f"Quantity_Time"
self.dataframe[time_str] = time.copy()
self.dataframe_slices[time_str] = time.copy()
df_col_name = f"{col_name}_slice {isl}"
self.dataframe_slices = add_col_to_df(
self.dataframe_slices, col_data[data_start:data_end].values, df_col_name
)
data_start += self.slice_steps
data_end += self.slice_steps
# end of slice loop
[docs] @calc_acf_slices_doc
def calc_acf_slice_data(self, equal_number_time_samples: bool = False):
# In order to have the same number of timesteps products for each time lag of the ACF do the following.
# Temporarily store two consecutive slice data
data_col = zeros(2 * self.acf_slice_steps)
# data_size
data_size = int(2 * self.acf_slice_steps)
for col_name, col_data in self.simulation_dataframe.iloc[:, 1:].items():
data_start = 0
data_end = data_size
data_acf = zeros(self.acf_slice_steps)
# Slices loop
for isl in tqdm(
range(self.no_slices),
desc=f"\nCalculating {col_name} ACF for slice ",
disable=not self.verbose,
position=0,
):
if isl == 0:
time = self.simulation_dataframe["Time"][: self.acf_slice_steps]
time_str = f"Quantity_Time"
self.dataframe_acf[time_str] = time[: self.acf_slice_steps].copy()
self.dataframe_acf_slices[time_str] = time[: self.acf_slice_steps].copy()
data_col = col_data[data_start:data_end].values
if equal_number_time_samples:
# Auto-correlation function with the same number of time steps for each lag.
for it in tqdm(
range(self.acf_slice_steps),
desc=f"{col_name} ACF time lag",
disable=not self.verbose,
position=0,
leave=False,
):
delta_data = data_col[:it + self.acf_slice_steps] - data_col[:it + self.acf_slice_steps].mean()
data_acf[it] = correlationfunction(delta_data, delta_data)[it]
else:
delta_data = data_col - data_col.mean()
data_acf = correlationfunction(delta_data, delta_data)
# Store in the dataframe
col_name = f"{col_name} ACF_slice {isl}"
self.dataframe_acf_slices = add_col_to_df(self.dataframe_acf_slices, data_acf, col_name)
# Advance by only one slice at a time.
data_start += self.acf_slice_steps
data_end += self.acf_slice_steps
# end of slice loop
[docs] @avg_slices_doc
def average_slices_data(self):
### Slices loop
for _, df_col_name in enumerate(self.simulation_dataframe.columns[1:]):
columns = [f"{df_col_name}_slice {isl}" for isl in range(self.no_slices)]
col_data = self.dataframe_slices[columns].mean(axis=1)
col_name = f"{df_col_name}_Mean"
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
col_data = self.dataframe_slices[columns].std(axis=1)
col_name = f"{df_col_name}_Std"
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
[docs] @avg_acf_slices_doc
def average_acf_slices_data(self):
### Slices loop
for _, df_col_name in enumerate(self.simulation_dataframe.columns[1:]):
columns = [f"{df_col_name} ACF_slice {isl}" for isl in range(self.no_slices)]
col_data = self.dataframe_acf_slices[columns].mean(axis=1)
col_name = f"{df_col_name} ACF_Mean"
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
col_data = self.dataframe_acf_slices[columns].std(axis=1)
col_name = f"{df_col_name} ACF_Std"
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
[docs] @compute_doc
def compute(self, calculate_acf: bool = False):
t0 = self.timer.current()
self.calc_slices_data()
self.average_slices_data()
self.save_hdf()
self.calculate_beta_slices()
self.calculate_heat_capacity_slices()
msg = self.__long_name__
if calculate_acf:
msg = (self.__long_name__ + " and its ACF Calculation",)
self.calc_acf_slice_data()
self.average_acf_slices_data()
self.save_acf_hdf()
tend = self.timer.current()
time_stamp(
self.log_file,
msg,
self.timer.time_division(tend - t0),
self.verbose,
)
[docs] def compute_from_rdf(self, rdf, potential):
"""
Calculate the correlational energy and correlation pressure using
:meth:`sarkas.tools.observables.RadialDistributionFunction.compute_sum_rule_integrals` method.
The Hartree and correlational terms between species :math:`A` and :math:`B` are
.. math::
U_{AB}^{\\rm hartree} = 2 \\pi \\frac{N_AN_B}{V} \\int_0^\\infty dr \\, \\phi_{AB}(r) r^2 dr,
.. math::
U_{AB}^{\\rm corr} = 2 \\pi \\frac{N_AN_B}{V} \\int_0^\\infty dr \\, \\phi_{AB}(r) h(r) r^2 dr,
.. math::
P_{AB}^{\\rm hartree} = - \\frac{2 \\pi}{3} \\frac{N_AN_B}{V^2} \\int_0^\\infty dr \\, \\frac{d\\phi_{AB}(r)}{dr} r^3 dr,
.. math::
P_{AB}^{\\rm corr} = - \\frac{2 \\pi}{3} \\frac{N_AN_B}{V^2} \\int_0^\\infty dr \\, \\frac{d\\phi_{AB}(r)}{dr} h(r) r^3 dr,
Parameters
----------
rdf: :class:`sarkas.tools.observables.RadialDistributionFunction`
Radial Distribution Function object.
potential : :class:`sarkas.potentials.core.Potential`
Potential object.
Returns
-------
nkT : float
Ideal term of the pressure :math:`nk_BT`. Where :math:`n` is the total density.
u_hartree : numpy.ndarray
Hartree energy calculated from the above formula for each :math:`g_{ab}(r)`.
u_corr : numpy.ndarray
Correlational energy calculated from the above formula for each :math:`g_{ab}(r)`.
p_hartree : numpy.ndarray
Hartree pressure calculated from the above formula for each :math:`g_{ab}(r)`.
p_corr : numpy.ndarray
Correlational pressure calculated from the above formula for each :math:`g_{ab}(r)`.
"""
hartrees, corrs = rdf.compute_sum_rule_integrals(potential)
u_hartree = self.box_volume * hartrees[:, 0]
u_corr = self.box_volume * corrs[:, 0]
p_hartree = -hartrees[:, 1] / 3.0
p_corr = -corrs[:, 1] / 3.0
nkT = self.total_num_density / self.beta_slice.mean()
return nkT, u_hartree, u_corr, p_hartree, p_corr
[docs] def grab_sim_data(self, phase=None):
"""
Grab the pandas dataframe from the saved csv file.
"""
if phase:
self.phase = phase.lower()
self.simulation_dataframe = read_csv(self.filenames_tree["thermodynamics"][self.phase]["path"], index_col=False)
self.fldr = self.directory_tree["simulation"][self.phase]["path"]
[docs] def temp_energy_plot(
self,
process,
info_list: list = None,
phase: str = None,
show: bool = False,
publication: bool = False,
figname: str = None,
):
"""
Plot Temperature and Energy as a function of time with their cumulative sum and average.
Parameters
----------
process : sarkas.processes.Process
Sarkas Process.
info_list: list, optional
List of strings to print next to the plots.
phase: str, optional
Phase to plot. "equilibration" or "production".
show: bool, optional
Flag for displaying the figure.
publication: bool, optional
Flag for publication style plotting.
figname: str, optional
Name with which to save the plot.
"""
if phase:
phase = phase.lower()
self.phase = phase
if self.phase == "equilibration":
self.no_dumps = self.eq_no_dumps
self.dump_dir = self.eq_dump_dir
self.dump_step = self.eq_dump_step
# self.saving_dir = self.equilibration_dir
self.no_steps = self.equilibration_steps
self.grab_sim_data(self.phase)
# self.simulation_dataframe = self.simulation_dataframe.iloc[1:, :]
elif self.phase == "production":
self.no_dumps = self.prod_no_dumps
self.dump_dir = self.prod_dump_dir
self.dump_step = self.prod_dump_step
# self.saving_dir = self.production_dir
self.no_steps = self.production_steps
self.grab_sim_data(self.phase)
elif self.phase == "magnetization":
self.no_dumps = self.mag_no_dumps
self.dump_dir = self.mag_dump_dir
self.dump_step = self.mag_dump_step
# self.saving_dir = self.magnetization_dir
self.no_steps = self.magnetization_steps
self.grab_sim_data(self.phase)
else:
self.grab_sim_data()
if self.phase == "production":
self.calculate_beta_simulation(ensemble="NVE")
else:
self.calculate_beta_simulation(ensemble="NVT")
completed_steps = self.dump_step * (self.no_dumps - 1)
fig = plt.figure(figsize=(20, 8))
fsz = 16
if publication:
plt.style.use("PUBstyle")
gs = GridSpec(3, 7)
# Temperature plots
T_delta_plot = fig.add_subplot(gs[0, 0:2])
T_main_plot = fig.add_subplot(gs[1:3, 0:2])
T_hist_plot = fig.add_subplot(gs[1:3, 2])
# Energy plots
E_delta_plot = fig.add_subplot(gs[0, 4:6])
E_main_plot = fig.add_subplot(gs[1:3, 4:6])
E_hist_plot = fig.add_subplot(gs[1:3, 6])
else:
gs = GridSpec(3, 8)
Info_plot = fig.add_subplot(gs[0:4, 0:2])
# Temperature plots
T_delta_plot = fig.add_subplot(gs[0, 2:4])
T_main_plot = fig.add_subplot(gs[1:4, 2:4])
T_hist_plot = fig.add_subplot(gs[1:4, 4])
# Energy plots
E_delta_plot = fig.add_subplot(gs[0, 5:7])
E_main_plot = fig.add_subplot(gs[1:4, 5:7])
E_hist_plot = fig.add_subplot(gs[1:4, 7])
# Grab the current rcParams so that I can restore it later
current_rcParams = plt.rcParams.copy()
# Update rcParams with the necessary values
plt.rc("font", size=fsz) # controls default text sizes
plt.rc("axes", titlesize=fsz) # fontsize of the axes title
plt.rc("axes", labelsize=fsz) # fontsize of the x and y labels
plt.rc("xtick", labelsize=fsz - 2) # fontsize of the tick labels
plt.rc("ytick", labelsize=fsz - 2) # fontsize of the tick labels
# Grab the color line list from the plt cycler. I will use this in the hist plots
color_from_cycler = plt.rcParams["axes.prop_cycle"].by_key()["color"]
# ------------------------------------------- Temperature -------------------------------------------#
# Calculate Temperature plot's labels and multipliers
time_mul, temp_mul, time_prefix, temp_prefix, time_lbl, temp_lbl = plot_labels(
self.simulation_dataframe["Time"], self.simulation_dataframe["Temperature"], "Time", "Temperature", self.units
)
# Rescale quantities
time = time_mul * self.simulation_dataframe["Time"]
Temperature = temp_mul * self.simulation_dataframe["Temperature"]
T_desired = temp_mul * self.T_desired
# Temperature moving average
T_cumavg = Temperature.expanding().mean()
# Temperature deviation and its moving average
Delta_T = (Temperature - T_desired) * 100 / T_desired
Delta_T_cum_avg = Delta_T.expanding().mean()
# Temperature Main plot
T_main_plot.plot(time, Temperature, alpha=0.7)
T_main_plot.plot(time, T_cumavg, label="Moving Average")
T_main_plot.axhline(T_desired, ls="--", c="r", alpha=0.7, label="Desired T")
T_main_plot.legend(loc="best")
T_main_plot.set(ylabel="Temperature" + temp_lbl, xlabel="Time" + time_lbl)
# Temperature Deviation plot
T_delta_plot.plot(time, Delta_T, alpha=0.5)
T_delta_plot.plot(time, Delta_T_cum_avg, alpha=0.8)
T_delta_plot.set(xticks=[], ylabel=r"Deviation [%]")
if phase == "production":
# The Temperature fluctuations in an NVE ensemble are
# < delta T^2> = T_desired^2 * ( 2 /(Np * Dims) ) *( 1 - Np * Dims/2 * k_B/Cv)
# where Cv is the heat capacity at constant volume.
self.calculate_heat_capacity_simulation(ensemble="NVE")
dN = self.total_num_ptcls * self.dimensions
factor = 1 - 0.5 * dN * self.kB / self.specific_heat_volume
T_std = T_desired * sqrt(2.0 / dN * factor)
# TODO: Review this calculation.
# T_std *= sqrt(1 - 0.5 *process.parameters.dimensions*process.parameters.total_num_ptcls/heat_capacity_v)
else:
# The Temperature fluctuations in an NVT ensemble are
# < delta T^2> = T_desired^2 *( 2 /(Np * Dims))
dN = self.total_num_ptcls * self.dimensions
T_std = T_desired * sqrt(2.0 / dN)
# Calculate the theoretical distribution of the temperature.
# T_dist_desired is a Gaussian centered at T_desired with theoretical T_std
T_dist_desired = scp_stats.norm(loc=T_desired, scale=T_std)
# T_dist_actual is a Gaussian centered at the actual mean with theoretical T_std
T_dist_actual = scp_stats.norm(loc=Temperature.mean(), scale=T_std)
# Histogram plot
sns_histplot(y=Temperature, bins="fd", stat="density", alpha=0.75, legend="False", ax=T_hist_plot)
T_hist_plot.set(ylabel=None, xlabel=None, xticks=[], yticks=[], ylim=T_main_plot.get_ylim())
T_hist_plot.plot(
T_dist_desired.pdf(Temperature.sort_values()), Temperature.sort_values(), ls="--", color="r", alpha=0.7
)
T_hist_plot.plot(
T_dist_actual.pdf(Temperature.sort_values()), Temperature.sort_values(), color=color_from_cycler[1]
)
if self.phase == "equilibration":
T_main_plot.set(ylim=(T_desired * 0.85, T_desired * 1.15))
T_hist_plot.set(ylim=(T_desired * 0.85, T_desired * 1.15))
# ------------------------------------------- Total Energy -------------------------------------------#
# Calculate Energy plot's labels and multipliers
factor = 1.0 / process.parameters.J2erg if process.parameters.units == "cgs" else 1.0
Energy = self.simulation_dataframe["Total Energy"].copy() # * factor / process.parameters.eV2J # Energy in [eV]
time_mul, energy_mul, _, _, time_lbl, energy_lbl = plot_labels(
self.simulation_dataframe["Time"], Energy, "Time", "Energy", self.units
)
Energy *= energy_mul
# Total Energy moving average
E_cumavg = Energy.expanding().mean()
# Total Energy Deviation and its moving average
Delta_E = (Energy - Energy.iloc[0]) * 100 / Energy.iloc[0]
Delta_E_cum_avg = Delta_E.expanding().mean()
# Deviation Plot
E_delta_plot.plot(time, Delta_E, alpha=0.5)
E_delta_plot.plot(time, Delta_E_cum_avg, alpha=0.8)
E_delta_plot.set(xticks=[], ylabel=r"Deviation [%]")
# Energy main plot
E_main_plot.plot(time, Energy, alpha=0.7)
E_main_plot.plot(time, E_cumavg, label="Moving Average")
E_main_plot.axhline(Energy.mean(), ls="--", c="r", alpha=0.7, label="Avg")
E_main_plot.legend(loc="best")
E_main_plot.set(ylabel="Total Energy" + energy_lbl, xlabel="Time" + time_lbl)
# Histogram plot
# (Failed) Attempt to calculate the theoretical Energy distribution
# In an NVT ensemble Energy fluctuation are given by sigma(E) = sqrt( k_B T^2 C_v)
# where C_v is the isothermal heat capacity
# Since this requires a lot of prior calculation I skip it and just make a Gaussian
if phase == "production":
self.calculate_heat_capacity_simulation(ensemble="NVE")
NkB = self.total_num_ptcls * self.kB
beta_desired = 1.0 / (self.kB * self.T_desired)
delta_E2 = (
self.dimensions
* self.total_num_ptcls
/ beta_desired**2
* (1 - 0.5 * self.dimensions * NkB / self.specific_heat_volume)
)
else:
self.calculate_heat_capacity_simulation(ensemble="NVT")
delta_E2 = self.specific_heat_volume / self.beta**2 / self.kB
delta_E = sqrt(delta_E2)
# Calculate the theoretical distribution of the energy.
# E_dist_desired is a Gaussian centered at the actual mean with actaul E_std. This is to confirm that we have a Gaussian process.
E_dist_desired = scp_stats.norm(loc=Energy.mean(), scale=delta_E * energy_mul)
# E_dist_actual is a Gaussian centered at the actual mean with actaul E_std. This is to confirm that we have a Gaussian process.
E_dist_actual = scp_stats.norm(loc=Energy.mean(), scale=Energy.std())
sns_histplot(y=Energy, bins="fd", stat="density", alpha=0.75, legend="False", ax=E_hist_plot)
# Grab the second color since the first is used for histplot
E_hist_plot.plot(E_dist_desired.pdf(Energy.sort_values()), Energy.sort_values(), alpha=0.7, ls="--", color="r")
E_hist_plot.plot(E_dist_actual.pdf(Energy.sort_values()), Energy.sort_values(), color=color_from_cycler[1])
E_hist_plot.set(ylabel=None, xlabel=None, ylim=E_main_plot.get_ylim(), xticks=[], yticks=[])
if not publication:
dt_mul, _, _, _, dt_lbl, _ = plot_labels(
process.integrator.dt, self.simulation_dataframe["Total Energy"], "Time", "Energy", self.units
)
# Information section
Info_plot.axis([0, 10, 0, 10])
Info_plot.grid(False)
Info_plot.text(0.0, 10, "Job ID: {}".format(self.job_id))
Info_plot.text(0.0, 9.5, "Phase: {}".format(self.phase.capitalize()))
Info_plot.text(0.0, 9.0, "No. of species = {}".format(len(self.species_num)))
y_coord = 8.5
for isp, sp in enumerate(process.species):
if sp.name != "electron_background":
Info_plot.text(0.0, y_coord, "Species {} : {}".format(isp + 1, sp.name))
Info_plot.text(0.0, y_coord - 0.5, " No. of particles = {} ".format(sp.num))
Info_plot.text(
0.0, y_coord - 1.0, " Temperature = {:.2f} {}".format(temp_mul * sp.temperature, temp_lbl)
)
y_coord -= 1.5
y_coord -= 0.25
delta_t = dt_mul * process.integrator.dt
# Plasma Period
t_wp = 2.0 * pi / process.parameters.total_plasma_frequency
# Print some info to the left
if info_list is None:
integrator_type = {
"equilibration": process.integrator.equilibration_type,
"magnetization": process.integrator.magnetization_type,
"production": process.integrator.production_type,
}
info_list = [f"Total $N$ = {process.parameters.total_num_ptcls}"]
if process.integrator.thermalization:
info_list.append(f"Thermostat: {process.integrator.thermostat_type}")
info_list.append(f" Berendsen rate = {process.integrator.thermalization_rate:.2f}")
eq_cycles = int(process.parameters.equilibration_steps * process.integrator.dt / t_wp)
# calculate the actual coupling constant
t_ratio = self.T_desired / self.simulation_dataframe["Temperature"].mean()
coupling_constant = (
self.simulation_dataframe["Total Potential Energy"].mean()
/ self.simulation_dataframe["Total Kinetic Energy"].mean()
)
to_append = [
f"Equilibration cycles = {eq_cycles}",
f"Potential: {process.potential.type}",
f" Eff Coupl Const = {coupling_constant:.2e}",
f" Tot Force Error = {process.potential.force_error:.2e}",
f"Integrator: {integrator_type[self.phase]}",
]
[info_list.append(info) for info in to_append]
if integrator_type[self.phase] == "langevin":
info_list.append(f"langevin gamma = {process.integrator.langevin_gamma:.4e}")
prod_cycles = int(process.parameters.production_steps * process.integrator.dt / t_wp)
to_append = [
r" Plasma period $\tau_{\omega_p}$ = " + f"{t_wp*dt_mul:.2f} {dt_lbl}",
f" $\Delta t$ = {delta_t:.2f} {dt_lbl} = {delta_t/t_wp/dt_mul:.2e}" + r"$\tau_{\omega_p}$",
# "Step interval = {}".format(self.dump_step),
# "Step interval time = {:.2f} {}".format(self.dump_step * delta_t, dt_lbl),
f"Completed steps = {completed_steps}",
f"Total steps = {self.no_steps}",
f"{100 * completed_steps / self.no_steps:.2f} % Completed",
# "Completed time = {:.2f} {}".format(completed_steps * delta_t / dt_mul * time_mul, time_lbl),
f"Production time = {self.no_steps * delta_t / dt_mul * time_mul:.2f} {time_lbl}",
f"Production cycles = {prod_cycles}",
]
[info_list.append(info) for info in to_append]
for itext, text_str in enumerate(info_list):
Info_plot.text(0.0, y_coord, text_str)
y_coord -= 0.5
Info_plot.axis("off")
fig.tight_layout()
# Saving
if figname:
fig.savefig(os_path_join(self.saving_dir, figname + "_" + self.job_id + ".png"))
else:
fig.savefig(os_path_join(self.saving_dir, "Plot_EnsembleCheck_" + self.job_id + ".png"))
if show:
fig.show()
# Restore the previous rcParams
plt.rcParams = current_rcParams
# def gamma_plot(self, phase: str = None, figname: str = None, show: bool = False):
# if phase:
# phase = phase.lower()
# self.parse(phase)
# else:
# self.parse()
# Gamma = self.simulation_dataframe["Total Potential Energy"] / self.simulation_dataframe["Total Kinetic Energy"]
# Gamma_T = self.simulation_dataframe["Total Potential Energy"] * self.beta / self.total_num_ptcls
# Gamma_a = self.coupling_constant * self.T_desired / (self.simulation_dataframe["Temperature"])
# time_mul, energy_mul, _, _, time_lbl, energy_lbl = plot_labels(
# self.simulation_dataframe["Time"], self.simulation_dataframe["Total Potential Energy"], "Time", "ElectronVolt", self.units
# )
# time = self.simulation_dataframe["Time"] * time_mul
# fig, ax = plt.subplots(1, 3, figsize=(24, 7))
# ax[0].plot(time, Gamma)
# ax[0].plot(time, Gamma.expanding().mean(), alpha=0.8, label="Moving Average")
# # ax.axhline(self.coupling_constant, c = 'r', ls = '--', alpha = 0.7, label = r"Original $\Gamma$")
# ax[0].legend()
# ax[0].set(ylabel=r"$\Gamma = \frac{\langle U \rangle}{\langle K \rangle}$", xlabel=f"Time {time_lbl}")
# ax[1].plot(time, Gamma_T)
# ax[1].plot(time, Gamma_T.expanding().mean(), alpha=0.8, label="Moving Average")
# # ax.axhline(self.coupling_constant, c = 'r', ls = '--', alpha = 0.7, label = r"Original $\Gamma$")
# ax[1].legend()
# ax[1].set(ylabel=r"$\Gamma_T = \frac{\langle U \rangle}{k_B T}$", xlabel=f"Time {time_lbl}")
# ax[2].plot(time, Gamma_a)
# ax[2].plot(time, Gamma_a.expanding().mean(), alpha=0.8, label="Moving Average")
# ax[2].axhline(self.coupling_constant, c="r", ls="--", alpha=0.7, label=r"Original $\Gamma$")
# ax[2].legend()
# ax[2].set(
# ylabel=r"$\Gamma_a = \frac{Q^2}{4\pi \epsilon_0 a_{ws} } \frac{1}{\langle k_B T \rangle }$",
# xlabel=f"Time {time_lbl}",
# )
# # Saving
# if figname:
# fig.savefig(os_path_join(self.saving_dir, figname + "_" + self.job_id + ".png"))
# else:
# fig.savefig(os_path_join(self.saving_dir, "Plot_Gamma_" + self.job_id + ".png"))
# if show:
# fig.show()
[docs]class VelocityAutoCorrelationFunction(Observable):
"""Velocity Auto-correlation function."""
[docs] def __init__(self):
super(VelocityAutoCorrelationFunction, self).__init__()
self.__name__ = "vacf"
self.__long_name__ = "Velocity AutoCorrelation Function"
self.no_ptcls_per_species = [10]
self.particles_id = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
self.acf_observable = True
[docs] @setup_doc
def setup(self, params, phase: str = None, no_slices: int = None, **kwargs):
super().setup_init(params, phase=phase, no_slices=no_slices)
self.update_args(**kwargs)
[docs] @arg_update_doc
def update_args(self, **kwargs):
# Update the attribute with the passed arguments
self.__dict__.update(kwargs.copy())
self.update_finish()
if "no_ptcls_per_species" in kwargs.keys():
self.select_random_indices(kwargs["no_ptcls_per_species"])
else:
self.select_random_indices()
[docs] @compute_doc
def compute(self, calculate_acf_data: bool = True, no_ptcls_per_species=None):
raise DeprecationWarning("VACF does not have a `compute` method anymore. Use `compute_acf()`.")
[docs] @compute_acf_doc
def compute_acf(self, no_ptcls_per_species=None):
if no_ptcls_per_species:
self.select_random_indices(no_ptcls_per_species)
t0 = self.timer.current()
self.calc_acf_slices_data()
self.average_acf_slices_data()
self.save_acf_hdf()
tend = self.timer.current()
time_stamp(self.log_file, f"{self.__long_name__} Calculation", self.timer.time_division(tend - t0), self.verbose)
[docs] @calc_acf_slices_doc
def calc_acf_slices_data(self):
start_slice = 0
end_slice = int(2 * self.acf_slice_steps * self.dump_step)
time = zeros(2 * self.slice_steps)
vel = zeros((self.dimensions, sum(self.no_ptcls_per_species), 2 * self.acf_slice_steps))
for isl in tqdm(
range(self.no_slices),
desc=f"\nCalculating {self.__name__.swapcase()} for slice ",
disable=not self.verbose,
position=0,
):
self.grab_sim_data(start_slice, end_slice, vel, time)
if isl == 0:
self.dataframe_acf["Time"] = time[: self.acf_slice_steps]
self.dataframe_acf_slices["Time"] = time[: self.acf_slice_steps]
# Return an array of shape = ( num_species, dim + 1, slice_steps)
vacf = self.calculate_vacf(vel)
self.save_acf_slices_data_to_hdf(vacf, isl)
start_slice += self.acf_slice_steps * self.dump_step
end_slice += self.acf_slice_steps * self.dump_step
[docs] def save_acf_slices_data_to_hdf(self, vacf, isl):
"""Store ACF data of slice `isl` into a hierarchical dataframe.
Parameters
----------
vacf: numpy.ndarray
Data to store,
isl: int
Number of the slice being evaluated.
"""
for i, sp1 in enumerate(self.species_names):
sp_vacf_str = f"{sp1} {self.__name__.swapcase()}"
for d in range(self.dimensions):
col_name = f"{sp_vacf_str}_{self.dim_labels[d]}_slice {isl}"
col_data = vacf[i, d, :]
self.dataframe_acf_slices = add_col_to_df(self.dataframe_acf_slices, col_data, col_name)
col_name = f"{sp_vacf_str}_Total_slice {isl}"
col_data = vacf[i, -1, :]
self.dataframe_acf_slices = add_col_to_df(self.dataframe_acf_slices, col_data, col_name)
[docs] def grab_sim_data(self, start_dump_no, end_dump_no, vel, time):
"""
Grab the data from simulation dumps.
Parameters
----------
start_dump_no: int
Initial number of the checkpoint dump.
end_dump_no: int
Final number of the checkpoint dump.
vel: numpy.ndarray
Array in which to store the collected velocities.
Shape = (:attr:`dimensions`, `len(`:attr:`no_ptcls_per_species``)`, `2`:attr:`acf_slice_steps`)
time: numpy.ndarray
Array in which to store the time collected from the dump.
Shape = (`2`:attr:`acf_slice_steps`)
"""
for it, dump in enumerate(
tqdm(
range(start_dump_no, end_dump_no, self.dump_step),
desc="Reading data",
disable=not self.verbose,
position=1,
leave=False,
)
):
datap = load_from_restart(self.dump_dir, dump)
time[it] = datap["time"]
for d in range(self.dimensions):
vel[d, :, it] = datap["vel"][self.particles_id, d]
[docs] def select_random_indices(self, no_ptcls_per_species=None):
"""Randomly select a given number of indices that indicate the particles to be used to average the VACF.
The random number of indices is stored in :attr:`particles_id`.
Parameters
----------
no_ptcls_per_species: list, int, optional
List containing the number of particles to randomly select for each species.
If `None` it uses :attr:`no_ptcls_per_species` attribute which is equal to `[10]` by default.
Raises
------
_: ValueError
If the chosen `no_ptcls_per_species` is less than the total number of particles in :attr:`species_num`.
Notes
-----
If the length of :attr:`no_ptcls_per_species` is less than the length of :attr:`species_num`, it selects the minimum of :attr:`no_ptcls_per_species` to be used for all species.
"""
# Check whether the user passed an integer
if no_ptcls_per_species:
if isinstance(no_ptcls_per_species, int):
self.no_ptcls_per_species = [no_ptcls_per_species]
# Total number of indices to select for each range
rng = default_rng()
# Check if the length of no_ptcls_per_species is equal to the length of species_num
if len(self.no_ptcls_per_species) != len(self.species_num):
# If not, adjust no_ptcls_per_species to have the same length and values
self.no_ptcls_per_species = min(self.no_ptcls_per_species) * ones_like(self.species_num)
# Initialize an empty list to store the combined random indices
combined_random_indices = []
species_start = 0
species_end = 0
for ip, num_ptcls in enumerate(self.no_ptcls_per_species):
species_end += self.species_num[ip]
# Check whether there are enough particles to choose from
if num_ptcls > self.species_num[ip]:
raise ValueError(
f"Species {self.species_names[ip]}: the chosen random number of particles, {num_ptcls}, is less than its total species number of particles, {self.species_num[ip]}"
)
# Generate unique random indices for the current range
random_indices = rng.choice(range(species_start, species_end), size=num_ptcls, replace=False)
# Append the current random indices to the combined list
combined_random_indices.extend(random_indices)
self.particles_id = combined_random_indices
[docs] @avg_acf_slices_doc
def average_acf_slices_data(self):
"""Average the data from all the slices and add it to the dataframe."""
# Average the stuff
for i, sp1 in enumerate(self.species_names):
sp_vacf_str = f"{sp1} {self.__name__.swapcase()}"
for d in range(self.dimensions):
dl = self.dim_labels[d]
dcol_str = [sp_vacf_str + f"_{dl}_slice {isl}" for isl in range(self.no_slices)]
self.dataframe_acf[sp_vacf_str + f"_{dl}_Mean"] = self.dataframe_acf_slices[dcol_str].mean(axis=1)
self.dataframe_acf[sp_vacf_str + f"_{dl}_Std"] = self.dataframe_acf_slices[dcol_str].std(axis=1)
tot_col_str = [sp_vacf_str + f"_Total_slice {isl}" for isl in range(self.no_slices)]
col_name = sp_vacf_str + "_Total_Mean"
col_data = self.dataframe_acf_slices[tot_col_str].mean(axis=1).values
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
col_name = sp_vacf_str + "_Total_Std"
col_data = self.dataframe_acf_slices[tot_col_str].std(axis=1).values
self.dataframe_acf = add_col_to_df(self.dataframe_acf, col_data, col_name)
# @jit Numba doesn't like Scipy
[docs] def calculate_vacf(self, vel):
"""
Calculate the velocity autocorrelation function of each species and in each direction.
Parameters
----------
vel : numpy.ndarray
Particles' velocities stored in a 3D array with shape = (D x Np x Nt).
D = cartesian dimensions, Np = Number of particles, Nt = number of dumps.
Returns
-------
vacf: numpy.ndarray
Velocity autocorrelation functions. Shape= (No_species, D + 1, Nt)
"""
no_dim = vel.shape[0]
vacf = zeros((self.num_species, no_dim + 1, self.acf_slice_steps))
species_vacf = zeros(self.acf_slice_steps)
ptcl_vacf = zeros(self.acf_slice_steps)
# Calculate the vacf of each species in each dimension
for d in tqdm(range(no_dim), desc=f"Dimension", position=1, disable=not self.verbose, leave=False):
species_start = 0
species_end = 0
for sp, np in enumerate(
tqdm(self.no_ptcls_per_species, desc=f"Species", position=2, disable=not self.verbose, leave=False)
):
species_end += np
# Temporary species vacf
# Calculate the vacf for each particle of species sp
for ptcl in range(species_start, species_end):
for it in range(self.acf_slice_steps):
v = vel[d, ptcl, : self.acf_slice_steps + it]
# Calculate the correlation function and add it to the array
ptcl_vacf[it] = correlationfunction(v, v)[it]
# Add this particle vacf to the species vacf and normalize by the time origins
species_vacf += ptcl_vacf
# Save the species vacf for dimension i
vacf[sp, d, :] = species_vacf / np
# Save the total vacf
vacf[sp, -1, :] += species_vacf / np
# Move to the next species first particle position
species_start += np
return vacf
# TODO: Review and fix this class
[docs]class VelocityDistribution(Observable):
"""
Moments of the velocity distributions defined as
.. math::
\\langle v^{\\alpha} \\rangle = \\int_{-\\infty}^{\\infty} d v \\, f(v) v^{2 \\alpha}.
Attributes
----------
no_bins: int
Number of bins used to calculate the velocity distribution.
plots_dir: str
Directory in which to store Hermite coefficients plots.
species_plots_dirs : list, str
Directory for each species where to save Hermite coefficients plots.
max_no_moment: int
Maximum number of moments = :math:`\\alpha`. Default = 6.
"""
[docs] def __init__(self):
super(VelocityDistribution, self).__init__()
self.max_no_moment = None
self.__name__ = "vd"
self.__long_name__ = "Velocity Distribution"
[docs] def setup(
self,
params,
phase: str = None,
no_slices: int = None,
hist_kwargs: dict = None,
max_no_moment: int = None,
multi_run_average: bool = None,
dimensional_average: bool = None,
runs: int = 1,
curve_fit_kwargs: dict = None,
**kwargs,
):
"""
Assign attributes from simulation's parameters.
Parameters
----------
runs
dimensional_average
multi_run_average
params : :class:`sarkas.core.Parameters`
Simulation's parameters.
phase : str, optional
Phase to compute. Default = 'production'.
no_slices : int, optional
max_no_moment : int, optional
Maximum number of moments to calculate. Default = 6.
hist_kwargs : dict, optional
Dictionary of keyword arguments to pass to ``histogram`` for the calculation of the distributions.
curve_fit_kwargs: dict, optional
Dictionary of keyword arguments to pass to ``scipy.curve_fit`` for fitting of Hermite coefficients.
**kwargs :
These will overwrite any :class:`sarkas.core.Parameters`
or default :class:`sarkas.tools.observables.Observable`
attributes and/or add new ones.
"""
super().setup_init(
params,
phase=phase,
dimensional_average=dimensional_average,
multi_run_average=multi_run_average,
runs=runs,
no_slices=no_slices,
)
self.update_args(hist_kwargs, max_no_moment, curve_fit_kwargs, **kwargs)
[docs] @arg_update_doc
def update_args(self, hist_kwargs: dict = None, max_no_moment: int = None, curve_fit_kwargs: dict = None, **kwargs):
if curve_fit_kwargs:
self.curve_fit_kwargs = curve_fit_kwargs
# Check on hist_kwargs
if hist_kwargs:
# Is it a dictionary ?
if not isinstance(hist_kwargs, dict):
raise TypeError("hist_kwargs not a dictionary. Please pass a dictionary.")
# Did you pass a single dictionary for multispecies?
for key, value in hist_kwargs.items():
# The elements of hist_kwargs should be lists
if not isinstance(hist_kwargs[key], list):
hist_kwargs[key] = [value for i in range(self.num_species)]
self.hist_kwargs = hist_kwargs
# Default number of moments to calculate
if max_no_moment:
self.max_no_moment = max_no_moment
else:
self.max_no_moment = 6
# Update the attribute with the passed arguments
self.__dict__.update(kwargs.copy())
self.update_finish()
# # Directories in which to store plots
# self.plots_dir = os_path_join(self.saving_dir, "Plots")
# if not os_path_exists(self.plots_dir):
# mkdir(self.plots_dir)
# Paths where to store the dataframes
if self.multi_run_average:
self.filename_csv = os_path_join(self.saving_dir, "VelocityDistribution.csv")
self.filename_hdf = os_path_join(self.saving_dir, "VelocityDistribution.h5")
else:
self.filename_csv = os_path_join(self.saving_dir, "VelocityDistribution_" + self.job_id + ".csv")
self.filename_hdf = os_path_join(self.saving_dir, "VelocityDistribution_" + self.job_id + ".h5")
if hasattr(self, "max_no_moment"):
self.moments_dataframe = None
self.mom_df_filename_csv = os_path_join(self.saving_dir, "Moments_" + self.job_id + ".csv")
if hasattr(self, "max_hermite_order"):
self.hermite_dataframe = None
self.herm_df_filename_csv = os_path_join(self.saving_dir, "HermiteCoefficients_" + self.job_id + ".csv")
# some checks
if not hasattr(self, "hermite_rms_tol"):
self.hermite_rms_tol = 0.05
self.species_plots_dirs = None
# Need this for pretty print
# Calculate the dimension of the velocity container
# 2nd Dimension of the raw velocity array
self.dim = 1 if self.dimensional_average else self.dimensions
# range(inv_dim) for the loop over dimension
self.inv_dim = self.dimensions if self.dimensional_average else 1
self.prepare_histogram_args()
self.save_pickle()
[docs] def compute(self, compute_moments: bool = False, compute_Grad_expansion: bool = False):
"""
Calculate the moments of the velocity distributions and save them to a pandas dataframes and csv.
Parameters
----------
hist_kwargs : dict, optional
Dictionary with arguments to pass to ``numpy.histogram``.
**kwargs :
These will overwrite any :class:`sarkas.core.Parameters`
or default :class:`sarkas.tools.observables.Observable`
attributes and/or add new ones.
"""
# Print info to screen
self.pretty_print()
# Grab simulation data
time, vel_raw = self.grab_sim_data(pva="vel")
# Normality test
self.normality_tests(time=time, vel_data=vel_raw)
# Make the velocity distribution
self.create_distribution(vel_raw, time)
# Calculate velocity moments
if compute_moments:
self.compute_moments(parse_data=False, vel_raw=vel_raw, time=time)
#
if compute_Grad_expansion:
self.compute_hermite_expansion(compute_moments=False)
[docs] def normality_tests(self, time, vel_data):
"""
Calculate the Shapiro-Wilks test for each timestep from the raw velocity data and store it into a dataframe.
Parameters
----------
Returns
-------
time : numpy.ndarray
One dimensional array with time data.
vel_data : numpy.ndarray
See `Returns` of :meth:`sarkas.tools.observables.Observable.grab_sim_data`
Array with shape (:attr:`sarkas.tools.observables.Observable.no_dumps`,
:attr:`sarkas.tools.observables.Observable.self.dim`, :attr:`sarkas.tools.observables.Observable.runs` *
:attr:`sarkas.tools.observables.Observable.inv_dim` * :attr:`sarkas.tools.observables.Observable.total_num_ptcls`).
`.dim` = 1 if :attr:`sarkas.tools.observables.Observable.dimensional_average = True` otherwise equals the number
of dimensions, (e.g. 3D : 3) `.runs` is the number of runs to be averaged over. Default = 1. `.inv_dim` is
the else option of `dim`. If `dim = 1` then `.inv_dim = .dimensions` and viceversa.
"""
no_dim = vel_data.shape[1]
stats_df_columns = (
"Time",
*[
"{}_{}_{}".format(sp, d, st)
for sp in self.species_names
for _, d in zip(range(no_dim), ["X", "Y", "Z"])
for st in ["s", "p"]
],
)
stats_mat = zeros((len(time), len(self.species_num) * no_dim * 2 + 1))
for it, tme in enumerate(time):
stats_mat[it, 0] = tme
for d, ds in zip(range(no_dim), ["X", "Y", "Z"]):
for sp, sp_start in enumerate(self.species_index_start[:-1]):
# Calculate the correct start and end index for storage
sp_end = self.species_index_start[sp + 1]
statcs, p_value = scp_stats.shapiro(vel_data[it, d, sp_start:sp_end])
stats_mat[it, 1 + 6 * sp + 2 * d] = statcs
stats_mat[it, 1 + 6 * sp + 2 * d + 1] = p_value
self.norm_test_df = DataFrame(stats_mat, columns=stats_df_columns)
self.norm_test_df.columns = MultiIndex.from_tuples([tuple(c.split("_")) for c in stats_df_columns])
[docs] def prepare_histogram_args(self):
# Initialize histograms arguments
if not hasattr(self, "hist_kwargs"):
self.hist_kwargs = {"density": [], "bins": [], "range": []}
# Default values
bin_width = 0.05
# Range of the histogram = (-wid * vth, wid * vth)
wid = 5
# The number of bins is calculated from default values of bin_width and wid
no_bins = int(2.0 * wid / bin_width)
# Calculate thermal speed from energy/temperature data.
try:
energy_fle = self.prod_energy_filename if self.phase == "production" else self.eq_energy_filename
energy_df = read_csv(energy_fle, index_col=False, encoding="utf-8")
if self.num_species > 1:
vth = zeros(self.num_species)
for sp, (sp_mass, sp_name) in enumerate(zip(self.species_masses, self.species_names)):
vth[sp] = sqrt(energy_df["{} Temperature".format(sp_name)].mean() * self.kB / sp_mass)
else:
vth = sqrt(energy_df["Temperature"].mean() * self.kB / self.species_masses)
except FileNotFoundError:
# In case you are using this in PreProcessing stage
vth = sqrt(self.kB * self.T_desired / self.species_masses)
self.vth = vth.copy()
# Create the default dictionary of histogram args
default_hist_kwargs = {"density": [], "bins": [], "range": []}
if self.num_species > 1:
for sp in range(self.num_species):
default_hist_kwargs["density"].append(True)
default_hist_kwargs["bins"].append(no_bins)
default_hist_kwargs["range"].append((-wid * vth[sp], wid * vth[sp]))
else:
default_hist_kwargs["density"].append(True)
default_hist_kwargs["bins"].append(no_bins)
default_hist_kwargs["range"].append((-wid * vth[0], wid * vth[0]))
# Now do some checks.
# Check for numpy.histogram args in kwargs
must_have_keys = ["bins", "range", "density"]
for k, key in enumerate(must_have_keys):
try:
# Is it empty?
if len(self.hist_kwargs[key]) == 0:
self.hist_kwargs[key] = default_hist_kwargs[key]
except KeyError:
self.hist_kwargs[key] = default_hist_kwargs[key]
# Ok at this point I have a dictionary whose elements are list.
# I want the inverse: a list whose elements are dicts
self.list_hist_kwargs = []
for indx in range(self.num_species):
another_dict = {}
# Loop over the keys and grab the species value
for key, values in self.hist_kwargs.items():
another_dict[key] = values[indx]
self.list_hist_kwargs.append(another_dict)
[docs] def create_distribution(self, vel_raw: ndarray = None, time: ndarray = None):
"""
Calculate the velocity distribution of each species and save the corresponding dataframes.
Parameters
----------
vel_raw: ndarray, optional
Container of particles velocity at each time step.
time: ndarray, optional
Time array.
"""
print("\nCreating velocity distributions ...")
tinit = self.timer.current()
no_dumps = vel_raw.shape[0]
no_dim = vel_raw.shape[1]
# I want a Hierarchical dataframe
# Example:
# Ca Yb Ca Yb Ca Yb
# X X Y Y Z Z
# 1.54e-03 -2.54e+22 3.54e+00 1 2 1.54e-03 -2.54e+22 3.54e+00 1 2 1.54e-03 -2.54e+22 3.54e+00 1 2
# This has 3 rows of columns. The first identifies the species, The second the axis,
# and the third the value of the bin_edge
# This can be easily obtained from pandas dataframe MultiIndex. But first I will create a dataframe
# from a huge matrix. The columns of this dataframe will be
# Ca_X_1.54e-03 Ca_X_-2.54e+22 Ca_X_3.54e+00 Yb_X_1 Yb_X_2 Ca_Y_1.54e-03 Ca_Y_-2.54e+22 Ca_Y_3.54e+00 Yb_Y_1 ...
# using MultiIndex.from_tuples([tuple(c.split("_")) for c in df.columns]) I can create a hierarchical df.
# This is because I can access the data as
# df['Ca']['X']
# >>> 1.54e-03 -2.54e+22 3.54e+00
# >>> Time
# >>> 0 -1.694058 1.217008 -0.260678
# with the index = to my time array.
# see https://pandas.pydata.org/pandas-docs/stable/user_guide/advanced.html#reconstructing-the-level-labels
# Columns
full_df_columns = []
# At the first time step I will create the columns list.
dist_matrix = zeros((len(time), self.dim * (sum(self.hist_kwargs["bins"]) + self.num_species)))
# The +1 at the end is because I decided to add a column containing the timestep
# For convenience save the bin edges somewhere else. The columns of the dataframe are string. This will give
# problems when plotting.
# Use a dictionary since the arrays could be different lengths
self.species_bin_edges = {}
for k in self.species_names:
# Initialize the sub-dictionary
self.species_bin_edges[k] = {}
for it in range(no_dumps):
for d, ds in zip(range(no_dim), ["X", "Y", "Z"]):
indx_0 = 0
for indx, (sp_name, sp_start) in enumerate(zip(self.species_names, self.species_index_start)):
# Calculate the correct start and end index for storage
sp_end = self.species_index_start[indx + 1]
bin_count, bin_edges = histogram(vel_raw[it, d, sp_start:sp_end], **self.list_hist_kwargs[indx])
# Executive decision: Center the bins
bin_loc = 0.5 * (bin_edges[:-1] + bin_edges[1:])
# Create the second_column_row the dataframe
if it == 0:
# Create the column array
full_df_columns.append(["{}_{}_Time".format(sp_name, ds)])
full_df_columns.append(["{}_{}_{:6e}".format(sp_name, ds, be) for be in bin_loc])
self.species_bin_edges[sp_name][ds] = bin_edges
# Ok. Now I have created the bins columns' names
# Time to insert in the huge matrix.
indx_1 = indx_0 + 1 + len(bin_count)
dist_matrix[it, indx_0] = time[it]
dist_matrix[it, indx_0 + 1 : indx_1] = bin_count
indx_0 = indx_1
# Alright. The matrix is filled now onto the dataframe
# First let's flatten the columns array. This is because I have something like this
# Example: Binary Mixture with 3 H bins per axis and 2 He bins per axis
# columns =[['H_X', 'H_X', 'H_X'], ['He_X', 'He_X'], ['H_Y', 'H_Y', 'H_Y'], ['He_Y', 'He_Y'] ... Z-axis]
# Flatten with list(concatenate(columns).flat)
# Now has become
# first_column_row=['H_X', 'H_X', 'H_X', 'He_X', 'He_X', 'H_Y', 'H_Y', 'H_Y', 'He_Y', 'He_Y' ... Z-axis]
# I think this is easier to understand than using nested list comprehension
# see https://stackabuse.com/python-how-to-flatten-list-of-lists/
full_df_columns = list(concatenate(full_df_columns).flat)
self.dataframe = DataFrame(dist_matrix, columns=full_df_columns)
# Save it
self.dataframe.to_csv(self.filename_csv, encoding="utf-8", index=False)
# Hierarchical DataFrame
self.hierarchical_dataframe = self.dataframe.copy()
self.hierarchical_dataframe.columns = MultiIndex.from_tuples(
[tuple(c.split("_")) for c in self.hierarchical_dataframe.columns]
)
self.hierarchical_dataframe.to_hdf(self.filename_hdf, key=self.__name__, encoding="utf-8")
tend = self.timer.current()
time_stamp(
self.log_file, "Velocity distribution calculation", self.timer.time_division(tend - tinit), self.verbose
)
[docs] def compute_moments(self, parse_data: bool = False, vel_raw: ndarray = None, time: ndarray = None):
"""Calculate and save moments of the distribution.
Parameters
----------
parse_data: bool
Flag for reading data. Default = False. If False, must pass ``vel_raw`` and ``time``.
If True it will parse data from simulations dumps.
vel_raw: ndarray, optional
Container of particles velocity at each time step.
time: ndarray, optional
Time array.
"""
self.moments_dataframe = DataFrame()
self.moments_hdf_dataframe = DataFrame()
if parse_data:
time, vel_raw = self.grab_sim_data()
self.moments_dataframe["Time"] = time
print("\nCalculating velocity moments ...")
tinit = self.timer.current()
moments, ratios = calc_moments(vel_raw, self.max_no_moment, self.species_index_start)
tend = self.timer.current()
time_stamp(self.log_file, "Velocity moments calculation", self.timer.time_division(tend - tinit), self.verbose)
# Save the dataframe
if self.dimensional_average:
for i, sp in enumerate(self.species_names):
self.moments_hdf_dataframe[f"{sp}_X_Time"] = time
for m in range(self.max_no_moment):
self.moments_dataframe[f"{sp} {m + 1} moment"] = moments[i, :, 0, m]
self.moments_hdf_dataframe[f"{sp}_X_{m + 1} moment"] = moments[i, :, 0, m]
for m in range(self.max_no_moment):
self.moments_dataframe[f"{sp} {m + 1} moment ratio"] = ratios[i, :, 0, m]
self.moments_hdf_dataframe[f"{sp}_X_{m + 1}-2 ratio"] = ratios[i, :, 0, m]
else:
for i, sp in enumerate(self.species_names):
for d, ds in zip(range(self.dim), ["X", "Y", "Z"]):
self.moments_hdf_dataframe[f"{sp}_{ds}_Time"] = time
for m in range(self.max_no_moment):
self.moments_dataframe[f"{sp} {m + 1} moment axis {ds}"] = moments[i, :, d, m]
self.moments_hdf_dataframe[f"{sp}_{ds}_{m + 1} moment"] = moments[i, :, d, m]
for d, ds in zip(range(self.dim), ["X", "Y", "Z"]):
self.moments_hdf_dataframe[f"{sp}_{ds}_Time"] = time
for m in range(self.max_no_moment):
self.moments_dataframe[f"{sp} {m + 1} moment ratio axis {ds}"] = ratios[i, :, d, m]
self.moments_hdf_dataframe[f"{sp}_{ds}_{m + 1}-2 ratio"] = ratios[i, :, d, m]
self.moments_dataframe.to_csv(self.filename_csv, index=False, encoding="utf-8")
# Hierarchical DF Save
# Make the columns
self.moments_hdf_dataframe.columns = MultiIndex.from_tuples(
[tuple(c.split("_")) for c in self.moments_hdf_dataframe.columns]
)
# Save the df in the hierarchical df with a new key/group
self.moments_hdf_dataframe.to_hdf(self.filename_hdf, mode="a", key="velocity_moments", encoding="utf-8")
[docs] def compute_hermite_expansion(self, compute_moments: bool = False):
"""
Calculate and save Hermite coefficients of the Grad expansion.
Parameters
----------
compute_moments: bool
Flag for calculating velocity moments. These are needed for the hermite calculation.
Default = True.
Notes
-----
This is still in the development stage. Do not trust the results. It requires more study in non-equilibrium
statistical mechanics.
"""
from scipy.optimize import curve_fit
self.hermite_dataframe = DataFrame()
self.hermite_hdf_dataframe = DataFrame()
if compute_moments:
self.compute_moments(parse_data=True)
if not hasattr(self, "hermite_rms_tol"):
self.hermite_rms_tol = 0.05
self.hermite_dataframe["Time"] = self.moments_dataframe["Time"].copy()
self.hermite_sigmas = zeros((self.num_species, self.dim, len(self.hermite_dataframe["Time"])))
self.hermite_epochs = zeros((self.num_species, self.dim, len(self.hermite_dataframe["Time"])))
hermite_coeff = zeros(
(self.num_species, self.dim, self.max_hermite_order + 1, len(self.hermite_dataframe["Time"]))
)
print("\nCalculating Hermite coefficients ...")
tinit = self.timer.current()
for sp, sp_name in enumerate(tqdm(self.species_names, desc="Species")):
for it, t in enumerate(tqdm(self.hermite_dataframe["Time"], desc="Time")):
# Grab the thermal speed from the moments
vrms = self.moments_dataframe["{} 2 moment".format(sp_name)].iloc[it]
# Loop over dimensions. No tensor availability yet
for d, ds in zip(range(self.dim), ["X", "Y", "Z"]):
# Grab the distribution from the hierarchical df
dist = self.hierarchical_dataframe[sp_name][ds].iloc[it, 1:]
# Grab and center the bins
v_bins = 0.5 * (self.species_bin_edges[sp_name][ds][1:] + self.species_bin_edges[sp_name][ds][:-1])
cntrl = True
j = 0
# This is a routine to calculate the hermite coefficient in the case of non-equilibrium simulations.
# In non-equilibrium we cannot define a temperature. This is because the temperature is defined from
# the rms width of a Gaussian distribution. In non-equilibrium we don't have a Gaussian, thus the
# first thing to do is to find the underlying Gaussian in our distribution. This is what this
# iterative procedure is for.
while cntrl:
# Normalize
norm = trapz(dist, x=v_bins / vrms)
# Calculate the hermite coeff
h_coeff = calculate_herm_coeff(v_bins / vrms, dist / norm, self.max_hermite_order)
# Fit the rms only to the Grad expansion. This finds the underlying Gaussian
res, _ = curve_fit(
# the lambda func is because i need to fit only rms not the h_coeff
lambda x, rms: grad_expansion(x, rms, h_coeff),
v_bins / vrms,
dist / norm,
self.curve_fit_kwargs,
)
vrms *= res[0]
if abs(1.0 - res[0]) < self.hermite_rms_tol:
cntrl = False
self.hermite_sigmas[sp, d, it] = vrms
self.hermite_epochs[sp, d, it] = j
hermite_coeff[sp, d, :, it] = h_coeff
j += 1
tend = self.timer.current()
for sp, sp_name in enumerate(self.species_names):
for d, ds in zip(range(self.dim), ["X", "Y", "Z"]):
for h in range(self.max_hermite_order):
self.hermite_dataframe["{} {} {} Hermite coeff".format(sp_name, ds, h)] = hermite_coeff[sp, d, h, :]
if h == 0:
self.hermite_hdf_dataframe["{}_{}_Time".format(sp_name, ds, h)] = hermite_coeff[sp, d, h, :]
self.hermite_hdf_dataframe["{}_{}_RMS".format(sp_name, ds, h)] = self.hermite_sigmas[sp, d, :]
self.hermite_hdf_dataframe["{}_{}_epoch".format(sp_name, ds, h)] = self.hermite_epochs[sp, d, :]
else:
self.hermite_hdf_dataframe["{}_{}_{} coeff".format(sp_name, ds, h)] = hermite_coeff[sp, d, h, :]
# Save the CSV
self.hermite_dataframe.to_csv(self.herm_df_filename_csv, index=False, encoding="utf-8")
# Make the columns
self.hermite_hdf_dataframe.columns = MultiIndex.from_tuples(
[tuple(c.split("_")) for c in self.hermite_hdf_dataframe.columns]
)
# Save the df in the hierarchical df with a new key/group
self.hermite_hdf_dataframe.to_hdf(self.filename_hdf, mode="a", key="hermite_coefficients", encoding="utf-8")
time_stamp(self.log_file, "Hermite expansion calculation", self.timer.time_division(tend - tinit), self.verbose)
[docs] def pretty_print(self):
"""Print information in a user-friendly way."""
print("\n\n{:=^70} \n".format(" " + self.__long_name__ + " "))
print("CSV dataframe saved in:\n\t ", self.filename_csv)
print("HDF5 dataframe saved in:\n\t ", self.filename_hdf)
print("Data accessible at: self.dataframe, self.hierarchical_dataframe, self.species_bin_edges")
print("\nMulti run average: ", self.multi_run_average)
print("No. of runs: ", self.runs)
print(
"Size of the parsed velocity array: {} x {} x {}".format(
self.no_dumps, self.dim, self.runs * self.inv_dim * self.total_num_ptcls
)
)
print("\nHistograms Information:")
for sp, (sp_name, dics) in enumerate(zip(self.species_names, self.list_hist_kwargs)):
if sp == 0:
print("Species: {}".format(sp_name))
else:
print("\nSpecies: {}".format(sp_name))
print("No. of samples = {}".format(self.species_num[sp] * self.inv_dim * self.runs))
print("Thermal speed: v_th = {:.6e} ".format(self.vth[sp]), end="")
print("[cm/s]" if self.units == "cgs" else "[m/s]")
for key, values in dics.items():
if key == "range":
print(
"{} : ( {:.2f}, {:.2f} ) v_th,"
"\n\t( {:.4e}, {:.4e} ) ".format(key, *values / self.vth[sp], *values),
end="",
)
print("[cm/s]" if self.units == "cgs" else "[m/s]")
else:
print("{}: {}".format(key, values))
bin_width = abs(dics["range"][1] - dics["range"][0]) / (self.vth[sp] * dics["bins"])
print("Bin Width = {:.4f}".format(bin_width))
if hasattr(self, "max_no_moment"):
print("\nMoments Information:")
print("CSV dataframe saved in:\n\t ", self.mom_df_filename_csv)
print("Data accessible at: self.moments_dataframe, self.moments_hdf_dataframe")
print("Highest moment to calculate: {}".format(self.max_no_moment))
if hasattr(self, "max_hermite_order"):
print("\nGrad Expansion Information:")
print("CSV dataframe saved in:\n\t ", self.herm_df_filename_csv)
print("Data accessible at: self.hermite_dataframe, self.hermite_hdf_dataframe")
print("Highest order to calculate: {}".format(self.max_hermite_order))
print("RMS Tolerance: {:.3f}".format(self.hermite_rms_tol))
[docs]@njit
def calc_Sk(nkt, k_list, k_counts, species_np, no_dumps):
"""
Calculate :math:`S_{ij}(k)` at each saved timestep.
Parameters
----------
nkt : numpy.ndarray, complex
Density fluctuations of all species. Shape = ( ``no_species``, ``no_dumps``, ``no_ka_values``)
k_list :
List of :math:`k` indices in each direction with corresponding magnitude and index of ``k_counts``.
Shape=(``no_ka_values``, 5)
k_counts : numpy.ndarray
Number of times each :math:`k` magnitude appears.
species_np : numpy.ndarray
Array with number of particles of each species.
no_dumps : int
Number of dumps.
Returns
-------
Sk_all : numpy.ndarray
Array containing :math:`S_{ij}(k)`. Shape=(``no_Sk``, ``no_ka_values``, ``no_dumps``)
"""
no_sk = int(len(species_np) * (len(species_np) + 1) / 2)
Sk_raw = zeros((no_sk, len(k_counts), no_dumps))
pair_indx = 0
for ip, si in enumerate(species_np):
for jp in range(ip, len(species_np)):
sj = species_np[jp]
dens_const = 1.0 / sqrt(si * sj)
for it in range(no_dumps):
for ik, ka in enumerate(k_list):
indx = int(ka[-1])
nk_i = nkt[ip, it, ik]
nk_j = nkt[jp, it, ik]
Sk_raw[pair_indx, indx, it] += real(nk_i.conjugate() * nk_j) * dens_const / k_counts[indx]
pair_indx += 1
return Sk_raw
[docs]def calc_Skw(nkt, ka_list, species_np, no_dumps, dt, dump_step):
"""
Calculate the Fourier transform of the correlation function of ``nkt``.
Parameters
----------
nkt : complex, numpy.ndarray
Particles' density or velocity fluctuations.
Shape = ( ``no_species``, ``no_dumps``, ``no_k_list``)
ka_list : list
List of :math:`k` indices in each direction with corresponding magnitude and index of ``ka_counts``.
Shape=(``no_ka_values``, 5)
species_np : numpy.ndarray
Array with one element giving number of particles.
no_dumps : int
Number of dumps.
dt : float
Time interval.
dump_step : int
Snapshot interval.
Returns
-------
Skw_all : numpy.ndarray
DSF/CCF of each species and pair of species.
Shape = (``no_skw``, ``no_ka_values``, ``no_dumps``)
"""
# Fourier transform normalization: norm = dt / Total time
norm = dt / sqrt(no_dumps * dt * dump_step)
# number of independent observables
no_skw = int(len(species_np) * (len(species_np) + 1) / 2)
# DSF
# Skw = zeros((no_skw, len(ka_counts), no_dumps))
Skw_all = zeros((no_skw, len(ka_list), no_dumps))
pair_indx = 0
for ip, si in enumerate(species_np):
for jp in range(ip, len(species_np)):
sj = species_np[jp]
dens_const = 1.0 / sqrt(si * sj)
for ik, ka in enumerate(ka_list):
# indx = int(ka[-1])
nkw_i = fft(nkt[ip, :, ik]) * norm
nkw_j = fft(nkt[jp, :, ik]) * norm
Skw_all[pair_indx, ik, :] = fftshift(real(nkw_i.conjugate() * nkw_j) * dens_const)
# Skw[pair_indx, indx, :] += Skw_all[pair_indx, ik, :] / ka_counts[indx]
pair_indx += 1
return Skw_all
[docs]@njit
def calc_elec_current(vel, sp_charge, sp_num):
"""
Calculate the total electric current and electric current of each species.
Parameters
----------
vel: numpy.ndarray
Particles' velocities.
sp_charge: numpy.ndarray
Charge of each species.
sp_num: numpy.ndarray
Number of particles of each species.
Returns
-------
Js : numpy.ndarray
Electric current of each species. Shape = (``no_species``, ``no_dim``, ``no_dumps``)
Jtot : numpy.ndarray
Total electric current. Shape = (``no_dim``, ``no_dumps``)
"""
no_dumps = vel.shape[1]
no_dim = vel.shape[0]
Js = zeros((sp_num.shape[0], no_dim, no_dumps))
Jtot = zeros((no_dim, no_dumps))
sp_start = 0
sp_end = 0
for s, (q_sp, n_sp) in enumerate(zip(sp_charge, sp_num)):
# Find the index of the last particle of species s
sp_end += n_sp
# Calculate the current of each species
Js[s, :, :] = q_sp * vel[:, :, sp_start:sp_end].sum(axis=-1)
# Add to the total current
Jtot[:, :] += Js[s, :, :]
sp_start += n_sp
return Js, Jtot
[docs]def calc_moments(dist, max_moment, species_index_start):
"""
Calculate the moments of the (velocity) distribution.
Parameters
----------
dist: numpy.ndarray
Distribution of each time step. Shape = (``no_dumps``, ``dim``, ``runs * inv_dim * total_num_ptlcs``)
max_moment: int
Maximum moment to calculate
species_index_start: numpy.ndarray
Array containing the start index of each species. The last value is equivalent to dist.shape[-1]
Returns
-------
moments: numpy.ndarray
Moments of the distribution.
Shape=( ``no_species``, ``no_dumps``, ``dim``, ``max_moment`` )
ratios: numpy.ndarray
Ratios of each moment with respect to the expected Maxwellian value.
Shape=( ``no_species``, ``no_dumps``, ``no_dim``, ``max_moment - 1``)
Notes
-----
See these `equations <https://en.wikipedia.org/wiki/Normal_distribution#Moments:~:text=distribution.-,Moments>`_
"""
# from scipy.stats import moment as scp_moment
from scipy.special import gamma as scp_gamma
no_species = len(species_index_start)
no_dumps = dist.shape[0]
dim = dist.shape[1]
moments = zeros((no_species, no_dumps, dim, max_moment))
ratios = zeros((no_species, no_dumps, dim, max_moment))
for indx, sp_start in enumerate(species_index_start[:-1]):
# Calculate the correct start and end index for storage
sp_end = species_index_start[indx + 1]
for mom in range(max_moment):
moments[indx, :, :, mom] = scp_stats.moment(dist[:, :, sp_start:sp_end], moment=mom + 1, axis=-1)
# sqrt( <v^2> ) = standard deviation = moments[:, :, :, 1] ** (1/2)
for mom in range(max_moment):
pwr = mom + 1
const = 2.0 ** (pwr / 2) * scp_gamma((pwr + 1) / 2) / sqrt(pi)
ratios[:, :, :, mom] = moments[:, :, :, mom] / (const * moments[:, :, :, 1] ** (pwr / 2.0))
return moments, ratios
[docs]@njit
def calc_nk(pos_data, k_list):
"""
Calculate the instantaneous microscopic density :math:`n(k)` defined as
.. math::
n_{A} ( k ) = \\sum_i^{N_A} \\exp [ -i \\mathbf k \\cdot \\mathbf r_{i} ]
Parameters
----------
pos_data : numpy.ndarray
Particles' position scaled by the box lengths.
Shape = ( ``no_dumps``, ``no_dim``, ``tot_no_ptcls``)
k_list : list
List of :math:`k` indices in each direction with corresponding magnitude and index of ``ka_counts``.
Shape=(``no_ka_values``, 5)
Returns
-------
nk : numpy.ndarray
Array containing :math:`n(k)`.
"""
nk = zeros(len(k_list), dtype=complex128)
for ik, k_vec in enumerate(k_list):
kr_i = 2.0 * pi * (k_vec[0] * pos_data[:, 0] + k_vec[1] * pos_data[:, 1] + k_vec[2] * pos_data[:, 2])
nk[ik] = (exp(-1j * kr_i)).sum()
return nk
[docs]def calc_nkt(fldr, slices, dump_step, species_np, k_list, verbose):
"""
Calculate density fluctuations :math:`n(k,t)` of all species.
.. math::
n_{A} ( k, t ) = \\sum_i^{N_A} \\exp [ -i \\mathbf k \\cdot \\mathbf r_{i}(t) ]
where :math:`N_A` is the number of particles of species :math:`A`.
Parameters
----------
fldr : str
Name of folder containing particles data.
slices : tuple, int
Initial, final step number of the slice, total number of slice steps.
dump_step : int
Snapshot interval.
species_np : numpy.ndarray
Number of particles of each species.
k_list : list
List of :math: `k` vectors.
Return
------
nkt : numpy.ndarray, complex
Density fluctuations. Shape = ( ``no_species``, ``no_dumps``, ``no_ka_values``)
"""
# Read particles' position for times in the slice
nkt = zeros((len(species_np), slices[2], len(k_list)), dtype=complex128)
for it, dump in enumerate(
tqdm(range(slices[0], slices[1], dump_step), desc="Timestep", position=1, disable=not verbose, leave=False)
):
data = load_from_restart(fldr, dump)
pos = data["pos"]
sp_start = 0
sp_end = 0
for i, sp in enumerate(species_np):
sp_end += sp
nkt[i, it, :] = calc_nk(pos[sp_start:sp_end, :], k_list)
sp_start += sp
return nkt
[docs]@njit
def calc_statistical_efficiency(observable, run_avg, run_std, max_no_divisions, no_dumps):
"""
Todo:
Parameters
----------
observable
run_avg
run_std
max_no_divisions
no_dumps
Returns
-------
"""
tau_blk = zeros(max_no_divisions)
sigma2_blk = zeros(max_no_divisions)
statistical_efficiency = zeros(max_no_divisions)
for i in range(2, max_no_divisions):
tau_blk[i] = int(no_dumps / i)
for j in range(i):
t_start = int(j * tau_blk[i])
t_end = int((j + 1) * tau_blk[i])
blk_avg = observable[t_start:t_end].mean()
sigma2_blk[i] += (blk_avg - run_avg) ** 2
sigma2_blk[i] /= i - 1
statistical_efficiency[i] = tau_blk[i] * sigma2_blk[i] / run_std**2
return tau_blk, sigma2_blk, statistical_efficiency
# @jit Numba doesn't like scipy.signal
[docs]def calc_diff_flux_acf(vel, sp_num, sp_conc, sp_mass):
"""
Calculate the diffusion fluxes and their autocorrelations functions in each direction.
Parameters
----------
vel : numpy.ndarray
Particles' velocities. Shape = (``dimensions``, ``no_dumps``, :attr:`total_num_ptcls`)
sp_num: numpy.ndarray
Number of particles of each species.
sp_conc: numpy.ndarray
Concentration of each species.
sp_mass: numpy.ndarray
Particle's mass of each species.
Returns
-------
J_flux: numpy.ndarray
Diffusion fluxes.
Shape = ( (``num_species - 1``), ``dimensions`` , ``no_dumps``)
jr_acf: numpy.ndarray
Relative Diffusion flux autocorrelation function.
Shape = ( (``num_species - 1``) x (``num_species - 1``), ``no_dim + 1``, ``no_dumps``)
"""
no_dim = vel.shape[0]
no_dumps = vel.shape[1]
no_species = len(sp_num)
# number of independent fluxes = no_species - 1,
# number of acf of ind fluxes = no_species - 1 ^2
no_jc_acf = int((no_species - 1) * (no_species - 1))
# Current of each species in each direction and at each timestep
tot_vel = zeros((no_species, no_dim, no_dumps))
sp_start = 0
sp_end = 0
# Calculate the total center of mass velocity (tot_com_vel)
# and the center of mass velocity of each species (com_vel)
for i, ns in enumerate(sp_num):
sp_end += ns
tot_vel[i, :, :] = vel[:, :, sp_start:sp_end].sum(axis=-1)
# tot_com_vel += mass_densities[i] * com_vel[i, :, :] / tot_mass_dens
sp_start += ns
# Diffusion Fluxes
J_flux = zeros((no_species - 1, no_dim, no_dumps))
# Relative Diffusion fluxes for ACF and Transport calc
jr_flux = zeros((no_species - 1, no_dim, no_dumps))
# Relative diff flux acf
jr_acf = zeros((no_jc_acf, no_dim + 1, no_dumps))
m_bar = sp_mass @ sp_conc
# the diffusion fluxes from eq.(3.5) in Zhou J Phs Chem
for i, m_alpha in enumerate(sp_mass[:-1]):
# Flux
for j, m_beta in enumerate(sp_mass):
delta_ab = 1 * (m_beta == m_alpha)
J_flux[i, :, :] += (m_bar * delta_ab - sp_conc[i] * m_beta) * tot_vel[j, :, :]
jr_flux[i, :, :] += (delta_ab - sp_conc[i]) * tot_vel[j, :, :]
J_flux[i, :, :] *= m_alpha / m_bar
indx = 0
# Remember to change this for 3+ species
# binary_const = ( m_bar/sp_mass.prod() )**2
# Calculate the correlation function in each direction
for i, sp1_flux in enumerate(jr_flux):
for j, sp2_flux in enumerate(jr_flux):
for d in range(no_dim):
# Calculate the correlation function and add it to the array
jr_acf[indx, d, :] = correlationfunction(sp1_flux[d, :], sp2_flux[d, :])
# Calculate the total correlation function by summing the three directions
jr_acf[indx, -1, :] += jr_acf[indx, d, :]
indx += 1
return J_flux, jr_acf
[docs]@njit
def calc_vk(pos_data, vel_data, k_list):
"""
Calculate the instantaneous longitudinal and transverse velocity fluctuations.
Parameters
----------
pos_data : numpy.ndarray
Particles' position. Shape = ( ``no_dumps``, 3, ``tot_no_ptcls``)
vel_data : numpy.ndarray
Particles' velocities. Shape = ( ``no_dumps``, 3, ``tot_no_ptcls``)
k_list : list
List of :math:`k` indices in each direction with corresponding magnitude and index of ``ka_counts``.
Shape=(``no_ka_values``, 5)
Returns
-------
vkt : numpy.ndarray
Array containing longitudinal velocity fluctuations.
vkt_i : numpy.ndarray
Array containing transverse velocity fluctuations in the :math:`x` direction.
vkt_j : numpy.ndarray
Array containing transverse velocity fluctuations in the :math:`y` direction.
vkt_k : numpy.ndarray
Array containing transverse velocity fluctuations in the :math:`z` direction.
"""
# Longitudinal
vk = zeros(len(k_list), dtype=complex128)
# Transverse
vk_i = zeros(len(k_list), dtype=complex128)
vk_j = zeros(len(k_list), dtype=complex128)
vk_k = zeros(len(k_list), dtype=complex128)
for ik, k_vec in enumerate(k_list):
# Calculate the dot product and cross product between k, r, and v
kr_i = 2.0 * pi * (k_vec[0] * pos_data[:, 0] + k_vec[1] * pos_data[:, 1] + k_vec[2] * pos_data[:, 2])
k_dot_v = 2.0 * pi * (k_vec[0] * vel_data[:, 0] + k_vec[1] * vel_data[:, 1] + k_vec[2] * vel_data[:, 2])
k_cross_v_i = 2.0 * pi * (k_vec[1] * vel_data[:, 2] - k_vec[2] * vel_data[:, 1])
k_cross_v_j = -2.0 * pi * (k_vec[0] * vel_data[:, 2] - k_vec[2] * vel_data[:, 0])
k_cross_v_k = 2.0 * pi * (k_vec[0] * vel_data[:, 1] - k_vec[1] * vel_data[:, 0])
# Microscopic longitudinal current
vk[ik] = (k_dot_v * exp(-1j * kr_i)).sum()
# Microscopic transverse current
vk_i[ik] = (k_cross_v_i * exp(-1j * kr_i)).sum()
vk_j[ik] = (k_cross_v_j * exp(-1j * kr_i)).sum()
vk_k[ik] = (k_cross_v_k * exp(-1j * kr_i)).sum()
return vk, vk_i, vk_j, vk_k
[docs]def calc_vkt(fldr, slices, dump_step, species_np, k_list, verbose):
r"""
Calculate the longitudinal and transverse velocities fluctuations of all species.
Longitudinal
.. math::
\lambda_A(\mathbf{k}, t) = \sum_i^{N_A} \mathbf{k} \cdot \mathbf{v}_{i}(t) \exp [ - i \mathbf{k} \cdot \mathbf{r}_{i}(t) ]
Transverse
.. math::
\\tau_A(\mathbf{k}, t) = \sum_i^{N_A} \mathbf{k} \\times \mathbf{v}_{i}(t) \exp [ - i \mathbf{k} \cdot \mathbf{r}_{i}(t) ]
where :math:`N_A` is the number of particles of species :math:`A`.
Parameters
----------
fldr : str
Name of folder containing particles data.
slices : tuple, int
Initial, final step number of the slice, number of steps per slice.
dump_step : int
Timestep interval saving.
species_np : numpy.ndarray
Number of particles of each species.
k_list : list
List of :math: `k` vectors.
Returns
-------
vkt : numpy.ndarray, complex
Longitudinal velocity fluctuations.
Shape = ( ``no_species``, ``no_dumps``, ``no_ka_values``)
vkt_perp_i : numpy.ndarray, complex
Transverse velocity fluctuations along the :math:`x` axis.
Shape = ( ``no_species``, ``no_dumps``, ``no_ka_values``)
vkt_perp_j : numpy.ndarray, complex
Transverse velocity fluctuations along the :math:`y` axis.
Shape = ( ``no_species``, ``no_dumps``, ``no_ka_values``)
vkt_perp_k : numpy.ndarray, complex
Transverse velocity fluctuations along the :math:`z` axis.
Shape = ( ``no_species``, ``no_dumps``, ``no_ka_values``)
"""
# Read particles' position for all times
no_dumps = slices[2]
vkt_par = zeros((len(species_np), no_dumps, len(k_list)), dtype=complex128)
vkt_perp_i = zeros((len(species_np), no_dumps, len(k_list)), dtype=complex128)
vkt_perp_j = zeros((len(species_np), no_dumps, len(k_list)), dtype=complex128)
vkt_perp_k = zeros((len(species_np), no_dumps, len(k_list)), dtype=complex128)
for it, dump in enumerate(
tqdm(range(slices[0], slices[1], dump_step), desc="Timestep", position=1, disable=not verbose, leave=False)
):
data = load_from_restart(fldr, dump)
pos = data["pos"]
vel = data["vel"]
sp_start = 0
sp_end = 0
for i, sp in enumerate(species_np):
sp_end += sp
vkt_par[i, it, :], vkt_perp_i[i, it, :], vkt_perp_j[i, it, :], vkt_perp_k[i, it, :] = calc_vk(
pos[sp_start:sp_end, :], vel[sp_start:sp_end], k_list
)
sp_start += sp
return vkt_par, vkt_perp_i, vkt_perp_j, vkt_perp_k
[docs]def grad_expansion(x, rms, h_coeff):
"""
Calculate the Grad expansion as given by eq.(5.97) in Liboff
Parameters
----------
x : numpy.ndarray
Array of the scaled velocities
rms : float
RMS width of the Gaussian.
h_coeff: numpy.ndarray
Hermite coefficients without the division by factorial.
Returns
-------
_ : numpy.ndarray
Grad expansion.
"""
gaussian = exp(-0.5 * (x / rms) ** 2) / (sqrt(2.0 * pi * rms**2))
herm_coef = h_coeff / [factorial(i) for i in range(len(h_coeff))]
hermite_series = hermite_e.hermeval(x, herm_coef)
return gaussian * hermite_series
[docs]def calculate_herm_coeff(v, distribution, maxpower):
r"""
Calculate Hermite coefficients by integrating the velocity distribution function. That is
.. math::
a_i = \int_{-\\infty}^{\infty} dv \, He_i(v)f(v)
Parameters
----------
v : numpy.ndarray
Range of velocities.
distribution: numpy.ndarray
Velocity histogram.
maxpower: int
Hermite order
Returns
-------
coeff: numpy.ndarray
Coefficients :math:`a_i`
"""
coeff = zeros(maxpower + 1)
for i in range(maxpower + 1):
hc = zeros(1 + i)
hc[-1] = 1.0
Hp = hermite_e.hermeval(v, hc)
coeff[i] = trapz(distribution * Hp, x=v)
return coeff
[docs]def kspace_setup(box_lengths, angle_averaging, max_k_harmonics, max_aa_harmonics):
"""
Calculate all allowed :math:`k` vectors.
Parameters
----------
max_k_harmonics : numpy.ndarray
Number of harmonics in each direction.
box_lengths : numpy.ndarray
Length of each box's side.
angle_averaging : str
Flag for calculating all the possible `k` vector directions and magnitudes. Default = 'principal_axis'
max_aa_harmonics : numpy.ndarray
Maximum `k` harmonics in each direction for angle average.
Returns
-------
k_arr : list
List of all possible combination of :math:`(n_x, n_y, n_z)` with their corresponding magnitudes and indexes.
k_counts : numpy.ndarray
Number of occurrences of each triplet :math:`(n_x, n_y, n_z)` magnitude.
k_unique : numpy.ndarray
Magnitude of the unique allowed triplet :math:`(n_x, n_y, n_z)`.
"""
if angle_averaging == "full":
# The first value of k_arr = [0, 0, 0]
first_non_zero = 1
# Obtain all possible permutations of the wave number arrays
k_arr = [
array([i / box_lengths[0], j / box_lengths[1], k / box_lengths[2]])
for i in range(max_k_harmonics[0] + 1)
for j in range(max_k_harmonics[1] + 1)
for k in range(max_k_harmonics[2] + 1)
]
harmonics = [
array([i, j, k], dtype=int)
for i in range(max_k_harmonics[0] + 1)
for j in range(max_k_harmonics[1] + 1)
for k in range(max_k_harmonics[2] + 1)
]
elif angle_averaging == "principal_axis":
# The first value of k_arr = [1, 0, 0]
first_non_zero = 0
# Calculate the k vectors along the principal axis only
k_arr = [array([i / box_lengths[0], 0, 0]) for i in range(1, max_k_harmonics[0] + 1)]
harmonics = [array([i, 0, 0], dtype=int) for i in range(1, max_k_harmonics[0] + 1)]
k_arr = np_append(k_arr, [array([0, i / box_lengths[1], 0]) for i in range(1, max_k_harmonics[1] + 1)], axis=0)
harmonics = np_append(harmonics, [array([0, i, 0], dtype=int) for i in range(1, max_k_harmonics[1] + 1)], axis=0)
k_arr = np_append(k_arr, [array([0, 0, i / box_lengths[2]]) for i in range(1, max_k_harmonics[2] + 1)], axis=0)
harmonics = np_append(harmonics, [array([0, 0, i], dtype=int) for i in range(1, max_k_harmonics[2] + 1)], axis=0)
elif angle_averaging == "custom":
# The first value of k_arr = [0, 0, 0]
first_non_zero = 1
# Obtain all possible permutations of the wave number arrays up to max_aa_harmonics included
k_arr = [
array([i / box_lengths[0], j / box_lengths[1], k / box_lengths[2]])
for i in range(max_aa_harmonics[0] + 1)
for j in range(max_aa_harmonics[1] + 1)
for k in range(max_aa_harmonics[2] + 1)
]
harmonics = [
array([i, j, k], dtype=int)
for i in range(max_aa_harmonics[0] + 1)
for j in range(max_aa_harmonics[1] + 1)
for k in range(max_aa_harmonics[2] + 1)
]
# Append the rest of k values calculated from principal axis
k_arr = np_append(
k_arr,
[array([i / box_lengths[0], 0, 0]) for i in range(max_aa_harmonics[0] + 1, max_k_harmonics[0] + 1)],
axis=0,
)
harmonics = np_append(
harmonics,
[array([i, 0, 0], dtype=int) for i in range(max_aa_harmonics[0] + 1, max_k_harmonics[0] + 1)],
axis=0,
)
k_arr = np_append(
k_arr,
[array([0, i / box_lengths[1], 0]) for i in range(max_aa_harmonics[1] + 1, max_k_harmonics[1] + 1)],
axis=0,
)
harmonics = np_append(
harmonics,
[array([0, i, 0], dtype=int) for i in range(max_aa_harmonics[1] + 1, max_k_harmonics[1] + 1)],
axis=0,
)
k_arr = np_append(
k_arr,
[array([0, 0, i / box_lengths[2]]) for i in range(max_aa_harmonics[2] + 1, max_k_harmonics[2] + 1)],
axis=0,
)
harmonics = np_append(
harmonics,
[array([0, 0, i], dtype=int) for i in range(max_aa_harmonics[2] + 1, max_k_harmonics[2] + 1)],
axis=0,
)
# Compute wave number magnitude - don't use |k| (skipping first entry in k_arr)
# The round off is needed to avoid ka value different beyond a certain significant digit. It will break other parts
# of the code otherwise.
k_mag = sqrt((array(k_arr) ** 2).sum(axis=1)[..., None])
harm_mag = sqrt((array(harmonics) ** 2).sum(axis=1)[..., None])
for i, k in enumerate(k_mag[:-1]):
if abs(k - k_mag[i + 1]) < 2.0e-5:
k_mag[i + 1] = k
# Add magnitude to wave number array
k_arr = concatenate((k_arr, k_mag), 1)
# Add magnitude to wave number array
harmonics = concatenate((harmonics, harm_mag), 1)
# Sort from lowest to highest magnitude
ind = argsort(k_arr[:, -1])
k_arr = k_arr[ind]
harmonics = harmonics[ind]
# Count how many times a |k| value appears
k_unique, k_counts = unique(k_arr[first_non_zero:, -1], return_counts=True)
# Generate a 1D array containing index to be used in S array
k_index = repeat(range(len(k_counts)), k_counts)[..., None]
# Add index to k_array
k_arr = concatenate((k_arr[int(first_non_zero) :, :], k_index), 1)
harmonics = concatenate((harmonics[int(first_non_zero) :, :], k_index), 1)
return k_arr, k_counts, k_unique, harmonics
[docs]def load_from_restart(fldr, it):
"""
Load particles' data from dumps.
Parameters
----------
fldr : str
Folder containing dumps.
it : int
Timestep to load.
Returns
-------
data : dict
Particles' data.
"""
file_name = os_path_join(fldr, "checkpoint_" + str(it) + ".npz")
data = load(file_name, allow_pickle=True)
return data
[docs]def plot_labels(xdata, ydata, xlbl, ylbl, units):
"""
Create plot labels with correct units and prefixes.
Parameters
----------
xdata: numpy.ndarray
X values.
ydata: numpy.ndarray
Y values.
xlbl: str
String of the X quantity.
ylbl: str
String of the Y quantity.
units: str
'cgs' or 'mks'.
Returns
-------
xmultiplier: float
Scaling factor for X data.
ymultiplier: float
Scaling factor for Y data.
xprefix: str
Prefix for X units label
yprefix: str
Prefix for Y units label.
xlabel: str
X label units.
ylabel: str
Y label units.
"""
if isinstance(xdata, (ndarray, Series)):
xmax = xdata.max()
else:
xmax = xdata
if isinstance(ydata, (ndarray, Series)):
ymax = ydata.max()
else:
ymax = ydata
# Find the correct Units
units_dict = UNITS[1] if units == "cgs" else UNITS[0]
if units == "cgs" and xlbl == "Length":
xmax *= 1e2
if units == "cgs" and ylbl == "Length":
ymax *= 1e2
# Use scientific notation. This returns a string
x_str = format_float_scientific(xmax)
y_str = format_float_scientific(ymax)
# Grab the exponent
x_exp = 10.0 ** (float(x_str[x_str.find("e") + 1 :]))
y_exp = 10.0 ** (float(y_str[y_str.find("e") + 1 :]))
# Find the units' prefix by looping over the possible values
xprefix = "none"
xmul = -1.5
# This is a 10 multiplier
i = 1.0
while xmul < 0:
for key, value in PREFIXES.items():
ratio = i * x_exp / value
if abs(ratio - 1) < 1.0e-6:
xprefix = key
xmul = 1 / value
i /= 10
# find the prefix
yprefix = "none"
ymul = -1.5
i = 1.0
while ymul < 0:
for key, value in PREFIXES.items():
ratio = i * y_exp / value
if abs(ratio - 1) < 1.0e-6:
yprefix = key
ymul = 1 / value
i /= 10.0
if "Energy" in ylbl:
yname = "Energy"
else:
yname = ylbl
if "Pressure" in ylbl:
yname = "Pressure"
else:
yname = ylbl
if yname in units_dict:
ylabel = " [" + yprefix + units_dict[yname] + "]"
else:
ylabel = ""
if "Energy" in xlbl:
xname = "Energy"
else:
xname = xlbl
if "Pressure" in xlbl:
xname = "Pressure"
else:
xname = xlbl
if xname in units_dict:
xlabel = " [" + xprefix + units_dict[xname] + "]"
else:
xlabel = ""
return xmul, ymul, xprefix, yprefix, xlabel, ylabel
[docs]def col_mapper(keys, vals):
return dict(zip(keys, vals))
[docs]def make_gaussian_plot(time, data, observable_name, units):
fig = plt.figure(figsize=(19, 6))
gs = GridSpec(4, 8)
main_plot = fig.add_subplot(gs[1:4, 0:3])
delta_plot = fig.add_subplot(gs[0, 0:3], sharex=main_plot)
hist_plot = fig.add_subplot(gs[1:4, 3], sharey=main_plot)
# Remove the label from delta and hist plots
delta_plot.tick_params(axis="x", labelbottom=False)
hist_plot.tick_params(axis="y", labelleft=False)
# Grab the color line list from the plt cycler. I will use this in the hist plots
color_from_cycler = plt.rcParams["axes.prop_cycle"].by_key()["color"]
# Calculate plot's labels and multipliers
time_mul, temp_mul, time_prefix, temp_prefix, time_lbl, temp_lbl = plot_labels(
time, data, "Time", observable_name, units
)
# Rescale quantities
time = time_mul * time
data_plot = temp_mul * data
data_mean = data_plot.mean()
data_std = data_plot.std()
# moving average
data_cumavg = data_plot.expanding().mean()
# deviation and its moving average
delta_data = (data_plot - data_mean) * 100 / data_mean
delta_data_cum_avg = delta_data.expanding().mean()
# Temperature Main plot
main_plot.plot(time, data_plot, alpha=0.7)
main_plot.plot(time, data_cumavg, label="Moving Average")
main_plot.axhline(data_mean, ls="--", c="r", alpha=0.7, label="Mean")
main_plot.legend(loc="best")
main_plot.set(ylabel=f"{observable_name} {temp_lbl}", xlabel=f"Time {time_lbl}")
# Temperature Deviation plot
delta_plot.plot(time, delta_data, alpha=0.5)
delta_plot.plot(time, delta_data_cum_avg, alpha=0.8)
delta_plot.set(ylabel=r"Deviation [%]")
dist_desired = scp_stats.norm(loc=data_mean, scale=data_std)
# Histogram plot
# sns_histplot(y=Temperature, bins="auto", stat="density", alpha=0.75, legend="False", ax=T_hist_plot)
hist_plot.hist(data_plot, bins="fd", density=True, alpha=0.75, orientation="horizontal")
hist_plot.plot(dist_desired.pdf(data_plot.sort_values()), data_plot.sort_values(), alpha=0.7, label="Gaussian")
hist_plot.grid(False)
hist_plot.legend()
return fig, (delta_plot, main_plot, hist_plot)