JobHistory中的resource usage分析

背景

需要统计hive中每个sql的counter信息,MapReduce的框架中一共有以下counter信息

  // Counters used by Task subclasses
  public static enum Counter { 
    MAP_INPUT_RECORDS, 
    MAP_OUTPUT_RECORDS,
    MAP_SKIPPED_RECORDS,
    MAP_INPUT_BYTES, 
    MAP_OUTPUT_BYTES,
    COMBINE_INPUT_RECORDS,
    COMBINE_OUTPUT_RECORDS,
    REDUCE_INPUT_GROUPS,
    REDUCE_SHUFFLE_BYTES,
    REDUCE_INPUT_RECORDS,
    REDUCE_OUTPUT_RECORDS,
    REDUCE_SKIPPED_GROUPS,
    REDUCE_SKIPPED_RECORDS,
    SPILLED_RECORDS,
    SPLIT_RAW_BYTES,
    CPU_MILLISECONDS,
    PHYSICAL_MEMORY_BYTES,
    VIRTUAL_MEMORY_BYTES,
    COMMITTED_HEAP_BYTES
  }

可以看出,counter信息主要有两类:

  • 一类是MapReduce框架中IO方面的一些统计,比如记录数、字节数等等
  • 另一类是运行时宿主机的性能指标,比如CPU时间、内存使用等等

Counter信息获取

  • 使用自带的hadoop rumen项目对job history进行解析,具体命令如下:
 hadoop jar \
  /opt/cloudera/parcels/CDH-5.11.2-1.cdh5.11.2.p0.4/jars/hadoop-rumen-2.6.0-cdh5.11.2.jar \
  org.apache.hadoop.tools.rumen.TraceBuilder \
  file:///tmp/job-trace.json \
  file:///tmp/job-topology.json \
  hdfs:///user/history/done/2018/06/06/000000

  • 这样在生成的job-trace.json中就可以查看当天的所有job的具体信息
 {
  "jobID" : "job_1528373726326_0204",
  "queue" : "default",
  "user" : "hive",
  "jobName" : "INSERT OVERWRITE TABL...st_day('2018-05-16')(Stage-1)",
  "submitTime" : 1528781559636,
  "finishTime" : 1528781571551,
  "mapTasks" : [ {
    "startTime" : 1528781565131,
    "taskID" : "task_1528373726326_0204_m_000000",
    "taskType" : "MAP",
    "finishTime" : 1528781571514,
    "attempts" : [ {
      "startTime" : 1528781567259,
      "finishTime" : 1528781571514,
      "attemptID" : "attempt_1528373726326_0204_m_000000_0",
      "clockSplits" : [ 4201, 5, 4, 5, 4, 5, 5, 4, 5, 4, 5, 5 ],
      "cpuUsages" : [ 170, 171, 171, 171, 171, 171, 170, 171, 171, 171, 171, 171 ],
      "vmemKbytes" : [ 116591, 349773, 582955, 816136, 1049319, 1282500, 1515683, 1748864, 1982047, 2215229, 2448410, 2681593 ],
      "physMemKbytes" : [ 17301, 51903, 86505, 121107, 155710, 190312, 224915, 259516, 294119, 328722, 363323, 397926 ],
      "shuffleFinished" : -1,
      "sortFinished" : -1,
      "hdfsBytesRead" : 7795,
      "hdfsBytesWritten" : 2,
      "fileBytesRead" : 0,
      "fileBytesWritten" : 255682,
      "mapInputRecords" : 0,
      "mapOutputBytes" : -1,
      "mapOutputRecords" : 0,
      "combineInputRecords" : -1,
      "reduceInputGroups" : -1,
      "reduceInputRecords" : -1,
      "reduceShuffleBytes" : -1,
      "reduceOutputRecords" : -1,
      "spilledRecords" : 0,
      "mapInputBytes" : -1,
      "resourceUsageMetrics" : {
        "heapUsage" : 623378432,
        "virtualMemoryUsage" : 2865340416,
        "physicalMemoryUsage" : 425193472,
        "cumulativeCpuUsage" : 2050
      },
……
……

Counter信息解读

Resource Usage Metrics

一般来说,resourceUsageMetrics中的指标就可以体现出某个task attempt的资源使用情况

  "resourceUsageMetrics" : {
    "heapUsage" : 623378432,
    "virtualMemoryUsage" : 2865340416,
    "physicalMemoryUsage" : 425193472,
    "cumulativeCpuUsage" : 2050
  },

具体的更新逻辑在Task类中

  /**
   * Update resource information counters
   */
  void updateResourceCounters() {
    // Update generic resource counters
    updateHeapUsageCounter();

    // Updating resources specified in ResourceCalculatorProcessTree
    if (pTree == null) {
      return;
    }
    pTree.updateProcessTree();
    long cpuTime = pTree.getCumulativeCpuTime();
    long pMem = pTree.getCumulativeRssmem();
    long vMem = pTree.getCumulativeVmem();
    // Remove the CPU time consumed previously by JVM reuse
    cpuTime -= initCpuCumulativeTime;
    counters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime);
    counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
    counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
  }

Progress Split Counter

但是job-trace.json中有一组counter信息很是奇怪

 "clockSplits" : [ 4201, 5, 4, 5, 4, 5, 5, 4, 5, 4, 5, 5 ],
 "cpuUsages" : [ 170, 171, 171, 171, 171, 171, 170, 171, 171, 171, 171, 171 ],
 "vmemKbytes" : [ 116591, 349773, 582955, 816136, 1049319, 1282500, 1515683, 1748864, 1982047, 2215229, 2448410, 2681593 ],
 "physMemKbytes" : [ 17301, 51903, 86505, 121107, 155710, 190312, 224915, 259516, 294119, 328722, 363323, 397926 ]

表面看是四个size为12的数组,这些其实是在task执行的过程中,每隔一段时间就记录下当前时刻的性能指标。

其中核心的类就是ProgressSplitsBlock

ProgressSplitsBlock

  ProgressSplitsBlock(int numberSplits) {
    progressWallclockTime
      = new CumulativePeriodicStats(numberSplits);
    progressCPUTime
      = new CumulativePeriodicStats(numberSplits);
    progressVirtualMemoryKbytes
      = new StatePeriodicStats(numberSplits);
    progressPhysicalMemoryKbytes
      = new StatePeriodicStats(numberSplits);
  }

ProgressSplitsBlock中包含了四组统计信息,分别是距离任务启动的时间、CPU时间、虚拟内存占用、物理内存占用。其中CumulativePeriodicStats和StatePeriodicStats稍有区别。

  • CumulativePeriodicStats是可以累加的指标,数组中的值相加即总计的值。

An easy-to-understand example of this kind of quantity would
be a distance traveled. It makes sense to consider that
portion of the total travel that can be apportioned to each
bucket.

170+171+171+171+171+171+170+171+171+171+171+171 = 2050

  • StatePeriodicStats是一段时间内的平均值,数组中的值其实是一段时间的中位数

An easy-to-understand example of this kind of quantity would
be a temperature. It makes sense to consider the mean
temperature over a progress range.

创建ProgressSplitsBlock并更新的地方是TaskInProgress

TaskInProgress

  • 创建ProgressSplitsBlock
  synchronized ProgressSplitsBlock getSplits(TaskAttemptID statusAttemptID) {
    ProgressSplitsBlock result = splitsBlocks.get(statusAttemptID);

    if (result == null) {
      result
        = new ProgressSplitsBlock
            (conf.getInt(JTConfig.JT_JOBHISTORY_TASKPROGRESS_NUMBER_SPLITS,
                         ProgressSplitsBlock.DEFAULT_NUMBER_PROGRESS_SPLITS));
      splitsBlocks.put(statusAttemptID, result);
    }

    return result;
  }

DEFAULT_NUMBER_PROGRESS_SPLITS为12,所以json中我们看到的数组的size为12

  • 更新ProgressSplitsBlock
      Counters.Counter cpuCounter = counters.findCounter(CPU_COUNTER_KEY);
      if (cpuCounter != null && cpuCounter.getCounter() <= Integer.MAX_VALUE) {
        splitsBlock.progressCPUTime.extend
          (newProgress, (int)(cpuCounter.getCounter()));
      }

extend方法中有一处特殊处理,就是下一次更新的时候,任务的progress的跨度太大,比如从30%直接跳到了90%,则中间的结果需要填充。所以这时,中间的结果并不是实际测量出来的值,而是平滑计算后的结果。

发表评论