in Sources/Core/Microsoft.StreamProcessing/Operators/EquiJoin/Basic/EquijoinTransformer.cs [39:286]
internal static Tuple<Type, string> Generate<TKey, TLeft, TRight, TResult>(
BinaryStreamable<TKey, TLeft, TRight, TResult> stream,
Expression<Func<TLeft, TRight, TResult>> selector)
{
Contract.Requires(stream != null);
Contract.Ensures(Contract.Result<Tuple<Type, string>>() == null || typeof(BinaryPipe<TKey, TLeft, TRight, TResult>).GetTypeInfo().IsAssignableFrom(Contract.Result<Tuple<Type, string>>().Item1));
string errorMessages = null;
try
{
var template = new EquiJoinTemplate($"GeneratedEquiJoin_{EquiJoinSequenceNumber++}", typeof(TKey), typeof(TLeft), typeof(TRight), typeof(TResult));
var keyAndLeftGenericParameters = template.tm.GenericTypeVariables(template.keyType, template.leftType).BracketedCommaSeparatedString();
var keyAndRightGenericParameters = template.tm.GenericTypeVariables(template.keyType, template.rightType).BracketedCommaSeparatedString();
template.TKeyTResultGenericParameters = template.tm.GenericTypeVariables(template.keyType, template.resultType).BracketedCommaSeparatedString();
template.genericParameters = template.tm.GenericTypeVariables(template.keyType, template.leftType, template.rightType, template.resultType).BracketedCommaSeparatedString();
template.leftMessageRepresentation = new ColumnarRepresentation(template.leftType);
template.rightMessageRepresentation = new ColumnarRepresentation(template.rightType);
var resultMessageRepresentation = new ColumnarRepresentation(template.resultType);
var batchGeneratedFrom_TKey_TLeft = Transformer.GetBatchClassName(template.keyType, template.leftType);
var batchGeneratedFrom_TKey_TRight = Transformer.GetBatchClassName(template.keyType, template.rightType);
template.BatchGeneratedFrom_TKey_TResult = Transformer.GetBatchClassName(template.keyType, template.resultType);
template.LeftBatchType = batchGeneratedFrom_TKey_TLeft + keyAndLeftGenericParameters;
template.RightBatchType = batchGeneratedFrom_TKey_TRight + keyAndRightGenericParameters;
template.leftFields = template.leftMessageRepresentation.AllFields;
template.rightFields = template.rightMessageRepresentation.AllFields;
template.resultFields = resultMessageRepresentation.AllFields;
template.ActiveEventTypeLeft = template.leftType.GetTypeInfo().IsValueType ? template.TLeft : "Active_Event_Left";
template.ActiveEventTypeRight = template.rightType.GetTypeInfo().IsValueType ? template.TRight : "Active_Event_Right";
#region Key Equals
var keyComparer = stream.Properties.KeyEqualityComparer.GetEqualsExpr();
template.keyComparerEquals =
(left, right) =>
keyComparer.Inline(left, right);
if (template.keyType.IsAnonymousType())
{
template.keyComparerEquals =
(left, right) => $"keyComparerEquals({left}, {right})";
}
#endregion
#region Left Payload Equals
{
var leftPayloadComparer = stream.Left.Properties.PayloadEqualityComparer.GetEqualsExpr();
var newLambda = Extensions.TransformFunction<TKey, TLeft>(leftPayloadComparer, "leftIndex", 0);
template.leftComparerEquals = (left, right) => newLambda.Inline(left, right);
}
#endregion
#region Right Payload Equals
{
var rightPayloadComparer = stream.Right.Properties.PayloadEqualityComparer.GetEqualsExpr();
var newLambda = Extensions.TransformFunction<TKey, TRight>(rightPayloadComparer, "rightIndex", 0);
template.rightComparerEquals = (left, right) => newLambda.Inline(left, right);
}
#endregion
#region Result Selector
{
var leftMessageType = StreamMessageManager.GetStreamMessageType<TKey, TLeft>();
var rightMessageType = StreamMessageManager.GetStreamMessageType<TKey, TRight>();
if (!ConstantExpressionFinder.IsClosedExpression(selector))
{
errorMessages = "result selector is not a closed expression";
throw new InvalidOperationException();
}
#region LeftBatchSelector
{
var leftBatchIndexVariable = selector.Parameters.GenerateFreshVariableName("i");
var parameterSubsitutions = new List<Tuple<ParameterExpression, SelectParameterInformation>>()
{
Tuple.Create(selector.Parameters[0], new SelectParameterInformation() { BatchName = "leftBatch", BatchType = leftMessageType, IndexVariableName = leftBatchIndexVariable, parameterRepresentation = template.leftMessageRepresentation, }),
};
var projectionResult = SelectTransformer.Transform(selector, parameterSubsitutions, resultMessageRepresentation, true);
if (projectionResult.Error)
{
errorMessages = "error while transforming the result selector";
throw new InvalidOperationException();
}
template.leftBatchSelector = (leftBatch, leftIndex, rightEvent) =>
{
var parameterMap = new Dictionary<ParameterExpression, string>
{
{ Expression.Variable(leftMessageType, "leftBatch"), leftBatch },
{ Expression.Variable(typeof(int), leftBatchIndexVariable), leftIndex },
{ selector.Parameters[1], rightEvent }
};
if (projectionResult.ProjectionReturningResultInstance != null)
{
return $"this.output[index] = {projectionResult.ProjectionReturningResultInstance.ExpressionToCSharpStringWithParameterSubstitution(parameterMap)};";
}
else
{
var sb = new System.Text.StringBuilder();
sb.AppendLine("{");
foreach (var kv in projectionResult.ComputedFields)
{
var f = kv.Key;
var e = kv.Value;
if (f.OptimizeString())
{
sb.AppendLine($"this.output.{f.Name}.AddString({e.ExpressionToCSharpStringWithParameterSubstitution(parameterMap)});");
}
else
{
sb.AppendLine($"this.output.{f.Name}.col[index] = {e.ExpressionToCSharpStringWithParameterSubstitution(parameterMap)};");
}
}
sb.AppendLine("}");
return sb.ToString();
}
};
}
#endregion
#region RightBatchSelector
{
var rightBatchIndexVariable = selector.Parameters.GenerateFreshVariableName("j");
var parameterSubsitutions = new List<Tuple<ParameterExpression, SelectParameterInformation>>()
{
Tuple.Create(selector.Parameters[1], new SelectParameterInformation() { BatchName = "rightBatch", BatchType = rightMessageType, IndexVariableName = rightBatchIndexVariable, parameterRepresentation = template.rightMessageRepresentation, }),
};
var projectionResult = SelectTransformer.Transform(selector, parameterSubsitutions, resultMessageRepresentation, true);
if (projectionResult.Error)
{
errorMessages = "error while transforming the result selector";
throw new InvalidOperationException();
}
template.rightBatchSelector = (leftEvent, rightBatch, rightIndex) =>
{
var parameterMap = new Dictionary<ParameterExpression, string>
{
{ selector.Parameters[0], leftEvent },
{ Expression.Variable(rightMessageType, "rightBatch"), rightBatch },
{ Expression.Variable(typeof(int), rightBatchIndexVariable), rightIndex }
};
if (projectionResult.ProjectionReturningResultInstance != null)
{
return $"this.output[index] = {projectionResult.ProjectionReturningResultInstance.ExpressionToCSharpStringWithParameterSubstitution(parameterMap)};";
}
else
{
var sb = new System.Text.StringBuilder();
sb.AppendLine("{");
foreach (var kv in projectionResult.ComputedFields)
{
var f = kv.Key;
var e = kv.Value;
if (f.OptimizeString())
{
sb.AppendLine($"this.output.{f.Name}.AddString({e.ExpressionToCSharpStringWithParameterSubstitution(parameterMap)});");
}
else
{
sb.AppendLine($"this.output.{f.Name}.col[index] = {e.ExpressionToCSharpStringWithParameterSubstitution(parameterMap)};");
}
}
sb.AppendLine("}");
return sb.ToString();
}
};
}
#endregion
#region ActiveSelector
{
var parameterSubsitutions = new List<Tuple<ParameterExpression, SelectParameterInformation>>();
var projectionResult = SelectTransformer.Transform(selector, parameterSubsitutions, resultMessageRepresentation, true);
if (projectionResult.Error)
{
errorMessages = "error while transforming the result selector";
throw new InvalidOperationException();
}
template.activeSelector = (leftEvent, rightEvent) =>
{
var parameterMap = new Dictionary<ParameterExpression, string>
{
{ selector.Parameters[0], leftEvent },
{ selector.Parameters[1], rightEvent }
};
if (projectionResult.ProjectionReturningResultInstance != null)
{
return $"this.output[index] = {projectionResult.ProjectionReturningResultInstance.ExpressionToCSharpStringWithParameterSubstitution(parameterMap)};";
}
else
{
var sb = new System.Text.StringBuilder();
sb.AppendLine("{");
foreach (var kv in projectionResult.ComputedFields)
{
var f = kv.Key;
var e = kv.Value;
if (f.OptimizeString())
{
sb.AppendLine($"this.output.{f.Name}.AddString({e.ExpressionToCSharpStringWithParameterSubstitution(parameterMap)});");
}
else
{
sb.AppendLine($"this.output.{f.Name}.col[index] = {e.ExpressionToCSharpStringWithParameterSubstitution(parameterMap)};");
}
}
sb.AppendLine("}");
return sb.ToString();
}
};
}
#endregion
}
#endregion
if (stream.Left.Properties.IsIntervalFree && stream.Right.Properties.IsConstantDuration)
{
template.endPointHeap = "EndPointQueue";
}
else if (stream.Right.Properties.IsIntervalFree && stream.Left.Properties.IsConstantDuration)
{
template.endPointHeap = "EndPointQueue";
}
else if (stream.Left.Properties.IsConstantDuration && stream.Right.Properties.IsConstantDuration &&
stream.Left.Properties.ConstantDurationLength == stream.Right.Properties.ConstantDurationLength)
{
template.endPointHeap = "EndPointQueue";
}
else
{
template.endPointHeap = "EndPointHeap";
}
return template.Generate<TKey, TLeft, TRight, TResult>(selector);
}
catch
{
if (Config.CodegenOptions.DontFallBackToRowBasedExecution)
{
throw new InvalidOperationException("Code Generation failed when it wasn't supposed to!");
}
return Tuple.Create((Type)null, errorMessages);
}
}