"""Web socket message handling for A2C SCADA messages.

Module-level globals are required so that separate event timer scripts can
write tag values independently from the reading of messages on the web
socket.

State messages are queued into ``global_queue``, where they are read on a
different thread by the ``Update`` class and written to tags.

All alarms are written to ``global_alarms``. The alarms are then read by the
Visualisation.status class, where they are transformed into an alarm state
and written to tags.

NOTE: this module targets the Ignition (Jython 2.7) scripting environment;
the ``system`` name is supplied by that runtime and ``unicode``/``long`` are
Python 2 builtins.
"""
from datetime import datetime
try:
    import Queue
except ImportError:
    # Fallback so the module can be imported by Python 3 tooling
    # (linters/tests); the runtime target remains Jython 2.7.
    import queue as Queue
import copy

# Alarm state values used throughout this module.
ALARM_STATE_CLEARED = 0
ALARM_STATE_ACTIVE = 1
ALARM_STATE_UNKNOWN = 3

global_alarms = {}
global_states = set([])
global_queue = Queue.Queue(maxsize=100000)
global_first_connect = False


class A2C_MessageHandler():
    """Handles the incoming A2C messages.

    Alarm messages are stored in the module-level ``global_alarms`` dict,
    state messages are pushed onto the module-level ``global_queue`` and
    download messages are written straight to a tag.

    Instance Attributes:
        self.whid: Warehouse id for the site(str)
        self.tag_provider: Tag provider to write tag values to(str)
        self.logger: Logger used by message_logger.
        self.check_logger_active: Value of the System/wbsckt_logging tag,
            read once at construction; gates message_logger output.
        self.A2C_MESSAGE_TYPES: Holds a reference to the methods called
            for different message types(dict)
    """

    def __init__(self, whid):
        """Reset the shared alarm state and build the dispatch table.

        Args:
            whid: Warehouse id for the site(str)
        """
        global global_alarms
        global global_first_connect
        global_first_connect = False
        global_alarms = {}
        self.whid = whid
        self.tag_provider = "[%s_SCADA_TAG_PROVIDER]" % (self.whid)
        self.logger = system.util.getLogger(
            "%s-Web-Socket-Message-Handler" % (self.whid))
        tag_to_read = "%sSystem/wbsckt_logging" % (self.tag_provider)
        self.check_logger_active = system.tag.readBlocking(
            [tag_to_read])[0].value
        # "SCADAMetricsInterface.Event" is intentionally not registered
        # (it was disabled in the original dispatch table).
        self.A2C_MESSAGE_TYPES = {
            "SCADAMetricsInterface.StateChange": self.handle_state_message,
            "CloudMetricsInterface.Event": self.handle_alarm_message,
            "ScadaCloud.Download": self.handle_download_message,
            "ScadaCloud.Connection": self.handle_cloud_connection,
        }

    def message_logger(self, message):
        """Log message at info level when web socket logging is enabled."""
        if self.check_logger_active:
            self.logger.info(message)

    def message_lookup(self, message_type, message):
        """Dispatch message to the handler registered for message_type.

        Unknown message types are logged and otherwise ignored. The lookup
        is kept separate from the handler call so that a KeyError raised
        inside a handler is not misreported as an unknown message type.
        """
        handler = self.A2C_MESSAGE_TYPES.get(message_type)
        if handler is None:
            self.message_logger("Message type not found:" + str(message))
            return
        try:
            handler(message)
        except KeyError as e:
            # Still best-effort (a bad message must not break the reader),
            # but report the real cause instead of "type not found".
            self.logger.warn(
                "KeyError while handling %s: %s" % (message_type, e))

    def handle_message(self, message):
        """Entry point for a decoded web socket message (dict).

        Batch messages are unpacked and each contained message dispatched
        individually. A message carrying an "action" value is treated as a
        heartbeat: the current time is written to the heartbeat tag.
        """
        heartbeat_message = message.get("action")
        payload = message.get("payload", {})
        message_type = payload.get("payloadType")
        if message_type == "ScadaCloud.Batch":
            for item in payload.get("messages", []):
                item_type = item.get("payload", {}).get("payloadType")
                self.message_lookup(item_type, item)
        elif not heartbeat_message:
            self.message_lookup(message_type, message)
        else:
            self.message_logger("Heartbeat:" + str(message))
            tag_to_write = "%sSystem/wbsckt_heartbeat_interval" % (
                self.tag_provider)
            current_time = system.date.now()
            system.tag.writeAsync([tag_to_write], [current_time])

    def handle_alarm_message(self, message):
        """Add, update or remove an alarm in global_alarms.

        The alarm is keyed by "<sourceId>/alarm/<id>". A state of 0 removes
        the alarm; any other state stores/overwrites it. Messages whose
        fields do not have the expected types are logged and dropped.
        """
        header = message.get("header", {})
        payload = message.get("payload", {})
        source = header.get("sourceId")
        alarm_message = payload.get("message")
        alarm_type = payload.get("type")
        timestamp = payload.get("timestamp")
        priority = payload.get("priority")
        alarm_id = payload.get("id")
        shelve_expiry = payload.get("shelveExpiryEpoch", 0)
        state = payload.get("state")

        # NOTE(review): timestamp must be exactly `long` and the other
        # numerics exactly `int`; confirm the decoder always yields these.
        fields_valid = (
            isinstance(source, unicode)
            and isinstance(alarm_message, unicode)
            and isinstance(alarm_type, int)
            and isinstance(timestamp, long)
            and isinstance(priority, int)
            and isinstance(shelve_expiry, int)
            and isinstance(state, int)
            and isinstance(alarm_id, int)
        )
        if not fields_valid:
            self.message_logger(
                "Incorrect type value in message fields: " + str(message))
            return

        scada_alarm_message = {
            "sourceId": source,
            "message": alarm_message,
            "type": alarm_type,
            "timestamp": timestamp,
            "priority": priority,
            "id": alarm_id,
            "shelveExpiryEpoch": shelve_expiry,
            "state": state,
        }
        alarm_key = "%s/alarm/%s" % (source, alarm_id)
        if state == ALARM_STATE_CLEARED:
            removed_value = global_alarms.pop(alarm_key, "No key found")
            self.message_logger(
                "Value removed from aws_data: " + str(removed_value)
                + ":" + str(alarm_key))
        else:
            global_alarms[alarm_key] = scada_alarm_message
            self.message_logger(
                "Value added to aws_data: " + str(scada_alarm_message))

    def handle_state_message(self, message):
        """Queue a [source_id, state] pair for the tag-writing thread.

        Messages whose sourceId is not unicode or whose state is not an int
        are logged and dropped.
        """
        header = message.get("header", {})
        payload = message.get("payload", {})
        source_id = header.get("sourceId")
        state = payload.get("currentMachineState")
        time_stamp = payload.get("timestamp")
        if isinstance(source_id, unicode) and isinstance(state, int):
            scada_state_message = {"timestamp": time_stamp, "state": state}
            global_queue.put([source_id, state])
            self.message_logger(
                "State message written to queue: "
                + str({source_id: scada_state_message}))
        else:
            self.message_logger(
                "Incorrect type value in message fields: " + str(message))

    def handle_download_message(self, message):
        """Write the download url and session id, JSON encoded, to a tag."""
        payload = message.get("payload", {})
        url = payload.get("downloadUrl", None)
        session_id = payload.get("sessionId", None)
        download = {"data": [{"session_id": session_id, "url": url}]}
        tag_to_write = "%sSystem/download" % (self.tag_provider)
        json_payload = system.util.jsonEncode(download)
        system.tag.writeAsync([tag_to_write], [json_payload])
        self.message_logger("Download message received: " + str(message))

    def handle_cloud_connection(self, message):
        """Handle DISCONNECT / CONNECT / SYNC connection events.

        DISCONNECT marks the component's alarms as unknown, CONNECT marks
        them active again, and a SYNC from a PLC removes every alarm stored
        for that component.
        """
        header = message.get("header", {})
        payload = message.get("payload", {})
        component_type = payload.get("componentType")
        timestamp = header.get("timestamp", 0)
        event_type = payload.get("eventType")
        component_id = payload.get("componentId")
        scada_alarm_message = create_disconnect_message(
            component_id, timestamp, 1)
        if event_type == "DISCONNECT":
            self.message_logger(str(scada_alarm_message))
            self.alarms_disconnect(
                component_id, ALARM_STATE_UNKNOWN, scada_alarm_message)
        elif event_type == "CONNECT":
            self.alarms_disconnect(
                component_id, ALARM_STATE_ACTIVE, scada_alarm_message)
        elif event_type == "SYNC" and component_type == "PLC":
            # Remove the matching key itself. Iterate over a snapshot
            # because the dict is mutated inside the loop.
            # NOTE(review): prefix match means "PLC1" also matches "PLC10".
            for key in list(global_alarms):
                if key.startswith(component_id):
                    global_alarms.pop(key, None)

    def alarms_disconnect(self, component_id, value, message):
        """Set stored alarms to state `value` and update disconnect tags.

        If component_id is "DATABRIDGE" or the warehouse id, every alarm
        not belonging to those site-level names is updated and the
        per-device handling is delegated to data_bridge_disconnect.
        Otherwise only alarms whose key starts with component_id are
        updated and that component's DCN tag / disconnect alarm is set
        (value == 3) or cleared.
        """
        site_disconnects = ["DATABRIDGE", self.whid]
        site_disconnect = component_id in site_disconnects
        # An empty prefix matches every alarm key on a site disconnect.
        source_id = "" if site_disconnect else component_id

        for key, alarm in list(global_alarms.items()):
            device_name = key.split("/")[0]
            if (key.startswith(source_id)
                    and device_name not in site_disconnects):
                alarm["state"] = value

        if site_disconnect:
            data_bridge_disconnect(self.whid, value, message, component_id)
            return

        alarm_id = "%s/alarm/%s" % (component_id, "Device disconnected")
        tag_path = "%s%s/DCN" % (self.tag_provider, source_id)
        if value == ALARM_STATE_UNKNOWN:
            create_disconnect_tags(self.whid, source_id)
            global_alarms[alarm_id] = message
            system.tag.writeBlocking([tag_path], [1])
        else:
            global_alarms.pop(alarm_id, "No key found")
            system.tag.writeBlocking([tag_path], [0])


def create_disconnect_message(component_id, timestamp, state):
    """Return a "Device disconnected" alarm dict for component_id."""
    return {
        "sourceId": component_id,
        "message": "Device disconnected",
        "type": 0,
        "timestamp": timestamp,
        "priority": 4,
        "shelveExpiryEpoch": 0,
        "state": state,
    }


def data_bridge_disconnect(whid, value, message, component_id):
    """Set or clear the disconnect alarm and DCN tag for every device.

    Args:
        whid: Warehouse id, used to build the tag provider name(str)
        value: Alarm state; 3 (unknown) means disconnected, anything else
            clears the disconnect alarms.
        message: Disconnect alarm dict for component_id; its "timestamp"
            is reused for the per-device alarms.
        component_id: Id of the site-level component that disconnected.
    """
    device_list = get_device_list(whid)
    time_stamp = message.get("timestamp")
    disconnect = value == ALARM_STATE_UNKNOWN
    tags_to_write = []
    values_to_write = []
    for device in device_list:
        create_disconnect_tags(whid, device)
        alarm_id = "%s/alarm/%s" % (device, "Device disconnected")
        tag_path = "[%s_SCADA_TAG_PROVIDER]%s/DCN" % (whid, device)
        tags_to_write.append(tag_path)
        if disconnect:
            global_alarms[alarm_id] = create_disconnect_message(
                device, time_stamp, ALARM_STATE_UNKNOWN)
            values_to_write.append(1)
        else:
            global_alarms.pop(alarm_id, "No key found")
            values_to_write.append(0)

    alarm_id = "%s/alarm/%s" % (component_id, "Device disconnected")
    if disconnect:
        global_alarms[alarm_id] = message
    else:
        global_alarms.pop(alarm_id, "No key found")
    system.tag.writeAsync(tags_to_write, values_to_write)


def get_device_list(whid):
    """Return all device ids from the Configuration/DetailedViews tag.

    The tag value is JSON decoded and the entries of every value in the
    resulting mapping are flattened into a single list.
    """
    provider = "[%s_SCADA_TAG_PROVIDER]" % (whid)
    tag_path = "%sConfiguration/DetailedViews" % (provider)
    tags_to_read = system.tag.readBlocking([tag_path])
    devices = system.util.jsonDecode(tags_to_read[0].value)
    device_list = []
    for view_devices in devices.values():
        device_list.extend(view_devices)
    return device_list


def create_disconnect_tags(whid, component_id):
    """Create the boolean memory tag <component_id>/DCN if it is missing."""
    logger_name = "%s-Create-Disconnect-Tags" % (whid)
    logger = system.util.getLogger(logger_name)
    base = "[%s_SCADA_TAG_PROVIDER]%s" % (whid, component_id)
    tag_path = base + "/DCN"
    if not system.tag.exists(tag_path):
        tag = {"name": "DCN",
               "valueSource": "memory",
               "dataType": "Boolean",
               "value": True}
        create_tag = system.tag.configure(base, tag)
        if not create_tag[0].isGood():
            # Log the tag path; the original referenced an undefined name
            # here and raised NameError instead of logging.
            logger.warn("Failed to create tag: " + str(tag_path))


class Update():
    """Drains global_queue and writes state and alarm data to tags.

    Instance Attributes:
        self.fc: Value of the Configuration/FC tag, used as the site id.
        self.tag_provider: Tag provider to write tag values to(str)
        self.tags_to_write: Tag paths pending the next write_tags call.
        self.values_to_write: Values matching tags_to_write by index.
        self.logger: Logger for queue size / write count messages.
    """

    def __init__(self):
        tags_to_read = system.tag.readBlocking(["Configuration/FC"])
        self.fc = tags_to_read[0].value
        self.tag_provider = "[%s_SCADA_TAG_PROVIDER]" % (self.fc)
        self.tags_to_write = []
        self.values_to_write = []
        self.logger = system.util.getLogger(
            "%s-Global variable reader" % (self.fc))

    def read_messages_from_queue(self):
        """Move the currently queued state messages into the write lists.

        At most qsize() messages are taken per call, so messages arriving
        while this runs are left for the next call.
        """
        size = global_queue.qsize()
        self.logger.info("Queue size: " + str(size))
        for _ in range(size):
            try:
                # Non-blocking: qsize() is only a snapshot, and a blocking
                # get() would hang if the queue was drained elsewhere.
                message = global_queue.get_nowait()
            except Queue.Empty:
                break
            source_id = "%s%s/STATE" % (self.tag_provider, message[0])
            create_tags_in_place(source_id, message[1], self.logger)
            self.tags_to_write.append(source_id)
            self.values_to_write.append(message[1])

    def write_tags(self):
        """Write pending state values plus the JSON encoded alarms."""
        alarm_path = "%sSystem/aws_data" % (self.tag_provider)
        alarm_data = system.util.jsonEncode(global_alarms)
        self.tags_to_write.append(alarm_path)
        self.values_to_write.append(alarm_data)
        system.tag.writeBlocking(self.tags_to_write, self.values_to_write)
        self.logger.info(
            "State messages written: " + str(len(self.values_to_write)))
        # Reset so a reused instance does not re-write stale values or
        # grow without bound.
        self.tags_to_write = []
        self.values_to_write = []


def create_tags_in_place(source_id, value, logger):
    """Create the Int1 memory tag at source_id (".../STATE") if missing.

    Args:
        source_id: Full tag path ending in "/STATE"(str)
        value: Initial value for a newly created tag.
        logger: Logger used to report a failed tag creation.
    """
    base = source_id.replace("/STATE", "")
    if not system.tag.exists(source_id):
        tag = {"name": "STATE",
               "valueSource": "memory",
               "dataType": "Int1",
               "value": value}
        create_tag = system.tag.configure(base, tag)
        if not create_tag[0].isGood():
            logger.warn("Failed to create tag: " + str(source_id))