analytical_engine/frame/cython_pie_app_frame.cc (149 lines of code) (raw):
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Include the header named by _GRAPH_HEADER first, ahead of all other includes.
#define DO_QUOTE(X) #X
#define QUOTE(X) DO_QUOTE(X)
#if defined(_GRAPH_TYPE) && defined(_GRAPH_HEADER)
#include QUOTE(_GRAPH_HEADER)
#else
#error "Missing macro _GRAPH_TYPE or _GRAPH_HEADER"
#endif
#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include "arrow/api.h"
#include "grape/util.h"
#include "vineyard/graph/fragment/arrow_fragment.h"
#if defined __has_include
#if __has_include("vineyard/graph/fragment/arrow_fragment_modifier.h")
#include "vineyard/graph/fragment/arrow_fragment_modifier.h"
#endif
#endif
#include "apps/python_pie/cython_pie_program.h"
#include "apps/python_pie/export.h"
#include "apps/python_pie/python_pie_app.h"
#include "core/app/app_invoker.h"
#include "core/error.h"
#include "frame/ctx_wrapper_builder.h"
#include "proto/data_types.pb.h"
#include "proto/types.pb.h"
#ifdef _APP_HEADER
#include QUOTE(_APP_HEADER)
#else
#error "Missing macro _APP_HEADER"
#endif
// Short alias for the Boost.Leaf namespace; used for the bl::result return
// types further down.
namespace bl = boost::leaf;
// Global alias for std::string. It is not referenced by the code visible in
// this file.
using string = std::string;
// _OID_TYPE falls back to vineyard's property-graph OID type when the build
// does not supply one. NOTE(review): nothing in this file uses it directly;
// presumably the build-supplied _GRAPH_TYPE expansion refers to it — confirm.
#if !defined(_OID_TYPE)
#define _OID_TYPE vineyard::property_graph_types::OID_TYPE
#endif
// _VD_TYPE and _MD_TYPE are the two template arguments handed to
// CythonPIEProgram and Context below. Both become double unless BOTH are
// predefined. NOTE(review): when only one of them is predefined, the #else
// branch redefines that macro as double — confirm this is intended.
#if defined(_VD_TYPE) && defined(_MD_TYPE)
#else
#define _VD_TYPE double
#define _MD_TYPE double
#endif
// The Python module name is mandatory: AppInit() stringizes it for the import
// and pastes it onto the PyInit_ prefix.
#if defined(_MODULE_NAME)
#else
#error "Missing macro _MODULE_NAME"
#endif
// _APP_TYPE is always defined here; the guard that would have allowed a
// build-supplied _APP_TYPE is commented out.
// #if defined(_APP_TYPE) && defined(_APP_HEADER)
// #else
#define _APP_TYPE \
gs::PythonPIEApp<_GRAPH_TYPE, gs::CythonPIEProgram<_VD_TYPE, _MD_TYPE>>
// #endif
// Data type exposed by the app's context. Not referenced by the code visible
// in this file.
#define _DATA_TYPE typename _APP_TYPE::context_t::data_t
/**
* cython_pie_app_frame.cc is designed for building apps as a library.
* The library provides the CreateWorker, Query, and DeleteWorker functions to
* be invoked by the grape instance. The library is loaded when a CREATE_APP
* request arrives at the analytical engine. Multiple query requests can then
* be issued against the worker instance. Finally, an UNLOAD_APP request should
* be submitted to release the resources.
*/
namespace python_grape {
// Trampoline that forwards to the unqualified Init(frag, context).
// CreateApp() registers this function through SetInitFunction().
// NOTE(review): Init, Fragment and Context are not declared in this file;
// presumably they come from the header named by _APP_HEADER — confirm.
void _Init(Fragment& frag, Context<_VD_TYPE, _MD_TYPE>& context) {
Init(frag, context);
}
// Trampoline that forwards to the unqualified PEval(frag, context).
// CreateApp() registers this function through SetPEvalFunction().
// NOTE(review): PEval, Fragment and Context are not declared in this file;
// presumably they come from the header named by _APP_HEADER — confirm.
void _PEval(Fragment& frag, Context<_VD_TYPE, _MD_TYPE>& context) {
PEval(frag, context);
}
// Trampoline that forwards to the unqualified IncEval(frag, context).
// CreateApp() registers this function through SetIncEvalFunction().
// NOTE(review): IncEval, Fragment and Context are not declared in this file;
// presumably they come from the header named by _APP_HEADER — confirm.
void _IncEval(Fragment& frag, Context<_VD_TYPE, _MD_TYPE>& context) {
IncEval(frag, context);
}
// Registers the module named by _MODULE_NAME as a built-in of the embedded
// Python interpreter, (re)starts the interpreter and imports the module.
//
// Failures are reported on stdout; the function returns void, so callers are
// not told about them.
void AppInit() {
// Helper macros that paste PyInit_ onto the expanded _MODULE_NAME to obtain
// the module's init function symbol. They are local to this function and
// undefined again below.
#define INIT_PREFIX PyInit_
#define PPCAT_NX(A, B) A##B
#define PPCAT(A, B) PPCAT_NX(A, B)
  int err = PyImport_AppendInittab(QUOTE(_MODULE_NAME),
                                   PPCAT(INIT_PREFIX, _MODULE_NAME));
  if (err < 0) {
    // A real newline terminates the message (previously the literal
    // characters '\' and 'n' were printed).
    printf("Cannot initialize Python module...\n");
    return;
  }
#undef PPCAT
#undef PPCAT_NX
#undef INIT_PREFIX
  // Restart the interpreter if one is already running, so that it starts up
  // after the init-table entry above has been added.
  if (Py_IsInitialized()) {
    Py_Finalize();
  }
  Py_Initialize();
  // PyImport_ImportModule yields a null pointer when the import fails; report
  // it instead of silently continuing.
  // NOTE(review): the Python exception raised by a failed import is left
  // pending, and the reference returned on success is never released —
  // consider PyErr_Print() / Py_DECREF() here.
  if (PyImport_ImportModule(QUOTE(_MODULE_NAME)) == nullptr) {
    printf("Cannot import Python module %s\n", QUOTE(_MODULE_NAME));
  }
}
std::shared_ptr<_APP_TYPE> CreateApp() {
AppInit();
gs::CythonPIEProgram<_VD_TYPE, _MD_TYPE> program;
program.SetInitFunction(_Init);
program.SetPEvalFunction(_PEval);
program.SetIncEvalFunction(_IncEval);
return std::make_shared<_APP_TYPE>(program);
}
} // namespace python_grape
// Heap-allocated handle that crosses the C interface as a void*; it only
// carries the shared pointer to the app's worker.
struct worker_handler {
  std::shared_ptr<typename _APP_TYPE::worker_t> worker;
};
using worker_handler_t = worker_handler;
namespace detail {
// Creates the app and its worker for the given fragment and returns a
// heap-allocated worker_handler_t as an opaque pointer.
//
// fragment   type-erased fragment; cast to _APP_TYPE::fragment_t.
// comm_spec  forwarded to the worker's Init().
// spec       forwarded to the worker's Init().
//
// The returned pointer must be handed back to DeleteWorker() to be released.
__attribute__((visibility("hidden"))) void* CreateWorker(
    const std::shared_ptr<void>& fragment, const grape::CommSpec& comm_spec,
    const grape::ParallelEngineSpec& spec) {
  auto app = python_grape::CreateApp();
  // Own the handle through a unique_ptr until it is fully set up, so it is
  // not leaked if worker creation or Init() throws.
  auto handler = std::make_unique<worker_handler_t>();
  handler->worker = _APP_TYPE::CreateWorker(
      app, std::static_pointer_cast<_APP_TYPE::fragment_t>(fragment));
  handler->worker->Init(comm_spec, spec);
  // Ownership passes to the caller from here on.
  return handler.release();
}
__attribute__((visibility("hidden"))) std::nullptr_t DeleteWorker(
void* worker_handler) {
auto* handler = static_cast<worker_handler_t*>(worker_handler);
handler->worker->Finalize();
handler->worker.reset();
delete handler;
return nullptr;
}
// Runs one query on the worker behind the handle.
//
// worker_handler  pointer previously returned by CreateWorker().
// query_args      arguments forwarded to gs::AppInvoker.
// context_key     when non-empty and the query succeeded, a context wrapper
//                 is built under this key.
// frag_wrapper    fragment wrapper passed on to the context wrapper builder.
// ctx_wrapper     output; assigned only when a context wrapper is built.
//
// Returns whatever gs::AppInvoker<_APP_TYPE>::Query produced.
__attribute__((visibility("hidden"))) bl::result<std::nullptr_t> Query(
    void* worker_handler, const gs::rpc::QueryArgs& query_args,
    const std::string& context_key,
    std::shared_ptr<gs::IFragmentWrapper> frag_wrapper,
    std::shared_ptr<gs::IContextWrapper>& ctx_wrapper) {
  using context_t = typename _APP_TYPE::context_t;

  auto* handle = static_cast<worker_handler_t*>(worker_handler);
  auto worker = handle->worker;
  auto result = gs::AppInvoker<_APP_TYPE>::Query(worker, query_args);

  const bool succeeded = static_cast<bool>(result);
  if (succeeded && !context_key.empty()) {
    auto context = worker->GetContext();
    ctx_wrapper =
        gs::CtxWrapperBuilder<context_t>::build(context_key, frag_wrapper,
                                                context);
  }
  return result;
}
} // namespace detail
extern "C" {
// C-linkage entry point: evaluates detail::CreateWorker(...) through
// __FRAME_CATCH_AND_LOG_GS_ERROR and returns the resulting opaque handle.
// The handle starts out as nullptr. NOTE(review): the macro is defined
// elsewhere; presumably it assigns the call's value to worker_handler and
// logs instead of propagating on failure, leaving nullptr — confirm.
void* CreateWorker(const std::shared_ptr<void>& fragment,
const grape::CommSpec& comm_spec,
const grape::ParallelEngineSpec& spec) {
void* worker_handler = nullptr;
__FRAME_CATCH_AND_LOG_GS_ERROR(
worker_handler, detail::CreateWorker(fragment, comm_spec, spec));
return worker_handler;
}
// C-linkage entry point: evaluates detail::DeleteWorker(worker_handler)
// through __FRAME_CATCH_AND_LOG_GS_ERROR. Nothing is returned to the caller.
void DeleteWorker(void* worker_handler) {
// Placeholder target for the macro's first argument; its value is not used
// afterwards.
std::nullptr_t result = nullptr;
__FRAME_CATCH_AND_LOG_GS_ERROR(result, detail::DeleteWorker(worker_handler));
}
// C-linkage entry point for running a query on an existing worker.
//
// worker_handler  pointer previously returned by CreateWorker().
// query_args      query arguments, forwarded to detail::Query.
// context_key     forwarded to detail::Query.
// frag_wrapper    forwarded to detail::Query.
// ctx_wrapper     output, forwarded to detail::Query.
// wrapper_error   output; handed to __FRAME_CATCH_AND_ASSIGN_GS_ERROR as the
//                 assignment target for the outcome of detail::Query.
//                 NOTE(review): the macro is defined elsewhere — confirm the
//                 exact value stored on failure.
//
// The result type is spelled std::nullptr_t, matching the other signatures
// in this file; an unqualified nullptr_t is only guaranteed to exist when
// <stddef.h> is included.
void Query(void* worker_handler, const gs::rpc::QueryArgs& query_args,
           const std::string& context_key,
           std::shared_ptr<gs::IFragmentWrapper> frag_wrapper,
           std::shared_ptr<gs::IContextWrapper>& ctx_wrapper,
           bl::result<std::nullptr_t>& wrapper_error) {
  __FRAME_CATCH_AND_ASSIGN_GS_ERROR(
      wrapper_error, detail::Query(worker_handler, query_args, context_key,
                                   frag_wrapper, ctx_wrapper));
}
} // extern "C"