Python six 模块,integer_types 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 six.integer_types(注意:它是一个由整数类型组成的元组,通常作为 isinstance() 的第二个参数使用,而不是可调用的函数)。

项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def _validate_dict_data(self, expected, actual):
        """Check ``actual`` against the expectations in ``expected``.

        Every key of ``expected`` must be present in ``actual``.  An
        expected value that is a string, bool or integer is compared to
        the actual value with ``!=``; any other expected value is called
        with the actual value and must return something truthy.

        Returns ``None`` when all entries pass.  Otherwise returns a
        message for the first failing entry: ``"<key>:<actual value>"``
        for a mismatch, or a "does not exist" message for a missing key.
        """
        self.log.debug('actual: {}'.format(repr(actual)))
        self.log.debug('expected: {}'.format(repr(expected)))

        for key, wanted in six.iteritems(expected):
            if key not in actual:
                return "key '{}' does not exist".format(key)

            found = actual[key]
            if isinstance(wanted,
                          (six.string_types, bool, six.integer_types)):
                # Literal expectation: compare by value.
                failed = wanted != found
            else:
                # Callable expectation, such as not_null or valid_ip.
                failed = not wanted(found)

            if failed:
                return "{}:{}".format(key, found)
        return None
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def _validate_dict_data(self, expected, actual):
        """Check ``actual`` against the expectations in ``expected``.

        Every key of ``expected`` must be present in ``actual``.  An
        expected value that is a string, bool or integer is compared to
        the actual value with ``!=``; any other expected value is called
        with the actual value and must return something truthy.

        Returns ``None`` when all entries pass.  Otherwise returns a
        message for the first failing entry: ``"<key>:<actual value>"``
        for a mismatch, or a "does not exist" message for a missing key.
        """
        self.log.debug('actual: {}'.format(repr(actual)))
        self.log.debug('expected: {}'.format(repr(expected)))

        for key, wanted in six.iteritems(expected):
            if key not in actual:
                return "key '{}' does not exist".format(key)

            found = actual[key]
            if isinstance(wanted,
                          (six.string_types, bool, six.integer_types)):
                # Literal expectation: compare by value.
                failed = wanted != found
            else:
                # Callable expectation, such as not_null or valid_ip.
                failed = not wanted(found)

            if failed:
                return "{}:{}".format(key, found)
        return None
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_yahoo_bars_to_panel_source(self):
        """Check the events emitted for a panel loaded from Yahoo bars.

        Bars for two equities are loaded with
        ``factory.load_bars_from_yahoo``, the panel items are replaced by
        the sids returned from the asset finder, and every event yielded
        by ``DataPanelSource`` must contain all expected fields, an
        integer ``volume`` and a ``sid`` that is one of those sids.
        """
        env = TradingEnvironment()
        finder = AssetFinder(env.engine)
        stocks = ['AAPL', 'GE']
        env.write_data(equities_identifiers=stocks)
        start = pd.datetime(1993, 1, 1, 0, 0, 0, 0, pytz.utc)
        end = pd.datetime(2002, 1, 1, 0, 0, 0, 0, pytz.utc)
        data = factory.load_bars_from_yahoo(stocks=stocks,
                                            indexes={},
                                            start=start,
                                            end=end)
        check_fields = ['sid', 'open', 'high', 'low', 'close',
                        'volume', 'price']

        copy_panel = data.copy()
        sids = finder.map_identifier_index_to_sids(
            data.items, data.major_axis[0]
        )
        copy_panel.items = sids
        source = DataPanelSource(copy_panel)
        for event in source:
            for check_field in check_fields:
                self.assertIn(check_field, event)
            # assertIsInstance / assertIn report the offending value when
            # they fail, unlike assertTrue on a pre-computed boolean.
            self.assertIsInstance(event['volume'], integer_types)
            self.assertIn(event['sid'], sids)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def fetch_size(self, v):
        """
        Sets the number of rows that are fetched at a time.

        *Note that driver's default fetch size is 5000.*

        .. code-block:: python

            for user in User.objects().fetch_size(500):
                print(user)

        Returns ``self`` when ``v`` equals the current fetch size,
        otherwise a deep copy of ``self`` with the new fetch size.

        Raises ``TypeError`` if ``v`` is not an integer and
        ``QueryException`` if ``v`` is less than 1.
        """

        if not isinstance(v, six.integer_types):
            # Include the offending value so the failure is diagnosable.
            raise TypeError(
                "fetch size must be an integer, got {0!r}".format(v))
        if v == self._fetch_size:
            return self

        if v < 1:
            raise QueryException("fetch size less than 1 is not allowed")

        clone = copy.deepcopy(self)
        clone._fetch_size = v
        return clone
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def __init__(self, value):
        """
        Build the instance from one of the supported representations.

        ``value`` can be:

            - an integer type: absolute nanoseconds in the day
            - a ``datetime.time``: built-in time
            - a string type: a time of the form "HH:MM:SS[.mmmuuunnn]"

        Any other type raises ``TypeError``.
        """
        if isinstance(value, six.integer_types):
            loader = self._from_timestamp
        elif isinstance(value, datetime.time):
            loader = self._from_time
        elif isinstance(value, six.string_types):
            loader = self._from_timestring
        else:
            raise TypeError('Time arguments must be a whole number, datetime.time, or string')
        loader(value)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def __init__(self, value):
        """
        Build the instance from one of the supported representations.

        ``value`` can be:

            - an integer type: absolute days from epoch (1970, 1, 1),
              stored directly as ``days_from_epoch``. Can be negative.
            - a ``datetime.date`` or ``datetime.datetime``
            - a string type: a date of the form "yyyy-mm-dd"

        Any other type raises ``TypeError``.
        """
        if isinstance(value, six.integer_types):
            self.days_from_epoch = value
            return

        if isinstance(value, (datetime.date, datetime.datetime)):
            self._from_timetuple(value.timetuple())
            return

        if isinstance(value, six.string_types):
            self._from_datestring(value)
            return

        raise TypeError('Date arguments must be a whole number, datetime.date, or string')
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def _validate_dict_data(self, expected, actual):
        """Check ``actual`` against the expectations in ``expected``.

        Every key of ``expected`` must be present in ``actual``.  An
        expected value that is a string, bool or integer is compared to
        the actual value with ``!=``; any other expected value is called
        with the actual value and must return something truthy.

        Returns ``None`` when all entries pass.  Otherwise returns a
        message for the first failing entry: ``"<key>:<actual value>"``
        for a mismatch, or a "does not exist" message for a missing key.
        """
        self.log.debug('actual: {}'.format(repr(actual)))
        self.log.debug('expected: {}'.format(repr(expected)))

        for key, wanted in six.iteritems(expected):
            if key not in actual:
                return "key '{}' does not exist".format(key)

            found = actual[key]
            if isinstance(wanted,
                          (six.string_types, bool, six.integer_types)):
                # Literal expectation: compare by value.
                failed = wanted != found
            else:
                # Callable expectation, such as not_null or valid_ip.
                failed = not wanted(found)

            if failed:
                return "{}:{}".format(key, found)
        return None
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def _validate_dict_data(self, expected, actual):
        """Check ``actual`` against the expectations in ``expected``.

        Every key of ``expected`` must be present in ``actual``.  An
        expected value that is a string, bool or integer is compared to
        the actual value with ``!=``; any other expected value is called
        with the actual value and must return something truthy.

        Returns ``None`` when all entries pass.  Otherwise returns a
        message for the first failing entry: ``"<key>:<actual value>"``
        for a mismatch, or a "does not exist" message for a missing key.
        """
        self.log.debug('actual: {}'.format(repr(actual)))
        self.log.debug('expected: {}'.format(repr(expected)))

        for key, wanted in six.iteritems(expected):
            if key not in actual:
                return "key '{}' does not exist".format(key)

            found = actual[key]
            if isinstance(wanted,
                          (six.string_types, bool, six.integer_types)):
                # Literal expectation: compare by value.
                failed = wanted != found
            else:
                # Callable expectation, such as not_null or valid_ip.
                failed = not wanted(found)

            if failed:
                return "{}:{}".format(key, found)
        return None
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _asFileDescriptor(obj):
    """
    Return the non-negative integer file descriptor for ``obj``.

    ``obj`` is either an integer already, or an object whose ``fileno()``
    method returns one.

    :raises TypeError: if no integer can be obtained from ``obj``
    :raises ValueError: if the resulting descriptor is negative
    """
    candidate = obj
    if not isinstance(candidate, integer_types):
        # Not an integer itself: fall back to its fileno() method, if any.
        fileno = getattr(candidate, "fileno", None)
        if fileno is not None:
            candidate = fileno()

    if not isinstance(candidate, integer_types):
        raise TypeError("argument must be an int, or have a fileno() method.")
    if candidate < 0:
        raise ValueError(
            "file descriptor cannot be a negative integer (%i)" % (candidate,))

    return candidate
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def use_certificate_file(self, certfile, filetype=FILETYPE_PEM):
        """
        Load a certificate from a file into this context.

        :param certfile: The name of the certificate file (``bytes`` or
            ``unicode``).
        :param filetype: (optional) The encoding of the file, default is PEM
        :raises TypeError: if ``filetype`` is not an integer

        :return: None
        """
        path = _path_string(certfile)
        if not isinstance(filetype, integer_types):
            raise TypeError("filetype must be an integer")

        # A falsy result from the library call signals failure.
        if not _lib.SSL_CTX_use_certificate_file(
                self._context, path, filetype):
            _raise_current_error()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def use_privatekey_file(self, keyfile, filetype=_UNSPECIFIED):
        """
        Load a private key from a file into this context.

        :param keyfile: The name of the key file (``bytes`` or ``unicode``)
        :param filetype: (optional) The encoding of the file, default is PEM
        :raises TypeError: if an explicit ``filetype`` is not an integer

        :return: None
        """
        path = _path_string(keyfile)

        if filetype is _UNSPECIFIED:
            encoding = FILETYPE_PEM
        elif isinstance(filetype, integer_types):
            encoding = filetype
        else:
            raise TypeError("filetype must be an integer")

        # A falsy result from the library call signals failure.
        if not _lib.SSL_CTX_use_PrivateKey_file(
                self._context, path, encoding):
            self._raise_passphrase_exception()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def bio_read(self, bufsiz):
        """
        Read up to ``bufsiz`` bytes from the ``_from_ssl`` BIO.

        When using non-socket connections this returns the "dirty" data
        that would have traveled away on the network.

        :param bufsiz: The maximum number of bytes to read
        :raises TypeError: if ``self._from_ssl`` is None or ``bufsiz`` is
            not an integer
        :return: The string read.
        """
        source_bio = self._from_ssl
        if source_bio is None:
            raise TypeError("Connection sock was not None")
        if not isinstance(bufsiz, integer_types):
            raise TypeError("bufsiz must be an integer")

        storage = _ffi.new("char[]", bufsiz)
        count = _lib.BIO_read(source_bio, storage, bufsiz)
        if count <= 0:
            self._handle_bio_errors(source_bio, count)

        return _ffi.buffer(storage, count)[:]
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def bytes(num_bytes):
    """
    Return ``num_bytes`` random bytes from the PRNG as a string.

    This is a wrapper for the C function ``RAND_bytes``.

    :param num_bytes: The number of bytes to fetch.
    :raises TypeError: if ``num_bytes`` is not an integer
    :raises ValueError: if ``num_bytes`` is negative

    :return: A string of random bytes.
    """
    if not isinstance(num_bytes, _integer_types):
        raise TypeError("num_bytes must be an integer")
    if num_bytes < 0:
        raise ValueError("num_bytes must not be negative")

    out = _ffi.new("char[]", num_bytes)
    if _lib.RAND_bytes(out, num_bytes) == -1:
        # TODO: No tests for this code path.  Triggering a RAND_bytes failure
        # might involve supplying a custom ENGINE?  That's hard.
        _raise_current_error()

    return _ffi.buffer(out)[:]
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, key, length, algorithm, backend):
        """Validate and store the HOTP parameters.

        :raises UnsupportedAlgorithm: if ``backend`` is not an
            ``HMACBackend``
        :raises ValueError: if ``key`` is shorter than 16 or ``length``
            is outside 6..8
        :raises TypeError: if ``length`` is not an integer or
            ``algorithm`` is not a SHA1, SHA256 or SHA512 instance
        """
        if not isinstance(backend, HMACBackend):
            raise UnsupportedAlgorithm(
                "Backend object does not implement HMACBackend.",
                _Reasons.BACKEND_MISSING_INTERFACE
            )
        if len(key) < 16:
            raise ValueError("Key length has to be at least 128 bits.")
        if not isinstance(length, six.integer_types):
            raise TypeError("Length parameter must be an integer type.")
        if not 6 <= length <= 8:
            raise ValueError("Length of HOTP has to be between 6 to 8.")
        if not isinstance(algorithm, (SHA1, SHA256, SHA512)):
            raise TypeError("Algorithm must be SHA1, SHA256 or SHA512.")

        self._key, self._length = key, length
        self._algorithm, self._backend = algorithm, backend
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, ca, path_length):
        """Validate and store the ``ca`` flag and ``path_length``.

        :raises TypeError: if ``ca`` is not a bool, or ``path_length`` is
            neither None nor a non-negative integer
        :raises ValueError: if ``path_length`` is given while ``ca`` is
            False
        """
        if not isinstance(ca, bool):
            raise TypeError("ca must be a boolean value")

        if path_length is not None:
            if not ca:
                raise ValueError("path_length must be None when ca is False")

            is_integer = isinstance(path_length, six.integer_types)
            if not is_integer or path_length < 0:
                raise TypeError(
                    "path_length must be a non-negative integer or None"
                )

        self._ca, self._path_length = ca, path_length
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def serial_number(self, number):
        """
        Return a new ``CertificateBuilder`` carrying ``number`` as serial.

        :raises TypeError: if ``number`` is not an integer
        :raises ValueError: if a serial number is already set, or
            ``number`` is negative or longer than 160 bits
        """
        if not isinstance(number, six.integer_types):
            raise TypeError('Serial number must be of integral type.')
        if self._serial_number is not None:
            raise ValueError('The serial number may only be set once.')
        if number < 0:
            raise ValueError('The serial number should be non-negative.')

        # 160-bit limit as defined in RFC 5280.
        if utils.bit_length(number) > 160:
            raise ValueError(
                'The serial number should not be more than 160 bits.')

        return CertificateBuilder(
            self._issuer_name,
            self._subject_name,
            self._public_key,
            number,
            self._not_valid_before,
            self._not_valid_after,
            self._extensions,
        )
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _asFileDescriptor(obj):
    """
    Return the non-negative integer file descriptor for ``obj``.

    ``obj`` is either an integer already, or an object whose ``fileno()``
    method returns one.

    :raises TypeError: if no integer can be obtained from ``obj``
    :raises ValueError: if the resulting descriptor is negative
    """
    candidate = obj
    if not isinstance(candidate, integer_types):
        # Not an integer itself: fall back to its fileno() method, if any.
        fileno = getattr(candidate, "fileno", None)
        if fileno is not None:
            candidate = fileno()

    if not isinstance(candidate, integer_types):
        raise TypeError("argument must be an int, or have a fileno() method.")
    if candidate < 0:
        raise ValueError(
            "file descriptor cannot be a negative integer (%i)" % (candidate,))

    return candidate
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def use_privatekey_file(self, keyfile, filetype=_UNSPECIFIED):
        """
        Load a private key from a file into this context.

        :param keyfile: The name of the key file (``bytes`` or ``unicode``)
        :param filetype: (optional) The encoding of the file, default is PEM
        :raises TypeError: if an explicit ``filetype`` is not an integer

        :return: None
        """
        path = _path_string(keyfile)

        if filetype is _UNSPECIFIED:
            encoding = FILETYPE_PEM
        elif isinstance(filetype, integer_types):
            encoding = filetype
        else:
            raise TypeError("filetype must be an integer")

        # A falsy result from the library call signals failure.
        if not _lib.SSL_CTX_use_PrivateKey_file(
                self._context, path, encoding):
            self._raise_passphrase_exception()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def set_verify(self, mode, callback):
        """
        Install the verify mode and verify callback on this context.

        :param mode: The verify mode, this is either VERIFY_NONE or
                     VERIFY_PEER combined with possible other flags
        :param callback: The Python callback to use
        :raises TypeError: if ``mode`` is not an integer or ``callback``
                           is not callable
        :return: None

        See SSL_CTX_set_verify(3SSL) for further details.
        """
        if not isinstance(mode, integer_types):
            raise TypeError("mode must be an integer")
        if not callable(callback):
            raise TypeError("callback must be callable")

        # Both the helper and its callback are stored on the instance.
        helper = _VerifyHelper(callback)
        self._verify_helper = helper
        self._verify_callback = helper.callback
        _lib.SSL_CTX_set_verify(self._context, mode, self._verify_callback)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def bio_read(self, bufsiz):
        """
        Read up to ``bufsiz`` bytes from the ``_from_ssl`` BIO.

        When using non-socket connections this returns the "dirty" data
        that would have traveled away on the network.

        :param bufsiz: The maximum number of bytes to read
        :raises TypeError: if ``self._from_ssl`` is None or ``bufsiz`` is
            not an integer
        :return: The string read.
        """
        source_bio = self._from_ssl
        if source_bio is None:
            raise TypeError("Connection sock was not None")
        if not isinstance(bufsiz, integer_types):
            raise TypeError("bufsiz must be an integer")

        storage = _ffi.new("char[]", bufsiz)
        count = _lib.BIO_read(source_bio, storage, bufsiz)
        if count <= 0:
            self._handle_bio_errors(source_bio, count)

        return _ffi.buffer(storage, count)[:]
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def bytes(num_bytes):
    """
    Return ``num_bytes`` random bytes from the PRNG as a string.

    This is a wrapper for the C function ``RAND_bytes``.

    :param num_bytes: The number of bytes to fetch.
    :raises TypeError: if ``num_bytes`` is not an integer
    :raises ValueError: if ``num_bytes`` is negative

    :return: A string of random bytes.
    """
    if not isinstance(num_bytes, _integer_types):
        raise TypeError("num_bytes must be an integer")
    if num_bytes < 0:
        raise ValueError("num_bytes must not be negative")

    out = _ffi.new("char[]", num_bytes)
    if _lib.RAND_bytes(out, num_bytes) == -1:
        # TODO: No tests for this code path.  Triggering a RAND_bytes failure
        # might involve supplying a custom ENGINE?  That's hard.
        _raise_current_error()

    return _ffi.buffer(out)[:]
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, key, length, algorithm, backend):
        """Validate and store the HOTP parameters.

        :raises UnsupportedAlgorithm: if ``backend`` is not an
            ``HMACBackend``
        :raises ValueError: if ``key`` is shorter than 16 or ``length``
            is outside 6..8
        :raises TypeError: if ``length`` is not an integer or
            ``algorithm`` is not a SHA1, SHA256 or SHA512 instance
        """
        if not isinstance(backend, HMACBackend):
            raise UnsupportedAlgorithm(
                "Backend object does not implement HMACBackend.",
                _Reasons.BACKEND_MISSING_INTERFACE
            )
        if len(key) < 16:
            raise ValueError("Key length has to be at least 128 bits.")
        if not isinstance(length, six.integer_types):
            raise TypeError("Length parameter must be an integer type.")
        if not 6 <= length <= 8:
            raise ValueError("Length of HOTP has to be between 6 to 8.")
        if not isinstance(algorithm, (SHA1, SHA256, SHA512)):
            raise TypeError("Algorithm must be SHA1, SHA256 or SHA512.")

        self._key, self._length = key, length
        self._algorithm, self._backend = algorithm, backend
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, ca, path_length):
        """Validate and store the ``ca`` flag and ``path_length``.

        :raises TypeError: if ``ca`` is not a bool, or ``path_length`` is
            neither None nor a non-negative integer
        :raises ValueError: if ``path_length`` is given while ``ca`` is
            False
        """
        if not isinstance(ca, bool):
            raise TypeError("ca must be a boolean value")

        if path_length is not None:
            if not ca:
                raise ValueError("path_length must be None when ca is False")

            is_integer = isinstance(path_length, six.integer_types)
            if not is_integer or path_length < 0:
                raise TypeError(
                    "path_length must be a non-negative integer or None"
                )

        self._ca, self._path_length = ca, path_length
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, require_explicit_policy, inhibit_policy_mapping):
        """Validate and store the two policy-constraint values.

        Each argument must be None or an integer, and at least one of
        them must not be None.

        :raises TypeError: if either argument is neither None nor an
            integer
        :raises ValueError: if both arguments are None
        """
        explicit = require_explicit_policy
        inhibit = inhibit_policy_mapping

        if not (explicit is None or isinstance(explicit, six.integer_types)):
            raise TypeError(
                "require_explicit_policy must be a non-negative integer or "
                "None"
            )

        if not (inhibit is None or isinstance(inhibit, six.integer_types)):
            raise TypeError(
                "inhibit_policy_mapping must be a non-negative integer or None"
            )

        if inhibit is None and explicit is None:
            raise ValueError(
                "At least one of require_explicit_policy and "
                "inhibit_policy_mapping must not be None"
            )

        self._require_explicit_policy = explicit
        self._inhibit_policy_mapping = inhibit
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def serial_number(self, number):
        """
        Return a new ``CertificateBuilder`` carrying ``number`` as serial.

        :raises TypeError: if ``number`` is not an integer
        :raises ValueError: if a serial number is already set, or
            ``number`` is negative or longer than 160 bits
        """
        if not isinstance(number, six.integer_types):
            raise TypeError('Serial number must be of integral type.')
        if self._serial_number is not None:
            raise ValueError('The serial number may only be set once.')
        if number < 0:
            raise ValueError('The serial number should be non-negative.')

        # 160-bit limit as defined in RFC 5280.
        if utils.bit_length(number) > 160:
            raise ValueError(
                'The serial number should not be more than 160 bits.')

        return CertificateBuilder(
            self._issuer_name,
            self._subject_name,
            self._public_key,
            number,
            self._not_valid_before,
            self._not_valid_after,
            self._extensions,
        )
项目:infi.clickhouse_orm    作者:Infinidat    | 项目源码 | 文件源码
def __getitem__(self, s):
        """Support ``qs[i]`` and ``qs[start:stop]`` via ``_limits``.

        An integer index copies ``self``, sets ``_limits`` to ``(s, 1)``
        and returns the first item the copy yields.  A slice returns a
        copy of ``self`` whose ``_limits`` is ``(start, stop - start)``.

        Negative indexes and slice steps other than 1 are rejected with
        an ``AssertionError``.
        """
        if isinstance(s, six.integer_types):
            # Single index
            assert s >= 0, 'negative indexes are not supported'
            qs = copy(self)
            qs._limits = (s, 1)
            return six.next(iter(qs))
        else:
            # Slice
            assert s.step in (None, 1), 'step is not supported in slices'
            start = s.start or 0
            # Only a missing stop means "unbounded".  An explicit stop of 0
            # is a real bound and must not be swapped for the sentinel
            # (``s.stop or ...`` made ``qs[0:0]`` select everything).
            stop = 2**63 - 1 if s.stop is None else s.stop
            assert start >= 0 and stop >= 0, 'negative indexes are not supported'
            assert start <= stop, 'start of slice cannot be smaller than its end'
            qs = copy(self)
            qs._limits = (start, stop - start)
            return qs
项目:cqlmapper    作者:reddit    | 项目源码 | 文件源码
def fetch_size(self, v):
        """Sets the number of rows that are fetched at a time.

        *Note that driver's default fetch size is 5000.*

        .. code-block:: python

            for user in User.objects().fetch_size(500):
                print(user)

        Returns ``self`` when ``v`` equals the current fetch size,
        otherwise a deep copy of ``self`` with the new fetch size.

        Raises ``TypeError`` if ``v`` is not an integer and
        ``QueryException`` if ``v`` is less than 1.
        """

        if not isinstance(v, six.integer_types):
            # Include the offending value so the failure is diagnosable.
            raise TypeError(
                "fetch size must be an integer, got {0!r}".format(v))
        if v == self._fetch_size:
            return self

        if v < 1:
            raise QueryException("fetch size less than 1 is not allowed")

        clone = copy.deepcopy(self)
        clone._fetch_size = v
        return clone
项目:cqlmapper    作者:reddit    | 项目源码 | 文件源码
def timestamp_normalized(self):
        """
        Normalize ``self.timestamp``, which is expected to be a long, an
        int, a datetime or a timedelta.

        A falsy timestamp gives ``None`` and an integer is returned
        unchanged.  A timedelta is first added to ``datetime.now()``.  The
        resulting value is converted with
        ``int(time.mktime(timetuple) * 1e6 + microsecond)``.

        :return: None or an integer
        """
        stamp = self.timestamp
        if not stamp:
            return None
        if isinstance(stamp, six.integer_types):
            return stamp

        moment = datetime.now() + stamp if isinstance(stamp, timedelta) else stamp
        return int(time.mktime(moment.timetuple()) * 1e+6 + moment.microsecond)
项目:confu    作者:Maratyszcza    | 项目源码 | 文件源码
def validate_emscripten_memory_size(memory_size):
    """Translate a memory size setting into a list of ``-s`` arguments.

    ``memory_size`` may be:

    - a positive integer, giving ``TOTAL_MEMORY=<n>``;
    - a string of digits with an optional ``K`` or ``M`` suffix, which is
      scaled by 1024 or 1048576 and then handled as an integer;
    - the builtin ``all``, giving ``ALLOW_MEMORY_GROWTH=1``.

    Raises ``ValueError`` for a non-positive integer or a malformed
    string, and ``TypeError`` for any other type.
    """
    if isinstance(memory_size, six.integer_types):
        if memory_size > 0:
            return ["-s", "TOTAL_MEMORY=" + str(memory_size)]
        raise ValueError("Invalid memory size value {size}: a positive number expected".format(size=memory_size))

    if isinstance(memory_size, str):
        import re
        parsed = re.match(r"^(\d+)([MK]?)$", memory_size)
        if not parsed:
            raise ValueError("Invalid memory size value {size}: "
                             "an integer expected with an optional M(ega) or K(ilo) suffix "
                             "(e.g. 256M)".format(size=memory_size))

        digits, unit = parsed.groups()
        multiplier = {"": 1, "K": 1024, "M": 1048576}[unit]
        # Re-enter with the integer so the positivity check applies too.
        return validate_emscripten_memory_size(int(digits) * multiplier)

    if memory_size is all:
        return ["-s", "ALLOW_MEMORY_GROWTH=1"]

    raise TypeError("Invalid memory size type: an integer, string, or all expected")
项目:zun    作者:openstack    | 项目源码 | 文件源码
def validate(cls, value, minimum=None, maximum=None):
        """Return ``value`` as an integer within the optional bounds.

        ``None`` is passed through unchanged.  A non-integer value is
        converted with ``int()``; a failed conversion is logged and
        reported as ``exception.InvalidValue``.  A value below ``minimum``
        or above ``maximum`` also raises ``exception.InvalidValue``.
        """
        if value is None:
            return None

        if isinstance(value, six.integer_types):
            number = value
        else:
            try:
                number = int(value)
            except Exception:
                LOG.exception('Failed to convert value to int')
                raise exception.InvalidValue(value=value, type=cls.type_name)

        if minimum is not None and number < minimum:
            raise exception.InvalidValue(
                message=_("Integer '%(value)s' is smaller than '%(min)d'.")
                % {'value': number, 'min': minimum})

        if maximum is not None and number > maximum:
            raise exception.InvalidValue(
                message=_("Integer '%(value)s' is large than '%(max)d'.")
                % {'value': number, 'max': maximum})

        return number
项目:ironic-tempest-plugin    作者:openstack    | 项目源码 | 文件源码
def _determine_and_check_timeout_interval(timeout, default_timeout,
                                          interval, default_interval):
    """Apply the defaults and validate ``timeout`` and ``interval``.

    A ``None`` timeout or interval is replaced by its default.  Both
    resulting values must be integers >= 0, otherwise ``AssertionError``
    is raised.

    :return: the ``(timeout, interval)`` pair after applying defaults
    """
    timeout = default_timeout if timeout is None else timeout
    interval = default_interval if interval is None else interval

    acceptable = (isinstance(timeout, six.integer_types) and
                  isinstance(interval, six.integer_types) and
                  timeout >= 0 and interval >= 0)
    if not acceptable:
        details = {
            'timeout': timeout,
            'interval': interval,
            'default_timeout': default_timeout,
            'default_interval': default_interval,
        }
        raise AssertionError(
            'timeout and interval should be >= 0 or None, current values are: '
            '%(timeout)s, %(interval)s respectively. If timeout and/or '
            'interval are None, the default_timeout and default_interval are '
            'used, and they should be integers >= 0, current values are: '
            '%(default_timeout)s, %(default_interval)s respectively.' % details
        )
    return timeout, interval
项目:webapp2    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def _set_status(self, value):
        """Set ``self._status`` to a ``"<code> <message>"`` string.

        ``value`` is either an integer code or a string made of a code
        optionally followed by a space and a message.  When no message is
        given, ``Response.http_status_message(code)`` supplies it.

        Raises ``TypeError`` if ``value`` is neither an integer nor a
        string.
        """
        message = None
        # Accept long because urlfetch in App Engine returns codes as longs.
        if isinstance(value, six.integer_types):
            code = int(value)
        else:
            if isinstance(value, six.text_type):
                # Status messages have to be ASCII safe, so this is OK.
                value = str(value)

            if not isinstance(value, str):
                raise TypeError(
                    'You must set status to a string or integer (not %s)'
                    % type(value))

            # Split on the first space only: "<code> <message>".
            head, separator, tail = value.partition(' ')
            code = int(head)
            if separator:
                message = tail

        if not message:
            message = Response.http_status_message(code)
        self._status = '%d %s' % (code, message)
项目:charm-nova-compute    作者:openstack    | 项目源码 | 文件源码
def _validate_dict_data(self, expected, actual):
        """Check ``actual`` against the expectations in ``expected``.

        Every key of ``expected`` must be present in ``actual``.  An
        expected value that is a string, bool or integer is compared to
        the actual value with ``!=``; any other expected value is called
        with the actual value and must return something truthy.

        Returns ``None`` when all entries pass.  Otherwise returns a
        message for the first failing entry: ``"<key>:<actual value>"``
        for a mismatch, or a "does not exist" message for a missing key.
        """
        self.log.debug('actual: {}'.format(repr(actual)))
        self.log.debug('expected: {}'.format(repr(expected)))

        for key, wanted in six.iteritems(expected):
            if key not in actual:
                return "key '{}' does not exist".format(key)

            found = actual[key]
            if isinstance(wanted,
                          (six.string_types, bool, six.integer_types)):
                # Literal expectation: compare by value.
                failed = wanted != found
            else:
                # Callable expectation, such as not_null or valid_ip.
                failed = not wanted(found)

            if failed:
                return "{}:{}".format(key, found)
        return None
项目:sd2    作者:gae123    | 项目源码 | 文件源码
def process_expansions(dct):
    """Expand template placeholders in the values of ``dct`` in place.

    String values are passed through ``string.Template.safe_substitute``
    using the entries of the surrounding dictionary overlaid with
    ``os.environ`` (environment variables win on a name clash).  Lists
    and dictionaries are expanded recursively; the values of a nested
    dictionary are resolved against that nested dictionary.  Integers,
    booleans and values of any other type are returned unchanged.

    :param dct: dictionary whose values are replaced by their expansions
    :return: None; ``dct`` is modified in place
    """
    def expand(val, dct):
        if isinstance(val, six.integer_types) or isinstance(val, bool):
            return val
        if isinstance(val, six.string_types):
            # The mapping is only read by safe_substitute, so a shallow
            # copy suffices; deep-copying the whole dictionary for every
            # string value was wasted work.
            mapping = copy.copy(dct)
            mapping.update(os.environ)
            return string.Template(val).safe_substitute(mapping)
        if isinstance(val, list):
            return [expand(x, dct) for x in val]
        if isinstance(val, dict):
            return {k: expand(v, val) for k, v in six.iteritems(val)}
        return val

    # Only existing keys are reassigned, so the dictionary size stays
    # constant while it is being iterated.
    for key, val in six.iteritems(dct):
        dct[key] = expand(val, dct)
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def _asFileDescriptor(obj):
    """
    Return the non-negative integer file descriptor for ``obj``.

    ``obj`` is either an integer already, or an object whose ``fileno()``
    method returns one.

    :raises TypeError: if no integer can be obtained from ``obj``
    :raises ValueError: if the resulting descriptor is negative
    """
    candidate = obj
    if not isinstance(candidate, integer_types):
        # Not an integer itself: fall back to its fileno() method, if any.
        fileno = getattr(candidate, "fileno", None)
        if fileno is not None:
            candidate = fileno()

    if not isinstance(candidate, integer_types):
        raise TypeError("argument must be an int, or have a fileno() method.")
    if candidate < 0:
        raise ValueError(
            "file descriptor cannot be a negative integer (%i)" % (candidate,))

    return candidate
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def use_certificate_file(self, certfile, filetype=FILETYPE_PEM):
        """
        Load a certificate from a file into this context.

        :param certfile: The name of the certificate file (``bytes`` or
            ``unicode``).
        :param filetype: (optional) The encoding of the file, default is PEM
        :raises TypeError: if ``filetype`` is not an integer

        :return: None
        """
        path = _path_string(certfile)
        if not isinstance(filetype, integer_types):
            raise TypeError("filetype must be an integer")

        # A falsy result from the library call signals failure.
        if not _lib.SSL_CTX_use_certificate_file(
                self._context, path, filetype):
            _raise_current_error()
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def use_privatekey_file(self, keyfile, filetype=_UNSPECIFIED):
        """
        Load a private key from a file into this context.

        :param keyfile: The name of the key file (``bytes`` or ``unicode``)
        :param filetype: (optional) The encoding of the file, default is PEM
        :raises TypeError: if an explicit ``filetype`` is not an integer

        :return: None
        """
        path = _path_string(keyfile)

        if filetype is _UNSPECIFIED:
            encoding = FILETYPE_PEM
        elif isinstance(filetype, integer_types):
            encoding = filetype
        else:
            raise TypeError("filetype must be an integer")

        # A falsy result from the library call signals failure.
        if not _lib.SSL_CTX_use_PrivateKey_file(
                self._context, path, encoding):
            self._raise_passphrase_exception()
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def bio_read(self, bufsiz):
        """
        Read up to ``bufsiz`` bytes from the ``_from_ssl`` BIO.

        When using non-socket connections this returns the "dirty" data
        that would have traveled away on the network.

        :param bufsiz: The maximum number of bytes to read
        :raises TypeError: if ``self._from_ssl`` is None or ``bufsiz`` is
            not an integer
        :return: The string read.
        """
        source_bio = self._from_ssl
        if source_bio is None:
            raise TypeError("Connection sock was not None")
        if not isinstance(bufsiz, integer_types):
            raise TypeError("bufsiz must be an integer")

        storage = _ffi.new("char[]", bufsiz)
        count = _lib.BIO_read(source_bio, storage, bufsiz)
        if count <= 0:
            self._handle_bio_errors(source_bio, count)

        return _ffi.buffer(storage, count)[:]
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def bytes(num_bytes):
    """
    Fetch random bytes from the PRNG and return them as a string.

    Thin wrapper around the C function ``RAND_bytes``.

    :param num_bytes: The number of bytes to fetch.

    :raises TypeError: if ``num_bytes`` is not an integer.
    :raises ValueError: if ``num_bytes`` is negative.
    :return: A string of random bytes.
    """
    if not isinstance(num_bytes, _integer_types):
        raise TypeError("num_bytes must be an integer")
    elif num_bytes < 0:
        raise ValueError("num_bytes must not be negative")

    random_buffer = _ffi.new("unsigned char[]", num_bytes)
    if _lib.RAND_bytes(random_buffer, num_bytes) == -1:
        # TODO: No tests for this code path.  Triggering a RAND_bytes failure
        # might involve supplying a custom ENGINE?  That's hard.
        _raise_current_error()

    return _ffi.buffer(random_buffer)[:]
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def __init__(self, key, length, algorithm, backend,
                 enforce_key_length=True):
        """
        Validate the HOTP parameters and store them on the instance.

        :param key: The secret key; must be at least 16 items long when
            ``enforce_key_length`` is ``True``.
        :param length: Integer between 6 and 8 inclusive.
        :param algorithm: An instance of ``SHA1``, ``SHA256`` or ``SHA512``.
        :param backend: An object implementing ``HMACBackend``.
        :param enforce_key_length: Only the exact value ``True`` enables the
            minimum key length check.

        :raises UnsupportedAlgorithm: if ``backend`` is not an
            ``HMACBackend``.
        :raises ValueError: if the key is too short or ``length`` is out of
            range.
        :raises TypeError: if ``length`` is not an integer or ``algorithm``
            is not one of the accepted hash types.
        """
        if not isinstance(backend, HMACBackend):
            raise UnsupportedAlgorithm(
                "Backend object does not implement HMACBackend.",
                _Reasons.BACKEND_MISSING_INTERFACE
            )

        key_too_short = len(key) < 16
        if key_too_short and enforce_key_length is True:
            raise ValueError("Key length has to be at least 128 bits.")

        if not isinstance(length, six.integer_types):
            raise TypeError("Length parameter must be an integer type.")
        if not 6 <= length <= 8:
            raise ValueError("Length of HOTP has to be between 6 to 8.")

        accepted_algorithms = (SHA1, SHA256, SHA512)
        if not isinstance(algorithm, accepted_algorithms):
            raise TypeError("Algorithm must be SHA1, SHA256 or SHA512.")

        self._key, self._length = key, length
        self._algorithm, self._backend = algorithm, backend
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def __init__(self, ca, path_length):
        """
        Validate and store the ``ca`` flag and optional ``path_length``.

        :param ca: Must be a ``bool``.
        :param path_length: ``None`` or a non-negative integer; must be
            ``None`` whenever ``ca`` is false.

        :raises TypeError: if ``ca`` is not a bool, or if ``path_length`` is
            neither ``None`` nor a non-negative integer.
        :raises ValueError: if ``path_length`` is given while ``ca`` is
            false.
        """
        if not isinstance(ca, bool):
            raise TypeError("ca must be a boolean value")

        has_path_length = path_length is not None
        if has_path_length and not ca:
            raise ValueError("path_length must be None when ca is False")

        if has_path_length:
            is_integer = isinstance(path_length, six.integer_types)
            # The range check only runs once the type check has passed.
            if not is_integer or path_length < 0:
                raise TypeError(
                    "path_length must be a non-negative integer or None"
                )

        self._ca, self._path_length = ca, path_length
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def __init__(self, require_explicit_policy, inhibit_policy_mapping):
        """
        Validate and store the two policy-constraint values.

        Each argument may be ``None`` or an integer, but not both may be
        ``None``.  Only the type is checked here; the sign of the values is
        not inspected.

        :raises TypeError: if either argument is neither ``None`` nor an
            integer.
        :raises ValueError: if both arguments are ``None``.
        """
        if require_explicit_policy is not None:
            if not isinstance(require_explicit_policy, six.integer_types):
                raise TypeError(
                    "require_explicit_policy must be a non-negative integer or "
                    "None"
                )

        if inhibit_policy_mapping is not None:
            if not isinstance(inhibit_policy_mapping, six.integer_types):
                raise TypeError(
                    "inhibit_policy_mapping must be a non-negative integer or None"
                )

        nothing_given = (
            require_explicit_policy is None and
            inhibit_policy_mapping is None
        )
        if nothing_given:
            raise ValueError(
                "At least one of require_explicit_policy and "
                "inhibit_policy_mapping must not be None"
            )

        self._require_explicit_policy = require_explicit_policy
        self._inhibit_policy_mapping = inhibit_policy_mapping
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def serial_number(self, number):
        """
        Sets the certificate serial number.

        Returns a new ``CertificateBuilder`` built from this builder's
        fields with ``number`` as the serial number.

        :raises TypeError: if ``number`` is not an integer.
        :raises ValueError: if a serial number was already set, or if
            ``number`` is not positive, or needs 160 bits or more.
        """
        if not isinstance(number, six.integer_types):
            raise TypeError('Serial number must be of integral type.')

        already_set = self._serial_number is not None
        if already_set:
            raise ValueError('The serial number may only be set once.')
        if number < 1:
            raise ValueError('The serial number should be positive.')

        # ASN.1 integers are always signed, so most significant bit must be
        # zero.
        max_bits = 159  # As defined in RFC 5280
        if utils.bit_length(number) > max_bits:
            raise ValueError('The serial number should not be more than 159 '
                             'bits.')

        return CertificateBuilder(
            self._issuer_name,
            self._subject_name,
            self._public_key,
            number,
            self._not_valid_before,
            self._not_valid_after,
            self._extensions,
        )
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def serial_number(self, number):
        """
        Set the serial number of the revoked certificate.

        Returns a new ``RevokedCertificateBuilder`` holding ``number``
        together with this builder's revocation date and extensions.

        :raises TypeError: if ``number`` is not an integer.
        :raises ValueError: if a serial number was already set, or if
            ``number`` is not positive, or needs 160 bits or more.
        """
        if not isinstance(number, six.integer_types):
            raise TypeError('Serial number must be of integral type.')

        already_set = self._serial_number is not None
        if already_set:
            raise ValueError('The serial number may only be set once.')
        if number < 1:
            raise ValueError('The serial number should be positive')

        # ASN.1 integers are always signed, so most significant bit must be
        # zero.
        max_bits = 159  # As defined in RFC 5280
        if utils.bit_length(number) > max_bits:
            raise ValueError('The serial number should not be more than 159 '
                             'bits.')

        return RevokedCertificateBuilder(
            number,
            self._revocation_date,
            self._extensions,
        )
项目:ecpy    作者:elliptic-shiho    | 项目源码 | 文件源码
def distortion_map(s):
    r"""
    Return \phi(s), where \phi is the distortion map.

    IMPORTANT: If you want to use this function,
               definition field should be Extended Finite Field.
    Polynomial: x^2+1 or x^2+x+1

    Each coordinate of ``s`` is first turned into a tuple: a plain integer
    ``v`` becomes ``(v, 0)``, anything else is passed to ``tuple()``.  The
    tuples are then rearranged depending on ``s.group.field.t``:

    * ``t == 1``: ``x -> (-x[0], -x[1])``, ``y -> (y[1], y[0])``
    * ``t == 2``: ``x -> (x[1], x[0])``,   ``y -> (y[0], y[1])``
    * any other value: the coordinate tuples are used unchanged.

    NOTE(review): which polynomial corresponds to which ``t`` is not
    visible from this function -- confirm against the field definition.

    :param s: A point exposing ``x``, ``y`` and ``group``.
    :return: A new object of the same class as ``s``, constructed as
        ``s.__class__(s.group, x, y)``.
    """
    from six import integer_types

    def to_tuple(value):
        # Exact type match on purpose: only plain integers are widened to
        # a pair, every other coordinate type is expected to be iterable.
        if type(value) in integer_types:
            return (value, 0)
        return tuple(value)

    x = to_tuple(s.x)
    y = to_tuple(s.y)
    t = s.group.field.t
    if t == 1:
        x = (-x[0], -x[1])
        y = (y[1], y[0])
    elif t == 2:
        x = (x[1], x[0])
        y = (y[0], y[1])
    return s.__class__(s.group, x, y)
项目:oldchain-client    作者:mediachain    | 项目源码 | 文件源码
def is_valid_code(code):
    """Tell whether ``code`` is a valid digest algorithm code.

    A code is valid when ``is_app_code`` accepts it, or when it is an
    integer contained in ``CODES``.  Everything else is invalid.

    >>> is_valid_code(SHA1)
    True
    >>> is_valid_code(0)
    True
    """
    if is_app_code(code):
        return True

    # Non-integers can never be a known code.
    return isinstance(code, six.integer_types) and code in CODES
项目:charm-ceph-osd    作者:openstack    | 项目源码 | 文件源码
def _validate_dict_data(self, expected, actual):
        """Check ``actual`` against the expectations in ``expected``.

           Each value in ``expected`` is either a literal (string, bool,
           int or long) that must equal the matching value in ``actual``,
           or a callable that receives the actual value and returns a bool.

           Returns None when every expectation holds.  Otherwise returns a
           message for the first failing key: ``"<key>:<actual value>"``
           for a mismatch, or a "does not exist" message for a missing key.
           """
        self.log.debug('actual: {}'.format(repr(actual)))
        self.log.debug('expected: {}'.format(repr(expected)))

        literal_types = (six.string_types, bool, six.integer_types)
        for key, wanted in six.iteritems(expected):
            if key not in actual:
                return "key '{}' does not exist".format(key)

            found = actual[key]
            if isinstance(wanted, literal_types):
                # handle explicit values
                failed = wanted != found
            else:
                # handle function pointers, such as not_null or valid_ip
                failed = not wanted(found)

            if failed:
                return "{}:{}".format(key, found)
        return None
项目:MMdnn    作者:Microsoft    | 项目源码 | 文件源码
def assign_attr_value(attr, val):
    """Assign value to AttrValue proto according to data type.

    The field of ``attr`` that gets written depends on the type of ``val``:

    * ``bool``                 -> ``attr.b``
    * integer                  -> ``attr.i``
    * ``float``                -> ``attr.f``
    * binary or text string    -> ``attr.s`` (text is encoded first)
    * ``TensorShape``          -> merged into ``attr.shape``
    * list of integers         -> ``attr.list.i``
    * list of ``TensorShape``  -> ``attr.list.shape``

    An empty list leaves ``attr`` untouched.  The element type of a list is
    decided from its first item only.

    :raises NotImplementedError: for any other value type or list element
        type.
    """
    from mmdnn.conversion.common.IR.graph_pb2 import TensorShape

    # bool must be tested before the integer types, since bool is an int.
    if isinstance(val, bool):
        attr.b = val
    elif isinstance(val, integer_types):
        attr.i = val
    elif isinstance(val, float):
        attr.f = val
    elif isinstance(val, (binary_type, text_type)):
        # Only text needs encoding; binary data is stored as-is.  (Checking
        # for an ``encode`` attribute would also match Python 2 byte
        # strings and force an implicit ASCII decode on them.)
        if isinstance(val, text_type):
            val = val.encode()
        attr.s = val
    elif isinstance(val, TensorShape):
        attr.shape.MergeFromString(val.SerializeToString())
    elif isinstance(val, list):
        if not val:
            return
        first = val[0]
        if isinstance(first, integer_types):
            attr.list.i.extend(val)
        elif isinstance(first, TensorShape):
            attr.list.shape.extend(val)
        else:
            raise NotImplementedError('AttrValue cannot be of list[{}].'.format(val[0]))
    else:
        raise NotImplementedError('AttrValue cannot be of %s' % type(val))
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _asFileDescriptor(obj):
    """
    Resolve ``obj`` to a non-negative integer file descriptor.

    ``obj`` may already be an integer, or may offer a ``fileno()`` method
    whose result is used instead.

    :raises TypeError: if no integer descriptor can be obtained.
    :raises ValueError: if the descriptor is negative.
    :return: The integer file descriptor.
    """
    candidate = obj
    if not isinstance(candidate, integer_types):
        fileno = getattr(candidate, "fileno", None)
        if fileno is not None:
            candidate = fileno()

    # Covers both "no fileno() method" and "fileno() returned a non-int".
    if not isinstance(candidate, integer_types):
        raise TypeError("argument must be an int, or have a fileno() method.")
    if candidate < 0:
        raise ValueError(
            "file descriptor cannot be a negative integer (%i)" % (candidate,))

    return candidate
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def use_certificate_file(self, certfile, filetype=FILETYPE_PEM):
        """
        Load a certificate into this context from a file.

        :param certfile: The name of the certificate file (``bytes`` or
            ``unicode``); it is passed through ``_path_string`` before use.
        :param filetype: (optional) The encoding of the file, default is PEM

        :raises TypeError: if ``filetype`` is not an integer.
        :return: None
        """
        cert_path = _path_string(certfile)
        if not isinstance(filetype, integer_types):
            raise TypeError("filetype must be an integer")

        loaded = _lib.SSL_CTX_use_certificate_file(
            self._context, cert_path, filetype
        )
        if loaded:
            return
        # A falsy result is treated as a failure.
        _raise_current_error()