in lambda/go/add-podcasts/main.go [207:259]
// filterEpisodes returns the episodes from the input that still need to be
// processed: those with no matching item in the episode table, and those whose
// stored item has status workshop.EpisodeStatusFailure. Episodes already
// stored with any other status are dropped. The relative order of the input
// is preserved in the result.
//
// Existing items are looked up with DynamoDB BatchGetItem. Keys are sent in
// chunks of at most maxBatchGetKeys, because the service rejects a request
// carrying more than 100 keys, and an empty input returns immediately, because
// a request carrying no keys is rejected as well. Keys the service reports
// back as unprocessed are re-requested until none remain.
//
// On failure the returned slice is nil and the error wraps the underlying
// BatchGetItem or unmarshal error.
//
// NOTE(review): duplicate primary keys within one request are presumably
// still rejected by DynamoDB; this assumes callers pass episodes with distinct
// keys — confirm against the caller.
func (h *Handler) filterEpisodes(ctx context.Context, episodes []workshop.Episode) (
	[]workshop.Episode, error,
) {
	// DynamoDB limits a single BatchGetItem request to 100 keys.
	const maxBatchGetKeys = 100

	log.Printf("filtering on %v episodes", len(episodes))
	if len(episodes) == 0 {
		// Nothing to look up; avoid sending a request with an empty key list.
		return []workshop.Episode{}, nil
	}

	keys := make([]map[string]ddbtypes.AttributeValue, 0, len(episodes))
	for _, episode := range episodes {
		log.Printf("searching for episode, %v", episode.ID)
		keys = append(keys, episode.AttributeValuePrimaryKey())
	}

	foundEpisodes := make([]workshop.Episode, 0, len(episodes))
	for start := 0; start < len(keys); start += maxBatchGetKeys {
		end := start + maxBatchGetKeys
		if end > len(keys) {
			end = len(keys)
		}

		unprocessedKeys := map[string]ddbtypes.KeysAndAttributes{
			h.episodeTableName: {
				Keys: keys[start:end],
			},
		}
		// A response may cover only part of the requested keys; keep asking
		// for whatever the service hands back as unprocessed.
		for len(unprocessedKeys) != 0 {
			resp, err := h.ddbClient.BatchGetItem(ctx, &ddb.BatchGetItemInput{
				RequestItems: unprocessedKeys,
			})
			if err != nil {
				return nil, fmt.Errorf("failed to get episodes from DynamoDB, %w", err)
			}

			unprocessedKeys = resp.UnprocessedKeys
			// Report the number of keys left, not the number of tables in the map.
			log.Printf("BatchGetItem returned with %v unprocessed items",
				len(unprocessedKeys[h.episodeTableName].Keys))

			foundItems := resp.Responses[h.episodeTableName]
			log.Printf("BatchGetItem returned %v existing episodes", len(foundItems))

			items := make([]workshop.Episode, 0, len(foundItems))
			if err = ddbav.UnmarshalListOfMaps(foundItems, &items); err != nil {
				return nil, fmt.Errorf("failed decode existing episodes in DynamoDB, %w", err)
			}
			foundEpisodes = append(foundEpisodes, items...)
		}
	}

	filteredEpisodes := make([]workshop.Episode, 0, len(episodes))
	for _, episode := range episodes {
		if ep, ok := workshop.GetEpisodeByID(foundEpisodes, episode.ID); ok {
			// Only previously failed episodes are retried; anything else that
			// already exists is skipped.
			if ep.Status != workshop.EpisodeStatusFailure {
				log.Printf("filtering out known non failed episode %v", episode.ID)
				continue
			}
		}
		filteredEpisodes = append(filteredEpisodes, episode)
	}
	return filteredEpisodes, nil
}