我一直在测试如何使用dask(具有20个内核的集群),而我对调用len函数与通过loc进行切片的速度感到惊讶。
import dask.dataframe as dd from dask.distributed import Client client = Client('192.168.1.220:8786') log = pd.read_csv('800000test', sep='\t') logd = dd.from_pandas(log,npartitions=20) #This is the code than runs slowly #(2.9 seconds whilst I would expect no more than a few hundred millisencods) print(len(logd)) #Instead this code is actually running almost 20 times faster than pandas logd.loc[:'Host'].count().compute()
任何想法为什么会发生这种情况?len的运行速度对我来说并不重要,但是我觉得由于不了解这种行为,所以我对图书馆一无所知。
所有绿色框均与“ from_pandas”相对应,而在本文中,Matthew Rocklin(http://matthewrocklin.com/blog/work/2017/01/12/dask- dataframes)调用图看起来更好(被称为len_chunk速度更快,通话似乎没有被锁定,而是等待一个工作人员完成其任务,然后再启动另一个工作)
很好的问题,这是关于何时将数据移至群集并向下移至客户端(您的python会话)的几点。让我们看一下计算的几个阶段
这是python会话中的Pandas数据框,因此显然仍在本地进程中。
log = pd.read_csv('800000test', sep='\t') # on client
这会将您的Pandas数据帧分解为20个Pandas数据帧,但是这些仍在客户端上。Dask数据帧不会急于将数据发送到群集。
logd = dd.from_pandas(log,npartitions=20) # still on client
调用len实际上会在此处引起计算(通常您会使用df.some_aggregation().compute()。因此,Dask开始了。首先将数据移出群集(慢速),然后在20个分区的所有分区上调用len(快速),将其聚合(快速),然后然后将结果下移到您的客户端,以便可以打印。
len
df.some_aggregation().compute()
print(len(logd)) # costly roundtrip client -> cluster -> client
所以这里的问题是我们的dask.dataframe仍然在本地python会话中拥有所有数据。
例如,使用本地线程调度程序比使用分布式调度程序要快得多。这应该以毫秒为单位计算
with dask.set_options(get=dask.threaded.get): # no cluster, just local threads print(len(logd)) # stays on client
但是想必您想知道如何扩展到更大的数据集,因此以正确的方式进行操作。
不要让Dask工作者在您的客户端/本地会话中加载熊猫,而是加载csv文件的位。这样,无需与客户-工人进行沟通。
# log = pd.read_csv('800000test', sep='\t') # on client log = dd.read_csv('800000test', sep='\t') # on cluster workers
但是,与不同pd.read_csv,它dd.read_csv是懒惰的,因此这几乎应该立即返回。我们可以强制Dask使用persist方法实际执行计算
pd.read_csv
dd.read_csv
log = client.persist(log) # triggers computation asynchronously
现在,集群开始起作用并将您的数据直接加载到工作进程中。这是相对较快的。请注意,在后台进行工作时,此方法会立即返回。如果您要等到完成,请致电wait。
wait
from dask.distributed import wait wait(log) # blocks until read is done
如果您正在使用较小的数据集进行测试并希望获得更多分区,请尝试更改块大小。
log = dd.read_csv(..., blocksize=1000000) # 1 MB blocks
无论如何,log现在应该可以快速进行操作
log
len(log) # fast
为了回答此博客文章上的问题,以下是我们对文件所在位置的假设。
通常,当您提供文件名时dd.read_csv,假定该文件对所有工作人员均可见。如果您使用的是网络文件系统或S3或HDFS之类的全局存储,则为true。如果您使用的是网络文件系统,则将要使用绝对路径(例如/path/to/myfile.*.csv),或者确保您的工作人员和客户端具有相同的工作目录。
/path/to/myfile.*.csv
如果不是这种情况,并且您的数据仅在您的客户端计算机上,那么您将不得不加载并分散数据。
简单的方法是只做您最初的工作,但是保留dask.dataframe
log = pd.read_csv('800000test', sep='\t') # on client logd = dd.from_pandas(log,npartitions=20) # still on client logd = client.persist(logd) # moves to workers
很好,但是导致交流效果不理想。
相反,您可能会将数据明确分散到群集中
[future] = client.scatter([log])
不过,这会进入更复杂的API,因此我只将您指向docs
http://distributed.readthedocs.io/en/latest/manage-computation.html http://distributed.readthedocs.io/en/latest/memory.html http://dask.pydata.org/en/latest/ delay- collections.html