我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.core.exceptions.SuspiciousOperation()。
def record_factory(zone, created=None, **validated_data):
    """Build an unsaved record object for ``zone`` from validated data.

    ``validated_data`` must contain a ``'type'`` key, which is removed
    from the mapping. For any type other than ``POLICY_ROUTED`` the
    remaining items are forwarded to ``Record``. For policy-routed
    records, ``validated_data['values']`` must hold exactly one policy id
    and ``validated_data['name']`` is used to look up or revive the
    backing ``models.PolicyRecord``.

    Raises:
        SuspiciousOperation: if the referenced policy does not exist.
    """
    record_type = validated_data.pop('type')

    # Plain records need no lookups: hand everything over to Record.
    if record_type != POLICY_ROUTED:
        return Record(zone=zone.r53_zone, type=record_type, created=created,
                      **validated_data)

    values = validated_data['values']
    assert len(values) == 1
    policy_id = values[0]
    try:
        policy = models.Policy.objects.get(id=policy_id)
    except models.Policy.DoesNotExist:
        raise SuspiciousOperation("Policy {} does not exists.".format(
            policy_id))

    backing_model = models.PolicyRecord.new_or_deleted(
        name=validated_data['name'], zone=zone)
    return PolicyRecord(
        policy_record=backing_model,
        zone=zone.r53_zone,
        policy=policy,
        dirty=True,
        created=created,
    )
def _normalize_name(self, name):
    """Resolve ``name`` against ``self.location`` and return the result.

    Relative segments such as ``/path/to/ignored/../foo.txt`` are
    collapsed by ``urljoin``. The joined path must still start with the
    location, and the character right after that prefix must be either
    absent or ``/``; otherwise access is refused.

    Returns the joined path with leading slashes stripped.

    Raises:
        SuspiciousOperation: if the joined path falls outside the location.
    """
    root = force_text(self.location).rstrip('/')
    joined = urljoin(root + "/", name)

    # The character following the root prefix tells apart "root/child"
    # (allowed) from "rootsibling" (a different directory sharing a prefix).
    boundary = joined[len(root):len(root) + 1]
    stays_inside = joined.startswith(root) and boundary in ('', '/')
    if not stays_inside:
        raise SuspiciousOperation("Attempted access to '%s' denied." % name)

    return joined.lstrip('/')
def decode(self, session_data):
    """Decode base64 session data of the form ``<hash>:<serialized>``.

    Returns the deserialized payload when the embedded hash matches
    ``self._hash(serialized)``. Any exception raised while splitting,
    verifying or deserializing yields an empty dict instead; a
    ``SuspiciousOperation`` is additionally logged as a warning on the
    ``django.security.<ExceptionClass>`` logger.
    """
    raw = base64.b64decode(force_bytes(session_data))
    try:
        # Unpacking raises ValueError when there is no ':' separator.
        digest, payload = raw.split(b':', 1)
        expected = self._hash(payload)
        if constant_time_compare(digest.decode(), expected):
            return self.serializer().loads(payload)
        raise SuspiciousSession("Session data corrupted")
    except Exception as exc:
        # ValueError, SuspiciousOperation, unpickling exceptions: all of
        # them deliberately degrade to an empty session.
        if isinstance(exc, SuspiciousOperation):
            logging.getLogger(
                'django.security.%s' % exc.__class__.__name__
            ).warning(force_text(exc))
        return {}
def load(self):
    """Read this session's file and return its decoded data.

    Returns an empty dict when the file is empty, cannot be decoded, or
    the computed expiry age is not positive. If opening the file raises
    ``IOError`` (or a ``SuspiciousOperation`` escapes the body),
    ``self._session_key`` is reset to ``None`` and whatever
    ``session_data`` holds at that point is returned.

    NOTE(review): the source was flattened onto one line; the nesting
    below (expiry check inside ``if file_data``) is reconstructed —
    confirm against the upstream file.
    """
    session_data = {}
    try:
        with open(self._key_to_file(), "rb") as session_file:
            file_data = session_file.read()
        # Don't fail if there is no data in the session file.
        # We may have opened the empty placeholder file.
        if file_data:
            try:
                session_data = self.decode(file_data)
            except (EOFError, SuspiciousOperation) as e:
                # Only SuspiciousOperation is logged; EOFError is silent.
                if isinstance(e, SuspiciousOperation):
                    logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
                    logger.warning(force_text(e))
                # Undecodable data: start over with a fresh session.
                self.create()

            # Remove expired sessions.
            expiry_age = self.get_expiry_age(expiry=self._expiry_date(session_data))
            if expiry_age <= 0:
                session_data = {}
                self.delete()
                self.create()
    except (IOError, SuspiciousOperation):
        # Forget the key so that no session is associated with it anymore.
        self._session_key = None
    return session_data
def load(self):
    """Return the session data, trying the cache before the database.

    A cache value other than ``None`` is returned as-is. Otherwise the
    session row is looked up through ``self.model`` (matching
    ``self.session_key`` and an ``expire_date`` later than now), decoded,
    written back to the cache and returned. If the row does not exist or
    a ``SuspiciousOperation`` is raised, ``self._session_key`` is reset
    to ``None`` and an empty dict is returned.
    """
    try:
        cached = self._cache.get(self.cache_key)
    except Exception:
        # Some backends (e.g. memcache) raise an exception on invalid
        # cache keys. If this happens, reset the session. See #17810.
        cached = None

    if cached is not None:
        return cached

    # Duplicate DBStore.load, because we need to keep track
    # of the expiry date to set it properly in the cache.
    try:
        row = self.model.objects.get(
            session_key=self.session_key,
            expire_date__gt=timezone.now()
        )
        decoded = self.decode(row.session_data)
        self._cache.set(self.cache_key, decoded,
                        self.get_expiry_age(expiry=row.expire_date))
        return decoded
    except (self.model.DoesNotExist, SuspiciousOperation) as exc:
        if isinstance(exc, SuspiciousOperation):
            logging.getLogger(
                'django.security.%s' % exc.__class__.__name__
            ).warning(force_text(exc))
        self._session_key = None
        return {}
def verify_step2(request, uidb64, token):
    """Confirm a user's email address from a verification link.

    Args:
        request: the current request; ``request.user.id`` must match the
            user id encoded in ``uidb64``.
        uidb64: URL-safe base64 encoding of the user's integer id.
        token: token checked with ``verify_token_generator.check_token``.

    Returns:
        A redirect to the ``'index'`` URL after queuing a success or info
        message on the request.

    Raises:
        SuspiciousOperation: if ``uidb64`` is not valid base64 or does
            not decode to an integer.
        PermissionDenied: if the decoded id is not the requesting user's.
        Http404: if the user does not exist or the token is invalid.
    """
    # Decoding happens inside a try block so that malformed base64 is
    # reported as a suspicious request rather than escaping as an
    # unhandled ValueError.
    try:
        bytes_uid = urlsafe_base64_decode(uidb64)
    except ValueError:
        raise SuspiciousOperation('verify_step2 received invalid base64 user ID: {}'.format(
            uidb64))

    try:
        uid = int(bytes_uid)
    except ValueError:
        raise SuspiciousOperation('verify_step2 received invalid base64 user ID: {}'.format(
            bytes_uid))

    if uid != request.user.id:
        raise PermissionDenied('UID mismatch - user is {}, request was for {}'.format(
            request.user.id, uid))

    user = get_object_or_404(models.User, pk=uid)
    if not verify_token_generator.check_token(user, token):
        raise Http404('token invalid')

    if not user.email_verified:
        user.email_verified = True
        user.save()
        messages.success(request, _('Your email has been verified successfully. Thanks!'))
    else:
        messages.info(request, _('Your email address has already been verified.'))
    return redirect('index')
def ensure_safe_url(url, allowed_protocols=None, allowed_host=None, raise_on_fail=False):
    """Check that ``url`` uses an allowed scheme and, optionally, host.

    Args:
        url: the URL to inspect.
        allowed_protocols: accepted schemes; ``["http", "https"]`` when
            ``None``. URLs without a scheme always pass this check.
        allowed_host: when truthy, a URL that carries a network location
            must match it exactly.
        raise_on_fail: raise instead of returning ``False``.

    Returns:
        ``True`` when every check passes, ``False`` otherwise.

    Raises:
        SuspiciousOperation: on the first failed check, only when
            ``raise_on_fail`` is true.
    """
    protocols = ["http", "https"] if allowed_protocols is None else allowed_protocols
    parts = urlparse.urlparse(url)

    # Guards against malicious targets (i.e., an XSS attack with a data URL).
    bad_scheme = bool(parts.scheme) and parts.scheme not in protocols
    if bad_scheme and raise_on_fail:
        raise SuspiciousOperation("Unsafe redirect to URL with protocol '%s'" % parts.scheme)

    bad_host = bool(allowed_host and parts.netloc and parts.netloc != allowed_host)
    if bad_host and raise_on_fail:
        raise SuspiciousOperation("Unsafe redirect to URL not matching host '%s'" % allowed_host)

    return not (bad_scheme or bad_host)
def ensure_safe_url(url, allowed_protocols=None, allowed_host=None, raise_on_fail=False):
    """Report whether ``url`` is acceptable as a redirect target.

    The scheme, when present, must be listed in ``allowed_protocols``
    (``["http", "https"]`` if ``None``). When ``allowed_host`` is truthy
    and the URL has a network location, the two must be equal.

    Returns ``True`` if both checks pass and ``False`` otherwise. With
    ``raise_on_fail`` set, the first failing check raises
    ``SuspiciousOperation`` instead.
    """
    if allowed_protocols is None:
        allowed_protocols = ["http", "https"]

    parts = urlparse(url)
    scheme, host = parts.scheme, parts.netloc

    # Scheme check: rejects things like data: URLs used for XSS.
    scheme_ok = (not scheme) or scheme in allowed_protocols
    if raise_on_fail and not scheme_ok:
        raise SuspiciousOperation("Unsafe redirect to URL with protocol '{0}'".format(scheme))

    # Host check: only applies when both sides are non-empty.
    host_ok = not (allowed_host and host and host != allowed_host)
    if raise_on_fail and not host_ok:
        raise SuspiciousOperation("Unsafe redirect to URL not matching host '{0}'".format(allowed_host))

    return scheme_ok and host_ok
def load(self):
    """Fetch and decode this session's row from the database.

    Looks up a ``Session`` whose key equals ``self.session_key`` and
    whose ``expire_date`` is later than now, copies its ``user_id`` onto
    the store and returns the decoded session data. When the row is
    missing or a ``SuspiciousOperation`` is raised, a new session is
    created and an empty dict is returned.
    """
    try:
        row = Session.objects.get(
            session_key=self.session_key,
            expire_date__gt=timezone.now()
        )
        self.user_id = row.user_id
        # do not overwrite user_agent/ip, as those might have been updated;
        # just flag the session as modified.
        if self.user_agent != row.user_agent or self.ip != row.ip:
            self.modified = True
        return self.decode(row.session_data)
    except (Session.DoesNotExist, SuspiciousOperation) as exc:
        if isinstance(exc, SuspiciousOperation):
            logging.getLogger(
                'django.security.%s' % exc.__class__.__name__
            ).warning(force_text(exc))
        self.create()
        return {}
def test_build_invalid_zip(self):
    """Building a buildable attachment raises ``SuspiciousOperation`` here.

    The fixture copies ``invalid_archive.zip`` into ``MEDIA_ROOT`` and
    points the attachment's ``file`` at it through a ``..``-prefixed path.
    """
    target_path = os.path.join(settings.MEDIA_ROOT, 'test_attachment')
    copyfile(os.path.join(FILES_PATH, 'invalid_archive.zip'), target_path)

    attachment = Attachment(name='build.json', buildable=True)
    attachment.file = os.path.join('..', target_path)
    attachment.save()

    tracker = Tracker.objects.create(
        key=TRACKER_ATTACHMENT_EXECUTED,
        campaign_id=1,
        target_id=1,
        value='tracker: not opened',
    )

    self.assertRaises(SuspiciousOperation, attachment.build, tracker)
def convert_exception_to_response(get_response):
    """
    Return ``get_response`` wrapped so that it never raises ``Exception``.

    Any ``Exception`` raised by ``get_response(request)`` is passed to
    ``response_for_exception`` and the resulting response is returned in
    its place. Known 4xx exceptions (Http404, PermissionDenied,
    MultiPartParserError, SuspiciousOperation) become the matching
    response; everything else becomes a 500 response.

    This wrapper is automatically applied to all middleware so that no
    middleware leaks an exception and the next middleware in the stack can
    rely on getting a response instead of an exception.
    """
    def inner(request):
        try:
            return get_response(request)
        except Exception as exc:
            return response_for_exception(request, exc)

    # Copy over whichever standard attributes get_response actually has.
    return wraps(get_response, assigned=available_attrs(get_response))(inner)
def skip_suspicious_operations(record):
    """Logging filter callback that drops SuspiciousOperation records.

    Returns ``False`` when the record carries exception info whose
    exception value is a ``SuspiciousOperation``, and ``True`` for every
    other record. The intent is to keep Django from sending 500 error
    email notifications for these events, since they are not true server
    errors, especially when related to the ALLOWED_HOSTS configuration.

    Background and more information:
    http://www.tiwoc.de/blog/2013/03/django-prevent-email-notification-on-suspiciousoperation/
    """
    exc_info = record.exc_info
    if not exc_info:
        return True
    # exc_info[1] is the exception instance.
    return not isinstance(exc_info[1], SuspiciousOperation)


# A sample logging configuration. The only tangible logging
# performed by this configuration is to send an email to
# the site admins on every HTTP 500 error.
# See http://docs.djangoproject.com/en/dev/topics/logging for
# more details on how to customize your logging configuration.
def _verify_jws(self, payload, key):
    """Verify the compact JWS ``payload`` with ``key`` and return its payload.

    Raises:
        SuspiciousOperation: if signature verification fails, if the
            signature's algorithm name differs from
            ``self.OIDC_RP_SIGN_ALGO``, or if reading the algorithm
            raises ``KeyError``.
    """
    token = JWS.from_compact(payload)
    verification_key = JWK.load(key)

    if not token.verify(verification_key):
        raise SuspiciousOperation('JWS token verification failed.')

    try:
        algorithm = token.signature.combined.alg.name
        if algorithm != self.OIDC_RP_SIGN_ALGO:
            raise SuspiciousOperation('The specified alg value is not allowed')
    except KeyError:
        raise SuspiciousOperation('No alg value found in header')

    return token.payload