"""
Transport Module.
"""
import inspect
from copy import deepcopy
from IPython import get_ipython
if get_ipython().__class__.__name__ == "ZMQInteractiveShell":
from tqdm import tqdm_notebook as tqdm
else:
from tqdm import tqdm
import sys
from matplotlib.pyplot import subplots
from numpy import array, column_stack, ndarray, pi, trapz
from os import mkdir as os_mkdir
from os import remove as os_remove
from os.path import exists as os_path_exists
from os.path import join as os_path_join
from pandas import DataFrame, MultiIndex, read_hdf
from warnings import warn
from ..utilities.io import print_to_logger
from ..utilities.maths import fast_integral_loop
from ..utilities.misc import add_col_to_df
from ..utilities.timing import datetime_stamp, SarkasTimer
# Sarkas Modules
from .observables import plot_labels, Thermodynamics
[docs]class TransportCoefficients:
"""Transport Coefficients parent."""
[docs] def __init__(self):
self.time_array = None
self.dataframe = None
self.dataframe_slices = None
self.saving_dir = None
#
# To be copied from parameters class
self.postprocessing_dir = None
self.units = None
self.job_id = None
self.verbose = None
self.dt = None
self.units_dict = None
self.total_plasma_frequency = None
self.dimensions = None
self.box_volume = None
self.pbox_volume = None
self.phase = None
self.no_slices = None
self.acf_slice_steps = None
self.dump_step = None
self.kB = 0.0
self.beta_slice = 0.0
#
self.log_file = None
self.df_fnames = {}
self.timer = SarkasTimer()
[docs] def setup(self, params, observable):
"""
Parameters
----------
params: :class:`sarkas.core.Parameters`
Simulation parameters.
observable: :class:`sarkas.tools.observables.Observable`
Observable class
"""
#
self.copy_params(params=params)
self.postprocessing_dir = self.directory_tree["postprocessing"]["path"]
self.get_observable_data(observable)
self.calculate_average_temperature(params)
self.make_directories()
self.create_df_filenames()
self.log_file = os_path_join(self.saving_dir, f"{self.__name__}_logfile.out")
datetime_stamp(self.log_file)
self.pretty_print()
[docs] def copy_params(self, params):
for i, val in params.__dict__.items():
if not inspect.ismethod(val):
if isinstance(val, dict):
self.__dict__[i] = deepcopy(val)
elif isinstance(val, ndarray):
self.__dict__[i] = val.copy()
else:
self.__dict__[i] = val
def __repr__(self):
sortedDict = dict(sorted(self.__dict__.items(), key=lambda x: x[0].lower()))
disp = f"{self.__name__}( \n"
for key, value in sortedDict.items():
disp += "\t{} : {}\n".format(key, value)
disp += ")"
return disp
[docs] def calculate_average_temperature(self, params):
"""
Calculate the average temperature from the :class:`sarkas.tools.observables.Thermodynamics` data. It updates the :attr:`beta_slice` attribute.
Parameters
----------
params : :class:`sarkas.core.Parameters`
Simulation parameters.
"""
energy = Thermodynamics()
energy.setup(params, self.phase, self.no_slices)
energy.parse(acf_data=False)
energy.calculate_beta_slices()
# self.T_avg = energy.dataframe["Temperature"].mean()
self.beta_slice = energy.beta_slice.copy()
[docs] def initialize_dataframes(self, observable):
"""
Grab observables autocorrelation data and initialize the dataframes where to store the data.
Parameters
----------
observable : :class:`sarkas.tools.observables.Observable`
Observable object containing the ACF whose time integral leads to the desired transport coefficient.
"""
time_col_name = observable.dataframe_acf.columns[0]
self.time_array = observable.dataframe_acf[time_col_name].values
self.dataframe = DataFrame()
self.dataframe_slices = DataFrame()
self.dataframe["Integration_Interval"] = self.time_array.copy()
self.dataframe_slices["Integration_Interval"] = self.time_array.copy()
[docs] def create_df_filenames(self):
"""Create paths of the filenames of the dataframes."""
fnames = {}
fnames["dataframe_slices"] = os_path_join(self.saving_dir, f"{self.__name__}_slices_{self.job_id}.h5")
fnames["dataframe"] = os_path_join(self.saving_dir, f"{self.__name__}_{self.job_id}.h5")
self.df_fnames = fnames
[docs] def make_directories(self):
"""Create directories where to save the transport coefficients."""
transport_dir = os_path_join(self.postprocessing_dir, "TransportCoefficients")
if not os_path_exists(transport_dir):
os_mkdir(transport_dir)
coeff_dir = os_path_join(transport_dir, self.__name__)
if not os_path_exists(coeff_dir):
os_mkdir(coeff_dir)
self.saving_dir = os_path_join(coeff_dir, self.phase.capitalize())
if not os_path_exists(self.saving_dir):
os_mkdir(self.saving_dir)
[docs] def diffusion(self, observable, plot: bool = True, display_plot: bool = False):
"""
Calculate the transport coefficient from the Green-Kubo formula.
Raises
------
: DeprecationWarning
"""
warn(
"Deprecated feature. It will be removed in a future release.\nEach transport coefficient is now a class. Create an instance of the class Diffusion and then pass the same parameters to `Diffusion.compute()`.",
category=DeprecationWarning,
)
[docs] def electrical_conductivity(
self,
observable,
plot: bool = True,
display_plot: bool = False,
):
"""
Calculate the transport coefficient from the Green-Kubo formula.
Raises
------
: DeprecationWarning
"""
warn(
"Deprecated feature. It will be removed in a future release.\nEach transport coefficient is now a class. Create an instance of the class `ElectricalConductivity` and then pass the same parameters to `ElectricalConductivity.compute()`.",
category=DeprecationWarning,
)
[docs] def interdiffusion(self, observable, plot: bool = True, display_plot: bool = False):
"""
Calculate the transport coefficient from the Green-Kubo formula
Raises
------
: DeprecationWarning
"""
warn(
"Deprecated feature. It will be removed in a future release.\nEach transport coefficient is now a class. Create an instance of the class InterDiffusion and then pass the same parameters to `InterDiffusion.compute()`.",
category=DeprecationWarning,
)
[docs] def viscosity(self, observable, plot: bool = True, display_plot: bool = False):
"""
Calculate the transport coefficient from the Green-Kubo formula
Raises
------
: DeprecationWarning
"""
warn(
"Deprecated feature. It will be removed in a future release.\nEach transport coefficient is now a class. Create an instance of the class Viscosity and then pass the same parameters to `Viscosity.compute()`.",
category=DeprecationWarning,
)
[docs] def get_observable_data(self, observable):
"""Grab the autocorrelation function datasets by calling the observable's :meth:`parse` method.
Parameters
----------
observable: :class:`sarkas.tools.observables.Observable`
Observable class.
"""
# Check that the phase and no_slices is the same from the one computed in the Observable
observable.parse_acf()
self.phase = observable.phase
self.no_slices = observable.no_slices
self.acf_slice_steps = observable.acf_slice_steps
self.dump_step = observable.dump_step
[docs] def parse(self, observable):
"""Read the HDF files containing the transport coefficients.
Parameters
----------
observable : :class:`sarkas.tools.observables.Observable`
Observable object containing the ACF whose time integral leads to the electrical conductivity.
"""
# Copy relevant info
self.phase = observable.phase
self.no_slices = observable.no_slices
self.acf_slice_steps = observable.acf_slice_steps
self.dump_step = observable.dump_step
self.dataframe = read_hdf(os_path_join(self.saving_dir, self.df_fnames["dataframe"]), mode="r", index_col=False)
self.dataframe_slices = read_hdf(
os_path_join(self.saving_dir, self.df_fnames["dataframe_slices"]), mode="r", index_col=False
)
# Print some info
self.pretty_print()
[docs] def plot_tc(self, time, acf_data, tc_data, acf_name, tc_name, figname, show: bool = False):
"""
Make dual plots with ACF and transport coefficient.
Parameters
----------
time : numpy.ndarray
Time array.
acf_data: numpy.ndarray
Mean and Std of the ACF. \n
Shape = (:attr:`sarkas.tools.observables.Observable.acf_slice_steps`, 2).
tc_data: numpy.ndarray
Mean and Std of the transport coefficient. \n
Shape = (:attr:`sarkas.tools.observables.Observable.acf_slice_steps`, 2).
acf_name: str
y-Label of the ACF plot.
tc_name: str
y-label of the transport coefficient plot.
figname: str
Name with which to save the plot.
show: bool
Flag for displaying the plot if using IPython or terminal.
Returns
-------
fig : :class:`matplotlib.figure.Figure`
Figure object.
(ax1, ax2, ax3, ax4) : tuple
Tuple with the axes handles. \n
ax1 = ACF axes, ax2 = transport coefficient axes, ax3 = ax1.twiny(), ax4 = ax2.twiny()
"""
# Make the plot
fig, (ax1, ax2) = subplots(1, 2, figsize=(16, 7))
ax3 = ax1.twiny()
ax4 = ax2.twiny()
# Calculate axis multipliers and labels
xmul, ymul, _, _, xlbl, ylbl = plot_labels(time, tc_data[:, 0], "Time", tc_name, self.units)
# ACF
ax1.plot(xmul * time, acf_data[:, 0] / acf_data[0, 0])
ax1.fill_between(
xmul * time,
(acf_data[:, 0] - acf_data[:, 1]) / (acf_data[0, 0] - acf_data[0, 1]),
(acf_data[:, 0] + acf_data[:, 1]) / (acf_data[0, 0] + acf_data[0, 1]),
alpha=0.2,
)
# Coefficient
ax2.plot(xmul * time, ymul * tc_data[:, 0])
ax2.fill_between(
xmul * time, ymul * (tc_data[:, 0] - tc_data[:, 1]), ymul * (tc_data[:, 0] + tc_data[:, 1]), alpha=0.2
)
xlims = (xmul * time[1], xmul * time[-1] * 1.5)
ax1.set(xlim=xlims, xscale="log", ylim=(-0.5, 1.1), ylabel=acf_name, xlabel=r"Time difference" + xlbl)
xlims = (xmul * time[1], xmul * time[-1] * 1.05)
ax2.set(xlim=xlims, ylim=(-0.05, ax2.get_ylim()[1]), ylabel=tc_name + ylbl, xlabel=r"$\tau$" + xlbl, xscale="log")
# ax1.legend(loc='best')
# ax2.legend(loc='best')
# Finish the index axes
ax3.set(xlim=(1, len(time) * 1.5), xscale="log")
ax4.set(xlim=(1, len(time) * 1.5), xscale="log")
for axi in [ax3, ax4]:
axi.grid(alpha=0.1)
axi.set(xlabel="Index")
fig.tight_layout()
fig.savefig(os_path_join(self.saving_dir, figname))
if show:
fig.show()
return fig, (ax1, ax2, ax3, ax4)
[docs] def pretty_print_msg(self):
"""Print to screen the location where data is stored and other relevant information."""
tc_name = self.__long_name__
# Create the message to print
dtau = self.dt * self.dump_step
tau = dtau * self.acf_slice_steps
t_wp = 2.0 * pi / self.total_plasma_frequency # Plasma period
tau_wp = int(tau / t_wp)
msg = (
f"\n\n{tc_name:=^70}\n"
f"Data saved in: \n {self.df_fnames['dataframe_slices']} \n {self.df_fnames['dataframe_slices']} \n"
f"No. of slices = {self.no_slices}\n"
f"No. dumps per slice = {int(self.acf_slice_steps)}\n"
f"Total time interval of autocorrelation function: tau = {tau:.4e} {self.units_dict['time']} ~ {tau_wp} plasma periods\n"
f"Time interval step: dtau = {dtau:.4e} ~ {dtau / t_wp:.4e} plasma period"
)
return msg
[docs] def pretty_print(self):
msg = self.pretty_print_msg()
# Print the message to log file and screen
print_to_logger(message=msg, log_file=self.log_file, print_to_screen=self.verbose)
[docs] def save_hdf(self):
"""Save the HDF dataframes to disk in the TransportCoefficient folder."""
# TODO: Fix this hack. We should be able to add data to HDF instead of removing it and rewriting it.
# Save the data.
if os_path_exists(self.df_fnames["dataframe_slices"]):
os_remove(self.df_fnames["dataframe_slices"])
self.dataframe_slices.columns = MultiIndex.from_tuples(
[tuple(c.split("_")) for c in self.dataframe_slices.columns]
)
self.dataframe_slices = self.dataframe_slices.sort_index()
self.dataframe_slices.to_hdf(self.df_fnames["dataframe_slices"], mode="w", key=self.__name__, index=False)
# Save the data.
if os_path_exists(self.df_fnames["dataframe"]):
os_remove(self.df_fnames["dataframe"])
self.dataframe.columns = MultiIndex.from_tuples([tuple(c.split("_")) for c in self.dataframe.columns])
self.dataframe = self.dataframe.sort_index()
self.dataframe.to_hdf(self.df_fnames["dataframe"], mode="w", key=self.__name__, index=False)
[docs] def time_stamp(self, message: str, timing: tuple):
"""
Print out to screen elapsed times. If verbose output, print to file first and then to screen.
Parameters
----------
message : str
Message to print.
timing : tuple
Time in hrs, min, sec, msec, usec, nsec.
"""
screen = sys.stdout
f_log = open(self.log_file, "a+")
repeat = 2 if self.verbose else 1
t_hrs, t_min, t_sec, t_msec, t_usec, t_nsec = timing
# redirect printing to file
sys.stdout = f_log
while repeat > 0:
if t_hrs == 0 and t_min == 0 and t_sec <= 2:
print(f"\n{message} Time: {int(t_sec)} sec {int(t_msec)} msec {int(t_usec)} usec {int(t_nsec)} nsec")
else:
print(f"\n{message} Time: {int(t_hrs)} hrs {int(t_min)} min {int(t_sec)} sec")
repeat -= 1
sys.stdout = screen
f_log.close()
[docs]class Diffusion(TransportCoefficients):
"""
The diffusion coefficient is calculated from the Green-Kubo formula
.. math::
D_{\\alpha} = \\frac{1}{3 N_{\\alpha}} \\sum_{i}^{N_{\\alpha}} \\int_0^{\\tau} dt \\,
\\langle \\mathbf v^{(\\alpha)}_{i}(t) \\cdot \\mathbf v^{(\\alpha)}_{i}(0) \\rangle.
where :math:`\\mathbf v_{i}^{(\\alpha)}(t)` is the velocity of particle :math:`i` of species
:math:`\\alpha`. Notice that the diffusion coefficient is averaged over all :math:`N_{\\alpha}` particles.
Data is retrievable at :attr:`~.dataframe` and :attr:`~.dataframe_slices`.
"""
[docs] def __init__(self):
self.__name__ = "Diffusion"
self.__long_name__ = "Diffusion Coefficients"
self.required_observable = "Velocity Autocorrelation Function"
super().__init__()
[docs] def compute(self, observable, plot: bool = True, display_plot: bool = False):
"""
Calculate the transport coefficient from the Green-Kubo formula.
Parameters
----------
observable : :class:`sarkas.tools.observables.VelocityAutoCorrelationFunction`
Observable object containing the ACF whose time integral leads to the self diffusion coefficient.
plot : bool, optional
Flag for making the dual plot of the ACF and transport coefficient. Default = True.
display_plot : bool, optional
Flag for displaying the plot if using the IPython. Default = False.
"""
# Write Log File
observable.parse_acf()
self.initialize_dataframes(observable)
vacf_str = "VACF"
const = 1.0 / self.dimensions
t0 = self.timer.current()
if not observable.magnetized:
# Loop over time slices
for isl in tqdm(range(self.no_slices), disable=not observable.verbose):
# Iterate over the number of species
for i, sp in enumerate(observable.species_names):
sp_vacf_str = f"{sp} " + vacf_str
# Grab vacf data of each slice
integrand = observable.dataframe_acf_slices[(sp_vacf_str, "Total", f"slice {isl}")].values
df_str = f"{sp} Diffusion_slice {isl}"
self.dataframe_slices[df_str] = const * fast_integral_loop(time=self.time_array, integrand=integrand)
# Average and std of each diffusion coefficient.
for isp, sp in enumerate(observable.species_names):
col_str = [f"{sp} Diffusion_slice {isl}" for isl in range(observable.no_slices)]
# Mean
col_data = self.dataframe_slices[col_str].mean(axis=1).values
col_name = f"{sp} Diffusion_Mean"
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Std
col_data = self.dataframe_slices[col_str].std(axis=1).values
col_name = f"{sp} Diffusion_Std"
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
else:
# Loop over time slices
for isl in tqdm(range(observable.no_slices), disable=not observable.verbose):
# Iterate over the number of species
for i, sp in enumerate(observable.species_names):
sp_vacf_str = f"{sp} " + vacf_str
# Parallel
par_vacf_str = (sp_vacf_str, "Z", f"slice {isl}")
integrand_par = observable.dataframe_acf_slices[par_vacf_str].to_numpy()
col_data = fast_integral_loop(time=self.time_array, integrand=integrand_par)
col_name = f"{sp} Diffusion_Parallel_slice {isl}"
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
# Perpendicular
x_vacf_str = (sp_vacf_str, "X", f"slice {isl}")
y_vacf_str = (sp_vacf_str, "Y", f"slice {isl}")
integrand_perp = 0.5 * (
observable.dataframe_acf_slices[x_vacf_str].to_numpy()
+ observable.dataframe_acf_slices[y_vacf_str].to_numpy()
)
col_data = fast_integral_loop(time=self.time_array, integrand=integrand_perp)
col_name = f"{sp} Diffusion_Perpendicular_slice {isl}"
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
# Add the average and std of perp and par VACF to its dataframe
for isp, sp in enumerate(observable.species_names):
sp_vacf_str = f"{sp} " + vacf_str
sp_diff_str = f"{sp} " + "Diffusion"
par_col_str = [(sp_vacf_str, "Z", f"slice {isl}") for isl in range(self.no_slices)]
observable.dataframe_acf[(sp_vacf_str, "Parallel", "Mean")] = observable.dataframe_acf_slices[
par_col_str
].mean(axis=1)
observable.dataframe_acf[(sp_vacf_str, "Parallel", "Std")] = observable.dataframe_acf_slices[
par_col_str
].std(axis=1)
x_col_str = [(sp_vacf_str, "X", f"slice {isl}") for isl in range(self.no_slices)]
y_col_str = [(sp_vacf_str, "Y", f"slice {isl}") for isl in range(self.no_slices)]
perp_vacf = 0.5 * (
observable.dataframe_acf_slices[x_col_str].to_numpy()
+ observable.dataframe_acf_slices[y_col_str].to_numpy()
)
observable.dataframe_acf[(sp_vacf_str, "Perpendicular", "Mean")] = perp_vacf.mean(axis=1)
observable.dataframe_acf[(sp_vacf_str, "Perpendicular", "Std")] = perp_vacf.std(axis=1)
# Average and std of each diffusion coefficient.
par_col_str = [sp_diff_str + f"_Parallel_slice {isl}" for isl in range(self.no_slices)]
perp_col_str = [sp_diff_str + f"_Perpendicular_slice {isl}" for isl in range(self.no_slices)]
# Mean
col_data = self.dataframe_slices[par_col_str].mean(axis=1).values
col_name = sp_diff_str + "_Parallel_Mean"
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Std
col_data = self.dataframe_slices[par_col_str].std(axis=1).values
col_name = sp_diff_str + "_Parallel_Std"
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Mean
col_data = self.dataframe_slices[perp_col_str].mean(axis=1).values
col_name = sp_diff_str + "_Perpendicular_Mean"
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Std
col_data = self.dataframe_slices[perp_col_str].std(axis=1).values
col_name = sp_diff_str + "_Perpendicular_Std"
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Save the updated dataframe
observable.save_hdf()
# Endif magnetized.
# Time stamp
tend = self.timer.current()
self.time_stamp("Diffusion Calculation", self.timer.time_division(tend - t0))
# Save
self.save_hdf()
# Plot
if plot:
_, _ = self.plot(observable, display_plot=display_plot)
[docs] def plot(self, observable, display_plot: bool = False):
"""Make a dual plot comparing the ACF and the Transport Coefficient by using the :meth:`plot_tc` method.
Parameters
----------
observable : :class:`sarkas.tools.observables.VelocityAutoCorrelationFunction`
Observable object containing the ACF whose time integral leads to the self diffusion coefficient.
display_plot : bool, optional
Flag for displaying the plot if using the IPython. Default = False.
Return
------
figs: dict
Dictionary of `matplotlib` figure handles for each species. If the system is magnetized then it returns a nested dictionary.
Each `figs[species_name]` is a dictionary with keys `Parallel` and `Perpendicular`.
axes: dict,
Dictionary of tuples containing the axes handles for each element of `figs`. Each element of `axes` is a tuple of four axes handles.
'ax1` and `ax2` are the handles for the left and right plots respectively.
`ax3` and `ax4` are the handles for the "Index" axes, created from `ax1.twiny()` and `ax2.twiny()` respectively.\n
If the system is magnetized, then it returns a nested dictionary.
Each `axes[species_name]` is a dictionary with keys `Parallel` and `Perpendicular` each containing a tuple of four axes handles.
"""
vacf_str = "VACF"
figs = {}
axes = {}
if observable.magnetized:
for isp, sp in enumerate(observable.species_names):
sp_vacf_str = f"{sp} " + vacf_str
sp_diff_str = f"{sp} Diffusion"
# Parallel
acf_avg = observable.dataframe_acf[(sp_vacf_str, "Parallel", "Mean")].to_numpy()
acf_std = observable.dataframe_acf[(sp_vacf_str, "Parallel", "Std")].to_numpy()
tc_avg = self.dataframe[(sp_diff_str, "Parallel", "Mean")].to_numpy()
tc_std = self.dataframe[(sp_diff_str, "Parallel", "Std")].to_numpy()
fig, (ax1, ax2, ax3, ax4) = self.plot_tc(
time=self.time_array,
acf_data=column_stack((acf_avg, acf_std)),
tc_data=column_stack((tc_avg, tc_std)),
acf_name=sp_vacf_str + " Parallel",
tc_name=sp_diff_str + " Parallel",
figname=f"{sp}_Parallel_Diffusion_Plot.png",
show=display_plot,
)
figs[sp] = {"Parallel": fig}
axes[sp] = {"Parallel": (ax1, ax2, ax3, ax4)}
# Perpendicular
acf_avg = observable.dataframe_acf[(sp_vacf_str, "Perpendicular", "Mean")]
acf_std = observable.dataframe_acf[(sp_vacf_str, "Perpendicular", "Std")]
tc_avg = self.dataframe[(sp_diff_str, "Perpendicular", "Mean")]
tc_std = self.dataframe[(sp_diff_str, "Perpendicular", "Std")]
fig, (ax1, ax2, ax3, ax4) = self.plot_tc(
time=self.time_array,
acf_data=column_stack((acf_avg, acf_std)),
tc_data=column_stack((tc_avg, tc_std)),
acf_name=sp_vacf_str + " Perpendicular",
tc_name=sp_diff_str + " Perpendicular",
figname=f"{sp}_Perpendicular_Diffusion_Plot.png",
show=display_plot,
)
figs[sp]["Perpendicular"] = fig
axes[sp]["Perpendicular"] = (ax1, ax2, ax3, ax4)
else:
for isp, sp in enumerate(observable.species_names):
sp_vacf_str = f"{sp} " + vacf_str
acf_avg = observable.dataframe_acf[(sp_vacf_str, "Total", "Mean")].to_numpy()
acf_std = observable.dataframe_acf[(sp_vacf_str, "Total", "Std")].to_numpy()
d_str = f"{sp} Diffusion"
tc_avg = self.dataframe[(d_str, "Mean")].to_numpy()
tc_std = self.dataframe[(d_str, "Std")].to_numpy()
fig, (ax1, ax2, ax3, ax4) = self.plot_tc(
time=self.time_array,
acf_data=column_stack((acf_avg, acf_std)),
tc_data=column_stack((tc_avg, tc_std)),
acf_name=sp_vacf_str,
tc_name=d_str,
figname=f"{sp}_Diffusion_Plot.png",
show=display_plot,
)
figs[sp] = fig
axes[sp] = (ax1, ax2, ax3, ax4)
return figs, axes
[docs]class InterDiffusion(TransportCoefficients):
"""
The interdiffusion coefficient is calculated from the Green-Kubo formula
.. math::
D_{\\alpha} = \\frac{1}{3Nx_1x_2} \\int_0^\\tau dt
\\langle \\mathbf {J}_{\\alpha}(0) \\cdot \\mathbf {J}_{\\alpha}(t) \\rangle,
where :math:`x_{1,2}` are the concentration of the two species and
:math:`\\mathbf {J}_{\\alpha}(t)` is the diffusion current calculated by the
:class:`sarkas.tools.observables.DiffusionFlux` class.
Data is retrievable at :attr:`~.dataframe` and :attr:`~.dataframe_slices`.
"""
[docs] def __init__(self):
self.__name__ = "InterDiffusion"
self.__long_name__ = "InterDiffusion Coefficients"
self.required_observable = "Diffusion Flux"
super().__init__()
[docs] def compute(self, observable, plot: bool = True, display_plot: bool = False):
"""
Calculate the transport coefficient from the Green-Kubo formula
Parameters
----------
observable : :class:`sarkas.tools.observables.DiffusionFlux`
Observable object containing the ACF whose time integral leads to the interdiffusion coefficient.
plot : bool, optional
Flag for making the dual plot of the ACF and transport coefficient. Default = True.
display_plot : bool, optional
Flag for displaying the plot if using the IPython. Default = False
"""
observable.parse_acf()
self.initialize_dataframes(observable)
no_fluxes_acf = observable.no_fluxes_acf
# Normalization constant
const = 1.0 / (3.0 * observable.total_num_ptcls * observable.species_concentrations.prod())
df_str = "Diffusion Flux ACF"
id_str = "Inter Diffusion Flux"
# Time
t0 = self.timer.current()
for isl in tqdm(range(self.no_slices), disable=not self.verbose):
# D_ij = zeros((no_fluxes_acf, jc_acf.acf_slice_steps))
for ij in range(no_fluxes_acf):
acf_df_str = (df_str + f" {ij}", "Total", f"slice {isl}")
integrand = observable.dataframe_acf_slices[acf_df_str].to_numpy()
col_data = const * fast_integral_loop(time=self.time_array, integrand=integrand)
col_name = id_str + f" {ij}_slice {isl}"
self.dataframe_slices = add_col_to_df(observable, col_data, col_name)
# Average and Std of slices
for ij in range(no_fluxes_acf):
col_str = [id_str + f" {ij}_slice {isl}" for isl in range(self.no_slices)]
# Mean
col_data = self.dataframe_slices[col_str].mean(axis=1).values
col_name = id_str + f" {ij}_Mean"
self.dataframe = add_col_to_df(observable, col_data, col_name)
# Mean
col_data = self.dataframe_slices[col_str].std(axis=1).values
col_name = id_str + f" {ij}_Std"
self.dataframe = add_col_to_df(observable, col_data, col_name)
# Time stamp
tend = self.timer.current()
self.time_stamp("Interdiffusion Calculation", self.timer.time_division(tend - t0))
# Save
self.save_hdf()
# Plot
if plot:
_, _ = self.plot(observable, display_plot=display_plot)
[docs] def plot(self, observable, display_plot: bool = False):
"""Make a dual plot comparing the ACF and the Transport Coefficient by using the :meth:`plot_tc` method.
Parameters
----------
observable : :class:`sarkas.tools.observables.DiffusionFlux`
Observable object containing the ACF whose time integral leads to the self diffusion coefficient.
display_plot : bool, optional
Flag for displaying the plot if using the IPython. Default = False.
Return
------
figs: dict
Dictionary of `matplotlib` figure handles for each flux. Keys are `flux_0`, `flux_1`, etc..
axes: dict
Dictionary of tuples containing the axes handles for each element of `figs`. Keys are the same as in `figs`. Each element of `axes` is a tuple of four axes handles. 'ax1` and `ax2` are the handles for the left and right plots respectively.
`ax3` and `ax4` are the handles for the "Index" axes, created from `ax1.twiny()` and `ax2.twiny()` respectively.\n
"""
df_str = "Diffusion Flux ACF"
id_str = "Inter Diffusion Coefficient Flux"
figs = {}
axes = {}
for flux in range(observable.no_fluxes_acf):
flux_str = f"{df_str} {flux}"
acf_avg = observable.dataframe_acf[(flux_str, "Total", "Mean")].to_numpy()
acf_std = observable.dataframe_acf[(flux_str, "Total", "Std")].to_numpy()
d_str = f"{id_str} {flux}"
tc_avg = self.dataframe[(d_str, "Mean")].to_numpy()
tc_std = self.dataframe[(d_str, "Std")].to_numpy()
fig, (ax1, ax2, ax3, ax4) = self.plot_tc(
time=self.time_array,
acf_data=column_stack((acf_avg, acf_std)),
tc_data=column_stack((tc_avg, tc_std)),
acf_name=flux_str,
tc_name=d_str,
figname=f"InterDiffusion_Flux{flux}_Plot.png",
show=display_plot,
)
figs[f"Flux_{flux}"] = fig
axes[f"Flux_{flux}"] = (ax1, ax2, ax3, ax4)
return figs, axes
[docs]class Viscosity(TransportCoefficients):
"""Viscosisty coefficients class.
The shear viscosity is obtained from the Green-Kubo formula
.. math::
\\eta = \\frac{\\beta V}{6} \\sum_{\\alpha} \\sum_{\\gamma \\neq \\alpha} \\int_0^{\\tau} dt \\,
\\left \\langle \\mathcal P_{\\alpha\\gamma}(t) \\mathcal P_{\\alpha\\gamma}(0) \\right \\rangle
where :math:`\\beta = 1/k_B T`, :math:`\\alpha,\\gamma = {x, y, z}` and
:math:`\\mathcal P_{\\alpha\\gamma}(t)` is the element of the Pressure Tensor calculated with
:class:`sarkas.tools.observables.PressureTensor`.
The bulk viscosity is obtained from
.. math::
\\eta_V = \\beta V \\int_0^{\\tau}dt \\,
\\left \\langle \\delta \\mathcal P(t) \\delta \\mathcal P(0) \\right \\rangle,
where
.. math::
\\delta \\mathcal P(t) = \\mathcal P(t) - \\left \\langle \\mathcal P \\right \\rangle
is the deviation of the scalar pressure.
Data is retrievable at :attr:`~.dataframe` and :attr:`~.dataframe_slices`.
"""
[docs] def __init__(self):
self.__name__ = "Viscosities"
self.__long_name__ = "Viscosity Coefficients"
self.required_observable = "Pressure Tensor"
super().__init__()
[docs] def compute(self, observable, plot: bool = True, display_plot: bool = False):
"""
Calculate the transport coefficient from the Green-Kubo formula.
Parameters
----------
observable : :class:`sarkas.tools.observables.PressureTensor`
Observable object containing the ACF whose time integral leads to the viscsosity coefficients.
plot : bool, optional
Flag for making the dual plot of the ACF and transport coefficient. Default = True.
display_plot : bool, optional
Flag for displaying the plot if using the IPython. Default = False
"""
observable.parse_acf()
self.initialize_dataframes(observable)
# Initialize Timer
t0 = self.timer.current()
if observable.kinetic_potential_division:
pt_str_list = [
"Pressure Tensor Kinetic ACF",
"Pressure Tensor Potential ACF",
"Pressure Tensor Kin-Pot ACF",
"Pressure Tensor Pot-Kin ACF",
"Pressure Tensor ACF",
]
eta_str_list = [
"Shear Viscosity Tensor Kinetic",
"Shear Viscosity Tensor Potential",
"Shear Viscosity Tensor Kin-Pot",
"Shear Viscosity Tensor Pot-Kin",
"Shear Viscosity Tensor",
]
else:
pt_str_list = ["Pressure Tensor ACF"]
eta_str_list = ["Shear Viscosity Tensor"]
start_steps = 0
end_steps = 0
time_steps = len(self.time_array)
for isl in tqdm(range(self.no_slices), disable=not observable.verbose):
end_steps += time_steps
const = observable.box_volume * self.beta_slice[isl]
# Calculate Bulk Viscosity
# It is calculated from the fluctuations of the pressure eq. 2.124a Allen & Tilsdeley
integrand = observable.dataframe_acf_slices[(f"Pressure Bulk ACF", f"slice {isl}")].to_numpy()
col_name = f"Bulk Viscosity_slice {isl}"
col_data = const * fast_integral_loop(self.time_array, integrand)
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
# Calculate the Shear Viscosity Elements
for iax, ax1 in enumerate(observable.dim_labels):
for _, ax2 in enumerate(observable.dim_labels[iax:], iax):
for _, (pt_str, eta_str) in enumerate(zip(pt_str_list, eta_str_list)):
pt_str_temp = (pt_str + f" {ax1}{ax2}{ax1}{ax2}", f"slice {isl}")
integrand = observable.dataframe_acf_slices[pt_str_temp].to_numpy()
col_name = eta_str + f" {ax1}{ax2}_slice {isl}"
col_data = const * fast_integral_loop(self.time_array, integrand)
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
start_steps += time_steps
# Now average the slices
col_str = [f"Bulk Viscosity_slice {isl}" for isl in range(observable.no_slices)]
col_name = "Bulk Viscosity_Mean"
col_data = self.dataframe_slices[col_str].mean(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
col_name = "Bulk Viscosity_Std"
col_data = self.dataframe_slices[col_str].std(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
for iax, ax1 in enumerate(observable.dim_labels):
for _, ax2 in enumerate(observable.dim_labels[iax + 1 :], iax + 1):
for _, eta_str in enumerate(eta_str_list):
col_str = [eta_str + f" {ax1}{ax2}_slice {isl}" for isl in range(observable.no_slices)]
col_name = eta_str + f" {ax1}{ax2}_Mean"
col_data = self.dataframe_slices[col_str].mean(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
col_name = eta_str + f" {ax1}{ax2}_Std"
col_data = self.dataframe_slices[col_str].std(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
list_coord = ["XY", "XZ", "YZ"]
col_str = [eta_str + f" {coord}_Mean" for coord in list_coord]
# Mean
col_data = self.dataframe[col_str].mean(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, "Shear Viscosity_Mean")
# Std
col_data = self.dataframe[col_str].std(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, "Shear Viscosity_Std")
# Time stamp
tend = self.timer.current()
self.time_stamp("Viscosities Calculation", self.timer.time_division(tend - t0))
# Save
self.save_hdf()
# Plot
if plot:
_, _ = self.plot(observable, display_plot=display_plot)
[docs] def plot(self, observable, display_plot: bool = False):
"""Make a dual plot comparing the ACF and the Transport Coefficient by using the :meth:`plot_tc` method.
Parameters
----------
observable : :class:`sarkas.tools.observables.PressureTensor`
Observable object containing the ACF whose time integral leads to the self diffusion coefficient.
display_plot : bool, optional
Flag for displaying the plot if using the IPython. Default = False.
Return
------
figs: list, :class:`matplotlib.pyplot.Figure`
List of `matplotlib` figure handles for the bulk and shear viscosity respectively.
axes: list,
List of tuples containing the axes handles for each element of `figs`. Each element of `axes` is a tuple of four axes handles. 'ax1` and `ax2` are the handles for the left and right plots respectively.
`ax3` and `ax4` are the handles for the "Index" axes, created from `ax1.twiny()` and `ax2.twiny()` respectively.\n
"""
# Plot
plot_quantities = ["Bulk Viscosity", "Shear Viscosity"]
shear_list_coord = ["XYXY", "XZXZ", "YZYZ"]
figs = []
axes = []
# Make the plot
for ipq, pq in enumerate(plot_quantities):
if pq == "Bulk Viscosity":
acf_str = "Pressure Bulk ACF"
acf_avg = observable.dataframe_acf[("Pressure Bulk ACF", "Mean")]
acf_std = observable.dataframe_acf[("Pressure Bulk ACF", "Std")]
elif pq == "Shear Viscosity":
# The axis are the last two elements in the string
acf_str = "Stress Tensors ACF"
acf_strs = [(f"Pressure Tensor ACF {coord}", "Mean") for coord in shear_list_coord]
acf_avg = observable.dataframe_acf[acf_strs].mean(axis=1)
acf_std = observable.dataframe_acf[acf_strs].std(axis=1)
tc_avg = self.dataframe[(pq, "Mean")]
tc_std = self.dataframe[(pq, "Std")]
fig, (ax1, ax2, ax3, ax4) = self.plot_tc(
time=self.time_array,
acf_data=column_stack((acf_avg, acf_std)),
tc_data=column_stack((tc_avg, tc_std)),
acf_name=acf_str,
tc_name=pq,
figname=f"{pq}_Plot.png",
show=display_plot,
)
figs.append(fig)
axes.append((ax1, ax2, ax3, ax4))
return figs, axes
[docs]class ElectricalConductivity(TransportCoefficients):
"""The electrical conductivity is calculated from the Green-Kubo formula
.. math::
\\sigma = \\frac{\\beta}{V} \\int_0^{\\tau} dt J(t).
where :math:`\\beta = 1/k_B T` and :math:`V` is the volume of the simulation box.
Data is retrievable at :attr:`~.dataframe` and :attr:`~.dataframe_slices`.
"""
[docs] def __init__(self):
self.__name__ = "ElectricalConductivity"
self.__long_name__ = "Electrical Conductivity"
self.required_observable = "Electric Current"
super().__init__()
[docs] def compute(
self,
observable,
plot: bool = True,
display_plot: bool = False,
):
"""
Calculate the transport coefficient from the Green-Kubo formula
Parameters
----------
observable : :class:`sarkas.tools.observables.ElectricCurrent`
Observable object containing the ACF whose time integral leads to the electrical conductivity.
plot : bool, optional
Flag for making the dual plot of the ACF and transport coefficient.\n
Default = True.
display_plot : bool, optional
Flag for displaying the plot if using the IPython. Default = False.
"""
observable.parse_acf()
self.initialize_dataframes(observable)
# Time
t0 = self.timer.current()
jc_str = "Electric Current ACF"
sigma_str = "Electrical Conductivity"
const = self.beta_slice / observable.box_volume
if not observable.magnetized:
for isl in tqdm(range(observable.no_slices), disable=not observable.verbose):
integrand = array(observable.dataframe_acf_slices[(jc_str, "Total", "slice {}".format(isl))])
col_name = sigma_str + "_slice {}".format(isl)
col_data = const[isl] * fast_integral_loop(self.time_array, integrand)
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
col_str = [sigma_str + "_slice {}".format(isl) for isl in range(observable.no_slices)]
# Mean
col_data = self.dataframe_slices[col_str].mean(axis=1).values
col_name = sigma_str + "_Mean"
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Std
col_data = self.dataframe_slices[col_str].std(axis=1).values
col_name = sigma_str + "_Std"
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
else:
for isl in tqdm(range(observable.no_slices), disable=not observable.verbose):
# Parallel
par_str = (jc_str, "Z", f"slice {isl}")
integrand = observable.dataframe_acf_slices[par_str].to_numpy()
col_data = const * fast_integral_loop(self.time_array, integrand)
col_name = sigma_str + f"_Parallel_slice {isl}"
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
# Perpendicular
x_col_str = (jc_str, "X", f"slice {isl}")
y_col_str = (jc_str, "Y", f"slice {isl}")
perp_integrand = 0.5 * (
observable.dataframe_acf_slices[x_col_str].to_numpy()
+ observable.dataframe_acf_slices[y_col_str].to_numpy()
)
col_name = sigma_str + f"_Perpendicular_slice {isl}"
col_data = const * fast_integral_loop(self.time_array, perp_integrand)
self.dataframe_slices = add_col_to_df(self.dataframe_slices, col_data, col_name)
par_col_str = [(jc_str, "Z", f"slice {isl}") for isl in range(self.no_slices)]
observable.dataframe_acf[(jc_str, "Parallel", "Mean")] = observable.dataframe_acf_slices[par_col_str].mean(
axis=1
)
observable.dataframe_acf[(jc_str, "Parallel", "Std")] = observable.dataframe_acf_slices[par_col_str].std(
axis=1
)
x_col_str = [(jc_str, "X", f"slice {isl}") for isl in range(self.no_slices)]
y_col_str = [(jc_str, "Y", f"slice {isl}") for isl in range(self.no_slices)]
perp_jc = 0.5 * (
observable.dataframe_acf_slices[x_col_str].to_numpy()
+ observable.dataframe_acf_slices[y_col_str].to_numpy()
)
observable.dataframe_acf[(jc_str, "Perpendicular", "Mean")] = perp_jc.mean(axis=1)
observable.dataframe_acf[(jc_str, "Perpendicular", "Std")] = perp_jc.std(axis=1)
# Save the updated dataframe
observable.save_hdf()
# Average and std of transport coefficient.
col_str = [sigma_str + f"_Parallel_slice {isl}".format(isl) for isl in range(observable.no_slices)]
# Mean
col_data = self.dataframe_slices[col_str].mean(axis=1).values
col_name = sigma_str + "_Parallel_Mean"
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Std
col_data = self.dataframe_slices[col_str].std(axis=1).values
col_name = sigma_str + "_Parallel_Std"
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Perpendicular
col_str = [sigma_str + f"_Perpendicular_slice {isl}" for isl in range(observable.no_slices)]
# Mean
col_data = sigma_str + "_Perpendicular_Mean"
col_name = self.dataframe_slices[col_str].mean(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Std
col_data = sigma_str + "_Perpendicular_Std"
col_name = self.dataframe_slices[col_str].std(axis=1).values
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Endif magnetized.
# Time stamp
tend = self.timer.current()
self.time_stamp(f"{self.__long_name__} Calculation", self.timer.time_division(tend - t0))
# Save
self.save_hdf()
# Plot
if plot:
_, _ = self.plot(observable, display_plot=display_plot)
[docs] def plot(self, observable, display_plot: bool = False):
"""Make a dual plot comparing the ACF and the Transport Coefficient by using the :meth:`plot_tc` method.
Parameters
----------
observable : :class:`sarkas.tools.observables.VelocityAutoCorrelationFunction`
Observable object containing the ACF whose time integral leads to the self diffusion coefficient.
display_plot : bool, optional
Flag for displaying the plot if using the IPython. Default = False.
Return
------
fig (fig_par, fig_perp) : :class:`matplotlib.pyplot.Figure`, tuple
Matplotlib figure handle. If the system is magnetized then it return a tuple with the handles for the parallel (`fig_par`) and perpendicular (`fig_perp`) figures.
(ax1, ax2, ax3, ax4), ((ax1_par, ax2_par, ax3_par, ax4_par), (ax1_perp, ax2_perp, ax3_perp, ax4_perp)): tuple, :class:`matplotlib.axes.Axes`
Tuple containing the axes handles for `fig`. 'ax1` and `ax2` are the handles for the left and right plots respectively.
`ax3` and `ax4` are the handles for the "Index" axes, created from `ax1.twiny()` and `ax2.twiny()` respectively.\n
If the system is magnetized then it returns a tuple of tuples whose elements are the axes handles of each figure.
"""
jc_str = "Electric Current ACF"
sigma_str = "Electrical Conductivity"
figs = []
axes = []
if not observable.magnetized:
acf_avg = observable.dataframe_acf[(jc_str, "Total", "Mean")].to_numpy()
acf_std = observable.dataframe_acf[(jc_str, "Total", "Std")].to_numpy()
tc_avg = self.dataframe[(sigma_str, "Mean")].to_numpy()
tc_std = self.dataframe[(sigma_str, "Std")].to_numpy()
fig, (ax1, ax2, ax3, ax4) = self.plot_tc(
time=self.time_array,
acf_data=column_stack((acf_avg, acf_std)),
tc_data=column_stack((tc_avg, tc_std)),
acf_name="Electric Current ACF",
tc_name="Electrical Conductivity",
figname="ElectricalConductivity_Plot.png",
show=display_plot,
)
figs.append[fig]
axes.append[(ax1, ax2, ax3, ax4)]
else:
acf_avg = observable.dataframe_acf[(jc_str, "Parallel", "Mean")].to_numpy()
acf_std = observable.dataframe_acf[(jc_str, "Parallel", "Std")].to_numpy()
tc_avg = self.dataframe[(sigma_str, "Parallel", "Mean")].to_numpy()
tc_std = self.dataframe[(sigma_str, "Parallel", "Std")].to_numpy()
fig, (ax1, ax2, ax3, ax4) = self.plot_tc(
time=self.time_array,
acf_data=column_stack((acf_avg, acf_std)),
tc_data=column_stack((tc_avg, tc_std)),
acf_name="Electric Current ACF Parallel",
tc_name="Electrical Conductivity Parallel",
figname="ElectricalConductivity_Parallel_Plot.png",
show=display_plot,
)
figs.append[fig]
axes.append[(ax1, ax2, ax3, ax4)]
acf_avg = observable.dataframe_acf[(jc_str, "Perpendicular", "Mean")].to_numpy()
acf_std = observable.dataframe_acf[(jc_str, "Perpendicular", "Std")].to_numpy()
tc_avg = self.dataframe[(sigma_str, "Perpendicular", "Mean")].to_numpy()
tc_std = self.dataframe[(sigma_str, "Perpendicular", "Std")].to_numpy()
fig, (ax1, ax2, ax3, ax4) = self.plot_tc(
time=self.time_array,
acf_data=column_stack((acf_avg, acf_std)),
tc_data=column_stack((tc_avg, tc_std)),
acf_name="Electric Current ACF Perpendicular",
tc_name="Electrical Conductivity Perpendicular",
figname="ElectricalConductivity_Perpendicular_Plot.png",
show=display_plot,
)
figs.append[fig]
axes.append[(ax1, ax2, ax3, ax4)]
return figs, axes
[docs]class ThermalConductivity(TransportCoefficients):
[docs] def __init__(self):
self.__name__ = "ThermalConductivity"
self.__long_name__ = "Thermal Conductivity"
self.required_observable = "Heat Flux"
super().__init__()
[docs] def compute(
self,
observable,
plot: bool = True,
display_plot: bool = False,
):
"""
Calculate the transport coefficient from the Green-Kubo formula.
Parameters
----------
observable : :class:`sarkas.tools.observables.PressureTensor`
Observable object containing the ACF whose time integral leads to the viscsosity coefficients.
plot : bool, optional
Flag for making the dual plot of the ACF and transport coefficient. Default = True.
display_plot : bool, optional
Flag for displaying the plot if using the IPython. Default = False
"""
observable.parse_acf()
self.initialize_dataframes(observable)
# Initialize Timer
t0 = self.timer.current()
const = self.kB * self.beta_slice**2 / self.box_volume
sp_vacf_str = f"{observable.__long_name__} ACF"
if self.num_species > 1:
species_list = [*observable.species_names, "all"]
else:
species_list = observable.species_names
# Loop over time slices
for isl in tqdm(range(self.no_slices), disable=not observable.verbose):
# Iterate over the number of species
for isp, sp1 in enumerate(species_list):
for _, sp2 in enumerate(species_list[isp:], isp):
# Grab vacf data of each slice
integrand = observable.dataframe_acf_slices[
(sp_vacf_str, f"{sp1}-{sp2}", "Total", f"slice {isl}")
].values
df_str = f"{self.__long_name__}_{sp1}-{sp2}_slice {isl}"
self.dataframe_slices[df_str] = const[isl] * fast_integral_loop(
time=self.time_array, integrand=integrand
)
# Average and std of each transport coefficient.
for isp, sp1 in enumerate(species_list):
for _, sp2 in enumerate(species_list[isp:], isp):
col_str = [f"{self.__long_name__}_{sp1}-{sp2}_slice {isl}" for isl in range(observable.no_slices)]
# Mean
col_data = self.dataframe_slices[col_str].mean(axis=1).values
col_name = f"{self.__long_name__}_{sp1}-{sp2}_Mean"
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Std
col_data = self.dataframe_slices[col_str].std(axis=1).values
col_name = f"{self.__long_name__}_{sp1}-{sp2}_Std"
self.dataframe = add_col_to_df(self.dataframe, col_data, col_name)
# Time stamp
tend = self.timer.current()
self.time_stamp(f"{self.__long_name__} Calculation", self.timer.time_division(tend - t0))
# Save
self.save_hdf()
# Plot
if plot:
_, _ = self.plot(observable, display_plot=display_plot)
[docs] def plot(self, observable, display_plot: bool = False):
"""
Make a dual plot comparing the ACF and the Transport Coefficient by using the :meth:`plot_tc` method.
Parameters
----------
observable: :class:`sarkas.tools.observables.HeatFlux`
Observable object containing the ACF whose time integral leads to the self diffusion coefficient.
display_plot : bool, optional
Flag for displaying the plot if using the IPython. Default = False.
Return
------
fig : :class:`matplotlib.pyplot.Figure`
Matplotlib figure handle
(ax1, ax2, ax3, ax4) : tuple, :class:`matplotlib.axes.Axes`
Tuple containing the axes handles for `fig`. 'ax1` and `ax2` are the handles for the left and right plots respectively.
`ax3` and `ax4` are the handles for the "Index" axes, created from `ax1.twiny()` and `ax2.twiny()` respectively.
"""
sp_vacf_str = f"{observable.__long_name__} ACF"
if self.num_species > 1:
species_list = [*observable.species_names, "all"]
else:
species_list = observable.species_names
for isp, sp1 in enumerate(species_list):
for _, sp2 in enumerate(species_list[isp:], isp):
acf_avg = observable.dataframe_acf[(sp_vacf_str, f"{sp1}-{sp2}", "Total", "Mean")].to_numpy()
acf_std = observable.dataframe_acf[(sp_vacf_str, f"{sp1}-{sp2}", "Total", "Std")].to_numpy()
col_name = (f"{self.__long_name__}", f"{sp1}-{sp2}", "Mean")
tc_avg = self.dataframe[col_name].to_numpy()
col_name = (f"{self.__long_name__}", f"{sp1}-{sp2}", "Std")
tc_std = self.dataframe[col_name].to_numpy()
fig, (ax1, ax2, ax3, ax4) = self.plot_tc(
time=self.time_array,
acf_data=column_stack((acf_avg, acf_std)),
tc_data=column_stack((tc_avg, tc_std)),
acf_name=sp_vacf_str,
tc_name=f"{sp1}-{sp2} {self.__long_name__}",
figname=f"{self.__name__}_{sp1}-{sp2}_Plot.png",
show=display_plot,
)
return fig, (ax1, ax2, ax3, ax4)