in benchmarking/pipemode_benchmark/benchmark.py
import datetime
import time

import boto3

# BenchmarkException and BenchmarkResult are defined elsewhere in this package.

def benchmark(region, role_arn, dataset, output_path, instance_type, script):
    """Run a single benchmark task.

    Returns a description of the benchmark task, together with the time TensorFlow spent
    iterating over data via the PipeModeDataset.

    Args:
        region: The AWS region to run the benchmark in.
        role_arn: The ARN of a role to run the training job with.
        dataset: A BenchmarkDataset.
        output_path: An S3 path to write models to (unused by the benchmark, but required by the API).
        instance_type: The EC2 instance type to benchmark on.
        script: A BenchmarkScript.
    """
    # Build a unique, human-readable job name; dots are not valid in job names.
    training_job_name = "-".join([
        "pmb",
        "-".join(dataset.name.split(".")),
        script.name,
        datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S")
    ])
    # Launch a single-instance Pipe mode training job that streams the dataset
    # through one channel.
    client = boto3.client('sagemaker', region_name=region)
    client.create_training_job(
        TrainingJobName=training_job_name,
        RoleArn=role_arn,
        AlgorithmSpecification={
            'TrainingImage': script.image,
            'TrainingInputMode': 'Pipe'
        },
        HyperParameters={'dimension': str(dataset.dimension)},
        InputDataConfig=[{
            'ChannelName': 'elizabeth',
            'DataSource': {
                'S3DataSource': {
                    'S3DataType': 'S3Prefix',
                    'S3Uri': dataset.s3_uri,
                    'S3DataDistributionType': 'FullyReplicated'
                }
            }
        }],
        OutputDataConfig={
            'S3OutputPath': output_path
        },
        StoppingCondition={
            'MaxRuntimeInSeconds': 86400
        },
        ResourceConfig={
            'InstanceType': instance_type,
            'InstanceCount': 1,
            'VolumeSizeInGB': 100
        })
print "Created benchmarking training job: {}".format(training_job_name)
    # Wait for the training job to complete, polling every 30 seconds.
    while True:
        status = client.describe_training_job(TrainingJobName=training_job_name)['TrainingJobStatus']
        if status == 'Failed':
            raise BenchmarkException("Failed job: " + training_job_name)
        if status == 'Completed':
            break
        time.sleep(30)
    # Extract the benchmark metrics that the training script logged to CloudWatch.
    logs = boto3.client('logs', region_name=region)
    [log_stream] = logs.describe_log_streams(logGroupName="/aws/sagemaker/TrainingJobs",
                                             logStreamNamePrefix=training_job_name)['logStreams']
    log_stream_name = log_stream['logStreamName']
    # Metrics parsed from the training log; left as None if a message is missing.
    total_iteration_time = iterator_time = read_bytes = read_gb_sec = None
    next_token = None
    while True:
        # Read from the head of the stream so forward pagination covers every event.
        kwargs = {
            'logGroupName': "/aws/sagemaker/TrainingJobs",
            'logStreamName': log_stream_name,
            'startFromHead': True
        }
        if next_token:
            kwargs['nextToken'] = next_token
        log_event_response = logs.get_log_events(**kwargs)
        next_token = log_event_response['nextForwardToken']
        events = log_event_response['events']
        if not events:
            break
        for event in events:
            message = event['message']
            if 'iteration time' in message:
                # Message format: "iteration time <seconds>"
                total_iteration_time = datetime.timedelta(seconds=float(message[15:].strip()))
            if 'PipeModeDatasetOp::Dataset::Iterator read_time_ms' in message:
                iterator_time = datetime.timedelta(milliseconds=float(message.strip().split()[2]))
            if 'PipeModeDatasetOp::Dataset::Iterator read_bytes' in message:
                read_bytes = int(message.strip().split()[2])
            if 'PipeModeDatasetOp::Dataset::Iterator read_GB/s' in message:
                read_gb_sec = float(message.strip().split()[2])
    return BenchmarkResult(total_iteration_time, read_bytes, iterator_time,
                           read_gb_sec, dataset.name, training_job_name, script.name)
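
# A minimal usage sketch. The Dataset and Script stand-ins below are hypothetical:
# the real BenchmarkDataset and BenchmarkScript classes live elsewhere in this
# package, and the attribute sets shown (name, dimension, s3_uri / name, image) are
# only what benchmark() itself reads. The role ARN, bucket, and image URI are
# placeholders, not values from this repository.
if __name__ == '__main__':
    import collections

    Dataset = collections.namedtuple('Dataset', ['name', 'dimension', 's3_uri'])
    Script = collections.namedtuple('Script', ['name', 'image'])

    result = benchmark(
        region='us-west-2',
        role_arn='arn:aws:iam::123456789012:role/SageMakerRole',  # hypothetical role
        dataset=Dataset(name='100mb.tfrecords', dimension=1024,
                        s3_uri='s3://my-benchmark-bucket/100mb'),  # hypothetical bucket
        output_path='s3://my-benchmark-bucket/output',
        instance_type='ml.p3.2xlarge',
        script=Script(name='inputonly',
                      image='123456789012.dkr.ecr.us-west-2.amazonaws.com/benchmark:latest'))
    print(result)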