DataX同步hive数据到mysql时死锁问题初步排查
原文已发布在微信公众号:融媒体技术社
1 DataX配置说明
数据仓库中有7700万的数据需要从hive同步到业务的MySql库,选择DataX作为同步工具,DS作为任务调度平台。在DS中拖拽datax的组件,选择自定义模板,并将脚本生成的datax的json配置粘贴到JSON配置栏中,配置内容如下(已删掉额外信息):
1 | { |
我们选择写mysql用mysqlwriter插件,看看官方如何描述:MysqlWriter 插件实现了写入数据到 Mysql 主库的目的表的功能。在底层实现上, MysqlWriter 通过 JDBC 连接远程 Mysql 数据库,并执行相应的 insert into … 或者 ( replace into …) 的 sql 语句将数据写入 Mysql,内部会分批次提交入库,需要数据库本身采用 innodb 引擎。
job.setting.speed.channel=16
: 表示有16个通道同时写数据至MySQL;
writeMode=replace
: 表示MySQL以replace into的方式写数据。
DataX对于writeMod
有这样一段描述,其中replace into
是对replace
模式的描述:
MysqlWriter 通过 DataX 框架获取 Reader 生成的协议数据,根据你配置的
writeMode
生成
insert into...
(当主键/唯一性索引冲突时会写不进去冲突的行)或者
replace into...
(没有遇到主键/唯一性索引冲突时,与 insert into 行为一致,冲突时会用新行替换原有行所有字段) 的语句写入数据到 Mysql。出于性能考虑,采用了PreparedStatement + Batch
,并且设置了:rewriteBatchedStatements=true
,将数据缓冲到线程上下文 Buffer 中,当 Buffer 累计到预定阈值时,才发起写入请求。
这说明DataX使用了批量写的方式来写入MySQL。
2 DataX 相关代码解析
打开datax的源码,我们找到mysqlwriter模块,里面只有一个类文件:MysqlWriter
,该类中有一个内部静态类Task
继承了Writer.Task
,找到核心代码:
1 | public void startWrite(RecordReceiver recordReceiver) { |
这个方法很简单,直接调用了commonRdbmsWriterTask
的startWrite方法。进入到这里CommonRdbmsWriter.Task
的startWrite(recordReceiver,writerSliceConfig,taskPluginCollector)
方法:
1 | // TODO 改用连接池,确保每次获取的连接都是可用的(注意:连接可能需要每次都初始化其 session) |
继续跟进,可以看到是当前类的startWriteWithConnection(recordReceiver, taskPluginCollector, connection);
方法中。
1 | public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) { |
batchSize
为2048(public static final int DEFAULT_BATCH_SIZE = 2048;
),是MysqlWriter
的默认配置,我们配置了2000;batchByteSize
的默认值为32 * 1024 * 1024(public static final int DEFAULT_BATCH_BYTE_SIZE = 32 * 1024 * 1024;
)。根据代码我们知道,如果内存中的数据条数大于等于我们设置的batchSize大小或者内存中的字节码大小大于等于batchByteSize大小的时候,就会开始写数据到mysql。进入doBatchInsert(connection, writeBuffer)
方法看一下。
1 | protected void doBatchInsert(Connection connection, List<Record> buffer) |
由于InnoDB的update、delete、insert、replace都会自动给涉及到的数据加上排他锁,此处的connection.setAutoCommit(false)
将事务设置为手动提交,每次批量写完数据后再手动commit。死锁就出现在这块儿逻辑。
死锁出现后,datax开始执行doOneInsert(connection, buffer);
,会一条一条的执行插入数据,connection.setAutoCommit(true);
改为自动提交,每插入一条数据自动提交一次。
1 | protected void doOneInsert(Connection connection, List<Record> buffer) { |
日志中会看到:
1 | include: null, offset: 0, length: 9223372036854775807} |
死锁日志:
1 | {"byteSize":3,"index":31,"rawData":"0.0","type":"STRING"},{"byteSize":1,"index":32,"rawData":"0","type":"STRING"}],"type":"writer"} |
3 死锁解析
简单来看一个死锁过程,假设有mock_table这样一张表,id为唯一索引,此时这张表的锁为行锁。
session1 | session2 |
---|---|
set autocommit=0; | set autocommit=0; |
replace into mock_table where id = 1; | replace into mocl_table where id = 2; |
replace into mock_table where id = 2; 此处会被阻塞 | replace into mock_table where id = 1; 出现Deadlock报错 |
回到我们的场景,如果数据源的重复数据很多,并且顺序混乱,多channel
的replace
模式极有可能出现死锁。
但是,回到问题,mysql表中通过cid创建的唯一索引,cid在hive中没有重复数据。那问题是怎么发生的呢?我们回到数据分割的代码逻辑中。
在datax的core模块中,找到com/alibaba/datax/core/job/JobContainer.java
这个类,通过adjustChannelNumber()
方法,我们可以看到needChannelNumber
设置为public static final String DATAX_JOB_SETTING_SPEED_CHANNEL = "job.setting.speed.channel";
对应的配置信息,也就是文章开头的json配置文件中:job.setting.speed.channel=16
这个配置。
1 | private void adjustChannelNumber() { |
在JobContainer中,我们找到split()
方法,就能看到doReaderSplit(needChannelNumber)
, 这个参数就是上面配置的channel参数。
1 | /** |
我们从上面方法中看到doReaderSplit和doWriterSplit方法,doReaderSplit是读取阶段如何划分,doWriterSplit是写入阶段如何划分任务。但我们需要注意,并不是说我们配置了channel多少,就会划分多少个任务,而是根据doReader返回的结果划分任务。
1 | // TODO: 如果源头就是空数据 |
1 | private List<Configuration> doWriterSplit(int readerTaskNumber) { |
我们先看看doReaderSplit实际调用的方法。我们读取的是hdfs,使用的插件是hdfsreader。我们跳转到hdfsreader模块的com/alibaba/datax/plugin/reader/hdfsreader/HdfsReader.java
类的split方法:
1 |
|
HdfsReader.split(adviceNumber)
方法相对简单,就是读取hdfs文件的个数,并将文件路径保存在readerSplitConfigs中。虽然splitSourceFiles
做了一些复杂的操作,但是我们发现splitSourceFiles的传入参数,第一个是hdfs的文件列表,第二个是列表数。所以,readerSplitConfigs.size()的数量就是splitNumber的值,也就是hdfs指定路径下的文件个数。
writer插件我们使用的是mysqlwriter插件,因此我们又回到MysqlWriter
类:
1 |
|
继续跟进,在commonRdbmsWriterJob.split(this.originalConfig, mandatoryNumber);
查看实现细节:
1 | public List<Configuration> split(Configuration originalConfig, |
层层嵌套,我们继续看WriterUtil.doSplit(originalConfig, mandatoryNumber);
方法,我们只有一个表,所以,只看tableNumber ==1的逻辑:
1 | //TODO 切分报错 |
从这些方法中,我们没发现什么比较深奥的问题,我们回过头看看JobContainer.split(int adviceNumber)
方法,在做完doReaderSplit和doWriterSplit方法后,做了List<Configuration> contentConfig =mergeReaderAndWriterTaskConfigs(readerTaskConfigs, writerTaskConfigs, transformerList);
操作,将reader的配置和writer的配置信息做了合并。
而实际执行的数据读取和写入的逻辑是在JobContainer.schedule
方法中:
1 | /** |
深入研读schedule的方法,里面的公平分派任务JobAssignUtil.assignFairly
的方法也没有任何逻辑问题,划分任务时,并没有修改读取hdfs分区文件的问题。按照这个逻辑,假如我的分区下有12个文件,那么就会有12个任务,taskNumber的值为12,对应的writer也有12个任务,通过this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
我们可以知道needChannelNumber
的值也为12了(因为12<16)。12/5=3,也就是3个任务组,应该不会出现重复读取同一文件的情况。
至此,问题依然没有找到…只能回到数据上,执行统计SQL,但结果为空,不存在重复数据。
1 | select count(cid) from |
4 总结
使用DataX同步数据到MySQL需要注意如下几点:
- 注意目标表的唯一索引,如果使用
replace
模式,我能想到的场景可能是数据有重复,但又想正常同步。这种情况channel
最好设置为1,然后通过增加batchSize
来提高吞吐量; - 从原始需求出发,保证数据的唯一性。或者改变唯一索引的用法,在service层再对数据做一次去重处理。