"""Utility functions and classes."""from__future__importannotationsimportloggingfromcollectionsimportdefaultdict,dequefromdataclassesimportdataclassimportnumpyasnpfromwaveline.datatypesimportAERecord,TRRecordlogger=logging.getLogger(__name__)
def decibel_to_volts(decibel: float | np.ndarray) -> float | np.ndarray:
    """
    Convert a level in dB(AE) to an amplitude in volts.

    Evaluates ``1e-6 * 10 ** (decibel / 20)``, so 0 dB(AE) maps to 1e-6 V.

    Args:
        decibel: Level(s) in dB(AE), scalar or array

    Returns:
        Corresponding value(s) in volts
    """
    exponent = np.asarray(decibel) / 20
    microvolts = np.power(10, exponent)
    return 1e-6 * microvolts
def volts_to_decibel(volts: float | np.ndarray) -> float | np.ndarray:
    """
    Convert an amplitude in volts to a level in dB(AE).

    Evaluates ``20 * log10(volts * 1e6)``, so 1e-6 V maps to 0 dB(AE).

    Args:
        volts: Input in volts, scalar or array

    Returns:
        Input value(s) in dB(AE)
    """
    microvolts = np.asarray(volts) * 1e6
    return 20 * np.log10(microvolts)
@dataclass
class HitRecord:
    """
    Merged hit record combining an AERecord with its TRRecord.

    Attributes:
        ae: AE record of the hit
        tr: Transient record merged with the AE record, or `None` if there is none
    """

    ae: AERecord
    tr: TRRecord | None
class QueueFullError(Exception):
    """Error signalling that a queue is full and cannot take another item."""
class HitMerger:
    """
    Merge AE and TR records into HitRecords based on the transient recorder index (trai).

    Since AE and TR records arrive in order for each channel, AE records are stored in
    channel-specific queues and merged with corresponding TR records as they become available.
    """

    @dataclass
    class ChannelState:
        """Per-channel bookkeeping used while waiting for matching TR records."""

        queue: deque[AERecord]  # AE records not yet merged, oldest first
        last_trai: int  # TRAI of the most recently queued AE record (starts at 0)

    def __init__(self, max_queue_size: int | None = None):
        """
        Initialize the HitMerger with an optional maximum queue size.

        Args:
            max_queue_size: Maximum queue size for each channel.
                If `None`, queues are unbounded.
        """
        # State is created lazily on first access of a channel.
        self._channel_state: dict[int, HitMerger.ChannelState] = defaultdict(
            lambda: HitMerger.ChannelState(deque(maxlen=max_queue_size), 0)
        )

    def clear(self):
        """
        Clear all buffered AE records.

        The whole per-channel state is dropped, so the last seen TRAI of every
        channel is reset as well.
        """
        self._channel_state.clear()

    def process(self, record: AERecord | TRRecord) -> HitRecord | None:
        """
        Process a single AE or TR record.

        Args:
            record: AE or TR record; objects of any other type are ignored.

        Returns:
            HitRecord if a merge occurred, otherwise None.

        Raises:
            QueueFullError: If the queue for a channel is full when processing an AERecord.
        """
        if isinstance(record, AERecord):
            return self._handle_ae_record(record)
        if isinstance(record, TRRecord):
            return self._handle_tr_record(record)
        return None

    def _handle_ae_record(self, ae_record: AERecord) -> HitRecord | None:
        """
        Queue an AE record until its TR record arrives.

        Records with TRAI 0 are returned immediately as a HitRecord without TR data.

        Raises:
            QueueFullError: If the channel queue already holds `max_queue_size` records.
        """
        if ae_record.trai == 0:
            # TRAI 0 is never matched against a TR record (presumably: no transient recorded)
            return HitRecord(ae=ae_record, tr=None)

        state = self._channel_state[ae_record.channel]
        max_size = state.queue.maxlen
        # Check explicitly: a bounded deque would silently drop the oldest record on append.
        if max_size is not None and len(state.queue) >= max_size:
            raise QueueFullError(
                f"AE queue for channel {ae_record.channel} is full ({max_size} records)"
            )
        assert ae_record.trai > state.last_trai, "TRAI must be strictly increasing per channel"
        state.queue.append(ae_record)
        state.last_trai = ae_record.trai
        return None

    def _handle_tr_record(self, tr_record: TRRecord) -> HitRecord | None:
        """
        Merge a TR record with the queued AE record of the same channel and TRAI.

        Queued AE records with a smaller TRAI are discarded with a warning. If no
        AE record with the same TRAI is queued, the TR record is discarded with a
        warning and `None` is returned.
        """
        state = self._channel_state[tr_record.channel]
        logger.debug("AE queue size for channel %d: %s", tr_record.channel, len(state.queue))

        # Drop older AE records whose TR record did not arrive.
        while state.queue and state.queue[0].trai < tr_record.trai:
            ae_record = state.queue.popleft()
            logger.warning("Missing TR for TRAI %d, discard AE", ae_record.trai)

        if not state.queue or state.queue[0].trai > tr_record.trai:
            logger.warning("Missing AE for TRAI %d, discard TR", tr_record.trai)
            return None

        ae_record = state.queue.popleft()
        # Internal invariants: guaranteed by the checks above and the per-channel queues.
        assert ae_record.trai == tr_record.trai
        assert ae_record.channel == tr_record.channel
        return HitRecord(ae=ae_record, tr=tr_record)