/** * Execute the given method on the provided URI. The * {@link org.springframework.http.client.ClientHttpRequest} * is processed using the {@link RequestCallback}; the response with * the {@link ResponseExtractor}. * @param url the fully-expanded URL to connect to * @param method the HTTP method to execute (GET, POST, etc.) * @param requestCallback object that prepares the request (can be {@code null}) * @param responseExtractor object that extracts the return value from the response (can * be {@code null}) * @return an arbitrary object, as returned by the {@link ResponseExtractor} */ protected <T> ListenableFuture<T> doExecute(URI url, HttpMethod method, AsyncRequestCallback requestCallback, ResponseExtractor<T> responseExtractor) throws RestClientException { Assert.notNull(url, "'url' must not be null"); Assert.notNull(method, "'method' must not be null"); try { AsyncClientHttpRequest request = createAsyncRequest(url, method); if (requestCallback != null) { requestCallback.doWithRequest(request); } ListenableFuture<ClientHttpResponse> responseFuture = request.executeAsync(); return new ResponseExtractorFuture<T>(method, url, responseFuture, responseExtractor); } catch (IOException ex) { throw new ResourceAccessException("I/O error on " + method.name() + " request for \"" + url + "\":" + ex.getMessage(), ex); } }
@Override public <T> void write(final AsyncClientHttpRequest request, final HttpEntity<T> entity) throws IOException { final HttpHeaders headers = entity.getHeaders(); request.getHeaders().putAll(headers); @Nullable final T body = entity.getBody(); if (body == null) { return; } final Class<?> type = body.getClass(); @Nullable final MediaType contentType = headers.getContentType(); converters.stream() .filter(converter -> converter.canWrite(type, contentType)) .map(this::<T>cast) .findFirst() .orElseThrow(() -> fail(type, contentType)) .write(body, contentType, request); }
@Override public void doWithRequest(final AsyncClientHttpRequest request) throws IOException { if (this.adaptee != null) { this.adaptee.doWithRequest(new ClientHttpRequest() { @Override public ClientHttpResponse execute() throws IOException { throw new UnsupportedOperationException("execute not supported"); } @Override public OutputStream getBody() throws IOException { return request.getBody(); } @Override public HttpMethod getMethod() { return request.getMethod(); } @Override public URI getURI() { return request.getURI(); } @Override public HttpHeaders getHeaders() { return request.getHeaders(); } }); } }
/** * Create a new {@link AsyncClientHttpRequest} via this template's {@link * AsyncClientHttpRequestFactory}. * @param url the URL to connect to * @param method the HTTP method to execute (GET, POST, etc.) * @return the created request * @throws IOException in case of I/O errors */ protected AsyncClientHttpRequest createAsyncRequest(URI url, HttpMethod method) throws IOException { AsyncClientHttpRequest request = getAsyncRequestFactory().createAsyncRequest(url, method); if (logger.isDebugEnabled()) { logger.debug("Created asynchronous " + method.name() + " request for \"" + url + "\""); } return request; }
@Override public AsyncClientHttpRequest createAsyncRequest(URI uri, HttpMethod httpMethod) throws IOException { AsyncClientHttpRequest request = this.asyncDelegate .createAsyncRequest(uri, httpMethod); addRequestTags(request); publishStartEvent(request); return request; }
private CompletableFuture<ClientHttpResponse> send() throws IOException { final AsyncClientHttpRequest request = createRequest(); worker.write(request, entity); final ListenableFuture<ClientHttpResponse> original = request.executeAsync(); final CompletableFuture<ClientHttpResponse> future = preserveCancelability(original); original.addCallback(future::complete, future::completeExceptionally); return future; }
@Test public void shouldReadContributorsManually() throws IOException, ExecutionException, InterruptedException { driver.addExpectation(onRequestTo("/repos/zalando/riptide/contributors").withMethod(Method.POST), giveResponseAsBytes(getResource("contributors.json").openStream(), "application/json")); final URI uri = URI.create(driver.getBaseUrl()).resolve("/repos/zalando/riptide/contributors"); final AsyncClientHttpRequest request = factory.createAsyncRequest(uri, POST); request.getHeaders().setAccept(singletonList(APPLICATION_JSON)); request.getBody().write("{}".getBytes(UTF_8)); assertThat(request.getMethod(), is(POST)); assertThat(request.getURI(), hasToString(endsWith("/repos/zalando/riptide/contributors"))); assertThat(request.getHeaders().getAccept(), hasItem(APPLICATION_JSON)); final ClientHttpResponse response = request.executeAsync().get(); assertThat(response.getStatusCode(), is(HttpStatus.OK)); assertThat(response.getRawStatusCode(), is(200)); assertThat(response.getStatusText(), is("OK")); assertThat(response.getHeaders(), is(not(anEmptyMap()))); final InputStream stream = response.getBody(); final ObjectMapper mapper = createObjectMapper(); final List<User> users = mapper.readValue(stream, new TypeReference<List<User>>() { }); final List<String> names = users.stream() .map(User::getLogin) .collect(toList()); assertThat(names, hasItems("jhorstmann", "lukasniemeier-zalando", "whiskeysierra")); }
@Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable { AsyncClientHttpRequest clientHttpRequest = (AsyncClientHttpRequest)ret; if (ret != null) { Object[] cacheValues = (Object[])objInst.getSkyWalkingDynamicField(); ContextCarrier contextCarrier = (ContextCarrier)cacheValues[1]; CarrierItem next = contextCarrier.items(); while (next.hasNext()) { next = next.next(); clientHttpRequest.getHeaders().set(next.getHeadKey(), next.getHeadValue()); } } return ret; }
@Override public AsyncClientHttpRequest createAsyncRequest(URI uri, HttpMethod httpMethod) throws IOException { return createRequestInternal(uri, httpMethod); }
private AsyncClientHttpRequest createRequest() throws IOException { final URI requestUri = arguments.getRequestUri(); final HttpMethod method = arguments.getMethod(); return requestFactory.createAsyncRequest(requestUri, method); }
@Override public AsyncClientHttpRequest createAsyncRequest(final URI uri, final HttpMethod method) throws IOException { return new RestAsyncClientHttpRequest(factory.createRequest(uri, method), executor); }
@Override public AsyncClientHttpRequest createAsyncRequest(URI uri, HttpMethod httpMethod) throws IOException { return super.createAsyncRequest(expand(uri), httpMethod); }
@Override public AsyncClientHttpRequest createAsyncRequest(URI uri, HttpMethod httpMethod) throws IOException { return null; }
/** * Gets called by {@link AsyncRestTemplate#execute} with an opened {@code ClientHttpRequest}. * Does not need to care about closing the request or about handling errors: * this will all be handled by the {@code RestTemplate}. * @param request the active HTTP request * @throws java.io.IOException in case of I/O errors */ void doWithRequest(AsyncClientHttpRequest request) throws IOException;
<T> void write(final AsyncClientHttpRequest request, final HttpEntity<T> entity) throws IOException;