Originally published on the WeChat official account: 融媒体技术社

Article link: https://mp.weixin.qq.com/s?__biz=MzAwMzY3NDY1Ng==&mid=2647660979&idx=1&sn=d8266a96c07ea5061938192d0f1465c3&chksm=83133efcb464b7ea109f224908781675a613ee50f8abddffb89f72a2afb6667c76e92d76914d#rd

1 DataX Configuration

The data warehouse holds 77 million rows that need to be synchronized from Hive to the business MySQL database. DataX was chosen as the sync tool and DS as the task scheduling platform. In DS, drag in a DataX component, select the custom-template option, and paste the DataX JSON generated by our script into the JSON configuration field. The configuration is as follows (extra fields removed):

{
    "job": {
        "setting": {
            "speed": {
                "channel": "16",
                "batchSize": 2000
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "/bigdata/user/hive/warehouse/xxxx.db/abc/dt=2023-07-19",
                        "defaultFS": "hdfs://bigdata",
                        "hadoopConfig": {
                            "dfs.nameservices": "bigdata",
                            "dfs.ha.namenodes.bigdata": "nn1,nn2",
                            "dfs.namenode.rpc-address.bigdata.nn1": "hadoop001:8020",
                            "dfs.namenode.rpc-address.bigdata.nn2": "hadoop002:8020",
                            "dfs.client.failover.proxy.provider.bigdata": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                        },
                        "column": [
                            {
                                "index": 0,
                                "name": "cid",
                                "type": "string"
                            },
                            {
                                "index": 1,
                                "name": "company_name",
                                "type": "string"
                            },
                            ...
                        ],
                        "fileType": "orc",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\t"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
                            "cid",
                            "company_name",
                            ...
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://xxx:3306/xxxx_database",
                                "table": [
                                    "company_basic_info"
                                ]
                            }
                        ],
                        "password": "xxxx",
                        "username": "xxxx",
                        "writeMode": "replace"
                    }
                }
            }
        ]
    }
}

For writing to MySQL we use the mysqlwriter plugin. The official description reads: the MysqlWriter plugin writes data into the target table of a MySQL primary instance. Under the hood, MysqlWriter connects to the remote MySQL database via JDBC and executes the corresponding insert into ... or (replace into ...) SQL statements to write the data; internally it commits in batches, and the target tables must use the InnoDB engine.

job.setting.speed.channel=16 means 16 channels write data to MySQL concurrently;

writeMode=replace means the data is written to MySQL with replace into statements.

DataX describes writeMode as follows, where the replace into part describes replace mode:

MysqlWriter takes the protocol data produced by the Reader via the DataX framework and, depending on the configured writeMode, generates

  • insert into... (rows that hit a primary key / unique index conflict are not written)
or
  • replace into... (identical to insert into when there is no primary key / unique index conflict; on conflict, every column of the existing row is replaced by the new row) statements to write the data into MySQL. For performance it uses PreparedStatement + Batch and sets rewriteBatchedStatements=true, buffering records in a thread-local buffer and issuing a write request only once the buffer reaches a preset threshold.

In other words, DataX writes to MySQL in batches.
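A minimal standalone sketch of that write pattern in plain JDBC (illustrative only, not DataX's own code): rewriteBatchedStatements=true is appended to the JDBC URL so MySQL Connector/J can rewrite the batch into multi-value statements, and one batch is committed at a time. The URL, credentials and the two columns are placeholders borrowed from the job configuration above.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class BatchReplaceSketch {
    public static void main(String[] args) throws Exception {
        // placeholder URL/credentials; rewriteBatchedStatements lets Connector/J collapse the batch
        String url = "jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true";
        try (Connection conn = DriverManager.getConnection(url, "root", "root")) {
            conn.setAutoCommit(false);                       // manual commit, one commit per batch
            String sql = "REPLACE INTO company_basic_info (cid, company_name) VALUES (?, ?)";
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                for (int i = 0; i < 2000; i++) {             // one batch, matching batchSize = 2000
                    ps.setString(1, "cid-" + i);
                    ps.setString(2, "company-" + i);
                    ps.addBatch();
                }
                ps.executeBatch();                           // flushed to MySQL in one round trip
            }
            conn.commit();
        }
    }
}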

2 DataX Source Code Analysis

Open the DataX source code and locate the mysqlwriter module. It contains a single class, MysqlWriter, which has a static inner class Task that extends Writer.Task. The core code is:

public void startWrite(RecordReceiver recordReceiver) {
    this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig,
            super.getTaskPluginCollector());
}

This method is trivial: it just delegates to commonRdbmsWriterTask.startWrite. Step into the CommonRdbmsWriter.Task startWrite(recordReceiver, writerSliceConfig, taskPluginCollector) method:

// TODO switch to a connection pool to make sure every connection obtained is usable
// (note: the connection's session may need to be initialized each time)
public void startWrite(RecordReceiver recordReceiver,
                       Configuration writerSliceConfig,
                       TaskPluginCollector taskPluginCollector) {
    Connection connection = DBUtil.getConnection(this.dataBaseType,
            this.jdbcUrl, username, password);
    DBUtil.dealWithSessionConfig(connection, writerSliceConfig,
            this.dataBaseType, BASIC_MESSAGE);
    startWriteWithConnection(recordReceiver, taskPluginCollector, connection);
}

Following the call chain, the real work happens in this class's startWriteWithConnection(recordReceiver, taskPluginCollector, connection) method.

public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {
    ···
    while ((record = recordReceiver.getFromReader()) != null) {
        ...
        writeBuffer.add(record);
        bufferBytes += record.getMemorySize();

        if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {
            doBatchInsert(connection, writeBuffer);
            writeBuffer.clear();
            bufferBytes = 0;
        }
    }
    if (!writeBuffer.isEmpty()) {
        doBatchInsert(connection, writeBuffer);
        writeBuffer.clear();
        bufferBytes = 0;
    }
    ···
}

batchSize defaults to 2048 in MysqlWriter (public static final int DEFAULT_BATCH_SIZE = 2048;); we configured 2000. batchByteSize defaults to 32 * 1024 * 1024 (public static final int DEFAULT_BATCH_BYTE_SIZE = 32 * 1024 * 1024;). The code shows that as soon as the number of buffered records reaches batchSize, or the buffered bytes reach batchByteSize, the buffer is written to MySQL. Let's step into doBatchInsert(connection, writeBuffer).

protected void doBatchInsert(Connection connection, List<Record> buffer)
        throws SQLException {
    PreparedStatement preparedStatement = null;
    try {
        connection.setAutoCommit(false);
        preparedStatement = connection
                .prepareStatement(this.writeRecordSql);

        for (Record record : buffer) {
            preparedStatement = fillPreparedStatement(
                    preparedStatement, record);
            preparedStatement.addBatch();
        }
        preparedStatement.executeBatch();
        connection.commit();
    } catch (SQLException e) {
        LOG.warn("回滚此次写入, 采用每次写入一行方式提交. 因为:" + e.getMessage());
        connection.rollback();
        doOneInsert(connection, buffer);
    } catch (Exception e) {
        throw DataXException.asDataXException(
                DBUtilErrorCode.WRITE_DATA_ERROR, e);
    } finally {
        DBUtil.closeDBResources(preparedStatement, null);
    }
}

InnoDB automatically takes exclusive locks on the rows touched by update, delete, insert and replace statements. Here connection.setAutoCommit(false) switches the transaction to manual commit, and commit is only called after a whole batch has been written, so the locks are held for the duration of the batch. The deadlock appears in exactly this piece of logic.

Once a deadlock occurs, DataX falls back to doOneInsert(connection, buffer);, which inserts the records one by one: connection.setAutoCommit(true); switches back to autocommit, so each single insert is committed immediately.

protected void doOneInsert(Connection connection, List<Record> buffer) {
    PreparedStatement preparedStatement = null;
    try {
        connection.setAutoCommit(true);
        preparedStatement = connection
                .prepareStatement(this.writeRecordSql);

        for (Record record : buffer) {
            try {
                preparedStatement = fillPreparedStatement(
                        preparedStatement, record);
                preparedStatement.execute();
            } catch (SQLException e) {
                LOG.debug(e.toString());

                this.taskPluginCollector.collectDirtyRecord(record, e);
            } finally {
                // don't forget to clean up the preparedStatement at the end
                preparedStatement.clearParameters();
            }
        }
    } catch (Exception e) {
        throw DataXException.asDataXException(
                DBUtilErrorCode.WRITE_DATA_ERROR, e);
    } finally {
        DBUtil.closeDBResources(preparedStatement, null);
    }
}

In the log you will see:

include: null, offset: 0, length: 9223372036854775807}
2023-08-22 14:52:07.226 [job-0] INFO StandAloneJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 0.00%
2023-08-22 14:52:13.262 [0-0-18-writer] WARN CommonRdbmsWriter$Task - 回滚此次写入, 采用每次写入一行方式提交. 因为:Deadlock found when trying to get lock; try restarting transaction
2023-08-22 14:52:13.552 [0-2-16-writer] WARN CommonRdbmsWriter$Task - 回滚此次写入, 采用每次写入一行方式提交. 因为:Deadlock found when trying to get lock; try restarting transaction
2023-08-22 14:52:13.552 [0-0-0-writer] WARN CommonRdbmsWriter$Task - 回滚此次写入, 采用每次写入一行方式提交. 因为:Deadlock found when trying to get lock; try restarting transaction
2023-08-22 14:52:13.552 [0-1-8-writer] WARN CommonRdbmsWriter$Task - 回滚此次写入, 采用每次写入一行方式提交. 因为:Deadlock found when trying to get lock; try restarting transaction
2023-08-22 14:52:13.555 [0-3-4-writer] WARN CommonRdbmsWriter$Task - 回滚此次写入, 采用每次写入一行方式提交. 因为:Deadlock found when trying to get lock; try restarting transaction

The deadlock log:

{"byteSize":3,"index":31,"rawData":"0.0","type":"STRING"},{"byteSize":1,"index":32,"rawData":"0","type":"STRING"}],"type":"writer"}
2023-08-22 11:52:22.028 [0-1-9-writer] ERROR StdoutPluginCollector - 脏数据:
{"exception":"Deadlock found when trying to get lock; try restarting transaction","record":[{"byteSize":5,"index":0,"rawData":"53670","type":"STRING"},{"byteSize":10,"index":1,"rawData":"贵州众薪劳务有限公司","type":"STRING"},{"byteSize":4,"index":2,"rawData":"小微企业","type":"STRING"},{"byteSize":0,"index":3,"rawData":"","type":"STRING"},{"byteSize":3,"index":4,"rawData":"未上市","type":"STRING"},{"byteSize":0,"index":5,"rawData":"","type":"STRING"},{"byteSize":3,"index":6,"rawData":"蒋*晨","type":"STRING"},{"byteSize":10,"index":7,"rawData":"2020-10-23","type":"STRING"},{"byteSize":11,"index":8,"rawData":"13638022823","type":"STRING"},{"byteSize":0,"index":9,"type":"STRING"},{"byteSize":0,"index":10,"type":"STRING"},{"byteSize":1,"index":11,"rawData":"0","type":"STRING"},{"byteSize":2,"index":12,"rawData":"存续","type":"STRING"},{"byteSize":0,"index":13,"type":"STRING"},{"byteSize":13,"index":14,"rawData":"建筑装饰、装修和其他建筑业","type":"STRING"},{"byteSize":41,"index":15,"rawData":"贵州省黔南州瓮安县雍阳街道办事处城北社区大地商业广场9栋1层11、12、13号商铺","type":"STRING"},{"byteSize":1,"index":16,"rawData":"0","type":"STRING"},{"byteSize":3,"index":17,"rawData":"0.0","type":"STRING"},{"byteSize":3,"index":18,"rawData":"0.0","type":"STRING"},{"byteSize":3,"index":19,"rawData":"0.0","type":"STRING"},{"byteSize":3,"index":20,"rawData":"0.0","type":"STRING"},{"byteSize":3,"index":21,"rawData":"0.0","type":"STRING"},{"byteSize":1,"index":22,"rawData":"0","type":"STRING"},{"byteSize":3,"index":23,"rawData":"0.0","type":"STRING"},{"byteSize":1,"index":24,"rawData":"0","type":"STRING"},{"byteSize":3,"index":25,"rawData":"0.0","type":"STRING"},{"byteSize":1,"index":26,"rawData":"0","type":"STRING"},{"byteSize":3,"index":27,"rawData":"0.0","type":"STRING"},{"byteSize":1,"index":28,"rawData":"0","type":"STRING"},{"byteSize":3,"index":29,"rawData":"0.0","type":"STRING"},{"byteSize":1,"index":30,"rawData":"0","type":"STRING"},{"byteSize":3,"index":31,"rawData":"0.0","type":"STRING"},{"byteSize":1,"index":32,"rawData":"0","type":"STRING"}],"type":"writer"}

3 Deadlock Analysis

Let's walk through a simple deadlock first. Suppose there is a table mock_table with a unique index on id, so the locks taken on this table are row locks; a runnable two-session sketch follows the table below.

session 1                                               session 2
set autocommit = 0;                                     set autocommit = 0;
replace into mock_table (id) values (1);                replace into mock_table (id) values (2);
replace into mock_table (id) values (2);  -- blocked    replace into mock_table (id) values (1);  -- Deadlock error
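To make the interleaving above concrete, here is a minimal reproduction sketch in Java/JDBC. It assumes MySQL Connector/J is on the classpath and a local test database containing CREATE TABLE mock_table (id INT PRIMARY KEY, val VARCHAR(32)) ENGINE=InnoDB; the URL, credentials and column names are placeholders, not taken from the original job.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.concurrent.CyclicBarrier;

public class ReplaceDeadlockDemo {

    private static final String URL = "jdbc:mysql://localhost:3306/test";   // placeholder
    private static final CyclicBarrier BARRIER = new CyclicBarrier(2);

    public static void main(String[] args) {
        new Thread(() -> run(1, 2), "session-1").start();
        new Thread(() -> run(2, 1), "session-2").start();
    }

    private static void run(int first, int second) {
        try (Connection conn = DriverManager.getConnection(URL, "root", "root");
             Statement st = conn.createStatement()) {
            conn.setAutoCommit(false);                      // manual commit, like doBatchInsert
            st.execute("REPLACE INTO mock_table (id, val) VALUES (" + first + ", 'x')");
            BARRIER.await();                                // both sessions now hold one row lock
            st.execute("REPLACE INTO mock_table (id, val) VALUES (" + second + ", 'x')");
            conn.commit();
            System.out.println(Thread.currentThread().getName() + " committed");
        } catch (Exception e) {
            // one of the two sessions is expected to fail with:
            // "Deadlock found when trying to get lock; try restarting transaction"
            System.out.println(Thread.currentThread().getName() + " failed: " + e.getMessage());
        }
    }
}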

Back to our scenario: if the source data contains many duplicates arriving in a shuffled order, multi-channel replace mode can very easily end up in a deadlock.

But back to the actual problem: the unique index on the MySQL table is on cid, and cid has no duplicates in Hive. So how did this happen? Let's go back to the code that splits the data.

In DataX's core module, open com/alibaba/datax/core/job/JobContainer.java. In adjustChannelNumber() we can see that needChannelNumber is taken from the configuration key public static final String DATAX_JOB_SETTING_SPEED_CHANNEL = "job.setting.speed.channel";, i.e. the job.setting.speed.channel=16 setting in the JSON configuration at the beginning of this article.

private void adjustChannelNumber() {
    ...
    boolean isChannelLimit = (this.configuration.getInt(
            CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
    if (isChannelLimit) {
        this.needChannelNumber = this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL);

        LOG.info("Job set Channel-Number to " + this.needChannelNumber
                + " channels.");
        return;
    }
    ...
}

In JobContainer, find the split() method and you will see doReaderSplit(needChannelNumber); the argument is the channel value configured above.

/**
 * Performs the finest-grained split of reader and writer. Note that the writer's split
 * must follow the reader's split so that both sides end up with the same number of tasks
 * and satisfy the 1:1 channel model; the reader and writer configs are therefore merged
 * together, and the merged result is shuffled to avoid long-tail effects caused by ordering.
 */
private int split() {
    this.adjustChannelNumber();

    if (this.needChannelNumber <= 0) {
        this.needChannelNumber = 1;
    }

    List<Configuration> readerTaskConfigs = this
            .doReaderSplit(this.needChannelNumber);
    int taskNumber = readerTaskConfigs.size();
    List<Configuration> writerTaskConfigs = this.doWriterSplit(taskNumber);
    List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);

    LOG.debug("transformer configuration: " + JSON.toJSONString(transformerList));
    /**
     * Input: the reader and writer parameter lists; output: the list of elements under content.
     */
    List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs(
            readerTaskConfigs, writerTaskConfigs, transformerList);
    LOG.debug("contentConfig configuration: " + JSON.toJSONString(contentConfig));

    this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig);

    return contentConfig.size();
}

From the method above we can see doReaderSplit and doWriterSplit: the former decides how the read side is split, the latter how the write tasks are split. Note, however, that the number of tasks is not simply whatever channel value we configured; it is determined by the result returned by doReaderSplit.

// TODO: what if the source is empty
private List<Configuration> doReaderSplit(int adviceNumber) {
    classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
            PluginType.READER, this.readerPluginName));
    List<Configuration> readerSlicesConfigs =
            this.jobReader.split(adviceNumber);
    if (readerSlicesConfigs == null || readerSlicesConfigs.size() <= 0) {
        throw DataXException.asDataXException(
                FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
                "reader切分的task数目不能小于等于0");
    }
    LOG.info("DataX Reader.Job [{}] splits to [{}] tasks.",
            this.readerPluginName, readerSlicesConfigs.size());
    classLoaderSwapper.restoreCurrentThreadClassLoader();
    return readerSlicesConfigs;
}
private List<Configuration> doWriterSplit(int readerTaskNumber) {
    // load the writer plugin
    classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
            PluginType.WRITER, this.writerPluginName));

    // the writer tasks are split here
    List<Configuration> writerSlicesConfigs = this.jobWriter
            .split(readerTaskNumber);
    if (writerSlicesConfigs == null || writerSlicesConfigs.size() <= 0) {
        throw DataXException.asDataXException(
                FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
                "writer切分的task不能小于等于0");
    }
    LOG.info("DataX Writer.Job [{}] splits to [{}] tasks.",
            this.writerPluginName, writerSlicesConfigs.size());
    classLoaderSwapper.restoreCurrentThreadClassLoader();

    return writerSlicesConfigs;
}

Let's first look at what doReaderSplit actually calls. We read from HDFS with the hdfsreader plugin, so jump to the split method of com/alibaba/datax/plugin/reader/hdfsreader/HdfsReader.java in the hdfsreader module:

@Override
public List<Configuration> split(int adviceNumber) {

    LOG.info("split() begin...");
    List<Configuration> readerSplitConfigs = new ArrayList<Configuration>();
    // warn: each slice pulls one and only one file
    // int splitNumber = adviceNumber;
    int splitNumber = this.sourceFiles.size();
    if (0 == splitNumber) {
        throw DataXException.asDataXException(HdfsReaderErrorCode.EMPTY_DIR_EXCEPTION,
                String.format("未能找到待读取的文件,请确认您的配置项path: %s", this.readerOriginConfig.getString(Key.PATH)));
    }

    List<List<String>> splitedSourceFiles = this.splitSourceFiles(new ArrayList<String>(this.sourceFiles), splitNumber);
    for (List<String> files : splitedSourceFiles) {
        Configuration splitedConfig = this.readerOriginConfig.clone();
        splitedConfig.set(Constant.SOURCE_FILES, files);
        readerSplitConfigs.add(splitedConfig);
    }

    return readerSplitConfigs;
}

HdfsReader.split(adviceNumber) is fairly simple: it takes the number of files under the HDFS path and stores the file paths in readerSplitConfigs. splitSourceFiles looks like it does something elaborate, but notice its arguments: the first is the list of HDFS files and the second is that list's size. So readerSplitConfigs.size() equals splitNumber, i.e. the number of files under the configured HDFS path.
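As a rough illustration of that behaviour (this mirrors the effect described above, not the actual splitSourceFiles implementation): when the requested split count equals the number of files, a simple round-robin partition puts exactly one file into each slice.

import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    // round-robin partition of items into splitNumber buckets
    static <T> List<List<T>> splitRoundRobin(List<T> items, int splitNumber) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < splitNumber; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < items.size(); i++) {
            result.get(i % splitNumber).add(items.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> files = List.of("part-00000", "part-00001", "part-00002");
        // splitNumber == files.size()  =>  [[part-00000], [part-00001], [part-00002]]
        System.out.println(splitRoundRobin(files, files.size()));
    }
}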

For the writer we use the mysqlwriter plugin, so we are back in the MysqlWriter class:

@Override
public List<Configuration> split(int mandatoryNumber) {
    return this.commonRdbmsWriterJob.split(this.originalConfig, mandatoryNumber);
}

Following the call into commonRdbmsWriterJob.split(this.originalConfig, mandatoryNumber) to see the implementation:

public List<Configuration> split(Configuration originalConfig,
                                 int mandatoryNumber) {
    return WriterUtil.doSplit(originalConfig, mandatoryNumber);
}

One more layer down is WriterUtil.doSplit(originalConfig, mandatoryNumber). We only have one table, so we only need to look at the tableNumber == 1 branch:

// TODO split error reporting
public static List<Configuration> doSplit(Configuration simplifiedConf,
                                          int adviceNumber) {

    List<Configuration> splitResultConfigs = new ArrayList<Configuration>();

    int tableNumber = simplifiedConf.getInt(Constant.TABLE_NUMBER_MARK);

    // handle the single-table case
    if (tableNumber == 1) {
        // table and jdbcUrl were already extracted in the earlier master prepare step,
        // so the handling here is very simple
        for (int j = 0; j < adviceNumber; j++) {
            splitResultConfigs.add(simplifiedConf.clone());
        }

        return splitResultConfigs;
    }
    ...
}

Nothing in these methods looks particularly mysterious. Going back to the JobContainer.split() method: after doReaderSplit and doWriterSplit it runs List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs(readerTaskConfigs, writerTaskConfigs, transformerList); to merge the reader configurations with the writer configurations.
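For intuition, the 1:1 pairing can be pictured with the deliberately simplified sketch below (plain strings instead of Configuration objects, transformer list omitted); it only illustrates the pairing, not the real mergeReaderAndWriterTaskConfigs.

import java.util.ArrayList;
import java.util.List;

public class MergeSketch {
    // pair the i-th reader task config with the i-th writer task config into one "content" entry
    static List<String> merge(List<String> readers, List<String> writers) {
        List<String> content = new ArrayList<>();
        for (int i = 0; i < readers.size(); i++) {
            content.add("{reader: " + readers.get(i) + ", writer: " + writers.get(i) + "}");
        }
        return content;
    }

    public static void main(String[] args) {
        List<String> readers = List.of("reader-task-0", "reader-task-1", "reader-task-2");
        List<String> writers = List.of("writer-task-0", "writer-task-1", "writer-task-2");
        System.out.println(merge(readers, writers));   // three content entries, one per task
    }
}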

The actual reading and writing is driven from the JobContainer.schedule() method:

/**
 * The first job of schedule() is to assign the reader/writer split results from the
 * previous step to concrete taskGroupContainers; it then invokes the scheduling strategy
 * that matches the execution mode and schedules all tasks.
 */
private void schedule() {
    /**
     * The global speed and the per-channel speed here are configured in B/s.
     */
    int channelsPerTaskGroup = this.configuration.getInt(
            CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
    int taskNumber = this.configuration.getList(
            CoreConstant.DATAX_JOB_CONTENT).size();
    // note: this takes the minimum of the configured channel count and the actual task count
    this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
    /**
     * Work out, from the configuration, which tasks each taskGroup needs to run.
     */
    List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
            this.needChannelNumber, channelsPerTaskGroup);

    LOG.info("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size());

    AbstractScheduler scheduler;
    try {
        ...
        scheduler = initStandaloneScheduler(this.configuration);
        ...
        scheduler.schedule(taskGroupConfigs);

    } catch (Exception e) {

    }
    ...
}

Reading schedule() closely, the fair task assignment in JobAssignUtil.assignFairly shows no logic problem either; splitting the tasks never changes which HDFS partition files get read. Following this logic, if my partition directory contains 12 files, there will be 12 tasks, so taskNumber is 12 and the writer also gets 12 tasks. From this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber); we know needChannelNumber also becomes 12 (because 12 < 16). With 5 channels per task group, 12 channels yield 3 task groups (12 / 5 rounded up), so the same file should never be read twice.
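That arithmetic, spelled out as a tiny sketch with the assumed values from the example above (12 files); the ceiling division mirrors the reasoning here rather than the exact assignFairly implementation.

public class ChannelMathSketch {
    public static void main(String[] args) {
        int configuredChannel = 16;        // job.setting.speed.channel
        int taskNumber = 12;               // one task per HDFS file (assuming 12 files)
        int channelsPerTaskGroup = 5;      // core.container.taskGroup.channel default
        int needChannelNumber = Math.min(configuredChannel, taskNumber);                              // 12
        int taskGroupNumber = (needChannelNumber + channelsPerTaskGroup - 1) / channelsPerTaskGroup;  // 3
        System.out.println(needChannelNumber + " channels -> " + taskGroupNumber + " task groups");
    }
}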

At this point the root cause still had not been found… so it was back to the data: a deduplication-check SQL was run, but the result set was empty, so there are no duplicate rows.

select count(cid) from 
dwt_creditrisk_xy_report_v3_1.company_basic_info
where dayno='2023-07-19'
group by cid
having count(cid) > 1

4 Summary

When using DataX to synchronize data into MySQL, keep the following in mind:

  • Pay attention to the target table's unique indexes. The scenario I can think of for replace mode is source data that does contain duplicates but still has to be synchronized; in that case it is best to set channel to 1 and raise batchSize to keep throughput up;
  • Start from the original requirement and guarantee uniqueness at the source, or change how the unique index is used and deduplicate the data once more in the service layer.