Apache Spark核心编程


Spark Core是整个项目的基础。它提供分布式任务调度,调度和基本I / O功能。Spark使用称为RDD(弹性分布式数据集)的专用基础数据结构,它是跨计算机分区的逻辑数据集合。RDD可以通过两种方式创建; 一种是通过引用外部存储系统中的数据集,另一种是通过在现有的RDD上应用转换(例如映射,过滤器,缩减器,连接)。

RDD抽象是通过一个语言集成的API公开的。这简化了编程复杂性,因为应用程序操作RDD的方式与操作本地数据集合类似。

Spark Shell

Spark提供了一个交互式外壳 - 一种交互式分析数据的强大工具。它以Scala或Python语言提供。Spark的主要抽象是分布式的项目集合,称为弹性分布式数据集(RDD)。可以从Hadoop输入格式(例如HDFS文件)或通过转换其他RDD创建RDD。

打开Spark Shell

以下命令用于打开Spark shell。

$ spark-shell

创建简单的RDD

让我们从文本文件中创建一个简单的RDD。使用以下命令创建一个简单的RDD。

scala> val inputfile = sc.textFile(“input.txt”)

上述命令的输出是

inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12

Spark RDD API引入了一些 转换 和少量 操作 来操作RDD。

RDD转换

RDD转换返回指向新RDD的指针,并允许您创建RDD之间的依赖关系。依赖链(依赖项字符串)中的每个RDD都有一个用于计算其数据的函数,并且具有指向其父RDD的指针(依赖项)。

Spark是懒惰的,所以除非你调用一些会触发作业创建和执行的转换或动作,否则什么也不会执行。看下面的单词计数示例片段。

因此,RDD转换不是一组数据,而是程序中的一个步骤(可能是唯一的步骤),告诉Spark如何获取数据以及如何处理数据。

以下给出了RDD转换的列表。

S.No 转化与意义
1 map(func)
返回一个新的分布式数据集,通过传递函数 **func中的** 每个元素来形成。
2 filter(func)
返回通过选择 **func** 返回true 的源的那些元素形成的新数据集。
3 flatMap(func)
与map类似,但每个输入项可以映射到0个或更多的输出项(所以 _func_ 应该返回一个Seq而不是单个项)。
4 mapPartitions(func)
与map类似,但是在RDD的每个分区(块)上分别运行,所以 **func** 在类型T的RDD上运行时必须是Iterator ⇒Iterator 类型。
5 mapPartitionsWithIndex(func)
类似于映射分区,而且还提供 **FUNC** 与表示所述分区的索引的整数值,所以 **FUNC** 必须是类型(中间体,迭代器)的⇒迭代上型T的RDD运行时
6 sample(withReplacement, fraction, seed)
使用给定的随机数生成器种子对 **部分** 数据进行采样,包括或不包括替换。
7 union(otherDataset)
返回包含源数据集中的元素和参数的联合的新数据集。
8 intersection(otherDataset)
返回一个新的RDD,其中包含源数据集中的元素与参数的交集。
9 distinct([numTasks])
返回包含源数据集的不同元素的新数据集。
10 groupByKey([numTasks])
当调用(K,V)对的数据集时,返回(K,Iterable )对的数据集。 **注** \- 如果您正在进行分组以执行每个密钥的聚合(例如和或平均值),则使用reduceByKey或aggregateByKey将会产生更好的性能。
11 reduceByKey(func, [numTasks])
在(K,V)对的数据集上调用时,返回(K,V)对的数据集,其中每个键的值使用给定的reduce函数 _func_ 进行聚合,该函数必须是(V,V)⇒V 。和groupByKey一样,reduce任务的数量可以通过可选的第二个参数来配置。
12 aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
当调用(K,V)对的数据集时,返回(K,U)对的数据集,其中每个键的值使用给定的组合函数和中性“零”值进行聚合。允许与输入值类型不同的聚合值类型,同时避免不必要的分配。和groupByKey一样,reduce任务的数量可以通过可选的第二个参数来配置。
13 sortByKey([ascending], [numTasks])
当调用K实现Ordered的(K,V)对的数据集时,按照布尔上升参数的指定,按照升序或降序顺序返回按键排序的(K,V)对数据集。
14 join(otherDataset, [numTasks])
在类型(K,V)和(K,W)的数据集上调用时,返回包含每个键的所有元素对的(K,(V,W))对的数据集。外连接通过leftOuterJoin,rightOuterJoin和fullOuterJoin支持。
15 cogroup(otherDataset, [numTasks])
在类型(K,V)和(K,W)的数据集上调用时,返回(K,(Iterable ,Iterable ))元组的数据集。这个操作也叫做Group With。
16 cartesian(otherDataset)
当调用类型T和U的数据集时,返回(T,U)对(所有元素对)的数据集。
17 pipe(command, [envVars])
通过shell命令管理RDD的每个分区,例如Perl或bash脚本。RDD元素被写入进程的stdin,输出到stdout的行作为字符串的RDD返回。
18 coalesce(numPartitions)
减少RDD中的分区数量为numPartitions。用于过滤大型数据集后更高效地运行操作。
19 repartition(numPartitions)
随机调整RDD中的数据以创建更多或更少的分区并在其间进行平衡。这总是通过网络混洗所有数据。
20 repartitionAndSortWithinPartitions(partitioner)
根据给定的分区程序对RDD进行重新分区,并在每个生成的分区内按键对记录进行排序。这比调用重新分区,然后在每个分区内进行排序更有效率,因为它可以将排序压入洗牌机器。

操作

下表给出了一个Actions列表,它返回值。

S.No 行动与意义
1 reduce(func)
使用函数 **func** (它接受两个参数并返回一个)来聚合数据集的元素。该函数应该是可交换和关联的,以便可以并行地正确计算它。
2 collect()
在驱动程序中将数据集的所有元素作为数组返回。在过滤器或其他操作返回足够小的数据子集之后,这通常很有用。
3 count()
返回数据集中元素的数量。
4 first()
返回数据集的第一个元素(类似于take(1))。
5 take(n)
返回包含数据集前 **n个** 元素的数组。
6 takeSample (withReplacement,num, [seed])
返回一个数组,其中包含数据集的 **num** 元素的随机样本,有或没有替换,可以预先指定一个随机数生成器种子。
7 takeOrdered(n, [ordering])
使用自然顺序或自定义比较器返回RDD 的前 **n个** 元素。
8 saveAsTextFile(path)
将数据集的元素作为文本文件(或文本文件集)写入本地文件系统,HDFS或任何其他Hadoop支持的文件系统中的给定目录中。Spark在每个元素上调用toString将其转换为文件中的一行文本。
9 saveAsSequenceFile(path) (Java and Scala)
将数据集的元素作为Hadoop SequenceFile写入本地文件系统,HDFS或任何其他Hadoop支持的文件系统中的给定路径。这在实现Hadoop的Writable接口的键值对的RDD上可用。在Scala中,它也可用于可隐式转换为Writable的类型(Spark包含Int,Double,String等基本类型的转换)。
10 saveAsObjectFile(path) (Java and Scala)
使用Java序列化以简单格式写入数据集的元素,然后可以使用SparkContext.objectFile()加载该序列化。
11 countByKey()
仅适用于类型(K,V)的RDD。用(K,Int)对的hashmap返回每个键的计数。
12 foreach(func)
在数据集的每个元素上运行函数 **func** 。这通常是为了副作用而完成的,例如更新累加器或与外部存储系统交互。 **注** \- 修改foreach()之外的累加器以外的变量可能会导致未定义的行为。 有关更多详细信息,请参阅了解闭包

用RDD编程

借助示例,让我们看看RDD编程中几个RDD转换和操作的实现。

考虑一个字数统计的例子 - 统计出现在文档中的每个单词。考虑将下列文本作为输入,并将其作为 input.txt 文件保存在主目录中。

input.txt - 输入文件。

people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful  as they love,
as they care as they share.

按照下面给出的步骤执行给定的例子。

打开Spark-Shell

以下命令用于打开Spark外壳。一般来说,Spark是使用Scala构建的。因此,Spark程序在Scala环境中运行。

$ spark-shell

如果Spark shell成功打开,你会发现下面的输出。查看输出的最后一行“Spark context available as sc”意味着Spark容器将自动创建名为 sc的 spark上下文对象 。在开始程序的第一步之前,应该创建SparkContext对象。

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0
      /_/  

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc scala>

创建一个RDD

首先,我们必须使用Spark-Scala API读取输入文件并创建一个RDD。

以下命令用于从给定位置读取文件。这里,使用inputfile的名称创建新的RDD。在textFile(“”)方法中作为参数给出的字符串是输入文件名的绝对路径。但是,如果只给出文件名,则表示输入文件位于当前位置。

scala> val inputfile = sc.textFile("input.txt")

执行字数转换

我们的目标是统计文件中的单词。创建一个平面地图,将每条线分成单词( flatMap(line⇒line.split(“”) )。

接下来,使用地图功能( 地图(单词⇒(单词,1) ))将每个单词作为值为 '1' (<键,值> = <单词,1>)的键读取。 **

最后,通过添加相似键的值( reduceByKey( + )来减少这些键。

以下命令用于执行字数逻辑。执行此操作后,您不会找到任何输出,因为这不是一个操作,这是一个转换; 指出一个新的RDD或者告诉spark如何处理给定的数据)

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

目前的RDD

在使用RDD时,如果您想了解当前的RDD,请使用以下命令。它将向您显示关于当前RDD及其调试依赖关系的描述。

scala> counts.toDebugString

缓存转换

您可以使用其上的persist()或cache()方法将RDD标记为持久化。第一次在动作中计算时,它将保存在节点的内存中。使用以下命令将中间转换存储在内存中。

scala> counts.cache()

应用该操作

应用一个动作,例如存储所有转换,结果到一个文本文件中。saveAsTextFile(“”)方法的String参数是输出文件夹的绝对路径。尝试使用以下命令将输出保存在文本文件中。在以下示例中,“输出”文件夹位于当前位置。

scala> counts.saveAsTextFile("output")

检查输出

打开另一个终端进入主目录(在另一个终端中执行spark)。使用以下命令检查输出目录。

[hadoop@localhost ~]$ cd output/
[hadoop@localhost output]$ ls -1

part-00000
part-00001
_SUCCESS

以下命令用于查看 Part-00000 文件的输出。

[hadoop@localhost output]$ cat part-00000

输出

(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)

以下命令用于查看 Part-00001 文件的输出。

[hadoop@localhost output]$ cat part-00001

输出

(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)

联合国坚持存储

在UN-Persistence之前,如果您想查看用于此应用程序的存储空间,请在浏览器中使用以下URL。

http://localhost:4040

您将看到以下屏幕,其中显示了在Spark外壳上运行的应用程序使用的存储空间。

储存空间

如果要联合保留特定RDD的存储空间,请使用以下命令。

Scala> counts.unpersist()

你会看到输出如下 -

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14

要验证浏览器中的存储空间,请使用以下URL。

http://localhost:4040/

您将看到以下屏幕。它显示了应用程序使用的存储空间,这些存储空间在Spark shell上运行。

存储空间的应用程序