importosimportnumpyasnpfrompyscal.formats.aseimportconvert_snapimportpyscal.coreaspc#import h5pyimportwarnings#def hdf_to_dump(infile, outfile, keys=None):""" A support function that can convert hdf formatted trajectory to dump format Parameters ---------- infile : string name of the input hdf file outfile : string name of the output dump file keys : list, optional output keys to be written. default keys are box, [id, type, x, y, z]"""""" if keys is None: outkeys = ['x', 'y', 'z'] mainkey = ['id', 'type', 'x', 'y', 'z'] else: outkeys = np.concatenate((['x', 'y', 'z'], keys)) mainkey = np.concatenate((['id', 'type', 'x', 'y', 'z'], keys)) keyheader = " ".join(mainkey) keyheader = " ".join(["ITEM: ATOMS", keyheader, "\n"]) with open(outfile, "w") as dump: with h5py.File(infile, "r") as hf: for key in hf.keys(): natoms = len(np.array(hf[key]["atoms"]["x"])) box = np.array(hf[key]["box"]) dump.write("ITEM: TIMESTEP\n") dump.write("%s\n" % key) dump.write("ITEM: NUMBER OF ATOMS\n") dump.write("%d\n" % natoms) dump.write("ITEM: BOX BOUNDS\n") dump.write("%f %f\n" % (box[0][0], box[0][1])) dump.write("%f %f\n" % (box[1][0], box[1][1])) dump.write("%f %f\n" % (box[2][0], box[2][1])) dump.write(keyheader) for i in range(natoms): outval1 = ["%d %d"%(int(hf[key]["atoms"]["id"][i]), int(hf[key]["atoms"]["type"][i])) ] outval2 = [str(hf[key]["atoms"][x][i]) for x in outkeys] outvals = [*outval1, *outval2] outvals.append("\n") outline = " ".join(outvals) dump.write(outline)"""
[docs]classTimeslice:""" Timeslice containing info about a single time slice Timeslices can also be added to each """
[docs]def__init__(self,trajectory,blocklist):""" Initialize instance with data """self.trajectory=trajectoryself.blocklist=blocklistself.trajectories=[trajectory]self.blocklists=[blocklist]
def__repr__(self):""" String of the class """blockstring=["%d-%d"%(x[0],x[-1])forxinself.blocklists]blockstring="/".join(blockstring)data="Trajectory slice\n%s\n natoms=%d\n"%(blockstring,self.trajectory.natoms)returndatadef__add__(self,ntraj):""" Add a method for summing """fortrajinntraj.trajectories:self.trajectories.append(traj)forblockinntraj.blocklists:self.blocklists.append(block)returnselfdef__radd__(self,ntraj):""" Reverse add method """ifntraj==0:returnselfelse:returnself.__add__(ntraj)
[docs]defto_system(self,customkeys=None):""" Get block as pyscal system Parameters ---------- blockno : int number of the block to be read, starts from 0 Returns ------- sys : System pyscal System """sys=[]forcount,trajinenumerate(self.trajectories):forxinself.blocklists[count]:s=self.trajectories[count]._get_block_as_system(x,customkeys=customkeys)sys.append(s)returnsys
[docs]defto_ase(self,species=None):""" Get block as Ase objects Parameters ---------- blockno : int number of the block to be read, starts from 0 Returns ------- sys : ASE object """sys=[]forcount,trajinenumerate(self.trajectories):forxinself.blocklists[count]:s=self.trajectories[count]._get_block_as_ase(x,species=species)sys.append(s)returnsys
[docs]defto_dict(self,):""" Get the required block as data """data=[]forcount,trajinenumerate(self.trajectories):forxinself.blocklists[count]:self.trajectories[count].load(x)data.append(self.trajectories[count].data[x])self.trajectories[count].unload(x)returndata
[docs]defto_file(self,outfile,mode="w"):""" Get block as outputfile Parameters ---------- outfile : string name of output file mode : string write mode to be used, optional default "w" write also can be "a" to append. Returns ------- None """fout=open(outfile,mode)forcount,trajinenumerate(self.trajectories):self.trajectories[count]._get_blocks_to_file(fout,self.blocklists[count])fout.close()
#def to_hdf(self, outfile, keys=None, mode='w', compression="gzip"):""" Get the block as hdf file Parameters ---------- outfile : string name of the output file headers : list, optional The keys to be stored in hdf format. Default values stored are [id, type, x, y, z] mode : string, optional h5 write mode. see here - https://docs.h5py.org/en/stable/high/file.html compression : string, optional the compression algorithm to choose. Default gzip Returns ------- None """""" if keys is None: outkeys = ['id', 'type', 'x', 'y', 'z'] else: outkeys = np.concatenate((['id', 'type', 'x', 'y', 'z'], keys)) c = 0 with h5py.File(outfile, 'w') as hf: for count, traj in enumerate(self.trajectories): for x in self.blocklists[count]: self.trajectories[count].load(x) data = self.trajectories[count].data[x] self.trajectories[count].unload(x) tk = str(c) #warnings.warn(tk) hf.create_group(tk) hf[tk].create_dataset('box', data=data['box'], compression=compression) hf[tk].create_group("atoms") for key in outkeys: hf[tk]["atoms"].create_dataset(key, data=data['atoms'][key], compression=compression) c += 1 """
[docs]classTrajectory:""" A Trajectory class for LAMMPS """
[docs]def__init__(self,filename):""" Initiaze the class Parameters ---------- filename : string name of the inputfile customkeys : list of string keys other than position, id that needs to be read in from the input file """ifos.path.exists(filename):self.filename=filenameelse:raiseFileNotFoundError("%s file not found"%filename)self.natoms=0self.blocksize=0self.nblocks=0self.loadlist=Noneself.data=Noneself._get_natoms()self._get_nblocks()
def__repr__(self):""" String of the class """return"Trajectory of %d slices with %d atoms"%(self.nblocks,self.natoms)def__getitem__(self,blockno):""" Allow for slice indexing """ifisinstance(blockno,slice):blocklist=range(*blockno.indices(self.nblocks))timeslice=Timeslice(self,blocklist)returntimesliceelse:blocklist=[blockno]timeslice=Timeslice(self,blocklist)returntimeslicedef_get_natoms(self):""" Get number of atoms in the system Parameters ---------- None Returns ------- None """withopen(self.filename,"rb")asfout:data=[next(fout)forxinrange(0,4)]self.natoms=(int(data[-1]))def_get_nlines(self):""" Get total number of lines in the file Parameters ---------- None Returns ------- nlines : int number of lines """line_offset=[]offset=0nlines=0forlineinopen(self.filename,'rb'):line_offset.append(offset)offset+=len(line)nlines+=1self.nlines=nlinesself.line_offset=line_offsetreturnnlinesdef_get_nblocks(self):""" Get number of blocks in the trajectory file Parameters ---------- None Returns ------- None """self._get_natoms()nlines=self._get_nlines()self.blocksize=self.natoms+9self.nblocks=nlines//self.blocksizeself.straylines=nlines-self.nblocks*self.blocksize#set load list to Falseself.loadlist=[Falseforxinrange(self.nblocks)]self.data=[Noneforxinrange(self.nblocks)]
[docs]defget_block(self,blockno):""" Get a block from the file as raw data Parameters ---------- blockno : int number of the block to be read, starts from 0 Returns ------- data : list list of strings containing data """start=blockno*self.blocksizestop=(blockno+1)*self.blocksizefin=open(self.filename,"rb")fin.seek(0)fin.seek(self.line_offset[start])data=[]foriinrange(self.blocksize):line=fin.readline().decode("utf-8")data.append(line)returndata
[docs]defload(self,blockno):""" Load the data of a block into memory as a dictionary of numpy arrays Parameters ---------- blockno : int number of the block to be read, starts from 0 Returns ------- None Notes ----- When the data of a block is loaded, it is accessible through `Trajectory.data[x]`. This data can then be modified. When the block is written out, the modified data is written instead of existing one. But, loaded data is kept in memory until unloaded using `unload` method. """data=self.get_block(blockno)box=np.loadtxt(data[5:8])columns=np.loadtxt(data[9:])header=np.loadtxt(data[8:9],dtype=str)[2:]outdict={}outdict["box"]=boxoutdict["atoms"]={}forcount,hinenumerate(header):outdict["atoms"][h]=columns[:,count]self.data[blockno]=outdictself.loadlist[blockno]=True
[docs]defunload(self,blockno):""" Unload the data that is loaded to memory using `load` method Parameters ---------- blockno : int number of the block to be read, starts from 0 Returns ------- None """self.data[blockno]=Noneself.loadlist[blockno]=False
def_convert_data_to_lines(self,blockno):""" Create lines from loaded data Parameters ---------- blockno : int number of the block to be read, starts from 0 Returns ------- data : list of strs list of lines """dd=self.data[blockno]data=[]data.append("ITEM: TIMESTEP\n")data.append("".join([str(0),os.linesep]))data.append("ITEM: NUMBER OF ATOMS\n")data.append("".join([str(self.natoms),os.linesep]))data.append("ITEM: BOX BOUNDS pp pp pp\n")forbindd["box"]:dstr=" ".join(b.astype(str))data.append("".join([dstr,os.linesep]))xf=[]xd=[]xfkeys=[]xdkeys=[]forkey,valindd["atoms"].items():ifkeyin["id","type"]:val.astype(int)xd.append(val)xdkeys.append(key)else:xf.append(val)xfkeys.append(key)xdstrs=[]iflen(xd)>0:foriinrange(len(xd[0])):substr=[]forjinrange(len(xdkeys)):substr.append("%d"%xd[j][i])xdstrs.append(" ".join(substr))xdheader=" ".join(xdkeys)xdheader=" ".join(["ITEM: ATOMS",xdheader])xfstrs=[]xf=np.array(xf)iflen(xf)>0:foriinrange(len(xf[0])):dstr=" ".join((xf[:,i]).astype(str))xfstrs.append("".join([dstr,os.linesep]))xfheader=" ".join(xfkeys)mainheader=" ".join([xdheader,xfheader])mainheader="".join([mainheader,os.linesep])data.append(mainheader)foriinrange(len(xfstrs)):valstr=" ".join([xdstrs[i],xfstrs[i]])#valstr = "".join([valstr, os.linesep])data.append(valstr)returndatadef_get_block_as_system(self,blockno,customkeys=None):""" Get block as pyscal system Parameters ---------- blockno : int number of the block to be read, starts from 0 Returns ------- sys : System pyscal System """#convert to system and returndata=self.get_block(blockno)sys=pc.System()sys.read_inputfile(data,customkeys=customkeys)returnsysdef_get_block_as_ase(self,blockno,species=None):""" Get block as pyscal system Parameters ---------- blockno : int number of the block to be read, starts from 0 Returns ------- sys : System pyscal System """#convert to system and returndata=self.get_block(blockno)sys=pc.System()sys.read_inputfile(data,customkeys=None)asesys=convert_snap(sys,species=species)returnasesysdef_get_blocks_to_file(self,fout,blocklist):""" Get a series of blocks from the file as raw data Parameters ---------- blockno : int number of the block to be read, starts from 0 Returns ------- data : list list of strings containing data """xl=[xforxinblocklist]xl=np.array(xl)#open file#convert lines to start from endforxinxl:ifself.loadlist[x]:data=self._convert_data_to_lines(x)else:data=self.get_block(x)forlineindata:fout.write(line)