我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用django.utils.six.StringIO()。
def __init__(self, testcases_to_run, nr_of_tests, all_should_pass=True, print_bad=True, runner_options=None): runner_options = runner_options or {} self.nr_of_tests = nr_of_tests self.all_should_pass = all_should_pass self.print_bad = print_bad django_runner_cls = get_runner(settings) django_runner = django_runner_cls(**runner_options) self.suite = django_runner.test_suite() for testcase_cls in testcases_to_run: tests = django_runner.test_loader.loadTestsFromTestCase( testcase_cls) self.suite.addTests(tests) self.test_runner = django_runner.test_runner( resultclass=django_runner.get_resultclass(), stream=six.StringIO() )
def test_import_single_set(self): out = StringIO() call_command(self.command, 'SOM', stdout=out) self.assertEqual( "Beginning import of 1 set (SOM).\n" "Import complete.\n" "Added 1 new Set, {} new Cards, and {} new Printings.\n".format( SOM_CARDS, SOM_PRINTINGS), out.getvalue() ) self.assertEqual(Set.objects.count(), 1) scars = Set.objects.first() self.assertEqual(scars.name, "Scars of Mirrodin") self.assertEqual(scars.code, "SOM") self.assertEqual(Card.objects.count(), SOM_CARDS) self.assertEqual(Printing.objects.count(), SOM_PRINTINGS) self.check_common_set_constraints()
def test_progress_bar_console_no_eta(self): stream = StringIO() console = Console(stream) progress_bar = ProgressBar(console) for i in range(1, 51): progress_bar.progress(i, 50) output = stream.getvalue() self.assertEqual( [ln.strip() for ln in output.split('\r')], [ '', '[##### ] 20.00% (10 / 50) ETA: ? |', '[######### ] 40.00% (20 / 50) ETA: ? /', '[############# ] 60.00% (30 / 50) ETA: ? -', '[################# ] 80.00% (40 / 50) ETA: ? \\', '[#####################] 100.00% (50 / 50) ETA: 0:00:00' ] )
def test_progress_bar_stream_no_eta_custom(self): stream = StringIO() progress_bar = ProgressBar( stream, template='$progress $percent ($current / $total)', progress_brick='*', progress_num_bricks=5 ) for i in range(1, 51): progress_bar.progress(i, 50) output = stream.getvalue() self.assertEqual( [ln.strip() for ln in output.split('\r')], [ '', '[* ] 20.00% (10 / 50)', '[** ] 40.00% (20 / 50)', '[*** ] 60.00% (30 / 50)', '[**** ] 80.00% (40 / 50)', '[*****] 100.00% (50 / 50)' ] )
def test_show_usage_yearly(self): out = StringIO() call_command('show_usage', '--yearly', '--subscription-start={:%m/%d/%Y}'.format( self.subscription_start), '--ascending', stdout=out) assert out.getvalue().split() == ''' year start end registered activated active ------ ---------- ---------- ------------ ----------- -------- Y01 2010-01-24 2011-01-24 1 1 1 Y02 2011-01-24 2012-01-24 2 2 1 Y03 2012-01-24 2013-01-24 3 3 1 Y04 2013-01-24 2014-01-24 4 4 1 Y05 2014-01-24 2015-01-24 5 5 1 Y06 2015-01-24 2016-01-24 7 7 2 Y07 2016-01-24 2017-01-24 8 8 1 Y08 2017-01-24 2017-04-04 9 9 1 '''.split()
def test_import_command(self): out = StringIO() sys.stout = out args = [] call_command('import', *args, stdout=out) self.assertIn('Successfully Imported People DataQualityIssues into DB', out.getvalue()) self.assertIn( 'Successfully Imported Organization DataQualityIssues into DB', out.getvalue()) self.assertIn('Successfully Imported Post DataQualityIssues into DB', out.getvalue()) self.assertIn('Successfully Imported Membership DataQualityIssues ' 'into DB', out.getvalue()) self.assertIn( 'Successfully Imported VoteEvent DataQualityIssues into DB', out.getvalue()) self.assertIn( 'Successfully Imported Bill DataQualityIssues into DB', out.getvalue())
def test_log_student_updated_added_event(self): """Check logging signal for newly created student""" # add own root handler to catch student signals output out = StringIO() handler = logging.StreamHandler(out) logging.root.addHandler(handler) # now create student, this should raise new message inside # our logger output file student = Student(first_name='Demo', last_name='Student') student.save() # check output file content out.seek(0) self.assertEqual(out.readlines()[-1], 'Student Demo Student added (ID: %d)\n' % student.id) # now update existing student and check last line in out student.ticket = '12345' student.save() out.seek(0) self.assertEqual(out.readlines()[-1], 'Student Demo Student updated (ID: %d)\n' % student.id) # remove our handler from root logger logging.root.removeHandler(handler)
def test_log_student_deleted_event(self): """Check logging signals for deleted student""" student = Student(first_name='Demo', last_name='Student') student.save() # now override signal # add own root handler to catch student signals output out = StringIO() handler = logging.StreamHandler(out) logging.root.addHandler(handler) # delete existing student and check logger output sid = student.id student.delete() out.seek(0) self.assertEqual(out.readlines()[-1], 'Student Demo Student deleted (ID: %d)\n' % sid) # remove our handler from root logger logging.root.removeHandler(handler)
def test_log_group_updated_added_event(self): """Check logging group for newly created student""" # add own root handler to catch student signals output out = StringIO() handler = logging.StreamHandler(out) logging.root.addHandler(handler) # now create group, this should raise new message inside # our logger output file group = Group(title='Demo Group 1') group.save() # check output file content out.seek(0) self.assertEqual(out.readlines()[-1], 'Group Demo Group 1 added (ID: %d)\n' % group.id) # now update existing group and check last line in out group.title = 'Demo Group 2' group.save() out.seek(0) self.assertEqual(out.readlines()[-1], 'Group Demo Group 2 updated (ID: %d)\n' % group.id) # remove our handler from root logger logging.root.removeHandler(handler)
def test_delete_create_populate_commands(self): out = StringIO() self.assertTrue(ad_index.exists()) self.assertTrue(car_index.exists()) call_command('search_index', action='delete', force=True, stdout=out, models=['tests.ad']) self.assertFalse(ad_index.exists()) self.assertTrue(car_index.exists()) call_command('search_index', action='create', models=['tests.ad'], stdout=out) self.assertTrue(ad_index.exists()) result = AdDocument().search().execute() self.assertEqual(len(result), 0) call_command('search_index', action='populate', models=['tests.ad'], stdout=out) result = AdDocument().search().execute() self.assertEqual(len(result), 2)
def setUp(self): self.out = StringIO() self.registry = DocumentRegistry() self.index_a = Mock() self.index_b = Mock() self.doc_a1_qs = Mock() self.doc_a1 = self._generate_doc_mock( self.ModelA, self.index_a, self.doc_a1_qs ) self.doc_a2_qs = Mock() self.doc_a2 = self._generate_doc_mock( self.ModelA, self.index_a, self.doc_a2_qs ) self.doc_b1_qs = Mock() self.doc_b1 = self._generate_doc_mock( self.ModelB, self.index_a, self.doc_b1_qs ) self.doc_c1_qs = Mock() self.doc_c1 = self._generate_doc_mock( self.ModelC, self.index_b, self.doc_c1_qs )
def validate(self, app=None, display_num_errors=False): """ Validates the given app, raising CommandError for any errors. If app is None, then this will validate all installed apps. """ from django.core.management.validation import get_validation_errors s = StringIO() num_errors = get_validation_errors(s, app) if num_errors: s.seek(0) error_text = s.read() raise CommandError("One or more models did not validate:\n%s" % error_text) if display_num_errors: self.stdout.write("%s error%s found" % (num_errors, '' if num_errors == 1 else 's'))
def test_createsuperuser_non_ascii_verbose_name(self): # Aliased so the string doesn't get extracted from django.utils.translation import ugettext_lazy as ulazy username_field = User._meta.get_field('username') old_verbose_name = username_field.verbose_name username_field.verbose_name = ulazy('uživatel') new_io = StringIO() try: call_command("createsuperuser", interactive=True, stdout=new_io ) finally: username_field.verbose_name = old_verbose_name command_output = new_io.getvalue().strip() self.assertEqual(command_output, 'Superuser created successfully.')
def test_report_printed_includes_all_needed_data(): report = WorstReport() report.handle_results_collected( signal=None, sender=FakeSender('id 2', 'querycount'), results=[ NameValueResult(name='nine', value=9)], context={'foo': 'bar'}) report.handle_results_collected( signal=None, sender=FakeSender('id 1', 'querycount'), results=[NameValueResult(name='two', value=2)], context={'test': 'some.app.tests.TestCase.test_foo'}) stream = six.StringIO() report.render(stream) assert stream.getvalue().strip() == expected_report_data
def test_report_prints_nothing_when_there_is_no_data(): report = WorstReport() stream = six.StringIO() report.render(stream) assert stream.getvalue() == ''
def get_command_after_run(): cmd = WorstReportCommand(stdout=six.StringIO()) cmd.handle() return cmd
def rendered(self): out = six.StringIO() self.render(out) return out.getvalue()
def test_write(self): stream = StringIO() console = Console(stream) console.write('The quick brown fox jumps over the lazy dog') output = stream.getvalue() self.assertEqual( repr(output), repr('The quick brown fox jumps over the lazy dog') )
def test_new_line(self): stream = StringIO() console = Console(stream) console.new_line('The quick brown fox jumps over the lazy dog') output = stream.getvalue() self.assertEqual( repr(output), repr( '\r' 'The quick brown fox jumps over the lazy dog' + ' ' * 37 + '\n' ) )
def test_same_line(self): stream = StringIO() console = Console(stream) console.same_line('The quick brown fox jumps over the lazy dog') output = stream.getvalue() self.assertEqual( repr(output), repr( '\r' 'The quick brown fox jumps over the lazy dog' + ' ' * 37 ) )
def test_info(self): stream = StringIO() console = Console(stream) console.info('The quick brown fox jumps over the lazy dog') output = stream.getvalue() self.assertEqual( repr(output), repr( '\r' 'The quick brown fox jumps over the lazy dog' + ' ' * 37 + '\n' ) )
def test_warning(self): stream = StringIO() console = Console(stream) console.warning('The quick brown fox jumps over the lazy dog') output = stream.getvalue() self.assertEqual( repr(output), repr( '\r\x1b[33m' 'The quick brown fox jumps over the lazy dog' '\x1b[39m' + ' ' * 27 + '\n' ) )
def test_success(self): stream = StringIO() console = Console(stream) console.success('The quick brown fox jumps over the lazy dog') output = stream.getvalue() self.assertEqual( repr(output), repr( '\r\x1b[32m' 'The quick brown fox jumps over the lazy dog' '\x1b[39m' + ' ' * 27 + '\n' ) )
def test_update_index_command(self): self.backend.reset_index() results = self.backend.search(None, SearchTest) # We find results anyway because we searched for nothing. self.assertSetEqual(set(results), {self.testa, self.testb, self.testc.searchtest_ptr, self.testd.searchtest_ptr}) # But now, we can't find anything because the index is empty. results = self.backend.search('hello', SearchTest) self.assertSetEqual(set(results), set()) results = self.backend.search('world', SearchTest) self.assertSetEqual(set(results), set()) # Run update_index command with self.ignore_deprecation_warnings(): # ignore any DeprecationWarnings thrown by models with old-style # indexed_fields definitions call_command('update_index', backend_name=self.backend_name, interactive=False, stdout=StringIO()) # And now we can finally find results. results = self.backend.search('hello', SearchTest) self.assertSetEqual(set(results), {self.testa, self.testb, self.testc.searchtest_ptr}) results = self.backend.search('world', SearchTest) self.assertSetEqual(set(results), {self.testa, self.testd.searchtest_ptr})
def test_show_usage_monthly(self): out = StringIO() call_command('show_usage', '--monthly', '--subscription-start={}'.format( self.subscription_start.isoformat()), '--after=6/8/2014', '--before=12/25/2015', stdout=out) assert out.getvalue().split() == ''' month start end registered activated active ------- ---------- ---------- ------------ ----------- -------- Y06 M12 2015-12-24 2015-12-25 7 7 0 Y06 M11 2015-11-24 2015-12-24 7 7 0 Y06 M10 2015-10-24 2015-11-24 7 7 0 Y06 M09 2015-09-24 2015-10-24 7 7 1 Y06 M08 2015-08-24 2015-09-24 6 6 0 Y06 M07 2015-07-24 2015-08-24 6 6 0 Y06 M06 2015-06-24 2015-07-24 6 6 0 Y06 M05 2015-05-24 2015-06-24 6 6 0 Y06 M04 2015-04-24 2015-05-24 6 6 0 Y06 M03 2015-03-24 2015-04-24 6 6 0 Y06 M02 2015-02-24 2015-03-24 6 6 0 Y06 M01 2015-01-24 2015-02-24 6 6 1 Y05 M12 2014-12-24 2015-01-24 5 5 0 Y05 M11 2014-11-24 2014-12-24 5 5 0 Y05 M10 2014-10-24 2014-11-24 5 5 0 Y05 M09 2014-09-24 2014-10-24 5 5 0 Y05 M08 2014-08-24 2014-09-24 5 5 0 Y05 M07 2014-07-24 2014-08-24 5 5 0 Y05 M06 2014-06-24 2014-07-24 5 5 0 Y05 M05 2014-05-24 2014-06-24 5 5 0 '''.split()
def test_show_usage_quarterly(self): out = StringIO() call_command('show_usage', '--quarterly', '--subscription-start={:%m/%d/%Y}'.format( self.subscription_start), stdout=out) assert out.getvalue().split() == ''' quarter start end registered activated active --------- ---------- ---------- ------------ ----------- -------- Y08 Q1 2017-01-24 2017-04-04 9 9 1 Y07 Q4 2016-10-24 2017-01-24 8 8 0 Y07 Q3 2016-07-24 2016-10-24 8 8 0 Y07 Q2 2016-04-24 2016-07-24 8 8 0 Y07 Q1 2016-01-24 2016-04-24 8 8 1 Y06 Q4 2015-10-24 2016-01-24 7 7 0 Y06 Q3 2015-07-24 2015-10-24 7 7 1 Y06 Q2 2015-04-24 2015-07-24 6 6 0 Y06 Q1 2015-01-24 2015-04-24 6 6 1 Y05 Q4 2014-10-24 2015-01-24 5 5 0 Y05 Q3 2014-07-24 2014-10-24 5 5 0 Y05 Q2 2014-04-24 2014-07-24 5 5 0 Y05 Q1 2014-01-24 2014-04-24 5 5 1 Y04 Q4 2013-10-24 2014-01-24 4 4 0 Y04 Q3 2013-07-24 2013-10-24 4 4 0 Y04 Q2 2013-04-24 2013-07-24 4 4 0 Y04 Q1 2013-01-24 2013-04-24 4 4 1 Y03 Q4 2012-10-24 2013-01-24 3 3 0 Y03 Q3 2012-07-24 2012-10-24 3 3 0 Y03 Q2 2012-04-24 2012-07-24 3 3 0 Y03 Q1 2012-01-24 2012-04-24 3 3 1 Y02 Q4 2011-10-24 2012-01-24 2 2 0 Y02 Q3 2011-07-24 2011-10-24 2 2 0 Y02 Q2 2011-04-24 2011-07-24 2 2 0 Y02 Q1 2011-01-24 2011-04-24 2 2 1 Y01 Q4 2010-10-24 2011-01-24 1 1 0 Y01 Q3 2010-07-24 2010-10-24 1 1 0 Y01 Q2 2010-04-24 2010-07-24 1 1 0 Y01 Q1 2010-01-24 2010-04-24 1 1 1 '''.split()
def test_dump_data_base(self): """ Testing the case of dump full and empty dataset """ fileobj = self.create_filer_file() jdata, jdata2 = StringIO(), StringIO() call_command("dumpdata", "filer", stdout=jdata) fileobj.delete() call_command("dumpdata", "filer", stdout=jdata2) data = json.loads(jdata.getvalue()) data2 = json.loads(jdata2.getvalue()) self.assertEqual(len(data), 1) self.assertEqual(len(data2), 0)
def test_dump_load_data_content(self): """ Testing the dump / load with full dump of file content data """ with SettingsOverride(filer_settings, FILER_DUMP_PAYLOAD=True): # Initialize the test data create_folder_structure(1,1) fileobj = self.create_filer_file(Folder.objects.all()[0]) jdata = StringIO() # Dump the current data fobj = tempfile.NamedTemporaryFile(suffix=".json", delete=False) call_command("dumpdata", "filer", stdout=jdata, indent=3) # Delete database and filesystem data and complete = os.path.join(fileobj.file.storage.location, fileobj.path) os.unlink(complete) fileobj.delete() # Dump data to json file fobj.write(jdata.getvalue().encode('utf-8')) fobj.seek(0) # Load data back call_command("loaddata", fobj.name, stdout=jdata) # Database data is restored self.assertEqual(Folder.objects.all().count(), 1) self.assertEqual(File.objects.all().count(), 1) self.assertEqual(File.objects.all()[0].original_filename, self.image_name) fileobj = File.objects.all()[0] complete = os.path.join(fileobj.file.storage.location, fileobj.path) # Filesystem data too! self.assertTrue(os.path.exists(complete))
def test_makemigrations(self): out = StringIO() call_command('makemigrations', dry_run=True, noinput=True, stdout=out) output = out.getvalue() self.assertEqual(output, 'No changes detected\n')
def serialize(self, queryset, **options): """ Serialize a queryset. """ self.options = options self.stream = options.pop("stream", six.StringIO()) self.selected_fields = options.pop("fields", None) self.use_natural_foreign_keys = options.pop('use_natural_foreign_keys', False) self.use_natural_primary_keys = options.pop('use_natural_primary_keys', False) progress_bar = self.progress_class( options.pop('progress_output', None), options.pop('object_count', 0) ) self.start_serialization() self.first = True for count, obj in enumerate(queryset, start=1): self.start_object(obj) # Use the concrete parent class' _meta instead of the object's _meta # This is to avoid local_fields problems for proxy models. Refs #17717. concrete_model = obj._meta.concrete_model for field in concrete_model._meta.local_fields: if field.serialize: if field.remote_field is None: if self.selected_fields is None or field.attname in self.selected_fields: self.handle_field(obj, field) else: if self.selected_fields is None or field.attname[:-3] in self.selected_fields: self.handle_fk_field(obj, field) for field in concrete_model._meta.many_to_many: if field.serialize: if self.selected_fields is None or field.attname in self.selected_fields: self.handle_m2m_field(obj, field) self.end_object(obj) progress_bar.update(count) if self.first: self.first = False self.end_serialization() return self.getvalue()
def __init__(self, stream_or_string, **options): """ Init this serializer given a stream or a string """ self.options = options if isinstance(stream_or_string, six.string_types): self.stream = six.StringIO(stream_or_string) else: self.stream = stream_or_string
def as_string(self, unixfrom=False, linesep='\n'): """Return the entire formatted message as a string. Optional `unixfrom' when True, means include the Unix From_ envelope header. This overrides the default as_string() implementation to not mangle lines that begin with 'From '. See bug #13433 for details. """ fp = six.StringIO() g = generator.Generator(fp, mangle_from_=False) if six.PY2: g.flatten(self, unixfrom=unixfrom) else: g.flatten(self, unixfrom=unixfrom, linesep=linesep) return fp.getvalue()
def serialize_db_to_string(self): """ Serializes all data in the database into a JSON string. Designed only for test runner usage; will not handle large amounts of data. """ # Build list of all apps to serialize from django.db.migrations.loader import MigrationLoader loader = MigrationLoader(self.connection) app_list = [] for app_config in apps.get_app_configs(): if ( app_config.models_module is not None and app_config.label in loader.migrated_apps and app_config.name not in settings.TEST_NON_SERIALIZED_APPS ): app_list.append((app_config, None)) # Make a function to iteratively return every object def get_objects(): for model in serializers.sort_dependencies(app_list): if (model._meta.can_migrate(self.connection) and router.allow_migrate_model(self.connection.alias, model)): queryset = model._default_manager.using(self.connection.alias).order_by(model._meta.pk.name) for obj in queryset.iterator(): yield obj # Serialize to a string out = StringIO() serializers.serialize("json", get_objects(), indent=None, stream=out) return out.getvalue()
def deserialize_db_from_string(self, data): """ Reloads the database with data from a string generated by the serialize_db_to_string method. """ data = StringIO(data) for obj in serializers.deserialize("json", data, using=self.connection.alias): obj.save()
def setUp(self): self.logger = logging.getLogger('django') self.old_stream = self.logger.handlers[0].stream self.logger_output = six.StringIO() self.logger.handlers[0].stream = self.logger_output
def startTest(self, test): self.debug_sql_stream = StringIO() self.handler = logging.StreamHandler(self.debug_sql_stream) self.logger.addHandler(self.handler) super(DebugSQLTextTestResult, self).startTest(test)
def writeString(self, encoding): """ Returns the feed in the given encoding as a string. """ s = StringIO() self.write(s, encoding) return s.getvalue()
def get_output(migration): output = StringIO() migration.write(output) return output.getvalue()
def test_delete_today(self): SentNotification( notification_class=NOTIFICATION_CLASS, date_sent=timezone.now() ).save() SentNotification( notification_class=NOTIFICATION_CLASS, date_sent=timezone.now()+timedelta(days=1) ).save() out = StringIO() call_command('delnotifs', stdout=out) self.assertIn(MSG.format(num=1), out.getvalue())
def test_do_not_delete_tomorrow(self): SentNotification( notification_class=NOTIFICATION_CLASS, date_sent=timezone.now()+timedelta(days=1) ).save() out = StringIO() call_command('delnotifs', stdout=out) self.assertIn(MSG.format(num=0), out.getvalue())
def test_do_not_delete_yesterday(self): SentNotification( notification_class=NOTIFICATION_CLASS, date_sent=timezone.now() - timedelta(days=1) ).save() out = StringIO() call_command('delnotifs', stdout=out) self.assertIn(MSG.format(num=0), out.getvalue())
def test_delete_today_start_arg(self): SentNotification( notification_class=NOTIFICATION_CLASS, date_sent=timezone.now() ).save() SentNotification( notification_class=NOTIFICATION_CLASS, date_sent=timezone.now()+timedelta(days=1) ).save() out = StringIO() today = timezone.now().replace(hour=0, minute=0, second=0, microsecond=0) call_command('delnotifs', stdout=out, start=str(today)) self.assertIn(MSG.format(num=1), out.getvalue())
def test_do_not_delete_tomorrow_end_arg(self): SentNotification( notification_class=NOTIFICATION_CLASS, date_sent=timezone.now()+timedelta(days=1) ).save() out = StringIO() two_days_later = timezone.now().replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=2) call_command('delnotifs', stdout=out, end=str(two_days_later)) self.assertIn(MSG.format(num=1), out.getvalue())