apps/hadoop.go (104 lines of code) (raw):
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apps
import (
"context"
"fmt"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel"
)
type MetricsReceiverHadoop struct {
confgenerator.ConfigComponent `yaml:",inline"`
confgenerator.MetricsReceiverSharedJVM `yaml:",inline"`
confgenerator.MetricsReceiverSharedCollectJVM `yaml:",inline"`
}
const defaultHadoopEndpoint = "localhost:8004"
func (r MetricsReceiverHadoop) Type() string {
return "hadoop"
}
func (r MetricsReceiverHadoop) Pipelines(_ context.Context) ([]otel.ReceiverPipeline, error) {
targetSystem := "hadoop"
if r.MetricsReceiverSharedCollectJVM.ShouldCollectJVMMetrics() {
targetSystem = fmt.Sprintf("%s,%s", targetSystem, "jvm")
}
return r.MetricsReceiverSharedJVM.
WithDefaultEndpoint(defaultHadoopEndpoint).
ConfigurePipelines(
targetSystem,
[]otel.Component{
otel.NormalizeSums(),
otel.MetricsTransform(
otel.AddPrefix("workload.googleapis.com"),
),
otel.ModifyInstrumentationScope(r.Type(), "1.0"),
},
)
}
func init() {
confgenerator.MetricsReceiverTypes.RegisterType(func() confgenerator.MetricsReceiver { return &MetricsReceiverHadoop{} })
}
type LoggingProcessorHadoop struct {
confgenerator.ConfigComponent `yaml:",inline"`
}
func (LoggingProcessorHadoop) Type() string {
return "hadoop"
}
func (p LoggingProcessorHadoop) Components(ctx context.Context, tag, uid string) []fluentbit.Component {
// Sample log line:
// 2022-02-01 18:09:47,136 INFO org.apache.hadoop.hdfs.server.namenode.FSEditLog: Edit logging is async:true
regexParser := confgenerator.LoggingProcessorParseRegex{
Regex: `(?<timestamp>\d+-\d+-\d+ \d+:\d+:\d+,\d+)\s+(?<severity>\w+)\s+(?<source>\S+):\s+(?<message>[\S\s]*)`,
ParserShared: confgenerator.ParserShared{
TimeKey: "timestamp",
TimeFormat: "%Y-%m-%d %H:%M:%S,%L",
},
}
severityMappingComponents := confgenerator.LoggingProcessorModifyFields{
Fields: map[string]*confgenerator.ModifyField{
"severity": {
CopyFrom: "jsonPayload.severity",
MapValues: map[string]string{
"TRACE": "DEBUG",
"DEBUG": "DEBUG",
"INFO": "INFO",
"WARN": "WARNING",
"DEPRECATION": "WARNING",
"ERROR": "ERROR",
"CRITICAL": "ERROR",
"FATAL": "FATAL",
},
MapValuesExclusive: true,
},
InstrumentationSourceLabel: instrumentationSourceValue(p.Type()),
},
}.Components(ctx, tag, uid)
c := regexParser.Components(ctx, tag, uid)
c = append(c, severityMappingComponents...)
return c
}
type LoggingReceiverHadoop struct {
LoggingProcessorHadoop `yaml:",inline"`
ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline"`
}
func (r LoggingReceiverHadoop) Components(ctx context.Context, tag string) []fluentbit.Component {
if len(r.ReceiverMixin.IncludePaths) == 0 {
// Default logs for hadoop
r.ReceiverMixin.IncludePaths = []string{
"/opt/hadoop/logs/hadoop-*.log",
"/opt/hadoop/logs/yarn-*.log",
}
}
r.ReceiverMixin.MultilineRules = []confgenerator.MultilineRule{
{
StateName: "start_state",
NextState: "cont",
Regex: `^\d+-\d+-\d+ \d+:\d+:\d+,\d+.*`,
},
{
StateName: "cont",
NextState: "cont",
Regex: `^(?!\d+-\d+-\d+ \d+:\d+:\d+,\d+).*`,
},
}
c := r.ReceiverMixin.Components(ctx, tag)
return append(c, r.LoggingProcessorHadoop.Components(ctx, tag, "hadoop")...)
}
func init() {
confgenerator.LoggingReceiverTypes.RegisterType(func() confgenerator.LoggingReceiver { return &LoggingReceiverHadoop{} })
}