网站首页 > 博客文章 正文
独立部署(Standalone)模式由Flink自身提供计算资源,无需其他框架提供资源,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是我们也要知道,Flink主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成更靠谱,所以接下来我们来学习在强大的Yarn环境中Flink是如何使用的。(其实是因为在国内工作中,Yarn使用的非常多)
一、Yarn模式配置
把Flink应用提交给Yarn的ResourceManager, Yarn的ResourceManager会申请容器从Yarn的NodeManager上面. Flink会创建JobManager和TaskManager在这些容器上.Flink会根据运行在JobManger上的job的需要的slot的数量动态的分配TaskManager资源
1. 复制flink-yarn
cp -r flink-1.13.1 flink-yarn
2.配置环境变量HADOOP_CLASSPATH, 如果前面已经配置可以忽略。
在/etc/profile.d/my.sh中配置并分发
export HADOOP_CLASSPATH=`hadoop classpath`
二、Yarn运行无界流WordCount
1.启动hadoop集群(hdfs, yarn)
2.运行无界流
bin/flink run -t yarn-per-job -c com.bigdata.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar
3.在yarn的ResourceManager界面查看执行情况
三、Flink on Yarn的3种部署模式
Flink提供了yarn上运行的3种模式,分别为Session-Cluster,Application Mode和Per-Job-Cluster模式。
1. Session-Cluster
Session-Cluster模式需要先启动Flink集群,向Yarn申请资源。以后提交任务都向这里提交。这个Flink集群会常驻在yarn集群中,除非手动停止。
在向Flink集群提交Job的时候, 如果资源被用完了,则新的Job不能正常提交。
缺点: 如果提交的作业中有长时间执行的大作业, 占用了该Flink集群的所有资源, 则后续无法提交新的job。
所以, Session-Cluster适合那些需要频繁提交的多个小Job, 并且执行时间都不长的Job。
2. Per-Job-Cluster
一个Job会对应一个Flink集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。
每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。
3. Application Mode
Application Mode会在Yarn上启动集群, 应用jar包的main函数(用户类的main函数)将会在JobManager上执行。只要应用程序执行结束, Flink集群会马上被关闭。也可以手动停止集群。
与Per-Job-Cluster的区别:就是Application Mode下, 用户的main函数式在集群中执行的,并且当一个application中有多个job的话,per-job模式则是一个job对应一个yarn中的application,而Application Mode则这个application中对应多个job。
测试代码:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Application_Test {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
test1(env);
test2(env);
test3(env);
}
public static void test1(StreamExecutionEnvironment env) throws Exception {
DataStreamSource<String> stringDataStreamSource = env.fromElements("22222");
stringDataStreamSource.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value;
}
}).print();
env.execute();
}
public static void test2(StreamExecutionEnvironment env) throws Exception {
DataStreamSource<String> stringDataStreamSource = env.fromElements("22222");
stringDataStreamSource.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value;
}
}).print();
env.execute();
}
public static void test3(StreamExecutionEnvironment env) throws Exception {
DataStreamSource<String> stringDataStreamSource = env.socketTextStream("hadoop102", 9999);
stringDataStreamSource.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value;
}
}).print();
env.execute();
}
}
打包上传到集群,分别以Per-job和Application Mode模式运行,对比区别
Per-job模式执行结果,一个job对应一个Application
Application Mode模式执行结果,多个job对应一个Application并且对应一个flink集群。
官方建议:
出于生产的需求, 我们建议使用Per-job or Application Mode,因为他们给应用提供了更好的隔离!
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/
四、各模式下执行无界流WordCount
1. Per-Job-Cluster模式执行无界流WordCount
bin/flink run -d -t yarn-per-job -c com.bigdata.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar
提交任务到Yarn的其他队列
bin/flink run -d -m yarn-cluster -yqu hive -c com.bigdata.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar(老版本)
bin/flink run -d -t yarn-per-job -Dyarn.application.queue=hive -c com.bigdata.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar
2. Session-Cluster模式执行无界流WordCount
2.1 启动一个Flink-Session
bin/yarn-session.sh -d
2.2 在Session上运行Job
bin/flink run -c com.bigdata.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar
会自动找到你的yarn-session启动的Flink集群.也可以手动指定你的yarn-session集群:
bin/flink run -t yarn-session -Dyarn.application.id=application_XXXX_YY -c com.bigdata.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar
注意: application_XXXX_YY 指的是在yarn上启动的yarn应用
如果是1.12版本开启了Yarn模式的高可用,上面指定yarn-session集群的命令不能用,需要去掉 -t yarn-session (1.13版本已修复)
bin/flink run -Dyarn.application.id=application_XXXX_YY -c com.bigdata.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar
3. Application Mode模式执行无界流WordCount
bin/flink run-application -t yarn-application -c com.bigdata.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar
五、Yarn模式高可用
Yarn模式的高可用和Standalone模式的高可用原理不一样。
Standalone模式中, 同时启动多个Jobmanager, 一个为leader其他为standby的, 当leader挂了, 其他的才会有一个成为leader。
yarn的高可用是同时只启动一个Jobmanager, 当这个Jobmanager挂了之后, yarn会再次启动一个, 其实是利用的yarn的重试次数来实现的高可用。
1. 在yarn-site.xml中配置
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>
注意: 配置完不要忘记分发, 和重启yarn
2. 在flink-conf.yaml中配置
yarn.application-attempts: 3
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop102:8020/flink/yarn/ha
high-availability.zookeeper.quorum: hadoop102:2181,hadoop103:2181,hadoop104:2181
high-availability.zookeeper.path.root: /flink-yarn
3. 启动yarn-session
4. 杀死Jobmanager, 查看的他的复活情况
注意: yarn-site.xml中是它活的次数的上限, flink-conf.xml中的次数应该小于这个值。
5. 测试过程中会发现一直kill不掉jobManager,是因为除了重试次数这个机制外,还有一个时间的机制(Akka超时时间),如果在一定的时间内jobManager重新拉取了几次还是挂掉的话,那就会真正的挂掉。
总结
Flink提供了yarn上运行的3种模式,分别为Session-Cluster,Application Mode和Per-Job-Cluster模式。Session-Cluster适合那些需要频繁提交的多个小Job, 并且执行时间都不长的Job。Per-Job-Cluster适合规模大长时间运行的作业。Per-job模式执行结果,一个job对应一个Application。Application Mode模式执行结果,多个job对应一个Application并且对应一个Flink集群。
猜你喜欢
- 2024-10-14 国庆期间别偷懒,大数据平台只差YARN了
- 2024-10-14 为什么我们从Yarn切换到pnpm(yarn设置淘宝镜像)
- 2024-10-14 flink的yarn模式部署(flink的三种部署方案)
- 2024-10-14 yarn的安装和使用(yarn安装教程)
- 2024-10-14 yarn 的安装和使用(yarn安装教程)
- 2024-10-14 操作学习-Spark on Yarn(spark on yarn执行流程)
- 2024-10-14 全面解析:Hadoop基础——YARN(hadoop yarn工作原理)
- 2024-10-14 Node简史及好书推荐(nodejs相关书籍)
- 2024-10-14 大数据之-Hadoop3.x_Yarn_常用命令---大数据之hadoop3.x_0147
- 2024-10-14 大数据系列文章之Yarn组件及其执行流程
你 发表评论:
欢迎- 367℃用AI Agent治理微服务的复杂性问题|QCon
- 358℃初次使用IntelliJ IDEA新建Maven项目
- 357℃手把手教程「JavaWeb」优雅的SpringMvc+Mybatis整合之路
- 351℃Maven技术方案最全手册(mavena)
- 348℃安利Touch Bar 专属应用,让闲置的Touch Bar活跃起来!
- 346℃InfoQ 2024 年趋势报告:架构篇(infoq+2024+年趋势报告:架构篇分析)
- 345℃IntelliJ IDEA 2018版本和2022版本创建 Maven 项目对比
- 342℃从头搭建 IntelliJ IDEA 环境(intellij idea建包)
- 最近发表
- 标签列表
-
- powershellfor (55)
- messagesource (56)
- aspose.pdf破解版 (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- vue数组concat (56)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)