SAT9/.resources/0470e3a8c0d099d79f0822cd23d9842d97260e89d8d6d32510a03b987698cc8f
2025-04-18 19:44:27 +04:00

67 lines
2.6 KiB
Plaintext

#from logging import raiseExceptions
class GetStatus():
    """Convert alarm priorities into status codes and write them to tags.

    Alarms are grouped by controller, where the controller id is the part of
    the alarm id before the first "/". For every controller a mapping of
    ``alarm_id -> status`` is kept in ``priority_dict`` and, on
    ``update_tags``, JSON-encoded and written to that controller's
    ``IdToStatus/json`` tag.

    Relies on the ``system`` object being available as a global
    (``system.util`` and ``system.tag`` are used without an import).
    """

    # Status code for each recognised priority. Keys are str(priority) so
    # that integer and string priorities are treated identically.
    _PRIORITY_TO_STATUS = {"0": 4, "1": 3, "2": 2, "3": 1, "4": 1}
    # Status returned for any priority that is not in the table above.
    _UNKNOWN_STATUS = 6

    def __init__(self, whid, alarm_data):
        """Store the alarm data and build the tag provider and logger names.

        Args:
            whid: Identifier interpolated into the tag provider name
                ("[<whid>_SCADA_TAG_PROVIDER]") and the logger name
                ("<whid>-Update-Visualisation").
            alarm_data: Mapping of alarm id to a mapping that may contain a
                "priority" entry (this is how run_status reads it).
        """
        self.alarm_data = alarm_data
        # controller_id -> {alarm_id: status}
        self.priority_dict = {}
        self.id_to_status = {}
        self.tag_provider = "[%s_SCADA_TAG_PROVIDER]" % (whid)
        self.logger = system.util.getLogger("%s-Update-Visualisation" % (whid))

    def convert_priority(self, priority):
        """Return the status code for an alarm priority.

        The priority is compared by its string form, so ``4`` and ``"4"``
        give the same result.

        Args:
            priority: Alarm priority, as an int or a string.

        Returns:
            4 for priority 0, 3 for 1, 2 for 2, 1 for 3 or 4, and 6 for
            anything else.
        """
        return self._PRIORITY_TO_STATUS.get(str(priority), self._UNKNOWN_STATUS)

    def check_priority(self, alarm_id, priority):
        """Record the status of one alarm under its controller.

        The status is stored when the alarm has no entry yet, or when the
        stored value compares lower than ``priority``.

        Args:
            alarm_id: Alarm identifier; the text before the first "/" is
                used as the controller id.
            priority: Alarm priority, as accepted by convert_priority.
        """
        controller_id = alarm_id.split("/")[0]
        alarms = self.priority_dict.get(controller_id)
        if alarms is None:
            alarms = {}
            self.priority_dict[controller_id] = alarms

        current = alarms.get(alarm_id)
        # NOTE(review): `current` is an already-converted status while
        # `priority` is the raw priority, so this compares values on two
        # different scales. Kept as-is because the intended ordering cannot
        # be determined from this file - confirm and compare like with like.
        if current is None or current < priority:
            alarms[alarm_id] = self.convert_priority(priority)

    def run_status(self):
        """Populate priority_dict from every alarm that has a priority."""
        for alarm_id in self.alarm_data:
            priority = self.alarm_data.get(alarm_id, {}).get("priority")
            # Alarms without a "priority" entry are skipped.
            if priority is not None:
                self.check_priority(alarm_id, priority)

    def update_tags(self):
        """Write each controller's alarm statuses to its IdToStatus tag.

        One tag path and one JSON string are produced per controller in
        priority_dict, and all of them are passed to a single
        system.tag.writeAsync call.

        Raises:
            TypeError: If priority_dict is not a dict.
        """
        if not isinstance(self.priority_dict, dict):
            raise TypeError("Not a python dictionary")

        device_paths = []
        status_values = []
        for controller_id in self.priority_dict:
            device_paths.append(
                "%sTags/%s/IdToStatus/json" % (self.tag_provider, controller_id)
            )
            status_json = self.priority_dict.get(controller_id)
            status_values.append(system.util.jsonEncode(status_json, 4))
        # Logged once, after all paths have been collected.
        self.logger.info("device paths " + str(device_paths))

        system.tag.writeAsync(device_paths, status_values)