0x00. Flink lib 包全貌

0.1 flink1.17 自带的jar包

1
2
3
4
5
6
7
8
9
10
11
12
13
-rw-r--r-- 1 501 games    196491 5月  19 18:56 flink-cep-1.17.1.jar
-rw-r--r-- 1 501 games 542620 5月 19 18:59 flink-connector-files-1.17.1.jar
-rw-r--r-- 1 501 games 102472 5月 19 19:02 flink-csv-1.17.1.jar
-rw-r--r-- 1 501 games 135975541 5月 19 19:13 flink-dist-1.17.1.jar
-rw-r--r-- 1 501 games 180248 5月 19 19:02 flink-json-1.17.1.jar
-rw-r--r-- 1 501 games 21043319 5月 19 19:12 flink-scala_2.12-1.17.1.jar
-rw-r--r-- 1 501 games 15407424 5月 19 19:13 flink-table-api-java-uber-1.17.1.jar
-rw-r--r-- 1 501 games 38191226 5月 19 19:08 flink-table-planner-loader-1.17.1.jar
-rw-r--r-- 1 501 games 3146210 5月 19 18:56 flink-table-runtime-1.17.1.jar
-rw-r--r-- 1 501 games 208006 5月 17 18:07 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 501 games 301872 5月 17 18:07 log4j-api-2.17.1.jar
-rw-r--r-- 1 501 games 1790452 5月 17 18:07 log4j-core-2.17.1.jar
-rw-r--r-- 1 501 games 24279 5月 17 18:07 log4j-slf4j-impl-2.17.1.jar

0.2 解决问题加入的jar包

下面一些包,有些是为了解决flink自身的问题,有一些是集成其他插件或者服务需要的包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
-rwxrwxr-x 1 hadoop hadoop     53820 3月   9 2017 commons-cli-1.5.0.jar
-rwxrwxr-x 1 hadoop hadoop 4617 2月 9 2023 failureaccess-1.0.1.jar
-rwxrwxr-x 1 hadoop hadoop 3171088 10月 11 11:39 flink-shaded-guava-31.1-jre-17.0.jar

# 将flink-table-planner-loader-1.17.1.jar移动到opt目录,并将opt中flink-table-planner_2.12-1.17.1.jar移动到lib中
-rwxrwxr-x 1 hadoop hadoop 21331589 10月 9 15:43 flink-table-planner_2.12-1.17.1.jar

# hadoop集成
-rwxrwxr-x 1 hadoop hadoop 1654881 10月 11 14:30 hadoop-mapreduce-client-core-3.1.1.jar
-rwxrwxr-x 1 hadoop hadoop 59604787 10月 13 19:20 flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

# connector连接器
-rwxrwxr-x 1 hadoop hadoop 264302 10月 9 15:43 flink-connector-jdbc-3.1.0-1.17.jar
-rwxrwxr-x 1 hadoop hadoop 403255 5月 19 17:26 flink-connector-kafka-1.17.1.jar
-rwxrwxr-x 1 hadoop hadoop 51337510 10月 8 20:02 flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar
-rwxrwxr-x 1 hadoop hadoop 5563748 10月 8 20:02 flink-sql-connector-kafka-1.17.1.jar
-rwxrwxr-x 1 hadoop hadoop 18897491 10月 8 20:02 flink-sql-connector-mongodb-cdc-2.4.1.jar
-rwxrwxr-x 1 hadoop hadoop 23600947 10月 8 20:02 flink-sql-connector-mysql-cdc-2.4.1.jar
-rwxrwxr-x 1 hadoop hadoop 731203 10月 8 20:02 flink-sql-connector-rabbitmq-1.17-20230110.003509-121.jar
-rwxrwxr-x 1 hadoop hadoop 74942397 10月 8 20:02 flink-sql-connector-tidb-cdc-2.4.1.jar

# 支持paimon湖仓一体的包
-rwxrwxr-x 1 hadoop hadoop 36096449 10月 8 19:37 paimon-flink-1.17-0.6-20230916.001857-12.jar
-rwxrwxr-x 1 hadoop hadoop 13071 10月 8 19:38 paimon-flink-action-0.6-20230916.001857-12.jar
-rwxrwxr-x 1 hadoop hadoop 35359604 10月 8 19:38 paimon-hive-connector-3.1-0.6-20230916.001857-10.jar
-rwxrwxr-x 1 hadoop hadoop 35683112 10月 8 19:38 paimon-spark-3.3-0.6-20230916.001857-10.jar
-rwxrwxr-x 1 hadoop hadoop 42627639 10月 8 19:38 paimon-trino-422-0.6-20230916.000432-10.jar
# 支持dinky提交任务
-rwxrwxr-x 1 hadoop hadoop 21154 10月 13 12:54 dinky-catalog-mysql-1.17-1.0.0-SNAPSHOT.jar

完整的lib目录,应该结合上面两部分的jar包,但可以根据自身集成的服务做改动。特别注意:将lib/flink-table-planner-loader-1.17.1.jar移动到opt目录,并将opt/flink-table-planner_2.12-1.17.1.jar移动到lib中。

0x01. Standalone运行模式

独立模式是独立运行的,不依赖任何外部的资源管理平台;当然独立也是有代价的:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理。所以独立模式一般只用在开发测试或作业非常少的场景下。

1.1 会话模式部署

0)集群规划

表1-1 集群角色分配

节点服务器 hadoop001 hadoop002 hadoop003
角色 JobManager
TaskManager
TaskManager TaskManager

具体安装部署步骤如下:

1)下载并解压安装包

(1)下载安装包flink-1.17.1-bin-scala_2.12.tgz ,将该jar包上传到hadoop001节点服务器的/data/software路径上。

(2)在/data/software路径上解压flink-1.17.1-bin-scala_2.12.tgz/srv/udp/2.0.0.0/路径上。

1
2
3
4
5
# 先将原来的flink版本备份,标注为原来的版本,考虑可以复原
[hadoop@hadoop001 software]$ mv /srv/udp/2.0.0.0/flink /srv/udp/2.0.0.0/flink1.13.2
# 解压新版本flink,并重新命名目录为flink
[hadoop@hadoop001 software]$ tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /srv/udp/2.0.0.0/
[hadoop@hadoop001 software]$ mv /srv/udp/2.0.0.0/flink-1.17.1 /srv/udp/2.0.0.0/flink

2)修改集群配置

(1)进入conf路径,修改flink-conf.yaml文件,指定hadoop001节点服务器为JobManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[hadoop@hadoop001 conf]$ vim flink-conf.yaml
修改如下内容:

# JobManager节点地址.
jobmanager.rpc.address: hadoop001
jobmanager.bind-host: 0.0.0.0
rest.address: hadoop001
rest.bind-address: 0.0.0.0

# TaskManager节点地址.需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0
# TaskManager节点地址.需要配置为当前机器名, hadoop00N节点这里为hadoop00N N=1,2,3,也就是上面角色对应的服务节点
taskmanager.host: hadoop001

# 服务器是8C的,25%给其他应用使用,name单台服务器有6C可以提供给flink使用
taskmanager.numberOfTaskSlots: 6
parallelism.default: 2

特别注意:不要设置rest.port 端口为固定端口,默认注释掉或者设置为0,。如果设置了,后续启动各种模式的时候,端口冲突解决很麻烦。

(2)修改workers文件,指定hadoop001、hadoop002和hadoop003为TaskManager

1
2
3
4
5
[hadoop@hadoop001 conf]$ vim workers
修改如下内容:
hadoop001
hadoop002
hadoop003

(3)修改masters文件

1
2
3
[hadoop@hadoop001 conf]$ vim masters
修改如下内容:
hadoop001:8081

(4)另外,在 flink-conf.yaml 文件中还可以对集群中的 JobManager 和 TaskManager 组件进行优化配置,主要配置项如下:

  • jobmanager.memory.process.size:对 JobManager 进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。

  • taskmanager.memory.process.size:对 TaskManager 进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1728M,可以根据集群规模进行适当调整。

  • taskmanager.numberOfTaskSlots:对每个 TaskManager 能够分配的 Slot 数量进行配置,默认为1,可根据 TaskManager 所在的机器能够提供给 Flink 的CPU数量决定。所谓Slot就是 TaskManager 中具体运行一个任务所分配的计算资源。

  • parallelism.default:Flink 任务执行的并行度,默认为1。优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。

3)分发安装目录

下面会用到xsync和xcall命令,这是两个脚本,具体的脚本内容可以参照我的另外一篇博客:linux xsync和xcall脚本

(1)配置修改完毕后,将Flink安装目录发给另外两个节点服务器。

1
[hadoop@hadoop001 module]$ xsync flink

(2)修改hadoop002的 taskmanager.host

1
2
3
4
5
 [hadoop@hadoop002 conf]$ vim flink-conf.yaml
修改如下内容:

# TaskManager节点地址.需要配置为当前机器名
taskmanager.host: hadoop002

(3)修改hadoop003的 taskmanager.host

1
2
3
4
5
[hadoop@hadoop003 conf]$ vim flink-conf.yaml
修改如下内容:

# TaskManager节点地址.需要配置为当前机器名
taskmanager.host: hadoop003

4)启动集群

(1)在hadoop001节点服务器上执行start-cluster.sh启动Flink集群:

1
2
3
4
5
6
[hadoop@hadoop001 flink]$ start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host hadoop001.
Starting taskexecutor daemon on host hadoop001.
Starting taskexecutor daemon on host hadoop002.
Starting taskexecutor daemon on host hadoop003.

(2)查看进程情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
[hadoop@hadoop001 flink]$ xcall
=================== hadoop001 =================
761769 HttpFSServerWebServer
767084 JournalNode
765600 NameNode
971011 StandaloneSessionClusterEntrypoint
646372 ResourceManager
762778 DataNode
643578 NodeManager
974621 Jps
20086 QuorumPeerMain
764221 DFSZKFailoverController
995000 RunJar
972247 TaskManagerRunner
994164 RunJar
=================== hadoop002 =================
666986 TaskManagerRunner
378950 HistoryServer
468009 JournalNode
26671 zkui.jar
927743 NodeManager
472403 NameNode
669694 Jps
464721 DataNode
926584 ApplicationHistoryServer
930008 ResourceManager
463472 HttpFSServerWebServer
723154 RunJar
471353 DFSZKFailoverController
26911 QuorumPeerMain
=================== hadoop003 =================
180770 Jps
19443 QuorumPeerMain
285479 JournalNode
693539 NodeManager
283567 DataNode
694455 JobHistoryServer
179341 TaskManagerRunner

其中,StandaloneSessionClusterEntrypoint 为 jobManager 的进程,TaskManagerRunner 为 taskManager 的进程。

5)访问Web UI
启动成功后,同样可以访问http://hadoop001:8081对flink集群和任务进行监控管理。

这里可以明显看到,当前集群的TaskManager数量为3;由于默认每个TaskManager的Slot数量为6,所以总Slot数和可用Slot数都为18。

1.2 单作业模式部署

Flink的Standalone集群并不支持单作业模式部署。因为单作业模式需要借助一些资源管理平台。

1.3 应用模式部署

应用模式下不会提前创建集群,所以不能调用start-cluster.sh脚本。我们可以使用同样在bin目录下的standalone-job.sh来创建一个 JobManager。

具体步骤如下:
(1)进入到Flink的安装路径下,将应用程序的jar包放到lib/目录下。

1
[hadoop@hadoop001 flink]$ cp ./examples/streaming/TopSpeedWindowing.jar lib/

(2)执行以下命令,启动JobManager。

1
./bin/standalone-job.sh start --job-classname org.apache.flink.streaming.examples.windowing.TopSpeedWindowing

这里我们直接指定作业入口类,脚本会到lib目录扫描所有的jar包。

特别注意:我再配置过程中设置了并行度parallelism.default: 18,导致资源不够,出现下面错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
023-10-18 16:40:00
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
at org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$assignResource$4(DefaultExecutionDeployer.java:227)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.jobmaster.slotpool.PendingRequest.failRequest(PendingRequest.java:88)
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.cancelPendingRequests(DeclarativeSlotPoolBridge.java:185)
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.failPendingRequests(DeclarativeSlotPoolBridge.java:408)
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable(DeclarativeSlotPoolBridge.java:396)
at org.apache.flink.runtime.jobmaster.JobMaster.notifyNotEnoughResourcesAvailable(JobMaster.java:887)
at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$0(AkkaRpcActor.java:301)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:300)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
... 36 more
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

(3)同样是使用bin目录下的脚本,启动TaskManager。

1
[hadoop@hadoop001 flink]$ bin/taskmanager.sh start

(4)在hadoop001:8081地址中观察输出数据:在当前任务的taskmanager日志中观察输出。

(5)如果希望停掉集群,同样可以使用脚本,命令如下。

1
2
[hadoop@hadoop001 flink]$ bin/taskmanager.sh stop
[hadoop@hadoop001 flink]$ bin/standalone-job.sh stop

0x02. YARN运行模式

YARN上部署的过程是:客户端把Flink应用提交给YarnResourceManagerYarnResourceManager会向Yarn的NodeManager申请容器。在这些容器上,Flink会部署JobManagerTaskManager的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配TaskManager资源。

2.1 相关准备和配置

在将Flink任务部署至YARN集群之前,需要确认集群是否安装有Hadoop,保证Hadoop版本至少在2.2以上,并且集群中安装有HDFS服务。

具体配置步骤如下:
(1)配置环境变量,增加环境变量配置如下:

1
2
3
4
5
6
$ sudo vim /etc/profile.d/bigdata_env

HADOOP_HOME=/srv/udp/2.0.0.0/yarn
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

在USDP大数据平台中,我们发现这块的内容是配置在 flink/bin/config.sh中的,所以我们无需改动:

1
2
3
4
5
6
7
8
9
10
# 在文件中constructFlinkClassPath方法的前面增加下面内容
# 额外说一句,此处的配置在未来安装dinky的时候会用到
export HADOOP_CONF_DIR=/srv/udp/2.0.0.0/yarn/etc/hadoop
export HADOOP_HOME=/srv/udp/2.0.0.0/yarn

export HBASE_CONF_DIR=/srv/udp/2.0.0.0/hbase/conf
export HBASE_HOME=/srv/udp/2.0.0.0/hbase
export HADOOP_CLASSPATH=`/srv/udp/2.0.0.0/yarn/bin/hadoop classpath`

# 下面是其他原来的配置

(2)启动Hadoop集群,包括HDFS和YARN。

1
2
[hadoop@hadoop001 hadoop-3.3.4]$ start-dfs.sh
[hadoop@hadoop002 hadoop-3.3.4]$ start-yarn.sh

在USDP大数据平台中,直接通过UI操作按钮启动即可。

2.2 会话模式部署

YARN的会话模式与独立集群略有不同,需要首先申请一个YARN会话(YARN Session)来启动Flink集群。具体步骤如下:

1)启动集群

(1)启动Hadoop集群(HDFS、YARN)。
(2)执行脚本命令向YARN集群申请资源,开启一个YARN会话,启动Flink集群。

1
[hadoop@hadoop001 flink]$ bin/yarn-session.sh -nm test -d

可用参数解读:

  • -d:分离模式,如果你不想让Flink YARN客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session也可以后台运行。

  • -jm(–jobManagerMemory):配置JobManager所需内存,默认单位MB。

  • -nm(–name):配置在YARN UI界面上显示的任务名。

  • -qu(–queue):指定YARN队列名。

  • -tm(–taskManager):配置每个TaskManager所使用内存。

注意:Flink1.11.0版本不再使用-n参数和-s参数分别指定TaskManager数量和slot数量,YARN会按照需求动态分配TaskManager和slot。所以从这个意义上讲,YARN的会话模式也不会把集群资源固定,同样是动态分配的。

YARN Session启动之后会给出一个Web UI地址以及一个YARN application ID,如下所示,用户可以通过Web UI或者命令行两种方式提交作业。

1
2
3
4
5
6
7
8
2023-10-19 09:28:38,015 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2023-10-19 09:28:38,016 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface hadoop003:20604 of application 'application_1697079413716_0061'.
JobManager Web Interface: http://hadoop003:20604
2023-10-19 09:28:38,256 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - The Flink YARN session cluster has been started in detached mode. In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1697079413716_0061
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1697079413716_0061
Note that killing Flink might not clean up all job artifacts and temporary files.

2)提交作业

(1)通过Web UI提交作业

这种方式比较简单,与上文所述Standalone部署模式基本相同。

(2)通过命令行提交作业

① 通过flink提供的测试作业:WordCount.jar的进行测试,如果是个人开发的作业,需要上传到flink集群。

② 执行以下命令将该任务提交到已经开启的Yarn-Session中运行。

1
[hadoop@hadoop001 flink]$ bin/flink run -s test -c org.apache.flink.examples.java.wordcount.WordCount examples/batch/WordCount.jar

客户端可以自行确定 JobManager 的地址,也可以通过-m或者-jobmanager参数指定 JobManager 的地址,JobManager 的地址在YARN Session的启动页面中可以找到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
2023-10-19 09:32:49,402 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-hadoop.
java.lang.NoSuchMethodError: org.apache.commons.cli.CommandLine.hasOption(Lorg/apache/commons/cli/Option;)Z
at org.apache.flink.client.cli.CliFrontendParser.createSavepointRestoreSettings(CliFrontendParser.java:631)
at org.apache.flink.client.cli.ProgramOptions.<init>(ProgramOptions.java:119)
at org.apache.flink.client.cli.ProgramOptions.create(ProgramOptions.java:192)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:235)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)

当时为了解决问题:Exception in thread "main" java.lang.NoSuchMethodError: org.apache.commons.cli.Option.builder(Ljava/lang/String;)Lorg/apache/commons/cli/Option$Builder;,在flink/lib中加入的是commons-cli-1.4.jar,但在该模式下加上-s参数运行会报如上错误,因此需要替换为commons-cli-1.5.0.jar的包,参考文章:java.lang.NoSuchMethodError:org.apache.commons.cli.CommandLine.hasOption(Lorg/apache/commons/cli/Op

③ 任务提交成功后,可在YARN的Web UI界面查看运行情况。hadoop002:8088

YARN WEB UI

从上图中可以看到我们创建的Yarn-Session实际上是一个Yarn的Application,并且有唯一的Application ID。

④也可以通过Flink的Web UI页面 hadoop003:20604 (端口号在启动yarn-session的控制台查看) 查看提交任务的运行情况,如下图所示。

FLINK YARN SESSION WEB UI
FLINK WORDCOUNT RESULT

2.3 单作业模式部署

在YARN环境中,由于有了外部平台做资源调度,所以我们也可以直接向YARN提交一个单独的作业,从而启动一个Flink集群。
(1)执行命令提交作业。

1
[hadoop@hadoop001 flink]$bin/flink run -d -t yarn-per-job -c org.apache.flink.streaming.examples.windowing.TopSpeedWindowing examples/streaming/TopSpeedWindowing.jar

注意:如果启动过程中报如下异常。

1
2
Exception in thread “Thread-5” java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration ‘classloader.check-leaked-classloader’.
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders

解决办法:在flink的/srv/udp/2.0.0.0/flink/conf/flink-conf.yaml配置文件中设置

1
2
3
[hadoop@hadoop001 conf]$ vim flink-conf.yaml

classloader.check-leaked-classloader: false

yarn-per-job提交批处理作业时,会出现如下错误:

1
2
3
4
5
Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available. Please make sure your program doesn't call an eager execution function [collect, print, printToErr, count]. 
org.apache.flink.core.execution.DetachedJobExecutionResult.getAccumulatorResult(DetachedJobExecutionResult.java:56)
org.apache.flink.api.java.DataSet.collect(DataSet.java:419)
org.apache.flink.api.java.DataSet.print(DataSet.java:1748)
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96)

这个错误是因为程序调用了一个会立即执行的函数(如 collectprintprintToErrcount),而在 yarn-per-job 模式下,作业会以分离模式运行,无法获取到作业执行的结果或运行时信息。yarn-per-job 模式是用于运行长时间运行的作业,而不是为了获取立即可用的结果。如果程序需要输出结果或运行时信息,可以尝试使用 Flink 的其他模式,如 yarn-sessionstandalone 模式。

(2)在YARN的ResourceManager界面查看执行情况。

点击可以打开Flink Web UI页面进行监控,如下图所示:

(3)可以使用命令行查看或取消作业,命令如下。

1
2
3
[hadoop@hadoop001 flink]$ bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY

[hadoop@hadoop001 flink]$ bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>

这里的application_XXXX_YY是当前应用的ID,<jobId>是作业的ID。注意如果取消作业,整个Flink集群也会停掉。

2.4 应用模式部署

应用模式同样非常简单,与单作业模式类似,直接执行flink run-application命令即可。

https://repo1.maven.org/maven2/org/apache/flink/flink-clients/1.17.1/flink-clients-1.17.1.jar

1)命令行提交

(1)执行命令提交作业。

1
[hadoop@hadoop001 flink]$ bin/flink run-application -t yarn-application - -c org.apache.flink.examples.java.wordcount.WordCount examples/batch/WordCount.jar 

由于WordCount.jar 是一个BATCH模式的作业,因此在提交的是会出现一些错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No ExecutorFactory found to execute the application.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:88)
at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1043)
at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:144)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:73)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:417)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1748)
at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 12 more

flink自从1.12开始,默认模式为STREAMING,可以通过-Dexecution.runtime-mode=BATCH修改为BATCH模式,但是修改后依然报错,这个错误通常是由于 Flink 没有找到适合执行应用程序的 ExecutorFactory 导致的。在 Flink 中,ExecutorFactory 用于创建和管理执行应用程序的执行器(例如线程池)。通常情况下,Flink 会根据配置自动选择合适的 ExecutorFactory,但有时需要手动配置。所以,我们需要增加--executor DefaultExecutorServiceLoader参数解决这个问题,至于-Dexecution.runtime-mode=BATCH这个参数,flink会自动判断,验证加不加都可以。

1
[hadoop@hadoop001 flink]$ bin/flink run-application -t yarn-application -Dexecution.runtime-mode=BATCH --executor DefaultExecutorServiceLoader -c org.apache.flink.examples.java.wordcount.WordCount examples/batch/WordCount.jar 

在yarn的 Web UI中,进入当前job application,查看运行日志,输出结果如下:

flink yarn-application wordcount result

(2)在命令行中查看或取消作业。

对于批处理作业,执行完成就结束了,对应的进程也就不存在了。

1
2
3
[hadoop@hadoop001 flink]$ bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY

[hadoop@hadoop001 flink]$ bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>

2)上传HDFS提交

可以通过yarn.provided.lib.dirs配置选项指定位置,将flink的依赖上传到远程。
(1)上传flink的lib和plugins到HDFS上

1
2
3
[hadoop@hadoop001 flink]$ hdfs dfs -mkdir /flink/1.17/dist
[hadoop@hadoop001 flink]$ hadoop fs -put lib/ /flink/1.17/dist
[hadoop@hadoop001 flink]$ hadoop fs -put plugins/ /flink/1.17/dist

(2)上传自己的jar包到HDFS

1
2
3
[hadoop@hadoop001 flink]$ hadoop fs -mkdir /flink/1.17/jars
[hadoop@hadoop001 flink]$ cd examples/batch/
[hadoop@hadoop001 flink]$ hadoop fs -put WordCount.jar /flink/1.17/jars

(3)提交作业

1
2
[hadoop@hadoop001 flink]$ bin/flink run-application -t yarn-application Dexecution.runtime-mode=BATCH --executor DefaultExecutorServiceLoader -Dyarn.provided.lib.dirs="hdfs://ahpthd/flink/1.17/dist" -c org.apache.flink.examples.java.wordcount.WordCount  hdfs://ahpthd/flink/1.17/jar/WordCount.jar
# 此处hadoop使用了HA,ahpthd是hdfs的servicename,也可以是hadoop001:8020,具体需要参考 dfs.namenode.rpc-address.ahpthd.*的配置值,当然需要选择nn1和nn2中active的那一个。

这种方式下,flink本身的依赖和用户jar可以预先上传到HDFS,而不需要单独发送到集群,这就使得作业提交更加轻量了。

0x03. 历史服务器

运行 Flink job 的集群一旦停止,只能去 yarn 或本地磁盘上查看日志,不再可以查看作业挂掉之前的运行的 Web UI,很难清楚知道作业在挂的那一刻到底发生了什么。如果我们还没有 Metrics 监控的话,那么完全就只能通过日志去分析和定位问题了,所以如果能还原之前的 Web UI,我们可以通过 UI 发现和定位一些问题。

Flink提供了历史服务器,用来在相应的 Flink 集群关闭后查询已完成作业的统计信息。我们都知道只有当作业处于运行中的状态,才能够查看到相关的WebUI统计信息。通过 History Server 我们才能查询这些已完成作业的统计信息,无论是正常退出还是异常退出。

此外,它对外提供了 REST API,它接受 HTTP 请求并使用 JSON 数据进行响应。Flink 任务停止后,JobManager 会将已经完成任务的统计信息进行存档,History Server 进程则在任务停止后可以对任务统计信息进行查询。比如:最后一次的 Checkpoint、任务运行时的相关配置。

1)创建存储目录

1
[hadoop@hadoop001 flink]$ hdfs dfs -mkdir -p /ahpthd/flink/completed-jobs
1
2
3
4
5
6
jobmanager.archive.fs.dir: hdfs://ahpthd/ahpthd/flink/completed-jobs/
historyserver.web.address: hadoop002
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://ahpthd/ahpthd/flink/completed-jobs/
historyserver.archive.fs.refresh-interval: 5000
historyserver.web.tmpdir: /data/udp/2.0.0.0/flink

3)启动历史服务器

1
[hadoop@hadoop001 flink]$ bin/historyserver.sh start

4)停止历史服务器

1
[hadoop@hadoop001 flink]$ bin/historyserver.sh stop

5)在浏览器地址栏输入:http://hadoop001:8082 查看已经停止的 job 的统计信息

0x04. flink-config.yaml完整的配置

以hadoop001的配置为例,下面是一份完整的flink的配置,从配置中可以看到没有zk的配置,并不支持standalone HA运行模式。此外,flink1.17中flink-shaded-zookeeper的jar包也不存在了,需要需要支持standalone HA还需要下载flink-shaded-zookeeper对应的jar包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#==============================================================================
# Common
#==============================================================================
jobmanager.rpc.address: hadoop001
jobmanager.rpc.port: 6123
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600m
taskmanager.bind-host: 0.0.0.0
# TaskManager节点地址.需要配置为当前机器名, hadoopN节点这里为hadoopN N=1,2,3
taskmanager.host: hadoop001
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 6
parallelism.default: 18

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
execution.checkpointing.interval: 10s
state.backend.type: rocksdb
state.checkpoints.dir: hdfs://ahpthd/ahpthd/flink/ckps
state.savepoints.dir: hdfs://ahpthd/ahpthd/flink/svps
state.backend.incremental: true
jobmanager.execution.failover-strategy: region

#==============================================================================
# Rest & web frontend
#==============================================================================
#rest.port: 8081
#rest.address: hadoop001
#rest.bind-port: 8080-8090
rest.bind-address: 0.0.0.0
#web.submit.enable: false
web.cancel.enable: true

#==============================================================================
# Advanced
#==============================================================================

# io.tmp.dirs: /tmp
classloader.resolve-order: parent-first

#==============================================================================
# HistoryServer
#==============================================================================
jobmanager.archive.fs.dir: hdfs://ahpthd/ahpthd/flink/completed-jobs/
historyserver.web.address: hadoop002
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://ahpthd/ahpthd/flink/completed-jobs/
historyserver.archive.fs.refresh-interval: 5000
historyserver.web.tmpdir: /data/udp/2.0.0.0/flink

# 避免客户端报告警信息
classloader.check-leaked-classloader: false
env.java.opts.all: -Dlog4j2.formatMsgNoLookups=true -Dfile.encoding=UTF-8


最后说明:本文是基于尚硅谷的flink安装文档,根据自身安装情况做了大量修改。感谢尚硅谷!!!