def execute()

in zeppelin-jupyter-interpreter/src/main/resources/grpc/jupyter/kernel_server.py [0:0]


    def execute(self, request, context):
        """Run ``request.code`` on the kernel and stream the output back.

        This is a generator. The code is executed on a worker thread through
        ``self._kc.execute_interactive`` while this generator drains a queue
        of converted kernel messages and yields them to the caller.

        Args:
            request: Request object; only ``request.code`` is read.
            context: RPC context supplied by the server framework; not used
                by this method.

        Yields:
            kernel_pb2.ExecuteResponse: one response per supported kernel
            output message, then an ERROR response if the kernel is no
            longer alive (or the worker is still running) once streaming
            stops, and finally one TEXT response joining the ``text/plain``
            content of any payloads found in the execute reply.
        """
        sys.stdout.flush()
        # Bounded queue: Queue.put() blocks once 30 replies are pending, so
        # the producer cannot run arbitrarily far ahead of the consumer.
        stream_reply_queue = queue.Queue(maxsize=30)
        # Filled by the worker thread with the final execute reply (if any).
        payload_reply = []

        def _output_hook(msg):
            """Convert one kernel output message and enqueue it as a reply."""
            msg_type = msg['header']['msg_type']
            content = msg['content']
            outStatus, outType, output = kernel_pb2.SUCCESS, None, None
            # prepare the reply
            if msg_type == 'stream':
                outType = kernel_pb2.TEXT
                output = content['text']
            elif msg_type in ('display_data', 'execute_result'):
                # The if-else order matters, can not be changed. Because ipython may provide multiple output.
                # TEXT is the last resort type.
                if 'text/html' in content['data']:
                    outType = kernel_pb2.HTML
                    output = content['data']['text/html']
                elif 'image/jpeg' in content['data']:
                    outType = kernel_pb2.JPEG
                    output = content['data']['image/jpeg']
                elif 'image/png' in content['data']:
                    outType = kernel_pb2.PNG
                    output = content['data']['image/png']
                elif 'application/javascript' in content['data']:
                    outType = kernel_pb2.HTML
                    output = '<script> ' + content['data']['application/javascript'] + ' </script>\n'
                elif 'application/vnd.holoviews_load.v0+json' in content['data']:
                    outType = kernel_pb2.HTML
                    output = '<script> ' + content['data']['application/vnd.holoviews_load.v0+json'] + ' </script>\n'
                elif 'text/plain' in content['data']:
                    outType = kernel_pb2.TEXT
                    output = content['data']['text/plain']
            elif msg_type == 'error':
                outStatus = kernel_pb2.ERROR
                outType = kernel_pb2.TEXT
                output = '\n'.join(content['traceback'])
            elif msg_type == 'clear_output':
                outType = kernel_pb2.CLEAR
                output = ""

            # send reply if we supported the output type
            if outType is not None:
                stream_reply_queue.put(
                    kernel_pb2.ExecuteResponse(status=outStatus,
                                                type=outType,
                                                output=output))

        def execute_worker():
            """Run the code on the kernel and record the final reply."""
            reply = self._kc.execute_interactive(request.code,
                                          output_hook=_output_hook,
                                          timeout=None)
            payload_reply.append(reply)

        t = threading.Thread(name="ConsumerThread", target=execute_worker)
        # self._lock is held for the whole run, including while yielding.
        with self._lock:
            t.start()
            # We want to wait the end of the execution (and queue empty).
            # In our case when the thread is not alive -> it means that the execution is complete
            # However we also ensure that the kernel is alive because in case of OOM or other errors
            # Execution might be stuck there: (might open issue on jupyter client)
            # https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L323
            while (t.is_alive() and self.isKernelAlive()) or not stream_reply_queue.empty():
                # Sleeping time to time to reduce cpu usage.
                # At worst it will bring a 0.05 delay for bunch of messages.
                # Overall it will improve performance.
                time.sleep(0.05)
                while not stream_reply_queue.empty():
                    yield stream_reply_queue.get()

            # if kernel is not alive or thread is still alive, it means that we face an issue.
            if not self.isKernelAlive() or t.is_alive():
                yield kernel_pb2.ExecuteResponse(status=kernel_pb2.ERROR,
                                                  type=kernel_pb2.TEXT,
                                                  output="Ipython kernel has been stopped. Please check logs. It might be because of an out of memory issue.")

        if payload_reply:
            reply_content = payload_reply[0]['content']
            result = []
            for payload in reply_content.get('payload') or ():
                # Not every payload carries a 'data' mime bundle (e.g.
                # 'set_next_input' has 'text' instead), so look the text up
                # tolerantly rather than raising KeyError after execution.
                text = (payload.get('data') or {}).get('text/plain')
                if text:
                    result.append(text)
            if result:
                yield kernel_pb2.ExecuteResponse(status=kernel_pb2.SUCCESS,
                                                  type=kernel_pb2.TEXT,
                                                  output='\n'.join(result))