Flink基础篇(基础算子+WaterMarker)
创始人
2024-02-07 06:22:35
0

Flink

  • 高可用
    • HA 依赖于zk
    • Flink ON Yarn
      • 两种模式
        • Session模式
        • Per-Job模式
  • 前置说明
  • Flink原理
    • 数据在两个operator算子之间传递的时候有两种模式:
    • Operator Chain
    • TaskSlot And Sharing
  • Flink执行图(ExecutionGraph)
  • API
    • Source
    • Transformation
    • Sink
    • 控制台
    • 自定义Sink
    • Connectors
  • Flink四大基石
    • Window
    • Time And Watermark
      • Time分类
      • WaterMarker![请添加图片描述](/webdata/wwwroot/pics.8red.cn/weishitang/202402/6a976ac37217.png)

高可用

HA 依赖于zk

请添加图片描述

Flink ON Yarn

请添加图片描述
请添加图片描述

两种模式

Session模式

请添加图片描述

Per-Job模式

请添加图片描述

前置说明

请添加图片描述

Flink原理

请添加图片描述
Task 属于进程 TaskSlot属于线程请添加图片描述

  1. DataFlow:Flink程序在执行的时候会被映射成一个数据流程模型
  2. Operator:数据流模型中得每一个操作都被称作为Operator,Operator分为:Source/Transfrom/Sink
  3. Partition:数据流模型是分布式的和并行的,执行中会形成1-n个分区
  4. SubTask:多个分区任务可以并行,每个都是运行在一个线程中的,也就是一个SubTask子任务
  5. Parallelism:并行度,就是可以同时执行的子任务数/分区数

数据在两个operator算子之间传递的时候有两种模式:

  1. One to One模式:
    两个Operatior用此模式传递的时候,会保持数据的分区数和数据的排序;如上图中的Source1到Map1,他就保留的Source的分区特性,以及分区元素处理的有序性
  2. Redistributing模式
    这种模式会改变数据的分区数;每个operator subtask会根据选择transformation把数据发送到不同的目标subtaks,比如keyBy()会通过hashcode重新分区,broadcast()和rebalance()方法会随机重新分区

Operator Chain

请添加图片描述
客户端在提交任务的时候对Operator进行优化操作,能进行合并的Operator会被合并为一个Operator,合并后后的Operator称为Operator Chain,实际上就是一个执行链,
每个执行链会在TaskManager上独立的线程中执行–就是SubTask

TaskSlot And Sharing

请添加图片描述
每个TaskManager是一个JVM的进程,为了控制一个TaskManager(worker)能接收多少个task,Flink通过TaskSlot来控制。TaskSloat数量是用来限制一个TaskManager工作进程中卡可以同时运行多少个工作线程,TaskSlot是一个TaskManager中的最小资源单位,一个TaskManager中有多少个TaskSlot就意味着能支持多少个并发Task处理

Flink将进程的内存进行
了划分到多个slot之后可以获得如下好处:

  • TaskManager最多能同时并发执行的子任务数是可以通过TaskSlot数量控制的
  • TaskSolt有独占的内存空间,这样在一个TaskManager中可以运行多个不同的作业,作业之间不受影响

个人理解:分布式线程池,可复用
请添加图片描述

Flink执行图(ExecutionGraph)

请添加图片描述

API

Source

基于集合

  1. env.fromElements(可变参数)
  2. env.fromCollection(各种集合)
  3. env.generateSequence(开始,结束)
  4. env.fromSequence(开始,结束)

基于文件
env.readTextFile(本地/hdsf/文件夹)
基于Socket
env.socketTextStream(host,port)

自定义
Flink还提供了数据源接口,实现自定义数据源:

  1. SourceFunction:非并行度数据源(并行度只能 = 1)
  2. RichSourceFunction:多功能并行数据源(并行度只能 = 1)
  3. ParallelSourceFunction:并行数据源(并行度能够 >= 1)
  4. RichParallelSourceFunction: 多功能并
    行数据源(并行度能够>=1)–>Kafka数据源使用欧冠的就是该接口

Transformation

  1. map/flatMap/keyBy/filter/reduce
  2. union和connect
    请添加图片描述
    union:只能合并同类型
    connect:可以合并不同类型,之后需要做其他处理,不能输出
  3. Side Outputs 侧流
  4. rebalance 重平衡分区请添加图片描述
  5. 其他分区
    请添加图片描述

Sink

控制台

  1. ds.print() 直接输出控制台
  2. ds.printToErr() 直接输出控制台红色
  3. ds.writeAsText(“本地/hdsf”,WriteMode.OVERWRITE)
    注意输出到path的时候可以在前面设置并行度,如果
    并行度>1,则path为目录
    并行度=1,则path为文件夹

自定义Sink

类似于Source ,4个Sink
  1. SinkFunction:非并行度数据源(并行度只能 = 1)
  2. RichSinkFunction:多功能并行数据源(并行度只能 = 1)
  3. ParallelSinkFunction:并行数据源(并行度能够 >= 1)
  4. RichParallelSinkFunction: 多功能并

Connectors

JDBCSink,KafkaSink,RedisSink

Flink四大基石

请添加图片描述

Window

window分类
请添加图片描述
总结

  1. 基于时间的滚动窗口 tumbling-time-window
  2. 基于时间的滑动窗口 sliding-time-window
  3. 基于数量的滚动窗口 tumbling-count-window
  4. 基于数量的滑动窗口 sliding-count-window
    flink还支持一个特殊的窗口:session会话窗口,需要设置一个会话超时时间,如30s,则表示30s内没有数据到来,则触发上一个窗口计算

使用keyby的流,应该使用window方法
未使用keyby的流,应该使用windowAll方法

请添加图片描述

请添加图片描述
基于时间

public class CarDemo {/*** 需求*  nc -lk 9999*  有如下数据表示*      信号灯编号和通过该信号灯的车的数量
9,3
9,2
2,3
9,2
3,3
3,2
4,3
9,2要求1:每隔5秒钟统计一次,最近5秒内,各个路口通过红路灯汽车的数量--基于时间的滚动窗口要求2:每隔5秒钟统计一次,最近10秒内,各个路口通过红路灯汽车的数量--基于时间的滑动窗口** @param args*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource stream = env.socketTextStream("hadoop102", 9999);SingleOutputStreamOperator> mapStream = stream.map(new MapFunction>() {@Overridepublic Tuple2 map(String s) throws Exception {String[] split = s.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}});KeyedStream, String> tStream = mapStream.keyBy(t -> t.f0);SingleOutputStreamOperator> tumblingStream = tStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1);SingleOutputStreamOperator> slidingStream = tStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1);tumblingStream.print("tumbling");slidingStream.printToErr("sliding");env.execute();}
}

基于数量

public class CarDemo {/*** 需求*  nc -lk 9999*  有如下数据表示*      信号灯编号和通过该信号灯的车的数量
9,3
9,2
2,3
9,2
9,2
9,2要求1:统计在最近的5条消息中,各自路口通过的机车数量,相同的key每出现5次进行统计--基于数量的滚动窗口要求2:统计在最近的5条消息中,各自路口通过的机车数量,相同的key每出现3次进行统计--基于时间的滑动窗口** @param args*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource stream = env.socketTextStream("hadoop102", 9999);SingleOutputStreamOperator> mapStream = stream.map(new MapFunction>() {@Overridepublic Tuple2 map(String s) throws Exception {String[] split = s.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}});KeyedStream, String> tStream = mapStream.keyBy(t -> t.f0);SingleOutputStreamOperator> tumblingStream = tStream.countWindow(5).sum(1);SingleOutputStreamOperator> slidingStream = tStream.countWindow(5, 3).sum(1);tumblingStream.print("tumbling");slidingStream.printToErr("sliding");env.execute();}
}

会话窗口

public class CarDemo {/*** 需求*  nc -lk 9999*  有如下数据表示*      信号灯编号和通过该信号灯的车的数量
9,3
9,2
2,3
9,2
9,2
9,2要求1:设置会话超时时间10s,10s内没有数据到来,则触发窗口计算** @param args*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource stream = env.socketTextStream("hadoop102", 9999);SingleOutputStreamOperator> mapStream = stream.map(new MapFunction>() {@Overridepublic Tuple2 map(String s) throws Exception {String[] split = s.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}});KeyedStream, String> tStream = mapStream.keyBy(t -> t.f0);SingleOutputStreamOperator> sum = tStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).sum(1);sum.printToErr();env.execute();}
}

Time And Watermark

Time分类

请添加图片描述

请添加图片描述
请添加图片描述
请添加图片描述
请添加图片描述
请添加图片描述

WaterMarker请添加图片描述

请添加图片描述
请添加图片描述
请添加图片描述
请添加图片描述
WaterMarker重点请添加图片描述
案例

public class WTOrderDemo {public static void main(String[] args) throws Exception {/*** 需求*  有订单数据,格式: ( 订单Id,用户Id,时间戳/事件时间 ,订单金额)*  要求每隔5s,计算5s内,每个用户的订单总金额**  并添加WaterMaker来解决一定程度的 数据延迟以及数据乱序 问题*/StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource orderDs = env.addSource(new SourceFunction() {private Boolean flag = true;@Overridepublic void run(SourceContext ct) throws Exception {Random random = new Random();while (flag) {String orderId = UUID.randomUUID().toString();int userId = random.nextInt(2);int money = random.nextInt(101);long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;Order order = new Order(orderId, userId, money, eventTime);ct.collect(order);Thread.sleep(1000);}}@Overridepublic void cancel() {flag = false;}});// TODO transformation/*** Flink 1.12 版本* 每隔5s计算最近的数据球每个用户的订单总金额,要求:基于时间时间计算+WaterMarker** env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 在新版本中默认就是EventTime* 设置 WaterMarker = 当前最大事件时间 - 最大允许的延时 或 乱序时间*/SingleOutputStreamOperator orderDsWithWatermark = orderDs.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 指定延时时间或乱序时间.withTimestampAssigner(((order, timestamp) -> order.eventTime)));SingleOutputStreamOperator result = orderDsWithWatermark.keyBy(order -> order.userId).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum("money");result.print();env.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Order{public String orderId;public Integer userId;public Integer money;public Long eventTime;}
}

测到输出机制(解决严重迟到问题)
请添加图片描述

public class WTOrderDemo {public static void main(String[] args) throws Exception {/*** 需求*  有订单数据,格式: ( 订单Id,用户Id,时间戳/事件时间 ,订单金额)*  要求每隔5s,计算5s内,每个用户的订单总金额**  并添加WaterMaker来解决一定程度的 数据延迟以及数据乱序 问题**  并使用outputTag + allowedLateness 来解决数据丢失问题(严重的 数据延迟以及数据乱序 问题)*/StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource orderDs = env.addSource(new SourceFunction() {private Boolean flag = true;@Overridepublic void run(SourceContext ct) throws Exception {Random random = new Random();while (flag) {String orderId = UUID.randomUUID().toString();int userId = random.nextInt(2);int money = random.nextInt(101);long eventTime = System.currentTimeMillis() - random.nextInt(20) * 1000;Order order = new Order(orderId, userId, money, eventTime);ct.collect(order);Thread.sleep(1000);}}@Overridepublic void cancel() {flag = false;}});// TODO transformation/*** Flink 1.12 版本* 每隔5s计算最近的数据球每个用户的订单总金额,要求:基于时间时间计算+WaterMarker** env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 在新版本中默认就是EventTime* 设置 WaterMarker = 当前最大事件时间 - 最大允许的延时 或 乱序时间*/// TODO 侧道输出机制,解决严重的迟到问题OutputTag orderOutputTag = new OutputTag<>("seriousLate", TypeInformation.of(Order.class));SingleOutputStreamOperator orderDsWithWatermark = orderDs.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 指定延时时间或乱序时间.withTimestampAssigner(((order, timestamp) -> order.eventTime)));SingleOutputStreamOperator result1 = orderDsWithWatermark.keyBy(order -> order.userId).window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(3)).sideOutputLateData(orderOutputTag).sum("money");DataStream result2 = result1.getSideOutput(orderOutputTag);result1.print("正常的数据");result2.printToErr("迟到的数据");env.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Order{public String orderId;public Integer userId;public Integer money;public Long eventTime;}
}

相关内容

热门资讯

自由的英文名言 关于自由的英文名言  自由是一个政治哲学概念,在此条件下人类可以自我支配,凭借自由意志而行动,并为自...
礼貌的名人名言 有关礼貌的名人名言  在学习、工作乃至生活中,大家都知道一些经典的名言吧,巧用名言有助于我们正确对待...
赞美老师的名言 赞美老师的名言(精选50句)  名言是一个汉语词汇,基本意思是很出名的说法,著名的话,一般指名人说的...
经典名言录 经典名言录  名言录  1、一寸光阴一寸金,寸金难买寸光阴。——中国谚语  2、浪费别人的时间是谋财...
感恩教师节名言警句 感恩教师节名言警句  在平时的学习、工作或生活中,大家都对那些经典的名言警句很是熟悉吧,在议论文中,...
历史的名言 关于历史的名言大全  1、历史本身是自然史的一个现实的部分,是自然生成为人这一过程的一个现实的部分。...
交通安全名言警句大全   车在轨上行是幸福路,人在轨上走是危险路。如下是中国人才网给大家整理的交通安全名言警句大全,希望对...
朱熹的名言名句 朱熹的名言名句集锦  1、涵养、致和、力行三者,便是以涵养为首,致和次之,力行又次之。  2、心,生...
教师人生格言 教师人生格言大全  创新是人类进步的不竭动力!  当代能做老师的人必定是不平凡的人;因为教育事业本身...
歌颂母爱的名言摘抄 有关歌颂母爱的名言摘抄  在平平淡淡的日常中,大家总免不了要接触或使用名言吧,下面是小编为大家整理的...
毕达哥拉斯名言   毕达哥拉斯名言  1、友谊是一种和谐的平等。  2、要这样生活;使你的朋友不致成为仇人,使你的仇...
伤感的名言 伤感的名言  1.用一转身离开,用一辈子去忘记。  2.明知道天要下雨就该带把伞,明知道不会有结果就...
犯罪心理第七季的励志名言 犯罪心理第七季的励志名言  1.遗伤难愈——伊莉莎白一世  2.若为奇迹,一切证据皆可为之,若为事实...
我的人生格言 我的人生格言我的人生格言如果你的心灵很敞亮,很仁厚,你有一种坦率和勇敢,那么你有可能收获到许多意想不...
蒙田名言   蒙田名言  1、生命的价值不在于时间的长短,而在于你如何利用它。  2、作为一个父亲,最大的乐趣...
中国古代爱情的名言名句 中国古代关于爱情的名言名句(精选115句)  无论是身处学校还是步入社会,许多人都接触或是使用过一些...
人生励志名言 100句人生励志名言精选  1、没有行动的承诺,不过是一席空话。  2、坚持最初的梦想,年轻没有失败...
李嘉诚名言 李嘉诚名言(通用40句)  扩张中不忘谨慎,谨慎中不忘扩张……我讲求的是在稳健与扩张中取得平衡。船要...
目标与理想的名言警句 目标与理想的名言警句  平凡朴实的梦想,我们用那唯一的坚持信念去支撑那梦想,目标与理想的名言警句。以...
张爱玲名人名言 张爱玲名人名言汇总80句精选  人生最可爱就在那一撒手。下面这篇文章是小编收集整理的张爱玲名人名言,...