# BNA8/.resources/41e4948cdb5cbf9d4cfe3c67d9928e68da77677fded40c1041365032c445fbbe
#
# 100 lines
# 3.2 KiB
# Plaintext

import datetime
import hashlib
import hmac
# Prefer the Python 3 location of quote_plus; fall back to the Python 2 module.
try:
    from urllib.parse import quote_plus
except ImportError:
    from urllib import quote_plus
def sign(key, msg):
    """Return the raw HMAC-SHA256 digest of ``msg`` keyed with ``key``.

    Args:
        key: HMAC key as bytes.
        msg: Message to authenticate. Text is encoded as UTF-8 before
            hashing; ``bytes`` and ``bytearray`` values are used unchanged.

    Returns:
        The binary (not hex-encoded) 32-byte digest.
    """
    # Only text needs encoding; binary input is already in its final form.
    if not isinstance(msg, (bytes, bytearray)):
        msg = msg.encode("utf-8")
    return hmac.new(key, msg, hashlib.sha256).digest()
def getSignatureKey(key, dateStamp, regionName, serviceName):
    """Derive the AWS Signature Version 4 signing key.

    Starting from ``"AWS4" + key`` (UTF-8 encoded), the ``sign`` helper is
    applied in turn to the date stamp, the region, the service and the
    literal ``"aws4_request"``; the digest of each step is the key for the
    next one.

    Args:
        key: Secret access key as text.
        dateStamp: Date component of the credential scope.
        regionName: Region component of the credential scope.
        serviceName: Service component of the credential scope.

    Returns:
        The value returned by the final ``sign`` call (the signing key).
    """
    signing_key = ("AWS4" + key).encode("utf-8")
    # Each scope component is folded into the key, in this fixed order.
    for scope_part in (dateStamp, regionName, serviceName, "aws4_request"):
        signing_key = sign(signing_key, scope_part)
    return signing_key
def build_querystring(access_key, session_key, algorithm, amz_date, credential_scope):
    """Build the query string carrying the SigV4 ``X-Amz-*`` parameters.

    The parameters are emitted as ``name=value`` pairs joined with ``&`` and
    ordered by parameter name.

    Args:
        access_key: Access key id; joined to ``credential_scope`` with "/"
            to form ``X-Amz-Credential``.
        session_key: Accepted for interface compatibility but not used; no
            ``X-Amz-Security-Token`` parameter is emitted.
        algorithm: Value of ``X-Amz-Algorithm``.
        amz_date: Value of ``X-Amz-Date``.
        credential_scope: Scope string appended to the access key.

    Returns:
        The query string, without a leading "?" and without a signature.
    """
    params = {
        "X-Amz-Algorithm": algorithm,
        # The credential contains "/" separators, so it must be URL-encoded.
        "X-Amz-Credential": quote_plus(access_key + "/" + credential_scope),
        "X-Amz-Date": amz_date,
        "X-Amz-SignedHeaders": "host",
    }
    return "&".join(
        "{}={}".format(name, params[name]) for name in sorted(params)
    )
def make_websocket_connection(api_id, region, stage, credentials):
    """Build the URL and SigV4 headers for an ``execute-api`` WebSocket.

    A ``GET`` request for ``/<stage>`` on
    ``<api_id>.execute-api.<region>.amazonaws.com`` is signed with
    AWS4-HMAC-SHA256 using the current UTC time. Nothing is sent: the caller
    receives the URL and headers to use when opening the connection.

    Args:
        api_id: API identifier; becomes the first label of the host name.
        region: Region used in the host name and the credential scope.
        stage: Stage name; used as the request path.
        credentials: Mapping with ``"AccessKey"`` and ``"SecretKey"`` and,
            optionally, ``"SessionKey"`` (a session token).

    Returns:
        A ``(request_url, request_headers)`` tuple. ``request_headers``
        contains ``Authorization`` and ``X-Amz-Date``, plus
        ``X-Amz-Security-Token`` when a session key was supplied.

    Raises:
        KeyError: If ``"AccessKey"`` or ``"SecretKey"`` is missing.
    """
    method = "GET"
    service = "execute-api"
    algorithm = "AWS4-HMAC-SHA256"
    signed_headers = "host"

    access_key = credentials["AccessKey"]
    secret_key = credentials["SecretKey"]
    # The session token is optional so that credentials without one work too.
    session_key = credentials.get("SessionKey")

    host = "{}.{}.{}.amazonaws.com".format(api_id, service, region)
    canonical_uri = "/{}".format(stage)

    # utcnow() is kept because this module still supports Python 2.
    now = datetime.datetime.utcnow()
    amz_date = now.strftime("%Y%m%dT%H%M%SZ")
    datestamp = now.strftime("%Y%m%d")
    credential_scope = "/".join([datestamp, region, service, "aws4_request"])

    # NOTE(review): only "host" is signed; X-Amz-Date and the security token
    # are sent as unsigned headers - confirm the service accepts this.
    canonical_headers = "host:" + host + "\n"
    # The request has no body, so the payload hash is that of an empty string.
    payload_hash = hashlib.sha256(b"").hexdigest()
    canonical_request = "\n".join(
        [
            method,
            canonical_uri,
            "",  # empty canonical query string: auth travels in headers
            canonical_headers,
            signed_headers,
            payload_hash,
        ]
    )
    string_to_sign = "\n".join(
        [
            algorithm,
            amz_date,
            credential_scope,
            hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(),
        ]
    )

    signing_key = getSignatureKey(secret_key, datestamp, region, service)
    signature = hmac.new(
        signing_key, string_to_sign.encode("utf-8"), hashlib.sha256
    ).hexdigest()

    auth_header = "{} Credential={}/{}, SignedHeaders={}, Signature={}".format(
        algorithm, access_key, credential_scope, signed_headers, signature
    )
    request_url = "wss://{}/{}".format(host, stage)
    request_headers = {
        "Authorization": auth_header,
        "X-Amz-Date": amz_date,
    }
    if session_key:
        request_headers["X-Amz-Security-Token"] = session_key
    return request_url, request_headers