Apache Flink建立模板专案有2种方式:
1. 通过Maven archetype命令建立;
2. 通过Flink 提供的Quickstart shell指令码建立;
关于Apache Flink的环境搭建,请参考相关连结:
Apache Flink快速入门-基本架构、核心概念和执行流程
Apache Flink v1.8 本地单机环境安装和执行Flink应用
1. 通过Maven archetype建立Flink专案
#使用Maven建立mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.8.0
-DgroupId=com.rickie
-DartifactId=flink-tutorial
-Dversion=0.1
-Dpackage=com.rickie.tutorial
-DinteractiveMode=false
引数说明:
原型archetype有关引数表

专案相关引数:

通过上述mvn 命令建立的Java模板专案结构。

从上述专案结构可以看出,该专案是一个比较完善的Maven专案,其中Java程式码部分,BatchJob.java和StreamingJob.java 分别对应Flink 批量界面DataSet的例项程式码和流式界面DataStream的例项程式码。
2. 编写业务程式码
将上述专案汇入到IDEA中,Flink应用程序模板如下图所示。
开启StreamingJob.java档案,实现简单的单词统计(Word Count)业务功能。
具体程式码如下所示。
package com.rickie.tutorial;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* Skeleton for a Flink Streaming Job.
*
*
For a tutorial how to write a Flink streaming application, check the
* tutorials and examples on the Flink Website.
*
*
To package your application into a JAR file for execution, run
* \'mvn clean package\' on the command line.
*
*
If you change the name of the main class (with the public static void main(String[] args))
* method, change the respective entry in the POM.xml file (simply search for \'mainClass\').
*/
public class StreamingJob {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
// 设定streaming执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 连线socket获取输入的资料
DataStream text = env.socketTextStream("127.0.0.1", 9000);
// split("\\W+") 使用非数字字母切分字串
DataStream> dataStream = text.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String s, Collector> collector) throws Exception {
String[] tokens = s.toLowerCase().split("\\W+");
for(String token : tokens) {
if(token.length() > 0) {
collector.collect(new Tuple2(token, 1));
}
}
}
}).keyBy(0).timeWindow(Time.seconds(5)).sum(1);
dataStream.print();
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
}
3. IDEA中执行应用、测试执行效果
首先,使用nc命令启动一个本地监听9000埠,和上一步程式码中的埠号保持一致。nc -l -p 9000

然后,在IDEA中执行StreamingJob,和本地server socket 9000埠建立连线,如下图所示。

接着,在nc命令视窗,输入一些单词,如下所示。
StreamingJob应用根据实现的业务逻辑,进行单词聚合,并输出。单词在5秒的时间视窗(翻滚时间视窗)中计算并打印到stdout。
说明:timeWindow(Time.seconds(5)),只有一个引数,表示是翻滚时间视窗(Tumbling window),即不重叠的时间视窗,只统计本视窗内的资料。

因为没启动Flink服务,所以去localhost:8081的web UI中进行监控。程式码 StreamExecutionEnvironment.getExecutionEnvironment()会建立一个LocalEnvironment,然后在Java虚拟机器上执行。
在Linux/Flink单机模式下执行
Linux 单机模式下启动Flink相当简单,直接执行bin/start-cluster.sh,会启动Flink的JobManager和TaskManager两个程序。如果想将上述程式提交到Flink,需要执行maven命令打成jar包,然后在命令列中,进入到bin目录下执行 flink run xxx/xxx/xxx.jar 即可,输出结果会在TaskManager的服务视窗中输出。
4. 使用IDEA开发批计算应用
Flink支援DataSet API 用于处理批量资料,资料集通过source进行初始化,例如读取档案或者序列化集合,然后通过transformation(filtering、mapping、joining、grouping)完成资料集转换操作,然后通过sink进行储存,既可以写入HDFS这种分散式档案系统,也可以打印控制台。在IDEA中,开启BatchJob.java档案,编写如下程式码,实现批量计算逻辑。
package com.rickie.tutorial;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* Skeleton for a Flink Batch Job.
*
*
For a tutorial how to write a Flink batch application, check the
* tutorials and examples on the Flink Website.
*
*
To package your application into a JAR file for execution,
* change the main class in the POM.xml file to this class (simply search for \'mainClass\')
* and run \'mvn clean package\' on the command line.
*/
public class BatchJob {
public static void main(String[] args) throws Exception {
// set up the batch execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 通过字串构建资料集
DataSet text = env.fromElements(
"I am Rickie ",
"Hello Rickie",
"Good morning ... Rickie"
);
// 分割字串,按照key进行分组,统计相同的key个数
DataSet> wordCount = text
.flatMap(new LineSplitter())
.groupBy(0)
.sum(1);
wordCount.print();
// execute program
// env.execute("Flink Batch Java API Skeleton");
}
// 分割字串的方法
public static class LineSplitter implements FlatMapFunction> {
@Override
public void flatMap(String line, Collector> out) {
for (String word : line.split(" ")) {
out.collect(new Tuple2(word, 1));
}
}
}
}
在IDEA中执行,检视输出的单词聚合结果。






























