def test_cmdline(self): sys_value = re.sub(' +', ' ', win32api.GetCommandLine()).strip() psutil_value = ' '.join(psutil.Process().cmdline()) self.assertEqual(sys_value, psutil_value) # XXX - occasional failures # def test_cpu_times(self): # handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, # win32con.FALSE, os.getpid()) # self.addCleanup(win32api.CloseHandle, handle) # sys_value = win32process.GetProcessTimes(handle) # psutil_value = psutil.Process().cpu_times() # self.assertAlmostEqual( # psutil_value.user, sys_value['UserTime'] / 10000000.0, # delta=0.2) # self.assertAlmostEqual( # psutil_value.user, sys_value['KernelTime'] / 10000000.0, # delta=0.2)
def test_io_counters(self): handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, win32con.FALSE, os.getpid()) self.addCleanup(win32api.CloseHandle, handle) sys_value = win32process.GetProcessIoCounters(handle) psutil_value = psutil.Process().io_counters() self.assertEqual( psutil_value.read_count, sys_value['ReadOperationCount']) self.assertEqual( psutil_value.write_count, sys_value['WriteOperationCount']) self.assertEqual( psutil_value.read_bytes, sys_value['ReadTransferCount']) self.assertEqual( psutil_value.write_bytes, sys_value['WriteTransferCount']) self.assertEqual( psutil_value.other_count, sys_value['OtherOperationCount']) self.assertEqual( psutil_value.other_bytes, sys_value['OtherTransferCount'])
def get_process_privileges(pid): try: #????id?????????? hproc = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,False,pid) #??????? htok = win32security.OpenProcessToken(hproc,win32con.TOKEN_QUERY) #???????????????????? privs = win32security.GetTokenInformation(htok, win32security. TokenPrivileges) #?????????i[1] == 3???????? priv_list = "" for i in privs: if i[1] == 3: #?????? priv_list += "%s|" % win32security.LookupPrivilegeName(None,i[0]) except: priv_list = "N/A" return priv_lis #????
def is_running (pid, cmd = None): if cmd is None: cmd = os.path.split (sys.argv [0])[1] if os.name == "nt": import win32process, win32api, win32con, pywintypes HAS_WMI = True try: import wmi except ImportError: HAS_WMI = False if pid not in win32process.EnumProcesses (): return False if HAS_WMI: cl = [p.CommandLine for p in wmi.WMI ().Win32_Process () if p.ProcessID == pid] if cl and cl [0].find (cmd) != -1: return True return False else: try: handle = win32api.OpenProcess (win32con.PROCESS_QUERY_INFORMATION | win32con.PROCESS_VM_READ, 0, int (pid)) exefilename = win32process.GetModuleFileNameEx (handle, 0) win32process.GetStartupInfo() if exefilename.lower ().find ("python.exe") != -1 or exefilename.lower ().find ("cmd.exe") != -1: return True except pywintypes.error: # Windows service, Access is denied return False else: proc = "/proc/%s/cmdline" % pid if not os.path.isfile (proc): return False with open (proc) as f: exefilename = f.read () if exefilename.find (cmd) != -1: return True return False
def test_num_handles(self): p = psutil.Process(os.getpid()) before = p.num_handles() handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, win32con.FALSE, os.getpid()) after = p.num_handles() self.assertEqual(after, before + 1) win32api.CloseHandle(handle) self.assertEqual(p.num_handles(), before)
def check_processes(): pids = win32process.EnumProcesses() # TODO also check out WMI. It might not be running, but it could help if it is: # http://groups.google.com/group/comp.lang.python/browse_thread/thread/1f50065064173ccb # TODO process explorer can find quite a lot more information than this script. This script has several problems: # TODO I can't open 64-bit processes for a 32-bit app. I get this error: # ERROR: can't open 6100: 299 EnumProcessModules, Only part of a ReadProcessMemory # or WriteProcessMemory request was completed. # TODO I can't seem to get the name of elevated processes (user running as me, but with admin privs) # TODO I can't get details of certain processes runnign as SYSTEM on xp (e.g. pid 4 "system", csrss.exe) # TODO should be able to find name (and threads?) for all processes. Not necessarily path. for pid in sorted(pids): # TODO there's a security descriptor for each process accessible via GetSecurityInfo according to http://msdn.microsoft.com/en-us/library/ms684880%28VS.85%29.aspx # TODO could we connect with PROCESS_QUERY_LIMITED_INFORMATION instead on Vista+ try: ph = win32api.OpenProcess(win32con.PROCESS_VM_READ | win32con.PROCESS_QUERY_INFORMATION , False, pid) except: # print "ERROR: can't connected to PID " + str(pid) sys.stdout.write("?") continue else: user = "unknown\\unknown" try: tokenh = win32security.OpenProcessToken(ph, win32con.TOKEN_QUERY) except: pass else: sidObj, intVal = win32security.GetTokenInformation(tokenh, TokenUser) #source = win32security.GetTokenInformation(tokenh, TokenSource) if sidObj: accountName, domainName, accountTypeInt = win32security.LookupAccountSid(remote_server, sidObj) # print "pid=%d accountname=%s domainname=%s wow64=%s" % (pid, accountName, domainName, win32process.IsWow64Process(ph)) user = domainName + "\\" + accountName # print "PID %d is running as %s" % (pid, user) sys.stdout.write(".") try: mhs = win32process.EnumProcessModules(ph) # print mhs except: continue mhs = list(mhs) exe = win32process.GetModuleFileNameEx(ph, mhs.pop(0)) weak_perms = check_weak_write_perms(exe, 'file') # print_weak_perms("PID " + str(pid) + " running as " + user + ":", weak_perms) if weak_perms: save_issue("WPC016", "weak_perms_exes", weak_perms) sys.stdout.write("!") for mh in mhs: # print "PID %d (%s) has loaded module: %s" % (pid, exe, win32process.GetModuleFileNameEx(ph, mh)) dll = win32process.GetModuleFileNameEx(ph, mh) weak_perms = check_weak_write_perms(dll, 'file') # print_weak_perms("DLL used by PID " + str(pid) + " running as " + user + " (" + exe + "):", weak_perms) if weak_perms: save_issue("WPC016", "weak_perms_dlls", weak_perms) sys.stdout.write("!") print
def test_num_handles_increment(self): p = psutil.Process(os.getpid()) before = p.num_handles() handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, win32con.FALSE, os.getpid()) after = p.num_handles() self.assertEqual(after, before + 1) win32api.CloseHandle(handle) self.assertEqual(p.num_handles(), before)
def test_nice(self): handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, win32con.FALSE, os.getpid()) self.addCleanup(win32api.CloseHandle, handle) sys_value = win32process.GetPriorityClass(handle) psutil_value = psutil.Process().nice() self.assertEqual(psutil_value, sys_value)
def test_memory_info(self): handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, win32con.FALSE, self.pid) self.addCleanup(win32api.CloseHandle, handle) sys_value = win32process.GetProcessMemoryInfo(handle) psutil_value = psutil.Process(self.pid).memory_info() self.assertEqual( sys_value['PeakWorkingSetSize'], psutil_value.peak_wset) self.assertEqual( sys_value['WorkingSetSize'], psutil_value.wset) self.assertEqual( sys_value['QuotaPeakPagedPoolUsage'], psutil_value.peak_paged_pool) self.assertEqual( sys_value['QuotaPagedPoolUsage'], psutil_value.paged_pool) self.assertEqual( sys_value['QuotaPeakNonPagedPoolUsage'], psutil_value.peak_nonpaged_pool) self.assertEqual( sys_value['QuotaNonPagedPoolUsage'], psutil_value.nonpaged_pool) self.assertEqual( sys_value['PagefileUsage'], psutil_value.pagefile) self.assertEqual( sys_value['PeakPagefileUsage'], psutil_value.peak_pagefile) self.assertEqual(psutil_value.rss, psutil_value.wset) self.assertEqual(psutil_value.vms, psutil_value.pagefile)
def test_wait(self): handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, win32con.FALSE, self.pid) self.addCleanup(win32api.CloseHandle, handle) p = psutil.Process(self.pid) p.terminate() psutil_value = p.wait() sys_value = win32process.GetExitCodeProcess(handle) self.assertEqual(psutil_value, sys_value)
def test_num_handles(self): import ctypes import ctypes.wintypes PROCESS_QUERY_INFORMATION = 0x400 handle = ctypes.windll.kernel32.OpenProcess( PROCESS_QUERY_INFORMATION, 0, os.getpid()) self.addCleanup(ctypes.windll.kernel32.CloseHandle, handle) hndcnt = ctypes.wintypes.DWORD() ctypes.windll.kernel32.GetProcessHandleCount( handle, ctypes.byref(hndcnt)) sys_value = hndcnt.value psutil_value = psutil.Process().num_handles() ctypes.windll.kernel32.CloseHandle(handle) self.assertEqual(psutil_value, sys_value + 1)