专业的编程技术博客社区

网站首页 > 博客文章 正文

Flink部署模式和任务提交模式介绍

baijin 2024-11-30 11:15:27 博客文章 4 ℃ 0 评论

集群部署模式

Local模式:

    • 通过一个JVM进程中,通过线程模拟出各个Flink角色来得到Flink环境
    • 只支持会话模式(Session Mode)

Standalone模式:

    • 各个角色是独立的进程存在
    • 只支持会话模式(Session Mode)

YARN模式:

    • Flink的各个角色,均运行在多个YARN的容器内,其整体上是一个YARN的任务
    • 支持会话模式(Session Mode), 单作业模式(Per-Job Mode), 应用模式(Application)

作业提交运行模式

模式

生命周期

资源隔离

优点

缺点

main方法

Session

关闭会话,才会停止

共用JM和TM

预先启动,启动作业不再启动。资源充分共享

资源隔离比较差,TM不容易扩展

在客户端执行

Per-job

Job停止,集群停止

单个Job独享JM和TM

充分隔离,资源根据job按需申请

job启动慢,每个job需要启动一个JobManager

在客户端执行

Application

当Application全部执行完,集群才会停止

Application使用一套JM和TM

Client负载低,Application之间实现资源隔离,Application内实现资源共享

per-job模式和session模式的优化部署模式(优点)

在Cluster中

会话模式(Session Mode)

步骤:

    • 第一步,启动一个集群: 创建或附着一个Session, 此时在Session中启动一个集群
    • 第二步,提交作业: 在这个Session中通过客户端提交作业

说明

Session-Cluster模式需要先启动一个 yarn session会话(相当于启动了一个 yarn 任务),然后再提交作业,接着会向 yarn 申请一块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到yarn 中的其中一个作业执行完成后,释放了资源,下个作业才会正常提交。

所有作业共享 Dispatcher 和 ResourceManager;共享资源;适合规模小执行时间短的作业。

在Yarn上启动一个Flink集群,并重复使用该集群,资源会一直被占用,除非手动关闭该集群

  • 特点:需要事先申请资源,启动JobManager和TaskManager
  • 优点:不需要每次提交作业申请资源,而是使用已经申请好的资源,从而提高执行效率
  • 缺点:作业执行完成以后,资源不会被释放,因此一直会占用系统资源
  • 应用场景:适合作业提交比较频繁的场景,小作业比较多的场景。

如图所示。集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源。

优点:

只需要一个集群,所有的作业提交之后都提交到改集群;集群的生命周期超越于作业之上,作业结束释放资源,集群依然正常运行。

缺点:

    • 由于资源共享,资源不足时,提交新作业会失败
    • 同一个 TaskManager 上可能运行了很多作业,如果其中一个发生故障导致 TaskManager 宕机,所有作业都会受到影响。

会话模式比较适合于单个规模小、执行时间短的大量作业

Session-Cluster模式示意图

启动方式

#1、启动yarn session任务
./bin/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d 

#2、执行任务
./bin/flink run \
-c com.cn.StreamWordCount \
flinkquickstart-1.0-SNAPSHOT.jar \
--host lcoalhost \
–port 7777

参数说明:

  • -n(--container)TaskManager 的数量。
  • -s(--slots): 每个TaskManagerslot数量,默认每个taskmanagerslot 的个数为 1
  • -jmJobManager 的内存(单位 MB)。
  • -tm:每个 taskmanager 的内存(单位 MB)。
  • -nmyarnappName(现在 yarnui 上的名字)。
  • -d:后台执行。

单作业模式(Per-Job Mode)--即将被废弃

步骤:

  • 只有一步,提交作业:提交作业时会启动一个集群, 该集群只为该作业服务, 作业完成后,集群就会关闭,所有资源也会释放

说明:

job提交前会创建一个新的flink-yarn-cluster集群,然后将flink job提交到flink-yarn-cluster上,flink job会根据自身情况申请资源,任务之间互相独立、互不影响。任务执行完成之后创建的集群也会消失。

独享 Dispatcher 和 ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。

单作业模式,是严格的一对一,集群只为这个作业而生,每个集群都有一个JobManager
同样由客户端运行应用程序,然后启动集群,作业被提交给 JobManager,进而分发给 TaskManager 执行。
作业完成后,集群就会关闭,所有资源也会释放针对每个Flink任务在Yarn上启动一个独立的Flink集群并允许,结束后自动关闭并释放资源。
这样一来,每个作业都有它自己的 JobManager管理,占用独享的资源,即使发生故障,它的 TaskManager 宕机也不会影响其他作业。

这些特性使得单作业模式在生产环境运行更加稳定,但该模式即将被废弃,被Application模式取代。

需要注意的是,Flink 本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架来启动集群,比如 YARN、Kubernetes。所以 Flink 的独立(Standalone)集群并不支持单作业模式部署

Per-job mode is only supported by YARN and has been deprecated in Flink 1.15. It will be dropped in FLINK-26000. Please consider application mode to launch a dedicated cluster per-job on YARN.

  • 特点:每次提交作业都需要申请一次资源
  • 优点:作业运行完成,资源会立刻被释放,不会一直占用系统资源
  • 缺点:每次提交作业都需要申请资源,会影响作业执行效率,因为申请资源需要消耗时间
  • 应用场景:适合作业比较少的场景、大作业的场景

会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源考虑为每个提交的作业启动一个集群,单作业(Per-Job)模式,如图所示。

Per-Job-Cluster模式示意图

启动方式

./bin/flink run \
–m yarn-cluster \
-c com.cn.StreamWordCount \
flinkquickstart-1.0-SNAPSHOT.jar \
--host lcoalhost –port 7777

应用模式(Application Mode)

步骤:

  • 只有一步,提交作业:提交作业时会启动一个集群, 该集群只为该作业服务, 作业完成后,集群就会关闭,所有资源也会释放

说明:

前面提到的两种模式下,应用代码都是在客户端上执行,然后由客户端提交给 JobManager。客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给JobManager;很多情况下提交作业用的是同一个客户端,会加重客户端所在节点的资源消耗。

Application Mode不需要客户端,直接将应用提交到 JobManger 上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个 JobManager,即创建一个集群。该 JobManager 只为执行这一个应用而存在,执行结束之后 JobManager 也就关闭了,这就是所谓的应用模式。

应用模式与单作业模式,不会提前创建集群,都是提交作业之后才创建集群,所以不能调用 start-cluster.sh 脚本;需要在bin 目录下的 standalone-job.sh 来创建一个 JobManager。

单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;
而应用模式下,是直接由 JobManager 执行应用程序的,并且即使应用包含了多个作业,也只创建一个集群。

Application Mode如图所示。

具体步骤如下:

  • 进入到 Flink 的安装路径下,将应用程序的 jar 包放到 lib/目录下。
$ cp ./FlinkTutorial-1.0-SNAPSHOT.jar $FLINK_HOME/lib/
  • 执行以下命令,启动 JobManager
# 这里直接指定作业入口类,脚本会到 lib 目录扫描所有的 jar 包。
$ ./bin/standalone-job.sh start \
--job-classname com.atguigu.wc.StreamWordCount
  • 同样是使用 bin 目录下的脚本,启动 TaskManager
$ ./bin/taskmanager.sh start
  • 如果希望停止集群,同样可以使用脚本,命令如下。
$ ./bin/standalone-job.sh stop
$ ./bin/taskmanager.sh stop

独立部署模式(Standalone)

集群部署模式 之 独立部署模式(Standalone)介绍。

独立模式(Standalone)是部署 Flink 最基本也是最简单的方式:所需要的所有 Flink 组件,都只是操作系统上运行的一个 JVM 进程

独立模式是独立运行的,不依赖任何外部的资源管理平台;当然独立也是有代价的:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理。所以独立模式一般只用在开发测试或作业非常少的场景下。

另外,也可以将独立模式的集群放在容器中运行。Flink 提供了独立模式的容器化部署方式,可以在 Docker 或者 Kubernetes 上进行部署。

会话模式(Session Mode)

可以发现,独立模式的特点是不依赖外部资源管理平台,而会话模式的特点是先启动集群、后提交作业

单作业模式(Per-Job Mode)

Flink 本身无法直接以单作业方式启动集群,一般需要借助一些资源管理平台。所以 Flink 的独立(Standalone)集群并不支持单作业模式部署

应用模式(Application)

Flink 的独立(Standalone)集群并不支持Application模式部署

高可用

分布式除了提供高吞吐,另一大好处就是有更好的容错性。对于 Flink 而言,因为一般会有多个 TaskManager,即使运行时出现故障,也不需要将全部节点重启,只要尝试重启故障节点就可以了。但是我们发现,针对一个作业而言,管理它的 JobManager 却只有一个,这同样有可能出现单点故障。为了实现更好的可用性,我们需要 JobManager 做一些主备冗余,这就是所谓的高可用(High Availability,简称 HA)。

通过配置,让集群在任何时候都有一个主 JobManager 和多个备用 JobManagers,如图所示,这样主节点故障时就由备用节点来接管集群,接管后作业就可以继续正常运行。主备 JobManager 实例之间没有明显的区别,每个 JobManager 都可以充当主节点或者备节点。

Local本地模式

一般用来本地测试和演示使用。Local模式只支持会话模式。

通过一个JVM进程中,通过线程模拟出各个Flink角色来得到Flink环境

YARN 模式

介绍

独立(Standalone)模式由 Flink 自身提供资源,无需其他框架,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但我们知道,Flink 是大数据计算框架,不是资源调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架集成更靠谱。而在目前大数据生态中,国内应用最为广泛的资源管理平台就是 YARN 了。所以接下来我们就将学习,在强大的 YARN 平台上 Flink 是如何集成部署的。

整体来说,YARN 上部署的过程是:客户端把 Flink 应用提交给 Yarn 的 ResourceManager, Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上,Flink 会部署JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在 JobManger 上的作业所需要的 Slot 数量动态分配 TaskManager 资源。

高可用

YARN 模式的高可用和独立模式(Standalone)的高可用原理不一样。

Standalone 模式中, 同时启动多个 JobManager, 一个为“领导者”(leader),其他为“后备”(standby), 当 leader 挂了, 其他的才会有一个成为 leader。

而 YARN 的高可用是只启动一个 Jobmanager, 当这个 Jobmanager 挂了之后, YARN 会再次启动一个, 所以其实是利用的 YARN 的重试次数来实现的高可用。

Standalone独立集群模式

Standalone只支持会话模式(Session Mode), 不支持单作业模式(Per-Job Mode)应用模式(Application Mode)部署。

是集群模式的一种,但是这种模式一般并不运行在生产环境中。

各个角色是独立的进程存在。

优点:

    • Standalone模式的部署相对简单,可以支持小规模,少量的任务运行;
    • Standalone模式缺少系统层面对集群中Job的管理,容易遭成资源分配不均匀;
    • 资源隔离相对简单,任务之间资源竞争严重。

缺点:

    • JobManager有明显的单点问题,JobManager肩负任务调度和资源分配,一个JobManger出现意外,后果可想而知。

修改conf.yaml

jobmanager.rpc.address: node1
taskmanager.numberOfTaskSlots: 2
web.submit.enable: true

# 历史服务器
jobmanager.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/
historyserver.web.address: node1
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/

Standalone-HA高可用集群模式

zookeeper HA配置参考,修改conf.yaml

high-availability: zookeeper    # 实现高可用的方式
high-availability.storageDir: hdfs://node1:8020/flink/ha/  # 数据持久化地址,需要在hdfs创建
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181 # zk,提供分布式协调服务
high-availability.zookeeper.path.root: /flink   # 指定Zookeeper的根节点
high-availability.cluster-id: /cluster_one  # 为每个集群指定一个id

Flink-On-Yarn(生产使用)

  • Standalone支持会话模式(Session Mode), 不支持单作业模式(Per-Job Mode)应用模式(Application Mode)部署。
  • Flink的各个角色,均运行在多个YARN的容器内,其整体上是一个YARN的任务。
  • Flink On Yarn的前提是:hdfs、yarn均启动。

背景介绍

独立(Standalone)模式Flink 自身提供资源,无需其他框架,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但我们知道,Flink 是大数据计算框架,不是资源调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架集成更靠谱。而在目前大数据生态中,国内应用最为广泛的资源管理平台就是 YARN 了。所以接下来我们就将学习,在强大的 YARN 平台上 Flink 是如何集成部署的。

整体来说,YARN 上部署的过程是:客户端把 Flink 应用提交给 Yarn 的 ResourceManager, Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上,Flink 会部署JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在 JobManger 上的作业所需要的 Slot 数量动态分配 TaskManager 资源。

为什么生产使用Flink-On-Yarn

在企业实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式,原因如下:

    • Yarn的资源可以按需使用,提高集群的资源利用率
    • Yarn的任务有优先级,根据优先级运行作业
    • 基于Yarn调度系统,能够自动化地处理各个角色的 Failover( 容错 )
    • JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控
    • 如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器
    • 如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn ResourceManager 申请资源,重新启动 TaskManager

Flink On Yarn模式常见配置参数及说明

需要下载对应的Hadoop依赖包,并将对应的依赖复制到flink的lib目录下。

zookeeper HA配置参考。

Flink 本身提供了内置 ZooKeeper 插件,可以直接修改 conf/zoo.cfg,并且使用 /bin/start-zookeeper-quorum.sh直接启动。

配置参数

说明

jobmanager.rpc.address: master

jobmanager地址

jobmanager.heap.size: 1024m

jobmanager JVM heap内存大小

jobmanager.rpc.port: 6123

jobmanager的rpc通信端口

jobmanager.execution.failover-strategy: region

重启策略

taskmanager.memory.process.size: 1568m

taskmanager进程使用的所有内存大小

taskmanager.numberOfTaskSlots: 1

每个taskmanager提供的slot数量

parallelism.default: 1

默认的并行度

io.tmp.dirs: /tmp

临时存储目录

Flink任务处理过程

Flink系统启动时,首先启动JobManager和一至多个TaskManager

JobManager负责协调Flink系统,TaskManager则是执行并行程序的worker

当系统以本地形式启动时,一个JobManager和一个TaskManager会启动在同一个JVM中。

当一个程序被提交后,系统会创建一个Client来进行预处理,将程序转变成一个并行数据流的形式,交给JobManagerTaskManager执行。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表