Python six 模块,u() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.u()

项目:paragraph2vec    作者:thunlp    | 项目源码 | 文件源码
def deaccent(text):
    """
    Remove accentuation from the given string. Input text is either a unicode string or utf8 encoded bytestring.

    Return input string with accents removed, as unicode.

    >>> deaccent("Šéf chomutovských komunist? dostal poštou bílý prášek")
    u'Sef chomutovskych komunistu dostal postou bily prasek'

    """
    if not isinstance(text, unicode):
        # assume utf8 for byte strings, use default (strict) error handling
        text = text.decode('utf8')
    norm = unicodedata.normalize("NFD", text)
    result = u('').join(ch for ch in norm if unicodedata.category(ch) != 'Mn')
    return unicodedata.normalize("NFC", result)
项目:fabric-sdk-py    作者:hyperledger    | 项目源码 | 文件源码
def enroll(self, enrollment_id, enrollment_secret):
        """Enroll a registered user in order to receive a signed X509 certificate

        Args:
            enrollment_id (str): The registered ID to use for enrollment
            enrollment_secret (str): The secret associated with the
                                     enrollment ID

        Returns: PEM-encoded X509 certificate

        Raises:
            RequestException: errors in requests.exceptions
            ValueError: Failed response, json parse error, args missing

        """
        private_key = self._crypto.generate_private_key()
        csr = self._crypto.generate_csr(private_key, x509.Name(
            [x509.NameAttribute(NameOID.COMMON_NAME, six.u(enrollment_id))]))
        cert = self._ca_client.enroll(
            enrollment_id, enrollment_secret,
            csr.public_bytes(Encoding.PEM).decode("utf-8"))

        return Enrollment(private_key, cert)
项目:git-stacktrace    作者:pinterest    | 项目源码 | 文件源码
def pickaxe(snippet, git_range, filename=None):
    """Run git log -S <snippet> <git_range> <filename>

    Use git pickaxe to 'Look for differences that change the number of occurrences of the
    specified string'

    If filename is passed in only look in that file

    Return list of commits that modified that snippet
    """
    cmd = 'git', 'log', '-b', '--pretty=%H', '-S', six.u(snippet), git_range
    if filename:
        cmd = cmd + ('--', filename,)
    commits = run_command(*cmd).splitlines()
    commits = [(commit, line_removed(snippet, commit)) for commit in commits]
    # Couldn't find a good way to POSIX regex escape the code and use regex
    # pickaxe to match full lines, so filter out partial results here.
    # Filter out results that aren't a full line
    commits = [commit for commit in commits if commit[1] is not None]
    return commits
项目:captaincloud    作者:bpsagar    | 项目源码 | 文件源码
def setUp(self):
        @TaskRegistry.register
        class RandomTask(Task):
            ID = 'random'
            NAME = 'Random'

            class Input:
                string = field.StringField(default=six.u('ABCD'))
                integer = field.IntegerField(default=5)
                instream = field.StringStreamField()
                integer_array = field.ListField(field.IntegerField())
                anyf = field.AnyField(default={1: 2})

            class Output:
                floating = field.FloatField(default=1.5)
                none = field.IntegerField()
                outstream = field.ByteStreamField()

        self.RandomTask = RandomTask
项目:captaincloud    作者:bpsagar    | 项目源码 | 文件源码
def test_byte_field(self):
        instance = ByteField()
        self.assertEqual(instance.get_initial(), None)

        with self.assertRaises(InvalidValueException):
            instance.validate(1)

        with self.assertRaises(InvalidValueException):
            instance.validate(1.5)

        with self.assertRaises(InvalidValueException):
            instance.validate(six.u('ABC'))

        instance = ByteField(default=six.b('ABC'))
        self.assertEqual(instance.get_initial(), six.b('ABC'))

        instance.validate(six.b('hello'))
项目:captaincloud    作者:bpsagar    | 项目源码 | 文件源码
def test_list_field(self):
        instance = ListField(StringField(), default=[six.u('XYZ')])
        self.assertEqual(instance.create(), [six.u('XYZ')])

        val = instance.create()
        val.append(six.u('ABC'))
        self.assertEqual(val, [six.u('XYZ'), six.u('ABC')])

        val = instance.create([six.u('A'), six.u('B'), six.u('C')])
        self.assertEqual(val, [six.u('A'), six.u('B'), six.u('C')])

        self.assertEqual(val.pop(), six.u('C'))
        self.assertEqual(val.pop(0), six.u('A'))

        instance = ListField(FloatField())
        self.assertEqual(instance.create(), [])

        with self.assertRaises(InvalidValueException):
            instance.create().append(six.u('ABC'))
项目:captaincloud    作者:bpsagar    | 项目源码 | 文件源码
def test_struct_field(self):
        instance = StructField(a=IntegerField(), b=FloatField())
        val = instance.create()
        val.a = 100
        val.b = 3.14
        self.assertEqual(val.a, 100)

        nested_instance = StructField(
            a=IntegerField(),
            b=StructField(
                c=FloatField(),
                d=StringField(default=six.u('hello world'))
            )
        )

        val = nested_instance.create()
        val.a = 100
        val.b.c = 3.14
        self.assertEqual(val.b.c, 3.14)
        self.assertEqual(val.b.d, six.u('hello world'))
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def format(self):
        """Format the stack ready for printing.

        Returns a list of strings ready for printing.  Each string in the
        resulting list corresponds to a single frame from the stack.
        Each string ends in a newline; the strings may contain internal
        newlines as well, for those items with source text lines.
        """
        result = []
        for frame in self:
            row = []
            row.append(u('  File "{0}", line {1}, in {2}\n').format(
                _some_fs_str(frame.filename), frame.lineno, frame.name))
            if frame.line:
                row.append(u('    {0}\n').format(frame.line.strip()))
            if frame.locals:
                for name, value in sorted(frame.locals.items()):
                    row.append(u('    {name} = {value}\n').format(name=name, value=value))
            result.append(u('').join(row))
        return result
项目:deb-python-traceback2    作者:openstack    | 项目源码 | 文件源码
def format(self):
        """Format the stack ready for printing.

        Returns a list of strings ready for printing.  Each string in the
        resulting list corresponds to a single frame from the stack.
        Each string ends in a newline; the strings may contain internal
        newlines as well, for those items with source text lines.
        """
        result = []
        for frame in self:
            row = []
            row.append(u('  File "{0}", line {1}, in {2}\n').format(
                _some_fs_str(frame.filename), frame.lineno, frame.name))
            if frame.line:
                row.append(u('    {0}\n').format(frame.line.strip()))
            if frame.locals:
                for name, value in sorted(frame.locals.items()):
                    row.append(u('    {name} = {value}\n').format(name=name, value=value))
            result.append(u('').join(row))
        return result
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def test_unicode_sequence_form_value(resp_mock, mock):
    http = mock.return_value
    http.request.return_value = (Mock(), Mock())

    data = {
        "body": [u('\xe5'), u('\xe7')],
    }

    resources.make_request("POST", "http://www.example.com", data=data)

    http.request.assert_called_with(
        "http://www.example.com",
        "POST",
        headers=None,
        body="body=%C3%A5&body=%C3%A7",
    )
项目:deb-oslo.utils    作者:openstack    | 项目源码 | 文件源码
def test_safe_decode(self):
        safe_decode = encodeutils.safe_decode
        self.assertRaises(TypeError, safe_decode, True)
        self.assertEqual(six.u('ni\xf1o'), safe_decode(six.b("ni\xc3\xb1o"),
                         incoming="utf-8"))
        if six.PY2:
            # In Python 3, bytes.decode() doesn't support anymore
            # bytes => bytes encodings like base64
            self.assertEqual(six.u("test"), safe_decode("dGVzdA==",
                             incoming='base64'))

        self.assertEqual(six.u("strange"), safe_decode(six.b('\x80strange'),
                         errors='ignore'))

        self.assertEqual(six.u('\xc0'), safe_decode(six.b('\xc0'),
                         incoming='iso-8859-1'))

        # Forcing incoming to ascii so it falls back to utf-8
        self.assertEqual(six.u('ni\xf1o'), safe_decode(six.b('ni\xc3\xb1o'),
                         incoming='ascii'))

        self.assertEqual(six.u('foo'), safe_decode(b'foo'))
项目:deb-oslo.utils    作者:openstack    | 项目源码 | 文件源码
def test_unicode_or_str_exception(self):
        # Exception with __str__() and __unicode__() methods
        class UnicodeOrStrException(Exception):
            def __init__(self, unicode_value, str_value):
                Exception.__init__(self)
                self.unicode_value = unicode_value
                self.str_value = str_value

            def __unicode__(self):
                return self.unicode_value

            def __str__(self):
                return self.str_value

        # __unicode__() returns unicode
        exc = UnicodeOrStrException(u'unicode \xe9\u20ac', b'str')
        self.assertEqual(encodeutils.exception_to_unicode(exc),
                         u'unicode \xe9\u20ac')

        # __unicode__() returns bytes (does this case really happen in the
        # wild?)
        exc = UnicodeOrStrException(b'utf-8 \xc3\xa9\xe2\x82\xac', b'str')
        self.assertEqual(encodeutils.exception_to_unicode(exc),
                         u'utf-8 \xe9\u20ac')
项目:deb-oslo.utils    作者:openstack    | 项目源码 | 文件源码
def test_unicode_only_exception(self):
        # Exception with a __unicode__() method and a __str__() which
        # raises an exception (similar to the Message class of oslo_i18n)
        class UnicodeOnlyException(Exception):
            def __init__(self, value):
                Exception.__init__(self)
                self.value = value

            def __unicode__(self):
                return self.value

            def __str__(self):
                raise UnicodeError("use unicode()")

        # __unicode__() returns unicode
        exc = UnicodeOnlyException(u'unicode \xe9\u20ac')
        self.assertEqual(encodeutils.exception_to_unicode(exc),
                         u'unicode \xe9\u20ac')

        # __unicode__() returns bytes
        exc = UnicodeOnlyException(b'utf-8 \xc3\xa9\xe2\x82\xac')
        self.assertEqual(encodeutils.exception_to_unicode(exc),
                         u'utf-8 \xe9\u20ac')
项目:paragraph2vec    作者:thunlp    | 项目源码 | 文件源码
def tokenize(text, lowercase=False, deacc=False, errors="strict", to_lower=False, lower=False):
    """
    Iteratively yield tokens as unicode strings, optionally also lowercasing them
    and removing accent marks.

    Input text may be either unicode or utf8-encoded byte string.

    The tokens on output are maximal contiguous sequences of alphabetic
    characters (no digits!).

    >>> list(tokenize('Nic nem?že let?t rychlostí vyšší, než 300 tisíc kilometr? za sekundu!', deacc = True))
    [u'Nic', u'nemuze', u'letet', u'rychlosti', u'vyssi', u'nez', u'tisic', u'kilometru', u'za', u'sekundu']

    """
    lowercase = lowercase or to_lower or lower
    text = to_unicode(text, errors=errors)
    if lowercase:
        text = text.lower()
    if deacc:
        text = deaccent(text)
    for match in PAT_ALPHABETIC.finditer(text):
        yield match.group()
项目:ngraph    作者:NervanaSystems    | 项目源码 | 文件源码
def write_np_values(values, f):
    """
    Arguments:
        values: {str: np.array}
        f: filename or filelike object
    """
    with ZipFile(f, 'w') as zf:
        for k, v in values.items():
            # Need to do this because Python zipfile has some odd support for filenames:
            # http://bugs.python.org/issue24110
            if len(k) == 16 and isinstance(k, six.binary_type):  # valid UUID bytes
                zf.writestr(str(uuid.UUID(bytes=k)), v.tostring())
            else:
                zf.writestr(six.u(k), v.tostring())

        zf.writestr(MANIFEST_FILENAME, json_dumps_manifest(values))
项目:sdining    作者:Lurance    | 项目源码 | 文件源码
def fetch_captcha_store(self, name, value, attrs=None, generator=None):
        """
        Fetches a new CaptchaStore
        This has to be called inside render
        """
        try:
            reverse('captcha-image', args=('dummy',))
        except NoReverseMatch:
            raise ImproperlyConfigured('Make sure you\'ve included captcha.urls as explained in the INSTALLATION section on http://readthedocs.org/docs/django-simple-captcha/en/latest/usage.html#installation')

        if settings.CAPTCHA_GET_FROM_POOL:
            key = CaptchaStore.pick()
        else:
            key = CaptchaStore.generate_key(generator)

        # these can be used by format_output and render
        self._value = [key, u('')]
        self._key = key
        self.id_ = self.build_attrs(attrs).get('id', None)
项目:sdining    作者:Lurance    | 项目源码 | 文件源码
def render(self, name, value, attrs=None):
        self.fetch_captcha_store(name, value, attrs, self._args.get('generator'))

        context = {
            'image': self.image_url(),
            'name': name,
            'key': self._key,
            'id': u'%s_%s' % (self._args.get('id_prefix'), attrs.get('id')) if self._args.get('id_prefix') else attrs.get('id')
        }
        if settings.CAPTCHA_FLITE_PATH:
            context.update({'audio': self.audio_url()})

        self.image_and_audio = render_to_string(settings.CAPTCHA_IMAGE_TEMPLATE, context)
        self.hidden_field = render_to_string(settings.CAPTCHA_HIDDEN_FIELD_TEMPLATE, context)
        self.text_field = render_to_string(settings.CAPTCHA_TEXT_FIELD_TEMPLATE, context)

        return super(CaptchaTextInput, self).render(name, self._value, attrs=attrs)
项目:django-spectator    作者:philgyford    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        kwargs.setdefault('blank', True)
        kwargs.setdefault('editable', False)

        populate_from = kwargs.pop('populate_from', None)
        if populate_from is None:
            raise ValueError("missing 'populate_from' argument")
        else:
            self._populate_from = populate_from

        self.slugify_function = kwargs.pop('slugify_function', slugify)
        self.separator = kwargs.pop('separator', six.u('-'))
        self.overwrite = kwargs.pop('overwrite', False)
        self.check_is_bool('overwrite')
        self.allow_duplicates = kwargs.pop('allow_duplicates', False)
        self.check_is_bool('allow_duplicates')
        self.max_unique_query_attempts = kwargs.pop('max_unique_query_attempts', MAX_UNIQUE_QUERY_ATTEMPTS)
        super(AutoSlugField, self).__init__(*args, **kwargs)
项目:pysoa    作者:eventbrite    | 项目源码 | 文件源码
def _make_context_header(
        self,
        switches=None,
        correlation_id=None,
        context_extra=None,
    ):
        # Copy the underlying context object, if it was provided
        context = dict(self.context.items()) if self.context else {}
        # Either add on, reuse or generate a correlation ID
        if correlation_id is not None:
            context['correlation_id'] = correlation_id
        elif 'correlation_id' not in context:
            context['correlation_id'] = six.u(uuid.uuid1().hex)
        # Switches can come from three different places, so merge them
        # and ensure that they are unique
        switches = set(switches or [])
        if context_extra:
            switches |= set(context_extra.pop('switches', []))
        context['switches'] = list(set(context.get('switches', [])) | switches)
        # Add any extra stuff
        if context_extra:
            context.update(context_extra)
        return context
项目:pysoa    作者:eventbrite    | 项目源码 | 文件源码
def job(self):
        return {
            'control': {
                'continue_on_error': False,
            },
            'context': {
                'switches': [1, 2, 3],
                'correlation_id': six.u(str(uuid.uuid4())),
            },
            'actions': [{
                'action': 'test_action_name',
                'body': {
                    'first_name': 'Bob',
                    'last_name': 'Mueller',
                },
            }],
        }
项目:topical_word_embeddings    作者:thunlp    | 项目源码 | 文件源码
def deaccent(text):
    """
    Remove accentuation from the given string. Input text is either a unicode string or utf8 encoded bytestring.

    Return input string with accents removed, as unicode.

    >>> deaccent("Šéf chomutovských komunist? dostal poštou bílý prášek")
    u'Sef chomutovskych komunistu dostal postou bily prasek'

    """
    if not isinstance(text, unicode):
        # assume utf8 for byte strings, use default (strict) error handling
        text = text.decode('utf8')
    norm = unicodedata.normalize("NFD", text)
    result = u('').join(ch for ch in norm if unicodedata.category(ch) != 'Mn')
    return unicodedata.normalize("NFC", result)
项目:topical_word_embeddings    作者:thunlp    | 项目源码 | 文件源码
def tokenize(text, lowercase=False, deacc=False, errors="strict", to_lower=False, lower=False):
    """
    Iteratively yield tokens as unicode strings, optionally also lowercasing them
    and removing accent marks.

    Input text may be either unicode or utf8-encoded byte string.

    The tokens on output are maximal contiguous sequences of alphabetic
    characters (no digits!).

    >>> list(tokenize('Nic nem?že let?t rychlostí vyšší, než 300 tisíc kilometr? za sekundu!', deacc = True))
    [u'Nic', u'nemuze', u'letet', u'rychlosti', u'vyssi', u'nez', u'tisic', u'kilometru', u'za', u'sekundu']

    """
    lowercase = lowercase or to_lower or lower
    text = to_unicode(text, errors=errors)
    if lowercase:
        text = text.lower()
    if deacc:
        text = deaccent(text)
    for match in PAT_ALPHABETIC.finditer(text):
        yield match.group()
项目:topical_word_embeddings    作者:thunlp    | 项目源码 | 文件源码
def deaccent(text):
    """
    Remove accentuation from the given string. Input text is either a unicode string or utf8 encoded bytestring.

    Return input string with accents removed, as unicode.

    >>> deaccent("Šéf chomutovských komunist? dostal poštou bílý prášek")
    u'Sef chomutovskych komunistu dostal postou bily prasek'

    """
    if not isinstance(text, unicode):
        # assume utf8 for byte strings, use default (strict) error handling
        text = text.decode('utf8')
    norm = unicodedata.normalize("NFD", text)
    result = u('').join(ch for ch in norm if unicodedata.category(ch) != 'Mn')
    return unicodedata.normalize("NFC", result)
项目:topical_word_embeddings    作者:thunlp    | 项目源码 | 文件源码
def deaccent(text):
    """
    Remove accentuation from the given string. Input text is either a unicode string or utf8 encoded bytestring.

    Return input string with accents removed, as unicode.

    >>> deaccent("Šéf chomutovských komunist? dostal poštou bílý prášek")
    u'Sef chomutovskych komunistu dostal postou bily prasek'

    """
    if not isinstance(text, unicode):
        # assume utf8 for byte strings, use default (strict) error handling
        text = text.decode('utf8')
    norm = unicodedata.normalize("NFD", text)
    result = u('').join(ch for ch in norm if unicodedata.category(ch) != 'Mn')
    return unicodedata.normalize("NFC", result)
项目:topical_word_embeddings    作者:thunlp    | 项目源码 | 文件源码
def tokenize(text, lowercase=False, deacc=False, errors="strict", to_lower=False, lower=False):
    """
    Iteratively yield tokens as unicode strings, optionally also lowercasing them
    and removing accent marks.

    Input text may be either unicode or utf8-encoded byte string.

    The tokens on output are maximal contiguous sequences of alphabetic
    characters (no digits!).

    >>> list(tokenize('Nic nem?že let?t rychlostí vyšší, než 300 tisíc kilometr? za sekundu!', deacc = True))
    [u'Nic', u'nemuze', u'letet', u'rychlosti', u'vyssi', u'nez', u'tisic', u'kilometru', u'za', u'sekundu']

    """
    lowercase = lowercase or to_lower or lower
    text = to_unicode(text, errors=errors)
    if lowercase:
        text = text.lower()
    if deacc:
        text = deaccent(text)
    for match in PAT_ALPHABETIC.finditer(text):
        yield match.group()
项目:python-zunclient    作者:openstack    | 项目源码 | 文件源码
def test_keys_and_vals_to_strs(self):
        dict_in = {six.u('a'): six.u('1'),
                   six.u('b'): {six.u('x'): 1,
                                'y': six.u('2'),
                                six.u('z'): six.u('3')},
                   'c': 7}

        dict_exp = collections.OrderedDict([
            ('a', '1'),
            ('b', collections.OrderedDict([
                ('x', 1),
                ('y', '2'),
                ('z', '3')])),
            ('c', 7)])

        dict_out = cliutils.keys_and_vals_to_strs(dict_in)
        dict_act = collections.OrderedDict([
            ('a', dict_out['a']),
            ('b', collections.OrderedDict(sorted(dict_out['b'].items()))),
            ('c', dict_out['c'])])

        self.assertEqual(six.text_type(dict_exp), six.text_type(dict_act))
项目:gym    作者:openai    | 项目源码 | 文件源码
def colorize(string, color, bold=False, highlight = False):
    """Return string surrounded by appropriate terminal color codes to
    print colorized text.  Valid colors: gray, red, green, yellow,
    blue, magenta, cyan, white, crimson
    """

    # Import six here so that `utils` has no import-time dependencies.
    # We want this since we use `utils` during our import-time sanity checks
    # that verify that our dependencies (including six) are actually present.
    import six

    attr = []
    num = color2num[color]
    if highlight: num += 10
    attr.append(six.u(str(num)))
    if bold: attr.append(six.u('1'))
    attrs = six.u(';').join(attr)
    return six.u('\x1b[%sm%s\x1b[0m') % (attrs, string)
项目:bbs    作者:irontec    | 项目源码 | 文件源码
def _clean_illegal_xml_chars(string_to_clean):
        """
        Removes any illegal unicode characters from the given XML string.

        @see: http://stackoverflow.com/questions/1707890/fast-way-to-filter-illegal-xml-unicode-chars-in-python
        """

        illegal_unichrs = [
            (0x00, 0x08), (0x0B, 0x1F), (0x7F, 0x84), (0x86, 0x9F),
            (0xD800, 0xDFFF), (0xFDD0, 0xFDDF), (0xFFFE, 0xFFFF),
            (0x1FFFE, 0x1FFFF), (0x2FFFE, 0x2FFFF), (0x3FFFE, 0x3FFFF),
            (0x4FFFE, 0x4FFFF), (0x5FFFE, 0x5FFFF), (0x6FFFE, 0x6FFFF),
            (0x7FFFE, 0x7FFFF), (0x8FFFE, 0x8FFFF), (0x9FFFE, 0x9FFFF),
            (0xAFFFE, 0xAFFFF), (0xBFFFE, 0xBFFFF), (0xCFFFE, 0xCFFFF),
            (0xDFFFE, 0xDFFFF), (0xEFFFE, 0xEFFFF), (0xFFFFE, 0xFFFFF),
            (0x10FFFE, 0x10FFFF)]

        illegal_ranges = ["%s-%s" % (unichr(low), unichr(high))
                          for (low, high) in illegal_unichrs
                          if low < sys.maxunicode]

        illegal_xml_re = re.compile(u('[%s]') % u('').join(illegal_ranges))
        return illegal_xml_re.sub('', string_to_clean)
项目:cetusshop    作者:icetusorg    | 项目源码 | 文件源码
def fetch_captcha_store(self, name, value, attrs=None):
        """
        Fetches a new CaptchaStore
        This has to be called inside render
        """
        try:
            reverse('captcha-image', args=('dummy',))
        except NoReverseMatch:
            raise ImproperlyConfigured('Make sure you\'ve included captcha.urls as explained in the INSTALLATION section on http://readthedocs.org/docs/django-simple-captcha/en/latest/usage.html#installation')

        key = CaptchaStore.generate_key()

        # these can be used by format_output and render
        self._value = [key, u('')]
        self._key = key
        self.id_ = self.build_attrs(attrs).get('id', None)
项目:cetusshop    作者:icetusorg    | 项目源码 | 文件源码
def render(self, name, value, attrs=None):
        self.fetch_captcha_store(name, value, attrs)

        context = {
            'image': self.image_url(),
            'name': name,
            'key': self._key,
            'id': u'%s_%s' % (self._args.get('id_prefix'), attrs.get('id')) if self._args.get('id_prefix') else attrs.get('id')
        }
        if settings.CAPTCHA_FLITE_PATH:
            context.update({'audio': self.audio_url()})

        self.image_and_audio = render_to_string(settings.CAPTCHA_IMAGE_TEMPLATE, context)
        self.hidden_field = render_to_string(settings.CAPTCHA_HIDDEN_FIELD_TEMPLATE, context)
        self.text_field = render_to_string(settings.CAPTCHA_TEXT_FIELD_TEMPLATE, context)

        return super(CaptchaTextInput, self).render(name, self._value, attrs=attrs)
项目:Callandtext    作者:iaora    | 项目源码 | 文件源码
def test_unicode_sequence_form_value(resp_mock, mock):
    http = mock.return_value
    http.request.return_value = (Mock(), Mock())

    data = {
        "body": [u('\xe5'), u('\xe7')],
    }

    resources.make_request("POST", "http://www.example.com", data=data)

    http.request.assert_called_with(
        "http://www.example.com",
        "POST",
        headers=None,
        body="body=%C3%A5&body=%C3%A7",
    )
项目:ingest-client    作者:jhuapl-boss    | 项目源码 | 文件源码
def encode_tile_key(self, project_info, resolution, x_index, y_index, z_index, t_index=0):
        """A method to create a tile key.

        The tile key is the key used for each individual tile file.

        Args:
            project_info(list): A list of strings containing the project/data model information for where data belongs
            resolution(int): The level of the resolution hierarchy.  Typically 0
            x_index(int): The x tile index
            y_index(int): The y tile index
            z_index(int): The z tile index
            t_index(int): The time index

        Returns:
            (str): The object key to use for uploading to the tile bucket
        """
        proj_str = six.u("&".join([str(x) for x in project_info]))
        base_key = six.u("{}&{}&{}&{}&{}&{}".format(proj_str, resolution, x_index, y_index, z_index, t_index))

        hashm = hashlib.md5()
        hashm.update(base_key.encode())

        return six.u("{}&{}".format(hashm.hexdigest(), base_key))
项目:ingest-client    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_encode_tile_key(self):
        """Test encoding an object key"""
        b = BossBackend(self.example_config_data)
        b.setup(self.api_token)

        params = {"collection": 1,
                  "experiment": 2,
                  "channel": 3,
                  "resolution": 0,
                  "x_index": 5,
                  "y_index": 6,
                  "z_index": 1,
                  "t_index": 0,
                  }

        proj = [str(params['collection']), str(params['experiment']), str(params['channel'])]
        key = b.encode_tile_key(proj,
                                params['resolution'],
                                params['x_index'],
                                params['y_index'],
                                params['z_index'],
                                params['t_index'],
                                )

        assert key == six.u("03ca58a12ec662954ac12e06517d4269&1&2&3&0&5&6&1&0")
项目:ingest-client    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_encode_tile_key(self):
        """Test encoding an object key"""
        b = BossBackend(self.example_config_data)
        b.setup(self.api_token)

        params = {"collection": 1,
                  "experiment": 2,
                  "channel": 3,
                  "resolution": 0,
                  "x_index": 5,
                  "y_index": 6,
                  "z_index": 1,
                  "t_index": 0,
                  }

        proj = [str(params['collection']), str(params['experiment']), str(params['channel'])]
        key = b.encode_tile_key(proj,
                                params['resolution'],
                                params['x_index'],
                                params['y_index'],
                                params['z_index'],
                                params['t_index'],
                                )

        assert key == six.u("03ca58a12ec662954ac12e06517d4269&1&2&3&0&5&6&1&0")
项目:captaincloud    作者:bpsagar    | 项目源码 | 文件源码
def test_task_init(self):
        task = self.RandomTask(string=six.u('XYZ'), integer=10)
        self.assertEqual(task.Input.string, six.u('XYZ'))
        self.assertEqual(task.Input.integer, 10)

        task.set_input(string=six.u('DEF'), integer=20)
        self.assertEqual(task.Input.string, six.u('DEF'))
        self.assertEqual(task.Input.integer, 20)
项目:captaincloud    作者:bpsagar    | 项目源码 | 文件源码
def test_task_fields(self):
        task = self.RandomTask()

        self.assertEqual(task.Input.string, six.u('ABCD'))
        task.Input.string = six.u('XYZ')
        self.assertEqual(task.Input.string, six.u('XYZ'))
        with self.assertRaises(InvalidValueException):
            task.Input.string = 10

        self.assertEqual(task.Input.integer, 5)
        task.Input.integer = 10
        self.assertEqual(task.Input.integer, 10)
        with self.assertRaises(InvalidValueException):
            task.Input.integer = 'ABCD'

        self.assertEqual(task.Output.floating, 1.5)
        task.Output.floating = 5.5
        self.assertEqual(task.Output.floating, 5.5)
        with self.assertRaises(InvalidValueException):
            task.Output.floating = 'ABCD'

        task2 = self.RandomTask()
        self.assertEqual(task2.Input.string, 'ABCD')
        task2.Input.string = six.u('123')
        self.assertEqual(task2.Input.string, six.u('123'))

        # Test if the tasks share the same fields
        self.assertEqual(task.Input.string, six.u('XYZ'))

        # Check if it's cloned
        self.assertNotEqual(task.Input.instream, task2.Input.instream)
项目:captaincloud    作者:bpsagar    | 项目源码 | 文件源码
def test_task_serialize(self):
        task = self.RandomTask()
        serialized = {
            'ID': 'random',
            'Input': {
                'string': six.u('ABCD'),
                'integer': 5,
                'integer_array': [],
                'anyf': six.u(base64.b64encode(pickle.dumps({1: 2})))
            },
            'Output': {
                'floating': 1.5,
                'none': None
            }
        }
        self.assertEqual(task.serialize(), serialized)

        task = self.RandomTask()
        task.Input.string = six.u('XYZ')
        task.Input.integer = 10
        task.Input.integer_array = [1, 2, 3]
        task.Input.anyf = ['hello', 'world']
        task.Output.floating = 2.5
        serialized = {
            'ID': 'random',
            'Input': {
                'string': six.u('XYZ'),
                'integer': 10,
                'integer_array': [1, 2, 3],
                'anyf': six.u(base64.b64encode(pickle.dumps(['hello', 'world'])))
            },
            'Output': {
                'floating': 2.5,
                'none': None
            }
        }
        self.assertEqual(task.serialize(), serialized)
项目:captaincloud    作者:bpsagar    | 项目源码 | 文件源码
def test_task_deserialize(self):
        serialized = {
            'ID': 'random',
            'Input': {
                'string': six.u('ABCD'),
                'integer': 5,
                'integer_array': [1, 2, 3],
                'anyf': six.u(base64.b64encode(pickle.dumps(100)))
            },
            'Output': {
                'floating': 1.5,
                'none': None
            }
        }
        instance = Task.deserialize(data=serialized)
        self.assertTrue(isinstance(instance, self.RandomTask))
        self.assertEqual(instance.Input.string, six.u('ABCD'))
        self.assertEqual(instance.Input.integer, 5)
        self.assertEqual(instance.Output.floating, 1.5)

        serialized = {
            'ID': 'random',
            'Input': {
                'string': six.u('XYZ'),
                'integer': 10
            },
            'Output': {
                'floating': 2.5,
                'none': None
            }
        }
        instance = Task.deserialize(data=serialized)
        self.assertTrue(isinstance(instance, self.RandomTask))
        self.assertEqual(instance.Input.string, six.u('XYZ'))
        self.assertEqual(instance.Input.integer, 10)
        self.assertEqual(instance.Output.floating, 2.5)
项目:captaincloud    作者:bpsagar    | 项目源码 | 文件源码
def test_string_stream(self):
        instance = StringStreamField().create()
        with self.assertRaises(StreamNotAvailableException):
            instance.write(six.u('hello'))

        with self.assertRaises(StreamNotAvailableException):
            instance.read()

        class DummyStream(object):
            def __init__(self):
                self.buffer = six.u('')

            def read(self, n=-1):
                if n == -1:
                    n = len(self.buffer)
                result = self.buffer[:n]
                self.buffer = self.buffer[n:]
                return result

            def write(self, data):
                self.buffer += data

        instance.set_real_stream(DummyStream())
        with self.assertRaises(InvalidValueException):
            instance.write(six.b('hello'))

        instance.write(six.u('hello'))
        self.assertEqual(instance.read(), six.u('hello'))
项目:captaincloud    作者:bpsagar    | 项目源码 | 文件源码
def test_byte_stream(self):
        instance = ByteStreamField().create()
        with self.assertRaises(StreamNotAvailableException):
            instance.write(six.u('hello'))

        with self.assertRaises(StreamNotAvailableException):
            instance.read()

        class DummyStream(object):
            def __init__(self):
                self.buffer = six.b('')

            def read(self, n=-1):
                if n == -1:
                    n = len(self.buffer)
                result = self.buffer[:n]
                self.buffer = self.buffer[n:]
                return result

            def write(self, data):
                self.buffer += data

        instance.set_real_stream(DummyStream())
        with self.assertRaises(InvalidValueException):
            instance.write(six.u('hello'))

        instance.write(six.b('hello'))
        self.assertEqual(instance.read(), six.b('hello'))

        with self.assertRaises(NotImplementedError):
            StreamField().create()  # Test for coverage :)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def to_son(self, value):
        value = decimal.Decimal(value)
        return six.u(str(value.quantize(self.precision, rounding=self.rounding)))
项目:pyemtmad    作者:rmed    | 项目源码 | 文件源码
def ints_to_string(ints):
    """Convert a list of integers to a *|* separated string.

    Args:
        ints (list[int]|int): List of integer items to convert or single
            integer to convert.

    Returns:
        str: Formatted string
    """
    if not isinstance(ints, list):
        return six.u(str(ints))

    return '|'.join(six.u(str(l)) for l in ints)
项目:glog-cli    作者:globocom    | 项目源码 | 文件源码
def format(self, entry):
        message = entry.message
        timestamp = entry.timestamp.to("local")
        source = entry.message_dict.get("source")
        facility = entry.message_dict.get("facility")
        custom_fields = list(self.fields)

        log_level = LogLevel.find_by_syslog_code(entry.level)
        args = {
            'timestamp': timestamp.format(utils.DEFAULT_DATE_FORMAT),
            'level': log_level['name'],
            'message': self.encode_message(message),
            'source': source or '',
            'facility': facility or ''
        }

        for field in custom_fields:
            if field not in self.DEFAULT_FIELDS:
                args[field] = entry.message_dict.get(field, '')

        log = six.u(self.format_template).format(**args)

        if self.color:
            return colored(log, log_level['color'], log_level['bg_color'])
        else:
            return log
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def test_assert_dict_unicode_error(self):
        with catch_warnings(record=True):
            # This causes a UnicodeWarning due to its craziness
            one = ''.join(chr(i) for i in range(255))
            # this used to cause a UnicodeDecodeError constructing the failure msg
            with self.assertRaises(self.failureException):
                self.assertDictContainsSubset({'foo': one}, {'foo': u('\uFFFD')})
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def test_formatMessage_unicode_error(self):
        with catch_warnings(record=True):
            # This causes a UnicodeWarning due to its craziness
            one = ''.join(chr(i) for i in range(255))
            # this used to cause a UnicodeDecodeError constructing msg
            self._formatMessage(one, u('\uFFFD'))
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def test_pickle_unpickle(self):
        # Issue #7197: a TextTestRunner should be (un)pickleable. This is
        # required by test_multiprocessing under Windows (in verbose mode).
        stream = StringIO(u("foo"))
        runner = unittest2.TextTestRunner(stream)
        for protocol in range(2, pickle.HIGHEST_PROTOCOL + 1):
            s = pickle.dumps(runner, protocol=protocol)
            obj = pickle.loads(s)
            # StringIO objects never compare equal, a cheap test instead.
            self.assertEqual(obj.stream.getvalue(), stream.getvalue())
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def testAssertRaisesRegex(self):
        class ExceptionMock(Exception):
            pass

        def Stub():
            raise ExceptionMock('We expect')

        self.assertRaisesRegex(ExceptionMock, re.compile('expect$'), Stub)
        self.assertRaisesRegex(ExceptionMock, 'expect$', Stub)
        self.assertRaisesRegex(ExceptionMock, u('expect$'), Stub)
        with self.assertWarns(DeprecationWarning):
            self.assertRaisesRegex(ExceptionMock, 'expect$', None)
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def writeln(self, arg=None):
        if arg:
            self.write(arg)
        self.write(u('\n')) # text-mode streams translate to \r\n if needed
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def addUnexpectedSuccess(self, test):
        super(TextTestResult, self).addUnexpectedSuccess(test)
        if self.showAll:
            self.stream.writeln("unexpected success")
        elif self.dots:
            self.stream.write("u")
            self.stream.flush()
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def test_format_exception_only_undecodable__str__(self):
        # This won't decode via the ascii codec.
        X = Exception(u('\u5341').encode('shift-jis'))
        err = traceback.format_exception_only(type(X), X)
        self.assertEqual(len(err), 1)
        str_value = "b'\\x8f\\\\'"
        self.assertEqual(err[0], "Exception: %s\n" % str_value)