in extension/encoding/awslogsencodingextension/internal/unmarshaler/vpc-flow-log/vpc_flow_log_unmarshaler.go [268:419]
func handleField(
field string,
value string,
record plog.LogRecord,
addr *address,
key *resourceKey,
) (bool, error) {
// convert string to number
getNumber := func(value string) (int64, error) {
n, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return -1, fmt.Errorf("%q field in log file is not a number", field)
}
return n, nil
}
// convert string to number and add the
// value to an attribute
addNumber := func(field, value, attrName string) error {
n, err := getNumber(value)
if err != nil {
return fmt.Errorf("%q field in log file is not a number", field)
}
record.Attributes().PutInt(attrName, n)
return nil
}
switch field {
case "srcaddr":
// handled later
addr.source = value
case "pkt-srcaddr":
// handled later
addr.pktSource = value
case "dstaddr":
// handled later
addr.destination = value
case "pkt-dstaddr":
// handled later
addr.pktDestination = value
case "account-id":
key.accountID = value
case "vpc-id":
record.Attributes().PutStr("aws.vpc.id", value)
case "subnet-id":
record.Attributes().PutStr("aws.vpc.subnet.id", value)
case "instance-id":
record.Attributes().PutStr(conventions.AttributeHostID, value)
case "az-id":
record.Attributes().PutStr("aws.az.id", value)
case "interface-id":
// TODO Replace with conventions variable once it becomes available
record.Attributes().PutStr("network.interface.name", value)
case "srcport":
if err := addNumber(field, value, conventions.AttributeSourcePort); err != nil {
return false, err
}
case "dstport":
if err := addNumber(field, value, conventions.AttributeDestinationPort); err != nil {
return false, err
}
case "protocol":
n, err := getNumber(value)
if err != nil {
return false, err
}
protocolNumber := int(n)
if protocolNumber < 0 || protocolNumber >= len(protocolNames) {
return false, fmt.Errorf("protocol number %d does not have a protocol name", protocolNumber)
}
record.Attributes().PutStr(conventions.AttributeNetworkProtocolName, protocolNames[protocolNumber])
case "type":
record.Attributes().PutStr(conventions.AttributeNetworkType, strings.ToLower(value))
case "region":
key.region = value
case "flow-direction":
switch value {
case "ingress":
record.Attributes().PutStr(conventions.AttributeNetworkIoDirection, "receive")
case "egress":
record.Attributes().PutStr(conventions.AttributeNetworkIoDirection, "transmit")
default:
return true, fmt.Errorf("value %s not valid for field %s", value, field)
}
case "version":
if err := addNumber(field, value, "aws.vpc.flow.log.version"); err != nil {
return false, err
}
case "packets":
if err := addNumber(field, value, "aws.vpc.flow.packets"); err != nil {
return false, err
}
case "bytes":
if err := addNumber(field, value, "aws.vpc.flow.bytes"); err != nil {
return false, err
}
case "start":
if err := addNumber(field, value, "aws.vpc.flow.start"); err != nil {
return false, err
}
case "end":
unixSeconds, err := getNumber(value)
if err != nil {
return true, fmt.Errorf("value %s for field %s does not correspond to a valid timestamp", value, field)
}
timestamp := time.Unix(unixSeconds, 0)
record.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
case "action":
record.Attributes().PutStr("aws.vpc.flow.action", value)
case "log-status":
record.Attributes().PutStr("aws.vpc.flow.status", value)
case "tcp-flags":
record.Attributes().PutStr("network.tcp.flags", value)
case "sublocation-type":
record.Attributes().PutStr("aws.sublocation.type", value)
case "sublocation-id":
record.Attributes().PutStr("aws.sublocation.id", value)
case "pkt-src-aws-service":
record.Attributes().PutStr("aws.vpc.flow.source.service", value)
case "pkt-dst-aws-service":
record.Attributes().PutStr("aws.vpc.flow.destination.service", value)
case "traffic-path":
record.Attributes().PutStr("aws.vpc.flow.traffic_path", value)
case "ecs-cluster-arn":
record.Attributes().PutStr(conventions.AttributeAWSECSClusterARN, value)
case "ecs-cluster-name":
record.Attributes().PutStr("aws.ecs.cluster.name", value)
case "ecs-container-instance-arn":
record.Attributes().PutStr("aws.ecs.container.instance.arn", value)
case "ecs-container-instance-id":
record.Attributes().PutStr("aws.ecs.container.instance.id", value)
case "ecs-container-id":
record.Attributes().PutStr("aws.ecs.container.id", value)
case "ecs-second-container-id":
record.Attributes().PutStr("aws.ecs.second.container.id", value)
case "ecs-service-name":
record.Attributes().PutStr("aws.ecs.service.name", value)
case "ecs-task-definition-arn":
record.Attributes().PutStr("aws.ecs.task.definition.arn", value)
case "ecs-task-arn":
record.Attributes().PutStr(conventions.AttributeAWSECSTaskARN, value)
case "ecs-task-id":
record.Attributes().PutStr(conventions.AttributeAWSECSTaskID, value)
case "reject-reason":
record.Attributes().PutStr("aws.vpc.flow.reject_reason", value)
default:
return false, nil
}
return true, nil
}