我们从Python开源项目中，提取了以下36个代码示例，用于说明如何使用botocore.session()。
def make_session(identity_profile):
    """Build a botocore session and a boto3 session for a profile.

    Returns a ``(botocore_session, boto3_session, error)`` tuple. On success
    the error slot is ``None``. When boto3 reports that the profile cannot be
    found, a diagnostic is printed to stderr and
    ``(None, None, USER_RECOVERABLE_ERROR)`` is returned instead.
    """
    core_session = botocore.session.Session(profile=identity_profile)
    try:
        boto3_session = boto3.session.Session(botocore_session=core_session)
    except botocore.exceptions.ProfileNotFound as err:
        print(str(err), file=sys.stderr)
        if not core_session.available_profiles:
            # Nothing configured at all: point the user at `aws configure`.
            print("You have no AWS profiles configured. Please run 'aws "
                  "configure --profile identity' to get started.",
                  file=sys.stderr)
        else:
            # Profiles exist, so list them and mention the -i flag.
            print("Available profiles: %s" %
                  ", ".join(sorted(core_session.available_profiles)),
                  file=sys.stderr)
            print("You can specify a profile by passing it with the -i "
                  "command line flag.", file=sys.stderr)
        return None, None, USER_RECOVERABLE_ERROR
    return core_session, boto3_session, None
def one_mfa(args, credentials):
    """Request temporary STS credentials and hand them to the credentials file updater.

    Creates sessions for ``args.identity_profile``; unless
    ``args.token_code`` is ``'skip'``, an MFA serial number and token code are
    added to the STS request. Calls ``assume_role`` when
    ``args.role_to_assume`` is set (duration capped at 3600 seconds) and
    ``get_session_token`` otherwise.

    Returns ``OK`` on success, or the error code from session creation,
    code acquisition, or an ``AccessDenied`` response. Any other
    ``ClientError`` is re-raised.
    """
    session, session3, err = make_session(args.identity_profile)
    if err:
        return err

    # Testing mode short-circuits before any STS call is made.
    if "AWSMFA_TESTING_MODE" in os.environ:
        use_testing_credentials(args, credentials)
        return OK

    request = {}
    if args.token_code != 'skip':
        serial_number, token_code, err = acquire_code(args, session, session3)
        if err is not OK:
            return err
        request['SerialNumber'] = serial_number
        request['TokenCode'] = token_code

    sts = session3.client('sts')
    try:
        if args.role_to_assume:
            request['DurationSeconds'] = min(args.duration, 3600)
            request['RoleArn'] = args.role_to_assume
            request['RoleSessionName'] = args.role_session_name
            response = sts.assume_role(**request)
        else:
            request['DurationSeconds'] = args.duration
            response = sts.get_session_token(**request)
    except botocore.exceptions.ClientError as client_err:
        # Only an access-denied response is treated as user-recoverable.
        if client_err.response["Error"]["Code"] != "AccessDenied":
            raise
        print(str(client_err), file=sys.stderr)
        return USER_RECOVERABLE_ERROR

    issued = response['Credentials']
    print_expiration_time(issued['Expiration'])
    update_credentials_file(args.aws_credentials,
                            args.target_profile,
                            args.identity_profile,
                            credentials,
                            issued)
    return OK
def acquire_code(args, session, session3):
    """Return the user's MFA serial number, token code, and an error code.

    If no serial number can be found, a message is printed to stderr and
    ``(None, None, USER_RECOVERABLE_ERROR)`` is returned. When
    ``args.token_code`` is ``None`` the user is prompted repeatedly until a
    six-character code is entered; a code supplied in ``args`` is used as-is.
    """
    serial_number = find_mfa_for_user(args.serial_number, session, session3)
    if not serial_number:
        print("There are no MFA devices associated with this user.",
              file=sys.stderr)
        return None, None, USER_RECOVERABLE_ERROR

    token_code = args.token_code
    if token_code is None:
        # Keep prompting until the entry is exactly six characters long.
        while True:
            token_code = getpass.getpass("MFA Token Code: ")
            if token_code is not None and len(token_code) == 6:
                break
    return serial_number, token_code, OK
def rotate(args, credentials):
    """Rotate the identity profile's AWS access key pair.

    Looks up the current access key id stored for ``args.identity_profile``,
    deletes the matching IAM access key, creates a new key pair and passes it
    to ``update_credentials_file`` for the identity profile.

    Returns the error from ``make_session`` if session creation fails,
    otherwise ``OK``.
    """
    old_key_id = credentials.get(args.identity_profile, 'aws_access_key_id')

    # Create new sessions using the MFA credentials.
    _, session3, err = make_session(args.target_profile)
    if err:
        return err

    iam = session3.resource('iam')

    # Find the AccessKey corresponding to the identity profile and delete it.
    # next() is called without a default, so StopIteration propagates when no
    # key matches.
    matching_keys = (key for key in iam.CurrentUser().access_keys.all()
                     if key.access_key_id == old_key_id)
    old_key = next(matching_keys)
    old_key.delete()

    # Create the new access key pair.
    new_key_pair = session3.client('iam').create_access_key()["AccessKey"]

    print("Rotating from %s to %s." % (old_key.access_key_id,
                                       new_key_pair['AccessKeyId']),
          file=sys.stderr)
    update_credentials_file(args.aws_credentials,
                            args.identity_profile,
                            args.identity_profile,
                            credentials,
                            new_key_pair)
    print("%s profile updated." % args.identity_profile, file=sys.stderr)
    return OK
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
             aws_session_token=None, region_name=None,
             botocore_session=None, profile_name=None):
    """Wrap a botocore session and apply the supplied overrides.

    Uses ``botocore_session`` when given, otherwise the default botocore
    session. The user agent is rebranded only if it is still the stock
    ``'Botocore'`` one. Profile, credentials and region are applied to the
    underlying session when provided, then the resource factory, loader and
    default handlers are set up.
    """
    if botocore_session is None:
        # Create a new default session
        self._session = botocore.session.get_session()
    else:
        self._session = botocore_session
    core = self._session

    # Setup custom user-agent string if it isn't already customized
    if core.user_agent_name == 'Botocore':
        botocore_info = 'Botocore/{0}'.format(core.user_agent_version)
        if core.user_agent_extra:
            core.user_agent_extra += ' ' + botocore_info
        else:
            core.user_agent_extra = botocore_info
        core.user_agent_name = 'Boto3'
        core.user_agent_version = boto3.__version__

    if profile_name is not None:
        core.set_config_variable('profile', profile_name)

    # Any one of the three credential parts triggers an explicit override.
    if aws_access_key_id or aws_secret_access_key or aws_session_token:
        core.set_credentials(aws_access_key_id,
                             aws_secret_access_key,
                             aws_session_token)

    if region_name is not None:
        core.set_config_variable('region', region_name)

    emitter = core.get_component('event_emitter')
    self.resource_factory = ResourceFactory(emitter)
    self._setup_loader()
    self._register_default_handlers()
def events(self):
    """
    Return the underlying session's ``'event_emitter'`` component.
    """
    emitter = self._session.get_component('event_emitter')
    return emitter
def available_profiles(self):
    """
    Return the ``available_profiles`` value of the underlying session.
    """
    profiles = self._session.available_profiles
    return profiles
def get_credentials(self):
    """
    Return the credentials object of the underlying session.

    This delegates to the wrapped session's ``get_credentials()``; whether
    the credentials are loaded lazily or served from a cache is decided
    there.
    """
    creds = self._session.get_credentials()
    return creds
def client(*args, **kwargs):
    """Create a client on the shared session, forwarding all arguments."""
    aws = session()
    return aws.client(*args, **kwargs)
def credentials(*args, **kwargs):
    """Return the shared session's credentials, forwarding all arguments."""
    aws = session()
    return aws.get_credentials(*args, **kwargs)
def resource(*args, **kwargs):
    """Create a resource on the shared session, forwarding all arguments."""
    aws = session()
    return aws.resource(*args, **kwargs)
def session():
    """Return a new boto3 session backed by a cached botocore session.

    The botocore session is created once, stored under ``'botocore'`` in
    ``_sessions``, and given a JSON file cache on its ``'assume-role'``
    credential provider. Every call builds a fresh ``boto3.Session`` around
    that cached botocore session, passing the remaining ``aws`` config
    entries as keyword arguments.
    """
    # Work on a shallow copy: popping 'profile_name' from the object returned
    # by config.get() would otherwise strip the key from the shared
    # configuration (assuming config.get returns the live mapping).
    aws_config = dict(config.get('aws') or {})
    profile = aws_config.pop('profile_name', None)

    if 'botocore' not in _sessions:
        print('[ssha] creating aws session')
        _sessions['botocore'] = botocore.session.Session(profile=profile)
        resolver = _sessions['botocore'].get_component('credential_provider')
        provider = resolver.get_provider('assume-role')
        provider.cache = boto3_session_cache.JSONFileCache()

    return boto3.Session(botocore_session=_sessions['botocore'], **aws_config)
def test_can_create_deployer_from_factory_function():
    """create_default_deployer should return a Deployer instance."""
    botocore_session = botocore.session.get_session()
    created = deployer.create_default_deployer(botocore_session)
    assert isinstance(created, deployer.Deployer)
def __init__(self, session=None, policy_actions=None):
    # type: (Any, Dict[str, str]) -> None
    """Store the session and policy actions, filling in defaults.

    A missing ``session`` is replaced by ``botocore.session.get_session()``
    and missing ``policy_actions`` by ``load_policy_actions()``.
    """
    self._session = (
        botocore.session.get_session() if session is None else session
    )
    self._policy_actions = (
        load_policy_actions() if policy_actions is None else policy_actions
    )
def test_boto3(pyi_builder):
    """Run a script that creates every boto3 client and resource.

    The script is passed as source text to ``pyi_builder.test_source``. It
    has to keep its line structure: collapsed onto a single line it is not
    valid Python, and everything after the first ``#`` becomes a comment.
    """
    pyi_builder.test_source(
        """
        import boto3
        session = boto3.Session(region_name='us-west-2')

        # verify all clients
        for service in session.get_available_services():
            session.client(service)

        # verify all resources
        for resource in session.get_available_resources():
            session.resource(resource)
        """)
def test_botocore(pyi_builder):
    """Run a script that creates a botocore client for every service.

    The script is passed as source text to ``pyi_builder.test_source``. It
    has to keep its line structure: collapsed onto a single line it is not
    valid Python, and everything after the first ``#`` becomes a comment.
    """
    pyi_builder.test_source(
        """
        import botocore
        from botocore.session import Session
        session = Session()

        # verify all services
        for service in session.get_available_services():
            session.create_client(service, region_name='us-west-2')
        """)
def add_regional_client(self, region_name):
    """Add a KMS client for the given region unless one is already stored.

    :param str region_name: AWS Region ID (ex: us-east-1)
    """
    if region_name in self._regional_clients:
        # A client for this region already exists; leave it untouched.
        return

    regional_session = boto3.session.Session(
        region_name=region_name,
        botocore_session=self.config.botocore_session,
    )
    self._regional_clients[region_name] = regional_session.client('kms')
def __init__(self, region_name, lambda_function_name):
    """Store the target region and function name and create a Lambda client.

    A botocore session is obtained via ``botocore.session.get_session()``
    and used to create the ``'lambda'`` client for ``region_name``.
    ``TIME_LIMIT`` starts out as ``True``.
    """
    botocore_session = botocore.session.get_session()
    self.session = botocore_session
    self.region_name = region_name
    self.lambda_function_name = lambda_function_name
    self.lambclient = botocore_session.create_client(
        'lambda', region_name=region_name)
    self.TIME_LIMIT = True