TPA8/TPA8_SCADA/ignition/timer/ActiveAlarms/handleTimerEvent.py
2026-02-28 17:04:25 +04:00

258 lines
7.4 KiB
Python

def handleTimerEvent():
    """Refresh the Active Alarms dataset tag from the current alarm status.

    Gateway timer script (the original header states it runs every
    3 seconds).

    Steps:
      1. Read the minimum priority level from TAG_MIN_PRIO and clamp it to
         0..3; any read or conversion failure falls back to 0.
      2. Query ActiveUnacked / ActiveAcked alarms from PROVIDER, excluding
         shelved and system alarms.
      3. Look the alarm event ids up in the ``alarm_events`` table (rows
         with ``eventtype = 0``) to get the lowest row id and the earliest
         event time per event id.
      4. Skip alarms that have no DB match, whose source/display path
         contains "System Startup" or "System Shutdown", whose priority is
         below the minimum level, or (only when MAX_AGE_HOURS > 0) that are
         older than MAX_AGE_HOURS.
      5. Write the resulting dataset to TAG_TABLE.

    Output dataset columns:
      - NumberID: row id from ``alarm_events``
      - EventTimestamp: event time formatted as ``yyyy-MM-dd HH:mm:ss``
      - Duration: time since the event, formatted as HH:MM:SS
      - Priority: priority text (e.g. Diagnostic/Low/Medium/High/Critical)
      - Location: alarm property ``myLocation``
      - Description: display path (``_`` replaced by ``-``) plus the part
        of the source after ``:/alm:``
      - Tag: alarm property ``myTag``
      - Style: style class path chosen from the priority

    Returns:
        None. Python-level exceptions are logged to the
        "TPA8_ActiveAlarms" logger instead of being raised.
    """
    import system

    TAG_TABLE = "[TPA8_SCADA_TAG_PROVIDER]System/Queries/Alarms/ActiveAlarmsTable"
    PROVIDER = "TPA8_SCADA_TAG_PROVIDER"
    TAG_MIN_PRIO = "[TPA8_SCADA_TAG_PROVIDER]System/Queries/Alarms/Priority"
    DB = "MariaDB"
    logger = system.util.getLogger("TPA8_ActiveAlarms")
    # 0 disables the age filter; a positive value hides alarms older than that.
    MAX_AGE_HOURS = 0

    HEADERS = ["NumberID", "EventTimestamp", "Duration", "Priority",
               "Location", "Description", "Tag", "Style"]

    # Lower-cased priority text -> style class path.
    STYLE_BY_PRIORITY = {
        "high": "Alarms-Styles/High",
        "critical": "Alarms-Styles/High",
        "medium": "Alarms-Styles/Medium",
        "low": "Alarms-Styles/Low",
        "diagnostic": "Alarms-Styles/Diagnostic",
    }
    # Lower-cased priority text -> numeric level compared with the minimum.
    LEVEL_BY_PRIORITY = {
        "diagnostic": 0,
        "low": 1,
        "medium": 2,
        "high": 3,
        "critical": 3,
    }

    # NOTE: the bare ``except:`` clauses below are kept on purpose. They wrap
    # calls into the Ignition/Java API, and narrowing them could change which
    # failures are tolerated under Jython.

    def priority_text_and_style(p_raw):
        """Return (priority text, style class path) for a raw priority."""
        if p_raw is None:
            text = "Unknown"
        else:
            text = str(p_raw)
        return text, STYLE_BY_PRIORITY.get(text.lower(), "Alarms-Styles/NoAlarm")

    def priority_level(p_text):
        """Return the numeric level (0-3) for a priority text; unknown -> 0."""
        return LEVEL_BY_PRIORITY.get((p_text or "").lower(), 0)

    def custom_property(ev, name):
        """Return alarm property ``name`` as a string, "" when unavailable."""
        # NOTE(review): confirm ev.getOrElse accepts a plain string key; if it
        # raises, the value silently falls back to "".
        try:
            value = ev.getOrElse(name, "")
        except:
            value = ""
        if value is None:
            value = ""
        return str(value)

    def read_priority(ev):
        """Return the raw priority, trying getOrElse first, then getPriority."""
        try:
            p_raw = ev.getOrElse("priority", None)
        except:
            p_raw = None
        if p_raw is None:
            try:
                p_raw = ev.getPriority()
            except:
                p_raw = None
        return p_raw

    def format_duration(elapsed_ms):
        """Format a millisecond count as HH:MM:SS (hours are not capped)."""
        total_sec = elapsed_ms // 1000
        minutes, seconds = divmod(total_sec, 60)
        hours, minutes = divmod(minutes, 60)
        return "%02d:%02d:%02d" % (hours, minutes, seconds)

    def load_db_info(event_ids):
        """Map event id -> (db row id, event time) from ``alarm_events``.

        Returns an empty dict when there are no ids or when the query fails
        (the failure is logged).
        """
        if not event_ids:
            return {}
        try:
            params = list(event_ids)
            # One "?" per id so the values are bound, not string-concatenated.
            placeholders = ",".join(["?"] * len(params))
            sql_ids = """
                SELECT
                    eventid,
                    MIN(id) AS dbId,
                    MIN(eventtime) AS eventtime
                FROM alarm_events
                WHERE eventid IN (%s)
                  AND eventtype = 0
                GROUP BY eventid
            """ % placeholders
            found = {}
            for row in system.db.runPrepQuery(sql_ids, params, DB):
                try:
                    db_id = row["dbId"]
                    ev_time = row["eventtime"]
                    if db_id is not None and ev_time is not None:
                        found[str(row["eventid"])] = (db_id, ev_time)
                except:
                    # Best effort: an unreadable row just leaves that alarm
                    # without a DB match, so it is omitted from the table.
                    pass
            return found
        except Exception as db_err:
            logger.error("ActiveAlarms: DB id map error: %s" % db_err)
            return {}

    def build_row(ev, db_id, ev_time, min_level, now_ms, max_age_ms):
        """Return the dataset row for one alarm, or None if it is filtered."""
        source_obj = ev.getSource()
        display_obj = ev.getDisplayPath()
        src = str(source_obj) if source_obj is not None else ""
        dp = str(display_obj) if display_obj is not None else ""

        combined = src + " " + dp
        if "System Startup" in combined or "System Shutdown" in combined:
            return None

        location = custom_property(ev, "myLocation")
        tag_path = custom_property(ev, "myTag")

        priority_text, style_class = priority_text_and_style(read_priority(ev))
        if priority_level(priority_text) < min_level:
            return None

        try:
            # abs(): an event time ahead of "now" is shown as a positive age.
            age_ms = abs(now_ms - system.date.toMillis(ev_time))
        except:
            age_ms = 0
        if max_age_ms is not None and age_ms > max_age_ms:
            return None

        duration_str = format_duration(age_ms)
        event_ts_str = system.date.format(ev_time, "yyyy-MM-dd HH:mm:ss")

        if ":/alm:" in src:
            desc_suffix = src.split(":/alm:", 1)[1]
        else:
            desc_suffix = src
        description = (dp.replace("_", "-") + " " + desc_suffix).strip()

        try:
            num_id = int(db_id)
        except:
            return None

        return [num_id, event_ts_str, duration_str, priority_text,
                location, description, tag_path, style_class]

    try:
        try:
            min_prio_level = int(system.tag.readBlocking([TAG_MIN_PRIO])[0].value)
        except:
            min_prio_level = 0
        min_prio_level = max(0, min(3, min_prio_level))

        events = system.alarm.queryStatus(
            state=["ActiveUnacked", "ActiveAcked"],
            provider=[PROVIDER],
            includeShelved=False,
            includeSystem=False
        )
        if not events:
            # Nothing active: publish an empty table and stop, so the tag is
            # written only once on this tick.
            system.tag.writeBlocking(
                [TAG_TABLE], [system.dataset.toDataSet(HEADERS, [])])
            return

        now_ms = system.date.toMillis(system.date.now())
        if MAX_AGE_HOURS > 0:
            max_age_ms = MAX_AGE_HOURS * 60 * 60 * 1000
        else:
            max_age_ms = None

        # Resolve each event id once; events without a usable id are dropped.
        keyed_events = []
        for ev in events:
            try:
                eid = str(ev.id)
            except:
                eid = None
            if eid:
                keyed_events.append((eid, ev))

        id_map = load_db_info(set(eid for eid, _ in keyed_events))

        rows_table = []
        for eid, ev in keyed_events:
            info = id_map.get(eid)
            if not info:
                continue
            try:
                row = build_row(ev, info[0], info[1],
                                min_prio_level, now_ms, max_age_ms)
            except Exception as row_err:
                logger.error("ActiveAlarms row error for %s: %s" % (eid, row_err))
                continue
            if row is not None:
                rows_table.append(row)

        system.tag.writeBlocking(
            [TAG_TABLE], [system.dataset.toDataSet(HEADERS, rows_table)])
    except Exception as e:
        # NOTE(review): under Jython, Java exceptions thrown by system.* calls
        # may not be caught by ``except Exception`` - confirm whether
        # java.lang.Throwable should be handled here as well.
        logger.error("ActiveAlarms update failed: %s" % e)