小编典典

dask分布式数据帧上的慢len函数

python

我一直在测试如何使用dask(具有20个内核的集群),而我对调用len函数与通过loc进行切片的速度感到惊讶。

import dask.dataframe as dd
from dask.distributed import Client
client = Client('192.168.1.220:8786')

log = pd.read_csv('800000test', sep='\t')
logd = dd.from_pandas(log,npartitions=20)

#This is the code than runs slowly 
#(2.9 seconds whilst I would expect no more than a few hundred millisencods)

print(len(logd))

#Instead this code is actually running almost 20 times faster than pandas
logd.loc[:'Host'].count().compute()

任何想法为什么会发生这种情况?len的运行速度对我来说并不重要,但是我觉得由于不了解这种行为,所以我对图书馆一无所知。

在此处输入图片说明

所有绿色框均与“ from_pandas”相对应,而在本文中,Matthew
Rocklin(http://matthewrocklin.com/blog/work/2017/01/12/dask-
dataframes)调用图看起来更好(被称为len_chunk速度更快,通话似乎没有被锁定,而是等待一个工作人员完成其任务,然后再启动另一个工作)

在此处输入图片说明


阅读 225

收藏
2021-01-20

共1个答案

小编典典

很好的问题,这是关于何时将数据移至群集并向下移至客户端(您的python会话)的几点。让我们看一下计算的几个阶段

用熊猫加载数据

这是python会话中的Pandas数据框,因此显然仍在本地进程中。

log = pd.read_csv('800000test', sep='\t')  # on client

转换为惰性Dask.dataframe

这会将您的Pandas数据帧分解为20个Pandas数据帧,但是这些仍在客户端上。Dask数据帧不会急于将数据发送到群集。

logd = dd.from_pandas(log,npartitions=20)  # still on client

计算len

调用len实际上会在此处引起计算(通常您会使用df.some_aggregation().compute()。因此,Dask开始了。首先将数据移出群集(慢速),然后在20个分区的所有分区上调用len(快速),将其聚合(快速),然后然后将结果下移到您的客户端,以便可以打印。

print(len(logd))  # costly roundtrip client -> cluster -> client

分析

所以这里的问题是我们的dask.dataframe仍然在本地python会话中拥有所有数据。

例如,使用本地线程调度程序比使用分布式调度程序要快得多。这应该以毫秒为单位计算

with dask.set_options(get=dask.threaded.get):  # no cluster, just local threads
    print(len(logd))  # stays on client

但是想必您想知道如何扩展到更大的数据集,因此以正确的方式进行操作。

将数据加载到工作人员上

不要让Dask工作者在您的客户端/本地会话中加载熊猫,而是加载csv文件的位。这样,无需与客户-工人进行沟通。

# log = pd.read_csv('800000test', sep='\t')  # on client
log = dd.read_csv('800000test', sep='\t')    # on cluster workers

但是,与不同pd.read_csv,它dd.read_csv是懒惰的,因此这几乎应该立即返回。我们可以强制Dask使用persist方法实际执行计算

log = client.persist(log)  # triggers computation asynchronously

现在,集群开始起作用并将您的数据直接加载到工作进程中。这是相对较快的。请注意,在后台进行工作时,此方法会立即返回。如果您要等到完成,请致电wait

from dask.distributed import wait
wait(log)  # blocks until read is done

如果您正在使用较小的数据集进行测试并希望获得更多分区,请尝试更改块大小。

log = dd.read_csv(..., blocksize=1000000)  # 1 MB blocks

无论如何,log现在应该可以快速进行操作

len(log)  # fast

编辑

为了回答此博客文章上的问题,以下是我们对文件所在位置的假设。

通常,当您提供文件名时dd.read_csv,假定该文件对所有工作人员均可见。如果您使用的是网络文件系统或S3或HDFS之类的全局存储,则为true。如果您使用的是网络文件系统,则将要使用绝对路径(例如/path/to/myfile.*.csv),或者确保您的工作人员和客户端具有相同的工作目录。

如果不是这种情况,并且您的数据仅在您的客户端计算机上,那么您将不得不加载并分散数据。

简单但次优

简单的方法是只做您最初的工作,但是保留dask.dataframe

log = pd.read_csv('800000test', sep='\t')  # on client
logd = dd.from_pandas(log,npartitions=20)  # still on client
logd = client.persist(logd)  # moves to workers

很好,但是导致交流效果不理想。

复杂但最优

相反,您可能会将数据明确分散到群集中

[future] = client.scatter([log])

不过,这会进入更复杂的API,因此我只将您指向docs

http://distributed.readthedocs.io/en/latest/manage-computation.html

http://distributed.readthedocs.io/en/latest/memory.html

http://dask.pydata.org/en/latest/ delay-
collections.html

2021-01-20