302 lines
11 KiB
Plaintext
302 lines
11 KiB
Plaintext
from datetime import datetime
|
|
import Queue
|
|
import copy
|
|
|
|
"""Global variables required so we can use different event timer scripts to perform
|
|
the writing of tag values separately from the reading of messages on the web socket.
|
|
State tags messages are queued into the global_queue, where they are read on a different
|
|
thread by the Update class and written to tags. All alarms are written to global_alarms.
|
|
The alarms are then read by the Visualisation.status class where they are transformed into
|
|
an alarm state and written to tags. """
|
|
|
|
# Shared module state, used by the message handler and the tag writer.

# Active alarms keyed by "<sourceId>/alarm/<id>".
global_alarms = dict()

# Set of states; not referenced elsewhere in this file.
global_states = set()

# Bounded hand-off queue of [source_id, state] pairs awaiting a tag write.
global_queue = Queue.Queue(100000)

# Reset to False whenever an A2C_MessageHandler is constructed.
global_first_connect = False
|
|
|
|
|
|
class A2C_MessageHandler():
    """
    Handles the incoming A2C messages by dispatching each one to the
    handler registered for its payload type.

    State changes are queued on the module-level global_queue, alarms are
    held in the module-level global_alarms dict, and download / heartbeat
    messages are written straight to tags.

    Class Attributes:
        ACTIVE: Alarm state value applied on a CONNECT event(int)
        UNKNOWN: Alarm state value applied on a DISCONNECT event(int)

    Instance Attributes:
        self.whid: Warehouse id for the site(str)
        self.tag_provider: Tag provider to write tag values to(str)
        self.logger: Logger named after the warehouse id
        self.check_logger_active: Value of the System/wbsckt_logging tag
            read at construction; gates message_logger output
        self.A2C_MESSAGE_TYPES: Holds a reference to the methods called
            for different message types(dict)
    """

    ACTIVE = 1
    UNKNOWN = 3

    def __init__(self, whid):
        """Reset the shared module state and build the dispatch table.

        Args:
            whid: Warehouse id used to derive the tag provider and logger name.
        """
        global global_alarms
        global global_first_connect
        # Rebinding (not just mutating) the module globals, hence `global`.
        global_first_connect = False
        global_alarms = {}
        self.whid = whid
        self.tag_provider = "[%s_SCADA_TAG_PROVIDER]" % (self.whid)
        self.logger = system.util.getLogger("%s-Web-Socket-Message-Handler" % (self.whid))
        tag_to_read = "%sSystem/wbsckt_logging" % (self.tag_provider)
        self.check_logger_active = system.tag.readBlocking([tag_to_read])[0].value
        self.A2C_MESSAGE_TYPES = {
            "SCADAMetricsInterface.StateChange": self.handle_state_message,
            # "SCADAMetricsInterface.Event": self.handle_alarm_message,
            "CloudMetricsInterface.Event": self.handle_alarm_message,
            "ScadaCloud.Download": self.handle_download_message,
            "ScadaCloud.Connection": self.handle_cloud_connection}

    def message_logger(self, message):
        """Log `message` at info level, only when logging is switched on."""
        if self.check_logger_active:
            self.logger.info(message)

    def message_lookup(self, message_type, message):
        """Call the handler registered for `message_type`, if there is one.

        Unknown types are reported through message_logger. A KeyError raised
        inside a handler is still contained (so the caller keeps running)
        but is reported as a handler failure rather than being mistaken for
        an unknown message type.
        """
        handler = self.A2C_MESSAGE_TYPES.get(message_type)
        if handler is None:
            self.message_logger("Message type not found:"
                                + str(message))
            return
        try:
            handler(message)
        except KeyError as e:
            self.logger.warn("KeyError while handling %s message: %r"
                             % (message_type, e))

    def handle_message(self, message):
        """Route one decoded web-socket message.

        A "ScadaCloud.Batch" payload is unpacked and each contained message
        is dispatched individually. Any other message without an "action"
        field is dispatched by its payload type. Everything else is treated
        as a heartbeat and the current time is written to the
        System/wbsckt_heartbeat_interval tag.
        """
        heartbeat_message = message.get("action")
        # `or {}` also covers an explicit null payload, not just a missing key.
        payload = message.get("payload") or {}
        message_type = payload.get("payloadType")
        if message_type == "ScadaCloud.Batch":
            for batched in payload.get("messages") or []:
                batched_type = (batched.get("payload") or {}).get("payloadType")
                self.message_lookup(batched_type, batched)
        elif not heartbeat_message:
            self.message_lookup(message_type, message)
        else:
            self.message_logger("Heartbeat:" + str(message))
            tag_to_write = "%sSystem/wbsckt_heartbeat_interval" % (self.tag_provider)
            current_time = system.date.now()
            system.tag.writeAsync([tag_to_write], [current_time])

    def handle_alarm_message(self, message):
        """Validate an alarm event and add it to / remove it from global_alarms.

        The alarm is keyed by "<sourceId>/alarm/<id>". A state of 0 removes
        the entry; any other state stores (or overwrites) it. Messages whose
        fields have unexpected types are logged and dropped.
        """
        header = message.get("header", {})
        payload = message.get("payload", {})
        source = header.get("sourceId")
        alarm_message = payload.get("message")
        alarm_type = payload.get("type")
        timestamp = payload.get("timestamp")
        priority = payload.get("priority")
        alarm_id = payload.get("id")
        shelve_expiry = payload.get("shelveExpiryEpoch", 0)
        state = payload.get("state")

        # Epoch fields may be decoded as int or long depending on magnitude;
        # accept both so a large expiry epoch does not reject the alarm.
        epoch_types = (int, long)
        fields_valid = (isinstance(source, unicode)
                        and isinstance(alarm_message, unicode)
                        and isinstance(alarm_type, int)
                        and isinstance(timestamp, epoch_types)
                        and isinstance(priority, int)
                        and isinstance(shelve_expiry, epoch_types)
                        and isinstance(state, int)
                        and isinstance(alarm_id, int))
        if not fields_valid:
            self.message_logger("Incorrect type value in message fields: "
                                + str(message))
            return

        alarm_key = "%s/alarm/%s" % (source, alarm_id)
        if state == 0:
            removed_value = global_alarms.pop(alarm_key, "No key found")
            self.message_logger("Value removed from aws_data: "
                                + str(removed_value) + ":" + str(alarm_key))
        else:
            scada_alarm_message = {"sourceId": source,
                                   "message": alarm_message,
                                   "type": alarm_type,
                                   "timestamp": timestamp,
                                   "priority": priority,
                                   "id": alarm_id,
                                   "shelveExpiryEpoch": shelve_expiry,
                                   "state": state}
            global_alarms[alarm_key] = scada_alarm_message
            self.message_logger("Value added to aws_data: "
                                + str(scada_alarm_message))

    def handle_state_message(self, message):
        """Queue a [source_id, state] pair on global_queue for later writing.

        Messages whose sourceId is not unicode or whose currentMachineState
        is not an int are logged and dropped.
        """
        header = message.get("header", {})
        payload = message.get("payload", {})
        source_id = header.get("sourceId")
        state = payload.get("currentMachineState")
        time_stamp = payload.get("timestamp")
        if isinstance(source_id, unicode) and isinstance(state, int):
            # The timestamp is only logged; the queue carries source and state.
            scada_state_message = {"timestamp": time_stamp,
                                   "state": state}
            global_queue.put([source_id, state])
            self.message_logger("State message written to queue: "
                                + str({source_id: scada_state_message}))
        else:
            self.message_logger("Incorrect type value in message fields: "
                                + str(message))

    def handle_download_message(self, message):
        """Write the download URL and session id as JSON to System/download."""
        url = message.get("payload", {}).get("downloadUrl", None)
        session_id = message.get("payload", {}).get("sessionId", None)
        download = {}
        payload = {"session_id": session_id, "url": url}
        download["data"] = [payload]
        tag_to_write = "%sSystem/download" % (self.tag_provider)
        json_payload = system.util.jsonEncode(download)
        system.tag.writeAsync([tag_to_write], [json_payload])
        self.message_logger("Download message received: "
                            + str(message))

    def handle_cloud_connection(self, message):
        """React to a DISCONNECT, CONNECT or PLC SYNC connection event.

        DISCONNECT marks the component's alarms UNKNOWN, CONNECT marks them
        ACTIVE (both via alarms_disconnect). SYNC for a PLC removes every
        entry in global_alarms whose key starts with the component id.
        """
        header = message.get("header", {})
        payload = message.get("payload", {})
        component_type = payload.get("componentType")
        timestamp = header.get("timestamp", 0)
        event_type = payload.get("eventType")
        component_id = payload.get("componentId")
        scada_alarm_message = create_disconnect_message(component_id, timestamp, 1)
        if event_type == "DISCONNECT":
            self.message_logger(str(scada_alarm_message))
            # Call disconnect routine with the unknown state.
            self.alarms_disconnect(component_id, self.UNKNOWN, scada_alarm_message)
        elif event_type == "CONNECT":
            # Call disconnect routine with the active state.
            self.alarms_disconnect(component_id, self.ACTIVE, scada_alarm_message)
        elif event_type == "SYNC" and component_type == "PLC":
            # Skip a missing/empty id: an empty prefix would match every key.
            if component_id:
                # Snapshot the keys so entries can be removed while looping,
                # and remove the matching key itself.
                for key in list(global_alarms.keys()):
                    if key.startswith(component_id):
                        global_alarms.pop(key, "No key found")

    def alarms_disconnect(self, component_id, value, message):
        """Apply a connection state change to global_alarms and the DCN tags.

        Args:
            component_id: Id of the component that changed. "DATABRIDGE" or
                the warehouse id is treated as a whole-site event.
            value: Alarm state to apply; UNKNOWN raises the disconnect
                alarm, any other value clears it.
            message: Disconnect alarm dict stored under the component's
                "Device disconnected" key.
        """
        SITE_DISCONNECTS = ["DATABRIDGE", self.whid]
        site_disconnect = component_id in SITE_DISCONNECTS
        # An empty prefix makes the loop below match every alarm key.
        source_id = "" if site_disconnect else component_id

        for key, alarm in list(global_alarms.items()):
            device_name = key.split("/")[0]
            if key.startswith(source_id) and device_name not in SITE_DISCONNECTS:
                alarm["state"] = value

        if site_disconnect:
            data_bridge_disconnect(self.whid, value, message, component_id)
            return

        # Set the alarms to true for device disconnects
        alarm_id = "%s/alarm/%s" % (component_id, "Device disconnected")
        tag_path = "%s%s/DCN" % (self.tag_provider, source_id)
        if value == self.UNKNOWN:
            create_disconnect_tags(self.whid, source_id)
            global_alarms[alarm_id] = message
            system.tag.writeBlocking([tag_path], [1])
        else:
            global_alarms.pop(alarm_id, "No key found")
            system.tag.writeBlocking([tag_path], [0])
|
|
|
|
def create_disconnect_message(component_id, timestamp, state):
    """Return the alarm dict that represents a "Device disconnected" alarm.

    Args:
        component_id: Stored under "sourceId".
        timestamp: Stored under "timestamp".
        state: Stored under "state".

    Returns:
        dict with fixed type 0, priority 4 and shelveExpiryEpoch 0.
    """
    return dict([
        ("sourceId", component_id),
        ("message", "Device disconnected"),
        ("type", 0),
        ("timestamp", timestamp),
        ("priority", 4),
        ("shelveExpiryEpoch", 0),
        ("state", state),
    ])
|
|
|
|
def data_bridge_disconnect(whid, value, message, component_id):
    """Raise or clear the disconnect alarm for every configured device.

    For each device returned by get_device_list the DCN tag is ensured to
    exist, the device's "Device disconnected" entry in global_alarms is set
    (value == 3) or removed (any other value), and the DCN tag is written
    1 or 0 accordingly. The same is then done for `component_id` itself,
    using `message` as the alarm body.

    Args:
        whid: Warehouse id used to build the tag provider name.
        value: Alarm state; 3 means disconnect.
        message: Alarm dict for the site-level component.
        component_id: Id of the site-level component that changed.
    """
    devices = get_device_list(whid)
    stamp = message.get("timestamp")
    is_disconnect = (value == 3)
    dcn_paths = []
    dcn_values = []

    for device in devices:
        create_disconnect_tags(whid, device)
        device_key = "%s/alarm/%s" % (device, "Device disconnected")
        dcn_paths.append("[%s_SCADA_TAG_PROVIDER]%s/DCN" % (whid, device))
        if is_disconnect:
            global_alarms[device_key] = create_disconnect_message(device, stamp, 3)
            dcn_values.append(1)
        else:
            global_alarms.pop(device_key, "No key found")
            dcn_values.append(0)

    site_key = "%s/alarm/%s" % (component_id, "Device disconnected")
    if is_disconnect:
        global_alarms[site_key] = message
    else:
        global_alarms.pop(site_key, "No key found")

    system.tag.writeAsync(dcn_paths, dcn_values)
|
|
|
|
def get_device_list(whid):
    """Return a flat list of the entries in Configuration/DetailedViews.

    The tag value is decoded as JSON and the members of every value in the
    resulting mapping are concatenated into one list.

    Args:
        whid: Warehouse id used to build the tag provider name.

    Returns:
        list of all members found, in mapping iteration order.
    """
    provider = "[%s_SCADA_TAG_PROVIDER]" % (whid)
    tag_path = "%sConfiguration/DetailedViews" % (provider)
    qualified_value = system.tag.readBlocking([tag_path])[0]
    views = system.util.jsonDecode(qualified_value.value)
    names = []
    for members in views.values():
        names.extend(members)
    return names
|
|
|
|
def create_disconnect_tags(whid, component_id):
    """Create the Boolean memory tag <component_id>/DCN if it does not exist.

    Args:
        whid: Warehouse id used to build the tag provider and logger names.
        component_id: Folder (device) under which the DCN tag is created.

    A failed creation is logged as a warning; nothing is raised or returned.
    """
    logger_name = "%s-Create-Disconnect-Tags" % (whid)
    logger = system.util.getLogger(logger_name)
    base = "[%s_SCADA_TAG_PROVIDER]%s" % (whid, component_id)
    tag_path = base + "/DCN"
    if system.tag.exists(tag_path):
        return
    tag = {"name": "DCN",
           "valueSource": "memory",
           "dataType": "Boolean",
           "value": True}
    create_tag = system.tag.configure(base, tag)
    if not create_tag[0].isGood():
        # Log the tag path built here; this function has no `source_id` name,
        # so referencing it would raise NameError on the failure path.
        logger.warn("Failed to create tag: " + str(tag_path))
|
|
|
|
class Update():
    """
    Drains queued state messages and writes them, together with the
    current alarm data, to tags.

    Instance Attributes:
        self.fc: Value of the Configuration/FC tag, used as the site id
        self.tag_provider: Tag provider to write tag values to(str)
        self.tags_to_write: Tag paths buffered for the next write(list)
        self.values_to_write: Values matching tags_to_write(list)
        self.logger: Logger named after the site id
    """

    def __init__(self):
        """Read the site id from Configuration/FC and set up empty buffers."""
        tags_to_read = system.tag.readBlocking(["Configuration/FC"])
        self.fc = tags_to_read[0].value
        self.tag_provider = "[%s_SCADA_TAG_PROVIDER]" % (self.fc)
        self.tags_to_write = []
        self.values_to_write = []
        self.logger = system.util.getLogger("%s-Global variable reader"
                                            % (self.fc))

    def read_messages_from_queue(self):
        """Move every message currently on global_queue into the buffers.

        The queue size is sampled once, so messages arriving during the
        drain are left for the next call. The STATE tag is created for each
        message if it does not exist yet.
        """
        size = global_queue.qsize()
        self.logger.info("Queue size: " + str(size))
        for _ in range(size):
            message = global_queue.get()
            source_id = "%s%s/STATE" % (self.tag_provider, message[0])
            create_tags_in_place(source_id, message[1], self.logger)
            self.tags_to_write.append(source_id)
            self.values_to_write.append(message[1])

    def write_tags(self):
        """Write the buffered state values plus the JSON-encoded alarms.

        The alarm data goes to System/aws_data. The buffers are emptied
        after a successful write so that a reused instance does not
        re-write old values or grow without bound.
        """
        alarm_path = "%sSystem/aws_data" % (self.tag_provider)
        alarm_data = system.util.jsonEncode(global_alarms)
        # Build the outgoing lists locally so a failed write leaves the
        # state buffers intact and does not accumulate duplicate alarm rows.
        paths = self.tags_to_write + [alarm_path]
        values = self.values_to_write + [alarm_data]
        system.tag.writeBlocking(paths, values)
        self.logger.info("State messages written: " + str(len(values)))
        self.tags_to_write = []
        self.values_to_write = []
|
|
|
|
def create_tags_in_place(source_id, value, logger):
    """Create the Int1 memory tag at `source_id` if it does not exist.

    Args:
        source_id: Full tag path, expected to end in "/STATE".
        value: Initial value given to a newly created tag.
        logger: Logger used to warn when creation fails.

    A failed creation is logged as a warning; nothing is raised or returned.
    """
    suffix = "/STATE"
    # Strip only the trailing "/STATE": replacing every occurrence would
    # corrupt a parent path that itself contains "/STATE".
    if source_id.endswith(suffix):
        base = source_id[:-len(suffix)]
    else:
        base = source_id
    if system.tag.exists(source_id):
        return
    tag = {"name": "STATE",
           "valueSource": "memory",
           "dataType": "Int1",
           "value": value}
    create_tag = system.tag.configure(base, tag)
    if not create_tag[0].isGood():
        logger.warn("Failed to create tag: " + str(source_id))