在基于服务器端的网络编程中,常见的一种线程模式是基于线程池的模式,一个线程对应对应一个客户端的连接,该线程处理关联客户端的各类事件,并且使用线程池化方案复用线程。另一种是基于NIO模式,使用基于运行服务配置的线程个数,一个线程绑定一个Selector选择器,监听绑定到该选择器的通道的各类事件。详细过程可以参考为什么要使用NIO?Tomcat是如何解决服务器端高并发的请求。 在基于Netty的网络编程中,Channel定义了网络通信的两端,例如,客户端SocketChannel与服务端SeverSocketChannel。EventLoop定义了通信Channel触发的各类事件的监听,例如,客户端连接请求。EventLoopGroup定义了一组EventLoop。通常,EventLoop与一个Thread绑定,EventLoop与Channel绑定后,两者关联关系就不会改变。以服务端接收监听客户端的为例,执行流程如下: Netty中基于线程池的线程模式是OioEventLoopGroup。当服务器端使用该种线程模式时,当客户端有请求连接时,OioEventLoopGroup会生成一个EventLoop与之关联,并且使用集合缓存生成的EventLoop对象。当EventLoop对象生成的个数大于集合设置的限制大小就会抛出太多客户端连接异常(tooManyChannels),其流程如下: 当使用OioEventLoopGroup模式定义服务器端时,当监听到客户端连接时,需要通过OioEventLoopGroup注册客户端连接SocketChannel,OioEventLoopGroup部分源码如下: 当使用ThreadPerChannelEventLoop时,当绑定的SocketChannel触发各类事件时,都有ThreadPerChannelEventLoop对应的线程处理,ThreadPerChannelEventLoop源码如下: Netty中基于NIO的线程模式是NioEventLoopGroup。当服务器端采用NioEventLoopGroup线程模式时,在初始化NioEventLoopGroup时,必须初始化NioEventLoop数组的大小,一般会执行与服务器集器拥有的线程数个数的线程,提高接收客户端连接的效率,例如,在dubbo中NettyServer初始化时,其设置的线程组大小为Math.min(Runtime.getRuntime().availableProcessors() + 1, 32);。当客户端发起连接请求时,根据策略获取NioEventLoop数组中的一个NioEventLoop的Selector选择器进行绑定,注册多个SocketChannel。这样,就可以通过单个NioEventLoop监听绑定的所有定的SocketChannel触发的事件了,其执行流程如下: NioEventLoopGroup初始化时,需要指定NioEventLoop数组的大小,其源码如下: 当客户端连接时,需要通过EventExecutorChooserFactory.EventExecutorChooser chooser,选择NioEventLoop中的Selector绑定客户端连接SocketChannel。其源码如下: 使用案例之dubbo服务发布 NioEventLoopGroup在使用时,在服务器端,通常使用parentGroup(NioEventLoopGroup只初始化一个NioEventLoop)绑定ServerSocketChannel服务端SocketChannel,监听客户端连接请求。使用childGroup(NioEventLoopGroup初始化配置个数的数组)绑定客户端连接请求SocketChannel,监听客户端的各类事件。在dubbo中发布服务使用Netty作为服务器时,使用NettyServer配置NioEventLoopGroup,childGroup数组的默认配置大小为Math.min(Runtime.getRuntime().availableProcessors() + 1, 32);,其部分源码如下: OioEventLoopGroup模式
/** * 注册客户端连接Channel */ // ThreadPerChannelEventLoopGroup.class @Override public ChannelFuture register(Channel channel) { if (channel == null) { throw new NullPointerException("channel"); } try { // 从activeChildren(活跃的EventLoop集合)获取或者新创建EventLoop EventLoop l = nextChild(); return l.register(new DefaultChannelPromise(channel, l)); } catch (Throwable t) { return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t); } } /** * 从activeChildren(活跃的EventLoop集合)获取或者新创建EventLoop */ // ThreadPerChannelEventLoopGroup.class private EventLoop nextChild() throws Exception { if (shuttingDown) { throw new RejectedExecutionException("shutting down"); } // 从空闲集合中取出,如果有空闲的直接转到活跃的EventLoop数组 EventLoop loop = idleChildren.poll(); if (loop == null) { if (maxChannels > 0 && activeChildren.size() >= maxChannels) { throw tooManyChannels; } // 新创建EventLoop loop = newChild(childArgs); loop.terminationFuture().addListener(childTerminationListener); } activeChildren.add(loop); return loop; } /** * ThreadPerChannelEventLoop即一个线程对应一个客户端连接Channel */ protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) throws Exception { return new ThreadPerChannelEventLoop(this); }
/** * 异步执行 */ // SingleThreadEventExecutor.class @Override protected void run() { for (;;) { // 获取SocketChannel触发事件需要执行的任务 Runnable task = takeTask(); if (task != null) { // 在当前线程中执行 task.run(); updateLastExecutionTime(); } Channel ch = this.ch; if (isShuttingDown()) { if (ch != null) { ch.unsafe().close(ch.unsafe().voidPromise()); } if (confirmShutdown()) { break; } } else { if (ch != null) { // Handle deregistration if (!ch.isRegistered()) { runAllTasks(); deregister(); } } } } } /** * 从taskQueue任务队列中取出需要执行的任务 */ // SingleThreadEventExecutor.class protected Runnable takeTask() { assert inEventLoop(); if (!(taskQueue instanceof BlockingQueue)) { throw new UnsupportedOperationException(); } BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue; for (;;) { ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); if (scheduledTask == null) { Runnable task = null; try { task = taskQueue.take(); if (task == WAKEUP_TASK) { task = null; } } catch (InterruptedException e) { // Ignore } return task; } else { long delayNanos = scheduledTask.delayNanos(); Runnable task = null; if (delayNanos > 0) { try { task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { // Waken up. return null; } } if (task == null) { // We need to fetch the scheduled tasks now as otherwise there may be a chance that // scheduled tasks are never executed if there is always one task in the taskQueue. // This is for example true for the read task of OIO Transport // See https://github.com/netty/netty/issues/1614 fetchFromScheduledTaskQueue(); task = taskQueue.poll(); } if (task != null) { return task; } } } }
NioEventLoopGroup模式
/** * NioEventLoopGroup父类初始化逻辑 */ // MultithreadEventExecutorGroup.class protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } // NioEventLoop数组初始化 children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 初始化EventLoop children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // 设置 NioEventLoop数组获取NioEventLoop的选择策略 chooser = chooserFactory.newChooser(children); // 监听事件 final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); } /** * 初始化EventLoop 逻辑 */ // NioEventLoopGroup.class @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
/** *注册客户端连接Channel **/ // MultithreadEventLoopGroup.class @Override public ChannelFuture register(Channel channel) { return next().register(channel); } /** *实际注册客户端连接Channel **/ // NioEventLoop.class public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) { if (ch == null) { throw new NullPointerException("ch"); } if (interestOps == 0) { throw new IllegalArgumentException("interestOps must be non-zero."); } if ((interestOps & ~ch.validOps()) != 0) { throw new IllegalArgumentException( "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')'); } if (task == null) { throw new NullPointerException("task"); } if (isShutdown()) { throw new IllegalStateException("event loop shut down"); } try { // 可选择的客户端连接SelectableChannel,注册到NioEventLoop中的Selector选择器 ch.register(selector, interestOps, task); } catch (Exception e) { throw new EventLoopException("failed to register a channel", e); } }
/** * 启动Netty服务端监听 */ // NettyServer.class @Override protected void doOpen() throws Throwable { bootstrap = new ServerBootstrap(); // 初始化包含一个NioEventLoop对象的eventLoopGroup,用于监听ServerSocketChannel的客户端连接 bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss"); // 默认情况下创建Math.min(Runtime.getRuntime().availableProcessors() + 1, 32);个数的NioEventLoop,用于绑定客户端连接SocketChannel,并监听客户端连接的各类事件 workerGroup = NettyEventLoopFactory.eventLoopGroup( getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), "NettyServerWorker"); final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels(); bootstrap.group(bossGroup, workerGroup) .channel(NettyEventLoopFactory.serverSocketChannelClass()) .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // 配置客户端连接的一组拦截器 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // FIXME: should we use getTimeout()? int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); if (getUrl().getParameter(SSL_ENABLED_KEY, false)) { ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler)); } ch.pipeline() .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler); } }); // bind ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); }
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算