我们从 Python 开源项目中提取了以下 17 个代码示例,用于说明如何使用 win32api.GetCurrentProcess()。
def GetDomainName():
    """Return the domain of the account the calling thread is running as.

    The thread token is tried first.  When the thread has no token of its
    own (ERROR_NO_TOKEN) the process token is used instead; any other
    failure while opening the thread token is re-raised.
    """
    try:
        tok = win32security.OpenThreadToken(
            win32api.GetCurrentThread(), TOKEN_QUERY, 1
        )
    except win32api.error as details:
        # The Win32 error code lives in the .winerror attribute; indexing
        # the exception object is not supported on Python 3.
        if details.winerror != winerror.ERROR_NO_TOKEN:
            raise
        # attempt to open the process token, since no thread token exists
        tok = win32security.OpenProcessToken(
            win32api.GetCurrentProcess(), TOKEN_QUERY
        )
    try:
        sid, _attr = win32security.GetTokenInformation(tok, TokenUser)
    finally:
        # Release the token even when the query raises.
        win32api.CloseHandle(tok)
    _name, dom, _typ = win32security.LookupAccountSid(None, sid)
    return dom
def GetDomainName():
    """Look up the domain name belonging to the current security token.

    Uses the calling thread's token when one exists and falls back to the
    process token when OpenThreadToken fails with ERROR_NO_TOKEN.  Other
    errors from OpenThreadToken propagate to the caller.
    """
    try:
        tok = win32security.OpenThreadToken(
            win32api.GetCurrentThread(), TOKEN_QUERY, 1
        )
    except win32api.error as details:
        # Exceptions cannot be subscripted on Python 3, so read the error
        # code from the .winerror attribute instead of details[0].
        if details.winerror != winerror.ERROR_NO_TOKEN:
            raise
        # attempt to open the process token, since no thread token exists
        tok = win32security.OpenProcessToken(
            win32api.GetCurrentProcess(), TOKEN_QUERY
        )
    try:
        sid, _attr = win32security.GetTokenInformation(tok, TokenUser)
    finally:
        # Always close the token handle, including on failure.
        win32api.CloseHandle(tok)
    _name, dom, _typ = win32security.LookupAccountSid(None, sid)
    return dom
def open_debug(self, dwProcessId):
    """Open process ``dwProcessId`` and remember the handle on ``self``.

    A first handle is opened only to rewrite the target's security info
    using values read from the current process; it is closed again before
    the real handle is opened and stored in ``self.h_process``.

    Returns True when the second OpenProcess call yields a truthy handle
    (``isProcessOpen`` and ``process32`` are then set too), False otherwise.
    """
    # Numeric Win32 values, named locally for readability.
    write_dac = 262144          # 0x00040000 (WRITE_DAC)
    se_kernel_object = 6        # SE_OBJECT_TYPE value for kernel objects
    all_access = 2035711        # 0x001F0FFF (PROCESS_ALL_ACCESS, pre-Vista value)

    target = OpenProcess(write_dac, 0, dwProcessId)
    own_info = win32security.GetSecurityInfo(
        win32api.GetCurrentProcess(), se_kernel_object, 0
    )
    dacl_flags = (
        win32security.DACL_SECURITY_INFORMATION
        | win32security.UNPROTECTED_DACL_SECURITY_INFORMATION
    )
    win32security.SetSecurityInfo(
        target,
        se_kernel_object,
        dacl_flags,
        None,
        None,
        own_info.GetSecurityDescriptorDacl(),
        own_info.GetSecurityDescriptorGroup(),
    )
    CloseHandle(target)

    self.h_process = OpenProcess(all_access, 0, dwProcessId)
    if not self.h_process:
        return False

    self.isProcessOpen = True
    self.process32 = self.process32_from_id(dwProcessId)
    return True
def memory():
    """Return the ``PeakWorkingSetSize`` figure for the current process.

    The value is taken from the dictionary that
    ``win32process.GetProcessMemoryInfo`` returns for the current process.
    """
    counters = win32process.GetProcessMemoryInfo(win32api.GetCurrentProcess())
    return counters['PeakWorkingSetSize']
def get_sid(self):
    """Return the SID of the user owning the current process.

    The SID is looked up once from the process token and cached on
    ``self.sid``; later calls return the cached value.
    """
    # Identity check: only a missing SID (None) triggers the lookup.
    if self.sid is None:
        ph = win32api.GetCurrentProcess()
        th = win32security.OpenProcessToken(ph, win32con.TOKEN_READ)
        try:
            # GetTokenInformation(TokenUser) returns (sid, attributes).
            self.sid = win32security.GetTokenInformation(
                th, win32security.TokenUser)[0]
        finally:
            # Release the token handle as soon as the SID has been read.
            th.Close()
    return self.sid
def init_acls():
    """Enable the privileges needed to read, write, back up and restore ACLs.

    A process that tries to read or write a SACL needs to have and enable
    the SE_SECURITY_NAME privilege, and backup/restore additionally needs
    SE_BACKUP_NAME and SE_RESTORE_NAME.  When SE_SECURITY_NAME ends up
    enabled on the process token, SACL_SECURITY_INFORMATION is added to
    ``ACL.flags``.

    Failures to open the token or to adjust the privileges are logged as
    warnings and the function returns without raising.
    """
    import win32api

    try:
        hnd = OpenProcessToken(win32api.GetCurrentProcess(),
                               TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY)
    except win32api.error as exc:
        log.Log("Warning: unable to open Windows process token: %s"
                % exc, 5)
        return

    try:
        # enable the SE_*_NAME privileges
        SecurityName = LookupPrivilegeValue(None, SE_SECURITY_NAME)
        AdjustTokenPrivileges(hnd, False, [
            (SecurityName, SE_PRIVILEGE_ENABLED),
            (LookupPrivilegeValue(None, SE_BACKUP_NAME), SE_PRIVILEGE_ENABLED),
            (LookupPrivilegeValue(None, SE_RESTORE_NAME), SE_PRIVILEGE_ENABLED),
        ])
    except win32api.error as exc:
        log.Log("Warning: unable to enable SE_*_NAME privileges: %s"
                % exc, 5)
        return
    else:
        # Runs only when the adjustment succeeded; errors raised here are
        # not swallowed by the handler above.
        for name, enabled in GetTokenInformation(hnd, TokenPrivileges):
            if name == SecurityName and enabled:
                # now we *may* access the SACL (sigh)
                ACL.flags |= SACL_SECURITY_INFORMATION
                break
    finally:
        # The token handle is closed on every path, including early returns.
        win32api.CloseHandle(hnd)
def DuplicateHandle(handle):
    """Return a duplicate of ``handle`` owned by the current process.

    The copy keeps the same access rights as the source handle and is
    created as non-inheritable.
    """
    this_process = win32api.GetCurrentProcess()
    desired_access = 0   # ignored because DUPLICATE_SAME_ACCESS is requested
    inheritable = 0
    return win32api.DuplicateHandle(
        this_process,
        handle,
        this_process,
        desired_access,
        inheritable,
        win32con.DUPLICATE_SAME_ACCESS,
    )
def MakePrivateHandle(handle, replace = 1):
    """Return a non-inheritable duplicate of an inherited handle.

    Doing this avoids the handle duplication that happens on CreateProcess
    calls, which can otherwise leave pipes that cannot be closed.

    When ``replace`` is true the source handle is closed as part of the
    duplication and ``handle`` is detached from it.
    """
    ### Could change implementation to use SetHandleInformation()...
    this_process = win32api.GetCurrentProcess()

    if not replace:
        return win32api.DuplicateHandle(
            this_process, handle, this_process, 0, 0,
            win32con.DUPLICATE_SAME_ACCESS)

    options = win32con.DUPLICATE_SAME_ACCESS | win32con.DUPLICATE_CLOSE_SOURCE
    private_handle = win32api.DuplicateHandle(
        this_process, handle, this_process, 0, 0, options)
    # The duplication already closed the source, so only detach the wrapper.
    handle.Detach()
    return private_handle
def MakeInheritedHandle(handle, replace = 1):
    """Return an inheritable duplicate of a private handle.

    When ``replace`` is true the source handle is closed as part of the
    duplication and ``handle`` is detached from it.
    """
    ### Could change implementation to use SetHandleInformation()...
    this_process = win32api.GetCurrentProcess()
    options = win32con.DUPLICATE_SAME_ACCESS
    if replace:
        options |= win32con.DUPLICATE_CLOSE_SOURCE

    inheritable = 1
    inherited_handle = win32api.DuplicateHandle(
        this_process, handle, this_process, 0, inheritable, options)

    if replace:
        # The source was closed by the duplication; just detach the wrapper.
        handle.Detach()
    return inherited_handle
def get_extra_privs():
    """Try to enable extra privileges on the current process token.

    Only works if we're admin.  The privileges of interest are:

    * SeBackupPrivilege   - so we can read anything
    * SeDebugPrivilege    - so we can find out about other processes
      (otherwise OpenProcess will fail for some)
    * SeSecurityPrivilege - purpose not documented by the original author

    Every privilege already present in the token is passed back to
    AdjustTokenPrivileges; the three above are passed with attribute 2
    (SE_PRIVILEGE_ENABLED), all others keep their current attributes.

    Problem: Vista+ supports "Protected" processes, e.g. audiodg.exe.  We
    can't see info about these.  Interesting post on why Protected Processes
    aren't really secure anyway: http://www.alex-ionescu.com/?p=34
    """
    th = win32security.OpenProcessToken(
        win32api.GetCurrentProcess(),
        win32con.TOKEN_ADJUST_PRIVILEGES | win32con.TOKEN_QUERY)
    privs = win32security.GetTokenInformation(th, TokenPrivileges)

    # The LUIDs do not change between iterations, so resolve them once.
    wanted = tuple(
        win32security.LookupPrivilegeValue(remote_server, priv_name)
        for priv_name in ("SeBackupPrivilege",
                          "SeDebugPrivilege",
                          "SeSecurityPrivilege")
    )

    newprivs = []
    for luid, attributes in privs:
        if luid in wanted:
            print("Added privilege " + str(luid))
            # Token entries are immutable tuples, so build a new one.
            newprivs.append((luid, 2))  # SE_PRIVILEGE_ENABLED
        else:
            newprivs.append((luid, attributes))

    # Adjust privs
    win32security.AdjustTokenPrivileges(th, False, tuple(newprivs))
def lowerCurrentProcessPriority():
    """Lower the scheduling priority of the current process by one step.

    On Windows the process is moved to the below-normal priority class.
    Elsewhere the nice value is raised by one, but only when it is still 0.
    """
    if not buildcommon.isWindows():
        # on unix, people may run nice before executing the process, so
        # only change the priority unilaterally if it's currently at its
        # default value
        current_niceness = os.nice(0)
        if current_niceness == 0:
            os.nice(1)  # change to 1 below the current level
        return

    import win32process, win32api, win32con
    this_process = win32api.GetCurrentProcess()
    win32process.SetPriorityClass(
        this_process, win32process.BELOW_NORMAL_PRIORITY_CLASS)
def __init__(self, dParams):
    """Initialise the Windows COM flavour of the platform object.

    Records the calling thread's id and a duplicated handle to it, installs
    the custom attribute hooks on the COM dispatch base class (once), makes
    sure the generated COM wrappers for the VirtualBox classes exist, and
    prepares the interrupt condition variable and flag.
    """
    PlatformBase.__init__(self, dParams)

    #
    # This code is loaded on every platform, so the Windows-only modules
    # are imported here rather than at the top of the file.
    #
    from win32com import universal
    from win32com.client import gencache, DispatchBaseClass
    from win32com.client import constants, getevents
    import win32com
    import pythoncom
    import win32api
    import winerror
    from win32con import DUPLICATE_SAME_ACCESS
    from win32api import GetCurrentThread, GetCurrentThreadId, DuplicateHandle, GetCurrentProcess
    import threading

    self.winerror = winerror

    # Remember the creating thread: its id plus a duplicate of its handle.
    hCurProcess = GetCurrentProcess()
    self.tid = GetCurrentThreadId()
    hThreadCopy = DuplicateHandle(hCurProcess, GetCurrentThread(), hCurProcess,
                                  0, 0, DUPLICATE_SAME_ACCESS)
    self.handles = [hThreadCopy]

    # Hack the COM dispatcher base class so we can modify method and
    # attribute names to match those in xpcom.  The original hooks are
    # saved first and the patch is applied only once.
    if _g_dCOMForward['setattr'] is None:
        dOrgAttribs = DispatchBaseClass.__dict__
        _g_dCOMForward['getattr'] = dOrgAttribs['__getattr__']
        _g_dCOMForward['setattr'] = dOrgAttribs['__setattr__']
        DispatchBaseClass.__getattr__ = _CustomGetAttr
        DispatchBaseClass.__setattr__ = _CustomSetAttr

    # Hack the exception base class so the users doesn't need to check for
    # XPCOM or COM and do different things.
    ## @todo

    #
    # Make sure the gencache is correct (we don't quite follow the COM
    # versioning rules).
    #
    oGenCache = win32com.client.gencache
    self.flushGenPyCache(oGenCache)
    for sClassName in ('VirtualBox.Session', 'VirtualBox.VirtualBox'):
        oGenCache.EnsureDispatch(sClassName)

    self.oIntCv = threading.Condition()
    self.fInterrupted = False

    _ = dParams
def test():
    """Command-line driver: write three sample events, then read the log.

    Options (parsed from ``sys.argv``):
      -t TYPE   event log to use (default "Application")
      -c NAME   computer passed on to ReadLog
      -r        skip the read phase
      -w        skip the write phase
      -v        increase verbosity (repeatable)
      -h, -?    show usage and stop

    Returns 1 when positional arguments are supplied, otherwise None.
    """
    # check if running on Windows NT, if not, display notice and terminate
    if win32api.GetVersion() & 0x80000000:
        print("This sample only runs on NT")
        return

    import sys, getopt
    opts, args = getopt.getopt(sys.argv[1:], "rwh?c:t:v")

    computer = None
    logType = "Application"
    do_read = True
    do_write = True
    verbose = 0

    if args:
        print("Invalid args")
        usage()
        return 1

    for opt, val in opts:
        if opt == '-t':
            logType = val
        elif opt == '-c':
            computer = val
        elif opt in ('-h', '-?'):
            usage()
            return
        elif opt == '-r':
            do_read = False
        elif opt == '-w':
            do_write = False
        elif opt == '-v':
            verbose += 1

    if do_write:
        # Events are attributed to the user owning this process.
        process = win32api.GetCurrentProcess()
        token = win32security.OpenProcessToken(process, win32con.TOKEN_READ)
        my_sid = win32security.GetTokenInformation(
            token, win32security.TokenUser)[0]

        raw_data = "Raw\0Data".encode("ascii")
        win32evtlogutil.ReportEvent(
            logType, 2,
            strings=["The message text for event 2", "Another insert"],
            data=raw_data,
            sid=my_sid)
        win32evtlogutil.ReportEvent(
            logType, 1,
            eventType=win32evtlog.EVENTLOG_WARNING_TYPE,
            strings=["A warning", "An even more dire warning"],
            data=raw_data,
            sid=my_sid)
        win32evtlogutil.ReportEvent(
            logType, 1,
            eventType=win32evtlog.EVENTLOG_INFORMATION_TYPE,
            strings=["An info", "Too much info"],
            data=raw_data,
            sid=my_sid)
        print("Successfully wrote 3 records to the log")

    if do_read:
        ReadLog(computer, logType, verbose > 0)
def test():
    """Command-line driver: write three sample events, then read the log.

    Options (parsed from ``sys.argv``):
      -t TYPE   event log to use (default "Application")
      -c NAME   computer passed on to ReadLog
      -r        skip the read phase
      -w        skip the write phase
      -v        increase verbosity (repeatable)
      -h, -?    show usage and stop

    Returns 1 when positional arguments are supplied, otherwise None.
    """
    # check if running on Windows NT, if not, display notice and terminate
    if win32api.GetVersion() & 0x80000000:
        # print() with a single argument behaves the same on Python 2 and 3.
        print("This sample only runs on NT")
        return

    import sys, getopt
    opts, args = getopt.getopt(sys.argv[1:], "rwh?c:t:v")
    computer = None
    do_read = do_write = 1
    logType = "Application"
    verbose = 0

    if len(args) > 0:
        print("Invalid args")
        usage()
        return 1

    for opt, val in opts:
        if opt == '-t':
            logType = val
        if opt == '-c':
            computer = val
        if opt in ['-h', '-?']:
            usage()
            return
        if opt == '-r':
            do_read = 0
        if opt == '-w':
            do_write = 0
        if opt == '-v':
            verbose = verbose + 1

    if do_write:
        # Events are attributed to the user owning this process.
        ph = win32api.GetCurrentProcess()
        th = win32security.OpenProcessToken(ph, win32con.TOKEN_READ)
        my_sid = win32security.GetTokenInformation(th, win32security.TokenUser)[0]

        win32evtlogutil.ReportEvent(logType, 2,
            strings=["The message text for event 2", "Another insert"],
            data="Raw\0Data".encode("ascii"),
            sid=my_sid)
        win32evtlogutil.ReportEvent(logType, 1,
            eventType=win32evtlog.EVENTLOG_WARNING_TYPE,
            strings=["A warning", "An even more dire warning"],
            data="Raw\0Data".encode("ascii"),
            sid=my_sid)
        win32evtlogutil.ReportEvent(logType, 1,
            eventType=win32evtlog.EVENTLOG_INFORMATION_TYPE,
            strings=["An info", "Too much info"],
            data="Raw\0Data".encode("ascii"),
            sid=my_sid)
        print("Successfully wrote 3 records to the log")

    if do_read:
        ReadLog(computer, logType, verbose > 0)