我们从 Python 开源项目中提取了以下 49 个代码示例,用于说明如何使用 django.core.management.base.CommandError()。
def handle_template(self, template, subdir):
    """
    Resolve the directory that holds the app or project template.

    When no template is given, fall back to ``conf/<subdir>`` inside the
    Django package found through ``django.__path__[0]`` (the install
    location is not known in advance). Otherwise a directory is returned
    as-is, a URL is downloaded first, and an existing file is extracted.

    Raises CommandError when the template cannot be resolved.
    """
    if template is None:
        return path.join(django.__path__[0], 'conf', subdir)

    # Strip the scheme from file:// references before touching the disk.
    if template.startswith('file://'):
        template = template[len('file://'):]
    local_path = path.normpath(path.expanduser(template))
    if path.isdir(local_path):
        return local_path

    if self.is_url(template):
        # download() fetches the file and hands back its path
        candidate = self.download(template)
    else:
        candidate = path.abspath(local_path)
    if path.exists(candidate):
        return self.extract(candidate)

    raise CommandError("couldn't handle %s template %s." %
                       (self.app_or_project, template))
def handle(self, *args, **options):
    """Run the gene and disease population steps in order, announcing each."""
    steps = (
        ('Populate Genes!', populate_genes),
        ('Populate Diseases from OMIM!', populate_diseases),
        ('Populate Diseases from CGD!', populate_CGD),
    )
    for banner, populate in steps:
        print(banner)
        populate()
def handle(self, *args, **options):
    """
    Build the requested report and return it as ``(headers, rows)``.

    The report name comes from ``options['report']`` and defaults to
    ``'monthly'``; it is dispatched to the ``<report>_report`` method.
    ``options['recent']`` keeps only the last N rows and
    ``options['ascending']`` keeps the chronological order.

    Raises CommandError when a periodic report is requested without
    ``options['subscription_start']``.
    """
    report = options['report'] or 'monthly'
    is_periodic = report in ('daily', 'monthly', 'quarterly', 'yearly')
    if is_periodic and options['subscription_start'] is None:
        raise CommandError("{} report requires --subscription-start"
                           .format(report))

    build_report = getattr(self, '{}_report'.format(report))
    headers, rows = build_report(options)

    if options['recent']:
        rows = rows[-options['recent']:]

    # Reports are produced in ascending chronological order (mainly because
    # of enumerations); for display the reversed order is the default.
    if options['ascending']:
        return headers, rows
    return headers, reversed(rows)
def extract(self, filename):
    """
    Unpack the given archive into a new temporary directory and return the
    path of that directory.

    The directory is also appended to ``self.paths_to_remove``. Raises
    CommandError when the archive cannot be extracted.
    """
    tempdir = tempfile.mkdtemp(
        prefix='django_%s_template_' % self.app_or_project,
        suffix='_extract',
    )
    self.paths_to_remove.append(tempdir)
    if self.verbosity >= 2:
        self.stdout.write("Extracting %s\n" % filename)
    try:
        archive.extract(filename, tempdir)
    except (archive.ArchiveException, IOError) as e:
        raise CommandError("couldn't extract file %s to %s: %s" %
                           (filename, tempdir, e))
    return tempdir
def build_potfiles(self):
    """
    Build the pot files and run msguniq over each one.

    Returns the paths of the pot files that exist under
    ``self.locale_paths`` after processing. Raises CommandError when
    msguniq reports errors with a non-OK status.
    """
    file_list = self.find_files(".")
    self.remove_potfiles()
    self.process_files(file_list)

    potfiles = []
    for locale_dir in self.locale_paths:
        potfile = os.path.join(locale_dir, '%s.pot' % str(self.domain))
        if os.path.exists(potfile):
            command = ['msguniq'] + self.msguniq_options + [potfile]
            msgs, errors, status = gettext_popen_wrapper(command)
            if errors and status != STATUS_OK:
                raise CommandError(
                    "errors happened while running msguniq\n%s" % errors)
            if errors and self.verbosity > 0:
                # Non-fatal output is only echoed when verbose.
                self.stdout.write(errors)
            with io.open(potfile, 'w', encoding='utf-8') as fp:
                fp.write(msgs)
            potfiles.append(potfile)
    return potfiles
def _check_permission_clashing(custom, builtin, ctype): """ Check that permissions for a model do not clash. Raises CommandError if there are duplicate permissions. """ pool = set() builtin_codenames = set(p[0] for p in builtin) for codename, _name in custom: if codename in pool: raise CommandError( "The permission codename '%s' is duplicated for model '%s.%s'." % (codename, ctype.app_label, ctype.model_class().__name__)) elif codename in builtin_codenames: raise CommandError( "The permission codename '%s' clashes with a builtin permission " "for model '%s.%s'." % (codename, ctype.app_label, ctype.model_class().__name__)) pool.add(codename)
def validate_name(self, name, app_or_project):
    """
    Check that ``name`` is usable as an app or project name.

    On Python 2 the name must match ``^[_a-zA-Z]\\w*$``; otherwise it must
    be a valid identifier. Raises CommandError when the name is missing or
    invalid.
    """
    if name is None:
        article = "an" if app_or_project == "app" else "a"
        raise CommandError("you must provide %s %s name" % (
            article, app_or_project))

    if not six.PY2:
        if not name.isidentifier():
            raise CommandError(
                "%r is not a valid %s name. Please make sure the name is "
                "a valid identifier." % (name, app_or_project)
            )
        return

    # Python 2: validate it as a directory name with a regular expression.
    if re.search(r'^[_a-zA-Z]\w*$', name):
        return
    # Tailor the hint to what is actually wrong with the name.
    if re.search(r'^[_a-zA-Z]', name):
        message = 'use only numbers, letters and underscores'
    else:
        message = 'make sure the name begins with a letter or underscore'
    raise CommandError("%r is not a valid %s name. Please %s." %
                       (name, app_or_project, message))
def build_potfiles(self):
    """
    Build the pot files, deduplicate them with msguniq and rewrite them
    with normalized line endings.

    Returns the paths of the pot files found under ``self.locale_paths``.
    Raises CommandError when msguniq reports errors with a non-OK status.
    """
    sources = self.find_files(".")
    self.remove_potfiles()
    self.process_files(sources)

    written = []
    for locale_dir in self.locale_paths:
        potfile = os.path.join(locale_dir, '%s.pot' % str(self.domain))
        if not os.path.exists(potfile):
            continue
        msgs, errors, status = popen_wrapper(
            ['msguniq'] + self.msguniq_options + [potfile])
        if errors:
            fatal = status != STATUS_OK
            if fatal:
                raise CommandError(
                    "errors happened while running msguniq\n%s" % errors)
            if self.verbosity > 0:
                self.stdout.write(errors)
        with io.open(potfile, 'w', encoding='utf-8') as fp:
            fp.write(normalize_eols(msgs))
        written.append(potfile)
    return written
def handle(self, **options):
    """
    Create a new project after validating its name.

    Raises CommandError when the name is invalid or is already importable
    as a Python module. A random SECRET_KEY is generated and passed on to
    the parent command through ``options['secret_key']``.
    """
    project_name = options.pop('name')
    target = options.pop('directory')
    self.validate_name(project_name, "project")

    # The name must not shadow a module that can already be imported.
    try:
        import_module(project_name)
    except ImportError:
        name_taken = False
    else:
        name_taken = True
    if name_taken:
        raise CommandError(
            "%r conflicts with the name of an existing Python module and "
            "cannot be used as a project name. Please try another name." % project_name
        )

    # Random SECRET_KEY destined for the main settings file.
    options['secret_key'] = get_random_secret_key()

    super(Command, self).handle('project', project_name, target, **options)
def handle(self, **options):
    """
    Create a new app after validating its name.

    Raises CommandError when the name is invalid or is already importable
    as a Python module.
    """
    app_name = options.pop('name')
    target = options.pop('directory')
    self.validate_name(app_name, "app")

    # The name must not shadow a module that can already be imported.
    try:
        import_module(app_name)
    except ImportError:
        name_taken = False
    else:
        name_taken = True
    if name_taken:
        raise CommandError(
            "%r conflicts with the name of an existing Python module and "
            "cannot be used as an app name. Please try another name." % app_name
        )

    super(Command, self).handle('app', app_name, target, **options)
def handle(self, *args, **options):
    """
    Load candidacies from Form 501 filings that have none yet.

    When no CandidateContest exists, the problem is reported through
    ``self.failure`` if called from the command line, and raised as a
    CommandError otherwise.
    """
    super(Command, self).handle(*args, **options)

    if CandidateContest.objects.exists():
        form501_count = Form501Filing.objects.without_candidacy().count()
        self.header(
            "Processing %s Form 501 filings without candidacies" % form501_count
        )
        self.load()
    else:
        error_message = 'No contests currently loaded (run loadocdcandidatecontests).'
        if not self._called_from_command_line:
            raise CommandError(error_message)
        self.failure(error_message)

    self.success("Done!")
def handle(self, *args, **options):
    """
    Report whether the first user found for ``options['email']`` is
    blocked.

    Writes a warning when the lookup returns no user. Raises CommandError
    when the email contains a space or does not contain exactly one ``@``.
    """
    email = options['email']
    looks_valid = ' ' not in email and email.count('@') == 1
    if not looks_valid:
        raise CommandError(f'Invalid email {email!r}')

    users = find_users(
        settings.OIDC_RP_CLIENT_ID,
        settings.OIDC_RP_CLIENT_SECRET,
        urlparse(settings.OIDC_OP_USER_ENDPOINT).netloc,
        email,
        requests,
    )

    # Only the first returned user is inspected.
    nothing = object()
    user = next(iter(users), nothing)
    if user is nothing:
        self.stdout.write(self.style.WARNING(
            f'{email} could not be found in Auth0'
        ))
    elif user.get('blocked'):
        self.stdout.write(self.style.ERROR('BLOCKED!'))
    else:
        self.stdout.write(self.style.SUCCESS('NOT blocked!'))
def handle(self, **options):
    """
    Verify either the given users or every user.

    Exactly one of ``options['user']`` (an iterable of values accepted by
    ``self.get_user``) and ``options['all']`` must be supplied; otherwise
    CommandError is raised. A ValueError or ValidationError raised for one
    user is written to stderr and the remaining users are still processed.
    """
    if bool(options['user']) == options['all']:
        raise CommandError("Either provide a 'user' to verify or "
                           "use '--all' to verify all users")

    def report_failure(exc):
        # Python 3 exceptions have no ``message`` attribute, so fall back to
        # the string form instead of crashing inside the error handler.
        self.stderr.write(getattr(exc, 'message', None) or str(exc))

    if options['all']:
        for user in get_user_model().objects.hide_meta():
            try:
                utils.verify_user(user)
                self.stdout.write("Verified user '%s'" % user.username)
            except (ValueError, ValidationError) as e:
                report_failure(e)

    if options['user']:
        for user in options['user']:
            try:
                # The lookup stays inside the try so that a failing
                # get_user() is reported like a failed verification.
                utils.verify_user(self.get_user(user))
                self.stdout.write("User '%s' has been verified" % user)
            except (ValueError, ValidationError) as e:
                report_failure(e)
def build_row_data(self, rawrow):
    """
    Convert one raw row into a RowData record.

    Columns are located through ``self.name2col``. Underscores in the kit
    and subkit values become spaces, and an empty status becomes ``'0'``.
    Raises CommandError when the row has fewer columns than expected.
    """
    if len(rawrow) < len(self.name2col):
        raise CommandError('line %d: too few columns' % self.rowcount)

    def cell(column_name):
        return rawrow[self.name2col[column_name]]

    status = cell('status')
    if not len(status):
        status = '0'

    return RowData(
        kit=re.sub('_', ' ', cell('kit')).strip(),
        subkit=re.sub('_', ' ', cell('subkit')).strip(),
        barcode=cell('barcode'),
        index_seq=cell('index_seq'),
        index=cell('index'),
        adapter_seq=cell('adapter_seq'),
        version=cell('version'),
        manufacturer=cell('manufacturer'),
        model_range=cell('model_range'),
        status=status,
    )
def handle(self, *args, **opts):
    """
    Import a CSV file on behalf of a user.

    The user is looked up by ``opts['username']`` or, failing that,
    ``opts['userid']``; CommandError is raised when neither is given.
    With ``opts['clear']`` all Run, Adapter and Kit objects are deleted
    first. ``opts['format'] == 'chaim'`` selects the chaim importer.
    """
    if opts['username'] is not None:
        user = User.objects.get(username=opts['username'])
    elif opts['userid'] is not None:
        user = User.objects.get(id=opts['userid'])
    else:
        raise CommandError('Either username or userid is required')

    if opts['clear']:
        for model in (Run, Adapter, Kit):
            model.objects.all().delete()

    with open(opts['csvfile'], 'r') as f:
        importer = (self.import_chaim_csv if opts['format'] == 'chaim'
                    else self.import_csv)
        importer(user, f)
def _download_latest_clinvar_xml(self, dest_dir):
    """
    Download the newest ClinVar XML file from the NCBI FTP server.

    The files in ``CV_XML_DIR`` matching ``CV_XML_REGEX`` are sorted by
    name and the last one is saved into ``dest_dir``.

    Returns a ``(dest_filepath, ftp_xml_filename)`` tuple. Raises
    CommandError when no file on the server matches the regex.
    """
    ftp = FTP('ftp.ncbi.nlm.nih.gov')
    try:
        ftp.login()
        ftp.cwd(CV_XML_DIR)
        # sort just in case the ftp lists the files in random order
        cv_xml_w_date = sorted(
            f for f in ftp.nlst() if re.match(CV_XML_REGEX, f))
        if not cv_xml_w_date:
            raise CommandError('ClinVar reporting zero XML matching' +
                               ' regex: \'{0}\' in directory {1}'.format(
                                   CV_XML_REGEX, CV_XML_DIR))
        ftp_xml_filename = cv_xml_w_date[-1]
        dest_filepath = os.path.join(dest_dir, ftp_xml_filename)
        # retrbinary() hands bytes to the callback, so the destination has
        # to be opened in binary mode.
        with open(dest_filepath, 'wb') as fh:
            ftp.retrbinary('RETR {0}'.format(ftp_xml_filename), fh.write)
    finally:
        # Always release the connection, including on failure.
        ftp.close()
    return dest_filepath, ftp_xml_filename
def _create(self, fixture_file_path, less_verbose=0): try: self.write_info('Creating fixture %s' % fixture_file_path, 1+less_verbose) fixture_file_path = re.sub(r'\.zip$', '', fixture_file_path) # we strip away .zip if given tmp_dir = tempfile.mkdtemp() # copy media root shutil.copytree(self._media_root, join(tmp_dir, 'MEDIA_ROOT')) # database dump with open(join(tmp_dir, 'db.sql'), 'w') as fp: return_code = subprocess.call(['pg_dump', '--clean', '--no-owner', self._database_name], stdout=fp) if return_code != 0: raise CommandError('pg_dump failed with exit code {}'.format(return_code)) # creating the fixture archive archive_name = shutil.make_archive(fixture_file_path, 'zip', root_dir=tmp_dir) self.write_debug(subprocess.check_output(['unzip', '-l', archive_name])) except: self.write_debug('Temporary directory %s kept due to exception.' % tmp_dir) raise else: self.write_info('... fixture created', 1+less_verbose) shutil.rmtree(tmp_dir)
def handle(self, *args, **options):
    """
    Send an update to the selected users and report the outcome per user.

    With ``options['username']`` only those users are processed and a
    CommandError is raised if any of them cannot be found; otherwise all
    active users with a verified email are processed. Inactive or
    unverified users are skipped, and a failure for one user does not stop
    the others.
    """
    if options['username']:
        users = list(User.objects.filter(username__in=options['username']))
        found = {user.username for user in users}
        requested = set(options['username'])
        if found != requested:
            raise CommandError('User mismatch: couldn\'t find "{}"'.format(
                '", "'.join(requested - found)))
    else:
        users = list(User.objects.filter(is_active=True, email_verified=True))

    for user in users:
        self.stdout.write(user.username, ending=' ')
        eligible = user.is_active and user.email_verified
        if not eligible:
            self.stdout.write(self.style.WARNING('SKIP'))
            continue
        try:
            self.send_update(user)
            self.stdout.write(self.style.SUCCESS('OK'))
        except Exception as ex:
            self.stdout.write(self.style.ERROR('failed: {}'.format(repr(ex))))
def create_cache(sizes, options):
    """
    Flush the cached images of the given photo sizes.

    ``sizes`` is an iterable of photo size names; when it is empty every
    PhotoSize is flushed. Raises CommandError when no photo size matches.
    """
    requested = [size.strip(' ,') for size in sizes]

    if requested:
        sizes = PhotoSize.objects.filter(name__in=requested)
    else:
        sizes = PhotoSize.objects.all()

    if len(sizes) == 0:
        raise CommandError('No photo sizes were found.')

    print('Flushing cache...')

    for model in ImageModel.__subclasses__():
        for photosize in sizes:
            print('Flushing %s size images' % photosize.name)
            for instance in model.objects.all():
                instance.remove_size(photosize)
def create_cache(sizes, options):
    """
    Create the cached images of the given photo sizes.

    ``sizes`` is an iterable of photo size names; when it is empty the
    sizes flagged ``pre_cache`` are used. With a truthy
    ``options['reset']`` each cached image is removed before being
    recreated. Raises CommandError when no photo size matches.
    """
    reset = options.get('reset', None)
    requested = [size.strip(' ,') for size in sizes]

    if requested:
        sizes = PhotoSize.objects.filter(name__in=requested)
    else:
        sizes = PhotoSize.objects.filter(pre_cache=True)

    if len(sizes) == 0:
        raise CommandError('No photo sizes were found.')

    print('Caching photos, this may take a while...')

    for model in ImageModel.__subclasses__():
        for photosize in sizes:
            print('Cacheing %s size images' % photosize.name)
            for instance in model.objects.all():
                if reset:
                    instance.remove_size(photosize)
                instance.create_size(photosize)
def handle(self, *args, **options):
    """
    Synchronise Card objects with the image files under the image path.

    The path comes from ``options['path']`` or
    ``settings.CARD_IMAGES_PATH``. Cards whose ``path`` is not among the
    supported image files are deleted and a Card is created for every new
    file. Raises CommandError when the path does not exist.
    """
    root = options.get('path') or settings.CARD_IMAGES_PATH
    root = os.path.realpath(root)
    if not os.path.exists(root):
        raise CommandError('Image path does not exist')
    if not os.path.isdir(root):
        # os.walk() yields nothing for a non-directory; keep that a no-op
        # rather than wiping every card.
        return

    def check_extension(f):
        f = f.lower()
        return f[f.rfind('.'):] in SUPPORTED_FORMATS

    # Gather the files of every directory first. Syncing once per walked
    # directory would delete the cards that belong to the other ones.
    available = set()
    for *_, files in os.walk(root):
        available.update(filter(check_extension, files))

    registered = {c.path for c in Card.objects.all()}
    Card.objects.exclude(path__in=available).delete()
    for image_path in available.difference(registered):
        Card(path=image_path).save()
def handle(self, addrport=None, *args, **options):
    """
    Run the Django application through the deprecated run_gunicorn command.

    A deprecation warning is always emitted. An ``addrport`` argument is
    removed from ``sys.argv`` and used as the ``bind`` option. Raises
    CommandError when extra positional arguments are given.
    """
    # deprecation warning to announce future deletion in R21
    util.warn(
        "This command is deprecated. "
        "You should now run your application with the WSGI interface "
        "installed with your project. "
        "Ex.: gunicorn myproject.wsgi:application "
        "See https://docs.djangoproject.com/en/1.5/howto/deployment/wsgi/gunicorn/ "
        "for more info."
    )

    if args:
        raise CommandError('Usage is run_gunicorn %s' % self.args)

    if addrport:
        sys.argv = sys.argv[:-1]
        options['bind'] = addrport

    media_path = options.pop('admin_media_path', '')
    application = DjangoApplicationCommand(options, media_path)
    application.run()
def get_resetable_apps(app_labels=()):
    """
    Return a mapping of local application names to their paths.

    An application is local when its path starts with
    ``settings.BASE_DIR``. When ``app_labels`` is given, only those
    applications are returned and CommandError is raised for a label that
    is not a local application.
    """
    local_apps = {}
    for app in apps.get_apps():
        app_path = apps._get_app_path(app)
        if app_path.startswith(settings.BASE_DIR):
            local_apps[app.__name__.rsplit('.', 1)[0]] = app_path

    if not app_labels:
        return local_apps

    selected = {}
    for label in app_labels:
        if label not in local_apps:
            raise CommandError('application %s not found' % label)
        selected[label] = local_apps[label]
    return selected
def start_karma(self):
    """
    Spawn the Karma test watcher and block until it exits.

    The process is stored on ``self.karma_process``. Raises CommandError
    if the process has already terminated right after being spawned. A
    non-zero exit is logged as an error unless
    ``self.karma_cleanup_closing`` is set.
    """
    logger.info(
        'Starting karma test watcher process from Django runserver command'
    )

    process = subprocess.Popen(
        'yarn run test-karma:watch',
        shell=True,
        stdin=subprocess.PIPE,
        stdout=sys.stdout,
        stderr=sys.stderr)
    self.karma_process = process

    if process.poll() is not None:
        raise CommandError(
            'Karma process failed to start from Django runserver command')

    logger.info(
        'Django Runserver command has spawned a Karma test watcher process on pid {0}'.
        format(process.pid))

    process.wait()

    exited_badly = process.returncode != 0
    if exited_badly and not self.karma_cleanup_closing:
        logger.error("Karma process exited unexpectedly.")
def handle(self, root_url='/', stdout=None, stderr=None, **options):
    """
    Run the command's operation and return its result.

    The operation is built from ``self.operation_class`` with CommandError
    as its error class. The command's own ``self.stdout`` and
    ``self.stderr`` are passed on; the ``stdout`` and ``stderr`` arguments
    are accepted but not used.
    """
    operation_cls = self.operation_class
    operation = operation_cls(
        root_url=root_url,
        error_class=CommandError,
        stdout=self.stdout,
        stderr=self.stderr,
        **options
    )
    outcome = operation.run()
    return outcome
def handle(self, *args, **kwargs):
    """
    Report group, member and permission differences between programs.

    Raises CommandError when fewer than two programs are found.
    """
    programs = list(self.find_programs())
    if len(programs) < 2:
        raise CommandError("Only works with multiple programs")

    def group_name(item):
        return item['attributes']['name']

    def member_username(item):
        return item['relationships']['user']['data']['attributes']['username']

    self.report_on_differences(programs, 'groups', group_name)
    self.stdout.write('')
    self.report_on_differences(programs, 'members', member_username)
    self.stdout.write('')
    self.report_on_perm_differences(programs)
def handle(self, *args, **options):
    """
    Print the API token of the user named ``options['username']``,
    creating the token if it does not exist yet.

    Raises CommandError when no such user exists.
    """
    try:
        user = User.objects.get(username=options['username'])
    except User.DoesNotExist:
        # The message needs a placeholder; without it the username passed
        # to format() never appears in the error.
        raise CommandError(
            'User named {} does not exist'.format(options['username']))

    token, _created = Token.objects.get_or_create(user=user)
    self.stdout.write('Token: {}'.format(token.key))
def handle(self, *args, **options):
    """
    Create a superuser named ``options['user']`` with ``options['email']``.

    The password is read from the ``ADMINUSER_PASS`` environment variable.
    When that is unset or empty a random password is generated and
    printed. Raises CommandError when the user already exists.
    """
    try:
        existing = User.objects.get(username=options['user'])
    except User.DoesNotExist:
        existing = False

    if existing:
        raise CommandError('User named {} already exists'.format(options['user']))

    # Choose the password exactly once. A second environment lookup here
    # would discard the generated password, and would leave an empty
    # password in place when the variable is set but empty.
    password = os.environ.get('ADMINUSER_PASS')
    if password:
        password_source = 'env'
    else:
        password_source = 'random'
        password = User.objects.make_random_password()

    admin = User(
        username=options['user'],
        email=options['email'],
        is_staff=True,
        is_superuser=True,
    )
    admin.set_password(password)
    admin.save()

    self.stdout.write(self.style.SUCCESS('Created superuser named {user}'.format(**options)))
    # Only a generated password is echoed; one from the environment is
    # already known to the caller.
    if password_source == 'random':
        self.stdout.write('Password: ' + password)
def handle(self, *args, **options):
    """
    Create sample objects through the factories.

    ``options['number']`` sets the batch size and falls back to
    ``DEFAULT_NUMBER_OF_ITEMS_TO_CREATE``. A batch of books is created only
    when ``options['with_books']`` is truthy; a single book and a batch of
    addresses are always created. Any failure is re-raised as CommandError.
    """
    number = options.get('number') or DEFAULT_NUMBER_OF_ITEMS_TO_CREATE
    with_books = bool(options.get('with_books'))

    def run_step(create, message):
        # Both the creation and the report sit inside the try so any
        # failure surfaces as a CommandError.
        try:
            create()
            print(message)
        except Exception as err:
            raise CommandError(str(err))

    if with_books:
        run_step(lambda: factories.BookFactory.create_batch(number),
                 "{} book objects created.".format(number))

    run_step(lambda: factories.SingleBookFactory(),
             "A single book object is created.")

    run_step(lambda: factories.AddressFactory.create_batch(number),
             "{} address objects created.".format(number))
def handle(self, file_name=None, **options):
    """
    Write the rendered ``cml/cml-pipelines.txt`` template to a new file.

    The destination is ``file_name`` or, when that is missing or empty,
    ``DEFAULT_FILE_NAME``. The template receives the current directory
    name as ``project`` and the destination's base name up to the first
    dot as ``file``. Raises CommandError when the destination exists.
    """
    project_name = os.path.basename(os.getcwd())
    dst = file_name or DEFAULT_FILE_NAME
    if os.path.exists(dst):
        raise CommandError('Error: file "%s" already exists' % dst)

    # Render before opening so a template error leaves no empty file behind.
    content = render_to_string('cml/cml-pipelines.txt', {
        'project': project_name,
        'file': os.path.basename(dst).split('.')[0]
    })
    # The context manager closes (and flushes) the handle deterministically.
    with open(dst, 'w') as fh:
        fh.write(content)

    self.stdout.write('"%s" written.' % dst)
def __call__(self, parser, namespace, values, option_string=None):
    """
    Parse ``values`` as a date and store it on ``namespace`` under
    ``self.dest``.

    A result without tzinfo is passed through ``timezone.make_aware``
    first. Raises CommandError when the value cannot be parsed.
    """
    try:
        parsed = dateutil.parser.parse(values)
    except ValueError:
        raise CommandError("can't parse date: {}".format(values))
    aware = parsed if parsed.tzinfo is not None else timezone.make_aware(parsed)
    setattr(namespace, self.dest, aware)
def handle(self, *args, **options):
    """
    Register a tileset for ``options['filename']``.

    With ``options['no_upload']`` the file must already exist under
    ``settings.MEDIA_ROOT`` and is referenced by name; otherwise it is
    opened and handed to the Tileset as a ``File``. ``uid`` defaults to a
    new slugid and ``name`` to the file's base name.

    Raises CommandError when ``no_upload`` is set and the file is missing.
    """
    filename = options['filename']
    datatype = options['datatype']
    filetype = options['filetype']
    coordSystem = options['coordSystem']
    coordSystem2 = options['coordSystem2']
    uid = options.get('uid') or slugid.nice().decode('utf-8')
    name = options.get('name') or op.split(filename)[1]

    def create_tileset(datafile):
        tm.Tileset.objects.create(
            datafile=datafile,
            filetype=filetype,
            datatype=datatype,
            coordSystem=coordSystem,
            coordSystem2=coordSystem2,
            owner=None,
            uuid=uid,
            name=name)

    if options['no_upload']:
        if not op.isfile(op.join(settings.MEDIA_ROOT, filename)):
            raise CommandError('File does not exist under media root')
        create_tileset(filename)
    else:
        # Close the source file once the tileset exists instead of leaking
        # the handle for the rest of the process.
        with open(filename, 'rb') as fh:
            django_file = File(fh)
            # remove the filepath of the filename
            django_file.name = op.split(django_file.name)[1]
            create_tileset(django_file)
def validate_name(self, name, app_or_project):
    """
    Check that ``name`` is usable as an app or project name.

    The name must start with a letter or underscore and contain only word
    characters. Raises CommandError when it is missing or invalid.
    """
    if name is None:
        article = "an" if app_or_project == "app" else "a"
        raise CommandError("you must provide %s %s name" % (
            article, app_or_project))

    # A valid directory name: nothing more to check.
    if re.search(r'^[_a-zA-Z]\w*$', name):
        return

    # Tailor the hint to what is actually wrong with the name.
    if re.search(r'^[_a-zA-Z]', name):
        message = 'use only numbers, letters and underscores'
    else:
        message = 'make sure the name begins with a letter or underscore'
    raise CommandError("%r is not a valid %s name. Please %s." %
                       (name, app_or_project, message))
def check_programs(*programs):
    """
    Ensure each of the given programs can be found.

    Raises CommandError for the first program that ``find_command`` cannot
    locate.
    """
    for program in programs:
        if find_command(program) is not None:
            continue
        raise CommandError("Can't find %s. Make sure you have GNU "
                           "gettext tools 0.15 or newer installed." % program)
def gettext_version(self):
    """
    Return the xgettext version as a tuple of ints.

    The tuple holds two or three numbers depending on what
    ``xgettext --version`` prints. Raises CommandError when no version
    number can be found in the output.
    """
    # Gettext tools will output system-encoded bytestrings instead of UTF-8,
    # when looking up the version. It's especially a problem on Windows.
    out, err, status = gettext_popen_wrapper(
        ['xgettext', '--version'],
        stdout_encoding=DEFAULT_LOCALE_ENCODING,
    )
    match = re.search(r'(\d+)\.(\d+)\.?(\d+)?', out)
    if not match:
        raise CommandError("Unable to get gettext version. Is it installed?")
    return tuple(int(part) for part in match.groups() if part is not None)
def write_po_file(self, potfile, locale):
    """
    Create or update the PO file of ``self.domain`` for ``locale`` from
    the contents of ``potfile``.

    An existing PO file is merged with msgmerge. Otherwise the pot file is
    used as-is, with plural forms copied in unless running for Django
    itself. With ``self.no_obsolete`` the result is passed through
    msgattrib. Raises CommandError when a gettext utility reports errors
    with a non-OK status.
    """
    def surface_errors(errors, status, failure_template):
        # Fatal errors abort; anything else is only echoed when verbose.
        if not errors:
            return
        if status != STATUS_OK:
            raise CommandError(failure_template % errors)
        if self.verbosity > 0:
            self.stdout.write(errors)

    basedir = os.path.join(os.path.dirname(potfile), locale, 'LC_MESSAGES')
    if not os.path.isdir(basedir):
        os.makedirs(basedir)
    pofile = os.path.join(basedir, '%s.po' % str(self.domain))

    if os.path.exists(pofile):
        command = ['msgmerge'] + self.msgmerge_options + [pofile, potfile]
        msgs, errors, status = gettext_popen_wrapper(command)
        surface_errors(errors, status,
                       "errors happened while running msgmerge\n%s")
    else:
        with io.open(potfile, 'r', encoding='utf-8') as fp:
            msgs = fp.read()
        if not self.invoked_for_django:
            msgs = self.copy_plural_forms(msgs, locale)

    header_line = "#. #-#-#-#-# %s.pot (PACKAGE VERSION) #-#-#-#-#\n" % self.domain
    msgs = msgs.replace(header_line, "")
    with io.open(pofile, 'w', encoding='utf-8') as fp:
        fp.write(msgs)

    if self.no_obsolete:
        command = ['msgattrib'] + self.msgattrib_options + ['-o', pofile, pofile]
        msgs, errors, status = gettext_popen_wrapper(command)
        surface_errors(errors, status,
                       "errors happened while running msgattrib\n%s")
def show_list(self, connection, app_names=None):
    """
    Print every migration, or only those of the named apps, marking each
    as applied or unapplied.

    Raises CommandError when a named app has no migrations.
    """
    # Load migrations from disk/DB
    loader = MigrationLoader(connection, ignore_no_migrations=True)
    graph = loader.graph

    if app_names:
        # Reject any requested app that has no migrations.
        invalid_apps = [
            label for label in app_names
            if label not in loader.migrated_apps
        ]
        if invalid_apps:
            raise CommandError("No migrations present for: %s" % (", ".join(invalid_apps)))
    else:
        # No selection: show all apps in alphabetic order.
        app_names = sorted(loader.migrated_apps)

    # Migrations of each app go from oldest (roots) to newest (leaves).
    for app_name in app_names:
        self.stdout.write(app_name, self.style.MIGRATE_LABEL)
        shown = set()
        for node in graph.leaf_nodes(app_name):
            for plan_node in graph.forwards_plan(node):
                if plan_node in shown or plan_node[0] != app_name:
                    continue
                title = plan_node[1]
                # Squashed migrations get the number they replace appended.
                replaced = graph.nodes[plan_node].replaces
                if replaced:
                    title += " (%s squashed migrations)" % len(replaced)
                applied = plan_node in loader.applied_migrations
                template = " [X] %s" if applied else " [ ] %s"
                self.stdout.write(template % title)
                shown.add(plan_node)
        # Nothing printed for this app: say so explicitly.
        if not shown:
            self.stdout.write(" (no migrations)", self.style.MIGRATE_FAILURE)