Reactive-MongoDB异步Java Driver解读
publicclassObservableSubscriberimplementsSubscriber{ //响应数据 privatefinalList received; //错误信息 privatefinalList errors; //等待对象 privatefinalCountDownLatch latch; //订阅器 privatevolatileSubscription subscription; //是否完成 privatevolatileboolean completed; publicObservableSubscriber(){ this.received =newArrayList(); this.errors =newArrayList(); this.latch =newCountDownLatch(1); } @Override publicvoid onSubscribe(finalSubscription s){ subscription = s; } @Override publicvoid onNext(final T t){ received.add(t); } @Override publicvoid onError(finalThrowable t){ errors.add(t); onComplete(); } @Override publicvoid onComplete(){ completed =true; latch.countDown(); } publicSubscription getSubscription(){ return subscription; } publicList getReceived(){ return received; } publicThrowable getError(){ if(errors.size()>0){ return errors.get(0); } returnnull; } publicboolean isCompleted(){ return completed; } /** * 阻塞一定时间等待结果 * * @param timeout * @param unit * @return * @throws Throwable */ publicListget(finallong timeout,finalTimeUnit unit)throwsThrowable{ return await(timeout, unit).getReceived(); } /** * 一直阻塞等待请求完成 * * @return * @throws Throwable */ publicObservableSubscriber await()throwsThrowable{ return await(Long.MAX_VALUE,TimeUnit.MILLISECONDS); } /** * 阻塞一定时间等待完成 * * @param timeout * @param unit * @return * @throws Throwable */ publicObservableSubscriber await(finallong timeout,finalTimeUnit unit)throwsThrowable{ subscription.request(Integer.MAX_VALUE); if(!latch.await(timeout, unit)){ thrownewMongoTimeoutException("Publisher onComplete timed out"); } if(!errors.isEmpty()){ throw errors.get(0); } returnthis; }} 借助这个基础的工具类,我们对于文档的异步操作就变得简单多了。 比如对于文档查询的操作可以改造如下: (编辑:应用网_阳江站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |