Flink 实时计算

Apache Flink 诞生于柏林工业大学的一个研究性项目,原名 StratoSphere 。2014 年,由 StratoSphere 项目孵化出 Flink,并于同年捐赠 Apache,之后成为 Apache 的顶级项目。2019 年 1 年,阿里巴巴收购了 Flink 的母公司 Data Artisans,并宣布开源内部的 Blink,Blink 是阿里巴巴基于 Flink 优化后的版本,增加了大量的新功能,并在性能和稳定性上进行了各种优化,经历过阿里内部多种复杂业务的挑战和检验。同时阿里巴巴也表示会逐步将这些新功能和特性 Merge 回社区版本的 Flink 中,因此 Flink 成为目前最为火热的大数据处理框架。

简单来说,Flink 是一个分布式的流处理框架,它能够对有界和无界的数据流进行高效的处理。Flink 的核心是流处理,当然它也能支持批处理,Flink 将批处理看成是流处理的一种特殊情况,即数据流是有明确界限的。这和 Spark Streaming 的思想是完全相反的,Spark Streaming 的核心是批处理,它将流处理看成是批处理的一种特殊情况, 即把数据流进行极小粒度的拆分,拆分为多个微批处理。

Flink 有界数据流和无界数据流:

image-20230927231348743

Spark Streaming 数据流的拆分:

image-20230927231353142

Flink 采用分层的架构设计,从而保证各层在功能和职责上的清晰。如下图所示,由上而下分别是 API & Libraries 层、Runtime 核心层以及物理部署层:

image-20230927231356608

2.1 API & Libraries 层

这一层主要提供了编程 API 和 顶层类库:

  • 编程 API : 用于进行流处理的 DataStream API 和用于进行批处理的 DataSet API;
  • 顶层类库:包括用于复杂事件处理的 CEP 库;用于结构化数据查询的 SQL & Table 库,以及基于批处理的机器学习库 FlinkML 和 图形处理库 Gelly。

2.2 Runtime 核心层

这一层是 Flink 分布式计算框架的核心实现层,包括作业转换,任务调度,资源分配,任务执行等功能,基于这一层的实现,可以在流式引擎下同时运行流处理程序和批处理程序。

2.3 物理部署层

Flink 的物理部署层,用于支持在不同平台上部署运行 Flink 应用。

在上面介绍的 API & Libraries 这一层,Flink 又进行了更为具体的划分。具体如下:

image-20230927231359983

按照如上的层次结构,API 的一致性由下至上依次递增,接口的表现能力由下至上依次递减,各层的核心功能如下:

3.1 SQL & Table API

SQL & Table API 同时适用于批处理和流处理,这意味着你可以对有界数据流和无界数据流以相同的语义进行查询,并产生相同的结果。除了基本查询外, 它还支持自定义的标量函数,聚合函数以及表值函数,可以满足多样化的查询需求。

3.2 DataStream & DataSet API

DataStream & DataSet API 是 Flink 数据处理的核心 API,支持使用 Java 语言或 Scala 语言进行调用,提供了数据读取,数据转换和数据输出等一系列常用操作的封装。

3.3 Stateful Stream Processing

Stateful Stream Processing 是最低级别的抽象,它通过 Process Function 函数内嵌到 DataStream API 中。 Process Function 是 Flink 提供的最底层 API,具有最大的灵活性,允许开发者对于时间和状态进行细粒度的控制。

4.1 核心组件

按照上面的介绍,Flink 核心架构的第二层是 Runtime 层, 该层采用标准的 Master - Slave 结构, 其中,Master 部分又包含了三个核心组件:Dispatcher、ResourceManager 和 JobManager,而 Slave 则主要是 TaskManager 进程。它们的功能分别如下:

  • JobManagers (也称为 masters) :JobManagers 接收由 Dispatcher 传递过来的执行程序,该执行程序包含了作业图 (JobGraph),逻辑数据流图 (logical dataflow graph) 及其所有的 classes 文件以及第三方类库 (libraries) 等等 。紧接着 JobManagers 会将 JobGraph 转换为执行图 (ExecutionGraph),然后向 ResourceManager 申请资源来执行该任务,一旦申请到资源,就将执行图分发给对应的 TaskManagers 。因此每个作业 (Job) 至少有一个 JobManager;高可用部署下可以有多个 JobManagers,其中一个作为 leader,其余的则处于 standby 状态。
  • TaskManagers (也称为 workers) : TaskManagers 负责实际的子任务 (subtasks) 的执行,每个 TaskManagers 都拥有一定数量的 slots。Slot 是一组固定大小的资源的合集 (如计算能力,存储空间)。TaskManagers 启动后,会将其所拥有的 slots 注册到 ResourceManager 上,由 ResourceManager 进行统一管理。
  • Dispatcher:负责接收客户端提交的执行程序,并传递给 JobManager 。除此之外,它还提供了一个 WEB UI 界面,用于监控作业的执行情况。
  • ResourceManager :负责管理 slots 并协调集群资源。ResourceManager 接收来自 JobManager 的资源请求,并将存在空闲 slots 的 TaskManagers 分配给 JobManager 执行任务。Flink 基于不同的部署平台,如 YARN , Mesos,K8s 等提供了不同的资源管理器,当 TaskManagers 没有足够的 slots 来执行任务时,它会向第三方平台发起会话来请求额外的资源。

image-20230927231403468

4.2 Task & SubTask

上面我们提到:TaskManagers 实际执行的是 SubTask,而不是 Task,这里解释一下两者的区别:

在执行分布式计算时,Flink 将可以链接的操作 (operators) 链接到一起,这就是 Task。之所以这样做, 是为了减少线程间切换和缓冲而导致的开销,在降低延迟的同时可以提高整体的吞吐量。 但不是所有的 operator 都可以被链接,如下 keyBy 等操作会导致网络 shuffle 和重分区,因此其就不能被链接,只能被单独作为一个 Task。 简单来说,一个 Task 就是一个可以链接的最小的操作链 (Operator Chains) 。如下图,source 和 map 算子被链接到一块,因此整个作业就只有三个 Task:

image-20230927231406228

解释完 Task ,我们在解释一下什么是 SubTask,其准确的翻译是: A subtask is one parallel slice of a task,即一个 Task 可以按照其并行度拆分为多个 SubTask。如上图,source & map 具有两个并行度,KeyBy 具有两个并行度,Sink 具有一个并行度,因此整个虽然只有 3 个 Task,但是却有 5 个 SubTask。Jobmanager 负责定义和拆分这些 SubTask,并将其交给 Taskmanagers 来执行,每个 SubTask 都是一个单独的线程。

4.3 资源管理

理解了 SubTasks ,我们再来看看其与 Slots 的对应情况。一种可能的分配情况如下:

image-20230927231408771

这时每个 SubTask 线程运行在一个独立的 TaskSlot, 它们共享所属的 TaskManager 进程的TCP 连接(通过多路复用技术)和心跳信息 (heartbeat messages),从而可以降低整体的性能开销。此时看似是最好的情况,但是每个操作需要的资源都是不尽相同的,这里假设该作业 keyBy 操作所需资源的数量比 Sink 多很多 ,那么此时 Sink 所在 Slot 的资源就没有得到有效的利用。

基于这个原因,Flink 允许多个 subtasks 共享 slots,即使它们是不同 tasks 的 subtasks,但只要它们来自同一个 Job 就可以。假设上面 souce & map 和 keyBy 的并行度调整为 6,而 Slot 的数量不变,此时情况如下:

image-20230927231411156

可以看到一个 Task Slot 中运行了多个 SubTask 子任务,此时每个子任务仍然在一个独立的线程中执行,只不过共享一组 Sot 资源而已。那么 Flink 到底如何确定一个 Job 至少需要多少个 Slot 呢?Flink 对于这个问题的处理很简单,默认情况一个 Job 所需要的 Slot 的数量就等于其 Operation 操作的最高并行度。如下, A,B,D 操作的并行度为 4,而 C,E 操作的并行度为 2,那么此时整个 Job 就需要至少四个 Slots 来完成。通过这个机制,Flink 就可以不必去关心一个 Job 到底会被拆分为多少个 Tasks 和 SubTasks。

image-20230927231413594

4.4 组件通讯

Flink 的所有组件都基于 Actor System 来进行通讯。Actor system是多种角色的 actor 的容器,它提供调度,配置,日志记录等多种服务,并包含一个可以启动所有 actor 的线程池,如果 actor 是本地的,则消息通过共享内存进行共享,但如果 actor 是远程的,则通过 RPC 的调用来传递消息。

image-20230927231416695

最后基于上面的介绍,来总结一下 Flink 的优点:

  • Flink 是基于事件驱动 (Event-driven) 的应用,能够同时支持流处理和批处理;
  • 基于内存的计算,能够保证高吞吐和低延迟,具有优越的性能表现;
  • 支持精确一次 (Exactly-once) 语意,能够完美地保证一致性和正确性;
  • 分层 API ,能够满足各个层次的开发需求;
  • 支持高可用配置,支持保存点机制,能够提供安全性和稳定性上的保证;
  • 多样化的部署方式,支持本地,远端,云端等多种部署方案;
  • 具有横向扩展架构,能够按照用户的需求进行动态扩容;
  • 活跃度极高的社区和完善的生态圈的支持。

参考资料

.png)

Flink 开发环境搭建

一、安装 Scala 插件

Flink 分别提供了基于 Java 语言和 Scala 语言的 API ,如果想要使用 Scala 语言来开发 Flink 程序,可以通过在 IDEA 中安装 Scala 插件来提供语法提示,代码高亮等功能。打开 IDEA , 依次点击 File => settings => plugins 打开插件安装页面,搜索 Scala 插件并进行安装,安装完成后,重启 IDEA 即可生效。

image-20230927231420920

2.1 使用官方脚本构建

Flink 官方支持使用 Maven 和 Gradle 两种构建工具来构建基于 Java 语言的 Flink 项目;支持使用 SBT 和 Maven 两种构建工具来构建基于 Scala 语言的 Flink 项目。 这里以 Maven 为例进行说明,因为其可以同时支持 Java 语言和 Scala 语言项目的构建。需要注意的是 Flink 1.9 只支持 Maven 3.0.4 以上的版本,Maven 安装完成后,可以通过以下两种方式来构建项目:

1. 直接基于 Maven Archetype 构建

直接使用下面的 mvn 语句来进行构建,然后根据交互信息的提示,依次输入 groupId , artifactId 以及包名等信息后等待初始化的完成:

1
2
3
4
$ mvn archetype:generate                               \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.0

注:如果想要创建基于 Scala 语言的项目,只需要将 flink-quickstart-java 换成 flink-quickstart-scala 即可,后文亦同。

2. 使用官方脚本快速构建

为了更方便的初始化项目,官方提供了快速构建脚本,可以直接通过以下命令来进行调用:

1
$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.9.0

该方式其实也是通过执行 maven archetype 命令来进行初始化,其脚本内容如下:

1
2
3
4
5
6
7
8
9
10
11
PACKAGE=quickstart

mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=${1:-1.8.0} \
-DgroupId=org.myorg.quickstart \
-DartifactId=$PACKAGE \
-Dversion=0.1 \
-Dpackage=org.myorg.quickstart \
-DinteractiveMode=false

可以看到相比于第一种方式,该种方式只是直接指定好了 groupId ,artifactId ,version 等信息而已。

2.2 使用 IDEA 构建

如果你使用的是开发工具是 IDEA ,可以直接在项目创建页面选择 Maven Flink Archetype 进行项目初始化:

image-20230927231425011

如果你的 IDEA 没有上述 Archetype, 可以通过点击右上角的 ADD ARCHETYPE ,来进行添加,依次填入所需信息,这些信息都可以从上述的 archetype:generate 语句中获取。点击 OK 保存后,该 Archetype 就会一直存在于你的 IDEA 中,之后每次创建项目时,只需要直接选择该 Archetype 即可:

image-20230927231428836

选中 Flink Archetype ,然后点击 NEXT 按钮,之后的所有步骤都和正常的 Maven 工程相同。

三、项目结构

3.1 项目结构

创建完成后的自动生成的项目结构如下:

image-20230927231433476

其中 BatchJob 为批处理的样例代码,源码如下:

1
2
3
4
5
6
7
8
9
import org.apache.flink.api.scala._

object BatchJob {
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
....
env.execute("Flink Batch Scala API Skeleton")
}
}

getExecutionEnvironment 代表获取批处理的执行环境,如果是本地运行则获取到的就是本地的执行环境;如果在集群上运行,得到的就是集群的执行环境。如果想要获取流处理的执行环境,则只需要将 ExecutionEnvironment 替换为 StreamExecutionEnvironment, 对应的代码样例在 StreamingJob 中:

1
2
3
4
5
6
7
8
9
10
import org.apache.flink.streaming.api.scala._

object StreamingJob {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
...
env.execute("Flink Streaming Scala API Skeleton")
}
}

需要注意的是对于流处理项目 env.execute() 这句代码是必须的,否则流处理程序就不会被执行,但是对于批处理项目则是可选的。

3.2 主要依赖

基于 Maven 骨架创建的项目主要提供了以下核心依赖:其中 flink-scala 用于支持开发批处理程序 ;flink-streaming-scala 用于支持开发流处理程序 ;scala-library 用于提供 Scala 语言所需要的类库。如果在使用 Maven 骨架创建时选择的是 Java 语言,则默认提供的则是 flink-javaflink-streaming-java 依赖。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- Scala Library, provided by Flink as well. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>

需要特别注意的以上依赖的 scope 标签全部被标识为 provided ,这意味着这些依赖都不会被打入最终的 JAR 包。因为 Flink 的安装包中已经提供了这些依赖,位于其 lib 目录下,名为 flink-dist_*.jar ,它包含了 Flink 的所有核心类和依赖:

image-20230927231438125

scope 标签被标识为 provided 会导致你在 IDEA 中启动项目时会抛出 ClassNotFoundException 异常。基于这个原因,在使用 IDEA 创建项目时还自动生成了以下 profile 配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- Its adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>

<activation>
<property>
<name>idea.version</name>
</property>
</activation>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>

在 id 为 add-dependencies-for-IDEA 的 profile 中,所有的核心依赖都被标识为 compile,此时你可以无需改动任何代码,只需要在 IDEA 的 Maven 面板中勾选该 profile,即可直接在 IDEA 中运行 Flink 项目:

image-20230927231441755

四、词频统计案例

项目创建完成后,可以先书写一个简单的词频统计的案例来尝试运行 Flink 项目,以下以 Scala 语言为例,分别介绍流处理程序和批处理程序的编程示例:

4.1 批处理示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.apache.flink.api.scala._

object WordCountBatch {

def main(args: Array[String]): Unit = {
val benv = ExecutionEnvironment.getExecutionEnvironment
val dataSet = benv.readTextFile("D:\\wordcount.txt")
dataSet.flatMap { _.toLowerCase.split(",")}
.filter (_.nonEmpty)
.map { (_, 1) }
.groupBy(0)
.sum(1)
.print()
}
}

其中 wordcount.txt 中的内容如下:

1
2
3
4
a,a,a,a,a
b,b,b
c,c
d,d

本机不需要配置其他任何的 Flink 环境,直接运行 Main 方法即可,结果如下:

image-20230927231446126

4.2 流处理示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WordCountStreaming {

def main(args: Array[String]): Unit = {

val senv = StreamExecutionEnvironment.getExecutionEnvironment

val dataStream: DataStream[String] = senv.socketTextStream("192.168.0.229", 9999, '\n')
dataStream.flatMap { line => line.toLowerCase.split(",") }
.filter(_.nonEmpty)
.map { word => (word, 1) }
.keyBy(0)
.timeWindow(Time.seconds(3))
.sum(1)
.print()
senv.execute("Streaming WordCount")
}
}

这里以监听指定端口号上的内容为例,使用以下命令来开启端口服务:

1
nc -lk 9999

之后输入测试数据即可观察到流处理程序的处理情况。

五、使用 Scala Shell

对于日常的 Demo 项目,如果你不想频繁地启动 IDEA 来观察测试结果,可以像 Spark 一样,直接使用 Scala Shell 来运行程序,这对于日常的学习来说,效果更加直观,也更省时。Flink 安装包的下载地址如下:

1
https://flink.apache.org/downloads.html

Flink 大多数版本都提供有 Scala 2.11 和 Scala 2.12 两个版本的安装包可供下载:

image-20230927231449697

下载完成后进行解压即可,Scala Shell 位于安装目录的 bin 目录下,直接使用以下命令即可以本地模式启动:

1
./start-scala-shell.sh local

命令行启动完成后,其已经提供了批处理 (benv 和 btenv)和流处理(senv 和 stenv)的运行环境,可以直接运行 Scala Flink 程序,示例如下:

image-20230927231452921

最后解释一个常见的异常:这里我使用的 Flink 版本为 1.9.1,启动时会抛出如下异常。这里因为按照官方的说明,目前所有 Scala 2.12 版本的安装包暂时都不支持 Scala Shell,所以如果想要使用 Scala Shell,只能选择 Scala 2.11 版本的安装包。

1
2
[root@hadoop001 bin]# ./start-scala-shell.sh local
错误: 找不到或无法加载主类 org.apache.flink.api.scala.FlinkShell

Flink Data Source

一、内置 Data Source

Flink Data Source 用于定义 Flink 程序的数据来源,Flink 官方提供了多种数据获取方法,用于帮助开发者简单快速地构建输入流,具体如下:

1.1 基于文件构建

**1. readTextFile(path)**:按照 TextInputFormat 格式读取文本文件,并将其内容以字符串的形式返回。示例如下:

1
env.readTextFile(filePath).print();

2. readFile(fileInputFormat, path) :按照指定格式读取文件。

**3. readFile(inputFormat, filePath, watchType, interval, typeInformation)**:按照指定格式周期性的读取文件。其中各个参数的含义如下:

  • inputFormat:数据流的输入格式。
  • filePath:文件路径,可以是本地文件系统上的路径,也可以是 HDFS 上的文件路径。
  • watchType:读取方式,它有两个可选值,分别是 FileProcessingMode.PROCESS_ONCEFileProcessingMode.PROCESS_CONTINUOUSLY:前者表示对指定路径上的数据只读取一次,然后退出;后者表示对路径进行定期地扫描和读取。需要注意的是如果 watchType 被设置为 PROCESS_CONTINUOUSLY,那么当文件被修改时,其所有的内容 (包含原有的内容和新增的内容) 都将被重新处理,因此这会打破 Flink 的 exactly-once 语义。
  • interval:定期扫描的时间间隔。
  • typeInformation:输入流中元素的类型。

使用示例如下:

1
2
3
4
5
6
7
8
final String filePath = "D:\\log4j.properties";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.readFile(new TextInputFormat(new Path(filePath)),
filePath,
FileProcessingMode.PROCESS_ONCE,
1,
BasicTypeInfo.STRING_TYPE_INFO).print();
env.execute();

1.2 基于集合构建

**1. fromCollection(Collection)**:基于集合构建,集合中的所有元素必须是同一类型。示例如下:

1
env.fromCollection(Arrays.asList(1,2,3,4,5)).print();

**2. fromElements(T …)**: 基于元素构建,所有元素必须是同一类型。示例如下:

1
env.fromElements(1,2,3,4,5).print();

**3. generateSequence(from, to)**:基于给定的序列区间进行构建。示例如下:

1
env.generateSequence(0,100);

**4. fromCollection(Iterator, Class)**:基于迭代器进行构建。第一个参数用于定义迭代器,第二个参数用于定义输出元素的类型。使用示例如下:

1
env.fromCollection(new CustomIterator(), BasicTypeInfo.INT_TYPE_INFO).print();

其中 CustomIterator 为自定义的迭代器,这里以产生 1 到 100 区间内的数据为例,源码如下。需要注意的是自定义迭代器除了要实现 Iterator 接口外,还必须要实现序列化接口 Serializable ,否则会抛出序列化失败的异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import java.io.Serializable;
import java.util.Iterator;

public class CustomIterator implements Iterator<Integer>, Serializable {
private Integer i = 0;

@Override
public boolean hasNext() {
return i < 100;
}

@Override
public Integer next() {
i++;
return i;
}
}

**5. fromParallelCollection(SplittableIterator, Class)**:方法接收两个参数,第二个参数用于定义输出元素的类型,第一个参数 SplittableIterator 是迭代器的抽象基类,它用于将原始迭代器的值拆分到多个不相交的迭代器中。

1.3 基于 Socket 构建

Flink 提供了 socketTextStream 方法用于构建基于 Socket 的数据流,socketTextStream 方法有以下四个主要参数:

  • hostname:主机名;
  • port:端口号,设置为 0 时,表示端口号自动分配;
  • delimiter:用于分隔每条记录的分隔符;
  • maxRetry:当 Socket 临时关闭时,程序的最大重试间隔,单位为秒。设置为 0 时表示不进行重试;设置为负值则表示一直重试。示例如下:
1
env.socketTextStream("192.168.0.229", 9999, "\n", 3).print();

二、自定义 Data Source

2.1 SourceFunction

除了内置的数据源外,用户还可以使用 addSource 方法来添加自定义的数据源。自定义的数据源必须要实现 SourceFunction 接口,这里以产生 [0 , 1000) 区间内的数据为例,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new SourceFunction<Long>() {

private long count = 0L;
private volatile boolean isRunning = true;

public void run(SourceContext<Long> ctx) {
while (isRunning && count < 1000) {
// 通过collect将输入发送出去
ctx.collect(count);
count++;
}
}

public void cancel() {
isRunning = false;
}

}).print();
env.execute();

2.2 ParallelSourceFunction 和 RichParallelSourceFunction

上面通过 SourceFunction 实现的数据源是不具有并行度的,即不支持在得到的 DataStream 上调用 setParallelism(n) 方法,此时会抛出如下的异常:

1
Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source

如果你想要实现具有并行度的输入流,则需要实现 ParallelSourceFunction 或 RichParallelSourceFunction 接口,其与 SourceFunction 的关系如下图:

image-20230927231505049
ParallelSourceFunction 直接继承自 ParallelSourceFunction,具有并行度的功能。RichParallelSourceFunction 则继承自 AbstractRichFunction,同时实现了 ParallelSourceFunction 接口,所以其除了具有并行度的功能外,还提供了额外的与生命周期相关的方法,如 open() ,closen() 。

三、Streaming Connectors

3.1 内置连接器

除了自定义数据源外, Flink 还内置了多种连接器,用于满足大多数的数据收集场景。当前内置连接器的支持情况如下:

  • Apache Kafka (支持 source 和 sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)
  • Google PubSub (source/sink)

除了上述的连接器外,你还可以通过 Apache Bahir 的连接器扩展 Flink。Apache Bahir 旨在为分布式数据分析系统 (如 Spark,Flink) 等提供功能上的扩展,当前其支持的与 Flink 相关的连接器如下:

  • Apache ActiveMQ (source/sink)
  • Apache Flume (sink)
  • Redis (sink)
  • Akka (sink)
  • Netty (source)

随着 Flink 的不断发展,可以预见到其会支持越来越多类型的连接器,关于连接器的后续发展情况,可以查看其官方文档:Streaming Connectors 。在所有 DataSource 连接器中,使用的广泛的就是 Kafka,所以这里我们以其为例,来介绍 Connectors 的整合步骤。

3.2 整合 Kakfa

1. 导入依赖

整合 Kafka 时,一定要注意所使用的 Kafka 的版本,不同版本间所需的 Maven 依赖和开发时所调用的类均不相同,具体如下:

Maven 依赖 Flink 版本 Consumer and Producer 类的名称 Kafka 版本
flink-connector-kafka-0.8_2.11 1.0.0 + FlinkKafkaConsumer08
FlinkKafkaProducer08
0.8.x
flink-connector-kafka-0.9_2.11 1.0.0 + FlinkKafkaConsumer09
FlinkKafkaProducer09
0.9.x
flink-connector-kafka-0.10_2.11 1.2.0 + FlinkKafkaConsumer010
FlinkKafkaProducer010
0.10.x
flink-connector-kafka-0.11_2.11 1.4.0 + FlinkKafkaConsumer011
FlinkKafkaProducer011
0.11.x
flink-connector-kafka_2.11 1.7.0 + FlinkKafkaConsumer
FlinkKafkaProducer
>= 1.0.0

这里我使用的 Kafka 版本为 kafka_2.12-2.2.0,添加的依赖如下:

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.9.0</version>
</dependency>

2. 代码开发

这里以最简单的场景为例,接收 Kafka 上的数据并打印,代码如下:

1
2
3
4
5
6
7
8
9
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
// 指定Kafka的连接位置
properties.setProperty("bootstrap.servers", "hadoop001:9092");
// 指定监听的主题,并定义Kafka字节消息到Flink对象之间的转换规则
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic", new SimpleStringSchema(), properties));
stream.print();
env.execute("Flink Streaming");

3.3 整合测试

1. 启动 Kakfa

Kafka 的运行依赖于 zookeeper,需要预先启动,可以启动 Kafka 内置的 zookeeper,也可以启动自己安装的:

1
2
3
4
5
# zookeeper启动命令
bin/zkServer.sh start

# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties

启动单节点 kafka 用于测试:

1
# bin/kafka-server-start.sh config/server.properties

2. 创建 Topic

1
2
3
4
5
6
7
8
9
# 创建用于测试主题
bin/kafka-topics.sh --create \
--bootstrap-server hadoop001:9092 \
--replication-factor 1 \
--partitions 1 \
--topic flink-stream-in-topic

# 查看所有主题
bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092

3. 启动 Producer

这里 启动一个 Kafka 生产者,用于发送测试数据:

1
bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic flink-stream-in-topic

4. 测试结果

在 Producer 上输入任意测试数据,之后观察程序控制台的输出:

image-20230927231512855
程序控制台的输出如下:

image-20230927231516490
可以看到已经成功接收并打印出相关的数据。

参考资料

  1. data-sources:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html#data-sources
  2. Streaming Connectors:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/index.html
  3. Apache Kafka Connector: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html

.png)

Flink Transformation

一、Transformations 分类

Flink 的 Transformations 操作主要用于将一个和多个 DataStream 按需转换成新的 DataStream。它主要分为以下三类:

  • DataStream Transformations:进行数据流相关转换操作;
  • Physical partitioning:物理分区。Flink 提供的底层 API ,允许用户定义数据的分区规则;
  • Task chaining and resource groups:任务链和资源组。允许用户进行任务链和资源组的细粒度的控制。

以下分别对其主要 API 进行介绍:

二、DataStream Transformations

2.1 Map [DataStream → DataStream]

对一个 DataStream 中的每个元素都执行特定的转换操作:

1
2
3
DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 4, 5);
integerDataStream.map((MapFunction<Integer, Object>) value -> value * 2).print();
// 输出 2,4,6,8,10

2.2 FlatMap [DataStream → DataStream]

FlatMap 与 Map 类似,但是 FlatMap 中的一个输入元素可以被映射成一个或者多个输出元素,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
String string01 = "one one one two two";
String string02 = "third third third four";
DataStream<String> stringDataStream = env.fromElements(string01, string02);
stringDataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
for (String s : value.split(" ")) {
out.collect(s);
}
}
}).print();
// 输出每一个独立的单词,为节省排版,这里去掉换行,后文亦同
one one one two two third third third four

2.3 Filter [DataStream → DataStream]

用于过滤符合条件的数据:

1
env.fromElements(1, 2, 3, 4, 5).filter(x -> x > 3).print();

2.4 KeyBy 和 Reduce

  • KeyBy [DataStream → KeyedStream] :用于将相同 Key 值的数据分到相同的分区中;
  • Reduce [KeyedStream → DataStream] :用于对数据执行归约计算。

如下例子将数据按照 key 值分区后,滚动进行求和计算:

1
2
3
4
5
6
7
8
9
10
11
12
13
DataStream<Tuple2<String, Integer>> tuple2DataStream = env.fromElements(new Tuple2<>("a", 1),
new Tuple2<>("a", 2),
new Tuple2<>("b", 3),
new Tuple2<>("b", 5));
KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = tuple2DataStream.keyBy(0);
keyedStream.reduce((ReduceFunction<Tuple2<String, Integer>>) (value1, value2) ->
new Tuple2<>(value1.f0, value1.f1 + value2.f1)).print();

// 持续进行求和计算,输出:
(a,1)
(a,3)
(b,3)
(b,8)

KeyBy 操作存在以下两个限制:

  • KeyBy 操作用于用户自定义的 POJOs 类型时,该自定义类型必须重写 hashCode 方法;
  • KeyBy 操作不能用于数组类型。

2.5 Aggregations [KeyedStream → DataStream]

Aggregations 是官方提供的聚合算子,封装了常用的聚合操作,如上利用 Reduce 进行求和的操作也可以利用 Aggregations 中的 sum 算子重写为下面的形式:

1
tuple2DataStream.keyBy(0).sum(1).print();

除了 sum 外,Flink 还提供了 min , max , minBy,maxBy 等常用聚合算子:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 滚动计算指定key的最小值,可以通过index或者fieldName来指定key
keyedStream.min(0);
keyedStream.min("key");
// 滚动计算指定key的最大值
keyedStream.max(0);
keyedStream.max("key");
// 滚动计算指定key的最小值,并返回其对应的元素
keyedStream.minBy(0);
keyedStream.minBy("key");
// 滚动计算指定key的最大值,并返回其对应的元素
keyedStream.maxBy(0);
keyedStream.maxBy("key");

2.6 Union [DataStream* → DataStream]

用于连接两个或者多个元素类型相同的 DataStream 。当然一个 DataStream 也可以与其本生进行连接,此时该 DataStream 中的每个元素都会被获取两次:

1
2
3
4
5
6
DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(new Tuple2<>("a", 1), 
new Tuple2<>("a", 2));
DataStreamSource<Tuple2<String, Integer>> streamSource02 = env.fromElements(new Tuple2<>("b", 1),
new Tuple2<>("b", 2));
streamSource01.union(streamSource02);
streamSource01.union(streamSource01,streamSource02);

2.7 Connect [DataStream,DataStream → ConnectedStreams]

Connect 操作用于连接两个或者多个类型不同的 DataStream ,其返回的类型是 ConnectedStreams ,此时被连接的多个 DataStreams 可以共享彼此之间的数据状态。但是需要注意的是由于不同 DataStream 之间的数据类型是不同的,如果想要进行后续的计算操作,还需要通过 CoMap 或 CoFlatMap 将 ConnectedStreams 转换回 DataStream:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(new Tuple2<>("a", 3), 
new Tuple2<>("b", 5));
DataStreamSource<Integer> streamSource02 = env.fromElements(2, 3, 9);
// 使用connect进行连接
ConnectedStreams<Tuple2<String, Integer>, Integer> connect = streamSource01.connect(streamSource02);
connect.map(new CoMapFunction<Tuple2<String, Integer>, Integer, Integer>() {
@Override
public Integer map1(Tuple2<String, Integer> value) throws Exception {
return value.f1;
}

@Override
public Integer map2(Integer value) throws Exception {
return value;
}
}).map(x -> x * 100).print();

// 输出:
300 500 200 900 300

2.8 Split 和 Select

  • **Split [DataStream → SplitStream]**:用于将一个 DataStream 按照指定规则进行拆分为多个 DataStream,需要注意的是这里进行的是逻辑拆分,即 Split 只是将数据贴上不同的类型标签,但最终返回的仍然只是一个 SplitStream;
  • **Select [SplitStream → DataStream]**:想要从逻辑拆分的 SplitStream 中获取真实的不同类型的 DataStream,需要使用 Select 算子,示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
// 标记
SplitStream<Integer> split = streamSource.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
output.add(value % 2 == 0 ? "even" : "odd");
return output;
}
});
// 获取偶数数据集
split.select("even").print();
// 输出 2,4,6,8

2.9 project [DataStream → DataStream]

project 主要用于获取 tuples 中的指定字段集,示例如下:

1
2
3
4
5
6
7
8
DataStreamSource<Tuple3<String, Integer, String>> streamSource = env.fromElements(
new Tuple3<>("li", 22, "2018-09-23"),
new Tuple3<>("ming", 33, "2020-09-23"));
streamSource.project(0,2).print();

// 输出
(li,2018-09-23)
(ming,2020-09-23)

三、物理分区

物理分区 (Physical partitioning) 是 Flink 提供的底层的 API,允许用户采用内置的分区规则或者自定义的分区规则来对数据进行分区,从而避免数据在某些分区上过于倾斜,常用的分区规则如下:

3.1 Random partitioning [DataStream → DataStream]

随机分区 (Random partitioning) 用于随机的将数据分布到所有下游分区中,通过 shuffle 方法来进行实现:

1
dataStream.shuffle();

3.2 Rebalancing [DataStream → DataStream]

Rebalancing 采用轮询的方式将数据进行分区,其适合于存在数据倾斜的场景下,通过 rebalance 方法进行实现:

1
dataStream.rebalance();

3.3 Rescaling [DataStream → DataStream]

当采用 Rebalancing 进行分区平衡时,其实现的是全局性的负载均衡,数据会通过网络传输到其他节点上并完成分区数据的均衡。 而 Rescaling 则是低配版本的 rebalance,它不需要额外的网络开销,它只会对上下游的算子之间进行重新均衡,通过 rescale 方法进行实现:

1
dataStream.rescale();

ReScale 这个单词具有重新缩放的意义,其对应的操作也是如此,具体如下:如果上游 operation 并行度为 2,而下游的 operation 并行度为 6,则其中 1 个上游的 operation 会将元素分发到 3 个下游 operation,另 1 个上游 operation 则会将元素分发到另外 3 个下游 operation。反之亦然,如果上游的 operation 并行度为 6,而下游 operation 并行度为 2,则其中 3 个上游 operation 会将元素分发到 1 个下游 operation,另 3 个上游 operation 会将元素分发到另外 1 个下游operation:

image-20230927231522872

3.4 Broadcasting [DataStream → DataStream]

将数据分发到所有分区上。通常用于小数据集与大数据集进行关联的情况下,此时可以将小数据集广播到所有分区上,避免频繁的跨分区关联,通过 broadcast 方法进行实现:

1
dataStream.broadcast();

3.5 Custom partitioning [DataStream → DataStream]

Flink 运行用户采用自定义的分区规则来实现分区,此时需要通过实现 Partitioner 接口来自定义分区规则,并指定对应的分区键,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 DataStreamSource<Tuple2<String, Integer>> streamSource = env.fromElements(new Tuple2<>("Hadoop", 1),
new Tuple2<>("Spark", 1),
new Tuple2<>("Flink-streaming", 2),
new Tuple2<>("Flink-batch", 4),
new Tuple2<>("Storm", 4),
new Tuple2<>("HBase", 3));
streamSource.partitionCustom(new Partitioner<String>() {
@Override
public int partition(String key, int numPartitions) {
// 将第一个字段包含flink的Tuple2分配到同一个分区
return key.toLowerCase().contains("flink") ? 0 : 1;
}
}, 0).print();


// 输出如下:
1> (Flink-streaming,2)
1> (Flink-batch,4)
2> (Hadoop,1)
2> (Spark,1)
2> (Storm,4)
2> (HBase,3)

四、任务链和资源组

任务链和资源组 ( Task chaining and resource groups ) 也是 Flink 提供的底层 API,用于控制任务链和资源分配。默认情况下,如果操作允许 (例如相邻的两次 map 操作) ,则 Flink 会尝试将它们在同一个线程内进行,从而可以获取更好的性能。但是 Flink 也允许用户自己来控制这些行为,这就是任务链和资源组 API:

4.1 startNewChain

startNewChain 用于基于当前 operation 开启一个新的任务链。如下所示,基于第一个 map 开启一个新的任务链,此时前一个 map 和 后一个 map 将处于同一个新的任务链中,但它们与 filter 操作则分别处于不同的任务链中:

1
someStream.filter(...).map(...).startNewChain().map(...);

4.2 disableChaining

disableChaining 操作用于禁止将其他操作与当前操作放置于同一个任务链中,示例如下:

1
someStream.map(...).disableChaining();

4.3 slotSharingGroup

slot 是任务管理器 (TaskManager) 所拥有资源的固定子集,每个操作 (operation) 的子任务 (sub task) 都需要获取 slot 来执行计算,但每个操作所需要资源的大小都是不相同的,为了更好地利用资源,Flink 允许不同操作的子任务被部署到同一 slot 中。slotSharingGroup 用于设置操作的 slot 共享组 (slot sharing group) ,Flink 会将具有相同 slot 共享组的操作放到同一个 slot 中 。示例如下:

1
someStream.filter(...).slotSharingGroup("slotSharingGroupName");

参考资料

Flink Operators: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/

.png)

Flink Sink

一、Data Sinks

在使用 Flink 进行数据处理时,数据经 Data Source 流入,然后通过系列 Transformations 的转化,最终可以通过 Sink 将计算结果进行输出,Flink Data Sinks 就是用于定义数据流最终的输出位置。Flink 提供了几个较为简单的 Sink API 用于日常的开发,具体如下:

1.1 writeAsText

writeAsText 用于将计算结果以文本的方式并行地写入到指定文件夹下,除了路径参数是必选外,该方法还可以通过指定第二个参数来定义输出模式,它有以下两个可选值:

  • WriteMode.NO_OVERWRITE:当指定路径上不存在任何文件时,才执行写出操作;
  • WriteMode.OVERWRITE:不论指定路径上是否存在文件,都执行写出操作;如果原来已有文件,则进行覆盖。

使用示例如下:

1
streamSource.writeAsText("D:\\out", FileSystem.WriteMode.OVERWRITE);

以上写出是以并行的方式写出到多个文件,如果想要将输出结果全部写出到一个文件,需要设置其并行度为 1:

1
streamSource.writeAsText("D:\\out", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

1.2 writeAsCsv

writeAsCsv 用于将计算结果以 CSV 的文件格式写出到指定目录,除了路径参数是必选外,该方法还支持传入输出模式,行分隔符,和字段分隔符三个额外的参数,其方法定义如下:

1
writeAsCsv(String path, WriteMode writeMode, String rowDelimiter, String fieldDelimiter) 

1.3 print \ printToErr

print \ printToErr 是测试当中最常用的方式,用于将计算结果以标准输出流或错误输出流的方式打印到控制台上。

1.4 writeUsingOutputFormat

采用自定义的输出格式将计算结果写出,上面介绍的 writeAsTextwriteAsCsv 其底层调用的都是该方法,源码如下:

1
2
3
4
5
public DataStreamSink<T> writeAsText(String path, WriteMode writeMode) {
TextOutputFormat<T> tof = new TextOutputFormat<>(new Path(path));
tof.setWriteMode(writeMode);
return writeUsingOutputFormat(tof);
}

1.5 writeToSocket

writeToSocket 用于将计算结果以指定的格式写出到 Socket 中,使用示例如下:

1
streamSource.writeToSocket("192.168.0.226", 9999, new SimpleStringSchema());

二、Streaming Connectors

除了上述 API 外,Flink 中还内置了系列的 Connectors 连接器,用于将计算结果输入到常用的存储系统或者消息中间件中,具体如下:

  • Apache Kafka (支持 source 和 sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Google PubSub (source/sink)

除了内置的连接器外,你还可以通过 Apache Bahir 的连接器扩展 Flink。Apache Bahir 旨在为分布式数据分析系统 (如 Spark,Flink) 等提供功能上的扩展,当前其支持的与 Flink Sink 相关的连接器如下:

  • Apache ActiveMQ (source/sink)
  • Apache Flume (sink)
  • Redis (sink)
  • Akka (sink)

这里接着在 Data Sources 章节介绍的整合 Kafka Source 的基础上,将 Kafka Sink 也一并进行整合,具体步骤如下。

三、整合 Kafka Sink

3.1 addSink

Flink 提供了 addSink 方法用来调用自定义的 Sink 或者第三方的连接器,想要将计算结果写出到 Kafka,需要使用该方法来调用 Kafka 的生产者 FlinkKafkaProducer,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 1.指定Kafka的相关配置属性
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.200.0:9092");

// 2.接收Kafka上的数据
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic", new SimpleStringSchema(), properties));

// 3.定义计算结果到 Kafka ProducerRecord 的转换
KafkaSerializationSchema<String> kafkaSerializationSchema = new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
return new ProducerRecord<>("flink-stream-out-topic", element.getBytes());
}
};
// 4. 定义Flink Kafka生产者
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("flink-stream-out-topic",
kafkaSerializationSchema,
properties,
FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, 5);
// 5. 将接收到输入元素*2后写出到Kafka
stream.map((MapFunction<String, String>) value -> value + value).addSink(kafkaProducer);
env.execute("Flink Streaming");

3.2 创建输出主题

创建用于输出测试的主题:

1
2
3
4
5
6
7
8
bin/kafka-topics.sh --create \
--bootstrap-server hadoop001:9092 \
--replication-factor 1 \
--partitions 1 \
--topic flink-stream-out-topic

# 查看所有主题
bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092

3.3 启动消费者

启动一个 Kafka 消费者,用于查看 Flink 程序的输出情况:

1
bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic flink-stream-out-topic

3.4 测试结果

在 Kafka 生产者上发送消息到 Flink 程序,观察 Flink 程序转换后的输出情况,具体如下:

image-20230927231535788

可以看到 Kafka 生成者发出的数据已经被 Flink 程序正常接收到,并经过转换后又输出到 Kafka 对应的 Topic 上。

四、自定义 Sink

除了使用内置的第三方连接器外,Flink 还支持使用自定义的 Sink 来满足多样化的输出需求。想要实现自定义的 Sink ,需要直接或者间接实现 SinkFunction 接口。通常情况下,我们都是实现其抽象类 RichSinkFunction,相比于 SinkFunction ,其提供了更多的与生命周期相关的方法。两者间的关系如下:

image-20230927231539613

这里我们以自定义一个 FlinkToMySQLSink 为例,将计算结果写出到 MySQL 数据库中,具体步骤如下:

4.1 导入依赖

首先需要导入 MySQL 相关的依赖:

1
2
3
4
5
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>

4.2 自定义 Sink

继承自 RichSinkFunction,实现自定义的 Sink :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class FlinkToMySQLSink extends RichSinkFunction<Employee> {

private PreparedStatement stmt;
private Connection conn;

@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.cj.jdbc.Driver");
conn = DriverManager.getConnection("jdbc:mysql://192.168.0.229:3306/employees" +
"?characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",
"root",
"123456");
String sql = "insert into emp(name, age, birthday) values(?, ?, ?)";
stmt = conn.prepareStatement(sql);
}

@Override
public void invoke(Employee value, Context context) throws Exception {
stmt.setString(1, value.getName());
stmt.setInt(2, value.getAge());
stmt.setDate(3, value.getBirthday());
stmt.executeUpdate();
}

@Override
public void close() throws Exception {
super.close();
if (stmt != null) {
stmt.close();
}
if (conn != null) {
conn.close();
}
}

}

4.3 使用自定义 Sink

想要使用自定义的 Sink,同样是需要调用 addSink 方法,具体如下:

1
2
3
4
5
6
7
8
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Date date = new Date(System.currentTimeMillis());
DataStreamSource<Employee> streamSource = env.fromElements(
new Employee("hei", 10, date),
new Employee("bai", 20, date),
new Employee("ying", 30, date));
streamSource.addSink(new FlinkToMySQLSink());
env.execute();

4.4 测试结果

启动程序,观察数据库写入情况:

image-20230927231544556

数据库成功写入,代表自定义 Sink 整合成功。

以上所有用例的源码见本仓库:flink-kafka-integration

参考资料

  1. data-sinks: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html#data-sinks
  2. Streaming Connectors:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/index.html
  3. Apache Kafka Connector: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html

.png)

Flink Windows

一、窗口概念

在大多数场景下,我们需要统计的数据流都是无界的,因此我们无法等待整个数据流终止后才进行统计。通常情况下,我们只需要对某个时间范围或者数量范围内的数据进行统计分析:如每隔五分钟统计一次过去一小时内所有商品的点击量;或者每发生1000次点击后,都去统计一下每个商品点击率的占比。在 Flink 中,我们使用窗口 (Window) 来实现这类功能。按照统计维度的不同,Flink 中的窗口可以分为 时间窗口 (Time Windows) 和 计数窗口 (Count Windows) 。

二、Time Windows

Time Windows 用于以时间为维度来进行数据聚合,具体分为以下四类:

2.1 Tumbling Windows

滚动窗口 (Tumbling Windows) 是指彼此之间没有重叠的窗口。例如:每隔1小时统计过去1小时内的商品点击量,那么 1 天就只能分为 24 个窗口,每个窗口彼此之间是不存在重叠的,具体如下:

image-20230927231548641

这里我们以词频统计为例,给出一个具体的用例,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 接收socket上的数据输入
DataStreamSource<String> streamSource = env.socketTextStream("hadoop001", 9999, "\n", 3);
streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
String[] words = value.split("\t");
for (String word : words) {
out.collect(new Tuple2<>(word, 1L));
}
}
}).keyBy(0).timeWindow(Time.seconds(3)).sum(1).print(); //每隔3秒统计一次每个单词出现的数量
env.execute("Flink Streaming");

测试结果如下:

image-20230927231552178

2.2 Sliding Windows

滑动窗口用于滚动进行聚合分析,例如:每隔 6 分钟统计一次过去一小时内所有商品的点击量,那么统计窗口彼此之间就是存在重叠的,即 1天可以分为 240 个窗口。图示如下:

image-20230927231554871

可以看到 window 1 - 4 这四个窗口彼此之间都存在着时间相等的重叠部分。想要实现滑动窗口,只需要在使用 timeWindow 方法时额外传递第二个参数作为滚动时间即可,具体如下:

1
2
// 每隔3秒统计一次过去1分钟内的数据
timeWindow(Time.minutes(1),Time.seconds(3))

2.3 Session Windows

当用户在进行持续浏览时,可能每时每刻都会有点击数据,例如在活动区间内,用户可能频繁的将某类商品加入和移除购物车,而你只想知道用户本次浏览最终的购物车情况,此时就可以在用户持有的会话结束后再进行统计。想要实现这类统计,可以通过 Session Windows 来进行实现。

image-20230927231558094

具体的实现代码如下:

1
2
3
4
// 以处理时间为衡量标准,如果10秒内没有任何数据输入,就认为会话已经关闭,此时触发统计
window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
// 以事件时间为衡量标准
window(EventTimeSessionWindows.withGap(Time.seconds(10)))

2.4 Global Windows

最后一个窗口是全局窗口, 全局窗口会将所有 key 相同的元素分配到同一个窗口中,其通常配合触发器 (trigger) 进行使用。如果没有相应触发器,则计算将不会被执行。

image-20230927231601191

这里继续以上面词频统计的案例为例,示例代码如下:

1
2
// 当单词累计出现的次数每达到10次时,则触发计算,计算整个窗口内该单词出现的总数
window(GlobalWindows.create()).trigger(CountTrigger.of(10)).sum(1).print();

三、Count Windows

Count Windows 用于以数量为维度来进行数据聚合,同样也分为滚动窗口和滑动窗口,实现方式也和时间窗口完全一致,只是调用的 API 不同,具体如下:

1
2
3
4
// 滚动计数窗口,每1000次点击则计算一次
countWindow(1000)
// 滑动计数窗口,每10次点击发生后,则计算过去1000次点击的情况
countWindow(1000,10)

实际上计数窗口内部就是调用的我们上一部分介绍的全局窗口来实现的,其源码如下:

1
2
3
4
5
6
7
8
9
10
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}


public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
}

参考资料

Flink Windows: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html

.png)

Flink 状态管理

一、状态分类

相对于其他流计算框架,Flink 一个比较重要的特性就是其支持有状态计算。即你可以将中间的计算结果进行保存,并提供给后续的计算使用:

image-20230927231605364

具体而言,Flink 又将状态 (State) 分为 Keyed State 与 Operator State:

2.1 算子状态

算子状态 (Operator State):顾名思义,状态是和算子进行绑定的,一个算子的状态不能被其他算子所访问到。官方文档上对 Operator State 的解释是:each operator state is bound to one parallel operator instance,所以更为确切的说一个算子状态是与一个并发的算子实例所绑定的,即假设算子的并行度是 2,那么其应有两个对应的算子状态:

image-20230927231607806

2.2 键控状态

键控状态 (Keyed State) :是一种特殊的算子状态,即状态是根据 key 值进行区分的,Flink 会为每类键值维护一个状态实例。如下图所示,每个颜色代表不同 key 值,对应四个不同的状态实例。需要注意的是键控状态只能在 KeyedStream 上进行使用,我们可以通过 stream.keyBy(...) 来得到 KeyedStream

image-20230927231610638

二、状态编程

2.1 键控状态

Flink 提供了以下数据格式来管理和存储键控状态 (Keyed State):

  • ValueState:存储单值类型的状态。可以使用 update(T) 进行更新,并通过 T value() 进行检索。
  • ListState:存储列表类型的状态。可以使用 add(T)addAll(List) 添加元素;并通过 get() 获得整个列表。
  • ReducingState:用于存储经过 ReduceFunction 计算后的结果,使用 add(T) 增加元素。
  • AggregatingState:用于存储经过 AggregatingState 计算后的结果,使用 add(IN) 添加元素。
  • FoldingState:已被标识为废弃,会在未来版本中移除,官方推荐使用 AggregatingState 代替。
  • MapState:维护 Map 类型的状态。

以上所有增删改查方法不必硬记,在使用时通过语法提示来调用即可。这里给出一个具体的使用示例:假设我们正在开发一个监控系统,当监控数据超过阈值一定次数后,需要发出报警信息。这里之所以要达到一定次数,是因为由于偶发原因,偶尔一次超过阈值并不能代表什么,故需要达到一定次数后才触发报警,这就需要使用到 Flink 的状态编程。相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ThresholdWarning extends 
RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Long>>> {

// 通过ListState来存储非正常数据的状态
private transient ListState<Long> abnormalData;
// 需要监控的阈值
private Long threshold;
// 触发报警的次数
private Integer numberOfTimes;

ThresholdWarning(Long threshold, Integer numberOfTimes) {
this.threshold = threshold;
this.numberOfTimes = numberOfTimes;
}

@Override
public void open(Configuration parameters) {
// 通过状态名称(句柄)获取状态实例,如果不存在则会自动创建
abnormalData = getRuntimeContext().getListState(
new ListStateDescriptor<>("abnormalData", Long.class));
}

@Override
public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out)
throws Exception {
Long inputValue = value.f1;
// 如果输入值超过阈值,则记录该次不正常的数据信息
if (inputValue >= threshold) {
abnormalData.add(inputValue);
}
ArrayList<Long> list = Lists.newArrayList(abnormalData.get().iterator());
// 如果不正常的数据出现达到一定次数,则输出报警信息
if (list.size() >= numberOfTimes) {
out.collect(Tuple2.of(value.f0 + " 超过指定阈值 ", list));
// 报警信息输出后,清空状态
abnormalData.clear();
}
}
}

调用自定义的状态监控,这里我们使用 a,b 来代表不同类型的监控数据,分别对其数据进行监控:

1
2
3
4
5
6
7
8
9
10
11
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.fromElements(
Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));
tuple2DataStreamSource
.keyBy(0)
.flatMap(new ThresholdWarning(100L, 3)) // 超过100的阈值3次后就进行报警
.printToErr();
env.execute("Managed Keyed State");

输出如下结果如下:

image-20230927231615481

2.2 状态有效期

以上任何类型的 keyed state 都支持配置有效期 (TTL) ,示例如下:

1
2
3
4
5
6
7
8
9
10
11
StateTtlConfig ttlConfig = StateTtlConfig
// 设置有效期为 10 秒
.newBuilder(Time.seconds(10))
// 设置有效期更新规则,这里设置为当创建和写入时,都重置其有效期到规定的10秒
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
/*设置只要值过期就不可见,另外一个可选值是ReturnExpiredIfNotCleanedUp,
代表即使值过期了,但如果还没有被物理删除,就是可见的*/
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("abnormalData", Long.class);
descriptor.enableTimeToLive(ttlConfig);

2.3 算子状态

相比于键控状态,算子状态目前支持的存储类型只有以下三种:

  • ListState:存储列表类型的状态。
  • UnionListState:存储列表类型的状态,与 ListState 的区别在于:如果并行度发生变化,ListState 会将该算子的所有并发的状态实例进行汇总,然后均分给新的 Task;而 UnionListState 只是将所有并发的状态实例汇总起来,具体的划分行为则由用户进行定义。
  • BroadcastState:用于广播的算子状态。

这里我们继续沿用上面的例子,假设此时我们不需要区分监控数据的类型,只要有监控数据超过阈值并达到指定的次数后,就进行报警,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class ThresholdWarning extends RichFlatMapFunction<Tuple2<String, Long>, 
Tuple2<String, List<Tuple2<String, Long>>>> implements CheckpointedFunction {

// 非正常数据
private List<Tuple2<String, Long>> bufferedData;
// checkPointedState
private transient ListState<Tuple2<String, Long>> checkPointedState;
// 需要监控的阈值
private Long threshold;
// 次数
private Integer numberOfTimes;

ThresholdWarning(Long threshold, Integer numberOfTimes) {
this.threshold = threshold;
this.numberOfTimes = numberOfTimes;
this.bufferedData = new ArrayList<>();
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// 注意这里获取的是OperatorStateStore
checkPointedState = context.getOperatorStateStore().
getListState(new ListStateDescriptor<>("abnormalData",
TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {
})));
// 如果发生重启,则需要从快照中将状态进行恢复
if (context.isRestored()) {
for (Tuple2<String, Long> element : checkPointedState.get()) {
bufferedData.add(element);
}
}
}

@Override
public void flatMap(Tuple2<String, Long> value,
Collector<Tuple2<String, List<Tuple2<String, Long>>>> out) {
Long inputValue = value.f1;
// 超过阈值则进行记录
if (inputValue >= threshold) {
bufferedData.add(value);
}
// 超过指定次数则输出报警信息
if (bufferedData.size() >= numberOfTimes) {
// 顺便输出状态实例的hashcode
out.collect(Tuple2.of(checkPointedState.hashCode() + "阈值警报!", bufferedData));
bufferedData.clear();
}
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// 在进行快照时,将数据存储到checkPointedState
checkPointedState.clear();
for (Tuple2<String, Long> element : bufferedData) {
checkPointedState.add(element);
}
}
}

调用自定义算子状态,这里需要将并行度设置为 1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启检查点机制
env.enableCheckpointing(1000);
// 设置并行度为1
DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.setParallelism(1).fromElements(
Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));
tuple2DataStreamSource
.flatMap(new ThresholdWarning(100L, 3))
.printToErr();
env.execute("Managed Keyed State");
}

此时输出如下:

image-20230927231619356

在上面的调用代码中,我们将程序的并行度设置为 1,可以看到三次输出中状态实例的 hashcode 全是一致的,证明它们都同一个状态实例。假设将并行度设置为 2,此时输出如下:

image-20230927231622476

可以看到此时两次输出中状态实例的 hashcode 是不一致的,代表它们不是同一个状态实例,这也就是上文提到的,一个算子状态是与一个并发的算子实例所绑定的。同时这里只输出两次,是因为在并发处理的情况下,线程 1 可能拿到 5 个非正常值,线程 2 可能拿到 4 个非正常值,因为要大于 3 次才能输出,所以在这种情况下就会出现只输出两条记录的情况,所以需要将程序的并行度设置为 1。

三、检查点机制

3.1 CheckPoints

为了使 Flink 的状态具有良好的容错性,Flink 提供了检查点机制 (CheckPoints) 。通过检查点机制,Flink 定期在数据流上生成 checkpoint barrier ,当某个算子收到 barrier 时,即会基于当前状态生成一份快照,然后再将该 barrier 传递到下游算子,下游算子接收到该 barrier 后,也基于当前状态生成一份快照,依次传递直至到最后的 Sink 算子上。当出现异常后,Flink 就可以根据最近的一次的快照数据将所有算子恢复到先前的状态。

image-20230927231625398

3.2 开启检查点

默认情况下,检查点机制是关闭的,需要在程序中进行开启:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 开启检查点机制,并指定状态检查点之间的时间间隔
env.enableCheckpointing(1000);

// 其他可选配置如下:
// 设置语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置两个检查点之间的最小时间间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 设置执行Checkpoint操作时的超时时间
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 设置最大并发执行的检查点的数量
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 将检查点持久化到外部存储
env.getCheckpointConfig().enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 如果有更近的保存点时,是否将作业回退到该检查点
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

3.3 保存点机制

保存点机制 (Savepoints) 是检查点机制的一种特殊的实现,它允许你通过手工的方式来触发 Checkpoint,并将结果持久化存储到指定路径中,主要用于避免 Flink 集群在重启或升级时导致状态丢失。示例如下:

1
2
# 触发指定id的作业的Savepoint,并将结果存储到指定目录下
bin/flink savepoint :jobId [:targetDirectory]

更多命令和配置可以参考官方文档:savepoints

四、状态后端

4.1 状态管理器分类

默认情况下,所有的状态都存储在 JVM 的堆内存中,在状态数据过多的情况下,这种方式很有可能导致内存溢出,因此 Flink 该提供了其它方式来存储状态数据,这些存储方式统一称为状态后端 (或状态管理器):

image-20230927231636757

主要有以下三种:

1. MemoryStateBackend

默认的方式,即基于 JVM 的堆内存进行存储,主要适用于本地开发和调试。

2. FsStateBackend

基于文件系统进行存储,可以是本地文件系统,也可以是 HDFS 等分布式文件系统。 需要注意而是虽然选择使用了 FsStateBackend ,但正在进行的数据仍然是存储在 TaskManager 的内存中的,只有在 checkpoint 时,才会将状态快照写入到指定文件系统上。

3. RocksDBStateBackend

RocksDBStateBackend 是 Flink 内置的第三方状态管理器,采用嵌入式的 key-value 型数据库 RocksDB 来存储正在进行的数据。等到 checkpoint 时,再将其中的数据持久化到指定的文件系统中,所以采用 RocksDBStateBackend 时也需要配置持久化存储的文件系统。之所以这样做是因为 RocksDB 作为嵌入式数据库安全性比较低,但比起全文件系统的方式,其读取速率更快;比起全内存的方式,其存储空间更大,因此它是一种比较均衡的方案。

4.2 配置方式

Flink 支持使用两种方式来配置后端管理器:

第一种方式:基于代码方式进行配置,只对当前作业生效:

1
2
3
4
// 配置 FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
// 配置 RocksDBStateBackend
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));

配置 RocksDBStateBackend 时,需要额外导入下面的依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.9.0</version>
</dependency>

第二种方式:基于 flink-conf.yaml 配置文件的方式进行配置,对所有部署在该集群上的作业都生效:

1
2
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

注:本篇文章所有示例代码下载地址:flink-state-management

参考资料

Flink Standalone Cluster

一、部署模式

Flink 支持使用多种部署模式来满足不同规模应用的需求,常见的有单机模式,Standalone Cluster 模式,同时 Flink 也支持部署在其他第三方平台上,如 YARN,Mesos,Docker,Kubernetes 等。以下主要介绍其单机模式和 Standalone Cluster 模式的部署。

二、单机模式

单机模式是一种开箱即用的模式,可以在单台服务器上运行,适用于日常的开发和调试。具体操作步骤如下:

2.1 安装部署

1. 前置条件

Flink 的运行依赖 JAVA 环境,故需要预先安装好 JDK,具体步骤可以参考:Linux 环境下 JDK 安装

2. 下载 & 解压 & 运行

Flink 所有版本的安装包可以直接从其官网进行下载,这里我下载的 Flink 的版本为 1.9.1 ,要求的 JDK 版本为 1.8.x +。 下载后解压到指定目录:

1
tar -zxvf flink-1.9.1-bin-scala_2.12.tgz  -C /usr/app

不需要进行任何配置,直接使用以下命令就可以启动单机版本的 Flink:

1
bin/start-cluster.sh

3. WEB UI 界面

Flink 提供了 WEB 界面用于直观的管理 Flink 集群,访问端口为 8081

image-20230927231649754

Flink 的 WEB UI 界面支持大多数常用功能,如提交作业,取消作业,查看各个节点运行情况,查看作业执行情况等,大家可以在部署完成后,进入该页面进行详细的浏览。

2.2 作业提交

启动后可以运行安装包中自带的词频统计案例,具体步骤如下:

1. 开启端口

1
nc -lk 9999

2. 提交作业

1
bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999

该 JAR 包的源码可以在 Flink 官方的 GitHub 仓库中找到,地址为 :SocketWindowWordCount ,可选传参有 hostname, port,对应的词频数据需要使用空格进行分割。

3. 输入测试数据

1
a a b b c c c a e

4. 查看控制台输出

可以通过 WEB UI 的控制台查看作业统运行情况:

image-20230927231653244

也可以通过 WEB 控制台查看到统计结果:

image-20230927231656691

2.3 停止作业

可以直接在 WEB 界面上点击对应作业的 Cancel Job 按钮进行取消,也可以使用命令行进行取消。使用命令行进行取消时,需要先获取到作业的 JobId,可以使用 flink list 命令查看,输出如下:

1
2
3
4
5
6
[root@hadoop001 flink-1.9.1]# ./bin/flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
05.11.2019 08:19:53 : ba2b1cc41a5e241c32d574c93de8a2bc : Socket Window WordCount (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

获取到 JobId 后,就可以使用 flink cancel 命令取消作业:

1
bin/flink cancel ba2b1cc41a5e241c32d574c93de8a2bc

命令如下:

1
bin/stop-cluster.sh

三、Standalone Cluster

Standalone Cluster 模式是 Flink 自带的一种集群模式,具体配置步骤如下:

3.1 前置条件

使用该模式前,需要确保所有服务器间都已经配置好 SSH 免密登录服务。这里我以三台服务器为例,主机名分别为 hadoop001,hadoop002,hadoop003 , 其中 hadoop001 为 master 节点,其余两台为 slave 节点,搭建步骤如下:

3.2 搭建步骤

修改 conf/flink-conf.yaml 中 jobmanager 节点的通讯地址为 hadoop001:

1
jobmanager.rpc.address: hadoop001

修改 conf/slaves 配置文件,将 hadoop002 和 hadoop003 配置为 slave 节点:

1
2
hadoop002
hadoop003

将配置好的 Flink 安装包分发到其他两台服务器上:

1
2
scp -r /usr/app/flink-1.9.1 hadoop002:/usr/app
scp -r /usr/app/flink-1.9.1 hadoop003:/usr/app

在 hadoop001 上使用和单机模式相同的命令来启动集群:

1
bin/start-cluster.sh

此时控制台输出如下:

image-20230927231700377

启动完成后可以使用 Jps 命令或者通过 WEB 界面来查看是否启动成功。

3.3 可选配置

除了上面介绍的 jobmanager.rpc.address 是必选配置外,Flink h还支持使用其他可选参数来优化集群性能,主要如下:

  • jobmanager.heap.size:JobManager 的 JVM 堆内存大小,默认为 1024m 。
  • taskmanager.heap.size:Taskmanager 的 JVM 堆内存大小,默认为 1024m 。
  • taskmanager.numberOfTaskSlots:Taskmanager 上 slots 的数量,通常设置为 CPU 核心的数量,或其一半。
  • parallelism.default:任务默认的并行度。
  • io.tmp.dirs:存储临时文件的路径,如果没有配置,则默认采用服务器的临时目录,如 LInux 的 /tmp 目录。

更多配置可以参考 Flink 的官方手册:Configuration

四、Standalone Cluster HA

上面我们配置的 Standalone 集群实际上只有一个 JobManager,此时是存在单点故障的,所以官方提供了 Standalone Cluster HA 模式来实现集群高可用。

4.1 前置条件

在 Standalone Cluster HA 模式下,集群可以由多个 JobManager,但只有一个处于 active 状态,其余的则处于备用状态,Flink 使用 ZooKeeper 来选举出 Active JobManager,并依赖其来提供一致性协调服务,所以需要预先安装 ZooKeeper 。

另外在高可用模式下,还需要使用分布式文件系统来持久化存储 JobManager 的元数据,最常用的就是 HDFS,所以 Hadoop 也需要预先安装。关于 Hadoop 集群和 ZooKeeper 集群的搭建可以参考:

4.2 搭建步骤

修改 conf/flink-conf.yaml 文件,增加如下配置:

1
2
3
4
5
6
7
8
9
10
# 配置使用zookeeper来开启高可用模式
high-availability: zookeeper
# 配置zookeeper的地址,采用zookeeper集群时,可以使用逗号来分隔多个节点地址
high-availability.zookeeper.quorum: hadoop003:2181
# 在zookeeper上存储flink集群元信息的路径
high-availability.zookeeper.path.root: /flink
# 集群id
high-availability.cluster-id: /standalone_cluster_one
# 持久化存储JobManager元数据的地址,zookeeper上存储的只是指向该元数据的指针信息
high-availability.storageDir: hdfs://hadoop001:8020/flink/recovery

修改 conf/masters 文件,将 hadoop001 和 hadoop002 都配置为 master 节点:

1
2
hadoop001:8081
hadoop002:8081

确保 Hadoop 和 ZooKeeper 已经启动后,使用以下命令来启动集群:

1
bin/start-cluster.sh

此时输出如下:

image-20230927231704329

可以看到集群已经以 HA 的模式启动,此时还需要在各个节点上使用 jps 命令来查看进程是否启动成功,正常情况如下:

image-20230927231707293

只有 hadoop001 和 hadoop002 的 JobManager 进程,hadoop002 和 hadoop003 上的 TaskManager 进程都已经完全启动,才表示 Standalone Cluster HA 模式搭建成功。

4.3 常见异常

如果进程没有启动,可以通过查看 log 目录下的日志来定位错误,常见的一个错误如下:

1
2
3
4
5
6
7
8
9
10
11
2019-11-05 09:18:35,877 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint      
- Shutting StandaloneSessionClusterEntrypoint down with application status FAILED. Diagnostics
java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
.......
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file
system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no
Hadoop file system to support this scheme could be loaded.
.....
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in
the classpath/dependencies.
......

可以看到是因为在 classpath 目录下找不到 Hadoop 的相关依赖,此时需要检查是否在环境变量中配置了 Hadoop 的安装路径,如果路径已经配置但仍然存在上面的问题,可以从 Flink 官网下载对应版本的 Hadoop 组件包:

image-20230927231712390

下载完成后,将该 JAR 包上传至所有 Flink 安装目录的 lib 目录即可。

参考资料