Python urllib.request module: urlretrieve() example source code

The following 50 code examples, extracted from open-source Python projects, illustrate how to use urllib.request.urlretrieve(). A minimal usage sketch is shown below, followed by the project examples.

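A minimal, self-contained sketch of the basic call, with a placeholder URL and file name: urlretrieve(url, filename) downloads the resource to a local file and returns the local path together with the response headers.

from urllib import request

# Placeholder URL and target file name; substitute real values.
url = 'https://example.com/archive.tar.gz'
local_path, headers = request.urlretrieve(url, filename='archive.tar.gz')
print('Saved to', local_path)
print('Content-Type:', headers.get('Content-Type'))

# Clean up any temporary files urlretrieve may have created.
request.urlcleanup()
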
Project: spider    Author: aweng126    | Project source | File source
def download_url_picture(img_url, file_name, folder_name, folder_base):
    try:
        file_path = folder_base + '/' + folder_name
        if not os.path.exists(file_path):
            print('Folder', file_path, 'does not exist, creating it')
            os.makedirs(file_path)
        # get the file extension
        file_suffix = os.path.splitext(img_url)[1]
        # build the full path for the saved file
        filename = '{}{}{}{}'.format(file_path, os.sep, file_name, file_suffix)
        # download the image and save it to that path
        request.urlretrieve(img_url, filename=filename)
    except IOError as e:
        print('File error:', e)
    except Exception as e:
        print('Error:', e)
Project: afl-mothership    Author: afl-mothership    | Project source | File source
def feed_queue(dest_dir, queue, queue_url, campaign_id):
    s = requests.Session()
    while True:
        logger.debug('fetching %s', queue_url)
        analysis_queue = s.get(queue_url).json()['crashes']
        for crash in analysis_queue:
            crash_name = str(crash['crash_id'])
            logger.info('downloading %s', crash_name)
            local_filename = os.path.join(dest_dir, crash_name)
            urllib_request.urlretrieve(crash['download'], filename=local_filename)
            logger.debug('%d crashes waiting', queue.qsize())
            queue.put((crash['crash_id'], local_filename))
Project: polyglot-server    Author: MontrealCorpusTools    | Project source | File source
def download_neo4j():
    if sys.platform.startswith('win'):
        dist_string = 'windows.zip'
        path = os.path.join(settings.POLYGLOT_TEMP_DIR, 'neo4j.zip')
    else:
        dist_string = 'unix.tar.gz'
        path = os.path.join(settings.POLYGLOT_TEMP_DIR, 'neo4j.tar.gz')
    if os.path.exists(path):
        return path
    os.makedirs(settings.POLYGLOT_TEMP_DIR, exist_ok=True)

    download_link = 'https://neo4j.com/artifact.php?name=neo4j-community-{version}-{dist_string}'.format(
        version=settings.NEO4J_VERSION, dist_string=dist_string)

    archive_path, headers = urlretrieve(download_link, path, data=None)
    return archive_path
Project: ngraph    Author: NervanaSystems    | Project source | File source
def loadData(src):
    print('Downloading ' + src)
    fname, h = urlretrieve(src, './delete.me')
    print('Done.')
    try:
        print('Extracting files...')
        with tarfile.open(fname) as tar:
            tar.extractall()
        print('Done.')
        print('Preparing train set...')
        trn = np.empty((0, numFeature + 1), dtype=np.int)
        for i in range(5):
            batchName = './cifar-10-batches-py/data_batch_{0}'.format(i + 1)
            trn = np.vstack((trn, readBatch(batchName)))
        print('Done.')
        print('Preparing test set...')
        tst = readBatch('./cifar-10-batches-py/test_batch')
        print('Done.')
    finally:
        os.remove(fname)
    return (trn, tst)
Project: ngraph    Author: NervanaSystems    | Project source | File source
def loadData(src, cimg):
    gzfname, h = urlretrieve(src, './delete.me')
    try:
        with gzip.open(gzfname) as gz:
            n = struct.unpack('I', gz.read(4))
            if n[0] != 0x3080000:
                raise Exception('Invalid file: unexpected magic number.')
            n = struct.unpack('>I', gz.read(4))[0]
            if n != cimg:
                raise Exception('Invalid file: expected {0} entries.'.format(cimg))
            crow = struct.unpack('>I', gz.read(4))[0]
            ccol = struct.unpack('>I', gz.read(4))[0]
            if crow != 28 or ccol != 28:
                raise Exception('Invalid file: expected 28 rows/cols per image.')
            res = np.fromstring(gz.read(cimg * crow * ccol), dtype=np.uint8)
    finally:
        os.remove(gzfname)
    return res.reshape((cimg, crow * ccol))
Project: ai-gym    Author: tuzzer    | Project source | File source
def load_or_download_mnist_files(filename, num_samples, local_data_dir):

    if (local_data_dir):
        local_path = os.path.join(local_data_dir, filename)
    else:
        local_path = os.path.join(os.getcwd(), filename)

    if os.path.exists(local_path):
        gzfname = local_path
    else:
        local_data_dir = os.path.dirname(local_path)
        if not os.path.exists(local_data_dir):
            os.makedirs(local_data_dir)
        filename = "http://yann.lecun.com/exdb/mnist/" + filename
        print ("Downloading from " + filename, end=" ")
        gzfname, h = urlretrieve(filename, local_path)
        print ("[Done]")

    return gzfname
Project: aboleth    Author: data61    | Project source | File source
def fetch_data():
    """Download the data."""
    train_file = tempfile.NamedTemporaryFile()
    test_file = tempfile.NamedTemporaryFile()
    req.urlretrieve("http://mlr.cs.umass.edu/ml/machine-learning-databases"
                    "/adult/adult.data", train_file.name)
    req.urlretrieve("http://mlr.cs.umass.edu/ml/machine-learning-databases/"
                    "adult/adult.test", test_file.name)

    df_train = pd.read_csv(train_file, names=COLUMNS, skipinitialspace=True)
    df_test = pd.read_csv(test_file, names=COLUMNS, skipinitialspace=True,
                          skiprows=1)

    df_train[LABEL_COLUMN] = (df_train["income_bracket"]
                              .apply(lambda x: ">50K" in x)).astype(int)
    df_test[LABEL_COLUMN] = (df_test["income_bracket"]
                             .apply(lambda x: ">50K" in x)).astype(int)

    return df_train, df_test
Project: k8scntkSamples    Author: weehyong    | Project source | File source
def loadData(src):
    print ('Downloading ' + src)
    fname, h = urlretrieve(src, './delete.me')
    print ('Done.')
    try:
        print ('Extracting files...')
        with tarfile.open(fname) as tar:
            tar.extractall()
        print ('Done.')
        print ('Preparing train set...')
        trn = np.empty((0, numFeature + 1), dtype=np.int)
        for i in range(5):
            batchName = './cifar-10-batches-py/data_batch_{0}'.format(i + 1)
            trn = np.vstack((trn, readBatch(batchName)))
        print ('Done.')
        print ('Preparing test set...')
        tst = readBatch('./cifar-10-batches-py/test_batch')
        print ('Done.')
    finally:
        os.remove(fname)
    return (trn, tst)
Project: k8scntkSamples    Author: weehyong    | Project source | File source
def loadData(src):
    print ('Downloading ' + src)
    fname, h = urlretrieve(src, './delete.me')
    print ('Done.')
    try:
        print ('Extracting files...')
        with tarfile.open(fname) as tar:
            tar.extractall()
        print ('Done.')
        print ('Preparing train set...')
        trn = np.empty((0, NumFeat + 1), dtype=np.int)
        for i in range(5):
            batchName = './cifar-10-batches-py/data_batch_{0}'.format(i + 1)
            trn = np.vstack((trn, readBatch(batchName)))
        print ('Done.')
        print ('Preparing test set...')
        tst = readBatch('./cifar-10-batches-py/test_batch')
        print ('Done.')
    finally:
        os.remove(fname)
    return (trn, tst)
Project: k8scntkSamples    Author: weehyong    | Project source | File source
def loadData(src):
    print ('Downloading ' + src)
    fname, h = urlretrieve(src, './delete.me')
    print ('Done.')
    try:
        print ('Extracting files...')
        with tarfile.open(fname) as tar:
            tar.extractall()
        print ('Done.')
        print ('Preparing train set...')
        trn = np.empty((0, NumFeat + 1), dtype=np.int)
        for i in range(5):
            batchName = './cifar-10-batches-py/data_batch_{0}'.format(i + 1)
            trn = np.vstack((trn, readBatch(batchName)))
        print ('Done.')
        print ('Preparing test set...')
        tst = readBatch('./cifar-10-batches-py/test_batch')
        print ('Done.')
    finally:
        os.remove(fname)
    return (trn, tst)
Project: coolq-telegram-bot    Author: jqqqqqqqqqq    | Project source | File source
def cq_download_pic(filename: str):
    """
    download image by cqimg file
    :param filename: cqimg file name
    """
    try:
        path = os.path.join(CQ_IMAGE_ROOT, filename)
        if os.path.exists(path):
            return

        cqimg = os.path.join(CQ_IMAGE_ROOT, filename + '.cqimg')
        parser = ConfigParser()
        parser.read(cqimg)

        url = parser['image']['url']
        urlretrieve(url, path)
    except:
        logger.error(filename)
        traceback.print_exc()
Project: coolq-telegram-bot    Author: jqqqqqqqqqq    | Project source | File source
def tg_get_pic_url(file_id: str, pic_type: str):
    """
    download image from Telegram Server, and generate a new image link to send to the QQ group
    :param file_id: telegram file id
    :param pic_type: picture extension name
    :return: pic url
    """
    file = global_vars.tg_bot.getFile(file_id)
    # urlretrieve(file.file_path, os.path.join(CQ_IMAGE_ROOT, file_id))  # download image
    file.download(custom_path=os.path.join(CQ_IMAGE_ROOT, file_id))
    if pic_type == 'jpg':
        create_jpg_image(CQ_IMAGE_ROOT, file_id)
        pic_url = get_short_url(SERVER_PIC_URL + file_id + '.jpg')
        return pic_url
    elif pic_type == 'png':
        create_png_image(CQ_IMAGE_ROOT, file_id)
        pic_url = get_short_url(SERVER_PIC_URL + file_id + '.png')
        return pic_url
    return ''

# endregion
Project: mimesis    Author: lk-geimfari    | Project source | File source
def download_image(url: str = '', save_path: str = '',
                   unverified_ctx: bool = False) -> Union[None, str]:
    """Download image and save in current directory on local machine.

    :param str url: URL to image.
    :param str save_path: Saving path.
    :param bool unverified_ctx: Create unverified context.
    :return: Image name.
    :rtype: str or None
    """
    if unverified_ctx:
        ssl._create_default_https_context = ssl._create_unverified_context

    if url is not None:
        image_name = url.rsplit('/')[-1]
        request.urlretrieve(url, save_path + image_name)
        return image_name
    return None
Project: tensorflow-cifar-10    Author: exelban    | Project source | File source
def maybe_download_and_extract():
    main_directory = "./data_set/"
    cifar_10_directory = main_directory+"cifar_10/"
    if not os.path.exists(main_directory):
        os.makedirs(main_directory)

        url = "http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz"
        filename = url.split('/')[-1]
        file_path = os.path.join(main_directory, filename)
        zip_cifar_10 = file_path
        file_path, _ = urlretrieve(url=url, filename=file_path, reporthook=_print_download_progress)

        print()
        print("Download finished. Extracting files.")
        if file_path.endswith(".zip"):
            zipfile.ZipFile(file=file_path, mode="r").extractall(main_directory)
        elif file_path.endswith((".tar.gz", ".tgz")):
            tarfile.open(name=file_path, mode="r:gz").extractall(main_directory)
        print("Done.")

        os.rename(main_directory+"./cifar-10-batches-py", cifar_10_directory)
        os.remove(zip_cifar_10)
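
The _print_download_progress helper referenced above is not shown in this snippet. A reporthook passed to urlretrieve is called with (block_number, block_size, total_size); a minimal sketch of such a callback (its name and output format are assumptions) could be:

def _print_download_progress(block_number, block_size, total_size):
    # total_size is -1 when the server sends no Content-Length header.
    downloaded = block_number * block_size
    if total_size > 0:
        percent = min(downloaded / total_size * 100.0, 100.0)
        print('\r- Download progress: {:.1f}%'.format(percent), end='')
    else:
        print('\r- Downloaded {:,} bytes'.format(downloaded), end='')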
Project: FalltoSkyBot    Author: Sakiut    | Project source | File source
def getLDIcon(pseudo):
    """Fetch the leveldown icon from their website"""

    pseudo = str(pseudo)
    url = "http://leveldown.fr/profile/{}".format(pseudo)
    fileName = "./ldsource/{}.html".format(pseudo)
    request.urlretrieve(url, fileName)

    with open(fileName) as f: lines = f.read().splitlines()

    Names = []
    Values = []

    for line in lines:
        if line.startswith('                            <div class="avatar"'):
            getLine = line.rstrip(');"></div>')
            Line = getLine.split("(")
            Line = Line[1]

    return Line

#####################################################################################################################################################
Project: packaging    Author: blockstack    | Project source | File source
def setUp(self):
        # Create a list of temporary files. Each item in the list is a file
        # name (absolute path or relative to the current working directory).
        # All files in this list will be deleted in the tearDown method. Note,
        # this only helps to makes sure temporary files get deleted, but it
        # does nothing about trying to close files that may still be open. It
        # is the responsibility of the developer to properly close files even
        # when exceptional conditions occur.
        self.tempFiles = []

        # Create a temporary file.
        self.registerFileForCleanUp(support.TESTFN)
        self.text = b'testing urllib.urlretrieve'
        try:
            FILE = open(support.TESTFN, 'wb')
            FILE.write(self.text)
            FILE.close()
        finally:
            try: FILE.close()
            except: pass
Project: packaging    Author: blockstack    | Project source | File source
def test_copy(self):
        # Test that setting the filename argument works.
        second_temp = "%s.2" % support.TESTFN
        self.registerFileForCleanUp(second_temp)
        result = urllib_request.urlretrieve(self.constructLocalFileUrl(
            support.TESTFN), second_temp)
        self.assertEqual(second_temp, result[0])
        self.assertTrue(os.path.exists(second_temp), "copy of the file was not "
                                                  "made")
        FILE = open(second_temp, 'rb')
        try:
            text = FILE.read()
            FILE.close()
        finally:
            try: FILE.close()
            except: pass
        self.assertEqual(self.text, text)
Project: packaging    Author: blockstack    | Project source | File source
def test_short_content_raises_ContentTooShortError(self):
        self.fakehttp(b'''HTTP/1.1 200 OK
Date: Wed, 02 Jan 2008 03:03:54 GMT
Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
Connection: close
Content-Length: 100
Content-Type: text/html; charset=iso-8859-1

FF
''')

        def _reporthook(par1, par2, par3):
            pass

        with self.assertRaises(urllib_error.ContentTooShortError):
            try:
                urllib_request.urlretrieve('http://example.com/',
                                           reporthook=_reporthook)
            finally:
                self.unfakehttp()
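
The test above verifies that urlretrieve raises ContentTooShortError when fewer bytes arrive than the Content-Length header promises. In application code this can be caught so a truncated file is not mistaken for a complete download; a small sketch (function name and cleanup policy are assumptions):

import os
from urllib import error, request

def fetch_checked(url, filename):
    # Remove the truncated file and re-raise if the download is cut short.
    try:
        path, _ = request.urlretrieve(url, filename)
        return path
    except error.ContentTooShortError:
        if os.path.exists(filename):
            os.remove(filename)
        raise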
Project: pygeotools    Author: dshean    | Project source | File source
def getfile(url, outdir=None):
    """Function to fetch files using urllib

    Works with ftp

    """
    fn = os.path.split(url)[-1]
    if outdir is not None:
        fn = os.path.join(outdir, fn)
    if not os.path.exists(fn):
        #Find appropriate urlretrieve for Python 2 and 3
        try:
            from urllib.request import urlretrieve
        except ImportError:
            from urllib import urlretrieve 
        print("Retrieving: %s" % url)
        #Add progress bar
        urlretrieve(url, fn)
    return fn

#Function to get files using requests
#Works with https authentication
Project: pytorch_60min_blitz    Author: kyuhyoung    | Project source | File source
def loadData(src):
    print('Downloading ' + src)
    fname, h = urlretrieve(src, './delete.me')
    print('Done.')
    try:
        print('Extracting files...')
        with tarfile.open(fname) as tar:
            tar.extractall()
        print('Done.')
        print('Preparing train set...')
        trn = np.empty((0, NumFeat + 1), dtype=np.int)
        for i in range(5):
            batchName = './cifar-10-batches-py/data_batch_{0}'.format(i + 1)
            trn = np.vstack((trn, readBatch(batchName)))
        print('Done.')
        print('Preparing test set...')
        tst = readBatch('./cifar-10-batches-py/test_batch')
        print('Done.')
    finally:
        os.remove(fname)
    return (trn, tst)
Project: Cayenne-Agent    Author: myDevicesIoT    | Project source | File source
def TestDownload(self):
        try:
            a = datetime.now()
            info('Executing regular download test for network speed')
            url = self.config.cloudConfig.DownloadSpeedTestUrl if 'DownloadSpeedTestUrl' in self.config.cloudConfig else defaultUrl
            debug(url + ' ' + download_path)
            request.urlretrieve(url, download_path)
            request.urlcleanup()
            b = datetime.now()
            c = b - a
            if path.exists(download_path):
                size = path.getsize(download_path)/mb
                self.downloadSpeed = size/c.total_seconds()
                remove(download_path)
                return True
        except socket_error as serr:
                error ('TestDownload:' + str(serr))
                ret = False
                Daemon.OnFailure('cloud', serr.errno)
                return
        except:
            exception('TestDownload Failed')
        return False
Project: TnyBot-Discord    Author: 00firestar00    | Project source | File source
def url_request(self, url, file_path, proxy_url=None):
        try:
            urllib_request.urlretrieve(url, file_path)
            print(file_path)
        except HTTPError as e:
            print(e.code)
            if proxy_url is not None:
                print("Trying proxy URL")
                url = proxy_url
                await self.url_request(url, file_path)
            else:
                raise e
        except UnicodeEncodeError:
            # Special retry logic for IDN domains
            url = "http://" + url.replace("http://", "").encode("idna").decode("utf-8")
            await self.url_request(url, file_path)
Project: automatic-repo    Author: WZQ1397    | Project source | File source
def run(self):
        print(self.url)
        for link,t in set(re.findall(r'(http://699pic.com/tupian[^\s]*?(html))', str(reqfun(self.url)))):
           #res = request.urlopen(link)
           #if re.split("\.",os.path.basename(link))[1].lower() == 'gif' or len(res.read()) <10000:
           #    continue
           #print(link)
           for piclink,t in set(re.findall(r'(http://seopic.699pic.com/photo/[^s]*?jpg[^s]*?(jpg))', str(reqfun(link)))):
                print(piclink,"loading...")
                try:
                    request.urlretrieve(piclink,self.saveFile(piclink))
                    time.sleep(1)
                    if os.path.exists(targetPath+os.path.basename(piclink)) is False:
                        request.urlretrieve(piclink,self.saveFile(piclink))
                        print("err")
                except:
                    print('download error')
        q.task_done()
Project: vocabulary2anki.py    Author: lujun9972    | Project source | File source
def download_for_anki(url):
    if not url:
        return ""
    try:
        fileext = os.path.splitext(url)[1]
        filename = hashlib.md5(url.encode()).hexdigest() + fileext
        filepath = os.path.join("collection.media",filename)
        if not os.path.exists("collection.media"):
            os.makedirs("collection.media")
        request.urlretrieve(url,filepath)
        if fileext in ('.jpg','.jpeg','.gif','.png','.svg'):
            filename =  '<img src="{}"></img>'.format(filename)
        elif fileext in ('.mp3','.mp4','.wav'):
            filename = '[sound:{}]'.format(filename)
    except Exception as e:
        print("fetch {} error:{}".format(url,e),file=sys.stderr)
        filename = ""
    return filename
Project: pylmnn    Author: johny-c    | Project source | File source
def fetch_load_letters(data_dir=None):
    path = os.path.join(get_data_home(data_dir), 'letter-recognition.data')

    if not os.path.exists(path):
        from urllib import request
        url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/letter-recognition/letter-recognition.data'
        print('Downloading letter-recognition dataset from {}...'.format(url))
        request.urlretrieve(url=url, filename=path)
    else:
        print('Found letter-recognition in {}!'.format(path))

    X, y = [], []
    with open(path) as f:
        reader = csv.reader(f)
        for row in reader:
            y.append(row[0])
            X.append(row[1:])
    labels, label_idx = np.unique(y, return_inverse=True)
    return np.asarray(X, dtype=float), label_idx
Project: kotta_client    Author: yadudoc    | Project source | File source
def fetch(self, filename=None):
        """ Fetch the remote url to the local file

        Kwargs:
             - filename(string) : Specify path to download files to. Default=None

        """
        if self.url:
            if filename:
                return urlretrieve(self.url, filename)
            else:
                return urlretrieve(self.url, self.file)

        else:
            logger.warning("File %s was not generated on Kotta. Nothing to fetch", self.file)
            return False
Project: IC-Inventory    Author: tjschweizer    | Project source | File source
def addToInventory(self, octoPart, quantity):
        """
        Adds quantity of Octopart to the inventory. Performs a searchInventory to determine if part is already in
        inventory, then either adds the part to the inventory or simply updates the quantity if the part already
        exists.

        :param octoPart: Octopart object to add
        :type octoPart: octo_utils.Octopart
        :param quantity: Quantity of part to add to inventory
        :type quantity: int
        :return: None
        :rtype: None
        """
        index = self.searchInventory(octoPart)
        if index == -1:
            octoPart.quantity = quantity
            self.inventory.append(octoPart)
        else:
            self.inventory[index].quantity += quantity
        self.saveInventory()

        if octoPart.dataFile:
            request.urlretrieve(octoPart.dataURL, octoPart.dataFile)
Project: comp    Author: McSinyx    | Project source | File source
def json_extract_info(filename):
    """Return list of entries extracted from a file using json. If an
    error occurs during the extraction, return None.
    """
    try:
        if not isfile(filename):
            filename = urlretrieve(filename)[0]
        with open(filename) as f: raw_info, info = json.load(f), []
        for i in raw_info:
            e = DEFAULT_ENTRY.copy()
            for k in e:
                if k in i and isinstance(i[k], type(e[k])): e[k] = i[k]
            info.append(e)
    except:
        return None
    else:
        return info
Project: PyIPOL    Author: martinResearch    | Project source | File source
def download_and_extract(f,subfolder=''):   
    print("downloading %s..."%f)


    local_filename, headers = urlretrieve(f)
    print("decompressing the zip file %s" % local_filename)
    if not os.path.isdir(extraction_directory):
        os.mkdir(extraction_directory)
    if f[-3:]=='zip':
        with zipfile.ZipFile(local_filename) as zf:
            zf.extractall(extraction_directory+subfolder)
    elif f[-2:]=='gz':
        with tarfile.open(local_filename, "r") as tar:
            tar.extractall(extraction_directory+subfolder)
    else:
        print('unrecognized archive type')
        raise
    print("done")
Project: OrderScaner    Author: Nikakto    | Project source | File source
def item_Load(itemID, bid, region):

    if bid:
        orderType = 'buy'
    else:
        orderType = 'sell'

    url = "https://crest-tq.eveonline.com/market/"+str(region)+"/orders/"+orderType+"/?type=https://crest-tq.eveonline.com/inventory/types/"+str(itemID)+"/"

    data_file = request.urlopen(url).read().decode('utf-8')
    #request.urlretrieve(url, "D:/result.json")

    #with open('D:/result.json') as data_file:
    data = json.loads(data_file)

    return data
Project: Isomantics    Author: mike-seeber    | Project source | File source
def polyglot_retrieve(polyglot_links):
    """Retrieve files from links and/or google drive"""
    for polyglot_link in polyglot_links:
        address, lg = polyglot_link
        # Retrieve url
        url.urlretrieve(address, '../data/polyglot/' + lg + '.pkl')

        # If the url file was too big, we retrieved an error file.
        # If we obtained the correct file, it is bytes and try will fail.
        try:
            with open('../data/polyglot/' + lg + '.pkl') as f:
                f_str = f.read()
            pattern = r'<a href="https://docs.google.com/open\?id=(\w{28})">'
            g_id = re.findall(pattern, f_str)[0]
        except:
            g_id = None

        # If we retrieved the google id from the error file
        if g_id:
            f = drive.CreateFile({'id': g_id})
            f.GetContentFile('../data/polyglot/' + lg + '.pkl')
Project: maidstone-hackspace-website    Author: maidstone-hackspace    | Project source | File source
def download_remote_images():
    for article in Article.objects.all():
        if not article.original_image:
            continue
        try:
            result = urlretrieve(article.original_image.__str__())
            article.image.save(
                os.path.basename(article.original_image.__str__()),
                File(open(result[0], 'rb'))
            )
            render_variations(result[0], image_variations, replace=True)
            article.save()
        except:
            logger.exception(result)
            logger.exception(result[0])
            logger.exception('Unable to download remote image for %s' % article.original_image)
Project: mv3d    Author: lmb-freiburg    | Project source | File source
def download_and_unpack(url, dest):
    if not os.path.exists(dest):
        try:
            import urllib
            urllib.urlretrieve('http://google.com')
        except AttributeError:
            import urllib.request as urllib
        print("downloading " + dest + " ...")

        archive_name = dest + ".zip"
        urllib.urlretrieve(url, archive_name)
        in_file = open(archive_name, 'rb')
        z = zipfile.ZipFile(in_file)
        for name in z.namelist():
            print("extracting " + name)
            outpath = "./"
            z.extract(name, outpath)
        in_file.close()
        os.remove(archive_name)

        print("done.")
Project: pytest-cython    Author: lgpage    | Project source | File source
def download_file(url, path):
    print("Downloading: {} (into {})".format(url, path))
    progress = [0, 0]

    def report(count, size, total):
        progress[0] = count * size
        if progress[0] - progress[1] > 1000000:
            progress[1] = progress[0]
            print("Downloaded {:,}/{:,} ...".format(progress[1], total))

    dest, _ = urlretrieve(url, path, reporthook=report)
    return dest
Project: epubcheck    Author: titusz    | Project source | File source
def download_file(url, path):
    print("Downloading: {} (into {})".format(url, path))
    progress = [0, 0]

    def report(count, size, total):
        progress[0] = count * size
        if progress[0] - progress[1] > 1000000:
            progress[1] = progress[0]
            print("Downloaded {:,}/{:,} ...".format(progress[1], total))

    dest, _ = urlretrieve(url, path, reporthook=report)
    return dest
Project: python-twelve-tone    Author: accraze    | Project source | File source
def download_file(url, path):
    print("Downloading: {} (into {})".format(url, path))
    progress = [0, 0]

    def report(count, size, total):
        progress[0] = count * size
        if progress[0] - progress[1] > 1000000:
            progress[1] = progress[0]
            print("Downloaded {:,}/{:,} ...".format(progress[1], total))

    dest, _ = urlretrieve(url, path, reporthook=report)
    return dest
Project: PyPlanet    Author: PyPlanet    | Project source | File source
def download_file(url, path):
    print("Downloading: {} (into {})".format(url, path))
    progress = [0, 0]

    def report(count, size, total):
        progress[0] = count * size
        if progress[0] - progress[1] > 1000000:
            progress[1] = progress[0]
            print("Downloaded {:,}/{:,} ...".format(progress[1], total))

    dest, _ = urlretrieve(url, path, reporthook=report)
    return dest
Project: PyWebRunner    Author: IntuitiveWebSolutions    | Project source | File source
def download_file(url, destination):
    return urlretrieve(url, destination)
Project: microbar    Author: Bengt    | Project source | File source
def download_file(url, path):
    print("Downloading: {} (into {})".format(url, path))
    progress = [0, 0]

    def report(count, size, total):
        progress[0] = count * size
        if progress[0] - progress[1] > 1000000:
            progress[1] = progress[0]
            print("Downloaded {:,}/{:,} ...".format(progress[1], total))

    dest, _ = urlretrieve(url, path, reporthook=report)
    return dest
Project: speechless    Author: JuliusKunze    | Project source | File source
def _download_if_not_yet_done(self, source_path_or_url: str, target_path: Path) -> Path:
        if not target_path.is_file():
            log("Downloading corpus {} to {}".format(source_path_or_url, target_path))
            if self.base_url_or_directory.startswith("http"):
                request.urlretrieve(source_path_or_url, str(target_path))
            else:
                try:
                    subprocess.check_output(["scp", source_path_or_url, str(target_path)], stderr=subprocess.STDOUT)
                except subprocess.CalledProcessError as e:
                    raise IOError("Copying failed: " + str(e.output))

        return target_path
Project: photinia    Author: XoriieInpottn    | Project source | File source
    def _download(self, filename):
        """Download the file from the source URL if it is not already present locally.

        :param filename: name of the file to download
        :return: local path of the downloaded file
        """
        file_path = os.path.join(self._dir, filename)
        if not os.path.exists(file_path):
            print('Downloading {}'.format(filename))
            request.urlretrieve(self._source_url + filename, file_path)
            print('Successfully downloaded {}'.format(filename))
        return file_path
Project: sublime-text-3-packages    Author: nickjj    | Project source | File source
def get_php_references():
        download = urlretrieve(PHP_MANUAL_URL)
        tar = tarfile.open(download[0])
        tar.extractall()
        tar.close()
        for file in glob.glob("%s%s" % (PHP_MANUAL_DIR, PHP_REFERENCE_GLOB)):
            yield file
        os.remove(download[0])
Project: python-memwiper    Author: qlixed    | Project source | File source
def download_file(url, path):
    print("Downloading: {} (into {})".format(url, path))
    progress = [0, 0]

    def report(count, size, total):
        progress[0] = count * size
        if progress[0] - progress[1] > 1000000:
            progress[1] = progress[0]
            print("Downloaded {:,}/{:,} ...".format(progress[1], total))

    dest, _ = urlretrieve(url, path, reporthook=report)
    return dest
Project: python-markov-novel    Author: accraze    | Project source | File source
def download_file(url, path):
    print("Downloading: {} (into {})".format(url, path))
    progress = [0, 0]

    def report(count, size, total):
        progress[0] = count * size
        if progress[0] - progress[1] > 1000000:
            progress[1] = progress[0]
            print("Downloaded {:,}/{:,} ...".format(progress[1], total))

    dest, _ = urlretrieve(url, path, reporthook=report)
    return dest
Project: aws_emojipacks    Author: Surgo    | Project source | File source
def download_icons(icons_url, dist_path):
    dist_path, headers = urlretrieve(icons_url, filename=dist_path)
    logging.info("Download to: %s", dist_path)
    return dist_path
Project: UKCensusAPI    Author: virgesmith    | Project source | File source
def get_data(self, table, table_internal, query_params, r_compat=False):
    """Downloads or retrieves data given a table and query parameters.
    Args:
       table: ONS table name
       table_internal: nomisweb table code (can be found in metadata)
       query_params: table query parameters
    Returns:
        a dataframe containing the data. If downloaded, the data is also cached to a file
    """
    query_params["uid"] = Nomisweb.KEY
    query_string = self.get_url(table_internal, query_params)

    filename = self.cache_dir + table + "_" + hashlib.md5(query_string.encode()).hexdigest()+".tsv"

    # retrieve if not in cache
    if not os.path.isfile(filename):
      meta = self.get_metadata(table)
      self.write_metadata(table, meta)
      print("Downloading and caching data: " + filename)
      request.urlretrieve(query_string, filename) #, timeout = Nomisweb.Timeout)

      # check for empty file, if so delete it and report error
      if os.stat(filename).st_size == 0:
        os.remove(filename)
        errormsg = "ERROR: Query returned no data. Check table and query parameters"
        if r_compat:
          return errormsg
        print(errormsg)
        return
    else:
      print("Using cached data: " + filename)

    # now load from cache and return
    if r_compat:
      return filename
    return pd.read_csv(filename, delimiter='\t')
Project: polyglot-server    Author: MontrealCorpusTools    | Project source | File source
def download_influxdb():
    if sys.platform.startswith('win'):
        dist_string = 'windows_amd64.zip'
        path = os.path.join(settings.POLYGLOT_TEMP_DIR, 'influxdb.zip')
    else:
        dist_string = 'linux_amd64.tar.gz'
        path = os.path.join(settings.POLYGLOT_TEMP_DIR, 'influxdb.tar.gz')
    if os.path.exists(path):
        return path
    os.makedirs(settings.POLYGLOT_TEMP_DIR, exist_ok=True)

    download_link = 'https://dl.influxdata.com/influxdb/releases/influxdb-{version}_{dist_string}'.format(
        version=settings.INFLUXDB_VERSION, dist_string=dist_string)
    archive_path, headers = urlretrieve(download_link, path, data=None)
    return archive_path
Project: ngraph    Author: NervanaSystems    | Project source | File source
def loadLabels(src, cimg):
    gzfname, h = urlretrieve(src, './delete.me')
    try:
        with gzip.open(gzfname) as gz:
            n = struct.unpack('I', gz.read(4))
            if n[0] != 0x1080000:
                raise Exception('Invalid file: unexpected magic number.')
            n = struct.unpack('>I', gz.read(4))
            if n[0] != cimg:
                raise Exception('Invalid file: expected {0} rows.'.format(cimg))
            res = np.fromstring(gz.read(cimg), dtype=np.uint8)
    finally:
        os.remove(gzfname)
    return res.reshape((cimg, 1))
Project: enaBrowserTools    Author: enasequence    | Project source | File source
def download_single_record(url, dest_file):
        urlrequest.urlretrieve(url, dest_file)
Project: enaBrowserTools    Author: enasequence    | Project source | File source
def get_ftp_file(ftp_url, dest_dir):
    try:
        filename = ftp_url.split('/')[-1]
        dest_file = os.path.join(dest_dir, filename)
        urlrequest.urlretrieve(ftp_url, dest_file)
        return True
    except Exception as e:
        print ("Error with FTP transfer: {0}".format(e))
        return False