Python re 模块,split() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用re.split()

项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def unquote_unreserved(uri):
    """Decode the percent-escapes in ``uri`` that encode unreserved characters.

    An escape is decoded only when its character is a member of
    ``UNRESERVED_SET``.  Every other escape, and any truncated or
    non-alphanumeric sequence after a ``%``, is copied through unchanged.

    :param uri: URI text that may contain ``%XX`` sequences.
    :raises InvalidURL: when the two alphanumeric characters after a ``%``
        are not a valid hexadecimal number.
    :rtype: str
    """
    segments = uri.split('%')
    # Everything before the first '%' can never belong to an escape.
    rebuilt = [segments[0]]
    for segment in segments[1:]:
        hex_pair = segment[:2]
        decoded = None
        if len(hex_pair) == 2 and hex_pair.isalnum():
            try:
                decoded = chr(int(hex_pair, 16))
            except ValueError:
                raise InvalidURL("Invalid percent-escape sequence: '%s'" % hex_pair)

        if decoded is not None and decoded in UNRESERVED_SET:
            rebuilt.append(decoded + segment[2:])
        else:
            # Put back the '%' that split() consumed.
            rebuilt.append('%' + segment)
    return ''.join(rebuilt)
项目:almond-nnparser    作者:Stanford-Mobisocial-IoT-Lab    | 项目源码 | 文件源码
def get_language(self, locale):
    """Convert a locale tag into a preloaded language.

    The tag is split on ``_``, ``.`` and ``-``.  The lookup in
    ``self._languages`` tries ``"<language>-<country>"`` first and then the
    bare language code.

    :param locale: locale tag such as ``"it_IT.UTF-8"``; ``None`` or an
        empty string is treated as "not specified".
    :return: the matching entry of ``self._languages``, or its ``'en'``
        entry when the locale is missing or not recognized.
    """
    split_tag = re.split(r"[_\.\-]", locale or "")

    # try with language and country
    language = None
    if len(split_tag) >= 2:
        language = self._languages.get(split_tag[0] + "-" + split_tag[1], None)
    if language is None:
        language = self._languages.get(split_tag[0], None)

    if language:
        return language
    # Fall back to English.  The fallback has to come from the language
    # table: ``language`` itself is None/falsy on this path.
    return self._languages['en']
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def is_valid_cidr(string_network):
    """
    Perform a very simple check of the CIDR format used in the no_proxy
    variable.

    The value must contain exactly one ``/``, the part after it must be an
    integer between 1 and 32, and the part before it must be accepted by
    ``socket.inet_aton``.

    :rtype: bool
    """
    if string_network.count('/') != 1:
        return False

    address, mask_text = string_network.split('/')
    try:
        mask = int(mask_text)
    except ValueError:
        return False

    if not 1 <= mask <= 32:
        return False

    try:
        socket.inet_aton(address)
    except socket.error:
        return False
    return True
项目:chitin    作者:SamStudio8    | 项目源码 | 文件源码
def attempt_special(self, cmd_str):
    """Handle a special command, i.e. one whose first character is ``@`` or ``%``.

    ``script`` is expanded through ``self.parse_script``; any other name
    found in ``special_commands`` is called with the remaining
    space-separated fields as arguments.

    :param cmd_str: the raw command line.
    :return: ``(skip, command_set)``.  ``skip`` is True when a special
        command ran without a TypeError; ``command_set`` is whatever
        ``parse_script`` returned, or an empty list.
    """
    fields = cmd_str.split(" ")
    skip = False
    command_set = []

    # Slice rather than index so that an empty command line is simply
    # "not special" instead of an IndexError.
    if cmd_str[:1] in ('@', '%'):
        special_cmd = fields[0][1:]
        if special_cmd == "script":
            command_set = self.parse_script(fields[1], *fields[2:])
        elif special_cmd in special_commands:
            try:
                special_commands[special_cmd](*fields[1:])
                skip = True
            except TypeError as e:
                # Wrong number of arguments for the special command.
                print(e)
                print("Likely incorrect usage of '%s'" % special_cmd)
    return skip, command_set
项目:chitin    作者:SamStudio8    | 项目源码 | 文件源码
def super_handle(self, command_set):
    """Run every command of ``command_set`` through ``self.handle_command``.

    Each command string is split on single spaces.  The entry of
    ``self.uncaptured_variable_blocks`` at the command's position is passed
    along when it can be looked up, otherwise an empty list is used.  Values
    reported under the ``"captured"`` key of a result are merged into
    ``self.variables``.

    :return: the result of the last command, or None for an empty set.
    """
    last_result = None
    for position, raw_command in enumerate(command_set):
        try:
            capture_block = self.uncaptured_variable_blocks[position]
        except:
            capture_block = []

        last_result = self.handle_command(
            raw_command.split(" "), capture_block, self.variables, self.meta)
        if last_result and "captured" in last_result:
            self.variables.update(last_result["captured"])
        print("")
    #TODO return aggregate message for scripts instead of last message
    return last_result
项目:docklet    作者:unias    | 项目源码 | 文件源码
def get_proc_etime(self,pid):
    """Return the elapsed running time of process ``pid`` in seconds.

    The value is read from the ``etime`` column of ``ps``.  The parsed text
    may carry an optional ``<days>-`` prefix and an optional ``<hours>:``
    part in front of ``<minutes>:<seconds>``.

    :return: elapsed seconds, or -1 when ``ps`` lists no such process.
    """
    command = "ps -A -opid,etime | grep '^ *%d ' | awk '{print $NF}'" % pid
    etime = subprocess.getoutput(command).strip()
    if etime == '':
        return -1

    day_split = etime.split('-')
    days = int(day_split[0]) if len(day_split) == 2 else 0

    clock = day_split[-1].split(':')
    hours = int(clock[0]) if len(clock) == 3 else 0

    # The last two fields are always minutes and seconds.
    tail = clock[len(clock)-2:]
    minutes = int(tail[0])
    seconds = int(tail[1])

    total_hours = days * 24 + hours
    return (total_hours * 60 + minutes) * 60 + seconds

    # compute the billing val this running hour
    # if isreal is True, it will also make users' beans decrease to pay for the bill.
    # return the billing value in this running hour
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def unquote_unreserved(uri):
    """Decode the percent-escapes in ``uri`` that encode unreserved characters.

    An escape is decoded only when its character is a member of
    ``UNRESERVED_SET``.  Every other escape, and any truncated or
    non-alphanumeric sequence after a ``%``, is copied through unchanged.

    :param uri: URI text that may contain ``%XX`` sequences.
    :raises InvalidURL: when the two alphanumeric characters after a ``%``
        are not a valid hexadecimal number.
    :rtype: str
    """
    segments = uri.split('%')
    # Everything before the first '%' can never belong to an escape.
    rebuilt = [segments[0]]
    for segment in segments[1:]:
        hex_pair = segment[:2]
        decoded = None
        if len(hex_pair) == 2 and hex_pair.isalnum():
            try:
                decoded = chr(int(hex_pair, 16))
            except ValueError:
                raise InvalidURL("Invalid percent-escape sequence: '%s'" % hex_pair)

        if decoded is not None and decoded in UNRESERVED_SET:
            rebuilt.append(decoded + segment[2:])
        else:
            # Put back the '%' that split() consumed.
            rebuilt.append('%' + segment)
    return ''.join(rebuilt)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def is_valid_cidr(string_network):
    """
    Perform a very simple check of the CIDR format used in the no_proxy
    variable.

    The value must contain exactly one ``/``, the part after it must be an
    integer between 1 and 32, and the part before it must be accepted by
    ``socket.inet_aton``.

    :rtype: bool
    """
    if string_network.count('/') != 1:
        return False

    address, mask_text = string_network.split('/')
    try:
        mask = int(mask_text)
    except ValueError:
        return False

    if not 1 <= mask <= 32:
        return False

    try:
        socket.inet_aton(address)
    except socket.error:
        return False
    return True
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def unquote_unreserved(uri):
    """Decode the percent-escapes in ``uri`` that encode unreserved characters.

    An escape is decoded only when its character is a member of
    ``UNRESERVED_SET``.  Every other escape, and any truncated or
    non-alphanumeric sequence after a ``%``, is copied through unchanged.

    :param uri: URI text that may contain ``%XX`` sequences.
    :raises InvalidURL: when the two alphanumeric characters after a ``%``
        are not a valid hexadecimal number.
    :rtype: str
    """
    segments = uri.split('%')
    # Everything before the first '%' can never belong to an escape.
    rebuilt = [segments[0]]
    for segment in segments[1:]:
        hex_pair = segment[:2]
        decoded = None
        if len(hex_pair) == 2 and hex_pair.isalnum():
            try:
                decoded = chr(int(hex_pair, 16))
            except ValueError:
                raise InvalidURL("Invalid percent-escape sequence: '%s'" % hex_pair)

        if decoded is not None and decoded in UNRESERVED_SET:
            rebuilt.append(decoded + segment[2:])
        else:
            # Put back the '%' that split() consumed.
            rebuilt.append('%' + segment)
    return ''.join(rebuilt)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def is_valid_cidr(string_network):
    """
    Perform a very simple check of the CIDR format used in the no_proxy
    variable.

    The value must contain exactly one ``/``, the part after it must be an
    integer between 1 and 32, and the part before it must be accepted by
    ``socket.inet_aton``.

    :rtype: bool
    """
    if string_network.count('/') != 1:
        return False

    address, mask_text = string_network.split('/')
    try:
        mask = int(mask_text)
    except ValueError:
        return False

    if not 1 <= mask <= 32:
        return False

    try:
        socket.inet_aton(address)
    except socket.error:
        return False
    return True
项目:personal-identification-pipeline    作者:TeamErlich    | 项目源码 | 文件源码
def build_sam_tags(flds):
    """
    Given a list of fields from a SAM file
    (all fields, including the first 11 fixed fields),
    returns a dictionary with the SAM tags (e.g. 'MD', 'NM').

    Tags with type 'i' are converted to integers.
    Tags with type 'f' are converted to floats.
    All other tag values are kept as strings.

    Example (the first 11 entries are the fixed fields and are skipped):
      tags = build_sam_tags(fixed_fields + ["NM:i:0", "MD:Z:77"])
        => tags = { 'NM':0, 'MD':'77' }

    Raises ValueError when a tag does not have the NAME:TYPE:VALUE form.
    """
    out_tags = {}
    for field in flds[11:]:
        # Split into (name, type, value).  maxsplit=2 because the value of a
        # tag may itself contain ':' characters.
        name, tag_type, value = field.split(':', 2)
        if tag_type == "i":
            value = int(value)
        elif tag_type == 'f':
            value = float(value)
        out_tags[name] = value
    return out_tags
项目:IgDiscover    作者:NBISweden    | 项目源码 | 文件源码
def slice_arg(s):
    """
    Parse a ``start:end`` string into a slice object.

    An empty side of the colon becomes None.

    >>> slice_arg('2:-3')
    slice(2, -3, None)

    >>> slice_arg(':-3')
    slice(None, -3, None)

    >>> slice_arg('2:')
    slice(2, None, None)
    """
    lower_text, upper_text = s.split(':')
    bounds = [int(text) if text != '' else None
              for text in (lower_text, upper_text)]
    return slice(*bounds)
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def __init__(self, fullpath):
    """Split the name of a fastq file into its underscore-separated parts.

    The file name must end in ``.fastq`` or ``.fastq.<something>``.  The
    last four ``_``-separated tokens of the stem fill ``s``, ``lane``,
    ``read`` and ``group``; everything before them becomes ``prefix``.

    Raises NameError when the file name does not look like a fastq file.
    """
    basename = os.path.basename(fullpath)

    pieces = basename.split(".")
    if pieces[-1] == "fastq":
        stem = pieces[-2]
    elif len(pieces) > 2 and pieces[-2] == "fastq":
        stem = pieces[-3]
    else:
        raise NameError("%s is not a fastq file" % fullpath)

    tokens = stem.split("_")
    tail = tokens[-4:]
    self.prefix = "_".join(tokens[:-4])

    # First token: drop its leading character.
    self.s = tail[0][1:]
    # Second token: drop its first two characters, the rest is a number.
    self.lane = int(tail[1][2:])
    self.read = tail[2]
    self.group = int(tail[3])

    self.filename = fullpath
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def get_run_data(fn):
    """ Parse flowcell + lane from the first FASTQ record.

    The third and fourth ':'-separated fields of the first record's name are
    returned as ``(flowcell, lane)``.  Files whose name ends in ``gz`` are
    opened with ``gzip``.

    Raises ValueError when the file contains no record.

    NOTE: we don't check whether there are multiple FC / lanes in this file.
    NOTE: taken from longranger/mro/stages/reads/setup_chunks
    """
    if fn[-2:] == 'gz':
        reader = gzip.open(fn)
    else:
        reader = open(fn, 'r')

    # Only the first record is needed, so make sure the file handle is
    # released on every path (success, empty file, malformed name).
    try:
        gen = read_generator_fastq(reader)
        try:
            # next() works on Python 2.6+ and 3, unlike gen.next().
            (name, seq, qual) = next(gen)
        except StopIteration:
            # empty fastq
            raise ValueError('Could not extract flowcell and lane from FASTQ file. File is empty: %s' % fn)
        (flowcell, lane) = re.split(':', name)[2:4]
        return (flowcell, lane)
    finally:
        reader.close()
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def set_data_field(record, field_name, field_val):
    """Set ``field_name`` to ``field_val`` in the single sample of ``record``.

    When the field is not yet listed in ``record.FORMAT`` it is appended to
    the format string.  The sample's data is rebuilt in FORMAT order and
    both the sample call and the FORMAT are written back to the record.
    """
    assert(len(record.samples) == 1)

    new_format = record.FORMAT
    new_fields = new_format.split(':')
    if field_name not in new_fields:
        new_fields = new_fields + [field_name]
        new_format = ':'.join(new_fields)

    sample_call = get_record_sample_call(record)
    data_dict = sample_call.data._asdict()
    data_dict[field_name] = field_val

    new_sample_vals = [data_dict[field] for field in new_fields]

    # Note - the old way of passing the fields to pyVCF is memory intensive
    # because a fresh type is allocated for each call to make_calldata_tuple
    #data_instantiator = vcf.model.make_calldata_tuple(new_fields)
    #data = data_instantiator(*new_sample_vals)
    sample_call.data = FakeNamedTuple(new_fields, new_sample_vals)
    record.samples[0] = sample_call
    record.FORMAT = new_format
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def get_locus_info(locus):
    """ Returns chrom, start and stop from locus string.
    Enforces standardization of how locus is represented.

    Accepted forms are ``chrom:start..stop`` and ``chrom:start-stop``;
    start and stop should be numbers or 'None'.  Numeric values go through
    ``float`` first, so ``1e3`` and ``2000.0`` are accepted.

    Returns ``(chrom, start, stop)``.  chrom is always a string (a 'None'
    chromosome comes back as the string 'None'); start and stop are ints or
    None.
    """
    chrom, start_stop = locus.split(':')
    if chrom == 'None':
        chrom = None

    # Raw string: "\." is not a valid escape in an ordinary string literal.
    start, stop = re.split(r"\.\.|-", start_stop)
    start = None if start == 'None' else int(float(start))
    stop = None if stop == 'None' else int(float(stop))
    return (str(chrom), start, stop)
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def get_target_regions_dict(targets_file):
    """ Collect the regions of a targets file into a chrom-indexed dictionary.

    Each remaining line is split on tabs; the first three columns are the
    chromosome, start and end.  Blank lines and lines starting with
    'browser', 'track', '-browser', '-track' or '#' are ignored.

    Returns a dict mapping chrom to a list of (start, end) int tuples, in
    file order.
    """
    skipped_prefixes = ('browser', 'track', '-browser', '-track', '#')
    targets = {}
    for line in targets_file:
        stripped = line.strip()
        if line.startswith(skipped_prefixes) or not stripped:
            continue
        columns = stripped.split('\t')
        chrom = columns[0]
        region = (int(columns[1]), int(columns[2]))
        targets.setdefault(chrom, []).append(region)
    return targets
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def read_reference(self):
    """Load ``Reference.txt`` from ``self.path`` into a profiles dictionary.

    Every line holds a tab-separated ``key<TAB>value`` pair.  The
    ``BinSize`` key is stored as an int, every other value as a float under
    ``profiles["Reference"]``.  ``BinSize`` stays NaN when the file does not
    define it.

    The result is stored on ``self.profiles`` and returned.
    Raises ValueError when a non-empty line is not a two-column pair.
    """
    reference = {}
    bin_size = float("nan")
    # Plain "r" instead of "rU": the "U" flag was removed in Python 3.11,
    # and stray "\r" characters are stripped explicitly below anyway.
    # The context manager closes the file even when a line fails to parse.
    with open(self.path + "Reference.txt", "r") as reference_file:
        for line in reference_file:
            line = line.strip("\n").strip("\r")
            if not line:
                # tolerate blank lines, e.g. at the end of the file
                continue
            (key, value) = line.split("\t")
            if key == "BinSize":
                bin_size = int(value)
            else:
                reference[key] = float(value)

    profiles = {"BinSize": bin_size, "Reference": reference}
    self.profiles = profiles
    return profiles
    # read_reference

    #...........................................................................
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def debug_print(self, msg):
    """Print 'msg' to stdout if the global DEBUG (taken from the
    DISTUTILS_DEBUG environment variable) flag is true.
    """
    from distutils.debug import DEBUG
    if DEBUG:
        # Function-call form: identical output for a single argument on
        # Python 2 and valid syntax on Python 3.
        print(msg)
        sys.stdout.flush()


    # -- Option validation methods -------------------------------------
    # (these are very handy in writing the 'finalize_options()' method)
    #
    # NB. the general philosophy here is to ensure that a particular option
    # value meets certain type and value constraints.  If not, we try to
    # force it into conformance (eg. if we expect a list but have a string,
    # split the string on comma and/or whitespace).  If we can't force the
    # option into conformance, raise DistutilsOptionError.  Thus, command
    # classes need do nothing more than (eg.)
    #   self.ensure_string_list('foo')
    # and they can be guaranteed that thereafter, self.foo will be
    # a list of strings.
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def push(self, data):
    """Push some new data into this object."""
    # Prepend whatever was left over from the previous call.
    buffered = self._partial + data
    self._partial = ''
    # Crack into lines, but keep the line endings as separate items.
    pieces = NLCRE_crack.split(buffered)
    # re.split with a capturing group leaves the text after the final match
    # as the last item; for NL/CR-terminated data that is the empty string.
    self._partial = pieces.pop()
    #GAN 29Mar09  bugs 1555570, 1721862  Confusion at 8K boundary ending with \r:
    # is there a \n to follow later?
    if not self._partial and pieces and pieces[-1].endswith('\r'):
        # Hold back the last line together with its '\r' until more data
        # shows whether a '\n' follows.
        self._partial = pieces.pop(-2) + pieces.pop()
    # pieces now alternates between line contents and line endings; glue
    # each pair back together.
    lines = [content + eol
             for content, eol in zip(pieces[0::2], pieces[1::2])]
    self.pushlines(lines)
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def which(program):
    # type: (str) -> Optional[str]
    """Locate the executable ``program``.

    A name that contains a directory part is only checked as given.  A bare
    name is searched for in the current directory first and then in every
    entry of the PATH environment variable.

    Returns the path of the first executable match, or None.
    """
    program = exename(program)
    directory = os.path.split(program)[0]
    if directory:
        return program if is_executable(program) else None

    search_dirs = [os.path.abspath(os.curdir)]
    search_dirs += os.environ['PATH'].split(os.pathsep)
    for entry in search_dirs:
        # PATH entries may be wrapped in double quotes.
        entry = entry.strip('"')
        candidate = os.path.join(unifilename(entry), unifilename(program))
        if is_executable(candidate):
            return candidate
    return None

# ----------------------------------------------------------------------
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def reformat(self, sourcefile, destfile, configfile):
    # type: (str, str, str) -> None
    """Format ``sourcefile`` with the options of ``configfile`` into ``destfile``.

    The config file is read line by line: lines starting with '#' are
    skipped and every line that splits into exactly two parts on '=' sets
    one option.  An empty file is written when formatting returns None.
    """
    formatstyle = style_make()
    with open(configfile) as fp:
        for raw_line in fp:
            entry = raw_line.rstrip()
            if entry.startswith('#'):
                continue
            pieces = entry.split('=')
            if len(pieces) != 2:
                continue
            set_option(formatstyle, pieces[0], pieces[1])
    sourcedata = readbinary(sourcefile)
    data = self.formatcode(formatstyle, sourcedata, filename=sourcefile)
    writebinary(destfile, b'' if data is None else data)

# ----------------------------------------------------------------------
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def reformat(self, sourcefile, destfile, configfile):
    # type: (str, str, str) -> None
    """Format ``sourcefile`` with the options of ``configfile`` into ``destfile``.

    The config file is read line by line: lines starting with '#' are
    skipped and every line of the form ``name = value`` (whitespace is
    required on both sides of the '=') sets one option.  An empty file is
    written when formatting returns None.
    """
    formatstyle = style_make()
    with open(configfile) as fp:
        for raw_line in fp:
            entry = raw_line.rstrip()
            if entry.startswith('#'):
                continue
            pieces = re.split(r'\s+=\s+', entry)
            if len(pieces) != 2:
                continue
            set_option(formatstyle, pieces[0], pieces[1])
    sourcedata = readbinary(sourcefile)
    data = self.formatcode(formatstyle, sourcedata, filename=sourcefile)
    writebinary(destfile, b'' if data is None else data)

# ----------------------------------------------------------------------
# Functions for the in-memory cache
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def unpack_exeresult(buf):
    # type: (bytes) -> Tuple[int, bytes, bytes]
    """Split a packed buffer into ``(returncode, stdout, stderr)``.

    The buffer consists of three whitespace-separated integers (return
    code, length of the first chunk, length of the second chunk), a ``|``
    and then both chunks back to back.

    Raises ValueError when the header is malformed or the two lengths do
    not add up to the size of the data part.
    """
    error_text = 'invalid buffer in unpack_exeresult'

    buf = binary_type(buf)
    header, separator, payload = buf.partition(b'|')
    if not separator:
        raise ValueError(error_text)

    try:
        numbers = [int(token) for token in header.split()]
    except ValueError:
        # Handled by the length check below.
        numbers = []
    if len(numbers) != 3:
        raise ValueError(error_text)

    returncode, outlen, errlen = numbers
    if outlen + errlen != len(payload):
        raise ValueError(error_text)
    return returncode, payload[:outlen], payload[outlen:outlen + errlen]

# ----------------------------------------------------------------------
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def formatters_for_files(filenames):
    # type: (List[str]) -> List[str]
    """Return the formatter names that support the extension of every file.

    Extensions are compared in lower case.  With no filenames at all, every
    formatter listed in SUPPORTED_EXTS is returned.
    """
    exts = set()  # type: Set[str]
    for filename in filenames:
        stem, suffix = os.path.splitext(filename)
        if not suffix and stem.startswith('.'):
            # Extension-only names such as ".clang-format" count as their
            # own extension.
            exts.add(stem.lower())
        else:
            exts.add(suffix.lower())
    # An empty set is a subset of everything, so this also covers the
    # "no filenames" case.
    return [fmt for fmt, fmtexts in SUPPORTED_EXTS
            if exts <= set(fmtexts.split())]
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def get_release_quality(release_name):
    """Guess the video quality of a release from its name.

    Everything up to and including the year / season-episode marker is
    dropped; the rest is split into lower-case tokens which are matched
    against known quality keywords.

    :param release_name: release title, e.g. ``"Movie.2016.1080p.x265"``.
    :return: None when ``release_name`` is None, otherwise a tuple
        ``(quality, info)`` where quality is one of '1080p', 'HD', 'SD',
        'SCR' or 'CAM' and info is a list that may contain '3D' and 'HEVC'.
        ``('SD', [])`` is returned when the name cannot be processed.
    """
    if release_name is None:
        return

    # Only encode values that are not already a native ``str`` (Python 2
    # unicode).  Encoding a Python 3 ``str`` would produce bytes, which the
    # str patterns below cannot process.
    if not isinstance(release_name, str):
        try:
            release_name = release_name.encode('utf-8')
        except Exception:
            pass

    try:
        release_name = release_name.upper()

        fmt = re.sub(r'(.+)(\.|\(|\[|\s)(\d{4}|S\d*E\d*|S\d*)(\.|\)|\]|\s)', '', release_name)
        fmt = [i.lower() for i in re.split(r'\.|\(|\)|\[|\]|\s|-', fmt)]

        if '1080p' in fmt:
            quality = '1080p'
        elif '720p' in fmt:
            quality = 'HD'
        else:
            quality = 'SD'
        # Screener / cam sources override the resolution.
        if any(i in ['dvdscr', 'r5', 'r6'] for i in fmt):
            quality = 'SCR'
        elif any(i in ['camrip', 'tsrip', 'hdcam', 'hdts', 'dvdcam', 'dvdts', 'cam', 'telesync', 'ts'] for i in fmt):
            quality = 'CAM'

        info = []
        if '3d' in fmt or '.3D.' in release_name:
            info.append('3D')
        if any(i in ['hevc', 'h265', 'x265'] for i in fmt):
            info.append('HEVC')

        return quality, info
    except Exception:
        # Best effort: unusable input is reported as plain SD.
        return 'SD', []
项目:googletranslate.popclipext    作者:wizyoung    | 项目源码 | 文件源码
def unquote_unreserved(uri):
    """Decode the percent-escapes in ``uri`` that encode unreserved characters.

    An escape is decoded only when its character is a member of
    ``UNRESERVED_SET``.  Every other escape, and any truncated or
    non-alphanumeric sequence after a ``%``, is copied through unchanged.

    :param uri: URI text that may contain ``%XX`` sequences.
    :raises InvalidURL: when the two alphanumeric characters after a ``%``
        are not a valid hexadecimal number.
    :rtype: str
    """
    segments = uri.split('%')
    # Everything before the first '%' can never belong to an escape.
    rebuilt = [segments[0]]
    for segment in segments[1:]:
        hex_pair = segment[:2]
        decoded = None
        if len(hex_pair) == 2 and hex_pair.isalnum():
            try:
                decoded = chr(int(hex_pair, 16))
            except ValueError:
                raise InvalidURL("Invalid percent-escape sequence: '%s'" % hex_pair)

        if decoded is not None and decoded in UNRESERVED_SET:
            rebuilt.append(decoded + segment[2:])
        else:
            # Put back the '%' that split() consumed.
            rebuilt.append('%' + segment)
    return ''.join(rebuilt)
项目:BioNanoAnalyst    作者:AppliedBioinformatics    | 项目源码 | 文件源码
def __init__(self, xmap_file, r_cmap_file, q_cmap_file, confidence_score, reference):
    """Store the input settings and reset every result attribute to None.

    ``name`` is the part of ``xmap_file`` after the last '/' with the final
    '.<extension>' removed.
    """
    self.xmap = xmap_file
    self.rcmap = r_cmap_file
    self.qcmap = q_cmap_file
    self.confidence_score = confidence_score
    self.ref = reference

    without_extension = xmap_file.rsplit('.', 1)[0]
    self.name = without_extension.split('/')[-1]

    # Attributes that start out unset.
    unset_attributes = (
        'XmapTable', 'filtered_XmapTable', 'RcmapTable', 'QcmapTable',
        'ref_id', 'ref_inf', 'cmap', 'unqualified', 'qualified',
        'mapped', 'unmapped', 'BN', 'detail', 'no_data', 'kicked',
    )
    for attribute in unset_attributes:
        setattr(self, attribute, None)
项目:mbed-cli    作者:ARMmbed    | 项目源码 | 文件源码
def pathtype(cls, path=None):
    """Classify ``path`` (default: the current directory) by nesting depth.

    Starting at the path, the function repeatedly asks ``Repo.findparent``
    for an enclosing repository and continues from that repository's parent
    directory, counting how many were found.

    Returns "directory" for none, "program" for exactly one and "library"
    for more than one.
    """
    path = os.path.abspath(path or getcwd())

    depth = 0
    while cd(path):
        previous = path
        path = Repo.findparent(path)
        if not path:
            break
        depth += 1
        path = os.path.split(path)[0]
        if previous == path:       # Reached root.
            break

    if depth == 0:
        return "directory"
    if depth == 1:
        return "program"
    return "library"
项目:mbed-cli    作者:ARMmbed    | 项目源码 | 文件源码
def __init__(self, path=None, print_warning=False):
    """Locate the program root by walking up from ``path``.

    The first directory (starting at ``path``, default the current
    directory) that contains the file named by ``Cfg.file`` becomes
    ``self.path``.  When none is found, the starting directory is kept and
    ``self.is_cwd`` stays True; a warning is printed in that case if
    ``print_warning`` is set.
    """
    current = os.path.abspath(path or getcwd())
    self.path = current
    self.is_cwd = True

    while cd(current):
        if os.path.isfile(os.path.join(current, Cfg.file)):
            self.path = current
            self.is_cwd = False
            break

        parent = os.path.split(current)[0]
        if parent == current:       # Reached root.
            break
        current = parent

    self.name = os.path.basename(self.path)
    self.is_classic = os.path.isfile(os.path.join(self.path, 'mbed.bld'))

    # is_cwd flag indicates that current dir is assumed to be root, not root repo
    if print_warning and self.is_cwd:
        warning(
            "Could not find mbed program in current path \"%s\".\n"
            "You can fix this by calling \"mbed new .\" in the root of your program." % self.path)
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def assemble(instrs):
    """Assemble a list of textual instructions into a byte string.

    Each instruction is matched with ``instre``; group 1 is the mnemonic
    looked up in ``aCode_map`` and group 2 the optional comma-separated
    integer parameters.  Instructions that do not match or whose mnemonic
    is unknown are skipped.

    ``aCode_map`` yields ``(opcode, parmfmt)``: a parmfmt of 0 means the
    parameters are ignored, -1 means a length-prefixed run of bytes, and
    any other value is used as a ``struct`` format for the parameters.
    """
    res = []
    for inst in instrs:
        m = instre.match(inst)
        if not m or m.group(1) not in aCode_map:
            continue
        opcode, parmfmt = aCode_map[m.group(1)]
        res.append(struct.pack("B", opcode))
        if m.group(2):
            if parmfmt == 0:
                continue
            # Raw string: "\s" is not a valid escape in a plain literal.
            parms = [int(x) for x in re.split(r",\s*", m.group(2))]
            if parmfmt == -1:
                # Variable-length form: a count byte followed by the values.
                count = len(parms)
                res.append(struct.pack(("%dB" % (count+1)), count, *parms))
            else:
                res.append(struct.pack(parmfmt, *parms))
    return b"".join(res)
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def fromXML(self, name, attrs, content, ttFont, version=2.0):
    """Populate ``self.linear`` / ``self.nonLinear`` from parsed XML content.

    For 'linearClasses', every 'linear' child appends the whitespace-split
    text of its content.  For 'nonLinearClasses', every 'nonLinear' child
    appends a dict mapping the 'glyph' attribute of each 'map' element to
    its integer 'index'.  Children that are not tuples are ignored.
    """
    if name == 'linearClasses':
        children = [child for child in content if isinstance(child, tuple)]
        for child_tag, _child_attrs, subcontent in children:
            if child_tag == 'linear':
                self.linear.append(content_string(subcontent).split())
    elif name == 'nonLinearClasses':
        children = [child for child in content if isinstance(child, tuple)]
        for child_tag, _child_attrs, subcontent in children:
            if child_tag != 'nonLinear':
                continue
            glyph_to_index = {}
            for entry in subcontent:
                if not isinstance(entry, tuple):
                    continue
                entry_tag, entry_attrs, _entry_content = entry
                if entry_tag == 'map':
                    glyph_to_index[entry_attrs['glyph']] = int(safeEval(entry_attrs['index']))
            self.nonLinear.append(glyph_to_index)
项目:Jumper-Cogs    作者:Redjumpman    | 项目源码 | 文件源码
def collect_moves(self, reader, name):
    """Return the move data of the first row in ``reader`` matching ``name``.

    A name whose last '-'-separated part is a number must equal the first
    column exactly; any other name only has to be contained in it.  Columns
    1 to 4 hold the generation key, color, moves and versions.

    Returns a ``Moves`` namedtuple, or None when no row matches.
    """
    Moves = namedtuple('Moves', ['pokemon', 'gen', 'color', 'moves', 'versions'])

    if name.split('-')[-1].isdigit():
        # Numbered form: exact match; display only the part before the '-'.
        matches = lambda key: name == key
        pokemon = name.split('-')[0].title()
    else:
        matches = lambda key: name in key
        pokemon = name.title()

    for row in reader:
        if not matches(row[0]):
            continue
        generation = switcher[row[1]]
        color = int(ast.literal_eval(row[2]))
        moves = ast.literal_eval(row[3])
        versions = ast.literal_eval(row[4])
        return Moves(pokemon, generation, color, moves, versions)
项目:WPS-4th    作者:Fastcampus-WPS    | 项目源码 | 文件源码
def clean_recipient_numbers(self):
    """Validate the 'recipient_numbers' field and return the list of numbers.

    Whitespace and '-' are removed first, then the text is split on ','
    and '.'.  Raises ValidationError listing every entry that is not a
    valid number.
    """
    # A valid number starts with 0 and has 10 or 11 digits in total.
    number_pattern = re.compile(r'^0\d{9}\d?$')
    number_string = self.cleaned_data['recipient_numbers']
    print(number_string)
    # Drop all whitespace and '-' characters.
    sub_string = re.sub(r'\s|-', '', number_string)
    print(sub_string)
    # Split on ',' or '.' to get the individual numbers.
    numbers = re.split(r',|\.', sub_string)
    print(numbers)

    cleaned_numbers = []
    error_numbers = []
    for number in numbers:
        bucket = cleaned_numbers if number_pattern.match(number) else error_numbers
        bucket.append(number)

    if error_numbers:
        raise ValidationError('Invalid phone number format! {}'.format(', '.join(error_numbers)))
    return cleaned_numbers
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def _custom(self, custom):
    """
    Create commented files to let the admin know where it's safe
    to make custom changes.  Mirror the default tree. Never overwrite.

    Missing parent directories are created first.  Nothing is written in
    dry-run mode or when ``custom`` already exists.
    """
    parent_dir = os.path.dirname(custom)
    if not os.path.isdir(parent_dir):
        _create_dirs(parent_dir, self.pillar_dir)

    if self.dryrun or os.path.isfile(custom):
        return

    log.info("Writing {}".format(custom))
    with open(custom, "w") as yml:
        # The default counterpart lives under "stack/default".
        pieces = custom.split("stack")
        custom_for = "{}{}{}".format(pieces[0], "stack/default", pieces[1])
        yml.write("# {}\n".format(custom))
        yml.write("# Overwrites configuration in {}\n".format(custom_for))
        _examples(custom, yml)
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def _parse(line):
    """
    Return globbed files constrained by optional slices or regexes.

    ``line`` is a glob pattern optionally followed by whitespace-separated
    ``key=value`` filters, applied from left to right to the sorted glob
    result:

    * ``re=<regex>``: keep the matched part of every path the regex is
      found in.
    * ``slice=<[a:b]>``: apply the Python slice to the list.

    Unknown keys are logged and ignored.  Without a space in ``line`` the
    plain (unsorted) glob result is returned.
    """
    if " " not in line:
        return glob.glob(line)

    parts = re.split(r'\s+', line)
    files = sorted(glob.glob(parts[0]))
    for optional in parts[1:]:
        if not optional:
            # Empty token produced by trailing whitespace.
            continue
        # Split on the first '=' only: a regex value may contain '='.
        filter_type, value = optional.split('=', 1)
        if filter_type == "re":
            regex = re.compile(value)
            files = [m.group(0) for l in files for m in [regex.search(l)] if m]
        elif filter_type == "slice":
            # SECURITY: eval() runs ``value`` as Python code, so ``line``
            # must only ever come from trusted configuration.
            # pylint: disable=eval-used
            files = eval("files{}".format(value))
        else:
            log.warning("keyword {} unsupported".format(filter_type))
    return files
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def _parse(self, line):
    """
    Return globbed files constrained by optional slices or regexes.

    ``line`` is a glob pattern optionally followed by whitespace-separated
    ``key=value`` filters, applied from left to right to the sorted glob
    result:

    * ``re=<regex>``: keep the matched part of every path the regex is
      found in.
    * ``slice=<[a:b]>``: apply the Python slice to the list.

    Unknown keys are logged and ignored.  Without a space in ``line`` the
    plain (unsorted) glob result is returned.
    """
    if " " not in line:
        return glob.glob(line)

    parts = re.split(r'\s+', line)
    files = sorted(glob.glob(parts[0]))
    for keyvalue in parts[1:]:
        if not keyvalue:
            # Empty token produced by trailing whitespace.
            continue
        # Split on the first '=' only: a regex value may contain '='.
        key, value = keyvalue.split('=', 1)
        if key == "re":
            regex = re.compile(value)
            files = [match.group(0) for _file in files
                     for match in [regex.search(_file)] if match]
        elif key == "slice":
            # SECURITY: eval() runs ``value`` as Python code, so ``line``
            # must only ever come from trusted configuration.
            # pylint: disable=eval-used
            files = eval("files{}".format(value))
        else:
            log.warning("keyword {} unsupported".format(key))
    return files
项目:git-stacktrace    作者:pinterest    | 项目源码 | 文件源码
def prep_blob(self, blob):
    """Cleanup input.

    Accepts a string or a list of strings and returns a list of lines.
    Blank entries of a list are dropped; a list that is left with a single
    entry is treated as one blob whose literal backslash-n sequences are
    line breaks.  Lines of a multi-entry list are right-stripped.

    Raises ParseException for any other input type.
    """
    if type(blob) == list:
        # remove empty lines
        entries = [line for line in blob if line.strip() != '']
        if len(entries) == 1:
            entries = entries[0].replace('\\n', '\n').split('\n')
        if len(entries) == 1:
            return entries[0].split('\n')
        return [line.rstrip() for line in entries]

    # Split by line
    if type(blob) == str or type(blob) == six.text_type:
        return blob.split('\n')

    message = "Unknown input format"
    log.debug("%s - '%s", message, blob)
    raise ParseException(message)
项目:det_k_bisbm    作者:junipertcy    | 项目源码 | 文件源码
def _save_edgelist_as_1_indexed(f_edgelist, f_target_edgelist, delimiter="\t"):
    """
    Copy an edge list while adding 1 to both node indices of every edge.

    Note that this function always saves with delimiter "\t".

    :param f_edgelist: path of the edge list to read
    :param f_target_edgelist: path of the file to write
    :param delimiter: regular expression that separates the two columns of
        the input file
    :return: None
    :raises ValueError: when a column cannot be converted to an integer
    """
    import re
    with open(f_target_edgelist, "w") as target, open(f_edgelist, "r") as source:
        for raw_line in source:
            text = raw_line.replace('\r', '').replace('\n', '')
            columns = re.split(delimiter, text)
            try:
                target.write("{}\t{}\n".format(int(columns[0]) + 1, int(columns[1]) + 1))
            except ValueError as e:
                raise ValueError(
                    "[ERROR] Please check if the delimiter for the edgelist file is wrong -- {}".format(e)
                )
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _setup_requirements(argument):
    """Install the requirements class named by ``argument``.

    ``argument`` has the form ``"package.module:ClassName"``.  Nothing
    happens when ``config.requirements`` is already set.
    """
    from sqlalchemy.testing import config
    from sqlalchemy import testing

    if config.requirements is not None:
        return

    modname, clsname = argument.split(":")

    # importlib.import_module() only introduced in 2.7, a little
    # late
    # __import__ returns the top-level package, so walk down the dotted
    # path to reach the leaf module.
    target = __import__(modname)
    for attribute in modname.split(".")[1:]:
        target = getattr(target, attribute)

    config.requirements = testing.requires = getattr(target, clsname)()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _setup_requirements(argument):
    """Install the requirements class named by ``argument``.

    ``argument`` has the form ``"package.module:ClassName"``.  Nothing
    happens when ``config.requirements`` is already set.
    """
    from alembic.testing import config

    if config.requirements is not None:
        return

    modname, clsname = argument.split(":")

    # importlib.import_module() only introduced in 2.7, a little
    # late
    # __import__ returns the top-level package, so walk down the dotted
    # path to reach the leaf module.
    target = __import__(modname)
    for attribute in modname.split(".")[1:]:
        target = getattr(target, attribute)

    config.requirements = getattr(target, clsname)()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def push(self, data):
    """Push some new data into this object."""
    # Prepend whatever was left over from the previous call and reset it.
    buffered = self._partial + data
    self._partial = ''
    # Crack into lines, keeping the end-of-line separators as their own
    # list items.
    pieces = NLCRE_crack.split(buffered)
    # Because the pattern contains grouping parentheses, the final item of
    # the split result is the text after the last separator (the empty
    # string when the data ends with a line ending).  Keep it as the new
    # leftover for the next call.
    self._partial = pieces.pop()
    # The remaining items alternate between line content and its
    # separator; glue each pair back together into a complete line.
    lines = [content + eol for content, eol in zip(pieces[0::2], pieces[1::2])]
    self.pushlines(lines)
项目:calibre_dangdang    作者:qunxyz    | 项目源码 | 文件源码
def parse_tags(self, root):
    """Collect the distinct, non-excluded tag texts found under ``root``.

    Every node matched by ``self.tags_xpath`` contributes its stripped text
    with ``,``, ``/`` and ``>`` replaced by ``;``.  A tag is dropped when it
    is empty, when its lower-cased form is in the excluded set or was
    already collected, or when any of its whitespace-separated lower-cased
    words is an excluded token.  Tags are returned in encounter order.
    """
    banned_words = {'kindle', 'a-z'}
    banned_tags = {'special features', 'by authors', 'authors & illustrators', 'books', 'new; used & rental textbooks'}
    collected = []
    already_seen = set()

    for node in root.xpath(self.tags_xpath):
        text = (node.text or '').strip()
        for separator in (',', '/', '>'):
            text = text.replace(separator, ';')

        lowered = icu_lower(text)
        words = frozenset(lowered.split())
        if not text or lowered in banned_tags:
            continue
        if words.intersection(banned_words) or lowered in already_seen:
            continue
        collected.append(text)
        already_seen.add(lowered)

    return collected
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def unquote_unreserved(uri):
    """Decode the percent-escapes in a URI whose character is unreserved.

    Escapes that decode to anything outside ``UNRESERVED_SET`` (reserved,
    illegal and non-ASCII bytes) are left encoded, as is any ``%`` that is
    not followed by two alphanumeric characters.

    :rtype: str
    """
    segments = uri.split('%')
    # Text before the first '%' can never contain an escape.
    rebuilt = [segments[0]]
    for segment in segments[1:]:
        hex_pair = segment[:2]
        if not (len(hex_pair) == 2 and hex_pair.isalnum()):
            # Not a two-character escape: put the '%' back untouched.
            rebuilt.append('%' + segment)
            continue

        try:
            char = chr(int(hex_pair, 16))
        except ValueError:
            raise InvalidURL("Invalid percent-escape sequence: '%s'" % hex_pair)

        if char in UNRESERVED_SET:
            rebuilt.append(char + segment[2:])
        else:
            rebuilt.append('%' + segment)
    return ''.join(rebuilt)
项目:aws-waf-security-automation    作者:cerbo    | 项目源码 | 文件源码
def unquote_unreserved(uri):
    """Decode the percent-escapes in a URI whose character is unreserved.

    Escapes that decode to anything outside ``UNRESERVED_SET`` (reserved,
    illegal and non-ASCII bytes) are left encoded, as is any ``%`` that is
    not followed by two alphanumeric characters.
    """
    pieces = uri.split('%')
    for index, piece in enumerate(pieces):
        if index == 0:
            # Text before the first '%' can never contain an escape.
            continue

        escaped = '%' + piece
        code = piece[:2]
        if len(code) != 2 or not code.isalnum():
            # Not a two-character escape: put the '%' back untouched.
            pieces[index] = escaped
            continue

        try:
            char = chr(int(code, 16))
        except ValueError:
            raise InvalidURL("Invalid percent-escape sequence: '%s'" % code)

        pieces[index] = char + piece[2:] if char in UNRESERVED_SET else escaped
    return ''.join(pieces)
项目:aws-waf-security-automation    作者:cerbo    | 项目源码 | 文件源码
def is_valid_cidr(string_network):
    """Very simple check of the cidr format in no_proxy variable.

    Returns True only when the string contains exactly one ``/``, the part
    after it is an integer between 1 and 32 inclusive, and the part before
    it is accepted by ``socket.inet_aton``.
    """
    if string_network.count('/') != 1:
        return False

    address, prefix = string_network.split('/')

    try:
        mask = int(prefix)
    except ValueError:
        return False

    if not 1 <= mask <= 32:
        return False

    try:
        socket.inet_aton(address)
    except socket.error:
        return False
    return True
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def fix_repeating_arguments(self):
    """Fix elements that should accumulate/increment values.

    For every alternative under ``self.either``, each element that occurs
    more than once gets its ``value`` reset: arguments and options taking an
    argument get a list (an existing non-list value is split on
    whitespace), while commands and options with ``argcount == 0`` get the
    counter start value ``0``.  Returns ``self``.
    """
    alternatives = [list(child.children) for child in self.either.children]
    for alternative in alternatives:
        repeated = [item for item in alternative if alternative.count(item) > 1]
        for item in repeated:
            kind = type(item)
            # Exact type checks are deliberate: subclasses are not matched.
            if kind is Argument or (kind is Option and item.argcount):
                if item.value is None:
                    item.value = []
                elif type(item.value) is not list:
                    item.value = item.value.split()
            if kind is Command or (kind is Option and item.argcount == 0):
                item.value = 0
    return self
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def parse(class_, option_description):
    """Build an option object from one option-description string.

    The text before the first run of two spaces is the option spec (for
    example ``-f FILE, --file=FILE``); the rest is the free-form
    description.  In the spec, ``,`` and ``=`` act as separators: a word
    starting with ``--`` is the long name, a word starting with ``-`` is the
    short name, and any other word means the option takes one argument.

    :param class_: callable invoked as ``class_(short, long, argcount, value)``
    :param option_description: the option line to parse
    :return: whatever ``class_`` returns.  ``value`` is ``False`` for options
        without an argument; otherwise it is the text captured from a
        ``[default: ...]`` marker in the description (case-insensitive), or
        ``None`` when there is no such marker.
    """
    short_name, long_name, argcount, value = None, None, 0, False
    options, _, description = option_description.strip().partition('  ')
    options = options.replace(',', ' ').replace('=', ' ')
    for word in options.split():
        if word.startswith('--'):
            long_name = word
        elif word.startswith('-'):
            short_name = word
        else:
            # A bare word is the placeholder for the option's argument.
            argcount = 1
    if argcount:
        # Raw string: '\[' and '\]' are regex escapes, not string escapes.
        matched = re.findall(r'\[default: (.*)\]', description, flags=re.I)
        value = matched[0] if matched else None
    return class_(short_name, long_name, argcount, value)
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def __init__(self, source, error):
    """Fill this object with tokens from ``source`` and remember ``error``.

    A ``source`` that has a ``split`` attribute is split on whitespace;
    anything else is added as-is via in-place ``+=``.
    """
    if hasattr(source, 'split'):
        tokens = source.split()
    else:
        tokens = source
    self += tokens
    self.error = error
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def parse_defaults(doc):
    """Extract the option definitions described in a usage/help text.

    The text is cut at every newline that is followed (after optional
    spaces) by either ``<name>`` or a token starting with ``-``.  Each such
    marker is glued back onto the text that follows it, and every resulting
    entry that starts with ``-`` is handed to ``Option.parse``.

    :param doc: the help text to scan
    :return: list of the objects returned by ``Option.parse``, in document
        order (empty when no line starts with ``-``)
    """
    # in python < 2.7 you can't pass flags=re.MULTILINE
    # Raw string: '\S' is a regex escape, not a string escape.  The leading
    # chunk before the first marker is discarded by the [1:] slice.
    split = re.split(r'\n *(<\S+?>|-\S+?)', doc)[1:]
    # re.split with a capturing group alternates marker / following text.
    split = [s1 + s2 for s1, s2 in zip(split[::2], split[1::2])]
    # Entries starting with '<' are matched by the pattern above but are
    # intentionally not parsed here; only options are returned.
    options = [Option.parse(s) for s in split if s.startswith('-')]
    return options