Source code for sarkas.processes

"""
Module handling stages of an MD run: PreProcessing, Simulation, PostProcessing.
"""

from importlib import import_module
from IPython import get_ipython
from threading import Thread

if get_ipython().__class__.__name__ == "ZMQInteractiveShell":
    from tqdm import tqdm_notebook as tqdm
    from tqdm.notebook import trange
else:
    from tqdm import tqdm, trange

import matplotlib.pyplot as plt
from matplotlib.cm import get_cmap, ScalarMappable
from matplotlib.colors import LogNorm
from numpy import (
    arange,
    array,
    int64,
    linspace,
    log2,
    log10,
    logspace,
    meshgrid,
    sqrt,
    zeros,
)
from os import listdir, mkdir
from os import remove as os_remove
from os import stat as os_stat
from os.path import exists, join
from pandas import DataFrame, read_csv
from seaborn import scatterplot
from warnings import warn

from .core import Parameters
from .particles import Particles
from .plasma import Species
from .potentials.core import Potential
from .time_evolution.integrators import Integrator

# Sarkas modules
from .utilities.io import InputOutput, print_to_logger
from .utilities.maths import force_error_analytic_pp, force_error_approx_pppm
from .utilities.timing import SarkasTimer


[docs]class Process: """Parent class for :class:`sarkas.process.PreProcess`, :class:`sarkas.process.Simulation`, and :class:`sarkas.process.PostProcess`. Parameters ---------- input_file : str Path to the YAML input file. Default = `None` Attributes ---------- potential : :class:`sarkas.potential.base.Potential` Class handling the interaction between particles. integrator : :class:`sarkas.time_evolution.integrators.Integrator` Class handling the integrator. particles: :class:`sarkas.particles.Particles` Class handling particles properties. parameters : :class:`sarkas.core.Parameters` Class handling simulation's parameters. species : list List of :class:`sarkas.plasma.Species` classes. input_file : str Path to YAML input file. timer : :class:`sarkas.utilities.timing.SarkasTimer` Class handling the timing of processes. io : :class:`sarkas.utilities.io.InputOutput` Class handling the IO in Sarkas. """
[docs] def __init__(self, input_file: str = None): self.potential = Potential() self.integrator = Integrator() self.parameters = Parameters() self.particles = Particles() self.species = [] # Deprecated self.species = [] self.threads_ls = [] self.observables_dict = {} self.transport_dict = {} self.input_file = input_file self.timer = SarkasTimer() self.io = InputOutput(process=self.__name__)
[docs] def common_parser(self, filename: str = None): """ Parse simulation parameters from YAML file. Parameters ---------- filename: str Input YAML file Return ------ dics : dict Nested dictionary from reading the YAML input file. """ if filename: self.input_file = filename dics = self.io.from_yaml(self.input_file) return dics
[docs] def update_subclasses_from_dict(self, nested_dict: dict): """Update the subclasses parameters using a dictionary. Parameters ---------- nested_dict : dict Nested dictionary. See example for format. """ for lkey, vals in nested_dict.items(): if lkey not in ["Particles", "Observables", "TransportCoefficients"]: self.__dict__[lkey.lower()].__dict__.update(vals) elif lkey in ["Particles", "Plasma"]: # Remember Particles should be a list of dict # example: # args = {"Particles" : [ { "Species" : { "name": "O" } } ] } # Check if you already have a non-empty dict of species if len(self.species) == 0: # If so do you want to replace or update? # Update species attributes for sp, species in enumerate(vals): spec = Species(species["Species"]) if hasattr(spec, "replace"): self.species[sp].__dict__.update(spec.__dict__) else: self.species.append(spec) else: # Append new species for sp, species in enumerate(vals): spec = Species(species["Species"]) self.species.append(spec) elif lkey in ["Observables"]: for obs_dict in vals: for obs, params in obs_dict.items(): module = import_module(".observables", "sarkas.tools") class_ = getattr(module, obs) inst = class_() if inst.__long_name__ in self.observables_dict.keys(): self.observables_dict[inst.__long_name__].__dict__.update(params) else: self.observables_dict[inst.__long_name__] = inst self.observables_dict[inst.__long_name__].from_dict(params) elif lkey in ["TransportCoefficients"]: for obs_dict in vals: for obs, params in obs_dict.items(): module = import_module(".transport", "sarkas.tools") class_ = getattr(module, obs) inst = class_() if inst.__long_name__ in self.transport_dict.keys(): self.transport_dict[inst.__long_name__].__dict__.update(params) else: self.transport_dict[inst.__long_name__] = inst self.transport_dict[inst.__long_name__].from_dict(params) # electron properties has been moved to the Parameters class. Therefore I need to put this here. if hasattr(self.potential, "electron_temperature"): self.parameters.electron_temperature = self.potential.electron_temperature elif hasattr(self.potential, "electron_temperature_eV"): self.parameters.electron_temperature_eV = self.potential.electron_temperature_eV
[docs] def instantiate_subclasses_from_dict(self, nested_dict: dict): """Instantiate the process subclasses from a dictionary.""" for lkey, vals in nested_dict.items(): if lkey not in ["Particles", "Observables", "TransportCoefficients"]: # self.__dict__[lkey.lower()].__dict__.update(vals) if lkey == "Potential": self.potential.from_dict(nested_dict[lkey]) elif lkey == "Integrator": self.integrator.from_dict(nested_dict[lkey]) elif lkey == "Parameters": self.parameters.from_dict(nested_dict[lkey]) elif lkey in ["Particles", "Plasma"]: for sp, species in enumerate(vals): spec = Species(species["Species"]) self.species.append(spec) elif lkey in ["Observables"]: for obs_dict in vals: for obs, params in obs_dict.items(): module = import_module(".observables", "sarkas.tools") class_ = getattr(module, obs) inst = class_() self.observables_dict[inst.__long_name__] = inst self.observables_dict[inst.__long_name__].from_dict(params) elif lkey in ["TransportCoefficients"]: for obs_dict in vals: for obs, params in obs_dict.items(): module = import_module(".transport", "sarkas.tools") class_ = getattr(module, obs) inst = class_() self.transport_dict[inst.__long_name__] = inst self.transport_dict[inst.__long_name__].from_dict(params) # electron properties has been moved to the Parameters class. Therefore I need to put this here. if hasattr(self.potential, "electron_temperature"): self.parameters.electron_temperature = self.potential.electron_temperature elif hasattr(self.potential, "electron_temperature_eV"): self.parameters.electron_temperature_eV = self.potential.electron_temperature_eV
[docs] def directory_sizes(self): """Calculate the size of the dumps directories and print them to logger.""" # Estimate size of dump folder # Grab one file from the dump directory and get the size of it. if self.parameters.equilibration_phase: if not listdir(self.io.eq_dump_dir): raise FileNotFoundError( "Could not estimate the size of the equilibration phase dumps" " because there are no dumps in the equilibration directory." "Re-run .time_n_space_estimate(loops) with loops > eq_dump_step" ) else: eq_dump_size = os_stat(join(self.io.eq_dump_dir, listdir(self.io.eq_dump_dir)[0])).st_size eq_dump_fldr_size = eq_dump_size * (self.parameters.equilibration_steps / self.parameters.eq_dump_step) else: eq_dump_size = 0 eq_dump_fldr_size = 0 if not listdir(self.io.prod_dump_dir): raise FileNotFoundError( "Could not estimate the size of the production phase dumps because" " there are no dumps in the production directory." "Re-run .time_n_space_estimate(loops) with loops > prod_dump_step" ) # Grab one file from the dump directory and get the size of it. prod_dump_size = os_stat(join(self.io.eq_dump_dir, listdir(self.io.eq_dump_dir)[0])).st_size prod_dump_fldr_size = prod_dump_size * (self.parameters.production_steps / self.parameters.prod_dump_step) # Prepare arguments to pass for print out sizes = array([[eq_dump_size, eq_dump_fldr_size], [prod_dump_size, prod_dump_fldr_size]]) # Check for electrostatic equilibration if self.parameters.magnetized and self.parameters.electrostatic_equilibration: if not listdir(self.io.mag_dump_dir): raise FileNotFoundError( "Could not estimate the size of the magnetization phase dumps because" " there are no dumps in the production directory." "Re-run .time_n_space_estimate(loops) with loops > mag_dump_step" ) dump = self.parameters.mag_dump_step mag_dump_size = os_stat(join(self.io.mag_dump_dir, "checkpoint_" + str(dump) + ".npz")).st_size mag_dump_fldr_size = mag_dump_size * (self.parameters.magnetization_steps / self.parameters.mag_dump_step) sizes = array( [ [eq_dump_size, eq_dump_fldr_size], [prod_dump_size, prod_dump_fldr_size], [mag_dump_size, mag_dump_fldr_size], ] ) self.io.directory_size_report(sizes, process=self.__name__)
[docs] def evolve(self, phase, thermalization, it_start, it_end, dump_step): """ Evolve the system forward in time. Parameters ---------- phase: str Indicates the stage of the simulation used for saving dumps in the right directory. \n Choices = ("equilibration", "production", "magnetization") thermalization : bool Indicates whether to apply the thermostat or not. it_start: int Initial timestep of the loop. it_end: int Final timestep of the loop. dump_step: int Interval for dumping data. """ for it in trange(it_start, it_end, disable=not self.parameters.verbose): # Calculate the Potential energy and update particles' data self.integrator.update(self.particles) if (it + 1) % dump_step == 0: self.particles.calculate_observables() self.io.dump(phase, self.particles, it + 1) if thermalization and (it + 1 >= self.integrator.thermalization_timestep): self.particles.calculate_species_kinetic_temperature() self.integrator.thermostate(self.particles)
[docs] def evolve_loop_threading(self, phase, thermalization, it_start, it_end, dump_step): """ Evolve the system forward in time. This method is similar to :meth:`sarkas.processes.Process.evolve_loop` with the only difference that it uses `threading` for saving data, it starts a new thread to save the data. In the case of small number of particles this can slow down the simulation, therefore it must be chosen by setting the parameters `threading = True` in the input file or in the :class:`sarkas.core.Parameters` class. Parameters ---------- phase: str Indicates the stage of the simulation used for saving dumps in the right directory. \n Choices = ("equilibration", "production", "magnetization") thermalization : bool Indicates whether to apply the thermostat or not. it_start: int Initial timestep of the loop. it_end: int Final timestep of the loop. dump_step: int Interval for dumping data. """ for it in trange(it_start, it_end, disable=not self.parameters.verbose): # Calculate the Potential energy and update particles' data self.integrator.update(self.particles) if (it + 1) % dump_step == 0: th = Thread( target=self.io.dump, name=f"Sarkas_{phase.capitalize()}_Thread - {it + 1}", args=( phase, self.particles.__deepcopy__(), it + 1, ), ) self.threads_ls.append(th) th.start() if thermalization and (it + 1 >= self.integrator.thermalization_timestep): self.integrator.thermostate(self.particles) # Wait for all the threads to finish for x in self.threads_ls: x.join() self.threads_ls.clear()
[docs] def initialization(self): """Initialize all classes.""" # initialize the directories and filenames self.io.setup() # Copy relevant subsclasses attributes into parameters class. This is needed for post-processing. self.parameters.copy_io_attrs(self.io) # Update parameters' dictionary with filenames and directories self.parameters.potential_type = self.potential.type.lower() self.parameters.setup(self.species) self.parameters.dt = self.integrator.dt # Initialize particles t0 = self.timer.current() self.particles.setup(self.parameters, self.species) time_ptcls = self.timer.current() # Initialize potential and calculate initial potential self.potential.setup(self.parameters, self.species) self.potential.calc_acc_pot(self.particles) time_pot = self.timer.current() self.parameters.cutoff_radius = self.potential.rc # Initialize Integrator self.integrator.setup(self.parameters, self.potential) # Copy needed parameters for pretty print self.parameters.dt = self.integrator.dt self.parameters.equilibration_integrator = self.integrator.equilibration_type self.parameters.production_integrator = self.integrator.production_type if self.parameters.magnetized: self.parameters.magnetization_integrator = self.integrator.magnetization_type # Copy some parameters needed for saving data self.io.copy_params(self.parameters) # For restart and backups. self.io.setup_checkpoint(self.parameters) self.io.save_pickle(self) # Print Process summary to file and screen self.io.simulation_summary(self) time_end = self.timer.current() # self.evolve = self.evolve_loop_threading if self.parameters.threading else self.evolve_loop # Print timing self.io.time_stamp("Particles Initialization", self.timer.time_division(time_ptcls - t0)) self.io.time_stamp("Potential Initialization", self.timer.time_division(time_pot - time_ptcls)) self.io.time_stamp("Total Simulation Initialization", self.timer.time_division(time_end - t0)) self.print_initial_state()
[docs] def print_initial_state(self): """Print the initial energies of the system.""" init_eng = " Initial Energies " msg = f"\n\n{init_eng:-^70}\n" f"Initial temperature and kinetic energy of each species\n" self.particles.calculate_species_kinetic_temperature() self.particles.calculate_species_potential_energy() factor = self.parameters.J2erg if self.parameters.units == "mks" else 1.0 / self.parameters.J2erg for sp, kp, tp, pot_sp in zip( self.species, self.particles.species_kinetic_energy, self.particles.species_temperatures, self.particles.species_potential_energy, ): sp_msg = ( f"Species {sp.name} :\n" f"\tTemperature = {tp:.6e} {self.parameters.units_dict['temperature']} = {tp * self.parameters.eV2K:.6e} {self.parameters.units_dict['electron volt']}\n" f"\tKinetic Energy = {kp:.6e} {self.parameters.units_dict['energy']} = {kp * factor / self.parameters.eV2J:.6e} {self.parameters.units_dict['electron volt']}\n" f"\tPotential Energy = {pot_sp:.6e} {self.parameters.units_dict['energy']} = {pot_sp * factor / self.parameters.eV2J:.6e} {self.parameters.units_dict['electron volt']}\n" ) msg += sp_msg tot_kin_e = self.particles.species_kinetic_energy.sum() tot_pot_e = self.particles.species_potential_energy.sum() tot_e = tot_kin_e + tot_pot_e msg += ( f"Initial total kinetic energy = {tot_kin_e:.6e} {self.parameters.units_dict['energy']} = {tot_kin_e * factor / self.parameters.eV2J:.6e} {self.parameters.units_dict['electron volt']}\n" f"Initial total potential energy = {tot_pot_e:.6e} {self.parameters.units_dict['energy']} = {tot_pot_e * factor / self.parameters.eV2J:.6e} {self.parameters.units_dict['electron volt']}\n" f"Initial total energy = {tot_e:.6e} {self.parameters.units_dict['energy']} = {tot_e * factor / self.parameters.eV2J:.6e} {self.parameters.units_dict['electron volt']}\n" ) self.io.write_to_logger(msg)
[docs] def setup(self, read_yaml: bool = True, input_file: str = None, other_inputs: dict = None): """ Setup simulations' subclasses by reading the YAML input file and update them with `other_inputs`. Parameters ---------- read_yaml: bool Flag for reading YAML input file. Default = True. input_file: str (optional) Path to YAML file with inputs. other_inputs: dict (optional) Nested dictionary with additional simulations options. This is called after reading the YAML file. """ if input_file: self.input_file = input_file if read_yaml: yaml_dict = self.common_parser() self.instantiate_subclasses_from_dict(yaml_dict) if other_inputs: if not isinstance(other_inputs, dict): raise TypeError("Wrong input type. other_inputs should be a nested dictionary") self.update_subclasses_from_dict(other_inputs) if self.__name__ == "postprocessing": # Create the file paths without creating directories and redefining io attributes self.io.make_directory_tree() self.io.make_directories() self.io.make_files_tree() # Read previously stored files self.io.read_pickle(self, self.io.directory_tree["simulation"]["path"]) self.io.copy_params(self.parameters) # Print parameters to log file if not exists(self.io.log_file): # if the file exists do not print the file header self.io.file_header() self.io.simulation_summary(self) self.io.datetime_stamp() if self.grab_last_step: # Initialize the Particles class attributes by reading the last step old_method = self.parameters.load_method self.parameters.load_method = "production_restart" no_dumps = len(listdir(self.io.prod_dump_dir)) last_step = self.parameters.prod_dump_step * (no_dumps - 1) if no_dumps == 0: self.parameters.load_method = "equilibration_restart" no_dumps = len(listdir(self.io.eq_dump_dir)) last_step = self.parameters.eq_dump_step * (no_dumps - 1) self.parameters.restart_step = last_step self.particles.setup(self.parameters, self.species) # Restore the original value for future use self.parameters.load_method = old_method # Update the log file. It is set to the simulation log in the parameters class, but it is correct in the IO class. self.parameters.log_file = self.io.log_file else: self.initialization() if self.parameters.plot_style: plt.style.use(self.parameters.plot_style)
[docs] def setup_from_dict(self, input_dict: dict): """Setup simulations' subclasses from a nested dictionary. Parameters ---------- input_dict: dict Nested dictionary with all necessary simulations parameters. Note ---- This method does the same as:meth:`setup` but without reading a yaml file. If you want to update the attributes of this class use :meth:`update_subclasses_from_dict`. """ self.instantiate_subclasses_from_dict(input_dict) if self.__name__ == "postprocessing": # Create the file paths without creating directories and redefining io attributes self.io.make_directory_tree() self.io.make_directories() self.io.make_files_tree() # Read previously stored files self.io.read_pickle(self, self.io.directory_tree["simulation"]["path"]) self.io.copy_params(self.parameters) # Print parameters to log file if not exists(self.io.log_file): # if the file exists do not print the file header self.io.file_header() self.io.simulation_summary(self) self.io.datetime_stamp() if self.grab_last_step: # Initialize the Particles class attributes by reading the last step old_method = self.parameters.load_method self.parameters.load_method = "production_restart" no_dumps = len(listdir(self.io.prod_dump_dir)) last_step = self.parameters.prod_dump_step * (no_dumps - 1) if no_dumps == 0: self.parameters.load_method = "equilibration_restart" no_dumps = len(listdir(self.io.eq_dump_dir)) last_step = self.parameters.eq_dump_step * (no_dumps - 1) self.parameters.restart_step = last_step self.particles.setup(self.parameters, self.species) # Restore the original value for future use self.parameters.load_method = old_method # Update the log file. It is set to the simulation log in the parameters class, but it is correct in the IO class. self.parameters.log_file = self.io.log_file else: self.initialization() if self.parameters.plot_style: plt.style.use(self.parameters.plot_style)
[docs]class PostProcess(Process): """ Class handling the post-processing stage of a simulation. Parameters ---------- input_file : str Path to the YAML input file. """
[docs] def __init__(self, input_file: str = None, grab_last_step: bool = False): self.__name__ = "postprocessing" self.grab_last_step = grab_last_step super().__init__(input_file)
[docs] def run(self): """Calculate all the observables from the YAML input file.""" if len(self.observables_dict.keys()) == 0: print("No observables found in observables_dict") else: for obs_key, obs_class in self.observables_dict.items(): obs_class.setup(self.parameters) msg = obs_class.pretty_print_msg() self.io.write_to_logger(msg) if obs_key == "Thermodynamics": # Make Temperature and Energy plots obs_class.temp_energy_plot(self) else: obs_class.compute() if len(self.transport_dict.keys()) == 0: print("No transport coefficients found in tranport_dict") else: for obs_key, obs_class in self.transport_dict.items(): obs_class.setup(self.parameters) msg = obs_class.pretty_print_msg() self.io.write_to_logger(msg) obs_class.compute(observable=self.observables_dict[obs_class.required_observable])
[docs] def setup_from_simulation(self, simulation): """ Setup postprocess' subclasses by (shallow) copying them from simulation object. Parameters ---------- simulation: :class:`sarkas.core.processes.Simulation` Simulation object """ self.parameters = simulation.parameters.__copy__() self.integrator = simulation.integrator.__copy__() self.potential = simulation.potential.__copy__() self.species = simulation.species.copy() self.io = simulation.io.__copy__() self.io.process = "postprocess"
[docs]class PreProcess(Process): """ Wrapper class handling the estimation of time and best parameters of a simulation. Parameters ---------- input_file : str Path to the YAML input file. Attributes ---------- loops: int Number of timesteps to run for time and size estimates. Default = 10 estimate: bool Run an estimate for the best PPPM parameters in the simulation. Default=False. pm_meshes: numpy.ndarray Array of mesh sizes used in the PPPM parameters estimation. pp_cells: numpy.ndarray Array of simulations box cells used in the PPPM parameters estimation. kappa: float Screening parameter. Calculated from :meth:`sarkas.potentials.core.Potential.matrix`. """
[docs] def __init__(self, input_file: str = None): self.__name__ = "preprocessing" self.estimate = False self.pm_meshes = logspace(3, 7, 12, base=2, dtype=int64) # array([16, 24, 32, 48, 56, 64, 72, 88, 96, 112, 128], dtype=int64) self.pm_caos = arange(1, 8, dtype=int64) self.pp_cells = arange(3, 16, dtype=int64) self.kappa = None super().__init__(input_file)
[docs] def analytical_approx_pppm(self): """Calculate the total force error as given in :cite:`Dharuman2017`.""" a_min = self.potential.pppm_alpha_ewald * 0.5 a_max = self.potential.pppm_alpha_ewald * 1.5 r_min = self.potential.rc * 0.5 r_max = self.potential.rc * 1.5 alphas = linspace(a_min, a_max, 101) rcuts = linspace(r_min, r_max, 101) pm_force_error = zeros(len(alphas)) pp_force_error = zeros((len(alphas), len(rcuts))) total_force_error = zeros((len(alphas), len(rcuts))) # TODO: Fix this hack potential_copy = self.potential.__copy__() potential_copy.setup(self.parameters, self.species) # potential_copy = self.potential.__deepcopy__() for ia, alpha in enumerate(alphas): for ir, rc in enumerate(rcuts): potential_copy.rc = rc potential_copy.pppm_alpha_ewald = alpha tot_err, pm_err, pp_err = force_error_approx_pppm(potential_copy) total_force_error[ia, ir] = tot_err pm_force_error[ia] = pm_err pp_force_error[ia, ir] = pp_err return ( total_force_error, pp_force_error, pm_force_error, rcuts / potential_copy.a_ws, alphas * potential_copy.a_ws, )
[docs] def green_function_timer(self): """Time Potential setup.""" self.timer.start() self.potential.pppm_setup() return self.timer.stop()
[docs] def make_color_map(self, rcuts, alphas, chosen_alpha, chosen_rcut, total_force_error): """ Plot a color map of the total force error approximation. Parameters ---------- rcuts: numpy.ndarray Cut off distances. alphas: numpy.ndarray Ewald parameters. chosen_alpha: float Chosen Ewald parameter. chosen_rcut: float Chosen cut off radius. total_force_error: numpy.ndarray Force error matrix. Raises ------ DeprecationWarning """ warn( f"The function has been renamed make_pppm_color_map. make_color_map will be removed in v2.0.0.", DeprecationWarning, ) # Line Plot self.make_pppm_color_map(rcuts, alphas, chosen_alpha, chosen_rcut, total_force_error)
[docs] @staticmethod def make_fit_plot(pp_xdata, pm_xdata, pp_times, pm_times, pp_opt, pm_opt, pp_xlabels, pm_xlabels, fig_path): """ Make a dual plot of the fitted functions. """ fig, ax = plt.subplots(1, 2, figsize=(12, 7)) ax[0].plot(pm_xdata, pm_times.mean(axis=-1), "o", label="Measured times") # ax[0].plot(pm_xdata, quadratic(pm_xdata, *pm_opt), '--r', label="Fit $f(x) = a + b x + c x^2$") ax[1].plot(pp_xdata, pp_times.mean(axis=-1), "o", label="Measured times") # ax[1].plot(pp_xdata, linear(pp_xdata, *pp_opt), '--r', label="Fit $f(x) = a x$") ax[0].set_xscale("log") ax[0].set_yscale("log") ax[1].set_xscale("log") ax[1].set_yscale("log") ax[0].legend() ax[1].legend() ax[0].set_xticks(pm_xdata) ax[0].set_xticklabels(pm_xlabels) # Rotate the tick labels and set their alignment. plt.setp(ax[0].get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor") ax[1].set_xticks(pp_xdata[0:-1:3]) ax[1].set_xticklabels(pp_xlabels) # Rotate the tick labels and set their alignment. plt.setp(ax[1].get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor") ax[0].set_title("PM calculation") ax[1].set_title("PP calculation") ax[0].set_xlabel("Mesh sizes") ax[1].set_xlabel(r"$r_c / a_{ws}$") fig.tight_layout() fig.savefig(join(fig_path, "Timing_Fit.png"))
[docs] def make_line_plot(self, rcuts, alphas, chosen_alpha, chosen_rcut, total_force_error): """ Plot selected values of the total force error approximation. Parameters ---------- rcuts: numpy.ndarray Cut off distances. alphas: numpy.ndarray Ewald parameters. chosen_alpha: float Chosen Ewald parameter. chosen_rcut: float Chosen cut off radius. total_force_error: numpy.ndarray Force error matrix. Raises ------ : DeprecationWarning """ warn( f"The function has been renamed make_pppm_line_plot. make_line_plot will be removed in v2.0.0.", DeprecationWarning, ) # Line Plot self.make_pppm_line_plot(rcuts, alphas, chosen_alpha, chosen_rcut, total_force_error)
[docs] def make_lagrangian_plot(self): "TODO: complete this." c_mesh, m_mesh = meshgrid(self.pp_cells, self.pm_meshes) fig = plt.figure() ax = fig.add_subplot(111) # projection='3d') # CS = ax.plot_surface(m_mesh, c_mesh, self.lagrangian, rstride=1, cstride=1, cmap='viridis', edgecolor='none') CS = ax.contourf( m_mesh, c_mesh, self.lagrangian, norm=LogNorm(vmin=self.lagrangian.min(), vmax=self.lagrangian.max()) ) CS2 = ax.contour(CS, colors="w") ax.clabel(CS2, fmt="%1.0e", colors="w") fig.colorbar(CS) ax.scatter(self.best_mesh, self.best_cells, s=200, c="k") ax.set_xlabel("Mesh size") ax.set_ylabel(r"Cells = $L/r_c$") ax.set_title("2D Lagrangian") fig.savefig(join(self.io.preprocessing_dir, "2D_Lagrangian.png"))
# ax[1].set_xticks([8, 16, 32, 64, 128]) # ax[1].set_xticklabels([8, 16, 32, 64, 128])
[docs] def make_pppm_line_plot(self, rcuts, alphas, chosen_alpha, chosen_rcut, total_force_error): """ Plot selected values of the total force error approximation. Parameters ---------- rcuts: numpy.ndarray Cut off distances. alphas: numpy.ndarray Ewald parameters. chosen_alpha: float Chosen Ewald parameter. chosen_rcut: float Chosen cut off radius. total_force_error: numpy.ndarray Force error matrix. """ # Plot the results fig_path = self.pppm_plots_dir fig, ax = plt.subplots(1, 2, constrained_layout=True, figsize=(19, 7)) linestyles = [(0, (5, 10)), "dashed", "solid", "dashdot", (0, (3, 10, 1, 10))] indexes = [30, 40, 50, 60, 70] for lns, i in zip(linestyles, indexes): min_rc = rcuts[total_force_error[i, :].argmin()] rc_lbl = ( r"$\alpha a_{ws} = " + "{:.2f}$".format(alphas[i]) + r" min @ $r_c = " + "{:.2f}".format(min_rc) + r" a_{\rm ws}$" ) ax[0].plot(rcuts, total_force_error[i, :], ls=lns, label=rc_lbl) min_a = alphas[total_force_error[:, i].argmin()] a_lbl = ( r"$r_c = {:.2f}".format(rcuts[i]) + " a_{ws}$" + r" min @ $\alpha_{\rm min} a_{ws} = " + "{:.2f}$".format(min_a) ) ax[1].plot(alphas, total_force_error[:, i], ls=lns, label=a_lbl) ax[0].set(ylabel=r"$\Delta F^{approx}_{tot}$", xlabel=r"$r_c/a_{ws}$", yscale="log") ax[1].set(xlabel=r"$\alpha \; a_{ws}$", yscale="log") ax[0].axvline(chosen_rcut, ls="--", c="k") ax[0].axhline(self.potential.force_error, ls="--", c="k") ax[1].axhline(self.potential.force_error, ls="--", c="k") ax[1].axvline(chosen_alpha, ls="--", c="k") if rcuts[-1] * self.parameters.a_ws > 0.5 * self.parameters.box_lengths.min(): ax[0].axvline(0.5 * self.parameters.box_lengths.min() / self.parameters.a_ws, c="r", label=r"$L/2$") for a in ax: a.grid(True, alpha=0.3) a.legend(loc="best") fig.suptitle( r"Parameters $N = {}, \quad M = {}, \quad p = {}, \quad \kappa = {:.2f}$".format( self.parameters.total_num_ptcls, self.potential.pppm_mesh[0], self.potential.pppm_cao[0], self.parameters.a_ws / self.potential.screening_length, ) ) fig.savefig(join(fig_path, "LinePlot_ForceError_" + self.io.job_id + ".png"))
[docs] def make_pppm_color_map(self, rcuts, alphas, chosen_alpha, chosen_rcut, total_force_error): """ Plot a color map of the total force error approximation. Parameters ---------- rcuts: numpy.ndarray Cut off distances. alphas: numpy.ndarray Ewald parameters. chosen_alpha: float Chosen Ewald parameter. chosen_rcut: float Chosen cut off radius. total_force_error: numpy.ndarray Force error matrix. """ # Plot the results fig_path = self.pppm_plots_dir r_mesh, a_mesh = meshgrid(rcuts, alphas) fig, ax = plt.subplots(1, 1, figsize=(10, 7)) # if total_force_error.min() == 0.0: # minv = 1e-120 # else: # minv = total_force_error.min() # total_force_error[total_force_error == 0.0] = minv CS = ax.pcolormesh(a_mesh, r_mesh, total_force_error, shading="auto", norm=LogNorm()) CS2 = ax.contour(a_mesh, r_mesh, total_force_error, levels=10, colors="w", norm=LogNorm()) ax.clabel(CS2, fmt="%1.0e", colors="w") ax.scatter(chosen_alpha, chosen_rcut, s=200, c="k") if rcuts[-1] * self.parameters.a_ws > 0.5 * self.parameters.box_lengths.min(): ax.axhline(0.5 * self.parameters.box_lengths.min() / self.parameters.a_ws, c="r", label=r"$L/2$") # ax.tick_parameters(labelsize=fsz) ax.set_xlabel(r"$\alpha \;a_{ws}$") ax.set_ylabel(r"$r_c/a_{ws}$") ax.set_title( r"Parameters $N = {}, \quad M = {}, \quad p = {}, \quad \kappa = {:.2f}$".format( self.parameters.total_num_ptcls, self.potential.pppm_mesh[0], self.potential.pppm_cao[0], self.parameters.a_ws / self.potential.screening_length, ) ) clb = fig.colorbar(CS) clb.set_label(r"$\Delta F^{approx}_{tot}(r_c,\alpha)$", va="bottom", rotation=270) fig.tight_layout() fig.savefig(join(fig_path, "ClrMap_ForceError_" + self.io.job_id + ".png"))
[docs] def make_timing_plots(self, data_df: DataFrame = None): """ Makes a figure with three subplots of the CPU times vs PPPM parameters.\n The first plot is PP acc time vs the number of cells at different Mesh sizes.\n The second plot is the PM acc time vs the mesh size at different charge assignment orders.\n The third plot is the time for the calculation of the optimal green's function for different charge asssignment orders. Parameters ---------- data_df : pandas.DataFrame, Optional Timing study data. If `None` it will look for previously saved data, otherwise it will run :meth:`sarkas.processes.PreProcess.timing_study_calculation` to calculate the data. Default is `None`. """ if not data_df: try: data_df = read_csv( join(self.io.preprocessing_dir, f"TimingStudy_data_{self.io.job_id}.csv"), index_col=False ) self.dataframe = data_df.copy() except FileNotFoundError: print(f"I could not find the data from the timing study. Running the timing study now.") self.timing_study_calculation() else: data_df = self.dataframe.copy(deep=True) fig, ax = plt.subplots(1, 3, figsize=(21, 7)) scatterplot(data=data_df, x="pp_cells", y="pp_acc_time [s]", hue="M_x", s=100, palette="viridis", ax=ax[0]) scatterplot(data=data_df, x="M_x", y="pm_acc_time [s]", hue="pppm_cao_x", s=150, palette="viridis", ax=ax[1]) scatterplot(data=data_df, x="M_x", y="G_k time [s]", hue="pppm_cao_x", s=150, palette="viridis", ax=ax[2]) # ax[0].legend(ncol = 2) ax[0].set(yscale="log", xlabel="LCL Cells", ylabel="PP Time [s]") ax[1].set(yscale="log", xlabel="Mesh", ylabel="PM Time [s]") ax[2].set(yscale="log", xlabel="Mesh", ylabel="Green Function Time [s]") ax[1].set_xscale("log", base=2) ax[2].set_xscale("log", base=2) fig_path = self.pppm_plots_dir fig.savefig(join(fig_path, f"PPPM_Times_{self.io.job_id}.png")) print(f"\nFigures can be found in {self.pppm_plots_dir}")
[docs] def make_force_v_timing_plot(self, data_df: DataFrame = None): """Make contour maps of the force error and total acc time as functions of LCL cells and PM meshes for each charge assignment order sequence. Parameters ---------- data_df : pandas.DataFrame, Optional Timing study data. If `None` it will look for previously saved data, otherwise it will run :meth:`sarkas.processes.PreProcess.timing_study_calculation` to calculate the data. Default is `None`. """ from scipy.interpolate import griddata fig_path = self.pppm_plots_dir if not data_df: try: data_df = read_csv( join(self.io.preprocessing_dir, f"TimingStudy_data_{self.io.job_id}.csv"), index_col=False ) self.dataframe = data_df.copy() except FileNotFoundError: print(f"I could not find the data from the timing study. Running the timing study now.") self.timing_study_calculation() else: data_df = self.dataframe.copy(deep=True) # Plot the results for _, cao in enumerate(self.pm_caos): mask = self.dataframe["pppm_cao_x"] == cao df = data_df[mask][ ["M_x", "pp_cells", "force error [measured]", "pp_acc_time [s]", "pm_acc_time [s]", "tot_acc_time [s]"] ] # 2D-arrays from DataFrame n_meshes = len(df["M_x"].unique()) x1 = logspace(log2(df["M_x"].min()), log2(df["M_x"].max()), 5 * n_meshes, base=2) n_cells = len(df["pp_cells"].unique()) y1 = linspace(df["pp_cells"].min(), df["pp_cells"].max(), 5 * n_cells) m_mesh, c_mesh = meshgrid(x1, y1) # Interpolate unstructured D-dimensional data. tot_time_map = griddata((df["M_x"], df["pp_cells"]), df["tot_acc_time [s]"], (m_mesh, c_mesh)) force_error_map = griddata((df["M_x"], df["pp_cells"]), df["force error [measured]"], (m_mesh, c_mesh)) fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(18, 9)) if force_error_map.min() == 0.0: minv = 1e-120 else: minv = force_error_map.min() maxt = force_error_map.max() nlvl = 12 lvls = logspace(log10(minv), log10(maxt), nlvl) luxmap = get_cmap("viridis", nlvl) luxnorm = LogNorm(vmin=minv, vmax=maxt) CS = ax1.contourf(m_mesh, c_mesh, force_error_map, levels=lvls, cmap=luxmap, norm=luxnorm) clb = fig.colorbar(ScalarMappable(norm=luxnorm, cmap=luxmap), ax=ax1) clb.set_label(r"Force Error [$Q^2/ a_{\rm ws}^2$] " + f"@ cao = {cao}", rotation=270, va="bottom") CS2 = ax1.contour(CS, colors="w") ax1.clabel(CS2, fmt="%1.0e", colors="w") if cao == self.potential.pppm_cao[0]: input_Nc = int(self.potential.box_lengths[0] / self.potential.rc) ax1.scatter(self.potential.pppm_mesh[0], input_Nc, s=200, c="k") ax1.set_xscale("log", base=2) ax1.set(xlabel="Mesh size", ylabel=r"LCL Cells", title=f"Force Error Map @ cao = {cao}") # Timing Plot maxt = tot_time_map.max() mint = tot_time_map.min() # nlvl = 13 lvls = logspace(log10(mint), log10(maxt), nlvl) luxmap = get_cmap("viridis", nlvl) luxnorm = LogNorm(vmin=minv, vmax=maxt) CS = ax2.contourf(m_mesh, c_mesh, tot_time_map, levels=lvls, cmap=luxmap) CS2 = ax2.contour(CS, colors="w", levels=lvls) ax2.clabel(CS2, fmt="%.2e", colors="w") # fig.colorbar(, ax = ax2) clb = fig.colorbar(ScalarMappable(norm=luxnorm, cmap=luxmap), ax=ax2) clb.set_label("CPU Time [s]", rotation=270, va="bottom") if cao == self.potential.pppm_cao[0]: input_Nc = int(self.potential.box_lengths[0] / self.potential.rc) ax2.scatter(self.potential.pppm_mesh[0], input_Nc, s=200, c="k") ax2.set_xscale("log", base=2) ax2.set(xlabel="Mesh size", title=f"Timing Map @ cao = {cao}") fig.savefig(join(fig_path, f"ForceErrorMap_v_Timing_cao_{cao}_{self.io.job_id}.png"))
[docs] def postproc_estimates(self): # POST- PROCESSING self.io.postprocess_info(self, observable="header") # Header of process process_title = f"{'PostProcessing':^80}" msg = f"{'':*^80}\n {process_title} \n{'':*^80}" print_to_logger(msg, self.parameters.log_file, self.parameters.verbose) for _, obs_class in self.observables_dict.items(): obs_class.setup(self.parameters) msg += self.rdf.pretty_print_msg() print_to_logger(msg, self.parameters.log_file, self.parameters.verbose)
[docs] def make_pppm_plots_dir(self): self.pppm_plots_dir = join(self.io.preprocessing_dir, "PPPM_Plots") if not exists(self.pppm_plots_dir): mkdir(self.pppm_plots_dir)
[docs] def pppm_approximation(self): """ Calculate the force error for a PPPM simulation using analytical approximations.\n Plot the force error in the parameter space. """ self.make_pppm_plots_dir() # Calculate Force error from analytic approximation given in Dharuman et al. J Chem Phys 2017 total_force_error, pp_force_error, pm_force_error, rcuts, alphas = self.analytical_approx_pppm() chosen_alpha = self.potential.pppm_alpha_ewald * self.parameters.a_ws chosen_rcut = self.potential.rc / self.parameters.a_ws # mesh_dir = join(self.pppm_plots_dir, 'Mesh_{}'.format(self.potential.pppm_mesh[0])) # if not exists(mesh_dir): # mkdir(mesh_dir) # # cell_num = int(self.parameters.box_lengths.min() / self.potential.rc) # cell_dir = join(mesh_dir, 'Cells_{}'.format(cell_num)) # if not exists(cell_dir): # mkdir(cell_dir) # # self.pppm_plots_dir = cell_dir # Color Map self.make_pppm_color_map(rcuts, alphas, chosen_alpha, chosen_rcut, total_force_error) # Line Plot self.make_pppm_line_plot(rcuts, alphas, chosen_alpha, chosen_rcut, total_force_error) msg = f"\nFigures can be found in {self.pppm_plots_dir}" self.io.write_to_logger(msg)
[docs] def remove_preproc_dumps(self): # Delete the energy files created during the estimation runs os_remove(self.io.eq_energy_filename) os_remove(self.io.prod_energy_filename) # Delete dumps created during the estimation runs for npz in listdir(self.io.eq_dump_dir): os_remove(join(self.io.eq_dump_dir, npz)) for npz in listdir(self.io.prod_dump_dir): os_remove(join(self.io.prod_dump_dir, npz)) if self.parameters.magnetized and self.parameters.electrostatic_equilibration: os_remove(self.io.mag_energy_filename) # Remove dumps for npz in listdir(self.io.mag_dump_dir): os_remove(join(self.io.mag_dump_dir, npz))
[docs] def run( self, loops: int = 10, timing: bool = True, timing_study: bool = False, pppm_estimate: bool = False, postprocessing: bool = False, remove: bool = False, ): """ Estimate the time of the simulation and best parameters if wanted. Parameters ---------- loops : int Number of loops over which to average the acceleration calculation. Default = 10. timing : bool Flag for estimating simulation times. Default =True. timing_study : bool Flag for estimating time for simulation parameters. pppm_estimate : bool Flag for showing the force error plots in case of pppm algorithm. postprocessing : bool Flag for calculating Post processing parameters. remove : bool Flag for removing energy files and dumps created during times estimation. Default = False. """ # Clean everything plt.close("all") # Set the screening parameter self.kappa = self.potential.matrix[0, 0, 1] if self.potential.type == "yukawa" else 0.0 if timing: self.time_n_space_estimates(loops=loops) if remove: self.remove_preproc_dumps() if pppm_estimate: self.pppm_approximation() if timing_study: self.timing_study_calculation() self.make_timing_plots() self.make_force_v_timing_plot() print(f"\nFigures can be found in {self.pppm_plots_dir}") if postprocessing: self.postproc_estimates()
[docs] def time_acceleration(self, loops: int = 11): """ Run loops number of acceleration calculations for timing estimate. Parameters ---------- loops: int Number of simulation steps to run. Default = 11. """ if self.potential.linked_list_on: self.pp_acc_time = zeros(loops) for i in trange(loops, desc="PP acceleration timer", disable=not self.parameters.verbose): self.timer.start() self.potential.update_linked_list(self.particles) self.pp_acc_time[i] = self.timer.stop() # Calculate the mean excluding the first value because that time include numba compilation time pp_mean_time = self.timer.time_division(self.pp_acc_time[1:].mean()) self.io.preprocess_timing("PP", pp_mean_time, loops) # PM acceleration if self.potential.pppm_on: self.pm_acc_time = zeros(loops) for i in trange(loops, desc="PM acceleration timer", disable=not self.parameters.verbose): self.timer.start() self.potential.update_pm(self.particles) self.pm_acc_time[i] = self.timer.stop() pm_mean_time = self.timer.time_division(self.pm_acc_time[1:].mean()) self.io.preprocess_timing("PM", pm_mean_time, loops) if self.potential.method == "fmm": self.fmm_acc_time = zeros(loops) for i in range(loops): self.timer.start() self.integrator.update_accelerations(self.particles) self.fmm_acc_time[i] = self.timer.stop() fmm_mean_time = self.timer.time_division(self.fmm_acc_time[:].mean()) self.io.preprocess_timing("FMM", fmm_mean_time, loops)
[docs] def time_evolution_loop(self, loops: int = 11): """Run several loops of the equilibration and production phase to estimate the total time of the simulation. Parameters ---------- loops: int Number of simulation steps to run. Default = 11. """ msg = f"\nRunning {loops} steps for each phase to estimate simulation times\n" self.io.write_to_logger(msg) # Run few equilibration steps to estimate the equilibration time if self.parameters.equilibration_phase and self.parameters.electrostatic_equilibration: self.integrator.update = self.integrator.type_setup(self.integrator.equilibration_type) self.timer.start() self.evolve("equilibration", self.integrator.thermalization, 0, loops, self.parameters.eq_dump_step) self.eq_mean_time = self.timer.stop() / loops # Print the average equilibration & production times self.io.preprocess_timing("Equilibration", self.timer.time_division(self.eq_mean_time), loops) if self.parameters.magnetized and self.parameters.electrostatic_equilibration: self.integrator.update = self.integrator.type_setup(self.integrator.magnetization_type) self.timer.start() self.evolve("magnetization", self.integrator.thermalization, 0, loops, self.parameters.mag_dump_step) self.mag_mean_time = self.timer.stop() / loops # Print the average equilibration & production times self.io.preprocess_timing("Magnetization", self.timer.time_division(self.mag_mean_time), loops) # Run few production steps to estimate the equilibration time self.integrator.update = self.integrator.type_setup(self.integrator.production_type) self.timer.start() self.evolve("production", False, 0, loops, self.parameters.prod_dump_step) self.prod_mean_time = self.timer.stop() / loops self.io.preprocess_timing("Production", self.timer.time_division(self.prod_mean_time), loops) if self.parameters.equilibration_phase and self.parameters.electrostatic_equilibration: # Print the estimate for the full run eq_prediction = self.eq_mean_time * self.parameters.equilibration_steps self.io.time_stamp("Equilibration", self.timer.time_division(eq_prediction)) else: eq_prediction = 0.0 if self.parameters.magnetized and self.parameters.electrostatic_equilibration: mag_prediction = self.mag_mean_time * self.parameters.magnetization_steps self.io.time_stamp("Magnetization", self.timer.time_division(mag_prediction)) eq_prediction += mag_prediction prod_prediction = self.prod_mean_time * self.parameters.production_steps self.io.time_stamp("Production", self.timer.time_division(prod_prediction)) tot_time = eq_prediction + prod_prediction self.io.time_stamp("Total Run", self.timer.time_division(tot_time))
[docs] def time_n_space_estimates(self, loops: int = 10): """Estimate simulation times and space Parameters ---------- loops: int Number of simulation steps to run. Default = 10. """ if loops: loops += 1 self.io.preprocess_timing("header", [0, 0, 0, 0, 0, 0], 0) if self.potential.pppm_on: green_time = self.timer.time_division(self.green_function_timer()) self.io.preprocess_timing("GF", green_time, 0) self.time_acceleration(loops) self.time_evolution_loop(loops) self.directory_sizes()
[docs] def timing_study_calculation(self): """Estimate the best number of mesh points and cutoff radius.""" self.pppm_plots_dir = join(self.io.preprocessing_dir, "PPPM_Plots") if not exists(self.pppm_plots_dir): mkdir(self.pppm_plots_dir) msg = "\n\n{:=^70} \n".format(" Timing Study ") self.io.write_to_logger(msg) self.input_rc = self.potential.rc self.input_mesh = self.potential.pppm_mesh.copy() self.input_alpha = self.potential.pppm_alpha_ewald self.input_cao = self.potential.pppm_cao.copy() data = DataFrame() # Rescaling constant to calculate the PP force error rescaling_constant = ( sqrt(self.potential.total_num_ptcls) * self.potential.a_ws**2 / sqrt(self.potential.pbox_volume) ) # Set the maximum number of cells to be L / (2 * a_ws). 2* a_ws is the closest two particles can be. max_cells = int(0.5 * self.parameters.box_lengths.min() / self.parameters.a_ws) if max_cells != self.pp_cells[-1]: self.pp_cells = arange(3, max_cells, dtype=int) # Start the loop for averaging PM and PP acceleration times for _, m in enumerate( tqdm(self.pm_meshes, desc="Looping over the PM meshes", disable=not self.parameters.verbose) ): # Setup PM params self.potential.pppm_mesh = m * array([1, 1, 1], dtype=int) self.potential.pppm_alpha_ewald = 0.3 * m / self.potential.box_lengths.min() self.potential.pppm_h_array = self.potential.box_lengths / self.potential.pppm_mesh self.io.write_to_logger(f"\n Mesh = {m, m, m}:\n\t cao = ") for _, cao in enumerate(self.pm_caos): self.io.write_to_logger(f"{cao}, ") self.potential.pppm_cao = cao * array([1, 1, 1], dtype=int) # Update the potential matrix since alpha has changed if self.potential.type == "qsp": self.potential.pot_update_params(self.potential, self.species) else: self.potential.pot_update_params(self.potential, self.species) # The Green's function depends on alpha, Mesh and cao. It also updates the pppm_pm_err green_time = self.green_function_timer() # Calculate the PM acceleration timing 3x and average pm_acc_time = 0.0 for it in range(3): self.timer.start() self.potential.update_pm(self.particles) pm_acc_time += self.timer.stop() / 3.0 # Loop over the number of cells for _, cell in enumerate(self.pp_cells): # Cutoff radius is the side of the cells. self.potential.rc = self.potential.box_lengths.min() / cell # Update the potential pp error self.potential.pppm_pp_err = force_error_analytic_pp( self.potential.type, self.potential.rc, self.potential.screening_length, self.potential.pppm_alpha_ewald, rescaling_constant, ) # Note: the PM error does not depend on rc. Only on alpha and it is given by G_k self.potential.force_error = sqrt(self.potential.pppm_pp_err**2 + self.potential.pppm_pm_err**2) # The PP acceleration does not depend on cao. # However, it still needs to be in its loop for updating the dataframe. pp_acc_time = 0.0 for it in range(3): self.timer.start() self.potential.update_linked_list(self.particles) pp_acc_time += self.timer.stop() / 3.0 data = data.append( { "pp_cells": cell, "r_cut": self.potential.rc, "pppm_alpha_ewald": self.potential.pppm_alpha_ewald, "pppm_cao_x": self.potential.pppm_cao[0], "pppm_cao_y": self.potential.pppm_cao[1], "pppm_cao_z": self.potential.pppm_cao[2], "M_x": self.potential.pppm_mesh[0], "M_y": self.potential.pppm_mesh[1], "M_z": self.potential.pppm_mesh[2], "Mesh volume": self.potential.pppm_mesh.prod(), "Mesh": f"{self.potential.pppm_mesh[0], self.potential.pppm_mesh[1], self.potential.pppm_mesh[2]}", "h_x": self.potential.pppm_h_array[0], "h_y": self.potential.pppm_h_array[1], "h_z": self.potential.pppm_h_array[2], "h_M volume": self.potential.pppm_h_array.prod(), "h_x alpha": self.potential.pppm_h_array[0] * self.potential.pppm_alpha_ewald, "h_y alpha": self.potential.pppm_h_array[1] * self.potential.pppm_alpha_ewald, "h_z alpha": self.potential.pppm_h_array[2] * self.potential.pppm_alpha_ewald, "h_M a_ws^3": self.potential.pppm_h_array.prod() * self.potential.pppm_alpha_ewald**3, "G_k time [s]": green_time * 1.0e-9, "pp_acc_time [s]": pp_acc_time * 1.0e-9, "pm_acc_time [s]": pm_acc_time * 1.0e-9, "tot_acc_time [s]": (pp_acc_time + pm_acc_time) * 1.0e-9, "pppm_pp_error [measured]": self.potential.pppm_pp_err, "pppm_pm_error [measured]": self.potential.pppm_pm_err, "force error [measured]": self.potential.force_error, }, ignore_index=True, ) self.dataframe = data csv_location = join(self.io.preprocessing_dir, f"TimingStudy_data_{self.io.job_id}.csv") self.dataframe.to_csv(csv_location, index=False) # Reset the original values. self.potential.rc = self.input_rc self.potential.pppm_mesh = self.input_mesh.copy() self.potential.pppm_alpha_ewald = self.input_alpha self.potential.pppm_cao = self.input_cao.copy() self.potential.setup(self.parameters, self.species) msg = ( f"\nThe force error and computation times can be found in a dataframe at PreProcess.dataframe " f"and the corresponding csv file is saved in {csv_location}" ) self.io.write_to_logger(msg)
# pm_popt = zeros((len(self.pm_caos), 2)) # for ic, cao in enumerate(self.pm_caos): # # Fit the PM times # pm_popt[ic, :], _ = curve_fit( # lambda x, a, b: a + 5 * b * x ** 3 * log2(x ** 3), self.pm_meshes, pm_times[:, ic] # ) # fit_str = ( # r"Fit = $a_2 + 5 a_3 M^3 \log_2(M^3)$ [s]" # + "\n" # + r"$a_2 = ${:.4e}, $a_3 = ${:.4e} ".format(pm_popt[ic, 0], pm_popt[ic, 1]) # ) # print(f"\nPM Time for cao {cao}: " + fit_str) # # # Fit the PP Times # pp_popt, _ = curve_fit( # lambda x, a, b: a + b / x ** 3, # self.pp_cells, # pp_times.mean(axis=0), # p0=[pp_times.mean(axis=0)[0], self.parameters.total_num_ptcls], # bounds=(0, [pp_times.mean(axis=0)[0], 1e9]), # ) # fit_pp_str = ( # r"Fit = $a_0 + a_1 / N_c^3$ [s]" # + "\n" # + "$a_0 = ${:.4e}, $a_1 = ${:.4e}".format(pp_popt[0], pp_popt[1]) # ) # print(f"\nPP Time cao {cao}:" + fit_pp_str) # # # Start the plot # fig, (ax_pp, ax_pm) = plt.subplots(1, 2, sharey=True, figsize=(12, 7)) # ax_pm.plot(self.pm_meshes, pm_times[:, ic], "o", label=f"Measured cao: {cao}") # ax_pm.plot( # self.pm_meshes, # pm_popt[ic, 0] + 5 * pm_popt[ic, 1] * self.pm_meshes ** 3 * log2(self.pm_meshes ** 3), # ls="--", # label="Fit", # ) # ax_pm.annotate( # text=fit_str, # xy=(self.pm_meshes[-1], pm_times[-1, ic]), # xytext=(self.pm_meshes[0], pm_times[-1, ic]), # bbox=dict(boxstyle="round4", fc="white", ec="k", lw=2), # ) # # ax_pm.set(title=f"PM calculation time and estimate @ cao = {cao}", yscale="log", xlabel="Mesh size") # ax_pm.set_xscale("log", base=2) # ax_pm.legend(ncol=2) # # # Scatter Plot the PP Times # self.tot_time_map = zeros(pp_times.shape) # for j, mesh_points in enumerate(self.pm_meshes): # self.tot_time_map[j, :] = pm_times[j, ic] + pp_times[j, :] # ax_pp.plot(self.pp_cells, pp_times[j], "o", label=r"@ Mesh {}$^3$".format(mesh_points)) # # # Plot the Fit PP times # ax_pp.plot(self.pp_cells, pp_popt[0] + pp_popt[1] / self.pp_cells ** 3, ls="--", label="Fit") # ax_pp.legend(ncol=2) # ax_pp.annotate( # text=fit_pp_str, # xy=(self.pp_cells[0], pp_times[0, 0]), # xytext=(self.pp_cells[0], pp_times[-1, -1]), # bbox=dict(boxstyle="round4", fc="white", ec="k", lw=2), # ) # ax_pp.set( # title=f"PP calculation time and estimate @ cao = {cao}", # yscale="log", # ylabel="CPU Times [s]", # xlabel=r"$N_c $ = Cells", # ) # fig.tight_layout() # fig.savefig(join(self.pppm_plots_dir, f"Times_cao_{cao}_" + self.io.job_id + ".png")) # # self.make_force_v_timing_plot(ic) # self.lagrangian = np.empty((len(self.pm_meshes), len(self.pp_cells))) # self.tot_times = np.empty((len(self.pm_meshes), len(self.pp_cells))) # self.pp_times = pp_times.copy() # self.pm_times = pm_times.copy() # for i in range(len(self.pm_meshes)): # self.tot_times[i, :] = pp_times[i] + pm_times[i] # self.lagrangian[i, :] = self.force_error_map[i, :] # # best = np.unravel_index(self.lagrangian.argmin(), self.lagrangian.shape) # self.best_mesh = self.pm_meshes[best[0]] # self.best_cells = self.pp_cells[best[1]] # self.make_lagrangian_plot() # # # set the best parameter # self.potential.pppm_mesh = self.best_mesh * np.ones(3, dtype=int) # self.potential.rc = self.parameters.box_lengths.min() / self.best_cells # self.potential.pppm_alpha_ewald = 0.3 * self.best_mesh / self.parameters.box_lengths.min() # self.potential.pppm_setup(self.parameters) # # # print report # self.io.timing_study(self) # # time prediction # self.predicted_times = pp_times[best] + pm_times[best[0]] # # Print estimate of run times # self.io.time_stamp('Equilibration', # self.timer.time_division(self.predicted_times * self.parameters.equilibration_steps)) # self.io.time_stamp('Production', # self.timer.time_division(self.predicted_times * self.parameters.production_steps)) # self.io.time_stamp('Total Run', # self.timer.time_division(self.predicted_times * (self.parameters.equilibration_steps # + self.parameters.production_steps)))
[docs]class Simulation(Process): """ Sarkas simulation wrapper. This class manages the entire simulation and its small moving parts. Parameters ---------- input_file : str Path to the YAML input file. """
[docs] def __init__(self, input_file: str = None): self.__name__ = "simulation" super().__init__(input_file)
[docs] def check_restart(self, phase): """ Check if the simulation is a restart. Parameters ---------- phase: str Simulation phase, e.g. equilibration, Magnetization, production. Returns ------- it_start: int Restart step. """ if self.parameters.verbose: print(f"\n\n{phase.capitalize():-^70} \n") # Check if this is restart if self.parameters.load_method[:2] == phase[:2] and self.parameters.load_method[-7:] == "restart": it_start = self.parameters.restart_step else: it_start = 0 self.io.dump(phase, self.particles, 0) return it_start
[docs] def equilibrate(self): """ Run the time integrator with the thermostat to evolve the system to its thermodynamics equilibrium state. """ it_start = self.check_restart(phase="equilibration") self.integrator.update = self.integrator.type_setup(self.integrator.equilibration_type) # Start timer, equilibrate, and print run time. self.timer.start() self.evolve( "equilibration", self.integrator.thermalization, it_start, self.parameters.equilibration_steps, self.parameters.eq_dump_step, ) time_eq = self.timer.stop() self.io.time_stamp("Equilibration", self.timer.time_division(time_eq))
# def check_equil_dist(self): # # vel_dist = VelocityDistribution() # vel_dist.setup( # params=self.parameters, # phase="equilibration", # no_slices=1, # max_no_moment=6, # multi_run_average=False, # dimensional_average=False, # runs=1, # ) # vel_dist.compute() # # pass
[docs] def magnetize(self): # Check for magnetization phase it_start = self.check_restart(phase="magnetization") # Update integrator self.integrator.update = self.integrator.type_setup(self.integrator.magnetization_type) # Start timer, magnetize, and print run time. self.timer.start() self.evolve( "magnetization", self.integrator.thermalization, it_start, self.parameters.magnetization_steps, self.parameters.mag_dump_step, ) time_eq = self.timer.stop() self.io.time_stamp("Magnetization", self.timer.time_division(time_eq))
[docs] def produce(self): it_start = self.check_restart(phase="production") self.integrator.update = self.integrator.type_setup(self.integrator.production_type) # Update measurement flag for rdf. self.potential.measure = True self.timer.start() self.evolve("production", False, it_start, self.parameters.production_steps, self.parameters.prod_dump_step) time_eq = self.timer.stop() self.io.time_stamp("Production", self.timer.time_division(time_eq))
[docs] def run(self): """Run the simulation.""" time0 = self.timer.current() if self.parameters.equilibration_phase and self.parameters.electrostatic_equilibration: if self.parameters.remove_initial_drift: self.particles.remove_drift() self.equilibrate() if self.parameters.magnetization_phase: if self.parameters.remove_initial_drift: self.particles.remove_drift() self.magnetize() if self.parameters.production_phase: if self.parameters.remove_initial_drift: self.particles.remove_drift() self.produce() time_tot = self.timer.current() self.io.time_stamp("Total", self.timer.time_division(time_tot - time0)) self.directory_sizes()