perfkitbenchmarker/events.py (127 lines of code) (raw):
# Copyright 2015 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defines observable events in PerfKitBenchmarker.
All events are passed keyword arguments, and possibly a sender. See event
definitions below.
Event handlers are run synchronously in an unspecified order; any exceptions
raised will be propagated.
"""
import logging
import os
from absl import flags
import blinker
from perfkitbenchmarker import data
from perfkitbenchmarker import sample
from perfkitbenchmarker import stages
FLAGS = flags.FLAGS

_events = blinker.Namespace()


initialization_complete = _events.signal(
    'system-ready',
    doc="""
Signal sent once after the system is initialized (command-line flags
parsed, temporary directory initialized, run_uri set).
Sender: None
Payload: parsed_flags, the parsed FLAGS object.""",
)

provider_imported = _events.signal(
    'provider-imported',
    doc="""
Signal sent after a cloud provider's modules have been imported.
Sender: string. Cloud provider name chosen from provider_info.VALID_CLOUDS.""",
)

register_tracers = _events.signal(
    'register_tracers',
    doc="""
Signal sent at the beginning of a benchmark for tracers to be registered.
Sender: None
Payload: parsed_flags, the parsed FLAGS object.""",
)

benchmark_start = _events.signal(
    'benchmark-start',
    doc="""
Signal sent at the beginning of a benchmark before any resources are
provisioned.
Sender: None
Payload: benchmark_spec.""",
)

on_vm_startup = _events.signal(
    'on-vm-startup',
    doc="""
Signal sent on vm startup.
Sender: None
Payload: vm (VirtualMachine object).""",
)

benchmark_end = _events.signal(
    'benchmark-end',
    doc="""
Signal sent at the end of a benchmark after any resources have been
torn down (if run_stage includes teardown).
Sender: None
Payload: benchmark_spec.""",
)

before_phase = _events.signal(
    'before-phase',
    doc="""
Signal sent before run phase and trigger phase.
Sender: the stage.
Payload: benchmark_spec.""",
)

trigger_phase = _events.signal(
    'trigger-phase',
    doc="""
Signal sent immediately before run phase.
This is used for short running commands right before the run phase.
Before run phase can be slow, so time sensitive functions should use
this event instead.
Payload: benchmark_spec.""",
)

after_phase = _events.signal(
    'after-phase',
    doc="""
Signal sent immediately after a phase runs, regardless of whether it was
successful.
Sender: the stage.
Payload: benchmark_spec.""",
)

benchmark_samples_created = _events.signal(
    'benchmark-samples-created',
    doc="""
Called with samples list and benchmark spec.
Signal sent immediately after a sample is created.
Samples should be added in this phase. Any global metadata should be
added in all_samples_created instead.
Payload: benchmark_spec (BenchmarkSpec), samples (list of sample.Sample).""",
)

all_samples_created = _events.signal(
    'all_samples_created',
    doc="""
Called with samples list and benchmark spec.
Signal sent immediately after a benchmark_samples_created is called.
The samples' metadata is mutable, and may be updated by the subscriber.
This stage is used for adding global metadata. No new samples should
be added during this phase.
Sender: the phase. Currently only stages.RUN.
Payload: benchmark_spec (BenchmarkSpec), samples (list of sample.Sample).""",
)

record_event = _events.signal(
    'record-event',
    doc="""
Signal sent when an event is recorded.
Signal sent after an event occurred. Record start, end timestamp and metadata
of the event for analysis.
Sender: None
Payload: event (string), start_timestamp (float), end_timestamp (float),
metadata (dict).""",
)
def RegisterTracingEvents():
  """Subscribes AddEvent to the record_event signal.

  Uses weak=False so blinker holds a strong reference to the handler and
  the subscription persists for the lifetime of the process.
  """
  record_event.connect(AddEvent, weak=False)
class TracingEvent:
  """Represents a recorded event with its timing and metadata.

  Attributes:
    sender: string. Name of the sending class/object.
    event: string. Name of the event.
    start_timestamp: float. Represents the start timestamp of the event.
    end_timestamp: float. Represents the end timestamp of the event.
    metadata: dict. Additional metadata of the event.
  """

  # Class-level registry shared by all instances: AddEvent appends every
  # recorded TracingEvent here for later analysis.
  events = []

  def __init__(self, sender, event, start_timestamp, end_timestamp, metadata):
    self.sender = sender
    self.event = event
    self.start_timestamp = start_timestamp
    self.end_timestamp = end_timestamp
    self.metadata = metadata
def AddEvent(sender, event, start_timestamp, end_timestamp, metadata):
  """Record a TracingEvent.

  Appends a new TracingEvent to the shared TracingEvent.events registry.

  Args:
    sender: string. Name of the sending class/object.
    event: string. Name of the event.
    start_timestamp: float. Start timestamp of the event.
    end_timestamp: float. End timestamp of the event.
    metadata: dict. Additional metadata of the event.
  """
  TracingEvent.events.append(
      TracingEvent(sender, event, start_timestamp, end_timestamp, metadata)
  )
@on_vm_startup.connect
def _RunStartupScript(unused_sender, vm):
  """Copies and executes --startup_script on the VM, when one is given.

  The script's (stdout, stderr) is stored on the VM as
  startup_script_output for later sample collection.
  """
  script = FLAGS.startup_script
  if not script:
    return
  vm.RemoteCopy(data.ResourcePath(script))
  vm.startup_script_output = vm.RemoteCommand(
      './' + os.path.basename(script)
  )
@benchmark_samples_created.connect
def _AddScriptSamples(unused_sender, benchmark_spec, samples):
  """Appends startup/postrun script output samples for every VM.

  For each VM, emits a 'startup' and/or 'postrun' sample (value 0, no
  unit) whose metadata carries the script's stdout and stderr.
  """
  # (metric name, controlling flag value, VM attribute holding the output).
  script_outputs = (
      ('startup', FLAGS.startup_script, 'startup_script_output'),
      ('postrun', FLAGS.postrun_script, 'postrun_script_output'),
  )
  for vm in benchmark_spec.vms:
    for metric, script_flag, output_attr in script_outputs:
      if not script_flag:
        continue
      out = getattr(vm, output_attr)
      metadata = {'stdout': out[0], 'stderr': out[1]}
      samples.append(sample.Sample(metric, 0, '', metadata))
@after_phase.connect
def _RunPostRunScript(sender, benchmark_spec):
  """Copies and executes --postrun_script on every VM after the run phase.

  Connected to after_phase, which fires after every stage; only the RUN
  stage should trigger the postrun script. Each VM's (stdout, stderr) is
  stored as vm.postrun_script_output for later sample collection.

  Args:
    sender: the stage that just finished.
    benchmark_spec: the BenchmarkSpec whose vms run the script.
  """
  if sender != stages.RUN:
    logging.info(
        'Receive after_phase signal from :%s, not '
        'triggering _RunPostRunScript.',
        sender,
    )
    # Bug fix: without this return the postrun script also ran after
    # non-RUN stages, contradicting the log message above.
    return
  if FLAGS.postrun_script:
    for vm in benchmark_spec.vms:
      vm.RemoteCopy(FLAGS.postrun_script)
      vm.postrun_script_output = vm.RemoteCommand(
          './%s' % os.path.basename(FLAGS.postrun_script)
      )