我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 shutil.copyfileobj()。
def save_exported_media_to_file(logger, export_dir, media_file, filename, extension):
    """
    Write an exported media item to disk at the specified location.

    Any existing file with the same name is overwritten. Errors raised while
    writing are handed to ``log_critical_error`` instead of propagating.

    :param logger: the logger
    :param export_dir: path to directory for exports (created when missing)
    :param media_file: media object whose ``raw`` attribute is a readable
        binary stream
    :param filename: filename (without extension) to give the exported media
    :param extension: extension (without the leading dot) to give the
        exported media
    """
    if not os.path.exists(export_dir):
        logger.info("Creating directory at {0} for media files.".format(export_dir))
        os.makedirs(export_dir)
    file_path = os.path.join(export_dir, filename + '.' + extension)
    if os.path.isfile(file_path):
        logger.info('Overwriting existing report at ' + file_path)
    try:
        with open(file_path, 'wb') as out_file:
            shutil.copyfileobj(media_file.raw, out_file)
    except Exception as ex:
        # The message previously ran "writing" and the path together.
        log_critical_error(logger, ex, 'Exception while writing ' + file_path + ' to file')
def _unsafe_writes(self, src, dest, exception):
    """Copy ``src`` over ``dest`` non-atomically after an atomic replace failed.

    Only attempted when ``exception.errno`` is ``errno.EBUSY``; any other
    error is reported through ``self.fail_json``. Copy failures are also
    reported through ``self.fail_json``.

    :param src: path of the file holding the new content
    :param dest: path of the file to overwrite
    :param exception: the error raised by the earlier replace attempt
    """
    # sadly there are some situations where we cannot ensure atomicity, but only if
    # the user insists and we get the appropriate error we update the file unsafely
    if exception.errno == errno.EBUSY:
        # TODO: issue warning that this is an unsafe operation, but doing it cause user insists
        # Pre-bind both handles so the ``finally`` clause cannot hit an
        # unbound local (and mask the real error) when an open() fails.
        out_dest = in_src = None
        try:
            try:
                # Open the source first: an unreadable source must not
                # truncate the destination.
                in_src = open(src, 'rb')
                out_dest = open(dest, 'wb')
                shutil.copyfileobj(in_src, out_dest)
            finally:  # assuring closed files in 2.4 compatible way
                if out_dest:
                    out_dest.close()
                if in_src:
                    in_src.close()
        except (shutil.Error, OSError, IOError):
            e = get_exception()
            self.fail_json(msg='Could not write data to file (%s) from (%s): %s' % (dest, src, e))
    else:
        self.fail_json(msg='Could not replace file: %s to %s: %s' % (src, dest, exception))
def copyfileobj(src, dst, length=None, exception=OSError, bufsize=None):
    """Copy ``length`` bytes from file object ``src`` to file object ``dst``.

    With ``length=None`` the whole remaining content is copied. A short read
    raises ``exception("unexpected end of data")``. ``bufsize`` defaults to
    16 KiB when falsy.
    """
    chunk_size = bufsize or 16 * 1024
    if length == 0:
        return
    if length is None:
        shutil.copyfileobj(src, dst, chunk_size)
        return

    def _transfer(nbytes):
        # Move exactly ``nbytes``; nothing is written for a short read.
        data = src.read(nbytes)
        if len(data) < nbytes:
            raise exception("unexpected end of data")
        dst.write(data)

    full_chunks, tail = divmod(length, chunk_size)
    for _ in range(full_chunks):
        _transfer(chunk_size)
    if tail != 0:
        _transfer(tail)
    return
def addfile(self, tarinfo, fileobj=None):
    """Append the member described by `tarinfo' to the archive.

       When `fileobj' is given it should be a binary file; tarinfo.size
       bytes are read from it and written after the header, padded with
       NUL bytes up to a multiple of BLOCKSIZE. TarInfo objects can be
       created directly or with gettarinfo().
    """
    self._check("awx")

    # Work on a copy so the caller's TarInfo is not the one stored.
    tarinfo = copy.copy(tarinfo)

    header = tarinfo.tobuf(self.format, self.encoding, self.errors)
    self.fileobj.write(header)
    self.offset += len(header)
    bufsize = self.copybufsize

    if fileobj is not None:
        # Payload follows the header.
        copyfileobj(fileobj, self.fileobj, tarinfo.size, bufsize=bufsize)
        full_blocks, tail = divmod(tarinfo.size, BLOCKSIZE)
        if tail > 0:
            self.fileobj.write(NUL * (BLOCKSIZE - tail))
            full_blocks += 1
        self.offset += full_blocks * BLOCKSIZE

    self.members.append(tarinfo)
def merge_files(groups, outdir):
    """
    Merge files that belong to the same filename group.

    One merged file per group is created in the output directory, named
    after the group, containing the group's files concatenated in order.

    Args:
        groups: Dictionary of filename groups from `group_filenames`
            (group name -> list of file paths).
        outdir: Output path for merged files.
    """
    logger = logging.getLogger("mergeFQs." + "merge")
    # ``items()`` works on both Python 2 and 3; ``iteritems()`` is Python 2 only.
    for groupname, filenames in groups.items():
        logger.info("Merging group " + groupname + " with " +
                    str(len(filenames)) + " files...")
        outpath = os.path.join(outdir, groupname)
        logger.info("Creating merge file " + outpath + "...")
        with open(outpath, "wb") as outfile:
            for filename in filenames:
                logger.info("Adding " + filename + "...")
                with open(filename, "rb") as fq_file:
                    shutil.copyfileobj(fq_file, outfile)
def concatenate_mtx(mtx_list, out_mtx):
    """Concatenate several matrix files into ``out_mtx``.

    Each input is read as two header lines, one line of three integers and
    then data lines. The output holds the first file's two header lines, a
    size line whose third value is the sum of the inputs' third values, and
    then every input's data lines in order. Nothing is written when
    ``mtx_list`` is empty.
    """
    if len(mtx_list) == 0:
        return

    first_mtx = mtx_list[0]
    with open(out_mtx, 'w') as merged:
        # Header: copied from the first file, size line recomputed.
        with open(first_mtx, 'r') as first:
            merged.write(first.readline())
            merged.write(first.readline())
            genes, bcs, data = [int(tok) for tok in first.readline().rstrip().split()]
        for other in mtx_list[1:]:
            with open(other, 'r') as part:
                part.readline()
                part.readline()
                _, _, extra = [int(tok) for tok in part.readline().rstrip().split()]
                data += extra
        merged.write(' '.join(str(value) for value in (genes, bcs, data)) + '\n')

        # Body: skip each file's three leading lines and stream the rest.
        for in_mtx in mtx_list:
            with open(in_mtx, 'r') as part:
                for _ in range(3):
                    part.readline()
                shutil.copyfileobj(part, merged)
def copyfileobj(src, dst, length=None):
    """Copy length bytes from fileobj src to fileobj dst.

       If length is None, copy the entire content. Raises IOError when src
       ends before length bytes were read.
    """
    if length == 0:
        return
    if length is None:
        shutil.copyfileobj(src, dst)
        return

    BUFSIZE = 16 * 1024
    blocks, remainder = divmod(length, BUFSIZE)
    # ``range`` instead of ``xrange`` so the helper also runs on Python 3.
    for _ in range(blocks):
        buf = src.read(BUFSIZE)
        if len(buf) < BUFSIZE:
            raise IOError("end of file reached")
        dst.write(buf)

    if remainder != 0:
        buf = src.read(remainder)
        if len(buf) < remainder:
            raise IOError("end of file reached")
        dst.write(buf)
    return
def addfile(self, tarinfo, fileobj=None):
    """Append the member described by `tarinfo' to the archive.

       When `fileobj' is given, tarinfo.size bytes are read from it and
       written after the header, padded with NUL bytes up to a multiple of
       BLOCKSIZE. TarInfo objects can be created using gettarinfo().
       On Windows platforms, `fileobj' should always be opened with mode
       'rb' to avoid irritation about the file size.
    """
    self._check("aw")

    # Work on a copy so the caller's TarInfo is not the one stored.
    tarinfo = copy.copy(tarinfo)

    header = tarinfo.tobuf(self.format, self.encoding, self.errors)
    self.fileobj.write(header)
    self.offset += len(header)

    if fileobj is not None:
        # Payload follows the header.
        copyfileobj(fileobj, self.fileobj, tarinfo.size)
        full_blocks, tail = divmod(tarinfo.size, BLOCKSIZE)
        if tail > 0:
            self.fileobj.write(NUL * (BLOCKSIZE - tail))
            full_blocks += 1
        self.offset += full_blocks * BLOCKSIZE

    self.members.append(tarinfo)
def do_get(self, line):
    '''
    Command: get

    Description:
    Get (copy) a file, or parts of file, from the sensor.

    Args:
    get [OPTIONS] <RemotePath> <LocalPath>

    where OPTIONS are:
    -o, --offset : The offset to start getting the file at
    -b, --bytes : How many bytes of the file to get. The default is all bytes.
    '''
    # NOTE(review): the parsed options are not used below, so the visible
    # code always copies the whole file - confirm whether -o/-b are meant
    # to be applied.
    self._needs_attached()

    parser = CliArgs(usage='get [OPTIONS] <RemoteFile> <LocalFile>')
    (opts, args) = parser.parse_line(line)
    if len(args) != 2:
        raise CliArgsException("Wrong number of args to get command")

    remote_arg, local_path = args[0], args[1]
    with open(local_path, "wb") as fout:
        remote_path = self._file_path_fixup(remote_arg)
        remote_stream = self.lr_session.get_raw_file(remote_path)
        shutil.copyfileobj(remote_stream, fout)
def download_url(url, dest_path):
    """
    Download a file to a destination path given a URL.

    The file is stored in ``dest_path`` under the last path component of
    ``url``.

    :param url: URL to fetch
    :param dest_path: existing directory to save the file into
    :return: ``True`` on success, ``False`` when the URL could not be opened
    """
    name = url.rsplit('/')[-1]
    dest = dest_path + "/" + name
    try:
        response = urllib.request.urlopen(url)
    except (urllib.error.HTTPError, urllib.error.URLError):
        return False

    # Close the response as well as the output file once the copy is done.
    with response, open(dest, 'wb') as f:
        shutil.copyfileobj(response, f)
    return True


# ----------------------------------------------------------
#
# CI UTILS
#
# -----------------------------------------------------------
def ensure_dataset_exists(files, dirname):
    """Return local paths for ``files``, downloading any that are missing.

    Files live under ``data/<dirname>/`` and are fetched from
    ``BASE_URL + dirname + "/" + name``.

    :param files: iterable of file names belonging to the dataset
    :param dirname: dataset directory name (local and remote)
    :return: list of local file paths, in the order of ``files``
    """
    path = os.path.join("data", dirname)
    rv = [os.path.join(path, f) for f in files]

    logger.info("Retrieving dataset from {}".format(path))
    if not os.path.exists(path):
        # Extract or download data
        try:
            os.makedirs(path)
        except OSError as exception:
            # Another process may have created the directory in the meantime.
            if exception.errno != errno.EEXIST:
                raise

    # NOTE(review): the flattened source does not show whether this loop was
    # nested under the directory check; it is placed at function level so the
    # per-file existence test below is meaningful - confirm.
    http = urllib3.PoolManager()  # one pool for all downloads
    for f, file_path in zip(files, rv):
        data_url = BASE_URL + dirname + "/" + f
        if not os.path.exists(file_path):
            # ``Logger.warn`` is a deprecated alias of ``warning``.
            logger.warning("Downloading {}".format(data_url))
            with http.request('GET', data_url, preload_content=False) as r, \
                    open(file_path, 'wb') as w:
                shutil.copyfileobj(r, w)
    return rv


# Convert data into a stream of never-ending data
def decompress(fname, remove_compressed=True):
    """
    Decompress *fname* and return the file name without the
    compression suffix, e.g., .gz. If *remove_compressed*, the
    compressed file is deleted after it is decompressed.

    Names that do not end in ``.gz`` are returned unchanged and the file
    is not touched.
    """
    if fname.endswith('.gz'):
        uncompressed_fname = fname[:-3]
        logger.info('gunzip {} to {}'.format(fname, uncompressed_fname))
        # gzip.open() yields bytes by default, so the destination must be
        # opened in binary mode too; text mode fails on Python 3.
        with gzip.open(fname, 'rb') as in_fid, open(uncompressed_fname, 'wb') as out_fid:
            shutil.copyfileobj(in_fid, out_fid)
        if remove_compressed:
            logger.debug('removing {}'.format(fname))
            os.remove(fname)
        return uncompressed_fname
    else:
        return fname
def download_model(lang, paths):
    """Ensure the model folders for ``lang`` exist and fetch the model archive.

    Creates ``<paths.user_config>/model/<lang>``. When
    ``paths.model_dir + '.tar.gz'`` is missing, the archive for ``lang`` is
    downloaded to that path and extracted into the language folder.

    :param lang: language code, used as folder name and archive name
    :param paths: object providing ``user_config`` and ``model_dir``
    """
    model_folder = join(paths.user_config, 'model')
    model_en_folder = join(model_folder, lang)

    if not isdir(model_folder):
        mkdir(model_folder)
    if not isdir(model_en_folder):
        mkdir(model_en_folder)

    file_name = paths.model_dir + '.tar.gz'
    # NOTE(review): the flattened source does not show whether extraction
    # was nested under this check; it is kept inside so an existing archive
    # is not re-extracted on every call - confirm.
    if not isfile(file_name):
        import urllib.request
        import shutil
        import tarfile

        url = 'https://github.com/MatthewScholefield/pocketsphinx-models/raw/master/' + lang + '.tar.gz'
        with urllib.request.urlopen(url) as response, open(file_name, 'wb') as archive:
            shutil.copyfileobj(response, archive)

        # NOTE(review): extractall() on a downloaded archive trusts its
        # member paths; consider validating members before extracting.
        # The ``with`` block closes the archive even if extraction fails.
        with tarfile.open(file_name) as tar:
            tar.extractall(path=model_en_folder)
def download_links(self, dir_path):
    """Download web pages or images from search result links.

    Each link answering with status 200 is saved in ``dir_path`` as
    ``<query>_<index><ext>``: spaces in the query become underscores, the
    index is offset by the ``start`` search argument when present, and the
    extension is ``.html`` when ``fileType`` is empty.

    Args:
        dir_path (str):
            Path of directory to save downloads of :class:`api.results`.links
    """
    links = self.links
    if not path.exists(dir_path):
        makedirs(dir_path)

    for index, url in enumerate(links):
        number = index
        if 'start' in self.cseargs:
            number = index + int(self.cseargs['start'])

        file_type = self.cseargs['fileType']
        if file_type == '':
            suffix = '.html'
        else:
            suffix = '.' + file_type

        stem = self.cseargs['q'].replace(' ', '_')
        file_path = path.join(dir_path, stem + '_' + str(number) + suffix)

        r = requests.get(url, stream=True)
        if r.status_code != 200:
            continue
        with open(file_path, 'wb') as f:
            # Have the raw stream undo any content encoding while it is read.
            r.raw.decode_content = True
            shutil.copyfileobj(r.raw, f)
def getFile(link): try: source = urllib2.urlopen(link) except(urllib2.HTTPError),msg: print "\nError:",msg sys.exit() num = 1 file = 'tmp_insidepropw_'+link.split('=')[1]+'.txt' while os.path.isfile(file) == True: file = link.rsplit("/",1)[1]+"."+str(num) num+=1 try: shutil.copyfileobj(source, open(file, "w+")) except(IOError): print "\nCannot write to `"+file+"' (Permission denied)." sys.exit(1) print "File downloaded", file newfilelist.append(file)
def generate_readme(for_pdf):
    """Assemble ``README.md`` from the files under ``programme/``.

    When ``for_pdf`` is true, ``programme/header.md`` is written first with
    its centred HTML photo tag replaced by an attribute-style Markdown
    image. Otherwise ``header.md`` is copied unchanged. The files listed in
    ``programmeFiles`` follow in order.

    :param for_pdf: whether the README is being built for PDF conversion
    """
    html_photo = ('<p align="center"><img src="/annexes/photo.jpg" alt="Vincent Lamotte" '
                  'title="Photo de Vincent Lamotte" width="300"></p>')
    with open('README.md', 'wb') as wfd:
        if for_pdf:
            with open('programme/header.md') as head:
                before, _, after = head.read().partition(html_photo)
            wfd.write(bytes(before, encoding='utf-8'))
            wfd.write(bytes('![Vincent Lamotte](annexes/photo.jpg){#id '
                            '.class '
                            'width=280 text-align=center}', encoding='utf-8'))
            wfd.write(bytes(after, encoding='utf-8'))
            sources = list(programmeFiles)
        else:
            # Build a local list: inserting into the shared ``programmeFiles``
            # made every later call emit the header once more.
            sources = ['header.md'] + list(programmeFiles)

        for f in sources:
            with open('programme/' + f, 'rb') as fd:
                shutil.copyfileobj(fd, wfd)
def find_xorpad(titleid, crc32):
    """Locate the exheader xorpad for a title in the ``xorpads`` directory.

    Looks at ``*.xorpad`` and ``*.zip`` files (extensions matched
    case-insensitively). A zip member named
    ``<titleid>.<crc32 as 8 hex digits>.Main.exheader.xorpad`` is extracted
    into ``tmpdir``. A plain file matches on that name or on the legacy
    ``<titleid>.Main.exheader.xorpad``. Name comparisons ignore case.

    :param titleid: title id string
    :param crc32: integer CRC32 used in the expected file name
    :return: path of the xorpad file, or ``None`` when nothing matches
    """
    expectedname = "%s.%08lx.Main.exheader.xorpad" % (titleid, crc32)
    legacyname = titleid + ".Main.exheader.xorpad"
    wanted = expectedname.lower()
    accepted_plain = (wanted, legacyname.lower())

    xorpads = glob.glob(os.path.join("xorpads", "*.[xX][oO][rR][pP][aA][dD]"))
    xorpads += glob.glob(os.path.join("xorpads", "*.[zZ][iI][pP]"))

    for xorpad in xorpads:
        if zipfile.is_zipfile(xorpad):
            with zipfile.ZipFile(xorpad, "r") as archive:
                for entry in archive.infolist():
                    if os.path.basename(entry.filename).lower() != wanted:
                        continue
                    filename = os.path.join(tmpdir, expectedname)
                    # ``open`` replaces the Python 2-only ``file`` builtin.
                    with archive.open(entry, "r") as source, open(filename, "wb") as target:
                        shutil.copyfileobj(source, target)
                    return filename
        else:
            if os.path.basename(xorpad).lower() in accepted_plain:
                return xorpad
    return None
def concat_job(job, options, file_ids):
    """
    Concatenate zero or more VG protobuf files into a single merged file.

    Every part in ``file_ids`` is streamed, in order, into one new global
    file in the job's file store. ``options`` is not used here.

    Returns the merged file ID.
    """
    store = job.fileStore
    with store.writeGlobalFileStream() as (merged_stream, merged_id):
        for part in file_ids:
            # Stream each part straight into the combined file.
            with store.readGlobalFileStream(part) as part_stream:
                shutil.copyfileobj(part_stream, merged_stream)
    return merged_id
def write(self, inner_path, content):
    """Store ``content`` at ``inner_path`` and signal the update.

    ``content`` may be a file-like object (anything with ``read``), which
    is streamed to disk, or a bytes string written in one go. Missing
    parent directories are created. ``self.onUpdated(inner_path)`` is
    called afterwards.
    """
    file_path = self.getPath(inner_path)

    # Create dir if not exist
    parent_dir = os.path.dirname(file_path)
    if not os.path.isdir(parent_dir):
        os.makedirs(parent_dir)

    # Write file
    is_stream = hasattr(content, 'read')
    with open(file_path, "wb") as out:
        if is_stream:
            # File-like object: write buff to disk
            shutil.copyfileobj(content, out)
        else:
            # Simple string
            out.write(content)
    del content
    self.onUpdated(inner_path)


# Remove file from filesystem
def get_local_filename(self):
    """ get_local_filename()

    Return a filename on the local file system for this request.

    If the request refers to a plain filename, that filename is returned.
    Otherwise a temporary name with the same extension is generated and
    stored. When the mode starts with 'r' the data from ``get_file()`` is
    copied into it first, so the format can read from it; in other modes
    only the name is returned for the format to write to.
    """
    if self._uri_type == URI_FILENAME:
        return self._filename

    # Get filename
    suffix = os.path.splitext(self._filename)[1]
    # NOTE(review): tempfile.mktemp() only reserves a name and is documented
    # as race-prone; kept because callers may rely on the file not existing.
    self._filename_local = tempfile.mktemp(suffix, 'imageio_')

    # Write stuff to it?
    if self.mode[0] == 'r':
        with open(self._filename_local, 'wb') as local_copy:
            shutil.copyfileobj(self.get_file(), local_copy)
    return self._filename_local
def copyfileobj(fsrc, fdst):
    """
    Copy the contents of the file-like object fsrc to the file-like object
    fdst, reporting failures through print_error instead of raising.

    :Arguments:
        fsrc - file-like object to copy from
        fdst - file-like object to copy to

    :Return:
        True/False - based on the success/failure of the operation
    """
    try:
        shutil.copyfileobj(fsrc, fdst)
    except Exception as e:
        print_error("copying file {} to file {} raised exception {}".
                    format(fsrc, fdst, str(e)))
        return False
    return True
def upload(url, filename=None):
    """Fetch ``url`` and save the response body to a local file.

    :param url: URL to fetch
    :param filename: destination path; when omitted the file is written to
        ``/tmp/<name>``, where ``<name>`` comes from the response's
        Content-Disposition filename or else from the final URL's path
    :return: path of the written file
    """
    from urllib.request import Request, urlopen
    from urllib.parse import urlsplit
    import shutil

    def getFilename(url, openUrl):
        # The headers object parses Content-Disposition itself; the previous
        # code called ``split`` on the headers object, which has no such
        # method.
        fname = openUrl.info().get_filename()
        if fname:
            # The name is server-controlled: keep only its last component so
            # it cannot point outside the target directory.
            fname = os.path.basename(fname.strip("\"'"))
            if fname:
                return fname
        # if no filename was found above, parse it out of the final URL.
        return os.path.basename(urlsplit(openUrl.url)[2])

    r = urlopen(Request(url))
    success = None
    try:
        filename = filename or "/tmp/%s" % getFilename(url, r)
        with open(filename, 'wb') as f:
            shutil.copyfileobj(r, f)
        success = filename
    finally:
        r.close()
    return success
def main():
    """CGI entry point: store an uploaded image and print any message hidden in it.

    Reads the ``file`` form field, saves it under ``files/`` when the upload
    guard passes, then prints the result of ``lsb.reveal`` for ``.png`` names
    and ``exifHeader.reveal`` for ``.jpg``/``.jpeg`` names.
    """
    # NOTE(review): statement nesting was reconstructed from a flattened
    # source - confirm against the real file.
    print HTML_HEADER
    print HEAD
    data = cgi.FieldStorage()
    fileds = data['file']
    # NOTE(review): ``and`` binds tighter than ``or``, so the '/' test applies
    # only to the '.tiff' case, and ``str.count`` never returns -1, so that
    # case is always false. As written: .tiff uploads are never saved, and
    # .jpg/.png/.jpeg names are used as given, including any '/' they
    # contain. Left untouched because of the directive below.
    if fileds.filename.endswith('.jpg') or fileds.filename.endswith('.png') or fileds.filename.endswith('.jpeg') or fileds.filename.endswith('.tiff') and fileds.filename.count('/') == -1:
        os.chdir('files')
        with open(fileds.filename, 'wb') as fout:
            # Copy the upload in chunks of 100000 bytes.
            shutil.copyfileobj(fileds.file, fout, 100000)
        os.chdir('../')
    # do NOT touch above code
    if fileds.filename.endswith('.png'):
        print lsb.reveal("files/"+fileds.filename)
    if fileds.filename.endswith('.jpg') or fileds.filename.endswith('.jpeg'):
        print exifHeader.reveal("files/"+fileds.filename)
    print "<p>Attempted to decode.</p>"
    print END
def main():
    """CGI entry point: store an uploaded image and hide a message in it.

    Reads the ``file`` and ``message`` form fields, saves the upload under
    ``files/`` when the upload guard passes, embeds the message with
    ``lsb.hide`` for ``.png`` names or ``exifHeader.hide`` for
    ``.jpg``/``.jpeg`` names, and prints a link to the stored file.
    """
    # NOTE(review): statement nesting was reconstructed from a flattened
    # source - confirm against the real file.
    print HTML_HEADER
    print HEAD
    data = cgi.FieldStorage()
    fileds = data['file']
    # NOTE(review): ``and`` binds tighter than ``or``, so the '/' test applies
    # only to the '.tiff' case, and ``str.count`` never returns -1, so that
    # case is always false. As written: .tiff uploads are never saved, and
    # .jpg/.png/.jpeg names are used as given, including any '/' they
    # contain. Left untouched because of the directive below.
    if fileds.filename.endswith('.jpg') or fileds.filename.endswith('.png') or fileds.filename.endswith('.jpeg') or fileds.filename.endswith('.tiff') and fileds.filename.count('/') == -1:
        os.chdir('files')
        with open(fileds.filename, 'wb') as fout:
            # Copy the upload in chunks of 100000 bytes.
            shutil.copyfileobj(fileds.file, fout, 100000)
        os.chdir('../')
    # do NOT touch above code
    if fileds.filename.endswith('.png'):
        # The image returned by lsb.hide is saved over the uploaded file.
        sec = lsb.hide('files/'+fileds.filename, data['message'].value)
        sec.save('files/'+fileds.filename)
    if fileds.filename.endswith('.jpg') or fileds.filename.endswith('.jpeg'):
        # Input and output path are the same file.
        secret = exifHeader.hide('files/'+fileds.filename, 'files/'+fileds.filename, secret_message=data['message'].value)
    print "Successfully generated."
    print '<a href="http://jonathanwong.koding.io/bstego/files/'+fileds.filename+'">Link here</a>'
    print END
def upload_file(self, fn):
    '''Gzip the log file ``fn`` and upload the archive to S3.

    The archive is written next to the source as ``fn + '.gz'``, stored in
    ``self.bucket`` under ``self.prefix + '/' + <archive base name>``, and
    removed again afterwards whether or not the upload succeeded.
    '''
    gz_path = fn + '.gz'
    s3_client = boto3.client('s3')
    try:
        with open(fn, 'rb') as plain, gzip.open(gz_path, 'wb') as packed:
            shutil.copyfileobj(plain, packed)

        object_key = self.prefix + '/' + os.path.basename(gz_path)
        with open(gz_path, 'rb') as body:
            s3_client.put_object(Body=body, Bucket=self.bucket, Key=object_key)
    finally:
        # Never leave the temporary archive behind.
        if os.path.exists(gz_path):
            os.remove(gz_path)
def save_image_to_s3(self):
    """Download this account's large image and publish it on S3.

    The image is streamed to ``/tmp/profile_image.png`` and uploaded to the
    ``endorsementdb.com`` bucket as ``images/endorsers/<endorser pk>.png``.
    The key is then made public.
    """
    import boto

    bucket = boto.connect_s3().get_bucket('endorsementdb.com')

    image_url = self.get_large_image()
    response = requests.get(image_url, stream=True)
    # NOTE(review): the fixed temporary path is shared by every caller.
    with open('/tmp/profile_image.png', 'wb') as out_file:
        shutil.copyfileobj(response.raw, out_file)
    del response

    s3_key = bucket.new_key('images/endorsers/%d.png' % self.endorser.pk)
    s3_key.set_contents_from_filename(out_file.name)
    s3_key.make_public()
def handle(self, *args, **options): s3_connection = boto.connect_s3() bucket = s3_connection.get_bucket('endorsementdb.com') usernames = options['usernames'] for username in usernames: account = Account.objects.get_from_username(username) endorser = account.endorser url = account.get_large_image() print url, endorser.name response = requests.get(url, stream=True) with open('/tmp/profile_image.png', 'wb') as out_file: shutil.copyfileobj(response.raw, out_file) del response key = bucket.new_key('images/endorsers/%d.png' % endorser.pk) key.set_contents_from_filename(out_file.name) key.make_public()
def copyfileobj(src, dst, length=None):
    """Copy ``length`` bytes from file object ``src`` to file object ``dst``.

    With ``length=None`` the whole remaining content is copied. Data moves
    in 16 KiB reads; a short read raises ``IOError("end of file reached")``.
    """
    if length == 0:
        return
    if length is None:
        shutil.copyfileobj(src, dst)
        return

    bufsize = 16 * 1024
    blocks, remainder = divmod(length, bufsize)

    def _read_sizes():
        # Full-size reads first, then the odd-sized tail if there is one.
        for _ in range(blocks):
            yield bufsize
        if remainder != 0:
            yield remainder

    for wanted in _read_sizes():
        buf = src.read(wanted)
        if len(buf) < wanted:
            raise IOError("end of file reached")
        dst.write(buf)
    return
def makefile(self, cpioinfo, cpiogetpath):
    """Make a file called cpiogetpath.

    A member with a link count of 1 is extracted directly. For other
    members, a hard link is created when the inode was already seen;
    otherwise the member returned by ``self._datamember`` is extracted. In
    both of those cases the member's name is recorded under its inode.
    """
    extractinfo = None
    if cpioinfo.nlink == 1:
        extractinfo = cpioinfo
    else:
        if cpioinfo.ino in self.inodes:
            # actual file exists, create link
            # FIXME handle platforms that don't support hardlinks
            os.link(os.path.join(cpioinfo._link_path,
                                 self.inodes[cpioinfo.ino][0]), cpiogetpath)
        else:
            extractinfo = self._datamember(cpioinfo)
        self.inodes.setdefault(cpioinfo.ino, []).append(cpioinfo.name)

    if extractinfo:
        source = self.extractfile(extractinfo)
        # Close both handles even when opening the target or the copy fails.
        try:
            cpioget = open(cpiogetpath, "wb")
            try:
                copyfileobj(source, cpioget)
            finally:
                cpioget.close()
        finally:
            source.close()
def overwrite(filepage, msg, res, path):
    """Re-upload the file at ``path``, cut to ``res[0]['pos']`` bytes.

    The cached file revisions are cleared first. If the page then reports
    no file history, a warning is logged and nothing is uploaded. Otherwise
    a truncated temporary copy is uploaded through ``retry_apierror`` with
    the comment ``MESSAGE_PREFIX + msg`` and warnings ignored.
    """
    filepage._file_revisions.clear()
    if not filepage.get_file_history():
        pywikibot.warning("Page doesn't exist, skipping upload.")
        return

    with tempfile.NamedTemporaryFile() as tmp:
        with open(path, 'rb') as old:
            shutil.copyfileobj(old, tmp)
        tmp.truncate(res[0]['pos'])

        def _upload():
            return filepage.upload(tmp.name,
                                   comment=MESSAGE_PREFIX+msg,
                                   ignore_warnings=True)

        retry_apierror(_upload)
def see(url):
    """Download the image at ``url`` to a scratch file and run ``process`` on it.

    The scratch file is created in the current directory under a random
    name and is removed before returning, even if ``process`` raises.

    :param url: image URL
    :return: whatever ``process`` returns for the downloaded file
    """
    import requests
    import shutil

    path = str(random.random())
    image = requests.get(url, stream=True)
    try:
        if image.status_code == 200:
            with open(path, 'wb') as tmpfile:
                # Have the raw stream undo any content encoding while read.
                image.raw.decode_content = True
                shutil.copyfileobj(image.raw, tmpfile)
        # NOTE(review): on a non-200 answer no file was written, yet
        # ``process`` is still called as before - confirm the intended
        # handling of failed downloads.
        res = process(path)
    finally:
        # Always clean up, and do not mask an earlier error with a
        # "file not found" from the cleanup itself.
        if os.path.exists(path):
            os.remove(path)
    return res
def record_frame(self, image_buffer, angle, throttle):
    '''
    Save one image buffer to a file whose path is built by
    ``create_img_filepath`` from the frame index and the scaled angle and
    throttle values.

    A frame is skipped, and ``is_recording`` cleared, when the negated
    throttle is below ``config.recording.throttle_threshold`` or the
    absolute angle is below ``config.recording.steering_threshold``.
    '''
    # throttle is inversed, i.e. forward is negative, backwards positive
    # we are only interested in forward values of throttle
    # angle is counter-clockwise, i.e. left is positive
    # TODO: make a proper value mapping here, and then transform
    too_slow = throttle * -1.0 < config.recording.throttle_threshold
    if too_slow or abs(angle) < config.recording.steering_threshold:
        self.is_recording = False
        return
    self.is_recording = True

    scaled_angle = int(angle * 10)
    scaled_throttle = int(throttle * 1000)
    filepath = self.create_img_filepath(
        self.instance_path,
        self.frame_count,
        scaled_angle,
        scaled_throttle)

    # NOTE(review): the file is opened in text mode ('w'); if image_buffer
    # yields bytes this fails on Python 3 - confirm whether 'wb' is intended.
    with open(filepath, 'w') as out:
        image_buffer.seek(0)
        shutil.copyfileobj(image_buffer, out, -1)
    self.frame_count += 1
def remove_line(fname, line):
    '''Remove every line equal to ``line`` from the file ``fname``.

    Lines are compared after stripping surrounding whitespace. The lines to
    keep are collected in a temporary file, copied back over the original
    from its start, and the original is truncated. The whole operation runs
    under a ``lockfile.FileLock`` on ``fname``.
    '''
    with lockfile.FileLock(fname):
        # Text mode on both sides: 'rw+' is not a valid mode on Python 3, and
        # a binary temporary file cannot take text lines.
        tmp = tempfile.TemporaryFile(mode='w+')
        try:
            fp = open(fname, 'r+')
            try:
                # write all lines from orig file, except if matches given line
                for l in fp:
                    if l.strip() != line:
                        tmp.write(l)

                # reset file pointers so entire files are copied
                fp.seek(0)
                tmp.seek(0)

                # copy tmp into fp, then truncate to remove trailing line(s)
                shutil.copyfileobj(tmp, fp)
                fp.truncate()
            finally:
                fp.close()
        finally:
            tmp.close()
def test_fs_replicas(self):
    """A file uploaded with two replicas stays readable after one client's
    data is cleared; downloading an unknown path raises FileNotFoundError."""
    mock_clients = MockClients(self.user)

    with mock.patch('main.models.User.get_clients', mock_clients.get_clients):
        fs = get_fs(self.user, chunk_size=3, replicas=2)

        with BytesIO(TEST_FILE) as source:
            uploaded = fs.upload('/foo', source)

        # Wipe one client's stored data.
        mock_clients.clients[2].data.clear()

        self.assertEqual('/foo', uploaded.path)

        with BytesIO() as sink:
            with fs.download('/foo') as remote:
                shutil.copyfileobj(remote, sink)
            self.assertEqual(TEST_FILE, sink.getvalue())

        with self.assertRaises(FileNotFoundError):
            fs.download('/barfoo')

        fs.delete('/foo')
def build(self):
    """Send the file at ``self.srcPath`` as an attachment download.

    Raises ``HTTPError`` with code 404 when the path does not exist and
    with code 401 when it is a directory. Otherwise sends a 200 response
    with Content-Type, Content-Disposition and Content-Length headers,
    streams the file to the handler's output, and then calls the parent
    ``build``.
    """
    if not os.path.exists(self.srcPath):
        raise HTTPError('No such file', 404)
    if os.path.isdir(self.srcPath):
        raise HTTPError('Is a directory: %s' % self.srcPath, 401)

    self.handler.send_response(200)
    self.handler.send_header('Content-Type', 'application/octet-stream')
    attachment_name = os.path.split(self.srcPath)[-1]
    self.handler.send_header('Content-Disposition',
                             'attachment; filename=%s' % attachment_name)
    byte_count = os.stat(self.srcPath).st_size
    self.handler.send_header('Content-Length', str(byte_count))
    self.handler.end_headers()

    with open(self.srcPath, 'rb') as src:
        shutil.copyfileobj(src, self.handler.wfile)

    super(DownloadBuilder, self).build()
def http_download(url, target_path):
    """Download the resource at ``url`` into ``target_path``.

    Args:
        - url(string): url request path
        - target_path(string): download destination

    Returns:
        target_path

    Raises:
        IOError: the response code is not 200
        urllib2.URLError: the request failed without a response code
    """
    try:
        resp = urllib2.urlopen(url)
    except urllib2.URLError as e:
        if hasattr(e, 'code'):
            # Error responses carry a code; treat them as the response so
            # the status check below reports them.
            resp = e
        else:
            raise

    if resp.code != 200:
        raise IOError("Request url(%s) expect 200 but got %d" % (url, resp.code))

    with open(target_path, 'wb') as out_file:
        shutil.copyfileobj(resp, out_file)
    return target_path
def download_url(module, url, dest):
    '''
    Download ``url`` and save the response body to ``dest``.

    :param module: module object handed to ``fetch_url``; its ``params`` get
        an ``http_agent`` entry set to ``USERAGENT``
    :param url: the URL to download
    :param dest: the absolute path of where to save the downloaded content to;
        it must be writable and not a directory

    :raises ModuleError: when the response status is not 200 or the file
        cannot be written
    '''
    # Hack to add params in the form that fetch_url expects
    module.params['http_agent'] = USERAGENT
    response, info = fetch_url(module, url)
    if info['status'] != 200:
        raise ModuleError("Failed to get %s: %s" % (url, info['msg']))

    try:
        # The response yields raw bytes, so write in binary mode: text mode
        # fails on Python 3 and can alter line endings elsewhere.
        with open(dest, 'wb') as f:
            shutil.copyfileobj(response, f)
    except IOError as e:
        raise ModuleError("Failed to write: %s" % str(e))
def fix_synctex(self, project_directory, compiled_path_relative_to_project_path, filename):
    """Rewrite a ``.synctex`` file with relative paths and gzip it.

    Each occurrence of the absolute project directory is replaced by the
    project path relative to the compile directory. The result is stored
    as ``<filename>.synctex.gz`` and both the original ``.synctex`` and the
    intermediate ``.synctex.new`` are removed. Nothing happens when the
    ``.synctex`` file does not exist.

    :param project_directory: path of the project root
    :param compiled_path_relative_to_project_path: compile directory,
        relative to the project root
    :param filename: base name of the synctex file (without extension)
    """
    compiled_dir = project_directory + '/' + compiled_path_relative_to_project_path
    old_synctex = compiled_dir + '/' + filename + '.synctex'
    new_synctex = compiled_dir + '/' + filename + '.synctex.new'

    if not os.path.isfile(old_synctex):
        return

    # Both values are loop-invariant: compute them once.
    project_abs_path = os.path.abspath(project_directory)
    project_path_relative_to_compiled_path = os.path.relpath(project_directory, compiled_dir)

    # Context managers close both files even if the rewrite fails.
    with open(old_synctex, 'r') as f1, open(new_synctex, 'w') as f2:
        for line in f1:
            f2.write(line.replace(project_abs_path, project_path_relative_to_compiled_path))

    os.remove(old_synctex)

    with open(new_synctex, 'rb') as f_in, gzip.open(old_synctex + '.gz', 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)

    os.remove(new_synctex)
def unzip_nifti(input_filename, output_filename):
    """Unzips the given nifti file.

    This will create the output directories if they do not yet exist.

    Args:
        input_filename (str): the nifti file we would like to unzip. Should have the extension ``.gz``.
        output_filename (str): the location for the output file. Should have the extension ``.nii``.

    Raises:
        ValueError: if the extensions of either the input or output filename are not correct.
    """
    # Validate before touching the file system so a rejected call leaves no
    # directories behind.
    if not input_filename.rstrip().endswith('.gz') or not output_filename.rstrip().endswith('.nii'):
        raise ValueError('The input filename should have extension ".gz" and the '
                         'output filename should have extension ".nii".')

    # A bare file name has an empty dirname, which makedirs() rejects.
    output_dir = os.path.dirname(output_filename)
    if output_dir and not os.path.exists(output_dir):
        os.makedirs(output_dir)

    with gzip.open(input_filename, 'rb') as f_in, open(output_filename, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)
def fetch(config, fileobj):
    """
    Fetch transactions for the Visa card specified in the config.

    Writes three things to ``fileobj``: a line
    ``Fidelity Visa <last four digits>``, a line with the balance returned
    by ``_download``, and the contents of the CSV file that ``_download``
    stored in a temporary directory.

    (Original description: we log in to fidelity.com, click through some
    menus to transfer credentials to Elan Financial Services' site
    fidelityrewards.com, and download transactions for the past 17-18
    months in CSV format.)
    """
    *_leading, lastfour = config
    account_name = f'Fidelity Visa {lastfour.value}'
    fileobj.write(account_name + '\n')

    with tempfile.TemporaryDirectory() as scratch_dir:
        csv_path, balance = _download(config, scratch_dir)
        fileobj.write(balance + '\n')
        # Copy the CSV out before the temporary directory is removed.
        with open(csv_path, 'r') as transactions:
            shutil.copyfileobj(transactions, fileobj)
def copyfileobj(src, dst, length=None):
    """Copy ``length`` bytes from file object ``src`` to file object ``dst``.

    With ``length=None`` the whole remaining content is copied. Data moves
    in 16 KiB reads; a short read raises ``IOError("end of file reached")``.
    """
    if length == 0:
        return
    if length is None:
        shutil.copyfileobj(src, dst)
        return

    BUFSIZE = 16 * 1024

    def _copy_exact(count):
        # Read exactly ``count`` bytes; nothing is written for a short read.
        piece = src.read(count)
        if len(piece) < count:
            raise IOError("end of file reached")
        dst.write(piece)

    blocks, remainder = divmod(length, BUFSIZE)
    for _ in range(blocks):
        _copy_exact(BUFSIZE)
    if remainder != 0:
        _copy_exact(remainder)
    return
def fetch_files(data, filepath_dict):
    '''
    Fetch the files given by urls in data['style'] and data['content']
    and save them to the corresponding file paths given in filepath_dict.

    The entry keyed by ``settings.OUTPUT_SUFFIX`` is skipped.

    :raises FileNotFoundError: when a URL answers with a status other
        than 200
    '''
    logger.info('Fetching remote files')
    for key, filepath in filepath_dict.items():
        if key == settings.OUTPUT_SUFFIX:
            continue
        file_url = data[key]
        logger.info('Fetching remote {} file: {}'.format(key, file_url))
        response = requests.get(file_url, stream=True)
        if response.status_code == 200:
            with open(filepath, 'wb') as outfile:
                # Have the raw stream undo any content encoding while read.
                response.raw.decode_content = True
                shutil.copyfileobj(response.raw, outfile)
        else:
            # Report the status actually received; the message used to say
            # 404 for every failure.
            raise FileNotFoundError('Received {} when fetching {}'.format(
                response.status_code, file_url))
def download_image(url, path):
    """Download the image at ``url`` to ``path`` unless it already exists.

    Returns True when ``path`` already exists or the download succeeded,
    and False when the server answered with a status other than 200 or an
    exception occurred. Failures are printed, not raised.
    """
    if os.path.exists(path):
        return True

    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.101 Safari/537.36'
    }

    try:
        r = requests.get(url, stream=True, timeout=9, headers=headers)
        if r.status_code != 200:
            print(("Could not download image %s, response %d" % (url, r.status_code)))
        else:
            with open(path, 'wb') as f:
                # Have the raw stream undo any content encoding while read.
                r.raw.decode_content = True
                shutil.copyfileobj(r.raw, f)
            return True
    except Exception as e:
        # Prefer the exception's ``message`` attribute when it has one.
        reason = e.message if hasattr(e, 'message') else repr(e)
        print(("Could not download image %s due to %s" % (url, reason)))
    return False