def process()

in processors/vertexai.py [0:0]


    def process(self, output_var='vertexai'):
        if 'location' not in self.config:
            raise NotConfiguredException('No location specified specified.')
        if 'mode' not in self.config:
            raise NotConfiguredException('No mode specified.')
        if 'request' not in self.config:
            raise NotConfiguredException('No request specified.')
        credentials, credentials_project_id = google.auth.default()
        project = self.config[
            'project'] if 'project' in self.config else credentials_project_id
        if not project:
            project = credentials.quota_project_id

        mode = self._jinja_expand_string(self.config['mode'], 'mode')
        location = self._jinja_expand_string(self.config['location'],
                                             'location')
        collection = self._jinja_expand_string(
            self.config['collection'], 'collection'
        ) if 'collection' in self.config else 'default_collection'
        engine_id = None
        datastore_id = None
        if 'engineId' in self.config:
            engine_id = self._jinja_expand_string(self.config['engineId'],
                                                  'engine_id')
        else:
            datastore_id = self._jinja_expand_string(self.config['datastoreId'],
                                                     'datastore_id')
        serving_config = self._jinja_expand_string(
            self.config['servingConfig'], 'serving_config'
        ) if 'servingConfig' in self.config else 'default_config'

        api_version = self._jinja_expand_string(
            self.config['apiVersion']) if 'apiVersion' in self.config else 'v1'

        method = self._jinja_expand_string(
            self.config['method'],
            'method') if 'method' in self.config else 'search'
        if mode == 'search':
            if engine_id:
                api_path = 'https://%s-discoveryengine.googleapis.com/%s/projects/%s/locations/%s/collections/%s/engineId/%s/servingConfigs/%s:%s' % (
                    api_version, location, project, location, collection,
                    engine_id, serving_config, method)
            else:
                api_path = 'https://%s-discoveryengine.googleapis.com/%s/projects/%s/locations/%s/collections/%s/dataStores/%s/servingConfigs/%s:%s' % (
                    location, api_version, project, location, collection,
                    datastore_id, serving_config, method)

            return_errors = False
            if 'returnErrors' in self.config:
                return_errors = self._jinja_expand_bool(
                    self.config['returnErrors'], 'return_errors')

            request = self._jinja_expand_dict_all_expr(self.config['request'],
                                                       'request')
            headers = {
                'User-Agent': get_user_agent(),
                'Content-type': 'application/json; charset=utf-8',
            }

            self.logger.debug('Calling Vertex AI %s:%s' % (mode, method),
                              extra={
                                  'request_body': request,
                                  'api_url': api_path
                              })

            request_body = json.dumps(request)
            authed_session = AuthorizedSession(credentials)
            response = authed_session.post(api_path,
                                           data=request_body,
                                           headers=headers)
            try:
                response.raise_for_status()
            except Exception as e:
                if return_errors:
                    response_json = response.json()
                    for err in response_json:
                        if isinstance(
                                err, dict
                        ) and 'error' in err and 'message' in err['error']:
                            return {
                                output_var: {
                                    'error': err['error']['message']
                                }
                            }
                self.logger.error('Error calling %s: %s' % (e.request.url, e),
                                  extra={'response': e.response.text})
                raise e

            response_json = response.json()
        return {
            output_var: response_json,
        }