SAT9/.resources/f0b285a16f8e35a2d05c4a68bbd10a60615fffbaadad54e3fbc7b4c1a5b898ec

211 lines
7.8 KiB
Plaintext

import java.net.http.WebSocketHandshakeException
from java.net.http import HttpClient;
from java.net.http import WebSocket;
from java.net import URI
from java.lang import Thread
import uuid
import json
import time
#Check for a http client stored in the globals.
whid = system.tag.readBlocking(["Configuration/FC"])[0].value
client = system.util.getGlobals().get(whid, {}).get("http_client", None)
#Store the http client as a global variable to be reused on project saves.
if not client:
client = HttpClient.newHttpClient()
system.util.getGlobals()[whid]["http_client"] = client
class Listener(WebSocket.Listener):
"""
Creates a Listener for receiving web socket messages.
The mehtods in this class are standard Java methods
that have been overidden to include additional functionality. onOpen,
onText, onClose and onError are called by the class whenthe web socket
is opened, when the web socket receives data,
when the web socket is closed, when the web socket encounters an error,
respectively. Messages are sent from the web socket by calling the sendText
method on the Listener object.
Args:
whid: Warehouse id for the tag provider (string).
message_handler: A message handler object which parses
the messages received from the onText
method (class)
Returns:
Listener object.
Raises:
Error handling is performed by the onError method.
This method can be overidden with additional logic
for handling errors detected by the Listener object.
"""
def __init__(self, whid, message_handler):
self.whid = whid
self.alarms = {}
self.tag_provider = "[%s_SCADA_TAG_PROVIDER]" % (self.whid)
self.logger = system.util.getLogger("%s-Web-Socket-Listener" % (self.whid))
self.message = ""
self.message_handler = message_handler
def onOpen(self, websocket):
#Generate uuid to help track the connection in aws.
uid = uuid.uuid4()
on_open_subscribe = json.dumps({"action": "subscribe",
"parameters": {"siteId": self.whid,
"clientName": str(uid)}}
)
websocket.sendText(on_open_subscribe, True)
logger = system.util.getLogger("Web-Socket-OnOpen")
self.logger.info("message sent =" + str(on_open_subscribe))
websocket.request(1)
def onText(self, websocket, data, last):
self.message += str(data)
if not last:
websocket.request(1)
else:
json_message = json.loads(self.message)
self.message = ""
self.message_handler.handle_message(json_message)
websocket.request(1)
def onClose(self, websocket, error):
self.logger.info("Onclose method " + str(error))
def onError(self, websocket, error):
self.logger.error("OnError method " + str(error))
def web_socket_main(whid, provider, region, message_handler, secret_name):
"""
Main function for running a web socket. This function can
be called in an asynchronous thread and should only exit
when the socket has been closed or an error is encountered.
The function will create a web socket object and run in a
while loop to keep the socket connection open.
It will exit if an error is encounterd, the socket is manually
closed from the tag provider or the socket is closed.
Args:
whid: Warehouse id for the tag provider (string).
provider: Tag provider that the web socket will use to write messages to/from (string).
region: The AWS region of the api endpoint. Usally the same region as the EC2
running the web socket (string).
message_handler: message handler object used for parsing of the web socket messages (class).
secret_name : name of the secret to be passed into the web socket. This will retreive the api endpoint for AWS.
Returns:
N/A.
Raises:
Secrets manager error
web socket error
"""
thread_name = str(Thread.getId(Thread.currentThread()))
system.tag.writeAsync([provider + "system/thread_id"],[thread_name])
system.util.getGlobals()[whid]["wbsckt_running"] = True
system.tag.writeAsync([provider + "System/wbsckt_running"],[1])
logger_name = "%s-web-socket-main" % (whid)
logger = system.util.getLogger(logger_name)
timer_end = None
timer_started = False
"""The heartbeat is initalised with the current time on first connect
Each time a new heartbeat is recieved in AWS.message_types
the current time is written to the tag wbsckt_heartbeat_interval.
The websocket checks that a heartbeat has been recieved at least every 120 secs.
If a heartbeat is not recieved within the 120 sec duration the connection is closed and the loop will exit.
"""
AWS.heartbeat.get_heartbeat(provider)
tags_to_read = system.tag.readBlocking([provider + "System/wbsckt_heartbeat_interval"])
wbsckt_heartbeat_interval = tags_to_read[0].value
#Return api endpoint from secrets manager.
API_ID, STAGE, ACC_ID, FUNC_URL = AWS.secrets_manager.get_secret(whid, secret_name)
try:
credentials = AWS.credentials.assume_role(profile_name = "default", region = region, arn = ACC_ID, api_id = API_ID, stage = STAGE)
except:
AWS.errors.error_handler(whid, "AWS.credentials.assume_role")
return
logger.info("Building URL ....")
url, headers = AWS.build_url.make_websocket_connection(API_ID, region, STAGE, credentials)
listener = AWS.web_socket.Listener(whid, message_handler)
# client = HttpClient.newHttpClient()
#set the client as global (stored in the system global variables).
global client
uri = URI.create(url)
logger.info(str(uri))
logger.info("Building web-socket object ....")
wsBuilder = client.newWebSocketBuilder()
wsBuilder.header("Authorization", headers["Authorization"])
wsBuilder.header("X-Amz-Date", headers["X-Amz-Date"])
wsBuilder.header("X-Amz-Security-Token", headers["X-Amz-Security-Token"])
try:
wsObj = wsBuilder.buildAsync(uri, listener)
except:
AWS.errors.error_handler(whid, "Build web socket")
return
web_socket = wsObj.get()
logger.info("Web socket object built, starting while loop ....")
running = 1
while True:
time.sleep(0.1)
if running == 1:
logger.info("While loop running ....")
running = 0
if AWS.heartbeat.check_heartbeat(provider, 70):
web_socket.sendClose(web_socket.NORMAL_CLOSURE, "Missing heartbeat")
logger.warn("socket closed , missing heartbeat")
web_socket.abort()
text_val = web_socket.sendText(str({"action":"abort"}), True)
break
check_socket_closed_in_loop = AWS.wbsckt_abort.check_web_socket()
if check_socket_closed_in_loop:
web_socket.sendClose(web_socket.NORMAL_CLOSURE, "")
logger.info("socket close initiated")
# web_socket.abort()
text_val = web_socket.sendText(str({"action":"abort"}), True)
break
if not timer_started:
timer_start = system.date.now()
timer_started = True
timer_end = system.date.now()
time_diff = system.date.secondsBetween(timer_start, timer_end)
if time_diff >= wbsckt_heartbeat_interval:
send_heartbeat = True
timer_started = False
if web_socket.isOutputClosed():
logger.info("Websocket output closed")
break
if web_socket.isInputClosed():
logger.info("Websocket input closed")
break
this_thread = system.tag.readBlocking(provider + "System/thread_id")[0].value
if this_thread != thread_name:
logger.warn("thread_id does not match current thread_id")
break
tags_to_read = system.tag.readBlocking(["System/wbsckt_messages_send"])
messages = system.util.jsonDecode(tags_to_read[0].value)
message_list = messages.get("message_list")
if message_list:
for i in message_list:
message_string = str(i)
formatted_string = message_string.replace("u'","'")
json_string = formatted_string.replace("'","\"")
web_socket.sendText(str(json_string), True)
logger.info("Message sent: " + str(json_string))
system.tag.writeAsync(["System/wbsckt_messages_send"], "{}")
system.util.getGlobals()[whid]["wbsckt_running"] = False
web_socket.abort()
system.tag.writeBlocking([provider + "System/wbsckt_running"], [0])