我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 mock.patch.object()。
def test_verify_user(self):
    """verify_user succeeds for a user listed as unverified in the group.

    After the call the group's config is expected to hold the user under
    ``users`` with both id and username.
    """
    manager = self.__get_dummy_object()
    self.__set_config(manager, {"foogroup": {"unverified": ["@foouser"]}})

    # Both private config hooks are replaced by mocks for the whole check.
    with patch.object(manager, "_UserManager__load_config"):
        with patch.object(manager, "_UserManager__save_config"):
            verified = manager.verify_user(1337, "@foouser", "foogroup")
            self.assertTrue(verified)

            self.assertEqual(
                manager.config,
                {"foogroup": {"users": [{"id": 1337, "username": "@foouser"}]}},
            )
def test_add_user_already_in_grp(self):
    """add_user reports failure when the user is already listed in the group."""
    manager = self.__get_dummy_object()
    self.__set_config(manager, {"foogroup": {"unverified": ["@foouser"]}})

    # Both private config hooks are replaced by mocks during the call.
    with patch.object(manager, "_UserManager__load_config"):
        with patch.object(manager, "_UserManager__save_config"):
            added = manager.add_user("@foouser", "foogroup")
            self.assertFalse(added)
def test_add_user_verified(self):
    """add_user with both username and user_id stores the user under ``users``."""
    manager = self.__get_dummy_object()
    self.__set_config(manager, {})

    # Both private config hooks are replaced by mocks for the whole check.
    with patch.object(manager, "_UserManager__load_config"):
        with patch.object(manager, "_UserManager__save_config"):
            added = manager.add_user("@foouser", "foogroup", user_id=1337)
            self.assertTrue(added)

            self.assertEqual(
                manager.config,
                {"foogroup": {"users": [{"id": 1337, "username": "@foouser"}]}},
            )
def make_test_handler(testcase, *args, **kwargs):
    """
    Create a TestHandler and register its ``close`` as a cleanup on a test case.

    Parameters
    ----------
    testcase: unittest.TestCase
        The test case that will use the handler; ``handler.close`` is added
        to it via ``addCleanup``.
    *args, **kwargs
        Passed through unchanged to the TestHandler constructor.

    Returns
    -------
    handler: logbook.TestHandler
        The newly created handler.
    """
    test_handler = TestHandler(*args, **kwargs)
    testcase.addCleanup(test_handler.close)
    return test_handler
def setUp(self):
    """Patch the wagtail core urlpatterns and reload the sharing URLs.

    ``wagtail.wagtailcore.urls.urlpatterns`` is replaced with three stub
    patterns (the patch is stopped through ``addCleanup``), then
    ``wagtailsharing.urls`` is reloaded and its ``urlpatterns`` are stored
    on ``self.urlpatterns``.
    """
    def test_view():
        pass  # pragma: no cover

    # Route every stub pattern to ``test_view``. Previously the ``url``
    # helper itself was passed as the view and ``test_view`` was unused.
    root_patterns = [
        url(r'^foo/$', test_view, name='foo'),
        url(r'^((?:[\w\-]+/)*)$', test_view, name='wagtail_serve'),
        url(r'^bar/$', test_view, name='bar'),
    ]

    self.patcher = patch.object(
        wagtail.wagtailcore.urls,
        'urlpatterns',
        root_patterns
    )
    self.patcher.start()
    self.addCleanup(self.patcher.stop)

    # Reload so the module is re-evaluated while the patch is active.
    reload(wagtailsharing.urls)
    self.urlpatterns = wagtailsharing.urls.urlpatterns
def test_replace_loggroup_section(mocked_configparser_class):
    """replace_loggroup_section drops the log-groups section and adds CLI favorites.

    ``api_utils.CONFIG.items`` is patched to return a single log-group entry
    while the function under test runs.
    """
    config_dir = api_utils.user_config_dir(cli.lecli.__name__)
    config_missing = not os.path.exists(api_utils.CONFIG_FILE_PATH)
    if config_missing and not os.path.exists(config_dir):
        os.makedirs(config_dir)

    section_name = api_utils.LOGGROUPS_SECTION
    parser_mock = mocked_configparser_class.return_value
    parser_mock.add_section(section_name)

    fake_items = [('test-log-group-favs', ["test-log-key1", "test-log-key2"])]
    with patch.object(api_utils.CONFIG, 'items', return_value=fake_items):
        api_utils.replace_loggroup_section()
        assert not api_utils.CONFIG.has_section(section_name)
        assert parser_mock.has_section(api_utils.CLI_FAVORITES_SECTION)

    # Best-effort cleanup: OSError from either call is ignored on purpose.
    try:
        os.remove(api_utils.CONFIG_FILE_PATH)
        os.rmdir(config_dir)
    except OSError:
        pass
def test_valid_server_pool_add_slot(query_mock, add_mo_mock, commit_mock):
    """server_pool_add_slot with default parameters returns a matching ComputePooledSlot."""
    # Canned return values for the three injected mocks.
    commit_mock.return_value = True
    add_mo_mock.return_value = True
    query_mock.return_value = ComputePool(parent_mo_or_dn="org-root",
                                          name="test-pool")

    handle = UcsHandle('169.254.1.1', 'admin', 'password')

    # Scenario: Default parameters
    pool_retval = server_pool_add_slot(handle, 1, 8)

    # The returned object must be of the pooled-slot type ...
    assert isinstance(pool_retval, ComputePooledSlot)
    # ... and carry the ids we passed in, as strings.
    assert pool_retval.chassis_id == str(1)
    assert pool_retval.slot_id == str(8)
# Patch UcsHandle.commit to simulate ucsm interaction w/o real ucsm
def test_non_backoff_exception(
    self, container_factory, rabbit_config, rpc_proxy
):
    """An exception that is not a backoff surfaces to the caller as a RemoteError."""

    class Boom(Exception):
        pass

    class Service(object):
        name = "service"

        @rpc
        def method(self):
            raise Boom()

    service_container = container_factory(Service, rabbit_config)
    service_container.start()

    with pytest.raises(RemoteError) as raised:
        rpc_proxy.service.method()

    # The remote error carries the name of the original exception type.
    assert raised.value.exc_type == "Boom"
def container(self, container_factory):
    """Build (without starting) a container whose RPC methods all raise Backoff.

    The three entrypoints differ only in their ``expected_exceptions``:
    none, ``self.UserException`` only, or ``self.UserException`` plus
    ``Backoff``.
    """

    class Service(object):
        name = "service"

        @rpc
        def nothing_expected(self):
            raise Backoff()

        @rpc(expected_exceptions=self.UserException)
        def something_expected(self):
            raise Backoff()

        @rpc(expected_exceptions=(self.UserException, Backoff))
        def backoff_expected(self):
            raise Backoff()

    return container_factory(Service, {'AMQP_URI': 'memory://localhost'})
def entrypoint_tracker():
    """Return a new, empty CallTracker.

    The tracker stores one dict per ``track(**call)`` invocation in
    ``calls``; its length is the number of recorded calls.
    """

    class CallTracker(object):
        """Records keyword-argument dicts and exposes selected fields."""

        def __init__(self):
            self.calls = []

        def __len__(self):
            return len(self.calls)

        def track(self, **call):
            # Each call is stored as the dict of keyword arguments given.
            self.calls.append(call)

        def get_results(self):
            # 'result' of every recorded call, in recording order.
            results = []
            for record in self.calls:
                results.append(record['result'])
            return results

        def get_exceptions(self):
            # 'exc_info' of every recorded call, in recording order.
            exceptions = []
            for record in self.calls:
                exceptions.append(record['exc_info'])
            return exceptions

    return CallTracker()
def container(self, container_factory, rabbit_config, counter):
    """Start a container whose ``slow`` and ``quick`` methods back off on first use.

    Each method raises its backoff while the corresponding counter's
    ``increment()`` result is at most 1, and returns its own name otherwise.
    """

    class Service(object):
        name = "service"

        backoff = BackoffPublisher()

        @rpc
        def slow(self):
            attempt = counter["slow"].increment()
            if attempt <= 1:
                raise SlowBackoff()
            return "slow"

        @rpc
        def quick(self):
            attempt = counter["quick"].increment()
            if attempt <= 1:
                raise QuickBackoff()
            return "quick"

    started = container_factory(Service, rabbit_config)
    started.start()
    return started
def container(self, container_factory, rabbit_config, counter):
    """Start a container whose ``bad`` method raises a Backoff with schedule ``(-10, )``."""

    class Service(object):
        name = "service"

        backoff = BackoffPublisher()

        @rpc
        def bad(self):
            # Backoff subclass declared inline with a negative schedule entry.
            class BadBackoff(Backoff):
                schedule = (-10, )

            raise BadBackoff()

    started = container_factory(Service, rabbit_config)
    started.start()
    return started
def test_noop_model_direct_update(self):
    """update() with no effective change must not call ``session.execute``."""
    model = TestUpdateModel.create(count=5, text='monkey')

    # No arguments at all.
    with patch.object(self.session, 'execute') as mock_execute:
        model.update()
    assert mock_execute.call_count == 0

    # Same value as already stored.
    with patch.object(self.session, 'execute') as mock_execute:
        model.update(count=5)
    assert mock_execute.call_count == 0

    # NOTE(review): the two key-column cases below only check that update()
    # does not raise; no call-count assertion is made — confirm intent.
    with patch.object(self.session, 'execute') as mock_execute:
        model.update(partition=model.partition)

    with patch.object(self.session, 'execute') as mock_execute:
        model.update(cluster=model.cluster)
def test_noop_model_assignation_update(self):
    """Re-assigning a field's current value and saving must not call ``session.execute``."""
    # Create the row, then fetch it back so the instance under test does not
    # benefit from any hidden variable cache effect.
    created = TestUpdateModel.create(count=5, text='monkey')
    fetched = TestUpdateModel.get(partition=created.partition,
                                  cluster=created.cluster)

    # Plain save with nothing assigned.
    with patch.object(self.session, 'execute') as mock_execute:
        fetched.save()
    assert mock_execute.call_count == 0

    # Regular column set to its existing value.
    with patch.object(self.session, 'execute') as mock_execute:
        fetched.count = 5
        fetched.save()
    assert mock_execute.call_count == 0

    # ``partition`` set to its existing value.
    with patch.object(self.session, 'execute') as mock_execute:
        fetched.partition = created.partition
        fetched.save()
    assert mock_execute.call_count == 0

    # ``cluster`` set to its existing value.
    with patch.object(self.session, 'execute') as mock_execute:
        fetched.cluster = created.cluster
        fetched.save()
    assert mock_execute.call_count == 0
def test_only_set_values_is_updated(self):
    """
    Saving an instance after its row was deleted writes only the assigned fields

    @since 3.9
    @jira_ticket PYTHON-657
    @expected_result the non updated column is None and the
    updated column has the set value

    @test_category object_mapper
    """
    ModelWithDefault.create(id=1, mf={1: 1}, dummy=1).save()
    stale = ModelWithDefault.filter(id=1).first()
    ModelWithDefault.objects(id=1).delete()

    # Assign only some of the columns on the now-stale instance.
    stale.mf = {1: 2}
    udt = UDT(age=1, mf={2: 3})
    udt_default = UDT(age=1, mf={2: 3})
    stale.udt = udt
    stale.udt_default = udt_default
    stale.save()

    stored = ModelWithDefault.get()._as_dict()
    self.assertEqual(
        stored,
        {'id': 1, 'dummy': None, 'mf': {1: 2},
         "udt": udt, "udt_default": udt_default},
    )
def test_speculative_and_timeout(self):
    """
    Test to ensure the timeout is honored when using speculative execution

    @since 3.10
    @jira_ticket PYTHON-750
    @expected_result speculative retries are scheduled at a fixed period,
    for at most the duration of the timeout.

    @test_category metadata
    """
    # ``Connection.send_msg`` is mocked so no messages are sent; otherwise a
    # response might arrive and we would not know how many hosts we queried.
    with patch.object(Connection, "send_msg", return_value=100):
        query = SimpleStatement(
            "INSERT INTO test3rf.test (k, v) VALUES (0, 1);",
            is_idempotent=True,
        )

        # An OperationTimedOut is placed in the response future, so calling
        # session.execute would raise it; wait on the event directly instead.
        future = self.session.execute_async(
            query, execution_profile='spec_ep_brr_lim', timeout=2.2)
        future._event.wait(4)
        self.assertIsInstance(future._final_exception, OperationTimedOut)

        # This is because 2.2 / 0.4 + 1 = 6
        self.assertEqual(len(future.attempted_hosts), 6)
def test_get_genesis_block_whenCalled_thenCreatesAndReturnsBlockWithGenesisTransactions(self):
    """get_genesis_block constructs a Block from the two genesis transactions."""
    genesis_transactions = [
        Transaction(
            "0",
            "03dd1e57d05d9cab1d8d9b727568ad951ac2d9ecd082bc36f69e021b8427812924",
            500000,
            0
        ),
        Transaction(
            "0",
            "03dd1e3defd36c8c0c7282ca1a324851efdb15f742cac0c5b258ef7b290ece9e5d",
            500000,
            0
        ),
    ]

    # ``Blockchain.__init__`` is replaced so construction does nothing.
    with patch.object(Blockchain, '__init__', return_value=None):
        with patch('crankycoin.blockchain.Block') as patched_Block:
            patched_Block._calculate_block_hash.return_value = "mock_block_hash"

            subject = Blockchain()
            subject.get_genesis_block()

            patched_Block.assert_called_once_with(0, genesis_transactions, 0, 0, 0)
def test_check_hash_and_hash_pattern_whenBlockHasValidHashAndPattern_thenReturnsTrue(self):
    """_check_hash_and_hash_pattern returns None for the prepared mock block.

    ``calculate_hash_difficulty`` is patched to return 4.
    """
    transaction = Mock(Transaction)
    transaction.configure_mock(
        source="from",
        timestamp=1498923800,
        destination="to",
        amount=50,
        signature="signature",
        tx_hash="transaction_hash",
    )
    mock_block = Mock(Block)

    with patch.object(Blockchain, '__init__', return_value=None):
        with patch.object(Blockchain, 'calculate_hash_difficulty', return_value=4):
            mock_block.configure_mock(
                current_hash="0000_valid_block_hash",
                index=35,
                previous_hash="0000_valid_previous_hash",
                transactions=[transaction],
                nonce=37,
                timestamp=12341234,
            )
            subject = Blockchain()

            outcome = subject._check_hash_and_hash_pattern(mock_block)

            self.assertIsNone(outcome)
def test_check_hash_and_hash_pattern_whenBlockHasInvalidHash_thenReturnsFalse(self):
    """_check_hash_and_hash_pattern raises InvalidHash mentioning "Block Hash Mismatch"."""
    transaction = Mock(Transaction)
    transaction.configure_mock(
        source="from",
        timestamp=1498923800,
        destination="to",
        amount=50,
        signature="signature",
        tx_hash="transaction_hash",
    )
    mock_block = Mock(Block)

    with patch.object(Blockchain, '__init__', return_value=None):
        mock_block.configure_mock(
            current_hash="0000_valid_block_hash",
            index=35,
            previous_hash="0000_valid_previous_hash",
            transactions=[transaction],
            nonce=37,
            timestamp=12341234,
        )
        subject = Blockchain()

        with self.assertRaises(InvalidHash) as context:
            subject._check_hash_and_hash_pattern(mock_block)
        # Checked after the raising block so the assertion actually runs.
        self.assertTrue("Block Hash Mismatch" in str(context.exception))
def test_check_hash_and_hash_pattern_whenBlockHasInvalidPattern_thenReturnsFalse(self):
    """_check_hash_and_hash_pattern raises InvalidHash mentioning "Incompatible Block Hash".

    ``calculate_hash_difficulty`` is patched to return 4.
    """
    transaction = Mock(Transaction)
    transaction.configure_mock(
        source="from",
        timestamp=1498923800,
        destination="to",
        amount=50,
        signature="signature",
        tx_hash="transaction_hash",
    )
    mock_block = Mock(Block)

    with patch.object(Blockchain, '__init__', return_value=None):
        with patch.object(Blockchain, 'calculate_hash_difficulty', return_value=4):
            mock_block.configure_mock(
                current_hash="invalid_block_hash",
                index=35,
                previous_hash="0000_valid_previous_hash",
                transactions=[transaction],
                nonce=37,
                timestamp=12341234,
            )
            subject = Blockchain()

            with self.assertRaises(InvalidHash) as context:
                subject._check_hash_and_hash_pattern(mock_block)
            # Checked after the raising block so the assertion actually runs.
            self.assertTrue("Incompatible Block Hash" in str(context.exception))
def test_mine_block_whenOneTransaction_andIncorrectTransactionHash_thenReturnsNone(self):
    """mine_block returns None when the only queued transaction has a bad hash.

    The unconfirmed-transaction queue is simulated with a side_effect list of
    one transaction followed by None, and is expected to be popped twice.
    """
    transaction = Mock(Transaction)
    transaction.configure_mock(
        source="from",
        timestamp=1498923800,
        destination="to",
        amount=25,
        fee=0.1,
        signature="signature",
        tx_hash="incorrect_transaction_hash",
    )
    latest_block = Mock(Block)
    latest_block.configure_mock(index=35, current_hash="latest_block_hash")

    with patch.object(Blockchain, '__init__', return_value=None):
        with patch.object(Blockchain, 'get_latest_block', return_value=latest_block):
            with patch.object(Blockchain, 'pop_next_unconfirmed_transaction',
                              side_effect=[transaction, None]) as patched_pop:
                subject = Blockchain()

                mined = subject.mine_block("reward_address")

                self.assertIsNone(mined)
                self.assertEqual(patched_pop.call_count, 2)
def test_mine_block_whenOneDuplicateTransaction_thenReturnsNone(self):
    """mine_block returns None when the only queued transaction is a duplicate.

    ``find_duplicate_transactions`` is patched to return a block id, and the
    unconfirmed-transaction queue yields one transaction followed by None.
    """
    transaction = Mock(Transaction)
    transaction.source = "from"
    transaction.timestamp = 1498923800
    transaction.destination = "to"
    transaction.amount = 25
    transaction.fee = 0.1
    transaction.signature = "signature"
    transaction.tx_hash = "transaction_hash"
    block_id_with_same_transaction = 38
    latest_block = Mock(Block)
    latest_block.index = 35
    latest_block.current_hash = "latest_block_hash"

    with patch.object(Blockchain, '__init__', return_value=None) as patched_init, \
            patch.object(Blockchain, 'get_latest_block', return_value=latest_block) as patched_get_latest_block, \
            patch.object(Blockchain, 'pop_next_unconfirmed_transaction', side_effect=[transaction, None]) as patched_pop_next_unconfirmed_transaction, \
            patch.object(Blockchain, 'find_duplicate_transactions', return_value=block_id_with_same_transaction) as patched_find_duplicate_transactions:
        subject = Blockchain()

        resp = subject.mine_block("reward_address")

        self.assertIsNone(resp)
        self.assertEqual(patched_pop_next_unconfirmed_transaction.call_count, 2)
        # Previously spelled ``asssert_called_once_with``: on a mock that is
        # just an auto-created attribute, so nothing was being verified.
        # NOTE(review): if this now fails, mine_block is not reaching the
        # duplicate check for this mock transaction — confirm against it.
        patched_find_duplicate_transactions.assert_called_once_with("transaction_hash")
def test_push_unconfirmed_transaction_thenPushesTransactionAndReturnsNone(self):
    """push_unconfirmed_transaction returns a truthy value and enqueues one item."""
    transaction_one = dict([
        ('from', 'from'),
        ('timestamp', 1498923800),
        ('to', 'to'),
        ('amount', 1),
        ('signature', 'signature_one'),
        ('hash', "transaction_hash_one"),
    ])

    # ``Blockchain.__init__`` is replaced, so the queue is attached by hand.
    with patch.object(Blockchain, '__init__', return_value=None):
        subject = Blockchain()
        subject.unconfirmed_transactions = Queue()

        pushed = subject.push_unconfirmed_transaction(transaction_one)

        self.assertTrue(pushed)
        self.assertEqual(subject.unconfirmed_transactions.qsize(), 1)
def test_broadcast_transaction_thenBroadcastsToAllNodes(self):
    """broadcast_transaction posts the serialized transaction to every known node."""
    # The same JSON payload is expected for each node.
    payload = {'transaction': '{"amount": 0, "destination": "destination", "fee": 0, "signature": null, "source": "source", "timestamp": 1508823223, "tx_hash": null}'}
    node_urls = (
        "http://127.0.0.1:30013/transactions",
        "http://127.0.0.2:30013/transactions",
        "http://127.0.0.3:30013/transactions",
    )

    with patch.object(FullNode, '__init__', return_value=None):
        with patch.object(FullNode, 'request_nodes_from_all') as patched_request_nodes_from_all:
            with patch("crankycoin.time.time", return_value="1508823223"):
                with patch("crankycoin.requests.post") as patched_requests:
                    transaction = Transaction("source", "destination", 0, 0)
                    node = FullNode("127.0.0.1", "reward_address")
                    node.full_nodes = {"127.0.0.1", "127.0.0.2", "127.0.0.3"}

                    node.broadcast_transaction(transaction)

                    patched_request_nodes_from_all.assert_called_once()
                    # Second argument is any_order: nodes live in a set.
                    patched_requests.assert_has_calls(
                        [call(node_url, json=payload) for node_url in node_urls],
                        True,
                    )
def test_broadcast_transaction_whenRequestException_thenFailsGracefully(self):
    """broadcast_transaction still attempts every node when each post raises RequestException."""
    # The same JSON payload is expected for each node.
    payload = {'transaction': '{"amount": 0, "destination": "destination", "fee": 0, "signature": null, "source": "source", "timestamp": 1508823223, "tx_hash": null}'}
    node_urls = (
        "http://127.0.0.1:30013/transactions",
        "http://127.0.0.2:30013/transactions",
        "http://127.0.0.3:30013/transactions",
    )

    with patch.object(FullNode, '__init__', return_value=None):
        with patch.object(FullNode, 'request_nodes_from_all') as patched_request_nodes_from_all:
            with patch("crankycoin.time.time", return_value="1508823223"):
                with patch("crankycoin.requests.post",
                           side_effect=requests.exceptions.RequestException()) as patched_requests:
                    transaction = Transaction("source", "destination", 0, 0)
                    node = FullNode("127.0.0.1", "reward_address")
                    node.full_nodes = {"127.0.0.1", "127.0.0.2", "127.0.0.3"}

                    node.broadcast_transaction(transaction)

                    patched_request_nodes_from_all.assert_called_once()
                    # Second argument is any_order: nodes live in a set.
                    patched_requests.assert_has_calls(
                        [call(node_url, json=payload) for node_url in node_urls],
                        True,
                    )
def test_request_block_whenIndexIsNumeric_thenRequestsCorrectBlockFromNode(self):
    """request_block fetches ``/block/<index>`` and builds a block from the JSON reply."""
    mock_response = Mock()
    mock_response.configure_mock(status_code=200)
    mock_response.json.return_value = '{"nonce": 12345, "index": 29, "transactions": [], "timestamp": 1234567890, "current_hash": "current_hash", "previous_hash": "previous_hash"}'

    with patch.object(FullNode, '__init__', return_value=None):
        with patch("crankycoin.node.Block.current_hash",
                   new_callable=PropertyMock) as patched_block_current_hash:
            with patch("crankycoin.requests.get",
                       return_value=mock_response) as patched_requests:
                patched_block_current_hash.return_value = "current_hash"
                node = FullNode("127.0.0.1", "reward_address")

                block = node.request_block("127.0.0.2", "30013", 29)

                self.assertIsNotNone(block)
                # Every field of the reply must be reflected on the block.
                self.assertEqual(block.index, 29)
                self.assertEqual(block.transactions, [])
                self.assertEqual(block.previous_hash, "previous_hash")
                self.assertEqual(block.current_hash, "current_hash")
                self.assertEqual(block.timestamp, 1234567890)
                self.assertEqual(block.nonce, 12345)
                patched_requests.assert_called_once_with('http://127.0.0.2:30013/block/29')
def test__enter__(slot):
    """__enter__ returns the slot, opens two connections and conditionally starts keepalive.

    Without a keepalive window ``_send_keepalive`` must not be called; with
    ``_keepalive_window`` set to 1 it must be.
    """
    # returns its self
    # ``side_effect`` (singular) is the mock keyword that supplies one return
    # value per call. The previous ``side_effects=`` only set an unused
    # attribute, so both calls returned the same auto-created mock.
    with patch.object(slot, '_get_connection', side_effect=[Mock(), Mock()]) as mock_gc, \
            patch.object(slot, '_send_keepalive') as mock_ka:
        assert slot == slot.__enter__(), 'Returns itself'
        assert mock_gc.call_count == 2

        assert call.set_isolation_level(0) in slot._normal_conn.method_calls, 'make sure we are in autocommit'
        assert call.cursor() in slot._repl_conn.method_calls, 'we opened a cursor'

        assert not mock_ka.called, "with no window we didn't start keep alive"

    slot._keepalive_window = 1
    with patch.object(slot, '_get_connection', side_effect=[Mock(), Mock()]) as mock_gc, \
            patch.object(slot, '_send_keepalive') as mock_ka:
        slot.__enter__()
        assert mock_ka.called, " we started up keepalive"
def test_delete_slot(slot):
    """delete_slot drops the 'pg2kinesis' slot and propagates unexpected errors.

    Three scenarios, each driven through ``drop_replication_slot``:
    a ProgrammingError whose ``pgcode`` is UNDEFINED_OBJECT (no exception
    expected from ``delete_slot``), a ProgrammingError with ``pgcode`` -1
    (expected to propagate), and a plain Exception (expected to propagate).
    """
    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                      new_callable=PropertyMock,
                      return_value=psycopg2.errorcodes.UNDEFINED_OBJECT):
        pe = psycopg2.ProgrammingError()
        slot._repl_cursor.drop_replication_slot = Mock(side_effect=pe)
        slot.delete_slot()
        slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')

    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                      new_callable=PropertyMock,
                      return_value=-1):
        pe = psycopg2.ProgrammingError()
        # Configure the drop call, which is what the assertions below check.
        # The original set ``create_replication_slot`` here and only passed
        # because the mock from the first scenario was still installed.
        slot._repl_cursor.drop_replication_slot = Mock(side_effect=pe)
        with pytest.raises(psycopg2.ProgrammingError) as e_info:
            slot.delete_slot()
        slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')

        # Read while the property patch is still active.
        assert e_info.value.pgcode == -1

    slot._repl_cursor.drop_replication_slot = Mock(side_effect=Exception)
    with pytest.raises(Exception):
        slot.delete_slot()
    slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')
def test__init__():
    """StreamWriter waits for the stream on ResourceInUseException and re-raises other ClientErrors."""
    mock_client = Mock()

    def failing_create_stream(code):
        # A create_stream stand-in that raises ClientError with the given code.
        error_response = {'Error': {'Code': code}}
        return Mock(side_effect=ClientError(error_response, 'create_stream'))

    with patch.object(boto3, 'client', return_value=mock_client):
        mock_client.create_stream = failing_create_stream('ResourceInUseException')

        StreamWriter('blah')
        assert mock_client.create_stream.called
        assert call.get_waiter('stream_exists') in mock_client.method_calls, "We handled stream existence"

        mock_client.create_stream = failing_create_stream('Something else')

        # Forget the calls recorded during the first scenario.
        mock_client.reset_mock()
        with pytest.raises(ClientError):
            StreamWriter('blah')
        assert mock_client.create_stream.called
        assert call.get_waiter('stream_exists') not in mock_client.method_calls, "never reached"
def test_noop_model_direct_update(self):
    """update() with no effective change must not call ``conn.session.execute``.

    Passing ``partition`` or ``cluster`` to update() is expected to raise
    ValidationError.
    """
    model = TestUpdateModel.create(self.conn, count=5, text='monkey')

    # No fields given.
    with patch.object(self.conn.session, 'execute') as mock_execute:
        model.update(self.conn)
    assert mock_execute.call_count == 0

    # Same value as already stored.
    with patch.object(self.conn.session, 'execute') as mock_execute:
        model.update(self.conn, count=5)
    assert mock_execute.call_count == 0

    with self.assertRaises(ValidationError):
        model.update(self.conn, partition=model.partition)

    with self.assertRaises(ValidationError):
        model.update(self.conn, cluster=model.cluster)
def test0060_update_only_if_parent_is_modified(self):
    """The left and right fields must only be updated if parent is modified"""
    with Transaction().start(DB_NAME, USER, context=CONTEXT):
        root_domain = [('parent', '=', None)]
        records = self.mptt.search(root_domain)

        with patch.object(self.mptt, '_update_tree') as update_tree_mock:
            # Writing an unrelated field must not call _update_tree.
            self.mptt.write(records, {'name': 'Parent Records'})
            self.assertFalse(update_tree_mock.called)

            # Changing ``parent`` on the first record's children must call it.
            first_parent, second_parent = records[:2]
            children = list(first_parent.childs)
            self.mptt.write(children, {'parent': second_parent.id})
            self.assertTrue(update_tree_mock.called)
def runTest(self):
    """Run ``build.main()`` with ``self.build_args`` and check the image results.

    A failure is counted when an image listed in ``self.excluded_images``
    appears in the bad results with a status other than 'error', or when any
    other image appears there with status 'error'. Unmatched images are only
    logged. The test fails if any failure was counted.
    """
    with patch.object(sys, 'argv', self.build_args):
        LOG.info("Running with args %s", self.build_args)
        bad_results, good_results, unmatched_results = build.main()

    failures = 0
    for image, result in six.iteritems(bad_results):
        if image in self.excluded_images:
            # Compare by value: ``is`` against a string literal depends on
            # interning and is a SyntaxWarning on Python 3.8+.
            if result == 'error':
                continue
            failures = failures + 1
            LOG.warning(">>> Expected image '%s' to fail, please update"
                        " the excluded_images in source file above if the"
                        " image build has been fixed.", image)
        else:
            if result != 'error':
                continue
            failures = failures + 1
            LOG.critical(">>> Expected image '%s' to succeed!", image)

    for image in unmatched_results.keys():
        LOG.warning(">>> Image '%s' was not matched", image)

    self.assertEqual(failures, 0, "%d failure(s) occurred" % failures)
def test_enforcing_wait_time_sleeps(mocker):
    """The first enforce() does not sleep; the second sleeps for sleep_func's value (60)."""
    # An already-resolved future for the mocked sleep to return.
    resolved = concurrent.Future()
    resolved.set_result(None)

    mock_gen_sleep = mocker.patch.object(retry.gen, 'sleep',
                                         return_value=resolved)

    request = Mock()
    policy = retry.RetryPolicy(try_limit=3, sleep_func=lambda _: 60)

    yield policy.enforce(request)
    assert mock_gen_sleep.called is False

    yield policy.enforce(request)
    mock_gen_sleep.assert_called_once_with(60)
def test_logging_a_handler(self):
    """
    Given that I have a handler decorated for logging
    When I call that handler
    Then I should receive logs indicating the call and return of the handler
    * N.b. This is an example of using decorators to extend the Brightside pipeline
    """
    handler = MyCommandHandler()
    request = MyCommand()
    self._subscriber_registry.register(MyCommand, lambda: handler)

    logger = logging.getLogger("tests.handlers_testdoubles")
    with patch.object(logger, 'log') as mock_log:
        self._commandProcessor.send(request)

    # One DEBUG record on entry and one on exit, in that order.
    entering = call(logging.DEBUG, "Entering handle " + str(request))
    exiting = call(logging.DEBUG, "Exiting handle " + str(request))
    mock_log.assert_has_calls([entering, exiting])
def test_data_files(self, lsdibag):
    """data_files() includes the pdf, the OCR file and every image and page-text file."""
    with patch.object(lsdibag, 'image_files') as mock_imgfiles:
        with patch.object(lsdibag, 'page_text_files') as mock_txtfiles:
            mock_imgfiles.return_value = [
                '001.tif', '002.tif', '003.tif'
            ]
            # The comma after '003.txt' was missing, so implicit string
            # concatenation produced '003.txt001.pos' instead of two entries.
            mock_txtfiles.return_value = [
                '001.txt', '002.txt', '003.txt',
                '001.pos', '002.pos', '003.pos'
            ]

            datafiles = lsdibag.data_files()

            # should include pdf and ocr file
            assert lsdibag.item.pdf in datafiles
            assert lsdibag.item.ocr_file in datafiles
            # should include all image and text files
            for imgfile in mock_imgfiles.return_value:
                assert imgfile in datafiles
            for txtfile in mock_txtfiles.return_value:
                assert txtfile in datafiles
def setUp(self):
    """Patch ``module.request`` and ``module.services`` and set storage config.

    All started patches are stopped through ``patch.stopall`` on cleanup. A
    copy of the config as it was before modification is kept on
    ``self.original_config``.
    """
    # Stop every patch started below once the test finishes.
    self.addCleanup(patch.stopall)

    # Patched module attributes.
    self.request = patch.object(module, 'request').start()
    self.services = patch.object(module, 'services').start()

    # Snapshot the config, then override the storage settings.
    self.original_config = dict(module.config)
    self.bucket = 'buckbuck'
    module.config['STORAGE_BUCKET_NAME'] = self.bucket
    for blank_key in ('STORAGE_ACCESS_KEY_ID',
                      'STORAGE_SECRET_ACCESS_KEY',
                      'ACCESS_KEY_EXPIRES_IN'):
        module.config[blank_key] = ''
    module.config['STORAGE_PATH_PATTERN'] = '{owner}/{dataset}/{path}'

    self.s3 = boto3.client('s3')
def patch(self, method):
    """Patch ``method`` on ``self.obj`` for the rest of the test and return the mock.

    The patch is started immediately and its ``stop`` is registered with
    ``addCleanup``.
    """
    patcher = patch.object(self.obj, method)
    started_mock = patcher.start()
    self.addCleanup(patcher.stop)
    return started_mock
def test_has_access_is_in_group(self):
    """Call has_access while the user manager reports the user as in the group."""
    user, usrmgr_mock = self.__get_test_instance(
        "@foouser", 1337, group="foogroup")

    manager = usrmgr_mock.return_value
    manager.user_is_in_group.return_value = True

    # ``user.save`` is mocked for the duration of the call.
    # NOTE(review): the result of has_access is not asserted — confirm intent.
    with patch.object(user, "save"):
        user.has_access("foogroup")
def test_has_access_is_not_in_group(self):
    """Call has_access while the user manager reports the user as not in the group."""
    user, usrmgr_mock = self.__get_test_instance(
        "@foouser", 1337, group="bargroup")

    manager = usrmgr_mock.return_value
    manager.user_is_in_group.return_value = False
    manager.verify_user.return_value = False

    # ``user.save`` is mocked for the duration of the call.
    # NOTE(review): the result of has_access is not asserted — confirm intent.
    with patch.object(user, "save"):
        user.has_access("foogroup")
def __get_dummy_object():
    """Construct a UserManager while ``os.mkdir`` is patched out."""
    mkdir_patch = patch("os.mkdir")
    with mkdir_patch:
        dummy_manager = UserManager()
    return dummy_manager
def __set_config(usrmgr, config):
    """Assign ``config`` to ``usrmgr.config`` with the private save hook mocked."""
    save_patch = patch.object(usrmgr, "_UserManager__save_config")
    with save_patch:
        usrmgr.config = config
def test_get_set_config(self):
    """A value assigned to ``config`` is returned unchanged when read back."""
    manager = self.__get_dummy_object()
    expected = {"hello": {}, "world": {}}

    # Write with the private save hook mocked ...
    save_patch = patch.object(manager, "_UserManager__save_config")
    with save_patch:
        manager.config = expected

    # ... and read with the private load hook mocked.
    load_patch = patch.object(manager, "_UserManager__load_config")
    with load_patch:
        self.assertEqual(manager.config, expected)
def test_userid_is_not_verified_grp(self):
    """userid_is_verified_in_group is falsy for an id absent from the group's users."""
    manager = self.__get_dummy_object()
    self.__set_config(
        manager,
        {"foogroup": {"users": [{"id": 1337, "username": "@foo"}]}},
    )

    # The private load hook is mocked during the lookup.
    with patch.object(manager, "_UserManager__load_config"):
        verified = manager.userid_is_verified_in_group("foogroup", 1234)

    self.assertFalse(verified)
def test_username_is_verified_grp(self):
    """username_is_verified_in_group is truthy for a username in the group's users."""
    manager = self.__get_dummy_object()
    self.__set_config(
        manager,
        {"foogroup": {"users": [{"id": 1337, "username": "@foo"}]}},
    )

    # The private load hook is mocked during the lookup.
    with patch.object(manager, "_UserManager__load_config"):
        verified = manager.username_is_verified_in_group("foogroup", "@foo")

    self.assertTrue(verified)
def test_usernm_is_not_verified_grp(self):
    """username_is_verified_in_group is falsy for a username absent from the group's users."""
    manager = self.__get_dummy_object()
    self.__set_config(
        manager,
        {"foogroup": {"users": [{"id": 1337, "username": "@foo"}]}},
    )

    # The private load hook is mocked during the lookup.
    with patch.object(manager, "_UserManager__load_config"):
        verified = manager.username_is_verified_in_group("foogroup", "@bar")

    self.assertFalse(verified)
def test_user_is_unverified_grp(self):
    """user_is_unverified_in_group is truthy for a username in the unverified list."""
    manager = self.__get_dummy_object()
    self.__set_config(manager, {"foogroup": {"unverified": ["@foo"]}})

    # The private load hook is mocked during the lookup.
    with patch.object(manager, "_UserManager__load_config"):
        unverified = manager.user_is_unverified_in_group("foogroup", "@foo")

    self.assertTrue(unverified)
def test_user_is_not_unverified_grp(self):
    """user_is_unverified_in_group is falsy for a username absent from the unverified list."""
    manager = self.__get_dummy_object()
    self.__set_config(manager, {"foogroup": {"unverified": ["@foo"]}})

    # The private load hook is mocked during the lookup.
    with patch.object(manager, "_UserManager__load_config"):
        unverified = manager.user_is_unverified_in_group("foogroup", "@bar")

    self.assertFalse(unverified)
def test_user_is_in_group_username(self):
    """user_is_in_group is truthy when looked up by a username in the unverified list."""
    manager = self.__get_dummy_object()
    self.__set_config(manager, {"foogroup": {"unverified": ["@foo"]}})

    # The private load hook is mocked during the lookup.
    with patch.object(manager, "_UserManager__load_config"):
        in_group = manager.user_is_in_group("foogroup", username="@foo")

    self.assertTrue(in_group)
def test_user_is_in_group_userid(self):
    """user_is_in_group is truthy when looked up by an id in the group's users."""
    manager = self.__get_dummy_object()
    self.__set_config(
        manager,
        {"foogroup": {"users": [{"id": 1337, "username": "@foo"}]}},
    )

    # The private load hook is mocked during the lookup.
    with patch.object(manager, "_UserManager__load_config"):
        in_group = manager.user_is_in_group("foogroup", user_id=1337)

    self.assertTrue(in_group)
def test_verify_user_no_verify(self):
    """verify_user is falsy when the group's unverified list does not contain the user."""
    manager = self.__get_dummy_object()
    self.__set_config(manager, {"foogroup": {"unverified": []}})

    # The private load hook is mocked during the call.
    with patch.object(manager, "_UserManager__load_config"):
        verified = manager.verify_user(1337, "@foouser", "foogroup")

    self.assertFalse(verified)