in smlogs-kubectl-plugin/pkg/cmd/smlogs.go [277:387]
// RunTraining streams the CloudWatch logs of the SageMaker training job that
// backs the named Kubernetes TrainingJob resource.
//
// It fetches the TrainingJob custom resource to discover the SageMaker
// training job name and region, then repeatedly calls FilterLogEvents,
// paginating via NextToken. Events already printed are skipped by matching
// the last-seen EventId. When o.tail is set, the method polls indefinitely
// for new events; otherwise it returns after one complete pass.
func (o *SmLogsOptions) RunTraining(c *cobra.Command, args []string) error {
	ctx := context.Background()

	smJob := &trainingjobv1.TrainingJob{}
	if err := o.k8sClient.Get(ctx, client.ObjectKey{Namespace: *o.configFlags.Namespace, Name: o.k8sJobName}, smJob); err != nil {
		return err
	}

	// Go convention: error strings carry no trailing newline or punctuation.
	if smJob.Spec.TrainingJobName == nil || len(*smJob.Spec.TrainingJobName) == 0 {
		return fmt.Errorf("SageMaker training job's spec does not have name")
	}
	if smJob.Spec.Region == nil || len(*smJob.Spec.Region) == 0 {
		return fmt.Errorf("SageMaker training job's spec does not have region")
	}

	// The spec fields are already strings; no conversion is needed.
	trainingJobName := *smJob.Spec.TrainingJobName
	o.awsConfig.Region = smJob.Spec.Region

	// Status banner goes to stderr so stdout carries only log lines.
	fmt.Fprintf(o.ErrOut, "\"%s\" has SageMaker TrainingJobName \"%s\" in region \"%s\", status \"%s\" and secondary status \"%s\"\n", o.k8sJobName, trainingJobName, *smJob.Spec.Region, smJob.Status.TrainingJobStatus, smJob.Status.SecondaryStatus)

	cwClient := o.createCloudWatchLogsClientForConfig(o.awsConfig)

	var latestEvent *cloudwatchlogs.FilteredLogEvent
	var nextToken *string

	// startTimeMillis and endTimeMillis bound the window in which
	// CloudWatch Logs searches for events.
	// TODO add custom timestamp ranges
	var startTimeMillis int64
	var endTimeMillis int64

	// An optional user-supplied prefix narrows the log-stream match below
	// the training job's own stream-name prefix.
	streamName := trainingJobName
	if len(o.logSearchPrefix) > 0 {
		streamName = streamName + "/" + o.logSearchPrefix
	}

	for {
		// Set endTimeMillis to the current time in milliseconds since the Unix epoch.
		endTimeMillis = time.Now().Unix() * 1000

		// If we are done paginating and this is at least the second poll in
		// streaming mode, only query for logs between the latest seen event
		// and now.
		if nextToken == nil && latestEvent != nil {
			startTimeMillis = *latestEvent.Timestamp
		}

		filterRequest, filterResponse := cwClient.FilterLogEventsRequest(&cloudwatchlogs.FilterLogEventsInput{
			LogGroupName:        &o.logGroupName,
			LogStreamNamePrefix: &streamName,
			NextToken:           nextToken,
			StartTime:           &startTimeMillis,
			EndTime:             &endTimeMillis,
		})
		if err := filterRequest.Send(); err != nil {
			return err
		}

		if len(filterResponse.Events) > 0 {
			// We do not want to print duplicate events, so we filter out
			// events we've already seen.
			// This assumes that the API response presents chronologically
			// ordered events, and that the API response does not change if it
			// is queried again (except for new events that occur after the
			// latest event).
			startingIndex := 0
			if latestEvent != nil {
				for index, event := range filterResponse.Events {
					if *event.EventId == *latestEvent.EventId {
						startingIndex = index + 1
						break
					}
				}
			}

			for _, event := range filterResponse.Events[startingIndex:] {
				latestEvent = event

				// time.Unix only accepts seconds or nanoseconds; Timestamp is
				// in milliseconds, so scale up. Convert to UTC explicitly —
				// time.Unix alone yields local time, which contradicted this
				// variable's name.
				timestampInNano := *event.Timestamp * 1e6
				utcTimestamp := time.Unix(0, timestampInNano).UTC()

				// TODO would be great to have customization of which fields to show.
				// For example, we should show LogStreamName if they use multiple
				// worker instances. They may or may not want IngestionTime as well.
				if o.hideLogStreamName {
					fmt.Fprintf(o.Out, "%s %s\n", utcTimestamp, *event.Message)
				} else {
					fmt.Fprintf(o.Out, "%s %s %s\n", *event.LogStreamName, utcTimestamp, *event.Message)
				}
			}
		}

		nextToken = filterResponse.NextToken
		if nextToken == nil {
			if o.tail {
				// Streaming mode: wait before polling for newly arrived events.
				time.Sleep(streamingPollIntervalMillis * time.Millisecond)
			} else {
				break
			}
		}
	}

	return nil
}