Python six 模块,moves() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 six.moves 模块。

项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def setUp(self):
    """Start a throwaway TCP server whose listening socket is an SSLSocket.

    The server binds to port 0; the port it actually received is stored on
    ``self.port`` and ``handle_request`` is run in a background thread.
    """
    self.cert = test_util.load_comparable_cert('rsa2048_cert.pem')
    private_key = test_util.load_pyopenssl_private_key('rsa2048_key.pem')
    # pylint: disable=protected-access
    cert_by_name = {b'foo': (private_key, self.cert.wrapped)}

    from acme.crypto_util import SSLSocket

    class _TestServer(socketserver.TCPServer):

        # pylint: disable=too-few-public-methods
        # six.moves.* | pylint: disable=attribute-defined-outside-init,no-init

        def server_bind(self):  # pylint: disable=missing-docstring
            # Swap in the SSL-wrapped socket before the stock bind logic runs.
            raw_socket = socket.socket()
            self.socket = SSLSocket(raw_socket, certs=cert_by_name)
            socketserver.TCPServer.server_bind(self)

    bind_address = ('', 0)
    self.server = _TestServer(bind_address, socketserver.BaseRequestHandler)
    bound_address = self.server.socket.getsockname()
    self.port = bound_address[1]
    # pylint: disable=no-member
    worker = threading.Thread(target=self.server.handle_request)
    self.server_thread = worker
    worker.start()
    time.sleep(1)  # TODO: avoid race conditions in other way
项目:ivaochdoc    作者:ivaoch    | 项目源码 | 文件源码
def unique_everseen(iterable, key=None):
    """Yield the elements of ``iterable`` in order, dropping repeats.

    An element counts as a repeat when it (or ``key(element)`` if ``key`` is
    given) has been seen earlier in the iteration; the compared values are
    kept in a set, so they must be hashable.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    observed = set()
    remember = observed.add
    if key is not None:
        for item in iterable:
            marker = key(item)
            if marker in observed:
                continue
            remember(marker)
            yield item
    else:
        for item in six.moves.filterfalse(observed.__contains__, iterable):
            remember(item)
            yield item
项目:cmd2    作者:python-cmd2    | 项目源码 | 文件源码
def test_pseudo_raw_input_tty_rawinput_true():
    """Run the command loop with stdin reported as a tty and use_rawinput on.

    Input is fed from a mock that first returns 'set' and then raises
    EOFError; the test only checks that the mock was consulted twice.
    """
    fake_isatty = mock.MagicMock(name='isatty', return_value=True)
    fake_input = mock.MagicMock(name='input', side_effect=['set', EOFError])

    # Context managers (not decorators) are used so the patched functions are
    # restored afterwards and the input mock is available for the assertion.
    with mock.patch('sys.stdin.isatty', fake_isatty):
        with mock.patch('six.moves.input', fake_input) as m_input:
            # The command loop pulls its input from the mocks above.
            app = cmd2.Cmd()
            app.use_rawinput = True
            app._cmdloop()
            # With input() mocked, neither the prompt nor the command name
            # shows up in the output, so the output cannot be inspected.
            # Two calls to input (the 'set' line, then the EOFError) are
            # taken as evidence that the rest of the loop worked.
            assert m_input.call_count == 2
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def unique_everseen(iterable, key=None):
    """Lazily iterate over ``iterable``, skipping anything already seen.

    With ``key`` supplied, two elements are considered the same when their
    ``key(element)`` values are equal; the first such element is the one
    that is yielded. Seen values are stored in a set.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    already_seen = set()
    if key is None:
        unseen = six.moves.filterfalse(already_seen.__contains__, iterable)
        for candidate in unseen:
            already_seen.add(candidate)
            yield candidate
        return
    for candidate in iterable:
        fingerprint = key(candidate)
        if fingerprint not in already_seen:
            already_seen.add(fingerprint)
            yield candidate
项目:diluvian    作者:aschampion    | 项目源码 | 文件源码
def remask(self):
    """Reset the mask based on the seeded connected component.

    Returns:
        False, leaving the mask untouched, when the body reports that its
        seed is not in the mask; True once the mask has been rewritten.
    """
    body = self.to_body()
    if not body.is_seed_in_mask():
        return False
    new_mask_bin, bounds = body.get_seeded_component(CONFIG.postprocessing.closing_shape)
    # ``np.bool`` was only an alias of the builtin and is gone from current
    # NumPy releases; the builtin selects the same boolean dtype.
    new_mask_bin = new_mask_bin.astype(bool)

    # Index with a tuple of slices. On Python 3 ``map`` returns an iterator,
    # which NumPy does not accept as a multidimensional index.
    block_slices = tuple(map(slice, bounds[0], bounds[1]))
    mask_block = self.mask[block_slices].copy()
    # Clip any values not in the seeded connected component so that they
    # cannot generate moves when rechecking.
    mask_block[~new_mask_bin] = np.clip(mask_block[~new_mask_bin], None, 0.9 * CONFIG.model.t_move)

    # Blank the whole mask, then restore only the clipped block.
    self.mask[:] = np.nan
    self.mask[block_slices] = mask_block
    return True
项目:diluvian    作者:aschampion    | 项目源码 | 文件源码
def generate_subvolume_bounds(filename, volumes, num_bounds, sparse=False, moves=None):
    """Write up to ``num_bounds`` subvolume bounds per volume to CSV files.

    ``filename`` must contain the placeholder ``{volume}``, which is replaced
    with each key of ``volumes``. ``moves`` defaults to 5 and, together with
    the configured field of view and move step, sets the subvolume shape.
    That shape is passed to the bounds generator as ``sparse_margin`` when
    ``sparse`` is true and as ``shape`` otherwise.

    Raises:
        ValueError: if ``filename`` lacks the ``{volume}`` placeholder.
    """
    if '{volume}' not in filename:
        raise ValueError('CSV filename must contain "{volume}" for volume name replacement.')

    moves = 5 if moves is None else np.asarray(moves)
    subv_shape = CONFIG.model.input_fov_shape + CONFIG.model.move_step * 2 * moves

    shape_keyword = 'sparse_margin' if sparse else 'shape'
    gen_kwargs = {shape_keyword: subv_shape}

    for volume_name, volume in six.iteritems(volumes):
        downsampled = volume.downsample(CONFIG.volume.resolution)
        all_bounds = downsampled.subvolume_bounds_generator(**gen_kwargs)
        limited_bounds = itertools.islice(all_bounds, num_bounds)
        SubvolumeBounds.iterable_to_csv(limited_bounds, filename.format(volume=volume_name))
项目:certbot    作者:nikoloskii    | 项目源码 | 文件源码
def setUp(self):
    """Start a throwaway TCP server whose listening socket is an SSLSocket.

    The server binds to port 0; the port it actually received is stored on
    ``self.port`` and ``handle_request`` is run in a background thread.
    """
    self.cert = test_util.load_comparable_cert('cert.pem')
    private_key = test_util.load_pyopenssl_private_key('rsa512_key.pem')
    # pylint: disable=protected-access
    cert_by_name = {b'foo': (private_key, self.cert.wrapped)}

    from acme.crypto_util import SSLSocket

    class _TestServer(socketserver.TCPServer):

        # pylint: disable=too-few-public-methods
        # six.moves.* | pylint: disable=attribute-defined-outside-init,no-init

        def server_bind(self):  # pylint: disable=missing-docstring
            # Swap in the SSL-wrapped socket before the stock bind logic runs.
            raw_socket = socket.socket()
            self.socket = SSLSocket(raw_socket, certs=cert_by_name)
            socketserver.TCPServer.server_bind(self)

    bind_address = ('', 0)
    self.server = _TestServer(bind_address, socketserver.BaseRequestHandler)
    bound_address = self.server.socket.getsockname()
    self.port = bound_address[1]
    # pylint: disable=no-member
    worker = threading.Thread(target=self.server.handle_request)
    self.server_thread = worker
    worker.start()
    time.sleep(1)  # TODO: avoid race conditions in other way
项目:Tencent_Cartoon_Download    作者:Fretice    | 项目源码 | 文件源码
def unique_everseen(iterable, key=None):
    """Yield the elements of ``iterable`` in order, dropping repeats.

    An element counts as a repeat when it (or ``key(element)`` if ``key`` is
    given) has been seen earlier in the iteration; the compared values are
    kept in a set, so they must be hashable.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    observed = set()
    remember = observed.add
    if key is not None:
        for item in iterable:
            marker = key(item)
            if marker in observed:
                continue
            remember(marker)
            yield item
    else:
        for item in six.moves.filterfalse(observed.__contains__, iterable):
            remember(item)
            yield item
项目:lddmm-ot    作者:jeanfeydy    | 项目源码 | 文件源码
def get_streaming_specs(self):
    """
    Assemble the connection settings for the streaming endpoint.

    The result is a dict with the keys 'server', 'port', 'ssl_enabled',
    'ssl_verification_enabled' and 'headers'.

    """
    streaming_url = get_config()['plotly_streaming_domain']
    ssl_verification_enabled = get_config()['plotly_ssl_verification']

    # SSL is considered enabled whenever 'https' occurs in the configured URL.
    ssl_enabled = 'https' in streaming_url
    if ssl_enabled:
        port = self.HTTPS_PORT
    else:
        port = self.HTTP_PORT

    # Without a scheme (http/https) in streaming_url the parsed hostname is
    # None; fall back to the raw streaming_url in that case.
    parsed_hostname = six.moves.urllib.parse.urlparse(streaming_url).hostname
    host = parsed_hostname or streaming_url

    return {
        'server': host,
        'port': port,
        'ssl_enabled': ssl_enabled,
        'ssl_verification_enabled': ssl_verification_enabled,
        'headers': {'Host': host, 'plotly-streamtoken': self.stream_id},
    }
项目:coolrelation    作者:mrtial    | 项目源码 | 文件源码
def unique_everseen(iterable, key=None):
    """Lazily iterate over ``iterable``, skipping anything already seen.

    With ``key`` supplied, two elements are considered the same when their
    ``key(element)`` values are equal; the first such element is the one
    that is yielded. Seen values are stored in a set.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    already_seen = set()
    if key is None:
        unseen = six.moves.filterfalse(already_seen.__contains__, iterable)
        for candidate in unseen:
            already_seen.add(candidate)
            yield candidate
        return
    for candidate in iterable:
        fingerprint = key(candidate)
        if fingerprint not in already_seen:
            already_seen.add(fingerprint)
            yield candidate
项目:hypothesis-regex    作者:maximkulkin    | 项目源码 | 文件源码
def _test_matching_pattern(self, pattern, isvalidchar, unicode=False):
    """Assert that ``pattern`` matches exactly the characters ``isvalidchar`` accepts.

    Every code point from 0 through ``sys.maxunicode`` is tried when
    ``unicode`` is true, otherwise code points 1 through 127.
    """
    if unicode:
        r = unicode_regex(pattern)
        codepoints = six.moves.range(0, sys.maxunicode+1)
    else:
        r = ascii_regex(pattern)
        codepoints = six.moves.range(1, 128)

    characters = [six.unichr(x) for x in codepoints]
    for c in characters:
        should_match = isvalidchar(c)
        matched = r.match(c)
        if should_match:
            assert matched, (
                '"%s" supposed to match "%s" (%r, category "%s"), '
                'but it doesnt' % (pattern, c, c, unicodedata.category(c))
            )
        else:
            assert not matched, (
                '"%s" supposed not to match "%s" (%r, category "%s"), '
                'but it does' % (pattern, c, c, unicodedata.category(c))
            )
项目:LIE    作者:EmbraceLife    | 项目源码 | 文件源码
def import_libs():
    """Import the modules listed below into this function's local scope."""
    # NOTE: the original listing also placed
    #     from __future__ import absolute_import, division, print_function
    # here. A future statement is only legal at the top of a module; inside
    # a function body it is a SyntaxError, so it belongs in the module header.
    import hashlib
    import os
    import shutil
    import sys
    import tarfile
    import zipfile

    import six
    from six.moves.urllib.error import HTTPError
    from six.moves.urllib.error import URLError
    from six.moves.urllib.request import urlopen

    from tensorflow.contrib.keras.python.keras.utils.generic_utils import Progbar
项目:LIE    作者:EmbraceLife    | 项目源码 | 文件源码
def import_libs():
    """Import the modules listed below into this function's local scope."""
    # NOTE: the original listing also placed
    #     from __future__ import absolute_import, division, print_function
    # here. A future statement is only legal at the top of a module; inside
    # a function body it is a SyntaxError, so it belongs in the module header.
    import os
    import re
    import threading

    import numpy as np
    from six.moves import range  # pylint: disable=redefined-builtin

    from tensorflow.contrib.keras.python.keras import backend as K
    from tensorflow.python.platform import tf_logging as logging


        # pylint: disable=g-import-not-at-top
项目:LIE    作者:EmbraceLife    | 项目源码 | 文件源码
def import_libs():
    """Import the modules listed below into this function's local scope."""
    # NOTE: the original listing also placed
    #     from __future__ import absolute_import, division, print_function
    # here. A future statement is only legal at the top of a module; inside
    # a function body it is a SyntaxError, so it belongs in the module header.
    import copy
    import json
    import os
    import re

    import numpy as np
    from six.moves import zip  # pylint: disable=redefined-builtin

    from tensorflow.contrib.keras.python.keras import backend as K
    from tensorflow.contrib.keras.python.keras.utils import conv_utils
    from tensorflow.contrib.keras.python.keras.utils.io_utils import ask_to_proceed_with_overwrite
    from tensorflow.contrib.keras.python.keras.utils.layer_utils import print_summary as print_layer_summary
    from tensorflow.python.framework import ops
    from tensorflow.python.framework import tensor_shape
    from tensorflow.python.layers import base as tf_base_layers
    from tensorflow.python.platform import tf_logging as logging
    from tensorflow.python.util import tf_inspect


        # pylint: disable=g-import-not-at-top
项目:python-group-proj    作者:Sharcee    | 项目源码 | 文件源码
def unique_everseen(iterable, key=None):
    """Yield the elements of ``iterable`` in order, dropping repeats.

    An element counts as a repeat when it (or ``key(element)`` if ``key`` is
    given) has been seen earlier in the iteration; the compared values are
    kept in a set, so they must be hashable.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    observed = set()
    remember = observed.add
    if key is not None:
        for item in iterable:
            marker = key(item)
            if marker in observed:
                continue
            remember(marker)
            yield item
    else:
        for item in six.moves.filterfalse(observed.__contains__, iterable):
            remember(item)
            yield item
项目:PornGuys    作者:followloda    | 项目源码 | 文件源码
def unique_everseen(iterable, key=None):
    """Lazily iterate over ``iterable``, skipping anything already seen.

    With ``key`` supplied, two elements are considered the same when their
    ``key(element)`` values are equal; the first such element is the one
    that is yielded. Seen values are stored in a set.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    already_seen = set()
    if key is None:
        unseen = six.moves.filterfalse(already_seen.__contains__, iterable)
        for candidate in unseen:
            already_seen.add(candidate)
            yield candidate
        return
    for candidate in iterable:
        fingerprint = key(candidate)
        if fingerprint not in already_seen:
            already_seen.add(fingerprint)
            yield candidate
项目:jrm_ssl    作者:Fhrozen    | 项目源码 | 文件源码
def data_feed():
    """Push training batches onto ``data_q``, bracketed by 'train' and 'end'.

    A reader class is looked up by name from the configuration, its ``idxs``
    are shuffled once per epoch, and each batch is queued as
    ``(epoch, idx, data_batch.copy())``. The module-level ``max_len`` and
    ``batch_init`` are updated as a side effect.
    """
    global max_len
    global batch_init
    gpu_indices = [int(x) for x in config.get('gpu', 'index').split(';')]
    # Never fewer than one network. ``np.int`` was only an alias of the
    # builtin and has been removed from NumPy, so the builtin is used.
    num_networks = np.amax((len(gpu_indices), 1)).astype(int)
    DBClass = importlib.import_module('python_utils.datareader.{}'.format(
        config.get('reader', 'data')))
    reader = getattr(DBClass, config.get('reader', 'class'))(config)
    idxs = reader.idxs
    # Drop the tail that would not fill a complete step across all networks.
    step = batchsize * num_networks
    max_len = len(idxs) - (len(idxs) % step)
    data_q.put('train')
    # NOTE(review): 'r' presumably means "resume", skipping one batch - confirm.
    if args.A == 'r':
        batch_init += batchsize
    for epoch in six.moves.range(init_epoch, 1 + epochs):
        shuffle(idxs)
        for idx in six.moves.range(batch_init, max_len, step):
            data_batch = reader.read_data(idxs[idx:idx + batchsize], num_networks)
            data_q.put((epoch, idx, data_batch.copy()))
        # Only the first epoch starts from a non-zero offset.
        batch_init = 0
    data_q.put('end')
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def install(domain):
    """Make ``_`` available as a builtin for the given translation domain.

    A TranslatorFactory is built for ``domain`` and its ``primary``
    attribute is stored under the name ``_`` in the builtins module.

    Unlike gettext.install(), the default localedir (e.g.
    /usr/share/locale) can be overridden through an environment variable
    specific to the translation domain (e.g. NOVA_LOCALEDIR).

    Lazy translation is not switched on here; enable_lazy must be called
    for that.

    :param domain: the translation domain
    """
    from six.moves import builtins
    translator_factory = TranslatorFactory(domain)
    builtins.__dict__['_'] = translator_factory.primary
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def configure_logging(debug):
    """Set up root logging and the verbosity of HTTP-level logging.

    With ``debug`` true, the root logger and the ``requests`` urllib3 logger
    go to DEBUG and HTTPConnection debug output is switched on; otherwise
    both loggers go to CRITICAL and the debug output is switched off.
    """
    requests_log = logging.getLogger("requests.packages.urllib3")
    requests_log.propagate = True

    log_format = (
        "%(asctime)s [%(levelname)5s] (%(filename)20s:%(lineno)-4d):"
        " %(message)s"
    )
    logging.basicConfig(format=log_format)

    if debug:
        http_debuglevel, level = 1, logging.DEBUG
    else:
        http_debuglevel, level = 0, logging.CRITICAL

    six.moves.http_client.HTTPConnection.debuglevel = http_debuglevel
    logging.getLogger().setLevel(level)
    requests_log.setLevel(level)
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def test_move_items(item_name):
    """Ensure that everything loads correctly.

    Looks up ``item_name`` on ``six.moves`` and, when the result is a module,
    imports it as ``six.moves.<item_name>``. Known platform- or
    version-specific failures are reported through ``py.test.skip``; any
    other ImportError is re-raised.
    """
    try:
        item = getattr(six.moves, item_name)
        if isinstance(item, types.ModuleType):
            __import__("six.moves." + item_name)
    except AttributeError:
        # Only zip_longest on pre-2.6 interpreters is skipped here. Any other
        # AttributeError is not re-raised and falls through to the final check.
        if item_name == "zip_longest" and sys.version_info < (2, 6):
            py.test.skip("zip_longest only available on 2.6+")
    except ImportError:
        # Each check below covers an optional module that may legitimately be
        # missing; if none applies the ImportError propagates.
        if item_name == "winreg" and not sys.platform.startswith("win"):
            py.test.skip("Windows only module")
        if item_name.startswith("tkinter"):
            if not have_tkinter:
                py.test.skip("requires tkinter")
            if item_name == "tkinter_ttk" and sys.version_info[:2] <= (2, 6):
                py.test.skip("ttk only available on 2.7+")
        if item_name.startswith("dbm_gnu") and not have_gdbm:
            py.test.skip("requires gdbm")
        raise
    # The dir() listing is only checked on Python 2.6 and later.
    if sys.version_info[:2] >= (2, 6):
        assert item_name in dir(six.moves)
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def test_python_2_unicode_compatible():
    """Check the text/bytes conversions of a class decorated with
    ``six.python_2_unicode_compatible``."""
    @six.python_2_unicode_compatible
    class MyTest(object):
        def __bytes__(self):
            return six.b('hello')

        def __str__(self):
            return six.u('hello')

    instance = MyTest()
    as_bytes = six.b("hello")
    as_text = six.u("hello")

    if six.PY2:
        assert str(instance) == as_bytes
        assert unicode(instance) == as_text
    elif six.PY3:
        assert bytes(instance) == as_bytes
        assert str(instance) == as_text

    # Use the builtin ``bytes`` when it exists, otherwise fall back to ``str``.
    bytes_type = getattr(six.moves.builtins, 'bytes', str)
    assert bytes_type(instance) == as_bytes
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def _fragment_in_reverse(iterable, start=0, limit=None):
    """
    Selects a fragment of the iterable and returns the items in reverse order.

    The lowest index is 0 and designates the last line of the file.
    'Limit' specifies the number of lines to return.
    """
    # TODO: Naive implementation. Needs to be rewritten to a solution with
    # file pointer moving around etc. if it turns out that this isn't
    # performant enough.
    maxlen = None
    if limit >= 0:
        maxlen = start + limit

    fragment = collections.deque(iterable, maxlen)
    try:
        for _ in six.moves.range(start):
            fragment.pop()
    except IndexError:
        raise exc.InvalidInputError(__name__,
                                    'Index start=%s is out of range.'
                                    % str(start))

    fragment.reverse()
    return fragment
项目:Repobot    作者:Desgard    | 项目源码 | 文件源码
def unique_everseen(iterable, key=None):
    """Yield the elements of ``iterable`` in order, dropping repeats.

    An element counts as a repeat when it (or ``key(element)`` if ``key`` is
    given) has been seen earlier in the iteration; the compared values are
    kept in a set, so they must be hashable.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    observed = set()
    remember = observed.add
    if key is not None:
        for item in iterable:
            marker = key(item)
            if marker in observed:
                continue
            remember(marker)
            yield item
    else:
        for item in six.moves.filterfalse(observed.__contains__, iterable):
            remember(item)
            yield item
项目:CloudPrint    作者:William-An    | 项目源码 | 文件源码
def unique_everseen(iterable, key=None):
    """Lazily iterate over ``iterable``, skipping anything already seen.

    With ``key`` supplied, two elements are considered the same when their
    ``key(element)`` values are equal; the first such element is the one
    that is yielded. Seen values are stored in a set.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    already_seen = set()
    if key is None:
        unseen = six.moves.filterfalse(already_seen.__contains__, iterable)
        for candidate in unseen:
            already_seen.add(candidate)
            yield candidate
        return
    for candidate in iterable:
        fingerprint = key(candidate)
        if fingerprint not in already_seen:
            already_seen.add(fingerprint)
            yield candidate
项目:QualquerMerdaAPI    作者:tiagovizoto    | 项目源码 | 文件源码
def unique_everseen(iterable, key=None):
    """Yield the elements of ``iterable`` in order, dropping repeats.

    An element counts as a repeat when it (or ``key(element)`` if ``key`` is
    given) has been seen earlier in the iteration; the compared values are
    kept in a set, so they must be hashable.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    observed = set()
    remember = observed.add
    if key is not None:
        for item in iterable:
            marker = key(item)
            if marker in observed:
                continue
            remember(marker)
            yield item
    else:
        for item in six.moves.filterfalse(observed.__contains__, iterable):
            remember(item)
            yield item
项目:gardenbot    作者:GoestaO    | 项目源码 | 文件源码
def unique_everseen(iterable, key=None):
    """Lazily iterate over ``iterable``, skipping anything already seen.

    With ``key`` supplied, two elements are considered the same when their
    ``key(element)`` values are equal; the first such element is the one
    that is yielded. Seen values are stored in a set.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    already_seen = set()
    if key is None:
        unseen = six.moves.filterfalse(already_seen.__contains__, iterable)
        for candidate in unseen:
            already_seen.add(candidate)
            yield candidate
        return
    for candidate in iterable:
        fingerprint = key(candidate)
        if fingerprint not in already_seen:
            already_seen.add(fingerprint)
            yield candidate
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def unique_everseen(iterable, key=None):
    """Yield the elements of ``iterable`` in order, dropping repeats.

    An element counts as a repeat when it (or ``key(element)`` if ``key`` is
    given) has been seen earlier in the iteration; the compared values are
    kept in a set, so they must be hashable.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    observed = set()
    remember = observed.add
    if key is not None:
        for item in iterable:
            marker = key(item)
            if marker in observed:
                continue
            remember(marker)
            yield item
    else:
        for item in six.moves.filterfalse(observed.__contains__, iterable):
            remember(item)
            yield item
项目:hate-to-hugs    作者:sdoran35    | 项目源码 | 文件源码
def unique_everseen(iterable, key=None):
    """Lazily iterate over ``iterable``, skipping anything already seen.

    With ``key`` supplied, two elements are considered the same when their
    ``key(element)`` values are equal; the first such element is the one
    that is yielded. Seen values are stored in a set.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    already_seen = set()
    if key is None:
        unseen = six.moves.filterfalse(already_seen.__contains__, iterable)
        for candidate in unseen:
            already_seen.add(candidate)
            yield candidate
        return
    for candidate in iterable:
        fingerprint = key(candidate)
        if fingerprint not in already_seen:
            already_seen.add(fingerprint)
            yield candidate
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def test_move_items(item_name):
    """Ensure that everything loads correctly.

    Looks up ``item_name`` on ``six.moves`` and, when the result is a module,
    imports it as ``six.moves.<item_name>``. Known platform- or
    version-specific failures are reported through ``py.test.skip``; any
    other ImportError is re-raised.
    """
    try:
        item = getattr(six.moves, item_name)
        if isinstance(item, types.ModuleType):
            __import__("six.moves." + item_name)
    except AttributeError:
        # Only zip_longest on pre-2.6 interpreters is skipped here. Any other
        # AttributeError is not re-raised and falls through to the final check.
        if item_name == "zip_longest" and sys.version_info < (2, 6):
            py.test.skip("zip_longest only available on 2.6+")
    except ImportError:
        # Each check below covers an optional module that may legitimately be
        # missing; if none applies the ImportError propagates.
        if item_name == "winreg" and not sys.platform.startswith("win"):
            py.test.skip("Windows only module")
        if item_name.startswith("tkinter"):
            if not have_tkinter:
                py.test.skip("requires tkinter")
            if item_name == "tkinter_ttk" and sys.version_info[:2] <= (2, 6):
                py.test.skip("ttk only available on 2.7+")
        if item_name.startswith("dbm_gnu") and not have_gdbm:
            py.test.skip("requires gdbm")
        raise
    # The dir() listing is only checked on Python 2.6 and later.
    if sys.version_info[:2] >= (2, 6):
        assert item_name in dir(six.moves)
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def test_python_2_unicode_compatible():
    """Check the text/bytes conversions of a class decorated with
    ``six.python_2_unicode_compatible``."""
    @six.python_2_unicode_compatible
    class MyTest(object):
        def __bytes__(self):
            return six.b('hello')

        def __str__(self):
            return six.u('hello')

    instance = MyTest()
    as_bytes = six.b("hello")
    as_text = six.u("hello")

    if six.PY2:
        assert str(instance) == as_bytes
        assert unicode(instance) == as_text
    elif six.PY3:
        assert bytes(instance) == as_bytes
        assert str(instance) == as_text

    # Use the builtin ``bytes`` when it exists, otherwise fall back to ``str``.
    bytes_type = getattr(six.moves.builtins, 'bytes', str)
    assert bytes_type(instance) == as_bytes
项目:minihydra    作者:VillanCh    | 项目源码 | 文件源码
def unique_everseen(iterable, key=None):
    """Yield the elements of ``iterable`` in order, dropping repeats.

    An element counts as a repeat when it (or ``key(element)`` if ``key`` is
    given) has been seen earlier in the iteration; the compared values are
    kept in a set, so they must be hashable.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    observed = set()
    remember = observed.add
    if key is not None:
        for item in iterable:
            marker = key(item)
            if marker in observed:
                continue
            remember(marker)
            yield item
    else:
        for item in six.moves.filterfalse(observed.__contains__, iterable):
            remember(item)
            yield item
项目:sam-s-club-auctions    作者:sameer2800    | 项目源码 | 文件源码
def unique_everseen(iterable, key=None):
    """Lazily iterate over ``iterable``, skipping anything already seen.

    With ``key`` supplied, two elements are considered the same when their
    ``key(element)`` values are equal; the first such element is the one
    that is yielded. Seen values are stored in a set.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    already_seen = set()
    if key is None:
        unseen = six.moves.filterfalse(already_seen.__contains__, iterable)
        for candidate in unseen:
            already_seen.add(candidate)
            yield candidate
        return
    for candidate in iterable:
        fingerprint = key(candidate)
        if fingerprint not in already_seen:
            already_seen.add(fingerprint)
            yield candidate
项目:six    作者:benjaminp    | 项目源码 | 文件源码
def test_move_items(item_name):
    """Ensure that everything loads correctly.

    Looks up ``item_name`` on ``six.moves`` and, when the result is a module,
    imports it as ``six.moves.<item_name>``. Known platform- or
    version-specific failures are reported through ``py.test.skip``; any
    other ImportError is re-raised.
    """
    try:
        item = getattr(six.moves, item_name)
        if isinstance(item, types.ModuleType):
            __import__("six.moves." + item_name)
    except AttributeError:
        # Only zip_longest on pre-2.6 interpreters is skipped here. Any other
        # AttributeError is not re-raised and falls through to the final check.
        if item_name == "zip_longest" and sys.version_info < (2, 6):
            py.test.skip("zip_longest only available on 2.6+")
    except ImportError:
        # Each check below covers an optional module that may legitimately be
        # missing; if none applies the ImportError propagates.
        if item_name == "winreg" and not sys.platform.startswith("win"):
            py.test.skip("Windows only module")
        if item_name.startswith("tkinter"):
            if not have_tkinter:
                py.test.skip("requires tkinter")
            if item_name == "tkinter_ttk" and sys.version_info[:2] <= (2, 6):
                py.test.skip("ttk only available on 2.7+")
        if item_name.startswith("dbm_gnu") and not have_gdbm:
            py.test.skip("requires gdbm")
        raise
    # The dir() listing is only checked on Python 2.6 and later.
    if sys.version_info[:2] >= (2, 6):
        assert item_name in dir(six.moves)
项目:six    作者:benjaminp    | 项目源码 | 文件源码
def test_python_2_unicode_compatible():
    """Check the text/bytes conversions of a class decorated with
    ``six.python_2_unicode_compatible``."""
    @six.python_2_unicode_compatible
    class MyTest(object):
        def __bytes__(self):
            return six.b('hello')

        def __str__(self):
            return six.u('hello')

    instance = MyTest()
    as_bytes = six.b("hello")
    as_text = six.u("hello")

    if six.PY2:
        assert str(instance) == as_bytes
        assert unicode(instance) == as_text
    elif six.PY3:
        assert bytes(instance) == as_bytes
        assert str(instance) == as_text

    # Use the builtin ``bytes`` when it exists, otherwise fall back to ``str``.
    bytes_type = getattr(six.moves.builtins, 'bytes', str)
    assert bytes_type(instance) == as_bytes
项目:blog_flask    作者:momantai    | 项目源码 | 文件源码
def unique_everseen(iterable, key=None):
    """Yield the elements of ``iterable`` in order, dropping repeats.

    An element counts as a repeat when it (or ``key(element)`` if ``key`` is
    given) has been seen earlier in the iteration; the compared values are
    kept in a set, so they must be hashable.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    observed = set()
    remember = observed.add
    if key is not None:
        for item in iterable:
            marker = key(item)
            if marker in observed:
                continue
            remember(marker)
            yield item
    else:
        for item in six.moves.filterfalse(observed.__contains__, iterable):
            remember(item)
            yield item
项目:oci-python-sdk    作者:oracle    | 项目源码 | 文件源码
def __init__(self, service, config, signer, type_mapping):
    """Validate ``config`` and set up endpoint, session, type maps and logging.

    When the ``log_requests`` config value is truthy, this instance's logger
    is set to DEBUG and HTTPConnection debug output is switched on;
    otherwise the debug output is switched off.
    """
    validate_config(config)
    self.signer = signer

    region = config.get("region")
    endpoint_override = config.get("endpoint")
    self.endpoint = regions.endpoint_for(
        service, region=region, endpoint=endpoint_override)

    self.complex_type_mappings = type_mapping
    self.type_mappings = merge_type_mappings(self.primitive_type_map, type_mapping)
    self.session = requests.Session()

    extra_user_agent = get_config_value_or_default(config, "additional_user_agent")
    self.user_agent = build_user_agent(extra_user_agent)

    # One logger per instance, named after the module and the object's id.
    logger_name = "{}.{}".format(__name__, id(self))
    self.logger = logging.getLogger(logger_name)
    self.logger.addHandler(logging.NullHandler())

    log_requests = get_config_value_or_default(config, "log_requests")
    if log_requests:
        self.logger.setLevel(logging.DEBUG)
    six.moves.http_client.HTTPConnection.debuglevel = 1 if log_requests else 0
项目:kobo    作者:release-engineering    | 项目源码 | 文件源码
def run(self):
    """Pull items from the pool's queue and process them until told to stop.

    The loop ends when ``self.kill`` is set, or when ``self.running`` is
    false and the queue is empty. Any exception from ``process`` marks this
    worker as failed, is recorded on the pool, and triggers ``pool.kill()``.
    """
    while True:
        if self.kill:
            break
        if not self.running and self.pool.queue.empty():
            break

        try:
            item = self.pool.queue.get(timeout=self.get_timeout)
        except six.moves.queue.Empty:
            # Nothing arrived within the timeout; re-check the exit conditions.
            continue

        # Take the next sequence number while holding the pool's lock.
        pool = self.pool
        pool.queue_get_lock.acquire()
        pool.queue_processed += 1
        num = pool.queue_processed
        pool.queue_get_lock.release()

        try:
            self.process(item, num)
        except:
            self.failed = True
            self.pool.exceptions.append(sys.exc_info())
            self.pool.kill()
项目:Turkish-Language-NLP-API    作者:WoodProgrammer    | 项目源码 | 文件源码
def unique_everseen(iterable, key=None):
    """Lazily iterate over ``iterable``, skipping anything already seen.

    With ``key`` supplied, two elements are considered the same when their
    ``key(element)`` values are equal; the first such element is the one
    that is yielded. Seen values are stored in a set.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    already_seen = set()
    if key is None:
        unseen = six.moves.filterfalse(already_seen.__contains__, iterable)
        for candidate in unseen:
            already_seen.add(candidate)
            yield candidate
        return
    for candidate in iterable:
        fingerprint = key(candidate)
        if fingerprint not in already_seen:
            already_seen.add(fingerprint)
            yield candidate
项目:Chorus    作者:DonaldBough    | 项目源码 | 文件源码
def unique_everseen(iterable, key=None):
    """Yield the elements of ``iterable`` in order, dropping repeats.

    An element counts as a repeat when it (or ``key(element)`` if ``key`` is
    given) has been seen earlier in the iteration; the compared values are
    kept in a set, so they must be hashable.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    observed = set()
    remember = observed.add
    if key is not None:
        for item in iterable:
            marker = key(item)
            if marker in observed:
                continue
            remember(marker)
            yield item
    else:
        for item in six.moves.filterfalse(observed.__contains__, iterable):
            remember(item)
            yield item
项目:py-secretcrypt    作者:Zemanta    | 项目源码 | 文件源码
def test_key_loaded(self):
    """Check that encrypting opens the key file in binary read mode.

    A random base64-encoded key is written to ``self.key_file``; the builtin
    ``open`` is then patched to hand back a handle on that file, and the
    arguments of the patched call are verified.
    """
    encoded_key = base64.b64encode(os.urandom(16))
    with open(self.key_file, 'wb') as key_out:
        key_out.write(encoded_key)

    with open(self.key_file) as key_in:
        with mock.patch.object(six.moves.builtins, 'open') as mock_open:
            mock_open.return_value = key_in
            local.encrypt(b'abc')
            mock_open.assert_called_with(self.key_file, 'rb')
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _detect_gce_environment():
    """Determine if the current environment is Compute Engine.

    Returns:
        Boolean indicating whether or not the current environment is Google
        Compute Engine. False is returned when the metadata host cannot be
        reached or answers with a status other than OK.
    """
    # NOTE: The explicit ``timeout`` is a workaround. The underlying
    #       issue is that resolving an unknown host on some networks will take
    #       20-30 seconds; making this timeout short fixes the issue, but
    #       could lead to false negatives in the event that we are on GCE, but
    #       the metadata resolution was particularly slow. The latter case is
    #       "unlikely".
    connection = six.moves.http_client.HTTPConnection(
        _GCE_METADATA_HOST, timeout=1)

    try:
        headers = {_METADATA_FLAVOR_HEADER: _DESIRED_METADATA_FLAVOR}
        connection.request('GET', '/', headers=headers)
        response = connection.getresponse()
        if response.status == http_client.OK:
            return (response.getheader(_METADATA_FLAVOR_HEADER) ==
                    _DESIRED_METADATA_FLAVOR)
        # A non-OK status used to fall off the end and return None; return
        # the documented Boolean instead.
        return False
    except socket.error:  # socket.timeout or socket.error(64, 'Host is down')
        logger.info('Timeout attempting to reach GCE metadata service.')
        return False
    finally:
        # Runs on every path, including the early returns above.
        connection.close()
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def _should_write_file(self, filepath):
        # type: (str) -> bool
        """Decide whether the output file at ``filepath`` may be written.

        The checks run in this order: a path that is not an existing file is
        always writable; ``self.no_overwrite`` refuses to touch an existing
        file; ``self.interactive`` asks the user; otherwise the existing file
        is overwritten.

        :param str filepath: Full file path to file in question
        :rtype: bool
        """
        if not os.path.isfile(filepath):
            # Nothing exists at this path yet, so there is nothing to overwrite.
            return True

        if self.no_overwrite:
            # The caller explicitly asked that existing files be left alone.
            _LOGGER.warning('Skipping existing output file because of "no overwrite" option: %s', filepath)
            return False

        if not self.interactive:
            # The file exists and no option told us to do anything but overwrite it.
            _LOGGER.warning('Overwriting existing output file because no action was specified otherwise: %s', filepath)
            return True

        # The file exists and the caller wants to be consulted before overwriting.
        answer = six.moves.input(  # type: ignore # six.moves confuses mypy
            'Overwrite existing output file "{}" with new contents? [y/N]:'.format(filepath)
        )
        if not answer:
            # An empty answer is interpreted as 'do not overwrite'.
            _LOGGER.warning('Skipping existing output file based on interactive user decision: %s', filepath)
            return False

        if answer.lower().startswith('y'):
            _LOGGER.warning('Overwriting existing output file based on interactive user decision: %s', filepath)
            return True

        # Any other non-empty answer declines the overwrite.
        return False
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def _should_retry_response(resp_status, content):
  """Determines whether a response should be retried.

  Args:
    resp_status: The response status received.
    content: The response content body.

  Returns:
    True if the response should be retried, otherwise False.
  """
  # Retry on 5xx errors.
  if resp_status >= 500:
    return True

  # Retry on 429 errors.
  if resp_status == _TOO_MANY_REQUESTS:
    return True

  # For 403 errors, we have to check for the `reason` in the response to
  # determine if we should retry.
  if resp_status == six.moves.http_client.FORBIDDEN:
    # If there's no details about the 403 type, don't retry.
    if not content:
      return False

    # Content is in JSON format.
    try:
      data = json.loads(content.decode('utf-8'))
      reason = data['error']['errors'][0]['reason']
    except (UnicodeDecodeError, ValueError, KeyError, IndexError, TypeError):
      # IndexError / TypeError cover bodies that are valid JSON but do not
      # have the expected shape (for example an empty `errors` list, or a
      # list/string where a mapping was expected). Those are treated like
      # any other unparseable body rather than crashing the retry logic.
      LOGGER.warning('Invalid JSON content from response: %s', content)
      return False

    LOGGER.warning('Encountered 403 Forbidden with reason "%s"', reason)

    # Only retry on rate limit related failures.
    if reason in ('userRateLimitExceeded', 'rateLimitExceeded', ):
      return True

  # Everything else is a success or non-retriable so break.
  return False
项目:artman    作者:googleapis    | 项目源码 | 文件源码
def test_valid_answer_github(self):
        """A mixed-case ``'GitHub'`` answer makes the prompt return ``'github'``."""
        # Stand in for the interactive prompt with a canned answer.
        prompt = mock.patch.object(six.moves, 'input', return_value='GitHub')
        with prompt:
            outcome = _configure_publish()
        assert outcome == 'github'
项目:artman    作者:googleapis    | 项目源码 | 文件源码
def test_invalid_answer(self):
        """An unrecognised answer causes the user to be prompted again."""
        # First prompt: a wrong answer. Second prompt: raise, which proves
        # the prompt was issued a second time (the full recursive loop).
        scripted_answers = ('bogus', RuntimeError)
        with mock.patch.object(six.moves, 'input',
                               side_effect=scripted_answers):
            with pytest.raises(RuntimeError):
                _configure_publish()
项目:cupy    作者:cupy    | 项目源码 | 文件源码
def _get_operation_code(op):
    """Build the source text that invokes ``op`` through temporaries.

    The returned string has three parts, in order:

    1. one assignment per entry of ``op.in_vars`` copying ``v<var.num>``
       into the temporary ``v<op.num>_<i>``;
    2. a single call of ``op.name`` on all ``op.nin + op.nout`` temporaries;
    3. one assignment per entry of ``op.out_vars`` copying the temporary
       at index ``i + op.nin`` back into ``v<var.num>``.
    """
    pieces = ['v%d_%d = v%d;\n' % (op.num, idx, in_var.num)
              for idx, in_var in enumerate(op.in_vars)]

    arg_names = ['v%d_%d' % (op.num, idx)
                 for idx in six.moves.range(op.nin + op.nout)]
    pieces.append(op.name + '(' + ', '.join(arg_names) + ');\n')

    # Output temporaries are numbered after the input ones, hence the offset.
    pieces.extend('v%d = v%d_%d;\n' % (out_var.num, op.num, idx + op.nin)
                  for idx, out_var in enumerate(op.out_vars))
    return ''.join(pieces)
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def test_multiple(self):
        """Ten calls with the same name end on a ``wow-0009.conf`` file."""
        for _attempt in six.moves.range(10):
            handle, path = self._call("wow")

        # Only the result of the tenth (last) call is inspected.
        self.assertTrue(isinstance(handle, file_type))
        self.assertTrue(isinstance(path, str))
        self.assertTrue("wow-0009.conf" in path)
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def test_plugins(self):
        """Invoke the ``plugins`` subcommand with combinations of its flags.

        Combination sizes run from 0 up to ``len(flags) - 1``, so the
        invocation carrying all four flags at once is not exercised here.
        """
        flags = ['--init', '--prepare', '--authenticators', '--installers']
        flag_subsets = itertools.chain.from_iterable(
            itertools.combinations(flags, size)
            for size in six.moves.range(len(flags)))
        for subset in flag_subsets:
            self._call(['plugins'] + list(subset))
项目:webapp2    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def get_all(self, argument_name, default_value=None):
        """Returns a list of query or POST arguments with the given name.

        We parse the query string and POST payload lazily, so this will be a
        slower operation on the first call.

        :param argument_name:
            The name of the query or POST argument.
        :param default_value:
            The value to return if the given argument is not present,
            None may not be used as a default, if it is then an empty
            list will be returned instead.
        :returns:
            A (possibly empty) list of values.
        """
        # Outside Python 3 the argument name is encoded with the request charset.
        if self.charset and not six.PY3:
            argument_name = argument_name.encode(self.charset)

        if default_value is None:
            default_value = []

        values = self.params.getall(argument_name)
        if values is None or not len(values):
            return default_value

        # Replace uploaded-field wrappers by their payload, in place.
        for index, item in enumerate(values):
            if isinstance(item, cgi.FieldStorage):
                values[index] = item.value

        return values
项目:python-kingbirdclient    作者:openstack    | 项目源码 | 文件源码
def log_critical(self):
        """Return the translation function for critical-level log messages.

        The function is whatever ``self._make_log_translation_func`` builds
        for the ``'critical'`` level.
        """
        level = 'critical'
        return self._make_log_translation_func(level)


# NOTE(dhellmann): When this module moves out of the incubator into
# oslo.i18n, these global variables can be moved to an integration
# module within each application.

# Create the global translation functions.