我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用django.utils.encoding.force_bytes()。
def test_send_set_password_email(staff_user): site = Site.objects.get_current() ctx = {'protocol': 'http', 'domain': site.domain, 'site_name': site.name, 'uid': urlsafe_base64_encode(force_bytes(staff_user.pk)), 'token': default_token_generator.make_token(staff_user)} send_templated_mail(template_name='dashboard/staff/set_password', from_email=DEFAULT_FROM_EMAIL, recipient_list=[staff_user.email], context=ctx) assert len(mail.outbox) == 1 generated_link = ('http://%s/account/password/reset/%s/%s/' % (ctx['domain'], ctx['uid'].decode('utf-8'), ctx['token'])) sended_message = mail.outbox[0].body assert generated_link in sended_message
def post_stats(request, response, data): es_url_template = getattr(settings, 'CAVALRY_ELASTICSEARCH_URL_TEMPLATE', None) if not es_url_template: return payload = build_payload(data, request, response) es_url = es_url_template.format_map( dict( payload, ymd=datetime.utcnow().strftime('%Y-%m-%d'), ), ) body = force_bytes(json.dumps(payload, cls=PayloadJSONEncoder)) try: resp = sess.post(es_url, data=body, headers={'Content-Type': 'application/json'}, timeout=0.5) if resp.status_code != 201: log.warning('Unable to post data to %s (error %s): %s', es_url, resp.status_code, resp.text) except Exception as e: log.warning('Unable to post data to %s: %s', es_url, e)
def cast_primitive_value(spec, value): format = spec.get('format') type = spec.get('type') if type == 'boolean': return (force_text(value).lower() in ('1', 'yes', 'true')) if type == 'integer' or format in ('integer', 'long'): return int(value) if type == 'number' or format in ('float', 'double'): return float(value) if format == 'byte': # base64 encoded characters return base64.b64decode(value) if format == 'binary': # any sequence of octets return force_bytes(value) if format == 'date': # ISO8601 date return iso8601.parse_date(value).date() if format == 'dateTime': # ISO8601 datetime return iso8601.parse_date(value) if type == 'string': return force_text(value) return value
def loads(s, key=None, salt='django.core.signing', serializer=JSONSerializer, max_age=None): """ Reverse of dumps(), raises BadSignature if signature fails. The serializer is expected to accept a bytestring. """ # TimestampSigner.unsign always returns unicode but base64 and zlib # compression operate on bytes. base64d = force_bytes(TimestampSigner(key, salt=salt).unsign(s, max_age=max_age)) decompress = False if base64d[:1] == b'.': # It's compressed; uncompress it first base64d = base64d[1:] decompress = True data = b64_decode(base64d) if decompress: data = zlib.decompress(data) return serializer().loads(data)
def encode(self, password, salt): bcrypt = self._load_library() # Need to reevaluate the force_bytes call once bcrypt is supported on # Python 3 # Hash the password prior to using bcrypt to prevent password truncation # See: https://code.djangoproject.com/ticket/20138 if self.digest is not None: # We use binascii.hexlify here because Python3 decided that a hex encoded # bytestring is somehow a unicode. password = binascii.hexlify(self.digest(force_bytes(password)).digest()) else: password = force_bytes(password) data = bcrypt.hashpw(password, salt) return "%s$%s" % (self.algorithm, force_text(data))
def groups_for_user(environ, username): """ Authorizes a user based on groups """ UserModel = auth.get_user_model() db.reset_queries() try: try: user = UserModel._default_manager.get_by_natural_key(username) except UserModel.DoesNotExist: return [] if not user.is_active: return [] return [force_bytes(group.name) for group in user.groups.all()] finally: db.close_old_connections()
def decode(self, session_data): encoded_data = base64.b64decode(force_bytes(session_data)) try: # could produce ValueError if there is no ':' hash, serialized = encoded_data.split(b':', 1) expected_hash = self._hash(serialized) if not constant_time_compare(hash.decode(), expected_hash): raise SuspiciousSession("Session data corrupted") else: return self.serializer().loads(serialized) except Exception as e: # ValueError, SuspiciousOperation, unpickling exceptions. If any of # these happen, just return an empty dictionary (an empty session). if isinstance(e, SuspiciousOperation): logger = logging.getLogger('django.security.%s' % e.__class__.__name__) logger.warning(force_text(e)) return {}
def encode_file(boundary, key, file): to_bytes = lambda s: force_bytes(s, settings.DEFAULT_CHARSET) filename = os.path.basename(file.name) if hasattr(file, 'name') else '' if hasattr(file, 'content_type'): content_type = file.content_type elif filename: content_type = mimetypes.guess_type(filename)[0] else: content_type = None if content_type is None: content_type = 'application/octet-stream' if not filename: filename = key return [ to_bytes('--%s' % boundary), to_bytes('Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename)), to_bytes('Content-Type: %s' % content_type), b'', to_bytes(file.read()) ]
def salted_hmac(key_salt, value, secret=None): """ Returns the HMAC-SHA1 of 'value', using a key generated from key_salt and a secret (which defaults to settings.SECRET_KEY). A different key_salt should be passed in for every application of HMAC. """ if secret is None: secret = settings.SECRET_KEY key_salt = force_bytes(key_salt) secret = force_bytes(secret) # We need to generate a derived key from our base key. We can do this by # passing the key_salt and our base key through a pseudo-random function and # SHA1 works nicely. key = hashlib.sha1(key_salt + secret).digest() # If len(key_salt + secret) > sha_constructor().block_size, the above # line is redundant and could be replaced by key = key_salt + secret, since # the hmac module does the same thing for keys longer than the block size. # However, we need to ensure that we *always* do this. return hmac.new(key, msg=force_bytes(value), digestmod=hashlib.sha1)
def _check_query(self, query, country=False, city=False, city_or_country=False): "Helper routine for checking the query and database availability." # Making sure a string was passed in for the query. if not isinstance(query, six.string_types): raise TypeError('GeoIP query must be a string, not type %s' % type(query).__name__) # Extra checks for the existence of country and city databases. if city_or_country and not (self._country or self._city): raise GeoIPException('Invalid GeoIP country and city data files.') elif country and not self._country: raise GeoIPException('Invalid GeoIP country data file: %s' % self._country_file) elif city and not self._city: raise GeoIPException('Invalid GeoIP city data file: %s' % self._city_file) # Return the query string back to the caller. GeoIP only takes bytestrings. return force_bytes(query)
def groups_for_user(environ, username): """ Authorizes a user based on groups """ db.reset_queries() try: try: user = UserModel._default_manager.get_by_natural_key(username) except UserModel.DoesNotExist: return [] if not user.is_active: return [] return [force_bytes(group.name) for group in user.groups.all()] finally: db.close_old_connections()
def loads(s, key=None, salt='django.core.signing', serializer=JSONSerializer, max_age=None): """ Reverse of dumps(), raises BadSignature if signature fails. The serializer is expected to accept a bytestring. """ # TimestampSigner.unsign always returns unicode but base64 and zlib # compression operate on bytes. base64d = force_bytes( TimestampSigner(key, salt=salt).unsign(s, max_age=max_age)) decompress = False if base64d[:1] == b'.': # It's compressed; uncompress it first base64d = base64d[1:] decompress = True data = b64_decode(base64d) if decompress: data = zlib.decompress(data) return serializer().loads(data)
def pbkdf2(password, salt, iterations, dklen=0, digest=None): """ Implements PBKDF2 with the same API as Django's existing implementation, using cryptography. :type password: any :type salt: any :type iterations: int :type dklen: int :type digest: cryptography.hazmat.primitives.hashes.HashAlgorithm """ if digest is None: digest = settings.CRYPTOGRAPHY_DIGEST if not dklen: dklen = digest.digest_size password = force_bytes(password) salt = force_bytes(salt) kdf = PBKDF2HMAC( algorithm=digest, length=dklen, salt=salt, iterations=iterations, backend=settings.CRYPTOGRAPHY_BACKEND) return kdf.derive(password)
def send_reset_password_email(request, user): from_email = settings.DEFAULT_FROM_EMAIL current_site = get_current_site(request) site_name = current_site.name domain = current_site.domain token_generator = default_token_generator use_https = request.is_secure() context = { 'email': user.email, 'domain': domain, 'site_name': site_name, 'uid': urlsafe_base64_encode(force_bytes(user.pk)).decode(), 'user': user, 'token': token_generator.make_token(user), 'protocol': 'https' if use_https else 'http', } subject = loader.render_to_string('registration/password_reset_subject.txt', context) subject = ''.join(subject.splitlines()) body = loader.render_to_string('registration/password_reset_email.html', context) email_message = EmailMultiAlternatives(subject, body, from_email, [user.email]) email_message.send()
def _extarct_only_robot(xmlfile): """remove from file not robot's elements robot elements are: 'suite' 'statistics' 'errors' """ original_doc = ET.parse(xmlfile) root = original_doc.getroot() devices = root.find("devices") if devices is not None: root.remove(devices) source = StringIO(ET.tostring(root)) ets = ETSource(source) execution_result = ExecutionResultBuilder(ets).build(Result()) patched_file = File(BytesIO(force_bytes(source.getvalue())), name=xmlfile.name) return (execution_result, patched_file)
def md5(*bits): return _md5(':'.join((force_bytes(bit, errors='replace') for bit in bits)))
def form_valid(self, form): """??????????????.""" user = form.save() current_site = get_current_site(self.request) domain = current_site.domain subject_template = get_template( 'easy_regist/mailtemplate/new/subject.txt') message_template = get_template( 'easy_regist/mailtemplate/new/message.txt') context = { 'protocol': 'https' if self.request.is_secure() else 'http', 'domain': domain, 'uid': urlsafe_base64_encode(force_bytes(user.pk)), 'token': default_token_generator.make_token(user), 'user': user, } subject = subject_template.render(context) message = message_template.render(context) from_email = settings.EMAIL_HOST_USER to = [user.email] send_mail(subject, message, from_email, to) return super().form_valid(form)
def _hexdigest(cls, *args): h = hashlib.md5() if args: for a in args: h.update(force_bytes(a)) hexdigest = h.hexdigest()[:8] return hexdigest
def active_email(request, user): message = render_to_string('email/active_email.txt', { 'user': user, 'domain': request.get_host(), 'uid': urlsafe_base64_encode(force_bytes(user.pk)), 'token': account_activation_token.make_token(user), }) subject = 'Activate your blog account.' to_email = user.email send_mail(subject, message, settings.DEFAULT_FROM_EMAIL, recipient_list=[to_email])
def write(self, content): if 'w' not in self._mode: raise AliyunOperationError("Operation write is not allowed.") self.file.write(force_bytes(content)) self._is_dirty = True self._is_read = True
def get_cookie_signer(salt='django.core.signing.get_cookie_signer'): Signer = import_string(settings.SIGNING_BACKEND) key = force_bytes(settings.SECRET_KEY) return Signer(b'django.http.cookies' + key, salt=salt)
def get(self, key, default=None, version=None): key = self.make_key(key, version=version) self.validate_key(key) db = router.db_for_read(self.cache_model_class) connection = connections[db] table = connection.ops.quote_name(self._table) with connection.cursor() as cursor: cursor.execute("SELECT cache_key, value, expires FROM %s " "WHERE cache_key = %%s" % table, [key]) row = cursor.fetchone() if row is None: return default expires = row[2] expression = models.Expression(output_field=models.DateTimeField()) for converter in (connection.ops.get_db_converters(expression) + expression.get_db_converters(connection)): expires = converter(expires, expression, connection, {}) if expires < timezone.now(): db = router.db_for_write(self.cache_model_class) connection = connections[db] with connection.cursor() as cursor: cursor.execute("DELETE FROM %s " "WHERE cache_key = %%s" % table, [key]) return default value = connection.ops.process_clob(row[1]) return pickle.loads(base64.b64decode(force_bytes(value)))
def _key_to_file(self, key, version=None): """ Convert a key into a cache file path. Basically this is the root cache path joined with the md5sum of the key and a suffix. """ key = self.make_key(key, version=version) self.validate_key(key) return os.path.join(self._dir, ''.join( [hashlib.md5(force_bytes(key)).hexdigest(), self.cache_suffix]))
def make_template_fragment_key(fragment_name, vary_on=None): if vary_on is None: vary_on = () key = ':'.join(urlquote(var) for var in vary_on) args = hashlib.md5(force_bytes(key)) return TEMPLATE_FRAGMENT_KEY_TEMPLATE % (fragment_name, args.hexdigest())
def verify(self, password, encoded): algorithm, data = encoded.split('$', 1) assert algorithm == self.algorithm encoded_2 = self.encode(password, force_bytes(data)) return constant_time_compare(encoded, encoded_2)
def harden_runtime(self, password, encoded): _, data = encoded.split('$', 1) salt = data[:29] # Length of the salt in bcrypt. rounds = data.split('$')[2] # work factor is logarithmic, adding one doubles the load. diff = 2**(self.rounds - int(rounds)) - 1 while diff > 0: self.encode(password, force_bytes(salt)) diff -= 1
def encode(self, password, salt): assert password is not None assert salt and '$' not in salt hash = hashlib.sha1(force_bytes(salt + password)).hexdigest() return "%s$%s$%s" % (self.algorithm, salt, hash)
def encode(self, password, salt): assert password is not None assert salt and '$' not in salt hash = hashlib.md5(force_bytes(salt + password)).hexdigest() return "%s$%s$%s" % (self.algorithm, salt, hash)
def encode(self, password, salt): assert salt == '' return hashlib.md5(force_bytes(password)).hexdigest()
def parse_rst(text, default_reference_context, thing_being_parsed=None): """ Convert the string from reST to an XHTML fragment. """ overrides = { 'doctitle_xform': True, 'inital_header_level': 3, "default_reference_context": default_reference_context, "link_base": reverse('django-admindocs-docroot').rstrip('/'), 'raw_enabled': False, 'file_insertion_enabled': False, } if thing_being_parsed: thing_being_parsed = force_bytes("<%s>" % thing_being_parsed) # Wrap ``text`` in some reST that sets the default role to ``cmsreference``, # then restores it. source = """ .. default-role:: cmsreference %s .. default-role:: """ parts = docutils.core.publish_parts(source % text, source_path=thing_being_parsed, destination_path=None, writer_name='html', settings_overrides=overrides) return mark_safe(parts['fragment']) # # reST roles #
def hash_key(self, name): key = hashlib.md5(force_bytes(self.clean_name(name))).hexdigest() return 'staticfiles:%s' % key
def __init__(self, dr_input): """ Initializes an GDAL/OGR driver on either a string or integer input. """ if isinstance(dr_input, six.string_types): # If a string name of the driver was passed in self.ensure_registered() # Checking the alias dictionary (case-insensitive) to see if an # alias exists for the given driver. if dr_input.lower() in self._alias: name = self._alias[dr_input.lower()] else: name = dr_input # Attempting to get the GDAL/OGR driver by the string name. for iface in (vcapi, rcapi): driver = iface.get_driver_by_name(force_bytes(name)) if driver: break elif isinstance(dr_input, int): self.ensure_registered() for iface in (vcapi, rcapi): driver = iface.get_driver(dr_input) if driver: break elif isinstance(dr_input, c_void_p): driver = dr_input else: raise GDALException('Unrecognized input type for GDAL/OGR Driver: %s' % str(type(dr_input))) # Making sure we get a valid pointer to the OGR Driver if not driver: raise GDALException('Could not initialize GDAL/OGR Driver on input: %s' % str(dr_input)) self.ptr = driver
def test_capability(self, capability): """ Returns a bool indicating whether the this Layer supports the given capability (a string). Valid capability strings include: 'RandomRead', 'SequentialWrite', 'RandomWrite', 'FastSpatialFilter', 'FastFeatureCount', 'FastGetExtent', 'CreateField', 'Transactions', 'DeleteFeature', and 'FastSetNextByIndex'. """ return bool(capi.test_capability(self.ptr, force_bytes(capability)))
def index(self, field_name): "Returns the index of the given field name." i = capi.get_field_index(self.ptr, force_bytes(field_name)) if i < 0: raise OGRIndexError('invalid OFT field name given: "%s"' % field_name) return i
def attr_value(self, target, index=0): """ The attribute value for the given target node (e.g. 'PROJCS'). The index keyword specifies an index of the child node to return. """ if not isinstance(target, six.string_types) or not isinstance(index, int): raise TypeError return capi.get_attr_value(self.ptr, force_bytes(target), index)