Python urllib.error 模块,HTTPError() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urllib.error.HTTPError()

项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def requestData(url, user_agent):
    """Fetch *url* with the given User-Agent and return the gbk-decoded body.

    Returns None when the request fails; errors are printed, not raised.
    """
    content = None  # ensure a defined value when the request fails
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=8)
        # response.read() yields bytes; decode with the site's gbk encoding
        content = response.read().decode('gbk')
    # HTTPError is a subclass of URLError and must be caught first; with
    # the original ordering this branch was unreachable.
    except error.HTTPError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)

    return content
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def getAbstractInfo(self):
    """Fetch this object's page and feed the content to getDetailList.

    Network errors are printed and swallowed.
    """
    try:
        content = requestData(self.url, self.user_agent)
        self.getDetailList(content)
    # HTTPError is a subclass of URLError and must be caught first; with
    # the original ordering this branch was unreachable.
    except error.HTTPError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def requestData(self, url, user_agent):
    """Fetch *url* with the given User-Agent and return the utf-8 decoded body.

    Returns None when the request fails; errors are printed, not raised.
    """
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=8)
        # response.read() yields bytes; decode them as utf-8
        content = response.read().decode('utf-8')
        return content
    # HTTPError is a subclass of URLError and must be caught first; with
    # the original ordering this branch was unreachable.
    except error.HTTPError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def requestData(self, url, user_agent):
    """Fetch *url* with the given User-Agent and return the gbk-decoded body.

    Returns None when the request fails; errors are printed, not raised.
    """
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=3)
        # response.read() yields bytes; decode with the site's gbk encoding
        content = response.read().decode('gbk')
        return content
    # HTTPError is a subclass of URLError and must be caught first; with
    # the original ordering this branch was unreachable.
    except error.HTTPError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def requestData(self, url, user_agent):
    """Fetch *url* with the given User-Agent and return the utf-8 decoded body.

    Returns None when the request fails; errors are printed, not raised.
    """
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=8)
        # response.read() yields bytes; decode them as utf-8
        content = response.read().decode('utf-8')
        return content
    # HTTPError is a subclass of URLError and must be caught first; with
    # the original ordering this branch was unreachable.
    except error.HTTPError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def requestData(url, user_agent):
    """Fetch *url* with the given User-Agent and return the gbk-decoded body.

    Returns None when the request fails; errors are printed, not raised.
    """
    content = None  # ensure a defined value when the request fails
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=8)
        # response.read() yields bytes; decode with the site's gbk encoding
        content = response.read().decode('gbk')
    # HTTPError is a subclass of URLError and must be caught first; with
    # the original ordering this branch was unreachable.
    except error.HTTPError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)

    return content
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def getAbstractInfo(self):
    """Fetch this object's page and feed the content to getDetailList.

    Network errors are printed and swallowed.
    """
    try:
        content = requestData(self.url, self.user_agent)
        self.getDetailList(content)
    # HTTPError is a subclass of URLError and must be caught first; with
    # the original ordering this branch was unreachable.
    except error.HTTPError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def requestData(self, url, user_agent):
    """Fetch *url* with the given User-Agent and return the gbk-decoded body.

    Returns None when the request fails; errors are printed, not raised.
    """
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=3)
        # response.read() yields bytes; decode with the site's gbk encoding
        content = response.read().decode('gbk')
        return content
    # HTTPError is a subclass of URLError and must be caught first; with
    # the original ordering this branch was unreachable.
    except error.HTTPError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
项目:yelp-fusion    作者:Yelp    | 项目源码 | 文件源码
def main():
    """Parse command-line arguments and run a Yelp search, exiting on HTTP failure."""
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('-q', '--term', dest='term', default=DEFAULT_TERM,
                            type=str, help='Search term (default: %(default)s)')
    arg_parser.add_argument('-l', '--location', dest='location',
                            default=DEFAULT_LOCATION, type=str,
                            help='Search location (default: %(default)s)')
    args = arg_parser.parse_args()

    try:
        query_api(args.term, args.location)
    except HTTPError as error:
        # Abort with a readable message: status code, URL and response body.
        sys.exit(
            'Encountered HTTP error {0} on {1}:\n {2}\nAbort program.'.format(
                error.code,
                error.url,
                error.read(),
            )
        )
项目:yonkoma2data    作者:esuji5    | 项目源码 | 文件源码
def fetch_amazon_item(isbn):
    """Look up an Amazon item by ISBN, retrying up to 5 times on HTTPError."""
    attempts = 0
    while attempts < 5:
        try:
            found = item_search(isbn)
            return found if found else None
        except HTTPError as e:
            # Likely throttled by the API; back off exponentially.
            print(e)
            attempts += 1
            if attempts >= 5:
                # Out of retries: surface the last HTTPError to the caller.
                raise
            wait = 2 ** attempts
            print('retry after {} second'.format(wait))
            sleep(wait)
    return None
项目:zimfarm    作者:openzim    | 项目源码 | 文件源码
def put_status(self):
    """PUT this task's current status, steps and timestamps to the API."""
    api_url = "https://{host}/api/task/{id}".format(
        host=os.getenv('HOST'), id=self.request.id)
    body = {
        'status': self.status,
        'steps': self.steps,
        'file_name': self.zim_file_name,
        'time_stamp': {
            'started': self.start_time,
            'ended': self.ended_time
        }
    }
    req = urllib.request.Request(
        api_url,
        json.dumps(body, cls=JSONEncoder).encode('utf-8'),
        {'Content-Type': 'application/json; charset=utf-8', 'token': self.token},
        method='PUT')
    try:
        with urllib.request.urlopen(req) as resp:
            code = resp.code
    except HTTPError as err:
        # An HTTP error still carries a status code we can record.
        code = err.code
项目:thesaurus_query.vim    作者:Ron89    | 项目源码 | 文件源码
def _woxikon_de_url_handler(target):
    '''
    Query woxikon for sysnonym
    '''
    timeout_value = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        resp = urlopen(fixurl(u'http://synonyms.woxikon.com/de/{0}'.format(target)).decode('ASCII'), timeout=timeout_value)
        result = StringIO(unescape(decode_utf_8(resp.read())))
        resp.close()
    except HTTPError:
        return 1
    except URLError as err:
        # A wrapped socket timeout is retryable (1); anything else is fatal (-1).
        return 1 if isinstance(err.reason, socket.timeout) else -1
    except socket.timeout:
        # Some timeouts surface directly rather than wrapped in URLError.
        return 1
    return result
项目:thesaurus_query.vim    作者:Ron89    | 项目源码 | 文件源码
def _jeck_ru_url_handler(target):
    '''
    Query jiport for sysnonym
    '''
    timeout_value = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        resp = urlopen(fixurl(u'http://jeck.ru/tools/SynonymsDictionary/{0}'.format(target)).decode('ASCII'), timeout=timeout_value)
        result = StringIO(decode_utf_8(resp.read()))
        resp.close()
    except HTTPError:
        return 1
    except URLError as err:
        # A wrapped socket timeout is retryable (1); anything else is fatal (-1).
        return 1 if isinstance(err.reason, socket.timeout) else -1
    except socket.timeout:
        # Some timeouts surface directly rather than wrapped in URLError.
        return 1
    return result
项目:Sillok-with-Deep-Learning    作者:drive0822    | 项目源码 | 文件源码
def get_sillok_element(url):
    """Scrape one Sillok article page and return its parts as a tuple.

    Returns (sillok_id, era, title, source, footnote, main_text), or None
    when the page cannot be fetched or parsed.  The original swallowed
    both failures with `pass` and then crashed on unbound locals.
    """
    try:
        html = urlopen(url)
    except HTTPError:
        # Page unreachable: nothing to parse (the original left `html`
        # unbound here and raised NameError below).
        return None

    try:
        bs_obj = BeautifulSoup(html.read(), "html.parser")
        sillok_id = url.split("/")[-1].split("_")[0]
        era_info = bs_obj.find("span", {"class": "tit_loc"})
        title = bs_obj.find("h3", {"class": "search_tit ins_view_tit"})
        source_info = bs_obj.find("ul", {"class": "ins_source"})
        footnote_info = bs_obj.find("ul", {"class": "ins_footnote"})
        kor_text = bs_obj.find("div", {"class": 'ins_view_in ins_left_in'})
        # '|' delimits paragraphs in the joined body text.
        main_text = ' '.join([text.get_text() + '|' for text in kor_text.findAll("p", {"class": "paragraph"})])
    except Exception:  # find crash sillok urls
        print("Error url is : " + url)
        return None

    return (sillok_id,
            era_info.get_text(),
            title.get_text(),
            source_info.get_text(),
            footnote_info.get_text(),
            main_text)
项目:freesound    作者:iwkse    | 项目源码 | 文件源码
def request(cls, uri, params={}, client=None, wrapper=FreesoundObject, method='GET',data=False):
        """Issue an authorized request to the Freesound API.

        Returns the parsed JSON wrapped in *wrapper* (if given), the raw
        body for HTTPError responses with a 2xx code, or a
        FreesoundException instance for other HTTP errors.  NOTE(review):
        the exception object is RETURNED, not raised — callers must check
        the result type; confirm this is intentional.
        """
        p = params if params else {}
        url = '%s?%s' % (uri, urlencode(p)) if params else uri
        # NOTE(review): urllib.urlencode is the Python 2 location; under
        # Python 3 this would need urllib.parse.urlencode — confirm target.
        d = urllib.urlencode(data) if data else None
        headers = {'Authorization':client.header}
        req = Request(url,d,headers)
        try:
            f = urlopen(req)
        except HTTPError as e:
            resp = e.read()
            # An HTTPError with a 2xx code still carries a usable body.
            if e.code >= 200 and e.code < 300:
                return resp
            else:
                return FreesoundException(e.code, json.loads(resp.decode("utf-8")))
        resp = f.read()
        f.close()
        result = None
        try:
            result = json.loads(resp.decode("utf-8"))
        except:
            # Non-JSON body: surface a parse failure as the library's own error.
            raise FreesoundException(0,"Couldn't parse response")
        if wrapper:
            return wrapper(result,client)
        return result
项目:telepresence    作者:datawire    | 项目源码 | 文件源码
def kubectl_or_oc(server: str) -> str:
    """
    Return "kubectl" or "oc", the command-line tool we should use.

    :param server: The URL of the cluster API server.
    """
    if which("oc") is None:
        return "kubectl"
    # oc is installed (kubectl may be too).  Prefer oc only when the server
    # exposes the OpenShift API; probe it without certificate verification.
    ssl_ctx = ssl.create_default_context()
    ssl_ctx.check_hostname = False
    ssl_ctx.verify_mode = ssl.CERT_NONE
    try:
        with urlopen(server + "/version/openshift", context=ssl_ctx) as resp:
            resp.read()
    except HTTPError:
        # No OpenShift endpoint: plain Kubernetes.
        return "kubectl"
    return "oc"
项目:UKCensusAPI    作者:virgesmith    | 项目源码 | 文件源码
def __fetch_json(self, path, query_params):
    """GET a nomisweb API endpoint and return the decoded JSON ({} on error)."""
    # every request must carry the API key
    query_params["uid"] = Nomisweb.KEY

    full_url = Nomisweb.URL + path + str(urlencode(query_params))

    result = {}
    try:
        resp = request.urlopen(full_url, timeout=Nomisweb.Timeout)
    except (HTTPError, URLError) as err:
        print('ERROR: ', err, '\n', full_url)
    except timeout:
        print('ERROR: request timed out\n', full_url)
    else:
        result = json.loads(resp.read().decode("utf-8"))
    return result

  # save metadata as JSON for future reference
项目:youtube_downloader    作者:aksinghdce    | 项目源码 | 文件源码
def fetch_decode(url, encoding=None):
    """Fetch *url* via the shared opener and return the decoded body."""
    try:
        resp = g.opener.open(url)
    except HTTPError as e:
        if e.getcode() != 503:
            raise
        # 503 means throttled: pause briefly and retry the same request.
        time.sleep(.5)
        return fetch_decode(url, encoding)

    content_type = resp.headers['content-type']

    if encoding:
        # Caller pinned the encoding explicitly.
        return resp.read().decode(encoding)

    if "charset=" in content_type:
        dbg("charset: %s", content_type)
        detected = re.search(r"charset=([\w-]+)\s*(:?;|$)", content_type).group(1)
        return resp.read().decode(detected)

    dbg("encoding unknown")
    return resp.read()
项目:youtube_downloader    作者:aksinghdce    | 项目源码 | 文件源码
def call_gdata(api, qs):
    """Make a request to the youtube gdata api."""
    query = dict(qs)
    query['key'] = g.api_key
    url = g.urls['gdata'] + api + '?' + urlencode(query)

    try:
        data = g.opener.open(url).read().decode('utf-8')
    except HTTPError as e:
        # Prefer the API's own error message; fall back to str(e) if the
        # error body is missing or not valid JSON.
        try:
            payload = e.file.read().decode()
            message = json.loads(payload)['error']['message']
            errmsg = 'Youtube Error %d: %s' % (e.getcode(), message)
        except:
            errmsg = str(e)
        raise GdataError(errmsg)

    return json.loads(data)
项目:Spider_Hub    作者:WiseDoge    | 项目源码 | 文件源码
def getLinks(articleUrl):
    """Fetch a Wikipedia article and return its in-article /wiki/ links."""
    full_url = "http://en.wikipedia.org"+articleUrl
    try:
        html = urlopen(full_url)
    except HTTPError:
        ServerLog.writeLog("HTTPError")
        return None
    except URLError:
        # Transient network trouble: log, pause, then try once more.
        ServerLog.writeLog("URLError")
        print("Sleeping!")
        time.sleep(URLERROR_SLEEP_TIME)
        html = urlopen(full_url)
    soup = BeautifulSoup(html, "lxml")
    body = soup.find("div", {"id":"bodyContent"})
    # Only internal article links: /wiki/ paths without a namespace colon.
    return body.findAll("a", href=re.compile("^(/wiki/)((?!:).)*$"))


# ??IP
项目:Spider_Hub    作者:WiseDoge    | 项目源码 | 文件源码
def getHistoryIPs(pageUrl):
    """Return the set of anonymous-editor IPs on a Wikipedia page's history."""
    page_name = pageUrl.replace("/wiki/", "")
    historyUrl = "http://en.wikipedia.org/w/index.php?title="+page_name+"&action=history"
    print("history url:", historyUrl)

    # Be polite to the server between history requests.
    time.sleep(SLEEP_TIME)

    try:
        html = urlopen(historyUrl)
    except HTTPError:
        return None
    except URLError:
        # Transient network trouble: pause, then try once more.
        print("Sleeping!")
        time.sleep(URLERROR_SLEEP_TIME)
        html = urlopen(historyUrl)
    soup = BeautifulSoup(html, "lxml")
    anon_links = soup.findAll("a", {"class":"mw-anonuserlink"})

    ip_set = set()
    for link in anon_links:
        print(page_name+": "+link.get_text())
        ip_set.add(link.get_text())
    return ip_set

# ????IP?????
项目:Spider_Hub    作者:WiseDoge    | 项目源码 | 文件源码
def getLinks(articleUrl):
    '''
    Fetch a Wikipedia article and return its in-article /wiki/ links.
    '''
    full_url = "http://en.wikipedia.org" + articleUrl
    try:
        html = urlopen(full_url)
    except HTTPError:
        return None
    except URLError:
        # Transient network trouble: pause, then try once more.
        print("Sleeping!")
        time.sleep(URLERROR_SLEEP_TIME)
        html = urlopen(full_url)
    soup = BeautifulSoup(html, "lxml")
    body = soup.find("div", {"id": "bodyContent"})
    # Only internal article links: /wiki/ paths without a namespace colon.
    return body.findAll("a", href=re.compile("^(/wiki/)((?!:).)*$"))


# ??????
项目:cuny-bdif    作者:aristotle-tek    | 项目源码 | 文件源码
def get_validate_spark_version(version, repo):
    """Validate *version* as a Spark release ("1.6.0") or a git commit hash."""
    if "." not in version:
        # No dot: treat as a commit hash and verify it exists on GitHub
        # with a cheap HEAD request.
        commit_url = "{repo}/commit/{commit_hash}".format(repo=repo, commit_hash=version)
        head_req = Request(commit_url)
        head_req.get_method = lambda: 'HEAD'
        try:
            urlopen(head_req)
        except HTTPError as e:
            print("Couldn't validate Spark commit: {url}".format(url=commit_url),
                  file=stderr)
            print("Received HTTP response code of {code}.".format(code=e.code), file=stderr)
            sys.exit(1)
        return version
    # Dotted form: a release version, optionally "v"-prefixed.
    version = version.replace("v", "")
    if version not in VALID_SPARK_VERSIONS:
        print("Don't know about Spark version: {v}".format(v=version), file=stderr)
        sys.exit(1)
    return version


# Source: http://aws.amazon.com/amazon-linux-ami/instance-type-matrix/
# Last Updated: 2015-06-19
# For easy maintainability, please keep this manually-inputted dictionary sorted by key.
项目:steamwatch    作者:akeil    | 项目源码 | 文件源码
def _get(url):
    """GET *url* and return the response object.

    Raises ValueError on a non-200 status; urllib errors propagate
    unchanged.  The original wrapped urlopen in a chain of
    `except ...: raise` clauses that only re-raised — pure no-ops — so
    they have been removed.
    """
    LOG.debug('GET {u!r}'.format(u=url))
    response = urlopen(url)

    LOG.debug('{} {}'.format(response.status, response.reason))

    if response.status not in (200,):
        raise ValueError('{} {}'.format(response.status, response.reason))

    return response
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def _requests(url, kwargs):
    """Perform an HTTP request via the `requests` library and return the body text."""
    encoding = kwargs.get('encoding')
    verb = kwargs.get('method', 'get').lower()
    handler = getattr(requests, str(verb))
    if verb == 'get':
        url, data = _query(url, verb, kwargs)
    # Forward only the whitelisted keyword arguments to `requests`.
    passthrough = {k: kwargs[k] for k in allowed_args if k in kwargs}
    resp = handler(url=url, **passthrough)
    if not (200 <= resp.status_code < 300):
        # Mirror urllib's error contract for non-2xx responses.
        raise HTTPError(resp.url, resp.status_code,
                        resp.reason, resp.headers, None)
    if encoding:
        resp.encoding = encoding
    return resp.text
项目:gitcha-scripts    作者:yeonghoey    | 项目源码 | 文件源码
def get_validate_spark_version(version, repo):
    """Validate *version* as a Spark release ("1.6.0") or a git commit hash."""
    if "." in version:
        # Dotted form: a release version, optionally "v"-prefixed.
        release = version.replace("v", "")
        if release not in VALID_SPARK_VERSIONS:
            print("Don't know about Spark version: {v}".format(v=release), file=stderr)
            sys.exit(1)
        return release
    # Otherwise treat it as a commit hash and verify it exists on GitHub
    # with a cheap HEAD request.
    commit_url = "{repo}/commit/{commit_hash}".format(repo=repo, commit_hash=version)
    probe = Request(commit_url)
    probe.get_method = lambda: 'HEAD'
    try:
        urlopen(probe)
    except HTTPError as e:
        print("Couldn't validate Spark commit: {url}".format(url=commit_url),
              file=stderr)
        print("Received HTTP response code of {code}.".format(code=e.code), file=stderr)
        sys.exit(1)
    return version


# Source: http://aws.amazon.com/amazon-linux-ami/instance-type-matrix/
# Last Updated: 2015-06-19
# For easy maintainability, please keep this manually-inputted dictionary sorted by key.
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testPasswordProtectedSite(self):
        """Live-network test: an inaccessible robots.txt must block fetching.

        Skips itself unless the probe site behaves exactly as expected
        (401/403 on robots.txt), since the external site is flaky.
        """
        support.requires('network')
        with support.transient_internet('mueblesmoraleda.com'):
            url = 'http://mueblesmoraleda.com'
            robots_url = url + "/robots.txt"
            # First check the URL is usable for our purposes, since the
            # test site is a bit flaky.
            try:
                urlopen(robots_url)
            except HTTPError as e:
                if e.code not in {401, 403}:
                    self.skipTest(
                        "%r should return a 401 or 403 HTTP error, not %r"
                        % (robots_url, e.code))
            else:
                self.skipTest(
                    "%r should return a 401 or 403 HTTP error, not succeed"
                    % (robots_url))
            parser = urllib.robotparser.RobotFileParser()
            parser.set_url(url)
            try:
                parser.read()
            except URLError:
                self.skipTest('%s is unavailable' % url)
            # A 401/403 robots.txt is treated as "disallow everything".
            self.assertEqual(parser.can_fetch("*", robots_url), False)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def http_error_auth_reqed(self, authreq, host, req, headers):
        """Handle a 401 by retrying the request with Basic credentials.

        *authreq* names the challenge header to read from *headers*;
        gives up with an HTTPError after more than 5 consecutive retries.
        Returns the retried response, or None when no Basic challenge is
        present.
        """
        # host may be an authority (without userinfo) or a URL with an
        # authority
        # XXX could be multiple headers
        authreq = headers.get(authreq, None)

        if self.retried > 5:
            # retry sending the username:password 5 times before failing.
            raise HTTPError(req.get_full_url(), 401, "basic auth failed",
                    headers, None)
        else:
            self.retried += 1

        if authreq:
            mo = AbstractBasicAuthHandler.rx.search(authreq)
            if mo:
                scheme, quote, realm = mo.groups()
                if scheme.lower() == 'basic':
                    response = self.retry_http_basic_auth(host, req, realm)
                    # A non-401 answer means the credentials worked: reset
                    # the counter so the next challenge starts fresh.
                    if response and response.code != 401:
                        self.retried = 0
                    return response
项目:Tencent_Cartoon_Download    作者:Fretice    | 项目源码 | 文件源码
def _requests(url, kwargs):
    """Perform an HTTP request via the `requests` library and return the body text."""
    encoding = kwargs.get('encoding')
    verb = kwargs.get('method', 'get').lower()
    handler = getattr(requests, str(verb))
    if verb == 'get':
        url, data = _query(url, verb, kwargs)
    # Forward only the whitelisted keyword arguments to `requests`.
    passthrough = {k: kwargs[k] for k in allowed_args if k in kwargs}
    resp = handler(url=url, **passthrough)
    if not (200 <= resp.status_code < 300):
        # Mirror urllib's error contract for non-2xx responses.
        raise HTTPError(resp.url, resp.status_code,
                        resp.reason, resp.headers, None)
    if encoding:
        resp.encoding = encoding
    return resp.text
项目:Alex    作者:candlewill    | 项目源码 | 文件源码
def obtain_geo_codes(self, place='New York'):
    """Return (longitude, latitude) for *place* via the Google geocode API.

    Default place is New York.  Returns (None, None) when the HTTP
    request fails or the API returns no results — the original raised
    IndexError on an empty 'results' list.
    """
    query = {'address': place, 'language': 'en'}
    url = 'https://maps.googleapis.com/maps/api/geocode/json?'
    try:
        page = urlopen(url + urlencode(query))
    except HTTPError as e:
        print(e.code)
        return None, None
    json_obj = json.loads(str(page.read(), 'utf-8'))
    results = json_obj['results']
    if not results:
        # Unknown place: mirror the HTTP-error contract instead of crashing.
        return None, None
    location = results[0]['geometry']['location']
    return location['lng'], location['lat']
项目:islam-buddy    作者:hamir    | 项目源码 | 文件源码
def __call__(self, env, start_response):
    """WSGI entry point: serve /app and /callback, redirect everything else."""
    try:
        path = env["PATH_INFO"]
        if path == "/app":
            status, body, headers = self._serve_application()
        elif path == "/callback":
            status, body, headers = self._read_auth_token(env)
        else:
            # Unknown path: bounce the client back to the application page.
            status, body, headers = "301 Moved", "", {"Location": "/app"}
    except HTTPError as http_error:
        print("HTTPError occured:")
        print(http_error.read())
        raise

    start_response(status,
                   [(header, val) for header, val in list(headers.items())])
    return [body]
项目:stashpy    作者:afroisalreadyinu    | 项目源码 | 文件源码
def main():
    """Load-test stashpy: send increasing log rates and plot the success ratio."""
    config = get_config()
    # Start from a clean slate: delete the target index, ignoring HTTP
    # errors (e.g. it may not exist yet).
    try:
        urlopen(Request(index_url(), method='DELETE'))
    except HTTPError:
        pass
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((config['address'], config['port']))
    # Rates to test: 1, then 10, 20, ... 200 log lines per second.
    per_second = list(range(0, 210, 10))
    per_second[0] = 1
    success_vals = []
    for step in per_second:
        print('Doing {} loglines per sec'.format(step))
        vals = run_step(sock, step, config['processor_spec']['to_dict'][0])
        if step == 1:
            time.sleep(2)
        # Flush the index so the just-sent lines become searchable before
        # check_step counts them.
        flush_url = '{}/_flush'.format(index_url())
        resp = urlopen(Request(flush_url, urlencode({'wait_if_ongoing': 'true'}).encode('utf-8'))).read()
        success_vals.append(check_step(config['indexer_config']['host'],
                                       config['indexer_config']['port'],
                                       vals))
    pyplot.plot(per_second, success_vals)
    pyplot.xlabel('Log lines per second')
    pyplot.ylabel('Successful')
    pyplot.savefig('lines_per_sec.png', bbox_inches='tight')
项目:TnyBot-Discord    作者:00firestar00    | 项目源码 | 文件源码
async def url_request(self, url, file_path, proxy_url=None):
    """Download *url* to *file_path*, falling back to *proxy_url* on HTTP errors.

    The body uses `await`, so the function must be declared `async def`;
    the original plain `def` was a SyntaxError (the `async` keyword was
    evidently lost).
    """
    try:
        urllib_request.urlretrieve(url, file_path)
        print(file_path)
    except HTTPError as e:
        print(e.code)
        if proxy_url is not None:
            print("Trying proxy URL")
            await self.url_request(proxy_url, file_path)
        else:
            raise e
    except UnicodeEncodeError:
        # Special retry logic for IDN domains: re-encode the host with IDNA.
        idna_url = "http://" + url.replace("http://", "").encode("idna").decode("utf-8")
        await self.url_request(idna_url, file_path)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testPasswordProtectedSite(self):
        """Live-network test: an inaccessible robots.txt must block fetching.

        Skips itself unless the probe site behaves exactly as expected
        (401/403 on robots.txt), since the external site is flaky.
        """
        support.requires('network')
        with support.transient_internet('mueblesmoraleda.com'):
            url = 'http://mueblesmoraleda.com'
            robots_url = url + "/robots.txt"
            # First check the URL is usable for our purposes, since the
            # test site is a bit flaky.
            try:
                urlopen(robots_url)
            except HTTPError as e:
                if e.code not in {401, 403}:
                    self.skipTest(
                        "%r should return a 401 or 403 HTTP error, not %r"
                        % (robots_url, e.code))
            else:
                self.skipTest(
                    "%r should return a 401 or 403 HTTP error, not succeed"
                    % (robots_url))
            parser = urllib.robotparser.RobotFileParser()
            parser.set_url(url)
            try:
                parser.read()
            except URLError:
                self.skipTest('%s is unavailable' % url)
            # A 401/403 robots.txt is treated as "disallow everything".
            self.assertEqual(parser.can_fetch("*", robots_url), False)
项目:mtg-sdk-python    作者:MagicTheGathering    | 项目源码 | 文件源码
def get(url, params=None):
    """Invoke an HTTP GET request on a url

    Args:
        url (string): URL endpoint to request
        params (dict): Dictionary of url parameters
    Returns:
        dict: JSON response as a dictionary
    Raises:
        MtgException: when the server answers with an HTTP error
    """
    # `params=None` avoids the shared-mutable-default pitfall of the
    # original `params={}`; passing nothing behaves identically.
    params = params or {}
    request_url = url

    if len(params) > 0:
        request_url = "{}?{}".format(url, urlencode(params))

    try:
        req = Request(request_url, headers={'User-Agent': 'Mozilla/5.0'})
        response = json.loads(urlopen(req).read().decode("utf-8"))

        return response
    except HTTPError as err:
        raise MtgException(err.read())
项目:pelisalacarta-ce    作者:pelisalacarta-ce    | 项目源码 | 文件源码
def fetch_decode(url, encoding=None):
    """Fetch *url* via the shared opener and return the decoded body."""
    try:
        resp = g.opener.open(url)
    except HTTPError as e:
        if e.getcode() != 503:
            raise e
        # 503 means throttled: pause briefly and retry the same request.
        time.sleep(.5)
        return fetch_decode(url, encoding)

    content_type = resp.headers['content-type']

    if encoding:
        # Caller pinned the encoding explicitly.
        return resp.read().decode(encoding)

    if "charset=" in content_type:
        dbg("charset: %s", content_type)
        detected = re.search(r"charset=([\w-]+)\s*(:?;|$)", content_type).group(1)
        return resp.read().decode(detected)

    dbg("encoding unknown")
    return resp.read()
项目:python-http-client    作者:sendgrid    | 项目源码 | 文件源码
def _make_request(self, opener, request):
    """Make the API call and return the response. This is separated into
       its own function, so we can mock it easily for testing.

    :param opener:
    :type opener:
    :param request: url payload to request
    :type request: urllib.Request object
    :return: urllib response
    """
    try:
        return opener.open(request)
    except HTTPError as err:
        # Translate the raw HTTPError into the library's own exception,
        # dropping the implicit chaining context for a cleaner traceback.
        wrapped = handle_error(err)
        wrapped.__cause__ = None
        raise wrapped
项目:taikutsu_blog_works    作者:hokekiyoo    | 项目源码 | 文件源码
def extract_urls(args):
    """Collect entry URLs from every archive page of the blog at args.url."""
    page = 1
    urls = []
    while True:
        try:
            html = request.urlopen("{}/archive?page={}".format(args.url, page))
        except error.HTTPError as e:
            # HTTP-level failure (404, 403, 401, ...): stop paging.
            print(e.reason)
            break
        except error.URLError as e:
            # Host unreachable / malformed URL: stop paging.
            print(e.reason)
            break
        soup = BeautifulSoup(html, "html.parser")
        anchors = soup.find_all("a", class_="entry-title-link")
        for anchor in anchors:
            urls.append(anchor.get("href"))
        if not anchors:
            # An empty page means we walked past the last archive page.
            break
        page += 1
    return urls
项目:plugin.video.streamondemand-pureita    作者:orione7    | 项目源码 | 文件源码
def fetch_decode(url, encoding=None):
    """Fetch *url* via the shared opener and return the decoded body."""
    try:
        resp = g.opener.open(url)
    except HTTPError as e:
        if e.getcode() != 503:
            raise e
        # 503 means throttled: pause briefly and retry the same request.
        time.sleep(.5)
        return fetch_decode(url, encoding)

    content_type = resp.headers['content-type']

    if encoding:
        # Caller pinned the encoding explicitly.
        return resp.read().decode(encoding)

    if "charset=" in content_type:
        dbg("charset: %s", content_type)
        detected = re.search(r"charset=([\w-]+)\s*(:?;|$)", content_type).group(1)
        return resp.read().decode(detected)

    dbg("encoding unknown")
    return resp.read()
项目:gopro-py-api    作者:KonradIT    | 项目源码 | 文件源码
def prepare_gpcontrol(self):
    """Wait until a gpControl-based GoPro is ready, retrying on any error."""
    try:
        raw = urllib.request.urlopen('http://10.5.5.9/gp/gpControl', timeout=5).read().decode('utf8')
        fw_version = json.loads(raw)["info"]["firmware_version"]
        if "HD5.03" in fw_version or "HX" in fw_version:  # Only session cameras.
            # Session cameras need polling until status flag "31" is set.
            ready = False
            while not ready:
                status_req = urllib.request.urlopen("http://10.5.5.9/gp/gpControl/status")
                payload = status_req.read()
                charset = status_req.info().get_content_charset('utf-8')
                status = json.loads(payload.decode(charset))
                if status["status"]["31"] >= 1:
                    ready = True
    except (HTTPError, URLError):
        # Any network hiccup: start the whole handshake over.
        self.prepare_gpcontrol()
    except timeout:
        self.prepare_gpcontrol()

    print("Camera successfully connected!")
项目:gopro-py-api    作者:KonradIT    | 项目源码 | 文件源码
def getStatus(self, param, value=""):
    """Return one field of the camera status, or "" on failure.

    gpControl cameras: json[param][value] from the status endpoint.
    Other cameras: a hex substring of the legacy status, sliced by *param*.
    """
    if self.whichCam() == "gpcontrol":
        try:
            req = urllib.request.urlopen("http://10.5.5.9/gp/gpControl/status", timeout=5)
            data = req.read()
            encoding = req.info().get_content_charset('utf-8')
            json_data = json.loads(data.decode(encoding))
            return json_data[param][value]
        except (HTTPError, URLError) as error:
            # The original printed AFTER `return ""`, so the diagnostic was
            # unreachable; print first.  URLError has no .code, so fall
            # back to the error object itself.
            print("Error code:" + str(getattr(error, 'code', error)) + "\nMake sure the connection to the WiFi camera is still active.")
            return ""
        except timeout:
            print("HTTP Timeout\nMake sure the connection to the WiFi camera is still active.")
            return ""
    else:
        response = urllib.request.urlopen("http://10.5.5.9/camera/sx?t=" + self.getPassword(), timeout=5).read()
        response_hex = str(bytes.decode(base64.b16encode(response), 'utf-8'))
        return str(response_hex[param[0]:param[1]])
项目:gopro-py-api    作者:KonradIT    | 项目源码 | 文件源码
def getStatusRaw(self):
    """Return the camera's raw status payload.

    gpControl cameras: the JSON status document as a utf-8 string.
    auth (legacy) cameras: the raw bytes of the status endpoint.
    Returns "" on network errors or timeout; prints an error and
    returns None if the camera type is unknown.
    """
    if self.whichCam() == "gpcontrol":
        try:
            return urllib.request.urlopen(
                "http://10.5.5.9/gp/gpControl/status", timeout=5).read().decode('utf-8')
        except (HTTPError, URLError) as error:
            # Print BEFORE returning -- previously unreachable after `return`.
            # str(error) is safe for both HTTPError and URLError (the
            # latter has no .code attribute).
            print("Error: " + str(error) + "\nMake sure the connection to the WiFi camera is still active.")
            return ""
        except timeout:
            print("HTTP Timeout\nMake sure the connection to the WiFi camera is still active.")
            return ""
    elif self.whichCam() == "auth":
        try:
            return urllib.request.urlopen(
                "http://10.5.5.9/camera/sx?t=" + self.getPassword(), timeout=5).read()
        except (HTTPError, URLError) as error:
            print("Error: " + str(error) + "\nMake sure the connection to the WiFi camera is still active.")
            return ""
        except timeout:
            print("HTTP Timeout\nMake sure the connection to the WiFi camera is still active.")
            return ""
    else:
        print("Error, camera not defined.")
项目:gopro-py-api    作者:KonradIT    | 项目源码 | 文件源码
def getMediaInfo(self, option):
    """Return info about the most recent media file on the camera.

    option: "folder" -> last folder name, "file" -> last file name,
            "size" -> human-readable size via self.parse_value.
    Returns "" on network errors or timeout.
    """
    folder = ""
    fname = ""  # renamed from `file` to avoid shadowing the builtin
    size = ""
    try:
        raw_data = urllib.request.urlopen(
            'http://10.5.5.9:8080/gp/gpMediaList').read().decode('utf-8')
        json_parse = json.loads(raw_data)
        # One pass is enough: the last folder/file/size entries win,
        # matching the original double-loop behavior.
        for entry in json_parse['media']:
            folder = entry['d']
            for item in entry['fs']:
                fname = item['n']
                size = item['s']
        if option == "folder":
            return folder
        elif option == "file":
            return fname
        elif option == "size":
            return self.parse_value("media_size", int(size))
    except (HTTPError, URLError) as error:
        # Print BEFORE returning -- previously unreachable after `return`.
        # str(error) works for URLError too (which lacks .code).
        print("Error: " + str(error) + "\nMake sure the connection to the WiFi camera is still active.")
        return ""
    except timeout:
        print("HTTP Timeout\nMake sure the connection to the WiFi camera is still active.")
        return ""
项目:azure-cli    作者:Azure    | 项目源码 | 文件源码
def send(self, data_to_send):
    """Override the default resend mechanism in SenderBase. Stop resend when it fails.

    Serializes `data_to_send` to JSON and POSTs it to the service
    endpoint. On success (2xx) or on an unrecoverable 400 the data is
    dropped; otherwise the items are requeued, with self.retry capping
    the number of resend rounds at 3.
    """
    request_payload = json.dumps([a.write() for a in data_to_send])

    request = HTTPClient.Request(self._service_endpoint_uri, bytearray(request_payload, 'utf-8'),
                                 {'Accept': 'application/json', 'Content-Type': 'application/json; charset=utf-8'})
    try:
        response = HTTPClient.urlopen(request)
        status_code = response.getcode()
        if 200 <= status_code < 300:
            return
    except HTTPError as e:
        if e.getcode() == 400:
            # Bad request: the payload itself is rejected; retrying
            # cannot succeed, so drop the data.
            return
        # Other HTTP errors (e.g. 500/503) count against the retry
        # budget too -- previously they bypassed it, so a persistently
        # failing endpoint was resent forever.
        if self.retry < 3:
            self.retry = self.retry + 1
        else:
            return
    except Exception:  # pylint: disable=broad-except
        if self.retry < 3:
            self.retry = self.retry + 1
        else:
            return

    # Add our unsent data back on to the queue
    for data in data_to_send:
        self._queue.put(data)
项目:Yv-dl    作者:r0oth3x49    | 项目源码 | 文件源码
def fetch_decode(url, encoding=None, max_tries=5):
    """ Fetch url and decode.

    Retries up to `max_tries` times on HTTP 503 (service busy) with a
    short pause, instead of recursing without bound as before. If the
    charset cannot be determined, the raw bytes are returned.
    """
    for attempt in range(max_tries):
        try:
            req = g.opener.open(url)
            break
        except HTTPError as e:
            # 503 is transient throttling: back off briefly and retry.
            # Re-raise anything else, or a 503 that outlives the budget.
            if e.getcode() == 503 and attempt < max_tries - 1:
                time.sleep(.5)
            else:
                raise

    ct = req.headers['content-type']

    if encoding:
        return req.read().decode(encoding)

    elif "charset=" in ct:
        dbg("charset: %s", ct)
        encoding = re.search(r"charset=([\w-]+)\s*(:?;|$)", ct).group(1)
        return req.read().decode(encoding)

    else:
        dbg("encoding unknown")
        return req.read()
项目:Yv-dl    作者:r0oth3x49    | 项目源码 | 文件源码
def call_gdata(api, qs):
    """Make a request to the youtube gdata api.

    Builds the request URL from `api` and query dict `qs` (the API key
    is injected automatically) and returns the parsed JSON response.
    Raises GdataError with the service's error message on HTTP failure.
    """
    qs = dict(qs)
    qs['key'] = g.api_key
    url = g.urls['gdata'] + api + '?' + urlencode(qs)

    try:
        data = g.opener.open(url).read().decode('utf-8')
    except HTTPError as e:
        try:
            # HTTPError is itself file-like; read the error body directly
            # instead of reaching into the non-public `e.file` attribute.
            errdata = e.read().decode()
            error = json.loads(errdata)['error']['message']
            errmsg = 'Youtube Error %d: %s' % (e.getcode(), error)
        except Exception:
            # Narrowed from a bare `except:` so Ctrl-C / SystemExit
            # still propagate; any parse failure falls back to str(e).
            errmsg = str(e)
        raise GdataError(errmsg)

    return json.loads(data)
项目:python-callfire    作者:alexanderad    | 项目源码 | 文件源码
def test_exception_wrapper_wraps_http_error(self):
    """_open_request must wrap urllib's HTTPError in a CallFireError
    that keeps the original exception and its response body."""
    # Request stub: attributes only need to exist, never to behave.
    stub_request = flexmock(
        prepare=flexmock(),
        path=flexmock(),
        query=flexmock(),
        body=flexmock(),
    )

    # Arrange urlopen to raise a 400 whose body carries a JSON error.
    error_body = StringIO('{"error": "Bad Request"}')
    error_body.seek(0)
    http_error = HTTPError('url', 400, 'Bad Request', {}, error_body)
    flexmock(callfire_base).should_receive('urlopen').and_raise(http_error)

    with self.assertRaises(callfire_base.CallFireError) as ctx:
        self.base._open_request(stub_request, 'GET')

    wrapped = ctx.exception
    self.assertIsInstance(wrapped, callfire_base.CallFireError)
    self.assertIsInstance(wrapped.wrapped_exc, HTTPError)
    self.assertEqual(
        str(wrapped), 'HTTP Error 400: Bad Request: {"error": "Bad Request"}')
项目:acmpv    作者:Vayn    | 项目源码 | 文件源码
def icourses_download(url, merge=False, output_dir='.', **kwargs):
    """Download a video from icourses.cn.

    Resolves the real media URL via ICousesExactor, probing it up to 5
    times (the site intermittently rejects requests), then prints the
    media info and downloads in chunks unless kwargs['info_only'].
    """
    icourses_parser = ICousesExactor(url=url)
    real_url = icourses_parser.icourses_cn_url_parser(**kwargs)
    title = icourses_parser.title
    if real_url is not None:
        for tries in range(5):
            try:
                _, type_, size = url_info(real_url, faker=True)
                break
            except error.HTTPError:
                logging.warning('Failed to fetch the video file! Retrying...')
                sleep(random.randint(0, 5))  # Prevent from blockage
                real_url = icourses_parser.icourses_cn_url_parser()
                title = icourses_parser.title
        else:
            # All attempts raised: bail out instead of crashing with a
            # NameError on the unbound type_/size below.
            logging.error('Failed to fetch the video file after 5 attempts; giving up.')
            return
        print_info(site_info, title, type_, size)
        # .get() so callers that never pass info_only don't hit a KeyError.
        if not kwargs.get('info_only', False):
            download_urls_chunked([real_url], title, 'flv',
                          total_size=size, output_dir=output_dir, refer=url, merge=merge, faker=True, ignore_range=True, chunk_size=15000000, dyn_callback=icourses_parser.icourses_cn_url_parser)


# Why not using VideoExtractor: This site needs specical download method
项目:acmpv    作者:Vayn    | 项目源码 | 文件源码
def icourses_download(url, merge=False, output_dir='.', **kwargs):
    """Download a video from icourses.cn.

    Resolves the real media URL via ICousesExactor, probing it up to 5
    times (the site intermittently rejects requests), then prints the
    media info and downloads in chunks unless kwargs['info_only'].
    """
    icourses_parser = ICousesExactor(url=url)
    real_url = icourses_parser.icourses_cn_url_parser(**kwargs)
    title = icourses_parser.title
    if real_url is not None:
        for tries in range(5):
            try:
                _, type_, size = url_info(real_url, faker=True)
                break
            except error.HTTPError:
                logging.warning('Failed to fetch the video file! Retrying...')
                sleep(random.randint(0, 5))  # Prevent from blockage
                real_url = icourses_parser.icourses_cn_url_parser()
                title = icourses_parser.title
        else:
            # All attempts raised: bail out instead of crashing with a
            # NameError on the unbound type_/size below.
            logging.error('Failed to fetch the video file after 5 attempts; giving up.')
            return
        print_info(site_info, title, type_, size)
        # .get() so callers that never pass info_only don't hit a KeyError.
        if not kwargs.get('info_only', False):
            download_urls_chunked([real_url], title, 'flv',
                          total_size=size, output_dir=output_dir, refer=url, merge=merge, faker=True, ignore_range=True, chunk_size=15000000, dyn_callback=icourses_parser.icourses_cn_url_parser)


# Why not using VideoExtractor: This site needs specical download method
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def http_error_auth_reqed(self, auth_header, host, req, headers):
        # Handle an authentication challenge: inspect `auth_header`
        # (e.g. WWW-Authenticate) from the failed response and retry the
        # request with digest credentials, giving up after 5 attempts.
        authreq = headers.get(auth_header, None)
        if self.retried > 5:
            # Don't fail endlessly - if we failed once, we'll probably
            # fail a second time. Hm. Unless the Password Manager is
            # prompting for the information. Crap. This isn't great
            # but it's better than the current 'repeat until recursion
            # depth exceeded' approach <wink>
            raise HTTPError(req.full_url, 401, "digest auth failed",
                            headers, None)
        else:
            self.retried += 1
        if authreq:
            # The first token of the challenge names the scheme,
            # e.g. 'Digest realm="..."' -> "digest".
            scheme = authreq.split()[0]
            if scheme.lower() == 'digest':
                return self.retry_http_digest_auth(req, authreq)
            elif scheme.lower() != 'basic':
                # NOTE(review): a 'basic' challenge falls through (returns
                # None) here -- presumably left for another handler; confirm.
                raise ValueError("AbstractDigestAuthHandler does not support"
                                 " the following scheme: '%s'" % scheme)