func()

in lambda/go/add-podcasts/main.go [207:259]


func (h *Handler) filterEpisodes(ctx context.Context, episodes []workshop.Episode) (
	[]workshop.Episode, error,
) {
	log.Printf("filtering on %v episodes", len(episodes))

	keys := make([]map[string]ddbtypes.AttributeValue, 0, len(episodes))
	for _, episode := range episodes {
		log.Printf("searching for episode, %v", episode.ID)
		keys = append(keys, episode.AttributeValuePrimaryKey())
	}

	unprocessedKeys := map[string]ddbtypes.KeysAndAttributes{
		h.episodeTableName: {
			Keys: keys,
		},
	}

	foundEpisodes := make([]workshop.Episode, 0, len(episodes))
	for len(unprocessedKeys) != 0 {
		resp, err := h.ddbClient.BatchGetItem(ctx, &ddb.BatchGetItemInput{
			RequestItems: unprocessedKeys,
		})
		if err != nil {
			return nil, fmt.Errorf("failed to get episodes from DynamoDB, %w", err)
		}
		unprocessedKeys = resp.UnprocessedKeys
		log.Printf("BatchGetItem returned with %v unprocessed items", len(unprocessedKeys))

		foundItems := resp.Responses[h.episodeTableName]
		log.Printf("BatchGetItem returned %v existing episodes", len(foundItems))

		items := make([]workshop.Episode, 0, len(foundItems))
		if err = ddbav.UnmarshalListOfMaps(foundItems, &items); err != nil {
			return nil, fmt.Errorf("failed decode existing episodes in DynamoDB, %w", err)
		}

		foundEpisodes = append(foundEpisodes, items...)
	}

	filteredEpisodes := make([]workshop.Episode, 0, len(episodes))
	for _, episode := range episodes {
		if ep, ok := workshop.GetEpisodeByID(foundEpisodes, episode.ID); ok {
			if ep.Status != workshop.EpisodeStatusFailure {
				log.Printf("filtering out known non failed episode %v", episode.ID)
				continue
			}
		}
		filteredEpisodes = append(filteredEpisodes, episode)
	}

	return filteredEpisodes, nil

}