
Using Streaming Queries in MySQL to Avoid OOM on Large Result Sets: A Follow-up

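1. Preface

The previous article, http://www.jianshu.com/p/0339c6fe8b61, introduced three ways to run streaming queries against MySQL. They look elegant, but that elegance comes with a few caveats. This follow-up walks through the things to watch out for when using streaming queries.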

2. A connection cannot be reused while a streaming cursor is being iterated

2.1 Overview

First, the original text from the MySQL Connector/J 5.1 Developer Guide:

There are some caveats with this approach. You must read all of the rows in the result set (or close it) before you can issue any other queries on the connection, or an exception will be thrown.
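In other words, once you obtain a ResultSet from a streaming query, you cannot use the same database connection to issue another query until you have either iterated over every row with next() or called close() on it; otherwise an exception is thrown. (The first query keeps working; it is the second one that fails.)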

Therefore, if using streaming results, process them as quickly as possible if you want to maintain concurrent access to the tables referenced by the statement producing the result set.
If you want to preserve concurrent access to the tables involved, process the contents of the streaming ResultSet as quickly as possible.

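The restriction exists because, in the non-streaming case, by the time we receive the ResultSet the MySQL client has already loaded every row into it. The connection is idle at that point and can run other queries. With a streaming query, the ResultSet handed back to us does not yet hold all the data: each call to next() uses the connection to fetch rows from the server. The connection therefore stays occupied until all rows have been read, which is why a connection cannot be reused while a streaming cursor is being iterated.

2.2 An example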


public static void testOneConn() throws Exception {

        System.out.print("begin");
        String cmdSql = "select app_key,device_id,brand from accs_device_info where app_key='21646297' AND app_version = '6.1.7' and package_name='com.taobao.taobao'";

        Connection conn = null;
        PreparedStatement stmt = null;
        ResultSet rs = null;

        ExecutorService executor = Executors.newSingleThreadExecutor();

        try {

            conn = ds.getConnection();

            stmt = conn.prepareStatement(cmdSql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            // Forward-only, read-only, fetch size Integer.MIN_VALUE: Connector/J streams rows one by one
            stmt.setFetchSize(Integer.MIN_VALUE);
            // First query (1)
            rs = stmt.executeQuery();
            final ResultSet tempRs = rs;
            // Iterate the streaming ResultSet slowly on a child thread
            Future<Integer> future = executor.submit(new Callable<Integer>() {
                public Integer call() throws InterruptedException {
                    try {
                        while (tempRs.next()) {
                            try {
                                System.out.println("app_key:" + tempRs.getString(1) + "device_id:" + tempRs.getString(2)
                                        + "brand:" + tempRs.getString(3));
                            } catch (SQLException e) {
                                e.printStackTrace();
                            }

                            Thread.sleep(1000);

                        }
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                    return null;
                }
            });

            // Give the child thread time to start reading
            Thread.sleep(2000);

            try {
                // Second query (2), issued on the same connection while (1) is still streaming
                stmt.executeQuery();

            } catch (Exception e) {
                System.out.println("second search:" + e.getLocalizedMessage());

            }

            // Wait for the child thread to finish
            future.get();

        } catch (Exception e) {
            System.out.println("first search:" + e.getLocalizedMessage());
        } finally {

            close(stmt, rs, conn);
            // Stop the worker thread so the JVM can exit
            executor.shutdown();

        }
    }
    

Running the code above produces:
app_key:111device_id:222brand:333
second search:Streaming result set com.mysql.jdbc.RowDataDynamic@3e0c5a62 is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.
app_key:233222device_id:232332brand:45454
app_key:1234device_id:54brand:eyu345
........
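As the output shows, the second query threw an exception. It says the result set RowDataDynamic@3e0c5a62 is still active, that no statement may be issued on a connection that has an open streaming ResultSet, and that close() must be called on it before attempting more queries.

The first query is unaffected and carries on iterating over its data.

So let's see what happens if we call close() before the second query.
Add rs.close(); before query (2) and run again. The child thread first keeps printing rows. Then the main thread calls close(), after which the child thread stops printing, and the main thread's close() call does not return either. What is going on? Let's look at what close() does: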

public void close() throws SQLException {
        ...

    // Acquire the connection's lock
        Object mutex = this;

        MySQLConnection conn = null;

        if (this.owner != null) {
            conn = this.owner.connection;

            if (conn != null) {
                mutex = conn.getConnectionMutex();
            }
        }

        synchronized (mutex) {
            // drain the rest of the records.
            while (next() != null) {
                hadMore = true;
                howMuchMore++;

                if (howMuchMore % 100 == 0) {
                    Thread.yield();
                }
            }

        ...
    }

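So close() itself loops over next(), reading out the remaining records and discarding them. Our close() call did not return because it was still busy discarding data. The documentation says a new query can be issued only after all rows have been iterated or close() has been called, but in effect close() still has to read through everything left in the ResultSet.

One question remains: why did the child thread stop printing at the same time? Let's revisit the next() method: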
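
To make the cost of that drain loop concrete, here is a minimal simulation. It does not touch Connector/J at all: FakeStreamingRows is a hypothetical stand-in whose close() mirrors the loop in the excerpt above, reading and discarding whatever has not been consumed yet.

```java
import java.util.Iterator;
import java.util.stream.IntStream;

public class DrainOnCloseDemo {

    /** Hypothetical stand-in for a streaming result set; NOT the Connector/J class. */
    public static final class FakeStreamingRows implements AutoCloseable {
        private final Iterator<Integer> remaining; // rows not yet read from the "server"
        private int discarded = 0;

        public FakeStreamingRows(int rowCount) {
            this.remaining = IntStream.range(0, rowCount).iterator();
        }

        /** Returns the next row, or null once the stream is exhausted. */
        public Integer next() {
            return remaining.hasNext() ? remaining.next() : null;
        }

        /** Like the excerpt above: read and throw away everything that is left. */
        @Override
        public void close() {
            while (next() != null) {
                discarded++;
            }
        }

        public int discarded() {
            return discarded;
        }
    }

    /** Reads 'consumed' rows out of 'total', then closes; returns how many rows close() had to discard. */
    public static int discardedAfterReading(int total, int consumed) {
        FakeStreamingRows rows = new FakeStreamingRows(total);
        for (int i = 0; i < consumed; i++) {
            rows.next();
        }
        rows.close();
        return rows.discarded();
    }

    public static void main(String[] args) {
        // Only 3 rows were consumed, so close() walks through the other 999997
        System.out.println(discardedAfterReading(1_000_000, 3));
    }
}
```

In the real driver each discarded row still has to travel over the network, so closing a barely-read streaming ResultSet over a large table is not a cheap operation.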

public boolean next() throws SQLException {
        synchronized (checkClosed().getConnectionMutex()) {

            ....
        }
    }

protected final MySQLConnection checkClosed() throws SQLException {
    MySQLConnection c = this.connection;
    
    if (c == null) {
        throw SQLError.createSQLException(
                Messages
                        .getString("ResultSet.Operation_not_allowed_after_ResultSet_closed_144"), //$NON-NLS-1$
                SQLError.SQL_STATE_GENERAL_ERROR, getExceptionInterceptor());
    }
    
    return c;
}

    

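So that's it: next() also acquires the connection's lock first, and that lock is currently held by close(). You might object that synchronized is reentrant, so why can't next() get in? It is reentrant, but only for the thread that already holds the lock, and here close() and next() are called from different threads.

3. MyBatisCursorItemReader is not thread-safe

The previous article showed that MyBatisCursorItemReader lets us drive the cursor ourselves. To use it, declare it in the XML configuration: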
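
The difference between re-entering a monitor on the same thread and waiting for it from another thread can be reproduced without a database. The sketch below is only an illustration: connectionMutex stands in for the object returned by getConnectionMutex(), and the class and method names are made up for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ConnectionMutexDemo {

    // Stand-in for the object returned by getConnectionMutex()
    private final Object connectionMutex = new Object();

    /** Stand-in for ResultSet.next(): it must take the connection mutex first. */
    public boolean next() {
        synchronized (connectionMutex) {
            return true;
        }
    }

    /** The thread that already holds the monitor can re-enter it, so this returns normally. */
    public boolean nextOnThreadThatHoldsMutex() {
        synchronized (connectionMutex) {
            return next();
        }
    }

    /**
     * A "closer" thread holds the mutex, as close() does while draining rows,
     * and a "reader" thread calls next(). Returns true if the reader was still
     * blocked while the mutex was held and finished once it was released.
     */
    public boolean readerBlocksWhileCloserHoldsMutex() {
        CountDownLatch mutexHeld = new CountDownLatch(1);
        CountDownLatch releaseMutex = new CountDownLatch(1);
        CountDownLatch nextReturned = new CountDownLatch(1);

        Thread closer = new Thread(() -> {
            synchronized (connectionMutex) {
                mutexHeld.countDown();
                try {
                    releaseMutex.await(); // keep holding the monitor until told to let go
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        Thread reader = new Thread(() -> {
            next();
            nextReturned.countDown();
        });

        try {
            closer.start();
            mutexHeld.await();
            reader.start();
            // The reader cannot enter next() yet, so this wait times out
            boolean returnedEarly = nextReturned.await(300, TimeUnit.MILLISECONDS);
            releaseMutex.countDown();
            closer.join();
            reader.join();
            return !returnedEarly && nextReturned.getCount() == 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ConnectionMutexDemo demo = new ConnectionMutexDemo();
        System.out.println("same thread re-enters: " + demo.nextOnThreadThatHoldsMutex());
        System.out.println("other thread blocked:  " + demo.readerBlocksWhileCloserHoldsMutex());
    }
}
```

This matches what the experiment showed: the child thread's next() stays parked until the main thread's close() has finished draining and released the mutex.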


<bean id="myMyBatisCursorItemReader" class="org.mybatis.spring.batch.MyBatisCursorItemReader">
    <property name="sqlSessionFactory" ref="sqlSessionFactory" />
    <property name="queryId"
        value="com.taobao.accs.mass.petadata.dal.sqlmap.AccsDeviceInfoDAOMapper.selectByExampleForPetaData" />
</bean>

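When only one thread uses myMyBatisCursorItemReader to run a query, this is elegant and works without trouble. But when several threads call open and read on myMyBatisCursorItemReader, problems appear, because the class is not thread-safe. Here is the MyBatisCursorItemReader code: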

public class MyBatisCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements InitializingBean {

  private String queryId;

  private SqlSessionFactory sqlSessionFactory;
  private SqlSession sqlSession;

  private Map<String, Object> parameterValues;

  private Cursor<T> cursor;
  private Iterator<T> cursorIterator;
  
  ...
  @Override
  protected T doRead() throws Exception {
    T next = null;
    if (cursorIterator.hasNext()) {
      next = cursorIterator.next();
    }
    return next;
  }

  @Override
  protected void doOpen() throws Exception {
    Map<String, Object> parameters = new HashMap<String, Object>();
    if (parameterValues != null) {
      parameters.putAll(parameterValues);
    }

    sqlSession = sqlSessionFactory.openSession(ExecutorType.SIMPLE);
    cursor = sqlSession.selectCursor(queryId, parameters);
    cursorIterator = cursor.iterator();
  }

  @Override
  protected void doClose() throws Exception {
    cursor.close();
    sqlSession.close();
    cursorIterator = null;
  }

  ...
}
    

So the following fields are shared by every caller and are not thread-safe:
private SqlSession sqlSession;
private Map<String, Object> parameterValues;
private Cursor<T> cursor;
private Iterator<T> cursorIterator;

What if we turn them into ThreadLocal fields? That still leaves a problem. To see why, look at the parent class:


public abstract class AbstractItemCountingItemStreamItemReader<T> extends AbstractItemStreamItemReader<T> {

    private static final String READ_COUNT = "read.count";

    private static final String READ_COUNT_MAX = "read.count.max";

    private int currentItemCount = 0;

    private int maxItemCount = Integer.MAX_VALUE;

    private boolean saveState = true;

    
    protected void jumpToItem(int itemIndex) throws Exception {
        for (int i = 0; i < itemIndex; i++) {
            read();
        }
    }

    @Override
    public T read() throws Exception, UnexpectedInputException, ParseException {
        if (currentItemCount >= maxItemCount) {
            return null;
        }
        currentItemCount++;
        T item = doRead();
        if(item instanceof ItemCountAware) {
            ((ItemCountAware) item).setItemCount(currentItemCount);
        }
        return item;
    }

    protected int getCurrentItemCount() {
        return currentItemCount;
    }

    
    public void setCurrentItemCount(int count) {
        this.currentItemCount = count;
    }


    public void setMaxItemCount(int count) {
        this.maxItemCount = count;
    }

    @Override
    public void close() throws ItemStreamException {
        super.close();
        currentItemCount = 0;
        try {
            doClose();
        }
        catch (Exception e) {
            throw new ItemStreamException("Error while closing item reader", e);
        }
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        super.open(executionContext);
        try {
            doOpen();
        }
        catch (Exception e) {
            throw new ItemStreamException("Failed to initialize the reader", e);
        }
        if (!isSaveState()) {
            return;
        }

        if (executionContext.containsKey(getExecutionContextKey(READ_COUNT_MAX))) {
            maxItemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT_MAX));
        }

        int itemCount = 0;
        if (executionContext.containsKey(getExecutionContextKey(READ_COUNT))) {
            itemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT));
        }
        else if(currentItemCount > 0) {
            itemCount = currentItemCount;
        }

        if (itemCount > 0 && itemCount < maxItemCount) {
            try {
                jumpToItem(itemCount);
            }
            catch (Exception e) {
                throw new ItemStreamException("Could not move to stored position on restart", e);
            }
        }

        currentItemCount = itemCount;

    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        super.update(executionContext);
        if (saveState) {
            Assert.notNull(executionContext, "ExecutionContext must not be null");
            executionContext.putInt(getExecutionContextKey(READ_COUNT), currentItemCount);
            if (maxItemCount < Integer.MAX_VALUE) {
                executionContext.putInt(getExecutionContextKey(READ_COUNT_MAX), maxItemCount);
            }
        }

    }


    public void setSaveState(boolean saveState) {
        this.saveState = saveState;
    }

    /**
     * The flag that determines whether to save internal state for restarts.
     * @return true if the flag was set
     */
    public boolean isSaveState() {
        return saveState;
    }

}

    

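The parent class also holds currentItemCount, which is not thread-safe. On closer inspection, this parent class is of no use to us here: its job is to count the items read, cap how many are returned, and save that count so a restart can resume. If you do not need that, you can drop the parent class, and the reader can then be rewritten to be thread-safe: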

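import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.ibatis.cursor.Cursor;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;

public class MyBatisCursorItemReaderThreadSafe<T> implements InitializingBean {

    private String queryId;

    private SqlSessionFactory sqlSessionFactory;

    // Per-thread state: each calling thread gets its own session, parameters, cursor and iterator
    private final ThreadLocal<SqlSession> sqlSession = new ThreadLocal<SqlSession>();
    private final ThreadLocal<Map<String, Object>> parameterValues = new ThreadLocal<Map<String, Object>>();
    private final ThreadLocal<Cursor<T>> cursor = new ThreadLocal<Cursor<T>>();
    private final ThreadLocal<Iterator<T>> cursorIterator = new ThreadLocal<Iterator<T>>();

    public MyBatisCursorItemReaderThreadSafe() {
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(sqlSessionFactory, "A SqlSessionFactory is required.");
        Assert.notNull(queryId, "A queryId is required.");
    }

    public T doRead() throws Exception {
        Iterator<T> iterator = cursorIterator.get();
        if (iterator == null) {
            throw new IllegalStateException("doOpen() has not been called on this thread");
        }
        // Return null once the cursor is exhausted
        return iterator.hasNext() ? iterator.next() : null;
    }

    public void doOpen() throws Exception {
        Map<String, Object> parameters = new HashMap<String, Object>();
        // The ThreadLocal itself is never null; the value for this thread may be
        Map<String, Object> threadParameters = parameterValues.get();
        if (threadParameters != null) {
            parameters.putAll(threadParameters);
        }
        SqlSession sqlSessionTmp = sqlSessionFactory.openSession(ExecutorType.SIMPLE);
        try {
            Cursor<T> cursorTemp = sqlSessionTmp.selectCursor(queryId, parameters);
            Iterator<T> cursorIteratorTmp = cursorTemp.iterator();

            // Publish to the ThreadLocals only after everything has succeeded
            cursor.set(cursorTemp);
            cursorIterator.set(cursorIteratorTmp);
            this.sqlSession.set(sqlSessionTmp);
        } catch (Exception e) {
            // Do not leak the session if opening the cursor failed
            sqlSessionTmp.close();
            throw e;
        }
    }

    public void doClose() throws Exception {
        try {
            Cursor<T> cursorTemp = cursor.get();
            if (null != cursorTemp) {
                cursorTemp.close();
            }
        } finally {
            try {
                SqlSession sqlSessionTmp = sqlSession.get();
                if (null != sqlSessionTmp) {
                    sqlSessionTmp.close();
                }
            } finally {
                // remove() rather than set(null), so pooled threads keep no stale entries
                cursor.remove();
                cursorIterator.remove();
                sqlSession.remove();
                parameterValues.remove();
            }
        }
    }

    public void setParameterValues(Map<String, Object> parameterValues) {
        this.parameterValues.set(parameterValues);
    }

    public void setSqlSessionFactory(SqlSessionFactory sqlSessionFactory) {
        this.sqlSessionFactory = sqlSessionFactory;
    }

    public void setQueryId(String queryId) {
        this.queryId = queryId;
    }

}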
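
The idea behind the ThreadLocal-based reader can be checked in isolation. The following is a simplified sketch, not the MyBatis reader itself: PerThreadCursorDemo is a hypothetical class that keeps only an iterator per thread over a fixed in-memory list, and several tasks share a single instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PerThreadCursorDemo {

    // Fixed in-memory "table"; a real reader would open a database cursor instead
    private final List<String> rows = Arrays.asList("row-1", "row-2", "row-3");

    // Each thread that calls open() gets its own iterator
    private final ThreadLocal<Iterator<String>> cursorIterator = new ThreadLocal<>();

    public void open() {
        cursorIterator.set(rows.iterator());
    }

    public String read() {
        Iterator<String> iterator = cursorIterator.get();
        if (iterator == null) {
            throw new IllegalStateException("open() was not called on this thread");
        }
        return iterator.hasNext() ? iterator.next() : null;
    }

    public void close() {
        // remove() so a pooled thread does not carry a stale iterator into its next task
        cursorIterator.remove();
    }

    /** Runs 'tasks' concurrent readers against ONE shared instance; returns rows seen by each. */
    public static List<Integer> rowsSeenPerTask(int tasks) {
        PerThreadCursorDemo shared = new PerThreadCursorDemo();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        try {
            Callable<Integer> readAll = () -> {
                shared.open();
                try {
                    int seen = 0;
                    while (shared.read() != null) {
                        seen++;
                    }
                    return seen;
                } finally {
                    shared.close();
                }
            };
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(pool.submit(readAll));
            }
            List<Integer> counts = new ArrayList<>();
            for (Future<Integer> future : futures) {
                counts.add(future.get());
            }
            return counts;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(rowsSeenPerTask(4));
    }
}
```

Every task sees all three rows because no task ever touches another thread's iterator. The same discipline applies to the real reader: open, read and close must all happen on the same thread, and close must always run, otherwise a pooled thread keeps its session and cursor alive.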
    

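Of course, there is an even simpler approach: use the prototype pattern (prototype scope in Spring), so that every getBean call creates a new object.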
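
In Spring XML this would normally mean declaring the reader bean with scope="prototype". The plain-Java sketch below imitates the effect without Spring, under stated assumptions: PlainReader is a hypothetical reader with ordinary, non-thread-safe fields, and the Supplier plays the role of getBean, handing every task its own fresh instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class ReaderPerCallerDemo {

    /** Hypothetical reader with plain fields; safe only if one thread uses one instance. */
    public static final class PlainReader {
        private final List<String> rows = Arrays.asList("r1", "r2", "r3", "r4");
        private Iterator<String> cursorIterator;
        private int currentItemCount = 0;

        public void open() {
            cursorIterator = rows.iterator();
        }

        public String read() {
            if (!cursorIterator.hasNext()) {
                return null;
            }
            currentItemCount++;
            return cursorIterator.next();
        }

        public int getCurrentItemCount() {
            return currentItemCount;
        }
    }

    /** Each task asks the factory for its own reader, as getBean would for a prototype bean. */
    public static List<Integer> readWithFreshReaders(int tasks, Supplier<PlainReader> readerFactory) {
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        try {
            Callable<Integer> readAll = () -> {
                PlainReader reader = readerFactory.get(); // new instance, never shared
                reader.open();
                while (reader.read() != null) {
                    // consume every row
                }
                return reader.getCurrentItemCount();
            };
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(pool.submit(readAll));
            }
            List<Integer> counts = new ArrayList<>();
            for (Future<Integer> future : futures) {
                counts.add(future.get());
            }
            return counts;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(readWithFreshReaders(3, PlainReader::new));
    }
}
```

Because no instance is shared, the counter and the iterator need no ThreadLocal wrapping at all, which is why this route keeps the original reader and its parent class usable as they are.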
