The following code examples, collected from open-source Python projects, illustrate how to use `os.environ` (note: `os.environ` is a mapping, not a callable).
def test_zipfile_timestamp():
    """Check that SOURCE_DATE_EPOCH pins the timestamps inside a wheel zip.

    An environment variable can be used to influence the timestamp on
    TarInfo objects inside the zip (see issue #143).  TemporaryDirectory
    is not a context manager under Python 3, hence temporary_directory().
    """
    with temporary_directory() as tempdir:
        # Create a few small files to be archived.
        for name in ('one', 'two', 'three'):
            file_path = os.path.join(tempdir, name)
            with codecs.open(file_path, 'w', encoding='utf-8') as handle:
                handle.write(name + '\n')
        base_name = os.path.join(tempdir, 'dummy')
        # 315576060 corresponds to the earliest date representable in
        # TarInfos: 1980-01-01.
        with environ('SOURCE_DATE_EPOCH', '315576060'):
            wheel_path = wheel.archive.make_wheelfile_inner(base_name, tempdir)
        with readable_zipfile(wheel_path) as archive:
            for entry in archive.infolist():
                assert entry.date_time[:3] == (1980, 1, 1)
def _warn_unsafe_extraction_path(path):
    """
    If the default extraction path is overridden and set to an insecure
    location, such as /tmp, it opens up an opportunity for an attacker to
    replace an extracted file with an unauthorized payload. Warn the user
    if a known insecure location is used. See Distribute #375 for details.
    """
    on_windows = os.name == 'nt'
    if on_windows and not path.startswith(os.environ['windir']):
        # On Windows, permissions are generally restrictive by default and
        # temp directories are not writable by other users; skip the warning.
        return
    mode = os.stat(path).st_mode
    if mode & (stat.S_IWOTH | stat.S_IWGRP):
        msg = ("%s is writable by group/others and vulnerable to attack "
               "when "
               "used with get_resource_filename. Consider a more secure "
               "location (set with .set_extraction_path or the "
               "PYTHON_EGG_CACHE environment variable)." % path)
        warnings.warn(msg, UserWarning)
def _pythonpath():
    """Yield the non-empty entries of $PYTHONPATH (lazy, like the
    original filter object)."""
    raw = os.environ.get('PYTHONPATH', '')
    return filter(None, raw.split(os.pathsep))
def initialize (self, sadFile):
    """Install the SAD file in the domain and locate its application factory.

    :param sadFile: path of the SAD XML (appended to $SDRROOT/dom).
    Raises KeyError if no application factory matches the SAD's id.
    """
    self.app_cnt = 0
    if self.__timeout is not None:
        # Propagate the component binding timeout to the domain manager.
        self.domMgr.configure([CF.DataType('COMPONENT_BINDING_TIMEOUT', to_any(self.__timeout))])
    try:
        self.domMgr.installApplication(sadFile)
    except CF.DomainManager.ApplicationAlreadyInstalled:
        # Already installed is fine; reuse the existing installation.
        pass
    domRoot = os.path.join(os.environ["SDRROOT"], "dom")
    # NOTE(review): plain string concatenation (not os.path.join) —
    # presumably sadFile starts with a path separator; confirm.
    sad = ossie.parsers.sad.parse(domRoot + sadFile)
    app_id = sad.get_id()
    # Scan the domain's installed factories for the one matching the SAD id.
    for appFact in self.domMgr._get_applicationFactories():
        if appFact._get_identifier() == app_id:
            self.appFact = appFact
            return
    raise KeyError, "Couldn't find app factory"
def setUp(self):
    """Create a temporary file-server directory holding signed
    keys/versions metadata and point the updater client at it."""
    # NamedTemporaryFile is only used to obtain a unique path; the file
    # itself is closed (removed) and the path reused as a directory.
    tempFile = tempfile.NamedTemporaryFile()
    self.fileServerDir = tempFile.name
    tempFile.close()
    os.mkdir(self.fileServerDir)
    os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
    privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
                                    encoding='base64')
    # Sign the canonical (sorted-keys) JSON so the signature is stable.
    signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
                                encoding='base64').decode()
    VERSIONS['signature'] = signature
    keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
    with gzip.open(keysFilePath, 'wb') as keysFile:
        keysFile.write(json.dumps(KEYS, sort_keys=True))
    versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
    with gzip.open(versionsFilePath, 'wb') as versionsFile:
        versionsFile.write(json.dumps(VERSIONS, sort_keys=True))
    # Set before importing wxupdatedemo.config — presumably the module
    # reads this variable at import time; confirm.
    os.environ['WXUPDATEDEMO_TESTING'] = 'True'
    from wxupdatedemo.config import CLIENT_CONFIG
    self.clientConfig = CLIENT_CONFIG
    self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
def setUp(self):
    """Build a signed local file-server fixture, configure the updater
    client, and pin the client's application name."""
    # NamedTemporaryFile only supplies a unique path; close (delete) the
    # file and recreate the path as a directory.
    tempFile = tempfile.NamedTemporaryFile()
    self.fileServerDir = tempFile.name
    tempFile.close()
    os.mkdir(self.fileServerDir)
    os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
    privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
                                    encoding='base64')
    # Sign the sorted-keys JSON serialization so the payload is canonical.
    signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
                                encoding='base64').decode()
    VERSIONS['signature'] = signature
    keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
    with gzip.open(keysFilePath, 'wb') as keysFile:
        keysFile.write(json.dumps(KEYS, sort_keys=True))
    versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
    with gzip.open(versionsFilePath, 'wb') as versionsFile:
        versionsFile.write(json.dumps(VERSIONS, sort_keys=True))
    # Must be set before the import below — presumably wxupdatedemo.config
    # inspects it at import time; confirm.
    os.environ['WXUPDATEDEMO_TESTING'] = 'True'
    from wxupdatedemo.config import CLIENT_CONFIG
    self.clientConfig = CLIENT_CONFIG
    self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
    self.clientConfig.APP_NAME = APP_NAME
def paths_on_pythonpath(paths):
    """
    Add the indicated paths to the head of the PYTHONPATH environment
    variable so that subprocesses will also see the packages at these
    paths.

    Do this in a context that restores the value on exit.
    """
    # Sentinel distinguishes "unset" from "set to empty string".
    sentinel = object()
    saved = os.environ.get('PYTHONPATH', sentinel)
    existing = os.environ.get('PYTHONPATH', '')
    try:
        head = os.pathsep.join(paths)
        combined = os.pathsep.join(p for p in (head, existing) if p)
        if combined:
            os.environ['PYTHONPATH'] = combined
        yield
    finally:
        if saved is sentinel:
            os.environ.pop('PYTHONPATH', None)
        else:
            os.environ['PYTHONPATH'] = saved
def to_config(config_cls, environ=os.environ):
    """Build *config_cls* from environment variables.

    Variable names are derived as PREFIX_SUBPREFIX_FIELD (upper-cased);
    an entry's metadata may override the name or supply a default.
    """
    app_prefix = (config_cls._prefix,) if config_cls._prefix else ()

    def default_get(environ, metadata, prefix, name):
        entry = metadata[CNF_KEY]
        if entry.name is not None:
            var = entry.name
        else:
            var = "_".join(app_prefix + prefix + (name,)).upper()
        log.debug("looking for env var '%s'." % (var,))
        value = environ.get(var, entry.default)
        if value is RAISE:
            # No value in the environment and no default configured.
            raise MissingEnvValueError(var)
        return value

    return _to_config(config_cls, default_get, environ, ())
def _to_config(config_cls, default_get, environ, prefix):
    """Recursively resolve every config-annotated attr field of
    *config_cls* and instantiate it with the resolved values."""
    resolved = {}
    for field in attr.fields(config_cls):
        try:
            entry = field.metadata[CNF_KEY]
        except KeyError:
            # Fields without config metadata are skipped entirely.
            continue
        if entry.sub_cls is None:
            getter = entry.callback or default_get
            value = getter(environ, field.metadata, prefix, field.name)
        else:
            # Nested config class: descend with this field name appended
            # to the prefix.
            value = _to_config(
                entry.sub_cls, default_get, environ, prefix + (field.name,)
            )
        resolved[field.name] = value
    return config_cls(**resolved)
def get_by_cluster_id(self, cluster_id):
    """Return the repo record for *cluster_id* with a live git Repo
    attached, cloning the repository if the local checkout is missing.

    :param cluster_id: environment (cluster) id to look up.
    :returns: model instance with ``repo`` set, or None if not found.
    """
    instance = db().query(self.model).\
        filter(self.model.env_id == cluster_id).first()
    if instance is not None:
        try:
            instance.repo = Repo(os.path.join(const.REPOS_DIR,
                                              instance.repo_name))
        except exc.NoSuchPathError:
            logger.debug("Repo folder does not exist. Cloning repo")
            # Recreate the deploy key before cloning over SSH.
            self._create_key_file(instance.repo_name, instance.user_key)
            if instance.user_key:
                # GIT_SSH points git at the wrapper that uses the key file.
                os.environ['GIT_SSH'] = \
                    self._get_ssh_cmd(instance.repo_name)
            repo_path = os.path.join(const.REPOS_DIR, instance.repo_name)
            repo = Repo.clone_from(instance.git_url, repo_path)
            instance.repo = repo
    return instance
def create(self, data):
    """Create a repo record and clone its repository.

    Any pre-existing checkout directory for the same repo name is removed
    first so the clone starts clean.

    :param data: dict with at least 'repo_name' and 'git_url'; optional
        'user_key' contents for SSH access.
    :returns: created model instance with ``repo`` set to the clone.
    """
    if not os.path.exists(const.REPOS_DIR):
        os.mkdir(const.REPOS_DIR)
    repo_path = os.path.join(const.REPOS_DIR, data['repo_name'])
    if os.path.exists(repo_path):
        logger.debug('Repo directory exists. Removing...')
        shutil.rmtree(repo_path)
    user_key = data.get('user_key', '')
    if user_key:
        self._create_key_file(data['repo_name'], user_key)
        # GIT_SSH makes git use the generated key wrapper for the clone.
        os.environ['GIT_SSH'] = self._get_ssh_cmd(data['repo_name'])
    repo = Repo.clone_from(data['git_url'], repo_path)
    instance = super(GitRepo, self).create(data)
    instance.repo = repo
    return instance
def post(self):
    """Handle the login form POST: authenticate against the docklet
    backend, then set the jupyter auth cookie and fill the session."""
    if (request.form['username']):
        data = {"user": request.form['username'],
                "key": request.form['password']}
        result = dockletRequest.unauthorizedpost('/login/', data)
        # Backend signals success with the string "true".
        ok = result and result.get('success', None)
        if (ok and (ok == "true")):
            # set cookie:docklet-jupyter-cookie for jupyter notebook
            resp = make_response(
                redirect(request.args.get('next', None) or '/dashboard/'))
            app_key = os.environ['APP_KEY']
            resp.set_cookie(
                'docklet-jupyter-cookie',
                cookie_tool.generate_cookie(request.form['username'],
                                            app_key))
            # set session for docklet
            session['username'] = request.form['username']
            session['nickname'] = result['data']['nickname']
            session['description'] = result['data']['description']
            session['avatar'] = '/static/avatar/' + result['data']['avatar']
            session['usergroup'] = result['data']['group']
            session['status'] = result['data']['status']
            session['token'] = result['data']['token']
            return resp
        else:
            return redirect('/login/')
    else:
        # Empty username: back to the login form.
        return redirect('/login/')
def get(self):
    """Handle external-auth GET: ask the backend to authenticate a
    generated external-auth request, then set cookie and session."""
    form = external_generate.external_auth_generate_request()
    result = dockletRequest.unauthorizedpost('/external_login/', form)
    # Backend signals success with the string "true".
    ok = result and result.get('success', None)
    if (ok and (ok == "true")):
        # set cookie:docklet-jupyter-cookie for jupyter notebook
        resp = make_response(
            redirect(request.args.get('next', None) or '/dashboard/'))
        app_key = os.environ['APP_KEY']
        resp.set_cookie(
            'docklet-jupyter-cookie',
            cookie_tool.generate_cookie(result['data']['username'], app_key))
        # set session for docklet
        session['username'] = result['data']['username']
        session['nickname'] = result['data']['nickname']
        session['description'] = result['data']['description']
        session['avatar'] = '/static/avatar/' + result['data']['avatar']
        session['usergroup'] = result['data']['group']
        session['status'] = result['data']['status']
        session['token'] = result['data']['token']
        return resp
    else:
        return redirect('/login/')
def post(self):
    """Handle external-auth POST: same flow as the GET variant —
    authenticate the generated request, then set cookie and session."""
    form = external_generate.external_auth_generate_request()
    result = dockletRequest.unauthorizedpost('/external_login/', form)
    # Backend signals success with the string "true".
    ok = result and result.get('success', None)
    if (ok and (ok == "true")):
        # set cookie:docklet-jupyter-cookie for jupyter notebook
        resp = make_response(
            redirect(request.args.get('next', None) or '/dashboard/'))
        app_key = os.environ['APP_KEY']
        resp.set_cookie(
            'docklet-jupyter-cookie',
            cookie_tool.generate_cookie(result['data']['username'], app_key))
        # set session for docklet
        session['username'] = result['data']['username']
        session['nickname'] = result['data']['nickname']
        session['description'] = result['data']['description']
        session['avatar'] = '/static/avatar/' + result['data']['avatar']
        session['usergroup'] = result['data']['group']
        session['status'] = result['data']['status']
        session['token'] = result['data']['token']
        return resp
    else:
        return redirect('/login/')
def pytest_configure(config):
    """Copy screenshot-related pytest options into ``settings`` and probe
    for the optional ImageMagick / perceptualdiff tools."""
    settings['SCREENSHOTS_PATH'] = config.getoption('screenshots_path')
    settings['PDIFF_PATH'] = config.getoption('pdiff_path')
    settings['ALLOW_SCREENSHOT_CAPTURE'] = config.getoption(
        'allow_screenshot_capture')
    # The environment variable acts as an override: its mere presence
    # enables capture.
    if 'ALLOW_SCREENSHOT_CAPTURE' in os.environ:
        settings['ALLOW_SCREENSHOT_CAPTURE'] = True
    try:
        from sh import compare  # probing availability only
    except ImportError:
        pass
    else:
        settings['USE_IMAGEMAGICK'] = True
    try:
        from sh import perceptualdiff  # probing availability only
    except ImportError:
        pass
    else:
        settings['USE_PERCEPTUALDIFF'] = True
def _warn_unsafe_extraction_path(path):
    """
    If the default extraction path is overridden and set to an insecure
    location, such as /tmp, it opens up an opportunity for an attacker to
    replace an extracted file with an unauthorized payload. Warn the user
    if a known insecure location is used. See Distribute #375 for more
    details.
    """
    is_windows = (os.name == 'nt')
    if is_windows and not path.startswith(os.environ['windir']):
        # On Windows, permissions are generally restrictive by default
        # and temp directories are not writable by other users, so
        # bypass the warning.
        return
    permissions = os.stat(path).st_mode
    if permissions & stat.S_IWOTH or permissions & stat.S_IWGRP:
        warnings.warn(
            "%s is writable by group/others and vulnerable to attack "
            "when "
            "used with get_resource_filename. Consider a more secure "
            "location (set with .set_extraction_path or the "
            "PYTHON_EGG_CACHE environment variable)." % path,
            UserWarning,
        )
def set_environ(env_name, value):
    """Set the environment variable 'env_name' to 'value'

    Save previous value, yield, and then restore the previous value
    stored in the environment variable 'env_name'.

    If 'value' is None, do nothing"""
    should_set = value is not None
    if should_set:
        previous = os.environ.get(env_name)
        os.environ[env_name] = value
    try:
        yield
    finally:
        if should_set:
            if previous is None:
                # Variable did not exist before: remove it again.
                del os.environ[env_name]
            else:
                os.environ[env_name] = previous
def _test_Valgrind(self, valgrind):
    """Launch the executable-device node with VALGRIND set and verify a
    valgrind log file appears in the device cache.

    :param valgrind: value for the VALGRIND environment variable.
    """
    # Clear the device cache to prevent false positives
    deviceCacheDir = os.path.join(scatest.getSdrCache(),
                                  ".ExecutableDevice_node",
                                  "ExecutableDevice1")
    shutil.rmtree(deviceCacheDir, ignore_errors=True)
    os.environ['VALGRIND'] = valgrind
    try:
        # Checking that the node and device launch as expected
        nb, devMgr = self.launchDeviceManager(
            "/nodes/test_ExecutableDevice_node/DeviceManager.dcd.xml")
    finally:
        # Always unset VALGRIND so later tests launch without it.
        del os.environ['VALGRIND']
    self.assertFalse(devMgr is None)
    self.assertEquals(len(devMgr._get_registeredDevices()), 1,
                      msg='device failed to launch with valgrind')
    children = getChildren(nb.pid)
    self.assertEqual(len(children), 1)
    devMgr.shutdown()
    # Check that a valgrind logfile exists
    logfile = os.path.join(deviceCacheDir, 'valgrind.%s.log' % children[0])
    self.assertTrue(os.path.exists(logfile))
def test_setSDRROOT(self):
    """Exercise sb.setSDRROOT validation: invalid inputs must leave the
    current root unchanged; a valid SDR tree becomes the new root."""
    # None type
    self.assertRaises(TypeError, sb.setSDRROOT, None)
    # Bad dir should not change root
    sdrroot = sb.getSDRROOT()
    self.assertRaises(AssertionError, sb.setSDRROOT, 'TEMP_PATH')
    self.assertEquals(sdrroot, sb.getSDRROOT())
    # Good dir with no dev/dom should not change root
    self.assertRaises(AssertionError, sb.setSDRROOT, 'jackhammer')
    self.assertEquals(sdrroot, sb.getSDRROOT())
    # New root
    sb.setSDRROOT('sdr')
    self.assertEquals(sb.getSDRROOT(), 'sdr')
    # Restore sdrroot
    sb.setSDRROOT(os.environ['SDRROOT'])
def _prependToEnvVar(self, newVal, envVar):
    """Prepend *newVal* to the os.pathsep-separated variable *envVar*
    unless an equivalent entry is already present.

    Entries are compared with os.path.samefile when both paths exist (so
    symlinked duplicates are detected), falling back to plain string
    comparison otherwise.

    Fixes vs. the original: uses ``in os.environ`` instead of the
    Python-2-only ``has_key``, and no longer appends a trailing
    separator (a trailing os.pathsep produces an empty entry, which for
    PATH-like variables means "current directory").
    """
    entries = self._getEnvVarAsList(envVar)
    found = False
    for entry in entries:
        # Search to determine if the new value is already in the path.
        try:
            if os.path.samefile(entry, newVal):
                found = True
                break
        except OSError:
            # If we can't find concrete files to compare, fall back to
            # string comparison.
            if entry == newVal:
                found = True
                break
    if not found:
        if envVar in os.environ:
            newpath = os.pathsep.join([newVal, os.environ[envVar]])
        else:
            newpath = newVal
        # os.environ assignment also updates the process environment;
        # keep the explicit putenv for parity with the original code.
        os.putenv(envVar, newpath)
        os.environ[envVar] = newpath
def principal_unit():
    """Returns the principal unit of this unit, otherwise None"""
    # Juju 2.2 and above provides JUJU_PRINCIPAL_UNIT
    principal = os.environ.get('JUJU_PRINCIPAL_UNIT', None)
    if principal is not None:
        # An empty value means this unit itself is the principal.
        return os.environ['JUJU_UNIT_NAME'] if principal == '' else principal
    # For Juju 2.1 and below, work out the principal unit from the
    # related charms' metadata.yaml.
    for reltype in relation_types():
        for rid in relation_ids(reltype):
            for unit in related_units(rid):
                md = _metadata_unit(unit)
                if not md:
                    continue
                if not md.pop('subordinate', None):
                    # A related unit that is not subordinate is principal.
                    return unit
    return None
def _get_user_provided_overrides(modules):
    """Load user-provided config overrides.

    :param modules: stack modules to lookup in user overrides yaml file.
    :returns: overrides dictionary.
    """
    overrides = os.path.join(os.environ['JUJU_CHARM_DIR'],
                             'hardening.yaml')
    if os.path.exists(overrides):
        log("Found user-provided config overrides file '%s'" %
            (overrides), level=DEBUG)
        # Close the file deterministically instead of leaking the handle
        # (original passed an anonymous open() to yaml.safe_load).
        with open(overrides) as f:
            settings = yaml.safe_load(f)
        if settings and settings.get(modules):
            log("Applying '%s' overrides" % (modules), level=DEBUG)
            return settings.get(modules)
        log("No overrides found for '%s'" % (modules), level=DEBUG)
    else:
        log("No hardening config overrides file '%s' found in charm "
            "root dir" % (overrides), level=DEBUG)
    return {}
def main():
    """CLI entry point for training YOLO on Pascal VOC."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--weights', default="YOLO_small.ckpt", type=str)
    parser.add_argument('--data_dir', default="data", type=str)
    parser.add_argument('--threshold', default=0.2, type=float)
    parser.add_argument('--iou_threshold', default=0.5, type=float)
    parser.add_argument('--gpu', default='', type=str)
    args = parser.parse_args()
    # NOTE(review): this condition is always true — the default is ''
    # (not None), so with no --gpu flag CUDA_VISIBLE_DEVICES is set to
    # the empty string, which exposes no GPUs. Confirm this is the
    # intended CPU-only default.
    if args.gpu is not None:
        cfg.GPU = args.gpu
    if args.data_dir != cfg.DATA_PATH:
        # Rewrite the config's derived paths when a non-default data
        # directory is given.
        update_config_paths(args.data_dir, args.weights)
    os.environ['CUDA_VISIBLE_DEVICES'] = cfg.GPU
    yolo = YOLONet()
    pascal = pascal_voc('train')
    solver = Solver(yolo, pascal)
    print('Start training ...')
    solver.train()
    print('Done training.')
def main():
    """CLI entry point for running YOLO detection on a sample image."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--weights', default="YOLO_small.ckpt", type=str)
    parser.add_argument('--weight_dir', default='weights', type=str)
    parser.add_argument('--data_dir', default="data", type=str)
    parser.add_argument('--gpu', default='', type=str)
    args = parser.parse_args()
    # Empty default means no GPU is exposed via CUDA_VISIBLE_DEVICES.
    os.environ['CUDA_VISIBLE_DEVICES'] = args.gpu
    # NOTE(review): False presumably builds the net in inference mode —
    # confirm against YOLONet's signature.
    yolo = YOLONet(False)
    weight_file = os.path.join(args.data_dir, args.weight_dir, args.weights)
    detector = Detector(yolo, weight_file)

    # detect from camera
    # cap = cv2.VideoCapture(-1)
    # detector.camera_detector(cap)

    # detect from image file
    imname = 'test/person.jpg'
    detector.image_detector(imname)
def get_bcl2fastq_v2(hostname):
    """Return (version, None) for the bcl2fastq found on PATH, or
    (None, error_message) when it is missing or unrecognized."""
    try:
        subprocess.check_call(["which", "bcl2fastq"])
        # Restore the LD_LIBRARY_PATH set aside by sourceme.bash/shell10x.
        # Required for some installations of bcl2fastq.
        env = dict(os.environ)
        env['LD_LIBRARY_PATH'] = os.environ.get('_TENX_LD_LIBRARY_PATH', '')
        version_text = subprocess.check_output(["bcl2fastq", "--version"],
                                               env=env,
                                               stderr=subprocess.STDOUT)
        for line in version_text.split("\n"):
            found = re.match("bcl2fastq v([0-9.]+)", line)
            if found is not None:
                return (found.groups()[0], None)
        return (None, "bcl2fastq version not recognized -- please check the output of bcl2fastq --version")
    except subprocess.CalledProcessError:
        return (None, "On machine: %s, bcl2fastq not found on PATH." % hostname)
def __init__(self, additional_compose_file=None, additional_services=None):
    """Prepare the docker-compose test harness description.

    :param additional_compose_file: extra compose file to merge in.
    :param additional_services: extra service names to manage beyond the
        default zookeeper/schematizer/kafka trio.
    """
    # To resolve docker client server version mismatch issue.
    os.environ["COMPOSE_API_VERSION"] = "auto"
    cwd_name = os.path.split(os.getcwd())[-1]
    sanitized = re.sub(r'[^a-z0-9]', '', cwd_name.lower())
    self.project = "{}{}".format(sanitized, getpass.getuser())
    self.additional_compose_file = additional_compose_file
    self.services = ["zookeeper", "schematizer", "kafka"]
    if additional_services is not None:
        self.services.extend(additional_services)
    # Capture whether the dependent containers are already up when tests
    # start: containers are only started/stopped here when not already
    # running, since bringing them all up is slow and long test sessions
    # reuse containers started out-of-band.
    self.containers_already_running = self._are_containers_already_running()
def __init__(self, indent_increment, max_help_position, width, short_first):
    """Initialise help-formatter state.

    When *width* is None it is taken from $COLUMNS minus a 2-column
    margin, defaulting to 78 when the variable is missing or not an
    integer.
    """
    self.parser = None
    self.indent_increment = indent_increment
    self.help_position = self.max_help_position = max_help_position
    if width is None:
        try:
            columns = int(os.environ['COLUMNS'])
        except (KeyError, ValueError):
            columns = 80
        width = columns - 2
    self.width = width
    self.current_indent = 0
    self.level = 0
    self.help_width = None          # computed later
    self.short_first = short_first
    self.default_tag = "%default"
    self.option_strings = {}
    self._short_opt_fmt = "%s %s"
    self._long_opt_fmt = "%s=%s"
def _getuserbase():
    """Return the per-user base directory, honouring IRONPYTHONUSERBASE."""
    env_base = os.environ.get("IRONPYTHONUSERBASE", None)

    def joinuser(*args):
        return os.path.expanduser(os.path.join(*args))

    # what about 'os2emx', 'riscos' ?
    if os.name == "nt":
        appdata = os.environ.get("APPDATA") or "~"
        return env_base if env_base else joinuser(appdata, "Python")
    if sys.platform == "darwin":
        framework = get_config_var("PYTHONFRAMEWORK")
        if framework:
            return joinuser("~", "Library", framework,
                            "%d.%d" % sys.version_info[:2])
    return env_base if env_base else joinuser("~", ".local")
def path(klass):
    """Return the entries of $PATH as a list."""
    raw_path = os.environ['PATH']
    return raw_path.split(os.pathsep)
def __init__(self, config):
    """Open the pipelines database connection described by *config*.

    Only sqlite is implemented; the connection target comes from the
    PIPELINES_DB environment variable.
    """
    if config.db == "mysql":
        # TODO: determine best production grade relational database to use
        pass
    elif config.db == "sqlite":
        self._dbConn = sqlite3.connect(os.environ["PIPELINES_DB"])
        self._pipelinesDb = self._dbConn.cursor()
def editPipeline(args, config):
    """Open the stored request for a job in $EDITOR (default nano) and
    write the edited JSON back to the pipelines database.

    :param args: parsed CLI args; args.jobId selects the job.
    :param config: database configuration passed to PipelineDbUtils.
    """
    pipelineDbUtils = PipelineDbUtils(config)
    request = json.loads(pipelineDbUtils.getJobInfo(
        select=["request"], where={"job_id": args.jobId})[0].request)
    _, tmp = mkstemp()
    with open(tmp, 'w') as f:
        f.write("{data}".format(data=json.dumps(request, indent=4)))
    # Single mapping lookup with a default instead of the original
    # `"EDITOR" in os.environ.keys()` membership test + if/else.
    editor = os.environ.get("EDITOR", "/usr/bin/nano")
    if subprocess.call([editor, tmp]) == 0:
        with open(tmp, 'r') as f:
            request = json.load(f)
        pipelineDbUtils.updateJob(args.jobId, keyName="job_id",
                                  setValues={"request": json.dumps(request)})
    else:
        # Parenthesized form prints identically on Python 2 and 3,
        # unlike the original py2-only print statement.
        print("ERROR: there was a problem editing the request")
        exit(-1)
def __init__(self, path=None):
    """Open (or create) the unit-state sqlite database.

    Resolution order for the database location: explicit *path*
    argument, then $UNIT_STATE_DB, then $CHARM_DIR/.unit-state.db.
    """
    self.db_path = path
    if self.db_path is None:
        self.db_path = os.environ.get(
            'UNIT_STATE_DB',
            os.path.join(os.environ.get('CHARM_DIR', ''), '.unit-state.db'))
    self.conn = sqlite3.connect('%s' % self.db_path)
    self.cursor = self.conn.cursor()
    self.revision = None
    self._closed = False
    self._init()
def execution_environment():
    """A convenient bundling of the current execution context"""
    context = {
        'conf': config(),
    }
    if relation_id():
        # Only meaningful inside a relation hook.
        context.update({
            'reltype': relation_type(),
            'relid': relation_id(),
            'rel': relation_get(),
        })
    context.update({
        'unit': local_unit(),
        'rels': relations(),
        'env': os.environ,
    })
    return context
def relation_type():
    """The scope for the current relation hook"""
    # .get() already defaults to None when the variable is unset.
    return os.environ.get('JUJU_RELATION')
def relation_id(relation_name=None, service_or_unit=None):
    """The relation ID for the current or a specified relation"""
    if not relation_name and not service_or_unit:
        # No arguments: use the relation id of the running hook context.
        return os.environ.get('JUJU_RELATION_ID', None)
    if relation_name and service_or_unit:
        # Find the relation id whose remote service matches the
        # service part of service_or_unit.
        service_name = service_or_unit.split('/')[0]
        for relid in relation_ids(relation_name):
            if remote_service_name(relid) == service_name:
                return relid
    else:
        raise ValueError('Must specify neither or both of relation_name and service_or_unit')
def local_unit():
    """Return the local unit ID from the hook environment
    (JUJU_UNIT_NAME is always set inside a hook; a missing variable
    raises KeyError)."""
    unit_name = os.environ['JUJU_UNIT_NAME']
    return unit_name
def hook_name():
    """The name of the currently executing hook"""
    # Prefer the JUJU_HOOK_NAME variable; fall back to the executing
    # script's file name.
    fallback = os.path.basename(sys.argv[0])
    return os.environ.get('JUJU_HOOK_NAME', fallback)
def charm_dir():
    """Return the root directory of the current charm"""
    # None when not running inside a charm hook environment.
    return os.environ.get('CHARM_DIR', None)
def action_name():
    """Get the name of the currently executing action."""
    # None outside of an action context.
    return os.environ.get('JUJU_ACTION_NAME', None)
def action_uuid():
    """Get the UUID of the currently executing action."""
    # None outside of an action context.
    return os.environ.get('JUJU_ACTION_UUID', None)
def action_tag():
    """Get the tag for the currently executing action."""
    # None outside of an action context.
    return os.environ.get('JUJU_ACTION_TAG', None)
def enable(soft_fail=False):
    """
    Enable ufw

    :param soft_fail: If set to True silently disables IPv6 support in ufw,
                      otherwise a UFWIPv6Error exception is raised when IP6
                      support is broken.
    :returns: True if ufw is successfully enabled
    """
    if is_enabled():
        return True

    if not is_ipv6_ok(soft_fail):
        disable_ipv6()

    # Pin LANG so the output parsing below is locale-stable.
    output = subprocess.check_output(
        ['ufw', 'enable'],
        universal_newlines=True,
        env={'LANG': 'en_US', 'PATH': os.environ['PATH']})

    hookenv.log(output, level='DEBUG')

    matched = re.findall('^Firewall is active and enabled on system startup\n',
                         output, re.M)
    if matched:
        hookenv.log("ufw enabled", level='INFO')
        return True
    hookenv.log("ufw couldn't be enabled", level='WARN')
    return False
def disable():
    """
    Disable ufw

    :returns: True if ufw is successfully disabled
    """
    if not is_enabled():
        return True

    # Pin LANG so the output parsing below is locale-stable.
    output = subprocess.check_output(
        ['ufw', 'disable'],
        universal_newlines=True,
        env={'LANG': 'en_US', 'PATH': os.environ['PATH']})

    hookenv.log(output, level='DEBUG')

    matched = re.findall(r'^Firewall stopped and disabled on system startup\n',
                         output, re.M)
    if matched:
        hookenv.log("ufw disabled", level='INFO')
        return True
    hookenv.log("ufw couldn't be disabled", level='WARN')
    return False
def default_policy(policy='deny', direction='incoming'):
    """
    Changes the default policy for traffic `direction`

    :param policy: allow, deny or reject
    :param direction: traffic direction, possible values: incoming,
                      outgoing, routed
    :returns: True if the default policy was changed successfully
    """
    if policy not in ('allow', 'deny', 'reject'):
        raise UFWError(('Unknown policy %s, valid values: '
                        'allow, deny, reject') % policy)
    if direction not in ('incoming', 'outgoing', 'routed'):
        raise UFWError(('Unknown direction %s, valid values: '
                        'incoming, outgoing, routed') % direction)

    # Pin LANG so the output parsing below is locale-stable.
    output = subprocess.check_output(
        ['ufw', 'default', policy, direction],
        universal_newlines=True,
        env={'LANG': 'en_US', 'PATH': os.environ['PATH']})
    hookenv.log(output, level='DEBUG')

    matched = re.findall("^Default %s policy changed to '%s'\n" %
                         (direction, policy),
                         output, re.M)
    if matched:
        hookenv.log("ufw default policy for %s changed to %s"
                    % (direction, policy), level='INFO')
        return True
    hookenv.log("ufw couldn't change the default policy to %s for %s"
                % (policy, direction), level='WARN')
    return False
def default_execd_dir():
    """Return the charm's exec.d directory ($CHARM_DIR/exec.d)."""
    charm_root = os.environ['CHARM_DIR']
    return os.path.join(charm_root, 'exec.d')
def run():
    """CNI plugin entry point: parse the CNI config from stdin, set up
    config/logging and os-vif, then delegate to a daemonized or
    standalone runner guarded by a SIGALRM watchdog timeout.
    """
    # REVISIT(ivc): current CNI implementation provided by this package is
    # experimental and its primary purpose is to enable development of other
    # components (e.g. functional tests, service/LBaaSv2 support)
    cni_conf = utils.CNIConfig(jsonutils.load(sys.stdin))
    args = ['--config-file', cni_conf.kuryr_conf]

    try:
        if cni_conf.debug:
            args.append('-d')
    except AttributeError:
        # 'debug' is optional in the CNI config; absence means no -d flag.
        pass
    config.init(args)
    config.setup_logging()

    # Initialize o.vo registry.
    k_objects.register_locally_defined_vifs()
    os_vif.initialize()

    if CONF.cni_daemon.daemon_enabled:
        runner = cni_api.CNIDaemonizedRunner()
    else:
        runner = cni_api.CNIStandaloneRunner(K8sCNIPlugin())
    LOG.info("Using '%s' ", runner.__class__.__name__)

    def _timeout(signum, frame):
        # SIGALRM handler: report a CNI timeout error on stdout and exit.
        runner._write_dict(sys.stdout, {
            'msg': 'timeout',
            'code': k_const.CNI_TIMEOUT_CODE,
        })
        LOG.debug('timed out')
        sys.exit(1)

    signal.signal(signal.SIGALRM, _timeout)
    signal.alarm(_CNI_TIMEOUT)
    # The runner receives the full environment (CNI passes its parameters
    # as CNI_* environment variables).
    status = runner.run(os.environ, cni_conf, sys.stdout)
    LOG.debug("Exiting with status %s", status)
    if status:
        sys.exit(status)