def get_timeseries_gw()

in decisionai_plugin/common/tsanaclient.py [0:0]


    def get_timeseries_gw(self, parameters, series_sets, start_time, end_time, top=20):
        """Fetch time series for each series set through the storage gateway.

        For every entry of ``series_sets`` the matching series are listed page
        by page from the meta endpoint, and the data points of each page are
        then requested from the storage gateway (``/api/query_series``) for
        the window between ``start_time`` and ``end_time``.

        Args:
            parameters: Request context. Reads ``parameters[INSTANCE_ID_KEY]``,
                ``apiKey`` and ``groupId``; ``apiEndpointV2`` when
                ``IS_INTERNAL`` is false; and, for telemetry labels,
                ``apiEndpoint``, ``groupName`` and ``instance``
                (``instanceId`` / ``instanceName``).
            series_sets: Iterable of dicts providing ``metricId``,
                ``metricMeta`` (``granularityName``, ``granularityAmount``)
                and either ``dimensionFilter`` or ``filters``. An entry
                without ``dimensionFilter`` is updated in place so that
                ``dimensionFilter`` refers to its ``filters`` value.
            start_time: Start of the query window; must compare
                ``<= end_time``.
            end_time: End of the query window.
            top: Soft cap on the number of non-empty series collected per
                series set. Paging stops once the cap is reached, but the
                page that crosses it is kept whole, so slightly more than
                ``top`` series may be returned for one set.

        Returns:
            A list of ``Series`` objects, one per returned series that has at
            least one data point.

        Raises:
            Exception: If ``start_time > end_time`` or if no non-empty series
                was found.
        """
        if start_time > end_time:
            raise Exception('start_time should be less than or equal to end_time')

        instance_id = parameters[INSTANCE_ID_KEY]
        if IS_INTERNAL:
            if IS_MT:
                # The multi-tenant endpoint embeds the instance id.
                storage_gw_endpoint = STORAGE_GW_MT_ENDPOINT_PATTERN.replace(INSTANCE_ID_PLACEHOLDER, instance_id)
            else:
                storage_gw_endpoint = STORAGE_GW_ST_ENDPOINT_PATTERN
            meta_endpoint = META_ENDPOINT
        else:
            storage_gw_endpoint = parameters['apiEndpointV2'] + STORAGE_GW_API
            meta_endpoint = parameters['apiEndpointV2'] + META_API

        end_str = dt_to_str(end_time)
        start_str = dt_to_str(start_time)

        multi_series_data = []
        total_point_num = 0
        loop = 0
        # Telemetry labels are identical for every series; they are built on
        # first use so a missing label key only matters once data is recorded.
        metric_labels = None

        # Query each series's tag
        for data in series_sets:
            if 'dimensionFilter' not in data:
                data['dimensionFilter'] = data['filters']

            # The series query takes each dimension value wrapped in a list.
            dim = {key: [value] for key, value in data['dimensionFilter'].items()}

            skip = 0
            count = 0
            para = dict(metricId=data['metricId'], dimensionFilter=dim, activeSince=start_str)
            gran_info = (data['metricMeta']['granularityName'], data['metricMeta']['granularityAmount'])
            data_point_num_per_series = len(get_time_list(start_time, end_time, gran_info))
            # Max data points per call is 100000, and at most 1000 series per
            # page. The divisor is clamped to 1 so that a window producing an
            # empty time list does not raise ZeroDivisionError.
            series_limit_per_call = min(max(100000 // max(data_point_num_per_series, 1), 1), 1000)

            api_key = parameters['apiKey']
            user = parameters['groupId'] + USER_ADDR

            while True:
                ret = self.post(meta_endpoint, instance_id, api_key, user, '/metrics/' + data['metricId'] + '/series/query?$skip={}&$maxpagesize={}'.format(skip, series_limit_per_call), data=para)
                page = ret['value']
                if len(page) == 0:
                    break

                series_list = [
                    {
                        'metricsName': s['metricId'],
                        'begin': start_str,
                        'end': end_str,
                        'tagSet': s['dimension'],
                        'returnSeriesId': True,
                    }
                    for s in page
                ]

                ret_data = self.post(storage_gw_endpoint, instance_id, api_key, user, '/api/query_series', data=series_list)
                sub_multi_series_data = []
                for factor in ret_data:
                    values = factor['values']
                    # Series without any data point are dropped.
                    if len(values) == 0:
                        continue

                    sub_multi_series_data.append(Series(factor['name'], factor['seriesId'], factor['tags'], factor['columns'], values))
                    total_point_num += len(values)

                    if metric_labels is None:
                        metric_labels = dict(
                            endpoint=parameters['apiEndpoint'],
                            group_id=parameters['groupId'],
                            group_name=parameters['groupName'].replace(' ', '_'),
                            instance_id=parameters['instance']['instanceId'],
                            instance_name=parameters['instance']['instanceName'].replace(' ', '_'),
                        )
                    log.count("get_data_series_num", 1, **metric_labels)
                    log.count("get_data_point_num", len(values), **metric_labels)

                multi_series_data.extend(sub_multi_series_data)
                count += len(sub_multi_series_data)

                if count >= top:
                    log.warning(f"Current series num {count} for series set {json.dumps(data)} has reached limit {top}!")
                    break

                skip += len(series_list)
                loop += 1
                if loop % 10 == 0:
                    log.info(f"Loop times: {loop}, total series num: {len(multi_series_data)}, total points num {total_point_num}.")

            # Max data points limit is 4000000, about 400Mb
            # NOTE(review): this is only checked between series sets, so a
            # single set can push the total past the limit before it applies.
            if total_point_num >= 4000000:
                log.info("Reach total point number limit 4000000.")
                break

        if not multi_series_data:
            raise Exception("Series is empty")

        log.info(f"Get time series completed! Total series num: {len(multi_series_data)}, total points num {total_point_num}.")
        return multi_series_data