其他
如何快速安全地插入千万条数据?
思路
1.估算文件大小
2.如何批量插入
3.数据的完整性
4.数据库是否支持批次数据
5.中途出错的情况
实现
1.准备数据表
-- Bookkeeping table: one row per imported file. `position` stores the offset at which
-- the previous processing run finished; `status` tracks the import state of the file.
CREATE TABLE `file_analysis` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `file_type` varchar(255) NOT NULL COMMENT '文件类型 01:类型1,02:类型2',
    `file_name` varchar(255) NOT NULL COMMENT '文件名称',
    `file_path` varchar(255) NOT NULL COMMENT '文件路径',
    `status` varchar(255) NOT NULL COMMENT '文件状态 0初始化;1成功;2失败:3处理中',
    `position` bigint(20) NOT NULL COMMENT '上一次处理完成的位置',
    `crt_time` datetime NOT NULL COMMENT '创建时间',
    `upd_time` datetime NOT NULL COMMENT '更新时间',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
-- Target table for the bulk insert: one row per parsed line of the order file,
-- with the 18 comma-separated values of the line stored in field1..field18.
-- NOTE(review): `file_id` presumably references `file_analysis`.`id`; no foreign key
-- is declared here -- confirm against the application code.
CREATE TABLE `file_order` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `file_id` bigint(20) DEFAULT NULL,
    `field1` varchar(255) DEFAULT NULL,
    `field2` varchar(255) DEFAULT NULL,
    `field3` varchar(255) DEFAULT NULL,
    `field4` varchar(255) DEFAULT NULL,
    `field5` varchar(255) DEFAULT NULL,
    `field6` varchar(255) DEFAULT NULL,
    `field7` varchar(255) DEFAULT NULL,
    `field8` varchar(255) DEFAULT NULL,
    `field9` varchar(255) DEFAULT NULL,
    `field10` varchar(255) DEFAULT NULL,
    `field11` varchar(255) DEFAULT NULL,
    `field12` varchar(255) DEFAULT NULL,
    `field13` varchar(255) DEFAULT NULL,
    `field14` varchar(255) DEFAULT NULL,
    `field15` varchar(255) DEFAULT NULL,
    `field16` varchar(255) DEFAULT NULL,
    `field17` varchar(255) DEFAULT NULL,
    `field18` varchar(255) DEFAULT NULL,
    `crt_time` datetime NOT NULL COMMENT '创建时间',
    `upd_time` datetime NOT NULL COMMENT '更新时间',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10000024 DEFAULT CHARSET=utf8;
2.配置数据库包大小
show VARIABLES like '%max_allowed_packet%';
+--------------------------+------------+
| Variable_name | Value |
+--------------------------+------------+
| max_allowed_packet | 1048576 |
| slave_max_allowed_packet | 1073741824 |
+--------------------------+------------+
2 rows in set
set global max_allowed_packet = 1024*1024*10;
Query OK, 0 rows affected
Caused by: com.mysql.jdbc.PacketTooBigException: Packet for query is too large (4980577 > 1048576). You can change this value on the server by setting the max_allowed_packet' variable.
at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3915)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2598)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2778)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2834)
3.准备测试数据
/**
 * Generates the test data file: 10,000,000 identical lines, each holding 18
 * comma-separated values, terminated by the platform line separator.
 *
 * @param args unused
 * @throws IOException if the file cannot be created or a write fails
 */
public static void main(String[] args) throws IOException {
    // Look the separator up once; it cannot change between iterations.
    String lineSeparator = System.getProperty("line.separator");
    // try-with-resources flushes and closes the writer even when a write fails part-way,
    // so the file handle is never leaked.
    try (FileWriter out = new FileWriter(new File("D://xxxxxxx//orders.txt"))) {
        for (int i = 0; i < 10000000; i++) {
            out.write(
                    "vaule1,vaule2,vaule3,vaule4,vaule5,vaule6,vaule7,vaule8,vaule9,vaule10,vaule11,vaule12,vaule13,vaule14,vaule15,vaule16,vaule17,vaule18");
            out.write(lineSeparator);
        }
    }
}
4.截取数据的完整性
// Determines, for each of the `count` batches, an end position that falls on a line
// break, so that a batch never ends in the middle of a record.
ByteBuffer byteBuffer = ByteBuffer.allocate(buffSize); // allocate a scratch buffer of buffSize bytes
// Candidate end of the current batch: one buffer length before the nominal batch boundary.
long endPosition = batchFileSize + startPosition - buffSize;// end position of the sub-file (batch)
long startTime, endTime;
for (int i = 0; i < count; i++) {
startTime = System.currentTimeMillis();
if (i + 1 != count) {
// Not the last batch: scan forward from the candidate position for a line break.
int read = inputChannel.read(byteBuffer, endPosition);// read data starting at endPosition
readW: while (read != -1) {
byteBuffer.flip();// switch the buffer to read mode
byte[] array = byteBuffer.array();
// NOTE(review): this scans the whole backing array, not only the `read` bytes that
// were just filled in; after a short read the tail may hold bytes from an earlier
// pass -- confirm this cannot produce a false match.
for (int j = 0; j < array.length; j++) {
byte b = array[j];
if (b == 10 || b == 13) { // line feed (10) or carriage return (13)
// Line break found: move the end position onto it and leave the labelled loop.
endPosition += j;
break readW;
}
}
// No line break in this buffer: advance by one buffer length and read again.
endPosition += buffSize;
byteBuffer.clear(); // reset the buffer's position/limit for the next read
read = inputChannel.read(byteBuffer, endPosition);
}
} else {
endPosition = fileSize; // the last batch simply runs to the end of the file
}
...省略,更多可以查看Github完整代码...
}
5.批次插入数据
通过insert(...)values(...),(...)的方式批次插入数据,部分代码如下:
// Save the order batch and the parsed position in a single transaction, so the
// stored position and the stored rows are committed (or rolled back) together.
SqlSession session = sqlSessionFactory.openSession();
try {
    long startTime = System.currentTimeMillis();
    FielAnalysisMapper fielAnalysisMapper = session.getMapper(FielAnalysisMapper.class);
    FileOrderMapper fileOrderMapper = session.getMapper(FileOrderMapper.class);
    fileOrderMapper.batchInsert(orderList);
    // Record the position reached by this batch, together with the update time.
    // NOTE(review): the +1 presumably steps past the line-break byte at endPosition -- confirm.
    fileAnalysis.setPosition(endPosition + 1);
    // "3" is the "processing" status in the file_analysis.status column comment.
    fileAnalysis.setStatus("3");
    fileAnalysis.setUpdTime(new Date());
    fielAnalysisMapper.updateFileAnalysis(fileAnalysis);
    session.commit();
    long endTime = System.currentTimeMillis();
    System.out.println("===插入数据花费:" + (endTime - startTime) + "ms===");
} catch (Exception e) {
    session.rollback();
    // Do not swallow the failure: after the rollback neither the rows nor the position
    // were stored, and the caller must learn about it instead of carrying on as if the
    // batch had succeeded.
    throw new IllegalStateException(
            "Batch insert failed and was rolled back (end position " + endPosition + ")", e);
} finally {
    session.close();
}
...省略,更多可以查看Github完整代码...
总结
https://my.oschina.net/OutOfMemory/blog/3117737
点击「阅读原文」和栈长学更多~