一、Flink 简介
Apache Flink 是一个用于对无边界和有边界数据流进行有状态计算的框架和分布式处理引擎。Flink 被设计为运行在所有常见的集群环境中,并且以内存速度和任意规模执行运算。
- 无边界的数据集
无边界定义了开始但没有定义结束。它们不会在生成时终止提供数据,必须持续地处理无边界流,即必须在拉取到事件后立即处理它。无法等待所有输入数据到达后处理,因为输入是无边界的,并且在任务时间点都不会完成。处理无边界数据通常要求以特定顺序(例如事件发生的顺序)拉取事件,以便能够推断结果完整性。
- 有边界的数据集
有边界流定义了开始和结束。可以在执行任何计算之前通过拉取到所有数据后处理有界流。处理有界流不需要有序拉取,因为可以随时对有界数据集进行排序。有边界流的处理也称为批处理。
二、Flink 体系架构
Flink 是一种 master-slave 风格的架构,主节点是 JobManager,从节点是 TaskManager。
Flink 集群启动后,会有两种进程,一种是 JobManager(Master),一种是 TaskManager(Worker),我们可以通过 jps 或者 ps -ef | grep java 命名来查看 Flink 进程。
Master 进程(JobManager):用于分布式执行,调度任务,协调检查点(checkpoint),协调失败恢复等。Flink 集群中至少有一个 Master 进程;为了高可用性,通常会有多个 Master 节点,选举其中一个作为 leader,其余作为 standby。
Worker 进程(TaskManager):用于执行 dataflow 上的 task(subtask),缓存和交换数据流,TaskManager 至少有一个。
Flink 集群的 Master 进程和 Worker 进程可以通过多种方式启动,既可以在物理机上部署启动,也可以通过容器计数、或者像 YARN 这样的资源管理框架启动。Worker 连接到 Master,告知自身可用,并等待分配任务。
Client 不是 Flink 集群运行时的一部分,它作为客户端,用来准备和发送数据流到 Master,在这之后,客户端可以断开,或者保持连接接受结果数据。客户端程序可以是 Java 或 Scala 程序,也可以通过命令行的方式(bin/flink run...)来触发 Flink 集群执行。
客户端向 JobManager 提交作业,JobManager 分配给 TaskManager 执行,每个 TaskManager 有多个 TaskSolt,每个 TaskSolt 可以处理一个 Task,所以一个 TaskManager 可以处理多个 Task,TaskManager 执行 Task 时会向 JobManager 汇报执行的进度和状态,所以 JobManager 能够及时更新了解整个作业执行的情况,并且在完成或执行失败时,将状态和结果返回给客户端。
三、Flink 生态圈
Flink 以层级式执行组件其软件栈,不同层的栈建立在其下层基础上,并且各层接受程序不同层的抽象形式。
由下到上分为部署层、核心层、API 层和库。
部署层(Deploy)指 Flink 的部署模式,可以是在本地部署一个单机节点,也可以是部署一个集群,集群可以是 Standalone 模式也可以是 YARN 模式(前者是自身管理资源,后者是由 Yarn 负责资源调度),最后是云部署,就是将 Flink 部署在 Docker、K8S 等云容器上。
核心层(Core)就是指 Flink 的核心运行环境,类似于 Spark Core。
API 层按照离线计算和实时计算,分为 DataSet API(离线批量处理)、DataStream API(实时流式处理)。
库层(Libraries)指 Flink 在 API 层的基础上,构建了很多高级的库,方便用户使用,其中基于 DataSet API 的有 Flink ML(机器学习)、Gelly(图计算)、Table(表操作和 SQL);基于 DataStream API 的有 CEP(Complex Event Processing,复杂事件处理)、Table(实时表操作和 SQL)。
四、环境搭建部署
1、单机模式
# 解压安装包
tar -zxvf flink-1.7.2-bin-hadoop27-scala_2.11.tgz -C ~/training
# 启动 Flink 服务器
cd ~/training/flink-1.7.2/
bin/start-cluster.sh
测试一下离线计算和流计算
# 离线计算
bin/flink run examples/batch/WordCount.jar -input hdfs://bigdata111:9000/input/data.txt -output hdfs://bigdata111:9000/flink/wc1
# 流计算
bin/flink run examples/streaming/SocketWindowWordCount.jar --port 1234
# 在本机开启一个发送数据的socket
nc -l -p 1234
# 查看流处理执行结果需要到log下的.out文件中查看
2、集群模式
在 bigdata112、bigdata113、bigdata114 上部署,其中 bigdata112 是主节点,按照单机模式的步骤先解压安装,然后修改配置文件
# conf/flink-conf.yaml
jobmanager.rpc.address: bigdata112
# conf/slaves
bigdata113
bigdata114
# 将 bigdata112 上 flink 的安装目录发送到另外两个节点
scp -r flink-1.7.2/ root@bigdata113:/root/training
scp -r flink-1.7.2/ root@bigdata114:/root/training
在主节点上启动集群,可以看到 JobManager 对应的进程
五、Flink On Yarn
1、两种模式
Yarn 是一个资源调度系统,如果我们不想让 Flink 自己来管理分配资源,专注任务执行,就可以将资源调度委托给 Yarn 来处理,Flink On Yarn 有两种处理模式,一种是内存集中管理模式,一种是内存 Job 管理模式。
- 内存集中管理模式
本模式中,在 Yarn 中初始化一个 Flink 集群,开辟指定的资源,之后我们提交的 Flink Job 都在这个 Flink yarn-session 中,也就是说不管提交多少个 job,这些 job 都会共用开始时在 yarn 中申请的资源。这个 Flink 集群会常驻在 Yarn 集群中,除非手动停止。
- 内存 Job 管理模式
本模式中,每次提交 job 都会创建一个新的 Flink 集群,任务之间相互独立,互不影响并且方便管理。任务执行完成之后创建的集群也会消失。
【对比】推荐使用内存 job 管理模式,因其资源管理更加有弹性,需要时申请,用完释放,且任务之间的资源使用互相隔离。
2、使用
- 内存集中管理模式
# 启动一个 flink yarn-session
# 查看选项: bin/yarn-session.sh --help
bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 -d
# Yarn 的命令解析
# 必选
-n,--container <arg> 分配多少个yarn容器(=taskmanager的数量)
# 可选
-D <property=value> 动态属性
-d,--detached 独立运行
-jm,--jobManagerMemory <arg> JobManager的内存 [in MB]
-nm,--name 在YARN上为一个自定义的应用设置一个名字
-q,--query 显示yarn中可用的资源(内存,cpu核数)
-qu,--queue <arg> 指定YARN队列
-s,--slots <arg> 每个TaskManager使用的slots数量
-tm,--taskManagerMemory <arg> 每个TaskManager的内存 [in MB]
-z,--zookeeperNamespace <arg> 针对HA模式在zookeeper上创建NameSpace
-id,--applicationId <yarnAppId> YARN集群上的任务id,附着到一个后台运行的yarn session中
如果启动类似下面报错
container_1658459582260_0001_01_000001] is running beyond virtual memory limits. Current usage: 235.9 MB of 1 GB physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing container.
则在 hadoop 配置(etc/hadoop/yarn-site.xml)中添加以下配置,然后重启 hadoop
<!-- 禁用Hadoop的虚拟内存检查 -->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
如果配置完还是报错,需要检查是否配置了 Yarn 的调度器,应用提交创建的容器使用资源大小超过了调度器队列的容量,如果是则需要调整调度器的配置。
- 内存 Job 管理模式
bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 examples/batch/WordCount.jar
# 参数解析
-m:运行在yarn上
-yn:分配一个NodeManager
-yjm: JobManager的内容
-ytm:TaskManager内存
六、Flink HA
Flink 集群是主从架构,如果主节点是单点,容易因单点故障导致集群不可用,所以要实现 HA,必须部署多个主节点,其中一个处于 Active 状态,其他处于 Standby 状态,并且依赖 zk 实现对主节点状态和故障转移选举的处理(这跟 HDFS 的 HA 非常像)。
Standalone 模式(独立模式)下 JobManager 的高可用性的基本思想是,任何时候都有一个 Master JobManager,并且多个 Standby JobManagers。Standby JobManagers 可以在 Master JobManager 挂掉的情况下接管集群称为 Master JobManager。这样保证了没有单点故障,一旦某一个 Standby JobManager 接管集群,程序就可以继续运行。Standby JobManager 和 Master JobManager 实例之间没有明确区别。每个 JobManager 可以称为 Master 或 Standby 节点。
下面使用 bigdata112、bigdata113、bigdata114 三个节点来搭建一个支持 HA 的 Flink 集群。
1、配置
首先在 bigdata112 上安装 Flink 并做以下配置
# conf/flink-conf.yaml
# 用来实现多个 JobManager HA 的zk集群地址
high-availability.zookeeper.quorum: bigdata112:2181,bigdata113:2181,bigdata114:2181
# 保存数据的zk路径
high-availability.zookeeper.path.root: /flink
# 高可用集群的id
high-availability.cluster-id: /cluster_one
# 存储flink元信息的hdfs路径
high-availability.storageDir: hdfs://bigdata111:9000/flink/recovery
# conf/masters
bigdata112
bigdata113
# conf/slaves
bigdata113
bigdata114
# conf/zoo.cfg
server.1=bigdata112:2888:3888
server.2=bigdata113:2888:3888
server.3=bigdata114:2888:3888
# 将 bigdata112 上的安装目录复制到另外两个节点
scp -r flink-1.7.2/ root@bigdata113:/root/training
scp -r flink-1.7.2/ root@bigdata114:/root/training
在这里配置了主节点为 bigdata112、bigdata113,也就是说会在这两个节点上启动主节点进程,具体哪个是 Active 哪个是 Standby 看 zk 选举的结果。
2、启动
首先启动 zk 集群
# 在 bigdata112、bigdata113、bigdata114 上启动 zk
zkServer.sh start
因为有数据要存储到 HDFS 上,所以启动 HDFS
# 在 bigdata111 上启动 HDFS
start-all.sh
接下来在 bigdata112 上启动 flink 集群
cd ~/training/flink-1.7.2
bin/start-cluster.sh
3、验证
启动完之后可以查看三台机器上的进程
其中,StandaloneSessionClusterEntrypoint 是主节点的进程,TaskManagerRunner 是从节点进程,可以看到在 bigdata112、bigdata113 上各启动了一个主节点进程,在 bigdata113、bigdata114 上各启动了一个从节点进程。
既然 flink 主节点选举的元信息是保存在 zk 上的,查看 zk 的文件树,可以看到当前活跃的主节点是 bigdata113。
Flink 集群启动的同时,也提供了一个 web 控制台,打开 bigdata113:8081,可以看到当前集群的主节点是 bigdata113,也可以打开 bigdata112:8081,但是会自动跳转到 [主节点:8081] 的访问地址。
因为配置了 flink 存储数据在 HDFS 中,目录为 /flink/recovery,打开 HDFS 控制台,查看是否创建了对应的目录和文件。
为了验证是否能实现高可用,即在一个主节点故障的时候,另一个备用主节点能接管主节点的工作,使集群依然可以正常提供服务,将 bigdata113 上的主节点进程杀死 kill -9 2153 ,再次访问 flink web 控制台,发现原来 bigdata113:8081 访问不了了,该访问 bigdata112:8081,可以看到主节点已经变成了 bigdata112。
再次查看 zk 的节点信息,可以看到当前是 bigdata112 持有锁(zk选举通过文件锁来实现选举)。
由此我们验证了 Flink 集群通过 zk 实现的高可用功能。
七、实战
需要引入的依赖(根据使用的编程语言引入):
<!-- Java依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.7.2</version>
<scope>compile</scope>
</dependency>
<!-- Scala依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.7.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.7.2</version>
<scope>compile</scope>
</dependency>
PS.特别注意如果使用 scala 编写程序,要特别注意 scala 环境的版本跟引入依赖的 scala 版本的兼容关系,比如本地 scala 环境是 2.11.11,但是依赖的 scala 类库版本是 2.11.12,在编写一些 lambda 表达式时就会有编译报错。
1、批处理
使用 Flink 批处理的方式编写一个 WordCount 小程序。
Java 版本
package demo.batch;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class WordCountExample {
public static void main(String[] args) throws Exception {
// 创建一个运行环境执行DataSet API
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 构造一份处理的数据
DataSet<String> source = env.fromElements("I love Beijing","I love China","Beijing is the capital of China");
// Tuple2不同数据类型的集合
DataSet<Tuple2<String, Integer>> mapOut = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//数据:I love Beijing
String[] words = value.split(" ");
for(String w : words) {
out.collect(new Tuple2<String, Integer>(w, 1));
}
}
});
// 安装Tuple下标为0的字段分组,按照下标为1的字段求和
DataSet<Tuple2<String, Integer>> reduceOut = mapOut.groupBy(0).sum(1);
reduceOut.print();
}
}
程序执行结果:
(China,2)
(love,2)
(the,1)
(of,1)
(Beijing,2)
(I,2)
(capital,1)
(is,1)
【解析】首先创建一个批处理的运行环境,通过它加载数据得到 DataSet 对象,最后再对数据集对象执行算子运算,并打印处理结果。
Scala 版本
package demo.batch
import org.apache.flink.api.scala._
object WordCountExampleSc {
def main(args: Array[String]): Unit = {
//创建一个批处理的运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//创建数据集合
val source = env.fromElements("I love Beijing","I love China","Beijing is the capital of China")
//执行单词计数
val result = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)
result.print
}
}
2、流处理
使用 Flink 流处理的方式编写一个 WordCount 小程序。
Java 版本
package demo.stream;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCountWithStream {
public static void main(String[] args) throws Exception {
// 创建一个 DataStream 的运行环境
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
//senv.setParallelism(2);
// 创建一个输入流
DataStreamSource<String> source = senv.socketTextStream("bigdata111", 1234);
// 数据统计
source.flatMap(new FlatMapFunction<String, WordCount>() {
@Override
public void flatMap(String value, Collector<WordCount> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(new WordCount(word, 1));
}
}
}).keyBy("word").sum("count").print().setParallelism(1);
// 启动流式计算
senv.execute("WordCountDemo With Stream");
}
public static class WordCount {
private String word;
private int count;
public WordCount() {}
public WordCount(String word, int count) {
this.word = word;
this.count = count;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
@Override
public String toString() {
return "WordCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
执行结果:
【解析】首先创建一个流处理的运行环境,再监听 bigdata111 的 1234 端口,拿到数据再执行算子运算,先分词再创建 WordCount 对象输出,根据对象的 word 字段分组,再对分组的 count 字段计数,最后再打印结果,每个算子操作都可以设置执行的并行度。
Scala 版本
package demo.stream
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
object WordCountWithStreamSc {
def main(args: Array[String]): Unit = {
// 创建一个 DataStream 的运行环境
val senv = StreamExecutionEnvironment.getExecutionEnvironment
// 得到一个输入流
val source = senv.socketTextStream("bigdata111", 1234)
// 数据执行单词计数: I love Beijing
source.flatMap(_.split(" ")).map(word => WordWithCount(word, 1))
.keyBy("word")
.sum("count")
.print()
.setParallelism(1) // 设置并行度为1,不设默认跟机器CPU的核数一样
// 启动应用
senv.execute("WordCountWithStream")
}
case class WordWithCount(word:String, count:Int)
}
3、窗口计算
在很多场景中,我们不想处理数据流中所有的历史数据,只想处理某个时间窗口内的数据(比如最近3分钟,最近一个小时,最近2天等),Flink 支持这样的时间窗口操作。
还是以 WordCount 小程序为例子,现在只想统计5秒内处理的数据的统计结果,则修改后的程序如下:
package demo.stream;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCountJava {
public static void main(String[] args) throws Exception {
// 创建一个 DataStream 的运行环境
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
//senv.setParallelism(2);
// 创建一个输入流
DataStreamSource<String> source = senv.socketTextStream("bigdata111", 1234);
// 数据统计
source.flatMap(new FlatMapFunction<String, WordCountWithStream.WordCount>() {
@Override
public void flatMap(String value, Collector<WordCountWithStream.WordCount> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(new WordCountWithStream.WordCount(word, 1));
}
}
}).keyBy("word")
.timeWindow(Time.seconds(5))
.sum("count")
.print().setParallelism(1);
// 启动流式计算
senv.execute("WordCountDemo With timeWindow Stream");
}
}
执行结果:
可以看到虽然同一个单词出现了多次,但是后面输出的统计并不是完整的历史数据的统计,所以时间窗口生效了。
Scala 版本
package demo.stream
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object SocketWindowWordCountScala {
def main(args: Array[String]): Unit = {
// 创建一个 DataStream 的运行环境
val senv = StreamExecutionEnvironment.getExecutionEnvironment
// 得到一个输入流
val source = senv.socketTextStream("bigdata111", 1234)
// 数据执行单词计数: I love Beijing
source.flatMap(_.split(" ")).map(word => WordWithCount(word, 1))
.keyBy("word")
.timeWindow(Time.seconds(5))
.sum("count")
.print()
.setParallelism(1) // 设置并行度为1,不设默认跟机器CPU的核数一样
// 启动应用
senv.execute("SocketWindowWordCountScala")
}
case class WordWithCount(word:String, count:Int)
}
4、Shell
Flink 内置支持交互式的 Scala Shell,我们既可以在本地安装模式下或者集群模式下运行它。我们可以通过下面的命令在单机模式下启动 Shell。
启动 Flink Scala Shell
bin/start-scala-shell.sh remote bigdata112 8081
执行
val text = benv.fromElements("hello scala", "hello world")
val counts = text.flatMap{_.split(" ")}.map{(_,1)}.groupBy(0).sum(1)
counts.print()
:quit
5、并行度分析
首先,要了解 TaskManager 和 Slot 这两个基本概念的关系,具体如下:
- Flink 的每个 TaskManager 为集群提供 slot。
- slot 的数量通常与每个 TaskManager 节点的可用 CPU 内核数成比例。
- 一般情况下你的 slot 数是你每个节点的 cpu 的核数。
什么是并行度?
一个Flink程序由多个任务组成(source、transformation和 sink)。 一个任务由多个并行的实例(线程)来执行。
一个任务的并行实例(线程)数目就被称为该任务的并行度。
并行度的设置?
-
算子层次(Operator Level)
一个算子、数据源和sink的并行度可以通过调用setParallelism()方法来指定。
eg: .sum("count").setParallelism(1)
-
执行环境层次(Execution Environment Level)
执行环境(任务)的默认并行度可以通过调用setParallelism()方法指定。
eg:senv.setParallelism(1)
-
客户端层次(Client Level)
并行度可以在客户端将job提交到Flink时设定。对于CLI客户端,可以通过-p参数指定并行度./bin/flink run -p 10 WordCount-java.jar。
eg:bin/flink run -p 10
-
系统层次(System Level)
在系统级可以通过设置flink-conf.yaml文件中的parallelism.default属性来指定所有执行环境的默认并行度。
eg:fink-conf.yaml的参数parallelism.default: 1
八、DataSet API
常用的 DataSet API 如下:
Map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作
FlatMap:输入一个元素,可以返回零个,一个或者多个元素
MapPartition:类似map,一次处理一个分区的数据
Filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下
Reduce:对数据进行聚合操作,结合当前元素和上一次reduce返回的值进行聚合操作,然后返回一个新的值
Aggregate:聚合操作,sum、max、min等
Distinct:返回一个数据集中去重之后的元素
Join:内连接
OuterJoin:外链接
Cross:获取两个数据集的笛卡尔积
Union:返回两个数据集的总和,数据类型需要一致
First-n:获取集合中的前N个元素
1、map、flatMap、mapPartition
map 跟 mapPartition 很类似,但是 mapPartition 会将不同分区的数据单独集中处理。
package demo.dataset;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class Demo1 {
public static void main(String[] args) throws Exception {
// 创建一个 DataSet API 的运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<String> data = new ArrayList<String>();
data.add("I love Beijing");
data.add("I love China");
data.add("Beijing is the capital of China");
DataSet<String> source = env.fromCollection(data);
System.out.println("***************** map算子 *****************");
source.map(new MapFunction<String, List<String>>() {
@Override
public List<String> map(String value) throws Exception {
String[] words = value.split(" ");
List<String> result = new ArrayList<String>();
for (String w : words) {
result.add(w);
}
return result;
}
}).print();
System.out.println("***************** flatMap算子 *****************");
source.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] words = value.split(" ");
for (String w : words) {
out.collect(w);
}
}
}).print();
System.out.println("***************** mapPartition算子 *****************");
source.mapPartition(new MapPartitionFunction<String, String>() {
@Override
public void mapPartition(Iterable<String> iterable, Collector<String> out) throws Exception {
// Iterable<String> values:代表该分区中的所有的元素
Iterator<String> paritionData = iterable.iterator();
while (paritionData.hasNext()) {
String data = paritionData.next();
String[] words = data.split(" ");
for (String w : words) {
out.collect(w);
}
out.collect("-----------------------");
}
}
}).print();
}
}
执行结果
***************** map算子 *****************
[I, love, Beijing]
[I, love, China]
[Beijing, is, the, capital, of, China]
***************** flatMap算子 *****************
I
love
Beijing
I
love
China
Beijing
is
the
capital
of
China
***************** mapPartition算子 *****************
I
love
Beijing
-----------------------
I
love
China
-----------------------
Beijing
is
the
capital
of
China
-----------------------
2、filter、distinct
filter 用来数据过滤,Flink SQL 的 where 就是用 filter 来实现的,distince 用来除重。
package demo.dataset;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
public class Demo2 {
public static void main(String[] args) throws Exception {
// 创建一个 DataSet API 的运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<String> data = new ArrayList<String>();
data.add("I love Beijing");
data.add("I love China");
data.add("Beijing is the capital of China");
DataSet<String> source = env.fromCollection(data);
System.out.println("***************** distinct算子 *****************");
source.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] words = value.split(" ");
for (String w : words) {
out.collect(w);
}
}
}).distinct().print();
System.out.println("***************** filter算子 *****************");
// 选择长度大于3的单词:I、is、the 单词被过滤掉了
source.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] words = value.split(" ");
for (String w : words) {
out.collect(w);
}
}
}).filter(new FilterFunction<String>() {
@Override
public boolean filter(String s) throws Exception {
return s.length() > 3 ? true : false;
}
}).print();
}
}
执行结果
***************** distinct算子 *****************
China
love
the
of
Beijing
I
capital
is
***************** filter算子 *****************
love
Beijing
love
China
Beijing
capital
China
3、join、outerjoin
将 DataSet 看成一个数据表,join 类似数据库的表连接。连接有内连接、外连接之分,外连接又有左外连接、右外连接、全外连接。
package demo.dataset;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.ArrayList;
public class JoinDemo {
public static void main(String[] args) throws Exception {
// 创建一个 DataSet API 的运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 用户信息 城市ID 用户名字
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>();
data1.add(new Tuple2<Integer, String>(1,"Tom"));
data1.add(new Tuple2<Integer, String>(2,"Mary"));
data1.add(new Tuple2<Integer, String>(3,"Mike"));
data1.add(new Tuple2<Integer, String>(4,"Jone"));
// 城市的信息 城市ID 城市名称
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>();
data2.add(new Tuple2<Integer, String>(1,"北京"));
data2.add(new Tuple2<Integer, String>(2,"上海"));
data2.add(new Tuple2<Integer, String>(3,"重庆"));
data2.add(new Tuple2<Integer, String>(4,"深圳"));
// 生成两张表(两个数据集合)
DataSet<Tuple2<Integer, String>> user = env.fromCollection(data1);
DataSet<Tuple2<Integer, String>> city = env.fromCollection(data2);
//执行Join
//表示:用于user表的第一个字段连接city表的第一字段
//where user.cityID = city.cityID
user.join(city).where(0).equalTo(0)
// 表示user表 表示city表 结果表:ID号 用户名字 城市的名字
.with(new JoinFunction<Tuple2<Integer, String>,Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> userTable,
Tuple2<Integer, String> cityTable) throws Exception {
return new Tuple3<Integer, String, String>(userTable.f0, userTable.f1, cityTable.f1);
}
})
.print();
}
}
执行结果
(3,Mike,重庆)
(1,Tom,北京)
(4,Jone,深圳)
(2,Mary,上海)
外连接的示例代码如下:
package demo.dataset;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.ArrayList;
public class OuterJoinDemo {
public static void main(String[] args) throws Exception {
// 创建一个 DataSet API 的运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 用户信息 城市ID 用户名字
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>();
data1.add(new Tuple2<Integer, String>(1,"Tom"));
//data1.add(new Tuple2<Integer, String>(2,"Mary"));
data1.add(new Tuple2<Integer, String>(3,"Mike"));
data1.add(new Tuple2<Integer, String>(4,"Jone"));
// 城市的信息 城市ID 城市名称
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>();
data2.add(new Tuple2<Integer, String>(1,"北京"));
data2.add(new Tuple2<Integer, String>(2,"上海"));
//data2.add(new Tuple2<Integer, String>(3,"重庆"));
data2.add(new Tuple2<Integer, String>(4,"深圳"));
// 生成两张表(两个数据集合)
DataSet<Tuple2<Integer, String>> user = env.fromCollection(data1);
DataSet<Tuple2<Integer, String>> city = env.fromCollection(data2);
System.out.println("***************** 左外连接 *****************");
user.leftOuterJoin(city).where(0).equalTo(0)
// 第一张表 user 第二张表city 连接的结果
.with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> left,
Tuple2<Integer, String> right) throws Exception {
if (right == null) {
return new Tuple3<>(left.f0, left.f1, null);
} else {
return new Tuple3<>(left.f0, left.f1, right.f1);
}
}
}).print();
System.out.println("***************** 右外连接 *****************");
user.rightOuterJoin(city).where(0).equalTo(0)
// 第一张表 user 第二张表city 连接的结果
.with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> left,
Tuple2<Integer, String> right) throws Exception {
if (left == null) {
return new Tuple3<>(right.f0, null, right.f1);
} else {
return new Tuple3<>(right.f0, left.f1, right.f1);
}
}
}).print();
System.out.println("***************** 全外连接 *****************");
user.fullOuterJoin(city).where(0).equalTo(0)
// 第一张表 user 第二张表city 连接的结果
.with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> left,
Tuple2<Integer, String> right) throws Exception {
if (left == null) {
return new Tuple3<>(right.f0, null, right.f1);
} else if (right == null) {
return new Tuple3<>(left.f0, left.f1, null);
} else {
return new Tuple3<>(right.f0, left.f1, right.f1);
}
}
}).print();
}
}
执行结果
***************** 左外连接 *****************
(3,Mike,null)
(1,Tom,北京)
(4,Jone,深圳)
***************** 右外连接 *****************
(1,Tom,北京)
(4,Jone,深圳)
(2,null,上海)
***************** 全外连接 *****************
(3,Mike,null)
(1,Tom,北京)
(4,Jone,深圳)
(2,null,上海)
4、笛卡尔积
两个表之间的笛卡尔积,最终得到的结果行数就是原来两个表的行数相乘,列数是原来两个表的列数相加。
package demo.dataset;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.ArrayList;
public class CartesianProductDemo {
public static void main(String[] args) throws Exception {
// 创建一个 DataSet API 的运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 用户信息 城市ID 用户名字
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>();
data1.add(new Tuple2<Integer, String>(1,"Tom"));
data1.add(new Tuple2<Integer, String>(2,"Mary"));
data1.add(new Tuple2<Integer, String>(3,"Mike"));
data1.add(new Tuple2<Integer, String>(4,"Jone"));
// 城市的信息 城市ID 城市名称
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>();
data2.add(new Tuple2<Integer, String>(1,"北京"));
data2.add(new Tuple2<Integer, String>(2,"上海"));
data2.add(new Tuple2<Integer, String>(3,"重庆"));
data2.add(new Tuple2<Integer, String>(4,"深圳"));
// 生成两张表(两个数据集合)
DataSet<Tuple2<Integer, String>> user = env.fromCollection(data1);
DataSet<Tuple2<Integer, String>> city = env.fromCollection(data2);
//生成笛卡尔积
user.cross(city).print();
}
}
执行结果
((1,Tom),(1,北京))
((1,Tom),(2,上海))
((1,Tom),(3,重庆))
((1,Tom),(4,深圳))
((2,Mary),(1,北京))
((2,Mary),(2,上海))
((2,Mary),(3,重庆))
((2,Mary),(4,深圳))
((3,Mike),(1,北京))
((3,Mike),(2,上海))
((3,Mike),(3,重庆))
((3,Mike),(4,深圳))
((4,Jone),(1,北京))
((4,Jone),(2,上海))
((4,Jone),(3,重庆))
((4,Jone),(4,深圳))
5、first-N
获取集合中的前N个元素
package demo.dataset;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.ArrayList;
public class FirstNDemo {
public static void main(String[] args) throws Exception {
// 创建一个运行环境执行DataSet API
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 用户信息 城市ID 用户名字
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>();
data1.add(new Tuple2<Integer, String>(1,"Tom"));
data1.add(new Tuple2<Integer, String>(2,"Mary"));
data1.add(new Tuple2<Integer, String>(3,"Mike"));
data1.add(new Tuple2<Integer, String>(4,"Jone"));
data1.add(new Tuple2<Integer, String>(5,"Jerry"));
data1.add(new Tuple2<Integer, String>(4,"Alice"));
DataSet<Tuple2<Integer, String>> source = env.fromCollection(data1);
// 取出前三条数据,按照插入的顺序
source.first(3).print();
System.out.println("----------------------");
// 分组:按照第一个字段进行分组,取出每组中的两条记录
source.groupBy(0).first(2).print();
}
}
执行结果
(1,Tom)
(2,Mary)
(3,Mike)
----------------------
(3,Mike)
(1,Tom)
(4,Jone)
(4,Alice)
(5,Jerry)
(2,Mary)
九、DataStream API
Flink DataStream 程序是实现了 data streams 转换操作(比如过滤、更新状态、定义窗口、聚合等等)的一般程序。data streams 最初可以从多种 sources 创建(消息队列,socket 流,文件等)。结果可以通过 sink 返回。例如写出到文件或者标准输出(例如终端命令行)。Flink 程序可以在多种环境下运行,单节点、或嵌入到其他程序。可以在本地 JVM 或很多机器组成的集群上执行。
1、数据源
Source 是程序的数据源输入,你可以通过 StreamExecutionEnvironment.addSource(sourceFunction) 来为你的程序添加一个 source。Flink 提供了大量的已经实现好的 source 方法,你也可以自定义source。
通过实现 sourceFunction 接口来自定义无并行度的 source,或者你也可以通过实现 ParallelSourceFunction 接口 or 继承 RichParallelSourceFunction 来自定义有并行度的source。
(1)基本数据源
使用基本数据源不需要自定义 SourceFunction,示例代码如下:
package demo.datastream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
public class StreamingFromCollection {
public static void main(String[] args) throws Exception {
// 创建一个流计算的运行环境
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建集合
ArrayList<Integer> data = new ArrayList<>();
data.add(10);
data.add(20);
data.add(30);
data.add(40);
// 创建数据源source
DataStreamSource<Integer> source = senv.fromCollection(data);
// 执行Transformation的操作,每个数字加1
DataStream<Integer> result = source.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer integer) throws Exception {
return integer+1;
}
});
// 设置并行度
result.print().setParallelism(2);
senv.execute("StreamingFromCollection");
}
}
(2)自定义数据源
自定义数据源需要编写实现 SourceFunction 接口的类。
该接口中有两个接口方法:
void run(SourceContext<T> ctx) throws Exception;
void cancel();
run 方法定义了数据源是如何产生数据的,cancel 方法会在数据源被停止时调用。
package demo.datastream;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
// 创建一个 并行度为1的数据源
public class MyNoParalleSource implements SourceFunction<Long> {
// 定义一个计数器和开关
private long count = 1;
private boolean isRunning = true;
// 如何产生数据
@Override
public void run(SourceContext<Long> ctx) throws Exception {
while(isRunning){
ctx.collect(count);
count++;
Thread.sleep(1000); // 每秒产生一条数据
}
}
// 需要停止数据源的时候调用
@Override
public void cancel() {
isRunning = false;
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> source = senv.addSource(new MyNoParalleSource());
// .setParallelism(2); // 不能设置并行度,自定义source并行度就是1
DataStream<Long> data = source.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
//System.out.println("收到的数据是:" + value);
return value;
}
});
data.print();
senv.execute("MyNoParalleSourceMain");
}
}
执行结果
打印出来的执行结果中,左边表示由机器的哪个CPU来执行的,右边是结果。由于程序打印的操作没有设置并行度,默认情况下并行度就是机器的CPU核数,运行的PC是16核,所以最大显示16,多个线程在不同的CPU中交替执行程序。
但是这里没法对执行环境设置并行度的,否则会报错,如果要支持执行环境设置并行度,需要实现 ParallelSourceFunction 接口。
package demo.datastream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
public class MyParalleSource implements ParallelSourceFunction<Long> {
private long count = 1;
private boolean isRunning = true;
@Override
public void run(SourceContext<Long> ctx) throws Exception {
while(isRunning){
ctx.collect(count);
count++;
Thread.sleep(1000); // 每秒产生一条数据
}
}
@Override
public void cancel() {
isRunning = false;
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> source = senv.addSource(new MyParalleSource()).setParallelism(2);
DataStream<Long> data = source.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
//System.out.println("收到的数据是:" + value);
return value;
}
});
// 每2秒钟处理一次数据:求和
DataStream<Long> result = data.timeWindowAll(Time.seconds(2)).sum(0);
result.print();
//data.print();
senv.execute("MyParalleSourceMain");
}
}
2、connector
Flink 中 connector 的概念跟 Kafka connector 特别类似,官方支持以下版本(1.7.x):
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- FileSystem (sink)
- RabbitMQ (source/sink)
- Google PubSub (source/sink)
- Hybrid Source (source)
- Apache NiFi (source/sink)
- Apache Pulsar (source)
- JDBC (sink)
具体使用参考官方文档:https://nightlies.apache.org/flink/flink-docs-master/docs/connectors。
3、Transformation
在 DataStream 中,各种算子的操作跟 DataSet API 非常类似,常用的算子如下:
- map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作
- flatmap:输入一个元素,可以返回零个,一个或者多个元素
- filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下
- keyBy:根据指定的key进行分组,相同key的数据会进入同一个分区【典型用法见备注】
- reduce:对数据进行聚合操作,结合当前元素和上一次reduce返回的值进行聚合操作,然后返回一个新的值
- aggregations:sum(),min(),max()等
- window:窗口
- union:合并多个流,新的流会包含所有流中的数据,但是union是一个限制,就是所有合并的流类型必须是一致的。
- connect:和union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法。
- CoMap, CoFlatMap:在ConnectedStreams中需要使用这种函数,类似于map和flatmap
- Split:根据规则把一个数据流切分为多个流
- Select:和split配合使用,选择切分后的流
用法跟 DataSet 高度类似。
(1)filter
使用 filter 过滤出是偶数的数据
package demo.datastream;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FilterDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> input = senv.addSource(new MyNoParalleSource()).setParallelism(1);
input.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return value % 2 == 0 ? true : false;
}
}).print();
senv.execute("Filter Stream demo");
}
}
(2)union
union 可以连接合并多个流,但是流中的数据类型必须一致。
package demo.datastream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class UnionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
// 添加数据源
DataStreamSource<Long> source1 = senv.addSource(new MyNoParalleSource());
DataStreamSource<Long> source2 = senv.addSource(new MyNoParalleSource());
// union操作合并的数据源的类型必须一样
DataStream<Long> data = source1.union(source2);
data.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("收到的数据是:" + value);
return value;
}
});
senv.execute("Stream Union demo");
}
}
MyNoParalleSource 是一个产生累加计数的 source,两个相同的 source 合并,最后输出的结果应该是每个整数都被处理了两次,执行结果如下:
(3)connect
connect 只可以合并两个流,但是流的数据类型可以不一样,并且针对流中的数据分别进行处理。
package demo.datastream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
public class ConnectDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
// 添加数据源
DataStreamSource<Long> source1 = senv.addSource(new MyNoParalleSource());
DataStream<String> source2 = source1.map(new MapFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
return "String: " + value;
}
});
// 使用connect连接这两个数据源,不同类型的数据源
ConnectedStreams<Long, String> stream = source1.connect(source2);
// 使用map算子分别对long和string进行处理
stream.map(new CoMapFunction<Long, String, Object>() {
@Override
public Object map1(Long value) throws Exception {
return "对Long类型进行处理:" + value;
}
@Override
public Object map2(String value) throws Exception {
return "对String类型进行处理:" + value;
}
}).print().setParallelism(1);
senv.execute("Stream Connect demo");
}
}
source1 的数据类型是 Long,source2 的数据类型是 String,两个流连接之后得到的 ConnectedStream 对象的 map 算子需要传入一个 CoMapFunction 的对象,会根据 ConnectedStream 中的数据类型生成多个处理方法。
执行结果:
(4)split
split 可以按照一定的规则(自行定义)将一个流切分为多个流,在实际工作中非常有用。
比如源数据流中混合了多种类似的数据,多种类型的数据处理规则不一样,所以就可以在根据一定的规则,把一个数据流切分成多个数据流,这样每个数据流就可以使用不用的处理逻辑了。
下面的代码演示了将一个整型数据流,根据奇数偶数切分为两个流。
package demo.datastream;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
public class SplitDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> source = senv.addSource(new MyNoParalleSource());
// 根据奇偶数切分为两个流
SplitStream<Long> splitStream = source.split(new OutputSelector<Long>() {
@Override
public Iterable<String> select(Long value) {
// 定义一个List
ArrayList<String> selector = new ArrayList<>();
if (value % 2 == 0) {
selector.add("even"); // 给当前的数字打上标签
} else {
selector.add("odd");
}
return selector;
}
});
// 选择所有的偶数
DataStream<Long> result = splitStream.select("even");
result.print();
senv.execute("Stream Split demo");
}
}
流切分的时候,将数据打上不同的标签,使用时再选择相应的标签。
(5)partition
基于特定的 key 对一个数据类型是元组的 DataStream 进行分区,使用时需要传入一个 partitioner(里面定义了分区规则)和一个用来做分区的属性(指定元组属性的下标)。
下面的代码演示了将数据流划分成两个分区。
package demo.datastream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class PartitionerDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> source = senv.addSource(new MyNoParalleSource());
// 数据类型转换,把 Long --> Tuple1<Long>
DataStream<Tuple1<Long>> data = source.map(new MapFunction<Long, Tuple1<Long>>() {
@Override
public Tuple1<Long> map(Long value) throws Exception {
return new Tuple1<Long>(value);
}
});
// 分区
DataStream<Tuple1<Long>> partitionStream = data.partitionCustom(new MyPartitioner(), 0);
partitionStream.map(new MapFunction<Tuple1<Long>, Long>() {
@Override
public Long map(Tuple1<Long> value) throws Exception {
Long data = value.f0;
System.out.println("线程ID:" + Thread.currentThread().getId() + "\t数据:" + data);
return data;
}
}).print();
senv.execute("Stream Partitioner demo");
}
}
// 定义分区规则
class MyPartitioner implements Partitioner<Long> {
@Override
public int partition(Long key, int numPartitions) {
if (key % 2 == 0) {
return 0;
} else {
return 1;
}
}
}
4、DataSink
source 是获取数据源的组件,sink 就是将数据处理完保存到其他的地方,比如 mysql、kafka、redis 等。同样的,Flink 也内置了很多 sink
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- FileSystem (sink)
- RabbitMQ (source/sink)
- Google PubSub (source/sink)
- Apache NiFi (source/sink)
- JDBC (sink)
下面的程序演示了,如何将经过 Flink 处理后的数据 sink 到 Redis 中。
首先,需要引入对应的依赖
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
处理程序先送 1234 端口读取数据流处理,再将处理后的数据发送给 redis。
package demo.datastream;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
public class WordCountDemoToRedis {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream = senv.socketTextStream("bigdata111", 1234);
DataStream<WordCount> result = stream.flatMap(new FlatMapFunction<String, WordCount>() {
@Override
public void flatMap(String value, Collector<WordCount> out) throws Exception {
String[] words = value.split(" ");
for (String w : words) {
out.collect(new WordCount(w, 1));
}
}
}).keyBy("word").sum("count");
// 创建Redis Sink
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
.setHost("bigdata111")
.setPort(6379).build();
RedisSink<WordCount> sink = new RedisSink(conf, new RedisMapper<WordCount>() {
@Override
public RedisCommandDescription getCommandDescription() {
// 要执行什么操作
return new RedisCommandDescription(RedisCommand.SET);
}
@Override
public String getKeyFromData(WordCount data) {
return data.getWord();
}
@Override
public String getValueFromData(WordCount data) {
return String.valueOf(data.getCount());
}
});
// 将result保存到redis中
result.addSink(sink);
// 启动流式计算
senv.execute("WordCountDemoToRedis with Stream");
}
public static class WordCount {
private String word;
private Integer count;
public WordCount() {}
public WordCount(String word, Integer count) {
this.word = word;
this.count = count;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
}
}
执行结果
十、高级特性
1、广播变量
广播变量允许在每台机器上保持1个只读的缓存变量,而不是传送变量的副本给 tasks。广播变量创建后,它可以运行在集群中的任何 function 上,而不需要多次传递给集群节点,另外需要记住,不应该修改广播变量,这样才能保证每个节点获取到的值是一致的。
广播变量可以理解为一个公用的共享变量,我们可以把一个 dataset 广播出去,然后不同的 task 在节点上都能获取到,数据在每个节点上只会存在一份。如果不使用 broadcast,则在每个节点的每个 task 中都需要拷贝一份 dataset 数据集,比较浪费内存(也就是一个节点中可能会存在多份 dataset 数据)。
广播出去的变量存在于每个节点的内存中,所以这个数据集不能太大。因为广播出去的数据,会常驻内存,除非程序执行结束。
广播变量常用于计算中,经常需要获取一个数据集辅助运算,这样较少内存开销。
package demo.feature;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
public class BroadCastDemo {
public static void main(String[] args) throws Exception {
// 执行离线计算 DataSet,创建一个运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 创建需要广播的数据
List<Tuple2<String, Integer>> people = new ArrayList<>();
people.add(new Tuple2<>("Tom", 23));
people.add(new Tuple2<>("Mary", 30));
people.add(new Tuple2<>("Mike", 21));
DataSet<Tuple2<String, Integer>> peopleData = env.fromCollection(people);
// 创建广播变量
DataSet<HashMap<String, Integer>> broadcast = peopleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
@Override
public HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
HashMap<String, Integer> result = new HashMap<>();
result.put(value.f0, value.f1);
return result;
}
});
// 执行一个计算
DataSet<String> source = env.fromElements("Tom", "Mary", "Mike");
DataSet<String> result = source.map(new RichMapFunction<String, String>() {
// 定义一个变量保存结果
HashMap<String, Integer> allPeople = new HashMap<>();
@Override
public void open(Configuration parameters) throws Exception {
// 获取广播变量
super.open(parameters);
List<HashMap<String, Integer>> data = this.getRuntimeContext().getBroadcastVariable("peopleData");
for (HashMap<String, Integer> d : data) {
allPeople.putAll(d);
}
}
@Override
public String map(String name) throws Exception {
// 根据姓名从变量中获取年龄
Integer age = allPeople.get(name);
return "姓名:" + name + ", 年龄:" + age;
}
}).withBroadcastSet(broadcast, "peopleData");
result.print();
}
}
上面的代码演示了这么一个场景,要根据姓名找到年龄,首先将姓名年龄对应关系的数据集保存在一个数据集中(或者从外部加载),然后配置为广播变量,这样执行计算的多个 task 运算时,不需要每个 task 都去保存数据。
执行结果:
姓名:Tom, 年龄:23
姓名:Mary, 年龄:30
姓名:Mike, 年龄:21
2、累加器和计数器
Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化。可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。
Counter是一个具体的累加器(Accumulator)实现,具体有:IntCounter, LongCounter 和 DoubleCounter。
package demo.feature;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
// 不使用累加器的情况
public class MyNoneCounter {
public static void main(String[] args) throws Exception {
//执行离线计算DataSet,创建一个运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> data = env.fromElements("Tom","Mary","Mike","Jerry");
//计算一共有多少个人
DataSet<Integer> result = data.map(new RichMapFunction<String, Integer>() {
//定义一个计数器
private int total = 0;
@Override
public Integer map(String value) throws Exception {
total ++ ;
System.out.println(Thread.currentThread().getId() + "\t 当前的和是:" + total);
return total;
}
}).setParallelism(4);
result.print();
}
}
package demo.feature;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
public class AccumulatorDemo {
public static void main(String[] args) throws Exception {
//执行离线计算DataSet,创建一个运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> data = env.fromElements("Tom","Mary","Mike","Jerry");
DataSet<Integer> result = data.map(new RichMapFunction<String, Integer>() {
//定义一个计数器
private IntCounter intCounter = new IntCounter();
@Override
public void open(Configuration parameters) throws Exception {
// TODO Auto-generated method stub
super.open(parameters);
//注册一个计数器
this.getRuntimeContext().addAccumulator("mycounter", intCounter);
}
@Override
public Integer map(String value) throws Exception {
//实现计数的功能
this.intCounter.add(1);
return 0;
}
});
//result.print();
result.writeAsText("d:\\download\\result.txt");
//取出计数器的结果
int total = env.execute("AccumulatorDemo").getAccumulatorResult("mycounter");
System.out.println("最后的结果是:" + total);
}
}
十一、状态管理和恢复
在程序执行的过程中,如果没有包含状态管理。如果一个 task 在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。从容错和消息处理的语义上(at least once, exactly once),Flink引入了state和checkpoint。
1、状态
State 一般指一个具体的 task/operator的状态【state数据默认保存在java的堆内存中】。State可以被记录,在失败的情况下数据还可以恢复。
Flink中有两种基本类型的State:
- Keyed State
- Operator State
Keyed State
顾名思义,就是基于 KeyedStream 上的状态。这个状态是跟特定的 key 绑定的,对 KeyedStream 流上的每一个 key,都对应一个 state。
- ValueState<T>:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值。
- ListState<T>:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable<T>来遍历状态值。
- ReducingState<T>:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值。
- MapState<UK, UV>:即状态值为一个map。用户通过put或putAll方法添加元素。
Operator State
与Key无关的State,与Operator绑定的state,整个operator只对应一个state保存state的数据结构,例如:ListState<T> 。
比如,Flink中的 Kafka Connector,就使用了operator state。它会在每个 connector 实例中,保存该实例中消费topic的所有 (partition, offset) 映射。
示例程序:
package demo.state;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class CountWindowTotal {
public static void main(String[] args) throws Exception {
// 创建一个流计算的运行环境
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
senv.fromElements(Tuple2.of(1, 1),
Tuple2.of(1, 2),
Tuple2.of(1, 3),
Tuple2.of(1, 4),
Tuple2.of(1, 5),
Tuple2.of(1, 6))
.keyBy(0)
.flatMap(new MyRichFlatMapFunction())
.print()
.setParallelism(1);
senv.execute("Count Window Total");
}
}
class MyRichFlatMapFunction extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
private ValueState<Tuple2<Integer, Integer>> state;
@Override
public void open(Configuration parameters) throws Exception {
// 初始化状态
ValueStateDescriptor<Tuple2<Integer, Integer>> descriptor = new ValueStateDescriptor<Tuple2<Integer, Integer>>
("mystate", // 给状态起一个名字
TypeInformation.of(new TypeHint<Tuple2<Integer,Integer>>(){}), // 状态类型
Tuple2.of(0, 0)); // 状态的初始值
state = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(Tuple2<Integer, Integer> value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
//获取当前的状态
Tuple2<Integer, Integer> currentState = state.value();
//执行累加并计数
currentState.f0 ++; //个数加一
currentState.f1 = currentState.f1 + value.f1;
//更新状态
state.update(currentState);
//判断是否达到了三个数字
if(currentState.f0 >= 3) {
//输出原来已经得到的结果 key 结果
out.collect(new Tuple2<Integer, Integer>(currentState.f0, currentState.f1));
//清空状态重新计算
state.clear();
}
}
}
这里在 MapFunction 内部定义了一个私有成员,类型为 ValueState,用来保存程序执行过程中产生的状态数据,由于是没三个数字计数一次,所以每满三个就输出数据并且清空初始化 state 对象。
最终执行结果如下:
(3,6)
(3,15)
2、检查点
Checkpoint【可以理解为checkpoint是把state数据持久化存储了】,则表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有 task/operator 的状态。
为了保证state的容错性,Flink需要对state进行Checkpoint。Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来。当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。
Flink的checkpoint机制可以与(stream和state)的持久化存储交互的前提:持久化的source,它需要支持在一定时间内重放事件。这种sources的典型例子是持久化的消息队列(比如Apache Kafka,RabbitMQ等)或文件系统(比如HDFS,S3,GFS等)用于state的持久化存储,例如分布式文件系统(比如HDFS,S3,GFS等)。
要启动检查点,必须进行相关的配置:
- 默认 checkpoint 功能是 disabled 的,想要使用的时候需要先启用
- checkpoint 开启之后,默认的 checkPointMode 是 Exactly-once
- checkpoint 的 checkPointMode 有两种,Exactly-once和At-least-once
- Exactly-once 对于大多数应用来说是最合适的。At-least-once 可能用在某些延迟超低的应用程序(始终延迟为几毫秒)
在代码中启动检查点只需要加一行代码:
// 创建一个流计算的运行环境
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用检查点
senv.enableCheckpointing(1000); // 每1000ms执行一个检查点
// 其他相关配置
// advanced options:
// 设置模式为exactly-once 默认(this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 确保检查点之间有进行500 ms的进度
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// 检查点必须在一分钟内完成,或者被丢弃
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 同一时间只允许进行一个检查点
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
3、state backend
默认情况 state 会保存在 taskmanager 的内存中,checkpoint 会存储在 JobManager 的内存中。state 的store 和 checkpoint 的位置取决于 State Backend 的配置。一共有三种 State Backend:
- MemoryStateBackend:state数据保存在java堆内存中,执行checkpoint的时候,会把state的快照数据保存到jobmanager的内存中。基于内存的state backend在生产环境下不建议使用。
- FsStateBackend:state数据保存在taskmanager的内存中,执行checkpoint的时候,会把state的快照数据保存到配置的文件系统中。可以使用hdfs等分布式文件系统。
- RocksDBStateBackend:RocksDB是一个为更快速存储而生的,可嵌入的持久型的key-value存储,相当于数据库存储。
这里演示使用 HDFS 作为后端存储的 WordCount 程序,首先引入依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.3</version>
</dependency>
示例程序:
package demo.state;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class SocketWordCountWithCheckpoint {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置检查点
senv.enableCheckpointing(1000);
// 设置后端存储在HDFS上
senv.setStateBackend(new FsStateBackend("hdfs://bigdata111:9000/flink/ckpt"));
// 执行一个简单的流式计算
senv.socketTextStream("bigdata111", 1234).flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
// 数据:I love Guangzhou and love Shantou
String[] words = value.split(" ");
for(String w:words) {
out.collect(new Tuple2<String, Integer>(w,1));
} }
}).keyBy(0).sum(1).print();
senv.execute("SocketWordCountWithCheckpoint");
}
}
处理过程
十二、Flink Table & SQL
Apache Flink 具有两个关系型API:Table API 和SQL,用于统一流和批处理。
需要引入依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
1、Table
Table API 是用于Scala 和 Java 语言的查询API,允许以非常直观的方式组合关系运算符的查询,例如 select,filter 和 join。
批处理
package demo.table;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.util.Collector;
public class WordCountBatchTable {
public static void main(String[] args) throws Exception {
// 初始化环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.getTableEnvironment(env);
// 数据源
DataSet<String> source = env.fromElements("I love Guangzhou",
"I love Guangdong",
"Gunangzhou is the capital of Guangdong");
DataSet<WordCount> data = source.flatMap(new FlatMapFunction<String, WordCount>() {
@Override
public void flatMap(String value, Collector<WordCount> out) throws Exception {
String[] words = value.split(" ");
for (String w : words) {
out.collect(new WordCount(w, 1));
}
}
});
// 创建一张表
Table table = tEnv.fromDataSet(data);
Table result = table.groupBy("word").select("word, count.sum as count");
// 转换回 DataSet
tEnv.toDataSet(result, WordCount.class).print();
}
public static class WordCount {
public String word;
public Integer count;
public WordCount() {}
public WordCount(String word, Integer count) {
this.word = word;
this.count = count;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
@Override
public String toString() {
return "WordCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
执行结果:
WordCount{word='love', count=2}
WordCount{word='the', count=1}
WordCount{word='Guangdong', count=2}
WordCount{word='Guangzhou', count=1}
WordCount{word='Gunangzhou', count=1}
WordCount{word='of', count=1}
WordCount{word='I', count=2}
WordCount{word='capital', count=1}
WordCount{word='is', count=1}
流处理
package demo.table;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
public class WordCountStreamTable {
public static void main(String[] args) throws Exception {
// 初始化环境
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(senv);
// 数据源
DataStreamSource<String> stream = senv.socketTextStream("bigdata111", 1234);
DataStream<WordCount> data = stream.flatMap(new FlatMapFunction<String, WordCount>() {
@Override
public void flatMap(String value, Collector<WordCount> out) throws Exception {
String[] words = value.split(" ");
for (String w : words) {
out.collect(new WordCount(w, 1));
}
}
});
// 创建一张表
Table table = tEnv.fromDataStream(data);
Table result = table.groupBy("word").select("word, count.sum as count");
// 转换会 DataStream
tEnv.toRetractStream(result, WordCount.class).print();
senv.execute();
}
public static class WordCount {
public String word;
public Integer count;
public WordCount() {}
public WordCount(String word, Integer count) {
this.word = word;
this.count = count;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
@Override
public String toString() {
return "WordCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
执行结果
2、SQL
Flink SQL 的支持是基于实现了SQL标准的 Apache Calcite。无论输入是批输入(DataSet)还是流输入(DataStream),任一接口中指定的查询都具有相同的语义并指定相同的结果。
批处理
package demo.sql;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.util.Collector;
public class WordCountBatchSQL {
public static void main(String[] args) throws Exception {
// 初始化环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.getTableEnvironment(env);
// 数据源
DataSet<String> source = env.fromElements("I love Guangzhou",
"I love Guangdong",
"Gunangzhou is the capital of Guangdong");
DataSet<WordCount> data = source.flatMap(new FlatMapFunction<String, WordCount>() {
@Override
public void flatMap(String value, Collector<WordCount> out) throws Exception {
String[] words = value.split(" ");
for (String w : words) {
out.collect(new WordCount(w, 1));
}
}
});
// 注册一张表 表名 数据集 列名
tEnv.registerDataSet("WordCount", data, "word,frequency");
Table table = tEnv.sqlQuery("select word,sum(frequency) as frequency from WordCount group by word");
// 输出
tEnv.toDataSet(table, WordCount.class).print();
}
public static class WordCount {
public String word;
public Integer frequency;
public WordCount() {}
public WordCount(String word, Integer frequency) {
this.word = word;
this.frequency = frequency;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public Integer getFrequency() {
return frequency;
}
public void setFrequency(Integer frequency) {
this.frequency = frequency;
}
@Override
public String toString() {
return "WordCount{" +
"word='" + word + '\'' +
", frequency=" + frequency +
'}';
}
}
}
执行结果
WordCount{word='love', frequency=2}
WordCount{word='the', frequency=1}
WordCount{word='Guangdong', frequency=2}
WordCount{word='Guangzhou', frequency=1}
WordCount{word='Gunangzhou', frequency=1}
WordCount{word='of', frequency=1}
WordCount{word='I', frequency=2}
WordCount{word='capital', frequency=1}
WordCount{word='is', frequency=1}
流处理
package demo.sql;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
public class WordCountStreamSQL {
public static void main(String[] args) throws Exception {
// 初始化环境
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(senv);
// 数据源
DataStreamSource<String> stream = senv.socketTextStream("bigdata111", 1234);
DataStream<WordCount> data = stream.flatMap(new FlatMapFunction<String, WordCount>() {
@Override
public void flatMap(String value, Collector<WordCount> out) throws Exception {
String[] words = value.split(" ");
for (String w : words) {
out.collect(new WordCount(w, 1));
}
}
});
//注册表
Table table = tEnv.fromDataStream(data,"word,frequency");
Table tableResult = tEnv.sqlQuery("select word,sum(frequency) as frequency from " + table + " group by word");
//输出
tEnv.toRetractStream(tableResult, WordCount.class).print();
senv.execute();
}
public static class WordCount {
public String word;
public Integer frequency;
public WordCount() {}
public WordCount(String word, Integer frequency) {
this.word = word;
this.frequency = frequency;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public Integer getFrequency() {
return frequency;
}
public void setFrequency(Integer frequency) {
this.frequency = frequency;
}
@Override
public String toString() {
return "WordCount{" +
"word='" + word + '\'' +
", frequency=" + frequency +
'}';
}
}
}
执行结果
3、SQL Client
使用 SQL Client 可以很方便的书写、调试和提交一个任务到Flink集群运行,使用Flink SQL Client不需要书写一行 Java 或者 Scala 代码。
由于 Flink 版本使用的是 flink-1.7.2-bin-hadoop27-scala_2.11.tgz,运行SQL Client需要将Flink运行在Yarn之上。
bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 -d
启动 SQL Client
bin/sql-client.sh embedded
执行以下 SQL
Flink SQL> SELECt name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS Nametable(name) GROUP BY name;
结果如下