in src/CRA.ClientLibrary/Main/CRAClientLibrary.cs [620:760]
public async Task DeleteConnectionInfoAsync(string fromVertexName, string fromEndpoint, string toVertexName, string toEndpoint)
{
    // The connection is identified by its (from vertex, from endpoint, to vertex, to endpoint)
    // tuple; the actual removal is delegated to the connection table manager.
    await _connectionTableManager.DeleteConnection(
        fromVertexName,
        fromEndpoint,
        toVertexName,
        toEndpoint);
}
/// <summary>
/// Removes a connection entry from the metadata table, taking the connection's
/// vertex and endpoint names from a <see cref="ConnectionInfo"/> value.
/// </summary>
/// <param name="connInfo">Connection info whose FromVertex, FromEndpoint, ToVertex and ToEndpoint identify the entry to delete</param>
/// <returns>A task that completes when the connection table manager has finished the delete</returns>
public async Task DeleteConnectionInfoAsync(ConnectionInfo connInfo)
{
    // Unpack the struct and forward its four parts to the connection table manager.
    await _connectionTableManager.DeleteConnection(
        connInfo.FromVertex,
        connInfo.FromEndpoint,
        connInfo.ToVertex,
        connInfo.ToEndpoint);
}
/// <summary>
/// Connects an endpoint of one vertex to an endpoint of another vertex, with the
/// "from" side acting as the connection initiator.
/// </summary>
/// <param name="fromVertexName">Name of the vertex on the "from" side of the connection</param>
/// <param name="fromEndpoint">Endpoint name on the "from" vertex</param>
/// <param name="toVertexName">Name of the vertex on the "to" side of the connection</param>
/// <param name="toEndpoint">Endpoint name on the "to" vertex</param>
/// <returns>The error code produced by the full ConnectAsync overload</returns>
public async Task<CRAErrorCode> ConnectAsync(
    string fromVertexName,
    string fromEndpoint,
    string toVertexName,
    string toEndpoint)
{
    // Delegate to the full overload, fixing the initiator to the "from" side and
    // leaving its optional flags at their defaults.
    return await ConnectAsync(
        fromVertexName,
        fromEndpoint,
        toVertexName,
        toEndpoint,
        ConnectionInitiator.FromSide);
}
/// <summary>
/// Connects an endpoint of one vertex to an endpoint of another vertex. The connection is
/// first recorded in the connection table; then, unless <paramref name="logicalOnly"/> is set,
/// a best-effort attempt is made to tell the CRA instance hosting the initiating vertex to
/// establish it.
/// </summary>
/// <param name="fromVertexName">Name of the vertex on the "from" side of the connection</param>
/// <param name="fromEndpoint">Endpoint name on the "from" vertex</param>
/// <param name="toVertexName">Name of the vertex on the "to" side of the connection</param>
/// <param name="toEndpoint">Endpoint name on the "to" vertex</param>
/// <param name="direction">Which side initiates the connection; selects the vertex whose hosting instance is contacted</param>
/// <param name="logicalOnly">If true, return right after the connection has been added to the connection table</param>
/// <param name="nonSharded">If true, a missing vertex yields VertexNotFound without looking for sharded vertices</param>
/// <param name="verifyVertices">If true, check that both vertices exist before recording the connection</param>
/// <returns>
/// <c>VertexNotFound</c> when verification fails; the result of <c>ConnectSharded</c> when the
/// vertices are sharded; the code returned by the local worker or read from the remote instance
/// when it is contacted; otherwise <c>Success</c>. Failures while contacting the instance are
/// traced and do not change the code that has been determined so far.
/// </returns>
public async Task<CRAErrorCode> ConnectAsync(
    string fromVertexName,
    string fromEndpoint,
    string toVertexName,
    string toEndpoint,
    ConnectionInitiator direction,
    bool logicalOnly = false, bool nonSharded = false, bool verifyVertices = true)
{
    if (verifyVertices)
    {
        // Check that both vertices exist as plain (non-sharded) vertices
        if (!await _vertexManager.ExistsVertex(fromVertexName)
            || !await _vertexManager.ExistsVertex(toVertexName))
        {
            if (nonSharded)
            {
                return CRAErrorCode.VertexNotFound;
            }

            // Fall back to sharded vertices: both sides must have at least one shard
            List<int> fromVertexShards = await _vertexManager.ExistsShardedVertex(fromVertexName);
            if (fromVertexShards.Count == 0)
            {
                return CRAErrorCode.VertexNotFound;
            }

            List<int> toVertexShards = await _vertexManager.ExistsShardedVertex(toVertexName);
            if (toVertexShards.Count == 0)
            {
                return CRAErrorCode.VertexNotFound;
            }

            return ConnectSharded(fromVertexName, fromVertexShards, fromEndpoint, toVertexName, toVertexShards, toEndpoint, direction);
        }
    }

    // Make the connection information stable
    await _connectionTableManager.AddConnection(fromVertexName, fromEndpoint, toVertexName, toEndpoint);
    if (logicalOnly)
    {
        return CRAErrorCode.Success;
    }

    // We now try best-effort to tell the CRA instance of this connection
    var result = CRAErrorCode.Success;
    VertexInfo? activeVertexRow;
    var vertexInfoProvider = _vertexManager.VertexInfoProvider;
    try
    {
        // Get the row for the active copy of the initiating vertex
        var initiatingVertexName = direction == ConnectionInitiator.FromSide
            ? fromVertexName
            : toVertexName;
        activeVertexRow = await vertexInfoProvider.GetRowForActiveVertex(initiatingVertexName);
    }
    catch
    {
        Trace.TraceInformation("Unable to find active instance with vertex. On vertex activation, the connection should be completed automatically.");
        return result;
    }

    // A missing row means the same thing as a failed lookup. Handle it here rather than
    // letting .Value throw below and be reported as an unreachable instance.
    if (!activeVertexRow.HasValue)
    {
        Trace.TraceInformation("Unable to find active instance with vertex. On vertex activation, the connection should be completed automatically.");
        return result;
    }

    var initiatorInstanceName = activeVertexRow.Value.InstanceName;

    TcpClient client = null;
    Stream stream = null;
    // True once the stream has been handed to the sender pool and must not be disposed here
    var streamHandedToPool = false;
    try
    {
        // If the initiating vertex lives on the local worker, connect in-process
        if (_localWorker != null && _localWorker.InstanceName == initiatorInstanceName)
        {
            return await _localWorker.Connect_InitiatorSide(
                fromVertexName,
                fromEndpoint,
                toVertexName,
                toEndpoint,
                direction == ConnectionInitiator.ToSide);
        }

        // Get address and port for instance, using row with vertex = ""
        var instanceRow = (await _vertexManager.GetRowForInstance(initiatorInstanceName)).Value;

        // Get a stream connection from the pool if available, otherwise open a new one
        if (!TryGetSenderStreamFromPool(
            instanceRow.Address,
            instanceRow.Port.ToString(),
            out stream))
        {
            client = new TcpClient();
            client.NoDelay = true;
            await client.ConnectAsync(instanceRow.Address, instanceRow.Port, _tcpConnectTimeoutMs);
            stream = client.GetStream();
            if (SecureStreamConnectionDescriptor != null)
            {
                stream = SecureStreamConnectionDescriptor.CreateSecureClient(stream, initiatorInstanceName);
            }
        }

        // Send the connect request: message type followed by the four names
        var messageType = direction == ConnectionInitiator.FromSide
            ? CRATaskMessageType.CONNECT_VERTEX_INITIATOR
            : CRATaskMessageType.CONNECT_VERTEX_INITIATOR_REVERSE;
        stream.WriteInt32((int)messageType);
        stream.WriteByteArray(Encoding.UTF8.GetBytes(fromVertexName));
        stream.WriteByteArray(Encoding.UTF8.GetBytes(fromEndpoint));
        stream.WriteByteArray(Encoding.UTF8.GetBytes(toVertexName));
        stream.WriteByteArray(Encoding.UTF8.GetBytes(toEndpoint));

        result = (CRAErrorCode)stream.ReadInt32();
        if (result != 0)
        {
            Trace.TraceInformation("Connection was logically established. However, the client received an error code from the connection-initiating CRA instance: " + result);
        }
        else
        {
            // Mark ownership as transferred before the call so that a stream the pool
            // may already hold is never disposed by the cleanup below.
            streamHandedToPool = true;
            TryAddSenderStreamToPool(instanceRow.Address, instanceRow.Port.ToString(), stream);
        }
    }
    catch (Exception e)
    {
        Trace.TraceInformation("Exception: " + e.ToString() + "\nPossible reason: The connection-initiating CRA instance appears to be down or could not be found. Restart it and this connection will be completed automatically");
    }
    finally
    {
        // Release the connection on every path that did not return the stream to the pool
        // (error code from the instance, or an exception after the connection was acquired).
        if (!streamHandedToPool)
        {
            if (stream != null)
            {
                stream.Dispose();
            }

            if (client != null)
            {
                client.Close();
            }
        }
    }

    return result;
}