Resource usage analysis in JobHistory

Background

We need to collect the counter information for every SQL statement run in Hive. The MapReduce framework defines the following counters:

  // Counters used by Task subclasses
  public static enum Counter { 
    MAP_INPUT_RECORDS, 
    MAP_OUTPUT_RECORDS,
    MAP_SKIPPED_RECORDS,
    MAP_INPUT_BYTES, 
    MAP_OUTPUT_BYTES,
    COMBINE_INPUT_RECORDS,
    COMBINE_OUTPUT_RECORDS,
    REDUCE_INPUT_GROUPS,
    REDUCE_SHUFFLE_BYTES,
    REDUCE_INPUT_RECORDS,
    REDUCE_OUTPUT_RECORDS,
    REDUCE_SKIPPED_GROUPS,
    REDUCE_SKIPPED_RECORDS,
    SPILLED_RECORDS,
    SPLIT_RAW_BYTES,
    CPU_MILLISECONDS,
    PHYSICAL_MEMORY_BYTES,
    VIRTUAL_MEMORY_BYTES,
    COMMITTED_HEAP_BYTES
  }

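As the list shows, the counters fall into two groups:

  • I/O statistics from the MapReduce framework, such as record counts and byte counts
  • Runtime performance metrics of the host, such as CPU time and memory usage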

Obtaining the counter information

  • Parse the job history with the bundled Hadoop Rumen project. The command is:
 hadoop jar \
  /opt/cloudera/parcels/CDH-5.11.2-1.cdh5.11.2.p0.4/jars/hadoop-rumen-2.6.0-cdh5.11.2.jar \
  org.apache.hadoop.tools.rumen.TraceBuilder \
  file:///tmp/job-trace.json \
  file:///tmp/job-topology.json \
  hdfs:///user/history/done/2018/06/06/000000

  • The generated job-trace.json then contains the details of every job run that day
 {
  "jobID" : "job_1528373726326_0204",
  "queue" : "default",
  "user" : "hive",
  "jobName" : "INSERT OVERWRITE TABL...st_day('2018-05-16')(Stage-1)",
  "submitTime" : 1528781559636,
  "finishTime" : 1528781571551,
  "mapTasks" : [ {
    "startTime" : 1528781565131,
    "taskID" : "task_1528373726326_0204_m_000000",
    "taskType" : "MAP",
    "finishTime" : 1528781571514,
    "attempts" : [ {
      "startTime" : 1528781567259,
      "finishTime" : 1528781571514,
      "attemptID" : "attempt_1528373726326_0204_m_000000_0",
      "clockSplits" : [ 4201, 5, 4, 5, 4, 5, 5, 4, 5, 4, 5, 5 ],
      "cpuUsages" : [ 170, 171, 171, 171, 171, 171, 170, 171, 171, 171, 171, 171 ],
      "vmemKbytes" : [ 116591, 349773, 582955, 816136, 1049319, 1282500, 1515683, 1748864, 1982047, 2215229, 2448410, 2681593 ],
      "physMemKbytes" : [ 17301, 51903, 86505, 121107, 155710, 190312, 224915, 259516, 294119, 328722, 363323, 397926 ],
      "shuffleFinished" : -1,
      "sortFinished" : -1,
      "hdfsBytesRead" : 7795,
      "hdfsBytesWritten" : 2,
      "fileBytesRead" : 0,
      "fileBytesWritten" : 255682,
      "mapInputRecords" : 0,
      "mapOutputBytes" : -1,
      "mapOutputRecords" : 0,
      "combineInputRecords" : -1,
      "reduceInputGroups" : -1,
      "reduceInputRecords" : -1,
      "reduceShuffleBytes" : -1,
      "reduceOutputRecords" : -1,
      "spilledRecords" : 0,
      "mapInputBytes" : -1,
      "resourceUsageMetrics" : {
        "heapUsage" : 623378432,
        "virtualMemoryUsage" : 2865340416,
        "physicalMemoryUsage" : 425193472,
        "cumulativeCpuUsage" : 2050
      },
……
……

Interpreting the counters

Resource Usage Metrics

In general, the metrics under resourceUsageMetrics are enough to show the resource usage of a task attempt.

  "resourceUsageMetrics" : {
    "heapUsage" : 623378432,
    "virtualMemoryUsage" : 2865340416,
    "physicalMemoryUsage" : 425193472,
    "cumulativeCpuUsage" : 2050
  },

The update logic lives in the Task class:

  /**
   * Update resource information counters
   */
  void updateResourceCounters() {
    // Update generic resource counters
    updateHeapUsageCounter();

    // Updating resources specified in ResourceCalculatorProcessTree
    if (pTree == null) {
      return;
    }
    pTree.updateProcessTree();
    long cpuTime = pTree.getCumulativeCpuTime();
    long pMem = pTree.getCumulativeRssmem();
    long vMem = pTree.getCumulativeVmem();
    // Remove the CPU time consumed previously by JVM reuse
    cpuTime -= initCpuCumulativeTime;
    counters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime);
    counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
    counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
  }

Progress Split Counter

However, job-trace.json also contains a puzzling group of counters:

 "clockSplits" : [ 4201, 5, 4, 5, 4, 5, 5, 4, 5, 4, 5, 5 ],
 "cpuUsages" : [ 170, 171, 171, 171, 171, 171, 170, 171, 171, 171, 171, 171 ],
 "vmemKbytes" : [ 116591, 349773, 582955, 816136, 1049319, 1282500, 1515683, 1748864, 1982047, 2215229, 2448410, 2681593 ],
 "physMemKbytes" : [ 17301, 51903, 86505, 121107, 155710, 190312, 224915, 259516, 294119, 328722, 363323, 397926 ]

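At first glance these are four arrays of size 12. They are in fact snapshots of the performance metrics, recorded at intervals while the task runs.

The core class behind them is ProgressSplitsBlock.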

ProgressSplitsBlock

  ProgressSplitsBlock(int numberSplits) {
    progressWallclockTime
      = new CumulativePeriodicStats(numberSplits);
    progressCPUTime
      = new CumulativePeriodicStats(numberSplits);
    progressVirtualMemoryKbytes
      = new StatePeriodicStats(numberSplits);
    progressPhysicalMemoryKbytes
      = new StatePeriodicStats(numberSplits);
  }

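ProgressSplitsBlock holds four groups of statistics: wallclock time since the task started, CPU time, virtual memory usage, and physical memory usage. CumulativePeriodicStats and StatePeriodicStats differ slightly.

  • CumulativePeriodicStats is for metrics that accumulate: the values in the array add up to the total.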

An easy-to-understand example of this kind of quantity would
be a distance traveled. It makes sense to consider that
portion of the total travel that can be apportioned to each
bucket.

170+171+171+171+171+171+170+171+171+171+171+171 = 2050
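
The sum above can be checked with a few lines of Java. This is only a sanity check on the numbers copied from the job-trace.json excerpt; the class name CpuSplitsSum is made up for the illustration.

```java
import java.util.Arrays;

class CpuSplitsSum {
    // cpuUsages of attempt_1528373726326_0204_m_000000_0, copied from job-trace.json
    static final int[] CPU_USAGES = {170, 171, 171, 171, 171, 171, 170, 171, 171, 171, 171, 171};

    // Adds up all buckets of a cumulative metric
    static long total(int[] splits) {
        return Arrays.stream(splits).asLongStream().sum();
    }

    public static void main(String[] args) {
        // Matches cumulativeCpuUsage in resourceUsageMetrics
        System.out.println(total(CPU_USAGES)); // prints 2050
    }
}
```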

  • StatePeriodicStats is for state-like metrics: each value in the array is the mean of the metric over one progress range.

An easy-to-understand example of this kind of quantity would
be a temperature. It makes sense to consider the mean
temperature over a progress range.

ProgressSplitsBlock is created and updated in TaskInProgress.

TaskInProgress

  • Creating the ProgressSplitsBlock
  synchronized ProgressSplitsBlock getSplits(TaskAttemptID statusAttemptID) {
    ProgressSplitsBlock result = splitsBlocks.get(statusAttemptID);

    if (result == null) {
      result
        = new ProgressSplitsBlock
            (conf.getInt(JTConfig.JT_JOBHISTORY_TASKPROGRESS_NUMBER_SPLITS,
                         ProgressSplitsBlock.DEFAULT_NUMBER_PROGRESS_SPLITS));
      splitsBlocks.put(statusAttemptID, result);
    }

    return result;
  }

DEFAULT_NUMBER_PROGRESS_SPLITS is 12, which is why the arrays in the JSON have size 12.

  • Updating the ProgressSplitsBlock
      Counters.Counter cpuCounter = counters.findCounter(CPU_COUNTER_KEY);
      if (cpuCounter != null && cpuCounter.getCounter() <= Integer.MAX_VALUE) {
        splitsBlock.progressCPUTime.extend
          (newProgress, (int)(cpuCounter.getCounter()));
      }

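The extend method has one special case: when the task's progress jumps too far between two updates, for example from 30% straight to 90%, the buckets in between have to be filled in. The values in those buckets are therefore not actual measurements but smoothed, computed results.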
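
To make the filling behaviour concrete, here is a minimal sketch of how a cumulative metric could be spread over progress buckets. It is not Hadoop's implementation: the class CumulativeSplitsSketch is hypothetical, and apportioning the increase linearly by progress is an assumption made for the illustration.

```java
class CumulativeSplitsSketch {
    final double[] buckets;            // one slot per progress range
    private double lastProgress = 0.0; // progress at the previous update, in [0, 1]
    private long lastValue = 0;        // cumulative counter value at the previous update

    CumulativeSplitsSketch(int numberSplits) {
        buckets = new double[numberSplits];
    }

    // Spreads the increase since the last update over every bucket the
    // progress interval [lastProgress, newProgress] overlaps (assumed linear).
    void extend(double newProgress, long newValue) {
        if (newProgress <= lastProgress) {
            return; // no forward progress, nothing to apportion
        }
        double delta = newValue - lastValue;
        double span = newProgress - lastProgress;
        int n = buckets.length;
        for (int i = 0; i < n; i++) {
            double lo = (double) i / n;
            double hi = (double) (i + 1) / n;
            double overlap = Math.min(hi, newProgress) - Math.max(lo, lastProgress);
            if (overlap > 0) {
                buckets[i] += delta * overlap / span;
            }
        }
        lastProgress = newProgress;
        lastValue = newValue;
    }

    public static void main(String[] args) {
        CumulativeSplitsSketch s = new CumulativeSplitsSketch(10);
        s.extend(0.3, 300); // measured at 30%
        s.extend(0.9, 900); // next update only arrives at 90%
        // The buckets between 30% and 90% hold interpolated shares, not measurements
        System.out.println(java.util.Arrays.toString(s.buckets));
    }
}
```

With ten buckets, the jump from 30% to 90% gives each of the six buckets in between an equal share of the 600 units consumed over that interval, which is the kind of smoothed value described above.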

Enable @Transactional on private methods

Problem

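In Spring, annotating a method with @Transactional makes all database operations inside that method run in a transaction.

But suppose part of the method's business logic should run outside the transaction, for example: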

public void method(){
    // database operations, should run in a transaction
    Result result = handleDbOperation();
    // business logic, no transaction needed
    handleBizLogic(result);
}

@Transactional
public Result handleDbOperation(){
……
}

In this case the annotation does not actually take effect.

Why?

Public visibility

The Spring documentation explains this:

When using proxies, you should apply the @Transactional annotation only to methods with public visibility. If you do annotate protected, private or package-visible methods with the @Transactional annotation, no error is raised, but the annotated method does not exhibit the configured transactional settings. Consider the use of AspectJ (see below) if you need to annotate non-public methods.

External call

In proxy mode (which is the default), only external method calls coming in through the proxy are intercepted. This means that self-invocation, in effect, a method within the target object calling another method of the target object, will not lead to an actual transaction at runtime even if the invoked method is marked with @Transactional.
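
The effect of self-invocation can be reproduced without Spring. The sketch below uses a plain JDK dynamic proxy that records every call it intercepts; it is an analogy for the proxy mechanism and not Spring's actual transaction interceptor. The names SelfInvocationDemo, Service and ServiceImpl are made up for the illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class SelfInvocationDemo {
    interface Service {
        void method();
        void handleDbOperation();
    }

    static class ServiceImpl implements Service {
        @Override
        public void method() {
            // Plain call on "this": it goes straight to the target, not through the proxy
            handleDbOperation();
        }

        @Override
        public void handleDbOperation() {
            // database work would go here
        }
    }

    // Wraps the target in a JDK proxy that records the name of each intercepted call
    static Service wrap(Service target, List<String> intercepted) {
        InvocationHandler handler = (proxy, m, args) -> {
            intercepted.add(m.getName());
            return m.invoke(target, args);
        };
        return (Service) Proxy.newProxyInstance(
                Service.class.getClassLoader(), new Class<?>[] {Service.class}, handler);
    }

    static List<String> run() {
        List<String> intercepted = new ArrayList<>();
        wrap(new ServiceImpl(), intercepted).method();
        return intercepted;
    }

    public static void main(String[] args) {
        // Only the external call is seen by the proxy
        System.out.println(run()); // prints [method]
    }
}
```

The inner call to handleDbOperation never reaches the handler, which is why an annotation that relies on the proxy has no effect there.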

Workaround

Another bean

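Create another service and move all the transactional code into it, which turns the internal call into an external call.

@Autowired
private DbService dbservice;

public void method(){
    // database operations, run in a transaction
    Result result = dbservice.handleDbOperation();
    // business logic, no transaction needed
    handleBizLogic(result);
}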

TransactionUtil

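This also turns the call into an external one, but the neat part is that a lambda expression is passed as the argument, so the code does not have to be moved.

@Autowired
private TransactionHelper helper;

public void method(){
    // database operations, run in a transaction
    Result result = helper.withTransaction(() -> handleDbOperation());
    // business logic, no transaction needed
    handleBizLogic(result);
}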

// TransactionHelper.java :
@Service
public class TransactionHelper {
    @Transactional
    public <T> T withTransaction(Supplier<T> supplier) {
        return supplier.get();
    }
    @Transactional
    public void withTransaction(Runnable runnable) {
        runnable.run();
    }
}

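MyBatis localCache: the problem and a solution

Background

In unit tests we need to verify that the system works correctly in different scenarios. Building those scenarios means modifying the data in the db, and the transaction is rolled back at the end so that the db returns to its initial state once the test finishes.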

Problem description

The unit test class is already annotated with @Transactional. In the following test scenario:

result1 = queryResult();
jdbcTemplate.update("***");
jdbcTemplate.update("***");
result2 = queryResult();

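result1 and result2 are exactly the same, which is not what we expect.

Root cause

MyBatis enables localCache by default, and its default scope is the session. Within one session, when the same query runs again, the result is read straight from the cache.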

  @SuppressWarnings("unchecked")
  @Override
  public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    ……
    List<E> list;
    try {
      queryStack++;
      list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
      if (list != null) {
        handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
      } else {
        list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
      }
    } finally {
      queryStack--;
    }
    ……
    return list;
  }
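
The stale read can be reproduced with a toy model. The sketch below is not MyBatis code: LocalCacheSketch and its methods are hypothetical stand-ins, with one map playing the database and another playing the session-scoped cache. It assumes, as in the test scenario above, that the update goes through a path the session does not see.

```java
import java.util.HashMap;
import java.util.Map;

class LocalCacheSketch {
    // Stands in for the database table
    final Map<String, String> db = new HashMap<>();
    // Stands in for the session-scoped localCache, keyed by the query
    final Map<String, String> localCache = new HashMap<>();

    // Returns the cached result if present, otherwise reads the "database".
    // flushCache clears the cache first, forcing a fresh read.
    String query(String key, boolean flushCache) {
        if (flushCache) {
            localCache.clear();
        }
        return localCache.computeIfAbsent(key, db::get);
    }

    // Models an update issued outside the session (like jdbcTemplate.update),
    // so the cache is never told that the data changed
    void updateBypassingSession(String key, String value) {
        db.put(key, value);
    }

    public static void main(String[] args) {
        LocalCacheSketch session = new LocalCacheSketch();
        session.db.put("row", "old");
        String result1 = session.query("row", false);
        session.updateBypassingSession("row", "new");
        String result2 = session.query("row", false); // served from the cache
        String result3 = session.query("row", true);  // cache flushed first
        System.out.println(result1 + " " + result2 + " " + result3); // prints old old new
    }
}
```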

Solution

Add flushCache="true" to the select element. The localCache is then cleared whenever this statement runs, so the query goes to the database instead of the cache.

    <select id="selectAll" resultMap="BaseResultMap" flushCache="true">
        select
        <include refid="Base_Column_List"/>
        from Table
    </select>

When this option is set, the cache is flushed before the statement executes, so the result is read from the database again.

  @Override
  public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
      throws SQLException {
    Cache cache = ms.getCache();
    if (cache != null) {
      flushCacheIfRequired(ms);
      ……
  }

  private void flushCacheIfRequired(MappedStatement ms) {
    Cache cache = ms.getCache();
    if (cache != null && ms.isFlushCacheRequired()) {
      tcm.clear(cache);
    }
  }

Note that the localCache scope defaults to session level, so flushing it affects every SQL statement in the session.
The scope can be set to session level or statement level in the MyBatis configuration:

<setting name="localCacheScope" value="SESSION"/>
<setting name="localCacheScope" value="STATEMENT"/>

ibatis to mybatis.Part 2

Several problems came up during the conversion.

jdbcType case issue

Build.xml
<replace dir="destination" includes="*.xml" token=":NUMERIC#" value=",jdbcType=NUMERIC#" encoding="UTF8"/>
<replace dir="destination" includes="*.xml" token=":TIMESTAMP#" value=",jdbcType=TIMESTAMP#" encoding="UTF8"/>
<replace dir="destination" includes="*.xml" token=":VARCHAR#" value=",jdbcType=VARCHAR#" encoding="UTF8"/>

The replace rules need to handle jdbcType names in both upper and lower case, and more types need to be added.
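
One way to handle the case problem is a case-insensitive match that always emits the upper-case type name. The sketch below shows the idea in Java rather than in Ant; the class JdbcTypeNormalizer is hypothetical, and the three types are just the ones from the build.xml above (more would be added to the alternation).

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class JdbcTypeNormalizer {
    // Matches the ":TYPE#" suffix of an ibatis inline parameter in any letter case
    private static final Pattern TYPE_SUFFIX =
            Pattern.compile(":(numeric|timestamp|varchar)#", Pattern.CASE_INSENSITIVE);

    // Rewrites ":type#" to ",jdbcType=TYPE#", upper-casing the type name
    static String normalize(String sql) {
        Matcher m = TYPE_SUFFIX.matcher(sql);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String replacement = ",jdbcType=" + m.group(1).toUpperCase(Locale.ROOT) + "#";
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(normalize("item_id = #offerId:numeric#"));
        // prints item_id = #offerId,jdbcType=NUMERIC#
    }
}
```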

Missing conversion for dynamic

The default migrate.xslt does not convert the dynamic element, so conversion logic has to be added.

First, find the definition of the dynamic element in ibatis.dtd:

<!--Wrapper tag that allows for an overall prepend, open and close.-->
<!ELEMENT dynamic (#PCDATA | include | iterate | isParameterPresent | isNotParameterPresent | isEmpty | isNotEmpty | isNotNull | isNull | isNotEqual | isEqual | isGreaterThan | isGreaterEqual | isLessThan | isLessEqual | isPropertyAvailable | isNotPropertyAvailable)*>
<!ATTLIST dynamic
prepend CDATA #IMPLIED
open CDATA #IMPLIED
close CDATA #IMPLIED
>

Then add the handling for the dynamic element to migrate.xslt.

migrate.xslt
<xsl:template match="dynamic">
    <xsl:element name="trim">
        <xsl:attribute name="prefix">
            <xsl:value-of select="@prepend" />
        </xsl:attribute>
        <xsl:attribute name="prefixOverrides">
            <xsl:text>and|or|,</xsl:text>
        </xsl:attribute>
        <xsl:value-of select="@open" />
        <xsl:apply-templates/>
        <xsl:value-of select="@close" />
    </xsl:element>  
</xsl:template>

The result looks like this:

ibatis source
<dynamic prepend=" where " close="and (1=1)">
  <isNotNull property="offerId" prepend="and">
    item_id=#offerId#
  </isNotNull>
</dynamic>

mybatis target
<trim prefix=" where " prefixOverrides="and|or|,">
  <if test="offerId != null">and
    item_id=#{offerId}
  </if>
</trim>
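
To see what the generated trim element does with the body, here is a simplified model of prefix and prefixOverrides. It is a sketch of the behaviour under stated assumptions, not MyBatis source: the class TrimSketch is hypothetical, and it only handles the prefix side.

```java
import java.util.Locale;

class TrimSketch {
    // Strips the first matching override from the start of the body,
    // then prepends the prefix. An empty body produces no output at all.
    static String applyTrim(String body, String prefix, String... prefixOverrides) {
        String sql = body.trim();
        if (sql.isEmpty()) {
            return "";
        }
        String upper = sql.toUpperCase(Locale.ROOT);
        for (String override : prefixOverrides) {
            if (upper.startsWith(override.toUpperCase(Locale.ROOT))) {
                sql = sql.substring(override.length()).trim();
                break;
            }
        }
        return prefix.trim() + " " + sql;
    }

    public static void main(String[] args) {
        System.out.println(applyTrim("and item_id=#{offerId}", " where ", "and", "or", ","));
        // prints where item_id=#{offerId}
    }
}
```

In this sketch an override without a trailing space, such as or, would also match the start of a column name like order_id. The trim example in Part 1 uses "AND |OR " with trailing spaces, which avoids that.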


Incorrect translation of isNotEmpty

The original conversion logic for isNotEmpty was:

<xsl:template match="isNotEmpty">
  <xsl:element name="if">
    <xsl:attribute name="test">
      <xsl:if test="substring-before(@property, '.')">
        <xsl:value-of select="substring-before(@property, '.')" /><xsl:text> != null and </xsl:text>
      </xsl:if>
      <xsl:value-of select="@property" /><xsl:text> != null and </xsl:text>
      <xsl:value-of select="@property" /><xsl:text> != ''</xsl:text>
    </xsl:attribute>
    <xsl:value-of select="@prepend" />
      <xsl:apply-templates/>
    </xsl:element>
</xsl:template>

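However, when the element wraps an iterate (the property is a collection), comparing it directly with != '' raises an error. The isNotEmpty logic has to be changed for the iterate case: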

<xsl:template match="isNotEmpty">
  <xsl:element name="if">
    <xsl:attribute name="test">
      <xsl:if test="substring-before(@property, '.')">
        <xsl:value-of select="substring-before(@property, '.')" /><xsl:text> != null and </xsl:text>
      </xsl:if>
      <xsl:value-of select="@property" /><xsl:text> != null and </xsl:text>
      <xsl:choose>
        <xsl:when test="child::node()[contains(name(), 'iterate')]">
          <xsl:value-of select="@property" /><xsl:text>.size != 0</xsl:text>
        </xsl:when>
        <xsl:otherwise>
          <xsl:value-of select="@property" /><xsl:text> != ''</xsl:text>
        </xsl:otherwise>
      </xsl:choose>
    </xsl:attribute>
    <xsl:value-of select="@prepend" />
      <xsl:apply-templates/>
    </xsl:element>
</xsl:template>

The final conversion result:

ibatis source
<isNotEmpty property="offerIds" prepend="and">
  item_id in
  <iterate property="offerIds" conjunction="," open="(" close=")">
    #offerIds[]#
  </iterate>
</isNotEmpty>

mybatis target
<if test="offerIds != null and offerIds.size != 0">and
  item_id in
  <foreach collection="offerIds" item="item" separator="," close=")" open="(">
    #{item}
  </foreach>
</if>
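
The rule encoded in the revised template can be written out as a small function, which makes it easy to check what test expression a given property produces. This mirrors the XSLT above in Java for illustration only; the class IsNotEmptyTest and the hasIterateChild flag are hypothetical names.

```java
class IsNotEmptyTest {
    // Builds the test attribute for the generated <if> element.
    // hasIterateChild is true when the isNotEmpty element contains an iterate.
    static String build(String property, boolean hasIterateChild) {
        StringBuilder test = new StringBuilder();
        int dot = property.indexOf('.');
        if (dot > 0) {
            // Guard the parent object first, like substring-before(@property, '.')
            test.append(property, 0, dot).append(" != null and ");
        }
        test.append(property).append(" != null and ").append(property);
        // Collections are checked by size, everything else against the empty string
        test.append(hasIterateChild ? ".size != 0" : " != ''");
        return test.toString();
    }

    public static void main(String[] args) {
        System.out.println(build("offerIds", true));
        // prints offerIds != null and offerIds.size != 0
        System.out.println(build("order.id", false));
        // prints order != null and order.id != null and order.id != ''
    }
}
```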

ibatis to mybatis.Part 1

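Goal

Translate ibatis dynamic SQL into SQL that follows the mybatis conventions.

Differences between the two

ibatis

Official documentation
Tags can be nested. The main kinds are: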

  • Binary Conditional Elements
<isEqual prepend="AND"  property="status"  compareValue="Y">
MARRIED = 'TRUE'
</isEqual> 

  • Unary Conditional Elements
<isNotNull prepend="AND" property="order.id" >
  ORDER.ORDER_ID = #order.id#
</isNotNull>

  • Parameter Present Elements
<isParameterPresent prepend="AND">
  EMPLOYEE_TYPE = #empType#
</isParameterPresent>

  • Iterate Element
<select id="select-test" resultMap="MyTableResult" parameterClass="list">
select * from my_table where col_1 in
  <iterate open="(" close=")" conjunction=",">
   #[]#
  </iterate>
</select>

  • Simple Dynamic SQL Elements
<statement id="getProduct" resultMap="get-product-result">
  select * from PRODUCT order by $preferredOrder$
</statement>

mybatis

Official documentation

  • if
<if test="title != null">
  AND title like #{title}
</if>
<if test="author != null and author.name != null">
  AND author_name like #{author.name}
</if>

  • choose (when, otherwise)
<choose>
  <when test="title != null">
    AND title like #{title}
  </when>
  <when test="author != null and author.name != null">
    AND author_name like #{author.name}
  </when>
  <otherwise>
    AND featured = 1
  </otherwise>
</choose>

  • trim (where, set)
<trim prefix="WHERE" prefixOverrides="AND |OR ">   ... </trim>

  • foreach
<foreach item="item" index="index" collection="list"
    open="(" separator="," close=")">
      #{item}
</foreach>

  • Direct substitution
<select id="selectByNetworkId" parameterType="java.util.Map" resultMap="userResult">
    select user_profile.user_profile_id,  user_profile.first_name
    from user_profile user_profile
    where  user_profile.network_id = #{network_id}  
    order by  user_profile.first_name ${sortType}
</select>

Approach

  1. Converting the parameter syntax
    • replace $id$ with ${id}
    • replace #id# with #{id}
    • replace xyz[] with item for use in iterators
  2. Converting the elements
    • isNull -> object == null OR object.property == null
    • isNotNull -> object != null AND object.property != null
    • isNotEmpty -> object != null AND object.property != null AND object.property != ''
    • isEmpty -> object != null AND (object.property == null OR object.property == '')
    • isGreaterThan -> property > value (number or another property)
    • isLessThan -> property < value (number or another property)
    • isEqual -> object != null AND object.property == value (number or another property)
    • isLessEqual -> object != null AND object.property <= value (number or another property)
    • iterate
  3. Elements that cannot be converted
    • dynamic
    • isPropertyAvailable
    • isNotPropertyAvailable
    • isNotParameterPresent
    • isParameterPresent
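
The first two rules of step 1 are plain text substitutions, so they can be sketched with regular expressions. This is a minimal illustration, not the migration tool's own code: the class ParamSyntaxConverter is hypothetical, and it assumes parameter names contain no whitespace and that # and $ do not appear elsewhere in the SQL.

```java
class ParamSyntaxConverter {
    // Rewrites ibatis inline parameters (#id#, $id$) to mybatis syntax (#{id}, ${id})
    static String convert(String sql) {
        // #id# -> #{id}
        String out = sql.replaceAll("#([^#\\s]+)#", "#{$1}");
        // $id$ -> ${id}; the leading "\\$" in the replacement is a literal dollar sign
        return out.replaceAll("\\$([^$\\s]+)\\$", "\\${$1}");
    }

    public static void main(String[] args) {
        System.out.println(convert("select * from PRODUCT where id = #id# order by $preferredOrder$"));
        // prints select * from PRODUCT where id = #{id} order by ${preferredOrder}
    }
}
```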

(to be continued ……)