The following 48 code examples, extracted from open-source Python projects, illustrate how to use os.environ.get().
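Before the project examples, here is a minimal sketch of the pattern these snippets share: os.environ.get(key, default) returns the value of an environment variable, or the supplied default (None if omitted) when the variable is unset. The variable names used here (APP_MODE, PORT, DEBUG) are hypothetical, chosen only for illustration.

from os import environ

# Hypothetical variable names, for illustration only.
mode = environ.get('APP_MODE', 'development')   # falls back to 'development' if unset
port = int(environ.get('PORT', '8000'))         # environment values are strings; convert as needed

if environ.get('DEBUG') is None:                # .get() returns None when no default is given
    print('DEBUG is not set; mode=%s, port=%d' % (mode, port))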
def configure_options(self):
    if not environ.get('CLUSTER_NAME'):
        self.logger.fatal('CLUSTER_NAME environment variable is required')
        sys.exit(1)
    if not environ.get('MONGODB_DEVICE_NAME'):
        self.logger.fatal('MONGODB_DEVICE_NAME environment variable is required')
        sys.exit(1)
    self.cluster_name = environ['CLUSTER_NAME']
    self.device_name = environ['MONGODB_DEVICE_NAME']
    self.snapshot_and_exit = environ.get('MONGODB_SNAPSHOT_AND_EXIT', 'false').lower() == 'true'
    self.minutely_snapshots = int(environ.get('MONGODB_MINUTELY_SNAPSHOTS', '360'))
    self.hourly_snapshots = int(environ.get('MONGODB_HOURLY_SNAPSHOTS', '24'))
    self.daily_snapshots = int(environ.get('MONGODB_DAILY_SNAPSHOTS', '30'))
    self.instance_id = self.get_instance_id()
    self.statsd = DogStatsd(host='172.17.0.1', port=8125)
    self.snapshot_frequency = self.get_snapshot_frequency()
def wait_for_snapshot_validation(event, context):
    """
    Gets the snapshots involved in the step function and checks for
    success of the MongoDB validator script.
    """
    session = boto3.session.Session(region_name=event.get("region"))
    ec2 = session.resource("ec2")
    snapshot_ids = [vol["snapshot-id"] for vol in event.get("backup-volumes")]
    for sid in snapshot_ids:
        tags = ec2.Snapshot(sid).tags
        if not tags:
            break
        for tag in tags:
            if tag.get("Key") == VALIDATION_TAG_KEY:
                if tag.get("Success") == "false":
                    raise Exception("Validation failed!")
                else:
                    return event
    raise NotReady("Snapshot not validated yet.")
def read_database(self):
    """
        Read .yml database files provided by 2factorauth guys!
    """
    db_dir = path.join(env.get("DATA_DIR"), "applications") + "/data/*.yml"
    logging.debug("Database folder is {0}".format(db_dir))
    db_files = glob(db_dir)
    logging.debug("Reading database files started")
    for db_file in db_files:
        logging.debug("Reading database file {0}".format(db_file))
        with open(db_file, 'r') as data:
            try:
                websites = yaml.load(data)["websites"]
                for app in websites:
                    if self.is_valid_app(app):
                        self.db.append(app)
            except yaml.YAMLError as error:
                logging.error("Error loading yml file {0} : {1}".format(
                    db_file, str(error)))
            except TypeError:
                logging.error("Not a valid yml file {0}".format(db_file))
    logging.debug("Reading database files finished")
def get_icon(image, size):
    """
        Generate a GdkPixbuf image
        :param image: icon name or image path
        :return: GdkPixbuf Image
    """
    directory = path.join(env.get("DATA_DIR"), "applications", "images") + "/"
    theme = Gtk.IconTheme.get_default()
    if theme.has_icon(path.splitext(image)[0]):
        icon = theme.load_icon(path.splitext(image)[0], size, 0)
    elif path.exists(directory + image):
        icon = GdkPixbuf.Pixbuf.new_from_file(directory + image)
    elif path.exists(image):
        icon = GdkPixbuf.Pixbuf.new_from_file(image)
    else:
        icon = theme.load_icon("image-missing", size, 0)
    if icon.get_width() != size or icon.get_height() != size:
        icon = icon.scale_simple(size, size, GdkPixbuf.InterpType.BILINEAR)
    return icon
def download_file(url, local_filename):
    """
    Downloads a file from an url to a local file.
    Args:
        url (str): url to download from.
        local_filename (str): local file to download to.
    Returns:
        str: file name of the downloaded file.
    """
    r = requests.get(url, stream=True)
    if path.dirname(local_filename) and not path.isdir(path.dirname(local_filename)):
        raise Exception(local_filename)
        makedirs(path.dirname(local_filename))
    with open(local_filename, 'wb') as f:
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                f.write(chunk)
    return local_filename
def process_request(self, request):
    url = match(r'^/django_dev_protector/$', request.path)
    if url and request.method == 'POST':
        import json
        data = json.loads(request.body.decode('utf-8'))
        if data['key'] == settings.SECRET_KEY:
            from .setup import save_status
            environ[PROTECT_STATUS_VARIABLE] = str(data['status'])
            save_status(data['status'])
            return redirect('/')
    if environ.get(PROTECT_STATUS_VARIABLE) == 'True':
        from django.shortcuts import render
        return render(request, TEMPLATE_NAME, {
            'redirect_url': REDIRECT_URL
        })
def main():
    bot = Bot(environ.get('TOKEN'))
    chat_id = environ.get('CHAT_ID')
    post.updatedb()
    new_posts = post.get_posts(sent_only=False)
    if new_posts:
        for p in new_posts:
            try:
                bot.sendMessage(chat_id=chat_id, text=p.text(), parse_mode='HTML')
                post.mark_as_sent(p.uid)
            except BadRequest:
                logging.info('Bad post formatting: %d' % p.uid)
        logger.info('%d new post(s) have been sent!' % len(new_posts))
    else:
        logger.info('No new posts at this time!')
def fetch():
    url = environ.get('URL')
    html = requests.get(url).content
    soup = BeautifulSoup(html, 'html.parser')
    posts = list()
    for link in soup.select('#threads a.title'):
        post = dict()
        post['title'] = link.text
        post['href'] = link.get('href')
        post['uid'] = int(post['href'].replace(url, '')[:6])  # TODO
        posts.append(post)
    return posts
def get_attributes(device_pk):
    """Get dict of numerical attributes"""
    # make request
    selector = 'id:%s' % Device.objects.get(pk=device_pk).manufacturer_id
    url = BASE_URL + selector
    data = make_request(url)[0]

    # assemble result
    result = {
        'brightness': data['brightness'],
        'hue': data['color']['hue'],
        'saturation': data['color']['saturation'],
        'kelvin': data['color']['kelvin'],
    }

    # return
    return result
def test_008_s3(self):
    """Cache on amazon is working properly"""
    if not environ.get('TRAVIS') == 'true':
        # This set of tests can be run just locally with properly set
        # amazon credentials, this will synchronize the xsd files
        # (and others) it does not make sense in travis.
        url = self.test_url_plain
        url_on_s3 = self.s3_url_plain
        new_url = tools.cache_s3(url, self.test_plain)
        self.assertEqual(new_url, url_on_s3,
                         'The url on amazon was not setted properly '
                         'got %s' % new_url)
        new_url = tools.cache_s3(url_on_s3, self.test_plain)
        self.assertEqual(new_url, url_on_s3,
                         'The url on amazon wired not return what I expected'
                         ' properly got %s' % new_url)
        check_s3 = tools.check_s3('NOTEXISTSORNOTPROPERACL', 'url/no/exists')
        self.assertFalse(check_s3, 'checking a non existing bucket fails')
        check_s3 = tools.check_s3('s3.vauxoo.com', 'url/no/exists')
        self.assertFalse(check_s3, 'checking a non existing element fails')
def __conf(section, param, type=None):
    try:
        if type == str or type is None:
            return _config_parser.get(section, param)
        elif type == int:
            return _config_parser.getint(section, param)
        elif type == bool:
            return _config_parser.getboolean(section, param)
        elif type == float:
            return _config_parser.getfloat(section, param)
        else:
            return None
    except (KeyError, configparser.NoSectionError):
        return None
    except:
        print('Error with key {0} in section {1}'.format(param, section))
        sys.exit(1)
def __parseArg(self, arg):
    '''Parse the given arg element to get (and resolve) its name and value.

    * arg -- the arg element

    '''
    name = self.__getAttribute(arg, self.NameAttribute)

    # Grab the default and standard value
    default = arg.attrib.get(self.DefaultAttribute, None)
    value = arg.attrib.get(self.ValueAttribute, default)

    # Any of these attributes may have substitution arguments
    # that need to be resolved
    name = self.__resolveText(name)

    # Only resolve the value if it is defined
    if value is not None:
        value = self.__resolveText(value)

    return name, value

##### ROS launch substitution argument related functions
def __onEnvSubstitutionArg(self, env):
    '''Handle the ROS launch 'env' or 'optenv' substitution argument
    which aims to substitute the value of an environment variable
    inside of some text.

    * package -- the package to find

    '''
    # Determine if a default value was supplied
    parts = env.split(" ")
    if len(parts) == 1:
        #### No default value was supplied
        if parts[0] not in environ:
            raise Exception(
                "Could not find environment variable: '%s'" % env)
        return environ[env]
    else:
        #### A default value was supplied
        env, default = parts
        return environ.get(env, default)
def _environ_cols_linux(fp):  # pragma: no cover

    # import os
    # if fp is None:
    #     try:
    #         fp = os.open(os.ctermid(), os.O_RDONLY)
    #     except:
    #         pass
    try:
        from termios import TIOCGWINSZ
        from fcntl import ioctl
        from array import array
    except ImportError:
        return None
    else:
        try:
            return array('h', ioctl(fp, TIOCGWINSZ, '\0' * 8))[1]
        except:
            try:
                from os import environ
            except ImportError:
                return None
            else:
                return int(environ.get('COLUMNS', 1)) - 1
def __init__(self, client_id, client_secret, redirect_uri):
    proxies = dict(http=environ.get('HTTP_PROXY', ''),
                   https=environ.get('HTTPS_PROXY', ''))
    # some certificates such as netatmo are invalid
    super(ApiManager, self).__init__(
        ServiceInformation('%s/oauth/v2/authorize' % URL_API,
                           '%s/oauth/v2/token' % URL_API,
                           client_id=client_id,
                           client_secret=client_secret,
                           scopes=ApiManager.SCOPES,
                           skip_ssl_verifications=False),
        proxies)
    self.folders = Folders(self)
    self.freespace = Freespace(self)
    self.files = Files(self)
    self.redirect_uri = redirect_uri
def _extract(self, filename, password):
    archive_path = _prepare_archive_at_path(filename)
    if not archive_path:
        return None

    # Extraction.
    extract_path = environ.get("TEMP", "/tmp")
    with ZipFile(archive_path, "r") as archive:
        try:
            archive.extractall(path=extract_path, pwd=password)
        except BadZipfile:
            raise Exception("Invalid Zip file")
        # Try to extract it again, but with a default password
        except RuntimeError:
            try:
                archive.extractall(path=extract_path, pwd="infected")
            except RuntimeError as err:
                raise Exception("Unable to extract Zip file: %s" % err)
        finally:
            self._extract_nested_archives(archive, extract_path, password)
    return archive.namelist()
def get_database_connection():
    global logger

    s3 = boto3.resource('s3')
    metasrcs = ujson.load(
        s3.Object('net-mozaws-prod-us-west-2-pipeline-metadata',
                  'sources.json').get()['Body'])
    creds = ujson.load(
        s3.Object(
            'net-mozaws-prod-us-west-2-pipeline-metadata',
            '%s/write/credentials.json' % metasrcs[DB]['metadata_prefix']
        ).get()['Body'])

    conn = psycopg2.connect(connection_factory=LoggingConnection,
                            host=creds['host'],
                            port=creds.get('port', 5432),
                            user=creds['username'],
                            password=creds['password'],
                            dbname=creds['db_name'])
    conn.initialize(logger)
    return conn
def setup(self, api_host=None, api_key=None, api_secret=None,
          access_token=None, token_issuer_path=None,
          token_issuer_host=None, **kwargs):
    self.api_host = api_host or environ.get('TRUSTPILOT_API_HOST',
                                            'https://api.trustpilot.com')
    self.token_issuer_host = token_issuer_host or self.api_host
    self.access_token = access_token
    self.token_issuer_path = token_issuer_path or environ.get(
        'TRUSTPILOT_API_TOKEN_ISSUER_PATH', "oauth/system-users/token")
    self.hooks = dict()

    if not self.api_host.startswith("http"):
        raise requests.URLRequired(
            "'{}' is not a valid api_host url".format(api_host))

    try:
        self.api_key = api_key or environ['TRUSTPILOT_API_KEY']
        self.api_secret = api_secret or environ.get('TRUSTPILOT_API_SECRET', '')
        self.access_token = access_token
        self.hooks['response'] = self._post_request_callback
    except KeyError as e:
        logging.debug("Not auth setup, missing env-var or setup for {}".format(e))

    return self
def set_resource(self, groupname, resourcename, resource, details=None):
    session = self.sessions.get(details.caller)
    if session is None:
        return
    assert isinstance(session, ExporterSession)

    groupname = str(groupname)
    resourcename = str(resourcename)
    # TODO check if acquired
    print(details)
    pprint(resource)
    action, resource_path = session.set_resource(groupname, resourcename, resource)
    if action is Action.ADD:
        self._add_default_place(groupname)
    yield from self._update_acquired_places(action, resource_path)
    self.save_later()
def connect(self):
    """connects to the database"""
    self.conn = mdb.connect(
        host=self.config["db"]["host"],
        user=self.config["db"]["user"],
        passwd=self.config["db"]["password"],
        db=self.config["db"]["database"],
        port=self.config["db"].get("port", 3306),
        use_unicode=True,
        charset="utf8")

    # checks whether this is a ViewVC database instead of a Bonsai database
    cursor = self.conn.cursor()
    cursor.execute("show tables like 'commits'")
    self.is_viewvc_database = (cursor.rowcount == 1)
    cursor.execute("SET SESSION innodb_lock_wait_timeout = 500")
    cursor.close()
    self.conn.begin()
def extra_data_for_key_tables(self, cursor, column, row, value):
    """provides additional data that should be stored in lookup tables"""
    extra_column = ""
    extra_data = ""
    data = [value]
    if column == "description":
        extra_column = ", hash"
        extra_data = ", %s"
        data.append(len(value))
    elif column == "repository":
        extra_column = ", base_url, repository_url, file_url, commit_url, tracker_url, icon_url"
        extra_data = ", %s, %s, %s, %s, %s, %s"
        data.extend(self.call_setup_repository(row, self.guess_repository_urls(row)))
    elif column == "hash":
        extra_column = ", authorid, committerid, co_when"
        extra_data = ", %s, %s, %s"
        self.fill_id_cache(cursor, "who", row, row["author"])
        self.fill_id_cache(cursor, "who", row, row["committer"])
        data.extend((self.cache.get("who", row["author"]),
                     self.cache.get("who", row["committer"]),
                     row["co_when"]))
    return data, extra_column, extra_data
def __getattr__(self, attr):
    """Return a terminal capability, like bold.

    For example, you can say ``term.bold`` to get the string that turns on
    bold formatting and ``term.normal`` to get the string that turns it off
    again. Or you can take a shortcut: ``term.bold('hi')`` bolds its
    argument and sets everything to normal afterward. You can even combine
    things: ``term.bold_underline_red_on_bright_green('yowzers!')``.

    For a parametrized capability like ``cup``, pass the parameters too:
    ``some_term.cup(line, column)``.

    ``man terminfo`` for a complete list of capabilities.

    Return values are always Unicode.

    """
    resolution = (self._resolve_formatter(attr) if self.does_styling
                  else NullCallableString())
    setattr(self, attr, resolution)  # Cache capability codes.
    return resolution
def _height_and_width(self):
    """Return a tuple of (terminal height, terminal width).

    Start by trying TIOCGWINSZ (Terminal I/O-Control: Get Window Size),
    falling back to environment variables (LINES, COLUMNS), and returning
    (None, None) if those are unavailable or invalid.

    """
    # tigetnum('lines') and tigetnum('cols') update only if we call
    # setupterm() again.
    for descriptor in self._init_descriptor, sys.__stdout__:
        try:
            return struct.unpack(
                'hhhh', ioctl(descriptor, TIOCGWINSZ, '\000' * 8))[0:2]
        except IOError:
            # when the output stream or init descriptor is not a tty, such
            # as when stdout is piped to another program, e.g. tee(1),
            # these ioctls will raise IOError
            pass
    try:
        return int(environ.get('LINES')), int(environ.get('COLUMNS'))
    except TypeError:
        return None, None
def _get_uid(self):
    old_lang = environ.get('LANG')
    environ['LANG'] = 'C'
    ioreg_process = Popen(["ioreg", "-l"], stdout=PIPE)
    grep_process = Popen(["grep", "IOPlatformSerialNumber"],
                         stdin=ioreg_process.stdout, stdout=PIPE)
    ioreg_process.stdout.close()
    output = grep_process.communicate()[0]
    if old_lang is None:
        environ.pop('LANG')
    else:
        environ['LANG'] = old_lang
    if output:
        return output.split()[3][1:-1]
    else:
        return None
def _get_platform(self):
    if self._platform_android is None:
        # ANDROID_ARGUMENT and ANDROID_PRIVATE are 2 environment variables
        # from python-for-android project
        self._platform_android = 'ANDROID_ARGUMENT' in environ

    if self._platform_ios is None:
        self._platform_ios = (environ.get('KIVY_BUILD', '') == 'ios')

    # On android, _sys_platform return 'linux2', so prefer to check the
    # import of Android module than trying to rely on _sys_platform.
    if self._platform_android is True:
        return 'android'
    elif self._platform_ios is True:
        return 'ios'
    elif _sys_platform in ('win32', 'cygwin'):
        return 'win'
    elif _sys_platform == 'darwin':
        return 'macosx'
    elif _sys_platform[:5] == 'linux':
        return 'linux'
    return 'unknown'
def get_key(key=None, keyfile=None):
    """ returns a key given either its value, a path to it on the filesystem
    or as last resort it checks the environment variable CRYPTOYAML_SECRET
    """
    if key is None:
        if keyfile is None:
            key = environ.get('CRYPTOYAML_SECRET')
            if key is None:
                raise MissingKeyException(
                    '''You must either provide a key value,'''
                    ''' a path to a key or its value via the environment variable '''
                    ''' CRYPTOYAML_SECRET''')
            else:
                key = key.encode('utf-8')
        else:
            key = open(keyfile, 'rb').read()
    return key
def main():
    '''Send the RPC command to the server and print the result.'''
    parser = argparse.ArgumentParser('Send electrumx an RPC command')
    parser.add_argument('-p', '--port', metavar='port_num', type=int,
                        help='RPC port number')
    parser.add_argument('command', nargs=1, default=[],
                        help='command to send')
    parser.add_argument('param', nargs='*', default=[],
                        help='params to send')
    args = parser.parse_args()

    port = args.port
    if port is None:
        port = int(environ.get('RPC_PORT', 8000))

    # Get the RPC request.
    method = args.command[0]
    params = args.param
    if method in ('log', 'disconnect'):
        params = [params]

    rpc_send_and_wait(port, method, params)
def _make_postgres_string(password):
    """Create postgres connection string.

    It's parametrized, so it's possible to create either quoted or unquoted
    version of connection string. Note that it's outside of class since there
    is no simple way how to call it inside the class without class initialization.

    :param password: password which will be embedded into Postgres connection string
    :return: fully working postgres connection string
    """
    connection = 'postgresql://{user}:{password}@{pgbouncer_host}:{pgbouncer_port}' \
                 '/{database}?sslmode=disable'. \
        format(user=environ.get('POSTGRESQL_USER'),
               password=password,
               pgbouncer_host=environ.get('PGBOUNCER_SERVICE_HOST', 'coreapi-pgbouncer'),
               pgbouncer_port=environ.get('PGBOUNCER_SERVICE_PORT', '5432'),
               database=environ.get('POSTGRESQL_DATABASE'))
    return connection
def get(self, number):
    '''Gets an entry, or return None if the entry does not exist.'''
    # Do a binary search of the interval tree
    bmin = 0
    bmax = len(self.intervals) - 1
    while True:
        if bmax < bmin:
            return None
        index = (bmax + bmin) // 2
        on = self.intervals[index]
        if number < on[0][0]:
            bmax = index - 1
        elif number >= on[0][1]:
            bmin = index + 1
        else:
            return on[1]
def get_logentry_at_time(*args):
    '''
    If one argument is provided, get the LogEntry corresponding to the
    given unix time (in milliseconds).

    If two arguments are provided, get the LogEntry at clipid (first
    argument) and time in that clip in milliseconds (second argument).
    '''
    if len(args) == 1:
        unixtime = args[0]
    else:
        unixtime = timestamp_from_video(*args)
    for logentry in get_logentries():
        if logentry.starttime <= unixtime < logentry.endtime:
            return logentry
def get_data_home(ramp_kits_home=None):
    """Return the path of the ramp-kits data dir.

    This folder is used to fetch the up-to-date ramp-kits.

    By default the data dir is set to a folder named 'ramp-kits' in the
    user home folder. Alternatively, it can be set by the 'RAMP-KITS'
    environment variable or programmatically by giving an explicit folder
    path. The '~' symbol is expanded to the user home folder.

    If the folder does not already exist, it is automatically created.
    """
    if ramp_kits_home is None:
        ramp_kits_home = environ.get('RAMP-KITS', join('~', 'ramp-kits'))
    ramp_kits_home = expanduser(ramp_kits_home)
    if not exists(ramp_kits_home):
        makedirs(ramp_kits_home)
    return ramp_kits_home
def getDynamoDBConnection(self, config=None, endpoint=None, port=None,
                          local=False, use_instance_metadata=False):
    if not config:
        config = {'region_name': 'us-west-2'}
    params = {
        'region_name': config.get('region_name', 'cn-north-1')
    }
    if local:
        endpoint_url = 'http://{endpoint}:{port}'.format(endpoint=endpoint, port=port)
        params['endpoint_url'] = endpoint_url
        db = boto3.resource('dynamodb', **params)
    else:
        if not config or not isinstance(config, dict):
            raise ParameterException("Invalid config")
        params.update(config)
        db = boto3.resource('dynamodb', **params)
    return db
def update_item_by_set():
    Test.create(realname='gs01', score=100, order_score=99.99, date_created=now)
    item = Test.get(realname='gs01', score=100)

    item.update(Test.order_score.set(80))
    print('set')
    assert item.order_score == 80

    item.update(Test.order_score.set(78.7, attr_label='os'))
    print('set with attr_label')
    assert item.order_score == 78.7

    item.update(Test.order_score.set(78.7, if_not_exists='order_score'))
    print('set with if_not_exists')
    assert item.order_score == 78.7

    item.update(Test.order_score.set(10, if_not_exists='ids[0]'))
    assert item.order_score == 10

    print('ids', item.ids, type(item.ids))
    item.update(ids=[12])
    print('ids', item.ids, type(item.ids))
    item.update(Test.ids.set([100], list_append=('ids', -1)))
    print('set with list_append')
    assert item.ids[-1] == 100

    item.update(Test.order_score.set(78.7, attr_label='os'), doc={'a': 'bbb'})
    print('set with attr_label and update_field')
    assert item.doc['a'] == 'bbb'
def rrid_resolver_xml(exact, found_rrids):
    print('\t' + exact)
    resolver_uri = 'https://scicrunch.org/resolver/%s.xml' % exact
    r = requests.get(resolver_uri)
    status_code = r.status_code
    xml = r.content
    print(status_code)
    found_rrids[exact] = status_code
    return xml, status_code, resolver_uri
def initialize_snapshot_tags(event, context):
    """
    Event comes in with snapshot info, make sure we're working with a
    snapshot we're supposed to and wait for it to be done snapshotting.
    """
    session = boto3.session.Session(region_name=event.get("region"))
    ec2 = session.resource("ec2")
    snapshot_ids = [vol["snapshot-id"] for vol in event.get("backup-volumes")]
    for sid in snapshot_ids:
        if ec2.Snapshot(sid).state != "completed":
            raise NotReady("Snapshot not ready")
    return event
def terminate_instances(event, context):
    """
    Terminate instance identified in the event
    """
    session = boto3.session.Session(region_name=event.get("region"))
    ec2 = session.resource("ec2")
    ec2.Instance(event["instance-id"]).terminate()
    return event
def are_snapshots_tagged(region, snapshot_ids):
    """Check the current snapshot tags to prevent multiple snapshots."""
    session = boto3.session.Session(region_name=region)
    ec2 = session.resource('ec2')
    for sid in snapshot_ids:
        snapshot = ec2.Snapshot(sid)
        current_tags = snapshot.tags
        if not current_tags:
            continue
        for tag in current_tags:
            if tag.get("Key") == VALIDATION_TAG_KEY:
                return True
    return False
def index_page():
    if request.args.get('epub'):
        return create_epub()
    else:
        return create_page()
def create_page():
    page = int(request.args.get("page", "0"))
    limit = int(request.args.get("limit", "10"))
    offset = page * limit
    toots, count, count_all = get_toots(offset, limit)
    accounts = Account.query.order_by(Account.username)
    instances = Instance.query.order_by(Instance.domain)

    blacklist_status = True if request.args.get('blacklisted', None) else False
    if request.args.get('blacklisted') != 'ignore':
        accounts = accounts.filter(Account.blacklisted == blacklist_status)
        instances = instances.filter(Instance.blacklisted == blacklist_status)

    pagination = {'page': page + 1, 'limit': limit}
    pagination['next'] = "/?page=%s&" % (page + 1)
    pagination['previous'] = "/?page=%s&" % (page - 1)
    for key, value in request.args.iteritems():
        if not key == "page":
            pagination['next'] += "&%s=%s" % (key, value)
            pagination['previous'] += "&%s=%s" % (key, value)
    if count < limit:
        pagination.pop('next')
    if page == 0:
        pagination.pop('previous')
    pagination['page_count'] = int(count_all / limit) + 1

    return render_template('index.html', toots=toots, accounts=accounts,
                           instances=instances, pagination=pagination)
def is_ci():
    ''' Check if boss is running in a Continuous Integration (CI) environment. '''
    return bool(
        env.get('BOSS_RUNNING') == 'true' and (
            (env.get('CI') == 'true') or
            (env.get('CONTINUOUS_INTEGRATION') == 'true')
        )
    )
def is_travis():
    ''' Check if boss is running under Travis CI. '''
    return is_ci() and env.get('TRAVIS') == 'true'
def get_ci_link(config):
    ''' Get CI build link for the current build deployment. '''
    if is_travis():
        base_url = config['ci']['base_url'].rstrip('/')
        return ci.TRAVIS_BUILD_URL.format(
            base_url=base_url,
            repo_slug=env.get('TRAVIS_REPO_SLUG'),
            build_id=env.get('TRAVIS_BUILD_ID')
        )

    # Other CI providers aren't supported at the moment.
    # TODO: Add support for more providers.
    return None
def test_intersect_proximity():
    ec = environment.EnvironmentalCorrelation()
    ec.intersect_proximity(mining_filename, vector_filename,
                           proximity, test_filename)
    expected = spectral.open_image(correlated_filename)
    actual = spectral.open_image(test_filename)
    assert numpy.array_equal(expected.asarray(), actual.asarray())
    assert actual.metadata.get('description') == \
        'COAL ' + pycoal.version + ' environmental correlation image.'
    assert expected.metadata.get('class names') == actual.metadata.get('class names')
    assert expected.metadata.get('map info') == actual.metadata.get('map info')