The following 50 code examples, extracted from open-source Python projects, illustrate how to use re.split().
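Before the examples, here is a minimal standalone sketch (not taken from any of the projects below) of the two behaviours of re.split() that recur throughout: splitting on several delimiters in one pass, and keeping the separators when the pattern contains a capturing group.

import re

# Split on any of several delimiters at once; plain str.split() can only
# split on a single fixed separator.
print(re.split(r"[_.\-]", "en_US.utf-8"))
# ['en', 'US', 'utf', '8']

# With a capturing group in the pattern, the separators themselves are kept
# in the result; the text after the final separator (here '') comes last.
print(re.split(r"(\r\n|\r|\n)", "a\r\nb\n"))
# ['a', '\r\n', 'b', '\n', '']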
def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :rtype: str
    """
    parts = uri.split('%')
    for i in range(1, len(parts)):
        h = parts[i][0:2]
        if len(h) == 2 and h.isalnum():
            try:
                c = chr(int(h, 16))
            except ValueError:
                raise InvalidURL("Invalid percent-escape sequence: '%s'" % h)

            if c in UNRESERVED_SET:
                parts[i] = c + parts[i][2:]
            else:
                parts[i] = '%' + parts[i]
        else:
            parts[i] = '%' + parts[i]
    return ''.join(parts)
def get_language(self, locale):
    '''Convert a locale tag into a preloaded language'''
    split_tag = re.split(r"[_\.\-]", locale)
    # try with language and country
    language = None
    if len(split_tag) >= 2:
        language = self._languages.get(split_tag[0] + "-" + split_tag[1], None)
    if language is None and len(split_tag) >= 1:
        language = self._languages.get(split_tag[0], None)
    # fallback to english if the language is not recognized or
    # locale was not specified
    if language:
        return language
    else:
        return self._languages['en']
def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    :rtype: bool
    """
    if string_network.count('/') == 1:
        try:
            mask = int(string_network.split('/')[1])
        except ValueError:
            return False

        if mask < 1 or mask > 32:
            return False

        try:
            socket.inet_aton(string_network.split('/')[0])
        except socket.error:
            return False
    else:
        return False
    return True
def attempt_special(self, cmd_str):
    # Special command handling
    fields = cmd_str.split(" ")
    SKIP = False
    command_set = []
    if cmd_str[0] == '@' or cmd_str[0] == '%':
        special_cmd = fields[0][1:]
        if special_cmd == "script":
            command_set = self.parse_script(fields[1], *fields[2:])
        elif special_cmd in special_commands:
            try:
                special_commands[special_cmd](*fields[1:])
                SKIP = True
            except TypeError as e:
                print(e)
                print("Likely incorrect usage of '%s'" % special_cmd)
    return SKIP, command_set
def super_handle(self, command_set):
    handled = None
    for command_i, command in enumerate(command_set):
        to_capture = []
        try:
            to_capture = self.uncaptured_variable_blocks[command_i]
        except:
            pass
        handled = self.handle_command(command.split(" "), to_capture,
                                      self.variables, self.meta)
        if handled:
            if "captured" in handled:
                self.variables.update(handled["captured"])
        print("")
    #####################################
    # TODO return aggregate message for scripts instead of last message
    return handled
def get_proc_etime(self, pid):
    fmt = subprocess.getoutput("ps -A -opid,etime | grep '^ *%d ' | awk '{print $NF}'" % pid).strip()
    if fmt == '':
        return -1
    parts = fmt.split('-')
    days = int(parts[0]) if len(parts) == 2 else 0
    fmt = parts[-1]
    parts = fmt.split(':')
    hours = int(parts[0]) if len(parts) == 3 else 0
    parts = parts[len(parts)-2:]
    minutes = int(parts[0])
    seconds = int(parts[1])
    return ((days * 24 + hours) * 60 + minutes) * 60 + seconds

# compute the billing val this running hour
# if isreal is True, it will also make users' beans decrease to pay for the bill.
# return the billing value in this running hour
def build_sam_tags(flds):
    """
    Given a list of fields from a SAM file (all fields, including the first
    11 fixed fields), returns a dictionary with the SAM tags (e.g. 'MD', 'NM').
    Tags with type 'i' are converted to integers.
    Tags with type 'f' are converted to floats.
    Example:
        tags = build_sam_tags(["NM:i:0", "MD:Z:77"])
        => tags = {'NM': 0, 'MD': '77'}
    """
    # Split tags into tuples of (name, type, value)
    # e.g. ["NM:i:0", "MD:Z:77"] => [('NM', 'i', '0'), ('MD', 'Z', '77')]
    in_tags = [tuple(x.split(':')) for x in flds[11:]]
    out_tags = {}
    for n, t, v in in_tags:
        if t == "i":
            v = int(v)
        elif t == 'f':
            v = float(v)
        out_tags[n] = v
    return out_tags
def slice_arg(s):
    """
    Parse a string that describes a slice with start and end.

    >>> slice_arg('2:-3')
    slice(2, -3, None)
    >>> slice_arg(':-3')
    slice(None, -3, None)
    >>> slice_arg('2:')
    slice(2, None, None)
    """
    start, end = s.split(':')
    start = None if start == '' else int(start)
    end = None if end == '' else int(end)
    return slice(start, end)
def __init__(self, fullpath):
    fn = os.path.split(fullpath)[-1]
    dot_parts = fn.split(".")
    if dot_parts[-1] == "fastq":
        name = dot_parts[-2]
    elif len(dot_parts) > 2 and dot_parts[-2] == "fastq":
        name = dot_parts[-3]
    else:
        raise NameError("%s is not a fastq file" % fullpath)
    all_flds = name.split("_")
    flds = all_flds[-4:]
    self.prefix = "_".join(all_flds[:-4])
    self.s = flds[0][1:]
    self.lane = int(flds[1][2:])
    self.read = flds[2]
    self.group = int(flds[3])
    self.filename = fullpath
def get_run_data(fn):
    """ Parse flowcell + lane from the first FASTQ record.
    NOTE: we don't check whether there are multiple FC / lanes in this file.
    NOTE: taken from longranger/mro/stages/reads/setup_chunks
    """
    if fn[-2:] == 'gz':
        reader = gzip.open(fn)
    else:
        reader = open(fn, 'r')

    gen = read_generator_fastq(reader)

    try:
        (name, seq, qual) = next(gen)
        (flowcell, lane) = re.split(':', name)[2:4]
        return (flowcell, lane)
    except StopIteration:
        # empty fastq
        raise ValueError('Could not extract flowcell and lane from FASTQ file. File is empty: %s' % fn)
def set_data_field(record, field_name, field_val):
    assert len(record.samples) == 1
    new_format = record.FORMAT
    new_fields = new_format.split(':')
    if field_name not in new_fields:
        new_fields = new_fields + [field_name]
        new_format = ':'.join(new_fields)

    sample_call = get_record_sample_call(record)
    data = sample_call.data
    data_dict = data._asdict()
    data_dict[field_name] = field_val

    new_sample_vals = []
    for field in new_fields:
        new_sample_vals.append(data_dict[field])

    # Note - the old way of passing the fields to pyVCF is memory intensive
    # because a fresh type is allocated for each call to make_calldata_tuple
    # data_instantiator = vcf.model.make_calldata_tuple(new_fields)
    # data = data_instantiator(*new_sample_vals)
    data = FakeNamedTuple(new_fields, new_sample_vals)

    sample_call.data = data
    record.samples[0] = sample_call
    record.FORMAT = new_format
def get_locus_info(locus):
    """ Returns chrom, start and stop from locus string.
    Enforces standardization of how locus is represented.
    chrom:start_stop (start and stop should be ints or 'None')
    """
    chrom, start_stop = locus.split(':')
    if chrom == 'None':
        chrom = None

    start, stop = re.split(r"\.\.|-", start_stop)

    if start == 'None':
        start = None
    else:
        start = int(float(start))
    if stop == 'None':
        stop = None
    else:
        stop = int(float(stop))

    return (str(chrom), start, stop)
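The alternation in that re.split() pattern is what lets get_locus_info() accept either '..' or '-' between coordinates; a small illustration with made-up values:

import re

for start_stop in ("100..200", "100-200"):
    # Both locus notations split the same way.
    print(re.split(r"\.\.|-", start_stop))
# ['100', '200']
# ['100', '200']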
def get_target_regions_dict(targets_file):
    """ Gets the target regions from a targets file as a chrom-indexed dictionary,
    with every entry given as a list of (start, end) tuples
    """
    targets = {}
    for line in targets_file:
        info = line.strip().split('\t')
        if line.startswith('browser') or line.startswith('track') or \
           line.startswith('-browser') or line.startswith('-track') or \
           line.startswith('#'):
            continue
        if len(line.strip()) == 0:
            continue
        chrom = info[0]
        start = int(info[1])
        end = int(info[2])
        chrom_targs = targets.setdefault(chrom, [])
        chrom_targs.append((start, end))
    return targets
def read_reference(self):
    profiles = {}
    reference = {}
    reference_file = open(self.path + "Reference.txt", "rU")
    bin_size = float("nan")
    for line in reference_file:
        line = line.strip("\n").strip("\r")
        (key, value) = line.split("\t")
        if key == "BinSize":
            bin_size = int(value)
        else:
            reference[key] = float(value)
        # if BinSize else
    # for line
    reference_file.close()
    profiles["BinSize"] = bin_size
    profiles["Reference"] = reference
    self.profiles = profiles
    return profiles
    # read_reference

#...........................................................................
def debug_print(self, msg):
    """Print 'msg' to stdout if the global DEBUG (taken from the
    DISTUTILS_DEBUG environment variable) flag is true.
    """
    from distutils.debug import DEBUG
    if DEBUG:
        print(msg)
        sys.stdout.flush()

# -- Option validation methods -------------------------------------
# (these are very handy in writing the 'finalize_options()' method)
#
# NB. the general philosophy here is to ensure that a particular option
# value meets certain type and value constraints.  If not, we try to
# force it into conformance (eg. if we expect a list but have a string,
# split the string on comma and/or whitespace).  If we can't force the
# option into conformance, raise DistutilsOptionError.  Thus, command
# classes need do nothing more than (eg.)
#     self.ensure_string_list('foo')
# and they can be guaranteed that thereafter, self.foo will be
# a list of strings.
def push(self, data):
    """Push some new data into this object."""
    # Handle any previous leftovers
    data, self._partial = self._partial + data, ''
    # Crack into lines, but preserve the newlines on the end of each
    parts = NLCRE_crack.split(data)
    # The *ahem* interesting behaviour of re.split when supplied grouping
    # parentheses is that the last element of the resulting list is the
    # data after the final RE.  In the case of a NL/CR terminated string,
    # this is the empty string.
    self._partial = parts.pop()
    #GAN 29Mar09  bugs 1555570, 1721862  Confusion at 8K boundary ending with \r:
    # is there a \n to follow later?
    if not self._partial and parts and parts[-1].endswith('\r'):
        self._partial = parts.pop(-2) + parts.pop()
    # parts is a list of strings, alternating between the line contents
    # and the eol character(s).  Gather up a list of lines after
    # re-attaching the newlines.
    lines = []
    for i in range(len(parts) // 2):
        lines.append(parts[i*2] + parts[i*2+1])
    self.pushlines(lines)
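The behaviour that comment describes is easy to verify in isolation. In the standard library's email package, NLCRE_crack is compiled from a pattern with a capturing group, so re.split() interleaves the line terminators with the line contents:

import re

# NLCRE_crack in email.feedparser is compiled from a pattern like this one.
NLCRE_crack = re.compile(r'(\r\n|\r|\n)')

parts = NLCRE_crack.split('line one\r\nline two\n')
print(parts)
# ['line one', '\r\n', 'line two', '\n', '']
# The trailing '' is the data after the final separator; push() pops it
# off and carries it over as self._partial for the next chunk of data.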
def which(program):
    # type: (str) -> Optional[str]
    program = exename(program)
    fpath, _ = os.path.split(program)
    if fpath:
        if is_executable(program):
            return program
    else:
        for path in [os.path.abspath(os.curdir)] + os.environ['PATH'].split(os.pathsep):
            path = path.strip('"')
            exe_file = os.path.join(unifilename(path), unifilename(program))
            if is_executable(exe_file):
                return exe_file
    return None

# ----------------------------------------------------------------------
def reformat(self, sourcefile, destfile, configfile):
    # type: (str, str, str) -> None
    formatstyle = style_make()
    with open(configfile) as fp:
        for line in fp.readlines():
            line = line.rstrip()
            if line.startswith('#'):
                continue
            parts = line.split('=')
            if len(parts) == 2:
                optionname, value = parts
                set_option(formatstyle, optionname, value)
    sourcedata = readbinary(sourcefile)
    data = self.formatcode(formatstyle, sourcedata, filename=sourcefile)
    if data is None:
        data = b''
    writebinary(destfile, data)

# ----------------------------------------------------------------------
def reformat(self, sourcefile, destfile, configfile):
    # type: (str, str, str) -> None
    formatstyle = style_make()
    with open(configfile) as fp:
        for line in fp.readlines():
            line = line.rstrip()
            if line.startswith('#'):
                continue
            parts = re.split(r'\s+=\s+', line)
            if len(parts) == 2:
                optionname, value = parts
                set_option(formatstyle, optionname, value)
    sourcedata = readbinary(sourcefile)
    data = self.formatcode(formatstyle, sourcedata, filename=sourcefile)
    if data is None:
        data = b''
    writebinary(destfile, data)

# ----------------------------------------------------------------------
# Functions for the in-memory cache
def unpack_exeresult(buf):
    # type: (bytes) -> Tuple[int, bytes, bytes]
    def unpack_error():
        # type: () -> None
        raise ValueError('invalid buffer in unpack_exeresult')
    buf = binary_type(buf)
    pos = buf.find(b'|')
    if pos < 0:
        unpack_error()
    lengths, data = buf[:pos], buf[pos + 1:]
    try:
        numvalues = [int(s) for s in lengths.split()]
    except ValueError:
        numvalues = []
    if len(numvalues) != 3:
        unpack_error()
    returncode, outlen, errlen = numvalues
    if outlen + errlen != len(data):
        unpack_error()
    return returncode, data[:outlen], data[outlen:outlen + errlen]

# ----------------------------------------------------------------------
def formatters_for_files(filenames):
    # type: (List[str]) -> List[str]
    """Returns a list of formatter names that support every extension
    of these filenames.
    """
    exts = set()  # type: Set[str]
    for f in filenames:
        root, ext = os.path.splitext(f)
        ext = ext.lower()
        if not ext and root.startswith('.'):
            # Recognize extension-only filenames as well.
            ext = root.lower()
        exts.add(ext)
    supported = []
    for fmt, fmtexts in SUPPORTED_EXTS:
        fmt_exts = set(fmtexts.split())  # type: Set[str]
        if not exts or exts.issubset(fmt_exts):
            supported.append(fmt)
    return supported
def get_release_quality(release_name):
    if release_name is None:
        return
    try:
        release_name = release_name.encode('utf-8')
    except:
        pass
    try:
        release_name = release_name.upper()
        fmt = re.sub(r'(.+)(\.|\(|\[|\s)(\d{4}|S\d*E\d*|S\d*)(\.|\)|\]|\s)', '', release_name)
        fmt = re.split(r'\.|\(|\)|\[|\]|\s|-', fmt)
        fmt = [i.lower() for i in fmt]
        if '1080p' in fmt:
            quality = '1080p'
        elif '720p' in fmt:
            quality = 'HD'
        else:
            quality = 'SD'
        if any(i in ['dvdscr', 'r5', 'r6'] for i in fmt):
            quality = 'SCR'
        elif any(i in ['camrip', 'tsrip', 'hdcam', 'hdts', 'dvdcam', 'dvdts', 'cam', 'telesync', 'ts'] for i in fmt):
            quality = 'CAM'
        info = []
        if '3d' in fmt or '.3D.' in release_name:
            info.append('3D')
        if any(i in ['hevc', 'h265', 'x265'] for i in fmt):
            info.append('HEVC')
        return quality, info
    except:
        return 'SD', []
def __init__(self, xmap_file, r_cmap_file, q_cmap_file, confidence_score, reference):
    self.xmap = xmap_file
    self.rcmap = r_cmap_file
    self.qcmap = q_cmap_file
    self.confidence_score = confidence_score
    self.ref = reference
    self.name = xmap_file.rsplit('.', 1)[0].split('/')[-1]
    self.XmapTable = None
    self.filtered_XmapTable = None
    self.RcmapTable = None
    self.QcmapTable = None
    self.ref_id = None
    self.ref_inf = None
    self.cmap = None
    self.unqualified = None
    self.qualified = None
    self.mapped = None
    self.unmapped = None
    self.BN = None
    self.detail = None
    self.no_data = None
    self.kicked = None
def pathtype(cls, path=None):
    path = os.path.abspath(path or getcwd())
    depth = 0
    while cd(path):
        tpath = path
        path = Repo.findparent(path)
        if path:
            depth += 1
            path = os.path.split(path)[0]
            if tpath == path:  # Reached root.
                break
        else:
            break
    return "directory" if depth == 0 else ("program" if depth == 1 else "library")
def __init__(self, path=None, print_warning=False):
    path = os.path.abspath(path or getcwd())
    self.path = path
    self.is_cwd = True

    while cd(path):
        tpath = path
        if os.path.isfile(os.path.join(path, Cfg.file)):
            self.path = path
            self.is_cwd = False
            break
        path = os.path.split(path)[0]
        if tpath == path:  # Reached root.
            break

    self.name = os.path.basename(self.path)
    self.is_classic = os.path.isfile(os.path.join(self.path, 'mbed.bld'))

    # is_cwd flag indicates that current dir is assumed to be root, not root repo
    if self.is_cwd and print_warning:
        warning(
            "Could not find mbed program in current path \"%s\".\n"
            "You can fix this by calling \"mbed new .\" in the root of your program." % self.path)
def assemble(instrs):
    res = []
    for inst in instrs:
        m = instre.match(inst)
        if not m or not m.group(1) in aCode_map:
            continue
        opcode, parmfmt = aCode_map[m.group(1)]
        res.append(struct.pack("B", opcode))
        if m.group(2):
            if parmfmt == 0:
                continue
            parms = [int(x) for x in re.split(r",\s*", m.group(2))]
            if parmfmt == -1:
                l = len(parms)
                res.append(struct.pack(("%dB" % (l + 1)), l, *parms))
            else:
                res.append(struct.pack(parmfmt, *parms))
    return b"".join(res)
def fromXML(self, name, attrs, content, ttFont, version=2.0):
    if name == 'linearClasses':
        for element in content:
            if not isinstance(element, tuple):
                continue
            tag, attrs, subcontent = element
            if tag == 'linear':
                l = content_string(subcontent).split()
                self.linear.append(l)
    elif name == 'nonLinearClasses':
        for element in content:
            if not isinstance(element, tuple):
                continue
            tag, attrs, subcontent = element
            if tag == 'nonLinear':
                l = {}
                for e in subcontent:
                    if not isinstance(e, tuple):
                        continue
                    tag, attrs, subsubcontent = e
                    if tag == 'map':
                        l[attrs['glyph']] = int(safeEval(attrs['index']))
                self.nonLinear.append(l)
def collect_moves(self, reader, name):
    Moves = namedtuple('Moves', ['pokemon', 'gen', 'color', 'moves', 'versions'])
    if name.split('-')[-1].isdigit():
        for row in reader:
            if name == row[0]:
                pokemon = name.split('-')[0].title()
                generation, color = switcher[row[1]], int(ast.literal_eval(row[2]))
                moves, versions = ast.literal_eval(row[3]), ast.literal_eval(row[4])
                return Moves(pokemon, generation, color, moves, versions)
    else:
        for row in reader:
            if name in row[0]:
                pokemon = name.title()
                generation, color = switcher[row[1]], int(ast.literal_eval(row[2]))
                moves, versions = ast.literal_eval(row[3]), ast.literal_eval(row[4])
                return Moves(pokemon, generation, color, moves, versions)
def clean_recipient_numbers(self):
    cleaned_numbers = []
    error_numbers = []
    # Valid numbers start with 0 and are 10 or 11 digits long
    p = re.compile(r'^0\d{9}\d?$')
    number_string = self.cleaned_data['recipient_numbers']
    print(number_string)
    # Strip whitespace and '-' separators from the input
    sub_string = re.sub(r'\s|-', '', number_string)
    print(sub_string)
    # Split on ',' or '.' to get the individual numbers
    numbers = re.split(r',|\.', sub_string)
    print(numbers)
    for number in numbers:
        if re.match(p, number):
            cleaned_numbers.append(number)
        else:
            error_numbers.append(number)
    if error_numbers:
        raise ValidationError('Invalid phone number format! {}'.format(', '.join(error_numbers)))
    return cleaned_numbers
def _custom(self, custom):
    """
    Create commented files to let the admin know where it's safe
    to make custom changes.  Mirror the default tree.  Never overwrite.
    """
    path_dir = os.path.dirname(custom)
    if not os.path.isdir(path_dir):
        _create_dirs(path_dir, self.pillar_dir)
    if not self.dryrun:
        if not os.path.isfile(custom):
            log.info("Writing {}".format(custom))
            with open(custom, "w") as yml:
                custom_split = custom.split("stack")
                custom_for = "{}{}{}".format(custom_split[0],
                                             "stack/default",
                                             custom_split[1])
                yml.write("# {}\n".format(custom))
                yml.write("# Overwrites configuration in {}\n".format(custom_for))
                _examples(custom, yml)
def _parse(line):
    """
    Return globbed files constrained by optional slices or regexes.
    """
    if " " in line:
        parts = re.split(r'\s+', line)
        files = sorted(glob.glob(parts[0]))
        for optional in parts[1:]:
            filter_type, value = optional.split('=')
            if filter_type == "re":
                regex = re.compile(value)
                files = [m.group(0) for l in files for m in [regex.search(l)] if m]
            elif filter_type == "slice":
                # pylint: disable=eval-used
                files = eval("files{}".format(value))
            else:
                log.warning("keyword {} unsupported".format(filter_type))
    else:
        files = glob.glob(line)
    return files
def _parse(self, line):
    """
    Return globbed files constrained by optional slices or regexes.
    """
    if " " in line:
        parts = re.split(r'\s+', line)
        files = sorted(glob.glob(parts[0]))
        for keyvalue in parts[1:]:
            key, value = keyvalue.split('=')
            if key == "re":
                regex = re.compile(value)
                files = [match.group(0) for _file in files
                         for match in [regex.search(_file)] if match]
            elif key == "slice":
                # pylint: disable=eval-used
                files = eval("files{}".format(value))
            else:
                log.warning("keyword {} unsupported".format(key))
    else:
        files = glob.glob(line)
    return files
def prep_blob(self, blob):
    """Cleanup input."""
    # remove empty lines
    if type(blob) == list:
        blob = [line for line in blob if line.strip() != '']
        if len(blob) == 1:
            blob = blob[0].replace('\\n', '\n').split('\n')
    # Split by line
    if type(blob) == str or type(blob) == six.text_type:
        lines = blob.split('\n')
    elif type(blob) == list:
        if len(blob) == 1:
            lines = blob[0].split('\n')
        else:
            lines = [line.rstrip() for line in blob]
    else:
        message = "Unknown input format"
        log.debug("%s - '%s'", message, blob)
        raise ParseException(message)
    return lines
def _save_edgelist_as_1_indexed(f_edgelist, f_target_edgelist, delimiter="\t"):
    """
    Note that this function always saves with delimiter "\t"

    :param f_edgelist:
    :param f_target_edgelist:
    :param delimiter:
    :return:
    """
    import re
    with open(f_target_edgelist, "w") as g:
        with open(f_edgelist, "r") as f:
            for line in f:
                line = line.replace('\r', '').replace('\n', '')
                edge = re.split(delimiter, line)
                try:
                    g.write(str(int(edge[0]) + 1) + "\t" + str(int(edge[1]) + 1) + "\n")
                except ValueError as e:
                    raise ValueError(
                        "[ERROR] Please check if the delimiter for the edgelist file is wrong -- {}".format(e)
                    )
def _setup_requirements(argument):
    from sqlalchemy.testing import config
    from sqlalchemy import testing

    if config.requirements is not None:
        return

    modname, clsname = argument.split(":")

    # importlib.import_module() only introduced in 2.7, a little
    # late
    mod = __import__(modname)
    for component in modname.split(".")[1:]:
        mod = getattr(mod, component)
    req_cls = getattr(mod, clsname)

    config.requirements = testing.requires = req_cls()
def _setup_requirements(argument):
    from alembic.testing import config

    if config.requirements is not None:
        return

    modname, clsname = argument.split(":")

    # importlib.import_module() only introduced in 2.7, a little
    # late
    mod = __import__(modname)
    for component in modname.split(".")[1:]:
        mod = getattr(mod, component)
    req_cls = getattr(mod, clsname)

    config.requirements = req_cls()
def push(self, data):
    """Push some new data into this object."""
    # Handle any previous leftovers
    data, self._partial = self._partial + data, ''
    # Crack into lines, but preserve the newlines on the end of each
    parts = NLCRE_crack.split(data)
    # The *ahem* interesting behaviour of re.split when supplied grouping
    # parentheses is that the last element of the resulting list is the
    # data after the final RE.  In the case of a NL/CR terminated string,
    # this is the empty string.
    self._partial = parts.pop()
    # parts is a list of strings, alternating between the line contents
    # and the eol character(s).  Gather up a list of lines after
    # re-attaching the newlines.
    lines = []
    for i in range(len(parts) // 2):
        lines.append(parts[i*2] + parts[i*2+1])
    self.pushlines(lines)
def parse_tags(self, root):
    ans = []
    exclude_tokens = {'kindle', 'a-z'}
    exclude = {'special features', 'by authors', 'authors & illustrators',
               'books', 'new; used & rental textbooks'}
    seen = set()
    for a in root.xpath(self.tags_xpath):
        raw = (a.text or '').strip().replace(',', ';').replace('/', ';').replace('>', ';')
        lraw = icu_lower(raw)
        tokens = frozenset(lraw.split())
        if raw and lraw not in exclude and not tokens.intersection(exclude_tokens) and lraw not in seen:
            ans.append(raw)
            seen.add(lraw)
    return ans
def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.
    """
    parts = uri.split('%')
    for i in range(1, len(parts)):
        h = parts[i][0:2]
        if len(h) == 2 and h.isalnum():
            try:
                c = chr(int(h, 16))
            except ValueError:
                raise InvalidURL("Invalid percent-escape sequence: '%s'" % h)

            if c in UNRESERVED_SET:
                parts[i] = c + parts[i][2:]
            else:
                parts[i] = '%' + parts[i]
        else:
            parts[i] = '%' + parts[i]
    return ''.join(parts)
def is_valid_cidr(string_network):
    """Very simple check of the cidr format in no_proxy variable"""
    if string_network.count('/') == 1:
        try:
            mask = int(string_network.split('/')[1])
        except ValueError:
            return False

        if mask < 1 or mask > 32:
            return False

        try:
            socket.inet_aton(string_network.split('/')[0])
        except socket.error:
            return False
    else:
        return False
    return True
def fix_repeating_arguments(self):
    """Fix elements that should accumulate/increment values."""
    either = [list(c.children) for c in self.either.children]
    for case in either:
        for e in [c for c in case if case.count(c) > 1]:
            if type(e) is Argument or type(e) is Option and e.argcount:
                if e.value is None:
                    e.value = []
                elif type(e.value) is not list:
                    e.value = e.value.split()
            if type(e) is Command or type(e) is Option and e.argcount == 0:
                e.value = 0
    return self
def parse(class_, option_description):
    short, long, argcount, value = None, None, 0, False
    # docopt separates the option spec from its description with two spaces
    options, _, description = option_description.strip().partition('  ')
    options = options.replace(',', ' ').replace('=', ' ')
    for s in options.split():
        if s.startswith('--'):
            long = s
        elif s.startswith('-'):
            short = s
        else:
            argcount = 1
    if argcount:
        matched = re.findall(r'\[default: (.*)\]', description, flags=re.I)
        value = matched[0] if matched else None
    return class_(short, long, argcount, value)
def __init__(self, source, error):
    self += source.split() if hasattr(source, 'split') else source
    self.error = error
def parse_defaults(doc):
    # in python < 2.7 you can't pass flags=re.MULTILINE
    split = re.split(r'\n *(<\S+?>|-\S+?)', doc)[1:]
    split = [s1 + s2 for s1, s2 in zip(split[::2], split[1::2])]
    options = [Option.parse(s) for s in split if s.startswith('-')]
    #arguments = [Argument.parse(s) for s in split if s.startswith('<')]
    #return options, arguments
    return options
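The slice-and-zip idiom in parse_defaults() re-attaches each captured option token to the description text that follows it. A minimal illustration with a made-up usage doc:

import re

doc = """Options:
 -h  show this help
 -v  verbose output
"""
split = re.split(r'\n *(-\S+?)', doc)[1:]
# split alternates between captured tokens and the text after each one:
# ['-h', '  show this help', '-v', '  verbose output\n']
print([s1 + s2 for s1, s2 in zip(split[::2], split[1::2])])
# ['-h  show this help', '-v  verbose output\n']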