我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ctypes.wintypes()。
def _is_pid_running_on_windows(pid): import ctypes.wintypes kernel32 = ctypes.windll.kernel32 handle = kernel32.OpenProcess(1, 0, pid) if handle == 0: return False # If the process exited recently, a pid may still exist for the handle. # So, check if we can get the exit code. exit_code = ctypes.wintypes.DWORD() is_running = ( kernel32.GetExitCodeProcess(handle, ctypes.byref(exit_code)) == 0) kernel32.CloseHandle(handle) # See if we couldn't get the exit code or the exit code indicates that the # process is still running. return is_running or exit_code.value == _STILL_ACTIVE # %% Code for detecting the executablews
def OpenProcessToken(proc_handle, access): result = ctypes.wintypes.HANDLE() proc_handle = ctypes.wintypes.HANDLE(proc_handle) handle_nonzero_success(ctypes.windll.advapi32.OpenProcessToken( proc_handle, access, ctypes.byref(result))) return result
def get_dnsserver_list(): if os.name == 'nt': import ctypes import ctypes.wintypes DNS_CONFIG_DNS_SERVER_LIST = 6 buf = ctypes.create_string_buffer(2048) ctypes.windll.dnsapi.DnsQueryConfig(DNS_CONFIG_DNS_SERVER_LIST, 0, None, None, ctypes.byref(buf), ctypes.byref(ctypes.wintypes.DWORD(len(buf)))) ipcount = struct.unpack('I', buf[0:4])[0] iplist = [socket.inet_ntoa(buf[i:i+4]) for i in xrange(4, ipcount*4+4, 4)] return iplist elif os.path.isfile('/etc/resolv.conf'): with open('/etc/resolv.conf', 'rb') as fp: return re.findall(r'(?m)^nameserver\s+(\S+)', fp.read()) else: logging.warning("get_dnsserver_list failed: unsupport platform '%s-%s'", sys.platform, os.name) return []
def _is_gui_available(): UOI_FLAGS = 1 WSF_VISIBLE = 0x0001 class USEROBJECTFLAGS(ctypes.Structure): _fields_ = [("fInherit", ctypes.wintypes.BOOL), ("fReserved", ctypes.wintypes.BOOL), ("dwFlags", ctypes.wintypes.DWORD)] dll = ctypes.windll.user32 h = dll.GetProcessWindowStation() if not h: raise ctypes.WinError() uof = USEROBJECTFLAGS() needed = ctypes.wintypes.DWORD() res = dll.GetUserObjectInformationW(h, UOI_FLAGS, ctypes.byref(uof), ctypes.sizeof(uof), ctypes.byref(needed)) if not res: raise ctypes.WinError() return bool(uof.dwFlags & WSF_VISIBLE)
def CreateFile(path, access=GENERIC_READ | GENERIC_WRITE, mode=0, security_attributes=NULL, creation=OPEN_EXISTING, flags=FILE_ATTRIBUTE_NORMAL, template_file = NULL): """See: CreateFile function http://msdn.microsoft.com/en-us/library/windows/desktop/aa363858(v=vs.85).aspx """ CreateFile_Fn = windll.kernel32.CreateFileA CreateFile_Fn.argtypes = [ wintypes.LPCSTR, # _In_ LPCTSTR lpFileName wintypes.DWORD, # _In_ DWORD dwDesiredAccess wintypes.DWORD, # _In_ DWORD dwShareMode LPSECURITY_ATTRIBUTES, # _In_opt_ LPSECURITY_ATTRIBUTES lpSecurityAttributes wintypes.DWORD, # _In_ DWORD dwCreationDisposition wintypes.DWORD, # _In_ DWORD dwFlagsAndAttributes wintypes.HANDLE] # _In_opt_ HANDLE hTemplateFile CreateFile_Fn.restype = wintypes.HANDLE handle = wintypes.HANDLE(CreateFile_Fn(path, access, mode, security_attributes, creation, flags, template_file)) return handle
def WriteFile(file, buffer, number_of_bytes_to_write, number_of_bytes_written, overlapped): """See: WriteFile function https://msdn.microsoft.com/en-us/library/windows/desktop/aa365747(v=vs.85).aspx """ WriteFile_Fn = windll.kernel32.WriteFile WriteFile_Fn.argtypes = [ wintypes.HANDLE, # _In_ HANDLE hFile, wintypes.LPCVOID, # _In_ LPCVOID lpBuffer, wintypes.DWORD, # _In_ DWORD nNumberOfBytesToWrite, LPDWORD, # _Out_opt_ LPDWORD lpNumberOfBytesWritten, LPOVERLAPPED # _Inout_opt_ LPOVERLAPPED lpOverlapped ] WriteFile_Fn.restype = wintypes.BOOL ret = wintypes.BOOL(WriteFile_Fn( file, buffer, number_of_bytes_to_write, number_of_bytes_written, overlapped )) return ret
def open_service(service_manager_handle, service_name, desired_access): """ See: OpenService function https://msdn.microsoft.com/en-us/library/windows/desktop/ms684330(v=vs.85).aspx """ OpenService_Fn = windll.Advapi32.OpenServiceA #SC_HANDLE WINAPI OpenService( OpenService_Fn.argtypes = [ # wintypes.HANDLE, # _In_ SC_HANDLE hSCManager, LPCTSTR, # _In_ LPCTSTR lpServiceName, wintypes.DWORD # _In_ DWORD dwDesiredAccess ] OpenService_Fn.restype = wintypes.SC_HANDLE handle = OpenService_Fn( service_manager_handle, service_name, desired_access ) return handle
def control_service(service_handle, control, service_status): """See: ControlService function https://msdn.microsoft.com/en-us/library/windows/desktop/ms682108(v=vs.85).aspx """ ControlService_Fn = windll.Advapi32.ControlService #BOOL WINAPI ControlService( ControlService_Fn.argtypes = [ # wintypes.SC_HANDLE, # _In_ SC_HANDLE hService, wintypes.DWORD, # _In_ DWORD dwControl, wintypes.LPCVOID # _Out_ LPSERVICE_STATUS lpServiceStatus ] ControlService_Fn.restype = wintypes.BOOL bool = ControlService_Fn( service_handle, control, service_status ) return bool
def open_sc_manager(machine_name, database_name, desired_access): """See: OpenSCManager function https://msdn.microsoft.com/en-us/library/windows/desktop/ms684323(v=vs.85).aspx """ OpenSCManager_Fn = windll.Advapi32.OpenSCManagerA #SC_HANDLE WINAPI OpenSCManager( OpenSCManager_Fn.argtypes = [ # LPCTSTR, # _In_opt_ LPCTSTR lpMachineName, LPCTSTR, # _In_opt_ LPCTSTR lpDatabaseName, wintypes.DWORD # _In_ DWORD dwDesiredAccess ] OpenSCManager_Fn.restype = wintypes.SC_HANDLE handle = OpenSCManager_Fn( machine_name, database_name, desired_access ) return handle
def start_service(service_handle, service_arg_count, service_arg_vectors): """See: StartService function https://msdn.microsoft.com/en-us/library/windows/desktop/ms686321(v=vs.85).aspx """ StartService_Fn = windll.Advapi32.StartServiceA #BOOL WINAPI StartService( StartService_Fn.argtypes = [ # wintypes.SC_HANDLE, # _In_ SC_HANDLE hService, wintypes.DWORD, # _In_ DWORD dwNumServiceArgs, LPCTSTR # _In_opt_ LPCTSTR *lpServiceArgVectors ] StartService_Fn.restype = wintypes.BOOL bool = StartService_Fn( service_handle, service_arg_count, service_arg_vectors ) return bool
def _create_windows(source, destination, link_type): """Creates hardlink at destination from source in Windows.""" if link_type == HARDLINK: import ctypes from ctypes.wintypes import BOOL CreateHardLink = ctypes.windll.kernel32.CreateHardLinkW CreateHardLink.argtypes = [ ctypes.c_wchar_p, ctypes.c_wchar_p, ctypes.c_void_p ] CreateHardLink.restype = BOOL res = CreateHardLink(destination, source, None) if res == 0: raise ctypes.WinError() else: raise NotImplementedError("Link type unrecognized.")
def open_device(self, access=GENERIC_READ | GENERIC_WRITE, mode=0, creation=OPEN_EXISTING, flags=FILE_ATTRIBUTE_NORMAL): """See: CreateFile function http://msdn.microsoft.com/en-us/library/windows/desktop/aa363858(v=vs.85).aspx """ CreateFile_Fn = windll.kernel32.CreateFileA CreateFile_Fn.argtypes = [ wintypes.LPCSTR, # _In_ LPCTSTR lpFileName wintypes.DWORD, # _In_ DWORD dwDesiredAccess wintypes.DWORD, # _In_ DWORD dwShareMode LPSECURITY_ATTRIBUTES, # _In_opt_ LPSECURITY_ATTRIBUTES lpSecurityAttributes wintypes.DWORD, # _In_ DWORD dwCreationDisposition wintypes.DWORD, # _In_ DWORD dwFlagsAndAttributes wintypes.HANDLE] # _In_opt_ HANDLE hTemplateFile CreateFile_Fn.restype = wintypes.HANDLE self.handle = wintypes.HANDLE(CreateFile_Fn('\\\\.\\' + self.name, access, mode, NULL, creation, flags, NULL))
def __init__ (self): import ctypes.wintypes if sys.platform[:3] != 'win': raise SystemError('Windows is required') self.__winmm = ctypes.windll.LoadLibrary('winmm.dll') self.__mciSendStringW = self.__winmm.mciSendStringW self.__mciGetErrorStringW = self.__winmm.mciGetErrorStringW wintypes = ctypes.wintypes LPCWSTR, HANDLE = wintypes.LPCWSTR, wintypes.HANDLE args = [LPCWSTR, ctypes.c_char_p, wintypes.UINT, HANDLE] self.__mciSendStringW.argtypes = args self.__mciSendStringW.restype = wintypes.DWORD args = [wintypes.DWORD, ctypes.c_void_p, wintypes.UINT] self.__mciGetErrorStringW.argtypes = args self.__mciGetErrorStringW.restype = wintypes.BOOL self.__buffer = ctypes.create_string_buffer('?' * 4098) self.__alias_index = 0
def QueryValueEx(key, value_name): """This calls the Windows QueryValueEx function in a Unicode safe way.""" size = 256 data_type = ctypes.wintypes.DWORD() while True: tmp_size = ctypes.wintypes.DWORD(size) buf = ctypes.create_string_buffer(size) rc = RegQueryValueEx(key.handle, value_name, LPDWORD(), ctypes.byref(data_type), ctypes.cast(buf, LPBYTE), ctypes.byref(tmp_size)) if rc != ERROR_MORE_DATA: break # We limit the size here to ~10 MB so the response doesn't get too big. if size > 10 * 1024 * 1024: raise exceptions.WindowsError("Value too big to be read.") size *= 2 if rc != ERROR_SUCCESS: raise ctypes.WinError(2) return (Reg2Py(buf, tmp_size.value, data_type.value), data_type.value)
def remove_ca(self, name): import ctypes import ctypes.wintypes class CERT_CONTEXT(ctypes.Structure): _fields_ = [ ('dwCertEncodingType', ctypes.wintypes.DWORD), ('pbCertEncoded', ctypes.POINTER(ctypes.wintypes.BYTE)), ('cbCertEncoded', ctypes.wintypes.DWORD), ('pCertInfo', ctypes.c_void_p), ('hCertStore', ctypes.c_void_p),] crypt32 = ctypes.WinDLL(b'crypt32.dll'.decode()) store_handle = crypt32.CertOpenStore(10, 0, 0, 0x4000 | 0x20000, b'ROOT'.decode()) pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, None) while pCertCtx: certCtx = CERT_CONTEXT.from_address(pCertCtx) certdata = ctypes.string_at(certCtx.pbCertEncoded, certCtx.cbCertEncoded) cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, certdata) if hasattr(cert, 'get_subject'): cert = cert.get_subject() cert_name = next((v for k, v in cert.get_components() if k == 'CN'), '') if cert_name and name.lower() == cert_name.split()[0].lower(): crypt32.CertDeleteCertificateFromStore(crypt32.CertDuplicateCertificateContext(pCertCtx)) pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, pCertCtx) return 0
def remove_cert(name): if os.name == 'nt': import ctypes, ctypes.wintypes class CERT_CONTEXT(ctypes.Structure): _fields_ = [ ('dwCertEncodingType', ctypes.wintypes.DWORD), ('pbCertEncoded', ctypes.POINTER(ctypes.wintypes.BYTE)), ('cbCertEncoded', ctypes.wintypes.DWORD), ('pCertInfo', ctypes.c_void_p), ('hCertStore', ctypes.c_void_p),] crypt32 = ctypes.WinDLL(b'crypt32.dll'.decode()) store_handle = crypt32.CertOpenStore(10, 0, 0, 0x4000 | 0x20000, b'ROOT'.decode()) pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, None) while pCertCtx: certCtx = CERT_CONTEXT.from_address(pCertCtx) certdata = ctypes.string_at(certCtx.pbCertEncoded, certCtx.cbCertEncoded) cert = crypto.load_certificate(crypto.FILETYPE_ASN1, certdata) if hasattr(cert, 'get_subject'): cert = cert.get_subject() cert_name = next((v for k, v in cert.get_components() if k == 'CN'), '') if cert_name and name == cert_name: crypt32.CertDeleteCertificateFromStore(crypt32.CertDuplicateCertificateContext(pCertCtx)) pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, pCertCtx) return 0 return -1
def notify(class_): """ Notify other windows that the environment has changed (following http://support.microsoft.com/kb/104011). """ # TODO: Implement Microsoft UIPI (User Interface Privilege Isolation) to # elevate privilege to system level so the system gets this notification # for now, this must be run as admin to work as expected return_val = ctypes.wintypes.DWORD() res = message.SendMessageTimeout( message.HWND_BROADCAST, message.WM_SETTINGCHANGE, 0, # wparam must be null 'Environment', message.SMTO_ABORTIFHUNG, 5000, # timeout in ms return_val, ) error.handle_nonzero_success(res)
def isfile_cached(self): # optimize for nt.stat calls, assuming there are many files for few folders try: cache = self.__class__.cache_isfile_cache except AttributeError: cache = self.__class__.cache_isfile_cache = {} try: c1 = cache[id(self.parent)] except KeyError: c1 = cache[id(self.parent)] = [] curpath = self.parent.abspath() findData = ctypes.wintypes.WIN32_FIND_DATAW() find = FindFirstFile(TP % curpath, ctypes.byref(findData)) if find == INVALID_HANDLE_VALUE: Logs.error("invalid win32 handle isfile_cached %r" % self.abspath()) return os.path.isfile(self.abspath()) try: while True: if findData.cFileName not in UPPER_FOLDERS: thatsadir = findData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY if not thatsadir: c1.append(str(findData.cFileName)) if not FindNextFile(find, ctypes.byref(findData)): break except Exception as e: Logs.error('exception while listing a folder %r %r' % (self.abspath(), e)) return os.path.isfile(self.abspath()) finally: FindClose(find) return self.name in c1
def getPreferencesDir(): buf = ctypes.create_unicode_buffer( ctypes.wintypes.MAX_PATH ) ctypes.windll.shell32.SHGetFolderPathW( 0, CSIDL_APPDATA, 0, SHGFP_TYPE_CURRENT, buf ) return pathlib.Path( buf.value )
def getWindowsDir(): buf = ctypes.create_unicode_buffer( ctypes.wintypes.MAX_PATH ) ctypes.windll.shell32.SHGetFolderPathW( 0, CSIDL_WINDOWS, 0, SHGFP_TYPE_CURRENT, buf ) return pathlib.Path( buf.value )
def getProgramFilesDir(): buf = ctypes.create_unicode_buffer( ctypes.wintypes.MAX_PATH ) ctypes.windll.shell32.SHGetFolderPathW( 0, CSIDL_PROGRAM_FILES, 0, SHGFP_TYPE_CURRENT, buf ) return pathlib.Path( buf.value )
def format_system_message(errno): """ Call FormatMessage with a system error number to retrieve the descriptive error message. """ # first some flags used by FormatMessageW ALLOCATE_BUFFER = 0x100 ARGUMENT_ARRAY = 0x2000 FROM_HMODULE = 0x800 FROM_STRING = 0x400 FROM_SYSTEM = 0x1000 IGNORE_INSERTS = 0x200 # Let FormatMessageW allocate the buffer (we'll free it below) # Also, let it know we want a system error message. flags = ALLOCATE_BUFFER | FROM_SYSTEM source = None message_id = errno language_id = 0 result_buffer = ctypes.wintypes.LPWSTR() buffer_size = 0 arguments = None bytes = ctypes.windll.kernel32.FormatMessageW( flags, source, message_id, language_id, ctypes.byref(result_buffer), buffer_size, arguments, ) # note the following will cause an infinite loop if GetLastError # repeatedly returns an error that cannot be formatted, although # this should not happen. handle_nonzero_success(bytes) message = result_buffer.value ctypes.windll.kernel32.LocalFree(result_buffer) return message
def GetTokenInformation(token, information_class): """ Given a token, get the token information for it. """ data_size = ctypes.wintypes.DWORD() ctypes.windll.advapi32.GetTokenInformation(token, information_class.num, 0, 0, ctypes.byref(data_size)) data = ctypes.create_string_buffer(data_size.value) handle_nonzero_success(ctypes.windll.advapi32.GetTokenInformation(token, information_class.num, ctypes.byref(data), ctypes.sizeof(data), ctypes.byref(data_size))) return ctypes.cast(data, ctypes.POINTER(TOKEN_USER)).contents
def remove_windows_ca(name): import ctypes import ctypes.wintypes class CERT_CONTEXT(ctypes.Structure): _fields_ = [ ('dwCertEncodingType', ctypes.wintypes.DWORD), ('pbCertEncoded', ctypes.POINTER(ctypes.wintypes.BYTE)), ('cbCertEncoded', ctypes.wintypes.DWORD), ('pCertInfo', ctypes.c_void_p), ('hCertStore', ctypes.c_void_p),] try: crypt32 = ctypes.WinDLL(b'crypt32.dll'.decode()) store_handle = crypt32.CertOpenStore(10, 0, 0, 0x4000 | 0x20000, b'ROOT'.decode()) pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, None) while pCertCtx: certCtx = CERT_CONTEXT.from_address(pCertCtx) certdata = ctypes.string_at(certCtx.pbCertEncoded, certCtx.cbCertEncoded) cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, certdata) if hasattr(cert, 'get_subject'): cert = cert.get_subject() cert_name = next((v for k, v in cert.get_components() if k == 'CN'), '') if cert_name and name == cert_name: crypt32.CertDeleteCertificateFromStore(crypt32.CertDuplicateCertificateContext(pCertCtx)) pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, pCertCtx) except Exception as e: logging.warning('CertUtil.remove_windows_ca failed: %r', e)
def remove_windows_ca(name): import ctypes import ctypes.wintypes class CERT_CONTEXT(ctypes.Structure): _fields_ = [ ('dwCertEncodingType', ctypes.wintypes.DWORD), ('pbCertEncoded', ctypes.POINTER(ctypes.wintypes.BYTE)), ('cbCertEncoded', ctypes.wintypes.DWORD), ('pCertInfo', ctypes.c_void_p), ('hCertStore', ctypes.c_void_p),] try: crypt32 = ctypes.WinDLL(b'crypt32.dll'.decode()) store_handle = crypt32.CertOpenStore(10, 0, 0, 0x4000 | 0x20000, b'ROOT'.decode()) pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, None) while pCertCtx: certCtx = CERT_CONTEXT.from_address(pCertCtx) certdata = ctypes.string_at(certCtx.pbCertEncoded, certCtx.cbCertEncoded) cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, certdata) if hasattr(cert, 'get_subject'): cert = cert.get_subject() cert_name = next((v for k, v in cert.get_components() if k == 'CN'), '') if cert_name and name == cert_name: crypt32.CertDeleteCertificateFromStore(crypt32.CertDuplicateCertificateContext(pCertCtx)) pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, pCertCtx) except Exception as e: xlog.warning('CertUtil.remove_windows_ca failed: %r', e)
def to_rect(coords): """Converts a set of coordinates to a windows RECT Converts the form [x1, y1, x2, y2] to a windows RECT structure. Args: coords: List of coordinates in the form [x1, y1, x2, y2]. Returns: A windows RECT structure. """ rect = ctypes.wintypes.RECT(*coords) return rect
def CreateNamedPipe(name, open_mode, pipe_mode, max_instances, out_buffer_size, in_buffer_size, default_time_out, security_attributes): """See: CreateNamedPipe function https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v=vs.85).aspx """ CreateNamedPipe_Fn = windll.kernel32.CreateNamedPipe CreateNamedPipe_Fn.argtypes = [ wintypes.LPCSTR, #LPCTSTR lpName, wintypes.DWORD, #_In_ DWORD dwOpenMode, wintypes.DWORD, #_In_ DWORD dwPipeMode, wintypes.DWORD, #_In_ DWORD nMaxInstances, wintypes.DWORD, #_In_ DWORD nOutBufferSize, wintypes.DWORD, #_In_ DWORD nInBufferSize, wintypes.DWORD, #_In_ DWORD nDefaultTimeOut, LPSECURITY_ATTRIBUTES #_In_opt_ LPSECURITY_ATTRIBUTES lpSecurityAttributes ] CreateNamedPipe_Fn.restype = wintypes.HANDLE handle = wintypes.HANDLE(CreateNamedPipe_Fn( name, open_mode, pipe_mode, max_instances, out_buffer_size, in_buffer_size, default_time_out, security_attributes )) return handle
def ConnectNamedPipe(named_pipe, overlapped): """See: ConnectNamedPipe function https://msdn.microsoft.com/en-us/library/windows/desktop/aa365146(v=vs.85).aspx """ ConnectNamedPipe_Fn = windll.kernel32.ConnectNamedPipe ConnectNamedPipe_Fn.argtypes = [ wintypes.HANDLE, # _In_ HANDLE hNamedPipe, LPOVERLAPPED # _Inout_opt_ LPOVERLAPPED lpOverlapped ] ConnectNamedPipe_Fn.restype = wintypes.BOOL ret = wintypes.BOOL(ConnectNamedPipe_Fn( named_pipe, overlapped )) return ret
def ReadFile(file, buffer, number_of_bytes_to_read, number_of_bytes_read, overlapped): """See: ReadFile function https://msdn.microsoft.com/en-us/library/windows/desktop/aa365467(v=vs.85).aspx """ ReadFile_Fn = windll.kernel32.ReadFile ReadFile_Fn.argtypes = [ wintypes.HANDLE, # _In_ HANDLE hFile, LPVOID, # _Out_ LPVOID lpBuffer, wintypes.DWORD, # _In_ DWORD nNumberOfBytesToRead, LPDWORD, # _Out_opt_ LPDWORD lpNumberOfBytesRead, LPOVERLAPPED # _Inout_opt_ LPOVERLAPPED lpOverlapped ] ReadFile_Fn.restype = wintypes.BOOL ret = wintypes.BOOL(ReadFile_Fn( file, buffer, number_of_bytes_to_read, number_of_bytes_read, overlapped ))