Python socket 模块,fromshare() 实例源码

我们从 Python 开源项目中提取了以下 17 个代码示例,用于说明如何使用 socket.fromshare()。

项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testTypes(self):
        """Share a socket of each family/type pair with the current process.

        For every combination of AF_INET/AF_INET6 and SOCK_STREAM/SOCK_DGRAM
        that can be created, the socket is shared with this process's own
        pid, rebuilt with ``socket.fromshare`` and handed, together with the
        original, to ``self.compareSockets``. Both sockets are closed
        afterwards, also when the comparison raises.
        """
        combos = [
            (family, kind)
            for family in (socket.AF_INET, socket.AF_INET6)
            for kind in (socket.SOCK_STREAM, socket.SOCK_DGRAM)
        ]
        for family, kind in combos:
            try:
                original = socket.socket(family, kind)
            except OSError:
                # This combination is not supported
                continue
            with original:
                duplicate = socket.fromshare(original.share(os.getpid()))
                with duplicate:
                    self.compareSockets(original, duplicate)
项目:px    作者:genotrance    | 项目源码 | 文件源码
def runpool():
    """Start the proxy server and, where sockets can be shared, worker processes.

    Parses the command line, creates a ``ThreadedTCPServer`` on
    ``(LISTEN, PORT)`` and, when ``socket.fromshare`` exists, starts
    ``MAX_WORKERS - 1`` daemon processes running ``start_worker``. Each
    worker is sent the listening socket's shared data over a pipe. The
    calling process then serves requests itself through ``serve_forever``.

    If creating the server raises ``OSError``, the error is printed and the
    function returns without starting any worker.
    """
    parsecli()

    try:
        httpd = ThreadedTCPServer((LISTEN, PORT), Proxy)
    except OSError as e:
        print(e)
        return

    mainsock = httpd.socket

    # Workers rebuild the listening socket with socket.fromshare(); without
    # that function only this process serves requests.
    if hasattr(socket, "fromshare"):
        workers = MAX_WORKERS
        for _ in range(workers - 1):
            pipeout, pipein = multiprocessing.Pipe()
            p = multiprocessing.Process(target=start_worker, args=(pipeout,))
            p.daemon = True
            p.start()
            # share() needs the target process id, so wait until it is known.
            while p.pid is None:
                time.sleep(1)
            pipein.send(mainsock.share(p.pid))

    serve_forever(httpd)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testTypes(self):
        """Share a socket of each family/type pair with the current process.

        For every combination of AF_INET/AF_INET6 and SOCK_STREAM/SOCK_DGRAM
        that can be created, the socket is shared with this process's own
        pid, rebuilt with ``socket.fromshare`` and handed, together with the
        original, to ``self.compareSockets``. Both sockets are closed
        afterwards, also when the comparison raises.
        """
        combos = [
            (family, kind)
            for family in (socket.AF_INET, socket.AF_INET6)
            for kind in (socket.SOCK_STREAM, socket.SOCK_DGRAM)
        ]
        for family, kind in combos:
            try:
                original = socket.socket(family, kind)
            except OSError:
                # This combination is not supported
                continue
            with original:
                duplicate = socket.fromshare(original.share(os.getpid()))
                with duplicate:
                    self.compareSockets(original, duplicate)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testTypes(self):
        """Share a socket of each family/type pair with the current process.

        For every combination of AF_INET/AF_INET6 and SOCK_STREAM/SOCK_DGRAM
        that can be created, the socket is shared with this process's own
        pid, rebuilt with ``socket.fromshare`` and handed, together with the
        original, to ``self.compareSockets``. Both sockets are closed
        afterwards, also when the comparison raises.
        """
        combos = [
            (family, kind)
            for family in (socket.AF_INET, socket.AF_INET6)
            for kind in (socket.SOCK_STREAM, socket.SOCK_DGRAM)
        ]
        for family, kind in combos:
            try:
                original = socket.socket(family, kind)
            except OSError:
                # This combination is not supported
                continue
            with original:
                duplicate = socket.fromshare(original.share(os.getpid()))
                with duplicate:
                    self.compareSockets(original, duplicate)
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def fromshare(*args, **kwargs):
        """Call the stdlib ``fromshare`` and wrap its result.

        All positional and keyword arguments are forwarded unchanged to
        ``_stdlib_socket.fromshare``; the socket it returns is passed through
        ``from_stdlib_socket`` and that result is returned.
        """
        stdlib_sock = _stdlib_socket.fromshare(*args, **kwargs)
        return from_stdlib_socket(stdlib_sock)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def remoteProcessServer(cls, q):
        """Accept one connection on a shared socket and send it a message.

        Args:
            q: Queue-like object. Its first ``get()`` yields the data passed
                to ``socket.fromshare``; its second yields the bytes to send.

        ``cls`` is not used.
        """
        # Recreate socket from shared data
        sdata = q.get()
        message = q.get()

        # The context managers close both sockets even when accept() or
        # sendall() raises, so a failure here does not leak them.
        with socket.fromshare(sdata) as s:
            s2, _ = s.accept()
            with s2:
                # Send the message
                s2.sendall(message)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testShareLength(self):
        """Check that ``socket.fromshare`` rejects share data of the wrong length.

        The data produced by sharing ``self.serv`` with this process is
        passed once with its last byte removed and once with ``b"foo"``
        appended; both calls must raise ``ValueError``.
        """
        payload = self.serv.share(os.getpid())
        truncated = payload[:-1]
        padded = payload + b"foo"
        for malformed in (truncated, padded):
            with self.assertRaises(ValueError):
                socket.fromshare(malformed)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testShareLocal(self):
        """Share ``self.serv`` with this process and compare it to the rebuilt socket.

        The socket rebuilt by ``socket.fromshare`` is passed with
        ``self.serv`` to ``self.compareSockets`` and is closed afterwards,
        also when the comparison raises.
        """
        token = self.serv.share(os.getpid())
        with socket.fromshare(token) as clone:
            self.compareSockets(self.serv, clone)
项目:px    作者:genotrance    | 项目源码 | 文件源码
def start_worker(pipeout):
    """Serve requests in a worker using a socket received over a pipe.

    Parses the command line, creates a ``ThreadedTCPServer`` without binding
    or activating it, replaces its ``socket`` attribute with the one rebuilt
    from the share data read from ``pipeout``, and then calls
    ``serve_forever`` on it.

    Args:
        pipeout: Connection object whose ``recv()`` returns the data to pass
            to ``socket.fromshare``.
    """
    parsecli()
    server = ThreadedTCPServer((LISTEN, PORT), Proxy, bind_and_activate=False)
    # The server is built first; only then does the worker block on the pipe.
    server.socket = socket.fromshare(pipeout.recv())

    serve_forever(server)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def detach(self):
            '''Return the socket rebuilt from the shared data.

            Reads the share bytes from the connection obtained for
            ``self._id`` and passes them to ``socket.fromshare``.
            This should only be called once.
            '''
            with _resource_sharer.get_connection(self._id) as channel:
                return socket.fromshare(channel.recv_bytes())
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def remoteProcessServer(cls, q):
        """Accept one connection on a shared socket and send it a message.

        Args:
            q: Queue-like object. Its first ``get()`` yields the data passed
                to ``socket.fromshare``; its second yields the bytes to send.

        ``cls`` is not used.
        """
        # Recreate socket from shared data
        sdata = q.get()
        message = q.get()

        # The context managers close both sockets even when accept() or
        # sendall() raises, so a failure here does not leak them.
        with socket.fromshare(sdata) as s:
            s2, _ = s.accept()
            with s2:
                # Send the message
                s2.sendall(message)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testShareLength(self):
        """Check that ``socket.fromshare`` rejects share data of the wrong length.

        The data produced by sharing ``self.serv`` with this process is
        passed once with its last byte removed and once with ``b"foo"``
        appended; both calls must raise ``ValueError``.
        """
        payload = self.serv.share(os.getpid())
        truncated = payload[:-1]
        padded = payload + b"foo"
        for malformed in (truncated, padded):
            with self.assertRaises(ValueError):
                socket.fromshare(malformed)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testShareLocal(self):
        """Share ``self.serv`` with this process and compare it to the rebuilt socket.

        The socket rebuilt by ``socket.fromshare`` is passed with
        ``self.serv`` to ``self.compareSockets`` and is closed afterwards,
        also when the comparison raises.
        """
        token = self.serv.share(os.getpid())
        with socket.fromshare(token) as clone:
            self.compareSockets(self.serv, clone)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def detach(self):
            '''Return the socket rebuilt from the shared data.

            Reads the share bytes from the connection obtained for
            ``self._id`` and passes them to ``socket.fromshare``.
            This should only be called once.
            '''
            with _resource_sharer.get_connection(self._id) as channel:
                return socket.fromshare(channel.recv_bytes())
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def remoteProcessServer(cls, q):
        """Accept one connection on a shared socket and send it a message.

        Args:
            q: Queue-like object. Its first ``get()`` yields the data passed
                to ``socket.fromshare``; its second yields the bytes to send.

        ``cls`` is not used.
        """
        # Recreate socket from shared data
        sdata = q.get()
        message = q.get()

        # The context managers close both sockets even when accept() or
        # sendall() raises, so a failure here does not leak them.
        with socket.fromshare(sdata) as s:
            s2, _ = s.accept()
            with s2:
                # Send the message
                s2.sendall(message)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testShareLength(self):
        """Check that ``socket.fromshare`` rejects share data of the wrong length.

        The data produced by sharing ``self.serv`` with this process is
        passed once with its last byte removed and once with ``b"foo"``
        appended; both calls must raise ``ValueError``.
        """
        payload = self.serv.share(os.getpid())
        truncated = payload[:-1]
        padded = payload + b"foo"
        for malformed in (truncated, padded):
            with self.assertRaises(ValueError):
                socket.fromshare(malformed)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testShareLocal(self):
        """Share ``self.serv`` with this process and compare it to the rebuilt socket.

        The socket rebuilt by ``socket.fromshare`` is passed with
        ``self.serv`` to ``self.compareSockets`` and is closed afterwards,
        also when the comparison raises.
        """
        token = self.serv.share(os.getpid())
        with socket.fromshare(token) as clone:
            self.compareSockets(self.serv, clone)