我们从 Python 开源项目中提取了以下 23 个代码示例，用于说明如何使用 django.utils.encoding.smart_bytes()。
def execute_command(self, command, content=None, cwd=None):
    """Run *command* through the shell and return its raw standard output.

    Args:
        command: Command line handed to the shell.
        content: Optional data written to the command's standard input; a
            truthy value is converted with ``smart_bytes`` first.
        cwd: Optional working directory for the child process.

    Returns:
        Whatever the process wrote to standard output, as returned by
        ``Popen.communicate``.

    Raises:
        CompilerError: If the command wrote anything other than whitespace
            to standard error, or exited with a non-zero status.
    """
    process = subprocess.Popen(
        command,
        shell=True,
        cwd=cwd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    payload = smart_bytes(content) if content else content
    output, errors = process.communicate(payload)

    # Prefer readable text in error messages, but keep the raw bytes when
    # they cannot be decoded.
    if isinstance(errors, bytes):
        try:
            errors = errors.decode()
        except UnicodeDecodeError:
            pass

    # Any non-blank error output counts as a failure, whatever the exit code.
    if errors.strip():
        raise CompilerError(errors, error_output=errors)
    if self.verbose:
        print(errors)

    exit_status = process.returncode
    if exit_status != 0:
        message = "Command '{0}' returned non-zero exit status {1}".format(command, exit_status)
        raise CompilerError(message, error_output=message)
    return output
def to_python(self, value):
    """Turn a stored value into a credentials object.

    ``None`` and existing ``oauth2client.client.Credentials`` instances are
    returned unchanged. Any other value is converted to bytes,
    base64-decoded and unpickled.
    """
    if value is None or isinstance(value, oauth2client.client.Credentials):
        return value
    # NOTE: unpickling can execute arbitrary code, so the stored value must
    # come from a trusted source.
    encoded = smart_bytes(value)
    pickled = base64.b64decode(encoded)
    return pickle.loads(pickled)
def default_username_algo(email):
    """Derive an opaque username from *email*.

    The result is the URL-safe base64 encoding of the SHA-1 digest of the
    address, with the trailing ``=`` padding removed. Hashing protects
    against data leakage: usernames are often treated as public
    identifiers, so the email address itself cannot be used.

    Returns:
        The encoded digest as a byte string.
    """
    email_digest = hashlib.sha1(smart_bytes(email)).digest()
    encoded_digest = base64.urlsafe_b64encode(email_digest)
    return encoded_digest.rstrip(b'=')
def to_python(self, value):
    """Override of the ``models.Field`` method.

    Converts a serialized value (for example bytes read back from storage)
    into a credentials object. ``None`` and existing
    ``oauth2client.client.Credentials`` instances are returned unchanged.
    Other values are base64-decoded and parsed with ``jsonpickle``; when
    that path raises ``ValueError`` the payload is unpickled instead.
    """
    if value is None or isinstance(value, oauth2client.client.Credentials):
        return value
    try:
        json_payload = base64.b64decode(encoding.smart_bytes(value))
        return jsonpickle.decode(json_payload.decode())
    except ValueError:
        # Any ValueError from the JSON path (UnicodeDecodeError is one of
        # them) means the value is treated as a pickle payload.
        # NOTE: unpickling can execute arbitrary code, so the stored value
        # must come from a trusted source.
        pickled_payload = base64.b64decode(encoding.smart_bytes(value))
        return pickle.loads(pickled_payload)
def decrypt_pdf(src):
    """Decrypt a PDF with ``qpdf`` and return the result as a temporary file.

    Args:
        src: Passed to the ``qpdf`` child process as its standard input
            (anything ``subprocess.Popen`` accepts for ``stdin``).

    Returns:
        A ``TemporaryFile`` containing qpdf's standard output, rewound to
        offset 0.

    Raises:
        ValueError: If qpdf exits with a status other than 0 (ok) or
            3 (warning). The temporary file is closed before the error
            propagates.
    """
    decrypted = TemporaryFile()
    try:
        popen = subprocess.Popen(
            ['qpdf', '--decrypt', '/dev/stdin', '-'],
            stdin=src, stdout=decrypted, stderr=subprocess.PIPE)
        _, stderr = popen.communicate()
        returncode = popen.returncode

        if returncode not in (0, 3):  # 0 == ok, 3 == warning
            # Imported here rather than at module level, as in the original.
            from ecs.users.utils import get_current_user
            user = get_current_user()
            # NOTE: on Python 3 a bytes argument is rendered by %s as its
            # repr (b'...').
            logger.warning(
                'qpdf error (returncode=%s):\nUser: %s (%s)\n%s',
                returncode, user, user.email if user else 'anonymous',
                smart_bytes(stderr, errors='backslashreplace'))
            raise ValueError('pdf broken')

        if returncode == 3:
            logger.warning('qpdf warning:\n%s',
                           smart_bytes(stderr, errors='backslashreplace'))
        decrypted.seek(0)
    except Exception:
        # Do not leak the temporary file when decryption fails.
        decrypted.close()
        raise
    return decrypted
def render_style(self, style_path, force=False, **kwargs):
    """
    Render the CSS template at *style_path* using *kwargs* and return the
    path of the rendered file inside the cache directory.

    The rendered file name embeds a short hash of *style_path* and the
    template's modification time. When a file with that name already exists
    it is reused, unless *force* is ``True``, in which case the template is
    rendered again. After rendering, other cached files whose name contains
    the same short hash are removed.
    """
    cache_dir = self.settings['cache_dir']
    # Either path may be something other than str; decode those as UTF-8.
    if not isinstance(cache_dir, str):
        cache_dir = cache_dir.decode('utf-8')
    if not isinstance(style_path, str):
        style_path = style_path.decode('utf-8')

    modified = int(os.stat(style_path).st_mtime)
    path_hash = short_hash(style_path)
    rendered_filename = 'rendered_%s_%s' % (path_hash, modified)
    rendered_path = os.path.join(cache_dir, rendered_filename)

    # Cache hit: nothing to render and nothing to clean up.
    if os.path.exists(rendered_path) and not force:
        return rendered_path

    rendered_css = self.render_string(style_path, **kwargs)
    # The file is opened in binary mode, so the rendered template is
    # converted to bytes before writing.
    with io.open(rendered_path, 'wb') as out:
        out.write(smart_bytes(rendered_css))

    # NOTE(review): the nesting was reconstructed from flattened source; the
    # cleanup is assumed to run only after a fresh render - confirm.
    # Remove older versions of the rendered template (and their minified
    # counterparts), identified by the shared short hash.
    stale_names = [
        entry for entry in os.listdir(cache_dir)
        if entry != rendered_filename and path_hash in entry
    ]
    for entry in stale_names:
        os.remove(os.path.join(cache_dir, entry))
    return rendered_path
def get_hexdigest(plaintext, length=None):
    """Return the MD5 hex digest of *plaintext*.

    When *length* is truthy, only the first *length* characters of the
    digest are returned.
    """
    hex_value = hashlib.md5(smart_bytes(plaintext)).hexdigest()
    return hex_value[:length] if length else hex_value
def get_precompiler_cachekey(command, contents):
    """Return the SHA-1 hex digest used as cache key for a precompiler run.

    The digest is computed over ``precompiler.<command>.<contents>``.
    """
    key_source = 'precompiler.%s.%s' % (command, contents)
    return hashlib.sha1(smart_bytes(key_source)).hexdigest()
def verify_token(self, token, **kwargs):
    """Validate the token signature and, when enabled, its nonce.

    The signing key is ``OIDC_RP_IDP_SIGN_KEY`` when the configured
    algorithm name starts with ``RS`` and ``OIDC_RP_CLIENT_SECRET``
    otherwise. The optional ``nonce`` keyword argument is compared with the
    ``nonce`` entry of the verified payload unless the ``OIDC_USE_NONCE``
    setting disables the check.

    Returns:
        ``True`` when verification succeeds.

    Raises:
        SuspiciousOperation: If the nonce check is enabled and the nonces
            differ.
    """
    nonce = kwargs.get('nonce')

    uses_rsa = self.OIDC_RP_SIGN_ALGO.startswith('RS')
    key = self.OIDC_RP_IDP_SIGN_KEY if uses_rsa else self.OIDC_RP_CLIENT_SECRET

    # The key string comes from settings, hence smart_bytes for the key.
    verified_token = self._verify_jws(force_bytes(token), smart_bytes(key))

    # The verified payload is decoded as UTF-8 explicitly before parsing
    # rather than relying on json.loads() accepting bytes
    # (https://bugs.python.org/issue17909).
    claims = json.loads(verified_token.decode('utf-8'))
    token_nonce = claims.get('nonce')

    nonce_check_enabled = import_from_settings('OIDC_USE_NONCE', True)
    if nonce_check_enabled and nonce != token_nonce:
        raise SuspiciousOperation('JWT Nonce verification failed.')
    return True
def to_python(self, value):
    """Override of the ``models.Field`` method.

    Converts a serialized value (for example bytes read back from storage)
    into a credentials object. ``None`` and existing
    ``oauth2client.client.Credentials`` instances pass through unchanged;
    everything else is converted to bytes, base64-decoded and unpickled.
    """
    needs_decoding = (
        value is not None
        and not isinstance(value, oauth2client.client.Credentials)
    )
    if needs_decoding:
        # NOTE: unpickling can execute arbitrary code, so the stored value
        # must come from a trusted source.
        pickled = base64.b64decode(encoding.smart_bytes(value))
        value = pickle.loads(pickled)
    return value
def generate_sha1(string, salt=None):
    """Return ``(salt, sha1_hexdigest)`` for *string*.

    Args:
        string: Value to hash. Anything that is not already a string is
            converted with ``str()`` first.
        salt: Optional salt. When missing or empty, a 5-character
            hexadecimal salt is generated.

    Returns:
        A tuple of the salt that was used and the SHA-1 hex digest of
        ``salt + string``.
    """
    if not isinstance(string, (str, text_type)):
        string = str(string)
    if not salt:
        # Draw the salt from the operating system's entropy source: the
        # default Mersenne Twister generator is predictable and unsuitable
        # when the resulting hash is used as a token.
        entropy = str(random.SystemRandom().random()).encode('utf-8')
        salt = sha1(entropy).hexdigest()[:5]
    salted_bytes = smart_bytes(salt) + smart_bytes(string)
    return salt, sha1(salted_bytes).hexdigest()
def dumps(self, value):
    """Serialize *value* to JSON and return the result as a byte string."""
    serialized = json.dumps(value)
    return force_bytes(serialized)
def _fill_archive(source, dest, context):
    """Copy the entries of *source* into *dest*, rendering the templates and rebuilding the manifest."""
    closing_tag = '</manifest:manifest>'
    manifest_data = ''
    for name in source.namelist():
        data = source.read(name)
        if name.endswith('.xml'):
            data = smart_str(data)

        if name.endswith(('content.xml', 'styles.xml')):
            data = Template(fix_inline_tags(data)).render(context)
        elif name == 'META-INF/manifest.xml':
            # Keep everything before the closing tag; the manifest is written
            # last, once the image entries have been appended. Locating the
            # tag (instead of cutting a fixed 20 characters) also copes with
            # trailing whitespace after it.
            cut = data.rfind(closing_tag)
            manifest_data = data[:cut] if cut != -1 else data[:-len(closing_tag)]
            continue
        dest.writestr(name, smart_bytes(data))

    # Images collected in the context are stored under Pictures/ and
    # registered in the manifest.
    for image in context.dicts[0].get(IMAGES_CONTEXT_KEY, {}).values():
        filename = os.path.basename(image.name)
        ext = os.path.splitext(filename)[1][1:]
        manifest_data += ('<manifest:file-entry '
                          'manifest:media-type="image/%(ext)s" '
                          'manifest:full-path="Pictures/%(filename)s"/>\n'
                          ) % {'ext': ext, 'filename': filename}
        image.open()
        try:
            dest.writestr('Pictures/%s' % filename, image.read())
        finally:
            image.close()

    dest.writestr('META-INF/manifest.xml', manifest_data + closing_tag)


def fill_template(template_name, context, output_format='odt'):
    """
    Fill a document with data and convert it to the requested format.

    Args:
        template_name: Name resolved to a template file by
            ``find_template_file``.
        context: A ``Context`` instance, or a value to wrap in one.
        output_format: Target format. When it differs from the template's
            extension, the filled document is handed to
            ``_convert_subprocess`` in a separate process.

    Returns:
        The path of the filled temporary file when no conversion is needed,
        otherwise whatever the conversion process puts on its result queue
        (presumably the converted file's path - confirm against
        ``_convert_subprocess``).
    """
    if not isinstance(context, Context):
        context = Context(context)
    context['output_format'] = output_format

    source_file = find_template_file(template_name)
    source_extension = os.path.splitext(source_file)[1]

    with zipfile.ZipFile(source_file, 'r') as source:
        dest_file = NamedTemporaryFile(delete=False, suffix=source_extension)
        try:
            # Leaving the block finalizes the archive and then closes the
            # temporary file, so it is complete on disk before anything else
            # reads it.
            with dest_file, zipfile.ZipFile(dest_file, 'w') as dest:
                _fill_archive(source, dest, context)
        except Exception:
            # Do not leave a half-written temporary file behind.
            os.unlink(dest_file.name)
            raise

    if source_extension[1:] == output_format:
        return dest_file.name

    results = Queue()
    convertor = Process(target=_convert_subprocess,
                        args=(str(dest_file.name), output_format, results))
    convertor.start()
    return results.get()