in coordinator/gscoordinator/op_executor.py [0:0]
def _gremlin_to_subgraph(self, op: op_def_pb2.OpDef):  # noqa: C901
    """Evaluate a gremlin ``.subgraph()`` step and materialize its result as a
    new ARROW_PROPERTY graph on the analytical engine.

    High-level flow:

    1. Create a family of vineyard stream objects (a ``GlobalPGStream`` named
       after the subgraph, plus one vertex and one edge ``ParallelStream``)
       that the interactive executors will write vertices/edges into.
    2. Spawn a background task (``load_subgraph``) that waits for the streams
       and loads them into a new graph via the analytical engine.
    3. Submit ``<query>.subgraph('<name>')`` to the gremlin service, which
       drives data into the streams.
    4. On any failure, abort and drop the streams so the loader can exit its
       read loop, then raise a combined error.

    Args:
        op: The subgraph op. Reads ``GIE_GREMLIN_QUERY_MESSAGE``, ``OID_TYPE``,
            optional ``GIE_GREMLIN_REQUEST_OPTIONS`` (JSON) and ``VINEYARD_ID``
            from ``op.attr``.

    Returns:
        The last entry of the analytical engine's response head for the
        graph-creation op (the new graph's definition), or ``None`` is never
        returned on success — on failure a ``RuntimeError`` is raised instead.

    Raises:
        RuntimeError: If the gremlin query and/or the subgraph loading task
            failed; the message aggregates both error descriptions.
    """
    gremlin_script = op.attr[types_pb2.GIE_GREMLIN_QUERY_MESSAGE].s.decode()
    oid_type = op.attr[types_pb2.OID_TYPE].s.decode()
    request_options = None
    if types_pb2.GIE_GREMLIN_REQUEST_OPTIONS in op.attr:
        request_options = json.loads(
            op.attr[types_pb2.GIE_GREMLIN_REQUEST_OPTIONS].s.decode()
        )
    # The interactive (gremlin) client was registered under the graph's
    # vineyard object id when the interactive instance was created.
    object_id = op.attr[types_pb2.VINEYARD_ID].i
    gremlin_client = self._object_manager.get(object_id)

    def create_global_graph_builder(
        graph_name, num_workers, threads_per_executor, vineyard_rpc_endpoint
    ):
        """Create the vineyard stream skeleton the executors will fill.

        Builds, persists and names three global vineyard objects:
        a ``vineyard::htap::GlobalPGStream`` (named ``graph_name``) whose
        chunks are per-thread ``PropertyGraphOutStream``s, and two
        ``vineyard::ParallelStream``s (named ``__<graph_name>_edge_stream``
        and ``__<graph_name>_vertex_stream``) aggregating the per-chunk
        edge/vertex ``RecordBatchStream``s.

        Returns:
            A 5-tuple ``(graph_id_repr, edge_stream_id_repr,
            vertex_stream_id_repr, vertex_stream_ids, edge_stream_ids)``
            where the first three are ``repr()`` strings of the global
            object ids and the last two are lists of the member stream ids.
        """
        import vineyard

        vineyard_client = vineyard.connect(*vineyard_rpc_endpoint.split(":"))

        # One key per vineyard instance known to the cluster metadata.
        instances = [key for key in vineyard_client.meta]

        # Duplicate each instance for each thread per worker, so every
        # executor thread gets its own local stream chunk.
        if len(instances) == num_workers:
            local_stream_chunks = threads_per_executor
        else:
            # Workers must be spread evenly across instances.
            assert (
                num_workers % len(instances) == 0
            ), f"Unable to distribute {num_workers} workers to {len(instances)} instances"
            local_stream_chunks = (
                num_workers // len(instances) * threads_per_executor
            )
        chunk_instances = [
            key for key in instances for _ in range(local_stream_chunks)
        ]

        # build the vineyard::GlobalPGStream
        metadata = vineyard.ObjectMeta()
        metadata.set_global(True)
        metadata["typename"] = "vineyard::htap::GlobalPGStream"
        metadata["local_stream_chunks"] = local_stream_chunks
        metadata["total_stream_chunks"] = len(chunk_instances)

        # build the parallel stream for edge
        # NOTE(review): member keys follow the "__streams_-size" /
        # "__streams_-%d" naming used below when members are added —
        # presumably the layout vineyard::ParallelStream expects; confirm
        # against the vineyard C++ definition before changing.
        edge_metadata = vineyard.ObjectMeta()
        edge_metadata.set_global(True)
        edge_metadata["typename"] = "vineyard::ParallelStream"
        edge_metadata["__streams_-size"] = len(chunk_instances)

        # build the parallel stream for vertex
        vertex_metadata = vineyard.ObjectMeta()
        vertex_metadata.set_global(True)
        vertex_metadata["typename"] = "vineyard::ParallelStream"
        vertex_metadata["__streams_-size"] = len(chunk_instances)

        vertex_streams, edge_streams = [], []

        # NB: we don't respect `num_workers`, instead, we create a substream
        # on each vineyard instance.
        #
        # Such a choice is to handle cases where that etcd instance still
        # contains information about dead instances.
        #
        # It should be ok, as each engine worker will get its own local
        # stream. But, generally it should be equal to `num_workers`.
        for worker, instance_id in enumerate(chunk_instances):
            # Per-chunk edge record-batch stream, pinned to the instance.
            edge_stream = vineyard.ObjectMeta()
            edge_stream["typename"] = "vineyard::RecordBatchStream"
            edge_stream["nbytes"] = 0
            edge_stream["params_"] = json.dumps(
                {
                    "graph_name": graph_name,
                    "kind": "edge",
                }
            )
            edge = vineyard_client.create_metadata(edge_stream, instance_id)
            vineyard_client.persist(edge.id)
            edge_metadata.add_member("__streams_-%d" % worker, edge)
            edge_streams.append(edge.id)

            # Per-chunk vertex record-batch stream, pinned to the instance.
            vertex_stream = vineyard.ObjectMeta()
            vertex_stream["typename"] = "vineyard::RecordBatchStream"
            vertex_stream["nbytes"] = 0
            vertex_stream["params_"] = json.dumps(
                {
                    "graph_name": graph_name,
                    "kind": "vertex",
                }
            )
            vertex = vineyard_client.create_metadata(vertex_stream, instance_id)
            vineyard_client.persist(vertex.id)
            vertex_metadata.add_member("__streams_-%d" % worker, vertex)
            vertex_streams.append(vertex.id)

            # The PropertyGraphOutStream chunk ties the vertex/edge streams
            # together for one executor thread; the real schema is filled in
            # later by the executor (hence the "{}" placeholder).
            chunk_stream = vineyard.ObjectMeta()
            chunk_stream["typename"] = "vineyard::htap::PropertyGraphOutStream"
            chunk_stream["graph_name"] = graph_name
            chunk_stream["graph_schema"] = "{}"
            chunk_stream["nbytes"] = 0
            chunk_stream["stream_index"] = worker
            chunk_stream.add_member("edge_stream", edge)
            chunk_stream.add_member("vertex_stream", vertex)
            chunk = vineyard_client.create_metadata(chunk_stream, instance_id)
            vineyard_client.persist(chunk.id)
            metadata.add_member("stream_chunk_%d" % worker, chunk)

        # build the vineyard::GlobalPGStream
        graph = vineyard_client.create_metadata(metadata)
        vineyard_client.persist(graph.id)
        vineyard_client.put_name(graph.id, graph_name)

        # build the parallel stream for edge
        edge = vineyard_client.create_metadata(edge_metadata)
        vineyard_client.persist(edge.id)
        vineyard_client.put_name(edge.id, f"__{graph_name}_edge_stream")

        # build the parallel stream for vertex
        vertex = vineyard_client.create_metadata(vertex_metadata)
        vineyard_client.persist(vertex.id)
        vineyard_client.put_name(vertex.id, f"__{graph_name}_vertex_stream")

        # Object ids are passed around as their repr() strings so they can
        # be serialized and converted back with vineyard.ObjectID(...).
        return (
            repr(graph.id),
            repr(edge.id),
            repr(vertex.id),
            vertex_streams,
            edge_streams,
        )

    def cleanup_stream(
        graph_name,
        vineyard_rpc_endpoint,
        vertex_stream_id,
        edge_stream_id,
        vertex_streams,
        edge_streams,
    ):
        """Best-effort abort/drop of all streams and names for `graph_name`.

        Every operation is wrapped individually so that one failure (e.g. a
        stream already dropped) never prevents the remaining cleanup steps.
        """
        import vineyard

        vineyard_client = vineyard.connect(*vineyard_rpc_endpoint.split(":"))
        # NOTE(review): these two ObjectID conversions are currently unused
        # below — only the per-chunk streams and the registered names are
        # cleaned up; verify whether the parallel streams themselves should
        # be dropped as well.
        vertex_stream_id = vineyard.ObjectID(vertex_stream_id)
        edge_stream_id = vineyard.ObjectID(edge_stream_id)
        for s in itertools.chain(vertex_streams, edge_streams):
            # Mark the stream as failed so readers stop waiting, then drop it.
            try:
                vineyard_client.stop_stream(vineyard.ObjectID(s), failed=True)
            except Exception:  # noqa: E722, pylint: disable=broad-except
                pass
            try:
                vineyard_client.drop_stream(vineyard.ObjectID(s))
            except Exception:  # noqa: E722, pylint: disable=broad-except
                pass
        # Unregister the three global names created by the builder.
        try:
            vineyard_client.drop_name(f"__{graph_name}_vertex_stream")
        except Exception:  # noqa: E722, pylint: disable=broad-except
            pass
        try:
            vineyard_client.drop_name(f"__{graph_name}_edge_stream")
        except Exception:  # noqa: E722, pylint: disable=broad-except
            pass
        try:
            vineyard_client.drop_name(graph_name)
        except Exception:  # noqa: E722, pylint: disable=broad-except
            pass

    def load_subgraph(
        graph_name,
        total_builder_chunks,
        oid_type,
        edge_stream_id,
        vertex_stream_id,
        vineyard_rpc_endpoint,
    ):
        """Consume the vertex/edge streams into a new ArrowProperty graph.

        Runs in a worker thread. Blocks until every executor chunk has
        announced itself via a named flag, then issues a loader op on the
        coordinator followed by a CREATE_GRAPH op on the analytical engine.

        Returns:
            The last result of the analytical engine's response head.
        """
        import vineyard

        # wait all flags been created, see also
        #
        # `PropertyGraphOutStream::Initialize(Schema schema)`
        vineyard_client = vineyard.connect(*vineyard_rpc_endpoint.split(":"))

        # wait for all streams to be created by the GAIA executor in FFI
        for worker in range(total_builder_chunks):
            name = "__%s_%d_streamed" % (graph_name, worker)
            vineyard_client.get_name(name, wait=True)

        # The parallel streams themselves act as the loader's data sources.
        vertices = [Loader(vineyard.ObjectID(vertex_stream_id))]
        edges = [Loader(vineyard.ObjectID(edge_stream_id))]
        oid_type = normalize_data_type_str(oid_type)
        v_labels = normalize_parameter_vertices(vertices, oid_type)
        e_labels = normalize_parameter_edges(edges, oid_type)
        loader_op = create_loader(v_labels + e_labels)
        config = {
            types_pb2.DIRECTED: utils.b_to_attr(True),
            types_pb2.OID_TYPE: utils.s_to_attr(oid_type),
            types_pb2.GENERATE_EID: utils.b_to_attr(False),
            # otherwise the new graph cannot be used for GIE
            types_pb2.RETAIN_OID: utils.b_to_attr(True),
            types_pb2.VID_TYPE: utils.s_to_attr("uint64_t"),
            types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False),
            types_pb2.COMPACT_EDGES: utils.b_to_attr(False),
            types_pb2.USE_PERFECT_HASH: utils.b_to_attr(False),
        }
        new_op = create_graph(
            self._session_id,
            graph_def_pb2.ARROW_PROPERTY,
            inputs=[loader_op],
            attrs=config,
        )
        # spawn a vineyard stream loader on coordinator
        loader_op_def = loader_op.as_op_def()
        coordinator_dag = op_def_pb2.DagDef()
        coordinator_dag.op.extend([loader_op_def])
        # keep the original op's key on the new op so the client can
        # correlate the response with the subgraph request
        new_op_def = new_op.as_op_def()
        new_op_def.key = op.key
        dag = op_def_pb2.DagDef()
        dag.op.extend([new_op_def])
        self.run_on_coordinator(coordinator_dag, [], {})
        response_head, _ = self.run_on_analytical_engine(dag, [], {})
        logger.info("subgraph has been loaded")
        return response_head.head.results[-1]

    # generate a random graph name
    now_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    random_num = random.randint(0, 10000000)
    graph_name = "subgraph-%s-%s" % (str(now_time), str(random_num))

    threads_per_worker = int(
        os.environ.get("THREADS_PER_WORKER", INTERACTIVE_ENGINE_THREADS_PER_WORKER)
    )

    # On a hosts (non-k8s) launcher without parallel executors, a single
    # executor process serves all workers, so fold the worker count into
    # the per-executor thread count to keep the chunk total the same.
    if (
        self._launcher.type() == types_pb2.HOSTS
        and os.environ.get("PARALLEL_INTERACTIVE_EXECUTOR_ON_VINEYARD", "OFF")
        != "ON"
    ):
        executor_workers_num = 1
        threads_per_executor = self._launcher.num_workers * threads_per_worker
    else:
        executor_workers_num = self._launcher.num_workers
        threads_per_executor = threads_per_worker
    vineyard_rpc_endpoint = self._launcher.vineyard_endpoint
    total_builder_chunks = executor_workers_num * threads_per_executor

    (
        _graph_builder_id,
        edge_stream_id,
        vertex_stream_id,
        vertex_streams,
        edge_streams,
    ) = create_global_graph_builder(
        graph_name,
        executor_workers_num,
        threads_per_executor,
        vineyard_rpc_endpoint,
    )

    # start a thread to launch the graph
    pool = futures.ThreadPoolExecutor()
    subgraph_task = pool.submit(
        load_subgraph,
        graph_name,
        total_builder_chunks,
        oid_type,
        edge_stream_id,
        vertex_stream_id,
        vineyard_rpc_endpoint,
    )

    # append the .subgraph() step that writes vertices and edges into the
    # streams created above
    subgraph_script = "{0}.subgraph('{1}')".format(
        gremlin_script,
        graph_name,
    )

    gremlin_error_message, graph_loading_error_message = None, None
    try:
        gremlin_client.submit(
            subgraph_script, request_options=request_options
        ).all().result()
    except Exception:  # noqa: E722, pylint: disable=broad-except
        # abort the streams: record the error, but keep going so the
        # loader thread is always unblocked below
        e, err, _ = sys.exc_info()
        gremlin_error_message = (
            f"Exception during subgraph's gremlin query execution: "
            f"'{e}', '{err}', with traceback: {traceback.format_exc()}"
        )
        logger.error(gremlin_error_message)

    # cancel the stream to let the analytical engine exit the current loop
    logger.info("clean up stream ...")
    cleanup_stream(
        graph_name,
        vineyard_rpc_endpoint,
        vertex_stream_id,
        edge_stream_id,
        vertex_streams,
        edge_streams,
    )
    logger.info("clean up stream finished ...")

    # join the loader thread; it either produced the new graph or raised
    subgraph_object = None
    try:
        subgraph_object = subgraph_task.result()
    except Exception:  # noqa: E722, pylint: disable=broad-except
        e, err, _ = sys.exc_info()
        graph_loading_error_message = (
            f"Exception during subgraph's graph loading execution: "
            f"'{e}', '{err}', with traceback: {traceback.format_exc()}"
        )
        logger.error(graph_loading_error_message)

    # surface both failure modes in a single error
    if gremlin_error_message is not None or graph_loading_error_message is not None:
        error_message = (
            f"Error during subgraph execution, "
            f'gremlin error: "{gremlin_error_message}", '
            f'graph loading error: "{graph_loading_error_message}"'
        )
        raise RuntimeError(error_message)
    return subgraph_object