我们从Python开源项目中，提取了以下6个代码示例，用于说明如何使用boto.storage_uri_for_key()。
def _attempt_resumable_download(self, key, fp, headers, cb, num_cb, torrent,
                                version_id, hash_algs):
    """
    Attempts a resumable download.

    The current size of fp selects one of two paths. If fp is non-empty
    and self.etag_value_for_current_download equals key.etag (with
    surrounding quote characters stripped), the existing transfer is
    resumed by requesting only the missing byte range. Otherwise
    self._save_tracker_info(key) is called, fp is emptied and the
    download starts from byte 0. In both cases
    self.download_start_point records the offset the transfer starts at.

    :param key: Object to download. Its etag, size,
        bucket.connection.debug and get_file() are used.
    :param fp: File object that receives the data. It may be truncated
        and is flushed before this method returns.
    :param headers: Request headers forwarded to key.get_file(). When
        resuming, a copy is made and a 'Range' entry is added to the
        copy, so the caller's mapping is left untouched. None is
        accepted and treated as an empty mapping on the resume path.
    :param cb: Progress callback. When resuming it is wrapped in
        ByteTranslatingCallbackHandler, constructed with the number of
        bytes already present in fp.
    :param num_cb: Forwarded unchanged to key.get_file().
    :param torrent: Forwarded unchanged to key.get_file().
    :param version_id: Forwarded unchanged to key.get_file().
    :param hash_algs: Forwarded to key.get_file() only when key is a
        GSKey.

    Returns None. Returns early, without calling key.get_file(), when
    fp already holds exactly key.size bytes of the matching object.

    Raises ResumableDownloadException if any problems occur; in
    particular (with disposition ABORT) when fp is larger than the
    object being resumed.
    """
    debug = key.bucket.connection.debug
    cur_file_size = get_cur_file_size(fp, position_to_eof=True)

    if (cur_file_size and
            self.etag_value_for_current_download and
            self.etag_value_for_current_download == key.etag.strip('"\'')):
        # Try to resume existing transfer.
        if cur_file_size > key.size:
            raise ResumableDownloadException(
                '%s is larger (%d) than %s (%d).\nDeleting tracker file, so '
                'if you re-try this download it will start from scratch' %
                (fp.name, cur_file_size, str(storage_uri_for_key(key)),
                 key.size), ResumableTransferDisposition.ABORT)
        if cur_file_size == key.size:
            # Nothing left to fetch.
            if debug >= 1:
                print('Download complete.')
            return
        if debug >= 1:
            print('Resuming download.')
        # Work on a copy so the caller's headers are not mutated; tolerate
        # a missing headers mapping instead of failing on None.copy().
        headers = headers.copy() if headers is not None else {}
        headers['Range'] = 'bytes=%d-%d' % (cur_file_size, key.size - 1)
        cb = ByteTranslatingCallbackHandler(cb, cur_file_size).call
        self.download_start_point = cur_file_size
    else:
        if debug >= 1:
            print('Starting new resumable download.')
        self._save_tracker_info(key)
        self.download_start_point = 0
        # Truncate the file, in case a new resumable download is being
        # started atop an existing file.
        fp.truncate(0)
        # truncate() does not move the stream position, and the size probe
        # above (position_to_eof=True) presumably left fp at the old EOF.
        # Rewind so a file that is not opened in append mode does not get
        # a zero-filled gap written ahead of the new data.
        fp.seek(0)

    # Disable AWSAuthConnection-level retry behavior, since that would
    # cause downloads to restart from scratch.
    if isinstance(key, GSKey):
        key.get_file(fp, headers, cb, num_cb, torrent, version_id,
                     override_num_retries=0, hash_algs=hash_algs)
    else:
        key.get_file(fp, headers, cb, num_cb, torrent, version_id,
                     override_num_retries=0)
    fp.flush()
def _attempt_resumable_download(self, key, fp, headers, cb, num_cb, torrent,
                                version_id):
    """
    Attempts a resumable download.

    The current size of fp selects one of two paths. If fp is non-empty
    and self.etag_value_for_current_download equals key.etag (with
    surrounding quote characters stripped), the existing transfer is
    resumed by requesting only the missing byte range. Otherwise
    self._save_tracker_info(key) is called, fp is emptied and the
    download starts from byte 0. In both cases
    self.download_start_point records the offset the transfer starts at.

    :param key: Object to download. Its etag, size,
        bucket.connection.debug and get_file() are used.
    :param fp: File object that receives the data. It may be truncated
        and is flushed before this method returns.
    :param headers: Request headers forwarded to key.get_file(). When
        resuming, a copy is made and a 'Range' entry is added to the
        copy, so the caller's mapping is left untouched. None is
        accepted and treated as an empty mapping on the resume path.
    :param cb: Progress callback. When resuming it is wrapped in
        ByteTranslatingCallbackHandler, constructed with the number of
        bytes already present in fp.
    :param num_cb: Forwarded unchanged to key.get_file().
    :param torrent: Forwarded unchanged to key.get_file().
    :param version_id: Forwarded unchanged to key.get_file().

    Returns None. Returns early, without calling key.get_file(), when
    fp already holds exactly key.size bytes of the matching object.

    Raises ResumableDownloadException if any problems occur; in
    particular (with disposition ABORT) when fp is larger than the
    object being resumed.
    """
    debug = key.bucket.connection.debug
    cur_file_size = get_cur_file_size(fp, position_to_eof=True)

    if (cur_file_size and
            self.etag_value_for_current_download and
            self.etag_value_for_current_download == key.etag.strip('"\'')):
        # Try to resume existing transfer.
        if cur_file_size > key.size:
            raise ResumableDownloadException(
                '%s is larger (%d) than %s (%d).\nDeleting tracker file, so '
                'if you re-try this download it will start from scratch' %
                (fp.name, cur_file_size, str(storage_uri_for_key(key)),
                 key.size), ResumableTransferDisposition.ABORT)
        if cur_file_size == key.size:
            # Nothing left to fetch.
            if debug >= 1:
                # Single-argument print(...) emits the same text under both
                # Python 2 and Python 3; the bare print statement is a
                # SyntaxError on Python 3.
                print('Download complete.')
            return
        if debug >= 1:
            print('Resuming download.')
        # Work on a copy so the caller's headers are not mutated; tolerate
        # a missing headers mapping instead of failing on None.copy().
        headers = headers.copy() if headers is not None else {}
        headers['Range'] = 'bytes=%d-%d' % (cur_file_size, key.size - 1)
        cb = ByteTranslatingCallbackHandler(cb, cur_file_size).call
        self.download_start_point = cur_file_size
    else:
        if debug >= 1:
            print('Starting new resumable download.')
        self._save_tracker_info(key)
        self.download_start_point = 0
        # Truncate the file, in case a new resumable download is being
        # started atop an existing file.
        fp.truncate(0)
        # truncate() does not move the stream position, and the size probe
        # above (position_to_eof=True) presumably left fp at the old EOF.
        # Rewind so a file that is not opened in append mode does not get
        # a zero-filled gap written ahead of the new data.
        fp.seek(0)

    # Disable AWSAuthConnection-level retry behavior, since that would
    # cause downloads to restart from scratch.
    key.get_file(fp, headers, cb, num_cb, torrent, version_id,
                 override_num_retries=0)
    fp.flush()