我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 unittest.mock.Mock()。
def test_get_cached_content_ids__calls(self, mock_get, mock_queryset):
    """Check the Redis calls made by ``get_cached_content_ids`` with and without ``last_id``.

    ``mock_get`` and ``mock_queryset`` are injected mocks (presumably from patch
    decorators that are not visible here); ``mock_get`` is made to return the
    fake Redis client.
    """
    redis_client = Mock(zrevrange=Mock(return_value=[]))
    mock_get.return_value = redis_client
    stream = self.stream
    stream.stream_type = StreamType.PUBLIC

    stream.get_cached_content_ids()
    # No last_id set: the rank lookup is skipped and the range starts at 0.
    self.assertFalse(redis_client.zrevrank.called)
    redis_client.zrevrange.assert_called_once_with(stream.key, 0, stream.paginate_by)

    redis_client.reset_mock()

    # With last_id set, the rank of that id is looked up (faked as 3) and the
    # range is expected to start right after it, at index 4.
    stream.last_id = self.content2.id
    redis_client.zrevrank.return_value = 3
    stream.get_cached_content_ids()
    redis_client.zrevrank.assert_called_once_with(stream.key, self.content2.id)
    redis_client.zrevrange.assert_called_once_with(stream.key, 4, 4 + stream.paginate_by)
def test_get_cached_range(self, mock_get, mock_queryset):
    """Check the ids/throughs returned by ``get_cached_range`` and the Redis calls behind them."""
    stream = self.stream
    stream.stream_type = StreamType.PUBLIC
    first_id = self.content2.id
    second_id = self.content1.id

    # Both Redis calls answer with the ids as strings, as two separate lists.
    zrevrange = Mock(return_value=[str(first_id), str(second_id)])
    hmget = Mock(return_value=[str(first_id), str(second_id)])
    mock_get.return_value = Mock(zrevrange=zrevrange, hmget=hmget)

    ids, throughs = stream.get_cached_range(0)

    # The string ids come back as the original id values, in the same order.
    self.assertEqual(ids, [first_id, second_id])
    self.assertEqual(throughs, {first_id: first_id, second_id: second_id})
    zrevrange.assert_called_once_with(stream.key, 0, 0 + stream.paginate_by)
    hmget.assert_called_once_with(
        BaseStream.get_throughs_key(stream.key),
        keys=[first_id, second_id],
    )

    # A non-zero index shifts both ends of the requested range.
    zrevrange.reset_mock()
    stream.get_cached_range(5)
    zrevrange.assert_called_once_with(stream.key, 5, 5 + stream.paginate_by)
def tests_event_registration(self):
    """Verify which arguments ``add_event_callback`` accepts."""
    event_controller = self.abode.events
    self.assertIsNotNone(event_controller)

    handler = Mock()

    # A known event group registers successfully.
    registered = event_controller.add_event_callback(TIMELINE.ALARM_GROUP, handler)
    self.assertTrue(registered)

    # Passing no event group yields a falsy result.
    self.assertFalse(event_controller.add_event_callback(None, handler))

    # An unrecognised event group raises.
    with self.assertRaises(abodepy.AbodeException):
        event_controller.add_event_callback("lol", handler)
def tests_timeline_registration(self):
    """Verify which arguments ``add_timeline_callback`` accepts."""
    event_controller = self.abode.events
    self.assertIsNotNone(event_controller)

    handler = Mock()

    # A known timeline event registers successfully.
    registered = event_controller.add_timeline_callback(
        TIMELINE.CAPTURE_IMAGE, handler)
    self.assertTrue(registered)

    # Passing no timeline event yields a falsy result.
    self.assertFalse(event_controller.add_timeline_callback(None, handler))

    # Neither an unknown string nor an unknown dict is accepted.
    for bad_event in ("lol", {"lol": "lol"}):
        with self.assertRaises(abodepy.AbodeException):
            event_controller.add_timeline_callback(bad_event, handler)
def tests_multi_events_callback(self):
    """Verify that a callback registered for several event groups gets invoked."""
    event_controller = self.abode.events
    self.assertIsNotNone(event_controller)

    handler = Mock()

    # Register the same handler for two event groups in a single call.
    groups = [TIMELINE.ALARM_GROUP, TIMELINE.CAPTURE_GROUP]
    self.assertTrue(event_controller.add_event_callback(groups, handler))

    # Push the IRCAMERA sample timeline event straight into the controller.
    # pylint: disable=protected-access
    payload = json.loads(IRCAMERA.timeline_event())
    event_controller._on_timeline_update(payload)

    # The handler's most recent call must carry exactly that payload.
    handler.assert_called_with(payload)
def tests_multi_timeline_callback(self):
    """Verify that a callback registered for several timeline events gets invoked."""
    event_controller = self.abode.events
    self.assertIsNotNone(event_controller)

    handler = Mock()

    # Register the same handler for two timeline events in a single call.
    timeline_events = [TIMELINE.CAPTURE_IMAGE, TIMELINE.OPENED]
    self.assertTrue(
        event_controller.add_timeline_callback(timeline_events, handler))

    # Push the IRCAMERA sample timeline event straight into the controller.
    # pylint: disable=protected-access
    payload = json.loads(IRCAMERA.timeline_event())
    event_controller._on_timeline_update(payload)

    # The handler's most recent call must carry exactly that payload.
    handler.assert_called_with(payload)
def tests_automations_callback(self):
    """Verify that an automation update reaches a callback on the automation edit group."""
    event_controller = self.abode.events
    self.assertIsNotNone(event_controller)

    automation_handler = Mock()

    # Subscribe the handler to the automation edit event group.
    registered = event_controller.add_event_callback(
        TIMELINE.AUTOMATION_EDIT_GROUP, automation_handler)
    self.assertTrue(registered)

    # Feed an automation update directly into the controller.
    # pylint: disable=protected-access
    event_controller._on_automation_update('{}')

    # The handler's most recent call must carry the same raw payload.
    automation_handler.assert_called_with('{}')
def test_no_modifications(self):
    """Check when ``write_all_blocks`` calls ``_write_attr_annotations``, depending on stored hashes."""
    annotation_writer = mock.Mock()
    self.io._write_attr_annotations = annotation_writer

    # Writing the blocks as-is is expected to write no annotations.
    self.io.write_all_blocks(self.neo_blocks)
    annotation_writer.assert_not_called()
    self.compare_blocks(self.neo_blocks, self.io.nix_file.blocks)

    # Setting every stored hash to None must still not trigger an annotation write.
    stored_hashes = self.io._object_hashes
    for name in stored_hashes:
        stored_hashes[name] = None
    self.io.write_all_blocks(self.neo_blocks)
    annotation_writer.assert_not_called()

    # Replacing every hash with a dummy value is expected to produce one
    # annotation write per stored hash.
    stored_hashes = self.io._object_hashes
    for name in stored_hashes:
        stored_hashes[name] = "_"
    self.io.write_all_blocks(self.neo_blocks)
    write_count = annotation_writer.call_count
    self.assertEqual(write_count, len(self.io._object_hashes))
    self.compare_blocks(self.neo_blocks, self.io.nix_file.blocks)
def test_subscription_recording_processor_for_request(self):
    """Check that ``process_request`` swaps the subscription id in both the URI and the body."""
    replacement_id = str(uuid.uuid4())
    processor = SubscriptionRecordingProcessor(replacement_id)

    uri_templates = [
        'https://management.azure.com/subscriptions/{}/providers/Microsoft.ContainerRegistry/'
        'checkNameAvailability?api-version=2017-03-01',
        'https://graph.windows.net/{}/applications?api-version=1.6',
    ]

    for uri_template in uri_templates:
        # Each request starts out carrying its own random id.
        original_id = str(uuid.uuid4())
        request = mock.Mock()
        request.uri = uri_template.format(original_id)
        request.body = self._mock_subscription_request_body(original_id)

        processor.process_request(request)

        # After processing, URI and body must carry the replacement id instead.
        self.assertEqual(request.uri, uri_template.format(replacement_id))
        self.assertEqual(request.body,
                         self._mock_subscription_request_body(replacement_id))
def test_getattr_with_under(self, *args, **kwargs):
    """Check that ``_get_write_mode`` on a collection equals the delegate's attribute.

    ``*args``/``**kwargs`` absorb any extra arguments passed to the test
    (presumably mocks injected by patch decorators -- confirm against the
    original test module).
    """
    name = 'blabla'
    delegate = MagicMock()
    delegate.name = name
    connection = Mock(spec=core.SanicMongoAgnosticClient)
    connection.delegate = delegate

    database_cls = create_class_with_framework(
        core.SanicMongoAgnosticDatabase, asyncio_framework, self.__module__)
    db = database_cls(connection, name)

    collection_cls = create_class_with_framework(
        core.SanicMongoAgnosticCollection, asyncio_framework, self.__module__)
    coll = collection_cls(db, name)

    self.assertEqual(coll._get_write_mode, coll.delegate._get_write_mode)
def test_getitem(self, *args, **kwargs):
    """Check that indexing a collection yields an object of the same collection class.

    ``*args``/``**kwargs`` absorb any extra arguments passed to the test
    (presumably mocks injected by patch decorators -- confirm against the
    original test module).
    """
    name = 'blabla'
    delegate = MagicMock()
    delegate.name = name
    connection = Mock(spec=core.SanicMongoAgnosticClient)
    connection.delegate = delegate

    database_cls = create_class_with_framework(
        core.SanicMongoAgnosticDatabase, asyncio_framework, self.__module__)
    db = database_cls(connection, name)

    collection_cls = create_class_with_framework(
        core.SanicMongoAgnosticCollection, asyncio_framework, self.__module__)
    coll = collection_cls(db, name)

    other_coll = coll['other']
    # assertIsInstance reports the actual type on failure, unlike assertTrue(isinstance(...)).
    self.assertIsInstance(other_coll, type(coll))
def test_find(self, *args, **kwargs):
    """Check that ``find()`` on a collection returns an instance of the framework cursor class."""
    name = 'blabla'
    delegate = MagicMock()
    delegate.name = name
    client = Mock(spec=core.SanicMongoAgnosticClient)
    client.delegate = delegate

    # Build the framework-bound database, collection and cursor classes.
    database_cls = create_class_with_framework(
        core.SanicMongoAgnosticDatabase, asyncio_framework, self.__module__)
    database = database_cls(client, name)

    collection_cls = create_class_with_framework(
        core.SanicMongoAgnosticCollection, asyncio_framework, self.__module__)
    collection = collection_cls(database, name)

    cursor_cls = create_class_with_framework(
        core.SanicMongoAgnosticCursor, asyncio_framework, self.__module__)

    self.assertIsInstance(collection.find(), cursor_cls)
def test_get_item(self, *args, **kwargs):
    """Check that indexing a database yields an instance of the framework collection class.

    ``*args``/``**kwargs`` absorb any extra arguments passed to the test
    (presumably mocks injected by patch decorators -- confirm against the
    original test module).
    """
    connection = Mock(spec=core.SanicMongoAgnosticClient)
    connection.delegate = MagicMock()
    name = 'blabla'

    database_cls = create_class_with_framework(
        core.SanicMongoAgnosticDatabase, asyncio_framework, self.__module__)
    db = database_cls(connection, name)

    comp_coll = create_class_with_framework(
        core.SanicMongoAgnosticCollection, asyncio_framework, self.__module__)

    coll = db['bla']
    # assertIsInstance reports the actual type on failure, unlike assertTrue(isinstance(...)).
    self.assertIsInstance(coll, comp_coll)
def test_asynchornize(self):
    """Check that an ``asynchronize``-wrapped method returns a ``Future`` and runs its body.

    This is a generator-based test (``yield from``); it presumably relies on a
    coroutine/async-test decorator that is not visible here.
    """
    test_mock = Mock()

    class TestClass:
        @classmethod
        def _get_db(cls):
            # Fake database whose only configured attribute is the framework.
            db = Mock()
            db._framework = asyncio_framework
            return db

        @metaprogramming.asynchronize
        def sync(self):
            test_mock()

    testobj = TestClass()
    # assertIsInstance reports the actual type on failure, unlike assertTrue(isinstance(...)).
    self.assertIsInstance(testobj.sync(), Future)
    yield from testobj.sync()
    self.assertTrue(test_mock.called)
def test_asynchornize_with_exception(self):
    """Check that an exception raised inside an ``asynchronize``-wrapped method surfaces to the caller."""

    class TestClass:
        @classmethod
        def _get_db(cls):
            # Fake database whose only configured attribute is the framework.
            database = Mock()
            database._framework = asyncio_framework
            return database

        @metaprogramming.asynchronize
        def sync(self):
            raise Exception

    instance = TestClass()

    with self.assertRaises(Exception):
        yield from instance.sync()
def test_create_attribute_with_attribute_error(self):
    """Check that ``create_attribute`` raises ``AttributeError`` for a method name that does not exist."""
    call_recorder = Mock()

    class BaseTestClass:
        @classmethod
        def _get_db(cls):
            # Fake database whose only configured attribute is the framework.
            database = Mock()
            database._framework = asyncio_framework
            return database

        def some_method(self):
            call_recorder()

    class TestClass(BaseTestClass):
        some_method = metaprogramming.Async()

    # 'some_other_method' is defined on neither class above.
    with self.assertRaises(AttributeError):
        TestClass.some_method = TestClass.some_method.create_attribute(
            TestClass, 'some_other_method')
def test_errors_reset():
    """Check that a 'reset-errors' instruction sets ``error_reset`` and that ``run`` then posts no comment."""
    from jenkins_epo.bot import Error, Instruction
    from jenkins_epo.utils import parse_datetime
    from jenkins_epo.extensions.core import ErrorExtension

    ext = ErrorExtension('error', Mock())
    state = ext.bot.current
    ext.current = state
    state.denied_instructions = []
    # One recorded error, dated before the reset instruction sent below.
    state.errors = [Error('message', parse_datetime('2016-08-03T15:58:47Z'))]
    state.error_reset = None

    reset = Instruction(name='reset-errors', author='bot', date=datetime.utcnow())
    ext.process_instruction(reset)
    assert ext.current.error_reset

    yield from ext.run()
    assert not ext.current.head.comment.mock_calls
def test_error_denied():
    """Check that 'reset-denied' clears denied instructions and that new denials trigger a comment."""
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import ErrorExtension

    ext = ErrorExtension('error', Mock())
    state = ext.bot.current
    ext.current = state
    state.denied_instructions = [Mock()]
    state.errors = []
    state.error_reset = None

    # The reset instruction must empty the list of denied instructions...
    reset = Instruction(name='reset-denied', author='bot')
    ext.process_instruction(reset)
    assert not ext.current.denied_instructions

    # ...so running the extension posts nothing.
    yield from ext.run()
    assert not ext.current.head.comment.mock_calls

    # A fresh denied instruction makes the next run post a comment.
    ext.current.denied_instructions = [Mock(author='toto')]
    yield from ext.run()
    assert ext.current.head.comment.mock_calls
def test_yml_found(mocker, SETTINGS):
    """Check that ``YamlExtension.run`` fetches the YAML file and keeps only jobs allowed by the filter."""
    github = mocker.patch('jenkins_epo.extensions.core.GITHUB')
    job_cls = mocker.patch('jenkins_epo.extensions.core.Job')
    from jenkins_epo.extensions.core import YamlExtension

    # The filter allows everything except the job named 'skip'.
    job_cls.jobs_filter = ['*', '-skip']
    SETTINGS.update(YamlExtension.SETTINGS)

    ext = YamlExtension('ext', Mock())
    ext.current = ext.bot.current
    ext.current.yaml = {'job': {}}

    # The fetched file defines both a regular job and the filtered-out one.
    github.fetch_file_contents = CoroutineMock(
        return_value="job: command\nskip: command",
    )

    repository = ext.current.head.repository
    repository.url = 'https://github.com/owner/repo.git'
    repository.jobs = {}

    yield from ext.run()

    assert github.fetch_file_contents.mock_calls
    assert 'job' in ext.current.job_specs
    assert 'skip' not in ext.current.job_specs
def test_yml_comment_dict():
    """Check that 'yaml' and 'params' instructions both end up in the job's ``parameters`` mapping."""
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import YamlExtension

    ext = YamlExtension('ext', Mock())
    ext.current = ext.bot.current
    ext.current.yaml = {}

    # A full YAML fragment declaring PARAM1 under the job's parameters.
    yaml_instruction = Instruction(
        author='a', name='yaml',
        args={'job': {'parameters': {'PARAM1': 1}}},
    )
    # A 'params' shortcut declaring PARAM2 directly under the job.
    params_instruction = Instruction(
        author='a', name='params',
        args={'job': {'PARAM2': 1}},
    )
    ext.process_instruction(yaml_instruction)
    ext.process_instruction(params_instruction)

    parameters = ext.current.yaml['job']['parameters']
    assert 'PARAM1' in parameters
    assert 'PARAM2' in parameters
def test_job_update():
    """Check that ``process_job_specs`` yields the job's ``update`` action for a changed spec.

    The existing job is set up so that ``job.spec.contains(...)`` returns False.
    """
    from jenkins_epo.extensions.jenkins import CreateJobsExtension

    ext = CreateJobsExtension('createjob', Mock())
    ext.current = ext.bot.current
    ext.current.refresh_jobs = None
    ext.current.job_specs = {'new_job': Mock(config=dict())}
    # ``name`` must be set after construction: Mock(name=...) names the mock itself.
    ext.current.job_specs['new_job'].name = 'new_job'
    job = Mock()
    job.spec.contains.return_value = False
    ext.current.jobs = {'new_job': job}

    # list() drains the iterable directly; no identity comprehension needed.
    res = list(ext.process_job_specs())
    assert res

    # Only the action is checked; the accompanying spec is not inspected.
    action, _spec = res[0]
    assert action == job.update
def test_jenkins_fails_existing(mocker):
    """Check that a failing job update is recorded in ``current.errors`` by ``run``."""
    process_job_specs = mocker.patch(
        'jenkins_epo.extensions.jenkins.CreateJobsExtension.process_job_specs'
    )
    from jenkins_yml.job import Job as JobSpec
    from jenkins_epo.extensions.jenkins import CreateJobsExtension

    ext = CreateJobsExtension('createjob', Mock())
    state = ext.bot.current
    ext.current = state
    state.errors = []
    state.head.sha = 'cafed0d0'
    state.head.repository.jobs = {'job': Mock()}
    state.job_specs = {'job': JobSpec.factory('job', 'toto')}

    job = Mock()
    state.jobs = {'job': job}
    # The update coroutine always raises.
    job.update = CoroutineMock(side_effect=Exception('POUET'))
    # The patched generator hands that failing action to run().
    process_job_specs.return_value = [(job.update, Mock())]

    yield from ext.run()

    assert ext.current.errors
    assert job.update.mock_calls
def test_second_stage():
    """Check that ``run`` selects 'test' as current stage when the specs only name 'test' and 'missing'."""
    from jenkins_epo.extensions.jenkins import StagesExtension

    ext = StagesExtension('stages', Mock())
    ext.current = state = Mock()
    state.head.ref = 'pr'
    state.SETTINGS.STAGES = ['build', 'test']

    # ``name`` is assigned afterwards: Mock(name=...) would name the mock itself.
    test_spec = Mock(config={})
    missing_spec = Mock(config={})
    test_spec.name = 'test'
    missing_spec.name = 'missing'
    state.job_specs = {'test': test_spec, 'missing': missing_spec}

    # Only the 'test' job exists; 'missing' has no job behind it.
    test_job = Mock()
    test_job.list_contexts.return_value = ['test']
    state.jobs = {'test': test_job}
    state.statuses = {}

    yield from ext.run()

    assert ext.current.current_stage.name == 'test'
def test_no_test_stage():
    """Check that ``run`` selects 'build' when the configured stages are 'build' and 'deploy'."""
    from jenkins_epo.extensions.jenkins import StagesExtension

    ext = StagesExtension('stages', Mock())
    ext.current = state = Mock()
    state.head.ref = 'pr'
    state.SETTINGS.STAGES = ['build', 'deploy']

    # ``name`` is assigned afterwards: Mock(name=...) would name the mock itself.
    build_spec = Mock(config={})
    build_spec.name = 'build'
    state.job_specs = {'build': build_spec}

    build_job = Mock()
    build_job.list_contexts.return_value = ['build']
    state.jobs = {'build': build_job}
    state.statuses = {}

    yield from ext.run()

    assert 'build' == ext.current.current_stage.name
    assert 'build' in ext.current.job_specs
def test_branches_limit():
    """Check that a spec limited to branch 'master' is dropped from ``job_specs`` for head ref 'pr'."""
    from jenkins_epo.extensions.jenkins import StagesExtension

    ext = StagesExtension('stages', Mock())
    ext.current = state = Mock()
    state.head.ref = 'pr'
    state.SETTINGS.STAGES = ['test']

    # The only spec is restricted to 'master', which differs from the head ref.
    limited_spec = Mock(config={'branches': 'master'})
    limited_spec.name = 'job'
    state.job_specs = {'job': limited_spec}

    job = Mock()
    job.list_contexts.return_value = ['job']
    state.jobs = {'job': job}
    state.statuses = {}

    yield from ext.run()

    assert ext.current.current_stage.name == 'test'
    assert 'job' not in ext.current.job_specs
def test_comment_deny():
    """Check that ``run`` posts a comment when a denied 'opm' instruction is pending."""
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import MergerExtension

    ext = MergerExtension('merger', Mock())
    ext.current = state = Mock()
    state.SETTINGS.COLLABORATORS = []
    commit_date = datetime.now()
    state.last_commit.date = commit_date
    state.opm = {}
    # Denied instruction dated one hour after the last commit.
    denied = Instruction(
        author='noncollaborator', name='opm',
        date=commit_date + timedelta(hours=1),
    )
    state.opm_denied = [denied]

    yield from ext.run()

    assert ext.current.head.comment.mock_calls
def test_deny_outdated_opm():
    """Check that an 'opm' instruction dated before the last commit is not accepted."""
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import MergerExtension

    ext = MergerExtension('merger', Mock())
    ext.current = state = Mock()
    state.SETTINGS.COLLABORATORS = ['collaborator']
    commit_date = datetime.now()
    state.last_commit.date = commit_date
    state.opm = {}
    state.opm_denied = []

    # The instruction comes from a collaborator but is one hour older than the commit.
    outdated = Instruction(
        author='collaborator', name='opm',
        date=commit_date - timedelta(hours=1),
    )
    ext.process_instruction(outdated)

    assert not ext.current.opm
def test_deny_non_collaborator_processed():
    """Check that an 'opm-processed' instruction clears previously denied 'opm' instructions."""
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import MergerExtension

    ext = MergerExtension('merger', Mock())
    ext.current = state = Mock()
    state.SETTINGS.COLLABORATORS = []
    commit_date = datetime.now()
    state.last_commit.date = commit_date
    state.opm = None
    state.opm_denied = [Instruction(author='noncollaborator', name='opm')]

    # The bot's acknowledgement is dated two hours after the last commit.
    processed = Instruction(
        author='bot', name='opm-processed',
        date=commit_date + timedelta(hours=2),
    )
    ext.process_instruction(processed)

    assert not ext.current.opm
    assert not ext.current.opm_denied
def test_accept_lgtm():
    """Check that an 'opm' from a collaborator, dated after the last commit, is stored as ``current.opm``."""
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import MergerExtension

    ext = MergerExtension('merger', Mock())
    ext.current = state = Mock()
    state.SETTINGS.COLLABORATORS = ['collaborator']
    commit_date = datetime.now()
    state.last_commit.date = commit_date
    state.opm = None
    state.opm_denied = []

    approval = Instruction(
        author='collaborator', name='opm',
        date=commit_date + timedelta(hours=1),
    )
    ext.process_instruction(approval)

    assert 'collaborator' == ext.current.opm.author
    assert not ext.current.opm_denied
def test_merge_wip():
    """Check that an approved but WIP head gets a comment mentioning the approver and is not merged."""
    from jenkins_epo.extensions.core import MergerExtension

    ext = MergerExtension('merger', Mock())
    ext.current = state = Mock()
    state.SETTINGS.COLLABORATORS = ['collaborator']
    state.last_commit.date = datetime.now()
    state.opm.author = 'collaborator'
    state.opm.date = datetime.now()
    state.opm_denied = []
    state.opm_processed = None
    state.wip = True

    yield from ext.run()

    comment = ext.current.head.comment
    assert comment.mock_calls
    # call_args[1] holds the keyword arguments of the most recent call.
    body = comment.call_args[1]['body']
    assert '@collaborator' in body
    assert not ext.current.head.merge.mock_calls
def test_merge_wip_skip_outdated():
    """Check that a WIP head with an already processed 'opm' gets neither a comment nor a merge."""
    from jenkins_epo.extensions.core import MergerExtension

    ext = MergerExtension('merger', Mock())
    ext.current = state = Mock()
    state.SETTINGS.COLLABORATORS = ['collaborator']
    state.last_commit.date = datetime.now()
    state.opm.author = 'collaborator'
    opm_date = datetime.now()
    state.opm.date = opm_date
    state.opm_denied = []
    state.wip = True
    # The acknowledgement is dated five minutes after the approval itself.
    state.opm_processed = Mock(date=opm_date + timedelta(minutes=5))

    yield from ext.run()

    assert not ext.current.head.comment.mock_calls
    assert not ext.current.head.merge.mock_calls
def test_merge_fail():
    """Check that the branch is not deleted when merging raises an ``ApiError``."""
    from jenkins_epo.extensions.core import MergerExtension, ApiError

    ext = MergerExtension('merger', Mock())
    ext.current = state = Mock()
    state.SETTINGS.COLLABORATORS = ['collaborator']
    state.last_commit.date = datetime.now()
    state.opm.author = 'collaborator'
    state.opm_denied = []
    state.wip = None
    # The combined status is reported as green...
    state.last_commit.fetch_combined_status = CoroutineMock(
        return_value={'state': 'success'}
    )
    # ...but the merge call itself fails with an API error carrying code 405.
    merge_error = ApiError('url', {}, {'code': 405, 'json': {'message': "error"}})
    state.head.merge.side_effect = merge_error

    yield from ext.run()

    assert not ext.current.head.delete_branch.mock_calls
def test_merge_success():
    """Check that an approved head with a green status is merged, its branch deleted, with no comment."""
    from jenkins_epo.extensions.core import MergerExtension

    ext = MergerExtension('merger', Mock())
    ext.current = state = Mock()
    state.opm = Mock(author='author')
    state.opm_denied = []
    state.last_merge_error = None
    state.last_commit.fetch_combined_status = CoroutineMock(
        return_value={'state': 'success'}
    )
    state.wip = None

    yield from ext.run()

    head = ext.current.head
    assert head.merge.mock_calls
    assert not head.comment.mock_calls
    assert head.delete_branch.mock_calls
def test_process_allow():
    """Check that an 'allow' from a collaborator adds the head author and clears denied instructions."""
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import SecurityExtension

    ext = SecurityExtension('sec', Mock())
    state = ext.bot.current
    ext.current = state
    state.head.author = 'contributor'
    state.security_feedback_processed = None
    state.SETTINGS.COLLABORATORS = ['owner']
    # The contributor's earlier instruction is pending as denied.
    state.denied_instructions = [Instruction(author='contributor', name='skip')]

    allow = Instruction(author='owner', name='allow')
    ext.process_instruction(allow)

    assert 'contributor' in ext.current.SETTINGS.COLLABORATORS
    assert not ext.current.denied_instructions
def test_skip_outdated():
    """Check that an outdated build is neither queued for cancel nor asked whether it is running."""
    from jenkins_epo.extensions.jenkins import PollExtension

    ext = PollExtension('test', Mock())
    state = ext.bot.current
    ext.current = state
    state.head.sha = 'cafed0d0'
    state.cancel_queue = []

    # ``name`` is assigned afterwards: Mock(name=...) would name the mock itself.
    spec = Mock()
    spec.name = 'job'
    state.job_specs = {'job': spec}

    build = Mock()
    build.is_outdated = True
    job = Mock()
    job.fetch_builds = CoroutineMock()
    job.process_builds.return_value = [build]
    state.jobs = {'job': job}

    yield from ext.run()

    assert not build.is_running.mock_calls
    assert len(ext.current.cancel_queue) == 0
def test_skip_build_not_running():
    """Check that a build which is not running is not queued for cancel."""
    from jenkins_epo.extensions.jenkins import PollExtension

    ext = PollExtension('test', Mock())
    state = ext.bot.current
    ext.current = state
    state.head.sha = 'cafed0d0'
    state.cancel_queue = []

    # ``name`` is assigned afterwards: Mock(name=...) would name the mock itself.
    spec = Mock()
    spec.name = 'job'
    state.job_specs = {'job': spec}

    build = Mock()
    build.is_outdated = False
    build.is_running = False
    job = Mock()
    job.fetch_builds = CoroutineMock()
    job.process_builds.return_value = [build]
    state.jobs = {'job': job}

    yield from ext.run()

    assert len(ext.current.cancel_queue) == 0
def test_skip_current_sha(mocker):
    """Check that a running build for the current head sha is not queued for cancel."""
    from jenkins_epo.extensions.jenkins import PollExtension

    ext = PollExtension('test', Mock())
    state = ext.bot.current
    ext.current = state
    state.cancel_queue = []
    state.head.ref = 'branch'
    state.head.sha = 'bab1'

    # ``name`` is assigned afterwards: Mock(name=...) would name the mock itself.
    spec = Mock()
    spec.name = 'job'
    state.job_specs = {'job': spec}

    # The build runs on the same ref and the same sha as the head.
    build = Mock()
    build.is_outdated = False
    build.is_running = True
    build.ref = 'branch'
    build.sha = state.head.sha

    job = Mock()
    job.list_contexts.return_value = []
    job.fetch_builds = CoroutineMock()
    job.process_builds.return_value = [build]
    state.jobs = {'job': job}

    yield from ext.run()

    assert len(ext.current.cancel_queue) == 0
def test_cancel(mocker):
    """Check that a running build for another sha on the head's ref is queued for cancel."""
    from jenkins_epo.extensions.jenkins import PollExtension

    ext = PollExtension('test', Mock())
    state = ext.bot.current
    ext.current = state
    state.cancel_queue = []
    state.head.ref = 'branch'
    state.head.sha = 'bab1'

    # ``name`` is assigned afterwards: Mock(name=...) would name the mock itself.
    spec = Mock()
    spec.name = 'job'
    state.job_specs = {'job': spec}

    # Same ref as the head, but a different sha.
    build = Mock()
    build.is_outdated = False
    build.is_running = True
    build.ref = 'branch'
    build.sha = '01d'
    build.url = 'url://'
    build.commit_status = {}

    job = Mock()
    job.fetch_builds = CoroutineMock()
    job.process_builds.return_value = [build]
    state.jobs = {'job': job}

    yield from ext.run()

    assert len(ext.current.cancel_queue) == 1
def test_cancel_ignore_other(mocker):
    """Check that a queued status whose build is not on Jenkins leaves the commit status untouched."""
    build_cls = mocker.patch('jenkins_epo.extensions.jenkins.Build')
    from jenkins_epo.extensions.jenkins import (
        CancellerExtension, CommitStatus, NotOnJenkins
    )

    # Resolving any build URL reports that the build is not on Jenkins.
    build_cls.from_url = CoroutineMock(side_effect=NotOnJenkins())

    commit = Mock()
    ext = CancellerExtension('test', Mock())
    state = ext.bot.current
    ext.current = state
    state.head.sha = 'cafed0d0'
    state.poll_queue = []
    foreign_status = CommitStatus(
        context='ci/...', target_url='circleci://1', state='pending',
    )
    state.cancel_queue = [(commit, foreign_status)]
    state.last_commit.fetch_statuses.return_value = []
    state.last_commit.maybe_update_status = CoroutineMock()

    yield from ext.run()

    assert not commit.maybe_update_status.mock_calls
def test_poll_lost_build(mocker):
    """Check that a queued Jenkins status is resolved via ``Build.from_url`` and the commit status updated."""
    jenkins = mocker.patch('jenkins_epo.extensions.jenkins.JENKINS')
    build_cls = mocker.patch('jenkins_epo.extensions.jenkins.Build')
    from jenkins_epo.extensions.jenkins import CancellerExtension, CommitStatus

    commit = Mock()
    commit.maybe_update_status = CoroutineMock()

    ext = CancellerExtension('test', Mock())
    state = ext.bot.current
    ext.current = state
    state.head.sha = 'cafed0d0'
    state.poll_queue = []
    # The queued status points at a URL under the Jenkins base URL set below.
    queued_status = CommitStatus(context='job', target_url='jenkins://job/1')
    state.cancel_queue = [(commit, queued_status)]
    state.last_commit.fetch_statuses.return_value = []

    jenkins.baseurl = 'jenkins://'
    build_cls.from_url = CoroutineMock()

    yield from ext.run()

    assert build_cls.from_url.mock_calls
    assert commit.maybe_update_status.mock_calls
def test_get_network_data(self, time_mock, sleep_mock):
    """Check that ``get_network_data`` returns the differences between two counter samples."""
    time_mock.side_effect = [1, 2]

    Counter = namedtuple('Counter', ['bytes_sent', 'bytes_recv',
                                     'packets_sent', 'packets_recv'])
    # Two consecutive samples for the same interface.
    samples = [
        {'eth0': Counter(bytes_sent=54000, bytes_recv=12000,
                         packets_sent=50, packets_recv=100)},
        {'eth0': Counter(bytes_sent=108000, bytes_recv=36000,
                         packets_sent=75, packets_recv=150)},
    ]
    # Each call to the fake counter function hands out the next sample.
    fake_counters = mock.Mock()
    fake_counters.side_effect = samples
    self.network.psutil.net_io_counters = fake_counters

    result = self.network.get_network_data(interface='eth0', delay=1)
    kb_ul, kb_dl, p_ul, p_dl = result

    self.assertEqual(kb_ul, 54000)
    self.assertEqual(kb_dl, 24000)
    self.assertEqual(p_ul, 25)
    self.assertEqual(p_dl, 50)
def test_xsubscribe(service_a1, service_d1, registry):
    """Check that after ``_xsubscribe`` the subscriber lookup for ``service_a1`` returns ``service_d1``."""
    # Register both services, each with its own fake registry protocol.
    for service in (service_a1, service_d1):
        registry.register_service(
            packet={'params': service}, registry_protocol=mock.Mock())

    registry._xsubscribe(packet={'params': service_d1})

    # Ask for the subscribers of service_a1 on service_d1's first event endpoint.
    protocol = mock.Mock()
    params = {
        'name': service_a1['name'],
        'version': service_a1['version'],
        'endpoint': service_d1['events'][0]['endpoint'],
    }
    request = {'params': params, 'request_id': str(uuid.uuid4())}
    registry.get_subscribers(packet=request, protocol=protocol)

    # First positional argument of the first ``send`` call.
    first_call = protocol.send.call_args_list[0]
    sent_packet = first_call[0][0]
    assert subscriber_returned_successfully(sent_packet, service_d1)
def test_non_post_entity_types_are_skipped(self, mock_logger):
    """Check that a retraction with entity type "foo" is only logged as ignored."""
    retraction = Mock(entity_type="foo")
    profile = Mock()

    process_entity_retraction(retraction, profile)

    mock_logger.assert_called_with("Ignoring retraction of entity_type %s", "foo")
def test_does_nothing_if_content_doesnt_exist(self, mock_logger):
    """Check that retracting the target guid "bar" is logged as not found."""
    retraction = Mock(entity_type="Post", target_guid="bar")
    profile = Mock()

    process_entity_retraction(retraction, profile)

    mock_logger.assert_called_with("Retracted remote content %s cannot be found", "bar")
def test_does_nothing_if_content_is_not_local(self, mock_logger):
    """Check that retracting ``self.local_content`` is logged as "cannot be found"."""
    target_guid = self.local_content.guid
    retraction = Mock(entity_type="Post", target_guid=target_guid)

    process_entity_retraction(retraction, Mock())

    mock_logger.assert_called_with(
        "Retracted remote content %s cannot be found", self.local_content.guid,
    )
def test_does_nothing_if_content_author_is_not_same_as_remote_profile(self, mock_logger):
    """Check that a retraction sent with an unrelated profile is logged as not owned by it."""
    # A bare mock stands in for a profile that is not the content's author.
    remote_profile = Mock()
    retraction = Mock(entity_type="Post", target_guid=self.content.guid)

    process_entity_retraction(retraction, remote_profile)

    mock_logger.assert_called_with(
        "Content %s is not owned by remote retraction profile %s",
        self.content,
        remote_profile,
    )
def test_deletes_content(self):
    """Check that a retraction sent with the content's own author removes the content."""
    author = self.content.author
    retraction = Mock(
        entity_type="Post",
        target_guid=self.content.guid,
        handle=author.handle,
    )

    process_entity_retraction(retraction, author)

    # Looking the content up again by id must now fail.
    with self.assertRaises(Content.DoesNotExist):
        Content.objects.get(id=self.content.id)
def test_returns_none_on_exception(self, mock_post):
    """Check that ``make_federable_retraction`` returns None here.

    ``mock_post`` is an injected mock; presumably its patch makes the
    conversion raise -- confirm against the decorator in the original module.
    """
    content = Mock()
    author = Mock()

    result = make_federable_retraction(content, author)

    self.assertIsNone(result)
def test_remote_profile_public_key_is_returned(self, mock_from_remote, mock_retrieve):
    """Check that ``sender_key_fetcher`` returns the retrieved remote profile's ``rsa_public_key``."""
    remote_profile = Mock(rsa_public_key="foo")
    # Both the retrieval and the conversion step hand back the same profile.
    mock_retrieve.return_value = remote_profile
    mock_from_remote.return_value = remote_profile

    public_key = sender_key_fetcher("bar")

    self.assertEqual(public_key, "foo")
    mock_retrieve.assert_called_once_with("bar")
    mock_from_remote.assert_called_once_with(remote_profile)
def test_returns_legacy_xml_payload(self):
    """Check that the payload is the POST field "xml" when the request carries both it and a body."""
    request = Mock(body="foobar", POST={"xml": "barfoo"})

    payload = DiasporaReceiveViewMixin.get_payload_from_request(request)

    self.assertEqual(payload, "barfoo")