The following 37 code examples, extracted from open-source Python projects, illustrate how to use boto.storage_uri().
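Before the project examples, here is a minimal sketch of typical usage, assuming credentials are configured in the standard ~/.boto config file; the bucket and object names are hypothetical:

import boto

# Parse a Google Cloud Storage URI; validate=False skips the
# existence check, so no network call is made at parse time.
uri = boto.storage_uri('gs://example-bucket/path/to/obj', validate=False)
print(uri.scheme)       # 'gs'
print(uri.bucket_name)  # 'example-bucket'
print(uri.object_name)  # 'path/to/obj'

# With credentials configured, the same URI can read or write the object:
# uri.new_key().set_contents_from_string('hello')
# data = uri.get_key().get_contents_as_string()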
def test_provider_uri(self):
    for prov in ('gs', 's3'):
        uri_str = '%s://' % prov
        uri = boto.storage_uri(uri_str, validate=False,
                               suppress_consec_slashes=False)
        self.assertEqual(prov, uri.scheme)
        self.assertEqual(uri_str, uri.uri)
        self.assertFalse(hasattr(uri, 'versionless_uri'))
        self.assertEqual('', uri.bucket_name)
        self.assertEqual('', uri.object_name)
        self.assertEqual(None, uri.version_id)
        self.assertEqual(None, uri.generation)
        self.assertEqual(uri.names_provider(), True)
        self.assertEqual(uri.names_container(), True)
        self.assertEqual(uri.names_bucket(), False)
        self.assertEqual(uri.names_object(), False)
        self.assertEqual(uri.names_directory(), False)
        self.assertEqual(uri.names_file(), False)
        self.assertEqual(uri.is_stream(), False)
        self.assertEqual(uri.is_version_specific, False)
def test_bucket_uri_no_trailing_slash(self):
    for prov in ('gs', 's3'):
        uri_str = '%s://bucket' % prov
        uri = boto.storage_uri(uri_str, validate=False,
                               suppress_consec_slashes=False)
        self.assertEqual(prov, uri.scheme)
        self.assertEqual('%s/' % uri_str, uri.uri)
        self.assertFalse(hasattr(uri, 'versionless_uri'))
        self.assertEqual('bucket', uri.bucket_name)
        self.assertEqual('', uri.object_name)
        self.assertEqual(None, uri.version_id)
        self.assertEqual(None, uri.generation)
        self.assertEqual(uri.names_provider(), False)
        self.assertEqual(uri.names_container(), True)
        self.assertEqual(uri.names_bucket(), True)
        self.assertEqual(uri.names_object(), False)
        self.assertEqual(uri.names_directory(), False)
        self.assertEqual(uri.names_file(), False)
        self.assertEqual(uri.is_stream(), False)
        self.assertEqual(uri.is_version_specific, False)
def test_bucket_uri_with_trailing_slash(self):
    for prov in ('gs', 's3'):
        uri_str = '%s://bucket/' % prov
        uri = boto.storage_uri(uri_str, validate=False,
                               suppress_consec_slashes=False)
        self.assertEqual(prov, uri.scheme)
        self.assertEqual(uri_str, uri.uri)
        self.assertFalse(hasattr(uri, 'versionless_uri'))
        self.assertEqual('bucket', uri.bucket_name)
        self.assertEqual('', uri.object_name)
        self.assertEqual(None, uri.version_id)
        self.assertEqual(None, uri.generation)
        self.assertEqual(uri.names_provider(), False)
        self.assertEqual(uri.names_container(), True)
        self.assertEqual(uri.names_bucket(), True)
        self.assertEqual(uri.names_object(), False)
        self.assertEqual(uri.names_directory(), False)
        self.assertEqual(uri.names_file(), False)
        self.assertEqual(uri.is_stream(), False)
        self.assertEqual(uri.is_version_specific, False)
def test_non_versioned_object_uri(self):
    for prov in ('gs', 's3'):
        uri_str = '%s://bucket/obj/a/b' % prov
        uri = boto.storage_uri(uri_str, validate=False,
                               suppress_consec_slashes=False)
        self.assertEqual(prov, uri.scheme)
        self.assertEqual(uri_str, uri.uri)
        self.assertEqual(uri_str, uri.versionless_uri)
        self.assertEqual('bucket', uri.bucket_name)
        self.assertEqual('obj/a/b', uri.object_name)
        self.assertEqual(None, uri.version_id)
        self.assertEqual(None, uri.generation)
        self.assertEqual(uri.names_provider(), False)
        self.assertEqual(uri.names_container(), False)
        self.assertEqual(uri.names_bucket(), False)
        self.assertEqual(uri.names_object(), True)
        self.assertEqual(uri.names_directory(), False)
        self.assertEqual(uri.names_file(), False)
        self.assertEqual(uri.is_stream(), False)
        self.assertEqual(uri.is_version_specific, False)
def test_versioned_gs_object_uri(self):
    uri_str = 'gs://bucket/obj/a/b#1359908801674000'
    uri = boto.storage_uri(uri_str, validate=False,
                           suppress_consec_slashes=False)
    self.assertEqual('gs', uri.scheme)
    self.assertEqual(uri_str, uri.uri)
    self.assertEqual('gs://bucket/obj/a/b', uri.versionless_uri)
    self.assertEqual('bucket', uri.bucket_name)
    self.assertEqual('obj/a/b', uri.object_name)
    self.assertEqual(None, uri.version_id)
    self.assertEqual(1359908801674000, uri.generation)
    self.assertEqual(uri.names_provider(), False)
    self.assertEqual(uri.names_container(), False)
    self.assertEqual(uri.names_bucket(), False)
    self.assertEqual(uri.names_object(), True)
    self.assertEqual(uri.names_directory(), False)
    self.assertEqual(uri.names_file(), False)
    self.assertEqual(uri.is_stream(), False)
    self.assertEqual(uri.is_version_specific, True)
def test_versioned_s3_object_uri(self):
    uri_str = 's3://bucket/obj/a/b#eMuM0J15HkJ9QHlktfNP5MfA.oYR2q6S'
    uri = boto.storage_uri(uri_str, validate=False,
                           suppress_consec_slashes=False)
    self.assertEqual('s3', uri.scheme)
    self.assertEqual(uri_str, uri.uri)
    self.assertEqual('s3://bucket/obj/a/b', uri.versionless_uri)
    self.assertEqual('bucket', uri.bucket_name)
    self.assertEqual('obj/a/b', uri.object_name)
    self.assertEqual('eMuM0J15HkJ9QHlktfNP5MfA.oYR2q6S', uri.version_id)
    self.assertEqual(None, uri.generation)
    self.assertEqual(uri.names_provider(), False)
    self.assertEqual(uri.names_container(), False)
    self.assertEqual(uri.names_bucket(), False)
    self.assertEqual(uri.names_object(), True)
    self.assertEqual(uri.names_directory(), False)
    self.assertEqual(uri.names_file(), False)
    self.assertEqual(uri.is_stream(), False)
    self.assertEqual(uri.is_version_specific, True)
def test_explicit_file_uri(self):
    tmp_dir = tempfile.tempdir or ''
    uri_str = 'file://%s' % urllib.request.pathname2url(tmp_dir)
    uri = boto.storage_uri(uri_str, validate=False,
                           suppress_consec_slashes=False)
    self.assertEqual('file', uri.scheme)
    self.assertEqual(uri_str, uri.uri)
    self.assertFalse(hasattr(uri, 'versionless_uri'))
    self.assertEqual('', uri.bucket_name)
    self.assertEqual(tmp_dir, uri.object_name)
    self.assertFalse(hasattr(uri, 'version_id'))
    self.assertFalse(hasattr(uri, 'generation'))
    self.assertFalse(hasattr(uri, 'is_version_specific'))
    self.assertEqual(uri.names_provider(), False)
    self.assertEqual(uri.names_bucket(), False)
    # Don't check uri.names_container(), uri.names_directory(),
    # uri.names_file(), or uri.names_object(), because for file URIs these
    # functions look at the file system and apparently unit tests run
    # chroot'd.
    self.assertEqual(uri.is_stream(), False)
def test_implicit_file_uri(self):
    tmp_dir = tempfile.tempdir or ''
    uri_str = '%s' % urllib.request.pathname2url(tmp_dir)
    uri = boto.storage_uri(uri_str, validate=False,
                           suppress_consec_slashes=False)
    self.assertEqual('file', uri.scheme)
    self.assertEqual('file://%s' % tmp_dir, uri.uri)
    self.assertFalse(hasattr(uri, 'versionless_uri'))
    self.assertEqual('', uri.bucket_name)
    self.assertEqual(tmp_dir, uri.object_name)
    self.assertFalse(hasattr(uri, 'version_id'))
    self.assertFalse(hasattr(uri, 'generation'))
    self.assertFalse(hasattr(uri, 'is_version_specific'))
    self.assertEqual(uri.names_provider(), False)
    self.assertEqual(uri.names_bucket(), False)
    # Don't check uri.names_container(), uri.names_directory(),
    # uri.names_file(), or uri.names_object(), because for file URIs these
    # functions look at the file system and apparently unit tests run
    # chroot'd.
    self.assertEqual(uri.is_stream(), False)
def test_gs_object_uri_contains_sharp_not_matching_version_syntax(self):
    uri_str = 'gs://bucket/obj#13a990880167400'
    uri = boto.storage_uri(uri_str, validate=False,
                           suppress_consec_slashes=False)
    self.assertEqual('gs', uri.scheme)
    self.assertEqual(uri_str, uri.uri)
    self.assertEqual('gs://bucket/obj#13a990880167400',
                     uri.versionless_uri)
    self.assertEqual('bucket', uri.bucket_name)
    self.assertEqual('obj#13a990880167400', uri.object_name)
    self.assertEqual(None, uri.version_id)
    self.assertEqual(None, uri.generation)
    self.assertEqual(uri.names_provider(), False)
    self.assertEqual(uri.names_container(), False)
    self.assertEqual(uri.names_bucket(), False)
    self.assertEqual(uri.names_object(), True)
    self.assertEqual(uri.names_directory(), False)
    self.assertEqual(uri.names_file(), False)
    self.assertEqual(uri.is_stream(), False)
    self.assertEqual(uri.is_version_specific, False)
def testHasVersion(self):
    uri = storage_uri("gs://bucket/obj")
    self.assertFalse(uri.has_version())
    uri.version_id = "versionid"
    self.assertTrue(uri.has_version())

    uri = storage_uri("gs://bucket/obj")
    # Generation triggers versioning.
    uri.generation = 12345
    self.assertTrue(uri.has_version())
    uri.generation = None
    self.assertFalse(uri.has_version())

    # Zero-generation counts as a version.
    uri = storage_uri("gs://bucket/obj")
    uri.generation = 0
    self.assertTrue(uri.has_version())
def testPropertiesUpdated(self):
    b = self._MakeBucket()
    bucket_uri = storage_uri("gs://%s" % b.name)
    key_uri = bucket_uri.clone_replace_name("obj")
    key_uri.set_contents_from_string("data1")

    self.assertRegexpMatches(str(key_uri.generation), r"[0-9]+")
    k = b.get_key("obj")
    self.assertEqual(k.generation, key_uri.generation)
    self.assertEquals(k.get_contents_as_string(), "data1")

    key_uri.set_contents_from_stream(StringIO.StringIO("data2"))
    self.assertRegexpMatches(str(key_uri.generation), r"[0-9]+")
    self.assertGreater(key_uri.generation, k.generation)
    k = b.get_key("obj")
    self.assertEqual(k.generation, key_uri.generation)
    self.assertEquals(k.get_contents_as_string(), "data2")

    key_uri.set_contents_from_file(StringIO.StringIO("data3"))
    self.assertRegexpMatches(str(key_uri.generation), r"[0-9]+")
    self.assertGreater(key_uri.generation, k.generation)
    k = b.get_key("obj")
    self.assertEqual(k.generation, key_uri.generation)
    self.assertEquals(k.get_contents_as_string(), "data3")
def test_cors_xml_storage_uri(self):
    """Test setting and getting of CORS XML documents with storage_uri."""
    # create a new bucket
    bucket = self._MakeBucket()
    bucket_name = bucket.name
    uri = storage_uri('gs://' + bucket_name)
    # get new bucket cors and make sure it's empty
    cors = re.sub(r'\s', '', uri.get_cors().to_xml())
    self.assertEqual(cors, CORS_EMPTY)
    # set cors document on new bucket
    cors_obj = Cors()
    h = handler.XmlHandler(cors_obj, None)
    xml.sax.parseString(CORS_DOC, h)
    uri.set_cors(cors_obj)
    cors = re.sub(r'\s', '', uri.get_cors().to_xml())
    self.assertEqual(cors, CORS_DOC)
def test_storage_uri_regionless(self):
    # First, create a bucket in a different region.
    conn = S3Connection(
        host='s3-us-west-2.amazonaws.com'
    )
    bucket_name = 'keytest-%d' % int(time.time())
    bucket = conn.create_bucket(bucket_name, location=Location.USWest2)
    self.addCleanup(self.nuke_bucket, bucket)

    # Now use ``storage_uri`` to try to make a new key.
    # This would throw a 301 exception.
    suri = boto.storage_uri('s3://%s/test' % bucket_name)
    the_key = suri.new_key()
    the_key.key = 'Test301'
    the_key.set_contents_from_string(
        'This should store in a different region.'
    )

    # Check it a different way.
    alt_conn = boto.connect_s3(host='s3-us-west-2.amazonaws.com')
    alt_bucket = alt_conn.get_bucket(bucket_name)
    alt_key = alt_bucket.get_key('Test301')
def resetConnection(self):
    import boto
    if getattr(self, 'conn', False):
        self.conn.close()
    self.bucket = None
    self.conn = None
    self.storage_uri = None
    del self.conn
    del self.storage_uri

    self.storage_uri = boto.storage_uri(self.boto_uri_str)
    self.conn = get_connection(self.scheme, self.parsed_url,
                               self.storage_uri)
    if not self.conn.lookup(self.bucket_name):
        self.bucket = self.conn.create_bucket(self.bucket_name,
                                              location=self.my_location)
    else:
        self.bucket = self.conn.get_bucket(self.bucket_name)
def get_uri(self):
    return boto.storage_uri(self.path, 'gs')
def test_roundtrip_versioned_gs_object_uri_parsed(self):
    uri_str = 'gs://bucket/obj#1359908801674000'
    uri = boto.storage_uri(uri_str, validate=False,
                           suppress_consec_slashes=False)
    roundtrip_uri = boto.storage_uri(uri.uri, validate=False,
                                     suppress_consec_slashes=False)
    self.assertEqual(uri.uri, roundtrip_uri.uri)
    self.assertEqual(uri.is_version_specific, True)
def test_invalid_scheme(self):
    uri_str = 'mars://bucket/object'
    try:
        boto.storage_uri(uri_str, validate=False,
                         suppress_consec_slashes=False)
    except InvalidUriError as e:
        self.assertIn('Unrecognized scheme', e.message)
def testCloneReplaceKey(self):
    b = self._MakeBucket()
    k = b.new_key("obj")
    k.set_contents_from_string("stringdata")

    orig_uri = storage_uri("gs://%s/" % b.name)

    uri = orig_uri.clone_replace_key(k)
    self.assertTrue(uri.has_version())
    self.assertRegexpMatches(str(uri.generation), r"[0-9]+")
def testSetAclXml(self):
    """Ensures that calls to the set_xml_acl functions succeed."""
    b = self._MakeBucket()
    k = b.new_key("obj")
    k.set_contents_from_string("stringdata")
    bucket_uri = storage_uri("gs://%s/" % b.name)

    # Get a valid ACL for an object.
    bucket_uri.object_name = "obj"
    bucket_acl = bucket_uri.get_acl()
    bucket_uri.object_name = None

    # Add a permission to the ACL.
    all_users_read_permission = ("<Entry><Scope type='AllUsers'/>"
                                 "<Permission>READ</Permission></Entry>")
    acl_string = re.sub(r"</Entries>",
                        all_users_read_permission + "</Entries>",
                        bucket_acl.to_xml())

    # Test-generated owner IDs are not currently valid for buckets
    acl_no_owner_string = re.sub(r"<Owner>.*</Owner>", "", acl_string)

    # Set ACL on an object.
    bucket_uri.set_xml_acl(acl_string, "obj")

    # Set ACL on a bucket.
    bucket_uri.set_xml_acl(acl_no_owner_string)

    # Set the default ACL for a bucket.
    bucket_uri.set_def_xml_acl(acl_no_owner_string)

    # Verify all the ACLs were successfully applied.
    new_obj_acl_string = k.get_acl().to_xml()
    new_bucket_acl_string = bucket_uri.get_acl().to_xml()
    new_bucket_def_acl_string = bucket_uri.get_def_acl().to_xml()
    self.assertRegexpMatches(new_obj_acl_string, r"AllUsers")
    self.assertRegexpMatches(new_bucket_acl_string, r"AllUsers")
    self.assertRegexpMatches(new_bucket_def_acl_string, r"AllUsers")
def test_default_object_acls_storage_uri(self):
    """Test default object acls using storage_uri."""
    # create a new bucket
    bucket = self._MakeBucket()
    bucket_name = bucket.name
    uri = storage_uri('gs://' + bucket_name)
    # get default acl and make sure it's project-private
    acl = uri.get_def_acl()
    self.assertIsNotNone(re.search(PROJECT_PRIVATE_RE, acl.to_xml()))
    # set default acl to a canned acl and verify it gets set
    uri.set_def_acl('public-read')
    acl = uri.get_def_acl()
    # save public-read acl for later test
    public_read_acl = acl
    self.assertEqual(acl.to_xml(),
                     ('<AccessControlList><Entries><Entry>'
                      '<Scope type="AllUsers"></Scope><Permission>READ</Permission>'
                      '</Entry></Entries></AccessControlList>'))
    # back to private acl
    uri.set_def_acl('private')
    acl = uri.get_def_acl()
    self.assertEqual(acl.to_xml(),
                     '<AccessControlList></AccessControlList>')
    # set default acl to an xml acl and verify it gets set
    uri.set_def_acl(public_read_acl)
    acl = uri.get_def_acl()
    self.assertEqual(acl.to_xml(),
                     ('<AccessControlList><Entries><Entry>'
                      '<Scope type="AllUsers"></Scope><Permission>READ</Permission>'
                      '</Entry></Entries></AccessControlList>'))
    # back to private acl
    uri.set_def_acl('private')
    acl = uri.get_def_acl()
    self.assertEqual(acl.to_xml(),
                     '<AccessControlList></AccessControlList>')
def test_lifecycle_config_storage_uri(self):
    """Test setting and getting of lifecycle config with storage_uri."""
    # create a new bucket
    bucket = self._MakeBucket()
    bucket_name = bucket.name
    uri = storage_uri('gs://' + bucket_name)
    # get lifecycle config and make sure it's empty
    xml = uri.get_lifecycle_config().to_xml()
    self.assertEqual(xml, LIFECYCLE_EMPTY)
    # set lifecycle config
    lifecycle_config = LifecycleConfig()
    lifecycle_config.add_rule('Delete', None, LIFECYCLE_CONDITIONS)
    uri.configure_lifecycle(lifecycle_config)
    xml = uri.get_lifecycle_config().to_xml()
    self.assertEqual(xml, LIFECYCLE_DOC)
def get_bucket_list(bucket_name):
    uri = boto.storage_uri(bucket_name, 'gs')
    for obj in uri.get_bucket():
        yield obj.name
def add_thumbnail(file_name):
    ''' add a thumbnail for the specific file'''
    src_uri = boto.storage_uri(
        '{}/{}'.format(conf.photos_bucket_name, file_name), 'gs')
    dest_uri = boto.storage_uri(
        '{}/{}'.format(conf.thumbnails_bucket_name, file_name), 'gs')

    try:
        new_key = dest_uri.new_key()
    except boto.exception.NoAuthHandlerFound as e:
        logging.error(e)
        return None

    # Create a file-like object for holding the photo contents.
    photo = StringIO.StringIO()
    src_uri.get_key().get_file(photo)

    thumbnail = StringIO.StringIO()
    im = Image.open(photo)
    im.thumbnail((260, 260))
    im.save(thumbnail, 'JPEG')
    thumbnail.seek(0)

    # save the thumbnail
    try:
        new_key.set_contents_from_file(thumbnail)
        new_key.make_public()
    except boto.exception.GSResponseError as e:
        logging.error(e)
        # Do we have the credentials file set up?
        boto_cred_file = os.path.expanduser('~') + '/.boto'
        if not os.path.exists(boto_cred_file):
            logging.error(
                'Credentials file {} was not found.'.format(boto_cred_file))
def upload_file(file_obj, bucket, file_oid, object_md, make_public=False):
    ''' Upload the file object to a bucket using credentials and object
    metadata. Object name is a part of its metadata.
    '''
    if getpass.getuser() == 'bhs':
        boto_cred_file = '/home/bhs/.boto'
    else:
        boto_cred_file = os.path.expanduser('~') + '/.boto'

    fn = str(file_oid)
    dest_uri = boto.storage_uri(bucket + '/' + fn, 'gs')
    try:
        new_key = dest_uri.new_key()
    except boto.exception.NoAuthHandlerFound as e:
        print e.message
        return None

    new_key.update_metadata(object_md)
    try:
        new_key.set_contents_from_file(file_obj)
        if make_public:
            new_key.make_public()
    except boto.exception.GSResponseError as e:
        # Do we have the credentials file set up?
        if not os.path.exists(boto_cred_file):
            print('Credentials file {} was not found.'.format(boto_cred_file))
        return None
    return str(dest_uri)
def save(self, data, name):
    ''' google storage code
    uri = boto.storage_uri(self.dest_bucket_name+name+'.json', 'gs')
    uri.new_key().set_contents_from_string(json.dumps(data))
    '''
    data['id'] = name
    data['tree_num'] = self.meta['num']
    data['tree_size'] = self.meta['persons']
    data['tree_file_id'] = self.meta['file_id']
    self.onsave(data)
def __init__(self, parsed_url):
    import boto
    from boto.s3.connection import Location

    duplicity.backend.Backend.__init__(self, parsed_url)

    assert boto.Version >= BOTO_MIN_VERSION

    # This folds the null prefix and all null parts, which means that:
    #  //MyBucket/ and //MyBucket are equivalent.
    #  //MyBucket//My///My/Prefix/ and //MyBucket/My/Prefix are equivalent.
    self.url_parts = [x for x in parsed_url.path.split('/') if x != '']

    if self.url_parts:
        self.bucket_name = self.url_parts.pop(0)
    else:
        # Duplicity hangs if boto gets a null bucket name.
        # HC: Caught a socket error, trying to recover
        raise BackendException('Boto requires a bucket name.')

    self.scheme = parsed_url.scheme

    if self.url_parts:
        self.key_prefix = '%s/' % '/'.join(self.url_parts)
    else:
        self.key_prefix = ''

    self.straight_url = duplicity.backend.strip_auth_from_url(parsed_url)
    self.parsed_url = parsed_url

    # duplicity and boto.storage_uri() have different URI formats.
    # boto uses scheme://bucket[/name] and specifies hostname on connect()
    self.boto_uri_str = '://'.join((parsed_url.scheme[:2],
                                    parsed_url.path.lstrip('/')))

    if globals.s3_european_buckets:
        self.my_location = Location.EU
    else:
        self.my_location = ''

    self.resetConnection()
    self._listed_keys = {}
def _close(self):
    del self._listed_keys
    self._listed_keys = {}
    self.bucket = None
    self.conn = None
    self.storage_uri = None
    del self.conn
    del self.storage_uri
def __init__(self, config):
    self.__uri = boto.storage_uri(config.url)
    self.__key = config.key
    self.__secret = config.secret
    self.__bucket = None
def handle_uploaded_file(f, user):
    gcs_bucket = settings.GCS_BUCKET
    user_key = md5.md5(str(user.username)).hexdigest()
    # Sometimes things have paths in them? Windows used to do this.
    name = f.name.split("/")[-1]
    file_key = "%s/%s" % (user_key, name)
    uri = boto.storage_uri("%s/%s" % (gcs_bucket, file_key), "gs")
    uri.set_contents_from_string(f.read())
    return "http://%s/%s" % (gcs_bucket, file_key)
def connect(self, access_key_id=None, secret_access_key=None, **kwargs):
    """
    Opens a connection to appropriate provider, depending on provider
    portion of URI. Requires Credentials defined in boto config file
    (see boto/pyami/config.py).

    @type storage_uri: StorageUri
    @param storage_uri: StorageUri specifying a bucket or a bucket+object

    @rtype: L{AWSAuthConnection<boto.gs.connection.AWSAuthConnection>}
    @return: A connection to storage service provider of the given URI.
    """
    connection_args = dict(self.connection_args or ())

    if (hasattr(self, 'suppress_consec_slashes') and
            'suppress_consec_slashes' not in connection_args):
        connection_args['suppress_consec_slashes'] = (
            self.suppress_consec_slashes)
    connection_args.update(kwargs)
    if not self.connection:
        if self.scheme in self.provider_pool:
            self.connection = self.provider_pool[self.scheme]
        elif self.scheme == 's3':
            from boto.s3.connection import S3Connection
            self.connection = S3Connection(access_key_id,
                                           secret_access_key,
                                           **connection_args)
            self.provider_pool[self.scheme] = self.connection
        elif self.scheme == 'gs':
            from boto.gs.connection import GSConnection
            # Use OrdinaryCallingFormat instead of boto-default
            # SubdomainCallingFormat because the latter changes the hostname
            # that's checked during cert validation for HTTPS connections,
            # which will fail cert validation (when cert validation is
            # enabled).
            #
            # The same is not true for S3's HTTPS certificates. In fact,
            # we don't want to do this for S3 because S3 requires the
            # subdomain to match the location of the bucket. If the proper
            # subdomain is not used, the server will return a 301 redirect
            # with no Location header.
            #
            # Note: the following import can't be moved up to the
            # start of this file else it causes a config import failure when
            # run from the resumable upload/download tests.
            from boto.s3.connection import OrdinaryCallingFormat
            connection_args['calling_format'] = OrdinaryCallingFormat()
            self.connection = GSConnection(access_key_id,
                                           secret_access_key,
                                           **connection_args)
            self.provider_pool[self.scheme] = self.connection
        elif self.scheme == 'file':
            from boto.file.connection import FileConnection
            self.connection = FileConnection(self)
        else:
            raise InvalidUriError('Unrecognized scheme "%s"' %
                                  self.scheme)
    self.connection.debug = self.debug
    return self.connection
def connect(self, access_key_id=None, secret_access_key=None, **kwargs):
    """
    Opens a connection to appropriate provider, depending on provider
    portion of URI. Requires Credentials defined in boto config file
    (see boto/pyami/config.py).

    @type storage_uri: StorageUri
    @param storage_uri: StorageUri specifying a bucket or a bucket+object

    @rtype: L{AWSAuthConnection<boto.gs.connection.AWSAuthConnection>}
    @return: A connection to storage service provider of the given URI.
    """
    connection_args = dict(self.connection_args or ())
    # Use OrdinaryCallingFormat instead of boto-default
    # SubdomainCallingFormat because the latter changes the hostname
    # that's checked during cert validation for HTTPS connections,
    # which will fail cert validation (when cert validation is enabled).
    # Note: the following import can't be moved up to the start of
    # this file else it causes a config import failure when run from
    # the resumable upload/download tests.
    from boto.s3.connection import OrdinaryCallingFormat
    connection_args['calling_format'] = OrdinaryCallingFormat()
    if (hasattr(self, 'suppress_consec_slashes') and
            'suppress_consec_slashes' not in connection_args):
        connection_args['suppress_consec_slashes'] = (
            self.suppress_consec_slashes)
    connection_args.update(kwargs)
    if not self.connection:
        if self.scheme in self.provider_pool:
            self.connection = self.provider_pool[self.scheme]
        elif self.scheme == 's3':
            from boto.s3.connection import S3Connection
            self.connection = S3Connection(access_key_id,
                                           secret_access_key,
                                           **connection_args)
            self.provider_pool[self.scheme] = self.connection
        elif self.scheme == 'gs':
            from boto.gs.connection import GSConnection
            self.connection = GSConnection(access_key_id,
                                           secret_access_key,
                                           **connection_args)
            self.provider_pool[self.scheme] = self.connection
        elif self.scheme == 'file':
            from boto.file.connection import FileConnection
            self.connection = FileConnection(self)
        else:
            raise InvalidUriError('Unrecognized scheme "%s"' %
                                  self.scheme)
    self.connection.debug = self.debug
    return self.connection
def multipart_upload_worker(scheme, parsed_url, storage_uri, bucket_name,
                            multipart_id, filename, offset, bytes,
                            num_retries, queue):
    """
    Worker method for uploading a file chunk to S3 using multipart upload.
    Note that the file chunk is read into memory, so it's important to keep
    this number reasonably small.
    """

    def _upload_callback(uploaded, total):
        worker_name = multiprocessing.current_process().name
        log.Debug("%s: Uploaded %s/%s bytes" % (worker_name, uploaded, total))
        if queue is not None:
            queue.put([uploaded, total])  # Push data to the consumer thread

    def _upload(num_retries):
        worker_name = multiprocessing.current_process().name
        log.Debug("%s: Uploading chunk %d" % (worker_name, offset + 1))
        try:
            conn = get_connection(scheme, parsed_url, storage_uri)
            bucket = conn.lookup(bucket_name)

            for mp in bucket.list_multipart_uploads():
                if mp.id == multipart_id:
                    with FileChunkIO(filename, 'r', offset=offset * bytes,
                                     bytes=bytes) as fd:
                        start = time.time()
                        mp.upload_part_from_file(fd, offset + 1,
                                                 cb=_upload_callback,
                                                 num_cb=max(2, 8 * bytes / (1024 * 1024))
                                                 )  # Max num of callbacks = 8 times x megabyte
                        end = time.time()
                        log.Debug(("{name}: Uploaded chunk {chunk}"
                                   "at roughly {speed} bytes/second").format(
                                       name=worker_name,
                                       chunk=offset + 1,
                                       speed=(bytes / max(1, abs(end - start)))))
                    break
            conn.close()
            conn = None
            bucket = None
            del conn
        except Exception as e:
            traceback.print_exc()
            if num_retries:
                log.Debug("%s: Upload of chunk %d failed. Retrying %d more times..." % (
                    worker_name, offset + 1, num_retries - 1))
                return _upload(num_retries - 1)
            log.Debug("%s: Upload of chunk %d failed. Aborting..." % (
                worker_name, offset + 1))
            raise e
        log.Debug("%s: Upload of chunk %d complete" % (worker_name, offset + 1))

    return _upload(num_retries)