in opentelemetry-exporter-gcp-logging/src/opentelemetry/exporter/cloud_logging/__init__.py [0:0]
def export(self, batch: Sequence[LogData]):
now = datetime.datetime.now()
log_entries = []
for log_data in batch:
log_entry = LogEntry()
log_record = log_data.log_record
attributes = log_record.attributes or {}
project_id = str(
attributes.get(PROJECT_ID_ATTRIBUTE_KEY, self.project_id)
)
log_suffix = urllib.parse.quote_plus(
str(
attributes.get(
LOG_NAME_ATTRIBUTE_KEY, self.default_log_name
)
)
)
log_entry.log_name = f"projects/{project_id}/logs/{log_suffix}"
# If timestamp is unset fall back to observed_time_unix_nano as recommended,
# see https://github.com/open-telemetry/opentelemetry-proto/blob/4abbb78/opentelemetry/proto/logs/v1/logs.proto#L176-L179
ts = Timestamp()
if log_record.timestamp or log_record.observed_timestamp:
ts.FromNanoseconds(
log_record.timestamp or log_record.observed_timestamp
)
else:
ts.FromDatetime(now)
log_entry.timestamp = ts
monitored_resource_data = get_monitored_resource(
log_record.resource or Resource({})
)
if monitored_resource_data:
log_entry.resource = MonitoredResource(
type=monitored_resource_data.type,
labels=monitored_resource_data.labels,
)
log_entry.trace_sampled = (
log_record.trace_flags is not None
and log_record.trace_flags.sampled
)
if log_record.trace_id:
log_entry.trace = f"projects/{project_id}/traces/{format_trace_id(log_record.trace_id)}"
if log_record.span_id:
log_entry.span_id = format_span_id(log_record.span_id)
if (
log_record.severity_number
and log_record.severity_number.value in SEVERITY_MAPPING
):
log_entry.severity = SEVERITY_MAPPING[ # type: ignore[assignment]
log_record.severity_number.value # type: ignore[index]
]
log_entry.labels = {
k: _convert_any_value_to_string(v)
for k, v in attributes.items()
}
_set_payload_in_log_entry(log_entry, log_record.body)
log_entries.append(log_entry)
self._write_log_entries(log_entries)