Python win32file 模块,OPEN_EXISTING 实例源码

我们从Python开源项目中,提取了以下19个代码示例,用于说明如何使用win32file.OPEN_EXISTING

项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def testSimpleFiles(self):
        fd, filename = tempfile.mkstemp()
        os.close(fd)
        os.unlink(filename)
        handle = win32file.CreateFile(filename, win32file.GENERIC_WRITE, 0, None, win32con.CREATE_NEW, 0, None)
        test_data = str2bytes("Hello\0there")
        try:
            win32file.WriteFile(handle, test_data)
            handle.Close()
            # Try and open for read
            handle = win32file.CreateFile(filename, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
            rc, data = win32file.ReadFile(handle, 1024)
            self.assertEquals(data, test_data)
        finally:
            handle.Close()
            try:
                os.unlink(filename)
            except os.error:
                pass

    # A simple test using normal read/write operations.
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def setUp(self):
        self.watcher_threads = []
        self.watcher_thread_changes = []
        self.dir_names = []
        self.dir_handles = []
        for i in range(self.num_test_dirs):
            td = tempfile.mktemp("-test-directory-changes-%d" % i)
            os.mkdir(td)
            self.dir_names.append(td)
            hdir = win32file.CreateFile(td, 
                                        ntsecuritycon.FILE_LIST_DIRECTORY,
                                        win32con.FILE_SHARE_READ,
                                        None, # security desc
                                        win32con.OPEN_EXISTING,
                                        win32con.FILE_FLAG_BACKUP_SEMANTICS |
                                        win32con.FILE_FLAG_OVERLAPPED,
                                        None)
            self.dir_handles.append(hdir)

            changes = []
            t = threading.Thread(target=self._watcherThreadOverlapped,
                                 args=(td, hdir, changes))
            t.start()
            self.watcher_threads.append(t)
            self.watcher_thread_changes.append(changes)
项目:CodeReader    作者:jasonrbr    | 项目源码 | 文件源码
def testSimpleFiles(self):
        fd, filename = tempfile.mkstemp()
        os.close(fd)
        os.unlink(filename)
        handle = win32file.CreateFile(filename, win32file.GENERIC_WRITE, 0, None, win32con.CREATE_NEW, 0, None)
        test_data = str2bytes("Hello\0there")
        try:
            win32file.WriteFile(handle, test_data)
            handle.Close()
            # Try and open for read
            handle = win32file.CreateFile(filename, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
            rc, data = win32file.ReadFile(handle, 1024)
            self.assertEquals(data, test_data)
        finally:
            handle.Close()
            try:
                os.unlink(filename)
            except os.error:
                pass

    # A simple test using normal read/write operations.
项目:CodeReader    作者:jasonrbr    | 项目源码 | 文件源码
def setUp(self):
        self.watcher_threads = []
        self.watcher_thread_changes = []
        self.dir_names = []
        self.dir_handles = []
        for i in range(self.num_test_dirs):
            td = tempfile.mktemp("-test-directory-changes-%d" % i)
            os.mkdir(td)
            self.dir_names.append(td)
            hdir = win32file.CreateFile(td, 
                                        ntsecuritycon.FILE_LIST_DIRECTORY,
                                        win32con.FILE_SHARE_READ,
                                        None, # security desc
                                        win32con.OPEN_EXISTING,
                                        win32con.FILE_FLAG_BACKUP_SEMANTICS |
                                        win32con.FILE_FLAG_OVERLAPPED,
                                        None)
            self.dir_handles.append(hdir)

            changes = []
            t = threading.Thread(target=self._watcherThreadOverlapped,
                                 args=(td, hdir, changes))
            t.start()
            self.watcher_threads.append(t)
            self.watcher_thread_changes.append(changes)
项目:rvmi-rekall    作者:fireeye    | 项目源码 | 文件源码
def _OpenFileForRead(self, path):
        try:
            fhandle = self.fhandle = win32file.CreateFile(
                path,
                win32file.GENERIC_READ,
                win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE,
                None,
                win32file.OPEN_EXISTING,
                win32file.FILE_ATTRIBUTE_NORMAL,
                None)

            self._closer = weakref.ref(
                self, lambda x: win32file.CloseHandle(fhandle))

            self.write_enabled = False
            return fhandle

        except pywintypes.error as e:
            raise IOError("Unable to open %s: %s" % (path, e))
项目:rvmi-rekall    作者:fireeye    | 项目源码 | 文件源码
def _OpenFileForWrite(self, path):
        try:
            fhandle = self.fhandle = win32file.CreateFile(
                path,
                win32file.GENERIC_READ | win32file.GENERIC_WRITE,
                win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE,
                None,
                win32file.OPEN_EXISTING,
                win32file.FILE_ATTRIBUTE_NORMAL,
                None)
            self.write_enabled = True
            self._closer = weakref.ref(
                self, lambda x: win32file.CloseHandle(fhandle))

            return fhandle

        except pywintypes.error as e:
            raise IOError("Unable to open %s: %s" % (path, e))
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def pipe(bufsize=8192):
        """Creates overlapped (asynchronous) pipe.
        """
        name = r'\\.\pipe\pycos-pipe-%d-%d' % (os.getpid(), next(_pipe_id))
        openmode = (win32pipe.PIPE_ACCESS_INBOUND | win32file.FILE_FLAG_OVERLAPPED |
                    FILE_FLAG_FIRST_PIPE_INSTANCE)
        pipemode = (win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_READMODE_BYTE)
        rh = wh = None
        try:
            rh = win32pipe.CreateNamedPipe(
                name, openmode, pipemode, 1, bufsize, bufsize,
                win32pipe.NMPWAIT_USE_DEFAULT_WAIT, None)

            wh = win32file.CreateFile(
                name, win32file.GENERIC_WRITE | winnt.FILE_READ_ATTRIBUTES, 0, None,
                win32file.OPEN_EXISTING, win32file.FILE_FLAG_OVERLAPPED, None)

            overlapped = pywintypes.OVERLAPPED()
            # 'yield' can't be used in constructor so use sync wait
            # (in this case it is should be okay)
            overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
            rc = win32pipe.ConnectNamedPipe(rh, overlapped)
            if rc == winerror.ERROR_PIPE_CONNECTED:
                win32event.SetEvent(overlapped.hEvent)
            rc = win32event.WaitForSingleObject(overlapped.hEvent, 1000)
            overlapped = None
            if rc != win32event.WAIT_OBJECT_0:
                pycos.logger.warning('connect failed: %s' % rc)
                raise Exception(rc)
            return (rh, wh)
        except:
            if rh is not None:
                win32file.CloseHandle(rh)
            if wh is not None:
                win32file.CloseHandle(wh)
            raise
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def pipe(bufsize=8192):
        """Creates overlapped (asynchronous) pipe.
        """
        name = r'\\.\pipe\pycos-pipe-%d-%d' % (os.getpid(), next(_pipe_id))
        openmode = (win32pipe.PIPE_ACCESS_INBOUND | win32file.FILE_FLAG_OVERLAPPED |
                    FILE_FLAG_FIRST_PIPE_INSTANCE)
        pipemode = (win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_READMODE_BYTE)
        rh = wh = None
        try:
            rh = win32pipe.CreateNamedPipe(
                name, openmode, pipemode, 1, bufsize, bufsize,
                win32pipe.NMPWAIT_USE_DEFAULT_WAIT, None)

            wh = win32file.CreateFile(
                name, win32file.GENERIC_WRITE | winnt.FILE_READ_ATTRIBUTES, 0, None,
                win32file.OPEN_EXISTING, win32file.FILE_FLAG_OVERLAPPED, None)

            overlapped = pywintypes.OVERLAPPED()
            # 'yield' can't be used in constructor so use sync wait
            # (in this case it is should be okay)
            overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
            rc = win32pipe.ConnectNamedPipe(rh, overlapped)
            if rc == winerror.ERROR_PIPE_CONNECTED:
                win32event.SetEvent(overlapped.hEvent)
            rc = win32event.WaitForSingleObject(overlapped.hEvent, 1000)
            overlapped = None
            if rc != win32event.WAIT_OBJECT_0:
                pycos.logger.warning('connect failed: %s' % rc)
                raise Exception(rc)
            return (rh, wh)
        except:
            if rh is not None:
                win32file.CloseHandle(rh)
            if wh is not None:
                win32file.CloseHandle(wh)
            raise
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def opencom1():
    # returns a handle to the Windows file
    return win32file.CreateFile(u'COM1:', win32file.GENERIC_READ, \
                                0, None, win32file.OPEN_EXISTING, 0, None)
项目:cmdchallenge-site    作者:jarv    | 项目源码 | 文件源码
def connect(self, address):
        win32pipe.WaitNamedPipe(address, self._timeout)
        try:
            handle = win32file.CreateFile(
                address,
                win32file.GENERIC_READ | win32file.GENERIC_WRITE,
                0,
                None,
                win32file.OPEN_EXISTING,
                cSECURITY_ANONYMOUS | cSECURITY_SQOS_PRESENT,
                0
            )
        except win32pipe.error as e:
            # See Remarks:
            # https://msdn.microsoft.com/en-us/library/aa365800.aspx
            if e.winerror == cERROR_PIPE_BUSY:
                # Another program or thread has grabbed our pipe instance
                # before we got to it. Wait for availability and attempt to
                # connect again.
                win32pipe.WaitNamedPipe(address, RETRY_WAIT_TIMEOUT)
                return self.connect(address)
            raise e

        self.flags = win32pipe.GetNamedPipeInfo(handle)[0]

        self._handle = handle
        self._address = address
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def testSimpleOverlapped(self):
        # Create a file in the %TEMP% directory.
        import win32event
        testName = os.path.join( win32api.GetTempPath(), "win32filetest.dat" )
        desiredAccess = win32file.GENERIC_WRITE
        overlapped = pywintypes.OVERLAPPED()
        evt = win32event.CreateEvent(None, 0, 0, None)
        overlapped.hEvent = evt
        # Create the file and write shit-loads of data to it.
        h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.CREATE_ALWAYS, 0, 0)
        chunk_data = str2bytes("z") * 0x8000
        num_loops = 512
        expected_size = num_loops * len(chunk_data)
        for i in range(num_loops):
            win32file.WriteFile(h, chunk_data, overlapped)
            win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
            overlapped.Offset = overlapped.Offset + len(chunk_data)
        h.Close()
        # Now read the data back overlapped
        overlapped = pywintypes.OVERLAPPED()
        evt = win32event.CreateEvent(None, 0, 0, None)
        overlapped.hEvent = evt
        desiredAccess = win32file.GENERIC_READ
        h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.OPEN_EXISTING, 0, 0)
        buffer = win32file.AllocateReadBuffer(0xFFFF)
        while 1:
            try:
                hr, data = win32file.ReadFile(h, buffer, overlapped)
                win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
                overlapped.Offset = overlapped.Offset + len(data)
                if not data is buffer:
                    self.fail("Unexpected result from ReadFile - should be the same buffer we passed it")
            except win32api.error:
                break
        h.Close()
项目:CodeReader    作者:jasonrbr    | 项目源码 | 文件源码
def testSimpleOverlapped(self):
        # Create a file in the %TEMP% directory.
        import win32event
        testName = os.path.join( win32api.GetTempPath(), "win32filetest.dat" )
        desiredAccess = win32file.GENERIC_WRITE
        overlapped = pywintypes.OVERLAPPED()
        evt = win32event.CreateEvent(None, 0, 0, None)
        overlapped.hEvent = evt
        # Create the file and write shit-loads of data to it.
        h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.CREATE_ALWAYS, 0, 0)
        chunk_data = str2bytes("z") * 0x8000
        num_loops = 512
        expected_size = num_loops * len(chunk_data)
        for i in range(num_loops):
            win32file.WriteFile(h, chunk_data, overlapped)
            win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
            overlapped.Offset = overlapped.Offset + len(chunk_data)
        h.Close()
        # Now read the data back overlapped
        overlapped = pywintypes.OVERLAPPED()
        evt = win32event.CreateEvent(None, 0, 0, None)
        overlapped.hEvent = evt
        desiredAccess = win32file.GENERIC_READ
        h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.OPEN_EXISTING, 0, 0)
        buffer = win32file.AllocateReadBuffer(0xFFFF)
        while 1:
            try:
                hr, data = win32file.ReadFile(h, buffer, overlapped)
                win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
                overlapped.Offset = overlapped.Offset + len(data)
                if not data is buffer:
                    self.fail("Unexpected result from ReadFile - should be the same buffer we passed it")
            except win32api.error:
                break
        h.Close()
项目:viewvc    作者:viewvc    | 项目源码 | 文件源码
def NullFile(inheritable):
  """Create a null file handle."""
  if inheritable:
    sa = pywintypes.SECURITY_ATTRIBUTES()
    sa.bInheritHandle = 1
  else:
    sa = None

  return win32file.CreateFile("nul",
    win32file.GENERIC_READ | win32file.GENERIC_WRITE,
    win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE,
    sa, win32file.OPEN_EXISTING, 0, None)
项目:pygcam    作者:JGCRI    | 项目源码 | 文件源码
def get_read_handle(filename):
        if os.path.isdir(filename):
            dwFlagsAndAttributes = win32file.FILE_FLAG_BACKUP_SEMANTICS
        else:
            dwFlagsAndAttributes = 0

        # CreateFile(fileName, desiredAccess, shareMode, attributes,
        #            CreationDisposition, flagsAndAttributes, hTemplateFile)
        try:
            handle = win32file.CreateFileW(filename,  # with 'W' accepts unicode
                         win32file.GENERIC_READ, win32file.FILE_SHARE_READ, None,
                         win32file.OPEN_EXISTING, dwFlagsAndAttributes, None)
            return handle
        except Exception as e:
            raise PygcamException("get_read_handle(%s) failed: %s" % (filename, e))
项目:pygcam    作者:JGCRI    | 项目源码 | 文件源码
def readlinkWindows(path):
        reparse_point_handle = CreateFileW(path,
                                           0,
                                           0,
                                           None,
                                           OPEN_EXISTING,
                                           FILE_FLAG_OPEN_REPARSE_POINT |
                                           FILE_FLAG_BACKUP_SEMANTICS,
                                           None)
        if reparse_point_handle == INVALID_HANDLE_VALUE:
            from pygcam.log import getLogger
            _logger = getLogger(__name__)
            _logger.info("Can't readlink: %s", path)
            raise ctypes.WinError()

        target_buffer = ctypes.c_buffer(MAXIMUM_REPARSE_DATA_BUFFER_SIZE)
        n_bytes_returned = DWORD()
        io_result = DeviceIoControl(reparse_point_handle,
                                    FSCTL_GET_REPARSE_POINT,
                                    None, 0,
                                    target_buffer, len(target_buffer),
                                    ctypes.byref(n_bytes_returned),
                                    None)
        CloseHandle(reparse_point_handle)
        if not io_result:
            raise ctypes.WinError()

        rdb = REPARSE_DATA_BUFFER.from_buffer(target_buffer)
        if rdb.ReparseTag == IO_REPARSE_TAG_SYMLINK:
            return rdb.SymbolicLinkReparseBuffer.PrintName
        elif rdb.ReparseTag == IO_REPARSE_TAG_MOUNT_POINT:
            return rdb.MountPointReparseBuffer.PrintName

        raise ValueError("not a link")

    # Replace broken functions with those defined above.
    # (In python 2.7.11 os.path.islink() indeed failed to detect link made with mklink)
项目:mpv-trakt-sync-daemon    作者:StareInTheAir    | 项目源码 | 文件源码
def run(self):
        import win32file
        self.file_handle = win32file.CreateFile(self.named_pipe_path,
                                                win32file.GENERIC_READ | win32file.GENERIC_WRITE,
                                                0, None,
                                                win32file.OPEN_EXISTING,
                                                0, None)

        log.info('Windows named pipe connected')
        self.fire_connected()

        while True:
            # The following code is cleaner, than waiting for an exception while writing to detect pipe closing,
            # but causes mpv to hang and crash while closing when closed at the wrong time.

            # if win32file.GetFileAttributes(self.named_pipe_path) != win32file.FILE_ATTRIBUTE_NORMAL:
            #     # pipe was closed
            #     break

            try:
                while not self.write_queue.empty():
                    win32file.WriteFile(self.file_handle, self.write_queue.get_nowait())
            except win32file.error:
                log.warning('Exception while writing to Windows named pipe. Assuming pipe closed.')
                break

            size = win32file.GetFileSize(self.file_handle)
            if size > 0:
                while size > 0:
                    # pipe has data to read
                    data = win32file.ReadFile(self.file_handle, 512)
                    self.on_data(data[1])
                    size = win32file.GetFileSize(self.file_handle)
            else:
                time.sleep(1)

        log.info('Windows named pipe closed')
        win32file.CloseHandle(self.file_handle)
        self.file_handle = None

        self.fire_disconnected()
项目:threat-research-tools    作者:carbonblack    | 项目源码 | 文件源码
def write_mbr_winapi( _file ):

    print 'Are you SURE you want to overwrite the MBR?? This will possibly make the volume unbootable.'
    response = raw_input( 'Type \"YES\" then Return to continue, anything else then Return to not continue:' )

    if response != 'YES':
        return

    h       = None
    handles = []

    try:

        for x in range( num_mbr_handles ):

            h = win32file.CreateFile( '\\\\.\\PhysicalDrive0',
                    win32con.GENERIC_WRITE,
                    win32file.FILE_SHARE_WRITE,
                    None,
                    win32file.OPEN_EXISTING,
                    win32file.FILE_ATTRIBUTE_NORMAL,
                    None )

            if ( h != win32file.INVALID_HANDLE_VALUE ):
                handles.append( h )

        f = open( _file, 'rb' )

        if f <> None:

            fsize = os.path.getsize( _file )
            wsize = 512

            if fsize > 512:
                print 'WARNING: File being written is > 512 bytes, will only write 512...'
                wsize = 512

            contents = f.read( fsize )

            if fsize < 512:

                print 'WARNING: Padding file up to 512 bytes, may not have expected results...'

                ## pad it out to 512 bytes
                diff = 512 - 512

                for num in xrange( diff ):
                    contents += 'A'

            win32file.WriteFile( h, contents, None )

            f.close()

    except Exception, e:
        print str( e )
        print '\tAre you running as Administrator?'

    for handle in handles:
        win32file.CloseHandle( handle )

#############
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def testFileTimes(self):
        if issubclass(pywintypes.TimeType, datetime.datetime):
            from win32timezone import TimeZoneInfo
            now = datetime.datetime.now(tz=TimeZoneInfo.local())
            nowish = now + datetime.timedelta(seconds=1)
            later = now + datetime.timedelta(seconds=120)
        else:
            rc, tzi = win32api.GetTimeZoneInformation()
            bias = tzi[0]
            if rc==2: # daylight-savings is in effect.
                bias += tzi[-1]
            bias *= 60 # minutes to seconds...
            tick = int(time.time())
            now = pywintypes.Time(tick+bias)
            nowish = pywintypes.Time(tick+bias+1)
            later = pywintypes.Time(tick+bias+120)

        filename = tempfile.mktemp("-testFileTimes")
        # Windows docs the 'last time' isn't valid until the last write
        # handle is closed - so create the file, then re-open it to check.
        open(filename,"w").close()
        f = win32file.CreateFile(filename, win32file.GENERIC_READ|win32file.GENERIC_WRITE,
                                 0, None,
                                 win32con.OPEN_EXISTING, 0, None)
        try:
            ct, at, wt = win32file.GetFileTime(f)
            self.failUnless(ct >= now, "File was created in the past - now=%s, created=%s" % (now, ct))
            self.failUnless( now <= ct <= nowish, (now, ct))
            self.failUnless(wt >= now, "File was written-to in the past now=%s, written=%s" % (now,wt))
            self.failUnless( now <= wt <= nowish, (now, wt))

            # Now set the times.
            win32file.SetFileTime(f, later, later, later)
            # Get them back.
            ct, at, wt = win32file.GetFileTime(f)
            # XXX - the builtin PyTime type appears to be out by a dst offset.
            # just ignore that type here...
            if issubclass(pywintypes.TimeType, datetime.datetime):
                self.failUnlessEqual(ct, later)
                self.failUnlessEqual(at, later)
                self.failUnlessEqual(wt, later)

        finally:
            f.Close()
            os.unlink(filename)
项目:CodeReader    作者:jasonrbr    | 项目源码 | 文件源码
def testFileTimes(self):
        if issubclass(pywintypes.TimeType, datetime.datetime):
            from win32timezone import TimeZoneInfo
            now = datetime.datetime.now(tz=TimeZoneInfo.local())
            nowish = now + datetime.timedelta(seconds=1)
            later = now + datetime.timedelta(seconds=120)
        else:
            rc, tzi = win32api.GetTimeZoneInformation()
            bias = tzi[0]
            if rc==2: # daylight-savings is in effect.
                bias += tzi[-1]
            bias *= 60 # minutes to seconds...
            tick = int(time.time())
            now = pywintypes.Time(tick+bias)
            nowish = pywintypes.Time(tick+bias+1)
            later = pywintypes.Time(tick+bias+120)

        filename = tempfile.mktemp("-testFileTimes")
        # Windows docs the 'last time' isn't valid until the last write
        # handle is closed - so create the file, then re-open it to check.
        open(filename,"w").close()
        f = win32file.CreateFile(filename, win32file.GENERIC_READ|win32file.GENERIC_WRITE,
                                 0, None,
                                 win32con.OPEN_EXISTING, 0, None)
        try:
            ct, at, wt = win32file.GetFileTime(f)
            self.failUnless(ct >= now, "File was created in the past - now=%s, created=%s" % (now, ct))
            self.failUnless( now <= ct <= nowish, (now, ct))
            self.failUnless(wt >= now, "File was written-to in the past now=%s, written=%s" % (now,wt))
            self.failUnless( now <= wt <= nowish, (now, wt))

            # Now set the times.
            win32file.SetFileTime(f, later, later, later)
            # Get them back.
            ct, at, wt = win32file.GetFileTime(f)
            # XXX - the builtin PyTime type appears to be out by a dst offset.
            # just ignore that type here...
            if issubclass(pywintypes.TimeType, datetime.datetime):
                self.failUnlessEqual(ct, later)
                self.failUnlessEqual(at, later)
                self.failUnlessEqual(wt, later)

        finally:
            f.Close()
            os.unlink(filename)