The following 50 code examples, extracted from open-source Python projects, illustrate how to use django.core.management.CommandError().
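For context before the project-specific examples: CommandError is the exception a custom management command raises to signal failure; Django catches it, prints the message to stderr, and exits with a non-zero status. Below is a minimal, hypothetical sketch of that basic pattern (the command purpose and the --name option are illustrative, not taken from any of the projects listed here):

# A minimal sketch of raising CommandError from a custom management command.
# The option name (--name) and messages are hypothetical, for illustration only.
from django.core.management.base import BaseCommand, CommandError


class Command(BaseCommand):
    help = 'Greets the given user, failing loudly when no name is supplied.'

    def add_arguments(self, parser):
        parser.add_argument('--name', default='', help='Name to greet.')

    def handle(self, *args, **options):
        name = options['name']
        if not name:
            # Django prints "CommandError: ..." to stderr and exits with status 1.
            raise CommandError('--name is required')
        self.stdout.write(self.style.SUCCESS('Hello, {}!'.format(name)))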
def test_users_missing(settings):
    out = io.StringIO()
    user1 = UserFactory.create()
    user2 = UserFactory.create()
    with pytest.raises(CommandError) as exc:
        call_command('sso_ping_discourse', 'bar', user1.username, 'baz', stdout=out)
    assert user1.username not in out.getvalue()
    assert user2.username not in str(exc.value)
    assert user2.username not in out.getvalue()
    assert str(exc.value) in (
        'User mismatch: couldn\'t find "bar", "baz"',
        'User mismatch: couldn\'t find "baz", "bar"',
    )
def handle(self, *args, **options):
    domains = Domain.objects.all()
    if options['domain-name']:
        domains = domains.filter(name__in=options['domain-name'])
        domain_names = domains.values_list('name', flat=True)
        for domain_name in options['domain-name']:
            if domain_name not in domain_names:
                raise CommandError('{} is not a known domain'.format(domain_name))
    for domain in domains:
        self.stdout.write('%s ...' % domain.name, ending='')
        try:
            domain.sync_from_pdns()
            self.stdout.write(' synced')
        except Exception as e:
            if str(e).startswith('Could not find domain ') \
                    and domain.owner.captcha_required:
                self.stdout.write(' skipped')
            else:
                self.stdout.write(' failed')
                msg = 'Error while processing {}: {}'.format(domain.name, e)
                raise CommandError(msg)
def handle(self, *args, **options):
    model_name = options.get('model')
    from_field = options.get('from_field')
    to_field = options.get('to_field')

    if not model_name:
        raise CommandError('--model_name is a required argument')
    if not from_field:
        raise CommandError('--from_field is a required argument')
    if not to_field:
        raise CommandError('--to_field is a required argument')

    model = apps.get_model(model_name)
    content_type = ContentType.objects.get_for_model(model)
    field_histories = FieldHistory.objects.filter(content_type=content_type, field_name=from_field)

    self.stdout.write('Updating {} FieldHistory object(s)\n'.format(field_histories.count()))
    field_histories.update(field_name=to_field)
def handle_subcommnad(self, root, cmd, database, options, subcmd, argvs):
    cmd_type = None
    if subcmd == 'create-all':
        cmd_type = CreateAllSubCommand(root, cmd, database, options, self.stdout, self.stderr, True)
    elif subcmd == 'drop-all':
        cmd_type = DropAllSubCommand(root, cmd, database, options, self.stdout, self.stderr, True)
    if cmd_type is not None:
        cmd_type.run_from_argv(argvs)
    else:
        if options.traceback:
            raise CommandError('Invalid Command.')
        self.stderr.write('CommandError: Invalid Command.')
        sys.exit(1)
def handle(self, *args, **options):
    try:
        from_site = int(options.get('from_site', None))
    except Exception:
        from_site = settings.SITE_ID
    try:
        to_site = int(options.get('to_site', None))
    except Exception:
        to_site = settings.SITE_ID
    try:
        assert from_site != to_site
    except AssertionError:
        raise CommandError('Sites must be different')

    from_site = self.get_site(from_site)
    to_site = self.get_site(to_site)
    pages = Page.objects.drafts().filter(site=from_site, depth=1)
    with transaction.atomic():
        for page in pages:
            page.copy_page(None, to_site)
    self.stdout.write('Copied CMS Tree from SITE_ID {0} successfully to SITE_ID {1}.\n'.format(from_site.pk, to_site.pk))
def handle(self, *args, **kwargs):
    migrator = MigrationSession(self.stderr, kwargs['database'])

    failure = False
    try:
        migrator.apply_all()
    except CommandError as e:
        self.stderr.write("Migration error: {}".format(e))
        failure = True

    state = dump_migration_session_state(migrator.state)
    self.stdout.write(state)
    if kwargs['output_file']:
        with open(kwargs['output_file'], 'w') as outfile:
            outfile.write(state)

    sys.exit(int(failure))
def handle(self, *args, **options):
    url = options['url']
    name = options['name']
    if url or name:
        if not name or not url:
            raise CommandError(MISSING_ARG_MSG)
        index_catalog(DataJson(url), name)
        return

    catalogs = yaml.load(requests.get(CATALOGS_INDEX).text)
    for catalog_id, values in catalogs.items():
        if values['federado'] and values['formato'] == 'json':
            try:
                catalog = DataJson(values['url'])
            except (IOError, ValueError) as e:
                logging.warn(READ_ERROR, catalog_id, e)
                continue
            index_catalog(catalog, catalog_id)
def handle(self, **options):
    app_name, target = options.pop('name'), options.pop('directory')

    # Set the top directory to root of Biohub instead of current
    # path.
    if target is None:
        target = os.path.join(settings.BIOHUB_DIR, app_name)
        try:
            os.makedirs(target)
        except OSError as e:
            if e.errno == errno.EEXIST:
                message = "'%s' already exists" % target
            else:
                message = e
            raise CommandError(message)

    # Use custom app template
    if options['template'] is None:
        options['template'] = os.path.join(
            settings.BIOHUB_CORE_DIR, 'app_template')

    super(StartappCommand, self).handle('app', app_name, target, **options)
def test_no_matching_fingerprint_raises_error(self):
    out = StringIO()
    err = StringIO()
    missing_fingerprint = '01234567890ABCDEF01234567890ABCDEF01234567'
    rgx = re.compile(r'''^Key matching fingerprint '{fp}' not '''
                     r'''found.$'''.format(fp=missing_fingerprint))
    self.assertEquals(Key.objects.count(), 0)
    with self.assertRaisesRegex(CommandError, rgx):
        call_command('email_signing_key', missing_fingerprint, stdout=out, stderr=err)
    self.assertEquals(out.getvalue(), '')
    self.assertEquals(err.getvalue(), '')
def test_startdash_usage_empty(self):
    self.assertRaises(CommandError, call_command, 'startdash')
def test_startpanel_usage_empty(self):
    self.assertRaises(CommandError, call_command, 'startpanel')
def get_or_create_processed_version(self):
    """
    Get or create the current processed version.

    Return a tuple (ProcessedDataVersion object, created), where created is
    a boolean specifying whether a version was created.
    """
    # get the latest raw data version
    try:
        latest_raw_version = RawDataVersion.objects.latest(
            'release_datetime',
        )
    except RawDataVersion.DoesNotExist:
        raise CommandError(
            'No raw CAL-ACCESS data loaded (run `python manage.py '
            'updatecalaccessrawdata`).'
        )

    # check if latest raw version update completed
    if latest_raw_version.update_stalled:
        msg_tmp = 'Update to raw version released at %s did not complete'
        raise CommandError(
            msg_tmp % latest_raw_version.release_datetime.ctime()
        )

    return ProcessedDataVersion.objects.get_or_create(
        raw_version=latest_raw_version,
    )
def handle(self, **options):
    if options['plain']:
        warnings.warn(
            "The --plain option is deprecated in favor of the -i python or --interface python option.",
            RemovedInDjango20Warning
        )
        options['interface'] = 'python'
    # Execute the command and exit.
    if options['command']:
        exec(options['command'])
        return
    # Execute stdin if it has anything to read and exit.
    # Not supported on Windows due to select.select() limitations.
    if sys.platform != 'win32' and select.select([sys.stdin], [], [], 0)[0]:
        exec(sys.stdin.read())
        return

    available_shells = [options['interface']] if options['interface'] else self.shells

    for shell in available_shells:
        try:
            return getattr(self, shell)(options)
        except ImportError:
            pass
    raise CommandError("Couldn't import {} interface.".format(shell))
def test_call_without_apps_arguments_raise_command_error(self):
    """
    Command call without applications lists should return usage info
    """
    with self.assertRaises(CommandError):
        call_command(self.COMMAND_NAME)
def test_parent_command_throws_exception_if_no_subcommand(self):
    with self.assertRaises(CommandError):
        call_command(
            'parent_command',
            stdout=StringIO(),
            interactive=False)
def get_past_ungraded_course_run(user=None, course=None, now=None):
    """Loop through past course runs and find one without grade data"""
    past_runs = CourseRun.objects.filter(
        course=course,
        end_date__lt=now,
    ).exclude(end_date=None).order_by('-end_date').all()
    for past_run in past_runs:
        if not (CachedCurrentGradeHandler(user).exists(past_run) or
                FinalGrade.objects.filter(user=user, course_run=past_run).exists()):
            return past_run
    raise CommandError("Can't find past run that isn't already passed/failed for Course '{}'".format(course.title))
def handle(self, *args, **kwargs):  # pylint: disable=unused-argument
    if not settings.FEATURES.get('OPEN_DISCUSSIONS_USER_SYNC', False):
        raise CommandError('OPEN_DISCUSSIONS_USER_SYNC is set to False (so disabled).')

    sync_discussion_users.delay()
    self.stdout.write(self.style.SUCCESS('Async job to backfill users submitted'))
def handle(self, *args, **kwargs):  # pylint: disable=unused-argument
    edx_course_key = kwargs.get('edx_course_key')
    try:
        run = CourseRun.objects.get(edx_course_key=edx_course_key)
    except CourseRun.DoesNotExist:
        raise CommandError('Course Run for course_id "{}" does not exist'.format(edx_course_key))

    try:
        can_freeze = run.can_freeze_grades
    except ImproperlyConfigured:
        raise CommandError('Course Run for course_id "{}" is missing the freeze date'.format(edx_course_key))

    if not can_freeze:
        raise CommandError('Course Run for course_id "{}" cannot be frozen yet'.format(edx_course_key))

    if CourseRunGradingStatus.is_complete(run):
        self.stdout.write(
            self.style.SUCCESS(
                'Final grades for course "{0}" are already complete'.format(edx_course_key)
            )
        )
        return

    freeze_course_run_final_grades.delay(run.id)
    self.stdout.write(
        self.style.SUCCESS(
            'Successfully submitted async task to freeze final grades for course "{0}"'.format(edx_course_key)
        )
    )
def check_apps(apps):
    """Check if a list of apps is entirely contained in the list of installed apps."""
    for app in apps:
        installed_apps = settings.INSTALLED_APPS
        if app not in installed_apps:
            raise CommandError('App %s not contained in INSTALLED_APPS %s' % (app, settings.INSTALLED_APPS))
def test_renamefieldhistory_model_arg_is_required(self):
    Person.objects.create(name='Initial Name')
    self.assertEqual(FieldHistory.objects.filter(field_name='name').count(), 1)

    with self.assertRaises(CommandError):
        call_command('renamefieldhistory', from_field='name', to_field='name2')
def test_renamefieldhistory_from_field_arg_is_required(self):
    Person.objects.create(name='Initial Name')
    self.assertEqual(FieldHistory.objects.filter(field_name='name').count(), 1)

    with self.assertRaises(CommandError):
        call_command('renamefieldhistory', model='tests.Person', to_field='name2')
def test_renamefieldhistory_to_field_arg_is_required(self):
    Person.objects.create(name='Initial Name')
    self.assertEqual(FieldHistory.objects.filter(field_name='name').count(), 1)

    with self.assertRaises(CommandError):
        call_command('renamefieldhistory', model='tests.Person', from_field='name')
def handle(self, *args, **options):
    date = datetime.now().date()
    backup_name = '{}.zip'.format(date.strftime('%Y_%m_%d'))
    if args:
        backup_dir = os.path.abspath(args[0])
    else:
        backup_dir = settings.BACKUP_ROOT
    if not os.path.exists(backup_dir):
        raise CommandError('output directory does not exists')
    backup_path = os.path.join(backup_dir, backup_name)

    with zipfile.ZipFile(backup_path, 'w') as ziph:
        for root, dirs, files in os.walk(settings.MEDIA_ROOT):
            for file in files:
                abspath = os.path.abspath(os.path.join(root, file))
                relpath = os.path.relpath(abspath, settings.MEDIA_ROOT)
                ziph.write(abspath, os.path.join('media', relpath))

        # db dump
        dump_path = os.path.join(settings.BASE_DIR, 'dump.json')
        call_command('dump', output=dump_path)
        ziph.write(dump_path, 'dump.json')
        os.unlink('dump.json')

    self.stdout.write('backup saved to "%s"' % backup_path)
def handle(self, *args, **options):
    if not all(getattr(settings, name) for name in
               ['FXA_ACCESS_KEY_ID', 'FXA_SECRET_ACCESS_KEY', 'FXA_S3_BUCKET']):
        raise CommandError('FXA S3 Bucket access not configured')

    main()
    if options['cron']:
        log('cron schedule starting')
        schedule.start()
def handle(self, *args, **options):
    if settings.MAINTENANCE_MODE:
        raise CommandError('Command unavailable in maintenance mode')
    count = 0
    for task in QueuedTask.objects.all()[:options['num_tasks']]:
        task.retry()
        count += 1
    print('{} processed. {} remaining.'.format(count, QueuedTask.objects.count()))
def run_from_argv(self, argv):
    """
    Set up any environment changes requested (e.g., Python path and Django
    settings), then run this command. If the command raises a
    ``CommandError``, intercept it and print it sensibly to stderr. If the
    ``--traceback`` option is present or the raised ``Exception`` is not
    ``CommandError``, raise it.
    """
    self._called_from_command_line = True
    parser = self.create_parser()

    options = parser.parse_args(argv)
    cmd_options = vars(options)
    # Move positional args out of options to mimic legacy optparse
    args = cmd_options.pop('args', ())
    try:
        self.execute(*args, **cmd_options)
    except Exception as e:
        if self._django_options.traceback or not isinstance(e, CommandError):
            raise

        # SystemCheckError takes care of its own formatting.
        if isinstance(e, SystemCheckError):
            self.stderr.write(str(e), lambda x: x)
        else:
            self.stderr.write('%s: %s' % (e.__class__.__name__, e))
        sys.exit(1)
    finally:
        try:
            connections.close_all()
        except ImproperlyConfigured:
            # Ignore if connections aren't setup at this point (e.g. no
            # configured settings).
            pass
def _localimport(drive_id, channel_id, node_ids=None, update_progress=None, check_for_cancel=None):
    drives = get_mounted_drives_with_channel_info()
    drive = drives[drive_id]

    # copy channel's db file then copy all the content files from storage dir
    available_channel_ids = [c["id"] for c in drive.metadata["channels"]]
    assert channel_id in available_channel_ids, "The given channel was not found in the drive."
    try:
        call_command(
            "importchannel",
            "local",
            channel_id,
            drive.datafolder,
            update_progress=update_progress,
            check_for_cancel=check_for_cancel
        )
        call_command(
            "importcontent",
            "local",
            channel_id,
            drive.datafolder,
            node_ids=node_ids,
            update_progress=update_progress,
            check_for_cancel=check_for_cancel
        )
    except UserCancelledError:
        try:
            call_command("deletechannel", channel_id, update_progress=update_progress)
        except CommandError:
            pass
        raise
def handle(self, *args, **options):
    if not TRANSTOOL_DL_URL or not TRANSTOOL_DL_KEY:
        raise CommandError('Please, set TRANSTOOL_DL_URL and TRANSTOOL_DL_KEY settings.')
    if options['mo_only'] and options['po_only']:
        raise CommandError('Use only --mo-only or --po-only but not both.')

    self.stdout.write('Download file: Send POST request to {}'.format(TRANSTOOL_DL_URL))
    r = requests.post(TRANSTOOL_DL_URL, {
        'key': TRANSTOOL_DL_KEY,
        'po-only': str(int(options['po_only'])),
        'mo-only': str(int(options['mo_only'])),
    }, stream=True)
    if r.status_code != 200:
        self.stdout.write('Request status code is not 200: {}'.format(r.status_code))
        self.stdout.write('Fail.', ending='\n\n')
        sys.exit(1)

    file_content = BytesIO()
    for chunk in r.iter_content(chunk_size=(16 * 1024)):
        file_content.write(chunk)
    file_content.seek(0, os.SEEK_END)
    file_content_size = file_content.tell()
    self.stdout.write('Downloaded file {} {} bytes'.format(r.headers['Content-Type'], file_content_size))

    if options['po_only']:
        exts = ['.po']
    elif options['mo_only']:
        exts = ['.mo']
    else:
        exts = ['.po', '.mo']

    diff_info = self._get_diff_info(file_content, exts)
    if options['diff']:
        self.print_diff_info(diff_info)
    else:
        self.copy_files(diff_info, file_content)

    self.stdout.write('Done.', ending='\n\n')
def handle(self, **options):
    directory = options.get('directory', None)  # directory key is already present in options which is None.
    name = options['name']
    full_path = None
    if not directory:
        """
        If directory is not provided then use DEFAULT_APPS_DIRECTORY.
        This block creates DEFAULT_APPS_DIRECTORY/app_name directory
        and pass that path to default startapp command.
        """
        directory = os.path.join(settings.BASE_DIR, settings.DEFAULT_APPS_DIRECTORY)
        full_path = os.path.join(directory, name)
        if not os.path.exists(full_path):
            os.makedirs(full_path)
        options['directory'] = full_path
    try:
        super().handle(**options)
        if full_path:
            """
            If apps directory is used then change the app name to
            DEFAULT_APPS_DIRECTORY.app_name in AppConfig
            """
            apps_py = os.path.join(full_path, 'apps.py')
            if os.path.isfile(apps_py):
                for line in fileinput.input(apps_py, inplace=True):
                    line = line.replace("'%s'" % name, "'%s.%s'" % (settings.DEFAULT_APPS_DIRECTORY, name))
                    print(line, end='')
    except CommandError:
        if full_path:
            shutil.rmtree(full_path)
        raise
def handle(self, *args, **options):
    try:
        trig = data_root().trigger_ids[options['trigger_id']]
    except KeyError:
        raise CommandError('Trigger %s not found' % options['trigger_id'])
    trig.run(access_context='local-cli')
def handle(self, *args, **options):
    try:
        client = data_root().clients[options['client']]
    except KeyError:
        raise CommandError('Client %s not found' % options['client'])
    for repository in data_root().repositories:
        if repository.name == options['repository']:
            break
        if repository.id.startswith(options['repository']):
            break
        if repository.url == options['repository']:
            break
    else:
        raise CommandError('Repository %s not found' % options['repository'])

    with open_repository(repository) as borg_repository:
        manifest, key = Manifest.load(borg_repository)
        with Cache(borg_repository, key, manifest, lock_wait=1) as cache:
            names = self.find_archives(manifest, options['archive'], regex=options['regex'])
            imported = 0
            pi = ProgressIndicatorPercent(msg='Importing archives %4.1f %%: %s', total=len(names), step=0.1)
            for name in names:
                imported += self.import_archive(manifest, cache, repository, name, client)
                pi.show(info=[name])
            pi.finish()
    print('Imported %d archives.' % imported, file=sys.stderr)
def find_archives(self, manifest, archive, regex):
    if regex:
        names = []
        for name in manifest.archives:
            if re.fullmatch(archive, name):
                names.append(name)
        return names
    else:
        try:
            manifest.archives[archive]
            return [archive]
        except KeyError:
            raise CommandError('Archive %s not found' % archive)
def handle(self, *args, **options):
    if not options['queue_names']:
        raise CommandError('Queue names (--queues) not specified')

    queue_names = [queue_name.rstrip() for queue_name in options['queue_names'].split(',')]

    logger.debug('[django-eb-sqs] Connecting to SQS: {}'.format(', '.join(queue_names)))

    sqs = boto3.resource(
        'sqs',
        region_name=settings.AWS_REGION,
        config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES})
    )
    queues = [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names]

    logger.debug('[django-eb-sqs] Connected to SQS: {}'.format(', '.join(queue_names)))

    worker = WorkerFactory.default().create()

    while True:
        for queue in queues:
            messages = queue.receive_messages(
                MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES,
                WaitTimeSeconds=settings.WAIT_TIME_S,
            )
            for msg in messages:
                self._process_message(msg, worker)
def handle(self, *args, **options):
    if not options['url']:
        raise CommandError('Worker endpoint url parameter (--url) not found')
    if not options['queue_name']:
        raise CommandError('Queue name (--queue) not specified')

    url = options['url']
    queue_name = options['queue_name']
    retry_limit = max(int(options['retry_limit']), 1)

    try:
        self.stdout.write('Connect to SQS')
        sqs = boto3.resource(
            'sqs',
            region_name=settings.AWS_REGION,
            config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES})
        )
        queue = sqs.get_queue_by_name(QueueName=queue_name)
        self.stdout.write('> Connected')

        while True:
            messages = queue.receive_messages(
                MaxNumberOfMessages=1,
                WaitTimeSeconds=20
            )
            if len(messages) == 0:
                break
            for msg in messages:
                self.stdout.write('Deliver message {}'.format(msg.message_id))
                if self._process_message_with_retry(url, retry_limit, msg):
                    self.stdout.write('> Delivered')
                else:
                    self.stdout.write('> Delivery failed (retry-limit reached)')
                msg.delete()

        self.stdout.write('Message processing finished')
    except ConnectionError:
        self.stdout.write('Connection to {} failed. Message processing failed'.format(url))
def handle(self, *args, **options):
    if options['file'] is None:
        raise CommandError('Missing --file')
    names = []
    with open(options['file']) as fp:
        lines = [l for l in fp.read().split('\n') if l]
        for l in lines:
            if options['logins']:
                names.append(l.strip())
            else:
                firstname, lastname = [f.strip() for f in l.split('\t')]
                names.append((firstname, lastname))
    create_users(names, options)
def handle(self, *args, **options):
    try:
        if len(options.get('ussd_app_name')[0].split()) > 1:
            raise CommandError
        app_name = options.get('ussd_app_name')[0]
        call_command('startapp', app_name, template=path)
    except CommandError:
        print('Provide a valid django App Name as documented here: '
              ' https://docs.djangoproject.com/en/1.10/ref/django-admin/')
def test_copy_bad_languages(self):
    out = StringIO()
    with self.assertRaises(CommandError) as command_error:
        management.call_command(
            'cms', 'copy', 'lang', '--from-lang=it', '--to-lang=fr', interactive=False, stdout=out
        )

    self.assertEqual(
        str(command_error.exception),
        'Both languages have to be present in settings.LANGUAGES and settings.CMS_LANGUAGES'
    )
def get_site(self, site_id):
    if site_id:
        try:
            return Site.objects.get(pk=site_id)
        except (ValueError, Site.DoesNotExist):
            raise CommandError('There is no site with given site id.')
    else:
        return None
def _validate_template(self, target):
    """
    To ensure that the plugin template directory does exist.
    """
    if not path.exists(target):
        raise CommandError(
            "Plugin template directory missing, reinstall biohub "
            "or just manually create the plugin.")
def _validate_plugin_name(self, plugin_name):
    """
    To validate the given `plugin_name`.

    A `plugin_name` is considered valid if:

    + it's not empty.
    + it does not exist currently.

    Afterwards the attributes `self.plugin_name` and `self.label` will be set.
    """
    if not plugin_name:
        raise CommandError("You must provide a plugin_name.")

    try:
        importlib.import_module(plugin_name)
        raise CommandError(
            "The plugin_name %r is conflicted with another module, "
            "please specify a new one." % plugin_name)
    except ImportError:
        pass

    self.label = plugin_name.rsplit('.', 1)[-1]
    self.plugin_name = plugin_name
def _ensure_path_exists(self, directory):
    try:
        os.makedirs(directory)
    except OSError as e:
        if e.errno == errno.EEXIST:
            if not path.isdir(directory):
                raise CommandError("'%s' already exists." % directory)
        else:
            raise CommandError(e)
def fail_installation(self):
    raise CommandError("Plugin '%s' cannot be properly installed." % self.plugin_name)
def test_tracking_disabled(self):
    """ Test whether the datalogger can be stopped by changing the track setting. """
    datalogger_settings = DataloggerSettings.get_solo()
    datalogger_settings.track = False
    datalogger_settings.save()

    # Datalogger should crash with error.
    with self.assertRaisesMessage(CommandError, 'Datalogger tracking is DISABLED!'):
        self._intercept_command_stdout('dsmr_datalogger')
def handle(self, **options):
    flow_path = options.get('flow_path')
    output = options.get('output')
    graph_type = options.get('graph_type')
    try:
        file_path, flow_name = flow_path[0].rsplit('.', 1)
    except ValueError as e:
        raise CommandError("Please, specify the full path to your flow.") from e
    try:
        flows_file = importlib.import_module(file_path)
        flow_cls = getattr(flows_file, flow_name)
    except ImportError as e:
        raise CommandError("Could not find file %s" % (file_path,)) from e
    except (AttributeError, TypeError) as e:
        raise CommandError("Could not find the flow with the name %s" % (flow_name,)) from e

    grid = chart.calc_layout_data(flow_cls)
    if graph_type == SVG:
        graph = chart.grid_to_svg(grid)
    if graph_type == BPMN:
        graph = chart.grid_to_bpmn(grid)

    if output != '':
        with open(output, 'w') as f:
            f.write(graph)
    else:
        self.stdout.write(graph)
def test_fingerprint_and_generate_flag_raises_error(self):
    out = StringIO()
    err = StringIO()
    rgx = re.compile(r'^You cannot specify fingerprints and --generate '
                     r'when running this command$')
    self.assertEquals(Key.objects.count(), 0)
    with self.assertRaisesRegex(CommandError, rgx):
        call_command('email_signing_key', TEST_KEY_FINGERPRINT, generate=True, stdout=out, stderr=err)
    self.assertEquals(out.getvalue(), '')
    self.assertEquals(err.getvalue(), '')
def test_command_raises_error_when_manifest_doesnt_exist(self, read_manifest_mock):
    with self.assertRaises(CommandError):
        execute_command('deleteredundantstatic', '--noinput')