我们从 Python 开源项目中提取了以下 16 个代码示例，用于说明如何使用 botocore.exceptions.BotoCoreError()。
async def get_last_updated(self, source_id):
    """Return the parsed ``updated`` value of the latest item for a source.

    Queries the ``source_id-updated-index`` index with
    ``ScanIndexForward=False`` and ``Limit=1``, so at most one item is read.

    This is a coroutine: it awaits ``self.db`` and the query, and callers
    are expected to ``await`` it.

    :param source_id: Source identifier; converted with ``str()`` for the query.
    :returns: ``parse_date(updated)`` for the matched item, or ``None`` when
        nothing matched, the response status was not 200, or the query
        raised ``BotoCoreError``.
    """
    params = {
        "TableName": self.table_name,
        "IndexName": "source_id-updated-index",
        "KeyConditionExpression": "source_id = :value",
        "ExpressionAttributeValues": {
            ":value": {"S": str(source_id)},
        },
        "ProjectionExpression": "source_id, updated, post_id",
        "ScanIndexForward": False,
        "Limit": 1,
    }
    db = await self.db
    try:
        response = await db.query(**params)
        if response.get("ResponseMetadata", {}).get("HTTPStatusCode") == 200:
            updated = response["Items"][0]["updated"]["S"] if response["Count"] == 1 else None
            return parse_date(updated) if updated else None
    except BotoCoreError as exc:
        logger.error("[DB] Error when querying for last updated item on {}".format(source_id))
        logger.error(exc)
    # Reached on a non-200 response or after a logged BotoCoreError.
    return None
async def get_control(self):
    """Load the control document stored under the id ``"control"``.

    This is a coroutine: it awaits ``self.db`` and the query, and callers
    are expected to ``await`` it.

    :returns: The JSON-decoded ``data`` attribute of the first matching item,
        or ``False`` when no item was found, the response status was not
        200, or ``BotoCoreError`` was raised.
    """
    params = {
        "TableName": self.control_table_name,
        "KeyConditionExpression": "id= :value",
        "ExpressionAttributeValues": {
            ":value": {"S": "control"}
        },
        "ScanIndexForward": False,
    }
    try:
        # Acquiring the connection sits inside the try so a BotoCoreError
        # raised there is logged like a failed query.
        db = await self.db
        response = await db.query(**params)
        if response.get("ResponseMetadata", {}).get("HTTPStatusCode") == 200:
            db_post = response["Items"][0] if response["Count"] >= 1 else None
            if db_post:
                return json.loads(db_post["data"]["S"])
    except BotoCoreError as exc:
        logger.error("[DB] Error when querying for control_data on {}".format(self.control_table_name))
        logger.error(exc)
    return False
def create_project_repository(self):
    """Create the project's repository and validate the service response.

    The response is checked with explicit conditions rather than ``assert``
    statements, because assertions are removed when Python runs with ``-O``
    and the method would then report success unconditionally.

    :returns: ``True`` when the repository was created and the response
        carries the expected fields and values; ``False`` when the call
        raised ``BotoCoreError``, the response had no ``'repository'`` key,
        or any expected field is missing or mismatched.
    """
    try:
        repository = self.client.create_repository(
            repositoryName=config.project_name
        )['repository']
    except (BotoCoreError, KeyError):
        return False

    required_keys = (
        'createdAt',
        'registryId',
        'repositoryArn',
        'repositoryName',
        'repositoryUri',
    )
    # A missing key is reported as a failure instead of raising KeyError.
    if any(key not in repository for key in required_keys):
        return False

    return (
        repository['registryId'] == config.aws_account_id
        and repository['repositoryName'] == config.project_name
    )
def _wait_for_image_state(self, ami, desired_state):
    """Wait on a polling timer until the image reports ``desired_state``.

    :param ami: object whose ``id`` is used to look the image up
    :param desired_state: state value compared against ``image.state``
    """

    def _wait_for_state():
        """Polling callback; ends the looping call once the state matches."""
        try:
            current = self.driver.Image(ami.id)
            if current.state != desired_state:
                LOG.info("Image state %s." % current.state)
                return
            LOG.info("Image has changed state to %s." % desired_state)
            raise loopingcall.LoopingCallDone()
        except BotoCoreError as e:
            # Lookup failures are only logged; the next tick retries.
            LOG.info("BotoCoreError: {}".format(e))

    poller = loopingcall.FixedIntervalLoopingCall(_wait_for_state)
    poller.start(interval=3).wait()
async def scan(self):
    """Scan the table and return up to five items with all attributes.

    This is a coroutine: it awaits ``self.db`` and the scan call, and callers
    are expected to ``await`` it.

    :returns: The ``Items`` list of the response on HTTP status 200;
        otherwise an empty list, including when ``BotoCoreError`` is raised
        (the error is logged).
    """
    scan_params = {
        "TableName": self.table_name,
        "Select": "ALL_ATTRIBUTES",
        "Limit": 5
    }
    db = await self.db
    try:
        response = await db.scan(**scan_params)
        if response.get("ResponseMetadata", {}).get("HTTPStatusCode") == 200:
            return response["Items"]
    except BotoCoreError as exc:
        logger.error("[DB] Error when scanning db")
        logger.error(exc)
    return []
async def insert_post(self, **kwargs):
    """Store a post item in the table.

    This is a coroutine: it awaits ``self.db`` and the ``put_item`` call, and
    callers are expected to ``await`` it.

    Keyword arguments read: ``target_id``, ``post_id``, ``source_id``,
    ``text``, ``sticky``, ``created``, ``updated`` and, optionally,
    ``target_doc``. ``created`` and ``updated`` are formatted with
    ``self.date_fmt``.

    :returns: ``True`` when the response reports HTTP status 200; ``False``
        on any other status or when ``BotoCoreError`` is raised (logged).
    """
    params = {
        "TableName": self.table_name,
        "Item": {
            "target_id": {"S": kwargs.get("target_id")},
            "post_id": {"S": str(kwargs.get("post_id"))},
            "source_id": {"S": kwargs.get("source_id")},
            # A missing or empty text is stored as a single space.
            # NOTE(review): presumably because empty string attribute values
            # are rejected by the service - confirm.
            "text": {"S": kwargs.get("text") or " "},
            "sticky": {"N": str(int(kwargs.get("sticky", False)))},
            "created": {"S": datetime.strftime(kwargs.get("created"), self.date_fmt)},
            "updated": {"S": datetime.strftime(kwargs.get("updated"), self.date_fmt)},
        }
    }
    # add doc at target if present
    if kwargs.get("target_doc"):
        params["Item"]["target_doc"] = {"S": json.dumps(kwargs["target_doc"])}
    db = await self.db
    try:
        response = await db.put_item(**params)
        if response.get("ResponseMetadata", {}).get("HTTPStatusCode") == 200:
            logger.info("[DB] Post {} {} was saved!".format(kwargs["source_id"], kwargs["post_id"]))
            return True
    except BotoCoreError as exc:
        logger.error("[DB] Error when saving {}".format(kwargs))
        logger.error(exc)
    return False
async def get_post(self, target_id, post_id):
    """Fetch one post by target and post id, with its attributes flattened.

    This is a coroutine: it awaits ``self.db`` and the query, and callers
    are expected to ``await`` it.

    :param target_id: Target identifier; converted with ``str()``.
    :param post_id: Post identifier; converted with ``str()``.
    :returns: The first matching item with ``sticky``, ``updated``,
        ``source_id``, ``post_id`` and ``target_id`` replaced by their plain
        values and ``target_doc`` JSON-decoded (``{}`` when absent), or
        ``None`` when nothing matched, the status was not 200, or
        ``BotoCoreError`` was raised.
    """
    params = {
        "TableName": self.table_name,
        "KeyConditionExpression": "target_id = :value AND post_id= :post_id",
        "ExpressionAttributeValues": {
            ":value": {"S": str(target_id)},
            ":post_id": {"S": str(post_id)}
        },
        "ScanIndexForward": False,
        "ProjectionExpression": "target_id, source_id, updated, post_id, target_doc, sticky",
    }
    # Attribute name -> type tag under which its value is stored in the item.
    plain_attributes = (
        ("sticky", "N"),
        ("updated", "S"),
        ("source_id", "S"),
        ("post_id", "S"),
        ("target_id", "S"),
    )
    db = await self.db
    try:
        response = await db.query(**params)
        if response.get("ResponseMetadata", {}).get("HTTPStatusCode") == 200:
            db_post = response["Items"][0] if response["Count"] >= 1 else None
            if db_post:
                for name, type_tag in plain_attributes:
                    db_post[name] = db_post.get(name, {}).get(type_tag)
                raw_doc = db_post.get("target_doc", {}).get("S")
                db_post["target_doc"] = json.loads(raw_doc) if raw_doc else {}
                return db_post
    except BotoCoreError as exc:
        logger.error("[DB] Error when querying for a post [{}] on {}".format(post_id, target_id))
        logger.error(exc)
    return None
async def test_get_last_updated_failing(self):
    """``get_last_updated`` returns ``None`` when the query raises ``BotoCoreError``."""
    source_id = "56fceedda505e600f71959c8"
    db = await self.client.db
    # Replace the query coroutine so that every call raises BotoCoreError.
    db.query = asynctest.CoroutineMock(side_effect=BotoCoreError)
    res = await self.client.get_last_updated(source_id)
    assert res is None
async def test_get_post_failing(self):
    """``get_post`` returns ``None`` when the query raises ``BotoCoreError``."""
    post_id = "urn:newsml:localhost:2016-04-06T14:36:37.255055:f2266f58-1e5c-4021-85af-e39087d94372"
    db = await self.client.db
    # Replace the query coroutine so that every call raises BotoCoreError.
    db.query = asynctest.CoroutineMock(side_effect=BotoCoreError)
    res = await self.client.get_post(self.target_id, post_id)
    assert res is None
async def test_delete_failing(self):
    """``delete_post`` returns ``False`` when ``delete_item`` raises ``BotoCoreError``."""
    db = await self.client.db
    # Replace the delete coroutine so that every call raises BotoCoreError.
    db.delete_item = asynctest.CoroutineMock(side_effect=BotoCoreError)
    res = await self.client.delete_post("target-id", "baz")
    assert res is False
async def test_get_control_failing(self):
    """``get_control`` returns ``False`` when the query raises ``BotoCoreError``."""
    db = await self.client.db
    # Replace the query coroutine so that every call raises BotoCoreError.
    db.query = asynctest.CoroutineMock(side_effect=BotoCoreError)
    res = await self.client.get_control()
    assert res is False
async def test_save_control_failing(self):
    """``save_control`` returns ``False`` when ``put_item`` raises ``BotoCoreError``."""
    db = await self.client.db
    # Replace the put coroutine so that every call raises BotoCoreError.
    db.put_item = asynctest.CoroutineMock(side_effect=BotoCoreError)
    res = await self.client.save_control({"foo": "bar"})
    assert res is False
def sing_a_song(intent_request):
    """Handle the "sing a song" intent.

    Resolves the requested song from the ``song`` slot or, failing that, from
    the session attributes. When neither is set, the slot is elicited.
    Otherwise a fixed lyric is synthesized to MP3, uploaded to the bucket
    with a public-read ACL, and a closing response card linking to the
    uploaded file is returned.

    :param intent_request: request mapping; ``sessionAttributes`` and
        ``currentIntent`` are read from it.
    :returns: the result of ``elicit_slot`` or ``close_with_response_card``.
    """
    logger.info(intent_request)

    slots = get_slots(intent_request)
    song_slot = slots['song']

    if intent_request['sessionAttributes'] is not None:
        output_session_attributes = intent_request['sessionAttributes']
    else:
        output_session_attributes = {}
    song_from_session = output_session_attributes.get('song')

    # Nothing to sing yet: ask the user for a song.
    if song_slot is None and song_from_session is None:
        return elicit_slot(
            output_session_attributes,
            intent_request['currentIntent']['name'],
            slots,
            'song',
            'What song do you want me to sing?',
        )

    # The slot value wins over whatever was remembered in the session.
    song = song_from_session if song_slot is None else song_slot

    if str(song).lower() != "Hips don't lie".lower():
        response = "I still don't know {}, but I can sing another one ;)".format(song)
    else:
        response = 'This is your lucky day! I can sing that one :)'

    filename = 'singing_files/shak_{}.mp3'.format(secure_random.random())
    lyrics = (
        "And I'm on tonight, you know my hips don't lie and I'm starting to feel it's right. "
        "All the attraction, the tension, don't you see baby, this is perfection."
    )
    try:
        audio_stream = polly.synthesize_speech(
            Text=lyrics,
            OutputFormat='mp3',
            VoiceId='Joanna',
        )
        with closing(audio_stream['AudioStream']) as stream:
            bucket.put_object(Key=filename, Body=stream.read(), ACL='public-read')
    except BotoCoreError as error:
        # A failed synthesis/upload is only logged; the card is still returned.
        logging.error(error)

    return close_with_response_card(
        intent_request['sessionAttributes'],
        'Fulfilled',
        response,
        "Click it, that's me singing",
        None,
        '{}/{}/{}'.format(s3_client.meta.endpoint_url, bucket.name, filename),
        'https://s3.amazonaws.com/shakirachatbot/play_icon.png',
    )
def catch_exceptions(func):
    """
    Decorator that turns expected sceptre errors into a clean exit.

    :param func: The function whose expected exceptions should be simplified.
    :type func: func
    :returns: The decorated function.
    :rtype: func
    """
    @wraps(func)
    def decorated(*args, **kwargs):
        """
        Calls ``func``; on an expected error, writes the error and exits
        with status code 1.
        """
        try:
            outcome = func(*args, **kwargs)
        except (SceptreException, BotoCoreError, ClientError, Boto3Error,
                TemplateError) as error:
            write(error)
            sys.exit(1)
        else:
            return outcome

    return decorated
async def test_fetch_messages_with_botocoreerror(mock_boto_session_sqs, boto_client_sqs):
    """``fetch_messages`` raises ``ProviderError`` when ``receive_message`` fails.

    NOTE(review): as a coroutine test this needs an asyncio-aware pytest
    plugin or marker - confirm against the project's test configuration.
    """
    with mock_boto_session_sqs:
        error = BotoCoreError()
        # Make the mocked SQS client fail on receive.
        boto_client_sqs.receive_message.side_effect = error

        provider = SQSProvider('queue-name')
        with pytest.raises(ProviderError):
            await provider.fetch_messages()
def lambda_handler(event, context):
    """Build an audio podcast feed from an RSS feed.

    For each feed entry an ``<id>.mp3`` object is expected in the bucket.
    Entries whose file is not there yet are synthesized with Polly and
    uploaded. Finally the generated feed is written to ``podcast.xml`` in the
    same bucket.

    :param event: mapping providing ``rss`` (feed URL) and ``bucket`` (bucket name).
    :param context: not used by this function.
    """
    rss = event['rss']
    bucket_name = event['bucket']
    logging.info("Processing url: %s" % rss)
    logging.info("Using bucket: %s" % bucket_name)

    polly = Session(region_name="us-west-2").client("polly")
    bucket = resource('s3').Bucket(bucket_name)

    logging.info("getting list of existing objects in the given bucket")
    existing_keys = {obj.key for obj in bucket.objects.all()}

    feed = feedparser.parse(rss)
    title = feed['feed']['title']

    fg = FeedGenerator()
    fg.load_extension('podcast')
    fg.title('Audio podcast based on: %s' % title)
    fg.link(href=feed.feed.link, rel='alternate')
    fg.subtitle(feed.feed.description)

    url_template = "http://{bucket}.s3-website.{region}.amazonaws.com/{filename}"

    for entry in get_entries(feed):
        filename = "%s.mp3" % entry['id']

        # Every entry is listed in the feed, whether or not audio exists yet.
        feed_entry = fg.add_entry()
        feed_entry.id(entry['id'])
        feed_entry.title(entry['title'])
        feed_entry.published(entry['published'])
        entry_url = url_template.format(
            bucket=bucket_name,
            filename=filename,
            region=os.environ["AWS_REGION"],
        )
        feed_entry.enclosure(entry_url, 0, 'audio/mpeg')

        if filename not in existing_keys:
            try:
                logging.info("Next entry, size: %d" % len(entry['content']))
                logging.debug("Content: %s" % entry['content'])
                speech = polly.synthesize_speech(
                    Text=entry['content'],
                    OutputFormat="mp3",
                    VoiceId="Joanna",
                )
                with closing(speech["AudioStream"]) as stream:
                    bucket.put_object(Key=filename, Body=stream.read())
            except BotoCoreError as error:
                # A failed entry is logged and the remaining ones continue.
                logging.error(error)
        else:
            logging.info('Article "%s" with id %s already exist, skipping.'
                         % (entry['title'], entry['id']))

    bucket.put_object(Key='podcast.xml', Body=fg.rss_str(pretty=True))