decisionai_plugin/common/tsanaclient.py (386 lines of code) (raw):
import json
import math
import datetime
import os
from .util.constant import STATUS_SUCCESS, STATUS_FAIL
from .util.constant import USER_ADDR
from .util.constant import INGESTION_API, META_API, TSG_API, STORAGE_GW_API
from .util.constant import INSTANCE_ID_KEY
from .util.constant import INSTANCE_ID_PLACEHOLDER
from .util.constant import META_ENDPOINT, TSG_ENDPOINT, INGESTION_ENDPOINT, STORAGE_GW_MT_ENDPOINT_PATTERN, STORAGE_GW_ST_ENDPOINT_PATTERN
from .util.constant import IS_INTERNAL, IS_MT
from .util.retryrequests import RetryRequests
from .util.series import Series
from .util.timeutil import get_time_offset, str_to_dt, dt_to_str, get_time_list
from telemetry import log
import pandas as pd
REQUEST_TIMEOUT_SECONDS = 120
def get_field_idx(fields, target):
for idx, field in enumerate(fields):
if field == '__FIELD__.' + target:
return idx
raise Exception("Not found field {} in {}".format(target, ','.join(fields)))
class TSANAClient(object):
def __init__(self, username=None, password=None, retrycount=3, retryinterval=1000):
self.username = username
self.password = password
self.retryrequests = RetryRequests(retrycount, retryinterval)
self.crt = os.environ['MA_CERT_CRT_PATH'] if 'MA_CERT_CRT_PATH' in os.environ else ''
self.key = os.environ['MA_CERT_KEY_PATH'] if 'MA_CERT_KEY_PATH' in os.environ else ''
def post(self, api_endpoint, instance_id, api_key, user, path, data):
if not api_endpoint.startswith('http'):
api_endpoint = "https://" + api_endpoint
url = api_endpoint.rstrip('/') + path
headers = {
"x-api-key": api_key,
"x-user": user,
"X-Base-Address": api_endpoint,
"Instance-Id": instance_id,
"X-Action-Source": "Plugin",
"Content-Type": "application/json"
}
if self.username and self.password:
auth = (self.username, self.password)
else:
auth = None
if self.crt and self.key:
cert = (self.crt, self.key)
else:
cert = None
try:
r = self.retryrequests.post(url=url, headers=headers, auth=auth, data=json.dumps(data),
timeout=REQUEST_TIMEOUT_SECONDS, cert=cert, verify=False)
if r.status_code != 204:
try:
return r.json()
except ValueError as e:
return "ValueError: " + str(e) + " Content: " + r.content.decode('UTF-8')
except Exception as e:
raise Exception('TSANA service api "{}" failed, request:{}, {}'.format(url, json.dumps(data), str(e)))
def put(self, api_endpoint, instance_id, api_key, user, path, data):
if not api_endpoint.startswith('http'):
api_endpoint = "https://" + api_endpoint
url = api_endpoint.rstrip('/') + path
headers = {
"x-api-key": api_key,
"x-user": user,
"X-Base-Address": api_endpoint,
"Instance-Id": instance_id,
"X-Action-Source": "Plugin",
"Content-Type": "application/json"
}
if self.username and self.password:
auth = (self.username, self.password)
else:
auth = None
if self.crt and self.key:
cert = (self.crt, self.key)
else:
cert = None
try:
r = self.retryrequests.put(url=url, headers=headers, auth=auth, data=json.dumps(data),
timeout=REQUEST_TIMEOUT_SECONDS, cert=cert, verify=False)
if r.status_code != 204:
try:
return r.json()
except ValueError:
return r.content
except Exception as e:
raise Exception('TSANA service api "{}" failed, request:{}, {}'.format(url, json.dumps(data), str(e)))
def get(self, api_endpoint, instance_id, api_key, user, path):
if not api_endpoint.startswith('http'):
api_endpoint = "https://" + api_endpoint
url = api_endpoint.rstrip('/') + path
headers = {
"x-api-key": api_key,
"x-user": user,
"X-Base-Address": api_endpoint,
"Instance-Id": instance_id,
"X-Action-Source": "Plugin",
"Content-Type": "application/json"
}
if self.username and self.password:
auth = (self.username, self.password)
else:
auth = None
if self.crt and self.key:
cert = (self.crt, self.key)
else:
cert = None
try:
r = self.retryrequests.get(url=url, headers=headers, auth=auth, timeout=REQUEST_TIMEOUT_SECONDS,
cert=cert, verify=False)
try:
return r.json()
except ValueError:
return r.content
except Exception as e:
raise Exception('TSANA service api "{}" failed, {}'.format(url, str(e)))
################ GATEWAY API ################
# Query time series from TSANA
# Parameters:
# parameters: a dict object which should includes
# apiEndpoint: api endpoint for specific user
# apiKey: api key for specific user
# groupId: groupId in TSANA, which is copied from inference request, or from the entity
# series_sets: Array of series set
# start_time: inclusive, the first timestamp to be query
# end_time: exclusive
# top: top sereis number per series set
# Return:
# An array of Series object
# Series include
# series_id: UUID
# dim: dimension dict for this series
# fields: 1-d string array, ['time', '__VAL__', '__FIELD__.ExpectedValue', '__FIELD__.IsAnomaly', '__FIELD__.PredictionValue', '__FIELD__.PredictionModelScore', '__FIELD__.IsSuppress', '__FIELD__.Period', '__FIELD__.CostPoint', '__FIELD__.Mean', '__FIELD__.STD', '__FIELD__.TrendChangeAnnotate', '__FIELD__.TrendChang...tateIgnore', '__FIELD__.AnomalyAnnotate', ...]
# value: 2-d array, [['2020-10-12T17:55:00Z', 1.0, None, None, None, None, None, None, None, None, None, None, None, None, ...]]
def get_timeseries_gw(self, parameters, series_sets, start_time, end_time, top=20):
if start_time > end_time:
raise Exception('start_time should be less than or equal to end_time')
instance_id = parameters[INSTANCE_ID_KEY]
if IS_INTERNAL:
if IS_MT:
storage_gw_endpoint = STORAGE_GW_MT_ENDPOINT_PATTERN
storage_gw_endpoint = storage_gw_endpoint.replace(INSTANCE_ID_PLACEHOLDER, instance_id)
else:
storage_gw_endpoint = STORAGE_GW_ST_ENDPOINT_PATTERN
meta_endpoint = META_ENDPOINT
else:
storage_gw_endpoint = parameters['apiEndpointV2'] + STORAGE_GW_API
meta_endpoint = parameters['apiEndpointV2'] + META_API
end_str = dt_to_str(end_time)
start_str = dt_to_str(start_time)
multi_series_data = []
total_point_num = 0
loop = 0
# Query each series's tag
for data in series_sets:
dim = {}
if 'dimensionFilter' not in data:
data['dimensionFilter'] = data['filters']
for dimkey in data['dimensionFilter']:
dim[dimkey] = [data['dimensionFilter'][dimkey]]
skip = 0
count = 0
para = dict(metricId=data['metricId'], dimensionFilter=dim, activeSince=start_str)
gran_info = (data['metricMeta']['granularityName'], data['metricMeta']['granularityAmount'])
data_point_num_per_series = len(get_time_list(start_time, end_time, gran_info))
series_limit_per_call = min(max(100000 // data_point_num_per_series, 1), 1000)
while True:
# Max data points per call is 100000
ret = self.post(meta_endpoint, instance_id, parameters['apiKey'], parameters['groupId'] + USER_ADDR, '/metrics/' + data['metricId'] + '/series/query?$skip={}&$maxpagesize={}'.format(skip, series_limit_per_call), data=para)
if len(ret['value']) == 0:
break
series_list = []
for s in ret['value']:
series = {}
series['metricsName'] = s['metricId']
series['begin'] = start_str
series['end'] = end_str
series['tagSet'] = s['dimension']
series['returnSeriesId'] = True
series_list.append(series)
if len(series_list) > 0:
ret_data = self.post(storage_gw_endpoint, instance_id, parameters['apiKey'], parameters['groupId'] + USER_ADDR, '/api/query_series', data=series_list)
sub_multi_series_data = []
for factor in ret_data:
if len(factor['values']) <= 0:
continue
sub_multi_series_data.append(Series(factor['name'], factor['seriesId'], factor['tags'], factor['columns'], factor['values']))
total_point_num += len(factor['values'])
log.count("get_data_series_num", 1, endpoint=parameters['apiEndpoint'], group_id=parameters['groupId'], group_name=parameters['groupName'].replace(' ', '_'), instance_id=parameters['instance']['instanceId'], instance_name=parameters['instance']['instanceName'].replace(' ', '_'))
log.count("get_data_point_num", len(factor['values']), endpoint=parameters['apiEndpoint'], group_id=parameters['groupId'], group_name=parameters['groupName'].replace(' ', '_'), instance_id=parameters['instance']['instanceId'], instance_name=parameters['instance']['instanceName'].replace(' ', '_'))
multi_series_data.extend(sub_multi_series_data)
count += len(sub_multi_series_data)
if count >= top:
log.warning(f"Current series num {count} for series set {json.dumps(data)} has reached limit {top}!")
break
skip = skip + len(series_list)
loop = loop + 1
if loop % 10 == 0:
log.info(f"Loop times: {loop}, total series num: {len(multi_series_data)}, total points num {total_point_num}.")
# Max data points limit is 4000000, about 400Mb
if total_point_num >= 4000000:
log.info("Reach total point number limit 4000000.")
break
if not len(multi_series_data):
raise Exception("Series is empty")
log.info(f"Get time series completed! Total series num: {len(multi_series_data)}, total points num {total_point_num}.")
return multi_series_data
################ META API ################
# To get the meta of a specific datafeed from TSANA
# Parameters:
# parameters: a dict object which should includes
# apiEndpoint: api endpoint for specific user
# apiKey: api key for specific user
# groupId: groupId in TSANA, which is copied from inference request, or from the entity
# datafeed_id: an UUID string
# Return:
# the meta of the specified datafeed, or None if there is something wrong.
def get_datafeed_meta(self, parameters, datafeed_id):
return self.get(META_ENDPOINT if IS_INTERNAL else parameters['apiEndpointV2'] + META_API, parameters[INSTANCE_ID_KEY], parameters['apiKey'], parameters['groupId'] + USER_ADDR, '/datafeeds/' + datafeed_id)
# To get the meta of a specific metric from TSANA
# Parameters:
# parameters: a dict object which should includes
# apiEndpoint: api endpoint for specific user
# apiKey: api key for specific user
# groupId: groupId in TSANA, which is copied from inference request, or from the entity
# metric_id: an UUID string
# Return:
# the meta of the specified metric, or None if there is something wrong.
def get_metric_meta(self, parameters, metric_id):
return self.get(META_ENDPOINT if IS_INTERNAL else parameters['apiEndpointV2'] + META_API, parameters[INSTANCE_ID_KEY], parameters['apiKey'], parameters['groupId'] + USER_ADDR, '/metrics/' + metric_id + '/meta')
# To get the dimension values of a specific dimension of a metric from TSANA
# Parameters:
# parameters: a dict object which should includes
# apiEndpoint: api endpoint for specific user
# apiKey: api key for specific user
# groupId: groupId in TSANA, which is copied from inference request, or from the entity
# metric_id: an UUID string
# dimension_name: dimension name for specific metric with metric_id
# Return:
# the dimension values of a specific dimension of a metric, or None if there is something wrong.
def get_dimesion_values(self, parameters, metric_id, dimension_name):
dims = self.get(META_ENDPOINT if IS_INTERNAL else parameters['apiEndpointV2'] + META_API, parameters[INSTANCE_ID_KEY], parameters['apiKey'], parameters['groupId'] + USER_ADDR, '/metrics/' + metric_id + '/dimensions')
if 'dimensions' in dims and dimension_name in dims['dimensions']:
return dims['dimensions'][dimension_name]
else:
return None
# Query time series from TSANA
# Parameters:
# parameters: a dict object which should includes
# apiEndpoint: api endpoint for specific user
# apiKey: api key for specific user
# groupId: groupId in TSANA, which is copied from inference request, or from the entity
# series_sets: Array of series set
# start_time: inclusive, the first timestamp to be query
# end_time: exclusive
# top: top sereis number per series set
# Return:
# An array of Series object
def get_timeseries(self, parameters, series_sets, start_time, end_time, top=20):
if start_time > end_time:
raise Exception('start_time should be less than or equal to end_time')
end_str = dt_to_str(end_time)
start_str = dt_to_str(start_time)
multi_series_data = []
total_point_num = 0
loop = 0
# Query each series's tag
for data in series_sets:
dim = {}
if 'dimensionFilter' not in data:
data['dimensionFilter'] = data['filters']
for dimkey in data['dimensionFilter']:
dim[dimkey] = [data['dimensionFilter'][dimkey]]
skip = 0
count = 0
para = dict(metricId=data['metricId'], dimensionFilter=dim, activeSince=start_str)
gran_info = (data['metricMeta']['granularityName'], data['metricMeta']['granularityAmount'])
data_point_num_per_series = len(get_time_list(start_time, end_time, gran_info))
series_limit_per_call = min(max(100000 // data_point_num_per_series, 1), 1000)
while True:
# Max data points per call is 100000
ret = self.post(META_ENDPOINT if IS_INTERNAL else parameters['apiEndpointV2'] + META_API, parameters[INSTANCE_ID_KEY], parameters['apiKey'], parameters['groupId'] + USER_ADDR, '/metrics/' + data['metricId'] + '/series/query?$skip={}&$maxpagesize={}'.format(skip, series_limit_per_call), data=para)
if len(ret['value']) == 0:
break
series_list = []
for s in ret['value']:
s['startTime'] = start_str
s['endTime'] = end_str
s['returnSeriesId'] = True
series_list.append(s)
if len(series_list) > 0:
ret_data = self.post(META_ENDPOINT if IS_INTERNAL else parameters['apiEndpointV2'] + META_API, parameters[INSTANCE_ID_KEY], parameters['apiKey'], parameters['groupId'] + USER_ADDR, '/metrics/series/data', data=dict(value=series_list))
sub_multi_series_data = []
for factor in ret_data['value']:
if len(factor['values']) <= 0:
continue
sub_multi_series_data.append(Series(factor['id']['metricId'], factor['id']['seriesId'], factor['id']['dimension'], factor['fields'], factor['values']))
total_point_num += len(factor['values'])
multi_series_data.extend(sub_multi_series_data)
count += len(sub_multi_series_data)
if count >= top:
log.warning(f"Current series num {count} for series set {json.dumps(data)} has reached limit {top}!")
break
skip = skip + len(series_list)
loop = loop + 1
if loop % 10 == 0:
log.info(f"Loop times: {loop}, total series num: {len(multi_series_data)}, total points num {total_point_num}.")
# Max data points limit is 4000000, about 400Mb
if total_point_num >= 4000000:
log.info("Reach total point number limit 4000000.")
break
if not len(multi_series_data):
raise Exception("Series is empty")
log.info(f"Get time series completed! Total series num: {len(multi_series_data)}, total points num {total_point_num}.")
return multi_series_data
# Get ranked dimensions
# Parameters:
# parameters: a dict object which should includes
# apiEndpoint: api endpoint for specific user
# apiKey: api key for specific user
# groupId: groupId in TSANA, which is copied from inference request, or from the entity
# metric_id: uuid for metric
# dimensions: included dimensions
# start_time: inclusive, the first timestamp to be query
# top: max count for returned results
# skip: offset
# Return:
# ranked series dimensions
def rank_series(self, parameters, metric_id, dimensions, start_time, top=10, skip=0):
url = f'/metrics/{metric_id}/rank-series'
para = dict(dimensions=dimensions, count=top, startTime=start_time, skip=skip)
return self.post(META_ENDPOINT if IS_INTERNAL else parameters['apiEndpointV2'] + META_API, parameters[INSTANCE_ID_KEY], parameters['apiKey'], parameters['groupId'] + USER_ADDR, url, data=para)
################ INGESTION API ################
# Query data from original data source for datafeed which does not need ingestion
# Parameters:
# parameters: a dict object which should includes
# apiEndpoint: api endpoint for specific user
# apiKey: api key for specific user
# groupId: groupId in TSANA, which is copied from inference request, or from the entity
# datafeed_id: datafeed id for source data
# start_time: inclusive, the first timestamp to be query
# end_time: exclusive
# Return:
# LoadSchemaResponse:
# SeriesNumber: long number of rows
# Schemas:columns, dict<string, string> which includes <column_name, column_type>
# PreviewData: list<dict<String, Object>> which includes <column_name, column_value>
# Err: error code
def get_source_data(self, parameters, datafeed_id, start_time, end_time):
if start_time > end_time:
raise Exception('start_time should be less than or equal to end_time')
body = {
"datafeedId": datafeed_id,
"startTime": dt_to_str(start_time),
"endTime": dt_to_str(end_time)
}
ret_data = self.post(INGESTION_ENDPOINT if IS_INTERNAL else parameters['apiEndpointV2'] + INGESTION_API, parameters[INSTANCE_ID_KEY], parameters['apiKey'], parameters['groupId'] + USER_ADDR, '/loadSourceData', body)
return ret_data
# Save a inference result back to TSANA
# Parameters:
# parameters: a dict object which should includes
# apiEndpoint: api endpoint for specific user
# apiKey: api key for specific user
# groupId: groupId in TSANA, which is copied from inference request, or from the entity
# metric_id: an UUID string
# dimensions: a dict includes dimension name and value
# timestamps: an array of timestamps
# values: an array of inference result values
# fields: an array of field names
# field_values: an 2-d array of field values corresponding with fields
# push_data_type: 'DatabaseOnly'/'AnomalyDetection', default is 'AnomalyDetection'
# Return:
# result: STATE_SUCCESS / STATE_FAIL
# message: description for the result
def save_data_points(self, parameters, metric_id, dimensions, timestamps, values, fields=None, field_values=None, push_data_type='AnomalyDetection'):
try:
if len(values) <= 0:
raise Exception('empty values')
body = {
"metricId": metric_id,
"dimensions": dimensions,
"timestamps": timestamps,
"values": values,
"pushDataType": push_data_type
}
if fields and len(fields) > 0 and field_values and len(field_values) > 0:
body['fields'] = fields
body['fieldValues'] = field_values
self.post(INGESTION_ENDPOINT if IS_INTERNAL else parameters['apiEndpointV2'] + INGESTION_API, parameters[INSTANCE_ID_KEY], parameters['apiKey'], parameters['groupId'] + USER_ADDR, '/pushData', body)
return STATUS_SUCCESS, ''
except Exception as e:
return STATUS_FAIL, str(e)
################ TSG API ################
# To get the detailed info of a specific group from TSANA
# Parameters:
# parameters: a dict object which should includes
# apiEndpoint: api endpoint for specific user
# apiKey: api key for specific user
# groupId: groupId in TSANA, which is copied from inference request, or from the entity
# Return:
# detailed info of the specified group
def get_group_detail(self, parameters):
return self.get(TSG_ENDPOINT if IS_INTERNAL else parameters['apiEndpointV2'] + TSG_API, parameters[INSTANCE_ID_KEY], parameters['apiKey'], parameters['groupId'] + USER_ADDR, '/timeSeriesGroups/' + parameters['groupId'])
# Save a training result back to TSANA
# Parameters:
# parameters: a dict object which should includes
# apiEndpoint: api endpoint for specific user
# apiKey: api key for specific user
# groupId: groupId in TSANA, which is copied from inference request, or from the entity
# instance: instance object, which is copied from the inference request, or from the entity
# model_id: model id
# model_state: model state(TRAINING,READY,FAILED,DELETED)
# message: detail message
# Return:
# result: STATE_SUCCESS / STATE_FAIL
# message: description for the result
def save_training_result(self, parameters, model_id, model_state:str, message:str):
try:
body = {
'modelId': model_id,
'state': model_state,
'message': message
}
self.put(TSG_ENDPOINT if IS_INTERNAL else parameters['apiEndpointV2'] + TSG_API, parameters[INSTANCE_ID_KEY], parameters['apiKey'], parameters['groupId'] + USER_ADDR, '/timeSeriesGroups/' + parameters['groupId'] + '/appInstances/' + parameters['instance']['instanceId'] + '/modelKey', body)
return STATUS_SUCCESS, ''
except Exception as e:
return STATUS_FAIL, str(e)
# Save a inference result back to TSANA
# Parameters:
# parameters: a dict object which should includes
# apiEndpoint: api endpoint for specific user
# apiKey: api key for specific user
# groupId: groupId in TSANA, which is copied from inference request, or from the entity
# instance: instance object, which is copied from the inference request, or from the entity
# result: an array of inference result.
# Return:
# result: STATE_SUCCESS / STATE_FAIL
# message: description for the result
def save_inference_result(self, parameters, result, batch_size=1000):
try:
if len(result) <= 0:
return STATUS_SUCCESS, ''
for batch_index in range(math.ceil(len(result) / batch_size)):
body = {
'groupId': parameters['groupId'],
'instanceId': parameters['instance']['instanceId'],
'results': []
}
batch_start = batch_index * batch_size
for step in range(min(batch_size, len(result) - batch_start)):
item = result[batch_start + step]
item['timestamp'] = dt_to_str(str_to_dt(item['timestamp']))
body['results'].append({
'params': parameters['instance']['params'],
'timestamp': item['timestamp'],
'result': item['value'],
'status': item['status']
})
self.post(TSG_ENDPOINT if IS_INTERNAL else parameters['apiEndpointV2'] + TSG_API, parameters[INSTANCE_ID_KEY], parameters['apiKey'], parameters['groupId'] + USER_ADDR, '/timeSeriesGroups/' + parameters['groupId'] + '/appInstances/' + parameters['instance']['instanceId'] + '/saveResult', body)
return STATUS_SUCCESS, ''
except Exception as e:
return STATUS_FAIL, str(e)
# Save training task status back to TSANA
# Parameters:
# parameters: a dict object which should includes
# apiEndpoint: api endpoint for specific user
# apiKey: api key for specific user
# groupId: groupId in TSANA, which is copied from inference request, or from the entity
# instance: instance object, which is copied from the inference request, or from the entity
# status: ModelState:Pending, Training, Ready, Deleted, Failed
# last_error: last error message
# Return:
# result: STATE_SUCCESS / STATE_FAIL
# message: description for the result
def save_training_status(self, task_id, parameters, status, last_error=None):
try:
context = {}
context['groupId'] = parameters['groupId']
context['groupName'] = parameters['groupName']
context['instanceId'] = parameters['instance']['instanceId']
context['instanceName'] = parameters['instance']['instanceName']
context['startTime'] = parameters['startTime']
context['endTime'] = parameters['endTime']
body = {
'taskId': task_id,
'operation': 'Train',
'context': context,
'status': status,
'lastError': str(last_error)[:1000] if last_error is not None else ''
}
self.post(TSG_ENDPOINT if IS_INTERNAL else parameters['apiEndpointV2'] + TSG_API, parameters[INSTANCE_ID_KEY], parameters['apiKey'], parameters['groupId'] + USER_ADDR, '/timeSeriesGroups/' + parameters['groupId'] + '/appInstances/' + parameters['instance']['instanceId'] + '/ops', body)
return STATUS_SUCCESS, ''
except Exception as e:
log.warning(f"Save training status failed. taskId: {task_id}, error: {str(e)}")
return STATUS_FAIL, str(e)
# Save inference task status back to TSANA
# Parameters:
# parameters: a dict object which should includes
# apiEndpoint: api endpoint for specific user
# apiKey: api key for specific user
# groupId: groupId in TSANA, which is copied from inference request, or from the entity
# instance: instance object, which is copied from the inference request, or from the entity
# status: InferenceState:Pending, Running, Ready, Failed
# last_error: last error message
# Return:
# result: STATE_SUCCESS / STATE_FAIL
# message: description for the result
def save_inference_status(self, task_id, parameters, status, last_error=None):
try:
if not parameters['manually']:
return STATUS_SUCCESS, ''
context = {}
context['groupId'] = parameters['groupId']
context['groupName'] = parameters['groupName']
context['instanceId'] = parameters['instance']['instanceId']
context['instanceName'] = parameters['instance']['instanceName']
context['startTime'] = parameters['startTime']
context['endTime'] = parameters['endTime']
body = {
'taskId': task_id,
'operation': 'Inference',
'context': context,
'status': status,
'lastError': str(last_error)[:1000] if last_error is not None else ''
}
self.post(TSG_ENDPOINT if IS_INTERNAL else parameters['apiEndpointV2'] + TSG_API, parameters[INSTANCE_ID_KEY], parameters['apiKey'], parameters['groupId'] + USER_ADDR, '/timeSeriesGroups/' + parameters['groupId'] + '/appInstances/' + parameters['instance']['instanceId'] + '/ops', body)
return STATUS_SUCCESS, ''
except Exception as e:
log.warning(f"Save inference status failed. taskId: {task_id}, error: {str(e)}")
return STATUS_FAIL, str(e)
# Get inference result from TSANA
# Parameters:
# parameters: a dict object which should includes
# apiEndpoint: api endpoint for specific user
# apiKey: api key for specific user
# groupId: groupId in TSANA, which is copied from inference request, or from the entity
# instance: instance object, which is copied from the inference request, or from the entity
# Return:
# result: STATE_SUCCESS / STATE_FAIL
# message: description for the result
# value: inference result array
def get_inference_result(self, parameters):
try:
ret = self.get(TSG_ENDPOINT if IS_INTERNAL else parameters['apiEndpointV2'] + TSG_API, parameters[INSTANCE_ID_KEY], parameters['apiKey'], parameters['groupId'] + USER_ADDR, '/timeSeriesGroups/'
+ parameters['groupId']
+ '/appInstances/'
+ parameters['instance']['instanceId']
+ '/history?startTime='
+ parameters['startTime']
+ '&endTime='
+ parameters['endTime'])
return STATUS_SUCCESS, '', ret
except Exception as e:
return STATUS_FAIL, str(e), None
# Push alert in general
# Parameters:
# parameters: a dict object which should includes
# apiEndpoint: api endpoint for specific user
# apiKey: api key for specific user
# groupId: groupId in TSANA, which is copied from inference request, or from the entity
# alert_type: alert type
# message: alert message
# Return:
# result: STATE_SUCCESS / STATE_FAIL
# message: description for the result
def push_alert(self, parameters, alert_type, message):
try:
url = '/timeSeriesGroups/alert'
para = dict(alertType=alert_type, message=message)
self.post(TSG_ENDPOINT if IS_INTERNAL else parameters['apiEndpointV2'] + TSG_API, parameters[INSTANCE_ID_KEY], parameters['apiKey'], parameters['groupId'] + USER_ADDR, url, data=para)
return STATUS_SUCCESS, ''
except Exception as e:
return STATUS_FAIL, repr(e)