自由屋推书网—热门的小说推荐平台!

你的位置: 首页 > mysql

使用FlinkSQL实现实时采集Kafka内容到MySQL

2022-04-20 15:34:24

01 引言

最近在做实时采集​​Kafka​发布的内容到​​MySQL​​,本文记录一下关键的点,细节不再描述,希望能帮助到大家。

02 实现

2.1 添加依赖

在工程,除了添加基础的​​Flink​环境依赖,还需要添加​​flink-connector-kafka​​的依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.13.6</version></dependency>

除此,因为​​Flink​把​​Kafka​作为了​​Source​,所以读取的字符串是有解析方式的,本文主要使用的是“​​json​”的方式,因此还需要引入序列化包的,但是​​flink-connector-kafka​​已经自带了,所以没必要再引入。

ok,到这里如果我们写好​​FlinkSQL​去启动,直接就会一闪而退了,为什么呢?因为我们缺少了’ ​​kafka-clients-2.1.0.jar​'这个包,但是也无需引入,因为在​​flink-connector-kafka​​里面已经自带了。

为什么要在这里特别提示 “序列化包”和“kafka-clients包呢”?因为如果我们采用Flink On Yarn的方式部署时,这两个包是需要放到HDFS的,如下:

2.2 Flink SQL

好了,到了关键的​​FlinkSQL​​了,该如何写呢?

首先看看​​Source​,也就是我们的​​Kafka​​,如下:

CREATE TABLE t_student (id INT,name STRING) WITH ('connector' = 'kafka','topic' = 'cdc_user','properties.bootstrap.servers' = '10.194.166.92:9092','properties.group.id' = 'flink-cdc-mysql-kafka','scan.startup.mode' = 'earliest-offset','format' = 'json')

然后​​Sink​输出,我这里需要输出到​​MySQL​​:

CREATE TABLE t_student_copy (id INT,name STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://127.0.0.1:3306/big_data', 'username' = 'root', 'password' = '123456', 'table-name' = 't_student_copy')

最后,使用​​INSERT INTO​​声明如何写入:

INSERT INTO t_student_copy(id,name) SELECT id,name FROM t_student

2.3 配置Kafka域名

还有一点需要注意的是,当我们跑Flink的程序的时候,会出现类似如下错误:

unable to connect broker…

这个时候,我们要在跑​​Flink​的程序的服务器配置​​Kafka​的域名,具体在​​hosts​​文件里配置:

vi /etc/hosts

ok,到这里,只要我们只要使用​​Kafka​工具发送​​json​格式的数据,​​Flink​程序就能实时收到,并写入​​MySQL​​数据库。

03 文末

本文主要是记录​​Kafka​如何实时写入到​​MySQL​​的一些坑点,完整源码就不贴出来了,希望能给大家一点启示并帮助到大家,谢谢大家的阅读,本文完!

编辑推荐

热门小说