in kinesis_video_streamer/launch/kinesis_video_streamer.launch.py [0:0]
def generate_launch_description():
    """Build the launch description for the kinesis_video_streamer node.

    Defaults for the AWS region, the stream name and the log4cplus config
    are read from the ``sample_config.yaml`` file found under the
    ``kinesis_video_streamer`` package share directory. Five launch
    arguments are declared (node name, config file, AWS region, stream
    name, Rekognition data stream) and a single ``kinesis_video_streamer``
    node is added whose parameters are the config file followed by
    dictionaries built from those launch arguments.

    Returns:
        The ``launch.LaunchDescription`` holding the declared arguments
        and the streamer node.
    """
    # Default to the config file shipped with the package.
    package_share = get_package_share_directory('kinesis_video_streamer')
    default_config = os.path.join(package_share, 'config', 'sample_config.yaml')
    with open(default_config, 'r') as config_file:
        default_config_yaml = yaml.safe_load(config_file.read())

    # Pull the default values out of the node's parameter section.
    ros_parameters = default_config_yaml['kinesis_video_streamer']['ros__parameters']
    default_aws_region = ros_parameters['aws_client_configuration']['region']
    kinesis_video_defaults = ros_parameters['kinesis_video']
    default_stream_name = kinesis_video_defaults['stream0']['stream_name']
    default_log4cplus_config = kinesis_video_defaults['log4cplus_config']

    # (argument name, default value) for every launch argument, in
    # declaration order.
    argument_defaults = (
        (NODE_NAME, "kinesis_video_streamer"),
        (CONFIG, default_config),
        (AWS_REGION, default_aws_region),
        (STREAM_NAME, default_stream_name),
        (REKOGNITION_DATA_STREAM, ''),
    )
    launch_description = launch.LaunchDescription([
        launch.actions.DeclareLaunchArgument(arg_name, default_value=arg_default)
        for arg_name, arg_default in argument_defaults
    ])

    stream_name = LaunchConfiguration(STREAM_NAME)
    rekognition_stream = LaunchConfiguration(REKOGNITION_DATA_STREAM)

    # The expression text picks the dict that includes
    # 'rekognition_data_stream' only when that launch argument is a
    # non-empty string; otherwise only 'stream_name' is set.
    # NOTE(review): argument values are spliced between single quotes, so a
    # value containing a single quote would presumably yield a malformed
    # expression - confirm whether such names need to be supported.
    stream0_expression = PythonExpression([
        "{ 'stream_name': '", stream_name, "', 'rekognition_data_stream': '",
        rekognition_stream, "' }",
        " if '", rekognition_stream, "' else ",
        "{ 'stream_name': '", stream_name, "' }",
    ])

    # Launch-argument driven values are listed after the config file.
    argument_overrides = {
        'aws_client_configuration': {
            'region': LaunchConfiguration(AWS_REGION),
        },
        'kinesis_video': {
            'stream0': stream0_expression,
        },
    }
    node_parameters = [LaunchConfiguration(CONFIG), argument_overrides]

    # Only pass a log4cplus config when the helper returned a path.
    logger_config_path = get_logger_config_path(default_log4cplus_config)
    if logger_config_path is not None:
        node_parameters.append(
            {'kinesis_video': {'log4cplus_config': logger_config_path}}
        )

    launch_description.add_action(
        launch_ros.actions.Node(
            package="kinesis_video_streamer",
            node_executable="kinesis_video_streamer",
            node_name=LaunchConfiguration(NODE_NAME),
            parameters=node_parameters,
        )
    )
    return launch_description