/**
 * Records the newly created transport and returns a listener that attaches an
 * {@code EchoServerStreamListener} to every stream created on it.
 *
 * @param transport the new transport; it is cast to {@code NettyServerTransport}
 * @return a listener that wires up each new stream and passes transport attributes through
 */
@Override
public ServerTransportListener transportCreated(final ServerTransport transport) {
  NettyServerTransport nettyTransport = (NettyServerTransport) transport;
  transports.add(nettyTransport);

  ServerTransportListener transportListener = new ServerTransportListener() {
    @Override
    public void streamCreated(ServerStream stream, String method, Metadata headers) {
      EchoServerStreamListener echoListener =
          new EchoServerStreamListener(stream, method, headers);
      stream.setListener(echoListener);
      // Write an empty set of headers, then request a single message from the stream.
      Metadata emptyHeaders = new Metadata();
      stream.writeHeaders(emptyHeaders);
      stream.request(1);
      streamListeners.add(echoListener);
    }

    @Override
    public Attributes transportReady(Attributes transportAttrs) {
      // The attributes are returned unchanged.
      return transportAttrs;
    }

    @Override
    public void transportTerminated() {
      // Nothing to do on termination.
    }
  };
  return transportListener;
}
/**
 * Starts the Armeria server, waits for startup to complete, and then reports a single
 * {@link ServerTransport} to the given listener.
 *
 * <p>The reported transport stops the Armeria server on either shutdown call and exposes the
 * scheduler created here.
 *
 * @param listener the listener that is told about the created transport
 * @throws IOException declared by the overridden method; not thrown directly by this body
 * @throws RuntimeException if waiting for startup is interrupted or startup fails; the original
 *     exception is kept as the cause
 */
@Override
public void start(ServerListener listener) throws IOException {
  try {
    armeriaServer.start().get();
  } catch (InterruptedException e) {
    // Restore the interrupt flag so code further up the stack can still observe the interrupt.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  } catch (ExecutionException e) {
    throw new RuntimeException(e);
  }

  scheduler = Executors.newSingleThreadScheduledExecutor();
  listener.transportCreated(new ServerTransport() {
    @Override
    public void shutdown() {
      armeriaServer.stop();
    }

    @Override
    public void shutdownNow(Status reason) {
      // The reason is not used; this behaves the same as shutdown().
      armeriaServer.stop();
    }

    @Override
    public ScheduledExecutorService getScheduledExecutorService() {
      return scheduler;
    }

    @Override
    public Future<Stats> getTransportStats() {
      // No stats are provided: the future completes immediately with null.
      return Futures.immediateFuture(null);
    }

    @Override
    public LogId getLogId() {
      return null;
    }
  });
}
@Test public void getPort() throws Exception { InetSocketAddress addr = new InetSocketAddress(0); NettyServer ns = new NettyServer( addr, NioServerSocketChannel.class, new HashMap<ChannelOption<?>, Object>(), null, // no boss group null, // no event group new ProtocolNegotiators.PlaintextNegotiator(), Collections.<ServerStreamTracer.Factory>emptyList(), TransportTracer.getDefaultFactory(), 1, // ignore 1, // ignore 1, // ignore 1, // ignore 1, // ignore 1, 1, // ignore 1, 1, // ignore true, 0); // ignore ns.start(new ServerListener() { @Override public ServerTransportListener transportCreated(ServerTransport transport) { return null; } @Override public void serverShutdown() {} }); // Check that we got an actual port. assertThat(ns.getPort()).isGreaterThan(0); // Cleanup ns.shutdown(); }
/**
 * Checks the pool's net count of handed-out schedulers around the server lifecycle: 0 before
 * {@code start}, 1 after {@code start}, and 0 again after {@code shutdown}.
 */
@Test
public void serverHoldsRefToScheduler() throws Exception {
  final ScheduledExecutorService fakeScheduler = new FakeClock().getScheduledExecutorService();

  // Always hands out the same scheduler and tracks getObject() calls minus returnObject() calls.
  class RefCountingObjectPool implements ObjectPool<ScheduledExecutorService> {
    private int count;

    @Override
    public ScheduledExecutorService getObject() {
      count += 1;
      return fakeScheduler;
    }

    @Override
    public ScheduledExecutorService returnObject(Object returned) {
      count -= 1;
      return null;
    }
  }

  RefCountingObjectPool schedulerPool = new RefCountingObjectPool();
  InProcessServer server = new InProcessServer(
      "name", schedulerPool, Collections.<ServerStreamTracer.Factory>emptyList());

  // This test does not expect any transport to be created.
  ServerListener serverListener = new ServerListener() {
    @Override
    public ServerTransportListener transportCreated(ServerTransport transport) {
      throw new UnsupportedOperationException();
    }

    @Override
    public void serverShutdown() {}
  };

  Truth.assertThat(schedulerPool.count).isEqualTo(0);

  server.start(serverListener);
  Truth.assertThat(schedulerPool.count).isEqualTo(1);

  server.shutdown();
  Truth.assertThat(schedulerPool.count).isEqualTo(0);
}
@Test(timeout = 60000) public void childChannelOptions() throws Exception { final int originalLowWaterMark = 2097169; final int originalHighWaterMark = 2097211; Map<ChannelOption<?>, Object> channelOptions = new HashMap<ChannelOption<?>, Object>(); channelOptions.put(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(originalLowWaterMark, originalHighWaterMark)); final AtomicInteger lowWaterMark = new AtomicInteger(0); final AtomicInteger highWaterMark = new AtomicInteger(0); final CountDownLatch countDownLatch = new CountDownLatch(1); InetSocketAddress addr = new InetSocketAddress(0); NettyServer ns = new NettyServer( addr, NioServerSocketChannel.class, channelOptions, null, // no boss group null, // no event group new ProtocolNegotiators.PlaintextNegotiator(), Collections.<ServerStreamTracer.Factory>emptyList(), TransportTracer.getDefaultFactory(), 1, // ignore 1, // ignore 1, // ignore 1, // ignore 1, // ignore 1, 1, // ignore 1, 1, // ignore true, 0); // ignore ns.start(new ServerListener() { @Override public ServerTransportListener transportCreated(ServerTransport transport) { Channel channel = ((NettyServerTransport)transport).channel(); WriteBufferWaterMark writeBufferWaterMark = channel.config() .getOption(ChannelOption.WRITE_BUFFER_WATER_MARK); lowWaterMark.set(writeBufferWaterMark.low()); highWaterMark.set(writeBufferWaterMark.high()); countDownLatch.countDown(); return null; } @Override public void serverShutdown() {} }); Socket socket = new Socket(); socket.connect(new InetSocketAddress("localhost", ns.getPort()), /* timeout= */ 8000); countDownLatch.await(); socket.close(); assertThat(lowWaterMark.get()).isEqualTo(originalLowWaterMark); assertThat(highWaterMark.get()).isEqualTo(originalHighWaterMark); ns.shutdown(); }
/**
 * Wraps the new transport in a {@code MockServerTransportListener}, adds that listener to
 * {@code listeners}, and returns it.
 *
 * @param transport the transport that was just created
 * @return the mock listener created for {@code transport}
 */
@Override
public ServerTransportListener transportCreated(ServerTransport transport) {
  final MockServerTransportListener transportListener =
      new MockServerTransportListener(transport);
  listeners.add(transportListener);
  return transportListener;
}
/**
 * Creates a listener for the given transport.
 *
 * @param transport the transport stored in this listener's {@code transport} field
 */
public MockServerTransportListener(ServerTransport transport) {
  this.transport = transport;
}