我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用django.conf.settings.DEBUG。
def send_share_notification(share_id):
    """Email the author of a piece of content that it has been shared.

    Nothing is sent when ``settings.DEBUG`` is on, or when no share with the
    given id exists whose shared content is local.

    :param share_id: Primary key of the share ``Content`` object.
    """
    if settings.DEBUG:
        return
    try:
        content = Content.objects.get(id=share_id, content_type=ContentType.SHARE, share_of__local=True)
    except Content.DoesNotExist:
        logger.warning("No share content found with id %s", share_id)
        return
    shared = content.share_of
    content_url = "%s%s" % (settings.SOCIALHOME_URL, shared.get_absolute_url())
    # Translate the constant msgid first and interpolate afterwards; formatting
    # inside ``_()`` produces a string that can never match a catalog entry.
    subject = _("New share of: %s") % shared.short_text_inline
    context = get_common_context()
    context.update({
        "subject": subject,
        "actor_name": content.author.name_or_handle,
        "actor_url": "%s%s" % (settings.SOCIALHOME_URL, content.author.get_absolute_url()),
        "content_url": content_url,
        "name": shared.author.name_or_handle,
    })
    send_mail(
        "%s%s" % (settings.EMAIL_SUBJECT_PREFIX, subject),
        render_to_string("notifications/share.txt", context=context),
        settings.DEFAULT_FROM_EMAIL,
        [shared.author.user.email],
        fail_silently=False,
        html_message=render_to_string("notifications/share.html", context=context),
    )
def render_pdf(self, *args, **kwargs):
    """
    Render the HTML produced by ``render_html`` into a PDF via pdfkit.

    All positional and keyword arguments are forwarded to ``render_html``.

    :rtype: bytes
    """
    html = self.render_html(*args, **kwargs)

    options = self.get_pdfkit_options()
    # JavaScript debugging is only honoured when explicitly requested in DEBUG.
    if 'debug' in self.request.GET and settings.DEBUG:
        options['debug-javascript'] = 1

    pdfkit_kwargs = {}
    binary_path = os.environ.get('WKHTMLTOPDF_BIN')
    if binary_path:
        # Use the wkhtmltopdf binary named by the environment variable.
        pdfkit_kwargs['configuration'] = pdfkit.configuration(wkhtmltopdf=binary_path)

    return pdfkit.from_string(html, False, options, **pdfkit_kwargs)
def send_content(content_id):
    """Send a local, public Content object out via the federation layer.

    Only public content is delivered. Nothing is sent when ``settings.DEBUG``
    is on.
    """
    try:
        content = Content.objects.get(
            id=content_id,
            visibility=Visibility.PUBLIC,
            content_type=ContentType.CONTENT,
            local=True,
        )
    except Content.DoesNotExist:
        logger.warning("No local content found with id %s", content_id)
        return

    entity = make_federable_content(content)
    if not entity:
        logger.warning("send_content - No entity for %s", content)
        return
    if settings.DEBUG:
        # Don't send in development mode
        return

    # The relay always receives public content, followed by remote followers.
    recipients = [(settings.SOCIALHOME_RELAY_DOMAIN, "diaspora")]
    recipients.extend(_get_remote_followers(content.author))
    handle_send(entity, content.author, recipients)
def send_share(content_id):
    """Send a local, public share of a Content object to the federation layer.

    Only public shares are delivered. Nothing is sent when ``settings.DEBUG``
    is on.
    """
    try:
        share = Content.objects.get(
            id=content_id,
            visibility=Visibility.PUBLIC,
            content_type=ContentType.SHARE,
            local=True,
        )
    except Content.DoesNotExist:
        logger.warning("No local share found with id %s", content_id)
        return

    entity = make_federable_content(share)
    if not entity:
        logger.warning("send_share - No entity for %s", share)
        return
    if settings.DEBUG:
        # Don't send in development mode
        return

    recipients = _get_remote_followers(share.author)
    if not share.share_of.local:
        # The original author is remote, so deliver to them as well
        recipients.append((share.share_of.author.handle, None))
    handle_send(entity, share.author, recipients)
def send_content_retraction(content, author_id):
    """Send a retraction for the given content to the federation layer.

    Only local, public content is retracted. Nothing is sent when
    ``settings.DEBUG`` is on.
    """
    if not (content.visibility == Visibility.PUBLIC and content.local):
        return

    author = Profile.objects.get(id=author_id)
    entity = make_federable_retraction(content, author)
    if not entity:
        logger.warning("send_content_retraction - No retraction entity for %s", content)
        return
    if settings.DEBUG:
        # Don't send in development mode
        return

    # The relay first, then every remote follower of the author.
    recipients = [(settings.SOCIALHOME_RELAY_DOMAIN, "diaspora")]
    recipients.extend(_get_remote_followers(author))
    handle_send(entity, author, recipients)
def forward_entity(entity, target_content_id):
    """Forward an entity that relates to a local target content.

    Examples are remote replies or remote shares on local content. Only
    public content is handled. Nothing is sent when ``settings.DEBUG`` is on.
    """
    try:
        target = Content.objects.get(id=target_content_id, visibility=Visibility.PUBLIC, local=True)
    except Content.DoesNotExist:
        logger.warning("forward_entity - No public local content found with id %s", target_content_id)
        return

    try:
        forwarded = Content.objects.get(guid=entity.guid, visibility=Visibility.PUBLIC)
    except Content.DoesNotExist:
        logger.warning("forward_entity - No content found with guid %s", entity.guid)
        return

    if settings.DEBUG:
        # Don't send in development mode
        return

    # Everyone taking part in the target content plus the followers of its
    # author, always leaving out the handle the entity came from.
    sender = entity.handle
    recipients = _get_remote_participants_for_content(target, exclude=sender)
    recipients.extend(_get_remote_followers(target.author, exclude=sender))
    handle_send(entity, forwarded.author, recipients, parent_user=target.author)
def handle(self, *args, **kwargs):
    """Import state shapes from the newest TIGER directory found under ``path``.

    Directories named ``tl_<year>_us_state`` are probed from 2016 down to
    2011; the first one that exists is imported. Exits when none is found.
    """
    base_path = kwargs['path']

    # With DEBUG on this will DIE.
    settings.DEBUG = False

    # Probe newest first and stop at the first directory that exists.
    shapefile = ""
    for year in ("2016", "2015", "2014", "2013", "2012", "2011"):
        dirname = 'tl_%s_us_state' % year
        if not os.path.exists(os.path.join(base_path, dirname)):
            continue
        print('Found %s files.' % year)
        shapefile = os.path.join(base_path, "%s/%s.shp" % (dirname, dirname))
        break

    if not shapefile:
        print('Could not find files.')
        exit()

    print("Start States: %s" % datetime.datetime.now())
    # ``year`` still holds the year of the directory that matched above.
    state_import(shapefile, year)
    print("End States: %s" % datetime.datetime.now())
def handle(self, *args, **kwargs):
    """Import county shapes from the newest TIGER directory found under ``path``.

    Directories named ``tl_<year>_us_county`` are probed from 2016 down to
    2011; the first one that exists is imported. Exits when none is found.
    """
    base_path = kwargs['path']

    # With DEBUG on this will DIE.
    settings.DEBUG = False

    # Probe newest first and stop at the first directory that exists.
    shapefile = ""
    for year in ("2016", "2015", "2014", "2013", "2012", "2011"):
        dirname = 'tl_%s_us_county' % year
        if not os.path.exists(os.path.join(base_path, dirname)):
            continue
        print('Found %s files.' % year)
        shapefile = os.path.join(base_path, "%s/%s.shp" % (dirname, dirname))
        break

    if not shapefile:
        print('Could not find files.')
        exit()

    print("Start Counties: %s" % datetime.datetime.now())
    # ``year`` still holds the year of the directory that matched above.
    county_import(shapefile, year)
    print("End Counties: %s" % datetime.datetime.now())
def private_document(request, document_key):
    """
    Serve the private file attached to the document with the given key.

    The path is built from ``settings.PRIVATE_MEDIA_ROOT`` and the stored
    document name, then handed to ``server.serve``.

    This is temporary code.
    NOTE(review): no read-permission check is performed here at the moment;
    confirm whether access control is enforced elsewhere.
    """
    media_root = settings.PRIVATE_MEDIA_ROOT

    # Look up the document record and derive the file location from it.
    document_record = documents.objects.get(pk=document_key)
    path = "/".join((media_root, document_record.document.name))

    return server.serve(request, path=path)
def get_saml_settings():
    """Build the SAML toolkit settings dict from ``settings.SAML_AUTH``.

    ``strict`` defaults to the opposite of ``settings.DEBUG`` and ``debug`` to
    ``settings.DEBUG`` unless set explicitly. The ``mapping`` key is dropped,
    and the service-provider section is completed with the metadata, login and
    logout URLs of the front site.

    :return: A new dict; ``settings.SAML_AUTH`` itself is left untouched.
    """
    front = settings.SITES['front']
    base_url = '{scheme}://{domain}'.format(
        scheme=front['scheme'],
        domain=front['domain'],
    )
    debug = settings.DEBUG

    saml_settings = dict(settings.SAML_AUTH)
    saml_settings['strict'] = settings.SAML_AUTH.get('strict', not debug)
    saml_settings['debug'] = settings.SAML_AUTH.get('debug', debug)
    # ``mapping`` is not part of the toolkit settings; tolerate its absence.
    saml_settings.pop('mapping', None)

    # dict() above is a shallow copy, so copy the nested ``sp`` section before
    # updating it; otherwise the global Django settings would be mutated.
    service_provider = dict(saml_settings.get('sp', {}))
    service_provider.update({
        'entityId': base_url + reverse('taiga_contrib_saml_auth:metadata'),
        'assertionConsumerService': {
            'url': base_url + reverse('taiga_contrib_saml_auth:login_complete'),
        },
        'singleLogoutService': {
            'url': base_url + reverse('taiga_contrib_saml_auth:logout_complete'),
        },
    })
    saml_settings['sp'] = service_provider
    return saml_settings
def media(self):
    """Return the admin JavaScript media, adapted to the Django version.

    Minified files are used unless ``settings.DEBUG`` is on.
    Taken from django.contrib.admin.options ModelAdmin.
    """
    suffix = '' if settings.DEBUG else '.min'
    before_1_9 = StrictVersion(get_version()) < StrictVersion('1.9')

    if before_1_9:
        scripts = [
            'core.js',
            'admin/RelatedObjectLookups.js',
            'jquery%s.js' % suffix,
            'jquery.init.js',
        ]
    else:
        scripts = [
            'core.js',
            'vendor/jquery/jquery%s.js' % suffix,
            'jquery.init.js',
            'admin/RelatedObjectLookups.js',
            'actions%s.js' % suffix,
            'urlify.js',
            'prepopulate%s.js' % suffix,
            'vendor/xregexp/xregexp%s.js' % suffix,
        ]

    return forms.Media(js=[static('admin/js/%s' % name) for name in scripts])
def user_register(message):
    """Create an inactive user from ``message`` and send the activation link.

    A random password is generated and stored in
    ``message['register_password']``. When ``message`` carries a
    ``slack_user_id`` a contact is created and the activation link is posted
    to that Slack user (and stored in ``message['registration_link']``);
    otherwise ``register_email`` is called. Does nothing when
    ``settings.DEBUG`` is on.

    :param message: Dict with at least ``register_user_name`` and
        ``register_user_email``; it is modified in place.
    """
    if settings.DEBUG:
        return

    logger.debug('user_register task start - email: %s', message['register_user_email'])
    user = User.objects.create_user(message['register_user_name'],
                                    message['register_user_email'],
                                    is_active=False)
    message['register_password'] = passwd_generator(size=25)
    user.set_password(message['register_password'])
    user.save()

    if 'slack_user_id' in message:
        user.contact = Contact(email=message['register_user_email'],
                               slack_uid=message['slack_user_id'])
        user.contact.save()
        # The generated password doubles as the activation key.
        registration_link = "%s%s?username=%s&key=%s" % (settings.REGISTRATION_URL_PREFIX,
                                                         reverse_lazy('register_activate'),
                                                         message['register_user_name'],
                                                         message['register_password'])
        SLACK_MESSAGE = "Hello %s! we've detected you are using our team's slack. please take a minute to activate you account in the following <%s|LINK>.\n (please use same email address you used to sign-up with Slack)" % (message['register_user_name'], registration_link)
        logger.debug('user_register sending slack activation message to slack_uid %s', message['slack_user_id'])
        slack.chat.post_message(message['slack_user_id'],
                                SLACK_MESSAGE,
                                as_user=False,
                                username=settings.SLACK_BOT_NAME,
                                icon_url=settings.SLACK_BOT_ICON)
        message['registration_link'] = registration_link
    else:
        register_email(message)

    logger.debug('user_register task end - email: %s', message['register_user_email'])
def _send_mail_or_error_page(subject, content, address, request):
    """Send the mail and redirect, or render an error page if it is refused.

    When the SMTP server refuses the recipient, the first refusal is shown on
    ``registration/email_error.html``. Unless the refusal says the user is
    unknown, the team is also notified by mail.

    :param subject: Mail subject.
    :param content: Mail body.
    :param address: Recipient address.
    :param request: Current request, used to render the error page.
    :return: A redirect on success, a ``TemplateResponse`` on refusal.
    """
    try:
        send_mail(subject, content, None, [address])
        if settings.DEBUG:
            print(u"VALIDATION MAIL to {0}\nSubject: {1}\n{2}".format(
                address, subject, content))
    except SMTPRecipientsRefused as e:
        # ``items()`` is a non-indexable view on Python 3; take the first
        # entry through an iterator, which works on both major versions.
        wrong_email, (error_code, error_msg) = next(iter(e.recipients.items()))
        # On Python 3 smtplib reports the server message as bytes.
        if isinstance(error_msg, bytes):
            error_msg = error_msg.decode('utf-8', 'replace')
        unknown = 'User unknown' in error_msg
        if not unknown:
            error_email_content = u'{0}: {1}'.format(e.__class__.__name__, repr(e.recipients))
            send_mail(
                # Translate the constant msgid first, then fill in the address.
                _('Registration: Sending mail failed: {}').format(address),
                error_email_content,
                None,
                [settings.TEAM_EMAIL])
        return TemplateResponse(request, 'registration/email_error.html', {
            'unknown': unknown,
            'error_code': error_code,
            'error_msg': error_msg,
            'recipient': wrong_email
        })
    return redirect('registration_request_successful', address)
def process(self):
    """Dispatch to the ``<http method>_<action>`` or ``<http method>_process`` handler.

    The handler's return value is stored through ``setCode``. An
    ``AttributeError`` is turned into a 404 "not_found" error, except for
    action handlers while ``settings.DEBUG`` is on, where it propagates.
    """
    needs_action = self.requires_action

    def run_handler():
        # Resolve the handler by name, call it and record the result.
        if needs_action:
            handler_name = str(self.request.method.lower() + "_" + self.action)
        else:
            handler_name = str(self.request.method.lower() + "_process")
        self.setCode(getattr(self, handler_name)())

    if needs_action and settings.DEBUG:
        # Let errors surface untouched while debugging action handlers.
        run_handler()
        return

    try:
        run_handler()
    except AttributeError:
        self.addError("not_found")
        self.setCode(status.HTTP_404_NOT_FOUND)
def test_project_get_list_with_filters(self, kc):
    """Listing projects with GET filters passes them on to ``tenant_list``."""
    filters = {'name': 'Ni!'}
    request = self.mock_rest_request(GET=dict(filters))

    # Two tenants whose to_dict() yields the filtered name; no further pages.
    tenants = [
        mock.Mock(**{'to_dict.return_value': {'name': 'Ni!'}})
        for _ in range(2)
    ]
    kc.tenant_list.return_value = (tenants, False)

    with mock.patch.object(settings, 'DEBUG', True):
        response = keystone.Projects().get(request)

    self.assertStatusCode(response, 200)
    expected = {"has_more": False, "items": [{"name": "Ni!"}, {"name": "Ni!"}]}
    self.assertEqual(response.json, expected)
    kc.tenant_list.assert_called_once_with(request, paginate=False,
                                           marker=None, domain=None,
                                           user=None, admin=True,
                                           filters=filters)
def serve_protected_thumbnail(request, path):
    """
    Serve a thumbnail of a non-public file to a user with read permission.

    Raises ``Http404`` when the source file cannot be resolved or the
    thumbnail cannot be served. Without read permission the result is
    ``PermissionDenied`` in DEBUG mode and ``Http404`` otherwise.
    """
    original_name = thumbnail_to_original_filename(path)
    if not original_name:
        raise Http404('File not found')

    try:
        file_obj = File.objects.get(file=original_name, is_public=False)
    except File.DoesNotExist:
        raise Http404('File not found')

    if not file_obj.has_read_permission(request):
        # Only reveal the real reason while debugging.
        if settings.DEBUG:
            raise PermissionDenied
        raise Http404('File not found')

    try:
        thumb = ThumbnailFile(name=path, storage=file_obj.file.thumbnail_storage)
        return thumbnail_server.serve(request, thumb, save_as=False)
    except Exception:
        raise Http404('File not found')
def serve(request, path, insecure=False, **kwargs):
    """
    Serve static files below a given point in the directory structure or
    from locations inferred from the staticfiles finders.

    To use, put a URL pattern such as::

        from django.contrib.staticfiles import views

        url(r'^(?P<path>.*)$', views.serve)

    in your URLconf.

    The file is located through the finders and then delivered by the
    django.views.static.serve() view. Outside DEBUG mode the view answers
    with 404 unless ``insecure`` is set.
    """
    if not (settings.DEBUG or insecure):
        raise Http404

    lookup_path = posixpath.normpath(unquote(path)).lstrip('/')
    located = finders.find(lookup_path)
    if not located:
        if path.endswith('/') or path == '':
            raise Http404("Directory indexes are not allowed here.")
        raise Http404("'%s' could not be found" % path)

    document_root, filename = os.path.split(located)
    return static.serve(request, filename, document_root=document_root, **kwargs)
def get_full_path_with_slash(self, request):
    """
    Return the request's full path with a trailing slash appended.

    Raise a RuntimeError if settings.DEBUG is True and request.method is
    POST, PUT, or PATCH.
    """
    new_path = request.get_full_path(force_append_slash=True)
    body_would_be_lost = settings.DEBUG and request.method in ('POST', 'PUT', 'PATCH')
    if not body_would_be_lost:
        return new_path

    details = {
        'method': request.method,
        'url': request.get_host() + new_path,
    }
    raise RuntimeError(
        "You called this URL via %(method)s, but the URL doesn't end "
        "in a slash and you have APPEND_SLASH set. Django can't "
        "redirect to the slash URL while maintaining %(method)s data. "
        "Change your form to point to %(url)s (note the trailing "
        "slash), or set APPEND_SLASH=False in your Django settings." % details
    )
def process_response(self, request, response):
    """
    Mail the managers about relevant 404 NOT FOUND responses (broken links).

    Nothing is mailed in DEBUG mode or for requests considered ignorable.
    The response is always returned unchanged.
    """
    if response.status_code != 404 or settings.DEBUG:
        return response

    domain = request.get_host()
    path = request.get_full_path()
    referer = force_text(request.META.get('HTTP_REFERER', ''), errors='replace')
    if self.is_ignorable_request(request, path, domain, referer):
        return response

    user_agent = force_text(request.META.get('HTTP_USER_AGENT', '<none>'), errors='replace')
    remote_ip = request.META.get('REMOTE_ADDR', '<none>')
    link_kind = 'INTERNAL ' if self.is_internal_request(domain, referer) else ''
    mail_managers(
        "Broken %slink on %s" % (link_kind, domain),
        "Referrer: %s\nRequested URL: %s\nUser agent: %s\n"
        "IP address: %s\n" % (referer, path, user_agent, remote_ip),
        fail_silently=True)
    return response
def render(self, context):
    """Render the hidden CSRF input, or nothing when no token is usable.

    An empty string is returned when the token is missing or equals the
    ``'NOTPROVIDED'`` marker; a missing token also triggers a warning in
    DEBUG mode.
    """
    csrf_token = context.get('csrf_token')

    if not csrf_token:
        # It's very probable that the token is missing because of
        # misconfiguration, so we raise a warning
        if settings.DEBUG:
            warnings.warn(
                "A {% csrf_token %} was used in a template, but the context "
                "did not provide the value.  This is usually caused by not "
                "using RequestContext."
            )
        return ''

    if csrf_token == 'NOTPROVIDED':
        return format_html("")
    return format_html("<input type='hidden' name='csrfmiddlewaretoken' value='{}' />", csrf_token)
def static(prefix, view=serve, **kwargs):
    """
    Return a URL pattern list for serving files in debug mode.

    from django.conf import settings
    from django.conf.urls.static import static

    urlpatterns = [
        # ... the rest of your URLconf goes here ...
    ] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)

    An empty list is returned outside debug mode or for a non-local prefix
    (one containing ``://``); an empty prefix raises ImproperlyConfigured.
    """
    is_remote_prefix = prefix and '://' in prefix
    if not settings.DEBUG or is_remote_prefix:
        # No-op if not in debug mode or a non-local prefix
        return []
    if not prefix:
        raise ImproperlyConfigured("Empty static prefix not permitted")

    pattern = r'^%s(?P<path>.*)$' % re.escape(prefix.lstrip('/'))
    return [
        url(pattern, view, kwargs=kwargs),
    ]
def handle_uncaught_exception(request, resolver, exc_info):
    """
    Handle any otherwise uncaught exception (one that results in an HTTP 500
    response).

    Depending on the settings the exception is re-raised, rendered as the
    technical debug page, or passed to the resolver's 500 handler.
    """
    if settings.DEBUG_PROPAGATE_EXCEPTIONS:
        # Re-raise the exception currently being handled.
        raise

    logger.error(
        'Internal Server Error: %s', request.path,
        exc_info=exc_info,
        extra={'status_code': 500, 'request': request},
    )

    if settings.DEBUG:
        return debug.technical_500_response(request, *exc_info)

    # If Http500 handler is not installed, reraise the last exception.
    if resolver.urlconf_module is None:
        six.reraise(*exc_info)

    # Return an HttpResponse that displays a friendly error message.
    error_view, view_kwargs = resolver.resolve_error_handler(500)
    return error_view(request, **view_kwargs)
def render(self, render_type, context):
    """
    Render the notification template for one content type.

    :param render_type: the content type to render
    :param context: context data dictionary
    :return: the rendered content, or None when the template does not exist
        (in DEBUG mode the TemplateDoesNotExist error is re-raised instead)
    """
    assert render_type in self.render_types, 'Invalid Render Type'

    try:
        # The "text" type is stored with a .txt extension; every other type
        # uses its own name as the extension.
        extension = 'txt' if render_type == 'text' else render_type
        template_path = 'herald/{}/{}.{}'.format(render_type, self.template_name, extension)
        return render_to_string(template_path, context)
    except TemplateDoesNotExist:
        if settings.DEBUG:
            raise
        return None
def get_sanic_application():
    """
    Set up Django and return a Sanic application that uses the Django handler.

    Static files are served by Sanic only when DEBUG is on and
    ``django.contrib.staticfiles`` is installed.
    """
    if sys.version_info < (3, 5):
        raise RuntimeError("The SanicDjango Adaptor may only be used with python 3.5 and above.")

    django.setup()
    from django.conf import settings

    debug = getattr(settings, 'DEBUG', False)
    installed_apps = getattr(settings, 'INSTALLED_APPS', [])
    serve_static = debug and 'django.contrib.staticfiles' in installed_apps

    app = Sanic(__name__)
    if serve_static:
        app.static(
            getattr(settings, 'STATIC_URL', "/static/"),
            getattr(settings, 'STATIC_ROOT', "./static"),
        )

    # patch the app to use the django adaptor handler
    app.handle_request = SanicHandler(app)
    return app
def dashboard(request):
    """Serve the frontend index page, with two special cases.

    A POST with a body is treated as a symbolication request, and in DEBUG
    mode the login redirect URL is bounced to the local dev server.
    """
    # Ideally people should...
    # `HTTP -X POST -d JSON http://hostname/symbolicate/`
    # But if they do it directly on the root it should still work,
    # for legacy reasons.
    is_legacy_symbolication = request.method == 'POST' and request.body
    if is_legacy_symbolication:
        return symbolicate_json(request)

    current_url = request.build_absolute_uri()
    if (
        current_url.endswith(settings.LOGIN_REDIRECT_URL) and
        settings.DEBUG
    ):  # pragma: no cover
        return redirect('http://localhost:3000' + settings.LOGIN_REDIRECT_URL)

    return frontend_index_html(request)
def get_cache(cache=None):
    """Return ``cache`` or the 'default' cache if ``cache`` is not specified
    or ``cache`` is not configured.

    Outside DEBUG mode, a persistent store must be backed by Redis with
    ``TIMEOUT: None``; otherwise ImproperlyConfigured is raised.

    :param cache: The name of the requested cache.
    """
    try:
        # Check for proper Redis persistent backends
        # FIXME: this logic needs to be a system sanity check
        if not settings.DEBUG and cache in PERSISTENT_STORES:
            misconfigured = (
                cache not in settings.CACHES or
                'RedisCache' not in settings.CACHES[cache]['BACKEND'] or
                settings.CACHES[cache].get('TIMEOUT', '') is not None
            )
            if misconfigured:
                raise ImproperlyConfigured(
                    'Pootle requires a Redis-backed caching backend for %r '
                    'with `TIMEOUT: None`. Please review your settings.' % cache
                )

        return caches[cache]
    except InvalidCacheBackendError:
        return default_cache
def ajax_required(f):
    """Decorator that rejects non-AJAX requests with a 400 response.

    The check is skipped while ``settings.DEBUG`` is on.

    Use it in your views:

    @ajax_required
    def my_view(request):
        ....

    Taken from:
    http://djangosnippets.org/snippets/771/
    """
    @wraps(f)
    def wrapper(request, *args, **kwargs):
        if settings.DEBUG or request.is_ajax():
            return f(request, *args, **kwargs)
        return HttpResponseBadRequest("This must be an AJAX request.")

    return wrapper
def handle_uncaught_exception(request, resolver, exc_info):
    """
    Handle any otherwise uncaught exception (one that results in an HTTP 500
    response).

    Depending on the settings the exception is re-raised, rendered as the
    technical debug page, or passed to the resolver's 500 handler.
    """
    if settings.DEBUG_PROPAGATE_EXCEPTIONS:
        # Re-raise the exception currently being handled.
        raise

    logger.error(
        'Internal Server Error: %s', request.path,
        exc_info=exc_info,
        extra={'status_code': 500, 'request': request},
    )

    if settings.DEBUG:
        return debug.technical_500_response(request, *exc_info)

    # Return an HttpResponse that displays a friendly error message.
    error_view, view_kwargs = resolver.resolve_error_handler(500)
    return error_view(request, **view_kwargs)
def dispatch(self, request, *args, **kwargs):
    """Gate access to the admin shell before dispatching the view.

    Checks, in order: the shell is enabled, the user is authenticated staff,
    DEBUG mode is on when required, and the user is a superuser when required.

    :return: A 404 or 403 response when a check fails, otherwise the result
        of the parent ``dispatch``.
    """
    user = request.user
    # ``is_authenticated`` is a method before Django 1.10 and an attribute after.
    if django.VERSION < (1, 10):
        is_auth = user.is_authenticated()
    else:
        is_auth = user.is_authenticated

    if not ADMIN_SHELL_ENABLE:
        return HttpResponseNotFound("Not found: Django admin shell is not enabled")
    # Test truthiness rather than ``is False``: a falsy value that is not the
    # ``False`` singleton (e.g. Django 1.10/1.11's CallableBool, or None)
    # would slip past an identity check.
    if not is_auth or not user.is_staff:
        return HttpResponseForbidden("Forbidden: To access Django admin shell you must have access the admin site")
    if ADMIN_SHELL_ONLY_DEBUG_MODE and not settings.DEBUG:
        return HttpResponseForbidden("Forbidden :Django admin shell require DEBUG mode")
    if ADMIN_SHELL_ONLY_FOR_SUPERUSER and not user.is_superuser:
        return HttpResponseForbidden("Forbidden: To access Django admin shell you must be superuser")
    return super(Shell, self).dispatch(request, *args, **kwargs)
def validate_school_email(email, request=None, site_id=None):
    """Reject emails whose domain is not one of the site's school domains.

    The allowed domains come from the comma-separated ``SCHOOL_EMAIL_DOMAIN``
    site setting plus ``projectcallisto.org``. The check always passes in
    DEBUG mode.

    :raises forms.ValidationError: for a non-school domain outside DEBUG mode.
    """
    email_domain = email.rsplit('@', 1)[-1].lower()
    school_email_domain = TenantApi.site_settings(
        'SCHOOL_EMAIL_DOMAIN',
        request=request,
        site_id=site_id,
    )

    allowed = [entry.strip() for entry in school_email_domain.split(',')]
    allowed.append('projectcallisto.org')

    if email_domain in allowed or settings.DEBUG:
        return

    logger.warning(
        "non school email used with domain {}".format(email_domain))
    raise forms.ValidationError(non_school_email_error(
        request=request,
        site_id=site_id,
    ))
def exception_handler(client, request=None, **kwargs):
    """Report the exception currently being handled through ``client``.

    Nothing is captured when Django is in DEBUG mode while the client's own
    debug flag is off, or when the exception carries a truthy
    ``skip_elasticapm`` attribute. Failures while reporting are logged, or
    emitted as a warning if logging fails too.
    """
    def report(request=None, **kwargs):
        exc_info = sys.exc_info()
        try:
            suppressed_by_debug = django_settings.DEBUG and not client.config.debug
            if suppressed_by_debug or getattr(exc_info[1], 'skip_elasticapm', False):
                return

            client.capture('Exception', exc_info=exc_info, request=request)
        except Exception as capture_error:
            try:
                client.error_logger.exception(u'Unable to process log entry: %s' % (capture_error,))
            except Exception as logging_error:
                warnings.warn(u'Unable to process log entry: %s' % (logging_error,))
        finally:
            # Drop the local reference to the exception info.
            try:
                del exc_info
            except Exception as cleanup_error:
                client.error_logger.exception(cleanup_error)

    return report(request, **kwargs)
def process_response(self, request, response):
    """Capture a "Page Not Found" message for relevant 404 responses.

    Non-404 responses, ignorable paths, and requests made while Django is in
    DEBUG mode with the client's debug flag off are passed through untouched.
    The capture result is stored on ``request._elasticapm``.
    """
    is_404 = response.status_code == 404
    if not is_404 or _is_ignorable_404(request.get_full_path()):
        return response
    if django_settings.DEBUG and not self.client.config.debug:
        return response

    capture_id = self.client.capture(
        'Message',
        request=request,
        param_message={
            'message': 'Page Not Found: %s',
            'params': [request.build_absolute_uri()]
        },
        logger_name='http404',
        level=logging.INFO
    )
    request._elasticapm = {
        # No per-message service name is set, so the client's configured
        # name is always the one recorded.
        'service_name': self.client.config.service_name,
        'id': capture_id,
    }
    return response
def show_up_sense(context, sense=''):
    """Show AdSense for SENSE_UP

    With ``sense == 'user'`` the requesting user's own client and slot are
    used when both are set; otherwise the site-wide SENSE_UP values apply.
    Ads are always disabled in DEBUG mode.
    """
    request = context['request']

    use_profile = False
    if sense == 'user':
        profile = request.user.profile
        use_profile = bool(profile.sense_client and profile.sense_slot)

    if use_profile:
        sense_client = profile.sense_client
        sense_slot = profile.sense_slot
    else:
        sense_client = settings.SENSE_UP_CLIENT
        sense_slot = settings.SENSE_UP_SLOT

    enabled = settings.ENABLE_ADSENSE
    return {
        'sense_enabled': False if settings.DEBUG else enabled,
        'sense_native': False,
        'sense_client': sense_client,
        'sense_slot': sense_slot,
    }
def show_up_sense_native(context, sense=''):
    """Show AdSense Native for SENSE_UP

    With ``sense == 'user'`` the requesting user's own client and slot are
    used when both are set; otherwise the site-wide SENSE_UP values apply.
    Ads are always disabled in DEBUG mode.
    """
    request = context['request']

    use_profile = False
    if sense == 'user':
        profile = request.user.profile
        use_profile = bool(profile.sense_client and profile.sense_slot)

    if use_profile:
        sense_client = profile.sense_client
        sense_slot = profile.sense_slot
    else:
        sense_client = settings.SENSE_UP_CLIENT
        sense_slot = settings.SENSE_UP_SLOT

    enabled = settings.ENABLE_ADSENSE
    return {
        'sense_enabled': False if settings.DEBUG else enabled,
        'sense_native': True,
        'sense_client': sense_client,
        'sense_slot': sense_slot,
    }
def show_down_sense(context, sense=''):
    """Show AdSense for SENSE_DOWN

    With ``sense == 'user'`` the requesting user's own client and slot are
    used when both are set; otherwise the site-wide SENSE_DOWN values apply.
    Ads are always disabled in DEBUG mode.
    """
    request = context['request']

    use_profile = False
    if sense == 'user':
        profile = request.user.profile
        use_profile = bool(profile.sense_client and profile.sense_slot)

    if use_profile:
        sense_client = profile.sense_client
        sense_slot = profile.sense_slot
    else:
        sense_client = settings.SENSE_DOWN_CLIENT
        sense_slot = settings.SENSE_DOWN_SLOT

    enabled = settings.ENABLE_ADSENSE
    return {
        'sense_enabled': False if settings.DEBUG else enabled,
        'sense_native': False,
        'sense_client': sense_client,
        'sense_slot': sense_slot,
    }
def show_down_sense_native(context, sense=''):
    """Show AdSense Native for SENSE_DOWN

    With ``sense == 'user'`` the requesting user's own client and slot are
    used when both are set; otherwise the site-wide SENSE_DOWN values apply.
    Ads are always disabled in DEBUG mode.
    """
    request = context['request']

    use_profile = False
    if sense == 'user':
        profile = request.user.profile
        use_profile = bool(profile.sense_client and profile.sense_slot)

    if use_profile:
        sense_client = profile.sense_client
        sense_slot = profile.sense_slot
    else:
        sense_client = settings.SENSE_DOWN_CLIENT
        sense_slot = settings.SENSE_DOWN_SLOT

    enabled = settings.ENABLE_ADSENSE
    return {
        'sense_enabled': False if settings.DEBUG else enabled,
        'sense_native': True,
        'sense_client': sense_client,
        'sense_slot': sense_slot,
    }
def __init__(self, *args, **kwargs):
    """Read the middleware settings and refuse to run outside DEBUG mode.

    :raises ImproperlyConfigured: when DEBUG is off and
        ``ALWAYS_AUTHENTICATED_DEBUG_ONLY`` is truthy (the default).
    """
    self.username = getattr(settings, 'ALWAYS_AUTHENTICATED_USERNAME', 'user')
    self.defaults = getattr(settings, 'ALWAYS_AUTHENTICATED_USER_DEFAULTS', {})

    debug_only = getattr(settings, 'ALWAYS_AUTHENTICATED_DEBUG_ONLY', True)
    if not settings.DEBUG and debug_only:
        raise ImproperlyConfigured(
            'DEBUG=%s, but AlwaysAuthenticatedMiddleware is configured to '
            'only run in debug mode.\n'
            'Remove AlwaysAuthenticatedMiddleware from '
            'MIDDLEWARE/MIDDLEWARE_CLASSES or set '
            'ALWAYS_AUTHENTICATED_DEBUG_ONLY to False.' % settings.DEBUG)

    super(AlwaysAuthenticatedMiddleware, self).__init__(*args, **kwargs)
def run_check(id, request=None, fail_silently=True, fail_status=500):
    """Run the configured check ``id`` and return ``(result, status)``.

    A check given as a dotted-path string is imported and called with the
    request; a callable is called directly; any other value is returned as-is
    with status 200. On error the result is ``"ERROR"`` (or the exception
    text in DEBUG mode) with ``fail_status``; the exception is re-raised when
    ``fail_silently`` is false.
    """
    status = 200
    try:
        check = config.checks[id]
        if isinstance(check, six.string_types):
            ret, status = import_string(check)(request)
        elif callable(check):
            ret, status = check(request)
        else:
            ret = check
    except Exception as e:
        logger.exception(e)
        ret = str(e) if settings.DEBUG else "ERROR"
        status = fail_status
        if not fail_silently:
            raise

    return ret, status
def get_extra(config, request=None):
    """Resolve every entry of ``config.extra`` into a dict of values.

    A dotted-path string is imported and called with the request; a callable
    is called directly; any other value is used as-is. A failing entry is
    logged and left out, except in DEBUG mode where the exception text is
    stored instead.
    """
    resolved = {}
    for key, spec in config.extra.items():
        try:
            if isinstance(spec, six.string_types):
                resolved[key] = import_string(spec)(request)
            elif callable(spec):
                resolved[key] = spec(request)
            else:
                resolved[key] = spec
        except Exception as e:
            logger.exception(e)
            if settings.DEBUG:
                resolved[key] = str(e)
    return resolved
def add_arguments(self, parser):
    """Register the command-line options: a verbosity flag and three integer
    tuning options for batch size, iteration count and thread count."""
    parser.add_argument("--verbose", action="store_true", default=False,
                        help="Be very chatty and run logging at DEBUG")

    # (flag, dest, default, help) for each integer-valued option.
    integer_options = (
        ("--chunk-size", "chunk_size", DEFAULT_CHUNK_SIZE,
         "The number of records to batch process at once"),
        ("--num-iterations", "num_iterations", DEFAULT_NUM_ITERATIONS,
         "The number of times to loop through `chunk_size` records"),
        ("--num-threads", "num_threads", DEFAULT_NUM_THREADS,
         "The number of threads to start up at once"),
    )
    for flag, dest, default, help_text in integer_options:
        parser.add_argument(flag, dest=dest, default=default, type=int,
                            help=help_text)
def setup_logging(debug=False):
    """
    Configure logging in cases where a Django environment is not supposed
    to be configured.

    With ``debug`` set, Django's DEBUG setting is switched on and the
    console handler and ``kolibri`` logger are lowered to DEBUG level.

    TODO: This is really confusing, importing django settings is allowed to
    fail when debug=False, but if it's true it can fail?
    """
    try:
        # NOTE(review): ``django.conf.settings`` is normally an object rather
        # than an importable module, so this import presumably always raises
        # ImportError and the fallback below is what gets used -- confirm.
        from django.conf.settings import LOGGING
    except ImportError:
        from kolibri.deployment.default.settings.base import LOGGING

    if debug:
        from django.conf import settings
        settings.DEBUG = True
        for section, entry in (('handlers', 'console'), ('loggers', 'kolibri')):
            LOGGING[section][entry]['level'] = 'DEBUG'
        logger.debug("Debug mode is on!")

    logging.config.dictConfig(LOGGING)
def send_reply(content_id):
    """Send a local, public reply out via the federation layer.

    Only public content is delivered. A reply to local content is forwarded
    as a relayable; a reply to remote content is sent straight to the parent's
    author. Nothing is sent when ``settings.DEBUG`` is on.

    :param content_id: Primary key of the reply ``Content`` object.
    """
    try:
        content = Content.objects.get(id=content_id, visibility=Visibility.PUBLIC,
                                      content_type=ContentType.REPLY, local=True)
    except Content.DoesNotExist:
        logger.warning("No content found with id %s", content_id)
        return
    entity = make_federable_content(content)
    if not entity:
        logger.warning("send_reply - No entity for %s", content)
        # Without an entity there is nothing to deliver; stop here instead of
        # falling through and handing a falsy entity to the senders below.
        return
    if settings.DEBUG:
        # Don't send in development mode
        return
    # Send directly (remote parent) or as a relayable (local parent)
    if content.parent.local:
        forward_entity(entity, content.parent.id)
    else:
        # We only need to send to the original author
        recipients = [
            (content.parent.author.handle, None),
        ]
        handle_send(entity, content.author, recipients)