博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty源码分析之-Future、ChannelFuture与ChannelPromise详解(3)
阅读量:4290 次
发布时间:2019-05-27

本文共 8362 字,大约阅读时间需要 27 分钟。

对于jdk底层已经有对Future的实现,用来执行异步操作并且提供相应对结果操作的方法。但是,在netty内部也同样实现了自己的Future,并且继承了jdk中的Future接口,提供了一些额外的方法来针对在netty中相关的异步I/O操作来进行处理

jdk中的Future

该接口表示的是异步计算的结果,提供若干方法来监测计算是否完成、等待计算完成、获取计算的结果。下面举例其使用方法:

{  interface ArchiveSearcher { String search(String target); }  class App {    ExecutorService executor = ...    ArchiveSearcher searcher = ...    void showSearch(final String target)        throws InterruptedException {      Future
future = executor.submit(new Callable
() { public String call() { return searcher.search(target); }}); displayOtherThings(); // do other things while searching try { displayText(future.get()); // use future } catch (ExecutionException ex) { cleanup(); return; } } }}

future的get来获取异步计算的结果,该方法会阻塞直到计算完成

上述submit方法也可以被以下替换:

{  FutureTask
future = new FutureTask
(new Callable
() { public String call() { return searcher.search(target); }}); executor.execute(future);}

netty中的Future

public interface Future
extends java.util.concurrent.Future

下面是一些比较重要方法的定义,其中addListener方法非常重要:

  1. cause方法表示如果I/O操作失败,返回异常信息
  2. cancel方法的boolean参数表示是否对已经开始执行的操作进行中断
  3. isSuccess方法表示I/O操作是否已经成功的完成。对于上述jdk中Future申明的isDone方法,只能知道I/O是否结束,有可能是成功完成、被取消、异常中断。netty中Future的此isSuccess方法能够很好的判断出操作是否正真地成功完成
  4. sync方法阻塞直到future完成操作,如果操作失败会重新抛出异常
  5. addListener方法会添加特定的监听器到future,这些监听器会在future isDone返回true的时候立刻被通知。这是netty中很重要的扩展方法,这里用到了

addListener方法传入的监听器会实现以下接口,也就是被通知的时候operationComplete方法会被调用:

public interface GenericFutureListener
> extends EventListener {
/** * Invoked when the operation associated with the {@link Future} has been completed. * * @param future the source {@link Future} which called this callback */ void operationComplete(F future) throws Exception;}

为什么future中有get方法来获取异步的结果,这里又扩展了监听器这种方法。如果使用get,我们会考虑到底在什么时候使用,因为该方法会阻塞后续逻辑代码,如果我们使用监听器,毫无疑问,会更加优雅地在合理的时间来处理我们的逻辑代码


ChannelFuture

netty中的ChannelFuture继承来netty中的自己的Future

public interface ChannelFuture extends Future

ChannelFuture表示Channel中异步I/O操作的结果,在netty中所有的I/O操作都是异步的,I/O的调用会直接返回,可以通过ChannelFuture来获取I/O操作的结果状态。对于多种状态的表示如下:

+---------------------------+ *                                      | Completed successfully    | *                                      +---------------------------+ *                                 +---->      isDone() = true      | * +--------------------------+    |    |   isSuccess() = true      | * |        Uncompleted       |    |    +===========================+ * +--------------------------+    |    | Completed with failure    | * |      isDone() = false    |    |    +---------------------------+ * |   isSuccess() = false    |----+---->      isDone() = true      | * | isCancelled() = false    |    |    |       cause() = non-null  | * |       cause() = null     |    |    +===========================+ * +--------------------------+    |    | Completed by cancellation | *                                 |    +---------------------------+ *                                 +---->      isDone() = true      | *                                      | isCancelled() = true      | *                                      +---------------------------+

需要注意的是failure和cancellation都会表示操作完成,但是对应的状态是不同的。与Future类似,可以通过添加ChannelFutureListener监听器,当I/O操作完成的时候来通知调用。相比于wait()方式也更推荐这种方式来获取结果状态或者执行后续操作。

此外,不建议在ChannelHandler中调用await(),因为ChannelHandler中事件驱动的方法被一个I/O线程调用,可能一直不回完成,那么await()也可能被I/O线程调用,同样会一直block,因此会产生死锁。例如:

//永远不要这样做 * // BAD - NEVER DO THIS * public void channelRead({
@link ChannelHandlerContext} ctx, Object msg) { * {
@link ChannelFuture} future = ctx.channel().close(); * future.awaitUninterruptibly(); * // Perform post-closure operation * // ... * } //而应该这样做: * // GOOD * public void channelRead({
@link ChannelHandlerContext} ctx, Object msg) { * {
@link ChannelFuture} future = ctx.channel().close(); * future.addListener(new {
@link ChannelFutureListener}() { * public void operationComplete({
@link ChannelFuture} future) { * // Perform post-closure operation * // ... * } * }); * }

对于I/O超时和await()超时的区别:

//永远不要这样做 * // BAD - NEVER DO THIS * {@link Bootstrap} b = ...; * {@link ChannelFuture} f = b.connect(...); * f.awaitUninterruptibly(10, TimeUnit.SECONDS); * if (f.isCancelled()) { *     // Connection attempt cancelled by user * } else if (!f.isSuccess()) { *     // You might get a NullPointerException here because the future *     // might not be completed yet. *     f.cause().printStackTrace(); * } else { *     // Connection established successfully * }//当awaitUninterruptibly也就是await超时之后,ChannelFuture对应的连接是可能没有完成,那么执行后续的操作就会异常 //而应该这样做 * // GOOD * {@link Bootstrap} b = ...; * // Configure the connect timeout option. * b.option({@link ChannelOption}.CONNECT_TIMEOUT_MILLIS, 10000); * {@link ChannelFuture} f = b.connect(...); * f.awaitUninterruptibly(); * * // Now we are sure the future is completed. * assert f.isDone(); * * if (f.isCancelled()) { *     // Connection attempt cancelled by user * } else if (!f.isSuccess()) { *     f.cause().printStackTrace(); * } else { *     // Connection established successfully * } //当通过option的方式添加超时时间,如果超时则会被当做failure结果返回,同时再调用awaitUninterruptibly的时候一定是future已经操作完成

ChannelFuture中需要注意的是添加了channel方法来获取Channel:

/**     * Returns a channel where the I/O operation associated with this     * future takes place.     */    Channel channel();

JDK所提供的Future只能通过手工方式检查执行结果,而这个操作是会阻塞的;Netty则对ChannelFuture进行来增强,通过ChannelFutureListener以回调的方式来获取执行结果,去除来手工检查阻塞的操作。需要注意的是ChannelFutureListener的operationComplete方法是由I/O线程执行的,因此要注意的是不要在这里执行耗时操作,否则需要通过另外的线程或线程池来执行

ChannelPromise

ChannelPromise是一种可写的特殊ChannelFuture

public interface ChannelPromise extends ChannelFuture, Promise

对于Promise:

public interface Promise
extends Future

定义了可以标识Future成功或者失败的方法,并且每一个Future只能够被标识一次,如果成功将会去通知之前所定义的listeners

/**     * Marks this future as a success and notifies all     * listeners.     *     * If it is success or failed already it will throw an {
@link IllegalStateException}. */ Promise
setSuccess(V result); /** * Marks this future as a success and notifies all * listeners. * * @return {
@code true} if and only if successfully marked this future as * a success. Otherwise {
@code false} because this future is * already marked as either a success or a failure. */ boolean trySuccess(V result); /** * Marks this future as a failure and notifies all * listeners. * * If it is success or failed already it will throw an {
@link IllegalStateException}. */ Promise
setFailure(Throwable cause); /** * Marks this future as a failure and notifies all * listeners. * * @return {
@code true} if and only if successfully marked this future as * a failure. Otherwise {
@code false} because this future is * already marked as either a success or a failure. */ boolean tryFailure(Throwable cause); /** * Make this future impossible to cancel. * * @return {
@code true} if and only if successfully marked this future as uncancellable or it is already done * without being cancelled. {
@code false} if this future has been cancelled already. */ boolean setUncancellable();

在DefaultChannelPromise默认实现中,当表示为成功时会通知相应listeners

@Override    public ChannelPromise setSuccess(Void result) {        super.setSuccess(result);        return this;    }

在setSuccess方法中:

private void notifyListenersNow() {        ...        for (;;) {            if (listeners instanceof DefaultFutureListeners) {                notifyListeners0((DefaultFutureListeners) listeners);            } else {                notifyListener0(this, (GenericFutureListener
) listeners); } synchronized (this) { if (this.listeners == null) { // Nothing can throw from within this method, so setting notifyingListeners back to false does not // need to be in a finally block. notifyingListeners = false; return; } listeners = this.listeners; this.listeners = null; } } }

转载地址:http://aqrgi.baihongyu.com/

你可能感兴趣的文章
netty源码分析之-Future、ChannelFuture与ChannelPromise详解(3)
查看>>
redis主从集群的搭建
查看>>
redis cluster集群搭建与深入分析(1)
查看>>
netty源码分析之-引导详解(4)
查看>>
redis cluster节点的添加与删除(2)
查看>>
nginx+redis+tomcat三级缓存架构讲解
查看>>
Reactor模式详解
查看>>
基于OpenRestry部署nginx+lua实现流量定向分发
查看>>
netty源码分析之-服务端启动核心源码分析(5)
查看>>
Storm并行度和流分组详解
查看>>
缓存数据预热详解
查看>>
热点数据降级详解(storm+nginx+lua)
查看>>
加载更多功能实现
查看>>
React相关Dom约束性和非约束性操作
查看>>
Hystrix高可用架构介绍
查看>>
netty源码分析之-SimpleChannelInboundHandler与ChannelInboundHandlerAdapter详解(6)
查看>>
netty源码分析之-开发过程中重要事项分析(7)
查看>>
Sublime Text3插件详解
查看>>
netty源码分析之-ByteBuf详解(8)
查看>>
javascript函数定义三种方式详解
查看>>