Source code for pylam._datafile

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from io import StringIO
import re

from .base import (IndexedFile, BlockFile, Table)
from ._block import (ThermoBlock, FixBlock)


class SimpleDataFile(IndexedFile):
    """
    Bases: :class:`pylam.base.IndexedFile`

    Class for a simple (csv) data file with an optional header line.

    :param filename: file name
    :type filename: str
    :param header_prefix: prefix of header line (default: ``#``)
    :type header_prefix: str
    :param delimiter: string used to separate values (default: any whitespace)
    :type delimiter: str
    :return: simple data file object
    :rtype: .SimpleDataFile
    """

    def __init__(self, filename, header_prefix='#', delimiter=None):
        self.header_line = None
        self._header = None
        self._header_prefix = header_prefix
        self._delimiter = delimiter
        self._number_of_headerlines = 0
        # Index of the data row handed out by the next call to next().
        self._currentRowIndex = 0
        super(SimpleDataFile, self).__init__(filename)
        self._get_header()

    def _get_header(self):
        """
        Inspect the first line of the file and store it as header if it
        starts with the configured header prefix.

        The column names are the whitespace separated words that follow the
        prefix. The whole prefix is removed, whatever its length.
        """
        header_line = super(SimpleDataFile, self).getLines(0, 0)
        stripped = header_line.strip()
        if stripped.startswith(self._header_prefix):
            names = stripped[len(self._header_prefix):]
            self._header = str(names).split()
            self.header_line = header_line
            self._number_of_headerlines = 1
            # NOTE(review): presumably read by the base class to skip the
            # header line while iterating -- confirm against IndexedFile.
            self._currentLineIndex = 1

    def _dataStr2npArray(self, data_string):
        """
        Convert a block of text lines into a numpy array.

        :param data_string: one or more lines of delimiter separated numbers
        :type data_string: str
        :return: parsed data
        :rtype: numpy.ndarray
        """
        # io.StringIO only accepts text; on Python 2 that is ``unicode``.
        try:
            text_type = unicode  # noqa: F821 (Python 2 only)
        except NameError:
            text_type = str
        buf = StringIO(text_type(data_string))
        return np.loadtxt(buf, delimiter=self._delimiter)

    def getRows(self, startRowIndex, endRowIndex):
        """
        Returns a part of the file as a numpy array.

        :param startRowIndex: index of first data row
        :type startRowIndex: int
        :param endRowIndex: index of last data row (included!)
        :type endRowIndex: int
        :return: data
        :rtype: numpy.ndarray
        """
        # Data row indices do not count the header line, file lines do.
        offset = self._number_of_headerlines
        data_string = self.getLines(startRowIndex + offset,
                                    endRowIndex + offset)
        return self._dataStr2npArray(data_string)

    def getRow(self, rowIndex):
        """
        Returns the data in row with index (0,1,..) as a numpy array.

        :param rowIndex: index of data row in file
        :type rowIndex: int
        :return: data
        :rtype: numpy.ndarray
        """
        return self.getRows(rowIndex, rowIndex)

    def getCol(self, idx):
        """
        Returns a *whole* column as a np.ndarray.

        :param idx: column index
        :type idx: int
        :return: column data
        :rtype: np.ndarray
        """
        return self.data[:, idx]

    @property
    def data(self):
        """
        :return: the *whole* data
        :rtype: numpy.ndarray
        """
        data_string = self.getLines(self._number_of_headerlines,
                                    self.fileLineSize - 1)
        return self._dataStr2npArray(data_string)

    @property
    def header(self):
        """
        :return: the header line as list (``None`` if the file has no header)
        :rtype: list
        """
        return self._header

    @property
    def properHeader(self):
        """
        Tests if the header has exactly one name per data column.

        As a side effect the number of columns of the first data row is
        stored in ``self.ncolumns``.

        :return: ``True`` if a proper *header* is defined
        :rtype: bool
        """
        # atleast_1d: a single-column row is parsed into a 0-d array, which
        # has no len().
        self.ncolumns = np.atleast_1d(self.getRow(0)).size
        if self._header is None:
            return False
        return len(self._header) == self.ncolumns

    def info(self):
        """
        Prints debug info to screen, e.g.::

            file name : example_data_files/simple.dat
            data header : ['Chunk', 'Coord1', 'Ncount', 'v_temp']
            number of lines : 491
        """
        report = (
            ('file name :', self.filename),
            ('data header :', self.header),
            ('number of lines :', self.fileLineSize),
        )
        for label, value in report:
            # Single-argument print() behaves the same on Python 2 and 3.
            print('{0} {1}'.format(label, value))

    def columnName2Index(self, name):
        """
        Returns the column *index* for a given *name* as defined in the
        header. Therefore :attr:`pylam.SimpleDataFile.properHeader` must be
        *True*.

        :param name: column name
        :type name: str
        :return: column index
        :rtype: int
        :raises RuntimeError: if the file has no proper header
        :raises NameError: if *name* is not a column of the header
        """
        if not self.properHeader:
            # RuntimeError derives from StandardError on Python 2, so
            # existing ``except StandardError`` handlers keep working.
            raise RuntimeError('No proper header defined. You can not use names to access columns.')
        if name not in self.header:
            msg = "Column name '" + name + "' not defined."
            raise NameError(msg)
        return self.header.index(name)

    def getColumnByName(self, name):
        # type: (str) -> np.ndarray
        """
        Returns a *whole* column as a np.ndarray.

        :param name: column name
        :type name: str
        :return: column data
        :rtype: np.ndarray
        """
        idx = self.columnName2Index(name)
        return self.data[:, idx]

    def __getitem__(self, index):
        """ Returns the data row with the given index (support for ``[]``)."""
        return self.getRow(index)

    def __len__(self):
        """ Returns the number of *data* rows. (support for ``len()``)"""
        return self.fileLineSize - self._number_of_headerlines

    def next(self):
        """
        Returns the data of the *next* row from the file as numpy array.

        After the last data row the internal row counter is reset and
        ``StopIteration`` is raised.
        """
        number_of_rows = self.fileLineSize - self._number_of_headerlines
        if self._currentRowIndex < number_of_rows:
            row = self.getRow(self._currentRowIndex)
            self._currentRowIndex += 1
            return row
        self._currentRowIndex = 0
        raise StopIteration()

    # Python 3 spelling of the iterator protocol method.
    __next__ = next
#=======================================================================================================================
class FixBlockFile(BlockFile):
    """
    Bases: :class:`pylam.base.BlockFile`

    Class for LAMMPS fix data files.

    :param filename: file name
    :type filename: str
    :return: fix block data file object
    :rtype: .FixBlockFile
    """

    #: Block class which is attached, here: ``FixBlock``
    blockClass = FixBlock

    def __init__(self, filename):
        self._blockHeaderLines = 1
        super(FixBlockFile, self).__init__(filename)
        self._anaHeader()
        self._readHeader()
        self._createBlocks()

    def _anaHeader(self):
        """
        Count the leading ``'# '`` comment lines to detect the file layout.

        Two header lines mean a single block without block header lines,
        three header lines mean several blocks with one header line each.

        :raises ValueError: for any other number of header lines
        """
        self._fileHeaderLines = 0
        # The size bound stops the scan at the end of a comment-only file.
        while (self._fileHeaderLines < self.fileLineSize and
               str(self.getLine(self._fileHeaderLines))[0:2] == '# '):
            self._fileHeaderLines += 1
        if self._fileHeaderLines == 2:
            self._multi = False
            self._blockHeaderLines = 0
        elif self._fileHeaderLines == 3:
            self._multi = True
            self._blockHeaderLines = 1
        else:
            # ValueError derives from StandardError on Python 2, so existing
            # ``except StandardError`` handlers keep working.
            raise ValueError('Not a valid file format.')

    def _readHeader(self):
        """
        Read title, data type, column names and block size from the header.
        """
        self.title = str(self.getLine(0).strip())[2:]
        self.data_type = self.title.split()[0]
        if self._multi:
            self.block_header = str(self.getLine(1).strip())[2:].split()
            data_header_index = 2
        else:
            self.block_header = None
            data_header_index = 1
        self.data_header_line = self.getLine(data_header_index)[2:]
        self.data_header = str(self.data_header_line.strip()).split()
        if self._multi:
            # The block size is the second entry of the first block header
            # line, which directly follows the three file header lines.
            self.blocksize = int(self.getLine(3).strip().split()[1])
        else:
            self.blocksize = self.fileLineSize - self._fileHeaderLines

    def _createBlocks(self):
        """
        Register one block per data section of the file.

        For multi block files the values of the block header line are stored
        in the attribute ``blockHeaderData`` of each block, keyed by the
        names in ``self.block_header``.
        """
        stride = self.blocksize + self._blockHeaderLines
        if stride <= 0:
            # No data lines at all: nothing to register.
            return
        # Floor division keeps the block count an int on Python 3 as well.
        nblocks = (self.fileLineSize - self._fileHeaderLines) // stride
        first_data_line = self._fileHeaderLines + self._blockHeaderLines
        for iblock in range(nblocks):
            startLineIndex = first_data_line + stride * iblock
            endLineIndex = startLineIndex + self.blocksize - 1
            self.addBlock(fline=startLineIndex, lline=endLineIndex,
                          header_line=self.data_header_line)
            if self.block_header:
                bhd = self.getLine(startLineIndex - 1).strip().split()
                self._blocks[-1].blockHeaderData = {
                    key: bhd[k] for k, key in enumerate(self.block_header)
                }

    def aveBlocks(self, bids):
        """
        Average over blocks.

        :param bids: list of block ids to use for average
        :type bids: list
        :return: numpy array containing the 'average block'
        :rtype: numpy.ndarray
        :raises IndexError: if *bids* is empty or contains an invalid id
        """
        nblocks = len(self)
        blocks = []
        for bid in bids:
            if not 0 <= bid < nblocks:
                raise IndexError('Invalid block ID!')
            blocks.append(self[bid])
        if not blocks:
            raise IndexError('No block IDs given.')

        # The shape of the result is taken from the first selected block.
        num_rows = len(blocks[0])
        num_cols = len(blocks[0][0])
        total = np.zeros((num_rows, num_cols))
        for block in blocks:
            for i in range(num_rows):
                # Fetch every row only once instead of once per column.
                total[i, :] += np.asarray(block[i], dtype=float)[:num_cols]
        return total / len(blocks)

    def info(self):
        """
        Prints debug info to screen, e.g.::

            file name : example_data_files/chunk1D.dat
            title : Chunk-averaged data for fix TPROFILE and group all
            data type : Chunk-averaged
            block header : ['Timestep', 'Number-of-chunks', 'Total-count']
            data header : ['Chunk', 'Coord1', 'Ncount', 'v_temp']
            block size : 121
            number of lines : 491
            number of blocks : 4
            multi : True
        """
        report = (
            ('file name :', self.filename),
            ('title :', self.title),
            ('data type :', self.data_type),
            ('block header :', self.block_header),
            ('data header :', self.data_header),
            ('block size :', self.blocksize),
            ('number of lines :', self.fileLineSize),
            ('number of blocks :', len(self)),
            ('multi :', self._multi),
        )
        for label, value in report:
            # Single-argument print() behaves the same on Python 2 and 3.
            print('{0} {1}'.format(label, value))

    def columnName2Index(self, name):
        """
        The index of the column with a given name.

        :param name: name of the column (property)
        :type name: str
        :return: index of the column
        :rtype: int
        :raises NameError: if *name* is not listed in the data header
        """
        if name not in self.data_header:
            msg = "Column name '" + name + "' not defined."
            raise NameError(msg)
        return self.data_header.index(name)

    def blocks2cols(self, colname, fix=None):
        """
        Extracts a selected column (property) from each block and combines
        them.

        For example, if ``chunk1D.dat`` looks like::

            # Chunk-averaged data for fix TPROFILE and group all
            # Timestep Number-of-chunks Total-count
            # Chunk Coord1 Ncount v_temp
            500000 121 2000
              1 0.0881802 12.1949 0.842912
              2 0.264541 12.09 0.842986
              3 0.440901 12.0788 0.840996
              ...
            1000000 121 2000
              1 0.0881802 11.6526 0.841493
              2 0.264541 11.7525 0.840792
              3 0.440901 11.7087 0.842102
              ...
            ...

        with

        >>> import pylam
        >>> FBF = pylam.FixBlockFile('chunk1D.dat')
        >>> newTable = FBF.blocks2cols('v_temp', fix=['Chunk','Coord1'])
        >>> newTable.write('new.dat')

        ``new.dat`` will look like::

            # Chunk Coord1 v_temp_0 v_temp_1 v_temp_2 v_temp_3
            +1.00000000e+00 +8.81802000e-02 +8.42912000e-01 +8.41493000e-01 +8.40999000e-01 +8.43067000e-01
            +2.00000000e+00 +2.64541000e-01 +8.42986000e-01 +8.40792000e-01 +8.45285000e-01 +8.40694000e-01
            +3.00000000e+00 +4.40901000e-01 +8.40996000e-01 +8.42102000e-01 +8.40272000e-01 +8.44249000e-01
            +4.00000000e+00 +6.17262000e-01 +8.37955000e-01 +8.37166000e-01 +8.39804000e-01 +8.38266000e-01

        :param colname: selected column to collect
        :type colname: str
        :param fix: names of common columns (like, e.g. 'Bin') which are
            copied once from the first block (default: none)
        :type fix: list
        :return: Table object
        :rtype: pylam.base.Table
        :raises NameError: if a name in *fix* is not listed in the data header
        """
        # Work on a private copy: neither the caller's list nor a shared
        # default may grow from one call to the next.
        fixed = [] if fix is None else list(fix)
        nfixed = len(fixed)
        nblocks = len(self)
        data = np.zeros((len(self[0]), nfixed + nblocks))

        for col, name in enumerate(fixed):
            self.columnName2Index(name)  # rejects unknown column names
            data[:, col] = self[0].getColumnByName(name)

        props = list(fixed)
        for i in range(nblocks):
            props.append(str('{0:s}_{1:d}'.format(colname, i)))
            data[:, nfixed + i] = self[i].getColumnByName(colname)
        return Table(data_in=data, props=props)
#=======================================================================================================================
class LogFile(BlockFile):
    """
    Bases: :class:`pylam.base.BlockFile`

    Class for LAMMPS log files.

    :param filename: log file name
    :type filename: str
    :return: log file object
    :rtype: .LogFile
    """

    # A thermo block starts after a line matching one of the start patterns
    # and ends before a line matching one of the end patterns.
    _start_pattern = ('Memory usage ', 'Per MPI rank memory allocation ')
    _end_pattern = ('Loop time',)

    #: Block class which is attached, here: :class:`pylam.ThermoBlock`
    blockClass = ThermoBlock

    def __init__(self, filename):
        self.totalSteps = 0
        self.totalTime = 0.0
        self.natoms = None
        # Defined up front so that info() and callers do not fail on log
        # files without a 'units' or 'orthogonal box' line.
        self.units = None
        self.box = None
        self.groups = {}
        self._start_regex = re.compile('|'.join(self._start_pattern))
        self._end_regex = re.compile('|'.join(self._end_pattern))
        self.__in_block = False
        self.__temp_fline = None
        self.__temp_hline = None
        self.runs = []
        # Maps a keyword to the parser for lines containing that keyword.
        self._parse_dict = {'orthogonal box': self._parseBox,
                            'units': self._parseUnits,
                            'atoms in group': self._parseGroup}
        # All parser state above has to exist before the base class is
        # initialised.
        # NOTE(review): presumably BlockFile.__init__ feeds the file to
        # _parseLine line by line -- confirm against the base class.
        super(LogFile, self).__init__(filename)
        self._postParse()

    def _is_block_start(self, line):
        """ Tests if *line* contains one of the start patterns."""
        return self._start_regex.search(line) is not None

    def _is_block_end(self, line):
        """ Tests if *line* contains one of the end patterns."""
        return self._end_regex.search(line) is not None

    def _parseLine(self, lineString, lineIndex):
        """
        Process one line of the log file.

        Outside a thermo block the line is checked for a start pattern and,
        unless it begins with ``#``, scanned for keywords. Inside a thermo
        block the first line is taken as the header line and a line with an
        end pattern closes the block.

        :param lineString: content of the line
        :type lineString: str
        :param lineIndex: index of the line in the file
        :type lineIndex: int
        """
        if self.__in_block:
            if not self.__temp_hline:
                # The first line after the start pattern is the header.
                self.__temp_hline = lineString.strip()
            elif self._is_block_end(lineString):
                self.addBlock(fline=self.__temp_fline,
                              lline=lineIndex - 1,
                              header_line=self.__temp_hline)
                self.__in_block = False
        elif self._is_block_start(lineString):
            self.__in_block = True
            # Data starts two lines below: start pattern, header, data.
            self.__temp_fline = lineIndex + 2
            self.__temp_hline = None
        elif not lineString.startswith('#'):
            # startswith() is safe for empty lines, unlike lineString[0].
            self._parseKeyWords(lineString)

    def _postParse(self):
        """
        Sum up run time and steps of all thermo blocks and set ``natoms``.

        ``natoms`` is a single number if all runs report the same number of
        atoms, otherwise the list of the per-run values.
        """
        natoms = []
        for tblock in self:
            props = tblock.runprops
            self.totalTime += props['rtime']
            self.totalSteps += props['steps']
            natoms.append(props['atoms'])
        self.natoms = natoms[0] if len(set(natoms)) == 1 else natoms

    def _parseKeyWords(self, lineString):
        """ Calls the parser of every keyword contained in *lineString*."""
        for key, parser in self._parse_dict.items():
            if key in lineString:
                parser(lineString)

    def _parseBox(self, line):
        """
        Parse the box bounds from a line ending with
        ``(xlo ylo zlo) to (xhi yhi zhi)`` and derive box lengths, face
        areas and volume from them.
        """
        words = line.strip().split()
        # The last seven words are: (xlo ylo zlo) to (xhi yhi zhi)
        xlo, ylo, zlo = [float(w.strip('()')) for w in words[-7:-4]]
        xhi, yhi, zhi = [float(w.strip('()')) for w in words[-3:]]
        Lx = xhi - xlo
        Ly = yhi - ylo
        Lz = zhi - zlo
        self.box = {'xlo': xlo, 'ylo': ylo, 'zlo': zlo,
                    'xhi': xhi, 'yhi': yhi, 'zhi': zhi,
                    'Lx': Lx, 'Ly': Ly, 'Lz': Lz,
                    'V': Lx * Ly * Lz,
                    'Axy': Lx * Ly,
                    'Axz': Lx * Lz,
                    'Ayz': Ly * Lz}

    def _parseUnits(self, line):
        """ Store the second word of a two word line as ``self.units``."""
        words = line.strip().split()
        if len(words) == 2:
            self.units = words[1]

    def _parseGroup(self, line):
        """
        Store the atom count of a group from a five word line: the first
        word is the count, the fifth word the group name.
        """
        w = line.strip().split()
        if len(w) == 5:
            self.groups[w[4]] = int(w[0])

    def info(self, re=False):
        """
        Prints debug info to screen, e.g.::

            file name: example_data_files/log.lammps
            units: real
            N atoms 1372
            N runs: 3
            total run time: 3682.2074 sec
            total steps: 5010000

        :param re: recursive info for :class:`pylam.ThermoBlock`
        :type re: bool
        """
        # Single-argument print() behaves the same on Python 2 and 3.
        print('{0} {1}'.format('file name: ', self.filename))
        print('{0} {1}'.format('units: ', self.units))
        print('{0} {1}'.format('N atoms ', self.natoms))
        print('{0} {1}'.format('N runs: ', len(self)))
        print('{0} {1} {2}'.format('total run time: ', self.totalTime, 'sec'))
        print('{0} {1}'.format('total steps: ', self.totalSteps))
        # ``re`` is the flag parameter here, not the regex module.
        if re:
            print('run info:')
            for i in range(len(self)):
                print('-' * 10 + 'run ' + str(i) + ' ' + '-' * 10)
                self[i].info()