#from logging import raiseExceptions class GetStatus(): def __init__(self, whid, alarm_data): self.alarm_data = alarm_data self.priority_dict = {} self.id_to_status = {} self.tag_provider = "[%s_SCADA_TAG_PROVIDER]" % (whid) self.logger = system.util.getLogger("%s-Update-Visualisation" % (whid)) def convert_priority(self, priority): #The alarm priority is converted into a status #This is based on the highest active alarm priority if str(priority) == "0": return 4 elif str(priority) == "1": return 3 elif str(priority) == "2": return 2 elif str(priority) == "3" or priority == "4": return 1 else: return 6 def check_priority(self, alarm_id, priority): #We check to see if the current priority is greater #than the current priority in the priority_dict. This is #because the status is based on the active alarm. With the #highest priority. controller_id = alarm_id.split("/")[0] if self.priority_dict.get(controller_id) is None: self.priority_dict[controller_id] = {} self.priority_dict[controller_id][alarm_id] = self.convert_priority(priority) elif self.priority_dict[controller_id].get(alarm_id) is None: self.priority_dict[controller_id][alarm_id] = self.convert_priority(priority) elif self.priority_dict[controller_id].get(alarm_id) < priority: self.priority_dict[controller_id][alarm_id] = ( self.convert_priority (priority) ) def run_status(self): for i in self.alarm_data: alarm_id = i priority = self.alarm_data.get(i, {}).get("priority") if priority != None: self.check_priority(alarm_id, priority) def update_tags(self): device_paths = [] status_values = [] if isinstance(self.priority_dict, dict): for i in self.priority_dict: device_path = "%sTags/%s/IdToStatus/json" % (self.tag_provider, i) device_paths.append(device_path) status_json = self.priority_dict.get(i) status_values.append(system.util.jsonEncode(status_json, 4)) self.logger.info("device paths " + str(device_paths)) else: raise TypeError("Not a python dictionary") system.tag.writeAsync(device_paths, status_values)