func()

in smlogs-kubectl-plugin/pkg/cmd/smlogs.go [277:387]


// RunTraining streams CloudWatch logs for the SageMaker training job that
// backs the Kubernetes TrainingJob named o.k8sJobName.
//
// It fetches the TrainingJob object from the cluster, resolves the SageMaker
// job name and AWS region from its spec, then repeatedly calls CloudWatchLogs
// FilterLogEvents and prints each event to o.Out (a status line goes to
// o.ErrOut). When o.tail is set it keeps polling for new events after
// draining all pages; otherwise it returns once pagination is exhausted.
// Returns an error if the TrainingJob cannot be fetched, its spec lacks a
// job name or region, or a CloudWatchLogs request fails.
func (o *SmLogsOptions) RunTraining(c *cobra.Command, args []string) error {

	ctx := context.Background()

	// Look up the Kubernetes TrainingJob that owns the SageMaker job.
	smJob := &trainingjobv1.TrainingJob{}
	if err := o.k8sClient.Get(ctx, client.ObjectKey{Namespace: *o.configFlags.Namespace, Name: o.k8sJobName}, smJob); err != nil {
		return err
	}

	// Both the SageMaker job name and the region are required to locate the
	// log group/streams. Error strings carry no trailing newline so they
	// compose cleanly when wrapped or printed by callers.
	if smJob.Spec.TrainingJobName == nil || len(*smJob.Spec.TrainingJobName) == 0 {
		return fmt.Errorf("SageMaker training job's spec does not have name")
	}

	if smJob.Spec.Region == nil || len(*smJob.Spec.Region) == 0 {
		return fmt.Errorf("SageMaker training job's spec does not have region")
	}

	trainingJobName := string(*smJob.Spec.TrainingJobName)
	o.awsConfig.Region = smJob.Spec.Region

	fmt.Fprintf(o.ErrOut, "\"%s\" has SageMaker TrainingJobName \"%s\" in region \"%s\", status \"%s\" and secondary status \"%s\"\n", o.k8sJobName, trainingJobName, string(*smJob.Spec.Region), smJob.Status.TrainingJobStatus, smJob.Status.SecondaryStatus)

	cwClient := o.createCloudWatchLogsClientForConfig(o.awsConfig)

	// latestEvent is the newest event printed so far. It serves two roles:
	// advancing the query window between polls, and anchoring duplicate
	// suppression (see the dedup scan below).
	var latestEvent *cloudwatchlogs.FilteredLogEvent
	// nextToken is the CloudWatchLogs pagination cursor; nil means we are
	// not mid-pagination.
	var nextToken *string

	// startTimeMillis and endTimeMillis specify the time range in which CloudWatchLogs will search
	// for logs.
	// TODO add custom timestamp ranges
	var startTimeMillis int64 = 0
	var endTimeMillis int64

	// Streams for a training job are prefixed with the job name; an optional
	// user-supplied suffix narrows the match to particular worker streams.
	streamName := trainingJobName

	if len(o.logSearchPrefix) > 0 {
		streamName = streamName + "/" + o.logSearchPrefix
	}

	for {
		// Set endTimeMillis to be the current time in milliseconds since the Unix epoch.
		endTimeMillis = time.Now().Unix() * 1000

		// If we are done paginating and this is at least the second poll in streaming mode,
		// we only query for new logs between the latestEvent.Timestamp and now.
		// The window starts AT the latest timestamp (inclusive) so events that
		// share that timestamp are not missed; the EventId scan below drops
		// the ones already printed.
		if nextToken == nil && latestEvent != nil {
			startTimeMillis = *latestEvent.Timestamp
		}

		filterRequest, filterResponse := cwClient.FilterLogEventsRequest(&cloudwatchlogs.FilterLogEventsInput{
			LogGroupName:        &o.logGroupName,
			LogStreamNamePrefix: &streamName,
			NextToken:           nextToken,
			StartTime:           &startTimeMillis,
			EndTime:             &endTimeMillis,
		})
		err := filterRequest.Send()

		if err != nil {
			return err
		}

		if len(filterResponse.Events) > 0 {

			// We do not want to print duplicate events, so we filter out events
			// that we've already seen.
			// This assumes that the API response presents chronologically ordered
			// events, and that the API response does not change if it is queried again
			// (except for new events that occur after the latest event).
			startingIndex := 0
			if latestEvent != nil {
				for index, event := range filterResponse.Events {
					if *event.EventId == *latestEvent.EventId {
						startingIndex = index + 1
						break
					}
				}
			}

			for _, event := range filterResponse.Events[startingIndex:] {

				latestEvent = event

				// time.Unix only accepts in seconds or nanoseconds, Timestamp is in milliseconds
				timestampInNano := *event.Timestamp * 1e6
				utcTimestamp := time.Unix(0, timestampInNano)

				// TODO would be great to have customization of which fields to show
				// For example, we should show LogStreamName if they use multiple worker instances.
				// They may or may not want IngestionTime as well.
				if o.hideLogStreamName {
					fmt.Fprintf(o.Out, "%s %s\n", utcTimestamp, *event.Message)
				} else {
					fmt.Fprintf(o.Out, "%s %s %s\n", *event.LogStreamName, utcTimestamp, *event.Message)
				}
			}
		}

		nextToken = filterResponse.NextToken

		// No more pages: either sleep and poll again (tail mode) or finish.
		if nextToken == nil {
			if o.tail {
				time.Sleep(streamingPollIntervalMillis * time.Millisecond)
			} else {
				break
			}
		}
	}

	return nil
}