小编典典

通过Python使用Spark准备我的大数据

algorithm

我的100m大小,量化数据:

(1424411938', [3885, 7898])
(3333333333', [3885, 7898])

所需结果:

(3885, [3333333333, 1424411938])
(7898, [3333333333, 1424411938])

所以我想要的是转换数据,以便我将3885(例如)与所有data[0]具有该数据的组组合在一起。这是我在python中所做的:

def prepare(data):
    result = []
    for point_id, cluster in data:
        for index, c in enumerate(cluster):
            found = 0
            for res in result:
                if c == res[0]:
                    found = 1
            if(found == 0):
                result.append((c, []))
            for res in result:
                if c == res[0]:
                    res[1].append(point_id)
    return result

但是当我mapPartitions()“编dataRDD带prepare(),它似乎做我想做的只是在当前分区,从而恢复比期望的更大的成绩。

例如,如果开始时的第一个记录在第一个分区中,第二个在第二个分区中,那么我得到的结果是:

(3885, [3333333333])
(7898, [3333333333])
(3885, [1424411938])
(7898, [1424411938])

如何修改我prepare()以获得预期的效果?或者,如何处理prepare()产生的结果,以便我可以获得所需的结果?


正如您可能已经从代码中注意到的那样,我根本不在乎速度。

这是一种创建数据的方法:

data = []
from random import randint
for i in xrange(0, 10):
    data.append((randint(0, 100000000), (randint(0, 16000), randint(0, 16000))))
data = sc.parallelize(data)

阅读 274

收藏
2020-07-28

共1个答案

小编典典

您可以使用一些基本的pyspark转换来实现此目的。

>>> rdd = sc.parallelize([(1424411938, [3885, 7898]),(3333333333, [3885, 7898])])
>>> r = rdd.flatMap(lambda x: ((a,x[0]) for a in x[1]))

我们曾经flatMap为其中的每个项目设置了一个键,值对,x[1]并将数据行格式更改为(a, x[0])a这是中的每个项目x[1]。为了flatMap更好地理解您可以查阅文档。

>>> r2 = r.groupByKey().map(lambda x: (x[0],tuple(x[1])))

我们只是将所有键,值对按其键分组,并使用元组函数将可迭代的值转换为元组。

>>> r2.collect()
[(3885, (1424411938, 3333333333)), (7898, (1424411938, 3333333333))]

如您所说,您可以使用[:150]来拥有前150个元素,我想这是正确的用法:

r2 = r.groupByKey().map(lambda x: (x[0],tuple(x[1])[:150]))

我试图尽可能地解释。我希望这有帮助。

2020-07-28