Python ctypes 模块,wintypes() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ctypes.wintypes()

项目:pyelastix    作者:almarklein    | 项目源码 | 文件源码
def _is_pid_running_on_windows(pid):
    """Return True if the process identified by ``pid`` appears to be running.

    The process is opened and its exit code queried. False is returned when
    the process cannot be opened; True is returned when the exit code cannot
    be read or equals ``_STILL_ACTIVE``.
    """
    import ctypes.wintypes

    # GetExitCodeProcess() needs a handle opened with query rights. The
    # previously requested access right (1, PROCESS_TERMINATE) made the
    # exit-code query fail for every handle, so any openable pid was
    # reported as running.
    PROCESS_QUERY_INFORMATION = 0x0400

    kernel32 = ctypes.windll.kernel32
    handle = kernel32.OpenProcess(PROCESS_QUERY_INFORMATION, 0, pid)
    if handle == 0:
        return False

    # If the process exited recently, a pid may still exist for the handle.
    # So, check if we can get the exit code.
    exit_code = ctypes.wintypes.DWORD()
    try:
        query_failed = (
            kernel32.GetExitCodeProcess(handle, ctypes.byref(exit_code)) == 0)
    finally:
        # Always release the handle, even if the query raised.
        kernel32.CloseHandle(handle)

    # Either the exit code was unavailable or it says "still running".
    return query_failed or exit_code.value == _STILL_ACTIVE


# %% Code for detecting the executables
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def OpenProcessToken(proc_handle, access):
    """Open the access token of a process via advapi32.

    ``proc_handle`` is wrapped in a ``HANDLE`` before the call. The return
    value of the API call is passed to ``handle_nonzero_success`` and the
    token is returned as a ``ctypes.wintypes.HANDLE``.
    """
    token = ctypes.wintypes.HANDLE()
    process = ctypes.wintypes.HANDLE(proc_handle)
    status = ctypes.windll.advapi32.OpenProcessToken(
        process, access, ctypes.byref(token))
    handle_nonzero_success(status)
    return token
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def get_dnsserver_list():
    """Return the DNS server addresses configured on this machine.

    On Windows the list comes from ``DnsQueryConfig``; otherwise the
    ``nameserver`` entries of ``/etc/resolv.conf`` are returned. When neither
    source is available a warning is logged and an empty list is returned.
    """
    if os.name == 'nt':
        import ctypes
        import ctypes.wintypes
        DNS_CONFIG_DNS_SERVER_LIST = 6
        buf = ctypes.create_string_buffer(2048)
        buf_len = ctypes.wintypes.DWORD(len(buf))
        ctypes.windll.dnsapi.DnsQueryConfig(
            DNS_CONFIG_DNS_SERVER_LIST, 0, None, None,
            ctypes.byref(buf), ctypes.byref(buf_len))
        # The buffer is read as a 32-bit count followed by that many
        # 4-byte IPv4 addresses.
        ipcount = struct.unpack('I', buf[0:4])[0]
        # range() rather than xrange() so this also runs on Python 3.
        return [socket.inet_ntoa(buf[i:i + 4])
                for i in range(4, ipcount * 4 + 4, 4)]
    elif os.path.isfile('/etc/resolv.conf'):
        with open('/etc/resolv.conf', 'rb') as fp:
            content = fp.read()
        # On Python 3 the file content is bytes; decode it so the text
        # pattern below applies. On Python 2 it is already a str.
        if not isinstance(content, str):
            content = content.decode('utf-8', 'replace')
        return re.findall(r'(?m)^nameserver\s+(\S+)', content)
    else:
        logging.warning("get_dnsserver_list failed: unsupport platform '%s-%s'", sys.platform, os.name)
        return []
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def _is_gui_available():
    """Report whether the process window station is flagged as visible.

    Queries the current window station through user32 and tests the
    WSF_VISIBLE bit of its flags. Raises ``ctypes.WinError`` if either
    Win32 call reports failure.
    """
    wintypes = ctypes.wintypes

    class USEROBJECTFLAGS(ctypes.Structure):
        _fields_ = [
            ("fInherit", wintypes.BOOL),
            ("fReserved", wintypes.BOOL),
            ("dwFlags", wintypes.DWORD),
        ]

    user32 = ctypes.windll.user32
    station = user32.GetProcessWindowStation()
    if not station:
        raise ctypes.WinError()

    flags = USEROBJECTFLAGS()
    bytes_needed = wintypes.DWORD()
    succeeded = user32.GetUserObjectInformationW(
        station,
        1,  # UOI_FLAGS
        ctypes.byref(flags),
        ctypes.sizeof(flags),
        ctypes.byref(bytes_needed),
    )
    if not succeeded:
        raise ctypes.WinError()
    return (flags.dwFlags & 0x0001) != 0  # WSF_VISIBLE
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _is_gui_available():
    """Return True when the current window station has the visible flag set.

    Both Win32 calls are checked; a falsy result raises ``ctypes.WinError``.
    """
    UOI_FLAGS = 1
    WSF_VISIBLE = 0x0001

    class USEROBJECTFLAGS(ctypes.Structure):
        _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                    ("fReserved", ctypes.wintypes.BOOL),
                    ("dwFlags", ctypes.wintypes.DWORD)]

    def _require(result):
        # Turn a failed (falsy) Win32 result into an exception.
        if not result:
            raise ctypes.WinError()
        return result

    user32 = ctypes.windll.user32
    station = _require(user32.GetProcessWindowStation())
    info = USEROBJECTFLAGS()
    needed = ctypes.wintypes.DWORD()
    _require(user32.GetUserObjectInformationW(
        station, UOI_FLAGS, ctypes.byref(info), ctypes.sizeof(info),
        ctypes.byref(needed)))
    return bool(info.dwFlags & WSF_VISIBLE)
项目:driverlib    作者:sam-b    | 项目源码 | 文件源码
def CreateFile(path, access=GENERIC_READ | GENERIC_WRITE, mode=0, security_attributes=NULL, creation=OPEN_EXISTING, flags=FILE_ATTRIBUTE_NORMAL, template_file = NULL):
    """Call kernel32 ``CreateFileA`` and return the result as a HANDLE.

    See: CreateFile function
    http://msdn.microsoft.com/en-us/library/windows/desktop/aa363858(v=vs.85).aspx

    The arguments are forwarded positionally in the order
    (path, access, mode, security_attributes, creation, flags,
    template_file).
    """
    create_file = windll.kernel32.CreateFileA
    # Prototype, in parameter order: lpFileName, dwDesiredAccess,
    # dwShareMode, lpSecurityAttributes, dwCreationDisposition,
    # dwFlagsAndAttributes, hTemplateFile.
    create_file.argtypes = [
        wintypes.LPCSTR,
        wintypes.DWORD,
        wintypes.DWORD,
        LPSECURITY_ATTRIBUTES,
        wintypes.DWORD,
        wintypes.DWORD,
        wintypes.HANDLE,
    ]
    create_file.restype = wintypes.HANDLE

    call_args = (path, access, mode, security_attributes,
                 creation, flags, template_file)
    return wintypes.HANDLE(create_file(*call_args))
项目:driverlib    作者:sam-b    | 项目源码 | 文件源码
def WriteFile(file, buffer, number_of_bytes_to_write, number_of_bytes_written, overlapped):
    """Call kernel32 ``WriteFile`` and return the result as a ``wintypes.BOOL``.

    See: WriteFile function
    https://msdn.microsoft.com/en-us/library/windows/desktop/aa365747(v=vs.85).aspx
    """
    write_file = windll.kernel32.WriteFile
    # Prototype, in parameter order: hFile, lpBuffer,
    # nNumberOfBytesToWrite, lpNumberOfBytesWritten, lpOverlapped.
    write_file.argtypes = [
        wintypes.HANDLE,
        wintypes.LPCVOID,
        wintypes.DWORD,
        LPDWORD,
        LPOVERLAPPED,
    ]
    write_file.restype = wintypes.BOOL

    status = write_file(file, buffer, number_of_bytes_to_write,
                        number_of_bytes_written, overlapped)
    return wintypes.BOOL(status)
项目:driverlib    作者:sam-b    | 项目源码 | 文件源码
def open_service(service_manager_handle, service_name, desired_access):
    """Call advapi32 ``OpenServiceA`` and return the handle it yields.

    See: OpenService function
    https://msdn.microsoft.com/en-us/library/windows/desktop/ms684330(v=vs.85).aspx
    """
    open_service_a = windll.Advapi32.OpenServiceA
    # SC_HANDLE OpenService(SC_HANDLE hSCManager, LPCTSTR lpServiceName,
    #                       DWORD dwDesiredAccess)
    open_service_a.argtypes = [wintypes.HANDLE, LPCTSTR, wintypes.DWORD]
    open_service_a.restype = wintypes.SC_HANDLE
    return open_service_a(service_manager_handle, service_name, desired_access)
项目:driverlib    作者:sam-b    | 项目源码 | 文件源码
def control_service(service_handle, control, service_status):
    """Call advapi32 ``ControlService`` and return its BOOL result.

    See: ControlService function
    https://msdn.microsoft.com/en-us/library/windows/desktop/ms682108(v=vs.85).aspx
    """
    control_service_fn = windll.Advapi32.ControlService
    # BOOL ControlService(SC_HANDLE hService, DWORD dwControl,
    #                     LPSERVICE_STATUS lpServiceStatus)
    # The status pointer is declared as a generic LPCVOID here.
    control_service_fn.argtypes = [
        wintypes.SC_HANDLE,
        wintypes.DWORD,
        wintypes.LPCVOID,
    ]
    control_service_fn.restype = wintypes.BOOL
    succeeded = control_service_fn(service_handle, control, service_status)
    return succeeded
项目:driverlib    作者:sam-b    | 项目源码 | 文件源码
def open_sc_manager(machine_name, database_name, desired_access):
    """Call advapi32 ``OpenSCManagerA`` and return the handle it yields.

    See: OpenSCManager function
    https://msdn.microsoft.com/en-us/library/windows/desktop/ms684323(v=vs.85).aspx
    """
    open_manager = windll.Advapi32.OpenSCManagerA
    # SC_HANDLE OpenSCManager(LPCTSTR lpMachineName, LPCTSTR lpDatabaseName,
    #                         DWORD dwDesiredAccess)
    open_manager.argtypes = [LPCTSTR, LPCTSTR, wintypes.DWORD]
    open_manager.restype = wintypes.SC_HANDLE
    return open_manager(machine_name, database_name, desired_access)
项目:driverlib    作者:sam-b    | 项目源码 | 文件源码
def start_service(service_handle, service_arg_count, service_arg_vectors):
    """Call advapi32 ``StartServiceA`` and return its BOOL result.

    See: StartService function
    https://msdn.microsoft.com/en-us/library/windows/desktop/ms686321(v=vs.85).aspx
    """
    start_service_a = windll.Advapi32.StartServiceA
    # BOOL StartService(SC_HANDLE hService, DWORD dwNumServiceArgs,
    #                   LPCTSTR *lpServiceArgVectors)
    # NOTE(review): the last parameter is documented as LPCTSTR* but is
    # declared as plain LPCTSTR here - confirm what callers pass.
    start_service_a.argtypes = [wintypes.SC_HANDLE, wintypes.DWORD, LPCTSTR]
    start_service_a.restype = wintypes.BOOL
    started = start_service_a(
        service_handle, service_arg_count, service_arg_vectors)
    return started
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def _is_gui_available():
    """Tell whether the window station of this process is visible.

    Reads the station's USEROBJECTFLAGS via user32 and checks WSF_VISIBLE.
    A failing Win32 call is reported by raising ``ctypes.WinError``.
    """
    BOOL = ctypes.wintypes.BOOL
    DWORD = ctypes.wintypes.DWORD
    layout = [("fInherit", BOOL), ("fReserved", BOOL), ("dwFlags", DWORD)]

    class USEROBJECTFLAGS(ctypes.Structure):
        _fields_ = layout

    uoi_flags, wsf_visible = 1, 0x0001
    user32 = ctypes.windll.user32

    station = user32.GetProcessWindowStation()
    if not station:
        raise ctypes.WinError()

    station_flags = USEROBJECTFLAGS()
    length_needed = DWORD()
    if not user32.GetUserObjectInformationW(
            station, uoi_flags, ctypes.byref(station_flags),
            ctypes.sizeof(station_flags), ctypes.byref(length_needed)):
        raise ctypes.WinError()

    return bool(station_flags.dwFlags & wsf_visible)
项目:islam-buddy    作者:hamir    | 项目源码 | 文件源码
def _is_gui_available():
    """Report whether the process window station is flagged as visible.

    Queries the current window station through user32 and tests the
    WSF_VISIBLE bit of its flags. Raises ``ctypes.WinError`` if either
    Win32 call reports failure.
    """
    wintypes = ctypes.wintypes

    class USEROBJECTFLAGS(ctypes.Structure):
        _fields_ = [
            ("fInherit", wintypes.BOOL),
            ("fReserved", wintypes.BOOL),
            ("dwFlags", wintypes.DWORD),
        ]

    user32 = ctypes.windll.user32
    station = user32.GetProcessWindowStation()
    if not station:
        raise ctypes.WinError()

    flags = USEROBJECTFLAGS()
    bytes_needed = wintypes.DWORD()
    succeeded = user32.GetUserObjectInformationW(
        station,
        1,  # UOI_FLAGS
        ctypes.byref(flags),
        ctypes.sizeof(flags),
        ctypes.byref(bytes_needed),
    )
    if not succeeded:
        raise ctypes.WinError()
    return (flags.dwFlags & 0x0001) != 0  # WSF_VISIBLE
项目:FightstickDisplay    作者:calexil    | 项目源码 | 文件源码
def _is_gui_available():
    """Return True when the current window station has the visible flag set.

    Both Win32 calls are checked; a falsy result raises ``ctypes.WinError``.
    """
    UOI_FLAGS = 1
    WSF_VISIBLE = 0x0001

    class USEROBJECTFLAGS(ctypes.Structure):
        _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                    ("fReserved", ctypes.wintypes.BOOL),
                    ("dwFlags", ctypes.wintypes.DWORD)]

    def _require(result):
        # Turn a failed (falsy) Win32 result into an exception.
        if not result:
            raise ctypes.WinError()
        return result

    user32 = ctypes.windll.user32
    station = _require(user32.GetProcessWindowStation())
    info = USEROBJECTFLAGS()
    needed = ctypes.wintypes.DWORD()
    _require(user32.GetUserObjectInformationW(
        station, UOI_FLAGS, ctypes.byref(info), ctypes.sizeof(info),
        ctypes.byref(needed)))
    return bool(info.dwFlags & WSF_VISIBLE)
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def get_dnsserver_list():
    """Return the list of DNS server addresses known to the system.

    Windows: asks ``DnsQueryConfig`` for the server list. Other platforms:
    parses ``nameserver`` lines from ``/etc/resolv.conf``. If neither is
    possible, logs a warning and returns ``[]``.
    """
    if os.name == 'nt':
        import ctypes
        import ctypes.wintypes
        DNS_CONFIG_DNS_SERVER_LIST = 6
        buf = ctypes.create_string_buffer(2048)
        size = ctypes.wintypes.DWORD(len(buf))
        ctypes.windll.dnsapi.DnsQueryConfig(
            DNS_CONFIG_DNS_SERVER_LIST, 0, None, None,
            ctypes.byref(buf), ctypes.byref(size))
        # First 4 bytes are read as the entry count; each entry that
        # follows is a packed 4-byte IPv4 address.
        ipcount = struct.unpack('I', buf[0:4])[0]
        offsets = range(4, ipcount * 4 + 4, 4)  # range(): Python 2 and 3
        return [socket.inet_ntoa(buf[i:i + 4]) for i in offsets]
    elif os.path.isfile('/etc/resolv.conf'):
        with open('/etc/resolv.conf', 'rb') as fp:
            data = fp.read()
        # Python 3 yields bytes here; a text regex needs text.
        if not isinstance(data, str):
            data = data.decode('utf-8', 'replace')
        return re.findall(r'(?m)^nameserver\s+(\S+)', data)
    else:
        logging.warning("get_dnsserver_list failed: unsupport platform '%s-%s'", sys.platform, os.name)
        return []
项目:cryptogram    作者:xinmingzhang    | 项目源码 | 文件源码
def _is_gui_available():
    """Tell whether the window station of this process is visible.

    Reads the station's USEROBJECTFLAGS via user32 and checks WSF_VISIBLE.
    A failing Win32 call is reported by raising ``ctypes.WinError``.
    """
    BOOL = ctypes.wintypes.BOOL
    DWORD = ctypes.wintypes.DWORD
    layout = [("fInherit", BOOL), ("fReserved", BOOL), ("dwFlags", DWORD)]

    class USEROBJECTFLAGS(ctypes.Structure):
        _fields_ = layout

    uoi_flags, wsf_visible = 1, 0x0001
    user32 = ctypes.windll.user32

    station = user32.GetProcessWindowStation()
    if not station:
        raise ctypes.WinError()

    station_flags = USEROBJECTFLAGS()
    length_needed = DWORD()
    if not user32.GetUserObjectInformationW(
            station, uoi_flags, ctypes.byref(station_flags),
            ctypes.sizeof(station_flags), ctypes.byref(length_needed)):
        raise ctypes.WinError()

    return bool(station_flags.dwFlags & wsf_visible)
项目:Repobot    作者:Desgard    | 项目源码 | 文件源码
def _is_gui_available():
    """Report whether the process window station is flagged as visible.

    Queries the current window station through user32 and tests the
    WSF_VISIBLE bit of its flags. Raises ``ctypes.WinError`` if either
    Win32 call reports failure.
    """
    wintypes = ctypes.wintypes

    class USEROBJECTFLAGS(ctypes.Structure):
        _fields_ = [
            ("fInherit", wintypes.BOOL),
            ("fReserved", wintypes.BOOL),
            ("dwFlags", wintypes.DWORD),
        ]

    user32 = ctypes.windll.user32
    station = user32.GetProcessWindowStation()
    if not station:
        raise ctypes.WinError()

    flags = USEROBJECTFLAGS()
    bytes_needed = wintypes.DWORD()
    succeeded = user32.GetUserObjectInformationW(
        station,
        1,  # UOI_FLAGS
        ctypes.byref(flags),
        ctypes.sizeof(flags),
        ctypes.byref(bytes_needed),
    )
    if not succeeded:
        raise ctypes.WinError()
    return (flags.dwFlags & 0x0001) != 0  # WSF_VISIBLE
项目:core    作者:getavalon    | 项目源码 | 文件源码
def _create_windows(source, destination, link_type):
    """Create a link at ``destination`` that refers to ``source`` (Windows).

    Only ``HARDLINK`` is handled, through kernel32 ``CreateHardLinkW``.
    Raises ``NotImplementedError`` for any other link type and
    ``ctypes.WinError`` when the API call returns 0.
    """
    is_hardlink = link_type == HARDLINK
    if not is_hardlink:
        raise NotImplementedError("Link type unrecognized.")

    import ctypes
    from ctypes.wintypes import BOOL

    create_hard_link = ctypes.windll.kernel32.CreateHardLinkW
    create_hard_link.argtypes = [
        ctypes.c_wchar_p,  # new link name
        ctypes.c_wchar_p,  # existing file name
        ctypes.c_void_p,   # security attributes (passed as None)
    ]
    create_hard_link.restype = BOOL

    if create_hard_link(destination, source, None) == 0:
        raise ctypes.WinError()
项目:win_driver_plugin    作者:mwrlabs    | 项目源码 | 文件源码
def open_device(self, access=GENERIC_READ | GENERIC_WRITE, mode=0, creation=OPEN_EXISTING, flags=FILE_ATTRIBUTE_NORMAL):
    """Open the device named ``self.name`` and store the handle on ``self``.

    See: CreateFile function
    http://msdn.microsoft.com/en-us/library/windows/desktop/aa363858(v=vs.85).aspx

    The device path is built by prefixing ``self.name`` with the
    ``\\\\.\\`` device namespace; the result is kept in ``self.handle``
    as a ``wintypes.HANDLE``.
    """
    create_file = windll.kernel32.CreateFileA
    # Prototype, in parameter order: lpFileName, dwDesiredAccess,
    # dwShareMode, lpSecurityAttributes, dwCreationDisposition,
    # dwFlagsAndAttributes, hTemplateFile.
    create_file.argtypes = [
        wintypes.LPCSTR,
        wintypes.DWORD,
        wintypes.DWORD,
        LPSECURITY_ATTRIBUTES,
        wintypes.DWORD,
        wintypes.DWORD,
        wintypes.HANDLE,
    ]
    create_file.restype = wintypes.HANDLE

    device_path = '\\\\.\\' + self.name
    raw_handle = create_file(
        device_path, access, mode, NULL, creation, flags, NULL)
    self.handle = wintypes.HANDLE(raw_handle)
项目:win_driver_plugin    作者:mwrlabs    | 项目源码 | 文件源码
def open_service(service_manager_handle, service_name, desired_access):
    """Open a service with advapi32 ``OpenServiceA``; return the handle.

    See: OpenService function
    https://msdn.microsoft.com/en-us/library/windows/desktop/ms684330(v=vs.85).aspx
    """
    fn = windll.Advapi32.OpenServiceA
    prototype = [
        wintypes.HANDLE,  # hSCManager
        LPCTSTR,          # lpServiceName
        wintypes.DWORD,   # dwDesiredAccess
    ]
    fn.argtypes = prototype
    fn.restype = wintypes.SC_HANDLE
    service = fn(service_manager_handle, service_name, desired_access)
    return service
项目:win_driver_plugin    作者:mwrlabs    | 项目源码 | 文件源码
def open_sc_manager(machine_name, database_name, desired_access):
    """Open the service control manager with advapi32 ``OpenSCManagerA``.

    See: OpenSCManager function
    https://msdn.microsoft.com/en-us/library/windows/desktop/ms684323(v=vs.85).aspx
    """
    fn = windll.Advapi32.OpenSCManagerA
    prototype = [
        LPCTSTR,         # lpMachineName
        LPCTSTR,         # lpDatabaseName
        wintypes.DWORD,  # dwDesiredAccess
    ]
    fn.argtypes = prototype
    fn.restype = wintypes.SC_HANDLE
    manager = fn(machine_name, database_name, desired_access)
    return manager
项目:win_driver_plugin    作者:mwrlabs    | 项目源码 | 文件源码
def start_service(service_handle, service_arg_count, service_arg_vectors):
    """Start a service with advapi32 ``StartServiceA``; return the BOOL result.

    See: StartService function
    https://msdn.microsoft.com/en-us/library/windows/desktop/ms686321(v=vs.85).aspx
    """
    fn = windll.Advapi32.StartServiceA
    # NOTE(review): lpServiceArgVectors is documented as LPCTSTR* but is
    # declared as plain LPCTSTR below - confirm what callers pass.
    prototype = [
        wintypes.SC_HANDLE,  # hService
        wintypes.DWORD,      # dwNumServiceArgs
        LPCTSTR,             # lpServiceArgVectors
    ]
    fn.argtypes = prototype
    fn.restype = wintypes.BOOL
    return fn(service_handle, service_arg_count, service_arg_vectors)
项目:UMOG    作者:hsab    | 项目源码 | 文件源码
def _is_gui_available():
    """Return True when the current window station has the visible flag set.

    Both Win32 calls are checked; a falsy result raises ``ctypes.WinError``.
    """
    UOI_FLAGS = 1
    WSF_VISIBLE = 0x0001

    class USEROBJECTFLAGS(ctypes.Structure):
        _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                    ("fReserved", ctypes.wintypes.BOOL),
                    ("dwFlags", ctypes.wintypes.DWORD)]

    def _require(result):
        # Turn a failed (falsy) Win32 result into an exception.
        if not result:
            raise ctypes.WinError()
        return result

    user32 = ctypes.windll.user32
    station = _require(user32.GetProcessWindowStation())
    info = USEROBJECTFLAGS()
    needed = ctypes.wintypes.DWORD()
    _require(user32.GetUserObjectInformationW(
        station, UOI_FLAGS, ctypes.byref(info), ctypes.sizeof(info),
        ctypes.byref(needed)))
    return bool(info.dwFlags & WSF_VISIBLE)
项目:blackmamba    作者:zrzka    | 项目源码 | 文件源码
def _is_gui_available():
    """Tell whether the window station of this process is visible.

    Reads the station's USEROBJECTFLAGS via user32 and checks WSF_VISIBLE.
    A failing Win32 call is reported by raising ``ctypes.WinError``.
    """
    BOOL = ctypes.wintypes.BOOL
    DWORD = ctypes.wintypes.DWORD
    layout = [("fInherit", BOOL), ("fReserved", BOOL), ("dwFlags", DWORD)]

    class USEROBJECTFLAGS(ctypes.Structure):
        _fields_ = layout

    uoi_flags, wsf_visible = 1, 0x0001
    user32 = ctypes.windll.user32

    station = user32.GetProcessWindowStation()
    if not station:
        raise ctypes.WinError()

    station_flags = USEROBJECTFLAGS()
    length_needed = DWORD()
    if not user32.GetUserObjectInformationW(
            station, uoi_flags, ctypes.byref(station_flags),
            ctypes.sizeof(station_flags), ctypes.byref(length_needed)):
        raise ctypes.WinError()

    return bool(station_flags.dwFlags & wsf_visible)
项目:collection    作者:skywind3000    | 项目源码 | 文件源码
def __init__ (self):
    """Load winmm.dll and declare the MCI string functions used later.

    Sets up prototypes for ``mciSendStringW`` and ``mciGetErrorStringW``,
    allocates a 4098-character scratch buffer and resets the alias counter.

    Raises:
        SystemError: when not running on Windows.
    """
    # Check the platform before importing ctypes.wintypes: on Python 2 the
    # import itself fails on non-Windows systems, which would hide the
    # intended error below.
    if sys.platform[:3] != 'win':
        raise SystemError('Windows is required')
    import ctypes.wintypes
    wintypes = ctypes.wintypes
    self.__winmm = ctypes.windll.LoadLibrary('winmm.dll')
    self.__mciSendStringW = self.__winmm.mciSendStringW
    self.__mciGetErrorStringW = self.__winmm.mciGetErrorStringW
    LPCWSTR, HANDLE = wintypes.LPCWSTR, wintypes.HANDLE
    args = [LPCWSTR, ctypes.c_char_p, wintypes.UINT, HANDLE]
    self.__mciSendStringW.argtypes = args
    self.__mciSendStringW.restype = wintypes.DWORD
    args = [wintypes.DWORD, ctypes.c_void_p, wintypes.UINT]
    self.__mciGetErrorStringW.argtypes = args
    self.__mciGetErrorStringW.restype = wintypes.BOOL
    # Bytes literal: create_string_buffer() rejects text strings on
    # Python 3; on Python 2 this is the same value as before.
    self.__buffer = ctypes.create_string_buffer(b'?' * 4098)
    self.__alias_index = 0
项目:beepboop    作者:nicolehe    | 项目源码 | 文件源码
def _is_gui_available():
    """Report whether the process window station is flagged as visible.

    Queries the current window station through user32 and tests the
    WSF_VISIBLE bit of its flags. Raises ``ctypes.WinError`` if either
    Win32 call reports failure.
    """
    wintypes = ctypes.wintypes

    class USEROBJECTFLAGS(ctypes.Structure):
        _fields_ = [
            ("fInherit", wintypes.BOOL),
            ("fReserved", wintypes.BOOL),
            ("dwFlags", wintypes.DWORD),
        ]

    user32 = ctypes.windll.user32
    station = user32.GetProcessWindowStation()
    if not station:
        raise ctypes.WinError()

    flags = USEROBJECTFLAGS()
    bytes_needed = wintypes.DWORD()
    succeeded = user32.GetUserObjectInformationW(
        station,
        1,  # UOI_FLAGS
        ctypes.byref(flags),
        ctypes.sizeof(flags),
        ctypes.byref(bytes_needed),
    )
    if not succeeded:
        raise ctypes.WinError()
    return (flags.dwFlags & 0x0001) != 0  # WSF_VISIBLE
项目:rvmi-rekall    作者:fireeye    | 项目源码 | 文件源码
def QueryValueEx(key, value_name):
    """Read a registry value through ``RegQueryValueEx``.

    Starts with a 256-byte buffer and doubles it while the call reports
    ``ERROR_MORE_DATA``. Returns ``(converted_value, value_type)`` where the
    conversion is done by ``Reg2Py``.
    """
    # Growth is stopped once the buffer has passed ~10 MB so the response
    # doesn't get too big.
    size_limit = 10 * 1024 * 1024
    capacity = 256
    value_type = ctypes.wintypes.DWORD()

    while True:
        byte_count = ctypes.wintypes.DWORD(capacity)
        raw = ctypes.create_string_buffer(capacity)
        status = RegQueryValueEx(
            key.handle, value_name, LPDWORD(),
            ctypes.byref(value_type), ctypes.cast(raw, LPBYTE),
            ctypes.byref(byte_count))
        if status == ERROR_MORE_DATA:
            if capacity > size_limit:
                raise exceptions.WindowsError("Value too big to be read.")
            capacity *= 2
            continue
        break

    if status != ERROR_SUCCESS:
        # NOTE(review): the error code is hard-coded to 2 rather than the
        # status returned above - confirm this is intentional.
        raise ctypes.WinError(2)

    kind = value_type.value
    return (Reg2Py(raw, byte_count.value, kind), kind)
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def get_dnsserver_list():
    """Return the DNS server addresses configured on this machine.

    On Windows the list comes from ``DnsQueryConfig``; otherwise the
    ``nameserver`` entries of ``/etc/resolv.conf`` are returned. When neither
    source is available a warning is logged and an empty list is returned.
    """
    if os.name == 'nt':
        import ctypes
        import ctypes.wintypes
        DNS_CONFIG_DNS_SERVER_LIST = 6
        buf = ctypes.create_string_buffer(2048)
        buf_len = ctypes.wintypes.DWORD(len(buf))
        ctypes.windll.dnsapi.DnsQueryConfig(
            DNS_CONFIG_DNS_SERVER_LIST, 0, None, None,
            ctypes.byref(buf), ctypes.byref(buf_len))
        # The buffer is read as a 32-bit count followed by that many
        # 4-byte IPv4 addresses.
        ipcount = struct.unpack('I', buf[0:4])[0]
        # range() rather than xrange() so this also runs on Python 3.
        return [socket.inet_ntoa(buf[i:i + 4])
                for i in range(4, ipcount * 4 + 4, 4)]
    elif os.path.isfile('/etc/resolv.conf'):
        with open('/etc/resolv.conf', 'rb') as fp:
            content = fp.read()
        # On Python 3 the file content is bytes; decode it so the text
        # pattern below applies. On Python 2 it is already a str.
        if not isinstance(content, str):
            content = content.decode('utf-8', 'replace')
        return re.findall(r'(?m)^nameserver\s+(\S+)', content)
    else:
        logging.warning("get_dnsserver_list failed: unsupport platform '%s-%s'", sys.platform, os.name)
        return []
项目:Proxy-Factory    作者:ping99    | 项目源码 | 文件源码
def remove_ca(self, name):
    """Delete matching certificates from the Windows ROOT system store.

    Every certificate in the store is inspected; one is deleted when the
    first whitespace-separated word of its subject common name equals
    ``name``, compared case-insensitively. Always returns 0.
    """
    import ctypes
    import ctypes.wintypes

    class CERT_CONTEXT(ctypes.Structure):
        _fields_ = [
            ('dwCertEncodingType', ctypes.wintypes.DWORD),
            ('pbCertEncoded', ctypes.POINTER(ctypes.wintypes.BYTE)),
            ('cbCertEncoded', ctypes.wintypes.DWORD),
            ('pCertInfo', ctypes.c_void_p),
            ('hCertStore', ctypes.c_void_p),]

    crypt32 = ctypes.WinDLL(b'crypt32.dll'.decode())

    # Declare pointer-sized prototypes. With the ctypes default (C int)
    # store handles and context pointers are truncated on 64-bit Windows.
    # This WinDLL instance is private, so no other caller is affected.
    void_p = ctypes.c_void_p
    dword = ctypes.wintypes.DWORD
    crypt32.CertOpenStore.argtypes = [
        void_p, dword, void_p, dword, ctypes.c_wchar_p]
    crypt32.CertOpenStore.restype = void_p
    crypt32.CertEnumCertificatesInStore.argtypes = [void_p, void_p]
    crypt32.CertEnumCertificatesInStore.restype = void_p
    crypt32.CertDuplicateCertificateContext.argtypes = [void_p]
    crypt32.CertDuplicateCertificateContext.restype = void_p
    crypt32.CertDeleteCertificateFromStore.argtypes = [void_p]
    crypt32.CertDeleteCertificateFromStore.restype = ctypes.wintypes.BOOL
    crypt32.CertCloseStore.argtypes = [void_p, dword]
    crypt32.CertCloseStore.restype = ctypes.wintypes.BOOL

    store_handle = crypt32.CertOpenStore(10, 0, 0, 0x4000 | 0x20000, b'ROOT'.decode())
    if not store_handle:
        # Nothing can be enumerated without a store.
        return 0
    try:
        pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, None)
        while pCertCtx:
            certCtx = CERT_CONTEXT.from_address(pCertCtx)
            certdata = ctypes.string_at(certCtx.pbCertEncoded, certCtx.cbCertEncoded)
            cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, certdata)
            if hasattr(cert, 'get_subject'):
                cert = cert.get_subject()
            # Component keys and values may be bytes (Python 3) or str.
            cert_name = next(
                (v for k, v in cert.get_components() if k in ('CN', b'CN')), '')
            if isinstance(cert_name, bytes) and not isinstance(cert_name, str):
                cert_name = cert_name.decode('utf-8', 'replace')
            # Guard against a whitespace-only CN, where split() is empty.
            words = cert_name.split()
            if words and name.lower() == words[0].lower():
                # Delete a duplicate so the enumeration context stays valid.
                crypt32.CertDeleteCertificateFromStore(crypt32.CertDuplicateCertificateContext(pCertCtx))
            pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, pCertCtx)
    finally:
        # The store handle was previously never released.
        crypt32.CertCloseStore(store_handle, 0)
    return 0
项目:Proxy-Factory    作者:ping99    | 项目源码 | 文件源码
def get_dnsserver_list():
    """Return the list of DNS server addresses known to the system.

    Windows: asks ``DnsQueryConfig`` for the server list. Other platforms:
    parses ``nameserver`` lines from ``/etc/resolv.conf``. If neither is
    possible, logs a warning and returns ``[]``.
    """
    if os.name == 'nt':
        import ctypes
        import ctypes.wintypes
        DNS_CONFIG_DNS_SERVER_LIST = 6
        buf = ctypes.create_string_buffer(2048)
        size = ctypes.wintypes.DWORD(len(buf))
        ctypes.windll.dnsapi.DnsQueryConfig(
            DNS_CONFIG_DNS_SERVER_LIST, 0, None, None,
            ctypes.byref(buf), ctypes.byref(size))
        # First 4 bytes are read as the entry count; each entry that
        # follows is a packed 4-byte IPv4 address.
        ipcount = struct.unpack('I', buf[0:4])[0]
        offsets = range(4, ipcount * 4 + 4, 4)  # range(): Python 2 and 3
        return [socket.inet_ntoa(buf[i:i + 4]) for i in offsets]
    elif os.path.isfile('/etc/resolv.conf'):
        with open('/etc/resolv.conf', 'rb') as fp:
            data = fp.read()
        # Python 3 yields bytes here; a text regex needs text.
        if not isinstance(data, str):
            data = data.decode('utf-8', 'replace')
        return re.findall(r'(?m)^nameserver\s+(\S+)', data)
    else:
        logging.warning("get_dnsserver_list failed: unsupport platform '%s-%s'", sys.platform, os.name)
        return []
项目:Proxy-Factory    作者:ping99    | 项目源码 | 文件源码
def remove_ca(self, name):
    """Remove certificates named ``name`` from the Windows ROOT store.

    A certificate matches when the first word of its subject CN equals
    ``name`` ignoring case. The return value is always 0.
    """
    import ctypes
    import ctypes.wintypes

    class CERT_CONTEXT(ctypes.Structure):
        _fields_ = [
            ('dwCertEncodingType', ctypes.wintypes.DWORD),
            ('pbCertEncoded', ctypes.POINTER(ctypes.wintypes.BYTE)),
            ('cbCertEncoded', ctypes.wintypes.DWORD),
            ('pCertInfo', ctypes.c_void_p),
            ('hCertStore', ctypes.c_void_p),]

    crypt32 = ctypes.WinDLL(b'crypt32.dll'.decode())

    # Handles and context pointers must be declared pointer-sized; the
    # ctypes default (C int) truncates them on 64-bit Windows. The
    # prototypes are set on this private WinDLL instance only.
    ptr = ctypes.c_void_p
    DWORD = ctypes.wintypes.DWORD
    BOOL = ctypes.wintypes.BOOL
    prototypes = {
        'CertOpenStore': ([ptr, DWORD, ptr, DWORD, ctypes.c_wchar_p], ptr),
        'CertEnumCertificatesInStore': ([ptr, ptr], ptr),
        'CertDuplicateCertificateContext': ([ptr], ptr),
        'CertDeleteCertificateFromStore': ([ptr], BOOL),
        'CertCloseStore': ([ptr, DWORD], BOOL),
    }
    for func_name, (argtypes, restype) in prototypes.items():
        func = getattr(crypt32, func_name)
        func.argtypes = argtypes
        func.restype = restype

    store_handle = crypt32.CertOpenStore(10, 0, 0, 0x4000 | 0x20000, b'ROOT'.decode())
    if not store_handle:
        # Without a store there is nothing to enumerate.
        return 0
    try:
        pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, None)
        while pCertCtx:
            certCtx = CERT_CONTEXT.from_address(pCertCtx)
            certdata = ctypes.string_at(certCtx.pbCertEncoded, certCtx.cbCertEncoded)
            cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, certdata)
            if hasattr(cert, 'get_subject'):
                cert = cert.get_subject()
            # Accept both bytes (Python 3) and str component keys.
            cert_name = next(
                (v for k, v in cert.get_components() if k in ('CN', b'CN')), '')
            if isinstance(cert_name, bytes) and not isinstance(cert_name, str):
                cert_name = cert_name.decode('utf-8', 'replace')
            # A whitespace-only CN has no first word; skip it.
            first_word = cert_name.split()[:1]
            if first_word and name.lower() == first_word[0].lower():
                # Delete via a duplicate so the enumeration can continue.
                crypt32.CertDeleteCertificateFromStore(crypt32.CertDuplicateCertificateContext(pCertCtx))
            pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, pCertCtx)
    finally:
        # Release the store handle, which was previously leaked.
        crypt32.CertCloseStore(store_handle, 0)
    return 0
项目:Proxy-Factory    作者:ping99    | 项目源码 | 文件源码
def get_dnsserver_list():
    """Return the DNS server addresses configured on this machine.

    On Windows the list comes from ``DnsQueryConfig``; otherwise the
    ``nameserver`` entries of ``/etc/resolv.conf`` are returned. When neither
    source is available a warning is logged and an empty list is returned.
    """
    if os.name == 'nt':
        import ctypes
        import ctypes.wintypes
        DNS_CONFIG_DNS_SERVER_LIST = 6
        buf = ctypes.create_string_buffer(2048)
        buf_len = ctypes.wintypes.DWORD(len(buf))
        ctypes.windll.dnsapi.DnsQueryConfig(
            DNS_CONFIG_DNS_SERVER_LIST, 0, None, None,
            ctypes.byref(buf), ctypes.byref(buf_len))
        # The buffer is read as a 32-bit count followed by that many
        # 4-byte IPv4 addresses.
        ipcount = struct.unpack('I', buf[0:4])[0]
        # range() rather than xrange() so this also runs on Python 3.
        return [socket.inet_ntoa(buf[i:i + 4])
                for i in range(4, ipcount * 4 + 4, 4)]
    elif os.path.isfile('/etc/resolv.conf'):
        with open('/etc/resolv.conf', 'rb') as fp:
            content = fp.read()
        # On Python 3 the file content is bytes; decode it so the text
        # pattern below applies. On Python 2 it is already a str.
        if not isinstance(content, str):
            content = content.decode('utf-8', 'replace')
        return re.findall(r'(?m)^nameserver\s+(\S+)', content)
    else:
        logging.warning("get_dnsserver_list failed: unsupport platform '%s-%s'", sys.platform, os.name)
        return []
项目:GotoX    作者:SeaHOH    | 项目源码 | 文件源码
def remove_cert(name):
    """Delete certificates whose subject CN equals ``name`` from the ROOT store.

    Returns 0 on Windows (whether or not anything was deleted) and -1 on any
    other platform, where nothing is done.
    """
    if os.name != 'nt':
        return -1

    import ctypes, ctypes.wintypes

    class CERT_CONTEXT(ctypes.Structure):
        _fields_ = [
            ('dwCertEncodingType', ctypes.wintypes.DWORD),
            ('pbCertEncoded', ctypes.POINTER(ctypes.wintypes.BYTE)),
            ('cbCertEncoded', ctypes.wintypes.DWORD),
            ('pCertInfo', ctypes.c_void_p),
            ('hCertStore', ctypes.c_void_p),]

    crypt32 = ctypes.WinDLL(b'crypt32.dll'.decode())

    # Declare pointer-sized prototypes. With the ctypes default (C int)
    # store handles and context pointers are truncated on 64-bit Windows.
    # This WinDLL instance is private, so no other caller is affected.
    void_p = ctypes.c_void_p
    dword = ctypes.wintypes.DWORD
    crypt32.CertOpenStore.argtypes = [
        void_p, dword, void_p, dword, ctypes.c_wchar_p]
    crypt32.CertOpenStore.restype = void_p
    crypt32.CertEnumCertificatesInStore.argtypes = [void_p, void_p]
    crypt32.CertEnumCertificatesInStore.restype = void_p
    crypt32.CertDuplicateCertificateContext.argtypes = [void_p]
    crypt32.CertDuplicateCertificateContext.restype = void_p
    crypt32.CertDeleteCertificateFromStore.argtypes = [void_p]
    crypt32.CertDeleteCertificateFromStore.restype = ctypes.wintypes.BOOL
    crypt32.CertCloseStore.argtypes = [void_p, dword]
    crypt32.CertCloseStore.restype = ctypes.wintypes.BOOL

    store_handle = crypt32.CertOpenStore(10, 0, 0, 0x4000 | 0x20000, b'ROOT'.decode())
    if not store_handle:
        # Nothing can be enumerated without a store.
        return 0
    try:
        pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, None)
        while pCertCtx:
            certCtx = CERT_CONTEXT.from_address(pCertCtx)
            certdata = ctypes.string_at(certCtx.pbCertEncoded, certCtx.cbCertEncoded)
            cert = crypto.load_certificate(crypto.FILETYPE_ASN1, certdata)
            if hasattr(cert, 'get_subject'):
                cert = cert.get_subject()
            # Component keys and values may be bytes (Python 3) or str.
            cert_name = next(
                (v for k, v in cert.get_components() if k in ('CN', b'CN')), '')
            if isinstance(cert_name, bytes) and not isinstance(cert_name, str):
                cert_name = cert_name.decode('utf-8', 'replace')
            if cert_name and name == cert_name:
                # Delete a duplicate so the enumeration context stays valid.
                crypt32.CertDeleteCertificateFromStore(crypt32.CertDuplicateCertificateContext(pCertCtx))
            pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, pCertCtx)
    finally:
        # The store handle was previously never released.
        crypt32.CertCloseStore(store_handle, 0)
    return 0
项目:jaraco.windows    作者:jaraco    | 项目源码 | 文件源码
def notify(class_):
    """
    Broadcast a WM_SETTINGCHANGE message for 'Environment' so other
    windows learn that the environment has changed (following
    http://support.microsoft.com/kb/104011).

    The result of the send is passed to ``error.handle_nonzero_success``.
    """
    # TODO: Implement Microsoft UIPI (User Interface Privilege Isolation) to
    #  elevate privilege to system level so the system gets this notification
    # for now, this must be run as admin to work as expected
    timeout_ms = 5000
    wparam = 0  # must be null
    result = ctypes.wintypes.DWORD()
    status = message.SendMessageTimeout(
        message.HWND_BROADCAST,
        message.WM_SETTINGCHANGE,
        wparam,
        'Environment',
        message.SMTO_ABORTIFHUNG,
        timeout_ms,
        result,
    )
    error.handle_nonzero_success(status)
项目:hackathon    作者:vertica    | 项目源码 | 文件源码
def _is_gui_available():
    """Return True when the current window station has the visible flag set.

    Both Win32 calls are checked; a falsy result raises ``ctypes.WinError``.
    """
    UOI_FLAGS = 1
    WSF_VISIBLE = 0x0001

    class USEROBJECTFLAGS(ctypes.Structure):
        _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                    ("fReserved", ctypes.wintypes.BOOL),
                    ("dwFlags", ctypes.wintypes.DWORD)]

    def _require(result):
        # Turn a failed (falsy) Win32 result into an exception.
        if not result:
            raise ctypes.WinError()
        return result

    user32 = ctypes.windll.user32
    station = _require(user32.GetProcessWindowStation())
    info = USEROBJECTFLAGS()
    needed = ctypes.wintypes.DWORD()
    _require(user32.GetUserObjectInformationW(
        station, UOI_FLAGS, ctypes.byref(info), ctypes.sizeof(info),
        ctypes.byref(needed)))
    return bool(info.dwFlags & WSF_VISIBLE)
项目:yatta_reader    作者:sound88    | 项目源码 | 文件源码
def _is_gui_available():
    """Tell whether the window station of this process is visible.

    Reads the station's USEROBJECTFLAGS via user32 and checks WSF_VISIBLE.
    A failing Win32 call is reported by raising ``ctypes.WinError``.
    """
    BOOL = ctypes.wintypes.BOOL
    DWORD = ctypes.wintypes.DWORD
    layout = [("fInherit", BOOL), ("fReserved", BOOL), ("dwFlags", DWORD)]

    class USEROBJECTFLAGS(ctypes.Structure):
        _fields_ = layout

    uoi_flags, wsf_visible = 1, 0x0001
    user32 = ctypes.windll.user32

    station = user32.GetProcessWindowStation()
    if not station:
        raise ctypes.WinError()

    station_flags = USEROBJECTFLAGS()
    length_needed = DWORD()
    if not user32.GetUserObjectInformationW(
            station, uoi_flags, ctypes.byref(station_flags),
            ctypes.sizeof(station_flags), ctypes.byref(length_needed)):
        raise ctypes.WinError()

    return bool(station_flags.dwFlags & wsf_visible)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def isfile_cached(self):
        """Return whether ``self.name`` is a non-directory entry of the parent folder.

        The parent folder is listed once with FindFirstFile/FindNextFile and
        the names of its non-directory entries are stored in a class-level
        cache keyed by ``id(self.parent)``, so later lookups for siblings are
        answered without touching the file system again.

        The listing is only stored in the cache after it completed
        successfully. If the folder cannot be opened or the enumeration
        raises, the error is logged, nothing is cached, and the answer falls
        back to ``os.path.isfile(self.abspath())``; caching an empty or
        partial listing would make every later lookup in that folder
        wrongly return False.
        """
        # optimize for nt.stat calls, assuming there are many files for few folders
        try:
            cache = self.__class__.cache_isfile_cache
        except AttributeError:
            cache = self.__class__.cache_isfile_cache = {}

        key = id(self.parent)
        try:
            c1 = cache[key]
        except KeyError:
            # Build the listing locally; publish it only when complete.
            c1 = []

            curpath = self.parent.abspath()
            findData = ctypes.wintypes.WIN32_FIND_DATAW()
            find     = FindFirstFile(TP % curpath, ctypes.byref(findData))

            if find == INVALID_HANDLE_VALUE:
                Logs.error("invalid win32 handle isfile_cached %r" % self.abspath())
                return os.path.isfile(self.abspath())

            try:
                while True:
                    # UPPER_FOLDERS is defined elsewhere in the file;
                    # presumably the '.' and '..' entries - TODO confirm.
                    if findData.cFileName not in UPPER_FOLDERS:
                        thatsadir = findData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY
                        if not thatsadir:
                            c1.append(str(findData.cFileName))
                    if not FindNextFile(find, ctypes.byref(findData)):
                        break
            except Exception as e:
                Logs.error('exception while listing a folder %r %r' % (self.abspath(), e))
                return os.path.isfile(self.abspath())
            finally:
                FindClose(find)

            cache[key] = c1
        return self.name in c1
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def isfile_cached(self):
        """Return whether ``self.name`` is a non-directory entry of the parent folder.

        The parent folder is listed once with FindFirstFile/FindNextFile and
        the names of its non-directory entries are stored in a class-level
        cache keyed by ``id(self.parent)``, so later lookups for siblings are
        answered without touching the file system again.

        The listing is only stored in the cache after it completed
        successfully. If the folder cannot be opened or the enumeration
        raises, the error is logged, nothing is cached, and the answer falls
        back to ``os.path.isfile(self.abspath())``; caching an empty or
        partial listing would make every later lookup in that folder
        wrongly return False.
        """
        # optimize for nt.stat calls, assuming there are many files for few folders
        try:
            cache = self.__class__.cache_isfile_cache
        except AttributeError:
            cache = self.__class__.cache_isfile_cache = {}

        key = id(self.parent)
        try:
            c1 = cache[key]
        except KeyError:
            # Build the listing locally; publish it only when complete.
            c1 = []

            curpath = self.parent.abspath()
            findData = ctypes.wintypes.WIN32_FIND_DATAW()
            find     = FindFirstFile(TP % curpath, ctypes.byref(findData))

            if find == INVALID_HANDLE_VALUE:
                Logs.error("invalid win32 handle isfile_cached %r" % self.abspath())
                return os.path.isfile(self.abspath())

            try:
                while True:
                    # UPPER_FOLDERS is defined elsewhere in the file;
                    # presumably the '.' and '..' entries - TODO confirm.
                    if findData.cFileName not in UPPER_FOLDERS:
                        thatsadir = findData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY
                        if not thatsadir:
                            c1.append(str(findData.cFileName))
                    if not FindNextFile(find, ctypes.byref(findData)):
                        break
            except Exception as e:
                Logs.error('exception while listing a folder %r %r' % (self.abspath(), e))
                return os.path.isfile(self.abspath())
            finally:
                FindClose(find)

            cache[key] = c1
        return self.name in c1
项目:scm-workbench    作者:barry-scott    | 项目源码 | 文件源码
def getPreferencesDir():
    """Return the folder selected by CSIDL_APPDATA as a pathlib.Path.

    The folder is looked up with SHGetFolderPathW into a MAX_PATH sized
    unicode buffer. The return value of SHGetFolderPathW is not checked.
    """
    path_buffer = ctypes.create_unicode_buffer( ctypes.wintypes.MAX_PATH )
    get_folder_path = ctypes.windll.shell32.SHGetFolderPathW
    get_folder_path( 0, CSIDL_APPDATA, 0, SHGFP_TYPE_CURRENT, path_buffer )

    folder = path_buffer.value
    return pathlib.Path( folder )
项目:scm-workbench    作者:barry-scott    | 项目源码 | 文件源码
def getWindowsDir():
    """Return the folder selected by CSIDL_WINDOWS as a pathlib.Path.

    The folder is looked up with SHGetFolderPathW into a MAX_PATH sized
    unicode buffer. The return value of SHGetFolderPathW is not checked.
    """
    path_buffer = ctypes.create_unicode_buffer( ctypes.wintypes.MAX_PATH )
    get_folder_path = ctypes.windll.shell32.SHGetFolderPathW
    get_folder_path( 0, CSIDL_WINDOWS, 0, SHGFP_TYPE_CURRENT, path_buffer )

    folder = path_buffer.value
    return pathlib.Path( folder )
项目:scm-workbench    作者:barry-scott    | 项目源码 | 文件源码
def getProgramFilesDir():
    """Return the folder selected by CSIDL_PROGRAM_FILES as a pathlib.Path.

    The folder is looked up with SHGetFolderPathW into a MAX_PATH sized
    unicode buffer. The return value of SHGetFolderPathW is not checked.
    """
    path_buffer = ctypes.create_unicode_buffer( ctypes.wintypes.MAX_PATH )
    get_folder_path = ctypes.windll.shell32.SHGetFolderPathW
    get_folder_path( 0, CSIDL_PROGRAM_FILES, 0, SHGFP_TYPE_CURRENT, path_buffer )

    folder = path_buffer.value
    return pathlib.Path( folder )
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def format_system_message(errno):
    """
    Call FormatMessage with a system error number to retrieve
    the descriptive error message.

    The message is formatted with FORMAT_MESSAGE_IGNORE_INSERTS: no
    argument array is supplied, so insert sequences such as ``%1`` are
    left in the text as-is instead of making the call fail.

    :param errno: system error number to describe.
    :returns: the message text produced by FormatMessageW.
    :raises: whatever ``handle_nonzero_success`` raises when
        FormatMessageW returns zero.
    """
    # Flags used by FormatMessageW.
    ALLOCATE_BUFFER = 0x100
    FROM_SYSTEM = 0x1000
    IGNORE_INSERTS = 0x200

    # Let FormatMessageW allocate the buffer (we'll free it below).
    # Also, let it know we want a system error message, and that it must
    # not try to expand inserts because no arguments are provided.
    flags = ALLOCATE_BUFFER | FROM_SYSTEM | IGNORE_INSERTS
    source = None
    message_id = errno
    language_id = 0
    result_buffer = ctypes.wintypes.LPWSTR()
    buffer_size = 0
    arguments = None
    # FormatMessageW returns the number of characters stored (0 on failure).
    char_count = ctypes.windll.kernel32.FormatMessageW(
        flags,
        source,
        message_id,
        language_id,
        ctypes.byref(result_buffer),
        buffer_size,
        arguments,
        )
    # note the following will cause an infinite loop if GetLastError
    #  repeatedly returns an error that cannot be formatted, although
    #  this should not happen.
    handle_nonzero_success(char_count)
    try:
        message = result_buffer.value
    finally:
        # The buffer was allocated by FormatMessageW on our behalf.
        ctypes.windll.kernel32.LocalFree(result_buffer)
    return message
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def GetTokenInformation(token, information_class):
    """
    Given a token, get the token information for it.

    The API is called twice: first without a buffer so that it reports the
    required size, then with a buffer of that size. Only the second call is
    checked with ``handle_nonzero_success``.

    :param token: token handle passed straight to the API.
    :param information_class: object whose ``num`` attribute selects the
        information class to query.
    :returns: the filled buffer viewed as a ``TOKEN_USER`` structure.
        NOTE(review): the buffer is interpreted as TOKEN_USER whatever
        ``information_class`` is - confirm callers only request that class.
    """
    query = ctypes.windll.advapi32.GetTokenInformation
    required_size = ctypes.wintypes.DWORD()

    # Sizing call: no buffer is supplied and the result is ignored on
    # purpose; only the size written to required_size is used.
    query(token, information_class.num, 0, 0, ctypes.byref(required_size))

    raw = ctypes.create_string_buffer(required_size.value)
    succeeded = query(
        token,
        information_class.num,
        ctypes.byref(raw),
        ctypes.sizeof(raw),
        ctypes.byref(required_size),
    )
    handle_nonzero_success(succeeded)

    token_user_ptr = ctypes.cast(raw, ctypes.POINTER(TOKEN_USER))
    return token_user_ptr.contents
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def remove_windows_ca(name):
        """Delete certificates whose subject CN equals ``name`` from the ROOT store.

        Opens the "ROOT" system certificate store through crypt32, walks
        every certificate context, decodes it with pyOpenSSL and deletes the
        ones whose subject common name matches ``name``. Any failure is
        logged as a warning and swallowed (best effort); the store handle is
        always closed.

        All crypt32 functions used here get explicit prototypes: they return
        and accept pointer-sized handles, which ctypes' default ``int``
        return type would truncate on 64-bit Python.
        """
        import ctypes
        import ctypes.wintypes
        class CERT_CONTEXT(ctypes.Structure):
            _fields_ = [
                ('dwCertEncodingType', ctypes.wintypes.DWORD),
                ('pbCertEncoded', ctypes.POINTER(ctypes.wintypes.BYTE)),
                ('cbCertEncoded', ctypes.wintypes.DWORD),
                ('pCertInfo', ctypes.c_void_p),
                ('hCertStore', ctypes.c_void_p),]
        crypt32 = None
        store_handle = None
        try:
            crypt32 = ctypes.WinDLL(b'crypt32.dll'.decode())
            # Prototypes are set on this private WinDLL instance only.
            crypt32.CertOpenStore.restype = ctypes.c_void_p
            crypt32.CertOpenStore.argtypes = [
                ctypes.c_void_p, ctypes.wintypes.DWORD, ctypes.c_void_p,
                ctypes.wintypes.DWORD, ctypes.c_wchar_p]
            crypt32.CertEnumCertificatesInStore.restype = ctypes.c_void_p
            crypt32.CertEnumCertificatesInStore.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
            crypt32.CertDuplicateCertificateContext.restype = ctypes.c_void_p
            crypt32.CertDuplicateCertificateContext.argtypes = [ctypes.c_void_p]
            crypt32.CertDeleteCertificateFromStore.restype = ctypes.wintypes.BOOL
            crypt32.CertDeleteCertificateFromStore.argtypes = [ctypes.c_void_p]
            crypt32.CertCloseStore.restype = ctypes.wintypes.BOOL
            crypt32.CertCloseStore.argtypes = [ctypes.c_void_p, ctypes.wintypes.DWORD]

            store_handle = crypt32.CertOpenStore(10, 0, 0, 0x4000 | 0x20000, b'ROOT'.decode())
            if not store_handle:
                raise ctypes.WinError()
            pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, None)
            while pCertCtx:
                certCtx = CERT_CONTEXT.from_address(pCertCtx)
                certdata = ctypes.string_at(certCtx.pbCertEncoded, certCtx.cbCertEncoded)
                cert =  OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, certdata)
                if hasattr(cert, 'get_subject'):
                    cert = cert.get_subject()
                cert_name = next((v for k, v in cert.get_components() if k == 'CN'), '')
                if cert_name and name == cert_name:
                    # Delete a duplicate so the context used by the
                    # enumeration below stays valid.
                    crypt32.CertDeleteCertificateFromStore(crypt32.CertDuplicateCertificateContext(pCertCtx))
                pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, pCertCtx)
        except Exception as e:
            logging.warning('CertUtil.remove_windows_ca failed: %r', e)
        finally:
            if store_handle:
                crypt32.CertCloseStore(store_handle, 0)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def remove_windows_ca(name):
        """Delete certificates whose subject CN equals ``name`` from the ROOT store.

        Opens the "ROOT" system certificate store through crypt32, walks
        every certificate context, decodes it with pyOpenSSL and deletes the
        ones whose subject common name matches ``name``. Any failure is
        logged as a warning and swallowed (best effort); the store handle is
        always closed.

        All crypt32 functions used here get explicit prototypes: they return
        and accept pointer-sized handles, which ctypes' default ``int``
        return type would truncate on 64-bit Python.
        """
        import ctypes
        import ctypes.wintypes
        class CERT_CONTEXT(ctypes.Structure):
            _fields_ = [
                ('dwCertEncodingType', ctypes.wintypes.DWORD),
                ('pbCertEncoded', ctypes.POINTER(ctypes.wintypes.BYTE)),
                ('cbCertEncoded', ctypes.wintypes.DWORD),
                ('pCertInfo', ctypes.c_void_p),
                ('hCertStore', ctypes.c_void_p),]
        crypt32 = None
        store_handle = None
        try:
            crypt32 = ctypes.WinDLL(b'crypt32.dll'.decode())
            # Prototypes are set on this private WinDLL instance only.
            crypt32.CertOpenStore.restype = ctypes.c_void_p
            crypt32.CertOpenStore.argtypes = [
                ctypes.c_void_p, ctypes.wintypes.DWORD, ctypes.c_void_p,
                ctypes.wintypes.DWORD, ctypes.c_wchar_p]
            crypt32.CertEnumCertificatesInStore.restype = ctypes.c_void_p
            crypt32.CertEnumCertificatesInStore.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
            crypt32.CertDuplicateCertificateContext.restype = ctypes.c_void_p
            crypt32.CertDuplicateCertificateContext.argtypes = [ctypes.c_void_p]
            crypt32.CertDeleteCertificateFromStore.restype = ctypes.wintypes.BOOL
            crypt32.CertDeleteCertificateFromStore.argtypes = [ctypes.c_void_p]
            crypt32.CertCloseStore.restype = ctypes.wintypes.BOOL
            crypt32.CertCloseStore.argtypes = [ctypes.c_void_p, ctypes.wintypes.DWORD]

            store_handle = crypt32.CertOpenStore(10, 0, 0, 0x4000 | 0x20000, b'ROOT'.decode())
            if not store_handle:
                raise ctypes.WinError()
            pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, None)
            while pCertCtx:
                certCtx = CERT_CONTEXT.from_address(pCertCtx)
                certdata = ctypes.string_at(certCtx.pbCertEncoded, certCtx.cbCertEncoded)
                cert =  OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, certdata)
                if hasattr(cert, 'get_subject'):
                    cert = cert.get_subject()
                cert_name = next((v for k, v in cert.get_components() if k == 'CN'), '')
                if cert_name and name == cert_name:
                    # Delete a duplicate so the context used by the
                    # enumeration below stays valid.
                    crypt32.CertDeleteCertificateFromStore(crypt32.CertDuplicateCertificateContext(pCertCtx))
                pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, pCertCtx)
        except Exception as e:
            xlog.warning('CertUtil.remove_windows_ca failed: %r', e)
        finally:
            if store_handle:
                crypt32.CertCloseStore(store_handle, 0)
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def remove_windows_ca(name):
        """Delete certificates whose subject CN equals ``name`` from the ROOT store.

        Opens the "ROOT" system certificate store through crypt32, walks
        every certificate context, decodes it with pyOpenSSL and deletes the
        ones whose subject common name matches ``name``. Any failure is
        logged as a warning and swallowed (best effort); the store handle is
        always closed.

        All crypt32 functions used here get explicit prototypes: they return
        and accept pointer-sized handles, which ctypes' default ``int``
        return type would truncate on 64-bit Python.
        """
        import ctypes
        import ctypes.wintypes
        class CERT_CONTEXT(ctypes.Structure):
            _fields_ = [
                ('dwCertEncodingType', ctypes.wintypes.DWORD),
                ('pbCertEncoded', ctypes.POINTER(ctypes.wintypes.BYTE)),
                ('cbCertEncoded', ctypes.wintypes.DWORD),
                ('pCertInfo', ctypes.c_void_p),
                ('hCertStore', ctypes.c_void_p),]
        crypt32 = None
        store_handle = None
        try:
            crypt32 = ctypes.WinDLL(b'crypt32.dll'.decode())
            # Prototypes are set on this private WinDLL instance only.
            crypt32.CertOpenStore.restype = ctypes.c_void_p
            crypt32.CertOpenStore.argtypes = [
                ctypes.c_void_p, ctypes.wintypes.DWORD, ctypes.c_void_p,
                ctypes.wintypes.DWORD, ctypes.c_wchar_p]
            crypt32.CertEnumCertificatesInStore.restype = ctypes.c_void_p
            crypt32.CertEnumCertificatesInStore.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
            crypt32.CertDuplicateCertificateContext.restype = ctypes.c_void_p
            crypt32.CertDuplicateCertificateContext.argtypes = [ctypes.c_void_p]
            crypt32.CertDeleteCertificateFromStore.restype = ctypes.wintypes.BOOL
            crypt32.CertDeleteCertificateFromStore.argtypes = [ctypes.c_void_p]
            crypt32.CertCloseStore.restype = ctypes.wintypes.BOOL
            crypt32.CertCloseStore.argtypes = [ctypes.c_void_p, ctypes.wintypes.DWORD]

            store_handle = crypt32.CertOpenStore(10, 0, 0, 0x4000 | 0x20000, b'ROOT'.decode())
            if not store_handle:
                raise ctypes.WinError()
            pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, None)
            while pCertCtx:
                certCtx = CERT_CONTEXT.from_address(pCertCtx)
                certdata = ctypes.string_at(certCtx.pbCertEncoded, certCtx.cbCertEncoded)
                cert =  OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, certdata)
                if hasattr(cert, 'get_subject'):
                    cert = cert.get_subject()
                cert_name = next((v for k, v in cert.get_components() if k == 'CN'), '')
                if cert_name and name == cert_name:
                    # Delete a duplicate so the context used by the
                    # enumeration below stays valid.
                    crypt32.CertDeleteCertificateFromStore(crypt32.CertDuplicateCertificateContext(pCertCtx))
                pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, pCertCtx)
        except Exception as e:
            xlog.warning('CertUtil.remove_windows_ca failed: %r', e)
        finally:
            if store_handle:
                crypt32.CertCloseStore(store_handle, 0)
项目:grid    作者:russelg    | 项目源码 | 文件源码
def to_rect(coords):
    """Build a windows RECT structure from a coordinate sequence.

    The coordinates are passed positionally to the RECT constructor, so a
    sequence in the form [x1, y1, x2, y2] fills the structure's fields in
    that order.

    Args:
        coords: List of coordinates in the form [x1, y1, x2, y2].

    Returns:
        A windows RECT structure.
    """
    return ctypes.wintypes.RECT(*coords)
项目:driverlib    作者:sam-b    | 项目源码 | 文件源码
def CreateNamedPipe(name, open_mode, pipe_mode, max_instances, out_buffer_size, in_buffer_size, default_time_out, security_attributes):
    """See: CreateNamedPipe function 
       https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v=vs.85).aspx

    kernel32 exports this API only under its suffixed names, so the ANSI
    entry point (CreateNamedPipeA) is used when ``name`` is a byte string
    and the wide entry point (CreateNamedPipeW) otherwise.

    Returns the result of the call wrapped in a ``wintypes.HANDLE``; the
    value is not checked against INVALID_HANDLE_VALUE here.
    """
    # There is no un-suffixed "CreateNamedPipe" export to look up.
    if isinstance(name, bytes):
        CreateNamedPipe_Fn = windll.kernel32.CreateNamedPipeA
        name_type = wintypes.LPCSTR
    else:
        CreateNamedPipe_Fn = windll.kernel32.CreateNamedPipeW
        name_type = wintypes.LPCWSTR
    CreateNamedPipe_Fn.argtypes = [
        name_type,              #LPCTSTR               lpName,
        wintypes.DWORD,         #_In_     DWORD                 dwOpenMode,
        wintypes.DWORD,         #_In_     DWORD                 dwPipeMode,
        wintypes.DWORD,         #_In_     DWORD                 nMaxInstances,
        wintypes.DWORD,         #_In_     DWORD                 nOutBufferSize,
        wintypes.DWORD,         #_In_     DWORD                 nInBufferSize,
        wintypes.DWORD,         #_In_     DWORD                 nDefaultTimeOut,
        LPSECURITY_ATTRIBUTES   #_In_opt_ LPSECURITY_ATTRIBUTES lpSecurityAttributes
    ]
    CreateNamedPipe_Fn.restype = wintypes.HANDLE
    handle = wintypes.HANDLE(CreateNamedPipe_Fn(
        name, 
        open_mode, 
        pipe_mode, 
        max_instances, 
        out_buffer_size, 
        in_buffer_size, 
        default_time_out, 
        security_attributes
    ))
    return handle
项目:driverlib    作者:sam-b    | 项目源码 | 文件源码
def ConnectNamedPipe(named_pipe, overlapped):
    """See: ConnectNamedPipe function 
       https://msdn.microsoft.com/en-us/library/windows/desktop/aa365146(v=vs.85).aspx

    Declares the prototype of kernel32's ConnectNamedPipe, calls it with
    the given pipe handle and overlapped pointer, and returns the result
    wrapped in a ``wintypes.BOOL``. The result is not checked here.
    """
    connect = windll.kernel32.ConnectNamedPipe
    # (hNamedPipe: HANDLE, lpOverlapped: LPOVERLAPPED) -> BOOL
    connect.argtypes = [wintypes.HANDLE, LPOVERLAPPED]
    connect.restype = wintypes.BOOL

    raw_result = connect(named_pipe, overlapped)
    return wintypes.BOOL(raw_result)
项目:driverlib    作者:sam-b    | 项目源码 | 文件源码
def ReadFile(file, buffer, number_of_bytes_to_read, number_of_bytes_read, overlapped):
    """See: ReadFile function
       https://msdn.microsoft.com/en-us/library/windows/desktop/aa365467(v=vs.85).aspx
    """
    ReadFile_Fn = windll.kernel32.ReadFile
    ReadFile_Fn.argtypes = [
    wintypes.HANDLE,    # _In_        HANDLE       hFile,
    LPVOID,             # _Out_       LPVOID       lpBuffer,
    wintypes.DWORD,     # _In_        DWORD        nNumberOfBytesToRead,
    LPDWORD,            # _Out_opt_   LPDWORD      lpNumberOfBytesRead,
    LPOVERLAPPED        # _Inout_opt_ LPOVERLAPPED lpOverlapped
    ]
    ReadFile_Fn.restype = wintypes.BOOL

    ret = wintypes.BOOL(ReadFile_Fn(
        file, 
        buffer, 
        number_of_bytes_to_read, 
        number_of_bytes_read, 
        overlapped
    ))