专业的编程技术博客社区

网站首页 > 博客文章 正文

Apache Flink中DataStream基本转换

baijin 2025-03-17 15:48:37 博客文章 5 ℃ 0 评论

很多刚刚接触Flink的人会被其中各种类型的stream绕晕,DataStream作为最基础的流处理类是Stream流转换的中心,通过一些方法可以和其它类型的流相互转换,今天来说一下它们是如何互相转换。

1、Stream类型介绍:

flink中的stream主要有以下几种类型:

  • DataStream:表示无界的数据流,可以包含任何类型的数据元素,可以使用各种算子进行转换和处理。
  • KeyedStream:表示按照指定的key或字段进行分区的DataStream,可以使用聚合、分组、窗口等算子进行操作。
  • ConnectedStream:表示连接两个类型不同的DataStream的结果,可以使用coMap或coFlatMap等算子对两个源数据流进行不同的处理。
  • WindowedStream:表示按照时间或者数量进行划分的KeyedStream,可以使用窗口函数对每个窗口内的数据进行计算。
  • SplitStream:表示按照指定的条件将一个DataStream拆分为多个DataStream的结果,可以使用select算子选择其中一个或多个拆分流。
  • JoinedStreams:表示两个KeyedStream按照相同的key进行连接的结果,可以使用where、equalTo和join等算子指定连接条件和函数。
  • CoGroupedStreams:表示两个KeyedStream按照相同的key进行分组的结果,可以使用where、equalTo和coGroup等算子指定分组条件和函数。


2、转换方法:

keyBy转换:将一个datastream按照指定的key或字段进行分区,得到一个keyedstream。keyedstream中具有相同key的元素会被分配到同一个算子实例中。

聚合转换:将一个keyedstream按照指定的聚合函数(如sum,min,max等)进行滚动计算,得到一个datastream。datastream中的每个元素是一个聚合结果。

reduce转换:将一个keyedstream按照指定的reduce函数进行滚动组合,得到一个datastream。datastream中的每个元素是一个组合结果。

fold转换:将一个keyedstream按照指定的fold函数和初始值进行滚动折叠,得到一个datastream。datastream中的每个元素是一个折叠结果。

union转换:将两个或多个类型相同的datastream合并为一个datastream。datastream中的元素是来自不同源的数据流的元素。

connect转换:将两个类型不同的datastream连接为一个connectedstream。connectedstream中的元素是两个源数据流的元素的组合。

coMap或coFlatMap转换:将一个connectedstream按照指定的map或flatMap函数分别对两个源数据流进行处理,得到一个datastream。datastream中的元素是处理后的结果。

filter转换:将一个数据流中的每个元素应用一个布尔函数,只保留返回true的元素,得到一个新的数据流。

示例:

// keyBy转换
DataStream sensorData = env.addSource(new SensorSource());
KeyedStream keyedSensorData = sensorData.keyBy(r -> r.id);


// 聚合转换
DataStream avgTemp = keyedSensorData.timeWindow(Time.seconds(5)).apply(new TemperatureAverager());


// reduce转换
DataStream minTemp = keyedSensorData.reduce((r1, r2) -> {
  if (r1.temperature < r2.temperature) {
    return r1;
  } else {
    return r2;
  }
});


// fold转换
DataStream result = keyedSensorData.fold("start", (current, r) -> current + "-" + r.temperature);


// union转换
DataStream high = sensorData.filter(r -> r.temperature > 25);
DataStream low = sensorData.filter(r -> r.temperature <= 25);
DataStream all = high.union(low);


// connect转换
DataStream<Tuple2> warning = high.map(r -> new Tuple2<>(r.id, r.temperature));
ConnectedStream<Tuple2, SensorReading> connected = warning.connect(low);


// coMap转换
DataStream coMap = connected.map(
  warningData -> warningData.f0 + " " + warningData.f1 + " warning",
  lowData -> lowData.id + " healthy"
);
// map转换
DataStream ds = env.fromElements("Good good study","Day day up");
DataStream ds_map = ds.map(String::toLowerCase);
ds_map.print();


// flatMap转换
DataStream ds = env.fromElements("Good good study","Day day up");
DataStream ds_flatmap = ds.flatMap((value, out) -> {
  for (String word: value.split("\\W+")) {
    out.collect(word);
  }
}).returns(Types.STRING);
ds_flatmap.print();


// filter转换
DataStream ds = env.fromElements("Good good study","Day day up");
DataStream ds_filter = ds.filter(s -> s.contains("study"));
ds_filter.print();


使用 window() 方法将 KeyedStream 转换成 WindowedStream。12

WindowedStream 是一个对键分组的流进行窗口化的数据流,它使用 WindowAssigner 将元素放入窗口中。窗口中的元素既按键也按窗口分组。你可以定义一个 Trigger 来指定何时评估窗口。

KeyedStream<Tuple2, String> keyedStream = ...;
WindowedStream<Tuple2, String, TimeWindow> windowedStream = keyedStream
    .window(TumblingEventTimeWindows.of(Time.seconds(5))); // 按事件时间滚动窗口


使用 apply() 或 process() 方法将 WindowedStream 转换成 DataStream。

apply() 方法接受一个 AllWindowFunction 或 WindowFunction 作为参数,它可以对每个窗口中的元素进行操作,并输出一个或多个结果。

process() 方法接受一个 ProcessAllWindowFunction 或 ProcessWindowFunction 作为参数,它可以访问窗口的元数据,并输出一个或多个结果。

WindowedStream<Tuple2, String, TimeWindow> windowedStream = ...;
DataStream<Tuple2> dataStream = windowedStream
    .apply(new WindowFunction<Tuple2, Tuple2, String, TimeWindow>() {
        @Override
        public void apply(String key, TimeWindow window, Iterable<Tuple2> input, Collector<Tuple2> out) {
            // 对每个窗口中的元素进行操作,并输出结果
        }
    });

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表