"""Web socket client for the SCADA tag provider (Ignition / Jython 2.7).

Defines the ``Listener`` that receives web socket callbacks and
``web_socket_main``, the long-running function that owns one connection.
"""
import java.net.http.WebSocketHandshakeException
from java.net.http import HttpClient
from java.net.http import WebSocket
from java.net import URI
from java.lang import Thread
import uuid
import json
import time

# Value passed to AWS.heartbeat.check_heartbeat as its second argument.
# NOTE(review): presumably the maximum heartbeat age in seconds - confirm
# against AWS.heartbeat.check_heartbeat (an older comment here said 120).
_HEARTBEAT_TIMEOUT = 70

# Pause between two iterations of the supervision loop, in seconds.
_LOOP_PERIOD = 0.1


def _get_site_state(site_id):
    """Return the per-site dict held in the system globals, creating it if absent."""
    all_globals = system.util.getGlobals()
    site_state = all_globals.get(site_id)
    if site_state is None:
        site_state = {}
        all_globals[site_id] = site_state
    return site_state


# Check for a http client stored in the globals.
whid = system.tag.readBlocking(["Configuration/FC"])[0].value
client = _get_site_state(whid).get("http_client", None)
# Store the http client as a global variable to be reused on project saves.
if not client:
    client = HttpClient.newHttpClient()
    _get_site_state(whid)["http_client"] = client


class Listener(WebSocket.Listener):
    """
    Creates a Listener for receiving web socket messages.

    The methods in this class are standard Java methods that have been
    overridden to include additional functionality. onOpen, onText, onClose
    and onError are called when the web socket is opened, when it receives
    text data, when it is closed, and when it encounters an error,
    respectively.

    Args:
        whid: Warehouse id for the tag provider (string).
        message_handler: A message handler object whose handle_message method
            is given every complete, JSON-decoded message received by onText.

    Returns:
        Listener object.

    Raises:
        Error handling is performed by the onError method. This method can be
        overridden with additional logic for handling errors detected by the
        Listener object.
    """

    def __init__(self, whid, message_handler):
        self.whid = whid
        self.alarms = {}
        self.tag_provider = "[%s_SCADA_TAG_PROVIDER]" % (self.whid)
        self.logger = system.util.getLogger("%s-Web-Socket-Listener" % (self.whid))
        # Accumulates the fragments of a message that arrives in several frames.
        self.message = ""
        self.message_handler = message_handler

    def onOpen(self, websocket):
        """Send the subscribe request and ask for the first incoming frame."""
        # Generate uuid to help track the connection in aws.
        uid = uuid.uuid4()
        on_open_subscribe = json.dumps(
            {
                "action": "subscribe",
                "parameters": {"siteId": self.whid, "clientName": str(uid)},
            }
        )
        websocket.sendText(on_open_subscribe, True)
        self.logger.info("message sent =" + str(on_open_subscribe))
        websocket.request(1)

    def onText(self, websocket, data, last):
        """Buffer a text frame; decode and dispatch once the message is complete."""
        self.message += str(data)
        if last:
            # Clear the buffer before decoding so a malformed message cannot
            # contaminate the next one.
            payload = self.message
            self.message = ""
            try:
                json_message = json.loads(payload)
            except ValueError as err:
                self.logger.error("Discarding malformed web socket message: " + str(err))
            else:
                self.message_handler.handle_message(json_message)
        # Ask for the next frame, whether or not this one completed a message.
        websocket.request(1)

    def onClose(self, websocket, error, reason=None):
        """Log the closure.

        The Java interface invokes this callback with (webSocket, statusCode,
        reason); ``error`` therefore receives the status code. ``reason`` is
        optional so that two-argument callers keep working.
        """
        text = "Onclose method " + str(error)
        if reason:
            text += " " + str(reason)
        self.logger.info(text)

    def onError(self, websocket, error):
        """Log the error reported by the web socket."""
        self.logger.error("OnError method " + str(error))


def _encode_outgoing(message):
    """Return the JSON text to transmit for one queued message."""
    if isinstance(message, (dict, list, tuple)):
        try:
            return json.dumps(message)
        except (TypeError, ValueError):
            # Not serialisable by the json module: fall through to the
            # legacy conversion below.
            pass
    # Legacy conversion, kept for values that are not plain containers.
    return str(message).replace("u'", "'").replace("'", "\"")


def _open_web_socket(whid, region, message_handler, secret_name, logger):
    """Build and connect the web socket; return it, or None when setup failed."""
    # Return api endpoint from secrets manager.
    API_ID, STAGE, ACC_ID, FUNC_URL = AWS.secrets_manager.get_secret(whid, secret_name)
    # The bare excepts below are kept from the original code, presumably so
    # that Java exceptions raised under Jython are trapped as well.
    try:
        credentials = AWS.credentials.assume_role(
            profile_name="default",
            region=region,
            arn=ACC_ID,
            api_id=API_ID,
            stage=STAGE,
        )
    except:
        AWS.errors.error_handler(whid, "AWS.credentials.assume_role")
        return None

    logger.info("Building URL ....")
    url, headers = AWS.build_url.make_websocket_connection(API_ID, region, STAGE, credentials)
    listener = AWS.web_socket.Listener(whid, message_handler)
    uri = URI.create(url)
    logger.info(str(uri))

    logger.info("Building web-socket object ....")
    # The http client is the module-level one (stored in the system globals).
    ws_builder = client.newWebSocketBuilder()
    for header_name in ("Authorization", "X-Amz-Date", "X-Amz-Security-Token"):
        ws_builder.header(header_name, headers[header_name])
    try:
        # get() is inside the try because connection/handshake failures
        # surface when the future is resolved, not when it is created.
        web_socket = ws_builder.buildAsync(uri, listener).get()
    except:
        AWS.errors.error_handler(whid, "Build web socket")
        return None
    logger.info("Web socket object built, starting while loop ....")
    return web_socket


def _flush_outgoing_messages(web_socket, logger):
    """Send every message queued in the outgoing tag, then empty the tag."""
    # NOTE(review): unlike the other tags this path carries no provider
    # prefix - confirm that is intended.
    raw = system.tag.readBlocking(["System/wbsckt_messages_send"])[0].value
    if not raw:
        return
    message_list = system.util.jsonDecode(raw).get("message_list")
    if not message_list:
        return
    for message in message_list:
        json_string = _encode_outgoing(message)
        try:
            # Wait for completion: the web socket allows only one outstanding
            # send, so an un-awaited second send would fail silently.
            web_socket.sendText(json_string, True).get()
        except:
            logger.error("Message send failed: " + json_string)
            continue
        logger.info("Message sent: " + json_string)
    system.tag.writeAsync(["System/wbsckt_messages_send"], ["{}"])


def _run_socket_loop(web_socket, provider, thread_name, logger):
    """Supervise the open socket until it closes, is aborted or is superseded."""
    logger.info("While loop running ....")
    while True:
        time.sleep(_LOOP_PERIOD)

        if AWS.heartbeat.check_heartbeat(provider, _HEARTBEAT_TIMEOUT):
            # Nothing is sent after sendClose: the output is closed from that
            # point on, so any further send could never be transmitted.
            web_socket.sendClose(WebSocket.NORMAL_CLOSURE, "Missing heartbeat")
            logger.warn("socket closed , missing heartbeat")
            web_socket.abort()
            break

        if AWS.wbsckt_abort.check_web_socket():
            web_socket.sendClose(WebSocket.NORMAL_CLOSURE, "")
            logger.info("socket close initiated")
            break

        if web_socket.isOutputClosed():
            logger.info("Websocket output closed")
            break
        if web_socket.isInputClosed():
            logger.info("Websocket input closed")
            break

        # Stop when the thread_id tag no longer holds this thread's id.
        # Compared as text so the data type of the tag does not matter.
        this_thread = system.tag.readBlocking([provider + "System/thread_id"])[0].value
        if str(this_thread) != thread_name:
            logger.warn("thread_id does not match current thread_id")
            break

        _flush_outgoing_messages(web_socket, logger)


def web_socket_main(whid, provider, region, message_handler, secret_name):
    """
    Main function for running a web socket.

    This function can be called in an asynchronous thread and should only
    exit when the socket has been closed or an error is encountered. It
    creates a web socket object and runs a loop to keep the connection open.
    The loop exits when the heartbeat check fails, when
    AWS.wbsckt_abort.check_web_socket() requests a close, when the socket's
    input or output is closed, or when the thread_id tag no longer matches
    this thread.

    The running indicators (the "wbsckt_running" entry in the system globals
    and the System/wbsckt_running tag) are set on entry and always cleared on
    exit, including when setup fails.

    Args:
        whid: Warehouse id for the tag provider (string).
        provider: Tag provider that the web socket will use to write messages
            to/from (string).
        region: The AWS region of the api endpoint. Usually the same region as
            the EC2 running the web socket (string).
        message_handler: message handler object used for parsing of the web
            socket messages (class).
        secret_name: name of the secret to be passed into the web socket.
            This will retrieve the api endpoint for AWS.

    Returns:
        N/A.

    Raises:
        Errors raised by AWS.secrets_manager.get_secret propagate to the
        caller. Failures while assuming the role or building the web socket
        are passed to AWS.errors.error_handler and the function returns.
    """
    thread_name = str(Thread.getId(Thread.currentThread()))
    system.tag.writeAsync([provider + "System/thread_id"], [thread_name])
    _get_site_state(whid)["wbsckt_running"] = True
    system.tag.writeAsync([provider + "System/wbsckt_running"], [1])
    logger = system.util.getLogger("%s-web-socket-main" % (whid))

    web_socket = None
    try:
        # Initialise the heartbeat before connecting.
        AWS.heartbeat.get_heartbeat(provider)
        web_socket = _open_web_socket(whid, region, message_handler, secret_name, logger)
        if web_socket is not None:
            _run_socket_loop(web_socket, provider, thread_name, logger)
    finally:
        _get_site_state(whid)["wbsckt_running"] = False
        if web_socket is not None:
            web_socket.abort()
        system.tag.writeBlocking([provider + "System/wbsckt_running"], [0])