Friday 18 December 2015

Spark's new Memory Manager, Unified Memory Manager

Starting with the 1.6 release, Spark moves to a new memory manager implementation, the Unified Memory Manager. Compared with the current implementation, it aims to use memory more efficiently for running programs and to require far fewer configuration parameters from the user.

Starting from the basics, the Memory Manager in Spark is responsible for distributing an executor's heap memory between the storage and shuffle fractions.

1) Storage Memory Fraction: This fraction holds the partitions of the RDDs being processed. It acts as an in-memory LRU cache for the data. This means data does not necessarily stay in memory for long: when the storage fraction is full and new data arrives, the least recently used data is dropped.
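The LRU behaviour described above can be illustrated with a small, self-contained sketch. This is not Spark's block manager; the class name `LruBlockCache`, the block ids, and the sizes are all hypothetical and exist only to show how the least recently used block is dropped when space runs out.

```python
from collections import OrderedDict


class LruBlockCache:
    """Toy LRU cache keyed by block id; sizes are in MB (hypothetical model)."""

    def __init__(self, capacity_mb):
        self.capacity_mb = capacity_mb
        # block_id -> size_mb, ordered from least to most recently used
        self.blocks = OrderedDict()

    def used_mb(self):
        return sum(self.blocks.values())

    def get(self, block_id):
        """Return the block size if cached and mark it as recently used."""
        if block_id not in self.blocks:
            return None
        self.blocks.move_to_end(block_id)
        return self.blocks[block_id]

    def put(self, block_id, size_mb):
        """Cache a block, evicting least recently used blocks to make room.

        Returns the list of evicted block ids, or None if the block is
        larger than the whole cache and therefore cannot be stored.
        """
        if size_mb > self.capacity_mb:
            return None
        # Re-inserting an existing block replaces its old entry.
        self.blocks.pop(block_id, None)
        evicted = []
        while self.used_mb() + size_mb > self.capacity_mb:
            victim, _ = self.blocks.popitem(last=False)
            evicted.append(victim)
        self.blocks[block_id] = size_mb
        return evicted


cache = LruBlockCache(capacity_mb=100)
cache.put("rdd_0_0", 40)
cache.put("rdd_0_1", 40)
cache.get("rdd_0_0")                # touch: rdd_0_1 becomes least recently used
evicted = cache.put("rdd_1_0", 40)  # needs room, so rdd_0_1 is dropped
print(evicted)                      # ['rdd_0_1']
```

Note that the block that was read most recently survives, even though it was inserted first.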
RDDs end up in storage memory in several use cases:
1. Input RDD: The input of the program, which is processed lazily when an action is executed. Its partitions stay loaded only for the scope of execution of the first function.
2. Intermediate RDD: Created as the output of one function and consumed by the next function in the DAG. Its partitions stay loaded only for the scope of execution of the next function.
3. Output RDD: The output of the last function in the DAG. It is available for the scope of the program.
4. Persisted RDD: If an intermediate RDD has to be used again later, it is recommended to persist it in memory; otherwise the DAG is executed again to recompute it.
An RDD can be persisted using the persist() function of the Spark API.
The RDD persistence levels are as follows (sourced from Spark's programming guide):
Storage Level: Meaning
MEMORY_ONLY: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISK: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SER: Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER: Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLY: Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental): Store RDD in serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts from memory. If you plan to use Tachyon as the off heap store, Spark is compatible with Tachyon out-of-the-box.
If your RDD is small and has a small number of calculation steps, MEMORY_ONLY is recommended.
If your RDD is small and has a large number of calculation steps, MEMORY_AND_DISK is recommended.
If your RDD is large and has a small number of calculation steps, MEMORY_ONLY_SER is recommended.
If your RDD is large and has a large number of calculation steps, MEMORY_AND_DISK_SER is recommended.
If you have multiple RDDs that are each large, have a large number of execution steps, and need to be shared across multiple Spark applications, OFF_HEAP (Tachyon) is recommended.
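The rules of thumb above can be written down as a small decision function. This is only a sketch of the recommendations in this post, not part of the Spark API: the function name `recommend_storage_level` and its boolean parameters are hypothetical, and what counts as "large" or "many steps" is left to the caller.

```python
def recommend_storage_level(large_rdd, many_steps, shared_across_apps=False):
    """Map the rules of thumb above to the name of a storage level.

    All three arguments are booleans supplied by the caller; this helper
    is hypothetical and only encodes the recommendations listed above.
    """
    # Large, expensive RDDs shared between applications: off-heap store.
    if shared_across_apps and large_rdd and many_steps:
        return "OFF_HEAP"
    # Large RDDs: prefer serialized storage to save space.
    if large_rdd:
        return "MEMORY_AND_DISK_SER" if many_steps else "MEMORY_ONLY_SER"
    # Small RDDs: deserialized storage, spilling to disk if recomputation is costly.
    return "MEMORY_AND_DISK" if many_steps else "MEMORY_ONLY"


print(recommend_storage_level(large_rdd=False, many_steps=False))  # MEMORY_ONLY
print(recommend_storage_level(large_rdd=True, many_steps=True))    # MEMORY_AND_DISK_SER
```

The returned name is the storage level that would be passed to persist(), for example rdd.persist(StorageLevel.MEMORY_AND_DISK) in PySpark.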
Storage Memory Distribution among Tasks: The division of storage memory among tasks is enforced logically by Spark, not physically by the JVM. With n tasks running in parallel, each task is allocated at least 1/2n and at most 1/n of storage memory, but it can still physically load data in excess of its allocation. That is why efficient execution of Spark jobs depends significantly on balancing the value of "spark.executor.memory" (in MB) against the number of parallel tasks (spark.executor.cores/spark.task.cpus) running on each executor.
Thus, if the value of "spark.executor.memory" is low, the number of parallel tasks should be kept low as well, so that each task's share of memory stays large enough.
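To make the 1/2n and 1/n bounds concrete, here is a minimal sketch of the arithmetic. The function name `task_memory_bounds_mb` and the example pool size are assumptions for illustration; the number of parallel tasks is computed as spark.executor.cores / spark.task.cpus, as described above.

```python
def task_memory_bounds_mb(pool_mb, executor_cores, task_cpus=1):
    """Return (minimum, maximum) memory per task in MB for a given pool.

    n = executor_cores // task_cpus tasks run in parallel; each task gets
    at least pool / (2 * n) and at most pool / n. Hypothetical helper.
    """
    parallel_tasks = executor_cores // task_cpus
    if parallel_tasks < 1:
        raise ValueError("an executor needs at least task_cpus cores")
    return pool_mb / (2 * parallel_tasks), pool_mb / parallel_tasks


# Same 192 MB pool, different degrees of parallelism.
print(task_memory_bounds_mb(192, executor_cores=4))  # (24.0, 48.0)
print(task_memory_bounds_mb(192, executor_cores=8))  # (12.0, 24.0)
```

With the same pool, doubling the number of parallel tasks halves both bounds for each task.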

2) Shuffle Memory Fraction: When a dataset is aggregated or reduced by key, its partitions are shuffled to create a sorted dataset. This sorting needs a memory buffer to hold sorted chunks of data, and the amount of memory used depends on the algorithm in use. The memory buffer used for sorting during the shuffle phase is called the shuffle memory fraction.

Executor JVM Heap Initialization
When a Spark application is submitted in Spark-on-YARN mode, the amount of memory for each executor (--executor-memory flag or spark.executor.memory parameter) is specified, along with the amount of memory for the driver application (--driver-memory flag or spark.driver.memory parameter).

During execution there are spark.executor.instances Spark executors, each running as a Java process in an isolated YARN container. Each executor's Java process launches a JVM with spark.executor.memory MB of heap. The YARN container for an executor, however, occupies more memory than spark.executor.memory MB, by max(384 MB, 0.10 * spark.executor.memory).
This difference is the memory overhead of launching a YARN container, as YARN needs some memory for internal execution and for maintaining state. Without the overhead, the container would not be launched, because YARN strictly enforces the policy that a request fails if it needs more memory than is available in the container.
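The container size follows directly from the formula above. The sketch below just applies max(384 MB, 0.10 * spark.executor.memory); the function name `yarn_container_memory_mb` and its parameter names are hypothetical, and the example executor sizes are arbitrary.

```python
def yarn_container_memory_mb(executor_memory_mb, min_overhead_mb=384,
                             overhead_factor=0.10):
    """Executor heap plus overhead, following the formula in the text.

    overhead = max(384 MB, 0.10 * spark.executor.memory), truncated to
    whole megabytes. Hypothetical helper for illustration only.
    """
    overhead_mb = max(min_overhead_mb, int(overhead_factor * executor_memory_mb))
    return executor_memory_mb + overhead_mb


# Small executor: the 384 MB floor dominates.
print(yarn_container_memory_mb(1024))  # 1408
# Large executor: the 10% term dominates (819 MB of overhead).
print(yarn_container_memory_mb(8192))  # 9011
```

For executors below roughly 3.75 GB the fixed 384 MB floor applies; above that, the overhead grows with the heap size.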


Current Memory Manager Implementation- Static Memory Manager:
This is the only memory manager currently supported. In this implementation,
1) The ratio of the two fractions, storage and shuffle, is statically defined by setting the parameter "spark.storage.memoryFraction". Because the boundaries are static, neither fraction can use the other's space even when it is idle. Overall, this results in heavy under-utilization of heap space.
2) To optimize the utilization of heap space, the end user has to estimate the storage and shuffle memory requirements of the program and then set memory manager configuration parameters such as "spark.storage.memoryFraction" and "spark.shuffle.memoryFraction". This has to be repeated for each application and each execution environment. Because this is a trial-and-error way of optimizing the application, it can be frustrating for developers.
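The under-utilization caused by static boundaries can be seen in a small sketch. It assumes the Spark 1.x defaults of 0.6 for spark.storage.memoryFraction and 0.2 for spark.shuffle.memoryFraction and, for simplicity, ignores the additional safety fractions Spark applies on top of them. The function names and the example numbers are hypothetical.

```python
def static_regions_mb(heap_mb, storage_fraction=0.6, shuffle_fraction=0.2):
    """Fixed region sizes under a static split (safety fractions ignored)."""
    return {
        "storage": heap_mb * storage_fraction,
        "shuffle": heap_mb * shuffle_fraction,
    }


def can_cache(block_mb, storage_used_mb, regions):
    """A block fits only if it fits inside the storage region itself."""
    return storage_used_mb + block_mb <= regions["storage"]


regions = static_regions_mb(1000)
# Storage is nearly full while the shuffle region is completely idle.
fits = can_cache(block_mb=50, storage_used_mb=580, regions=regions)
print(fits)  # False
```

In this example the 50 MB block is rejected even though the whole shuffle region sits unused, which is exactly the waste the static boundary causes.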

For a more exhaustive read on the static memory manager, refer to http://0x0fff.com/spark-architecture/

New Memory Manager Implementation- Unified Memory Manager
This implementation aims to mitigate the two disadvantages of the static memory manager described above: under-utilization of Java heap memory and the manual intervention needed to optimize its usage.



It enforces a soft boundary between shuffle and storage memory such that either side can borrow memory from the other.
The region shared between shuffle and storage is a fraction of the total heap space, configurable through `spark.memory.fraction` (default 0.75). The position of the boundary within this space is further determined by `spark.memory.storageFraction` (default 0.5).
This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default. So, if heap size is 512 MB, the storage memory size will be 192 MB. But this is the minimum memory which can be used for storage purpose, as more memory can be borrowed from shuffle as per availability.
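The arithmetic above can be reproduced with a few lines. This sketch simply mirrors the calculation in this post using the stated defaults for `spark.memory.fraction` and `spark.memory.storageFraction`; the function name `unified_regions_mb` is hypothetical.

```python
def unified_regions_mb(heap_mb, memory_fraction=0.75, storage_fraction=0.5):
    """Return (unified, storage, execution) region sizes in MB.

    memory_fraction stands for spark.memory.fraction and storage_fraction
    for spark.memory.storageFraction, with the defaults quoted in the text.
    """
    unified_mb = heap_mb * memory_fraction
    storage_mb = unified_mb * storage_fraction
    execution_mb = unified_mb - storage_mb
    return unified_mb, storage_mb, execution_mb


print(unified_regions_mb(512))  # (384.0, 192.0, 192.0)
```

The storage figure is only the initial position of the boundary; it is a guaranteed minimum rather than a cap.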
Storage can borrow as much shuffle/execution memory as is free until shuffle reclaims its space. When this happens, cached blocks will be evicted from memory until sufficient borrowed memory is released to satisfy the shuffle memory request.
Similarly, shuffle can borrow as much storage memory as is free. However, shuffle memory is never evicted by storage due to the complexities involved in implementing this. The implication is that attempts to cache blocks may fail if shuffle has already eaten up most of the storage space, in which case the new blocks will be evicted immediately according to their respective storage levels.
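The borrowing rules described above can be sketched as a toy model. This is not Spark's implementation: the class `UnifiedPoolSketch` and its methods are hypothetical, it tracks only megabyte counters rather than real blocks, and it leaves out details such as storage evicting its own older blocks. It shows two things: shuffle/execution reclaiming only the memory that storage borrowed, and caching failing when execution already holds the space.

```python
class UnifiedPoolSketch:
    """Toy model of a soft boundary between storage and execution memory."""

    def __init__(self, total_mb, storage_region_mb):
        self.total_mb = total_mb
        self.storage_region_mb = storage_region_mb  # storage's guaranteed share
        self.storage_used = 0
        self.execution_used = 0

    def free_mb(self):
        return self.total_mb - self.storage_used - self.execution_used

    def acquire_storage(self, mb):
        """Storage may take any free memory but never evicts execution."""
        if mb > self.free_mb():
            return False
        self.storage_used += mb
        return True

    def acquire_execution(self, mb):
        """Execution takes free memory, then reclaims what storage borrowed.

        Only storage memory above storage_region_mb can be evicted.
        Returns the number of MB actually granted.
        """
        shortfall = mb - self.free_mb()
        if shortfall > 0:
            borrowed = max(0, self.storage_used - self.storage_region_mb)
            self.storage_used -= min(shortfall, borrowed)  # evict cached blocks
        granted = min(mb, self.free_mb())
        self.execution_used += granted
        return granted


# Case 1: storage borrows, then execution reclaims part of the borrowed space.
pool = UnifiedPoolSketch(total_mb=384, storage_region_mb=192)
pool.acquire_storage(300)              # storage borrows 108 MB beyond its region
granted = pool.acquire_execution(150)  # 84 MB free + 66 MB evicted from storage
print(granted, pool.storage_used)      # 150 234

# Case 2: execution has taken most of the pool, so caching a block fails.
busy = UnifiedPoolSketch(total_mb=384, storage_region_mb=192)
busy.acquire_execution(300)
cached = busy.acquire_storage(100)     # only 84 MB free, execution is not evicted
print(cached)                          # False
```

The asymmetry is deliberate in the model: execution can push storage back to its own region, but storage can never push execution out.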

The Unified Memory Manager further helps reduce JVM heap space issues because the heap is not statically divided. Because the boundary is fluid, execution and storage can each use the other's idle space, and the owner can reclaim it when needed. This results in optimal utilization of JVM heap space.

7 comments:

  1. Each Spark executor’s Java process launches a JVM of spark.executor.memory MB. But Yarn container for spark executor occupies higher memory than spark.executor.memory MB by min(384 MB, 0.10* spark.executor.memory).

    it must be max(384 MB, 0.10* spark.executor.memory).
