in index.py [0:0]
def lambda_handler(event, context):
    """Report, and on PATCH update, the shard capacity of a Kinesis stream.

    The stream name is read from
    ``event['pathParameters']['scalableTargetDimensionId']``. The current
    open shard count is read with ``describe_stream_summary`` and the desired
    capacity is read from the ``PARAMETER_STORE`` SSM parameter. When the
    request is a PATCH whose JSON body carries a non-negative
    ``desiredCapacity``, that value is written back to the parameter and
    ``update_shards`` is called.

    Args:
        event: Request dict; this handler reads ``pathParameters``,
            ``httpMethod`` and ``body`` (a JSON string) from it.
        context: Lambda context object; only forwarded to
            ``autoscaling_policy_arn``.

    Returns:
        The result of ``response_function(status, payload)``: status 400 when
        the stream identifier is missing, 404 when the stream cannot be
        described, otherwise 200 with the capacity/status document.
    """
    # These module-level names are only assigned on the PATCH path below.
    global AUTOSCALINGPOLICYOUT_ARN
    global AUTOSCALINGPOLICYIN_ARN

    # log the event
    print(json.dumps(event))

    # get Stream name; pathParameters may be missing or null, which is treated
    # exactly like a missing identifier instead of raising.
    pathParameters = event.get('pathParameters') or {}
    if 'scalableTargetDimensionId' not in pathParameters:
        message = "Error, scalableTargetDimensionId not found"
        return response_function(400, str(message))
    resourceName = pathParameters['scalableTargetDimensionId']
    print(resourceName)

    # try to get information of the Kinesis stream
    try:
        response = client_kinesis.describe_stream_summary(
            StreamName=resourceName,
        )
        print(response)
        streamStatus = response['StreamDescriptionSummary']['StreamStatus']
        actualCapacity = response['StreamDescriptionSummary']['OpenShardCount']
    except Exception as e:
        # Any failure is reported to the caller as "not found"; log the real
        # cause so it is not lost.
        print(e)
        message = "Error, cannot find a Kinesis stream called " + resourceName
        return response_function(404, message)

    # try to retrieve the desired capacity from ParameterStore.
    # Without a usable entry, assume the desired capacity equals the actual
    # capacity, so the variable is always bound.
    desiredCapacity = actualCapacity
    try:
        response = client_ssm.get_parameter(
            Name=PARAMETER_STORE
        )
    except client_ssm.exceptions.ParameterNotFound:
        # boto3 signals a missing parameter with this exception rather than
        # with a response lacking 'Parameter'.
        print("Desired capacity not found in ParameterStore, using actual capacity")
    else:
        print(response)
        if 'Value' in response.get('Parameter', {}):
            desiredCapacity = response['Parameter']['Value']
            print(desiredCapacity)

    # Only an ACTIVE stream counts as settled. UPDATING, and any other
    # non-ACTIVE status, is reported as "InProgress" so that scalingStatus is
    # always bound and no resize is attempted while the stream is changing.
    # NOTE(review): confirm "InProgress" is the wanted answer for statuses
    # other than UPDATING (previously these raised UnboundLocalError).
    if streamStatus == "ACTIVE":
        scalingStatus = "Successful"
    else:
        scalingStatus = "InProgress"

    if event['httpMethod'] == "PATCH":
        # Check whether autoscaling is calling to change the Desired Capacity.
        # The body may be null; the substring pre-check is kept so bodies that
        # never mention the field are not parsed at all.
        body = event.get('body') or ""
        requestedCapacity = None
        if 'desiredCapacity' in body:
            payload = json.loads(body)
            if isinstance(payload, dict):
                requestedCapacity = payload.get('desiredCapacity')

        # A negative (or absent) requested capacity is ignored: the value read
        # from ParameterStore stays in effect.
        if requestedCapacity is not None and int(requestedCapacity) >= 0:
            desiredCapacity = requestedCapacity

            # Store the new desired capacity in ParameterStore
            response = client_ssm.put_parameter(
                Name=PARAMETER_STORE,
                Value=str(int(desiredCapacity)),
                Type='String',
                Overwrite=True
            )
            print(response)
            print("Trying to set capacity to " + str(desiredCapacity))

            # Both variables are read right below, so resolve the policy ARNs
            # when EITHER of them is missing.
            if ('AutoScalingPolicyOut' not in os.environ
                    or 'AutoScalingPolicyIn' not in os.environ):
                autoscaling_policy_arn(context)
            AUTOSCALINGPOLICYOUT_ARN = os.environ['AutoScalingPolicyOut']
            AUTOSCALINGPOLICYIN_ARN = os.environ['AutoScalingPolicyIn']

            scalingStatus = update_shards(desiredCapacity, resourceName)

    # The stream is settled but does not match the desired capacity: resize.
    if scalingStatus == "Successful" and float(desiredCapacity) != float(actualCapacity):
        scalingStatus = update_shards(desiredCapacity, resourceName)

    returningJson = {
        "actualCapacity": float(actualCapacity),
        "desiredCapacity": float(desiredCapacity),
        "dimensionName": resourceName,
        "resourceName": resourceName,
        "scalableTargetDimensionId": resourceName,
        "scalingStatus": scalingStatus,
        "version": "MyVersion"
    }

    # failureReason is never assigned in this function, so it resolves as a
    # module-level name; include it only when such a name exists.
    try:
        returningJson['failureReason'] = failureReason
    except NameError:
        pass

    print(returningJson)
    return response_function(200, returningJson)