image_test/oslogin-ssh/oslogin-ssh-master-tester.py (178 lines of code) (raw):

#!/usr/bin/env python3 # Copyright 2018 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import datetime from google import auth from googleapiclient import discovery import utils from utils import logging MM = utils.MetadataManager MD = None MASTER_KEY = None OSLOGIN_TESTER = None OSADMINLOGIN_TESTER = None TESTEE = None TESTER_SH = 'slave_tester.sh' OSLOGIN_USERS_LIB = None UNIQUE_ID_USER = None def MasterExecuteInSsh(machine, commands, expect_fail=False): ret, output = utils.ExecuteInSsh( MASTER_KEY, MD.ssh_user, machine, commands, expect_fail, capture_output=True) output = output.strip() if output else None return ret, output @utils.RetryOnFailure() def MasterExecuteInSshRetry(machine, commands, expect_fail=False): return MasterExecuteInSsh(machine, commands, expect_fail) def AddOsLoginKeys(): _, key_oslogin = MasterExecuteInSsh( OSLOGIN_TESTER, [TESTER_SH, 'add_key']) _, key_osadminlogin = MasterExecuteInSsh( OSADMINLOGIN_TESTER, [TESTER_SH, 'add_key']) return key_oslogin, key_osadminlogin def RemoveOsLoginKeys(): MasterExecuteInSsh(OSLOGIN_TESTER, [TESTER_SH, 'remove_key']) MasterExecuteInSsh(OSADMINLOGIN_TESTER, [TESTER_SH, 'remove_key']) def SetEnableOsLogin(state, level, md=None): md = md if md else MD md.SetMetadata('enable-oslogin', state, level) def GetServiceAccountUsername(machine): _, username = MasterExecuteInSsh( machine, ['gcloud', 'compute', 'os-login', 'describe-profile', '--format="value(posixAccounts.username)"']) return username @utils.RetryOnFailure() def CheckAuthorizedKeys(user, key, expect_empty=False): _, auth_keys = MasterExecuteInSsh(TESTEE, ['google_authorized_keys', user]) auth_keys = auth_keys if auth_keys else '' if expect_empty and key in auth_keys: raise ValueError( 'Os Login key DETECTED in google_authorized_keys when NOT expected') elif not expect_empty and key not in auth_keys: raise ValueError( 'Os Login key NOT DETECTED in google_authorized_keys when expected') @utils.RetryOnFailure() def CheckNss(user_oslogin, user_osadminlogin, expect_empty=False): _, users = MasterExecuteInSsh(TESTEE, ['getent', 'passwd']) if expect_empty and (user_oslogin in users or user_osadminlogin in users): raise ValueError( 'Os Login usernames DETECTED in getent passwd (nss) when NOT expected') elif not expect_empty and ( user_oslogin not in users or user_osadminlogin not in users): raise ValueError( 'Os Login usernames NOT DETECTED in getent passwd (nss) when expected') def TestLoginFromSlaves(user_oslogin, user_osadminlogin, expect_fail=False): host_oslogin = '%s@%s' % (user_oslogin, TESTEE) host_osadminlogin = '%s@%s' % (user_osadminlogin, TESTEE) MasterExecuteInSshRetry( OSLOGIN_TESTER, [TESTER_SH, 'test_login', host_oslogin], expect_fail=expect_fail) MasterExecuteInSshRetry( OSADMINLOGIN_TESTER, [TESTER_SH, 'test_login', host_osadminlogin], expect_fail=expect_fail) MasterExecuteInSshRetry( OSLOGIN_TESTER, [TESTER_SH, 'test_login_sudo', host_oslogin], expect_fail=True) MasterExecuteInSshRetry( OSADMINLOGIN_TESTER, [TESTER_SH, 'test_login_sudo', host_osadminlogin], expect_fail=expect_fail) def TestOsLogin(level): key_oslogin, key_osadminlogin = AddOsLoginKeys() user_oslogin = GetServiceAccountUsername(OSLOGIN_TESTER) user_osadminlogin = GetServiceAccountUsername(OSADMINLOGIN_TESTER) SetEnableOsLogin(True, level) CheckNss(user_oslogin, user_osadminlogin) CheckAuthorizedKeys(user_oslogin, key_oslogin) CheckAuthorizedKeys(user_osadminlogin, key_osadminlogin) TestLoginFromSlaves(user_oslogin, user_osadminlogin) RemoveOsLoginKeys() TestLoginFromSlaves(user_oslogin, user_osadminlogin, expect_fail=True) key_oslogin, key_osadminlogin = AddOsLoginKeys() TestLoginFromSlaves(user_oslogin, user_osadminlogin) SetEnableOsLogin(None, level) TestLoginFromSlaves(user_oslogin, user_osadminlogin, expect_fail=True) CheckNss(user_oslogin, user_osadminlogin, expect_empty=True) CheckAuthorizedKeys(user_oslogin, key_oslogin, expect_empty=True) CheckAuthorizedKeys(user_osadminlogin, key_osadminlogin, expect_empty=True) RemoveOsLoginKeys() def TestMetadataWithOsLogin(level): tester_key = MD.AddSshKey(MM.SSH_KEYS, level) MD.TestSshLogin(tester_key) SetEnableOsLogin(True, level) MD.TestSshLogin(tester_key, expect_fail=True) SetEnableOsLogin(None, level) MD.TestSshLogin(tester_key) MD.RemoveSshKey(tester_key, MM.SSH_KEYS, level) MD.TestSshLogin(tester_key, expect_fail=True) def TestOsLoginFalseInInstance(): tester_key = MD.AddSshKey(MM.SSH_KEYS, MM.INSTANCE_LEVEL) MD.TestSshLogin(tester_key) SetEnableOsLogin(True, MM.PROJECT_LEVEL) MD.TestSshLogin(tester_key, expect_fail=True) SetEnableOsLogin(False, MM.INSTANCE_LEVEL) MD.TestSshLogin(tester_key) SetEnableOsLogin(None, MM.INSTANCE_LEVEL) MD.TestSshLogin(tester_key, expect_fail=True) SetEnableOsLogin(None, MM.PROJECT_LEVEL) MD.TestSshLogin(tester_key) MD.RemoveSshKey(tester_key, MM.SSH_KEYS, MM.INSTANCE_LEVEL) MD.TestSshLogin(tester_key, expect_fail=True) def AddKeyOsLogin(key): """Returns the key fingerprint for using later to remove it""" key = open(key).read() # Microseconds since epoch + 30 minutes from now expire_obj = datetime.datetime.now() + datetime.timedelta(minutes=30) expire_str = '%d' % (int(expire_obj.strftime("%s")) * 1000000) body = {'key': key, 'expirationTimeUsec': expire_str} request = OSLOGIN_USERS_LIB.importSshPublicKey(parent=UNIQUE_ID_USER, body=body) response = request.execute() keys = response[u'loginProfile'][u'sshPublicKeys'] # retrieves the fingerprint of the added key for k_dict in keys.values(): if k_dict[u'key'] == key: return k_dict[u'fingerprint'] raise Exception('Added key not found on existing keys: %s' % key) def RemoveKeyOsLogin(fingerprint): name = '%s/sshPublicKeys/%s' % (UNIQUE_ID_USER, fingerprint) OSLOGIN_USERS_LIB.sshPublicKeys().delete(name=name).execute() def main(): global MD global MASTER_KEY global OSLOGIN_TESTER global OSADMINLOGIN_TESTER global TESTEE global OSLOGIN_USERS_LIB global UNIQUE_ID_USER logging.info("master: starting environment configuration") TESTEE = MM.FetchMetadataDefault('testee') OSLOGIN_TESTER = MM.FetchMetadataDefault('osLoginTester') OSADMINLOGIN_TESTER = MM.FetchMetadataDefault('osAdminLoginTester') credentials, _ = auth.default() OSLOGIN_USERS_LIB = utils.GetOslogin(discovery, credentials).users() UNIQUE_ID_USER = utils.GetServiceAccountUniqueIDUser() username = utils.GetCurrentLoginProfileUsername(OSLOGIN_USERS_LIB, UNIQUE_ID_USER) compute = utils.GetCompute(discovery, credentials) MD = MM(compute, TESTEE, username) SetEnableOsLogin(None, MM.PROJECT_LEVEL) SetEnableOsLogin(None, MM.INSTANCE_LEVEL) # Enable OsLogin in slaves md = MM(compute, OSLOGIN_TESTER, username) SetEnableOsLogin(True, MM.INSTANCE_LEVEL, md) md = MM(compute, OSADMINLOGIN_TESTER, username) SetEnableOsLogin(True, MM.INSTANCE_LEVEL, md) # Add key in Metadata and in OsLogin to allow access peers in both modes MASTER_KEY = MD.AddSshKey(MM.SSH_KEYS, MM.PROJECT_LEVEL) master_key_fingerprint = AddKeyOsLogin(MASTER_KEY + '.pub') # Execute tests try: logging.info("master: starting: TestOsLogin(MM.INSTANCE_LEVEL)") TestOsLogin(MM.INSTANCE_LEVEL) logging.info("master: starting: TestOsLogin(MM.PROJECT_LEVEL)") TestOsLogin(MM.PROJECT_LEVEL) logging.info( "master: starting: TestMetadataWithOsLogin(MM.INSTANCE_LEVEL)") TestMetadataWithOsLogin(MM.INSTANCE_LEVEL) logging.info("master: starting: TestMetadataWithOsLogin(MM.PROJECT_LEVEL)") TestMetadataWithOsLogin(MM.PROJECT_LEVEL) logging.info("master: starting test: TestOsLoginFalseInInstance()") TestOsLoginFalseInInstance() # Clean keys finally: logging.info("master: starting environment cleanup") MD.RemoveSshKey(MASTER_KEY, MM.SSH_KEYS, MM.PROJECT_LEVEL) RemoveKeyOsLogin(master_key_fingerprint) if __name__ == '__main__': utils.RunTest(main)