Background
We need to collect counter information for every SQL statement Hive runs. The MapReduce framework defines the following counters:
```java
// Counters used by Task subclasses
public static enum Counter {
  MAP_INPUT_RECORDS,
  MAP_OUTPUT_RECORDS,
  MAP_SKIPPED_RECORDS,
  MAP_INPUT_BYTES,
  MAP_OUTPUT_BYTES,
  COMBINE_INPUT_RECORDS,
  COMBINE_OUTPUT_RECORDS,
  REDUCE_INPUT_GROUPS,
  REDUCE_SHUFFLE_BYTES,
  REDUCE_INPUT_RECORDS,
  REDUCE_OUTPUT_RECORDS,
  REDUCE_SKIPPED_GROUPS,
  REDUCE_SKIPPED_RECORDS,
  SPILLED_RECORDS,
  SPLIT_RAW_BYTES,
  CPU_MILLISECONDS,
  PHYSICAL_MEMORY_BYTES,
  VIRTUAL_MEMORY_BYTES,
  COMMITTED_HEAP_BYTES
}
```
As the list shows, the counters fall into two main categories:
- I/O statistics from the MapReduce framework itself, such as record and byte counts
- performance metrics of the host at run time, such as CPU time and memory usage
Obtaining the Counter Information
- Use the bundled Hadoop Rumen tool to parse the job history; the exact command is:
```shell
hadoop jar \
  /opt/cloudera/parcels/CDH-5.11.2-1.cdh5.11.2.p0.4/jars/hadoop-rumen-2.6.0-cdh5.11.2.jar \
  org.apache.hadoop.tools.rumen.TraceBuilder \
  file:///tmp/job-trace.json \
  file:///tmp/job-topology.json \
  hdfs:///user/history/done/2018/06/06/000000
```
- The generated job-trace.json then contains the details of every job run that day:
```json
{
  "jobID" : "job_1528373726326_0204",
  "queue" : "default",
  "user" : "hive",
  "jobName" : "INSERT OVERWRITE TABL...st_day('2018-05-16')(Stage-1)",
  "submitTime" : 1528781559636,
  "finishTime" : 1528781571551,
  "mapTasks" : [ {
    "startTime" : 1528781565131,
    "taskID" : "task_1528373726326_0204_m_000000",
    "taskType" : "MAP",
    "finishTime" : 1528781571514,
    "attempts" : [ {
      "startTime" : 1528781567259,
      "finishTime" : 1528781571514,
      "attemptID" : "attempt_1528373726326_0204_m_000000_0",
      "clockSplits" : [ 4201, 5, 4, 5, 4, 5, 5, 4, 5, 4, 5, 5 ],
      "cpuUsages" : [ 170, 171, 171, 171, 171, 171, 170, 171, 171, 171, 171, 171 ],
      "vmemKbytes" : [ 116591, 349773, 582955, 816136, 1049319, 1282500, 1515683, 1748864, 1982047, 2215229, 2448410, 2681593 ],
      "physMemKbytes" : [ 17301, 51903, 86505, 121107, 155710, 190312, 224915, 259516, 294119, 328722, 363323, 397926 ],
      "shuffleFinished" : -1,
      "sortFinished" : -1,
      "hdfsBytesRead" : 7795,
      "hdfsBytesWritten" : 2,
      "fileBytesRead" : 0,
      "fileBytesWritten" : 255682,
      "mapInputRecords" : 0,
      "mapOutputBytes" : -1,
      "mapOutputRecords" : 0,
      "combineInputRecords" : -1,
      "reduceInputGroups" : -1,
      "reduceInputRecords" : -1,
      "reduceShuffleBytes" : -1,
      "reduceOutputRecords" : -1,
      "spilledRecords" : 0,
      "mapInputBytes" : -1,
      "resourceUsageMetrics" : {
        "heapUsage" : 623378432,
        "virtualMemoryUsage" : 2865340416,
        "physicalMemoryUsage" : 425193472,
        "cumulativeCpuUsage" : 2050
      },
      ……
```
Interpreting the Counter Information
Resource Usage Metrics
In general, the metrics under resourceUsageMetrics reflect the resource usage of a single task attempt:
```json
"resourceUsageMetrics" : {
  "heapUsage" : 623378432,
  "virtualMemoryUsage" : 2865340416,
  "physicalMemoryUsage" : 425193472,
  "cumulativeCpuUsage" : 2050
},
```
The update logic lives in the Task class:
```java
/**
 * Update resource information counters
 */
void updateResourceCounters() {
  // Update generic resource counters
  updateHeapUsageCounter();

  // Updating resources specified in ResourceCalculatorProcessTree
  if (pTree == null) {
    return;
  }
  pTree.updateProcessTree();
  long cpuTime = pTree.getCumulativeCpuTime();
  long pMem = pTree.getCumulativeRssmem();
  long vMem = pTree.getCumulativeVmem();
  // Remove the CPU time consumed previously by JVM reuse
  cpuTime -= initCpuCumulativeTime;
  counters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime);
  counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
  counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
}
```
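The subtraction of initCpuCumulativeTime matters when JVM reuse is enabled: the process-tree CPU clock keeps counting across all tasks sharing one JVM, so each task records the reading taken at its start and subtracts it. A minimal sketch of that bookkeeping (class name and all numbers are made up for illustration):

```java
public class CpuCounterSketch {
    // CPU time attributable to this task = current cumulative reading minus
    // whatever the reused JVM had already consumed before the task started.
    static long taskCpuMillis(long cumulativeCpuNow, long initCpuCumulativeTime) {
        return cumulativeCpuNow - initCpuCumulativeTime;
    }

    public static void main(String[] args) {
        long initCpuCumulativeTime = 5000; // CPU ms consumed before this task (made-up)
        long cumulativeCpuNow = 7050;      // current process-tree reading (made-up)
        System.out.println(taskCpuMillis(cumulativeCpuNow, initCpuCumulativeTime)); // 2050
    }
}
```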
Progress Split Counter
One group of counters in job-trace.json, however, looks odd at first glance:
```json
"clockSplits" : [ 4201, 5, 4, 5, 4, 5, 5, 4, 5, 4, 5, 5 ],
"cpuUsages" : [ 170, 171, 171, 171, 171, 171, 170, 171, 171, 171, 171, 171 ],
"vmemKbytes" : [ 116591, 349773, 582955, 816136, 1049319, 1282500, 1515683, 1748864, 1982047, 2215229, 2448410, 2681593 ],
"physMemKbytes" : [ 17301, 51903, 86505, 121107, 155710, 190312, 224915, 259516, 294119, 328722, 363323, 397926 ]
```
On the surface these are just four arrays of size 12. In fact they are periodic samples: at intervals during task execution, the current value of each performance metric is recorded. The core class behind them is ProgressSplitsBlock.
ProgressSplitsBlock
```java
ProgressSplitsBlock(int numberSplits) {
  progressWallclockTime = new CumulativePeriodicStats(numberSplits);
  progressCPUTime = new CumulativePeriodicStats(numberSplits);
  progressVirtualMemoryKbytes = new StatePeriodicStats(numberSplits);
  progressPhysicalMemoryKbytes = new StatePeriodicStats(numberSplits);
}
```
ProgressSplitsBlock holds four sets of statistics: wall-clock time since task start, CPU time, virtual memory usage, and physical memory usage. CumulativePeriodicStats and StatePeriodicStats differ slightly.
- CumulativePeriodicStats tracks an accumulating quantity: the values in the array sum to the total. The Javadoc's intuition:

  > An easy-to-understand example of this kind of quantity would
  > be a distance traveled. It makes sense to consider that
  > portion of the total travel that can be apportioned to each
  > bucket.

  For the cpuUsages array above: 170 + 171 + 171 + 171 + 171 + 171 + 170 + 171 + 171 + 171 + 171 + 171 = 2050, which matches cumulativeCpuUsage.
- StatePeriodicStats tracks a level rather than an accumulation: each array entry is the mean of the metric over its progress interval. Again from the Javadoc:

  > An easy-to-understand example of this kind of quantity would
  > be a temperature. It makes sense to consider the mean
  > temperature over a progress range.
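The distinction can be checked directly against the arrays in job-trace.json: summing a CumulativePeriodicStats array reproduces the cumulative counter, while a StatePeriodicStats array only supports level-style aggregation such as a mean. A small sketch (the class and method names here are ours, not Hadoop's):

```java
import java.util.Arrays;

public class SplitStatsSketch {
    // Cumulative quantity (e.g. CPU time): each bucket holds a delta,
    // so the buckets sum to the total.
    static long sum(int[] buckets) {
        return Arrays.stream(buckets).asLongStream().sum();
    }

    // State quantity (e.g. memory): each bucket holds a level, so summing is
    // meaningless; the mean of the buckets approximates the average level.
    static double mean(int[] buckets) {
        return Arrays.stream(buckets).average().orElse(0.0);
    }

    public static void main(String[] args) {
        int[] cpuUsages = {170, 171, 171, 171, 171, 171,
                           170, 171, 171, 171, 171, 171};
        System.out.println(sum(cpuUsages)); // 2050, matches cumulativeCpuUsage

        int[] physMemKbytes = {17301, 51903, 86505, 121107, 155710, 190312,
                               224915, 259516, 294119, 328722, 363323, 397926};
        System.out.println(mean(physMemKbytes)); // average resident-set level in KB
    }
}
```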
ProgressSplitsBlock instances are created and updated in TaskInProgress.
TaskInProgress
- Creating a ProgressSplitsBlock:
```java
synchronized ProgressSplitsBlock getSplits(TaskAttemptID statusAttemptID) {
  ProgressSplitsBlock result = splitsBlocks.get(statusAttemptID);
  if (result == null) {
    result = new ProgressSplitsBlock(
        conf.getInt(JTConfig.JT_JOBHISTORY_TASKPROGRESS_NUMBER_SPLITS,
            ProgressSplitsBlock.DEFAULT_NUMBER_PROGRESS_SPLITS));
    splitsBlocks.put(statusAttemptID, result);
  }
  return result;
}
```
DEFAULT_NUMBER_PROGRESS_SPLITS is 12, which is why every array in the JSON has 12 entries.
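Put differently, each bucket covers a 1/12-wide slice of task progress, and a progress value in [0, 1] maps to a bucket index roughly like this (a simplified model of the indexing, not the exact Hadoop code):

```java
public class ProgressBuckets {
    static final int DEFAULT_NUMBER_PROGRESS_SPLITS = 12;

    // Which of the numberSplits equal-width progress buckets a progress
    // value in [0, 1] falls into.
    static int bucketOf(double progress, int numberSplits) {
        int idx = (int) (progress * numberSplits);
        // progress == 1.0 would index one past the end; clamp to the last bucket
        return Math.min(idx, numberSplits - 1);
    }

    public static void main(String[] args) {
        System.out.println(bucketOf(0.0, DEFAULT_NUMBER_PROGRESS_SPLITS)); // 0
        System.out.println(bucketOf(0.5, DEFAULT_NUMBER_PROGRESS_SPLITS)); // 6
        System.out.println(bucketOf(1.0, DEFAULT_NUMBER_PROGRESS_SPLITS)); // 11
    }
}
```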
- Updating a ProgressSplitsBlock:
```java
Counters.Counter cpuCounter = counters.findCounter(CPU_COUNTER_KEY);
if (cpuCounter != null
    && cpuCounter.getCounter() <= Integer.MAX_VALUE) {
  splitsBlock.progressCPUTime.extend(newProgress,
      (int) (cpuCounter.getCounter()));
}
```
The extend method contains one special case: if the task's progress jumps a long way between two updates, say from 30% directly to 90%, the buckets in between still need values. These are filled by interpolation, so the intermediate entries are smoothed estimates rather than actual measurements.
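A simplified model of that fill-in behavior for a cumulative stat: when progress jumps across several buckets, the accumulated delta is spread over every bucket the jump crosses. (The real CumulativePeriodicStats.extend weights the partially covered end buckets; here we split evenly for brevity, and the class name is ours.)

```java
public class ExtendSketch {
    // Apportion the delta accumulated between oldProgress and newProgress
    // across all buckets the jump crosses. The intermediate buckets therefore
    // receive interpolated values, not real measurements.
    static void extend(int[] buckets, double oldProgress, double newProgress,
                       int oldValue, int newValue) {
        int n = buckets.length;
        int from = Math.min((int) (oldProgress * n), n - 1);
        int to = Math.min((int) (newProgress * n), n - 1);
        int delta = newValue - oldValue;
        int covered = to - from + 1;
        for (int i = from; i <= to; i++) {
            buckets[i] += delta / covered; // even split (real code weights end buckets)
        }
    }

    public static void main(String[] args) {
        int[] cpu = new int[12];
        // Progress jumps from 30% straight to 90%: 600 ms of CPU time is
        // smeared evenly over buckets 3..10.
        extend(cpu, 0.3, 0.9, 0, 600);
        System.out.println(java.util.Arrays.toString(cpu));
        // [0, 0, 0, 75, 75, 75, 75, 75, 75, 75, 75, 0]
    }
}
```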