kafka-connect-bigtable-sink/doc/performance/terraform/kubernetes.tf (334 lines of code) (raw):
locals {
kubernetes_kafka_connect_service_key = base64decode(google_service_account_key.kubernetes_kafka_connect_key.private_key)
kafka_namespace = kubernetes_namespace.kafka.metadata[0].name
kafka_connect_crd_name = "my-connect-cluster"
kafka_secret_service_key_secret_field = "service_key"
kafka_service_key_email = jsondecode(local.kubernetes_kafka_connect_service_key)["client_email"]
# Must match contents of ../kubernetes/otlp
otel_endpoint_url = "http://opentelemetry-collector.opentelemetry.svc.cluster.local:4317"
otel_namespace = kubernetes_namespace.otel.metadata[0].name
kafka_credentials_secret = kubernetes_secret.kafka_credentials.metadata[0].name
kafka_image = "${local.kafka_connect_docker_registry_url}:latest"
# Test parameters, adjust them here.
kafka_vcpus = 3
kafka_ram_gbs = 3
kafka_partitions = 60
kafka_connect_nodes = 3
kafka_connect_vcpus = "2.66"
kafka_connect_ram = "6Gi"
kafka_connect_version = "3.8.1"
load_generation_messages_per_second = "3000"
load_generation_seconds = "600"
load_generation_field_value_size = "50"
load_generation_column_families = "1"
load_generation_columns_per_family = "2"
load_generation_vcpus = "2"
load_generation_ram = "8Gi"
connector_batch_size = "1000"
}
resource "kubernetes_namespace" "kafka" {
metadata {
name = "kafka"
}
}
resource "kubernetes_secret" "kafka_credentials" {
metadata {
name = "bigtable-kafka-connect-credentials"
namespace = local.kafka_namespace
}
data = {
username = local.kafka_service_key_email
(local.kafka_secret_service_key_secret_field) = base64encode(local.kubernetes_kafka_connect_service_key)
}
type = "generic"
}
resource "kubernetes_service_account" "kafka_connect_service_account" {
metadata {
name = "${local.kafka_connect_crd_name}-connect"
namespace = local.kafka_namespace
}
# To avoid clashing with Strimzi Operator.
lifecycle {
ignore_changes = [metadata["labels"], automount_service_account_token]
}
}
resource "google_project_iam_member" "kafka_connect_connectors_bigtable_permission" {
project = local.project
role = "roles/bigtable.admin"
member = "principal://iam.googleapis.com/projects/${data.google_project.project.number}/locations/global/workloadIdentityPools/${local.workload_identity_pool}/subject/ns/${local.kafka_namespace}/sa/${kubernetes_service_account.kafka_connect_service_account.metadata[0].name}"
}
resource "kubernetes_namespace" "otel" {
metadata {
name = "opentelemetry"
}
}
resource "kubernetes_service_account" "otel_service_account" {
metadata {
name = "opentelemetry-collector"
namespace = local.otel_namespace
}
# To avoid clashing with OTEL manifest.
lifecycle {
ignore_changes = [metadata["labels"], automount_service_account_token]
}
}
# https://cloud.google.com/stackdriver/docs/instrumentation/opentelemetry-collector-gke#configure_permissions_for_the_collector
resource "google_project_iam_member" "otel_permissions" {
for_each = toset(["roles/logging.logWriter", "roles/monitoring.metricWriter", "roles/cloudtrace.agent"])
project = local.project
role = each.value
member = "principal://iam.googleapis.com/projects/${data.google_project.project.number}/locations/global/workloadIdentityPools/${local.workload_identity_pool}/subject/ns/${local.otel_namespace}/sa/${kubernetes_service_account.otel_service_account.metadata[0].name}"
}
output "kubernetes_kafka_namespace" {
value = local.kafka_namespace
}
output "kafka_connect_manifest" {
value = yamlencode({
"apiVersion" = "kafka.strimzi.io/v1beta2"
"kind" = "KafkaConnect"
"metadata" = {
"annotations" = {
# So that we can configure the connector using the operator's custom resources.
"strimzi.io/use-connector-resources" = "true"
}
"name" = local.kafka_connect_crd_name
"namespace" = local.kafka_namespace
}
"spec" = {
"authentication" = {
"passwordSecret" = {
"password" = local.kafka_secret_service_key_secret_field
"secretName" = local.kafka_credentials_secret
}
"type" = "plain"
"username" = local.kafka_service_key_email
}
"bootstrapServers" = local.kafka_url
"config" = {
"config.storage.replication.factor" = -1
"config.storage.topic" = "connect-cluster-configs"
"fetch.min.bytes" = 1
"group.id" = "connect-cluster"
"key.converter" = "org.apache.kafka.connect.json.JsonConverter"
"key.converter.schemas.enable" = false
"offset.storage.replication.factor" = -1
"offset.storage.topic" = "connect-cluster-offsets"
"status.storage.replication.factor" = -1
"status.storage.topic" = "connect-cluster-status"
"value.converter" = "org.apache.kafka.connect.json.JsonConverter"
"value.converter.schemas.enable" = true
}
"image" = local.kafka_image
"logging" = {
"loggers" = {
"connect.root.logger.level" = "INFO"
"log4j.logger.com.google.cloud.kafka.connect.bigtable" = "INFO"
"log4j.logger.io.opentelemetry" = "TRACE"
}
"type" = "inline"
}
"metricsConfig" = {
"type" = "jmxPrometheusExporter"
"valueFrom" = {
"configMapKeyRef" = {
# They must match value from ../kubernetes/kafka-connec-metrics.config.yaml
"name" = "connect-metrics"
"key" = "metrics-config.yml"
}
}
}
"replicas" = local.kafka_connect_nodes
"resources" = {
"limits" = {
"cpu" = local.kafka_connect_vcpus
"memory" = local.kafka_connect_ram
}
}
"template" = {
"connectContainer" = {
"env" = [
{
"name" = "OTEL_SERVICE_NAME"
"value" = "kafka-connect"
},
{
"name" = "OTEL_EXPORTER_OTLP_ENDPOINT"
"value" = local.otel_endpoint_url
},
# Set up GlobalOpenTelemetry in java code
{
"name" = "OTEL_JAVA_GLOBAL_AUTOCONFIGURE_ENABLED"
"value" = "true"
},
{
"name" = "OTEL_EXPORTER_OTLP_PROTOCOL"
"value" = "grpc"
},
{
"name" = "OTEL_TRACES_EXPORTER"
"value" = "otlp"
},
{
"name" = "OTEL_JAVAAGENT_DEBUG"
"value" = "true"
}
]
}
}
"tls" = {
"trustedCertificates" = []
}
"tracing" = {
"type" = "opentelemetry"
}
"version" = local.kafka_connect_version
}
})
sensitive = true
}
output "kafka_connect_connector_manifest" {
value = yamlencode({
"apiVersion" = "kafka.strimzi.io/v1beta2"
"kind" = "KafkaConnector"
"metadata" = {
"labels" = {
"strimzi.io/cluster" = local.kafka_connect_crd_name
}
"name" = "${local.name}-connector"
}
"spec" = {
"class" = "com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector"
"config" = {
"auto.create.column.families" = "false",
"auto.create.tables" = "false",
"connector.class" = "com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector",
"default.column.family" = local.bigtable_default_column_family,
"default.column.qualifier" = "default_column",
"error.mode" = "FAIL",
"gcp.bigtable.instance.id" = google_bigtable_instance.bigtable.name,
"gcp.bigtable.project.id" = local.project,
"insert.mode" = "upsert",
"max.batch.size" = local.connector_batch_size,
"retry.timeout.ms" = "90000",
"row.key.definition" = "",
"row.key.delimiter" = "#",
"table.name.format" = local.bigtable_table_name,
"topics" = local.kafka_topic,
"value.null.mode" = "write"
}
"tasksMax" = local.kafka_partitions
}
})
sensitive = true
}
output "load_generator_manifest" {
value = yamlencode({
"apiVersion" = "v1"
"kind" = "Pod"
"metadata" = {
"name" = "load-generator"
"namespace" = local.kafka_namespace
}
"spec" = {
"containers" = [
{
"args" = ["/usr/bin/generate_load.sh"]
"env" = [
{
"name" = "KAFKA_CONNECT_BOOTSTRAP_SERVERS"
"value" = local.kafka_url
},
{
"name" = "KAFKA_CONNECT_TLS"
"value" = "true"
},
{
"name" = "KAFKA_CONNECT_SASL_USERNAME"
"value" = local.kafka_service_key_email
},
{
"name" = "KAFKA_CONNECT_SASL_PASSWORD_FILE"
"value" = "${local.kafka_credentials_secret}/${local.kafka_secret_service_key_secret_field}"
},
{
"name" = "KAFKA_CONNECT_SASL_MECHANISM"
"value" = "plain"
},
# Load generation args
{
"name" = "THROUGHPUT"
"value" = local.load_generation_messages_per_second
},
{
"name" = "TIMEOUT"
"value" = local.load_generation_seconds
},
{
"name" = "TOPIC"
"value" = local.kafka_topic
},
{
"name" = "FIELD_VALUE_SIZE"
"value" = local.load_generation_field_value_size
},
{
"name" = "COLUMN_FAMILIES"
"value" = local.load_generation_column_families
},
{
"name" = "COLUMNS_PER_FAMILY"
"value" = local.load_generation_columns_per_family
},
# Tracing
{
"name" = "OTEL_SERVICE_NAME"
"value" = "load-generator"
},
{
"name" = "OTEL_EXPORTER_OTLP_ENDPOINT"
"value" = local.otel_endpoint_url
},
]
"image" = local.kafka_image
"imagePullPolicy" = "Always"
"name" = "kafka-load-generator"
"resources" = {
"requests" = {
"cpu" = local.load_generation_vcpus
"memory" = local.load_generation_ram
}
}
"volumeMounts" = [
{
"mountPath" = "/opt/kafka/connect-password/${local.kafka_credentials_secret}"
"name" = local.kafka_credentials_secret
},
]
},
]
"restartPolicy" = "Never"
"volumes" = [
{
"name" = local.kafka_credentials_secret
"secret" = {
"defaultMode" = 292 # 0x124
"secretName" = local.kafka_credentials_secret
}
},
]
}
})
sensitive = true
}
output "kafka_connect_docker_registry_url" {
value = local.kafka_connect_docker_registry_url
}
output "perf_test_config" {
value = {
kafka_vcpus = local.kafka_vcpus
kafka_ram_gbs = local.kafka_ram_gbs
kafka_partitions = local.kafka_partitions
kafka_connect_nodes = local.kafka_connect_nodes
kafka_connect_vcpus = local.kafka_connect_vcpus
kafka_connect_ram = local.kafka_connect_ram
kafka_connect_version = local.kafka_connect_version
load_generation_messages_per_second = local.load_generation_messages_per_second
load_generation_seconds = local.load_generation_seconds
load_generation_field_value_size = local.load_generation_field_value_size
load_generation_column_families = local.load_generation_column_families
load_generation_columns_per_family = local.load_generation_columns_per_family
load_generation_vcpus = local.load_generation_vcpus
load_generation_ram = local.load_generation_ram
connector_batch_size = local.connector_batch_size
}
}