The following code examples, extracted from open-source Python projects, illustrate how to use string.replace().
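Note that string.replace() is a helper in the legacy string module and exists only in Python 2; it was removed in Python 3, where the str.replace() method is the replacement, so the examples below are Python 2 code. A minimal sketch of both spellings, using illustrative values:

# Python 2 only: module-level helper from the legacy string module.
import string
s = "path/to/file"
print string.replace(s, '/', '\\')   # -> path\to\file

# Preferred form (works in Python 2 and 3): the str.replace() method.
print(s.replace('/', '\\'))          # -> path\to\file
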
def eval_python(self, result):
    code = result.group('code')
    code = string.replace(code, '\t', '    ')
    try:
        if self.local_dict:
            result = eval(code, self.global_dict, self.local_dict)
        else:
            result = eval(code, self.global_dict)
        return str(result)
    except:
        self.errorLogger('\n---- Error parsing: ----\n')
        self.errorLogger(code)
        self.errorLogger('\n------------------------\n')
        raise

#---------------------------------------------------------------------------
# This routine is only called when OUTPUT() is included in executed Python
# code from the templates. It evaluates its parameter as if it was a
# template and appends the result to the OUTPUT_TEXT variable in the global
# dictionary.

def _extract_file_info(self, _file):
    """Extract file information."""
    _file = json.loads(_file)
    info = {'filename': _file['name'],
            'link_raw': string.replace(_file['link'], 'dl=0', 'raw=1'),
            'link': _file['link']}
    if self._is_image_file(_file['name']):
        extra_fields = {'url_m': info['link_raw'],
                        'url_b': info['link_raw'],
                        'title': info['filename']}
        info.update(extra_fields)
    if self._is_video_file(_file['name']):
        url = self._create_raw_cors_link(_file['link'])
        extra_fields = {'video_url': url}
        info.update(extra_fields)
    if self._is_audio_file(_file['name']):
        url = self._create_raw_cors_link(_file['link'])
        extra_fields = {'audio_url': url}
        info.update(extra_fields)
    if self._is_pdf_file(_file['name']):
        url = self._create_raw_cors_link(_file['link'])
        extra_fields = {'pdf_url': url}
        info.update(extra_fields)
    return {'info': info}

def test_tasks_attributes_for_image_files(self):
    # For image file extensions: link, filename, url, url_m, url_b, title
    image_ext = ['png', 'jpg', 'jpeg', 'gif']
    file_data = 'myfile.extension'
    for ext in image_ext:
        data = string.replace(file_data, 'extension', ext)
        form_data = {'files': [data], 'bucket': 'mybucket'}
        tasks = BulkTaskS3Import(**form_data).tasks()
        assert tasks[0]['info']['filename'] == "myfile.%s" % ext
        assert tasks[0]['info']['link'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
        assert tasks[0]['info']['url'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
        assert tasks[0]['info']['url_m'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
        assert tasks[0]['info']['url_b'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
        assert tasks[0]['info']['title'] == "myfile.%s" % ext

def test_tasks_attributes_for_video_files(self):
    # For video file extensions: link, filename, url, video_url
    video_ext = ['mp4', 'm4v', 'ogg', 'ogv', 'webm', 'avi']
    file_data = 'myfile.extension'
    for ext in video_ext:
        data = string.replace(file_data, 'extension', ext)
        form_data = {'files': [data], 'bucket': 'mybucket'}
        tasks = BulkTaskS3Import(**form_data).tasks()
        assert tasks[0]['info']['filename'] == "myfile.%s" % ext
        assert tasks[0]['info']['link'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
        assert tasks[0]['info']['url'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
        assert tasks[0]['info']['video_url'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext

def test_tasks_attributes_for_audio_files(self):
    # For audio file extensions: link, filename, url, audio_url
    audio_ext = ['mp4', 'm4a', 'mp3', 'ogg', 'oga', 'webm', 'wav']
    file_data = 'myfile.extension'
    for ext in audio_ext:
        data = string.replace(file_data, 'extension', ext)
        form_data = {'files': [data], 'bucket': 'mybucket'}
        tasks = BulkTaskS3Import(**form_data).tasks()
        assert tasks[0]['info']['filename'] == "myfile.%s" % ext
        assert tasks[0]['info']['link'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
        assert tasks[0]['info']['url'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
        assert tasks[0]['info']['audio_url'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext

def test_tasks_attributes_for_video_files(self):
    # For video file extensions: link, filename, link_raw, video_url
    video_ext = ['mp4', 'm4v', 'ogg', 'ogv', 'webm', 'avi']
    file_data = (u'{"bytes":286,'
                 u'"link":"https://www.dropbox.com/s/l2b77qvlrequ6gl/test.extension?dl=0",'
                 u'"name":"test.extension",'
                 u'"icon":"https://www.dropbox.com/static/images/icons64/page_white_text.png"}')
    for ext in video_ext:
        data = string.replace(file_data, 'extension', ext)
        form_data = {'files': [data]}
        tasks = BulkTaskDropboxImport(**form_data).tasks()
        assert tasks[0]['info']['filename'] == "test.%s" % ext
        assert tasks[0]['info']['link'] == "https://www.dropbox.com/s/l2b77qvlrequ6gl/test.%s?dl=0" % ext
        assert tasks[0]['info']['link_raw'] == "https://www.dropbox.com/s/l2b77qvlrequ6gl/test.%s?raw=1" % ext
        assert tasks[0]['info']['video_url'] == "https://dl.dropboxusercontent.com/s/l2b77qvlrequ6gl/test.%s" % ext

def test_tasks_attributes_for_audio_files(self):
    # For audio file extensions: link, filename, link_raw, audio_url
    audio_ext = ['mp4', 'm4a', 'mp3', 'ogg', 'oga', 'webm', 'wav']
    file_data = (u'{"bytes":286,'
                 u'"link":"https://www.dropbox.com/s/l2b77qvlrequ6gl/test.extension?dl=0",'
                 u'"name":"test.extension",'
                 u'"icon":"https://www.dropbox.com/static/images/icons64/page_white_text.png"}')
    for ext in audio_ext:
        data = string.replace(file_data, 'extension', ext)
        form_data = {'files': [data]}
        tasks = BulkTaskDropboxImport(**form_data).tasks()
        assert tasks[0]['info']['filename'] == "test.%s" % ext
        assert tasks[0]['info']['link'] == "https://www.dropbox.com/s/l2b77qvlrequ6gl/test.%s?dl=0" % ext
        assert tasks[0]['info']['link_raw'] == "https://www.dropbox.com/s/l2b77qvlrequ6gl/test.%s?raw=1" % ext
        assert tasks[0]['info']['audio_url'] == "https://dl.dropboxusercontent.com/s/l2b77qvlrequ6gl/test.%s" % ext

def ParseWhois_INT(self):
    int_contacts = (
        {"page_field": "Registrant", "rec_field": "registrant"},
        {"page_field": "Administrative Contact", "rec_field": "administrative"},
        {"page_field": "Technical Contact", "rec_field": "technical"})
    page = string.replace(self.page, "\r\n", "\n")
    for contact in int_contacts:
        page_field = contact['page_field']
        s = "%s:(.*)\n\W" % page_field
        m = re.search(s, page, re.DOTALL)
        #if m: print m.group(1)
        print "-------------------"

##
## ----------------------------------------------------------------------
##
## ----------------------------------------------------------------------
##

def strToHex(string):
    """
    @param string: string to be converted into its hexadecimal value.
    @type string: C{str}

    @return: the hexadecimal converted string.
    @rtype: C{str}
    """
    hexStr = ""
    for character in string:
        if character == "\n":
            character = " "
        hexChar = "%2x" % ord(character)
        hexChar = hexChar.replace(" ", "0")
        hexChar = hexChar.upper()
        hexStr += hexChar
    return hexStr

def fileToStr(fileName):
    """
    @param fileName: file path to read the content and return as a no NEWLINE string.
    @type fileName: C{file.open}

    @return: the file content as a string without TAB and NEWLINE.
    @rtype: C{str}
    """
    filePointer = open(fileName, "r")
    fileText = filePointer.read()
    fileText = fileText.replace(" ", "")
    fileText = fileText.replace("\t", "")
    fileText = fileText.replace("\r", "")
    fileText = fileText.replace("\n", " ")
    return fileText

def _do_attr(self, n, value):
    ''''_do_attr(self, node) -> None
    Process an attribute.'''

    W = self.write
    W(' ')
    W(n)
    W('="')
    s = string.replace(value, "&", "&amp;")
    s = string.replace(s, "<", "&lt;")
    s = string.replace(s, '"', '&quot;')
    s = string.replace(s, '\011', '&#x9;')
    s = string.replace(s, '\012', '&#xA;')
    s = string.replace(s, '\015', '&#xD;')
    W(s)
    W('"')

def more(self):
    esc_from = self.esc_from
    esc_to = self.esc_to
    buffer = self.buffer + self.producer.more()
    if buffer:
        buffer = string.replace(buffer, esc_from, esc_to)
        i = self.find_prefix_at_end(buffer, esc_from)
        if i:
            # we found a prefix
            self.buffer = buffer[-i:]
            return buffer[:-i]
        else:
            # no prefix, return it all
            self.buffer = b''
            return buffer
    else:
        return buffer

def do_put(self, s):
    try:
        params = s.split(' ')
        if len(params) > 1:
            src_path = params[0]
            dst_path = params[1]
        elif len(params) == 1:
            src_path = params[0]
            dst_path = ''

        src_file = os.path.basename(src_path)
        fh = open(src_path, 'rb')
        dst_path = string.replace(dst_path, '/', '\\')
        import ntpath
        pathname = ntpath.join(ntpath.join(self.__pwd, dst_path), src_file)
        drive, tail = ntpath.splitdrive(pathname)
        logging.info("Uploading %s to %s" % (src_file, pathname))
        self.__transferClient.putFile(drive[:-1] + '$', tail, fh.read)
        fh.close()
    except Exception, e:
        logging.critical(str(e))
        pass

def do_cat(self, line, command=sys.stdout.write):
    pathName = string.replace(line, '/', '\\')
    pathName = ntpath.normpath(ntpath.join(self.pwd, pathName))
    res = self.findPathName(pathName)
    if res is None:
        logging.error("Not found!")
        return
    if res.isDirectory() > 0:
        logging.error("It's a directory!")
        return
    if res.isCompressed() or res.isEncrypted() or res.isSparse():
        logging.error('Cannot handle compressed/encrypted/sparse files! :(')
        return
    stream = res.getStream(None)
    chunks = 4096 * 10
    written = 0
    for i in range(stream.getDataSize() / chunks):
        buf = stream.read(i * chunks, chunks)
        written += len(buf)
        command(buf)
    if stream.getDataSize() % chunks:
        buf = stream.read(written, stream.getDataSize() % chunks)
        command(buf)
    logging.info("%d bytes read" % stream.getDataSize())

def do_put(self, s):
    try:
        if self.transferClient is None:
            self.connect_transferClient()

        params = s.split(' ')
        if len(params) > 1:
            src_path = params[0]
            dst_path = params[1]
        elif len(params) == 1:
            src_path = params[0]
            dst_path = '/'

        src_file = os.path.basename(src_path)
        fh = open(src_path, 'rb')
        f = dst_path + '/' + src_file
        pathname = string.replace(f, '/', '\\')
        logging.info("Uploading %s to %s\%s" % (src_file, self.share, dst_path))
        self.transferClient.putFile(self.share, pathname, fh.read)
        fh.close()
    except Exception, e:
        logging.error(str(e))
        pass

    self.send_data('\r\n')

def do_put(self, s):
    try:
        if self.transferClient is None:
            self.connect_transferClient()

        params = s.split(' ')
        if len(params) > 1:
            src_path = params[0]
            dst_path = params[1]
        elif len(params) == 1:
            src_path = params[0]
            dst_path = '/'

        src_file = os.path.basename(src_path)
        fh = open(src_path, 'rb')
        f = dst_path + '/' + src_file
        pathname = string.replace(f, '/', '\\')
        logging.info("Uploading %s to %s\%s" % (src_file, self.share, dst_path))
        self.transferClient.putFile(self.share, pathname.decode(sys.stdin.encoding), fh.read)
        fh.close()
    except Exception, e:
        logging.error(str(e))
        pass

    self.send_data('\r\n')

def copy_file(self, src, tree, dst):
    LOG.info("Uploading file %s" % dst)
    if isinstance(src, str):
        # We have a filename
        fh = open(src, 'rb')
    else:
        # We have a class instance, it must have a read method
        fh = src
    f = dst
    pathname = string.replace(f, '/', '\\')
    try:
        self.connection.putFile(tree, pathname, fh.read)
    except:
        LOG.critical("Error uploading file %s, aborting....." % dst)
        raise
    fh.close()

def retr_file(self, service, filename, callback, mode=FILE_OPEN, offset=0, password=None, shareAccessMode=SMB_ACCESS_READ):
    filename = string.replace(filename, '/', '\\')
    fid = -1
    tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
    try:
        fid = self.nt_create_andx(tid, filename, shareAccessMode=shareAccessMode, accessMask=0x20089)

        res = self.query_file_info(tid, fid)
        datasize = SMBQueryFileStandardInfo(res)['EndOfFile']

        self.__nonraw_retr_file(tid, fid, offset, datasize, callback)
    finally:
        if fid >= 0:
            self.close(tid, fid)
        self.disconnect_tree(tid)

def check_dir(self, service, path, password=None):
    path = string.replace(path, '/', '\\')
    tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
    try:
        smb = NewSMBPacket()
        smb['Tid'] = tid
        smb['Mid'] = 0

        cmd = SMBCommand(SMB.SMB_COM_CHECK_DIRECTORY)
        cmd['Parameters'] = ''
        cmd['Data'] = SMBCheckDirectory_Data(flags=self.__flags2)
        cmd['Data']['DirectoryName'] = path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else path
        smb.addCommand(cmd)

        self.sendSMB(smb)
        while 1:
            s = self.recvSMB()
            if s.isValidAnswer(SMB.SMB_COM_CHECK_DIRECTORY):
                return
    finally:
        self.disconnect_tree(tid)

def rmdir(self, service, path, password=None):
    path = string.replace(path, '/', '\\')
    # Check that the directory exists
    self.check_dir(service, path, password)

    tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
    try:
        path = path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else path

        smb = NewSMBPacket()
        smb['Tid'] = tid
        createDir = SMBCommand(SMB.SMB_COM_DELETE_DIRECTORY)
        createDir['Data'] = SMBDeleteDirectory_Data(flags=self.__flags2)
        createDir['Data']['DirectoryName'] = path
        smb.addCommand(createDir)

        self.sendSMB(smb)
        while 1:
            s = self.recvSMB()
            if s.isValidAnswer(SMB.SMB_COM_DELETE_DIRECTORY):
                return
    finally:
        self.disconnect_tree(tid)

def mkdir(self, service, path, password=None):
    path = string.replace(path, '/', '\\')
    tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
    try:
        path = path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else path

        smb = NewSMBPacket()
        smb['Tid'] = tid
        smb['Mid'] = 0
        createDir = SMBCommand(SMB.SMB_COM_CREATE_DIRECTORY)
        createDir['Data'] = SMBCreateDirectory_Data(flags=self.__flags2)
        createDir['Data']['DirectoryName'] = path
        smb.addCommand(createDir)

        self.sendSMB(smb)
        smb = self.recvSMB()
        if smb.isValidAnswer(SMB.SMB_COM_CREATE_DIRECTORY):
            return 1
        return 0
    finally:
        self.disconnect_tree(tid)

def rename(self, service, old_path, new_path, password=None):
    old_path = string.replace(old_path, '/', '\\')
    new_path = string.replace(new_path, '/', '\\')
    tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
    try:
        smb = NewSMBPacket()
        smb['Tid'] = tid
        smb['Mid'] = 0

        renameCmd = SMBCommand(SMB.SMB_COM_RENAME)
        renameCmd['Parameters'] = SMBRename_Parameters()
        renameCmd['Parameters']['SearchAttributes'] = ATTR_SYSTEM | ATTR_HIDDEN | ATTR_DIRECTORY
        renameCmd['Data'] = SMBRename_Data(flags=self.__flags2)
        renameCmd['Data']['OldFileName'] = old_path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else old_path
        renameCmd['Data']['NewFileName'] = new_path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else new_path
        smb.addCommand(renameCmd)

        self.sendSMB(smb)
        smb = self.recvSMB()
        if smb.isValidAnswer(SMB.SMB_COM_RENAME):
            return 1
        return 0
    finally:
        self.disconnect_tree(tid)

def rmdir(self, shareName, pathName, password=None):
    # ToDo: Handle situations where share is password protected
    pathName = string.replace(pathName, '/', '\\')
    pathName = ntpath.normpath(pathName)
    if len(pathName) > 0 and pathName[0] == '\\':
        pathName = pathName[1:]

    treeId = self.connectTree(shareName)
    fileId = None
    try:
        fileId = self.create(treeId, pathName,
                             desiredAccess=DELETE | FILE_READ_ATTRIBUTES | SYNCHRONIZE,
                             shareMode=FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE,
                             creationOptions=FILE_DIRECTORY_FILE | FILE_OPEN_REPARSE_POINT,
                             creationDisposition=FILE_OPEN, fileAttributes=0)
        from impacket import smb
        delete_req = smb.SMBSetFileDispositionInfo()
        delete_req['DeletePending'] = True
        self.setInfo(treeId, fileId, inputBlob=delete_req, fileInfoClass=SMB2_FILE_DISPOSITION_INFO)
    finally:
        if fileId is not None:
            self.close(treeId, fileId)
        self.disconnectTree(treeId)

    return True

def remove(self, shareName, pathName, password=None):
    # ToDo: Handle situations where share is password protected
    pathName = string.replace(pathName, '/', '\\')
    pathName = ntpath.normpath(pathName)
    if len(pathName) > 0 and pathName[0] == '\\':
        pathName = pathName[1:]

    treeId = self.connectTree(shareName)
    fileId = None
    try:
        fileId = self.create(treeId, pathName, DELETE | FILE_READ_ATTRIBUTES,
                             FILE_SHARE_DELETE,
                             FILE_NON_DIRECTORY_FILE | FILE_DELETE_ON_CLOSE,
                             FILE_OPEN, 0)
    finally:
        if fileId is not None:
            self.close(treeId, fileId)
        self.disconnectTree(treeId)

    return True

def storeFile(self, shareName, path, callback, mode=FILE_OVERWRITE_IF, offset=0, password=None, shareAccessMode=FILE_SHARE_WRITE):
    # ToDo: Handle situations where share is password protected
    path = string.replace(path, '/', '\\')
    path = ntpath.normpath(path)
    if len(path) > 0 and path[0] == '\\':
        path = path[1:]

    treeId = self.connectTree(shareName)
    fileId = None
    try:
        fileId = self.create(treeId, path, FILE_WRITE_DATA, shareAccessMode, FILE_NON_DIRECTORY_FILE, mode, 0)
        finished = False
        writeOffset = offset
        while not finished:
            data = callback(self._Connection['MaxWriteSize'])
            if len(data) == 0:
                break
            written = self.write(treeId, fileId, data, writeOffset, len(data))
            writeOffset += written
    finally:
        if fileId is not None:
            self.close(treeId, fileId)
        self.disconnectTree(treeId)

def save_plot(self, plot_name, **kwargs):
    logger = logging.getLogger("plotting")
    if plot_name not in self.plots:
        logger.warn('Tried to generate a plot called %s that does not exist' % plot_name)
        # raise an exception here?
    else:
        # # the filename to save to is known by the handler, which needs to be assigned to this logger
        # # look at the handlers attached to this logger instance
        # ph = None
        # for h in self.handlers:
        #     # we want an instance of a PlotHandler - we'll take the first one we find
        #     # (behaviour will be unpredictable if there is more than one handler of this type)
        #     if isinstance(h, PlotHandler):
        #         ph = h
        #         break
        # if ph:
        # TO DO - need to be sure of safe file names
        if not os.path.isdir(self.plot_path):
            os.makedirs(self.plot_path)
        filename = self.plot_path + "/" + string.replace(plot_name, " ", "_") + ".pdf"
        logger.info('Generating a plot in file %s' % filename)
        self.plots[plot_name].generate_plot(filename, **kwargs)
        # else:
        #     logger.warn('No handler of type PlotHandler is attached to this logger - cannot save plots')

def parseResp(self, resp):
    def fan(v):
        def var_func_00000026(x):
            return [x, (x.upper)(), (x.lower)()]
        return distinct((lambda var_0000002B: selectMany(var_func_00000026, var_0000002B))(
            [v, ((v.replace)("\\", "\\\\").replace)("\"", "\\\"")]))

    def split(v, t):
        for tag in fan(t):
            if (len(v) != 2):
                v = (v[0].split)(tag)
            else:
                break
        return v

    e = ((self.injector).emitter)
    if ((e.prefix) == None):
        return resp
    p = split([resp], (e.prefix))
    if (len(p) < 2):
        return resp
    return split([p[1]], (e.suffix))[0]

def retr_file(self, service, filename, callback, mode=SMB_O_OPEN, offset=0, password=None, timeout=None):
    filename = string.replace(filename, '/', '\\')
    fid = -1
    tid = self.__connect_tree('\\\\' + self.__remote_name + '\\' + service, SERVICE_ANY, password, timeout)
    try:
        fid, attrib, lastwritetime, datasize, grantedaccess, filetype, devicestate, action, serverfid = self.__open_file(tid, filename, mode, SMB_ACCESS_READ | SMB_SHARE_DENY_WRITE)

        #if not datasize:
        datasize = self.__query_file_info(tid, fid)

        if self.__can_read_raw:
            self.__raw_retr_file(tid, fid, offset, datasize, callback)
        else:
            self.__nonraw_retr_file(tid, fid, offset, datasize, callback, timeout)
    finally:
        if fid >= 0:
            self.__close_file(tid, fid)
        self.__disconnect_tree(tid)

def stor_file(self, service, filename, callback, mode=SMB_O_CREAT | SMB_O_TRUNC, offset=0, password=None, timeout=None):
    filename = string.replace(filename, '/', '\\')
    fid = -1
    tid = self.__connect_tree('\\\\' + self.__remote_name + '\\' + service, SERVICE_ANY, password, timeout)
    try:
        fid, attrib, lastwritetime, datasize, grantedaccess, filetype, devicestate, action, serverfid = self.__open_file(tid, filename, mode, SMB_ACCESS_WRITE | SMB_SHARE_DENY_WRITE)

        # If the max_transmit buffer size is more than 16KB, upload process using non-raw mode is actually
        # faster than using raw-mode.
        if self.__max_transmit_size < 16384 and self.__can_write_raw:
            # Once the __raw_stor_file returns, fid is already closed
            self.__raw_stor_file(tid, fid, offset, datasize, callback, timeout)
            fid = -1
        else:
            self.__nonraw_stor_file(tid, fid, offset, datasize, callback, timeout)
    finally:
        if fid >= 0:
            self.__close_file(tid, fid)
        self.__disconnect_tree(tid)

def googlesearch(query, ext):
    print query
    google = "https://www.google.co.in/search?filter=0&q=site:"
    getrequrl = "https://www.google.co.in/search?filter=0&num=100&q=%s&start=" % (query)
    hdr = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
           'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
           'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
           'Accept-Encoding': 'none',
           'Accept-Language': 'en-US,en;q=0.8',
           'Connection': 'keep-alive'}
    req = urllib2.Request(getrequrl, headers=hdr)
    response = urllib2.urlopen(req)
    data = response.read()
    data = re.sub('<b>', '', data)
    for e in ('>', '=', '<', '\\', '(', ')', '"', 'http', ':', '//'):
        data = string.replace(data, e, ' ')
    r1 = re.compile('[-_.a-zA-Z0-9.-_]*' + '\.' + ext)
    res = r1.findall(data)
    if res == []:
        print "No results were found"
    else:
        return res

def test_inplace_rewrites(self):
    # Check that strings don't copy and modify cached single-character strings
    self.checkequal('a', 'A', 'lower')
    self.checkequal(True, 'A', 'isupper')
    self.checkequal('A', 'a', 'upper')
    self.checkequal(True, 'a', 'islower')
    self.checkequal('a', 'A', 'replace', 'A', 'a')
    self.checkequal(True, 'A', 'isupper')
    self.checkequal('A', 'a', 'capitalize')
    self.checkequal(True, 'a', 'islower')
    self.checkequal('A', 'a', 'swapcase')
    self.checkequal(True, 'a', 'islower')
    self.checkequal('A', 'a', 'title')
    self.checkequal(True, 'a', 'islower')

def _ExpandVariables(self, data, substitutions):
    """Expands variables "$(variable)" in data.

    Args:
      data: object, can be either string, list or dictionary
      substitutions: dictionary, variable substitutions to perform

    Returns:
      Copy of data where each references to "$(variable)" has been replaced
      by the corresponding value found in substitutions, or left intact if
      the key was not found.
    """
    if isinstance(data, str):
        for key, value in substitutions.iteritems():
            data = data.replace('$(%s)' % key, value)
        return data
    if isinstance(data, list):
        return [self._ExpandVariables(v, substitutions) for v in data]
    if isinstance(data, dict):
        return {k: self._ExpandVariables(data[k], substitutions) for k in data}
    return data

def format_invoke_command(self, string):
    """
    Formats correctly the string output from the invoke() method,
    replacing line breaks and tabs when necessary.
    """
    string = string.replace('\\n', '\n')

    formated_response = ''
    for line in string.splitlines():
        if line.startswith('REPORT'):
            line = line.replace('\t', '\n')
        if line.startswith('[DEBUG]'):
            line = line.replace('\t', ' ')
        formated_response += line + '\n'
    formated_response = formated_response.replace('\n\n', '\n')

    return formated_response

def is_http_log_entry(self, string):
    """
    Determines if a log entry is an HTTP-formatted log string or not.
    """
    # Debug event filter
    if 'Zappa Event' in string:
        return False

    # IP address filter
    for token in string.replace('\t', ' ').split(' '):
        try:
            if (token.count('.') is 3 and token.replace('.', '').isnumeric()):
                return True
        except Exception:  # pragma: no cover
            pass

    return False

def readItems(data_dir):
    fr = open(data_dir, 'r')
    alllines = fr.readlines()
    num = len(alllines)
    cmv = np.zeros((num, 3))
    imgID = []
    for i in range(num):
        line = alllines[i]
        temp = string.replace(line, '\r', '')
        temp = string.replace(temp, '\n', '')
        temp = temp.split(' ')
        imgID.append(temp[2])
        cmv[i, :] = [temp[0], temp[1], temp[3]]
    return cmv, imgID

def sub(self, s):
    for k, v in self.macros.items():
        s = string.replace(s, k, v)
    return s

def escape(s, replace=string.replace):
    s = replace(s, "&", "&amp;")
    s = replace(s, "<", "&lt;")
    return replace(s, ">", "&gt;",)

def _platform(*args):
    """ Helper to format the platform string in a filename
        compatible format e.g. "system-version-machine".
    """
    # Format the platform string
    platform = string.join(map(string.strip, filter(len, args)), '-')

    # Cleanup some possible filename obstacles...
    replace = string.replace
    platform = replace(platform, ' ', '_')
    platform = replace(platform, '/', '-')
    platform = replace(platform, '\\', '-')
    platform = replace(platform, ':', '-')
    platform = replace(platform, ';', '-')
    platform = replace(platform, '"', '-')
    platform = replace(platform, '(', '-')
    platform = replace(platform, ')', '-')

    # No need to report 'unknown' information...
    platform = replace(platform, 'unknown', '')

    # Fold '--'s and remove trailing '-'
    while 1:
        cleaned = replace(platform, '--', '-')
        if cleaned == platform:
            break
        platform = cleaned
    while platform[-1] == '-':
        platform = platform[:-1]

    return platform

def _syscmd_file(target, default=''):
    """ Interface to the system's file command.

        The function uses the -b option of the file command to have it
        omit the filename in its output and if possible the -L option
        to have the command follow symlinks. It returns default in
        case the command should fail.
    """
    if sys.platform in ('dos', 'win32', 'win16', 'os2'):
        # XXX Others too ?
        return default
    target = _follow_symlinks(target).replace('"', '\\"')
    try:
        f = os.popen('file "%s" 2> %s' % (target, DEV_NULL))
    except (AttributeError, os.error):
        return default
    output = string.strip(f.read())
    rc = f.close()
    if not output or rc:
        return default
    else:
        return output

### Information about the used architecture

# Default values for architecture; non-empty strings override the
# defaults given as parameters

def handle_data(self, data):
    data = string.replace(data, '\r', '')
    ...

def delspace(s):
    return string.replace(s, ' ', '')

def safe(self, input):
    input = string.replace(input, "'", "\\'")
    input = string.replace(input, '"', '\\"')
    input = string.replace(input, ";", "\\;")
    input = string.replace(input, "%", "\\%")
    input = string.replace(input, "_", "\\_")
    return input

def exec_python(self, result):
    # Condition the code. Replace all tabs with four spaces. Then make
    # sure that we unindent every line by the indentation level of the
    # first line.
    code = result.group('code')
    code = string.replace(code, '\t', '    ')
    result2 = re.search(r'(?P<prefix>\n[ ]*)[#a-zA-Z0-9''"]', code)
    if not result2:
        raise ParsingError, 'Invalid template code expression: ' + code
    code = string.replace(code, result2.group('prefix'), '\n')
    code = code + '\n'
    try:
        self.global_dict['OUTPUT_TEXT'] = ''
        if self.local_dict:
            exec code in self.global_dict, self.local_dict
        else:
            exec code in self.global_dict
        return self.global_dict['OUTPUT_TEXT']
    except:
        self.errorLogger('\n---- Error parsing: ----\n')
        self.errorLogger(code)
        self.errorLogger('\n------------------------\n')
        raise

# Subroutine called from re module for every block of code to be
# evaluated. Returned the result of the evaluation (should be a string).

def getQuantity(quantity):
    quantity = string.replace(string.upper(quantity), 'K', '000')
    quantity = int(quantity)
    return quantity