Providers/nxOMSAutomationWorker/automationworker/worker/runbook.py (71 lines of code) (raw):
#!/usr/bin/env python2
#
# Copyright (C) Microsoft Corporation, All rights reserved.
"""Runbook module."""
import os
import configuration
import gpg
import iohelper
import jrdsclient
import linuxutil
from workerexception import *
# Human-readable label for each integer runbook definition kind.
# Note that the table is not contiguous: there is no entry for 12.
definition_kind_int_to_str = {
    0: "Unknown",
    1: "Script",
    2: "Workflow",
    3: "Graph",
    4: "Configuration",
    5: "PowerShell",
    6: "PowerShellWorkflow",
    7: "GraphPowerShell",
    8: "GraphPowerShellWorkflow",
    9: "Python2",
    10: "Python3",
    11: "Bash",
    13: "PowerShell7",
}
class Runbook:
    """Base runbook: materializes a runbook definition as a file on disk.

    The constructor only records metadata. Derived classes are expected to
    set ``file_extension`` and then call ``initialize()``, which writes the
    file and optionally validates its signature.
    """

    def __init__(self, runbook_data):
        """Records the runbook metadata; nothing is written to disk here.

        :type runbook_data: jrdsclient.RunbookData
        """
        self.runbook_data = runbook_data
        self.definition_kind = runbook_data.runbook_definition_kind
        # The lookup table is not exhaustive (it has no entry for 12, for
        # instance), so fall back to the table's own "Unknown" label instead
        # of letting a bare KeyError escape from the constructor.
        self.definition_kind_str = definition_kind_int_to_str.get(
            self.definition_kind, "Unknown")
        self.runbook_file_path = ""
        # should be overwritten by derived class
        self.file_extension = ""

    def initialize(self):
        """Initializes the runbook.

        Writes the definition to disk and, when the configuration enforces
        runbook signature validation, validates the written file.

        Raises:
            InvalidRunbookSignature if validation is enforced and fails
        """
        self.write_to_disk()
        if configuration.get_enforce_runbook_signature_validation():
            self.validate_signature()

    def write_to_disk(self):
        """Writes the runbook's definition to disk.

        The file is placed in the configured working directory and is named
        ``<name><runbook_version_id><file_extension>``. The resulting path is
        stored in ``self.runbook_file_path``.
        """
        file_name = (self.runbook_data.name
                     + self.runbook_data.runbook_version_id
                     + self.file_extension)
        self.runbook_file_path = os.path.join(
            configuration.get_working_directory_path(), file_name)

        # Keep the payload as encoded bytes from here on: the file is opened
        # in binary mode, and wrapping the bytes in str() is a no-op on
        # Python 2 but would produce the "b'...'" repr on Python 3.
        runbook_definition = self.runbook_data.definition.encode("utf-8")
        if linuxutil.is_posix_host():
            # replace dos end of line to unix end of line
            runbook_definition = runbook_definition.replace(b"\r\n", b"\n")

        iohelper.write_to_file(self.runbook_file_path,
                               data=runbook_definition, mode="w+b")

    def validate_signature(self):
        """Validates that the runbook signature is valid.

        On success the file at ``self.runbook_file_path`` is replaced by the
        ``.d`` file whose path is handed to ``gpg.verify_signature`` as its
        second argument (presumably the verified output; confirm against the
        gpg module).

        Raises:
            InvalidRunbookSignature if the signature is invalid
        """
        decrypted_runbook_path = self.runbook_file_path + ".d"
        if not gpg.verify_signature(self.runbook_file_path,
                                    decrypted_runbook_path):
            raise InvalidRunbookSignature()

        # Explicit remove-then-rename rather than a bare rename: os.rename
        # does not overwrite an existing destination on every platform.
        os.remove(self.runbook_file_path)
        os.rename(decrypted_runbook_path, self.runbook_file_path)
class Python2Runbook(Runbook):
    """Python 2 runbook; its definition is stored with a ``.py`` extension."""

    def __init__(self, runbook_data):
        """Records the metadata, then immediately initializes the runbook.

        :type runbook_data: jrdsclient.RunbookData
        """
        Runbook.__init__(self, runbook_data)
        # Must be set after the base constructor (which resets it to "") and
        # before initialize(), because write_to_disk() uses it to build the
        # file name.
        self.file_extension = ".py"
        self.initialize()
class Python3Runbook(Runbook):
    """Python 3 runbook; its definition is stored with a ``.py`` extension."""

    def __init__(self, runbook_data):
        """Records the metadata, then immediately initializes the runbook.

        :type runbook_data: jrdsclient.RunbookData
        """
        Runbook.__init__(self, runbook_data)
        # Must be set after the base constructor (which resets it to "") and
        # before initialize(), because write_to_disk() uses it to build the
        # file name.
        self.file_extension = ".py"
        self.initialize()
class BashRunbook(Runbook):
    """Bash runbook; its definition is stored with a ``.sh`` extension."""

    def __init__(self, runbook_data):
        """Records the metadata, then immediately initializes the runbook.

        :type runbook_data: jrdsclient.RunbookData
        """
        Runbook.__init__(self, runbook_data)
        # Must be set after the base constructor (which resets it to "") and
        # before initialize(), because write_to_disk() uses it to build the
        # file name.
        self.file_extension = ".sh"
        self.initialize()
class PowerShellRunbook(Runbook):
    """PowerShell runbook; its definition is stored with a ``.ps1`` extension."""

    def __init__(self, runbook_data):
        """Records the metadata, then immediately initializes the runbook.

        :type runbook_data: jrdsclient.RunbookData
        """
        Runbook.__init__(self, runbook_data)
        # Must be set after the base constructor (which resets it to "") and
        # before initialize(), because write_to_disk() uses it to build the
        # file name.
        self.file_extension = ".ps1"
        self.initialize()
class PowerShell7Runbook(Runbook):
    """PowerShell 7 runbook; its definition is stored with a ``.ps1`` extension."""

    def __init__(self, runbook_data):
        """Records the metadata, then immediately initializes the runbook.

        :type runbook_data: jrdsclient.RunbookData
        """
        Runbook.__init__(self, runbook_data)
        # Must be set after the base constructor (which resets it to "") and
        # before initialize(), because write_to_disk() uses it to build the
        # file name.
        self.file_extension = ".ps1"
        self.initialize()