public void combineShortSessionsInParallel(int timeThres) throws InterruptedException, IOException {
    JavaRDD<String> userRDD = getUserRDD(this.cleanupType);
    userRDD.foreachPartition(new VoidFunction<Iterator<String>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Iterator<String> arg0) throws Exception {
            ESDriver tmpES = new ESDriver(props);
            tmpES.createBulkProcessor();
            while (arg0.hasNext()) {
                String s = arg0.next();
                combineShortSessions(tmpES, s, timeThres);
            }
            tmpES.destroyBulkProcessor();
            tmpES.close();
        }
    });
}
public static <X> VoidFunction<JavaPairRDD<X, JsonObject>> outputTo(String table, String schema) throws IOException {
    Configuration conf = new Configuration();
    conf.set("mapreduce.job.outputformat.class", BigQueryOutputFormat.class.getName());
    BigQueryConfiguration.configureBigQueryOutput(conf, table, schema);
    return rdd -> {
        if (rdd.count() > 0L) {
            long time = System.currentTimeMillis();
            /* This was only required the first time on a fresh table: it seems I had to kickstart the _PARTITIONTIME pseudo-column,
             * but now rows are automatically added to the proper partition using ingestion time. Using the decorator would only be
             * required if we were to place the entries using their "event timestamp", e.g. loading rows into old partitions.
             * Implementing that would be much harder, though, since we'd have to check each message, or each date-based "partition".
            if (partitioned) {
                String today = ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ofPattern("yyyyMMdd"));
                BigQueryConfiguration.configureBigQueryOutput(conf, table + "$" + today, schema);
            }*/
            rdd.saveAsNewAPIHadoopDataset(conf);
            System.out.printf("Sent %d rows to BQ in %.1fs\n", rdd.count(), (System.currentTimeMillis() - time) / 1000f);
        }
    };
}
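A minimal usage sketch (the stream variable, table spec, and schema string below are assumptions for illustration, not taken from the original code base; `outputTo` is called unqualified as if from the same class):

// Hypothetical wiring; "pairStream" (a JavaPairDStream<String, JsonObject>) is assumed to exist.
String table = "my-project:my_dataset.events";
String schema = "[{\"name\":\"message\",\"type\":\"STRING\"}]";
pairStream.foreachRDD(outputTo(table, schema));

Note that the returned function evaluates rdd.count() twice, which can recompute the RDD; caching the RDD or counting once into a local variable would avoid the second pass.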
/**
 * @param stream, the Spark Stream to publish to NATS
 * @param dataEncoder, the function used to encode the Spark Stream Records into the NATS Message Payloads
 */
public <V extends Object> void publishToNats(final JavaDStream<V> stream, final Function<V, byte[]> dataEncoder) {
    logger.trace("publishToNats(JavaDStream<String> stream)");
    stream.foreachRDD((VoidFunction<JavaRDD<V>>) rdd -> {
        logger.trace("stream.foreachRDD");
        rdd.foreachPartitionAsync(objects -> {
            logger.trace("rdd.foreachPartition");
            final SparkToNatsConnector<?> connector = getConnector();
            while (objects.hasNext()) {
                final V obj = objects.next();
                logger.trace("Will publish {}", obj);
                connector.publishToNats(dataEncoder.apply(obj));
            }
            returnConnector(connector); // return to the pool for future reuse
        });
    });
}
/**
 * @param stream, the Spark Stream (composed of Key/Value Records) to publish to NATS
 * @param dataEncoder, the function used to encode the Spark Stream Records into the NATS Message Payloads
 */
public <K extends Object, V extends Object> void publishToNatsAsKeyValue(final JavaPairDStream<K, V> stream, final Function<V, byte[]> dataEncoder) {
    logger.trace("publishToNats(JavaPairDStream<String, String> stream)");
    setStoredAsKeyValue(true);
    stream.foreachRDD((VoidFunction<JavaPairRDD<K, V>>) rdd -> {
        logger.trace("stream.foreachRDD");
        rdd.foreachPartitionAsync((VoidFunction<Iterator<Tuple2<K, V>>>) tuples -> {
            logger.trace("rdd.foreachPartition");
            final SparkToNatsConnector<?> connector = getConnector();
            while (tuples.hasNext()) {
                final Tuple2<K, V> tuple = tuples.next();
                logger.trace("Will publish {}", tuple);
                connector.publishToNats(tuple._1.toString(), dataEncoder.apply(tuple._2));
            }
            returnConnector(connector); // return to the pool for future reuse
        });
    });
}
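A hedged usage sketch of the two publish methods above; "connectorPool", "lines", and "pairs" are assumed, already-built objects (they do not appear in the original snippets), and the StandardCharsets import is omitted as elsewhere in these snippets:

// Hypothetical usage; the encoder turns each record into the NATS message payload.
Function<String, byte[]> encoder = (String s) -> s.getBytes(StandardCharsets.UTF_8);
connectorPool.publishToNats(lines, encoder);            // for a JavaDStream<String>
connectorPool.publishToNatsAsKeyValue(pairs, encoder);  // for a JavaPairDStream<String, String>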
public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("VideoInput").setMaster("local[2]"); JavaSparkContext sc = new JavaSparkContext(conf); Configuration hc = new org.apache.hadoop.conf.Configuration(); JavaPairRDD<Text, HBMat> video = sc.newAPIHadoopFile("data/bike.avi", VideoInputFormat.class, Text.class, HBMat.class,hc); video.foreach(new VoidFunction<Tuple2<Text,HBMat>>() { @Override public void call(Tuple2<Text, HBMat> tuple) throws Exception { HBMat image = (HBMat)tuple._2; System.out.print(image.getBmat().dump()); } }); System.out.print(video.count()); }
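A small aside (not part of the original example): since count() is a separate action, the video file is read and decoded a second time after the foreach; caching avoids that, as in this sketch built on the same "video" pair RDD:

// Cache once, then reuse for both the per-frame dump and the count.
JavaPairRDD<Text, HBMat> cachedVideo = video.cache();
cachedVideo.foreach(tuple -> System.out.print(tuple._2.getBmat().dump()));
System.out.print(cachedVideo.count());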
/** * Save raster image as local file. * * @param distributedImage the distributed image * @param outputPath the output path * @param imageType the image type * @param zoomLevel the zoom level * @param partitionOnX the partition on X * @param partitionOnY the partition on Y * @return true, if successful * @throws Exception the exception */ public boolean SaveRasterImageAsLocalFile(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage, final String outputPath, final ImageType imageType, final int zoomLevel, final int partitionOnX, final int partitionOnY) throws Exception { logger.info("[GeoSparkViz][SaveRasterImageAsLocalFile][Start]"); for(int i=0;i<partitionOnX*partitionOnY;i++) { deleteLocalFile(outputPath+"-"+ RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,i),imageType); } distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>() { @Override public void call(Tuple2<Integer, ImageSerializableWrapper> integerImageSerializableWrapperTuple2) throws Exception { SaveRasterImageAsLocalFile(integerImageSerializableWrapperTuple2._2.getImage(), outputPath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,integerImageSerializableWrapperTuple2._1), imageType); } }); logger.info("[GeoSparkViz][SaveRasterImageAsLocalFile][Stop]"); return true; }
/** * Save raster image as hadoop file. * * @param distributedImage the distributed image * @param outputPath the output path * @param imageType the image type * @param zoomLevel the zoom level * @param partitionOnX the partition on X * @param partitionOnY the partition on Y * @return true, if successful * @throws Exception the exception */ public boolean SaveRasterImageAsHadoopFile(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage, final String outputPath, final ImageType imageType, final int zoomLevel, final int partitionOnX, final int partitionOnY) throws Exception { logger.info("[GeoSparkViz][SaveRasterImageAsHadoopFile][Start]"); for(int i=0;i<partitionOnX*partitionOnY;i++) { deleteHadoopFile(outputPath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,i)+".", imageType); } distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>() { @Override public void call(Tuple2<Integer, ImageSerializableWrapper> integerImageSerializableWrapperTuple2) throws Exception { SaveRasterImageAsHadoopFile(integerImageSerializableWrapperTuple2._2.getImage(), outputPath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,integerImageSerializableWrapperTuple2._1), imageType); } }); logger.info("[GeoSparkViz][SaveRasterImageAsHadoopFile][Stop]"); return true; }
/** * Save raster image as S 3 file. * * @param distributedImage the distributed image * @param regionName the region name * @param accessKey the access key * @param secretKey the secret key * @param bucketName the bucket name * @param path the path * @param imageType the image type * @param zoomLevel the zoom level * @param partitionOnX the partition on X * @param partitionOnY the partition on Y * @return true, if successful */ public boolean SaveRasterImageAsS3File(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage, final String regionName, final String accessKey, final String secretKey, final String bucketName, final String path, final ImageType imageType, final int zoomLevel, final int partitionOnX, final int partitionOnY) { logger.info("[GeoSparkViz][SaveRasterImageAsS3File][Start]"); S3Operator s3Operator = new S3Operator(regionName, accessKey, secretKey); for(int i=0;i<partitionOnX*partitionOnY;i++) { s3Operator.deleteImage(bucketName, path+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,i)+"."+imageType.getTypeName()); } distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>() { @Override public void call(Tuple2<Integer, ImageSerializableWrapper> integerImageSerializableWrapperTuple2) throws Exception { SaveRasterImageAsS3File(integerImageSerializableWrapperTuple2._2.getImage(), regionName, accessKey, secretKey, bucketName, path+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,integerImageSerializableWrapperTuple2._1), imageType); } }); logger.info("[GeoSparkViz][SaveRasterImageAsS3File][Stop]"); return true; }
/** * Save raster image as local file. * * @param distributedImage the distributed image * @param outputPath the output path * @param imageType the image type * @param zoomLevel the zoom level * @param partitionOnX the partition on X * @param partitionOnY the partition on Y * @return true, if successful * @throws Exception the exception */ public boolean SaveRasterImageAsLocalFile(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage, final String outputPath, final ImageType imageType, final int zoomLevel, final int partitionOnX, final int partitionOnY) throws Exception { logger.info("[GeoSparkViz][SaveRasterImageAsLocalFile][Start]"); for(int i=0;i<partitionOnX*partitionOnY;i++) { deleteLocalFile(outputPath+"-"+ RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,i),imageType); } distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>() { @Override public void call(Tuple2<Integer, ImageSerializableWrapper> integerImageSerializableWrapperTuple2) throws Exception { SaveRasterImageAsLocalFile(integerImageSerializableWrapperTuple2._2.image, outputPath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,integerImageSerializableWrapperTuple2._1), imageType); } }); logger.info("[GeoSparkViz][SaveRasterImageAsLocalFile][Stop]"); return true; }
/** * Save raster image as hadoop file. * * @param distributedImage the distributed image * @param outputPath the output path * @param imageType the image type * @param zoomLevel the zoom level * @param partitionOnX the partition on X * @param partitionOnY the partition on Y * @return true, if successful * @throws Exception the exception */ public boolean SaveRasterImageAsHadoopFile(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage, final String outputPath, final ImageType imageType, final int zoomLevel, final int partitionOnX, final int partitionOnY) throws Exception { logger.info("[GeoSparkViz][SaveRasterImageAsHadoopFile][Start]"); for(int i=0;i<partitionOnX*partitionOnY;i++) { deleteHadoopFile(outputPath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,i)+".", imageType); } distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>() { @Override public void call(Tuple2<Integer, ImageSerializableWrapper> integerImageSerializableWrapperTuple2) throws Exception { SaveRasterImageAsHadoopFile(integerImageSerializableWrapperTuple2._2.image, outputPath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,integerImageSerializableWrapperTuple2._1), imageType); } }); logger.info("[GeoSparkViz][SaveRasterImageAsHadoopFile][Stop]"); return true; }
/** * Save raster image as S 3 file. * * @param distributedImage the distributed image * @param regionName the region name * @param accessKey the access key * @param secretKey the secret key * @param bucketName the bucket name * @param path the path * @param imageType the image type * @param zoomLevel the zoom level * @param partitionOnX the partition on X * @param partitionOnY the partition on Y * @return true, if successful */ public boolean SaveRasterImageAsS3File(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage, final String regionName, final String accessKey, final String secretKey, final String bucketName, final String path, final ImageType imageType, final int zoomLevel, final int partitionOnX, final int partitionOnY) { logger.info("[GeoSparkViz][SaveRasterImageAsS3File][Start]"); S3Operator s3Operator = new S3Operator(regionName, accessKey, secretKey); for(int i=0;i<partitionOnX*partitionOnY;i++) { s3Operator.deleteImage(bucketName, path+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,i)+"."+imageType.getTypeName()); } distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>() { @Override public void call(Tuple2<Integer, ImageSerializableWrapper> integerImageSerializableWrapperTuple2) throws Exception { SaveRasterImageAsS3File(integerImageSerializableWrapperTuple2._2.image, regionName, accessKey, secretKey, bucketName, path+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,integerImageSerializableWrapperTuple2._1), imageType); } }); logger.info("[GeoSparkViz][SaveRasterImageAsS3File][Stop]"); return true; }
public void publishTriples(JavaRDD<String> datasetRDD) throws IOException {
    logger.debug("Initiating publication of triples on the queue...");
    datasetRDD.foreach(new VoidFunction<String>() {
        private static final long serialVersionUID = 7603190977649586962L;

        @Override
        public void call(String stmt) throws Exception {
            // publish triple (statement) into the exchange
            if (stmt != null) {
                if (channel == null) {
                    logger.warn("Channel was found to be null attempting to publish, reconnecting...");
                    connect();
                }
                channel.basicPublish(EXCHANGE_NAME, "", null, stmt.getBytes());
            }
        }
    });
    logger.debug("All triples published on the queue. Processing metrics...");
}
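An illustrative call only; "sc" and "publisher" are assumed to exist and the input path is a placeholder:

// Read an N-Triples file into a JavaRDD<String> and publish each statement.
JavaRDD<String> statements = sc.textFile("hdfs:///data/dataset.nt");
publisher.publishTriples(statements);

The null check on channel suggests the connection is expected to be established lazily on the executors rather than shipped from the driver.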
public static <A extends JavaRDDLike<?, ?>> VoidFunction<A> create(JavaStreamingContext jsc, long amount, String printf) { final LongAccumulator stopAcc = jsc.ssc().sc().longAccumulator(); return rdd -> { if (printf != null) System.out.printf(printf, rdd.count()); if (rdd.count() == 0L) { stopAcc.add(1L); if (stopAcc.value() >= amount) jsc.stop(); } else stopAcc.reset(); }; }
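A hypothetical usage of the factory above (it is called unqualified here as if from the same class; "jssc" and "stream" are assumed, already-built objects):

// Stop the streaming context after 3 consecutive empty micro-batches, printing each batch size.
stream.foreachRDD(create(jssc, 3, "Batch contained %d records%n"));

Since rdd.count() may be invoked twice per batch inside the returned function, counting once into a local variable would be slightly cheaper.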
public static <T> VoidFunction<T> noOp() {
    return new VoidFunction<T>() {
        @Override
        public void call(T t) {
            // do nothing
        }
    };
}
public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("StreamMultiTopic"); JavaSparkContext sc = new JavaSparkContext(sparkConf); CuratorFramework curator = OffsetManager.createCurator("127.0.0.1:2181"); KafkaConsumerPoolFactory<String,String> poolFactory = new KafkaConsumerPoolFactory<>("127.0.0.1:9092", StringDecoder.class, StringDecoder.class); ControllerKafkaTopics<String,String> topics = new ControllerKafkaTopics<>(sc.sc(), curator, poolFactory); topics.registerTopic("test_multi", "test"); topics.registerTopic("test_multi", "test2"); new StreamProcessor<String,String>(topics) { @Override public final void process() { JavaRDD<Tuple2<String,String>> rdd = fetch().toJavaRDD(); rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,String>>>() { @Override public final void call(final Iterator<Tuple2<String,String>> it) { while (it.hasNext()) { Tuple2<String,String> e = it.next(); LOG.info("key=" + e._1 + " message=" + e._2()); } } }); commit(); } }.run(); sc.sc().stop(); }
public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("StreamSingleTopic"); JavaSparkContext sc = new JavaSparkContext(sparkConf); CuratorFramework curator = OffsetManager.createCurator("127.0.0.1:2181"); KafkaConsumerPoolFactory<String,String> poolFactory = new KafkaConsumerPoolFactory<>("127.0.0.1:9092", StringDecoder.class, StringDecoder.class); ControllerKafkaTopics<String,String> topics = new ControllerKafkaTopics<>(sc.sc(), curator, poolFactory); ControllerKafkaTopic<String,String> topic = topics.registerTopic("test_group", "test"); new StreamProcessor<String,String>(topic) { @Override public final void process() { JavaRDD<Tuple2<String,String>> rdd = fetch().toJavaRDD(); rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,String>>>() { @Override public final void call(final Iterator<Tuple2<String,String>> it) { while (it.hasNext()) { Tuple2<String,String> e = it.next(); LOG.info("key=" + e._1 + " message=" + e._2()); } } }); commit(); } }.run(); sc.sc().stop(); }
private void encryptedColumnCalc(JavaPairDStream<Long, BigInteger> encRowRDD) {
    // Multiply the column values by colNum: emit <colNum, finalColVal>
    JavaPairDStream<Long, BigInteger> encColRDD;
    if (colMultReduceByKey) {
        encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(bVars), numColMultPartitions);
    } else {
        encColRDD = encRowRDD.groupByKey(numColMultPartitions).mapToPair(new EncColMultGroupedMapper(bVars));
    }

    // Update the output name, by batch number
    bVars.setOutput(outputFile + "_" + accum.numBatchesGetValue());

    // Form and write the response object
    encColRDD.repartition(1).foreachRDD((VoidFunction<JavaPairRDD<Long, BigInteger>>) rdd -> {
        rdd.foreachPartition(new FinalResponseFunction(accum, bVars));

        int maxBatchesVar = bVars.getMaxBatches();
        if (maxBatchesVar != -1 && accum.numBatchesGetValue() == maxBatchesVar) {
            logger.info("num batches = maxBatches = " + maxBatchesVar + "; shutting down");
            System.exit(0);
        }
    });
}
public static <T> VoidFunction<T> emptyVoidFunction() {
    return new VoidFunction<T>() {
        @Override
        public void call(T t) throws Exception {
            // Empty implementation.
        }
    };
}
@Override
public void action() {
    // Force computation of DStream.
    dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() {
        @Override
        public void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
            rdd.foreach(TranslationUtils.<WindowedValue<T>>emptyVoidFunction());
        }
    });
}
protected void validateTheReceptionOfMessages(JavaStreamingContext ssc, JavaReceiverInputDStream<String> stream) throws InterruptedException { JavaDStream<String> messages = stream.repartition(3); ExecutorService executor = Executors.newFixedThreadPool(6); final int nbOfMessages = 5; NatsPublisher np = getNatsPublisher(nbOfMessages); if (logger.isDebugEnabled()) { messages.print(); } messages.foreachRDD(new VoidFunction<JavaRDD<String>>() { private static final long serialVersionUID = 1L; @Override public void call(JavaRDD<String> rdd) throws Exception { logger.debug("RDD received: {}", rdd.collect()); final long count = rdd.count(); if ((count != 0) && (count != nbOfMessages)) { rightNumber = false; logger.error("The number of messages received should have been {} instead of {}.", nbOfMessages, count); } TOTAL_COUNT.getAndAdd((int) count); atLeastSomeData = atLeastSomeData || (count > 0); for (String str :rdd.collect()) { if (! str.startsWith(NatsPublisher.NATS_PAYLOAD)) { payload = str; } } } }); closeTheValidation(ssc, executor, nbOfMessages, np); }
protected void validateTheReceptionOfIntegerMessages(JavaStreamingContext ssc, JavaReceiverInputDStream<Integer> stream) throws InterruptedException { JavaDStream<Integer> messages = stream.repartition(3); ExecutorService executor = Executors.newFixedThreadPool(6); final int nbOfMessages = 5; NatsPublisher np = getNatsPublisher(nbOfMessages); if (logger.isDebugEnabled()) { messages.print(); } messages.foreachRDD(new VoidFunction<JavaRDD<Integer>>() { private static final long serialVersionUID = 1L; @Override public void call(JavaRDD<Integer> rdd) throws Exception { logger.debug("RDD received: {}", rdd.collect()); final long count = rdd.count(); if ((count != 0) && (count != nbOfMessages)) { rightNumber = false; logger.error("The number of messages received should have been {} instead of {}.", nbOfMessages, count); } TOTAL_COUNT.getAndAdd((int) count); atLeastSomeData = atLeastSomeData || (count > 0); for (Integer value :rdd.collect()) { if (value < NatsPublisher.NATS_PAYLOAD_INT) { payload = value.toString(); } } } }); closeTheValidation(ssc, executor, nbOfMessages, np); }
protected void validateTheReceptionOfMessages(final JavaStreamingContext ssc, final JavaPairDStream<String, String> messages) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(6); final int nbOfMessages = 5; NatsPublisher np = getNatsPublisher(nbOfMessages); if (logger.isDebugEnabled()) { messages.print(); } JavaPairDStream<String, Integer> pairs = messages.mapToPair(s -> new Tuple2(s._1, 1)); JavaPairDStream<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b); counts.print(); counts.foreachRDD((VoidFunction<JavaPairRDD<String, Integer>>) pairRDD -> { pairRDD.foreach((VoidFunction<Tuple2<String, Integer>>) tuple -> { final long count = tuple._2; if ((count != 0) && (count != nbOfMessages)) { rightNumber = false; logger.error("The number of messages received should have been {} instead of {}.", nbOfMessages, count); } TOTAL_COUNT.getAndAdd((int) count); atLeastSomeData = atLeastSomeData || (count > 0); }); }); closeTheValidation(ssc, executor, nbOfMessages, np); }
@SuppressWarnings("deprecation") public static void persists(JavaPairDStream<Integer, Iterable<Long>> partitonOffset, Properties props) { partitonOffset.foreachRDD(new VoidFunction<JavaPairRDD<Integer,Iterable<Long>>>() { @Override public void call(JavaPairRDD<Integer, Iterable<Long>> po) throws Exception { List<Tuple2<Integer, Iterable<Long>>> poList = po.collect(); doPersists(poList, props); } }); }
@SuppressWarnings("deprecation") public static void persists(DStream<Tuple2<Integer, Iterable<Long>>> partitonOffset, Properties props) { ClassTag<Tuple2<Integer, Iterable<Long>>> tuple2ClassTag = ScalaUtil.<Integer, Iterable<Long>>getTuple2ClassTag(); JavaDStream<Tuple2<Integer, Iterable<Long>>> jpartitonOffset = new JavaDStream<Tuple2<Integer, Iterable<Long>>>(partitonOffset, tuple2ClassTag); jpartitonOffset.foreachRDD(new VoidFunction<JavaRDD<Tuple2<Integer, Iterable<Long>>>>() { @Override public void call(JavaRDD<Tuple2<Integer, Iterable<Long>>> po) throws Exception { List<Tuple2<Integer, Iterable<Long>>> poList = po.collect(); doPersists(poList, props); } }); }
@Override public void call(JavaRDD<DataSet> dataSetJavaRDD) throws Exception { dataSetJavaRDD.foreach(new VoidFunction<DataSet>() { @Override public void call(DataSet dataSet) throws Exception { System.out.println(dataSet); } }); }
@Override public Void call(JavaRDD<DataSet> dataSetJavaRDD) throws Exception { dataSetJavaRDD.foreach(new VoidFunction<DataSet>() { @Override public void call(DataSet dataSet) throws Exception { System.out.println(dataSet); } }); return null; }
public static void main(String[] args) throws Exception {
    final String dbUrl = args[0];
    final String hostname = args[1];
    final String port = args[2];
    final String inTargetSchema = args[3];
    final String inTargetTable = args[4];

    SparkConf conf = new SparkConf();
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(500));
    SpliceSpark.setContext(ssc.sparkContext());
    SparkSession spark = SpliceSpark.getSessionUnsafe();

    JavaReceiverInputDStream<String> stream = ssc.socketTextStream(hostname, Integer.parseInt(port));

    // Create a SplicemachineContext based on the provided DB connection
    SplicemachineContext splicemachineContext = new SplicemachineContext(dbUrl);

    // Set target tablename and schemaname
    final String table = inTargetSchema + "." + inTargetTable;

    stream.foreachRDD((VoidFunction<JavaRDD<String>>) rdd -> {
        JavaRDD<Row> rowRDD = rdd.map((Function<String, Row>) s -> RowFactory.create(s));
        Dataset<Row> df = spark.createDataFrame(rowRDD, splicemachineContext.getSchema(table));
        splicemachineContext.insert(df, table);
    });

    ssc.start();
    ssc.awaitTermination();
}
/**
 * @param args
 */
public static void main(String[] args) {
    //C:\Users\sumit.kumar\Downloads\bin\warehouse
    //System.setProperty("hadoop.home.dir", "C:\\Users\\sumit.kumar\\Downloads");
    String logFile = "src/main/resources/Apology_by_Plato.txt"; // Should be some file on your system
    Logger rootLogger = LogManager.getRootLogger();
    rootLogger.setLevel(Level.WARN);

    SparkConf conf = new SparkConf().setMaster("local").setAppName("ActionExamples").set("spark.hadoop.validateOutputSpecs", "false");
    JavaSparkContext sparkContext = new JavaSparkContext(conf);
    JavaRDD<Integer> rdd = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5), 3).cache();

    JavaRDD<Integer> evenRDD = rdd.filter(new org.apache.spark.api.java.function.Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer v1) throws Exception {
            return (v1 % 2) == 0;
        }
    });

    evenRDD.persist(StorageLevel.MEMORY_AND_DISK());
    evenRDD.foreach(new VoidFunction<Integer>() {
        @Override
        public void call(Integer t) throws Exception {
            System.out.println("The value of RDD are :" + t);
        }
    });

    // unpersisting the RDD
    evenRDD.unpersist();
    rdd.unpersist();

    /* JavaRDD<String> lines = spark.read().textFile(logFile).javaRDD().cache();
    System.out.println("DEBUG: \n" + lines.toDebugString());
    long word = lines.count();
    JavaRDD<String> distinctLines = lines.distinct();
    System.out.println("DEBUG: \n" + distinctLines.toDebugString());
    JavaRDD<String> finalRdd = lines.subtract(distinctLines);
    System.out.println("DEBUG: \n" + finalRdd.toDebugString());
    System.out.println("The count is " + word);
    System.out.println("The count is " + distinctLines.count());
    System.out.println("The count is " + finalRdd.count());
    finalRdd.foreach(new VoidFunction<String>() {
        @Override
        public void call(String t) throws Exception {
            System.out.println(t);
        }
    });
    */

    /* SparkConf conf = new SparkConf().setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);
    StorageLevel newLevel;
    JavaRDD<String> logData = sc.textFile(logFile).cache();
    long numAs = logData.filter(new Function(logFile, logFile, logFile, logFile, false) {
        public Boolean call(String s) { return s.contains("a"); }
    }).count();
    long numBs = logData.filter(new Function(logFile, logFile, logFile, logFile, false) {
        public Boolean call(String s) { return s.contains("b"); }
    }).count();
    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
    sc.stop(); */
}
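One caveat worth noting (an observation, not part of the original code): foreach runs on the executors, so in a non-local deployment the println output lands in executor logs rather than on the driver console. A driver-side alternative, assuming the same evenRDD and an RDD small enough to collect:

// Collect to the driver (only safe for small RDDs such as this five-element one) and print locally.
for (Integer value : evenRDD.collect()) {
    System.out.println("The value of RDD are :" + value);
}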
public void validate(JavaPairRDD<Object, BSONObject> mongoRDD,
                     AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
                     GaussianMixtureDetectionModel gaussianMixtureDetectionModel,
                     GaussianMixtureValidationSummary gaussianMixtureValidationSummary) {
    List<AthenaFeatureField> listOfTargetFeatures = athenaMLFeatureConfiguration.getListOfTargetFeatures();
    Map<AthenaFeatureField, Integer> weight = athenaMLFeatureConfiguration.getWeight();
    GaussianMixtureModel gaussianMixtureModel = (GaussianMixtureModel) gaussianMixtureDetectionModel.getDetectionModel();
    int numberOfTargetValue = listOfTargetFeatures.size();
    Normalizer normalizer = new Normalizer();

    mongoRDD.foreach(new VoidFunction<Tuple2<Object, BSONObject>>() {
        public void call(Tuple2<Object, BSONObject> t) throws UnknownHostException {
            long start2 = System.nanoTime(); // <-- start
            BSONObject feature = (BSONObject) t._2().get(AthenaFeatureField.FEATURE);
            BSONObject idx = (BSONObject) t._2();
            Vector normedForVal;

            double[] values = new double[numberOfTargetValue];
            for (int j = 0; j < numberOfTargetValue; j++) {
                values[j] = 0;
                if (feature.containsField(listOfTargetFeatures.get(j).getValue())) {
                    Object obj = feature.get(listOfTargetFeatures.get(j).getValue());
                    if (obj instanceof Long) { values[j] = (Long) obj; }
                    else if (obj instanceof Double) { values[j] = (Double) obj; }
                    else if (obj instanceof Boolean) { values[j] = (Boolean) obj ? 1 : 0; }
                    else { return; }
                    // check weight
                    if (weight.containsKey(listOfTargetFeatures.get(j))) { values[j] *= weight.get(listOfTargetFeatures.get(j)); }
                    // check absolute
                    if (athenaMLFeatureConfiguration.isAbsolute()) { values[j] = Math.abs(values[j]); }
                }
            }

            if (athenaMLFeatureConfiguration.isNormalization()) {
                normedForVal = normalizer.transform(Vectors.dense(values));
            } else {
                normedForVal = Vectors.dense(values);
            }

            int detectIdx = gaussianMixtureModel.predict(normedForVal);
            gaussianMixtureValidationSummary.updateSummary(detectIdx, idx, feature);

            long end2 = System.nanoTime();
            long result2 = end2 - start2;
            gaussianMixtureValidationSummary.addTotalNanoSeconds(result2);
        }
    });
    gaussianMixtureValidationSummary.calculateDetectionRate();
    gaussianMixtureValidationSummary.getAverageNanoSeconds();
    gaussianMixtureValidationSummary.setGaussianMixtureDetectionAlgorithm(
            (GaussianMixtureDetectionAlgorithm) gaussianMixtureDetectionModel.getDetectionAlgorithm());
}
public void validate(JavaPairRDD<Object, BSONObject> mongoRDD,
                     AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
                     KMeansDetectionModel kMeansDetectionModel,
                     KmeansValidationSummary kmeansValidationSummary) {
    List<AthenaFeatureField> listOfTargetFeatures = athenaMLFeatureConfiguration.getListOfTargetFeatures();
    Map<AthenaFeatureField, Integer> weight = athenaMLFeatureConfiguration.getWeight();
    KMeansModel cluster = (KMeansModel) kMeansDetectionModel.getDetectionModel();
    int numberOfTargetValue = listOfTargetFeatures.size();
    Normalizer normalizer = new Normalizer();

    mongoRDD.foreach(new VoidFunction<Tuple2<Object, BSONObject>>() {
        public void call(Tuple2<Object, BSONObject> t) throws UnknownHostException {
            long start2 = System.nanoTime(); // <-- start
            BSONObject feature = (BSONObject) t._2().get(AthenaFeatureField.FEATURE);
            BSONObject idx = (BSONObject) t._2();
            Vector normedForVal;

            double[] values = new double[numberOfTargetValue];
            for (int j = 0; j < numberOfTargetValue; j++) {
                values[j] = 0;
                if (feature.containsField(listOfTargetFeatures.get(j).getValue())) {
                    Object obj = feature.get(listOfTargetFeatures.get(j).getValue());
                    if (obj instanceof Long) { values[j] = (Long) obj; }
                    else if (obj instanceof Double) { values[j] = (Double) obj; }
                    else if (obj instanceof Boolean) { values[j] = (Boolean) obj ? 1 : 0; }
                    else { return; }
                    // check weight
                    if (weight.containsKey(listOfTargetFeatures.get(j))) { values[j] *= weight.get(listOfTargetFeatures.get(j)); }
                    // check absolute
                    if (athenaMLFeatureConfiguration.isAbsolute()) { values[j] = Math.abs(values[j]); }
                }
            }

            if (athenaMLFeatureConfiguration.isNormalization()) {
                normedForVal = normalizer.transform(Vectors.dense(values));
            } else {
                normedForVal = Vectors.dense(values);
            }

            int detectIdx = cluster.predict(normedForVal);
            kmeansValidationSummary.updateSummary(detectIdx, idx, feature);

            long end2 = System.nanoTime();
            long result2 = end2 - start2;
            kmeansValidationSummary.addTotalNanoSeconds(result2);
        }
    });
    kmeansValidationSummary.calculateDetectionRate();
    kmeansValidationSummary.getAverageNanoSeconds();
    kmeansValidationSummary.setkMeansDetectionAlgorithm((KMeansDetectionAlgorithm) kMeansDetectionModel.getDetectionAlgorithm());
}
public void validate(JavaPairRDD<Object, BSONObject> mongoRDD,
                     AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
                     GradientBoostedTreesDetectionModel gradientBoostedTreesDetectionModel,
                     GradientBoostedTreesValidationSummary gradientBoostedTreesValidationSummary) {
    List<AthenaFeatureField> listOfTargetFeatures = athenaMLFeatureConfiguration.getListOfTargetFeatures();
    Map<AthenaFeatureField, Integer> weight = athenaMLFeatureConfiguration.getWeight();
    Marking marking = gradientBoostedTreesDetectionModel.getMarking();
    GradientBoostedTreesModel model = (GradientBoostedTreesModel) gradientBoostedTreesDetectionModel.getDetectionModel();
    Normalizer normalizer = new Normalizer();
    int numberOfTargetValue = listOfTargetFeatures.size();

    mongoRDD.foreach(new VoidFunction<Tuple2<Object, BSONObject>>() {
        public void call(Tuple2<Object, BSONObject> t) throws UnknownHostException {
            long start2 = System.nanoTime(); // <-- start
            BSONObject feature = (BSONObject) t._2().get(AthenaFeatureField.FEATURE);
            BSONObject idx = (BSONObject) t._2();
            int originLabel = marking.checkClassificationMarkingElements(idx, feature);

            double[] values = new double[numberOfTargetValue];
            for (int j = 0; j < numberOfTargetValue; j++) {
                values[j] = 0;
                if (feature.containsField(listOfTargetFeatures.get(j).getValue())) {
                    Object obj = feature.get(listOfTargetFeatures.get(j).getValue());
                    if (obj instanceof Long) { values[j] = (Long) obj; }
                    else if (obj instanceof Double) { values[j] = (Double) obj; }
                    else if (obj instanceof Boolean) { values[j] = (Boolean) obj ? 1 : 0; }
                    else { return; }
                    // check weight
                    if (weight.containsKey(listOfTargetFeatures.get(j))) { values[j] *= weight.get(listOfTargetFeatures.get(j)); }
                    // check absolute
                    if (athenaMLFeatureConfiguration.isAbsolute()) { values[j] = Math.abs(values[j]); }
                }
            }

            Vector normedForVal;
            if (athenaMLFeatureConfiguration.isNormalization()) {
                normedForVal = normalizer.transform(Vectors.dense(values));
            } else {
                normedForVal = Vectors.dense(values);
            }

            LabeledPoint p = new LabeledPoint(originLabel, normedForVal);
            int validatedLabel = (int) model.predict(p.features());
            gradientBoostedTreesValidationSummary.updateSummary(validatedLabel, idx, feature);

            long end2 = System.nanoTime();
            long result2 = end2 - start2;
            gradientBoostedTreesValidationSummary.addTotalNanoSeconds(result2);
        }
    });
    gradientBoostedTreesValidationSummary.getAverageNanoSeconds();
    gradientBoostedTreesValidationSummary.setGradientBoostedTreesDetectionAlgorithm((GradientBoostedTreesDetectionAlgorithm) gradientBoostedTreesDetectionModel.getDetectionAlgorithm());
}
public void validate(JavaPairRDD<Object, BSONObject> mongoRDD,
                     AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
                     RandomForestDetectionModel randomForestDetectionModel,
                     RandomForestValidationSummary randomForestValidationSummary) {
    List<AthenaFeatureField> listOfTargetFeatures = athenaMLFeatureConfiguration.getListOfTargetFeatures();
    Map<AthenaFeatureField, Integer> weight = athenaMLFeatureConfiguration.getWeight();
    Marking marking = randomForestDetectionModel.getMarking();
    RandomForestModel model = (RandomForestModel) randomForestDetectionModel.getDetectionModel();
    Normalizer normalizer = new Normalizer();
    int numberOfTargetValue = listOfTargetFeatures.size();

    mongoRDD.foreach(new VoidFunction<Tuple2<Object, BSONObject>>() {
        public void call(Tuple2<Object, BSONObject> t) throws UnknownHostException {
            long start2 = System.nanoTime(); // <-- start
            BSONObject feature = (BSONObject) t._2().get(AthenaFeatureField.FEATURE);
            BSONObject idx = (BSONObject) t._2();
            int originLabel = marking.checkClassificationMarkingElements(idx, feature);

            double[] values = new double[numberOfTargetValue];
            for (int j = 0; j < numberOfTargetValue; j++) {
                values[j] = 0;
                if (feature.containsField(listOfTargetFeatures.get(j).getValue())) {
                    Object obj = feature.get(listOfTargetFeatures.get(j).getValue());
                    if (obj instanceof Long) { values[j] = (Long) obj; }
                    else if (obj instanceof Double) { values[j] = (Double) obj; }
                    else if (obj instanceof Boolean) { values[j] = (Boolean) obj ? 1 : 0; }
                    else { return; }
                    // check weight
                    if (weight.containsKey(listOfTargetFeatures.get(j))) { values[j] *= weight.get(listOfTargetFeatures.get(j)); }
                    // check absolute
                    if (athenaMLFeatureConfiguration.isAbsolute()) { values[j] = Math.abs(values[j]); }
                }
            }

            Vector normedForVal;
            if (athenaMLFeatureConfiguration.isNormalization()) {
                normedForVal = normalizer.transform(Vectors.dense(values));
            } else {
                normedForVal = Vectors.dense(values);
            }

            LabeledPoint p = new LabeledPoint(originLabel, normedForVal);
            int validatedLabel = (int) model.predict(p.features());
            randomForestValidationSummary.updateSummary(validatedLabel, idx, feature);

            long end2 = System.nanoTime();
            long result2 = end2 - start2;
            randomForestValidationSummary.addTotalNanoSeconds(result2);
        }
    });
    randomForestValidationSummary.getAverageNanoSeconds();
    randomForestValidationSummary.setRandomForestDetectionAlgorithm((RandomForestDetectionAlgorithm) randomForestDetectionModel.getDetectionAlgorithm());
}
public void validate(JavaPairRDD<Object, BSONObject> mongoRDD,
                     AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
                     SVMDetectionModel SVMDetectionModel,
                     SVMValidationSummary SVMValidationSummary) {
    List<AthenaFeatureField> listOfTargetFeatures = athenaMLFeatureConfiguration.getListOfTargetFeatures();
    Map<AthenaFeatureField, Integer> weight = athenaMLFeatureConfiguration.getWeight();
    Marking marking = SVMDetectionModel.getMarking();
    SVMModel model = (SVMModel) SVMDetectionModel.getDetectionModel();
    Normalizer normalizer = new Normalizer();
    int numberOfTargetValue = listOfTargetFeatures.size();

    mongoRDD.foreach(new VoidFunction<Tuple2<Object, BSONObject>>() {
        public void call(Tuple2<Object, BSONObject> t) throws UnknownHostException {
            long start2 = System.nanoTime(); // <-- start
            BSONObject feature = (BSONObject) t._2().get(AthenaFeatureField.FEATURE);
            BSONObject idx = (BSONObject) t._2();
            int originLabel = marking.checkClassificationMarkingElements(idx, feature);

            double[] values = new double[numberOfTargetValue];
            for (int j = 0; j < numberOfTargetValue; j++) {
                values[j] = 0;
                if (feature.containsField(listOfTargetFeatures.get(j).getValue())) {
                    Object obj = feature.get(listOfTargetFeatures.get(j).getValue());
                    if (obj instanceof Long) { values[j] = (Long) obj; }
                    else if (obj instanceof Double) { values[j] = (Double) obj; }
                    else if (obj instanceof Boolean) { values[j] = (Boolean) obj ? 1 : 0; }
                    else { return; }
                    // check weight
                    if (weight.containsKey(listOfTargetFeatures.get(j))) { values[j] *= weight.get(listOfTargetFeatures.get(j)); }
                    // check absolute
                    if (athenaMLFeatureConfiguration.isAbsolute()) { values[j] = Math.abs(values[j]); }
                }
            }

            Vector normedForVal;
            if (athenaMLFeatureConfiguration.isNormalization()) {
                normedForVal = normalizer.transform(Vectors.dense(values));
            } else {
                normedForVal = Vectors.dense(values);
            }

            LabeledPoint p = new LabeledPoint(originLabel, normedForVal);

            // Only SVM!!
            int validatedLabel; // = (int) model.predict(p.features());
            double score = model.predict(p.features());
            if (score > 0) {
                // detection
                validatedLabel = 1;
            } else {
                validatedLabel = 0;
            }
            SVMValidationSummary.updateSummary(validatedLabel, idx, feature);

            long end2 = System.nanoTime();
            long result2 = end2 - start2;
            SVMValidationSummary.addTotalNanoSeconds(result2);
        }
    });
    SVMValidationSummary.getAverageNanoSeconds();
    SVMValidationSummary.setSvmDetectionAlgorithm((SVMDetectionAlgorithm) SVMDetectionModel.getDetectionAlgorithm());
}
public void validate(JavaPairRDD<Object, BSONObject> mongoRDD,
                     AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
                     LogisticRegressionDetectionModel logisticRegressionDetectionModel,
                     LogisticRegressionValidationSummary logisticRegressionValidationSummary) {
    List<AthenaFeatureField> listOfTargetFeatures = athenaMLFeatureConfiguration.getListOfTargetFeatures();
    Map<AthenaFeatureField, Integer> weight = athenaMLFeatureConfiguration.getWeight();
    Marking marking = logisticRegressionDetectionModel.getMarking();
    LogisticRegressionModel model = (LogisticRegressionModel) logisticRegressionDetectionModel.getDetectionModel();
    Normalizer normalizer = new Normalizer();
    int numberOfTargetValue = listOfTargetFeatures.size();

    mongoRDD.foreach(new VoidFunction<Tuple2<Object, BSONObject>>() {
        public void call(Tuple2<Object, BSONObject> t) throws UnknownHostException {
            long start2 = System.nanoTime(); // <-- start
            BSONObject feature = (BSONObject) t._2().get(AthenaFeatureField.FEATURE);
            BSONObject idx = (BSONObject) t._2();
            int originLabel = marking.checkClassificationMarkingElements(idx, feature);

            double[] values = new double[numberOfTargetValue];
            for (int j = 0; j < numberOfTargetValue; j++) {
                values[j] = 0;
                if (feature.containsField(listOfTargetFeatures.get(j).getValue())) {
                    Object obj = feature.get(listOfTargetFeatures.get(j).getValue());
                    if (obj instanceof Long) { values[j] = (Long) obj; }
                    else if (obj instanceof Double) { values[j] = (Double) obj; }
                    else if (obj instanceof Boolean) { values[j] = (Boolean) obj ? 1 : 0; }
                    else { return; }
                    // check weight
                    if (weight.containsKey(listOfTargetFeatures.get(j))) { values[j] *= weight.get(listOfTargetFeatures.get(j)); }
                    // check absolute
                    if (athenaMLFeatureConfiguration.isAbsolute()) { values[j] = Math.abs(values[j]); }
                }
            }

            Vector normedForVal;
            if (athenaMLFeatureConfiguration.isNormalization()) {
                normedForVal = normalizer.transform(Vectors.dense(values));
            } else {
                normedForVal = Vectors.dense(values);
            }

            LabeledPoint p = new LabeledPoint(originLabel, normedForVal);
            int validatedLabel = (int) model.predict(p.features());
            logisticRegressionValidationSummary.updateSummary(validatedLabel, idx, feature);

            long end2 = System.nanoTime();
            long result2 = end2 - start2;
            logisticRegressionValidationSummary.addTotalNanoSeconds(result2);
        }
    });
    logisticRegressionValidationSummary.getAverageNanoSeconds();
    logisticRegressionValidationSummary.setLogisticRegressionDetectionAlgorithm((LogisticRegressionDetectionAlgorithm) logisticRegressionDetectionModel.getDetectionAlgorithm());
}
public void validate(JavaPairRDD<Object, BSONObject> mongoRDD,
                     AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
                     DecisionTreeDetectionModel decisionTreeDetectionModel,
                     DecisionTreeValidationSummary decisionTreeValidationSummary) {
    List<AthenaFeatureField> listOfTargetFeatures = athenaMLFeatureConfiguration.getListOfTargetFeatures();
    Map<AthenaFeatureField, Integer> weight = athenaMLFeatureConfiguration.getWeight();
    Marking marking = decisionTreeDetectionModel.getMarking();
    DecisionTreeModel model = (DecisionTreeModel) decisionTreeDetectionModel.getDetectionModel();
    Normalizer normalizer = new Normalizer();
    int numberOfTargetValue = listOfTargetFeatures.size();

    mongoRDD.foreach(new VoidFunction<Tuple2<Object, BSONObject>>() {
        public void call(Tuple2<Object, BSONObject> t) throws UnknownHostException {
            long start2 = System.nanoTime(); // <-- start
            BSONObject feature = (BSONObject) t._2().get(AthenaFeatureField.FEATURE);
            BSONObject idx = (BSONObject) t._2();
            int originLabel = marking.checkClassificationMarkingElements(idx, feature);

            double[] values = new double[numberOfTargetValue];
            for (int j = 0; j < numberOfTargetValue; j++) {
                values[j] = 0;
                if (feature.containsField(listOfTargetFeatures.get(j).getValue())) {
                    Object obj = feature.get(listOfTargetFeatures.get(j).getValue());
                    if (obj instanceof Long) { values[j] = (Long) obj; }
                    else if (obj instanceof Double) { values[j] = (Double) obj; }
                    else if (obj instanceof Boolean) { values[j] = (Boolean) obj ? 1 : 0; }
                    else { return; }
                    // check weight
                    if (weight.containsKey(listOfTargetFeatures.get(j))) { values[j] *= weight.get(listOfTargetFeatures.get(j)); }
                    // check absolute
                    if (athenaMLFeatureConfiguration.isAbsolute()) { values[j] = Math.abs(values[j]); }
                }
            }

            Vector normedForVal;
            if (athenaMLFeatureConfiguration.isNormalization()) {
                normedForVal = normalizer.transform(Vectors.dense(values));
            } else {
                normedForVal = Vectors.dense(values);
            }

            LabeledPoint p = new LabeledPoint(originLabel, normedForVal);
            int validatedLabel = (int) model.predict(p.features());
            decisionTreeValidationSummary.updateSummary(validatedLabel, idx, feature);

            long end2 = System.nanoTime();
            long result2 = end2 - start2;
            decisionTreeValidationSummary.addTotalNanoSeconds(result2);
        }
    });
    decisionTreeValidationSummary.getAverageNanoSeconds();
    decisionTreeValidationSummary.setDecisionTreeDetectionAlgorithm((DecisionTreeDetectionAlgorithm) decisionTreeDetectionModel.getDetectionAlgorithm());
}
public void validate(JavaPairRDD<Object, BSONObject> mongoRDD,
                     AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
                     NaiveBayesDetectionModel naiveBayesDetectionModel,
                     NaiveBayesValidationSummary naiveBayesValidationSummary) {
    List<AthenaFeatureField> listOfTargetFeatures = athenaMLFeatureConfiguration.getListOfTargetFeatures();
    Map<AthenaFeatureField, Integer> weight = athenaMLFeatureConfiguration.getWeight();
    Marking marking = naiveBayesDetectionModel.getMarking();
    NaiveBayesModel model = (NaiveBayesModel) naiveBayesDetectionModel.getDetectionModel();
    Normalizer normalizer = new Normalizer();
    int numberOfTargetValue = listOfTargetFeatures.size();

    mongoRDD.foreach(new VoidFunction<Tuple2<Object, BSONObject>>() {
        public void call(Tuple2<Object, BSONObject> t) throws UnknownHostException {
            long start2 = System.nanoTime(); // <-- start
            BSONObject feature = (BSONObject) t._2().get(AthenaFeatureField.FEATURE);
            BSONObject idx = (BSONObject) t._2();
            int originLabel = marking.checkClassificationMarkingElements(idx, feature);

            double[] values = new double[numberOfTargetValue];
            for (int j = 0; j < numberOfTargetValue; j++) {
                values[j] = 0;
                if (feature.containsField(listOfTargetFeatures.get(j).getValue())) {
                    Object obj = feature.get(listOfTargetFeatures.get(j).getValue());
                    if (obj instanceof Long) { values[j] = (Long) obj; }
                    else if (obj instanceof Double) { values[j] = (Double) obj; }
                    else if (obj instanceof Boolean) { values[j] = (Boolean) obj ? 1 : 0; }
                    else { return; }
                    // check weight
                    if (weight.containsKey(listOfTargetFeatures.get(j))) { values[j] *= weight.get(listOfTargetFeatures.get(j)); }
                    // check absolute
                    if (athenaMLFeatureConfiguration.isAbsolute()) { values[j] = Math.abs(values[j]); }
                }
            }

            Vector normedForVal;
            if (athenaMLFeatureConfiguration.isNormalization()) {
                normedForVal = normalizer.transform(Vectors.dense(values));
            } else {
                normedForVal = Vectors.dense(values);
            }

            LabeledPoint p = new LabeledPoint(originLabel, normedForVal);
            int validatedLabel = (int) model.predict(p.features());
            naiveBayesValidationSummary.updateSummary(validatedLabel, idx, feature);

            long end2 = System.nanoTime();
            long result2 = end2 - start2;
            naiveBayesValidationSummary.addTotalNanoSeconds(result2);
        }
    });
    naiveBayesValidationSummary.getAverageNanoSeconds();
    naiveBayesValidationSummary.setNaiveBayesDetectionAlgorithm((NaiveBayesDetectionAlgorithm) naiveBayesDetectionModel.getDetectionAlgorithm());
}
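The validate() methods above repeat the same feature-vector construction; a sketch of how that block could be factored into a shared helper follows. The method name and placement are hypothetical, not from the original code, and it only reuses types already present in these snippets:

// Hypothetical helper; returns null when a non-numeric field is encountered, mirroring the
// early "return" in the original anonymous VoidFunctions.
public static Vector buildFeatureVector(BSONObject feature, List<AthenaFeatureField> targets,
                                        Map<AthenaFeatureField, Integer> weight,
                                        AthenaMLFeatureConfiguration config, Normalizer normalizer) {
    double[] values = new double[targets.size()];
    for (int j = 0; j < targets.size(); j++) {
        values[j] = 0;
        if (feature.containsField(targets.get(j).getValue())) {
            Object obj = feature.get(targets.get(j).getValue());
            if (obj instanceof Long) { values[j] = (Long) obj; }
            else if (obj instanceof Double) { values[j] = (Double) obj; }
            else if (obj instanceof Boolean) { values[j] = (Boolean) obj ? 1 : 0; }
            else { return null; }
            if (weight.containsKey(targets.get(j))) { values[j] *= weight.get(targets.get(j)); }
            if (config.isAbsolute()) { values[j] = Math.abs(values[j]); }
        }
    }
    return config.isNormalization() ? normalizer.transform(Vectors.dense(values)) : Vectors.dense(values);
}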
/**
 * Run the Envelope pipeline as a Spark Streaming job.
 * @param steps The full configuration of the Envelope pipeline
 */
@SuppressWarnings("unchecked")
private static void runStreaming(final Set<Step> steps) throws Exception {
    final Set<Step> independentNonStreamingSteps = StepUtils.getIndependentNonStreamingSteps(steps);
    runBatch(independentNonStreamingSteps);

    Set<StreamingStep> streamingSteps = StepUtils.getStreamingSteps(steps);
    for (final StreamingStep streamingStep : streamingSteps) {
        LOG.debug("Setting up streaming step: " + streamingStep.getName());

        @SuppressWarnings("rawtypes")
        JavaDStream stream = streamingStep.getStream();

        final StructType streamSchema = streamingStep.getSchema();
        LOG.debug("Stream schema: " + streamSchema);

        stream.foreachRDD(new VoidFunction<JavaRDD<?>>() {
            @Override
            public void call(JavaRDD<?> raw) throws Exception {
                // Some independent steps might be repeating steps that have been flagged for reload
                StepUtils.resetRepeatingSteps(steps);
                // This will run any batch steps (and dependents) that are not submitted
                runBatch(independentNonStreamingSteps);

                streamingStep.stageProgress(raw);

                JavaRDD<Row> translated = streamingStep.translate(raw);
                Dataset<Row> batchDF = Contexts.getSparkSession().createDataFrame(translated, streamSchema);
                streamingStep.setData(batchDF);
                streamingStep.setSubmitted(true);

                Set<Step> allDependentSteps = StepUtils.getAllDependentSteps(streamingStep, steps);
                runBatch(allDependentSteps);

                StepUtils.resetDataSteps(allDependentSteps);

                streamingStep.recordProgress();
            }
        });

        LOG.debug("Finished setting up streaming step: " + streamingStep.getName());
    }

    JavaStreamingContext jsc = Contexts.getJavaStreamingContext();
    jsc.start();
    LOG.debug("Streaming context started");
    jsc.awaitTermination();
    LOG.debug("Streaming context terminated");
}
private void start() {
    // Create a local StreamingContext with two working threads and a batch interval of 5 seconds
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Streaming Ingestion File System Text File to Dataframe");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

    JavaDStream<String> msgDataStream = jssc.textFileStream(StreamingUtils.getInputDirectory());
    msgDataStream.print();

    // Create JavaRDD<Row>
    msgDataStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
        private static final long serialVersionUID = -590010339928376829L;

        @Override
        public void call(JavaRDD<String> rdd) {
            JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
                private static final long serialVersionUID = 5167089361335095997L;

                @Override
                public Row call(String msg) {
                    Row row = RowFactory.create(msg);
                    return row;
                }
            });

            // Create Schema
            StructType schema = DataTypes.createStructType(
                    new StructField[] { DataTypes.createStructField("Message", DataTypes.StringType, true) });

            // Get Spark 2.0 session
            SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
            Dataset<Row> msgDataFrame = spark.createDataFrame(rowRDD, schema);
            msgDataFrame.show();
        }
    });

    jssc.start();
    try {
        jssc.awaitTermination();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
/** * A method that will publish all the records of the provided Spark RDD into NATS * @param rdd, the RDD to publish to NATS */ @SuppressWarnings({ "unchecked", "rawtypes" }) public <V> void publishToNats(final JavaRDD<V> rdd) { ((JavaRDD) rdd).foreach((VoidFunction<V> & Serializable) obj -> publishToNats(encodeData(obj))); }
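An illustrative call of the RDD-based publisher above; "connector" and "sc" are assumed, already-built objects and the record values are placeholders:

// Publish every element of a small RDD to NATS.
JavaRDD<String> records = sc.parallelize(Arrays.asList("msg-1", "msg-2", "msg-3"));
connector.publishToNats(records);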