# python/graphscope/client/rpc.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import logging
import pickle
import threading
import time

import grpc

from graphscope.client.utils import GRPCUtils
from graphscope.client.utils import GS_GRPC_MAX_MESSAGE_LENGTH
from graphscope.client.utils import handle_grpc_error
from graphscope.client.utils import suppress_grpc_error
from graphscope.proto import coordinator_service_pb2_grpc
from graphscope.proto import message_pb2
from graphscope.proto.error import coordinator_pb2
from graphscope.version import __version__

logger = logging.getLogger("graphscope")


class GRPCClient(object):
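    """gRPC client that talks to the GraphScope coordinator service."""
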
def __init__(self, launcher, endpoint, reconnect=False):
"""Connect to GRAPE engine at the given :code:`endpoint`."""
# create the gRPC stub
self._options = [
("grpc.max_send_message_length", GS_GRPC_MAX_MESSAGE_LENGTH),
("grpc.max_receive_message_length", GS_GRPC_MAX_MESSAGE_LENGTH),
("grpc.max_metadata_size", GS_GRPC_MAX_MESSAGE_LENGTH),
]
self._endpoint = endpoint
self._launcher = launcher
self._grpc_utils = GRPCUtils()
self._stub = self._get_stub()
self._session_id = None
self._logs_fetching_thread = None
self._reconnect = reconnect
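
    # A minimal usage sketch (the endpoint below is illustrative; a real
    # deployment would obtain it from its launcher or configuration):
    #
    #   client = GRPCClient(launcher=None, endpoint="localhost:63800")
    #   client.waiting_service_ready(timeout_seconds=60)
    #   (session_id, cluster_type, num_workers,
    #    namespace, engine_config, host_names) = client.connect()
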
def _get_stub(self):
channel = grpc.insecure_channel(self._endpoint, options=self._options)
return coordinator_service_pb2_grpc.CoordinatorServiceStub(channel)

    def waiting_service_ready(self, timeout_seconds=60):
        """Poll the coordinator with heartbeats until it becomes reachable.

        Raises:
            RuntimeError: If the launched coordinator process has exited
                with a non-zero code.
            ConnectionError: If no heartbeat succeeds within
                ``timeout_seconds``.
        """
begin_time = time.time()
request = message_pb2.HeartBeatRequest()
while True:
if self._launcher:
code = self._launcher.poll()
if code is not None and code != 0:
raise RuntimeError(
f"Start coordinator failed with exit code {code}"
)
try:
self._stub.HeartBeat(request)
logger.info("GraphScope coordinator service connected.")
break
except grpc.RpcError as e:
                # Failing to reach the coordinator for a short period is
                # expected, since the coordinator takes some time to launch.
msg = f"code: {e.code().name}, details: {e.details()}"
if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
logger.warning("Heart beat analytical engine failed, %s", msg)
if time.time() - begin_time >= timeout_seconds:
                    raise ConnectionError(f"Connecting to coordinator timed out, {msg}")
# refresh the channel in case the server became available
if e.code() == grpc.StatusCode.UNAVAILABLE:
self._stub = self._get_stub()
time.sleep(1)

    def connect(self, cleanup_instance=True, dangling_timeout_seconds=60):
        """Establish a session with the coordinator; see
        :meth:`_connect_session_impl` for argument details and return values.
        """
return self._connect_session_impl(
cleanup_instance=cleanup_instance,
dangling_timeout_seconds=dangling_timeout_seconds,
)

    @property
    def session_id(self):
        return self._session_id

    def __str__(self):
        return str(self._session_id)

    def __repr__(self):
        return str(self)

    def run(self, dag_def):
        return self._run_step_impl(dag_def)

    def fetch_logs(self):
        """Start a daemon thread that streams coordinator logs to the
        client-side logger. Subsequent calls are no-ops."""
        if self._logs_fetching_thread is None:
            self._logs_fetching_thread = threading.Thread(
                target=self._fetch_logs_impl, args=()
            )
            self._logs_fetching_thread.daemon = True
            self._logs_fetching_thread.start()

    def add_lib(self, gar):
        if self._session_id:
            return self._add_lib_impl(gar)
        logger.error("Cannot add lib to a closed session.")

    def close(self):
        if self._session_id:
            self._close_session_impl()
            self._session_id = None
        if self._logs_fetching_thread is not None:
            self._logs_fetching_thread.join(timeout=5)

    @handle_grpc_error
def send_heartbeat(self):
request = message_pb2.HeartBeatRequest()
return self._stub.HeartBeat(request)

    @handle_grpc_error
def _add_lib_impl(self, gar):
request = message_pb2.AddLibRequest(session_id=self._session_id, gar=gar)
return self._stub.AddLib(request)

    @handle_grpc_error
def _connect_session_impl(self, cleanup_instance=True, dangling_timeout_seconds=60):
"""
Args:
cleanup_instance (bool, optional): If True, also delete graphscope
instance (such as pod) in closing process.
dangling_timeout_seconds (int, optional): After seconds of client
disconnect, coordinator will kill this graphscope instance.
Disable dangling check by setting -1.
"""
request = message_pb2.ConnectSessionRequest(
cleanup_instance=cleanup_instance,
dangling_timeout_seconds=dangling_timeout_seconds,
version=__version__,
reconnect=self._reconnect,
)
response = self._stub.ConnectSession(request)
self._session_id = response.session_id
return (
response.session_id,
response.cluster_type,
response.num_workers,
response.namespace,
json.loads(response.engine_config),
response.host_names,
)
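
    # For illustration (values are hypothetical): a client that should survive
    # brief disconnects without the coordinator reclaiming the instance could
    # connect with cleanup and the dangling check disabled:
    #
    #   client.connect(cleanup_instance=False, dangling_timeout_seconds=-1)
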
@suppress_grpc_error
def _fetch_logs_impl(self):
request = message_pb2.FetchLogsRequest(session_id=self._session_id)
responses = self._stub.FetchLogs(request)
for res in responses:
info, error = res.info_message.rstrip(), res.error_message.rstrip()
if info:
logger.info(info, extra={"simple": True})
if error:
logger.error(error, extra={"simple": True})

    @handle_grpc_error
def _close_session_impl(self):
request = message_pb2.CloseSessionRequest(session_id=self._session_id)
response = self._stub.CloseSession(request)
return response

    @handle_grpc_error(False)  # don't retry the "RunStep" request.
def _run_step_impl(self, dag_def):
# note that the "_impl" may be retried, thus the argument cannot be a
# generator or an iterator.
runstep_requests = self._grpc_utils.generate_runstep_requests(
self._session_id, dag_def
)
response = self._grpc_utils.parse_runstep_responses(
self._stub.RunStep(runstep_requests)
)
if response.code != coordinator_pb2.OK:
logger.error(
"Runstep failed with code: %s, message: %s",
coordinator_pb2.Code.Name(response.code),
response.error_msg,
)
            if response.full_exception:
                # The coordinator attaches a pickled exception for client-side
                # re-raising; a tuple encodes the exception class and its args.
                exc = pickle.loads(response.full_exception)
                if isinstance(exc, tuple):
                    raise exc[0](*exc[1:])
                else:
                    raise exc
return response
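
    # Caller-side sketch of the error path above: exceptions pickled by the
    # coordinator are re-raised locally, so callers can wrap run() in an
    # ordinary try/except (construction of dag_def is elided here):
    #
    #   try:
    #       response = client.run(dag_def)
    #   except Exception as e:
    #       logger.error("RunStep failed: %s", e)
    #       raise
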
def create_analytical_instance(self):
request = message_pb2.CreateAnalyticalInstanceRequest(
session_id=self._session_id
)
response = self._stub.CreateAnalyticalInstance(request)
return json.loads(response.engine_config), response.host_names

    def create_interactive_instance(self, object_id, schema_path, params, with_cypher):
request = message_pb2.CreateInteractiveInstanceRequest(
session_id=self._session_id,
object_id=object_id,
schema_path=schema_path,
with_cypher=with_cypher,
)
if params is not None:
for k, v in params.items():
request.params[str(k)] = str(v)
response = self._stub.CreateInteractiveInstance(request)
return response.gremlin_endpoint, response.cypher_endpoint
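
    # Hypothetical call shape, showing that params accepts arbitrary
    # key/value pairs, which are coerced to strings before being sent:
    #
    #   client.create_interactive_instance(
    #       object_id=12345,                  # illustrative object id
    #       schema_path="/tmp/schema.json",   # illustrative path
    #       params={"some.key": "value"},     # hypothetical parameter
    #       with_cypher=False,
    #   )
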
def create_learning_instance(self, object_id, handle, config, learning_backend):
request = message_pb2.CreateLearningInstanceRequest(session_id=self._session_id)
request.object_id = object_id
request.handle = handle
request.config = config
request.learning_backend = learning_backend
response = self._stub.CreateLearningInstance(request)
return response.handle, response.config, response.endpoints

    def close_analytical_instance(self):
request = message_pb2.CloseAnalyticalInstanceRequest(
session_id=self._session_id
)
self._stub.CloseAnalyticalInstance(request)

    def close_interactive_instance(self, object_id):
request = message_pb2.CloseInteractiveInstanceRequest(
session_id=self._session_id, object_id=object_id
)
self._stub.CloseInteractiveInstance(request)

    def close_learning_instance(self, object_id):
request = message_pb2.CloseLearningInstanceRequest(
session_id=self._session_id, object_id=object_id
)
self._stub.CloseLearningInstance(request)