def lambda_handler()

in resources/flink-on-kda/index.py [0:0]


def lambda_handler(event, context):
    # log the event
    print("Calling lambda scaler for kda - " + json.dumps(event))

    # get KDA app name
    if 'scalableTargetDimensionId' in event['pathParameters']:
        resourceName = event['pathParameters']['scalableTargetDimensionId']
        print("Fetching resource name in kda scaler - " + resourceName)
    else:
        message = "Error, scalableTargetDimensionId not found"
        return response_function(400, str(message))

    # get details for the KDA app in question
    try:
        response = client_kda.describe_application(
            ApplicationName=resourceName
        )

        print("In kda scaler, response from kda.describe_application")
        print(response)

        appVersion = response["ApplicationDetail"]["ApplicationVersionId"]
        applicationStatus = response["ApplicationDetail"]["ApplicationStatus"]
        parallelism = response["ApplicationDetail"]["ApplicationConfigurationDescription"][
            "FlinkApplicationConfigurationDescription"]["ParallelismConfigurationDescription"]["Parallelism"]
        actualCapacity = parallelism
    except Exception as e:
        print("Exception in kda scaler: " + str(e))
        message = "Error, cannot find a kinesis data analytics app called " + resourceName
        return response_function(404, message)

    # try to retrive the desired capacity from ParameterStore

    response = client_ssm.get_parameter(
        Name=PARAMETER_STORE
    )
    print("In kda scaler, just called ssm.get_parameter (1)")
    print(response)
    

    if 'Parameter' in response:
        if 'Value' in response['Parameter']:
            desiredCapacity = response['Parameter']['Value']
            print("In kda scaler, desiredCapacity: " + str(desiredCapacity))
    else:
        # if I do not have an entry in ParameterStore, I assume that the desiredCapacity = actualCapacity
        desiredCapacity = actualCapacity

    if applicationStatus == "UPDATING":
        scalingStatus = "InProgress"
    elif applicationStatus == "RUNNING":
        scalingStatus = "Successful"

    print("In kda scaler, scalingStatus: " + scalingStatus)

    if event['httpMethod'] == "PATCH":

        # Check whether autoscaling is calling to change the Desired Capacity
        if 'desiredCapacity' in event['body']:
            desiredCapacityBody = json.loads(event['body'])
            desiredCapacityBody = desiredCapacityBody['desiredCapacity']

            # Check whether the new desired capacity is negative. If so, I need to calculate the new desired capacity
            if int(desiredCapacityBody) >= 0:
                desiredCapacity = desiredCapacityBody

                # Store the new desired capacity in a ParameterStore
                response = client_ssm.put_parameter(
                    Name=PARAMETER_STORE,
                    Value=str(int(desiredCapacity)),
                    Type='String',
                    Overwrite=True
                )
                print(response)
                print("In kda scaler, trying to set capacity to " + str(desiredCapacity))

                if scalingStatus == "Successful" and int(desiredCapacity) != int(actualCapacity):
                    scalingStatus = update_parallelism(context, desiredCapacity, resourceName, appVersion)
            else:
                print("desiredCapacity was < 0")

    elif event['httpMethod'] == "GET":
        if scalingStatus == "Successful" and int(desiredCapacity) != int(actualCapacity):
            scalingStatus = update_parallelism(context, desiredCapacity, resourceName, appVersion)
        elif scalingStatus == "Successful":
            print("Scaling successful; not doing anything")
    
    else:
        print("Unknown http method!: " + event['httpMethod'])

    # Do NOT change the version in response!
    # Doing so will cause the scalable target
    # to get unregistered along with the attached
    # scaling policies.
    returningJson = {
        "actualCapacity": float(actualCapacity),
        "desiredCapacity": float(desiredCapacity),
        "dimensionName": resourceName,
        "resourceName": resourceName,
        "scalableTargetDimensionId": resourceName,
        "scalingStatus": scalingStatus,
        "version": "KDAScaling"
    }

    try:
        returningJson['failureReason'] = failureReason
    except:
        pass

    print(returningJson)

    return response_function(200, returningJson)