MENU

Flink - 自己总结了一些学习笔记

• May 6, 2022 • 教程

本篇文章较长,建议通过目录快速定位你感兴趣的内容!

DataSet/Stream API

1.1 Environment

1.1.1 getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment 会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

// 初始化环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

如果没有设置并行度,会以 flink-conf.yaml 中的配置为准,默认是 1

1.2 Source

1.2.1 基于本地集合的source

在一个本地内存中,生成一个集合作为Flink处理的source。
离线处理代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object ListSource {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive hbase"))
listDataSet.print()
}
}

实时处理代码如下:

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object ListSourceStream {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val listDataStream: DataStream[String] = env.fromCollection(List("hadoop spark","hive hbase"))
listDataStream.print()
env.execute("ListSourceStream is runned")
}
}

1.2.2 基于本地文件的source

导入本地文本数据作为数据源。
离线处理代码如下:

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object FileSource {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val fileDataSet = env.readTextFile("C:\\Users\\thinkpad\\Desktop\\words.txt")
fileDataSet.print()
}
}

实时处理代码如下:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FileSourceStream {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val fileDataStream: DataStream[String] = env.readTextFile("C:\\Users\\thinkpad\\Desktop\\words.txt")
fileDataStream.print()
env.execute("FileSourceStream is runned")
}
}

1.2.3 基于HDFS的source

读取hdfs文件,作为数据源。
离线处理代码如下:

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object hdfsSource {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val hdfsDataSet: DataSet[String] = env.readTextFile("hdfs://linux01:9000/a.txt")
hdfsDataSet.print()
}
}

实时处理代码如下:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object hdfsSourceStream {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val hdfsDataStream: DataStream[String] = env.readTextFile("hdfs://linux01:9000/a.txt")
hdfsDataStream.print()
env.execute("hdfsSourceStream is runned")
}
}

1.2.4 基于 kafka 消息队列的source

处理代码如下:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.api.scala._

object kafkaSourceStream {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")
props.setProperty("group.id", "consumer-group")
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("auto.offset.reset", "latest")
//SimpleStringSchema反序列化工具
val kafkaDataStream: DataStream[String] = 
env.addSource(new FlinkKafkaConsumer010[String]("test",new SimpleStringSchema(),props))
kafkaDataStream.print()
env.execute(“kafkaSourceStream is runned”)
}
}

1.2.5 自定义 Source作为数据源

除了以上的source数据来源,我们还可以自定义source,只是继承SourceFunction即可。
自定义source代码如下:

import org.apache.flink.streaming.api.functions.source.SourceFunction

class MySource extends SourceFunction[String] {
//定义标志位用来标记是否正常运行
var running = true

override def cancel(): Unit = {
running = false
}

override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
val data: Range.Inclusive = 1.to(10)
while (running) {
data.foreach(t => {
  sourceContext.collect(t.toString)
})
}
}
}

调用自定义source代码如下:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object DefineSource {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val defineSource: DataStream[String] = env.addSource(new MySource())
defineSource.print()
env.execute("DefineSource is runned")
}
}

1.3 Sink

sink 也就是Flink运行完后,最终要将数据输出到哪儿。

1.3.1基于本地内存集合的sink

将数据最终输出到内存中的集合中。
示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object listSink {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val listDataSet: DataSet[String] = env.fromCollection(List("hadoop","spark","hive"))
val list: Seq[String] = listDataSet.collect()
list.foreach(println(_))
}
}

1.3.2基于本地文件的sink
将结果输出到本地文件系统中。
示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object fileSink {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val fileDataSet: DataSet[String] = env.fromCollection(List("hadoop","spark"))
fileDataSet.writeAsText("C:\\Users\\thinkpad\\Desktop\\print.txt")
env.execute("fileSink is runned")
}
}

1.3.3基于HDFS文件系统的sink

将结果输出到hdfs文件系统中。
示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object hdfsSink {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val hdfsDataSet: DataSet[String] = env.fromCollection(List("hadoop","spark"))
hdfsDataSet.writeAsText("hdfs://linux01:9000/hdfsSink")
env.execute("hdfsSink is runned")
}
}

1.3.4基于Kafka消息队列的sink

将结果输出到kafka文件系统中,用flink作为kafka的生产者。
示例代码如下:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010

object kafkaSink {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
props.setProperty("group.id", "consumer-group")
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("auto.offset.reset", "latest")
val listDataStream: DataStream[String] = env.fromCollection(List("hadoop","spark"))
listDataStream.addSink(new FlinkKafkaProducer010[String]("linux01:9092,linux02:9092,linux03:9092","test",new SimpleStringSchema()))
env.execute("kafkaSink is runned")
}
}

1.3.5基于JDBC自定义sink

将计算结果存储到关系数据库中,如mysql等。
导入依赖:

 <dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.47</version>
</dependency>

实现MyJdbcSink类,继承RichSinkFunction,用来是实现保存到mysql中调用的命令。

import java.sql
import java.sql.DriverManager
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
//为什么继承的是富函数
class MyJdbcSink extends RichSinkFunction[String] {
//定义连接参数成员属性
var conn: Connection = _
var prepare: PreparedStatement = _

//打开连接
override def open(parameters: Configuration): Unit = {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata",
"root", "root")
prepare= conn.prepareStatement("INSERT INTO infoTest VALUES (?, ?)")
}

//执行sql语句
override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
prepare.setString(1,value)
prepare.setString(2,value)
prepare.execute()
}

//关闭资源
override def close(): Unit = {
prepare.close()
conn.close()
}
}

将结果写入mysql,调用自定义mysql类,代码如下:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object mysqlSInk {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val listDataSream: DataStream[String] = env.fromCollection(List("hadoop","spark"))
listDataSream.addSink(new MyJdbcSink())
env.execute("mysqlSInk is runned")
}
}

1.3.5基于Redis非关系型数据库的sink

将计算结果存储到redis非关系数据库中。
导入flink-redis依赖:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.0</version>
</dependency>

定义一个redis的mapper类,继承RedisMapper类,用于定义保存到 redis时调用的命令,代码如下:

import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

class MyRedisMapper extends RedisMapper[String]{
//定义保存到redis中的命令
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET,"redis")
}

override def getKeyFromData(t: String): String = {
t.hashCode.toString
}

override def getValueFromData(t: String): String = {
t
}
}

将结果输入到redis代码如下:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig

object RedisSink {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val listDataStream: DataStream[String] = env.fromCollection(List("hadoop","spark"))
val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).setDatabase(0).build()
listDataStream.addSink(new RedisSink[String](conf,new MyRedisMapper))
env.execute("RedisSink is runned")
}
}

1.4 Transform

在flink中有类似于spark的一类转换算子,就是transform,在Flink的编程体系中,我们获取到数据源之后,需要经过一系列的处理即transformation操作,再将最终结果输出到目的Sink使数据落地。

常用的transform转换算子如下:

Transformation说明
map将DataSet中的每一个元素转换为另外一个元素
flatMap将DataSet中的每一个元素转换为0...n个元素
mapPartition将一个分区中的元素转换为另一个元素
filter过滤出来一些符合条件的元素
reduce可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素
reduceGroup将一个dataset或者一个group聚合成一个或多个元素
aggregate按照内置的方式来进行聚合。例如:SUM/MIN/MAX..
distinct去重
join将两个DataSet按照一定条件连接到一起,形成新的DataSet
union将两个DataSet取并集,并自动进行去重
KeyBy逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的
Split根据某些特征把一个 DataStream 拆分成两个或者多个
Select从一个 SplitStream 中获取一个或者多个 DataStream
Connect连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
CoMap,CoFlatMap跟map and flatMap类似,只不过作用在ConnectedStreams上
rebalance让每个分区的数据均匀分布,避免数据倾斜
partitionByHash按照指定的key进行hash分区
sortPartition指定字段对分区中的数据进行排序

1.4.1 map

将DataSet中的每一个元素转换为另外一种形式的元素
示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[Int] = env.fromCollection(List(1,2,3))
    val result: DataSet[Int] = listDataSet.map(_*2)
    result.print()
  }
}

1.4.2 flatMap

flatMap也是一种类似于遍历循环,是将每一个元素按照特定的标识切分,变成多个元素。
如将集合中每个元素按照空格切分。

示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive mysql","hbase kafka"))
    val result: DataSet[String] = listDataSet.flatMap(_.split(" "))
    result.print()
  }
}

1.4.3 mapPartition

mapPartition:中的函数是在每个分区运行一次

map :每个元素运行一次

mapPartition是按照分区进行处理数据,传入是一个迭代,是将分区中的元素进行转换,map 和 mapPartition 的效果是一样的,但如果在map的函数中,需要访问一些外部存储。例如:
访问mysql数据库,需要打开连接,此时map效率较低。而使用 mapPartition 可以有效减少连接数,提高效率。

示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive mysql","hbase kafka"))
    val result: DataSet[String] = listDataSet.mapPartition(iter => {
      iter.flatMap(_.split(" "))
    })
    result.print()
  }
}

1.4.4 Filter

filter是遍历循环dataset中每一个元素,filter中满足表达式的过滤出来,不满足表达式的过滤掉。

示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive","hbase kafka"))
    val result: DataSet[String] = listDataSet.filter(_.length>=5)
    result.print()
  }
}

1.4.5 reduce

reduce是对一个 dataset 或者一个 group 来进行聚合计算,按照表达逻辑最终聚合成一个元素。

示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[Int] = env.fromCollection(List(1,2,3,4))
    val result = listDataSet.reduce(_+_)
    result.print()
  }
}

Window操作

2.1 Window概述

streaming 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window 是一种切割无限数据为有限块进行处理的手段。
Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有

2.2 Window类型

Window 可以分成两类:CountWindow:按照指定的数据条数生成一个 Window,与时间无关;TimeWindow:按照时间生成 Window。

2.2.1 CountWindow

CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的结果。

注意:CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入的所有元素的总数。

(1) 滚动窗口

默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当相同key元素数量达到窗口大小时,就会触发窗口的执行。

object Windows {
    def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        val file: DataStream[String] = env.socketTextStream("node01",9999)
        val countStream: DataStream[(String, Int)] = file.flatMap(_.split(" "))
            .map((_, 1))
            .keyBy(0)
            .countWindow(2)
            .sum(1)
        countStream.print()
        env.execute("Windows is runned")
    }
}

(2) 滑动窗口

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。

下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据就计算一次,每一次计算的 window 范围是 5 个元素。

object Windows {
    def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        val file: DataStream[String] = env.socketTextStream("node01",9999)
        val countStream: DataStream[(String, Int)] = file.flatMap(_.split(" "))
            .map((_, 1))
            .keyBy(0)
            .countWindow(5,2)
            .sum(1)
        countStream.print()
        env.execute("Windows is runned")
    }
}

2.2.2 TimeWindow

对于 TimeWindow,可以根据窗口实现原理的不同分成三类:

  • 滚动窗口(Tumbling Window)

将数据依据固定的窗口长度对数据进行切片。
特点:时间对齐,窗口长度固定,没有重叠。所有的数据只能落在一个窗口里面
滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个 5 分钟大小的滚动窗口

适用场景: 适合做 BI 统计等(做每个时间段的聚合计算)。

  • 滑动窗口(Sliding Window)

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。一次数据统计的时间长度 每次统计移动多长的时间
特点:时间对齐,窗口长度固定,可以有重叠。一个数据可以被统计多次,滑动间隔、窗口长度是某个数值的整数倍
滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。
例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包含着上个 10 分钟产生的数据

  • 会话窗口(Session Window)

电商网站: 登录一个系统之后,多长时间没有操作,session就失效。

手机银行: 登录一个系统之后,多长时间没有操作,session就失效要求重新登录。

由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。

特点: 时间无对齐。多长时间之内没有收到数据,这个不是人为能规定的。

session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去

2.2 Window Function

window function 定义了要对窗口中收集的数据做的计算操作,主要可以分为两类:

2.2.1 增量聚合函数(incremental aggregation functions)

每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有ReduceFunction, AggregateFunction。

2.2.2 全窗口函数(full window functions)

先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。ProcessWindowFunction 就是一个全窗口函数。

2.3 Window 其他操作

2.3.1 trigger()

触发器 定义 window 什么时候关闭,触发计算并输出结果

2.3.2 evitor()

移除器 定义移除某些数据的逻辑

2.3.3 allowedLateness()

允许处理迟到的数据

2.3.4 sideOutputLateData()

将迟到的数据放入侧输出流

2.3.5 getSideOutput()

获取侧输出流

Table&SQL

3.1 概述

Table API是流处理和批处理通用的关系型 API,Table API 可以基于流输入或者批输入来运行而不需要进行任何修改。Table API 是 SQL 语言的超集并专门为 Apache Flink 设计的,Table API 是 Scala 和 Java 语言集成式的 API。与常规 SQL 语言中将查询指定为字符串不同,Table API 查询是以 Java 或 Scala 中的语言嵌入样式来定义的,具有 IDE 支持如:自动完成和语法检测;允许以非常直观的方式组合关系运算符的查询,例如 select,filter 和 join。Flink SQL 的支持是基于实现了SQL标准的 Apache Calcite。无论输入是批输入(DataSet)还是流输入(DataStream),任一接口中指定的查询都具有相同的语义并指定相同的结果。

3.2 Table API

3.2.1 依赖

<!-- flink-table&sql -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table</artifactId>
    <version>1.9.1</version>
    <type>pom</type>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.9.1</version>
</dependency>

3.2.2 TableEnvironment

TableEnvironment 是 Table API 和SQL集成的核心概念,它负责:

  • 在内部目录中注册表
  • 注册外部目录
  • 执行SQL查询
  • 注册用户定义的函数
  • DataStream 或 DataSet 转换为 Table
  • 持有 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
    Table总是与特定的TableEnvironment 绑定。不能在同一查询中组合不同 TableEnvironments 的表(例如,union 或 join)。

创建 TableEnvironment:

// 基于流的tableEnv
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = StreamTableEnvironment.create(sEnv)
// 基于批的bTableEnv
val bEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val bTableEnv: BatchTableEnvironment = BatchTableEnvironment.create(bEnv)

3.2.3 数据加载

数据加载通常有两种:一者基于流/批,一者基于TableSource,但是后者在Flink1.11中已经被废弃,所以不建议使用。

基于批

case class Student(id:Int,name:String,age:Int,gender:String,course:String,score:Int)
object FlinkBatchTableOps {
    def main(args: Array[String]): Unit = {
        //构建batch的executionEnvironment
        val env = ExecutionEnvironment.getExecutionEnvironment
        val bTEnv = BatchTableEnvironment.create(env)
        val dataSets: DataSet[Student] = env.readCsvFile[Student]("E:\\data\\student.csv",
            //是否忽略文件的第一行数据(主要考虑表头数据)
            ignoreFirstLine = true,
            //字段之间的分隔符
            fieldDelimiter = "|")
        //table 就相当于sparksql中的dataset
        val table: Table = bTEnv.fromDataSet(dataSets)
        //条件查询
        val result: Table = table.select("name,age").where("age=25")
        //打印输出
        bTEnv.toDataSet[Row](result).print()
    }
}

基于流

case class Goods(id: Int,brand:String,category:String)
object FlinkStreamTableOps {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val sTEnv = StreamTableEnvironment.create(env)
        val dataStream: DataStream[Goods] = env.fromElements(
            "001|mi|mobile",
            "002|mi|mobile",
            "003|mi|mobile",
            "004|mi|mobile",
            "005|huawei|mobile",
            "006|huawei|mobile",
            "007|huawei|mobile",
            "008|Oppo|mobile",
            "009|Oppo|mobile",
            "010|uniqlo|clothing",
            "011|uniqlo|clothing",
            "012|uniqlo|clothing",
            "013|uniqlo|clothing",
            "014|uniqlo|clothing",
            "015|selected|clothing",
            "016|selected|clothing",
            "017|selected|clothing",
            "018|Armani|clothing",
            "019|lining|sports",
            "020|nike|sports",
            "021|adidas|sports",
            "022|nike|sports",
            "023|anta|sports",
            "024|lining|sports"
        ).map(line => {
            val fields = line.split("\\|")
            Goods(fields(0), fields(1), fields(2))
        })
        //load data from external system
        var table = sTEnv.fromDataStream(dataStream)
        // stream table api
        table.printSchema()
        // 高阶api的操作
        table = table.select("category").distinct()
        /*
            将一个table转化为一个DataStream的时候,有两种选择:
            1. toAppendStream  :在没有聚合操作的时候使用
            2. toRetractStream(缩放的含义) :在进行聚合操作之后使用
         */
        sTEnv.toRetractStream[Row](table).print()
        env.execute("FlinkStreamTableOps")
    }
}

3.2.4 sqlQuery

sql仍然是最主要的分析工具,使用dsl当然也能完成业务分析,但是灵活性,简易性上都不及sql。FlinkTable通过sqlQuery来完成sql的查询操作。

object FlinkSQLOps {
    def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val sTEnv = BatchTableEnvironment.create(env)
        val dataStream: DataSet[Goods] = env.fromElements(
            "001|mi|mobile",
            "002|mi|mobile",
            "003|mi|mobile",
            "004|mi|mobile",
            "005|huawei|mobile",
            "006|huawei|mobile",
            "007|huawei|mobile",
            "008|Oppo|mobile",
            "009|Oppo|mobile",
            "010|uniqlo|clothing",
            "011|uniqlo|clothing",
            "012|uniqlo|clothing",
            "013|uniqlo|clothing",
            "014|uniqlo|clothing",
            "015|selected|clothing",
            "016|selected|clothing",
            "017|selected|clothing",
            "018|Armani|clothing",
            "019|lining|sports",
            "020|nike|sports",
            "021|adidas|sports",
            "022|nike|sports",
            "023|anta|sports",
            "024|lining|sports"
        ).map(line => {
            val fields = line.split("\\|")
            Goods(fields(0), fields(1), fields(2))
        })
        //load data from external system
        sTEnv.registerTable("goods", dataStream)
        //sql操作
        var sql =
            """
              |select
              |   id,
              |   brand,
              |   category
              |from goods
              |""".stripMargin
        sql =
            """
              |select
              |   category,
              |   count(1) counts
              |from goods
              |group by category
              |order by counts desc
              |""".stripMargin
        table = sTEnv.sqlQuery(sql)
        sTEnv.toDataSet[Row](table).print()
    }
}

3.2.5 基于滚动窗口的Table操作

基于EventTIme滑动窗口操作

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.Table
import org.apache.flink.types.Row
//基于滚动窗口Table操作
object FlinkTrumblingWindowTableOps {
    def main(args: Array[String]): Unit = {
        //1、获取流式执行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)
        // 2、获取table执行环境
        val tblEnv = StreamTableEnvironment.create(env)
        //3、获取数据源
        //输入数据:
        val ds = env.socketTextStream("node01", 9999)
                .map(line => {
                    val fields = line.split("\t")
                    UserLogin(fields(0), fields(1), fields(2), fields(3).toInt,
 fields(4))
                })
                .assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor[UserLogin](Time.seconds(2)) {
                        override def extractTimestamp(userLogin: UserLogin): Long = {
                            userLogin.dataUnix * 1000
                        }
                    }
                )
        //4、将DataStream转换成table
        //引入隐式
        //某天每隔2秒的输入记录条数:
        import org.apache.flink.table.api.scala._
        val table: Table = tblEnv.fromDataStream[UserLogin](ds , 'platform, 'server, 'status, 'ts.rowtime)
//        tblEnv.toAppendStream[Row](table).print()
        tblEnv.sqlQuery(
            s"""
               |select
               |platform,
               |count(1) counts
               |from ${table}
               |where status = 'LOGIN'
               |group by platform, tumble(ts,interval '2' second)
               |""".stripMargin)
            .toAppendStream[Row]
            .print("每隔2秒不同平台登录用户->")
        env.execute()
    }
}
/** 用户登录
  *
  * @param platform 所在平台 id(e.g. H5/IOS/ADR/IOS_YY)
  * @param server   所在游戏服 id
  * @param uid      用户唯一 id
  * @param dataUnix 事件时间/s 时间戳
  * @param status   登录动作(LOGIN/LOGOUT)
  */
case class UserLogin(platform: String, server: String, uid: String,  dataUnix: Int, status: String)

基于窗口的processTime

object FlinkTrumblingWindowTableOps2 {
    def main(args: Array[String]): Unit = {
        //1、获取流式执行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        // 2、获取table执行环境
        val tblEnv = StreamTableEnvironment.create(env)
        //3、获取数据源
        //输入数据:
        val ds = env.socketTextStream("node01", 9999)
                .map(line => {
                    val fields = line.split("\t")
                    UserLogin(fields(0), fields(1), fields(2), fields(3).toInt, fields(4))
                })
        //4、将DataStream转换成table
        //引入隐式
        //某天每隔2秒的输入记录条数:
        import org.apache.flink.table.api.scala._
        val table: Table = tblEnv.fromDataStream[UserLogin](ds , 'platform, 'server, 'status, 'ts.proctime)
        // tblEnv.toAppendStream[Row](table).print()
        tblEnv.sqlQuery(
            s"""
               |select
               |  platform,
               |  count(1) counts
               |from ${table}
               |where status = 'LOGIN'
               |group by platform, tumble(ts,interval '2' second)
               |""".stripMargin)
            .toAppendStream[Row]
            .print("prcotime-每隔2秒不同平台登录用户->")
        env.execute()
    }
}

3.3 Flink Table UDF

3.3.1 说明

自定义标量函数(User Defined Scalar Function)。一行输入一行输出。

3.3.2 数据

某个用户在某个时刻浏览了某个商品,以及商品的价值

{"userID": 2, "eventTime": "2020-10-01 10:02:00", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 1, "eventTime": "2020-10-01 10:02:02", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 1, "eventTime": "2020-10-01 10:02:10", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 1, "eventTime": "2020-10-01 10:02:12", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99
{"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 1, "eventTime": "2020-10-01 10:02:15", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 1, "eventTime": "2020-10-01 10:02:16", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}

3.3.3 需求

  • UDF时间转换
  • UDF需要继承ScalarFunction抽象类,主要实现eval方法。
  • 自定义UDF,实现将eventTime转化为时间戳

3.3.4 实现

object FlinkTableUDFOps {
    def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val bTEnv = BatchTableEnvironment.create(env)
        val ds = env.fromElements(
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:00\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:02\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:10\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:12\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:15\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:16\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}"
        ).map(line => {
            val jsonObj = new JSONObject(line)
            val userID = jsonObj.getInt("userID")
            val eventTime = jsonObj.getString("eventTime")
            val eventType = jsonObj.getString("eventType")
            val productID = jsonObj.getString("productID")
            val productPrice = jsonObj.getDouble("productPrice")
            UserBrowseLog(userID, eventTime, eventType, productID, productPrice)
        })
        //自定义udf
        bTEnv.registerFunction("to_time", new TimeScalarFunction())
        bTEnv.registerFunction("myLen", new LenScalarFunction())
        val table = bTEnv.fromDataSet(ds)
        val sql =
            s"""
              |select
              |  userID,
              |  eventTime,
              |  myLen(eventTime) my_len_et,
              |  to_time(eventTime) timestamps
              |from ${table}
              |""".stripMargin
        val ret = bTEnv.sqlQuery(sql)

        bTEnv.toDataSet[Row](ret).print
    }
}
case class UserBrowseLog(
    userID: Int,
    eventTime: String,
    eventType: String,
    productID: String,
    productPrice: Double
)

/*
    自定义类去扩展ScalarFunction 复写其中的方法:eval
    at least one method named 'eval' which is public, not
 */
class TimeScalarFunction extends ScalarFunction {
    //2020-10-01 10:02:16
    private val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    def eval(eventTime: String): Long = {
        df.parse(eventTime).getTime
    }
}

class LenScalarFunction extends ScalarFunction {
    //2020-10-01 10:02:16
    def eval(str: String): Int = {
        str.length
    }
}
Archives QR Code
QR Code for this page
Tipping QR Code