我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用django.utils.six.binary_type()。
def effective_default(self, field): """ Returns a field's effective database default value """ if field.has_default(): default = field.get_default() elif not field.null and field.blank and field.empty_strings_allowed: if field.get_internal_type() == "BinaryField": default = six.binary_type() else: default = six.text_type() else: default = None # If it's a callable, call it if six.callable(default): default = default() # Run it through the field's get_db_prep_save method so we can send it # to the database. default = field.get_db_prep_save(default, self.connection) return default
def test_sign_unsign(self): """sign/unsign should be reversible""" signer = signing.BytesSigner('predictable-secret') examples = [ b'q;wjmbk;wkmb', b'3098247529087', b'3098247:529:087:', b'jkw osanteuh ,rcuh nthu aou oauh ,ud du', b'\u2019', ] if six.PY2: examples.append(b'a byte string') for example in examples: signed = signer.sign(example) self.assertIsInstance(signed, six.binary_type) self.assertNotEqual(force_str(example), signed) self.assertEqual(example, signer.unsign(signed))
def save(self): """ ?? manifest ?? ?? ?? Layers ?Links ?? :return: """ data_string = self.data_string if not isinstance(data_string, six.binary_type): data_string = data_string.encode("utf-8") manifest_digest = storage.save_manifest(data_string) # save tags current_tag_path = storage.path_spec.get_tag_current_path(name=self.name, tag_name=self.reference) storage.link(manifest_digest, current_tag_path) tag_index_path = storage.path_spec.get_tag_index_path(name=self.name, tag_name=self.reference, digest=manifest_digest) storage.link(manifest_digest, tag_index_path) # Save reference reference_path = storage.path_spec.get_reference_path(name=self.name, digest=manifest_digest) storage.link(manifest_digest, reference_path) for layer in self.layers: layer_path = storage.path_spec.get_layer_path(name=self.name, digest=layer) storage.link(layer, layer_path)
def _decode(value): """ Base64 ?????"=" ???????“?”???Docker????????? :param value: :return: """ length = len(value) % 4 if length in (2, 3,): value += (4 - length) * "=" elif length != 0: raise ValueError("Invalid base64 string") if not isinstance(value, six.binary_type): value = value.encode() return base64.urlsafe_b64decode(value)
def test_read(self): key_data = keys.read_key_file('test-2048.pem') self.assertIsInstance(key_data, six.binary_type) key_data_lines = key_data.decode('utf-8').split('\n') self.assertIn('-----BEGIN PRIVATE KEY-----', key_data_lines) self.assertIn('-----END PRIVATE KEY-----', key_data_lines) self.assertIn('-----BEGIN PUBLIC KEY-----', key_data_lines) self.assertIn('-----END PUBLIC KEY-----', key_data_lines)
def attach(self, filename=None, content=None, mimetype=None): """ Attaches a file with the given filename and content. The filename can be omitted and the mimetype is guessed, if not provided. If the first parameter is a MIMEBase subclass it is inserted directly into the resulting message attachments. For a text/* mimetype (guessed or specified), when a bytes object is specified as content, it will be decoded as UTF-8. If that fails, the mimetype will be set to DEFAULT_ATTACHMENT_MIME_TYPE and the content is not decoded. """ if isinstance(filename, MIMEBase): assert content is None assert mimetype is None self.attachments.append(filename) else: assert content is not None if not mimetype: mimetype, _ = mimetypes.guess_type(filename) if not mimetype: mimetype = DEFAULT_ATTACHMENT_MIME_TYPE basetype, subtype = mimetype.split('/', 1) if basetype == 'text': if isinstance(content, six.binary_type): try: content = content.decode('utf-8') except UnicodeDecodeError: # If mimetype suggests the file is text but it's actually # binary, read() will raise a UnicodeDecodeError on Python 3. mimetype = DEFAULT_ATTACHMENT_MIME_TYPE self.attachments.append((filename, content, mimetype))
def to_internal_value(self, data): try: if self.binary or getattr(data, 'is_json_string', False): if isinstance(data, six.binary_type): data = data.decode('utf-8') return json.loads(data) else: json.dumps(data) except (TypeError, ValueError): self.fail('invalid') return data
def unicode_http_header(value): # Coerce HTTP header value to unicode. if isinstance(value, six.binary_type): return value.decode('iso-8859-1') return value
def test_settings_has_key(self): key = settings.CRYPTOGRAPHY_KEY self.assertIsNotNone(key) self.assertIsInstance(key, six.binary_type)
def _from_python(self, value): """Convert more Python data types to ES-understandable JSON.""" iso = self._iso_datetime(value) if iso: return iso elif isinstance(value, six.binary_type): # TODO: Be stricter. return six.text_type(value, errors='replace') elif isinstance(value, set): return list(value) return value
def context(self): ctx = {} if six.PY2 and isinstance(self.raw_response, six.text_type): self.raw_response = self.raw_response.encode('utf8') for key, val in parse_qsl(self.raw_response): if isinstance(key, six.binary_type): key = key.decode('utf8') if isinstance(val, six.binary_type): val = val.decode('utf8') ctx[key] = [val] return ctx
def to_internal_value(self, data): try: if self.binary: if isinstance(data, six.binary_type): data = data.decode('utf-8') return json.loads(data) else: json.dumps(data) except (TypeError, ValueError): self.fail('invalid') return data
def effective_default(self, field): """ Returns a field's effective database default value """ if field.has_default(): default = field.get_default() elif not field.null and field.blank and field.empty_strings_allowed: if field.get_internal_type() == "BinaryField": default = six.binary_type() else: default = six.text_type() elif getattr(field, 'auto_now', False) or getattr(field, 'auto_now_add', False): default = datetime.now() internal_type = field.get_internal_type() if internal_type == 'DateField': default = default.date elif internal_type == 'TimeField': default = default.time elif internal_type == 'DateTimeField': default = timezone.now else: default = None # If it's a callable, call it if callable(default): default = default() # Run it through the field's get_db_prep_save method so we can send it # to the database. default = field.get_db_prep_save(default, self.connection) return default
def __init__(self, data, delimiter=None): if hasattr(data, 'seek'): self.data = data else: if six.PY3: if isinstance(data, six.binary_type): self.data = io.StringIO(data.decode('utf-8')) else: self.data = io.StringIO(data) else: self.data = io.BytesIO(data) if delimiter is None: try: self.dialect = csv.Sniffer().sniff(self.data.read(200), delimiters=';:,') except csv.Error: self.dialect = None self.delimiter = ':' else: self.delimiter = self.dialect.delimiter finally: self.data.seek(0) self.reader = csv.DictReader(CommentStripper(self.data), fieldnames=self.format, delimiter=self.delimiter, restkey=self.restkey) self.line_num = 0
def _decode_as_utf8(row): """Decodes all unicode values in row as utf-8 strings""" for key, value in row.items(): if isinstance(value, six.binary_type): row[key] = value.decode('utf-8') return row
def is_invalid_utf8(string): """Returns True if string is invalid UTF-8. If string is not a an str object, or is decodeable as UTF-8, False is returned. """ if isinstance(string, six.binary_type): try: string.decode('utf-8') except UnicodeDecodeError: return True return False
def unicode_utf8(thing): """Casts thing to unicode, assuming utf-8 encoding if a binary string. If the argument is None, it is returned unchanged. """ if isinstance(thing, six.binary_type): return thing.decode('utf-8') elif thing is not None: return six.text_type(thing)
def scan(addresses): """Scan a list of ip-addresses for netbios names""" _logger.debug('Scanning %s addresses', len(addresses)) proc = Popen(['nbtscan', '-f-', '-s', SPLITCHAR], stdin=PIPE, stdout=PIPE) stdout, stderr = proc.communicate('\n'.join(addresses)) if stderr: raise Exception(stderr) if isinstance(stdout, six.binary_type): stdout = stdout.decode('cp850') # cp850 seems like netbios' standard _logger.debug('Result from scan:\n%s', stdout) return stdout
def link(self, digest, target): """ Link Blob ????? :param digest: sha256?XXX ??? :param target: :return: """ target += "/link" if not isinstance(digest, six.binary_type): digest = digest.encode("utf-8") ensure_dir(target) with open(target, "w+b")as f: f.write(digest)
def __init__(self, data_string, name, reference): self.name = name self.reference = reference if isinstance(data_string, six.binary_type): data_string = data_string.decode() data = json.loads(data_string) self.version = data['schemaVersion'] if self.version == 2: self.manifest = ManifestV2(data_string, data, name, reference) else: self.manifest = ManifestV1(data_string, data, name, reference)
def _encode(value): """ Base64 ?? ???????“?”???Docker????????? :param value: :return: """ if not isinstance(value, six.binary_type): value = value.encode() data_string = base64.urlsafe_b64encode(value) # ?????“?”???Docker????????? return data_string.decode().replace("=", "").encode()
def mock_inputs(inputs): """ Decorator to temporarily replace input/getpass to allow interactive createsuperuser. """ def inner(test_func): def wrapped(*args): class mock_getpass: @staticmethod def getpass(prompt=b'Password: ', stream=None): if PY2: # getpass on Windows only supports prompt as bytestring (#19807) assert isinstance(prompt, binary_type) return inputs['password'] def mock_input(prompt): # prompt should be encoded in Python 2. This line will raise an # Exception if prompt contains unencoded non-ascii on Python 2. prompt = str(prompt) assert str('__proxy__') not in prompt response = '' for key, val in inputs.items(): if force_str(key) in prompt.lower(): response = val break return response old_getpass = createsuperuser.getpass old_input = createsuperuser.input createsuperuser.getpass = mock_getpass createsuperuser.input = mock_input try: test_func(*args) finally: createsuperuser.getpass = old_getpass createsuperuser.input = old_input return wrapped return inner
def get_traceback_data(self): """Return a dictionary containing traceback information.""" if self.exc_type and issubclass(self.exc_type, TemplateDoesNotExist): self.template_does_not_exist = True self.postmortem = self.exc_value.chain or [self.exc_value] frames = self.get_traceback_frames() for i, frame in enumerate(frames): if 'vars' in frame: frame_vars = [] for k, v in frame['vars']: v = pprint(v) # The force_escape filter assume unicode, make sure that works if isinstance(v, six.binary_type): v = v.decode('utf-8', 'replace') # don't choke on non-utf-8 input # Trim large blobs of data if len(v) > 4096: v = '%s... <trimmed %d bytes string>' % (v[0:4096], len(v)) frame_vars.append((k, force_escape(v))) frame['vars'] = frame_vars frames[i] = frame unicode_hint = '' if self.exc_type and issubclass(self.exc_type, UnicodeError): start = getattr(self.exc_value, 'start', None) end = getattr(self.exc_value, 'end', None) if start is not None and end is not None: unicode_str = self.exc_value.args[1] unicode_hint = smart_text( unicode_str[max(start - 5, 0):min(end + 5, len(unicode_str))], 'ascii', errors='replace' ) from django import get_version c = { 'is_email': self.is_email, 'unicode_hint': unicode_hint, 'frames': frames, 'request': self.request, 'filtered_POST': self.filter.get_post_parameters(self.request), 'settings': get_safe_settings(), 'sys_executable': sys.executable, 'sys_version_info': '%d.%d.%d' % sys.version_info[0:3], 'server_time': timezone.now(), 'django_version_info': get_version(), 'sys_path': sys.path, 'template_info': self.template_info, 'template_does_not_exist': self.template_does_not_exist, 'postmortem': self.postmortem, } # Check whether exception info is available if self.exc_type: c['exception_type'] = self.exc_type.__name__ if self.exc_value: c['exception_value'] = smart_text(self.exc_value, errors='replace') if frames: c['lastframe'] = frames[-1] return c
def _get_lines_from_file(self, filename, lineno, context_lines, loader=None, module_name=None): """ Returns context_lines before and after lineno from file. Returns (pre_context_lineno, pre_context, context_line, post_context). """ source = None if loader is not None and hasattr(loader, "get_source"): try: source = loader.get_source(module_name) except ImportError: pass if source is not None: source = source.splitlines() if source is None: try: with open(filename, 'rb') as fp: source = fp.read().splitlines() except (OSError, IOError): pass if source is None: return None, [], None, [] # If we just read the source from a file, or if the loader did not # apply tokenize.detect_encoding to decode the source into a Unicode # string, then we should do that ourselves. if isinstance(source[0], six.binary_type): encoding = 'ascii' for line in source[:2]: # File coding may be specified. Match pattern from PEP-263 # (http://www.python.org/dev/peps/pep-0263/) match = re.search(br'coding[:=]\s*([-\w.]+)', line) if match: encoding = match.group(1).decode('ascii') break source = [six.text_type(sline, encoding, 'replace') for sline in source] lower_bound = max(0, lineno - context_lines) upper_bound = lineno + context_lines pre_context = source[lower_bound:lineno] context_line = source[lineno] post_context = source[lineno + 1:upper_bound] return lower_bound, pre_context, context_line, post_context
def default(self, obj): # For Date Time string spec, see ECMA 262 # http://ecma-international.org/ecma-262/5.1/#sec-15.9.1.15 if isinstance(obj, Promise): return force_text(obj) elif isinstance(obj, datetime.datetime): representation = obj.isoformat() if representation.endswith('+00:00'): representation = representation[:-6] + 'Z' return representation elif isinstance(obj, datetime.date): return obj.isoformat() elif isinstance(obj, datetime.time): if timezone and timezone.is_aware(obj): raise ValueError("JSON can't represent timezone-aware times.") representation = obj.isoformat() if obj.microsecond: representation = representation[:12] return representation elif isinstance(obj, datetime.timedelta): return six.text_type(total_seconds(obj)) elif isinstance(obj, decimal.Decimal): # Serializers will coerce decimals to strings by default. return float(obj) elif isinstance(obj, uuid.UUID): return six.text_type(obj) elif isinstance(obj, QuerySet): return tuple(obj) elif isinstance(obj, six.binary_type): # Best-effort for binary blobs. See #4187. return obj.decode('utf-8') elif hasattr(obj, 'tolist'): # Numpy arrays and array scalars. return obj.tolist() elif (coreapi is not None) and isinstance(obj, (coreapi.Document, coreapi.Error)): raise RuntimeError( 'Cannot return a coreapi object from a JSON view. ' 'You should be using a schema renderer instead for this view.' ) elif hasattr(obj, '__getitem__'): try: return dict(obj) except: pass elif hasattr(obj, '__iter__'): return tuple(item for item in obj) return super(JSONEncoder, self).default(obj)
def default(self, obj): # For Date Time string spec, see ECMA 262 # http://ecma-international.org/ecma-262/5.1/#sec-15.9.1.15 if isinstance(obj, Promise): return force_text(obj) elif isinstance(obj, datetime.datetime): representation = obj.isoformat() if representation.endswith('+00:00'): representation = representation[:-6] + 'Z' return representation elif isinstance(obj, datetime.date): return obj.isoformat() elif isinstance(obj, datetime.time): if timezone and timezone.is_aware(obj): raise ValueError("JSON can't represent timezone-aware times.") representation = obj.isoformat() if obj.microsecond: representation = representation[:12] return representation elif isinstance(obj, datetime.timedelta): return six.text_type(total_seconds(obj)) elif isinstance(obj, decimal.Decimal): # Serializers will coerce decimals to strings by default. return float(obj) elif isinstance(obj, uuid.UUID): return six.text_type(obj) elif isinstance(obj, QuerySet): return tuple(obj) elif isinstance(obj, six.binary_type): # Best-effort for binary blobs. See #4187. return obj.decode('utf-8') elif hasattr(obj, 'tolist'): # Numpy arrays and array scalars. return obj.tolist() elif hasattr(obj, '__getitem__'): # noinspection PyBroadException try: return dict(obj) except: pass elif hasattr(obj, '__iter__'): return tuple(item for item in obj) return super(JSONEncoder, self).default(obj)
def get_traceback_data(self): """Return a dictionary containing traceback information.""" if self.exc_type and issubclass(self.exc_type, TemplateDoesNotExist): self.template_does_not_exist = True self.postmortem = self.exc_value.chain or [self.exc_value] frames = self.get_traceback_frames() for i, frame in enumerate(frames): if 'vars' in frame: frame_vars = [] for k, v in frame['vars']: v = pprint(v) # The force_escape filter assume unicode, make sure that works if isinstance(v, six.binary_type): v = v.decode('utf-8', 'replace') # don't choke on non-utf-8 input # Trim large blobs of data if len(v) > 4096: v = '%s... <trimmed %d bytes string>' % (v[0:4096], len(v)) frame_vars.append((k, force_escape(v))) frame['vars'] = frame_vars frames[i] = frame unicode_hint = '' if self.exc_type and issubclass(self.exc_type, UnicodeError): start = getattr(self.exc_value, 'start', None) end = getattr(self.exc_value, 'end', None) if start is not None and end is not None: unicode_str = self.exc_value.args[1] unicode_hint = force_text( unicode_str[max(start - 5, 0):min(end + 5, len(unicode_str))], 'ascii', errors='replace' ) from django import get_version c = { 'is_email': self.is_email, 'unicode_hint': unicode_hint, 'frames': frames, 'request': self.request, 'filtered_POST_items': self.filter.get_post_parameters(self.request).items(), 'settings': get_safe_settings(), 'sys_executable': sys.executable, 'sys_version_info': '%d.%d.%d' % sys.version_info[0:3], 'server_time': timezone.now(), 'django_version_info': get_version(), 'sys_path': sys.path, 'template_info': self.template_info, 'template_does_not_exist': self.template_does_not_exist, 'postmortem': self.postmortem, } if self.request is not None: c['request_GET_items'] = self.request.GET.items() c['request_FILES_items'] = self.request.FILES.items() c['request_COOKIES_items'] = self.request.COOKIES.items() # Check whether exception info is available if self.exc_type: c['exception_type'] = self.exc_type.__name__ if self.exc_value: c['exception_value'] = force_text(self.exc_value, errors='replace') if frames: c['lastframe'] = frames[-1] return c