105 lines
4.2 KiB
Plaintext
105 lines
4.2 KiB
Plaintext
#############################################################################################
|
|
# Purpose: This package contains all the modules that get called by the symbol library #
|
|
# and edit item views. These modules MAKES calls to S3 under the SCADA account. #
|
|
# Some of these modules depend on the AWS > S3 manager. #
|
|
# Login: Date: #Comment: Version: #
|
|
# dmamani 1/4/23 Release to Production V1 #
|
|
# #
|
|
#############################################################################################
|
|
|
|
from AWS.s3 import S3Manager
|
|
from datetime import datetime
|
|
from pprint import pprint
|
|
import json
|
|
|
|
BUCKET_REGION = "us-east-1"
|
|
BUCKET_NAME = "map-ignition-parent-docs"
|
|
SYMBOL_LIBRARY_JSON = "symbol_library.json"
|
|
|
|
def fetch_library(backup_path=SYMBOL_LIBRARY_JSON, username=None):
|
|
# - Create a client to interact with AWS S3
|
|
s3 = S3Manager(username=username)
|
|
# - Fetch the object from S3. If backup path was provided, use that, otherwise fetch the primary file. Convert
|
|
# data response to a string
|
|
library_json = s3.download(backup_path or SYMBOL_LIBRARY_JSON, bucket=BUCKET_NAME, region=BUCKET_REGION)
|
|
return library_json
|
|
|
|
def list_backups(username=None):
|
|
# - Create a client to interact with AWS S3
|
|
s3 = S3Manager(username=username)
|
|
# - Fetch list of objects in S3 from the backups folder. List comp is used to convert response objects from
|
|
# aws to a simple list of S3 paths to each Backup object
|
|
backup_objects = []
|
|
|
|
for backup_object in s3.list_objects(bucket=BUCKET_NAME, region=BUCKET_REGION):
|
|
if backup_object['Key'] != SYMBOL_LIBRARY_JSON:
|
|
backup_objects.append(backup_object['Key'])
|
|
return sorted(backup_objects, reverse=True)
|
|
|
|
def write_library(library, backup=True, username=None):
|
|
# - Check type of object to write. Raise exception if it isnt a dictionary
|
|
if not isinstance(library, dict):
|
|
raise ValueError("'library' argument must be a dictionary of the entire symbol "
|
|
"library dataset, not %" % type(library))
|
|
# - Create a client to interact with AWS S3
|
|
library = json.dumps(library).encode()
|
|
s3 = S3Manager(username=username)
|
|
if backup:
|
|
resp = s3.upload(
|
|
library,
|
|
"%s--%s" % (datetime.utcnow().strftime('%m-%d-%y %H:%M:%S.%f'), username),
|
|
bucket=BUCKET_NAME,
|
|
region=BUCKET_REGION,
|
|
content_type='application/json'
|
|
)
|
|
resp = s3.upload(library, SYMBOL_LIBRARY_JSON, bucket=BUCKET_NAME, region=BUCKET_REGION, content_type='application/json')
|
|
return resp
|
|
|
|
def update_symbol_library(path, username=None, **value):
|
|
# - Fetch the most recent library from S3
|
|
current_lib = fetch_library(username=username)
|
|
# - Update the given path with the **value dictionary of key-value arguments
|
|
if path in current_lib:
|
|
current_lib[path].update(value)
|
|
else:
|
|
current_lib[path]=value
|
|
print(current_lib[path])
|
|
print(value)
|
|
# - Write the modified library to S3
|
|
resp = write_library(current_lib, username=username)
|
|
return resp
|
|
|
|
|
|
def delete_symbol(path, username=None):
|
|
current_lib = fetch_library(username=username)
|
|
try:
|
|
del current_lib[path]
|
|
return write_library(current_lib, username=username)
|
|
except KeyError:
|
|
raise Exception("path %s does not exist" % path)
|
|
|
|
def rollback(backup_path, username=None):
|
|
# - Fetch the backup object
|
|
library = fetch_library(backup_path, username=username)
|
|
# - Write it to the primary location. Disable backup to prevent duplicate backups
|
|
write_library(library, backup=False, username=username)
|
|
|
|
|
|
# - Gets List of Categories
|
|
def list_categories(library_items):
|
|
# - Example of Entry: !!! insert example here bro
|
|
categories = list(set([entry["category"] for entry in library_items.values() if entry["category"]]))
|
|
categories = sorted(categories)
|
|
categories.insert(0,"ALL")
|
|
return categories
|
|
|
|
# - Searches List of
|
|
def search_items(library_items, category):
|
|
if category == 'ALL' :
|
|
items = [name for name, entry in library_items.items()]
|
|
else:
|
|
items = [name for name, entry in library_items.items() if entry["category"] == category]
|
|
# - Obtains the last part of the path for example (
|
|
items.sort(key = lambda x : x.split("/")[len(x.split("/"))-1])
|
|
return items
|
|
|