in api/conftest.py [0:0]
def mock_dynamodb():
    """Yield a DynamoDB resource backed by a mock, with both tables created.

    Starts a ``mock_dynamodb2`` mock (presumably moto's -- the import is not
    visible here), opens a session in ``us-west-2`` with placeholder
    credentials, creates the wheel and participant tables, waits until both
    report as existing, and then yields the ``dynamodb`` resource.

    The same mock instance that was started is stopped on exit, and the stop
    runs in a ``finally`` so the mock is torn down even when table creation
    fails or the generator is closed early.

    Yields:
        The ``dynamodb`` resource obtained from the session.
    """
    # Keep a handle on the mock so teardown stops the instance that was
    # actually started, instead of a freshly constructed one.
    mock = mock_dynamodb2()
    mock.start()
    try:
        session = Session(
            aws_access_key_id='<ACCESS_KEY_ID>',
            aws_secret_access_key='<SECRET_KEY>',
            region_name='us-west-2',
        )
        dynamodb = session.resource('dynamodb')

        # Wheel table: keyed by a single string hash key ``id``.
        wheel_table = dynamodb.create_table(
            TableName=WHEEL_TABLE_NAME,
            KeySchema=[
                {
                    'AttributeName': 'id',
                    'KeyType': 'HASH',
                },
            ],
            AttributeDefinitions=[
                {
                    'AttributeName': 'id',
                    'AttributeType': 'S',
                },
            ],
            ProvisionedThroughput={
                'ReadCapacityUnits': 1,
                'WriteCapacityUnits': 1,
            },
        )

        # Participant table: composite key of ``wheel_id`` (hash) and
        # ``id`` (range), both strings.
        participant_table = dynamodb.create_table(
            TableName=PARTICIPANT_TABLE_NAME,
            KeySchema=[
                {
                    'AttributeName': 'wheel_id',
                    'KeyType': 'HASH',
                },
                {
                    'AttributeName': 'id',
                    'KeyType': 'RANGE',
                },
            ],
            AttributeDefinitions=[
                {
                    'AttributeName': 'wheel_id',
                    'AttributeType': 'S',
                },
                {
                    'AttributeName': 'id',
                    'AttributeType': 'S',
                },
            ],
            ProvisionedThroughput={
                'ReadCapacityUnits': 5,
                'WriteCapacityUnits': 5,
            },
        )

        # Wait on table creation
        wheel_table.meta.client.get_waiter('table_exists').wait(
            TableName=WHEEL_TABLE_NAME)
        participant_table.meta.client.get_waiter('table_exists').wait(
            TableName=PARTICIPANT_TABLE_NAME)

        yield dynamodb
    finally:
        mock.stop()