/**
 * Returns the contents of {@code pcollection} as windowed values, materializing
 * them from the backing RDD on first access and caching the result for reuse.
 */
Iterable<WindowedValue<T>> getValues(PCollection<T> pcollection) {
  if (windowedValues == null) {
    WindowFn<?, ?> windowFn = pcollection.getWindowingStrategy().getWindowFn();
    Coder<? extends BoundedWindow> windowCoder = windowFn.windowCoder();
    // In the global window the window metadata carries no information, so a
    // value-only coder suffices; otherwise the full windowed-value coder is required.
    final WindowedValue.WindowedValueCoder<T> wvCoder;
    if (windowFn instanceof GlobalWindows) {
      wvCoder = WindowedValue.ValueOnlyWindowedValueCoder.of(pcollection.getCoder());
    } else {
      wvCoder = WindowedValue.FullWindowedValueCoder.of(pcollection.getCoder(), windowCoder);
    }
    // Serialize on the cluster, collect the byte arrays, decode lazily on the client.
    List<byte[]> collected = rdd.map(CoderHelpers.toByteFunction(wvCoder)).collect();
    windowedValues = Iterables.transform(collected, new Function<byte[], WindowedValue<T>>() {
      @Override
      public WindowedValue<T> apply(byte[] encoded) {
        return CoderHelpers.fromByteArray(encoded, wvCoder);
      }
    });
  }
  return windowedValues;
}
/**
 * Invokes the annotated callback method reflectively, passing the RDD followed by
 * the (type-converted) payloads.
 *
 * @param rdd the RDD the callback operates on; always the first argument
 * @param payloads optional extra arguments forwarded to the callback
 * @return whatever the callback method returns
 * @throws RuntimeException wrapping any reflective access or invocation failure
 */
@Override
public Object onRdd(JavaRDDLike rdd, Object... payloads) {
    try {
        List<Object> arguments = new ArrayList<>(payloads.length + 1);
        arguments.add(rdd);
        arguments.addAll(asList(payloads));
        // Drop a single null payload placeholder. Guard against empty payloads:
        // the previous unguarded get(1) threw IndexOutOfBoundsException when the
        // callback was invoked with no payloads at all.
        if (arguments.size() > 1 && arguments.get(1) == null) {
            arguments.remove(1);
        }
        Method callbackMethod = rddCallbacks.get(0);
        callbackMethod.setAccessible(true);
        if (camelContext != null) {
            // Convert each payload to the parameter type the callback declares.
            for (int i = 1; i < arguments.size(); i++) {
                arguments.set(i, camelContext.getTypeConverter().convertTo(callbackMethod.getParameterTypes()[i], arguments.get(i)));
            }
        }
        return callbackMethod.invoke(objectWithCallback, arguments.toArray(new Object[arguments.size()]));
    } catch (IllegalAccessException | InvocationTargetException e) {
        throw new RuntimeException(e);
    }
}
/**
 * Registers the fixtures the Spark component tests look up by name: a text-file
 * RDD, the (optional) Hive context plus a JSON-backed DataFrame, and a
 * line-counting RDD callback.
 */
@Override
protected JndiRegistry createRegistry() throws Exception {
    JndiRegistry registry = super.createRegistry();
    // Plain text RDD used by most of the tests.
    registry.bind("testFileRdd", sparkContext.textFile("src/test/resources/testrdd.txt"));
    if (shouldRunHive) {
        registry.bind("hiveContext", hiveContext);
        DataFrame cars = hiveContext.read().json("src/test/resources/cars.json");
        cars.registerTempTable("cars");
        registry.bind("jsonCars", cars);
    }
    // Callback that simply counts the lines of whatever RDD it is handed.
    registry.bind("countLinesTransformation", new org.apache.camel.component.spark.RddCallback() {
        @Override
        public Object onRdd(JavaRDDLike rdd, Object... payloads) {
            return rdd.count();
        }
    });
    return registry;
}
@Test public void shouldExecuteVoidCallback() throws IOException { // Given final File output = File.createTempFile("camel", "spark"); output.delete(); // When template.sendBodyAndHeader(sparkUri, null, SPARK_RDD_CALLBACK_HEADER, new VoidRddCallback() { @Override public void doOnRdd(JavaRDDLike rdd, Object... payloads) { rdd.saveAsTextFile(output.getAbsolutePath()); } }); // Then Truth.assertThat(output.length()).isGreaterThan(0L); }
/**
 * Evaluator for {@code GroupByKeyOnly}: shuffles the input by key using Spark's
 * {@code groupByKey}, round-tripping keys and values through their byte-array
 * encodings so they can cross the network regardless of Java serializability.
 */
private static <K, V> TransformEvaluator<GroupByKey.GroupByKeyOnly<K, V>> gbk() {
    return new TransformEvaluator<GroupByKey.GroupByKeyOnly<K, V>>() {
        @Override
        public void evaluate(GroupByKey.GroupByKeyOnly<K, V> transform, EvaluationContext context) {
            @SuppressWarnings("unchecked")
            JavaRDDLike<WindowedValue<KV<K, V>>, ?> inRDD =
                (JavaRDDLike<WindowedValue<KV<K, V>>, ?>) context.getInputRDD(transform);
            // The input coder is known to be a KvCoder for a GroupByKey input.
            @SuppressWarnings("unchecked")
            KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
            Coder<K> keyCoder = coder.getKeyCoder();
            Coder<V> valueCoder = coder.getValueCoder();

            // Use coders to convert objects in the PCollection to byte arrays, so they
            // can be transferred over the network for the shuffle. The pipeline is:
            // unwindow -> encode to bytes -> groupByKey -> decode -> rewindow.
            JavaRDDLike<WindowedValue<KV<K, Iterable<V>>>, ?> outRDD = fromPair(
                  toPair(inRDD.map(WindowingHelpers.<KV<K, V>>unwindowFunction()))
                  .mapToPair(CoderHelpers.toByteFunction(keyCoder, valueCoder))
                  .groupByKey()
                  .mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, valueCoder)))
                  // empty windows are OK here, see GroupByKey#evaluateHelper in the SDK
                  .map(WindowingHelpers.<KV<K, Iterable<V>>>windowFunction());
            context.setOutputRDD(transform, outRDD);
        }
    };
}
/**
 * Evaluator for {@code TextIO.Write}: unwraps the windowed values and writes them
 * through the templated Hadoop text output format with the transform's sharding.
 */
private static <T> TransformEvaluator<TextIO.Write.Bound<T>> writeText() {
    return new TransformEvaluator<TextIO.Write.Bound<T>>() {
        @Override
        public void evaluate(TextIO.Write.Bound<T> transform, EvaluationContext context) {
            // Pair each element with a null value, as required by the Hadoop output format.
            PairFunction<T, T, Void> withNullValue = new PairFunction<T, T, Void>() {
                @Override
                public Tuple2<T, Void> call(T element) throws Exception {
                    return new Tuple2<>(element, null);
                }
            };
            @SuppressWarnings("unchecked")
            JavaPairRDD<T, Void> keyed =
                ((JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform))
                    .map(WindowingHelpers.<T>unwindowFunction())
                    .mapToPair(withNullValue);
            ShardTemplateInformation shardInfo = new ShardTemplateInformation(
                transform.getNumShards(), transform.getShardTemplate(),
                transform.getFilenamePrefix(), transform.getFilenameSuffix());
            writeHadoopFile(keyed, new Configuration(), shardInfo,
                Text.class, NullWritable.class, TemplatedTextOutputFormat.class);
        }
    };
}
/**
 * Evaluator for {@code Window.Bound}: assigns windows to each element via an
 * {@code AssignWindowsDoFn}, except for the global window where the input can be
 * passed through untouched.
 */
private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
    return new TransformEvaluator<Window.Bound<T>>() {
        @Override
        public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
            @SuppressWarnings("unchecked")
            JavaRDDLike<WindowedValue<T>, ?> input =
                (JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform);
            WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform);
            // Global windowing needs no re-assignment: reuse the input RDD as-is.
            if (windowFn instanceof GlobalWindows) {
                context.setOutputRDD(transform, input);
                return;
            }
            @SuppressWarnings("unchecked")
            DoFn<T, T> assignWindows = new AssignWindowsDoFn<>(windowFn);
            DoFnFunction<T, T> windowAssigner =
                new DoFnFunction<>(assignWindows, context.getRuntimeContext(), null);
            context.setOutputRDD(transform, input.mapPartitions(windowAssigner));
        }
    };
}
JavaRDDLike<WindowedValue<T>, ?> getRDD() { if (rdd == null) { Iterable<WindowedValue<T>> windowedValues = Iterables.transform(values, new Function<T, WindowedValue<T>>() { @Override public WindowedValue<T> apply(T t) { // TODO: this is wrong if T is a TimestampedValue return WindowedValue.valueInEmptyWindows(t); } }); WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder = WindowedValue.getValueOnlyCoder(coder); rdd = jsc.parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder)) .map(CoderHelpers.fromByteFunction(windowCoder)); } return rdd; }
/**
 * Resolves the materialized value backing a {@link PValue}.
 *
 * <p>PObjects are returned directly; a PCollection is collected (it must contain
 * exactly one element) and the result is cached as a PObject for later lookups.
 *
 * @param value the PValue to resolve
 * @return the single value backing {@code value}
 * @throws IllegalStateException if the value is known neither as a PObject nor a
 *         PCollection
 */
@Override
public <T> T get(PValue value) {
    if (pobjects.containsKey(value)) {
        @SuppressWarnings("unchecked")
        T result = (T) pobjects.get(value);
        return result;
    }
    if (pcollections.containsKey(value)) {
        JavaRDDLike<?, ?> rdd = pcollections.get(value).getRDD();
        @SuppressWarnings("unchecked")
        T res = (T) Iterables.getOnlyElement(rdd.collect());
        // Cache so subsequent lookups skip the collect().
        pobjects.put(value, res);
        return res;
    }
    // Message typo fixed: "un-known" -> "unknown".
    throw new IllegalStateException("Cannot resolve unknown PObject: " + value);
}
/**
 * Builds a foreach-RDD function that stops the streaming context after
 * {@code amount} consecutive empty batches.
 *
 * @param jsc the streaming context to stop
 * @param amount number of consecutive empty batches that triggers the stop
 * @param printf optional format string applied to each batch's element count;
 *               {@code null} disables printing
 * @return a function suitable for registering on a DStream
 */
public static <A extends JavaRDDLike<?, ?>> VoidFunction<A> create(JavaStreamingContext jsc, long amount, String printf) {
    final LongAccumulator stopAcc = jsc.ssc().sc().longAccumulator();
    return rdd -> {
        // count() triggers a full Spark job — compute it once per batch instead of
        // twice as before.
        final long count = rdd.count();
        if (printf != null) {
            System.out.printf(printf, count);
        }
        if (count == 0L) {
            stopAcc.add(1L);
            if (stopAcc.value() >= amount) {
                jsc.stop();
            }
        } else {
            // A non-empty batch resets the consecutive-empty counter.
            stopAcc.reset();
        }
    };
}
/**
 * Validates the payload count, converts each payload to its declared type, and
 * delegates to {@link #doOnRdd}.
 *
 * @param rdd the RDD handed to the callback
 * @param payloads payloads to convert; must match {@code payloadsTypes} in length
 * @return the result of {@code doOnRdd}
 * @throws IllegalArgumentException if the number of payloads does not match the
 *         number of declared payload types
 */
@Override
public T onRdd(JavaRDDLike rdd, Object... payloads) {
    if (payloads.length != payloadsTypes.length) {
        String message = format("Received %d payloads, but expected %d.", payloads.length, payloadsTypes.length);
        throw new IllegalArgumentException(message);
    }
    // Convert into a fresh array instead of mutating the caller-supplied varargs
    // array in place (avoids surprising side effects on the caller's array).
    Object[] converted = new Object[payloads.length];
    for (int i = 0; i < payloads.length; i++) {
        converted[i] = camelContext.getTypeConverter().convertTo(payloadsTypes[i], payloads[i]);
    }
    return doOnRdd(rdd, converted);
}
/**
 * Producer entry point: resolves the RDD and the callback for this exchange,
 * invokes the callback with the message body as payload(s), and stores the
 * result back on the exchange.
 */
@Override
public void process(Exchange exchange) throws Exception {
    JavaRDDLike rdd = resolveRdd(exchange);
    RddCallback rddCallback = resolveRddCallback(exchange);
    Object body = exchange.getIn().getBody();
    // A List body is unpacked into individual payloads; anything else is passed
    // through as a single payload.
    Object result;
    if (body instanceof List) {
        result = rddCallback.onRdd(rdd, ((List) body).toArray(new Object[0]));
    } else {
        result = rddCallback.onRdd(rdd, body);
    }
    collectResults(exchange, result);
}
/**
 * Resolves the RDD for this exchange: a header-supplied RDD wins over the
 * endpoint's configured RDD.
 *
 * @param exchange the exchange possibly carrying an RDD header
 * @return the RDD to operate on
 * @throws IllegalStateException if neither the header nor the endpoint defines one
 */
protected JavaRDDLike resolveRdd(Exchange exchange) {
    // Fetch the header once instead of twice.
    Object headerRdd = exchange.getIn().getHeader(SPARK_RDD_HEADER);
    if (headerRdd != null) {
        // Cast to JavaRDDLike (not JavaRDD): a JavaPairRDD supplied via the header
        // previously caused a ClassCastException even though the method's return
        // type accepts it.
        return (JavaRDDLike) headerRdd;
    } else if (getEndpoint().getRdd() != null) {
        return getEndpoint().getRdd();
    } else {
        throw new IllegalStateException("No RDD defined.");
    }
}
/**
 * Verifies that a plain callback runs against the endpoint's RDD and its result
 * is returned as the exchange body.
 */
@Test
public void shouldExecuteRddCallback() {
    // Callback that counts the lines of the RDD bound to the endpoint.
    org.apache.camel.component.spark.RddCallback countCallback =
        new org.apache.camel.component.spark.RddCallback() {
            @Override
            public Long onRdd(JavaRDDLike rdd, Object... payloads) {
                return rdd.count();
            }
        };
    long count = template.requestBodyAndHeader(
        sparkUri, null, SPARK_RDD_CALLBACK_HEADER, countCallback, Long.class);
    Truth.assertThat(count).isEqualTo(numberOfLinesInTestFile);
}
/**
 * Verifies that a single (non-List) message body is handed to the callback as the
 * one and only payload.
 */
@Test
public void shouldExecuteRddCallbackWithSinglePayload() {
    org.apache.camel.component.spark.RddCallback multiplyCallback =
        new org.apache.camel.component.spark.RddCallback() {
            @Override
            public Long onRdd(JavaRDDLike rdd, Object... payloads) {
                // Scale the line count by the payload.
                return rdd.count() * (int) payloads[0];
            }
        };
    long scaledCount = template.requestBodyAndHeader(
        sparkUri, 10, SPARK_RDD_CALLBACK_HEADER, multiplyCallback, Long.class);
    Truth.assertThat(scaledCount).isEqualTo(numberOfLinesInTestFile * 10);
}
/**
 * Verifies that a List body is unpacked into multiple payloads for the callback.
 */
@Test
public void shouldExecuteRddCallbackWithPayloads() {
    org.apache.camel.component.spark.RddCallback multiplyCallback =
        new org.apache.camel.component.spark.RddCallback() {
            @Override
            public Long onRdd(JavaRDDLike rdd, Object... payloads) {
                // Scale the line count by both payloads.
                return rdd.count() * (int) payloads[0] * (int) payloads[1];
            }
        };
    long scaledCount = template.requestBodyAndHeader(
        sparkUri, asList(10, 10), SPARK_RDD_CALLBACK_HEADER, multiplyCallback, Long.class);
    Truth.assertThat(scaledCount).isEqualTo(numberOfLinesInTestFile * 10 * 10);
}
/**
 * Verifies that a ConvertingRddCallback converts String payloads to the declared
 * int types before the callback body runs.
 */
@Test
public void shouldExecuteRddCallbackWithTypedPayloads() {
    ConvertingRddCallback callback =
        new ConvertingRddCallback<Long>(context, int.class, int.class) {
            @Override
            public Long doOnRdd(JavaRDDLike rdd, Object... payloads) {
                // Payloads arrive already converted from "10"/"10" to ints.
                return rdd.count() * (int) payloads[0] * (int) payloads[1];
            }
        };
    long scaledCount = template.requestBodyAndHeader(
        sparkUri, asList("10", "10"), SPARK_RDD_CALLBACK_HEADER, callback, Long.class);
    Truth.assertThat(scaledCount).isEqualTo(1900);
}
/**
 * Reads the {@code JavaRDDLike} stored in the first cell of the given table.
 *
 * @param table <code>BufferedDataTable</code> whose first cell holds an RDD
 * @return the <code>JavaRDDLike</code> saved in <code>table</code>
 * @throws ClassCastException if <code>table</code> doesn't contain a JavaRDDLike
 *         object
 */
@SuppressWarnings("rawtypes")
public static JavaRDDLike getRDD(BufferedDataTable table) {
    DataCell cell = table.iterator().next().getCell(0);
    if (cell.getType() == RddCell.TYPE) {
        return ((RddCell) cell).getRDDValue();
    }
    if (cell.getType() == PairRddCell.TYPE) {
        return ((PairRddCell) cell).getPairRDDValue();
    }
    throw new ClassCastException("table contains non JavaRDDLike object");
}
/** * {@inheritDoc} */ @SuppressWarnings({ "rawtypes", "unchecked" }) @Override protected BufferedDataTable[] execute(final BufferedDataTable[] inData, final ExecutionContext exec) throws Exception { JavaRDDLike rdd; BufferedDataTable[] out; // create and save an if (TableCellUtils.isPairRDD(inData[0])) { if (TableCellUtils.isPairRDD(inData[1])) { rdd = ((JavaPairRDD) TableCellUtils.getRDD(inData[0])) .union((JavaPairRDD) TableCellUtils.getRDD(inData[1])); out = new BufferedDataTable[] { TableCellUtils.setRDD(exec, rdd, true) }; } else { throw new IllegalArgumentException("RDD's must be of same type"); } } else { if (TableCellUtils.isPairRDD(inData[1])) { throw new IllegalArgumentException("RDD's must be of same type"); } else { rdd = ((JavaRDD) TableCellUtils.getRDD(inData[0])) .union((JavaRDD) TableCellUtils.getRDD(inData[1])); out = new BufferedDataTable[] { TableCellUtils.setRDD(exec, rdd, false) }; } } // update viewer rddViewer = new RddViewer(out[0], exec); return out; }
/** * {@inheritDoc} */ @SuppressWarnings({ "rawtypes", "unchecked" }) @Override protected BufferedDataTable[] execute(final BufferedDataTable[] inData, final ExecutionContext exec) throws Exception { JavaRDDLike rdd; BufferedDataTable[] out; // create and save an intersection if (TableCellUtils.isPairRDD(inData[0])) { if (TableCellUtils.isPairRDD(inData[1])) { rdd = ((JavaPairRDD) TableCellUtils.getRDD(inData[0])) .intersection((JavaPairRDD) TableCellUtils .getRDD(inData[1])); out = new BufferedDataTable[] { TableCellUtils.setRDD(exec, rdd, true) }; } else { throw new IllegalArgumentException("RDD's must be of same type"); } } else { if (TableCellUtils.isPairRDD(inData[1])) { throw new IllegalArgumentException("RDD's must be of same type"); } else { rdd = ((JavaRDD) TableCellUtils.getRDD(inData[0])) .intersection((JavaRDD) TableCellUtils .getRDD(inData[1])); out = new BufferedDataTable[] { TableCellUtils.setRDD(exec, rdd, false) }; } } // update viewer rddViewer = new RddViewer(out[0], exec); return out; }
/** * {@inheritDoc} */ @SuppressWarnings("rawtypes") @Override protected BufferedDataTable[] execute(final BufferedDataTable[] inData, final ExecutionContext exec) throws Exception { JavaRDDLike rdd; BufferedDataTable[] out; // create and save sample if (TableCellUtils.isPairRDD(inData[0])) { rdd = ((JavaPairRDD) TableCellUtils.getRDD(inData[0])).sample( m_replacement.getBooleanValue(), m_fraction.getIntValue() / 100.0, m_seed.getIntValue()); out = new BufferedDataTable[] { TableCellUtils.setRDD(exec, rdd, true) }; } else { rdd = ((JavaRDD) TableCellUtils.getRDD(inData[0])).sample( m_replacement.getBooleanValue(), m_fraction.getIntValue() / 100.0, m_seed.getIntValue()); out = new BufferedDataTable[] { TableCellUtils.setRDD(exec, rdd, false) }; } // update viewer rddViewer = new RddViewer(out[0], exec); return out; }
/**
 * Evaluator for {@code Combine.GroupedValues}: applies the keyed combine function
 * to each grouped value in the input.
 */
private static <K, VI, VO> TransformEvaluator<Combine.GroupedValues<K, VI, VO>> grouped() {
    return new TransformEvaluator<Combine.GroupedValues<K, VI, VO>>() {
        @Override
        public void evaluate(Combine.GroupedValues<K, VI, VO> transform, EvaluationContext context) {
            // Extract the keyed combine function via the field-getter helper.
            Combine.KeyedCombineFn<K, VI, ?, VO> combineFn = GROUPED_FG.get("fn", transform);
            @SuppressWarnings("unchecked")
            JavaRDDLike<WindowedValue<KV<K, Iterable<VI>>>, ?> input =
                (JavaRDDLike<WindowedValue<KV<K, Iterable<VI>>>, ?>) context.getInputRDD(transform);
            context.setOutputRDD(transform, input.map(new KVFunction<>(combineFn)));
        }
    };
}
/** Converts an RDD of Beam {@code KV} elements into a Spark pair RDD. */
private static <K, V> JavaPairRDD<K, V> toPair(JavaRDDLike<KV<K, V>, ?> rdd) {
    return rdd.mapToPair(new PairFunction<KV<K, V>, K, V>() {
        @Override
        public Tuple2<K, V> call(KV<K, V> pair) {
            return new Tuple2<>(pair.getKey(), pair.getValue());
        }
    });
}
/** Converts a Spark pair RDD back into an RDD of Beam {@code KV} elements. */
private static <K, V> JavaRDDLike<KV<K, V>, ?> fromPair(JavaPairRDD<K, V> rdd) {
    return rdd.map(new Function<Tuple2<K, V>, KV<K, V>>() {
        @Override
        public KV<K, V> call(Tuple2<K, V> tuple) {
            return KV.of(tuple._1(), tuple._2());
        }
    });
}
/**
 * Evaluator for a single-output {@code ParDo}: wraps the user's DoFn (with its
 * runtime context and side inputs) and applies it per partition.
 */
private static <I, O> TransformEvaluator<ParDo.Bound<I, O>> parDo() {
    return new TransformEvaluator<ParDo.Bound<I, O>>() {
        @Override
        public void evaluate(ParDo.Bound<I, O> transform, EvaluationContext context) {
            @SuppressWarnings("unchecked")
            JavaRDDLike<WindowedValue<I>, ?> input =
                (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform);
            DoFnFunction<I, O> doFnWrapper = new DoFnFunction<>(
                transform.getFn(),
                context.getRuntimeContext(),
                getSideInputs(transform.getSideInputs(), context));
            context.setOutputRDD(transform, input.mapPartitions(doFnWrapper));
        }
    };
}
/**
 * Evaluator for a multi-output {@code ParDo}: runs the DoFn once per partition,
 * tagging every emitted element with its output's TupleTag, then filters the
 * tagged stream once per declared output to produce each output PCollection's RDD.
 */
private static <I, O> TransformEvaluator<ParDo.BoundMulti<I, O>> multiDo() {
    return new TransformEvaluator<ParDo.BoundMulti<I, O>>() {
        @Override
        public void evaluate(ParDo.BoundMulti<I, O> transform, EvaluationContext context) {
            TupleTag<O> mainOutputTag = MULTIDO_FG.get("mainOutputTag", transform);
            MultiDoFnFunction<I, O> multifn = new MultiDoFnFunction<>(
                transform.getFn(),
                context.getRuntimeContext(),
                mainOutputTag,
                getSideInputs(transform.getSideInputs(), context));
            @SuppressWarnings("unchecked")
            JavaRDDLike<WindowedValue<I>, ?> inRDD =
                (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform);
            // Cache the tagged stream: it is filtered once per output below, and
            // without caching the DoFn would re-run for every output.
            JavaPairRDD<TupleTag<?>, WindowedValue<?>> all = inRDD
                .mapPartitionsToPair(multifn)
                .cache();
            PCollectionTuple pct = context.getOutput(transform);
            for (Map.Entry<TupleTag<?>, PCollection<?>> e : pct.getAll().entrySet()) {
                @SuppressWarnings("unchecked")
                JavaPairRDD<TupleTag<?>, WindowedValue<?>> filtered =
                    all.filter(new TupleTagFilter(e.getKey()));
                @SuppressWarnings("unchecked")
                // Object is the best we can do since different outputs can have different tags
                JavaRDD<WindowedValue<Object>> values =
                    (JavaRDD<WindowedValue<Object>>) (JavaRDD<?>) filtered.values();
                context.setRDD(e.getValue(), values);
            }
        }
    };
}
/**
 * Evaluator for {@code AvroIO.Write}: configures the Avro output schema on a
 * Hadoop job, wraps each element as an AvroKey, and writes through the templated
 * Avro output format with the transform's sharding.
 */
private static <T> TransformEvaluator<AvroIO.Write.Bound<T>> writeAvro() {
    return new TransformEvaluator<AvroIO.Write.Bound<T>>() {
        @Override
        public void evaluate(AvroIO.Write.Bound<T> transform, EvaluationContext context) {
            Job job;
            try {
                job = Job.getInstance();
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
            AvroJob.setOutputKeySchema(job, transform.getSchema());
            // Pair each element (as an AvroKey) with a null writable, as required
            // by the Avro key output format.
            PairFunction<T, AvroKey<T>, NullWritable> toAvroKey =
                new PairFunction<T, AvroKey<T>, NullWritable>() {
                    @Override
                    public Tuple2<AvroKey<T>, NullWritable> call(T element) throws Exception {
                        return new Tuple2<>(new AvroKey<>(element), NullWritable.get());
                    }
                };
            @SuppressWarnings("unchecked")
            JavaPairRDD<AvroKey<T>, NullWritable> keyed =
                ((JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform))
                    .map(WindowingHelpers.<T>unwindowFunction())
                    .mapToPair(toAvroKey);
            ShardTemplateInformation shardInfo = new ShardTemplateInformation(
                transform.getNumShards(), transform.getShardTemplate(),
                transform.getFilenamePrefix(), transform.getFilenameSuffix());
            writeHadoopFile(keyed, job.getConfiguration(), shardInfo,
                AvroKey.class, NullWritable.class, TemplatedAvroKeyOutputFormat.class);
        }
    };
}
/**
 * Evaluator for {@code HadoopIO.Write}: converts KVs to Hadoop key/value pairs,
 * copies the transform's configuration properties into a fresh Configuration,
 * and writes through the transform's output format with its sharding.
 */
private static <K, V> TransformEvaluator<HadoopIO.Write.Bound<K, V>> writeHadoop() {
    return new TransformEvaluator<HadoopIO.Write.Bound<K, V>>() {
        @Override
        public void evaluate(HadoopIO.Write.Bound<K, V> transform, EvaluationContext context) {
            // Split each KV into the Tuple2 form the Hadoop output format expects.
            PairFunction<KV<K, V>, K, V> toTuple = new PairFunction<KV<K, V>, K, V>() {
                @Override
                public Tuple2<K, V> call(KV<K, V> kv) throws Exception {
                    return new Tuple2<>(kv.getKey(), kv.getValue());
                }
            };
            @SuppressWarnings("unchecked")
            JavaPairRDD<K, V> keyed =
                ((JavaRDDLike<WindowedValue<KV<K, V>>, ?>) context.getInputRDD(transform))
                    .map(WindowingHelpers.<KV<K, V>>unwindowFunction())
                    .mapToPair(toTuple);
            ShardTemplateInformation shardInfo = new ShardTemplateInformation(
                transform.getNumShards(), transform.getShardTemplate(),
                transform.getFilenamePrefix(), transform.getFilenameSuffix());
            Configuration conf = new Configuration();
            for (Map.Entry<String, String> property : transform.getConfigurationProperties().entrySet()) {
                conf.set(property.getKey(), property.getValue());
            }
            writeHadoopFile(keyed, conf, shardInfo,
                transform.getKeyClass(), transform.getValueClass(), transform.getFormatClass());
        }
    };
}
/**
 * Returns the contents of {@code pcollection} as plain values, stripping the
 * window wrappers, materializing from the RDD on first call, and caching the
 * result (and the coder) for reuse.
 */
Iterable<T> getValues(PCollection<T> pcollection) {
    if (values == null) {
        coder = pcollection.getCoder();
        // Unwindow and serialize on the cluster, then collect the bytes.
        List<byte[]> collected = rdd.map(WindowingHelpers.<T>unwindowFunction())
            .map(CoderHelpers.toByteFunction(coder))
            .collect();
        // Decode lazily on the client.
        values = Iterables.transform(collected, new Function<byte[], T>() {
            @Override
            public T apply(byte[] encoded) {
                return CoderHelpers.fromByteArray(encoded, coder);
            }
        });
    }
    return values;
}
/**
 * Returns the RDD backing {@code pvalue}, tracking repeated reads.
 *
 * <p>The first read only records the PValue in {@code multireads}; from the
 * second read onward the RDD is cached so Spark does not recompute it for each
 * consumer. The holder is also removed from {@code leafRdds} — presumably
 * because a consumed RDD no longer needs a forced evaluation (NOTE(review):
 * confirm against computeOutputs).
 */
protected JavaRDDLike<?, ?> getRDD(PValue pvalue) {
    RDDHolder<?> rddHolder = pcollections.get(pvalue);
    JavaRDDLike<?, ?> rdd = rddHolder.getRDD();
    leafRdds.remove(rddHolder);
    if (multireads.contains(pvalue)) {
        // Ensure the RDD is marked as cached (second or later read)
        rdd.rdd().cache();
    } else {
        // First read: just remember that this PValue has been consumed once.
        multireads.add(pvalue);
    }
    return rdd;
}
protected <T> void setRDD(PValue pvalue, JavaRDDLike<WindowedValue<T>, ?> rdd) { try { rdd.rdd().setName(pvalue.getName()); } catch (IllegalStateException e) { // name not set, ignore } RDDHolder<T> rddHolder = new RDDHolder<>(rdd); pcollections.put(pvalue, rddHolder); leafRdds.add(rddHolder); }
/** * Computes the outputs for all RDDs that are leaves in the DAG and do not have any * actions (like saving to a file) registered on them (i.e. they are performed for side * effects). */ protected void computeOutputs() { for (RDDHolder<?> rddHolder : leafRdds) { JavaRDDLike<?, ?> rdd = rddHolder.getRDD(); rdd.rdd().cache(); // cache so that any subsequent get() is cheap rdd.count(); // force the RDD to be computed } }
/**
 * Counts the elements of the training data set, recording timing statistics when
 * stats collection is enabled.
 *
 * @param trainingData the RDD whose elements are counted
 * @return the total number of elements in {@code trainingData}
 */
protected <T, Repr extends JavaRDDLike<T, Repr>> long getTotalDataSetObjectCount(
        JavaRDDLike<T, Repr> trainingData) {
    // Braces added to the single-statement ifs (previous brace-less form).
    if (collectTrainingStats) {
        stats.logCountStart();
    }
    // count() triggers a Spark job over the whole data set.
    long totalDataSetObjectCount = trainingData.count();
    if (collectTrainingStats) {
        stats.logCountEnd();
    }
    return totalDataSetObjectCount;
}
/**
 * Adapts the void {@link #doOnRdd} callback to the {@code onRdd} contract by
 * delegating and always returning {@code null}.
 */
@Override
public Void onRdd(JavaRDDLike rdd, Object... payloads) {
    doOnRdd(rdd, payloads);
    return null;
}
/**
 * Returns the RDD to compute against.
 */
public JavaRDDLike getRdd() {
    return rdd;
}
/**
 * RDD to compute against.
 *
 * @param rdd the RDD to compute against
 */
public void setRdd(JavaRDDLike rdd) {
    this.rdd = rdd;
}