Task 属于进程 TaskSlot属于线程
客户端在提交任务的时候对Operator进行优化操作,能进行合并的Operator会被合并为一个Operator,合并后后的Operator称为Operator Chain,实际上就是一个执行链,
每个执行链会在TaskManager上独立的线程中执行–就是SubTask
每个TaskManager是一个JVM的进程,为了控制一个TaskManager(worker)能接收多少个task,Flink通过TaskSlot来控制。TaskSloat数量是用来限制一个TaskManager工作进程中卡可以同时运行多少个工作线程,TaskSlot是一个TaskManager中的最小资源单位,一个TaskManager中有多少个TaskSlot就意味着能支持多少个并发Task处理
Flink将进程的内存进行
了划分到多个slot之后可以获得如下好处:
个人理解:分布式线程池,可复用
基于集合
基于文件
env.readTextFile(本地/hdsf/文件夹)
基于Socket
env.socketTextStream(host,port)
自定义
Flink还提供了数据源接口,实现自定义数据源:
- SourceFunction:非并行度数据源(并行度只能 = 1)
- RichSourceFunction:多功能并行数据源(并行度只能 = 1)
- ParallelSourceFunction:并行数据源(并行度能够 >= 1)
- RichParallelSourceFunction: 多功能并
行数据源(并行度能够>=1)–>Kafka数据源使用欧冠的就是该接口
类似于Source ,4个Sink
- SinkFunction:非并行度数据源(并行度只能 = 1)
- RichSinkFunction:多功能并行数据源(并行度只能 = 1)
- ParallelSinkFunction:并行数据源(并行度能够 >= 1)
- RichParallelSinkFunction: 多功能并
JDBCSink,KafkaSink,RedisSink
window分类
总结
- 基于时间的滚动窗口 tumbling-time-window
- 基于时间的滑动窗口 sliding-time-window
- 基于数量的滚动窗口 tumbling-count-window
- 基于数量的滑动窗口 sliding-count-window
flink还支持一个特殊的窗口:session会话窗口,需要设置一个会话超时时间,如30s,则表示30s内没有数据到来,则触发上一个窗口计算
使用keyby的流,应该使用window方法
未使用keyby的流,应该使用windowAll方法
基于时间
public class CarDemo {/*** 需求* nc -lk 9999* 有如下数据表示* 信号灯编号和通过该信号灯的车的数量
9,3
9,2
2,3
9,2
3,3
3,2
4,3
9,2要求1:每隔5秒钟统计一次,最近5秒内,各个路口通过红路灯汽车的数量--基于时间的滚动窗口要求2:每隔5秒钟统计一次,最近10秒内,各个路口通过红路灯汽车的数量--基于时间的滑动窗口** @param args*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource stream = env.socketTextStream("hadoop102", 9999);SingleOutputStreamOperator> mapStream = stream.map(new MapFunction>() {@Overridepublic Tuple2 map(String s) throws Exception {String[] split = s.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}});KeyedStream, String> tStream = mapStream.keyBy(t -> t.f0);SingleOutputStreamOperator> tumblingStream = tStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1);SingleOutputStreamOperator> slidingStream = tStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1);tumblingStream.print("tumbling");slidingStream.printToErr("sliding");env.execute();}
}
基于数量
public class CarDemo {/*** 需求* nc -lk 9999* 有如下数据表示* 信号灯编号和通过该信号灯的车的数量
9,3
9,2
2,3
9,2
9,2
9,2要求1:统计在最近的5条消息中,各自路口通过的机车数量,相同的key每出现5次进行统计--基于数量的滚动窗口要求2:统计在最近的5条消息中,各自路口通过的机车数量,相同的key每出现3次进行统计--基于时间的滑动窗口** @param args*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource stream = env.socketTextStream("hadoop102", 9999);SingleOutputStreamOperator> mapStream = stream.map(new MapFunction>() {@Overridepublic Tuple2 map(String s) throws Exception {String[] split = s.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}});KeyedStream, String> tStream = mapStream.keyBy(t -> t.f0);SingleOutputStreamOperator> tumblingStream = tStream.countWindow(5).sum(1);SingleOutputStreamOperator> slidingStream = tStream.countWindow(5, 3).sum(1);tumblingStream.print("tumbling");slidingStream.printToErr("sliding");env.execute();}
}
会话窗口
public class CarDemo {/*** 需求* nc -lk 9999* 有如下数据表示* 信号灯编号和通过该信号灯的车的数量
9,3
9,2
2,3
9,2
9,2
9,2要求1:设置会话超时时间10s,10s内没有数据到来,则触发窗口计算** @param args*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource stream = env.socketTextStream("hadoop102", 9999);SingleOutputStreamOperator> mapStream = stream.map(new MapFunction>() {@Overridepublic Tuple2 map(String s) throws Exception {String[] split = s.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}});KeyedStream, String> tStream = mapStream.keyBy(t -> t.f0);SingleOutputStreamOperator> sum = tStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).sum(1);sum.printToErr();env.execute();}
}
WaterMarker重点
案例
public class WTOrderDemo {public static void main(String[] args) throws Exception {/*** 需求* 有订单数据,格式: ( 订单Id,用户Id,时间戳/事件时间 ,订单金额)* 要求每隔5s,计算5s内,每个用户的订单总金额** 并添加WaterMaker来解决一定程度的 数据延迟以及数据乱序 问题*/StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource orderDs = env.addSource(new SourceFunction() {private Boolean flag = true;@Overridepublic void run(SourceContext ct) throws Exception {Random random = new Random();while (flag) {String orderId = UUID.randomUUID().toString();int userId = random.nextInt(2);int money = random.nextInt(101);long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;Order order = new Order(orderId, userId, money, eventTime);ct.collect(order);Thread.sleep(1000);}}@Overridepublic void cancel() {flag = false;}});// TODO transformation/*** Flink 1.12 版本* 每隔5s计算最近的数据球每个用户的订单总金额,要求:基于时间时间计算+WaterMarker** env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 在新版本中默认就是EventTime* 设置 WaterMarker = 当前最大事件时间 - 最大允许的延时 或 乱序时间*/SingleOutputStreamOperator orderDsWithWatermark = orderDs.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 指定延时时间或乱序时间.withTimestampAssigner(((order, timestamp) -> order.eventTime)));SingleOutputStreamOperator result = orderDsWithWatermark.keyBy(order -> order.userId).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum("money");result.print();env.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Order{public String orderId;public Integer userId;public Integer money;public Long eventTime;}
}
测到输出机制(解决严重迟到问题)
public class WTOrderDemo {public static void main(String[] args) throws Exception {/*** 需求* 有订单数据,格式: ( 订单Id,用户Id,时间戳/事件时间 ,订单金额)* 要求每隔5s,计算5s内,每个用户的订单总金额** 并添加WaterMaker来解决一定程度的 数据延迟以及数据乱序 问题** 并使用outputTag + allowedLateness 来解决数据丢失问题(严重的 数据延迟以及数据乱序 问题)*/StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource orderDs = env.addSource(new SourceFunction() {private Boolean flag = true;@Overridepublic void run(SourceContext ct) throws Exception {Random random = new Random();while (flag) {String orderId = UUID.randomUUID().toString();int userId = random.nextInt(2);int money = random.nextInt(101);long eventTime = System.currentTimeMillis() - random.nextInt(20) * 1000;Order order = new Order(orderId, userId, money, eventTime);ct.collect(order);Thread.sleep(1000);}}@Overridepublic void cancel() {flag = false;}});// TODO transformation/*** Flink 1.12 版本* 每隔5s计算最近的数据球每个用户的订单总金额,要求:基于时间时间计算+WaterMarker** env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 在新版本中默认就是EventTime* 设置 WaterMarker = 当前最大事件时间 - 最大允许的延时 或 乱序时间*/// TODO 侧道输出机制,解决严重的迟到问题OutputTag orderOutputTag = new OutputTag<>("seriousLate", TypeInformation.of(Order.class));SingleOutputStreamOperator orderDsWithWatermark = orderDs.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 指定延时时间或乱序时间.withTimestampAssigner(((order, timestamp) -> order.eventTime)));SingleOutputStreamOperator result1 = orderDsWithWatermark.keyBy(order -> order.userId).window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(3)).sideOutputLateData(orderOutputTag).sum("money");DataStream result2 = result1.getSideOutput(orderOutputTag);result1.print("正常的数据");result2.printToErr("迟到的数据");env.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Order{public String orderId;public Integer userId;public Integer money;public Long eventTime;}
}