我在这里查阅了几个与 Postgres“too many clients”(客户端连接过多)错误相关的帖子,但仍然无法解决我的问题,因此只好针对我的具体情况再问一次。
基本上,我搭建了一个本地 Postgres 服务器,需要执行数以万计的查询,因此使用了 Python 的 psycopg2 包。这是我的代码:
import psycopg2
import pandas as pd
import numpy as np
from flashtext import KeywordProcessor
from psycopg2.pool import ThreadedConnectionPool
from concurrent.futures import ThreadPoolExecutor

df = pd.DataFrame({'S': ['California', 'Ohio', 'Texas'],
                   'T': ['Dispatcher', 'Zookeeper', 'Mechanics']})
# df = pd.concat([df]*10000)  # repeat df 10000 times

DSN = "postgresql://User:password@localhost/db"
tcp = ThreadedConnectionPool(1, 800, DSN)

# Concurrent queries; keep this well below the pool's maxconn (800) and the
# server's max_connections.
MAX_WORKERS = 8

# "State"/"Title" are quoted identifiers; the values are bound as query
# parameters so psycopg2 quotes them safely.
QUERY = 'SELECT * FROM eridata WHERE "State" = %s AND "Title" = %s LIMIT 1;'


def do_one_query(inputS, inputT):
    """Return the first eridata row matching (inputS, inputT), or None."""
    conn = tcp.getconn()
    try:
        with conn.cursor() as c:
            c.execute(QUERY, (inputS, inputT))
            return c.fetchone()
    finally:
        # Must run even when we `return` above: if putconn is skipped, every
        # call leaks one connection and the pool is exhausted after maxconn
        # calls.  (Dropping close=True would let the pool reuse connections.)
        tcp.putconn(conn, close=True)


cnt = 0
# One executor for all rows; map() yields results in row order.
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
    for result in pool.map(do_one_query, df["S"], df["T"]):
        cnt += 1
        print(result)
print(cnt)
对于较小的 df,代码运行正常。但如果把 df 重复 10000 次,就会收到“连接池已耗尽”(connection pool exhausted)的错误——尽管我已经用下面这行关闭了用过的连接:
`tcp.putconn(conn, close=True)`

但我猜它们实际上并没有被关闭?我该如何解决这个问题?
您需要在连接池之上再加一层队列。
类似下面这样的代码应该可行:
"""Gevent-friendly Postgres access built on a psycopg2 ThreadedConnectionPool.

``Pcursor`` methods borrow a connection from a module-wide singleton pool
(``_pgpool``).  At most ``poolsize`` connections exist at once; additional
callers wait on a gevent ``Queue`` until a connection is returned.
"""
import logging
import random
import sys
from contextlib import contextmanager

import gevent
import psycopg2
from gevent.queue import Queue
from gevent.socket import wait_read, wait_write
from psycopg2 import OperationalError, extensions
from psycopg2.pool import ThreadedConnectionPool

logger = logging.getLogger(__name__)

poolsize = 100  # number of max connections
pdsn = ''  # put your dsn here

if sys.version_info[0] >= 3:
    integer_types = (int,)
else:
    import __builtin__
    integer_types = (int, __builtin__.long)


class ConnectorError(Exception):
    """Base class for all CONNECTOR related exceptions."""


# Singleton connection pool; reset to None when creating it fails or when a
# query run through it fails.
_pgpool = None


def pgpool():
    """Return the shared pool, creating it first if it does not exist yet.

    Returns None if construction raised ``psycopg2.OperationalError``.
    ``PostgresConnectionPool`` itself converts connect failures into
    ``ConnectorError``, which propagates to the caller.
    """
    global _pgpool
    if not _pgpool:
        try:
            _pgpool = PostgresConnectionPool(maxsize=poolsize)
        except psycopg2.OperationalError:
            _pgpool = None
    return _pgpool


class Pcursor(object):
    """Run one statement per call on a connection borrowed from ``_pgpool``.

    The extra positional ``args`` are the query parameters.  If psycopg2
    rejects them with ``TypeError``, the call is retried with ``args[0]``, so a
    single tuple or dict of parameters may be passed instead.  Query errors
    are logged rather than raised; the fetch methods then return None (or, for
    ``fetchmany``, stop yielding).
    """

    def __init__(self, **kwargs):
        # The shared pool may be missing (never created, or reset after a
        # failure): cooperatively wait until Postgres accepts connections.
        while not _pgpool:
            try:
                pgpool()
            except Exception:
                logger.debug('Attempting Connection To Postgres...')
            if not _pgpool:
                # Sleep on every failed attempt so we never busy-spin.
                gevent.sleep(1)

    def _run(self, cursor, PSQL, args):
        """Execute PSQL on cursor, logging (not raising) ordinary errors."""
        try:
            try:
                cursor.execute(PSQL, args)
            except TypeError:
                cursor.execute(PSQL, args[0])
        except Exception as exc:
            # frame 0 is _run, frame 1 the public method, frame 2 its caller
            caller = sys._getframe(2).f_code
            logger.warning('%s (called from %s in %s)',
                           exc, caller.co_name, caller.co_filename)
        logger.debug(cursor.query)

    def fetchone(self, PSQL, *args):
        """Execute PSQL and return the first result row."""
        with _pgpool.cursor() as cursor:
            self._run(cursor, PSQL, args)
            return cursor.fetchone()

    def fetchall(self, PSQL, *args):
        """Execute PSQL and return all result rows."""
        with _pgpool.cursor() as cursor:
            self._run(cursor, PSQL, args)
            return cursor.fetchall()

    def execute(self, PSQL, *args):
        """Execute PSQL and return the query text psycopg2 sent."""
        with _pgpool.cursor() as cursor:
            self._run(cursor, PSQL, args)
            return cursor.query

    def fetchmany(self, PSQL, *args):
        """Execute PSQL and lazily yield result rows, batch by batch."""
        with _pgpool.cursor() as cursor:
            self._run(cursor, PSQL, args)
            while True:
                items = cursor.fetchmany()
                if not items:
                    break
                for item in items:
                    yield item


class AbstractDatabaseConnectionPool(object):
    """Cooperative pool that hands out at most ``maxsize`` connections.

    Connections are created lazily via :meth:`create_connection`.  Once
    ``maxsize`` exist, :meth:`get` waits on a gevent ``Queue`` for one to be
    returned.
    """

    def __init__(self, maxsize=poolsize):
        if not isinstance(maxsize, integer_types):
            raise TypeError('Expected integer, got %r' % (maxsize, ))
        self.maxsize = maxsize
        self.pool = Queue()
        # Live connections owned by this pool (idle in the queue + checked out).
        self.size = 0

    def create_connection(self):
        # overridden by PostgresConnectionPool
        raise NotImplementedError()

    def close_connection(self, conn):
        """Permanently close *conn* (subclasses may release it elsewhere too)."""
        conn.close()

    def _discard(self, conn):
        """Remove *conn* from the pool for good, freeing its slot."""
        self.size -= 1
        try:
            self.close_connection(conn)
        except Exception:
            # Best effort: the connection is usually already broken here.
            logger.debug('Error while closing connection', exc_info=True)

    def get(self):
        """Return an idle connection, or create one if below ``maxsize``."""
        pool = self.pool
        if self.size >= self.maxsize or pool.qsize():
            # NOTE(review): a caller already blocked here is not woken when
            # _discard frees a slot; it waits until a connection is put back.
            return pool.get()
        self.size += 1
        try:
            new_item = self.create_connection()
        except BaseException:
            self.size -= 1
            raise
        return new_item

    def put(self, item):
        self.pool.put(item)

    def closeall(self):
        """Close every idle connection and release its slot."""
        while not self.pool.empty():
            self._discard(self.pool.get_nowait())

    @contextmanager
    def connection(self, isolation_level=None):
        """Borrow a connection for the duration of the ``with`` block.

        A connection found closed on exit is discarded (never returned to the
        queue), so its slot can be reused.

        Raises:
            OperationalError: if the block finished but the connection was
                closed meanwhile.
        """
        conn = self.get()
        try:
            if isolation_level is not None:
                if conn.isolation_level == isolation_level:
                    isolation_level = None
                else:
                    conn.set_isolation_level(isolation_level)
            yield conn
        except BaseException:
            if conn.closed:
                # One dead connection usually means the server went away, so
                # drop the idle ones as well.
                self.closeall()
            raise
        else:
            if conn.closed:
                raise OperationalError("Cannot commit because connection was closed: %r" % (conn, ))
        finally:
            if conn.closed:
                self._discard(conn)
            else:
                if isolation_level is not None:
                    # NOTE(review): this re-applies the *requested* level rather
                    # than restoring the connection's previous one; confirm.
                    conn.set_isolation_level(isolation_level)
                self.put(conn)

    @contextmanager
    def cursor(self, *args, **kwargs):
        """Yield a cursor on a borrowed connection; always close it afterwards.

        An ``Exception`` raised inside the ``with`` block is logged and
        suppressed.  The singleton ``_pgpool`` is then reset (if it is this
        pool), so the next ``Pcursor()`` builds a fresh pool.
        """
        global _pgpool
        isolation_level = kwargs.pop('isolation_level', None)
        with self.connection(isolation_level) as conn:
            cur = conn.cursor(*args, **kwargs)
            try:
                yield cur
            except Exception:
                logger.exception('Query failed; resetting the shared connection pool')
                if _pgpool is self:
                    _pgpool = None
                # Release idle connections instead of leaving them open on the
                # server when this pool is abandoned.
                self.closeall()
            finally:
                try:
                    cur.close()
                except Exception:
                    # Don't let a close failure mask the original error.
                    logger.debug('Error while closing cursor', exc_info=True)


class PostgresConnectionPool(AbstractDatabaseConnectionPool):
    """Pool whose connections come from a psycopg2 ``ThreadedConnectionPool``.

    Keyword args: ``maxsize`` (default ``poolsize``) caps the number of
    connections.  Any other keywords are stored on ``self.kwargs``.

    Raises:
        TypeError: if ``maxsize`` is not an integer.
        ConnectorError: if creating the inner pool for ``pdsn`` fails.
    """

    def __init__(self, **kwargs):
        global _pgpool
        maxsize = kwargs.pop('maxsize', poolsize)
        self.kwargs = kwargs
        # Validate maxsize before opening any connection.
        AbstractDatabaseConnectionPool.__init__(self, maxsize)
        try:
            # The inner pool must allow as many connections as this pool hands
            # out, otherwise its getconn() fails first.
            self.pconnect = ThreadedConnectionPool(1, maxsize, dsn=pdsn)
        except Exception:
            _pgpool = None
            logger.debug('Database connection failed', exc_info=True)
            raise ConnectorError('Database Connection Failed')

    def create_connection(self):
        """Take a connection from the inner pool and switch it to autocommit."""
        conn = self.pconnect.getconn()
        try:
            conn.autocommit = True
        except Exception:
            self.pconnect.putconn(conn, close=True)
            raise
        self.conn = conn
        return conn

    def close_connection(self, conn):
        """Close *conn* and hand its slot back to the inner pool."""
        try:
            # Without this the inner pool keeps counting the dead connection
            # as used and eventually reports "connection pool exhausted".
            self.pconnect.putconn(conn, close=True)
        finally:
            if not conn.closed:
                conn.close()


def gevent_wait_callback(conn, timeout=None):
    """A wait callback useful to allow gevent to work with Psycopg."""
    while True:
        state = conn.poll()
        if state == extensions.POLL_OK:
            break
        elif state == extensions.POLL_READ:
            wait_read(conn.fileno(), timeout=timeout)
        elif state == extensions.POLL_WRITE:
            wait_write(conn.fileno(), timeout=timeout)
        else:
            raise ConnectorError("Bad result from poll: %r" % state)


# Make psycopg2 yield to gevent while waiting on connection sockets.
extensions.set_wait_callback(gevent_wait_callback)
然后就可以像下面这样通过它执行查询:
import db

# PSQL is the SQL text with %s placeholders; arg1..arg3 are passed to
# psycopg2 as the query parameters.
db.Pcursor().execute(PSQL, arg1, arg2, arg3)
基本上,我借用了 gevent 自带的异步 Postgres 连接池示例(见下方链接),并将其修改为通过 psycopg2 的 ThreadedConnectionPool 来创建连接。
https://github.com/gevent/gevent/blob/master/examples/psycopg2_pool.py
我在模块内部直接加入了 psycogreen 的功能(即 gevent 等待回调),因此您只需导入模块并调用该类即可。每次调用都会把一个新查询排入队列,但同时最多只使用固定数量(poolsize)的连接,这样连接就不会被耗尽。这与 PgBouncer 的做法基本类似,我认为也能解决您的问题。
https://pgbouncer.github.io/