in setupSingleVehicle.py [0:0]
def main(profile, stackname, cdfstackname, vin, firstname, lastname, username, password, skip, csr):
#Set Config path
CONFIG_PATH = 'config.ini'
config = Config(CONFIG_PATH)
config_parameters = config.get_section('SETTINGS')
secure_cert_path = config_parameters['SECURE_CERT_PATH']
bootstrap_cert = config_parameters['CLAIM_CERT']
root_cert_url = config_parameters['AWS_ROOT_CERT_URL']
root_cert = config_parameters['ROOT_CERT']
root_cert_path = config_parameters['ROOT_CERT_PATH']
default_role_name = config_parameters['DEFAULT_ROLE_NAME']
default_role_arn = config_parameters['POLICY_ARN_IOT']
deviceMaker = config_parameters['DEVICE_MAKER']
provisioning_template_name = config_parameters['PROVISIONING_TEMPLATE_NAME']
provisioning_template_description = config_parameters['PROVISIONING_TEMPLATE_DESCRIPTION']
provisioning_policy_file_name = config_parameters['POLICY_JSON']
provisioning_template_file_name = config_parameters['TEMPLATE_JSON']
provisioning_policy_name = config_parameters['PROVISIONING_POLICY_NAME']
externalId = uuid.uuid4().hex
thingName = vin
c = Cognito(profile)
m = ConnectedMobility(profile, stackname, cdfstackname)
i = IOT(profile, default_role_name, default_role_arn, CONFIG_PATH)
provisioner = ProvisioningHandler(CONFIG_PATH, provisioning_template_name, thingName, i.iotEndpoint)
#begin setting up device certificates for this thing
#We will use fleet provisioning to take a bootstrap certificate, this bootstrap certificate is allowed to connect to specific topics that will allow for the creation
#of the permananet certificate. The permanent certificate is then downloaded to the /certs folder and used to connect to the telemetry topics
print("Setting up provisioning templates.")
if skip is not True:
i.setupProvisioningTemplate(
provisioning_template_name,
provisioning_template_description,
provisioning_template_file_name,
provisioning_policy_name,
provisioning_policy_file_name)
if not c.checkCognitoUser(username,m.userPoolId):
print("Creating user ...")
c.createCognitoUser(username, password, m.userPoolId, m.userPoolClientId)
print("Logging in user ...")
authorization = c.authenticateCognitoUser(username, password, m.userPoolId, m.userPoolClientId)
#check to see if supplier exists, otherwise create a new one.
response = m.getSupplier(deviceMaker, authorization)
if response.status_code != 200:
print("Creating supplier ...")
response = m.createSupplier(deviceMaker = deviceMaker, GUID = externalId, authorization = authorization)
if response.status_code == 204:
print("Vehicle Supplier created successfully...")
else:
print("Error creating vehicle supplier. Exiting. Error: %s", response)
exit()
else:
#get externalId from existing supplier
data = json.loads(response.text)
externalId = data["attributes"]["externalId"]
print("Creating CMS User ...")
response = m.createCMSUser(firstName = firstname, lastName = lastname, username = username, authorization = authorization)
if response.status_code == 201 or response.status_code == 204:
print("CMS User created successfully...")
else:
print("Error creating CMS User. May currently exist, who knows, there's no method to check.")
#create provisioning certificate
print("Creating provisioning certificates and writing to local directory.")
certificateArn = i.createProvisioningCertificate(True, provisioning_template_name, vin)
print("Certificates created successfully")
#attach the new certificate to the policy already created
print("Attaching the boostrap policy to the certificate")
i.attachPolicyToCertificate(provisioning_policy_name, certificateArn)
print("Policy attached successfully")
if csr is True:
i.createCertificateSigningRequest(True, vin, common_name=vin, country=None, state=None, city=None,
organization=None, organizational_unit=None, email_address=None)
try: #to get root cert if it does not exist
print("Getting root certificate")
root_path = "{}/{}".format( root_cert_path, root_cert)
if not os.path.exists( root_path):
response = urlopen(root_cert_url)
content = response.read().decode('utf-8')
with open(root_path, "w" ) as f:
f.write( content )
f.close()
except Exception as e:
print(e)
exit()
print("Root certificate downloaded to certificates directory.")
#try:
if csr is not True:
print("{}/{}".format(secure_cert_path.format(unique_id=vin), bootstrap_cert))
with open("{}/{}".format(secure_cert_path.format(unique_id=vin), bootstrap_cert), 'r') as f:
# Call super-method to perform aquisition/activation
# of certs, association of thing, etc. Returns general
# purpose callback at this point.
# Instantiate provisioning handler, pass in path to config
provisioner.get_official_certs(callback, None, None)
else:
provisioner.get_official_certs(callback, vin, 'csr-bootstrap.csr')
#except IOError:
# print("### Bootstrap cert non-existent. Official cert may already be in place.")
print("Registering Device ...")
response = m.registerDevice(externalId, thingName, authorization, provisioner.CertificateId)
if response.status_code == 200 or response.status_code == 204 or response.status_code == 201:
print("Device registered successfully...")
elif response.status_code == 409:
print("Device already registered. Will attempt to activate the device...")
else:
print(response)
print("Error registering the device. Exiting.")
exit()
response = m.activateDevice(deviceMaker = deviceMaker, externalId = externalId, thingName = thingName, vin = vin, authorization = authorization)
if response.status_code == 204:
print("Device activated successfully...")
else:
print("Error activating device. Exiting.")
exit()
response = m.associateDevice(username = username, vin = vin, authorization = authorization)
if response.status_code == 204 or response.status_code == 200:
print("Device associated successfully...")
print("External ID: {}".format(externalId))
print("Thing Name: {}".format(thingName))
else:
print("Error associating device to user. Exiting.")
exit()
print("Vehicle setup sucessfully, please visit http://{} to login with your user and see your vehicle".format(m.cloudFrontDomainUrl))