Python requests_mock 模块,Adapter() 实例源码

我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用requests_mock.Adapter()

项目:dd-trace-py    作者:DataDog    | 项目源码 | 文件源码
def test_propagation_true(self):
        adapter = Adapter()
        tracer, session = get_traced_session()
        session.mount('mock', adapter)
        session.distributed_tracing = True

        with tracer.trace('root') as root:
            def matcher(request):
                return self.headers_here(tracer, request, root)
            adapter.register_uri('GET', 'mock://datadog/foo', additional_matcher=matcher, text='bar')
            resp = session.get('mock://datadog/foo')
            eq_(200, resp.status_code)
            eq_('bar', resp.text)

        spans = tracer.writer.spans
        root, req = spans
        eq_('root', root.name)
        eq_('requests.request', req.name)
        eq_(root.trace_id, req.trace_id)
        eq_(root.span_id, req.parent_id)
项目:aos-pyez    作者:Apstra    | 项目源码 | 文件源码
def setUp(self):

        self.adapter = requests_mock.Adapter()
        self.aos = Session(server=Config.test_server)
        self.aos.api.requests.mount('https://%s' % Config.test_server, self.adapter)

        def get_api_version(request, context):
            context.status_code = 200
            return dict(version=Config.test_server_api_version)

        self.adapter.register_uri('GET', '/api/versions/api', json=get_api_version)

        def get_build_version(request, context):
            context.status_code = 200
            return dict(version=Config.test_server_build_version)

        self.adapter.register_uri('GET', '/api/version', json=get_build_version)

        self.adapter.register_uri(
            'POST', '/api/user/login', json=dict(token=Config.test_auth_token))

        # generally enable the probe step to pass
        patch.object(self.aos.api, 'probe', return_value=True).start()
项目:python-lunrclient    作者:rackerlabs    | 项目源码 | 文件源码
def test_list(self):
        client = StorageClient("mock://")
        adapter = Adapter()

        # should probably figure out a better way to do this
        client.volumes.session.mount('mock', adapter)

        adapter.register_uri('GET', 'mock:///volumes', text=dumps([{
                "export": {},
                "id": "thrawn",
                "name": "thrawn",
                "origin": "",
                "path": "/dev/lunr-volume/thrawn",
                "size": 12582912
            }])
        )

        # List all the volumes on a storage node
        volumes = client.volumes.list()

        # Asserts
        self.assertIn('export', volumes[0])
        self.assertIn('id', volumes[0])
        self.assertIn('origin', volumes[0])
        self.assertIn('path', volumes[0])
        self.assertIn('size', volumes[0])
项目:dd-trace-py    作者:DataDog    | 项目源码 | 文件源码
def test_propagation_false(self):
        adapter = Adapter()
        tracer, session = get_traced_session()
        session.mount('mock', adapter)
        session.distributed_tracing = False

        with tracer.trace('root'):
            def matcher(request):
                return self.headers_not_here(tracer, request)
            adapter.register_uri('GET', 'mock://datadog/foo', additional_matcher=matcher, text='bar')
            resp = session.get('mock://datadog/foo')
            eq_(200, resp.status_code)
            eq_('bar', resp.text)
项目:aos-pyez    作者:Apstra    | 项目源码 | 文件源码
def setUp(self):
        self.adapter = requests_mock.Adapter()
        self.aos = Session(server=Config.test_server)
        self.aos.api.requests.mount('https://%s' % Config.test_server, self.adapter)

        self.adapter.register_uri(
            'GET', '/api/versions/api',
            json=dict(version=Config.test_server_version))

        self.adapter.register_uri(
            'POST', '/api/user/login', json=dict(token=Config.test_auth_token))

        # generally enable the probe step to pass
        patch.object(self.aos.api, 'probe', return_value=True).start()
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def setUp(self):
        super(TestDruidHook, self).setUp()

        configuration.load_test_config()
        args = {
            'owner': 'airflow',
            'start_date': '2017-01-01'
        }
        self.dag = DAG('hive_to_druid', default_args=args)

        session = requests.Session()
        adapter = requests_mock.Adapter()
        session.mount('mock', adapter)
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def setUp(self):
        super(TestDruidHook, self).setUp()

        session = requests.Session()
        adapter = requests_mock.Adapter()
        session.mount('mock', adapter)
项目:electricitymap    作者:tmrowco    | 项目源码 | 文件源码
def setUp(self):
        self.session = Session()
        self.adapter = Adapter()
        self.session.mount('http://', self.adapter)
        response_text = resource_string("parsers.test.mocks", "IN_AP.html")
        self.adapter.register_uri(ANY, ANY, text=response_text)
项目:electricitymap    作者:tmrowco    | 项目源码 | 文件源码
def setUp(self):
        self.session = Session()
        self.adapter = Adapter()
        self.session.mount('http://', self.adapter)
项目:electricitymap    作者:tmrowco    | 项目源码 | 文件源码
def setUp(self):
        self.session = Session()
        self.adapter = Adapter()
        self.session.mount('https://', self.adapter)
项目:electricitymap    作者:tmrowco    | 项目源码 | 文件源码
def setUp(self):
        self.session = Session()
        self.adapter = Adapter()
        self.session.mount('http://', self.adapter)
项目:pynso    作者:DimensionDataResearch    | 项目源码 | 文件源码
def setUp(self):
        self.connection = NSOConnection('test.com', 'test', 'test', False)
        self.adapter = requests_mock.Adapter()
        self.connection.session.mount('http://test.com/', self.adapter)
项目:skywise-insight-py    作者:wdtinc    | 项目源码 | 文件源码
def setUp(self):
        InsightResource.set_site('http://my.skywise.host')
        InsightResource.set_user('my-skywise-user')
        InsightResource.set_password('my-skywise-password')
        InsightResource.set_use_session_for_async(True)

        self.adapter = requests_mock.Adapter()
        session = InsightResource.get_session()
        session.mount('http://my.skywise.host', self.adapter)
项目:postcard_creator_wrapper    作者:abertschi    | 项目源码 | 文件源码
def create_token():
    global adapter_token
    adapter_token = requests_mock.Adapter()
    Token._create_session = create_mocked_session
    return Token(_protocol='mock://')
项目:postcard_creator_wrapper    作者:abertschi    | 项目源码 | 文件源码
def create_postcard_creator():
    global adapter_pcc
    adapter_pcc = requests_mock.Adapter()
    PostcardCreator._create_session = create_mocked_session
    token = create_token()
    token.token_expires_in = 3600
    token.token_type = 'Bearer'
    token.token = 0
    return PostcardCreator(token=token, _protocol='mock://')
项目:Python-Microservices-Development    作者:PacktPublishing    | 项目源码 | 文件源码
def setUp(self):
        self.app = TestApp(app)
        # mocking the request calls
        session = get_connector(app)
        self.adapter = requests_mock.Adapter()
        session.mount('http://', self.adapter)