in get-historical-metrics.py [0:0]
def GetHistoricalData():
    """Fetch today's historical queue metrics from Amazon Connect and store them.

    Reads the module-level ``DataSources`` mapping, whose values are
    colon-separated strings of the form ``<instance>:<queue>:<metric>``, and
    issues ``get_metric_data`` requests (one per instance, plus follow-up
    requests for additional result pages) covering the period from midnight
    up to the most recent five-minute boundary. Every metric value in the
    responses is handed to ``StoreMetric``.

    Side effects:
        * Any key of ``DataSources`` missing from ``Data`` is seeded with "0".
        * ``StoreMetric(instance, queue, metric_name, value)`` is called for
          each returned metric.

    API failures are logged and the affected instance is skipped; they are
    not raised to the caller. A malformed ``DataSources`` entry or a metric
    missing from ``MetricUnitMapping`` still raises, as before.
    """
    global Logger, Data, DataSources, MetricUnitMapping

    Connect = boto3.client("connect")

    #
    # Build a list of information we need from the API.
    #
    ConnectList = {}
    for Item in DataSources:
        if Item not in Data:
            Data[Item] = "0"
        (ConnectARN, QueueARN, Metric) = DataSources[Item].split(":")
        Mapping = MetricUnitMapping[Metric]
        ConnectList.setdefault(ConnectARN, {}).setdefault(QueueARN, []).append(
            {"Name": Metric, "Unit": Mapping[0], "Statistic": Mapping[1]}
        )

    #
    # Work out the reporting window from a single clock reading so that the
    # start and end can never straddle a minute/hour/day rollover, and zero
    # the microseconds so both ends sit exactly on a boundary.
    # NOTE(review): these are naive local datetimes; confirm how botocore
    # interprets them if the host is not running in UTC.
    #
    Now = datetime.datetime.now()
    StartTime = Now.replace(hour=0, minute=0, second=0, microsecond=0)
    EndTime = Now.replace(minute=Now.minute - Now.minute % 5, second=0, microsecond=0)
    if EndTime <= StartTime:
        # In the first five minutes after midnight the window is empty and
        # the API would reject the request, so there is nothing to fetch.
        Logger.debug("No completed five-minute interval yet today; skipping historical data")
        return

    #
    # Now call the API for each Connect instance we're interested in.
    #
    for Instance, Queues in ConnectList.items():
        Logger.debug("Retrieving historical data from "+Instance)

        # The same metric may be wanted for several queues; request it once.
        MetricsByName = {}
        for QueueMetrics in Queues.values():
            for Entry in QueueMetrics:
                MetricsByName.setdefault(Entry["Name"], Entry)

        Request = {
            "InstanceId": Instance,
            "StartTime": StartTime,
            "EndTime": EndTime,
            "Groupings": ["QUEUE"],
            "Filters": {"Queues": list(Queues)},
            "HistoricalMetrics": list(MetricsByName.values()),
        }

        # Follow NextToken so results beyond the first page are not dropped.
        while True:
            try:
                Response = Connect.get_metric_data(**Request)
            except Exception as e:
                Logger.error("Failed to get historical data: "+str(e))
                break

            for Collection in Response.get("MetricResults", []):
                QueueARN = Collection["Dimensions"]["Queue"]["Id"]
                for Metric in Collection["Collections"]:
                    MetricName = Metric["Metric"]["Name"]
                    MetricValue = Metric["Value"]
                    StoreMetric(Instance, QueueARN, MetricName, MetricValue)

            NextToken = Response.get("NextToken")
            if not NextToken:
                break
            Request["NextToken"] = NextToken