@Override protected void initChannel(SocketChannel ch) throws Exception { for (String name : handlers.keySet()) { /* 有些handler不是线程安全的,不能在多个channel件共享 */ Class<? extends ChannelHandler> clazz = handlers.get(name); ChannelHandler handler = clazz.newInstance(); /* 通过为每个handler指定concurrency提供SEDA特性支持 */ /* 不设置和值小与0为关闭,在没有数据分析支撑的情况下不要打开, 过多的线程切换会降低性能 */ int concurrency = AppSettings.getInstance() .getInt(String.format("ctu.upns.node.server.channel.handlers.%s.concurrency", name)); if (concurrency > 0) { ch.pipeline().addLast(new LocalEventLoopGroup(concurrency), name, handler); } else { ch.pipeline().addLast(name, handler); } } logger.info("init channel:[remote={},local:{}] with handlers:[{}]", ch.remoteAddress(), ch.localAddress(), ch.pipeline().toMap()); }
ServerBootstrap getLocalServerBootstrap() { EventLoopGroup serverGroup = new LocalEventLoopGroup(); ServerBootstrap sb = new ServerBootstrap(); sb.group(serverGroup); sb.channel(LocalServerChannel.class); sb.childHandler(new ChannelInitializer<LocalChannel>() { @Override public void initChannel(LocalChannel ch) throws Exception { } }); return sb; }
Bootstrap getLocalClientBootstrap() { EventLoopGroup clientGroup = new LocalEventLoopGroup(); Bootstrap cb = new Bootstrap(); cb.channel(LocalChannel.class); cb.group(clientGroup); cb.handler(loggingHandler); return cb; }
Bootstrap getLocalClientBootstrap() { EventLoopGroup clientGroup = new LocalEventLoopGroup(); Bootstrap cb = new Bootstrap(); cb.channel(LocalChannel.class); cb.group(clientGroup); cb.handler(this.loggingHandler); return cb; }
@Test public void testStartWithSyncCompletingActions() throws Exception { long now = System.currentTimeMillis(); PluginContext pctx = PluginContext.createLocal("plugin1"); final EventLoopGroup pluginEventLoop = new LocalEventLoopGroup(3); EventLoopExecutor pluginExecutor = new EventLoopExecutor() { @Override public Future executeInEventLoop(Runnable runnable) { return pluginEventLoop.submit(runnable); } }; List<Action> actions = new ArrayList<>(); MockImmediateCompleteAction a1 = new MockImmediateCompleteAction(pctx, new MockActionExecutionContext(), pluginExecutor); MockImmediateCompleteAction a2 = new MockImmediateCompleteAction(pctx, new MockActionExecutionContext(), pluginExecutor); MockImmediateCompleteAction a3 = new MockImmediateCompleteAction(pctx, new MockActionExecutionContext(), pluginExecutor); actions.add(a1); actions.add(a2); actions.add(a3); CompositeAction ca = new CompositeAction(actions); Job job = new Job(ca, 2000, now); job.start(); Thread.sleep(1000); assertTrue(a1.isOnStartCalled()); assertTrue(a2.isOnStartCalled()); assertTrue(a3.isOnStartCalled()); assertFalse(job.isInProgress()); assertTrue(job.isComplete()); }
@Test public void shouldShowUnmanagedCustomResourcesInEnvDump() { //create an environment with a custom IOPool and Scheduler that are not cleaned up on shutdown DefaultCoreEnvironment env = DefaultCoreEnvironment.builder() .ioPool(new LocalEventLoopGroup()) .scheduler(Schedulers.newThread()).build(); String dump = env.dumpParameters(new StringBuilder()).toString(); assertTrue(dump, dump.contains("LocalEventLoopGroup!unmanaged")); assertTrue(dump, dump.contains("NewThreadScheduler!unmanaged")); }
protected LocalEventLoopGroup load() { return new LocalEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Local Server IO #%d").setDaemon(true).build()); }
protected LocalEventLoopGroup load() { return new LocalEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Local Client IO #%d").setDaemon(true).build()); }
@Test(timeout = 10000) public void testBindDeadLock() throws Exception { EventLoopGroup groupA = new LocalEventLoopGroup(1); EventLoopGroup groupB = new LocalEventLoopGroup(1); try { ChannelInboundHandler dummyHandler = new DummyHandler(); final Bootstrap bootstrapA = new Bootstrap(); bootstrapA.group(groupA); bootstrapA.channel(LocalChannel.class); bootstrapA.handler(dummyHandler); final Bootstrap bootstrapB = new Bootstrap(); bootstrapB.group(groupB); bootstrapB.channel(LocalChannel.class); bootstrapB.handler(dummyHandler); List<Future<?>> bindFutures = new ArrayList<Future<?>>(); // Try to bind from each other. for (int i = 0; i < 1024; i ++) { bindFutures.add(groupA.next().submit(new Runnable() { @Override public void run() { bootstrapB.bind(LocalAddress.ANY); } })); bindFutures.add(groupB.next().submit(new Runnable() { @Override public void run() { bootstrapA.bind(LocalAddress.ANY); } })); } for (Future<?> f: bindFutures) { f.sync(); } } finally { groupA.shutdownGracefully(); groupB.shutdownGracefully(); groupA.terminationFuture().sync(); groupB.terminationFuture().sync(); } }
@Test(timeout = 10000) public void testConnectDeadLock() throws Exception { EventLoopGroup groupA = new LocalEventLoopGroup(1); EventLoopGroup groupB = new LocalEventLoopGroup(1); try { ChannelInboundHandler dummyHandler = new DummyHandler(); final Bootstrap bootstrapA = new Bootstrap(); bootstrapA.group(groupA); bootstrapA.channel(LocalChannel.class); bootstrapA.handler(dummyHandler); final Bootstrap bootstrapB = new Bootstrap(); bootstrapB.group(groupB); bootstrapB.channel(LocalChannel.class); bootstrapB.handler(dummyHandler); List<Future<?>> bindFutures = new ArrayList<Future<?>>(); // Try to connect from each other. for (int i = 0; i < 1024; i ++) { bindFutures.add(groupA.next().submit(new Runnable() { @Override public void run() { bootstrapB.connect(LocalAddress.ANY); } })); bindFutures.add(groupB.next().submit(new Runnable() { @Override public void run() { bootstrapA.connect(LocalAddress.ANY); } })); } for (Future<?> f: bindFutures) { f.sync(); } } finally { groupA.shutdownGracefully(); groupB.shutdownGracefully(); groupA.terminationFuture().sync(); groupB.terminationFuture().sync(); } }
@Test public void testStartWithAsyncCompletingActions() throws Exception { long now = System.currentTimeMillis(); PluginContext pctx = PluginContext.createLocal("plugin1"); final EventLoopGroup pluginEventLoop = new LocalEventLoopGroup(3); EventLoopExecutor pluginExecutor = new EventLoopExecutor() { @Override public Future executeInEventLoop(Runnable runnable) { return pluginEventLoop.submit(runnable); } }; List<Action> actions = new ArrayList<>(); MockEventCompleteAction a1 = new MockEventCompleteAction(pctx, new MockActionExecutionContext(), pluginExecutor); MockEventCompleteAction a2 = new MockEventCompleteAction(pctx, new MockActionExecutionContext(), pluginExecutor); MockEventCompleteAction a3 = new MockEventCompleteAction(pctx, new MockActionExecutionContext(), pluginExecutor); actions.add(a1); actions.add(a2); actions.add(a3); CompositeAction ca = new CompositeAction(actions); Job job = new Job(ca, 2000, now); job.start().sync(); assertTrue(a1.isOnStartCalled()); assertFalse(a2.isOnStartCalled()); assertTrue(job.isInProgress()); assertFalse(job.isComplete()); job.message("complete", null).sync(); Thread.sleep(500); assertTrue(a2.isOnStartCalled()); assertTrue(job.isInProgress()); assertFalse(job.isComplete()); job.message("complete", null).sync(); Thread.sleep(500); assertTrue(a3.isOnStartCalled()); assertTrue(job.isInProgress()); assertFalse(job.isComplete()); job.message("complete", null).sync(); Thread.sleep(500); assertFalse(job.isInProgress()); assertTrue(job.isComplete()); }
protected LocalEventLoopGroup a() { return new LocalEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Local Server IO #%d").setDaemon(true).build()); }
protected LocalEventLoopGroup a() { return new LocalEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Local Client IO #%d").setDaemon(true).build()); }