102 lines
3.2 KiB
Plaintext
102 lines
3.2 KiB
Plaintext
import datetime
|
|
import hashlib
|
|
import hmac
|
|
|
|
import boto3
|
|
|
|
try:
|
|
from urllib.parse import quote_plus
|
|
except ImportError:
|
|
from urllib import quote_plus
|
|
|
|
|
|
def sign(key, msg):
    """Return the raw HMAC-SHA256 digest of ``msg`` keyed with ``key``.

    ``key`` is handed to ``hmac.new`` unchanged, while ``msg`` is a text
    string that is UTF-8 encoded before hashing. The result is the binary
    digest (not a hex string), so it can be used directly as the key of a
    subsequent HMAC step.
    """
    payload = msg.encode("utf-8")
    mac = hmac.new(key, payload, hashlib.sha256)
    return mac.digest()
|
|
|
|
|
|
def getSignatureKey(key, dateStamp, regionName, serviceName):
    """Derive the AWS Signature Version 4 signing key.

    The derivation starts from ``"AWS4" + key`` (UTF-8 encoded) and chains
    ``sign()`` over the date stamp, the region name, the service name and
    finally the literal ``"aws4_request"``. Each intermediate digest is used
    as the key for the next step.

    Returns the final digest as produced by ``sign()``.
    """
    derived = ("AWS4" + key).encode("utf-8")
    for component in (dateStamp, regionName, serviceName, "aws4_request"):
        derived = sign(derived, component)
    return derived
|
|
|
|
|
|
def build_querystring(access_key, session_key, algorithm, amz_date, credential_scope):
    """Build the ``X-Amz-*`` query string with parameters sorted by name.

    The credential value (``access_key + "/" + credential_scope``) is
    URL-encoded with ``quote_plus``; the other values are inserted as given.
    The signed-headers entry is always ``"host"``.

    ``session_key`` is accepted but not used: no ``X-Amz-Security-Token``
    parameter is emitted.

    Returns the ``name=value`` pairs joined with ``&``.
    """
    params = {
        "X-Amz-Algorithm": algorithm,
        "X-Amz-Credential": quote_plus(access_key + "/" + credential_scope),
        "X-Amz-Date": amz_date,
        "X-Amz-SignedHeaders": "host",
    }
    # Emit the parameters in ascending order of their names.
    return "&".join(
        "{}={}".format(name, params[name]) for name in sorted(params)
    )
|
|
|
|
|
|
def make_websocket_connection(api_id, region, stage, credentials):
    """Build the URL and signed headers for a ``wss://`` execute-api request.

    Despite its name, this function does not open a connection. It computes
    an AWS Signature Version 4 signature for a ``GET`` on ``/<stage>`` and
    returns what a WebSocket client needs to perform the handshake.

    Args:
        api_id: API identifier, used as the first label of the host name.
        region: AWS region, used in the host name and the credential scope.
        stage: Deployment stage, used as the request path.
        credentials: Mapping with the keys ``"AccessKey"``, ``"SecretKey"``
            and ``"SessionKey"``. All three are read with subscript access.

    Returns:
        A ``(request_url, request_headers)`` tuple. ``request_headers`` is a
        dict with ``Authorization``, ``X-Amz-Date`` and
        ``X-Amz-Security-Token``.

    Raises:
        KeyError: If ``credentials`` lacks one of the three keys.
    """
    method = "GET"
    service = "execute-api"
    algorithm = "AWS4-HMAC-SHA256"
    signed_headers = "host"

    host = "{}.{}.{}.amazonaws.com".format(api_id, service, region)
    canonical_uri = "/{}".format(stage)

    access_key = credentials["AccessKey"]
    secret_key = credentials["SecretKey"]
    session_key = credentials["SessionKey"]

    # Both timestamps are taken from a single reading of the clock so the
    # date in the credential scope always agrees with X-Amz-Date.
    now = datetime.datetime.utcnow()
    amz_date = now.strftime("%Y%m%dT%H%M%SZ")
    datestamp = now.strftime("%Y%m%d")
    credential_scope = "/".join([datestamp, region, service, "aws4_request"])

    # The signature travels in the Authorization header, so the canonical
    # query string is empty and only the host header is signed.
    canonical_headers = "host:" + host + "\n"
    payload_hash = hashlib.sha256(b"").hexdigest()  # the request has no body
    canonical_request = "\n".join(
        [
            method,
            canonical_uri,
            "",  # empty canonical query string
            canonical_headers,
            signed_headers,
            payload_hash,
        ]
    )

    string_to_sign = "\n".join(
        [
            algorithm,
            amz_date,
            credential_scope,
            hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(),
        ]
    )

    signing_key = getSignatureKey(secret_key, datestamp, region, service)
    signature = hmac.new(
        signing_key, string_to_sign.encode("utf-8"), hashlib.sha256
    ).hexdigest()

    auth_header = (
        algorithm
        + " Credential=" + access_key + "/" + credential_scope
        + ", SignedHeaders=" + signed_headers
        + ", Signature=" + signature
    )

    request_url = "wss://{}/{}".format(host, stage)
    request_headers = {
        "Authorization": auth_header,
        "X-Amz-Date": amz_date,
        "X-Amz-Security-Token": session_key,
    }
    return request_url, request_headers
|
|
|
|
|