PySpark RDD PySpark SparkContext PySpark广播与累积器 现在我们已经在我们的系统上安装并配置了PySpark,我们可以在Apache Spark上用Python编程。但在此之前,让我们了解Spark - RDD中的一个基本概念。 RDD代表 Resilient Distributed Dataset,它们是在多个节点上运行和操作以在集群上进行并行处理的元素。RDD是不可变元素,这意味着一旦创建了RDD,就无法对其进行更改。RDD也具有容错能力,因此在发生任何故障时,它们会自动恢复。您可以在这些RDD上应用多个操作来完成某项任务。 要对这些RDD进行操作,有两种方法 Transformation Action 让我们详细了解这两种方式。 转换 - 这些操作应用于RDD以创建新的RDD。 Filter,groupBy和map是转换的例子。 操作 - 这些是应用于RDD的操作,它指示Spark执行计算并将结果发送回驱动程序。 要在PySpark中应用任何操作,我们首先需要创建一个 PySpark RDD 。以下代码块具有PySpark RDD类的详细信息 class pyspark.RDD ( jrdd, ctx, jrdd_deserializer = AutoBatchedSerializer(PickleSerializer()) ) 让我们看看如何使用PySpark运行一些基本操作。Python文件中的以下代码创建RDD单词,其中存储了一组提到的单词。 words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) 我们现在将对单词进行一些操作。 count() 返回RDD中的元素数。 ----------------------------------------count.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "count app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) counts = words.count() print "Number of elements in RDD -> %i" % (counts) ----------------------------------------count.py--------------------------------------- 命令 - count()的命令是 $SPARK_HOME/bin/spark-submit count.py 输出 - 上述命令的输出是 Number of elements in RDD → 8 搜集() 返回RDD中的所有元素。 ----------------------------------------collect.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Collect app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) coll = words.collect() print "Elements in RDD -> %s" % (coll) ----------------------------------------collect.py--------------------------------------- 命令 - collect()的命令是 $SPARK_HOME/bin/spark-submit collect.py 输出 - 上述命令的输出是 Elements in RDD -> [ 'scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark' ] foreach(F) 仅返回满足foreach内函数条件的元素。在下面的示例中,我们在foreach中调用print函数,它打印RDD中的所有元素。 ----------------------------------------foreach.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "ForEach app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) def f(x): print(x) fore = words.foreach(f) ----------------------------------------foreach.py--------------------------------------- 命令 - foreach(f)的命令是 $SPARK_HOME/bin/spark-submit foreach.py 输出 - 上述命令的输出是 scala java hadoop spark akka spark vs hadoop pyspark pyspark and spark filter(f) 返回一个包含元素的新RDD,它满足过滤器内部的功能。在下面的示例中,我们过滤掉包含''spark'的字符串。 ----------------------------------------filter.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Filter app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) words_filter = words.filter(lambda x: 'spark' in x) filtered = words_filter.collect() print "Fitered RDD -> %s" % (filtered) ----------------------------------------filter.py---------------------------------------- 命令 - 过滤器(f)的命令是 $SPARK_HOME/bin/spark-submit filter.py 输出 - 上述命令的输出是 Fitered RDD -> [ 'spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark' ] map(f,preservesPartitioning = False) 通过将函数应用于RDD中的每个元素来返回新的RDD。在下面的示例中,我们形成一个键值对,并将每个字符串映射为值1。 ----------------------------------------map.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Map app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) words_map = words.map(lambda x: (x, 1)) mapping = words_map.collect() print "Key value pair -> %s" % (mapping) ----------------------------------------map.py--------------------------------------- 命令 - map命令(f,preservesPartitioning = False)是 $SPARK_HOME/bin/spark-submit map.py 输出 - 上述命令的输出是 Key value pair -> [ ('scala', 1), ('java', 1), ('hadoop', 1), ('spark', 1), ('akka', 1), ('spark vs hadoop', 1), ('pyspark', 1), ('pyspark and spark', 1) ] reduce(F) 执行指定的可交换和关联二进制操作后,将返回RDD中的元素。在下面的示例中,我们从运算符导入add包并将其应用于'num'以执行简单的加法运算。 ----------------------------------------reduce.py--------------------------------------- from pyspark import SparkContext from operator import add sc = SparkContext("local", "Reduce app") nums = sc.parallelize([1, 2, 3, 4, 5]) adding = nums.reduce(add) print "Adding all the elements -> %i" % (adding) ----------------------------------------reduce.py--------------------------------------- 命令 - reduce(f)的命令是 $SPARK_HOME/bin/spark-submit reduce.py 输出 - 上述命令的输出是 Adding all the elements -> 15 join(other,numPartitions = None) 它返回RDD,其中包含一对带有匹配键的元素以及该特定键的所有值。在以下示例中,两个不同的RDD中有两对元素。在连接这两个RDD之后,我们得到一个RDD,其元素具有匹配的键及其值。 ----------------------------------------join.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Join app") x = sc.parallelize([("spark", 1), ("hadoop", 4)]) y = sc.parallelize([("spark", 2), ("hadoop", 5)]) joined = x.join(y) final = joined.collect() print "Join RDD -> %s" % (final) ----------------------------------------join.py--------------------------------------- 命令 - 连接命令(其他,numPartitions =无)是 $SPARK_HOME/bin/spark-submit join.py 输出 - 上述命令的输出是 Join RDD -> [ ('spark', (1, 2)), ('hadoop', (4, 5)) ] cache() 使用默认存储级别(MEMORY_ONLY)保留此RDD。您还可以检查RDD是否被缓存。 ----------------------------------------cache.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Cache app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) words.cache() caching = words.persist().is_cached print "Words got chached > %s" % (caching) ----------------------------------------cache.py--------------------------------------- 命令 - cache()的命令是 $SPARK_HOME/bin/spark-submit cache.py 输出 - 上述程序的输出是 Words got cached -> True 这些是在PySpark RDD上完成的一些最重要的操作。 PySpark SparkContext PySpark广播与累积器