import datetime import hashlib import hmac import boto3 try: from urllib.parse import quote_plus except ImportError: from urllib import quote_plus def sign(key, msg): return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest() def getSignatureKey(key, dateStamp, regionName, serviceName): kDate = sign(("AWS4" + key).encode("utf-8"), dateStamp) kRegion = sign(kDate, regionName) kService = sign(kRegion, serviceName) kSigning = sign(kService, "aws4_request") return kSigning def build_querystring(access_key, session_key, algorithm, amz_date, credential_scope): query_strings = { "X-Amz-Algorithm": algorithm, "X-Amz-Credential": quote_plus(access_key + "/" + credential_scope), "X-Amz-Date": amz_date, #"X-Amz-Security-Token": quote_plus(session_key), "X-Amz-SignedHeaders": "host", } keys = list(query_strings.keys()) keys.sort() query = [] for key in keys: query.append("{}={}".format(key, query_strings[key])) canonical_query_string = "&".join( query #["{}={}".format(key, value) for key, value in query_strings.items()] ) return canonical_query_string def make_websocket_connection(api_id, region, stage, credentials): method = "GET" service = "execute-api" host = "{}.{}.{}.amazonaws.com".format(api_id, service, region) canonical_uri = "/{}".format(stage) access_key = credentials["AccessKey"] secret_key = credentials["SecretKey"] session_key = credentials["SessionKey"] now = datetime.datetime.utcnow() amz_date = now.strftime("%Y%m%dT%H%M%SZ") datestamp = now.strftime("%Y%m%d") canonical_headers = "host:" + host + "\n" signed_headers = "host" algorithm = "AWS4-HMAC-SHA256" credential_scope = "/".join([datestamp, region, service, "aws4_request"]) canonical_querystring = build_querystring( access_key, session_key, algorithm, amz_date, credential_scope ) payload_hash = hashlib.sha256(("").encode("utf-8")).hexdigest() canonical_request = "\n".join( [ method, canonical_uri, "", #canonical_querystring, canonical_headers, signed_headers, payload_hash, ] ) string_to_sign = "\n".join( [ algorithm, amz_date, credential_scope, hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(), ] ) signing_key = getSignatureKey(secret_key, datestamp, region, service) signature = hmac.new( signing_key, string_to_sign.encode("utf-8"), hashlib.sha256 ).hexdigest() canonical_querystring += "&X-Amz-Signature=" + signature request_url = "wss://{}/{}".format(host, stage) auth_header = algorithm + " Credential=" + access_key + "/" + credential_scope + ", SignedHeaders=" + signed_headers + ", Signature=" + signature #print('-H "Authorization":"' + auth_header +'" -H "X-Amz-Date":"' + amz_date + '" -H "X-Amz-Security-Token":"' + session_key + '" ') request_headers = { "Authorization":auth_header, "X-Amz-Date": amz_date, "X-Amz-Security-Token": session_key } return request_url, request_headers