# decisionai_plugin/common/tsanaclient.py
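# NOTE: this excerpt relies on names defined elsewhere in the module, e.g.
# `json`, `log`, `dt_to_str`, `get_time_list`, `Series`, and the endpoint
# constants (IS_INTERNAL, IS_MT, META_ENDPOINT, STORAGE_GW_*, USER_ADDR).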
def get_timeseries_gw(self, parameters, series_sets, start_time, end_time, top=20):
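    """Fetch time series data through the storage gateway.

    For each entry in series_sets, the matching series are enumerated page by
    page from the metadata service, then their points between start_time and
    end_time are queried from the storage gateway. Paging for a series set
    stops once `top` series have been collected, and the whole query is capped
    at 4,000,000 points. Returns a list of Series objects; raises if the time
    window is invalid or no data comes back.
    """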
    if start_time > end_time:
        raise Exception('start_time should be less than or equal to end_time')
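
    # Resolve the metadata and storage-gateway endpoints: internal deployments
    # use fixed endpoint patterns (the multi-tenant pattern embeds the instance
    # id), while external ones build both from the caller's apiEndpointV2.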
    instance_id = parameters[INSTANCE_ID_KEY]
    if IS_INTERNAL:
        if IS_MT:
            storage_gw_endpoint = STORAGE_GW_MT_ENDPOINT_PATTERN.replace(INSTANCE_ID_PLACEHOLDER, instance_id)
        else:
            storage_gw_endpoint = STORAGE_GW_ST_ENDPOINT_PATTERN
        meta_endpoint = META_ENDPOINT
    else:
        storage_gw_endpoint = parameters['apiEndpointV2'] + STORAGE_GW_API
        meta_endpoint = parameters['apiEndpointV2'] + META_API
    end_str = dt_to_str(end_time)
    start_str = dt_to_str(start_time)

    multi_series_data = []
    total_point_num = 0
    loop = 0
    # For each series set, enumerate the series that match its dimension filter
    for data in series_sets:
        dim = {}
        if 'dimensionFilter' not in data:
            data['dimensionFilter'] = data['filters']
        for dimkey in data['dimensionFilter']:
            dim[dimkey] = [data['dimensionFilter'][dimkey]]

        skip = 0
        count = 0
        para = dict(metricId=data['metricId'], dimensionFilter=dim, activeSince=start_str)
        gran_info = (data['metricMeta']['granularityName'], data['metricMeta']['granularityAmount'])
        data_point_num_per_series = len(get_time_list(start_time, end_time, gran_info))
        # Each storage call may return at most 100000 data points, so fetch at
        # most 100000 // points-per-series series per page, clamped to [1, 1000]
        series_limit_per_call = min(max(100000 // data_point_num_per_series, 1), 1000)
        while True:
            # Page through the metadata service for series matching the filter
            ret = self.post(meta_endpoint, instance_id, parameters['apiKey'], parameters['groupId'] + USER_ADDR,
                            '/metrics/' + data['metricId'] + '/series/query?$skip={}&$maxpagesize={}'.format(skip, series_limit_per_call),
                            data=para)
            if len(ret['value']) == 0:
                break
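            # Turn each returned series descriptor into a storage-gateway query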
            series_list = []
            for s in ret['value']:
                series_list.append({
                    'metricsName': s['metricId'],
                    'begin': start_str,
                    'end': end_str,
                    'tagSet': s['dimension'],
                    'returnSeriesId': True,
                })
            if len(series_list) > 0:
                # Fetch the data points for this page of series from the storage gateway
                ret_data = self.post(storage_gw_endpoint, instance_id, parameters['apiKey'],
                                     parameters['groupId'] + USER_ADDR, '/api/query_series', data=series_list)
                sub_multi_series_data = []
                for factor in ret_data:
                    if len(factor['values']) <= 0:
                        continue
                    sub_multi_series_data.append(Series(factor['name'], factor['seriesId'], factor['tags'],
                                                        factor['columns'], factor['values']))
                    total_point_num += len(factor['values'])
                    log.count("get_data_series_num", 1, endpoint=parameters['apiEndpoint'],
                              group_id=parameters['groupId'], group_name=parameters['groupName'].replace(' ', '_'),
                              instance_id=parameters['instance']['instanceId'],
                              instance_name=parameters['instance']['instanceName'].replace(' ', '_'))
                    log.count("get_data_point_num", len(factor['values']), endpoint=parameters['apiEndpoint'],
                              group_id=parameters['groupId'], group_name=parameters['groupName'].replace(' ', '_'),
                              instance_id=parameters['instance']['instanceId'],
                              instance_name=parameters['instance']['instanceName'].replace(' ', '_'))
                multi_series_data.extend(sub_multi_series_data)
                count += len(sub_multi_series_data)
                if count >= top:
                    log.warning(f"Current series num {count} for series set {json.dumps(data)} has reached limit {top}!")
                    break
            skip += len(series_list)
            loop += 1
            if loop % 10 == 0:
                log.info(f"Loop times: {loop}, total series num: {len(multi_series_data)}, total points num {total_point_num}.")
            # Cap the whole query at 4000000 data points (roughly 400 MB in memory)
            if total_point_num >= 4000000:
                log.info("Reached total point number limit 4000000.")
                break

    if not multi_series_data:
        raise Exception("Series is empty")

    log.info(f"Get time series completed! Total series num: {len(multi_series_data)}, total points num {total_point_num}.")
    return multi_series_data
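

# A minimal, illustrative call sketch (not part of the original file). The
# client class name, credential values, and metric/series-set contents below
# are hypothetical placeholders; only the dictionary keys mirror what
# get_timeseries_gw actually reads.
#
#   from datetime import datetime, timezone
#
#   client = TSANAClient()  # assumed client class defined in this module
#   parameters = {
#       INSTANCE_ID_KEY: '<instance-id>',
#       'apiEndpoint': 'https://contoso.example/tsana',
#       'apiEndpointV2': 'https://contoso.example/tsana/v2',
#       'apiKey': '<api-key>',
#       'groupId': '<group-id>',
#       'groupName': 'demo group',
#       'instance': {'instanceId': '<instance-id>', 'instanceName': 'demo instance'},
#   }
#   series_sets = [{
#       'metricId': '<metric-id>',
#       'dimensionFilter': {'region': 'us-west'},
#       'metricMeta': {'granularityName': 'Daily', 'granularityAmount': 0},
#   }]
#   series = client.get_timeseries_gw(
#       parameters, series_sets,
#       start_time=datetime(2021, 1, 1, tzinfo=timezone.utc),
#       end_time=datetime(2021, 1, 31, tzinfo=timezone.utc),
#       top=20)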