Providers/nxOMSAutomationWorker/automationworker/worker/streamhandler.py
#!/usr/bin/env python2
#
# Copyright (C) Microsoft Corporation, All rights reserved.
"""Stream handler module. Is used to process output from stdout and stderr"""
import codecs
import traceback
from threading import Thread
import jrdsclient
import tracer
PREFIX_DEBUG = "DEBUG:"
PREFIX_ERROR = "ERROR:"
PREFIX_VERBOSE = "VERBOSE:"
PREFIX_WARNING = "WARNING:"
STREAM_TYPE_DEBUG = "Debug"
STREAM_TYPE_ERROR = "Error"
STREAM_TYPE_OUTPUT = "Output"
STREAM_TYPE_VERBOSE = "Verbose"
STREAM_TYPE_WARNING = "Warning"
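# PowerShell prefixes its non-output streams with markers such as "DEBUG:" or
# "WARNING:"; the constants above/below map those prefixes to the stream types
# reported back to JRDS.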
class StreamHandler(Thread):
"""Stream handler class."""
def __init__(self, job_data, runtime_process, jrds_client):
"""
:type job_data: jrdsclient.JobData
:type runtime_process :
:type jrds_client : jrdsclient.JRDSClient
"""
Thread.__init__(self)
self.daemon = True
self.job_data = job_data
self.runtime_process = runtime_process
self.jrds_client = jrds_client
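    # Extended properties sent with every stream upload; they identify the
    # account, job, runbook, and subscription the stream belongs to.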
def get_job_extended_properties(self):
return {
'accountid': str(self.job_data.account_id),
'accountname': str(self.job_data.account_name),
'trackingid': str(self.job_data.tracking_id),
'jobid': str(self.job_data.job_id),
'resourcegroup': str(self.job_data.resource_group_name),
'runbookname': str(self.job_data.runbook_name),
'subscriptionid': str(self.job_data.subscription_id),
'runon': str(self.job_data.run_on)
}
def run(self):
"""Monitor the job's subprocess for output (which will be uploaded as streams).
Notes:
PowerShell stdout : http://stackoverflow.com/questions/22349139/utf-8-output-from-powershell
IMPORTANT: Do not log streams to cloud.
"""
stream_count = 0
while True:
try:
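                # The codecs wrapper delegates readline() to the underlying pipe, so
                # each call returns the raw bytes of one line and blocks until a full
                # line (or EOF) is available.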
output = codecs.getwriter('utf8')(self.runtime_process.stdout).readline()
error_output = codecs.getwriter('utf8')(self.runtime_process.stderr).readline()
if output == '' and error_output == '' and self.runtime_process.poll() is not None:
break
if output:
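                    # Match the lowercase, uppercase, and capitalized spellings of
                    # each PowerShell stream prefix (e.g. "debug:", "DEBUG:", "Debug:").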
if output.startswith(PREFIX_DEBUG.lower()) or \
output.startswith(PREFIX_DEBUG.upper()) or \
output.startswith(PREFIX_DEBUG.capitalize()):
self.process_debug_stream(stream_count, output)
elif output.startswith(PREFIX_ERROR.lower()) or \
output.startswith(PREFIX_ERROR.upper()) or \
output.startswith(PREFIX_ERROR.capitalize()):
self.process_error_stream(stream_count, output)
elif output.startswith(PREFIX_VERBOSE.lower()) or \
output.startswith(PREFIX_VERBOSE.upper()) or \
output.startswith(PREFIX_VERBOSE.capitalize()):
self.process_verbose_stream(stream_count, output)
elif output.startswith(PREFIX_WARNING.lower()) or \
output.startswith(PREFIX_WARNING.upper()) or \
output.startswith(PREFIX_WARNING.capitalize()):
self.process_warning_stream(stream_count, output)
else:
self.process_output_stream(stream_count, output)
stream_count += 1
                    # Trace last so that an encoding issue while tracing cannot prevent
                    # the stream from being pushed to the cloud.
                    # Keep this as a debug trace to avoid logging customer streams to
                    # the automation logs.
tracer.log_debug_trace("STDOUT : " + str(output.strip()))
if error_output:
self.process_error_stream(stream_count, error_output)
stream_count += 1
tracer.log_debug_trace("STDERR : " + str(error_output.strip()))
except:
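                # Swallow any exception so the thread keeps draining the pipes; the
                # failure itself is reported through the sandbox trace below.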
tracer.log_sandbox_job_streamhandler_unhandled_exception(self.job_data.job_id, traceback.format_exc())
continue
tracer.log_sandbox_job_streamhandler_processing_complete(self.job_data.job_id)
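    # Per-stream-type handlers; each one forwards the line to set_stream with the
    # matching stream type.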
    def process_debug_stream(self, stream_count, output):
        self.set_stream(stream_count, STREAM_TYPE_DEBUG, output)
    def process_error_stream(self, stream_count, output):
        self.set_stream(stream_count, STREAM_TYPE_ERROR, output)
    def process_output_stream(self, stream_count, output):
        self.set_stream(stream_count, STREAM_TYPE_OUTPUT, output)
    def process_verbose_stream(self, stream_count, output):
        self.set_stream(stream_count, STREAM_TYPE_VERBOSE, output)
    def process_warning_stream(self, stream_count, output):
        self.set_stream(stream_count, STREAM_TYPE_WARNING, output)
    # Upload a single stream record to JRDS together with the job's extended properties.
    def set_stream(self, stream_count, stream_type, output):
self.jrds_client.set_stream(self.job_data.job_id, self.job_data.runbook_version_id, output.strip(),
stream_type, stream_count, self.get_job_extended_properties())