Background
We need to collect per-SQL counter statistics for Hive. The MapReduce framework exposes the following counters:
// Counters used by Task subclasses
public static enum Counter {
MAP_INPUT_RECORDS,
MAP_OUTPUT_RECORDS,
MAP_SKIPPED_RECORDS,
MAP_INPUT_BYTES,
MAP_OUTPUT_BYTES,
COMBINE_INPUT_RECORDS,
COMBINE_OUTPUT_RECORDS,
REDUCE_INPUT_GROUPS,
REDUCE_SHUFFLE_BYTES,
REDUCE_INPUT_RECORDS,
REDUCE_OUTPUT_RECORDS,
REDUCE_SKIPPED_GROUPS,
REDUCE_SKIPPED_RECORDS,
SPILLED_RECORDS,
SPLIT_RAW_BYTES,
CPU_MILLISECONDS,
PHYSICAL_MEMORY_BYTES,
VIRTUAL_MEMORY_BYTES,
COMMITTED_HEAP_BYTES
}
As shown above, the counters fall into two categories:
- I/O statistics from the MapReduce framework itself, such as record counts and byte counts
- Runtime performance metrics of the host, such as CPU time and memory usage
Obtaining the Counter Information
- Use Hadoop's bundled rumen project to parse the job history files, with the following command:
hadoop jar \
/opt/cloudera/parcels/CDH-5.11.2-1.cdh5.11.2.p0.4/jars/hadoop-rumen-2.6.0-cdh5.11.2.jar \
org.apache.hadoop.tools.rumen.TraceBuilder \
file:///tmp/job-trace.json \
file:///tmp/job-topology.json \
hdfs:///user/history/done/2018/06/06/000000
- The generated job-trace.json then contains the details of every job from that day:
{
"jobID" : "job_1528373726326_0204",
"queue" : "default",
"user" : "hive",
"jobName" : "INSERT OVERWRITE TABL...st_day('2018-05-16')(Stage-1)",
"submitTime" : 1528781559636,
"finishTime" : 1528781571551,
"mapTasks" : [ {
"startTime" : 1528781565131,
"taskID" : "task_1528373726326_0204_m_000000",
"taskType" : "MAP",
"finishTime" : 1528781571514,
"attempts" : [ {
"startTime" : 1528781567259,
"finishTime" : 1528781571514,
"attemptID" : "attempt_1528373726326_0204_m_000000_0",
"clockSplits" : [ 4201, 5, 4, 5, 4, 5, 5, 4, 5, 4, 5, 5 ],
"cpuUsages" : [ 170, 171, 171, 171, 171, 171, 170, 171, 171, 171, 171, 171 ],
"vmemKbytes" : [ 116591, 349773, 582955, 816136, 1049319, 1282500, 1515683, 1748864, 1982047, 2215229, 2448410, 2681593 ],
"physMemKbytes" : [ 17301, 51903, 86505, 121107, 155710, 190312, 224915, 259516, 294119, 328722, 363323, 397926 ],
"shuffleFinished" : -1,
"sortFinished" : -1,
"hdfsBytesRead" : 7795,
"hdfsBytesWritten" : 2,
"fileBytesRead" : 0,
"fileBytesWritten" : 255682,
"mapInputRecords" : 0,
"mapOutputBytes" : -1,
"mapOutputRecords" : 0,
"combineInputRecords" : -1,
"reduceInputGroups" : -1,
"reduceInputRecords" : -1,
"reduceShuffleBytes" : -1,
"reduceOutputRecords" : -1,
"spilledRecords" : 0,
"mapInputBytes" : -1,
"resourceUsageMetrics" : {
"heapUsage" : 623378432,
"virtualMemoryUsage" : 2865340416,
"physicalMemoryUsage" : 425193472,
"cumulativeCpuUsage" : 2050
},
……
……
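For a quick sanity check, the trace file can be scanned for job IDs without pulling in a JSON library. A minimal sketch using only the JDK (the class name and the regex-based approach are my own, not part of rumen):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TraceScan {
    // Matches lines like:  "jobID" : "job_1528373726326_0204"
    static final Pattern JOB_ID =
        Pattern.compile("\"jobID\"\\s*:\\s*\"(job_[0-9_]+)\"");

    // Collect every jobID value found in the trace text
    static List<String> extractJobIds(String traceText) {
        List<String> ids = new ArrayList<>();
        Matcher m = JOB_ID.matcher(traceText);
        while (m.find()) {
            ids.add(m.group(1));
        }
        return ids;
    }

    public static void main(String[] args) {
        String sample = "{ \"jobID\" : \"job_1528373726326_0204\", \"queue\" : \"default\" }";
        System.out.println(extractJobIds(sample)); // prints [job_1528373726326_0204]
    }
}
```

For anything beyond a quick scan, a real JSON parser is the safer choice, since the trace is nested and multi-line.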
Interpreting the Counter Information
Resource Usage Metrics
In general, the metrics in resourceUsageMetrics reflect the resource usage of a single task attempt:
"resourceUsageMetrics" : {
"heapUsage" : 623378432,
"virtualMemoryUsage" : 2865340416,
"physicalMemoryUsage" : 425193472,
"cumulativeCpuUsage" : 2050
},
The update logic lives in the Task class:
/**
* Update resource information counters
*/
void updateResourceCounters() {
// Update generic resource counters
updateHeapUsageCounter();
// Updating resources specified in ResourceCalculatorProcessTree
if (pTree == null) {
return;
}
pTree.updateProcessTree();
long cpuTime = pTree.getCumulativeCpuTime();
long pMem = pTree.getCumulativeRssmem();
long vMem = pTree.getCumulativeVmem();
// Remove the CPU time consumed previously by JVM reuse
cpuTime -= initCpuCumulativeTime;
counters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime);
counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
}
Progress Split Counter
However, one group of counters in job-trace.json looks puzzling at first:
"clockSplits" : [ 4201, 5, 4, 5, 4, 5, 5, 4, 5, 4, 5, 5 ],
"cpuUsages" : [ 170, 171, 171, 171, 171, 171, 170, 171, 171, 171, 171, 171 ],
"vmemKbytes" : [ 116591, 349773, 582955, 816136, 1049319, 1282500, 1515683, 1748864, 1982047, 2215229, 2448410, 2681593 ],
"physMemKbytes" : [ 17301, 51903, 86505, 121107, 155710, 190312, 224915, 259516, 294119, 328722, 363323, 397926 ]
On the surface these are just four arrays of size 12. They are in fact periodic samples: while the task executes, the current value of each performance metric is recorded at regular intervals.
The core class here is ProgressSplitsBlock:
ProgressSplitsBlock
ProgressSplitsBlock(int numberSplits) {
progressWallclockTime
= new CumulativePeriodicStats(numberSplits);
progressCPUTime
= new CumulativePeriodicStats(numberSplits);
progressVirtualMemoryKbytes
= new StatePeriodicStats(numberSplits);
progressPhysicalMemoryKbytes
= new StatePeriodicStats(numberSplits);
}
ProgressSplitsBlock holds four sets of statistics: wall-clock time since the task started, CPU time, virtual memory usage, and physical memory usage. CumulativePeriodicStats and StatePeriodicStats differ slightly.
- CumulativePeriodicStats tracks an additive quantity: summing the values in the array yields the cumulative total.
An easy-to-understand example of this kind of quantity would
be a distance traveled. It makes sense to consider that
portion of the total travel that can be apportioned to each
bucket.
170+171+171+171+171+171+170+171+171+171+171+171 = 2050, which matches the cumulativeCpuUsage value above.
- StatePeriodicStats tracks a level quantity; each entry in the array is the average value of the metric over its progress interval.
An easy-to-understand example of this kind of quantity would
be a temperature. It makes sense to consider the mean
temperature over a progress range.
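The distinction can be checked directly against the numbers above: a CumulativePeriodicStats array is summed to recover the cumulative counter, while a StatePeriodicStats array should be averaged instead. A small sketch (the class and method names here are illustrative, not Hadoop's):

```java
import java.util.Arrays;

public class PeriodicStatsDemo {
    // Sum a cumulative series (e.g. cpuUsages) to recover the total
    static long sumCumulative(int[] buckets) {
        return Arrays.stream(buckets).asLongStream().sum();
    }

    // Average a state series (e.g. physMemKbytes) to get the mean level
    static double meanState(int[] buckets) {
        return Arrays.stream(buckets).average().orElse(0.0);
    }

    public static void main(String[] args) {
        int[] cpuUsages = {170, 171, 171, 171, 171, 171, 170, 171, 171, 171, 171, 171};
        System.out.println(sumCumulative(cpuUsages)); // 2050, matches cumulativeCpuUsage
    }
}
```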
The ProgressSplitsBlock instances are created and updated in TaskInProgress.
TaskInProgress
- Creating the ProgressSplitsBlock
synchronized ProgressSplitsBlock getSplits(TaskAttemptID statusAttemptID) {
ProgressSplitsBlock result = splitsBlocks.get(statusAttemptID);
if (result == null) {
result
= new ProgressSplitsBlock
(conf.getInt(JTConfig.JT_JOBHISTORY_TASKPROGRESS_NUMBER_SPLITS,
ProgressSplitsBlock.DEFAULT_NUMBER_PROGRESS_SPLITS));
splitsBlocks.put(statusAttemptID, result);
}
return result;
}
DEFAULT_NUMBER_PROGRESS_SPLITS is 12, which is why each array in the JSON has 12 entries.
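Conceptually, the attempt's progress range [0, 1] is divided into 12 equal buckets, and a sample taken at progress p lands in bucket floor(p * 12), capped at the last bucket. A minimal sketch of that mapping (my own simplification, not Hadoop's actual code):

```java
public class BucketDemo {
    static final int NUMBER_SPLITS = 12; // DEFAULT_NUMBER_PROGRESS_SPLITS

    // Map a progress fraction in [0.0, 1.0] to one of the 12 buckets
    static int bucketFor(double progress) {
        int idx = (int) (progress * NUMBER_SPLITS);
        return Math.min(idx, NUMBER_SPLITS - 1); // progress == 1.0 falls in the last bucket
    }

    public static void main(String[] args) {
        System.out.println(bucketFor(0.0));  // 0
        System.out.println(bucketFor(0.30)); // 3
        System.out.println(bucketFor(1.0));  // 11
    }
}
```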
- Updating the ProgressSplitsBlock
Counters.Counter cpuCounter = counters.findCounter(CPU_COUNTER_KEY);
if (cpuCounter != null && cpuCounter.getCounter() <= Integer.MAX_VALUE) {
splitsBlock.progressCPUTime.extend
(newProgress, (int)(cpuCounter.getCounter()));
}
The extend method has one special case: if the task's progress jumps a long way between two updates, say from 30% straight to 90%, the buckets in between still need values to be filled in. Those intermediate entries are therefore not actual measurements but smoothed, interpolated values.
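The filling behavior for a cumulative quantity can be modeled as spreading the counter's increase evenly over all buckets crossed since the previous update. The sketch below is a simplified model of that idea, not the real CumulativePeriodicStats.extend implementation:

```java
import java.util.Arrays;

public class ExtendDemo {
    static final int NUMBER_SPLITS = 12;
    int[] buckets = new int[NUMBER_SPLITS];
    double lastProgress = 0.0;
    int lastTotal = 0;

    // Spread the increase in a cumulative counter evenly over the
    // progress buckets crossed since the previous update.
    void extend(double newProgress, int newTotal) {
        int delta = newTotal - lastTotal;
        int from = (int) (lastProgress * NUMBER_SPLITS);
        int to = Math.min((int) (newProgress * NUMBER_SPLITS), NUMBER_SPLITS - 1);
        int span = to - from + 1;
        for (int i = from; i <= to; i++) {
            buckets[i] += delta / span; // interpolated, not measured
        }
        lastProgress = newProgress;
        lastTotal = newTotal;
    }

    public static void main(String[] args) {
        ExtendDemo d = new ExtendDemo();
        d.extend(0.30, 300); // normal update: buckets 0..3 get a share
        d.extend(0.90, 900); // progress jumps 30% -> 90%: buckets 3..10 are filled smoothly
        System.out.println(Arrays.toString(d.buckets));
    }
}
```

In other words, after a large progress jump the middle buckets hold plausible averages rather than real samples, which is worth remembering when reading the clockSplits and cpuUsages arrays.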