Hadoop及Flink内存调优

前言

Yarn由ResourceManager、NodeManager、ApplicationMaster和Container四个概念构成。

在 Hadoop 中,RM、NM 和 AM 分别指 ResourceManager、NodeManager 和 ApplicationMaster。

  1. ResourceManager(RM):RM 是 YARN 的中心服务,负责管理和调度集群资源。它接收客户端提交的应用程序,为每个应用程序分配一个 ApplicationMaster,并将资源分配给正在运行的应用程序。RM 还负责监视节点的状态,并协调 NodeManager 以执行任务。
  2. NodeManager(NM):NM 是在集群中运行的每个节点上运行的 YARN 服务。它负责管理该节点上的资源,并与 ResourceManager 交互以获取任务。NM 还负责启动和停止容器以运行任务,并在任务完成后释放容器和资源。
  3. ApplicationMaster(AM):AM 是运行在集群上的每个应用程序的管理器,它负责与 ResourceManager 交互以获取资源并协调任务的执行。AM 还负责监控任务的进度和状态,并与 NodeManager 通信以启动和停止任务。

在 Hadoop 中,RM、NM 和 AM 是三个关键组件,它们共同协作以管理和调度集群资源,并执行应用程序中的任务。

内存优化建议

在 Hadoop 中,内存分配是非常重要的,因为它会直接影响到集群的性能和稳定性。

当大数据集群环境内存不足时会产生如下错误

Container [pid=24156,containerID=container_1427332071311_0019_01_000002] is running beyond physical memory limits. Current usage: 2.1 GB of 2 GB physical memory used; 2.7 GB of 4.2 GB virtual memory used. Killing container.

以下是一些 Hadoop 内存配置的建议:

Java Heap

hadoop-env.sh 文件中配置 Java Heap 大小。

可以使用以下参数配置 Java Heap 大小:

1
export HADOOP_HEAPSIZE=XXXX

其中 XXXX 为内存大小,以 MB 为单位。建议将其设置为集群可用内存的 80%。

HADOOP_HEAPSIZE是用于设置单个Hadoop节点启动时的JVM最大堆内存大小的环境变量,而不是集群整体的可用内存。每个Hadoop节点都会独立地启动JVM,因此HADOOP_HEAPSIZE环境变量只会影响当前节点的JVM堆内存大小。

集群中每个节点的可用内存可能是不同的,因此可以为每个节点设置不同的HADOOP_HEAPSIZE值,以便充分利用节点的资源。一般来说,HADOOP_HEAPSIZE的值应该根据节点的可用物理内存和节点上运行的任务的内存需求来设置,以确保在不浪费资源的情况下充分利用节点的资源。

需要注意的是,Hadoop还有其他的内存限制参数,如YARN的容器内存限制和MapReduce的Mapper和Reducer的内存限制等,这些参数也需要根据任务的需求来设置,以避免内存问题对任务的影响。

HADOOP_HEAPSIZE是Hadoop启动时用于指定Java虚拟机(JVM)的最大堆内存大小的环境变量。JVM是Java应用程序的运行环境,堆内存是JVM用于存储对象的区域。在Hadoop中,许多组件(如NameNode,DataNode,ResourceManager等)都是用Java编写的,因此需要JVM来运行。

通过设置HADOOP_HEAPSIZE环境变量,可以控制Hadoop组件启动时JVM使用的最大堆内存大小。这对于控制Hadoop组件的内存使用非常重要,因为如果JVM使用的内存超过了可用的物理内存,可能会导致系统变慢或崩溃。

通常情况下,HADOOP_HEAPSIZE的值应该设置为集群可用物理内存的一小部分。如果将HADOOP_HEAPSIZE设置为太大的值,可能会导致Hadoop组件使用过多的内存,从而导致系统变慢或崩溃。相反,如果将HADOOP_HEAPSIZE设置为太小的值,可能会导致Hadoop组件无法处理大量的数据,从而导致任务失败。因此,建议根据集群的物理内存大小和任务的性质来设置HADOOP_HEAPSIZE的值。

注意

下文中所有的集群可用内存说的都是该节点的可用内存

MapReduce 任务的内存

mapred-site.xml 文件中配置 MapReduce 任务的内存。

可以使用以下参数配置内存大小:

1
2
mapreduce.map.memory.mb=YYYY
mapreduce.reduce.memory.mb=ZZZZ

其中 YYYYZZZZ 分别为 map 和 reduce 任务的内存大小,以 MB 为单位。建议将其设置为集群可用内存的 80%。

指定 map 和 reduce 任务的内存大小,该值应该在 RM 的最大最小 container 之间。

如果不设置,则默认用以下规则进行计算:max{MIN_Container_Size,(Total Available RAM/containers)}

YARN 的内存

yarn-site.xml 文件中配置 YARN 的内存。

可以使用以下参数配置内存大小:

1
2
3
yarn.nodemanager.resource.memory-mb=AAAA
yarn.scheduler.maximum-allocation-mb=BBBB
yarn.scheduler.minimum-allocation-mb=CCCC

其中
yarn.nodemanager.resource.memory-mb 为每个节点可用的总内存大小,以 MB 为单位。

yarn.nodemanager.vmem-pmem-ratio表示虚拟内存率,即占 task 所用内存的百分比,默认为 2.1。

yarn.scheduler.minimum-allocation-mbyarn.scheduler.maximum-allocation-mb表示单个容器可以申请的最小与最大内存,以 MB 为单位。

建议将 BBBBCCCC 设置为相同的值,并将它们设置为 AAAA 的 80%。

请注意,这些值需要根据集群的硬件和工作负载进行调整。建议在测试阶段对内存进行调整,并监控集群的性能和稳定性,以确定最佳配置。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>2048</value>
</property>

<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>1024</value>
</property>

<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>2048</value>
</property>

默认Yarn的调度最小是1GB,最大是2G。

我们可以通过以下地址查看所有配置

http://hadoop02:8088/conf

内存检查

1
2
3
4
5
6
7
8
9
10
11
<!--pmem指的是默认检查物理内存,容器使用的物理内存不能超过我们限定的内存大小,因为我们上面设置了所有容器能够使用的最大内存数量,超出这个内存限制,任务就会被kill掉-->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>

<!--vmem指的是默认检查虚拟内存,容器使用的虚拟内存不能超过我们设置的虚拟内存大小-->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

AM的内存

capacity-scheduler.xml

1
2
3
4
5
6
<property>
<name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
<value>0.8</value>
<description>
</description>
</property>

群集中可用于运行应用程序的最大资源百分比,即控制并发运行的应用数量。

参数含义就是所有AM占用的总内存数要小于Yarn所管理总内存的一定比例,默认是0.1。
也即是yarn所能同时运行的任务数受限于该参数和单个AM的内存。

假如我们总共是6G的资源,默认的0.1,我们只能使用0.6G的资源,但是启动一个应用就使用了1G,这时候所有其他应用都会处于挂起等待资源分配的状态。

所以将AM总内存占比提高,比如0.8,即可。

Flink配置

Flink是一个基于流式数据处理的分布式计算框架。

在Flink中,一个Job通常会被分成多个任务(Task),每个任务会被分配到一个或多个Slot中执行。

在Flink中,每个TaskManager(即运行Flink任务的节点)可以配置多个Task Slot(即执行任务的单元)。

Slot的数量取决于TaskManager的可用资源和任务的需求。

在Flink的默认配置下,每个TaskManager会创建一个Slot,但是这个数量可以通过配置文件或者命令行参数进行修改。

以下是一些常用的Flink Slot配置:

  • taskmanager.numberOfTaskSlots:配置每个TaskManager的Slot数量。默认值为1。

    可以通过修改flink-conf.yaml文件或者在命令行中使用-Dtaskmanager.numberOfTaskSlots参数来设置。

  • taskmanager.memory.task.off-heap.size:配置每个Slot的堆外内存大小。默认值为0,表示使用堆内存。

    可以通过修改flink-conf.yaml文件或者在命令行中使用-Dtaskmanager.memory.task.off-heap.size参数来设置。

  • taskmanager.memory.process.size:配置每个TaskManager的JVM堆大小。默认值为1024m。

    可以通过修改flink-conf.yaml文件或者在命令行中使用-Dtaskmanager.memory.process.size参数来设置。

  • taskmanager.cpu.cores:配置每个TaskManager可用的CPU核心数量。默认值为1。

    可以通过修改flink-conf.yaml文件或者在命令行中使用-Dtaskmanager.cpu.cores参数来设置。

需要注意的是,Slot的数量和资源分配需要根据具体的应用场景进行优化,以获得更好的性能和资源利用率。

Flink Session同时只能执行一个任务吗?

不一定。Flink在运行时通过资源管理器来分配和管理资源,因此可以同时运行多个任务,即使它们是在同一个Flink Session中提交的。

在Flink中,一个Session可以包含多个作业(Job),每个作业可以包含多个任务(Task)。资源管理器可以在不同的TaskManager节点上分配不同的Slot来执行这些任务。因此,即使多个作业在同一个Session中提交,它们也可以在不同的Slot中并行执行。

需要注意的是,如果多个作业需要共享同一个资源池,可能会存在资源抢占的情况。在这种情况下,资源管理器需要根据各个任务的需求来分配资源,并尽可能避免资源的浪费和抢占。为了最大化资源利用率和性能,可以通过调整资源管理器的配置参数和任务的调度策略来优化资源分配和调度。

Flink Session模式配置优化

Flink Session模式是Flink集群中最常用的一种部署模式。在Session模式下,Flink JobManager和TaskManager运行在独立的进程中,它们之间通过Akka进行通信。以下是一些Session模式的配置优化建议:

调整内存分配

在Session模式下,Flink JobManager和TaskManager的内存分配是非常重要的。

一般来说,JobManager的内存分配应该足够大,以便能够处理大量的并发任务。

TaskManager的内存分配应该根据具体的任务和部署环境进行调整,以避免内存不足或者浪费的情况。

可以通过设置以下参数来调整内存分配:

  • jobmanager.memory.process.size:JobManager进程的内存大小。
  • taskmanager.memory.process.size:TaskManager进程的内存大小。
  • taskmanager.memory.flink.size:TaskManager分配给Flink的内存大小。

调整并发度

并发度是指每个任务执行的并发线程数。调整并发度可以提高作业的并发性和吞吐量,但同时也会增加资源占用和负载。因此,在Session模式下,应该根据实际情况调整并发度。

可以通过设置以下参数来调整并发度:

  • parallelism.default:默认并发度。
  • taskmanager.numberOfTaskSlots:每个TaskManager可以执行的并发任务数。

调整网络参数

Session模式下,JobManager和TaskManager之间通过Akka进行通信。因此,网络参数的设置对于作业的性能和稳定性非常重要。

可以通过设置以下参数来调整网络参数:

  • akka.framesize:Akka消息的帧大小。
  • akka.flight-recorder.buffer-size:Akka事件记录器的缓冲区大小。
  • akka.flight-recorder.enabled:是否启用Akka事件记录器。

配置高可用性

在Session模式下,为了保证集群的高可用性,需要启用Flink的高可用性机制。可以通过配置以下参数来实现高可用性:

  • high-availability:指定高可用性的实现方式,可以选择ZooKeeper、HDFS、S3等。
  • high-availability.zookeeper.quorum:指定ZooKeeper的地址。
  • high-availability.storageDir:指定高可用性存储的位置。

总的来说,在Session模式下,通过合理的配置可以提高作业的性能和稳定性,为实现高效的大数据计算提供保障。