Wednesday 21 October 2015

Performance Monitoring, Testing and Optimizing Hadoop-MapReduce Job using Hadoop Counters

If you run a Hadoop MapReduce Job and check its status page on job tracker for its job id, you will get performance counters like the below.

File System Counters
                FILE: Number of bytes read=4386096368
                FILE: Number of bytes written=8805370803
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=54583718086
                HDFS: Number of bytes written=4382090874
                HDFS: Number of read operations=1479
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=369
                Launched reduce tasks=1
                Data-local map tasks=369
                Total time spent by all maps in occupied slots (ms)=34288552
                Total time spent by all reduces in occupied slots (ms)=232084
                Total time spent by all map tasks (ms)=8572138
                Total time spent by all reduce tasks (ms)=58021
                Total vcore-seconds taken by all map tasks=8572138
                Total vcore-seconds taken by all reduce tasks=58021
                Total megabyte-seconds taken by all map tasks=35111477248
                Total megabyte-seconds taken by all reduce tasks=237654016
        Map-Reduce Framework
                Map input records=14753874
                Map output records=666776
                Map output bytes=4383426830
                Map output materialized bytes=4386098552
                Input split bytes=47970
                Combine input records=0
                Combine output records=0
                Reduce input groups=1
                Reduce shuffle bytes=4386098552
                Reduce input records=666776
                Reduce output records=666776
                Spilled Records=1333552
                Shuffled Maps =369
                Failed Shuffles=0
                Merged Map outputs=369
                GC time elapsed (ms)=1121584
                CPU time spent (ms)=23707900
                Physical memory (bytes) snapshot=152915259392
                Virtual memory (bytes) snapshot=2370755190784
                Total committed heap usage (bytes)=126644912128
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=49449743227
        File Output Format Counters
                Bytes Written=4382090874
Counters are used to determine if and how often a particular event occurred during a job execution.
A task’s counters are sent in full every time, rather than sending the counts since the last transmission, since this guards against errors due to lost messages. Furthermore, during a job run, counters may go down if a task fails. Counter values are definitive only once a job has successfully completed.

Performance Monitoring: Tracking the counter values provides vital stats about the mapreduce job. In addition to stats on mapreduce job and its tasks, it also provide stats on operation done on underlying filesystems. 

Performance Testing: Knowing the expected values of counters provide first line of performance validation for mapreduce jobs. It helps in ascertaining whether mapreduce job is behaving as expected by the ideal mapreduce paradigm. For example, the simple check that sum of number of records produced by mapper should be equal to total number of records consumed by producer.

Performance Optimization: This forms the second line of performance validation for mapreduce jobs. Counter values gave hints on what can be done to optimize that mapreduce job further.

To describe in detail how the above objectives of Performance Monitoring, Testing and Optimization can be achieved using Counters, I have listed in tabular format all mapreduce counters, their description, ideal expected values and how to optimize hints.
There are 2 categories of counters in Hadoop: In-built(file system, job, framework) and custom.

In-built Counters

Source Code hadoop-mapreduce-client-core/2.6.0/org/apache/hadoop/mapreduce



Counter  
Brief Description
Detailed Description
Expected value for ideally optimized MapReduce Performance
How to Optimize in case of not expected value/ Bad Performance
           Counter Group: FileSystem Counters
           (Source FileName in Hadoop Code = FileSystemCounter.properties)
           (CounterGroupName = File System Counters)
FOR each filesystem(HDFS, MAPRFS, S3):




·    <FS>_BYTES_READ
Number of bytes read from filesystem, <FS>
The total number of bytes read from a specific filesystem.
No ideal expected value as it depends on volume of data being read.
It could be reduced by using efficient serialization techniques (like Avro, Parquet, etc) and compression algorithms (like LZO, Snappy, bzip2 etc).  
Choices of algorithms purely depend on use case. For example, Parquet is useful only if read pattern for data is limited to some columns.
Also, Tradeoff of time taken to serialize-deserialize and compress-decompress should be taken into consideration.
·    <FS>_BYTES_WRITTEN
Number of bytes written to filesystem, <FS>
The total number of bytes written into a specific filesystem.
No ideal expected value as it depends on volume of data being generated by mapreduce algorithm.
Example, FOR MAPRFS:




·    MAPRFS_BYTES_READ
total number of bytes read from mapr file system.
Off the shelf counter for HDFS_BYTES_READ.


·    MAPRFS_BYTES_WRITTEN
total number of bytes written to mapr file system.
Off the shelf counter for HDFS_BYTES_WRITTEN.


        FILE_BYTES_READ
total number of bytes read from local file system
This counter is incremented for each byte read from the local file system.
These writes occur:
1) during the shuffle phase, when the map phase intermediate data is read by HTTP worker threads(“tasktracker.http.threads”) into reducer tasks.
2) during the reduce phase, when reducers read grouped and sorted data from local filesystem.
No ideal expected value as it depends on shuffle phase and reducer logic.
It could be reduced by using efficient compression algorithms (like LZO, Snappy, bzip2 etc). Also, Tradeoff of time taken to compress-decompress should be taken into consideration

        FILE_BYTES_WRITTEN
total number of bytes written to local file system
This counter is incremented for each byte written to the local file system.
These writes occur:
1) during the map phase when the mappers write their intermediate results to the local file system.
2) during the shuffle phase when the reducers spill intermediate results to their local disks while sorting.
No ideal expected value as it depends on mapper logic and shuffle phase.
It could be reduced by using efficient compression algorithms (like LZO, Snappy, bzip2 etc). Also, Tradeoff of time taken to compress-decompress should be taken into consideration

READ_OPS
Number of read operations
It is accumulated at the client per file system. It is number of read operations such as listStatus, getFileBlockLocations, open etc.
This is useful in the interim to identify jobs that heavily load HDFS.

LARGE_READ_OPS
Number of large read operations
It is accumulated at the client per file system. On file system, most of the operations are small except listFiles for a large directory. Iterative listFiles was introduced in HDFS to break down a single large operation into smaller steps. This counter is incremented for every iteration of listFiles, when listing files under a large directory.
This is useful in the interim to identify jobs that heavily load HDFS.

WRITE_OPS
Number of write operations
It is accumulated at the client per file system. It is number of write operations such as create, append, setPermission etc.
This is useful in the interim to identify jobs that heavily load HDFS.






Counter Group: Job Counters
(Source FileName in Hadoop Code = JobCounter.properties)
(CounterGroupName= Job Counters)
SLOTS_MILLIS_MAPS
Total time spent by all maps in occupied slots (ms)
This value indicates wall clock time for the map tasks. It equals to FALLOW_SLOTS_MILLIS_MAPS + MILLIS_MAPS
Optimally, should be equal to MILLIS_MAPS
Reduce time spent in FALLOW_SLOTS_MILLIS_MAPS as much as possible
FALLOW_SLOTS_MILLIS_MAPS
Total time spent by all maps waiting after reserving slots (ms)
Indicates how much time map tasks wait in the queue after the slots are reserved but before the map tasks execute. Slot reservation is a capacity scheduler feature for memory-intensive jobs.

Not used by YARN-based mapreduce.
Optimally, should be zero
A high number indicates a possible mismatch between the number of slots configured for a task tracker and how many resources are actually available.

The way to reduce it can be one of the following:
1) Increase the number of nodes in cluster as it will distribute the data and computing capacity further. Overall, load on a particular node will decrease.
2) Reduce the mapper-task capacity of task-tracker nodes with low level of resources.
Reduce the value of  “mapred.tasktracker.map.tasks.maximum” on that task-tracker’s mapred-site.xml 
MILLIS_MAPS
Total time spent by all map tasks (ms)
The total time taken in running all the map tasks in ms, including speculative tasks
No ideal optimal value as it depends on mapper logic.
Optimize the mapper code as much as possible by profiling it for memory leaks and following best coding standards.
SLOTS_MILLIS_REDUCES
Total time spent by all reduces in occupied slots (ms)
This value indicates wall clock time for the reduce tasks. It equals to FALLOW_SLOTS_MILLIS_REDUCES + MILLIS_REDUCES
Optimally, should be equal to MILLIS_REDUCES
Reduce time spent in FALLOW_SLOTS_MILLIS_REDUCES as much as possible
FALLOW_SLOTS_MILLIS_REDUCES
Total time spent by all reduces waiting after reserving slots (ms)
indicates how much time map tasks wait in the queue after the slots are reserved but before the reduce tasks execute. Slot reservation is a capacity scheduler feature for memory-intensive jobs.

Not used by YARN-based mapreduce.
Optimally, should be zero
A high number indicates a possible mismatch between the number of slots configured for a task tracker and how many resources are actually available.

The way to reduce it can be one of the following:
1) Increase the number of nodes in cluster as it will distribute the data and computing capacity further. Overall, load on a particular node will decrease.
2) Reduce the reducer-task capacity of task-tracker nodes with low level of resources.
Reduce the value of  “mapred.tasktracker.reduce.tasks.maximum” on that task-tracker’s mapred-site.xml 
MILLIS_REDUCES
Total time spent by all reduce tasks (ms)
The total time taken in running all the reduce tasks in ms, including speculative tasks
No ideal optimal value as it depends on reducer logic.
Optimize the reducer code as much as possible by profiling it for memory leaks and following best coding standards.
VCORES_MILLIS_MAPS
Total vcore-seconds taken by all map tasks
Vcore stands for virtual core in CPU of a computer.
This counter measures the cpu resources used by all the mappers.
It is the aggregated number of vcores that each mapper had been allocated times the number of seconds that mapper had run.
The number of vcores allocated to a mapper is controlled by property
“mapreduce.map.cpu.vcores” (default 1)
And in yarn, it is controlled by
“yarn.nodemanager.resource.cpu-vcores” (default 8).
In simple language, it is equal to (“mapreduce.map.cpu.vcores” X MILLIS_MAPS)
It gives the value of cpu resources that were blocked by mappers.
The ideal optimal value is as much low value for this counter as possible.
To reduce value to optimize the value of this counter, either reduce the number of vcore allocated to each mapper or the time taken to execute a mapper instance.
In other words, mapper should be made optimized and less resource-intensive as much possible. Then, reduce value of “mapreduce.map.cpu.vcores” that fits its requirement perfectly.


VCORES_MILLIS_REDUCES
Total vcore-seconds taken by all reduce tasks
Vcore stands for virtual core in CPU of a computer.
This counter measures the cpu resources used by all the reducers.
It is the aggregated number of vcores that each reducer had been allocated times the number of seconds that reducer had run.
The number of vcores allocated to a reducer is controlled by property
“mapreduce.reduce.cpu.vcores” (default 1)
And in yarn, it is controlled by
“yarn.nodemanager.resource.cpu-vcores” (default 8).
In simple language, it is equal to (“mapreduce.reduce.cpu.vcores” X MILLIS_REDUCES)
It gives the value of cpu resources that were blocked by reducers.
The ideal optimal value is as much low value for this counter as possible.
To reduce value to optimize the value of this counter, either reduce the number of vcore allocated to each reducer or the time taken to execute a mapper instance.
In other words, mapper should be made optimized and less resource-intensive as much possible. Then, reduce value of “mapreduce.reduce.cpu.vcores” that fits its requirement perfectly.

MB_MILLIS_MAPS
Total megabyte-seconds taken by all map tasks
This counter measures the memory resources used by all the mappers.
It is the aggregated amount of memory (in megabytes) that each mapper had been allocated times the number of seconds that mapper had run.
The amount of memory(MB) allocated to a mapper is controlled by property
“mapreduce.map.memory.mb” (default 1024)
And in yarn, it is controlled by
“yarn.nodemanager.resource.memory-mb” (default 8192).
In simple language, it is equal to (“mapreduce.map.memory.mb” X MILLIS_MAPS)
It gives the value of memory resources that were blocked by mappers.
The ideal optimal value is as much low value for this counter as possible.
To reduce value to optimize the value of this counter, either reduce the number of memory-mb allocated to each mapper or the time taken to execute a mapper instance.
In other words, mapper should be made optimized and less resource-intensive as much possible. Then, reduce value of “mapreduce.map.memory.mb” that fits its requirement perfectly.
MB_MILLIS_REDUCES
Total megabyte-seconds taken by all reduce tasks
This counter measures the memory resources used by all the reducers.
It is the aggregated amount of memory (in megabytes) that each reducer had been allocated times the number of seconds that reducer had run.
The amount of memory(MB) allocated to a reducer is controlled by property
“mapreduce.reduce.memory.mb” (default 1024)
And in yarn, it is controlled by
“yarn.nodemanager.resource.memory-mb” (default 8192).
In simple language, it is equal to (“mapreduce.reduce.memory.mb” X MILLIS_ REDUCES)
It gives the value of memory resources that were blocked by reducers.
The ideal optimal value is as much low value for this counter as possible.
To reduce value to optimize the value of this counter, either reduce the number of memory-mb allocated to each reducer or the time taken to execute a reducer instance.
In other words, mapper should be made optimized and less resource-intensive as much possible. Then, reduce value of “mapreduce.reduce.memory.mb” that fits its requirement perfectly.
DATA_LOCAL_MAPS
Data-local map tasks
Total number of map tasks run on local data blocks (data locality).
It should be as much near to total maps as possible. Optimally, all the map tasks will execute on local data to exploit locality of reference
The only solution is to ensure maximum data locality.

The way to ensure it can be one of the following:
1) Increase the number of nodes in cluster as it will distribute the data and computing capacity further. Overall, load on a particular node will decrease and probability of map task getting data locally will increase.
2) Increase the mapper-task capacity of task-tracker nodes with high level of resources.
Increase the value of  “mapred.tasktracker.reduce.tasks.maximum” on that task-tracker’s mapred-site.xml 
RACK_LOCAL_MAPS

Rack-local map tasks
Total number of map tasks for which data blocks are on same rack as node on which map task is executing.
Ideally should be zero.
OTHER_LOCAL_MAPS
Other local map tasks tasks
Total number of map tasks for which data blocks are on some other rack as node on which map task is executing.
Ideally should be zero.
TOTAL_LAUNCHED_MAPS
Launched map tasks.
It defines how many map tasks were launched for the job, including failed tasks, killed tasks and tasks that were started speculatively.
Optimally, this number is the same as the number of splits for the job.
In case of value not maching number of input splits, verify the InputFormat used.
TOTAL_LAUNCHED_REDUCES
Launched reduce tasks
It defines how many reduce tasks were launched for the job, including failed tasks, killed tasks and tasks that were started speculatively.
Optimally, should be equal to the number of reducers configured for that mapreduce job (default is 2 reducers)

NUM_FAILED_MAPS
Failed map tasks
The number of map attempts/tasks that were failed.
The reason of failure could be runtime exception in code or errors like Out Of Memory, I/O errors, etc.
Ideally should be zero.

NUM_FAILED_REDUCES
Failed reduce tasks
The number of reduce attempts/tasks that were failed.
The reason of failure could be runtime exception in code or errors like Out Of Memory, I/O errors, etc.
Ideally should be zero.

NUM_KILLED_MAPS
Killed map tasks
The number of map attempts/tasks that were killed.
The reason of killing could be speculative execution (in which slower task is killed) or failure of any node on which task is running.
Ideally should be zero.
A high number indicates too many tasks getting killed due to speculative execution or node failures.

The way to reduce the instances of speculative execution can be one of the following:
1) Increase the number of nodes in cluster as it will distribute the data and computing capacity further. Overall, load on a particular node will decrease.
2) Remove or minimize the number of nodes with low level of resources from cluster.
NUM_KILLED_REDUCES
Killed reduce tasks
The number of reduce attempts/tasks that were killed.
The reason of killing could be speculative execution (in which slower task is killed) or failure of any node on which task is running.
Ideally should be zero.





·         Counter Group: Map-Reduce Framework Counters
·         (Source FileName in Hadoop Code = TaskCounter.properties)
·         (CounterGroupName= Map-Reduce Framework)
SPLIT_RAW_BYTES
Input split bytes
The total number of bytes of input-split object consumed read by maps. These objects represent the split metadata (offset and length within a file) rather than split data itself.
No ideal optimal value

MAP_INPUT_BYTES
Map input bytes
The number of bytes of uncompressed input read by all the maps in the job.
Must be equal to number of bytes in input data

MAP_INPUT_RECORDS
Map input records
The number of input records consumed by all the maps in the job. It is incremented for every successful record read from RecordReader and passed to the mapper’s map method. Records that the map tasks failed to read are not included in these counters.
Must be equal to number of records in input data
In case of non-matching value, check RecordReader logic
MAP_SKIPPED_RECORDS
Map skipped records.
The number of Input records skipped by all the maps in the job.
Skipped records are bad/corrupt records that are making the map task attempts to fail due to run-time exceptions in third-party libraries being used in mapper. Skipping mode is turned on for a task only after it has failed twice.
For a task consistently failing on a bad record, the tasktracker runs the following
task attempts with these outcomes:
1. Task fails.
2. Task fails.
3. Skipping mode is enabled. Task fails but failed record is stored by the tasktracker.
4. Skipping mode is still enabled. Task succeeds by skipping the bad record that failed in the previous attempt.
It can detect only one bad record per task attempt, so this mechanism is appropriate only for detecting
occasional bad records.
To give skipping mode enough attempts to detect and skip
all the bad records in an input split, increase the value of “mapred.map.max.attempts”. Bad records are saved as sequence files in the job’s
output directory under the _logs/skip subdirectory.
Ideally, should be 0

MAP_OUTPUT_RECORDS
Map output records
The number of map output records produced by all the maps in the job. It is incremented for every successful record written by the mappers. Records that the map tasks failed to write are not included in these counters.
No ideal optimal value as it depends on mapper logic.

MAP_OUTPUT_BYTES
Map output bytes
The number of bytes of uncompressed output produced by all the maps in the job. Incremented every time collect() method is called on a map’s Output Collector.
No ideal optimal value as it depends on mapper logic.
To optimize this counter, use Hadoop Serialization or Avro to serialize the Mapper output.
MAP_OUTPUT_MATERIALIZED_BYTES
Map output materialized bytes
The number of bytes of map output actually written to disk.
This is visible and relevant only if map output compression is enabled. In that case, it is the number of bytes of compressed output produced by all the maps in the job.
No ideal optimal value as it depends on mapper logic.

COMBINE_INPUT_RECORDS
Combine input records
It indicates the number of records that were read by the optional combiner.
Incremented every time a value is read from the combiner’s iterator over values
If you don’t specify a combiner, these counters should be 0. Otherwise, should be equal to MAP_OUTPUT_RECORDS
In case of unexpected value, check combiner logic
COMBINE_OUTPUT_RECORDS
Combine output records
It indicates the number of records that were written by the optional combiner.
Incremented every time the collect() method is called on a combiner’s
OutputCollector.
If you don’t specify a combiner, these counters should be 0. Otherwise, should be equal to REDUCE_INPUT_RECORDS
In case of unexpected value, check combiner logic
       REDUCE_INPUT_RECORDS
Reduce input records
It indicates how many records were successfully read by the reducers.
Incremented every time a value is read from the reducer’s iterator over values.
The input record counter should be equal to the MAP_OUTPUT_RECORDS counter, in case of no combiner. Otherwise, should be equal to REDUCE_INPUT_RECORDS

REDUCE_SKIPPED_RECORDS
Reduce skipped records
The number of Input records skipped by all the reducers in the job.
Skipped records are bad/corrupt records that are making the reduce task attempts to fail due to run-time exceptions in third-party libraries being used in reducer. Skipping mode is turned on for a task only after it has failed twice.
For a task consistently failing on a bad record, the tasktracker runs the following
task attempts with these outcomes:
1. Task fails.
2. Task fails.
3. Skipping mode is enabled. Task fails but failed record is stored by the tasktracker.
4. Skipping mode is still enabled. Task succeeds by skipping the bad record that failed in the previous attempt.
It can detect only one bad record per task attempt, so this mechanism is appropriate only for detecting
occasional bad records.
To give skipping mode enough attempts to detect and skip
all the bad records in an input split, increase the value of “mapred.reduce.max.attempts”. Bad records are saved as sequence files in the job’s
output directory under the _logs/skip subdirectory.
Ideally should be zero.

REDUCE_INPUT_GROUPS
Reduce input groups
The total number of distinct unique key groups consumed by all the reducers in the job. It is incremented for every unique key that the reducers process.
Incremented every time the reducer’s reduce() method is called by the
framework.
This value should be equal to the total number of different keys in the intermediate results from the mappers.

REDUCE_SKIPPED_GROUPS
Reduce skipped groups
The number of distinct key groups skipped by all the reducers in the job.
 This value should be zero

REDUCE_OUTPUT_RECORDS
Reduce output records
It indicates how many records were successfully written by all the reducers in the job.
Incremented every time the collect() method is called on a reducer’s OutputCollector.
Ideally, ratio of (REDUCE_OUTPUT_RECORDS/ MAP_INPUT_RECORDS) should be less than 1 and as much closer to 0 as possible.
Although even if ratio is > 1, it is not invalid. It only violates the basic assumption of mapreduce paradigm that reduce-phase aggregates the data considerably.
REDUCE_SHUFFLE_BYTES
Reduce shuffle bytes
It indicates how many bytes of the map output were copied by the shuffle to the reducers.  
Should be as much lower as possible.
Higher numbers here will make the job go slower as the shuffle process is the primary network consumer in the MapReduce job.
Compress and Serialize the Mapper Output.
It could be reduced by using efficient serialization techniques (like Avro, Parquet, etc) and compression algorithms (like LZO, Snappy, bzip2 etc). 
Choices of algorithms purely depend on use case. For example, Parquet is useful only if read pattern for data is limited to some columns.
Also, Tradeoff of time taken to serialize-deserialize and compress-decompress should be taken into consideration.
SPILLED_RECORDS
Spilled Records
It indicates how much data the map and reduce tasks wrote (spilled) to disk when processing the records.
Must be equal to sum of REDUCE_INPUT_RECORDS and MAP_OUTPUT_RECORDS. In case, combiner is also invoked, It must be equal to sum of REDUCE_INPUT_RECORDS and COMBINE_OUTPUT_RECORDS.
Should be as much lower as possible as it involves costly Disk I/O.

SHUFFLED_MAPS
Shuffled Maps
The number of map output files transferred to reducers by shuffle.
In shuffle phase, each mapper’s output is partitioned and sorted into files, one for each reducer. There might be scenario that keys are not uniformly distributed across mappers and some mappers don’t generate partitions for all reducers.
Must be less than or equal to product of number of successful mappers and successful reducers.
Mathematically, must be less than or equal to (TOTAL_LAUNCHED_MAPS – NUM_KILLED_MAPS – NUM_FAILED_MAPS) * (TOTAL_LAUNCHED_REDUCES – NUM_KILLED_ REDUCES – NUM_FAILED_ REDUCES)

FAILED_SHUFFLE
Failed Shuffles
The number of map output copy failures during the shuffle.
Should be zero.

MERGED_MAP_OUTPUTS
Merged Map outputs
The number of map outputs that have been merged on the reduce side of the shuffle.
Must be equal to the value of SHUFFLED_MAPS

CPU_MILLISECONDS
CPU time spent (ms)
Total time spent by all tasks on CPU. It is gathered from /proc/cpuinfo and indicate how much total time was spent executing map and reduce tasks for a job.
Should be as much lower as possible.


GC_TIME_MILLIS
GC time elapsed (ms)
The total time spent in milliseconds doing garbage collection. The garbage collection counter is reported from GarbageCollectorMXBean.getCollectionTime().
The Garbage Collection happens in Java to collect unused objects (objects that are no longer referenced).

If garbage collection is frequent and represents a lot of time, you may be allocating unnecessary objects.
It should be closer to 0 as much as possible
If GC_TIME is amounting to considerable proportion of task run time, follow steps can help
1) Add -verbose:gc -XX:+PrintGCDetails to “mapred.child.java.opts”. Then inspect the logs for some tasks (mappers or reducers). If new objects are being created (like “new Text” or “new IntWritable”) unnecessarily or inside a loop or inside a function, try to optimize it. The Hadoop Writable Objects are mutable.
2) If too much object creation is necessity, try to increase heap-size allocation to task attempt jvm by adding –Xmx parameter to “mapred.map.child.java.opts” for map tasks and “mapred.reduce.child.java.opts” for reduce tasks. For example, to set committed heap usage to 4GB for map tasks, set “–Xmx4096m” to “mapred.map.child.java.opts”.    
VIRTUAL_MEMORY_BYTES
Virtual memory (bytes) snapshot
Total number of Virtual Memory Bytes (RAM+SWAP) consumed by all the tasks. It shows how much physical memory plus swap space is consumed by all tasks in a job.
(VIRTUAL_MEMORY_BYTES  - PHYSICAL_MEMORY_BYTES ) indicates how much swap space or paged memory is used for a job, which should be close to 0 as much as possible
The only reason for high value of swap space is memory committed to current tasks on a node far exceeding available memory resource on that node. The reason could be other system processes or applications consuming memory. This will result in pagination of memory which is being indicated by high value of swap space.

The way to ensure low value of swap memory can be one of the following:
1) Increase the number of nodes in cluster as it will distribute the data and computing capacity further. Overall, load on a particular node will decrease.
2) Decrease the mapper-task capacity and reducer-task capacity of task-tracker nodes having low level of resources.
Decrease the value of  “mapred.tasktracker.map.tasks.maximum” and “mapred.tasktracker.reduce.tasks.maximum” on that task-tracker’s mapred-site.xml 
PHYSICAL_MEMORY_BYTES
Physical memory (bytes) snapshot
Total Physical Memory consumed by all the tasks. These statistics are gathered from /proc/meminfo and indicate how much RAM (not including swap space) was consumed by all the tasks.
COMMITTED_HEAP_BYTES
Total committed heap usage (bytes)
The total amount of memory available in the JVM in bytes, as reported by Runtime.getRuntime().totalMemory()
Should be close to total task requirement



References: