in Sources/Core/Microsoft.StreamProcessing/Operators/GroupedWindow/GroupedWindowTransformer.cs [62:313]
internal static Tuple<Type, string> Generate<TKey, TInput, TState, TOutput, TResult>(
GroupedWindowStreamable<TKey, TInput, TState, TOutput, TResult> stream)
{
Contract.Requires(stream != null);
Contract.Ensures(Contract.Result<Tuple<Type, string>>() == null || typeof(IStreamObserver<Empty, TInput>).GetTypeInfo().IsAssignableFrom(Contract.Result<Tuple<Type, string>>().Item1));
string errorMessages = null;
try
{
string expandedCode;
var template = new GroupedWindowTemplate($"GeneratedGroupedAggregate_{GroupedAggregateSequenceNumber++}");
var keyType = template.keyType = typeof(TKey);
var inputType = template.inputType = typeof(TInput);
var stateType = template.stateType = typeof(TState);
var outputType = template.outputType = typeof(TOutput);
var resultType = template.resultType = typeof(TResult);
template.TResult = resultType.GetCSharpSourceSyntax(); // BUGBUG: need to get any generic parameters needed
template.isUngrouped = (keyType == typeof(Empty));
var inputMessageRepresentation = new ColumnarRepresentation(inputType);
var resultRepresentation = new ColumnarRepresentation(resultType);
var assemblyReferences = new List<Assembly>();
#region Key Selector
var keySelector = stream.KeySelector;
string transformedKeySelectorAsString;
if (keyType.IsAnonymousTypeName())
{
Contract.Assume(keySelector.Body is NewExpression);
var transformedFunction = Extensions.TransformUnaryFunction<TKey, TInput>(keySelector);
var newBody = (NewExpression)transformedFunction.Body;
transformedKeySelectorAsString = string.Join(",", newBody.Arguments);
}
else
{
var transformedFunction = Extensions.TransformUnaryFunction<Empty, TInput>(keySelector).Body;
if (transformedFunction == null) return null;
transformedKeySelectorAsString = transformedFunction.ExpressionToCSharp();
}
template.keySelector = transformedKeySelectorAsString;
assemblyReferences.AddRange(Transformer.AssemblyReferencesNeededFor(keySelector));
#endregion
#region Key Comparer and HashCode
var keyComparer = EqualityComparerExpression<TKey>.Default;
template.keyComparerEquals =
(left, right) =>
keyComparer.GetEqualsExpr().Inline(left, right);
template.keyComparerGetHashCode =
(x) =>
keyComparer.GetGetHashCodeExpr().Inline(x);
assemblyReferences.AddRange(Transformer.AssemblyReferencesNeededFor(keyComparer.GetEqualsExpr()));
assemblyReferences.AddRange(Transformer.AssemblyReferencesNeededFor(keyComparer.GetGetHashCodeExpr()));
#endregion
#region Aggregate functions
var initialStateLambda = stream.Aggregate.InitialState();
if (ConstantExpressionFinder.IsClosedExpression(initialStateLambda))
{
template.initialState = initialStateLambda.Body.ExpressionToCSharp();
assemblyReferences.AddRange(Transformer.AssemblyReferencesNeededFor(initialStateLambda));
}
else
{
if (Config.CodegenOptions.SuperStrictColumnar)
{
errorMessages = "Code Generation for GroupedWindow: couldn't inline the initial state lambda!";
throw new InvalidOperationException(errorMessages);
}
else
{
template.useCompiledInitialState = true;
template.initialState = "initialState()";
}
}
var accumulateLambda = stream.Aggregate.Accumulate();
if (ConstantExpressionFinder.IsClosedExpression(accumulateLambda))
{
var accTransformedLambda = Extensions.TransformFunction<TKey, TInput>(accumulateLambda, 2);
template.accumulate = (stateArg, longArg) => accTransformedLambda.Inline(stateArg, longArg);
assemblyReferences.AddRange(Transformer.AssemblyReferencesNeededFor(accumulateLambda));
}
else
{
if (Config.CodegenOptions.SuperStrictColumnar)
{
errorMessages = "Code Generation for GroupedWindow: couldn't inline the accumulate lambda!";
throw new InvalidOperationException(errorMessages);
}
else
{
template.useCompiledAccumulate = true;
template.accumulate = (s1, s2) => $"accumulate({s1}, {s2}, batch[i]);";
}
}
var deaccumulateLambda = stream.Aggregate.Deaccumulate();
if (ConstantExpressionFinder.IsClosedExpression(deaccumulateLambda))
{
var deaccumulateTransformedLambda = Extensions.TransformFunction<TKey, TInput>(deaccumulateLambda, 2);
template.deaccumulate = (stateArg, longArg) => deaccumulateTransformedLambda.Inline(stateArg, longArg);
assemblyReferences.AddRange(Transformer.AssemblyReferencesNeededFor(deaccumulateLambda));
}
else
{
if (Config.CodegenOptions.SuperStrictColumnar)
{
throw new InvalidOperationException("Code Generation couldn't inline a lambda!");
}
else
{
template.useCompiledDeaccumulate = true;
template.deaccumulate = (s1, s2) => $"deaccumulate({s1}, {s2}, batch[i]);";
}
}
var differenceLambda = stream.Aggregate.Difference();
if (ConstantExpressionFinder.IsClosedExpression(differenceLambda))
{
template.difference = (stateArg1, stateArg2) => differenceLambda.Inline(stateArg1, stateArg2);
assemblyReferences.AddRange(Transformer.AssemblyReferencesNeededFor(differenceLambda));
}
else
{
if (Config.CodegenOptions.SuperStrictColumnar)
{
errorMessages = "Code Generation for GroupedWindow: couldn't inline the deaccumulate lambda!";
throw new InvalidOperationException(errorMessages);
}
else
{
template.useCompiledDifference = true;
template.deaccumulate = (s1, s2) => $"difference({s1}, {s2});";
}
}
var computeResultLambda = stream.Aggregate.ComputeResult();
if (ConstantExpressionFinder.IsClosedExpression(computeResultLambda))
{
if (outputType.IsAnonymousType())
{
if (computeResultLambda.Body is NewExpression newExpression)
{
errorMessages = "Code Generation for GroupedWindow: result selector must be a new expression for anonymous types";
throw new NotImplementedException(errorMessages);
}
else
{
template.computeResult = (stateArg) => computeResultLambda.Inline(stateArg);
}
}
else
{
template.computeResult = (stateArg) => computeResultLambda.Inline(stateArg);
}
assemblyReferences.AddRange(Transformer.AssemblyReferencesNeededFor(computeResultLambda));
}
else
{
if (Config.CodegenOptions.SuperStrictColumnar)
{
errorMessages = "Code Generation for GroupedWindow: couldn't inline the result selector lambda!";
throw new InvalidOperationException(errorMessages);
}
else
{
template.useCompiledComputeResult = true;
template.computeResult = (stateArg) => $"computeResult({stateArg})";
}
}
#endregion
template.BatchGeneratedFrom_Unit_TInput = Transformer.GetBatchClassName(typeof(Empty), inputType);
template.UnitTInputGenericParameters = string.Empty; // BUGBUG
template.UnitTResultGenericParameters = string.Empty; // BUGBUG
template.inputFields = inputMessageRepresentation.AllFields;
template.outputFields = resultRepresentation.AllFields;
var resultSelector = stream.ResultSelector;
var parameterSubstitutions = new List<Tuple<ParameterExpression, SelectParameterInformation>>(); // dont want the parameters substituted for at all
var projectionResult = SelectTransformer.Transform(resultSelector, parameterSubstitutions, resultRepresentation, true);
if (projectionResult.Error)
{
return null;
}
template.finalResultSelector =
(key, aggregateResult) =>
{
var parameters = new Dictionary<ParameterExpression, string>
{
{ resultSelector.Parameters.ElementAt(0), key }
};
var sb = new System.Text.StringBuilder();
sb.AppendLine("{");
sb.AppendLine($"var {resultSelector.Parameters.ElementAt(1).Name} = {aggregateResult};\n");
if (projectionResult.ProjectionReturningResultInstance != null)
{
sb.AppendLine($"this.batch[_c] = {projectionResult.ProjectionReturningResultInstance.ExpressionToCSharp()};");
}
else
{
foreach (var kv in projectionResult.ComputedFields)
{
var f = kv.Key;
var e = kv.Value;
sb.AppendLine($"this.batch.{f.Name}.col[_c] = {e.ExpressionToCSharpStringWithParameterSubstitution(parameters)};");
}
}
sb.AppendLine("}");
return sb.ToString();
};
assemblyReferences.AddRange(Transformer.AssemblyReferencesNeededFor(resultSelector));
expandedCode = template.TransformText();
assemblyReferences.AddRange(Transformer.AssemblyReferencesNeededFor(typeof(Empty), typeof(TKey), typeof(TInput), typeof(TState), typeof(TOutput), typeof(FastDictionaryGenerator3)));
assemblyReferences.Add(typeof(IStreamable<,>).GetTypeInfo().Assembly);
assemblyReferences.Add(Transformer.GeneratedStreamMessageAssembly<Empty, TInput>());
assemblyReferences.Add(Transformer.GeneratedStreamMessageAssembly<Empty, TResult>());
assemblyReferences.Add(Transformer.GeneratedMemoryPoolAssembly<Empty, TResult>());
var assembly = Transformer.CompileSourceCode(expandedCode, assemblyReferences, out errorMessages);
var t = assembly.GetType(template.className);
if (t.GetTypeInfo().IsGenericType)
{
var list = typeof(TKey).GetAnonymousTypes();
list.AddRange(typeof(TInput).GetAnonymousTypes());
list.AddRange(typeof(TState).GetAnonymousTypes());
list.AddRange(typeof(TOutput).GetAnonymousTypes());
return Tuple.Create(t.MakeGenericType(list.ToArray()), errorMessages);
}
else
{
return Tuple.Create(t, errorMessages);
}
}
catch
{
if (Config.CodegenOptions.DontFallBackToRowBasedExecution)
{
throw new InvalidOperationException("Code Generation failed when it wasn't supposed to!");
}
return Tuple.Create((Type)null, errorMessages);
}
}