# Source listing metadata (not code): 284 lines, 10 KiB, plaintext.
class helper:
    """
    Static utility methods for moving data between Ignition datasets and the
    list/dict structures used by Perspective component properties.

    `system` (used as `system.dataset.*`) is not imported in this file; it is
    expected to be provided by the scripting environment the class runs in.
    """

    # The text type is `unicode` on Jython / Python 2 and `str` on Python 3.
    # Resolve it once so the methods below behave the same on both.
    try:
        _TEXT_TYPE = unicode  # noqa: F821 - only defined on Jython / Python 2
    except NameError:
        _TEXT_TYPE = str
    _STRING_TYPES = (str, bytes, _TEXT_TYPE)

    @staticmethod
    def list_of_dict_to_dataset(dict_data, dict_column):
        """
        Arguments:
            dict_data: non-empty list of dicts, one dict per row (field -> value)
            dict_column: non-empty list of column config dicts, each with a 'field' key
        Returns:
            {'basic_dataset': dataset} where the dataset columns are the 'field'
            values of dict_column that are also keys of the FIRST row of
            dict_data (in dict_column order). A row that lacks one of those
            keys gets '' in that cell.
        Raises:
            TypeError: either list is empty, or the first item of either list
                       is not a dict (only the first items are checked)
        Usage:
            from helper.helper import list_of_dict_to_dataset
            ds = list_of_dict_to_dataset(myTable.props.data, myTable.props.columns)['basic_dataset']
        """
        if len(dict_data) == 0 or len(dict_column) == 0:
            raise TypeError('List','List provided is empty')
        if not (isinstance(dict_data[0], dict) and isinstance(dict_column[0], dict)):
            raise TypeError('Dictionary','The items in lists are not Dictionaries.')

        # Only fields that really exist in the data become dataset columns.
        first_row = dict_data[0]
        column_header = [col['field'] for col in dict_column if col['field'] in first_row]

        # One list per row, ordered like column_header; '' fills missing keys.
        rows = [[di.get(column, '') for column in column_header] for di in dict_data]

        dataset = system.dataset.toDataSet(column_header, rows)
        return {'basic_dataset': dataset}

    @staticmethod
    def sanitize_tree(element):
        """
        Arguments:
            element: array/list or dict to be sanitized (remove Java "object wrappers")
        Returns:
            element: sanitized input array/list/dict
        Usage:
            from helper.helper import sanitize_tree
            sanitizedTableData = sanitize_tree(myTable.props.data)

        ## This function loops recursively over a component property and converts all
        ## siblings and children to base elements:
        ##   - anything with `keys` becomes a plain dict
        ##   - any other iterable becomes a plain list
        ##   - keys and values are passed through string_decode
        ##   - everything else (including strings) is returned unchanged
        """
        # Strings are leaves. The explicit check matters on interpreters where
        # str is iterable: iterating a 1-char string yields itself forever.
        if isinstance(element, helper._STRING_TYPES):
            return element
        if not hasattr(element, '__iter__'):
            return element
        if hasattr(element, 'keys'):
            return dict(
                (helper.string_decode(k), helper.sanitize_tree(helper.string_decode(element[k])))
                for k in element.keys()
            )
        return [helper.sanitize_tree(helper.string_decode(x)) for x in element]

    @staticmethod
    def dataset_to_dict(dataset):
        """
        Arguments:
            dataset: a BasicDataset (checked by comparing str(type(dataset)))
        Returns:
            {'data': [...], 'column': [...]}
              data:   one dict per dataset row, keyed by cleaned column name
              column: one column config dict per column ('field' = cleaned name)
            Cleaned column names have spaces and underscores removed.
        Raises:
            TypeError: the input is not reported as a BasicDataset
        Usage:
            from helper.helper import dataset_to_dict
            result = dataset_to_dict(myDataset)
            myTable.props.data = result['data']
            myTable.props.columns = result['column']
        """
        # verify basic dataset was passed in
        string_type = str(type(dataset))
        if string_type != "<type 'com.inductiveautomation.ignition.common.BasicDataset'>":
            raise TypeError('Dataset','expected BasicDataset type received %s'%(string_type))

        py_dataset = system.dataset.toPyDataSet(dataset)
        headers = [str(col).replace(' ', '').replace('_', '') for col in py_dataset.getColumnNames()]

        # column configuration, one entry per column
        col_data = [
            {
                'field': col,
                'visible': True,
                'editable': False,
                'render': 'auto',
                'justify': 'auto',
                'align': 'center',
                'resizable': True,
                'sortable': True,
                'sort': 'none',
                'boolean': 'checkbox',
                'number': 'value',
                'numberFormat': '0,0.##',
                'dateFormat': 'MM/DD/YYYY',
            }
            for col in headers
        ]

        # row data, one dict per row keyed by the cleaned header names
        data_data = [dict(zip(headers, row)) for row in py_dataset]

        return {'data': data_data, 'column': col_data}

    @staticmethod
    def xyChartTransform(ds=None, columns=None):
        """
        ## This function is for use with the XY chart in Perspective
        ## This component requires an array of dictionaries for the chart data structure, rather than an Ignition dataset
        ## Each dict item should have the format of: {'Column_X': valueX, 'Column_Y': valueY}

        Arguments:
            ds: Ignition dataset, such as that returned by a SQL query binding
            columns: list of column names to include in each dict.
                     NOTE: the first column name listed will be the X axis.
                     If omitted, all column names are taken from the dataset.
        Returns:
            list of dicts, one per dataset row. Empty list when ds is None.
            On any error the rows built so far are returned (possibly none).
        """
        rows = []
        if ds is None:
            return rows  ## return empty array if no dataset passed in
        try:
            data = system.dataset.toPyDataSet(ds)  ## convert to python dataset
            if columns is None:
                columns = system.dataset.getColumnHeaders(ds)  ## extract column names from dataset
            for row in data:
                ## a row is only appended once every requested column was read
                rows.append(dict((c, row[c]) for c in columns))
        except:
            # Best-effort by design: return what was built so far.
            # NOTE(review): the bare except is kept on purpose; under Jython,
            # Java exceptions may not derive from Python's Exception, so
            # confirm before narrowing.
            pass
        return rows

    @staticmethod
    def string_decode(v):
        """
        Arguments:
            v: value to be decoded (ignore if not unicode string)
        Returns:
            v: decoded value
        Usage:
            from helper.helper import string_decode
            my_unicode_string = (u'hello world')
            decoded_string = string_decode(my_unicode_string)

        ## This function is a helper for the sanitize_tree function, normally.
        ## Non-text values are returned untouched. Text values have en/em
        ## dashes replaced by '-' and are then converted with str(). If that
        ## fails, progressively lossier fallbacks are tried (utf-8 bytes,
        ## ascii with unencodable characters dropped, repr). Each failure is
        ## logged as a warning through loggerConfig.getLogger, which is only
        ## imported once the first conversion has failed.
        """
        if not isinstance(v, helper._TEXT_TYPE):
            return v

        # replace any nasty em- or en-dashes with a more sane '-' short dash
        # this should avoid most of the fallback attempts below
        for dash in (u'\u2013', u'\u2014'):
            if dash in v:
                v = v.replace(dash, '-')

        try:
            return str(v)
        except:
            import traceback
            from loggerConfig import getLogger
            logger = getLogger('sanitize_tree_string_decode')
            logger.warn(traceback.format_exc())

        try:
            return str(v.encode('utf-8'))
        except:
            logger.warn(traceback.format_exc())

        try:
            # keep the encoded result; dropping it would return v unchanged
            return v.encode('ascii', 'ignore')
        except:
            logger.warn(traceback.format_exc())

        try:
            return repr(v)
        except:
            logger.warn(traceback.format_exc())
        return v

    @staticmethod
    def centerJustifyTableColumns(columns=None):
        """
        ## This method takes a table column config object (array of dicts) and returns it with all columns and their headers set to center justified

        Arguments:
            columns: [array of dict] table column configuration
        Returns:
            columns: [array of dict] the SAME object, modified in place, with
                     all columns/headers center justified. A new empty list
                     when columns is omitted or None.
        Usage:
            from helper.helper import centerJustifyTableColumns
            myTable.props.columns = centerJustifyTableColumns(myTable.props.columns)
        """
        if columns is None:
            # fresh list per call; a shared default list would leak between callers
            return []
        try:
            for column in columns:
                try:
                    column['justify'] = 'center'
                    # a column without a 'header' entry keeps only the line above
                    column['header']['justify'] = 'center'
                except:
                    continue
        except:
            # best-effort: a non-iterable input is handed back unchanged
            pass
        return columns

    @staticmethod
    def _column_error(column, headers, arg_name):
        """
        Return an error message when `column` is not a usable column name or
        index for `headers`, otherwise None. `arg_name` is the caller's
        argument name, used only in the message text.
        """
        if isinstance(column, helper._STRING_TYPES):
            if column not in headers:
                return '%s (%s) not in dataset column headers!' % (arg_name, column)
        elif isinstance(column, int):
            if not 0 <= column < len(headers):
                return '%s index (%d) not valid!' % (arg_name, column)
        else:  # if not string or integer, invalid type
            return 'invalid type for %s (%s). Must be integer or string' % (arg_name, type(column))
        return None

    @staticmethod
    def get_dropdown_options_from_dataset(ds=None, value_column=None, label_column=None):
        """
        This method takes an ignition dataset object, and a name or index for "value" and "label" columns
        and returns an array of dict objects for each row to use in the binding of perspective dropdown "options" prop.

        Arguments:
            ds: Ignition dataset object, ie from named SQL query
            value_column: [string or integer] if string, the name of the column to represent the value when option is selected.
                          if integer, the column index for the value
            label_column: [string or integer] if string, the name of the column to represent the label (display) for option selection.
                          if integer, the column index for the label
        Returns: [{
            value: value to be assigned to dropdown "value" property when options selection
            label: value to be displayed in the dropdown for each option
        }]
            On invalid input a dict {'error': message} is returned instead of
            the list; nothing is raised for the validation failures below.
        Usage:
            # in a script transform on the dropdown.props.options binding
            from helper.helper import get_dropdown_options_from_dataset
            options = get_dropdown_options_from_dataset(
                ds=value, # from property/query binding above this transform
                value_column='my_value_column_name',
                label_column='my_label_column_name'
            )
            return options
        """
        # Check required arguments for null values
        if ds is None:
            return {'error': 'No dataset passed in'}
        if value_column is None:
            return {'error': 'No value_column name or index passed in'}
        if label_column is None:
            return {'error': 'No label_column name or index passed in'}

        # convert the ignition dataset to python dataset to iterate over
        try:
            data = system.dataset.toPyDataSet(ds)
        except:
            import traceback
            msg = 'Error converting dataset to python data: %s' % traceback.format_exc()
            return {'error': msg}

        # grab the column headers from input dataset
        headers = system.dataset.getColumnHeaders(ds)

        # Names must be present in the headers and indexes must be in range.
        # Both str and unicode names are accepted (value column checked first).
        for column, arg_name in ((value_column, 'value_column'), (label_column, 'label_column')):
            msg = helper._column_error(column, headers, arg_name)
            if msg is not None:
                return {'error': msg}

        # if passed all verification checks, build array of objects representing label/value pairs for each dataset row
        return [{'value': row[value_column], 'label': row[label_column]} for row in data]

    @staticmethod
    def keys_exists(element, *keys):
        '''
        Check if *keys (nested) exists in `element` (dict).

        Arguments:
            element: dict to look into
            *keys: one or more keys, applied in order (element[k1][k2]...)
        Returns:
            True when every lookup succeeds. False when a key/index is
            missing or an intermediate value cannot be indexed with the key.
        Raises:
            AttributeError: element is not a dict, or no keys were given
        '''
        if not isinstance(element, dict):
            raise AttributeError('keys_exists() expects dict as first argument.')
        if len(keys) == 0:
            raise AttributeError('keys_exists() expects at least two arguments, one given.')

        _element = element
        for key in keys:
            try:
                _element = _element[key]
            except (KeyError, IndexError, TypeError):
                # missing key, out-of-range index, or a non-indexable value
                return False
        return True