Riko - Python 流处理引擎


MIT
跨平台
Python

软件简介

Riko是一款Python 流处理引擎,类似 Yahoo Pipes
。采用纯python开发,用于分析处理结构化数据流。拥有同步和异步APIs,同时也支持并行RSS feeds。Riko也支持字符终端界面。

功能特性:

  • 可读取csv/xml/json/html文件。

  • 通过模块化的管道可创建文本流和数据流。

  • 可解析、处理、提取RSS/Atom feeds。

  • 可创建强大的混合型APIs和maps。

  • 支持并行处理。

使用示例代码:

>>> ### Create a SyncPipe flow ###
>>> #
>>> # `SyncPipe` is a convenience class that creates chainable flows
>>> # and allows for parallel processing.
>>> from riko.collections.sync import SyncPipe
>>>
>>> ### Set the pipe configurations ###
>>> #
>>> # Notes:
>>> #   1. the `detag` option will strip all html tags from the result
>>> #   2. fetch the text contained inside the 'body' tag of the hackernews
>>> #      homepage
>>> #   3. replace newlines with spaces and assign the result to 'content'
>>> #   4. tokenize the resulting text using whitespace as the delimeter
>>> #   5. count the number of times each token appears
>>> #   6. obtain the raw stream
>>> #   7. extract the first word and its count
>>> #   8. extract the second word and its count
>>> #   9. extract the third word and its count
>>> url = 'https://news.ycombinator.com/'
>>> fetch_conf = {
...     'url': url, 'start': '<body>', 'end': '</body>', 'detag': True}  # 1
>>>
>>> replace_conf = {
...     'rule': [
...         {'find': '\r\n', 'replace': ' '},
...         {'find': '\n', 'replace': ' '}]}
>>>
>>> flow = (
...     SyncPipe('fetchpage', conf=fetch_conf)                           # 2
...         .strreplace(conf=replace_conf, assign='content')             # 3
...         .stringtokenizer(conf={'delimiter': ' '}, emit=True)         # 4
...         .count(conf={'count_key': 'content'}))                       # 5
>>>
>>> stream = flow.output                                                 # 6
>>> next(stream)                                                         # 7
{"'sad": 1}
>>> next(stream)                                                         # 8
{'(': 28}
>>> next(stream)                                                         # 9
{'(1999)': 1}