当前位置: 首页>资讯 >

大数据Flink进阶(十五):Flink On Yarn任务提交

来源: 腾讯云 | 时间: 2023-04-10 14:13:02 |

Flink On Yarn任务提交

一、Flink On Yarn运行原理

Flink On Yarn即Flink任务运行在Yarn集群中,Flink On Yarn的内部实现原理如下图:

当启动一个新的Flink YARN Client会话时,客户端首先会检查所请求的资源(容器和内存)是否可用,之后,它会上传Flink配置和JAR文件到HDFS。客户端的下一步是向ResourceManager请求一个YARN容器启动ApplicationMaster。JobManager和ApplicationMaster(AM)运行在同一个容器中,一旦它们成功地启动了,AM就能够知道JobManager的地址,它会为TaskManager生成一个新的Flink配置文件(这样它才能连上JobManager),该文件也同样会被上传到HDFS。另外,AM容器还提供了Flink的Web界面服务。Flink用来提供服务的端口是由用户和应用程序ID作为偏移配置的,这使得用户能够并行执行多个YARN会话。之后,AM开始为Flink的TaskManager分配容器(Container),从HDFS下载JAR文件和修改过的配置文件,一旦这些步骤完成了,Flink就可以基于Yarn运行任务了。

Flink On Yarn任务提交支持Session会话模式、Per-Job单作业模式、Application应用模式。下面分别介绍这三种模式的任务提交命令和原理。


(资料图片)

二、代码及Yarn环境准备

1、准备代码

为了能演示出不同模式的效果,这里我们编写准备Flink代码形成一个Flink Application,该代码中包含有2个job。Flink允许在一个main方法中提交多个job任务,多Job执行的顺序不受部署模式影响,但受启动Job的调用影响,每次调用execute()或者executeAsyc()方法都会触发job执行,我们可以在一个Flink Application中执行多次execute()或者executeAsyc()方法来触发多个job执行,两者区别如下:

execute():该方法为阻塞方法,当一个Flink Application中执行多次execute()方法触发多个job时,下一个job的执行会被推迟到该job执行完成后再执行。executeAsyc():该方法为非阻塞方法,一旦调用该方法触发job后,后续还有job也会立即提交执行。

当一个Flink Application中有多个job时,这些job之间没有直接通信的机制,所以建议编写Flink代码时一个Application中包含一个job即可,目前只有非HA的Application模式可以支持多job运行。后续打包运行包含多个job的Flink代码如下:

//1.准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.读取Socket数据 ,获取ds1和ds2DataStreamSource ds1 = env.socketTextStream("node3", 8888);DataStreamSource ds2 = env.socketTextStream("node3", 9999);//3.1 对ds1 直接输出原始数据SingleOutputStreamOperator> transDs1 = ds1.flatMap((String line, Collector> out) -> {    String[] words = line.split(",");    for (String word : words) {        out.collect(Tuple2.of(word, 1));    }}).returns(Types.TUPLE(Types.STRING, Types.INT));transDs1.print();env.executeAsync("first job");//3.2 对ds2准备K,V格式数据 ,统计实时WordCountSingleOutputStreamOperator> tupleDS = ds2.flatMap((String line, Collector> out) -> {    String[] words = line.split(",");    for (String word : words) {        out.collect(Tuple2.of(word, 1));    }}).returns(Types.TUPLE(Types.STRING, Types.INT));tupleDS.keyBy(tp -> tp.f0).sum(1).print();//5.execute触发执行env.execute("second job");

将以上代码进行打包,名称为"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar",并在node3节点上启动多个socket服务

[root@node3 ~]# nc -lk 8888[root@node3 ~]# nc -lk 9999

2、yarn 环境准备

在Per-Job模式中,Flink每个job任务都会启动一个对应的Flink集群,基于Yarn提交后会在Yarn中同时运行多个实时Flink任务,在HDFS中$HADOOP_HOME/etc/hadoop/capacity-scheduler.xml中有"yarn.scheduler.capacity.maximum-am-resource-percent"配置项,该项默认值为0.1,表示Yarn集群中运行的所有ApplicationMaster的资源比例上限,默认0.1表示10%,这个参数变相控制了处于活动状态的Application个数,所以这里我们修改该值为0.5,否则后续在Yarn中运行多个Flink Application时只有一个Application处于活动运行状态,其他处于Accepted状态。

所有HDFS节点配置$HADOOP_HOME/etc/hadoop/capacity-scheduler.xml文件,修改如下配置项为0.5:

    yarn.scheduler.capacity.maximum-am-resource-percent    0.5          Maximum percent of resources in the cluster which can be used to run application masters i.e. controls number of concurrent running applications.      

至此,Flink On Yarn运行环境准备完毕。

三、Yarn Session模式

1、任务提交命令

Yarn Session模式首先需要在Yarn中初始化一个Flink集群(称为Flink Yarn Session 集群),开辟指定的资源,以后的Flink任务都提交到这里。这个Flink集群会常驻在YARN集群中,除非手工停止(yarn application -kill id),当手动停止yarn application对应的id时,运行在当前application上的所有flink任务都会被kill。这种方式创建的Flink集群会独占资源,不管有没有Flink任务在执行,YARN上面的其他任务都无法使用这些资源。

1.1、启动Yarn Session集群

启动Yarn Session 集群前首先保证HDFS和Yarn正常启动,这里在node5节点上来使用名称创建Yarn Session集群,命令如下:

[root@node3 ~]# cd /software/flink-1.16.0/bin/#启动Yarn Session集群,名称为lansonjy,每个TM有3个slot[root@node3 bin]# ./yarn-session.sh -s 3 -nm lansonjy -d

以上启动Yarn Session集群命令的参数解释如下:

参数

解释

-d

--detached,Yarn Session集群启动后在后台独立运行,退出客户端,也可不指定,则客户端不退出。

-nm

--name,自定义在YARN上运行Application应用的名字。

-jm

--jobManagerMemory,指定JobManager所需内存,单位MB。

-tm

--taskManagerMemory,指定每个TaskManager所需的内存,单位MB。

-s

--slots,指定每个TaskManager上Slot的个数。

-id

--applicationId,指定YARN集群上的任务ID,附着到一个后台独立运行的yarn session中。

-qu

--queue,指定Yarn的资源队列。

以上命令执行完成后,可以在Yarn WebUI(https://node1:8088)中看到启动的Flink Yarn Session集群:

点击Tracking UI"ApplicationMaster"可以跳转到Flink Yarn Session集群 WebUI页面中:

目前在Yarn Session集群WebUI中看不到启动的TaskManager ,这是因为Yarn会按照提交任务的需求动态分配TaskManager数量,所以Flink 基于Yarn Session运行任务资源是动态分配的。

此外,创建出Yarn Session集群后会在node5节点/tmp/下创建一个隐藏的".yarn-properties-<用户名>" Yarn属性文件,有了该文件后,在当前节点提交Flink任务时会自动发现Yarn Session集群并进行任务提交。

1.2、向Yarn Session集群中提交作业

[root@node3 ~]# cd /software/flink-1.16.0/bin/#执行如下命令,会根据.yarn-properties-<用户名>文件,自动发现yarn session 集群[root@node3 bin]# ./flink run -c com.lanson.flinkjava.code.chapter3.FlinkAppWithMultiJob /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar #也可以使用如下命令指定Yarn Session集群提交任务,-t 指定运行的模式[root@node3 bin]# ./flink run -t yarn-session -Dyarn.application.id=application_1671607810626_0001 -c com.lanson.flinkjava.code.chapter3.FlinkAppWithMultiJob /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar

以上命令执行之后,可以查看对应的Yarn Session 对应的Flink集群,可以看到启动了2个Flink Job任务、启动1个TaskManager,分配了3个Slot。

1.3、任务资源测试

按照以上方式继续提交一次Flink Application,可以看到会申请新的TaskManager:

查看集群中任务列表并取消各个任务,命令如下:

#查看Yarn Session集群中任务列表 后面跟上Yarn Application ID[root@node3 bin]# ./flink list------------------ Running/Restarting Jobs -------------------87f6f9a45fd9a9533e93a94dff455b66 : first job (RUNNING)0d5cd72d8f59ed0eb51d2d64124d4859 : second job (RUNNING)cff599a2d43a33195702ca7e7512feb4 : first job (RUNNING)6498d664a8e141ed7503046c5fb9fa9a : second job (RUNNING)--------------------------------------------------------------#取消任务命令,也可以在WebUI中“cancel”取消任务[root@node3 bin]# ./flink cancel 87f6f9a45fd9a9533e93a94dff455b66 [root@node3 bin]# ./flink cancel 0d5cd72d8f59ed0eb51d2d64124d4859 [root@node3 bin]# ./flink cancel cff599a2d43a33195702ca7e7512feb4 [root@node3 bin]# ./flink cancel 6498d664a8e141ed7503046c5fb9fa9a

当任务取消后,等待30s后(resourcemanager.taskmanager-timeout=30000ms)可以看到TaskManager数量为0,说明Flink基于Yarn Session模式提交任务会动态进行资源分配。

1.4、集群停止

停止Yarn Session集群可以在Yarn WebUI中找到对应的ApplicationId,执行如下命令关闭任务即可。

[root@node3 bin]# yarn application -kill application_1671607810626_0001

2、任务提交流程

Yarn Session 模式下提交任务首先创建Yarn Session 集群,创建该集群实际上就是启动了JobManager,启动JobManager同时会启动Dispatcher和ResourceManager,当客户端提交任务时,才会启动JobMaster以及根据提交的任务需求资源情况来动态分配启动TaskManager。

Yarn Session模式下提交任务流程如下:

客户端向Yarn Session集群提交任务,客户端会将任务转换成JobGraph提交给JobManager。Dispatcher启动JobMaster并将JobGraph提交给JobMaster。JobMaster向ResourceManager请求Slot资源。ResourceManager向Yarn的资源管理器请求Container计算资源。Yarn动态启动TaskManager,启动的TaskManager会注册给ResourcemanagerResourceManager会在对应的TaskManager上划分Slot资源。TaskManager向JobMaster offer Slot资源。JobMaster将任务对应的task发送到TaskManager上执行。

四、Yarn Per-Job模式

Per-Job 模式目前只有yarn支持,Per-job模式在Flink1.15中已经被弃用,后续版本可能会完全剔除。Per-Job模式就是直接由客户端向Yarn中提交Flink作业,每个作业形成一个单独的Flink集群。

1、任务提交命令

Flink On Yarn Per-Job模式提交命令如下:

[root@node5 bin]# ./flink run -t yarn-per-job -d -c com.lanson.flinkjava.code.chapter3.FlinkAppWithMultiJob /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar

以上提交任务命令的参数解释如下:

参数

解释

-t

--target,指定运行模式,可以跟在flink run 命令后,可以指定"remote", "local", "kubernetes-session", "yarn-per-job"(deprecated), "yarn-session";也可以跟在 flink run-application 命令后,可以指定"kubernetes-application", "yarn-application"。

-c

--class,指定运行的class主类。

-d

--detached,任务提交后在后台独立运行,退出客户端,也可不指定。

-p

--parallelism,执行应用程序的并行度。

以上命令提交后,我们可以通过Yarn WebUI看到有2个Application 启动,对应2个Flink的集群,进入对应的Flink集群WebUI可以看到运行提交的Flink Application中的不同Job任务:

这说明Per-Job模式针对每个Flink Job会启动一个Flink集群。

注意:在基于Yarn Per-Job模式提交任务后,会打印以下错误:

该异常是Hadoop3与Flink整合的bug(https://issues.apache.org/jira/browse/FLINK-19916),不会影响Flink任务基于Yarn提交。错误的原因是Hadoop3启动异步线程来执行一些shutdown钩子,当任务提交后对应的类加载器被释放,这些钩子在作业执行之后执行仍然持有释放的类加载器,因此抛出异常。

取消任务可以使用yarn application -kill ApplicationId也可以执行如下命令:

#取消任务命令执行后对应的 Flink集群也会停止 :flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY [root@node5 bin]# ./flink cancel -t yarn-per-job -Dyarn.application.id=application_1671610064817_0002 805542d84c9944480196ef73911d1b59[root@node5 bin]# ./flink cancel -t yarn-per-job -Dyarn.application.id=application_1671610064817_0003 56365ae67b8e93b1184d22fa567d7ddf

2、任务提交流程

Flink基于Yarn Per-Job 提交任务时,在提交Flink Job作业的同时启动JobManager并启动Flink的集群,根据提交任务所需资源的情况会动态申请启动TaskManager给当前提交的job任务提供资源。

Yarn Per-Job模式下提交任务流程如下:

客户端提交Flink任务,Flink会将jar包和配置上传HDFS并向Yarn请求Container启动JobManagerYarn资源管理器分配Container资源,启动JobManager,并启动Dispatcher、ResourceManager对象。客户端会将任务转换成JobGraph提交给JobManager。Dispatcher启动JobMaster并将JobGraph提交给JobMaster。JobMaster向ResourceManager申请Slot资源。ResourceManager会向Yarn请求Container计算资源Yarn分配Container启动TaskManager,TaskManager启动后会向ResourceManager注册SlotResourceManager会在对应的TaskManager上划分Slot资源。TaskManager向JobMaster offer Slot资源。JobMaster将任务对应的task发送到TaskManager上执行。

Yarn Per-job模式在客户端提交任务,如果在客户端提交大量的Flink任务会对客户端节点性能又非常大的压力,所以在Flink1.15中已经被弃用,后续版本可能会完全剔除,使用Yarn Application模式来替代。

五、Yarn Application模式

Yarn Application 与Per-Job 模式类似,只是提交任务不需要客户端进行提交,直接由JobManager来进行任务提交,每个Flink Application对应一个Flink集群,如果该Flink Application有多个job任务,所有job任务共享该集群资源,TaskManager也是根据提交的Application所需资源情况动态进行申请。

1、任务提交命令

#Yarn Application模式提交任务命令[root@node5 bin]# ./flink run-application -t yarn-application -c com.lanson.flinkjava.code.chapter3.FlinkAppWithMultiJob /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar

以上参数解释同Per-Job模式,命令提交后,查看对应Yarn Application,进入到Flink Application的WebUI,可以看到2个Flink 任务共享该集群资源。

查看集群任务、取消集群任务及停止集群命令如下:

#查看Flink 集群中的Job作业:flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY[root@node3 bin]# flink list -t yarn-application -Dyarn.application.id=application_1671610064817_0004------------------ Running/Restarting Jobs -------------------108a7b91cf6b797d4b61a81156cd4863 : first job (RUNNING)5adacb416f99852408224234d9027cc7 : second job (RUNNING)--------------------------------------------------------------#取消Flink集群中的Job作业:flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY [root@node3 bin]# flink cancel -t yarn-application -Dyarn.application.id=application_1671610064817_0004 108a7b91cf6b797d4b61a81156cd4863#停止集群,当取消Flink集群中所有任务后,Flink集群停止,也可以使用yarn application -kill ApplicationID 停止集群[root@node3 bin]# yarn application -kill application_1671610064817_0004

2、任务提交流程

Flink Yarn Application模式提交任务与Per-Job模式任务提交非常类似,只是客户端不再提交一个个的Flink Job ,而是运行任务后,一次性将Application信息提交给JobManager,JobManager根据每个Flink Job作业由Dispatcher启动对应的JobMaster进行资源申请和任务提交。

关键词:

 

热文推荐

大数据Flink进阶(十五):Flink On Yarn任务提交

FlinkOnYarn即Flink任务运行在Yarn集群中,FlinkOnYarn的内部实现原理如下图:

2023-04-10

天天速看:丰巢公开致信用户人新闻_丰巢公开致信用户

一、题文二、解答本文到此结束,希望对大家有所帮助。

2023-04-10

全球要闻:建发房产逆势奔跑 旗下三大上市公司齐发力

文 乐居财经魏薇2022年,房地产行业持续调整。房企经营面临重重考验,行业洗牌加速。但这样的行情下,却有一些企业做到了逆势向上。3月30日,

2023-04-10

天天要闻:专家研究称ChatGPT会影响用户道德判断 不要低估它的影响

GhatGPT是备受关注的热门话题。无论是普通人还是商业领袖,都对它表现出了浓厚的兴趣。他们建议未来的研究可以设计让对话机器人拒绝回答需要给

2023-04-10

环球今热点:浙江小伙一天狂刷武汉7大景点,武汉值得“特种兵式旅游”,更值得长情陪伴

浙江小伙一天狂刷武汉7大景点,武汉值得“特种兵式旅游”,更值得长情陪伴

2023-04-10

【新时代新征程新伟业——真抓实干推动高质量发展】广东打造国际一流营商环境

央视新闻客户端消息(新闻联播):地处开放前沿的广东,持续深化“放管服”改革,打造国际一流营商环境,助推经济高质量发展。来自法国的托玛

2023-04-10

环球今日报丨天津卫生健康领域优化营商环境 推进包容审慎执法 首批明确──六大专业13项内容“首违不罚”

从5月1日开始,我市卫生健康领域将启动实施“首违不罚”,首批明确对涉及六大专业13项内容的初次违法行为不予行政处罚。为了推进包容审慎执法

2023-04-10

在毛乌素用青春按住黄沙

把经过太空育种的长柄扁桃栽回毛乌素沙地,这是治沙人张应龙今年的计划。在距离张应龙300公里的陕西省榆林市定边县,90后治沙青年石健阳今年的

2023-04-10

hvac系统什么意思_hvac-今日聚焦

1、空气调节系统,是包含温度、湿度、空气清净度以及空气循环的控制系统,被称为HVAC(英语:Heating,Venti

2023-04-10

森泰股份(301429.SZ)IPO网上摇号中签结果出炉:中签号共2.8673万个 快播

格隆汇4月9日丨森泰股份(301429 SZ)披露首次公开发行股票并在创业板上市网上摇号中签结果,具体如下:凡参与网上发

2023-04-09

FSR、DLSS谁更好?26款游戏PK:AMD居然0胜!|每日头条

AMDFSR、NVIDIADLSS是双方在同一技术路线上的又一次PK,但是和历史上的太多次类似情况一样,AMD远远落

2023-04-09

世界百事通!完美世界鲁晓寅:正在游戏内推行AI技术应用 情感设计暂时无法替代

4月6日,完美世界发布2022年年度报告,全年营收76 7亿元,归母净利润13 77亿元,同比增长273 07%,扣非净利润6 91亿元,同比增长580 2%。游戏

2023-04-09

【环球聚看点】二婚送什么礼物比较好

送礼一定要投其所好,如果是实在型的,就送一些比较中式的礼品;如果是浪漫型的,就送一些温馨、浪漫的礼物。可以送一束卡通花

2023-04-09

B站宣布终止UWP版本客户端的维护

【ITBEAR科技资讯】4月9日消息,B站近日宣布终止UWP版本客户端的维护。这意味着,用户在登陆B站的UWP客户端时,将无法加载视频等内容信息,界

2023-04-09

中雨大雨!即将抵达安徽!

这两天天气不错你出去踏春了没?还是在家里洗洗晒晒?反转来了27℃→大雨未来7天明后天晴天或多云气温逐步回升周四大雨来袭!请!注!意!气

2023-04-09

英媒:特朗普刑事指控事件暴露美国社会深度撕裂|今热点

英媒:特朗普刑事指控事件暴露美国社会深度撕裂.com-img{width:100%;vertical-ali

2023-04-09

英语名人演说精选 英汉对照_关于英语名人演说精选 英汉对照的简介

1、《英语名人演说精选(英汉对照)》是1999年东方出版中心社出版的图书,作者是孙爱玲。本文关于英语名人演说精选英

2023-04-09

美丽镰鳞果,古生物学者发现约1.7亿年前的“花”

记者从中科院南京地质古生物研究所获悉,古生物学者近期通过化石重新研究,发现了我国西北地区目前已知最早的被子植物,即人们熟

2023-04-09

2023年3月传媒行业月度跟踪:3月传媒领涨全行业 AI技术应用加速落地 每日消息

2023年3月传媒行业月度跟踪:3月传媒领涨全行业AI技术应用加速落地

2023-04-09

网友在环球影城偶遇宋轶,颜值身材都好优越,气质绝佳谁人不爱!

近日,有网友在网上晒出了自己在的VLOG,视频中不仅记录了自己的欢乐时光,还意外地遇到了!网友们纷纷表示羡慕可以偶遇宋轶,还称赞她的新发

2023-04-09

资讯

大数据Flink进阶(十五):Flink On Yarn任务提交

FlinkOnYarn即Flink任务运行在Yarn集群中,FlinkOnYarn的内部实现原理如下图:

2023-04-10     
天天速看:丰巢公开致信用户人新闻_丰巢公开致信用户

一、题文二、解答本文到此结束,希望对大家有所帮助。

2023-04-10     
全球要闻:建发房产逆势奔跑 旗下三大上市公司齐发力

文 乐居财经魏薇2022年,房地产行业持续调整。房企经营面临重重考验,行业洗牌加速。但这样的行情下,却有一些企业做到了逆势向上。3月30日,

2023-04-10     
天天要闻:专家研究称ChatGPT会影响用户道德判断 不要低估它的影响

GhatGPT是备受关注的热门话题。无论是普通人还是商业领袖,都对它表现出了浓厚的兴趣。他们建议未来的研究可以设计让对话机器人拒绝回答需要给

2023-04-10     
环球今热点:浙江小伙一天狂刷武汉7大景点,武汉值得“特种兵式旅游”,更值得长情陪伴

浙江小伙一天狂刷武汉7大景点,武汉值得“特种兵式旅游”,更值得长情陪伴

2023-04-10     
【新时代新征程新伟业——真抓实干推动高质量发展】广东打造国际一流营商环境

央视新闻客户端消息(新闻联播):地处开放前沿的广东,持续深化“放管服”改革,打造国际一流营商环境,助推经济高质量发展。来自法国的托玛

2023-04-10