func()

in gdbclient/internal/pool/conn.go [337:393]


// SubmitRequestAsync serializes request, registers a response future for it
// under request.RequestID, and writes the payload to the websocket as a single
// binary message.
//
// It returns (nil, errConnClosed) when cn.brokenOrClosed() reports true, and
// (nil, errOverQueue) when the pending counter has reached cn.maxInProcess.
// In every other case the future is returned together with a nil error;
// serialization, duplicate-id and write failures are reported by completing
// that future with an error response instead of through the error result.
func (cn *ConnWebSocket) SubmitRequestAsync(request *graphsonv3.Request) (*graphsonv3.ResponseFuture, error) {
	// Refuse early when the connection is unusable.
	if cn.brokenOrClosed() {
		internal.Logger.Error("request send close", zapPtr(cn), zap.Time("time", time.Now()), zap.Error(errConnClosed))
		return nil, errConnClosed
	}

	// Refuse when the in-flight limit is already reached. The check and the
	// later increment are separate atomic operations.
	if inFlight := atomic.LoadInt32(&cn.pendingSize); inFlight >= cn.maxInProcess {
		internal.Logger.Error("conn", zap.Stringer("cn", cn))
		internal.Logger.Error("request send over", zapPtr(cn), zap.Time("time", time.Now()), zap.Error(errOverQueue))
		return nil, errOverQueue
	}

	future := graphsonv3.NewResponseFuture(request, cn.returnToPool)

	// Encode the request; an encoding failure is surfaced through the future.
	payload, err := graphsonv3.SerializerRequest(request)
	if err != nil {
		encodeFailure := graphsonv3.NewErrorResponse(request.RequestID,
			graphsonv3.RESPONSE_STATUS_REQUEST_ERROR_SERIALIZATION, err)
		future.Complete(encodeFailure)
		internal.Logger.Error("request send serializer", zapPtr(cn), zap.Time("time", time.Now()), zap.Error(err))
		return future, nil
	}

	// Register the future before writing. A request id that is already
	// pending is rejected, except for authentication requests, which are
	// still written without registering this future or touching the counter.
	_, duplicate := cn.pendingResponses.LoadOrStore(request.RequestID, future)
	switch {
	case !duplicate:
		atomic.AddInt32(&cn.pendingSize, 1)
	case request.Op != graph.OPS_AUTHENTICATION:
		rejection := graphsonv3.NewErrorResponse(request.RequestID,
			graphsonv3.RESPONSE_STATUS_REQUEST_ERROR_DELIVER,
			errDuplicateId)
		internal.Logger.Error("request duplicate", zap.Time("time", time.Now()), zap.String("id ", request.RequestID))
		future.Complete(rejection)
		return future, nil
	}

	// Set the write deadline and send the frame while holding the write lock.
	var sendErr error
	cn.wLock.Lock()
	sendErr = cn.netConn.SetWriteDeadline(cn.deadline(cn.opt.WriteTimeout))
	if sendErr == nil {
		sendErr = cn.netConn.WriteMessage(websocket.BinaryMessage, payload)
	}
	cn.wLock.Unlock()

	if sendErr == nil {
		return future, nil
	}

	// The write failed: drop the pending entry, release its slot and notify
	// the waiter through the future.
	// NOTE(review): on the authentication path the entry removed here (and the
	// slot released) was registered by an earlier call with the same id, not
	// by this one — confirm that the earlier future is completed elsewhere.
	deliverFailure := graphsonv3.NewErrorResponse(request.RequestID,
		graphsonv3.RESPONSE_STATUS_REQUEST_ERROR_DELIVER, sendErr)

	cn.pendingResponses.Delete(request.RequestID)
	atomic.AddInt32(&cn.pendingSize, -1)

	future.Complete(deliverFailure)
	internal.Logger.Error("request send io", zapPtr(cn), zap.Time("time", time.Now()), zap.Error(sendErr))
	return future, nil
}