public static Constructor<?> getClientConstructor(Class<?> svcInterface) { String client = svcInterface.getName().indexOf("Async") > 0 ? ASYNC_CLIENT_NAME : CLIENT_NAME; Class<?>[] args = svcInterface.getName().indexOf("Async") > 0 ? new Class[]{TProtocolFactory.class, TAsyncClientManager.class, TNonblockingTransport.class} : new Class[]{TProtocol.class}; Class<?> clientClass = getThriftServiceInnerClassOrNull(svcInterface.getEnclosingClass(), client, false); if (clientClass == null) { throw new ThriftRuntimeException("the client class is null"); } Constructor<?> constructor = ClassUtils.getConstructorIfAvailable(clientClass, args); if (constructor == null) { throw new ThriftRuntimeException("the clientClass constructor is null"); } return constructor; }
@Override protected FrameBuffer createFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) { TrackingFrameBuffer frameBuffer = new TrackingFrameBuffer(trans, selectionKey, selectThread); if (trans instanceof TNonblockingSocket) { try { SocketChannel socketChannel = ((TNonblockingSocket) trans).getSocketChannel(); InetAddress addr = ((InetSocketAddress) socketChannel.getRemoteAddress()).getAddress(); clientAddresses.put(frameBuffer.getInputFramedTransport(), addr); } catch (IOException e) { log.warn("Exception while tracking client address", e); clientAddresses.remove(frameBuffer.getInputFramedTransport()); } } else { log.warn("Unknown TNonblockingTransport instance: {}", trans.getClass().getName()); clientAddresses.remove(frameBuffer.getInputFramedTransport()); } return frameBuffer; }
@Override @SuppressWarnings("unchecked") public <X extends TAsyncClient> X getClient(final Class<X> clazz) { return (X) super.clients.computeIfAbsent(ClassNameUtils.getOuterClassName(clazz), (className) -> { TProtocolFactory protocolFactory = (TProtocolFactory) tTransport -> { TProtocol protocol = new TBinaryProtocol(tTransport); return new TMultiplexedProtocol(protocol, className); }; try { return clazz.getConstructor(TProtocolFactory.class, TAsyncClientManager.class, TNonblockingTransport.class) .newInstance(protocolFactory, this.clientManager, this.transport); } catch (Throwable e) { if (e instanceof UnresolvedAddressException) { this.isOpen = false; } return null; } }); }
/** * Accept a new connection. */ private void handleAccept() { final TNonblockingTransport client = doAccept(); if (client != null) { // Pass this connection to a selector thread final SelectorThread targetThread = threadChooser.nextThread(); if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) { doAddAccept(targetThread, client); } else { // FAIR_ACCEPT try { invoker.submit(new Runnable() { public void run() { doAddAccept(targetThread, client); } }); } catch (RejectedExecutionException rx) { LOGGER.warn("ExecutorService rejected accept registration!", rx); // close immediately client.close(); } } } }
/** * Hands off an accepted connection to be handled by this thread. This * method will block if the queue for new connections is at capacity. * * @param accepted * The connection that has been accepted. * @return true if the connection has been successfully added. */ public boolean addAcceptedConnection(TNonblockingTransport accepted) { try { while (!acceptedQueue.offer(accepted, 200, TimeUnit.MILLISECONDS)) { // If server is stopped, then return false. if (stopped_) { return false; } } } catch (InterruptedException e) { LOGGER.warn("Interrupted while adding accepted connection!", e); return false; } selector.wakeup(); return true; }
public FrameBuffer(final TNonblockingTransport trans, final SelectionKey selectionKey, final AbstractSelectThread selectThread) { trans_ = trans; selectionKey_ = selectionKey; selectThread_ = selectThread; buffer_ = ByteBuffer.allocate(4); frameTrans_ = new TMemoryInputTransport(); response_ = new TByteArrayOutputStream(); inTrans_ = inputTransportFactory_.getTransport(frameTrans_); outTrans_ = outputTransportFactory_.getTransport(new TIOStreamTransport(response_)); inProt_ = inputProtocolFactory_.getProtocol(inTrans_); outProt_ = outputProtocolFactory_.getProtocol(outTrans_); if (eventHandler_ != null) { context_ = eventHandler_.createContext(inProt_, outProt_); } else { context_ = null; } }
private void registerAccepted(TNonblockingTransport accepted) { SelectionKey clientKey = null; try { clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ); FrameBuffer frameBuffer = createFrameBuffer(accepted, clientKey, SelectorThread.this); clientKey.attach(frameBuffer); } catch (IOException e) { LOGGER.warn("Failed to register accepted connection to selector!", e); if (clientKey != null) { cleanupSelectionKey(clientKey); } accepted.close(); } }
@Override protected void processKey(SelectionKey key) throws IOException { if (!key.isAcceptable()) return; try { // accept the connection SelectorThread selector = selectorLoadBalancer.nextSelector(); selector.subscribe((TNonblockingTransport) serverTransport.accept()); selector.wakeupSelector(); } catch (TTransportException tte) { // accept() shouldn't be NULL if fine because are are raising for a socket logger.debug("Non-fatal exception trying to accept!", tte); } }
/** * 获取客户端AsyncService对象 */ @Override public AsyncService getAsyncService(TNonblockingTransport transport, String serviceName) throws STException { if (transport == null) throw new STException("'transport' is null !"); try { return StringUtil.isEmpty(serviceName) ? new AsyncServiceClientImpl((TProtocolFactory) new TCompactProtocol.Factory(), transport) : new AsyncServiceClientImpl(new AsyncMultiplexedProtocolFactory(serviceName), transport); } catch (IOException e) { throw new STException(e); } }
@Override public T makeObject(InetSocketAddress socket) throws Exception { TNonblockingTransport nbTr = new TNonblockingSocket( socket.getAddress().getHostAddress(), socket.getPort()); TProtocolFactory factory = new TBinaryProtocol.Factory(); T client = maker.create(nbTr, clientManager, factory); transports.put(client, nbTr); return client; }
private static BlockingQueue<TNonblockingTransport> createDefaultAcceptQueue(int queueSize) { if (queueSize == 0) { // Unbounded queue return new LinkedBlockingQueue<TNonblockingTransport>(); } return new ArrayBlockingQueue<TNonblockingTransport>(queueSize); }
private TNonblockingTransport doAccept() { try { return (TNonblockingTransport) serverTransport.accept(); } catch (TTransportException tte) { // something went wrong accepting. LOGGER.warn("Exception trying to accept!", tte); return null; } }
private void processAcceptedConnections() { // Register accepted connections while (!stopped_) { TNonblockingTransport accepted = acceptedQueue.poll(); if (accepted == null) { break; } registerAccepted(accepted); } }
private void registerAccepted(TNonblockingTransport accepted) { SelectionKey clientKey = null; try { clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ); FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this); clientKey.attach(frameBuffer); } catch (IOException e) { LOGGER.warn("Failed to register accepted connection to selector!", e); if (clientKey != null) { cleanupSelectionKey(clientKey); } accepted.close(); } }
/** * Hands off an accepted connection to be handled by this thread. This * method will block if the queue for new connections is at capacity. * * @param accepted * The connection that has been accepted. * @return true if the connection has been successfully added. */ public boolean addAcceptedConnection(TNonblockingTransport accepted) { try { acceptedQueue.put(accepted); } catch (InterruptedException e) { LOGGER.warn("Interrupted while adding accepted connection!", e); return false; } selector.wakeup(); return true; }
protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans, final SelectionKey selectionKey, final AbstractSelectThread selectThread) { return processorFactory_.isAsyncProcessor() ? new AsyncFrameBuffer(trans, selectionKey, selectThread) : new FrameBuffer(trans, selectionKey, selectThread); }
protected TAsyncMethodCall(TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport, AsyncMethodCallback<T> callback, boolean isOneway) { this.transport = transport; this.callback = callback; this.protocolFactory = protocolFactory; this.client = client; this.isOneway = isOneway; this.sequenceId = TAsyncMethodCall.sequenceIdCounter.getAndIncrement(); this.timeout = client.getTimeout(); }
public Message(TNonblockingTransport trans, SelectionKey key, ThriftFactories factories, boolean heapBasedAllocation) { frameSizeBuffer = Buffer.allocate(4, heapBasedAllocation); transport = trans; selectionKey = key; thriftFactories = factories; useHeapBasedAllocation = heapBasedAllocation; }
@Override protected void selectorIterationComplete() throws IOException { TNonblockingTransport newClient; while ((newClient = newConnections.poll()) != null) { SelectionKey clientKey = newClient.registerSelector(selector, SelectionKey.OP_READ); clientKey.attach(new Message(newClient, clientKey, thriftFactories, useHeapBasedAllocation)); } }
public int writeTo(TNonblockingTransport transport, int start, int count) throws IOException { ByteBuffer dup = buffer.duplicate(); dup.position(start).limit(start + count); return transport.write(dup); }
@Override public void after(Object target, Object[] args, Object result, Throwable throwable) { if (isDebug) { logger.afterInterceptor(target, args, result, throwable); } if (validate(target)) { TNonblockingTransport transport = ((TNonblockingTransportFieldGetter)target)._$PINPOINT$_getTNonblockingTransport(); if (validateTransport(transport)) { SocketAddress socketAddress = ((SocketAddressFieldAccessor)transport)._$PINPOINT$_getSocketAddress(); ((SocketAddressFieldAccessor)target)._$PINPOINT$_setSocketAddress(socketAddress); } } }
protected final Socket getRootSocket(Object target) { if (target instanceof TNonblockingTransportFieldGetter) { TNonblockingTransport inTrans = ((TNonblockingTransportFieldGetter) target)._$PINPOINT$_getTNonblockingTransport(); if (inTrans != null) { if (inTrans instanceof SocketFieldAccessor) { return ((SocketFieldAccessor) inTrans)._$PINPOINT$_getSocket(); } else { if (isDebug) { logger.debug("Invalid target object. Need field accessor({}).", SocketFieldAccessor.class.getName()); } } } } return null; }
public FrameBuffer(final TNonblockingTransport trans, final SelectionKey selectionKey, final AbstractSelectThread selectThread) { trans_ = trans; selectionKey_ = selectionKey; selectThread_ = selectThread; buffer_ = ByteBuffer.allocate(4); }
protected TAsyncMethodCall(TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport, AsyncMethodCallback<T> callback, boolean isOneway) { this.transport = transport; this.callback = callback; this.protocolFactory = protocolFactory; this.client = client; this.isOneway = isOneway; this.sequenceId = TAsyncMethodCall.sequenceIdCounter.getAndIncrement(); }