public void testOnExceptionStreamReset() throws Exception { getMockEndpoint("mock:middle").expectedMessageCount(1); getMockEndpoint("mock:middle").message(0).exchangeProperty(Exchange.EXCEPTION_CAUGHT).isInstanceOf(IllegalArgumentException.class); getMockEndpoint("mock:end").expectedMessageCount(1); getMockEndpoint("mock:end").message(0).exchangeProperty(Exchange.EXCEPTION_CAUGHT).isInstanceOf(IllegalArgumentException.class); InputStreamCache cache = new InputStreamCache(TEST_STRING.getBytes()); template.sendBody("direct:a", cache); assertMockEndpointsSatisfied(); // To make sure we can read something from the InputStream String result = getMockEndpoint("mock:end").getExchanges().get(0).getIn().getBody(String.class); assertTrue(result.contains("<firstName>James</firstName>")); }
/**
 * Registers a stream-caching route from {@code direct:start} that zip-marshals
 * and then zip-unmarshals the body before handing it to {@code mock:result},
 * sends the test text through it, and checks that the body received by the
 * mock is an {@link InputStreamCache}.
 *
 * @throws Exception if the route cannot be added or started, or an expectation fails
 */
public void testStreamCacheUnzip() throws Exception {
    RouteBuilder zipRoundTrip = new RouteBuilder() {
        @Override
        public void configure() {
            from("direct:start")
                .streamCaching()
                .marshal().zip()
                .unmarshal().zip()
                .to("mock:result");
        }
    };
    context.addRoutes(zipRoundTrip);
    context.start();

    MockEndpoint sink = context.getEndpoint("mock:result", MockEndpoint.class);
    sink.expectedBodiesReceived(TEXT);

    sendText();

    sink.assertIsSatisfied();

    // Inspect the body of the first exchange that reached the mock endpoint.
    Object receivedBody = sink.getExchanges().get(0).getIn().getBody();
    assertTrue(receivedBody instanceof InputStreamCache);
}
/**
 * Sends the test text to {@code direct:zipStreamCache} and verifies the single
 * exchange received at {@code mock:zipStreamCache}: the {@code FILE_NAME}
 * header equals the message id with a {@code .zip} suffix, the body is an
 * {@link InputStreamCache}, and its bytes match the output of
 * {@code getZippedText} for that message id.
 *
 * @throws Exception if sending fails or the body cannot be converted to bytes
 */
@Test
public void testZipAndStreamCaching() throws Exception {
    MockEndpoint zipMock = getMockEndpoint("mock:zipStreamCache");
    zipMock.setExpectedMessageCount(1);

    template.sendBody("direct:zipStreamCache", TEXT);
    assertMockEndpointsSatisfied();

    Exchange received = zipMock.getReceivedExchanges().get(0);
    String messageId = received.getIn().getMessageId();

    assertEquals(messageId + ".zip", received.getIn().getHeader(FILE_NAME));
    assertIsInstanceOf(InputStreamCache.class, received.getIn().getBody());
    assertArrayEquals(getZippedText(messageId), received.getIn().getMandatoryBody(byte[].class));
}
/**
 * Runs the producer against mocked HEAD and GET responses with stream caching
 * enabled on the Camel context (spool threshold 1024, buffer size 256) and
 * verifies that the exchange body is available as an {@link InputStreamCache}
 * of the expected length, and that the content type and HTTP response code
 * headers are populated from the GET response.
 *
 * @throws Exception if the producer fails to process the exchange
 */
@Test
public void testStreamCaching() throws Exception {
    final URI uri = create(TestUtils.baseUrl);
    final int status = 200;
    final String rdfConcat = StringUtils.repeat(TestUtils.rdfXml, 10000);
    // Encode once with an explicit charset; the expected length below is a byte count,
    // so it is taken from this array rather than from the String's char count.
    final byte[] rdfBytes = rdfConcat.getBytes("UTF-8");
    final ByteArrayInputStream body = new ByteArrayInputStream(rdfBytes);
    final FcrepoResponse headResponse = new FcrepoResponse(uri, 200, emptyMap(), null);
    final FcrepoResponse getResponse = new FcrepoResponse(uri, status,
            singletonMap(CONTENT_TYPE, singletonList(TestUtils.RDF_XML)), body);

    init();

    testExchange.getIn().setHeader(FCREPO_IDENTIFIER, "/foo");
    testExchange.setProperty(DISABLE_HTTP_STREAM_CACHE, false);
    testExchange.getContext().getStreamCachingStrategy().setSpoolThreshold(1024);
    testExchange.getContext().getStreamCachingStrategy().setBufferSize(256);
    testExchange.getContext().setStreamCaching(true);

    when(mockHeadBuilder.perform()).thenReturn(headResponse);
    when(mockGetBuilder.perform()).thenReturn(getResponse);

    testProducer.process(testExchange);

    assertEquals(true, testExchange.getContext().isStreamCaching());

    final InputStreamCache cached = testExchange.getIn().getBody(InputStreamCache.class);
    assertNotNull(cached);
    assertEquals(rdfBytes.length, cached.length());

    // Expected value first, actual second, so failure messages read correctly.
    assertEquals(TestUtils.RDF_XML, testExchange.getIn().getHeader(CONTENT_TYPE, String.class));
    assertEquals(status, testExchange.getIn().getHeader(HTTP_RESPONSE_CODE));
}