func (l *LogsExporter) PushLogs()

in exporter/collector/logs.go [236:283]

PushLogs maps the incoming pdata logs to Cloud Logging entries grouped by destination project, then writes each project's entries in WriteRequest batches kept under the mapper's maximum request size.

func (l *LogsExporter) PushLogs(ctx context.Context, ld plog.Logs) error {
	if l.loggingClient == nil {
		return errors.New("not started")
	}
	projectEntries, err := l.mapper.createEntries(ld)
	if err != nil {
		return err
	}

	var errs []error
	for project, entries := range projectEntries {
		entry := 0
		currentBatchSize := 0
		// Send entries in WriteRequest chunks
		for len(entries) > 0 {
			// when entry == len(entries) there is nothing left to index, so default the size to the
			// max request size; the size check below then fails and the remaining batch is flushed
			// (entry == len(entries) is the exit condition once entries = entries[entry:] empties the slice)
			entrySize := l.mapper.maxRequestSize
			if entry < len(entries) {
				entrySize = proto.Size(entries[entry])
			}

			// this branch is taken while the next entry still fits in the current batch; it is
			// skipped once the batch would exceed the limit or we have run out of entries
			if currentBatchSize+entrySize < l.mapper.maxRequestSize {
				// if adding the current entry to the current batch doesn't go over the request size,
				// increase the index and account for the new request size, then continue
				currentBatchSize += entrySize
				entry++
				continue
			}

			// override destination project quota for this write request, if applicable
			if l.cfg.DestinationProjectQuota {
				ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{"x-goog-user-project": strings.TrimPrefix(project, "projects/")}))
			}

			// if the current entry goes over the request size (or we have gone over every entry, i.e. index=len),
			// write the list up to but not including the current entry's index
			_, err := l.writeLogEntries(ctx, entries[:entry])
			errs = append(errs, err)

			entries = entries[entry:]
			entry = 0
			currentBatchSize = 0
		}
	}
	return errors.Join(errs...)
}
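
The chunking logic is easier to see in isolation. Below is a minimal, self-contained sketch of the same greedy size-based batching: batchBySize is a hypothetical helper, string length stands in for proto.Size, and each returned chunk corresponds to one writeLogEntries call. It illustrates the pattern only, not the exporter's actual code.

package main

import "fmt"

// batchBySize splits items into consecutive chunks whose combined size stays under
// maxSize, mirroring the greedy loop in PushLogs. Hypothetical helper for illustration;
// len(item) stands in for proto.Size and each chunk stands in for one WriteRequest.
func batchBySize(items []string, maxSize int) [][]string {
	var batches [][]string
	for len(items) > 0 {
		i, current := 0, 0
		for i < len(items) && current+len(items[i]) < maxSize {
			current += len(items[i])
			i++
		}
		if i == 0 {
			// sketch-only guard: emit a single oversized item on its own so the loop advances
			i = 1
		}
		batches = append(batches, items[:i])
		items = items[i:]
	}
	return batches
}

func main() {
	entries := []string{"aaaa", "bb", "cccccc", "d", "eeeee"}
	for _, batch := range batchBySize(entries, 8) {
		fmt.Println(batch)
	}
}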