@Test public void testSendBinaryDataWithContentType() throws Exception { try (ConfigurableApplicationContext context = SpringApplication.run( SourceApplication.class, "--server.port=0", "--spring.jmx.enabled=false", "--spring.cloud.stream.bindings.output.contentType=image/jpeg")) { MessageCollector collector = context.getBean(MessageCollector.class); Source source = context.getBean(Source.class); byte[] data = new byte[] { 0, 1, 2, 3 }; source.output().send(MessageBuilder.withPayload(data) .build()); Message<byte[]> message = (Message<byte[]>) collector .forChannel(source.output()).poll(1, TimeUnit.SECONDS); assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class) .includes(MimeTypeUtils.IMAGE_JPEG)); assertThat(message.getPayload()).isEqualTo(data); } }
@Override public Mono<Void> fireAndForget(Payload payload) { JsonNode metadata = readConnectionMetadata(payload.getMetadataUtf8()); try{ MethodHandler handler = handlerFor(metadata); Converter converter = converterFor(MimeType.valueOf(metadata.get("MIME_TYPE").textValue())); Object converted = converter.read(ServiceUtils.toByteArray(payload.getData()), getActualType(handler.getInfo().getParameterType())); handler.invoke(handler.getInfo().buildInvocationArguments(converted, null)); return Mono.empty(); }catch (Exception e){ return Mono.error(e); } }
@Override public Mono<Payload> requestResponse(Payload payload) { JsonNode metadata = readConnectionMetadata(payload.getMetadataUtf8()); try { MethodHandler handler = handlerFor(metadata); Converter converter = converterFor(MimeType.valueOf(metadata.get("MIME_TYPE").textValue())); Object converted = converter.read(ServiceUtils.toByteArray(payload.getData()), getActualType(handler.getInfo().getParameterType())); Object result = handler.invoke(handler.getInfo().buildInvocationArguments(converted, null)); Mono monoResult = monoOF(result); return monoResult.map(o -> { byte[] data = converter.write(o); return new PayloadImpl(data); }); }catch (Exception e){ return Mono.error(e); } }
@Override public Flux<Payload> requestStream(Payload payload) { JsonNode metadata = readConnectionMetadata(payload.getMetadataUtf8()); try { MethodHandler handler = handlerFor(metadata); Converter converter = converterFor(MimeType.valueOf(metadata.get("MIME_TYPE").textValue())); Object converted = converter.read(ServiceUtils.toByteArray(payload.getData()), getActualType(handler.getInfo().getParameterType())); Flux result = (Flux)handler.invoke(handler.getInfo().buildInvocationArguments(converted, null)); return result.map(o -> new PayloadImpl(converter.write(o)) ); } catch (Exception e){ return Flux.error(new ApplicationException("No path found for " + metadata.get("PATH").asText())); } }
@Override public Flux<Payload> requestChannel(Publisher<Payload> payloads) { Flux<Payload> flux = Flux.from(payloads); Payload headerPayload = flux.take(1).next().block(); JsonNode metadata = readConnectionMetadata(headerPayload.getMetadataUtf8()); try{ MethodHandler handler = handlerFor(metadata); Converter converter = converterFor(MimeType.valueOf(metadata.get("MIME_TYPE").textValue())); Flux converted = flux.repeat().map(payload -> { return converter.read(ServiceUtils.toByteArray(payload.getData()), getActualType( handler.getInfo().getParameterType())); }); Flux result = (Flux)handler.invoke(handler.getInfo().buildInvocationArguments(converted, null)); return result.map(o -> new PayloadImpl(converter.write(o)) ); }catch (Exception e){ return Flux.error(e); } }
@Override public MimeType resolve(MessageHeaders headers) { if (headers == null || headers.get(MessageHeaders.CONTENT_TYPE) == null) { return this.defaultMimeType; } Object value = headers.get(MessageHeaders.CONTENT_TYPE); if (value instanceof MimeType) { return (MimeType) value; } else if (value instanceof String) { return MimeType.valueOf((String) value); } else { throw new IllegalArgumentException( "Unknown type for contentType header value: " + value.getClass()); } }
@Test public void toMessage() throws Exception { MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); MyBean payload = new MyBean(); payload.setString("Foo"); payload.setNumber(42); payload.setFraction(42F); payload.setArray(new String[]{"Foo", "Bar"}); payload.setBool(true); payload.setBytes(new byte[]{0x1, 0x2}); Message<?> message = converter.toMessage(payload, null); String actual = new String((byte[]) message.getPayload(), UTF_8); assertTrue(actual.contains("\"string\":\"Foo\"")); assertTrue(actual.contains("\"number\":42")); assertTrue(actual.contains("fraction\":42.0")); assertTrue(actual.contains("\"array\":[\"Foo\",\"Bar\"]")); assertTrue(actual.contains("\"bool\":true")); assertTrue(actual.contains("\"bytes\":\"AQI=\"")); assertEquals("Invalid content-type", new MimeType("application", "json", UTF_8), message.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class)); }
@Test public void toMessageUtf16String() { MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); converter.setSerializedPayloadClass(String.class); Charset utf16 = Charset.forName("UTF-16BE"); MimeType contentType = new MimeType("application", "json", utf16); Map<String, Object> map = new HashMap<>(); map.put(MessageHeaders.CONTENT_TYPE, contentType); MessageHeaders headers = new MessageHeaders(map); String payload = "H\u00e9llo W\u00f6rld"; Message<?> message = converter.toMessage(payload, headers); assertEquals("\"" + payload + "\"", message.getPayload()); assertEquals(contentType, message.getHeaders().get(MessageHeaders.CONTENT_TYPE)); }
@Test public void handleErrorFrame() throws Exception { StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR); accessor.setContentType(new MimeType("text", "plain", UTF_8)); accessor.addNativeHeader("foo", "bar"); accessor.setLeaveMutable(true); String payload = "Oops"; StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders()); when(this.sessionHandler.getPayloadType(stompHeaders)).thenReturn(String.class); this.session.handleMessage(MessageBuilder.createMessage(payload.getBytes(UTF_8), accessor.getMessageHeaders())); verify(this.sessionHandler).getPayloadType(stompHeaders); verify(this.sessionHandler).handleFrame(stompHeaders, payload); verifyNoMoreInteractions(this.sessionHandler); }
@Test public void send() throws Exception { this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); String destination = "/topic/foo"; String payload = "sample payload"; this.session.send(destination, payload); Message<byte[]> message = this.messageCaptor.getValue(); StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); assertEquals(StompCommand.SEND, accessor.getCommand()); StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders()); assertEquals(stompHeaders.toString(), 2, stompHeaders.size()); assertEquals(destination, stompHeaders.getDestination()); assertEquals(new MimeType("text", "plain", UTF_8), stompHeaders.getContentType()); assertEquals(-1, stompHeaders.getContentLength()); // StompEncoder isn't involved assertEquals(payload, new String(message.getPayload(), UTF_8)); }
@Override public Object invoke(MethodInvocation invocation, StoreInvoker invoker) { String fromMimeType = null; fromMimeType = (String)BeanUtils.getFieldWithAnnotation(invocation.getArguments()[0], org.springframework.content.commons.annotations.MimeType.class); if (fromMimeType == null) { return null; } String toMimeType = (String) invocation.getArguments()[1]; if (this.canConvert(fromMimeType, toMimeType)) { InputStream content = null; try { content = invoker.invokeGetContent(); return (InputStream) this.convert(fromMimeType, content, toMimeType); } catch (Exception e) { LOGGER.error(String.format("Failed to get rendition from %s to %s", fromMimeType, toMimeType ), e); } } return null; }
public void read(String mimeType, InputStream is) throws IOException { Assert.hasText(mimeType, "MimeType string is null or empty."); Assert.notNull(is, "InputStream is null or empty."); MimeType mimeTypeObj = MimeTypeUtils.parseMimeType(mimeType); if(MimeTypeUtils.APPLICATION_JSON.equals(mimeTypeObj)) { Assert.hasText(mimeType, "MimeType '" + mimeType + "' is not supported."); } AppConfigObject aco = objectMapper.readValue(is, AppConfigObject.class); final String version = aco.getVersion(); if(!VERSION.equals(version)) { throw new RuntimeException("Unsupported version of config: " + version); } ConfigReadContext ctx = new ConfigReadContext(); Map<String, Object> map = aco.getData(); Assert.notNull(map, "config has empty map"); for(Map.Entry<String, Object> oe : map.entrySet()) { String name = oe.getKey(); ReConfigurableAdapter ca = adapters.get(name); Assert.notNull(ca, "Can not find adapter with name: " + name); Object value = oe.getValue(); Assert.notNull(value, "Config object is null for name: " + name); ca.setConfig(ctx, value); } }
/** * Convert message body to text if possible, otherwise throw exception. * @param body * @return */ public static String toPlainText(MailBody body) throws MailBadMessageException { MimeType mimeType = body.getMimeType(); boolean containsMime = false; for (MimeType type : mimeTypeSet) { containsMime = containsMime || type.includes(mimeType); } if(!containsMime) { throw new MailBadMessageException("Message contains body with unsupported contentType: " + mimeType); } try(Reader r = body.getReader()) { return IOUtils.toString(r); } catch (IOException e) { throw new MailBadMessageException(e); } }
protected MimeType getMimeType(RequestContext context) { List<Pair<String, String>> headers = context.getZuulResponseHeaders(); String contentType = null; for (Pair<String, String> pair : headers) { if ("content-type".equalsIgnoreCase(pair.first())) { contentType = pair.second(); break; } } if (contentType != null) { MimeType type = MimeType.valueOf(contentType); return type; } return null; }
protected void writeResponse(String responseBody, MimeType contentType) throws Exception { RequestContext context = RequestContext.getCurrentContext(); // there is no body to send if (responseBody == null || responseBody.isEmpty()) { return; } HttpServletResponse servletResponse = context.getResponse(); servletResponse.setCharacterEncoding("UTF-8"); servletResponse.setContentType(contentType.toString()); OutputStream outStream = servletResponse.getOutputStream(); InputStream is = null; try { writeResponse(new ByteArrayInputStream(responseBody.getBytes()), outStream); } finally { try { if (is != null) { is.close(); } outStream.flush(); outStream.close(); } catch (IOException ex) { } } }
@Nullable @Override public Message<?> toMessage(Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint) { if (!canConvertTo(payload, headers)) { return null; } byte[] payloadToUse = serialize(payload); MimeType mimeType = getDefaultContentType(payload); if (headers != null) { MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(headers, MessageHeaderAccessor.class); if (accessor != null && accessor.isMutable()) { if (mimeType != null) { accessor.setHeader(MessageHeaders.CONTENT_TYPE, mimeType); } return MessageBuilder.createMessage(payloadToUse, accessor.getMessageHeaders()); } } MessageBuilder<?> builder = MessageBuilder.withPayload(payloadToUse); if (headers != null) { builder.copyHeaders(headers); } if (mimeType != null) { builder.setHeader(MessageHeaders.CONTENT_TYPE, mimeType); } return builder.build(); }
protected MimeType mimeTypeFromObject(Object payload) { Assert.notNull(payload, "payload object cannot be null."); String className = payload.getClass().getName(); MimeType mimeType = mimeTypesCache.get(className); if (mimeType == null) { String modifiedClassName = className; if (payload.getClass().isArray()) { // Need to remove trailing ';' for an object array, e.g. // "[Ljava.lang.String;" or multi-dimensional // "[[[Ljava.lang.String;" if (modifiedClassName.endsWith(";")) { modifiedClassName = modifiedClassName.substring(0, modifiedClassName.length() - 1); } // Wrap in quotes to handle the illegal '[' character modifiedClassName = "\"" + modifiedClassName + "\""; } mimeType = MimeType.valueOf(KRYO_MIME_TYPE+";type=" + modifiedClassName); mimeTypesCache.put(className, mimeType); } return mimeType; }
/** * Retrieve the class name from the type parameter in {@link MimeType}. * * @param mimeType {@link MimeType} to retrieve class name from * @return class name from the type parameter in MimeType and null if the class name cannot be determined */ public static String classNameFromMimeType(MimeType mimeType) { Assert.notNull(mimeType, "mimeType cannot be null."); String className = mimeType.getParameter("type"); if (className == null) { return null; } // unwrap quotes if any className = className.replace("\"", ""); // restore trailing ';' if (className.contains("[L")) { className += ";"; } return className; }
@Test @SuppressWarnings("unchecked") public void testAnnotatedArguments() throws Exception { ConfigurableApplicationContext context = SpringApplication.run(TestPojoWithAnnotatedArguments.class, "--server.port=0"); TestPojoWithAnnotatedArguments testPojoWithAnnotatedArguments = context .getBean(TestPojoWithAnnotatedArguments.class); Sink sink = context.getBean(Sink.class); String id = UUID.randomUUID().toString(); sink.input().send(MessageBuilder.withPayload("{\"foo\":\"barbar" + id + "\"}") .setHeader("contentType", MimeType.valueOf("application/json")).setHeader("testHeader", "testValue").build()); assertThat(testPojoWithAnnotatedArguments.receivedArguments).hasSize(3); assertThat(testPojoWithAnnotatedArguments.receivedArguments.get(0)) .isInstanceOf(StreamListenerTestUtils.FooPojo.class); assertThat(testPojoWithAnnotatedArguments.receivedArguments.get(0)).hasFieldOrPropertyWithValue("foo", "barbar" + id); assertThat(testPojoWithAnnotatedArguments.receivedArguments.get(1)).isInstanceOf(Map.class); assertThat((Map<String, Object>) testPojoWithAnnotatedArguments.receivedArguments.get(1)) .containsEntry(MessageHeaders.CONTENT_TYPE, MimeType.valueOf("application/json")); assertThat((Map<String, String>) testPojoWithAnnotatedArguments.receivedArguments.get(1)) .containsEntry("testHeader", "testValue"); assertThat(testPojoWithAnnotatedArguments.receivedArguments.get(2)).isEqualTo("application/json"); context.close(); }
@Test public void testSendWithDefaultContentType() throws Exception { try (ConfigurableApplicationContext context = SpringApplication.run( SourceApplication.class, "--server.port=0", "--spring.jmx.enabled=false")) { MessageCollector collector = context.getBean(MessageCollector.class); Source source = context.getBean(Source.class); User user = new User("Alice"); source.output().send(MessageBuilder.withPayload(user).build()); Message<String> message = (Message<String>) collector .forChannel(source.output()).poll(1, TimeUnit.SECONDS); User received = mapper.readValue(message.getPayload(), User.class); assertThat( message.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class) .includes(MimeTypeUtils.APPLICATION_JSON)); assertThat(user.getName()).isEqualTo(received.getName()); } }
@Test public void testSendJsonAsString() throws Exception { try (ConfigurableApplicationContext context = SpringApplication.run( SourceApplication.class, "--server.port=0", "--spring.jmx.enabled=false")) { MessageCollector collector = context.getBean(MessageCollector.class); Source source = context.getBean(Source.class); User user = new User("Alice"); String json = mapper.writeValueAsString(user); source.output().send(MessageBuilder.withPayload(user).build()); Message<String> message = (Message<String>) collector .forChannel(source.output()).poll(1, TimeUnit.SECONDS); assertThat( message.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class) .includes(MimeTypeUtils.APPLICATION_JSON)); assertThat(json).isEqualTo(message.getPayload()); } }
@Test public void testSendJsonString() throws Exception{ try (ConfigurableApplicationContext context = SpringApplication.run( SourceApplication.class, "--server.port=0", "--spring.jmx.enabled=false")) { MessageCollector collector = context.getBean(MessageCollector.class); Source source = context.getBean(Source.class); source.output().send(MessageBuilder.withPayload("foo").build()); Message<String> message = (Message<String>) collector .forChannel(source.output()).poll(1, TimeUnit.SECONDS); assertThat( message.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class) .includes(MimeTypeUtils.APPLICATION_JSON)); assertThat("foo").isEqualTo(message.getPayload()); } }
@Test public void testSendBynaryData() throws Exception { try (ConfigurableApplicationContext context = SpringApplication.run( SourceApplication.class, "--server.port=0", "--spring.jmx.enabled=false")) { MessageCollector collector = context.getBean(MessageCollector.class); Source source = context.getBean(Source.class); byte[] data = new byte[] { 0, 1, 2, 3 }; source.output().send(MessageBuilder.withPayload(data).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_OCTET_STREAM).build()); Message<byte[]> message = (Message<byte[]>) collector .forChannel(source.output()).poll(1, TimeUnit.SECONDS); assertThat( message.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class) .includes(MimeTypeUtils.APPLICATION_OCTET_STREAM)); assertThat(message.getPayload()).isEqualTo(data); } }
@Test public void testSendBinaryDataWithContentTypeUsingHeaders() throws Exception { try (ConfigurableApplicationContext context = SpringApplication.run( SourceApplication.class, "--server.port=0", "--spring.jmx.enabled=false")) { MessageCollector collector = context.getBean(MessageCollector.class); Source source = context.getBean(Source.class); byte[] data = new byte[] { 0, 1, 2, 3 }; source.output().send(MessageBuilder.withPayload(data) .setHeader(MessageHeaders.CONTENT_TYPE,MimeTypeUtils.IMAGE_JPEG) .build()); Message<byte[]> message = (Message<byte[]>) collector .forChannel(source.output()).poll(1, TimeUnit.SECONDS); assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class) .includes(MimeTypeUtils.IMAGE_JPEG)); assertThat(message.getPayload()).isEqualTo(data); } }
@Test public void testSendJavaSerializable() throws Exception { try (ConfigurableApplicationContext context = SpringApplication.run( SourceApplication.class, "--server.port=0", "--spring.jmx.enabled=false", "--spring.cloud.stream.bindings.output.contentType=application/x-java-serialized-object")) { MessageCollector collector = context.getBean(MessageCollector.class); Source source = context.getBean(Source.class); User user = new User("Alice"); source.output().send(MessageBuilder.withPayload(user).build()); Message<User> message = (Message<User>) collector .forChannel(source.output()).poll(1, TimeUnit.SECONDS); assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class) .includes(MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT)); User received = message.getPayload(); assertThat(user.getName()).isEqualTo(received.getName()); } }
@Test public void testSendKryoSerialized() throws Exception { try (ConfigurableApplicationContext context = SpringApplication.run( SourceApplication.class, "--server.port=0", "--spring.jmx.enabled=false", "--spring.cloud.stream.bindings.output.contentType=application/x-java-object")) { MessageCollector collector = context.getBean(MessageCollector.class); Source source = context.getBean(Source.class); User user = new User("Alice"); source.output().send(MessageBuilder.withPayload(user).build()); Message<User> message = (Message<User>) collector .forChannel(source.output()).poll(1, TimeUnit.SECONDS); User received = message.getPayload(); assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class) .includes(MimeType.valueOf(KryoMessageConverter.KRYO_MIME_TYPE))); assertThat(user.getName()).isEqualTo(received.getName()); } }
@Test public void testSendStringType() throws Exception{ try (ConfigurableApplicationContext context = SpringApplication.run( SourceApplication.class, "--server.port=0", "--spring.jmx.enabled=false", "--spring.cloud.stream.bindings.output.contentType=text/plain")) { MessageCollector collector = context.getBean(MessageCollector.class); Source source = context.getBean(Source.class); User user = new User("Alice"); source.output().send(MessageBuilder.withPayload(user).build()); Message<String> message = (Message<String>) collector .forChannel(source.output()).poll(1, TimeUnit.SECONDS); assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class) .includes(MimeTypeUtils.TEXT_PLAIN)); assertThat(message.getPayload()).isEqualTo(user.toString()); } }
@Test public void testSendTuple() throws Exception { try (ConfigurableApplicationContext context = SpringApplication.run( SourceApplication.class, "--server.port=0", "--spring.jmx.enabled=false", "--spring.cloud.stream.bindings.output.contentType=application/x-spring-tuple")) { MessageCollector collector = context.getBean(MessageCollector.class); Source source = context.getBean(Source.class); Tuple tuple = TupleBuilder.tuple().of("foo","bar"); source.output().send(MessageBuilder.withPayload(tuple).build()); Message<byte[]> message = (Message<byte[]>) collector .forChannel(source.output()).poll(1, TimeUnit.SECONDS); assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class) .includes(MessageConverterUtils.X_SPRING_TUPLE)); assertThat(TupleBuilder.fromString(new String(message.getPayload()))).isEqualTo(tuple); } }
@Test public void testReceiveWithDefaults() throws Exception { try (ConfigurableApplicationContext context = SpringApplication.run( SinkApplication.class, "--server.port=0", "--spring.jmx.enabled=false")) { TestSink testSink = context.getBean(TestSink.class); SinkApplication sourceApp = context.getBean(SinkApplication.class); User user = new User("Alice"); testSink.pojo().send(MessageBuilder.withPayload(mapper.writeValueAsBytes(user)).build()); Map<String,Object> headers = (Map<String, Object>) sourceApp.arguments.pop(); User received = (User)sourceApp.arguments.pop(); assertThat(((MimeType)headers.get(MessageHeaders.CONTENT_TYPE)) .includes(MimeTypeUtils.APPLICATION_JSON)); assertThat(user.getName()).isEqualTo(received.getName()); } }
@Test public void testReceiveRawWithDifferentContentTypes() throws Exception { try (ConfigurableApplicationContext context = SpringApplication.run( SinkApplication.class, "--server.port=0", "--spring.jmx.enabled=false")) { TestSink testSink = context.getBean(TestSink.class); SinkApplication sourceApp = context.getBean(SinkApplication.class); testSink.raw().send(MessageBuilder.withPayload(new byte[4]) .setHeader(MessageHeaders.CONTENT_TYPE,MimeTypeUtils.IMAGE_JPEG) .build()); testSink.raw().send(MessageBuilder.withPayload(new byte[4]) .setHeader(MessageHeaders.CONTENT_TYPE,MimeTypeUtils.IMAGE_GIF) .build()); Map<String,Object> headers = (Map<String, Object>) sourceApp.arguments.pop(); sourceApp.arguments.pop(); assertThat(((MimeType)headers.get(MessageHeaders.CONTENT_TYPE)) .includes(MimeTypeUtils.IMAGE_GIF)); headers = (Map<String, Object>) sourceApp.arguments.pop(); sourceApp.arguments.pop(); assertThat(((MimeType)headers.get(MessageHeaders.CONTENT_TYPE)) .includes(MimeTypeUtils.IMAGE_JPEG)); } }
@Test public void testReceiveKryoPayload() throws Exception { try (ConfigurableApplicationContext context = SpringApplication.run( SinkApplication.class, "--server.port=0", "--spring.jmx.enabled=false", "--spring.cloud.stream.bindings.pojo_input.contentType=application/x-java-object;type=org.springframework.cloud.stream.config.contentType.User" )) { TestSink testSink = context.getBean(TestSink.class); SinkApplication sourceApp = context.getBean(SinkApplication.class); Kryo kryo = new Kryo(); User user = new User("Alice"); ByteArrayOutputStream baos = new ByteArrayOutputStream(); Output output = new Output(baos); kryo.writeObject(output,user); output.close(); testSink.pojo().send(MessageBuilder.withPayload(baos.toByteArray()).build()); Map<String,Object> headers = (Map<String, Object>) sourceApp.arguments.pop(); User received = (User)sourceApp.arguments.pop(); assertThat(((MimeType)headers.get(MessageHeaders.CONTENT_TYPE)) .includes(MimeType.valueOf(KryoMessageConverter.KRYO_MIME_TYPE))); assertThat(user.getName()).isEqualTo(received.getName()); } }
@Test(expected=MessageDeliveryException.class) public void testReceiveKryoWithHeadersOverridingDefault() throws Exception{ try (ConfigurableApplicationContext context = SpringApplication.run( SinkApplication.class, "--server.port=0", "--spring.jmx.enabled=false" )) { TestSink testSink = context.getBean(TestSink.class); SinkApplication sourceApp = context.getBean(SinkApplication.class); Kryo kryo = new Kryo(); User user = new User("Alice"); ByteArrayOutputStream baos = new ByteArrayOutputStream(); Output output = new Output(baos); kryo.writeObject(output,user); output.close(); testSink.pojo().send(MessageBuilder.withPayload(baos.toByteArray()) .setHeader(MessageHeaders.CONTENT_TYPE, MimeType.valueOf(KryoMessageConverter.KRYO_MIME_TYPE)) .build()); Map<String,Object> headers = (Map<String, Object>) sourceApp.arguments.pop(); User received = (User)sourceApp.arguments.pop(); assertThat(((MimeType)headers.get(MessageHeaders.CONTENT_TYPE)) .includes(MimeType.valueOf(KryoMessageConverter.KRYO_MIME_TYPE))); assertThat(user.getName()).isEqualTo(received.getName()); } }
@Test public void testReceiveJavaSerializable() throws Exception { try (ConfigurableApplicationContext context = SpringApplication.run( SinkApplication.class, "--server.port=0", "--spring.jmx.enabled=false", "--spring.cloud.stream.bindings.pojo_input.contentType=application/x-java-serialized-object" )) { TestSink testSink = context.getBean(TestSink.class); SinkApplication sourceApp = context.getBean(SinkApplication.class); User user = new User("Alice"); ByteArrayOutputStream baos = new ByteArrayOutputStream(); new ObjectOutputStream(baos).writeObject(user); testSink.pojo().send(MessageBuilder.withPayload(baos.toByteArray()).build()); Map<String,Object> headers = (Map<String, Object>) sourceApp.arguments.pop(); User received = (User)sourceApp.arguments.pop(); assertThat(((MimeType)headers.get(MessageHeaders.CONTENT_TYPE)) .includes(MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT)); assertThat(user.getName()).isEqualTo(received.getName()); } }
@Test @SuppressWarnings("unchecked") public void testHandlerBean() throws Exception { ConfigurableApplicationContext context = SpringApplication.run(this.configClass, "--spring.cloud.stream.bindings.output.contentType=application/json", "--server.port=0"); MessageCollector collector = context.getBean(MessageCollector.class); Processor processor = context.getBean(Processor.class); String id = UUID.randomUUID().toString(); processor.input().send( MessageBuilder.withPayload("{\"foo\":\"barbar" + id + "\"}") .setHeader("contentType", "application/json").build()); HandlerBean handlerBean = context.getBean(HandlerBean.class); Assertions.assertThat(handlerBean.receivedPojos).hasSize(1); Assertions.assertThat(handlerBean.receivedPojos.get(0)).hasFieldOrPropertyWithValue("foo", "barbar" + id); Message<String> message = (Message<String>) collector.forChannel( processor.output()).poll(1, TimeUnit.SECONDS); assertThat(message).isNotNull(); assertThat(message.getPayload()).isEqualTo("{\"bar\":\"barbar" + id + "\"}"); assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class) .includes(MimeTypeUtils.APPLICATION_JSON)); context.close(); }
@Test @SuppressWarnings("unchecked") public void testReturnConversion() throws Exception { ConfigurableApplicationContext context = SpringApplication.run(this.configClass, "--spring.cloud.stream.bindings.output.contentType=application/json", "--server.port=0","--spring.jmx.enabled=false"); MessageCollector collector = context.getBean(MessageCollector.class); Processor processor = context.getBean(Processor.class); String id = UUID.randomUUID().toString(); processor.input().send(MessageBuilder.withPayload("{\"foo\":\"barbar" + id + "\"}") .setHeader("contentType", "application/json").build()); TestPojoWithMimeType testPojoWithMimeType = context.getBean(TestPojoWithMimeType.class); Assertions.assertThat(testPojoWithMimeType.receivedPojos).hasSize(1); Assertions.assertThat(testPojoWithMimeType.receivedPojos.get(0)).hasFieldOrPropertyWithValue("foo", "barbar" + id); Message<String> message = (Message<String>) collector.forChannel(processor.output()).poll(1, TimeUnit.SECONDS); assertThat(message).isNotNull(); assertThat(new String(message.getPayload())).isEqualTo("{\"bar\":\"barbar" + id + "\"}"); assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class) .includes(MimeTypeUtils.APPLICATION_JSON)); context.close(); }
@Test @SuppressWarnings("unchecked") public void testReturnNoConversion() throws Exception { ConfigurableApplicationContext context = SpringApplication.run(this.configClass, "--server.port=0","--spring.jmx.enabled=false"); MessageCollector collector = context.getBean(MessageCollector.class); Processor processor = context.getBean(Processor.class); String id = UUID.randomUUID().toString(); processor.input().send(MessageBuilder.withPayload("{\"foo\":\"barbar" + id + "\"}") .setHeader("contentType", "application/json").build()); TestPojoWithMimeType testPojoWithMimeType = context.getBean(TestPojoWithMimeType.class); Assertions.assertThat(testPojoWithMimeType.receivedPojos).hasSize(1); Assertions.assertThat(testPojoWithMimeType.receivedPojos.get(0)).hasFieldOrPropertyWithValue("foo", "barbar" + id); Message<String> message = (Message<String>) collector .forChannel(processor.output()).poll(1, TimeUnit.SECONDS); assertThat(message).isNotNull(); StreamListenerTestUtils.BarPojo barPojo = mapper.readValue(message.getPayload(),StreamListenerTestUtils.BarPojo.class); assertThat(barPojo.getBar()).isEqualTo("barbar" + id); assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class) != null); context.close(); }
@Override protected Schema resolveWriterSchemaForDeserialization(MimeType mimeType) { if (this.readerSchema == null) { Schema schema = null; ParsedSchema parsedSchema = null; SchemaReference schemaReference = extractSchemaReference(mimeType); if (schemaReference != null) { parsedSchema = cacheManager.getCache(REFERENCE_CACHE_NAME) .get(schemaReference, ParsedSchema.class); if (parsedSchema == null) { String schemaContent = this.schemaRegistryClient .fetch(schemaReference); schema = new Schema.Parser().parse(schemaContent); parsedSchema = new ParsedSchema(schema); cacheManager.getCache(REFERENCE_CACHE_NAME) .putIfAbsent(schemaReference, parsedSchema); } } return parsedSchema.getSchema(); } else { return this.readerSchema; } }
@Test public void testCustomNamingStrategy() throws Exception { ConfigurableApplicationContext sourceContext = SpringApplication.run(AvroSourceApplication.class, "--server.port=0", "--debug", "--spring.jmx.enabled=false", "--spring.cloud.stream.bindings.output.contentType=application/*+avro", "--spring.cloud.stream.schema.avro.subjectNamingStrategy=org.springframework.cloud.schema.avro.CustomSubjectNamingStrategy", "--spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true"); Source source = sourceContext.getBean(Source.class); User1 user1 = new User1(); user1.setFavoriteColor("foo" + UUID.randomUUID().toString()); user1.setName("foo" + UUID.randomUUID().toString()); source.output().send(MessageBuilder.withPayload(user1).build()); MessageCollector barSourceMessageCollector = sourceContext.getBean(MessageCollector.class); Message<?> message = barSourceMessageCollector.forChannel(source.output()).poll(1000, TimeUnit.MILLISECONDS); assertThat(message.getHeaders().get("contentType")) .isEqualTo(MimeType.valueOf("application/vnd.org.springframework.cloud.schema.avro.User1.v1+avro")); }
@Bean @StreamMessageConverter public MessageConverter userMessageConverter() throws IOException { AvroSchemaMessageConverter avroSchemaMessageConverter = new AvroSchemaMessageConverter( MimeType.valueOf("avro/bytes")); if (schemaLocation != null) { avroSchemaMessageConverter.setSchemaLocation(schemaLocation); } return avroSchemaMessageConverter; }
public MimeType getMimeType(String file) { for (Entry<String, String> entry : wellKnownMimeTypes.entrySet()) { if (file.endsWith(entry.getKey())) { return MimeType.valueOf(entry.getValue()); } } String mimeType = servletContext.getMimeType(file); if (mimeType != null) { return MimeType.valueOf(mimeType); } throw new IllegalArgumentException(String.format("No mimetype for %s found", file)); }