likes
comments
collection
share

队列 LocalQueue

作者站长头像
站长
· 阅读数 13

问题列表

  1. 过饱问题

场景一:

1、消费者每天处理的量比生产者生产的少;如生产者每天1万条,消费者每天只能消费5千条。

2、解决办法:消费者加机器

3、原因:生产者没法限流,因为要一天内处理完,只能消费者加机器

队列 LocalQueue

场景二:

1、消费者每天处理的量比生产者生产的多。

2、系统高峰期生产者速度太快,把队列塞爆了

3、解决办法:适当的加大队列

4、原因:消费者一天的消费能力已经高于生产者,那说明一天之内肯定能处理完,保证高峰期别把队列塞满就好

队列 LocalQueue

场景三:

1、消费者每天处理的量比生产者生产的多

2、条件有限或其他原因,队列没法设置特别大

3、系统高峰期生产者速度太快,把队列塞爆了

4、解决办法:生产者限流

5、原因:消费者一天的消费能力高于生产者,说明一天内能处理完,队列又太小,那只能限流生产者,让高峰期塞队列的速度慢点

队列 LocalQueue

应用场景

一批任务 需要监听是否处理完成

/**  
* @author yuye  
* @version 1.0  
*/  
@Component  
public class OperateClearQueue {  
  
    /**  
    * 配置的任务内存队列数量  
    */  
    @Value("${bucket.operate.queue-num:32}")  
    private Integer queueNum;  
  
    /**  
    * 处理任务的队列列表  
    */  
    private final List<BlockingQueue> operateQueue = new ArrayList<>();  
  
    /**  
    * 下一个任务处理队列在列表中的下标  
    */  
    private AtomicInteger index = new AtomicInteger();  
  
  
    @PostConstruct  
    public void init(){  
        ExecutorService executors = Executors.newFixedThreadPool(queueNum);  
        for (int i = 0; i< queueNum; i++){  
        //设置一个队列最大容纳数量  
        BlockingQueue blockingQueue = new ArrayBlockingQueue(150000);  
        operateQueue.add(blockingQueue);  
        executors.execute(new OperateClearRunner(blockingQueue,inventoryBucketService));  
    }  

    }  
  
    /**  
    * 轮询  
    * @param object  
    * @return  
    */  
    public boolean offerByRoundRobin(Object object) {  
        index.compareAndSet(queueNum * 10000,0);  
        boolean offer = operateQueue.get(index.getAndIncrement() % queueNum).offer(object);  
        return offer;  
    }  
  
    /**  
    * 所有队列任务处理完成  
    * @return  
    */  
    public boolean checkProcessFinish() {  
        for (BlockingQueue blockingQueue : operateQueue) {  
            if (blockingQueue.size() > 0) {  
                return false;  
            }  
        }  
        return true;  
    }  
  
}

但是这个有几个问题

我想支持顺序执行 (同一个订单号的按照顺序执行)

对 offerByRoundRobin 方法进行扩展,支持自己选择 queue,对一定参数进行hash 保证顺序

如果服务突然宕机,会不会导致数据不一致,或者其他问题

会的,这就是后面的处理要支持 幂等操作的原因,还可能丢失数据,如果纯依靠内存 操作的话

如果生产者性能高,有没有更进一步提升性能的方法

可以添加 几个额外的cache 区,生产满了之后,就切换2个空间,让消费者 继续消费的同时,那边继续生产,形成一个交替

/**  
* 交换队列  
*/  
private void swapRequests() {  
    lock.lock();  
    try {   
        LinkedList<Object> tmp = writeQueue;  
        writeQueue = readQueue;  
        readQueue = tmp;  
    } finally {  
        lock.unlock();  
    }  
}

最终定版

/**  
* @author yuye  
* @version 1.0  
*/  
//@Component  
public class LocalQueue {  
  
/**  
* 配置的任务内存队列数量  
*/  
private Integer queueNum;  
  
/**  
* 处理任务的队列列表  
*/  
private final List<BlockingQueue> operateQueue = new ArrayList<>();  
  
/**  
* 下一个任务处理队列在列表中的下标  
*/  
private AtomicInteger index = new AtomicInteger();  
  
  
  
  
public LocalQueue(BaseLocalQueueRunner command) {  
    this(command, 32, 150000);  
}  
  
public LocalQueue(BaseLocalQueueRunner command, int queueNum, int capacity) {  
    if(queueNum <= 0 || capacity <= 0){  
        throw new RuntimeException("LocalQueue queueNum/capacity is not less than 0");  
    }  
    if(command == null){  
        throw new RuntimeException("LocalQueue command is not blank");  
    }  
    this.queueNum = queueNum;  
    ExecutorService executors = Executors.newFixedThreadPool(queueNum);  
    for (int i = 0; i< queueNum; i++){  
    //设置一个队列最大容纳数量  
    BlockingQueue blockingQueue = new ArrayBlockingQueue(capacity);  
    operateQueue.add(blockingQueue);  
    BaseLocalQueueRunner runnable = command.getInstance();  
    runnable.setBlockingQueue(blockingQueue);  
    executors.execute(runnable);  
}  
}  
  
  
  
/**  
* 轮询  
* @param object  
* @return  
*/  
public boolean offerByRoundRobin(Object object) {  
    return offerByRoundRobin(object, index.getAndIncrement() % queueNum);  
}  
  
/**  
* 轮询  
* @param object  
* @param indexQueue 选择的queue index  
* @return  
*/  
public boolean offerByRoundRobin(Object object, int indexQueue) {  
    index.compareAndSet(queueNum * 10000,0);  
    boolean offer = operateQueue.get(indexQueue).offer(object);  
    return offer;  
}  
  
/**  
* 所有队列任务处理完成  
* @return  
*/  
public boolean checkProcessFinish() {  
    for (BlockingQueue blockingQueue : operateQueue) {  
        if (blockingQueue.size() > 0) {  
            return false;  
        }  
    }  
    return true;  
}  
  
}

问题

如果想保证顺序呢?

下一次

我们打印日志的时候 有的时候 需要对日志的部分内容进行脱敏,敏感词变为 *号,对手机号和密码进行处理,或者不返回

todo

实现搜索效果 (springboot-starter 一个jar 实现这种效果)

队列 LocalQueue

代码地址

个人

请勿转载

转载自:https://juejin.cn/post/7268996716648235027
评论
请登录