SAT9/.resources/ee72435225fc39f0d1017338779d5d30627a3895132f0b650bd28d33aedf68e6

879 lines
34 KiB
Plaintext

import com.amazonaws.services.s3.AmazonS3ClientBuilder as AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest as GeneratePresignedUrlRequest
import com.amazonaws.HttpMethod as HttpMethod
import boto3
from botocore.client import BaseClient
from botocore.exceptions import ClientError
import json
from pprint import pformat
from urllib2_aws4auth import aws_urlopen, Request
from urllib2 import HTTPError
from urllib import urlencode
from helper.helper import sanitize_tree
from loggerConfig import getLogger
# Default AWS region for this module (individual calls may override via STAGE_CONFIG)
REGION_NAME = 'us-east-1'
# Module-level logger from the project's loggerConfig wrapper, at debug level
LOGGER = getLogger('S3Manager', 'debug')
def getPresignedURL(self, objectKey):
    """
    Generates a uri to retrieve images from an S3 bucket via the AWS Java SDK
    (this module runs under Jython, so the `com.amazonaws` imports resolve to
    Java classes).
    Bucket names are globally unique so different regions
    must use a prefix for the bucket name.
    Region and prefix are stored as custom session variables.
    Args:
        self: Reference to the object calling the function; must expose
            `session.custom.aws.region` and `session.custom.aws.prefix`.
        objectKey: key of the s3 object to generate the URL for.
    Returns:
        Presigned s3 Url to display the image in S3
        (presumably a java.net.URL from `generatePresignedUrl` — confirm against SDK).
    Raises:
        KeyError: None.
    """
    bucket_names = {"eu":"ignition-image-repo", "na":"ignition-image-repo-na",
    "jp":"jp-ignition-image-repo"}
    # aws = system.tag.readBlocking("Configuration/aws")[0].value
    # aws = system.util.jsonDecode(aws)
    # clientRegion = aws.get("region")
    # prefix = aws.get("prefix")
    clientRegion = self.session.custom.aws.region
    prefix = self.session.custom.aws.prefix
    # unknown/missing prefixes fall back to the EU bucket name
    bucketName = bucket_names.get(prefix, "ignition-image-repo")
    s3Client = AmazonS3ClientBuilder.standard().withRegion(clientRegion).build();
    generatePresignedUrlRequest = GeneratePresignedUrlRequest(bucketName, objectKey).withMethod(HttpMethod.GET);
    url = s3Client.generatePresignedUrl(generatePresignedUrlRequest);
    return url
# Default bucket names for the NA account (fallbacks when no stage-config lookup applies)
S3_REPO_BUCKET_NAME = 'ignition-image-repo-na'
S3_SOURCE_BUCKET_NAME = 'ignition-image-source-na'
# api stage config
API_STAGES = ['beta', 'prod']
API_REGIONS = ['na', 'eu']
# Per-stage, per-region deployment settings: API endpoint/region, backing Lambda,
# S3 buckets, owning account, and the IAM role assumed when calling the API.
STAGE_CONFIG = {
    'beta': {
        'na': {
            'region': 'us-east-1',
            'lambda_name': 'RMESDScadaS3ManagementFlaskLambda-beta',
            'endpoint': 'https://us-east-1.beta.scada-s3-management.scada.eurme.amazon.dev/',
            'repo_bucket': 'ignition-image-repo-na',
            'source_bucket': 'ignition-image-source-na',
            's3_region': 'us-east-1',
            'account_id': '006306898152',
            'api_call_role': 'arn:aws:iam::604741092380:role/RMESDScadaS3ManagementAPIcallRole-beta-us-east-1'
        },
        'eu': {
            'region': 'eu-west-2',
            'lambda_name': 'RMESDScadaS3ManagementFlaskLambda-beta',
            'endpoint': 'https://eu-west-2.beta.scada-s3-management.scada.eurme.amazon.dev/',
            'repo_bucket': 'ignition-image-repo',
            'source_bucket': 'ignition-image-source',
            # NOTE(review): api region is eu-west-2 but the buckets live in eu-west-1 —
            # looks intentional, confirm with infra config
            's3_region': 'eu-west-1',
            'account_id': '006306898152',
            'api_call_role': 'arn:aws:iam::604741092380:role/RMESDScadaS3ManagementAPIcallRole-beta-eu-west-2'
        }
    },
    'prod': {
        'na': {
            'region': 'us-east-2',
            'lambda_name': 'RMESDScadaS3ManagementFlaskLambda-prod',
            'endpoint': 'https://us-east-2.scada-s3-management.scada.eurme.amazon.dev/',
            'repo_bucket': 'ignition-image-repo-na',
            'source_bucket': 'ignition-image-source-na',
            's3_region': 'us-east-1',
            'account_id': '006306898152',
            'api_call_role': 'arn:aws:iam::609617486056:role/RMESDScadaS3ManagementAPIcallRole-prod-us-east-2'
        },
        'eu': {
            'region': 'eu-west-1',
            'lambda_name': 'RMESDScadaS3ManagementFlaskLambda-prod',
            'endpoint': 'https://eu-west-1.scada-s3-management.scada.eurme.amazon.dev/',
            'repo_bucket': 'ignition-image-repo',
            'source_bucket': 'ignition-image-source',
            's3_region': 'eu-west-1',
            'account_id': '006306898152',
            'api_call_role': 'arn:aws:iam::609617486056:role/RMESDScadaS3ManagementAPIcallRole-prod-eu-west-1'
        }
    }
}
# Maps each API operation name to its HTTP verb and the request params that
# `S3Manager._send` requires before dispatching.
# NOTE(review): the S3Manager methods `check_user_site_permissions`,
# `fetch_user_site_permissions_and_area_list` and `fetch_master_area_list` call
# operations that are NOT present in this map, so `_send` will raise
# InvalidOperationS3Manager for them — confirm whether entries are missing here.
OPERATION_MAP = {
    'download': {
        'method': 'GET',
        'reqd_args': ['bucket', 'obj_key']
    },
    'get_presigned_url': {
        'method': 'GET',
        'reqd_args': ['bucket', 'obj_key']
    },
    'list_objects': {
        'method': 'GET',
        'reqd_args': ['bucket']
    },
    'list_object_versions': {
        'method': 'GET',
        'reqd_args': ['bucket']
    },
    'list_object_delete_markers': {
        'method': 'GET',
        'reqd_args': ['bucket']
    },
    'delete': {
        'method': 'DELETE',
        'reqd_args': ['bucket', 'obj_key']
    },
    'upload': {
        'method': 'PUT',
        'reqd_args': ['bucket', 'obj_key', 'obj_data']
    },
    'add_new_site': {
        'method': 'PUT',
        'reqd_args': ['site', 'bucket']
    },
    'copy_single': {
        'method': 'POST',
        'reqd_args': ['source_bucket', 'dest_bucket', 'source_key', 'dest_key']
    },
    'fetch_site_list': {
        'method': 'GET',
        'reqd_args': ['bucket']
    },
    'fetch_object_list_by_site_and_bucket': {
        'method': 'GET',
        'reqd_args': ['site', 'bucket']
    },
    'fetch_upload_url': {
        'method': 'PUT',
        'reqd_args': ['bucket', 'obj_key', 'region', 'content_type']
    },
    'query_audit_table': {
        'method': 'POST',
        'reqd_args': []
    }
}
class S3Manager(object):
    """
    This class contains convenience methods for working with S3 objects from Ignition python 2.7.
    Requests to the SCADA S3-management API are signed with AWS SigV4 via `urllib2_aws4auth`.
    """
    def __init__(self, api_stage='prod', api_region_name='na', username='', profile_name=None):
        """
        Instantiates an S3 Class.
        :param api_stage: str; (default='prod') api target stage (and default S3 folder)
        :param api_region_name: str; (default='na') api target region (and account)
        :param username: str; ignition session username (from `session.props.auth.user.userName`)
        :param profile_name: str; optional local AWS profile to source credentials from;
            when omitted, the stage-config 'api_call_role' is assumed via STS (if defined)
        :return: None
        """
        self._logger = LOGGER
        # sanitize api stage and region values; bad inputs fall back to prod/na
        if api_stage not in API_STAGES:
            self._logger.info("`api_stage` must be one of: %s, received: %s" % (API_STAGES, api_stage))
            api_stage = 'prod'
        if api_region_name not in API_REGIONS:
            self._logger.info("`api_region_name` must be one of: %s, received: %s" % (API_REGIONS, api_region_name))
            api_region_name = 'na'
        self._api_stage = api_stage
        self._api_region_name = api_region_name
        # grab stage config for this instance from global object
        self._stage_config = STAGE_CONFIG.get(api_stage, STAGE_CONFIG['prod']).get(api_region_name, STAGE_CONFIG['prod']['na'])
        d = self._stage_config
        self._api_region = d.get('region', 'us-east-2')
        self._s3_region = d.get('s3_region', 'us-east-1')
        self._repo_bucket = d.get('repo_bucket', 'ignition-image-repo-na')
        self._source_bucket = d.get('source_bucket', 'ignition-image-source-na')
        self._lambda_name = d.get('lambda_name', 'RMESDScadaS3ManagementFlaskLambda-prod')
        self._account_id = d.get('account_id', '006306898152')
        self._endpoint = d.get('endpoint', 'https://us-east-2.scada-s3-management.scada.eurme.amazon.dev/')
        self._service = 'execute-api'
        # Resolve a single (access_key, secret_key, session_token) triple first,
        # then build the SigV4-signing opener exactly once (the original code
        # duplicated the aws_urlopen(...) construction in all three branches).
        if profile_name:
            self._creds = boto3.Session(profile_name=profile_name).get_credentials()
            access_key = self._creds.access_key
            secret_key = self._creds.secret_key
            session_token = self._creds.token
        else:
            # DEVNOTE: As the API has been segregated from the AWS account for the dev server, assume a dedicated role here
            sts_client = boto3.Session().client('sts')
            role_arn = d.get('api_call_role', None)
            if role_arn:
                response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName='ignition-s3-mgmt-client')
                creds = response['Credentials']
                access_key = creds['AccessKeyId']
                secret_key = creds['SecretAccessKey']
                session_token = creds['SessionToken']
            else:
                # use native boto3 creds if 'api_call_role' not defined in STAGE_CONFIG
                self._creds = boto3.Session(profile_name=profile_name).get_credentials()
                access_key = self._creds.access_key
                secret_key = self._creds.secret_key
                session_token = self._creds.token
        # Define an opener method. The opener will apply AWS Sigv4 signing to requests.
        # NOTE(review): verify=False disables TLS certificate verification —
        # confirm this is intentional for the internal endpoint.
        self._opener = aws_urlopen(
            access_key,
            secret_key,
            self._api_region,
            self._service,
            session_token=session_token,
            verify=False
        )
        self._headers = {'Content-type': 'application/json', 'X-Remote-User': username}
def _send(self, operation='download', params={}, print_resp=False, **kwargs):
"""
private method to compile and send the request to api endpoint
:param operation: str; api endpoint method for request (See `OPERATION_MAP` for options)
:param params: dict; dictionary of parameters to pass to request (See `OPERATION_MAP` for reqd args)
:param print_resp: bool; if True, the associated logger will receive a print statement of the raw response, pprint.format'd
:return resp: dict; response object from api
"""
l = self._logger
if operation not in OPERATION_MAP.keys():
msg = 'operation "%s" is not a valid S3Manager operation! Options: %s' % (operation, list(OPERATION_MAP.keys()))
l.error(msg)
raise InvalidOperationS3Manager(msg)
op_config = OPERATION_MAP[operation]
method = op_config['method']
reqd_args = op_config['reqd_args']
missing_args = [x for x in reqd_args if x not in params.keys()]
if len(missing_args):
msg = 'The following required args were not provided in params for "%s" operation: %s' % (operation, missing_args)
l.error(msg)
raise InvalidParametersS3Manager(msg)
if method in ('GET', 'DELETE'):
querystring = '?%s' % urlencode(params)
payload = None
url = self._endpoint + operation + querystring
else:
try:
payload = json.dumps(params)
l.debug('payload for %s operation successfully serialized' % operation)
except:
payload = urlencode(params)
l.debug('payload for %s operation not serialized using json.dumps(), instead used urlencode()' % operation)
url = self._endpoint + operation
# Create a request object
req = Request(url=url, method=method, headers=self._headers, data=payload)
# open the request and process the read
try:
# use self._opener to sign and send the prepared request
resp = self._opener(req)
data = json.loads(resp.read())
if print_resp:
l.info('Response data: %s' % pformat(sanitize_tree(data)))
return data
except HTTPError, e:
try:
body = json.loads(e.fp.read())
e_msg = body.get('message', e.reason)
msg = 'Error sending S3Manager request: %s. Message: %s' % (str(e), e_msg)
l.error(msg)
raise HTTPErrorS3Manager(e.code, e_msg)
except AttributeError, e2:
# failed to extract reason or code from urllib2.HTTPError for some reason
import traceback
msg = 'Failed to extract reason and/or error code from urllib2.HTTPError. Trace: %s' % traceback.format_exc()
l.error(msg)
msg = 'Error sending S3Manager request: %s' % (str(e))
l.error(msg)# raise HTTPErrorS3Manager(e.code, msg)
raise HTTPErrorS3Manager(400, msg)
def upload(self, obj_data, obj_key, bucket=None, content_type='', region=None, **kwargs):
"""
Method to upload a JSON object to S3. Converts S3 to a compressed binary parquet file, then writes
the file to S3.
:param obj_data: JSON data object to upload to S3
:param obj_key: Path and object name of the object to create in S3
:param bucket: S3 bucket to write data to.
:param content_type: str; 'application/json' for json files, 'image/svg+xml' for svg files
:param region: AWS region that hosts the target S3 bucket.
:return: Boto3 `put_object` response
"""
l = self._logger
if not bucket:
# if no bucket provided, use repo bucket name from stage config
bucket = self._repo_bucket
if not region:
# if no region provided, use region name from stage config
region = self._s3_region
l.info('Uploading %s dataset to bucket %s' % (obj_key, bucket))
l.debug('++ Storing data file in S3')
operation = 'upload'
# check the suffix of obj_key and auto-populate content_type accordingly
if obj_key.endswith('json'):
content_type = 'application/json'
elif obj_key.endswith('svg'):
content_type = 'image/svg+xml'
elif obj_key.endswith('drawio'):
content_type = 'binary/octet-stream'
try:
if isinstance(obj_data, dict):
# serialize the object to a JSON string
obj_data = json.dumps(obj_data)
msg = '++ Uploading. Successfully serialized (json dump) object data for %s' % obj_key
l.debug(msg)
else:
msg = 'Uploading. Type of incoming object data: %s' % type(obj_data)
l.debug(msg)
except:
import traceback
msg = '++ Uploading. Error trying to serialize (json dump) object data: %s' % traceback.format_exc()
l.error(msg)
return msg
# params = {
# 'bucket': bucket,
# 'obj_key': obj_key,
# 'obj_data': obj_data,
# 'content_type': content_type,
# 'region': region
# }
# try:
# resp = self._send(operation, params, print_resp=kwargs.get('print_resp', False))
# l.debug('** Uploading Complete. Successfully uploaded %s' % obj_key)
# return resp
# except HTTPErrorS3Manager, e:
# return {'code': e.code, 'message': e.message}
# DEVNOTE: As there is a 10mb limitation on payload size to API gateway calls, going to use the
# `fetch_upload_url` method to get a presigned upload link and upload via system.net.httpPut
# so the above code will be commented out to use the below code
params = {
'bucket': bucket,
'obj_key': obj_key,
'region': region,
'content_type': content_type
}
try:
upload_url = self.fetch_upload_url(**params)
l.debug('** Fetching Upload URL Complete for object key: %s' % obj_key)
except HTTPErrorS3Manager, e:
return {'code': e.code, 'message': e.message}
try:
# DEVNOTE: Test code below to upload to pre-signed S3 PUT url using urllib2_aws4auth module
# Create a request object using urllib2_aws4auth.Request and aws_urlopen methods
# see if this is limited like with the upload call to API gateway.
# system.net.httpPut call below is not limited
# Results: what works with `system.net.httpPut` fails with `urllib2_aws4auth` module (returns 400: BadRequest)
# if the file is > ~ 75 kb
# req = Request(url=upload_url, method='PUT', headers=self._headers, data=obj_data)
# resp = self._opener(req).read()
# msg = '** Successfully uploaded %s to %s bucket!\nResponse: %s' % (obj_key, bucket, pformat(resp))
resp = system.net.httpPut(upload_url, putData=obj_data, contentType=content_type)
msg = '** Successfully uploaded %s to %s bucket!' % (obj_key, bucket)
l.debug(msg)
return {'code': 200, 'message': msg}
except Exception, e:
msg = '++ Error uploading %s to %s bucket: %s' % (obj_key, bucket, str(e))
l.error(msg)
return {'code': 400, 'message': msg}
def fetch_upload_url(self, obj_key, bucket=None, region=None, expiration=3600, content_type="image/svg+xml", **kwargs):
"""
Retrieves a pre-signed URL for the obj key and bucket and the `put_object` client method.
Caller then uses pre-signed URL to upload the file to S3 directly.
:param obj_key: Path and object name of the object to create in S3
:param bucket: S3 bucket to write data to.
:param region: AWS region that hosts the target S3 bucket.
:param expiration: int; number of seconds until the link expires (default = 3600, 1 hour)
:param content_type: str; the content-type of the object (default = 'image/svg+xml')
:return: str; presigned URL as string.
"""
l = self._logger
if not bucket:
# if no bucket provided, use repo bucket name from stage config
bucket = self._repo_bucket
if not region:
# if no region provided, use region name from stage config
region = self._s3_region
l.info('Fetching upload pre-signed URL for %s object in %s bucket' % (obj_key, bucket))
operation = 'fetch_upload_url'
params = {
'bucket': bucket,
'obj_key': obj_key,
'expiration': expiration,
'region': region,
'content_type': content_type
}
try:
resp = self._send(operation, params, print_resp=kwargs.get('print_resp', False))
l.debug('** Fetching Upload URL Completed for %s' % obj_key)
return resp
except HTTPErrorS3Manager, e:
return {'code': e.code, 'message': e.message}
def add_new_site(self, site=None, bucket='both', **kwargs):
"""
Adds a new site folder to either repo, source, or both buckets
:param site: str; name of site/WHID. Must be 4 chars in format of "ABC1"
:param bucket: str; name of the bucket (S3_REPO_BUCKET_NAME, S3_SOURCE_BUCKET_NAME, or 'both') to add site folder to
if = 'both', then site folder will be added to both buckets
:return: dict; {'message': str} summarizing the folder add operation
"""
l = self._logger
l.info('Adding site %s folder' % (site))
operation = 'add_new_site'
params = {'site': site, 'bucket': bucket}
try:
resp = self._send(operation, params, print_resp=kwargs.get('print_resp', False))
l.debug('** Adding Site Complete. Successfully added %s to %s bucket(s)' % (site, bucket))
return resp
except HTTPErrorS3Manager, e:
return {'code': e.code, 'message': e.message}
def download(self, obj_key, bucket=None, region=None):
"""
Downloads a JSON object from S3. File is received as a compressed binary Parquet file
:param obj_key: Path and object name of the data stored in S3
:param bucket: Bucket the target object is stored in.
:param region: AWS Region of the target bucket.
:return: JSON data object generated from the Parquet file stored in S3
"""
l = self._logger
if not bucket:
# if no bucket provided, use repo bucket name from stage config
bucket = self._repo_bucket
if not region:
# if no region provided, use region name from stage config
region = self._s3_region
# - Only used for logging; extract dates and data source from the object key
obj_key_parts = obj_key.split('/')
l.info('-- Downloading %s object from bucket %s' % (obj_key, bucket))
operation = 'download'
params = {
'bucket': bucket,
'obj_key': obj_key,
'region': region
}
try:
resp = self._send(operation, params)
return resp
except HTTPErrorS3Manager, e:
return {'code': e.code, 'message': e.message}
def get_presigned_url(self, bucket=None, obj_key='', client_method='get_object', expiration=3600, region=None, content_type="text/plain"):
"""
Generate a presigned URL to object from S3.
Used primarily for retreiving image objects in Ignition
:param obj_key: str; uri of object to fetch
:param bucket_: str; bucket name where object resides
:param client_method: str; (default = 'get_object')
:param expiration: int; number of seconds until the link expires (default = 3600, 1 hour)
:param content_type: str; the content-type of the object (default = 'text/plain')
:return: str; presigned URL as string. If no client_method or error, returns None.
"""
l = self._logger
if not bucket:
# if no bucket provided, use repo bucket name from stage config
bucket = self._repo_bucket
if not region:
# if no region provided, use region name from stage config
region = self._s3_region
if not content_type:
msg = 'content_type cannot be null!'
l.error(msg)
raise InvalidParametersS3Manager(msg)
l.info('Fetching pre-signed url for %s from bucket %s' % (obj_key, bucket))
operation = 'get_presigned_url'
params = {
'bucket': bucket,
'obj_key': obj_key,
'client_method': client_method,
'expiration': expiration,
'content_type': content_type
}
try:
resp = self._send(operation, params)
return resp
except HTTPErrorS3Manager, e:
return {'code': e.code, 'message': e.message}
def delete(self, obj_key, bucket=None, region=None):
"""
Deletes a JSON object from S3. File is flagged for deletion in the S3 bucket
:param obj_key: Path and object name of the data stored in S3
:param bucket: Bucket the target object is stored in.
:param region: AWS Region of the target bucket.
:return: Boto3 `delete_object` response
"""
l = self._logger
if not bucket:
# if no bucket provided, use repo bucket name from stage config
bucket = self._repo_bucket
if not region:
# if no region provided, use region name from stage config
region = self._s3_region
l.info('Deleting %s object from bucket %s' % (obj_key, bucket))
operation = 'delete'
params = {
'bucket': bucket,
'obj_key': obj_key,
'region': region
}
try:
resp = self._send(operation, params)
l.debug('** Complete. Successfully deleted %s' % obj_key)
return resp
except HTTPErrorS3Manager, e:
return {'code': e.code, 'message': e.message}
def list_objects(self, bucket=None, prefix='', start_after='', region=None):
"""
Fetches a list of objects within a specified bucket, prefix, and starting point
:param bucket: str; Bucket target object is located
:param prefix: str; Limits the response to keys that begin with the specified prefix
:param start_after: str; StartAfter is where you want Amazon S3 to start listing from.
Amazon S3 starts listing after this specified key. StartAfter can be any key in the bucket.
:param region: Region of the target S3 Bucket
:return: Boto3 `list_objects_v2.Contents` response. This consists of the following keys per object returned:
{
'ETag': str; unique id,
'Key': str; path to object in bucket,
'LastModified': datetime.datetime(); time object last modified,
'Size': int; size in bytes of the object,
'StorageClass': str; type of storage used on the object
}
"""
l = self._logger
if not bucket:
# if no bucket provided, use repo bucket name from stage config
bucket = self._repo_bucket
if not region:
# if no region provided, use region name from stage config
region = self._s3_region
l.info('Fetching list of objects from bucket %s' % bucket)
operation = 'list_objects'
params = {
'bucket': bucket,
'prefix': prefix,
'start_after': start_after,
'region': region
}
try:
resp = self._send(operation, params)
return resp
except HTTPErrorS3Manager, e:
return {'code': e.code, 'message': e.message}
def list_object_versions(self, bucket=None, prefix='', region=None):
"""
Fetches a list of object versions within a specified bucket, prefix, and starting point
:param bucket: str; Bucket target object is located
:param prefix: str; Limits the response to keys that begin with the specified prefix
:param region: Region of the target S3 Bucket
:return: Boto3 `list_object_versions.Versions` response. This consists of the following keys per object returned:
{
'ETag': str; unique id,
'IsLatest': bool; only true for the current version,
'Key': str; path to object in bucket,
'LastModified': datetime.datetime(); time object last modified,
'Owner': {'DisplayName': str; name of owner/group, 'ID': str;,}
'Size': int; size in bytes of the object,
'StorageClass': str; type of storage used on the object,
'VersionId': str; ID of object version
}
"""
l = self._logger
if not bucket:
# if no bucket provided, use repo bucket name from stage config
bucket = self._repo_bucket
if not region:
# if no region provided, use region name from stage config
region = self._s3_region
l.info('Fetching list of object versions from bucket %s' % bucket)
operation = 'list_object_versions'
params = {
'bucket': bucket,
'prefix': prefix,
'region': region
}
try:
resp = self._send(operation, params)
return resp
except HTTPErrorS3Manager, e:
return {'code': e.code, 'message': e.message}
def list_object_delete_markers(self, bucket=None, prefix='', region=None):
"""
Fetches a list of object delete markers within a specified bucket, prefix, and starting point
:param bucket: str; Bucket target object is located
:param prefix: str; Limits the response to keys that begin with the specified prefix
:param region: Region of the target S3 Bucket
:return: Boto3 `list_object_versions.DeleteMarkers` response. This consists of the following keys per object returned:
{
'IsLatest': bool; only true for the current version,
'Key': str; path to object in bucket,
'LastModified': datetime.datetime(); time object last modified,
'Owner': {'DisplayName': str; name of owner/group, 'ID': str;,}
'VersionId': str; ID of object version
}
"""
l = self._logger
if not bucket:
# if no bucket provided, use repo bucket name from stage config
bucket = self._repo_bucket
if not region:
# if no region provided, use region name from stage config
region = self._s3_region
l.info('Fetching list of object delete markers from bucket %s' % bucket)
operation = 'list_object_delete_markers'
params = {
'bucket': bucket,
'prefix': prefix,
'region': region
}
try:
resp = self._send(operation, params)
return resp
except HTTPErrorS3Manager, e:
return {'code': e.code, 'message': e.message}
def copy_single(self, source_bucket=None, dest_bucket=None, source_key='', dest_key='', region=None):
"""
Method to copy a single object from source bucket|key to destination bucket|key.
:param source_bucket: str; Source bucket name to copy from
:param dest_bucket: str; Destination bucket name to copy to
:param source_key: str; Source object key name to copy
:param dest_key: str; Destination object key name to copy to
:param region: Region of the target S3 Bucket
:return: null or ClientError; returns null if successfully copied
"""
l = self._logger
if not source_bucket:
# if no source bucket provided, use repo bucket name from stage config
source_bucket = self._repo_bucket
if not dest_bucket:
# if no destination bucket provided, use repo bucket name from stage config
dest_bucket = self._repo_bucket
if not region:
# if no region provided, use region name from stage config
region = self._s3_region
l.info('Copying %s object from bucket %s to object %s in bucket %s' % (source_key, source_bucket, dest_key, dest_bucket))
l.debug('++ Copying data in S3')
operation = 'copy_single'
params = {
'source_bucket': source_bucket,
'dest_bucket': dest_bucket,
'source_key': source_key,
'dest_key': dest_key,
'region': region
}
try:
resp = self._send(operation, params)
l.debug('** Complete. Successfully copied object %s from bucket %s to object %s in bucket %s' %
(source_key, source_bucket, dest_key, dest_bucket))
return resp
except HTTPErrorS3Manager, e:
return {'code': e.code, 'message': e.message}
def fetch_site_list(self, bucket=None):
"""
This method will compile a list of all sites configured in the requested S3 bucket
:param bucket: str; the S3 bucket to fetch sites from. (Default = S3_REPO_BUCKET_NAME)
:return: list; array of whids present in the S3 bucket
"""
l = self._logger
if not bucket:
# if no bucket provided, use repo bucket name from stage config
bucket = self._repo_bucket
l.info('Requesting site list for bucket: %s' % bucket)
operation = 'fetch_site_list'
params = {
'bucket': bucket
}
try:
resp = self._send(operation, params)
l.debug('** Complete. Successfully returned %d sites for bucket %s' % (len(resp), bucket))
return resp
except HTTPErrorS3Manager, e:
return {'code': e.code, 'message': e.message}
def fetch_object_list_by_site_and_bucket(self, site='', bucket=None):
"""
This function fetches the list of file objects
from the S3 folder specified by the bucket and site args supplied.
:param site: str; whid name of the site to fetch from
:param bucket: str; name of the bucket where the files reside
:return: Dict[str, Any]; {'instance_configs': Dict[str,Any], 'flow_views': List[str]}
"""
l = self._logger
if not bucket:
# if no bucket provided, use repo bucket name from stage config
bucket = self._repo_bucket
l.info('Requesting object list for site %s in bucket: %s' % (site, bucket))
operation = 'fetch_object_list_by_site_and_bucket'
params = {
'site': site,
'bucket': bucket
}
try:
resp = self._send(operation, params)
l.debug('** Complete. Successfully returned object list for site %s on bucket %s' % (site, bucket))
return resp
except HTTPErrorS3Manager, e:
return {'code': e.code, 'message': e.message}
    def query_audit_table(self, start_time = None, end_time = None, operation = None, copy_option = None,
                          destination_bucket = None, destination_view = None, destination_object_key = None,
                          destination_site = None, destination_stage = None, destination_version_id = None,
                          error_message = None, error_occurred = None, expires = None,
                          source_bucket = None, source_view = None, source_object_key = None, source_site = None,
                          source_stage = None, source_version_id = None, timestamp = None, username = None,
                          return_items_only = True, **kwargs):
        """
        Query/scan the audit table and return records matching the supplied parameters
        :param start_time: Optional[Union[str,datetime]]; if provided, will define the beginning of the
            time range to filter on the `timestamp` column. `timestamp` column is a string in the format
            "%Y-%m-%d %H:%M:%S"
        :param end_time: Optional[Union[str,datetime]]; if provided, will define the beginning of the
            time range to filter on the `timestamp` column. `timestamp` column is a string in the format
            "%Y-%m-%d %H:%M:%S"
        :param operation: Optional[Union[str,List,Dict]]; match on operation column
        :param copy_option: Optional[Union[str,List,Dict]]; match on copy_option column ('svg', 'json', 'both')
        :param destination_bucket: Optional[Union[str,List,Dict]]; match on destination_bucket column
        :param destination_view: Optional[Union[str,List,Dict]]; match on destination_view column
        :param destination_object_key: Optional[Union[str,List,Dict]]; match on destination_object_key column
        :param destination_site: Optional[Union[str,List,Dict]]; match on destination_site column
        :param destination_stage: Optional[Union[str,List,Dict]]; match on destination_stage column
        :param destination_version_id: Optional[Union[str,List,Dict]]; match on destination_version_id column
        :param error_message: Optional[Union[str,List,Dict]]; match on error_message column
        :param error_occurred: Optional[Union[bool,List,Dict]]; match on error_error_occurred column
        :param expires: Optional[Union[str,List,Dict]]; match/filter on expires column
        :param source_bucket: Optional[Union[str,List,Dict]]; match on source_bucket column
        :param source_view: Optional[Union[str,List,Dict]]; match on source_view column
        :param source_object_key: Optional[Union[str,List,Dict]]; match on source_object_key column
        :param source_site: Optional[Union[str,List,Dict]]; match on source_site column
        :param source_stage: Optional[Union[str,List,Dict]]; match on source_stage column
        :param source_version_id: Optional[Union[str,List,Dict]]; match on source_version_id column
        :param timestamp: Optional[Union[str,List,Dict]]; match/filter on timestamp column
            (overridden by `start_time` and `end_time` args)
        :param username: Optional[Union[str,List,Dict]]; match on username column
        :param return_items_only: bool; if true, strip the `Items` from boto3 response,
            if false, return the entire response object
        :returns: List[Dict[str,Any]]; array of items that match the scan filters supplied
        """
        l = self._logger
        # build params to send to Lambda using `locals()`. I know it's frowned upon but I'm not trying to type all that!!
        # NOTE: this snapshot must run before any further locals are introduced.
        # `self`, `l` and `kwargs` are excluded, and any arg still at None/'' is dropped.
        params = {k:v for k,v in locals().items() if k not in ('self', 'l', 'kwargs') and v not in (None, '')}
        # override `operation` arg for pass to `_send` method, as the value to query is already packed in `params`
        operation = 'query_audit_table'
        try:
            resp = self._send(operation, params)
            l.debug('** Complete. Successfully queried audit table using supplied query params')
            return resp
        except HTTPErrorS3Manager, e:
            return {'code': e.code, 'message': e.message}
def check_user_site_permissions(self, whid = None, obj_key = None):
"""
Check if a given username has permissions to the site folder in the flow-view S3 bucket
:param whid: str; warehouse id/site name to check
:param obj_key: str; [OPTIONAL] if provided, will check user permissions to the object key, rather than the whid
:return: Dict[str,Any]; {
'code': int; 200 if the user has permissions, 403 if Forbidden to access
'message': str; explanation to display, if needed. Will include necessary group memberships missing if Forbidden
}
"""
l = self._logger
operation = 'check_user_site_permissions'
params = {'whid': whid, 'obj_key': obj_key}
try:
resp = self._send(operation, params)
l.debug('** Complete. Successfully checked user site permissions on backend')
return resp
except HTTPErrorS3Manager, e:
return {'code': e.code, 'message': e.message}
def fetch_user_site_permissions_and_area_list(self, username = None, stage_name = 'beta'):
"""
Fetch the sites for which the user has flow-view write permissions for the given stage.
Also fetches the list of "area" names that flow-views can be created for
:param username: str; user alias/id to fetch sites for
:param stage_name: str; stage folder of flow-view resources to check permissions on
:return: Dict[str,Any]; response object including a list of sites and area names.
{
"code": int; 200 if successful call, 4** if user not found,
"sites": List[str]; List of site names,
"areas": List[str]; List of valid flow-view area names
}
"""
l = self._logger
operation = 'fetch_user_site_permissions_and_area_list'
params = {'username': username, 'stage_name': stage_name}
try:
resp = self._send(operation, params)
l.debug('** Complete. Successfully fetched user site permissions and area list')
return resp
except HTTPErrorS3Manager, e:
return {'code': e.code, 'message': e.message}
def fetch_master_area_list(self):
"""
Download a master list of valid flow-view area names, stored in S3
:return: List[str]; returns list of area names
"""
l = self._logger
operation = 'fetch_master_area_list'
params = {}
try:
resp = self._send(operation, params)
l.debug('** Complete. Successfully fetched master area list')
return resp
except HTTPErrorS3Manager, e:
return {'code': e.code, 'message': e.message}
class InvalidOperationS3Manager(Exception):
    """
    Invalid operation requested for S3Manager class
    """
    def __init__(self, code=400, msg='Invalid operation requested for S3Manager class'):
        """
        :param code: int; HTTP-style status code (default 400)
        :param msg: str; human-readable description of the invalid operation
        """
        # BUGFIX: forward msg to Exception.__init__ so str(e)/logging show the
        # message (previously str(e) was an empty string)
        super(InvalidOperationS3Manager, self).__init__(msg)
        self.code = code
        self.message = msg
class InvalidParametersS3Manager(Exception):
    """
    Invalid parameters for S3Manager operation
    """
    def __init__(self, code=400, msg='Invalid parameters for S3Manager operation'):
        """
        :param code: int; HTTP-style status code (default 400)
        :param msg: str; human-readable description of the missing/invalid parameters
        """
        # BUGFIX: forward msg to Exception.__init__ so str(e)/logging show the
        # message (previously str(e) was an empty string)
        super(InvalidParametersS3Manager, self).__init__(msg)
        self.code = code
        self.message = msg
class HTTPErrorS3Manager(Exception):
    """
    HTTP Error for S3Manager Request
    """
    def __init__(self, code=500, msg='HTTP Error Encountered Sending S3Manager Request'):
        """
        :param code: int; HTTP status code from the failed api request (default 500)
        :param msg: str; error message extracted from the api response body
        """
        # BUGFIX: forward msg to Exception.__init__ so str(e)/logging show the
        # message (previously str(e) was an empty string)
        super(HTTPErrorS3Manager, self).__init__(msg)
        self.code = code
        self.message = msg