小编典典

如何使用Spark处理一系列HBase行?

java

我正在尝试使用HBase作为Spark的数据源。因此,第一步证明是从HBase表创建RDD。由于Spark使用hadoop输入格式,因此我可以通过创建rdd
http://www.vidyasource.com/blog/Programming/Scala/Java/Data/Hadoop/Analytics/2014/01/25找到使用所有行的方法/ lighting-a-spark-with-
hbase
但是我们如何为范围扫描创建RDD?

欢迎所有建议。


阅读 377

收藏
2020-12-03

共1个答案

小编典典

这是在Spark中使用扫描的示例:

import java.io.{DataOutputStream, ByteArrayOutputStream}
import java.lang.String
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Base64

def convertScanToString(scan: Scan): String = {
  val out: ByteArrayOutputStream = new ByteArrayOutputStream
  val dos: DataOutputStream = new DataOutputStream(out)
  scan.write(dos)
  Base64.encodeBytes(out.toByteArray)
}

val conf = HBaseConfiguration.create()
val scan = new Scan()
scan.setCaching(500)
scan.setCacheBlocks(false)
conf.set(TableInputFormat.INPUT_TABLE, "table_name")
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
rdd.count

您需要将相关的库添加到Spark类路径,并确保它们与您的Spark兼容。提示:您可以hbase classpath用来查找它们。

2020-12-03