logging/redaction/log_redaction.py
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import argparse
import json
import logging

from apache_beam import (
    CombineFn,
    CombineGlobally,
    DoFn,
    io,
    ParDo,
    Pipeline,
    WindowInto,
)
from apache_beam.error import PipelineError
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
from google.cloud import logging_v2

# TODO: Place inspection and de-identification configurations
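# A minimal sketch of what those configurations could look like, assuming the
# Cloud DLP request shapes; the info types and transformation below are
# illustrative placeholders, not values prescribed by this pipeline:
#
# INSPECT_CONFIG = {
#     "info_types": [{"name": "EMAIL_ADDRESS"}, {"name": "PHONE_NUMBER"}]
# }
# DEIDENTIFY_CONFIG = {
#     "info_type_transformations": {
#         "transformations": [
#             {"primitive_transformation": {"replace_with_info_type_config": {}}}
#         ]
#     }
# }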


class PayloadAsJson(DoFn):
    """Decode Pub/Sub message payload from UTF-8 bytes and parse it as JSON"""

    def process(self, element):
        yield json.loads(element.decode("utf-8"))


class BatchPayloads(CombineFn):
    """Opinionated way to batch all payloads in the window"""

    def create_accumulator(self):
        return []

    def add_input(self, accumulator, element):
        accumulator.append(element)
        return accumulator

    def merge_accumulators(self, accumulators):
        return [item for accumulator in accumulators for item in accumulator]

    def extract_output(self, accumulator):
        return accumulator


# TODO: Placeholder for LogRedaction class
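# A hedged sketch of what such a class could look like, assuming the Cloud DLP
# client library (google-cloud-dlp) and the configurations sketched above; the
# class name and project wiring are illustrative, not part of this file yet:
#
# class RedactSensitiveData(DoFn):
#     """Redact sensitive data in each batch of payloads via the Cloud DLP API"""
#
#     def __init__(self, project):
#         self.project = project
#         self.dlp_client = None
#
#     def setup(self):
#         from google.cloud import dlp_v2
#         if not self.dlp_client:
#             self.dlp_client = dlp_v2.DlpServiceClient()
#
#     def process(self, element):
#         # "element" is a batch of payloads produced by BatchPayloads
#         redacted = []
#         for payload in element:
#             response = self.dlp_client.deidentify_content(
#                 request={
#                     "parent": f"projects/{self.project}",
#                     "deidentify_config": DEIDENTIFY_CONFIG,
#                     "inspect_config": INSPECT_CONFIG,
#                     "item": {"value": json.dumps(payload)},
#                 }
#             )
#             redacted.append(json.loads(response.item.value))
#         yield redacted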


class IngestLogs(DoFn):
    """Ingest payloads into the destination log"""

    def __init__(self, destination_log_name):
        self.destination_log_name = destination_log_name
        self.logger = None

    def _replace_log_name(self, entry):
        # Point the entry at the destination log instead of the source log
        entry["logName"] = self.logger.name
        return entry

    def setup(self):
        # Initialize the Cloud Logging client once per DoFn instance
        if self.logger:
            return
        logging_client = logging_v2.Client()
        if not logging_client:
            logging.error("Cannot create GCP Logging Client")
            raise PipelineError("Cannot create GCP Logging Client")
        self.logger = logging_client.logger(self.destination_log_name)
        if not self.logger:
            logging.error("Google client library cannot create Logger object")
            raise PipelineError("Google client library cannot create Logger object")

    def process(self, element):
        if self.logger:
            logs = list(map(self._replace_log_name, element))
            self.logger.client.logging_api.write_entries(logs)
            yield logs


def run(
    pubsub_subscription: str,
    destination_log_name: str,
    window_size: float,
    pipeline_args: list[str] | None = None,
) -> None:
    """Runs the Dataflow pipeline"""
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )
# TODO: Read job's deployment region
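    # A minimal sketch of how the region could be read once this step is
    # filled in, assuming the job runs with Dataflow's GoogleCloudOptions
    # ("region" is None when the flag is not set):
    # from apache_beam.options.pipeline_options import GoogleCloudOptions
    # region = pipeline_options.view_as(GoogleCloudOptions).region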
pipeline = Pipeline(options=pipeline_options)
_ = (
pipeline
| "Read log entries from Pub/Sub"
>> io.ReadFromPubSub(subscription=pubsub_subscription)
| "Convert log entry payload to Json" >> ParDo(PayloadAsJson())
| "Aggregate payloads in fixed time intervals"
>> WindowInto(FixedWindows(window_size))
# Optimize Google API consumption and avoid possible throttling
# by calling APIs for batched data and not per each element
| "Batch aggregated payloads"
>> CombineGlobally(BatchPayloads()).without_defaults()
# TODO: Placeholder for redaction transformation
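        # A hedged sketch of how a redaction step could slot in here, assuming
        # a RedactSensitiveData DoFn like the one outlined above (a
        # hypothetical name, not defined in this file yet):
        # | "Redact sensitive data" >> ParDo(RedactSensitiveData("<PROJECT_ID>"))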
| "Ingest to output log" >> ParDo(IngestLogs(destination_log_name))
)
pipeline.run()


if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument(
"--pubsub_subscription",
help="The Cloud Pub/Sub subscription to read from in the format "
'"projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID>".',
)
parser.add_argument(
"--destination_log_name",
help="The log name to ingest log entries in the format "
'"projects/<PROJECT_ID>/logs/<LOG_ID>".',
)
parser.add_argument(
"--window_size",
type=float,
default=60.0,
help="Output file's window size in seconds.",
)
known_args, pipeline_args = parser.parse_known_args()
run(
known_args.pubsub_subscription,
known_args.destination_log_name,
known_args.window_size,
pipeline_args,
)
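# Example invocation (illustrative values; the Dataflow runner flags are
# standard Beam pipeline options, not defined by this script):
#
#   python log_redaction.py \
#       --pubsub_subscription projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID> \
#       --destination_log_name projects/<PROJECT_ID>/logs/<LOG_ID> \
#       --window_size 60 \
#       --runner DataflowRunner \
#       --project <PROJECT_ID> \
#       --region <REGION> \
#       --temp_location gs://<BUCKET>/tmp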