Java 类io.reactivex.netty.protocol.http.sse.ServerSentEvent 实例源码

项目:MarketData    文件:RxNettyEventServer.java   
private Observable<Void> getIntervalObservable(HttpServerRequest<?> request, final HttpServerResponse<ServerSentEvent> response) {
    HttpRequest simpleRequest = new HttpRequest(request.getQueryParameters());
    return getEvents(simpleRequest)
            .flatMap(event -> {
                System.out.println("Writing SSE event: " + event);
                ByteBuf data = response.getAllocator().buffer().writeBytes(( event + "\n").getBytes());
                ServerSentEvent sse = new ServerSentEvent(data);
                return response.writeAndFlush(sse);
            }).materialize()
            .takeWhile(notification -> {
                if (notification.isOnError()) {
                    System.out.println("Write to client failed, stopping response sending.");
                    notification.getThrowable().printStackTrace(System.err);
                }
                return !notification.isOnError();
            })
            .map((Func1<Notification<Void>, Void>) notification -> null);
}
项目:ReactiveLab    文件:BookmarksService.java   
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> videoIds = request.getQueryParameters().get("videoId");

    int latency = 1;
    if (Random.randomIntFrom0to100() > 80) {
        latency = 10;
    }

    return Observable.from(videoIds).map(videoId -> {
        Map<String, Object> video = new HashMap<>();
        video.put("videoId", videoId);
        video.put("position", (int) (Math.random() * 5000));
        return video;
    }).flatMap(video -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(video) + "\n"))
            .delay(latency, TimeUnit.MILLISECONDS).doOnCompleted(response::close); // simulate latency
}
项目:ReactiveLab    文件:GeoService.java   
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    return request.getContent().flatMap(i -> {
        List<String> ips = request.getQueryParameters().get("ip");
        Map<String, Object> data = new HashMap<>();
        for (String ip : ips) {
            Map<String, Object> ip_data = new HashMap<>();
            ip_data.put("country_code", "GB");
            ip_data.put("longitude", "-0.13");
            ip_data.put("latitude", "51.5");
            data.put(ip, ip_data);
        }
        return response.writeStringAndFlush("data: " + SimpleJson.mapToJson(data) + "\n")
                .doOnCompleted(response::close);
    }).delay(10, TimeUnit.MILLISECONDS);
}
项目:ReactiveLab    文件:UserService.java   
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> userIds = request.getQueryParameters().get("userId");
    if (userIds == null || userIds.size() == 0) {
        return writeError(request, response, "At least one parameter of 'userId' must be included.");
    }
    return Observable.from(userIds).map(userId -> {
        Map<String, Object> user = new HashMap<>();
        user.put("userId", userId);
        user.put("name", "Name Here");
        user.put("other_data", "goes_here");
        return user;
    }).flatMap(user -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(user) + "\n")
            .doOnCompleted(response::close))
            .delay(((long) (Math.random() * 500) + 500), TimeUnit.MILLISECONDS); // simulate latency
}
项目:ge-export    文件:Application.java   
private static Observable<ServerSentEvent> buildStream(Instant since, String lastStreamedBuildId) {

        AtomicReference<String> _lastBuildId = new AtomicReference<>(null);

        final String buildsSinceUri = "/build-export/v1/builds/since/" + String.valueOf(since.toEpochMilli());
        LOGGER.info("Builds uri: " + buildsSinceUri);

        HttpClientRequest<ByteBuf, ByteBuf> request = HTTP_CLIENT
                .createGet(buildsSinceUri)
                .setKeepAlive(true);
        if (BASIC_AUTH != null) {
            request = request.addHeader("Authorization", "Basic " + BASIC_AUTH);
        }

        if (lastStreamedBuildId != null) {
            request = request.addHeader("Last-Event-ID", lastStreamedBuildId);
        }

        return request
                .flatMap(HttpClientResponse::getContentAsServerSentEvents)
                .doOnNext(serverSentEvent -> _lastBuildId.set(serverSentEvent.getEventIdAsString()))
                .doOnSubscribe(() -> LOGGER.info("Streaming builds..."))
                .onErrorResumeNext(t -> {
                    LOGGER.info("Error streaming builds, resuming from build id: " + _lastBuildId.get());
                    return buildStream(since, _lastBuildId.get());
                });
    }
项目:MarketData    文件:RxNettyEventEventStreamClient.java   
private Observable<String> initializeStream() {
    HttpClient<ByteBuf, ServerSentEvent> client =
            RxNetty.createHttpClient("localhost", port, PipelineConfigurators.<ByteBuf>clientSseConfigurator());

    return client.submit(HttpClientRequest.createGet("/hello")).
            flatMap(response -> {
                printResponseHeader(response);
                return response.getContent();
            }).map(serverSentEvent -> serverSentEvent.contentAsString());
}
项目:MarketData    文件:RxNettyEventEventStreamClient.java   
private static void printResponseHeader(HttpClientResponse<ServerSentEvent> response) {
    System.out.println("New response received.");
    System.out.println("========================");
    System.out.println(response.getHttpVersion().text() + ' ' + response.getStatus().code()
            + ' ' + response.getStatus().reasonPhrase());
    for (Map.Entry<String, String> header : response.getHeaders().entries()) {
        System.out.println(header.getKey() + ": " + header.getValue());
    }
}
项目:MarketData    文件:RxNettyEventBroadcaster.java   
public HttpServer<ByteBuf, ServerSentEvent> createServer() {
    if (flaky) {
        events = SubscriptionLimiter
                    .limitSubscriptions(1,initializeEventStream());
    } else {
        events  = initializeEventStream();
    }
    return super.createServer();
}
项目:MarketData    文件:RxNettyEventServer.java   
public HttpServer<ByteBuf, ServerSentEvent> createServer() {
    HttpServer<ByteBuf, ServerSentEvent> server = RxNetty.createHttpServer(port,
            (request, response) -> {
                response.getHeaders().set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
                response.getHeaders().set(CACHE_CONTROL, "no-cache");
                response.getHeaders().set(CONNECTION, "keep-alive");
                response.getHeaders().set(CONTENT_TYPE, "text/event-stream");
                return getIntervalObservable(request, response);
            }, PipelineConfigurators.<ByteBuf>serveSseConfigurator());
    System.out.println("HTTP Server Sent Events server started...");
    return server;
}
项目:vizceral-hystrix    文件:HystrixReader.java   
/**
 * Creates a new hystrix reader.
 *
 * @param configuration The configuration to use.
 * @param cluster       The cluster to read from.
 */
public HystrixReader(Configuration configuration, String cluster)
{
    this.configuration = configuration;
    this.cluster = cluster;
    HttpClientBuilder<ByteBuf, ServerSentEvent> builder = RxNetty.newHttpClientBuilder(configuration.getTurbineHost(), configuration.getTurbinePort());
    builder.pipelineConfigurator(PipelineConfigurators.clientSseConfigurator());
    if (configuration.isSecure())
    {
        builder.withSslEngineFactory(DefaultFactories.trustAll());
    }
    rxNetty = builder.build();
}
项目:argos-dashboard    文件:DefaultHystrixClusterMonitor.java   
@Override
public Observable<String> observeJson() {
    if(jsonObservable != null) {
        return jsonObservable;
    }

    HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet(url.getPath() + "?" + url.getQuery());
    int port = url.getPort() < 0 ? url.getDefaultPort() : url.getPort();
    HttpClient<ByteBuf, ServerSentEvent> client = RxNetty.<ByteBuf, ServerSentEvent>newHttpClientBuilder(url.getHost(), port)
            .withNoConnectionPooling()
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf>clientSseConfigurator())
            .build();


    jsonObservable = client.submit(request)
            .doOnError(t -> LOG.error("Error connecting to " + url, t))
            .flatMap(response -> {
                        if (response.getStatus().code() != 200) {
                            return Observable.error(new RuntimeException("Failed to connect: " + response.getStatus()));
                        }

                        return response.getContent()
                                .doOnSubscribe(() -> LOG.info("Turbine => Aggregate Stream from URL: " + url))
                                .doOnUnsubscribe(() -> LOG.info("Turbine => Unsubscribing Stream: " + url))
                                .map(ServerSentEvent::contentAsString);
                    }
            )
            .timeout(120, TimeUnit.SECONDS)
            .retryWhen(attempts -> attempts.zipWith(Observable.range(1, Integer.MAX_VALUE), (k, i) -> i)
                    .flatMap(n -> {
                        int waitTimeSeconds = Math.min(6, n) * 10; // wait in 10 second increments up to a max of 1 minute
                        LOG.info("Turbine => Retrying connection to: " + this.url + " in {} seconds", waitTimeSeconds);
                        return Observable.timer(waitTimeSeconds, TimeUnit.SECONDS);
                    })
            )
            .repeat()
            .share();

    return jsonObservable;
}
项目:spring-cloud-netflix    文件:TurbineStreamConfiguration.java   
@Bean
@SuppressWarnings("deprecation")
public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
    // multicast so multiple concurrent subscribers get the same stream
    Observable<Map<String, Object>> publishedStreams = StreamAggregator
            .aggregateGroupedStreams(hystrixSubject().groupBy(
                    data -> InstanceKey.create((String) data.get("instanceId"))))
            .doOnUnsubscribe(() -> log.info("Unsubscribing aggregation."))
            .doOnSubscribe(() -> log.info("Starting aggregation")).flatMap(o -> o)
            .publish().refCount();
    Observable<Map<String, Object>> ping = Observable.timer(1, 10, TimeUnit.SECONDS)
            .map(count -> Collections.singletonMap("type", (Object) "Ping")).publish()
            .refCount();
    Observable<Map<String, Object>> output = Observable.merge(publishedStreams, ping);

    this.turbinePort = this.properties.getPort();

    if (this.turbinePort <= 0) {
        this.turbinePort = SocketUtils.findAvailableTcpPort(40000);
    }

    HttpServer<ByteBuf, ServerSentEvent> httpServer = RxNetty
            .createHttpServer(this.turbinePort, (request, response) -> {
                log.info("SSE Request Received");
                response.getHeaders().setHeader("Content-Type", "text/event-stream");
                return output.doOnUnsubscribe(
                        () -> log.info("Unsubscribing RxNetty server connection"))
                        .flatMap(data -> response.writeAndFlush(new ServerSentEvent(
                                null,
                                Unpooled.copiedBuffer("message",
                                        StandardCharsets.UTF_8),
                                Unpooled.copiedBuffer(JsonUtility.mapToJson(data),
                                        StandardCharsets.UTF_8))));
            }, serveSseConfigurator());
    return httpServer;
}
项目:ReactiveLab    文件:AbstractMiddleTierService.java   
public HttpServer<ByteBuf, ServerSentEvent> createServer(int port) {
    System.out.println("Start " + getClass().getSimpleName() + " on port: " + port);

    // declare handler chain (wrapped in Hystrix)
    // TODO create a better way of chaining these (related https://github.com/ReactiveX/RxNetty/issues/232 and https://github.com/ReactiveX/RxNetty/issues/202)
    HystrixMetricsStreamHandler<ByteBuf, ServerSentEvent> handlerChain 
      = new HystrixMetricsStreamHandler<>(metrics, "/hystrix.stream", 1000, (request, response) -> {
        try {
            long startTime = System.currentTimeMillis();
            return handleRequest(request, response)
                    .doOnCompleted(() -> System.out.println("Response => " + request.getPath() + " Time => " + (int) (System.currentTimeMillis() - startTime) + "ms"))
                    .doOnCompleted(() -> metrics.getRollingPercentile().addValue((int) (System.currentTimeMillis() - startTime)))
                    .doOnCompleted(() -> metrics.getRollingNumber().add(Metrics.EventType.SUCCESS, 1))
                    .doOnError(t -> metrics.getRollingNumber().add(Metrics.EventType.FAILURE, 1));
        } catch (Throwable e) {
            e.printStackTrace();
            System.err.println("Server => Error [" + request.getPath() + "] => " + e);
            response.setStatus(HttpResponseStatus.BAD_REQUEST);
            return response.writeStringAndFlush("data: Error 500: Bad Request\n" + e.getMessage() + "\n");
        }
    });

    return RxNetty.createHttpServer(port, (request, response) -> {
        // System.out.println("Server => Request: " + request.getPath());
            return handlerChain.handle(request, response);
        }, PipelineConfigurators.<ByteBuf> serveSseConfigurator());
}
项目:ReactiveLab    文件:PersonalizedCatalogService.java   
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    return Observable.from(request.getQueryParameters().get("userId")).map(userId -> {
        Map<String, Object> userData = new HashMap<>();
        userData.put("user_id", userId);

        userData.put("list_title", "Really quirky and over detailed list title!");
        userData.put("other_data", "goes_here");
        userData.put("videos", Arrays.asList(12345, 23456, 34567, 45678, 56789, 67890));
        return userData;
    }).flatMap(list -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(list) + "\n"))
            .delay(((long) (Math.random() * 100) + 20), TimeUnit.MILLISECONDS)
            .doOnCompleted(response::close); // simulate latency
}
项目:ReactiveLab    文件:SocialService.java   
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    return Observable.from(request.getQueryParameters().get("userId")).map(userId -> {
        Map<String, Object> user = new HashMap<>();
        user.put("userId", userId);
        user.put("friends", Arrays.asList(randomUser(), randomUser(), randomUser(), randomUser()));
        return user;
    }).flatMap(list -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(list) + "\n"))
            .delay(((long) (Math.random() * 100) + 20), TimeUnit.MILLISECONDS).doOnCompleted(response::close); // simulate latency
}
项目:ReactiveLab    文件:VideoMetadataService.java   
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> videoIds = request.getQueryParameters().get("videoId");
    return Observable.from(videoIds).map(videoId -> {
        Map<String, Object> video = new HashMap<>();
        video.put("videoId", videoId);
        video.put("title", "Video Title");
        video.put("other_data", "goes_here");
        return video;
    }).flatMap(video -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(video) + "\n")
            .doOnCompleted(response::close))
            .delay(((long) (Math.random() * 20) + 20), TimeUnit.MILLISECONDS); // simulate latency
}
项目:ReactiveLab    文件:MockService.java   
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> _id = request.getQueryParameters().get("id");
    if (_id == null || _id.size() != 1) {
        return writeError(request, response, "Please provide a numerical 'id' value. It can be a random number (uuid). Received => " + _id);
    }
    long id = Long.parseLong(String.valueOf(_id.get(0)));

    int delay = getParameter(request, "delay", 50); // default to 50ms server-side delay
    int itemSize = getParameter(request, "itemSize", 128); // default to 128 bytes item size (assuming ascii text)
    int numItems = getParameter(request, "numItems", 10); // default to 10 items in a list

    // no more than 100 items
    if (numItems < 1 || numItems > 100) {
        return writeError(request, response, "Please choose a 'numItems' value from 1 to 100.");
    }

    // no larger than 50KB per item
    if (itemSize < 1 || itemSize > 1024 * 50) {
        return writeError(request, response, "Please choose an 'itemSize' value from 1 to 1024*50 (50KB).");
    }

    // no larger than 60 second delay
    if (delay < 0 || delay > 60000) {
        return writeError(request, response, "Please choose a 'delay' value from 0 to 60000 (60 seconds).");
    }

    response.setStatus(HttpResponseStatus.OK);
    return MockResponse.generateJson(id, delay, itemSize, numItems)
            .flatMap(json -> response.writeStringAndFlush("data:" + json + "\n"))
            .doOnCompleted(response::close);
}
项目:ReactiveLab    文件:RatingsService.java   
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> videoIds = request.getQueryParameters().get("videoId");
    return Observable.from(videoIds).map(videoId -> {
        Map<String, Object> video = new HashMap<>();
        video.put("videoId", videoId);
        video.put("estimated_user_rating", 3.5);
        video.put("actual_user_rating", 4);
        video.put("average_user_rating", 3.1);
        return video;
    }).flatMap(video -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(video) + "\n"))
            .delay(20, TimeUnit.MILLISECONDS).doOnCompleted(response::close); // simulate latenc
}
项目:ReactiveLab    文件:LoadBalancerFactory.java   
public LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> forVip(String targetVip) {
    Observable<MembershipEvent<Host>> eurekaHostSource = membershipSource.forInterest(Interests.forVips(targetVip), instanceInfo -> {
        String ipAddress = instanceInfo.getDataCenterInfo()
                .getAddresses().stream()
                .filter(na -> na.getProtocolType() == ProtocolType.IPv4)
                .collect(Collectors.toList()).get(0).getIpAddress();
        HashSet<ServicePort> servicePorts = instanceInfo.getPorts();
        ServicePort portToUse = servicePorts.iterator().next();
        return new Host(ipAddress, portToUse.getPort());
    });

    final Map<Host, HttpClientHolder<ByteBuf, ServerSentEvent>> hostVsHolders = new ConcurrentHashMap<>();

    String lbName = targetVip + "-lb";
    return LoadBalancers.newBuilder(eurekaHostSource.map(
            hostEvent -> {
                HttpClient<ByteBuf, ServerSentEvent> client = clientPool.getClientForHost(hostEvent.getClient());
                HttpClientHolder<ByteBuf, ServerSentEvent> holder;
                if (hostEvent.getType() == MembershipEvent.EventType.REMOVE) {
                    holder = hostVsHolders.remove(hostEvent.getClient());
                } else {
                    holder = new HttpClientHolder<>(client);
                    hostVsHolders.put(hostEvent.getClient(), holder);
                }
                return new MembershipEvent<>(hostEvent.getType(), holder);
            })).withWeightingStrategy(new LinearWeightingStrategy<>(new RxNettyPendingRequests<>()))
               .withName(lbName)
            .withFailureDetector(new RxNettyFailureDetector<>()).build();
}
项目:ReactiveLab    文件:PersonalizedCatalogCommand.java   
@Override
protected Observable<Catalog> construct() {
    HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/catalog?" + UrlGenerator.generate("userId", users));
    return loadBalancer.choose()
                       .map(holder -> holder.getClient())
                       .<Catalog>flatMap(client -> client.submit(request)
                                                .flatMap(r -> r.getContent()
                                                               .map((ServerSentEvent sse) -> Catalog.fromJson(sse.contentAsString()))))
                       .retry(1);
}
项目:ReactiveLab    文件:SocialCommand.java   
@Override
protected Observable<Social> construct() {
    HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/social?" + UrlGenerator.generate("userId", users));
    return loadBalancer.choose().map(holder -> holder.getClient())
            .<Social>flatMap(client -> client.submit(request)
                                     .flatMap(r -> r.getContent().map((ServerSentEvent sse) -> {
                                         String social = sse.contentAsString();
                                         return Social.fromJson(social);
                                     })))
            .retry(1);
}
项目:ReactiveLab    文件:GeoCommand.java   
@Override
protected Observable<GeoIP> construct() {
    HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/geo?" + UrlGenerator.generate("ip", ips));
    return loadBalancer.choose()
                       .map(holder -> holder.getClient())
                       .<GeoIP>flatMap(client -> client.submit(request)
                                                .flatMap(r -> r.getContent()
                                                               .map((ServerSentEvent sse) -> GeoIP.fromJson(sse.contentAsString()))))
                       .retry(1);
}
项目:ReactiveLab    文件:UserCommand.java   
@Override
protected Observable<User> construct() {
    HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/user?" + UrlGenerator.generate("userId", userIds));
    return loadBalancer.choose().map(holder -> holder.getClient())
            .<User>flatMap(client -> client.submit(request)
                                     .flatMap(r -> r.getContent().map(
                                             (ServerSentEvent sse) -> {
                                                 String user = sse.contentAsString();
                                                 return User.fromJson(user);
                                             })))
            .retry(1);
}
项目:ReactiveLab    文件:VideoMetadataCommand.java   
@Override
protected Observable<VideoMetadata> construct() {
    HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/metadata?" + UrlGenerator.generate("videoId",
                                                                                                          videos));
    return loadBalancer.choose()
                       .map(holder -> holder.getClient())
                       .<VideoMetadata>flatMap(client -> client.submit(request)
                                                .flatMap(r -> r.getContent()
                                                               .map((ServerSentEvent sse) -> VideoMetadata.fromJson(sse.contentAsString()))))
                       .retry(1);
}
项目:ReactiveLab    文件:RatingsCommand.java   
@Override
protected Observable<Rating> construct() {
    HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/ratings?" + UrlGenerator.generate("videoId", videos));
    return loadBalancer.choose()
                       .map(holder -> holder.getClient())
                       .<Rating>flatMap(client -> client.submit(request)
                                                .flatMap(r -> r.getContent()
                                                               .map((ServerSentEvent sse) -> Rating.fromJson(sse.contentAsString()))))
                       .retry(1);
}
项目:ReactiveLab    文件:BookmarksCommand.java   
public BookmarksCommand(List<Video> videos, LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer) {
    super(HystrixCommandGroupKey.Factory.asKey("GetBookmarks"));
    this.videos = videos;
    this.loadBalancer = loadBalancer;
    StringBuilder b = new StringBuilder();
    for (Video v : videos) {
        b.append(v.getId()).append("-");
    }
    this.cacheKey = b.toString();
}
项目:ReactiveLab    文件:BookmarksCommand.java   
@Override
public Observable<Bookmark> construct() {
    HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/bookmarks?" + UrlGenerator.generate("videoId", videos));
    return loadBalancer.choose()
            .map(holder -> holder.getClient())
            .<Bookmark>flatMap(client -> client.submit(request)
                                     .flatMap(r -> r.getContent().map((ServerSentEvent sse) -> Bookmark.fromJson(sse.contentAsString()))))
            .retry(1);
}
项目:ReactiveLab    文件:LoadBalancerFactory.java   
public LoadBalancerFactory(EurekaMembershipSource membershipSource,
                           HttpClientPool<ByteBuf, ServerSentEvent> clientPool) {
    this.membershipSource = membershipSource;
    this.clientPool = clientPool;
}
项目:ReactiveLab    文件:AbstractMiddleTierService.java   
protected abstract Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response);