我们从Python开源项目中,提取了以下44个代码示例,用于说明如何使用win32api.CloseHandle()。
def GetDomainName(): try: tok = win32security.OpenThreadToken(win32api.GetCurrentThread(), TOKEN_QUERY, 1) except win32api.error, details: if details[0] != winerror.ERROR_NO_TOKEN: raise # attempt to open the process token, since no thread token # exists tok = win32security.OpenProcessToken(win32api.GetCurrentProcess(), TOKEN_QUERY) sid, attr = win32security.GetTokenInformation(tok, TokenUser) win32api.CloseHandle(tok) name, dom, typ = win32security.LookupAccountSid(None, sid) return dom
def run_elevated(command, args, wait=True): """Run the given command as an elevated user and wait for it to return""" ret = 1 if command.find(' ') > -1: command = '"' + command + '"' if platform.system() == 'Windows': import win32api import win32con import win32event import win32process from win32com.shell.shell import ShellExecuteEx from win32com.shell import shellcon logging.debug(command + ' ' + args) process_info = ShellExecuteEx(nShow=win32con.SW_HIDE, fMask=shellcon.SEE_MASK_NOCLOSEPROCESS, lpVerb='runas', lpFile=command, lpParameters=args) if wait: win32event.WaitForSingleObject(process_info['hProcess'], 600000) ret = win32process.GetExitCodeProcess(process_info['hProcess']) win32api.CloseHandle(process_info['hProcess']) else: ret = process_info else: logging.debug('sudo ' + command + ' ' + args) ret = subprocess.call('sudo ' + command + ' ' + args, shell=True) return ret
def subprocess_terminate( proc ) : try: proc.terminate() except AttributeError: print " no terminate method to Popen.." try: import signal os.kill( proc.pid , signal.SIGTERM) except AttributeError: print " no os.kill, using win32api.." try: import win32api PROCESS_TERMINATE = 1 handle = win32api.OpenProcess( PROCESS_TERMINATE, False, proc.pid) win32api.TerminateProcess(handle,-1) win32api.CloseHandle(handle) except ImportError: print " ERROR: could not terminate process."
def testCleanup1(self): # We used to clobber all outstanding exceptions. def f1(invalidate): import win32event h = win32event.CreateEvent(None, 0, 0, None) if invalidate: win32api.CloseHandle(int(h)) 1/0 # If we invalidated, then the object destruction code will attempt # to close an invalid handle. We don't wan't an exception in # this case def f2(invalidate): """ This function should throw an IOError. """ try: f1(invalidate) except ZeroDivisionError, exc: raise IOError("raise 2") self.assertRaises(IOError, f2, False) # Now do it again, but so the auto object destruction # actually fails. self.assertRaises(IOError, f2, True)
def GetDomainName(): try: tok = win32security.OpenThreadToken(win32api.GetCurrentThread(), TOKEN_QUERY, 1) except win32api.error as details: if details[0] != winerror.ERROR_NO_TOKEN: raise # attempt to open the process token, since no thread token # exists tok = win32security.OpenProcessToken(win32api.GetCurrentProcess(), TOKEN_QUERY) sid, attr = win32security.GetTokenInformation(tok, TokenUser) win32api.CloseHandle(tok) name, dom, typ = win32security.LookupAccountSid(None, sid) return dom
def testCleanup1(self): # We used to clobber all outstanding exceptions. def f1(invalidate): import win32event h = win32event.CreateEvent(None, 0, 0, None) if invalidate: win32api.CloseHandle(int(h)) 1/0 # If we invalidated, then the object destruction code will attempt # to close an invalid handle. We don't wan't an exception in # this case def f2(invalidate): """ This function should throw an IOError. """ try: f1(invalidate) except ZeroDivisionError as exc: raise IOError("raise 2") self.assertRaises(IOError, f2, False) # Now do it again, but so the auto object destruction # actually fails. self.assertRaises(IOError, f2, True)
def _closeStdin(self): if hasattr(self, "hChildStdinWr"): win32file.CloseHandle(self.hChildStdinWr) del self.hChildStdinWr self.closingStdin = False self.closedStdin = True
def closeStderr(self): if hasattr(self, "hChildStderrRd"): win32file.CloseHandle(self.hChildStderrRd) del self.hChildStderrRd self.closedStderr = True self.connectionLostNotify()
def closeStdout(self): if hasattr(self, "hChildStdoutRd"): win32file.CloseHandle(self.hChildStdoutRd) del self.hChildStdoutRd self.closedStdout = True self.connectionLostNotify()
def close(self): try: win32api.CloseHandle(self.pipe) except pywintypes.error: # You can't close std handles...? pass
def writeConnectionLost(self): self.deactivate() try: win32api.CloseHandle(self.writePipe) except pywintypes.error: # OMG what pass self.lostCallback()
def wait_for_elevated_process(process_info): if platform.system() == 'Windows' and 'hProcess' in process_info: import win32api import win32con import win32event import win32process win32event.WaitForSingleObject(process_info['hProcess'], 600000) ret = win32process.GetExitCodeProcess(process_info['hProcess']) win32api.CloseHandle(process_info['hProcess']) return ret # pylint: enable=E0611,E0401 # pylint: disable=E1101
def __del__(self): if self.locked: self.release() win32api.CloseHandle(self.handle)
def killProcName(procname): # Change suggested by Dan Knierim, who found that this performed a # "refresh", allowing us to kill processes created since this was run # for the first time. try: win32pdhutil.GetPerformanceAttributes('Process','ID Process',procname) except: pass pids = win32pdhutil.FindPerformanceAttributesByName(procname) # If _my_ pid in there, remove it! try: pids.remove(win32api.GetCurrentProcessId()) except ValueError: pass if len(pids)==0: result = "Can't find %s" % procname elif len(pids)>1: result = "Found too many %s's - pids=`%s`" % (procname,pids) else: handle = win32api.OpenProcess(win32con.PROCESS_TERMINATE, 0,pids[0]) win32api.TerminateProcess(handle,0) win32api.CloseHandle(handle) result = "" return result
def testCleanup2(self): # Cause an exception during object destruction. # The worst this does is cause an ".XXX undetected error (why=3)" # So avoiding that is the goal import win32event h = win32event.CreateEvent(None, 0, 0, None) # Close the handle underneath the object. win32api.CloseHandle(int(h)) # Object destructor runs with the implicit close failing h = None
def testCleanup3(self): # And again with a class - no __del__ import win32event class Test: def __init__(self): self.h = win32event.CreateEvent(None, 0, 0, None) win32api.CloseHandle(int(self.h)) t=Test() t = None
def _getInvalidHandleException(self): try: win32api.CloseHandle(1) except win32api.error, exc: return exc self.fail("Didn't get invalid-handle exception.")
def testSimple(self): self.assertRaises(pywintypes.error, win32api.CloseHandle, 1)
def testFuncIndex(self): exc = self._getInvalidHandleException() self._testExceptionIndex(exc, 1, "CloseHandle")
def testUnpack(self): try: win32api.CloseHandle(1) self.fail("expected exception!") except win32api.error, exc: self.failUnlessEqual(exc.winerror, winerror.ERROR_INVALID_HANDLE) self.failUnlessEqual(exc.funcname, "CloseHandle") expected_msg = win32api.FormatMessage(winerror.ERROR_INVALID_HANDLE).rstrip() self.failUnlessEqual(exc.strerror, expected_msg)
def testAsStr(self): exc = self._getInvalidHandleException() err_msg = win32api.FormatMessage(winerror.ERROR_INVALID_HANDLE).rstrip() # early on the result actually *was* a tuple - it must always look like one err_tuple = (winerror.ERROR_INVALID_HANDLE, 'CloseHandle', err_msg) self.failUnlessEqual(str(exc), str(err_tuple))
def testAttributes(self): exc = self._getInvalidHandleException() err_msg = win32api.FormatMessage(winerror.ERROR_INVALID_HANDLE).rstrip() self.failUnlessEqual(exc.winerror, winerror.ERROR_INVALID_HANDLE) self.failUnlessEqual(exc.strerror, err_msg) self.failUnlessEqual(exc.funcname, 'CloseHandle') # some tests for 'insane' args.
def _getInvalidHandleException(self): try: win32api.CloseHandle(1) except win32api.error as exc: return exc self.fail("Didn't get invalid-handle exception.")
def testUnpack(self): try: win32api.CloseHandle(1) self.fail("expected exception!") except win32api.error as exc: self.failUnlessEqual(exc.winerror, winerror.ERROR_INVALID_HANDLE) self.failUnlessEqual(exc.funcname, "CloseHandle") expected_msg = win32api.FormatMessage(winerror.ERROR_INVALID_HANDLE).rstrip() self.failUnlessEqual(exc.strerror, expected_msg)
def logout(self, session): """Delete session of it exists.""" if self.is_session_valid(session): sessionInfo = self.sessions[session] win32api.CloseHandle(sessionInfo[3]) del self.sessions[session]
def init_acls(): # A process that tries to read or write a SACL needs # to have and enable the SE_SECURITY_NAME privilege. # And inorder to backup/restore, the SE_BACKUP_NAME and # SE_RESTORE_NAME privileges are needed. import win32api try: hnd = OpenProcessToken(win32api.GetCurrentProcess(), TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY) except win32api.error, exc: log.Log("Warning: unable to open Windows process token: %s" % exc, 5) return try: try: lpv = lambda priv: LookupPrivilegeValue(None, priv) # enable the SE_*_NAME privileges SecurityName = lpv(SE_SECURITY_NAME) AdjustTokenPrivileges(hnd, False, [ (SecurityName, SE_PRIVILEGE_ENABLED), (lpv(SE_BACKUP_NAME), SE_PRIVILEGE_ENABLED), (lpv(SE_RESTORE_NAME), SE_PRIVILEGE_ENABLED) ]) except win32api.error, exc: log.Log("Warning: unable to enable SE_*_NAME privileges: %s" % exc, 5) return for name, enabled in GetTokenInformation(hnd, TokenPrivileges): if name == SecurityName and enabled: # now we *may* access the SACL (sigh) ACL.flags |= SACL_SECURITY_INFORMATION break finally: win32api.CloseHandle(hnd)
def _readListViewItems(hwnd, column_index=0): # Allocate virtual memory inside target process pid = ctypes.create_string_buffer(4) p_pid = ctypes.addressof(pid) GetWindowThreadProcessId(hwnd, p_pid) # process owning the given hwnd hProcHnd = OpenProcess(win32con.PROCESS_ALL_ACCESS, False, struct.unpack("i", pid)[0]) pLVI = VirtualAllocEx(hProcHnd, 0, 4096, win32con.MEM_RESERVE | win32con.MEM_COMMIT, win32con.PAGE_READWRITE) pBuffer = VirtualAllocEx(hProcHnd, 0, 4096, win32con.MEM_RESERVE | win32con.MEM_COMMIT, win32con.PAGE_READWRITE) # Prepare an LVITEM record and write it to target process memory lvitem_str = struct.pack('iiiiiiiii', *[0, 0, column_index, 0, 0, pBuffer, 4096, 0, 0]) lvitem_buffer = ctypes.create_string_buffer(lvitem_str) copied = ctypes.create_string_buffer(4) p_copied = ctypes.addressof(copied) WriteProcessMemory(hProcHnd, pLVI, ctypes.addressof(lvitem_buffer), ctypes.sizeof(lvitem_buffer), p_copied) # iterate items in the SysListView32 control num_items = win32gui.SendMessage(hwnd, commctrl.LVM_GETITEMCOUNT) item_texts = [] for item_index in range(num_items): win32gui.SendMessage(hwnd, commctrl.LVM_GETITEMTEXT, item_index, pLVI) target_buff = ctypes.create_string_buffer(4096) ReadProcessMemory(hProcHnd, pBuffer, ctypes.addressof(target_buff), 4096, p_copied) item_texts.append(target_buff.value) VirtualFreeEx(hProcHnd, pBuffer, 0, win32con.MEM_RELEASE) VirtualFreeEx(hProcHnd, pLVI, 0, win32con.MEM_RELEASE) win32api.CloseHandle(hProcHnd) return item_texts
def killPID(pid, sig=None): """Kill the process with the given pid.""" try: if sig is None: from signal import SIGTERM sig = SIGTERM os.kill(pid, sig) except (AttributeError, ImportError): if win32api: handle = win32api.OpenProcess(1, False, pid) win32api.TerminateProcess(handle, -1) win32api.CloseHandle(handle)
def clear_instance_check_event(self): '''Close handle created by CreateEvent''' if self.SICHECK_EVENT is not None: win32api.CloseHandle(self.SICHECK_EVENT)
def check_pids(curmir_incs): """Check PIDs in curmir markers to make sure rdiff-backup not running""" pid_re = re.compile("^PID\s*([0-9]+)", re.I | re.M) def extract_pid(curmir_rp): """Return process ID from a current mirror marker, if any""" match = pid_re.search(curmir_rp.get_data()) if not match: return None else: return int(match.group(1)) def pid_running(pid): """True if we know if process with pid is currently running""" try: os.kill(pid, 0) except OSError, exc: if exc[0] == errno.ESRCH: return 0 else: log.Log("Warning: unable to check if PID %d still running" % (pid,), 2) except AttributeError: assert os.name == 'nt' import win32api, win32con, pywintypes process = None try: process = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS, 0, pid) except pywintypes.error, error: if error[0] == 87: return 0 else: msg = "Warning: unable to check if PID %d still running" log.Log(msg % pid, 2) if process: win32api.CloseHandle(process) return 1 return 0 return 1 for curmir_rp in curmir_incs: assert Globals.local_connection is curmir_rp.conn pid = extract_pid(curmir_rp) if pid is not None and pid_running(pid): log.Log.FatalError( """It appears that a previous rdiff-backup session with process id %d is still running. If two different rdiff-backup processes write the same repository simultaneously, data corruption will probably result. To proceed with regress anyway, rerun rdiff-backup with the --force option.""" % (pid,))