in APIs/1.0/base-py/ai4e_service.py [0:0]
def api_func(self, is_async, api_path, methods, request_processing_function, maximum_concurrent_requests, content_types = None, content_max_length = None, trace_name = None, *args, **kwargs):
def decorator_api_func(func):
if not api_path in self.func_properties:
self.func_properties[api_path] = {MAX_REQUESTS_KEY_NAME: maximum_concurrent_requests, CONTENT_TYPE_KEY_NAME: content_types, CONTENT_MAX_KEY_NAME: content_max_length}
self.func_request_counts[api_path] = 0
@wraps(func)
def api(*args, **kwargs):
internal_args = {"func": func, "api_path": api_path}
if request_processing_function:
return_values = request_processing_function(request)
combined_kwargs = {**internal_args, **kwargs, **return_values}
else:
combined_kwargs = {**internal_args, **kwargs}
if is_async:
task_info = self.api_task_manager.AddTask(request)
taskId = str(task_info['TaskId'])
combined_kwargs["taskId"] = taskId
self.wrap_async_endpoint(trace_name, *args, **combined_kwargs)
return 'TaskId: ' + taskId
else:
return self.wrap_sync_endpoint(trace_name, *args, **combined_kwargs)
api.__name__ = 'api_' + api_path.replace('/', '')
print("Adding url rule: " + self.api_prefix + api_path + ", " + api.__name__)
self.app.add_url_rule(self.api_prefix + api_path, view_func = api, methods=methods, provide_automatic_options=True)
return decorator_api_func