def latencyCalc(): import json import system from java.lang import Exception as JException, Thread import time whid = system.tag.readBlocking(["Configuration/FC"])[0].value aws_data,first_pass = system.tag.readBlocking(['[%s_SCADA_TAG_PROVIDER]Latency/aws_data_copy'% whid,'[%s_SCADA_TAG_PROVIDER]Latency/first_pass'% whid]) json_data = json.loads(aws_data.value) first_pass = first_pass.value ids = [] # in instances where aws_data tag has been reset if not json_data: system.tag.writeBlocking(['[%s_SCADA_TAG_PROVIDER]Latency/first_pass.value'% whid], [1]) return #### check if this is the first time running. if it is, set the values in prev_key to get this kicked off and then set the flag tag to False. if first_pass: if not json_data: return for key in json_data: ids.append(str(key)) system.tag.writeBlocking(["[%s_SCADA_TAG_PROVIDER]Latency/prev_key"% whid,"[%s_SCADA_TAG_PROVIDER]Latency/first_pass"% whid],[ids,0]) return # get a list of names that are new and can be used to calculate latency. # added try except in here in case there is a time when new prev keys are present during first pass or a reset of the tag accidentely. try: prev_ids = set(system.util.jsonDecode(system.tag.readBlocking(['[%s_SCADA_TAG_PROVIDER]Latency/prev_key'% whid])[0].value)) except: system.tag.writeBlocking(['[%s_SCADA_TAG_PROVIDER]Latency/first_pass.value'% whid], 1) return to_be_processed = [] new_to_be_processed = {} for key in json_data: if key not in prev_ids: to_be_processed.append(str(key)) if to_be_processed: new_to_be_processed = {key: json_data[key] for key in to_be_processed if key in json_data} #### This class will read the aws_data tag and calculate the latency of all the tags provided(newly added tags). class TimestampLatencyCollector: def __init__(self): self.latencies = [] def process_dict(self, data_dict): import system current_time_ms = system.date.toMillis(system.date.now()) # Current time in milliseconds for key, value in data_dict.items(): timestamp_ms = data_dict[key]['timestamp'] # Calculate latency in milliseconds latency_ms = current_time_ms - timestamp_ms self.latencies.append(latency_ms) if new_to_be_processed: last_alarm_change = '[%s_SCADA_TAG_PROVIDER]Latency/last_alarm_change_ts.value'% whid # Create an instance of the TimestampLatencyCollector collector = TimestampLatencyCollector() # Process the nested dictionary collector.process_dict(new_to_be_processed) #read in circular buffer of latencies circular_buffer_tag_path = '[%s_SCADA_TAG_PROVIDER]Latency/rolling_latency.value'% whid circular_buffer = system.tag.readBlocking([circular_buffer_tag_path])[0].value for latency in collector.latencies: circular_buffer.append(str(latency)) # keep only the last 30 entries. circular_buffer = circular_buffer[-30:] system.tag.writeBlocking([circular_buffer_tag_path,last_alarm_change], [circular_buffer,system.date.now()]) persistence = [] for key in json_data: persistence.append(str(key)) system.tag.writeBlocking(["[%s_SCADA_TAG_PROVIDER]Latency/prev_key"% whid],[persistence])