Python os 模块,getenv() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.getenv()

项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def session_id(self):
        """A unique session ID every time the user uses the workflow.

        .. versionadded:: 1.25

        The session ID persists while the user is using this workflow.
        It expires when the user runs a different workflow or closes
        Alfred.

        """
        if not self._session_id:
            sid = os.getenv('_WF_SESSION_ID')
            if not sid:
                from uuid import uuid4
                sid = uuid4().hex
                self.setvar('_WF_SESSION_ID', sid)

            self._session_id = sid

        return self._session_id
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def site_config_dirs(appname):
    """Return a list of potential user-shared config dirs for this application.

        "appname" is the name of application.

    Typical user config directories are:
        macOS:      /Library/Application Support/<AppName>/
        Unix:       /etc or $XDG_CONFIG_DIRS[i]/<AppName>/ for each value in
                    $XDG_CONFIG_DIRS
        Win XP:     C:\Documents and Settings\All Users\Application ...
                    ...Data\<AppName>\
        Vista:      (Fail! "C:\ProgramData" is a hidden *system* directory
                    on Vista.)
        Win 7:      Hidden, but writeable on Win 7:
                    C:\ProgramData\<AppName>\
    """
    if WINDOWS:
        path = os.path.normpath(_get_win_folder("CSIDL_COMMON_APPDATA"))
        pathlist = [os.path.join(path, appname)]
    elif sys.platform == 'darwin':
        pathlist = [os.path.join('/Library/Application Support', appname)]
    else:
        # try looking in $XDG_CONFIG_DIRS
        xdg_config_dirs = os.getenv('XDG_CONFIG_DIRS', '/etc/xdg')
        if xdg_config_dirs:
            pathlist = [
                os.path.join(expanduser(x), appname)
                for x in xdg_config_dirs.split(os.pathsep)
            ]
        else:
            pathlist = []

        # always look in /etc directly as well
        pathlist.append('/etc')

    return pathlist


# -- Windows support functions --
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _candidate_tempdir_list():
    """Generate a list of candidate temporary directories which
    _get_default_tempdir will try."""

    dirlist = []

    # First, try the environment.
    for envname in 'TMPDIR', 'TEMP', 'TMP':
        dirname = _os.getenv(envname)
        if dirname: dirlist.append(dirname)

    # Failing that, try OS-specific locations.
    if _os.name == 'nt':
        dirlist.extend([ r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ])
    else:
        dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ])

    # As a last resort, the current directory.
    try:
        dirlist.append(_os.getcwd())
    except (AttributeError, OSError):
        dirlist.append(_os.curdir)

    return dirlist
项目:ibus-replace-with-kanji    作者:esrille    | 项目源码 | 文件源码
def __load_layout(self, config):
        var = config.get_value('engine/replace-with-kanji-python', 'layout')
        if var is None or var.get_type_string() != 's':
            path = os.path.join(os.getenv('IBUS_REPLACE_WITH_KANJI_LOCATION'), 'layouts')
            path = os.path.join(path, 'roomazi.json')
            if var:
                config.unset('engine/replace-with-kanji-python', 'layout')
        else:
            path = var.get_string()
        logger.info("layout: %s", path)
        layout = roomazi.layout     # Use 'roomazi' as default
        try:
            with open(path) as f:
                layout = json.load(f)
        except ValueError as error:
            logger.error("JSON error: %s", error)
        except OSError as error:
            logger.error("Error: %s", error)
        except:
            logger.error("Unexpected error: %s %s", sys.exc_info()[0], sys.exc_info()[1])
        self.__to_kana = self.__handle_roomazi_layout
        if 'Type' in layout:
            if layout['Type'] == 'Kana':
                self.__to_kana = self.__handle_kana_layout
        return layout
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def __init__(self):
        if (os.getenv("LD_LIBRARY_PATH")):
            self.ld_lib_path = os.getenv("LD_LIBRARY_PATH")
        else:
            self.ld_lib_path = ''
        if (os.getenv("PYTHONPATH")):
            self.pythonpath = os.getenv("PYTHONPATH")
        else:
            self.pythonpath = ''
        if (os.getenv("CLASSPATH")):
            self.classpath = os.getenv("CLASSPATH")
        else:
            self.classpath = ''
        if (os.getenv("OCTAVE_PATH")):
            self.octave_path = os.getenv("OCTAVE_PATH")
        else:
            self.octave_path = ''
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def __enter__(self):
        if (os.getenv("LD_LIBRARY_PATH")):
            self.ld_lib_path = os.getenv("LD_LIBRARY_PATH")
        else:
            self.ld_lib_path = ''
        if (os.getenv("PYTHONPATH")):
            self.pythonpath = os.getenv("PYTHONPATH")
        else:
            self.pythonpath = ''
        if (os.getenv("CLASSPATH")):
            self.classpath = os.getenv("CLASSPATH")
        else:
            self.classpath = ''
        if (os.getenv("OCTAVE_PATH")):
            self.octave_path = os.getenv("OCTAVE_PATH")
        else:
            self.octave_path = ''
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def _prependToEnvVar(self, newVal, envVar):
        path = self._getEnvVarAsList(envVar)
        foundValue = False
        for entry in path:
            # Search to determine if the new value is already in the path
            try:
                if os.path.samefile(entry, newVal):
                    # The value is already in the path
                    foundValue = True
                    break
            except OSError:
                # If we can't find concrete files to compare, fall back to string compare
                if entry == newVal:
                    # The value is already in the path
                    foundValue = True
                    break

        if not foundValue:
            # The value does not already exist
            if os.environ.has_key(envVar):
                newpath = newVal+os.path.pathsep + os.getenv(envVar)+os.path.pathsep
            else:
                newpath = newVal+os.path.pathsep
            os.putenv(envVar, newpath)
            os.environ[envVar] = newpath
项目:kubernaut    作者:datawire    | 项目源码 | 文件源码
def create_kubeconfig_var_message(path):
    msg = """Set your KUBECONFIG environment variable to use kubectl"""

    shell = os.getenv("SHELL", "").lower()
    if "/bash" in shell or "/zsh" in shell:
        msg += """

        export KUBECONFIG={0}
        """
    elif "/fish" in shell:
        msg += """ 

        set -g -x KUBECONFIG {0}
        """
    else:
        msg += ". Unable to detect shell therefore assuming a Bash-compatible shell"

        msg += """    

        export KUBECONFIG={0}
        """

    return msg.format(path).lstrip()
项目:libbuild    作者:appscode    | 项目源码 | 文件源码
def resolve_nested_variables(values):
    def _replacement(name):
        """
        get appropiate value for a variable name.
        first search in environ, if not found,
        then look into the dotenv variables
        """
        ret = os.getenv(name, values.get(name, ""))
        return ret

    def _re_sub_callback(match_object):
        """
        From a match object gets the variable name and returns
        the correct replacement
        """
        return _replacement(match_object.group()[2:-1])

    for k, v in values.items():
        values[k] = __posix_variable.sub(_re_sub_callback, v)

    return values
项目:games_nebula    作者:yancharkin    | 项目源码 | 文件源码
def goglib_get_games_list():

    proc = subprocess.Popen(['lgogdownloader', '--exclude', \
    '1,2,4,8,16,32', '--list-details'],stdout=subprocess.PIPE)
    games_detailed_list = proc.stdout.readlines()
    stdoutdata, stderrdata = proc.communicate()

    if proc.returncode == 0:

        file_path = os.getenv('HOME') + '/.games_nebula/config/games_list'

        games_list_file = open(file_path, 'w')

        for line in games_detailed_list:
            if 'Getting game info' not in line:
                games_list_file.write(line)

        return 0

    else:

        return 1
项目:foremast    作者:gogoair    | 项目源码 | 文件源码
def __init__(self):
        """Setup the Runner for all Foremast modules."""
        debug_flag()

        self.email = os.getenv("EMAIL")
        self.env = os.getenv("ENV")
        self.group = os.getenv("PROJECT")
        self.region = os.getenv("REGION")
        self.repo = os.getenv("GIT_REPO")
        self.runway_dir = os.getenv("RUNWAY_DIR")
        self.artifact_path = os.getenv("ARTIFACT_PATH")
        self.artifact_version = os.getenv("ARTIFACT_VERSION")
        self.promote_stage = os.getenv("PROMOTE_STAGE", "latest")

        self.git_project = "{}/{}".format(self.group, self.repo)
        parsed = gogoutils.Parser(self.git_project)
        generated = gogoutils.Generator(*parsed.parse_url(), formats=consts.APP_FORMATS)

        self.app = generated.app_name()
        self.trigger_job = generated.jenkins()['name']
        self.git_short = generated.gitlab()['main']

        self.raw_path = "./raw.properties"
        self.json_path = self.raw_path + ".json"
        self.configs = None
项目:mechanic    作者:server-mechanic    | 项目源码 | 文件源码
def __init__(self, mode):
    self.mode = mode
    self.mechanicRootDir = getenv("MECHANIC_ROOT_DIR", "")
    if mode != "USER":
      self.configFile = "${MECHANIC_ROOT_DIR}/etc/mechanic.conf"
      self.logFile = ""
      self.migrationDirs = ["${MECHANIC_ROOT_DIR}/etc/mechanic/migration.d", "${MECHANIC_ROOT_DIR}/var/lib/mechanic/migration.d"]
      self.preMigrationDirs = ["${MECHANIC_ROOT_DIR}/etc/mechanic/pre-migration.d", "${MECHANIC_ROOT_DIR}/var/lib/mechanic/pre-migration.d"]
      self.postMigrationDirs = ["${MECHANIC_ROOT_DIR}/etc/mechanic/post-migration.d", "${MECHANIC_ROOT_DIR}/var/lib/mechanic/post-migration.d"]
      self.stateDir = "${MECHANIC_ROOT_DIR}/var/lib/mechanic/state"
      self.runDir = "${MECHANIC_ROOT_DIR}/var/lib/mechanic/tmp"
    else:
      self.configFile = "${HOME}/.mechanic/mechanic.conf"
      self.logFile = "stderr"
      self.migrationDirs = ["${HOME}/.mechanic/migration.d"]
      self.preMigrationDirs = ["${HOME}/.mechanic/pre-migration.d"]
      self.postMigrationDirs = ["${HOME}/.mechanic/post-migration.d"]
      self.stateDir = "${HOME}/.mechanic/state"
      self.runDir = "${HOME}/.mechanic/tmp"
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _get_well_known_file():
    """Get the well known file produced by command 'gcloud auth login'."""
    # TODO(orestica): Revisit this method once gcloud provides a better way
    # of pinpointing the exact location of the file.
    default_config_dir = os.getenv(_CLOUDSDK_CONFIG_ENV_VAR)
    if default_config_dir is None:
        if os.name == 'nt':
            try:
                default_config_dir = os.path.join(os.environ['APPDATA'],
                                                  _CLOUDSDK_CONFIG_DIRECTORY)
            except KeyError:
                # This should never happen unless someone is really
                # messing with things.
                drive = os.environ.get('SystemDrive', 'C:')
                default_config_dir = os.path.join(drive, '\\',
                                                  _CLOUDSDK_CONFIG_DIRECTORY)
        else:
            default_config_dir = os.path.join(os.path.expanduser('~'),
                                              '.config',
                                              _CLOUDSDK_CONFIG_DIRECTORY)

    return os.path.join(default_config_dir, _WELL_KNOWN_CREDENTIALS_FILE)
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _SendRecv():
    """Communicate with the Developer Shell server socket."""

    port = int(os.getenv(DEVSHELL_ENV, 0))
    if port == 0:
        raise NoDevshellServer()

    sock = socket.socket()
    sock.connect(('localhost', port))

    data = CREDENTIAL_INFO_REQUEST_JSON
    msg = '%s\n%s' % (len(data), data)
    sock.sendall(_to_bytes(msg, encoding='utf-8'))

    header = sock.recv(6).decode()
    if '\n' not in header:
        raise CommunicationError('saw no newline in the first 6 bytes')
    len_str, json_str = header.split('\n', 1)
    to_read = int(len_str) - len(json_str)
    if to_read > 0:
        json_str += sock.recv(to_read, socket.MSG_WAITALL).decode()

    return CredentialInfoResponse(json_str)
项目:anonymine    作者:oskar-skog    | 项目源码 | 文件源码
def find_EXECUTABLES(Makefile, flags):
    '''
    See the doc-string for find_prefix as well.

    Set Makefile['EXECUTABLES'] if needed to.
    Depends (directly) on $(gamesdir) and $(bindir).
    Depends (indirectly) on $(prefix).
    '''
    if 'EXECUTABLES' not in Makefile:
        acceptable = os.getenv('PATH').split(':')
        for exec_dir in ('gamesdir', 'bindir'):
            if expand(exec_dir, Makefile) in acceptable:
                Makefile['EXECUTABLES'] = '$('+exec_dir+')'
                return False
        else:
            return True
    else:
        return False
项目:metadataproxy    作者:lyft    | 项目源码 | 文件源码
def bool_env(var_name, default=False):
    """
    Get an environment variable coerced to a boolean value.
    Example:
        Bash:
            $ export SOME_VAL=True
        settings.py:
            SOME_VAL = bool_env('SOME_VAL', False)
    Arguments:
        var_name: The name of the environment variable.
        default: The default to use if `var_name` is not specified in the
                 environment.
    Returns: `var_name` or `default` coerced to a boolean using the following
        rules:
            "False", "false" or "" => False
            Any other non-empty string => True
    """
    test_val = getenv(var_name, default)
    # Explicitly check for 'False', 'false', and '0' since all non-empty
    # string are normally coerced to True.
    if test_val in ('False', 'false', '0'):
        return False
    return bool(test_val)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_in_flight_is_one(self):
        """
        Verify that in_flight value stays equal to one while doing multiple inserts.
        The number of inserts can be set through INSERTS_ITERATIONS environmental variable.
        Default value is 1000000.
        """
        prepared = self.session.prepare("INSERT INTO race (x) VALUES (?)")
        iterations = int(os.getenv("INSERT_ITERATIONS", 1000000))
        i = 0
        leaking_connections = False
        while i < iterations and not leaking_connections:
            bound = prepared.bind((i,))
            self.session.execute(bound)
            for pool in self.session._pools.values():
                if leaking_connections:
                    break
                for conn in pool.get_connections():
                    if conn.in_flight > 1:
                        print(self.session.get_pool_state())
                        leaking_connections = True
                        break
            i = i + 1

        self.assertFalse(leaking_connections, 'Detected leaking connection after %s iterations' % i)
项目:open-nti    作者:Juniper    | 项目源码 | 文件源码
def teardown_module(module):
    global c
    global OPENNTI_C
    global OPENNTI_IN_JTI_C
    global OPENNTI_IN_LOG_C

    # Delete all files in /tests/output/
    if not os.getenv('TRAVIS'):

        c.stop(container=OPENNTI_C)
        c.remove_container(container=OPENNTI_C)

        c.stop(container=OPENNTI_IN_JTI_C)
        c.remove_container(container=OPENNTI_IN_JTI_C)

        c.stop(container=OPENNTI_IN_LOG_C)
        c.remove_container(container=OPENNTI_IN_LOG_C)

        c.stop(container=TCP_REPLAY_C)
        c.remove_container(container=TCP_REPLAY_C)
项目:PowerMeter-Reader    作者:lucab85    | 项目源码 | 文件源码
def setup_logging(self, default_path=PATH_LOGGING, default_level=logging.INFO, env_key='LOG_CFG'):
        path = default_path
        self.logconf = None
        value = os.getenv(env_key, None)
        if value:
            path = value
        if os.path.exists(path):
            with open(path, 'rt') as f:
                config = json.load(f)
            self.logconf = logging.config.dictConfig(config)
        elif os.path.exists(path.replace("../", "")):
            with open(path.replace("../", ""), 'rt') as f:
                config = json.load(f)
                self._changePath(config["handlers"])
            self.logconf = logging.config.dictConfig(config)
        else:
            print("Configurazione log non trovata (\"%s\"): applico le impostazioni predefinite" % path)
            self.logconf = logging.basicConfig(level=default_level)
项目:girder_worker    作者:girder    | 项目源码 | 文件源码
def add_nvidia_docker_to_config(container_config):
    if not container_config.get('HostConfig', None):
        container_config['HostConfig'] = {}

    nvidia_config = get_nvidia_configuration()

    # Setup the Volumes
    container_config['HostConfig'].setdefault('VolumeDriver', nvidia_config['VolumeDriver'])
    container_config['HostConfig'].setdefault('Binds', [])
    container_config['HostConfig']['Binds'].extend(nvidia_config['Volumes'])

    # Get nvidia control devices
    devices = container_config['HostConfig'].get('Devices', [])
    # suport both '0 1' and '0, 1' formats, just like nvidia-docker
    gpu_isolation = os.getenv('NV_GPU', '').replace(',', ' ').split()
    pattern = re.compile(r'/nvidia([0-9]+)$')
    for device in nvidia_config['Devices']:
        if gpu_isolation:
            card_number = pattern.search(device)
            if card_number and card_number.group(1) not in gpu_isolation:
                continue
        devices.extend(parse_devices([device]))

    container_config['HostConfig']['Devices'] = devices
项目:SublimeTerm    作者:percevalw    | 项目源码 | 文件源码
def keep_reading(self):
        """Output thread method for the process

        Sends the process output to the ViewController (through OutputTranscoder)
        """
        while True:
            if self.stop:
                break
            ret = self.process.poll()
            if ret is not None:
                self.stop = True
            readable, writable, executable = select.select([self.master], [], [], 5)
            if readable:
                """ We read the new content """
                data = os.read(self.master, 1024)
                text = data.decode('UTF-8', errors='replace')
                log_debug("RAW", repr(text))
                log_debug("PID", os.getenv('BASHPID'))
                self.output_transcoder.decode(text)
            #                log_debug("{} >> {}".format(int(time.time()), repr(text)))
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def copy_nrpe_checks():
    """
    Copy the nrpe checks into place

    """
    NAGIOS_PLUGINS = '/usr/local/lib/nagios/plugins'
    nrpe_files_dir = os.path.join(os.getenv('CHARM_DIR'), 'hooks',
                                  'charmhelpers', 'contrib', 'openstack',
                                  'files')

    if not os.path.exists(NAGIOS_PLUGINS):
        os.makedirs(NAGIOS_PLUGINS)
    for fname in glob.glob(os.path.join(nrpe_files_dir, "check_*")):
        if os.path.isfile(fname):
            shutil.copy2(fname,
                         os.path.join(NAGIOS_PLUGINS, os.path.basename(fname)))
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def copy_nrpe_checks():
    """
    Copy the nrpe checks into place

    """
    NAGIOS_PLUGINS = '/usr/local/lib/nagios/plugins'
    nrpe_files_dir = os.path.join(os.getenv('CHARM_DIR'), 'hooks',
                                  'charmhelpers', 'contrib', 'openstack',
                                  'files')

    if not os.path.exists(NAGIOS_PLUGINS):
        os.makedirs(NAGIOS_PLUGINS)
    for fname in glob.glob(os.path.join(nrpe_files_dir, "check_*")):
        if os.path.isfile(fname):
            shutil.copy2(fname,
                         os.path.join(NAGIOS_PLUGINS, os.path.basename(fname)))
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def copy_nrpe_checks():
    """
    Copy the nrpe checks into place

    """
    NAGIOS_PLUGINS = '/usr/local/lib/nagios/plugins'
    nrpe_files_dir = os.path.join(os.getenv('CHARM_DIR'), 'hooks',
                                  'charmhelpers', 'contrib', 'openstack',
                                  'files')

    if not os.path.exists(NAGIOS_PLUGINS):
        os.makedirs(NAGIOS_PLUGINS)
    for fname in glob.glob(os.path.join(nrpe_files_dir, "check_*")):
        if os.path.isfile(fname):
            shutil.copy2(fname,
                         os.path.join(NAGIOS_PLUGINS, os.path.basename(fname)))
项目:bob.bio.base    作者:bioidiap    | 项目源码 | 文件源码
def indices(list_to_split, number_of_parallel_jobs, task_id=None):
  """This function returns the first and last index for the files for the current job ID.
     If no job id is set (e.g., because a sub-job is executed locally), it simply returns all indices."""

  if number_of_parallel_jobs is None or number_of_parallel_jobs == 1:
    return None

  # test if the 'SEG_TASK_ID' environment is set
  sge_task_id = os.getenv('SGE_TASK_ID') if task_id is None else task_id
  if sge_task_id is None:
    # task id is not set, so this function is not called from a grid job
    # hence, we process the whole list
    return (0,len(list_to_split))
  else:
    job_id = int(sge_task_id) - 1
    # compute number of files to be executed
    number_of_objects_per_job = int(math.ceil(float(len(list_to_split) / float(number_of_parallel_jobs))))
    start = job_id * number_of_objects_per_job
    end = min((job_id + 1) * number_of_objects_per_job, len(list_to_split))
    return (start, end)
项目:charm-nova-cloud-controller    作者:openstack    | 项目源码 | 文件源码
def copy_nrpe_checks():
    """
    Copy the nrpe checks into place

    """
    NAGIOS_PLUGINS = '/usr/local/lib/nagios/plugins'
    nrpe_files_dir = os.path.join(os.getenv('CHARM_DIR'), 'hooks',
                                  'charmhelpers', 'contrib', 'openstack',
                                  'files')

    if not os.path.exists(NAGIOS_PLUGINS):
        os.makedirs(NAGIOS_PLUGINS)
    for fname in glob.glob(os.path.join(nrpe_files_dir, "check_*")):
        if os.path.isfile(fname):
            shutil.copy2(fname,
                         os.path.join(NAGIOS_PLUGINS, os.path.basename(fname)))
项目:PTE    作者:pwn2winctf    | 项目源码 | 文件源码
def setup_environment():
    root = os.getenv('LAMBDA_TASK_ROOT')
    bin_dir = os.path.join(root, 'bin')
    os.environ['PATH'] += ':' + bin_dir
    os.environ['GIT_EXEC_PATH'] = bin_dir

    ssh_dir = tempfile.mkdtemp()

    ssh_identity = os.path.join(ssh_dir, 'identity')
    with os.fdopen(os.open(ssh_identity, os.O_WRONLY | os.O_CREAT, 0o600),
                   'w') as f:
        f.write(base64.b64decode(os.getenv('SSH_IDENTITY')))

    ssh_config = os.path.join(ssh_dir, 'config')
    with open(ssh_config, 'w') as f:
        f.write('CheckHostIP no\n'
                'StrictHostKeyChecking yes\n'
                'IdentityFile %s\n'
                'UserKnownHostsFile %s\n' %
                (ssh_identity, os.path.join(root, 'known_hosts')))

    os.environ['GIT_SSH_COMMAND'] = 'ssh -F %s' % ssh_config
项目:pgbadger-rds-cron    作者:CanopyTax    | 项目源码 | 文件源码
def run():
    get_log_states()
    db_name = os.getenv('DB_NAME') or \
              raiser(ValueError('DB_NAME is required'))
    bucket = os.getenv('S3_BUCKET') or \
             raiser(ValueError('S3_BUCKET is required'))
    region = os.getenv('REGION', 'us-west-2')
    key = os.getenv('S3_KEY', 'pgbadger/')
    try:
        files = download_log_files(db_name)
        sync_s3(bucket, key)
        run_pgbadger(files)
        sync_s3(bucket, key, upload=True)
        # upload_to_s3(bucket, key, region)
    except Exception as e:
        traceback.print_exc()
    finally:
        save_log_states()
项目:galaxia    作者:WiproOpenSourcePractice    | 项目源码 | 文件源码
def list(self):
        self.parser.add_argument('--unit-type', help='Type of unit valid value\
                                                     is docker', required=True)
        self.parser.add_argument('--search-type', help='search type', required=False)
        self.parser.add_argument('--search-string', help='search string', required=False)
        args = self.parser.parse_args()
        unit_type = vars(args)['unit_type']
        search_type = vars(args)['search_type']
        search_string = vars(args)['search_string']
        data = {'unit_type': unit_type, 'search_type': search_type, 'search_string': search_string}
        galaxia_api_endpoint = os.getenv("galaxia_api_endpoint")
        target_url = client.concatenate_url(galaxia_api_endpoint,
                                            self.catalogue_uri)
        resp = client.http_request('GET', target_url, self.headers, data)
        if unit_type == 'container':
            format_print.format_dict(resp.json(), "keys")
        if unit_type == 'dashboard':
            format_print.format_dict(resp.json(), "keys")
        if unit_type == 'exporter':
            header = ["EXPORTER_NAME", "EXPORTER_ID"]
            format_print.format_dict(resp.json(), header)
        if unit_type == 'node':
            header = ["Instance_Name", "Host_Name"]
            format_print.format_dict(resp.json(), header)
项目:galaxia    作者:WiproOpenSourcePractice    | 项目源码 | 文件源码
def create(self):
        self.parser.add_argument('--source-system', help='Source system',
                                 required=True)
        self.parser.add_argument('--target-system', help='Target system',
                                 required=True)
        self.parser.add_argument('--metrics-list', help='List of metrics to\
                                                        export', required=True)
        self.parser.add_argument('--time-interval', help='Time interval\
                                        in which to push metrics to target\
                                                         system', required=True)
        self.parser.add_argument('--unit-type', help='Type of unit valid value\
                                                     is docker', required=True)
        self.parser.add_argument('--exporter-name', help='Unique Name for\
                                                    exporter', required=True)

        args = self.parser.parse_args()
        json_data = client.create_request_data(**vars(args))
        galaxia_api_endpoint = os.getenv("galaxia_api_endpoint")
        target_url = client.concatenate_url(galaxia_api_endpoint,
                                            self.exporter_uri)
        resp = client.http_request('POST', target_url, self.headers,
                                   json_data)
        print resp.text
项目:galaxia    作者:WiproOpenSourcePractice    | 项目源码 | 文件源码
def list(self):
        resp = None
        self.parser.add_argument('--type', help="Type of unit valid values are\
                                containers, nodes", required=True)
        args = self.parser.parse_args()
        unit_type = vars(args)['type']
        data = {"sub_type": unit_type}
        galaxia_api_endpoint = os.getenv("galaxia_api_endpoint")
        target_url = client.concatenate_url(galaxia_api_endpoint, self.metrics_uri)
        try:
            resp = client.http_request('GET', target_url, self.headers, data)
            headers = ["NAME", "DESCRIPTION"]
            print "List of supported metrics for "+unit_type
            format_print.format_dict(resp.json(), headers)
        except Exception as ex:
            pass
项目:galaxia    作者:WiproOpenSourcePractice    | 项目源码 | 文件源码
def sample(self):
        resp = None
        self.parser.add_argument('--type', help="Type of unit valid values are\
                                containers, nodes", required=True)
        self.parser.add_argument('--search-string', help='Search String', required=False)
        self.parser.add_argument('--search-type', help='Search String', required=False)
        self.parser.add_argument('--meter-name', help='Name of the meter', required=True)
        args = self.parser.parse_args()
        data = {"type": vars(args)['type'], "search_string": vars(args)['search_string'],
                "search_type": vars(args)['search_type'] , "meter_name": vars(args)['meter_name']}
        galaxia_api_endpoint = os.getenv("galaxia_api_endpoint")
        target_url = client.concatenate_url(galaxia_api_endpoint, self.sample_uri)
        try:
            resp = client.http_request('GET', target_url, self.headers, data)
            headers = ["NAME", "VALUE"]
            print "Current "+ vars(args)['meter_name']
            #print "Current "+unit_type
            format_print.format_dict(resp.json(), headers)
        except Exception as ex:
            pass
项目:galaxia    作者:WiproOpenSourcePractice    | 项目源码 | 文件源码
def create(self):
        self.parser.add_argument('--name', help='Name of the dashboard', required=True)
        self.parser.add_argument('--metrics-list', nargs='+')#help='List of \
#                                metrics to be displayed on the dashboard',
 #                                required=True)
        self.parser.add_argument('--names-list', help='Names list of \
                                units to plot in dashboard')
        self.parser.add_argument('--search-string', help='Search String')
        self.parser.add_argument('--search-type', help='Search String')
        self.parser.add_argument('--unit-type', help='Type of unit, valid value is docker')
        self.parser.add_argument('--exclude', help='Search excluding search string', required=False)
        args = self.parser.parse_args()
        if not (args.names_list or (args.search_string and args.search_type)):
            self.parser.error('add --names-list or (--search-string and --search-type)')

        json_data = client.create_request_data(**vars(args))
        print json_data
        galaxia_api_endpoint = os.getenv("galaxia_api_endpoint")
        target_url = client.concatenate_url(galaxia_api_endpoint, self.url)
        try:
            resp = client.http_request('PUT', target_url, self.headers, json_data)
            print resp.text
        except Exception as ex:
            pass
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def oldest_peer(peers):
    """Determines who the oldest peer is by comparing unit numbers."""
    local_unit_no = int(os.getenv('JUJU_UNIT_NAME').split('/')[1])
    for peer in peers:
        remote_unit_no = int(peer.split('/')[1])
        if remote_unit_no < local_unit_no:
            return False
    return True
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def get_chrome_path():
    if win_client():
        PathName = os.getenv('localappdata') + '\\Google\\Chrome\\User Data\\Default\\'
        if (os.path.isdir(PathName) == False):
            return "[!] Chrome Doesn't exists", False
    if osx_client():
        PathName = os.getenv('HOME') + "/Library/Application Support/Google/Chrome/Default/"
        if (os.path.isdir(PathName) == False):
            return "[!] Chrome Doesn't exists", False
    if lnx_client():
        PathName = os.getenv('HOME') + '/.config/google-chrome/Default/'
        if (os.path.isdir(PathName) == False):
            return "[!] Chrome Doesn't exists", False
    return PathName, True
项目:MPIS    作者:KernelPanicBlog    | 项目源码 | 文件源码
def __init__(self):
        self.__path_home = os.getenv("HOME")
        self.__path_config = self.__path_home + "/.config/mpis"
        self.__path_file = "/usr/share/mpis"
        self.__path_tr = os.path.join(self.__path_config, "locale")
        self.__path_db = os.path.join(self.__path_config, "db")
项目:almond-nnparser    作者:Stanford-Mobisocial-IoT-Lab    | 项目源码 | 文件源码
def init_from_url(self, snapshot=-1, thingpedia_url=None):
        if thingpedia_url is None:
            thingpedia_url = os.getenv('THINGPEDIA_URL', 'https://thingpedia.stanford.edu/thingpedia')
        ssl_context = ssl.create_default_context()

        with urllib.request.urlopen(thingpedia_url + '/api/snapshot/' + str(snapshot) + '?meta=1', context=ssl_context) as res:
            self._process_devices(json.load(res)['data'])

        with urllib.request.urlopen(thingpedia_url + '/api/entities?snapshot=' + str(snapshot), context=ssl_context) as res:
            self._process_entities(json.load(res)['data'])
项目:almond-nnparser    作者:Stanford-Mobisocial-IoT-Lab    | 项目源码 | 文件源码
def get_thingpedia(input_words, workdir, snapshot):
    thingpedia_url = os.getenv('THINGPEDIA_URL', 'https://thingpedia.stanford.edu/thingpedia')

    output = dict()
    with urllib.request.urlopen(thingpedia_url + '/api/snapshot/' + str(snapshot) + '?meta=1', context=ssl_context) as res:
        output['devices'] = json.load(res)['data']
        for device in output['devices']:
            if device['kind_type'] == 'global':
                continue
            if device['kind_canonical']:
                add_words(input_words, device['kind_canonical'])
            else:
                print('WARNING: missing canonical for tt-device:%s' % (device['kind'],))
            for function_type in ('triggers', 'queries', 'actions'):
                for function_name, function in device[function_type].items():
                    if not function['canonical']:
                        print('WARNING: missing canonical for tt:%s.%s' % (device['kind'], function_name))
                    else:
                        add_words(input_words, function['canonical'])
                    for argname, argcanonical in zip(function['args'], function['argcanonicals']):
                        if argcanonical:
                            add_words(input_words, argcanonical)
                        else:
                            add_words(input_words, clean(argname))
                    for argtype in function['schema']:
                        if not argtype.startswith('Enum('):
                            continue
                        enum_entries = argtype[len('Enum('):-1].split(',')
                        for enum_value in enum_entries:
                            add_words(input_words, clean(enum_value))

    with urllib.request.urlopen(thingpedia_url + '/api/entities?snapshot=' + str(snapshot), context=ssl_context) as res:
        output['entities'] = json.load(res)['data']
        for entity in output['entities']:
            if entity['is_well_known'] == 1:
                continue
            add_words(input_words, tokenize(entity['name']))

    with open(os.path.join(workdir, 'thingpedia.json'), 'w') as fp:
        json.dump(output, fp, indent=2)
项目:almond-nnparser    作者:Stanford-Mobisocial-IoT-Lab    | 项目源码 | 文件源码
def main():
    np.random.seed(1234)

    workdir = sys.argv[1]
    if len(sys.argv) > 2:
        snapshot = int(sys.argv[2])
    else:
        snapshot = -1
    if len(sys.argv) > 3:
        embed_size = int(sys.argv[3])
    else:
        embed_size = 300

    dataset = os.getenv('DATASET', workdir)
    glove = os.getenv('GLOVE', os.path.join(workdir, 'glove.42B.300d.txt'))

    download_glove(glove)

    input_words = set()
    # add the canonical words for the builtin functions
    add_words(input_words, 'now nothing notify return the event')

    create_dictionary(input_words, dataset)
    get_thingpedia(input_words, workdir, snapshot)
    save_dictionary(input_words, workdir)
    trim_embeddings(input_words, workdir, embed_size, glove)
项目:change-acs-password    作者:nertwork    | 项目源码 | 文件源码
def read_config():
    config = ConfigParser()
    config.read([path.join(BASE_DIR, 'settings.ini'), os.getenv('CONF_FILE', '')])

    return config
项目:devops-playground    作者:jerrywardlow    | 项目源码 | 文件源码
def read_environment(self):
        ''' Reads the settings from environment variables '''
        # Setup credentials
        if os.getenv("DO_API_TOKEN"):
            self.api_token = os.getenv("DO_API_TOKEN")
        if os.getenv("DO_API_KEY"):
            self.api_token = os.getenv("DO_API_KEY")
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def user_config_dir(appname=None, appauthor=None, version=None, roaming=False):
    r"""Return full path to the user-specific config dir for this application.

        "appname" is the name of application.
            If None, just the system directory is returned.
        "appauthor" (only used on Windows) is the name of the
            appauthor or distributing body for this application. Typically
            it is the owning company name. This falls back to appname. You may
            pass False to disable it.
        "version" is an optional version path element to append to the
            path. You might want to use this if you want multiple versions
            of your app to be able to run independently. If used, this
            would typically be "<major>.<minor>".
            Only applied when appname is present.
        "roaming" (boolean, default False) can be set True to use the Windows
            roaming appdata directory. That means that for users on a Windows
            network setup for roaming profiles, this user data will be
            sync'd on login. See
            <http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
            for a discussion of issues.

    Typical user data directories are:
        macOS:                  same as user_data_dir
        Unix:                   ~/.config/<AppName>     # or in $XDG_CONFIG_HOME, if defined
        Win *:                  same as user_data_dir

    For Unix, we follow the XDG spec and support $XDG_CONFIG_HOME.
    That means, by deafult "~/.config/<AppName>".
    """
    if system in ["win32", "darwin"]:
        path = user_data_dir(appname, appauthor, None, roaming)
    else:
        path = os.getenv('XDG_CONFIG_HOME', os.path.expanduser("~/.config"))
        if appname:
            path = os.path.join(path, appname)
    if appname and version:
        path = os.path.join(path, version)
    return path
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def user_config_dir(appname, roaming=True):
    """Return full path to the user-specific config dir for this application.

        "appname" is the name of application.
            If None, just the system directory is returned.
        "roaming" (boolean, default True) can be set False to not use the
            Windows roaming appdata directory. That means that for users on a
            Windows network setup for roaming profiles, this user data will be
            sync'd on login. See
            <http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
            for a discussion of issues.

    Typical user data directories are:
        macOS:                  same as user_data_dir
        Unix:                   ~/.config/<AppName>
        Win *:                  same as user_data_dir

    For Unix, we follow the XDG spec and support $XDG_CONFIG_HOME.
    That means, by default "~/.config/<AppName>".
    """
    if WINDOWS:
        path = user_data_dir(appname, roaming=roaming)
    elif sys.platform == "darwin":
        path = user_data_dir(appname)
    else:
        path = os.getenv('XDG_CONFIG_HOME', expanduser("~/.config"))
        path = os.path.join(path, appname)

    return path


# for the discussion regarding site_config_dirs locations
# see <https://github.com/pypa/pip/issues/1733>
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def user_config_dir(appname=None, appauthor=None, version=None, roaming=False):
    r"""Return full path to the user-specific config dir for this application.

        "appname" is the name of application.
            If None, just the system directory is returned.
        "appauthor" (only used on Windows) is the name of the
            appauthor or distributing body for this application. Typically
            it is the owning company name. This falls back to appname. You may
            pass False to disable it.
        "version" is an optional version path element to append to the
            path. You might want to use this if you want multiple versions
            of your app to be able to run independently. If used, this
            would typically be "<major>.<minor>".
            Only applied when appname is present.
        "roaming" (boolean, default False) can be set True to use the Windows
            roaming appdata directory. That means that for users on a Windows
            network setup for roaming profiles, this user data will be
            sync'd on login. See
            <http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
            for a discussion of issues.

    Typical user data directories are:
        Mac OS X:               same as user_data_dir
        Unix:                   ~/.config/<AppName>     # or in $XDG_CONFIG_HOME, if defined
        Win *:                  same as user_data_dir

    For Unix, we follow the XDG spec and support $XDG_CONFIG_HOME.
    That means, by deafult "~/.config/<AppName>".
    """
    if system in ["win32", "darwin"]:
        path = user_data_dir(appname, appauthor, None, roaming)
    else:
        path = os.getenv('XDG_CONFIG_HOME', os.path.expanduser("~/.config"))
        if appname:
            path = os.path.join(path, appname)
    if appname and version:
        path = os.path.join(path, version)
    return path
项目:kiteHistory    作者:mr-karan    | 项目源码 | 文件源码
def check_for_tokens():
    '''
    Checks for token present in system environment. To set them, export them
    in your ~/.bashrc or ~/.zshrc
    '''
    log.debug('Checking for tokens')

    kite_api_key = getenv('KITE_API_KEY')
    kite_request_token = getenv('KITE_REQUEST_TOKEN')
    kite_secret = getenv('KITE_SECRET')

    # Get your request token from the first time
    # kite.trade/connect/login?api_key=<>

    log.debug("Tokens fetched: {} {} ".format(kite_api_key,
                                              kite_secret,))

    if kite_api_key is None or kite_secret is None:
        print('''
            You need to add your Kite API token,
            along with Secret Key. \n
            export KITE_API_KEY='your-kite-api-key'
            export KITE_SECRET='your-kite-secret-key'
            \n
            You can fetch it from here : https://developers.kite.trade/apps
        ''')
        return False

    log.debug("Kite Request Token: {}".format(kite_request_token))

    if kite_request_token is None:
        print('''
            Set your request token.
            You can do this by setting environment variables: \n
            export KITE_REQUEST_TOKEN='your-kite-request-token' \n
            Generate request token from 
            https://kite.trade/connect/login?api_key=<>
            ''')
        return False
    return True
项目:ibus-replace-with-kanji    作者:esrille    | 项目源码 | 文件源码
def __load_dictionary(self, config):
        var = config.get_value('engine/replace-with-kanji-python', 'dictionary')
        if var is None or var.get_type_string() != 's':
            path = os.path.join(os.getenv('IBUS_REPLACE_WITH_KANJI_LOCATION'), 'restrained.dic')
            if var:
                config.unset('engine/replace-with-kanji-python', 'dictionary')
        else:
            path = var.get_string()
        return Dictionary(path)
项目:ibus-replace-with-kanji    作者:esrille    | 项目源码 | 文件源码
def __init__(self, path):
        logger.info("Dictionary(%s)", path)

        self.__dict_base = {}
        self.__dict = {}

        self.__yomi = ''
        self.__no = 0
        self.__cand = []
        self.__numeric = ''
        self.__dirty = False

        self.__orders_path = ''

        # Load Katakana dictionary first so that Katakana words come after Kanji words.
        katakana_path = os.path.join(os.getenv('IBUS_REPLACE_WITH_KANJI_LOCATION'), 'katakana.dic')
        self.__load_dict(self.__dict_base, katakana_path)

        # Load system dictionary
        self.__load_dict(self.__dict_base, path)

        # Load private dictionary
        self.__dict = self.__dict_base.copy()

        my_path = os.path.expanduser('~/.local/share/ibus-replace-with-kanji/my.dic')
        self.__load_dict(self.__dict, my_path, 'a+')

        base = os.path.basename(path)
        if base:
            self.__orders_path = os.path.expanduser('~/.local/share/ibus-replace-with-kanji')
            self.__orders_path = os.path.join(self.__orders_path, base)
            self.__load_dict(self.__dict, self.__orders_path, 'a+', version_checked=False)
项目:foodAId    作者:Pregnor    | 项目源码 | 文件源码
def __init__(self):
        '''Constructor.'''

        logging.basicConfig()
        random.seed()

        self._slack_bot_token = os.getenv('SLACK_BOT_TOKEN')
        self._slack = slackclient.SlackClient(self._slack_bot_token)

        self._slack_bot_id = self._get_bot_id()
        self._at_bot = '<@' + str(self._slack_bot_id) + '>'

        self._scraper = scraper.Scraper()

        self._polls = {}

        self.is_running = True
        self._reaction_interval = 1
        self._keywords = [
            'belly',
            'bite',
            'eat',
            'food',
            'lunch',
            'meal',
            'menu',
            'offer',
            'stomach'
        ]

        self._thread = threading.Thread(None, self._loop_messages)
        self._thread.start()
项目:spyking-circus    作者:spyking-circus    | 项目源码 | 文件源码
def gather_mpi_arguments(hostfile, params):
    from mpi4py import MPI
    vendor = MPI.get_vendor()
    print_and_log(['MPI detected: %s' % str(vendor)], 'debug', logger)
    if vendor[0] == 'Open MPI':
        mpi_args = ['mpirun']
        if os.getenv('LD_LIBRARY_PATH'):
            mpi_args += ['-x', 'LD_LIBRARY_PATH']
        if os.getenv('PATH'):
            mpi_args += ['-x', 'PATH']
        if os.getenv('PYTHONPATH'):
            mpi_args += ['-x', 'PYTHONPATH']
        if os.path.exists(hostfile):
            mpi_args += ['-hostfile', hostfile]
    elif vendor[0] == 'Microsoft MPI':
        mpi_args = ['mpiexec']
        if os.path.exists(hostfile):
            mpi_args += ['-machinefile', hostfile]
    elif vendor[0] == 'MPICH2':
        mpi_args = ['mpiexec']
        if os.path.exists(hostfile):
            mpi_args += ['-f', hostfile]
    elif vendor[0] == 'MPICH':
        mpi_args = ['mpiexec']
        if os.path.exists(hostfile):
            mpi_args += ['-f', hostfile]
    else:
        print_and_log([
                        '%s may not be yet properly implemented: contact developpers' %
                        vendor[0]], 'error', logger)
        mpi_args = ['mpirun']
        if os.path.exists(hostfile):
            mpi_args += ['-hostfile', hostfile]
    return mpi_args
项目:Dockerfiles    作者:appscode    | 项目源码 | 文件源码
def resolve_nested_variables(values):
    def _replacement(name):
        """
        get appropiate value for a variable name.
        first search in environ, if not found,
        then look into the dotenv variables
        """
        ret = os.getenv(name, values.get(name, ""))
        return ret

    def _re_sub_callback(match_object):
        """
        From a match object gets the variable name and returns
        the correct replacement
        """
        return _replacement(match_object.group()[2:-1])

    for k, v in values.items():
        values[k] = __posix_variable.sub(_re_sub_callback, v)

    return values