SAT9/.resources/45fe4d5f2d21e947305533276ba3ab85dca61988c709f81a5bc50a63061f474a
2025-05-01 00:04:09 -07:00

302 lines
11 KiB
Plaintext

from datetime import datetime
import Queue
import copy
"""Global variables required so we can use different event timer scripts to perform
the writing of tag values seperateley from the reading of messages on the web socket.
State tags messages are queued into the global_queue, where they are read on a different
thread by the Update class and writen to tags. All alarms are written to global_alarms.
The alarms are then read by the Visualisation.status class where they are transformed into
an alarm state and written to tags. """
global_alarms = {}
global_states = set([])
global_queue = Queue.Queue(maxsize=100000)
global_first_connect = False
class A2C_MessageHandler():
"""
Handles the incoming A2C messages. Stores values in memory as dictionaries
and then writes them to tags.
Instance Attributes:
self.alarms: Holds the current active alarm data(dict).
self.state: Holds the current active state data(dict).
self.update_state: Flag, set when state data is available to write(bool)
self.update_alarms: Flag, set when alarm data is available to write(bool)
self.whid: Warehouse id for the site(str)
self.tag_provider: Tag provider to write tag values to(str)
self.A2C_MESSAGE_TYPES: Holds a reference to the methods called
for different message types(dict)
Returns:
NA.
Raises:
KeyError: NA.
"""
def __init__(self, whid):
global global_alarms
global global_first_connect
global_first_connect = False
global_alarms = {}
self.whid = whid
self.tag_provider = "[%s_SCADA_TAG_PROVIDER]" % (self.whid)
self.logger = system.util.getLogger("%s-Web-Socket-Message-Handler" % (self.whid))
tag_to_read = "%sSystem/wbsckt_logging" % (self.tag_provider)
self.check_logger_active = system.tag.readBlocking([tag_to_read])[0].value
self.A2C_MESSAGE_TYPES = {
"SCADAMetricsInterface.StateChange": self.handle_state_message,
# "SCADAMetricsInterface.Event": self.handle_alarm_message,
"CloudMetricsInterface.Event": self.handle_alarm_message,
"ScadaCloud.Download": self.handle_download_message,
"ScadaCloud.Connection": self.handle_cloud_connection}
def message_logger(self, message):
if self.check_logger_active:
self.logger.info(message)
def message_lookup(self, message_type, message):
try:
self.A2C_MESSAGE_TYPES[message_type](message)
except KeyError as e:
self.message_logger("Message type not found:"
+ str(message))
def handle_message(self, message):
heartbeat_message = message.get("action")
message_type = message.get("payload", {}).get("payloadType")
format = message.get("payload", {}).get("format")
if message_type == "ScadaCloud.Batch":
messages = message.get("payload", {}).get("messages",[])
for i in messages:
message_type = i.get("payload", {}).get("payloadType")
self.message_lookup(message_type, i)
elif message_type !="ScadaCloud.Batch" and not heartbeat_message:
self.message_lookup(message_type, message)
else:
self.message_logger("Heartbeat:" + str(message))
tag_to_write = "%sSystem/wbsckt_heartbeat_interval" % (self.tag_provider)
current_time = system.date.now()
system.tag.writeAsync([tag_to_write], [current_time])
def handle_alarm_message(self, message):
global global_alarms
header = message.get("header",{})
payload = message.get("payload",{})
source = header.get("sourceId")
alarm_message = payload.get("message")
alarm_type = payload.get("type")
timestamp = payload.get("timestamp")
priority = payload.get("priority")
alarm_id = payload.get("id")
shelve_expiry = payload.get("shelveExpiryEpoch",0)
state = payload.get("state")
if (isinstance(source, unicode) and isinstance(alarm_message, unicode)
and isinstance(alarm_type, int)and isinstance(timestamp, long)
and isinstance(priority, int) and isinstance(shelve_expiry, int)
and isinstance(state, int)) and isinstance(alarm_id, int):
scada_alarm_message = {"sourceId": source,
"message": alarm_message,
"type": alarm_type,
"timestamp": timestamp,
"priority": priority,
"id": alarm_id,
"shelveExpiryEpoch": shelve_expiry,
"state": state}
alarm_id = "%s/alarm/%s" % (source, alarm_id)
if state == 0:
removed_value = global_alarms.pop(alarm_id, "No key found")
self.message_logger("Value removed from aws_data: "
+ str(removed_value) + ":" + str(alarm_id))
else:
global_alarms[alarm_id] = scada_alarm_message
self.message_logger("Value added to aws_data: "
+ str(scada_alarm_message))
else:
self.message_logger("Incorrect type value in message fields: "
+ str(message))
def handle_state_message(self, message):
global global_queue
header = message.get("header",{})
payload = message.get("payload",{})
source_id = header.get("sourceId")
state = payload.get("currentMachineState")
time_stamp = payload.get("timestamp")
if isinstance(source_id, unicode) and isinstance(state, int):
scada_state_message = {"timestamp": time_stamp,
"state":state}
global_queue.put([source_id, state])
self.message_logger("State message written to queue: "
+ str({source_id:scada_state_message}))
else:
self.message_logger("Incorrect type value in message fields: "
+ str(message))
def handle_download_message(self, message):
url = message.get("payload", {}).get("downloadUrl", None)
session_id = message.get("payload", {}).get("sessionId", None)
download = {}
payload = {"session_id":session_id, "url": url}
download["data"] = [payload]
tag_to_write = "%sSystem/download" % (self.tag_provider)
json_payload = system.util.jsonEncode(download)
system.tag.writeAsync([tag_to_write], [json_payload])
self.message_logger("Download message received: "
+ str(message))
def handle_cloud_connection(self, message):
global global_alarms
UNKNOWN = 3
ACTIVE = 1
header = message.get("header",{})
payload = message.get("payload",{})
component_type = payload.get("componentType")
timestamp = header.get("timestamp", 0)
event_type = payload.get("eventType")
component_id = payload.get("componentId")
scada_alarm_message = create_disconnect_message(component_id, timestamp, 1)
if event_type == "DISCONNECT":
self.message_logger(str(scada_alarm_message))
# #Call disconnect routine with a value 3 which is an unknown state.
self.alarms_disconnect(component_id, UNKNOWN, scada_alarm_message)
if event_type == "CONNECT":
#Call disconnect routine with a value 1 which is an active state.
self.alarms_disconnect(component_id, ACTIVE, scada_alarm_message)
if event_type == "SYNC" and component_type == "PLC":
alarm_id = "%s/alarm/%s" % (component_id, message)
for k,v in global_alarms.items():
if k.startswith(component_id):
global_alarms.pop(alarm_id, "No key found")
def alarms_disconnect(self, component_id, value, message):
global global_alarms
#Set alarms in the global_alarms to an unknown state.
#If component id == "DATABRIDGE" set all alarms to unknown
SITE_DISCONNECTS = ["DATABRIDGE", self.whid]
if component_id in SITE_DISCONNECTS:
site_disconnect = True
source_id = ""
else:
source_id = component_id
site_disconnect = False
for k,v in global_alarms.items():
device_name = k.split("/")[0]
if k.startswith(source_id) and device_name not in SITE_DISCONNECTS:
global_alarms[k]["state"] = value
alarm_id = "%s/alarm/%s" % (component_id, "Device disconnected")
#Set the alarms to true for device disconnects
if site_disconnect:
data_bridge_disconnect(self.whid, value, message, component_id)
else:
tag_path = "%s%s/DCN" % (self.tag_provider, source_id)
if value == 3:
create_disconnect_tags(self.whid, source_id)
global_alarms[alarm_id] = message
system.tag.writeBlocking([tag_path], [1])
else:
global_alarms.pop(alarm_id, "No key found")
system.tag.writeBlocking([tag_path], [0])
def create_disconnect_message(component_id, timestamp, state):
alarm_message = {"sourceId": component_id,
"message": "Device disconnected",
"type": 0,
"timestamp":timestamp,
"priority": 4,
"shelveExpiryEpoch": 0,
"state": state}
return alarm_message
def data_bridge_disconnect(whid, value, message, component_id):
global global_alarms
device_list = get_device_list(whid)
time_stamp = message.get("timestamp")
tags_to_write = []
values_to_write = []
if value == 3:
disconnect = True
else:
disconnect = False
for i in device_list:
create_disconnect_tags(whid, i)
alarm_id = "%s/alarm/%s" % (i, "Device disconnected")
tag_path = "[%s_SCADA_TAG_PROVIDER]%s/DCN" % (whid, i)
device_message = create_disconnect_message(i, time_stamp, 3)
tags_to_write.append(tag_path)
if disconnect:
global_alarms[alarm_id] = device_message
values_to_write.append(1)
else:
global_alarms.pop(alarm_id, "No key found")
values_to_write.append(0)
alarm_id = "%s/alarm/%s" % (component_id, "Device disconnected")
if disconnect:
global_alarms[alarm_id] = message
else:
global_alarms.pop(alarm_id, "No key found")
system.tag.writeAsync(tags_to_write, values_to_write)
def get_device_list(whid):
provider = "[%s_SCADA_TAG_PROVIDER]" % (whid)
tag_path = "%sConfiguration/DetailedViews" % (provider)
tags_to_read = system.tag.readBlocking([tag_path])
devices = system.util.jsonDecode(tags_to_read[0].value)
device_list = []
for k,v in devices.items():
for i in v:
device_list.append(i)
return device_list
def create_disconnect_tags(whid, component_id):
logger_name = "%s-Create-Disconnect-Tags" % (whid)
logger = system.util.getLogger(logger_name)
base = "[%s_SCADA_TAG_PROVIDER]%s" % (whid, component_id)
if not system.tag.exists(base + "/DCN"):
tag = {"name": "DCN",
"valueSource": "memory",
"dataType": "Boolean",
"value": True}
create_tag = system.tag.configure(base, tag)
if not create_tag[0].isGood():
logger.warn("Failed to create tag: " + str(source_id))
class Update():
def __init__(self):
tags_to_read = system.tag.readBlocking(["Configuration/FC"])
self.fc = tags_to_read[0].value
self.tag_provider = "[%s_SCADA_TAG_PROVIDER]" % (self.fc)
self.tags_to_write = []
self.values_to_write = []
self.logger = system.util.getLogger("%s-Global variable reader"
% (self.fc))
def read_messages_from_queue(self):
size = global_queue.qsize()
self.logger.info("Queue size: " + str(size))
for i in range(0, size):
message = global_queue.get()
source_id ="%s%s/STATE" % (self.tag_provider, message[0])
create_tags_in_place(source_id, message[1], self.logger)
self.tags_to_write.append(source_id)
self.values_to_write.append(message[1])
def write_tags(self):
alarm_path = "%sSystem/aws_data" % ( self.tag_provider)
alarm_data = system.util.jsonEncode(global_alarms)
self.tags_to_write.append(alarm_path)
self.values_to_write.append(alarm_data)
system.tag.writeBlocking(self.tags_to_write, self.values_to_write)
self.logger.info("State messages written: " + str(len(self.values_to_write)))
def create_tags_in_place(source_id, value, logger):
base = source_id.replace("/STATE", "")
if not system.tag.exists(source_id):
tag = {"name": "STATE",
"valueSource": "memory",
"dataType": "Int1",
"value": value}
create_tag = system.tag.configure(base, tag)
if not create_tag[0].isGood():
logger.warn("Failed to create tag: " + str(source_id))