How can an RxJava Observable subscription be cancelled when onComplete is called?

站长 (site admin)

Example:

table.subscribe(tableIns -> {
            // System.out.println("-------Information-------");
            System.out.println(tableIns);
        }, throwable -> {
            throw new SchemaExportException(throwable);
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("Complete");
                // Cancel the subscription here
            }
        });

Note: this runs in a non-Android environment and uses RxJava 2.x.

1 Answer
test
2024-06-25

For now, the only approach I have found is to call CompletableFuture.complete inside onComplete to notify the caller that the stream has finished. Example:

The Flowable is returned by persistence-layer methods; in the caller these are result.getAll(dbName.get(), strategy) and result.getTableColumn(table).

public class ConsoleSchemaFlowableOutput implements SchemaFlowableOutput {
    private final static Logger logger = LoggerFactory.getLogger(ConsoleSchemaFlowableOutput.class);
    private volatile CompletableFuture<String> future = new CompletableFuture<>();
    private AtomicInteger count = new AtomicInteger(0);
    @Override
    public Disposable flush(Information information, Flowable<Table> table) throws SchemaExportException {
        logger.info("Start Flowable Flush");
        Disposable export_flush_complete = table.subscribe(tableIns -> {
            System.out.println(printAsciiTable(tableIns));
            System.out.println(printAsciiColumns(tableIns.getColumns()));
            System.out.println("\r\n");
            count.addAndGet(1);
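        }, throwable -> {
            logger.debug("Export Break, reason: " + throwable.getMessage());
            // Cancel the future so the waiting caller stops polling
            future.cancel(true);
            // Note: an exception thrown from this onError callback does not reach the code that called subscribe();
            // RxJava 2.x routes it to RxJavaPlugins.onError instead.
            throw new SchemaExportException(throwable);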
        }, new Action() {
            @Override
            public void run() throws Exception {
                logger.debug("Export Complete, Affect Size:"+count.get());
                future.complete("OK");
            }
        });
        return export_flush_complete;
    }
    @Override
    public CompletableFuture<String> getFuture() {
        return future;
    }
    ...
}
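
A side note on the Disposable returned by flush above: as far as I know, in RxJava 2.x the lambda-based subscribe overloads mark their Disposable as disposed just before the onComplete action runs, so an explicit dispose() after completion should be a no-op. The sketch below tries to show that, and also shows blockingSubscribe as one way to wait for completion on the calling thread. The class name AutoDisposeDemo, the sample items, and the 50 ms delay are made up for illustration and are not part of the original code.

```java
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AutoDisposeDemo {

    // Subscribes to an asynchronous Flowable, waits for the terminal event,
    // and reports whether the returned Disposable is already disposed.
    static boolean disposedAfterComplete() {
        CountDownLatch done = new CountDownLatch(1);
        Disposable d = Flowable.just("t1", "t2")
                .delay(50, TimeUnit.MILLISECONDS)   // items arrive on another thread
                .subscribe(
                        item -> { },                // onNext: ignore items
                        error -> done.countDown(),  // onError: release the waiter
                        done::countDown);           // onComplete: release the waiter
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return d.isDisposed();
    }

    // Consumes the same kind of Flowable on the calling thread;
    // blockingSubscribe returns only after a terminal event.
    static List<String> collectBlocking() {
        List<String> seen = new ArrayList<>();
        Flowable.just("t1", "t2")
                .delay(50, TimeUnit.MILLISECONDS)
                .blockingSubscribe(
                        seen::add,
                        error -> seen.add("error"),
                        () -> seen.add("complete"));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(disposedAfterComplete()); // prints true
        System.out.println(collectBlocking());       // prints [t1, t2, complete]
    }
}
```

If blocking the calling thread is acceptable, as it appears to be in the caller below, blockingSubscribe would remove the need for both the CompletableFuture and the manual dispose() call.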

Caller:

    public void export(Information info, SchemaFlowableOutput out) throws SchemaExportException{
        long startStamp = System.currentTimeMillis();
        // Flowable
        Flowable<Table> tableFlowable = result.getAll(dbName.get(), strategy).flatMap(new Function<Table, Publisher<Table>>() {
            @Override
            public Publisher<Table> apply(@NonNull Table table) throws Exception {
                return result.getTableColumn(table).flatMap(new Function<List<Column>, SingleSource<Table>>() {
                    @Override
                    public SingleSource<Table> apply(@NonNull List<Column> columns) throws Exception {
                        return Single.just(table.fillColumn(columns));
                    }
                }).flatMapPublisher(new Function<Table, Publisher<? extends Table>>() {
                    @Override
                    public Publisher<? extends Table> apply(@NonNull Table table) throws Exception {
                        return Flowable.just(table);
                    }
                });
            }
        });
        Disposable disposable = null;
        try {
            disposable = out.flush(info, tableFlowable);
            CompletableFuture<String> future = out.getFuture();
            // Poll until the subscriber signals completion (or cancellation) through the future
            while (!future.isDone()) {
                logger.info("[ERE-Flowable] Not finished yet, sleeping for 1 second");
                Thread.sleep(1000);
            }
            // Named "outcome" so it does not shadow the "result" field used above
            String outcome = future.get();
            logger.info("[ERE-Flowable] Finished, result: " + outcome);
            if ("OK".equals(outcome)) {
                long finishStamp = System.currentTimeMillis();
                clearHander(disposable, "[ERE-Flowable]RxJava disposed because complete, WithTime: "+(finishStamp-startStamp));
            }
        }catch (Exception e){
            clearHander(disposable, "[ERE-Flowable]RxJava disposed has Exception: "+e.getMessage());
        }
    }

    private void clearHander(Disposable disposable, String reason){
        logger.info(reason);
        if(null!=disposable && !disposable.isDisposed()) {
            disposable.dispose();
        }else{
            if(null != disposable) {
                logger.info("[CH]disposable status:" + disposable.isDisposed());
            }else{
                logger.info("[CH]disposable is null:");
            }
        }
        // Callback invoked after finishing; performs some cleanup work
        completeHandler.apply();
    }
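
The polling loop in export above could probably be replaced by a single timed wait on the future. Below is a minimal JDK-only sketch of that idea. It assumes the producer reports failures with completeExceptionally rather than cancel. FutureWaitDemo, startExport, and the 5-second timeout are hypothetical stand-ins, with a plain thread playing the role of the Flowable subscriber.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureWaitDemo {

    // Hypothetical stand-in for out.flush(...): a worker thread completes
    // the future normally or exceptionally after a short delay.
    static CompletableFuture<String> startExport(boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(100);
                if (fail) {
                    future.completeExceptionally(new IllegalStateException("export failed"));
                } else {
                    future.complete("OK");
                }
            } catch (InterruptedException e) {
                future.completeExceptionally(e);
            }
        });
        worker.setDaemon(true);
        worker.start();
        return future;
    }

    // Waits for the result without a sleep loop; get(timeout) blocks until
    // the future is done or the timeout elapses.
    static String await(CompletableFuture<String> future) {
        try {
            return future.get(5, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            return "FAILED: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "TIMEOUT";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        }
    }

    public static void main(String[] args) {
        System.out.println(await(startExport(false))); // prints OK
        System.out.println(await(startExport(true)));  // prints FAILED: export failed
    }
}
```

Compared with isDone() plus sleep, the timed get returns as soon as the future completes and keeps the original cause available when the export fails.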