Python urllib2 模块,HTTPDigestAuthHandler() 实例源码

我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用urllib2.HTTPDigestAuthHandler()

项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def update_tags(self,tags,expression):
        # support cases where we might be doing basic auth through a proxy
        if self.MOLOCH_AUTH == "basic":
            auth_handler = urllib2.HTTPPasswordMgrWithDefaultRealm()
            auth_handler.add_password(None, self.MOLOCH_URL, self.MOLOCH_USER, self.MOLOCH_PASSWORD)
            handler = urllib2.HTTPBasicAuthHandler(auth_handler)
            opener = urllib2.build_opener(handler)
        else: 
            auth_handler = urllib2.HTTPDigestAuthHandler()
            auth_handler.add_password(self.MOLOCH_REALM, self.MOLOCH_URL, self.MOLOCH_USER, self.MOLOCH_PASSWORD)
            opener = urllib2.build_opener(auth_handler)

        data = urllib.urlencode({'tags' : tags})
        qstring = urllib.urlencode({'date' : "-1",'expression' : expression})
        TAG_URL = self.MOLOCH_URL + 'addTags?' + qstring
        try:
            response = opener.open(TAG_URL,data=data)
            if response.code == 200:
                plain_answer = response.read()
                json_data = json.loads(plain_answer)
        except Exception, e:
            log.warning("Moloch: Unable to update tags %s" % (e))
项目:alicloud-duplicity    作者:aliyun    | 项目源码 | 文件源码
def get_digest_authorization(self, path):
        """
        Returns the digest auth header
        """
        u = self.parsed_url
        if self.digest_auth_handler is None:
            pw_manager = urllib2.HTTPPasswordMgrWithDefaultRealm()
            pw_manager.add_password(None, self.conn.host, self.username, self.password)
            self.digest_auth_handler = urllib2.HTTPDigestAuthHandler(pw_manager)

        # building a dummy request that gets never sent,
        # needed for call to auth_handler.get_authorization
        scheme = u.scheme == 'webdavs' and 'https' or 'http'
        hostname = u.port and "%s:%s" % (u.hostname, u.port) or u.hostname
        dummy_url = "%s://%s%s" % (scheme, hostname, path)
        dummy_req = CustomMethodRequest(self.conn._method, dummy_url)
        auth_string = self.digest_auth_handler.get_authorization(dummy_req,
                                                                 self.digest_challenge)
        return 'Digest %s' % auth_string
项目:python-mysql-pool    作者:LuciferJack    | 项目源码 | 文件源码
def __init__(self, username, password,  # pylint: disable=E1002
                 verbose=0, use_datetime=False, https_handler=None):
        """Initialize"""
        if PY2:
            Transport.__init__(self, use_datetime=False)
        else:
            super().__init__(use_datetime=False)
        self._username = username
        self._password = password
        self._use_datetime = use_datetime
        self.verbose = verbose
        self._username = username
        self._password = password

        self._handlers = []

        if self._username and self._password:
            self._passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
            self._auth_handler = urllib2.HTTPDigestAuthHandler(self._passmgr)
        else:
            self._auth_handler = None
            self._passmgr = None

        if https_handler:
            self._handlers.append(https_handler)
            self._scheme = 'https'
        else:
            self._scheme = 'http'

        if self._auth_handler:
            self._handlers.append(self._auth_handler)
项目:importacsv    作者:rasertux    | 项目源码 | 文件源码
def __init__(self, username, password,  #pylint: disable=E1002
                 verbose=0, use_datetime=False, https_handler=None):
        """Initialize"""
        if PY2:
            Transport.__init__(self, use_datetime=False)
        else:
            super().__init__(use_datetime=False)
        self._username = username
        self._password = password
        self._use_datetime = use_datetime
        self.verbose = verbose
        self._username = username
        self._password = password

        self._handlers = []

        if self._username and self._password:
            self._passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
            self._auth_handler = urllib2.HTTPDigestAuthHandler(self._passmgr)
        else:
            self._auth_handler = None
            self._passmgr = None

        if https_handler:
            self._handlers.append(https_handler)
            self._scheme = 'https'
        else:
            self._scheme = 'http'

        if self._auth_handler:
            self._handlers.append(self._auth_handler)
项目:Chorus    作者:DonaldBough    | 项目源码 | 文件源码
def __init__(self, username, password,  #pylint: disable=E1002
                 verbose=0, use_datetime=False, https_handler=None):
        """Initialize"""
        if PY2:
            Transport.__init__(self, use_datetime=False)
        else:
            super().__init__(use_datetime=False)
        self._username = username
        self._password = password
        self._use_datetime = use_datetime
        self.verbose = verbose
        self._username = username
        self._password = password

        self._handlers = []

        if self._username and self._password:
            self._passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
            self._auth_handler = urllib2.HTTPDigestAuthHandler(self._passmgr)
        else:
            self._auth_handler = None
            self._passmgr = None

        if https_handler:
            self._handlers.append(https_handler)
            self._scheme = 'https'
        else:
            self._scheme = 'http'

        if self._auth_handler:
            self._handlers.append(self._auth_handler)
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def __setHTTPAuthentication():
    """
    Check and set the HTTP authentication method (Basic or Digest),
    username and password to perform HTTP requests with.
    """

    global authHandler

    if not conf.aType and not conf.aCred:
        return

    elif conf.aType and not conf.aCred:
        errMsg  = "you specified the HTTP Authentication type, but "
        errMsg += "did not provide the credentials"
        raise sqlmapSyntaxException, errMsg

    elif not conf.aType and conf.aCred:
        errMsg  = "you specified the HTTP Authentication credentials, "
        errMsg += "but did not provide the type"
        raise sqlmapSyntaxException, errMsg

    parseTargetUrl()

    debugMsg = "setting the HTTP Authentication type and credentials"
    logger.debug(debugMsg)

    aTypeLower = conf.aType.lower()

    if aTypeLower not in ( "basic", "digest" ):
        errMsg  = "HTTP Authentication type value must be "
        errMsg += "Basic or Digest"
        raise sqlmapSyntaxException, errMsg

    aCredRegExp = re.search("^(.*?)\:(.*?)$", conf.aCred)

    if not aCredRegExp:
        errMsg  = "HTTP Authentication credentials value must be "
        errMsg += "in format username:password"
        raise sqlmapSyntaxException, errMsg

    authUsername = aCredRegExp.group(1)
    authPassword = aCredRegExp.group(2)

    passwordMgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
    passwordMgr.add_password(None, "%s://%s" % (conf.scheme, conf.hostname), authUsername, authPassword)

    if aTypeLower == "basic":
        authHandler = urllib2.HTTPBasicAuthHandler(passwordMgr)
    elif aTypeLower == "digest":
        authHandler = urllib2.HTTPDigestAuthHandler(passwordMgr)
项目:enigma2    作者:OpenLD    | 项目源码 | 文件源码
def openWebIF(self, part = None, reader = None):
        self.proto = "http"
        if config.oscaminfo.userdatafromconf.value:
            self.ip = "127.0.0.1"
            udata = self.getUserData()
            if isinstance(udata, str):
                if "httpuser" in udata:
                    self.username=""
                elif "httppwd" in udata:
                    self.password = ""
                else:
                    return False, udata
            else:
                self.port = udata[2]
                self.username = udata[0]
                self.password = udata[1]
        else:
            self.ip = ".".join("%d" % d for d in config.oscaminfo.ip.value)
            self.port = str(config.oscaminfo.port.value)
            self.username = str(config.oscaminfo.username.value)
            self.password = str(config.oscaminfo.password.value)

        if self.port.startswith( '+' ):
            self.proto = "https"
            self.port.replace("+","")

        if part is None:
            self.url = "%s://%s:%s/oscamapi.html?part=status" % ( self.proto, self.ip, self.port )
        else:
            self.url = "%s://%s:%s/oscamapi.html?part=%s" % ( self.proto, self.ip, self.port, part )
        if part is not None and reader is not None:
            self.url = "%s://%s:%s/oscamapi.html?part=%s&label=%s" % ( self.proto, self.ip, self.port, part, reader )

        pwman = urllib2.HTTPPasswordMgrWithDefaultRealm()
        pwman.add_password( None, self.url, self.username, self.password )
        handlers = urllib2.HTTPDigestAuthHandler( pwman )
        opener = urllib2.build_opener( urllib2.HTTPHandler, handlers )
        urllib2.install_opener( opener )
        request = urllib2.Request( self.url )
        err = False
        try:
            data = urllib2.urlopen( request ).read()
            # print data
        except urllib2.URLError, e:
            if hasattr(e, "reason"):
                err = str(e.reason)
            elif hasattr(e, "code"):
                err = str(e.code)
        if err is not False:
            print "[openWebIF] Fehler: %s" % err
            return False, err
        else:
            return True, data
项目:enigma2    作者:OpenLD    | 项目源码 | 文件源码
def openWebIF(self, part = None, reader = None):
        self.proto = "http"
        if config.ncaminfo.userdatafromconf.value:
            self.ip = "127.0.0.1"
            udata = self.getUserData()
            if isinstance(udata, str):
                if "httpuser" in udata:
                    self.username=""
                elif "httppwd" in udata:
                    self.password = ""
                else:
                    return False, udata
            else:
                self.port = udata[2]
                self.username = udata[0]
                self.password = udata[1]
        else:
            self.ip = ".".join("%d" % d for d in config.ncaminfo.ip.value)
            self.port = str(config.ncaminfo.port.value)
            self.username = str(config.ncaminfo.username.value)
            self.password = str(config.ncaminfo.password.value)

        if self.port.startswith( '+' ):
            self.proto = "https"
            self.port.replace("+","")

        if part is None:
            self.url = "%s://%s:%s/ncamapi.html?part=status" % ( self.proto, self.ip, self.port )
        else:
            self.url = "%s://%s:%s/ncamapi.html?part=%s" % ( self.proto, self.ip, self.port, part )
        if part is not None and reader is not None:
            self.url = "%s://%s:%s/ncamapi.html?part=%s&label=%s" % ( self.proto, self.ip, self.port, part, reader )

        pwman = urllib2.HTTPPasswordMgrWithDefaultRealm()
        pwman.add_password( None, self.url, self.username, self.password )
        handlers = urllib2.HTTPDigestAuthHandler( pwman )
        opener = urllib2.build_opener( urllib2.HTTPHandler, handlers )
        urllib2.install_opener( opener )
        request = urllib2.Request( self.url )
        err = False
        try:
            data = urllib2.urlopen( request ).read()
            # print data
        except urllib2.URLError, e:
            if hasattr(e, "reason"):
                err = str(e.reason)
            elif hasattr(e, "code"):
                err = str(e.code)
        if err is not False:
            print "[openWebIF] Fehler: %s" % err
            return False, err
        else:
            return True, data
项目:enigma2    作者:Openeight    | 项目源码 | 文件源码
def openWebIF(self, part = None, reader = None):
        if config.oscaminfo.userdatafromconf.getValue():
            self.ip = "127.0.0.1"
            udata = self.getUserData()
            if isinstance(udata, str):
                if "httpuser" in udata:
                    self.username=""
                elif "httppwd" in udata:
                    self.password = ""
                else:
                    return (False, udata)
            else:
                self.port = udata[2]
                self.username = udata[0]
                self.password = udata[1]
        else:
            self.ip = ".".join("%d" % d for d in config.oscaminfo.ip.getValue())
            self.port = config.oscaminfo.port.getValue()
            self.username = config.oscaminfo.username.getValue()
            self.password = config.oscaminfo.password.getValue()
        if part is None:
            self.url = "http://%s:%s/oscamapi.html?part=status" % ( self.ip, self.port )
        else:
            self.url = "http://%s:%s/oscamapi.html?part=%s" % (self.ip, self.port, part )
        if part is not None and reader is not None:
            self.url = "http://%s:%s/oscamapi.html?part=%s&label=%s" % ( self.ip, self.port, part, reader )

        print "URL=%s" % self.url
        pwman = urllib2.HTTPPasswordMgrWithDefaultRealm()
        pwman.add_password( None, self.url, self.username, self.password )
        handlers = urllib2.HTTPDigestAuthHandler( pwman )
        opener = urllib2.build_opener( urllib2.HTTPHandler, handlers )
        urllib2.install_opener( opener )
        request = urllib2.Request( self.url )
        err = False
        try:
            data = urllib2.urlopen( request ).read()
            # print data
        except urllib2.URLError, e:
            if hasattr(e, "reason"):
                err = str(e.reason)
            elif hasattr(e, "code"):
                err = str(e.code)
        if err is not False:
            print "[openWebIF] Fehler: %s" % err
            return (False, err)
        else:
            return (True, data)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_basic_and_digest_auth_handlers(self):
        # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40*
        # response (http://python.org/sf/1479302), where it should instead
        # return None to allow another handler (especially
        # HTTPBasicAuthHandler) to handle the response.

        # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must
        # try digest first (since it's the strongest auth scheme), so we record
        # order of calls here to check digest comes first:
        class RecordingOpenerDirector(OpenerDirector):
            def __init__(self):
                OpenerDirector.__init__(self)
                self.recorded = []
            def record(self, info):
                self.recorded.append(info)
        class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler):
            def http_error_401(self, *args, **kwds):
                self.parent.record("digest")
                urllib2.HTTPDigestAuthHandler.http_error_401(self,
                                                             *args, **kwds)
        class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler):
            def http_error_401(self, *args, **kwds):
                self.parent.record("basic")
                urllib2.HTTPBasicAuthHandler.http_error_401(self,
                                                            *args, **kwds)

        opener = RecordingOpenerDirector()
        password_manager = MockPasswordManager()
        digest_handler = TestDigestAuthHandler(password_manager)
        basic_handler = TestBasicAuthHandler(password_manager)
        realm = "ACME Networks"
        http_handler = MockHTTPHandler(
            401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
        opener.add_handler(basic_handler)
        opener.add_handler(digest_handler)
        opener.add_handler(http_handler)

        # check basic auth isn't blocked by digest handler failing
        self._test_basic_auth(opener, basic_handler, "Authorization",
                              realm, http_handler, password_manager,
                              "http://acme.example.com/protected",
                              "http://acme.example.com/protected",
                              )
        # check digest was tried before basic (twice, because
        # _test_basic_auth called .open() twice)
        self.assertEqual(opener.recorded, ["digest", "basic"]*2)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_basic_and_digest_auth_handlers(self):
        # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40*
        # response (http://python.org/sf/1479302), where it should instead
        # return None to allow another handler (especially
        # HTTPBasicAuthHandler) to handle the response.

        # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must
        # try digest first (since it's the strongest auth scheme), so we record
        # order of calls here to check digest comes first:
        class RecordingOpenerDirector(OpenerDirector):
            def __init__(self):
                OpenerDirector.__init__(self)
                self.recorded = []
            def record(self, info):
                self.recorded.append(info)
        class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler):
            def http_error_401(self, *args, **kwds):
                self.parent.record("digest")
                urllib2.HTTPDigestAuthHandler.http_error_401(self,
                                                             *args, **kwds)
        class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler):
            def http_error_401(self, *args, **kwds):
                self.parent.record("basic")
                urllib2.HTTPBasicAuthHandler.http_error_401(self,
                                                            *args, **kwds)

        opener = RecordingOpenerDirector()
        password_manager = MockPasswordManager()
        digest_handler = TestDigestAuthHandler(password_manager)
        basic_handler = TestBasicAuthHandler(password_manager)
        realm = "ACME Networks"
        http_handler = MockHTTPHandler(
            401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
        opener.add_handler(basic_handler)
        opener.add_handler(digest_handler)
        opener.add_handler(http_handler)

        # check basic auth isn't blocked by digest handler failing
        self._test_basic_auth(opener, basic_handler, "Authorization",
                              realm, http_handler, password_manager,
                              "http://acme.example.com/protected",
                              "http://acme.example.com/protected",
                              )
        # check digest was tried before basic (twice, because
        # _test_basic_auth called .open() twice)
        self.assertEqual(opener.recorded, ["digest", "basic"]*2)
项目:enigma2    作者:BlackHole    | 项目源码 | 文件源码
def openWebIF(self, part = None, reader = None):
        self.proto = "http"
        if config.oscaminfo.userdatafromconf.value:
            self.ip = "127.0.0.1"
            udata = self.getUserData()
            if isinstance(udata, str):
                if "httpuser" in udata:
                    self.username=""
                elif "httppwd" in udata:
                    self.password = ""
                else:
                    return False, udata
            else:
                self.port = udata[2]
                self.username = udata[0]
                self.password = udata[1]
        else:
            self.ip = ".".join("%d" % d for d in config.oscaminfo.ip.value)
            self.port = str(config.oscaminfo.port.value)
            self.username = str(config.oscaminfo.username.value)
            self.password = str(config.oscaminfo.password.value)

        if self.port.startswith( '+' ):
            self.proto = "https"
            self.port.replace("+","")

        if part is None:
            self.url = "%s://%s:%s/oscamapi.html?part=status" % ( self.proto, self.ip, self.port )
        else:
            self.url = "%s://%s:%s/oscamapi.html?part=%s" % ( self.proto, self.ip, self.port, part )
        if part is not None and reader is not None:
            self.url = "%s://%s:%s/oscamapi.html?part=%s&label=%s" % ( self.proto, self.ip, self.port, part, reader )

        pwman = urllib2.HTTPPasswordMgrWithDefaultRealm()
        pwman.add_password( None, self.url, self.username, self.password )
        handlers = urllib2.HTTPDigestAuthHandler( pwman )
        opener = urllib2.build_opener( urllib2.HTTPHandler, handlers )
        urllib2.install_opener( opener )
        request = urllib2.Request( self.url )
        err = False
        try:
            data = urllib2.urlopen( request ).read()
            # print data
        except urllib2.URLError, e:
            if hasattr(e, "reason"):
                err = str(e.reason)
            elif hasattr(e, "code"):
                err = str(e.code)
        if err is not False:
            print "[OScaminfo] Fehler: %s" % err
            return False, err
        else:
            return True, data
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_basic_and_digest_auth_handlers(self):
        # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40*
        # response (http://python.org/sf/1479302), where it should instead
        # return None to allow another handler (especially
        # HTTPBasicAuthHandler) to handle the response.

        # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must
        # try digest first (since it's the strongest auth scheme), so we record
        # order of calls here to check digest comes first:
        class RecordingOpenerDirector(OpenerDirector):
            def __init__(self):
                OpenerDirector.__init__(self)
                self.recorded = []
            def record(self, info):
                self.recorded.append(info)
        class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler):
            def http_error_401(self, *args, **kwds):
                self.parent.record("digest")
                urllib2.HTTPDigestAuthHandler.http_error_401(self,
                                                             *args, **kwds)
        class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler):
            def http_error_401(self, *args, **kwds):
                self.parent.record("basic")
                urllib2.HTTPBasicAuthHandler.http_error_401(self,
                                                            *args, **kwds)

        opener = RecordingOpenerDirector()
        password_manager = MockPasswordManager()
        digest_handler = TestDigestAuthHandler(password_manager)
        basic_handler = TestBasicAuthHandler(password_manager)
        realm = "ACME Networks"
        http_handler = MockHTTPHandler(
            401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
        opener.add_handler(basic_handler)
        opener.add_handler(digest_handler)
        opener.add_handler(http_handler)

        # check basic auth isn't blocked by digest handler failing
        self._test_basic_auth(opener, basic_handler, "Authorization",
                              realm, http_handler, password_manager,
                              "http://acme.example.com/protected",
                              "http://acme.example.com/protected",
                              )
        # check digest was tried before basic (twice, because
        # _test_basic_auth called .open() twice)
        self.assertEqual(opener.recorded, ["digest", "basic"]*2)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_basic_and_digest_auth_handlers(self):
        # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40*
        # response (http://python.org/sf/1479302), where it should instead
        # return None to allow another handler (especially
        # HTTPBasicAuthHandler) to handle the response.

        # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must
        # try digest first (since it's the strongest auth scheme), so we record
        # order of calls here to check digest comes first:
        class RecordingOpenerDirector(OpenerDirector):
            def __init__(self):
                OpenerDirector.__init__(self)
                self.recorded = []
            def record(self, info):
                self.recorded.append(info)
        class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler):
            def http_error_401(self, *args, **kwds):
                self.parent.record("digest")
                urllib2.HTTPDigestAuthHandler.http_error_401(self,
                                                             *args, **kwds)
        class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler):
            def http_error_401(self, *args, **kwds):
                self.parent.record("basic")
                urllib2.HTTPBasicAuthHandler.http_error_401(self,
                                                            *args, **kwds)

        opener = RecordingOpenerDirector()
        password_manager = MockPasswordManager()
        digest_handler = TestDigestAuthHandler(password_manager)
        basic_handler = TestBasicAuthHandler(password_manager)
        realm = "ACME Networks"
        http_handler = MockHTTPHandler(
            401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
        opener.add_handler(basic_handler)
        opener.add_handler(digest_handler)
        opener.add_handler(http_handler)

        # check basic auth isn't blocked by digest handler failing
        self._test_basic_auth(opener, basic_handler, "Authorization",
                              realm, http_handler, password_manager,
                              "http://acme.example.com/protected",
                              "http://acme.example.com/protected",
                              )
        # check digest was tried before basic (twice, because
        # _test_basic_auth called .open() twice)
        self.assertEqual(opener.recorded, ["digest", "basic"]*2)