Python unittest.mock.patch 模块,object() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用unittest.mock.patch.object()

项目:openbare    作者:openbare    | 项目源码 | 文件源码
def test_checkout_exception(self):
        """A failing checkout shows its error message and creates no lendable."""
        self.c.login(username=self.user.username, password='str0ngpa$$w0rd')

        # Make Lendable.checkout raise while the checkout view is requested.
        failing_checkout = patch.object(
            Lendable, 'checkout', side_effect=Exception('Checkout Failed!'))
        with failing_checkout:
            checkout_url = reverse('library:checkout', args=['lendable'])
            response = self.c.get(checkout_url, follow=True)

        # The first message in the response context carries the exception text.
        first_message = list(response.context['messages'])[0]
        self.assertEqual(first_message.message, 'Checkout Failed!')

        # No lendable was created.
        self.assertEqual(Lendable.all_types.count(), 0)
项目:openbare    作者:openbare    | 项目源码 | 文件源码
def test_set_username(self):
        """Test set username method."""
        # When no IAM user exists the (unicode) name is converted to
        # ascii and used as-is: expected result is 'John'.
        with patch.object(AmazonAccountUtils, 'iam_user_exists', return_value=False):
            self.aws_account._set_username()
            self.assertEqual(self.aws_account.username, 'John')

        # When the IAM user already exists a random 20-character
        # username is generated instead.
        with patch.object(AmazonAccountUtils, 'iam_user_exists', return_value=True):
            self.aws_account._set_username()
            generated = self.aws_account.username
            self.assertNotEqual(generated, 'John')
            self.assertEqual(len(generated), 20)

        # A name that is still invalid for the regex after conversion
        # also results in a random 20-character username.
        self.user.username = '???'
        self.aws_account._set_username()
        generated = self.aws_account.username
        self.assertNotEqual(generated, '???')
        self.assertEqual(len(generated), 20)
项目:bitstore    作者:datahq    | 项目源码 | 文件源码
def setUp(self):
        """Start the module patches and install the storage test configuration."""
        # Every patcher started below is stopped by patch.stopall on cleanup.
        self.addCleanup(patch.stopall)

        # Replace the module's `request` and `services` attributes with mocks.
        self.request = patch.object(module, 'request').start()
        self.services = patch.object(module, 'services').start()

        # Snapshot the configuration before overriding the storage settings.
        self.original_config = dict(module.config)

        module.config['STORAGE_BUCKET_NAME'] = 'buckbuck'
        self.bucket = 'buckbuck'
        for blank_key in ('STORAGE_ACCESS_KEY_ID',
                          'STORAGE_SECRET_ACCESS_KEY',
                          'ACCESS_KEY_EXPIRES_IN'):
            module.config[blank_key] = ''
        module.config['STORAGE_PATH_PATTERN'] = '{owner}/{dataset}/{path}'

        self.s3 = boto3.client('s3')
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def async_test_scarlett_os(loop):
    """Return a ScarlettOS object pointing at test config dir."""
    system = s.ScarlettSystem(loop)

    # Fixed test configuration: location, units and config directory.
    system.config.location_name = 'test scarlett'
    system.config.config_dir = get_test_config_dir()
    system.config.latitude = 32.87336
    system.config.longitude = -117.22743
    system.config.elevation = 0
    pacific = date_utility.get_time_zone('US/Pacific')
    system.config.time_zone = pacific
    system.config.units = METRIC_SYSTEM
    system.config.skip_pip = True

    # Hand the system back already marked as running.
    system.state = s.CoreState.running
    return system
项目:astropy-bot    作者:astropy    | 项目源码 | 文件源码
def test_close_disabled(self):
        """With closing disabled and no prior comment, only the warning is posted."""
        # Scenario: the time is beyond the close deadline and no comment has
        # been posted yet, but the global option that allows closing is off,
        # so the warning (rather than the 'epilogue') is expected.
        self.get_issues.return_value = ['123']
        self.get_label_added_date.return_value = now() - 34443
        self.find_comments.return_value = []

        with app.app_context(), \
                patch.object(app, 'stale_issue_close', False):
            process_issues('repo', 'installation')

        # Exactly one comment, containing the warning text, and no close call.
        assert self.submit_comment.call_count == 1
        warning = ISSUE_CLOSE_WARNING.format(pasttime='9 hours ago',
                                             futuretime='5 hours')
        self.submit_comment.assert_called_with(warning)
        assert self.close.call_count == 0
项目:astropy-bot    作者:astropy    | 项目源码 | 文件源码
def setup_method(self, method):
        """Create and start the RepoHandler / PullRequestHandler patchers.

        Each patcher is kept on ``self.patch_*`` and the mock returned by
        ``start()`` is kept on a matching attribute for use in the tests.
        """
        # RepoHandler methods.
        self.patch_repo_config = patch.object(RepoHandler, 'get_config_value')
        self.patch_open_pull_requests = patch.object(RepoHandler, 'open_pull_requests')

        # PullRequestHandler properties are patched with PropertyMock.
        self.patch_labels = patch.object(
            PullRequestHandler, 'labels', new_callable=PropertyMock)
        self.patch_last_commit_date = patch.object(
            PullRequestHandler, 'last_commit_date', new_callable=PropertyMock)

        # PullRequestHandler methods.
        self.patch_find_comments = patch.object(PullRequestHandler, 'find_comments')
        self.patch_submit_comment = patch.object(PullRequestHandler, 'submit_comment')
        self.patch_close = patch.object(PullRequestHandler, 'close')

        # Start the patchers in the same order they were created.
        self.autoclose_stale = self.patch_repo_config.start()
        self.open_pull_requests = self.patch_open_pull_requests.start()
        self.labels = self.patch_labels.start()
        self.last_commit_date = self.patch_last_commit_date.start()
        self.find_comments = self.patch_find_comments.start()
        self.submit_comment = self.patch_submit_comment.start()
        self.close = self.patch_close.start()
项目:astropy-bot    作者:astropy    | 项目源码 | 文件源码
def test_close_disabled(self):
        """With closing disabled and no prior comment, only the warning is posted."""
        # Scenario: the time is beyond the close deadline and there is no
        # comment yet, but the global option that allows closing is off, so
        # the warning (rather than the 'epilogue') is expected.
        self.open_pull_requests.return_value = ['123']
        self.last_commit_date.return_value = now() - 241
        self.find_comments.return_value = []

        with app.app_context(), \
                patch.object(app, 'stale_pull_requests_close', False):
            process_pull_requests('repo', 'installation')

        # Exactly one comment, containing the warning text, and no close call.
        assert self.submit_comment.call_count == 1
        warning = PULL_REQUESTS_CLOSE_WARNING.format(pasttime='4 minutes',
                                                     futuretime='20 seconds')
        self.submit_comment.assert_called_with(warning)
        assert self.close.call_count == 0
项目:peony-twitter    作者:odrling    | 项目源码 | 文件源码
def test_request_error_handler(api_path, _error_handler=True):
    """Run a Request and check how the client's request and error handler are used.

    ``client.request`` and ``client.error_handler`` are both patched; the
    request is driven to completion on the client's loop.

    The original wrote ``assert client_request.called_with(...)``.
    ``called_with`` is not a Mock assertion: it creates a child mock (always
    truthy), and recent Python versions raise AttributeError for it. The
    check below inspects ``call_args`` instead and compares exactly the
    keyword arguments the original listed, so additional keyword arguments
    passed by the client do not matter.
    """
    client = api_path._client
    with patch.object(client, 'request', side_effect=dummy) as client_request:
        with patch.object(client, 'error_handler',
                          side_effect=dummy_error_handler) as error_handler:
            client.loop.run_until_complete(
                requests.Request(api_path, 'get',
                                 _error_handling=_error_handler,
                                 test=1, _test=2)
            )

            assert client_request.called
            expected_kwargs = {
                'method': 'get',
                'url': url,
                'skip_params': False,
                'test': 2,
                'params': {'test': 1},
            }
            _, called_kwargs = client_request.call_args
            for key, value in expected_kwargs.items():
                assert called_kwargs[key] == value

            # The error handler is only used when error handling is enabled.
            assert error_handler.called is _error_handler
项目:peony-twitter    作者:odrling    | 项目源码 | 文件源码
async def test_error_handler_service_unavailable():
    """A request failing with 503 raises ServiceUnavailable, then succeeds later.

    Declared ``async`` because the body awaits; as a plain ``def`` it does
    not compile. ``service_unavailable`` raises for its first three calls
    (module-level ``tries`` counts down from 4) and returns a response
    afterwards.
    """
    global tries
    tries = 4

    async def service_unavailable(**kwargs):
        global tries
        tries -= 1

        if tries > 0:
            response = MockResponse(status=503)
            raise await exceptions.throw(response)
        return MockResponse()

    with patch.object(asyncio, 'sleep', side_effect=dummy) as sleep:
        with pytest.raises(exceptions.ServiceUnavailable):
            await utils.error_handler(service_unavailable)()
        # Checked after the raises block: placed directly after the raising
        # call inside it, this assertion could never execute.
        assert sleep.called

    # Same as the original: this call runs with the real asyncio.sleep.
    await utils.error_handler(service_unavailable)()
项目:peony-twitter    作者:odrling    | 项目源码 | 文件源码
async def test_streaming_apis(dummy_client):
    """Check which client method the ``api`` and ``stream`` attributes call.

    Declared ``async`` because the body awaits; as a plain ``def`` it does
    not compile. For ``dummy_client``, ``api`` goes through ``request`` and
    ``stream`` through ``stream_request``. For a client created with
    ``streaming_apis={'api'}`` the two are swapped.
    """
    with patch.object(dummy_client, 'request', side_effect=dummy) as request:
        await dummy_client.api.test.get()
        assert request.called

    with patch.object(dummy_client, 'stream_request') as request:
        dummy_client.stream.test.get()
        assert request.called

    client = BasePeonyClient("", "", streaming_apis={'api'})
    with patch.object(client, 'stream_request') as request:
        client.api.test.get()
        assert request.called

    with patch.object(client, 'request', side_effect=dummy) as request:
        await client.stream.test.get()
        assert request.called
项目:peony-twitter    作者:odrling    | 项目源码 | 文件源码
async def test_request_proxy(dummy_client):
    """Check that the ``proxy`` argument reaches the session's request call.

    Declared ``async`` because the body uses ``async with`` and ``await``;
    as a plain ``def`` it does not compile.
    """
    class RaiseProxy:
        """Stand-in for ``session.request`` that raises with the proxy it received."""

        def __init__(self, *args, proxy=None, **kwargs):
            raise RuntimeError(proxy)

    async with aiohttp.ClientSession() as session:
        with patch.object(session, 'request', side_effect=RaiseProxy):
            try:
                await dummy_client.request(method='get',
                                           url="http://hello.com",
                                           proxy="http://some.proxy.com",
                                           session=session,
                                           future=asyncio.Future())
            except RuntimeError as e:
                # NOTE(review): if no RuntimeError propagates, this test
                # passes without checking anything - confirm that is intended.
                assert str(e) == "http://some.proxy.com"
项目:peony-twitter    作者:odrling    | 项目源码 | 文件源码
async def test_chunked_upload_fail(dummy_peony_client, medias):
    """A chunked upload that fails processing raises MediaProcessingError.

    Declared ``async`` because the body awaits; as a plain ``def`` it does
    not compile.
    """
    media = medias['video']
    media_data = await media.download()

    chunk_size = 1024**2

    dummy_request = DummyRequest(dummy_peony_client, media, chunk_size, True)

    with patch.object(dummy_peony_client, 'request',
                      side_effect=dummy_request):
        with patch.object(asyncio, 'sleep', side_effect=dummy) as sleep:
            with pytest.raises(peony.exceptions.MediaProcessingError):
                await dummy_peony_client.upload_media(
                    media_data, chunk_size=chunk_size, chunked=True
                )
            # Checked after the raises block: placed directly after the
            # raising call inside it, this assertion could never execute.
            sleep.assert_called_with(5)
项目:peony-twitter    作者:odrling    | 项目源码 | 文件源码
async def test_upload_from_url(dummy_peony_client, medias, media_request, url):
    """Uploading from a URL fetches that URL and posts its content.

    Declared ``async`` because the body awaits; as a plain ``def`` it does
    not compile.
    """
    async def dummy_get(get_url):
        # The client must fetch exactly the URL handed to upload_media.
        assert get_url == url
        return media_request

    async def dummy_request(url, method, future, data=None, skip_params=None):
        # `url` here is the request URL and shadows the outer fixture.
        assert url == dummy_peony_client.upload.media.upload.url()
        assert method.lower() == 'post'
        assert data['media'] == media_request.content
        assert skip_params
        future.set_result(None)

    with patch.object(dummy_peony_client, '_session') as session:
        session.get = dummy_get
        with patch.object(dummy_peony_client, 'request',
                          side_effect=dummy_request):
            await dummy_peony_client.upload_media(url)
项目:peony-twitter    作者:odrling    | 项目源码 | 文件源码
async def test_stream_reconnection_disconnection(stream):
    """Check the data yielded by a stream while it is in the DISCONNECTION state.

    Declared ``async`` because the body uses ``async for``; as a plain
    ``def`` it does not compile. The expected timeout grows linearly with
    the turn and the loop stops once it exceeds MAX_DISCONNECTION_TIMEOUT.
    """
    async def dummy(*args, **kwargs):
        pass

    turn = -1

    with patch.object(stream, '_connect', side_effect=response_disconnection):
        with patch.object(peony.stream.asyncio, 'sleep', side_effect=dummy):
            async for data in stream:
                assert stream._state == DISCONNECTION
                turn += 1

                if turn == 0:
                    assert data == {'connected': True}
                elif turn % 2 == 1:
                    # Odd turns announce the wait before the next attempt.
                    timeout = DISCONNECTION_TIMEOUT * (turn + 1) / 2

                    if timeout > MAX_DISCONNECTION_TIMEOUT:
                        actual = data['reconnecting_in']
                        assert actual == MAX_DISCONNECTION_TIMEOUT
                        break

                    assert data == {'reconnecting_in': timeout, 'error': None}
                else:
                    # Even turns (after the first) report the restart.
                    assert data == {'stream_restart': True}
项目:peony-twitter    作者:odrling    | 项目源码 | 文件源码
async def test_stream_reconnection_reconnect(stream):
    """Check the data yielded by a stream while it is in the RECONNECTION state.

    Declared ``async`` because the body uses ``async for``; as a plain
    ``def`` it does not compile. The expected timeout doubles every second
    turn and the loop stops once it exceeds MAX_RECONNECTION_TIMEOUT.
    """
    async def dummy(*args, **kwargs):
        pass

    turn = -1

    with patch.object(stream, '_connect', side_effect=response_reconnection):
        with patch.object(peony.stream.asyncio, 'sleep', side_effect=dummy):
            async for data in stream:
                assert stream._state == RECONNECTION
                turn += 1

                if turn == 0:
                    assert data == {'connected': True}
                elif turn % 2 == 1:
                    # Odd turns announce the wait before the next attempt.
                    timeout = RECONNECTION_TIMEOUT * 2**(turn // 2)

                    if timeout > MAX_RECONNECTION_TIMEOUT:
                        actual = data['reconnecting_in']
                        assert actual == MAX_RECONNECTION_TIMEOUT
                        break

                    assert data == {'reconnecting_in': timeout, 'error': None}
                else:
                    # Even turns (after the first) report the restart.
                    assert data == {'stream_restart': True}
项目:peony-twitter    作者:odrling    | 项目源码 | 文件源码
async def test_stream_reconnection_enhance_your_calm(stream):
    """Check the data yielded by a stream while it is in the ENHANCE_YOUR_CALM state.

    Declared ``async`` because the body uses ``async for``; as a plain
    ``def`` it does not compile. The expected timeout doubles every second
    turn; the loop is cut off after 100 turns.
    """
    async def dummy(*args, **kwargs):
        pass

    turn = -1

    with patch.object(stream, '_connect', side_effect=response_calm):
        with patch.object(peony.stream.asyncio, 'sleep', side_effect=dummy):
            async for data in stream:
                assert stream._state == ENHANCE_YOUR_CALM
                turn += 1

                # This loop has no other exit, so stop after 100 turns.
                if turn >= 100:
                    break

                if turn == 0:
                    assert data == {'connected': True}
                elif turn % 2 == 1:
                    timeout = ENHANCE_YOUR_CALM_TIMEOUT * 2**(turn // 2)
                    assert data == {'reconnecting_in': timeout, 'error': None}
                else:
                    assert data == {'stream_restart': True}
项目:peony-twitter    作者:odrling    | 项目源码 | 文件源码
async def test_stream_cancel(event_loop):
    """Cancel a task that iterates a stream forever and wait for it to finish.

    Declared ``async`` because the body awaits; as a plain ``def`` it does
    not compile.
    """
    async def cancel(task):
        # Let the iterating task start before cancelling it.
        await asyncio.sleep(0.001)
        task.cancel()

    async def test_stream_iterations(stream):
        while True:
            await test_stream_iteration(stream)

    with aiohttp.ClientSession(loop=event_loop) as session:
        client = peony.client.BasePeonyClient("", "", session=session)
        context = peony.stream.StreamResponse(method='GET',
                                              url="http://whatever.com",
                                              client=client)

        with context as stream:
            with patch.object(stream, '_connect',
                              side_effect=stream_content):
                coro = test_stream_iterations(stream)
                task = event_loop.create_task(coro)
                cancel_task = event_loop.create_task(cancel(task))

                # Both tasks must finish within one second.
                with aiohttp.Timeout(1):
                    await asyncio.wait([task, cancel_task])
项目:peony-twitter    作者:odrling    | 项目源码 | 文件源码
async def post(self, _data=None, _headers=None, **kwargs):
        """Fake POST endpoint that validates its input and returns an access token.

        Declared ``async`` because the body awaits; as a plain ``def`` it
        does not compile. Each call increments ``self.count``.

        Returns:
            dict: ``{'access_token': "abc"}``.
        """
        self.count += 1

        if _data is not None:
            with patch.object(oauth.aiohttp.payload, 'BytesPayload',
                              side_effect=dummy_func):
                assert _data._gen_form_urlencoded() == b"access_token=abc"

        if _headers is not None:
            # Expect HTTP Basic auth built from the fixed key pair.
            key = "1234567890:0987654321"
            auth = base64.b64encode(key.encode('utf-8')).decode('utf-8')
            assert _headers['Authorization'] == 'Basic ' + auth

        # This is needed to run `test_oauth2_concurrent_refreshes`
        # without that, refresh tasks would be executed sequentially
        # In a sense it is a simulation of a request being fetched
        await asyncio.sleep(0.001)
        return {'access_token': "abc"}
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def test_full_program_user_serialization_with__no_passed_course(self):
        """
        Full ProgramEnrollment serialization when MMTrack reports that the
        user has passed no courses.
        """
        with patch.object(MMTrack, 'count_courses_passed', return_value=0):
            self.profile.refresh_from_db()
            program = self.program_enrollment.program
            expected = dict(
                id=program.id,
                enrollments=self.serialized_enrollments,
                grade_average=75,
                is_learner=True,
                num_courses_passed=0,
                total_courses=1,
            )
            actual = UserProgramSearchSerializer.serialize(self.program_enrollment)
            assert actual == expected
项目:channels-api    作者:linuxlewis    | 项目源码 | 文件源码
def test_create(self):
        """Route a create message through the websocket channel.

        Checks that one object is created and that the reply payload holds
        the serialized object with a 201 status.
        """
        message = self._build_message("testmodel", {
            'action': 'create',
            'pk': None,
            'request_id': 'client-request-id',
            'data': {'name': 'some-thing'}
        })
        reply = self._send_and_consume("websocket.receive", message)

        # Exactly one object now exists.
        self.assertEqual(TestModel.objects.count(), 1)

        # The reply carries the serializer data for that object.
        created = TestModel.objects.first()
        expected_payload = {
            'action': 'create',
            'data': TestModelSerializer(created).data,
            'errors': [],
            'request_id': 'client-request-id',
            'response_status': 201
        }
        self.assertEqual(reply['payload'], expected_payload)
项目:channels-api    作者:linuxlewis    | 项目源码 | 文件源码
def test_create_failure(self):
        """Route an invalid create message and check the error reply."""
        message = self._build_message("testmodel", {
            'action': 'create',
            'pk': None,
            'request_id': 'client-request-id',
            'data': {},
        })
        reply = self._send_and_consume('websocket.receive', message)

        # Nothing was created.
        self.assertEqual(TestModel.objects.count(), 0)

        # The reply reports the missing field with a 400 status.
        expected_payload = {
            'action': 'create',
            'data': None,
            'request_id': 'client-request-id',
            'errors': [{'name': ['This field is required.']}],
            'response_status': 400
        }
        self.assertEqual(reply['payload'], expected_payload)
项目:channels-api    作者:linuxlewis    | 项目源码 | 文件源码
def test_bad_permission_reply(self):
        """A binding whose has_permission returns False replies with a 401 payload."""
        deny = Mock(return_value=False)
        with patch.object(TestModelResourceBinding, 'has_permission', deny):
            message = self._build_message('testmodel', {
                'action': 'named_detail',
                'pk': 546,
                'data': {},
                'request_id': 'client-request-id'
            })
            reply = self._send_and_consume('websocket.receive', message)

            expected_payload = {
                'action': 'named_detail',
                'errors': ['Permission Denied'],
                'data': None,
                'response_status': 401,
                'request_id': 'client-request-id'
            }

            self.assertEqual(reply['payload'], expected_payload)
            # The permission hook must actually have been consulted.
            self.assertEqual(deny.called, True)
项目:channels-api    作者:linuxlewis    | 项目源码 | 文件源码
def test_is_authenticated_permission(self):
        """With IsAuthenticated required, the reply is 401 before login and 200 after."""
        with patch.object(TestModelResourceBinding, 'permission_classes', (IsAuthenticated,)):
            request = {
                'action': 'test_list',
                'pk': None,
                'data': {},
                'request_id': 'client-request-id',
            }

            # Before logging in: the request is blocked.
            reply = self._send_and_consume(
                'websocket.receive', self._build_message('testmodel', request))
            self.assertEqual(reply['payload']['response_status'], 401)

            # Log a user in on the test client.
            user = User.objects.create(username="testuser", password="123")
            self.client.force_login(user)
            self.client._session_cookie = True

            # After logging in: the same request succeeds.
            reply = self._send_and_consume(
                'websocket.receive', self._build_message('testmodel', request))
            self.assertEqual(reply['payload']['response_status'], 200)
项目:aioautomatic    作者:armills    | 项目源码 | 文件源码
def test_parent_inheritance(aiohttp_session):
    """A child api object takes session, loop and request kwargs from its parent."""
    parent_kwargs = dict(arg1=10, arg2=20)
    own_kwargs = dict(arg3=30)

    parent = base.BaseApiObject(
        None, request_kwargs=parent_kwargs, client_session=aiohttp_session)
    child = base.BaseApiObject(parent, request_kwargs=own_kwargs)

    assert child.client_session == aiohttp_session
    assert child.loop == aiohttp_session.loop
    # The child's kwargs are the parent's plus its own.
    assert child.request_kwargs == dict(arg1=10, arg2=20, arg3=30)
项目:aioautomatic    作者:armills    | 项目源码 | 文件源码
def test_result_list(session):
    """A ResultList without next/previous links returns None for both pages."""
    response = {
        "_metadata": {
            "count": 2,
            "next": None,
            "previous": None,
        },
        "results": [
            {"attr1": "value1", "attr2": "value2"},
            {"attr1": "value3"},
        ],
    }
    result = base.ResultList(parent=session, item_class=MockDataObject, resp=response)

    run = session.loop.run_until_complete
    following = run(result.get_next())
    preceding = run(result.get_previous())

    assert result.next is None
    assert result.previous is None
    assert following is None
    assert preceding is None
    assert len(result) == 2
    assert sorted([item.attr1 for item in result]) == sorted(["value1", "value3"])
项目:python-moira-client    作者:moira-alert    | 项目源码 | 文件源码
def test_fetch_by_trigger(self):
        """fetch_by_trigger calls client.get on 'event/<id>' with paging parameters."""
        client = Client(self.api_url)
        manager = EventManager(client)

        trigger_id = '1'
        trigger = Trigger(client, 'Name', ['tag'], ['target'], 0, 1, id=trigger_id)

        with patch.object(client, 'get', return_value={'list': []}) as get_mock:
            manager.fetch_by_trigger(trigger)

        self.assertTrue(get_mock.called)

        expected_params = {'p': 0, 'size': MAX_FETCH_LIMIT}
        get_mock.assert_called_with('event/' + trigger_id, params=expected_params)
项目:python-moira-client    作者:moira-alert    | 项目源码 | 文件源码
def test_fetch_by_id(self):
        """fetch_by_id returns a trigger whose id matches the requested one."""
        client = Client(self.api_url)
        manager = TriggerManager(client)

        trigger_id = '1'

        state_payload = {'state': 'OK', 'trigger_id': trigger_id}
        trigger_payload = {
            'id': trigger_id,
            'name': 'trigger_name',
            'tags': ['tag'],
            'targets': ['pattern'],
            'warn_value': 0,
            'error_value': 1,
        }

        # client.get yields the state first and the trigger second.
        responses = [state_payload, trigger_payload]
        with patch.object(client, 'get', side_effect=responses) as get_mock:
            fetched = manager.fetch_by_id(trigger_id)

            self.assertTrue(get_mock.called)
            self.assertEqual(trigger_id, fetched.id)
项目:python-moira-client    作者:moira-alert    | 项目源码 | 文件源码
def test_add_bad_response(self):
        """add() raises ResponseStructureError when the PUT reply is an empty dict."""
        client = Client(self.api_url)
        manager = ContactManager(client)

        contact_value = '#channel'

        with patch.object(client, 'put', return_value={}) as put_mock, \
                patch.object(client, 'get', return_value={'contacts': []}) as get_mock:
            with self.assertRaises(ResponseStructureError):
                manager.add(contact_value, CONTACT_SLACK)

        # Both endpoints were hit, and the PUT carried the contact data.
        self.assertTrue(put_mock.called)
        self.assertTrue(get_mock.called)
        expected_json = {'value': contact_value, 'type': CONTACT_SLACK}
        put_mock.assert_called_with('contact', json=expected_json)
项目:python-moira-client    作者:moira-alert    | 项目源码 | 文件源码
def test_get_invalid_response(self):
        """Client.get raises InvalidJSONError when requests.get returns a FakeResponse.

        The original also passed ``side_effects=get`` to ``patch.object``.
        That keyword is a misspelling of ``side_effect``, so it only set an
        unused attribute on the mock and the local ``get`` helper never
        ran. Both are removed; the mock behaves exactly as before,
        returning ``response``.
        """
        response = FakeResponse()

        with patch.object(requests, 'get', return_value=response) as mock_get:
            test_path = 'test_path'

            client = Client(TEST_API_URL, AUTH_HEADERS)
            with self.assertRaises(InvalidJSONError):
                client.get(test_path)

        self.assertTrue(mock_get.called)
        expected_url_call = TEST_API_URL + '/' + test_path
        mock_get.assert_called_with(expected_url_call, headers=AUTH_HEADERS, auth=None)
项目:python-moira-client    作者:moira-alert    | 项目源码 | 文件源码
def test_put_invalid_response(self):
        """Client.put raises InvalidJSONError when requests.put returns a FakeResponse.

        The original also passed ``side_effects=put`` to ``patch.object``.
        That keyword is a misspelling of ``side_effect``, so it only set an
        unused attribute on the mock and the local ``put`` helper never
        ran. Both are removed; the mock behaves exactly as before,
        returning ``response``.
        """
        test_data = {'test': 'test'}

        response = FakeResponse()

        with patch.object(requests, 'put', return_value=response) as mock_put:
            test_path = 'test_path'

            client = Client(TEST_API_URL, AUTH_HEADERS)
            with self.assertRaises(InvalidJSONError):
                client.put(test_path, data=test_data)

        self.assertTrue(mock_put.called)
        expected_url_call = TEST_API_URL + '/' + test_path
        mock_put.assert_called_with(expected_url_call, data=test_data, headers=AUTH_HEADERS, auth=None)
项目:pythonrc    作者:lonetwin    | 项目源码 | 文件源码
def test_pprint_compact(self):
        """The display hook wraps list output according to the reported terminal size."""
        with patch('sys.stdout', new_callable=StringIO):
            check_output = (pythonrc.subprocess, 'check_output')

            # check_output reports '25 80': 22 items end on one line,
            # the 23rd item wraps onto the next line.
            with patch.object(*check_output, return_value='25 80'):
                sys.displayhook(list(range(22)))
                self.assertIn('20, 21]', sys.stdout.getvalue())
                sys.displayhook(list(range(23)))
                self.assertIn('21,\n 22]', sys.stdout.getvalue())

            # check_output reports '25 100': 23 items now end on one line.
            with patch.object(*check_output, return_value='25 100'):
                sys.displayhook(list(range(23)))
                self.assertIn('21, 22]', sys.stdout.getvalue())
项目:pythonrc    作者:lonetwin    | 项目源码 | 文件源码
def test_completer(self):
        """Exercise the improved completer against several line-buffer contents."""
        complete = self.pymp.improved_rlcompleter()
        readline = pythonrc.readline

        def line_buffer(text):
            # Patch readline.get_line_buffer to report `text`.
            return patch.object(readline, 'get_line_buffer', return_value=text)

        # - no leading characters
        with line_buffer('\t'):
            self.assertEqual(complete('\t', 0), '    ')

        # - keyword completion
        with line_buffer('imp\t'):
            self.assertEqual(complete('imp', 0), 'import ')

        # - module name completion
        with line_buffer('from '):
            candidates = ('this', 'threading')
            self.assertIn(complete('th', 0), candidates)
            self.assertIn(complete('th', 1), candidates)

        # - pathname completion
        with line_buffer('./p'):
            self.assertEqual(complete('./py', 0), './pythonrc.py')
项目:CPU-Manager-for-Kubernetes    作者:Intel-Corp    | 项目源码 | 文件源码
def test_remove_all_report_tpr_success(caplog):
    """remove_all_report logs removal of both third-party-resource reports."""
    report = MagicMock()
    report.remove.return_value = 0

    with patch('kubernetes.config.load_incluster_config',
               MagicMock(return_value=0)), \
            patch('kubernetes.client.ExtensionsV1beta1Api',
                  MagicMock(return_value=0)), \
            patch.object(third_party.ThirdPartyResourceType, 'create',
                         MagicMock(return_value=report)):
        uninstall.remove_all_report()
        records = caplog.record_tuples

        # Index 2 of a record tuple is the log message.
        reconcile_msg = '"Reconcilereport" for node "{}" removed.'
        assert records[-1][2] == reconcile_msg.format(os.getenv("NODE_NAME"))
        node_msg = '"Nodereport" for node "{}" removed.'
        assert records[-3][2] == node_msg.format(os.getenv("NODE_NAME"))
项目:CPU-Manager-for-Kubernetes    作者:Intel-Corp    | 项目源码 | 文件源码
def test_remove_all_report_crd_success(caplog):
    """remove_all_report logs removal of both custom-resource reports."""
    report = MagicMock()
    report.remove.return_value = 0

    with patch('kubernetes.config.load_incluster_config',
               MagicMock(return_value=0)), \
            patch('kubernetes.client.ExtensionsV1beta1Api',
                  MagicMock(return_value=0)), \
            patch.object(custom_resource.CustomResourceDefinitionType, 'create',
                         MagicMock(return_value=report)):
        uninstall.remove_all_report()
        records = caplog.record_tuples

        # Index 2 of a record tuple is the log message.
        reconcile_msg = '"cmk-reconcilereport" for node "{}" removed.'
        assert records[-1][2] == reconcile_msg.format(os.getenv("NODE_NAME"))
        node_msg = '"cmk-nodereport" for node "{}" removed.'
        assert records[-3][2] == node_msg.format(os.getenv("NODE_NAME"))
项目:CPU-Manager-for-Kubernetes    作者:Intel-Corp    | 项目源码 | 文件源码
def test_remove_report_tpr_success(caplog):
    """remove_report_tpr logs that the NodeReport was removed."""
    report = MagicMock()
    report.remove.return_value = 0

    with patch('kubernetes.config.load_incluster_config',
               MagicMock(return_value=0)), \
            patch('kubernetes.client.ExtensionsV1beta1Api',
                  MagicMock(return_value=0)), \
            patch.object(third_party.ThirdPartyResourceType, 'create',
                         MagicMock(return_value=report)):
        uninstall.remove_report_tpr("NodeReport")
        records = caplog.record_tuples

        # The last log message reports the removal.
        removed_msg = '"NodeReport" for node "{}" removed.'
        assert records[-1][2] == removed_msg.format(os.getenv("NODE_NAME"))


# Remove success due to not existing report
项目:CPU-Manager-for-Kubernetes    作者:Intel-Corp    | 项目源码 | 文件源码
def test_remove_report_tpr_success2(caplog):
    """A NotFound API error on remove is logged as 'does not exist', then 'removed'."""
    http_resp = FakeHTTPResponse(500, '{"message":"fake message"}',
                                 '{"reason":"NotFound"}')
    api_error = K8sApiException(http_resp=http_resp)

    report = MagicMock()
    report.remove.side_effect = api_error

    with patch('kubernetes.config.load_incluster_config',
               MagicMock(return_value=0)), \
            patch('kubernetes.client.ExtensionsV1beta1Api',
                  MagicMock(return_value=0)), \
            patch.object(third_party.ThirdPartyResourceType, 'create',
                         MagicMock(return_value=report)):
        uninstall.remove_report_tpr("NodeReport")
        records = caplog.record_tuples

        # Second-to-last message: the report did not exist.
        missing_msg = '"NodeReport" for node "{}" does not exist.'
        assert records[-2][2] == missing_msg.format(os.getenv("NODE_NAME"))
        # Last message: removal is still reported.
        removed_msg = '"NodeReport" for node "{}" removed.'
        assert records[-1][2] == removed_msg.format(os.getenv("NODE_NAME"))
项目:CPU-Manager-for-Kubernetes    作者:Intel-Corp    | 项目源码 | 文件源码
def test_remove_report_tpr_failure(caplog):
    """An API error with another reason aborts the uninstall with SystemExit."""
    http_resp = FakeHTTPResponse(500, '{"message":"fake message"}',
                                 '{"reason":"WrongReason"}')
    api_error = K8sApiException(http_resp=http_resp)

    report = MagicMock()
    report.remove.side_effect = api_error

    with patch('kubernetes.config.load_incluster_config',
               MagicMock(return_value=0)), \
            patch('kubernetes.client.ExtensionsV1beta1Api',
                  MagicMock(return_value=0)), \
            patch.object(third_party.ThirdPartyResourceType, 'create',
                         MagicMock(return_value=report)):
        with pytest.raises(SystemExit):
            uninstall.remove_report_tpr("NodeReport")
        records = caplog.record_tuples

        # The last log message is the formatted abort error.
        abort_msg = ('Aborting uninstall: '
                     'Exception when removing third party resource "NodeReport"')
        expected_log = get_expected_log_error(abort_msg, http_resp)
        assert records[-1][2] == expected_log
项目:CPU-Manager-for-Kubernetes    作者:Intel-Corp    | 项目源码 | 文件源码
def test_remove_report_crd_success2(caplog):
    """Check that a ``NotFound`` API error does not abort CRD report removal.

    The faked resource's ``remove`` raises a ``K8sApiException`` whose reason
    is ``NotFound``. ``uninstall.remove_report_crd`` is expected to return
    normally, and the final two log records must be the "does not exist" and
    "removed" messages for the current ``NODE_NAME``.
    """
    http_resp = FakeHTTPResponse(500, '{"message":"fake message"}',
                                 '{"reason":"NotFound"}')

    crd_report = MagicMock()
    crd_report.remove.side_effect = K8sApiException(http_resp=http_resp)

    # Build the patchers first so the ``with`` header stays on one line.
    config_patch = patch('kubernetes.config.load_incluster_config',
                         MagicMock(return_value=0))
    api_patch = patch('kubernetes.client.ExtensionsV1beta1Api',
                      MagicMock(return_value=0))
    create_patch = patch.object(custom_resource.CustomResourceDefinitionType,
                                'create',
                                MagicMock(return_value=crd_report))

    with config_patch, api_patch, create_patch:
        uninstall.remove_report_crd("cmk-nodereport", ["cmk-nr"])
        records = caplog.record_tuples
        node_name = os.getenv("NODE_NAME")
        # Index 2 of a record tuple is the formatted log message.
        missing_msg = '"cmk-nodereport" for node "{}" does not exist.'
        removed_msg = '"cmk-nodereport" for node "{}" removed.'
        assert records[-2][2] == missing_msg.format(node_name)
        assert records[-1][2] == removed_msg.format(node_name)
项目:CPU-Manager-for-Kubernetes    作者:Intel-Corp    | 项目源码 | 文件源码
def test_remove_report_crd_failure(caplog):
    """Check that an unexpected API error aborts CRD report removal.

    The faked resource's ``remove`` raises a ``K8sApiException`` built from a
    response whose reason is ``WrongReason``. ``uninstall.remove_report_crd``
    must then raise ``SystemExit`` and the last captured log record must carry
    the "Aborting uninstall" message.
    """
    http_resp = FakeHTTPResponse(500, '{"message":"fake message"}',
                                 '{"reason":"WrongReason"}')

    crd_report = MagicMock()
    crd_report.remove.side_effect = K8sApiException(http_resp=http_resp)

    # Build the patchers first so the ``with`` header stays on one line.
    config_patch = patch('kubernetes.config.load_incluster_config',
                         MagicMock(return_value=0))
    api_patch = patch('kubernetes.client.ExtensionsV1beta1Api',
                      MagicMock(return_value=0))
    create_patch = patch.object(custom_resource.CustomResourceDefinitionType,
                                'create',
                                MagicMock(return_value=crd_report))

    with config_patch, api_patch, create_patch:
        with pytest.raises(SystemExit):
            uninstall.remove_report_crd("cmk-nodereport", ["cmk-nr"])
        records = caplog.record_tuples
        expected = get_expected_log_error(
            'Aborting uninstall: '
            'Exception when removing custom resource definition '
            '"cmk-nodereport"',
            http_resp)
        # Index 2 of a record tuple is the formatted log message.
        assert records[-1][2] == expected
项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
def mock_crypto_passthrough():
    """Generator that swaps the crypto primitives for transparent fakes.

    While suspended at its single ``yield``: RSA/AES ``encrypt`` and
    ``decrypt`` return their input unchanged, ``verify`` returns ``None``,
    ``sign`` returns a fixed marker string, ``export`` returns a marker that
    embeds the password, and ``parsec.crypto.urandom`` returns counter-based
    byte strings. All patches are undone when the generator is resumed.

    NOTE(review): presumably wrapped as a fixture or context manager by a
    decorator that is not visible in this snippet.
    """
    calls = 0

    def fake_urandom(size):
        # Embed the running call number, left-padded with zeros.
        nonlocal calls
        calls += 1
        if size <= 16:
            pattern = '{:0>%s}' % size
        else:
            # '<dummy-key-' plus '>' is 12 characters; the counter is padded
            # to the remaining width.
            pattern = '<dummy-key-{:0>%s}>' % (size - 12)
        return pattern.format(calls).encode()

    rsa_encrypt = patch.object(RSAPublicKey, 'encrypt', new=lambda _, txt: txt)
    rsa_verify = patch.object(RSAPublicKey, 'verify', new=lambda _, sign, txt: None)
    rsa_decrypt = patch.object(RSAPrivateKey, 'decrypt', new=lambda _, txt: txt)
    rsa_sign = patch.object(RSAPrivateKey, 'sign', new=lambda _, txt: '<mock-signature>')
    rsa_export = patch.object(
        RSAPrivateKey, 'export',
        new=lambda _, pwd: ('<mock-exported-key with password %s>' % pwd).encode())
    aes_encrypt = patch.object(AESKey, 'encrypt', new=lambda _, txt: txt)
    aes_decrypt = patch.object(AESKey, 'decrypt', new=lambda _, txt: txt)
    urandom = patch('parsec.crypto.urandom', new=fake_urandom)
    pwd_encrypt = patch('parsec.crypto.encrypt_with_password', new=lambda p, s, t: t)
    pwd_decrypt = patch('parsec.crypto.decrypt_with_password', new=lambda p, s, c: c)

    # Entered left to right, exited in reverse, same as the chained form.
    with rsa_encrypt, rsa_verify, rsa_decrypt, rsa_sign, rsa_export, \
            aes_encrypt, aes_decrypt, urandom, pwd_encrypt, pwd_decrypt:
        yield
项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
def mock_crypto_passthrough():
    """Generator that replaces crypto operations with pass-through stubs.

    For as long as the generator is suspended at ``yield``, the RSA and AES
    key methods and the ``parsec.crypto`` helpers listed below are patched:
    encryption/decryption echo their payload, ``verify`` returns ``None``,
    ``sign`` and ``export`` return marker values, and ``urandom`` produces
    byte strings derived from a call counter.
    """
    count = 0

    def mocked_urandom(size):
        nonlocal count
        count += 1
        # Sizes above 16 get a '<dummy-key-…>' wrapper (12 characters of
        # decoration); smaller sizes are just the zero-padded counter.
        template = ('<dummy-key-{:0>%s}>' % (size - 12) if size > 16
                    else '{:0>%s}' % size)
        return template.format(count).encode()

    # RSA key methods.
    with patch.object(RSAPublicKey, 'encrypt', new=lambda _, txt: txt), \
            patch.object(RSAPublicKey, 'verify', new=lambda _, sign, txt: None), \
            patch.object(RSAPrivateKey, 'decrypt', new=lambda _, txt: txt), \
            patch.object(RSAPrivateKey, 'sign', new=lambda _, txt: '<mock-signature>'), \
            patch.object(
                RSAPrivateKey, 'export',
                new=lambda _, pwd: ('<mock-exported-key with password %s>' % pwd).encode()):
        # AES key methods.
        with patch.object(AESKey, 'encrypt', new=lambda _, txt: txt), \
                patch.object(AESKey, 'decrypt', new=lambda _, txt: txt):
            # Module-level helpers in parsec.crypto.
            with patch('parsec.crypto.urandom', new=mocked_urandom), \
                    patch('parsec.crypto.encrypt_with_password', new=lambda p, s, t: t), \
                    patch('parsec.crypto.decrypt_with_password', new=lambda p, s, c: c):
                yield
项目:pypyr-cli    作者:pypyr    | 项目源码 | 文件源码
def test_pype_use_parent_context_with_swallow(mock_run_pipeline):
    """With raiseError False, pype logs the child pipeline error.

    NOTE(review): ``mock_run_pipeline`` is injected from outside this snippet;
    presumably it is configured to raise ``RuntimeError('whoops')``, which is
    what the expected log message below reflects.
    """
    pype_settings = {
        'name': 'pipe name',
        'pipeArg': 'argument here',
        'useParentContext': True,
        'skipParse': True,
        'raiseError': False
    }
    context = Context({'pype': pype_settings})

    pype_logger = logging.getLogger('pypyr.steps.pype')
    with patch.object(pype_logger, 'error') as mock_logger_error:
        pype.run_step(context)

    # The child pipeline must have been invoked exactly once with the parent
    # context and with input parsing switched off.
    expected_kwargs = dict(pipeline_name='pipe name',
                           pipeline_context_input='argument here',
                           context=context,
                           parse_input=False)
    mock_run_pipeline.assert_called_once_with(**expected_kwargs)

    expected_message = (
        'Something went wrong pyping pipe name. RuntimeError: whoops')
    mock_logger_error.assert_called_once_with(expected_message)
    # ------------------------ run_step --------------------------------------
项目:pypyr-cli    作者:pypyr    | 项目源码 | 文件源码
def test_run_failure_step_group_swallows():
    """run_failure_step_group logs, and does not re-raise, a handler error.

    ``run_step_group`` is patched to raise ``ContextError``; the call under
    test must return normally, log the "also failed" message, and have asked
    for the ``on_failure`` step group exactly once.
    """
    runner_logger = pypyr.log.logger.get_logger('pypyr.stepsrunner')

    with patch('pypyr.stepsrunner.run_step_group') as mock_run_group, \
            patch.object(runner_logger, 'error') as mock_logger_error:
        mock_run_group.side_effect = ContextError('arb error')
        pypyr.stepsrunner.run_failure_step_group({'pipe': 'val'}, Context())

    # The mocks keep their call history after the patches are undone.
    mock_logger_error.assert_any_call(
        "Failure handler also failed. Swallowing.")

    # Fresh literals on purpose: the expectation is compared by equality, not
    # against the very objects that were passed in.
    mock_run_group.assert_called_once_with(
        pipeline_definition={'pipe': 'val'},
        step_group_name='on_failure',
        context=Context())

# ------------------------- run_failure_step_group----------------------------#

# ------------------------- run_pipeline_step---------------------------------#
项目:changelog-cli    作者:mc706    | 项目源码 | 文件源码
def test_get_current_version(self):
        """get_current_version returns '0.3.2' for the sample changelog.

        The sample has an empty "Unreleased" section followed by a single
        released heading, ``## 0.3.2 - (2017-06-09)``.
        """
        # One list entry per line, each keeping its trailing newline.
        changelog_lines = (
            "## Unreleased\n"
            "---\n"
            "\n"
            "### New\n"
            "\n"
            "### Fixes\n"
            "\n"
            "### Breaks\n"
            "\n"
            "\n"
            "## 0.3.2 - (2017-06-09)\n"
            "---\n"
        ).splitlines(keepends=True)
        with patch.object(ChangelogUtils, 'get_changelog_data', return_value=changelog_lines):
            utils = ChangelogUtils()
            version = utils.get_current_version()
        self.assertEqual(version, '0.3.2')
项目:changelog-cli    作者:mc706    | 项目源码 | 文件源码
def test_get_changes(self):
        """get_changes reports both 'new' and 'fix' for the sample changelog.

        The sample's "Unreleased" section has one entry under "### New" and
        one under "### Fixes"; "### Breaks" is empty.
        """
        sample_data = [
            "## Unreleased\n",
            "---\n",
            "\n",
            "### New\n",
            "* added feature x\n",
            "\n",
            "### Fixes\n",
            "* fixed bug 1\n",
            "\n",
            "### Breaks\n",
            "\n",
            "\n",
            "## 0.3.2 - (2017-06-09)\n",
            "---\n",
        ]
        # The mock itself is never inspected, so it is not bound to a name.
        with patch.object(ChangelogUtils, 'get_changelog_data', return_value=sample_data):
            changelog = ChangelogUtils()
            result = changelog.get_changes()
        # assertIn shows the container on failure, unlike assertTrue(x in y),
        # which only reports "False is not true".
        self.assertIn('new', result)
        self.assertIn('fix', result)
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def doctest_reset_del():
    """Test that resetting doesn't cause errors in __del__ methods.

    In [2]: class A(object):
       ...:     def __del__(self):
       ...:         print str("Hi")
       ...:

    In [3]: a = A()

    In [4]: get_ipython().reset()
    Hi

    In [5]: 1+1
    Out[5]: 2
    """
    # NOTE(review): this function has no executable statements; the docstring
    # above is the entire test. The ``In [n]:`` / ``Out[n]:`` prompts look like
    # an IPython-style doctest, presumably collected by an IPython-aware
    # doctest runner -- confirm before editing, because changing the docstring
    # text changes what gets executed and compared.

# For some tests, it will be handy to organize them in a class with a common
# setup that makes a temp file
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def test_obj_del(self):
        """Run a script whose object defines __del__ and expect its message.

        The script is written to a temp file via ``self.mktmp`` and run
        through ``tt.ipexec_validate``, with 'object A deleted' as the
        expected output.
        """
        if sys.platform == 'win32':
            try:
                # Imported only as an availability probe; the name is unused.
                import win32api
            except ImportError:
                raise SkipTest("Test requires pywin32")
        script_lines = [
            "class A(object):",
            "    def __del__(self):",
            "        print 'object A deleted'",
            "a = A()",
        ]
        src = "\n".join(script_lines) + "\n"
        self.mktmp(py3compat.doctest_refactor_print(src))
        # Without sqlite3 the history warning is the expected error text.
        err = (
            'WARNING: IPython History requires SQLite, your history will not be saved\n'
            if dec.module_not_available('sqlite3')
            else None
        )
        tt.ipexec_validate(self.fname, 'object A deleted', err)
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def test_tclass(self):
        """Run the 'tclass' script three times and check the output.

        A temporary ``.ipy`` file issues three ``%run`` commands against the
        ``tclass`` script next to this file. The expected output interleaves
        each run's ARGV line with "deleting object" lines.
        """
        tc = os.path.join(os.path.dirname(__file__), 'tclass')
        src = "".join(
            "%run '{}' C-{}\n".format(tc, tag)
            for tag in ("first", "second", "third"))
        self.mktmp(src, '.ipy')
        out = (
            "ARGV 1-: ['C-first']\n"
            "ARGV 1-: ['C-second']\n"
            "tclass.py: deleting object: C-first\n"
            "ARGV 1-: ['C-third']\n"
            "tclass.py: deleting object: C-second\n"
            "tclass.py: deleting object: C-third\n"
        )
        # Without sqlite3 the history warning is the expected error text.
        err = (
            'WARNING: IPython History requires SQLite, your history will not be saved\n'
            if dec.module_not_available('sqlite3')
            else None
        )
        tt.ipexec_validate(self.fname, out, err)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_get_content_ids__returns_cached_ids_if_enough_in_cache(self, mock_queryset):
        """get_content_ids never calls get_queryset when the cache fills a page.

        With ``paginate_by`` set to 1 and one cached id returned, the
        stream's ``get_queryset`` must not be called.
        """
        stream = FollowedStream(user=self.user)
        stream.paginate_by = 1
        queryset_patch = patch.object(stream, "get_queryset")
        cached_patch = patch.object(stream, "get_cached_content_ids")
        # The instance-level mock gets its own name so the injected
        # ``mock_queryset`` argument is not shadowed.
        with queryset_patch as stream_queryset, cached_patch as stream_cached:
            stream_cached.return_value = [self.content1.id], {self.content1.id: self.content1.id}
            stream.get_content_ids()
            self.assertEqual(stream_queryset.call_count, 0)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_get_content_ids__uses_cached_ids(self):
        """get_content_ids calls get_cached_content_ids once, with no arguments."""
        empty_cache = ([], {})
        cache_patch = patch.object(self.stream, "get_cached_content_ids", return_value=empty_cache)
        with cache_patch as mock_cached:
            self.stream.get_content_ids()
            mock_cached.assert_called_once_with()