我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.integer_types()。
def _validate_dict_data(self, expected, actual): """Validate dictionary data. Compare expected dictionary data vs actual dictionary data. The values in the 'expected' dictionary can be strings, bools, ints, longs, or can be a function that evaluates a variable and returns a bool. """ self.log.debug('actual: {}'.format(repr(actual))) self.log.debug('expected: {}'.format(repr(expected))) for k, v in six.iteritems(expected): if k in actual: if (isinstance(v, six.string_types) or isinstance(v, bool) or isinstance(v, six.integer_types)): # handle explicit values if v != actual[k]: return "{}:{}".format(k, actual[k]) # handle function pointers, such as not_null or valid_ip elif not v(actual[k]): return "{}:{}".format(k, actual[k]) else: return "key '{}' does not exist".format(k) return None
def test_yahoo_bars_to_panel_source(self): env = TradingEnvironment() finder = AssetFinder(env.engine) stocks = ['AAPL', 'GE'] env.write_data(equities_identifiers=stocks) start = pd.datetime(1993, 1, 1, 0, 0, 0, 0, pytz.utc) end = pd.datetime(2002, 1, 1, 0, 0, 0, 0, pytz.utc) data = factory.load_bars_from_yahoo(stocks=stocks, indexes={}, start=start, end=end) check_fields = ['sid', 'open', 'high', 'low', 'close', 'volume', 'price'] copy_panel = data.copy() sids = finder.map_identifier_index_to_sids( data.items, data.major_axis[0] ) copy_panel.items = sids source = DataPanelSource(copy_panel) for event in source: for check_field in check_fields: self.assertIn(check_field, event) self.assertTrue(isinstance(event['volume'], (integer_types))) self.assertTrue(event['sid'] in sids)
def fetch_size(self, v): """ Sets the number of rows that are fetched at a time. *Note that driver's default fetch size is 5000.* .. code-block:: python for user in User.objects().fetch_size(500): print(user) """ if not isinstance(v, six.integer_types): raise TypeError if v == self._fetch_size: return self if v < 1: raise QueryException("fetch size less than 1 is not allowed") clone = copy.deepcopy(self) clone._fetch_size = v return clone
def __init__(self, value): """ Initializer value can be: - integer_type: absolute nanoseconds in the day - datetime.time: built-in time - string_type: a string time of the form "HH:MM:SS[.mmmuuunnn]" """ if isinstance(value, six.integer_types): self._from_timestamp(value) elif isinstance(value, datetime.time): self._from_time(value) elif isinstance(value, six.string_types): self._from_timestring(value) else: raise TypeError('Time arguments must be a whole number, datetime.time, or string')
def __init__(self, value): """ Initializer value can be: - integer_type: absolute days from epoch (1970, 1, 1). Can be negative. - datetime.date: built-in date - string_type: a string time of the form "yyyy-mm-dd" """ if isinstance(value, six.integer_types): self.days_from_epoch = value elif isinstance(value, (datetime.date, datetime.datetime)): self._from_timetuple(value.timetuple()) elif isinstance(value, six.string_types): self._from_datestring(value) else: raise TypeError('Date arguments must be a whole number, datetime.date, or string')
def _asFileDescriptor(obj): fd = None if not isinstance(obj, integer_types): meth = getattr(obj, "fileno", None) if meth is not None: obj = meth() if isinstance(obj, integer_types): fd = obj if not isinstance(fd, integer_types): raise TypeError("argument must be an int, or have a fileno() method.") elif fd < 0: raise ValueError( "file descriptor cannot be a negative integer (%i)" % (fd,)) return fd
def use_certificate_file(self, certfile, filetype=FILETYPE_PEM): """ Load a certificate from a file :param certfile: The name of the certificate file (``bytes`` or ``unicode``). :param filetype: (optional) The encoding of the file, default is PEM :return: None """ certfile = _path_string(certfile) if not isinstance(filetype, integer_types): raise TypeError("filetype must be an integer") use_result = _lib.SSL_CTX_use_certificate_file( self._context, certfile, filetype ) if not use_result: _raise_current_error()
def use_privatekey_file(self, keyfile, filetype=_UNSPECIFIED): """ Load a private key from a file :param keyfile: The name of the key file (``bytes`` or ``unicode``) :param filetype: (optional) The encoding of the file, default is PEM :return: None """ keyfile = _path_string(keyfile) if filetype is _UNSPECIFIED: filetype = FILETYPE_PEM elif not isinstance(filetype, integer_types): raise TypeError("filetype must be an integer") use_result = _lib.SSL_CTX_use_PrivateKey_file( self._context, keyfile, filetype) if not use_result: self._raise_passphrase_exception()
def bio_read(self, bufsiz): """ When using non-socket connections this function reads the "dirty" data that would have traveled away on the network. :param bufsiz: The maximum number of bytes to read :return: The string read. """ if self._from_ssl is None: raise TypeError("Connection sock was not None") if not isinstance(bufsiz, integer_types): raise TypeError("bufsiz must be an integer") buf = _ffi.new("char[]", bufsiz) result = _lib.BIO_read(self._from_ssl, buf, bufsiz) if result <= 0: self._handle_bio_errors(self._from_ssl, result) return _ffi.buffer(buf, result)[:]
def bytes(num_bytes): """ Get some random bytes from the PRNG as a string. This is a wrapper for the C function ``RAND_bytes``. :param num_bytes: The number of bytes to fetch. :return: A string of random bytes. """ if not isinstance(num_bytes, _integer_types): raise TypeError("num_bytes must be an integer") if num_bytes < 0: raise ValueError("num_bytes must not be negative") result_buffer = _ffi.new("char[]", num_bytes) result_code = _lib.RAND_bytes(result_buffer, num_bytes) if result_code == -1: # TODO: No tests for this code path. Triggering a RAND_bytes failure # might involve supplying a custom ENGINE? That's hard. _raise_current_error() return _ffi.buffer(result_buffer)[:]
def __init__(self, key, length, algorithm, backend): if not isinstance(backend, HMACBackend): raise UnsupportedAlgorithm( "Backend object does not implement HMACBackend.", _Reasons.BACKEND_MISSING_INTERFACE ) if len(key) < 16: raise ValueError("Key length has to be at least 128 bits.") if not isinstance(length, six.integer_types): raise TypeError("Length parameter must be an integer type.") if length < 6 or length > 8: raise ValueError("Length of HOTP has to be between 6 to 8.") if not isinstance(algorithm, (SHA1, SHA256, SHA512)): raise TypeError("Algorithm must be SHA1, SHA256 or SHA512.") self._key = key self._length = length self._algorithm = algorithm self._backend = backend
def __init__(self, ca, path_length): if not isinstance(ca, bool): raise TypeError("ca must be a boolean value") if path_length is not None and not ca: raise ValueError("path_length must be None when ca is False") if ( path_length is not None and (not isinstance(path_length, six.integer_types) or path_length < 0) ): raise TypeError( "path_length must be a non-negative integer or None" ) self._ca = ca self._path_length = path_length
def serial_number(self, number): """ Sets the certificate serial number. """ if not isinstance(number, six.integer_types): raise TypeError('Serial number must be of integral type.') if self._serial_number is not None: raise ValueError('The serial number may only be set once.') if number < 0: raise ValueError('The serial number should be non-negative.') if utils.bit_length(number) > 160: # As defined in RFC 5280 raise ValueError('The serial number should not be more than 160 ' 'bits.') return CertificateBuilder( self._issuer_name, self._subject_name, self._public_key, number, self._not_valid_before, self._not_valid_after, self._extensions )
def set_verify(self, mode, callback): """ Set the verify mode and verify callback :param mode: The verify mode, this is either VERIFY_NONE or VERIFY_PEER combined with possible other flags :param callback: The Python callback to use :return: None See SSL_CTX_set_verify(3SSL) for further details. """ if not isinstance(mode, integer_types): raise TypeError("mode must be an integer") if not callable(callback): raise TypeError("callback must be callable") self._verify_helper = _VerifyHelper(callback) self._verify_callback = self._verify_helper.callback _lib.SSL_CTX_set_verify(self._context, mode, self._verify_callback)
def __init__(self, require_explicit_policy, inhibit_policy_mapping): if require_explicit_policy is not None and not isinstance( require_explicit_policy, six.integer_types ): raise TypeError( "require_explicit_policy must be a non-negative integer or " "None" ) if inhibit_policy_mapping is not None and not isinstance( inhibit_policy_mapping, six.integer_types ): raise TypeError( "inhibit_policy_mapping must be a non-negative integer or None" ) if inhibit_policy_mapping is None and require_explicit_policy is None: raise ValueError( "At least one of require_explicit_policy and " "inhibit_policy_mapping must not be None" ) self._require_explicit_policy = require_explicit_policy self._inhibit_policy_mapping = inhibit_policy_mapping
def __getitem__(self, s): if isinstance(s, six.integer_types): # Single index assert s >= 0, 'negative indexes are not supported' qs = copy(self) qs._limits = (s, 1) return six.next(iter(qs)) else: # Slice assert s.step in (None, 1), 'step is not supported in slices' start = s.start or 0 stop = s.stop or 2**63 - 1 assert start >= 0 and stop >= 0, 'negative indexes are not supported' assert start <= stop, 'start of slice cannot be smaller than its end' qs = copy(self) qs._limits = (start, stop - start) return qs
def fetch_size(self, v): """Sets the number of rows that are fetched at a time. *Note that driver's default fetch size is 5000.* .. code-block:: python for user in User.objects().fetch_size(500): print(user) """ if not isinstance(v, six.integer_types): raise TypeError if v == self._fetch_size: return self if v < 1: raise QueryException("fetch size less than 1 is not allowed") clone = copy.deepcopy(self) clone._fetch_size = v return clone
def timestamp_normalized(self): """ we're expecting self.timestamp to be either a long, int, a datetime, or a timedelta :return: """ if not self.timestamp: return None if isinstance(self.timestamp, six.integer_types): return self.timestamp if isinstance(self.timestamp, timedelta): tmp = datetime.now() + self.timestamp else: tmp = self.timestamp return int(time.mktime(tmp.timetuple()) * 1e+6 + tmp.microsecond)
def validate_emscripten_memory_size(memory_size): if isinstance(memory_size, six.integer_types): if memory_size <= 0: raise ValueError("Invalid memory size value {size}: a positive number expected".format(size=memory_size)) return ["-s", "TOTAL_MEMORY=" + str(memory_size)] elif isinstance(memory_size, str): import re match = re.match(r"^(\d+)([MK]?)$", memory_size) if not match: raise ValueError("Invalid memory size value {size}: " "an integer expected with an optional M(ega) or K(ilo) suffix " "(e.g. 256M)".format(size=memory_size)) number = int(match.group(1)) suffix = match.group(2) memory_size = {"": number, "K": number * 1024, "M": number * 1048576}[suffix] return validate_emscripten_memory_size(memory_size) elif memory_size is all: return ["-s", "ALLOW_MEMORY_GROWTH=1"] else: raise TypeError("Invalid memory size type: an integer, string, or all expected")
def validate(cls, value, minimum=None, maximum=None): if value is None: return None if not isinstance(value, six.integer_types): try: value = int(value) except Exception: LOG.exception('Failed to convert value to int') raise exception.InvalidValue(value=value, type=cls.type_name) if minimum is not None and value < minimum: message = _("Integer '%(value)s' is smaller than " "'%(min)d'.") % {'value': value, 'min': minimum} raise exception.InvalidValue(message=message) if maximum is not None and value > maximum: message = _("Integer '%(value)s' is large than " "'%(max)d'.") % {'value': value, 'max': maximum} raise exception.InvalidValue(message=message) return value
def _determine_and_check_timeout_interval(timeout, default_timeout, interval, default_interval): if timeout is None: timeout = default_timeout if interval is None: interval = default_interval if (not isinstance(timeout, six.integer_types) or not isinstance(interval, six.integer_types) or timeout < 0 or interval < 0): raise AssertionError( 'timeout and interval should be >= 0 or None, current values are: ' '%(timeout)s, %(interval)s respectively. If timeout and/or ' 'interval are None, the default_timeout and default_interval are ' 'used, and they should be integers >= 0, current values are: ' '%(default_timeout)s, %(default_interval)s respectively.' % dict( timeout=timeout, interval=interval, default_timeout=default_timeout, default_interval=default_interval) ) return timeout, interval
def _set_status(self, value): """The status string, including code and message.""" message = None # Accept long because urlfetch in App Engine returns codes as longs. if isinstance(value, six.integer_types): code = int(value) else: if isinstance(value, six.text_type): # Status messages have to be ASCII safe, so this is OK. value = str(value) if not isinstance(value, str): raise TypeError( 'You must set status to a string or integer (not %s)' % type(value)) parts = value.split(' ', 1) code = int(parts[0]) if len(parts) == 2: message = parts[1] message = message or Response.http_status_message(code) self._status = '%d %s' % (code, message)
def process_expansions(dct): def expand(val, dct): if isinstance(val, six.integer_types) or isinstance(val, bool): return val if isinstance(val, six.string_types): dct2 = copy.deepcopy(dct) for env_key, env_val in six.iteritems(os.environ): dct2[env_key] = env_val return string.Template(val).safe_substitute(dct2) if isinstance(val, list): return [expand(x, dct) for x in val] if isinstance(val, dict): return {k: expand(v,val) for k,v in six.iteritems(val)} return val for key,val in six.iteritems(dct): nval = expand(val, dct) dct[key] = nval
def bytes(num_bytes): """ Get some random bytes from the PRNG as a string. This is a wrapper for the C function ``RAND_bytes``. :param num_bytes: The number of bytes to fetch. :return: A string of random bytes. """ if not isinstance(num_bytes, _integer_types): raise TypeError("num_bytes must be an integer") if num_bytes < 0: raise ValueError("num_bytes must not be negative") result_buffer = _ffi.new("unsigned char[]", num_bytes) result_code = _lib.RAND_bytes(result_buffer, num_bytes) if result_code == -1: # TODO: No tests for this code path. Triggering a RAND_bytes failure # might involve supplying a custom ENGINE? That's hard. _raise_current_error() return _ffi.buffer(result_buffer)[:]
def __init__(self, key, length, algorithm, backend, enforce_key_length=True): if not isinstance(backend, HMACBackend): raise UnsupportedAlgorithm( "Backend object does not implement HMACBackend.", _Reasons.BACKEND_MISSING_INTERFACE ) if len(key) < 16 and enforce_key_length is True: raise ValueError("Key length has to be at least 128 bits.") if not isinstance(length, six.integer_types): raise TypeError("Length parameter must be an integer type.") if length < 6 or length > 8: raise ValueError("Length of HOTP has to be between 6 to 8.") if not isinstance(algorithm, (SHA1, SHA256, SHA512)): raise TypeError("Algorithm must be SHA1, SHA256 or SHA512.") self._key = key self._length = length self._algorithm = algorithm self._backend = backend
def serial_number(self, number): """ Sets the certificate serial number. """ if not isinstance(number, six.integer_types): raise TypeError('Serial number must be of integral type.') if self._serial_number is not None: raise ValueError('The serial number may only be set once.') if number <= 0: raise ValueError('The serial number should be positive.') # ASN.1 integers are always signed, so most significant bit must be # zero. if utils.bit_length(number) >= 160: # As defined in RFC 5280 raise ValueError('The serial number should not be more than 159 ' 'bits.') return CertificateBuilder( self._issuer_name, self._subject_name, self._public_key, number, self._not_valid_before, self._not_valid_after, self._extensions )
def serial_number(self, number): if not isinstance(number, six.integer_types): raise TypeError('Serial number must be of integral type.') if self._serial_number is not None: raise ValueError('The serial number may only be set once.') if number <= 0: raise ValueError('The serial number should be positive') # ASN.1 integers are always signed, so most significant bit must be # zero. if utils.bit_length(number) >= 160: # As defined in RFC 5280 raise ValueError('The serial number should not be more than 159 ' 'bits.') return RevokedCertificateBuilder( number, self._revocation_date, self._extensions )
def distortion_map(s): """ IMPORTANT: If you want to use this function, definition field should be Extended Finite Field. return \phi(self), \phi is Distortion map Polynomial: x^2+1 or x^2+x+1 """ def to_tuple(x): from six import integer_types if type(x) in integer_types: return (x, 0) return tuple(x) x = to_tuple(s.x) y = to_tuple(s.y) if s.group.field.t == 1: x = (-x[0], -x[1]) y = (y[1], y[0]) elif s.group.field.t == 2: x = (x[1], x[0]) y = (y[0], y[1]) return s.__class__(s.group, x, y)
def is_valid_code(code): """Check if the digest algorithm code is valid. >>> is_valid_code(SHA1) True >>> is_valid_code(0) True """ if is_app_code(code): return True elif isinstance(code, six.integer_types): return code in CODES else: return False
def assign_attr_value(attr, val): from mmdnn.conversion.common.IR.graph_pb2 import TensorShape '''Assign value to AttrValue proto according to data type.''' if isinstance(val, bool): attr.b = val elif isinstance(val, integer_types): attr.i = val elif isinstance(val, float): attr.f = val elif isinstance(val, binary_type) or isinstance(val, text_type): if hasattr(val, 'encode'): val = val.encode() attr.s = val elif isinstance(val, TensorShape): attr.shape.MergeFromString(val.SerializeToString()) elif isinstance(val, list): if not val: return if isinstance(val[0], integer_types): attr.list.i.extend(val) elif isinstance(val[0], TensorShape): attr.list.shape.extend(val) else: raise NotImplementedError('AttrValue cannot be of list[{}].'.format(val[0])) else: raise NotImplementedError('AttrValue cannot be of %s' % type(val))