Flink 1.20 Cluster Setup (Flink on YARN Deployment)

baijin · 2025-03-30

Prerequisites

  • A working Hadoop cluster (HDFS and YARN) is available

Flink Cluster Plan

vm100:   TaskManager
vm101:   TaskManager+historyserver
vm102:   JobManager+TaskManager

Environment Setup

# Download Flink
wget https://dlcdn.apache.org/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz

# Extract to /opt/module
tar xzvf flink-1.20.0-bin-scala_2.12.tgz -C /opt/module

# Edit conf/config.yaml (required on vm100, vm101, and vm102)
# 1) jobmanager
# 2) taskmanager
# 3) rest
vi conf/config.yaml
jobmanager.bind-host: 0.0.0.0
jobmanager.rpc.address: vm102
taskmanager.bind-host: 0.0.0.0
rest.address: vm102
rest.bind-host: 0.0.0.0

# Edit conf/masters (required on vm100, vm101, and vm102)
vi conf/masters 
vm102:8081

# Edit conf/workers (required on vm100, vm101, and vm102)
vi conf/workers 
vm100
vm101
vm102
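
Instead of repeating the edits by hand on every node, you can distribute the edited configuration from vm102; the loop below is a minimal sketch that assumes passwordless SSH and rsync are available on the hosts.

# Push conf/ (config.yaml, masters, workers) from vm102 to the other nodes
for host in vm100 vm101; do
  rsync -av /opt/module/flink-1.20.0/conf/ $host:/opt/module/flink-1.20.0/conf/
done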

# Standalone mode
# Start the cluster on vm102
bin/start-cluster.sh

# Access the web UI
http://vm102:8081/
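
To confirm the standalone cluster actually came up, check the JVM processes on each node; the loop below is a small sketch that again assumes passwordless SSH from vm102 and jps on the PATH of each host.

# On vm102, jps should show StandaloneSessionClusterEntrypoint (the JobManager) and a TaskManagerRunner
jps

# The other nodes should each show a TaskManagerRunner
for host in vm100 vm101; do
  echo "== $host =="
  ssh $host jps
done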

Flink Job Example

  • Write the Flink streaming word count code
  • Prepare the streaming data source (a sketch follows after the submission command below)
  • Submit the Flink job
  • Check the streaming job's output

# Write the Flink streaming word count code and
# package it as flink-samples-1.0-SNAPSHOT.jar

# 1) Submit the job to the cluster via the web UI, or
# 2) submit it from the command line:
bin/flink run -m vm102:8081 -c com.samples.WordCountStreamUnboundedMain ../flink-samples-1.0-SNAPSHOT.jar
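
The word count class above is the author's own sample and its source is not included in the post; assuming it reads from a socket source (say, vm102 on port 7777, both illustrative values) and prints its results, the sketch below shows one way to feed it data and where to look for output.

# Hypothetical data source: keep a netcat listener running on the host/port the job connects to
nc -lk 7777

# Type some words into the netcat session, then confirm the job is running
bin/flink list

# With a print() sink, results go to the TaskManager's stdout, visible in the
# web UI (Task Managers -> Stdout) or directly in the .out file on that node:
tail -f log/flink-*-taskexecutor-*.out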

YARN Deployment Modes

  • YARN session mode
# YARN deployment modes: prerequisites
# Add the required environment variables (on the node used to submit jobs)
vi /etc/profile.d/my_env.sh
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

# Reload the profile so the variables take effect in the current shell
source /etc/profile.d/my_env.sh

# YARN session mode
bin/yarn-session.sh --help
Usage:
   Optional
     -at,--applicationType <arg>     Set a custom application type for the application on YARN
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Set to yarn-cluster to use YARN execution mode.
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

# Start a YARN session (detached, named testflink)
bin/yarn-session.sh -d -nm testflink
The Flink YARN session cluster has been started in detached mode. In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1725013965137_0010
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1725013965137_0010
Note that killing Flink might not clean up all job artifacts and temporary files.

# YARN session mode: submit a job from the command line
bin/flink run -c com.samples.WordCountStreamUnboundedMain ../flink-samples-1.0-SNAPSHOT.jar
Found Yarn properties file under /tmp/.yarn-properties-test

cat /tmp/.yarn-properties-test
#Generated YARN properties file
#Sat Sep 07 07:18:49 PDT 2024
dynamicPropertiesString=
applicationID=application_1725013965137_0009

# Stop the Flink application in YARN session mode
echo "stop" | ./bin/yarn-session.sh -id application_1725013965137_0010
  • YARN per-job mode
# Start a job in YARN per-job mode (deprecated in favor of application mode)
bin/flink run -t yarn-per-job -c com.samples.WordCountStreamUnboundedMain ../flink-samples-1.0-SNAPSHOT.jar
# List the jobs of a per-job application
bin/flink list -t yarn-per-job -Dyarn.application.id=<application_id>
# Cancel a job of a per-job application
bin/flink cancel -t yarn-per-job -Dyarn.application.id=<application_id> <job_id>
  • YARN application mode
# Start a job in YARN application mode
bin/flink run-application -t yarn-application -c com.samples.WordCountStreamUnboundedMain ../flink-samples-1.0-SNAPSHOT.jar
# List the jobs of the application
bin/flink list -t yarn-application -Dyarn.application.id=<application_id>
# Cancel a job of the application
bin/flink cancel -t yarn-application -Dyarn.application.id=<application_id> <job_id>
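
For illustration, with a hypothetical application id and job id (placeholders, not values from this deployment) the list/cancel calls look like this:

# List jobs running inside the YARN application (the id is a made-up example)
bin/flink list -t yarn-application -Dyarn.application.id=application_1725013965137_0042

# Cancel one job by its JobID (also a made-up example)
bin/flink cancel -t yarn-application -Dyarn.application.id=application_1725013965137_0042 a1b2c3d4e5f60718293a4b5c6d7e8f90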


# Launching with the distribution stored on HDFS
# Upload Flink's lib and plugins directories to HDFS
hadoop fs -mkdir /flink-dist
hadoop fs -put lib /flink-dist
hadoop fs -put plugins /flink-dist
# Upload the flink job jar to HDFS
hadoop fs -mkdir /flink-jars
hadoop fs -put flink-samples-1.0-SNAPSHOT.jar /flink-jars

# YARN application mode, launching from the jars on HDFS
bin/flink run-application -t yarn-application \
    -Dyarn.provided.lib.dirs="hdfs://vm100:8020/flink-dist" \
    -c com.samples.WordCountStreamUnboundedMain \
    hdfs://vm100:8020/flink-jars/flink-samples-1.0-SNAPSHOT.jar
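
To confirm the uploads landed where this command expects them:

# The provided lib dir should contain the Flink jars, and the job jar should sit under /flink-jars
hadoop fs -ls /flink-dist/lib
hadoop fs -ls /flink-jars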

Flink History Server

# Flink HistoryServer (on vm101)
# Create the HDFS archive directory (the same path configured below)
hadoop fs -mkdir -p /completed-jobs

# Edit conf/config.yaml
vi conf/config.yaml
#==============================================================================
# HistoryServer
#==============================================================================

# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)

jobmanager:
  archive:
    fs:
      # Directory to upload completed jobs to. Add this directory to the list of
      # monitored directories of the HistoryServer as well (see below).
      dir: hdfs://vm100:8020/completed-jobs/

historyserver:
  web:
    # The address under which the web-based HistoryServer listens.
    address: vm101
    # The port under which the web-based HistoryServer listens.
    port: 8082
  archive:
    fs:
      # Comma separated list of directories to monitor for completed jobs.
      dir: hdfs://vm100:8020/completed-jobs/
      # Interval in milliseconds for refreshing the monitored directories.
      refresh-interval: 10000

# Start the Flink HistoryServer
bin/historyserver.sh start

# Access the Flink HistoryServer web UI
http://vm101:8082/

# Stop the Flink HistoryServer
bin/historyserver.sh stop
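
Once a job completes or is cancelled, the JobManager writes its archive to the configured directory and the HistoryServer picks it up on its refresh interval; a quick check that archiving works:

# Archived jobs should appear here shortly after a job finishes or is cancelled
hadoop fs -ls /completed-jobs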

