Python httplib2 模块,ProxyInfo() 实例源码

我们从Python开源项目中,提取了以下29个代码示例,用于说明如何使用httplib2.ProxyInfo()

项目:splunk_ta_ps4_f1_2016    作者:jonathanvarley    | 项目源码 | 文件源码
def _prepare_proxy_info(self, proxy):
        """Translate a proxy settings object into a ``ProxyInfo``.

        Returns ``None`` (after a debug log) when ``proxy`` is falsy or its
        ``enabled`` attribute is falsy. Otherwise host, port (coerced with
        ``int``), type, credentials and the ``rdns`` flag are forwarded to
        ``ProxyInfo``. Missing or empty credentials are passed as ``None``;
        a ``proxy.type`` without a truthy entry in ``self._PROXY_TYPE``
        falls back to ``self._PROXY_TYPE['http']``.
        """
        if not (proxy and proxy.enabled):
            _logger.debug('Proxy is not enabled')
            return None

        # Credentials are optional: use them only when the key is present
        # and the stored value is non-empty.
        username = None
        if 'username' in proxy and proxy.username:
            username = proxy.username

        password = None
        if 'password' in proxy and proxy.password:
            password = proxy.password

        proxy_type = self._PROXY_TYPE.get(proxy.type)
        if not proxy_type:
            proxy_type = self._PROXY_TYPE['http']

        return ProxyInfo(
            proxy_host=proxy.host,
            proxy_port=int(proxy.port),
            proxy_type=proxy_type,
            proxy_user=username,
            proxy_pass=password,
            proxy_rdns=proxy.rdns,
        )
项目:TA-SyncKVStore    作者:georgestarcher    | 项目源码 | 文件源码
def _prepare_proxy_info(self, proxy):
        """Build the ``ProxyInfo`` for ``proxy``, or ``None`` if it is disabled.

        A falsy ``proxy`` or a falsy ``proxy.enabled`` logs a debug message
        and yields ``None``. Username and password are forwarded only when
        present and non-empty (otherwise ``None``). The proxy type is looked
        up in ``self._PROXY_TYPE`` and defaults to its ``'http'`` entry.
        """
        if not (proxy and proxy.enabled):
            _logger.debug('Proxy is not enabled')
            return None

        # Optional credentials: absent key or empty value both mean "none".
        username = None
        if 'username' in proxy and proxy.username:
            username = proxy.username

        password = None
        if 'password' in proxy and proxy.password:
            password = proxy.password

        proxy_type = self._PROXY_TYPE.get(proxy.type)
        if not proxy_type:
            proxy_type = self._PROXY_TYPE['http']

        return ProxyInfo(
            proxy_host=proxy.host,
            proxy_port=int(proxy.port),
            proxy_type=proxy_type,
            proxy_user=username,
            proxy_pass=password,
            proxy_rdns=proxy.rdns,
        )
项目:cb-defense-splunk-app    作者:carbonblack    | 项目源码 | 文件源码
def _prepare_proxy_info(self, proxy):
        """Return a ``ProxyInfo`` describing ``proxy``, or ``None`` when unused.

        ``None`` is returned (with a debug log entry) if ``proxy`` is falsy
        or not enabled. Credentials that are missing or empty are passed as
        ``None``. The port is converted with ``int``; the type comes from
        ``self._PROXY_TYPE`` with ``'http'`` as the fallback entry.
        """
        if not (proxy and proxy.enabled):
            _logger.debug('Proxy is not enabled')
            return None

        # Only forward credentials that are both present and non-empty.
        username = None
        if 'username' in proxy and proxy.username:
            username = proxy.username

        password = None
        if 'password' in proxy and proxy.password:
            password = proxy.password

        proxy_type = self._PROXY_TYPE.get(proxy.type)
        if not proxy_type:
            proxy_type = self._PROXY_TYPE['http']

        return ProxyInfo(
            proxy_host=proxy.host,
            proxy_port=int(proxy.port),
            proxy_type=proxy_type,
            proxy_user=username,
            proxy_pass=password,
            proxy_rdns=proxy.rdns,
        )
项目:TumblrVideos    作者:moedje    | 项目源码 | 文件源码
def __init__(self, consumer_key, consumer_secret="", oauth_token="", oauth_secret="", host="https://api.tumblr.com",
                 proxy_url=None):
        """Store OAuth credentials and work out which proxy (if any) to use.

        Args:
            consumer_key: OAuth consumer key.
            consumer_secret: OAuth consumer secret.
            oauth_token: OAuth token key.
            oauth_secret: OAuth token secret.
            host: Base URL stored on ``self.host``.
            proxy_url: Optional proxy address without a scheme; it is
                prefixed with ``"https://"`` before being parsed. When
                falsy, the ``HTTPS_PROXY`` environment variable is used
                instead, and ``self.proxy_info`` is ``None`` if that is
                unset or empty too.
        """
        self.host = host
        self.consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
        self.token = oauth.Token(key=oauth_token, secret=oauth_secret)
        self.proxy_url = proxy_url

        if proxy_url:
            # Explicit proxy wins over the environment.
            print("Generating Proxy From proxy_url")
            self.proxy_info = httplib2.proxy_info_from_url("https://" + proxy_url, 'http')
            self.proxy_info.proxy_rdns = True
            return

        print("Generating proxy from ENV")
        env_proxy = os.environ.get('HTTPS_PROXY')
        if not env_proxy:
            self.proxy_info = None
            return

        parsed = urlparse(env_proxy)
        self.proxy_info = ProxyInfo(socks.PROXY_TYPE_HTTP, parsed.hostname, parsed.port, proxy_rdns=True)
项目:iris-relay    作者:linkedin    | 项目源码 | 文件源码
def __init__(self, config=None, proxy_config=None):
        """Set up the HTTP client and the on-disk history-id state.

        Args:
            config: Mapping that is read for ``'var_dir'``, the directory
                holding the history-id file (created if missing).
            proxy_config: Optional mapping; a proxy is used only when it
                contains both ``'host'`` and ``'port'``.
        """
        self.config = config
        self.client = None

        proxy_info = None
        if proxy_config and 'host' in proxy_config and 'port' in proxy_config:
            proxy_info = ProxyInfo(socks.PROXY_TYPE_HTTP_NO_TUNNEL,
                                   proxy_config['host'], proxy_config['port'])
        self.http = Http(proxy_info=proxy_info)

        self.var_dir = self.config['var_dir']
        if not exists(self.var_dir):
            makedirs(self.var_dir)

        self.history_id_f = join(self.var_dir, 'gmail_last_history_id')
        if not exists(self.history_id_f):
            # store an invalid id, which will get renewed on next push event
            self.save_last_history_id('1')
            return

        # The stored id is only read for logging here; it is not kept.
        with open(self.history_id_f) as fh:
            logger.info('Loaded last gmail history id %d', int(fh.read()))
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def testSimpleProxy(self):
        """Fetch ``miniserver.py`` through the local HTTP proxy and verify it.

        Asserts a 200 status, that the body equals the file on disk, and
        that the proxy log file records the connection.
        """
        proxy_info = httplib2.ProxyInfo(socks.PROXY_TYPE_HTTP,
                                        'localhost', self.proxyport)
        client = httplib2.Http(proxy_info=proxy_info)
        src = 'miniserver.py'
        response, body = client.request('http://localhost:%d/%s' %
                                        (self.port, src))
        self.assertEqual(response.status, 200)
        # Context managers close the file handles promptly instead of
        # leaving them open until garbage collection.
        with open(os.path.join(miniserver.HERE, src)) as served:
            expected_body = served.read()
        self.assertEqual(body, expected_body)
        with open(self.logfile) as log_file:
            lf = log_file.read()
        expect = ('Established connection to host "127.0.0.1" '
                  'using file descriptor')
        self.assertTrue(expect in lf,
                        'tinyproxy did not proxy a request for miniserver')
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def testProxyDisabled(self):
        """Requests through a ``ProxyInfo`` of type ``'blah'`` must raise
        ``httplib2.ProxiesUnavailableError``."""
        bogus_proxy = httplib2.ProxyInfo('blah', 'localhost', 0)
        http = httplib2.Http(proxy_info=bogus_proxy)
        with self.assertRaises(httplib2.ProxiesUnavailableError):
            http.request('http://localhost:-1/')
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def test_proxy_headers(self):
        """``ProxyInfo`` exposes the ``proxy_headers`` it was constructed with."""
        headers = {'key0': 'val0', 'key1': 'val1'}
        pi = httplib2.ProxyInfo(httplib2.socks.PROXY_TYPE_HTTP, 'localhost', 1234, proxy_headers=headers)
        # assertEquals is a deprecated alias that was removed in Python 3.12.
        self.assertEqual(pi.proxy_headers, headers)
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def test_proxy_headers(self):
        """Headers given to ``ProxyInfo`` are readable back as ``proxy_headers``."""
        expected = {'key0': 'val0', 'key1': 'val1'}
        info = httplib2.ProxyInfo(
            httplib2.socks.PROXY_TYPE_HTTP,
            'localhost',
            1234,
            proxy_headers=expected,
        )
        self.assertEqual(info.proxy_headers, expected)
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def __init__(self, timeout, proxy=None, cacert=None, sessions=False):
            """Initialise the underlying ``httplib2.Http`` with proxy/TLS options.

            Args:
                timeout: Passed as ``timeout`` when httplib2 is 0.3.0 or newer.
                proxy: Optional mapping of ``httplib2.ProxyInfo`` keyword
                    arguments (expanded with ``**``); the proxy type is
                    always ``socks.PROXY_TYPE_HTTP``.
                cacert: Passed as ``ca_certs`` when httplib2 is 0.7.0 or
                    newer. ``None`` disables certificate validation.
                sessions: Accepted but not used by this method.
            """
            # Set httplib2.debuglevel = 4 here for wire-level debugging.
            kwargs = {}
            if proxy:
                import socks
                kwargs['proxy_info'] = httplib2.ProxyInfo(proxy_type=socks.PROXY_TYPE_HTTP, **proxy)
                log.info("using proxy %s" % proxy)

            # Compare versions numerically. As plain strings '0.10.0' sorts
            # before '0.3.0', which silently dropped the options below on
            # httplib2 0.10 and later.
            import re
            version = tuple(int(num) for num in re.findall(r'\d+', httplib2.__version__)[:3])

            # set optional parameters according to supported httplib2 version
            if version >= (0, 3):
                kwargs['timeout'] = timeout
            if version >= (0, 7):
                # NOTE(review): no cacert means validation is turned off;
                # confirm that callers rely on this.
                kwargs['disable_ssl_certificate_validation'] = cacert is None
                kwargs['ca_certs'] = cacert
            httplib2.Http.__init__(self, **kwargs)
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def __init__(self, timeout, proxy=None, cacert=None, sessions=False):
            """Initialise the underlying ``httplib2.Http`` with proxy/TLS options.

            Args:
                timeout: Passed as ``timeout`` when httplib2 is 0.3.0 or newer.
                proxy: Optional mapping of ``httplib2.ProxyInfo`` keyword
                    arguments (expanded with ``**``); the proxy type is
                    always ``socks.PROXY_TYPE_HTTP``.
                cacert: Passed as ``ca_certs`` when httplib2 is 0.7.0 or
                    newer. ``None`` disables certificate validation.
                sessions: Accepted but not used by this method.
            """
            # Set httplib2.debuglevel = 4 here for wire-level debugging.
            kwargs = {}
            if proxy:
                import socks
                kwargs['proxy_info'] = httplib2.ProxyInfo(proxy_type=socks.PROXY_TYPE_HTTP, **proxy)
                log.info("using proxy %s" % proxy)

            # Compare versions numerically. As plain strings '0.10.0' sorts
            # before '0.3.0', which silently dropped the options below on
            # httplib2 0.10 and later.
            import re
            version = tuple(int(num) for num in re.findall(r'\d+', httplib2.__version__)[:3])

            # set optional parameters according to supported httplib2 version
            if version >= (0, 3):
                kwargs['timeout'] = timeout
            if version >= (0, 7):
                # NOTE(review): no cacert means validation is turned off;
                # confirm that callers rely on this.
                kwargs['disable_ssl_certificate_validation'] = cacert is None
                kwargs['ca_certs'] = cacert
            httplib2.Http.__init__(self, **kwargs)
项目:spc    作者:whbrewer    | 项目源码 | 文件源码
def __init__(self, timeout, proxy=None, cacert=None, sessions=False):
            """Initialise the underlying ``httplib2.Http`` with proxy/TLS options.

            Args:
                timeout: Passed as ``timeout`` when httplib2 is 0.3.0 or newer.
                proxy: Optional mapping of ``httplib2.ProxyInfo`` keyword
                    arguments (expanded with ``**``); the proxy type is
                    always ``socks.PROXY_TYPE_HTTP``.
                cacert: Passed as ``ca_certs`` when httplib2 is 0.7.0 or
                    newer. ``None`` disables certificate validation.
                sessions: Accepted but not used by this method.
            """
            # Set httplib2.debuglevel = 4 here for wire-level debugging.
            kwargs = {}
            if proxy:
                import socks
                kwargs['proxy_info'] = httplib2.ProxyInfo(proxy_type=socks.PROXY_TYPE_HTTP, **proxy)
                log.info("using proxy %s" % proxy)

            # Compare versions numerically. As plain strings '0.10.0' sorts
            # before '0.3.0', which silently dropped the options below on
            # httplib2 0.10 and later.
            import re
            version = tuple(int(num) for num in re.findall(r'\d+', httplib2.__version__)[:3])

            # set optional parameters according to supported httplib2 version
            if version >= (0, 3):
                kwargs['timeout'] = timeout
            if version >= (0, 7):
                # NOTE(review): no cacert means validation is turned off;
                # confirm that callers rely on this.
                kwargs['disable_ssl_certificate_validation'] = cacert is None
                kwargs['ca_certs'] = cacert
            httplib2.Http.__init__(self, **kwargs)
项目:catchup4kodi    作者:catchup4kodi    | 项目源码 | 文件源码
def get_httplib():
    """Create an ``httplib2.Http`` client, proxied when the settings ask for it.

    A proxy is used when the ``'proxy_use'`` setting equals ``'true'``; its
    parameters come from ``get_proxy()``.

    Returns:
        The configured ``httplib2.Http`` instance.

    Raises:
        Exception: Any error raised during setup is logged and re-raised.
    """
    try:
        if __settings__.getSetting('proxy_use') == 'true':
            (proxy_type, proxy_server, proxy_port, proxy_dns, proxy_user, proxy_pass) = get_proxy()
            # Credentials are masked in the log line.
            logging.info("Using proxy: type %i rdns: %i server: %s port: %s user: %s pass: %s", proxy_type, proxy_dns, proxy_server, proxy_port, "***", "***")
            http = httplib2.Http(proxy_info=httplib2.ProxyInfo(proxy_type, proxy_server, proxy_port, proxy_dns, proxy_user, proxy_pass))
        else:
            logging.info("No Proxy\n")
            http = httplib2.Http()
    except Exception:
        # Log before re-raising; previously the log call sat after `raise`
        # and could never run.
        logging.error('Failed to initialize httplib2 module')
        raise

    return http
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def testSimpleProxy(self):
        """Fetch ``miniserver.py`` through the local HTTP proxy and verify it.

        Asserts a 200 status, that the body equals the file on disk, and
        that the proxy log file records the connection.
        """
        proxy_info = httplib2.ProxyInfo(socks.PROXY_TYPE_HTTP,
                                        'localhost', self.proxyport)
        client = httplib2.Http(proxy_info=proxy_info)
        src = 'miniserver.py'
        response, body = client.request('http://localhost:%d/%s' %
                                        (self.port, src))
        self.assertEqual(response.status, 200)
        # Context managers close the file handles promptly instead of
        # leaving them open until garbage collection.
        with open(os.path.join(miniserver.HERE, src)) as served:
            expected_body = served.read()
        self.assertEqual(body, expected_body)
        with open(self.logfile) as log_file:
            lf = log_file.read()
        expect = ('Established connection to host "127.0.0.1" '
                  'using file descriptor')
        self.assertTrue(expect in lf,
                        'tinyproxy did not proxy a request for miniserver')
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def testProxyDisabled(self):
        """Requests through a ``ProxyInfo`` of type ``'blah'`` must raise
        ``httplib2.ProxiesUnavailableError``."""
        bogus_proxy = httplib2.ProxyInfo('blah', 'localhost', 0)
        http = httplib2.Http(proxy_info=bogus_proxy)
        with self.assertRaises(httplib2.ProxiesUnavailableError):
            http.request('http://localhost:-1/')
项目:Problematica-public    作者:TechMaz    | 项目源码 | 文件源码
def __init__(self, timeout, proxy=None, cacert=None, sessions=False):
            """Initialise the underlying ``httplib2.Http`` with proxy/TLS options.

            Args:
                timeout: Passed as ``timeout`` when httplib2 is 0.3.0 or newer.
                proxy: Optional mapping of ``httplib2.ProxyInfo`` keyword
                    arguments (expanded with ``**``); the proxy type is
                    always ``socks.PROXY_TYPE_HTTP``.
                cacert: Passed as ``ca_certs`` when httplib2 is 0.7.0 or
                    newer. ``None`` disables certificate validation.
                sessions: Accepted but not used by this method.
            """
            # Set httplib2.debuglevel = 4 here for wire-level debugging.
            kwargs = {}
            if proxy:
                import socks
                kwargs['proxy_info'] = httplib2.ProxyInfo(proxy_type=socks.PROXY_TYPE_HTTP, **proxy)
                log.info("using proxy %s" % proxy)

            # Compare versions numerically. As plain strings '0.10.0' sorts
            # before '0.3.0', which silently dropped the options below on
            # httplib2 0.10 and later.
            import re
            version = tuple(int(num) for num in re.findall(r'\d+', httplib2.__version__)[:3])

            # set optional parameters according to supported httplib2 version
            if version >= (0, 3):
                kwargs['timeout'] = timeout
            if version >= (0, 7):
                # NOTE(review): no cacert means validation is turned off;
                # confirm that callers rely on this.
                kwargs['disable_ssl_certificate_validation'] = cacert is None
                kwargs['ca_certs'] = cacert
            httplib2.Http.__init__(self, **kwargs)
项目:ecodash    作者:Servir-Mekong    | 项目源码 | 文件源码
def testSimpleProxy(self):
        """Fetch ``miniserver.py`` through the local HTTP proxy and verify it.

        Asserts a 200 status, that the body equals the file on disk, and
        that the proxy log file records the connection.
        """
        proxy_info = httplib2.ProxyInfo(socks.PROXY_TYPE_HTTP,
                                        'localhost', self.proxyport)
        client = httplib2.Http(proxy_info=proxy_info)
        src = 'miniserver.py'
        response, body = client.request('http://localhost:%d/%s' %
                                        (self.port, src))
        self.assertEqual(response.status, 200)
        # Context managers close the file handles promptly instead of
        # leaving them open until garbage collection.
        with open(os.path.join(miniserver.HERE, src)) as served:
            expected_body = served.read()
        self.assertEqual(body, expected_body)
        with open(self.logfile) as log_file:
            lf = log_file.read()
        expect = ('Established connection to host "127.0.0.1" '
                  'using file descriptor')
        self.assertTrue(expect in lf,
                        'tinyproxy did not proxy a request for miniserver')
项目:ecodash    作者:Servir-Mekong    | 项目源码 | 文件源码
def testProxyDisabled(self):
        """Requests through a ``ProxyInfo`` of type ``'blah'`` must raise
        ``httplib2.ProxiesUnavailableError``."""
        bogus_proxy = httplib2.ProxyInfo('blah', 'localhost', 0)
        http = httplib2.Http(proxy_info=bogus_proxy)
        with self.assertRaises(httplib2.ProxiesUnavailableError):
            http.request('http://localhost:-1/')
项目:rekall-agent-server    作者:rekall-innovations    | 项目源码 | 文件源码
def __init__(self, timeout, proxy=None, cacert=None, sessions=False):
            """Initialise the underlying ``httplib2.Http`` with proxy/TLS options.

            Args:
                timeout: Passed as ``timeout`` when httplib2 is 0.3.0 or newer.
                proxy: Optional mapping of ``httplib2.ProxyInfo`` keyword
                    arguments (expanded with ``**``); the proxy type is
                    always ``socks.PROXY_TYPE_HTTP``.
                cacert: Passed as ``ca_certs`` when httplib2 is 0.7.0 or
                    newer. ``None`` disables certificate validation.
                sessions: Accepted but not used by this method.
            """
            # Set httplib2.debuglevel = 4 here for wire-level debugging.
            kwargs = {}
            if proxy:
                import socks
                kwargs['proxy_info'] = httplib2.ProxyInfo(proxy_type=socks.PROXY_TYPE_HTTP, **proxy)
                log.info("using proxy %s" % proxy)

            # Compare versions numerically. As plain strings '0.10.0' sorts
            # before '0.3.0', which silently dropped the options below on
            # httplib2 0.10 and later.
            import re
            version = tuple(int(num) for num in re.findall(r'\d+', httplib2.__version__)[:3])

            # set optional parameters according to supported httplib2 version
            if version >= (0, 3):
                kwargs['timeout'] = timeout
            if version >= (0, 7):
                # NOTE(review): no cacert means validation is turned off;
                # confirm that callers rely on this.
                kwargs['disable_ssl_certificate_validation'] = cacert is None
                kwargs['ca_certs'] = cacert
            httplib2.Http.__init__(self, **kwargs)
项目:Pybelt    作者:Ekultek    | 项目源码 | 文件源码
def configure_proxy(self):
        """Build an ``httplib2.ProxyInfo`` from ``self.proxy``.

        ``self.proxy`` is split on ``":"``. The digit groups of the second
        piece are joined with dots to form the host, and the third piece is
        the port (presumably the input looks like ``scheme://1.2.3.4:8080``;
        verify against callers).

        Returns:
            An HTTP-type ``ProxyInfo``, or ``None`` when ``self.proxy`` is
            ``None``.

        Raises:
            IndexError: If ``self.proxy`` has fewer than three
                colon-separated pieces.
            ValueError: If the port piece is not an integer.
        """
        if self.proxy is None:
            return None

        pieces = self.proxy.split(":")
        host = '.'.join(re.findall(r"\d+", pieces[1]))
        # The port must be numeric for the socket layer; the raw split
        # result is a string.
        port = int(pieces[2])
        return httplib2.ProxyInfo(proxy_type=httplib2.socks.PROXY_TYPE_HTTP,
                                  proxy_host=host,
                                  proxy_port=port)
项目:SurfaceWaterTool    作者:Servir-Mekong    | 项目源码 | 文件源码
def testSimpleProxy(self):
        """Fetch ``miniserver.py`` through the local HTTP proxy and verify it.

        Asserts a 200 status, that the body equals the file on disk, and
        that the proxy log file records the connection.
        """
        proxy_info = httplib2.ProxyInfo(socks.PROXY_TYPE_HTTP,
                                        'localhost', self.proxyport)
        client = httplib2.Http(proxy_info=proxy_info)
        src = 'miniserver.py'
        response, body = client.request('http://localhost:%d/%s' %
                                        (self.port, src))
        self.assertEqual(response.status, 200)
        # Context managers close the file handles promptly instead of
        # leaving them open until garbage collection.
        with open(os.path.join(miniserver.HERE, src)) as served:
            expected_body = served.read()
        self.assertEqual(body, expected_body)
        with open(self.logfile) as log_file:
            lf = log_file.read()
        expect = ('Established connection to host "127.0.0.1" '
                  'using file descriptor')
        self.assertTrue(expect in lf,
                        'tinyproxy did not proxy a request for miniserver')
项目:SurfaceWaterTool    作者:Servir-Mekong    | 项目源码 | 文件源码
def testProxyDisabled(self):
        """Requests through a ``ProxyInfo`` of type ``'blah'`` must raise
        ``httplib2.ProxiesUnavailableError``."""
        bogus_proxy = httplib2.ProxyInfo('blah', 'localhost', 0)
        http = httplib2.Http(proxy_info=bogus_proxy)
        with self.assertRaises(httplib2.ProxiesUnavailableError):
            http.request('http://localhost:-1/')
项目:slugiot-client    作者:slugiot    | 项目源码 | 文件源码
def __init__(self, timeout, proxy=None, cacert=None, sessions=False):
            """Initialise the underlying ``httplib2.Http`` with proxy/TLS options.

            Args:
                timeout: Passed as ``timeout`` when httplib2 is 0.3.0 or newer.
                proxy: Optional mapping of ``httplib2.ProxyInfo`` keyword
                    arguments (expanded with ``**``); the proxy type is
                    always ``socks.PROXY_TYPE_HTTP``.
                cacert: Passed as ``ca_certs`` when httplib2 is 0.7.0 or
                    newer. ``None`` disables certificate validation.
                sessions: Accepted but not used by this method.
            """
            # Set httplib2.debuglevel = 4 here for wire-level debugging.
            kwargs = {}
            if proxy:
                import socks
                kwargs['proxy_info'] = httplib2.ProxyInfo(proxy_type=socks.PROXY_TYPE_HTTP, **proxy)
                log.info("using proxy %s" % proxy)

            # Compare versions numerically. As plain strings '0.10.0' sorts
            # before '0.3.0', which silently dropped the options below on
            # httplib2 0.10 and later.
            import re
            version = tuple(int(num) for num in re.findall(r'\d+', httplib2.__version__)[:3])

            # set optional parameters according to supported httplib2 version
            if version >= (0, 3):
                kwargs['timeout'] = timeout
            if version >= (0, 7):
                # NOTE(review): no cacert means validation is turned off;
                # confirm that callers rely on this.
                kwargs['disable_ssl_certificate_validation'] = cacert is None
                kwargs['ca_certs'] = cacert
            httplib2.Http.__init__(self, **kwargs)
项目:iris    作者:linkedin    | 项目源码 | 文件源码
def __init__(self, config, proxy_cfg=None):
        """Keep ``config`` and create the ``Http`` client.

        Args:
            config: Stored unchanged on ``self.config``.
            proxy_cfg: Optional mapping with ``'host'`` and ``'port'``;
                when truthy, requests go through that proxy using
                ``socks.PROXY_TYPE_HTTP_NO_TUNNEL``.
        """
        self.config = config
        proxy_info = (
            ProxyInfo(socks.PROXY_TYPE_HTTP_NO_TUNNEL,
                      proxy_cfg['host'],
                      proxy_cfg['port'])
            if proxy_cfg
            else None
        )
        self.http = Http(proxy_info=proxy_info)
        self.client = None
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def testSimpleProxy(self):
        """Fetch ``miniserver.py`` through the local HTTP proxy and verify it.

        Asserts a 200 status, that the body equals the file on disk, and
        that the proxy log file records the connection.
        """
        proxy_info = httplib2.ProxyInfo(socks.PROXY_TYPE_HTTP,
                                        'localhost', self.proxyport)
        client = httplib2.Http(proxy_info=proxy_info)
        src = 'miniserver.py'
        response, body = client.request('http://localhost:%d/%s' %
                                        (self.port, src))
        self.assertEqual(response.status, 200)
        # Context managers close the file handles promptly instead of
        # leaving them open until garbage collection.
        with open(os.path.join(miniserver.HERE, src)) as served:
            expected_body = served.read()
        self.assertEqual(body, expected_body)
        with open(self.logfile) as log_file:
            lf = log_file.read()
        expect = ('Established connection to host "127.0.0.1" '
                  'using file descriptor')
        self.assertTrue(expect in lf,
                        'tinyproxy did not proxy a request for miniserver')
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def testProxyDisabled(self):
        """Requests through a ``ProxyInfo`` of type ``'blah'`` must raise
        ``httplib2.ProxiesUnavailableError``."""
        bogus_proxy = httplib2.ProxyInfo('blah', 'localhost', 0)
        http = httplib2.Http(proxy_info=bogus_proxy)
        with self.assertRaises(httplib2.ProxiesUnavailableError):
            http.request('http://localhost:-1/')
项目:IoT_Parking    作者:leeshlay    | 项目源码 | 文件源码
def testSimpleProxy(self):
        """Fetch ``miniserver.py`` through the local HTTP proxy and verify it.

        Asserts a 200 status, that the body equals the file on disk, and
        that the proxy log file records the connection.
        """
        proxy_info = httplib2.ProxyInfo(socks.PROXY_TYPE_HTTP,
                                        'localhost', self.proxyport)
        client = httplib2.Http(proxy_info=proxy_info)
        src = 'miniserver.py'
        response, body = client.request('http://localhost:%d/%s' %
                                        (self.port, src))
        self.assertEqual(response.status, 200)
        # Context managers close the file handles promptly instead of
        # leaving them open until garbage collection.
        with open(os.path.join(miniserver.HERE, src)) as served:
            expected_body = served.read()
        self.assertEqual(body, expected_body)
        with open(self.logfile) as log_file:
            lf = log_file.read()
        expect = ('Established connection to host "127.0.0.1" '
                  'using file descriptor')
        self.assertTrue(expect in lf,
                        'tinyproxy did not proxy a request for miniserver')
项目:IoT_Parking    作者:leeshlay    | 项目源码 | 文件源码
def testProxyDisabled(self):
        """Requests through a ``ProxyInfo`` of type ``'blah'`` must raise
        ``httplib2.ProxiesUnavailableError``."""
        bogus_proxy = httplib2.ProxyInfo('blah', 'localhost', 0)
        http = httplib2.Http(proxy_info=bogus_proxy)
        with self.assertRaises(httplib2.ProxiesUnavailableError):
            http.request('http://localhost:-1/')
项目:StuffShare    作者:StuffShare    | 项目源码 | 文件源码
def __init__(self, timeout, proxy=None, cacert=None, sessions=False):
            """Initialise the underlying ``httplib2.Http`` with proxy/TLS options.

            Args:
                timeout: Passed as ``timeout`` when httplib2 is 0.3.0 or newer.
                proxy: Optional mapping of ``httplib2.ProxyInfo`` keyword
                    arguments (expanded with ``**``); the proxy type is
                    always ``socks.PROXY_TYPE_HTTP``.
                cacert: Passed as ``ca_certs`` when httplib2 is 0.7.0 or
                    newer. ``None`` disables certificate validation.
                sessions: Accepted but not used by this method.
            """
            # Set httplib2.debuglevel = 4 here for wire-level debugging.
            kwargs = {}
            if proxy:
                import socks
                kwargs['proxy_info'] = httplib2.ProxyInfo(proxy_type=socks.PROXY_TYPE_HTTP, **proxy)
                log.info("using proxy %s" % proxy)

            # Compare versions numerically. As plain strings '0.10.0' sorts
            # before '0.3.0', which silently dropped the options below on
            # httplib2 0.10 and later.
            import re
            version = tuple(int(num) for num in re.findall(r'\d+', httplib2.__version__)[:3])

            # set optional parameters according to supported httplib2 version
            if version >= (0, 3):
                kwargs['timeout'] = timeout
            if version >= (0, 7):
                # NOTE(review): no cacert means validation is turned off;
                # confirm that callers rely on this.
                kwargs['disable_ssl_certificate_validation'] = cacert is None
                kwargs['ca_certs'] = cacert
            httplib2.Http.__init__(self, **kwargs)