DataX writer: batch submission
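DataX is an offline data synchronization tool/platform widely used within Alibaba Group. It provides efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, SqlServer, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), and DRDS.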
Optimization
The default `HbaseAbstractTask.startWriter` method (in the HBase 0.94 writer, which uses `Hbase094xHelper`) calls `put` once per record:
```java
public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) {
    Record record;
    try {
        while ((record = lineReceiver.getFromReader()) != null) {
            Put put;
            try {
                // Convert the DataX record into an HBase Put; conversion failures become dirty records
                put = convertRecordToPut(record);
            } catch (Exception e) {
                taskPluginCollector.collectDirtyRecord(record, e);
                continue;
            }
            try {
                // One put() call per record
                this.htable.put(put);
            } catch (IllegalArgumentException e) {
                // HBase rejects a Put without any columns; the null-safe equals avoids an NPE
                // if the exception has no message
                if ("No columns to insert".equals(e.getMessage()) && nullMode.equals(NullModeType.Skip)) {
                    LOG.info(String.format("record is empty, nullMode is configured as [skip], so this record will be ignored, record[%s]", record.toString()));
                } else {
                    taskPluginCollector.collectDirtyRecord(record, e);
                }
            }
        }
    } catch (IOException e) {
        throw DataXException.asDataXException(Hbase094xWriterErrorCode.PUT_HBASE_ERROR, e);
    } finally {
        Hbase094xHelper.closeTable(this.htable);
    }
}
```
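The HBase table API also accepts a whole list of `Put`s in a single `put(List<Put>)` call, so the code above can be modified as follows. Note that this version targets HBase 2.x: it writes through `asyncTable` and closes the connection with `Hbase20xHelper`.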
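```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// asyncTable and future are fields of the HBase 2.x writer task (not shown);
// asyncTable is assumed to be an org.apache.hadoop.hbase.client.AsyncTable.
private static final int BATCH_SIZE = 2000;          // submit every 2000 Puts...
private static final long FLUSH_INTERVAL_MS = 200L;  // ...or once 200 ms have passed since the last submit

public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) {
    Record record;
    List<Put> putList = new ArrayList<>(BATCH_SIZE);
    long begin = System.currentTimeMillis();
    try {
        while ((record = lineReceiver.getFromReader()) != null) {
            Put put;
            try {
                put = convertRecordToPut(record);
            } catch (Exception e) {
                taskPluginCollector.collectDirtyRecord(record, e);
                continue;
            }
            // A batched put fails as a whole, so reject empty Puts ("No columns to insert")
            // here, while we still know which record each Put came from.
            if (put.isEmpty()) {
                if (nullMode.equals(NullModeType.Skip)) {
                    LOG.info(String.format("record is empty, nullMode is configured as [skip], so this record will be ignored, record[%s]", record.toString()));
                } else {
                    taskPluginCollector.collectDirtyRecord(record, new IllegalArgumentException("No columns to insert"));
                }
                continue;
            }
            putList.add(put);
            // The time condition is only evaluated when a new record arrives.
            if (putList.size() >= BATCH_SIZE || System.currentTimeMillis() - begin > FLUSH_INTERVAL_MS) {
                flush(putList);
                begin = System.currentTimeMillis();
            }
        }
        // Submit the last partial batch; otherwise up to 1999 trailing records are lost.
        flush(putList);
    } finally {
        Hbase20xHelper.closeConn(future);
    }
}

private void flush(List<Put> putList) {
    if (putList.isEmpty()) {
        return;
    }
    // AsyncTable.put(List<Put>) returns one CompletableFuture per Put. Waiting for all of them
    // makes write errors fail the task instead of being silently dropped, and keeps the
    // connection from being closed while writes are still in flight.
    CompletableFuture.allOf(this.asyncTable.put(putList).toArray(new CompletableFuture[0])).join();
    putList.clear();
}
```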
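Records are now submitted in batches of 2000 (or as soon as more than 200 ms have passed since the last submission), which reduces the number of write requests.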
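To make the saving concrete, here is a small self-contained Java sketch. It does not talk to HBase: `CountingSink` is a hypothetical stand-in that treats every call as one request, which is an assumption for illustration, not a measurement of DataX or HBase. It also shows why the last partial batch has to be flushed after the loop.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchRequestDemo {

    /** Hypothetical stand-in for a table: counts calls ("requests") and rows received. */
    static final class CountingSink {
        int requests = 0;
        int rowsWritten = 0;

        void write(List<Integer> rows) {
            requests++;                  // assumption: one request per call, whatever the list size
            rowsWritten += rows.size();
        }
    }

    /** Writes `records` items in batches of `batchSize`; optionally flushes the final partial batch. */
    static CountingSink run(int records, int batchSize, boolean flushTail) {
        CountingSink sink = new CountingSink();
        List<Integer> buffer = new ArrayList<>(batchSize);
        for (int i = 0; i < records; i++) {
            buffer.add(i);
            if (buffer.size() >= batchSize) {
                sink.write(buffer);
                buffer.clear();
            }
        }
        if (flushTail && !buffer.isEmpty()) {
            sink.write(buffer);
            buffer.clear();
        }
        return sink;
    }

    public static void main(String[] args) {
        CountingSink perRecord = run(10_000, 1, true);
        CountingSink batched = run(10_000, 2000, true);
        CountingSink noTail = run(4_500, 2000, false);
        // prints "10000 5 4000"
        System.out.println(perRecord.requests + " " + batched.requests + " " + noTail.rowsWritten);
    }
}
```

With 10,000 records, one call per record gives 10,000 requests while batches of 2000 give 5; with 4,500 records and no final flush, only 4,000 rows reach the sink and the last 500 are silently lost.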
Summary
If the writer you use supports batch submission, you can apply the same modification.
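As a sketch of how the same pattern might be reused in another writer, the hypothetical `BatchingWriter` class below (not part of DataX) buffers items and passes them to a batch callback when a size or time threshold is reached, and once more on `close()`. The `sink` callback stands in for whatever batch API your writer exposes; the injectable clock is only there to make the time threshold testable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical helper: size- and time-triggered batching with a final flush on close(). */
public class BatchingWriter<T> implements AutoCloseable {
    private final int batchSize;
    private final long maxDelayMs;
    private final Consumer<List<T>> sink;   // e.g. a call to the writer's batch API
    private final LongSupplier clock;       // injectable for testing
    private final List<T> buffer;
    private long lastFlush;

    public BatchingWriter(int batchSize, long maxDelayMs, Consumer<List<T>> sink, LongSupplier clock) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be > 0");
        }
        this.batchSize = batchSize;
        this.maxDelayMs = maxDelayMs;
        this.sink = sink;
        this.clock = clock;
        this.buffer = new ArrayList<>(batchSize);
        this.lastFlush = clock.getAsLong();
    }

    public BatchingWriter(int batchSize, long maxDelayMs, Consumer<List<T>> sink) {
        this(batchSize, maxDelayMs, sink, System::currentTimeMillis);
    }

    /** Adds one item; the time threshold is only checked here, i.e. when a new item arrives. */
    public void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize || clock.getAsLong() - lastFlush > maxDelayMs) {
            flush();
        }
    }

    /** Hands a copy of the buffered items to the sink (so it may keep the list) and resets the timer. */
    public void flush() {
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
        lastFlush = clock.getAsLong();
    }

    /** Flushes the final partial batch; skipping this step silently drops the tail. */
    @Override
    public void close() {
        flush();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        // A frozen clock keeps the time threshold out of the way, so batching is purely size-based here.
        try (BatchingWriter<Integer> w = new BatchingWriter<>(2000, 200, b -> batchSizes.add(b.size()), () -> 0L)) {
            for (int i = 0; i < 4500; i++) {
                w.add(i);
            }
        }
        System.out.println(batchSizes); // prints "[2000, 2000, 500]"
    }
}
```

Using try-with-resources ties the final flush to the end of the task, which is the step most easily forgotten when converting a per-record loop. As in the HBase code above, the time threshold is only checked when a new item arrives, so a stalled reader does not trigger a flush on its own.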
Reposted from: https://juejin.cn/post/6844904088891719694