public void testResponseProcessing() throws Exception { ContentDecoder contentDecoder = mock(ContentDecoder.class); IOControl ioControl = mock(IOControl.class); HttpContext httpContext = mock(HttpContext.class); HeapBufferedAsyncResponseConsumer consumer = spy(new HeapBufferedAsyncResponseConsumer(TEST_BUFFER_LIMIT)); ProtocolVersion protocolVersion = new ProtocolVersion("HTTP", 1, 1); StatusLine statusLine = new BasicStatusLine(protocolVersion, 200, "OK"); HttpResponse httpResponse = new BasicHttpResponse(statusLine); httpResponse.setEntity(new StringEntity("test", ContentType.TEXT_PLAIN)); //everything goes well consumer.responseReceived(httpResponse); consumer.consumeContent(contentDecoder, ioControl); consumer.responseCompleted(httpContext); verify(consumer).releaseResources(); verify(consumer).buildResult(httpContext); assertTrue(consumer.isDone()); assertSame(httpResponse, consumer.getResult()); consumer.responseCompleted(httpContext); verify(consumer, times(1)).releaseResources(); verify(consumer, times(1)).buildResult(httpContext); }
public void contentAvailable(ContentDecoder decoder, IOControl ioctrl) throws IOException { long transferred; if(useFileChannels && decoder instanceof FileContentDecoder) { transferred = ((FileContentDecoder) decoder).transfer( fileChannel, idx, Long.MAX_VALUE); } else { transferred = fileChannel.transferFrom( new ContentDecoderChannel(decoder), idx, Integer.MAX_VALUE); } if(transferred > 0) idx += transferred; }
public void consumeContent( final ContentDecoder decoder, final IOControl ioctrl) throws IOException { synchronized (this.httpExchange) { this.httpExchange.setClientIOControl(ioctrl); // Receive data from the client ByteBuffer buf = this.httpExchange.getInBuffer(); int n = decoder.read(buf); System.out.println("[client->proxy] " + this.httpExchange.getId() + " " + n + " bytes read"); ConsoleFactory.printToConsole("[client->proxy] " + this.httpExchange.getId() + " " + n + " bytes read",true); if (decoder.isCompleted()) { System.out.println("[client->proxy] " + this.httpExchange.getId() + " content fully read"); ConsoleFactory.printToConsole("[client->proxy] " + this.httpExchange.getId() + " content fully read",true); } // If the buffer is full, suspend client input until there is free // space in the buffer if (!buf.hasRemaining()) { ioctrl.suspendInput(); System.out.println("[client->proxy] " + this.httpExchange.getId() + " suspend client input"); ConsoleFactory.printToConsole("[client->proxy] " + this.httpExchange.getId() + " suspend client input",true); } // If there is some content in the input buffer make sure origin // output is active if (buf.position() > 0) { if (this.httpExchange.getOriginIOControl() != null) { this.httpExchange.getOriginIOControl().requestOutput(); System.out.println("[client->proxy] " + this.httpExchange.getId() + " request origin output"); ConsoleFactory.printToConsole("[client->proxy] " + this.httpExchange.getId() + " request origin output",true); } } } }
public void consumeContent( final ContentDecoder decoder, final IOControl ioctrl) throws IOException { synchronized (this.httpExchange) { this.httpExchange.setOriginIOControl(ioctrl); // Receive data from the origin ByteBuffer buf = this.httpExchange.getOutBuffer(); int n = decoder.read(buf); System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " " + n + " bytes read"); ConsoleFactory.printToConsole("[proxy<-origin] " + this.httpExchange.getId() + " " + n + " bytes read",true); if (decoder.isCompleted()) { System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " content fully read"); ConsoleFactory.printToConsole("[proxy<-origin] " + this.httpExchange.getId() + " content fully read",true); } // If the buffer is full, suspend origin input until there is free // space in the buffer if (!buf.hasRemaining()) { ioctrl.suspendInput(); System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " suspend origin input"); ConsoleFactory.printToConsole("[proxy<-origin] " + this.httpExchange.getId() + " suspend origin input",true); } // If there is some content in the input buffer make sure client // output is active if (buf.position() > 0) { if (this.httpExchange.getClientIOControl() != null) { this.httpExchange.getClientIOControl().requestOutput(); System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " request client output"); ConsoleFactory.printToConsole("[proxy<-origin] " + this.httpExchange.getId() + " request client output",true); } } } }
@Override protected void onContentReceived(ContentDecoder decoder, IOControl ioctrl) throws IOException { this.buf.consumeContent(decoder); }
public void inputReady(final NHttpServerConnection conn, final ContentDecoder decoder) { System.out.println(conn + " [client->proxy] input ready"); HttpContext context = conn.getContext(); ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB); synchronized (proxyTask) { ConnState connState = proxyTask.getClientState(); if (connState != ConnState.REQUEST_RECEIVED && connState != ConnState.REQUEST_BODY_STREAM) { throw new IllegalStateException("Illegal client connection state: " + connState); } try { ByteBuffer dst = proxyTask.getInBuffer(); int bytesRead = decoder.read(dst); System.out.println(conn + " [client->proxy] " + bytesRead + " bytes read"); System.out.println(conn + " [client->proxy] " + decoder); if (!dst.hasRemaining()) { // Input buffer is full. Suspend client input // until the origin handler frees up some space in the buffer conn.suspendInput(); } // If there is some content in the input buffer make sure origin // output is active if (dst.position() > 0) { if (proxyTask.getOriginIOControl() != null) { proxyTask.getOriginIOControl().requestOutput(); } } if (decoder.isCompleted()) { System.out.println(conn + " [client->proxy] request body received"); // Update connection state proxyTask.setClientState(ConnState.REQUEST_BODY_DONE); // Suspend client input conn.suspendInput(); } else { proxyTask.setClientState(ConnState.REQUEST_BODY_STREAM); } } catch (IOException ex) { shutdownConnection(conn); } } }
public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) { System.out.println(conn + " [proxy<-origin] input ready"); HttpContext context = conn.getContext(); ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB); synchronized (proxyTask) { ConnState connState = proxyTask.getOriginState(); if (connState != ConnState.RESPONSE_RECEIVED && connState != ConnState.RESPONSE_BODY_STREAM) { throw new IllegalStateException("Illegal target connection state: " + connState); } HttpResponse response = proxyTask.getResponse(); try { ByteBuffer dst = proxyTask.getOutBuffer(); int bytesRead = decoder.read(dst); System.out.println(conn + " [proxy<-origin] " + bytesRead + " bytes read"); System.out.println(conn + " [proxy<-origin] " + decoder); if (!dst.hasRemaining()) { // Output buffer is full. Suspend origin input until // the client handler frees up some space in the buffer conn.suspendInput(); } // If there is some content in the buffer make sure client output // is active if (dst.position() > 0) { proxyTask.getClientIOControl().requestOutput(); } if (decoder.isCompleted()) { System.out.println(conn + " [proxy<-origin] response body received"); proxyTask.setOriginState(ConnState.RESPONSE_BODY_DONE); if (!this.connStrategy.keepAlive(response, context)) { System.out.println(conn + " [proxy<-origin] close connection"); proxyTask.setOriginState(ConnState.CLOSING); conn.close(); } } else { proxyTask.setOriginState(ConnState.RESPONSE_BODY_STREAM); } } catch (IOException ex) { shutdownConnection(conn); } } }
public void inputReady(NHttpClientConnection conn, ContentDecoder decoder) { this.handler.inputReady(conn, decoder); }
@Override protected void onContentReceived(final ContentDecoder decoder, final IOControl ioctrl) throws IOException { Asserts.notNull(this.buf, "Content buffer"); this.buf.consumeContent(decoder); }