/** * 初始化连接池 */ public void init() { bootstrap = new Bootstrap(); eventLoopGroup = new NioEventLoopGroup(); bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .handler(new LoggingHandler()); //所有的公用一个eventloopgroup, 对于客户端来说应该问题不大! poolMap = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() { @Override protected FixedChannelPool newPool(InetSocketAddress key) { return new FixedChannelPool(bootstrap.remoteAddress(key), new FixedChannelPoolHandler(), 2); } }; //预先建立好链接 serverListConfig.getAddressList().stream().forEach(address -> { poolMap.get(address); }); }
/** * @return */ private FixedChannelPool choose() { counter.incrementAndGet(); AbstractChannelPoolMap temp = ((AbstractChannelPoolMap) poolMap); int size = temp.size(); int index = counter.get() % size; Iterator<Map.Entry<InetSocketAddress, FixedChannelPool>> it = temp.iterator(); int i = 0; while (it.hasNext()) { i++; if (i == index) { return it.next().getValue(); } } return null; }
/** * Create a capped {@link PoolResources} to provide automatically for {@link * ChannelPool}. * <p>A Fixed {@link PoolResources} will open up to the given max connection value. * Further connections will be pending acquisition indefinitely. * * @param name the channel pool map name * @param maxConnections the maximum number of connections before starting pending * @param acquireTimeout the maximum time in millis to wait for aquiring * * @return a new {@link PoolResources} to provide automatically for {@link * ChannelPool} */ static PoolResources fixed(String name, int maxConnections, long acquireTimeout) { if (maxConnections == -1) { return elastic(name); } if (maxConnections <= 0) { throw new IllegalArgumentException("Max Connections value must be strictly " + "positive"); } if (acquireTimeout != -1L && acquireTimeout < 0) { throw new IllegalArgumentException("Acquire Timeout value must " + "be " + "positive"); } return new DefaultPoolResources(name, (bootstrap, handler, checker) -> new FixedChannelPool(bootstrap, handler, checker, FixedChannelPool.AcquireTimeoutAction.FAIL, acquireTimeout, maxConnections, Integer.MAX_VALUE )); }
FastdfsPool(Bootstrap bootstrap, long readTimeout, long idleTimeout, int maxConnPerHost) { this.channelPool = new FixedChannelPool( bootstrap, new FastdfsPoolHandler(readTimeout, idleTimeout), maxConnPerHost ); }
@SuppressWarnings("unchecked") public void close() { Iterator<Map.Entry<InetSocketAddress, FixedChannelPool>> it = ((AbstractChannelPoolMap) poolMap).iterator(); while (it.hasNext()) { it.next().getValue().close(); } eventLoopGroup.shutdownGracefully(); }
public NettyHttpClient(ClientConfiguration configuration) { ThreadGroup threadGroup = new ThreadGroup("Netty RxS3 client"); AtomicInteger threadCounter = new AtomicInteger(); ThreadFactory threadFactory = r -> new Thread(threadGroup, r, "RxS3-client-worker" + threadCounter.getAndIncrement()); group = new NioEventLoopGroup(configuration.getWorkerThreadCount(), threadFactory); String[] s3LocationArray = configuration.getS3Location().trim().split(":"); s3Location = s3LocationArray[0]; int port = 80; if (s3LocationArray.length == 2) { port = Integer.parseInt(s3LocationArray[1]); } demultiplexer = new HandlerDemultiplexer(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.getConnectionTimeoutMillis()) .channel(NioSocketChannel.class) .remoteAddress(s3Location, port); channelPool = new FixedChannelPool(bootstrap, new AbstractChannelPoolHandler() { HttpClientInitializer initializer = new HttpClientInitializer(demultiplexer); @Override public void channelCreated(Channel ch) { initializer.initChannel(ch); } }, ChannelHealthChecker.ACTIVE, FixedChannelPool.AcquireTimeoutAction.FAIL, configuration.getAcquireTimeoutMillis(), configuration.getMaxConnections(), configuration.getMaxPendingAcquires()); }
public int acquiredConnections() { try { Field acquiredChannelCount = FixedChannelPool.class.getDeclaredField("acquiredChannelCount"); acquiredChannelCount.setAccessible(true); return (int) acquiredChannelCount.get(channelPool); } catch (Exception e) { throw new RuntimeException(e); } }
FastdfsPool(Bootstrap bootstrap, long readTimeout, long idleTimeout, int maxConnPerHost) { this.channelPool = new FixedChannelPool(bootstrap, new FastdfsPoolHandler(readTimeout, idleTimeout), maxConnPerHost); }