java轻量级数据迁移通用框架设计与实现
数据迁移通用框架设计与实现本文分享一套基于生产者-消费者模式的高性能数据迁移框架支持多线程并发读取、批量写入、失败重试与优雅停机适合需要高效、可复用数据迁移方案的开发者。一、为什么需要通用数据迁移框架在日常开发中我们经常遇到以下场景将旧表数据迁移到新表表结构异构跨数据源同步MySQL → ES、Oracle → MySQL数据清洗、加密、格式转换后入库如果每次迁移都从头写脚本不仅效率低下还容易出错。本文设计的通用迁移基类通过泛型 多线程生产者-消费者模式实现了✅ 读写解耦充分利用CPU和IO资源✅ 批量处理减少数据库交互次数✅ 失败重试保证数据最终一致性✅ 可配置线程数灵活适配不同环境✅ 毒丸机制安全终止所有线程二、整体架构设计2.1 核心思想角色线程类型职责生产者读线程分页从源数据源查询数据放入有界阻塞队列消费者写线程从队列批量取出数据转换为目标对象批量写入目标数据源毒丸特殊标记对象用于通知线程优雅退出避免线程永久阻塞2.2 工作流程text读线程1 ──┐ 读线程2 ──┼── 阻塞队列 ── 写线程1 ── 目标库 读线程3 ──┘ 写线程2 ──┘启动 N 个读线程并发分页查询源数据放入队列。启动 M 个写线程从队列批量取出数据转换格式后批量保存。当所有源数据读取完毕最后一个读线程向队列发送 N 个“读毒丸”。写线程收到读毒丸后若已处理完所有数据则发送“写毒丸”通知其他写线程退出。所有线程安全退出迁移任务完成。三、完整源码文件1BaseMigrationTask.java迁移任务基类javaimport org.apache.log4j.Logger; import org.springframework.data.domain.PageRequest; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /** * 迁移任务基类 * */ public abstract class BaseMigrationTaskT, K { /** * 读毒丸标识 */ private static String READ_POISON_PILL READ_QUEUE_POISON_PILL; /** * 缓存T的有界阻塞队列 */ private BlockingQueueT queue new LinkedBlockingQueue(20000); /** * 分页查询数据计数器 */ private AtomicInteger readIntervalInteger new AtomicInteger(0); /** * 读取数据计数器 */ private AtomicInteger readAtomicInteger new AtomicInteger(0); /** * 写入数据计数器 */ private AtomicInteger writeAtomicInteger new AtomicInteger(0); /** * 读取数据库是否完成标记 */ private volatile boolean isFinishRead false; /** * 写线程数量 */ private int writeThreadCount 0; /** * 读线程数量 */ private int readThreadCount 0; /** * 读线程计数器 */ private AtomicInteger readThreadAtomicInteger new AtomicInteger(); /** * 源数据Class */ private ClassT sourceClass; /** * 目标数据Class */ private ClassK targetClass; private static final Logger logger Logger.getLogger(BaseMigrationTask.class); void setSourceClass(ClassT sourceClass) { this.sourceClass sourceClass; } void setTargetClass(ClassK targetClass) { this.targetClass targetClass; } /** * 设置写线程数 * * param writeThreadCount 写线程数量 */ public void setWriteThreadCount(int writeThreadCount) { this.writeThreadCount writeThreadCount; } /** * 设置读线程数 * * param readThreadCount 读线程数量 */ public void setReadThreadCount(int readThreadCount) { this.readThreadCount readThreadCount; } /** * 分页查询T * * param pageRequest 分页参数 * return T列表数据 * throws Exception 查询失败时抛出异常 */ abstract ListT findT(PageRequest pageRequest) throws Exception; /** * 保存K * * param kList K列表 * return 保存成功的K列表 * throws Exception 保存失败时抛出异常 */ abstract ListK saveK(ListK kList) throws Exception; /** * 读-生产者 * * throws Exception 线程中断时抛出异常 */ public void put() throws Exception { System.out.println(消费者); boolean flag true; while (flag) { //CAS获取分页读取源数据区间 int interval; int readUnit 4000; while (true) { int current readIntervalInteger.get(); int next current readUnit; if (readIntervalInteger.compareAndSet(current, next)) { interval next; break; } } //分页查询源数据 PageRequest pageRequest new PageRequest(interval / readUnit - 1, readUnit); ListT tList findT(pageRequest); //将查询数据放入队列同时read计数器;若队列已满则阻塞等待 for (T t : tList) { queue.put(t); readAtomicInteger.incrementAndGet(); } logger.log(Log4jLevel.FINE, 查询数据 sourceClass.getSimpleName() 成功条数 tList.size()); //读取已经完成结束循环 if (tList.size() readUnit) { //修改退出线程标识符为false,准备退出线程 flag false; logger.log(Log4jLevel.FINE, 线程查询 sourceClass.getSimpleName() 结束 Thread.currentThread().getName()); int incrementAndGet readThreadAtomicInteger.incrementAndGet(); //判断所有线程是否全部完成工作若均已完成则发送读毒丸告知消费者可以退出等待 if (incrementAndGet readThreadCount) { //修改所有读线程是否完成标识为true isFinishRead true; //给所有写线程发送读毒丸 for (int i 0; i writeThreadCount; i) { T newInstance sourceClass.newInstance(); newInstance.setAsymkeyNum(READ_POISON_PILL); queue.put(newInstance); } } } } } /** * 写-消费者 * * throws Exception 线程中断时抛出异常 */ public void get() throws Exception { boolean flag true; endTask: while (flag) { ListT tList new ArrayList(); ListK kList new ArrayList(); int writeUnit 1000; //批量操作数据 //写毒丸标识 String writeQueuePoisonPill WRITE_QUEUE_POISON_PILL; for (int i 0; i writeUnit; i) { T take queue.take(); //判断是否为写毒丸若满足则退出最外层循环结束线程 if (writeQueuePoisonPill.equals(take.getAsymkeyNum())) { break endTask; } //判断是否为读毒丸则退出该次批量操作循环重新进入循环 if (READ_POISON_PILL.equals(take.getAsymkeyNum())) { break; } //将源数据copy至目标 K k targetClass.newInstance(); k.setAsymkeyNum(take.getAsymkeyNum()); k.setPrivateKey(take.getPrivateKey()); kList.add(k); tList.add(take); } //批量保存目标数据 ListK saveK saveK(kList); int incrementAndGet; //CAS更新已读取数据数量 while (true) { int current writeAtomicInteger.get(); int next current saveK.size(); if (writeAtomicInteger.compareAndSet(current, next)) { incrementAndGet next; break; } } logger.log(Log4jLevel.FINE, 插入 targetClass.getSimpleName() 成功条数 saveK.size()); //筛选保存失败的数据准备将保存失败对应的源数据放回队列 for (K k : saveK) { for (T t : tList) { if (t.getAsymkeyNum().equals(k.getAsymkeyNum())) { tList.remove(t); break; } } } //判断是否存在保存失败数据若存在将保存失败对应的源数据放回队列 if (!tList.isEmpty()) { //将保存失败对应源数据放回队列 for (T t : tList) { queue.put(t); } //给所有写线程发送读毒丸 for (int i 0; i writeThreadCount; i) { T newInstance sourceClass.newInstance(); newInstance.setAsymkeyNum(READ_POISON_PILL); queue.put(newInstance); } } //若所有读线程已结束 if (isFinishRead) { //判断目标数据是否已经全部保存完成 if (incrementAndGet readAtomicInteger.get()) { //准备退出线程 flag false; logger.log(Log4jLevel.FINE, 插入 targetClass.getSimpleName() 数据完成总数 incrementAndGet); logger.log(Log4jLevel.FINE, 插入 targetClass.getSimpleName() 数据完成发送毒丸 Thread.currentThread().getName()); //给所有除当前线程外写线程发送写毒丸 for (int i 0; i writeThreadCount - 1; i) { T newInstance sourceClass.newInstance(); newInstance.setAsymkeyNum(writeQueuePoisonPill); queue.put(newInstance); } } } } } }文件2SpringAppRunner.java任务执行器javaimport org.apache.log4j.Logger; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.ExitCodeGenerator; import org.springframework.boot.SpringApplication; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 迁移任务执行类 * * */ Component public class SpringAppRunner implements CommandLineRunner { Resource private KeyToTempTask keyToTempTask; Resource private TempToKeyTask tempToKeyTask; Resource private ApplicationContext context; private static final Logger logger Logger.getLogger(SpringAppRunner.class); Override public void run(String... args) throws Exception { ListString params Arrays.asList(args); logger.log(Log4jLevel.FINE, 参数列表 params.toString()); int availableProcessors Runtime.getRuntime().availableProcessors(); int length args.length; int keyIndex 1; if (length 1) { keyIndex Integer.parseInt(args[0]); } keyToTempTask.setKeyIndex(keyIndex); int threadNum availableProcessors * 2; ExecutorService executorService Executors.newFixedThreadPool(threadNum); this.performTask(keyToTempTask, executorService, availableProcessors, availableProcessors); logger.log(Log4jLevel.FINE, 睡眠1分钟协助数据库连接及JVM GC); //等待协助数据库连接回收及JVM GC Thread.sleep( 60 * 1000); this.performTask(tempToKeyTask, executorService, availableProcessors, availableProcessors); // 退出服务 int exitCode SpringApplication.exit(context, new ExitCodeGenerator() { Override public int getExitCode() { return 0; } }); System.exit(exitCode); } private void performTask(BaseMigrationTask migrationTask, ExecutorService executorService, int writeCount, int readCount) throws Exception { migrationTask.setWriteThreadCount(writeCount); migrationTask.setReadThreadCount(readCount); ListCallableVoid callableArrayList new ArrayList(); //生产者线程 for (int i 0; i readCount; i) { Producer producer new Producer(migrationTask); callableArrayList.add(producer); } //消费者线程 for (int i 0; i writeCount; i) { Consumer consumer new Consumer(migrationTask); callableArrayList.add(consumer); } //等待执行结果 executorService.invokeAll(callableArrayList); } } /** * 生产者 */ class Producer implements CallableVoid { private BaseMigrationTask migrationTask; Producer(BaseMigrationTask migrationTask) { this.migrationTask migrationTask; } Override public Void call() throws Exception { try { migrationTask.put(); } catch (Exception e) { e.printStackTrace(); } return null; } } /** * 消费者 */ class Consumer implements CallableVoid { private BaseMigrationTask migrationTask; Consumer(BaseMigrationTask migrationTask) { this.migrationTask migrationTask; } Override public Void call() throws Exception { try { migrationTask.get(); } catch (Exception e) { e.printStackTrace(); } return null; } }四、核心逻辑详解4.1 生产者读线程工作流程原子分配分页偏移量使用AtomicInteger CAS 操作每个读线程独立累加readIntervalInteger每次增加readUnit4000从而计算出本次要查询的页号。多线程安全且无重复。执行分页查询调用子类实现的findT(PageRequest)方法获取一页源数据。数据入队将每条数据放入BlockingQueue队列容量 20000。若队列满则自动阻塞等待。同时累加readAtomicInteger记录已读取总数。判断是否结束如果查询到的记录数小于readUnit说明没有更多数据。当前读线程退出并利用readThreadAtomicInteger记录已退出的读线程数量。最后一个退出的读线程负责设置isFinishRead true标志。向队列中放入writeThreadCount个读毒丸通知所有写线程读取阶段已结束。4.2 消费者写线程工作流程批量取数据每次循环尝试从队列中取出writeUnit1000条数据如果取到写毒丸直接退出整个写线程。如果取到读毒丸则跳出本次批量循环不清空已取数据等待下一批。否则将源对象T转换为目标对象K示例中复制了asymkeyNum和privateKey字段并分别加入tList和kList。批量保存调用子类实现的saveK(kList)获得实际保存成功的对象列表saveK。通过 CAS 更新writeAtomicInteger累加成功条数。失败重试通过双重循环对比saveK和tList找出未保存成功的源数据将其重新放回队列头部queue.put。 同时再次发送writeThreadCount个读毒丸让所有写线程重新处理这批失败数据避免数据丢失。正常退出当isFinishRead true且已写入数量 ≥ 已读取数量时说明所有数据已处理完毕。当前写线程打印完成日志。向其他写线程发送写毒丸数量为writeThreadCount - 1不给自己发。自身通过flagfalse退出循环。4.3 线程协调亮点无锁设计利用LinkedBlockingQueue内置的阻塞机制生产者与消费者之间无需额外加锁。CAS 计数器readIntervalInteger、readAtomicInteger、writeAtomicInteger保证多线程下的计数绝对准确且性能优于synchronized。双毒丸机制区分“读取完成”读毒丸和“写入完成”写毒丸避免消费者在数据未处理完时提前终止。失败重试保证即使部分数据写入失败也会重新入队并触发新一轮处理最终实现数据最终一致性。五、如何使用该框架5.1 继承 BaseMigrationTask 实现具体迁移任务javaComponent public class KeyToTempTask extends BaseMigrationTaskKeyEntity, TempEntity { Resource private KeyRepository keyRepository; // 源数据DAO Resource private TempRepository tempRepository; // 目标数据DAO PostConstruct public void init() { setSourceClass(KeyEntity.class); setTargetClass(TempEntity.class); } Override ListKeyEntity findT(PageRequest pageRequest) throws Exception { // 分页查询源数据 return keyRepository.findAll(pageRequest).getContent(); } Override ListTempEntity saveK(ListTempEntity kList) throws Exception { // 批量保存目标数据返回实际成功的列表 return tempRepository.saveAll(kList); } }5.2 配置线程数量并启动在SpringAppRunner中已默认将读线程数和写线程数设置为Runtime.getRuntime().availableProcessors()CPU核数。 可通过启动参数传入自定义配置例如java -jar migration.jar 2表示设置keyIndex2具体业务参数。5.3 运行效果启动 Spring Boot 应用后会自动执行两个迁移任务KeyToTempTask将密钥表数据迁移到临时表。等待 60 秒给数据库连接池回收和 GC 时间。TempToKeyTask将临时表数据再迁回密钥表可用于数据回滚或双重校验。控制台会输出类似以下日志text查询数据KeyEntity成功条数4000 查询数据KeyEntity成功条数4000 插入TempEntity成功条数1000 插入TempEntity成功条数1000 ... 线程查询KeyEntity结束pool-1-thread-1 插入TempEntity数据完成总数80000 插入TempEntity数据完成发送毒丸pool-1-thread-5六、总结本文提供的通用数据迁移框架具有以下优点高吞吐多线程并发读写批量操作减少 IO 次数。高可靠性失败自动重试毒丸机制保证线程安全退出。易扩展只需实现findT和saveK两个方法即可完成任意数据源的迁移。