src/Cli/func/Kubernetes/KEDA/V2/KedaV2Resource.cs (132 lines of code) (raw):
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.
using System.Globalization;
using Azure.Functions.Cli.Kubernetes.KEDA.Models;
using Azure.Functions.Cli.Kubernetes.KEDA.V2.Models;
using Azure.Functions.Cli.Kubernetes.Models;
using Azure.Functions.Cli.Kubernetes.Models.Kubernetes;
using Newtonsoft.Json.Linq;
namespace Azure.Functions.Cli.Kubernetes.KEDA.V2
{
/// <summary>
/// Builds the KEDA v2 <c>ScaledObject</c> resource for a function app deployment,
/// translating the function bindings found in <c>function.json</c> into KEDA scaler triggers.
/// </summary>
public class KedaV2Resource : KedaResourceBase
{
    /// <summary>
    /// Creates a <see cref="ScaledObjectKedaV2"/> that targets the given deployment.
    /// </summary>
    /// <param name="name">Name assigned to the scaled object's metadata.</param>
    /// <param name="namespace">Namespace assigned to the scaled object's metadata.</param>
    /// <param name="triggers">Payload holding the parsed function.json files and host.json.</param>
    /// <param name="deployment">Deployment whose metadata name is used as the scale target.</param>
    /// <param name="pollingInterval">Value copied to the spec's polling interval (may be null).</param>
    /// <param name="cooldownPeriod">Value copied to the spec's cooldown period (may be null).</param>
    /// <param name="minReplicas">Value copied to the spec's minimum replica count (may be null).</param>
    /// <param name="maxReplicas">Value copied to the spec's maximum replica count (may be null).</param>
    /// <returns>The scaled object describing how the deployment should be scaled.</returns>
    public override IKubernetesResource GetKubernetesResource(
        string name,
        string @namespace,
        TriggersPayload triggers,
        DeploymentV1Apps deployment,
        int? pollingInterval,
        int? cooldownPeriod,
        int? minReplicas,
        int? maxReplicas)
    {
        return new ScaledObjectKedaV2
        {
            Metadata = new ObjectMetadataV1
            {
                Name = name,
                Namespace = @namespace,
                Labels = new Dictionary<string, string>()
            },
            Spec = new ScaledObjectSpecV1Alpha1
            {
                ScaleTargetRef = new ScaledObjectScaleTargetRefV1Alpha1
                {
                    Name = deployment.Metadata.Name
                },
                PollingInterval = pollingInterval,
                CooldownPeriod = cooldownPeriod,
                MinReplicaCount = minReplicas,
                MaxReplicaCount = maxReplicas,

                // Flatten every function's bindings, keep only the non-HTTP trigger bindings,
                // and turn each of them into a KEDA trigger definition.
                Triggers = triggers
                    .FunctionsJson
                    .Select(kv => kv.Value)
                    .Where(v => v["bindings"] != null)
                    .Select(b => b["bindings"])
                    .SelectMany(i => i)
                    .Where(b => b?["type"] != null)
                    .Where(b => b["type"].ToString().IndexOf("Trigger", StringComparison.OrdinalIgnoreCase) != -1)
                    .Where(b => b["type"].ToString().IndexOf("httpTrigger", StringComparison.OrdinalIgnoreCase) == -1)
                    .GroupBy(b => IsDurable(b)) // Multiple durable triggers map to a single scaler
                    .SelectMany(group => group.Key ? GetDurableScalar(triggers.HostJson) : group.Select(GetStandardScalar))
            }
        };
    }

    /// <summary>
    /// Returns true when the binding type is one of the Durable Functions trigger types
    /// (orchestration, activity or entity), compared case-insensitively.
    /// </summary>
    private static bool IsDurable(JToken trigger)
    {
        // Read the type once instead of re-evaluating the token for every comparison.
        string type = trigger["type"].ToString();

        return type.Equals("orchestrationTrigger", StringComparison.OrdinalIgnoreCase) ||
            type.Equals("activityTrigger", StringComparison.OrdinalIgnoreCase) ||
            type.Equals("entityTrigger", StringComparison.OrdinalIgnoreCase);
    }

    /// <summary>
    /// Produces the scaler for Durable Functions based on the <c>extensions.durableTask</c>
    /// section of host.json. Only the SQL storage provider is handled; any other provider
    /// (or a missing host.json / durableTask section) yields no scaler.
    /// </summary>
    /// <param name="hostJson">Parsed host.json content; may be null.</param>
    private static IEnumerable<ScaledObjectTriggerV1Alpha1> GetDurableScalar(JObject hostJson)
    {
        // Reference: https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-bindings#durable-functions-2-0-host-json
        DurableTaskConfig durableTaskConfig = hostJson?.SelectToken("extensions.durableTask")?.ToObject<DurableTaskConfig>();
        string storageType = durableTaskConfig?.StorageProvider?["type"]?.ToString();

        // Custom storage types are supported starting in Durable Functions v2.4.2
        bool isSqlBackend =
            string.Equals(storageType, "MicrosoftSQL", StringComparison.OrdinalIgnoreCase) ||
            string.Equals(storageType, "mssql", StringComparison.OrdinalIgnoreCase);

        if (!isSqlBackend)
        {
            // TODO: Support for the Azure Storage and Netherite backends
            yield break;
        }

        // A non-null storage type implies both the config and its storage provider are present.
        // By default, max 10 orchestrations and 1 activity per replica
        string query = string.Format(
            CultureInfo.InvariantCulture,
            "SELECT dt.GetScaleRecommendation({0}, {1})",
            durableTaskConfig.MaxConcurrentOrchestratorFunctions,
            durableTaskConfig.MaxConcurrentActivityFunctions);

        yield return new ScaledObjectTriggerV1Alpha1
        {
            // MSSQL scaler reference: https://keda.sh/docs/2.2/scalers/mssql/
            Type = "mssql",
            Metadata = new Dictionary<string, string>
            {
                // Durable SQL scaling: https://microsoft.github.io/durabletask-mssql/#/scaling?id=worker-auto-scale
                ["query"] = query,
                ["targetValue"] = "1",
                ["connectionStringFromEnv"] = durableTaskConfig.StorageProvider["connectionStringName"]?.ToString(),
            }
        };
    }

    /// <summary>
    /// Converts a single non-durable trigger binding into a KEDA trigger definition.
    /// </summary>
    private ScaledObjectTriggerV1Alpha1 GetStandardScalar(JToken binding)
    {
        return new ScaledObjectTriggerV1Alpha1
        {
            Type = GetKedaTriggerType(binding["type"]?.ToString()),
            Metadata = PopulateMetadataDictionary(binding)
        };
    }

    /// <summary>
    /// Builds the KEDA trigger metadata from a binding: every string-valued property of the
    /// binding is copied, trigger-specific properties are renamed to the keys KEDA expects,
    /// and the <c>type</c> and <c>name</c> properties are dropped.
    /// </summary>
    /// <param name="t">The binding token taken from function.json.</param>
    /// <returns>The metadata dictionary for the KEDA trigger.</returns>
    /// <exception cref="KeyNotFoundException">
    /// An Event Hubs, Kafka or RabbitMQ binding lacks the string property that must be
    /// mapped to its KEDA counterpart.
    /// </exception>
    public override IDictionary<string, string> PopulateMetadataDictionary(JToken t)
    {
        const string ConnectionField = "connection";
        const string ConnectionFromEnvField = "connectionFromEnv";

        // Only string-valued properties are carried over into the scaler metadata.
        IDictionary<string, string> metadata = t.ToObject<Dictionary<string, JToken>>()
            .Where(i => i.Value?.Type == JTokenType.String)
            .ToDictionary(k => k.Key, v => v.Value.ToString());

        // Lower-case with the invariant culture so matching against the TriggerTypes constants
        // does not depend on the machine's current culture (e.g. the Turkish dotless i).
        // A binding without a type simply matches no case below.
        string triggerType = t["type"]?.ToString().ToLowerInvariant();

        switch (triggerType)
        {
            case TriggerTypes.AzureBlobStorage:
            case TriggerTypes.AzureStorageQueue:
                MapSettingOrDefault(metadata, ConnectionField, ConnectionFromEnvField, "AzureWebJobsStorage");
                break;
            case TriggerTypes.AzureServiceBus:
                MapSettingOrDefault(metadata, ConnectionField, ConnectionFromEnvField, "AzureWebJobsServiceBus");
                break;
            case TriggerTypes.AzureEventHubs:
                MapRequiredSetting(metadata, ConnectionField, ConnectionFromEnvField, triggerType);
                break;
            case TriggerTypes.Kafka:
                MapRequiredSetting(metadata, "brokerList", "bootstrapServers", triggerType);
                metadata.Remove("protocol");
                metadata.Remove("authenticationMode");
                break;
            case TriggerTypes.RabbitMq:
                MapRequiredSetting(metadata, "connectionStringSetting", "hostFromEnv", triggerType);
                break;
        }

        // Clean-up for all triggers
        metadata.Remove("type");
        metadata.Remove("name");
        return metadata;
    }

    /// <summary>
    /// Moves the value stored under <paramref name="sourceKey"/> to <paramref name="targetKey"/>,
    /// falling back to <paramref name="defaultValue"/> when the source key is absent.
    /// </summary>
    private static void MapSettingOrDefault(
        IDictionary<string, string> metadata,
        string sourceKey,
        string targetKey,
        string defaultValue)
    {
        metadata[targetKey] = metadata.TryGetValue(sourceKey, out string value) ? value : defaultValue;
        metadata.Remove(sourceKey);
    }

    /// <summary>
    /// Moves the value stored under <paramref name="sourceKey"/> to <paramref name="targetKey"/>,
    /// throwing a descriptive <see cref="KeyNotFoundException"/> when the source key is absent.
    /// </summary>
    private static void MapRequiredSetting(
        IDictionary<string, string> metadata,
        string sourceKey,
        string targetKey,
        string triggerType)
    {
        if (!metadata.TryGetValue(sourceKey, out string value))
        {
            throw new KeyNotFoundException(
                $"The '{triggerType}' binding is missing the required string property '{sourceKey}', which is needed to populate the KEDA '{targetKey}' setting.");
        }

        metadata[targetKey] = value;
        metadata.Remove(sourceKey);
    }
}
}