Source code for sarkas.particles

"""
Module containing the basic class for handling particles properties.
"""

import csv
from copy import deepcopy
from h5py import File as h5File
from numba import float64, int64, jit, njit, void
from numpy import arange, array, empty, floor, int64
from numpy import load as np_load
from numpy import (
    loadtxt,
    meshgrid,
    ndarray,
    outer,
    rint,
    savetxt,
    savez,
    sqrt,
    triu_indices,
    zeros,
)
from numpy.random import Generator, PCG64
from os.path import join
from scipy.linalg import norm
from scipy.spatial.distance import pdist
from warnings import warn

from .utilities.exceptions import ParticlesError, ParticlesWarning


[docs]class Particles: """ Class handling particles' properties. Attributes ---------- kB : float Boltzmann constant. fourpie0: float Electrostatic constant :math:`4\\pi \\epsilon_0`. pos : numpy.ndarray Particles' positions. vel : numpy.ndarray Particles' velocities. acc : numpy.ndarray Particles' accelerations. box_lengths : numpy.ndarray Box sides' lengths. pbox_lengths : numpy.ndarray Initial particle box sides' lengths. masses : numpy.ndarray Mass of each particle. Shape = (attr:`sarkas.core.Parameters.total_num_ptcls`). charges : numpy.ndarray Charge of each particle. Shape = (attr:`sarkas.core.Parameters.total_num_ptcls`). id : numpy.ndarray, Species identifier. Shape = (attr:`sarkas.core.Parameters.total_num_ptcls`). names : numpy.ndarray Species' names. (attr:`sarkas.core.Parameters.total_num_ptcls`). rdf_nbins : int Number of bins for radial pair distribution. no_grs : int Number of independent :math:`g_{ij}(r)`. rdf_hist : numpy.ndarray Histogram array for the radial pair distribution function. prod_dump_dir : str Directory name where to store production phase's simulation's checkpoints. Default = 'dumps'. eq_dump_dir : str Directory name where to store equilibration phase's simulation's checkpoints. Default = 'dumps'. total_num_ptcls : int Total number of simulation's particles. num_species : int Number of species. species_num : numpy.ndarray Number of particles of each species. Shape = (attr:`sarkas.particles.Particles.num_species`). dimensions : int Number of non-zero dimensions. Default = 3. potential_energy : float Instantaneous value of the potential energy of each particle. Note that the total potential energy requires the multiplication of the array's sum by 0.5 to avoid double counting.\n For example: `N` = 3, `particle_potential_energy[0] = U_12 + U_13` and `particle_potential_energy[1] = U_21 + U_23` and `particle_potential_energy[1] = U_31 + U_32` rnd_gen : numpy.random.Generator Random number generator. Notes ----- Naming convention:\n Properties/Quantities of each particle are stored in `numpy.ndarray`'s as `.[property]`.\n Species properties/quantities are stored in `numpy.ndarray`'s as `.species_[property]`.\n Total properties/quantities are stored as `float`/`int` identified by `.total_[property]`.\n For example:\n :attr:`total_num_ptcls` is an `int` indicating the total number of particles. :attr:`potential_energy` is a 1-D array of length :attr:`total_num_ptcls` containing the potential energy of each particle.\n :attr:`species_num` is a 1-D array of length `num_species` containing the number of particles of each species.\n Methods for the calculation of properties/quantities follow the same convention as above but we the prefix `.calculate_[quantity]`.\n For example:\n :meth:`calculate_kinetic_energy()` calculates the kinetic energy of each particle and stores it in :attr:`kinetic_energy` a 1-D array of length :attr:`total_num_ptcls`.\n :meth:`calculate_species_kinetic_energy()` calculates the kinetic energy of each species and stores it in :attr:`species_kinetic_energy` a 1-D array of length :attr:`num_species`.\n :meth:`calculate_total_kinetic_energy()` calculates the total kinetic energy and stores it in :attr:`tottal_kinetic_energy` a float.\n\n Quantities requiring cross species evaluation are stored in `numpy.ndarray` as `.[quantity]_species_tensor`.\n For example: :attr:`virial_species_tensor` is a 3 x 3 x :attr:`num_species` x :attr:`num_species` tensor. :attr:`heat_flux_species_tensor` is a 3 x :attr:`num_species` x :attr:`num_species` tensor. The attributes :attr:`potential_energy`, :attr:`virial_species_tensor`, :attr:`heat_flux_species_tensor` are calculated by the :class:`sarkas.potentials.core.Potential` class.\n Therefore, this class is missing the :meth:`calculate_potential_energy`, :meth:`calculate_virial`, :meth:`calculate_heat_flux` methods. """
[docs] def __init__(self): self.mag_dump_dir = None self.rdf_nbins = None self.kB = None self.fourpie0 = None self.prod_dump_dir = None self.eq_dump_dir = None self.box_lengths = None self.pbox_lengths = None self.total_num_ptcls = None self.num_species = 1 self.species_num = None self.dimensions = None self.rnd_gen = None self.names = None self.id = None self.pos = None self.vel = None self.acc = None self.virial_species_tensor = None self.heat_flux_species_tensor = None self.potential_energy = None self.pbc_cntr = None self.masses = None self.charges = None self.cyclotron_frequencies = None self.species_initial_velocity = None self.species_thermal_velocity = None self.species_thermostat_temperatures = None self.species_masses = None self.species_charges = None self.no_grs = None self.rdf_hist = None self.observables_list = [ "Kinetic Energy", "Potential Energy", ] # "Pressure Tensor", "Enthalpy", "Heat Flux"] self.species_observables_calculator_dict = { "Kinetic Energy": self.calculate_species_kinetic_temperature, "Potential Energy": self.calculate_species_potential_energy, "Momentum": self.calculate_species_momentum, "Electric Current": self.calculate_species_electric_current, "Pressure Tensor": self.calculate_species_pressure_tensor, "Enthalpy": self.calculate_species_enthalpy, "Heat Flux": self.calculate_species_heat_flux, } self.thermodynamics_calculator_dict = { "Kinetic Energy": self.calculate_species_kinetic_temperature, "Potential Energy": self.calculate_species_potential_energy, "Momentum": self.calculate_species_momentum, "Electric Current": self.calculate_species_electric_current, "Pressure Tensor": self.calculate_species_pressure_tensor, "Enthalpy": self.calculate_species_enthalpy, "Heat Flux": self.calculate_species_heat_flux, }
def __repr__(self): sortedDict = dict(sorted(self.__dict__.items(), key=lambda x: x[0].lower())) disp = "Particles( \n" for key, value in sortedDict.items(): disp += "\t{} : {}\n".format(key, value) disp += ")" return disp def __copy__(self): """ Make a shallow copy of the object using copy by creating a new instance of the object and copying its __dict__.""" # Create a new object _copy = type(self)() # copy the dictionary _copy.__dict__.update(self.__dict__) return _copy def __deepcopy__(self, memodict: dict = {}): """Make a deepcopy of the object. Parameters ---------- memodict: dict Dictionary of id's to copies Returns ------- _copy: :class:`sarkas.particles.Particles` A new Particles class. """ id_self = id(self) # memorization avoids unnecessary recursion _copy = memodict.get(id_self) if _copy is None: # Make a shallow copy of all attributes _copy = type(self)() # Make a deepcopy of the mutable arrays using numpy copy function for k, v in self.__dict__.items(): if isinstance(v, ndarray): _copy.__dict__[k] = v.copy() else: _copy.__dict__[k] = deepcopy(v, memodict) return _copy def __getstate__(self): """Copy the object's state from self.__dict__ which contains all our instance attributes. Always use the dict.copy() method to avoid modifying the original state. Reference: https://docs.python.org/3/library/pickle.html#handling-stateful-objects """ state = self.__dict__.copy() # Remove the data that is stored already del state["pos"] del state["vel"] del state["acc"] del state["id"] del state["names"] del state["pbc_cntr"] del state["rdf_hist"] del state["virial_species_tensor"] del state["potential_energy"] del state["heat_flux_species_tensor"] return state def __setstate__(self, state): # Restore instance attributes. self.__dict__.update(state) # Initialize arrays self.pos = zeros((self.__dict__["total_num_ptcls"], 3)) self.vel = zeros((self.__dict__["total_num_ptcls"], 3)) self.acc = zeros((self.__dict__["total_num_ptcls"], 3)) self.id = zeros(self.__dict__["total_num_ptcls"]) self.names = zeros(self.__dict__["total_num_ptcls"]) self.pbc_cntr = zeros((self.__dict__["total_num_ptcls"], 3)) self.rdf_hist = zeros((self.__dict__["rdf_nbins"], self.__dict__["num_species"], self.__dict__["num_species"])) self.virial_species_tensor = zeros((3, 3, self.__dict__["num_species"], self.__dict__["num_species"])) self.potential_energy = zeros((self.__dict__["total_num_ptcls"])) self.heat_flux_species_tensor = zeros((3, self.__dict__["num_species"], self.__dict__["num_species"]))
[docs] def copy_params(self, params): """ Copy necessary parameters. Parameters ---------- params: :class:`sarkas.core.Parameters` Simulation's parameters. """ self.directory_tree = deepcopy(params.directory_tree) self.filenames_tree = deepcopy(params.filenames_tree) self.kB = params.kB self.dt = params.dt self.fourpie0 = params.fourpie0 self.prod_dump_dir = params.directory_tree["simulation"]["production"]["dumps"]["path"] self.eq_dump_dir = params.directory_tree["simulation"]["equilibration"]["dumps"]["path"] self.box_lengths = params.box_lengths.copy() self.pbox_lengths = params.pbox_lengths.copy() self.total_num_ptcls = params.total_num_ptcls self.total_num_density = params.total_num_density self.num_species = params.num_species self.species_num = params.species_num.copy() self.species_masses = params.species_masses.copy() self.species_charges = params.species_charges.copy() self.dimensions = params.dimensions self.box_volume = params.box_volume self.pbox_volume = params.pbox_volume self.load_method = params.load_method self.restart_step = params.restart_step self.particles_input_file = params.particles_input_file self.load_perturb = params.load_perturb self.load_rejection_radius = params.load_rejection_radius self.load_halton_bases = params.load_halton_bases if params.observables_list: for obs in params.observables_list: self.observables_list.append(obs) if hasattr(params, "np_per_side"): self.np_per_side = params.np_per_side if hasattr(params, "initial_lattice_config"): self.lattice_type = params.initial_lattice_config if hasattr(params, "load_gauss_sigma"): self.load_gauss_sigma = params.load_gauss_sigma.copy() self.species_names = params.species_names if hasattr(params, "rdf_nbins"): self.rdf_nbins = params.rdf_nbins else: # nbins = 5% of the number of particles. self.rdf_nbins = int64(0.05 * params.total_num_ptcls) params.rdf_nbins = self.rdf_nbins
[docs] def dump_arrays(self, filename, data_to_save): """ Save particles' data to binary file (uncompressed npz) for future restart. Parameters ---------- filename : str Name of the file. data_to_save : list Name of the arrays to save to file. """ kwargs = {key: self.__dict__[key] for key in data_to_save} savez(f"{filename}", **kwargs)
[docs] def dump_pva_h5(self, filename, data_to_save): """ Save particles' data to HDF5 file. Parameters ---------- filename : str Name of the file. data_to_save : list Name of the arrays to save to file. """ ## DEV Note: This method seems to be slower than npz. with h5File(f"{filename}.h5", "w") as hf: for key in data_to_save: hf.create_dataset(key, data=self.__dict__[key])
[docs] def gaussian(self, mean, sigma, size): """ Initialize particles' velocities according to a normalized Maxwell-Boltzmann (Normal) distribution. It calls :meth:`numpy.random.Generator.normal` Parameters ---------- size : tuple Size of the array to initialize. (no. of particles, dimensions). mean : float Center of the normal distribution. sigma : float Scale of the normal distribution. Returns ------- : numpy.ndarray Particles property distributed according to a Normal probability density function. """ return self.rnd_gen.normal(mean, sigma, size)
[docs] def halton_reject(self, bases, r_reject): """ Place particles according to a Halton sequence from 0 to LP (the initial particle box length) and uses a rejection radius to avoid placing particles to close to each other. Parameters ---------- bases : numpy.ndarray Array of 3 ints each of which is a base for the Halton sequence. Defualt: bases = array([2,3,5]) r_reject : float Value of rejection radius. """ # Get bases b1, b2, b3 = bases # Allocate space and store first value from Halton x = zeros(self.total_num_ptcls) y = zeros(self.total_num_ptcls) z = zeros(self.total_num_ptcls) # Initialize particle counter and Halton counter i = 1 k = 1 # Loop over all particles while i < self.total_num_ptcls: # Increment particle counter n = k m = k p = k # Determine x coordinate f1 = 1 r1 = 0 while n > 0: f1 /= b1 r1 += f1 * (n % int(b1)) n = floor(n / b1) x_new = self.pbox_lengths[0] * r1 # new x value # Determine y coordinate f2 = 1 r2 = 0 while m > 0: f2 /= b2 r2 += f2 * (m % int(b2)) m = floor(m / b2) y_new = self.pbox_lengths[1] * r2 # new y value # Determine z coordinate f3 = 1 r3 = 0 while p > 0: f3 /= b3 r3 += f3 * (p % int(b3)) p = floor(p / b3) z_new = self.pbox_lengths[2] * r3 # new z value # Check if particle was place too close relative to all other current particles for j in range(len(x)): # Flag for if particle is outside of cutoff radius (1 -> not inside rejection radius) flag = 1 # Compute distance b/t particles for initial placement x_diff = x_new - x[j] y_diff = y_new - y[j] z_diff = z_new - z[j] # Periodic condition applied for minimum image if x_diff < -self.pbox_lengths[0] / 2: x_diff = x_diff + self.pbox_lengths[0] if x_diff > self.pbox_lengths[0] / 2: x_diff = x_diff - self.pbox_lengths[0] if y_diff < -self.pbox_lengths[1] / 2: y_diff = y_diff + self.pbox_lengths[1] if y_diff > self.pbox_lengths[1] / 2: y_diff = y_diff - self.pbox_lengths[1] if z_diff < -self.pbox_lengths[2] / 2: z_diff = z_diff + self.pbox_lengths[2] if z_diff > self.pbox_lengths[2] / 2: z_diff = z_diff - self.pbox_lengths[2] # Compute distance r = sqrt(x_diff**2 + y_diff**2 + z_diff**2) # Check if new particle is below rejection radius. If not, break out and try again if r <= r_reject: k += 1 # Increment Halton counter flag = 0 # New position not added (0 -> no longer outside reject r) break # If flag true add new position if flag == 1: # Add new positions to arrays x[i] = x_new y[i] = y_new z[i] = z_new k += 1 # Increment Halton counter i += 1 # Increment particle number self.pos[:, 0] = x + self.box_lengths[0] / 2 - self.pbox_lengths[0] / 2 self.pos[:, 1] = y + self.box_lengths[1] / 2 - self.pbox_lengths[1] / 2 self.pos[:, 2] = z + self.box_lengths[2] / 2 - self.pbox_lengths[2] / 2
[docs] def initialize_accelerations(self): """ Initialize particles' accelerations. """ self.acc = zeros((self.total_num_ptcls, 3))
[docs] def initialize_arrays(self): """Initialize the needed arrays""" self.names = empty(self.total_num_ptcls, dtype=self.species_names.dtype) self.id = zeros(self.total_num_ptcls, dtype=int64) self.pos = zeros((self.total_num_ptcls, 3)) self.vel = zeros((self.total_num_ptcls, 3)) self.acc = zeros((self.total_num_ptcls, 3)) self.pbc_cntr = zeros((self.total_num_ptcls, 3)) self.masses = zeros(self.total_num_ptcls) # mass of each particle self.charges = zeros(self.total_num_ptcls) # charge of each particle self.cyclotron_frequencies = zeros(self.total_num_ptcls) self.kinetic_energy = zeros(self.total_num_ptcls) self.potential_energy = zeros(self.total_num_ptcls) self.temperature = zeros(self.total_num_ptcls) self.species_initial_velocity = zeros((self.num_species, 3)) self.species_thermal_velocity = zeros((self.num_species, 3)) self.species_kinetic_energy = zeros(self.num_species) self.species_potential_energy = zeros(self.num_species) self.species_temperatures = zeros(self.num_species) self.species_thermostat_temperatures = zeros(self.num_species) # No. of independent rdf self.no_grs = int64(self.num_species * (self.num_species + 1) / 2) self.rdf_hist = zeros((self.rdf_nbins, self.num_species, self.num_species)) if "Momentum" in self.observables_list: self.momentum = zeros((self.total_num_ptcls, 3)) self.species_momentum = zeros((self.num_species, 3)) if "Electric Current" in self.observables_list: self.electric_current = zeros((self.total_num_ptcls, 3)) self.species_electric_current = zeros((self.num_species, 3)) if "Pressure Tensor" in self.observables_list: self.pressure = zeros(self.total_num_ptcls) self.species_pressure = zeros(self.species_num) self.species_pressure_kin_tensor = zeros((3, 3, self.num_species)) self.species_pressure_pot_tensor = zeros((3, 3, self.num_species)) self.species_pressure_tensor = zeros((3, 3, self.num_species)) self.virial_species_tensor = zeros((3, 3, self.num_species, self.num_species)) if "Enthalpy": self.enthalpy = zeros(self.total_num_ptcls) self.species_enthalpy = zeros(self.num_species) if "Heat Flux" in self.observables_list: self.heat_flux_species_tensor = zeros((3, self.num_species, self.num_species)) self.species_heat_flux = zeros((self.num_species, 3))
[docs] def initialize_positions(self): """ Initialize particles' positions based on the load method. """ # position distribution. if self.load_method == "lattice": self.lattice(self.load_perturb) elif self.load_method == "random_reject": # check if not hasattr(self, "load_rejection_radius"): raise AttributeError("Rejection radius not defined. " "Please define Parameters.load_rejection_radius.") self.random_reject(self.load_rejection_radius) elif self.load_method == "halton_reject": # check if not hasattr(self, "load_rejection_radius"): raise AttributeError("Rejection radius not defined. " "Please define Parameters.load_rejection_radius.") self.halton_reject(self.load_halton_bases, self.load_rejection_radius) elif self.load_method in ["uniform", "random_no_reject"]: self.pos = self.uniform_no_reject( 0.5 * self.box_lengths - 0.5 * self.pbox_lengths, 0.5 * self.box_lengths + 0.5 * self.pbox_lengths ) elif self.load_method == "gaussian": sp_start = 0 sp_end = 0 for sp, sp_num in enumerate(self.species_num): sp_end += sp_num self.pos[sp_start:sp_end, :] = self.gaussian( self.box_lengths[0] / 2.0, self.load_gauss_sigma[sp], (sp_num, 3) ) sp_start += sp_num else: raise AttributeError("Incorrect particle placement scheme specified.")
[docs] def initialize_velocities(self, species): """ Initialize particles' velocities based on the species input values. The velocities can be initialized from a Maxwell-Boltzmann distribution or from a monochromatic distribution. Parameters ---------- species: list List of :class:`sarkas.core.Species`. """ species_end = 0 species_start = 0 for ic, sp in enumerate(species): if sp.name != "electron_background": species_end += sp.num self.species_initial_velocity[ic, :] = sp.initial_velocity self.species_thermostat_temperatures[ic] = sp.temperature if sp.initial_velocity_distribution == "boltzmann": if isinstance(sp.temperature, (int, float)): sp_temperature = zeros(3) for d in range(self.dimensions): sp_temperature[d] = sp.temperature self.species_thermal_velocity[ic] = sqrt(self.kB * sp_temperature / sp.mass) # Note gaussian(0.0, 0.0, N) = array of zeros self.vel[species_start:species_end, :] = self.gaussian( sp.initial_velocity, self.species_thermal_velocity[ic], (sp.num, 3) ) elif sp.initial_velocity_distribution == "monochromatic": vrms = sqrt(self.dimensions * self.kB * sp.temperature / sp.mass) self.vel[species_start:species_end, :] = vrms * self.random_unit_vectors(sp.num, self.dimensions) species_start += sp.num
[docs] def kinetic_temperature(self): """Calculate the kinetic energy and temperature of each species. Returns ------- K : numpy.ndarray Kinetic energy of each species. Shape=(:attr:`num_species`). T : numpy.ndarray Temperature of each species. Shape=(:attr:`num_species`). Raises ------ : DeprecationWarning """ warn( "Deprecated feature. It will be removed in a future release. \n" "Use particles.calculate_species_kinetic_temperature()", category=DeprecationWarning, ) self.calculate_species_kinetic_temperature() return self.species_kinetic_energy, self.species_temperatures
[docs] def lattice(self, perturb: float = 0.05): """ Place particles in a simple cubic lattice with a slight perturbation ranging from 0 to 0.5 times the lattice spacing. Parameters ---------- perturb : float Value of perturbation, p, such that 0 <= p <= 0.5. Default = 0.05 """ # Check if perturbation is below maximum allowed. If not, default to maximum perturbation. if perturb > 0.5: warn("Random perturbation must not exceed 0.5. Setting perturb = 0.5", category=ParticlesWarning) perturb = 0.5 if self.lattice_type == "simple_cubic": # Determining number of particles per side of simple cubic lattice part_per_side = self.total_num_ptcls ** (1.0 / 3.0) # Number of particles per side of cubic lattice # Check if total number of particles is a perfect cube, if not, place more than the requested amount if round(part_per_side) ** 3 != self.total_num_ptcls: part_per_side = rint(self.total_num_ptcls ** (1.0 / 3.0)) raise ParticlesError( f"N = {self.total_num_ptcls} cannot be placed in a simple cubic lattice. " f"Use {int(part_per_side ** 3)} particles instead." ) dx_lattice = self.pbox_lengths[0] / (self.total_num_ptcls ** (1.0 / 3.0)) # Lattice spacing dy_lattice = self.pbox_lengths[1] / (self.total_num_ptcls ** (1.0 / 3.0)) # Lattice spacing dz_lattice = self.pbox_lengths[2] / (self.total_num_ptcls ** (1.0 / 3.0)) # Lattice spacing # Create x, y, and z position arrays x = arange(0, self.pbox_lengths[0], dx_lattice) + 0.5 * dx_lattice y = arange(0, self.pbox_lengths[1], dy_lattice) + 0.5 * dy_lattice z = arange(0, self.pbox_lengths[2], dz_lattice) + 0.5 * dz_lattice # Create a lattice with appropriate x, y, and z values based on arange X, Y, Z = meshgrid(x, y, z) # Perturb lattice X += self.rnd_gen.uniform(-0.5, 0.5, X.shape) * perturb * dx_lattice Y += self.rnd_gen.uniform(-0.5, 0.5, Y.shape) * perturb * dy_lattice Z += self.rnd_gen.uniform(-0.5, 0.5, Z.shape) * perturb * dz_lattice # Flatten the meshgrid values for plotting and computation self.pos[:, 0] = X.ravel() + self.box_lengths[0] / 2 - self.pbox_lengths[0] / 2 self.pos[:, 1] = Y.ravel() + self.box_lengths[1] / 2 - self.pbox_lengths[1] / 2 self.pos[:, 2] = Z.ravel() + self.box_lengths[2] / 2 - self.pbox_lengths[2] / 2 elif self.lattice_type == "bcc": # Determining number of particles per side of simple cubic lattice part_per_side = int(0.5 * self.total_num_ptcls) ** ( 1.0 / 3.0 ) # Number of particles per side of cubic lattice # Check if total number of particles is a perfect cube, if not, place more than the requested amount if round(part_per_side) ** 3 != int(0.5 * self.total_num_ptcls): part_per_side = rint((0.5 * self.total_num_ptcls) ** (1.0 / 3.0)) raise ParticlesError( f"N = {self.total_num_ptcls} cannot be placed in a bcc lattice. " f"Use {int(2.0*part_per_side ** 3)} particles instead." ) dx_lattice = self.pbox_lengths[0] / (0.5 * self.total_num_ptcls) ** (1.0 / 3.0) # Lattice spacing dy_lattice = self.pbox_lengths[1] / (0.5 * self.total_num_ptcls) ** (1.0 / 3.0) # Lattice spacing dz_lattice = self.pbox_lengths[2] / (0.5 * self.total_num_ptcls) ** (1.0 / 3.0) # Lattice spacing # Create x, y, and z position arrays x = arange(0, self.pbox_lengths[0], dx_lattice) + 0.5 * dx_lattice y = arange(0, self.pbox_lengths[1], dy_lattice) + 0.5 * dy_lattice z = arange(0, self.pbox_lengths[2], dz_lattice) + 0.5 * dz_lattice # Create a lattice with appropriate x, y, and z values based on arange X, Y, Z = meshgrid(x, y, z) # Perturb lattice X += self.rnd_gen.uniform(-0.5, 0.5, X.shape) * perturb * dx_lattice Y += self.rnd_gen.uniform(-0.5, 0.5, Y.shape) * perturb * dy_lattice Z += self.rnd_gen.uniform(-0.5, 0.5, Z.shape) * perturb * dz_lattice half_Np = int(self.total_num_ptcls / 2) # Flatten the meshgrid values for plotting and computation self.pos[:half_Np, 0] = X.ravel() + self.box_lengths[0] / 2 - self.pbox_lengths[0] / 2 self.pos[:half_Np, 1] = Y.ravel() + self.box_lengths[1] / 2 - self.pbox_lengths[1] / 2 self.pos[:half_Np, 2] = Z.ravel() + self.box_lengths[2] / 2 - self.pbox_lengths[2] / 2 self.pos[half_Np:, 0] = X.ravel() + 0.5 * dx_lattice + self.box_lengths[0] / 2 - self.pbox_lengths[0] / 2 self.pos[half_Np:, 1] = Y.ravel() + 0.5 * dy_lattice + self.box_lengths[1] / 2 - self.pbox_lengths[1] / 2 self.pos[half_Np:, 2] = Z.ravel() + 0.5 * dz_lattice + self.box_lengths[2] / 2 - self.pbox_lengths[2] / 2 elif self.lattice_type in ["square", "tetragonal_2D"]: # Determining number of particles per side of simple cubic lattice part_per_side = rint(sqrt(self.total_num_ptcls)) # Number of particles per side of a square lattice # Check if total number of particles is a perfect cube, if not, place more than the requested amount if part_per_side**2 != self.total_num_ptcls: raise ParticlesError( f"N = {self.total_num_ptcls} cannot be placed in a square lattice. " f"Use {int(part_per_side ** 2)} particles instead." ) dx_lattice = self.pbox_lengths[0] / sqrt(self.total_num_ptcls) # Lattice spacing dy_lattice = self.pbox_lengths[1] / sqrt(self.total_num_ptcls) # Lattice spacing # Create x, y, and z position arrays x = arange(0, self.pbox_lengths[0], dx_lattice) + 0.5 * dx_lattice y = arange(0, self.pbox_lengths[1], dy_lattice) + 0.5 * dy_lattice # Create a lattice with appropriate x, y, and z values based on arange X, Y = meshgrid(x, y) # Perturb lattice X += self.rnd_gen.uniform(-0.5, 0.5, X.shape) * perturb * dx_lattice Y += self.rnd_gen.uniform(-0.5, 0.5, Y.shape) * perturb * dy_lattice # Flatten the meshgrid values for plotting and computation self.pos[:, 0] = X.ravel() + self.box_lengths[0] / 2 - self.pbox_lengths[0] / 2 self.pos[:, 1] = Y.ravel() + self.box_lengths[1] / 2 - self.pbox_lengths[1] / 2 self.pos[:, 2] = 0.0 elif self.lattice_type in ["hexagonal", "triangular"]: # Determining number of particles per side of simple cubic lattice part_per_side = round(sqrt(self.total_num_ptcls)) # Number of particles per side of cubic lattice # Check if total number of particles is a perfect cube, if not, place more than the requested amount if self.np_per_side[:2].prod() != part_per_side * (part_per_side + 1): raise ParticlesError( f"N = {self.total_num_ptcls} cannot be placed in an hexagonal lattice. " f"Use Nx = {part_per_side} and Ny = {part_per_side + 1} particles instead." ) dx_lattice = self.pbox_lengths[0] / (self.np_per_side[0]) # Lattice spacing dy_lattice = self.pbox_lengths[1] / (self.np_per_side[1]) # Lattice spacing if self.np_per_side[0] > self.np_per_side[1]: # Create x, y, and z position arrays x = arange(0, self.pbox_lengths[0], dx_lattice) y = arange(0, self.pbox_lengths[1], dy_lattice) + 0.5 * dy_lattice # Create a lattice with appropriate x, y, and z values based on arange X, Y = meshgrid(x, y) # Shift the Y axis of every other row of particles X[:, ::2] += 0.5 * dx_lattice else: # Create x, y, and z position arrays x = arange(0, self.pbox_lengths[0], dx_lattice) + 0.5 * dx_lattice y = arange(0, self.pbox_lengths[1], dy_lattice) # Create a lattice with appropriate x, y, and z values based on arange X, Y = meshgrid(x, y) # Shift the Y axis of every other row of particles Y[:, ::2] += 0.5 * dy_lattice # Perturb lattice X += self.rnd_gen.uniform(-0.5, 0.5, X.shape) * perturb * dx_lattice Y += self.rnd_gen.uniform(-0.5, 0.5, Y.shape) * perturb * dy_lattice # Flatten the meshgrid values for plotting and computation self.pos[:, 0] = X.ravel() + self.box_lengths[0] / 2 - self.pbox_lengths[0] / 2 self.pos[:, 1] = Y.ravel() + self.box_lengths[1] / 2 - self.pbox_lengths[1] / 2 self.pos[:, 2] = 0.0
[docs] def load(self): """ Initialize particles' positions and velocities. Positions are initialized based on the load method while velocities are chosen from a Maxwell-Boltzmann distribution. """ warn( "Deprecated feature. It will be removed in a future release. \n" "Use parameters.calc_electron_properties(species). You need to pass the species list.", category=DeprecationWarning, ) self.initialize_positions()
[docs] def load_from_file(self, f_name): """ Load particles' data from a specific file. Parameters ---------- f_name : str Filename Raises ------ : DeprecationWarning """ warn( "Deprecated feature. This is a legacy feature that will be removed in a future release unless requests to keep it are made.", category=DeprecationWarning, ) pv_data = loadtxt(f_name) if not (pv_data.shape[0] == self.total_num_ptcls): msg = ( f"Number of particles is not same between input file and initial p & v data file. \n " f"Input file: N = {self.total_num_ptcls}, load data: N = {pv_data.shape[0]}" ) raise ParticlesError(msg) self.pos[:, 0] = pv_data[:, 0] self.pos[:, 1] = pv_data[:, 1] self.pos[:, 2] = pv_data[:, 2] self.vel[:, 0] = pv_data[:, 3] self.vel[:, 1] = pv_data[:, 4] self.vel[:, 2] = pv_data[:, 5]
[docs] def load_from_npz(self, file_name): """ Load particles' data from an .npz data file. Parameters ---------- file_name : str Path to file. """ # file_name = join(self.eq_dump_dir, "checkpoint_" + str(it) + ".npz") data = np_load(file_name, allow_pickle=True) if not data["pos"].shape[0] == self.total_num_ptcls: msg = ( f"Number of particles is not same between input file and particles data file. \n " f"Input file: N = {self.total_num_ptcls}, particles data file: N = {data['pos'].shape[0]}" ) raise ParticlesError(msg) self.id = data["id"] self.names = data["names"] self.pos = data["pos"] self.vel = data["vel"] self.acc = data["acc"]
# self.pbc_cntr = data["cntr"] # self.rdf_hist = data["rdf_hist"] # self.heat_flux_species_tensor = data["energy current"] # self.virial_species_tensor = data["virial_species_tensor"]
[docs] def load_from_restart(self, phase, it): """ Initialize particles' data from a checkpoint of a previous run. Raises ------ : DeprecationWarning """ warn( "Deprecated feature. It will be removed in a future release. \n" "Use load_from_checkpoint. ", category=DeprecationWarning, ) self.load_from_checkpoint(phase, it)
[docs] def load_from_checkpoint(self, phase, it): """ Load particles' data from a checkpoint of a previous run Parameters ---------- it : int Timestep. phase: str Restart phase. """ if phase == "equilibration": file_name = join(self.eq_dump_dir, "checkpoint_" + str(it) + ".npz") elif phase == "production": file_name = join(self.prod_dump_dir, "checkpoint_" + str(it) + ".npz") elif phase == "magnetization": file_name = join(self.mag_dump_dir, "checkpoint_" + str(it) + ".npz") data = np_load(file_name, allow_pickle=True) self.pos = data["pos"] self.vel = data["vel"] self.acc = data["acc"] self.rdf_hist = data["rdf_hist"] if "cntr" in data.files: self.pbc_cntr = data["cntr"]
[docs] def calculate_electric_current(self): """Calculate the electric current of each particle and store it into :attr:`electric_current`.""" self.electric_current = self.charges * self.vel
[docs] def calculate_kinetic_energy(self): """Calculate the kinetic energy of each particle. Return ------ kin : numpy.ndarray Total kinetic energy. Shape = (:attr:`total_num_ptcls`) """ self.kinetic_energy = 0.5 * self.masses * (self.vel * self.vel).sum(axis=-1)
[docs] def calculate_observables(self): """Calculate the observables in :attr:`observables_list`.""" for i in self.observables_list: self.species_observables_calculator_dict[i]()
# def calculate_pressure(self): # """Calculate the pressure of each particle. # Return # ------ # pressure : numpy.ndarray # Pressure of each particle. Shape = (:attr:`total_num_ptcls`) # Notes # ----- # It does not calculate the tensor since that could lead to very large arrays. # """ # self.pressure = 2.0 * self.kinetic_energy + self.virial_species_tensor[0, 0] + self.virial_species_tensor[1, 1] + self.virial_species_tensor[2, 2] # self.pressure /= self.box_volume
[docs] def calculate_species_electric_current(self): """Calculate the energy current of each species from :attr:`heat_flux_species_tensor` and stores it into :attr:`species_heat_flux`.\n Note that :attr:`heat_flux_species_tensor` is calculated in the force loop if requested.""" self.species_electric_current = self.species_charges * vector_species_loop(self.vel, self.species_num)
[docs] def calculate_species_heat_flux(self): """Calculate the energy current of each species from :attr:`heat_flux_species_tensor` and stores it into :attr:`species_heat_flux`.\n Note that :attr:`heat_flux_species_tensor` is calculated in the force loop if requested.""" self.species_heat_flux = vector_cross_species_loop(self.heat_flux_species_tensor, self.species_num)
[docs] def calculate_species_enthalpy(self): energy = scalar_species_loop(self.kinetic_energy + self.potential_energy, self.species_num) self.enthalpy = energy + self.species_pressure * self.box_volume self.species_enthalpy = scalar_species_loop(self.enthalpy, self.species_num)
[docs] def calculate_species_kinetic_temperature(self): """ Calculate the kinetic energy and temperature of each species. Returns ------- K : numpy.ndarray Kinetic energy of each species. Shape=(:attr:`num_species`). T : numpy.ndarray Temperature of each species. Shape=(:attr:`num_species`). """ # K = zeros(self.num_species) # T = zeros(self.num_species) const = 2.0 / (self.kB * self.species_num * self.dimensions) # kinetic = 0.5 * self.masses * (self.vel * self.vel).sum(axis = -1) self.calculate_kinetic_energy() self.species_kinetic_energy = scalar_species_loop(self.kinetic_energy, self.species_num) self.species_temperatures = const * self.species_kinetic_energy
# species_start = 0 # species_end = 0 # for i, num in enumerate(self.species_num): # species_end += num # K[i] = kinetic[species_start:species_end].sum() # T[i] = const[i] * K[i] # species_start += num # return K, T
[docs] def calculate_species_momentum(self): velocity = vector_species_loop(self.vel.transpose(), self.species_num) self.species_momentum = self.species_masses * velocity
[docs] def calculate_species_potential_energy(self): """Calculate the potential energy of each species from :attr:`potential_energy`, calculated in the force loop, and stores it into :attr:`species_potential_energy`.""" self.species_potential_energy = scalar_species_loop(self.potential_energy, self.species_num)
# sp_start = 0 # sp_end = 0 # sp_pot = zeros(self.num_species) # for sp, sp_num in enumerate(self.species_num): # sp_end += sp_num # sp_pot[sp] = self.potential_energy[sp_start:sp_end].sum() # sp_start += sp_num # return sp_pot
[docs] def calculate_species_pressure_tensor(self): """Calculate the pressure, the kinetic part of the pressure tensor, the potential part of the kinetic tensor of each species and store them into :attr:`species_pressure`, :attr:`species_pressure_kin_tensor`, :attr:`species_pressure_pot_tensor`.""" self.species_pressure, self.species_pressure_kin_tensor, self.species_pressure_pot_tensor = calc_pressure_tensor( self.vel, self.virial_species_tensor, self.species_masses, self.species_num, self.box_volume, self.dimensions )
# def calculate_thermodynamic_quantities(self): # for i in self.observables_list: # self.species_observables_calculator_dict[i]()
[docs] def calculate_thermodynamic_quantities_partial(self): """Calculate the main thermodynamics quantities from particles data and return a dictionary with their values.""" self.calculate_total_kinetic_energy() self.calculate_total_potential_energy()
[docs] def calculate_thermodynamic_quantities_full(self): """Calculate thermodynamics quantities from particles data.""" self.calculate_total_kinetic_energy() self.calculate_total_potential_energy() self.calculate_total_pressure() self.calculate_total_enthalpy()
[docs] def calculate_total_electric_current(self): """Calculate the total electric current of the system, by summing the electric current of each species and store it into :attr:`total_electric_current`.""" self.calculate_species_electric_current() self.total_electric_current = self.species_electric_current.sum()
[docs] def calculate_total_enthalpy(self): """Calculate the total enthalpy of the system, by summing the enthalpy of each species and store it into :attr:`total_enthalpy`.""" self.calculate_species_enthalpy() self.total_enthalpy = self.species_enthalpy.sum()
[docs] def calculate_total_kinetic_energy(self): """Calculate the total kinetic energy by summing the :attr:`kinetic_energy` array and store it into :attr:`total_kinetic_energy`.""" self.calculate_species_kinetic_temperature() self.total_kinetic_energy = self.species_kinetic_energy.sum()
[docs] def calculate_total_momentum(self): self.calculate_species_momentum() self.total_momentum = self.species_momentum.sum()
[docs] def calculate_total_potential_energy(self): """Calculate the total potential energy by summing the :attr:`potential_energy` array. The total potential energy is store in :attr:`total_potential_energy`.""" self.calculate_species_potential_energy() self.total_potential_energy = self.species_potential_energy.sum()
[docs] def calculate_total_pressure(self): self.calculate_species_pressure_tensor() self.total_pressure = self.species_pressure.sum()
[docs] def make_thermodynamics_dictionary_partial(self): """ Put the main thermodynamic quantities into a dictionary. This is used for saving data while running. Return ------ data : dict Thermodynamics data. In case of multiple species, it returns thermodynamics quantities per species. keys = [`Total Energy`, `Total Kinetic Energy`, `Total Potential Energy`, `Total Temperature`] """ # Save Energy data data = { "Total Energy": self.species_kinetic_energy.sum() + self.species_potential_energy.sum(), "Total Kinetic Energy": self.species_kinetic_energy.sum(), "Total Potential Energy": self.species_potential_energy.sum(), "Total Temperature": self.species_num.transpose() @ self.species_temperatures / self.total_num_ptcls, } if self.num_species > 1: for sp, (temp, kin, pot) in enumerate( zip(self.species_temperatures, self.species_kinetic_energy, self.species_potential_energy) ): data[f"{self.species_names[sp]} Kinetic Energy"] = kin data[f"{self.species_names[sp]} Potential Energy"] = pot data[f"{self.species_names[sp]} Temperature"] = temp return data
[docs] def make_thermodynamics_dictionary_full(self): """ Put all thermodynamic quantities into a dictionary. This is used for saving data while running. Return ------ data : dict Thermodynamics data. In case of multiple species, it returns thermodynamics quantities per species. keys = [`Total Energy`, `Total Kinetic Energy`, `Total Potential Energy`, `Total Temperature`, `Total Pressure`, `Ideal Pressure`, `Excess Pressure, `Total Enthalpy`] """ # Save Energy data data = { "Total Energy": self.species_kinetic_energy.sum() + self.species_potential_energy.sum(), "Total Kinetic Energy": self.species_kinetic_energy.sum(), "Total Potential Energy": self.species_potential_energy.sum(), "Total Temperature": self.species_num.transpose() @ self.species_temperatures / self.total_num_ptcls, "Total Pressure": self.species_pressure.sum(), "Ideal Pressure": self.species_pressure_kin_tensor.sum(axis=-1).trace() / self.dimensions, "Excess Pressure": self.species_pressure_pot_tensor.sum(axis=-1).trace() / self.dimensions, "Total Enthalpy": self.species_enthalpy.sum(), } if self.num_species > 1: for sp, (temp, kin, pot) in enumerate( zip(self.species_temperatures, self.species_kinetic_energy, self.species_potential_energy) ): data[f"{self.species_names[sp]} Kinetic Energy"] = kin data[f"{self.species_names[sp]} Potential Energy"] = pot data[f"{self.species_names[sp]} Temperature"] = temp data[f"{self.species_names[sp]} Total Pressure"] = self.species_pressure[sp] data[f"{self.species_names[sp]} Ideal Pressure"] = ( self.species_pressure_kin_tensor[:, :, sp].trace() / self.dimensions ) data[f"{self.species_names[sp]} Excess Pressure"] = ( self.species_pressure_pot_tensor[:, :, sp].trace() / self.dimensions ) data[f"{self.species_names[sp]} Enthalpy"] = self.species_enthalpy[sp] return data
[docs] def random_reject(self, r_reject): """ Place particles by sampling a uniform distribution from 0 to LP (the initial particle box length) and uses a rejection radius to avoid placing particles to close to each other. Parameters ---------- r_reject : float Value of rejection radius. """ # Initialize Arrays x = zeros(self.total_num_ptcls) y = zeros(self.total_num_ptcls) z = zeros(self.total_num_ptcls) # Set first x, y, and z positions x_new = self.rnd_gen.uniform(0, self.pbox_lengths[0]) y_new = self.rnd_gen.uniform(0, self.pbox_lengths[1]) z_new = self.rnd_gen.uniform(0, self.pbox_lengths[2]) # Append to arrays x[0] = x_new y[0] = y_new z[0] = z_new # Particle counter i = 1 cntr_reject = 0 cntr_total = 0 # Loop to place particles while i < self.total_num_ptcls: # Set x, y, and z positions x_new = self.rnd_gen.uniform(0.0, self.pbox_lengths[0]) y_new = self.rnd_gen.uniform(0.0, self.pbox_lengths[1]) z_new = self.rnd_gen.uniform(0.0, self.pbox_lengths[2]) # Check if particle was place too close relative to all other current particles for j in range(len(x)): # Flag for if particle is outside of cutoff radius (True -> not inside rejection radius) flag = 1 # Compute distance b/t particles for initial placement x_diff = x_new - x[j] y_diff = y_new - y[j] z_diff = z_new - z[j] # periodic condition applied for minimum image if x_diff < -self.pbox_lengths[0] / 2: x_diff += self.pbox_lengths[0] if x_diff > self.pbox_lengths[0] / 2: x_diff -= self.pbox_lengths[0] if y_diff < -self.pbox_lengths[1] / 2: y_diff += self.pbox_lengths[1] if y_diff > self.pbox_lengths[1] / 2: y_diff -= self.pbox_lengths[1] if z_diff < -self.pbox_lengths[2] / 2: z_diff += self.pbox_lengths[2] if z_diff > self.pbox_lengths[2] / 2: z_diff -= self.pbox_lengths[2] # Compute distance r = sqrt(x_diff**2 + y_diff**2 + z_diff**2) # Check if new particle is below rejection radius. If not, break out and try again if r <= r_reject: flag = 0 # new position not added (False -> no longer outside reject r) cntr_reject += 1 cntr_total += 1 break # If flag true add new position if flag == 1: x[i] = x_new y[i] = y_new z[i] = z_new # Increment particle number i += 1 cntr_total += 1 self.pos[:, 0] = x + self.box_lengths[0] / 2 - self.pbox_lengths[0] / 2 self.pos[:, 1] = y + self.box_lengths[1] / 2 - self.pbox_lengths[1] / 2 self.pos[:, 2] = z + self.box_lengths[2] / 2 - self.pbox_lengths[2] / 2
[docs] def random_unit_vectors(self, num_ptcls, dimensions): """ Initialize random unit vectors for particles' velocities (e.g. for monochromatic energies but random velocities). It calls :meth:`numpy.random.Generator.normal`. Parameters ---------- num_ptcls : int Number of particles to initialize. dimensions : int Number of non-zero dimensions. Returns ------- uvec : numpy.ndarray Random unit vectors of specified dimensions for all particles """ uvec = self.rnd_gen.normal(size=(num_ptcls, dimensions)) # Broadcasting uvec /= norm(uvec, axis=1).reshape(num_ptcls, 1) return uvec
[docs] def remove_drift(self): """ Enforce conservation of total linear momentum. Updates particles velocities """ remove_drift_nb(self.vel, self.species_num)
[docs] def setup(self, params, species): """ Initialize class' attributes Parameters ---------- params: :class:`sarkas.core.Parameters` Simulation's parameters. species : list List of :class:`sarkas.plasma.Species` objects. """ if hasattr(params, "rand_seed"): self.rand_seed = params.rand_seed self.rnd_gen = Generator(PCG64(params.rand_seed)) else: self.rnd_gen = Generator(PCG64()) self.copy_params(params) self.initialize_arrays() self.update_attributes(species) # Particles Position Initialization if self.load_method in [ "equilibration_restart", "eq_restart", "magnetization_restart", "mag_restart", "production_restart", "prod_restart", ]: # checks if self.restart_step is None: raise AttributeError("Restart step not defined." "Please define Parameters.restart_step.") if type(self.restart_step) is not int: self.restart_step = int(self.restart_step) if self.load_method[:2] == "eq": self.load_from_restart("equilibration", self.restart_step) elif self.load_method[:2] == "pr": self.load_from_restart("production", self.restart_step) elif self.load_method[:2] == "ma": self.load_from_restart("magnetization", self.restart_step) elif self.load_method == "file": # check if not hasattr(self, "particles_input_file"): raise AttributeError("Input file not defined. Please define Parameters.particles_input_file.") if self.particles_input_file[-3:] == "npz": self.load_from_npz(self.particles_input_file) else: self.load_from_file(self.particles_input_file) else: self.initialize_positions() self.initialize_velocities(species) self.initialize_accelerations() if len(self.observables_list) > 2: self.calculate_thermodynamic_quantities = self.calculate_thermodynamic_quantities_full self.make_thermodynamics_dictionary = self.make_thermodynamics_dictionary_full else: self.calculate_thermodynamic_quantities = self.calculate_thermodynamic_quantities_partial self.make_thermodynamics_dictionary = self.make_thermodynamics_dictionary_partial
[docs] def uniform_no_reject(self, mins, maxs): """ Randomly distribute particles along each direction. Parameters ---------- mins : float Minimum value of the range of a uniform distribution. maxs : float Maximum value of the range of a uniform distribution. Returns ------- : numpy.ndarray Particles' property, e.g. pos, vel. Shape = (:attr:`total_num_ptcls`, 3). """ return self.rnd_gen.uniform(mins, maxs, (self.total_num_ptcls, 3))
[docs] def update_attributes(self, species): """ Assign particles attributes. Parameters ---------- species : list List of :class:`sarkas.plasma.Species` objects. """ species_end = 0 species_start = 0 for ic, sp in enumerate(species): if sp.name != "electron_background": species_end += sp.num self.names[species_start:species_end] = sp.name self.masses[species_start:species_end] = sp.mass if hasattr(sp, "charge"): self.charges[species_start:species_end] = sp.charge else: self.charges[species_start:species_end] = 1.0 if hasattr(sp, "cyclotron_frequency"): self.cyclotron_frequencies[species_start:species_end] = sp.cyclotron_frequency self.id[species_start:species_end] = ic species_start += sp.num
[docs]@njit def calc_pressure_tensor(vel, virial_species_tensor, species_masses, species_num, box_volume, dimensions): """ Calculate the pressure tensor of each species. Parameters ---------- vel : numpy.ndarray Particles' velocities. virial_species_tensor : numpy.ndarray Virial tensor of each particle. Shape= (3, 3, :attr:`total_num_ptcls`). Note that the size of the first two axis is 3 even if the system is 2D. species_masses : numpy.ndarray Mass of each species. Shape = (:attr:`num_species`) species_np : numpy.ndarray Number of particles of each species. box_volume : float Volume of simulation's box. dimensions : int Number of dimensions. Returns ------- pressure : float Scalar Pressure i.e. trace of the pressure tensor pressure_kin : numpy.ndarray Kinetic part of the Pressure tensor. Shape(:attr:`dimensions`,:attr:`dimensions`, :attr:`num_species`) pressure_pot : numpy.ndarray Potential energy part of the Pressure tensor. Shape(:attr:`dimensions`,:attr:`dimensions`, `num_species`) """ # sp_start = 0 # sp_end = 0 # Rescale vel of each particle by their individual mass pressure = zeros(species_num.shape[0]) pressure_kin = zeros((3, 3, species_num.shape[0])) pressure_pot = zeros((3, 3, species_num.shape[0])) temp_kin_tensor = zeros((3, 3, vel.shape[0])) # TODO: There must be a faster way to do this tensor product for ip in range(vel.shape[0]): temp_kin_tensor[:, :, ip] = outer(vel[ip, :], vel[ip, :]) pressure_kin = species_masses * tensor_species_loop(temp_kin_tensor, species_num) / box_volume pressure_pot = tensor_cross_species_loop(virial_species_tensor, species_num) / box_volume pressure_tensor = pressure_kin + pressure_pot pressure = (pressure_tensor[0, 0] + pressure_tensor[1, 1] + pressure_tensor[2, 2]) / dimensions # for sp, num in enumerate(species_num): # sp_end += num # pressure_kin[:,:,sp] = species_masses[sp] * temp_kin_tensor[:, :, sp_start:sp_end].sum() / box_volume # pressure_pot[:,:,sp] = virial_species_tensor[:,:,sp_start:sp_end].sum(axis = -1) / box_volume # # .trace is not supported by numba (at the time of this writing), hence the addition of the three terms # # Pressure of each species # pressure[sp] = (pressure_kin[0,0, sp] + pressure_pot[0,0, sp] + pressure_kin[1,1, sp] + pressure_pot[1,1, sp] + pressure_kin[2,2, sp] + pressure_pot[2,2, sp] ) / dimensions # sp_start += num # Calculate the total pressure tensor # pressure_tensor = (pressure_kin + pressure_pot).sum(axis = -1) return pressure, pressure_kin, pressure_pot
# Dev note: Because I want to use numba I need to separate between a scalar and a vector quantity. Numba compiles the function to return either a scalar or a vector. not all.
[docs]@njit def scalar_species_loop(observable, species_num): sp_start = 0 sp_end = 0 sp_obs = zeros(species_num.shape[0]) for sp, sp_num in enumerate(species_num): sp_end += sp_num sp_obs[sp] = observable[sp_start:sp_end].sum() sp_start += sp_num return sp_obs
[docs]@njit def vector_species_loop(observable, species_num): sp_start = 0 sp_end = 0 sp_obs = zeros((species_num.shape[0], 3)) for sp, sp_num in enumerate(species_num): sp_end += sp_num sp_obs[sp, :] = observable[sp_start:sp_end, :].sum(axis=0) sp_start += sp_num return sp_obs
[docs]@njit def vector_cross_species_loop(observable, species_num): sp_obs = zeros((3, species_num.shape[0])) for sp in range(species_num.shape[0]): sp_obs[:, sp] = observable[:, sp, :].sum(axis=-1) return sp_obs
[docs]@njit def tensor_species_loop(observable, species_num): sp_start = 0 sp_end = 0 sp_obs = zeros((3, 3, species_num.shape[0])) for sp, sp_num in enumerate(species_num): sp_end += sp_num sp_obs[:, :, sp] = observable[:, :, sp_start:sp_end].sum(axis=-1) sp_start += sp_num return sp_obs
[docs]@njit def tensor_cross_species_loop(observable, species_num): sp_obs = zeros((3, 3, species_num.shape[0])) for sp in range(species_num.shape[0]): sp_obs[:, :, sp] = observable[:, :, sp, :].sum(axis=-1) return sp_obs
[docs]@njit def remove_drift_nb(vel, nums): """ Numba'd function to enforce conservation of total linear momentum. It updates :attr:`sarkas.particles.Particles.vel`. Parameters ---------- vel: numpy.ndarray Particles' velocities. nums: numpy.ndarray Number of particles of each species. masses: numpy.ndarray Mass of each species. """ species_start = 0 species_end = 0 for ic, sp_num in enumerate(nums): species_end += sp_num vel[species_start:species_end, :] -= vel[species_start:species_end, :].sum(axis=0) / sp_num species_start += sp_num