我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.conf.settings.SECRET_KEY。
def salted_hmac(key_salt, value, secret=None): """ Returns the HMAC-SHA1 of 'value', using a key generated from key_salt and a secret (which defaults to settings.SECRET_KEY). A different key_salt should be passed in for every application of HMAC. """ if secret is None: secret = settings.SECRET_KEY key_salt = force_bytes(key_salt) secret = force_bytes(secret) # We need to generate a derived key from our base key. We can do this by # passing the key_salt and our base key through a pseudo-random function and # SHA1 works nicely. key = hashlib.sha1(key_salt + secret).digest() # If len(key_salt + secret) > sha_constructor().block_size, the above # line is redundant and could be replaced by key = key_salt + secret, since # the hmac module does the same thing for keys longer than the block size. # However, we need to ensure that we *always* do this. return hmac.new(key, msg=force_bytes(value), digestmod=hashlib.sha1)
def process_request(self, request): url = match(r'^/django_dev_protector/$', request.path) if url and request.method == 'POST': import json data = json.loads(request.body.decode('utf-8')) if data['key'] == settings.SECRET_KEY: from .setup import save_status environ[PROTECT_STATUS_VARIABLE] = str(data['status']) save_status(data['status']) return redirect('/') if environ.get(PROTECT_STATUS_VARIABLE) == 'True': from django.shortcuts import render return render(request, TEMPLATE_NAME, { 'redirect_url': REDIRECT_URL })
def get_user(request): expiration = settings.COOKIE_EXPIRES token = request.GET.get('token') sso_dict = {} if token: token_confirm = Token(settings.SECRET_KEY) try: username = token_confirm.confirm_validate_token(token, expiration=expiration) ret = User.objects.filter(username=username) if ret: sso_dict['username'] = ret[0].username sso_dict['email'] = ret[0].email sso_dict['cn'] = ret[0].last_name except Exception as e: sso_dict['error'] = 'token error' else: sso_dict['error'] = 'args error' return HttpResponse(json.dumps(sso_dict))
def logout(request): """ Logout a user """ try: token = request.environ['HTTP_X_API_TOKEN'] except (KeyError, IndexError, TypeError): raise BadRequest('Missing HTTP X-Api-Token header') try: data = jwt.decode(token, settings.SECRET_KEY) data = json.loads(CRYPTO.decrypt(str(data['data']))) user = User.objects.get(id=data['id']) user.last_login = datetime.fromtimestamp(0) user.save() return {'message': 'Logged out'} except (utils.CryptoException, KeyError, jwt.DecodeError, jwt.ExpiredSignature, User.DoesNotExist): raise BadRequest('Invalid token')
def post(self, request, pk): user = User.objects.get(id=pk) sign = hashlib.md5(user.email + settings.SECRET_KEY).hexdigest() url = urlparse.ParseResult( scheme=request.scheme, netloc=urlparse.urlparse(request.get_raw_uri()).netloc, path=reverse(('core:SetPassword')), params='', query = urllib.urlencode({'email': user.email, 'sign': sign}), fragment='', ).geturl() msg = EmailMultiAlternatives( subject='??????', body=get_template('users/user_email_activate.html').render({'url': url}), from_email=settings.EMAIL_HOST_USER, to=[user.email,], ) msg.content_subtype = 'html' status = msg.send(fail_silently=True) response = '??????' if status else '??????, ???' return HttpResponse(response)
def get_url_protection_options(user=None): defaults = { 'TOKEN_LENGTH': 20, 'SIGNING_KEY': settings.SECRET_KEY, 'SIGNING_SALT': 'qr_code_url_protection_salt', 'ALLOWS_EXTERNAL_REQUESTS_FOR_REGISTERED_USER': False, 'ALLOWS_EXTERNAL_REQUESTS': False } options = defaults if hasattr(settings, 'QR_CODE_URL_PROTECTION') and isinstance(settings.QR_CODE_URL_PROTECTION, dict): options.update(settings.QR_CODE_URL_PROTECTION) # Evaluate the callable if required. if callable(options['ALLOWS_EXTERNAL_REQUESTS_FOR_REGISTERED_USER']): options['ALLOWS_EXTERNAL_REQUESTS'] = user and options['ALLOWS_EXTERNAL_REQUESTS_FOR_REGISTERED_USER'](user) elif options['ALLOWS_EXTERNAL_REQUESTS_FOR_REGISTERED_USER'] and user: if callable(user.is_authenticated): # Django version < 1.10 options['ALLOWS_EXTERNAL_REQUESTS'] = user.is_authenticated() else: # Django version >= 1.10 options['ALLOWS_EXTERNAL_REQUESTS'] = user.is_authenticated else: options['ALLOWS_EXTERNAL_REQUESTS'] = False return options
def post(self, request): serializer = LoginSerializer(data=request.data) if serializer.is_valid(): employee = Employee.objects.get(emp_id=request.data.get('emp_id')) encode = jwt.encode({'emp_id': employee.emp_id, 'auth': employee.auth, 'part_id': employee.part_id, 'create_time': time(), 'ip_addr': request.META.get('REMOTE_ADDR') }, settings.SECRET_KEY, algorithm='HS256') token = dict() token['token'] = 'JWT ' + encode return Response(token) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def handle(self, **kwargs): self.write('Bootstrapping Promgen') if not os.path.exists(settings.CONFIG_DIR): self.write('Creating config directory {} ', settings.CONFIG_DIR) os.makedirs(settings.CONFIG_DIR) if not os.path.exists(settings.PROMGEN_CONFIG): path = os.path.join(settings.BASE_DIR, 'promgen', 'tests', 'examples', 'promgen.yml') self.write('Creating promgen config {} from {}', settings.PROMGEN_CONFIG, path) shutil.copy(path, settings.PROMGEN_CONFIG) self.write_setting('SECRET_KEY', default=settings.SECRET_KEY) self.write_setting('DATABASE_URL', test=dj_database_url.parse) # Schemes based on list of supported brokers # http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html self.write_setting('CELERY_BROKER_URL', test=URLValidator(schemes=['redis', 'amqp', 'sqs']))
def get(self, request, *args, **kwargs): if request.GET.get('key'): serializer = URLSafeTimedSerializer(settings.SECRET_KEY) try: user_id = serializer.loads( request.GET.get('key'), max_age=60 * 2, # Signature expires after 2 minutes ) user = get_object_or_404(User, id=user_id) user.backend = 'django.contrib.auth.backends.ModelBackend' login(request, user) return redirect('home') except (BadSignature, BadTimeSignature): return redirect('login') return super().get(request, *args, **kwargs)
def form_valid(self, form): email = form.cleaned_data['email'] user = User.objects.get(username=email) safe = URLSafeTimedSerializer(settings.SECRET_KEY) url = '{site}{path}?key={key}'.format( site=settings.SITE_URL, path=reverse('login'), key=safe.dumps(user.id), ) send_mail( _('Link to login into the Knowledge Base'), url, settings.DEFAULT_FROM_EMAIL, [email], fail_silently=False, html_message=render_to_string( 'login_email.html', {'url': url} ), ) return redirect('home')
def _authenticate_credentials(self, request, token): """ Try to authenticate the given credentials. If authentication is successful, return the user and token. If not, throw an error. """ try: payload = jwt.decode(token, settings.SECRET_KEY) except: msg = 'Invalid authentication. Could not decode token.' raise exceptions.AuthenticationFailed(msg) try: user = User.objects.get(pk=payload['id']) except User.DoesNotExist: msg = 'No user matching this token was found.' raise exceptions.AuthenticationFailed(msg) if not user.is_active: msg = 'This user has been deactivated.' raise exceptions.AuthenticationFailed(msg) return (user, token)
def save(self, *args, **kwargs): """save""" try: int(self.gender) except ValueError: self.gender = 0 super(Contact, self).save(*args, **kwargs) if not self.uuid: ascii_name = unicodedata.normalize('NFKD', unicode(self.fullname)).encode("ascii", 'ignore') name = u'{0}-contact-{1}-{2}-{3}'.format(project_settings.SECRET_KEY, self.id, ascii_name, self.email) name = unicodedata.normalize('NFKD', unicode(name)).encode("ascii", 'ignore') self.uuid = unicode(uuid.uuid5(uuid.NAMESPACE_URL, name)) return super(Contact, self).save() if self.entity.is_single_contact: #force the entity name for ordering self.entity.save()
def auth_return(request): if not xsrfutil.validate_token(settings.SECRET_KEY, str(request.GET['state']), request.user.username): return HttpResponseBadRequest() credential = FLOW.step2_exchange(request.GET) http = httplib2.Http() http = credential.authorize(http) resp, data = http.request("https://beam.pro/api/v1/users/current") data = json.loads(data) print data channelId = None if 'channel' in data: channelId = data['channel']['id'] name = data['channel'].get("name") or "(unnamed)" internal_label = "%s-%s" % (channelId, name) ac = BeamAppCreds(user=request.user, label=internal_label) ac.save() storage = Storage(BeamCredentialsModel, 'id', ac, 'credential') storage.put(credential) pu = BeamUpdate(credentials=ac, user=request.user, type="beam") pu.save() return HttpResponseRedirect("/beam/")
def auth_return(request): if not xsrfutil.validate_token(settings.SECRET_KEY, str(request.GET['state']), request.user.username): return HttpResponseBadRequest() credential = FLOW.step2_exchange(request.GET) http = httplib2.Http() http = credential.authorize(http) resp, data = http.request("https://api.patreon.com/oauth2/api/current_user") data = json.loads(data) name = data['data']['attributes'].get("full_name") or "(unnamed)" internal_label = "%s-%s" % (request.user.id, name) ac = PatreonAppCreds(user=request.user, label=internal_label) ac.save() storage = Storage(PatreonCredentialsModel, 'id', ac, 'credential') storage.put(credential) pu = PatreonUpdate(credentials=ac, user=request.user, type="patreon") pu.save() return HttpResponseRedirect("/patreon/")
def get_cookie_signer(salt='django.core.signing.get_cookie_signer'): Signer = import_string(settings.SIGNING_BACKEND) key = force_bytes(settings.SECRET_KEY) return Signer(b'django.http.cookies' + key, salt=salt)
def dumps(obj, key=None, salt='django.core.signing', serializer=JSONSerializer, compress=False): """ Returns URL-safe, sha1 signed base64 compressed JSON string. If key is None, settings.SECRET_KEY is used instead. If compress is True (not the default) checks if compressing using zlib can save some space. Prepends a '.' to signify compression. This is included in the signature, to protect against zip bombs. Salt can be used to namespace the hash, so that a signed string is only valid for a given namespace. Leaving this at the default value or re-using a salt value across different parts of your application without good cause is a security risk. The serializer is expected to return a bytestring. """ data = serializer().dumps(obj) # Flag for if it's been compressed or not is_compressed = False if compress: # Avoid zlib dependency unless compress is being used compressed = zlib.compress(data) if len(compressed) < (len(data) - 1): data = compressed is_compressed = True base64d = b64_encode(data) if is_compressed: base64d = b'.' + base64d return TimestampSigner(key, salt=salt).sign(base64d)
def __init__(self, key=None, sep=':', salt=None): # Use of native strings in all versions of Python self.key = key or settings.SECRET_KEY self.sep = force_str(sep) if _SEP_UNSAFE.match(self.sep): warnings.warn('Unsafe Signer separator: %r (cannot be empty or consist of only A-z0-9-_=)' % sep, RemovedInDjango110Warning) self.salt = force_str(salt or '%s.%s' % (self.__class__.__module__, self.__class__.__name__))
def check_secret_key(app_configs, **kwargs): passed_check = ( getattr(settings, 'SECRET_KEY', None) and len(set(settings.SECRET_KEY)) >= SECRET_KEY_MIN_UNIQUE_CHARACTERS and len(settings.SECRET_KEY) >= SECRET_KEY_MIN_LENGTH ) return [] if passed_check else [W009]
def __init__(self, key=None, sep=':', salt=None): # Use of native strings in all versions of Python self.key = key or settings.SECRET_KEY self.sep = force_str(sep) if _SEP_UNSAFE.match(self.sep): raise ValueError( 'Unsafe Signer separator: %r (cannot be empty or consist of ' 'only A-z0-9-_=)' % sep, ) self.salt = force_str(salt or '%s.%s' % (self.__class__.__module__, self.__class__.__name__))
def encode(the_id, sub_key): assert 0 <= the_id < 2 ** 64 crc = binascii.crc32(bytes(the_id)) & 0xffffffff message = struct.pack(b"<IQxxxx", crc, the_id) assert len(message) == 16 key = settings.SECRET_KEY iv = hashlib.sha256((key + sub_key).encode('ascii')).digest()[:16] cypher = AES.new(key[:32], AES.MODE_CBC, iv) eid = base64.urlsafe_b64encode(cypher.encrypt(message)).replace(b"=", b"") return eid.decode('utf-8')
def decode(e, sub_key): if isinstance(e, basestring): e = bytes(e.encode("ascii")) try: padding = (3 - len(e) % 3) * b"=" e = base64.urlsafe_b64decode(e + padding) except (TypeError, AttributeError, binascii.Error): raise EncryptedIDDecodeError() for key in getattr(settings, "SECRET_KEYS", [settings.SECRET_KEY]): iv = hashlib.sha256((key + sub_key).encode('ascii')).digest()[:16] cypher = AES.new(key[:32], AES.MODE_CBC, iv) try: msg = cypher.decrypt(e) except ValueError: raise EncryptedIDDecodeError() try: crc, the_id = struct.unpack(b"<IQxxxx", msg) except struct.error: raise EncryptedIDDecodeError() try: if crc != binascii.crc32(bytes(the_id)) & 0xffffffff: continue except (MemoryError, OverflowError): raise EncryptedIDDecodeError() return the_id raise EncryptedIDDecodeError("Failed to decrypt, CRC never matched.")
def _sign(self, q, a, expires): plain = [getattr(settings, 'SITE_URL', ''), settings.SECRET_KEY, q, a, expires] plain = "".join([str(p) for p in plain]) return sha1(plain).hexdigest()
def test_block_the_server(self): """ Blocks server by request """ response = self.client.post('/django_dev_protector/', json.dumps({ 'key': settings.SECRET_KEY, 'status': True }), content_type='application/json') self.assertEqual(get_status(), 'True')
def test_is_server_blocked(self): response = self.client.post('/django_dev_protector/', json.dumps({ 'key': settings.SECRET_KEY, 'status': True }), content_type='application/json') response = self.client.get('/') self.assertEqual(get_status(), 'True') self.assertContains(response, 'The work was not paid.')
def test_unblock_the_server(self): """ Unblocks server by request """ response = self.client.post('/django_dev_protector/', json.dumps({ 'key': settings.SECRET_KEY, 'status': False }), content_type='application/json') self.assertEqual(get_status(), 'False') self.assertEqual(os.environ[PROTECT_STATUS_VARIABLE], 'False')
def test_is_server_unblocked(self): response = self.client.post('/django_dev_protector/', json.dumps({ 'key': settings.SECRET_KEY, 'status': False }), content_type='application/json') response = self.client.get('/') self.assertEqual(get_status(), 'False') self.assertContains(response, 'works!')
def check_signature(username, tag, sig): ours = base64_hmac(str(username), tag, settings.SECRET_KEY) ours = ours[:8].decode("utf-8") return ours == sig
def get_badge_url(username, tag): sig = base64_hmac(str(username), tag, settings.SECRET_KEY) url = reverse("hc-badge", args=[username, sig[:8], tag]) return settings.SITE_ROOT + url
def test_it_returns_svg(self): sig = base64_hmac(str(self.alice.username), "foo", settings.SECRET_KEY) sig = sig[:8].decode("utf-8") url = "/badge/%s/%s/foo.svg" % (self.alice.username, sig) r = self.client.get(url) ### Assert that the svg is returned self.assertContains(r, "svg")
def make_token(self): seed = "%s%s" % (self.code, settings.SECRET_KEY) seed = seed.encode("utf8") return hashlib.sha1(seed).hexdigest()
def login(request): title = '????' user = request.user passwd_url = settings.PASSWD_URL show_captcha = settings.SHOW_CAPTCHA back = request.GET.get('back') if not back: back = '/' if request.method == 'POST': username = request.POST.get('username') password = request.POST.get('password') code = request.POST.get('code') if show_captcha: if not Captcha(request).check(code): result = '?????' return render_to_response('sso/login.html',locals()) user = auth.authenticate(username=username,password=password) if user is not None: auth.login(request,user) token_confirm = Token(settings.SECRET_KEY) token_key = '%s' % username token = token_confirm.generate_validate_token(token_key) #redirect_uri = '%s?token=%s' % (back, token) #return HttpResponseRedirect(redirect_uri) response = HttpResponseRedirect(back) response.set_cookie('sso_token', token, settings.COOKIE_EXPIRES,domain=settings.SESSION_COOKIE_DOMAIN) return response else: result = '????????' return render_to_response('sso/login.html',locals())
def __init__(self): self._salt = settings.SECRET_KEY self._kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=self._salt, iterations=100000, backend=default_backend() ) self._key = base64.urlsafe_b64encode(self._kdf.derive(settings.SECRET_KEY)) self._fernet = Fernet(self._key)
def auth(body): """ Login/password based auth if success, generates HMAC512 based token """ username = body['name'] password = body['password'] user = authenticate(username=username, password=password) if user is not None and user.is_active: user = User.objects.get_or_create(username=username)[0] user.last_login = datetime.now() user.save() data = { 'id': user.id, 'rand': b64encode(os.urandom(64)).decode('utf-8') } token = jwt.encode( { 'data': CRYPTO.encrypt(json.dumps(data)), 'exp': datetime.utcnow() + timedelta(days=1), }, settings.SECRET_KEY, algorithm='HS512' ) return True, {'token': token} return False, 'Invalid username or password'
def generate_key(user, for_subscription=True): # not really a proper use of salts # but meh salt = _get_salt(for_subscription) signer = TimestampSigner(settings.SECRET_KEY, salt=salt) return signer.sign(str(user.id))