Providers/Scripts/3.x/Scripts/nxOMSKeyMgmt.py (253 lines of code) (raw):

#!/usr/bin/env python3
#============================================================================
# Copyright (C) Microsoft Corporation, All rights reserved.
#============================================================================
"""DSC provider that adds or removes a GPG public key in the omsconfig keyring.

The key is only imported or deleted after its detached signature has been
verified against the separate key-management keyring.
"""
from contextlib import contextmanager

import os
import sys
import subprocess
import re
import shutil

try:
    import imp
except ImportError:
    # The 'imp' module was removed in Python 3.12; fall back to importlib.
    imp = None
    import importlib.util

#   class MSFT_nxOMSKeyMgmt:OMI_BaseResource
# {
#   [Key] string KeyContents;
#   [Write,required] string KeySignature;
#   [Write,ValueMap{"present", "absent"},Values{"present", "absent"}] string Ensure;
# };


def _load_source(name, path):
    """Load the Python source file at *path* as module *name* and return it."""
    if imp is not None:
        return imp.load_source(name, path)
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    # imp.load_source registered the module in sys.modules; keep doing so.
    sys.modules[name] = module
    spec.loader.exec_module(module)
    return module


protocol = _load_source('protocol', '../protocol.py')
nxDSCLog = _load_source('nxDSCLog', '../nxDSCLog.py')
LG = nxDSCLog.DSCLog

# Keyring that keys are added to / removed from.
dsc_keyring_path = '/etc/opt/omi/conf/omsconfig/keyring.gpg'
# Keyring used to verify the signature that accompanies a key.
signature_keyring_path = '/etc/opt/omi/conf/omsconfig/keymgmtring.gpg'
gpg_bin = '/usr/bin/gpg'
# NOTE(review): fixed, predictable location under /tmp -- confirm this is
# acceptable for the account this provider runs as.
tmpdir = '/tmp/omsmgmt'
key_contents_path = tmpdir + '/' + 'tmpkey.pub'
key_signature_path = tmpdir + '/' + 'tmpkey.asc'


def init_vars(KeyContents, KeySignature, Ensure):
    """Normalise the resource properties; Ensure is lower-cased.

    A missing (None) Ensure used to raise AttributeError here.
    NOTE(review): it is now treated as 'present' -- confirm that default.
    """
    if Ensure is None:
        Ensure = 'present'
    return KeyContents, KeySignature, Ensure.lower()


def _required_values_missing(KeyContents, KeySignature):
    """Return True (after logging the error) if either required value is empty."""
    if KeyContents and KeySignature:
        return False
    LG().Log('ERROR', 'Both "KeyContents" and "KeySignature" must be set.')
    print('Both "KeyContents" and "KeySignature" must be set.', file=sys.stderr)
    return True


def Set_Marshall(KeyContents, KeySignature, Ensure):
    """Validate the inputs and apply the requested state. Returns [0] or [-1]."""
    if _required_values_missing(KeyContents, KeySignature):
        return [-1]
    KeyContents, KeySignature, Ensure = init_vars(
        KeyContents, KeySignature, Ensure)
    return Set(KeyContents, KeySignature, Ensure)


def Test_Marshall(KeyContents, KeySignature, Ensure):
    """Validate the inputs and test the current state. Returns [0] or [-1]."""
    if _required_values_missing(KeyContents, KeySignature):
        return [-1]
    KeyContents, KeySignature, Ensure = init_vars(
        KeyContents, KeySignature, Ensure)
    return Test(KeyContents, KeySignature, Ensure)


def Get_Marshall(KeyContents, KeySignature, Ensure):
    """Return (0, dict) mapping each property name to a protocol.MI_String."""
    if _required_values_missing(KeyContents, KeySignature):
        # NOTE(review): this path returns a 4-tuple of raw values while the
        # success path returns (0, dict); kept unchanged because the callers
        # are not visible from this file.
        return 0, KeyContents, KeySignature, Ensure
    KeyContents, KeySignature, Ensure = init_vars(
        KeyContents, KeySignature, Ensure)
    KeyContents, KeySignature, Ensure = Get(KeyContents, KeySignature, Ensure)
    retd = {
        'KeyContents': protocol.MI_String(KeyContents),
        'KeySignature': protocol.MI_String(KeySignature),
        'Ensure': protocol.MI_String(Ensure),
    }
    return 0, retd


def _ensure_tmpdir():
    """Create the scratch directory if needed; log and return False on failure."""
    if os.path.isdir(tmpdir):
        return True
    if MakeDirs(tmpdir) is not None:
        LG().Log('ERROR', "Exception - Unable to creaste " + tmpdir)
        print("Exception - Unable to creaste " + tmpdir, file=sys.stderr)
        return False
    return True


def _write_tmp_file(path, contents):
    """Write *contents* to *path*; log and return False if it cannot be opened."""
    with opened_w_error(path, 'w+') as (handle, error):
        if error:
            LG().Log('ERROR', "Exception opening file " + path + " " + str(error))
            print("ERROR: Exception opening file " + path + " " + str(error),
                  file=sys.stderr)
            return False
        handle.write(contents)
    return True


def _run_gpg(arguments):
    """Run gpg with *arguments*; return (exit code, combined output text)."""
    cmd = 'HOME=/var/opt/microsoft/omsagent/run ' + gpg_bin + ' ' + arguments
    return RunGetOutput(cmd, False, False)


def _verify_signature():
    """Verify the temp key file against its signature using the management keyring."""
    code, out = _run_gpg('--no-default-keyring --keyring '
                         + signature_keyring_path + ' --verify '
                         + key_signature_path + ' ' + key_contents_path)
    if code != 0:
        LG().Log('ERROR', '"KeyContents" could not be verified by "KeySignature". Output is ' + out)
        print('"KeyContents" could not be verified by "KeySignature". Output is ' + out,
              file=sys.stderr)
        return False
    return True


def _remove_key():
    """Delete the key held in the temp key file from the DSC keyring."""
    # Get the id so we can remove the key.
    code, out = _run_gpg('--dry-run --no-default-keyring --keyring '
                         + dsc_keyring_path + ' --import ' + key_contents_path)
    id_match = re.search(r'.*?key (.*?):.*?not changed', out)
    if id_match is None:
        LG().Log('ERROR', 'Key ID could not be determined. Output is ' + out)
        return False
    key_id = id_match.group(1)

    # We must use the fingerprint in batch mode to prevent interactive questions
    code, out = _run_gpg('--batch --yes --no-default-keyring --keyring '
                         + dsc_keyring_path + ' --fingerprint ' + key_id)
    fp_match = re.search(r'.*?fingerprint = (.*?)\n', out)
    if fp_match is None:
        LG().Log('ERROR', 'Key fingerprint could not be determined. Output is ' + out)
        return False
    # Older versions of gpg cannot parse space padding.
    key_fingerprint = fp_match.group(1).replace(' ', '')

    # Delete the key.
    code, out = _run_gpg('--batch --yes --no-default-keyring --keyring '
                         + dsc_keyring_path + ' --delete-secret-and-public-key "'
                         + key_fingerprint + '"')
    if code != 0:
        LG().Log('ERROR', 'Unable to delete Key. Output is ' + out)
        print('Unable to delete Key. Output is ' + out, file=sys.stderr)
        return False
    return True


def _import_key():
    """Import the key held in the temp key file into the DSC keyring."""
    code, out = _run_gpg('--no-default-keyring --keyring '
                         + dsc_keyring_path + ' --import ' + key_contents_path)
    if code != 0:
        LG().Log('ERROR', 'Unable to import Key. Output is ' + out)
        print('Unable to import Key. Output is ' + out, file=sys.stderr)
        return False
    return True


def Set(KeyContents, KeySignature, Ensure):
    """Bring the DSC keyring into the requested state.

    Returns [0] on success (including when nothing needs to change) and [-1]
    on any failure. The key is only touched after its signature verifies.
    """
    if Test(KeyContents, KeySignature, Ensure) == [0]:
        return [0]
    if not _ensure_tmpdir():
        return [-1]
    try:
        if not _write_tmp_file(key_signature_path, KeySignature):
            return [-1]
        if not _write_tmp_file(key_contents_path, KeyContents):
            return [-1]
        if not _verify_signature():
            return [-1]
        if Ensure == 'absent':
            succeeded = _remove_key()
        else:
            succeeded = _import_key()
        return [0] if succeeded else [-1]
    finally:
        # Remove the temp key material on every exit path, exceptions included.
        cleanup()


def Test(KeyContents, KeySignature, Ensure):
    """Return [0] if the keyring already matches Ensure, otherwise [-1].

    The key counts as present when a dry-run import reports 'not changed'.
    """
    if not _ensure_tmpdir():
        return [-1]
    try:
        if not _write_tmp_file(key_contents_path, KeyContents):
            return [-1]
        code, out = _run_gpg('--dry-run --options /dev/null --no-default-keyring --keyring '
                             + dsc_keyring_path + ' --import ' + key_contents_path)
        if code != 0:
            # Reported but not fatal: the result below still decides the outcome.
            LG().Log('ERROR', '"KeyContents" may be invalid. Output is: ' + out)
            print('ERROR: "KeyContents" may be invalid. Output is: ' + out,
                  file=sys.stderr)
        present = 'not changed' in out
        if (Ensure == 'present' and not present) or \
                (Ensure == 'absent' and present):
            return [-1]
        return [0]
    finally:
        cleanup()


def Get(KeyContents, KeySignature, Ensure):
    """Return the inputs unchanged."""
    # Get will not determine the value of Ensure.
    return KeyContents, KeySignature, Ensure


def cleanup():
    """Remove the scratch directory and everything in it, ignoring errors."""
    if os.path.isdir(tmpdir):
        shutil.rmtree(tmpdir, ignore_errors=True)


def MakeDirs(path):
    """Create *path* (with parents). Return None on success, else the exception."""
    failure = None
    try:
        os.makedirs(path)
    except (OSError, IOError) as exc:
        # Python 3 unbinds the 'as' name when the handler exits, so the
        # exception is copied to another local before being returned.
        failure = exc
        print("Exception making dir " + path + " " + str(exc), file=sys.stderr)
        LG().Log('ERROR', "Exception making dir " + path + " " + str(exc))
    return failure


@contextmanager
def opened_w_error(filename, mode="r"):
    """
    This context ensures the file is closed.

    Yields (file, None) on success, or (None, error) if open() raised IOError.
    """
    try:
        f = open(filename, mode)
    except IOError as err:
        yield None, err
    else:
        try:
            yield f, None
        finally:
            f.close()


def RunGetOutput(cmd, no_output, chk_err=True):
    """
    Execute 'cmd' through the shell with stderr merged into stdout.

    Returns (return code, output text), or (return code, None) when
    'no_output' is True, in which case the output is not captured.
    A non-zero exit is printed and logged if the chk_err parameter is True.
    """
    out_file = None if no_output else subprocess.PIPE
    process = subprocess.Popen(cmd, stdout=out_file,
                               stderr=subprocess.STDOUT, shell=True)
    output, unused_err = process.communicate()
    retcode = process.poll()
    # Nothing is captured when no_output is set; use empty bytes so the
    # slicing/decoding below cannot fail on None.
    if output is None:
        output = b''

    if retcode:
        if chk_err:
            result = (output[:-1]).decode('ascii', 'ignore')
            print('CalledProcessError.  Error Code is ' + str(retcode),
                  file=sys.stdout)
            LG().Log('ERROR',
                     'CalledProcessError.  Error Code is ' + str(retcode))
            print('CalledProcessError.  Command string was ' + cmd,
                  file=sys.stdout)
            LG().Log('ERROR',
                     'CalledProcessError.  Command string was ' + cmd)
            print('CalledProcessError.  Command result was ' + result,
                  file=sys.stdout)
            LG().Log('ERROR',
                     'CalledProcessError.  Command result was ' + result)
        if no_output:
            return retcode, None
        return retcode, output.decode('ascii', 'ignore')

    if no_output:
        return 0, None
    return 0, output.decode('ascii', 'ignore')