专业的编程技术博客社区

网站首页 > 博客文章 正文

Flink入门学习(flink教学)

baijin 2024-08-28 11:24:53 博客文章 3 ℃ 0 评论

flink

  • 概念Apache Flink 是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。Flink具有高吞吐,低延时,时间正确,语义化窗口,在压力下保持正确,操作简单,表现力好等特点。
  • 架构



  • 组件基础配置Data Source: 数据的来源地。做为流处理框架,处理的数据类型有:处理静态的数据集、历史的数据集,实时的处理些实时数据流,实时的产生数据流结果API:添加数据来源StreamExecutionEnvironment.addSource(sourceFunction)数据源分类
  • 基于集合 特点:有界数据集,更偏向于本地测试用
  • 基于文件 特点:适合监听文件修改并读取其内容
  • 基于socket 特点:监听主机的 host port,从 Socket 中获取数据自定义 addSource特点:大多数的场景数据都是无界的,会源源不断的过来。比如去消费 Kafka 某个 topic 上的数据,这时候就需要用到这个 addSource,可能因为用的比较多的原因吧,Flink 直接提供了 FlinkKafkaConsumer011 等类可供你直接使用。可以使用FlinkKafkaConsumerBase 这个基础类,它是 Flink Kafka 消费的最根本的类。基于集合基于文件基于socket自定义添加数据源,代码示例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<KafkaEvent> input = env
		.addSource(
			new FlinkKafkaConsumer011<>(
				parameterTool.getRequired("input-topic"), //从参数中获取传进来的 topic 
				new KafkaEventSchema(),
				parameterTool.getProperties())
			.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()));
  • 入门示例
    • 安装flink mac安装
      • brew install apache-flink
    • 验证检查安装
      • flink --version
    • 启动
      • /usr/local/Cellar/apache-flink/1.10.1/libexec/bin ./start-cluster.sh
    • 访问:http://127.0.0.1:8081/
  • 项目创建
    • pom.xml
<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<flink.version>1.6.1</flink.version>
		<java.version>1.8</java.version>
		<scala.binary.version>2.11</scala.binary.version>
		<maven.compiler.source>${java.version}</maven.compiler.source>
		<maven.compiler.target>${java.version}</maven.compiler.target>
	</properties>

	<repositories>
		<repository>
			<id>apache.snapshots</id>
			<name>Apache Development Snapshot Repository</name>
			<url>https://repository.apache.org/content/repositories/snapshots/</url>
			<releases>
				<enabled>false</enabled>
			</releases>
			<snapshots>
				<enabled>true</enabled>
			</snapshots>
		</repository>
	</repositories>

	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.7.7</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
			<scope>runtime</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>

			<!-- Java Compiler -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.1</version>
				<configuration>
					<source>${java.version}</source>
					<target>${java.version}</target>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.0.0</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<artifactSet>
								<excludes>
									<exclude>org.apache.flink:force-shading</exclude>
									<exclude>com.google.code.findbugs:jsr305</exclude>
									<exclude>org.slf4j:*</exclude>
									<exclude>log4j:*</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<mainClass>myflink.StreamingJob</mainClass>
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>

		<pluginManagement>
			<plugins>
				<plugin>
					<groupId>org.eclipse.m2e</groupId>
					<artifactId>lifecycle-mapping</artifactId>
					<version>1.0.0</version>
					<configuration>
						<lifecycleMappingMetadata>
							<pluginExecutions>
								<pluginExecution>
									<pluginExecutionFilter>
										<groupId>org.apache.maven.plugins</groupId>
										<artifactId>maven-shade-plugin</artifactId>
										<versionRange>[3.0.0,)</versionRange>
										<goals>
											<goal>shade</goal>
										</goals>
									</pluginExecutionFilter>
									<action>
										<ignore/>
									</action>
								</pluginExecution>
								<pluginExecution>
									<pluginExecutionFilter>
										<groupId>org.apache.maven.plugins</groupId>
										<artifactId>maven-compiler-plugin</artifactId>
										<versionRange>[3.1,)</versionRange>
										<goals>
											<goal>testCompile</goal>
											<goal>compile</goal>
										</goals>
									</pluginExecutionFilter>
									<action>
										<ignore/>
									</action>
								</pluginExecution>
							</pluginExecutions>
						</lifecycleMappingMetadata>
					</configuration>
				</plugin>
			</plugins>
		</pluginManagement>
	</build>

	<profiles>
		<profile>
			<id>add-dependencies-for-IDEA</id>

			<activation>
				<property>
					<name>idea.version</name>
				</property>
			</activation>

			<dependencies>
				<dependency>
					<groupId>org.apache.flink</groupId>
					<artifactId>flink-java</artifactId>
					<version>${flink.version}</version>
					<scope>compile</scope>
				</dependency>
				<dependency>
					<groupId>org.apache.flink</groupId>
					<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
					<version>${flink.version}</version>
					<scope>compile</scope>
				</dependency>
			</dependencies>
		</profile>
	</profiles>
    • Log4j.properties
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    • SocketTextStreamWordCount
SocketTextStreamWordCount

package flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author JefferyChang
 * @date 2020/6/3
 */
public class SocketTextStreamWordCount {

    public static void main(String[] args) throws Exception {
        //参数检查
        if (args.length != 2) {
            System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            return;
        }
        String hostname = args[0];
        Integer port = Integer.parseInt(args[1]);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //获取数据
        DataStreamSource<String> stream = env.socketTextStream(hostname, port);
        //计数
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);
        sum.print();
        env.execute("Java WordCount from SocketTextStream Example");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] tokens = s.toLowerCase().split("\\W+");

            for (String token : tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }


}
    • 工程打包 mvn clean package -Dmaven.test.skip=true
    • 开启监听 9000端口
    • flink 安装目录 bin 下执行以下命令跑程序:
    • 使用 http://127.0.0.1:8081/ 验证或者 直接使用日志查看

Flink概念

  • 窗口
    1. 时间窗口:是最简单和最有用的一种窗口。它支持滚动和滑动。假设要对传感器输出的数值求和。一分钟滚动窗口收集最近一分钟的数值,并在一分钟结束时输出总和api:stream.timeWindow(Time.minutes(1))




    一分钟滑动窗口计算最近一分钟的数值总和,但每半分钟滑动一次并输出结果 api:stream.timeWindow(Time.minutes(1), Time.seconds(30))

    1. 计数窗口:采用计数窗口时,分组依据不再是时间戳,而是元素的数量。上图滑动窗口也可以解释为由4 个元素组成的计数窗口,并且每两个元素滑动一次。滚动窗口API:stream.countWindow(4)滑动窗口API:stream.countWindow(4, 2)
    2. 会话窗口:会话指的是活动阶段,其前后都是非活动阶段,例如用户与网站进行一系列交互(活动阶段)之后,关闭浏览器或者不再交互(非活动阶段)。会话需要有自己的处理机制,因为它们通常没有固定的持续时间(有些 30 秒就结束了,有些则长达一小时),或者没有固定的交互次数(有些可能是 3 次点击后购买,另一些可能是 40 次点击却没有购买)。会话窗口由超时时间设定,即希望等待多久才认为会话已经结束。如果用户处于非活动状态长达 5 分钟,则认为会话结束。API: stream.window(SessionWindows.withGap(Time.minutes(5))
  • 触发器:触发器控制生成结果的时间,即何时聚合窗口内容并将结果返回给用户。每一个默认窗口都有一个触发器。例如,采用事件时间的时间窗口将在收到水印时被触发。对于用户来说,除了收到水印时生成完整、准确的结果之外,也可以实现自定义的触发器(例如每秒提供一次近似结果)。
  • 时空穿梭:时空穿梭意味着将数据流倒回至过去的某个时间,重新启动处理程序,直到处理至当前时间为止。流处理架构拥有时空穿梭(即重新处理数据)的能力。流处理器支持事件时间,这意味着将数据流“倒带”,用同一组数据重新运行同样的程序,会得到相同的结果。
  • 水印:事件时间,事件时间能保证结果正确,并使流处理架构拥有重新处理数据的能力。Flink 通过水印来推进事件时间。水印是嵌在流中的常规记录,计算程序通过水印获知某个时间点已到。对于上述一分钟滚动窗口,假设水印标记时间为 10:01:00(或者其他时间,如 10:03:43),那么收到水印的窗口就知道不会再有早于该时间的记录出现,因为所有时间戳小于或等于该时间的事件都已经到达。这时,窗口可以安全地计算并给出结果(总和)。水印使事件时间与处理时间完全无关。迟到的水印(“迟到”是从处理时间的角度而言)并不会影响结果的正确性,而只会影响收到结果的速度。
  • Tags:

    本文暂时没有评论,来添加一个吧(●'◡'●)

    欢迎 发表评论:

    最近发表
    标签列表