in azure_functions_worker/dispatcher.py [0:0]
def __poll_grpc(self):
options = []
if self._grpc_max_msg_len:
options.append(('grpc.max_receive_message_length',
self._grpc_max_msg_len))
options.append(('grpc.max_send_message_length',
self._grpc_max_msg_len))
channel = grpc.insecure_channel(
f'{self._host}:{self._port}', options)
try:
grpc.channel_ready_future(channel).result(
timeout=self._grpc_connect_timeout)
except Exception as ex:
self._loop.call_soon_threadsafe(
self._grpc_connected_fut.set_exception, ex)
return
else:
self._loop.call_soon_threadsafe(
self._grpc_connected_fut.set_result, True)
stub = protos.FunctionRpcStub(channel)
def gen(resp_queue):
while True:
msg = resp_queue.get()
if msg is self._GRPC_STOP_RESPONSE:
grpc_req_stream.cancel()
return
yield msg
grpc_req_stream = stub.EventStream(gen(self._grpc_resp_queue))
try:
for req in grpc_req_stream:
self._loop.call_soon_threadsafe(
self._loop.create_task, self._dispatch_grpc_request(req))
except Exception as ex:
if ex is grpc_req_stream:
# Yes, this is how grpc_req_stream iterator exits.
return
error_logger.exception(
'unhandled error in gRPC thread. Exception: {0}'.format(
format_exception(ex)))
raise