我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用ntpath.join()。
def _file_path_fixup(self, path): ''' We have a pseudo-cwd that we use to base off all commands. This means we need to figure out if a given path is relative, absolute, or file relative and calculate against the pseudo cwd. This function takes in a given file path arguemnt and performs the fixups. ''' if (self._is_path_absolute(path)): return path elif (self._is_path_drive_relative(path)): return self.cwd[:2] + path else: return ntpath.join(self.cwd + '\\', path)
def do_connect(self, line): """ Command: connect Description: Connect to a sensor given the sensor ID or the sensor hostname. Args: connect SENSOR_ID | SENSOR_HOSTNAME """ if not line: raise CliArgsException("Need argument: sensor ID or hostname") sensor = self.connect_callback(self.cb, line) self.lr_session = sensor.lr_session() print("Session: {0}".format(self.lr_session.session_id)) print(" Available Drives: %s" % ' '.join(self.lr_session.session_data.get('drives', []))) # look up supported commands print(" Supported Commands: %s" % ', '.join(self.lr_session.session_data.get('supported_commands', []))) print(" Working Directory: %s" % self.cwd) log.info("Attached to sensor {0}".format(sensor._model_unique_id))
def getInterface(self, interface, resp): # Now let's parse the answer and build an Interface instance objRefType = OBJREF(''.join(resp))['flags'] objRef = None if objRefType == FLAGS_OBJREF_CUSTOM: objRef = OBJREF_CUSTOM(''.join(resp)) elif objRefType == FLAGS_OBJREF_HANDLER: objRef = OBJREF_HANDLER(''.join(resp)) elif objRefType == FLAGS_OBJREF_STANDARD: objRef = OBJREF_STANDARD(''.join(resp)) elif objRefType == FLAGS_OBJREF_EXTENDED: objRef = OBJREF_EXTENDED(''.join(resp)) else: logging.error("Unknown OBJREF Type! 0x%x" % objRefType) return IRemUnknown2( INTERFACE(interface.get_cinstance(), None, interface.get_ipidRemUnknown(), objRef['std']['ipid'], oxid=objRef['std']['oxid'], oid=objRef['std']['oxid'], target=interface.get_target()))
def do_put(self, s): try: params = s.split(' ') if len(params) > 1: src_path = params[0] dst_path = params[1] elif len(params) == 1: src_path = params[0] dst_path = '' src_file = os.path.basename(src_path) fh = open(src_path, 'rb') dst_path = string.replace(dst_path, '/','\\') import ntpath pathname = ntpath.join(ntpath.join(self.__pwd,dst_path), src_file) drive, tail = ntpath.splitdrive(pathname) logging.info("Uploading %s to %s" % (src_file, pathname)) self.__transferClient.putFile(drive[:-1]+'$', tail, fh.read) fh.close() except Exception, e: logging.critical(str(e)) pass
def hexdump(data): x=str(data) strLen = len(x) i = 0 while i < strLen: print "%04x " % i, for j in range(16): if i+j < strLen: print "%02X" % ord(x[i+j]), else: print " ", if j%16 == 7: print "", print " ", print ''.join(pretty_print(x) for x in x[i:i+16] ) i += 16 # Reserved/fixed MFTs
def do_cat(self, line, command = sys.stdout.write): pathName = string.replace(line,'/','\\') pathName = ntpath.normpath(ntpath.join(self.pwd,pathName)) res = self.findPathName(pathName) if res is None: logging.error("Not found!") return if res.isDirectory() > 0: logging.error("It's a directory!") return if res.isCompressed() or res.isEncrypted() or res.isSparse(): logging.error('Cannot handle compressed/encrypted/sparse files! :(') return stream = res.getStream(None) chunks = 4096*10 written = 0 for i in range(stream.getDataSize()/chunks): buf = stream.read(i*chunks, chunks) written += len(buf) command(buf) if stream.getDataSize() % chunks: buf = stream.read(written, stream.getDataSize() % chunks) command(buf) logging.info("%d bytes read" % stream.getDataSize())
def __executeRemote(self, data): self.__tmpServiceName = ''.join([random.choice(string.letters) for _ in range(8)]).encode('utf-16le') command = self.__shell + 'echo ' + data + ' ^> ' + self.__output + ' > ' + self.__batchFile + ' & ' + \ self.__shell + self.__batchFile command += ' & ' + 'del ' + self.__batchFile self.__serviceDeleted = False resp = scmr.hRCreateServiceW(self.__scmr, self.__scManagerHandle, self.__tmpServiceName, self.__tmpServiceName, lpBinaryPathName=command) service = resp['lpServiceHandle'] try: scmr.hRStartServiceW(self.__scmr, service) except: pass scmr.hRDeleteService(self.__scmr, service) self.__serviceDeleted = True scmr.hRCloseServiceHandle(self.__scmr, service)
def transformKey(self, InputKey): # Section 2.2.11.1.2 Encrypting a 64-Bit Block with a 7-Byte Key OutputKey = [] OutputKey.append( chr(ord(InputKey[0]) >> 0x01) ) OutputKey.append( chr(((ord(InputKey[0])&0x01)<<6) | (ord(InputKey[1])>>2)) ) OutputKey.append( chr(((ord(InputKey[1])&0x03)<<5) | (ord(InputKey[2])>>3)) ) OutputKey.append( chr(((ord(InputKey[2])&0x07)<<4) | (ord(InputKey[3])>>4)) ) OutputKey.append( chr(((ord(InputKey[3])&0x0F)<<3) | (ord(InputKey[4])>>5)) ) OutputKey.append( chr(((ord(InputKey[4])&0x1F)<<2) | (ord(InputKey[5])>>6)) ) OutputKey.append( chr(((ord(InputKey[5])&0x3F)<<1) | (ord(InputKey[6])>>7)) ) OutputKey.append( chr(ord(InputKey[6]) & 0x7F) ) for i in range(8): OutputKey[i] = chr((ord(OutputKey[i]) << 1) & 0xfe) return "".join(OutputKey)
def upload_file(self, host, src, dst): dst = string.replace(dst,'/','\\') dst = ntpath.normpath(dst) dst = dst.split('\\') share = dst[0] dst = '\\'.join(dst[1:]) if os.path.exists(src): color('[+] Starting upload: %s (%s bytes)' % (src, os.path.getsize(src))) upFile = open(src, 'rb') try: self.smbconn[host].putFile(share, dst, upFile.read) color('[+] Upload complete' ) except Exception as e: color('[!]', e) color('[!] Error uploading file, you need to include destination file name in the path') upFile.close() else: color('[!] Invalid source. File does not exist') sys.exit()
def _CreateMSVSUserFile(proj_path, version, spec): """Generates a .user file for the user running this Gyp program. Arguments: proj_path: The path of the project file being created. The .user file shares the same path (with an appropriate suffix). version: The VisualStudioVersion object. spec: The target dictionary containing the properties of the target. Returns: The MSVSUserFile object created. """ (domain, username) = _GetDomainAndUserName() vcuser_filename = '.'.join([proj_path, domain, username, 'user']) user_file = MSVSUserFile.Writer(vcuser_filename, version, spec['target_name']) return user_file
def _GetDefines(config): """Returns the list of preprocessor definitions for this configuation. Arguments: config: The dictionary that defines the special processing to be done for this configuration. Returns: The list of preprocessor definitions. """ defines = [] for d in config.get('defines', []): if type(d) == list: fd = '='.join([str(dpart) for dpart in d]) else: fd = str(d) defines.append(fd) return defines
def _GetMSVSAttributes(spec, config, config_type): # Prepare configuration attributes. prepared_attrs = {} source_attrs = config.get('msvs_configuration_attributes', {}) for a in source_attrs: prepared_attrs[a] = source_attrs[a] # Add props files. vsprops_dirs = config.get('msvs_props', []) vsprops_dirs = _FixPaths(vsprops_dirs) if vsprops_dirs: prepared_attrs['InheritedPropertySheets'] = ';'.join(vsprops_dirs) # Set configuration type. prepared_attrs['ConfigurationType'] = config_type output_dir = prepared_attrs.get('OutputDirectory', '$(SolutionDir)$(ConfigurationName)') prepared_attrs['OutputDirectory'] = _FixPath(output_dir) + '\\' if 'IntermediateDirectory' not in prepared_attrs: intermediate = '$(ConfigurationName)\\obj\\$(ProjectName)' prepared_attrs['IntermediateDirectory'] = _FixPath(intermediate) + '\\' else: intermediate = _FixPath(prepared_attrs['IntermediateDirectory']) + '\\' intermediate = MSVSSettings.FixVCMacroSlashes(intermediate) prepared_attrs['IntermediateDirectory'] = intermediate return prepared_attrs
def _GetCopies(spec): copies = [] # Add copies. for cpy in spec.get('copies', []): for src in cpy.get('files', []): dst = os.path.join(cpy['destination'], os.path.basename(src)) # _AddCustomBuildToolForMSVS() will call _FixPath() on the inputs and # outputs, so do the same for our generated command line. if src.endswith('/'): src_bare = src[:-1] base_dir = posixpath.split(src_bare)[0] outer_dir = posixpath.split(src_bare)[1] cmd = 'cd "%s" && xcopy /e /f /y "%s" "%s\\%s\\"' % ( _FixPath(base_dir), outer_dir, _FixPath(dst), outer_dir) copies.append(([src], ['dummy_copies', dst], cmd, 'Copying %s to %s' % (src, dst))) else: cmd = 'mkdir "%s" 2>nul & set ERRORLEVEL=0 & copy /Y "%s" "%s"' % ( _FixPath(cpy['destination']), _FixPath(src), _FixPath(dst)) copies.append(([src], [dst], cmd, 'Copying %s to %s' % (src, dst))) return copies
def _DictsToFolders(base_path, bucket, flat): # Convert to folders recursively. children = [] for folder, contents in bucket.iteritems(): if type(contents) == dict: folder_children = _DictsToFolders(os.path.join(base_path, folder), contents, flat) if flat: children += folder_children else: folder_children = MSVSNew.MSVSFolder(os.path.join(base_path, folder), name='(' + folder + ')', entries=folder_children) children.append(folder_children) else: children.append(contents) return children
def _GetPathOfProject(qualified_target, spec, options, msvs_version): default_config = _GetDefaultConfiguration(spec) proj_filename = default_config.get('msvs_existing_vcproj') if not proj_filename: proj_filename = (spec['target_name'] + options.suffix + msvs_version.ProjectExtension()) build_file = gyp.common.BuildFile(qualified_target) proj_path = os.path.join(os.path.dirname(build_file), proj_filename) fix_prefix = None if options.generator_output: project_dir_path = os.path.dirname(os.path.abspath(proj_path)) proj_path = os.path.join(options.generator_output, proj_path) fix_prefix = gyp.common.RelativePath(project_dir_path, os.path.dirname(proj_path)) return proj_path, fix_prefix
def _VerifySourcesExist(sources, root_dir): """Verifies that all source files exist on disk. Checks that all regular source files, i.e. not created at run time, exist on disk. Missing files cause needless recompilation but no otherwise visible errors. Arguments: sources: A recursive list of Filter/file names. root_dir: The root directory for the relative path names. Returns: A list of source files that cannot be found on disk. """ missing_sources = [] for source in sources: if isinstance(source, MSVSProject.Filter): missing_sources.extend(_VerifySourcesExist(source.contents, root_dir)) else: if '$' not in source: full_path = os.path.join(root_dir, source) if not os.path.exists(full_path): missing_sources.append(full_path) return missing_sources
def _AddMSBuildAction(spec, primary_input, inputs, outputs, cmd, description, sources_handled_by_action, actions_spec): command = MSVSSettings.ConvertVCMacrosToMSBuild(cmd) primary_input = _FixPath(primary_input) inputs_array = _FixPaths(inputs) outputs_array = _FixPaths(outputs) additional_inputs = ';'.join([i for i in inputs_array if i != primary_input]) outputs = ';'.join(outputs_array) sources_handled_by_action.add(primary_input) action_spec = ['CustomBuild', {'Include': primary_input}] action_spec.extend( # TODO(jeanluc) 'Document' for all or just if as_sources? [['FileType', 'Document'], ['Command', command], ['Message', description], ['Outputs', outputs] ]) if additional_inputs: action_spec.append(['AdditionalInputs', additional_inputs]) actions_spec.append(action_spec)
def argv_to_cmdline(argv): """ Convert a list of arguments to a single command line string. @type argv: list( str ) @param argv: List of argument strings. The first element is the program to execute. @rtype: str @return: Command line string. """ cmdline = list() for token in argv: if not token: token = '""' else: if '"' in token: token = token.replace('"', '\\"') if ' ' in token or \ '\t' in token or \ '\n' in token or \ '\r' in token: token = '"%s"' % token cmdline.append(token) return ' '.join(cmdline)
def get_explorer_pid(self): """ Tries to find the process ID for "explorer.exe". @rtype: int or None @return: Returns the process ID, or C{None} on error. """ try: exp = win32.SHGetFolderPath(win32.CSIDL_WINDOWS) except Exception: exp = None if not exp: exp = getenv('SystemRoot') if exp: exp = ntpath.join(exp, 'explorer.exe') exp_list = self.find_processes_by_filename(exp) if exp_list: return exp_list[0][0].get_pid() return None #------------------------------------------------------------------------------ # XXX this methods musn't end up calling __initialize_snapshot by accident!
def _execvpe(file, args, env=None): if env is not None: exec_func = execve argrest = (args, env) else: exec_func = execv argrest = (args,) env = environ head, tail = path.split(file) if head: exec_func(file, *argrest) return last_exc = saved_exc = None saved_tb = None path_list = get_exec_path(env) if name != 'nt': file = fsencode(file) path_list = map(fsencode, path_list) for dir in path_list: fullname = path.join(dir, file) try: exec_func(fullname, *argrest) except OSError as e: last_exc = e tb = sys.exc_info()[2] if (e.errno != errno.ENOENT and e.errno != errno.ENOTDIR and saved_exc is None): saved_exc = e saved_tb = tb if saved_exc: raise saved_exc.with_traceback(saved_tb) raise last_exc.with_traceback(tb)
def __repr__(self): return 'environ({{{}}})'.format(', '.join( ('{!r}: {!r}'.format(self.decodekey(key), self.decodevalue(value)) for key, value in self._data.items())))
def _execvpe(file, args, env=None): if env is not None: func = execve argrest = (args, env) else: func = execv argrest = (args,) env = environ head, tail = path.split(file) if head: func(file, *argrest) return if 'PATH' in env: envpath = env['PATH'] else: envpath = defpath PATH = envpath.split(pathsep) saved_exc = None saved_tb = None for dir in PATH: fullname = path.join(dir, file) try: func(fullname, *argrest) except error, e: tb = sys.exc_info()[2] if (e.errno != errno.ENOENT and e.errno != errno.ENOTDIR and saved_exc is None): saved_exc = e saved_tb = tb if saved_exc: raise error, saved_exc, saved_tb raise error, e, tb # Change environ to automatically call putenv() if it exists
def split_cli(line): ''' we'd like to use shlex.split() but that doesn't work well for windows types of things. for now we'll take the easy approach and just split on space. We'll then cycle through and look for leading quotes and join those lines ''' parts = line.split(' ') final = [] inQuotes = False while len(parts) > 0: tok = parts.pop(0) if (tok[:1] == '"'): tok = tok[1:] next = parts.pop(0) while(next[-1:] != '"' and len(parts) > 0): tok += ' ' + next next = parts.pop(0) if (next[-1:] == '"'): tok += ' ' + next[:-1] final.append(tok) return final
def run(self, args): self.rep = os.path.join("data", "downloads", self.client.short_name(), "creds") try: os.makedirs(self.rep) except Exception: pass if self.client.is_windows(): self.windows() else: self.linux()
def dump(self, src, length=8): FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)]) N=0; result='' while src: s,src = src[:length],src[length:] hexa = ' '.join(["%02X"%ord(x) for x in s]) s = s.translate(FILTER) result += "%04X %-*s %s\n" % (N, length*3, hexa, s) N+=length return result
def do_get(self, src_path): try: import ntpath newPath = ntpath.normpath(ntpath.join(self.__pwd, src_path)) drive, tail = ntpath.splitdrive(newPath) filename = ntpath.basename(tail) fh = open(filename,'wb') logging.info("Downloading %s\\%s" % (drive, tail)) self.__transferClient.getFile(drive[:-1]+'$', tail, fh.write) fh.close() except Exception, e: logging.error(str(e)) os.remove(filename) pass
def do_cd(self, s): self.execute_remote('cd ' + s) if len(self.__outputBuffer.strip('\r\n')) > 0: print self.__outputBuffer self.__outputBuffer = '' else: self.__pwd = ntpath.normpath(ntpath.join(self.__pwd, s)) self.execute_remote('cd ') self.__pwd = self.__outputBuffer.strip('\r\n') self.prompt = self.__pwd + '>' self.__outputBuffer = ''