RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
56.Netty源代码分析-服务器初始化NioEventLoopGroup实例化

一. 代码下载

Netty代码下载和编译参考前一篇Netty文章
https://blog.51cto.com/483181/2112163

创新互联是一家集网站建设,东源企业网站建设,东源品牌网站建设,网站定制,东源网站建设报价,网络营销,网络优化,东源网站推广为一体的创新建站企业,帮助传统企业提升企业形象加强企业竞争力。可充分满足这一群体相比中小企业更为丰富、高端、多元的互联网需求。同时我们时刻保持专业、时尚、前沿,时刻以成就客户成长自我,坚持不断学习、思考、沉淀、净化自己,让我们为更多的企业打造出实用型网站。

二. 服务器代码分析

2.1 服务器代码编写

一般Netty服务器端这样编写

    EventLoopGroup bossGroup = new NioEventLoopGroup();   //1. 实例化NioEventLoopGroup对象
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); //2. 
            b.group(bossGroup, workerGroup) //3. 
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
                    }
                });

            ChannelFuture f = b.bind(port).sync(); //4.
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

2.2 NioEventLoopGroup

2.2.1 NioEventLoopGroup继承关系

一步步来看,首先看第一个注释,初始化NioEventLoopGroup对象

EventLoopGroup bossGroup = new NioEventLoopGroup();   //1. 实例化NioEventLoopGroup对象

下图是NioEventLoopGroup的类继承图,包含类成员和方法,比较详细。 这个功能是IntelliJ 自带的。
右击NioEventLoopGroup类名,选择Diagrams->Show Diagram->上面有f,m的按钮,分别对应field和method。

如下:

56. Netty源代码分析-服务器初始化 NioEventLoopGroup实例化

2.2.2 NioEventLoopGroup构造函数

public NioEventLoopGroup() {
        this(0);
    }

        public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }       

        public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

我们可以看到几点

  1. NioEventLoopGroup采用的是构造函数重载的方式,以适应不同的初始化场景
  2. Executor传的是null
  3. SelectorProvider用的是SelectorProvider.provider()
  4. 然后把构造好的参数都传给父类MultithreadEventLoopGroup (继承关系可以看上图)

2.2.3 SelectorProvider.provider()

private static SelectorProvider provider = null;

public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }

public class DefaultSelectorProvider {
    private DefaultSelectorProvider() {
    }

    public static SelectorProvider create() {
        return new KQueueSelectorProvider();
    }
}

public class KQueueSelectorProvider extends SelectorProviderImpl {
    public KQueueSelectorProvider() {
    }

    public AbstractSelector openSelector() throws IOException {
        return new KQueueSelectorImpl(this);
    }
}

这段代码我们也可以看到几点:

  1. SelectorProvider provider是一个单例,static类型
  2. SelectorProvider.provider的实现,产生了一个KQueueSelectorProvider
  3. KQueueSelectorProvider的openSelector会生成一个KQueueSelectorImpl

这个先记下来,也许后面分析会有用,继续分析MultithreadEventLoopGroup的构造函数。

2.2.4 MultithreadEventLoopGroup

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }

private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    }       

上面这段代码我们可以看到这几点:

  1. 如果我们实例化NioEventLoopGroup没有传入参数,也就是没有nThreads,那么就会采用默认的DEFAULT_EVENT_LOOP_THREADS
    DEFAULT_EVENT_LOOP_THREADS如果没有配置io.netty.eventLoopThreads的话,一般是cpu核数*2
  2. MultithreadEventLoopGroup的实例化方法是继续调用父类的初始化方法。

继续父类MultithreadEventExecutorGroup

2.2.5 MultithreadEventExecutorGroup

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        ...

        children = new EventExecutor[nThreads]; //1. 实例化children数组

        for (int i = 0; i < nThreads; i ++) { //2. 循环初始化children
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                ...
            }
        }

        chooser = chooserFactory.newChooser(children); //3. 实例化chooser

        final FutureListener terminationListener = new FutureListener() {
            @Override
            public void operationComplete(Future future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set childrenSet = new LinkedHashSet(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

上面这段代码可以从下面几个点分析:

private final EventExecutor[] children;
  1. children - EventExecutor数组,大小是nThreads,线程数目。
  2. newChild初始化
    实例类是NioEventLoopGroup.java,返回NioEventLoop对象
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

    NioEventLoop的继承关系是这样的,继承于SingleThreadEventLoop,别忘了上面我们看到NioEventLoopGroup继承自MultithreadEventLoopGroup.(看名字是单线程和多线程的区别?)

56. Netty源代码分析-服务器初始化 NioEventLoopGroup实例化

继续看NioEventLoop的构造函数

2.2.6 NioEventLoop

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);

        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
                if (DISABLE_KEYSET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }
                ...
}

从上面这段代码我们可以看出这几点

  1. NioEventLoop里面保存了SelectorProvider selectorProvider, Selector selector, unwrappedSelector(类型是KQueueSelectorImpl)
  2. selector, unwrappedSelector是通过provider.openSelector()打开的.
    根据2.3段的介绍,provider之前介绍的类型是KQueueSelectorProvider,然后它的openSelector会生成一个KQueueSelectorImpl
    所以provider.openSelector()得到是KQueueSelectorImpl,KQueueSelectorImpl的继承关系如下:

    56. Netty源代码分析-服务器初始化 NioEventLoopGroup实例化

继续往回看,看MultithreadEventExecutorGroup的构造函数。

2.2.7 newChooser

EventExecutorChooserFactory.EventExecutorChooser chooser;

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

chooser = chooserFactory.newChooser(children);

上面代码我们可以看到:

  1. chooserFactory的类型是DefaultEventExecutorChooserFactory,所以newChooser调用的是DefaultEventExecutorChooserFactory.newChooser方法。
    如下:
public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
  1. 传入的参数是children,也就是NioEventLoop数组
  2. DefaultEventExecutorChooserFactory INSTANCE是一个static final类型对象,也就是一种饿汉式的单例模式,如下:
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

继续看newChooser的实现

2.2.8 newChooser

newChooser的代码就不贴了,上面就有,从上面代码可以看到:

  1. isPowerOfTwo是用来判断一个整数是否是2的幂,比如(2,4, 8,16,32等等),它的实现方式如下:
private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }       

这种实现方法感觉比较优雅和高效,首先拿到-val,也就是val的二进制倒转,然后+1。再做&运算。
大家自己可以拿到数字举个例子,比较巧妙。后续自己写代码可以借鉴,这是读源码的一个好处,可以学习到别人很多优秀的写法。

  1. PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser的不同之处在于next方法的算法不一样,作用都是从NioEventLoop数组里面选出一个NioEventLoop对象来。

但是说实话,我没有想到这两种算法有什么区别,如果谁知道,请告诉我,谢谢。

return executors[idx.getAndIncrement() & executors.length - 1];

return executors[Math.abs(idx.getAndIncrement() % executors.length)];

继续往回走,MultithreadEventExecutorGroup的构造函数就基本看完了。

三. 总结

我们来总结下NioEventLoopGroup的实例化过程,可以得到以下几点。

1. NioEventLoopGroup的父类MultithreadEventExecutorGroup包含一个NioEventLoop数组children,数组的大小等于nThreads线程数目。如果没有指定,默认一般是cpu核数 x 2

2. NioEventLoopGroup和NioEventLoop一样都是继承自Executor,但是NioEventLoopGroup又包含多个NioEventLoop(children数组),这种关系有点像android里面ViewGroup和View的关系。或者装饰者模式?

3. NioEventLoopGroup继承自MultithreadEventLoopGroup,而NioEventLoop继承自SingleThreadEventLoop,从名字看,不知道和多线程,单线程有没有关系。

4. MultithreadEventLoopGroup有个chooser,执行next方法的时候,会选择下一个NioEventLoop对象,虽然并不知道两个chooser算法有何区别。

5. NioEventLoopGroup里面重写了newChild方法,里面实例化NioEventLoop。

6. NioEventLoop里面包含了Selector,类型是KQueueSelectorImpl

SelectorProvider provider
SelectStrategy selectStrategy

SelectStrategy这个我们上面我们没有关注,其实它是NioEventLoopGroup构造函数传进去的,如下:

public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

public final class DefaultSelectStrategyFactory implements SelectStrategyFactory {
    public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory();

    private DefaultSelectStrategyFactory() { }

    @Override
    public SelectStrategy newSelectStrategy() {
        return DefaultSelectStrategy.INSTANCE;
    }
}

final class DefaultSelectStrategy implements SelectStrategy {
    static final SelectStrategy INSTANCE = new DefaultSelectStrategy();

    private DefaultSelectStrategy() { }

    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
}

所以SelectStrategy的实现类是DefaultSelectStrategy.

在理清楚NioEventLoopGroup实例化的过程之后,我们下一篇继续按照源代码分析Netty服务器端的源代码。


当前文章:56.Netty源代码分析-服务器初始化NioEventLoopGroup实例化
标题链接:http://scpingwu.com/article/jgcpse.html