Commit 1c693a53 authored by Maximilian Schuette's avatar Maximilian Schuette 🌃
Browse files

Removed V. Rybnikov code, as not to mix with new code, added new DaqTimingPattern function

parent 5a06904a
import os
from datetime import datetime
from struct import pack, unpack
import numpy as np
from . import DAQbasic as dqb
SASE_info = {'FLASH': [{'Bits': list(range(0, 4)), 'Name': 'Charge', 'Op': 'Abs',
'Value': {'0': 0, '1': 0.02, '2': 0.03, '3': 0.04,
'4': 0.06, '5': 0.09, '6': 0.13, '7': 0.18,
'8': 0.25, '9': 0.36, '10': 0.50, '11': 0.71,
'12': 1.00, '13': 1.42, '14': 2.00, '15': 4.00}},
{'Bits': list(range(4, 8)), 'Name': 'Injector Laser', 'Op': 'Bits',
'Value': {'4': 'Laser1', '5': 'Laser2', '6': 'Laser3', '7': 'Laser4'}},
{'Bits': list(range(8, 14)), 'Name': 'User Laser', 'Op': 'Bits',
'Value': {'8': 'Seed1',
'9': 'Seed2',
'10': 'Seed3',
'11': 'PP1',
'12': 'PP2',
'13': 'PP3'}},
{'Bits': list(range(14, 18)), 'Name': 'Not used', 'Op': 'Bits',
'Value': {'14': 'Not used bit 14',
'15': 'Not used bit 15',
'16': 'Not used bit 16',
'17': 'Not used bit 17'}},
{'Bits': list(range(18, 22)), 'Name': 'Destination', 'Op': 'Abs',
'Value': {'0': 'None',
'1': 'LSt (Laser stand-alone)',
'2': 'FL2D (FLASH2 dump)',
'3': 'GnD (Gun dump/IDUMP)',
'4': 'FL1D (FLASH1 dump)',
'5': 'Not used value 5',
'6': 'Not used value 6',
'7': 'Not used value 7',
'8': 'FFwD (FLASH3 dump)'}},
{'Bits': list(range(22, 23)), 'Name': 'Event trigger 25', 'Op': 'Bits',
'Value': {'22': 'Reduced rep rate'}},
{'Bits': list(range(23, 27)), 'Name': 'Not used', 'Op': 'Bits',
'Value': {'23': 'Not used bit 23',
'24': 'Not used bit 24',
'25': 'Not used bit 25',
'26': 'Not used bit 26'}},
{'Bits': list(range(27, 32)), 'Name': 'Special Flags', 'Op': 'Bits',
'Value': {'27': 'Photon mirror',
'28': 'Wire scanner',
'29': 'LOLA',
'30': 'CRISP kicker',
'31': 'Undefined bit 31'}}],
'XFEL': [{'Bits': list(range(0, 4)), 'Name': 'Charge', 'Op': 'Abs',
'Value': {'0': 0, '1': 0.02, '2': 0.03, '3': 0.04,
'4': 0.06, '5': 0.09, '6': 0.13, '7': 0.18,
'8': 0.25, '9': 0.36, '10': 0.50, '11': 0.71,
'12': 1.00, '13': 1.42, '14': 2.00, '15': 4.00}},
{'Bits': list(range(4, 8)), 'Name': 'Injector Laser', 'Op': 'Bits',
'Value': {'4': 'I1.Laser1', '5': 'I1.Laser2', '6': 'I1.Laser3', '7': 'I2.Laser1'}},
{'Bits': list(range(8, 17)), 'Name': 'Seed/user Laser', 'Op': 'Bits',
'Value': {'8': 'Seed/user Laser1',
'9': 'Seed/user Laser2',
'10': 'Seed/user Laser3',
'11': 'Seed/user Laser4',
'12': 'Seed/user Laser5',
'13': 'Seed/user Laser6',
'14': 'Seed/user Laser7',
'15': 'Seed/user Laser8',
'16': 'Seed/user Laser9'}},
{'Bits': list(range(18, 22)), 'Name': 'Destination', 'Op': 'Abs',
'Value': {'0': 'None',
'1': 'I1lSt (Laser stand-alone)',
'2': 'T5D (SASE2 dump)',
'3': 'G1D (Gun dump/valve)',
'4': 'T4D (SASE1/3 dump)',
'5': 'I1D (Injector dump)',
'6': 'B1D (B1 dump)',
'7': 'B2D (B2 dump)',
'8': 'TLD'}},
{'Bits': list(range(22, 25)), 'Name': 'Not used', 'Op': 'Bits',
'Value': {'22': 'Not used bit 22',
'23': 'Not used bit 23',
'24': 'Not used bit 24'}},
{'Bits': list(range(25, 32)), 'Name': 'Special Flags', 'Op': 'Bits',
'Value': {'25': 'SASE3 soft kick (T4)',
'26': 'Beam distribution kicker (T1)',
'27': 'Soft kick (e.g.SA3)',
'28': 'Wire scanner',
'29': 'TDS BC2',
'30': 'TDS BC1',
'31': 'TDS Inj'}}]
}
class DAQBunchPattern:
def __init__(self, linac='XFEL'):
# constants for Scan Mode
self.linacs = SASE_info.keys()
self.linac = None
self.bunch_info = None
list_out = []
for lin in self.linacs:
linc = linac.upper()
list_out.append(lin)
if linc == lin:
self.bunch_info = SASE_info[lin]
self.linac = linc
if not self.bunch_info:
out = "Unknown LINAC name " + linac + ' (must be one of'
for l in list_out:
out += ' ' + l
out += ')'
raise Exception(out)
def get_names_of_all_linac(self):
out = []
for entry in self.linacs:
out.append(entry)
return out
def print_all_patterns(self):
print('LINAC->%s' % self.linac)
for entry in self.bunch_info:
print('\t%s:' % entry['Name'].strip('"\''), end='')
for val in entry['Value'].values():
print('\n\t\t%s' % str(val).strip('"\''), end='')
print()
def get_bunch_pattern_info(self, mode, debug):
if isinstance(mode, int):
pass
elif isinstance(mode, float) or isinstance(mode, np.float32):
mode = dqb.float2int(mode)
else:
raise Exception(
"Invalid data type %s (must be int or float) " % (type(mode)))
result = []
if debug:
print('mode: 0x%X' % mode)
for entry in self.bunch_info:
bits = entry['Bits']
op = entry['Op']
name = entry['Name']
values = entry['Value']
mask = (2**(len(bits))) - 1
shift = bits[0]
if debug:
print('Name: %s Op:%s Bits Total: %d Shift:%d Mask:%d' %
(name, op, len(bits), shift, mask))
if op == 'Abs':
rest = (mode >> shift) & mask
for val in values.keys():
if int(val) == rest:
if debug:
print('%s -> %s ' % (name, values[val]))
result.append({name: values[val]})
break
if op == 'Bits':
out = []
for val in values.keys():
if 1 << int(val) & mode:
#print('Comparing 0x%X 0x%X'%(1<<int(val), mode))
if debug:
print('%s -> %s ' % (name, values[val]))
result.append({name: values[val]})
# if out != []:
# if debug: print('%s -> '%(name), out)
# result.append(out)
if debug:
print('result:', result)
return result
def check_bunch_pattern(self, value, pattern, logic, debug):
if not isinstance(pattern, list):
raise Exception(
"Invalid data type %s (must be list) "(type(pattern)))
res = self.get_bunch_pattern_info(value, debug)
if pattern == []:
return True, res
found = 0
for item in pattern:
for entry in res:
ks = item.keys()
for t in ks:
if t in entry and (entry[t] == item[t]):
found += 1
if logic == 'AND' and found == len(pattern):
return True, res
if logic == 'OR' and found:
return True, res
return False, res
# channel description class
# Dimentions list should look like:
# [{'DIMn:'{'DIM_NAME':'', 'DIM_UNITS':'', 'DIM_DESCR':'', 'DIM_START':'', 'DIM_INC':'', 'DIM_NDATA':'', 'DIM_GROUPS':'', 'DIM_GROUP_INC':''}}, ...]
# where n - dimention index (0...)
# Bits list should look like:
# [{'BITn:'{'BIT_NAME':'', 'BIT_DESCR':''}}, ...]
# where n - bit index (0...)
# variable for XML channel description file parsing
tags = {'STREAM': 0, 'EVTYPE': '', 'DTYPE': '', 'COUNT': 0, 'SB_NAME': 'None', 'DB_KEY': 0, 'PROP': '',
'NAME': '', 'SETPOINT': 0, 'SUBSYSTEM': '', 'UNITS': '', 'DESC': '', 'SIZE': 0, 'DIM': {}, 'BIT': {}}
tagstoattr = {'EVTYPE': 'EventTypeMask', 'DTYPE': 'DAQdatatype', 'DB_KEY': 'RCDBkey', 'SB_NAME': 'ServerBlockName',
'PROP': 'DOOCSproperty', 'NAME': 'DAQchannel', 'SETPOINT': 'Setpoint', 'SUBSYSTEM': 'Subsystem', 'UNITS': 'Units', 'DESC': 'Description'}
tagstoattrsubch = {'DIM': {'DIM_NAME': 'SubchannelName', 'DIM_UNITS': 'SubchanelUnits',
'DIM_DESCR': 'SubchannelDescription'}, 'BIT': {'BIT_NAME': 'BitName', 'BIT_DESCR': 'BitDescription'}}
internaltags = {'DIM', 'BIT'}
dimtags = {'DIM_NAME': None, 'DIM_UNITS': None, 'DIM_DESCR': None, 'DIM_START': None, 'DIM_INC': None, 'DIM_NDATA': None, 'DIM_GROUPS': None, 'DIM_GROUP_INC': None,
'DIM_XSTART': None, 'DIM_WIDTH': None, 'DIM_XBIN': None, 'DIM_YSTART': None, 'DIM_HEIGHT': None, 'DIM_YBIN': None, 'DIM_BPP': None, 'DIM_EBITPP': None, 'DIM_UNSIGNED': None}
# table for mapping TAGS to attributes
dimtagstoattr = {'DIM_UNITS': 'Units', 'DIM_DESCR': 'Description', 'DIM_START': 'Start', 'DIM_INC': 'Inc', 'DIM_NDATA': 'GroupSamples', 'DIM_GROUPS': 'Groups', 'DIM_GROUP_INC': 'GroupInc', 'DIM_XSTART': 'Xstart',
'DIM_WIDTH': 'Width', 'DIM_XBIN': 'Xbin', 'DIM_YSTART': 'Ystart', 'DIM_HEIGHT': 'Height', 'DIM_YBIN': 'Ybin', 'DIM_BPP': 'Bytesperpixel', 'DIM_EBITPP': 'Bitsperpixel', 'DIM_UNSIGNED': 'Unsigned'}
# Tags that can have sub-tags
bittags = {'BIT_NAME': '', 'BIT_DESCR': ''}
# tags containig integers
inttags = ['STREAM', 'DB_KEY', 'COUNT', 'SETPOINT', 'SIZE', 'DIM_NDATA', 'DIM_GROUPS', 'DIM_XSTART',
'DIM_WIDTH', 'DIM_XBIN', 'DIM_YSTART', 'DIM_HEIGHT', 'DIM_YBIN', 'DIM_BPP', 'DIM_EBITPP', 'DIM_UNSIGNED']
# tags containig floats
floattags = ['DIM_START', 'DIM_INC', 'DIM_GROUP_INC']
def clean_dimtags():
global dimtags
dimtags = {'DIM_NAME': None, 'DIM_UNITS': None, 'DIM_DESCR': None, 'DIM_START': None, 'DIM_INC': None, 'DIM_NDATA': None, 'DIM_GROUPS': None, 'DIM_GROUP_INC': None,
'DIM_XSTART': None, 'DIM_WIDTH': None, 'DIM_XBIN': None, 'DIM_YSTART': None, 'DIM_HEIGHT': None, 'DIM_YBIN': None, 'DIM_BPP': None, 'DIM_EBITPP': None, 'DIM_UNSIGNED': None}
def clean_bittags():
global bittags
bittags = {'BIT_NAME': '', 'BIT_DESCR': ''}
def allcleanup():
global tags
tags = {'STREAM': 0, 'EVTYPE': '', 'DTYPE': '', 'COUNT': 0, 'SB_NAME': 'None', 'DB_KEY': 0, 'PROP': '',
'NAME': '', 'SETPOINT': 0, 'SUBSYSTEM': '', 'UNITS': '', 'DESC': '', 'SIZE': 0, 'DIM': {}, 'BIT': {}}
clean_dimtags()
clean_bittags()
def get_xml_tag(pat):
return '<'+pat+'>'
def get_xml_tagn(pat):
return '<'+pat+'>\n'
def get_xml_endtag(pat):
return '</'+pat+'>'
def get_xml_endtagn(pat):
return '</'+pat+'>\n'
class DAQChanDescr:
def __init__(self, dtype=0, sb_name="", prop="", name="", chan={}):
self.chan = {'STREAM': 0, 'EVTYPE': '', 'DTYPE': '', 'DB_KEY': 0, 'COUNT': 0, 'SB_NAME': 'None', 'PROP': '',
'NAME': '', 'SETPOINT': 0, 'SUBSYSTEM': '', 'UNITS': '', 'DESC': '', 'SIZE': 0, 'DIM': {}, 'BIT': {}}
if chan != {}:
for key in chan.keys():
if key in self.chan:
self.chan[key] = chan[key]
if sb_name != '':
self.chan['SB_NAME'] = sb_name
if prop != '':
self.chan['PROP'] = prop
if name != '':
self.chan['NAME'] = name
def get_attr(self, nm):
if nm in self.chan.keys():
return self.chan[nm]
else:
return None
def is_it_you(self, nm):
if self.chan['NAME'] == nm:
return True
return False
def get_dims(self, index):
tag = "DIM"+str(index)
#print(index, self.chan)
for hash in self.chan["DIM"]:
#print('index', index,'tag', tag, 'hash',hash)
if tag in hash:
return self.chan["DIM"][hash]
# return hash[tag]
return None
def get_bits(self, index):
tag = "BIT"+str(index)
for hash in self.chan["BIT"]:
if tag in hash:
return hash[tag]
return None
def get_name(self):
return self.chan['NAME']
def print(self):
print(self.chan)
def get_xml_description(self):
out = get_xml_tagn('CHANNEL')
for key in self.chan.keys():
if key == 'DIM' or key == 'BIT':
if self.chan[key] != {}:
if key == 'DIM':
n = self.chan[key]['DIMS']
else:
n = self.chan[key]['BITS']
total = 0
index = 0
while total < n and index < 64:
lkey = key + str(index)
if lkey in self.chan[key]:
out += get_xml_tag(key) + str(index) + '\n'
lkey = key + str(index)
for dkey in self.chan[key][lkey]:
if dkey != 'DIMS' and dkey != 'BITS':
out += get_xml_tag(dkey)
if self.chan[key][lkey][dkey] != None:
out += str(self.chan[key][lkey]
[dkey]).replace('\n', '')
out += get_xml_endtagn(dkey)
out += get_xml_endtagn(key)
total += 1
index += 1
else:
out += get_xml_tag(key)
if self.chan[key] == 0 or self.chan[key] == '':
out += str(0)
elif self.chan[key] == 'None':
out += ''
else:
out += str(self.chan[key])
out += get_xml_endtagn(key)
out += get_xml_endtagn('CHANNEL')
return out
'''
<CHANNEL>
<STREAM>0</STREAM>
<EVTYPE>0</EVTYPE>
<DTYPE>0</DTYPE>
<COUNT>0</COUNT>
<SB_NAME></SB_NAME>
<DB_KEY>138375</DB_KEY>
<PROP>FLASH.DIAG/BPM/11FLFMAFF/DAQ_CHANNEL</PROP>
<NAME>FLASH.DIAG/BPM/11FLFMAFF</NAME>
<SETPOINT>0</SETPOINT>
<SUBSYSTEM>FLASH_BPM_DIAGNOSTICS</SUBSYSTEM>
<UNITS>mm</UNITS>
<DESC></DESC>
<SIZE>2048</SIZE>
<DIM>0
<DIM_NAME>X.TD</DIM_NAME>
<DIM_UNITS>mm</DIM_UNITS>
<DIM_DESCR>FLASH BPM X</DIM_DESCR>
</DIM>
<DIM>1
<DIM_NAME>Y.TD</DIM_NAME>
<DIM_UNITS>mm</DIM_UNITS>
<DIM_DESCR>FLASH BPM Y</DIM_DESCR>
</DIM>
<BIT>0
<BIT_NAME>OpMode: GUN</BIT_NAME>
<BIT_DESCR>GUN operation mode</BIT_DESCR>
</BIT>
<BIT>1
<BIT_NAME>OpMode: ANALYSIS</BIT_NAME>
<BIT_DESCR>ANALYSIS operation mode</BIT_DESCR>
</BIT>
</CHANNEL>
'''
import sys
import os
import xml.etree.ElementTree as ET
import re
import DAQChanDescr
import datetime
class ChanDescrList:
def __init__(self, xmlfile="", chans=[], chandescrlst=[]):
self.chans = chans # channels to extract, if [] - all
self.xmlfile = xmlfile
self.chandescrlst = []
if chandescrlst == [] and ((self.xmlfile == '' or not os.path.exists(self.xmlfile)) or (not os.access(self.xmlfile, os.R_OK))):
raise Exception(
"Something wrong with XML file OR channel description list is empty" + self.xmlfile)
if chandescrlst != []:
self.chandescrlst = chandescrlst
return
tree = ET.parse(self.xmlfile)
root = tree.getroot()
dims = 0
bits = 0
for elem in root:
# print(elem.tag)
if elem.tag == 'CHANNEL':
if DAQChanDescr.tags['NAME'] != '':
# creating a new chan description
#tags['DIM'] = dims
#tags['BIT'] = bits
if dims:
DAQChanDescr.tags['DIM'].update({'DIMS': dims})
if bits:
DAQChanDescr.tags['BIT'].update({'BITS': bits})
dims = 0
bits = 0
chd = DAQChanDescr.DAQChanDescr(chan=DAQChanDescr.tags)
if self.chans == [] or chd.get_name() in self.chans:
self.chandescrlst.append(chd)
DAQChanDescr.allcleanup()
for subelem in elem:
if subelem.tag in DAQChanDescr.tags:
#print(subelem.tag, ":",subelem.text )
if subelem.tag not in DAQChanDescr.internaltags:
if subelem.tag in DAQChanDescr.inttags:
DAQChanDescr.tags[subelem.tag] = int(subelem.text)
elif subelem.tag in DAQChanDescr.floattags:
DAQChanDescr.tags[subelem.tag] = float(
subelem.text)
else:
DAQChanDescr.tags[subelem.tag] = subelem.text
if subelem.tag in DAQChanDescr.internaltags: # checking for DIM or BIT
DAQChanDescr.clean_dimtags()
DAQChanDescr.clean_bittags()
# DIM or BIT index
index = subelem.text.strip()
for inelem in subelem:
if inelem.tag in DAQChanDescr.dimtags:
#print(inelem.tag, " " , type(inelem.text), " ", inelem.text)
if inelem.text != None and inelem.text != 'None':
if inelem.tag in DAQChanDescr.inttags:
DAQChanDescr.dimtags[inelem.tag] = int(
inelem.text.strip())
elif inelem.tag in DAQChanDescr.floattags:
DAQChanDescr.dimtags[inelem.tag] = float(
inelem.text.strip())
else:
if type(inelem.text) == 'str':
DAQChanDescr.dimtags[inelem.tag] = inelem.text.strip(
)
else:
DAQChanDescr.dimtags[inelem.tag] = inelem.text
elif inelem.tag in DAQChanDescr.bittags:
if inelem.tag in DAQChanDescr.inttags:
DAQChanDescr.bittags[inelem.tag] = int(
inelem.text.strip())
elif inelem.tag in DAQChanDescr.floattags:
DAQChanDescr.bittags[inelem.tag] = float(
inelem.text.strip())
else:
if type(inelem.text) == 'str':
DAQChanDescr.bittags[inelem.tag] = inelem.text.strip(
)
else:
DAQChanDescr.bittags[inelem.tag] = inelem.text
item = {}
if subelem.tag == 'DIM':
#print('-------->', subelem.tag + index)
item[subelem.tag+index] = DAQChanDescr.dimtags
# print(item[subelem.tag+index])
DAQChanDescr.tags.setdefault(subelem.tag, {})
DAQChanDescr.tags[subelem.tag].update(item)
# print(DAQChanDescr.tags)
dims += 1
elif subelem.tag == 'BIT':
item[subelem.tag+index] = DAQChanDescr.bittags
DAQChanDescr.tags.setdefault(subelem.tag, {})
DAQChanDescr.tags[subelem.tag].update(item)
bits += 1
if DAQChanDescr.tags['NAME'] != '':
# creating a new chan description
#tags['DIM'] = dims
#tags['BIT'] = bits
if dims:
DAQChanDescr.tags['DIM'].update({'DIMS': dims})
if bits:
DAQChanDescr.tags['BIT'].update({'BITS': bits})
dims = 0
bits = 0
chd = DAQChanDescr.DAQChanDescr(chan=DAQChanDescr.tags)
if self.chans == [] or chd.get_name() in self.chans:
self.chandescrlst.append(chd)
def GetDescriptionList(self, chans=[]):
if chans == []:
return self.chandescrlst
else:
out = []
for entry in self.chandescrlst:
for ch in chans:
if re.search(ch, entry.get_name()):
out.append(entry)
return out
def GetDescriptionListAsXML(self, chans=[]):
res = self.GetDescriptionList(chans=[])
out = '<?xml version="1.0" encoding="ISO-8859-1" ?>\n'
out += DAQChanDescr.get_xml_tagn('CHANDESCRIPTIONS')
out += DAQChanDescr.get_xml_tagn('isodate')
out += datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
out += DAQChanDescr.get_xml_endtagn('isodate')
for d in res:
out += d.get_xml_description()
out += DAQChanDescr.get_xml_endtagn('CHANDESCRIPTIONS')
return out
import sys
import os
import xml.etree.ElementTree as ET
import re
import datetime
def get_key_by_value(a, v):
for key, value in a.iteritems():
if value == v:
return key
return None
class DAQRequest:
def __init__(self, xmlfile=''):
# constants for Scan Mode
self.TTF2_DAQ_SCAN_BY_TIME = 0x10000000
self.TTF2_DAQ_GIVE_WHAT_YOU_HAVE = 0x20000000
self.TTF2_DAQ_SCANM_FILES_SHIFT = 4
self.TTF2_DAQ_SCANM_ALL = 15
self.TTF2_DAQ_SCANM_MAX = self.TTF2_DAQ_SCANM_ALL
# definitions of Request Types
self.daqrequesttype = {
'TTF2_DAQ_REQ_DATA': 1,
'TTF2_DAQ_REQ_CHAN_LIST': 2,
'TTF2_DAQ_REQ_STAT': 3
}
# Definitions of communication type
self.daqcomtype = {
'TTF2_DAQ_COMM_PUSH': 1,
'TTF2_DAQ_COMM_HNDS': 2
}
# members
self.listtags = ['Chan', 'File']
self.intags = ['ReqId', 'RunFirst', 'RunLast',
'ScanMode', 'ReqType', 'CommMode']
self.request = {'ReqId': None, 'TStart': None, 'TStop': None, 'RunFirst': None, 'RunLast': None, 'Exp': None, 'DDir': None,
'CDir': None, 'Chan': None, 'File': None, 'ScanMode': None, 'ReqType': None, 'CommMode': None, 'ConfFile': None}
self.xmlfile = xmlfile
# initialization
self.roottag = 'DAQREQ'
if(self.xmlfile == '' or not os.path.exists(self.xmlfile)) or (not os.access(self.xmlfile, os.R_OK)):
raise Exception("Something wrong with XML file " + self.xmlfile)
tree = ET.parse(self.xmlfile)
root = tree.getroot()
if root.tag != self.roottag:
raise Exception("Root tag (%s) is invalid in XML file %s (must be %s) " % (
root.tag, self.xmlfile, self.roottag))
for elem in root:
# print(elem.tag)
if elem.tag in self.request: