Python urllib.request module: urlopen() example source code

We extracted the following 50 code examples from open-source Python projects to illustrate how to use urllib.request.urlopen().
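
Before the project examples, here is a minimal sketch of the basic call, for orientation only; the URL is a placeholder and the header value is arbitrary:

from urllib.request import Request, urlopen

req = Request('https://example.com/', headers={'User-Agent': 'Mozilla/5.0'})
with urlopen(req, timeout=10) as resp:      # returns an http.client.HTTPResponse
    body = resp.read().decode('utf-8')      # read() yields bytes; decode to str
    print(resp.status, len(body))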

Project: Adafruit_Python_PCA9685    Author: adafruit
def download_file_insecure(url, target):
    """
    Use Python to download the file, even though it cannot authenticate the
    connection.
    """
    try:
        from urllib.request import urlopen
    except ImportError:
        from urllib2 import urlopen
    src = dst = None
    try:
        src = urlopen(url)
        # Read/write all in one block, so we don't create a corrupt file
        # if the download is interrupted.
        data = src.read()
        dst = open(target, "wb")
        dst.write(data)
    finally:
        if src:
            src.close()
        if dst:
            dst.close()
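
On Python 3 alone, the same "read everything first, then write once" pattern can be written with context managers; this is only an equivalent sketch, not code from the project:

from urllib.request import urlopen

def download_file_py3(url, target):
    # Read the whole body before opening the target, so a failed read never truncates it.
    with urlopen(url) as src:
        data = src.read()
    with open(target, "wb") as dst:
        dst.write(data)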
Project: python-    Author: secondtonone1
def workthread(item, user_agent,path):
    strurl = 'http://yxpjw.club'+item[0]
    picname = item[1]
    print('Fetching %s...........................\n' % (picname))
    req = request.Request(strurl)
    req.add_header('User-Agent',user_agent)
    response = request.urlopen(req)
    content = response.read().decode('gbk')
    strurl2 = re.search(r'^(.*)/',strurl).group(0)
    print('https headers...............%s'%(strurl2))
    #destname = os.path.join(path,picname+'.txt')
    #with open(destname, 'w',encoding='gbk') as file:
        #file.write(content)
    destdir = os.path.join(path,picname)
    os.makedirs(destdir)
    page = 1
    while(1):
        content = getpagedata(content,destdir,page,strurl2)
        if not content:
            break
        page = page + 1
    print('%s finished downloading.\n' % (picname))
Project: Adafruit_Python_MCP4725    Author: adafruit
def download_file_insecure(url, target):
    """
    Use Python to download the file, even though it cannot authenticate the
    connection.
    """
    try:
        from urllib.request import urlopen
    except ImportError:
        from urllib2 import urlopen
    src = dst = None
    try:
        src = urlopen(url)
        # Read/write all in one block, so we don't create a corrupt file
        # if the download is interrupted.
        data = src.read()
        dst = open(target, "wb")
        dst.write(data)
    finally:
        if src:
            src.close()
        if dst:
            dst.close()
Project: python-    Author: secondtonone1
def requestData(url, user_agent):
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req,timeout = 8)
        # decode the response bytes into a str
        content = response.read().decode('gbk')
        return content
    except error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it has to be caught first
        print('HTTPError!!!')
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
    except error.URLError as e:
        if hasattr(e, 'reason'):
            print(e.reason)
    return None
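
A note on the handler order used above: urllib.error.HTTPError is a subclass of URLError, so the HTTPError clause has to come before the URLError clause or it can never run. A minimal standalone sketch of the pattern (the names here are illustrative, not from the project):

from urllib import request, error

def fetch(url, user_agent='Mozilla/5.0'):
    req = request.Request(url, headers={'User-Agent': user_agent})
    try:
        with request.urlopen(req, timeout=8) as resp:
            return resp.read().decode('utf-8')
    except error.HTTPError as e:        # HTTP-level errors (4xx/5xx responses)
        print('HTTPError:', e.code, e.reason)
    except error.URLError as e:         # network-level failures (DNS, refused connection, ...)
        print('URLError:', e.reason)
    return None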
Project: python-    Author: secondtonone1
def requestData(self,url, user_agent):
        try:
            req = request.Request(url)
            req.add_header('User-Agent', user_agent)
            response = request.urlopen(req,timeout = 8)
            # decode the response bytes into a str
            content = response.read().decode('utf-8')
            return content
        except error.HTTPError as e:
            # HTTPError is a subclass of URLError, so it has to be caught first
            print('HTTPError!!!')
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
        except error.URLError as e:
            if hasattr(e, 'reason'):
                print(e.reason)
Project: python-    Author: secondtonone1
def requestData(self,url, user_agent):
        try:
            req = request.Request(url)
            req.add_header('User-Agent', user_agent)
            response = request.urlopen(req,timeout = 3)
            # decode the response bytes into a str
            content = response.read().decode('gbk')
            return content
        except error.HTTPError as e:
            # HTTPError is a subclass of URLError, so it has to be caught first
            print('HTTPError!!!')
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
        except error.URLError as e:
            if hasattr(e, 'reason'):
                print(e.reason)
Project: python-    Author: secondtonone1
def getAbstractInfo(self):

        try:
            req = request.Request(self.url)
            req.add_header('User-Agent', self.user_agent)
            response = request.urlopen(req)
            # decode the response bytes into a str
            content = response.read().decode('gbk')
            self.getDetailList(content)

        except error.HTTPError as e:
            # HTTPError is a subclass of URLError, so it has to be caught first
            print('HTTPError!!!', e.code)
        except error.URLError as e:
            if hasattr(e, 'reason'):
                print(e.reason)
Project: python-    Author: secondtonone1
def requestData(url, user_agent):
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req,timeout = 8)
        # decode the response bytes into a str
        content = response.read().decode('gbk')
        return content
    except error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it has to be caught first
        print('HTTPError!!!')
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
    except error.URLError as e:
        if hasattr(e, 'reason'):
            print(e.reason)
    return None
Project: python-    Author: secondtonone1
def requestData(self,url, user_agent):
        try:
            req = request.Request(url)
            req.add_header('User-Agent', user_agent)
            response = request.urlopen(req,timeout = 3)
            # decode the response bytes into a str
            content = response.read().decode('gbk')
            return content
        except error.HTTPError as e:
            # HTTPError is a subclass of URLError, so it has to be caught first
            print('HTTPError!!!')
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
        except error.URLError as e:
            if hasattr(e, 'reason'):
                print(e.reason)
Project: py_find_1st    Author: roebel
def download_file_insecure(url, target):
    """
    Use Python to download the file, even though it cannot authenticate the
    connection.
    """
    try:
        from urllib.request import urlopen
    except ImportError:
        from urllib2 import urlopen
    src = dst = None
    try:
        src = urlopen(url)
        # Read/write all in one block, so we don't create a corrupt file
        # if the download is interrupted.
        data = src.read()
        dst = open(target, "wb")
        dst.write(data)
    finally:
        if src:
            src.close()
        if dst:
            dst.close()
Project: Adafruit_Python_ADS1x15    Author: adafruit
def download_file_insecure(url, target):
    """
    Use Python to download the file, even though it cannot authenticate the
    connection.
    """
    try:
        from urllib.request import urlopen
    except ImportError:
        from urllib2 import urlopen
    src = dst = None
    try:
        src = urlopen(url)
        # Read/write all in one block, so we don't create a corrupt file
        # if the download is interrupted.
        data = src.read()
        dst = open(target, "wb")
        dst.write(data)
    finally:
        if src:
            src.close()
        if dst:
            dst.close()
Project: Flask_Blog    Author: sugarguo
def paste(self):
        """Create a paste and return the paste id."""
        data = json.dumps({
            'description': 'Werkzeug Internal Server Error',
            'public': False,
            'files': {
                'traceback.txt': {
                    'content': self.plaintext
                }
            }
        }).encode('utf-8')
        try:
            from urllib2 import urlopen
        except ImportError:
            from urllib.request import urlopen
        rv = urlopen('https://api.github.com/gists', data=data)
        resp = json.loads(rv.read().decode('utf-8'))
        rv.close()
        return {
            'url': resp['html_url'],
            'id': resp['id']
        }
Project: SoCFoundationFlow    Author: mattaw
def execute(self):
        if hasattr(Context.g_module, 'publish'):
            Context.Context.execute(self)
        mod = Context.g_module

        rfile = getattr(self, 'rfile', send_package_name())
        if not os.path.isfile(rfile):
            self.fatal('Create the release file with "waf release" first! %r' % rfile)

        fdata = Utils.readf(rfile, m='rb')
        data = safe_urlencode([('pkgdata', fdata), ('pkgname', mod.APPNAME), ('pkgver', mod.VERSION)])

        req = Request(get_upload_url(), data)
        response = urlopen(req, timeout=TIMEOUT)
        data = response.read().strip()

        if sys.hexversion>0x300000f:
            data = data.decode('utf-8')

        if data != 'ok':
            self.fatal('Could not publish the package %r' % data)
Project: SoCFoundationFlow    Author: mattaw
def compute_dependencies(self, filename=REQUIRES):
        text = Utils.readf(filename)
        data = safe_urlencode([('text', text)])

        if '--offline' in sys.argv:
            self.constraints = self.local_resolve(text)
        else:
            req = Request(get_resolve_url(), data)
            try:
                response = urlopen(req, timeout=TIMEOUT)
            except URLError as e:
                Logs.warn('The package server is down! %r' % e)
                self.constraints = self.local_resolve(text)
            else:
                ret = response.read()
                try:
                    ret = ret.decode('utf-8')
                except Exception:
                    pass
                self.trace(ret)
                self.constraints = parse_constraints(ret)
        self.check_errors()
Project: SoCFoundationFlow    Author: mattaw
def download_archive(self, src, dst):
    for x in self.env.PACKAGE_REPO:
        url = '/'.join((x, src))
        try:
            web = urlopen(url)
            try:
                if web.getcode() != 200:
                    continue
            except AttributeError:
                pass
        except Exception:
            # on python3 urlopen throws an exception
            # python 2.3 does not have getcode and throws an exception to fail
            continue
        else:
            tmp = self.root.make_node(dst)
            tmp.write(web.read())
            Logs.warn('Downloaded %s from %s' % (tmp.abspath(), url))
            break
    else:
        self.fatal('Could not get the package %s' % src)
Project: SoCFoundationFlow    Author: mattaw
def compute_dependencies(self, filename=REQUIRES):
        text = Utils.readf(filename)
        data = safe_urlencode([('text', text)])

        if '--offline' in sys.argv:
            self.constraints = self.local_resolve(text)
        else:
            req = Request(get_resolve_url(), data)
            try:
                response = urlopen(req, timeout=TIMEOUT)
            except URLError as e:
                Logs.warn('The package server is down! %r' % e)
                self.constraints = self.local_resolve(text)
            else:
                ret = response.read()
                try:
                    ret = ret.decode('utf-8')
                except Exception:
                    pass
                self.trace(ret)
                self.constraints = parse_constraints(ret)
        self.check_errors()
Project: SoCFoundationFlow    Author: mattaw
def download_archive(self, src, dst):
    for x in self.env.PACKAGE_REPO:
        url = '/'.join((x, src))
        try:
            web = urlopen(url)
            try:
                if web.getcode() != 200:
                    continue
            except AttributeError:
                pass
        except Exception:
            # on python3 urlopen throws an exception
            # python 2.3 does not have getcode and throws an exception to fail
            continue
        else:
            tmp = self.root.make_node(dst)
            tmp.write(web.read())
            Logs.warn('Downloaded %s from %s' % (tmp.abspath(), url))
            break
    else:
        self.fatal('Could not get the package %s' % src)
Project: SoCFoundationFlow    Author: mattaw
def execute(self):
        if hasattr(Context.g_module, 'publish'):
            Context.Context.execute(self)
        mod = Context.g_module

        rfile = getattr(self, 'rfile', send_package_name())
        if not os.path.isfile(rfile):
            self.fatal('Create the release file with "waf release" first! %r' % rfile)

        fdata = Utils.readf(rfile, m='rb')
        data = safe_urlencode([('pkgdata', fdata), ('pkgname', mod.APPNAME), ('pkgver', mod.VERSION)])

        req = Request(get_upload_url(), data)
        response = urlopen(req, timeout=TIMEOUT)
        data = response.read().strip()

        if sys.hexversion>0x300000f:
            data = data.decode('utf-8')

        if data != 'ok':
            self.fatal('Could not publish the package %r' % data)
Project: SoCFoundationFlow    Author: mattaw
def download_archive(self, src, dst):
    for x in self.env.PACKAGE_REPO:
        url = '/'.join((x, src))
        try:
            web = urlopen(url)
            try:
                if web.getcode() != 200:
                    continue
            except AttributeError:
                pass
        except Exception:
            # on python3 urlopen throws an exception
            # python 2.3 does not have getcode and throws an exception to fail
            continue
        else:
            tmp = self.root.make_node(dst)
            tmp.write(web.read())
            Logs.warn('Downloaded %s from %s' % (tmp.abspath(), url))
            break
    else:
        self.fatal('Could not get the package %s' % src)
Project: scheduled-bots    Author: SuLab
def get_assembly_report(self, taxid):
        if self.ass_sum is None:
            self.get_assembly_summaries()
        df = self.ass_sum.query("taxid == {} & refseq_category == 'reference genome'".format(taxid))
        if len(df) == 0:
            # try "representative genome" (needed for mouse and rat)
            df = self.ass_sum.query("taxid == {} & refseq_category == 'representative genome'".format(taxid))
        if len(df) != 1:
            raise ValueError("unknown reference: {}".format(df))
        print(df)
        ftp_path = list(df.ftp_path)[0]
        assembly = os.path.split(ftp_path)[1]
        url = os.path.join(ftp_path, assembly + "_assembly_report.txt")
        print(url)
        # read the column names from the file
        table = request.urlopen(request.Request(url)).read().decode()
        names = [x for x in table.split("\n") if x.startswith("#")][-1].strip().replace("# ", "").split("\t")
        self.chr_df[taxid] = pd.read_csv(StringIO(table), sep="\t", names=names, comment='#')
        self.chr_df[taxid] = self.chr_df[taxid].rename(columns={'Sequence-Name': 'SequenceName', 'Sequence-Role': 'SequenceRole',
                                                                'Assigned-Molecule': 'AssignedMolecule',
                                                                'Assigned-Molecule-Location/Type': 'AssignedMoleculeLocationType',
                                                                'GenBank-Accn': 'GenBankAccn', 'RefSeq-Accn': 'RefSeqAccn',
                                                                'UCSC-style-name': 'UCSCstylename'})
        #print(self.chr_df[taxid].query("SequenceRole == 'assembled-molecule'"))
Project: holcrawl    Author: shaypal5
def _get_business_props(movie_code):
    cur_business_url = _BUSINESS_URL.format(code=movie_code)
    busi_page = bs(request.urlopen(cur_business_url), "html.parser")
    busi_str = str(busi_page)
    weekend_contents = re.findall(_WEEKEND_CONTENT_REGEX, busi_str)[0]
    num_screens_list = [
        int(match.replace(',', ''))
        for match in re.findall(_US_OPEN_WEEKEND_REGEX, weekend_contents)]
    busi_props = {}
    busi_props['screens_by_weekend'] = [
        val for val in reversed(num_screens_list)]
    busi_props['opening_weekend_screens'] = busi_props['screens_by_weekend'][0]
    busi_props['max_screens'] = max(num_screens_list)
    busi_props['total_screens'] = sum(num_screens_list)
    busi_props['avg_screens'] = sum(num_screens_list) / len(num_screens_list)
    busi_props['num_weekends'] = len(num_screens_list)
    return busi_props


# ==== crawling the release page ====
Project: holcrawl    Author: shaypal5
def _get_release_props(movie_code):
    cur_release_url = _RELEASE_URL.format(code=movie_code)
    release_page = bs(urllib.request.urlopen(cur_release_url), "html.parser")
    release_table = release_page.find_all("table", {"id": "release_dates"})[0]
    us_rows = []
    for row in release_table.find_all("tr")[1:]:
        row_str = str(row)
        if 'USA' in row_str:
            us_rows.append(row_str)
    release_props = {}
    release_props['release_day'] = None
    release_props['release_month'] = None
    release_props['release_year'] = None
    for row in us_rows:
        if re.match(_USA_ROW_REGEX, row):
            release = re.findall(_USA_ROW_REGEX, row)[0]
            release_props['release_day'] = int(release[0])
            release_props['release_month'] = release[1]
            release_props['release_year'] = int(release[2])
    return release_props


# ==== crawling the user reviews page ====
Project: holcrawl    Author: shaypal5
def _get_reviews_props(movie_code):
    cur_reviews_url = _REVIEWS_URL.format(code=movie_code)
    reviews_page = bs(urllib.request.urlopen(cur_reviews_url), "html.parser")
    reviews = reviews_page.find_all("td", {"class": "comment-summary"})
    user_reviews = []
    for review in reviews:
        try:
            rating = int(re.findall(_USER_REVIEW_RATING_REGEX, str(review))[0])
            date_str = re.findall(
                r"on (\d{1,2} [a-zA-Z]+ \d{4})", str(review))[0]
            date = datetime.strptime(date_str, "%d %B %Y").date()
            contents = review.find_all(
                'a', href=re.compile(r'reviews.+?'))[0].contents[0]
            user = review.find_all(
                'a', href=re.compile(r'/user/.+?'))[1].contents[0]
            user_reviews.append({
                'score': rating, 'review_date': date,
                'contents': contents, 'user': user
            })
        except Exception:  # pylint: disable=W0703
            pass
    return {'imdb_user_reviews': user_reviews}


# ==== crawling a movie profile ====
Project: SmartSocks    Author: waylybaye
def run(self):
        request = self.request
        try:
            if ((timeit.default_timer() - self.starttime) <= self.timeout and
                    not SHUTDOWN_EVENT.isSet()):
                try:
                    f = urlopen(request)
                except TypeError:
                    # PY24 expects a string or buffer
                    # This also causes issues with Ctrl-C, but we will concede
                    # for the moment that Ctrl-C on PY24 isn't immediate
                    request = build_request(self.request.get_full_url(),
                                            data=request.data.read(self.size))
                    f = urlopen(request)
                f.read(11)
                f.close()
                self.result = sum(self.request.data.total)
            else:
                self.result = 0
        except (IOError, SpeedtestUploadTimeout):
            self.result = sum(self.request.data.total)
Project: swjtu-pyscraper    Author: Desgard
def paste(self):
        """Create a paste and return the paste id."""
        data = json.dumps({
            'description': 'Werkzeug Internal Server Error',
            'public': False,
            'files': {
                'traceback.txt': {
                    'content': self.plaintext
                }
            }
        }).encode('utf-8')
        try:
            from urllib2 import urlopen
        except ImportError:
            from urllib.request import urlopen
        rv = urlopen('https://api.github.com/gists', data=data)
        resp = json.loads(rv.read().decode('utf-8'))
        rv.close()
        return {
            'url': resp['html_url'],
            'id': resp['id']
        }
Project: waybackscraper    Author: abrenaut
def list_archive_timestamps(url, min_date, max_date, user_agent):
    """
    List the available archive between min_date and max_date for the given URL
    """
    logger.info('Listing the archives for the url {url}'.format(url=url))

    # Construct the URL used to download the memento list
    parameters = {'url': url,
                  'output': 'json',
                  'from': min_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT),
                  'to': max_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT)}
    cdx_url = WEB_ARCHIVE_CDX_TEMPLATE.format(params=urlencode(parameters))

    req = Request(cdx_url, None, {'User-Agent': user_agent})
    with urlopen(req) as cdx:
        memento_json = cdx.read().decode("utf-8")

        timestamps = []
        # Ignore the first line which contains column names
        for url_key, timestamp, original, mime_type, status_code, digest, length in json.loads(memento_json)[1:]:
            # Ignore archives with a status code != OK
            if status_code == '200':
                timestamps.append(datetime.strptime(timestamp, WEB_ARCHIVE_TIMESTAMP_FORMAT))

    return timestamps
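
A hypothetical call of the function above; WEB_ARCHIVE_CDX_TEMPLATE and WEB_ARCHIVE_TIMESTAMP_FORMAT live elsewhere in the module, and the values shown here are assumptions for illustration only:

from datetime import datetime

# Assumed module-level constants, reproduced only so the sketch is self-contained.
WEB_ARCHIVE_TIMESTAMP_FORMAT = '%Y%m%d%H%M%S'
WEB_ARCHIVE_CDX_TEMPLATE = 'http://web.archive.org/cdx/search/cdx?{params}'

stamps = list_archive_timestamps('http://example.com/',
                                 datetime(2016, 1, 1), datetime(2016, 12, 31),
                                 'Mozilla/5.0')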
Project: CMSpider    Author: chengyu2333
def fetch_file(self, url, filename):
        # if not os.path.exists(filename):
        #     os.makedirs(filename)
        try:
            req = request.Request(url, headers=self.__headers)
            data = request.urlopen(req).read()
            with open(filename, 'wb') as f:
                f.write(data)
                f.flush()
                f.close()
                self.__url_manager.set_url_status(url, 2)
        except Exception as e:
            self.__url_manager.set_url_status(url, -1)
            raise e
        finally:
            time.sleep(config['basic']['sleep'])
Project: veneer-py    Author: flowmatters
def retrieve_json(self,url):
        '''
        Retrieve data from the Veneer service at the given url path.

        url: Path to required resource, relative to the root of the Veneer service.
        '''
        if PRINT_URLS:
            print("*** %s ***" % (url))

        if self.protocol=='file':
            text = open(self.prefix+url+self.data_ext).read()
        else:
            conn = hc.HTTPConnection(self.host,port=self.port)
            conn.request('GET',quote(url+self.data_ext))
            resp = conn.getresponse()
            text = resp.read().decode('utf-8')
            #text = urlopen(self.base_url + quote(url+self.data_ext)).read().decode('utf-8')

        text = self._replace_inf(text)
        if PRINT_ALL:
            print(json.loads(text))
            print("")
        return json.loads(text)
Project: veneer-py    Author: flowmatters
def retrieve_csv(self,url):
        '''
        Retrieve data from the Veneer service, at the given url path, in CSV format.

        url: Path to required resource, relative to the root of the Veneer service.

        NOTE: CSV responses are currently only available for time series results
        '''
        if PRINT_URLS:
            print("*** %s ***" % (url))

        req = Request(self.base_url + quote(url+self.data_ext),headers={"Accept":"text/csv"})
        text = urlopen(req).read().decode('utf-8')

        result = utils.read_veneer_csv(text)
        if PRINT_ALL:
            print(result)
            print("")
        return result
Project: veneer-py    Author: flowmatters
def retrieve_json(self,url,**kwargs):
        if self.print_urls:
            print("*** %s ***" % (url))

        try:
            text = urlopen(self.base_url + quote(url)).read().decode('utf-8')
        except:
            self.log("Couldn't retrieve %s"%url)
            return None

        self.save_data(url[1:],bytes(text,'utf-8'),"json")

        if self.print_all:
            print(json.loads(text))
            print("")
        return json.loads(text)
Project: macholib    Author: secmobi
def get_pypi_src_download(package):
        url = 'https://pypi.python.org/pypi/%s/json'%(package,)
        fp = urllib.request.urlopen(url)
        try:
            try:
                data = fp.read()

            finally:
                fp.close()
        except urllib.error.URLError:
            raise RuntimeError("Cannot determine download link for %s"%(package,))

        pkgdata = json.loads(data.decode('utf-8'))
        if 'urls' not in pkgdata:
            raise RuntimeError("Cannot determine download link for %s"%(package,))

        for info in pkgdata['urls']:
            if info['packagetype'] == 'sdist' and info['url'].endswith('tar.gz'):
                return (info.get('md5_digest'), info['url'])

        raise RuntimeError("Cannot determine downlink link for %s"%(package,))
Project: zanph    Author: zanph
def paste(self):
        """Create a paste and return the paste id."""
        data = json.dumps({
            'description': 'Werkzeug Internal Server Error',
            'public': False,
            'files': {
                'traceback.txt': {
                    'content': self.plaintext
                }
            }
        }).encode('utf-8')
        try:
            from urllib2 import urlopen
        except ImportError:
            from urllib.request import urlopen
        rv = urlopen('https://api.github.com/gists', data=data)
        resp = json.loads(rv.read().decode('utf-8'))
        rv.close()
        return {
            'url': resp['html_url'],
            'id': resp['id']
        }
Project: securedrop-reachability-monitor    Author: freedomofpress
def read_directory(self, directory_url):
        """Parses the SecureDrop directory into a dictionary of instance
        details."""
        # CloudFlare will block us if we don't set user-agent
        dir_req = Request(directory_url)
        dir_req.add_header("User-Agent", 
                           "Mozilla/5.0 (Windows NT 6.1; rv:45.0) "
                           "Gecko/20100101 Firefox/45.0")
        directory = urlopen(dir_req).read().decode()

        instances = []
        for line in directory.splitlines()[1:-1]:
            fields = line.split("\t")
            instances.append(dict(organization=fields[0],
                                  landing_page=fields[1],
                                  ths_address=fields[2]))

        return instances
Project: instagram_private_api    Author: ping
def test_post_video(self):
        # Reposting https://streamable.com/deltx
        video_info_res = urlopen('https://api.streamable.com/videos/deltx')
        video_info = json.loads(video_info_res.read().decode('utf8'))
        mp4_info = video_info['files']['mp4']

        video_url = ('https:' if mp4_info['url'].startswith('//') else '') + mp4_info['url']
        video_size = (mp4_info['width'], mp4_info['height'])
        thumbnail_url = ('https:' if video_info['thumbnail_url'].startswith('//') else '') + video_info['thumbnail_url']
        duration = mp4_info['duration']

        video_res = urlopen(video_url)
        video_data = video_res.read()
        thumb_res = urlopen(thumbnail_url)
        thumb_data = thumb_res.read()
        results = self.api.post_video(video_data, video_size, duration, thumb_data, caption='<3')
        self.assertEqual(results.get('status'), 'ok')
        self.assertIsNotNone(results.get('media'))
Project: instagram_private_api    Author: ping
def test_post_video_story(self):
        # Reposting https://streamable.com/08ico
        video_info_res = urlopen('https://api.streamable.com/videos/08ico')
        video_info = json.loads(video_info_res.read().decode('utf8'))
        mp4_info = video_info['files']['mp4']

        video_url = ('https:' if mp4_info['url'].startswith('//') else '') + mp4_info['url']
        video_size = (mp4_info['width'], mp4_info['height'])
        thumbnail_url = ('https:' if video_info['thumbnail_url'].startswith('//') else '') + video_info['thumbnail_url']
        duration = mp4_info['duration']

        video_res = urlopen(video_url)
        video_data = video_res.read()
        thumb_res = urlopen(thumbnail_url)
        thumb_data = thumb_res.read()
        results = self.api.post_video_story(video_data, video_size, duration, thumb_data)
        self.assertEqual(results.get('status'), 'ok')
        self.assertIsNotNone(results.get('media'))
Project: instagram_private_api    Author: ping
def respond_to_checkpoint(self, response_code):
        headers = {
            'User-Agent': self.USER_AGENT,
            'Origin': 'https://i.instagram.com',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US',
            'Accept-Encoding': 'gzip',
            'Referer': self.endpoint,
            'Cookie': self.cookie,
        }

        req = Request(self.endpoint, headers=headers)
        data = {'csrfmiddlewaretoken': self.csrftoken, 'response_code': response_code}
        res = urlopen(req, data=urlencode(data).encode('ascii'), timeout=self.timeout)

        if res.info().get('Content-Encoding') == 'gzip':
            buf = BytesIO(res.read())
            content = gzip.GzipFile(fileobj=buf).read().decode('utf-8')
        else:
            content = res.read().decode('utf-8')

        return res.code, content
Project: Rpi-envMonitor    Author: conanwhf
def public_ip(self):
        ip_regex = re.compile(r"(([0-9]{1,3}\.){3}[0-9]{1,3})")
        # List of host which return the public IP address:
        hosts = """http://www.lawrencegoetz.com/programs/ipinfo/
            http://mwburden.com/cgi-bin/getipaddr
            http://checkip.eurodyndns.org/
            http://checkip.dyndns.org/
            http://checkrealip.com/
            http://adresseip.com
            http://www.ipchicken.com/
            http://checkmyip.com/
            http://www.naumann-net.org/""".split("\n")
        for i in hosts:
            host = i.strip()
            #print(host)
            try:
                response = request.urlopen(host).read()
                result = ip_regex.findall(response.decode('utf-8'))
                if result: 
                    return result[0][0]
            except:
                pass
        return "UNKNOWN"
Project: thesaurus_query.vim    Author: Ron89
def _woxikon_de_url_handler(target):
    '''
    Query woxikon for synonyms
    '''
    time_out_choice = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        response = urlopen(fixurl(u'http://synonyms.woxikon.com/de/{0}'.format(target)).decode('ASCII'), timeout = time_out_choice)
        web_content = StringIO(unescape(decode_utf_8(response.read())))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        if isinstance(err.reason, socket.timeout):  # timeout error?
            return 1
        return -1   # other error
    except socket.timeout:  # timeout error failed to be captured by URLError
        return 1
    return web_content
Project: thesaurus_query.vim    Author: Ron89
def _jeck_ru_url_handler(target):
    '''
    Query jeck.ru for synonyms
    '''
    time_out_choice = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        response = urlopen(fixurl(u'http://jeck.ru/tools/SynonymsDictionary/{0}'.format(target)).decode('ASCII'), timeout = time_out_choice)
        web_content = StringIO(decode_utf_8(response.read()))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        if isinstance(err.reason, socket.timeout):  # timeout error?
            return 1
        return -1   # any other error
    except socket.timeout:  # if timeout error not captured by URLError
        return 1
    return web_content
Project: python    Author: goodstuden
def getLinks(pageUrl):
    global pages
    html=urlopen("http://en.wikipedia.org"+pageUrl)
    bs=BeautifulSoup(html,"html.parser")
    try:
        print(bs.h1.get_text())
        print(bs.find(id="mw-content-text").findAll("p")[0])
        print(bs.find(id="ca-edit").find("span").find("a").attrs['href'])
    except AttributeError:
        print("????????")
    for link in bs.findAll("a",href=re.compile("^(/wiki/)")):
        if 'href' in link.attrs:
            if link.attrs['href'] not in pages:
                newpage=link.attrs["href"]
                print("---------\n"+newpage)
                pages.add(newpage)
                getLinks(newpage)
Project: AlexaPi    Author: alexa-pi
def mm_heartbeat(self):
        # Check if stop or set next timer
        if self.shutdown:
            return
        threading.Timer(self.hb_timer, self.mm_heartbeat).start()

        address = ("http://" + self.mm_host + ":" + self.mm_port + "/alexapi?action=AVSHB")

        logger.debug("Sending MM Heatbeat")

        try:
            response = urlopen(address).read()
        except URLError as err:
            logger.error("URLError: %s", err.reason)
            return

        logger.debug("Response: " + response)
Project: BioQueue    Author: liyao001
def query_usage(request):
    """
    This function should only be called when the user is using IE8 or IE9
    :param request: 
    :return: 
    """
    try:
        from urllib2 import urlopen
    except ImportError:
        from urllib.request import urlopen

    api_bus = get_config('program', 'api', 1)+'/Kb/findSoftwareUsage?software='+request.POST['software']
    try:
        res_data = urlopen(api_bus)
        res = res_data.read()
        return HttpResponse(res)
    except Exception as e:
        return error(api_bus)
Project: AboveTustin    Author: kevinabrandon
def refresh(self):
        try:
            #open the data url
            self.req = urlopen(self.data_url)

            #read data from the url
            self.raw_data = self.req.read()

            #load in the json
            self.json_data = json.loads(self.raw_data.decode())

            #get time from json
            self.time = datetime.fromtimestamp(self.parser.time(self.json_data))

            #load all the aircarft
            self.aircraft = self.parser.aircraft_data(self.json_data, self.time)

        except Exception:
            print("exception in FlightData.refresh():")
            traceback.print_exc()
Project: Sci-Finder    Author: snverse
def paste(self):
        """Create a paste and return the paste id."""
        data = json.dumps({
            'description': 'Werkzeug Internal Server Error',
            'public': False,
            'files': {
                'traceback.txt': {
                    'content': self.plaintext
                }
            }
        }).encode('utf-8')
        try:
            from urllib2 import urlopen
        except ImportError:
            from urllib.request import urlopen
        rv = urlopen('https://api.github.com/gists', data=data)
        resp = json.loads(rv.read().decode('utf-8'))
        rv.close()
        return {
            'url': resp['html_url'],
            'id': resp['id']
        }
Project: geocoder-ie    Author: devgateway
def download(dest_path, url):
    try:
        file_name = url.split('/')[-1]
        path = os.path.realpath(os.path.join(dest_path, unquote_plus(file_name)))
        if not os.path.exists(path):
            f = urlopen(url)
            headers = f.headers['content-type'].split('/')
            md = 'w'
            if 'html' in headers:
                file_name = '{}.html'.format(uuid.uuid1())
            else:
                md = 'wb'
                with open(path, md) as local_file:
                    local_file.write(f.read())

        if os.path.exists(path):
            return path
        else:
            logger.info("Wasn't able to find the file....!")
            return None
    except Exception as error:
        logger.error('download error %s', error)
Project: geocoder-ie    Author: devgateway
def query(location, cty_codes, query_method, fuzzy):
    results = []
    try:
        base_url = get_geonames_base_url()
        username = get_geonames_user_name()
        query_string = base_url + 'username={user}&{query_method}={name}&' \
                                  'style=FULL&orderby={order}&startRow=0&maxRows=5&fuzzy={fuzzy}' \
            .format(user=username, query_method=query_method, name=quote(location), order='relevance', fuzzy=fuzzy)
        if cty_codes and len(cty_codes) > 0:
            query_string = query_string + '&' + '&'.join([('country={}'.format(c)) for c in cty_codes])

        json_decode = json.JSONDecoder()  # used to parse json response
        response = urlopen(query_string)
        response_string = response.read().decode('utf-8')
        parsed_response = json_decode.decode(response_string)
        if parsed_response.get('geonames') and len(parsed_response.get('geonames')) > 0:
            for item in parsed_response['geonames']:
                results.append(parse(item))

    except URLError as e:
        logger.info("Oops!  something didn't go well")
        logger.info(e)

    return results
Project: learn-python    Author: xushubo
def fetch_xml(url):
    with request.urlopen(url) as f:
        print('Status:', f.status, f.reason)
        for k, v in f.getheaders():
            print('%s: %s' % (k, v))
        html = f.read().decode('utf-8')
    pattern_one = re.compile(r'<yweather:location.*?city="(.*?)".*?country="(.*?)".*?region="(.*?)".*?/>', re.S)
    pattern_two = re.compile(r'<yweather:forecast.*?date="(.*?)".*?day="(.*?)".*?high="(.*?)".*?low="(.*?)".*?text="(.*?)".*?/>', re.S)
    location_info = re.findall(pattern_one, html)
    items = re.findall(pattern_two, html)
    weather = {}
    weather['city'] = location_info[0][0]
    weather['country'] = location_info[0][1]
    weather['region'] = location_info[0][2]
    for item in items:
        weather[item[1]] = {}
        weather[item[1]]['data'] = item[0]
        weather[item[1]]['high'] = item[2]
        weather[item[1]]['low'] = item[3]
        weather[item[1]]['text'] = item[4]
    return weather
Project: cleverhans    Author: tensorflow
def download_image(image_id, url, x1, y1, x2, y2, output_dir):
    """Downloads one image, crops it, resizes it and saves it locally."""
    output_filename = os.path.join(output_dir, image_id + '.png')
    if os.path.exists(output_filename):
        # Don't download image if it's already there
        return True
    try:
        # Download image
        url_file = urlopen(url)
        if url_file.getcode() != 200:
            return False
        image_buffer = url_file.read()
        # Crop, resize and save image
        image = Image.open(BytesIO(image_buffer)).convert('RGB')
        w = image.size[0]
        h = image.size[1]
        image = image.crop((int(x1 * w), int(y1 * h), int(x2 * w),
                            int(y2 * h)))
        image = image.resize((299, 299), resample=Image.ANTIALIAS)
        image.save(output_filename)
    except IOError:
        return False
    return True
Project: danmu-bilibili    Author: saberxxy
def getSoup(start, stop):

    try:
        for number in range(start, stop+1):
            # http://space.bilibili.com/15989779/#!/
            url = 'http://space.bilibili.com/' + str(number) + '/#!/'
            response = request.urlopen(url)
            # print(response.getcode())
            html_cont = response.read()
            soup = BeautifulSoup(html_cont, 'lxml', from_encoding='utf-8')
            username = soup.find("h1").get_text().strip()[:-6]  # extract the username
            uid = number  # number is the uid
            get_gz_uid = GetFollowUid(number)
            gzsuid, gznumber = get_gz_uid.get_uids()  # get the followed-user ids and their count

            saveData(uid, username, gznumber, gzsuid)  # save the data
    except Exception:
        print("get page error")
        return getSoup(number+1, stop+1)


# ????
Project: danmu-bilibili    Author: saberxxy
def getSoup(start, stop):
    try:
        for number in range(start, stop + 1):
            # http://space.bilibili.com/15989779/#!/
            url = 'http://space.bilibili.com/'+str(number)+'/#!/'
            response = request.urlopen(url)
            # print(response.getcode())
            html_cont = response.read()
            soup = BeautifulSoup(html_cont, 'lxml', from_encoding='utf-8')
            username = soup.find("h1").get_text().strip()[:-6]  # extract the username
            uid = number  # number is the uid
            get_fans_uid = GetFansUid(number)
            fansuid, fansnumber = get_fans_uid.get_uids()  # get the follower ids and their count

            saveData(uid, username, fansnumber, fansuid)  # save the data
    except Exception:
        print("get page error")
        return getSoup(number + 1, stop + 1)


# ????