队列 LocalQueue
序
问题列表
- 过饱问题
场景一:
1、消费者每天处理的量比生产者生产的少;如生产者每天1万条,消费者每天只能消费5千条。
2、解决办法:消费者加机器
3、原因:生产者没法限流,因为要一天内处理完,只能消费者加机器
场景二:
1、消费者每天处理的量比生产者生产的多。
2、系统高峰期生产者速度太快,把队列塞爆了
3、解决办法:适当的加大队列
4、原因:消费者一天的消费能力已经高于生产者,那说明一天之内肯定能处理完,保证高峰期别把队列塞满就好
场景三:
1、消费者每天处理的量比生产者生产的多
2、条件有限或其他原因,队列没法设置特别大
3、系统高峰期生产者速度太快,把队列塞爆了
4、解决办法:生产者限流
5、原因:消费者一天的消费能力高于生产者,说明一天内能处理完,队列又太小,那只能限流生产者,让高峰期塞队列的速度慢点
应用场景
一批任务 需要监听是否处理完成
/**
* @author yuye
* @version 1.0
*/
@Component
public class OperateClearQueue {
/**
* 配置的任务内存队列数量
*/
@Value("${bucket.operate.queue-num:32}")
private Integer queueNum;
/**
* 处理任务的队列列表
*/
private final List<BlockingQueue> operateQueue = new ArrayList<>();
/**
* 下一个任务处理队列在列表中的下标
*/
private AtomicInteger index = new AtomicInteger();
@PostConstruct
public void init(){
ExecutorService executors = Executors.newFixedThreadPool(queueNum);
for (int i = 0; i< queueNum; i++){
//设置一个队列最大容纳数量
BlockingQueue blockingQueue = new ArrayBlockingQueue(150000);
operateQueue.add(blockingQueue);
executors.execute(new OperateClearRunner(blockingQueue,inventoryBucketService));
}
}
/**
* 轮询
* @param object
* @return
*/
public boolean offerByRoundRobin(Object object) {
index.compareAndSet(queueNum * 10000,0);
boolean offer = operateQueue.get(index.getAndIncrement() % queueNum).offer(object);
return offer;
}
/**
* 所有队列任务处理完成
* @return
*/
public boolean checkProcessFinish() {
for (BlockingQueue blockingQueue : operateQueue) {
if (blockingQueue.size() > 0) {
return false;
}
}
return true;
}
}
但是这个有几个问题
我想支持顺序执行 (同一个订单号的按照顺序执行)
对 offerByRoundRobin 方法进行扩展,支持自己选择 queue,对一定参数进行hash 保证顺序
如果服务突然宕机,会不会导致数据不一致,或者其他问题
会的,这就是后面的处理要支持 幂等操作的原因,还可能丢失数据,如果纯依靠内存 操作的话
如果生产者性能高,有没有更进一步提升性能的方法
可以添加 几个额外的cache 区,生产满了之后,就切换2个空间,让消费者 继续消费的同时,那边继续生产,形成一个交替
/**
* 交换队列
*/
private void swapRequests() {
lock.lock();
try {
LinkedList<Object> tmp = writeQueue;
writeQueue = readQueue;
readQueue = tmp;
} finally {
lock.unlock();
}
}
最终定版
/**
* @author yuye
* @version 1.0
*/
//@Component
public class LocalQueue {
/**
* 配置的任务内存队列数量
*/
private Integer queueNum;
/**
* 处理任务的队列列表
*/
private final List<BlockingQueue> operateQueue = new ArrayList<>();
/**
* 下一个任务处理队列在列表中的下标
*/
private AtomicInteger index = new AtomicInteger();
public LocalQueue(BaseLocalQueueRunner command) {
this(command, 32, 150000);
}
public LocalQueue(BaseLocalQueueRunner command, int queueNum, int capacity) {
if(queueNum <= 0 || capacity <= 0){
throw new RuntimeException("LocalQueue queueNum/capacity is not less than 0");
}
if(command == null){
throw new RuntimeException("LocalQueue command is not blank");
}
this.queueNum = queueNum;
ExecutorService executors = Executors.newFixedThreadPool(queueNum);
for (int i = 0; i< queueNum; i++){
//设置一个队列最大容纳数量
BlockingQueue blockingQueue = new ArrayBlockingQueue(capacity);
operateQueue.add(blockingQueue);
BaseLocalQueueRunner runnable = command.getInstance();
runnable.setBlockingQueue(blockingQueue);
executors.execute(runnable);
}
}
/**
* 轮询
* @param object
* @return
*/
public boolean offerByRoundRobin(Object object) {
return offerByRoundRobin(object, index.getAndIncrement() % queueNum);
}
/**
* 轮询
* @param object
* @param indexQueue 选择的queue index
* @return
*/
public boolean offerByRoundRobin(Object object, int indexQueue) {
index.compareAndSet(queueNum * 10000,0);
boolean offer = operateQueue.get(indexQueue).offer(object);
return offer;
}
/**
* 所有队列任务处理完成
* @return
*/
public boolean checkProcessFinish() {
for (BlockingQueue blockingQueue : operateQueue) {
if (blockingQueue.size() > 0) {
return false;
}
}
return true;
}
}
问题
如果想保证顺序呢?
下一次
我们打印日志的时候 有的时候 需要对日志的部分内容进行脱敏,敏感词变为 *号,对手机号和密码进行处理,或者不返回
todo
实现搜索效果 (springboot-starter 一个jar 实现这种效果)
代码地址
个人
请勿转载
转载自:https://juejin.cn/post/7268996716648235027