import h5py
import numpy as np
import platform
import os
# Boolean flag computed from the major number of the running interpreter.
# NOTE(review): the test is a strict "> 3", so the flag is False on every
# Python 3.x interpreter -- confirm whether ">= 3" was the intent.
PY_VERSION = 3 < int(platform.python_version().partition('.')[0])
''' ANI data packer class
Class for storing data supplied as a dictionary.
'''
class datapacker(object):
    """Store dictionaries of arrays as groups of datasets in an HDF5 file.

    The object keeps the ``h5py.File`` handle open until ``cleanup()`` is
    called. It can also be used as a context manager, in which case the file
    is closed when the ``with`` block exits.
    """

    def __init__(self, store_file, mode='w-', complib='gzip', complevel=6):
        """Open the HDF5 file that will receive the data.

        Args:
            store_file: File name handed to ``h5py.File``.
            mode: Open mode handed to ``h5py.File``. According to the h5py
                documentation the default ``'w-'`` creates the file and
                fails if it already exists.
            complib: Passed as ``compression`` to every ``create_dataset``
                call made by ``store_data``.
            complevel: Passed as ``compression_opts`` to every
                ``create_dataset`` call made by ``store_data``.
        """
        self.store = h5py.File(store_file, mode=mode)
        self.clib = complib
        self.clev = complevel

    def __enter__(self):
        """Return the packer itself so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the file on leaving the ``with`` block; never swallow errors."""
        self.cleanup()

    @staticmethod
    def _encode_strings(v):
        """Return ``v`` with text converted to UTF-8 bytes where applicable.

        Only a non-empty ``list`` whose first element is a ``str`` is
        converted (``np.str_`` is a ``str`` subclass, so it is covered too).
        Every other value is returned unchanged.
        """
        if isinstance(v, list) and v and isinstance(v[0], str):
            return [a.encode('utf8') for a in v]
        return v

    def store_data(self, store_loc, **kwargs):
        """Create group ``store_loc`` and write one dataset per keyword.

        Args:
            store_loc: Name of the group to create in the file.
            **kwargs: ``name=value`` pairs; each value is written as a
                dataset called ``name`` using the compression settings given
                to the constructor. Lists of strings are stored as UTF-8
                encoded bytes.
        """
        g = self.store.create_group(store_loc)
        for k, v in kwargs.items():
            g.create_dataset(
                k,
                data=self._encode_strings(v),
                compression=self.clib,
                compression_opts=self.clev,
            )

    def cleanup(self):
        """Close the underlying HDF5 file."""
        self.store.close()
''' ANI data loader class
Class for loading data stored with the datapacker class.
'''
class anidataloader(object):
    """Read data back from an HDF5 file written with ``datapacker``.

    The file is opened read-only and stays open until ``cleanup()`` is
    called. The loader can also be used as a context manager, in which case
    the file is closed when the ``with`` block exits.
    """

    def __init__(self, store_file):
        """Open ``store_file`` for reading.

        Args:
            store_file: Path of the HDF5 file.

        Raises:
            FileNotFoundError: If ``store_file`` does not exist.
        """
        if not os.path.exists(store_file):
            raise FileNotFoundError('file {} not found.'.format(store_file))
        self.store = h5py.File(store_file, 'r')

    def __enter__(self):
        """Return the loader itself so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the file on leaving the ``with`` block; never swallow errors."""
        self.cleanup()

    @staticmethod
    def _decode_bytes(dataset):
        """Turn an array of byte strings into a list of ``str``.

        The conversion is applied only to non-empty arrays with at least one
        dimension whose first element is an ``np.bytes_``. Anything else
        (numeric arrays, empty arrays, 0-d arrays) is returned unchanged.
        UTF-8 is used because ``datapacker`` encodes strings as UTF-8.
        """
        if (isinstance(dataset, np.ndarray)
                and dataset.ndim > 0      # 0-d arrays cannot be indexed with [0]
                and dataset.size != 0
                and type(dataset[0]) is np.bytes_):
            return [a.decode('utf8') for a in dataset]
        return dataset

    def _read_datasets(self, item, path):
        """Collect every non-group member of ``item`` into a dict.

        The returned dict maps member names to their values (passed through
        ``_decode_bytes``) and also holds ``path`` under the key ``'path'``.
        """
        data = {'path': path}
        for k in item.keys():
            member = item[k]
            if isinstance(member, h5py.Group):
                continue
            data[k] = self._decode_bytes(np.array(member[()]))
        return data

    def h5py_dataset_iterator(self, g, prefix=''):
        """Walk the groups below ``g`` and yield one dict per data group.

        A child group whose first member is a dataset is reported as a dict
        (see ``_read_datasets``); any other child group is descended into
        recursively. Empty groups and datasets located directly in ``g`` are
        skipped.

        Args:
            g: Group whose children are visited.
            prefix: String prepended to the ``'path'`` of every result.
        """
        for key in g.keys():
            item = g[key]
            if not isinstance(item, h5py.Group):
                # A dataset has no members to inspect or descend into.
                continue
            keys = list(item.keys())
            if not keys:
                # Nothing to report for an empty group.
                continue
            path = '{}/{}'.format(prefix, key)
            if isinstance(item[keys[0]], h5py.Dataset):
                yield self._read_datasets(item, path)
            else:
                yield from self.h5py_dataset_iterator(item, path)

    def __iter__(self):
        """Iterate over all data in the file, starting at the root."""
        yield from self.h5py_dataset_iterator(self.store)

    def get_group_list(self):
        """Return a list with every top-level member of the file."""
        return list(self.store.values())

    def iter_group(self, g):
        """Iterate over the data found below the given group ``g``."""
        yield from self.h5py_dataset_iterator(g)

    def get_data(self, path, prefix=''):
        """Return the datasets stored in the group at ``path`` as a dict.

        Args:
            path: Location of the group inside the file.
            prefix: String prepended (with a ``/``) to ``path`` in the
                ``'path'`` entry of the result.
        """
        item = self.store[path]
        return self._read_datasets(item, '{}/{}'.format(prefix, path))

    def group_size(self):
        """Return the number of top-level members of the file."""
        return len(self.get_group_list())

    def size(self):
        """Return the total number of direct members of all top-level groups."""
        return sum(len(g.items()) for g in self.store.values())

    def cleanup(self):
        """Close the underlying HDF5 file."""
        self.store.close()