Source code for GSASII.imports.G2pwd_BrukerBRML

# -*- coding: utf-8 -*-
'''
'''
import os
import shutil
import numpy as np
from .. import GSASIIpath
try:
    import xmltodict as xml
except Exception as msg:
    #if GSASIIpath.GetConfigValue('debug'): print(f'Debug: xmltodict error = {msg}')
    xml = None
from .. import GSASIIobj as G2obj

[docs] class brml_ReaderClass(G2obj.ImportPowderData): 'Routines to import powder data from a zip Bruker .brml file' def __init__(self): super(self.__class__,self).__init__( # fancy way to self-reference extensionlist=('.brml',), strictExtension=True, formatName = 'Bruker brml', longFormatName = 'Bruker .brml powder data file' ) if xml is None: self.UseReader = False msg = 'Bruker .brml Reader skipped because xmltodict module is not installed.' from .. import GSASIIfiles as G2fil G2fil.ImportErrorMsg(msg,{'Bruker .brml Importer':['xmltodict']}) self.scriptable = True
[docs] def ContentsValidator(self, filename): '''Validate by testing if the file can be opened by zip and if so, does it contain at least one file named "RawData*.xml" ''' if xml is None: return False try: import zipfile as ZF with ZF.ZipFile(filename, 'r') as zipObj: for fil in zipObj.namelist(): if 'RawData' in fil and '.xml' in fil: return True return False except: return False
[docs] def Reader(self,filename, ParentFrame=None, **kwarg): 'Read a Bruker brml file' def XtractXMLScan(): '''Read the XML info into the GSAS-II reader structure. This structure seems to be for where the detector is scanned. Code from Bob with some modifications to read a wider assortment of files and sample temperature :returns: True if read suceeds. May also throw an exception on failure ''' self.idstring = f'{os.path.basename(filename)} {os.path.basename(fil)}' self.powderentry[0] = filename self.powderentry[2] = filNum self.comments = [] datano = 1 try: nSteps = int(data['RawData']['DataRoutes']['DataRoute'][datano]['ScanInformation']['MeasurementPoints']) except KeyError: datano = 0 nSteps = int(data['RawData']['DataRoutes']['DataRoute']['ScanInformation']['MeasurementPoints']) if nSteps <= 10: return False # too short x = np.zeros(nSteps, dtype=float) y = np.zeros(nSteps, dtype=float) w = np.zeros(nSteps, dtype=float) if datano: effTime = float(data['RawData']['DataRoutes']['DataRoute'][datano]['ScanInformation']['TimePerStepEffective']) else: effTime = float(data['RawData']['DataRoutes']['DataRoute']['ScanInformation']['TimePerStepEffective']) # Extract 2-theta angle and counts from the XML document i=0 T = {} tcol = GSASIIpath.GetConfigValue('BRML_T_col',-1) if tcol >= 0: tcols = (tcol,) else: tcols = (8,6,5) # columns to use for temperature for j in tcols: T[f'{j}max'] = -1 T[f'{j}min'] = 9999 T[f'{j}sum'] = 0 T[f'{j}c'] = 0 # data appears to be in column 4 most of the time # but at least in one file, it is in column 3 y3 = [] while i < nSteps : if datano: entry = data['RawData']['DataRoutes']['DataRoute'][datano]['Datum'][i].split(',') else: entry = data['RawData']['DataRoutes']['DataRoute']['Datum'][i].split(',') x[i] = float(entry[2]) y[i] = float(entry[4])*float(entry[0])/effTime y3.append(float(entry[3])) i = i + 1 for j in tcols: # columns with temperature? try: t = float(entry[j]) if t > 0: T[f'{j}max'] = max(T[f'{j}max'],t) T[f'{j}min'] = min(T[f'{j}min'],t) T[f'{j}sum'] += t T[f'{j}c'] += 1 except: pass #breakpoint() try: # is there some range in col 4 values? if abs(1. - min(y)/max(y)) < 1e-4: raise Exception except: y = np.array(y3) w = np.where(y>0,1/y,0.) for j in sorted(tcols): # show all columns with temperature if T[f'{j}c'] > 0: print(f"Column {j} T min {T[f'{j}min']:.2f}" f" max {T[f'{j}max']:.2f}" f" avg {T[f'{j}sum']/T[f'{j}c']:.2f} (C assumed)") for j in tcols: # take 1st column with non-zero temperatures if T[f'{j}c'] > 0: self.Sample['Temperature'] = 273.15 + T[f'{j}sum']/T[f'{j}c'] if len(tcols) > 1: print(f"Using column {j}, T={self.Sample['Temperature']:.3f} K") break self.powderdata = [x,y,w,np.zeros(nSteps),np.zeros(nSteps),np.zeros(nSteps)] return True def XtractXMLNoscan(): '''Read the XML info into the GSAS-II reader structure. This structure seems to be for where the detector is stationary. :returns: True if read suceeds. May also throw an exception on failure ''' self.idstring = f'{os.path.basename(filename)} {os.path.basename(fil)}' self.powderentry[0] = filename self.powderentry[2] = filNum self.comments = [] try: scandata = data['RawData']['DataRoutes']['DataRoute']['ScanInformation']['ScaleAxes'] if not scandata: return if 'ScaleAxisInfo' not in scandata: return except: return False try: start = float(scandata['ScaleAxisInfo']['Start']) stop = float(scandata['ScaleAxisInfo']['Stop']) incr = float(scandata['ScaleAxisInfo']['Increment']) nSteps = int(0.5 + (stop-start)/incr) if nSteps <= 10: return False # too short xyT = data['RawData']['DataRoutes']['DataRoute']['Datum'].split(',') except: return False try: self.Sample['Temperature'] = 273. + float(xyT[2]) # seems to be temperature except: return y = np.array([float(y) for y in xyT[3:3+nSteps]]) x = np.array([start + i * incr for i in range(len(y))]) w = np.where(y>0,1/y,0.) self.powderdata = [x,y,w,np.zeros(nSteps),np.zeros(nSteps),np.zeros(nSteps)] return True #### beginning of Reader if xml is None: return False self.buffer = kwarg.get('buffer',{}) if 'filesFound' not in self.buffer or 'maxNum' not in self.buffer: # do a scan of zip to find RawData self.buffer['filesFound'] = [] self.buffer['maxNum'] = -1 try: import zipfile as ZF with ZF.ZipFile(filename, 'r') as zipObj: for fil in zipObj.namelist(): if 'RawData' in fil and '.xml' in fil: self.buffer['filesFound'].append(fil) try: self.buffer['maxNum'] = max( self.buffer['maxNum'], int(fil.split('RawData')[1].split('.')[0])) except: pass except Exception as msg: self.errors = "Error with dir unzip:\n"+str(msg) return False filNum = kwarg.get('blocknum',1)-1 # loop over the files in the zip structure until we find one we can read while filNum <= self.buffer['maxNum']: # find file(s) matching the number files = [i for i,j in enumerate(self.buffer['filesFound']) if f'RawData{filNum}.xml' in j] if not files: filNum += 1 continue indx = files[0] fil = self.buffer['filesFound'].pop(indx) # take the 1st located file try: import zipfile as ZF with ZF.ZipFile(filename, 'r') as zipObj: zipObj.extract(fil) with open(fil) as fd: data = dict(xml.parse(fd.read())) except Exception as msg: print(f"Error with unzip/xml extraction on {fil}:\n{msg}") continue finally: if os.path.exists(os.path.dirname(fil)): shutil.rmtree(os.path.dirname(fil)) try: readOK = XtractXMLNoscan() if readOK: break readOK = XtractXMLScan() if readOK: break #print(f"Rejected {fil} (no points)") except Exception as msg: print(f"Read of {fil} failed:\n{msg}") else: self.errors = "Error: no datasets found to read" return False try: import zipfile as ZF with ZF.ZipFile(filename, 'r') as zipObj: zipObj.extract(fil) with open(fil) as fd: data = dict(xml.parse(fd.read())) os.remove(fil) os.rmdir('Experiment0') except Exception as msg: self.errors = f"Error with unzip/read:\n{msg}" return False finally: if os.path.exists(os.path.dirname(fil)): shutil.rmtree(os.path.dirname(fil)) # add some comments for key in 'TimeStampStarted','TimeStampFinished': self.comments.append(f'{key}: {data["RawData"].get(key,"?")}\n') for key in data['RawData'].get('Identifier',{}): val = data['RawData']['Identifier'][key] if type(val) is str: self.comments.append(f'{key}: {val}\n') # file read was successful, are there more files to read? self.repeatcount = filNum while filNum <= self.buffer['maxNum']: # find file(s) matching the number files = [i for i,j in enumerate(self.buffer['filesFound']) if f'RawData{filNum}.xml' in j] if not files: filNum += 1 continue self.repeat = True break else: self.repeat = False return True