Python urllib2 模块,ProxyHandler() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 urllib2.ProxyHandler()。

项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def get(self, url, proxy=None):
        if proxy:
            proxy = urllib2.ProxyHandler({'http': proxy})
            opener = urllib2.build_opener(proxy)
            urllib2.install_opener(opener)

        try:
            response = urllib2.urlopen(url)
        except HTTPError, e:
            resp = e.read()
            self.status_code = e.code
        except URLError, e:
            resp = e.read()
            self.status_code = e.code
        else:
            self.status_code = response.code
            resp = response.read()

        return resp
项目:TACTIC-Handler    作者:listyque    | 项目源码 | 文件源码
def download_from_url(url):
    """Download *url* via the project's ServerThread wrapper.

    When the environment proxy is enabled, globally installs a urllib2
    opener routing HTTP through it (with basic-auth support).
    Returns the thread result, or False if the request failed.
    """
    proxy = env_server.get_proxy()
    if proxy['enabled']:
        server = proxy['server'].replace('http://', '')
        # Embed credentials in the proxy URL; 'pass' is only reserved in
        # statements, so it is a legal **kwargs key for format().
        proxy_dict = {
            'http': 'http://{login}:{pass}@{0}'.format(server, **proxy)
        }
        proxy_handler = urllib2.ProxyHandler(proxy_dict)
        auth = urllib2.HTTPBasicAuthHandler()
        opener = urllib2.build_opener(proxy_handler, auth, urllib2.HTTPHandler)
        urllib2.install_opener(opener)

    # Wrap urlopen in the project's ServerThread; note .run() (not .start())
    # is called, so the request executes on the current thread.
    run_thread = tc.ServerThread(env_inst.ui_main)
    run_thread.kwargs = dict(url=url, timeout=1)
    run_thread.routine = urllib2.urlopen
    run_thread.run()
    result_thread = tc.treat_result(run_thread, silent=True)
    if result_thread.isFailed():
        return False
    else:
        return result_thread.result
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def ipcheck(proxy):
    try:
        pxhandle = urllib2.ProxyHandler({"http": proxy})
        opener = urllib2.build_opener(pxhandle)
        urllib2.install_opener(opener)
        myip = urllib2.urlopen('http://www.whatismyip.com/automation/n09230945.asp').read()
        xs =  re.findall(('\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}'), StripTags(myip))
        if xs[0] == myipadress or myipadress == myip:
            trans_list.append(proxy)
            print proxy[:-1],"\t- ALIVE -", timer(), "- TRANSPARENT"
        elif xs == None:
            pass
        else:
            anon_list.append(proxy)
            print proxy[:-1],"\t- ALIVE -", timer(), "- EXT-iP :",xs[0]
    except KeyboardInterrupt:
        print "\n\nCTRL+C - check temporary proxylist file\n\n"
        sys.exit(0)
    except:
        pass
项目:dart    作者:lmco    | 项目源码 | 文件源码
def download_vcpython27(self):
        """
        Download vcpython27 since some Windows 7 boxes have it and some don't.
        :return: None
        """

        self._prepare_for_download()

        logger.info('Beginning download of vcpython27... this may take a few minutes...')

        with open(os.path.join(DOWNLOADS_DIR, 'vcpython27.msi'), 'wb') as f:

            if self.PROXY is not None:
                # Route both HTTP and HTTPS through the configured proxy for
                # this and all later urlopen calls (opener installed globally).
                opener = urllib2.build_opener(
                    urllib2.HTTPHandler(),
                    urllib2.HTTPSHandler(),
                    urllib2.ProxyHandler({'http': self.PROXY, 'https': self.PROXY})
                )
                urllib2.install_opener(opener)

            # NOTE(review): buffers the whole installer in memory before
            # writing; acceptable for an MSI-sized file.
            f.write(urllib2.urlopen(self.VCPYTHON27_DOWNLOAD_URL, timeout=self.DOWNLOAD_TIMEOUT).read())

        logger.debug('Download of vcpython27 complete')
项目:dart    作者:lmco    | 项目源码 | 文件源码
def download_python(self):
        """
        Download Python
        :return: None
        """

        self._prepare_for_download()

        logger.info('Beginning download of python')

        with open(os.path.join(DOWNLOADS_DIR, 'python-installer.msi'), 'wb') as f:

            if self.PROXY is not None:
                # Route both HTTP and HTTPS through the configured proxy for
                # this and all later urlopen calls (opener installed globally).
                opener = urllib2.build_opener(
                    urllib2.HTTPHandler(),
                    urllib2.HTTPSHandler(),
                    urllib2.ProxyHandler({'http': self.PROXY, 'https': self.PROXY})
                )
                urllib2.install_opener(opener)

            # NOTE(review): buffers the whole installer in memory before
            # writing; acceptable for an MSI-sized file.
            f.write(urllib2.urlopen(self.PYTHON_DOWNLOAD_URL, timeout=self.DOWNLOAD_TIMEOUT).read())

        logger.debug('Download of python complete')
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def wait_xxnet_exit():
    """Ask the local launcher to quit and wait until it stops answering.

    Polls the /quit control endpoint roughly once a second; returns True as
    soon as the endpoint stops responding, False if it is still reachable
    after 20 attempts.
    """

    def http_request(url, method="GET"):
        # An empty ProxyHandler mapping forces a direct connection,
        # bypassing any system proxy settings.
        direct_opener = urllib2.build_opener(urllib2.ProxyHandler({}))
        try:
            return direct_opener.open(url)
        except Exception as e:
            #logging.exception("web_control http_request:%s fail:%s", url, e)
            return False

    for _ in range(20):
        host_port = config.get(["modules", "launcher", "control_port"], 8085)
        quit_url = "http://127.0.0.1:{port}/quit".format(port=host_port)
        if http_request(quit_url) == False:
            return True
        time.sleep(1)
    return False
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def wait_xxnet_exit():
    """Request launcher shutdown and block until it is gone.

    Hits the /quit control URL up to 20 times (1s apart); True means the
    endpoint became unreachable (process exited), False means it kept
    answering.
    """

    def http_request(url, method="GET"):
        # Empty proxy map = direct connection, ignoring system proxies.
        direct_opener = urllib2.build_opener(urllib2.ProxyHandler({}))
        try:
            return direct_opener.open(url)
        except Exception as e:
            #logging.exception("web_control http_request:%s fail:%s", url, e)
            return False

    for _ in range(20):
        host_port = config.get(["modules", "launcher", "control_port"], 8085)
        quit_url = "http://127.0.0.1:{port}/quit".format(port=host_port)
        if http_request(quit_url) == False:
            return True
        time.sleep(1)
    return False
项目:aerospike-telemetry-agent    作者:aerospike    | 项目源码 | 文件源码
def __init__(self, url, proxy, cafile):
        # Endpoint this client will talk to.
        self.url = url
        self.proxy = proxy
        if proxy:
            logging.info("Using HTTPS proxy: " + proxy)
            # Globally install an opener routing HTTPS through the proxy.
            proxy_handler = urllib2.ProxyHandler({'https': proxy})
            opener = urllib2.build_opener(proxy_handler)
            urllib2.install_opener(opener)
        # Extra keyword args for urlopen; may gain an SSL 'context' below.
        self.kwargs = {}
        if cafile and hasattr(ssl, "create_default_context"):
            # Guarded because only newer ssl modules (2.7.9+) have contexts.
            logging.info("Using CA file: " + cafile)
            ctx = ssl.create_default_context()
            ctx.load_verify_locations(cafile = cafile)
            self.kwargs['context'] = ctx

    # given an infoMap returned by the local node, call up the home server
项目:WebScraping    作者:liinnux    | 项目源码 | 文件源码
def download(url, headers, proxy, num_retries, data=None):
    print 'Downloading:', url
    request = urllib2.Request(url, data, headers)
    opener = urllib2.build_opener()
    if proxy:
        proxy_params = {urlparse.urlparse(url).scheme: proxy}
        opener.add_handler(urllib2.ProxyHandler(proxy_params))
    try:
        response = opener.open(request)
        html = response.read()
        code = response.code
    except urllib2.URLError as e:
        print 'Download error:', e.reason
        html = ''
        if hasattr(e, 'code'):
            code = e.code
            if num_retries > 0 and 500 <= code < 600:
                # retry 5XX HTTP errors
                html = download(url, headers, proxy, num_retries-1, data)
        else:
            code = None
    return html
项目:WebScraping    作者:liinnux    | 项目源码 | 文件源码
def download(url, headers, proxy, num_retries, data=None):
    """Fetch *url* with *headers*, optionally through *proxy*.

    Retries server (5XX) errors up to *num_retries* times; returns the page
    HTML, or '' when the request ultimately fails.
    """
    print 'Downloading:', url
    request = urllib2.Request(url, data, headers)
    opener = urllib2.build_opener()
    if proxy:
        # Only proxy the scheme this URL actually uses.
        proxy_params = {urlparse.urlparse(url).scheme: proxy}
        opener.add_handler(urllib2.ProxyHandler(proxy_params))
    try:
        response = opener.open(request)
        html = response.read()
        code = response.code
    except urllib2.URLError as e:
        print 'Download error:', e.reason
        html = ''
        if hasattr(e, 'code'):
            code = e.code
            if num_retries > 0 and 500 <= code < 600:
                # retry 5XX HTTP errors
                return download(url, headers, proxy, num_retries-1, data)
        else:
            code = None
    return html
项目:WebScraping    作者:liinnux    | 项目源码 | 文件源码
def download5(url, user_agent='wswp', proxy=None, num_retries=2):
    """Download function with support for proxies"""
    print 'Downloading:', url
    headers = {'User-agent': user_agent}
    request = urllib2.Request(url, headers=headers)
    opener = urllib2.build_opener()
    if proxy:
        proxy_params = {urlparse.urlparse(url).scheme: proxy}
        opener.add_handler(urllib2.ProxyHandler(proxy_params))
    try:
        html = opener.open(request).read()
    except urllib2.URLError as e:
        print 'Download error:', e.reason
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # retry 5XX HTTP errors
                html = download5(url, user_agent, proxy, num_retries-1)
    return html
项目:WebScraping    作者:liinnux    | 项目源码 | 文件源码
def download(self, url, headers, proxy, num_retries, data=None):
        """Fetch *url*; return {'html': page_or_empty, 'code': status_or_None}.

        Reuses self.opener when one is configured, else builds a fresh one.
        Retries server (5XX) errors up to *num_retries* times.
        """
        print 'Downloading:', url
        request = urllib2.Request(url, data, headers or {})
        opener = self.opener or urllib2.build_opener()
        if proxy:
            # Only proxy the scheme this URL actually uses.
            proxy_params = {urlparse.urlparse(url).scheme: proxy}
            opener.add_handler(urllib2.ProxyHandler(proxy_params))
        try:
            response = opener.open(request)
            html = response.read()
            code = response.code
        except Exception as e:
            print 'Download error:', str(e)
            html = ''
            if hasattr(e, 'code'):
                code = e.code
                if num_retries > 0 and 500 <= code < 600:
                    # retry 5XX HTTP errors
                    # NOTE(review): retries via self._get, not download() --
                    # confirm _get exists with this signature in the class.
                    return self._get(url, headers, proxy, num_retries-1, data)
            else:
                code = None
        return {'html': html, 'code': code}
项目:DistributeCrawler    作者:SmallHedgehog    | 项目源码 | 文件源码
def add_proxy(self, addr, proxy_type='all',
                  user=None, password=None):
        """Register *addr* as the proxy for *proxy_type* ('all' covers
        http/https/ftp) and globally install the resulting opener.

        Optional *user*/*password* are attached via ProxyBasicAuthHandler.
        """
        if proxy_type == 'all':
            # Replace the whole mapping with one address for every scheme.
            self.proxies = {
                'http': addr,
                'https': addr,
                'ftp': addr
            }
        else:
            # Extend the existing mapping for a single scheme.
            self.proxies[proxy_type] = addr
        proxy_handler = urllib2.ProxyHandler(self.proxies)
        self.__build_opener()
        self.opener.add_handler(proxy_handler)

        if user and password:
            # Basic-auth credentials for the proxy itself.
            pwd_manager = urllib2.HTTPPasswordMgrWithDefaultRealm()
            pwd_manager.add_password(None, addr, user, password)
            proxy_auth_handler = urllib2.ProxyBasicAuthHandler(pwd_manager)
            self.opener.add_handler(proxy_auth_handler)

        urllib2.install_opener(self.opener)
项目:Belati    作者:aancw    | 项目源码 | 文件源码
def check_single_proxy_status(self, proxy_address, domain_check):
        """Probe *proxy_address* (scheme://host:port) by fetching
        *domain_check* through it.

        Returns 'ok' on success, the HTTP status code on HTTPError, or 1 on
        any other failure. Installs the proxy opener globally as a side
        effect.
        """
        try:
            parse = urlparse(proxy_address)
            proxy_scheme = parse.scheme
            proxy = str(parse.hostname) + ':' + str(parse.port)
            proxy_handler = urllib2.ProxyHandler({ proxy_scheme: proxy})
            opener = urllib2.build_opener(proxy_handler)
            opener.addheaders = [('User-agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36')]
            urllib2.install_opener(opener)
            req = urllib2.Request(domain_check)
            # Time the round trip to report proxy responsiveness.
            start_time = time.time()
            sock = urllib2.urlopen(req)
            end_time = time.time()
            diff_time = round(end_time - start_time, 3)
            # NOTE(review): three {} placeholders but four args -- the final
            # W (color reset) is silently dropped by str.format.
            log.console_log(Y + "{}[+] {} OK! Response Time : {}s".format(Y, proxy_address, str(diff_time), W ))
            return 'ok'
        except urllib2.HTTPError, e:
            print('Error code: ' + str(e.code))
            return e.code
        except Exception, detail:
            print('ERROR ' +  str(detail))
            return 1
项目:catchup4kodi    作者:catchup4kodi    | 项目源码 | 文件源码
def _update_opener(self):
        '''
        Build and globally install a fresh opener (cookie jar, optional HTTP
        proxy, basic auth) used by all future :func:`urllib2.urlopen` calls.
        '''
        # Turn on urllib2's wire-level debug output when requested.
        http = urllib2.HTTPHandler(debuglevel=1 if self._http_debug else 0)

        handlers = [urllib2.HTTPCookieProcessor(self._cj)]
        if self._proxy:
            handlers.append(urllib2.ProxyHandler({'http': self._proxy}))
        handlers.append(urllib2.HTTPBasicAuthHandler())
        handlers.append(http)

        urllib2.install_opener(urllib2.build_opener(*handlers))
项目:catchup4kodi    作者:catchup4kodi    | 项目源码 | 文件源码
def _update_opener(self):
        '''
        Rebuild the global urllib2 opener: cookies, optional HTTP proxy and
        basic auth, affecting every later :func:`urllib2.urlopen` call.
        '''
        # debuglevel=1 dumps HTTP traffic to stdout when debugging is on.
        http = urllib2.HTTPHandler(debuglevel=1 if self._http_debug else 0)

        handler_chain = [urllib2.HTTPCookieProcessor(self._cj)]
        if self._proxy:
            handler_chain.append(urllib2.ProxyHandler({'http': self._proxy}))
        handler_chain.append(urllib2.HTTPBasicAuthHandler())
        handler_chain.append(http)

        urllib2.install_opener(urllib2.build_opener(*handler_chain))
项目:catchup4kodi    作者:catchup4kodi    | 项目源码 | 文件源码
def _update_opener(self):
        '''
        Install a new global opener so subsequent :func:`urllib2.urlopen`
        calls carry this session's cookies, proxy and basic-auth handlers.
        '''
        # Wire-level debug output is controlled by self._http_debug.
        http = urllib2.HTTPHandler(debuglevel=1 if self._http_debug else 0)

        chain = [urllib2.HTTPCookieProcessor(self._cj)]
        if self._proxy:
            chain.append(urllib2.ProxyHandler({'http': self._proxy}))
        chain.append(urllib2.HTTPBasicAuthHandler())
        chain.append(http)

        urllib2.install_opener(urllib2.build_opener(*chain))
项目:catchup4kodi    作者:catchup4kodi    | 项目源码 | 文件源码
def _update_opener(self):
        '''
        Replace the process-wide urllib2 opener with one carrying this
        object's cookie jar, optional HTTP proxy, and basic-auth support.
        '''
        # Debug builds log raw HTTP exchanges via debuglevel=1.
        http = urllib2.HTTPHandler(debuglevel=1 if self._http_debug else 0)

        parts = [urllib2.HTTPCookieProcessor(self._cj)]
        if self._proxy:
            parts.append(urllib2.ProxyHandler({'http': self._proxy}))
        parts.append(urllib2.HTTPBasicAuthHandler())
        parts.append(http)

        urllib2.install_opener(urllib2.build_opener(*parts))
项目:crawler    作者:brantou    | 项目源码 | 文件源码
def check_gn_proxy(proxy, protocal_type='HTTP'):
    """Return True if *proxy* ("host:port") works and is non-transparent:
    the IP echoed by icanhazip.com must equal the proxy's own host."""
    url = 'http://icanhazip.com'
    # NOTE(review): proxy URLs conventionally use the http:// scheme even
    # for the 'https' entry; "'https://' + proxy" may not behave as
    # intended -- confirm against the urllib2 ProxyHandler docs.
    proxy_handler = urllib2.ProxyHandler({
        'http': 'http://' + proxy,
        'https': 'https://' + proxy,
    })
    if protocal_type == 'HTTPS':
        url = 'https://icanhazip.com'

    opener = urllib2.build_opener(proxy_handler, urllib2.HTTPHandler)
    try:
        response = opener.open(url, timeout=3)
        res_ip = response.read().strip()
        # Anonymizing proxies echo their own address back to us.
        return response.code == 200 and res_ip == proxy.split(':')[0]
    except Exception:
        return False
项目:CN_POI_Data    作者:lyBigdata    | 项目源码 | 文件源码
def checkAlive(self,ip,port,protocol):
        testUrl = "https://www.baidu.com/"
        req_timeout = 3
        cookies = urllib2.HTTPCookieProcessor()

        proxyHost = ""
        if protocol == 'HTTP' or protocol == 'HTTPS':
            proxyHost = {"http":r'http://%s:%s' % (ip, port)}
            #print proxyHost

        proxyHandler = urllib2.ProxyHandler(proxyHost)
        opener = urllib2.build_opener(cookies, proxyHandler)
        opener.addheaders = [('User-Agent',
                              'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36')]

        try:
            req = opener.open(testUrl, timeout=req_timeout)
            result = req.read()
            #print result
            gevent.sleep(2)
            return  True
        except urllib2.HTTPError as e:
            print  e.message
            return False
项目:zhengfang-xuanke    作者:xiaohuanshu    | 项目源码 | 文件源码
def open(aurl, post='', Referer=''):
    """Issue a GET (or POST when *post* is given) to *aurl* and return the
    urllib2 response object.

    Sends *Referer* and the module-level aspxsession cookie when non-empty.
    NOTE(review): this shadows the builtin open().
    """
    # Example of routing through a local debugging proxy (disabled):
    #proxy = 'http://127.0.0.1:8088'
    #opener = urllib2.build_opener( urllib2.ProxyHandler({'http':proxy}) )
    #urllib2.install_opener(opener)
    if post != '':
        encoded_body = urllib.urlencode(post)
        req = urllib2.Request(url=aurl, data=encoded_body)
    else:
        req = urllib2.Request(url=aurl)
    for header_name, header_value in (('Referer', Referer),
                                      ('Cookie', aspxsession)):
        if header_value != '':
            req.add_header(header_name, header_value)
    return urllib2.urlopen(req)
# NOTE: original comment garbled in extraction (likely Chinese, about obtaining/keeping the ASP.NET session)
项目:zhengfang-xuanke    作者:xiaohuanshu    | 项目源码 | 文件源码
def open(aurl, post='', Referer=''):
    """Fetch *aurl* and return the urllib2 response.

    POSTs url-encoded *post* when given, otherwise GETs; attaches the
    Referer header and module-level aspxsession cookie when non-empty.
    NOTE(review): shadows the builtin open().
    """
    # Local debugging proxy, kept for reference but disabled:
    #proxy = 'http://127.0.0.1:8088'
    #opener = urllib2.build_opener( urllib2.ProxyHandler({'http':proxy}) )
    #urllib2.install_opener(opener)
    if post != '':
        body = urllib.urlencode(post)
        req = urllib2.Request(url=aurl, data=body)
    else:
        req = urllib2.Request(url=aurl)
    if Referer != '':
        req.add_header('Referer', Referer)
    if aspxsession != "":
        req.add_header('Cookie', aspxsession)
    return urllib2.urlopen(req)
# NOTE: original comment garbled in extraction (likely Chinese, about obtaining/keeping the ASP.NET session)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def setUp(self):
        """Start a loopback proxy that demands digest auth and build an
        opener pointed at it."""
        super(ProxyAuthTests, self).setUp()
        # Ignore proxy bypass settings in the environment.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = ''
        os.environ['no_proxy'] = ''

        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)
        # With Digest Authentication
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()
        # Route all HTTP through the just-started loopback proxy.
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib2.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_proxy(self):
        """ProxyHandler should rewrite the request host to the proxy."""
        o = OpenerDirector()
        ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
        o.add_handler(ph)
        meth_spec = [
            [("http_open", "return response")]
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://acme.example.com/")
        self.assertEqual(req.get_host(), "acme.example.com")
        r = o.open(req)
        # After opening, the request is aimed at the proxy host.
        self.assertEqual(req.get_host(), "proxy.example.com:3128")

        self.assertEqual([(handlers[0], "http_open")],
                         [tup[0:2] for tup in o.calls])
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_proxy_https_proxy_authorization(self):
        """Proxy-Authorization must go to the proxy (CONNECT tunnel), not
        leak into the tunneled HTTPS request's headers."""
        o = OpenerDirector()
        ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128'))
        o.add_handler(ph)
        https_handler = MockHTTPSHandler()
        o.add_handler(https_handler)
        req = Request("https://www.example.com/")
        req.add_header("Proxy-Authorization","FooBar")
        req.add_header("User-Agent","Grail")
        self.assertEqual(req.get_host(), "www.example.com")
        self.assertIsNone(req._tunnel_host)
        r = o.open(req)
        # Verify Proxy-Authorization gets tunneled to request.
        # httpsconn req_headers do not have the Proxy-Authorization header but
        # the req will have.
        self.assertNotIn(("Proxy-Authorization","FooBar"),
                         https_handler.httpconn.req_headers)
        self.assertIn(("User-Agent","Grail"),
                      https_handler.httpconn.req_headers)
        self.assertIsNotNone(req._tunnel_host)
        self.assertEqual(req.get_host(), "proxy.example.com:3128")
        self.assertEqual(req.get_header("Proxy-authorization"),"FooBar")
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_proxy_basic_auth(self):
        """A 407 challenge should be answered by ProxyBasicAuthHandler via
        the Proxy-authorization header."""
        opener = OpenerDirector()
        ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
        opener.add_handler(ph)
        password_manager = MockPasswordManager()
        auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)
        realm = "ACME Networks"
        # Mock server answers 407 Proxy Authentication Required.
        http_handler = MockHTTPHandler(
            407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
        opener.add_handler(auth_handler)
        opener.add_handler(http_handler)
        self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
                              realm, http_handler, password_manager,
                              "http://acme.example.com:3128/protected",
                              "proxy.example.com:3128",
                              )
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def setUp(self):
        """Start a loopback proxy that demands digest auth and build an
        opener pointed at it."""
        super(ProxyAuthTests, self).setUp()
        # Ignore proxy bypass settings in the environment.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = ''
        os.environ['no_proxy'] = ''

        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)
        # With Digest Authentication
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()
        # Route all HTTP through the just-started loopback proxy.
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib2.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_proxy(self):
        """ProxyHandler should rewrite the request host to the proxy."""
        o = OpenerDirector()
        ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
        o.add_handler(ph)
        meth_spec = [
            [("http_open", "return response")]
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://acme.example.com/")
        self.assertEqual(req.get_host(), "acme.example.com")
        r = o.open(req)
        # After opening, the request is aimed at the proxy host.
        self.assertEqual(req.get_host(), "proxy.example.com:3128")

        self.assertEqual([(handlers[0], "http_open")],
                         [tup[0:2] for tup in o.calls])
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_proxy_https_proxy_authorization(self):
        """Proxy-Authorization must go to the proxy (CONNECT tunnel), not
        leak into the tunneled HTTPS request's headers."""
        o = OpenerDirector()
        ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128'))
        o.add_handler(ph)
        https_handler = MockHTTPSHandler()
        o.add_handler(https_handler)
        req = Request("https://www.example.com/")
        req.add_header("Proxy-Authorization","FooBar")
        req.add_header("User-Agent","Grail")
        self.assertEqual(req.get_host(), "www.example.com")
        self.assertIsNone(req._tunnel_host)
        r = o.open(req)
        # Verify Proxy-Authorization gets tunneled to request.
        # httpsconn req_headers do not have the Proxy-Authorization header but
        # the req will have.
        self.assertNotIn(("Proxy-Authorization","FooBar"),
                         https_handler.httpconn.req_headers)
        self.assertIn(("User-Agent","Grail"),
                      https_handler.httpconn.req_headers)
        self.assertIsNotNone(req._tunnel_host)
        self.assertEqual(req.get_host(), "proxy.example.com:3128")
        self.assertEqual(req.get_header("Proxy-authorization"),"FooBar")
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_proxy_basic_auth(self):
        """A 407 challenge should be answered by ProxyBasicAuthHandler via
        the Proxy-authorization header."""
        opener = OpenerDirector()
        ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
        opener.add_handler(ph)
        password_manager = MockPasswordManager()
        auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)
        realm = "ACME Networks"
        # Mock server answers 407 Proxy Authentication Required.
        http_handler = MockHTTPHandler(
            407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
        opener.add_handler(auth_handler)
        opener.add_handler(http_handler)
        self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
                              realm, http_handler, password_manager,
                              "http://acme.example.com:3128/protected",
                              "proxy.example.com:3128",
                              )
项目:jd-crawler    作者:qiaofei32    | 项目源码 | 文件源码
def check_proxy(self, proxy_list):
        """Return the (host, port) pairs from *proxy_list* that can fetch
        the Baidu homepage through themselves within 2.5 seconds."""
        ava_list = []
        test_url = "http://www.baidu.com/"
        for host, port in proxy_list:
            ret = False
            host_port = "%s:%s" % (host, port)
            # NOTE(review): proxy URLs conventionally keep the http://
            # scheme even for the 'https' entry; "https://host:port" may be
            # wrong -- confirm against the ProxyHandler docs.
            proxy = {
                "http": "http://%s" %(host_port),
                "https": "https://%s" %(host_port),
            }
            proxy_handler = urllib2.ProxyHandler(proxy)
            opener = urllib2.build_opener(proxy_handler)
            try:
                conn = opener.open(test_url, timeout=2.5)
                data = conn.read()
                # NOTE(review): "??" looks like a mojibake'd non-ASCII
                # marker from the original source -- verify the intended
                # substring before relying on this check.
                if "??" in data:
                    ret = True
                    ava_list.append((host, port))
            except Exception as e:
                # print e
                ret = False
            print "checking proxy: %s ---> %s" % (host_port, str(ret))
        return ava_list
项目:YouPBX    作者:JoneXiong    | 项目源码 | 文件源码
def cache_resource(self, url):
        """Fetch *url*, validate its MIME type against MIME_TYPES, store it
        in the cache, and return (stream, resource_type).

        Raises UnsupportedResourceFormat when the Content-Type is unknown
        or maps to a falsy resource type.
        """
        if self.proxy_url is not None:
            # Globally install an HTTP-proxy opener for this fetch.
            proxy = urllib2.ProxyHandler({'http': self.proxy_url})
            opener = urllib2.build_opener(proxy)
            urllib2.install_opener(opener)
        request = urllib2.Request(url)
        user_agent = 'Mozilla/5.0 (X11; Linux i686) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.35 Safari/535.1'
        request.add_header('User-Agent', user_agent)
        handler = urllib2.urlopen(request, timeout=self.http_timeout)
        try:
            resource_type = MIME_TYPES[handler.headers.get('Content-Type')]
            if not resource_type:
                raise UnsupportedResourceFormat("Resource format not found")
        except KeyError:
            raise UnsupportedResourceFormat("Resource format not supported")
        # ETag / Last-Modified support cache validation on later fetches.
        etag = handler.headers.get('ETag')
        last_modified = handler.headers.get('Last-Modified')
        resource_key = self.get_resource_key(url)
        stream = handler.read()
        self.update_resource_params(resource_key, resource_type, etag, last_modified, stream)
        return stream, resource_type
项目:lightbulb-framework    作者:lightbulb-framework    | 项目源码 | 文件源码
def __init__(self, configuration):
        """Apply *configuration* and, when a complete proxy spec is present,
        globally install a urllib2 opener for it (with optional basic
        auth)."""
        self.setup(configuration)
        self.echo = None
        if "ECHO" in configuration:
            self.echo = configuration['ECHO']
        # A proxy is only configured when scheme, host AND port are all set.
        if self.proxy_scheme is not None and self.proxy_host is not None and \
                        self.proxy_port is not None:
            credentials = ""
            if self.proxy_username is not None and self.proxy_password is not None:
                credentials = self.proxy_username + ":" + self.proxy_password + "@"
            # e.g. {"http": "http://user:pass@host:port"}
            proxyDict = {
                self.proxy_scheme: self.proxy_scheme + "://" + credentials +
                                                    self.proxy_host + ":" + self.proxy_port
            }

            proxy = urllib2.ProxyHandler(proxyDict)

            if credentials != '':
                auth = urllib2.HTTPBasicAuthHandler()
                opener = urllib2.build_opener(proxy, auth, urllib2.HTTPHandler)
            else:
                opener = urllib2.build_opener(proxy)
            urllib2.install_opener(opener)
项目:raml2doc    作者:openconnectivityfoundation    | 项目源码 | 文件源码
def do_GET(self):
        filename = self.path.split('/')[-1]
        print "ProxyHandler: url:", self.path, " localfile:", filename
        if os.path.exists(filename):
            print "ProxyHandler: local file found:",filename
            self.copyfile(open(filename), self.wfile)
        else:
            filenamejson = filename + ".json"
            print "ProxyHandler: local file NOT found:", filename, " trying: ", filenamejson
            if os.path.exists(filenamejson):
                print "ProxyHandler: local file found:",filenamejson
                self.copyfile(open(filenamejson), self.wfile)
            else:
                print "ProxyHandler: trying url:",self.path
                proxy_handler = urllib2.ProxyHandler({})
                opener = urllib2.build_opener(proxy_handler)
                try:
                    req = urllib2.Request(self.path)
                    self.copyfile(opener.open(req), self.wfile)
                except:
                    print "ProxyHandler: file not found:", self.path
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def wait_xxnet_exit():
    """Tell the local launcher to quit, then poll until it is unreachable.

    True: the /quit endpoint stopped answering within ~20 seconds.
    False: the launcher is still responding after all attempts.
    """

    def http_request(url, method="GET"):
        # Empty proxy map forces a direct (proxy-free) connection.
        direct_opener = urllib2.build_opener(urllib2.ProxyHandler({}))
        try:
            return direct_opener.open(url)
        except Exception as e:
            #logging.exception("web_control http_request:%s fail:%s", url, e)
            return False

    for _ in range(20):
        host_port = config.get(["modules", "launcher", "control_port"], 8085)
        quit_url = "http://127.0.0.1:{port}/quit".format(port=host_port)
        if http_request(quit_url) == False:
            return True
        time.sleep(1)
    return False
项目:etunexus_api    作者:etusolution    | 项目源码 | 文件源码
def _init_urllib(self, secure, debuglevel=0):
        """Globally install an opener with cookie support, no proxies, and
        multipart-POST handling; TLS verification is disabled when *secure*
        is falsy."""
        cj = cookielib.CookieJar()
        # Empty mapping = bypass any system/environment proxies.
        no_proxy_support = urllib2.ProxyHandler({})
        cookie_handler = urllib2.HTTPCookieProcessor(cj)
        ctx = None
        if not secure:
            # Deliberately insecure mode: accept any certificate/hostname.
            self._logger.info('[WARNING] Skip certificate verification.')
            ctx = ssl.create_default_context()
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
        https_handler = urllib2.HTTPSHandler(debuglevel=debuglevel, context=ctx)
        opener = urllib2.build_opener(no_proxy_support,
                                      cookie_handler,
                                      https_handler,
                                      MultipartPostHandler.MultipartPostHandler)
        opener.addheaders = [('User-agent', API_USER_AGENT)]
        urllib2.install_opener(opener)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def setUp(self):
        """Start a loopback proxy demanding digest auth and build an opener for it."""
        super(ProxyAuthTests, self).setUp()
        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)

        # The factory closes over the digest handler so every connection is
        # authenticated against the same user database.
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()

        proxy_url = "http://127.0.0.1:%d" % self.server.port
        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(
            urllib2.ProxyHandler({"http": proxy_url}),
            self.proxy_digest_handler)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_proxy(self):
        """A ProxyHandler must retarget the request host at the proxy."""
        director = OpenerDirector()
        director.add_handler(
            urllib2.ProxyHandler({"http": "proxy.example.com:3128"}))
        handlers = add_ordered_mock_handlers(
            director, [[("http_open", "return response")]])

        req = Request("http://acme.example.com/")
        # Before open(): host is still the origin server.
        self.assertEqual(req.get_host(), "acme.example.com")
        director.open(req)
        # After open(): the ProxyHandler has rewritten it.
        self.assertEqual(req.get_host(), "proxy.example.com:3128")

        recorded = [tup[0:2] for tup in director.calls]
        self.assertEqual(recorded, [(handlers[0], "http_open")])
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_proxy_https_proxy_authorization(self):
        """Proxy-Authorization rides the CONNECT tunnel, not the inner request."""
        director = OpenerDirector()
        director.add_handler(
            urllib2.ProxyHandler({'https': 'proxy.example.com:3128'}))
        https_handler = MockHTTPSHandler()
        director.add_handler(https_handler)

        req = Request("https://www.example.com/")
        req.add_header("Proxy-Authorization", "FooBar")
        req.add_header("User-Agent", "Grail")
        self.assertEqual(req.get_host(), "www.example.com")
        self.assertIsNone(req._tunnel_host)

        director.open(req)

        sent = https_handler.httpconn.req_headers
        # The tunneled connection never sees Proxy-Authorization -- it is
        # consumed during CONNECT -- while ordinary headers pass through.
        self.assertNotIn(("Proxy-Authorization", "FooBar"), sent)
        self.assertIn(("User-Agent", "Grail"), sent)
        self.assertIsNotNone(req._tunnel_host)
        self.assertEqual(req.get_host(), "proxy.example.com:3128")
        self.assertEqual(req.get_header("Proxy-authorization"), "FooBar")
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_proxy_basic_auth(self):
        """ProxyBasicAuthHandler answers a 407 challenge with Basic credentials."""
        realm = "ACME Networks"
        password_manager = MockPasswordManager()
        auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)
        # The mock server issues a single 407 challenge for the realm.
        http_handler = MockHTTPHandler(
            407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)

        opener = OpenerDirector()
        # Same registration order as the ad-hoc version: proxy, auth, http.
        for h in (urllib2.ProxyHandler({"http": "proxy.example.com:3128"}),
                  auth_handler,
                  http_handler):
            opener.add_handler(h)

        self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
                              realm, http_handler, password_manager,
                              "http://acme.example.com:3128/protected",
                              "proxy.example.com:3128",
                              )
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_proxy(self):
        """Opening through a ProxyHandler redirects the request to the proxy host."""
        o = OpenerDirector()
        o.add_handler(urllib2.ProxyHandler(dict(http="proxy.example.com:3128")))
        meth_spec = [[("http_open", "return response")]]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        request = Request("http://acme.example.com/")
        self.assertEqual(request.get_host(), "acme.example.com")
        o.open(request)
        # ProxyHandler mutated the request in place.
        self.assertEqual(request.get_host(), "proxy.example.com:3128")

        self.assertEqual([call[0:2] for call in o.calls],
                         [(handlers[0], "http_open")])
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_proxy_https_proxy_authorization(self):
        """Proxy-Authorization is used for the tunnel, never forwarded upstream."""
        opener = OpenerDirector()
        opener.add_handler(
            urllib2.ProxyHandler(dict(https='proxy.example.com:3128')))
        mock_https = MockHTTPSHandler()
        opener.add_handler(mock_https)

        request = Request("https://www.example.com/")
        for name, value in (("Proxy-Authorization", "FooBar"),
                            ("User-Agent", "Grail")):
            request.add_header(name, value)
        self.assertEqual(request.get_host(), "www.example.com")
        self.assertIsNone(request._tunnel_host)

        opener.open(request)

        headers_sent = mock_https.httpconn.req_headers
        # Tunnel credentials stay out of the proxied request's headers...
        self.assertNotIn(("Proxy-Authorization", "FooBar"), headers_sent)
        # ...while normal headers are forwarded unchanged.
        self.assertIn(("User-Agent", "Grail"), headers_sent)
        self.assertIsNotNone(request._tunnel_host)
        self.assertEqual(request.get_host(), "proxy.example.com:3128")
        self.assertEqual(request.get_header("Proxy-authorization"), "FooBar")
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_proxy_basic_auth(self):
        """A 407 from the proxy is retried with Basic credentials."""
        opener = OpenerDirector()
        proxy_handler = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
        opener.add_handler(proxy_handler)
        manager = MockPasswordManager()
        auth_handler = urllib2.ProxyBasicAuthHandler(manager)
        realm = "ACME Networks"
        # One 407 challenge, which the auth handler should answer on retry.
        challenge = 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm
        http_handler = MockHTTPHandler(407, challenge)
        opener.add_handler(auth_handler)
        opener.add_handler(http_handler)
        self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
                              realm, http_handler, manager,
                              "http://acme.example.com:3128/protected",
                              "proxy.example.com:3128",
                              )
项目:Chromium_DepotTools    作者:p07r0457    | 项目源码 | 文件源码
def proxy_open(self, req, proxy, type):
    """Route *req* through *proxy*, letting HTTPS requests tunnel instead.

    Mirrors ``urllib2.ProxyHandler.proxy_open`` from Python 2.6, with one
    change: for an https request it returns None *after* attaching the
    proxy to the request, so the default HTTPS machinery performs a
    CONNECT tunnel through the proxy rather than this handler rewriting
    the request directly.
    """
    # This block is copied wholesale from Python2.6 urllib2.
    # It is idempotent, so the superclass method call executes as normal
    # if invoked.
    orig_type = req.get_type()
    proxy_type, user, password, hostport = self._parse_proxy(proxy)
    if proxy_type is None:
      proxy_type = orig_type
    if user and password:
      user_pass = "%s:%s" % (urllib2.unquote(user), urllib2.unquote(password))
      creds = base64.b64encode(user_pass).strip()
      # Later calls overwrite earlier calls for the same header
      req.add_header("Proxy-authorization", "Basic " + creds)
    hostport = urllib2.unquote(hostport)
    req.set_proxy(hostport, proxy_type)
    # This condition is the change
    if orig_type == "https":
      # Returning None makes OpenerDirector fall through to the next
      # handler; the proxy has already been attached to the request above.
      return None

    return urllib2.ProxyHandler.proxy_open(self, req, proxy, type)
项目:node-gn    作者:Shouqun    | 项目源码 | 文件源码
def proxy_open(self, req, proxy, type):
    """Route *req* through *proxy*, letting HTTPS requests tunnel instead.

    Mirrors ``urllib2.ProxyHandler.proxy_open`` from Python 2.6, with one
    change: for an https request it returns None *after* attaching the
    proxy to the request, so the default HTTPS machinery performs a
    CONNECT tunnel through the proxy rather than this handler rewriting
    the request directly.
    """
    # This block is copied wholesale from Python2.6 urllib2.
    # It is idempotent, so the superclass method call executes as normal
    # if invoked.
    orig_type = req.get_type()
    proxy_type, user, password, hostport = self._parse_proxy(proxy)
    if proxy_type is None:
      proxy_type = orig_type
    if user and password:
      user_pass = "%s:%s" % (urllib2.unquote(user), urllib2.unquote(password))
      creds = base64.b64encode(user_pass).strip()
      # Later calls overwrite earlier calls for the same header
      req.add_header("Proxy-authorization", "Basic " + creds)
    hostport = urllib2.unquote(hostport)
    req.set_proxy(hostport, proxy_type)
    # This condition is the change
    if orig_type == "https":
      # Returning None makes OpenerDirector fall through to the next
      # handler; the proxy has already been attached to the request above.
      return None

    return urllib2.ProxyHandler.proxy_open(self, req, proxy, type)
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def proxy_open(self, req, proxy, type):
    """Route *req* through *proxy*, letting HTTPS requests tunnel instead.

    Mirrors ``urllib2.ProxyHandler.proxy_open`` from Python 2.6, with one
    change: for an https request it returns None *after* attaching the
    proxy to the request, so the default HTTPS machinery performs a
    CONNECT tunnel through the proxy rather than this handler rewriting
    the request directly.
    """
    # This block is copied wholesale from Python2.6 urllib2.
    # It is idempotent, so the superclass method call executes as normal
    # if invoked.
    orig_type = req.get_type()
    proxy_type, user, password, hostport = self._parse_proxy(proxy)
    if proxy_type is None:
      proxy_type = orig_type
    if user and password:
      user_pass = "%s:%s" % (urllib2.unquote(user), urllib2.unquote(password))
      creds = base64.b64encode(user_pass).strip()
      # Later calls overwrite earlier calls for the same header
      req.add_header("Proxy-authorization", "Basic " + creds)
    hostport = urllib2.unquote(hostport)
    req.set_proxy(hostport, proxy_type)
    # This condition is the change
    if orig_type == "https":
      # Returning None makes OpenerDirector fall through to the next
      # handler; the proxy has already been attached to the request above.
      return None

    return urllib2.ProxyHandler.proxy_open(self, req, proxy, type)
项目:sogouWechart    作者:duanbj    | 项目源码 | 文件源码
def proxy_identify(proxy, url):
        cookie = cookielib.LWPCookieJar()
        handler = urllib2.HTTPCookieProcessor(cookie)
        proxy_support = urllib2.ProxyHandler({'http': proxy})
        opener = urllib2.build_opener(proxy_support, handler)
        try:
            response = opener.open(url, timeout=3)
            if response.code == 200:
                c = ''
                for item in cookie:
                    c += item.name+'='+item.value+';'
                print c
                IpProxy.sogou_cookie.append(c)
                return True
        except Exception, error:
            print error
            return False
项目:tools    作者:okabe    | 项目源码 | 文件源码
def checker():
    """Worker loop: pull proxies off *proxyq* and verify them via icanhazip.

    A proxy "works" when the IP echoed back by icanhazip.com appears inside
    the proxy address itself (i.e. the request really went through it).
    Working proxies are appended to working.txt; progress messages go to
    *printq*.  Runs forever -- intended to be a daemon thread.
    """
    url = "http://icanhazip.com"
    while True:
        # Blocking get() instead of polling empty(): the original spun at
        # 100% CPU whenever the queue drained, and empty() is racy when
        # several checker threads run at once.
        proxy = "http://{}".format(proxyq.get())
        proxy_handler = urllib2.ProxyHandler({"http": proxy})
        # Use the opener directly rather than urllib2.install_opener():
        # installing mutates process-global state and races between threads.
        opener = urllib2.build_opener(proxy_handler)
        printq.put("[>] Trying {}".format(proxy))
        try:
            response = opener.open(url, timeout=3).readlines()
            for line in response:
                if line.rstrip("\n") in proxy:
                    printq.put("[+] Working proxy: {}".format(proxy))
                    # The with-statement closes the file; the original's
                    # extra log.close() was redundant.
                    with open("working.txt", "a") as log:
                        log.write("{}\n".format(proxy))
        except Exception:
            printq.put("[!] Bad proxy: {}".format(proxy))
        proxyq.task_done()
项目:00scanner    作者:xiaoqin00    | 项目源码 | 文件源码
def init_options(proxy=None, cookie=None, ua=None, referer=None):
    """Record request headers module-wide and (optionally) install an HTTP proxy."""
    header_pairs = ((COOKIE, cookie), (UA, ua or NAME), (REFERER, referer))
    # Keep only the headers whose value was actually supplied.
    globals()["_headers"] = dict((key, value) for key, value in header_pairs if value)
    if proxy:
        urllib2.install_opener(
            urllib2.build_opener(urllib2.ProxyHandler({'http': proxy})))
    else:
        # Installing None restores urllib2's default opener behaviour.
        urllib2.install_opener(None)

# if __name__ == "__main__":
#     print "%s #v%s\n by: %s\n" % (NAME, VERSION, AUTHOR)
#     parser = optparse.OptionParser(version=VERSION)
#     parser.add_option("-u", "--url", dest="url", help="Target URL (e.g. \"http://www.target.com/page.php?id=1\")")
#     parser.add_option("--data", dest="data", help="POST data (e.g. \"query=test\")")
#     parser.add_option("--cookie", dest="cookie", help="HTTP Cookie header value")
#     parser.add_option("--user-agent", dest="ua", help="HTTP User-Agent header value")
#     parser.add_option("--referer", dest="referer", help="HTTP Referer header value")
#     parser.add_option("--proxy", dest="proxy", help="HTTP proxy address (e.g. \"http://127.0.0.1:8080\")")
#     options, _ = parser.parse_args()
#     if options.url:
#         init_options(options.proxy, options.cookie, options.ua, options.referer)
#         result = scan_page(options.url if options.url.startswith("http") else "http://%s" % options.url, options.data)
#         print "\nscan results: %s vulnerabilities found" % ("possible" if result else "no")
#     else:
#         parser.print_help()
项目:mechanize    作者:python-mechanize    | 项目源码 | 文件源码
def setUp(self):
        """Standard fixture setup plus, optionally, proxy-free global openers.

        When ``self.no_proxies`` is set, both mechanize's and urllib2's
        globally-installed openers are replaced with ones using an empty
        proxy map, and a teardown is registered to restore the originals.
        """
        mechanize._testcase.TestCase.setUp(self)
        self.test_uri = urljoin(self.uri, "test_fixtures")
        self.server = self.get_cached_fixture("server")
        if self.no_proxies:
            # Remember the currently-installed global openers so they can
            # be restored after the test.
            old_opener_m = mechanize._opener._opener
            old_opener_u = urllib2._opener
            # An empty proxies dict disables proxy use entirely, including
            # anything that would be picked up from the environment.
            mechanize.install_opener(mechanize.build_opener(
                mechanize.ProxyHandler(proxies={})))
            urllib2.install_opener(urllib2.build_opener(
                urllib2.ProxyHandler(proxies={})))

            def revert_install():
                mechanize.install_opener(old_opener_m)
                urllib2.install_opener(old_opener_u)
            self.add_teardown(revert_install)