Flink入门案例

需求:读取本地数据文件,统计文件中每个单词出现的次数。


(资料图片仅供参考)

一、IDEA Project创建及配置

本案例编写Flink代码选择语言为Java和Scala,所以这里我们通过IntelliJ IDEA创建一个目录,其中包括Java项目模块和Scala项目模块,将Flink Java api和Flink Scala api分别在不同项目模块中实现。步骤如下:

1、打开IDEA,创建空项目

2、在IntelliJ IDEA 中安装Scala插件

使用IntelliJ IDEA开发Flink,如果使用Scala api 那么还需在IntelliJ IDEA中安装Scala的插件,如果已经安装可以忽略此步骤,下图为以安装Scala插件。

3、打开Structure,创建项目新模块

创建Java模块:

继续点击"+",创建Scala模块:

创建好"FlinkScalaCode"模块后,右键该模块添加Scala框架支持,并修改该模块中的"java"src源为"scala":

在"FlinkScalaCode"模块Maven pom.xml中引入Scala依赖包,这里使用的Scala版本为2.12.10。

  org.scala-lang  scala-library  2.12.10  org.scala-lang  scala-compiler  2.12.10  org.scala-lang  scala-reflect  2.12.10

4、Log4j日志配置

为了方便查看项目运行过程中的日志,需要在两个项目模块中配置log4j.properties配置文件,并放在各自项目src/main/resources资源目录下,没有resources资源目录需要手动创建并设置成资源目录。log4j.properties配置文件内容如下:

log4j.rootLogger=ERROR, consolelog4j.appender.console=org.apache.log4j.ConsoleAppenderlog4j.appender.console.target=System.outlog4j.appender.console.layout=org.apache.log4j.PatternLayoutlog4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss} %p %c{2}: %m%n

复制

并在两个项目中的Maven pom.xml中添加对应的log4j需要的依赖包,使代码运行时能正常打印结果:

  org.slf4j  slf4j-log4j12  1.7.36  org.apache.logging.log4j  log4j-to-slf4j  2.17.2

5、分别在两个项目模块中导入Flink Maven依赖

"FlinkJavaCode"模块导入Flink Maven依赖如下:

  UTF-8  1.8  1.8  1.16.0  1.7.36  2.17.2        org.apache.flink    flink-clients    ${flink.version}          org.slf4j    slf4j-log4j12    ${slf4j.version}        org.apache.logging.log4j    log4j-to-slf4j    ${log4j.version}  

"FlinkScalaCode"模块导入Flink Maven依赖如下:

  UTF-8  1.8  1.8  1.16.0  1.7.31  2.17.1  2.12.10  2.12        org.apache.flink    flink-scala_${scala.binary.version}    ${flink.version}        org.apache.flink    flink-streaming-scala_${scala.binary.version}    ${flink.version}        org.apache.flink    flink-clients    ${flink.version}          org.scala-lang    scala-library    ${scala.version}        org.scala-lang    scala-compiler    ${scala.version}        org.scala-lang    scala-reflect    ${scala.version}          org.slf4j    slf4j-log4j12    ${slf4j.version}        org.apache.logging.log4j    log4j-to-slf4j    ${log4j.version}  

注意:在后续实现WordCount需求时,Flink Java Api只需要在Maven中导入"flink-clients"依赖包即可,而Flink Scala Api 需要导入以下三个依赖包:

flink-scala_${scala.binary.version}flink-streaming-scala_${scala.binary.version}flink-clients

主要是因为在Flink1.15版本后,Flink添加对opting-out(排除)Scala的支持,如果你只使用Flink的Java api,导入包不必包含scala后缀,如果使用Flink的Scala api,需要选择匹配的Scala版本。

二、案例数据准备

在项目"MyFlinkCode"中创建"data"目录,在目录中创建"words.txt"文件,向文件中写入以下内容,方便后续使用Flink编写WordCount实现代码。

hello Flinkhello MapReducehello Sparkhello Flinkhello Flinkhello Flinkhello Flinkhello Javahello Scalahello Flinkhello Javahello Flinkhello Scalahello Flinkhello Flinkhello Flink

三、案例实现

数据源分为有界和无界之分,有界数据源可以编写批处理程序,无界数据源可以编写流式程序。DataSet API用于批处理,DataStream API用于流式处理。

批处理使用ExecutionEnvironment和DataSet,流式处理使用StreamingExecutionEnvironment和DataStream。DataSet和DataStream是Flink中表示数据的特殊类,DataSet处理的数据是有界的,DataStream处理的数据是无界的,这两个类都是不可变的,一旦创建出来就无法添加或者删除数据元。

1、Flink 批数据处理案例

Java版本WordCount

使用Flink Java Dataset api实现WordCount具体代码如下:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//1.读取文件DataSource linesDS = env.readTextFile("./data/words.txt");//2.切分单词FlatMapOperator wordsDS =        linesDS.flatMap((String lines, Collector collector) -> {    String[] arr = lines.split(" ");    for (String word : arr) {        collector.collect(word);    }}).returns(Types.STRING);//3.将单词转换成Tuple2 KV 类型MapOperator> kvWordsDS =        wordsDS.map(word -> new Tuple2<>(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));//4.按照key 进行分组处理得到最后结果并打印kvWordsDS.groupBy(0).sum(1).print();

Scala版本WordCount

使用Flink Scala Dataset api实现WordCount具体代码如下:

//1.准备环境,注意是Scala中对应的Flink环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//2.导入隐式转换,使用Scala API 时需要隐式转换来推断函数操作后的类型import org.apache.flink.api.scala._//3.读取数据文件val linesDS: DataSet[String] = env.readTextFile("./data/words.txt")//4.进行 WordCount 统计并打印linesDS.flatMap(line => {  line.split(" ")})  .map((_, 1))  .groupBy(0)  .sum(1)  .print()

以上无论是Java api 或者是Scala api 输出结果如下,显示的最终结果是统计好的单词个数。

(hello,15)(Spark,1)(Scala,2)(Java,2)(MapReduce,1)(Flink,10)

2、Flink流式数据处理案例

Java版本WordCount

使用Flink Java DataStream api实现WordCount具体代码如下:

//1.创建流式处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.读取文件数据DataStreamSource lines = env.readTextFile("./data/words.txt");//3.切分单词,设置KV格式数据SingleOutputStreamOperator> kvWordsDS =        lines.flatMap((String line, Collector> collector) -> {    String[] words = line.split(" ");    for (String word : words) {        collector.collect(Tuple2.of(word, 1L));    }}).returns(Types.TUPLE(Types.STRING, Types.LONG));//4.分组统计获取 WordCount 结果kvWordsDS.keyBy(tp->tp.f0).sum(1).print();//5.流式计算中需要最后执行execute方法env.execute();
Scala版本WordCount

使用Flink Scala DataStream api实现WordCount具体代码如下:

//1.创建环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//2.导入隐式转换,使用Scala API 时需要隐式转换来推断函数操作后的类型import org.apache.flink.streaming.api.scala._//3.读取文件val ds: DataStream[String] = env.readTextFile("./data/words.txt")//4.进行wordCount统计ds.flatMap(line=>{line.split(" ")})  .map((_,1))  .keyBy(_._1)  .sum(1)  .print()//5.最后使用execute 方法触发执行env.execute()

以上输出结果开头展示的是处理当前数据的线程,一个Flink应用程序执行时默认的线程数与当前节点cpu的总线程数有关。

3、DataStream BATCH模式

下面使用Java代码使用DataStream API 的Batch 模式来处理批WordCount代码,方式如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置批运行模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);DataStreamSource linesDS = env.readTextFile("./data/words.txt");SingleOutputStreamOperator> wordsDS = linesDS.flatMap(new FlatMapFunction>() {    @Override    public void flatMap(String lines, Collector> out) throws Exception {        String[] words = lines.split(" ");        for (String word : words) {            out.collect(new Tuple2<>(word, 1L));        }    }});wordsDS.keyBy(tp -> tp.f0).sum(1).print();env.execute();

以上代码运行完成之后结果如下,可以看到结果与批处理结果类似,只是多了对应的处理线程号。

3> (hello,15)8> (Flink,10)8> (Spark,1)7> (Java,2)7> (Scala,2)7> (MapReduce,1)

此外,Stream API 中除了可以设置Batch批处理模式之外,还可以设置 AUTOMATIC、STREAMING模式,STREAMING 模式是流模式,AUTOMATIC模式会根据数据是有界流/无界流自动决定采用BATCH/STREAMING模式来读取数据,设置方式如下:

//BATCH 设置批处理模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);//AUTOMATIC 会根据有界流/无界流自动决定采用BATCH/STREAMING模式env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//STREAMING 设置流处理模式env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

除了在代码中设置处理模式外,还可以在Flink配置文件(flink-conf.yaml)中设置execution.runtime-mode参数来指定对应的模式,也可以在集群中提交Flink任务时指定execution.runtime-mode来指定,Flink官方建议在提交Flink任务时指定执行模式,这样减少了代码配置给Flink Application提供了更大的灵活性,提交任务指定参数如下:

$FLINK_HOME/bin/flink run -Dexecution.runtime-mode=BATCH -c xxx xxx.jar

推荐内容