apps/flink.go (125 lines of code) (raw):
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apps
import (
"context"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel"
)
type MetricsReceiverFlink struct {
confgenerator.ConfigComponent `yaml:",inline"`
confgenerator.MetricsReceiverShared `yaml:",inline"`
Endpoint string `yaml:"endpoint" validate:"omitempty,url,startswith=http:"`
}
func (MetricsReceiverFlink) Type() string {
return "flink"
}
const defaultFlinkEndpoint = "http://localhost:8081"
func (r MetricsReceiverFlink) Pipelines(_ context.Context) ([]otel.ReceiverPipeline, error) {
if r.Endpoint == "" {
r.Endpoint = defaultFlinkEndpoint
}
return []otel.ReceiverPipeline{{
Receiver: otel.Component{
Type: "flinkmetrics",
Config: map[string]interface{}{
"collection_interval": r.CollectionIntervalString(),
"endpoint": r.Endpoint,
},
},
Processors: map[string][]otel.Component{"metrics": {
otel.NormalizeSums(),
otel.MetricsTransform(
otel.UpdateMetric("flink.jvm.gc.collections.count", otel.RenameLabel("name", "garbage_collector_name")),
otel.UpdateMetric("flink.jvm.gc.collections.time", otel.RenameLabel("name", "garbage_collector_name")),
otel.UpdateMetric("flink.operator.record.count", otel.RenameLabel("name", "operator_name")),
otel.UpdateMetric("flink.operator.watermark.output", otel.RenameLabel("name", "operator_name")),
otel.AddPrefix("workload.googleapis.com"),
),
otel.TransformationMetrics(
otel.FlattenResourceAttribute("host.name", "host_name"),
otel.FlattenResourceAttribute("flink.taskmanager.id", "taskmanager_id"),
otel.FlattenResourceAttribute("flink.job.name", "job_name"),
otel.FlattenResourceAttribute("flink.task.name", "task_name"),
otel.FlattenResourceAttribute("flink.subtask.index", "subtask_index"),
otel.FlattenResourceAttribute("flink.resource.type", "resource_type"),
),
otel.ModifyInstrumentationScope(r.Type(), "1.0"),
}},
}}, nil
}
func init() {
confgenerator.MetricsReceiverTypes.RegisterType(func() confgenerator.MetricsReceiver { return &MetricsReceiverFlink{} })
}
type LoggingProcessorFlink struct {
confgenerator.ConfigComponent `yaml:",inline"`
}
func (LoggingProcessorFlink) Type() string {
return "flink"
}
func (p LoggingProcessorFlink) Components(ctx context.Context, tag string, uid string) []fluentbit.Component {
c := confgenerator.LoggingProcessorParseMultilineRegex{
LoggingProcessorParseRegexComplex: confgenerator.LoggingProcessorParseRegexComplex{
Parsers: []confgenerator.RegexParser{
{
// Standalone session example
// Sample line: 2022-04-22 11:51:35,718 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Close ResourceManager connection 668abb5d496646a153262b5896fd935d: Stopping JobMaster for job 'Streaming WordCount' (2538c8dff66c8cf6ec58ad32b149e23f).
// Taskexecutor example
// 2022-04-23 16:13:05,459 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@localhost:6123/user/rpc/resourcemanager_*, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://flink@localhost:6123/user/rpc/resourcemanager_*.
// Client example
// Sample line: 2022-04-22 11:51:32,901 INFO org.apache.flink.client.program.rest.RestClusterClient [] - Submitting job 'Streaming WordCount' (2538c8dff66c8cf6ec58ad32b149e23f).
Regex: `^(?<time>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d+)\s+(?<level>[A-Z]+)\s+(?<source>[^ ]*)(?<message>[\s\S]*)`,
Parser: confgenerator.ParserShared{
TimeKey: "time",
TimeFormat: "%Y-%m-%d %H:%M:%S,%L",
},
},
},
},
Rules: []confgenerator.MultilineRule{
{
StateName: "start_state",
NextState: "cont",
Regex: `^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d+`,
},
{
StateName: "cont",
NextState: "cont",
Regex: `^(?!\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d+)`,
},
},
}.Components(ctx, tag, uid)
// Log levels are just log4j log levels
// https://logging.apache.org/log4j/2.x/log4j-api/apidocs/org/apache/logging/log4j/Level.html
c = append(c,
confgenerator.LoggingProcessorModifyFields{
Fields: map[string]*confgenerator.ModifyField{
"severity": {
CopyFrom: "jsonPayload.level",
MapValues: map[string]string{
"TRACE": "TRACE",
"DEBUG": "DEBUG",
"INFO": "INFO",
"ERROR": "ERROR",
"WARN": "WARNING",
"FATAL": "CRITICAL",
},
MapValuesExclusive: true,
},
InstrumentationSourceLabel: instrumentationSourceValue(p.Type()),
},
}.Components(ctx, tag, uid)...,
)
return c
}
type LoggingReceiverFlink struct {
LoggingProcessorFlink `yaml:",inline"`
ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline" validate:"structonly"`
}
func (r LoggingReceiverFlink) Components(ctx context.Context, tag string) []fluentbit.Component {
if len(r.ReceiverMixin.IncludePaths) == 0 {
r.ReceiverMixin.IncludePaths = []string{
"/opt/flink/log/flink-*-standalonesession-*.log",
"/opt/flink/log/flink-*-taskexecutor-*.log",
"/opt/flink/log/flink-*-client-*.log",
}
}
c := r.ReceiverMixin.Components(ctx, tag)
c = append(c, r.LoggingProcessorFlink.Components(ctx, tag, "flink")...)
return c
}
func init() {
confgenerator.LoggingProcessorTypes.RegisterType(func() confgenerator.LoggingProcessor { return &LoggingProcessorFlink{} })
confgenerator.LoggingReceiverTypes.RegisterType(func() confgenerator.LoggingReceiver { return &LoggingReceiverFlink{} })
}