Python os 模块,environ() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.environ()

项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def test_zipfile_timestamp():
    # An environment variable can be used to influence the timestamp on
    # TarInfo objects inside the zip.  See issue #143.  TemporaryDirectory is
    # not a context manager under Python 3.
    with temporary_directory() as tempdir:
        for filename in ('one', 'two', 'three'):
            path = os.path.join(tempdir, filename)
            with codecs.open(path, 'w', encoding='utf-8') as fp:
                fp.write(filename + '\n')
        zip_base_name = os.path.join(tempdir, 'dummy')
        # The earliest date representable in TarInfos, 1980-01-01
        with environ('SOURCE_DATE_EPOCH', '315576060'):
            zip_filename = wheel.archive.make_wheelfile_inner(
                zip_base_name, tempdir)
        with readable_zipfile(zip_filename) as zf:
            for info in zf.infolist():
                assert info.date_time[:3] == (1980, 1, 1)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _warn_unsafe_extraction_path(path):
        """
        If the default extraction path is overridden and set to an insecure
        location, such as /tmp, it opens up an opportunity for an attacker to
        replace an extracted file with an unauthorized payload. Warn the user
        if a known insecure location is used.

        See Distribute #375 for more details.
        """
        if os.name == 'nt' and not path.startswith(os.environ['windir']):
            # On Windows, permissions are generally restrictive by default
            #  and temp directories are not writable by other users, so
            #  bypass the warning.
            return
        mode = os.stat(path).st_mode
        if mode & stat.S_IWOTH or mode & stat.S_IWGRP:
            msg = ("%s is writable by group/others and vulnerable to attack "
                "when "
                "used with get_resource_filename. Consider a more secure "
                "location (set with .set_extraction_path or the "
                "PYTHON_EGG_CACHE environment variable)." % path)
            warnings.warn(msg, UserWarning)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _pythonpath():
    items = os.environ.get('PYTHONPATH', '').split(os.pathsep)
    return filter(None, items)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def initialize (self, sadFile):
        self.app_cnt = 0
        if self.__timeout is not None:
            self.domMgr.configure([CF.DataType('COMPONENT_BINDING_TIMEOUT', to_any(self.__timeout))])

        try:
            self.domMgr.installApplication(sadFile)
        except CF.DomainManager.ApplicationAlreadyInstalled:
            pass
        domRoot = os.path.join(os.environ["SDRROOT"], "dom")
        sad = ossie.parsers.sad.parse(domRoot + sadFile)
        app_id = sad.get_id()
        for appFact in self.domMgr._get_applicationFactories():
            if appFact._get_identifier() == app_id:
                self.appFact = appFact
                return

        raise KeyError, "Couldn't find app factory"
项目:pyupdater-wx-demo    作者:wettenhj    | 项目源码 | 文件源码
def setUp(self):
        tempFile = tempfile.NamedTemporaryFile()
        self.fileServerDir = tempFile.name
        tempFile.close()
        os.mkdir(self.fileServerDir)
        os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
        privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
                                        encoding='base64')
        signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
                                    encoding='base64').decode()
        VERSIONS['signature'] = signature
        keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
        with gzip.open(keysFilePath, 'wb') as keysFile:
            keysFile.write(json.dumps(KEYS, sort_keys=True))
        versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
        with gzip.open(versionsFilePath, 'wb') as versionsFile:
            versionsFile.write(json.dumps(VERSIONS, sort_keys=True))
        os.environ['WXUPDATEDEMO_TESTING'] = 'True'
        from wxupdatedemo.config import CLIENT_CONFIG
        self.clientConfig = CLIENT_CONFIG
        self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
项目:pyupdater-wx-demo    作者:wettenhj    | 项目源码 | 文件源码
def setUp(self):
        tempFile = tempfile.NamedTemporaryFile()
        self.fileServerDir = tempFile.name
        tempFile.close()
        os.mkdir(self.fileServerDir)
        os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
        privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
                                        encoding='base64')
        signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
                                    encoding='base64').decode()
        VERSIONS['signature'] = signature
        keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
        with gzip.open(keysFilePath, 'wb') as keysFile:
            keysFile.write(json.dumps(KEYS, sort_keys=True))
        versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
        with gzip.open(versionsFilePath, 'wb') as versionsFile:
            versionsFile.write(json.dumps(VERSIONS, sort_keys=True))
        os.environ['WXUPDATEDEMO_TESTING'] = 'True'
        from wxupdatedemo.config import CLIENT_CONFIG
        self.clientConfig = CLIENT_CONFIG
        self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
        self.clientConfig.APP_NAME = APP_NAME
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _warn_unsafe_extraction_path(path):
        """
        If the default extraction path is overridden and set to an insecure
        location, such as /tmp, it opens up an opportunity for an attacker to
        replace an extracted file with an unauthorized payload. Warn the user
        if a known insecure location is used.

        See Distribute #375 for more details.
        """
        if os.name == 'nt' and not path.startswith(os.environ['windir']):
            # On Windows, permissions are generally restrictive by default
            #  and temp directories are not writable by other users, so
            #  bypass the warning.
            return
        mode = os.stat(path).st_mode
        if mode & stat.S_IWOTH or mode & stat.S_IWGRP:
            msg = ("%s is writable by group/others and vulnerable to attack "
                "when "
                "used with get_resource_filename. Consider a more secure "
                "location (set with .set_extraction_path or the "
                "PYTHON_EGG_CACHE environment variable)." % path)
            warnings.warn(msg, UserWarning)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def paths_on_pythonpath(paths):
        """
        Add the indicated paths to the head of the PYTHONPATH environment
        variable so that subprocesses will also see the packages at
        these paths.

        Do this in a context that restores the value on exit.
        """
        nothing = object()
        orig_pythonpath = os.environ.get('PYTHONPATH', nothing)
        current_pythonpath = os.environ.get('PYTHONPATH', '')
        try:
            prefix = os.pathsep.join(paths)
            to_join = filter(None, [prefix, current_pythonpath])
            new_path = os.pathsep.join(to_join)
            if new_path:
                os.environ['PYTHONPATH'] = new_path
            yield
        finally:
            if orig_pythonpath is nothing:
                os.environ.pop('PYTHONPATH', None)
            else:
                os.environ['PYTHONPATH'] = orig_pythonpath
项目:environ_config    作者:hynek    | 项目源码 | 文件源码
def to_config(config_cls, environ=os.environ):
    if config_cls._prefix:
        app_prefix = (config_cls._prefix,)
    else:
        app_prefix = ()

    def default_get(environ, metadata, prefix, name):
        ce = metadata[CNF_KEY]
        if ce.name is not None:
            var = ce.name
        else:
            var = ("_".join(app_prefix + prefix + (name,))).upper()

        log.debug("looking for env var '%s'." % (var,))
        val = environ.get(var, ce.default)
        if val is RAISE:
            raise MissingEnvValueError(var)
        return val

    return _to_config(config_cls, default_get, environ, ())
项目:environ_config    作者:hynek    | 项目源码 | 文件源码
def _to_config(config_cls, default_get, environ, prefix):
    vals = {}
    for a in attr.fields(config_cls):
        try:
            ce = a.metadata[CNF_KEY]
        except KeyError:
            continue
        if ce.sub_cls is None:
            get = ce.callback or default_get
            val = get(environ, a.metadata, prefix, a.name)
        else:
            val = _to_config(
                ce.sub_cls, default_get, environ,
                prefix + ((a.name if prefix else a.name),)
            )

        vals[a.name] = val
    return config_cls(**vals)
项目:fuel-nailgun-extension-iac    作者:openstack    | 项目源码 | 文件源码
def get_by_cluster_id(self, cluster_id):
        instance = db().query(self.model).\
            filter(self.model.env_id == cluster_id).first()
        if instance is not None:
            try:
                instance.repo = Repo(os.path.join(const.REPOS_DIR,
                                                  instance.repo_name))
            except exc.NoSuchPathError:
                logger.debug("Repo folder does not exist. Cloning repo")
                self._create_key_file(instance.repo_name, instance.user_key)
                if instance.user_key:
                    os.environ['GIT_SSH'] = \
                        self._get_ssh_cmd(instance.repo_name)

                repo_path = os.path.join(const.REPOS_DIR, instance.repo_name)
                repo = Repo.clone_from(instance.git_url, repo_path)
                instance.repo = repo
        return instance
项目:fuel-nailgun-extension-iac    作者:openstack    | 项目源码 | 文件源码
def create(self, data):
        if not os.path.exists(const.REPOS_DIR):
            os.mkdir(const.REPOS_DIR)
        repo_path = os.path.join(const.REPOS_DIR, data['repo_name'])
        if os.path.exists(repo_path):
            logger.debug('Repo directory exists. Removing...')
            shutil.rmtree(repo_path)

        user_key = data.get('user_key', '')
        if user_key:
            self._create_key_file(data['repo_name'], user_key)
            os.environ['GIT_SSH'] = self._get_ssh_cmd(data['repo_name'])
        repo = Repo.clone_from(data['git_url'], repo_path)

        instance = super(GitRepo, self).create(data)
        instance.repo = repo
        return instance
项目:docklet    作者:unias    | 项目源码 | 文件源码
def post(self):
        if (request.form['username']):
            data = {"user": request.form['username'], "key": request.form['password']}
            result = dockletRequest.unauthorizedpost('/login/', data)
            ok = result and result.get('success', None)
            if (ok and (ok == "true")):
                # set cookie:docklet-jupyter-cookie for jupyter notebook
                resp = make_response(redirect(request.args.get('next',None) or '/dashboard/'))
                app_key = os.environ['APP_KEY']
                resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(request.form['username'], app_key))
                # set session for docklet
                session['username'] = request.form['username']
                session['nickname'] = result['data']['nickname']
                session['description'] = result['data']['description']
                session['avatar'] = '/static/avatar/'+ result['data']['avatar']
                session['usergroup'] = result['data']['group']
                session['status'] = result['data']['status']
                session['token'] = result['data']['token']
                return resp
            else:
                return redirect('/login/')
        else:
            return redirect('/login/')
项目:docklet    作者:unias    | 项目源码 | 文件源码
def get(self):

        form = external_generate.external_auth_generate_request()
        result = dockletRequest.unauthorizedpost('/external_login/', form)
        ok = result and result.get('success', None)
        if (ok and (ok == "true")):
            # set cookie:docklet-jupyter-cookie for jupyter notebook
            resp = make_response(redirect(request.args.get('next',None) or '/dashboard/'))
            app_key = os.environ['APP_KEY']
            resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(result['data']['username'], app_key))
            # set session for docklet
            session['username'] = result['data']['username']
            session['nickname'] = result['data']['nickname']
            session['description'] = result['data']['description']
            session['avatar'] = '/static/avatar/'+ result['data']['avatar']
            session['usergroup'] = result['data']['group']
            session['status'] = result['data']['status']
            session['token'] = result['data']['token']
            return resp
        else:
            return redirect('/login/')
项目:docklet    作者:unias    | 项目源码 | 文件源码
def post(self):

        form = external_generate.external_auth_generate_request()
        result = dockletRequest.unauthorizedpost('/external_login/', form)
        ok = result and result.get('success', None)
        if (ok and (ok == "true")):
            # set cookie:docklet-jupyter-cookie for jupyter notebook
            resp = make_response(redirect(request.args.get('next',None) or '/dashboard/'))
            app_key = os.environ['APP_KEY']
            resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(result['data']['username'], app_key))
            # set session for docklet
            session['username'] = result['data']['username']
            session['nickname'] = result['data']['nickname']
            session['description'] = result['data']['description']
            session['avatar'] = '/static/avatar/'+ result['data']['avatar']
            session['usergroup'] = result['data']['group']
            session['status'] = result['data']['status']
            session['token'] = result['data']['token']
            return resp
        else:
            return redirect('/login/')
项目:pytest-selenium-pdiff    作者:rentlytics    | 项目源码 | 文件源码
def pytest_configure(config):
    settings['SCREENSHOTS_PATH'] = config.getoption('screenshots_path')
    settings['PDIFF_PATH'] = config.getoption('pdiff_path')
    settings['ALLOW_SCREENSHOT_CAPTURE'] = config.getoption('allow_screenshot_capture')

    if 'ALLOW_SCREENSHOT_CAPTURE' in os.environ:
        settings['ALLOW_SCREENSHOT_CAPTURE'] = True

    try:
        from sh import compare
        settings['USE_IMAGEMAGICK'] = True
    except ImportError:
        pass

    try:
        from sh import perceptualdiff
        settings['USE_PERCEPTUALDIFF'] = True
    except ImportError:
        pass
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _warn_unsafe_extraction_path(path):
        """
        If the default extraction path is overridden and set to an insecure
        location, such as /tmp, it opens up an opportunity for an attacker to
        replace an extracted file with an unauthorized payload. Warn the user
        if a known insecure location is used.

        See Distribute #375 for more details.
        """
        if os.name == 'nt' and not path.startswith(os.environ['windir']):
            # On Windows, permissions are generally restrictive by default
            #  and temp directories are not writable by other users, so
            #  bypass the warning.
            return
        mode = os.stat(path).st_mode
        if mode & stat.S_IWOTH or mode & stat.S_IWGRP:
            msg = ("%s is writable by group/others and vulnerable to attack "
                "when "
                "used with get_resource_filename. Consider a more secure "
                "location (set with .set_extraction_path or the "
                "PYTHON_EGG_CACHE environment variable)." % path)
            warnings.warn(msg, UserWarning)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _warn_unsafe_extraction_path(path):
        """
        If the default extraction path is overridden and set to an insecure
        location, such as /tmp, it opens up an opportunity for an attacker to
        replace an extracted file with an unauthorized payload. Warn the user
        if a known insecure location is used.

        See Distribute #375 for more details.
        """
        if os.name == 'nt' and not path.startswith(os.environ['windir']):
            # On Windows, permissions are generally restrictive by default
            #  and temp directories are not writable by other users, so
            #  bypass the warning.
            return
        mode = os.stat(path).st_mode
        if mode & stat.S_IWOTH or mode & stat.S_IWGRP:
            msg = (
                "%s is writable by group/others and vulnerable to attack "
                "when "
                "used with get_resource_filename. Consider a more secure "
                "location (set with .set_extraction_path or the "
                "PYTHON_EGG_CACHE environment variable)." % path
            )
            warnings.warn(msg, UserWarning)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def set_environ(env_name, value):
    """Set the environment variable 'env_name' to 'value'

    Save previous value, yield, and then restore the previous value stored in
    the environment variable 'env_name'.

    If 'value' is None, do nothing"""
    value_changed = value is not None
    if value_changed:
        old_value = os.environ.get(env_name)
        os.environ[env_name] = value
    try:
        yield
    finally:
        if value_changed:
            if old_value is None:
                del os.environ[env_name]
            else:
                os.environ[env_name] = old_value
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def paths_on_pythonpath(paths):
        """
        Add the indicated paths to the head of the PYTHONPATH environment
        variable so that subprocesses will also see the packages at
        these paths.

        Do this in a context that restores the value on exit.
        """
        nothing = object()
        orig_pythonpath = os.environ.get('PYTHONPATH', nothing)
        current_pythonpath = os.environ.get('PYTHONPATH', '')
        try:
            prefix = os.pathsep.join(paths)
            to_join = filter(None, [prefix, current_pythonpath])
            new_path = os.pathsep.join(to_join)
            if new_path:
                os.environ['PYTHONPATH'] = new_path
            yield
        finally:
            if orig_pythonpath is nothing:
                os.environ.pop('PYTHONPATH', None)
            else:
                os.environ['PYTHONPATH'] = orig_pythonpath
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def _test_Valgrind(self, valgrind):
        # Clear the device cache to prevent false positives
        deviceCacheDir = os.path.join(scatest.getSdrCache(), ".ExecutableDevice_node", "ExecutableDevice1")
        shutil.rmtree(deviceCacheDir, ignore_errors=True)

        os.environ['VALGRIND'] = valgrind
        try:
            # Checking that the node and device launch as expected
            nb, devMgr = self.launchDeviceManager("/nodes/test_ExecutableDevice_node/DeviceManager.dcd.xml")
        finally:
            del os.environ['VALGRIND']

        self.assertFalse(devMgr is None)
        self.assertEquals(len(devMgr._get_registeredDevices()), 1, msg='device failed to launch with valgrind')
        children = getChildren(nb.pid)
        self.assertEqual(len(children), 1)
        devMgr.shutdown()

        # Check that a valgrind logfile exists
        logfile = os.path.join(deviceCacheDir, 'valgrind.%s.log' % children[0])
        self.assertTrue(os.path.exists(logfile))
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def test_setSDRROOT(self):
        # None type
        self.assertRaises(TypeError, sb.setSDRROOT, None)

        # Bad dir should not change root
        sdrroot = sb.getSDRROOT()
        self.assertRaises(AssertionError, sb.setSDRROOT, 'TEMP_PATH')
        self.assertEquals(sdrroot, sb.getSDRROOT())

        # Good dir with no dev/dom should not change root
        self.assertRaises(AssertionError, sb.setSDRROOT, 'jackhammer')
        self.assertEquals(sdrroot, sb.getSDRROOT())

        # New root
        sb.setSDRROOT('sdr')
        self.assertEquals(sb.getSDRROOT(), 'sdr')

        # Restore sdrroot
        sb.setSDRROOT(os.environ['SDRROOT'])
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def _prependToEnvVar(self, newVal, envVar):
        path = self._getEnvVarAsList(envVar)
        foundValue = False
        for entry in path:
            # Search to determine if the new value is already in the path
            try:
                if os.path.samefile(entry, newVal):
                    # The value is already in the path
                    foundValue = True
                    break
            except OSError:
                # If we can't find concrete files to compare, fall back to string compare
                if entry == newVal:
                    # The value is already in the path
                    foundValue = True
                    break

        if not foundValue:
            # The value does not already exist
            if os.environ.has_key(envVar):
                newpath = newVal+os.path.pathsep + os.getenv(envVar)+os.path.pathsep
            else:
                newpath = newVal+os.path.pathsep
            os.putenv(envVar, newpath)
            os.environ[envVar] = newpath
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def principal_unit():
    """Returns the principal unit of this unit, otherwise None"""
    # Juju 2.2 and above provides JUJU_PRINCIPAL_UNIT
    principal_unit = os.environ.get('JUJU_PRINCIPAL_UNIT', None)
    # If it's empty, then this unit is the principal
    if principal_unit == '':
        return os.environ['JUJU_UNIT_NAME']
    elif principal_unit is not None:
        return principal_unit
    # For Juju 2.1 and below, let's try work out the principle unit by
    # the various charms' metadata.yaml.
    for reltype in relation_types():
        for rid in relation_ids(reltype):
            for unit in related_units(rid):
                md = _metadata_unit(unit)
                if not md:
                    continue
                subordinate = md.pop('subordinate', None)
                if not subordinate:
                    return unit
    return None
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def principal_unit():
    """Returns the principal unit of this unit, otherwise None"""
    # Juju 2.2 and above provides JUJU_PRINCIPAL_UNIT
    principal_unit = os.environ.get('JUJU_PRINCIPAL_UNIT', None)
    # If it's empty, then this unit is the principal
    if principal_unit == '':
        return os.environ['JUJU_UNIT_NAME']
    elif principal_unit is not None:
        return principal_unit
    # For Juju 2.1 and below, let's try work out the principle unit by
    # the various charms' metadata.yaml.
    for reltype in relation_types():
        for rid in relation_ids(reltype):
            for unit in related_units(rid):
                md = _metadata_unit(unit)
                if not md:
                    continue
                subordinate = md.pop('subordinate', None)
                if not subordinate:
                    return unit
    return None
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def _get_user_provided_overrides(modules):
    """Load user-provided config overrides.

    :param modules: stack modules to lookup in user overrides yaml file.
    :returns: overrides dictionary.
    """
    overrides = os.path.join(os.environ['JUJU_CHARM_DIR'],
                             'hardening.yaml')
    if os.path.exists(overrides):
        log("Found user-provided config overrides file '%s'" %
            (overrides), level=DEBUG)
        settings = yaml.safe_load(open(overrides))
        if settings and settings.get(modules):
            log("Applying '%s' overrides" % (modules), level=DEBUG)
            return settings.get(modules)

        log("No overrides found for '%s'" % (modules), level=DEBUG)
    else:
        log("No hardening config overrides file '%s' found in charm "
            "root dir" % (overrides), level=DEBUG)

    return {}
项目:yolo_tensorflow    作者:hizhangp    | 项目源码 | 文件源码
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--weights', default="YOLO_small.ckpt", type=str)
    parser.add_argument('--data_dir', default="data", type=str)
    parser.add_argument('--threshold', default=0.2, type=float)
    parser.add_argument('--iou_threshold', default=0.5, type=float)
    parser.add_argument('--gpu', default='', type=str)
    args = parser.parse_args()

    if args.gpu is not None:
        cfg.GPU = args.gpu

    if args.data_dir != cfg.DATA_PATH:
        update_config_paths(args.data_dir, args.weights)

    os.environ['CUDA_VISIBLE_DEVICES'] = cfg.GPU

    yolo = YOLONet()
    pascal = pascal_voc('train')

    solver = Solver(yolo, pascal)

    print('Start training ...')
    solver.train()
    print('Done training.')
项目:yolo_tensorflow    作者:hizhangp    | 项目源码 | 文件源码
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--weights', default="YOLO_small.ckpt", type=str)
    parser.add_argument('--weight_dir', default='weights', type=str)
    parser.add_argument('--data_dir', default="data", type=str)
    parser.add_argument('--gpu', default='', type=str)
    args = parser.parse_args()

    os.environ['CUDA_VISIBLE_DEVICES'] = args.gpu

    yolo = YOLONet(False)
    weight_file = os.path.join(args.data_dir, args.weight_dir, args.weights)
    detector = Detector(yolo, weight_file)

    # detect from camera
    # cap = cv2.VideoCapture(-1)
    # detector.camera_detector(cap)

    # detect from image file
    imname = 'test/person.jpg'
    detector.image_detector(imname)
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def get_bcl2fastq_v2(hostname):
    try:
        subprocess.check_call(["which", "bcl2fastq"])
        # Restore the LD_LIBRARY_PATH set aside by sourceme.bash/shell10x.
        # Required for some installations of bcl2fastq.
        new_environ = dict(os.environ)
        new_environ['LD_LIBRARY_PATH'] = os.environ.get('_TENX_LD_LIBRARY_PATH', '')
        output = subprocess.check_output(["bcl2fastq", "--version"], env=new_environ, stderr=subprocess.STDOUT)
        match = None
        for l in output.split("\n"):
            match = re.match("bcl2fastq v([0-9.]+)", l)
            if match is not None:
                return (match.groups()[0], None)

        return (None, "bcl2fastq version not recognized -- please check the output of bcl2fastq --version")
    except subprocess.CalledProcessError:
        msg = "On machine: %s, bcl2fastq not found on PATH." % hostname
        return (None, msg)
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def __init__(self, additional_compose_file=None, additional_services=None):
        # To resolve docker client server version mismatch issue.
        os.environ["COMPOSE_API_VERSION"] = "auto"
        dir_name = os.path.split(os.getcwd())[-1]
        self.project = "{}{}".format(
            re.sub(r'[^a-z0-9]', '', dir_name.lower()),
            getpass.getuser()
        )
        self.additional_compose_file = additional_compose_file

        self.services = ["zookeeper", "schematizer", "kafka"]

        if additional_services is not None:
            self.services.extend(additional_services)

        # This variable is meant to capture the running/not-running state of
        # the dependent testing containers when tests start running.  The idea
        # is, we'll only start and stop containers if they aren't already
        # running.  If they are running, we'll just use the ones that exist.
        # It takes a while to start all the containers, so when running lots of
        # tests, it's best to start them out-of-band and leave them up for the
        # duration of the session.
        self.containers_already_running = self._are_containers_already_running()
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __init__(self,
                 indent_increment,
                 max_help_position,
                 width,
                 short_first):
        self.parser = None
        self.indent_increment = indent_increment
        self.help_position = self.max_help_position = max_help_position
        if width is None:
            try:
                width = int(os.environ['COLUMNS'])
            except (KeyError, ValueError):
                width = 80
            width -= 2
        self.width = width
        self.current_indent = 0
        self.level = 0
        self.help_width = None          # computed later
        self.short_first = short_first
        self.default_tag = "%default"
        self.option_strings = {}
        self._short_opt_fmt = "%s %s"
        self._long_opt_fmt = "%s=%s"
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _getuserbase():
    env_base = os.environ.get("IRONPYTHONUSERBASE", None)
    def joinuser(*args):
        return os.path.expanduser(os.path.join(*args))

    # what about 'os2emx', 'riscos' ?
    if os.name == "nt":
        base = os.environ.get("APPDATA") or "~"
        return env_base if env_base else joinuser(base, "Python")

    if sys.platform == "darwin":
        framework = get_config_var("PYTHONFRAMEWORK")
        if framework:
            return joinuser("~", "Library", framework, "%d.%d"%(
                sys.version_info[:2]))

    return env_base if env_base else joinuser("~", ".local")
项目:pygrunt    作者:elementbound    | 项目源码 | 文件源码
def path(klass):
        return os.environ['PATH'].split(os.pathsep)
项目:ISB-CGC-pipelines    作者:isb-cgc    | 项目源码 | 文件源码
def __init__(self, config):
        if config.db == "mysql":
            pass  # TODO: determine best production grade relational database to use

        elif config.db == "sqlite":
            self._dbConn = sqlite3.connect(os.environ["PIPELINES_DB"])

        self._pipelinesDb = self._dbConn.cursor()
项目:ISB-CGC-pipelines    作者:isb-cgc    | 项目源码 | 文件源码
def editPipeline(args, config):
        pipelineDbUtils = PipelineDbUtils(config)

        request = json.loads(pipelineDbUtils.getJobInfo(select=["request"], where={"job_id": args.jobId})[0].request)

        _, tmp = mkstemp()
        with open(tmp, 'w') as f:
            f.write("{data}".format(data=json.dumps(request, indent=4)))

        if "EDITOR" in os.environ.keys():
            editor = os.environ["EDITOR"]
        else:
            editor = "/usr/bin/nano"

        if subprocess.call([editor, tmp]) == 0:
            with open(tmp, 'r') as f:
                request = json.load(f)

            pipelineDbUtils.updateJob(args.jobId, keyName="job_id", setValues={"request": json.dumps(request)})
        else:
            print "ERROR: there was a problem editing the request"
            exit(-1)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def __init__(self, path=None):
        self.db_path = path
        if path is None:
            if 'UNIT_STATE_DB' in os.environ:
                self.db_path = os.environ['UNIT_STATE_DB']
            else:
                self.db_path = os.path.join(
                    os.environ.get('CHARM_DIR', ''), '.unit-state.db')
        self.conn = sqlite3.connect('%s' % self.db_path)
        self.cursor = self.conn.cursor()
        self.revision = None
        self._closed = False
        self._init()
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def execution_environment():
    """A convenient bundling of the current execution context"""
    context = {}
    context['conf'] = config()
    if relation_id():
        context['reltype'] = relation_type()
        context['relid'] = relation_id()
        context['rel'] = relation_get()
    context['unit'] = local_unit()
    context['rels'] = relations()
    context['env'] = os.environ
    return context
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def relation_type():
    """The scope for the current relation hook"""
    return os.environ.get('JUJU_RELATION', None)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def relation_id(relation_name=None, service_or_unit=None):
    """The relation ID for the current or a specified relation"""
    if not relation_name and not service_or_unit:
        return os.environ.get('JUJU_RELATION_ID', None)
    elif relation_name and service_or_unit:
        service_name = service_or_unit.split('/')[0]
        for relid in relation_ids(relation_name):
            remote_service = remote_service_name(relid)
            if remote_service == service_name:
                return relid
    else:
        raise ValueError('Must specify neither or both of relation_name and service_or_unit')
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def local_unit():
    """Local unit ID"""
    return os.environ['JUJU_UNIT_NAME']
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def hook_name():
    """The name of the currently executing hook"""
    return os.environ.get('JUJU_HOOK_NAME', os.path.basename(sys.argv[0]))
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def charm_dir():
    """Return the root directory of the current charm"""
    return os.environ.get('CHARM_DIR')
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def action_name():
    """Get the name of the currently executing action."""
    return os.environ.get('JUJU_ACTION_NAME')
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def action_uuid():
    """Get the UUID of the currently executing action."""
    return os.environ.get('JUJU_ACTION_UUID')
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def action_tag():
    """Get the tag for the currently executing action."""
    return os.environ.get('JUJU_ACTION_TAG')
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def enable(soft_fail=False):
    """
    Enable ufw

    :param soft_fail: If set to True silently disables IPv6 support in ufw,
                      otherwise a UFWIPv6Error exception is raised when IP6
                      support is broken.
    :returns: True if ufw is successfully enabled
    """
    if is_enabled():
        return True

    if not is_ipv6_ok(soft_fail):
        disable_ipv6()

    output = subprocess.check_output(['ufw', 'enable'],
                                     universal_newlines=True,
                                     env={'LANG': 'en_US',
                                          'PATH': os.environ['PATH']})

    m = re.findall('^Firewall is active and enabled on system startup\n',
                   output, re.M)
    hookenv.log(output, level='DEBUG')

    if len(m) == 0:
        hookenv.log("ufw couldn't be enabled", level='WARN')
        return False
    else:
        hookenv.log("ufw enabled", level='INFO')
        return True
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def disable():
    """
    Disable ufw

    :returns: True if ufw is successfully disabled
    """
    if not is_enabled():
        return True

    output = subprocess.check_output(['ufw', 'disable'],
                                     universal_newlines=True,
                                     env={'LANG': 'en_US',
                                          'PATH': os.environ['PATH']})

    m = re.findall(r'^Firewall stopped and disabled on system startup\n',
                   output, re.M)
    hookenv.log(output, level='DEBUG')

    if len(m) == 0:
        hookenv.log("ufw couldn't be disabled", level='WARN')
        return False
    else:
        hookenv.log("ufw disabled", level='INFO')
        return True
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def default_policy(policy='deny', direction='incoming'):
    """
    Changes the default policy for traffic `direction`

    :param policy: allow, deny or reject
    :param direction: traffic direction, possible values: incoming, outgoing,
                      routed
    """
    if policy not in ['allow', 'deny', 'reject']:
        raise UFWError(('Unknown policy %s, valid values: '
                        'allow, deny, reject') % policy)

    if direction not in ['incoming', 'outgoing', 'routed']:
        raise UFWError(('Unknown direction %s, valid values: '
                        'incoming, outgoing, routed') % direction)

    output = subprocess.check_output(['ufw', 'default', policy, direction],
                                     universal_newlines=True,
                                     env={'LANG': 'en_US',
                                          'PATH': os.environ['PATH']})
    hookenv.log(output, level='DEBUG')

    m = re.findall("^Default %s policy changed to '%s'\n" % (direction,
                                                             policy),
                   output, re.M)
    if len(m) == 0:
        hookenv.log("ufw couldn't change the default policy to %s for %s"
                    % (policy, direction), level='WARN')
        return False
    else:
        hookenv.log("ufw default policy for %s changed to %s"
                    % (direction, policy), level='INFO')
        return True
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def default_execd_dir():
    return os.path.join(os.environ['CHARM_DIR'], 'exec.d')
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def run():
    # REVISIT(ivc): current CNI implementation provided by this package is
    # experimental and its primary purpose is to enable development of other
    # components (e.g. functional tests, service/LBaaSv2 support)
    cni_conf = utils.CNIConfig(jsonutils.load(sys.stdin))
    args = ['--config-file', cni_conf.kuryr_conf]

    try:
        if cni_conf.debug:
            args.append('-d')
    except AttributeError:
        pass
    config.init(args)
    config.setup_logging()

    # Initialize o.vo registry.
    k_objects.register_locally_defined_vifs()
    os_vif.initialize()

    if CONF.cni_daemon.daemon_enabled:
        runner = cni_api.CNIDaemonizedRunner()
    else:
        runner = cni_api.CNIStandaloneRunner(K8sCNIPlugin())
    LOG.info("Using '%s' ", runner.__class__.__name__)

    def _timeout(signum, frame):
        runner._write_dict(sys.stdout, {
            'msg': 'timeout',
            'code': k_const.CNI_TIMEOUT_CODE,
        })
        LOG.debug('timed out')
        sys.exit(1)

    signal.signal(signal.SIGALRM, _timeout)
    signal.alarm(_CNI_TIMEOUT)
    status = runner.run(os.environ, cni_conf, sys.stdout)
    LOG.debug("Exiting with status %s", status)
    if status:
        sys.exit(status)