Python json 模块,get() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用json.get()

项目:kapsel    作者:conda    | 项目源码 | 文件源码
def resolve_env_to_prefix(name_or_prefix):
    """Resolve a conda environment name (or path) to its prefix directory.

    An absolute path is handed back untouched.  The special name ``root``
    maps to the ``root_prefix`` entry reported by ``info()``.  Any other name
    is compared with the last path component of each entry in the ``envs``
    list reported by ``info()``; the first match wins.

    Returns:
        The matching prefix, or None when nothing matches.
    """
    if os.path.isabs(name_or_prefix):
        return name_or_prefix

    conda_info = info()
    if name_or_prefix == 'root':
        return conda_info.get('root_prefix', None)

    matching = (env for env in conda_info.get('envs', [])
                if os.path.basename(env) == name_or_prefix)
    return next(matching, None)
项目:kapsel    作者:conda    | 项目源码 | 文件源码
def parse_spec(spec):
    """Split a package spec string into its name and version constraints.

    The spec is matched against the module-level ``_spec_pat`` pattern.  The
    package name is lower-cased and any spaces inside the pip constraint are
    removed; the conda constraint is passed through exactly as matched.

    Returns:
       ``ParsedSpec``, or None when the spec does not match the pattern
    """
    matched = _spec_pat.match(spec)
    if matched is None:
        return None

    raw_pip = matched.group('pc')
    pip_constraint = None if raw_pip is None else raw_pip.replace(' ', '')
    return ParsedSpec(
        name=matched.group('name').lower(),
        conda_constraint=matched.group('cc'),
        pip_constraint=pip_constraint,
    )

# these are in order of preference. On pre-4.1.4 Windows,
# CONDA_PREFIX and CONDA_ENV_PATH aren't set, so we get to
# CONDA_DEFAULT_ENV.
项目:kapsel    作者:conda    | 项目源码 | 文件源码
def environ_set_prefix(environ, prefix, varname=conda_prefix_variable()):
    """Store a normalized env prefix in ``environ`` and keep CONDA_DEFAULT_ENV in sync.

    Args:
        environ: mutable mapping of environment variables; modified in place.
        prefix: path of the environment prefix; normalized with
            ``os.path.normpath`` before being stored.
        varname: name of the variable that receives the prefix.  The default
            is computed once, when the function is defined.

    When ``varname`` is not ``CONDA_DEFAULT_ENV`` itself, that variable is
    also set: to ``'root'`` for the root prefix, otherwise to whatever
    ``subdirectory_relative_to_directory`` yields for the first envs dir
    that changes the prefix, otherwise to the full prefix.
    """
    prefix = os.path.normpath(prefix)
    environ[varname] = prefix
    if varname != 'CONDA_DEFAULT_ENV':
        # This case matters on both Unix and Windows
        # with conda >= 4.1.4 since requirement.env_var
        # is CONDA_PREFIX, and matters on Unix only pre-4.1.4
        # when requirement.env_var is CONDA_ENV_PATH.
        global _envs_dirs
        global _root_dir
        if _envs_dirs is None:
            # Ask conda once and cache the normalized directories module-wide.
            i = info()
            _envs_dirs = [os.path.normpath(d) for d in i.get('envs_dirs', [])]
            _root_dir = os.path.normpath(i.get('root_prefix'))
        if prefix == _root_dir:
            name = 'root'
        else:
            # Start from the full prefix so ``name`` is always bound, even
            # when conda reports no envs dirs at all (previously that case
            # raised UnboundLocalError).
            name = prefix
            for d in _envs_dirs:
                # presumably the helper returns ``prefix`` unchanged when it
                # is not located under ``d`` -- verify against the helper
                relative = subdirectory_relative_to_directory(prefix, d)
                if relative != prefix:
                    name = relative
                    break
        environ['CONDA_DEFAULT_ENV'] = name
项目:dockercloud-cli    作者:docker    | 项目源码 | 文件源码
def node_byo():
    """Print the command that installs the agent on a "bring your own" node.

    A token is requested with a POST to the infra token endpoint (namespaced
    when ``dockercloud.namespace`` is set).  Any exception is printed to
    stderr and the process exits with ``EXCEPTION_EXIT_CODE``.
    """
    token = ""
    try:
        if dockercloud.namespace:
            endpoint = "api/infra/%s/%s/token" % (API_VERSION, dockercloud.namespace)
        else:
            endpoint = "api/infra/%s/token" % API_VERSION
        reply = dockercloud.api.http.send_request("POST", endpoint)
        if reply:
            # An empty/None reply leaves the token as the empty string.
            token = reply.get("token", "")
    except Exception as e:
        print(e, file=sys.stderr)
        sys.exit(EXCEPTION_EXIT_CODE)

    intro = ("Docker Cloud lets you use your own servers as nodes to run containers. "
             "For this you have to install our agent.")
    print(intro)
    print("Run the following command on your server:")
    print()
    print("\tcurl -Ls https://get.cloud.docker.com/ | sudo -H sh -s", token)
    print()
项目:internet-content-detection    作者:liubo0621    | 项目源码 | 文件源码
def record_sync_status(total, status, message, data, sync_type):
    """Insert one row describing a sync run into TAB_IOPM_CLUES_SYNC_INFO.

    Args:
        total, status, message, sync_type: values written to the columns of
            the same name.
        data: optional mapping that may carry ``deleteNum``, ``updateNum``,
            ``saveNum`` and ``wrongIdList``.  Missing or None counts are
            written as SQL ``null``; a missing id list is written as an
            empty string.

    Returns:
        Whatever ``db.add`` returns for the generated statement.
    """
    data = data or {}

    def _count_or_null(key):
        # A missing key used to be formatted as the Python text "None",
        # which is not valid SQL; fall back to the SQL null literal.
        value = data.get(key)
        return 'null' if value is None else value

    delete_count = _count_or_null("deleteNum")
    update_count = _count_or_null("updateNum")
    save_count = _count_or_null("saveNum")
    # ``wrongIdList`` may be absent (None); ids may also be non-strings.
    worn_clues_list = data.get("wrongIdList") or []
    worn_clues = ','.join(str(clue) for clue in worn_clues_list)

    # NOTE(review): the statement is built by string interpolation, so a quote
    # in ``message`` breaks it and untrusted input could inject SQL.  Switch
    # to bound parameters if ``db.add`` supports them.
    sql = "insert into TAB_IOPM_CLUES_SYNC_INFO (id, total, status, message, delete_count, update_count, save_count, worn_clues_list, sync_type) values (%s, %s, %s, '%s', %s, %s, %s, '%s', %s) "%('seq_iopm_sync.nextval', total, status, message, delete_count, update_count, save_count, worn_clues, sync_type)

    print(sql)
    return db.add(sql)
项目:ugc.aggregator    作者:Dreamcatcher-GIS    | 项目源码 | 文件源码
def geocode(address):
    """
    Look up coordinates for an address through the AMap geocoding REST API.

    :param address: address text sent as the ``address`` query parameter
    :return: ``[lng, lat]`` as floats parsed from the ``location`` field of
        the first geocode, or None when the HTTP status is not 200, the
        reply's ``status`` is not ``'1'`` or its ``count`` is below 1
    """
    # NOTE(review): the 'city' value looks like text lost to an encoding
    # error -- confirm the intended city name.
    query = {'s': 'rsv3',
             'key': key,
             'city': '??',
             'address': address}
    res = requests.get(
        "http://restapi.amap.com/v3/geocode/geo", params=query)
    if res.status_code != 200:
        return None

    payload = res.json()
    if payload.get('status') != '1' or int(payload.get('count')) < 1:
        return None

    first_hit = payload.get('geocodes')[0]
    # ``location`` is split on the comma: longitude first, latitude second.
    lng = float(first_hit.get('location').split(',')[0])
    lat = float(first_hit.get('location').split(',')[1])
    return [lng, lat]
项目:alexa-apple-calendar    作者:zanderxyz    | 项目源码 | 文件源码
def refresh_client(self, from_dt=None, to_dt=None):
        """
        Refreshes the CalendarService endpoint, ensuring that the
        event data is up-to-date. If no 'from_dt' or 'to_dt' datetimes
        have been given, the range becomes this month.

        The JSON reply is stored on ``self.response``; nothing is returned.
        """
        today = datetime.today()
        # monthrange() returns (weekday of the 1st, number of days in the
        # month).  The first element is NOT a day of the month: using it as
        # one gave a wrong start date and raised ValueError (day 0) whenever
        # the month began on a Monday.
        _, last_day = monthrange(today.year, today.month)
        if not from_dt:
            from_dt = datetime(today.year, today.month, 1)
        if not to_dt:
            to_dt = datetime(today.year, today.month, last_day)
        params = dict(self.params)
        params.update({
            'lang': 'en-us',
            'usertz': get_localzone().zone,
            'startDate': from_dt.strftime('%Y-%m-%d'),
            'endDate': to_dt.strftime('%Y-%m-%d')
        })
        req = self.session.get(self._calendar_refresh_url, params=params)
        self.response = req.json()
项目:house-of-spirits    作者:OnlyInAmerica    | 项目源码 | 文件源码
def lights():
    """
    Switch rooms on or off according to the JSON body of a local request.

    Expected body shape:
    {
        'rooms': [
            {'name': 'Living Room', 'on': True},
        ]
    }

    Requests rejected by ``is_local_request`` are logged and answered
    with a 404.
    """
    if not is_local_request(flask.request):
        logger.info('Lights accessed by remote address %s', flask.request.environ['REMOTE_ADDR'])
        flask.abort(404)

    payload = flask.request.get_json()
    requested = payload.get('rooms', [])
    logger.info('Switching rooms %s', requested)
    for entry in requested:
        wanted_name = entry['name']
        wanted_state = entry['on']

        # Every known room whose name matches exactly gets switched.
        for room in ROOMS:
            if room.name != wanted_name:
                continue
            logger.info('Switching room %s', room.name)
            room.switch(wanted_state)
    return "Light commands sent."
项目:geojson-python-utils    作者:brandonxiang    | 项目源码 | 文件源码
def geocode(address):
    """
    Look up coordinates for an address through the AMap geocoding REST API.

    :param address: address text sent as the ``address`` query parameter
    :return: ``[lng, lat]`` as floats parsed from the ``location`` field of
        the first geocode, or None when the HTTP status is not 200, the
        reply's ``status`` is not ``'1'`` or its ``count`` is below 1
    """
    # NOTE(review): the 'city' value looks like text lost to an encoding
    # error -- confirm the intended city name.
    query = {'s': 'rsv3',
             'key': key,
             'city': '??',
             'address': address}
    res = requests.get(
        "http://restapi.amap.com/v3/geocode/geo", params=query)
    if res.status_code != 200:
        return None

    payload = res.json()
    if payload.get('status') != '1' or int(payload.get('count')) < 1:
        return None

    first_hit = payload.get('geocodes')[0]
    # ``location`` is split on the comma: longitude first, latitude second.
    lng = float(first_hit.get('location').split(',')[0])
    lat = float(first_hit.get('location').split(',')[1])
    return [lng, lat]
项目:gitsome    作者:donnemartin    | 项目源码 | 文件源码
def check_authorization(self, access_token):
        """Check an authorization created by a registered application.

        OAuth applications can use this method to check token validity
        without hitting normal rate limits because of failed login attempts.
        If the token is valid, it will return True, otherwise it will return
        False.

        False is also returned, without issuing a request, when the token is
        empty or the session carries no ``client_id``/``client_secret``.

        :param str access_token: (required), token to check
        :returns: bool
        """
        p = self.session.params
        auth = (p.get('client_id'), p.get('client_secret'))
        # A 2-tuple is always truthy, so test its members: without both
        # credentials the request would be built with the text "None".
        if access_token and all(auth):
            url = self._build_url('applications', str(auth[0]), 'tokens',
                                  str(access_token))
            # The credentials travel as basic auth, so the query-string
            # copies are blanked out for this request.
            resp = self._get(url, auth=auth, params={
                'client_id': None, 'client_secret': None
            })
            return self._boolean(resp, 200, 404)
        return False
项目:anaconda-project    作者:Anaconda-Platform    | 项目源码 | 文件源码
def resolve_env_to_prefix(name_or_prefix):
    """Resolve a conda environment name (or path) to its prefix directory.

    An absolute path is handed back untouched.  The special name ``root``
    maps to the ``root_prefix`` entry reported by ``info()``.  Any other name
    is compared with the last path component of each entry in the ``envs``
    list reported by ``info()``; the first match wins.

    Returns:
        The matching prefix, or None when nothing matches.
    """
    if os.path.isabs(name_or_prefix):
        return name_or_prefix

    conda_info = info()
    if name_or_prefix == 'root':
        return conda_info.get('root_prefix', None)

    matching = (env for env in conda_info.get('envs', [])
                if os.path.basename(env) == name_or_prefix)
    return next(matching, None)
项目:anaconda-project    作者:Anaconda-Platform    | 项目源码 | 文件源码
def environ_set_prefix(environ, prefix, varname=conda_prefix_variable()):
    """Store a normalized env prefix in ``environ`` and keep CONDA_DEFAULT_ENV in sync.

    Args:
        environ: mutable mapping of environment variables; modified in place.
        prefix: path of the environment prefix; normalized with
            ``os.path.normpath`` before being stored.
        varname: name of the variable that receives the prefix.  The default
            is computed once, when the function is defined.

    When ``varname`` is not ``CONDA_DEFAULT_ENV`` itself, that variable is
    also set: to ``'root'`` for the root prefix, otherwise to whatever
    ``subdirectory_relative_to_directory`` yields for the first envs dir
    that changes the prefix, otherwise to the full prefix.
    """
    prefix = os.path.normpath(prefix)
    environ[varname] = prefix
    if varname != 'CONDA_DEFAULT_ENV':
        # This case matters on both Unix and Windows
        # with conda >= 4.1.4 since requirement.env_var
        # is CONDA_PREFIX, and matters on Unix only pre-4.1.4
        # when requirement.env_var is CONDA_ENV_PATH.
        global _envs_dirs
        global _root_dir
        if _envs_dirs is None:
            # Ask conda once and cache the normalized directories module-wide.
            i = info()
            _envs_dirs = [os.path.normpath(d) for d in i.get('envs_dirs', [])]
            _root_dir = os.path.normpath(i.get('root_prefix'))
        if prefix == _root_dir:
            name = 'root'
        else:
            # Start from the full prefix so ``name`` is always bound, even
            # when conda reports no envs dirs at all (previously that case
            # raised UnboundLocalError).
            name = prefix
            for d in _envs_dirs:
                # presumably the helper returns ``prefix`` unchanged when it
                # is not located under ``d`` -- verify against the helper
                relative = subdirectory_relative_to_directory(prefix, d)
                if relative != prefix:
                    name = relative
                    break
        environ['CONDA_DEFAULT_ENV'] = name

# This isn't all (e.g. leaves out arm, power).  It's sort of "all
# that people typically publish for"
项目:PyPardot4    作者:mneedham91    | 项目源码 | 文件源码
def get(self, object_name, path=None, params=None, retries=0):
        """
        Makes a GET request to the API. Checks for invalid requests that raise PardotAPIErrors. If the API key is
        invalid, one re-authentication request is made, in case the key has simply expired. If no errors are raised,
        returns either the JSON response, or if no JSON was returned, returns the HTTP response status code.

        The caller's ``params`` mapping is copied before the credentials are added, so it is never modified.
        """
        # Work on a copy: writing user_key/api_key into the caller's dict
        # would leak credentials into an object the caller may reuse or log.
        params = dict(params) if params else {}
        params.update({'user_key': self.user_key, 'api_key': self.api_key, 'format': 'json'})
        try:
            self._check_auth(object_name=object_name)
            request = requests.get(self._full_path(object_name, path), params=params)
            return self._check_response(request)
        except PardotAPIError as err:
            if err.message == 'Invalid API key or user key':
                # Hand over to the expired-key handler, which gets everything
                # needed to repeat this call.
                return self._handle_expired_api_key(err, retries, 'get', object_name, path, params)
            raise
项目:kapsel    作者:conda    | 项目源码 | 文件源码
def environ_get_prefix(environ):
    """Return the value of the first prefix variable present in ``environ``.

    The variable names are tried in the order of ``_all_prefix_variables``;
    None is returned when none of them is set.
    """
    present = (var for var in _all_prefix_variables if var in environ)
    first = next(present, None)
    return None if first is None else environ.get(first)
项目:alexa-apple-calendar    作者:zanderxyz    | 项目源码 | 文件源码
def request(self, *args, **kwargs):
        """Send a request and raise when a JSON reply carries an error.

        Replies whose content type is not JSON, or whose body cannot be
        decoded as JSON, are returned untouched.  For JSON replies the error
        reason is taken from ``errorMessage``, ``reason`` or a string
        ``error`` field, then from the HTTP reason of a non-OK response, and
        finally "Unknown reason" when a non-string ``error`` is present.

        :returns: the response object when no error reason was found
        :raises Exception: carrying the reason (and ``errorCode`` when the
            reply has one)
        """
        response = super(Session, self).request(*args, **kwargs)

        content_type = response.headers.get('Content-Type', '').split(';')[0]
        json_mimetypes = ['application/json', 'text/json']
        if content_type not in json_mimetypes:
            return response

        try:
            json = response.json()
        except Exception:
            # Deliberate best effort: an undecodable body is handed back as
            # is.  ``Exception`` (not a bare except) so that KeyboardInterrupt
            # and SystemExit are no longer swallowed.
            return response

        reason = json.get('errorMessage') or json.get('reason')
        if not reason and isinstance(json.get('error'), six.string_types):
            reason = json.get('error')
        if not reason and not response.ok:
            reason = response.reason
        if not reason and json.get('error'):
            reason = "Unknown reason"

        if reason:
            code = json.get('errorCode')
            # Keep the exception type, but stop discarding why it was raised.
            raise Exception('%s (%s)' % (reason, code) if code else reason)

        return response
项目:alexa-apple-calendar    作者:zanderxyz    | 项目源码 | 文件源码
def _get_cookiejar_path(self):
        """Build the cookiejar file path inside ``self._cookie_directory``.

        The file name is the user's ``apple_id`` with every character that
        does not match ``\\w`` removed.
        """
        directory = self._cookie_directory
        apple_id = self.user.get('apple_id')
        file_name = ''.join(ch for ch in apple_id if match(r'\w', ch))
        return os.path.join(directory, file_name)
项目:alexa-apple-calendar    作者:zanderxyz    | 项目源码 | 文件源码
def __unicode__(self):
        """Return a short label containing the user's ``apple_id``."""
        apple_id = self.user.get('apple_id')
        return 'iCloud API: %s' % apple_id
项目:alexa-apple-calendar    作者:zanderxyz    | 项目源码 | 文件源码
def get_event_detail(self, pguid, guid):
        """
        Fetch one event, identified by its calendar (pguid) and its own
        ID (guid).

        The decoded reply is kept on ``self.response``; the first entry of
        its 'Event' list is returned.
        """
        query = dict(self.params)
        query['lang'] = 'en-us'
        query['usertz'] = get_localzone().zone
        url = '%s/%s/%s' % (self._calendar_event_detail_url, pguid, guid)
        self.response = self.session.get(url, params=query).json()
        return self.response['Event'][0]
项目:house-of-spirits    作者:OnlyInAmerica    | 项目源码 | 文件源码
def party():
    """Turn party mode on or off from the JSON body of a local request.

    The body's 'enabled' flag (default False) is compared with the current
    party mode; nothing is changed when they already agree.  Non-local
    requests are answered with a 404.
    """
    global party_process

    if not is_local_request(flask.request):
        flask.abort(404)

    payload = flask.request.get_json()
    enabled = payload.get('enabled', False)
    logger.info('Got party state %r' % enabled)

    partying = env.is_party_mode()
    if enabled and not partying:
        # Start: launch the animation script without waiting for it and
        # switch motion handling off.
        party_process = Popen(["python3", "./animate_web.py", "run"])  # async
        env.set_party_mode(True)
        env.set_motion_enabled(False)
    elif partying and not enabled:
        # Stop: restore the flags, then end the animation process if any.
        env.set_party_mode(False)
        env.set_motion_enabled(True)
        if party_process is not None:
            party_process.kill()
            party_process = None

        # Put every light back on the circadian color for the local time.
        full_on = copy.deepcopy(COMMAND_FULL_ON)
        color_now = get_current_circadian_color(date=get_local_time())
        command_all_lights(color_now.apply_to_command(full_on))
    return "Party mode is now %r" % enabled
项目:house-of-spirits    作者:OnlyInAmerica    | 项目源码 | 文件源码
def guest_mode():
    """Set guest mode from the 'enabled' flag (default False) of a local request.

    Non-local requests are logged and answered with a 404.
    """
    if not is_local_request(flask.request):
        logger.info('Guest Mode accessed by remote address %s', flask.request.environ['REMOTE_ADDR'])
        flask.abort(404)

    payload = flask.request.get_json()
    enabled = payload.get('enabled', False)
    env.set_guest_mode(enabled)
    return "Guest mode is now %r" % enabled
项目:house-of-spirits    作者:OnlyInAmerica    | 项目源码 | 文件源码
def motion_mode():
    """Set motion handling from the 'enabled' flag (default False) of a local request.

    Non-local requests are logged and answered with a 404.
    """
    if not is_local_request(flask.request):
        logger.info('Motion Mode accessed by remote address %s', flask.request.environ['REMOTE_ADDR'])
        flask.abort(404)

    payload = flask.request.get_json()
    enabled = payload.get('enabled', False)
    env.set_motion_enabled(enabled)
    return "Motion mode is now %r" % enabled
项目:house-of-spirits    作者:OnlyInAmerica    | 项目源码 | 文件源码
def vacation_mode():
    """Set vacation mode from the 'enabled' flag (default False) of a local request.

    Non-local requests are logged and answered with a 404.
    """
    if not is_local_request(flask.request):
        logger.info('Vacation Mode accessed by remote address %s', flask.request.environ['REMOTE_ADDR'])
        flask.abort(404)

    payload = flask.request.get_json()
    enabled = payload.get('enabled', False)
    env.set_vacation_mode(enabled)
    return "Vacation mode is now %r" % enabled
项目:codex-backend    作者:codexgigassys    | 项目源码 | 文件源码
def get_eset_sig_from_scan(scans):
    """Return the 'result' of the first scan entry produced by an ESET engine.

    An entry counts as ESET when its 'name' is 'ESET-NOD32', 'NOD32' or
    'NOD32v2'.  Returns None when ``scans`` is None, and an empty string
    when no entry matches.
    """
    if scans is None:
        return None

    eset_names = ('ESET-NOD32', 'NOD32', 'NOD32v2')
    hit = next((scan for scan in scans if scan.get('name') in eset_names), None)
    if hit is None:
        return ''
    return hit.get('result')
项目:codex-backend    作者:codexgigassys    | 项目源码 | 文件源码
def key_dict_clean(json):
    """Turn a dict of dicts into a list of the inner dicts.

    Each inner dict receives a "name" entry holding its former key; the
    inner dicts are modified in place, not copied.  Returns None when
    ``json`` is None.
    """
    if json is None:
        return None

    flattened = []
    for name, entry in json.items():
        entry["name"] = name
        flattened.append(entry)
    return flattened

# Replace dot with _
# in dictionaries keys
# in order to save them in mongo
项目:codex-backend    作者:codexgigassys    | 项目源码 | 文件源码
def key_list_clean(json):
    """Turn a dict into a list of ``{"name": key, "values": value}`` dicts.

    Returns None when ``json`` is None.
    """
    if json is None:
        return None
    return [{"name": key, "values": value} for key, value in json.items()]
项目:codex-backend    作者:codexgigassys    | 项目源码 | 文件源码
def jsonize(data):
    """Serialize ``data`` to JSON text indented by 4 spaces, keys left unsorted."""
    dump_options = {"sort_keys": False, "indent": 4}
    return json.dumps(data, **dump_options)


# Checks if the meta has a date. If it doesn't
# it updates it. If a date is found, the oldest
# date will get saved.
项目:codex-backend    作者:codexgigassys    | 项目源码 | 文件源码
def change_date_to_str(res):
    """Replace the known date fields of ``res`` with their ``str()`` form.

    Only fields that are present and not None are touched.  ``res`` is
    modified in place and returned; None is returned for a None input.
    """
    if res is None:
        return None

    date_fields = ("date", "upload_date", "date_start", "date_end", "date_enqueued")
    for field in date_fields:
        current = res.get(field)
        if current is not None:
            res[field] = str(current)
    return res
项目:codex-backend    作者:codexgigassys    | 项目源码 | 文件源码
def add_error(resp_dict, error_code, error_message):
    """Append a ``{"code", "message"}`` entry to ``resp_dict["errors"]``.

    The list is created when it is missing or None.  Anything whose type is
    not exactly ``dict`` is returned unchanged.
    """
    if type(resp_dict) is not dict:
        return resp_dict

    errors = resp_dict.get('errors')
    if errors is None:
        errors = resp_dict["errors"] = []
    errors.append({"code": error_code, "message": error_message})
    return resp_dict
项目:codex-backend    作者:codexgigassys    | 项目源码 | 文件源码
def cursor_to_dict(f1, retrieve):
    """Project every document of ``f1`` onto the dotted keys of ``retrieve``.

    For each key the dotted path is followed through nested ``.get`` calls;
    a missing step or a list anywhere on the path yields None.  The result
    is stored under the last path component, with "file_id" renamed to
    "sha1".  "TimeDateStamp" (hex text) and "timeDateStamp" (epoch seconds)
    values are rendered as "YYYY-MM-DD HH:MM:SS" in UTC.

    Returns a list with one dict per document.
    """
    stamp_format = "%Y-%m-%d %H:%M:%S"
    documents = list(f1)

    projected = []
    for document in documents:
        row = {}
        for dotted_key in retrieve.keys():
            path = dotted_key.split('.')
            value = document
            for step in path:
                value = value.get(step)
                if value is None or isinstance(value, list):
                    value = None
                    break

            label = path[-1]
            if label == "file_id":
                label = "sha1"

            if value is not None:
                if label == "TimeDateStamp":
                    # NOTE(review): eval() runs on a stored value; if that
                    # data can come from an untrusted source this is unsafe.
                    value = time.strftime(
                        stamp_format, time.gmtime(int(eval(value), 16)))
                elif label == "timeDateStamp":
                    value = time.strftime(stamp_format, time.gmtime(value))

            row[label] = value

        projected.append(row)
    return projected


# ****************TEST_CODE******************
项目:gym-http-api    作者:openai    | 项目源码 | 文件源码
def get_required_param(json, param):
    """Return ``json[param]``, insisting that it is present and non-empty.

    Raises InvalidUsage when ``json`` is None, or when the value is None,
    an empty string or an empty list.
    """
    if json is None:
        logger.info("Request is not a valid json")
        raise InvalidUsage("Request is not a valid json")

    value = json.get(param, None)
    is_blank = (value is None) or (value=='') or (value==[])
    if not is_blank:
        return value

    logger.info("A required request parameter '{}' had value {}".format(param, value))
    raise InvalidUsage("A required request parameter '{}' was not provided".format(param))
项目:gym-http-api    作者:openai    | 项目源码 | 文件源码
def get_optional_param(json, param, default):
    """Return ``json[param]``, or ``default`` when it is missing or empty.

    "Empty" means None, an empty string or an empty list.  Raises
    InvalidUsage when ``json`` itself is None.
    """
    if json is None:
        logger.info("Request is not a valid json")
        raise InvalidUsage("Request is not a valid json")

    value = json.get(param, None)
    is_blank = (value is None) or (value=='') or (value==[])
    if not is_blank:
        return value

    logger.info("An optional request parameter '{}' had value {} and was replaced with default value {}".format(param, value, default))
    return default
项目:gym-http-api    作者:openai    | 项目源码 | 文件源码
def shutdown():
    """ Request a server shutdown - currently used by the integration tests to repeatedly create and destroy fresh copies of the server running in a separate thread

    Raises:
        RuntimeError: when the WSGI environ carries no
            'werkzeug.server.shutdown' hook, i.e. the app is not served by a
            Werkzeug development server that provides it.
    """
    f = request.environ.get('werkzeug.server.shutdown')
    if f is None:
        # Previously this fell through to calling None and died with an
        # unhelpful TypeError.
        raise RuntimeError('Not running with the Werkzeug Server')
    f()
    return 'Server shutting down'
项目:gitsome    作者:donnemartin    | 项目源码 | 文件源码
def gitignore_template(self, language):
        """Return the template for language.

        An empty string is returned when the reply is empty or has no
        'source' entry.

        :returns: str
        """
        url = self._build_url('gitignore', 'templates', language)
        payload = self._json(self._get(url), 200) or {}
        return payload.get('source', '')
项目:gitsome    作者:donnemartin    | 项目源码 | 文件源码
def admin_stats(self, option):
        """This is a simple way to get statistics about your system.

        The option is matched case-insensitively; an unknown option yields
        an empty dict without issuing a request.

        :param str option: (required), accepted values: ('all', 'repos',
            'hooks', 'pages', 'orgs', 'users', 'pulls', 'issues',
            'milestones', 'gists', 'comments')
        :returns: dict
        """
        accepted = ('all', 'repos', 'hooks', 'pages', 'orgs',
                    'users', 'pulls', 'issues', 'milestones',
                    'gists', 'comments')
        wanted = option.lower()
        if wanted not in accepted:
            return {}
        url = self._build_url('enterprise', 'stats', wanted)
        return self._json(self._get(url), 200)
项目:tulen    作者:detorto    | 项目源码 | 文件源码
def rl_dispatch():
    """Loop forever: call ``set_event()``, then block on ``rated_queue.get()``.

    The event is therefore set once at start-up and once more after every
    item taken from the queue.  The function never returns.
    """
    while True:
        set_event()
        rated_queue.get()
项目:tulen    作者:detorto    | 项目源码 | 文件源码
def read_minfo(method):
    """Return the stored info for ``method`` from ./files/minfo.json as text.

    Falls back to "no method info" when the method has no entry and to
    "no info at all" when the file yields nothing.
    """
    stored = load_json("./files/minfo.json")
    if not stored:
        return "no info at all"
    return str(stored.get(method,"no method info"))
项目:tulen    作者:detorto    | 项目源码 | 文件源码
def update_minfo(method, type, incr):
    """Add ``incr`` to the counter ``type`` of ``method`` in ./files/minfo.json.

    Args:
        method: top-level key in the stored mapping.
        type: counter name inside that method's mapping (the parameter name
            shadows the builtin but is kept for existing keyword callers).
        incr: amount added to the counter; a missing counter starts at 0.

    The read-modify-write cycle runs while holding ``json_lock``.
    """
    with json_lock:
        # An empty or missing file starts from an empty mapping.  The old
        # code stored the literal keys "method" and "type" in that case
        # instead of the values of the arguments.
        json = load_json("./files/minfo.json") or {}
        mmap = json.get(method, {})
        mmap[type] = mmap.get(type, 0) + incr
        json[method] = mmap
        # Serialize before opening so a failure cannot leave a truncated
        # file, and close the handle deterministically.
        text = pretty_dump(json)
        with open("./files/minfo.json", "w") as f:
            f.write(text)
项目:anaconda-project    作者:Anaconda-Platform    | 项目源码 | 文件源码
def environ_get_prefix(environ):
    """Return the value of the first prefix variable present in ``environ``.

    The variable names are tried in the order of ``_all_prefix_variables``;
    None is returned when none of them is set.
    """
    present = (var for var in _all_prefix_variables if var in environ)
    first = next(present, None)
    return None if first is None else environ.get(first)
项目:anaconda-project    作者:Anaconda-Platform    | 项目源码 | 文件源码
def current_platform():
    """Return a platform string such as ``linux-64`` for the running interpreter.

    Machines listed in ``_non_x86_linux_machines`` give ``linux-<machine>``.
    Otherwise ``sys.platform`` is mapped to linux/osx/win (or ``unknown``)
    and suffixed with eight times ``tuple.__itemsize__``.
    """
    machine = platform.machine()
    if machine in _non_x86_linux_machines:
        return 'linux-%s' % machine

    os_names = {'linux2': 'linux', 'linux': 'linux', 'darwin': 'osx', 'win32': 'win', }
    # presumably the item size equals the pointer width in bytes, which
    # makes this the interpreter's bitness -- TODO confirm
    bits = 8 * tuple.__itemsize__
    return '%s-%d' % (os_names.get(sys.platform, 'unknown'), bits)
项目:PyPardot4    作者:mneedham91    | 项目源码 | 文件源码
def _check_response(response):
        """
        Return the decoded JSON of a response, or its status code when the content type is not exactly
        'application/json'. Raises PardotAPIError when the JSON carries a truthy 'err' entry.
        """
        if response.headers.get('content-type') != 'application/json':
            return response.status_code

        payload = response.json()
        if payload.get('err'):
            raise PardotAPIError(json_response=payload)
        return payload
项目:PyPardot4    作者:mneedham91    | 项目源码 | 文件源码
def authenticate(self):
        """
         Log in with the stored email and password and remember the returned API key on ``self.api_key``.
         Returns True when a key was received, False when the reply has none or a PardotAPIError is raised.
        """
        credentials = {'email': self.email, 'password': self.password}
        try:
            auth = self.post('login', params=credentials)
            self.api_key = auth.get('api_key')
            return self.api_key is not None
        except PardotAPIError:
            return False
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def __run_actions(self, session_id, current_request, message, context, i,
                      verbose):
        """Run one converse step and recurse until the bot stops.

        Each step calls ``self.converse`` and dispatches on the reply's
        'type': 'msg' invokes the 'send' action, 'action' invokes the named
        action and takes its return value as the new context, 'stop' ends
        the run.  The next step is started with no message and ``i - 1``.

        :param i: remaining step budget; the run aborts once it reaches 0
        :returns: the context as it stands when the run ends
        :raises WitError: when the step budget is exhausted, the reply has
            no 'type', its type is 'error', or its type is unknown
        """
        if i <= 0:
            raise WitError('Max steps reached, stopping.')
        json = self.converse(session_id, message, context, verbose=verbose)
        if 'type' not in json:
            raise WitError('Couldn\'t find type in Wit response')
        # presumably a mismatch means a newer request took over this
        # session, so this run is abandoned -- verify against the caller
        if current_request != self._sessions[session_id]:
            return context

        self.logger.debug('Context: %s', context)
        self.logger.debug('Response type: %s', json['type'])

        # backwards-compatibility with API version 20160516
        if json['type'] == 'merge':
            json['type'] = 'action'
            json['action'] = 'merge'

        if json['type'] == 'error':
            raise WitError('Oops, I don\'t know what to do.')

        if json['type'] == 'stop':
            return context

        # The request handed to actions carries a copy of the context.
        request = {
            'session_id': session_id,
            'context': dict(context),
            'text': message,
            'entities': json.get('entities'),
        }
        if json['type'] == 'msg':
            self.throw_if_action_missing('send')
            response = {
                'text': json.get('msg').encode('utf8'),
                'quickreplies': json.get('quickreplies'),
            }
            self.actions['send'](request, response)
        elif json['type'] == 'action':
            action = json['action']
            self.throw_if_action_missing(action)
            context = self.actions[action](request)
            if context is None:
                self.logger.warn('missing context - did you forget to return it?')
                context = {}
        else:
            raise WitError('unknown type: ' + json['type'])
        # Same check as above, repeated after the action has run.
        if current_request != self._sessions[session_id]:
            return context
        return self.__run_actions(session_id, current_request, None, context,
                                  i - 1, verbose)
项目:dockercloud-cli    作者:docker    | 项目源码 | 文件源码
def service_ps(quiet, status, stack):
    """List services, optionally filtered by state and by stack.

    Args:
        quiet: when truthy, print only the full UUID of each service.
        status: passed as the ``state`` filter to ``Service.list``.
        stack: optional stack identifier; resolved to a resource URI that is
            passed as the ``stack`` filter.

    Otherwise a table with one row per service is printed.  Services that
    are not synchronized (and not redeploying) are marked with "(*)" and a
    footnote is printed.  Any exception is printed to stderr and the
    process exits with ``EXCEPTION_EXIT_CODE``.
    """
    try:
        headers = ["NAME", "UUID", "STATUS", "#CONTAINERS", "IMAGE", "DEPLOYED", "PUBLIC DNS", "STACK"]

        stack_resource_uri = None
        if stack:
            s = dockercloud.Utils.fetch_remote_stack(stack, raise_exceptions=False)
            if isinstance(s, dockercloud.NonUniqueIdentifier):
                raise dockercloud.NonUniqueIdentifier(
                    "Identifier %s matches more than one stack, please use UUID instead" % stack)
            if isinstance(s, dockercloud.ObjectNotFound):
                raise dockercloud.ObjectNotFound("Identifier '%s' does not match any stack" % stack)
            stack_resource_uri = s.resource_uri
        service_list = dockercloud.Service.list(state=status, stack=stack_resource_uri)

        data_list = []
        long_uuid_list = []
        has_unsynchronized_service = False
        # Map stack resource URIs to names for the STACK column.  A separate
        # loop name keeps the ``stack`` argument from being overwritten.
        stacks = {}
        for known_stack in dockercloud.Stack.list():
            stacks[known_stack.resource_uri] = known_stack.name
        for service in service_list:
            service_state = utils.add_unicode_symbol_to_state(service.state)
            if not service.synchronized and service.state != "Redeploying":
                service_state += "(*)"
                has_unsynchronized_service = True
            data_list.append([service.name, service.uuid[:8],
                              service_state,
                              service.current_num_containers,
                              service.image_name,
                              utils.get_humanize_local_datetime_from_utc_datetime_string(service.deployed_datetime),
                              service.public_dns,
                              stacks.get(service.stack)])
            long_uuid_list.append(service.uuid)
        if len(data_list) == 0:
            # Placeholder row so an empty table still shows its headers; it
            # now has one cell per header (it used to have 6 for 8 columns).
            data_list.append([""] * len(headers))

        if quiet:
            for uuid in long_uuid_list:
                print(uuid)
        else:
            utils.tabulate_result(data_list, headers)
            if has_unsynchronized_service:
                print(
                    "\n(*) Please note that this service needs to be redeployed to "
                    "have its configuration changes applied")
    except Exception as e:
        print(e, file=sys.stderr)
        sys.exit(EXCEPTION_EXIT_CODE)
项目:dockercloud-cli    作者:docker    | 项目源码 | 文件源码
def service_create(image, name, cpu_shares, memory, privileged, target_num_containers, run_command, entrypoint,
                   expose, publish, envvars, envfiles, tag, linked_to_service, autorestart, autodestroy, autoredeploy,
                   roles, sequential, volume, volumes_from, deployment_strategy, sync, net, pid):
    """Create and save a service from the command-line options.

    The UUID of the service is printed when ``save()`` reports success.
    Exceptions are printed to stderr; after an exception or a failed
    ``utils.sync_action`` the process exits with ``EXCEPTION_EXIT_CODE``.
    """
    has_exception = False
    try:
        ports = utils.parse_published_ports(publish)

        # Exposed ports are appended unless a port with the same inner_port
        # is already in the list (published, or exposed earlier).
        for candidate in utils.parse_exposed_ports(expose):
            already_listed = any(
                candidate.get('inner_port', '') == known.get('inner_port', '')
                for known in ports)
            if not already_listed:
                ports.append(candidate)

        envvars = utils.parse_envvars(envvars, envfiles)
        links_service = utils.parse_links(linked_to_service, 'to_service')

        # ``tag`` may be a single value or a list of values.
        if not tag:
            tags = []
        elif isinstance(tag, list):
            tags = [{"name": t} for t in tag]
        else:
            tags = [{"name": tag}]

        bindings = utils.parse_volume(volume)
        bindings.extend(utils.parse_volumes_from(volumes_from))

        service = dockercloud.Service.create(
            image=image, name=name, cpu_shares=cpu_shares,
            memory=memory, privileged=privileged,
            target_num_containers=target_num_containers, run_command=run_command,
            entrypoint=entrypoint, container_ports=ports, container_envvars=envvars,
            linked_to_service=links_service,
            autorestart=autorestart, autodestroy=autodestroy,
            autoredeploy=autoredeploy,
            roles=roles, sequential_deployment=sequential, tags=tags,
            bindings=bindings,
            deployment_strategy=deployment_strategy, net=net, pid=pid)
        saved = service.save()
        if not utils.sync_action(service, sync):
            has_exception = True
        if saved:
            print(service.uuid)
    except Exception as e:
        print(e, file=sys.stderr)
        has_exception = True
    if has_exception:
        sys.exit(EXCEPTION_EXIT_CODE)
项目:dockercloud-cli    作者:docker    | 项目源码 | 文件源码
def service_run(image, name, cpu_shares, memory, privileged, target_num_containers, run_command, entrypoint,
                expose, publish, envvars, envfiles, tag, linked_to_service, autorestart, autodestroy, autoredeploy,
                roles, sequential, volume, volumes_from, deployment_strategy, sync, net, pid):
    """Create, save and start a service from the command-line options.

    The UUID of the service is printed when ``start()`` reports success.
    Exceptions are printed to stderr; after an exception or a failed
    ``utils.sync_action`` the process exits with ``EXCEPTION_EXIT_CODE``.
    """
    has_exception = False
    try:
        ports = utils.parse_published_ports(publish)

        # Exposed ports are appended unless a port with the same inner_port
        # is already in the list (published, or exposed earlier).
        for candidate in utils.parse_exposed_ports(expose):
            already_listed = any(
                candidate.get('inner_port', '') == known.get('inner_port', '')
                for known in ports)
            if not already_listed:
                ports.append(candidate)

        envvars = utils.parse_envvars(envvars, envfiles)
        links_service = utils.parse_links(linked_to_service, 'to_service')

        # ``tag`` may be a single value or a list of values.
        if not tag:
            tags = []
        elif isinstance(tag, list):
            tags = [{"name": t} for t in tag]
        else:
            tags = [{"name": tag}]

        bindings = utils.parse_volume(volume)
        bindings.extend(utils.parse_volumes_from(volumes_from))

        service = dockercloud.Service.create(
            image=image, name=name, cpu_shares=cpu_shares,
            memory=memory, privileged=privileged,
            target_num_containers=target_num_containers, run_command=run_command,
            entrypoint=entrypoint, container_ports=ports, container_envvars=envvars,
            linked_to_service=links_service,
            autorestart=autorestart, autodestroy=autodestroy,
            autoredeploy=autoredeploy,
            roles=roles, sequential_deployment=sequential, tags=tags,
            bindings=bindings,
            deployment_strategy=deployment_strategy, net=net, pid=pid)
        service.save()
        started = service.start()
        if not utils.sync_action(service, sync):
            has_exception = True
        if started:
            print(service.uuid)
    except Exception as e:
        print(e, file=sys.stderr)
        has_exception = True
    if has_exception:
        sys.exit(EXCEPTION_EXIT_CODE)
项目:dockercloud-cli    作者:docker    | 项目源码 | 文件源码
def tag_ls(identifiers, quiet):
    """Print the tags attached to each identified service, node cluster or node.

    Every identifier is looked up as a service first, then as a node cluster,
    then as a node. One output row is collected per identifier. Any exception
    raised while resolving an identifier or fetching its tags is printed to
    stderr and the identifier is still listed, with empty tags (and the type
    shown as ``'None'`` when nothing matched the identifier).

    Args:
        identifiers: iterable of identifiers to resolve.
        quiet: when truthy, print only the space-joined tag names, one line
            per identifier; otherwise render a table through
            ``utils.tabulate_result``.

    Exits with ``EXCEPTION_EXIT_CODE`` (after printing) if any identifier
    raised an exception.
    """
    failed = False
    rows = []
    joined_tags = []

    for identifier in identifiers:
        try:
            # Try each resource kind in turn; the label always describes the
            # lookup whose result is currently held in ``found``.
            obj_type = 'Service'
            found = dockercloud.Utils.fetch_remote_service(identifier, raise_exceptions=False)
            if isinstance(found, dockercloud.ObjectNotFound):
                obj_type = 'NodeCluster'
                found = dockercloud.Utils.fetch_remote_nodecluster(identifier, raise_exceptions=False)
            if isinstance(found, dockercloud.ObjectNotFound):
                obj_type = 'Node'
                found = dockercloud.Utils.fetch_remote_node(identifier, raise_exceptions=False)
            if isinstance(found, dockercloud.ObjectNotFound):
                raise dockercloud.ObjectNotFound(
                    "Identifier '%s' does not match any service, node or nodecluster" % identifier)

            # Entries without a (non-empty) name are left out of the listing.
            names = [entry.get('name', '') for entry in dockercloud.Tag.fetch(found).list()]
            tag_text = ' '.join([tag_name for tag_name in names if tag_name])

            rows.append([identifier, obj_type, tag_text])
            joined_tags.append(tag_text)
        except Exception as e:
            type_label = 'None' if isinstance(e, dockercloud.ObjectNotFound) else ''
            rows.append([identifier, type_label, ''])
            joined_tags.append('')
            print(e, file=sys.stderr)
            failed = True

    if quiet:
        for line in joined_tags:
            print(line)
    else:
        utils.tabulate_result(rows, ["IDENTIFIER", "TYPE", "TAGS"])

    if failed:
        sys.exit(EXCEPTION_EXIT_CODE)
项目:internet-content-detection    作者:liubo0621    | 项目源码 | 文件源码
def main():
    '''
    Send the current clue configuration to both sync endpoints.

    The clues returned by get_clues() are passed through tools.dumps_json,
    printed, run through Prpcrypt.encrypt and sent as the ``info`` field to
    each URL in turn via tools.get_json_by_requests. For every response, its
    status, message and data are handed to record_sync_status() together with
    the clue count and a per-endpoint flag (0 for the first URL, 1 for the
    second), and the response plus that result are written to the debug log.
    '''
    clues = get_clues()
    clues_count = len(clues['data'])

    serialized_clues = tools.dumps_json(clues)
    print(serialized_clues)

    cipher = Prpcrypt('pattek.com.cn')
    payload = {'info': cipher.encrypt(serialized_clues)}

    # Both endpoints are logged with the same message layout.
    report_template = '''
        ------ ??????? -----
        %s
        ?????? %d
        '''

    # First endpoint (a private 192.168.x.x address), recorded with flag 0.
    response = tools.get_json_by_requests(
        'http://192.168.60.38:8002/datasync_al/interface/cluesConfSync?', data=payload)
    outcome = record_sync_status(
        clues_count, response.get("status"), response.get('message'), response.get('data'), 0)
    print(outcome)
    log.debug(report_template % (response, outcome))

    # Second endpoint, recorded with flag 1; its result is only logged.
    response = tools.get_json_by_requests(
        'http://124.205.229.232:8005/gdyq/datasync_al/interface/cluesConfSync', data=payload)
    outcome = record_sync_status(
        clues_count, response.get("status"), response.get('message'), response.get('data'), 1)
    log.debug(report_template % (response, outcome))
项目:gitsome    作者:donnemartin    | 项目源码 | 文件源码
def feeds(self):
        """Fetch the ``feeds`` resource and wrap its URLs in URITemplates.

        :returns: ``None`` when ``self._json`` yields ``None``; otherwise a
            dictionary whose ``_links`` entry holds the response's link
            dictionaries with every ``href`` turned into a ``URITemplate``,
            and whose remaining entries are the other response values
            converted to ``URITemplate`` objects (element by element for
            lists)
        """
        def with_templated_href(link):
            # Falsy links (e.g. None or an empty dict) are handed back as-is.
            if not link:
                return link
            # 'href' is taken out of the incoming dict before the remaining
            # values are copied, so it is re-added last, as a template.
            href = link.pop('href', None)
            templated = dict(link)
            if href is not None:
                templated['href'] = URITemplate(href)
            return templated

        endpoint = self._build_url('feeds')
        response = self._json(self._get(endpoint), 200, include_cache_info=False)
        if response is None:
            # Nothing to parse; bail out early.
            return None

        # Remove '_links' from the body so the generic pass below only sees
        # plain URL values.
        raw_links = response.pop('_links', {})
        links = {}
        for rel, target in raw_links.items():
            # A relation may map to one link dictionary or to a list of them.
            links[rel] = (
                [with_templated_href(item) for item in target]
                if isinstance(target, list)
                else with_templated_href(target)
            )

        result = {'_links': links}
        for name, target in response.items():
            # Same shape handling as above, applied to the bare URL values.
            result[name] = (
                [URITemplate(item) for item in target]
                if isinstance(target, list)
                else URITemplate(target)
            )

        return result
项目:anaconda-project    作者:Anaconda-Platform    | 项目源码 | 文件源码
def parse_spec(spec):
    """Split a package spec string into its name and constraint parts.

    The spec is matched against ``_spec_pat``. When it carries a conda
    constraint, that constraint is matched against ``_conda_constraint_pat``
    to pull out an exact version (only if the version contains none of
    ``|``, ``*`` or ``,``) and, alongside it, an exact build string with its
    leading ``=`` removed.

    Args:
        spec: the spec string to parse.

    Returns:
        ``ParsedSpec``, or None if the spec does not match ``_spec_pat``.

    Raises:
        TypeError: if ``spec`` is not a string according to ``is_string``.
    """
    if not is_string(spec):
        raise TypeError("Expected a string not %r" % spec)

    spec_match = _spec_pat.match(spec)
    if spec_match is None:
        return None

    name = spec_match.group('name').lower()

    pip_constraint = spec_match.group('pc')
    if pip_constraint is not None:
        # Spaces inside the pip constraint are dropped.
        pip_constraint = pip_constraint.replace(' ', '')

    conda_constraint = spec_match.group('cc')

    exact_version = None
    exact_build_string = None
    if conda_constraint is not None:
        constraint_match = _conda_constraint_pat.match(conda_constraint)
        assert constraint_match is not None
        version = constraint_match.group('version')
        # A version containing any of these characters is not treated as exact.
        if not any(special in version for special in ('|', '*', ',')):
            exact_version = version
            build = constraint_match.group('build')
            if build is not None:
                # The captured build group still carries its '=' separator.
                assert build[0] == '='
                exact_build_string = build[1:]

    return ParsedSpec(
        name=name,
        conda_constraint=conda_constraint,
        pip_constraint=pip_constraint,
        exact_version=exact_version,
        exact_build_string=exact_build_string,
    )

# these are in order of preference. On pre-4.1.4 Windows,
# CONDA_PREFIX and CONDA_ENV_PATH aren't set, so we get to
# CONDA_DEFAULT_ENV.