Spark内存管理

Spark内存管理

Spark作为基于内存的分布式计算引擎,理解Spark内存管理方式对于程序调优、Spark运行机制有着不错的效果。本文旨在介绍Spark内存管理的脉络,总结与沉淀。

在执行Spark程序时(yarn-cluster模式),会启动两种类型的 JVM : Driver、Work。前者为控制进程,负责Job提交、根据RDD图划分Stage、生成Task,并负责相关的任务调度;后者执行Task、反馈结果给Driver。由于 Driver 的内存管理相对来说较为简单,本文主要对 Executor 的内存管理进行分析,下文中的 Spark 内存均特指 Executor 的内存。

本文主要内容为:

  • 介绍Spark内存分类、内存额度计算方式
  • Executor内存管理方法
  • Spark如何使用内存

内存分类

按存储对象分类,Spark将Executor内存分成三种类型:

  • 执行时内存(execution memory),task执行特定操作时使用的内存,数据正在被使用,如:shuffle、sort、join、agg等
  • 存储内存(storage memory),常用于存储广播变量(Broadcast)、RDD缓存等,通常这类数据会被多读很少更新
  • 其它,用于存储Spark内部代码的对象,用户代码的class对象等

内存大小计算方式:

  • JVM 系统总内存:Runtime.getRuntime.maxMemory (-Xmx)
  • 其它内存最低值:固定为300MB
  • 存储内存:(JVM 系统总内存 - 300) * spark.memory.fraction * spark.memory.storageFraction
  • 执行时内存:(JVM 系统总内存 - 300) * spark.memory.fraction * (1 - spark.memory.storageFraction)
  • 其它:(JVM 系统总内存 - 执行时内存 - 存储内存)

而按照execution memory申请内存方式的不同,又可将其使用的内存分为:堆内存(on-heap)、堆外内存(off-heap),前者使用JVM的堆内存,后者使用JVM之外的内存,不过2者共用同一个内存配额。

Executor 内存管理

Spark提供了2种Executor Memory Manager: StaticMemoryManagerUnifiedMemoryManager它们均是管理整个Executor的内存(即整个JVM) 。逻辑管理,虽然内含MemoryAllocator,但并不会进行实际上的内存申请与回收。

前者(StaticMemoryManager)会固定execution memory 和 storage memory 的内存最大值,运行期间不能超过该阈值;后者可以允许在一定条件下,使用的内存超过该阈值,比如execution memory额度用完后,可以使用storage free memory(即: storage未用完的内存额度)。

Spark自1.6开始,就默认采用UnifiedMemoryManager管理Executor内存。UnifiedMemoryManager借助内存池(Memory Pool)记录每个taskAttempt的内存使用情况,处理taskAttempt的申请、释放内存的请求。为了避免单个taskAttempt占用大量内存,设定了内存限制:每个task内存使用率最多1/N,最少1/2N(N为task个数)。

内存池被分成三类:storageMemoryPool、onHeapExecutionMemoryPool、offHeapExecutionMemoryPool。

1
2
3
4
5
6
@GuardedBy("this")
protected val storageMemoryPool = new StorageMemoryPool(this)
@GuardedBy("this")
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "on-heap execution")
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "off-heap execution")

其申请资源的逻辑分别为:

申请storage memory:
  • 若storageMemoryPool 内存足够,直接分配,并同时更新内存池的已分配内存数、剩余内存数

    Note: 对内存池的操作都是 lock.synchronized

  • 若storageMemoryPool 内存不够

  • 则占用onHeapExecutionMemoryPool.memoryFree;

  • 若依旧不够,则释放一些指定的Block(由调用者指定BlockId)

  • 若还是不够,则申请失败,返回false

申请onHeap Execution Memory(基于active task attempt id):
  • 若内存池里memoryFree足够,直接分配
  • 若不够了,则向storageMemoryPool 借用memoryFree
  • 还是不够,则回收storageMemoryPool占用onHeapExecutionMemoryPool的内存
  • 若内存够了,直接返回
  • 若不够且task占用的内存低于最低配额(1/2N),则wait;否则直接返回申请到的额度
申请offHeap Execution Memory(基于active task attempt id):
  • 内存够,直接分配
  • 内存不够,返回已申请到的额度,or wait

Task 内存管理

TaskMemoryManagerMemoryConsumer的方式管理当前单个Task Attempt的内存分配与回收(spill),以pageTable的方式(即: MemoryBlock数组)记录分配的内存地址。

MemoryConsumer 顾名思义,为内存消费者(子类有ShuffleExternalSorterUnsafeExternalSorter等),会向TaskMemoryManager 申请内存、释放内存,内存不够时还能写磁盘。

通常,MemoryConsumer 每次申请,都会申请memoryPage样式的一段连续内存,即由MemoryBlock记录了内存的起始地址与长度。

Note: 若为offHeap Memory,起始地址 加 长度就能定位到分配的内存;

​ 若为onHeap Memory,由于GC后起始地址会改变,故用一object 加 偏移量 标注起始地址

MemoryConsumer申请内存处理逻辑:

  • TaskMemoryManager 向Executor内存管理(UnifiedMemoryManager) 申请内存

  • 若内存不够,TaskMemoryManager循环释放其余MemoryConsumer占用的内存,即spill磁盘

  • 若依旧不够,该MemoryConsumer写磁盘

  • 获取申请到的内存数额,生成下一个page编号

  • 调用MemoryAllocator,实际执行内存分配,并将page记录到pageTable

    • 若为onHeapMemroy,有个内存缓存池,对于1MB及以上的请求,先在池子里查看有无可用内存,有则直接使用池子里的内存(释放内存时,直接将内存丢池子里,池子里的内存为弱引用,Full GC时可回收);若池子里没有,则new long[size],申请一个大数组。
    • 若为offHeapMemory,直接Platform.allocateMemory(size),无需考虑JVM GC导致内存挪动的问题
  • 若page.size,即申请到的内存小于所需内存,则释放该page,抛出OutOfMemoryError

    这个是比较奇怪的,为什么不在实际申请资源前进行判断那? 若申请到的内存数额低于预期,直接就抛异常。 是为了子类重载?

结束语

Spark内存管理模块具有很强的层次性:

  • Executor Memory Manager:管理整个JVM的storage\execution内存
  • Task Memory Manager:管理单个taskAttempt的内存,向executor申请内存
  • Memory Consumer:内存实际消费者,其子类与具体处理逻辑紧密相连
参考资料