in src/CRA.ClientLibrary/Main/CRAClientLibrary.cs [196:295]
/// <summary>
/// Convenience overload that instantiates a vertex by forwarding to the full
/// overload with <c>sharded</c> set to <c>false</c>; the remaining optional
/// flags of that overload keep their default values.
/// </summary>
/// <param name="instanceName">Name of the CRA instance the vertex is instantiated on</param>
/// <param name="vertexName">Name of the vertex being instantiated</param>
/// <param name="vertexDefinition">Name of the vertex definition to instantiate</param>
/// <param name="vertexParameter">Parameter object handed to the full overload</param>
/// <returns>The error code produced by the full overload</returns>
public async Task<CRAErrorCode> InstantiateVertexAsync(
    string instanceName,
    string vertexName,
    string vertexDefinition,
    object vertexParameter)
{
    // Five positional arguments select the overload with the optional flags.
    CRAErrorCode outcome = await InstantiateVertexAsync(
        instanceName,
        vertexName,
        vertexDefinition,
        vertexParameter,
        false);

    return outcome;
}
/// <summary>
/// Records a vertex in the vertex metadata store and, unless
/// <paramref name="logicalOnly"/> is set, asks the CRA instance named
/// <paramref name="instanceName"/> to load it by sending a LOAD_VERTEX request.
/// </summary>
/// <param name="instanceName">Name of the CRA instance that should host the vertex</param>
/// <param name="vertexName">Name of the vertex being instantiated</param>
/// <param name="vertexDefinition">
/// Name of the vertex definition. Unless <paramref name="sideLoad"/> is set, its
/// row is looked up to obtain the vertex create action.
/// </param>
/// <param name="vertexParameter">
/// Parameter object. Unless <paramref name="sideLoad"/> is set, it is serialized
/// and written to blob storage under "vertexDefinition/vertexName-instanceName".
/// </param>
/// <param name="sharded">Stored as the <c>isSharded</c> flag of the vertex row</param>
/// <param name="logicalOnly">
/// When true, only the metadata row is written and the hosting instance is not contacted
/// </param>
/// <param name="sideLoad">
/// When true, the definition lookup and the parameter upload are skipped and the
/// vertex row is stored with a null create action
/// </param>
/// <param name="activate">Stored as the <c>isActive</c> flag of the vertex row</param>
/// <returns>
/// <c>CRAErrorCode.Success</c> when <paramref name="logicalOnly"/> is set or when
/// the hosting instance could not be reached before it replied; otherwise the
/// error code returned by the hosting instance.
/// </returns>
public async Task<CRAErrorCode> InstantiateVertexAsync(
    string instanceName,
    string vertexName,
    string vertexDefinition,
    object vertexParameter,
    bool sharded = false, bool logicalOnly = false, bool sideLoad = false, bool activate = false)
{
    string vertexCreateAction = null;
    string blobName = vertexName + "-" + instanceName;

    if (!sideLoad)
    {
        vertexCreateAction = (await _vertexManager.VertexInfoProvider.GetRowForVertexDefinition(vertexDefinition)).Value.VertexCreateAction;

        // Persist the serialized parameter next to the vertex definition
        using (var blobStream = await _blobStorage.GetWriteStream(vertexDefinition + "/" + blobName))
        {
            byte[] parameterBytes = Encoding.UTF8.GetBytes(
                SerializationHelper.SerializeObject(vertexParameter));
            blobStream.WriteByteArray(parameterBytes);
        }
    }

    var newInfo = new VertexInfo(
        instanceName: instanceName,
        address: "",
        port: 0,
        vertexName: vertexName,
        vertexDefinition: vertexDefinition,
        vertexCreateAction: vertexCreateAction,
        vertexParameter: blobName,
        isActive: activate,
        isSharded: sharded);

    await _vertexManager.VertexInfoProvider.InsertOrReplace(newInfo);

    CRAErrorCode result = CRAErrorCode.Success;

    if (logicalOnly)
    {
        return result;
    }

    // Send request to CRA instance. This phase is best-effort: failures are
    // logged and the method still returns instead of throwing.
    TcpClient client = null;
    Stream stream = null;
    try
    {
        VertexInfo instanceRow = (await _vertexManager.GetRowForInstance(instanceName)).Value;

        // Get a stream connection from the pool if available
        if (!TryGetSenderStreamFromPool(instanceRow.Address, instanceRow.Port.ToString(), out stream))
        {
            client = new TcpClient();
            client.NoDelay = true;
            await client.ConnectAsync(instanceRow.Address, instanceRow.Port, _tcpConnectTimeoutMs);
            stream = client.GetStream();

            if (SecureStreamConnectionDescriptor != null)
            {
                stream = SecureStreamConnectionDescriptor.CreateSecureClient(stream, instanceName);
            }
        }

        stream.WriteInt32((int)CRATaskMessageType.LOAD_VERTEX);
        stream.WriteByteArray(Encoding.UTF8.GetBytes(vertexName));
        stream.WriteByteArray(Encoding.UTF8.GetBytes(vertexDefinition));

        result = (CRAErrorCode)stream.ReadInt32();
        if (result != 0)
        {
            Trace.TraceInformation("Vertex was logically loaded. However, we received an error code from the hosting CRA instance: " + result);
        }

        // Add/Return stream connection to the pool
        TryAddSenderStreamToPool(instanceRow.Address, instanceRow.Port.ToString(), stream);

        // The connection has been handed to the pool; it must not be
        // disposed by the failure path below.
        stream = null;
        client = null;
    }
    catch (Exception e)
    {
        Trace.TraceInformation("The CRA instance appears to be down. Restart it and this vertex will be instantiated automatically");
        Trace.TraceInformation("InstantiateVertexAsync could not complete the LOAD_VERTEX request: " + e);

        // The connection is in an unknown state and was not returned to the
        // pool, so release it here instead of leaking the socket.
        try
        {
            stream?.Dispose();
            (client as IDisposable)?.Dispose();
        }
        catch (Exception disposeError)
        {
            Trace.TraceInformation("Ignoring failure while closing the connection to the CRA instance: " + disposeError.Message);
        }
    }

    return result;
}