gdbclient/gdbclient.go (197 lines of code) (raw):
/*
* (C) 2019-present Alibaba Group Holding Limited.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*/
/**
* @author : Liu Jianping
* @date : 2019/11/20
*/
package gdbclient
import (
"encoding/json"
"errors"
"fmt"
"strconv"
"time"
"unsafe"
"github.com/aliyun/alibabacloud-gdb-go-sdk/gdbclient/graph"
"github.com/aliyun/alibabacloud-gdb-go-sdk/gdbclient/internal"
"github.com/aliyun/alibabacloud-gdb-go-sdk/gdbclient/internal/graphsonv3"
"github.com/aliyun/alibabacloud-gdb-go-sdk/gdbclient/internal/pool"
"go.uber.org/zap"
)
// SetLogger installs logger as the package-wide logger used by the clients in
// this package.
//
// A nil logger is replaced with a no-op logger: the clients call methods on
// internal.Logger unconditionally, so a nil *zap.Logger must never be stored.
func SetLogger(logger *zap.Logger) {
	if logger == nil {
		logger = zap.NewNop()
	}
	internal.Logger = logger
}
//---------------------- Gdb baseClient ---------------------//
// Gremlin scripts that control an explicit transaction. BatchSubmit sends
// them through baseClient.transaction around the user callback.
const (
// _OPEN opens a transaction.
_OPEN = "g.tx().open()"
// _COMMIT commits the open transaction.
_COMMIT = "g.tx().commit()"
// _ROLLBACK rolls the open transaction back.
_ROLLBACK = "g.tx().rollback()"
)
// ClientShell is the family of script-submission methods. It is embedded in
// Client and is the handle passed to the callback of SessionClient.BatchSubmit.
//
// Each operation comes in a blocking form returning []Result and an Async
// form returning a ResultSetFuture.
type ClientShell interface {
// SubmitScript submits a gremlin script without bindings and returns its results.
SubmitScript(gremlin string) ([]Result, error)
// SubmitScriptBound submits a gremlin script with the given bindings and returns its results.
SubmitScriptBound(gremlin string, bindings map[string]interface{}) ([]Result, error)
// SubmitScriptOptions submits a gremlin script with explicit request options and returns its results.
SubmitScriptOptions(gremlin string, options *graph.RequestOptions) ([]Result, error)
// SubmitScriptAsync submits a gremlin script without bindings and returns a future for its results.
SubmitScriptAsync(gremlin string) (ResultSetFuture, error)
// SubmitScriptBoundAsync submits a gremlin script with the given bindings and returns a future for its results.
SubmitScriptBoundAsync(gremlin string, bindings map[string]interface{}) (ResultSetFuture, error)
// SubmitScriptOptionsAsync submits a gremlin script with explicit request options and returns a future for its results.
SubmitScriptOptionsAsync(gremlin string, options *graph.RequestOptions) (ResultSetFuture, error)
}
// SessionClient is a session-bound client that supports batch submission.
type SessionClient interface {
// BatchSubmit runs the callback with a ClientShell bound to the session and
// returns an error if the batch did not complete; callers must check it.
BatchSubmit(func(ClientShell) error) error
// Close releases the client.
Close()
}
// Client is the session-less client. It supports synchronous and asynchronous
// submission through the embedded ClientShell; every request runs in
// auto-transaction mode.
type Client interface {
ClientShell
// Close releases the client.
Close()
}
// baseClient is the single implementation behind both Client and
// SessionClient; the session flag selects which behavior applies.
type baseClient struct {
// setting holds the connection settings (host, port, transaction mode).
setting *Settings
// sessionId is attached to every request when session is true.
sessionId string
// session is true for clients created by NewSessionClient.
session bool
// connPool supplies the connections used to send requests.
connPool *pool.ConnPool
}
// NewClient builds a session-less Client from settings.
//
// settings.init() is invoked on the caller's value before the connection pool
// is created from settings.getOpts().
func NewClient(settings *Settings) Client {
	settings.init()

	c := &baseClient{
		setting:  settings,
		session:  false,
		connPool: pool.NewConnPool(settings.getOpts()),
	}

	internal.Logger.Info("new client",
		zap.String("server", c.String()),
		zap.Bool("session", c.session),
		zap.Time("createTime", time.Now()))
	return c
}
// NewSessionClient builds a SessionClient bound to sessionId.
//
// settings.init() is invoked on the caller's value before the connection pool
// is created from settings.getSessionOpts().
func NewSessionClient(sessionId string, settings *Settings) SessionClient {
	settings.init()

	c := &baseClient{
		setting:   settings,
		session:   true,
		sessionId: sessionId,
		connPool:  pool.NewConnPool(settings.getSessionOpts()),
	}

	internal.Logger.Info("new client",
		zap.String("server", c.String()),
		zap.Bool("session", c.session),
		zap.Time("createTime", time.Now()))
	return c
}
// String renders the client as "Gdb<host:port>" for log output.
func (c *baseClient) String() string {
	endpoint := c.getEndpoint()
	return fmt.Sprintf("Gdb<%s>", endpoint)
}
// Close shuts the client down: a session client first asks the server to
// close its session, then the connection pool is closed in either mode.
func (c *baseClient) Close() {
	if c.session {
		// the close-session request needs a pooled connection, so it goes first
		c.closeSession()
	}
	c.connPool.Close()

	internal.Logger.Info("close client",
		zap.Bool("session", c.session),
		zap.Time("time", time.Now()))
}
// getEndpoint returns the configured server address as "host:port".
func (c *baseClient) getEndpoint() string {
	port := strconv.FormatInt(int64(c.setting.Port), 10)
	return c.setting.Host + ":" + port
}
// SubmitScript submits gremlin without bindings and waits for its results.
func (c *baseClient) SubmitScript(gremlin string) ([]Result, error) {
	var noBindings map[string]interface{}
	return c.SubmitScriptBound(gremlin, noBindings)
}
// SubmitScriptBound submits gremlin with the given bindings and waits for its
// results. The bindings are wrapped in a fresh graph.RequestOptions.
func (c *baseClient) SubmitScriptBound(gremlin string, bindings map[string]interface{}) ([]Result, error) {
	return c.SubmitScriptOptions(gremlin, graph.NewRequestOptionsWithBindings(bindings))
}
// SubmitScriptOptions submits gremlin with the given request options and
// blocks until the results arrive or the wait times out.
//
// A nil options is treated as empty options, matching what the asynchronous
// path already does in session mode, so that reading the timeout below never
// dereferences nil.
//
// The wait lasts the options' timeout (30s when unset) plus 100ms. It returns
// the results and error reported by the future, or a "request timeout" error
// when the wait expires first.
func (c *baseClient) SubmitScriptOptions(gremlin string, options *graph.RequestOptions) ([]Result, error) {
	if options == nil {
		options = graph.NewRequestOptionsWithBindings(nil)
	}

	future, err := c.SubmitScriptOptionsAsync(gremlin, options)
	if err != nil {
		return nil, err
	}

	timeoutMs := options.GetTimeout()
	if timeoutMs == 0 {
		// default server timeout is 30s
		timeoutMs = 30000
	}

	// wait 100ms longer than the request timeout before giving up locally
	result, timeout, err := future.GetResultsOrTimeout(time.Millisecond * time.Duration(timeoutMs+100))
	if timeout {
		return nil, errors.New("request timeout")
	}
	return result, err
}
// SubmitScriptAsync submits gremlin without bindings and returns a future for
// its results.
func (c *baseClient) SubmitScriptAsync(gremlin string) (ResultSetFuture, error) {
	var noBindings map[string]interface{}
	return c.SubmitScriptBoundAsync(gremlin, noBindings)
}
// SubmitScriptBoundAsync submits gremlin with the given bindings and returns a
// future for its results. The bindings are wrapped in a fresh
// graph.RequestOptions.
func (c *baseClient) SubmitScriptBoundAsync(gremlin string, bindings map[string]interface{}) (ResultSetFuture, error) {
	return c.SubmitScriptOptionsAsync(gremlin, graph.NewRequestOptionsWithBindings(bindings))
}
// SubmitScriptOptionsAsync builds a request from gremlin and options, sends it
// on a pooled connection and returns a future for the result set.
//
// In session mode the session id and the manage-transaction flag are added to
// options (the caller's value is modified in place; a nil options is replaced
// with an empty one first). Errors from building or sending the request are
// returned as-is with a nil future.
func (c *baseClient) SubmitScriptOptionsAsync(gremlin string, options *graph.RequestOptions) (ResultSetFuture, error) {
	if c.session {
		// session requests must carry the session arguments
		if options == nil {
			options = graph.NewRequestOptionsWithBindings(nil)
		}
		options.AddArgs(graph.ARGS_SESSION, c.sessionId)
		options.AddArgs(graph.ARGS_MANAGE_TRANSACTION, c.setting.IsManageTransaction)
	}

	req, buildErr := graphsonv3.MakeRequestWithOptions(gremlin, options)
	if buildErr != nil {
		return nil, buildErr
	}

	pending, sendErr := c.requestAsync(req)
	if sendErr != nil {
		return nil, sendErr
	}

	return NewResultSetFuture(pending), nil
}
// BatchSubmit runs batchSubmit inside an explicit transaction on the session.
// Callers must check the returned error.
//
// The transaction is opened, batchSubmit is called with this client, and the
// transaction is committed when the callback returns nil. If the callback or
// the commit fails, the transaction is rolled back and that failure is
// returned. If the rollback itself fails, both errors are logged and the
// rollback error is returned.
//
// It returns an error without opening a transaction when the client is not a
// session client or when batchSubmit is nil.
func (c *baseClient) BatchSubmit(batchSubmit func(ClientShell) error) error {
	if !c.session {
		return errors.New("batch submit is not allowed in non-session client")
	}
	// reject a nil callback before opening a transaction that would then be
	// left dangling by the panic
	if batchSubmit == nil {
		return errors.New("batch submit callback is nil")
	}

	if err := c.transaction(_OPEN); err != nil {
		return err
	}

	err := batchSubmit(c)
	if err == nil {
		err = c.transaction(_COMMIT)
	}
	if err == nil {
		return nil
	}

	// rollback submit errors, include batch submit and commit
	if rollbackErr := c.transaction(_ROLLBACK); rollbackErr != nil {
		internal.Logger.Error("unstable transaction status as rollback failed",
			zap.Error(err),
			zap.NamedError("rollbackError", rollbackErr),
			zap.Time("time", time.Now()))
		return rollbackErr
	}
	return err
}
// closeSession sends a close-session request for c.sessionId and waits up to
// two seconds for the response. It is best-effort: every failure is logged as
// a warning and nothing is returned.
func (c *baseClient) closeSession() {
	request := graphsonv3.MakeRequestCloseSession(c.sessionId)
	respFuture, err := c.requestAsync(request)
	if err != nil {
		internal.Logger.Warn("fail to close session", zap.Error(err), zap.Time("time", time.Now()))
		return
	}

	// NOTICE: wait to get response of session close request
	resp, timeout := respFuture.GetOrTimeout(2 * time.Second)
	if timeout {
		internal.Logger.Warn("response timeout for close session", zap.Time("time", time.Now()))
		return
	}
	if resp.Code == graphsonv3.RESPONSE_STATUS_NO_CONTENT || resp.Code == graphsonv3.RESPONSE_STATUS_SUCCESS {
		return
	}

	// Checked assertion: Data is not guaranteed to hold an error here, and an
	// unchecked one would panic inside Close. zap.Error skips a nil error, so
	// the code is logged too.
	respErr, _ := resp.Data.(error)
	internal.Logger.Warn("response error for close session",
		zap.Any("code", resp.Code),
		zap.Error(respErr),
		zap.Time("time", time.Now()))
}
// transaction sends one transaction-control script (ops) on the session and
// waits for the server's response.
//
// It returns the error from building or sending the request, or the error
// carried in the response's Data field; nil otherwise.
func (c *baseClient) transaction(ops string) error {
	opts := graph.NewRequestOptionsWithBindings(nil)
	opts.AddArgs(graph.ARGS_SESSION, c.sessionId)
	opts.AddArgs(graph.ARGS_MANAGE_TRANSACTION, c.setting.IsManageTransaction)

	req, buildErr := graphsonv3.MakeRequestWithOptions(ops, opts)
	if buildErr != nil {
		return buildErr
	}

	pending, sendErr := c.requestAsync(req)
	if sendErr != nil {
		return sendErr
	}

	// Only an error stored in Data is treated as failure; the payload itself
	// is not decoded (a transaction op returns 'null').
	// NOTE(review): Get() has no timeout here — confirm it cannot block forever.
	reply := pending.Get()
	if failure, isErr := reply.Data.(error); isErr {
		return failure
	}
	return nil
}
// requestAsync takes a connection from the pool, logs the request and submits
// it asynchronously.
//
// It returns the response future from the connection. When no connection can
// be obtained the pool error is returned with a nil future. When the submit
// fails the connection is put back into the pool and the submit error is
// returned.
func (c *baseClient) requestAsync(request *graphsonv3.Request) (*graphsonv3.ResponseFuture, error) {
	conn, err := c.connPool.Get()
	if err != nil {
		internal.Logger.Error("request connect failed",
			zap.Time("time", time.Now()),
			zap.Error(err))
		return nil, err
	}

	// Checked assertion: the script is read for logging only, so a request
	// whose gremlin arg is missing or not a string must not panic here.
	dsl, _ := request.Args[graph.ARGS_GREMLIN].(string)
	// The marshal error is deliberately ignored: bindings are serialized for
	// the log line only and a failure just yields an empty field.
	bindingsStr, _ := json.Marshal(request.Args[graph.ARGS_BINDINGS])

	// send request to connection, and return future
	internal.Logger.Info("submit script",
		zap.Time("time", time.Now()),
		zap.Uintptr("conn", uintptr(unsafe.Pointer(conn))),
		zap.String("dsl", dsl),
		zap.String("bindings", string(bindingsStr)),
		zap.String("processor", request.Processor))

	f, err := conn.SubmitRequestAsync(request)
	if err != nil {
		// return connection to pool if request is not pending
		c.connPool.Put(conn)
		internal.Logger.Warn("submit script failed",
			zap.Time("time", time.Now()),
			zap.Uintptr("conn", uintptr(unsafe.Pointer(conn))),
			zap.Error(err),
			zap.String("dsl", dsl))
	}
	return f, err
}