Python urllib 模块,error() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urllib.error()

项目:docklet    作者:unias    | 项目源码 | 文件源码
def dorequest(url, data = "", method = 'GET'):
    try: 
        if method == 'GET':
            response = urllib.request.urlopen(url, timeout=10).read()
        else:
            # use PUT/DELETE/POST, data should be encoded in ascii/bytes 
            request = urllib.request.Request(url, data = data.encode('ascii'), method = method)
            response = urllib.request.urlopen(request, timeout=10).read()
    # etcd may return json result with response http error code
    # http error code will raise exception in urlopen
    # catch the HTTPError and get the json result
    except urllib.error.HTTPError as e:
        # e.fp must be read() in this except block.
        # the e will be deleted and e.fp will be closed after this block
        response = e.fp.read()
    # response is encoded in bytes. 
    # recoded in utf-8 and loaded in json
    result = json.loads(str(response, encoding='utf-8'))
    return result


# client to use etcd
# not all APIs are implemented below. just implement what we want
项目:yelp-fusion    作者:Yelp    | 项目源码 | 文件源码
def main():
    parser = argparse.ArgumentParser()

    parser.add_argument('-q', '--term', dest='term', default=DEFAULT_TERM,
                        type=str, help='Search term (default: %(default)s)')
    parser.add_argument('-l', '--location', dest='location',
                        default=DEFAULT_LOCATION, type=str,
                        help='Search location (default: %(default)s)')

    input_values = parser.parse_args()

    try:
        query_api(input_values.term, input_values.location)
    except HTTPError as error:
        sys.exit(
            'Encountered HTTP error {0} on {1}:\n {2}\nAbort program.'.format(
                error.code,
                error.url,
                error.read(),
            )
        )
项目:transfer    作者:viur-framework    | 项目源码 | 文件源码
def onFinished(self ):

        self.hasFinished = True
        if self.request.error()==self.request.NoError:
            self.requestSucceeded.emit( self )
        else:
            try:
                errorDescr = NetworkErrorDescrs[ self.request.error() ]
            except: #Unknown error 
                errorDescr = None
            if errorDescr:
                QtGui.QMessageBox.warning( None, "Networkrequest Failed", "The request to \"%s\" failed with: %s" % (self.url, errorDescr) )
            self.requestFailed.emit( self, self.request.error() )
        self.finished.emit( self )
        self.logger.debug("Request finished: %s", str(self) )
        self.logger.debug("Remaining requests: %s",  len(NetworkService.currentRequests) )
项目:macholib    作者:secmobi    | 项目源码 | 文件源码
def get_pypi_src_download(package):
        url = 'https://pypi.python.org/pypi/%s/json'%(package,)
        fp = urllib.urlopen(url)
        try:
            try:
                data = fp.read()

            finally:
                fp.close()
        except urllib.error:
            raise RuntimeError("Cannot determine download link for %s"%(package,))

        pkgdata = json.loads(data.decode('utf-8'))
        if 'urls' not in pkgdata:
            raise RuntimeError("Cannot determine download link for %s"%(package,))

        for info in pkgdata['urls']:
            if info['packagetype'] == 'sdist' and info['url'].endswith('tar.gz'):
                return (info.get('md5_digest'), info['url'])

        raise RuntimeError("Cannot determine downlink link for %s"%(package,))
项目:postcards    作者:abertschi    | 项目源码 | 文件源码
def get_img_and_text(self, plugin_config, cli_args):
        imgs = []
        enable_safe_search = True if cli_args.safe_search else False
        self.logger.debug('setting image safe search to {}'.format(enable_safe_search))

        if cli_args.keyword:
            self.logger.info('using custom keyword {}'.format(cli_args.keyword))
            imgs = self._fetch_img_urls(cli_args.keyword, safe_search=enable_safe_search)
        else:
            imgs = self._get_images_for_random_keyword(safe_search=enable_safe_search)

        if not imgs:
            self.logger.error('no images found for given keyword')
            exit(1)

        if cli_args.keyword:
            img = random.choice(imgs)[2]
        else:
            img = imgs[0][2]  # always choose first img because search key is random anyway

        self.logger.info('choosing image {}'.format(img))
        return {
            'img': self._read_from_url(img),
            'text': ''
        }
项目:puresec-cli    作者:puresec    | 项目源码 | 文件源码
def check_version():
    try:
        response = request.urlopen("http://cli.puresec.io/verify/version/{}".format(puresec_cli.__version__))
    except urllib.error.URLError:
        return

    try:
        response = json.loads(response.read().decode())
    except ValueError:
        return

    if not isinstance(response, dict):
        return

    try:
        is_uptodate, last_version = response['is_uptodate'], response['last_version']
    except KeyError:
        return

    if not is_uptodate:
        eprint("warn: you are using an outdated version of PureSec CLI (installed={}, latest={})".format(puresec_cli.__version__, last_version))
项目:tts-stray    作者:tweetyf    | 项目源码 | 文件源码
def TTSBaidu(self, tid, txt, lan, spd):
        ''' 
            get the BAIDU.COM's TTS url
            filename: save the txt's Speech in the file with filetype .wav
            lan: language, 'en' for English or 'zh' for Chinese
            txt: the TTS text
            spd: the speedding of read
        '''
        socket.setdefaulttimeout(34.0)
        try:
            #print('processing... ',tid, txt)
            ttsUrl = genTTSUrl(lan ,txt, spd)
            c = getpage(ttsUrl)
            #?master?????
            #print('processing...finished',tid)
            self.results[tid]=c
        except urllib.error.URLError as e:
            print("error:URLError ",e," we will try again...tid:",tid)
            self.TTSBaidu(tid, txt, lan, spd)
        except socket.timeout:
            print("error: TTSBaidu time out!, we will try again...tid:",tid )
            self.TTSBaidu(tid, txt, lan, spd)
        finally:
            pass
项目:average-pixels    作者:liviu-    | 项目源码 | 文件源码
def save_images(term, count):
    api_key = get_api_key()
    images = search_images(term, count, api_key)
    filenames = []

    if not os.path.exists(SAVE_DIR):
        os.makedirs(SAVE_DIR)

    for i, img in enumerate(images):
        if img['encodingFormat'] == 'unknown':
            continue
        name = "{path}/{filename}.{ext}".format(
            path=SAVE_DIR,
            filename="_".join(term.split()) + str(i),
            ext=img['encodingFormat'])
        try:
            download_image(img['thumbnailUrl'], name)
            filenames.append(name)
        except urllib.error.HTTPError:
            pass

    return filenames
项目:vulssimulator_ds    作者:kn0630    | 项目源码 | 文件源码
def log(self, message='', err=None, level='info'):
        """
        Log a message
        """
        if not level.lower() in [
            'critical',
            'debug',
            'error',
            'fatal',
            'info',
            'warning'
        ]:
            level = 'info'

        if err:
            level = 'error'
            message += ' Threw exception:\n\t{}'.format(err)

        try:
            func = getattr(self.logger, level.lower())
            func(message)
        except Exception as log_err:
            self.logger.critical(
                "Could not write to log. Threw exception:\n\t{}".format(log_err))
项目:jumpscale_portal    作者:jumpscale7    | 项目源码 | 文件源码
def __mkfile(self):
        """Create new file"""
        name = current = None
        curDir = newFile = None
        if 'name' in self._request and 'current' in self._request:
            name = self._request['name']
            current = self._request['current']
            curDir = self.__findDir(current, None)
            newFile = os.path.join(curDir, name)

        if not curDir or not name:
            self._response['error'] = 'Invalid parameters'
        elif not self.__isAllowed(curDir, 'write'):
            self._response['error'] = 'Access denied'
        elif not self.__checkName(name):
            self._response['error'] = 'Invalid name'
        elif os.path.exists(newFile):
            self._response['error'] = 'File or folder with the same name already exists'
        else:
            try:
                open(newFile, 'w').close()
                self._response['select'] = [self.__hash(newFile)]
                self.__content(curDir, False)
            except:
                self._response['error'] = 'Unable to create file'
项目:jumpscale_portal    作者:jumpscale7    | 项目源码 | 文件源码
def __rm(self):
        """Delete files and directories"""
        current = rmList = None
        curDir = rmFile = None
        if 'current' in self._request and 'targets[]' in self._request:
            current = self._request['current']
            rmList = self._request['targets[]']
            curDir = self.__findDir(current, None)

        if not rmList or not curDir:
            self._response['error'] = 'Invalid parameters'
            return False

        if not isinstance(rmList, list):
            rmList = [rmList]

        for rm in rmList:
            rmFile = self.__find(rm, curDir)
            if not rmFile: continue
            self.__remove(rmFile)
        # TODO if errorData not empty return error
        self.__content(curDir, True)
项目:jumpscale_portal    作者:jumpscale7    | 项目源码 | 文件源码
def __duplicate(self):
        """Create copy of files/directories"""
        if 'current' in self._request and 'target' in self._request:
            curDir = self.__findDir(self._request['current'], None)
            target = self.__find(self._request['target'], curDir)
            if not curDir or not target:
                self._response['error'] = 'Invalid parameters'
                return
            if not self.__isAllowed(target, 'read') or not self.__isAllowed(curDir, 'write'):
                self._response['error'] = 'Access denied'
            newName = self.__uniqueName(target)
            if not self.__copy(target, newName):
                self._response['error'] = 'Unable to create file copy'
                return

        self.__content(curDir, True)
        return
项目:jumpscale_portal    作者:jumpscale7    | 项目源码 | 文件源码
def __edit(self):
        """Save content in file"""
        error = ''
        if 'current' in self._request and 'target' in self._request and 'content' in self._request:
            curDir = self.__findDir(self._request['current'], None)
            curFile = self.__find(self._request['target'], curDir)
            error = curFile
            if curFile and curDir:
                if self.__isAllowed(curFile, 'write'):
                    try:
                        f = open(curFile, 'w+')
                        f.write(self._request['content'])
                        f.close()
                        self._response['target'] = self.__info(curFile)
                    except:
                        self._response['error'] = 'Unable to write to file'
                else:
                    self._response['error'] = 'Access denied'
            return

        self._response['error'] = 'Invalid parameters'
        return
项目:randi    作者:nhatzHK    | 项目源码 | 文件源码
def removeNoise (s):
    import re
    pattern_list = ["\[\[(.*?)\]\]", "{{(.*?)}}", "\[(.*?)\]"]
    clean = s

    for pattern in pattern_list:
        regex = re.compile(pattern)
        clean = re.sub (regex, "", clean)

    return clean

#==============================================================================#
#==============================================================================#

# Get the html for a comic from the explainxkcd website
# Extract the transcript from the text
# Check if the transcript is mark as incomplete
#   if yes, mark it as so locally (for later updates)
# Returns a dictionary (result)
# If an error occured the status is not 0 and the error is passed in the error
# field of the returned dictionary
项目:randi    作者:nhatzHK    | 项目源码 | 文件源码
def get_xkcd(number = 0):
    if number is 0:
        url ='https://xkcd.com/info.0.json'
    else:
        url = 'https://xkcd.com/{}/info.0.json'.format (number)

    response = {'status': 0, 'error': '', 'comic': ""}

    try:
        online_comic = urlopen(url).read ()
        response['comic'] = json.loads (online_comic.decode('utf-8'))
    except urllib.error.HTTPError:
        response['status'] = -1
    except IOError:
        response['status'] = -2
    except:
        response['status'] = -3

    return response

#==============================================================================##==============================================================================#
项目:amadown2py    作者:aesuli    | 项目源码 | 文件源码
def download_page(url, referer, maxretries, timeout, pause):
    tries = 0
    htmlpage = None
    while tries < maxretries and htmlpage is None:
        try:
            code = 404
            req = request.Request(url)
            req.add_header('Referer', referer)
            req.add_header('User-agent',
                           'Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.30 (KHTML, like Gecko) Ubuntu/11.04 Chromium/12.0.742.91 Chrome/12.0.742.91 Safari/534.30')
            with closing(request.urlopen(req, timeout=timeout)) as f:
                code = f.getcode()
                htmlpage = f.read()
                sleep(pause)
        except (urlerror.URLError, socket.timeout, socket.error):
            tries += 1
    if htmlpage:
        return htmlpage.decode('utf-8'), code
    else:
        return None, code
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_short_content_raises_ContentTooShortError(self):
        self.fakehttp(b'''HTTP/1.1 200 OK
Date: Wed, 02 Jan 2008 03:03:54 GMT
Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
Connection: close
Content-Length: 100
Content-Type: text/html; charset=iso-8859-1

FF
''')

        def _reporthook(par1, par2, par3):
            pass

        with self.assertRaises(urllib.error.ContentTooShortError):
            try:
                urllib.request.urlretrieve('http://example.com/',
                                           reporthook=_reporthook)
            finally:
                self.unfakehttp()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def handle(self, fn_name, action, *args, **kwds):
        self.parent.calls.append((self, fn_name, args, kwds))
        if action is None:
            return None
        elif action == "return self":
            return self
        elif action == "return response":
            res = MockResponse(200, "OK", {}, "")
            return res
        elif action == "return request":
            return Request("http://blah/")
        elif action.startswith("error"):
            code = action[action.rfind(" ")+1:]
            try:
                code = int(code)
            except ValueError:
                pass
            res = MockResponse(200, "OK", {}, "")
            return self.parent.error("http", args[0], res, code, "", {})
        elif action == "raise":
            raise urllib.error.URLError("blah")
        assert False
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_badly_named_methods(self):
        # test work-around for three methods that accidentally follow the
        # naming conventions for handler methods
        # (*_open() / *_request() / *_response())

        # These used to call the accidentally-named methods, causing a
        # TypeError in real code; here, returning self from these mock
        # methods would either cause no exception, or AttributeError.

        from urllib.error import URLError

        o = OpenerDirector()
        meth_spec = [
            [("do_open", "return self"), ("proxy_open", "return self")],
            [("redirect_request", "return self")],
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)
        o.add_handler(urllib.request.UnknownHandler())
        for scheme in "do", "proxy", "redirect":
            self.assertRaises(URLError, o.open, scheme+"://example.com/")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_raise(self):
        # raising URLError stops processing of request
        o = OpenerDirector()
        meth_spec = [
            [("http_open", "raise")],
            [("http_open", "return self")],
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        self.assertRaises(urllib.error.URLError, o.open, req)
        self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})])

##     def test_error(self):
##         # XXX this doesn't actually seem to be used in standard library,
##         #  but should really be tested anyway...
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_errors(self):
        h = urllib.request.HTTPErrorProcessor()
        o = h.parent = MockOpener()

        url = "http://example.com/"
        req = Request(url)
        # all 2xx are passed through
        r = MockResponse(200, "OK", {}, "", url)
        newr = h.http_response(req, r)
        self.assertIs(r, newr)
        self.assertFalse(hasattr(o, "proto"))  # o.error not called
        r = MockResponse(202, "Accepted", {}, "", url)
        newr = h.http_response(req, r)
        self.assertIs(r, newr)
        self.assertFalse(hasattr(o, "proto"))  # o.error not called
        r = MockResponse(206, "Partial content", {}, "", url)
        newr = h.http_response(req, r)
        self.assertIs(r, newr)
        self.assertFalse(hasattr(o, "proto"))  # o.error not called
        # anything else calls o.error (and MockOpener returns None, here)
        r = MockResponse(502, "Bad gateway", {}, "", url)
        self.assertIsNone(h.http_response(req, r))
        self.assertEqual(o.proto, "http")  # o.error called
        self.assertEqual(o.args, (req, r, 502, "Bad gateway", {}))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_invalid_redirect(self):
        from_url = "http://example.com/a.html"
        valid_schemes = ['http','https','ftp']
        invalid_schemes = ['file','imap','ldap']
        schemeless_url = "example.com/b.html"
        h = urllib.request.HTTPRedirectHandler()
        o = h.parent = MockOpener()
        req = Request(from_url)
        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT

        for scheme in invalid_schemes:
            invalid_url = scheme + '://' + schemeless_url
            self.assertRaises(urllib.error.HTTPError, h.http_error_302,
                    req, MockFile(), 302, "Security Loophole",
                    MockHeaders({"location": invalid_url}))

        for scheme in valid_schemes:
            valid_url = scheme + '://' + schemeless_url
            h.http_error_302(req, MockFile(), 302, "That's fine",
                MockHeaders({"location": valid_url}))
            self.assertEqual(o.req.get_full_url(), valid_url)
项目:ecs    作者:ecs-org    | 项目源码 | 文件源码
def batch_sign(request):
    tasks = request.sign_session['tasks']
    if not tasks:
        return redirect('ecs.dashboard.views.view_dashboard')

    task = _get_tasks(request.user).get(pk=tasks[0])
    data = request.sign_session['data_func'](request, task)
    data['sign_session_id'] = request.sign_session.id
    sign_data = _store_sign_data(data)

    if request.user.email.startswith('signing_fail'):
        return sign_error(request, pdf_id=sign_data.id, error='forced failure', cause='requested force_fail, so we failed')

    return render(request, 'signature/batch.html', {
        'sign_url': get_pdfas_url(request, sign_data),
        'pdf_id': sign_data.id,
    })
项目:ecs    作者:ecs-org    | 项目源码 | 文件源码
def get_pdfas_url(request, sign_data):
    values = {
        'connector': 'onlinebku',
        'invoke-app-url': request.build_absolute_uri(reverse('ecs.signature.views.sign_receive', kwargs={'pdf_id': sign_data.id})),
        'invoke-app-url-target': '_top',
        'invoke-app-error-url': request.build_absolute_uri(reverse('ecs.signature.views.sign_error', kwargs={'pdf_id': sign_data.id})),
        'locale': 'DE',
        'num-bytes': str(len(sign_data['pdf_data'])),
        'sig_type': 'SIGNATURBLOCK_DE',
        'pdf-url': request.build_absolute_uri(reverse('ecs.signature.views.sign_send', kwargs={'pdf_id': sign_data.id})),

        'verify-level': 'intOnly', # Dies bedeutet, dass eine Signaturprüfung durchgeführt wird, allerdings ohne Zertifikatsprüfung.
        'filename': sign_data['document_filename'],

        #'preview': 'false',
        #'mode': 'binary',
        #'inline': 'false',
        #'pdf-id': sign_data.id,
    }
    data = urllib.parse.urlencode({k: v.encode('utf-8') for k, v in values.items()})
    return '{0}Sign?{1}'.format(settings.PDFAS_SERVICE, data)
项目:Basic-discord-bot    作者:TheWizoid    | 项目源码 | 文件源码
def stream_live_check(stream):

    url = "https://api.twitch.tv/kraken/streams/{}".format(stream.lower())

    try:
        contents = json.loads(urllib.request.urlopen(url).read().decode("utf-8"))
        if contents["stream"] == None:
            status = "offline"
            bot_message = "{} is offline.".format(stream)
        else:
            #print(contents)
            name = contents["stream"]["channel"]["name"]
            title = contents["stream"]["channel"]["status"]
            game = contents["stream"]["channel"]["game"]
            viewers = contents["stream"]["viewers"]
            bot_message = "{0} is online.\n{0}'s title is: {1} \n{0} is playing {2} \nThere are {3} viewers \n".format(name,title,game,viewers)

    except urllib.error.URLError as e:
        if e.reason == "Not found" or e.reason == "Unprocessable Entity":
            bot_message = "That stream doesn't exist."
        else:
            bot_message = "There was an error proccessing your request."

    return bot_message
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_short_content_raises_ContentTooShortError(self):
        self.fakehttp(b'''HTTP/1.1 200 OK
Date: Wed, 02 Jan 2008 03:03:54 GMT
Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
Connection: close
Content-Length: 100
Content-Type: text/html; charset=iso-8859-1

FF
''')

        def _reporthook(par1, par2, par3):
            pass

        with self.assertRaises(urllib.error.ContentTooShortError):
            try:
                urllib.request.urlretrieve('http://example.com/',
                                           reporthook=_reporthook)
            finally:
                self.unfakehttp()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def handle(self, fn_name, action, *args, **kwds):
        self.parent.calls.append((self, fn_name, args, kwds))
        if action is None:
            return None
        elif action == "return self":
            return self
        elif action == "return response":
            res = MockResponse(200, "OK", {}, "")
            return res
        elif action == "return request":
            return Request("http://blah/")
        elif action.startswith("error"):
            code = action[action.rfind(" ")+1:]
            try:
                code = int(code)
            except ValueError:
                pass
            res = MockResponse(200, "OK", {}, "")
            return self.parent.error("http", args[0], res, code, "", {})
        elif action == "raise":
            raise urllib.error.URLError("blah")
        assert False
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_badly_named_methods(self):
        # test work-around for three methods that accidentally follow the
        # naming conventions for handler methods
        # (*_open() / *_request() / *_response())

        # These used to call the accidentally-named methods, causing a
        # TypeError in real code; here, returning self from these mock
        # methods would either cause no exception, or AttributeError.

        from urllib.error import URLError

        o = OpenerDirector()
        meth_spec = [
            [("do_open", "return self"), ("proxy_open", "return self")],
            [("redirect_request", "return self")],
            ]
        add_ordered_mock_handlers(o, meth_spec)
        o.add_handler(urllib.request.UnknownHandler())
        for scheme in "do", "proxy", "redirect":
            self.assertRaises(URLError, o.open, scheme+"://example.com/")
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_errors(self):
        h = urllib.request.HTTPErrorProcessor()
        o = h.parent = MockOpener()

        url = "http://example.com/"
        req = Request(url)
        # all 2xx are passed through
        r = MockResponse(200, "OK", {}, "", url)
        newr = h.http_response(req, r)
        self.assertIs(r, newr)
        self.assertFalse(hasattr(o, "proto"))  # o.error not called
        r = MockResponse(202, "Accepted", {}, "", url)
        newr = h.http_response(req, r)
        self.assertIs(r, newr)
        self.assertFalse(hasattr(o, "proto"))  # o.error not called
        r = MockResponse(206, "Partial content", {}, "", url)
        newr = h.http_response(req, r)
        self.assertIs(r, newr)
        self.assertFalse(hasattr(o, "proto"))  # o.error not called
        # anything else calls o.error (and MockOpener returns None, here)
        r = MockResponse(502, "Bad gateway", {}, "", url)
        self.assertIsNone(h.http_response(req, r))
        self.assertEqual(o.proto, "http")  # o.error called
        self.assertEqual(o.args, (req, r, 502, "Bad gateway", {}))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_invalid_redirect(self):
        from_url = "http://example.com/a.html"
        valid_schemes = ['http','https','ftp']
        invalid_schemes = ['file','imap','ldap']
        schemeless_url = "example.com/b.html"
        h = urllib.request.HTTPRedirectHandler()
        o = h.parent = MockOpener()
        req = Request(from_url)
        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT

        for scheme in invalid_schemes:
            invalid_url = scheme + '://' + schemeless_url
            self.assertRaises(urllib.error.HTTPError, h.http_error_302,
                    req, MockFile(), 302, "Security Loophole",
                    MockHeaders({"location": invalid_url}))

        for scheme in valid_schemes:
            valid_url = scheme + '://' + schemeless_url
            h.http_error_302(req, MockFile(), 302, "That's fine",
                MockHeaders({"location": valid_url}))
            self.assertEqual(o.req.get_full_url(), valid_url)
项目:Hands-Chopping    作者:ecmadao    | 项目源码 | 文件源码
def __data_parser__(self, data):
        try:
            if data['mods']['itemlist']['data']['auctions']:
                search_results = data['mods']['itemlist']['data']['auctions']
                return [{
                        'intro': result["raw_title"],
                        'price': float(result["view_price"]),
                        'delivery': colorful_text(result["view_fee"], Fore.RED)
                        if float(result["view_fee"]) > 0 else result["view_fee"],
                        'sales': int(result["view_sales"].split('?')[0]),
                        'belong': colorful_text("??", Fore.CYAN)
                        if result.get('shopcard', {}).get('isTmall', False) else "??",
                        'url': result["detail_url"]
                        } for result in search_results]
            error('Ops, get no goods..')
            return []
        except KeyError:
            error('Ops, some key error happened..')
            return []
项目:MarkdownLivePreview    作者:math2001    | 项目源码 | 文件源码
def get_base64_saver(loading, url):
    def callback(content):
        if isinstance(content, urllib.error.HTTPError):
            if content.getcode() == 404:
                loading[url] = 404
                return
        elif isinstance(content, urllib.error.URLError):
            if (content.reason.errno == 11001 and
                content.reason.strerror == 'getaddrinfo failed'):
                loading[url] = 404
                return
            return sublime.error_message('An unexpected error has occured: ' +
                                         str(content))
        loading[url] = to_base64(content=content)

    return callback
项目:aqua-monitor    作者:Deltares    | 项目源码 | 文件源码
def getTaskStatus(taskId):
  """Retrieve status of one or more long-running tasks.

  Args:
    taskId: ID of the task or a list of multiple IDs.

  Returns:
    List containing one object for each queried task, in the same order as
    the input array, each object containing the following values:
      id (string) ID of the task.
      state (string) State of the task, one of READY, RUNNING, COMPLETED,
        FAILED, CANCELLED; or UNKNOWN if the task with the specified ID
        doesn't exist.
      error_message (string) For a FAILED task, a description of the error.
  """
  if isinstance(taskId, six.string_types):
    taskId = [taskId]
  args = {'q': ','.join(taskId)}
  return send_('/taskstatus', args, 'GET')
项目:gdata-python3    作者:dvska    | 项目源码 | 文件源码
def __init__(self, http_response, response_body=None):
        """Sets the HTTP information in the error.

        Args:
          http_response: The response from the server, contains error information.
          response_body: string (optional) specified if the response has already
                         been read from the http_response object.
        """
        body = response_body or http_response.read()

        self.status = http_response.status
        self.reason = http_response.reason
        self.body = body
        self.headers = atom.http_core.get_headers(http_response)

        self.error_msg = 'Invalid response %s.' % self.status
        try:
            json_from_body = simplejson.loads(body)
            if isinstance(json_from_body, dict):
                self.error_msg = json_from_body.get('error', self.error_msg)
        except (ValueError, JSONDecodeError):
            pass
项目:gdata-python3    作者:dvska    | 项目源码 | 文件源码
def _download_file(self, uri, file_path, **kwargs):
        """Downloads a file to disk from the specified URI.

        Note: to download a file in memory, use the GetContent() method.

        Args:
          uri: str The full URL to download the file from.
          file_path: str The full path to save the file to.
          kwargs: Other parameters to pass to self.get_content().

        Raises:
          gdata.client.RequestError: on error response from server.
        """
        f = open(file_path, 'wb')
        try:
            f.write(self._get_content(uri, **kwargs))
        except gdata.client.RequestError as e:
            f.close()
            raise e
        f.flush()
        f.close()
项目:gdata-python3    作者:dvska    | 项目源码 | 文件源码
def UpgradeToSessionToken(self, token=None):
        """Upgrades a single use AuthSub token to a session token.

        Args:
          token: A gdata.auth.AuthSubToken or gdata.auth.SecureAuthSubToken
                 (optional) which is good for a single use but can be upgraded
                 to a session token. If no token is passed in, the token
                 is found by looking in the token_store by looking for a token
                 for the current scope.

        Raises:
          NonAuthSubToken if the user's auth token is not an AuthSub token
          TokenUpgradeFailed if the server responded to the request with an
          error.
        """
        if token is None:
            scopes = lookup_scopes(self.service)
            if scopes:
                token = self.token_store.find_token(scopes[0])
            else:
                token = self.token_store.find_token(atom.token_store.SCOPE_ALL)
        if not isinstance(token, gdata.auth.AuthSubToken):
            raise NonAuthSubToken

        self.SetAuthSubToken(self.upgrade_to_session_token(token))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_short_content_raises_ContentTooShortError(self):
        self.fakehttp(b'''HTTP/1.1 200 OK
Date: Wed, 02 Jan 2008 03:03:54 GMT
Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
Connection: close
Content-Length: 100
Content-Type: text/html; charset=iso-8859-1

FF
''')

        def _reporthook(par1, par2, par3):
            pass

        with self.assertRaises(urllib.error.ContentTooShortError):
            try:
                urllib.request.urlretrieve('http://example.com/',
                                           reporthook=_reporthook)
            finally:
                self.unfakehttp()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def change_cwd(path, quiet=False):
    """Return a context manager that changes the current working directory.

    Arguments:

      path: the directory to use as the temporary current working directory.

      quiet: if False (the default), the context manager raises an exception
        on error.  Otherwise, it issues only a warning and keeps the current
        working directory the same.

    """
    saved_dir = os.getcwd()
    try:
        os.chdir(path)
    except OSError:
        if not quiet:
            raise
        warnings.warn('tests may fail, unable to change CWD to: ' + path,
                      RuntimeWarning, stacklevel=3)
    try:
        yield os.getcwd()
    finally:
        os.chdir(saved_dir)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def temp_cwd(name='tempcwd', quiet=False):
    """
    Context manager that temporarily creates and changes the CWD.

    The function temporarily changes the current working directory
    after creating a temporary directory in the current directory with
    name *name*.  If *name* is None, the temporary directory is
    created using tempfile.mkdtemp.

    If *quiet* is False (default) and it is not possible to
    create or change the CWD, an error is raised.  If *quiet* is True,
    only a warning is raised and the original CWD is used.

    """
    with temp_dir(path=name, quiet=quiet) as temp_path:
        with change_cwd(temp_path, quiet=quiet) as cwd_dir:
            yield cwd_dir
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def handle(self, fn_name, action, *args, **kwds):
        self.parent.calls.append((self, fn_name, args, kwds))
        if action is None:
            return None
        elif action == "return self":
            return self
        elif action == "return response":
            res = MockResponse(200, "OK", {}, "")
            return res
        elif action == "return request":
            return Request("http://blah/")
        elif action.startswith("error"):
            code = action[action.rfind(" ")+1:]
            try:
                code = int(code)
            except ValueError:
                pass
            res = MockResponse(200, "OK", {}, "")
            return self.parent.error("http", args[0], res, code, "", {})
        elif action == "raise":
            raise urllib.error.URLError("blah")
        assert False
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_badly_named_methods(self):
        # test work-around for three methods that accidentally follow the
        # naming conventions for handler methods
        # (*_open() / *_request() / *_response())

        # These used to call the accidentally-named methods, causing a
        # TypeError in real code; here, returning self from these mock
        # methods would either cause no exception, or AttributeError.

        from urllib.error import URLError

        o = OpenerDirector()
        meth_spec = [
            [("do_open", "return self"), ("proxy_open", "return self")],
            [("redirect_request", "return self")],
            ]
        add_ordered_mock_handlers(o, meth_spec)
        o.add_handler(urllib.request.UnknownHandler())
        for scheme in "do", "proxy", "redirect":
            self.assertRaises(URLError, o.open, scheme+"://example.com/")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_errors(self):
        h = urllib.request.HTTPErrorProcessor()
        o = h.parent = MockOpener()

        url = "http://example.com/"
        req = Request(url)
        # all 2xx are passed through
        r = MockResponse(200, "OK", {}, "", url)
        newr = h.http_response(req, r)
        self.assertIs(r, newr)
        self.assertFalse(hasattr(o, "proto"))  # o.error not called
        r = MockResponse(202, "Accepted", {}, "", url)
        newr = h.http_response(req, r)
        self.assertIs(r, newr)
        self.assertFalse(hasattr(o, "proto"))  # o.error not called
        r = MockResponse(206, "Partial content", {}, "", url)
        newr = h.http_response(req, r)
        self.assertIs(r, newr)
        self.assertFalse(hasattr(o, "proto"))  # o.error not called
        # anything else calls o.error (and MockOpener returns None, here)
        r = MockResponse(502, "Bad gateway", {}, "", url)
        self.assertIsNone(h.http_response(req, r))
        self.assertEqual(o.proto, "http")  # o.error called
        self.assertEqual(o.args, (req, r, 502, "Bad gateway", {}))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_invalid_redirect(self):
        from_url = "http://example.com/a.html"
        valid_schemes = ['http','https','ftp']
        invalid_schemes = ['file','imap','ldap']
        schemeless_url = "example.com/b.html"
        h = urllib.request.HTTPRedirectHandler()
        o = h.parent = MockOpener()
        req = Request(from_url)
        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT

        for scheme in invalid_schemes:
            invalid_url = scheme + '://' + schemeless_url
            self.assertRaises(urllib.error.HTTPError, h.http_error_302,
                    req, MockFile(), 302, "Security Loophole",
                    MockHeaders({"location": invalid_url}))

        for scheme in valid_schemes:
            valid_url = scheme + '://' + schemeless_url
            h.http_error_302(req, MockFile(), 302, "That's fine",
                MockHeaders({"location": valid_url}))
            self.assertEqual(o.req.get_full_url(), valid_url)
项目:BioDownloader    作者:biomadeira    | 项目源码 | 文件源码
def _download(self):
        try:
            try:
                import urllib.request
                from urllib.error import URLError, HTTPError
                with urllib.request.urlopen(self.url) as response, \
                        open(self.outputfile_origin, 'wb') as outfile:
                    shutil.copyfileobj(response, outfile)
            except (AttributeError, ImportError):
                import urllib
                urllib.urlretrieve(self.url, self.outputfile_origin)
        except (URLError, HTTPError, IOError, Exception) as e:
            logger.debug("Unable to retrieve %s for %s", self.url, e)
项目:bitcoin-arbitrage    作者:ucfyao    | 项目源码 | 文件源码
def _buy(self, amount, price):
        """Create a buy limit order"""
        params = {"amount": amount, "price": price}
        response = self._send_request(self.buy_url, params)
        if "error" in response:
            raise TradeException(response["error"])
项目:bitcoin-arbitrage    作者:ucfyao    | 项目源码 | 文件源码
def _sell(self, amount, price):
        """Create a sell limit order"""
        params = {"amount": amount, "price": price}
        response = self._send_request(self.sell_url, params)
        if "error" in response:
            raise TradeException(response["error"])
项目:Lebanese-Channels    作者:ChadiEM    | 项目源码 | 文件源码
def __fetch_epg(channel: Channel, epg_url: EPGURL):
    try:
        html = utils.get_response(epg_url.url, epg_url.data)
        start_end_data = channel.epg_parser.parse_schedule_page(html)
        epg_utils.normalize_times(start_end_data, channel.epg_data.get_normalization())
        return get_response(start_end_data, channel.channel_id)
    except urllib.error.URLError:
        return ''
项目:mendelmd    作者:raonyguimaraes    | 项目源码 | 文件源码
def kegg_rest_request(query):
    url = 'http://rest.kegg.jp/%s' % (query)
    print(url)
    try:
        data = urllib.request.urlopen(url).read()
    except urllib.error.HTTPError as e:
        print("HTTP error: %d" % e.code)
    except urllib.error.URLError as e:
        print("Network error: %s" % e.reason.args[1])

    return data
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def check_errors(self):
        errors = False
        for c in self.constraints:
            if not c.required_version:
                errors = True

                reasons = c.why()
                if len(reasons) == 1:
                    Logs.error('%s but no matching package could be found in this repository' % reasons[0])
                else:
                    Logs.error('Conflicts on package %r:' % c.pkgname)
                    for r in reasons:
                        Logs.error('  %s' % r)
        if errors:
            self.fatal('The package requirements cannot be satisfied!')
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def load_constraints(self, pkgname, pkgver, requires=REQUIRES):
        try:
            return self.cache_constraints[(pkgname, pkgver)]
        except KeyError:
            #Logs.error("no key %r" % (pkgname, pkgver))
            text = Utils.readf(os.path.join(get_distnet_cache(), pkgname, pkgver, requires))
            ret = parse_constraints(text)
            self.cache_constraints[(pkgname, pkgver)] = ret
            return ret