quickstart-flink/quickstart-connector/assembly/conf/application.yml
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
flink:
option:
target: yarn-per-job
detached:
shutdownOnAttachedExit:
jobmanager:
property: # @see https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/
$internal.application.main: org.apache.streampark.flink.quickstart.QuickStartApp
pipeline.name: streampark-quickstartApp
yarn.application.queue:
taskmanager.numberOfTaskSlots: 1
parallelism.default: 2
jobmanager.memory:
flink.size:
heap.size:
jvm-metaspace.size:
jvm-overhead.max:
off-heap.size:
process.size:
taskmanager.memory:
flink.size:
framework.heap.size:
framework.off-heap.size:
managed.size:
process.size:
task.heap.size:
task.off-heap.size:
jvm-metaspace.size:
jvm-overhead.max:
jvm-overhead.min:
managed.fraction: 0.4
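# A minimal sizing sketch (commented out, not active in this file): set only the
# total process memory and let Flink derive the remaining components. 1728m is an
# illustrative placeholder, not a recommendation.
#taskmanager.memory:
#  process.size: 1728m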
pipeline:
auto-watermark-interval: 200ms
# checkpoint
execution:
checkpointing:
mode: EXACTLY_ONCE
interval: 30s
timeout: 10min
unaligned: false
externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# state backend
state:
backend: hashmap # Special note: Flink 1.12 and earlier accept ('jobmanager', 'filesystem', 'rocksdb'); Flink 1.13 and later accept ('hashmap', 'rocksdb')
backend.incremental: true
checkpoint-storage: filesystem
savepoints.dir: file:///tmp/chkdir
checkpoints.dir: file:///tmp/chkdir
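# A hedged alternative (commented out): RocksDB with incremental checkpoints kept
# on HDFS. The hdfs:// paths below are illustrative placeholders.
#state:
#  backend: rocksdb
#  backend.incremental: true
#  checkpoint-storage: filesystem
#  checkpoints.dir: hdfs:///flink/checkpoints
#  savepoints.dir: hdfs:///flink/savepoints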
# restart strategy
restart-strategy: fixed-delay # Restart strategy: one of fixed-delay | failure-rate | none
restart-strategy.fixed-delay:
attempts: 3
delay: 5000
restart-strategy.failure-rate:
max-failures-per-interval:
failure-rate-interval:
delay:
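# A hedged failure-rate sketch (commented out; only one restart strategy applies at
# a time). The values are illustrative: allow at most 10 failures within a 5-minute
# measuring window, waiting 10s between restart attempts.
#restart-strategy: failure-rate
#restart-strategy.failure-rate:
#  max-failures-per-interval: 10
#  failure-rate-interval: 5min
#  delay: 10s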
# table
table:
table.local-time-zone: default # @see https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/
app: # user's parameter
# kafka source config....
kafka.source:
bootstrap.servers: kfk01:9092,kfk02:9092,kfk03:9092
topic: test_user
group.id: flink_02
auto.offset.reset: earliest
#enable.auto.commit: true
#start.from:
#timestamp: 1591286400000 # start from this timestamp; applies to all topics
#offset: # assign a starting offset to each partition of each topic
#topic: kafka01,kafka02
#kafka01: 0:182,1:183,2:182 # partition 0 starts consuming at offset 182, partition 1 at 183, ...
#kafka02: 0:182,1:183,2:182
# kafka sink config....
kafka.sink:
bootstrap.servers: kfk01:9092,kfk02:9092,kfk03:9092
topic: test_user
transaction.timeout.ms: 1000
semantic: AT_LEAST_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
batch.size: 1
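# Hedged note: with EXACTLY_ONCE, Flink's Kafka producer requires transaction.timeout.ms
# to be no larger than the broker's transaction.max.timeout.ms (15 minutes by default).
# An illustrative, commented-out variant:
#kafka.sink:
#  semantic: EXACTLY_ONCE
#  transaction.timeout.ms: 900000 # 15 minutes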
# jdbc config...
jdbc:
semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
driverClassName: com.mysql.jdbc.Driver
jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
username: root
password: 123456
# influx config...
influx:
mydb:
jdbcUrl: http://test9:8086
#username: admin
#password: admin
# hbase
hbase:
zookeeper.quorum: test1,test2,test6
zookeeper.property.clientPort: 2181
zookeeper.session.timeout: 1200000
rpc.timeout: 5000
client.pause: 20
## ClickHouse JDBC synchronous write configuration
#clickhouse:
# sink:
# # addresses of the nodes to write to
# jdbcUrl: jdbc:clickhouse://127.0.0.1:8123,192.168.1.2:8123
# socketTimeout: 3000000
# database: test
# user: $user
# password: $password
# # target result table and its columns; the column list may be omitted entirely
# targetTable: orders(userId,siteId)
# batch:
# size: 1
# delaytime: 6000000
clickhouse:
sink:
hosts: 127.0.0.1:8123,192.168.1.2:8123
socketTimeout: 3000000
database: test
user: $user
password: $password
targetTable: test.orders(userId,siteId)
batch:
size: 1
delaytime: 60000
threshold:
bufferSize: 10
# number of concurrent asynchronous writers
numWriters: 4
# capacity of the buffer queue
queueCapacity: 100
delayTime: 10
requestTimeout: 600
retries: 1
# response code treated as success
successCode: 200
failover:
table: chfailover
# component used to back up data once the maximum number of failed writes is reached
storage: kafka #kafka|mysql|hbase|hdfs
mysql:
driverClassName: com.mysql.cj.jdbc.Driver
jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
username: user
password: pass
kafka:
bootstrap.servers: localhost:9092
topic: test1
group.id: user_01
auto.offset.reset: latest
hbase:
hdfs:
path: /data/chfailover
namenode: hdfs://localhost:8020
user: hdfs
# http sink configuration
http.sink:
threshold:
numWriters: 3
queueCapacity: 10000 # maximum queue capacity; size it according to your record size. If it is too large and the upstream source outpaces the downstream writes, an OOM may occur.
timeout: 100 # timeout for sending the HTTP request
retries: 3 # maximum number of retries when a send fails
successCode: 200 # status code(s) treated as a successful send; multiple values may be given, separated by ","
failover:
table: record
storage: mysql # mysql|kafka|hbase|hdfs
jdbc: # when storage is mysql, failed records are saved to MySQL
jdbcUrl: jdbc:mysql://localhost:3306/test
username: root
password: 123456
kafka:
topic: bigdata
bootstrap.servers: localhost:9093
hbase:
zookeeper.quorum: localhost
zookeeper.property.clientPort: 2181
hdfs:
namenode: hdfs://localhost:8020 # namenode rpc address and port, e.g: hdfs://hadoop:8020 , hdfs://hadoop:9000
user: benjobs # user
path: /clickhouse/failover # save path
format: yyyy-MM-dd
# redis sink configuration
redis.sink:
# masterName: master # sentinel-mode parameter
# host: 192.168.0.1:6379, 192.168.0.3:6379 # sentinel-mode parameter
host: 127.0.0.1
port: 6379
database: 2
connectType: jedisPool # optional: jedisPool (default) | sentinel
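# A hedged sentinel-mode sketch (commented out); the master name and host list are
# placeholders taken from the comments above.
#redis.sink:
#  masterName: master
#  host: 192.168.0.1:6379, 192.168.0.3:6379
#  connectType: sentinel
#  database: 2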
es.sink:
# required; for multiple nodes, use host1:port, host2:port
host: localhost:9200
# optional parameters
# es:
# disableFlushOnCheckpoint: false # whether to disable flushing buffered actions on checkpoint
# auth:
# user:
# password:
# rest:
# max.retry.timeout:
# path.prefix:
# content.type:
# connect:
# request.timeout:
# timeout:
# cluster.name: elasticsearch
# client.transport.sniff:
# bulk.flush.:
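# A hedged sketch with basic auth enabled (commented out); user and password are
# placeholders, and the nested keys follow the optional block above.
#es.sink:
#  host: localhost:9200
#  es:
#    auth:
#      user: elastic
#      password: changeme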
sql:
my_flinksql: |
CREATE TABLE datagen (
f_sequence INT,
f_random INT,
f_random_str STRING,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'datagen',
-- optional options --
'rows-per-second'='5', -- this is a comment --
'fields.f_sequence.kind'='sequence',
'fields.f_sequence.start'='1',
'fields.f_sequence.end'='1000',
'fields.f_random.min'='1',
'fields.f_random.max'='1000',
'fields.f_random_str.length'='10'
);
CREATE TABLE print_table (
f_sequence INT,
f_random INT,
f_random_str STRING
) WITH (
'connector' = 'print'
);
INSERT INTO print_table SELECT f_sequence, f_random, f_random_str FROM datagen;
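# Hypothetical illustration (commented out): assuming the `sql:` map accepts additional
# named statements alongside my_flinksql, a second entry might look like this; the
# name and body are placeholders.
#my_second_flinksql: |
#  INSERT INTO print_table SELECT f_sequence, f_random, f_random_str FROM datagen WHERE f_random > 500;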