SpringBoot 整合MongoDB步骤+MySQL数据实时迁移到MongoDB方案
前言
最近项目中有一个需求,某些表的数据 不是由系统业务产生的,而是有可能是从别的第三方系统抽取过来的。而这些数据可能量很大,所以Mysql只作为一个持久化存储,当系统中去查询这些数据时,希望从MongoDB中查询,这里就涉及到一个MySQL中数据实时迁移到MongoDB的问题,本文就来简单介绍一下SpringBoot整合MongoDB的步骤,以及MySQL数据实时迁移到MongoDB的一种方案。
SpringBoot整合MongoDB
依赖+配置文件
pom.xml
<!--引入Mongodb-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
application.yml
spring:
data:
mongodb:
uri: mongodb://localhost:27017
database: demo
实体类+Repository
创建文档对应的实体类,标注@Document,指定集合名称,id字段标注@Id
@Data
@Document("t_log")
public class LogEntity implements Serializable {
private static final long serialVersionUID = 1L;
@Id
private String id;
//省略其他字段...
}
创建Repository类,作用类似于service
public interface LogRepository extends MongoRepository<LogEntity, String> {
}
后续就可以在代码中注入LogRepository,调用它的方法操作MongoDB了。
数据迁移
数据迁移的方案如下:
- 在MySQL中,需要迁移数据的表中,添加触发器,当触发增删改三种操作时,将表名以及该条数据的主键记录到一个中间表。
- 项目启动后,起一个异步线程去轮询中间表,只要有数据,就根据表名和主键查询出该条数据,再根据操作的类型更新到MongoDB
- 上述操作完毕后,删除中间表的数据。
下面贴一下详细的步骤
触发器
中间表表结构如下:
在需要迁移数据的表中添加触发器:
事件监听器
监听上下文刷新事件,实现在项目启动之后,触发自定义逻辑
@Component
public class UploadService {
@EventListener
public void onEventListener(ContextRefreshedEvent event) {
new Thread(() -> {
while (true) {
upload();
//休眠200毫秒 避免cpu空转
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
}
}
轮询
@Component
public class UploadService {
@Autowired
private UploadLogService uploadLogService;
@Autowired
private LogService logService;
@Autowired
private LogRepository logRepository;
private static final ThreadPoolTaskExecutor threadPool = UploadThreadPool.getTaskExecutor();
private long maxId = 0;
/**
* 监听项目启动事件
*
* @param event
*/
@EventListener
public void onEventListener(ContextRefreshedEvent event) {
new Thread(() -> {
while (true) {
upload();
//休眠200毫秒 避免cpu空转
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
}
private void upload() {
//获取最小id值
UploadLogEntity one = uploadLogService.getOne(new QueryWrapper<UploadLogEntity>().orderByAsc("id").last("limit 1"));
if (one == null) return;
//赋值最小id值,取数根据id来取 >= id
maxId = maxId > 0 ? maxId : one.getId();
//查询数据
List<UploadLogEntity> list = uploadLogService.list(new QueryWrapper<UploadLogEntity>().ge("id", maxId).last("limit 1000"));
if (list.size() == 0) return;
maxId = list.get(list.size() - 1).getId() + 1;
//线程池异步迁移数据
list.forEach(data -> threadPool.execute(() -> {
Long id = data.getId();//中间表数据id
String tableName = data.getTableName();//表名
String keyValue = data.getKeyValue();//主键名
String type = data.getType();
if ("t_log".equals(tableName)) {
if ("1".equals(type)) {
LogEntity entity = logService.getById(keyValue);
logRepository.save(entity);
} else if ("2".equals(type)) {
logRepository.deleteById(keyValue);
}
} else if ("t_demo".equals(tableName)) {
//依次类推,判断每个可能的表名,分别进行处理
}
}
//单例线程池
static class UploadThreadPool {
private static ThreadPoolTaskExecutor taskExecutor = taskExecutor();
private UploadThreadPool() {
}
public static ThreadPoolTaskExecutor getTaskExecutor() {
return taskExecutor;
}
private static ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);//核心线程数
executor.setMaxPoolSize(50);//最大线程数
executor.setQueueCapacity(100);//等待队列长度
executor.setThreadNamePrefix("upload-thread-");//线程名称前缀
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//拒绝策略
executor.initialize();//线程池初始化
return executor;
}
}
}
拓展
上面的方案其实还是有一定问题,因为代码中在迁移数据时,还是要根据主键去业务表里查询出该条数据,如果业务表数据很多的话,这个查询频率可能会对数据库造成压力。
由于项目比较早,项目上的MySQL版本还是5.6,如果MySQL版本在5.7及以上的话,可以考虑在触发器中,直接使用JSON_OBJECT
函数,将json数据存到中间表,代码中只需要将json查出来,转化为实体类,存到MongoDB中即可。不需要再去查询业务表,降低了数据库压力。
总结
以上,就完成了MySQL数据实时迁移MongoDB。
转载自:https://juejin.cn/post/7361231869587374091