import os
import numpy as np
from pyscal.formats.ase import convert_snap
import pyscal.core as pc
#import h5py
import warnings
import random
#def hdf_to_dump(infile, outfile, keys=None):
"""
A support function that can convert hdf formatted
trajectory to dump format
Parameters
----------
infile : string
name of the input hdf file
outfile : string
name of the output dump file
keys : list, optional
output keys to be written.
default keys are box, [id, type, x, y, z]
"""
"""
if keys is None:
outkeys = ['x', 'y', 'z']
mainkey = ['id', 'type', 'x', 'y', 'z']
else:
outkeys = np.concatenate((['x', 'y', 'z'], keys))
mainkey = np.concatenate((['id', 'type', 'x', 'y', 'z'], keys))
keyheader = " ".join(mainkey)
keyheader = " ".join(["ITEM: ATOMS", keyheader, "\n"])
with open(outfile, "w") as dump:
with h5py.File(infile, "r") as hf:
for key in hf.keys():
natoms = len(np.array(hf[key]["atoms"]["x"]))
box = np.array(hf[key]["box"])
dump.write("ITEM: TIMESTEP\n")
dump.write("%s\n" % key)
dump.write("ITEM: NUMBER OF ATOMS\n")
dump.write("%d\n" % natoms)
dump.write("ITEM: BOX BOUNDS\n")
dump.write("%f %f\n" % (box[0][0], box[0][1]))
dump.write("%f %f\n" % (box[1][0], box[1][1]))
dump.write("%f %f\n" % (box[2][0], box[2][1]))
dump.write(keyheader)
for i in range(natoms):
outval1 = ["%d %d"%(int(hf[key]["atoms"]["id"][i]), int(hf[key]["atoms"]["type"][i])) ]
outval2 = [str(hf[key]["atoms"][x][i]) for x in outkeys]
outvals = [*outval1, *outval2]
outvals.append("\n")
outline = " ".join(outvals)
dump.write(outline)
"""
[docs]
class Timeslice:
"""
Timeslice containing info about a single time slice
Timeslices can also be added to each
"""
[docs]
def __init__(self, trajectory, blocklist):
"""
Initialize instance with data
"""
self.trajectory = trajectory
self.blocklist = blocklist
self.trajectories = [trajectory]
self.blocklists = [blocklist]
def __repr__(self):
"""
String of the class
"""
blockstring = ["%d-%d"%(x[0], x[-1]) for x in self.blocklists]
blockstring = "/".join(blockstring)
data = "Trajectory slice\n %s\n natoms=%d\n"%(blockstring, self.trajectory.natoms)
return data
def __add__(self, ntraj):
"""
Add a method for summing
"""
for traj in ntraj.trajectories:
self.trajectories.append(traj)
for block in ntraj.blocklists:
self.blocklists.append(block)
return self
def __radd__(self, ntraj):
"""
Reverse add method
"""
if ntraj == 0:
return self
else:
return self.__add__(ntraj)
[docs]
def to_system(self, customkeys=None):
"""
Get block as pyscal system
Parameters
----------
blockno : int
number of the block to be read, starts from 0
Returns
-------
sys : System
pyscal System
"""
sys = []
for count, traj in enumerate(self.trajectories):
for x in self.blocklists[count]:
s = self.trajectories[count]._get_block_as_system(x, customkeys=customkeys)
sys.append(s)
return sys
[docs]
def to_ase(self, species=None):
"""
Get block as Ase objects
Parameters
----------
blockno : int
number of the block to be read, starts from 0
Returns
-------
sys : ASE object
"""
sys = []
for count, traj in enumerate(self.trajectories):
for x in self.blocklists[count]:
s = self.trajectories[count]._get_block_as_ase(x, species=species)
sys.append(s)
return sys
[docs]
def to_dict(self,):
"""
Get the required block as data
"""
data = []
for count, traj in enumerate(self.trajectories):
for x in self.blocklists[count]:
self.trajectories[count].load(x)
data.append(self.trajectories[count].data[x])
self.trajectories[count].unload(x)
return data
[docs]
def to_file(self, outfile, mode="w"):
"""
Get block as outputfile
Parameters
----------
outfile : string
name of output file
mode : string
write mode to be used, optional
default "w" write
also can be "a" to append.
Returns
-------
None
"""
fout = open(outfile, mode)
for count, traj in enumerate(self.trajectories):
self.trajectories[count]._get_blocks_to_file(fout, self.blocklists[count])
fout.close()
#def to_hdf(self, outfile, keys=None, mode='w', compression="gzip"):
"""
Get the block as hdf file
Parameters
----------
outfile : string
name of the output file
headers : list, optional
The keys to be stored in hdf format.
Default values stored are [id, type, x, y, z]
mode : string, optional
h5 write mode.
see here - https://docs.h5py.org/en/stable/high/file.html
compression : string, optional
the compression algorithm to choose. Default gzip
Returns
-------
None
"""
"""
if keys is None:
outkeys = ['id', 'type', 'x', 'y', 'z']
else:
outkeys = np.concatenate((['id', 'type', 'x', 'y', 'z'], keys))
c = 0
with h5py.File(outfile, 'w') as hf:
for count, traj in enumerate(self.trajectories):
for x in self.blocklists[count]:
self.trajectories[count].load(x)
data = self.trajectories[count].data[x]
self.trajectories[count].unload(x)
tk = str(c)
#warnings.warn(tk)
hf.create_group(tk)
hf[tk].create_dataset('box', data=data['box'], compression=compression)
hf[tk].create_group("atoms")
for key in outkeys:
hf[tk]["atoms"].create_dataset(key, data=data['atoms'][key], compression=compression)
c += 1
"""
[docs]
class Trajectory:
"""
A Trajectory class for LAMMPS
"""
[docs]
def __init__(self, filename):
"""
Initiaze the class
Parameters
----------
filename : string
name of the inputfile
customkeys : list of string
keys other than position, id that needs to be read
in from the input file
"""
if os.path.exists(filename):
self.filename = filename
else:
raise FileNotFoundError("%s file not found"%filename)
self.natoms = 0
self.blocksize = 0
self.nblocks = 0
self.loadlist = None
self.data = None
self._get_natoms()
self._get_nblocks()
def __repr__(self):
"""
String of the class
"""
return "Trajectory of %d slices with %d atoms"%(self.nblocks, self.natoms)
def __getitem__(self, blockno):
"""
Allow for slice indexing
"""
if isinstance(blockno, slice):
blocklist = range(*blockno.indices(self.nblocks))
timeslice = Timeslice(self, blocklist)
return timeslice
else:
blocklist = [blockno]
timeslice = Timeslice(self, blocklist)
return timeslice
def _get_natoms(self):
"""
Get number of atoms in the system
Parameters
----------
None
Returns
-------
None
"""
with open(self.filename, "rb") as fout:
data = [next(fout) for x in range(0, 4)]
self.natoms = (int(data[-1]))
def _get_nlines(self):
"""
Get total number of lines in the file
Parameters
----------
None
Returns
-------
nlines : int
number of lines
"""
line_offset = []
offset = 0
nlines = 0
for line in open(self.filename, 'rb'):
line_offset.append(offset)
offset += len(line)
nlines += 1
self.nlines = nlines
self.line_offset = line_offset
return nlines
def _get_nblocks(self):
"""
Get number of blocks in the trajectory file
Parameters
----------
None
Returns
-------
None
"""
self._get_natoms()
nlines = self._get_nlines()
self.blocksize = self.natoms+9
self.nblocks = nlines//self.blocksize
self.straylines = nlines - self.nblocks*self.blocksize
#set load list to False
self.loadlist = [False for x in range(self.nblocks)]
self.data = [None for x in range(self.nblocks)]
def _yield(self, index, output):
if output == "index":
return index
elif output == "system":
return Timeslice(self, [index]).to_system()[0]
#iterator for index
[docs]
def iter(self, method="ascending", n_slices=None, output="index"):
"""
Iterate and provide slices from the trajectory
Parameters
----------
method: str, how to provide slices
ascending: from 0 to Nblocks
descending: from Nblocks to 0
random: randomly between 0 and N
n_slices: int, number of slices to provide
if None, nblocks is the maximum
output: string, output format
index: provide index number
system: provide pyscal system
"""
if n_slices is None:
n_slices = self.nblocks
if method == "ascending":
for i in range(n_slices):
yield self._yield(i, output)
elif method == "descending":
for i in range(n_slices):
yield self._yield(self.nblocks-i, output)
elif method == "random":
slice_list = np.arange(0, self.nblocks, 1).astype(int)
random.shuffle(slice_list)
for i in slice_list[:n_slices]:
yield self._yield(i, output)
else:
raise ValueError("Unknown method")
[docs]
def get_block(self, blockno):
"""
Get a block from the file as raw data
Parameters
----------
blockno : int
number of the block to be read, starts from 0
Returns
-------
data : list
list of strings containing data
"""
start = blockno*self.blocksize
stop = (blockno+1)*self.blocksize
fin = open(self.filename, "rb")
fin.seek(0)
fin.seek(self.line_offset[start])
data = []
for i in range(self.blocksize):
line = fin.readline().decode("utf-8")
data.append(line)
return data
[docs]
def load(self, blockno):
"""
Load the data of a block into memory as a dictionary
of numpy arrays
Parameters
----------
blockno : int
number of the block to be read, starts from 0
Returns
-------
None
Notes
-----
When the data of a block is loaded, it is accessible
through `Trajectory.data[x]`. This data can then be
modified. When the block is written out, the modified
data is written instead of existing one. But, loaded
data is kept in memory until unloaded using `unload`
method.
"""
data = self.get_block(blockno)
box = np.loadtxt(data[5:8])
columns = np.loadtxt(data[9:])
header = np.loadtxt(data[8:9], dtype=str)[2:]
outdict = {}
outdict["box"] = box
outdict["atoms"] = {}
for count, h in enumerate(header):
outdict["atoms"][h] = columns[:,count]
self.data[blockno] = outdict
self.loadlist[blockno] = True
[docs]
def unload(self, blockno):
"""
Unload the data that is loaded to memory using
`load` method
Parameters
----------
blockno : int
number of the block to be read, starts from 0
Returns
-------
None
"""
self.data[blockno] = None
self.loadlist[blockno] = False
def _convert_data_to_lines(self, blockno):
"""
Create lines from loaded data
Parameters
----------
blockno : int
number of the block to be read, starts from 0
Returns
-------
data : list of strs
list of lines
"""
dd = self.data[blockno]
data = []
data.append("ITEM: TIMESTEP\n")
data.append("".join([str(0), os.linesep]))
data.append("ITEM: NUMBER OF ATOMS\n")
data.append("".join([str(self.natoms), os.linesep]))
data.append("ITEM: BOX BOUNDS pp pp pp\n")
for b in dd["box"]:
dstr = " ".join(b.astype(str))
data.append("".join([dstr, os.linesep]))
xf = []
xd = []
xfkeys = []
xdkeys = []
for key, val in dd["atoms"].items():
if key in ["id", "type"]:
val.astype(int)
xd.append(val)
xdkeys.append(key)
else:
xf.append(val)
xfkeys.append(key)
xdstrs = []
if len(xd)>0:
for i in range(len(xd[0])):
substr = []
for j in range(len(xdkeys)):
substr.append("%d"%xd[j][i])
xdstrs.append(" ".join(substr))
xdheader = " ".join(xdkeys)
xdheader = " ".join(["ITEM: ATOMS", xdheader])
xfstrs = []
xf = np.array(xf)
if len(xf)>0:
for i in range(len(xf[0])):
dstr = " ".join((xf[:,i]).astype(str))
xfstrs.append("".join([dstr, os.linesep]))
xfheader = " ".join(xfkeys)
mainheader = " ".join([xdheader, xfheader])
mainheader = "".join([mainheader, os.linesep])
data.append(mainheader)
for i in range(len(xfstrs)):
valstr = " ".join([xdstrs[i], xfstrs[i]])
#valstr = "".join([valstr, os.linesep])
data.append(valstr)
return data
def _get_block_as_system(self, blockno, customkeys=None):
"""
Get block as pyscal system
Parameters
----------
blockno : int
number of the block to be read, starts from 0
Returns
-------
sys : System
pyscal System
"""
#convert to system and return
data = self.get_block(blockno)
sys = pc.System()
sys.read_inputfile(data, customkeys=customkeys)
return sys
def _get_block_as_ase(self, blockno, species=None):
"""
Get block as pyscal system
Parameters
----------
blockno : int
number of the block to be read, starts from 0
Returns
-------
sys : System
pyscal System
"""
#convert to system and return
data = self.get_block(blockno)
sys = pc.System()
sys.read_inputfile(data, customkeys=None)
asesys = convert_snap(sys, species=species)
return asesys
def _get_blocks_to_file(self, fout, blocklist):
"""
Get a series of blocks from the file as raw data
Parameters
----------
blockno : int
number of the block to be read, starts from 0
Returns
-------
data : list
list of strings containing data
"""
xl = [x for x in blocklist]
xl = np.array(xl)
#open file
#convert lines to start from end
for x in xl:
if self.loadlist[x]:
data = self._convert_data_to_lines(x)
else:
data = self.get_block(x)
for line in data:
fout.write(line)