我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ntpath.normpath()。
def do_cat(self, line, command = sys.stdout.write):
    """Feed the contents of a file to ``command`` in chunks.

    ``line`` is a path relative to ``self.pwd``; forward slashes are accepted.
    The entry is resolved with ``self.findPathName`` and the stream returned
    by ``getStream(None)`` is read 40960 bytes at a time, each buffer being
    passed to ``command`` (``sys.stdout.write`` by default).

    Nothing is emitted, and an error is logged, when the path is not found,
    is a directory, or is compressed/encrypted/sparse.
    """
    pathName = line.replace('/', '\\')
    pathName = ntpath.normpath(ntpath.join(self.pwd, pathName))
    res = self.findPathName(pathName)
    if res is None:
        logging.error("Not found!")
        return
    if res.isDirectory() > 0:
        logging.error("It's a directory!")
        return
    if res.isCompressed() or res.isEncrypted() or res.isSparse():
        logging.error('Cannot handle compressed/encrypted/sparse files! :(')
        return

    stream = res.getStream(None)
    chunks = 4096*10
    dataSize = stream.getDataSize()
    written = 0
    # Floor division: plain "/" yields a float under true division
    # (Python 3), which range() rejects.
    for i in range(dataSize // chunks):
        buf = stream.read(i*chunks, chunks)
        written += len(buf)
        command(buf)

    # Whatever does not fill a whole chunk is read from where we stopped.
    remainder = dataSize % chunks
    if remainder:
        buf = stream.read(written, remainder)
        command(buf)
    logging.info("%d bytes read" % dataSize)
def mkdir(self, shareName, pathName, password = None):
    """Create directory ``pathName`` on the share ``shareName``.

    Forward slashes are accepted; the path is normalised and one leading
    backslash is dropped before the create request is issued.  The handle is
    closed if the create succeeded and the tree is always disconnected.
    Returns True; failures propagate as exceptions from the calls made here.
    ``password`` is not used by this method.
    """
    # ToDo: Handle situations where share is password protected
    remotePath = ntpath.normpath(pathName.replace('/', '\\'))
    if remotePath.startswith('\\'):
        remotePath = remotePath[1:]

    treeId = self.connectTree(shareName)
    fileId = None
    try:
        fileId = self.create(
            treeId,
            remotePath,
            GENERIC_ALL,
            FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
            FILE_DIRECTORY_FILE | FILE_SYNCHRONOUS_IO_NONALERT,
            FILE_CREATE,
            0,
        )
    finally:
        if fileId is not None:
            self.close(treeId, fileId)
        self.disconnectTree(treeId)

    return True
def rmdir(self, shareName, pathName, password = None):
    """Remove directory ``pathName`` from the share ``shareName``.

    The directory is opened with DELETE access and then flagged
    ``DeletePending`` through a set-info request.  Forward slashes are
    accepted and one leading backslash is dropped.  The handle is closed if
    the open succeeded and the tree is always disconnected.  Returns True.
    ``password`` is not used by this method.
    """
    # ToDo: Handle situations where share is password protected
    target = ntpath.normpath(pathName.replace('/', '\\'))
    if target[:1] == '\\':
        target = target[1:]

    treeId = self.connectTree(shareName)
    fileId = None
    try:
        fileId = self.create(
            treeId,
            target,
            desiredAccess=DELETE | FILE_READ_ATTRIBUTES | SYNCHRONIZE,
            shareMode=FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE,
            creationOptions=FILE_DIRECTORY_FILE | FILE_OPEN_REPARSE_POINT,
            creationDisposition=FILE_OPEN,
            fileAttributes=0,
        )
        from impacket import smb
        dispositionInfo = smb.SMBSetFileDispositionInfo()
        dispositionInfo['DeletePending'] = True
        self.setInfo(treeId, fileId,
                     inputBlob=dispositionInfo,
                     fileInfoClass=SMB2_FILE_DISPOSITION_INFO)
    finally:
        if fileId is not None:
            self.close(treeId, fileId)
        self.disconnectTree(treeId)

    return True
def storeFile(self, shareName, path, callback, mode = FILE_OVERWRITE_IF, offset = 0, password = None, shareAccessMode = FILE_SHARE_WRITE):
    """Upload data produced by ``callback`` into ``path`` on ``shareName``.

    ``callback`` is called repeatedly with the connection's ``MaxWriteSize``
    and must return the next buffer; an empty buffer ends the transfer.
    Writing starts at ``offset`` and advances by the count each write
    reports.  The handle is closed if the create succeeded and the tree is
    always disconnected.  ``password`` is not used by this method.
    """
    # ToDo: Handle situations where share is password protected
    remotePath = ntpath.normpath(path.replace('/', '\\'))
    if remotePath.startswith('\\'):
        remotePath = remotePath[1:]

    treeId = self.connectTree(shareName)
    fileId = None
    try:
        fileId = self.create(treeId, remotePath, FILE_WRITE_DATA, shareAccessMode,
                             FILE_NON_DIRECTORY_FILE, mode, 0)
        position = offset
        while True:
            chunk = callback(self._Connection['MaxWriteSize'])
            if len(chunk) == 0:
                break
            sent = self.write(treeId, fileId, chunk, position, len(chunk))
            position += sent
    finally:
        if fileId is not None:
            self.close(treeId, fileId)
        self.disconnectTree(treeId)
def checkIntegrity():
    """
    Verify the code files listed in the checksum file.

    Each entry of paths.CHECKSUM_MD5 is split on whitespace into a checksum
    and a path relative to paths.SQLMAP_ROOT_PATH.  Missing files and
    checksum mismatches are logged as errors.

    Returns True when every file matches, False otherwise, and None when
    `paths` is empty.
    """

    if not paths:
        return

    logger.debug("running code integrity check")

    intact = True
    for entry in getFileItems(paths.CHECKSUM_MD5):
        checksum, relative = re.split(r'\s+', entry)
        path = os.path.normpath(os.path.join(paths.SQLMAP_ROOT_PATH, relative))

        if not os.path.isfile(path):
            logger.error("missing file detected '%s'" % path)
            intact = False
            continue

        if md5File(path) != checksum:
            logger.error("wrong checksum of file '%s' detected" % path)
            intact = False

    return intact
def remove(self, shareName, pathName, password = None):
    """Delete the file ``pathName`` from the share ``shareName``.

    The file is opened with FILE_DELETE_ON_CLOSE and then closed.  Forward
    slashes are accepted and one leading backslash is dropped.  The tree is
    always disconnected.  Returns True.  ``password`` is not used by this
    method.
    """
    # ToDo: Handle situations where share is password protected
    victim = ntpath.normpath(pathName.replace('/', '\\'))
    if victim.startswith('\\'):
        victim = victim[1:]

    treeId = self.connectTree(shareName)
    fileId = None
    try:
        fileId = self.create(
            treeId,
            victim,
            DELETE | FILE_READ_ATTRIBUTES,
            FILE_SHARE_DELETE,
            FILE_NON_DIRECTORY_FILE | FILE_DELETE_ON_CLOSE,
            FILE_OPEN,
            0,
        )
    finally:
        if fileId is not None:
            self.close(treeId, fileId)
        self.disconnectTree(treeId)

    return True
def printSources(self, hierarchy, commonprefix):
    """Write the ``<Filter>``/``<File>`` entries for ``hierarchy`` to ``self.file``.

    ``hierarchy`` maps names to either nested dicts (emitted as filters,
    recursively) or strings (emitted as files).  Entries are ordered
    case-insensitively by name; all filters are written before any file.
    A non-empty ``commonprefix`` is joined in front of each file path before
    it is normalised.
    """
    entries = sorted(hierarchy.items(), key=lambda pair: pair[0].lower())

    # First folders, then files
    for name, node in entries:
        if not SCons.Util.is_Dict(node):
            continue
        self.file.write('\t\t\t<Filter\n'
                        '\t\t\t\tName="%s"\n'
                        '\t\t\t\tFilter="">\n' % (name))
        self.printSources(node, commonprefix)
        self.file.write('\t\t\t</Filter>\n')

    for name, node in entries:
        if not SCons.Util.is_String(node):
            continue
        relpath = os.path.join(commonprefix, node) if commonprefix else node
        relpath = os.path.normpath(relpath)
        self.file.write('\t\t\t<File\n'
                        '\t\t\t\tRelativePath="%s">\n'
                        '\t\t\t</File>\n' % (relpath))
def printSources(self, hierarchy, kind, commonprefix, filter_name):
    """Write project and filter entries for ``hierarchy``.

    Nested dicts are handled first by recursing with ``filter_name``
    extended by a backslash and the dict's key.  String values are then
    written as ``Include`` elements to ``self.file`` and, together with
    ``filter_name``, to ``self.filters_file``.  The element name is selected
    from ``kind``.  Entries are ordered case-insensitively by name.
    """
    keywords = {'Source Files': 'ClCompile',
                'Header Files': 'ClInclude',
                'Local Headers': 'ClInclude',
                'Resource Files': 'None',
                'Other Files': 'None'}

    entries = sorted(hierarchy.items(), key=lambda pair: pair[0].lower())

    # First folders, then files
    for name, node in entries:
        if SCons.Util.is_Dict(node):
            self.printSources(node, kind, commonprefix, filter_name + '\\' + name)

    for name, node in entries:
        if not SCons.Util.is_String(node):
            continue
        relpath = os.path.join(commonprefix, node) if commonprefix else node
        relpath = os.path.normpath(relpath)

        # Looked up only once a file is actually written, as before.
        tag = keywords[kind]
        self.file.write('\t\t<%s Include="%s" />\n' % (tag, relpath))
        self.filters_file.write('\t\t<%s Include="%s">\n'
                                '\t\t\t<Filter>%s</Filter>\n'
                                '\t\t</%s>\n' % (tag, relpath, filter_name, tag))
def rmdir(self, shareName, pathName, password = None):
    """Remove directory ``pathName`` from the share ``shareName``.

    The directory is opened with FILE_DELETE_ON_CLOSE and then closed.
    Forward slashes are accepted and one leading backslash is dropped.  The
    tree is always disconnected.  Returns True.  ``password`` is not used by
    this method.
    """
    # ToDo: Handle situations where share is password protected
    directory = ntpath.normpath(pathName.replace('/', '\\'))
    if directory[:1] == '\\':
        directory = directory[1:]

    treeId = self.connectTree(shareName)
    fileId = None
    try:
        fileId = self.create(
            treeId,
            directory,
            DELETE,
            FILE_SHARE_DELETE,
            FILE_DIRECTORY_FILE | FILE_DELETE_ON_CLOSE,
            FILE_OPEN,
            0,
        )
    finally:
        if fileId is not None:
            self.close(treeId, fileId)
        self.disconnectTree(treeId)

    return True
def checkIntegrity():
    """
    Verify the code files listed in the checksum file.

    Each entry of paths.CHECKSUM_MD5 is split on whitespace into an MD5
    checksum and a path relative to paths.SQLMAP_ROOT_PATH.  Missing files
    and checksum mismatches are logged as errors.

    Returns True when every listed file exists and matches, False otherwise.
    """

    logger.debug("running code integrity check")

    retVal = True
    for entry in getFileItems(paths.CHECKSUM_MD5):
        checksum, relative = re.split(r'\s+', entry)
        path = os.path.normpath(os.path.join(paths.SQLMAP_ROOT_PATH, relative))

        if not os.path.isfile(path):
            logger.error("missing file detected '%s'" % path)
            retVal = False
            continue

        # Close the handle deterministically instead of leaving it to the GC.
        with open(path, 'rb') as handle:
            digest = hashlib.md5(handle.read()).hexdigest()

        if digest != checksum:
            logger.error("wrong checksum of file '%s' detected" % path)
            retVal = False

    return retVal
def do_get(self, src_path):
    """Download a remote file into the current local directory.

    ``src_path`` is resolved against the remote working directory.  The
    drive letter selects the administrative share (``C:`` becomes ``C$``)
    and the local file takes the remote file's base name.

    Any failure is logged rather than raised.  A partially written local
    file is removed, but only if this call actually created it.
    """
    filename = None
    created = False
    try:
        import ntpath
        newPath = ntpath.normpath(ntpath.join(self.__pwd, src_path))
        drive, tail = ntpath.splitdrive(newPath)
        filename = ntpath.basename(tail)
        # The context manager closes the handle on failure too, so the
        # clean-up below never tries to delete a file that is still open.
        with open(filename, 'wb') as fh:
            created = True
            logging.info("Downloading %s\\%s" % (drive, tail))
            self.__transferClient.getFile(drive[:-1]+'$', tail, fh.write)
    except Exception as e:
        logging.error(str(e))
        # Previously this removed `filename` unconditionally, which itself
        # raised when the failure happened before the file existed.
        if created and os.path.exists(filename):
            os.remove(filename)
def do_cd(self, s):
    """Change the remote working directory to ``s``.

    ``cd <s>`` is executed remotely.  If it produced output, that output is
    printed and the working directory is left alone.  Otherwise a bare
    ``cd`` is executed and its output becomes the new working directory and
    prompt.  The output buffer is cleared in both cases.
    """
    self.execute_remote('cd ' + s)

    if self.__outputBuffer.strip('\r\n'):
        # The remote command printed something: show it, keep the old pwd.
        print(self.__outputBuffer)
        self.__outputBuffer = ''
        return

    self.__pwd = ntpath.normpath(ntpath.join(self.__pwd, s))
    self.execute_remote('cd ')
    self.__pwd = self.__outputBuffer.strip('\r\n')
    self.prompt = self.__pwd + '>'
    self.__outputBuffer = ''
def do_cd(self, line):
    """Change the current directory to ``line``.

    ``line`` is taken relative to ``self.pwd``; forward slashes are
    accepted.  Nothing happens when the resolved path equals the current
    one.  When the current directory is a character-wise prefix of the new
    path, the lookup uses the normalised relative path; otherwise it uses
    the full new path.  On success ``currentINode``, ``pwd`` and ``prompt``
    are updated and the directory is listed through ``do_ls``; on failure an
    error is logged and ``pwd`` is restored.
    """
    p = line.replace('/', '\\')
    oldpwd = self.pwd
    newPath = ntpath.normpath(ntpath.join(self.pwd, p))
    if newPath == self.pwd:
        # Nothing changed
        return

    if ntpath.commonprefix([newPath, oldpwd]) == oldpwd:
        lookup = ntpath.normpath(p)
    else:
        lookup = newPath
    res = self.findPathName(lookup)

    if res is None:
        logging.error("Directory not found")
        self.pwd = oldpwd
        return
    if res.isDirectory() == 0:
        logging.error("Not a directory!")
        self.pwd = oldpwd
        return

    self.currentINode = res
    self.do_ls('', False)
    self.pwd = ntpath.normpath(ntpath.join(self.pwd, p))
    self.prompt = self.pwd + '>'
def do_get(self, line):
    """Copy a file into the current local directory.

    ``line`` is resolved against ``self.pwd`` (forward slashes are accepted)
    only to derive the local file name, which is the base name of the
    resolved path.  The data itself is produced by ``self.do_cat``, which
    receives the original ``line`` and writes into the local file.
    """
    pathName = line.replace('/', '\\')
    pathName = ntpath.normpath(ntpath.join(self.pwd, pathName))
    # The context manager closes the local file even if do_cat raises.
    with open(ntpath.basename(pathName), "wb") as fh:
        self.do_cat(line, command = fh.write)
def do_cd(self, s):
    """Change the remote working directory to ``s``.

    ``cd <s>`` is executed remotely.  If it produced output, that output is
    decoded with CODEC and printed, and the working directory is left
    alone.  Otherwise a bare ``cd`` is executed and its decoded output
    becomes the new working directory; the prompt is encoded with the
    stdout encoding.  The output buffer is cleared in both cases.
    """
    self.execute_remote('cd ' + s)

    if self.__outputBuffer.strip('\r\n'):
        # The remote command printed something: show it, keep the old pwd.
        print(self.__outputBuffer.decode(CODEC))
        self.__outputBuffer = ''
        return

    requested = s.decode(sys.stdin.encoding)
    self.__pwd = ntpath.normpath(ntpath.join(self.__pwd, requested))
    self.execute_remote('cd ')
    self.__pwd = self.__outputBuffer.strip('\r\n').decode(CODEC)
    self.prompt = unicode(self.__pwd + '>').encode(sys.stdout.encoding)
    self.__outputBuffer = ''
def rename(self, shareName, oldPath, newPath):
    """Rename ``oldPath`` to ``newPath`` on the share ``shareName``.

    Both paths accept forward slashes, are normalised, and lose one leading
    backslash.  The source is opened and a rename-information request with
    ``ReplaceIfExists`` set is sent; the new name is encoded as UTF-16LE.
    The handle is closed if the open succeeded and the tree is always
    disconnected.  Returns True.
    """
    def _toSharePath(raw):
        # Backslash separators, normalised, without a leading backslash.
        cleaned = ntpath.normpath(raw.replace('/', '\\'))
        if cleaned.startswith('\\'):
            cleaned = cleaned[1:]
        return cleaned

    oldPath = _toSharePath(oldPath)
    newPath = _toSharePath(newPath)

    treeId = self.connectTree(shareName)
    fileId = None
    try:
        # NOTE(review): 0x200020 looks like FILE_OPEN_REPARSE_POINT |
        # FILE_SYNCHRONOUS_IO_NONALERT - confirm against the constants.
        fileId = self.create(
            treeId,
            oldPath,
            MAXIMUM_ALLOWED,
            FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
            0x200020,
            FILE_OPEN,
            0,
        )
        renameReq = FILE_RENAME_INFORMATION_TYPE_2()
        renameReq['ReplaceIfExists'] = 1
        renameReq['RootDirectory'] = '\x00'*8
        renameReq['FileNameLength'] = len(newPath)*2
        renameReq['FileName'] = newPath.encode('utf-16le')
        self.setInfo(treeId, fileId, renameReq,
                     infoType = SMB2_0_INFO_FILE,
                     fileInfoClass = SMB2_FILE_RENAME_INFO)
    finally:
        if fileId is not None:
            self.close(treeId, fileId)
        self.disconnectTree(treeId)

    return True
def listPath(self, shareName, path, password = None):
    """List the entries of ``shareName`` that match ``path``.

    The directory part of ``path`` is opened and queried repeatedly, using
    the base name of ``path`` as the search pattern, until the server
    reports STATUS_NO_MORE_FILES.  Each entry becomes an
    ``smb.SharedFile``.  Any other session error is re-raised.  The handle
    is closed if the open succeeded and the tree is always disconnected.
    ``password`` is not used by this method.
    """
    # ToDo: Handle situations where share is password protected
    path = ntpath.normpath(path.replace('/', '\\'))
    if path.startswith('\\'):
        path = path[1:]

    treeId = self.connectTree(shareName)

    fileId = None
    try:
        # ToDo, we're assuming it's a directory, we should check what the file type is
        fileId = self.create(
            treeId,
            ntpath.dirname(path),
            FILE_READ_ATTRIBUTES | FILE_READ_DATA,
            FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
            FILE_DIRECTORY_FILE | FILE_SYNCHRONOUS_IO_NONALERT,
            FILE_OPEN,
            0,
        )
        res = ''
        files = []
        from impacket import smb
        while True:
            try:
                res = self.queryDirectory(treeId, fileId, ntpath.basename(path),
                                          maxBufferSize = 65535,
                                          informationClass = FILE_FULL_DIRECTORY_INFORMATION)
                # Walk the packed entries; NextEntryOffset == 0 marks the last one.
                while True:
                    fileInfo = smb.SMBFindFileFullDirectoryInfo(smb.SMB.FLAGS2_UNICODE)
                    fileInfo.fromString(res)
                    entryName = fileInfo['FileName'].decode('utf-16le')
                    files.append(smb.SharedFile(fileInfo['CreationTime'],
                                                fileInfo['LastAccessTime'],
                                                fileInfo['LastChangeTime'],
                                                fileInfo['EndOfFile'],
                                                fileInfo['AllocationSize'],
                                                fileInfo['ExtFileAttributes'],
                                                entryName,
                                                entryName))
                    nextOffset = fileInfo['NextEntryOffset']
                    res = res[nextOffset:]
                    if nextOffset == 0:
                        break
            except SessionError as e:
                if (e.get_error_code()) != STATUS_NO_MORE_FILES:
                    raise
                break
    finally:
        if fileId is not None:
            self.close(treeId, fileId)
        self.disconnectTree(treeId)

    return files
def retrieveFile(self, shareName, path, callback, mode = FILE_OPEN, offset = 0, password = None, shareAccessMode = FILE_SHARE_READ):
    """Download ``path`` from ``shareName``, passing the data to ``callback``.

    The file size is taken from the ``EndOfFile`` field of the standard
    information.  If the span from ``offset`` to the end is smaller than the
    connection's ``MaxReadSize`` it is fetched with one read (nothing is
    read when that span is empty); otherwise it is fetched in
    ``MaxReadSize`` requests until the whole span has been received.  The
    handle is closed if the open succeeded and the tree is always
    disconnected.  ``password`` is not used by this method.
    """
    # ToDo: Handle situations where share is password protected
    remotePath = ntpath.normpath(path.replace('/', '\\'))
    if remotePath.startswith('\\'):
        remotePath = remotePath[1:]

    treeId = self.connectTree(shareName)
    fileId = None
    from impacket import smb
    try:
        fileId = self.create(treeId, remotePath, FILE_READ_DATA, shareAccessMode,
                             FILE_NON_DIRECTORY_FILE, mode, 0)
        res = self.queryInfo(treeId, fileId)
        fileInfo = smb.SMBQueryFileStandardInfo(res)
        fileSize = fileInfo['EndOfFile']
        remaining = fileSize - offset
        if remaining < self._Connection['MaxReadSize']:
            # Skip reading 0 bytes files.
            if remaining > 0:
                callback(self.read(treeId, fileId, offset, remaining))
        else:
            received = 0
            while received < remaining:
                chunk = self.read(treeId, fileId, offset, self._Connection['MaxReadSize'])
                chunkLen = len(chunk)
                received += chunkLen
                offset += chunkLen
                callback(chunk)
    finally:
        if fileId is not None:
            self.close(treeId, fileId)
        self.disconnectTree(treeId)
def adjustDcfFileRef(self,dcf,basedir):
    """Rebase every ``path`` in ``dcf['fileRef']`` onto ``basedir``, in place.

    Each path is split with Windows rules; its directory part is joined to
    ``basedir`` and normalised, then the file name is re-attached.  When the
    host separator is ``/``, every backslash in the result is converted to a
    forward slash.
    """
    for elem in dcf['fileRef']:
        basename = ntpath.basename(elem['path'])
        dirname = ntpath.normpath(ntpath.join(basedir, ntpath.dirname(elem['path'])))
        elem['path'] = ntpath.join(dirname, basename)
        if os.path.sep == '/': #are we running in cygwin/Linux?
            # '\\' is ONE backslash.  The earlier r'\\' was two, so ordinary
            # separators were never converted.
            elem['path'] = elem['path'].replace('\\', '/')
def _adjustFileRef(self,fileRef,basedir):
    """Return ``fileRef['path']`` rebased onto ``basedir``.

    The path is split with Windows rules; its directory part is joined to
    ``basedir`` and normalised, then the file name is re-attached.  When the
    host separator is ``/``, every backslash in the result is converted to a
    forward slash.  ``fileRef`` itself is not modified.
    """
    basename = ntpath.basename(fileRef['path'])
    dirname = ntpath.normpath(ntpath.join(basedir, ntpath.dirname(fileRef['path'])))
    retval = ntpath.join(dirname, basename)
    if os.path.sep == '/': #are we running in cygwin/Linux?
        # '\\' is ONE backslash.  The earlier r'\\' was two, so ordinary
        # separators were never converted.
        retval = retval.replace('\\', '/')
    return retval
def pathify(self, path):
    """Return ``path`` with a ``*`` component appended, using backslashes.

    The result is deliberately not passed through ``ntpath.normpath``.
    """
    wildcard = ntpath.join(path, '*')
    return wildcard.replace('/', '\\')
def list_path(self, host, share, path, pattern, verbose=False, only_output=False):
    '''
    List a share directory and download the files whose name matches a regex.

    The listing for ``path`` (turned into a wildcard by ``self.pathify``) is
    fetched from ``self.smbconn[host]``.  With ``only_output`` the listing
    is returned as-is (``[]`` when it is empty) and nothing is downloaded.
    Otherwise each non-directory entry whose lower-cased name matches the
    lower-cased ``pattern`` is passed to ``self.download_file``; the result
    of the last download is returned, or True when nothing matched.

    Any exception raised along the way is swallowed and ``[]`` is returned.
    '''
    pwd = self.pathify(path)
    width = 16
    try:
        output_file = True
        pathList = self.smbconn[host].listPath(share, pwd)
        if verbose:
            info('\t.%s' % (path.ljust(50)))

        if only_output:
            return pathList if pathList else []

        for entry in pathList:
            # Evaluated for every entry, verbose or not, exactly as before:
            # a failure here is caught below and ends the listing.
            filesize = entry.get_filesize()
            readonly = 'w' if entry.is_readonly() > 0 else 'r'
            date = time.ctime(float(entry.get_mtime_epoch()))
            isDir = 'd' if entry.is_directory() > 0 else 'f'
            filename = entry.get_longname()

            if entry.is_directory() <= 0:
                info(pattern.lower(), filename.lower())
                if re.search(pattern.lower(), filename.lower()):
                    remote = '%s\\%s/%s' % (share, ntpath.normpath(pwd.strip('*')), filename)
                    remote = remote.replace('/','\\')
                    output_file = self.download_file(host, remote, True)

            if verbose:
                info('\t%s%s--%s--%s-- %s %s\t%s' % (isDir, readonly, readonly, readonly,
                                                      str(filesize).rjust(width), date, filename))
        return output_file
    except Exception as e:
        return []
def delete_file(self, host, path, verbose=True):
    """Delete ``<share>/<dirs...>/<file>`` on ``host``.

    ``path`` accepts either slash style.  After normalisation its first
    component is the share name, its last is the file name, and whatever
    lies between is the directory inside the share.  Session errors are
    reported through ``warning`` rather than raised.
    """
    path = path.replace('/','\\')
    path = ntpath.normpath(path)

    # Split by position.  The previous str.replace() approach removed EVERY
    # occurrence of the share and file names, so a path like 'C$\temp\temp'
    # lost its directory component.
    parts = path.split('\\')
    share = parts[0]
    filename = parts[-1]
    if len(parts) > 1:
        # '\dir\sub\' style prefix, or just '\' for a file at the share root
        path = '\\'.join([''] + parts[1:-1] + [''])
    else:
        path = ''

    try:
        self.smbconn[host].deleteFile(share, path + filename)
        if verbose:
            success('[+] File successfully deleted: %s%s%s' % (share, path, filename))
    except SessionError as e:
        if 'STATUS_ACCESS_DENIED' in str(e):
            if verbose:
                warning('[!] Error deleting file, access denied')
        elif 'STATUS_INVALID_PARAMETER' in str(e):
            if verbose:
                warning('[!] Error deleting file, invalid path')
        elif 'STATUS_SHARING_VIOLATION' in str(e):
            if verbose:
                # Message used to say "retrieving", copied from the download path.
                warning('[!] Error deleting file, sharing violation')
        else:
            warning('[!] Error deleting file %s%s%s, unknown error' % (share, path, filename))
            warning('[!]', e)
    except Exception as e:
        warning('[!] Error deleting file %s%s%s, unknown error' % (share, path, filename))
        warning('[!]', e)
def test_normpath(self):
    """Run every ``ntpath.normpath`` expectation through ``tester``.

    Each case pairs an expression string with the result it should produce;
    the cases are handed to ``tester`` in the order listed.
    """
    cases = [
        ("ntpath.normpath('A//////././//.//B')", r'A\B'),
        ("ntpath.normpath('A/./B')", r'A\B'),
        ("ntpath.normpath('A/foo/../B')", r'A\B'),
        ("ntpath.normpath('C:A//B')", r'C:A\B'),
        ("ntpath.normpath('D:A/./B')", r'D:A\B'),
        ("ntpath.normpath('e:A/foo/../B')", r'e:A\B'),

        ("ntpath.normpath('C:///A//B')", r'C:\A\B'),
        ("ntpath.normpath('D:///A/./B')", r'D:\A\B'),
        ("ntpath.normpath('e:///A/foo/../B')", r'e:\A\B'),

        ("ntpath.normpath('..')", r'..'),
        ("ntpath.normpath('.')", r'.'),
        ("ntpath.normpath('')", r'.'),
        ("ntpath.normpath('/')", '\\'),
        ("ntpath.normpath('c:/')", 'c:\\'),
        ("ntpath.normpath('/../.././..')", '\\'),
        ("ntpath.normpath('c:/../../..')", 'c:\\'),
        ("ntpath.normpath('../.././..')", r'..\..\..'),
        ("ntpath.normpath('K:../.././..')", r'K:..\..\..'),
        ("ntpath.normpath('C:////a/b')", r'C:\a\b'),
        ("ntpath.normpath('//machine/share//a/b')", r'\\machine\share\a\b'),

        ("ntpath.normpath('\\\\.\\NUL')", r'\\.\NUL'),
        ("ntpath.normpath('\\\\?\\D:/XY\\Z')", r'\\?\D:/XY\Z'),
    ]
    for expression, expected in cases:
        tester(expression, expected)
def normalizePath(filepath):
    """
    Returns the given filepath normalized with the rules of its own platform

    Falsy input is returned unchanged. Otherwise surrounding CR/LF characters
    are stripped and the path is normalized with ntpath when it is recognized
    as a Windows drive letter path, with posixpath in any other case

    >>> normalizePath('//var///log/apache.log')
    '//var/log/apache.log'
    """

    if not filepath:
        return filepath

    stripped = filepath.strip("\r\n")
    if isWindowsDriveLetterPath(stripped):
        return ntpath.normpath(stripped)
    return posixpath.normpath(stripped)
def _generateGUID(slnfile, name):
    """Return a dummy GUID for a project inside a solution file.

    The GUID is derived from the MD5 digest of the solution file name
    followed by the project name, so it is stable between invocations.
    """
    # Normalize the slnfile path to a Windows path (\ separators) so
    # the generated file has a consistent GUID even if we generate
    # it on a non-Windows platform.
    seed = ntpath.normpath(str(slnfile)) + str(name)
    digest = hashlib.md5(bytearray(seed, 'utf-8')).hexdigest().upper()
    # convert most of the signature to GUID form (discard the rest)
    groups = (digest[:8], digest[8:12], digest[12:16], digest[16:20], digest[20:32])
    return "{" + "-".join(groups) + "}"
def PrintSourceFiles(self):
    """Write the source-file groups, then the SConscript entry, to ``self.file``.

    Groups are emitted in case-insensitive name order; a group with no
    entries in ``self.sources`` is skipped.  Each group carries a
    ``Default_Filter`` built from its extension list, with ``|`` turned into
    ``;``.  File paths are normalised with ``os.path.normpath``.
    """
    categories = {'Source Files': 'cpp|c|cxx|l|y|def|odl|idl|hpj|bat',
                  'Header Files': 'h|hpp|hxx|hm|inl',
                  'Local Headers': 'h|hpp|hxx|hm|inl',
                  'Resource Files': 'r|rc|ico|cur|bmp|dlg|rc2|rct|bin|cnt|rtf|gif|jpg|jpeg|jpe',
                  'Other Files': ''}

    for kind in sorted(categories, key=str.lower):
        members = self.sources[kind]
        if not members:
            continue # skip empty groups

        self.file.write('# Begin Group "' + kind + '"\n\n')
        typelist = categories[kind].replace('|', ';')
        self.file.write('# PROP Default_Filter "' + typelist + '"\n')

        for member in members:
            normalized = os.path.normpath(member)
            self.file.write('# Begin Source File\n\n'
                            'SOURCE="' + normalized + '"\n'
                            '# End Source File\n')
        self.file.write('# End Group\n')

    # add the SConscript file outside of the groups
    self.file.write('# Begin Source File\n\n'
                    'SOURCE="' + str(self.sconscript) + '"\n'
                    '# End Source File\n')
def _generateGUID(slnfile, name):
    """This generates a dummy GUID for the sln file to use.  It is
    based on the MD5 signatures of the sln filename plus the name of
    the project.  It basically just needs to be unique, and not
    change with each invocation."""
    m = hashlib.md5()
    # Normalize the slnfile path to a Windows path (\ separators) so
    # the generated file has a consistent GUID even if we generate
    # it on a non-Windows platform.
    #
    # The text is converted to bytes first: hashlib rejects str objects
    # on Python 3.
    m.update(bytearray(ntpath.normpath(str(slnfile)) + str(name), 'utf-8'))
    solution = m.hexdigest().upper()
    # convert most of the signature to GUID form (discard the rest)
    solution = ("{" + solution[:8] + "-" + solution[8:12] + "-"
                + solution[12:16] + "-" + solution[16:20] + "-"
                + solution[20:32] + "}")
    return solution
def PrintSourceFiles(self):
    """Write the source-file groups, then the SConscript entry, to ``self.file``.

    Groups are emitted in case-insensitive name order; a group with no
    entries in ``self.sources`` is skipped.  Each group carries a
    ``Default_Filter`` built from its extension list, with ``|`` turned into
    ``;``.  File paths are normalised with ``os.path.normpath``.
    """
    categories = {'Source Files': 'cpp|c|cxx|l|y|def|odl|idl|hpj|bat',
                  'Header Files': 'h|hpp|hxx|hm|inl',
                  'Local Headers': 'h|hpp|hxx|hm|inl',
                  'Resource Files': 'r|rc|ico|cur|bmp|dlg|rc2|rct|bin|cnt|rtf|gif|jpg|jpeg|jpe',
                  'Other Files': ''}

    ordered = sorted(categories.keys(), key=lambda label: label.lower())
    for kind in ordered:
        if self.sources[kind]:
            self.file.write('# Begin Group "' + kind + '"\n\n')
            typelist = categories[kind].replace('|', ';')
            self.file.write('# PROP Default_Filter "' + typelist + '"\n')

            for source in self.sources[kind]:
                source = os.path.normpath(source)
                self.file.write('# Begin Source File\n\n'
                                'SOURCE="' + source + '"\n'
                                '# End Source File\n')
            self.file.write('# End Group\n')
        # empty groups are skipped

    # add the SConscript file outside of the groups
    self.file.write('# Begin Source File\n\n'
                    'SOURCE="' + str(self.sconscript) + '"\n'
                    '# End Source File\n')