编写一个计算时间序列的最大跌幅的函数非常容易。花一点O(n)时间而不是O(n^2)时间来编写它。但这还不错。这将起作用:
O(n)
O(n^2)
import numpy as np import pandas as pd import matplotlib.pyplot as plt def max_dd(ser): max2here = pd.expanding_max(ser) dd2here = ser - max2here return dd2here.min()
让我们设置一个简短的系列进行尝试:
np.random.seed(0) n = 100 s = pd.Series(np.random.randn(n).cumsum()) s.plot() plt.show()
如预期的那样,max_dd(s)结束时显示出大约-17.6。好极了 现在说我对计算该系列的滚动延期感兴趣。即对于每个步骤,我想从指定长度的前面的子系列中计算最大跌幅。使用,这很容易做到pd.rolling_apply。它的工作原理如下:
max_dd(s)
pd.rolling_apply
rolling_dd = pd.rolling_apply(s, 10, max_dd, min_periods=0) df = pd.concat([s, rolling_dd], axis=1) df.columns = ['s', 'rol_dd_10'] df.plot()
这很完美。但是感觉很慢。大熊猫或其他工具包中是否有特别灵活的算法来快速完成此任务?我拍摄了一些定制的照片:它跟踪各种中间数据(观察到的最大值的位置,先前发现的亏损的位置),以减少大量的冗余计算。它确实节省了一些时间,但并没有节省很多,并且几乎没有应有的可能。
我认为这是由于Python / Numpy / Pandas中所有循环开销所致。但是我目前对Cython的流利程度还不足以真正知道如何从那个角度开始对此进行攻击。我希望有人曾经尝试过。或者,也许有人希望看一下我的“手工”代码,并愿意帮助我将其转换为Cython。
编辑:对于想查看此处提到的所有功能的人(以及其他一些人!),请访问以下网址查看iPython笔记本:http ://nbviewer.ipython.org/gist/8one6/8506455
它显示了解决此问题的某些方法之间的关系,检查它们是否给出了相同的结果,并显示了它们在各种大小的数据上的运行时间。
如果有人感兴趣,我在帖子中提到的“定制”算法是rolling_dd_custom。我认为,如果在Cython中实施,那将是一个非常快速的解决方案。
rolling_dd_custom
这是滚动最大跌幅函数的小巧版本。 windowed_view是单行函数的包装,用于numpy.lib.stride_tricks.as_strided使1d数组的内存有效2d窗口视图(下面的完整代码)。有了该窗口化视图后,计算基本上与相同max_dd,但是是为numpy数组编写的,并沿第二个轴(即axis=1)应用。
windowed_view
numpy.lib.stride_tricks.as_strided
max_dd
axis=1
def rolling_max_dd(x, window_size, min_periods=1): """Compute the rolling maximum drawdown of `x`. `x` must be a 1d numpy array. `min_periods` should satisfy `1 <= min_periods <= window_size`. Returns an 1d array with length `len(x) - min_periods + 1`. """ if min_periods < window_size: pad = np.empty(window_size - min_periods) pad.fill(x[0]) x = np.concatenate((pad, x)) y = windowed_view(x, window_size) running_max_y = np.maximum.accumulate(y, axis=1) dd = y - running_max_y return dd.min(axis=1)
这是演示该功能的完整脚本:
import numpy as np from numpy.lib.stride_tricks import as_strided import pandas as pd import matplotlib.pyplot as plt def windowed_view(x, window_size): """Creat a 2d windowed view of a 1d array. `x` must be a 1d numpy array. `numpy.lib.stride_tricks.as_strided` is used to create the view. The data is not copied. Example: >>> x = np.array([1, 2, 3, 4, 5, 6]) >>> windowed_view(x, 3) array([[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]) """ y = as_strided(x, shape=(x.size - window_size + 1, window_size), strides=(x.strides[0], x.strides[0])) return y def rolling_max_dd(x, window_size, min_periods=1): """Compute the rolling maximum drawdown of `x`. `x` must be a 1d numpy array. `min_periods` should satisfy `1 <= min_periods <= window_size`. Returns an 1d array with length `len(x) - min_periods + 1`. """ if min_periods < window_size: pad = np.empty(window_size - min_periods) pad.fill(x[0]) x = np.concatenate((pad, x)) y = windowed_view(x, window_size) running_max_y = np.maximum.accumulate(y, axis=1) dd = y - running_max_y return dd.min(axis=1) def max_dd(ser): max2here = pd.expanding_max(ser) dd2here = ser - max2here return dd2here.min() if __name__ == "__main__": np.random.seed(0) n = 100 s = pd.Series(np.random.randn(n).cumsum()) window_length = 10 rolling_dd = pd.rolling_apply(s, window_length, max_dd, min_periods=0) df = pd.concat([s, rolling_dd], axis=1) df.columns = ['s', 'rol_dd_%d' % window_length] df.plot(linewidth=3, alpha=0.4) my_rmdd = rolling_max_dd(s.values, window_length, min_periods=1) plt.plot(my_rmdd, 'g.') plt.show()
该图显示了由代码生成的曲线。绿点由计算rolling_max_dd。
rolling_max_dd
使用n = 10000和比较时间window_length = 500:
n = 10000
window_length = 500
In [2]: %timeit rolling_dd = pd.rolling_apply(s, window_length, max_dd, min_periods=0) 1 loops, best of 3: 247 ms per loop In [3]: %timeit my_rmdd = rolling_max_dd(s.values, window_length, min_periods=1) 10 loops, best of 3: 38.2 ms per loop
rolling_max_dd快了6.5倍 对于较小的窗口长度,加速效果更好。例如,使用window_length = 200,速度快将近13倍。
window_length = 200
要处理NA,可以在将数组传递给之前对Seriesusing fillna方法进行预处理rolling_max_dd。
Series
fillna