The following 49 code examples, extracted from open-source Python projects, illustrate how to use json.get(). In most of these snippets, json is a local variable holding a parsed JSON response (a plain dict), so .get() is the standard dict lookup with an optional default.
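Before the project examples, here is a minimal self-contained sketch of the pattern they all share: parse a JSON document into a dict, then read fields with .get() so a missing key falls back to a default instead of raising KeyError. The payload and key names below are made up for illustration.

import json

payload = '{"status": "1", "count": 2}'  # hypothetical JSON payload
doc = json.loads(payload)                # parse into a plain dict

status = doc.get('status')               # None if the key is absent
count = int(doc.get('count', 0))         # explicit default avoids a KeyError
envs = doc.get('envs', [])               # safe default for iteration

print(status, count, envs)               # -> 1 2 []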
def resolve_env_to_prefix(name_or_prefix):
    """Convert an env name or path into a canonical prefix path.

    Returns:
        Absolute path of prefix or None if it isn't found.
    """
    if os.path.isabs(name_or_prefix):
        return name_or_prefix

    json = info()
    root_prefix = json.get('root_prefix', None)
    if name_or_prefix == 'root':
        return root_prefix

    envs = json.get('envs', [])
    for prefix in envs:
        if os.path.basename(prefix) == name_or_prefix:
            return prefix
    return None
def parse_spec(spec):
    """Parse a package name and version spec as conda would.

    Returns:
        ``ParsedSpec`` or None on failure
    """
    m = _spec_pat.match(spec)
    if m is None:
        return None
    pip_constraint = m.group('pc')
    if pip_constraint is not None:
        pip_constraint = pip_constraint.replace(' ', '')
    return ParsedSpec(name=m.group('name').lower(),
                      conda_constraint=m.group('cc'),
                      pip_constraint=pip_constraint)

# these are in order of preference. On pre-4.1.4 Windows,
# CONDA_PREFIX and CONDA_ENV_PATH aren't set, so we get to
# CONDA_DEFAULT_ENV.
def environ_set_prefix(environ, prefix, varname=conda_prefix_variable()):
    prefix = os.path.normpath(prefix)
    environ[varname] = prefix
    if varname != 'CONDA_DEFAULT_ENV':
        # This case matters on both Unix and Windows
        # with conda >= 4.1.4 since requirement.env_var
        # is CONDA_PREFIX, and matters on Unix only pre-4.1.4
        # when requirement.env_var is CONDA_ENV_PATH.
        global _envs_dirs
        global _root_dir
        if _envs_dirs is None:
            i = info()
            _envs_dirs = [os.path.normpath(d) for d in i.get('envs_dirs', [])]
            _root_dir = os.path.normpath(i.get('root_prefix'))
        if prefix == _root_dir:
            name = 'root'
        else:
            for d in _envs_dirs:
                name = subdirectory_relative_to_directory(prefix, d)
                if name != prefix:
                    break
        environ['CONDA_DEFAULT_ENV'] = name
def node_byo():
    token = ""
    try:
        if dockercloud.namespace:
            json = dockercloud.api.http.send_request(
                "POST", "api/infra/%s/%s/token" % (API_VERSION, dockercloud.namespace))
        else:
            json = dockercloud.api.http.send_request("POST", "api/infra/%s/token" % API_VERSION)
        if json:
            token = json.get("token", "")
    except Exception as e:
        print(e, file=sys.stderr)
        sys.exit(EXCEPTION_EXIT_CODE)

    print("Docker Cloud lets you use your own servers as nodes to run containers. "
          "For this you have to install our agent.")
    print("Run the following command on your server:")
    print()
    print("\tcurl -Ls https://get.cloud.docker.com/ | sudo -H sh -s", token)
    print()
def record_sync_status(total, status, message, data, sync_type):
    delete_count = 'null'
    worn_clues_list = []
    update_count = 'null'
    save_count = 'null'
    if data:
        delete_count = data.get("deleteNum")
        worn_clues_list = data.get("wrongIdList")
        update_count = data.get("updateNum")
        save_count = data.get("saveNum")

    sql = ("insert into TAB_IOPM_CLUES_SYNC_INFO "
           "(id, total, status, message, delete_count, update_count, "
           "save_count, worn_clues_list, sync_type) "
           "values (%s, %s, %s, '%s', %s, %s, %s, '%s', %s)" % (
               'seq_iopm_sync.nextval', total, status, message, delete_count,
               update_count, save_count, ','.join(worn_clues_list), sync_type))
    print(sql)
    return db.add(sql)
def geocode(address):
    """Resolve an address to [lng, lat] via the AMap geocoding API.

    :param address: the address to geocode
    :return: [lng, lat] on success, otherwise None
    """
    geocoding = {'s': 'rsv3',
                 'key': key,
                 'city': '??',  # city name garbled in the source
                 'address': address}
    res = requests.get(
        "http://restapi.amap.com/v3/geocode/geo", params=geocoding)
    if res.status_code == 200:
        json = res.json()
        status = json.get('status')
        count = json.get('count')
        if status == '1' and int(count) >= 1:
            geocodes = json.get('geocodes')[0]
            lng = float(geocodes.get('location').split(',')[0])
            lat = float(geocodes.get('location').split(',')[1])
            return [lng, lat]
        else:
            return None
    else:
        return None
def refresh_client(self, from_dt=None, to_dt=None):
    """
    Refreshes the CalendarService endpoint, ensuring that the
    event data is up-to-date. If no 'from_dt' or 'to_dt' datetimes
    have been given, the range becomes this month.
    """
    today = datetime.today()
    first_day, last_day = monthrange(today.year, today.month)
    if not from_dt:
        from_dt = datetime(today.year, today.month, first_day)
    if not to_dt:
        to_dt = datetime(today.year, today.month, last_day)
    params = dict(self.params)
    params.update({
        'lang': 'en-us',
        'usertz': get_localzone().zone,
        'startDate': from_dt.strftime('%Y-%m-%d'),
        'endDate': to_dt.strftime('%Y-%m-%d')
    })
    req = self.session.get(self._calendar_refresh_url, params=params)
    self.response = req.json()
def lights():
    """
    example_lights_json = {
        'rooms': [
            {'name': 'Living Room', 'on': True},
        ]
    }
    """
    if is_local_request(flask.request):
        json = flask.request.get_json()
        rooms = json.get('rooms', [])
        logger.info('Switching rooms %s', rooms)
        for json_room in rooms:
            room_name = json_room['name']
            on_state = json_room['on']
            for room in ROOMS:
                # Strip whitespace
                if room.name == room_name:
                    logger.info('Switching room %s', room.name)
                    room.switch(on_state)
        return "Light commands sent."
    else:
        logger.info('Lights accessed by remote address %s',
                    flask.request.environ['REMOTE_ADDR'])
        flask.abort(404)
def check_authorization(self, access_token):
    """Check an authorization created by a registered application.

    OAuth applications can use this method to check token validity
    without hitting normal rate limits because of failed login attempts.
    If the token is valid, it will return True, otherwise it will return
    False.

    :returns: bool
    """
    p = self.session.params
    auth = (p.get('client_id'), p.get('client_secret'))
    if access_token and auth:
        url = self._build_url('applications', str(auth[0]), 'tokens',
                              str(access_token))
        resp = self._get(url, auth=auth, params={
            'client_id': None, 'client_secret': None
        })
        return self._boolean(resp, 200, 404)
    return False
def get(self, object_name, path=None, params=None, retries=0):
    """
    Makes a GET request to the API. Checks for invalid requests that raise
    PardotAPIErrors. If the API key is invalid, one re-authentication
    request is made, in case the key has simply expired. If no errors are
    raised, returns either the JSON response, or if no JSON was returned,
    returns the HTTP response status code.
    """
    if params is None:
        params = {}
    params.update({'user_key': self.user_key, 'api_key': self.api_key, 'format': 'json'})
    try:
        self._check_auth(object_name=object_name)
        request = requests.get(self._full_path(object_name, path), params=params)
        response = self._check_response(request)
        return response
    except PardotAPIError as err:
        if err.message == 'Invalid API key or user key':
            response = self._handle_expired_api_key(err, retries, 'get', object_name, path, params)
            return response
        else:
            raise err
def environ_get_prefix(environ):
    for name in _all_prefix_variables:
        if name in environ:
            return environ.get(name)
    return None
def request(self, *args, **kwargs):
    response = super(Session, self).request(*args, **kwargs)
    content_type = response.headers.get('Content-Type', '').split(';')[0]
    json_mimetypes = ['application/json', 'text/json']
    if content_type not in json_mimetypes:
        return response
    try:
        json = response.json()
    except ValueError:
        return response
    reason = json.get('errorMessage') or json.get('reason')
    if not reason and isinstance(json.get('error'), six.string_types):
        reason = json.get('error')
    if not reason and not response.ok:
        reason = response.reason
    if not reason and json.get('error'):
        reason = "Unknown reason"
    code = json.get('errorCode')
    if reason:
        raise Exception(reason)
    return response
def _get_cookiejar_path(self):
    # Get path for cookiejar file
    return os.path.join(
        self._cookie_directory,
        ''.join([c for c in self.user.get('apple_id') if match(r'\w', c)])
    )
def __unicode__(self):
    return 'iCloud API: %s' % self.user.get('apple_id')
def get_event_detail(self, pguid, guid):
    """
    Fetches a single event's details by specifying a pguid
    (a calendar) and a guid (an event's ID).
    """
    params = dict(self.params)
    params.update({'lang': 'en-us', 'usertz': get_localzone().zone})
    url = '%s/%s/%s' % (self._calendar_event_detail_url, pguid, guid)
    req = self.session.get(url, params=params)
    self.response = req.json()
    return self.response['Event'][0]
def party():
    if not is_local_request(flask.request):
        flask.abort(404)

    global party_process
    json = flask.request.get_json()
    enabled = json.get('enabled', False)
    logger.info('Got party state %r' % enabled)
    if enabled and not env.is_party_mode():
        # Start party XD
        party_process = Popen(["python3", "./animate_web.py", "run"])  # async
        env.set_party_mode(True)
        env.set_motion_enabled(False)
    elif not enabled and env.is_party_mode():
        # Stop party :(
        env.set_party_mode(False)
        env.set_motion_enabled(True)
        if party_process is not None:
            party_process.kill()
            party_process = None
        # Return lights to circadian color
        command = copy.deepcopy(COMMAND_FULL_ON)
        circadian_color = get_current_circadian_color(date=get_local_time())
        command_all_lights(circadian_color.apply_to_command(command))
    return "Party mode is now %r" % enabled
def guest_mode():
    if is_local_request(flask.request):
        json = flask.request.get_json()
        enabled = json.get('enabled', False)
        env.set_guest_mode(enabled)
        return "Guest mode is now %r" % enabled
    else:
        logger.info('Guest Mode accessed by remote address %s',
                    flask.request.environ['REMOTE_ADDR'])
        flask.abort(404)
def motion_mode():
    if is_local_request(flask.request):
        json = flask.request.get_json()
        enabled = json.get('enabled', False)
        env.set_motion_enabled(enabled)
        return "Motion mode is now %r" % enabled
    else:
        logger.info('Motion Mode accessed by remote address %s',
                    flask.request.environ['REMOTE_ADDR'])
        flask.abort(404)
def vacation_mode():
    if is_local_request(flask.request):
        json = flask.request.get_json()
        enabled = json.get('enabled', False)
        env.set_vacation_mode(enabled)
        return "Vacation mode is now %r" % enabled
    else:
        logger.info('Vacation Mode accessed by remote address %s',
                    flask.request.environ['REMOTE_ADDR'])
        flask.abort(404)
def get_eset_sig_from_scan(scans):
    if scans is None:
        return None
    for scan in scans:
        if scan.get('name') in ['ESET-NOD32', 'NOD32', 'NOD32v2']:
            return scan.get('result')
    return ''
def key_dict_clean(json):
    if json is None:
        return None
    array = []
    for key in json.keys():
        tmp_dict = json.get(key)
        tmp_dict["name"] = key
        array.append(tmp_dict)
    return array

# Replace dot with _
# in dictionaries keys
# in order to save them in mongo
def key_list_clean(json):
    if json is None:
        return None
    array = []
    for key in json.keys():
        tmp_dict = {}
        tmp_dict["name"] = key
        tmp_dict["values"] = json.get(key)
        array.append(tmp_dict)
    return array
def jsonize(data):
    return json.dumps(data, sort_keys=False, indent=4)

# Checks if the meta has a date. If it doesn't
# it updates it. If a date is found, the oldest
# date will get saved.
def change_date_to_str(res):
    if res is None:
        return None
    for date_key in ["date", "upload_date", "date_start", "date_end", "date_enqueued"]:
        if res.get(date_key) is not None:
            res[date_key] = str(res.get(date_key))
    return res
def add_error(resp_dict, error_code, error_message):
    if type(resp_dict) != dict:
        return resp_dict
    if resp_dict.get('errors') is None:
        resp_dict["errors"] = []
    resp_dict["errors"].append({"code": error_code, "message": error_message})
    return resp_dict
def cursor_to_dict(f1, retrieve):
    results = []
    for f in f1:
        results.append(f)
    ret = []
    for a in results:
        dic = {}
        for key in retrieve.keys():
            steps = key.split('.')
            partial_res = a
            for step in steps:
                partial_res = partial_res.get(step)
                if partial_res is None:
                    break
                if isinstance(partial_res, list):
                    partial_res = None
                    break
            legend_to_show = key.split('.')[-1]
            if (legend_to_show == "file_id"):
                legend_to_show = "sha1"
            if (legend_to_show == "TimeDateStamp" and partial_res is not None):
                partial_res = time.strftime(
                    "%Y-%m-%d %H:%M:%S",
                    time.gmtime(int(eval(partial_res), 16)))
            if (legend_to_show == "timeDateStamp" and partial_res is not None):
                partial_res = time.strftime(
                    "%Y-%m-%d %H:%M:%S", time.gmtime(partial_res))
            dic[legend_to_show] = partial_res
        ret.append(dic)
    return ret

# ****************TEST_CODE******************
def get_required_param(json, param):
    if json is None:
        logger.info("Request is not a valid json")
        raise InvalidUsage("Request is not a valid json")
    value = json.get(param, None)
    if (value is None) or (value == '') or (value == []):
        logger.info("A required request parameter '{}' had value {}".format(param, value))
        raise InvalidUsage("A required request parameter '{}' was not provided".format(param))
    return value
def get_optional_param(json, param, default):
    if json is None:
        logger.info("Request is not a valid json")
        raise InvalidUsage("Request is not a valid json")
    value = json.get(param, None)
    if (value is None) or (value == '') or (value == []):
        logger.info("An optional request parameter '{}' had value {} and was replaced "
                    "with default value {}".format(param, value, default))
        value = default
    return value
def shutdown():
    """Request a server shutdown - currently used by the integration tests
    to repeatedly create and destroy fresh copies of the server running in
    a separate thread"""
    f = request.environ.get('werkzeug.server.shutdown')
    f()
    return 'Server shutting down'
def gitignore_template(self, language):
    """Return the template for language.

    :returns: str
    """
    url = self._build_url('gitignore', 'templates', language)
    json = self._json(self._get(url), 200)
    if not json:
        return ''
    return json.get('source', '')
def admin_stats(self, option):
    """This is a simple way to get statistics about your system.

    :param str option: (required), accepted values: ('all', 'repos',
        'hooks', 'pages', 'orgs', 'users', 'pulls', 'issues',
        'milestones', 'gists', 'comments')
    :returns: dict
    """
    stats = {}
    if option.lower() in ('all', 'repos', 'hooks', 'pages', 'orgs',
                          'users', 'pulls', 'issues', 'milestones',
                          'gists', 'comments'):
        url = self._build_url('enterprise', 'stats', option.lower())
        stats = self._json(self._get(url), 200)
    return stats
def rl_dispatch():
    set_event()
    while True:
        rated_queue.get()
        set_event()
def read_minfo(method):
    json = load_json("./files/minfo.json")
    if json:
        return str(json.get(method, "no method info"))
    else:
        return "no info at all"
def update_minfo(method, type, incr):
    with json_lock:
        json = load_json("./files/minfo.json")
        if not json:
            # seed the file; key by the actual method/type values so that
            # read_minfo's json.get(method) lookup can find them
            json = {method: {type: incr}}
        else:
            mmap = json.get(method, {})
            mmap[type] = mmap.get(type, 0) + incr
            json[method] = mmap
        open("./files/minfo.json", "w").write(pretty_dump(json))
def current_platform():
    m = platform.machine()
    if m in _non_x86_linux_machines:
        return 'linux-%s' % m
    else:
        _platform_map = {'linux2': 'linux',
                         'linux': 'linux',
                         'darwin': 'osx',
                         'win32': 'win'}
        p = _platform_map.get(sys.platform, 'unknown')
        return '%s-%d' % (p, (8 * tuple.__itemsize__))
def _check_response(response):
    """
    Checks the HTTP response to see if it contains JSON. If it does,
    checks the JSON for error codes and messages. Raises PardotAPIError
    if an error was found. If no error was found, returns the JSON.
    If JSON was not found, returns the response status code.
    """
    if response.headers.get('content-type') == 'application/json':
        json = response.json()
        error = json.get('err')
        if error:
            raise PardotAPIError(json_response=json)
        return json
    else:
        return response.status_code
def authenticate(self):
    """
    Authenticates the user and sets the API key if successful.
    Returns True if authentication is successful, False if
    authentication fails.
    """
    try:
        auth = self.post('login', params={'email': self.email, 'password': self.password})
        self.api_key = auth.get('api_key')
        if self.api_key is not None:
            return True
        return False
    except PardotAPIError:
        return False
def __run_actions(self, session_id, current_request, message, context, i, verbose):
    if i <= 0:
        raise WitError('Max steps reached, stopping.')
    json = self.converse(session_id, message, context, verbose=verbose)
    if 'type' not in json:
        raise WitError('Couldn\'t find type in Wit response')
    if current_request != self._sessions[session_id]:
        return context

    self.logger.debug('Context: %s', context)
    self.logger.debug('Response type: %s', json['type'])

    # backwards-compatibility with API version 20160516
    if json['type'] == 'merge':
        json['type'] = 'action'
        json['action'] = 'merge'

    if json['type'] == 'error':
        raise WitError('Oops, I don\'t know what to do.')
    if json['type'] == 'stop':
        return context

    request = {
        'session_id': session_id,
        'context': dict(context),
        'text': message,
        'entities': json.get('entities'),
    }
    if json['type'] == 'msg':
        self.throw_if_action_missing('send')
        response = {
            'text': json.get('msg').encode('utf8'),
            'quickreplies': json.get('quickreplies'),
        }
        self.actions['send'](request, response)
    elif json['type'] == 'action':
        action = json['action']
        self.throw_if_action_missing(action)
        context = self.actions[action](request)
        if context is None:
            self.logger.warn('missing context - did you forget to return it?')
            context = {}
    else:
        raise WitError('unknown type: ' + json['type'])

    if current_request != self._sessions[session_id]:
        return context
    return self.__run_actions(session_id, current_request, None, context, i - 1, verbose)
def service_ps(quiet, status, stack):
    try:
        headers = ["NAME", "UUID", "STATUS", "#CONTAINERS", "IMAGE",
                   "DEPLOYED", "PUBLIC DNS", "STACK"]
        stack_resource_uri = None
        if stack:
            s = dockercloud.Utils.fetch_remote_stack(stack, raise_exceptions=False)
            if isinstance(s, dockercloud.NonUniqueIdentifier):
                raise dockercloud.NonUniqueIdentifier(
                    "Identifier %s matches more than one stack, please use UUID instead" % stack)
            if isinstance(s, dockercloud.ObjectNotFound):
                raise dockercloud.ObjectNotFound("Identifier '%s' does not match any stack" % stack)
            stack_resource_uri = s.resource_uri
        service_list = dockercloud.Service.list(state=status, stack=stack_resource_uri)

        data_list = []
        long_uuid_list = []
        has_unsynchronized_service = False
        stacks = {}
        for stack in dockercloud.Stack.list():
            stacks[stack.resource_uri] = stack.name
        for service in service_list:
            service_state = utils.add_unicode_symbol_to_state(service.state)
            if not service.synchronized and service.state != "Redeploying":
                service_state += "(*)"
                has_unsynchronized_service = True
            data_list.append([service.name, service.uuid[:8], service_state,
                              service.current_num_containers, service.image_name,
                              utils.get_humanize_local_datetime_from_utc_datetime_string(
                                  service.deployed_datetime),
                              service.public_dns, stacks.get(service.stack)])
            long_uuid_list.append(service.uuid)
        if len(data_list) == 0:
            data_list.append(["", "", "", "", "", "", "", ""])  # one empty cell per header column

        if quiet:
            for uuid in long_uuid_list:
                print(uuid)
        else:
            utils.tabulate_result(data_list, headers)
            if has_unsynchronized_service:
                print("\n(*) Please note that this service needs to be redeployed to "
                      "have its configuration changes applied")
    except Exception as e:
        print(e, file=sys.stderr)
        sys.exit(EXCEPTION_EXIT_CODE)
def service_create(image, name, cpu_shares, memory, privileged, target_num_containers,
                   run_command, entrypoint, expose, publish, envvars, envfiles, tag,
                   linked_to_service, autorestart, autodestroy, autoredeploy, roles,
                   sequential, volume, volumes_from, deployment_strategy, sync, net, pid):
    has_exception = False
    try:
        ports = utils.parse_published_ports(publish)
        # Add exposed ports to ports, skipping any whose inner_port is
        # already defined in the published ports
        exposed_ports = utils.parse_exposed_ports(expose)
        for exposed_port in exposed_ports:
            existed = False
            for port in ports:
                if exposed_port.get('inner_port', '') == port.get('inner_port', ''):
                    existed = True
                    break
            if not existed:
                ports.append(exposed_port)

        envvars = utils.parse_envvars(envvars, envfiles)
        links_service = utils.parse_links(linked_to_service, 'to_service')
        tags = []
        if tag:
            if isinstance(tag, list):
                for t in tag:
                    tags.append({"name": t})
            else:
                tags.append({"name": tag})
        bindings = utils.parse_volume(volume)
        bindings.extend(utils.parse_volumes_from(volumes_from))

        service = dockercloud.Service.create(
            image=image, name=name, cpu_shares=cpu_shares, memory=memory,
            privileged=privileged, target_num_containers=target_num_containers,
            run_command=run_command, entrypoint=entrypoint, container_ports=ports,
            container_envvars=envvars, linked_to_service=links_service,
            autorestart=autorestart, autodestroy=autodestroy, autoredeploy=autoredeploy,
            roles=roles, sequential_deployment=sequential, tags=tags, bindings=bindings,
            deployment_strategy=deployment_strategy, net=net, pid=pid)
        result = service.save()
        if not utils.sync_action(service, sync):
            has_exception = True
        if result:
            print(service.uuid)
    except Exception as e:
        print(e, file=sys.stderr)
        has_exception = True
    if has_exception:
        sys.exit(EXCEPTION_EXIT_CODE)
def service_run(image, name, cpu_shares, memory, privileged, target_num_containers,
                run_command, entrypoint, expose, publish, envvars, envfiles, tag,
                linked_to_service, autorestart, autodestroy, autoredeploy, roles,
                sequential, volume, volumes_from, deployment_strategy, sync, net, pid):
    has_exception = False
    try:
        ports = utils.parse_published_ports(publish)
        # Add exposed ports to ports, skipping any whose inner_port is
        # already defined in the published ports
        exposed_ports = utils.parse_exposed_ports(expose)
        for exposed_port in exposed_ports:
            existed = False
            for port in ports:
                if exposed_port.get('inner_port', '') == port.get('inner_port', ''):
                    existed = True
                    break
            if not existed:
                ports.append(exposed_port)

        envvars = utils.parse_envvars(envvars, envfiles)
        links_service = utils.parse_links(linked_to_service, 'to_service')
        tags = []
        if tag:
            if isinstance(tag, list):
                for t in tag:
                    tags.append({"name": t})
            else:
                tags.append({"name": tag})
        bindings = utils.parse_volume(volume)
        bindings.extend(utils.parse_volumes_from(volumes_from))

        service = dockercloud.Service.create(
            image=image, name=name, cpu_shares=cpu_shares, memory=memory,
            privileged=privileged, target_num_containers=target_num_containers,
            run_command=run_command, entrypoint=entrypoint, container_ports=ports,
            container_envvars=envvars, linked_to_service=links_service,
            autorestart=autorestart, autodestroy=autodestroy, autoredeploy=autoredeploy,
            roles=roles, sequential_deployment=sequential, tags=tags, bindings=bindings,
            deployment_strategy=deployment_strategy, net=net, pid=pid)
        service.save()
        result = service.start()
        if not utils.sync_action(service, sync):
            has_exception = True
        if result:
            print(service.uuid)
    except Exception as e:
        print(e, file=sys.stderr)
        has_exception = True
    if has_exception:
        sys.exit(EXCEPTION_EXIT_CODE)
def tag_ls(identifiers, quiet):
    has_exception = False
    headers = ["IDENTIFIER", "TYPE", "TAGS"]
    data_list = []
    tags_list = []
    for identifier in identifiers:
        try:
            obj = dockercloud.Utils.fetch_remote_service(identifier, raise_exceptions=False)
            if isinstance(obj, dockercloud.ObjectNotFound):
                obj = dockercloud.Utils.fetch_remote_nodecluster(identifier, raise_exceptions=False)
                if isinstance(obj, dockercloud.ObjectNotFound):
                    obj = dockercloud.Utils.fetch_remote_node(identifier, raise_exceptions=False)
                    if isinstance(obj, dockercloud.ObjectNotFound):
                        raise dockercloud.ObjectNotFound(
                            "Identifier '%s' does not match any service, node or nodecluster"
                            % identifier)
                    else:
                        obj_type = 'Node'
                else:
                    obj_type = 'NodeCluster'
            else:
                obj_type = 'Service'

            tagnames = []
            for tags in dockercloud.Tag.fetch(obj).list():
                tagname = tags.get('name', '')
                if tagname:
                    tagnames.append(tagname)
            data_list.append([identifier, obj_type, ' '.join(tagnames)])
            tags_list.append(' '.join(tagnames))
        except Exception as e:
            if isinstance(e, dockercloud.ObjectNotFound):
                data_list.append([identifier, 'None', ''])
            else:
                data_list.append([identifier, '', ''])
            tags_list.append('')
            print(e, file=sys.stderr)
            has_exception = True
    if quiet:
        for tags in tags_list:
            print(tags)
    else:
        utils.tabulate_result(data_list, headers)
    if has_exception:
        sys.exit(EXCEPTION_EXIT_CODE)
def main():
    '''
    @summary:
    ---------
    @param :
    ---------
    @result:
    '''
    clues_json = get_clues()
    clues_count = len(clues_json['data'])
    clues_json = tools.dumps_json(clues_json)
    print(clues_json)
    # save_clues_to_file(clues_json)

    keys = 'pattek.com.cn'
    prpcrypt = Prpcrypt(keys)
    encrypt_text = prpcrypt.encrypt(clues_json)
    data = {'info': encrypt_text}

    # first sync endpoint (original Chinese comment garbled in the source)
    url = 'http://192.168.60.38:8002/datasync_al/interface/cluesConfSync?'
    json = tools.get_json_by_requests(url, data=data)
    # record the sync status
    result = record_sync_status(clues_count, json.get("status"), json.get('message'),
                                json.get('data'), 0)
    print(result)
    # original Chinese log text garbled in the source; translated loosely
    log.debug('''
        ------ clues sync ------
        response:  %s
        record id: %d
        ''' % (json, result))

    # second sync endpoint
    url = 'http://124.205.229.232:8005/gdyq/datasync_al/interface/cluesConfSync'
    json = tools.get_json_by_requests(url, data=data)
    # record the sync status
    result = record_sync_status(clues_count, json.get("status"), json.get('message'),
                                json.get('data'), 1)
    log.debug('''
        ------ clues sync ------
        response:  %s
        record id: %d
        ''' % (json, result))
def feeds(self):
    """List GitHub's timeline resources in Atom format.

    :returns: dictionary parsed to include URITemplates
    """
    def replace_href(feed_dict):
        if not feed_dict:
            return feed_dict
        ret_dict = {}
        # Let's pluck out what we're most interested in, the href value
        href = feed_dict.pop('href', None)
        # Then we update the return dictionary with the rest of the values
        ret_dict.update(feed_dict)
        if href is not None:
            # So long as there is something to template, let's template it
            ret_dict['href'] = URITemplate(href)
        return ret_dict

    url = self._build_url('feeds')
    json = self._json(self._get(url), 200, include_cache_info=False)
    if json is None:  # If something went wrong, get out early
        return None

    # We have a response body to parse
    feeds = {}

    # Let's pop out the old links so we don't have to skip them below
    old_links = json.pop('_links', {})
    _links = {}
    # If _links is in the response JSON, iterate over that and recreate it
    # so that any templates contained inside can be turned into
    # URITemplates
    for key, value in old_links.items():
        if isinstance(value, list):
            # If it's an array/list of links, let's replace that with a
            # new list of links
            _links[key] = [replace_href(d) for d in value]
        else:
            # Otherwise, just use the new value
            _links[key] = replace_href(value)

    # Start building up our return dictionary
    feeds['_links'] = _links

    for key, value in json.items():
        # This should roughly be the same logic as above.
        if isinstance(value, list):
            feeds[key] = [URITemplate(v) for v in value]
        else:
            feeds[key] = URITemplate(value)

    return feeds
def parse_spec(spec):
    """Parse a package name and version spec as conda would.

    Returns:
        ``ParsedSpec`` or None on failure
    """
    if not is_string(spec):
        raise TypeError("Expected a string not %r" % spec)

    m = _spec_pat.match(spec)
    if m is None:
        return None

    name = m.group('name').lower()
    pip_constraint = m.group('pc')
    if pip_constraint is not None:
        pip_constraint = pip_constraint.replace(' ', '')

    conda_constraint = m.group('cc')
    exact_version = None
    exact_build_string = None
    if conda_constraint is not None:
        m = _conda_constraint_pat.match(conda_constraint)
        assert m is not None
        exact_version = m.group('version')
        for special in ('|', '*', ','):
            if special in exact_version:
                exact_version = None
                break
        if exact_version is not None:
            exact_build_string = m.group('build')
            if exact_build_string is not None:
                assert exact_build_string[0] == '='
                exact_build_string = exact_build_string[1:]

    return ParsedSpec(name=name,
                      conda_constraint=conda_constraint,
                      pip_constraint=pip_constraint,
                      exact_version=exact_version,
                      exact_build_string=exact_build_string)

# these are in order of preference. On pre-4.1.4 Windows,
# CONDA_PREFIX and CONDA_ENV_PATH aren't set, so we get to
# CONDA_DEFAULT_ENV.