modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.Server.cs (93 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ namespace Apache.Ignite.Internal; using System; using System.Diagnostics.CodeAnalysis; using System.Threading; using System.Threading.Tasks; using Buffers; using Compute.Executor; using Proto; using Proto.MsgPack; /// <summary> /// Server -> client request handling logic. /// </summary> internal sealed partial class ClientSocket { private static async Task HandleServerOpInnerAsync( ServerOp op, PooledBuffer request, PooledArrayBuffer response, ClientSocket socket) { switch (op) { case ServerOp.Ping: // No-op. break; case ServerOp.ComputeJobExec: IgniteApiAccessor api = await socket._config.ApiTask.ConfigureAwait(false); await ComputeJobExecutor.ExecuteJobAsync(request, response, api).ConfigureAwait(false); break; case ServerOp.ComputeJobCancel: // TODO IGNITE-25153: Add cancellation support for platform jobs. response.MessageWriter.Write(false); break; case ServerOp.DeploymentUnitsUndeploy: response.MessageWriter.Write(false); break; default: throw new InvalidOperationException("Unsupported ServerOp code: " + op); } } private bool QueueServerOp(long requestId, ServerOp op, PooledBuffer request) { ThreadPool.QueueUserWorkItem<(ClientSocket Socket, PooledBuffer Buf, long RequestId, ServerOp Op)>( callBack: static state => { // Ignore the returned task. _ = state.Socket.HandleServerOpAsync(state.Buf, state.RequestId, state.Op); }, state: (this, request, requestId, op), preferLocal: true); return true; } [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Thread root.")] private async Task HandleServerOpAsync(PooledBuffer buf, long requestId, ServerOp op) { _logger.LogServerOpTrace(requestId, (int)op, op, ConnectionContext.ClusterNode.Address); using var request = buf; using var response = ProtoCommon.GetMessageWriter(); try { await HandleServerOpInnerAsync(op, request, response, this).ConfigureAwait(false); await SendServerOpResponseAsync(requestId, response).ConfigureAwait(false); } catch (Exception serverOpEx) { try { using var errResponse = ProtoCommon.GetMessageWriter(); WriteError(errResponse.MessageWriter, serverOpEx); await SendServerOpResponseAsync(requestId, errResponse).ConfigureAwait(false); } catch (Exception resultSendEx) { var aggregateEx = new AggregateException(serverOpEx, resultSendEx); _logger.LogServerOpResponseError(aggregateEx, requestId, serverOpEx.Message); } } static void WriteError(MsgPackWriter w, Exception e) { var igniteEx = e as IgniteException; Guid traceId = igniteEx?.TraceId ?? Guid.NewGuid(); int code = igniteEx?.ErrorCode ?? ErrorGroups.Compute.ComputeJobFailed; string className = e.GetType().ToString(); string message = e.Message; string? stackTrace = e.StackTrace; w.Write((int)ServerOpResponseFlags.Error); w.Write(traceId); w.Write(code); w.Write(className); w.Write(message); w.Write(stackTrace); w.Write(0); // Extensions count. } } private async Task SendServerOpResponseAsync( long requestId, PooledArrayBuffer? response) => await SendRequestAsync(response, ClientOp.ServerOpResponse, requestId).ConfigureAwait(false); }