The following 50 code examples, extracted from open-source Python projects, illustrate how to use contextlib.closing().
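Before the project examples, here is a minimal sketch of the pattern they all share: contextlib.closing() wraps any object that has a close() method in a context manager, so close() is called when the with block exits, even on an exception. The URL below is only a placeholder.

from contextlib import closing
from urllib.request import urlopen

# closing() turns any object with a .close() method into a context
# manager; the response is closed when the block exits, even on error.
with closing(urlopen('https://www.example.com/')) as page:
    print(page.read(64))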
def fetch_data():
    try:
        r = requests.get(MTG_JSON_URL)
    except requests.ConnectionError:
        r = requests.get(FALLBACK_MTG_JSON_URL)
    with closing(r), zipfile.ZipFile(io.BytesIO(r.content)) as archive:
        unzipped_files = archive.infolist()
        if len(unzipped_files) != 1:
            raise RuntimeError("Found an unexpected number of files in the MTGJSON archive.")
        data = archive.read(archive.infolist()[0])
        decoded_data = data.decode('utf-8')
        sets_data = json.loads(decoded_data)
        return sets_data
def zipdir(archivename, basedir):
    '''Zip directory, from J.F. Sebastian http://stackoverflow.com/'''
    assert os.path.isdir(basedir)
    with closing(ZipFile(archivename, "w", ZIP_DEFLATED)) as z:
        for root, dirs, files in os.walk(basedir):
            # NOTE: ignore empty directories
            for fn in files:
                if fn[-4:] != '.zip':
                    absfn = os.path.join(root, fn)
                    zfn = absfn[len(basedir) + len(os.sep):]  # XXX: relative path
                    z.write(absfn, zfn)

# ================ Inventory input data and create data structure =================
def get_server_certificate(addr, ssl_version=PROTOCOL_SSLv23, ca_certs=None):
    """Retrieve the certificate from the server at the specified address,
    and return it as a PEM-encoded string.
    If 'ca_certs' is specified, validate the server cert against it.
    If 'ssl_version' is specified, use it in the connection attempt."""
    host, port = addr
    if ca_certs is not None:
        cert_reqs = CERT_REQUIRED
    else:
        cert_reqs = CERT_NONE
    context = _create_stdlib_context(ssl_version,
                                     cert_reqs=cert_reqs,
                                     cafile=ca_certs)
    with closing(create_connection(addr)) as sock:
        with closing(context.wrap_socket(sock)) as sslsock:
            dercert = sslsock.getpeercert(True)
    return DER_cert_to_PEM_cert(dercert)
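A hypothetical call to the function above; the host and port are placeholders, not from the original project:

# Fetch a server's certificate as a PEM string (placeholder address).
pem = get_server_certificate(('example.com', 443))
print(pem.splitlines()[0])  # '-----BEGIN CERTIFICATE-----'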
def watch(self, path):
    params = {'watch': 'true'}
    url = self._base_url + path
    header = {}
    if self.token:
        header.update({'Authorization': 'Bearer %s' % self.token})
    # TODO(ivc): handle connection errors and retry on failure
    while True:
        with contextlib.closing(
                requests.get(url, params=params, stream=True,
                             cert=self.cert, verify=self.verify_server,
                             headers=header)) as response:
            if not response.ok:
                raise exc.K8sClientException(response.text)
            for line in response.iter_lines(delimiter='\n'):
                line = line.strip()
                if line:
                    yield jsonutils.loads(line)
def setClient(self, client):
    """Associate this Server with a new local Client instance,
    closing the Client this Server is currently using.
    """
    if not isinstance(client, OSCClient):
        raise ValueError("'client' argument is not a valid OSCClient object")

    if client.server != None:
        raise OSCServerError("Provided OSCClient already has an OSCServer-instance: %s" % str(client.server))

    # Server socket is already listening at this point, so we can't use the client's socket.
    # we'll have to force our socket on the client...
    client_address = client.address()  # client may be already connected
    client.close()  # shut-down that socket

    # force our socket upon the client
    client.setServer(self)

    if client_address:
        client.connect(client_address)
        if not self.return_port:
            self.return_port = client_address[1]
def initialize(self):
    with self.get_conn() as conn:
        with closing(conn.cursor()) as cur:
            cur.executescript("""
                CREATE TABLE metadata (
                    key TEXT PRIMARY KEY,
                    value TEXT
                );
                CREATE TABLE backupsets (
                    id INTEGER PRIMARY KEY,
                    backupset_id BLOB
                );
                CREATE TABLE segments (
                    incremental INTEGER,
                    segment INTEGER PRIMARY KEY,
                    compression TINYINT,
                    encryption TINYINT,
                    segment_hash BLOB,
                    backupset_id INTEGER,
                    FOREIGN KEY(backupset_id) REFERENCES backupsets(id)
                );
            """)
        conn.commit()
def get_metadata(self):
    with self.get_conn() as conn:
        with closing(conn.cursor()) as cur:
            cur.execute("SELECT * FROM metadata")
            metadata = dict(cur.fetchall())
            cur.execute("SELECT * FROM backupsets")
            backupsets = [b[1] for b in cur.fetchall()]
    return structure.Metadata(
        incremental=metadata['incremental'],
        size_of_disk=metadata['size_of_disk'],
        segment_size=metadata['segment_size'],
        timestamp=metadata['timestamp'],
        backupset_id=backupsets[-1],
        backupsets=backupsets
    )
def put_segments(self, segments, metadata):
    with self.get_conn() as conn:
        with closing(conn.cursor()) as cur:
            for segment in segments:
                cur.execute(
                    "INSERT INTO segments VALUES (?, ?, ?, ?, ?, ?)",
                    (
                        segment.incremental,
                        segment.segment,
                        segment.compression,
                        segment.encryption,
                        buffer(segment.segment_hash),
                        metadata.backupsets.index(segment.backupset_id)
                    )
                )
        conn.commit()
def put_metadata(self, metadata):
    with self.get_conn() as conn:
        with closing(conn.cursor()) as cur:
            cur.executemany(
                "INSERT OR REPLACE INTO metadata VALUES (?, ?)",
                [
                    ('incremental', metadata.incremental),
                    ('segment_size', metadata.segment_size),
                    ('size_of_disk', metadata.size_of_disk),
                    ('timestamp', metadata.timestamp)
                ]
            )
            for i, v in enumerate(metadata.backupsets):
                cur.execute(
                    "INSERT INTO backupsets VALUES (?, ?)",
                    (i, buffer(v))
                )
        conn.commit()
def _get_records(self):
    """
    Get the list of installed files for the distribution
    :return: A list of tuples of path, hash and size. Note that hash and
             size might be ``None`` for some entries. The path is exactly
             as stored in the file (which is as in PEP 376).
    """
    results = []
    r = self.get_distinfo_resource('RECORD')
    with contextlib.closing(r.as_stream()) as stream:
        with CSVReader(stream=stream) as record_reader:
            # Base location is parent dir of .dist-info dir
            #base_location = os.path.dirname(self.path)
            #base_location = os.path.abspath(base_location)
            for row in record_reader:
                missing = [None for i in range(len(row), 3)]
                path, checksum, size = row + missing
                #if not os.path.isabs(path):
                #    path = path.replace('/', os.sep)
                #    path = os.path.join(base_location, path)
                results.append((path, checksum, size))
    return results
def get_resource_path(self, relative_path):
    """
    NOTE: This API may change in the future.

    Return the absolute path to a resource file with the given relative
    path.

    :param relative_path: The path, relative to .dist-info, of the resource
                          of interest.
    :return: The absolute path where the resource is to be found.
    """
    r = self.get_distinfo_resource('RESOURCES')
    with contextlib.closing(r.as_stream()) as stream:
        with CSVReader(stream=stream) as resources_reader:
            for relative, destination in resources_reader:
                if relative == relative_path:
                    return destination
    raise KeyError('no resource file with relative path %r '
                   'is installed' % relative_path)
def file(self):
    """
    Returns a file pointer to this binary

    :example:

    >>> process_obj = c.select(Process).where("process_name:svch0st.exe").first()
    >>> binary_obj = process_obj.binary
    >>> print(binary_obj.file.read(2))
    MZ
    """
    # TODO: I don't like reaching through to the session...
    with closing(self._cb.session.get("/api/v1/binary/{0:s}".format(self.md5sum), stream=True)) as r:
        z = StringIO(r.content)
        zf = ZipFile(z)
        fp = zf.open('filedata')
        return fp
def api_has_bwctl(host, timeout=5, bind=None):
    """
    Determine if a host is running the BWCTL daemon
    """
    # Null implies localhost
    if host is None:
        host = "localhost"

    # HACK: BWTCLBC
    # If the environment says to bind to a certain address, do it.
    if bind is None:
        bind = os.environ.get('PSCHEDULER_LEAD_BIND_HACK', None)

    for family in [socket.AF_INET, socket.AF_INET6]:
        try:
            with closing(socket.socket(family, socket.SOCK_STREAM)) as sock:
                if bind is not None:
                    sock.bind((bind, 0))
                sock.settimeout(timeout)
                return sock.connect_ex((host, 4823)) == 0
        except socket.error:
            pass

    return False
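The function above reduces to a boolean probe of TCP port 4823. A hypothetical call (the host name is a placeholder):

# Probe a host for a listening BWCTL daemon (placeholder host).
if api_has_bwctl('bwctl.example.net', timeout=2):
    print('BWCTL daemon reachable')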
def start(conf):
    persistence = _get_persistence_backend(conf)

    if conf.taskflow.db_upgrade:
        with contextlib.closing(persistence.get_connection()) as conn:
            LOG.info('Checking for database schema upgrade')
            conn.upgrade()

    my_name = uuidutils.generate_uuid()
    LOG.info('I am %s', my_name)
    board = _get_jobboard_backend(conf, persistence=persistence)

    conductor = conductors.fetch(
        'nonblocking', my_name, board,
        engine='parallel',
        max_simultaneous_jobs=conf.max_simultaneous_jobs,
        persistence=persistence)

    board.connect()
    LOG.debug('Starting taskflow conductor loop')
    threading.Thread(target=conductor.run).start()

    return persistence, board, conductor
def test_handler_callback_file_object(self):
    """The handler callback receives the same fd object it passed in."""
    server_sock, port = bind_unused_port()
    fds = []

    def handle_connection(fd, events):
        fds.append(fd)
        conn, addr = server_sock.accept()
        conn.close()
        self.stop()

    self.io_loop.add_handler(server_sock, handle_connection, IOLoop.READ)
    with contextlib.closing(socket.socket()) as client_sock:
        client_sock.connect(('127.0.0.1', port))
        self.wait()
    self.io_loop.remove_handler(server_sock)
    self.io_loop.add_handler(server_sock.fileno(), handle_connection, IOLoop.READ)
    with contextlib.closing(socket.socket()) as client_sock:
        client_sock.connect(('127.0.0.1', port))
        self.wait()
    self.assertIs(fds[0], server_sock)
    self.assertEqual(fds[1], server_sock.fileno())
    self.io_loop.remove_handler(server_sock.fileno())
    server_sock.close()
def test_multi_line_headers(self):
    # Multi-line http headers are rare but rfc-allowed
    # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2
    sock, port = bind_unused_port()
    with closing(sock):
        def write_response(stream, request_data):
            if b"HTTP/1." not in request_data:
                self.skipTest("requires HTTP/1.x")
            stream.write(b"""\
HTTP/1.1 200 OK
X-XSS-Protection: 1;
\tmode=block

""".replace(b"\n", b"\r\n"), callback=stream.close)

        def accept_callback(conn, address):
            stream = IOStream(conn, io_loop=self.io_loop)
            stream.read_until(b"\r\n\r\n",
                              functools.partial(write_response, stream))

        netutil.add_accept_handler(sock, accept_callback, self.io_loop)
        self.http_client.fetch("http://127.0.0.1:%d/" % port, self.stop)
        resp = self.wait()
        resp.rethrow()
        self.assertEqual(resp.headers['X-XSS-Protection'], "1; mode=block")
        self.io_loop.remove_handler(sock.fileno())
def test_max_clients(self):
    AsyncHTTPClient.configure(SimpleAsyncHTTPClient)
    with closing(AsyncHTTPClient(
            self.io_loop, force_instance=True)) as client:
        self.assertEqual(client.max_clients, 10)
    with closing(AsyncHTTPClient(
            self.io_loop, max_clients=11, force_instance=True)) as client:
        self.assertEqual(client.max_clients, 11)

    # Now configure max_clients statically and try overriding it
    # with each way max_clients can be passed
    AsyncHTTPClient.configure(SimpleAsyncHTTPClient, max_clients=12)
    with closing(AsyncHTTPClient(
            self.io_loop, force_instance=True)) as client:
        self.assertEqual(client.max_clients, 12)
    with closing(AsyncHTTPClient(
            self.io_loop, max_clients=13, force_instance=True)) as client:
        self.assertEqual(client.max_clients, 13)
    with closing(AsyncHTTPClient(
            self.io_loop, max_clients=14, force_instance=True)) as client:
        self.assertEqual(client.max_clients, 14)
def get_station_info(info_url=INFO_URL, parse_map=PARSE_MAP):
    """
    Parse information for magnetometer sites that report data to the
    THEMIS project. Returns a mapping between station IDs and
    :class:`Info` regarding the site.
    """
    station_info = OrderedDict()
    with closing(urlopen(info_url)) as fid:
        stn_data = {}
        for line in fid:
            if line.startswith('};'):
                key = stn_data.pop('key')
                if 'mlat' not in stn_data:
                    stn_data['mlat'] = float('nan')
                if 'mlon' not in stn_data:
                    stn_data['mlon'] = float('nan')
                station_info[key] = Info(**stn_data)
                stn_data = {}
            line = line.lstrip()
            for search_key, (key, convert) in parse_map.iteritems():
                if line.startswith(search_key):
                    stn_data[key] = convert(line.split('"')[1])
    return station_info
def get_template_files(template_data=None, template_url=None):
    if template_data:
        tpl = template_data
    elif template_url:
        with contextlib.closing(request.urlopen(template_url)) as u:
            tpl = u.read()
    else:
        return {}, None

    if not tpl:
        return {}, None

    if isinstance(tpl, six.binary_type):
        tpl = tpl.decode('utf-8')

    template = template_format.parse(tpl)
    files = {}
    _get_file_contents(template, files)
    return files, template
def unbuffered(proc, stream='stdout'):
    stream = getattr(proc, stream)
    with contextlib.closing(stream):
        while True:
            out = []
            last = stream.read(1)
            # Don't loop forever
            if last == '' and proc.poll() is not None:
                break
            while last not in newlines:
                # Don't loop forever
                if last == '' and proc.poll() is not None:
                    break
                out.append(last)
                last = stream.read(1)
            out = ''.join(out)
            yield out
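A sketch of how the generator above might be driven; it assumes the snippet's module defines `newlines` as a collection of line terminators (e.g. '\n', '\r\n', '\r') and that the process is opened in text mode so read(1) returns str:

import subprocess

# Hypothetical driver for unbuffered(): stream a child's stdout line by line.
proc = subprocess.Popen(['ping', '-c', '3', 'localhost'],
                        stdout=subprocess.PIPE,
                        universal_newlines=True, bufsize=1)
for line in unbuffered(proc):
    print(line)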