The following code examples, extracted from open-source Python projects, illustrate how to use urllib.request.urlretrieve().
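Before the project examples, here is a minimal usage sketch (not taken from any of the projects below; the URL and file names are placeholders): urlretrieve(url, filename) downloads the resource to a local file and returns a (local_filename, headers) tuple.

from urllib.request import urlretrieve

# Minimal sketch: download a resource to a chosen local path.
# The URL and destination file name are illustrative placeholders only.
local_path, headers = urlretrieve(
    "https://example.com/data.csv",  # placeholder URL
    "data.csv",                      # local destination file
)
print(local_path)                    # path of the downloaded file
print(headers.get("Content-Type"))   # headers is an http.client.HTTPMessage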
def download_url_picture(img_url, file_name, folder_name, folder_base):
    try:
        file_path = folder_base + '/' + folder_name
        if not os.path.exists(file_path):
            print('Path', file_path, 'does not exist, creating it')
            os.makedirs(file_path)  # create the target folder
        file_suffix = os.path.splitext(img_url)[1]  # file extension taken from the URL
        # full local path: folder + separator + file name + extension
        filename = '{}{}{}{}'.format(file_path, os.sep, file_name, file_suffix)
        request.urlretrieve(img_url, filename=filename)
    except IOError as e:
        print('IO error:', e)
    except Exception as e:
        print('Error:', e)
def feed_queue(dest_dir, queue, queue_url, campaign_id):
    s = requests.Session()
    while True:
        logger.debug('fetching %s', queue_url)
        analysis_queue = s.get(queue_url).json()['crashes']
        for crash in analysis_queue:
            crash_name = str(crash['crash_id'])
            logger.info('downloading %s', crash_name)
            local_filename = os.path.join(dest_dir, crash_name)
            urllib_request.urlretrieve(crash['download'], filename=local_filename)
            logger.debug('%d crashes waiting', queue.qsize())
            queue.put((crash['crash_id'], local_filename))
def download_neo4j():
    if sys.platform.startswith('win'):
        dist_string = 'windows.zip'
        path = os.path.join(settings.POLYGLOT_TEMP_DIR, 'neo4j.zip')
    else:
        dist_string = 'unix.tar.gz'
        path = os.path.join(settings.POLYGLOT_TEMP_DIR, 'neo4j.tar.gz')
    if os.path.exists(path):
        return path
    os.makedirs(settings.POLYGLOT_TEMP_DIR, exist_ok=True)
    download_link = 'https://neo4j.com/artifact.php?name=neo4j-community-{version}-{dist_string}'.format(
        version=settings.NEO4J_VERSION, dist_string=dist_string)
    archive_path, headers = urlretrieve(download_link, path, data=None)
    return archive_path
def loadData(src):
    print('Downloading ' + src)
    fname, h = urlretrieve(src, './delete.me')
    print('Done.')
    try:
        print('Extracting files...')
        with tarfile.open(fname) as tar:
            tar.extractall()
        print('Done.')
        print('Preparing train set...')
        trn = np.empty((0, numFeature + 1), dtype=np.int)
        for i in range(5):
            batchName = './cifar-10-batches-py/data_batch_{0}'.format(i + 1)
            trn = np.vstack((trn, readBatch(batchName)))
        print('Done.')
        print('Preparing test set...')
        tst = readBatch('./cifar-10-batches-py/test_batch')
        print('Done.')
    finally:
        os.remove(fname)
    return (trn, tst)
def loadData(src, cimg):
    gzfname, h = urlretrieve(src, './delete.me')
    try:
        with gzip.open(gzfname) as gz:
            n = struct.unpack('I', gz.read(4))
            if n[0] != 0x3080000:
                raise Exception('Invalid file: unexpected magic number.')
            n = struct.unpack('>I', gz.read(4))[0]
            if n != cimg:
                raise Exception('Invalid file: expected {0} entries.'.format(cimg))
            crow = struct.unpack('>I', gz.read(4))[0]
            ccol = struct.unpack('>I', gz.read(4))[0]
            if crow != 28 or ccol != 28:
                raise Exception('Invalid file: expected 28 rows/cols per image.')
            res = np.fromstring(gz.read(cimg * crow * ccol), dtype=np.uint8)
    finally:
        os.remove(gzfname)
    return res.reshape((cimg, crow * ccol))
def load_or_download_mnist_files(filename, num_samples, local_data_dir):
    if local_data_dir:
        local_path = os.path.join(local_data_dir, filename)
    else:
        local_path = os.path.join(os.getcwd(), filename)
    if os.path.exists(local_path):
        gzfname = local_path
    else:
        local_data_dir = os.path.dirname(local_path)
        if not os.path.exists(local_data_dir):
            os.makedirs(local_data_dir)
        filename = "http://yann.lecun.com/exdb/mnist/" + filename
        print("Downloading from " + filename, end=" ")
        gzfname, h = urlretrieve(filename, local_path)
        print("[Done]")
    return gzfname
def fetch_data():
    """Download the data."""
    train_file = tempfile.NamedTemporaryFile()
    test_file = tempfile.NamedTemporaryFile()
    req.urlretrieve("http://mlr.cs.umass.edu/ml/machine-learning-databases"
                    "/adult/adult.data", train_file.name)
    req.urlretrieve("http://mlr.cs.umass.edu/ml/machine-learning-databases/"
                    "adult/adult.test", test_file.name)
    df_train = pd.read_csv(train_file, names=COLUMNS, skipinitialspace=True)
    df_test = pd.read_csv(test_file, names=COLUMNS, skipinitialspace=True, skiprows=1)
    df_train[LABEL_COLUMN] = (df_train["income_bracket"]
                              .apply(lambda x: ">50K" in x)).astype(int)
    df_test[LABEL_COLUMN] = (df_test["income_bracket"]
                             .apply(lambda x: ">50K" in x)).astype(int)
    return df_train, df_test
def loadData(src):
    print('Downloading ' + src)
    fname, h = urlretrieve(src, './delete.me')
    print('Done.')
    try:
        print('Extracting files...')
        with tarfile.open(fname) as tar:
            tar.extractall()
        print('Done.')
        print('Preparing train set...')
        trn = np.empty((0, NumFeat + 1), dtype=np.int)
        for i in range(5):
            batchName = './cifar-10-batches-py/data_batch_{0}'.format(i + 1)
            trn = np.vstack((trn, readBatch(batchName)))
        print('Done.')
        print('Preparing test set...')
        tst = readBatch('./cifar-10-batches-py/test_batch')
        print('Done.')
    finally:
        os.remove(fname)
    return (trn, tst)
def cq_download_pic(filename: str):
    """
    download image by cqimg file
    :param filename: cqimg file name
    """
    try:
        path = os.path.join(CQ_IMAGE_ROOT, filename)
        if os.path.exists(path):
            return
        cqimg = os.path.join(CQ_IMAGE_ROOT, filename + '.cqimg')
        parser = ConfigParser()
        parser.read(cqimg)
        url = parser['image']['url']
        urlretrieve(url, path)
    except:
        logger.error(filename)
        traceback.print_exc()
def tg_get_pic_url(file_id: str, pic_type: str):
    """
    download image from Telegram Server, and generate a new image link to send to the QQ group
    :param file_id: telegram file id
    :param pic_type: picture extension name
    :return: pic url
    """
    file = global_vars.tg_bot.getFile(file_id)
    # urlretrieve(file.file_path, os.path.join(CQ_IMAGE_ROOT, file_id))
    # download image
    file.download(custom_path=os.path.join(CQ_IMAGE_ROOT, file_id))
    if pic_type == 'jpg':
        create_jpg_image(CQ_IMAGE_ROOT, file_id)
        pic_url = get_short_url(SERVER_PIC_URL + file_id + '.jpg')
        return pic_url
    elif pic_type == 'png':
        create_png_image(CQ_IMAGE_ROOT, file_id)
        pic_url = get_short_url(SERVER_PIC_URL + file_id + '.png')
        return pic_url
    return ''

# endregion
def download_image(url: str = '', save_path: str = '', unverified_ctx: bool = False) -> Union[None, str]:
    """Download image and save in current directory on local machine.

    :param str url: URL to image.
    :param str save_path: Saving path.
    :param bool unverified_ctx: Create unverified context.
    :return: Image name.
    :rtype: str or None
    """
    if unverified_ctx:
        ssl._create_default_https_context = ssl._create_unverified_context
    if url is not None:
        image_name = url.rsplit('/')[-1]
        request.urlretrieve(url, save_path + image_name)
        return image_name
    return None
def maybe_download_and_extract():
    main_directory = "./data_set/"
    cifar_10_directory = main_directory + "cifar_10/"
    if not os.path.exists(main_directory):
        os.makedirs(main_directory)

        url = "http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz"
        filename = url.split('/')[-1]
        file_path = os.path.join(main_directory, filename)
        zip_cifar_10 = file_path
        file_path, _ = urlretrieve(url=url, filename=file_path, reporthook=_print_download_progress)

        print()
        print("Download finished. Extracting files.")
        if file_path.endswith(".zip"):
            zipfile.ZipFile(file=file_path, mode="r").extractall(main_directory)
        elif file_path.endswith((".tar.gz", ".tgz")):
            tarfile.open(name=file_path, mode="r:gz").extractall(main_directory)
        print("Done.")

        os.rename(main_directory + "./cifar-10-batches-py", cifar_10_directory)
        os.remove(zip_cifar_10)
def getLDIcon(pseudo):
    """Fetch the leveldown icon from their site"""
    pseudo = str(pseudo)
    url = "http://leveldown.fr/profile/{}".format(pseudo)
    fileName = "./ldsource/{}.html".format(pseudo)
    request.urlretrieve(url, fileName)
    with open(fileName) as f:
        lines = f.read().splitlines()
    Names = []
    Values = []
    for line in lines:
        if line.startswith('    <div class="avatar"'):
            getLine = line.rstrip(');"></div>')
            Line = getLine.split("(")
            Line = Line[1]
    return Line

#####################################################################################################################################################
def setUp(self):
    # Create a list of temporary files. Each item in the list is a file
    # name (absolute path or relative to the current working directory).
    # All files in this list will be deleted in the tearDown method. Note,
    # this only helps to make sure temporary files get deleted, but it
    # does nothing about trying to close files that may still be open. It
    # is the responsibility of the developer to properly close files even
    # when exceptional conditions occur.
    self.tempFiles = []

    # Create a temporary file.
    self.registerFileForCleanUp(support.TESTFN)
    self.text = b'testing urllib.urlretrieve'
    try:
        FILE = open(support.TESTFN, 'wb')
        FILE.write(self.text)
        FILE.close()
    finally:
        try:
            FILE.close()
        except:
            pass
def test_copy(self):
    # Test that setting the filename argument works.
    second_temp = "%s.2" % support.TESTFN
    self.registerFileForCleanUp(second_temp)
    result = urllib_request.urlretrieve(self.constructLocalFileUrl(
        support.TESTFN), second_temp)
    self.assertEqual(second_temp, result[0])
    self.assertTrue(os.path.exists(second_temp),
                    "copy of the file was not made")
    FILE = open(second_temp, 'rb')
    try:
        text = FILE.read()
        FILE.close()
    finally:
        try:
            FILE.close()
        except:
            pass
    self.assertEqual(self.text, text)
def test_short_content_raises_ContentTooShortError(self):
    self.fakehttp(b'''HTTP/1.1 200 OK
Date: Wed, 02 Jan 2008 03:03:54 GMT
Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
Connection: close
Content-Length: 100
Content-Type: text/html; charset=iso-8859-1

FF
''')

    def _reporthook(par1, par2, par3):
        pass

    with self.assertRaises(urllib_error.ContentTooShortError):
        try:
            urllib_request.urlretrieve('http://example.com/', reporthook=_reporthook)
        finally:
            self.unfakehttp()
def getfile(url, outdir=None):
    """Function to fetch files using urllib

    Works with ftp
    """
    fn = os.path.split(url)[-1]
    if outdir is not None:
        fn = os.path.join(outdir, fn)
    if not os.path.exists(fn):
        # Find appropriate urlretrieve for Python 2 and 3
        try:
            from urllib.request import urlretrieve
        except ImportError:
            from urllib import urlretrieve
        print("Retrieving: %s" % url)
        # Add progress bar
        urlretrieve(url, fn)
    return fn

# Function to get files using requests
# Works with https authentication
def TestDownload(self):
    try:
        a = datetime.now()
        info('Executing regular download test for network speed')
        url = self.config.cloudConfig.DownloadSpeedTestUrl if 'DownloadSpeedTestUrl' in self.config.cloudConfig else defaultUrl
        debug(url + ' ' + download_path)
        request.urlretrieve(url, download_path)
        request.urlcleanup()
        b = datetime.now()
        c = b - a
        if path.exists(download_path):
            size = path.getsize(download_path) / mb
            self.downloadSpeed = size / c.total_seconds()
            remove(download_path)
        return True
    except socket_error as serr:
        error('TestDownload:' + str(serr))
        ret = False
        Daemon.OnFailure('cloud', serr.errno)
        return
    except:
        exception('TestDownload Failed')
        return False
async def url_request(self, url, file_path, proxy_url=None):
    try:
        urllib_request.urlretrieve(url, file_path)
        print(file_path)
    except HTTPError as e:
        print(e.code)
        if proxy_url is not None:
            print("Trying proxy URL")
            url = proxy_url
            await self.url_request(url, file_path)
        else:
            raise e
    except UnicodeEncodeError:
        # Special retry logic for IDN domains
        url = "http://" + url.replace("http://", "").encode("idna").decode("utf-8")
        await self.url_request(url, file_path)
def run(self):
    print(self.url)
    for link, t in set(re.findall(r'(http://699pic.com/tupian[^\s]*?(html))', str(reqfun(self.url)))):
        # res = request.urlopen(link)
        # if re.split("\.", os.path.basename(link))[1].lower() == 'gif' or len(res.read()) < 10000:
        #     continue
        # print(link)
        for piclink, t in set(re.findall(r'(http://seopic.699pic.com/photo/[^\s]*?jpg[^\s]*?(jpg))', str(reqfun(link)))):
            print(piclink, "loading...")
            try:
                request.urlretrieve(piclink, self.saveFile(piclink))
                time.sleep(1)
                if os.path.exists(targetPath + os.path.basename(piclink)) is False:
                    request.urlretrieve(piclink, self.saveFile(piclink))
                    print("err")
            except:
                print('download error')
    q.task_done()
def download_for_anki(url):
    if not url:
        return ""
    try:
        fileext = os.path.splitext(url)[1]
        filename = hashlib.md5(url.encode()).hexdigest() + fileext
        filepath = os.path.join("collection.media", filename)
        if not os.path.exists("collection.media"):
            os.makedirs("collection.media")
        request.urlretrieve(url, filepath)
        if fileext in ('.jpg', '.jpeg', '.gif', '.png', '.svg'):
            filename = '<img src="{}"></img>'.format(filename)
        elif fileext in ('.mp3', '.mp4', '.wav'):
            filename = '[sound:{}]'.format(filename)
    except Exception as e:
        print("fetch {} error:{}".format(url, e), file=sys.stderr)
        filename = ""
    return filename
def fetch_load_letters(data_dir=None):
    path = os.path.join(get_data_home(data_dir), 'letter-recognition.data')
    if not os.path.exists(path):
        from urllib import request
        url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/letter-recognition/letter-recognition.data'
        print('Downloading letter-recognition dataset from {}...'.format(url))
        request.urlretrieve(url=url, filename=path)
    else:
        print('Found letter-recognition in {}!'.format(path))
    X, y = [], []
    with open(path) as f:
        reader = csv.reader(f)
        for row in reader:
            y.append(row[0])
            X.append(row[1:])
    labels, label_idx = np.unique(y, return_inverse=True)
    return np.asarray(X, dtype=float), label_idx
def fetch(self, filename=None):
    """
    Fetch the remote url to the local file

    Kwargs:
        - filename(string) : Specify path to download files to. Default=None
    """
    if self.url:
        if filename:
            return urlretrieve(self.url, filename)
        else:
            return urlretrieve(self.url, self.file)
    else:
        logger.warning("File %s was not generated on Kotta. Nothing to fetch", self.file)
        return False
def addToInventory(self, octoPart, quantity):
    """
    Adds quantity of Octopart to the inventory. Performs a searchInventory to determine if the part
    is already in inventory, then either adds the part to the inventory or simply updates the
    quantity if the part already exists.

    :param octoPart: Octopart object to add
    :type octoPart: octo_utils.Octopart
    :param quantity: Quantity of part to add to inventory
    :type quantity: int
    :return: None
    :rtype: None
    """
    index = self.searchInventory(octoPart)
    if index == -1:
        octoPart.quantity = quantity
        self.inventory.append(octoPart)
    else:
        self.inventory[index].quantity += quantity
    self.saveInventory()
    if octoPart.dataFile:
        request.urlretrieve(octoPart.dataURL, octoPart.dataFile)
def json_extract_info(filename):
    """Return list of entries extracted from a file using json.

    If an error occurs during the extraction, return None.
    """
    try:
        if not isfile(filename):
            filename = urlretrieve(filename)[0]
        with open(filename) as f:
            raw_info, info = json.load(f), []
            for i in raw_info:
                e = DEFAULT_ENTRY.copy()
                for k in e:
                    if k in i and isinstance(i[k], type(e[k])):
                        e[k] = i[k]
                info.append(e)
    except:
        return None
    else:
        return info
def download_and_extract(f, subfolder=''):
    print("downloading %s..." % f)
    local_filename, headers = urlretrieve(f)
    print("decompressing the zip file %s" % local_filename)
    if not os.path.isdir(extraction_directory):
        os.mkdir(extraction_directory)
    if f[-3:] == 'zip':
        with zipfile.ZipFile(local_filename) as zf:
            zf.extractall(extraction_directory + subfolder)
    elif f[-2:] == 'gz':
        with tarfile.open(local_filename, "r") as tar:
            tar.extractall(extraction_directory + subfolder)
    else:
        raise ValueError('unrecognized archive type')
    print("done")
def item_Load(itemID, bid, region):
    if bid:
        orderType = 'buy'
    else:
        orderType = 'sell'
    url = "https://crest-tq.eveonline.com/market/" + str(region) + "/orders/" + orderType + \
          "/?type=https://crest-tq.eveonline.com/inventory/types/" + str(itemID) + "/"
    data_file = request.urlopen(url).read().decode('utf-8')
    # request.urlretrieve(url, "D:/result.json")
    # with open('D:/result.json') as data_file:
    data = json.loads(data_file)
    return data
def polyglot_retrieve(polyglot_links):
    """Retrieve files from links and/or google drive"""
    for polyglot_link in polyglot_links:
        address, lg = polyglot_link
        # Retrieve url
        url.urlretrieve(address, '../data/polyglot/' + lg + '.pkl')
        # If the url file was too big, we retrieved an error file.
        # If we obtained the correct file, it is bytes and the try will fail.
        try:
            with open('../data/polyglot/' + lg + '.pkl') as f:
                f_str = f.read()
            pattern = '<a href="https://docs.google.com/open\?id=(\w{28})">'
            g_id = re.findall(pattern, f_str)[0]
        except:
            g_id = None
        # If we retrieved the google id from the error file
        if g_id:
            f = drive.CreateFile({'id': g_id})
            f.GetContentFile('../data/polyglot/' + lg + '.pkl')
def download_remote_images():
    for article in Article.objects.all():
        if not article.original_image:
            continue
        try:
            result = urlretrieve(article.original_image.__str__())
            article.image.save(
                os.path.basename(article.original_image.__str__()),
                File(open(result[0], 'rb'))
            )
            render_variations(result[0], image_variations, replace=True)
            article.save()
        except:
            logger.exception(result)
            logger.exception(result[0])
            logger.exception('Unable to download remote image for %s' % article.original_image)
def download_and_unpack(url, dest):
    if not os.path.exists(dest):
        try:
            import urllib
            urllib.urlretrieve('http://google.com')
        except AttributeError:
            import urllib.request as urllib
        print("downloading " + dest + " ...")
        archive_name = dest + ".zip"
        urllib.urlretrieve(url, archive_name)
        in_file = open(archive_name, 'rb')
        z = zipfile.ZipFile(in_file)
        for name in z.namelist():
            print("extracting " + name)
            outpath = "./"
            z.extract(name, outpath)
        in_file.close()
        os.remove(archive_name)
        print("done.")
def download_file(url, path):
    print("Downloading: {} (into {})".format(url, path))
    progress = [0, 0]

    def report(count, size, total):
        progress[0] = count * size
        if progress[0] - progress[1] > 1000000:
            progress[1] = progress[0]
            print("Downloaded {:,}/{:,} ...".format(progress[1], total))

    dest, _ = urlretrieve(url, path, reporthook=report)
    return dest
def download_file(url, destination):
    return urlretrieve(url, destination)
def _download_if_not_yet_done(self, source_path_or_url: str, target_path: Path) -> Path:
    if not target_path.is_file():
        log("Downloading corpus {} to {}".format(source_path_or_url, target_path))
        if self.base_url_or_directory.startswith("http"):
            request.urlretrieve(source_path_or_url, str(target_path))
        else:
            try:
                subprocess.check_output(["scp", source_path_or_url, str(target_path)],
                                        stderr=subprocess.STDOUT)
            except subprocess.CalledProcessError as e:
                raise IOError("Copying failed: " + str(e.output))
    return target_path
def _download(self, filename):
    """Download the file if it is not already present locally.

    :param filename: file name
    :return: local file path
    """
    file_path = os.path.join(self._dir, filename)
    if not os.path.exists(file_path):
        print('Downloading {}'.format(filename))
        request.urlretrieve(self._source_url + filename, file_path)
        print('Successfully downloaded {}'.format(filename))
    return file_path
def get_php_references():
    download = urlretrieve(PHP_MANUAL_URL)
    tar = tarfile.open(download[0])
    tar.extractall()
    tar.close()
    for file in glob.glob("%s%s" % (PHP_MANUAL_DIR, PHP_REFERENCE_GLOB)):
        yield file
    os.remove(download[0])
def download_icons(icons_url, dist_path):
    dist_path, headers = urlretrieve(icons_url, filename=dist_path)
    logging.info("Download to: %s", dist_path)
    return dist_path
def get_data(self, table, table_internal, query_params, r_compat=False):
    """Downloads or retrieves data given a table and query parameters.

    Args:
        table: ONS table name
        table_internal: nomisweb table code (can be found in metadata)
        query_params: table query parameters

    Returns:
        a dataframe containing the data. If downloaded, the data is also cached to a file
    """
    query_params["uid"] = Nomisweb.KEY
    query_string = self.get_url(table_internal, query_params)

    filename = self.cache_dir + table + "_" + hashlib.md5(query_string.encode()).hexdigest() + ".tsv"

    # retrieve if not in cache
    if not os.path.isfile(filename):
        meta = self.get_metadata(table)
        self.write_metadata(table, meta)

        print("Downloading and caching data: " + filename)
        request.urlretrieve(query_string, filename)  # , timeout = Nomisweb.Timeout)

        # check for empty file, if so delete it and report error
        if os.stat(filename).st_size == 0:
            os.remove(filename)
            errormsg = "ERROR: Query returned no data. Check table and query parameters"
            if r_compat:
                return errormsg
            print(errormsg)
            return
    else:
        print("Using cached data: " + filename)

    # now load from cache and return
    if r_compat:
        return filename
    return pd.read_csv(filename, delimiter='\t')
def download_influxdb():
    if sys.platform.startswith('win'):
        dist_string = 'windows_amd64.zip'
        path = os.path.join(settings.POLYGLOT_TEMP_DIR, 'influxdb.zip')
    else:
        dist_string = 'linux_amd64.tar.gz'
        path = os.path.join(settings.POLYGLOT_TEMP_DIR, 'influxdb.tar.gz')
    if os.path.exists(path):
        return path
    os.makedirs(settings.POLYGLOT_TEMP_DIR, exist_ok=True)
    download_link = 'https://dl.influxdata.com/influxdb/releases/influxdb-{version}_{dist_string}'.format(
        version=settings.INFLUXDB_VERSION, dist_string=dist_string)
    archive_path, headers = urlretrieve(download_link, path, data=None)
    return archive_path
def loadLabels(src, cimg):
    gzfname, h = urlretrieve(src, './delete.me')
    try:
        with gzip.open(gzfname) as gz:
            n = struct.unpack('I', gz.read(4))
            if n[0] != 0x1080000:
                raise Exception('Invalid file: unexpected magic number.')
            n = struct.unpack('>I', gz.read(4))
            if n[0] != cimg:
                raise Exception('Invalid file: expected {0} rows.'.format(cimg))
            res = np.fromstring(gz.read(cimg), dtype=np.uint8)
    finally:
        os.remove(gzfname)
    return res.reshape((cimg, 1))
def download_single_record(url, dest_file):
    urlrequest.urlretrieve(url, dest_file)
def get_ftp_file(ftp_url, dest_dir):
    try:
        filename = ftp_url.split('/')[-1]
        dest_file = os.path.join(dest_dir, filename)
        urlrequest.urlretrieve(ftp_url, dest_file)
        return True
    except Exception as e:
        print("Error with FTP transfer: {0}".format(e))
        return False