Python gevent module: pool() example source code

From open-source Python projects, we have extracted the following 50 code examples to illustrate how to use gevent.pool().
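
Before diving into the project examples, here is a minimal, self-contained sketch of the pattern most of them follow (the fetch function and the pool size are illustrative, not taken from any project below): create a gevent.pool.Pool with a size limit, spawn work into it, then join.

import gevent
from gevent.pool import Pool

def fetch(n):
    gevent.sleep(0.1)   # stand-in for real network I/O
    return n * n

pool = Pool(3)          # at most 3 greenlets run at the same time
jobs = [pool.spawn(fetch, i) for i in range(10)]
pool.join()             # wait for every spawned greenlet to finish
print([job.get() for job in jobs])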

Project: girlfriend    Author: chihongze
def execute(self, context, start_req, parser=_default_parser,
                pool=None, pool_size=None):
        """
        :param context: execution context
        :param start_req: initial (seed) requests
        :param parser: default parser applied to each Response
        :param pool: an existing gevent pool to run requests in
        :param pool_size: size of the gevent pool to create when ``pool`` is not given
        """

        if pool or pool_size:
            # concurrent execution
            return self._concurrent_execute(
                context, start_req, parser,
                pool, pool_size)
        else:
            # synchronous execution
            return self._sync_execute(context, start_req, parser)
Project: girlfriend    Author: chihongze
def _concurrent_execute(self, context, start_req, parser, pool, pool_size):
        queue = Queue()  # request queue

        # enqueue the seed requests
        for r in start_req:
            queue.put_nowait(r)

        if pool is None:
            pool = GeventPool(pool_size)

        greenlets = []

        while True:
            try:
                req = self._check_req(queue.get(timeout=1))
                if req.parser is None:
                    req.parser = parser
                greenlets.append(pool.spawn(req, context, queue))
            except Empty:
                break

        return [greenlet.get() for greenlet in greenlets]
Project: core-python    Author: yidao620c
def test_pool(self):
        """?????"""
        class SocketPool(object):

            def __init__(self):
                self.pool = Pool(1000)
                self.pool.start()

            def listen(self, socket):
                while True:
                    socket.recv()

            def add_handler(self, socket):
                if self.pool.full():
                    raise Exception("At maximum pool size")
                else:
                    self.pool.spawn(self.listen, socket)

            def shutdown(self):
                self.pool.kill()
Project: PyS60-Projects    Author: gauravssnl
def map(requests, prefetch=True, size=None):
    """Concurrently converts a list of Requests to Responses.

    :param requests: a collection of Request objects.
    :param prefetch: If False, the content will not be downloaded immediately.
    :param size: Specifies the number of requests to make at a time. If None, no throttling occurs.
    """

    requests = list(requests)

    pool = Pool(size) if size else None
    jobs = [send(r, pool) for r in requests]
    gevent.joinall(jobs)

    if prefetch:
        [r.response.content for r in requests]

    return [r.response for r in requests]
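
A standalone sketch of the same optional-throttling idea as the map helper above, assuming nothing beyond gevent itself (the work function and the sizes are made up for illustration): spawn into a Pool when a size is given, otherwise spawn unbounded, then joinall.

import gevent
from gevent.pool import Pool

def work(i):
    gevent.sleep(0.05)  # stand-in for sending one request
    return i

def run_all(items, size=None):
    pool = Pool(size) if size else None
    spawn = pool.spawn if pool else gevent.spawn
    jobs = [spawn(work, i) for i in items]
    gevent.joinall(jobs)                 # wait for every greenlet
    return [job.value for job in jobs]   # collect results in input order

print(run_all(range(10), size=3))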
Project: wptagent    Author: WPO-Foundation
def run_application(self):
        upgrade_header = self.environ.get('HTTP_UPGRADE', '').lower()
        if upgrade_header:
            # Build and start the HTTP response
            self.environ['ws4py.socket'] = self.socket or self.environ['wsgi.input'].rfile._sock
            self.result = self.application(self.environ, self.start_response) or []
            self.process_result()
            del self.environ['ws4py.socket']
            self.socket = None
            self.rfile.close()

            ws = self.environ.pop('ws4py.websocket', None)
            if ws:
                ws_greenlet = self.server.pool.track(ws)
                # issue #170
                # in gevent 1.1 socket will be closed once application returns
                # so let's wait for websocket handler to finish
                ws_greenlet.join()
        else:
            gevent.pywsgi.WSGIHandler.run_application(self)
Project: dnsfind    Author: smarttang
def run(self):
        # check and prepare the report file for this scan target
        findreport(self.document)

        # Wildcard (pan-DNS) detection: resolve a random subdomain that
        # should not exist; if it resolves, remember the returned IP as the
        # wildcard "block" IP so matching results can be filtered out later.
        block_check_results = self.checkdomain('d312379196bd822558ca7dfb3c95ba61.'+self.options['target'],'block')

        if block_check_results:
            self.blockip = block_check_results[0]

        # build the candidate subdomain list from the dictionary file
        dic_list = (dic.strip('\n')+'.'+self.options['target'] for dic in open(getpath() + '/' +self.options['dictname'],'r'))
        # brute-force the candidates concurrently with the pool
        self.pool.map(self.checkdomain,dic_list)
Project: agentzero    Author: gabrielfalcao
def loop(self):
        self.listen()
        self.connect()
        self.logger.info('listening for jobs on %s', self.address)
        while self.should_run():
            if self.pool.free_count() == 0:
                self.logger.info('waiting for an execution slot')
                self.pool.wait_available()

            job = self.sockets.recv_safe('pull-in')

            if job:
                self.logger.info('received job')
                self.pool.spawn(self.dispatch, job)
            else:
                self.notify_available()
                gevent.sleep(1)
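
A minimal sketch of the backpressure idiom used in the loop above (handle and the pool size are illustrative): check free_count() so you can log or react before blocking, then wait_available() until a slot frees up.

import gevent
from gevent.pool import Pool

def handle(job):
    gevent.sleep(0.1)           # stand-in for dispatching the job
    print('done', job)

pool = Pool(2)
for job in range(6):
    if pool.free_count() == 0:
        pool.wait_available()   # block until a running greenlet finishes
    pool.spawn(handle, job)
pool.join()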
Project: subDomainsBrute    Author: 0xa-saline
def _load_dns_servers(self):
        print '[+] Validate DNS servers ...'
        self.dns_servers = []
        pool = Pool(30)
        for server in open('dict/dns_servers.txt').xreadlines():
            server = server.strip()
            if server:
                pool.apply_async(self._test_server, (server,))
        pool.join()

        self.dns_count = len(self.dns_servers)
        sys.stdout.write('\n')
        print '[+] Found %s available DNS Servers in total' % self.dns_count
        if self.dns_count == 0:
            print '[ERROR] No DNS Servers available.'
            sys.exit(-1)
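
A self-contained sketch of the apply_async idiom above (probe and the server list are made up): submit fire-and-forget calls to the pool and then join it, instead of keeping the individual greenlets.

import gevent
from gevent.pool import Pool

def probe(server):
    gevent.sleep(0.05)          # stand-in for one DNS query
    print('checked', server)

pool = Pool(30)
for server in ('8.8.8.8', '1.1.1.1', '9.9.9.9'):
    pool.apply_async(probe, (server,))
pool.join()                     # wait for all queued calls to complete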
Project: httphose    Author: HarryR
def run(self):
        if self.beanstalk:
            generator = self.beanstalk.get_workgenerator(self)
        else:
            generator = ListWorkGenerator(self)

        pool = gevent.pool.Pool(self.options.concurrency)
        self.finished = 0
        if self.progress:
            self.progress.start(generator.total)

        try:
            for worker in generator.getall():
                pool.add(gevent.spawn(worker.run))
        except KeyboardInterrupt:
            print("Ctrl+C caught... stopping")
        pool.join()

        if self.progress:
            self.progress.finish()
Project: dnsbrute    Author: XiphosResearch
def run(args):
    if args.download:
        resolvers = download_resolvers()
    else:
        resolvers = load_resolvers(args.resolvers)
    random.shuffle(resolvers)

    pool = gevent.pool.Pool(args.concurrency)

    bar = progressbar.ProgressBar(redirect_stdout=True, redirect_stderr=True)
    for resolver in bar(resolvers):
        pool.add(gevent.spawn(check_resolver, args, resolver))
    pool.join()
Project: haveibeenpwned    Author: kernelmachine
def __init__(self):
        self.pool_size = None
        self.pool = Pool(self.pool_size)
        self.session = requests.Session()
        self.timeout = 10
        self.url = None
        self.response = None
Project: haveibeenpwned    Author: kernelmachine
def send(self,hibp_obj):
        '''
        Spawns gevent/pool threads that will run the execute method on each
        HIBP object.

        Attributes:
            - hibp_obj -> HIBP object
        '''
        if self.pool is not None:
            return self.pool.spawn(hibp_obj.execute)
        return gevent.spawn(hibp_obj.execute)
Project: haveibeenpwned    Author: kernelmachine
def imap(self,hibp_objs):
        '''
        Lazily + Asynchronously map the HIBP execution job to multiple queries.

        Attributes:
            - hibp_objs - list of HIBP objects
        '''
        for hibp_obj in self.pool.imap_unordered(HIBP.execute, hibp_objs):
                yield hibp_obj.response
        self.pool.join()
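
A standalone sketch of the lazy fan-out shown above (query and the numbers are illustrative): imap_unordered yields each result as soon as its greenlet finishes, not in submission order.

import gevent
from gevent.pool import Pool

def query(n):
    gevent.sleep(0.1 * (n % 3))   # finish at different times
    return n

pool = Pool(4)
for result in pool.imap_unordered(query, range(10)):
    print(result)                 # printed in completion order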
Project: haveibeenpwned    Author: kernelmachine
def __init__(self):
        self.pool_size = None
        self.pool = Pool(self.pool_size)
        self.session = requests.Session()
        self.timeout = 10
        self.url = None
        self.response = None
Project: haveibeenpwned    Author: kernelmachine
def send(self,hibp_obj):
        '''
        Spawns gevent/pool threads that will run the execute method on each
        HIBP object.

        Attributes:
            - hibp_obj -> HIBP object
        '''
        if self.pool is not None:
            return self.pool.spawn(hibp_obj.execute)
        return gevent.spawn(hibp_obj.execute)
Project: haveibeenpwned    Author: kernelmachine
def __init__(self):
        self.pool_size = None
        self.pool = Pool(self.pool_size)
        self.session = requests.Session()
        self.timeout = 10
        self.url = None
        self.response = None
Project: haveibeenpwned    Author: kernelmachine
def send(self,hibp_obj):
        '''
        Spawns gevent/pool threads that will run the execute method on each
        HIBP object.

        Attributes:
            - hibp_obj -> HIBP object
        '''
        if self.pool is not None:
            return self.pool.spawn(hibp_obj.execute)
        return gevent.spawn(hibp_obj.execute)
Project: haveibeenpwned    Author: kernelmachine
def imap(self,hibp_objs):
        '''
        Lazily + Asynchronously map the HIBP execution job to multiple queries.

        Attributes:
            - hibp_objs - list of HIBP objects
        '''
        for hibp_obj in self.pool.imap_unordered(HIBP.execute, hibp_objs):
                yield hibp_obj.response
        self.pool.join()
Project: corvus-web-public    Author: eleme
def run(self):
        logger.info("starting task daemon...")

        pool = gevent.pool.Pool(self.pool_size)
        for i in range(self.pool_size):
            pool.apply_async(self.consumer)

        p = gevent.spawn(self.producer)
        p.join()
Project: core-python    Author: yidao620c
def gevent_click_page():
    global TRY_COUNT
    TRY_COUNT = int(sys.argv[1])

    _log.info('Start clicking pages concurrently...')
    # open the archives page first to count the posts
    driver = webdriver.PhantomJS()
    driver.get('https://www.xncoding.com/archives/')
    # driver.maximize_window()
    posts_count = len(driver.find_elements_by_xpath(
        '//article/header/h1[@class="post-title"]/a[@class="post-title-link"]'))
    driver.close()
    # number of posts each gevent greenlet handles
    psize = posts_count / THREAD_COUNT
    _log.info('Total posts: {}, posts handled per greenlet: {}'.format(posts_count, psize))
    group = Group()
    for i in range(0, THREAD_COUNT + 1):
        group.add(gevent.spawn(_click_page, posts_count, psize, i))
    group.join()

    _log.info('All done...')
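
A minimal sketch of the Group variant used above (click and the counts are illustrative): a gevent.pool.Group tracks greenlets like a Pool but never limits how many run at once.

import gevent
from gevent.pool import Group

def click(page):
    gevent.sleep(0.05)      # stand-in for driving the browser
    print('clicked page', page)

group = Group()
for i in range(4):
    group.add(gevent.spawn(click, i))
group.join()                # wait for every tracked greenlet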
Project: Python_Study    Author: thsheep
def boss():
    # headers = {'User-Agent': 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'}

    def _(url):
        # response = requests.get(url, headers=headers)
        response = requests.get(url)
        # sleep(300 / 1000)
        print(url)
        tasks.put_nowait(url)

    global nums
    page_url_base = 'http://www.mala.cn/forum-70-{0}.html'
    page_urls = [page_url_base.format(i) for i in range(1, 100)]
    nums = len(page_urls)
    # [pool.apply_async(_, args=(obj,)) for obj in page_urls]
    [pool.apply(_, args=(obj,)) for obj in page_urls]
    # pool.map_async(_, page_urls)
Project: isilon_data_insights_connector    Author: Isilon
def __init__(self, pidfile):
        """
        Initialize.
        :param: pidfile is the path to the daemon's pidfile (required).
        """
        super(IsiDataInsightsDaemon, self).__init__(pidfile=pidfile)
        self._stat_sets = {}
        self._update_intervals = []
        self._stats_processor = None
        self._stats_processor_args = None
        self._process_stats_func = None
        self.async_worker_pool = gevent.pool.Pool(MAX_ASYNC_QUERIES)
Project: squishy    Author: tmehlinger
def __init__(self, func, pool_size=100, timeout=None):
        # XXX: Is it necessary to patch all? I know we need at least, socket,
        # ssl, dns, and signal. There may be calls inside boto/botocore that
        # require more patching.
        super(GeventWorker, self).__init__(func, pool_size=pool_size,
                                           timeout=timeout)
        self.logger = get_logger(__name__)
        self.pool = Pool(size=pool_size)
Project: squishy    Author: tmehlinger
def process_messages(self, messages):
        greenlet_to_message = {}
        processed = []

        self.logger.debug('processing %d messages', len(messages))

        for message in messages:
            try:
                g = self.pool.spawn(self.func, message)
            except:
                self.logger.exception('cannot submit jobs to pool')
                raise
            greenlet_to_message[g] = message

        for g in gevent.iwait(greenlet_to_message):
            message = greenlet_to_message.pop(g)
            try:
                if g.exception:
                    raise g.exception
            except:
                self.logger.exception('exception processing message %s',
                                      message.message_id)
            else:
                processed.append(message)

        return processed
Project: squishy    Author: tmehlinger
def shutdown(self):
        self.pool.join()
Project: arduino-ciao-meteor-ddp-connector    Author: andrea689
def run_application(self):
        upgrade_header = self.environ.get('HTTP_UPGRADE', '').lower()
        if upgrade_header:
            try:
                # Build and start the HTTP response
                self.environ['ws4py.socket'] = self.socket or self.environ['wsgi.input'].rfile._sock
                self.result = self.application(self.environ, self.start_response) or []
                self.process_result()
            except:
                raise
            else:
                del self.environ['ws4py.socket']
                self.socket = None
                self.rfile.close()

                ws = self.environ.pop('ws4py.websocket')
                if ws:
                    self.server.pool.track(ws)
        else:
            gevent.pywsgi.WSGIHandler.run_application(self)
Project: arduino-ciao-meteor-ddp-connector    Author: andrea689
def __init__(self, *args, **kwargs):
        """
        WSGI server that simply tracks websockets
        and send them a proper closing handshake
        when the server terminates.

        Other than that, the server is the same
        as its :class:`gevent.pywsgi.WSGIServer`
        base.
        """
        _WSGIServer.__init__(self, *args, **kwargs)
        self.pool = GEventWebSocketPool()
Project: arduino-ciao-meteor-ddp-connector    Author: andrea689
def stop(self, *args, **kwargs):
        self.pool.clear()
        _WSGIServer.stop(self, *args, **kwargs)
Project: PyS60-Projects    Author: gauravssnl
def send(r, pool=None):
    """Sends the request object using the specified pool. If a pool isn't 
    specified this method blocks. Pools are useful because you can specify size
    and can hence limit concurrency."""

    if pool != None:
        return pool.spawn(r.send)

    return gevent.spawn(r.send)


# Patched requests.api functions.
Project: wptagent    Author: WPO-Foundation
def __init__(self, *args, **kwargs):
        """
        WSGI server that simply tracks websockets
        and send them a proper closing handshake
        when the server terminates.

        Other than that, the server is the same
        as its :class:`gevent.pywsgi.WSGIServer`
        base.
        """
        _WSGIServer.__init__(self, *args, **kwargs)
        self.pool = GEventWebSocketPool()
Project: wptagent    Author: WPO-Foundation
def stop(self, *args, **kwargs):
        self.pool.clear()
        _WSGIServer.stop(self, *args, **kwargs)
Project: dnsfind    Author: smarttang
def __init__(self,options):
        self.options = options
        self.blockip = None
        self.keywords = False
        self.pool = Pool(self.options['threads_count'])
        self.document = self.options['target'].replace('.','_')+'.txt'
        socket.setdefaulttimeout(self.options['timeout'])

    # domain check
Project: YouPBX    Author: JoneXiong
def _protocol_send(self, command, args=""):
        if self._closing_state:
            return Event()
        self.trace("_protocol_send %s %s" % (command, args))
        # Append command to pool
        # and send it to eventsocket
        _cmd_uuid = str(uuid1())
        _async_res = gevent.event.AsyncResult()
        with self._lock:
            self._commands_pool.append((_cmd_uuid, _async_res))
            self._send("%s %s" % (command, args))
        self.trace("_protocol_send %s wait ..." % command)
        _uuid, event = _async_res.get()
        if _cmd_uuid != _uuid:
            raise InternalSyncError("in _protocol_send")
        # Casts Event to appropriate event type :
        # Casts to ApiResponse, if event is api
        if command == 'api':
            event = ApiResponse.cast(event)
        # Casts to BgapiResponse, if event is bgapi
        elif command == "bgapi":
            event = BgapiResponse.cast(event)
        # Casts to CommandResponse by default
        else:
            event = CommandResponse.cast(event)
        self.trace("_protocol_send %s done" % command)
        return event
Project: pyCreeper    Author: ZcyAndWt
def test_concurrency_with_delayed_url(self):
        dh = DownloadHandler(self.spider, self.driver, self.driver_sem)
        n = 5
        pool = Pool(n)
        urls = []
        for i in range(n):
            urls.append(HTTPBIN_URL + '/delay/1')
        time_start = time.time()
        pool.map(dh.fetch, [Request(url) for url in urls])
        time_total = time.time() - time_start
        self.assertLess(time_total, n)
Project: pyCreeper    Author: ZcyAndWt
def test_dynamic_request_concurrency(self):
        self.driver = webdriver.PhantomJS()
        dh = DownloadHandler(self.spider, self.driver, self.driver_sem)
        n = 5
        pool = Pool(n)
        urls = []
        for i in range(n):
            urls.append(HTTPBIN_URL + '/delay/1')
        time1 = time.time()
        pool.map(dh.fetch, [Request(url, dynamic=True, wait=5) for url in urls])
        self.assertGreater(time.time() - time1, n)
        self.driver.close()
Project: pocscan    Author: erevus-cn
def run_task_in_gevent(url_list, poc_file_dict):  # url_list holds the already-processed target URLs
    poc = Poc_Launcher()
    pool = Pool(100)
    for target in url_list:
        for plugin_type, poc_files in poc_file_dict.iteritems():
            for poc_file in poc_files:
                if target and poc_file:
                    target = fix_target(target)
                    pool.add(gevent.spawn(poc.poc_verify, target, plugin_type, poc_file))
    pool.join()
Project: data007    Author: mobishift2011
def benchmark():
    gevent.spawn(printstats)
    for _ in xrange(1000):
        pool.spawn(bench, itemid) 
    pool.join()
Project: data007    Author: mobishift2011
def main():
    import argparse
    parser = argparse.ArgumentParser(description='Listing ids of a (leaf) category')
    parser.add_argument('--cid', '-c', type=int, help='taobao cid, e.g. 51106012', required=True)
    parser.add_argument('--pool', '-p', action='store_true', help='use gevent pool to boost execution')
    parser.add_argument('--num_paths', '-n', type=int, default=2, help='number of paths, default to 2')
    parser.add_argument('--max_page', '-m', type=int, default=1, help='max page, default to 1')
    option = parser.parse_args()
    print('total items: {}'.format(len(test_list(option.cid, option.pool, option.num_paths, option.max_page))))
Project: data007    Author: mobishift2011
def __init__(self, poolsize=5):
        self.pool = gevent.pool.Pool(poolsize)
Project: data007    Author: mobishift2011
def main():
    import argparse
    parser = argparse.ArgumentParser(description='Call Worker with arguments')
    parser.add_argument('--worker', '-w', choices=['aggregate'], help='worker type, can be "aggregate"', required=True)
    parser.add_argument('--poolsize', '-p', type=int, default=5, help='gevent pool size for worker (default: %(default)s)')
    option = parser.parse_args()
    if option.worker == "aggregate":
        AggregateWorker(option.poolsize).work()
Project: kekescan    Author: xiaoxiaoleo
def run_bugscan(url_list):     
    from tools.pocs.bugscan import Bugscan
    PLUGINS_DIR = 'D:\\Projects\\xlcscan\\tools\\pocs\\'
    poc = Bugscan()
    pool = Pool(100)
    for target in url_list: 
        for poc_file in bugscan_name_list:
            if target and poc_file:
                target = fix_target(target)
                poc_file = PLUGINS_DIR + 'bugscan' + '\\' + poc_file
                pool.add(gevent.spawn(poc.run, target,poc_file))
    pool.join()
Project: kekescan    Author: xiaoxiaoleo
def run_task_in_gevent(url_list, poc_file_dict):  # url_list holds the already-processed target URLs
    poc = Poc_Launcher()
    pool = Pool(100)
    for target in url_list:
        for plugin_type, poc_files in poc_file_dict.iteritems():
            for poc_file in poc_files:
                if target and poc_file:
                    target = fix_target(target)
                    pool.add(gevent.spawn(poc.poc_verify, target, plugin_type, poc_file))
    pool.join()
Project: sec-scan-agent_v1.0    Author: Canbing007
def run_task_in_gevent(url_list, poc_file_dict):
    poc = Poc_Launcher()
    pool = Pool(100)
    for target in url_list:
        for poc_file in poc_file_dict:
            if target and poc_file:
                try:
                    target = fix_domain(target)
                except Exception as e:
                    target = fix_host(target)
                #print target,poc_file,"^^^^^^^^"
                pool.add(gevent.spawn(poc.poc_verify, target, poc_file))
    pool.join()
Project: lc_cloud    Author: refractionPOINT
def __init__( self, cloudDest, cbReceiveMessage, orgId, installerId, platform, architecture, 
                  sensorId = None, enrollmentToken = None, 
                  cbDebugLog = None, cbEnrollment = None ):
        gevent.Greenlet.__init__( self )
        self._cbDebugLog = cbDebugLog
        self._cbReceiveMessage = cbReceiveMessage
        self._cbEnrollment = cbEnrollment
        try:
            self._destServer, self._destPort = cloudDest.split( ':' )
        except:
            self._destServer = cloudDest
            self._destPort = 443
        self._oid = uuid.UUID( str( orgId ) )
        self._iid = uuid.UUID( str( installerId ) )
        self._sid = sensorId
        self._arch = architecture
        self._plat = platform
        if self._sid is not None:
            self._sid = uuid.UUID( str( self._sid ) )
        self._enrollmentToken = enrollmentToken
        self._socket = None

        self._threads = gevent.pool.Group()
        self._stopEvent = gevent.event.Event()
        self._lock = Semaphore( 1 )
        self._connectedEvent = gevent.event.Event()

        self._r = rpcm( isHumanReadable = True, isDebug = self._log )
        self._r.loadSymbols( Symbols.lookups )

        self._hcpModules = []
        self._hbsProfileHash = ( "\x00" * 32 )
Project: lc_cloud    Author: refractionPOINT
def querySites( queryCat, queryAction, queryData = {}, siteProc = defaultSiteProc, qProc = defaultQueryProc ):
    global sites
    p = gevent.pool.Pool()
    ctx = {}

    siteResults = [ x for x in p.imap_unordered( lambda x: querySite( queryCat, queryAction, queryData, siteProc, x, ctx ), sites ) ]

    return qProc( siteResults, ctx )


###############################################################################
# PAGES
###############################################################################
Project: lc_cloud    Author: refractionPOINT
def __init__( self, maxQps, cbLog = None ):
        self._maxQps = maxQps
        self._q = gevent.queue.Queue()
        self._log = cbLog
        self._transmitted = 0
        self._lastWait = time.time()
        self._isRunning = True
        self._threads = gevent.pool.Group()
        self._threads.add( gevent.spawn_later( 0, self._sendLoop ) )
        self._threads.add( gevent.spawn_later( 1, self._resetStats ) )
Project: agentzero    Author: gabrielfalcao
def __init__(self, name, steps=[]):
        self.name = name
        self.actions = Speaker(
            'actions',
            [
                'available',
                'failed',
                'started',
                'success',
                'metric',
                'error',
                'logs',
            ]
        )
        self.steps = [s.job_type for s in steps]
        self.total_steps = len(steps)
        self.context = zmq.Context()
        self.sockets = SocketManager(zmq, self.context)
        self.sockets.create('step-events', zmq.SUB)
        self.sockets.create('jobs-in', zmq.PULL)
        for step in self.steps:
            self.sockets.create(step, zmq.PUSH)

        for action in self.actions.actions.keys():
            self.bind_action(action)

        self.total_actions = len(self.actions.actions)
        self.pool = gevent.pool.Pool(self.total_actions ** (self.total_steps + 1))
        self.greenlets = []
        self._allowed_to_run = True
        self.default_interval = 0.1
        self.backend = StorageBackend()
        self.logger = logging.getLogger('pipeline')
Project: agentzero    Author: gabrielfalcao
def spawn(self, *args, **kw):
        self.greenlets.append(
            self.pool.spawn(*args, **kw)
        )
Project: agentzero    Author: gabrielfalcao
def __init__(self, pull_bind_address='tcp://127.0.0.1', subscriber_connect_address='tcp://127.0.0.1:6000', concurrency=100, timeout=1):
        self.context = zmq.Context()
        self.sockets = SocketManager(zmq, self.context)
        self.sockets.create('pull-in', zmq.PULL)
        # self.sockets.set_socket_option('pull-in', zmq.RCVHWM, concurrency)
        self.sockets.create('events', zmq.PUB)
        self.name = self.__class__.__name__
        self.subscriber_connect_address = subscriber_connect_address
        self._allowed_to_run = True
        self.pool = gevent.pool.Pool(concurrency + 1)
        self.timeout = timeout
        self.pull_bind_address = pull_bind_address
        self.id = str(uuid.uuid4())
        self.logger = self.sockets.get_logger('events', 'logs', 'logs')
Project: classifieds-to-telegram    Author: DiegoHueltes
def run(self, bots, wait_seconds=None):
        """
        Setup the bots and seconds to wait and spawn the required gevent
        :param bots: [Crawler]
        :param wait_seconds: seconds for checking the urls
        """
        self.wait_seconds, self.bots = wait_seconds, bots
        pool = Pool()
        pool.spawn(self.posts_fetcher)
        pool.spawn(self.telegram_posts_sender)
        pool.join()
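
Finally, a small sketch of the unbounded case shown above (the fetcher and sender tasks are made up stand-ins for posts_fetcher and telegram_posts_sender): a Pool created without a size imposes no limit, so it can simply spawn and join a fixed set of long-running tasks.

import gevent
from gevent.pool import Pool

def fetcher():
    for i in range(3):
        gevent.sleep(0.1)
        print('fetched post', i)

def sender():
    gevent.sleep(0.5)
    print('all posts sent')

pool = Pool()               # no size argument: unlimited pool
pool.spawn(fetcher)
pool.spawn(sender)
pool.join()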