Friday 18 December 2015

Spark's new Memory Manager, Unified Memory Manager

Starting with the v1.6 release, Spark moves to a new Memory Manager implementation, the Unified Memory Manager. Compared to the current implementation, it aims to increase memory efficiency for running programs while requiring far fewer configuration parameters from the user.

Starting from basics, the Memory Manager in Spark is responsible for distributing the executor's heap memory between the storage and shuffle fractions.

1) Storage Memory Fraction: This fraction holds the partitions of the RDDs being processed. It acts as an in-memory LRU cache for the data, which means data does not necessarily remain in memory for long: when the storage fraction is full and new data arrives, the oldest data is dropped.
There are multiple kinds of RDDs that are stored in storage memory:
1. Input RDD: It is the input of the program, which is lazily processed on the execution of an action function. The partitions remain loaded only for the scope of execution of the first function.
2. Intermediate RDD: These are created as the output of one function and consumed by the next function in the DAG. The partitions remain loaded only for the scope of execution of the next function.
3. Output RDD: These are the output of the last function in the DAG. They are available for the scope of the program.
4. Persisted RDD: If any intermediate RDD has to be used again later, it is recommended to persist it in memory; otherwise the DAG is executed again to recompute it.
It can be persisted using the persist() function of the Spark API, as shown in the sketch after the list of persistence levels below.
Following are the levels of the RDD persistence (sourced from spark's programming guide):
Storage Level: Meaning
MEMORY_ONLY: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISK: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SER: Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER: Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLY: Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental): Store RDD in serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts from memory. If you plan to use Tachyon as the off-heap store, Spark is compatible with Tachyon out-of-the-box.
If your RDD is small and has low number of steps of calculation, MEMORY_ONLY is recommended.
If your RDD is small and has large number of steps of calculation, MEMORY_AND_DISK is recommended.
If your RDD is large and has low number of steps of calculation, MEMORY_ONLY_SER is recommended.
If your RDD is large and has large number of steps of calculation, MEMORY_AND_DISK_SER is recommended.
If you have multiple RDDs where each is large in size, has large number of steps of execution and want to share it within multiple spark applications, using TACHYON is recommended.
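As a minimal sketch of persistence (sc is the spark-shell SparkContext; the input path and RDD names are placeholders), here an intermediate RDD is reused by two actions, so its DAG is not recomputed for the second one:

import org.apache.spark.storage.StorageLevel

// words is reused by two actions below; persist it so the flatMap is not re-run
val words = sc.textFile("hdfs:///data/input.txt")   // placeholder input path
              .flatMap(line => line.split(" "))
words.persist(StorageLevel.MEMORY_AND_DISK_SER)

val totalWords    = words.count()                    // first action: triggers computation and caching
val distinctWords = words.distinct().count()         // second action: served from the persisted RDD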
Storage Memory Distribution among Tasks: The division of storage memory among tasks is enforced logically, not physically, in the JVM. Each task is allocated a minimum of 1/2n of the storage memory, up to a maximum of 1/n, where n is the number of tasks running in parallel. But a task can still physically load data in excess of its allocation. That is why optimized execution of Spark jobs depends significantly on keeping the value of "spark.executor.memory" (MB) in sync with the number of parallel tasks (spark.executor.cores/spark.task.cpus) running on each executor.
Thus, if the value of "spark.executor.memory" is low, the number of parallel tasks should be set to a higher value.
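A rough back-of-the-envelope sketch of these per-task bounds (the numbers are purely illustrative assumptions):

// Assumed values for illustration only
val storageMemoryMb = 384                                              // storage fraction of the executor heap
val parallelTasks   = 4                                                // spark.executor.cores / spark.task.cpus
val maxPerTaskMb    = storageMemoryMb.toDouble / parallelTasks         // 1/n of storage memory
val minPerTaskMb    = storageMemoryMb.toDouble / (2 * parallelTasks)   // 1/2n of storage memory
println(f"each task is allocated between $minPerTaskMb%.0f MB and $maxPerTaskMb%.0f MB")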

2) Shuffle Memory Fraction: When a dataset is aggregated/reduced by a key, its records are shuffled across the cluster and sorted. This sorting needs some memory/buffer to keep the sorted chunks of data. The amount of memory used depends on the algorithm in play. The memory buffer used for sorting during the shuffle phase is called the shuffle memory fraction.
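For instance, a minimal sketch (sc is the spark-shell SparkContext; paths are placeholders) of an aggregation that triggers a shuffle and therefore draws on this fraction:

// reduceByKey repartitions and sorts/aggregates data by key, using shuffle memory buffers
val wordCounts = sc.textFile("hdfs:///data/input.txt")     // placeholder input path
                   .flatMap(line => line.split(" "))
                   .map(word => (word, 1))
                   .reduceByKey(_ + _)
wordCounts.saveAsTextFile("hdfs:///data/word-counts")      // placeholder output path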

Executor JVM Heap Initialization
When a Spark application is submitted in Spark-on-YARN mode, the amount of memory to be used by each executor (--executor-memory flag or spark.executor.memory parameter) is specified, as is the amount of memory to be used by the driver application (--driver-memory flag or spark.driver.memory parameter).

When executing, there will be spark.executor.instances Spark executors, each running as a Java process in an isolated YARN container. Each Spark executor's Java process launches a JVM of spark.executor.memory MB. But the YARN container for a Spark executor occupies more memory than spark.executor.memory MB, by max(384 MB, 0.10 * spark.executor.memory).
This difference is the memory overhead of launching the YARN container, as YARN needs some memory for internal execution and maintaining state. Not accounting for this overhead will result in the container not getting launched, as YARN strictly rejects a request whose required memory exceeds the memory available for the container.
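A small sketch of the container sizing rule described above (the executor memory value is just an example):

// Assumed executor size for illustration
val executorMemoryMb = 4096                                        // spark.executor.memory
val overheadMb       = math.max(384, (0.10 * executorMemoryMb).toInt)
val containerMb      = executorMemoryMb + overheadMb               // memory YARN must actually allocate
println(s"executor JVM: $executorMemoryMb MB, YARN container: $containerMb MB")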


Current Memory Manager Implementation - Static Memory Manager:
This is the only Memory Manager supported currently. In this implementation,
1) The ratio of the two fractions, storage and shuffle, is statically defined by setting the parameter "spark.storage.memoryFraction". Because of the statically defined boundary, neither fraction can use the other's space even when the other is idle. Overall, this results in heavy under-utilization of heap space.
2) To optimize the utilization of heap space, the end user has to estimate the storage memory and shuffle memory requirements of the program and then set memory manager configuration parameters like "spark.storage.memoryFraction", "spark.shuffle.memoryFraction", etc. This activity has to be repeated for each application and each execution environment. As it is more a trial-and-error way of optimizing the application, it can be frustrating for developers.
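For reference, a rough sketch of how the heap is carved up under the Static Memory Manager; the fraction values below are what I believe the 1.x defaults to be, so treat them as assumptions:

// Assumed defaults: spark.storage.memoryFraction = 0.6, spark.shuffle.memoryFraction = 0.2
val heapMb          = 1024
val storageFraction = 0.6
val shuffleFraction = 0.2
val storageMb       = heapMb * storageFraction     // fixed storage region, even if shuffle is idle
val shuffleMb       = heapMb * shuffleFraction     // fixed shuffle region, even if storage is idle
println(s"storage: $storageMb MB, shuffle: $shuffleMb MB (boundaries are static)")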

For more exhaustive read on static memory manager, refer http://0x0fff.com/spark-architecture/

New Memory Manager Implementation - Unified Memory Manager
This implementation aims to mitigate the above two disadvantages of static memory manager: under-utilization of java heap memory and manual intervention in optimizing usage of java heap memory.



It enforces a soft boundary between shuffle and storage memory such that either side can borrow memory from the other.
The region shared between shuffle and storage is a fraction of the total heap space, configurable through `spark.memory.fraction` (default 0.75). The position of the boundary within this space is further determined by `spark.memory.storageFraction` (default 0.5).
This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default. So, if heap size is 512 MB, the storage memory size will be 192 MB. But this is the minimum memory which can be used for storage purpose, as more memory can be borrowed from shuffle as per availability.
Storage can borrow as much shuffle/execution memory as is free until shuffle reclaims its space. When this happens, cached blocks will be evicted from memory until sufficient borrowed memory is released to satisfy the shuffle memory request.
Similarly, shuffle can borrow as much storage memory as is free. However, shuffle memory is never evicted by storage due to the complexities involved in implementing this. The implication is that attempts to cache blocks may fail if shuffle has already eaten up most of the storage space, in which case the new blocks will be evicted immediately according to their respective storage levels.
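A sketch of setting these knobs explicitly when building the application's configuration (the values simply restate the defaults, so in practice you would rarely need to set them; the application name is hypothetical):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("unified-memory-demo")                 // hypothetical application name
  .set("spark.memory.fraction", "0.75")              // share of the heap used for storage + execution
  .set("spark.memory.storageFraction", "0.5")        // storage's guaranteed share within that region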

The Unified Memory Manager further helps in reducing JVM heap space issues, since heap space is not statically divided. As the boundary is fluid, execution and storage can utilize each other's idle space, and the owner can reclaim it when needed. This results in optimum utilization of JVM heap space.

Wednesday 9 December 2015

container-launch exception in CDH 5.3 while launching spark-shell or submitting spark job in YARN deploy mode

It is a kind of bug in the CDH 5.3 Spark configuration, but a couple of months back, while running my first Spark job, it was a big mysterious hurdle. I am adding a post on it as I came across a couple of people in my workplace facing the same issue. I am hoping it might help others stepping into the Spark world through CDH 5.3.

Below is the sequence of the issue's occurrence and resolution:


  • [cloudera@quickstart bin]$  ./spark-shell --master yarn
2015-10-25 22:42:09,321 INFO  [main] spark.SecurityManager (Logging.scala:logInfo(59)) - Changing view acls to: cloudera
2015-10-25 22:42:09,332 INFO  [main] spark.SecurityManager (Logging.scala:logInfo(59)) - Changing modify acls to: cloudera
2015-10-25 22:42:09,333 INFO  [main] spark.SecurityManager (Logging.scala:logInfo(59)) - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
2015-10-25 22:42:09,333 INFO  [main] spark.HttpServer (Logging.scala:logInfo(59)) - Starting HTTP Server
2015-10-25 22:42:09,483 INFO  [main] server.Server (Server.java:doStart(272)) - jetty-8.y.z-SNAPSHOT
2015-10-25 22:42:09,520 INFO  [main] server.AbstractConnector (AbstractConnector.java:doStart(338)) - Started SocketConnector@0.0.0.0:40247
2015-10-25 22:42:09,521 INFO  [main] util.Utils (Logging.scala:logInfo(59)) - Successfully started service 'HTTP class server' on port 40247.
............................................
2015-10-25 22:42:42,999 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) -
client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: root.cloudera
start time: 1445838161149
final status: UNDEFINED
tracking URL: http://quickstart.cloudera:8088/proxy/application_1436476349975_0011/
user: cloudera
2015-10-25 22:42:44,672 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1436476349975_0011 (state: ACCEPTED)
2015-10-25 22:42:45,681 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1436476349975_0011 (state: ACCEPTED)
2015-10-25 22:42:55,364 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1436476349975_0011 (state: ACCEPTED)
2015-10-25 22:43:00,660 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1436476349975_0011 (state: ACCEPTED)
2015-10-25 22:43:01,849 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1436476349975_0011 (state: ACCEPTED)
2015-10-25 22:43:02,854 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1436476349975_0011 (state: FAILED)
2015-10-25 22:43:02,854 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) -
client token: N/A
diagnostics: Application application_1436476349975_0011 failed 2 times due to AM Container for appattempt_1436476349975_0011_000002 exited with  exitCode: 1 due to: Exception from container-launch.
Container id: container_1436476349975_0011_02_000001
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:197)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:299)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:81)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Container exited with a non-zero exit code 1
.Failing this attempt.. Failing the application.
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: root.cloudera
start time: 1445838161149
final status: FAILED
tracking URL: http://quickstart.cloudera:8088/cluster/app/application_1436476349975_0011
user: cloudera
org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:102)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:58)
at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:335)
at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986)
at $iwC$$iwC.<init>(<console>:9)
at $iwC.<init>(<console>:18)
at <init>(<console>:20)
at .<init>(<console>:24)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:123)
at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:122)
at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:270)
at org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:122)
at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:60)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:147)
at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:60)
at org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:106)
at org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:60)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:962)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

  • One thing was obvious after reading the above stack trace: there was some exception in launching the container for the spark-shell application, application_1436476349975_0011. The application id can be retrieved from the spark-shell console logs above.
  • The next thing was to check yarn logs for this application.
[cloudera@quickstart bin]$ yarn logs -applicationId application_1436476349975_0011
15/10/25 22:44:39 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032


Container: container_1436476349975_0011_01_000001 on quickstart.cloudera_60545
================================================================================
LogType: stderr
LogLength: 1151
Log Contents:
Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/impl/StaticLoggerBinder
at org.apache.spark.Logging$class.initializeLogging(Logging.scala:116)
at org.apache.spark.Logging$class.initializeIfNecessary(Logging.scala:107)
at org.apache.spark.Logging$class.log(Logging.scala:51)
at org.apache.spark.deploy.yarn.ApplicationMaster$.log(ApplicationMaster.scala:495)
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:511)
at org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:536)
at org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
Caused by: java.lang.ClassNotFoundException: org.slf4j.impl.StaticLoggerBinder
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 7 more

LogType: stdout
LogLength: 0
Log Contents:



Container: container_1436476349975_0011_02_000001 on quickstart.cloudera_60545
================================================================================
LogType: stderr
LogLength: 1151
Log Contents:
Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/impl/StaticLoggerBinder
at org.apache.spark.Logging$class.initializeLogging(Logging.scala:116)
at org.apache.spark.Logging$class.initializeIfNecessary(Logging.scala:107)
at org.apache.spark.Logging$class.log(Logging.scala:51)
at org.apache.spark.deploy.yarn.ApplicationMaster$.log(ApplicationMaster.scala:495)
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:511)
at org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:536)
at org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
Caused by: java.lang.ClassNotFoundException: org.slf4j.impl.StaticLoggerBinder
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 7 more

LogType: stdout
LogLength: 0
Log Contents:


  • At first look, it seemed to be a classpath issue, due to which the slf4j lib could not be found on the classpath.
  • Resolution lies in configuration file, /etc/spark/conf/spark-env.sh where there is a minor typographical mistake.
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-etc/hadoop/conf}

In the above statement in spark-env.sh, a leading / is missing in front of etc/hadoop/conf, due to which slf4j is missing from the classpath.

So, I changed it to

export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/etc/hadoop/conf}

  • Now it is running fit and fine.
Happy Hacking !!!

Tuesday 8 December 2015

TRANSPOSE/PIVOT a Table in Hive

Transposing/pivoting a table means converting the values of one column into a set of new columns, with another column supplying the corresponding values for those new columns.
For example, if the original table, "test_sample", is:

id      code    key     value
-------------------------------
1       A       p       e
2       B       q       f
3       B       p       f
3       B       q       h
3       B       r       j
3       C       t       k

the transpose/pivot of the table should be:

id      code    p       q       r       t
------------------------------------------
1       A       e
2       B               f
3       B       f       h       j
3       C                               k


Hive Implementation

1) When value is of string type
If "test_sample" is hive table with following table definiton:
create table test_sample(id string, code string, key string, value string) row format delimited fields terminated by ',' lines terminated by '\n';

hive> set hive.cli.print.header=true;
hive> select * from test_sample;
id      code    key     value
1       A       p       e
2       B       q       f
3       B       p       f
3       B       q       h
3       B       r       j
3       C       t       k

the query to create transpose of it is:

select b.id, b.code, concat_ws('',b.p) as p, concat_ws('',b.q) as q, concat_ws('',b.r) as r, concat_ws('',b.t) as t from
(select id, code,
collect_list(a.group_map['p']) as p,
collect_list(a.group_map['q']) as q,
collect_list(a.group_map['r']) as r,
collect_list(a.group_map['t']) as t
from ( select
id, code,
map(key,value) as group_map
from test_sample
) a group by a.id, a.code) b;

On execution of this query, the output will be:
id      code    p       q       r       t
------------------------------------------
1       A       e
2       B               f
3       B       f       h       j
3       C                               k

which is the expected output.

An important point to note is that it does not use any custom UDFs/UDAFs. It uses only built-in Hive functions, which saves us a lot of hassle.
"concat_ws" and "map" are Hive UDFs and "collect_list" is a Hive UDAF.

Working
  • "map" function creates map of values of two columns as key value pairs.
  • the in outer query, we did group by on dimension columns (id and code) and aggregate all the values of a particular key using "collect_list"
id      code    p       q       r       t
1       A       ["e"]   []      []      []
2       B       []      ["f"]   []      []
3       B       ["f"]   ["h"]   ["j"]   []
3       C       []      []      []      ["k"]

  • Then, in the outermost query, we use the "concat_ws" function to get a single string value out of each array.

2) When value is of numeric (int/float/double) type
If "test_sample" is hive table with following table definiton:
create table test_sample(id string, code string, key string, value int) row format delimited fields terminated by ',' lines terminated by '\n';

hive> set hive.cli.print.header=true;
hive> select * from test_sample;
id      code    key     value
1       A       p       5
2       B       q       6
3       B       p       6
3       B       q       8
3       B       r       10
3       C       t       11

the query to create transpose of it is:

select id, code, sum(a.group_map['p']) as p, sum(a.group_map['q']) as q, sum(a.group_map['r']) as r, sum(a.group_map['t']) as t from
( select id, code,  
map(key,value) as group_map 
from test_sample
) a group by a.id, a.code;


On execution of this query, the output will be:
id      code    p       q       r       t
------------------------------------------
1       A       5       NULL    NULL    NULL
2       B       NULL    6       NULL    NULL
3       B       6       8       10      NULL
3       C       NULL    NULL    NULL    11

which is the expected output.

Wednesday 21 October 2015

Performance Monitoring, Testing and Optimizing Hadoop-MapReduce Job using Hadoop Counters

If you run a Hadoop MapReduce job and check its status page on the JobTracker for its job id, you will see performance counters like the ones below.

File System Counters
                FILE: Number of bytes read=4386096368
                FILE: Number of bytes written=8805370803
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=54583718086
                HDFS: Number of bytes written=4382090874
                HDFS: Number of read operations=1479
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=369
                Launched reduce tasks=1
                Data-local map tasks=369
                Total time spent by all maps in occupied slots (ms)=34288552
                Total time spent by all reduces in occupied slots (ms)=232084
                Total time spent by all map tasks (ms)=8572138
                Total time spent by all reduce tasks (ms)=58021
                Total vcore-seconds taken by all map tasks=8572138
                Total vcore-seconds taken by all reduce tasks=58021
                Total megabyte-seconds taken by all map tasks=35111477248
                Total megabyte-seconds taken by all reduce tasks=237654016
        Map-Reduce Framework
                Map input records=14753874
                Map output records=666776
                Map output bytes=4383426830
                Map output materialized bytes=4386098552
                Input split bytes=47970
                Combine input records=0
                Combine output records=0
                Reduce input groups=1
                Reduce shuffle bytes=4386098552
                Reduce input records=666776
                Reduce output records=666776
                Spilled Records=1333552
                Shuffled Maps =369
                Failed Shuffles=0
                Merged Map outputs=369
                GC time elapsed (ms)=1121584
                CPU time spent (ms)=23707900
                Physical memory (bytes) snapshot=152915259392
                Virtual memory (bytes) snapshot=2370755190784
                Total committed heap usage (bytes)=126644912128
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=49449743227
        File Output Format Counters
                Bytes Written=4382090874
Counters are used to determine if and how often a particular event occurred during a job execution.
A task’s counters are sent in full every time, rather than sending the counts since the last transmission, since this guards against errors due to lost messages. Furthermore, during a job run, counters may go down if a task fails. Counter values are definitive only once a job has successfully completed.

Performance Monitoring: Tracking the counter values provides vital stats about the MapReduce job. In addition to stats on the job and its tasks, it also provides stats on operations done on the underlying filesystems.

Performance Testing: Knowing the expected values of counters provides a first line of performance validation for MapReduce jobs. It helps in ascertaining whether the job is behaving as expected by the ideal MapReduce paradigm. For example, a simple check is that the number of records produced by the mappers should be equal to the total number of records consumed by the reducers.

Performance Optimization: This forms the second line of performance validation for MapReduce jobs. Counter values give hints on what can be done to optimize the job further.
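As a concrete illustration of the kind of check described above, here is a hypothetical sketch of reading the built-in counters after a job completes, using the org.apache.hadoop.mapreduce API (the job name is made up and the job setup is elided):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{Job, TaskCounter}

val job = Job.getInstance(new Configuration(), "counter-demo")   // hypothetical job name
// ... set mapper, reducer, input and output paths here ...
if (job.waitForCompletion(true)) {
  val counters = job.getCounters
  val mapOut   = counters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue
  val reduceIn = counters.findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue
  // With no combiner, these two values are expected to match
  println(s"map output records = $mapOut, reduce input records = $reduceIn")
}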

To describe in detail how the above objectives of performance monitoring, testing and optimization can be achieved using counters, I have listed in tabular format all the MapReduce counters, their descriptions, ideal expected values and optimization hints.
There are 2 categories of counters in Hadoop: in-built (file system, job, framework) and custom.

In-built Counters

Source Code hadoop-mapreduce-client-core/2.6.0/org/apache/hadoop/mapreduce



For each counter below, the following details are listed: counter name, brief description, detailed description, expected value for ideally optimized MapReduce performance, and how to optimize in case of an unexpected value / bad performance.
           Counter Group: FileSystem Counters
           (Source FileName in Hadoop Code = FileSystemCounter.properties)
           (CounterGroupName = File System Counters)
For each filesystem (HDFS, MAPRFS, S3):




·    <FS>_BYTES_READ
Number of bytes read from filesystem, <FS>
The total number of bytes read from a specific filesystem.
No ideal expected value as it depends on volume of data being read.
It could be reduced by using efficient serialization techniques (like Avro, Parquet, etc) and compression algorithms (like LZO, Snappy, bzip2 etc).  
Choices of algorithms purely depend on use case. For example, Parquet is useful only if read pattern for data is limited to some columns.
Also, Tradeoff of time taken to serialize-deserialize and compress-decompress should be taken into consideration.
·    <FS>_BYTES_WRITTEN
Number of bytes written to filesystem, <FS>
The total number of bytes written into a specific filesystem.
No ideal expected value as it depends on volume of data being generated by mapreduce algorithm.
Example, FOR MAPRFS:




·    MAPRFS_BYTES_READ
total number of bytes read from mapr file system.
Off the shelf counter for HDFS_BYTES_READ.


·    MAPRFS_BYTES_WRITTEN
total number of bytes written to mapr file system.
Off the shelf counter for HDFS_BYTES_WRITTEN.


        FILE_BYTES_READ
total number of bytes read from local file system
This counter is incremented for each byte read from the local file system.
These reads occur:
1) during the shuffle phase, when the map phase's intermediate data is read by HTTP worker threads ("tasktracker.http.threads") into reducer tasks.
2) during the reduce phase, when reducers read grouped and sorted data from the local filesystem.
No ideal expected value as it depends on shuffle phase and reducer logic.
It could be reduced by using efficient compression algorithms (like LZO, Snappy, bzip2 etc). Also, Tradeoff of time taken to compress-decompress should be taken into consideration

        FILE_BYTES_WRITTEN
total number of bytes written to local file system
This counter is incremented for each byte written to the local file system.
These writes occur:
1) during the map phase when the mappers write their intermediate results to the local file system.
2) during the shuffle phase when the reducers spill intermediate results to their local disks while sorting.
No ideal expected value as it depends on mapper logic and shuffle phase.
It could be reduced by using efficient compression algorithms (like LZO, Snappy, bzip2 etc). Also, Tradeoff of time taken to compress-decompress should be taken into consideration

READ_OPS
Number of read operations
It is accumulated at the client per file system. It is number of read operations such as listStatus, getFileBlockLocations, open etc.
This is useful in the interim to identify jobs that heavily load HDFS.

LARGE_READ_OPS
Number of large read operations
It is accumulated at the client per file system. On file system, most of the operations are small except listFiles for a large directory. Iterative listFiles was introduced in HDFS to break down a single large operation into smaller steps. This counter is incremented for every iteration of listFiles, when listing files under a large directory.
This is useful in the interim to identify jobs that heavily load HDFS.

WRITE_OPS
Number of write operations
It is accumulated at the client per file system. It is number of write operations such as create, append, setPermission etc.
This is useful in the interim to identify jobs that heavily load HDFS.






Counter Group: Job Counters
(Source FileName in Hadoop Code = JobCounter.properties)
(CounterGroupName= Job Counters)
SLOTS_MILLIS_MAPS
Total time spent by all maps in occupied slots (ms)
This value indicates wall clock time for the map tasks. It equals to FALLOW_SLOTS_MILLIS_MAPS + MILLIS_MAPS
Optimally, should be equal to MILLIS_MAPS
Reduce time spent in FALLOW_SLOTS_MILLIS_MAPS as much as possible
FALLOW_SLOTS_MILLIS_MAPS
Total time spent by all maps waiting after reserving slots (ms)
Indicates how much time map tasks wait in the queue after the slots are reserved but before the map tasks execute. Slot reservation is a capacity scheduler feature for memory-intensive jobs.

Not used by YARN-based mapreduce.
Optimally, should be zero
A high number indicates a possible mismatch between the number of slots configured for a task tracker and how many resources are actually available.

The way to reduce it can be one of the following:
1) Increase the number of nodes in cluster as it will distribute the data and computing capacity further. Overall, load on a particular node will decrease.
2) Reduce the mapper-task capacity of task-tracker nodes with low level of resources.
Reduce the value of  “mapred.tasktracker.map.tasks.maximum” on that task-tracker’s mapred-site.xml 
MILLIS_MAPS
Total time spent by all map tasks (ms)
The total time taken in running all the map tasks in ms, including speculative tasks
No ideal optimal value as it depends on mapper logic.
Optimize the mapper code as much as possible by profiling it for memory leaks and following best coding standards.
SLOTS_MILLIS_REDUCES
Total time spent by all reduces in occupied slots (ms)
This value indicates wall clock time for the reduce tasks. It equals to FALLOW_SLOTS_MILLIS_REDUCES + MILLIS_REDUCES
Optimally, should be equal to MILLIS_REDUCES
Reduce time spent in FALLOW_SLOTS_MILLIS_REDUCES as much as possible
FALLOW_SLOTS_MILLIS_REDUCES
Total time spent by all reduces waiting after reserving slots (ms)
Indicates how much time reduce tasks wait in the queue after the slots are reserved but before the reduce tasks execute. Slot reservation is a capacity scheduler feature for memory-intensive jobs.

Not used by YARN-based mapreduce.
Optimally, should be zero
A high number indicates a possible mismatch between the number of slots configured for a task tracker and how many resources are actually available.

The way to reduce it can be one of the following:
1) Increase the number of nodes in cluster as it will distribute the data and computing capacity further. Overall, load on a particular node will decrease.
2) Reduce the reducer-task capacity of task-tracker nodes with low level of resources.
Reduce the value of  “mapred.tasktracker.reduce.tasks.maximum” on that task-tracker’s mapred-site.xml 
MILLIS_REDUCES
Total time spent by all reduce tasks (ms)
The total time taken in running all the reduce tasks in ms, including speculative tasks
No ideal optimal value as it depends on reducer logic.
Optimize the reducer code as much as possible by profiling it for memory leaks and following best coding standards.
VCORES_MILLIS_MAPS
Total vcore-seconds taken by all map tasks
Vcore stands for virtual core in CPU of a computer.
This counter measures the cpu resources used by all the mappers.
It is the aggregated number of vcores that each mapper had been allocated times the number of seconds that mapper had run.
The number of vcores allocated to a mapper is controlled by property
“mapreduce.map.cpu.vcores” (default 1)
And in yarn, it is controlled by
“yarn.nodemanager.resource.cpu-vcores” (default 8).
In simple language, it is equal to (“mapreduce.map.cpu.vcores” X MILLIS_MAPS)
It gives the value of cpu resources that were blocked by mappers.
Ideally, the value of this counter should be as low as possible.
To optimize this counter, either reduce the number of vcores allocated to each mapper or the time taken to execute a mapper instance.
In other words, the mapper should be optimized to be as resource-light as possible. Then, reduce the value of “mapreduce.map.cpu.vcores” to what fits its requirement.


VCORES_MILLIS_REDUCES
Total vcore-seconds taken by all reduce tasks
Vcore stands for virtual core in CPU of a computer.
This counter measures the cpu resources used by all the reducers.
It is the aggregated number of vcores that each reducer had been allocated times the number of seconds that reducer had run.
The number of vcores allocated to a reducer is controlled by property
“mapreduce.reduce.cpu.vcores” (default 1)
And in yarn, it is controlled by
“yarn.nodemanager.resource.cpu-vcores” (default 8).
In simple language, it is equal to (“mapreduce.reduce.cpu.vcores” X MILLIS_REDUCES)
It gives the value of cpu resources that were blocked by reducers.
Ideally, the value of this counter should be as low as possible.
To optimize this counter, either reduce the number of vcores allocated to each reducer or the time taken to execute a reducer instance.
In other words, the reducer should be optimized to be as resource-light as possible. Then, reduce the value of “mapreduce.reduce.cpu.vcores” to what fits its requirement.

MB_MILLIS_MAPS
Total megabyte-seconds taken by all map tasks
This counter measures the memory resources used by all the mappers.
It is the aggregated amount of memory (in megabytes) that each mapper had been allocated times the number of seconds that mapper had run.
The amount of memory(MB) allocated to a mapper is controlled by property
“mapreduce.map.memory.mb” (default 1024)
And in yarn, it is controlled by
“yarn.nodemanager.resource.memory-mb” (default 8192).
In simple language, it is equal to (“mapreduce.map.memory.mb” X MILLIS_MAPS)
It gives the value of memory resources that were blocked by mappers.
Ideally, the value of this counter should be as low as possible.
To optimize this counter, either reduce the amount of memory (MB) allocated to each mapper or the time taken to execute a mapper instance.
In other words, the mapper should be optimized to be as resource-light as possible. Then, reduce the value of “mapreduce.map.memory.mb” to what fits its requirement.
MB_MILLIS_REDUCES
Total megabyte-seconds taken by all reduce tasks
This counter measures the memory resources used by all the reducers.
It is the aggregated amount of memory (in megabytes) that each reducer had been allocated times the number of seconds that reducer had run.
The amount of memory(MB) allocated to a reducer is controlled by property
“mapreduce.reduce.memory.mb” (default 1024)
And in yarn, it is controlled by
“yarn.nodemanager.resource.memory-mb” (default 8192).
In simple language, it is equal to (“mapreduce.reduce.memory.mb” X MILLIS_REDUCES)
It gives the value of memory resources that were blocked by reducers.
Ideally, the value of this counter should be as low as possible.
To optimize this counter, either reduce the amount of memory (MB) allocated to each reducer or the time taken to execute a reducer instance.
In other words, the reducer should be optimized to be as resource-light as possible. Then, reduce the value of “mapreduce.reduce.memory.mb” to what fits its requirement.
DATA_LOCAL_MAPS
Data-local map tasks
Total number of map tasks run on local data blocks (data locality).
It should be as much near to total maps as possible. Optimally, all the map tasks will execute on local data to exploit locality of reference
The only solution is to ensure maximum data locality.

The way to ensure it can be one of the following:
1) Increase the number of nodes in cluster as it will distribute the data and computing capacity further. Overall, load on a particular node will decrease and probability of map task getting data locally will increase.
2) Increase the mapper-task capacity of task-tracker nodes with a high level of resources.
Increase the value of  “mapred.tasktracker.map.tasks.maximum” on that task-tracker’s mapred-site.xml 
RACK_LOCAL_MAPS

Rack-local map tasks
Total number of map tasks for which data blocks are on same rack as node on which map task is executing.
Ideally should be zero.
OTHER_LOCAL_MAPS
Other-local map tasks
Total number of map tasks for which data blocks are on a different rack than the node on which the map task is executing.
Ideally should be zero.
TOTAL_LAUNCHED_MAPS
Launched map tasks.
It defines how many map tasks were launched for the job, including failed tasks, killed tasks and tasks that were started speculatively.
Optimally, this number is the same as the number of splits for the job.
In case the value does not match the number of input splits, verify the InputFormat used.
TOTAL_LAUNCHED_REDUCES
Launched reduce tasks
It defines how many reduce tasks were launched for the job, including failed tasks, killed tasks and tasks that were started speculatively.
Optimally, should be equal to the number of reducers configured for that mapreduce job (default is 2 reducers)

NUM_FAILED_MAPS
Failed map tasks
The number of map attempts/tasks that failed.
The reason of failure could be runtime exception in code or errors like Out Of Memory, I/O errors, etc.
Ideally should be zero.

NUM_FAILED_REDUCES
Failed reduce tasks
The number of reduce attempts/tasks that failed.
The reason of failure could be runtime exception in code or errors like Out Of Memory, I/O errors, etc.
Ideally should be zero.

NUM_KILLED_MAPS
Killed map tasks
The number of map attempts/tasks that were killed.
The reason of killing could be speculative execution (in which slower task is killed) or failure of any node on which task is running.
Ideally should be zero.
A high number indicates too many tasks getting killed due to speculative execution or node failures.

The way to reduce the instances of speculative execution can be one of the following:
1) Increase the number of nodes in cluster as it will distribute the data and computing capacity further. Overall, load on a particular node will decrease.
2) Remove or minimize the number of nodes with low level of resources from cluster.
NUM_KILLED_REDUCES
Killed reduce tasks
The number of reduce attempts/tasks that were killed.
The reason of killing could be speculative execution (in which slower task is killed) or failure of any node on which task is running.
Ideally should be zero.





Counter Group: Map-Reduce Framework Counters
(Source FileName in Hadoop Code = TaskCounter.properties)
(CounterGroupName= Map-Reduce Framework)
SPLIT_RAW_BYTES
Input split bytes
The total number of bytes of input-split objects read by maps. These objects represent the split metadata (offset and length within a file) rather than the split data itself.
No ideal optimal value

MAP_INPUT_BYTES
Map input bytes
The number of bytes of uncompressed input read by all the maps in the job.
Must be equal to number of bytes in input data

MAP_INPUT_RECORDS
Map input records
The number of input records consumed by all the maps in the job. It is incremented for every successful record read from RecordReader and passed to the mapper’s map method. Records that the map tasks failed to read are not included in these counters.
Must be equal to number of records in input data
In case of non-matching value, check RecordReader logic
MAP_SKIPPED_RECORDS
Map skipped records.
The number of Input records skipped by all the maps in the job.
Skipped records are bad/corrupt records that make the map task attempts fail due to run-time exceptions in third-party libraries used in the mapper. Skipping mode is turned on for a task only after it has failed twice.
For a task consistently failing on a bad record, the tasktracker runs the following task attempts with these outcomes:
1. Task fails.
2. Task fails.
3. Skipping mode is enabled. Task fails but the failed record is stored by the tasktracker.
4. Skipping mode is still enabled. Task succeeds by skipping the bad record that failed in the previous attempt.
It can detect only one bad record per task attempt, so this mechanism is appropriate only for detecting occasional bad records.
To give skipping mode enough attempts to detect and skip all the bad records in an input split, increase the value of “mapred.map.max.attempts”. Bad records are saved as sequence files in the job’s output directory under the _logs/skip subdirectory.
Ideally, should be 0

MAP_OUTPUT_RECORDS
Map output records
The number of map output records produced by all the maps in the job. It is incremented for every successful record written by the mappers. Records that the map tasks failed to write are not included in these counters.
No ideal optimal value as it depends on mapper logic.

MAP_OUTPUT_BYTES
Map output bytes
The number of bytes of uncompressed output produced by all the maps in the job. Incremented every time collect() method is called on a map’s Output Collector.
No ideal optimal value as it depends on mapper logic.
To optimize this counter, use Hadoop Serialization or Avro to serialize the Mapper output.
MAP_OUTPUT_MATERIALIZED_BYTES
Map output materialized bytes
The number of bytes of map output actually written to disk.
This is visible and relevant only if map output compression is enabled. In that case, it is the number of bytes of compressed output produced by all the maps in the job.
No ideal optimal value as it depends on mapper logic.

COMBINE_INPUT_RECORDS
Combine input records
It indicates the number of records that were read by the optional combiner.
Incremented every time a value is read from the combiner’s iterator over values
If you don’t specify a combiner, these counters should be 0. Otherwise, should be equal to MAP_OUTPUT_RECORDS
In case of unexpected value, check combiner logic
COMBINE_OUTPUT_RECORDS
Combine output records
It indicates the number of records that were written by the optional combiner.
Incremented every time the collect() method is called on a combiner’s OutputCollector.
If you don’t specify a combiner, these counters should be 0. Otherwise, should be equal to REDUCE_INPUT_RECORDS
In case of unexpected value, check combiner logic
       REDUCE_INPUT_RECORDS
Reduce input records
It indicates how many records were successfully read by the reducers.
Incremented every time a value is read from the reducer’s iterator over values.
With no combiner, this counter should be equal to MAP_OUTPUT_RECORDS; with a combiner, it should be equal to COMBINE_OUTPUT_RECORDS.

REDUCE_SKIPPED_RECORDS
Reduce skipped records
The number of Input records skipped by all the reducers in the job.
Skipped records are bad/corrupt records that make the reduce task attempts fail due to run-time exceptions in third-party libraries used in the reducer. Skipping mode is turned on for a task only after it has failed twice.
For a task consistently failing on a bad record, the tasktracker runs the following task attempts with these outcomes:
1. Task fails.
2. Task fails.
3. Skipping mode is enabled. Task fails but the failed record is stored by the tasktracker.
4. Skipping mode is still enabled. Task succeeds by skipping the bad record that failed in the previous attempt.
It can detect only one bad record per task attempt, so this mechanism is appropriate only for detecting occasional bad records.
To give skipping mode enough attempts to detect and skip all the bad records in an input split, increase the value of “mapred.reduce.max.attempts”. Bad records are saved as sequence files in the job’s output directory under the _logs/skip subdirectory.
Ideally should be zero.

REDUCE_INPUT_GROUPS
Reduce input groups
The total number of distinct key groups consumed by all the reducers in the job. It is incremented for every unique key that the reducers process, i.e., every time the reducer’s reduce() method is called by the framework.
This value should be equal to the total number of different keys in the intermediate results from the mappers.

REDUCE_SKIPPED_GROUPS
Reduce skipped groups
The number of distinct key groups skipped by all the reducers in the job.
 This value should be zero

REDUCE_OUTPUT_RECORDS
Reduce output records
It indicates how many records were successfully written by all the reducers in the job.
Incremented every time the collect() method is called on a reducer’s OutputCollector.
Ideally, ratio of (REDUCE_OUTPUT_RECORDS/ MAP_INPUT_RECORDS) should be less than 1 and as much closer to 0 as possible.
Although even if ratio is > 1, it is not invalid. It only violates the basic assumption of mapreduce paradigm that reduce-phase aggregates the data considerably.
REDUCE_SHUFFLE_BYTES
Reduce shuffle bytes
It indicates how many bytes of the map output were copied by the shuffle to the reducers.  
Should be as low as possible.
Higher numbers here will make the job go slower as the shuffle process is the primary network consumer in the MapReduce job.
Compress and Serialize the Mapper Output.
It could be reduced by using efficient serialization techniques (like Avro, Parquet, etc) and compression algorithms (like LZO, Snappy, bzip2 etc). 
Choices of algorithms purely depend on use case. For example, Parquet is useful only if read pattern for data is limited to some columns.
Also, Tradeoff of time taken to serialize-deserialize and compress-decompress should be taken into consideration.
SPILLED_RECORDS
Spilled Records
It indicates how much data the map and reduce tasks wrote (spilled) to disk when processing the records.
Must be equal to sum of REDUCE_INPUT_RECORDS and MAP_OUTPUT_RECORDS. In case, combiner is also invoked, It must be equal to sum of REDUCE_INPUT_RECORDS and COMBINE_OUTPUT_RECORDS.
Should be as low as possible, since spilling involves costly disk I/O.

SHUFFLED_MAPS
Shuffled Maps
The number of map output files transferred to reducers by shuffle.
In the shuffle phase, each mapper’s output is partitioned and sorted into files, one for each reducer. There might be scenarios where keys are not uniformly distributed across mappers and some mappers don’t generate partitions for all reducers.
Must be less than or equal to product of number of successful mappers and successful reducers.
Mathematically, must be less than or equal to (TOTAL_LAUNCHED_MAPS – NUM_KILLED_MAPS – NUM_FAILED_MAPS) * (TOTAL_LAUNCHED_REDUCES – NUM_KILLED_ REDUCES – NUM_FAILED_ REDUCES)

FAILED_SHUFFLE
Failed Shuffles
The number of map output copy failures during the shuffle.
Should be zero.

MERGED_MAP_OUTPUTS
Merged Map outputs
The number of map outputs that have been merged on the reduce side of the shuffle.
Must be equal to the value of SHUFFLED_MAPS

CPU_MILLISECONDS
CPU time spent (ms)
Total time spent by all tasks on CPU. It is gathered from /proc/cpuinfo and indicate how much total time was spent executing map and reduce tasks for a job.
Should be as low as possible.


GC_TIME_MILLIS
GC time elapsed (ms)
The total time spent in milliseconds doing garbage collection. The garbage collection counter is reported from GarbageCollectorMXBean.getCollectionTime().
The Garbage Collection happens in Java to collect unused objects (objects that are no longer referenced).

If garbage collection is frequent and represents a lot of time, you may be allocating unnecessary objects.
It should be as close to 0 as possible.
If GC_TIME amounts to a considerable proportion of the task run time, the following steps can help:
1) Add -verbose:gc -XX:+PrintGCDetails to “mapred.child.java.opts”. Then inspect the logs for some tasks (mappers or reducers). If new objects are being created (like “new Text” or “new IntWritable”) unnecessarily or inside a loop or inside a function, try to optimize it. The Hadoop Writable Objects are mutable.
2) If too much object creation is necessity, try to increase heap-size allocation to task attempt jvm by adding –Xmx parameter to “mapred.map.child.java.opts” for map tasks and “mapred.reduce.child.java.opts” for reduce tasks. For example, to set committed heap usage to 4GB for map tasks, set “–Xmx4096m” to “mapred.map.child.java.opts”.    
VIRTUAL_MEMORY_BYTES
Virtual memory (bytes) snapshot
Total number of Virtual Memory Bytes (RAM+SWAP) consumed by all the tasks. It shows how much physical memory plus swap space is consumed by all tasks in a job.
(VIRTUAL_MEMORY_BYTES - PHYSICAL_MEMORY_BYTES) indicates how much swap space or paged memory is used by a job, which should be as close to 0 as possible.
The main reason for a high value of swap space is memory committed to current tasks on a node far exceeding the available memory on that node. The cause could be other system processes or applications consuming memory. This results in paging of memory, which shows up as a high value of swap space usage.

The way to ensure low value of swap memory can be one of the following:
1) Increase the number of nodes in cluster as it will distribute the data and computing capacity further. Overall, load on a particular node will decrease.
2) Decrease the mapper-task capacity and reducer-task capacity of task-tracker nodes having low level of resources.
Decrease the value of  “mapred.tasktracker.map.tasks.maximum” and “mapred.tasktracker.reduce.tasks.maximum” on that task-tracker’s mapred-site.xml 
PHYSICAL_MEMORY_BYTES
Physical memory (bytes) snapshot
Total Physical Memory consumed by all the tasks. These statistics are gathered from /proc/meminfo and indicate how much RAM (not including swap space) was consumed by all the tasks.
COMMITTED_HEAP_BYTES
Total committed heap usage (bytes)
The total amount of memory available in the JVM in bytes, as reported by Runtime.getRuntime().totalMemory()
Should be close to total task requirement



References: