likes
comments
collection
share

设计方案-定时任务接口数据存储及更新策略

作者站长头像
站长
· 阅读数 9

前言

在没有使用ETL工具且不考虑多数据源的情况下,我们需要从别的系统获取数据时,一般会选择分页接口查询并存储。本文算是我对类似场景代码的提炼,旨在总结相关套路,提升自我对数据库和模块的设计能力。

ETL工具--英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。(数据仓库结构)通俗的说法就是从数据源抽取数据出来,进行清洗加工转换,然后加载到定义好的数据仓库模型中去

常规接口对接

接口对接推荐使用Feign,常规写法如下

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

/**
 * @Author WangZY
 * @Date 2021/8/10 10:51
 * @Description 获取订单数据
 **/
@Component
@FeignClient(name = "${spring.application.name}GateWayApi", url = "${gateway.url}")
public interface GateWayFeign {

    @PostMapping(value = "/ds-data-service/xxx", headers = {"API-TOKEN=xxxx"})
    OrderLineResDTO getOrderLineToSmbgj(@RequestBody OrderLineQueryDTO query);
}

生成对应的查询参数及结果类

@NoArgsConstructor
@Data
public class OrderLineResDTO {

    @JsonProperty("data")
    private List<DataDTO> data;
    @JsonProperty("success")
    private Boolean success;
    @JsonProperty("affectedRow")
    private String affectedRow;
    @JsonProperty("errorCode")
    private Integer errorCode;
    @JsonProperty("errorInfo")
    ......
@NoArgsConstructor
@Data
public class OrderLineQueryDTO {
    ......

对应依赖及配置文件

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
    <version>xxxx</version>
</dependency>
# 外部链接
gateway.url=http://service-gw.ruijie.com.cn/api
# 日志打印
logging.level.com.ruijie.forecastdata.api=debug

需求分类

全量翻新类--订单全流程定时项目已采用此方案并上线稳定运行

详细信息

数据定时任务通过接口接收或者翻新数据库全量数据,逻辑删除以前的历史数据,保留固定次数的历史数据

操作步骤

前提条件

  1. 项目使用Mybatis-Plus,如果不是的话请自行替换对应SQL操作语句。该方案默认已提供MP的service层
  2. 对接系统提供分页查询接口
  3. 分布式调度(定时任务)采用elastic-job框架,请自行替换对应组件

定时任务

import com.alibaba.fastjson.JSON;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;


import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;

import org.springframework.stereotype.Component;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

@Component
@Slf4j
public class OrderLineSchedule implements SimpleJob {

    //引入接口类以及对应实体ServiceImpl类,或者将批量插入方法放到service层
    private final GateWayFeign gateWayFeign;
    private final SellOrderListServiceImpl sellOrderListService;

    public OrderLineSchedule(GateWayFeign gateWayFeign, SellOrderListServiceImpl sellOrderListService) {
        this.gateWayFeign = gateWayFeign;
        this.sellOrderListService = sellOrderListService;
    }
    //创建日期格式转换器,这里按需使用即可,提供了两种,一种对应普通java.util.Date,一种对应JDK8及以上提供的java.time.LocalDateTime

    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
    private final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss");

    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("开始定时查询数仓->销售订单表数据");
        executeRun();
        log.info("查询数仓->销售订单表数据结束");
    }

    //这里单独把操作类拿出来,是为了方便在Controller层通过接口调用,留一个手动操作的口子
    public void executeRun() {
    //updateOverDaily获取当前批次queue,解释详见后续service类
        int queue = sellOrderListService.updateOverDaily();
        try {
           
            cycleExecute(1, queue);
            //每次定时删除多余的历史数据
            log.info("数仓->销售订单表数据,删除批次号为[{}]", queue);
            CompletableFuture.runAsync(sellOrderListService::deletePre);
        } catch (Exception e) {
        //这里不使用事务而是采用手动回滚的原因是,由于定时任务一般时间较长,必然导致大事务问题,因此采用手动方式去规避问题
            log.error("查询数仓->销售订单表数据报错,回滚至上一版数据", e);
            sellOrderListService.updateRe(queue);
        }
    }
    /**
     * 该方法使用递归,分页获取接口参数,结束条件是接口没有数据,注意递归次数不宜过多,过多会导致栈溢出 
     * @param pageNum 接口分页参数--没有可删除
     * @param queue 当前批次


     */          


    public void cycleExecute(int pageNum, int queue) {
    //调用接口,这里分页查询         
        OrderLineQueryDTO query = new OrderLineQueryDTO();
        query.setPageNo(pageNum);
        query.setPageSize(1000);
        query.setInFields(new OrderLineQueryDTO.InFieldsDTO("19900101 00:00:00", "20230101 00:00:00"));
        log.info("开始调用数仓->销售订单表接口,查询参数={}", JSON.toJSONString(query));
        OrderLineResDTO res = gateWayFeign.getOrderLineToSmbgj(query);
        log.info("结束调用数仓->销售订单表接口,返回结果={}", res);
        //判断接口是否返回值,如果没有返回值,结束递归         
        if (CollectionUtils.isNotEmpty(res.getData())) {
            List<SellOrderList> sellOrderLists = new ArrayList<>(1024);
            for (OrderLineResDTO.DataDTO datum : res.getData()) {
            //数据转换,谨慎使用BeanUtils.copy()
                SellOrderList orderAdd = new SellOrderList();
                orderAdd.setCustomerName(datum.getPartyName());
                orderAdd.setItemCode(datum.getItemCode());
                orderAdd.setItemDesc(datum.getItemDesc());
                orderAdd.setFirstIntegratorSystem(datum.getFirstIntegratorSystem());
                orderAdd.setFinalCustomerName(datum.getFinalCustomerName());
                orderAdd.setOrderNum(datum.getOrderNumber());
                orderAdd.setOrderBelong(datum.getOrderBelongTypeName());
                orderAdd.setOrderType(datum.getOrderTypeNew());
                orderAdd.setLineNumber(datum.getOrderNumber());
                orderAdd.setContract(datum.getCustPoNumber());
                Date format = null;
                try {
                    format = sdf.parse(datum.getCreationDate());
                } catch (ParseException e) {
                    log.error("无法解析时间" + datum.getCreationDate());
                }
                orderAdd.setOrderCreateDate(format);
                orderAdd.setProductDescription(datum.getProductDescription());
                orderAdd.setOrderQty(datum.getOrderQty());
                orderAdd.setSoldToCountries(datum.getCountryName());
                orderAdd.setDataUpdateDate(datum.getLastUpdateDate());
                orderAdd.setCreateTime(new Date());
                orderAdd.setIsDelete(0);
                //批次判断,如果-1说明第一次,给默认值0,如果不是,填入当前批次即可
                orderAdd.setQueue(queue == -1 ? 0 : queue);
                sellOrderLists.add(orderAdd);
            }
            //推荐自己写批量插入SQL,MP提供的速度不够快,默认批次1000条
            sellOrderListService.getBaseMapper().batchSchdule(sellOrderLists);
            //开启递归,分页+1
            cycleExecute(pageNum + 1, queue, startTime);
        }
    }
}

service层

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/**
 * <p>
 * 数仓->销售订单表 服务实现类
 * </p>
 *
 * @author WangZY
 * @since 2022-06-01
 */
@Service
@Slf4j
public class SellOrderListServiceImpl extends ServiceImpl<SellOrderListMapper, SellOrderList> implements ISellOrderListService {
    /**
     * @author WangZY
     * @date 2022/6/1 16:09
     * @description 逻辑删除上一版数据
     **/
    @Override
    public int updateOverDaily() {
        List<SellOrderList> list = lambdaQuery().eq(SellOrderList::getIsDelete, 0)
                .select(SellOrderList::getQueue).list();
        if (CollectionUtils.isEmpty(list)) {
        //没有说明第一次全量,返回标识量-1
            return -1;
        } else {
        //逻辑删除上一版数据,并生成下一批次号即+1
            Integer queue = list.get(0).getQueue();
            SellOrderList delivery = new SellOrderList();
            delivery.setIsDelete(1);
            lambdaUpdate().eq(SellOrderList::getIsDelete, 0).update(delivery);
           
            return queue + 1;
        }
    }

    /**
     * @author WangZY
     * @date 2022/6/1 16:09
     * @description 删除历史数据
     **/
    @Override
    public void deletePre() {
        QueryWrapper<SellOrderList> qw = new QueryWrapper<>();
        qw.select("distinct queue");
        List<SellOrderList> queueNumberList = list(qw);
        List<Integer> queueDistinct = queueNumberList.parallelStream()
                .map(SellOrderList::getQueue).sorted(Comparator.comparingInt(o -> o)).collect(Collectors.toList());
                //删除30个批次之前的数据,这里可以调整保留多少版数据
        if (queueDistinct.size() > 30) {
            for (int i = 0; i < queueDistinct.size() - 30; i++) {
                remove(new QueryWrapper<SellOrderList>().eq("queue", queueDistinct.get(i)));
            }
        }
    }

    /**
     * @author WangZY
     * @date 2022/6/1 16:09
     * @description 恢复指定版本数据,并删除该版本的下一版数据
     **/
    @Override
    public void updateRe(int queue) {
        SellOrderList deliveryPre = new SellOrderList();
        deliveryPre.setUpdateTime(new Date());
        deliveryPre.setIsDelete(1);
        lambdaUpdate().eq(SellOrderList::getQueue, queue + 1).update(deliveryPre);
        SellOrderList deliveryAfter = new SellOrderList();
        deliveryAfter.setUpdateTime(new Date());
        deliveryAfter.setIsDelete(0);
        lambdaUpdate().eq(SellOrderList::getQueue, queue).update(deliveryAfter);
    }
}

数据库需要字段

设计方案-定时任务接口数据存储及更新策略

每日更新类--物料基础信息及SMB海外预测项目已上线并稳定运行

详细信息

数据每日通过接口接收增量数据,增量数据通过数据中的唯一值进行新增和更新的判断

操作步骤

前提条件

同全量翻新类

定时任务

import com.alibaba.fastjson.JSON;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Component;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * @author WangZY
 * @date 2022/6/2 11:51
 * @description SMB国际获取订单行定时任务
 **/
@Component
@Slf4j
public class OrderLineSchedule implements SimpleJob {
     //引入接口类以及对应实体ServiceImpl类,或者将批量插入方法放到service层

    private final GateWayFeign gateWayFeign;
    private final SellOrderListServiceImpl sellOrderListService;

    public OrderLineSchedule(GateWayFeign gateWayFeign, SellOrderListServiceImpl sellOrderListService) {
        this.gateWayFeign = gateWayFeign;
        this.sellOrderListService = sellOrderListService;
    }

    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
    private final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss");

    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("开始定时查询数仓->SMB国际获取订单行数据");
        executeRun();
        log.info("查询数仓->SMB国际获取订单行数据结束");
    }

    public void executeRun() {
    //查询出当前数据库里所有数据的ID,以及唯一条件,切勿全字段容易触发OOM
        List<SellOrderList> list = sellOrderListService.lambdaQuery().eq(SellOrderList::getIsDelete, 0)
                .select(SellOrderList::getId, SellOrderList::getLineId).list();
        if (CollectionUtils.isEmpty(list)) {
        //空的就全量新增即可
            cycleExecute(1, new HashMap<>(8), "20000101 00:00:00", "20500101 00:00:00");
        } else {
        //不为空,说明是增量更新,这里创建MAP,key为唯一条件,value为实体类,其实这个实体最主要的就是要个ID,方便批量更新
            Map<Integer, SellOrderList> judgeMap = list.stream().filter(e -> e.getLineId() != null)
                    .collect(Collectors.toMap(SellOrderList::getLineId, Function.identity()));
                    //每日更新,这里取昨天时间,顺便冗余一小时
            LocalDateTime time = LocalDateTime.now().minusDays(1).minusHours(1);
            cycleExecute(1, judgeMap, time.format(dtf), LocalDateTime.now().format(dtf));
        }
    }

    public void cycleExecute(int pageNum, Map<Integer, SellOrderList> judgeMap, String startTime, String endTime) {
    //正常分页调接口即可
        OrderLineQueryDTO query = new OrderLineQueryDTO();
        query.setPageNo(pageNum);
        query.setPageSize(1000);
        query.setInFields(new OrderLineQueryDTO.InFieldsDTO(startTime, endTime));
        log.info("开始调用数仓->SMB国际获取订单行接口,查询参数={}", JSON.toJSONString(query));
        OrderLineResDTO res = gateWayFeign.getOrderLineToSmbgj(query);
        log.info("结束调用数仓->SMB国际获取订单行接口,返回结果={}", JSON.toJSONString(res));

        if (CollectionUtils.isNotEmpty(res.getData())) {
            if (judgeMap.isEmpty()) {
            //map为空,说明全量新增,直接批量新增即可
                List<SellOrderList> sellOrderLists = new ArrayList<>(1024);
                for (OrderLineResDTO.DataDTO datum : res.getData()) {
                    SellOrderList orderAdd = new SellOrderList();
                    orderAdd.setLineId(datum.getLineId());
                    paddingParam(datum, orderAdd);
                    orderAdd.setCreateTime(new Date());
                    orderAdd.setIsDelete(0);
                    sellOrderLists.add(orderAdd);
                }
                sellOrderListService.getBaseMapper().batchSchdule(sellOrderLists);
                cycleExecute(pageNum + 1, judgeMap, startTime, endTime);
            } else {            
                List<SellOrderList> sellOrderListAdd = new ArrayList<>(1024);
                List<SellOrderList> sellOrderListUpdate = new ArrayList<>(1024);
                for (OrderLineResDTO.DataDTO datum : res.getData()) {
                    SellOrderList existOrder = judgeMap.get(datum.getLineId());
                    if (existOrder == null) {
                   //map中没有数据,直接新增
                        SellOrderList orderAdd = new SellOrderList();
                        orderAdd.setLineId(datum.getLineId());
                        paddingParam(datum, orderAdd);
                        orderAdd.setCreateTime(new Date());
                        orderAdd.setIsDelete(0);
                        sellOrderListAdd.add(orderAdd);
                    } else {
                     //map中有数据,说明该实体类以及初始化了,需要更新,这里把主键放入该修改的实体类中
                        SellOrderList orderUpdate = new SellOrderList();
                        orderUpdate.setId(existOrder.getId());
                        paddingParam(datum, orderUpdate);
                        orderUpdate.setUpdateTime(new Date());
                        sellOrderListUpdate.add(orderUpdate);
                    }
                }
                if (CollectionUtils.isNotEmpty(sellOrderListAdd)) {
                    sellOrderListService.getBaseMapper().batchSchdule(sellOrderListAdd);
                }
                if (CollectionUtils.isNotEmpty(sellOrderListUpdate)) {
                    sellOrderListService.updateBatchById(sellOrderListUpdate);
                }
                cycleExecute(pageNum + 1, judgeMap, startTime, endTime);
            }
        }
    }

    private void paddingParam(OrderLineResDTO.DataDTO datum, SellOrderList orderAdd) {
        orderAdd.setCustomerName(datum.getPartyName());
        orderAdd.setItemCode(datum.getItemCode());
        orderAdd.setItemDesc(datum.getItemDesc());
        orderAdd.setFirstIntegratorSystem(datum.getFirstIntegratorSystem());
        orderAdd.setFinalCustomerName(datum.getFinalCustomerName());
        orderAdd.setOrderNum(datum.getOrderNumber());
        orderAdd.setOrderBelong(datum.getOrderBelongTypeName());
        orderAdd.setOrderType(datum.getOrderTypeNew());
        orderAdd.setContract(datum.getCustPoNumber());
        Date format = null;
        try {
            format = sdf.parse(datum.getCreationDate());
        } catch (ParseException e) {
            log.error("无法解析时间" + datum.getCreationDate());
        }
        orderAdd.setOrderCreateDate(format);
        orderAdd.setProductDescription(datum.getProductDescription());
        orderAdd.setOrderQty(datum.getOrderQty());
        orderAdd.setSoldToCountries(datum.getCountryName());
        orderAdd.setDataUpdateDate(datum.getLastUpdateDate());
    }
}

优化要点

代码部分其实已经相当完善了,来说点代码之外的东西,例如大家喜闻乐见的优化。

减少大对象

大批量的数据必然会带来大对象,大对象的堆积则必然会导致OOM--java heap space即堆内存溢出。减少数据的传输必然是重中之重,可通过如下方向优化

  1. 接口的数据能精简的精简
  2. SQL的查询参数尽量减少并且使用尽量少字段的结果类去接收(即使字段没有值,但是序列化的时候,依然会有key)

减少循环

  1. 大多数同事可能用惯了stream流,所以会嫌弃使用for。但其实得根据情况,因为stream流本身就是循环的语法糖,多个stream流不方便合并的时候,用一个for循环就好了。
  2. 增量的时候我们需要对比从数据库里拿到的原数据,这个时候就不要循环里面套循环去contains。建议提前一次循环做个map,以唯一值为key,对象为value,会快很多,唯一不足的是用空间换时间,要注意OOM问题。

多线程优化

熟练使用CompletableFuture.allOf以及parallelStream流会大大提升效率。该部分留待后续文章分析,坑很多,知识点也很多。

批量插入

MP为了通用性终究是相对保守了点,可尝试修改批量插入,甚至是加上多线程事务。多线程事务实际上通常采用2PC的思想实现,这部分也留待后续文章分析。

写在最后

设计方案系列来自我对场景代码的总结,日常工作中会有很多这样的场景,我就想着要把这些套路代码留存下来,方便下次使用。本次套路包含了数据库设计、代码案例、优化思想三块,基本上都点到了,希望对读者的工作有所帮助。优化这块算是通用的思考吧,说白了性能优化就是围绕硬件和软件进行开源节流,堆硬件和提升软件效率。

最近一直在写一些多线程代码,尝试去总结套路,下一篇会是ComplatableFutrue的实战,多线程操作以及简易的2PC实现,干货很多,敬请期待。