SAT9/.resources/8cdb6d9bf5f22917271d647f00ff55dca81825dc0da06d1c036e7d406e2999ae

67 lines
2.6 KiB
Plaintext

#from logging import raiseExceptions
class GetStatus(object):
    """Turn active-alarm priorities into per-controller status tag values.

    ``alarm_data`` is read as a mapping of alarm id -> mapping that may
    contain a ``"priority"`` entry.  The part of the alarm id before the
    first ``/`` is used as the controller id.  ``run_status`` fills
    ``priority_dict`` (controller id -> {alarm id -> status}) and
    ``update_tags`` JSON-encodes each controller's mapping and writes it to
    ``<tag_provider>Tags/<controller id>/IdToStatus/json``.

    NOTE(review): ``system`` is not imported in this file; it is assumed to
    be supplied by the hosting scripting environment -- confirm.
    """

    # String form of an alarm priority -> status value that is written out.
    # For these known priorities a larger priority yields a smaller status.
    _PRIORITY_TO_STATUS = {"0": 4, "1": 3, "2": 2, "3": 1, "4": 1}
    # Status used for any priority that is not in the table above.
    _UNKNOWN_STATUS = 6

    def __init__(self, whid, alarm_data):
        """Store the alarm data and build the tag provider / logger names.

        Args:
            whid: Identifier interpolated into the tag provider name
                (``[<whid>_SCADA_TAG_PROVIDER]``) and the logger name.
            alarm_data: Mapping of alarm id -> alarm details; only the
                ``"priority"`` entry of the details is read by this class.
        """
        self.alarm_data = alarm_data
        self.priority_dict = {}
        self.id_to_status = {}
        self.tag_provider = "[%s_SCADA_TAG_PROVIDER]" % (whid)
        self.logger = system.util.getLogger("%s-Update-Visualisation" % (whid))

    def convert_priority(self, priority):
        """Return the status value for an alarm priority.

        The priority is compared by its string form, so ``4`` and ``"4"``
        are treated the same.  Priorities 0, 1, 2 map to 4, 3, 2; priorities
        3 and 4 both map to 1; anything else maps to 6.
        """
        return self._PRIORITY_TO_STATUS.get(str(priority), self._UNKNOWN_STATUS)

    def check_priority(self, alarm_id, priority):
        """Record the status for ``alarm_id`` under its controller.

        The status is stored when the alarm has no entry yet, or when the
        new status is numerically lower than the stored one.  Both sides of
        that comparison are converted statuses, so (for known priorities)
        the entry ends up reflecting the highest priority seen.

        Args:
            alarm_id: Alarm identifier; the text before the first ``/`` is
                the controller id.
            priority: Alarm priority, as an int or its string form.
        """
        controller_id = alarm_id.split("/")[0]
        new_status = self.convert_priority(priority)
        controller_alarms = self.priority_dict.setdefault(controller_id, {})
        stored_status = controller_alarms.get(alarm_id)
        # Compare status with status (never with the raw priority): the two
        # scales run in opposite directions and may not even share a type.
        if stored_status is None or new_status < stored_status:
            controller_alarms[alarm_id] = new_status

    def run_status(self):
        """Populate ``priority_dict`` from every alarm that has a priority.

        Alarms whose details have no ``"priority"`` entry (or a ``None``
        priority) are skipped.
        """
        for alarm_id in self.alarm_data:
            priority = self.alarm_data.get(alarm_id, {}).get("priority")
            if priority is not None:
                self.check_priority(alarm_id, priority)

    def update_tags(self):
        """Write each controller's alarm-status mapping to its JSON tag.

        Raises:
            TypeError: If ``priority_dict`` is not a ``dict``.
        """
        if not isinstance(self.priority_dict, dict):
            raise TypeError("Not a python dictionary")

        device_paths = []
        status_values = []
        for controller_id in self.priority_dict:
            device_paths.append(
                "%sTags/%s/IdToStatus/json" % (self.tag_provider, controller_id)
            )
            status_json = self.priority_dict.get(controller_id)
            status_values.append(system.util.jsonEncode(status_json, 4))

        # Log once, after the complete list of target paths is known.
        self.logger.info("device paths " + str(device_paths))
        system.tag.writeAsync(device_paths, status_values)