我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用threading._start_new_thread()。
def intercept_threads(for_attach = False): thread.start_new_thread = thread_creator thread.start_new = thread_creator # If threading has already been imported (i.e. we're attaching), we must hot-patch threading._start_new_thread # so that new threads started using it will be intercepted by our code. # # On the other hand, if threading has not been imported, we must not import it ourselves, because it will then # treat the current thread as the main thread, which is incorrect when attaching because this code is executing # on an ephemeral debugger attach thread that will go away shortly. We don't need to hot-patch it in that case # anyway, because it will pick up the new thread.start_new_thread that we have set above when it's imported. global _threading if _threading is None and 'threading' in sys.modules: import threading _threading = threading _threading._start_new_thread = thread_creator global _INTERCEPTING_FOR_ATTACH _INTERCEPTING_FOR_ATTACH = for_attach ## Modified parameters by Don Jayamanne # Accept current Process id to pass back to debugger
def deferToThread(self, callback, func, *args, **kw): """ Defer a function to a thread and callback the return value. @type callback: function @param callback: function to call on completion @type cbargs: tuple or list @param cbargs: arguments to get supplied to the callback @type func: function @param func: function to call """ def f(func, callback, *args, **kw): ret = func(*args, **kw) self.setTimeout(0, callback, ret) threading._start_new_thread(f, (func, callback) + args, kw) #### # Scheduling ####
def intercept_threads(for_attach = False): thread.start_new_thread = thread_creator thread.start_new = thread_creator # If threading has already been imported (i.e. we're attaching), we must hot-patch threading._start_new_thread # so that new threads started using it will be intercepted by our code. # # On the other hand, if threading has not been imported, we must not import it ourselves, because it will then # treat the current thread as the main thread, which is incorrect when attaching because this code is executing # on an ephemeral debugger attach thread that will go away shortly. We don't need to hot-patch it in that case # anyway, because it will pick up the new thread.start_new_thread that we have set above when it's imported. global _threading if _threading is None and 'threading' in sys.modules: import threading _threading = threading _threading._start_new_thread = thread_creator global _INTERCEPTING_FOR_ATTACH _INTERCEPTING_FOR_ATTACH = for_attach
def thread_creator(func, args, kwargs = {}, *extra_args): if not isinstance(args, tuple): # args is not a tuple. This may be because we have become bound to a # class, which has offset our arguments by one. if isinstance(kwargs, tuple): func, args = args, kwargs kwargs = extra_args[0] if len(extra_args) > 0 else {} return _start_new_thread(new_thread_wrapper, (func, args, kwargs))
def command_connect_repl(self): port_num = read_int(self.conn) _start_new_thread(self.connect_to_repl_backend, (port_num,))
def detach_process(): global DETACHED DETACHED = True if not _INTERCEPTING_FOR_ATTACH: if isinstance(sys.stdout, _DebuggerOutput): sys.stdout = sys.stdout.old_out if isinstance(sys.stderr, _DebuggerOutput): sys.stderr = sys.stderr.old_out if not _INTERCEPTING_FOR_ATTACH: thread.start_new_thread = _start_new_thread thread.start_new = _start_new_thread
def connect_repl_using_socket(sock): _start_new_thread(DebuggerLoop.instance.connect_to_repl_backend_using_socket, (sock,))
def test_limbo_cleanup(self): # Issue 7481: Failure to start thread should cleanup the limbo map. def fail_new_thread(*args): raise threading.ThreadError() _start_new_thread = threading._start_new_thread threading._start_new_thread = fail_new_thread try: t = threading.Thread(target=lambda: None) self.assertRaises(threading.ThreadError, t.start) self.assertFalse( t in threading._limbo, "Failed to cleanup _limbo map on failure of Thread.start().") finally: threading._start_new_thread = _start_new_thread
def __init__(self): import socket self.buffer = [] # cmd buffer self.socket = sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP host='localhost'; port = 9420 sock.connect((host, port)) print('SERVER: socket connected', sock) self._handle = None self.setup_callback( bpy.context ) import threading self.ready = threading._allocate_lock() self.ID = threading._start_new_thread( self.loop, (None,) ) print( 'SERVER: thread started')
def test_limbo_cleanup(self): # Issue 7481: Failure to start thread should cleanup the limbo map. def fail_new_thread(*args): raise thread.error() _start_new_thread = threading._start_new_thread threading._start_new_thread = fail_new_thread try: t = threading.Thread(target=lambda: None) self.assertRaises(thread.error, t.start) self.assertFalse( t in threading._limbo, "Failed to cleanup _limbo map on failure of Thread.start().") finally: threading._start_new_thread = _start_new_thread
def parent(): i=0 while True: i+=1 threading._start_new_thread(child,(i,)) print('parent.....') if input()=='q':break
def parent(): # ?????? threading._start_new_thread(action, (3,)) #j??????? threading._start_new_thread((lambda: action(2)), ()) obj = Power(1) threading._start_new_thread(obj.action, ())
def test_message_is_not_thread_safe(self): class SpoofMessage: class Channel: def __init__(self): self.name = None def __init__(self): self.channel = SpoofMessage.Channel() self.payload = None @MyJsonRpcWebsocketConsumerTest.rpc_method() def ping2(**kwargs): original_message = kwargs["original_message"] return original_message.payload @MyJsonRpcWebsocketConsumerTest.rpc_method() def ping3(**kwargs): original_message = kwargs["original_message"] return original_message.payload def thread_test(): for _i in range(0, 10000): _message = SpoofMessage() _message.channel.name = "websocket.test" _message.payload = "test%s" % _i _res = MyJsonRpcWebsocketConsumerTest._JsonRpcConsumer__process( {"id": 1, "jsonrpc": "2.0", "method": "ping3", "params": []}, _message) self.assertEqual(_res['result'], "test%s" % _i) import threading threading._start_new_thread(thread_test, ()) for i in range(0, 10000): _message = SpoofMessage() _message.channel.name = "websocket.test" _message.payload = "test%s" % i res = MyJsonRpcWebsocketConsumerTest._JsonRpcConsumer__process( {"id": 1, "jsonrpc": "2.0", "method": "ping2", "params": []}, _message) self.assertEqual(res['result'], "test%s" % i)
def thread_creator(func, args, kwargs = {}): id = _start_new_thread(new_thread_wrapper, (func, ) + args, kwargs) return id
def intercept_threads(for_attach = False): thread.start_new_thread = thread_creator thread.start_new = thread_creator global threading if threading is None: # we need to patch threading._start_new_thread so that # we pick up new threads in the attach case when threading # is already imported. import threading threading._start_new_thread = thread_creator global _INTERCEPTING_FOR_ATTACH _INTERCEPTING_FOR_ATTACH = for_attach
def start_stream_threads(self): threading._start_new_thread(self.start_stream, ())
def __init__(self): self._scene_loaded = False self._objects = {} self._materials = {} self.buffer = [] # cmd buffer self.callbacks = [ self.update_view, self.update_selected, self.update_materials ] ## launch Tundra ## if sys.platform == 'linux2': exe = os.path.join( CONFIG_TUNDRA, 'run-server.sh' ) assert os.path.isfile( exe ) cmd = [exe, '--config', TUNDRA_CONFIG_XML_PATH, '--fpslimit', '100', '--storage', '/tmp/'] print( cmd ) p = subprocess.Popen(cmd, stdin=subprocess.PIPE) else: exe = os.path.join( CONFIG_TUNDRA, 'Tundra.exe' ) assert os.path.isfile( exe ) cmd = [exe, '--file', PREVIEW, '--config', TUNDRA_CONFIG_XML_PATH] p = subprocess.Popen(cmd, stdin=subprocess.PIPE) self.proc = p self.socket = sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) host='localhost'; port = 9978 sock.connect((host, port)) print('socket connected', sock) self._handle = None self.setup_callback( bpy.context ) self.ready = threading._allocate_lock() self.ID = threading._start_new_thread( self.loop, (None,) ) print( '.....thread started......')