我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ast.literal_eval()。
def parse_default(field, ftype, fdefault):
    # Normalize a textual default value for a field and, when meaningful,
    # store its string form on field.default_value.
    # NOTE(review): type names look protobuf-style ('bool', 'uint32',
    # 'fixed64', 'float', ...) -- confirm against the caller.
    if not (ftype == 'bool' and fdefault == 'true'):
        try:
            # strip C-style numeric suffixes (L/D/F) before literal parsing
            fdefault = literal_eval(fdefault.rstrip('LDF'))
        except (ValueError, SyntaxError):
            fdefault = None
    if type(fdefault) is int:
        if ftype[0] != 'u' and ftype[:5] != 'fixed':
            # signed types: reinterpret values that overflow 64/32 bits
            if fdefault >> 63:
                fdefault = c_long(fdefault).value
            elif fdefault >> 31 and ftype[-2:] != '64':
                fdefault = c_int(fdefault).value
        else:
            # unsigned/fixed types: mask to the declared bit width
            fdefault &= (1 << int(ftype[-2:])) - 1
        if ftype == 'float' and abs(fdefault) >> 23:
            # integer default too large to be a float literal:
            # reinterpret the raw bits as an IEEE-754 single
            fdefault = unpack('=f', pack('=i', fdefault))[0]
        elif ftype == 'double' and abs(fdefault) >> 52:
            fdefault = unpack('=d', pack('=q', fdefault))[0]
    if fdefault:
        # falsy defaults (0, 0.0, False, None, '') are skipped here
        # NOTE(review): confirm dropping 0/False defaults is intended
        field.default_value = str(fdefault)
def get_default(default, setting_type):
    """Coerce a raw default value to the type described by *setting_type*.

    ``setting_type`` is a ``(type_name, constructor)`` pair, e.g.
    ``('int_t', int)``. Strings are parsed as Python literals when
    possible, with a special case for booleans and a constructor
    fallback; ``None`` yields the type's zero value.
    """
    if isinstance(default, str):
        if setting_type[0] == 'str_t':
            return default
        else:
            try:
                return ast.literal_eval(default)
            except (ValueError, SyntaxError):
                # not a Python literal (e.g. 'true', '5 dB')
                if setting_type[0] == 'bool_t':
                    return default.lower() == 'true'
                else:
                    try:
                        # take the leading token ('5 dB' -> '5') and coerce
                        return setting_type[1](default.split(' ')[0])
                    except (ValueError, TypeError):
                        return None
    elif default is None:
        return setting_type[1]()
    else:
        return default
def read_settings(self):
    ''' Reads the settings from the digital_ocean.ini file '''
    cfg = ConfigParser.SafeConfigParser()
    ini_path = os.path.dirname(os.path.realpath(__file__)) + '/digital_ocean.ini'
    cfg.read(ini_path)
    section = 'digital_ocean'
    # Credentials
    if cfg.has_option(section, 'api_token'):
        self.api_token = cfg.get(section, 'api_token')
    # Cache related
    if cfg.has_option(section, 'cache_path'):
        self.cache_path = cfg.get(section, 'cache_path')
    if cfg.has_option(section, 'cache_max_age'):
        self.cache_max_age = cfg.getint(section, 'cache_max_age')
    # Private IP Address
    if cfg.has_option(section, 'use_private_network'):
        self.use_private_network = cfg.getboolean(section, 'use_private_network')
    # Group variables (stored in the ini file as a Python literal)
    if cfg.has_option(section, 'group_variables'):
        self.group_variables = ast.literal_eval(cfg.get(section, 'group_variables'))
def get_upload_url(self, session):
    """Fetch the upload URL from the appspot endpoint.

    Args:
        session: an authenticated requests-style session.

    Returns:
        The 'url' entry of the Python-literal response body.
    """
    reply = session.get(DataManagement.__APPSPOT_URL)
    if reply.text.startswith('\n<!DOCTYPE html>'):
        self.logger.debug('Incorrect credentials. Probably. If you are sure the credentials are OK, '
                          'refresh the authentication token. If it did not work report a problem. '
                          'They might have changed something in the Matrix.')
        sys.exit(1)
    elif reply.text.startswith('<HTML>'):
        self.logger.debug('Redirecting to upload URL')
        reply = session.get(DataManagement.__APPSPOT_URL)
    # NOTE(review): server presumably returns a Python dict literal here
    parsed = ast.literal_eval(reply.text)
    return parsed['url']
def cfg_from_list(cfg_list):
    """Set config keys via list (e.g., from command line).

    ``cfg_list`` alternates dotted key paths and values; each value is
    parsed as a Python literal, falling back to the raw string, and must
    keep the type already stored in the config.
    """
    from ast import literal_eval
    assert len(cfg_list) % 2 == 0
    for k, v in zip(cfg_list[0::2], cfg_list[1::2]):
        key_list = k.split('.')
        d = __C
        for subkey in key_list[:-1]:
            assert subkey in d
            d = d[subkey]
        subkey = key_list[-1]
        assert subkey in d
        try:
            value = literal_eval(v)
        except (ValueError, SyntaxError):
            # handle the case when v is a string literal
            value = v
        assert type(value) == type(d[subkey]), \
            'type {} does not match original type {}'.format(
                type(value), type(d[subkey]))
        d[subkey] = value
def checkPlugins(self):
    """Verify that all plugins in ``requiredPlugins`` are installed and
    enabled on the Jenkins server, raising BuildError otherwise."""
    response = self._send("GET", "pluginManager/api/python?depth=1")
    if response.status != 200:
        print("Warning: could not verify plugins: HTTP error: {} {}"
              .format(response.status, response.reason), file=sys.stderr)
        response.read()
    else:
        try:
            # Jenkins' python API returns a Python literal, not JSON
            plugins = ast.literal_eval(response.read().decode("utf8"))["plugins"]
            required = set(requiredPlugins.keys())
            for p in plugins:
                if p["shortName"] not in required:
                    continue
                if not p["active"] or not p["enabled"]:
                    raise BuildError("Plugin not enabled: " +
                                     requiredPlugins[p["shortName"]])
                required.remove(p["shortName"])
            if required:
                raise BuildError("Missing plugin(s): " + ", ".join(
                    requiredPlugins[p] for p in required))
        except BuildError:
            raise
        except Exception as exc:
            # was a bare `except:`, which also swallowed KeyboardInterrupt
            raise BuildError("Malformed Jenkins response while checking plugins!") from exc
def collect_moves(self, reader, name):
    """Scan CSV rows for *name* and build a Moves record from the first
    matching row; trailing-digit names require an exact match."""
    Moves = namedtuple('Moves', ['pokemon', 'gen', 'color', 'moves', 'versions'])
    exact = name.split('-')[-1].isdigit()
    for row in reader:
        matched = (name == row[0]) if exact else (name in row[0])
        if matched:
            pokemon = (name.split('-')[0] if exact else name).title()
            generation = switcher[row[1]]
            color = int(ast.literal_eval(row[2]))
            moves = ast.literal_eval(row[3])
            versions = ast.literal_eval(row[4])
            return Moves(pokemon, generation, color, moves, versions)
def _unwrap_plugin_exceptions(self, func, *args, **kwargs):
    """Parse exception details.

    Calls *func*; when a XenAPI.Failure wraps a plugin failure whose
    payload is a Python literal, re-raises a Failure built from the
    parsed payload instead, so callers see the real error parameters.
    """
    try:
        return func(*args, **kwargs)
    except XenAPI.Failure as exc:
        LOG.debug("Got exception: %s", exc)
        if (len(exc.details) == 4 and
                exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
                exc.details[2] == 'Failure'):
            params = None
            try:
                # details[3] carries the plugin's error params as a literal
                params = ast.literal_eval(exc.details[3])
            except Exception:
                # unparsable payload: propagate the original failure
                raise exc
            raise XenAPI.Failure(params)
        else:
            raise
    except xmlrpclib.ProtocolError as exc:
        # transport-level errors are logged and passed through unchanged
        LOG.debug("Got exception: %s", exc)
        raise
def _int_handler(self):
    """ Handler for int values. Parses QLineEdit. """
    text = self._select_widget.text()
    try:
        # Make sure this scans as a string
        if type(ast.literal_eval(text)) != int:
            raise ValueError
        self._current_value = int(text)
        self._valid = True
        color = "#FFFFFF"
    except ValueError:
        self._valid = False
        color = "#FFB6C1"
    self._select_widget.setStyleSheet("QLineEdit {{ background-color: {} }}".format(color))
def cfg_from_list(cfg_list):
    """Set config keys via list (e.g., from command line).

    ``cfg_list`` alternates dotted key paths and values; each value is
    parsed as a Python literal, falling back to the raw string, and must
    keep the type already stored in the config.
    """
    from ast import literal_eval
    assert len(cfg_list) % 2 == 0
    for k, v in zip(cfg_list[0::2], cfg_list[1::2]):
        key_list = k.split('.')
        d = __C
        for subkey in key_list[:-1]:
            # dict.has_key() was removed in Python 3; `in` works on both
            assert subkey in d
            d = d[subkey]
        subkey = key_list[-1]
        assert subkey in d
        try:
            value = literal_eval(v)
        except (ValueError, SyntaxError):
            # handle the case when v is a string literal
            value = v
        assert type(value) == type(d[subkey]), \
            'type {} does not match original type {}'.format(
                type(value), type(d[subkey]))
        d[subkey] = value
def fixSettingDictLoadedFromResultsDf(setting_dict):
    """Re-hydrate a settings dict loaded from a results dataframe: parse
    stringified layer lists, map optimizer names back to TF classes and
    coerce integer settings."""
    layers = setting_dict.get('hidden_layers')
    if type(layers) == str:
        setting_dict['hidden_layers'] = ast.literal_eval(layers)
    if 'optimizer' in setting_dict:
        opt_name = setting_dict['optimizer']
        if 'GradientDescent' in opt_name:
            setting_dict['optimizer'] = tf.train.GradientDescentOptimizer
        elif 'Adagrad' in opt_name:
            setting_dict['optimizer'] = tf.train.AdagradOptimizer
        else:
            setting_dict['optimizer'] = tf.train.AdamOptimizer
    for key in ('batch_size', 'decay_steps'):
        if key in setting_dict:
            setting_dict[key] = int(setting_dict[key])
    return setting_dict
def coalesce_ordered_dict(dict_str):
    """Return an :obj:collections.OrderedDict from ``dict_str``.

    >>> config = configparser.ConfigParser()
    >>> config.add_section('x')
    >>> config.set('x', 'y', "{'a': 0}\\n{'b': 1}")
    >>> od = coalesce_ordered_dict(config['x']['y'])
    >>> assert isinstance(od, OrderedDict)
    >>> assert od.popitem() == ('b', 1)
    >>> assert od.popitem() == ('a', 0)

    :param str dict_str: A string of newline-delimited dictionary literals.
    """
    ordered_dict = OrderedDict()
    # plain loop instead of a list comprehension used only for side effects
    for line in dict_str.splitlines():
        ordered_dict.update(literal_eval(line))
    return ordered_dict
def aget(self, **kw):
    # Old-style (pre async/await) coroutine: GET self.url with optional
    # query parameters and wrap the result in a Payload.
    session = aiohttp.ClientSession()
    url = URL(self.url)
    if kw:
        url = url.with_query(**kw)
    logger.debug("GET %s", url)
    try:
        response = yield from session.get(url, timeout=10)
        payload = yield from response.read()
    finally:
        # always release the client session, even when the request fails
        yield from session.close()
    response.raise_for_status()
    payload = payload.decode('utf-8')
    if response.content_type == 'text/x-python':
        # this content type carries a Python literal; parse it safely
        payload = ast.literal_eval(payload)
    return Payload.factory(response.status, response.headers, payload)
def cfg_from_list(cfg_list):
    """Set config keys via list (e.g., from command line).

    ``cfg_list`` alternates dotted key paths and values; each value is
    parsed as a Python literal, falling back to the raw string, and must
    keep the type already stored in the config.
    """
    from ast import literal_eval
    assert len(cfg_list) % 2 == 0
    for k, v in zip(cfg_list[0::2], cfg_list[1::2]):
        key_list = k.split('.')
        d = __C
        for subkey in key_list[:-1]:
            assert subkey in d
            d = d[subkey]
        subkey = key_list[-1]
        assert subkey in d
        try:
            value = literal_eval(v)
        except (ValueError, SyntaxError):
            # handle the case when v is a string literal
            value = v
        assert isinstance(value, type(d[subkey])), \
            'type {} does not match original type {}'.format(
                type(value), type(d[subkey]))
        d[subkey] = value
def send(self, *args, **kwargs):
    """Capture a raven transport payload into self.messages for later
    inspection by tests."""
    # In raven<6, args = (data, headers).
    # In raven 6.x args = (url, data, headers)
    if len(args) == 2:
        data = args[0]
    elif len(args) == 3:
        data = args[1]
    else:
        raise Exception('raven Transport.send api seems to have changed')
    raw = json.loads(zlib.decompress(data).decode('utf8'))
    # to make asserting easier, parse json strings into python strings
    for key, text in list(raw['extra'].items()):
        try:
            raw['extra'][key] = ast.literal_eval(text)
        except Exception:
            pass
    self.messages.append(raw)
def _callback(cls, cron):
    """Run one scheduled cron job; on failure roll back and notify.

    Executes ``cron.model.cron.function(*args)`` as the cron's user.
    Any exception rolls the transaction back and e-mails the request
    user in their own language.
    """
    pool = Pool()
    Config = pool.get('ir.configuration')
    try:
        # cron.args is stored as a Python literal string, e.g. "[1, 'x']"
        args = (cron.args or []) and literal_eval(cron.args)
        Model = pool.get(cron.model)
        with Transaction().set_user(cron.user.id):
            getattr(Model, cron.function)(*args)
    except Exception:
        # undo any partial work before sending the error notification
        Transaction().cursor.rollback()
        req_user = cron.request_user
        language = (req_user.language.code if req_user.language
                    else Config.get_language())
        with Transaction().set_user(cron.user.id), \
                Transaction().set_context(language=language):
            cls.send_error_message(cron)
def set_custom_user_vars(opt_key, settings, args):
    """Set custom user configuration variables.

    Reads ``key=value`` strings from ``args[opt_key]`` and stores them in
    *settings*, parsing each value as a Python literal when possible.

    :raises RuntimeError: when an entry is not of the form ``key=value``.
    """
    custom_vars = args.get(opt_key)
    if not custom_vars:
        return
    for var in custom_vars:
        # parse key-value pair
        var = var.strip()
        eq_pos = var.find('=')
        if eq_pos < 1 or eq_pos > len(var) - 2:
            raise RuntimeError('Expected "key=value" for --conf-var command-line argument; got "{}"'.format(var))
        key, value = var[:eq_pos].strip(), var[eq_pos + 1:].strip()
        # interpret type of value; keep the raw string when it is not a
        # valid Python literal (was a bare `except:`)
        try:
            settings[key] = ast.literal_eval(value)
        except (ValueError, SyntaxError):
            settings[key] = value
def validate_custom_val(self, val):
    """ Pass in a desired custom value and ensure it is valid.

    Probably should check type, etc, but let's assume fine for the moment.

    Raises ValidationError when a numeric value falls outside the
    configured min/max bounds.
    """
    self.ensure_one()
    if self.custom_type in ('int', 'float'):
        minv = self.min_val
        maxv = self.max_val
        # NOTE(review): literal_eval raises on non-numeric input --
        # presumably callers pre-validate val; confirm.
        val = literal_eval(val)
        # NOTE(review): a bound of 0 is falsy here and silently disables
        # its check -- confirm that is intended
        if minv and maxv and (val < minv or val > maxv):
            raise ValidationError(
                _("Selected custom value '%s' must be between %s and %s"
                    % (self.name, self.min_val, self.max_val))
            )
        elif minv and val < minv:
            raise ValidationError(
                _("Selected custom value '%s' must be at least %s" %
                    (self.name, self.min_val))
            )
        elif maxv and val > maxv:
            raise ValidationError(
                _("Selected custom value '%s' must be lower than %s" %
                    (self.name, self.max_val + 1))
            )
def _parse_line(self, line): line = line.decode("latin1") line = line.rstrip() l = len(line) line = line.lstrip() if not line or line.startswith("#"): return None indent = (l - len(line)) // 2 parts = line.split("\t") command = parts[0] args = {} for i in xrange(1, len(parts)): n, v = parts[i].split("=") args[n] = literal_eval(v) return (indent, command, args)
def create_and_save_model(self):
    """Decode the base64 model parameters, build a model suited to the
    configured dataset format and persist it when creation succeeds."""
    raw_params = base64.b64decode(self.job_args['model']['params'])
    # parameters are transported as a Python literal, base64-encoded
    list_params = literal_eval(raw_params)
    dataset_format = self.job_args.get('dataset_format')
    if dataset_format == 'libsvm':
        self.model = self.controller.create_model_libsvm(self.data, list_params)
    elif dataset_format == 'text':
        self.model = self.controller.create_model_text(self.data, list_params)
    else:
        self.model = self.controller.create_model(self.data, list_params)
    if self.model:
        self.model.save(self.context, self.modelpath)
def extractJson(message: str) -> Optional[Mapping]:
    """ Returns the first json blob found in the string if any are found """
    # First pass that relies on it being in a code block.
    # Raw strings fix the invalid '\`'/'\s' escape sequences the original
    # non-raw patterns produced (a DeprecationWarning, later a SyntaxError).
    for match in re.findall(r'`\s*?{[\s\S]*?}\s*?`', message):
        potJson = match[1:-1].strip()
        try:
            return json.loads(potJson)
        except ValueError:
            pass
    # Second pass doesn't require the code block, but it still uses the json parser
    for match in re.findall(r'{[\s\S]*}', message):
        try:
            return json.loads(match)
        except ValueError:
            pass
    # Third pass uses ast.literal_eval (which IS safe-it only evals literals) and some replacements to handle malformed
    # JSON. This is a horrible JSON parser and will incorrectly parse certain types of JSON, but it is far more
    # accepting so we might as well try doing this
    for match in re.findall(r'{[\s\S]*}', message):
        try:
            return fuzzyJsonParse(match)
        except (SyntaxError, ValueError):
            pass
    return None
def __getCheatsheets(self):
    """Download the online cheatsheet index and return Cheatsheet objects
    sorted case-insensitively by name."""
    server = self.serverManager.getDownloadServer(self.localServer)
    base = server.url
    if base[-1] != '/':
        base = base + '/'
    base = base + self.jsonServerLocation
    # NOTE(review): index is fetched as a Python literal, not JSON
    data = ast.literal_eval(requests.get(base).text)
    icon = self.__getIconWithName('cheatsheet')
    sheets = []
    for key, entry in data['cheatsheets'].items():
        c = Cheatsheet()
        c.name = entry['name']
        c.aliases = entry['aliases']
        c.globalversion = data['global_version']
        c.version = entry['version']
        c.image = icon
        c.onlineid = key
        c.status = 'online'
        sheets.append(c)
    return sorted(sheets, key=lambda s: s.name.lower())
def test_view_with_args(self):
    """Dedicated test to ensure that views with args work """
    view_name = 'thumber_tests:args_example'
    args = ('foobar',)
    path = reverse(view_name, args=args)
    http_referer = 'http://example.com{0}'.format(path)
    response = self.client.get(path, follow=True)
    self.assertContains(response, 'Example Template!', status_code=200)
    self.assertContains(response, 'Was this service useful?')
    # Post with thumber_token=ajax for a JSON response
    data = {'satisfied': 'True', 'thumber_token': 'ajax'}
    response = self.client.post(path, data, HTTP_REFERER=http_referer)
    # Check a Feedback model was created
    # (assertEquals is a deprecated alias; assertEqual is the real API)
    self.assertEqual(Feedback.objects.count(), 1)
    feedback = Feedback.objects.all()[0]
    view_args = ast.literal_eval(feedback.view_args)
    self.assertEqual(view_args[0], args)
    self.assertEqual(view_args[1], {})
def test_view_with_kwargs(self):
    """Dedicated test to ensure that views with kwargs work, and the kwargs get stored in the model """
    view_name = 'thumber_tests:kwargs_example'
    kwargs = {'slug': 'foobar'}
    path = reverse(view_name, kwargs=kwargs)
    http_referer = 'http://example.com{0}'.format(path)
    response = self.client.get(path, follow=True)
    self.assertContains(response, 'Example Template!', status_code=200)
    self.assertContains(response, 'Was this service useful?')
    # Post with thumber_token=ajax for a JSON response
    data = {'satisfied': 'True', 'thumber_token': 'ajax'}
    response = self.client.post(path, data, HTTP_REFERER=http_referer)
    # Check a Feedback model was created
    # (assertEquals is a deprecated alias; assertEqual is the real API)
    self.assertEqual(Feedback.objects.count(), 1)
    feedback = Feedback.objects.all()[0]
    view_args = ast.literal_eval(feedback.view_args)
    self.assertEqual(view_args[1], kwargs)
    self.assertEqual(view_args[0], ())
def ner(request):
    """API endpoint: tag the 'message' GET parameter with run_ner().

    The optional 'entities' parameter is a Python-literal list.
    Returns the tagger output as JSON.
    """
    message = request.GET.get('message')
    raw_entities = request.GET.get('entities', [])
    entities = ast.literal_eval(raw_entities) if raw_entities else []
    ner_logger.debug('Start: %s -- %s' % (message, entities))
    output = run_ner(entities=entities, message=message)
    ner_logger.debug('Finished %s : %s ' % (message, output))
    return HttpResponse(json.dumps({'data': output}), content_type='application/json')
def __creatBlocks(self):
    """ Second part of parsing. Find blocks and creat a list.

    Builds a textual nested-list representation of self.lines grouped by
    indentation level, then parses it back with ast.literal_eval.
    Each line is percent-encoded so quotes cannot break the literal.
    """
    w = list(zip(self.lines, self.indentationList))
    self.blocks, indentation, level = "[", 0, 0
    for i in w:
        if i[1] > indentation:
            # deeper indent: open a new nested list
            level = level + 1
            self.blocks += ",[" + '"' + urllib.parse.quote_plus(i[0]) + '"'
        elif i[1] == 0:
            # back to top level: close every open nested list
            if len(self.blocks) > 1:
                self.blocks += "]" * (level) + ','
            self.blocks += '"' + urllib.parse.quote_plus(i[0]) + '"'
            level = 0
        elif i[1] < indentation:
            # NOTE(review): w.index(i) != len(w) is always True
            # (index() never returns len(w)) -- confirm original intent
            if w.index(i) != len(w):
                self.blocks += "]" + "," + '"' + \
                    urllib.parse.quote_plus(i[0]) + '"'
                level += -1
        elif i[1] == indentation:
            # same level: append as a sibling
            self.blocks += "," + '"' + urllib.parse.quote_plus(i[0]) + '"'
        indentation = i[1]
    self.blocks += "]" * (level + 1)
    self.blocks = ast.literal_eval(self.blocks)
def working_directory(directory):
    """Generator used as a context manager: chdir into *directory*, yield
    it, and always restore the original working directory afterwards.

    (A long block of commented-out dead code that followed this function
    has been removed.)
    """
    owd = os.getcwd()
    try:
        os.chdir(directory)
        yield directory
    finally:
        # restore the caller's cwd even if the body raised
        os.chdir(owd)
def extractXML(apk_location, config_location):
    """Decompile *apk_location* with apktool and return its
    AndroidManifest.xml parsed into a plain dict.

    The output directory name comes from the app_flavor_name config
    option, defaulting to 'app-external-release'.
    """
    with working_directory("/tmp"):
        subprocess.call(["apktool", "d", apk_location])
    config = ConfigParser.ConfigParser()
    config.read(config_location)
    flavor = config.get("APP_NAME", "app_flavor_name")
    app_name = flavor if flavor != None else "app-external-release"
    with working_directory("/tmp/" + app_name):
        with open("AndroidManifest.xml") as fd:
            parsed = xmltodict.parse(fd.read())
            # round-trip through JSON to turn OrderedDicts into plain dicts
            return ast.literal_eval(json.dumps(parsed))
def main():
    """Archive the files listed in --inputs into the --output zip."""
    parser = optparse.OptionParser()
    build_utils.AddDepfileOption(parser)
    parser.add_option('--inputs', help='List of files to archive.')
    parser.add_option('--output', help='Path to output archive.')
    parser.add_option('--base-dir',
                      help='If provided, the paths in the archive will be '
                           'relative to this directory',
                      default='.')
    options, _ = parser.parse_args()
    # --inputs is passed as a Python list literal
    inputs = ast.literal_eval(options.inputs)
    build_utils.DoZip(inputs, options.output, options.base_dir)
    if options.depfile:
        build_utils.WriteDepfile(options.depfile, options.output)
def convert_string_to_datatype(self):
    """Converts an input string to a python datatype.

    Parses self.arg_value as a Python literal and checks it against
    self.datatype; on any failure the raw string is returned and the
    user is warned.
    """
    # BUG FIX: the implicit string concatenation produced "doesnot match"
    err_msg = "User input argument value {0} does "\
              "not match python syntax for '{1}'".format(self.arg_value, self.datatype)
    info_msg = "Warrior FW will handle user input argument value as string (default)"
    try:
        result = ast.literal_eval(self.arg_value)
    except Exception:
        # Python 2 print statements converted to the print() function
        print('\n')
        print_error(err_msg)
        print_info(info_msg)
        print('\n')
        print_error('unexpected error: {0}'.format(traceback.format_exc()))
        result = self.arg_value
    else:
        if not isinstance(result, self.datatype):
            print('\n')
            print_error(err_msg)
            print_info(info_msg)
            print('\n')
            result = self.arg_value
    return result
def main(unused_argv=None):
    """Stylize FLAGS.input_image with the styles named in
    FLAGS.which_styles and write results under FLAGS.output_dir."""
    # Load image
    image = np.expand_dims(
        image_utils.load_np_image(os.path.expanduser(FLAGS.input_image)), 0)
    output_dir = os.path.expanduser(FLAGS.output_dir)
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    # --which_styles is a Python literal: a list or a dict
    which_styles = ast.literal_eval(FLAGS.which_styles)
    if isinstance(which_styles, list):
        _multiple_images(image, which_styles, output_dir)
    elif isinstance(which_styles, dict):
        _multiple_styles(image, which_styles, output_dir)
    else:
        raise ValueError('--which_styles must be either a list of style indexes '
                         'or a dictionary mapping style indexes to weights.')
def convert_param_dict_for_use(self, setting_dict):
    """When loading rows from a saved results df in csv format, some of the
    settings may end up being converted to a string representation and need
    to be converted back to actual numbers and objects. May need to be
    overwritten in child class."""
    if 'architecture' in setting_dict.keys():
        if type(setting_dict['architecture']) == str:
            setting_dict['architecture'] = ast.literal_eval(setting_dict['architecture'])
    if 'optimizer' in setting_dict.keys():
        if 'GradientDescent' in setting_dict['optimizer']:
            setting_dict['optimizer'] = tf.train.GradientDescentOptimizer
        elif 'Adagrad' in setting_dict['optimizer']:
            setting_dict['optimizer'] = tf.train.AdagradOptimizer
        else:
            setting_dict['optimizer'] = tf.train.AdamOptimizer
    if 'batch_size' in setting_dict.keys():
        setting_dict['batch_size'] = int(setting_dict['batch_size'])
        # Python 2 print statement converted to the print() function
        print("batch size just got changed in convert_param_dict. It's an",
              type(setting_dict['batch_size']))
    return setting_dict
def read(self, outname):
    """Load sampler state from the ebf file *outname*: names/chain,
    names/mu/sigma and the literal-encoded description."""
    if ebf.containsKey(outname, '/names0'):
        raw = ebf.read(outname, '/names0')
        self.names0 = [str(item) for item in raw]
        self.chain = ebf.read(outname, '/chain/')
    else:
        self.names0 = []
    if ebf.containsKey(outname, '/names1'):
        raw = ebf.read(outname, '/names1')
        self.names1 = [str(item) for item in raw]
        self.mu = ebf.read(outname, '/mu/')
        self.sigma = ebf.read(outname, '/sigma/')
    else:
        self.names1 = []
    # description is stored as the string form of a Python literal
    self.descr = ast.literal_eval(ebf.read(outname, '/descr')[0])
def vfam_to_krona(self, vfam_file):
    """Read a tab-separated vfam report and write Krona input lines to
    self.krona_in, distributing each vfam's read count across its
    family/genus proportions."""
    vfam_counts = defaultdict(int)
    fam_by_vfam = {}
    gen_by_vfam = {}
    with open(vfam_file, 'r') as handle:
        handle.readline()  # get rid of the header
        for line in handle:
            fields = line.split('\t')
            vfam = fields[1]
            vfam_counts[vfam] += 1
            fam_by_vfam[vfam] = ast.literal_eval(fields[3])  # safe eval of dict
            gen_by_vfam[vfam] = ast.literal_eval(fields[4])
    with open(self.krona_in, 'w') as out:
        for vfam, n_reads in vfam_counts.items():
            fam_total = sum(fam_by_vfam[vfam].values())
            for fam, fam_prop in fam_by_vfam[vfam].items():
                gen_total = sum(gen_by_vfam[vfam].values())
                for genus, gen_prop in gen_by_vfam[vfam].items():
                    n = (n_reads * (fam_prop / fam_total)) * (gen_prop / gen_total)
                    out.write('%.3f\t%s\t%s\t%s\n' % (n, fam, vfam, genus))
def show_connections():
    """Get list of network connections"""
    output = subprocess.check_output(
        "/usr/share/harbour-infraview/helper/infraview-helper", shell=True)
    netstat_keys = [
        "udp_tcp", "ConnID", "UID", "localhost", "localport",
        "remotehost", "remoteport", "conn_state", "pid", "exe_name"]
    result = []
    for raw_line in output.splitlines():
        # each helper line is a Python tuple/list literal of field values
        values = ast.literal_eval(raw_line.decode("utf-8"))
        result.append(dict(zip(netstat_keys, values)))
    return result
async def getAdmin(ID=logChannelID):
    """Return the chat administrators of *ID* as a list of dicts with
    id/username/first_name/last_name (last_name defaults to '').

    Declared async because the body awaits bot.api_call (the flattened
    source dropped the keyword). The manual index/while loop was replaced
    with a for loop over the result entries.
    """
    # NOTE(review): api_call presumably returns a dict already; the
    # str()/literal_eval round-trip is kept from the original -- confirm
    raw = ast.literal_eval(str(await bot.api_call("getChatAdministrators", chat_id=ID)))
    adminDict = []
    for entry in raw['result']:
        user = entry['user']
        adminDict.append({
            'id': user['id'],
            'username': user['username'],
            'first_name': user['first_name'],
            'last_name': user.get('last_name', '')})
    return adminDict
def read_files_worker(self, directory, queue):
    """
    Read all files in a directory and output to the queue.

    First line of every file should contain the index.
    Worker separates first line and parses to dict.
    Tuple of index and text is added to queue.

    :directory: Source directory containing files
    :queue: Queue to add the tuples to
    """
    for file in os.scandir(directory):
        if file.is_file():
            with open(file.path, 'r', errors='replace') as f:
                text = f.readlines()
                try:
                    index = literal_eval(text.pop(0).strip())
                    queue.put((index, '\n'.join(text)), block=True)
                except (IndexError, ValueError, SyntaxError):
                    # BUG FIX: literal_eval raises ValueError/SyntaxError on a
                    # malformed index line; previously only IndexError was
                    # caught and the worker crashed
                    LOGGER.error('File {0} is not classifyable'
                                 .format(file.path))
    LOGGER.info('File reading worker done.')
def cfg_from_list(cfg_list):
    """Set config keys via list (e.g., from command line).

    ``cfg_list`` alternates dotted key paths and values; each value is
    parsed as a Python literal, falling back to the raw string, and must
    keep the type already stored in the config.
    """
    from ast import literal_eval
    # assert len(cfg_list) % 2 == 0
    for k, v in zip(cfg_list[0::2], cfg_list[1::2]):
        key_list = k.split('.')
        d = __C
        for subkey in key_list[:-1]:
            assert subkey in d
            d = d[subkey]
        subkey = key_list[-1]
        assert subkey in d
        try:
            value = literal_eval(v)
        except (ValueError, SyntaxError):
            # handle the case when v is a string literal
            value = v
        assert type(value) == type(d[subkey]), \
            'type {} does not match original type {}'.format(
                type(value), type(d[subkey]))
        d[subkey] = value
def package_meta():
    """Read __init__.py for global package metadata.

    Do this without importing the package.
    """
    version_re = re.compile(r'__version__\s+=\s+(.*)')
    url_re = re.compile(r'__url__\s+=\s+(.*)')
    license_re = re.compile(r'__license__\s+=\s+(.*)')
    with open('jetstream/__init__.py', 'rb') as ffinit:
        content = ffinit.read().decode('utf-8')
    # each dunder is assigned a string literal; literal_eval unquotes it
    return {
        'version': str(ast.literal_eval(version_re.search(content).group(1))),
        'license': str(ast.literal_eval(license_re.search(content).group(1))),
        'url': str(ast.literal_eval(url_re.search(content).group(1))),
    }
def api(request):
    """Receive GPU info via GET query parameters and upsert a Miner_Info
    row, then render the api page.

    Query params: hash (id), url_style (Python-literal dict of fields),
    hostname (stored into the dict's 'host' key).
    """
    hostname = None
    json_data = None
    hash_id = None
    data = "this url receives get requests for updating the gpu info to the dashboard"
    context = {'data': data}
    if request.GET:
        # BUG FIX: request.GET['key'] raises MultiValueDictKeyError when a
        # parameter is absent; .get() returns None instead
        hash_id = request.GET.get('hash')
        url_style = request.GET.get('url_style')
        if url_style:
            json_data = ast.literal_eval(url_style)
        hostname = request.GET.get('hostname')
        # BUG FIX: guard against json_data being None when url_style is missing
        if json_data is not None:
            json_data['host'] = hostname
            Miner_Info.objects.filter(host=json_data['host']).update_or_create(json_data)
    return render(request, 'home/api.html', context)
async def get_map(chat, **kwargs):
    """Reply with the (at most 5-level) map of the requested dungeon.

    Args may name a dungeon by acronym + number; otherwise the user's
    active dungeon is used. Declared async because the body awaits
    (the flattened source dropped the keyword).

    BUG FIX: the two error replies below returned the chat.reply(...)
    coroutine without awaiting it (inconsistent with every other reply
    in this handler), so the message was never sent.
    """
    redis = kwargs.get('redis')
    args = kwargs.get('info').get('args')
    active_dungeon = await redis.hget(kwargs.get('info').get('username'), 'active_dungeon')
    if len(args) == 2:
        name, num = args
        if name in Dungeon.ACRONYMS:
            active_dungeon = Dungeon.ACRONYMS.get(name)
        else:
            return await chat.reply(f"Errore!\nLa sigla dungeon che mi ha mandato non esiste!\n"
                                    f"Opzioni valide: {', '.join(Dungeon.ACRONYMS.keys())}")
        if is_number(num):
            active_dungeon += ' ' + num
        else:
            return await chat.reply(f"Errore!\n{num} non è un numero!")
    elif not active_dungeon:
        return await chat.reply(ErrorReply.NO_ACTIVE_DUNGEONS)
    map_string = await redis.get(f'map:{active_dungeon}')
    if not map_string:
        return await chat.reply('La mappa del dungeon che hai richiesto non esiste!')
    # maps are stored as Python literals; keep only the first 5 levels
    dungeon_map = literal_eval(map_string)[:5]
    printable_map = active_dungeon + '\n\n' + ''.join([
        Dungeon.stringify_room(i, *level, kwargs.get('info').get('emojis'))
        for i, level in enumerate(dungeon_map, 1)])
    markup = Dungeon.map_directions(active_dungeon, 0, 5, kwargs.get('info').get('username'))
    return await chat.send_text(printable_map, reply_markup=markup, parse_mode='Markdown')
async def next_room(chat, **kwargs):
    """Advance (or jump) the sender's position in the active dungeon and
    reply with the new room. Declared async because the body awaits
    (the flattened source dropped the keyword).
    """
    redis = kwargs.get('redis')
    active_dungeon = kwargs.get('active_dungeon')
    info = kwargs.get('info')
    sender = info.get('username')
    arg = info.get('args')
    try:
        position = int(await redis.hget(sender, 'position')) + 1 if not arg else int(arg[0])
    except ValueError:
        # BUG FIX: this reply returned an un-awaited coroutine, so the
        # error message was never sent (every sibling reply is awaited)
        return await chat.reply("Errore!\n L'argomento del comando deve essere un numero!")
    if position > Dungeon.length(active_dungeon):
        return await chat.reply('Errore!\n La stanza richiesta è maggiore ')
    dungeon_map = literal_eval(await redis.get(f"map:{active_dungeon}"))
    await redis.hset(sender, 'position', position)
    return await chat.reply(Dungeon.stringify_room(
        position, *dungeon_map[position-1], info.get('emojis')), parse_mode='Markdown')
async def stats_choice_phase2(chat, **kwargs):
    """Reply with completion and room-type statistics for the dungeon
    selected in the callback match ('name:number')."""
    dungeon, num = kwargs.get('match').group(1).split(':')
    redis = kwargs.get('redis')
    dungeon_map = literal_eval(await redis.get(f'map:{dungeon} {num}'))
    room_counts = defaultdict(int)
    for level in dungeon_map:
        for room in level:
            room_counts[room] += 1
    # every level has 3 rooms; '' marks a not-yet-explored room
    tot_rooms = len(dungeon_map) * 3
    # dungeon_deadline = await redis.hget('dungeon_deadlines', f'{dungeon} {num}')
    percent_completed = round(((tot_rooms - (room_counts.get('') or 0)) / tot_rooms) * 100, 2)
    reply = (f"{dungeon} {num}\nPercentuale completamento {percent_completed}%\nMonete: {room_counts.get('monete') or 0}\n"
             f"Spade: {room_counts.get('spada') or 0}\nAsce: {room_counts.get('ascia') or 0}\n"
             f"Aiuta: {room_counts.get('aiuta') or 0}\nMattonelle: {room_counts.get('mattonella') or 0}\n"
             f"Stanze vuote: {room_counts.get('stanza vuota') or 0}\n"
             f"Fontana: {room_counts.get('fontana') or 0}\nIncisioni: {room_counts.get('incisioni') or 0}")
    await chat.send_text(reply)
def read_settings(self):
    ''' Reads the settings from the digital_ocean.ini file '''
    config = ConfigParser.SafeConfigParser()
    config.read(os.path.dirname(os.path.realpath(__file__)) + '/digital_ocean.ini')
    # Credentials
    if config.has_option('digital_ocean', 'api_token'):
        self.api_token = config.get('digital_ocean', 'api_token')
    # Cache related
    if config.has_option('digital_ocean', 'cache_path'):
        self.cache_path = config.get('digital_ocean', 'cache_path')
    if config.has_option('digital_ocean', 'cache_max_age'):
        self.cache_max_age = config.getint('digital_ocean', 'cache_max_age')
    # Private IP Address
    if config.has_option('digital_ocean', 'use_private_network'):
        # BUG FIX: config.get() returned the raw string, making "False"/"no"
        # truthy; getboolean() parses the value properly
        self.use_private_network = config.getboolean('digital_ocean', 'use_private_network')
    # Group variables (stored in the ini file as a Python literal)
    if config.has_option('digital_ocean', 'group_variables'):
        self.group_variables = ast.literal_eval(config.get('digital_ocean', 'group_variables'))
def _pop_token(self, lineno: int, token_value: str) -> Token: tokensline = self._lines[lineno - 1] # Pop the first token with the same name in the same line for t in tokensline: if t.name != 'STRING': line_value = t.value else: if t.value[0] == 'f' and t.value[1] in ('"', "'"): # fstring: token identify as STRING but they parse into the AST as a # collection of nodes so the token_value is different. To find the # real token position we'll search inside the fstring token value. tok_subpos = t.value.find(token_value) if tok_subpos != -1: # We don't remove the fstring token from the line in this case; other # nodes could match different parts of it newtok = deepcopy(t) newtok.start.col = t.start.col + tok_subpos return newtok raise TokenNotFoundException("Could not find token '{}' inside f-string '{}'" .format(token_value, t.value)) else: # normal string; they include the single or double quotes so we liteval line_value = literal_eval(t.value) if str(line_value) == str(token_value): tokensline.remove(t) return t raise TokenNotFoundException("Token named '{}' not found in line {}" .format(token_value, lineno))
def read_piksi_settings_info(self):
    """Load piksi_settings.yaml and attach a string-parser callable to
    each setting, indexing the result by group and name."""
    self.piksi_settings_info = tree()
    # BUG FIX: use a context manager so the file handle is closed, and
    # safe_load instead of the unsafe/deprecated bare yaml.load
    with open(os.path.join(PKG_PATH, 'piksi_settings.yaml'), 'r') as f:
        settings_info = yaml.safe_load(f)
    for s in settings_info:
        kind = s['type'].lower()
        if kind == 'boolean':
            s['parser'] = lambda x: x.lower() == 'true'
        elif kind in ('float', 'double', 'int'):
            s['parser'] = ast.literal_eval
        elif s['type'] == 'enum':
            # bound method: maps an enum string to its index
            s['parser'] = s['enum'].index
        else:
            # default: pass the raw string through
            s['parser'] = lambda x: x
        self.piksi_settings_info[s['group']][s['name']] = s