# Source code for xrdtools.io

from __future__ import unicode_literals, print_function, division, absolute_import

import os
import io
import logging

from lxml import etree
import numpy as np

logger = logging.getLogger(__name__)

package_path = os.path.dirname(__file__)


def validate_xrdml_schema(filename):
    """
    Validate the xml schema of a given file.

    The file is checked against the bundled XRDMeasurement schemas, newest
    version first, and the first matching version is reported.

    Parameters
    ----------
    filename : str
        The Filename of the `.xrdml` file to test.

    Returns
    -------
    float or None
        Returns the version number as float or None if the file was not
        matching any provided xml schema.
    """
    schemas = [(1.5, 'data/schemas/XRDMeasurement15.xsd'),
               (1.4, 'data/schemas/XRDMeasurement14.xsd'),
               (1.3, 'data/schemas/XRDMeasurement13.xsd'),
               (1.2, 'data/schemas/XRDMeasurement12.xsd'),
               (1.1, 'data/schemas/XRDMeasurement11.xsd'),
               (1.0, 'data/schemas/XRDMeasurement10.xsd'),
               ]

    # Parse from a binary stream so that the parser decodes the document
    # according to its own XML declaration instead of the platform's default
    # text encoding.
    with open(filename, 'rb') as f:
        data_xml = etree.parse(f)

    for version, schema in schemas:
        schema_path = os.path.join(package_path, schema)
        with open(schema_path, 'rb') as f:
            xmlschema = etree.XMLSchema(etree.parse(f))
        if xmlschema.validate(data_xml):
            return version

    return None
# Axis names whose positions are stored for every scan.
_AXIS_KEYS = ('2Theta', 'Omega', 'Phi', 'Psi', 'X', 'Y', 'Z')


def _txt_list2arr(txt):
    """
    Split a list of numbers `txt` into a numpy ndarray.

    Parameters
    ----------
    txt : str or None
        String containing floats separated by spaces.

    Returns
    -------
    ndarray
        Numpy ndarray of dtype float. Empty if `txt` is None.
    """
    if txt is None:
        return np.asarray([])
    return np.fromstring(txt, dtype=float, count=-1, sep=' ')


def _get_array_for_single_value(data, key):
    """
    Create an array for a given `key` of the length of the data array.

    A single value is expanded to the shape of ``data['data']``. If the
    entry holds several identical rows, it is reduced to its first row.
    Anything else (including a missing key or an empty list) is left as is.

    Parameters
    ----------
    data : dict
        Dictionary containing the measurement data and settings.
    key : str
        Key of the parameter which needs to be transformed to the same
        length as the data array.

    Returns
    -------
    dict
        Same data dictionary as input dictionary `data`.
    """
    if key not in data:
        return data

    # The entry may still be a plain list (e.g. an empty placeholder), which
    # has no ``size`` attribute; inspect it through an array view instead.
    values = np.asarray(data[key])

    if values.size == 1:
        data[key] = np.ones_like(data['data']) * values
    elif values.ndim > 0 and len(values) > 1 and np.all(values == values[0]):
        data[key] = values[0]

    return data


def _sort_data(k, uid_scans, data):
    """
    Retrieve settings of scan `k` and append to data.

    The scan is added to the completed keys if the measurement type is
    'Scan' or the scan status is 'Completed'; otherwise it is appended to
    the corresponding 'i'-prefixed (incomplete) keys.

    Parameters
    ----------
    k : int
        Scan number
    uid_scans : list
        A list containing `lxml.etree._Element` elements pointing to the
        scans in a xml tree.
    data : dict
        Data dictionary containing the measurement data and settings.

    Returns
    -------
    dict
        Same data dictionary as input dictionary `data`.
    """
    scan = _get_scan_data(uid_scans, k)
    keys = ('data', 'time') + _AXIS_KEYS

    if data['measType'] == 'Scan' or scan['status'] == 'Completed':
        data['scannb'].append(k)
        for key in keys:
            if key in scan:
                data = _append2arr(data, scan, key)
    else:
        data['iscannb'].append(k)
        # Only store the axes the scan actually provides, so a scan
        # without e.g. '2Theta' positions does not raise a KeyError.
        for key in keys:
            if key in scan:
                data['i' + key].append(scan[key])

    return data


def _append2arr(data, scan, key):
    """
    Append the data with key `key` from scan `scan` to the data dictionary.

    Parameters
    ----------
    data : dict
        Data dictionary containing the measurement data and settings.
    scan : dict
        Scan dictionary containing the measurement data and settings of one
        particular scan.
    key : str
        Parameter key of `scan` dictionary.

    Returns
    -------
    dict
        Same data dictionary as input dictionary `data`.
    """
    # Check for the empty-list placeholder explicitly: comparing an ndarray
    # with ``[]`` is an element-wise comparison and fails for mismatching
    # shapes.
    if isinstance(data[key], list) and not data[key]:
        data[key] = scan[key]
    else:
        data[key] = np.vstack((data[key], scan[key]))
    return data


def _get_scan_data(uid_scans, scannb, namespace=None):
    """
    Get the data of scan with number `scannb`.

    Parameters
    ----------
    uid_scans : list
        A list containing `lxml.etree._Element` elements pointing to the
        scans in a xml tree.
    scannb : int
        ID of the scan.
    namespace : dict or None, optional
        A dictionary defining the namespace `ns`. If None, it is determined
        from the uid_scan.nsmap[None].

    Returns
    -------
    dict
        A dictionary containing the data and settings for the specified
        scan `scannb`. Intensities given in 'counts' are divided by the
        counting time.
    """
    if namespace is None:
        namespace = {'ns': uid_scans[0].nsmap[None]}

    uid_scan = uid_scans[scannb]
    scan_data = {'status': uid_scan.get('status'),
                 'scanAxis': uid_scan.get('scanAxis')}

    data_points = uid_scan.find('ns:dataPoints', namespaces=namespace)

    # intensities and their unit
    intensities = data_points.findtext('ns:intensities', namespaces=namespace)
    scan_data['data'] = _txt_list2arr(intensities)
    units_intensities = data_points.find('ns:intensities', namespaces=namespace).get('unit')

    # counting time: one value per point for 'Pre-set counts', a common
    # value otherwise
    if uid_scan.get('mode') == 'Pre-set counts':
        time = data_points.findtext('ns:countingTimes', namespaces=namespace)
    else:
        time = data_points.findtext('ns:commonCountingTime', namespaces=namespace)
    scan_data['time'] = _txt_list2arr(time)

    # normalize intensity units to cps
    if units_intensities == 'counts':
        scan_data['data'] /= scan_data['time']

    # positions of all axes
    n = len(scan_data['data'])  # nb of data points
    for pos in data_points.findall('ns:positions', namespaces=namespace):
        info = _read_axis_info(pos, n)
        if info['axis'] in _AXIS_KEYS:
            scan_data[info['axis']] = info['data']
        else:
            logger.warning('axis type not supported')

    return scan_data


def _read_axis_info(uid_pos, n):
    """
    Get the settings for a given axis.

    Parameters
    ----------
    uid_pos : lxml.etree._Element
        A `lxml.etree._Element` element pointing to axis information in the
        xml tree.
    n : int
        Number of data points.

    Returns
    -------
    dict
        Axis settings stored in a dictionary with the keys 'axis', 'unit'
        and 'data'. If a start and/or end position is given, 'data' holds
        `n` evenly spaced positions between start and end.
    """
    # Keep start/end as floats; storing them in an integer array would
    # silently truncate fractional positions.
    start = 0.0
    end = 0.0
    has_range = False
    positions = np.array([0.0, 0.0])

    for child in list(uid_pos):
        tag = child.tag
        if tag.find('listPositions') != -1:
            positions = _txt_list2arr(child.text)
        elif tag.find('startPosition') != -1:
            start = np.double(child.text)
            has_range = True
        elif tag.find('endPosition') != -1:
            end = np.double(child.text)
            has_range = True
        elif tag.find('commonPosition') != -1:
            positions = np.asarray(np.double(child.text))
        else:
            logger.debug('unsupported tag')
            positions = np.array([])

    if has_range:
        positions = np.linspace(start, end, n)

    return {'axis': uid_pos.get('axis'),
            'unit': uid_pos.get('unit'),
            'data': positions}
def _is_empty_list(value):
    """Return True if `value` is a list without any items."""
    return isinstance(value, list) and not value


def _read_optional_length(xrd_measurement, xpath, namespace, label):
    """Read the optional element at `xpath` as a float, or return None if absent."""
    uid = xrd_measurement.find(xpath, namespaces=namespace)
    if uid is None:
        return None
    if uid.get('unit') != 'mm':
        logger.debug("{} units are not 'mm'".format(label))
    return np.double(uid.text)


def read_xrdml(filename):
    """
    Load a Panalytical XRDML file.

    Parameters
    ----------
    filename : str
        The filename of the xrdml file to be loaded. If the name has no
        extension and does not exist, `.xrdml` is appended.

    Returns
    -------
    dict
        A dictionary with all relevant data of the measurement.

    Raises
    ------
    ValueError
        If the file does not exist or does not match any of the supported
        xrdml schemas.
    """
    # Resolve the extension *before* giving up on the file, and keep the
    # directory part of the given name.
    if not os.path.exists(filename):
        candidate = filename + '.xrdml'
        if os.path.splitext(filename)[1] == '' and os.path.exists(candidate):
            filename = candidate
        else:
            logger.error('File "{}" does not exist.'.format(filename))
            raise ValueError('This is not a valid filename.')
    filename = os.path.abspath(filename)

    # check if file is conform with xml schema
    if validate_xrdml_schema(filename) is None:
        raise ValueError('The file is not conform with the xrdml schema.')

    tree = etree.parse(filename).getroot()

    # define the namespace
    namespace = {'ns': tree.nsmap[None]}

    xrd_measurement = tree.find('ns:xrdMeasurement', namespaces=namespace)

    data = {'filename': filename,
            'sample': tree.findtext('ns:sample/ns:id', namespaces=namespace),
            'status': tree.get('status'),
            'comment': {}}

    # get comment (reads only the first comment, needs maybe improvement)
    lookup = tree.findtext('ns:comment/ns:entry', namespaces=namespace)
    data['comment']['1'] = lookup if lookup else ''

    # get nb. of scans
    uid_scans = xrd_measurement.findall('ns:scan', namespaces=namespace)
    nb_scans = len(uid_scans)

    # get (h k l) and substrate
    if nb_scans > 1:
        reflection_uid = xrd_measurement.find('ns:scan[1]/ns:reflection', namespaces=namespace)
        data['hkl'] = {'h': None, 'k': None, 'l': None}
        if reflection_uid is not None:
            data['substrate'] = reflection_uid.findtext('ns:material', namespaces=namespace)
            for index in 'hkl':
                text = reflection_uid.findtext('ns:hkl/ns:{}'.format(index), namespaces=namespace)
                if text is not None:
                    data['hkl'][index] = int(text)
    data.setdefault('substrate', '')

    # get measurement type
    data['measType'] = xrd_measurement.get('measurementType')

    # if not a simple scan and not the 'Repeated scan' than get
    # the step axis type
    if data['measType'] not in ['Scan', 'Repeated scan']:
        data['stepAxis'] = xrd_measurement.get('measurementStepAxis')

    # get the scan axis type
    if nb_scans > 0:
        data['scanAxis'] = uid_scans[0].get('scanAxis')

    # get scans
    axis_keys = ['2Theta', 'Omega', 'Phi', 'Psi', 'X', 'Y', 'Z']
    array_keys = ['data', 'time'] + axis_keys
    incomplete_keys = ['iscannb'] + ['i' + key for key in array_keys]
    for key in ['scannb'] + array_keys + incomplete_keys:
        data[key] = []

    for k in range(nb_scans):
        data = _sort_data(k, uid_scans, data)

    # if we have only one incomplete scan, the scan is considered to be
    # completed and is moved to completed scans list
    if not data['scannb'] and len(data['iscannb']) == 1:
        logger.debug('One and only incomplete scan found in the data. This scan is considered complete.')
        data['scannb'] = data['iscannb']
        data['iscannb'] = []
        for key in array_keys:
            ikey = 'i' + key
            if data[ikey]:
                # unwrap the single entry so that it looks exactly like a
                # single completed scan (an array, not a list of arrays)
                data[key] = data[ikey][0]
                data[ikey] = []

    # remove redundant information
    for key in ['Phi', 'Psi', 'X', 'Y', 'Z']:
        if _is_empty_list(data[key]):
            data.pop(key)

    if not data['iscannb']:
        for key in incomplete_keys:
            data.pop(key)

    # Without any completed scan, the completed keys are empty placeholders
    # and there is nothing to normalise.
    has_completed = bool(data['scannb'])

    if has_completed:
        data = _get_array_for_single_value(data, 'time')
        if data['measType'] != 'Area measurement':
            data = _get_array_for_single_value(data, '2Theta')
            data = _get_array_for_single_value(data, 'Omega')

        if nb_scans > 1:
            for key in ['Phi', 'Psi', 'X', 'Y', 'Z']:
                data = _get_array_for_single_value(data, key)

    # in case of 'Repeated scan' sum all completed scans together and
    # remove redundant data
    if data['measType'] == 'Repeated scan':
        if has_completed:
            n_completed = len(data['scannb'])
            # average completed scans (intensity is in cps); atleast_2d
            # keeps a single completed scan from collapsing to a scalar
            data['data'] = np.atleast_2d(data['data']).sum(axis=0) / n_completed
            # reduce all possible axis
            for key in axis_keys:
                data = _get_array_for_single_value(data, key)
            # set true time
            data['time'] = data['time'] * n_completed
        # remove redundant information about scan number
        data.pop('scannb')

    # get wavelength
    uid = xrd_measurement.find('ns:usedWavelength', namespaces=namespace)
    data['kType'] = uid.get('intended')
    data['kAlpha1'] = np.double(uid.findtext('ns:kAlpha1', namespaces=namespace))
    data['kAlpha2'] = np.double(uid.findtext('ns:kAlpha2', namespaces=namespace))
    data['kBeta'] = np.double(uid.findtext('ns:kBeta', namespaces=namespace))
    data['kAlphaRatio'] = np.double(uid.findtext('ns:ratioKAlpha2KAlpha1', namespaces=namespace))

    if data['kType'] == 'K-Alpha':
        # intensity-weighted mean of K-Alpha 1 and K-Alpha 2; the weights
        # are 1 and the K-Alpha2/K-Alpha1 ratio
        ratio = data['kAlphaRatio']
        data['Lambda'] = (data['kAlpha1'] + ratio * data['kAlpha2']) / (1 + ratio)
    else:
        if data['kType'] != 'K-Alpha 1':
            logger.warning('usedWavelength type is not supported (using K-Alpha 1)')
        data['Lambda'] = data['kAlpha1']

    # get some useful information (x/y-label, x/y-units)
    first_positions = xrd_measurement.find('ns:scan[1]/ns:dataPoints/ns:positions',
                                           namespaces=namespace)
    unit = first_positions.get('unit', 'nd') if first_positions is not None else 'nd'

    if 'scanAxis' in data:
        scan_axis = data['scanAxis']
        line_scan = data['measType'] in ['Scan', 'Repeated scan']
        if scan_axis == 'Gonio':
            data['xlabel'] = '2Theta-Theta'
            if line_scan:
                data['x'] = data['2Theta']
        elif scan_axis in ['2Theta', '2Theta-Omega']:
            data['xlabel'] = scan_axis
            if line_scan:
                data['x'] = data['2Theta']
        elif scan_axis in ['Omega', 'Omega-2Theta']:
            data['xlabel'] = scan_axis
            if line_scan:
                data['x'] = data['Omega']
        elif scan_axis == 'Reciprocal Space':
            data['xlabel'] = 'Omega'
            if line_scan:
                data['x'] = data['Omega']
        elif scan_axis in ['Phi', 'Psi', 'X', 'Y', 'Z']:
            data['xlabel'] = scan_axis
            if data['measType'] == 'Scan':
                data['x'] = data[scan_axis]
        else:
            logger.debug('The scanAxis type is not supported')
            data['xlabel'] = 'unknown'
        data['xunit'] = unit

    if 'stepAxis' in data:
        step_axis = data['stepAxis']
        if step_axis in ['2Theta', '2Theta-Omega', 'Omega', 'Omega-2Theta',
                         'Phi', 'Psi', 'X', 'Y', 'Z']:
            data['ylabel'] = step_axis
        elif step_axis == 'Gonio':
            data['ylabel'] = '2Theta-Theta'
        else:
            logger.warning('stepAxis type not supported')
            data['ylabel'] = 'unknown'
        # TODO: maybe optimization possible, load units before
        data['yunit'] = unit

    if has_completed and data['measType'] == 'Area measurement':
        dim_2t = data['2Theta'].shape
        dim_o = data['Omega'].shape
        if (dim_2t[1] != dim_o[1] and data.get('scanAxis') == '2Theta'
                and data.get('stepAxis') == 'Omega'):
            # repeat the per-scan Omega value along the 2Theta direction
            omega = np.empty_like(data['2Theta'])
            omega[:, :] = np.reshape(data['Omega'], (-1, 1))
            data['Omega'] = omega
            logger.debug('Omega array was corrected to match "2Theta" and "data" arrays')

    # Mask Width [OPTIONAL]
    mask_width = _read_optional_length(xrd_measurement,
                                       'ns:incidentBeamPath/ns:mask/ns:width',
                                       namespace, 'Mask width')
    if mask_width is not None:
        data['maskWidth'] = mask_width

    # Divergence slit Height [OPTIONAL]
    slit_height = _read_optional_length(xrd_measurement,
                                        'ns:incidentBeamPath/ns:divergenceSlit/ns:height',
                                        namespace, 'Divergence slit height')
    if slit_height is not None:
        data['slitHeight'] = slit_height

    return data
if __name__ == '__main__':
    # Manual smoke run: load a sample file from the current working directory.
    data = read_xrdml('test_area.xrdml')