/**
 * Creates a file reader service from its collaborators and read settings.
 *
 * <p>Each argument is stored unchanged in the field of the same name. The chunk queue is
 * created here as an array-backed blocking queue with a capacity of 2.
 */
public FileReaderService(
        PathSet pathSet,
        Charset charset,
        FileInput.InitialReadPosition initialReadPosition,
        FileInput input,
        MessageBuilder messageBuilder,
        ContentSplitter contentSplitter,
        Buffer buffer,
        int readerBufferSize,
        long readerInterval,
        FileObserver fileObserver) {
    // What to read and how to decode it.
    this.pathSet = pathSet;
    this.charset = charset;
    this.initialReadPosition = initialReadPosition;
    this.readerBufferSize = readerBufferSize;
    this.readerInterval = readerInterval;

    // Collaborators.
    this.input = input;
    this.messageBuilder = messageBuilder;
    this.contentSplitter = contentSplitter;
    this.buffer = buffer;
    this.fileObserver = fileObserver;

    // Bounded hand-off queue with room for two entries.
    chunkQueue = Queues.newArrayBlockingQueue(2);
}
public Chunk(World worldIn, int x, int z) { this.storageArrays = new ExtendedBlockStorage[16]; this.blockBiomeArray = new byte[256]; this.precipitationHeightMap = new int[256]; this.updateSkylightColumns = new boolean[256]; this.chunkTileEntityMap = Maps.<BlockPos, TileEntity>newHashMap(); this.queuedLightChecks = 4096; this.tileEntityPosQueue = Queues.<BlockPos>newConcurrentLinkedQueue(); this.entityLists = (ClassInheritanceMultiMap[])(new ClassInheritanceMultiMap[16]); this.worldObj = worldIn; this.xPosition = x; this.zPosition = z; this.heightMap = new int[256]; for (int i = 0; i < this.entityLists.length; ++i) { this.entityLists[i] = new ClassInheritanceMultiMap(Entity.class); } Arrays.fill((int[])this.precipitationHeightMap, (int) - 999); Arrays.fill(this.blockBiomeArray, (byte) - 1); }
/**
 * Creates a particle manager bound to a world and a texture manager.
 *
 * <p>Fills {@code fxLayers} with a 4 x 2 grid of empty deques and then calls
 * {@code registerVanillaParticles()}.
 *
 * @param worldIn the world stored in {@code worldObj}
 * @param rendererIn the texture manager stored in {@code renderer}
 */
public ParticleManager(World worldIn, TextureManager rendererIn) {
    this.worldObj = worldIn;
    this.renderer = rendererIn;

    for (int outer = 0; outer < 4; ++outer) {
        this.fxLayers[outer] = new ArrayDeque[2];

        for (int inner = 0; inner < 2; ++inner) {
            this.fxLayers[outer][inner] = Queues.newArrayDeque();
        }
    }

    this.registerVanillaParticles();
}
/**
 * Starts the background audit loop on {@code auditExecutorService}.
 *
 * <p>The loop repeatedly drains up to {@code BATCH_SIZE} entries from {@code audits}
 * (waiting at most {@code BATCH_TIMEOUT} {@code BATCH_TIMEUNIT}) and hands every non-empty
 * batch to {@code consumerService.createConsumerAudits}. It runs until {@code auditStopped}
 * is set or the worker thread is interrupted.
 *
 * @throws Exception declared by the overridden method; not thrown by this implementation
 */
@Override
public void afterPropertiesSet() throws Exception {
    auditExecutorService.submit(() -> {
        while (!auditStopped.get() && !Thread.currentThread().isInterrupted()) {
            List<ConsumerAudit> toAudit = Lists.newArrayList();
            try {
                Queues.drain(audits, toAudit, BATCH_SIZE, BATCH_TIMEOUT, BATCH_TIMEUNIT);
                if (!toAudit.isEmpty()) {
                    consumerService.createConsumerAudits(toAudit);
                }
            } catch (InterruptedException ex) {
                // Throwing InterruptedException clears the thread's interrupt flag. Restore it
                // so the loop condition sees the interruption and the worker terminates,
                // instead of logging and carrying on as the catch-all below would.
                Thread.currentThread().interrupt();
            } catch (Throwable ex) {
                // A single failed batch must not kill the audit loop.
                Tracer.logError(ex);
            }
        }
    });
}
/**
 * Flood-fills outwards from the given index using a FIFO work queue.
 *
 * <p>Every index taken from the queue is passed to {@code addEdges} together with the result
 * set; each neighbour index that is non-negative and not yet set in {@code bitSet} is marked
 * and queued.
 *
 * @param p_178604_1_ the index to start from; it is marked in {@code bitSet} immediately
 * @return the set that was handed to {@code addEdges} for every visited index
 */
private Set<EnumFacing> floodFill(int p_178604_1_) {
    Set<EnumFacing> faces = EnumSet.noneOf(EnumFacing.class);
    Queue<Integer> pending = Queues.newArrayDeque();

    pending.add(IntegerCache.getInteger(p_178604_1_));
    this.bitSet.set(p_178604_1_, true);

    while (!pending.isEmpty()) {
        int current = pending.poll();
        this.addEdges(current, faces);

        for (EnumFacing facing : EnumFacing.values()) {
            int neighbour = this.getNeighborIndexAtFace(current, facing);

            // Skip neighbours without a valid index and those already marked.
            if (neighbour < 0 || this.bitSet.get(neighbour)) {
                continue;
            }

            this.bitSet.set(neighbour, true);
            pending.add(IntegerCache.getInteger(neighbour));
        }
    }

    return faces;
}
/**
 * Builds the server's managers and configures the JAX-RS server factory.
 *
 * <p>Jobs run on a {@code NutchServerPoolExecutor} backed by a blocking queue bounded to
 * {@code JOB_CAPACITY} entries.
 */
private NutchServer() {
    configManager = new ConfManagerImpl();

    // Job execution: bounded work queue feeding the pool executor.
    BlockingQueue<Runnable> jobQueue = Queues.newArrayBlockingQueue(JOB_CAPACITY);
    NutchServerPoolExecutor poolExecutor =
            new NutchServerPoolExecutor(10, JOB_CAPACITY, 1, TimeUnit.HOURS, jobQueue);
    jobManager = new JobManagerImpl(new JobFactory(), configManager, poolExecutor);

    fetchNodeDb = FetchNodeDb.getInstance();

    // REST endpoint: register the JAX-RS binding on the factory's bus.
    sf = new JAXRSServerFactoryBean();
    BindingFactoryManager bindingManager =
            sf.getBus().getExtension(BindingFactoryManager.class);
    JAXRSBindingFactory bindingFactory = new JAXRSBindingFactory();
    bindingFactory.setBus(sf.getBus());
    bindingManager.registerBindingFactory(JAXRSBindingFactory.JAXRS_BINDING_ID, bindingFactory);

    // Resources and providers served by the endpoint.
    sf.setResourceClasses(getClasses());
    sf.setResourceProviders(getResourceProviders());
    sf.setProvider(new JacksonJaxbJsonProvider());
}
/**
 * Starts the appender.
 *
 * <p>Refuses to start (and reports an error) when no encoder is set. Otherwise initialises
 * the encoder on {@code stream}, creates a synchronized evicting queue bounded to
 * {@code limit} entries as the log list, switches logging on and delegates to
 * {@code super.start()}.
 */
@Override
public void start() {
    if (this.encoder == null) {
        addError("No encoder set for the appender named ["+ name +"].");
        return;
    }

    try {
        encoder.init(stream);
    } catch (IOException ex) {
        // Previously swallowed silently. Startup still proceeds (best effort), but the
        // failure is now reported so a broken encoder is diagnosable.
        addError("Failed to initialize the encoder for the appender named [" + name + "].", ex);
    }

    // Bounded buffer: once `limit` entries are held, the oldest entry is evicted.
    EvictingQueue<String> q = EvictingQueue.create(limit);
    logList = Queues.synchronizedQueue(q);
    isLoggingOn = true;
    super.start();
}
protected ConsumerBase(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf, int receiverQueueSize, ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture) { super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0 , TimeUnit.MILLISECONDS)); this.maxReceiverQueueSize = receiverQueueSize; this.subscription = subscription; this.conf = conf; this.consumerName = conf.getConsumerName() == null ? ConsumerName.generateRandomName() : conf.getConsumerName(); this.subscribeFuture = subscribeFuture; this.listener = conf.getMessageListener(); if (receiverQueueSize <= 1) { this.incomingMessages = Queues.newArrayBlockingQueue(1); } else { this.incomingMessages = new GrowableArrayBlockingQueue<>(); } this.listenerExecutor = listenerExecutor; this.pendingReceives = Queues.newConcurrentLinkedQueue(); }
public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, ManagedLedgerConfig config, ScheduledExecutorService scheduledExecutor, OrderedSafeExecutor orderedExecutor, final String name) { this.factory = factory; this.bookKeeper = bookKeeper; this.config = config; this.store = store; this.name = name; this.scheduledExecutor = scheduledExecutor; this.executor = orderedExecutor; TOTAL_SIZE_UPDATER.set(this, 0); NUMBER_OF_ENTRIES_UPDATER.set(this, 0); ENTRIES_ADDED_COUNTER_UPDATER.set(this, 0); STATE_UPDATER.set(this, State.None); this.ledgersStat = null; this.mbean = new ManagedLedgerMBeanImpl(this); this.entryCache = factory.getEntryCacheManager().getEntryCache(this); this.waitingCursors = Queues.newConcurrentLinkedQueue(); this.uninitializedCursors = Maps.newHashMap(); this.updateCursorRateLimit = RateLimiter.create(1); // Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0)); }
/**
 * Walks the given lists and their descendants breadth-first, invoking the supplied function
 * on each one. The walk stops early as soon as the function returns an explicit
 * {@code true}; a {@code null} or {@code false} result lets it continue.
 *
 * @param toTraverse the lists to start from; {@code null} yields 0
 * @param toApply the function invoked on every visited list (returning true aborts the walk)
 * @return how many lists were visited, including the one that triggered an abort
 */
public static int bfsLists(Collection<WFList> toTraverse, Function<WFList, Boolean> toApply) {
    if (toTraverse == null) {
        return 0;
    }

    Queue<WFList> pending = Queues.newArrayDeque(toTraverse);
    int visitedCount = 0;

    while (!pending.isEmpty()) {
        WFList current = pending.remove();
        visitedCount++;

        if (Boolean.TRUE.equals(toApply.apply(current))) {
            break;
        }

        if (current.getChildren() != null) {
            pending.addAll(current.getChildren());
        }
    }

    return visitedCount;
}
/**
 * Route Between Nodes: decides whether node {@code b} can be reached from node {@code a}
 * by following adjacency links in a directed graph.
 *
 * <p>Implemented as a breadth-first search starting at {@code a}. Nodes are compared by
 * reference, and a node is always considered reachable from itself.
 *
 * @param a the start node
 * @param b the node to reach
 * @param graph the graph the nodes belong to (not consulted by this implementation)
 * @return {@code true} if a path from {@code a} to {@code b} exists
 */
public static boolean pathExistsDirectional(IntNode a, IntNode b, IntGraph graph) {
    if (a == b) {
        return true;
    }

    Queue<IntNode> frontier = Queues.newArrayDeque();
    Set<IntNode> seen = Sets.newHashSet();
    frontier.add(a);
    seen.add(a);

    while (!frontier.isEmpty()) {
        IntNode current = frontier.remove();

        for (Node<Integer> neighbour : current.getAdjacent()) {
            if (neighbour == b) {
                return true;
            }

            // Queue each node only the first time it is encountered.
            IntNode candidate = (IntNode) neighbour;
            if (seen.add(candidate)) {
                frontier.add(candidate);
            }
        }
    }

    return false;
}
/** * Route Between Nodes: Modified - Find whether there is a path between two nodes (A->B) in a bidirectional graph. * * Assumptions: * * Time complexity: O(n) where n is numer of nodes * Space complexity: O(n) */ public static boolean pathExistsBidirectional(IntNode a, IntNode b) { // BFS on both nodes at the same time Queue<IntNode> queueA = Queues.newArrayDeque(); Queue<IntNode> queueB = Queues.newArrayDeque(); Set<IntNode> visitedA = Sets.newHashSet(); Set<IntNode> visitedB = Sets.newHashSet(); visitedA.add(a); visitedB.add(b); queueA.add(a); queueB.add(b); while (!queueA.isEmpty() && !queueB.isEmpty()) { if (pathExistsBidirectionalHelper(queueA, visitedA, visitedB)) { return true; } if (pathExistsBidirectionalHelper(queueB, visitedB, visitedA)) { return true; } } return false; }
/**
 * Creates a client for the given host and port.
 *
 * <p>The pending-message deque is bounded to {@code maxPendingMessages} when that value is
 * positive and unbounded otherwise. A selector is opened and a JVM shutdown hook is
 * registered that calls {@code close()}.
 *
 * @param hostname stored in {@code hostname}
 * @param port stored in {@code port}
 * @param maxPendingMessages capacity of the pending deque; zero or negative means unbounded
 * @throws IOException if the selector cannot be opened
 */
PubSubClient(String hostname, int port, int maxPendingMessages) throws IOException {
    this.hostname = hostname;
    this.port = port;
    this.maxPendingMessages = maxPendingMessages;

    if (maxPendingMessages > 0) {
        this.pending = Queues.newLinkedBlockingDeque(maxPendingMessages);
    } else {
        this.pending = Queues.newLinkedBlockingDeque();
    }

    this.selector = Selector.open();

    Thread closeOnExit = new Thread() {
        @Override
        public void run() {
            close();
        }
    };
    Runtime.getRuntime().addShutdownHook(closeOnExit);
}
/**
 * Registers the given functions in two places: the name and signature of each function are
 * added to the passed jar, and the name, signature and holder are added to
 * {@link #functions}.
 *
 * @param jar jar the function names and signatures are added to
 * @param newFunctions function holders, each carrying a function name, signature and holder
 */
private void addFunctions(Map<String, Queue<String>> jar, List<FunctionHolder> newFunctions) {
    for (FunctionHolder newFunction : newFunctions) {
        final String name = newFunction.getName();
        final String signature = newFunction.getSignature();

        // Record the signature under the function name in the jar.
        Queue<String> signaturesInJar = jar.get(name);
        if (signaturesInJar == null) {
            signaturesInJar = Queues.newConcurrentLinkedQueue();
            jar.put(name, signaturesInJar);
        }
        signaturesInJar.add(signature);

        // Record the holder under name -> signature in the registry.
        Map<String, DrillFuncHolder> holdersBySignature = functions.get(name);
        if (holdersBySignature == null) {
            holdersBySignature = Maps.newConcurrentMap();
            functions.put(name, holdersBySignature);
        }
        holdersBySignature.put(signature, newFunction.getHolder());
    }
}
/**
 * Converts the specified {@link SchemaPath} into the equivalent {@link FieldPath}.
 *
 * <p>The segments are first collected root-to-leaf on a stack and then popped leaf-to-root,
 * so every field segment can be constructed with its already-built child.
 *
 * @param schemaPath {@link SchemaPath} instance to convert
 * @return {@link FieldPath} equivalent of the specified {@link SchemaPath}
 */
public static FieldPath schemaPathToFieldPath(SchemaPath schemaPath) {
    Deque<PathSegment> stack = Queues.newArrayDeque();
    for (PathSegment segment = schemaPath.getRootSegment(); segment != null; segment = segment.getChild()) {
        stack.push(segment);
    }

    FieldSegment built = null;
    while (!stack.isEmpty()) {
        PathSegment segment = stack.pop();
        if (segment.isNamed()) {
            PathSegment.NameSegment named = (PathSegment.NameSegment) segment;
            built = new FieldSegment.NameSegment(named.getPath(), built, false);
        } else {
            PathSegment.ArraySegment indexed = (PathSegment.ArraySegment) segment;
            built = new FieldSegment.IndexSegment(String.valueOf(indexed.getIndex()), built);
        }
    }

    return new FieldPath((FieldSegment.NameSegment) built);
}
/**
 * Registers a whole-program transform that processes the given methods, then runs the Soot
 * packs and writes their output.
 *
 * <p>The work queue is seeded with one {@code MethodAnalysis} per method signature, looked
 * up in the current scene.
 *
 * @param methodSignatures signatures of the methods to seed the queue with
 * @return the scene singleton after the packs have run
 */
private static Scene executeSoot(Collection<String> methodSignatures) {
    IoSinkExpressions sinkExpressions = new IoSinkExpressions(Scene.v());
    Set<MethodAnalysis> processed = Sets.newHashSet();

    Set<MethodAnalysis> seeds = methodSignatures.stream()
            .map(signature -> Scene.v().getMethod(signature))
            .map(MethodAnalysis::new)
            .collect(Collectors.toSet());
    Queue<MethodAnalysis> queue = Queues.newArrayDeque(seeds);

    SceneTransformer transformer = new SceneTransformer() {
        @Override
        protected void internalTransform(String phaseName, Map options) {
            processQueue(queue, processed, sinkExpressions);
        }
    };
    PackManager.v().getPack("wjtp").add(new Transform("wjtp.dataFlowTransform", transformer));

    PackManager.v().runPacks();
    PackManager.v().writeOutput();
    return Scene.v();
}
@Override @NotNull public PsiElement insertItemIntoListRemoveRedundantCommas( @NotNull final PyElement list, @Nullable final PyExpression afterThis, @NotNull final PyExpression toInsert) { // TODO: #insertItemIntoList is probably buggy. In such case, fix it and get rid of this method final PsiElement result = insertItemIntoList(list, afterThis, toInsert); final LeafPsiElement[] leafs = PsiTreeUtil.getChildrenOfType(list, LeafPsiElement.class); if (leafs != null) { final Deque<LeafPsiElement> commas = Queues.newArrayDeque(Collections2.filter(Arrays.asList(leafs), COMMAS_ONLY)); if (!commas.isEmpty()) { final LeafPsiElement lastComma = commas.getLast(); if (PsiTreeUtil.getNextSiblingOfType(lastComma, PyExpression.class) == null) { //Comma has no expression after it lastComma.delete(); } } } return result; }
public static ImmutableCollection<TargetKey> getTransitiveDependencies( Collection<TargetKey> targetKeys, TargetMap targetMap) { Queue<TargetKey> targetsToVisit = Queues.newArrayDeque(); Set<TargetKey> transitiveDependencies = Sets.newHashSet(); targetsToVisit.addAll(targetKeys); while (!targetsToVisit.isEmpty()) { TargetIdeInfo currentTarget = targetMap.get(targetsToVisit.remove()); if (currentTarget == null) { continue; } List<TargetKey> newDependencies = currentTarget .dependencies .stream() .map(d -> TargetKey.forPlainTarget(d.targetKey.label)) // Get rid of the ones we've already seen. .filter(r -> !transitiveDependencies.contains(r)) .collect(Collectors.toList()); targetsToVisit.addAll(newDependencies); transitiveDependencies.addAll(newDependencies); } return ImmutableSet.copyOf(transitiveDependencies); }
/**
 * Computes the transitive closure of the files in the given file sets, following child file
 * sets transitively.
 *
 * @param fileSets all known named file sets, keyed by id
 * @param fileSetsToVisit ids of the file sets to start from
 * @param fileFilter forwarded to {@code parseFile} for every file entry
 * @return the distinct files found, in no particular order
 */
private static ImmutableList<File> traverseFileSetsTransitively(
        Map<String, BuildEventStreamProtos.NamedSetOfFiles> fileSets,
        Set<String> fileSetsToVisit,
        Predicate<String> fileFilter) {
    Set<File> collected = new HashSet<>();
    Set<String> seenIds = new HashSet<>(fileSetsToVisit);
    Queue<String> pendingIds = Queues.newArrayDeque(fileSetsToVisit);

    while (!pendingIds.isEmpty()) {
        BuildEventStreamProtos.NamedSetOfFiles current = fileSets.get(pendingIds.remove());

        collected.addAll(
                current.getFilesList().stream()
                        .map(entry -> parseFile(entry, fileFilter))
                        .collect(toImmutableList()));

        // Queue each child set the first time its id is encountered.
        for (NamedSetOfFilesId child : current.getFileSetsList()) {
            String childId = child.getId();
            if (seenIds.add(childId)) {
                pendingIds.add(childId);
            }
        }
    }

    return ImmutableList.copyOf(collected);
}
/**
 * Finds the targets associated with a source file by walking reverse dependencies.
 *
 * <p>Starts from the root targets registered for the file and follows {@code rdepsMap}
 * breadth-first, visiting each key once. Targets accepted by {@code filter} are returned.
 *
 * @param rdepsMap reverse-dependency edges, keyed by target
 * @param sourceFile the source file whose roots seed the walk
 * @return the accepted targets, in visiting order
 */
private Collection<TargetIdeInfo> targetsForSourceFileImpl(
        ImmutableMultimap<TargetKey, TargetKey> rdepsMap, File sourceFile) {
    List<TargetIdeInfo> accepted = Lists.newArrayList();
    Collection<TargetKey> roots = rootsMap.get(sourceFile);

    Queue<TargetKey> pending = Queues.newArrayDeque();
    pending.addAll(roots);
    Set<TargetKey> visited = Sets.newHashSet();

    while (!pending.isEmpty()) {
        TargetKey key = pending.remove();

        // Process a key only on its first visit.
        if (visited.add(key)) {
            TargetIdeInfo candidate = targetMap.get(key);
            if (filter.test(candidate)) {
                accepted.add(candidate);
            }
            pending.addAll(rdepsMap.get(key));
        }
    }

    return accepted;
}
@Test public void log_errors() { final CommandEnvelope commandEnvelope = givenCommandEnvelope(); // Since we're in the tests mode `Environment` returns `SubstituteLogger` instance. final SubstituteLogger log = (SubstituteLogger) handler.log(); // Restrict the queue size only to the number of calls we want to make. final Queue<SubstituteLoggingEvent> queue = Queues.newArrayBlockingQueue(1); log.setDelegate(new EventRecodingLogger(log, queue)); SubstituteLoggingEvent loggingEvent; final RuntimeException exception = new RuntimeException("log_errors"); handler.onError(commandEnvelope, exception); loggingEvent = queue.poll(); assertEquals(Level.ERROR, loggingEvent.getLevel()); assertEquals(commandEnvelope, handler.getLastErrorEnvelope()); assertEquals(exception, handler.getLastException()); }
private boolean willRewrite(Production production) { // fill worker queue Deque<Production> openProduction = Queues.newArrayDeque(); openProduction.offer(production); // till no production to consider ... while (openProduction.isEmpty() == false) { Production currentProduction = openProduction.poll(); if (currentProduction instanceof RewriteProduction) { // we're not done, for sure, as we found a rewrite production return true; } else if (currentProduction instanceof BranchProduction<?>) { BranchProduction<?> branchProduction = (BranchProduction<?>) currentProduction; List<Production> branchInnerProductions = branchProduction.getRuleProductions(-1); for (Production branchInnerProduction : branchInnerProductions) { openProduction.offer(branchInnerProduction); } } } return false; }
/**
 * Deserializes the two sequence files under {@code outputPath} ("seq1" and "seq2") with a
 * {@code ParallelRunner} and checks that two work unit states are read, each with a long
 * watermark of 10 or 100.
 */
@Test(dependsOnMethods = "testSerializeToSequenceFile")
public void testDeserializeFromSequenceFile() throws IOException {
    Queue<WorkUnitState> deserialized = Queues.newConcurrentLinkedQueue();

    Closer closer = Closer.create();
    try {
        ParallelRunner runner = closer.register(new ParallelRunner(2, this.fs));
        runner.deserializeFromSequenceFile(Text.class, WorkUnitState.class,
                new Path(this.outputPath, "seq1"), deserialized);
        runner.deserializeFromSequenceFile(Text.class, WorkUnitState.class,
                new Path(this.outputPath, "seq2"), deserialized);
    } catch (Throwable t) {
        throw closer.rethrow(t);
    } finally {
        // The results are inspected only after the runner has been closed.
        closer.close();
    }

    Assert.assertEquals(deserialized.size(), 2);

    for (WorkUnitState state : deserialized) {
        TestWatermark watermark =
                new Gson().fromJson(state.getActualHighWatermark(), TestWatermark.class);
        long value = watermark.getLongWatermark();
        Assert.assertTrue(value == 10L || value == 100L);
    }
}
/**
 * Creates an event reporter from the given builder.
 *
 * <p>Sets up a single-thread exiting executor for immediate reports, registers a
 * notification target on the builder's context that forwards every notification to
 * {@code notificationCallback}, and creates the reporting queue bounded to
 * {@code QUEUE_CAPACITY}.
 *
 * @param builder supplies the context, name, filter and units
 */
public EventReporter(Builder builder) {
    super(builder.context, builder.name, builder.filter, builder.rateUnit, builder.durationUnit);

    this.closer = Closer.create();

    // One worker thread, named after this reporter.
    ThreadPoolExecutor singleThreadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1,
            ExecutorsUtils.newThreadFactory(Optional.of(LOGGER),
                    Optional.of("EventReporter-" + builder.name + "-%d")));
    this.immediateReportExecutor =
            MoreExecutors.getExitingExecutorService(singleThreadPool, 5, TimeUnit.MINUTES);

    this.metricContext = builder.context;

    Function<Notification, Void> forwardToCallback = new Function<Notification, Void>() {
        @Nullable
        @Override
        public Void apply(Notification notification) {
            notificationCallback(notification);
            return null;
        }
    };
    this.notificationTargetKey = builder.context.addNotificationTarget(forwardToCallback);

    this.reportingQueue = Queues.newLinkedBlockingQueue(QUEUE_CAPACITY);
}
/**
 * Removes every node below "root" from the document tree.
 *
 * <p>Paths are handled through a work queue: a path without children is removed right
 * away, while a path that still has children is re-queued behind them so that it is looked
 * at again after they have been processed.
 *
 * @param commit the clear commit (not read by this implementation)
 */
protected void clear(Commit<Void> commit) {
    Queue<DocumentPath> pending = Queues.newArrayDeque();

    Map<String, Versioned<byte[]>> topLevel = docTree.getChildren(DocumentPath.from("root"));
    for (String childName : topLevel.keySet()) {
        pending.add(new DocumentPath(childName, DocumentPath.from("root")));
    }

    while (!pending.isEmpty()) {
        DocumentPath current = pending.remove();
        Map<String, Versioned<byte[]>> children = docTree.getChildren(current);

        if (children.isEmpty()) {
            docTree.removeNode(current);
            continue;
        }

        // Children go first; the parent is revisited once they are gone.
        for (String childName : children.keySet()) {
            pending.add(new DocumentPath(childName, current));
        }
        pending.add(current);
    }
}
@Nonnull Iterator<NodeDocument> getAllPreviousDocs() { if (getPreviousRanges().isEmpty()) { return Iterators.emptyIterator(); } //Currently this method would fire one query per previous doc //If that poses a problem we can try to find all prev doc by relying //on property that all prevDoc id would starts <depth+2>:p/path/to/node return new AbstractIterator<NodeDocument>(){ private Queue<Map.Entry<Revision, Range>> previousRanges = Queues.newArrayDeque(getPreviousRanges().entrySet()); @Override protected NodeDocument computeNext() { if(!previousRanges.isEmpty()){ Map.Entry<Revision, Range> e = previousRanges.remove(); NodeDocument prev = getPreviousDoc(e.getKey(), e.getValue()); if(prev != null){ previousRanges.addAll(prev.getPreviousRanges().entrySet()); return prev; } } return endOfData(); } }; }
/**
 * Checks that a permit request is forwarded to the request sender and that satisfying the
 * forwarded request completes the caller's future with {@code true}.
 */
@Test
public void testForwardingOfRequests() throws Exception {
    Queue<RequestAndCallback> sentRequests = Queues.newArrayDeque();

    BatchedPermitsRequester container = BatchedPermitsRequester.builder()
            .resourceId("resource")
            .requestorIdentifier("requestor")
            .requestSender(new TestRequestSender(sentRequests, false))
            .build();

    try (ParallelRequester requester = new ParallelRequester(container)) {
        Future<Boolean> permitFuture = requester.request(10);

        // Wait until exactly one request has reached the sender.
        await(new QueueSize(sentRequests, 1), 1000);
        Assert.assertEquals(sentRequests.size(), 1);

        satisfyRequestBuilder().requestAndCallback(sentRequests.poll()).satisfy();

        permitFuture.get(1, TimeUnit.SECONDS);
        Assert.assertTrue(permitFuture.isDone());
        Assert.assertTrue(permitFuture.get());
    }
}
@Test public void testRetriableFail() throws Exception { Queue<RequestAndCallback> queue = Queues.newArrayDeque(); BatchedPermitsRequester container = BatchedPermitsRequester.builder().resourceId("resource") .requestorIdentifier("requestor").requestSender(new TestRequestSender(queue, false)).build(); try (ParallelRequester requester = new ParallelRequester(container)) { Future<Boolean> future = requester.request(10); for (int i = 0; i < BatchedPermitsRequester.MAX_RETRIES; i++) { // container will fail 5 times await(new QueueSize(queue, 1), 1000); Assert.assertFalse(future.isDone()); failRequestBuilder().requestAndCallback(queue.poll()).fail(); } // should return a failure Assert.assertFalse(future.get()); // should not make any more request Assert.assertEquals(queue.size(), 0); } }
@Test public void testNonRetriableFail() throws Exception { Queue<RequestAndCallback> queue = Queues.newArrayDeque(); BatchedPermitsRequester container = BatchedPermitsRequester.builder().resourceId("resource") .requestorIdentifier("requestor").requestSender(new TestRequestSender(queue, false)).build(); try (ParallelRequester requester = new ParallelRequester(container)) { Future<Boolean> future = requester.request(10); // container should only try request once await(new QueueSize(queue, 1), 1000); Assert.assertFalse(future.isDone()); failRequestBuilder().requestAndCallback(queue.poll()).errorStatus(HttpStatus.S_422_UNPROCESSABLE_ENTITY).fail(); Assert.assertFalse(future.get()); Assert.assertEquals(queue.size(), 0); } }
/**
 * Deserializes the two sequence files under {@code outputPath} ("seq1" and "seq2") with a
 * {@code ParallelRunner}, passing {@code true} as the final argument, and checks that both
 * files no longer exist afterwards and that two work unit states are read, each with a long
 * watermark of 10 or 100.
 */
@Test(dependsOnMethods = "testSerializeToSequenceFile")
public void testDeserializeFromSequenceFile() throws IOException {
    Path firstSeqFile = new Path(this.outputPath, "seq1");
    Path secondSeqFile = new Path(this.outputPath, "seq2");
    Queue<WorkUnitState> deserialized = Queues.newConcurrentLinkedQueue();

    try (ParallelRunner runner = new ParallelRunner(2, this.fs)) {
        runner.deserializeFromSequenceFile(Text.class, WorkUnitState.class, firstSeqFile,
                deserialized, true);
        runner.deserializeFromSequenceFile(Text.class, WorkUnitState.class, secondSeqFile,
                deserialized, true);
    }

    // Neither input file may be left behind once the runner has been closed.
    Assert.assertFalse(this.fs.exists(firstSeqFile));
    Assert.assertFalse(this.fs.exists(secondSeqFile));

    Assert.assertEquals(deserialized.size(), 2);

    for (WorkUnitState state : deserialized) {
        TestWatermark watermark =
                new Gson().fromJson(state.getActualHighWatermark(), TestWatermark.class);
        long value = watermark.getLongWatermark();
        Assert.assertTrue(value == 10L || value == 100L);
    }
}
/**
 * Creates an event reporter from the given builder.
 *
 * <p>Sets up a single-thread exiting executor for immediate reports, registers a
 * notification target on the builder's context that forwards every notification to
 * {@code notificationCallback}, and creates the reporting queue bounded to
 * {@code QUEUE_CAPACITY}.
 *
 * @param builder supplies the context, name, filter and units
 */
public EventReporter(Builder builder) {
    super(builder.context, builder.name, builder.filter, builder.rateUnit, builder.durationUnit);

    this.closer = Closer.create();

    // One worker thread, named after this reporter.
    ThreadPoolExecutor singleThreadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1,
            ExecutorsUtils.newThreadFactory(Optional.of(LOGGER),
                    Optional.of("EventReporter-" + builder.name + "-%d")));
    this.immediateReportExecutor =
            MoreExecutors.getExitingExecutorService(singleThreadPool, 5, TimeUnit.MINUTES);

    this.metricContext = builder.context;

    Function<Notification, Void> forwardToCallback = new Function<Notification, Void>() {
        @Nullable
        @Override
        public Void apply(Notification notification) {
            notificationCallback(notification);
            return null;
        }
    };
    this.notificationTargetKey = builder.context.addNotificationTarget(forwardToCallback);

    this.reportingQueue = Queues.newLinkedBlockingQueue(QUEUE_CAPACITY);
}
/**
 * Creates a dispatcher for select operations.
 *
 * <p>Opens a Cassandra session from the connection settings in {@code config}, builds the
 * sample repository on top of it, and creates the query queue with a capacity of ten
 * entries per configured thread.
 *
 * @param config supplies the Cassandra connection settings and the thread count
 */
SelectDispatcher(SelectConfig config) {
    super(config);

    m_config = config;

    CassandraSession cassandraSession = new CassandraSessionImpl(
            config.getCassandraKeyspace(),
            config.getCassandraHost(),
            config.getCassandraPort(),
            config.getCassandraCompression(),
            config.getCassandraUsername(),
            config.getCassandraPassword(),
            config.getCassandraSsl());

    m_repository = new CassandraSampleRepository(
            cassandraSession,
            Config.CASSANDRA_TTL,
            new MetricRegistry(),
            new DefaultSampleProcessorService(1),
            new ContextConfigurations());

    // Queue capacity scales with the number of worker threads.
    int queueCapacity = config.getThreads() * 10;
    m_queryQueue = Queues.newArrayBlockingQueue(queueCapacity);
}