Excerpt: the `_gremlin_to_subgraph` method from `coordinator/gscoordinator/op_executor.py`.


    def _gremlin_to_subgraph(self, op: op_def_pb2.OpDef):  # noqa: C901
        """Execute a gremlin query and materialize its result as a new graph.

        The incoming ``op`` carries a gremlin traversal; a ``.subgraph('<name>')``
        step is appended so the interactive engine streams the matched vertices
        and edges into pre-created vineyard streams, while a concurrently
        running loader thread turns those streams into a new ARROW_PROPERTY
        graph on the analytical engine.

        Args:
            op: OpDef whose attributes hold the gremlin query string, the oid
                type, optional gremlin request options (JSON-encoded), and the
                vineyard object id used to look up the gremlin client.

        Returns:
            The last entry of the analytical engine's response head for the
            graph-creation op (the new graph's result).

        Raises:
            RuntimeError: if either the gremlin query or the graph loading
                fails; both error messages are combined into the message.
        """
        gremlin_script = op.attr[types_pb2.GIE_GREMLIN_QUERY_MESSAGE].s.decode()
        oid_type = op.attr[types_pb2.OID_TYPE].s.decode()
        request_options = None
        if types_pb2.GIE_GREMLIN_REQUEST_OPTIONS in op.attr:
            request_options = json.loads(
                op.attr[types_pb2.GIE_GREMLIN_REQUEST_OPTIONS].s.decode()
            )
        object_id = op.attr[types_pb2.VINEYARD_ID].i
        # The gremlin client was registered earlier under the graph's vineyard id.
        gremlin_client = self._object_manager.get(object_id)

        def create_global_graph_builder(
            graph_name, num_workers, threads_per_executor, vineyard_rpc_endpoint
        ):
            """Create the vineyard stream scaffolding the executors write into.

            Builds one ``PropertyGraphOutStream`` chunk (holding one vertex and
            one edge ``RecordBatchStream``) per executor thread on each
            vineyard instance, then groups them under a global
            ``GlobalPGStream`` plus two global ``ParallelStream`` objects
            (one for edges, one for vertices), publishing all three under
            well-known names derived from ``graph_name``.

            Returns:
                Tuple of (global stream id, edge parallel-stream id,
                vertex parallel-stream id, list of vertex stream ids,
                list of edge stream ids); the first three are ``repr``-ed so
                they can be round-tripped through ``vineyard.ObjectID`` later.
            """
            import vineyard

            vineyard_client = vineyard.connect(*vineyard_rpc_endpoint.split(":"))

            # Instance ids currently present in the cluster metadata.
            instances = [key for key in vineyard_client.meta]

            # duplicate each instances for each thread per worker.
            if len(instances) == num_workers:
                local_stream_chunks = threads_per_executor
            else:
                # Workers must divide evenly across instances so each instance
                # hosts the same number of stream chunks.
                assert (
                    num_workers % len(instances) == 0
                ), f"Unable to distribute {num_workers} workers to {len(instances)} instances"
                local_stream_chunks = (
                    num_workers // len(instances) * threads_per_executor
                )
            # One entry per stream chunk, repeating each instance id
            # `local_stream_chunks` times.
            chunk_instances = [
                key for key in instances for _ in range(local_stream_chunks)
            ]

            # build the vineyard::GlobalPGStream
            metadata = vineyard.ObjectMeta()
            metadata.set_global(True)
            metadata["typename"] = "vineyard::htap::GlobalPGStream"
            metadata["local_stream_chunks"] = local_stream_chunks
            metadata["total_stream_chunks"] = len(chunk_instances)

            # build the parallel stream for edge
            # NOTE: the "__streams_-size" / "__streams_-%d" key names mirror
            # vineyard's ParallelStream metadata layout — presumably they must
            # stay in sync with the C++ reader; confirm before renaming.
            edge_metadata = vineyard.ObjectMeta()
            edge_metadata.set_global(True)
            edge_metadata["typename"] = "vineyard::ParallelStream"
            edge_metadata["__streams_-size"] = len(chunk_instances)

            # build the parallel stream for vertex
            vertex_metadata = vineyard.ObjectMeta()
            vertex_metadata.set_global(True)
            vertex_metadata["typename"] = "vineyard::ParallelStream"
            vertex_metadata["__streams_-size"] = len(chunk_instances)

            # Collected so the caller can stop/drop every sub-stream on failure.
            vertex_streams, edge_streams = [], []

            # NB: we don't respect `num_workers`, instead, we create a substream
            # on each vineyard instance.
            #
            # Such a choice is to handle cases where that etcd instance still contains
            # information about dead instances.
            #
            # It should be ok, as each engine work will get its own local stream. But,
            # generally it should be equal to `num_workers`.
            for worker, instance_id in enumerate(chunk_instances):
                # Per-chunk edge record-batch stream, placed on its instance.
                edge_stream = vineyard.ObjectMeta()
                edge_stream["typename"] = "vineyard::RecordBatchStream"
                edge_stream["nbytes"] = 0
                edge_stream["params_"] = json.dumps(
                    {
                        "graph_name": graph_name,
                        "kind": "edge",
                    }
                )
                edge = vineyard_client.create_metadata(edge_stream, instance_id)
                # Persist so the object survives and is visible cluster-wide.
                vineyard_client.persist(edge.id)
                edge_metadata.add_member("__streams_-%d" % worker, edge)
                edge_streams.append(edge.id)

                # Per-chunk vertex record-batch stream, same placement.
                vertex_stream = vineyard.ObjectMeta()
                vertex_stream["typename"] = "vineyard::RecordBatchStream"
                vertex_stream["nbytes"] = 0
                vertex_stream["params_"] = json.dumps(
                    {
                        "graph_name": graph_name,
                        "kind": "vertex",
                    }
                )
                vertex = vineyard_client.create_metadata(vertex_stream, instance_id)
                vineyard_client.persist(vertex.id)
                vertex_metadata.add_member("__streams_-%d" % worker, vertex)
                vertex_streams.append(vertex.id)

                # The chunk ties this worker's vertex/edge streams together for
                # the htap output path.
                chunk_stream = vineyard.ObjectMeta()
                chunk_stream["typename"] = "vineyard::htap::PropertyGraphOutStream"
                chunk_stream["graph_name"] = graph_name
                chunk_stream["graph_schema"] = "{}"
                chunk_stream["nbytes"] = 0
                chunk_stream["stream_index"] = worker
                chunk_stream.add_member("edge_stream", edge)
                chunk_stream.add_member("vertex_stream", vertex)
                chunk = vineyard_client.create_metadata(chunk_stream, instance_id)
                vineyard_client.persist(chunk.id)
                metadata.add_member("stream_chunk_%d" % worker, chunk)

            # build the vineyard::GlobalPGStream
            graph = vineyard_client.create_metadata(metadata)
            vineyard_client.persist(graph.id)
            # Published under the graph name so executors can look it up.
            vineyard_client.put_name(graph.id, graph_name)

            # build the parallel stream for edge
            edge = vineyard_client.create_metadata(edge_metadata)
            vineyard_client.persist(edge.id)
            vineyard_client.put_name(edge.id, f"__{graph_name}_edge_stream")

            # build the parallel stream for vertex
            vertex = vineyard_client.create_metadata(vertex_metadata)
            vineyard_client.persist(vertex.id)
            vineyard_client.put_name(vertex.id, f"__{graph_name}_vertex_stream")

            return (
                repr(graph.id),
                repr(edge.id),
                repr(vertex.id),
                vertex_streams,
                edge_streams,
            )

        def cleanup_stream(
            graph_name,
            vineyard_rpc_endpoint,
            vertex_stream_id,
            edge_stream_id,
            vertex_streams,
            edge_streams,
        ):
            """Best-effort teardown of the streams created for this subgraph.

            Stops (marking as failed) and drops every per-chunk stream, then
            drops the three published names. Each operation is individually
            wrapped so a partially-created or already-collected set is cleaned
            up as far as possible; failures are deliberately swallowed.
            """
            import vineyard

            vineyard_client = vineyard.connect(*vineyard_rpc_endpoint.split(":"))

            # NOTE(review): these two ObjectIDs are parsed but not used below —
            # presumably kept for symmetry/debugging; confirm before removing.
            vertex_stream_id = vineyard.ObjectID(vertex_stream_id)
            edge_stream_id = vineyard.ObjectID(edge_stream_id)
            for s in itertools.chain(vertex_streams, edge_streams):
                try:
                    # failed=True aborts readers blocked on the stream.
                    vineyard_client.stop_stream(vineyard.ObjectID(s), failed=True)
                except Exception:  # noqa: E722, pylint: disable=broad-except
                    pass
                try:
                    vineyard_client.drop_stream(vineyard.ObjectID(s))
                except Exception:  # noqa: E722, pylint: disable=broad-except
                    pass
            try:
                vineyard_client.drop_name(f"__{graph_name}_vertex_stream")
            except Exception:  # noqa: E722, pylint: disable=broad-except
                pass
            try:
                vineyard_client.drop_name(f"__{graph_name}_edge_stream")
            except Exception:  # noqa: E722, pylint: disable=broad-except
                pass
            try:
                vineyard_client.drop_name(graph_name)
            except Exception:  # noqa: E722, pylint: disable=broad-except
                pass

        def load_subgraph(
            graph_name,
            total_builder_chunks,
            oid_type,
            edge_stream_id,
            vertex_stream_id,
            vineyard_rpc_endpoint,
        ):
            """Load the subgraph streams into a new analytical-engine graph.

            Runs in a worker thread. Waits until every executor has announced
            its stream (via a named flag), then creates a loader over the
            vertex/edge parallel streams and runs a graph-creation op on the
            analytical engine, reusing the original op's key.
            """
            import vineyard

            # wait all flags been created, see also
            #
            # `PropertyGraphOutStream::Initialize(Schema schema)`
            vineyard_client = vineyard.connect(*vineyard_rpc_endpoint.split(":"))

            # wait for all stream been created by GAIA executor in FFI
            for worker in range(total_builder_chunks):
                name = "__%s_%d_streamed" % (graph_name, worker)
                # Blocks until the name appears in vineyard.
                vineyard_client.get_name(name, wait=True)

            vertices = [Loader(vineyard.ObjectID(vertex_stream_id))]
            edges = [Loader(vineyard.ObjectID(edge_stream_id))]
            oid_type = normalize_data_type_str(oid_type)
            v_labels = normalize_parameter_vertices(vertices, oid_type)
            e_labels = normalize_parameter_edges(edges, oid_type)
            loader_op = create_loader(v_labels + e_labels)
            config = {
                types_pb2.DIRECTED: utils.b_to_attr(True),
                types_pb2.OID_TYPE: utils.s_to_attr(oid_type),
                types_pb2.GENERATE_EID: utils.b_to_attr(False),
                # otherwise the new graph cannot be used for GIE
                types_pb2.RETAIN_OID: utils.b_to_attr(True),
                types_pb2.VID_TYPE: utils.s_to_attr("uint64_t"),
                types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False),
                types_pb2.COMPACT_EDGES: utils.b_to_attr(False),
                types_pb2.USE_PERFECT_HASH: utils.b_to_attr(False),
            }
            new_op = create_graph(
                self._session_id,
                graph_def_pb2.ARROW_PROPERTY,
                inputs=[loader_op],
                attrs=config,
            )
            # spawn a vineyard stream loader on coordinator
            loader_op_def = loader_op.as_op_def()
            coordinator_dag = op_def_pb2.DagDef()
            coordinator_dag.op.extend([loader_op_def])
            # set the same key from subgraph to new op
            new_op_def = new_op.as_op_def()
            new_op_def.key = op.key
            dag = op_def_pb2.DagDef()
            dag.op.extend([new_op_def])
            # The loader runs on the coordinator; the graph creation runs on
            # the analytical engine and consumes the loader's streams.
            self.run_on_coordinator(coordinator_dag, [], {})
            response_head, _ = self.run_on_analytical_engine(dag, [], {})
            logger.info("subgraph has been loaded")
            return response_head.head.results[-1]

        # generate a random graph name
        now_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        random_num = random.randint(0, 10000000)
        graph_name = "subgraph-%s-%s" % (str(now_time), str(random_num))

        threads_per_worker = int(
            os.environ.get("THREADS_PER_WORKER", INTERACTIVE_ENGINE_THREADS_PER_WORKER)
        )

        # On a hosts launcher without the parallel-on-vineyard executor,
        # a single executor process owns all worker threads; otherwise one
        # executor per worker.
        if (
            self._launcher.type() == types_pb2.HOSTS
            and os.environ.get("PARALLEL_INTERACTIVE_EXECUTOR_ON_VINEYARD", "OFF")
            != "ON"
        ):
            executor_workers_num = 1
            threads_per_executor = self._launcher.num_workers * threads_per_worker
        else:
            executor_workers_num = self._launcher.num_workers
            threads_per_executor = threads_per_worker
        vineyard_rpc_endpoint = self._launcher.vineyard_endpoint
        total_builder_chunks = executor_workers_num * threads_per_executor

        (
            _graph_builder_id,
            edge_stream_id,
            vertex_stream_id,
            vertex_streams,
            edge_streams,
        ) = create_global_graph_builder(
            graph_name,
            executor_workers_num,
            threads_per_executor,
            vineyard_rpc_endpoint,
        )
        # start a thread to launch the graph
        # NOTE(review): the pool is never shut down explicitly; its single task
        # is awaited below via subgraph_task.result(), but an explicit
        # shutdown would release the worker thread promptly — TODO confirm.
        pool = futures.ThreadPoolExecutor()
        subgraph_task = pool.submit(
            load_subgraph,
            graph_name,
            total_builder_chunks,
            oid_type,
            edge_stream_id,
            vertex_stream_id,
            vineyard_rpc_endpoint,
        )

        # add subgraph vertices and edges
        subgraph_script = "{0}.subgraph('{1}')".format(
            gremlin_script,
            graph_name,
        )

        gremlin_error_message, graph_loading_error_message = None, None

        try:
            # Blocks until the gremlin traversal (and its subgraph write) completes.
            gremlin_client.submit(
                subgraph_script, request_options=request_options
            ).all().result()
        except Exception:  # noqa: E722, pylint: disable=broad-except
            # # abort the streams
            e, err, _ = sys.exc_info()
            gremlin_error_message = (
                f"Exception during subgraph's gremlin query execution: "
                f"'{e}', '{err}', with traceback: {traceback.format_exc()}"
            )
            logger.error(gremlin_error_message)
            # cancel the stream to let the analytical engine exit the current loop
            logger.info("clean up stream ...")
            cleanup_stream(
                graph_name,
                vineyard_rpc_endpoint,
                vertex_stream_id,
                edge_stream_id,
                vertex_streams,
                edge_streams,
            )
            logger.info("clean up stream finished ...")

        # The loader thread is always awaited, even after a gremlin failure —
        # presumably the stream cancellation above makes it fail fast rather
        # than block in get_name; verify against the executor's behavior.
        subgraph_object = None
        try:
            subgraph_object = subgraph_task.result()
        except Exception:  # noqa: E722, pylint: disable=broad-except
            e, err, _ = sys.exc_info()
            graph_loading_error_message = (
                f"Exception during subgraph's graph loading execution: "
                f"'{e}', '{err}', with traceback: {traceback.format_exc()}"
            )
            logger.error(graph_loading_error_message)

        if gremlin_error_message is not None or graph_loading_error_message is not None:
            error_message = (
                f"Error during subgraph execution, "
                f'gremlin error: "{gremlin_error_message}", '
                f'graph loading error: "{graph_loading_error_message}"'
            )
            raise RuntimeError(error_message)
        return subgraph_object