Python django.core.management 模块,call_command() 实例源码

我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用django.core.management.call_command()

项目:DjangoCMS    作者:farhan711    | 项目源码 | 文件源码
def test_copy_filled_placeholder(self):
        """
        If an existing title in the target language has plugins in a placeholder
        that placeholder is skipped
        """
        site = 1
        number_start_plugins = CMSPlugin.objects.all().count()

        # create an empty title language
        root_page = Page.objects.on_site(site).get_home()
        create_title("de", "root page de", root_page)
        ph = root_page.placeholders.get(slot="body")
        add_plugin(ph, "TextPlugin", "de", body="Hello World")

        out = StringIO()
        management.call_command(
            'cms', 'copy', 'lang', '--from-lang=en', '--to-lang=de', interactive=False, stdout=out
        )

        self.assertEqual(CMSPlugin.objects.filter(language='en').count(), number_start_plugins)
        # one placeholder (with 7 plugins) is skipped, so the difference must be 6
        self.assertEqual(CMSPlugin.objects.filter(language='de').count(), number_start_plugins-6)
项目:tts-bug-bounty-dashboard    作者:18F    | 项目源码 | 文件源码
def call_runscheduler(loops=1, mock_call_command=None):
    ctx = {'sleep_count': 0}

    def fake_sleep(seconds):
        ctx['sleep_count'] += 1
        if ctx['sleep_count'] > loops:
            raise KeyboardInterrupt()

    if mock_call_command is None:
        mock_call_command = mock.MagicMock()

    with mock.patch.object(runscheduler, 'call_command', mock_call_command):
        with mock.patch.object(runscheduler, 'logger') as mock_logger:
            with mock.patch('time.sleep', fake_sleep):
                with pytest.raises(KeyboardInterrupt):
                    call_command('runscheduler')
            return mock_call_command, mock_logger
项目:wagtail-bakery    作者:moorinteractive    | 项目源码 | 文件源码
def test_page_has_redirect_response(site):
    redirect_page = RedirectPage.objects.create(
        depth=1,
        path='0002',
        title='Page',
        slug='page',
    )

    # Make redirect page the root page
    site.root_page = redirect_page
    site.save()

    # Build static files
    management.call_command('build', '--skip-static', '--skip-media')
    assert os.path.exists(os.path.join(settings.BUILD_DIR, 'index.html'))

    # Check if meta tag is present
    content = open(os.path.join(settings.BUILD_DIR, 'index.html')).read()
    assert '<meta http-equiv="refresh" content="1; url=http://www.example.com/">' in content # noqa
项目:django-gitversions    作者:michaelkuty    | 项目源码 | 文件源码
def test_04_versioner(self):

        # remove local files
        #versioner.backend.destroy()

        management.call_command('gitversions',
                                format='json',
                                indent=4)

        # check local uncomited changes
        self.assertEqual(versioner.backend.check(), True)

        versioner.backend.commit('Initial Commit')

        self.assertEqual(versioner.backend.check(), False)

        # cleanup
项目:vaultier    作者:Movile    | 项目源码 | 文件源码
def handle(self, *args, **options):
        print ">>> Initializing your database"
        try:
            management.call_command('syncdb')
            management.call_command('migrate')
            try:
                # do we need cache table?
                cache.get('', None)
            except ProgrammingError:
                # yes we do
                management.call_command('createcachetable', 'vaultier_cache')

            # public static files
            management.call_command('collectstatic', interactive=False)
        except OperationalError as e:
            msg = ">>> Your DB is not configured correctly: {}"
            print msg.format(e.message)
        else:
            if options.get('no_statistics'):
                task_statistics_collector.delay()
            print (">>> DB is initialized, you can now try to run Vaultier "
                   "using 'vaultier runserver'")
项目:oim-cms    作者:parksandwildlife    | 项目源码 | 文件源码
def user_logged_in_handler(sender, request, user, **kwargs):
    logging.debug('user_logged_in_handler')
    request.session.save()
    usersession, created = UserSession.objects.get_or_create(user=user, session_id=request.session.session_key)
    usersession.ip = get_ip(request)
    if DepartmentUser.objects.filter(email__iexact=user.email).exists():
        logging.debug('user_logged_in_handler departmentuser {}'.format(user.email))
        usersession.department_user = DepartmentUser.objects.filter(email__iexact=user.email)[0]
        if (user.username != usersession.department_user.username):
            test = get_user_model().objects.filter(username=usersession.department_user.username)
            if test.exists():
                test.delete()
            user.username = usersession.department_user.username
            user.save()
    usersession.save()
    logging.debug('user_logged_in_handler saving stuff')
    management.call_command("clearsessions", verbosity=0)
项目:tissuelab    作者:VirtualPlants    | 项目源码 | 文件源码
def test_clearsessions_command(self):
        """
        Test clearsessions command for clearing expired sessions.
        """
        self.assertEqual(0, Session.objects.count())

        # One object in the future
        self.session['foo'] = 'bar'
        self.session.set_expiry(3600)
        self.session.save()

        # One object in the past
        other_session = self.backend()
        other_session['foo'] = 'bar'
        other_session.set_expiry(-3600)
        other_session.save()

        # Two sessions are in the database before clearsessions...
        self.assertEqual(2, Session.objects.count())
        management.call_command('clearsessions')
        # ... and one is deleted.
        self.assertEqual(1, Session.objects.count())
项目:openkamer    作者:openkamer    | 项目源码 | 文件源码
def create_json_dump():
        filepath = os.path.join(settings.DBBACKUP_STORAGE_OPTIONS['location'], 'openkamer-' + str(datetime.date.today()) + '.json')
        filepath_compressed = filepath + '.gz'
        with open(filepath, 'w') as fileout:
            management.call_command(
                'dumpdata',
                '--all',
                '--natural-foreign',
                '--exclude', 'auth.permission',
                '--exclude', 'contenttypes',
                'person',
                'parliament',
                'government',
                'document',
                'stats',
                'website',
                stdout=fileout
            )
        with open(filepath, 'rb') as f_in:
            with gzip.open(filepath_compressed, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        os.remove(filepath)
        BackupDaily.remove_old_json_dumps(days_old=30)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_createadmin_prompts_for_password_if_not_given(self):
        stderr = StringIO()
        stdout = StringIO()
        username = factory.make_name('user')
        password = factory.make_string()
        ssh_import = "%s:%s" % (
            random.choice([KEYS_PROTOCOL_TYPE.LP, KEYS_PROTOCOL_TYPE.GH]),
            factory.make_name('user-id'))
        email = factory.make_email_address()
        self.patch(createadmin, 'prompt_for_password').return_value = password
        self.patch(keysource_module.KeySource, 'import_keys')

        call_command(
            'createadmin', username=username, email=email,
            ssh_import=ssh_import, stdout=stdout, stderr=stderr)
        user = User.objects.get(username=username)

        self.assertThat(stderr, IsEmpty)
        self.assertThat(stdout, IsEmpty)
        self.assertTrue(user.check_password(password))
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_createadmin_prompts_for_username_if_not_given(self):
        stderr = StringIO()
        stdout = StringIO()
        username = factory.make_name('user')
        password = factory.make_string()
        email = factory.make_email_address()
        ssh_import = "%s:%s" % (
            random.choice([KEYS_PROTOCOL_TYPE.LP, KEYS_PROTOCOL_TYPE.GH]),
            factory.make_name('user-id'))
        self.patch(createadmin, 'prompt_for_username').return_value = username
        self.patch(keysource_module.KeySource, 'import_keys')

        call_command(
            'createadmin', password=password, email=email,
            ssh_import=ssh_import, stdout=stdout, stderr=stderr)
        user = User.objects.get(username=username)

        self.assertThat(stderr, IsEmpty)
        self.assertThat(stdout, IsEmpty)
        self.assertTrue(user.check_password(password))
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_createadmin_prompts_for_email_if_not_given(self):
        stderr = StringIO()
        stdout = StringIO()
        username = factory.make_name('user')
        password = factory.make_string()
        email = factory.make_email_address()
        ssh_import = "%s:%s" % (
            random.choice([KEYS_PROTOCOL_TYPE.LP, KEYS_PROTOCOL_TYPE.GH]),
            factory.make_name('user-id'))
        self.patch(createadmin, 'prompt_for_email').return_value = email
        self.patch(keysource_module.KeySource, 'import_keys')

        call_command(
            'createadmin', username=username, password=password,
            ssh_import=ssh_import, stdout=stdout, stderr=stderr)
        user = User.objects.get(username=username)

        self.assertThat(stderr, IsEmpty)
        self.assertThat(stdout, IsEmpty)
        self.assertTrue(user.check_password(password))
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_createadmin_prompts_for_ssh_import_if_not_given(self):
        stderr = StringIO()
        stdout = StringIO()
        username = factory.make_name('user')
        password = factory.make_string()
        email = factory.make_email_address()
        ssh_import = "%s:%s" % (
            random.choice([KEYS_PROTOCOL_TYPE.LP, KEYS_PROTOCOL_TYPE.GH]),
            factory.make_name('user-id'))
        self.patch(
            createadmin, 'prompt_for_ssh_import').return_value = ssh_import
        self.patch(keysource_module.KeySource, 'import_keys')

        call_command(
            'createadmin', username=username, password=password, email=email,
            stdout=stdout, stderr=stderr)
        user = User.objects.get(username=username)

        self.assertThat(stderr, IsEmpty)
        self.assertThat(stdout, IsEmpty)
        self.assertTrue(user.check_password(password))
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_apikey_generates_key(self):
        stderr = StringIO()
        stdout = StringIO()
        user = factory.make_User()
        num_keys = len(user.userprofile.get_authorisation_tokens())
        call_command(
            'apikey', username=user.username, generate=True, stderr=stderr,
            stdout=stdout)
        self.assertThat(stderr, IsEmpty)
        keys_after = user.userprofile.get_authorisation_tokens()
        expected_num_keys = num_keys + 1
        self.assertEqual(expected_num_keys, len(keys_after))
        expected_token = user.userprofile.get_authorisation_tokens()[1]
        expected_string = convert_tuple_to_string(
            get_creds_tuple(expected_token)) + '\n'
        self.assertEqual(expected_string, stdout.getvalue())
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_api_key_rejects_deletion_of_nonexistent_key(self):
        stderr = StringIO()
        user = factory.make_User()
        existing_token = get_one(
            user.userprofile.get_authorisation_tokens())
        token_string = convert_tuple_to_string(
            get_creds_tuple(existing_token))
        call_command(
            'apikey', username=user.username, delete=token_string,
            stderr=stderr)
        self.assertThat(stderr, IsEmpty)

        # Delete it again. Check that there's a sensible rejection.
        error_text = assertCommandErrors(
            self, 'apikey', username=user.username, delete=token_string)
        self.assertIn(
            "No matching api key found", error_text)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_api_key_rejects_update_of_nonexistent_key(self):
        stderr = StringIO()
        user = factory.make_User()
        fake_api_key_name = "Test Key Name"
        existing_token = get_one(
            user.userprofile.get_authorisation_tokens())
        token_string = convert_tuple_to_string(
            get_creds_tuple(existing_token))
        call_command(
            'apikey', username=user.username, delete=token_string,
            stderr=stderr)
        self.assertThat(stderr, IsEmpty)

        # Try to update the deleted token.
        error_text = assertCommandErrors(
            self, 'apikey', username=user.username, update=token_string,
            api_key_name=fake_api_key_name)
        self.assertIn(
            "No matching api key found", error_text)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_django_run_renames_piston_tables_if_piston_tables_exists(self):
        self.patch(
            dbupgrade_command, "_south_was_performed").return_value = True
        self.patch(dbupgrade_command, "_find_tables").return_value = [
            "piston_consumer",
            "piston_token",
        ]
        mock_rename = self.patch(
            dbupgrade_command, "_rename_piston_to_piston3")
        mock_call = self.patch(dbupgrade_module, "call_command")
        call_command('dbupgrade', django=True)
        self.assertThat(
            mock_rename, MockCalledOnceWith("default", ["consumer", "token"]))
        self.assertThat(
            mock_call, MockCalledOnceWith(
                "migrate", interactive=False, fake_initial=True))
项目:django-onmydesk    作者:knowledge4life    | 项目源码 | 文件源码
def test_call_must_have_first_and_last_message_correct(self):
        scheduler = Scheduler(report='my_report_class',
                              periodicity=Scheduler.PER_MON_SUN)
        scheduler.save()

        out = StringIO()
        management.call_command('scheduler_process', stdout=out)

        first_line, last_line, blank_line = (
            out.getvalue().split('\n')[0],
            out.getvalue().split('\n')[-2],
            out.getvalue().split('\n')[-1]
        )

        first_message = 'Starting scheduler process'
        last_message = 'Scheduler #{} processed'.format(scheduler.id)

        self.assertIn(first_message, first_line)
        self.assertIn(last_message, last_line)
        self.assertEqual(blank_line, '')
项目:balafon    作者:ljean    | 项目源码 | 文件源码
def test_find_same_as_with_group(self):

        contact1 = mommy.make(models.Contact, firstname="John", lastname="Lennon")
        contact2 = mommy.make(models.Contact, firstname="Paul", lastname="McCartney")
        contact3 = mommy.make(models.Contact, firstname="Paul", lastname="McCartney")

        buf = StringIO()
        sysout = sys.stdout
        sys.stdout = buf
        management.call_command('find_same_as', "SameAs", verbosity=0, interactive=False, stdout=buf)
        buf.seek(0, 0)
        sys.stdout = sysout
        self.assertEqual(2, len(buf.readlines()))
        qs = models.Group.objects.filter(name="SameAs")
        self.assertEqual(1, qs.count())
        self.assertEqual(qs[0].contacts.count(), 2)
        self.assertFalse(contact1 in qs[0].contacts.all())
        self.assertTrue(contact2 in qs[0].contacts.all())
        self.assertTrue(contact3 in qs[0].contacts.all())
项目:balafon    作者:ljean    | 项目源码 | 文件源码
def test_find_same_as_with_existing_group(self):

        contact1 = mommy.make(models.Contact, firstname="John", lastname="Lennon")
        contact2 = mommy.make(models.Contact, firstname="Paul", lastname="McCartney")
        contact3 = mommy.make(models.Contact, firstname="Paul", lastname="McCartney")

        gr = models.Group.objects.create(name="SameAs")
        gr.contacts.add(contact1)
        gr.save()

        buf = StringIO()
        sysout = sys.stdout
        sys.stdout = buf
        management.call_command('find_same_as', "SameAs", verbosity=0, interactive=False, stdout=buf)
        buf.seek(0, 0)
        sys.stdout = sysout
        self.assertEqual(2, len(buf.readlines()))

        qs = models.Group.objects.filter(name="SameAs")
        self.assertEqual(1, qs.count())
        self.assertEqual(qs[0].contacts.count(), 3)
        self.assertTrue(contact1 in qs[0].contacts.all())
        self.assertTrue(contact2 in qs[0].contacts.all())
        self.assertTrue(contact3 in qs[0].contacts.all())
项目:balafon    作者:ljean    | 项目源码 | 文件源码
def test_find_same_as_with_no_name(self):

        contact1 = mommy.make(models.Contact, firstname="John", lastname="Lennon")
        contact2 = mommy.make(models.Contact, firstname="Paul", lastname="McCartney")
        contact3 = mommy.make(models.Contact, firstname="Paul", lastname="McCartney")
        contact4 = mommy.make(models.Contact, firstname="", lastname="")
        contact5 = mommy.make(models.Contact, firstname="", lastname="")

        buf = StringIO()
        sysout = sys.stdout
        sys.stdout = buf
        management.call_command('find_same_as', "SameAs", verbosity=0, interactive=False, stdout=buf)
        buf.seek(0, 0)
        sys.stdout = sysout
        self.assertEqual(2, len(buf.readlines()))

        qs = models.Group.objects.filter(name="SameAs")
        self.assertEqual(1, qs.count())
        self.assertEqual(qs[0].contacts.count(), 2)
        self.assertFalse(contact1 in qs[0].contacts.all())
        self.assertTrue(contact2 in qs[0].contacts.all())
        self.assertTrue(contact3 in qs[0].contacts.all())
        self.assertFalse(contact4 in qs[0].contacts.all())
        self.assertFalse(contact5 in qs[0].contacts.all())
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def forward(apps, schema_editor):
    # Removed as it's causing exceptions and is not needed except for old nodes
    # which should have updated by now
    # call_command("runscript", "0016_populate_content_rendered")
    pass
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def test_startdash_usage_empty(self):
        self.assertRaises(CommandError, call_command, 'startdash')
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def test_startdash_usage_correct(self, handle):
        call_command('startdash', 'test_dash')

        handle.assert_called_with(dash_name='test_dash',
                                  extensions=["py", "tmpl", "html", "js",
                                              "css"],
                                  files=[], no_color=False, pythonpath=None,
                                  settings=None, skip_checks=True, target=None,
                                  template=None, traceback=False, verbosity=1)
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def test_startpanel_usage_empty(self):
        self.assertRaises(CommandError, call_command, 'startpanel')
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def test_startpanel_usage_correct(self, handle):
        call_command('startpanel', 'test_dash', '--dashboard=foo.bar')

        handle.assert_called_with(panel_name='test_dash', dashboard='foo.bar',
                                  extensions=["py", "tmpl", "html"],
                                  files=[], no_color=False, pythonpath=None,
                                  settings=None, skip_checks=True, target=None,
                                  template=None, traceback=False, verbosity=1)
项目:offenewahlen-nrw17    作者:OKFNat    | 项目源码 | 文件源码
def setUp(self):
        """
        Set up the test class.

        For the import to run an election has to exist in the
        database.
        """

        call_command('import_basedata')
        test_path = setup_path = os.path.dirname(os.path.realpath(__file__))
        results_file = test_path + '/data/example_01.json'
        mapping_file = test_path + '/data/example_config.json'
        call_command('import_results', results_file, mapping_file)
项目:offenewahlen-nrw17    作者:OKFNat    | 项目源码 | 文件源码
def test_import_json_result_data_with_mapping(self):
        """
        Tests an json import of results.
        """

        number_of_results = RawData.objects.count()
        self.assertEqual(number_of_results, 1)

        number_of_results = PollingStationResult.objects.count()
        self.assertEqual(number_of_results, 4)

        # call_command('import_results', local_data_file, location='local',
        #   file_type='json', mapping_file=mapping_file)
        # number_of_results = RawData.objects.count()
        # self.assertEqual(number_of_results, 2)

    #def test_import_base_data(self):
    #
    #   number_of_results = RawData.objects.count()
    #   self.assertEqual()

    # def test_import_xml_result_data_with_mapping(self):
    #   """
    #   Tests an xml import.
    #   """
    #   test_path = os.path.dirname(os.path.realpath(__file__))
    #   local_data_file = test_path + '/data/example_01.xml'
    #   mapping_file = test_path + '/data/example_mapping.json'

    #   call_command('import_results', local_data_file, location='local',
    #       file_type='xml', mapping_file=mapping_file)
    #   number_of_results = RawData.objects.count()
    #   self.assertEqual(number_of_results, 1)

    #   call_command('import_results', local_data_file, location='local',
    #       file_type='xml', mapping_file=mapping_file)
    #   number_of_results = RawData.objects.count()
    #   self.assertEqual(number_of_results, 2)
项目:Dwarf    作者:Dwarf-Community    | 项目源码 | 文件源码
def sync_database():
        management.call_command('makemigrations', 'dwarf')
        management.call_command('migrate', 'dwarf')
项目:mccelections    作者:mcclatchy    | 项目源码 | 文件源码
def calculate_vote(sender, instance, **kwargs):
    ## don't fire if it's a gdoc result
    if instance.gdoc_import != True:
        electiondate_arg = str(instance.electiondate)
        call_command('calculate_vote', electiondate_arg)
        ## needs to be written <-- not doing this bc it would mess with manual results w/ multiple winners, winners that req 2/3, etc
        # call_command('declare_winner')


## NEEDS TO STAY models.model so PSQL import works
项目:mccelections    作者:mcclatchy    | 项目源码 | 文件源码
def snapshot(electiondate_string):
    ## *** need to have this on S3 or other external location to not fill up server ***
        ## NOTE: instead of saving anew, just use/copy the tmp file?
        ## should we snapshot manual results from gdoc? use separate parent dir to avoid conflict
    if mccelectionsenv == "local":
        # call_command('snapshot_results')
        ## copy and rename results.csv w/ cp command
        file_path = os.environ["SAVER_PATH"]
        orgin = file_path + "/tmp/results.csv"
        now = timezone.localtime(timezone.now())
        save_date = now.date()
        save_date_string = str(save_date)
        timestamp = now.strftime('%Y-%m-%d_%H-%M-%S')

        snapshot_filename = "results%s.csv" % (timestamp)
        destination_dir = "%s/%s/%s" % (file_path, electiondate_string, save_date_string)
        destination = "%s/%s" % (destination_dir, snapshot_filename)

        mkdir = "mkdir -p %s" % (destination_dir)
        snapshot = "cp %s %s" % (origin, destination)

        ## making the dir, if it's not there
        call(mkdir, shell=True)
        message = "Making new directory, if needed:\n%s" % (destination_dir)
        slackbot(message)
        ## actual snapshot executed
        call(snapshot, shell=True)
        message = "Snapshotting"
        slackbot(message)
    # else:
        # snapshot to S3
项目:mccelections    作者:mcclatchy    | 项目源码 | 文件源码
def snapshot_local(electiondate_string):
    ## for efficiency, we just copy the tmp file for local snapshots instead of saving anew
    ## this is only for local bc otherwise it would fill up the servers
        ## S3 snapshot would be needed for test/prod
    ## should we snapshot manual results from gdoc? use separate parent dir to avoid conflict

    # call_command('snapshot_results')

    ## copy and rename results.csv w/ cp command
    file_path = os.environ["SAVER_PATH"]
    origin = file_path + "/tmp/results.csv"
    now = timezone.localtime(timezone.now())
    save_date = now.date()
    save_date_string = str(save_date)
    timestamp = now.strftime('%Y-%m-%d_%H-%M-%S')

    snapshot_filename = "results%s.csv" % (timestamp)
    destination_dir = "%s/%s/%s" % (file_path, electiondate_string, save_date_string)
    destination = "%s/%s" % (destination_dir, snapshot_filename)

    message = "\nSNAPSHOTTING\nSaving to the following directory:\n%s\n" % (destination_dir)
    slackbot(message)

    ## making the dir, if it's not there
    if not os.path.exists(destination_dir):
        os.makedirs(destination_dir)
    ## copy the file
    copyfile(origin, destination)

    message = "File copied for snapshot\n"
    slackbot(message)
项目:mccelections    作者:mcclatchy    | 项目源码 | 文件源码
def download_elections():
    call_command('download_elections')

## check if there's an Election today; if so, start checking every minute whether to set live and start import
项目:django-ohio-voter-file    作者:hodgesmr    | 项目源码 | 文件源码
def handle(self, **kwargs):
        message = ('\nThis command will completely wipe your database and '
                   'download & parse the latest Ohio Voter File data. The '
                   'process can take minutes or hours depending on your '
                   'machine. Continue? (y/n): ')

        answer = input(message)

        if answer == 'y':
            # start fresh
            management.call_command('flush', interactive=False)
            management.call_command('migrate', interactive=False)

            print('\nDownloading and parsing county data. This will take a while...')

            with tempfile.TemporaryDirectory() as tmpdirname:
                num_cpus = cpu_count()

                pool = Pool(num_cpus)

                args = [(county, tmpdirname) for county in COUNTIES]

                print('Downloading county data...')
                pool.starmap(self.download_county_data, args)

                print('Importing county data...')
                pool.starmap(self.load_county_data_into_db, args)

            print('\nDone!')
项目:django-url-migration    作者:socialwifi    | 项目源码 | 文件源码
def test_if_url_mapping_is_removed(self):
        log = factories.LastUsageLogFactory(used_date=timezone.datetime(1999, 12, 10, 22, 11, tzinfo=pytz.utc))
        factories.UrlMappingFactory(last_usage=log, id=199)
        management.call_command('remove_expired_redirects')
        self.assertFalse(models.UrlMapping.objects.filter(pk=199).exists())
项目:django-url-migration    作者:socialwifi    | 项目源码 | 文件源码
def test_if_regexp_mapping_is_removed(self):
        log = factories.LastUsageLogFactory(used_date=timezone.datetime(1999, 12, 10, 22, 11, tzinfo=pytz.utc))
        factories.UrlRegexpMappingFactory(last_usage=log, id=233)
        management.call_command('remove_expired_redirects')
        self.assertFalse(models.UrlRegexpMapping.objects.filter(pk=233).exists())
项目:django-url-migration    作者:socialwifi    | 项目源码 | 文件源码
def test_if_used_mapping_is_not_removed(self):
        log = factories.LastUsageLogFactory(used_date=timezone.datetime(2001, 12, 10, 22, 11, tzinfo=pytz.utc))
        factories.UrlMappingFactory(last_usage=log, id=344)
        management.call_command('remove_expired_redirects')
        self.assertTrue(models.UrlMapping.objects.filter(pk=344).exists())
项目:django-url-migration    作者:socialwifi    | 项目源码 | 文件源码
def test_if_used_regexp_mapping_is_not_removed(self):
        log = factories.LastUsageLogFactory(used_date=timezone.datetime(2001, 12, 10, 22, 11, tzinfo=pytz.utc))
        factories.UrlRegexpMappingFactory(last_usage=log, id=422)
        management.call_command('remove_expired_redirects')
        self.assertTrue(models.UrlRegexpMapping.objects.filter(pk=422).exists())
项目:django-url-migration    作者:socialwifi    | 项目源码 | 文件源码
def test_if_unused_mapping_is_not_removed(self):
        factories.UrlMappingFactory(id=566)
        management.call_command('remove_expired_redirects')
        self.assertTrue(models.UrlMapping.objects.filter(pk=566).exists())
项目:django-url-migration    作者:socialwifi    | 项目源码 | 文件源码
def test_if_removing_regexp_mapping_does_not_remove_generated_mappings(self):
        log = factories.LastUsageLogFactory(used_date=timezone.datetime(1999, 12, 10, 22, 11, tzinfo=pytz.utc))
        mapping = factories.UrlRegexpMappingFactory(last_usage=log, id=988)
        factories.RegexpGeneratedMappingFactory(regexp=mapping, id=1022)
        management.call_command('remove_expired_redirects')
        self.assertTrue(models.RegexpGeneratedMapping.objects.filter(pk=1022).exists())
项目:prngmgr    作者:wolcomm    | 项目源码 | 文件源码
def handle(self, *args, **options):
        """Handle command request."""
        self.stdout.write(self.style.NOTICE('Syncing PDB'))
        management.call_command('pdb_sync')
        self.stdout.write(self.style.NOTICE('Rebuilding session data'))
        management.call_command('prngmgr-sync-sessions', *args, **options)
        self.stdout.write(self.style.NOTICE('Sync complete'))
项目:django-composite-foreignkey    作者:onysos    | 项目源码 | 文件源码
def test_graph_data(self):
        out = StringIO()
        call_command("graph_datas", "testapp", stdout=out)

        result = out.getvalue()
        # print(result)
        self.assertEqual("""digraph items_in_db {
{ rank=same; address_1;address_2;address_3; }
{ rank=same; representant_1;representant_2; }
{ rank=same; customer_1;customer_2;customer_3;customer_4;customer_5; }
{ rank=same; contact_1;contact_2; }
{ rank=same; phonenumber_1; }
address_1;
address_2;
address_3;
representant_1;
representant_2;
customer_1;
customer_1  -> address_1;
customer_1  -> representant_1;
customer_2;
customer_3;
customer_3  -> representant_2;
customer_4;
customer_5;
contact_1;
contact_1  -> customer_3;
contact_2;
contact_2  -> customer_1;
phonenumber_1;
phonenumber_1  -> contact_2;
}
""", result)
项目:django-composite-foreignkey    作者:onysos    | 项目源码 | 文件源码
def test_app_not_exists(self):
        self.assertRaises(CommandError, call_command, "graph_datas", "doesnotexistsapp")
项目:sdining    作者:Lurance    | 项目源码 | 文件源码
def test_captcha_create_pool(self):
        CaptchaStore.objects.all().delete()  # Delete objects created during SetUp
        POOL_SIZE = 10
        management.call_command('captcha_create_pool', pool_size=POOL_SIZE, verbosity=0)
        self.assertEqual(CaptchaStore.objects.count(), POOL_SIZE)
项目:OneStack    作者:itpubs    | 项目源码 | 文件源码
def handle(self, *args, **options):
        # self.execute('pip install django')
        # management.call_command('runserver')
        subprocess.call('pip install -r requirements/dev.txt', shell=True)
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def makemigrations():
    management.call_command('makemigrations')
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def handle(self, *args, **options):
        for bid in args:
            try:
                b = Build.objects.get(pk = bid)
            except ObjectDoesNotExist:
                print('build %s does not exist, skipping...' %(bid))
                continue
            # theoretically, just b.delete() would suffice
            # however SQLite runs into problems when you try to
            # delete too many rows at once, so we delete some direct
            # relationships from Build manually.
            for t in b.target_set.all():
                t.delete()
            for t in b.task_build.all():
                t.delete()
            for p in b.package_set.all():
                p.delete()
            for lv in b.layer_version_build.all():
                lv.delete()
            for v in b.variable_build.all():
                v.delete()
            for l in b.logmessage_set.all():
                l.delete()

            # delete the build; some databases might have had problem with migration of the bldcontrol app
            retry_count = 0
            need_bldcontrol_migration = False
            while True:
                if retry_count >= 5:
                    break
                retry_count += 1
                if need_bldcontrol_migration:
                    from django.core import management
                    management.call_command('migrate', 'bldcontrol', interactive=False)

                try:
                    b.delete()
                    break
                except OperationalError as e:
                    # execute migrations
                    need_bldcontrol_migration = True
项目:intake    作者:codeforamerica    | 项目源码 | 文件源码
def handle(self, *args, **kwargs):
        management.call_command('migrate')
        management.call_command('load_essential_data')