in processors/vertexai.py [0:0]
def process(self, output_var='vertexai'):
if 'location' not in self.config:
raise NotConfiguredException('No location specified specified.')
if 'mode' not in self.config:
raise NotConfiguredException('No mode specified.')
if 'request' not in self.config:
raise NotConfiguredException('No request specified.')
credentials, credentials_project_id = google.auth.default()
project = self.config[
'project'] if 'project' in self.config else credentials_project_id
if not project:
project = credentials.quota_project_id
mode = self._jinja_expand_string(self.config['mode'], 'mode')
location = self._jinja_expand_string(self.config['location'],
'location')
collection = self._jinja_expand_string(
self.config['collection'], 'collection'
) if 'collection' in self.config else 'default_collection'
engine_id = None
datastore_id = None
if 'engineId' in self.config:
engine_id = self._jinja_expand_string(self.config['engineId'],
'engine_id')
else:
datastore_id = self._jinja_expand_string(self.config['datastoreId'],
'datastore_id')
serving_config = self._jinja_expand_string(
self.config['servingConfig'], 'serving_config'
) if 'servingConfig' in self.config else 'default_config'
api_version = self._jinja_expand_string(
self.config['apiVersion']) if 'apiVersion' in self.config else 'v1'
method = self._jinja_expand_string(
self.config['method'],
'method') if 'method' in self.config else 'search'
if mode == 'search':
if engine_id:
api_path = 'https://%s-discoveryengine.googleapis.com/%s/projects/%s/locations/%s/collections/%s/engineId/%s/servingConfigs/%s:%s' % (
api_version, location, project, location, collection,
engine_id, serving_config, method)
else:
api_path = 'https://%s-discoveryengine.googleapis.com/%s/projects/%s/locations/%s/collections/%s/dataStores/%s/servingConfigs/%s:%s' % (
location, api_version, project, location, collection,
datastore_id, serving_config, method)
return_errors = False
if 'returnErrors' in self.config:
return_errors = self._jinja_expand_bool(
self.config['returnErrors'], 'return_errors')
request = self._jinja_expand_dict_all_expr(self.config['request'],
'request')
headers = {
'User-Agent': get_user_agent(),
'Content-type': 'application/json; charset=utf-8',
}
self.logger.debug('Calling Vertex AI %s:%s' % (mode, method),
extra={
'request_body': request,
'api_url': api_path
})
request_body = json.dumps(request)
authed_session = AuthorizedSession(credentials)
response = authed_session.post(api_path,
data=request_body,
headers=headers)
try:
response.raise_for_status()
except Exception as e:
if return_errors:
response_json = response.json()
for err in response_json:
if isinstance(
err, dict
) and 'error' in err and 'message' in err['error']:
return {
output_var: {
'error': err['error']['message']
}
}
self.logger.error('Error calling %s: %s' % (e.request.url, e),
extra={'response': e.response.text})
raise e
response_json = response.json()
return {
output_var: response_json,
}