in exporter/collector/logs.go [236:283]
func (l *LogsExporter) PushLogs(ctx context.Context, ld plog.Logs) error {
if l.loggingClient == nil {
return errors.New("not started")
}
projectEntries, err := l.mapper.createEntries(ld)
if err != nil {
return err
}
var errs []error
for project, entries := range projectEntries {
entry := 0
currentBatchSize := 0
// Send entries in WriteRequest chunks
for len(entries) > 0 {
// default to max int so that when we are at index=len we skip the size check to avoid panic
// (index=len is the break condition when we reassign entries=entries[len:])
entrySize := l.mapper.maxRequestSize
if entry < len(entries) {
entrySize = proto.Size(entries[entry])
}
// this block gets skipped if we are out of entries to check
if currentBatchSize+entrySize < l.mapper.maxRequestSize {
// if adding the current entry to the current batch doesn't go over the request size,
// increase the index and account for the new request size, then continue
currentBatchSize += entrySize
entry++
continue
}
// override destination project quota for this write request, if applicable
if l.cfg.DestinationProjectQuota {
ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{"x-goog-user-project": strings.TrimPrefix(project, "projects/")}))
}
// if the current entry goes over the request size (or we have gone over every entry, i.e. index=len),
// write the list up to but not including the current entry's index
_, err := l.writeLogEntries(ctx, entries[:entry])
errs = append(errs, err)
entries = entries[entry:]
entry = 0
currentBatchSize = 0
}
}
return errors.Join(errs...)
}