"""
Storage layer for perses automated molecular design.
TODO
----
* Add write_sampler_state(modname, sampler_state, iteration)
* Generalize write_quantity to handle units
* Add data access routines for reading to isolate low-level storage layer
"""
__author__ = 'John D. Chodera'
################################################################################
# IMPORTS
################################################################################
import os, os.path
import sys, math
import numpy as np
import copy
import time
import netCDF4 as netcdf
import pickle
import json
import mdtraj
from simtk import unit
import codecs
################################################################################
# LOGGER
################################################################################
import logging
logger = logging.getLogger(__name__)
################################################################################
# STORAGE
################################################################################
[docs]class NetCDFStorage(object):
"""NetCDF storage layer.
"""
[docs] def __init__(self, filename, mode='w'):
"""Create NetCDF storage layer, creating or appending to an existing file.
Parameters
----------
filename : str
Name of storage file to bind to.
mode : str, optional, default='w'
File open mode, 'w' for (over)write, 'a' for append.
"""
self._filename = filename
self._ncfile = netcdf.Dataset(self._filename, mode=mode)
self._envname = None
self._modname = None
# Create standard dimensions.
if 'iterations' not in self._ncfile.dimensions:
self._ncfile.createDimension('iterations', size=None)
if 'spatial' not in self._ncfile.dimensions:
self._ncfile.createDimension('spatial', size=3)
def _find_group(self):
"""Retrieve the specified group, creating it if it does not exist.
"""
groupname = '/'
if self._envname is not None:
groupname += self._envname + '/'
if self._modname is not None:
groupname += self._modname + '/'
ncgrp = self._ncfile.createGroup(groupname)
return ncgrp
def _encode_string(string, encoding='ascii'):
"""Encode strings to ASCII to avoid python 3 crap.
"""
try:
return string.encode(encoding)
except UnicodeEncodeError:
return string
[docs] def sync(self):
"""Flush write buffer.
"""
self._ncfile.sync()
[docs] def close(self):
"""Close the storage layer.
"""
self._ncfile.close()
[docs] def write_configuration(self, varname, positions, topology, iteration=None, frame=None, nframes=None):
"""Write a configuration (or one of a sequence of configurations) to be stored as a native NetCDF array
Parameters
----------
varname : str
The variable name to be stored
positions : simtk.unit.Quantity of size [natoms,3] with units compatible with angstroms
The positions to be written
topology : md.Topology object
The corresponding Topology object
iteration : int, optional, default=None
The local iteration for the module, or `None` if this is a singleton
frame : int, optional, default=None
If these coordinates are part of multiple frames in a sequence, the frame number
nframes : int, optional, default=None
If these coordinates are part of multiple frames in a sequence, the total number of frames in the sequence
"""
ncgrp = self._find_group()
if ((nframes is not None) and (frame is None)) or ((nframes is None) and (frame is not None)):
raise Exception("Both 'nfranes' and 'frame' must be used together.")
def dimension_name(iteration, suffix):
dimension_name = ''
if self._envname: dimension_name += self._envname + '_'
if self._modname: dimension_name += self._modname + '_'
dimension_name += varname + '_' + suffix + '_' + str(iteration)
return dimension_name
if iteration is not None:
varname += '_' + str(iteration)
if varname not in ncgrp.variables:
# Create dimensions
if (frame is not None):
frames_dimension_name = dimension_name(varname, 'frames')
ncdim = self._ncfile.createDimension(frames_dimension_name, nframes)
natoms = topology.n_atoms
atoms_dimension_name = dimension_name(varname, 'atoms')
ncdim = self._ncfile.createDimension(atoms_dimension_name, natoms)
# Create variables
# TODO: Handle cases with no iteration but with specified frames
if (iteration is not None) and (frame is not None):
ncgrp.createVariable(varname, np.float32, dimensions=(frames_dimension_name, atoms_dimension_name, 'spatial'), chunksizes=(1,natoms,3))
elif (iteration is not None):
ncgrp.createVariable(varname, np.float32, dimensions=(atoms_dimension_name, 'spatial'), chunksizes=(natoms,3))
else:
ncgrp.createVariable(varname, np.float32, dimensions=(atoms_dimension_name, 'spatial'), chunksizes=(natoms,3))
# Write Topology
if (frame is None) or (frame == 0):
topology_varname = varname + '_topology'
if (iteration is not None):
topology_varname += '_' + str(iteration)
self.write_object(topology_varname, topology, iteration=iteration)
# Write positions
# TODO: Handle cases with no iteration but with specified frames
positions_unit = unit.angstroms
if (frame is not None):
ncgrp.variables[varname][frame,:,:] = positions[:,:] / positions_unit
else:
ncgrp.variables[varname] = positions[:,:] / positions_unit
[docs] def write_object(self, varname, obj, iteration=None):
"""Serialize a Python object, encoding as pickle when storing as string in NetCDF.
Parameters
----------
varname : str
The variable name to be stored
obj : object
The object to be serialized
iteration : int, optional, default=None
The local iteration for the module, or `None` if this is a singleton
"""
ncgrp = self._find_group()
if varname not in ncgrp.variables:
if iteration is not None:
ncgrp.createVariable(varname, str, dimensions=('iterations',), chunksizes=(1,))
else:
ncgrp.createVariable(varname, str, dimensions=(), chunksizes=(1,))
pickled = codecs.encode(pickle.dumps(obj), "base64").decode()
if iteration is not None:
ncgrp.variables[varname][iteration] = pickled
else:
ncgrp.variables[varname] = pickled
[docs] def get_object(self, envname, modname, varname, iteration=None):
"""Get the serialized Python object.
Parameters
----------
envname : str
The name of the environment for the variable
modname : str
The name of the module for the variable
varname : str
The variable name to be stored
iteration : int, optional, default=None
The local iteration for the module, or `None` if this is a singleton
Returns
-------
obj : object
The retrieved object
"""
nc_path = "/{envname}/{modname}/{varname}".format(envname=envname, modname=modname, varname=varname)
if iteration is not None:
pickled = self._ncfile[nc_path][iteration]
else:
pickled = self._ncfile[nc_path][0]
obj = pickle.loads(codecs.decode(pickled.encode(), "base64"))
return obj
[docs] def write_quantity(self, varname, value, iteration=None):
"""Write a floating-point number
Parameters
----------
varname : str
The variable name to be stored
value : float
The floating-point value to be written
iteration : int, optional, default=None
The local iteration for the module, or `None` if this is a singleton
"""
ncgrp = self._find_group()
if varname not in ncgrp.variables:
if iteration is not None:
ncgrp.createVariable(varname, 'f8', dimensions=('iterations',), chunksizes=(1,))
else:
ncgrp.createVariable(varname, 'f8', dimensions=(), chunksizes=(1,))
if iteration is not None:
ncgrp.variables[varname][iteration] = value
else:
ncgrp.variables[varname] = value
[docs] def write_array(self, varname, array, iteration=None):
"""Write a numpy array as a native NetCDF array
Parameters
----------
varname : str
The variable name to be stored
array : numpy.array of arbitrary dimension
The numpy array to be written
iteration : int, optional, default=None
The local iteration for the module, or `None` if this is a singleton
"""
ncgrp = self._find_group()
def dimension_name(dimension_index):
dimension_name = ''
if self._envname: dimension_name += self._envname + '_'
if self._modname: dimension_name += self._modname + '_'
dimension_name += varname + '_' + str(dimension_index)
return dimension_name
if varname not in ncgrp.variables:
# Create dimensions
dimensions = list()
if iteration is not None:
dimensions.append('iterations')
for (dimension_index, size) in enumerate(array.shape):
ncdim = self._ncfile.createDimension(dimension_name(dimension_index), size)
dimensions.append(dimension_name(dimension_index))
dimensions = tuple(dimensions)
# Create variables
if iteration is not None:
ncgrp.createVariable(varname, array.dtype, dimensions=dimensions, chunksizes=((1,) + array.shape))
else:
ncgrp.createVariable(varname, array.dtype, dimensions=dimensions, chunksizes=array.shape)
# Check dimensions
expected_shape = list()
for (dimension_index, size) in enumerate(array.shape):
expected_shape.append(self._ncfile.dimensions[dimension_name(dimension_index)].size)
expected_shape = tuple(expected_shape)
if expected_shape != array.shape:
raise Exception("write_array called for /%s/%s/%s with different dimension (%s) than initially called (%s); dimension must stay constant." % (envname, modname, varname, str(array.shape), str(expected_shape)))
if iteration is not None:
ncgrp.variables[varname][iteration] = array
else:
ncgrp.variables[varname] = array
################################################################################
# BOUND STORAGE VIEWS THAT ENCAPSULATE ENVIRONMENT NAMES AND MODULE NAMES
################################################################################
class NetCDFStorageView(NetCDFStorage):
"""NetCDF storage view with bound environment and module names.
"""
def __init__(self, storage, envname=None, modname=None):
"""Initialize a view of the storage with a specific environment and module name.
Parameters
----------
envname : str, optional, default=None
Set the name of the environment this module is attached to.
modname : str, optional, default=None
Set the name of the module in the code writing the variable
"""
self._filename = storage._filename
self._ncfile = storage._ncfile
self._envname = storage._envname
self._modname = storage._modname
if envname: self._envname = envname
if modname: self._modname = modname