需求:读取本地数据文件,统计文件中每个单词出现的次数。
(资料图片仅供参考)
本案例编写Flink代码选择语言为Java和Scala,所以这里我们通过IntelliJ IDEA创建一个目录,其中包括Java项目模块和Scala项目模块,将Flink Java api和Flink Scala api分别在不同项目模块中实现。步骤如下:
使用IntelliJ IDEA开发Flink,如果使用Scala api 那么还需在IntelliJ IDEA中安装Scala的插件,如果已经安装可以忽略此步骤,下图为以安装Scala插件。
创建Java模块:
继续点击"+",创建Scala模块:
创建好"FlinkScalaCode"模块后,右键该模块添加Scala框架支持,并修改该模块中的"java"src源为"scala":
在"FlinkScalaCode"模块Maven pom.xml中引入Scala依赖包,这里使用的Scala版本为2.12.10。
org.scala-lang scala-library 2.12.10 org.scala-lang scala-compiler 2.12.10 org.scala-lang scala-reflect 2.12.10
为了方便查看项目运行过程中的日志,需要在两个项目模块中配置log4j.properties配置文件,并放在各自项目src/main/resources资源目录下,没有resources资源目录需要手动创建并设置成资源目录。log4j.properties配置文件内容如下:
log4j.rootLogger=ERROR, consolelog4j.appender.console=org.apache.log4j.ConsoleAppenderlog4j.appender.console.target=System.outlog4j.appender.console.layout=org.apache.log4j.PatternLayoutlog4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss} %p %c{2}: %m%n
复制
并在两个项目中的Maven pom.xml中添加对应的log4j需要的依赖包,使代码运行时能正常打印结果:
org.slf4j slf4j-log4j12 1.7.36 org.apache.logging.log4j log4j-to-slf4j 2.17.2
"FlinkJavaCode"模块导入Flink Maven依赖如下:
UTF-8 1.8 1.8 1.16.0 1.7.36 2.17.2 org.apache.flink flink-clients ${flink.version} org.slf4j slf4j-log4j12 ${slf4j.version} org.apache.logging.log4j log4j-to-slf4j ${log4j.version}
"FlinkScalaCode"模块导入Flink Maven依赖如下:
UTF-8 1.8 1.8 1.16.0 1.7.31 2.17.1 2.12.10 2.12 org.apache.flink flink-scala_${scala.binary.version} ${flink.version} org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} org.apache.flink flink-clients ${flink.version} org.scala-lang scala-library ${scala.version} org.scala-lang scala-compiler ${scala.version} org.scala-lang scala-reflect ${scala.version} org.slf4j slf4j-log4j12 ${slf4j.version} org.apache.logging.log4j log4j-to-slf4j ${log4j.version}
注意:在后续实现WordCount需求时,Flink Java Api只需要在Maven中导入"flink-clients"依赖包即可,而Flink Scala Api 需要导入以下三个依赖包:
flink-scala_${scala.binary.version}flink-streaming-scala_${scala.binary.version}flink-clients
主要是因为在Flink1.15版本后,Flink添加对opting-out(排除)Scala的支持,如果你只使用Flink的Java api,导入包不必包含scala后缀,如果使用Flink的Scala api,需要选择匹配的Scala版本。
在项目"MyFlinkCode"中创建"data"目录,在目录中创建"words.txt"文件,向文件中写入以下内容,方便后续使用Flink编写WordCount实现代码。
hello Flinkhello MapReducehello Sparkhello Flinkhello Flinkhello Flinkhello Flinkhello Javahello Scalahello Flinkhello Javahello Flinkhello Scalahello Flinkhello Flinkhello Flink
数据源分为有界和无界之分,有界数据源可以编写批处理程序,无界数据源可以编写流式程序。DataSet API用于批处理,DataStream API用于流式处理。
批处理使用ExecutionEnvironment和DataSet,流式处理使用StreamingExecutionEnvironment和DataStream。DataSet和DataStream是Flink中表示数据的特殊类,DataSet处理的数据是有界的,DataStream处理的数据是无界的,这两个类都是不可变的,一旦创建出来就无法添加或者删除数据元。
使用Flink Java Dataset api实现WordCount具体代码如下:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//1.读取文件DataSource linesDS = env.readTextFile("./data/words.txt");//2.切分单词FlatMapOperator wordsDS = linesDS.flatMap((String lines, Collector collector) -> { String[] arr = lines.split(" "); for (String word : arr) { collector.collect(word); }}).returns(Types.STRING);//3.将单词转换成Tuple2 KV 类型MapOperator> kvWordsDS = wordsDS.map(word -> new Tuple2<>(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));//4.按照key 进行分组处理得到最后结果并打印kvWordsDS.groupBy(0).sum(1).print();
Scala版本WordCount
使用Flink Scala Dataset api实现WordCount具体代码如下:
//1.准备环境,注意是Scala中对应的Flink环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//2.导入隐式转换,使用Scala API 时需要隐式转换来推断函数操作后的类型import org.apache.flink.api.scala._//3.读取数据文件val linesDS: DataSet[String] = env.readTextFile("./data/words.txt")//4.进行 WordCount 统计并打印linesDS.flatMap(line => { line.split(" ")}) .map((_, 1)) .groupBy(0) .sum(1) .print()
以上无论是Java api 或者是Scala api 输出结果如下,显示的最终结果是统计好的单词个数。
(hello,15)(Spark,1)(Scala,2)(Java,2)(MapReduce,1)(Flink,10)
使用Flink Java DataStream api实现WordCount具体代码如下:
//1.创建流式处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.读取文件数据DataStreamSource lines = env.readTextFile("./data/words.txt");//3.切分单词,设置KV格式数据SingleOutputStreamOperator> kvWordsDS = lines.flatMap((String line, Collector> collector) -> { String[] words = line.split(" "); for (String word : words) { collector.collect(Tuple2.of(word, 1L)); }}).returns(Types.TUPLE(Types.STRING, Types.LONG));//4.分组统计获取 WordCount 结果kvWordsDS.keyBy(tp->tp.f0).sum(1).print();//5.流式计算中需要最后执行execute方法env.execute();
Scala版本WordCount使用Flink Scala DataStream api实现WordCount具体代码如下:
//1.创建环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//2.导入隐式转换,使用Scala API 时需要隐式转换来推断函数操作后的类型import org.apache.flink.streaming.api.scala._//3.读取文件val ds: DataStream[String] = env.readTextFile("./data/words.txt")//4.进行wordCount统计ds.flatMap(line=>{line.split(" ")}) .map((_,1)) .keyBy(_._1) .sum(1) .print()//5.最后使用execute 方法触发执行env.execute()
以上输出结果开头展示的是处理当前数据的线程,一个Flink应用程序执行时默认的线程数与当前节点cpu的总线程数有关。
下面使用Java代码使用DataStream API 的Batch 模式来处理批WordCount代码,方式如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置批运行模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);DataStreamSource linesDS = env.readTextFile("./data/words.txt");SingleOutputStreamOperator> wordsDS = linesDS.flatMap(new FlatMapFunction>() { @Override public void flatMap(String lines, Collector> out) throws Exception { String[] words = lines.split(" "); for (String word : words) { out.collect(new Tuple2<>(word, 1L)); } }});wordsDS.keyBy(tp -> tp.f0).sum(1).print();env.execute();
以上代码运行完成之后结果如下,可以看到结果与批处理结果类似,只是多了对应的处理线程号。
3> (hello,15)8> (Flink,10)8> (Spark,1)7> (Java,2)7> (Scala,2)7> (MapReduce,1)
此外,Stream API 中除了可以设置Batch批处理模式之外,还可以设置 AUTOMATIC、STREAMING模式,STREAMING 模式是流模式,AUTOMATIC模式会根据数据是有界流/无界流自动决定采用BATCH/STREAMING模式来读取数据,设置方式如下:
//BATCH 设置批处理模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);//AUTOMATIC 会根据有界流/无界流自动决定采用BATCH/STREAMING模式env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//STREAMING 设置流处理模式env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
除了在代码中设置处理模式外,还可以在Flink配置文件(flink-conf.yaml)中设置execution.runtime-mode参数来指定对应的模式,也可以在集群中提交Flink任务时指定execution.runtime-mode来指定,Flink官方建议在提交Flink任务时指定执行模式,这样减少了代码配置给Flink Application提供了更大的灵活性,提交任务指定参数如下:
$FLINK_HOME/bin/flink run -Dexecution.runtime-mode=BATCH -c xxx xxx.jar