我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用os.path.strip()。
def which(program): import os def is_exe(fpath): return os.path.isfile(fpath) and os.access(fpath, os.X_OK) fpath, fname = os.path.split(program) if fpath: if is_exe(program): return program else: for path in os.environ["PATH"].split(os.pathsep): path = path.strip('"') exe_file = os.path.join(path, program) if is_exe(exe_file): return exe_file return None
def which( program ): # Tries to locate a program import os if os.name == 'nt': program_ext = os.path.splitext( program )[1] if program_ext == "": prog_exe = which( program + ".exe" ) if prog_exe != None: return prog_exe return which( program + ".com" ) def is_exe(fpath): return os.path.isfile(fpath) and os.access(fpath, os.X_OK) fpath, fname = os.path.split(program) if fpath: if is_exe(program): return program else: for path in os.environ["PATH"].split(os.pathsep): path = path.strip('"') exe_file = os.path.join(path, program) if is_exe(exe_file): return exe_file return None
def which(program): def is_exe(fpath): return os.path.isfile(fpath) and os.access(fpath, os.X_OK) fpath, fname = os.path.split(program) if fpath: if is_exe(program): return program else: for path in os.environ["PATH"].split(os.pathsep): path = path.strip('"') exe_file = os.path.join(path, program) if is_exe(exe_file): return exe_file return None #----------------------------------------------------------------------
def which(program): def is_exe(fpath): return os.path.isfile(fpath) and os.access(fpath, os.X_OK) fpath, fname = os.path.split(program) if fpath: if is_exe(program): return program else: for path in os.environ["PATH"].split(os.pathsep): path = path.strip('"') exe_file = os.path.join(path, program) if is_exe(exe_file): return exe_file return None
def read_tags(self): tags = {} if loggedin_users(self.userblacklist) == 0 and os.path.exists(self.tagfile): os.remove(self.tagfile) return tags if os.path.exists(self.tagfile): finput = None try: f = open(self.tagfile) finput = f.read().strip() f.close() except: self.log.error("Cannot open tag file %s" % (self.tagfile,)) return tags for line in finput.split("\n"): if line.startswith("#") or not line.strip(): continue if not ":" in line: continue linelist = [ i.strip() for i in line.split(":") ] if len(linelist): tags.update({linelist[0] : linelist[1]}) return tags
def which(program): import os def is_exe(fpath): return os.path.isfile(fpath) and os.access(fpath, os.X_OK) fpath, fname = os.path.split(program) if fpath: if is_exe(program): return True else: for path in os.environ["PATH"].split(os.pathsep): path = path.strip('"') exe_file = os.path.join(path, program) if is_exe(exe_file): return True return "Not found"
def get(self): root = etree.Element('sitemapindex') root.attrib['xmlns'] = 'http://www.sitemaps.org/schemas/sitemap/0.9' bucket_name = 'dancedeets-hrd.appspot.com' bucket = client.get_bucket(bucket_name) recent_date = get_mapreduce_date(bucket, 'Generate FUTURE Sitemaps') root.append(sitemap_node('recent', recent_date)) versioned_date = get_mapreduce_date(bucket, 'Generate Sitemaps') path = get_newest_path(bucket, 'Generate Sitemaps') version = os.path.basename(path.strip('/')) print path, version for i in range(MapReduceShards): root.append(sitemap_node('%s-%s' % (version, i), versioned_date)) root_data = etree.tostring(root, pretty_print=True) self.response.headers["Content-Type"] = "text/xml" self.response.out.write(root_data)
def which(program): import os def is_exe(fpath): return os.path.isfile(fpath) and os.access(fpath, os.X_OK) fpath, _ = os.path.split(program) if fpath: if is_exe(program): return program else: for path in os.environ["PATH"].split(os.pathsep): path = path.strip('"') exe_file = os.path.join(path, program) if is_exe(exe_file): return exe_file return None
def LoadString(self, data): if data is None or len(data.strip()) == 0: return False flag = False try: self.rootNode = ElementTree.fromstring(data) if self.rootNode is not None: flag = True self.currentNode = self.rootNode except Exception, e: logging.error("XML??????") logging.error(e.__str__()) return flag # ??????????
def FindNode(self, path): if path is None or len(path.strip()) == 0: return XmlNode(None, self.rootNode) path = path.strip() node = None if path[0] == '/': node = self.rootNode.find(path[1:]) else: node = self.currentNode.find(path) return XmlNode(node, self.rootNode) # ?????
def FindNodes(self, path): if path is None or len(path.strip()) == 0: return XmlNode(None, self.rootNode) if path[0] == '/': nodes = self.rootNode.findall(path[1:]) else: nodes = self.currentNode.findall(path) return [XmlNode(node, self.rootNode) for node in nodes] # ???????
def GetChildrenMap(self, tag=None, keyType=XmlNodeMap.ATTR, keyName="name", valueType=XmlNodeMap.TEXT, valueName=None, valueFormat=XmlNodeValue.STRING): data = {} for node in self.GetChildrens(tag=tag): k, v = None, None if keyType == XmlNodeMap.ATTR: if keyName is None or len(keyName.strip()) == 0: continue k = node.GetAttrs().GetStr(keyName) elif keyType == XmlNodeMap.NODE: if keyName is None or len(keyName.strip()) == 0: continue t = node.FindNode(keyName) if not t.IsLoad(): continue k = t.GetStr() elif keyType == XmlNodeMap.TEXT: k = node.GetStr() else: continue if k is None or len(k.strip()) == 0: continue if valueType == XmlNodeMap.ATTR: if valueName is None or len(valueName.strip()) == 0: continue v = self.GetFormatData(node.GetAttrs(), valueFormat) elif valueType == XmlNodeMap.NODE: if valueName is None or len(valueName.strip()) == 0: continue t = node.FindNode(valueName) if t.IsLoad(): v = self.GetFormatData(t, valueFormat) elif valueType == XmlNodeMap.TEXT: v = self.GetFormatData(node, valueFormat) else: v = None data[k] = v return data # ??????
def GetStr(self, default="", strip=True): data = self.GetData() if data is None: return default try: data = str(data.encode("utf-8")) if data is None: data = default else: if strip: data = data.strip() except Exception, e: print e data = default return data
def spawn(self, argv=None, term=None): if argv is None: if 'SHELL' in os.environ: argv = [os.environ['SHELL']] elif 'PATH' in os.environ: #searching sh in the path. It can be unusual like /system/bin/sh on android for shell in ["bash","sh","ksh","zsh","csh","ash"]: for path in os.environ['PATH'].split(':'): fullpath=os.path.join(path.strip(),shell) if os.path.isfile(fullpath): argv=[fullpath] break if argv: break if not argv: argv= ['/bin/sh'] if term is not None: os.environ['TERM']=term master, slave = pty.openpty() self.slave=slave self.master = os.fdopen(master, 'rb+wb', 0) # open file in an unbuffered mode flags = fcntl.fcntl(self.master, fcntl.F_GETFL) assert flags>=0 flags = fcntl.fcntl(self.master, fcntl.F_SETFL , flags | os.O_NONBLOCK) assert flags>=0 self.prog = subprocess.Popen( shell=False, args=argv, stdin=slave, stdout=slave, stderr=subprocess.STDOUT, preexec_fn=prepare )
def get_stylesheet_list(settings): """ Retrieve list of stylesheet references from the settings object. """ assert not (settings.stylesheet and settings.stylesheet_path), ( 'stylesheet and stylesheet_path are mutually exclusive.') stylesheets = settings.stylesheet_path or settings.stylesheet or [] # programmatically set default can be string or unicode: if not isinstance(stylesheets, list): stylesheets = [path.strip() for path in stylesheets.split(',')] # expand relative paths if found in stylesheet-dirs: return [find_file_in_dirs(path, settings.stylesheet_dirs) for path in stylesheets]
def _parseDuration(s): from flask import current_app logger = current_app.logger from datetime import timedelta for timefmt in TIMEFORMATS: logger.debug("timefmt is {}".format(timefmt)) match = re.match(r'\s*' + timefmt + r'\s*$', s, re.I) logger.debug("Match is {}".format(match)) if match and match.group(0).strip(): mdict = match.groupdict() logger.debug("mdict is {}".format(mdict)) return timedelta(seconds=sum( [MULTIPLIERS[k] * float(v) for (k, v) in list(mdict.items()) if v is not None]))
def createHdfsPath(path): return os.path.join("hdfs://", hadoopNameNode, path.strip("/"))
def webhdfsGetRequest(path, op, allow_redirects=False): url = os.path.join(hadoopWebhdfsHost, path.strip("/")) response = requests.get("%s?op=%s" % (url, op), allow_redirects=allow_redirects, verify=validateKnoxSSL, auth=(username, password)) print ">>> Status: %d (%s)" % (response.status_code, url) return response.json()
def webhdfsPutRequest(path, op, allow_redirects=False): url = os.path.join(hadoopWebhdfsHost, path.strip("/")) response = requests.put("%s?op=%s" % (url, op), "", allow_redirects=allow_redirects, verify=validateKnoxSSL, auth=(username, password)) print ">>> Status: %d (%s)" % (response.status_code, url) return response
def get_deepth(path): plat_sys = platform.system() deepth = 0 if plat_sys == "Windows": path = path.strip("\\") deepth = path.count("\\") + 1 elif plat_sys == "Linux": path = path.strip("/") deepth = path.count("/") + 1 return deepth
def GetFilePath(uri): path = urllib.request.url2pathname(uri) # escape special chars path = path.strip('\r\n\x00') # remove \r\n and NULL # get the path to file if path.startswith('file://'): # nautilus, rox path = path[7:] # 7 is len('file://') return path
def process_pov(self, filename): if not self.args or not self.args.pov: if self.verbose > 0: print('No POV handler registered, pipeline stopping') return pov_args = [ self.args.pov, ] if self.args.ini: ini = random.choice(self.args.ini.split(',')).strip() if ini.endswith('[]'): section = random.choice(read_ini_sections(ini[:-2])) ini = '{}[{}]'.format(ini[:-2], section) pov_args.append(ini) else: # Default size pov_args.extend(['+W1024', '+H768']) if self.args.include_path: pov_args.extend('+L{}'.format(path.strip()) for path in self.args.include_path.split(',')) pov_args.extend([ '+I{}'.format(filename), '+O{}'.format(OUT_FILENAME), '-P', '-D', '-V', '+FN8' ]) with timed(is_verbose(1), 'POV-Ray is rendering maze...', 'Maze rendered in {0:.3f}s'): subprocess.check_call(pov_args) if self.args.keys: self.tweet(filename=OUT_FILENAME)
def _split_path(self, path): return path.strip(os.sep).split(os.sep)
def testRun(self): """ / p3 --- \ p1 -- p2 \ / p8 \ \ p4 \ p7 p10 p6 / \ p9 p5 / """ from collections import OrderedDict ProcTree.NODES = OrderedDict() p1 = Proc() p2 = Proc() p3 = Proc() p4 = Proc() p5 = Proc() p6 = Proc() p7 = Proc() p8 = Proc() p9 = Proc() p10 = Proc() p1.profile = 'proc' p2.addDepends(p1) p10.addDepends(p1) p3.addDepends(p2) p4.addDepends(p2) p6.addDepends(p4, p5) p7.addDepends(p3, p6) p8.addDepends(p7) p9.addDepends(p7) with captured_output() as (out, err): PyPPL(config = { 'log': { 'levels': 'all', 'file': None } }).start(p1, p5).run() errmsgs = [e for e in err.getvalue().splitlines() if 'SUBMIT' in e] errmsgs = [e[(e.index('Running')+8):-4].strip() for e in errmsgs] self.assertEqual(errmsgs, ['p1', 'p2', 'p3', 'p4', 'p5', 'p6', 'p7', 'p8', 'p9', 'p10'])
def normalise_path(path): path = path.strip() if path.startswith('./'): path = path[2:] return path
def _is_valid_path(self, line): if line == None: return False line = line.strip() return len(line) > 0 and not line.startswith('#')
def to_args(view): parts = [s.strip() for s in view.name().split('-')] if len(parts) == 1: return {} elif len(parts) == 2: return { 'prefix': parts[1] } else: raise Exception('Not a ListFiles window {0}.'.format(view.name()))
def GetFilePath(uri): path = urllib.url2pathname(uri) # escape special chars path = path.strip('\r\n\x00') # remove \r\n and NULL # get the path to file if path.startswith('file://'): # nautilus, rox path = path[7:] # 7 is len('file://') return path
def extract_name_value(line): """ Return a list of (name, value) from a line of the form "name=value ...". :Exception: `NameValueError` for invalid input (missing name, missing data, bad quotes, etc.). """ attlist = [] while line: equals = line.find('=') if equals == -1: raise NameValueError('missing "="') attname = line[:equals].strip() if equals == 0 or not attname: raise NameValueError( 'missing attribute name before "="') line = line[equals+1:].lstrip() if not line: raise NameValueError( 'missing value after "%s="' % attname) if line[0] in '\'"': endquote = line.find(line[0], 1) if endquote == -1: raise NameValueError( 'attribute "%s" missing end quote (%s)' % (attname, line[0])) if len(line) > endquote + 1 and line[endquote + 1].strip(): raise NameValueError( 'attribute "%s" end quote (%s) not followed by ' 'whitespace' % (attname, line[0])) data = line[1:endquote] line = line[endquote+1:].lstrip() else: space = line.find(' ') if space == -1: data = line line = '' else: data = line[:space] line = line[space+1:].lstrip() attlist.append((attname.lower(), data)) return attlist
def loggedin_users(blacklist): systemusers = blacklist users = [] f = open("/etc/passwd") finput = f.read().strip() f.close() for line in finput.split("\n"): if re.match("^\s*$", line): continue linelist = re.split(":", line) if linelist[1] == "x": if len(linelist[0]) <= 8: systemusers.append(linelist[0]) else: systemusers.append(linelist[0]) systemusers.append(linelist[0][:7]+"+") else: uid = None try: uid = int(linelist[2]) if uid < 1024: if len(linelist[0]) <= 8: systemusers.append(linelist[0]) else: systemusers.append(linelist[0]) systemusers.append(linelist[0][:7]+"+") except: pass cmd = ["ps -ef"] try: p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (out, err) = p.communicate() except subprocess.CalledProcessError, e: return -1 if p.returncode: return -1 if not out: return -1 if err: return -1 for line in out.split('\n'): if line.startswith("UID"): continue linelist = re.split("\s+", line) if linelist[0] != "" and linelist[0] not in systemusers and not re.match("\d+", linelist[0]): users.append(linelist[0]) return len(set(users))
def generate_layer(self, layer=True, path=None): """Creates an in-memory layer and displays/saves it.""" # Create a new layer in memory new_layer = QgsVectorLayer('Polygon', "compactness_scores", "memory") provider = new_layer.dataProvider() # Add the new fields new_fields = sorted(self.scores.keys()) # need the same order every time fields = self.features[0].fields() # QgsFields object for field in new_fields: fields.append(QgsField(field, QVariant.Double, '', 20, 4)) # All changes to the layer happen below new_layer.startEditing() # Set layer CRS # NOTE A warning message will still appear in QGIS # because this isn't set at layer creation time. new_layer.setCrs(self.crs) # Set layer attributes (fields) provider.addAttributes(fields.toList()) new_layer.updateFields() # Update features with new data for i in xrange(0, len(self.features)): attributes = self.features[i].attributes() # list of values for field in new_fields: attributes.append(float(self.scores[field][i])) self.features[i].setAttributes(attributes) # Add features provider.addFeatures(self.features) new_layer.commitChanges() new_layer.updateExtents() if path.strip() != "": if path.endswith(".json") or path.endswith(".geojson"): filetype = 'GeoJson' elif path.endswith(".shp"): filetype = 'ESRI Shapefile' else: QMessageBox.critical(self.dlg, 'Error', u"Unsupported file type. Only GeoJSON (.json, .geojson) and ESRI Shapefile (.shp) are supported.") return False QgsVectorFileWriter.writeAsVectorFormat(new_layer, path, 'utf-8', new_layer.crs(), filetype) print "Saved to " + path if layer: QgsMapLayerRegistry.instance().addMapLayer(new_layer) return True