in pkg/source/gcp/api/gcp_client.go [599:671]
// ListLogEntries queries the Cloud Logging "entries:list" endpoint page by
// page and sends every returned entry to logSink.
//
// Entries are requested in ascending timestamp order for the given
// resourceNames and filter. Each request asks for at most 1000 entries, and
// the number of requests is bounded by c.MaxLogEntries / 1000 (rounded up).
// Pagination stops early as soon as a response carries no next page token.
//
// logSink is always closed before ListLogEntries returns, whether it succeeds
// or fails, so the caller must not close it and must not reuse it.
//
// It returns ctx.Err() when ctx is cancelled (either between pages or while
// waiting for the consumer to receive an entry), or the first error produced
// while building or executing a request.
func (c *GCPClientImpl) ListLogEntries(ctx context.Context, resourceNames []string, filter string, logSink chan any) error {
	type logEntriesListRequest struct {
		ResourceNames []string `json:"resourceNames"`
		Filter        string   `json:"filter"`
		OrderBy       string   `json:"orderBy"`
		PageSize      int64    `json:"pageSize"`
		PageToken     string   `json:"pageToken,omitempty"`
	}
	type logEntriesListResponse struct {
		Entries       []any  `json:"entries"`
		NextPageToken string `json:"nextPageToken"`
	}
	const (
		endpoint = "https://logging.googleapis.com/v2/entries:list"
		// The logging API can take 1000 entries at most per request.
		maxPageSize = 1000
	)

	defer close(logSink)

	// fetchPage performs a single entries:list request. It is a closure so
	// that the deferred Body.Close runs when this page is done, rather than
	// piling up until the whole listing finishes.
	fetchPage := func(pageToken string, pageSize int64) ([]any, string, error) {
		requestBytes, err := json.Marshal(logEntriesListRequest{
			ResourceNames: resourceNames,
			Filter:        filter,
			OrderBy:       "timestamp asc",
			PageSize:      pageSize,
			PageToken:     pageToken,
		})
		if err != nil {
			return nil, "", err
		}
		req, err := c.CreateGCPHttpRequest(ctx, "POST", endpoint, bytes.NewReader(requestBytes))
		if err != nil {
			return nil, "", err
		}
		client := httpclient.NewJsonResponseHttpClient[logEntriesListResponse](c.BaseClient)
		response, httpResponse, err := client.DoWithContext(ctx, req)
		if httpResponse != nil && httpResponse.Body != nil {
			defer httpResponse.Body.Close()
		}
		if err != nil {
			if httpResponse != nil {
				slog.ErrorContext(ctx, fmt.Sprintf("Unretriable error found: %d:%s", httpResponse.StatusCode, httpResponse.Status))
			}
			return nil, "", err
		}
		return response.Entries, response.NextPageToken, nil
	}

	nextPageToken := ""
	for entryIndex := 0; entryIndex < c.MaxLogEntries; entryIndex += maxPageSize {
		// Stop before issuing another request once the caller has cancelled.
		if err := ctx.Err(); err != nil {
			return err
		}

		// The final page may need fewer than maxPageSize entries.
		pageSize := int64(math.Min(float64(maxPageSize), float64(c.MaxLogEntries-entryIndex)))
		entries, token, err := fetchPage(nextPageToken, pageSize)
		if err != nil {
			return err
		}

		for _, entry := range entries {
			// Do not block forever on a consumer that stopped reading:
			// give up as soon as the context is cancelled.
			select {
			case <-ctx.Done():
				return ctx.Err()
			case logSink <- entry:
			}
		}

		// An empty token means the server has no further pages.
		if token == "" {
			return nil
		}
		nextPageToken = token
	}
	return nil
}