runnable-hub/python/runnable_workers/apiWorker/worker.py (42 lines of code) (raw):
import json
from runnable_hub import RunnableWorker, RunnableContext, RunnableStatus, RunnableOutputLoads
from .request.apiRequest import ApiRequest, ApiHttpMethod
from .response import ApiResponse
import aiohttp
class Worker(RunnableWorker):
    """Runnable worker that performs the HTTP call described by an ApiRequest.

    GET and POST are supported. The response body, its decoded form and the
    HTTP status code are stored on the context as an ApiResponse.
    """

    runnableCode = "API"
    Request = ApiRequest
    Response = ApiResponse

    def __init__(self):
        # No worker-level state: every call opens its own ClientSession.
        pass

    @staticmethod
    def outputs(raw, loadType: RunnableOutputLoads):
        """Decode a raw response body according to ``loadType``.

        Args:
            raw: Response body text.
            loadType: Requested decoding. ``RunnableOutputLoads.JSON`` parses
                the text with ``json.loads``; any other value returns ``raw``
                unchanged.

        Returns:
            The parsed JSON value, or ``raw`` itself.

        Raises:
            json.JSONDecodeError: If JSON decoding is requested and ``raw`` is
                not valid JSON (this includes an empty body).
        """
        if loadType == RunnableOutputLoads.JSON:
            return json.loads(raw)
        return raw

    async def _send(self, context: RunnableContext[ApiRequest, ApiResponse],
                    verb: str, **bodyArgs) -> RunnableContext:
        """Issue one HTTP request and record its result on ``context``.

        Shared by every supported method so the request/response handling
        exists in exactly one place.

        Args:
            context: Context whose ``request`` supplies url, headers, params
                and outputLoads; its ``response`` and ``status`` are written.
            verb: HTTP method name handed to aiohttp (e.g. ``"GET"``).
            **bodyArgs: Extra keyword arguments for the aiohttp request, such
                as ``json=...`` for a POST body.

        Returns:
            The same ``context`` with ``response`` populated and ``status``
            set to ``RunnableStatus.SUCCESS``.
        """
        request = context.request
        async with aiohttp.ClientSession() as session:
            async with session.request(
                verb,
                request.url,
                headers=request.headers,
                params=request.params,
                **bodyArgs,
            ) as response:
                result = await response.text()
                context.response = ApiResponse(
                    result=result,
                    outputs=self.outputs(result, request.outputLoads),
                    statusCode=response.status,
                )
                # SUCCESS means the exchange completed; the HTTP status code
                # is not inspected here and is only exposed via statusCode.
                context.status = RunnableStatus.SUCCESS
                return context

    async def onNext(self, context: RunnableContext[ApiRequest, ApiResponse]) -> RunnableContext:
        """Execute the request held by ``context`` and return the context.

        GET sends url, headers and query params; POST additionally sends
        ``request.payloads`` as a JSON body. Any other method marks the
        context as ``RunnableStatus.ERROR`` with an error message and performs
        no network call.

        Exceptions raised by aiohttp or by decoding the body are not caught
        here and propagate to the caller.
        """
        method = context.request.method
        if method == ApiHttpMethod.GET:
            return await self._send(context, "GET")
        if method == ApiHttpMethod.POST:
            return await self._send(context, "POST", json=context.request.payloads)

        context.status = RunnableStatus.ERROR
        context.errorMessage = "method not support"
        return context