in src/CRA.ClientLibrary/Main/CRAClientLibrary.cs [485:554]
/// <summary>
/// Wires a vertex into this client library and initializes it: sets the name and
/// client library on <see cref="VertexBase"/> instances, hooks the endpoint callbacks
/// so that added endpoints are recorded through the endpoint table manager,
/// optionally registers the vertex in <paramref name="table"/> together with a
/// dispose hook, initializes the vertex, and optionally activates it.
/// </summary>
/// <param name="vertexName">Name the vertex is registered under (table key and endpoint owner).</param>
/// <param name="vertexDefinition">Vertex definition name; only used to look up the parameter when <paramref name="paramProvided"/> is false.</param>
/// <param name="instanceName">Name of the instance hosting the vertex.</param>
/// <param name="table">Table to register the vertex in. When null, the vertex is not registered and no dispose hook is installed.</param>
/// <param name="vertex">The vertex to initialize. Must not be null.</param>
/// <param name="paramProvided">True when <paramref name="param"/> should be used as given; false to fetch it via GetParam.</param>
/// <param name="param">Initialization parameter passed to the vertex; ignored (overwritten) when <paramref name="paramProvided"/> is false.</param>
/// <param name="performActivation">When true (the default), the vertex is activated after initialization.</param>
/// <returns>A task that completes once initialization (and activation, if requested) has finished.</returns>
public async Task InitializeVertexAsync(
    string vertexName,
    string vertexDefinition,
    string instanceName,
    ConcurrentDictionary<string, IVertex> table,
    IVertex vertex, bool paramProvided, object param, bool performActivation = true)
{
    // INITIALIZE
    // Use 'as' rather than a direct cast: a direct cast throws InvalidCastException
    // for IVertex implementations that do not derive from VertexBase, whereas the
    // intent here is simply to skip this step for them.
    var vertexBase = vertex as VertexBase;
    if (vertexBase != null)
    {
        vertexBase.VertexName = vertexName;
        vertexBase.ClientLibrary = this;
    }

    // LATCH CALLBACKS TO POPULATE ENDPOINT TABLE
    // NOTE(review): the two boolean arguments presumably mean (isInput, isAsync),
    // judging by which callback passes which values - confirm against AddEndpoint.
    vertex.OnAddInputEndpoint((name, endpt) => _endpointTableManager.AddEndpoint(vertexName, name, true, false));
    vertex.OnAddOutputEndpoint((name, endpt) => _endpointTableManager.AddEndpoint(vertexName, name, false, false));
    vertex.OnAddAsyncInputEndpoint((name, endpt) => _endpointTableManager.AddEndpoint(vertexName, name, true, true));
    vertex.OnAddAsyncOutputEndpoint((name, endpt) => _endpointTableManager.AddEndpoint(vertexName, name, false, true));

    // ADD TO TABLE
    if (table != null)
    {
        // Replacing an existing entry disposes the vertex previously stored under this name.
        table.AddOrUpdate(vertexName, vertex, (procName, oldProc) => { oldProc.Dispose(); return vertex; });

        vertex.OnDispose(() =>
        {
            // Delete all endpoints of the vertex
            foreach (var key in vertex.InputEndpoints)
            {
                _endpointTableManager.DeleteEndpoint(vertexName, key.Key);
            }

            foreach (var key in vertex.AsyncInputEndpoints)
            {
                _endpointTableManager.DeleteEndpoint(vertexName, key.Key);
            }

            foreach (var key in vertex.OutputEndpoints)
            {
                _endpointTableManager.DeleteEndpoint(vertexName, key.Key);
            }

            foreach (var key in vertex.AsyncOutputEndpoints)
            {
                _endpointTableManager.DeleteEndpoint(vertexName, key.Key);
            }

            // Drop the table entry for this vertex name.
            IVertex old;
            if (!table.TryRemove(vertexName, out old))
            {
                Trace.TraceError("Unable to remove vertex on disposal");
            }

            // Fire-and-forget: the returned task is deliberately not awaited, so the
            // dispose callback does not wait for the instance-vertex deletion.
            Task.Run(() =>
                _vertexManager.DeleteInstanceVertex(
                    instanceName,
                    vertexName));
        });
    }

    // Fetch the initialization parameter ourselves when the caller did not supply one.
    if (!paramProvided)
    {
        param = await GetParam(vertexDefinition, vertexName, instanceName);
    }

    await vertex.InitializeAsync(param);

    if (performActivation)
    {
        // Activate vertex
        await ActivateVertexAsync(vertexName, instanceName);
    }
}