我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 six.moves 模块。
def setUp(self):
    """Start a one-shot TLS test server on an ephemeral local port.

    Loads the RSA-2048 test certificate and key, serves them under the
    name ``b'foo'`` through an ``SSLSocket``-wrapped TCP server, and runs
    ``handle_request`` on a background thread. The bound port is stored
    on ``self.port``.
    """
    from acme.crypto_util import SSLSocket

    self.cert = test_util.load_comparable_cert('rsa2048_cert.pem')
    private_key = test_util.load_pyopenssl_private_key('rsa2048_key.pem')
    # pylint: disable=protected-access
    certs = {b'foo': (private_key, self.cert.wrapped)}

    class _TestServer(socketserver.TCPServer):
        """TCP server whose listening socket is wrapped in SSLSocket."""

        # pylint: disable=too-few-public-methods
        # six.moves.* | pylint: disable=attribute-defined-outside-init,no-init

        def server_bind(self):  # pylint: disable=missing-docstring
            # Swap in the TLS-aware socket before the base class binds it.
            self.socket = SSLSocket(socket.socket(), certs=certs)
            socketserver.TCPServer.server_bind(self)

    # Port 0 lets the OS choose a free port; read it back afterwards.
    server = _TestServer(('', 0), socketserver.BaseRequestHandler)
    self.server = server
    self.port = server.socket.getsockname()[1]

    worker = threading.Thread(
        # pylint: disable=no-member
        target=server.handle_request)
    self.server_thread = worker
    worker.start()
    time.sleep(1)  # TODO: avoid race conditions in other way
def unique_everseen(iterable, key=None):
    """Yield each distinct element once, in first-seen order.

    Every element (or its ``key(element)`` value when ``key`` is given)
    that has been produced is remembered for the lifetime of the generator.

    unique_everseen('AAAABBBCCDAABBB') --> A B C D
    unique_everseen('ABBCcAD', str.lower) --> A B C D
    """
    observed = set()
    remember = observed.add

    if key is not None:
        for item in iterable:
            marker = key(item)
            if marker in observed:
                continue
            remember(marker)
            yield item
        return

    # No key function: let filterfalse skip items already in the set.
    for item in six.moves.filterfalse(observed.__contains__, iterable):
        remember(item)
        yield item
def test_pseudo_raw_input_tty_rawinput_true():
    """With a TTY stdin and use_rawinput=True, cmdloop reads via input()."""
    # Context managers (rather than decorators) restore the originals on
    # exit and give us a handle on the input mock for the assertion.
    fake_isatty = mock.MagicMock(name='isatty', return_value=True)
    fake_input = mock.MagicMock(name='input', side_effect=['set', EOFError])

    with mock.patch('sys.stdin.isatty', fake_isatty):
        with mock.patch('six.moves.input', fake_input) as m_input:
            # Run the command loop; it should pull its input from the mocks.
            app = cmd2.Cmd()
            app.use_rawinput = True
            app._cmdloop()

            # input() is mocked, so neither the prompt nor the command name
            # appears in the output. Two calls to input (one returning
            # 'set', one raising EOFError) are taken as evidence that the
            # loop ran as expected.
            assert m_input.call_count == 2
def remask(self):
    """Reset the mask based on the seeded connected component.

    Returns:
        False (leaving the mask untouched) when the body reports that its
        seed is not inside the mask; True after the mask has been rebuilt.
    """
    body = self.to_body()
    if not body.is_seed_in_mask():
        return False

    new_mask_bin, bounds = body.get_seeded_component(CONFIG.postprocessing.closing_shape)
    # ``np.bool`` was only an alias of the builtin and has been removed from
    # NumPy; the builtin gives the same dtype.
    new_mask_bin = new_mask_bin.astype(bool)

    # Multi-axis indexing needs a tuple of slices; on Python 3 a bare
    # ``map`` object is an iterator and is not a valid index.
    # NOTE(review): assumes bounds[0]/bounds[1] are per-axis start/stop
    # coordinates -- confirm against get_seeded_component.
    block_slices = tuple(map(slice, bounds[0], bounds[1]))
    mask_block = self.mask[block_slices].copy()

    # Clip any values not in the seeded connected component so that they
    # cannot generate moves when rechecking.
    outside_component = ~new_mask_bin
    mask_block[outside_component] = np.clip(
        mask_block[outside_component], None, 0.9 * CONFIG.model.t_move)

    # ``np.NAN`` was removed in NumPy 2.0; ``np.nan`` is the canonical name.
    self.mask[:] = np.nan
    self.mask[block_slices] = mask_block
    return True
def generate_subvolume_bounds(filename, volumes, num_bounds, sparse=False, moves=None):
    """Write up to ``num_bounds`` subvolume bounds per volume to CSV files.

    Args:
        filename: CSV path template; must contain ``{volume}``, which is
            replaced with each key of ``volumes``.
        volumes: mapping of volume name to volume object.
        num_bounds: maximum number of bounds taken from each generator.
        sparse: when true the subvolume shape is passed as
            ``sparse_margin`` instead of ``shape``.
        moves: number of moves used to size the subvolume; defaults to 5,
            otherwise converted with ``np.asarray``.

    Raises:
        ValueError: if ``filename`` lacks the ``{volume}`` placeholder.
    """
    if '{volume}' not in filename:
        raise ValueError('CSV filename must contain "{volume}" for volume name replacement.')

    moves = 5 if moves is None else np.asarray(moves)
    subv_shape = CONFIG.model.input_fov_shape + CONFIG.model.move_step * 2 * moves
    gen_kwargs = {'sparse_margin' if sparse else 'shape': subv_shape}

    for name, volume in six.iteritems(volumes):
        downsampled = volume.downsample(CONFIG.volume.resolution)
        generator = downsampled.subvolume_bounds_generator(**gen_kwargs)
        limited = itertools.islice(generator, num_bounds)
        SubvolumeBounds.iterable_to_csv(limited, filename.format(volume=name))
def setUp(self):
    """Launch a single-request TLS server backed by the test certificate.

    ``cert.pem`` and ``rsa512_key.pem`` are loaded through ``test_util``
    and offered under the name ``b'foo'``. The server binds to an
    OS-assigned port (saved as ``self.port``) and handles one request on
    ``self.server_thread``.
    """
    from acme.crypto_util import SSLSocket

    loaded_key = test_util.load_pyopenssl_private_key('rsa512_key.pem')
    self.cert = test_util.load_comparable_cert('cert.pem')
    # pylint: disable=protected-access
    certs = {b'foo': (loaded_key, self.cert.wrapped)}

    class _TestServer(socketserver.TCPServer):
        """TCPServer variant that listens on an SSLSocket."""

        # pylint: disable=too-few-public-methods
        # six.moves.* | pylint: disable=attribute-defined-outside-init,no-init

        def server_bind(self):  # pylint: disable=missing-docstring
            # Replace the plain socket with the TLS wrapper, then bind.
            self.socket = SSLSocket(socket.socket(), certs=certs)
            socketserver.TCPServer.server_bind(self)

    bind_address = ('', 0)  # port 0: let the OS pick a free port
    self.server = _TestServer(bind_address, socketserver.BaseRequestHandler)
    _, self.port = self.server.socket.getsockname()[:2]
    self.server_thread = threading.Thread(
        # pylint: disable=no-member
        target=self.server.handle_request)
    self.server_thread.start()
    time.sleep(1)  # TODO: avoid race conditions in other way
def get_streaming_specs(self):
    """
    Returns the streaming server, port, ssl_enabled flag, and headers.

    The result is a dict with the keys ``server``, ``port``,
    ``ssl_enabled``, ``ssl_verification_enabled`` and ``headers``.
    """
    streaming_url = get_config()['plotly_streaming_domain']
    ssl_verification_enabled = get_config()['plotly_ssl_verification']

    ssl_enabled = 'https' in streaming_url
    if ssl_enabled:
        port = self.HTTPS_PORT
    else:
        port = self.HTTP_PORT

    # If no scheme (http/https) is included in the streaming_url, the
    # parsed hostname is None; fall back to the raw streaming_url.
    parsed = six.moves.urllib.parse.urlparse(streaming_url)
    host = parsed.hostname or streaming_url

    return {
        'server': host,
        'port': port,
        'ssl_enabled': ssl_enabled,
        'ssl_verification_enabled': ssl_verification_enabled,
        'headers': {'Host': host, 'plotly-streamtoken': self.stream_id},
    }
def _test_matching_pattern(self, pattern, isvalidchar, unicode=False):
    """Check ``pattern`` against every character in the tested range.

    The compiled regex must match exactly the characters for which
    ``isvalidchar`` returns true. In unicode mode all code points up to
    ``sys.maxunicode`` are checked; otherwise code points 1..127.
    """
    if unicode:
        regex = unicode_regex(pattern)
        codepoints = six.moves.range(0, sys.maxunicode + 1)
    else:
        regex = ascii_regex(pattern)
        codepoints = six.moves.range(1, 128)

    characters = [six.unichr(point) for point in codepoints]
    for char in characters:
        if not isvalidchar(char):
            assert not regex.match(char), (
                '"%s" supposed not to match "%s" (%r, category "%s"), '
                'but it does' % (pattern, char, char, unicodedata.category(char))
            )
            continue
        assert regex.match(char), (
            '"%s" supposed to match "%s" (%r, category "%s"), '
            'but it doesnt' % (pattern, char, char, unicodedata.category(char))
        )
def import_libs():
    """Import the modules used by the accompanying example.

    The imports only bind names in this function's local scope; nothing
    is returned.
    """
    # NOTE: ``from __future__ import absolute_import, division,
    # print_function`` used to sit here. Future statements are only legal
    # at the very top of a module, so having them inside a function body is
    # a SyntaxError. Put them on the first lines of the file if Python 2
    # behaviour matters.
    import hashlib
    import os
    import shutil
    import sys
    import tarfile
    import zipfile

    import six
    from six.moves.urllib.error import HTTPError
    from six.moves.urllib.error import URLError
    from six.moves.urllib.request import urlopen

    from tensorflow.contrib.keras.python.keras.utils.generic_utils import Progbar
def import_libs():
    """Import the modules used by the accompanying example.

    The imports only bind names in this function's local scope; nothing
    is returned.
    """
    # NOTE: the ``from __future__`` imports (absolute_import, division,
    # print_function) were removed from this body: future statements must
    # appear at the top of the module and are a SyntaxError inside a
    # function. Declare them at the start of the file if they are needed.
    import os
    import re
    import threading

    import numpy as np
    from six.moves import range  # pylint: disable=redefined-builtin

    from tensorflow.contrib.keras.python.keras import backend as K
    from tensorflow.python.platform import tf_logging as logging
    # pylint: disable=g-import-not-at-top
def import_libs():
    """Import the modules used by the accompanying example.

    The imports only bind names in this function's local scope; nothing
    is returned.
    """
    # NOTE: future statements (absolute_import, division, print_function)
    # are not allowed inside a function body -- they raise SyntaxError
    # unless they are the first statements of the module -- so they are
    # not repeated here.
    import copy
    import json
    import os
    import re

    import numpy as np
    from six.moves import zip  # pylint: disable=redefined-builtin

    from tensorflow.contrib.keras.python.keras import backend as K
    from tensorflow.contrib.keras.python.keras.utils import conv_utils
    from tensorflow.contrib.keras.python.keras.utils.io_utils import ask_to_proceed_with_overwrite
    from tensorflow.contrib.keras.python.keras.utils.layer_utils import print_summary as print_layer_summary
    from tensorflow.python.framework import ops
    from tensorflow.python.framework import tensor_shape
    from tensorflow.python.layers import base as tf_base_layers
    from tensorflow.python.platform import tf_logging as logging
    from tensorflow.python.util import tf_inspect
    # pylint: disable=g-import-not-at-top
def data_feed():
    """Read shuffled batches and push them onto ``data_q`` epoch by epoch.

    Puts the marker ``'train'`` on the queue, then one
    ``(epoch, idx, batch_copy)`` tuple per step for every epoch from
    ``init_epoch`` through ``epochs`` inclusive, and finally ``'end'``.
    Updates the module globals ``max_len`` and ``batch_init``.
    """
    global max_len
    global batch_init

    # One network per listed GPU index, and never fewer than one. The plain
    # builtin ``max`` replaces ``np.amax(...).astype(np.int)``: ``np.int``
    # was an alias of ``int`` that has been removed from NumPy.
    gpu_indices = [int(x) for x in config.get('gpu', 'index').split(';')]
    num_networks = max(len(gpu_indices), 1)

    # The reader class is resolved dynamically from the configuration.
    DBClass = importlib.import_module('python_utils.datareader.{}'.format(
        config.get('reader', 'data')))
    reader = getattr(DBClass, config.get('reader', 'class'))(config)

    idxs = reader.idxs
    step = batchsize * num_networks
    # Drop the tail that does not fill a complete step.
    max_len = len(idxs) - (len(idxs) % step)

    data_q.put('train')
    if args.A == 'r':
        # NOTE(review): 'r' presumably means "resume" -- the first epoch
        # then starts one batch further in; confirm against the CLI parser.
        batch_init += batchsize

    for epoch in six.moves.range(init_epoch, 1 + epochs):
        shuffle(idxs)
        for idx in six.moves.range(batch_init, max_len, step):
            data_batch = reader.read_data(idxs[idx:idx + batchsize], num_networks)
            data_q.put((epoch, idx, data_batch.copy()))
        # Only the first epoch honours the initial offset.
        batch_init = 0

    data_q.put('end')
def install(domain):
    """Install a _() function using the given translation domain.

    Given a translation domain, install a _() function into the builtins
    namespace, similar to gettext's install() function.

    The main difference from gettext.install() is that we allow
    overriding the default localedir (e.g. /usr/share/locale) using
    a translation-domain-specific environment variable (e.g.
    NOVA_LOCALEDIR).

    Note that to enable lazy translation, enable_lazy must be
    called.

    :param domain: the translation domain
    """
    from six import moves

    factory = TranslatorFactory(domain)
    builtins_namespace = moves.builtins.__dict__
    builtins_namespace['_'] = factory.primary
def configure_logging(debug):
    """Configure root, urllib3 and http_client logging.

    With ``debug`` true, HTTP connection debugging is switched on and both
    the root logger and the ``requests.packages.urllib3`` logger are set
    to DEBUG; otherwise debugging is switched off and both are set to
    CRITICAL.
    """
    requests_log = logging.getLogger("requests.packages.urllib3")
    requests_log.propagate = True

    log_format = (
        "%(asctime)s [%(levelname)5s] (%(filename)20s:%(lineno)-4d):"
        " %(message)s"
    )
    logging.basicConfig(format=log_format)

    level = logging.DEBUG if debug else logging.CRITICAL
    six.moves.http_client.HTTPConnection.debuglevel = 1 if debug else 0
    logging.getLogger().setLevel(level)
    requests_log.setLevel(level)
def test_move_items(item_name):
    """Ensure that everything loads correctly."""
    qualified_name = "six.moves." + item_name
    try:
        moved = getattr(six.moves, item_name)
        if isinstance(moved, types.ModuleType):
            __import__(qualified_name)
    except AttributeError:
        too_old_for_zip_longest = sys.version_info < (2, 6)
        if item_name == "zip_longest" and too_old_for_zip_longest:
            py.test.skip("zip_longest only available on 2.6+")
    except ImportError:
        # Skip for known optional or platform-specific modules; anything
        # else is a genuine failure and is re-raised.
        on_windows = sys.platform.startswith("win")
        if item_name == "winreg" and not on_windows:
            py.test.skip("Windows only module")
        if item_name.startswith("tkinter"):
            if not have_tkinter:
                py.test.skip("requires tkinter")
            if item_name == "tkinter_ttk" and sys.version_info[:2] <= (2, 6):
                py.test.skip("ttk only available on 2.7+")
        if item_name.startswith("dbm_gnu") and not have_gdbm:
            py.test.skip("requires gdbm")
        raise
    if sys.version_info[:2] >= (2, 6):
        assert item_name in dir(six.moves)
def test_python_2_unicode_compatible():
    """python_2_unicode_compatible gives text and bytes forms on 2 and 3."""
    @six.python_2_unicode_compatible
    class MyTest(object):
        def __str__(self):
            return six.u('hello')

        def __bytes__(self):
            return six.b('hello')

    my_test = MyTest()

    if six.PY2:
        assert str(my_test) == six.b("hello")
        assert unicode(my_test) == six.u("hello")
    elif six.PY3:
        assert bytes(my_test) == six.b("hello")
        assert str(my_test) == six.u("hello")

    # Use ``bytes`` from builtins when present, falling back to ``str``.
    to_bytes = getattr(six.moves.builtins, 'bytes', str)
    assert to_bytes(my_test) == six.b("hello")
def _fragment_in_reverse(iterable, start=0, limit=None):
    """
    Selects a fragment of the iterable and returns the items in reverse order.

    The lowest index is 0 and designates the last line of the file.
    'Limit' specifies the number of lines to return.

    :param iterable: the items to select from
    :param start: number of items to drop from the end before selecting
    :param limit: maximum number of items to return; ``None`` (the default)
        or a negative value means no limit
    :returns: a ``collections.deque`` holding the selected items, last
        item of ``iterable`` first
    :raises exc.InvalidInputError: if ``start`` exceeds the number of
        available items
    """
    # TODO: Naive implementation. Needs to be rewritten to a solution with
    # file pointer moving around etc. if it turns out that this isn't
    # performant enough.
    maxlen = None
    # ``None >= 0`` raises TypeError on Python 3, so the default must be
    # checked explicitly before the numeric comparison.
    if limit is not None and limit >= 0:
        maxlen = start + limit

    # A bounded deque keeps only the trailing ``maxlen`` items.
    fragment = collections.deque(iterable, maxlen)
    try:
        for _ in six.moves.range(start):
            fragment.pop()
    except IndexError:
        raise exc.InvalidInputError(__name__,
                                    'Index start=%s is out of range.'
                                    % str(start))

    fragment.reverse()
    return fragment
def __init__(self, service, config, signer, type_mapping):
    """Validate ``config`` and set up the client's endpoint, session and logger.

    Request logging (and ``http_client`` debug output) is enabled only
    when the ``log_requests`` config value is truthy.
    """
    validate_config(config)
    self.signer = signer

    region = config.get("region")
    explicit_endpoint = config.get("endpoint")
    self.endpoint = regions.endpoint_for(
        service, region=region, endpoint=explicit_endpoint)

    self.complex_type_mappings = type_mapping
    self.type_mappings = merge_type_mappings(self.primitive_type_map, type_mapping)
    self.session = requests.Session()

    extra_agent = get_config_value_or_default(config, "additional_user_agent")
    self.user_agent = build_user_agent(extra_agent)

    # Each instance gets its own logger, named after its id().
    logger_name = "{}.{}".format(__name__, id(self))
    self.logger = logging.getLogger(logger_name)
    self.logger.addHandler(logging.NullHandler())

    if not get_config_value_or_default(config, "log_requests"):
        six.moves.http_client.HTTPConnection.debuglevel = 0
    else:
        self.logger.setLevel(logging.DEBUG)
        six.moves.http_client.HTTPConnection.debuglevel = 1
def run(self):
    """Consume items from the pool's queue until stopped or drained.

    The loop continues while the worker has not been killed and either is
    still marked running or the queue holds items. Each item is numbered
    from the pool-wide ``queue_processed`` counter and handed to
    ``self.process(item, num)``. If processing raises, the worker is
    marked failed, the exception info is appended to
    ``self.pool.exceptions`` and ``self.pool.kill()`` is called.
    """
    while (not self.kill) and (self.running or not self.pool.queue.empty()):
        try:
            item = self.pool.queue.get(timeout=self.get_timeout)
        except six.moves.queue.Empty:
            # Nothing arrived within the timeout; re-check the loop guard.
            continue

        # Update the shared counter under the pool lock. try/finally
        # guarantees the lock is released even if the update fails, so
        # other workers cannot be left blocked on it.
        self.pool.queue_get_lock.acquire()
        try:
            self.pool.queue_processed += 1
            num = self.pool.queue_processed
        finally:
            self.pool.queue_get_lock.release()

        try:
            self.process(item, num)
        except BaseException:
            # Deliberately broad (same reach as the former bare ``except``):
            # any failure is recorded and the pool is shut down.
            self.failed = True
            self.pool.exceptions.append(sys.exc_info())
            self.pool.kill()
def test_key_loaded(self):
    """encrypt() should open the existing key file in binary read mode."""
    key_material = base64.b64encode(os.urandom(16))
    with open(self.key_file, 'wb') as key_out:
        key_out.write(key_material)

    with open(self.key_file) as f:
        # Patch the builtin open so the call made by encrypt() can be
        # inspected; the already-open handle is what the patched call returns.
        open_patch = mock.patch.object(six.moves.builtins, 'open')
        with open_patch as mock_open:
            mock_open.return_value = f
            local.encrypt(b'abc')
            mock_open.assert_called_with(self.key_file, 'rb')
def _detect_gce_environment():
    """Determine if the current environment is Compute Engine.

    Returns:
        Boolean indicating whether or not the current environment is Google
        Compute Engine. True only when the metadata host answers with
        status 200 and the expected metadata-flavor header value; False
        for any other response or on a socket error.
    """
    # NOTE: The explicit ``timeout`` is a workaround. The underlying
    #       issue is that resolving an unknown host on some networks will take
    #       20-30 seconds; making this timeout short fixes the issue, but
    #       could lead to false negatives in the event that we are on GCE, but
    #       the metadata resolution was particularly slow. The latter case is
    #       "unlikely".
    connection = six.moves.http_client.HTTPConnection(
        _GCE_METADATA_HOST, timeout=1)

    try:
        headers = {_METADATA_FLAVOR_HEADER: _DESIRED_METADATA_FLAVOR}
        connection.request('GET', '/', headers=headers)
        response = connection.getresponse()
        # Use the same ``six.moves.http_client`` path as the connection
        # above rather than a separately imported ``http_client`` name.
        if response.status == six.moves.http_client.OK:
            return (response.getheader(_METADATA_FLAVOR_HEADER) ==
                    _DESIRED_METADATA_FLAVOR)
        # A non-200 answer used to fall through and return None; return an
        # explicit False so the documented boolean contract holds.
        return False
    except socket.error:  # socket.timeout or socket.error(64, 'Host is down')
        logger.info('Timeout attempting to reach GCE metadata service.')
        return False
    finally:
        connection.close()
def _should_write_file(self, filepath): # type: (str) -> bool """Determines whether a specific file should be written. :param str filepath: Full file path to file in question :rtype: bool """ if not os.path.isfile(filepath): # The file does not exist, nothing to overwrite return True if self.no_overwrite: # The file exists and the caller specifically asked us not to overwrite anything _LOGGER.warning('Skipping existing output file because of "no overwrite" option: %s', filepath) return False if self.interactive: # The file exists and the caller asked us to be consulted on action before overwriting decision = six.moves.input( # type: ignore # six.moves confuses mypy 'Overwrite existing output file "{}" with new contents? [y/N]:'.format(filepath) ) try: if decision.lower()[0] == 'y': _LOGGER.warning('Overwriting existing output file based on interactive user decision: %s', filepath) return True return False except IndexError: # No input is interpreted as 'do not overwrite' _LOGGER.warning('Skipping existing output file based on interactive user decision: %s', filepath) return False # If we get to this point, the file exists and we should overwrite it _LOGGER.warning('Overwriting existing output file because no action was specified otherwise: %s', filepath) return True
def _should_retry_response(resp_status, content): """Determines whether a response should be retried. Args: resp_status: The response status received. content: The response content body. Returns: True if the response should be retried, otherwise False. """ # Retry on 5xx errors. if resp_status >= 500: return True # Retry on 429 errors. if resp_status == _TOO_MANY_REQUESTS: return True # For 403 errors, we have to check for the `reason` in the response to # determine if we should retry. if resp_status == six.moves.http_client.FORBIDDEN: # If there's no details about the 403 type, don't retry. if not content: return False # Content is in JSON format. try: data = json.loads(content.decode('utf-8')) reason = data['error']['errors'][0]['reason'] except (UnicodeDecodeError, ValueError, KeyError): LOGGER.warning('Invalid JSON content from response: %s', content) return False LOGGER.warning('Encountered 403 Forbidden with reason "%s"', reason) # Only retry on rate limit related failures. if reason in ('userRateLimitExceeded', 'rateLimitExceeded', ): return True # Everything else is a success or non-retriable so break. return False
def test_valid_answer_github(self):
    """Answering 'GitHub' at the prompt yields the result 'github'."""
    prompt_patch = mock.patch.object(six.moves, 'input', return_value='GitHub')
    with prompt_patch:
        outcome = _configure_publish()
    assert outcome == 'github'
def test_invalid_answer(self):
    """A bogus answer makes _configure_publish prompt a second time."""
    # The first prompt returns a wrong answer and the second raises; the
    # exception surfacing proves the prompt was asked again.
    answers = ('bogus', RuntimeError)
    with mock.patch.object(six.moves, 'input', side_effect=answers):
        with pytest.raises(RuntimeError):
            _configure_publish()
def _get_operation_code(op):
    """Build the source text for one operation call.

    Emits, in order: one assignment copying each input variable into a
    per-operation temporary ``v<op>_<i>``, the call ``<name>(...)`` over
    all ``nin + nout`` temporaries, and one assignment copying each output
    temporary back into its variable.
    """
    statements = [
        'v%d_%d = v%d;\n' % (op.num, index, var.num)
        for index, var in enumerate(op.in_vars)
    ]

    arguments = ', '.join(
        'v%d_%d' % (op.num, index)
        for index in six.moves.range(op.nin + op.nout)
    )
    statements.append(op.name + '(' + arguments + ');\n')

    # Output temporaries are numbered after the inputs, hence the offset.
    statements.extend(
        'v%d = v%d_%d;\n' % (var.num, op.num, index + op.nin)
        for index, var in enumerate(op.out_vars)
    )
    return ''.join(statements)
def test_multiple(self):
    """Ten calls with the same name end on the file numbered 0009."""
    attempts = 10
    for _attempt in six.moves.range(attempts):
        f, name = self._call("wow")

    # Only the result of the final call is inspected.
    is_file = isinstance(f, file_type)
    is_text = isinstance(name, str)
    self.assertTrue(is_file)
    self.assertTrue(is_text)
    self.assertTrue("wow-0009.conf" in name)
def test_plugins(self):
    """Run the ``plugins`` subcommand with combinations of its flags."""
    flags = ['--init', '--prepare', '--authenticators', '--installers']

    # Sizes 0 .. len(flags) - 1: the range stops short of using all flags
    # at once.
    per_size = [itertools.combinations(flags, size)
                for size in six.moves.range(len(flags))]

    for combo in itertools.chain.from_iterable(per_size):
        self._call(['plugins'] + list(combo))
def get_all(self, argument_name, default_value=None):
    """Returns a list of query or POST arguments with the given name.

    We parse the query string and POST payload lazily, so this will be a
    slower operation on the first call.

    :param argument_name: The name of the query or POST argument.
    :param default_value: The value to return if the given argument is not
        present, None may not be used as a default, if it is then an empty
        list will be returned instead.
    :returns: A (possibly empty) list of values.
    """
    # On Python 2 the argument name is encoded with the request charset.
    if self.charset and not six.PY3:
        argument_name = argument_name.encode(self.charset)

    fallback = [] if default_value is None else default_value

    values = self.params.getall(argument_name)
    if values is None or len(values) == 0:
        return fallback

    # Replace FieldStorage entries with their value, editing the list in place.
    for position in six.moves.range(len(values)):
        entry = values[position]
        if isinstance(entry, cgi.FieldStorage):
            values[position] = entry.value

    return values
def log_critical(self):
    "Translate critical-level log messages."
    level_name = 'critical'
    translator = self._make_log_translation_func(level_name)
    return translator

# NOTE(dhellmann): When this module moves out of the incubator into
# oslo.i18n, these global variables can be moved to an integration
# module within each application.

# Create the global translation functions.