quickstart-flink/quickstart-connector/assembly/conf/application.yml

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

flink:
  option:
    target: yarn-per-job
    detached:
    shutdownOnAttachedExit:
    jobmanager:
  property: #@see: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/
    $internal.application.main: org.apache.streampark.flink.quickstart.QuickStartApp
    pipeline.name: streampark-quickstartApp
    yarn.application.queue:
    taskmanager.numberOfTaskSlots: 1
    parallelism.default: 2
    jobmanager.memory:
      flink.size:
      heap.size:
      jvm-metaspace.size:
      jvm-overhead.max:
      off-heap.size:
      process.size:
    taskmanager.memory:
      flink.size:
      framework.heap.size:
      framework.off-heap.size:
      managed.size:
      process.size:
      task.heap.size:
      task.off-heap.size:
      jvm-metaspace.size:
      jvm-overhead.max:
      jvm-overhead.min:
      managed.fraction: 0.4
    pipeline:
      auto-watermark-interval: 200ms
    # checkpoint
    execution:
      checkpointing:
        mode: EXACTLY_ONCE
        interval: 30s
        timeout: 10min
        unaligned: false
        externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
    # state backend
    state:
      backend: hashmap # note: flink 1.12 accepts 'jobmanager' | 'filesystem' | 'rocksdb'; flink 1.12+ accepts 'hashmap' | 'rocksdb'
      backend.incremental: true
      checkpoint-storage: filesystem
      savepoints.dir: file:///tmp/chkdir
      checkpoints.dir: file:///tmp/chkdir
    # restart strategy
    restart-strategy: fixed-delay # restart strategy: fixed-delay | failure-rate | none (3 configurable strategies in total)
    restart-strategy.fixed-delay:
      attempts: 3
      delay: 5000
    restart-strategy.failure-rate:
      max-failures-per-interval:
      failure-rate-interval:
      delay:
    # table
    table:
      table.local-time-zone: default # @see https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/

app:
  # user's parameter

  # kafka source config....
  kafka.source:
    bootstrap.servers: kfk01:9092,kfk02:9092,kfk03:9092
    topic: test_user
    group.id: flink_02
    auto.offset.reset: earliest
    #enable.auto.commit: true
    #start.from:
    #  timestamp: 1591286400000 # the given timestamp applies to all topics
    #  offset: # specify a start offset for each partition of each topic
    #    topic: kafka01,kafka02
    #    kafka01: 0:182,1:183,2:182 # partition 0 starts consuming from offset 182, partition 1 from 183, ...
    #    kafka02: 0:182,1:183,2:182

  # kafka sink config....
  kafka.sink:
    bootstrap.servers: kfk01:9092,kfk02:9092,kfk03:9092
    topic: test_user
    transaction.timeout.ms: 1000
    semantic: AT_LEAST_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
    batch.size: 1
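  # A commented sketch, not active in the file as shipped: if the semantic above is
  # switched to EXACTLY_ONCE, the Kafka producer becomes transactional, so
  # transaction.timeout.ms should be larger than the checkpoint interval and must not
  # exceed the broker's transaction.max.timeout.ms (15 minutes by default).
  # The value below is an illustrative assumption, not taken from the original config.
  #kafka.sink:
  #  semantic: EXACTLY_ONCE
  #  transaction.timeout.ms: 600000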
  # jdbc config...
  jdbc:
    semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
    driverClassName: com.mysql.jdbc.Driver
    jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
    username: root
    password: 123456

  # influx config...
  influx:
    mydb:
      jdbcUrl: http://test9:8086
      #username: admin
      #password: admin

  # hbase
  hbase:
    zookeeper.quorum: test1,test2,test6
    zookeeper.property.clientPort: 2181
    zookeeper.session.timeout: 1200000
    rpc.timeout: 5000
    client.pause: 20

  ## clickhouse jdbc synchronous write configuration
  #clickhouse:
  #  sink:
  #    # addresses of the nodes to write to
  #    jdbcUrl: jdbc:clickhouse://127.0.0.1:8123,192.168.1.2:8123
  #    socketTimeout: 3000000
  #    database: test
  #    user: $user
  #    password: $password
  #    # target table and its columns; the column list can be omitted entirely
  #    targetTable: orders(userId,siteId)
  #    batch:
  #      size: 1
  #      delaytime: 6000000

  clickhouse:
    sink:
      hosts: 127.0.0.1:8123,192.168.1.2:8123
      socketTimeout: 3000000
      database: test
      user: $user
      password: $password
      targetTable: test.orders(userId,siteId)
      batch:
        size: 1
        delaytime: 60000
      threshold:
        bufferSize: 10
        # number of concurrent async writers
        numWriters: 4
        # capacity of the buffer queue
        queueCapacity: 100
        delayTime: 10
        requestTimeout: 600
        retries: 1
        # response code treated as success
        successCode: 200
      failover:
        table: chfailover
        # component the data is backed up to once the maximum number of failed writes is reached
        storage: kafka #kafka|mysql|hbase|hdfs
        mysql:
          driverClassName: com.mysql.cj.jdbc.Driver
          jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
          username: user
          password: pass
        kafka:
          bootstrap.servers: localhost:9092
          topic: test1
          group.id: user_01
          auto.offset.reset: latest
        hbase:
        hdfs:
          path: /data/chfailover
          namenode: hdfs://localhost:8020
          user: hdfs

  # http sink config
  http.sink:
    threshold:
      numWriters: 3
      queueCapacity: 10000 # maximum queue capacity; size it according to your record size — if it is too large and upstream data arrives faster than downstream can write, the job may run out of memory (OOM)
      timeout: 100 # timeout for sending the http request
      retries: 3 # maximum number of retries on a failed send
      successCode: 200 # status code(s) treated as a successful send; multiple values can be separated by ","
    failover:
      table: record
      storage: mysql #kafka,hbase,hdfs
      jdbc: # storage type MySQL: failed records are saved to MySQL
        jdbcUrl: jdbc:mysql://localhost:3306/test
        username: root
        password: 123456
      kafka:
        topic: bigdata
        bootstrap.servers: localhost:9093
      hbase:
        zookeeper.quorum: localhost
        zookeeper.property.clientPort: 2181
      hdfs:
        namenode: hdfs://localhost:8020 # namenode rpc address and port, e.g: hdfs://hadoop:8020 , hdfs://hadoop:9000
        user: benjobs # user
        path: /clickhouse/failover # save path
        format: yyyy-MM-dd

  # redis sink config
  redis.sink:
    # masterName: master                          # sentinel-mode parameter
    # host: 192.168.0.1:6379, 192.168.0.3:6379    # sentinel-mode parameter
    host: 127.0.0.1
    port: 6379
    database: 2
    connectType: jedisPool # optional: jedisPool (default) | sentinel

  es.sink:
    # required; for multiple nodes use host1:port, host2:port,
    host: localhost:9200
    # optional parameters
    # es:
    #   disableFlushOnCheckpoint: false # whether to disable flushing on checkpoints
    #   auth:
    #     user:
    #     password:
    #   rest:
    #     max.retry.timeout:
    #     path.prefix:
    #     content.type:
    #     connect:
    #       request.timeout:
    #       timeout:
    #   cluster.name: elasticsearch
    #   client.transport.sniff:
    #   bulk.flush.:

sql:
  my_flinksql: |
    CREATE TABLE datagen (
      f_sequence INT,
      f_random INT,
      f_random_str STRING,
      ts AS localtimestamp,
      WATERMARK FOR ts AS ts
    ) WITH (
      'connector' = 'datagen',
      -- optional options --
      'rows-per-second'='5', -- this is a comment --
      'fields.f_sequence.kind'='sequence',
      'fields.f_sequence.start'='1',
      'fields.f_sequence.end'='1000',
      'fields.f_random.min'='1',
      'fields.f_random.max'='1000',
      'fields.f_random_str.length'='10'
    );

    CREATE TABLE print_table (
      f_sequence INT,
      f_random INT,
      f_random_str STRING
    ) WITH (
      'connector' = 'print'
    );

    INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;
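  # A commented sketch showing that sql: is a map of named statements: a second entry
  # could sit next to my_flinksql. The key my_other_sql and the WHERE filter are
  # hypothetical illustrations, not read by QuickStartApp as shipped.
  #my_other_sql: |
  #  INSERT INTO print_table SELECT f_sequence, f_random, f_random_str FROM datagen WHERE f_random > 500;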