我们从 Python 开源项目中,提取了以下 50 个代码示例,用于说明如何使用 re.compile()。
def get_client_ip(request):
    """Return the first public, valid IPv4 address in the request's
    access route; with a single-entry route, return it unconditionally.
    Returns None when every entry is private or invalid."""
    access_route = get_access_route(request)
    if len(access_route) == 1:
        return access_route[0]
    # Matches anything that is NOT a routable public IPv4 address.
    private_or_invalid = re.compile(
        """
        (^(?!(?:[0-9]{1,3}\.){3}[0-9]{1,3}$).*$)| # will match non valid ipV4
        (^127\.0\.0\.1)| # will match 127.0.0.1
        (^10\.)| # will match 10.0.0.0 - 10.255.255.255 IP-s
        (^172\.1[6-9]\.)| # will match 172.16.0.0 - 172.19.255.255 IP-s
        (^172\.2[0-9]\.)| # will match 172.20.0.0 - 172.29.255.255 IP-s
        (^172\.3[0-1]\.)| # will match 172.30.0.0 - 172.31.255.255 IP-s
        (^192\.168\.) # will match 192.168.0.0 - 192.168.255.255 IP-s
        """,
        re.X)
    for candidate in access_route:
        # The first value from X_FORWARDED_FOR may be null; skip it.
        if not candidate:
            continue
        if not private_or_invalid.search(candidate):
            return candidate
def _readRegExp(regExp):
    """
    Discard leading white space characters from standard input. Then read
    from standard input and return a string matching regular expression
    regExp.  Raise an EOFError if no non-whitespace characters remain in
    standard input.  Raise a ValueError if the next characters to be read
    from standard input do not match 'regExp'.
    """
    global _buffer
    if isEmpty():
        raise EOFError()
    # Anchor at the buffer start, tolerating leading whitespace.
    found = re.compile(r'^\s*' + regExp).search(_buffer)
    if found is None:
        raise ValueError()
    token = found.group()
    # Consume the matched prefix from the shared input buffer.
    _buffer = _buffer[found.end():]
    return token.lstrip()

#-----------------------------------------------------------------------
def run(self):
    """Look up the FortiGuard web-filter category of the submitted
    domain/url observable and report it; other data types are rejected."""
    Analyzer.run(self)
    if self.data_type not in ('domain', 'url'):
        self.notSupported()
        return
    try:
        pattern = re.compile("(?:Category: )([\w\s]+)")
        baseurl = 'http://www.fortiguard.com/webfilter?q='
        req = requests.get(baseurl + self.getData())
        # Scrape the category name out of the response body.
        category_match = re.search(pattern, req.content, flags=0)
        self.report({
            'category': category_match.group(1)
        })
    except ValueError as e:
        self.unexpectedError(e)
def findHeadings(self, lines, struc):
    """Split `lines` into TextContainer sections delimited by heading
    lines at the level given by struc[-1]."""
    total = len(lines)
    level = struc[-1]
    heading_re = re.compile(self.textparser._h_re_base % level, re.X | re.M)
    # Indices of all heading lines, plus a sentinel at the end.
    bounds = [idx for idx in xrange(total) if heading_re.match(lines[idx])]
    bounds.append(total)
    result = []
    for start, end in zip(bounds[:-1], bounds[1:]):
        #--------Use heading line as container name--------
        result.append(TextContainer(lines[start], struc, [start, end]))
    return result
def getDetailList(self, content):
    """Extract (href, title) pairs from the listing page `content`, dump
    the raw page to file.txt, and hand each pair to a worker thread."""
    s2 = r'<h2><a target="_blank" href="(.*?)" title="(.*?)"'
    matches = re.findall(re.compile(s2, re.S), content)
    # Dump the raw page for offline inspection.
    with open('file.txt', 'w', encoding='gbk') as f:
        f.write(content)
    if not matches:
        print('???????..............')
    workers = []
    for item in matches:
        worker = threading.Thread(target=workthread,
                                  args=(item, self.user_agent, self.path))
        workers.append(worker)
        worker.start()
    for worker in workers:
        worker.join()
def __init__(
        self, index_url="https://pypi.python.org/simple", hosts=('*',),
        ca_bundle=None, verify_ssl=True, *args, **kw):
    """Initialise the package index: normalise the index URL, set up the
    host allow-list matcher and pick an (optionally SSL-verified) opener."""
    Environment.__init__(self, *args, **kw)
    # Ensure exactly one trailing slash on the index URL.
    self.index_url = index_url if index_url.endswith('/') else index_url + '/'
    self.scanned_urls = {}
    self.fetched_urls = {}
    self.package_pages = {}
    # Predicate over URLs: does the host match any allowed pattern?
    self.allows = re.compile('|'.join(map(translate, hosts))).match
    self.to_scan = []
    use_ssl = (
        verify_ssl
        and ssl_support.is_available
        and (ca_bundle or ssl_support.find_ca_bundle())
    )
    if use_ssl:
        self.opener = ssl_support.opener_for(ca_bundle)
    else:
        self.opener = urllib.request.urlopen
def getAudio(freq, audio_files=None):
    """Convert every .mkv/.avi file in DATA_DIR to a .wav at sample rate
    `freq` via ffmpeg; optionally restrict to basenames in `audio_files`.
    Returns the list of output .wav paths."""
    files = os.listdir(DATA_DIR)
    # BUG FIX: '.*\.[mkv|avi]' is a character CLASS matching one character
    # from {m,k,v,|,a,i} (so e.g. 'x.m' matched and 'x.mkv' only matched by
    # accident); use alternation anchored at the end to match the intended
    # extensions.
    video_re = re.compile(r'.*\.(mkv|avi)$')
    files = [f for f in files if video_re.match(f)]
    if audio_files:
        files = [f for f in files if os.path.splitext(f)[0] in audio_files]
    audio_dirs = []
    for f in files:
        name, extension = os.path.splitext(f)
        command = "ffmpeg -i {0}{1}{2} -ab 160k -ac 2 -ar {3} -vn {0}{1}_{3}.wav".format(DATA_DIR, name, extension, freq)
        audio_dirs.append(DATA_DIR + name + '_' + str(freq) + '.wav')
        subprocess.call(command, shell=True)
    return audio_dirs

# Convert timestamp to seconds
def getRegEx(pattern):
    """Compiles and returns a 'regular expression' object for the given
    address-pattern (OSC-address syntax translated to 're' syntax)."""
    # Escape '.', '(' and ')', then turn the OSC wildcard '*' into '.*'.
    # Order matters: '.' must be escaped before '*' introduces new dots.
    for osc_token, re_token in (('.', r'\.'), ('(', r'\('),
                                (')', r'\)'), ('*', r'.*')):
        pattern = pattern.replace(osc_token, re_token)
    # Change '?' to '.' and '{,}' to '(|)' via the translation table.
    pattern = pattern.translate(OSCtrans)
    return re.compile(pattern)

######
#
# OSCMultiClient class
#
######
def characterErrorsUCS2(self, data):
    """Record an 'invalid-codepoint' error for each invalid character in
    `data` on a narrow (UCS-2) Python build, where astral characters
    appear as surrogate pairs."""
    # Someone picked the wrong compile option
    # You lose
    skip = False
    for match in invalid_unicode_re.finditer(data):
        # NOTE(review): `skip` is never cleared on this path, so once it is
        # set every later match in `data` is skipped — confirm against the
        # upstream html5lib source.
        if skip:
            continue
        codepoint = ord(match.group())
        pos = match.start()
        # Pretty sure there should be endianness issues here
        if _utils.isSurrogatePair(data[pos:pos + 2]):
            # We have a surrogate pair! Report only if the combined
            # codepoint is invalid; always skip the low surrogate.
            char_val = _utils.surrogatePairToCodepoint(data[pos:pos + 2])
            if char_val in non_bmp_invalid_codepoints:
                self.errors.append("invalid-codepoint")
            skip = True
        elif (codepoint >= 0xD800 and codepoint <= 0xDFFF and
                pos == len(data) - 1):
            # Lone high/low surrogate at the very end of the data.
            self.errors.append("invalid-codepoint")
        else:
            skip = False
            self.errors.append("invalid-codepoint")
def run_script(self, script_name, namespace):
    """Locate the metadata script 'scripts/<script_name>' and exec it in
    `namespace`.  Raises ResolutionError if the script is not present in
    this distribution's metadata."""
    script = 'scripts/' + script_name
    if not self.has_metadata(script):
        raise ResolutionError("No script named %r" % script_name)
    # Normalise line endings before compiling.
    script_text = self.get_metadata(script).replace('\r\n', '\n')
    script_text = script_text.replace('\r', '\n')
    script_filename = self._fn(self.egg_info, script)
    namespace['__file__'] = script_filename
    if os.path.exists(script_filename):
        # Unpacked egg: run the real file so tracebacks point at it.
        source = open(script_filename).read()
        code = compile(source, script_filename, 'exec')
        exec(code, namespace, namespace)
    else:
        # Zipped egg: seed linecache so tracebacks can still show source.
        from linecache import cache
        cache[script_filename] = (
            len(script_text), 0, script_text.split('\n'), script_filename
        )
        script_code = compile(script_text, script_filename, 'exec')
        exec(script_code, namespace, namespace)
def _needs_hiding(mod_name):
    """
    >>> _needs_hiding('setuptools')
    True
    >>> _needs_hiding('pkg_resources')
    True
    >>> _needs_hiding('setuptools_plugin')
    False
    >>> _needs_hiding('setuptools.__init__')
    True
    >>> _needs_hiding('distutils')
    True
    >>> _needs_hiding('os')
    False
    >>> _needs_hiding('Cython')
    True
    """
    # The name must be one of the hidden packages itself or a submodule
    # of one ('(\.|$)' rejects e.g. 'setuptools_plugin').
    hidden = re.compile(r'(setuptools|pkg_resources|distutils|Cython)(\.|$)')
    return bool(hidden.match(mod_name))
def byte_compile(self, to_compile):
    """Byte-compile `to_compile` (and optionally optimise), unless
    bytecode writing is disabled for this interpreter."""
    if sys.dont_write_bytecode:
        self.warn('byte-compiling is disabled, skipping.')
        return
    from distutils.util import byte_compile
    try:
        # Quiet the byte-compile messages a notch.
        log.set_verbosity(self.verbose - 1)
        byte_compile(to_compile, optimize=0, force=1, dry_run=self.dry_run)
        if self.optimize:
            byte_compile(
                to_compile,
                optimize=self.optimize,
                force=1,
                dry_run=self.dry_run,
            )
    finally:
        log.set_verbosity(self.verbose)  # restore original verbosity
def filterwarnings(action, message="", category=Warning, module="", lineno=0,
                   append=False):
    """Insert an entry into the list of warnings filters (at the front).

    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'message' -- a regex that the warning message must match
    'category' -- a class that the warning must be a subclass of
    'module' -- a regex that the module name must match
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- if true, append to the list of filters
    """
    import re
    valid_actions = ("error", "ignore", "always", "default", "module", "once")
    assert action in valid_actions, "invalid action: %r" % (action,)
    assert isinstance(message, str), "message must be a string"
    assert isinstance(category, type), "category must be a class"
    assert issubclass(category, Warning), "category must be a Warning subclass"
    assert isinstance(module, str), "module must be a string"
    assert isinstance(lineno, int) and lineno >= 0, \
        "lineno must be an int >= 0"
    # Message matching is case-insensitive; module matching is not.
    _add_filter(action, re.compile(message, re.I), category,
                re.compile(module), lineno, append=append)
def getDetailList(self, content):
    """Dump the listing page to file.txt, extract (href, title) pairs and
    fetch the detail picture for each one."""
    # BUG FIX: the two concatenated raw strings produced
    # '...href="(.*?)"title="(.*?)">' with no separator between the
    # attributes; the sibling implementation uses '" title="', so this
    # pattern could never match.  Restore the missing space.
    pattern = re.compile(r'<h2><a target="_blank" href="(.*?)"'
                         + r' title="(.*?)">', re.S)
    #uf-8??????
    file = open('file.txt', 'w', encoding='gbk')
    file.write(content)
    file.close()
    result = re.findall(pattern, content)
    if not result:
        print('???????..............')
    for item in result:
        self.getDetailPic(item)
def create_text_file(self, name):
    """ Create a new text file, change its name and return it
    Args:
    - name, a string to use as the new text file's name"""
    import re
    bpy.ops.text.new()
    # BUG FIX: the dot was unescaped (r'^Text.[0-9]{3}$'), so 'Text'
    # followed by ANY character matched; escape it so only Blender's
    # 'Text.NNN'-style auto names match.
    re_text = re.compile(r'^Text\.[0-9]{3}$')

    text_name = ''
    text_index, max_index = 0, 0
    # Find the auto-named datablock with the highest numeric suffix:
    # that is the one bpy.ops.text.new() just created.
    for text in bpy.data.texts:
        if re_text.match(text.name):
            text_index = int(text.name[-3:])
            if text_index > max_index:
                max_index = text_index
                text_name = text.name
    if not text_name:
        text_name = 'Text'

    bpy.data.texts[text_name].name = name
    return bpy.data.texts[name]
def create_text_file(name):
    """ Create a new text file, change its name and return it
    Args:
    - name, the name of the text file, a string

    Raises TypeError unless `name` is a non-empty string."""
    # BUG FIX: the original guard `if not name and isinstance(name, str)`
    # only fired for the empty string and let non-string values through;
    # reject anything that is not a non-empty string.
    if not name or not isinstance(name, str):
        raise TypeError('The name of the text file has to be a string')

    bpy.ops.text.new()
    import re
    # BUG FIX: escape the dot so only 'Text.NNN' auto names match
    # (unescaped '.' matched any character).
    re_text = re.compile(r'^Text\.[0-9]{3}$')

    text_name = ''
    text_index, max_index = 0, 0
    # The freshly created datablock carries the highest numeric suffix.
    for text in bpy.data.texts:
        if re_text.match(text.name):
            text_index = int(text.name[-3:])
            if text_index > max_index:
                max_index = text_index
                text_name = text.name
    if not text_name:
        text_name = 'Text'

    bpy.data.texts[text_name].name = name
    return bpy.data.texts[name]
def load_params(self, f_, filter_=None):
    """Load pickled parameter arrays from file object `f_` into
    self._vars_di.

    filter_ -- optional regex string; only keys that fully match it are
               loaded (None loads everything).
    Raises ValueError when a loaded array's shape differs from the
    existing variable's shape.
    NOTE(review): pickle.load on untrusted input can execute arbitrary
    code -- only load trusted files.
    """
    di = pickle.load(f_)
    # Unified loop: the two original branches differed only in the
    # key-filtering step.
    pat = re.compile(filter_) if filter_ is not None else None
    for k, v in di.items():
        if pat is not None and not pat.fullmatch(k):
            continue
        p = self._vars_di[k].get_value(borrow=True)
        if p.shape != v.shape:
            raise ValueError('Shape mismatch, need %s, got %s'%(v.shape, p.shape), p.shape)
        self._vars_di[k].set_value(v)
def get_encodings_from_content(content):
    """Returns encodings from given content string.

    :param content: bytestring to extract encodings from.
    """
    warnings.warn((
        'In requests 3.0, get_encodings_from_content will be removed. For '
        'more information, please see the discussion on issue #2266. (This'
        ' warning should only appear once.)'),
        DeprecationWarning)

    # Probe the three places a document can declare its encoding, in
    # priority order: <meta charset>, http-equiv pragma, XML declaration.
    detectors = (
        re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I),
        re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I),
        re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]'),
    )
    found = []
    for detector in detectors:
        found.extend(detector.findall(content))
    return found
def expect_match(self, pattern, on_error=None):
    """ Require the source at the current `position` to match @pattern.

        @pattern
          A regular expression.
        @on_error
          A callable returning the error to raise instead of the default
          ValueError when the match fails.
    """
    found = re.compile(pattern).match(self.source, self.position)
    if found:
        # Advance past the matched span.
        self.position += found.end() - found.start()
        return found
    if on_error:
        raise on_error()
    raise ValueError('expected match with \'{0}\', at \'{1}\''.format(pattern, self.source[self.position:]))
def match(self, pattern):
    """ Return the match obtained by matching @pattern at the current
        `position`, advancing `position` by the match's length; return
        None when there is no match.

        @pattern
          A regular expression.
    """
    found = re.compile(pattern).match(self.source, self.position)
    if not found:
        return
    self.position += found.end() - found.start()
    return found
def craw_last_index(ptt_class_name):
    """Return the newest page-index number of a PTT board
    (e.g. 'Soft_Job'), derived from the 'previous page' navigation link
    on the board's index page."""
    #ptt_class_name = 'Soft_Job'
    index_url = 'https://www.ptt.cc/bbs/' + ptt_class_name + '/index.html'
    res = requests.get(index_url,verify = True)
    soup3 = BeautifulSoup(res.text, "lxml")
    # NOTE(review): '??' is mojibake for the original Chinese button label
    # (probably the "oldest"/"previous page" link text); as written,
    # re.compile('??') raises "nothing to repeat" -- recover the original
    # label from upstream before running this.
    x = soup3('',{'class':"btn wide"},text = re.compile('??'))
    last_index = x[0]['href']
    # Strip '/bbs/<board>/index' prefix and '.html' suffix, leaving the
    # numeric page index; +1 gives the newest page.
    last_index = last_index.replace('/bbs/' + ptt_class_name + '/index','')
    last_index = int( last_index.replace('.html','') )+1
    return last_index
#---------------------------------------------------------------------------------
# NOTE(review): the comments that followed here were mojibake'd Chinese
# describing a periodic (crontab-driven) crawl of new PTT posts; the
# original text was lost in re-encoding.
def get_iface_from_addr(addr):
    """Work out on which interface the provided address is configured.

    Raises Exception when no interface carries `addr`."""
    # Hoisted out of the loops: the pattern is constant, so compiling it
    # per-address was wasted work.  Strips a link-local scope suffix
    # ('%<iface>') from an address.
    ll_key = re.compile("(.+)%.*")
    for iface in netifaces.interfaces():
        addresses = netifaces.ifaddresses(iface)
        for inet_type in addresses:
            for _addr in addresses[inet_type]:
                _addr = _addr['addr']
                # link local
                raw = ll_key.match(_addr)
                if raw:
                    _addr = raw.group(1)

                if _addr == addr:
                    log("Address '%s' is configured on iface '%s'" %
                        (addr, iface))
                    return iface

    msg = "Unable to infer net iface on which '%s' is configured" % (addr)
    raise Exception(msg)
def parse_treasury_csv_column(column):
    """
    Parse a treasury CSV column into a more human-readable format.

    Columns start with 'RIFLGFC', followed by Y or M (year or month),
    followed by a two-digit number signifying number of years/months,
    followed by _N.B.  Only the middle two fields matter; they become a
    string like '3month' or '30year'.
    """
    column_re = re.compile(
        r"^(?P<prefix>RIFLGFC)"
        "(?P<unit>[YM])"
        "(?P<periods>[0-9]{2})"
        "(?P<suffix>_N.B)$"
    )

    found = column_re.match(column)
    if found is None:
        raise ValueError("Couldn't parse CSV column %r." % column)

    unit, periods = get_unit_and_periods(found.groupdict())
    suffix = 'year' if unit == 'Y' else 'month'
    # Roundtrip through int to coerce '06' into '6'.
    return str(int(periods)) + suffix
def get_transcript_gc_content(self, transcript_obj):
    """Return the G/C base fraction over the transcript's intervals that
    lie on known chromosomes; 0 when no bases are covered."""
    gc_re = re.compile('[cCgG]')
    gc = 0
    length = 0
    for interval in transcript_obj.intervals:
        # Skip intervals on chromosomes we have no sequence for.
        if interval.chrom not in self.chroms:
            continue
        seq = self.chroms[interval.chrom][interval.start:interval.end]
        gc += len(re.findall(gc_re, seq))
        length += interval.length
    return float(gc) / float(length) if length > 0 else 0

# NOTE: these stub classes are necessary to maintain backwards compatibility with old refdata (1.2 or older)
def default(self, line):
    """Execute `line` as a Python statement in the debugged frame's
    namespace, with the standard streams redirected through the debugger.
    (Python 2 code: statement `exec` and `print >>` syntax.)"""
    # A leading '!' explicitly marks a statement; strip it.
    if line[:1] == '!':
        line = line[1:]
    locals = self.curframe_locals
    globals = self.curframe.f_globals
    try:
        code = compile(line + '\n', '<stdin>', 'single')
        # Redirect stdio to the debugger's streams so the statement's
        # I/O goes through the pdb session, then restore afterwards.
        save_stdout = sys.stdout
        save_stdin = sys.stdin
        save_displayhook = sys.displayhook
        try:
            sys.stdin = self.stdin
            sys.stdout = self.stdout
            sys.displayhook = self.displayhook
            exec code in globals, locals
        finally:
            sys.stdout = save_stdout
            sys.stdin = save_stdin
            sys.displayhook = save_displayhook
    except:
        t, v = sys.exc_info()[:2]
        # Python 2 allowed string exceptions; handle both forms.
        if type(t) == type(''):
            exc_type_name = t
        else:
            exc_type_name = t.__name__
        print >>self.stdout, '***', exc_type_name + ':', v
def __exit__(self, exc_type, exc_value, tb):
    """Verify that the expected exception type was raised inside the
    `with` block and, when a regexp is configured, that its message
    matches.  Unexpected exception types propagate."""
    if exc_type is None:
        try:
            exc_name = self.expected.__name__
        except AttributeError:
            exc_name = str(self.expected)
        raise self.failureException(
            "{0} not raised".format(exc_name))
    if not issubclass(exc_type, self.expected):
        # let unexpected exceptions pass through
        return False
    self.exception = exc_value  # store for later retrieval
    if self.expected_regexp is None:
        return True

    pattern_obj = self.expected_regexp
    if isinstance(pattern_obj, basestring):
        pattern_obj = re.compile(pattern_obj)
    if not pattern_obj.search(str(exc_value)):
        raise self.failureException('"%s" does not match "%s"' %
                                    (pattern_obj.pattern, str(exc_value)))
    return True
def descriptions(self, group_pattern):
    """Get descriptions for a range of groups."""
    line_pat = re.compile("^(?P<group>[^ \t]+)[ \t]+(.*)$")
    # Try the more std (acc. to RFC2980) LIST NEWSGROUPS first
    resp, raw_lines = self.longcmd('LIST NEWSGROUPS ' + group_pattern)
    if resp[:3] != "215":
        # Now the deprecated XGTITLE. This either raises an error
        # or succeeds with the same output structure as LIST
        # NEWSGROUPS.
        resp, raw_lines = self.longcmd('XGTITLE ' + group_pattern)
    # Keep only lines of the form '<group> <description>'.
    lines = []
    for raw_line in raw_lines:
        parsed = line_pat.search(raw_line.strip())
        if parsed:
            lines.append(parsed.group(1, 2))
    return resp, lines
def xhdr(self, hdr, str, file=None):
    """Process an XHDR command (optional server extension).
    Arguments:
    - hdr: the header type (e.g. 'subject')
    - str: an article nr, a message id, or a range nr1-nr2
    Returns:
    - resp: server response if successful
    - list: list of (nr, value) strings"""
    pat = re.compile('^([0-9]+) ?(.*)\n?')
    resp, lines = self.longcmd('XHDR ' + hdr + ' ' + str, file)
    # Rewrite parseable lines in place as (nr, value) pairs; leave
    # anything else untouched.
    for idx, line in enumerate(lines):
        parsed = pat.match(line)
        if parsed:
            lines[idx] = parsed.group(1, 2)
    return resp, lines
def parse150(resp):
    '''Parse the '150' response for a RETR request.
    Returns the expected transfer size or None; size is not guaranteed to
    be present in the 150 message.
    (Python 2 code: old raise syntax and `long` fallback.)
    '''
    if resp[:3] != '150':
        raise error_reply, resp
    global _150_re
    if _150_re is None:
        # Compile lazily on first use; cached in the module global.
        import re
        _150_re = re.compile("150 .* \((\d+) bytes\)", re.IGNORECASE)
    m = _150_re.match(resp)
    if not m:
        return None
    s = m.group(1)
    try:
        return int(s)
    except (OverflowError, ValueError):
        # Size too large for a Python 2 int: fall back to long.
        return long(s)
def parse227(resp):
    '''Parse the '227' response for a PASV request.
    Raises error_proto if it does not contain '(h1,h2,h3,h4,p1,p2)'
    Return ('host.addr.as.numbers', port#) tuple.
    (Python 2 code: old raise syntax.)'''
    if resp[:3] != '227':
        raise error_reply, resp
    global _227_re
    if _227_re is None:
        # Compile lazily on first use; cached in the module global.
        import re
        _227_re = re.compile(r'(\d+),(\d+),(\d+),(\d+),(\d+),(\d+)')
    m = _227_re.search(resp)
    if not m:
        raise error_proto, resp
    numbers = m.groups()
    host = '.'.join(numbers[:4])
    # Port is transmitted as two bytes: p1*256 + p2.
    port = (int(numbers[4]) << 8) + int(numbers[5])
    return host, port
def find_templates():
    """
    Load python modules from templates directory and get templates list

    :return: list of tuples (pairs):
        [(compiled regex, lambda regex_match: return message_data)]
    """
    templates = []
    # BUG FIX: str.rstrip strips a trailing *character set*, not a suffix,
    # so rstrip('__init__.py') / rstrip('.py') mangled any name whose stem
    # ends in one of those characters (e.g. 'copy.py' -> 'co').  Use
    # dirname/join and explicit suffix slicing instead.
    source_file = inspect.getsourcefile(lambda: 0)
    templates_directory = os.path.join(os.path.dirname(source_file),
                                       'templates')
    template_files = os.listdir(templates_directory)
    for template_file in template_files:
        if template_file.startswith('.') or not template_file.endswith('.py'):
            continue
        module_name = template_file[:-len('.py')]
        # Hack for dev development and disutils
        try:
            template_module = importlib.import_module(
                'templates.{}'.format(module_name))
        except ImportError:
            template_module = importlib.import_module(
                'ross.templates.{}'.format(module_name))
        # Iterate throw items in template.
        # If there are variable ends with 'templates',
        # extend templates list with it.
        for (name, content) in template_module.__dict__.items():
            if name.endswith('templates'):
                for (regex_text, data_func) in content:
                    templates.append((re.compile(regex_text, re.IGNORECASE),
                                      data_func))
    return templates
def list_nics(nic_type=None):
    """Return a list of nics of given type(s)"""
    if isinstance(nic_type, six.string_types):
        int_types = [nic_type]
    else:
        int_types = nic_type

    interfaces = []
    if nic_type:
        for int_type in int_types:
            cmd = ['ip', 'addr', 'show', 'label', int_type + '*']
            ip_output = subprocess.check_output(cmd).decode('UTF-8')
            ip_output = ip_output.split('\n')
            ip_output = (line for line in ip_output if line)
            # Hoisted: this pattern is constant per int_type, so compile it
            # once per type instead of rebuilding it for every output line.
            # It captures VLAN sub-interfaces like 'eth0.10'.
            vlan_re = re.compile('.*: (' + int_type + r'[0-9]+\.[0-9]+)@.*')
            for line in ip_output:
                if line.split()[1].startswith(int_type):
                    matched = vlan_re.search(line)
                    if matched:
                        iface = matched.groups()[0]
                    else:
                        iface = line.split()[1].replace(":", "")

                    if iface not in interfaces:
                        interfaces.append(iface)
    else:
        cmd = ['ip', 'a']
        ip_output = subprocess.check_output(cmd).decode('UTF-8').split('\n')
        ip_output = (line.strip() for line in ip_output if line)

        # Interface header lines look like 'N: <name>:'.
        key = re.compile('^[0-9]+:\s+(.+):')
        for line in ip_output:
            matched = key.search(line)
            if matched:
                iface = matched.group(1)
                # Drop any '@parent' suffix.
                iface = iface.partition("@")[0]
                if iface not in interfaces:
                    interfaces.append(iface)

    return interfaces
def rmq_wait_for_cluster(self, deployment, init_sleep=15, timeout=1200):
    """Wait for rmq units extended status to show cluster readiness,
    after an optional initial sleep period.  Initial sleep is likely
    necessary to be effective following a config change, as status
    message may not instantly update to non-ready."""
    if init_sleep:
        time.sleep(init_sleep)
    ready_re = re.compile('^Unit is ready and clustered$')
    deployment._auto_wait_for_status(message=ready_re,
                                     timeout=timeout,
                                     include_only=['rabbitmq-server'])
def splituser(host):
    '''urllib.splituser(), but six's support of this seems broken'''
    # Split 'user@host' at the LAST '@' (leading group is greedy).
    found = re.compile('^(.*)@(.*)$').match(host)
    if found:
        return found.group(1, 2)
    return None, host
def splitpasswd(user):
    '''urllib.splitpasswd(), but six's support of this is missing'''
    # Split 'user:password' at the FIRST ':'; re.S lets the password
    # contain newlines.
    found = re.compile('^([^:]*):(.*)$', re.S).match(user)
    if found:
        return found.group(1, 2)
    return user, None
def strip_tags(text, strip_punctuation=False):
    """Return only the text content of `text`, dropping HTML markup and,
    optionally, punctuation characters."""
    # Return only the words from content, stripping punctuation and HTML.
    soup = BeautifulSoup(text)
    if strip_punctuation:
        # NOTE(review): 'p' is not defined in this function; presumably a
        # module-level alias for string.punctuation -- confirm, otherwise
        # this branch raises NameError at runtime.
        punctuation = re.compile('[{}]+'.format(re.escape(p)))
        words_only = punctuation.sub('', soup.get_text())
        return words_only
    words_only = soup.get_text()
    return words_only
def __init__(self):
    """Set up the X hook thread: initial mouse/modifier state, the
    key-classification regexes, no-op event callbacks and two X display
    connections (one local, one for the record extension)."""
    threading.Thread.__init__(self)
    self.finished = threading.Event()

    # Give these some initial values
    self.mouse_position_x = 0
    self.mouse_position_y = 0
    self.ison = {"shift":False, "caps":False}

    # Compile our regex statements.
    self.isshift = re.compile('^Shift')
    self.iscaps = re.compile('^Caps_Lock')
    # Keysyms whose printed character changes when shift is held.
    self.shiftablechar = re.compile('^[a-z0-9]$|^minus$|^equal$|^bracketleft$|^bracketright$|^semicolon$|^backslash$|^apostrophe$|^comma$|^period$|^slash$|^grave$')
    # Log every key release.
    self.logrelease = re.compile('.*')
    self.isspace = re.compile('^space$')

    # Assign default function actions (do nothing).
    self.KeyDown = lambda x: True
    self.KeyUp = lambda x: True
    self.MouseAllButtonsDown = lambda x: True
    self.MouseAllButtonsUp = lambda x: True

    self.contextEventMask = [X.KeyPress,X.MotionNotify]

    # Hook to our display.
    self.local_dpy = display.Display()
    self.record_dpy = display.Display()
def _search_for_query(self, query):
    """Return (and cache) a compiled fuzzy-search callable for `query`:
    it matches strings containing the query's characters in order,
    case-insensitively."""
    cached = self._search_pattern_cache.get(query)
    if cached is not None:
        return cached

    # Build pattern: include all characters, lazily skipping anything
    # between consecutive query characters.
    pattern = ''.join('.*?{0}'.format(re.escape(ch)) for ch in query)
    search = re.compile(pattern, re.IGNORECASE).search
    self._search_pattern_cache[query] = search
    return search
def defSyntax(self): '''Define re patterns according to syntax.''' #------------------REGEX patterns------------------ if self.syntax=='markdown': self._img_re=re.compile('^(.*)!\\[(.+?)\\]\\((.+?)\\)', re.M | re.L) self._h_re_base = r''' (^(.+)[ \t]*\n(=+|-+)[ \t]*\n+) | (^(\#{%s}) # \1 = string of #'s [ \t]* (.+?) # \2 = Header text [ \t]* (?<!\\) # ensure not an escaped trailing '#' \#* # optional closing #'s (not counted) \n+ ) ''' self._all_h_re=re.compile(self._h_re_base %'1,6', re.X | re.M) elif self.syntax=='zim': self._img_re=re.compile('^(.*)\\{\\{(.+?)\\}\\}(.*)$', re.M | re.L) self._h_re_base = r''' ^(\={%s}) # \1 = string of ='s [ \t]* (.+?) # \2 = Header text [ \t]* \1 \n+ ''' self._all_h_re=re.compile(self._h_re_base %'1,6', re.X | re.M) else: raise Exception("Unknown syntax %s" %self.syntax) return
def createNoteBook(title,geeknote=None,verbose=True):
    """Create an Evernote notebook named `title` unless it already exists.
    Returns 0 on success or when the notebook exists; exits with an error
    status otherwise.  (Python 2 code: uses the `unicode` builtin.)"""
    #-------------------Trunc title-------------------
    title=title.strip()
    title=truncStr(title,MAX_NOTEBOOK_TITLE_LEN)
    #-------Make sure title doesnt start with #-------
    # Strip a leading markdown heading marker by keeping only the header
    # text capture group.
    tp=textparse.TextParser('markdown')
    _h_re=re.compile(tp._h_re_base %'1,', re.X | re.M)
    m=_h_re.match(title)
    if m:
        title=m.group(6)
    #---------------------Connect---------------------
    if geeknote is None:
        geeknote=GeekNoteConnector()
        geeknote.connectToEvertone()
    #-----------------Check if exists-----------------
    notebooks=geeknote.getEvernote().findNotebooks()
    out.preloader.stop()
    # Compare titles as unicode to avoid str/unicode mismatches.
    if not isinstance(title,unicode):
        title=unicode(title,'utf8')
    notebooks=[unicode(ii.name,'utf8') for ii in notebooks]
    if title in notebooks:
        out.successMessage('Notebook already exists.')
        return 0
    else:
        out.preloader.setMessage("Creating notebook...")
        result = geeknote.getEvernote().createNotebook(name=title)
        if result:
            out.successMessage("Notebook has been successfully created.")
            return 0
        else:
            out.failureMessage("Error while the process "
                               "of creating the notebook.")
            return tools.exitErr()