public void testResponseProcessing() throws Exception { ContentDecoder contentDecoder = mock(ContentDecoder.class); IOControl ioControl = mock(IOControl.class); HttpContext httpContext = mock(HttpContext.class); HeapBufferedAsyncResponseConsumer consumer = spy(new HeapBufferedAsyncResponseConsumer(TEST_BUFFER_LIMIT)); ProtocolVersion protocolVersion = new ProtocolVersion("HTTP", 1, 1); StatusLine statusLine = new BasicStatusLine(protocolVersion, 200, "OK"); HttpResponse httpResponse = new BasicHttpResponse(statusLine); httpResponse.setEntity(new StringEntity("test", ContentType.TEXT_PLAIN)); //everything goes well consumer.responseReceived(httpResponse); consumer.consumeContent(contentDecoder, ioControl); consumer.responseCompleted(httpContext); verify(consumer).releaseResources(); verify(consumer).buildResult(httpContext); assertTrue(consumer.isDone()); assertSame(httpResponse, consumer.getResult()); consumer.responseCompleted(httpContext); verify(consumer, times(1)).releaseResources(); verify(consumer, times(1)).buildResult(httpContext); }
/**
 * Pushes one chunk (at most 4096 bytes) of outgoing content into {@code encoder},
 * adds the chunk length to {@code writeLength} and, when {@code httpListener} is
 * set, reports progress through {@code onHttpWrite(writeLength, contentLength)}.
 * {@code checkCanceled(isCanceled)} is invoked before and after the chunk.
 *
 * @param encoder content encoder the chunk is written to
 * @param ioctrl  I/O control of the connection (not used by this method)
 * @throws IOException declared by the overridden method; may come from the encoder
 */
@Override
public synchronized void produceContent(ContentEncoder encoder, IOControl ioctrl) throws IOException {
    checkCanceled(isCanceled);
    byte[] tmp = new byte[4096];
    // FIXME(review): the initializer of len is missing in this copy of the source, so
    // the method does not compile as shown. Presumably it should be the number of bytes
    // read into tmp from the request body source - confirm against version control.
    int len =;
    // NOTE(review): ByteBuffer.wrap rejects a negative length, so an end-of-stream
    // result of -1 would need handling here once the read is restored - TODO confirm.
    ByteBuffer buffer = ByteBuffer.wrap(tmp, 0, len);
    // NOTE(review): the return value of write is ignored and writeLength grows by len
    // regardless; if the encoder accepts fewer bytes the remainder looks lost - verify.
    encoder.write(buffer);
    writeLength += len;
    if (httpListener != null) {
        httpListener.onHttpWrite(writeLength, contentLength);
    }
    checkCanceled(isCanceled);
}
@Override protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) throws IOException { try { char[] charArray = new char[buf.remaining()]; System.arraycopy(buf.array(), 0, charArray, 0, buf.remaining()); CharBuffer charBuffer = CharBuffer.wrap(charArray); ParseStream.blockingQueue.put(new AbstractMap.SimpleEntry<String, CharBuffer>(clientUri, charBuffer)); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
/**
 * Moves available content from {@code decoder} into {@code fileChannel} at offset
 * {@code idx} and advances {@code idx} by the number of bytes transferred.
 *
 * <p>When {@code useFileChannels} is set and the decoder is a
 * {@code FileContentDecoder}, the decoder's own {@code transfer} is used; otherwise
 * the decoder is wrapped in a {@code ContentDecoderChannel} and the file channel
 * pulls from it.
 *
 * @param decoder source of the incoming content
 * @param ioctrl  I/O control of the connection (not used by this method)
 * @throws IOException if the transfer fails
 */
public void contentAvailable(ContentDecoder decoder, IOControl ioctrl) throws IOException {
    final boolean direct = useFileChannels && decoder instanceof FileContentDecoder;
    final long written = direct
            ? ((FileContentDecoder) decoder).transfer(fileChannel, idx, Long.MAX_VALUE)
            : fileChannel.transferFrom(new ContentDecoderChannel(decoder), idx, Integer.MAX_VALUE);
    if (written > 0) {
        idx += written;
    }
}
public void consumeContent( final ContentDecoder decoder, final IOControl ioctrl) throws IOException { synchronized (this.httpExchange) { this.httpExchange.setClientIOControl(ioctrl); // Receive data from the client ByteBuffer buf = this.httpExchange.getInBuffer(); int n =; System.out.println("[client->proxy] " + this.httpExchange.getId() + " " + n + " bytes read"); ConsoleFactory.printToConsole("[client->proxy] " + this.httpExchange.getId() + " " + n + " bytes read",true); if (decoder.isCompleted()) { System.out.println("[client->proxy] " + this.httpExchange.getId() + " content fully read"); ConsoleFactory.printToConsole("[client->proxy] " + this.httpExchange.getId() + " content fully read",true); } // If the buffer is full, suspend client input until there is free // space in the buffer if (!buf.hasRemaining()) { ioctrl.suspendInput(); System.out.println("[client->proxy] " + this.httpExchange.getId() + " suspend client input"); ConsoleFactory.printToConsole("[client->proxy] " + this.httpExchange.getId() + " suspend client input",true); } // If there is some content in the input buffer make sure origin // output is active if (buf.position() > 0) { if (this.httpExchange.getOriginIOControl() != null) { this.httpExchange.getOriginIOControl().requestOutput(); System.out.println("[client->proxy] " + this.httpExchange.getId() + " request origin output"); ConsoleFactory.printToConsole("[client->proxy] " + this.httpExchange.getId() + " request origin output",true); } } } }
public void produceContent( final ContentEncoder encoder, final IOControl ioctrl) throws IOException { synchronized (this.httpExchange) { this.httpExchange.setOriginIOControl(ioctrl); // Send data to the origin server ByteBuffer buf = this.httpExchange.getInBuffer(); buf.flip(); int n = encoder.write(buf); buf.compact(); System.out.println("[proxy->origin] " + this.httpExchange.getId() + " " + n + " bytes written"); ConsoleFactory.printToConsole("[proxy->origin] " + this.httpExchange.getId() + " " + n + " bytes written",true); // If there is space in the buffer and the message has not been // transferred, make sure the client is sending more data if (buf.hasRemaining() && !this.httpExchange.isRequestReceived()) { if (this.httpExchange.getClientIOControl() != null) { this.httpExchange.getClientIOControl().requestInput(); System.out.println("[proxy->origin] " + this.httpExchange.getId() + " request client input"); ConsoleFactory.printToConsole("[proxy->origin] " + this.httpExchange.getId() + " request client input",true); } } if (buf.position() == 0) { if (this.httpExchange.isRequestReceived()) { encoder.complete(); System.out.println("[proxy->origin] " + this.httpExchange.getId() + " content fully written"); ConsoleFactory.printToConsole("[proxy->origin] " + this.httpExchange.getId() + " content fully written",true); } else { // Input buffer is empty. Wait until the client fills up // the buffer ioctrl.suspendOutput(); System.out.println("[proxy->origin] " + this.httpExchange.getId() + " suspend origin output"); ConsoleFactory.printToConsole("[proxy->origin] " + this.httpExchange.getId() + " suspend origin output",true); } } } }
public void consumeContent( final ContentDecoder decoder, final IOControl ioctrl) throws IOException { synchronized (this.httpExchange) { this.httpExchange.setOriginIOControl(ioctrl); // Receive data from the origin ByteBuffer buf = this.httpExchange.getOutBuffer(); int n =; System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " " + n + " bytes read"); ConsoleFactory.printToConsole("[proxy<-origin] " + this.httpExchange.getId() + " " + n + " bytes read",true); if (decoder.isCompleted()) { System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " content fully read"); ConsoleFactory.printToConsole("[proxy<-origin] " + this.httpExchange.getId() + " content fully read",true); } // If the buffer is full, suspend origin input until there is free // space in the buffer if (!buf.hasRemaining()) { ioctrl.suspendInput(); System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " suspend origin input"); ConsoleFactory.printToConsole("[proxy<-origin] " + this.httpExchange.getId() + " suspend origin input",true); } // If there is some content in the input buffer make sure client // output is active if (buf.position() > 0) { if (this.httpExchange.getClientIOControl() != null) { this.httpExchange.getClientIOControl().requestOutput(); System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " request client output"); ConsoleFactory.printToConsole("[proxy<-origin] " + this.httpExchange.getId() + " request client output",true); } } } }
public void produceContent( final ContentEncoder encoder, final IOControl ioctrl) throws IOException { synchronized (this.httpExchange) { this.httpExchange.setClientIOControl(ioctrl); // Send data to the client ByteBuffer buf = this.httpExchange.getOutBuffer(); buf.flip(); int n = encoder.write(buf); buf.compact(); System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + n + " bytes written"); ConsoleFactory.printToConsole("[client<-proxy] " + this.httpExchange.getId() + " " + n + " bytes written",true); // If there is space in the buffer and the message has not been // transferred, make sure the origin is sending more data if (buf.hasRemaining() && !this.httpExchange.isResponseReceived()) { if (this.httpExchange.getOriginIOControl() != null) { this.httpExchange.getOriginIOControl().requestInput(); System.out.println("[client<-proxy] " + this.httpExchange.getId() + " request origin input"); ConsoleFactory.printToConsole("[client<-proxy] " + this.httpExchange.getId() + " request origin input",true); } } if (buf.position() == 0) { if (this.httpExchange.isResponseReceived()) { encoder.complete(); System.out.println("[client<-proxy] " + this.httpExchange.getId() + " content fully written"); ConsoleFactory.printToConsole("[client<-proxy] " + this.httpExchange.getId() + " content fully written",true); } else { // Input buffer is empty. Wait until the origin fills up // the buffer ioctrl.suspendOutput(); System.out.println("[client<-proxy] " + this.httpExchange.getId() + " suspend client output"); ConsoleFactory.printToConsole("[client<-proxy] " + this.httpExchange.getId() + " suspend client output",true); } } } }
/**
 * Copies every readable byte of {@code buffer} to {@code httpOutStream}, adds the
 * amount to {@code readLength} and, when {@code httpListener} is set, reports
 * progress through {@code onHttpRead(readLength, contentLength)}.
 * {@code checkCanceled(isCanceled)} is invoked before and after the chunk.
 *
 * @param buffer  bytes received from the response content
 * @param control I/O control of the connection (not used by this method)
 * @throws IOException if writing to the output stream fails
 */
@Override
protected void onByteReceived(ByteBuffer buffer, IOControl control) throws IOException {
    checkCanceled(isCanceled);

    final int count = buffer.remaining();
    final byte[] chunk = new byte[count];
    buffer.get(chunk);
    httpOutStream.write(chunk);
    readLength += count;

    if (httpListener != null) {
        httpListener.onHttpRead(readLength, contentLength);
    }

    checkCanceled(isCanceled);
}
/**
 * Hands the decoder to the {@code buf} field, which consumes the content.
 *
 * @param decoder decoder for the incoming content
 * @param ioctrl  I/O control of the connection (not used by this method)
 * @throws IOException if consuming the content fails
 */
@Override
protected void onContentReceived(ContentDecoder decoder, IOControl ioctrl) throws IOException {
    buf.consumeContent(decoder);
}
/**
 * Writes every readable byte of {@code buf} to {@code outputStream}.
 *
 * <p>The bytes are drained into a temporary array and written with a single call
 * rather than one {@code write} per byte, which is costly on unbuffered streams.
 * The bytes written and their order are unchanged.
 *
 * @param buf    bytes received from the response content; fully consumed on return
 * @param ioctrl I/O control of the connection (not used by this method)
 * @throws IOException if writing to the output stream fails
 */
@Override
protected void onByteReceived(ByteBuffer buf, IOControl ioctrl) throws IOException {
    final int pending = buf.remaining();
    if (pending == 0) {
        return;
    }
    final byte[] chunk = new byte[pending];
    buf.get(chunk);
    outputStream.write(chunk, 0, pending);
}
/**
 * Appends the readable characters of {@code charBuffer} to {@code sb}.
 *
 * @param charBuffer characters decoded from the response content
 * @param ioControl  I/O control of the connection (not used by this method)
 * @throws IOException declared by the overridden method
 */
@Override
protected void onCharReceived(CharBuffer charBuffer, IOControl ioControl) throws IOException {
    final String chunk = charBuffer.toString();
    sb.append(chunk);
}
/**
 * Prints the readable characters of {@code buf} to standard output, one character
 * at a time, consuming the buffer.
 *
 * @param buf    characters decoded from the response content
 * @param ioctrl I/O control of the connection (not used by this method)
 * @throws IOException declared by the overridden method
 */
@Override
protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) throws IOException {
    for (int left = buf.remaining(); left > 0; left--) {
        System.out.print(buf.get());
    }
}
/**
 * Returns the client-side I/O control last stored on this object.
 *
 * @return the stored client {@code IOControl}; may be {@code null} if none was set
 */
public IOControl getClientIOControl() {
    return clientIOControl;
}
/**
 * Stores the client-side I/O control on this object.
 *
 * @param clientIOControl the client {@code IOControl} to remember
 */
public void setClientIOControl(final IOControl clientIOControl) {
    this.clientIOControl = clientIOControl;
}
/**
 * Returns the origin-side I/O control last stored on this object.
 *
 * @return the stored origin {@code IOControl}; may be {@code null} if none was set
 */
public IOControl getOriginIOControl() {
    return originIOControl;
}
/**
 * Stores the origin-side I/O control on this object.
 *
 * @param originIOControl the origin {@code IOControl} to remember
 */
public void setOriginIOControl(final IOControl originIOControl) {
    this.originIOControl = originIOControl;
}
/**
 * Appends the readable characters of {@code buf} to {@code url.result}.
 *
 * @param buf    characters decoded from the response content
 * @param ioctrl I/O control of the connection (not used by this method)
 * @throws IOException declared by the overridden method
 */
@Override
protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) throws IOException {
    final String chunk = buf.toString();
    this.url.result += chunk;
}
/**
 * Verifies via {@code Asserts.notNull} that the {@code buf} field is set, then
 * lets it consume the content available from the decoder.
 *
 * @param decoder decoder for the incoming content
 * @param ioctrl  I/O control of the connection (not used by this method)
 * @throws IOException if consuming the content fails
 */
@Override
protected void onContentReceived(final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
    Asserts.notNull(buf, "Content buffer");
    buf.consumeContent(decoder);
}
/**
 * Obtains a writer from {@code producerFunc} for the given encoder, stores it in
 * {@code writer}, and invokes its {@code write()}.
 *
 * @param encoder encoder passed to {@code producerFunc}
 * @param ioctrl  I/O control of the connection (not used by this method)
 * @throws IOException declared by the overridden method
 */
@Override
public void produceContent(final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
    // NOTE(review): writer is replaced on every invocation; if this method can be
    // called more than once per message, confirm that a fresh writer each time is
    // intended rather than resuming the previous one.
    writer = producerFunc.apply(encoder);
    writer.write();
}
/**
 * Does nothing with the received bytes.
 *
 * @param buf    bytes received from the response content (not read here)
 * @param ioctrl I/O control of the connection (not used by this method)
 */
@Override
protected void onByteReceived(final ByteBuffer buf, final IOControl ioctrl) {
    // Intentionally empty.
}
@Override public synchronized void produceContent(final ContentEncoder encoder, final IOControl ioctrl) throws IOException { final boolean first; if (this.mItemIterator == null) { this.mItemIterator = this.mItems.iterator(); first = true; } else { first = false; } if (this.mItem == null && this.mItemIterator.hasNext()) { this.mItem =; this.mMultipartHeaderIndex = 0; } if (this.mItem != null) { if (!this.writeHeader(encoder, ioctrl, this.mItem, first)) { return; } if (this.mFileChannel == null) { this.mFile = new RandomAccessFile(this.mItem.getFile(), "r"); this.mFileChannel = this.mFile.getChannel(); this.mFilePosition = 0; } final long transferred; if (encoder instanceof FileContentEncoder) { transferred = ((FileContentEncoder) encoder).transfer(this.mFileChannel, this.mFilePosition, Integer.MAX_VALUE); } else { transferred = this.mFileChannel.transferTo(this.mFilePosition, Integer.MAX_VALUE, new ContentEncoderChannel(encoder)); } if (transferred > 0) { this.mFilePosition += transferred; } if (this.mFilePosition >= this.mFileChannel.size()) { IOUtils.closeQuietly(this.mFileChannel); IOUtils.closeQuietly(this.mFile); this.mFileChannel = null; this.mFile = null; this.mItem = null; } } if (this.mItem == null && !this.mItemIterator.hasNext() && this.writeFooter(encoder, ioctrl)) { encoder.complete(); } }