# -*- coding: utf-8 -*-
'''A reader for HDF-5 files. This should be as generic as possible, but
at present this is pretty much customized for XSD-MPE (APS) uses.
Note that for further development of this routine, as more types of HDF5
image files occur "in the wild," it is often helpful to
map out the contents of a HDF5 file. If debug mode is on and the full file
name/path contains either 'tmp' or 'scratch' (case is ignored) then the
two files are created with filename + _HDF5Map.txt and + _NeXusMap.txt
that use HDF5 and NeXus routines to outline the file contents.
'''
import copy
import numpy as np
try:
import h5py
except ImportError:
h5py = None
from .. import GSASIIobj as G2obj
from .. import GSASIIfiles as G2fil
from .. import GSASIIpath
[docs]
class HDF5_Reader(G2obj.ImportImage):
'''Routine to read one or more HDF-5 images from a HDF5 file,
typically from APS Sectors 1, 6 or 20.
Initial version from Barbara Frosik/SDM.
'''
def __init__(self):
if h5py is None:
self.UseReader = False
msg = 'HDF5 Reader skipped because h5py library is not installed'
if GSASIIpath.condaTest():
msg += ' To fix this use command:\n\tconda install h5py hdf5'
G2fil.ImportErrorMsg(msg,{'HDF5 image importer':['h5py','hdf5']})
super(self.__class__,self).__init__( # fancy way to self-reference
extensionlist=('.hdf5','.hd5','.h5','.hdf'),strictExtension=True,
formatName = 'HDF5 image',longFormatName = 'HDF5 image file')
[docs]
def ContentsValidator(self, filename):
'''Test if valid by seeing if the HDF5 library recognizes the file.
'''
try:
fp = h5py.File(filename, 'r')
fp.close()
if not GSASIIpath.GetConfigValue('debug'): return True
# diagram out the file if in debug and if in a scratch area
if 'scratch' in filename.lower() or 'tmp' in filename.lower():
# first try NeXus
from ..imports import G2pwd_HDF5
NeXreader = G2pwd_HDF5.HDF5_Reader()
print('Performing NeXus debug scan')
NeXreader.HDF5list(filename)
# now scan as plain HDF5
fp = h5py.File(filename, 'r')
with open(filename+'_HDF5Map.txt', 'w') as log:
self.visit(fp,log=log)
return True
except IOError:
return False
[docs]
def Reader(self, filename, ParentFrame=None, **kwarg):
'''Read an image from a HDF5 file. Note that images are using :meth:`readDataset`.
When called the first time on a file, the file structure is scanned
using :meth:`visit` to map out locations of image(s). On subsequent calls,
if more than one image is in the file, the map of file structure (in
buffer arg) is reused. Depending on the Config setting for HDF5selection,
a window may be opened to allow selection of which images will be read.
When an image is reread, the blocknum will be a list item with the location
to be read, so the file scan can be skipped.
'''
try:
fp = h5py.File(filename, 'r')
except IOError:
return False
imagenum = kwarg.get('blocknum')
if imagenum is None: imagenum = 1
quick = False
# do we have a image number or a map to the section with the image?
imageTag = None
try:
int(imagenum) # test if image # is a tuple
except: # pull the section name and number out from the imagenum value
readargs = {'name':imagenum[0],'num':imagenum[1]}
imageTag = imagenum
quick = True
# set up an index as to where images are found
self.buffer = kwarg.get('buffer',{})
if not quick and not self.buffer.get('imagemap'):
try:
if GSASIIpath.GetConfigValue('debug'): print('Scanning for image map')
self.buffer['imagemap'] = []
self.UniversalComments = self.visit(fp)
if len(self.buffer['imagemap']) == 0:
self.errors = 'No valid images found in file'
fp.close()
return False
if imagenum > len(self.buffer['imagemap']):
self.errors = f"Only {len(self.buffer['imagemap'])} images found in file. {imagenum} cannot be read."
fp.close()
return False
nsel = GSASIIpath.GetConfigValue('HDF5selection',getDefault=True)
self.buffer['selectedImages'] = list(range(len(self.buffer['imagemap'])))
if ParentFrame and len(self.buffer['imagemap']) > nsel and nsel >= 0:
import wx
from .. import GSASIIctrlGUI as G2G
choices = []
for loc,num,siz in self.buffer['imagemap']:
if num is None:
choices.append(f'image in {loc} size={siz}')
else:
choices.append(f'image in {loc} sec {num} size={siz}')
dlg = G2G.G2MultiChoiceDialog(ParentFrame,'Select images to read',
'Choose images',choices)
dlg.Layout()
dlg.SendSizeEvent()
if dlg.ShowModal() == wx.ID_OK:
self.buffer['selectedImages'] = dlg.GetSelections()
dlg.Destroy()
if len(self.buffer['selectedImages']) == 0:
self.errors = 'No images selected from file'
fp.close()
return False
except Exception as msg:
print(f'Error mapping file:\n{msg}')
return False
if not quick:
self.buffer['selectedImages'] = self.buffer.get('selectedImages',
list(range(len(self.buffer['imagemap']))))
# get the next selected image
while imagenum <= len(self.buffer['imagemap']):
if imagenum-1 in self.buffer['selectedImages']:
del self.buffer['selectedImages'][self.buffer['selectedImages'].index(imagenum-1)]
break
else:
imagenum += 1
else: # unexpected!
self.errors = 'No images selected from file'
fp.close()
return False
readargs = {'imagenum':imagenum}
self.Data,self.Npix,self.Image = self.readDataset(fp,**readargs)
if quick:
fp.close()
if GSASIIpath.GetConfigValue('debug'): print(f'Read image {imagenum} from file {filename}')
# pointer to section of file & image number here
if imageTag:
self.Data['ImageTag'] = imageTag
return True
if self.Npix == 0:
self.errors = 'No valid images found in file'
fp.close()
return False
self.LoadImage(ParentFrame,filename,imagenum)
tag = self.buffer['imagemap'][imagenum-1][0]
sec = self.buffer['imagemap'][imagenum-1][1]
self.imageEntry = imagenum-1
if sec is None:
sec = "(none)"
self.repeatcount = 0
else:
self.repeatcount = self.buffer['imagemap'][imagenum-1][1]+1
if GSASIIpath.GetConfigValue('debug'): print(f'Read image #{imagenum} ({tag} section {sec}) from file {filename}')
self.Data['ImageSection'] = tag # save section of file here
# look for next image to read
while imagenum <= len(self.buffer['imagemap']):
if imagenum in self.buffer['selectedImages']:
self.repeat = True
break
else:
imagenum += 1
else:
self.repeat = False
fp.close()
return True
[docs]
def visit(self, fp, log=None):
'''Recursively visit every node in an HDF5 file & look at dimensions
of contents. If the shape is length 2, 3, or 4 assume an image
and index in self.buffer['imagemap']. Optionally save an outline
of the file contents on log, if defined.
:param fp: an HDF5 file object from h5py.File()
:param log: an optional text file object [from open()]. If supplied, an outline
of the file contents is placed here.
'''
header = []
if hasattr(self,'buffer'): self.buffer['ParamTrackingVars'] = {}
def func(name, dset):
'''process each entry in the file, classifying or sticking
values into the header (comments)
'''
if not hasattr(dset,'shape'):
if log is not None:
log.write(f'{name} (node)\n')
return # not array, can't be image
if isinstance(dset, h5py.Dataset) and log is not None:
dims = dset.shape
if len(dims) == 0:
log.write(f'{name} = {dset[()]}\n')
elif len(dims) == 1:
log.write(f'{name} ({dims[0]} elements)\n\t{dset[()][:5]}...\n')
else:
log.write(f'{name} dimensions {dims}\n')
if not hasattr(self,'buffer'): return
if isinstance(dset, h5py.Dataset):
dims = dset.shape
try:
if len(dims) <= 1:
# entries that will go into header or are parametric
val = dset[()]
if hasattr(val,'decode'):
val = val.decode()
elif dims == (1,) and hasattr(val,'tobytes') and str(val.dtype).startswith('|S'):
try:
val = val.tobytes().decode().rstrip('\x00')
except:
pass
#elif dims == (1,): # single value arrays
# val = val[0]
elif all(np.nan_to_num(val[0]) == np.nan_to_num(val)): # arrays where all values are the same
if 'float' in str(dset[()].dtype):
val = f'{val[0]:.8g}'
elif 'int' in str(dset[()].dtype):
val = f'{val[0]}'
elif '|S' in str(dset[()].dtype):
val = val[0].tobytes().decode().rstrip('\x00')
else: # not string, float or int, hope for best
val = val[0]
else:
# this is likely a parametric array. Store it for later
self.buffer['ParamTrackingVars'][dset.name] = np.array(dset[()])
return
header.append(f'{dset.name}: {val}')
elif len(dims) == 4:
size = dims[2:]
self.buffer['imagemap'] += [(dset.name,i,size) for i in range(dims[1])]
elif len(dims) == 3:
size = dims[1:]
self.buffer['imagemap'] += [(dset.name,i,size) for i in range(dims[0])]
elif len(dims) == 2:
size = dims
self.buffer['imagemap'] += [(dset.name,None,size)]
else:
print(f'Skipping entry {dset.name}. Shape is {dims}')
except Exception as msg:
print(f'Skipping entry {dset.name} Error getting shape\n{msg}')
fp.visititems(func)
return header
[docs]
def readDataset(self,fp,imagenum=1,name=None,num=None):
'''Read a specified image number from a file
'''
if name is None:
name,num,size = self.buffer['imagemap'][imagenum-1] # look up image in map
quick = False
else:
quick = True
dset = fp[name]
if num == None:
image = dset[()]
blocklen = 0
elif len(dset.shape) == 4:
image = dset[0,num,...]
blocklen = dset.shape[1]
elif len(dset.shape) == 3:
image = dset[num,...]
blocklen = dset.shape[0]
else:
msg = f'Unexpected image dimensions {name}'
print(msg)
raise Exception(msg)
if quick:
return {},None,image.T
# add parametric values to the brginning of the comments
self.Comments = []
for k in self.buffer.get('ParamTrackingVars',[]):
arr = self.buffer['ParamTrackingVars'][k]
if len(arr) != blocklen: continue
#self.Comments.append(f'{k.split("/")[-1]}: {arr[num]}')
self.Comments.append(f'{k}: {arr[num]}')
self.Comments += copy.deepcopy(self.UniversalComments)
sizexy = list(image.shape)
Npix = sizexy[0]*sizexy[1]
j = 0 # use 1st size/bin entry for all images
# get 1ID pixel size info. Currently an array, but this may change
try:
if 'PixelSizeX' in fp['/instrument/Detector'] and 'PixelSizeY' in fp['/instrument/Detector']:
pixelsize = [float(fp['/instrument/Detector/PixelSizeX'][0]),
float(fp['/instrument/Detector/PixelSizeY'][0])]
print(f'Using PixelSize[XY] for Pixel size: {pixelsize}.')
except:
pixelsize = None
try:
if not pixelsize:
misc = {}
for key in 'DetSizeX','DetSizeY':
misc[key] = [i for i in fp['misc'][key]]
for key in 'DetPixelSizeX','DetPixelSizeY':
misc[key] = [float(i) for i in fp['misc'][key]]
if 'DetSizeX' in misc and 'DetSizeY' in misc:
pixelsize = [misc[f'DetSize{i}'][j]*misc[f'DetPixelSize{i}'][j] for i in ('X','Y')]
print(f'Using DetSize[XY] & DetPixelSize[XY] for Pixel size: {pixelsize}.')
else:
pixelsize = [misc[f'DetPixelSize{i}'][j] for i in ('X','Y')]
print(f'Using DetPixelSize* for Pixel size: {pixelsize}.')
except:
pixelsize = None
print(f'No PixelSize[XY], DetSize[XY] or DetPixelSize[XY].')
# default pixel size (for APS sector 6?)
if not pixelsize:
pixelsize = [74.8,74.8]
print(f'Pixel size defaulting to {pixelsize}')
data = {'pixelSize':pixelsize,'wavelength':0.15,'distance':1000.,
'center':[sizexy[0]*0.1,sizexy[1]*0.1],'size':sizexy,'det2theta':0.0}
for item in self.Comments:
name,val = item.split(':',1)
if 'wavelength' in name and 'spread' not in name:
try:
data['wavelength'] = float(val)
except ValueError:
pass
elif 'distance' in name:
data['distance'] = float(val)
elif 'x_pixel_size' in name:
data['pixelSize'][0] = float(val)*1000.
elif 'y_pixel_size' in name:
data['pixelSize'][1] = float(val)*1000.
elif 'beam_center_x' in name:
data['center'][0] = float(val)
elif 'beam_center_y' in name:
data['center'][1] = float(val)
for item in self.Comments: # override previous with these
if "instrument/HEM/Energy" in item:
name,val = item.split(':',1)
data['wavelength'] = 12.398425/float(val)
return data,Npix,image.T