Python flask_socketio 模块,SocketIO() 实例源码

我们从 Python 开源项目中提取了以下 20 个代码示例,用于说明如何使用 flask_socketio.SocketIO()。

项目:OpenPoGoBot    作者:tehp    | 项目源码 | 文件源码
def run_socket_server(self):
        """Build the Flask/Socket.IO application and hand it to ``socketio.run``.

        The app gets a single ``/`` route that redirects to the hosted UI.
        ``BotEvents`` and ``UiEvents`` are both constructed with the same
        ``state`` dict and the same Socket.IO server object.  Host and port
        come from ``self.config['socket_server']``; a falsy host or port is
        replaced by ``'0.0.0.0'`` or ``8080`` respectively.
        """
        flask_app = Flask(__name__)
        flask_app.config["SECRET_KEY"] = "OpenPoGoBotSocket"
        # NOTE(review): the keyword is spelled ``logging`` here, whereas
        # Flask-SocketIO documents ``logger`` -- confirm which was intended.
        sio = SocketIO(
            flask_app,
            logging=False,
            engineio_logger=False,
            json=myjson,
        )

        @flask_app.route("/")
        def redirect_online():
            return redirect("http://openpogoui.nicontoso.eu")

        # One dict instance shared by both event-handler groups.
        shared_state = {}
        BotEvents(self.bot, sio, shared_state, self.event_manager)
        UiEvents(self.bot, sio, shared_state, self.event_manager, self.logger)

        self.log("Starting socket server...")

        server_settings = self.config['socket_server']
        listen_host = server_settings['host'] or '0.0.0.0'
        listen_port = server_settings['port'] or 8080
        sio.run(
            flask_app,
            host=listen_host,
            port=listen_port,
            debug=False,
            use_reloader=False,
            log_output=False,
        )
项目:twitter-sentiment    作者:words-sdsc    | 项目源码 | 文件源码
def __init__(self, socketio, hashtag=None, filter_by_hashtag=False):
        """Store the emitter and the hashtag-filter settings on the instance.

        Args:
            socketio (SocketIO): Used for emitting the tweet data to the
                                 client.

            hashtag (string): Required when 'filter_by_hashtag' is set; it
                              assists the internal filter.

            filter_by_hashtag (bool): When the stream is filtered with a
                                      bounding box, an extra filter must be
                                      applied so that only tweets with the
                                      desired hashtag are emitted.
        """

        super().__init__()
        self.hashtag = hashtag
        self.filter_by_hashtag = filter_by_hashtag
        self.socketio = socketio
项目:parade    作者:bailaohe    | 项目源码 | 文件源码
def _create_app(context):
    """Create the Flask app and its Socket.IO wrapper for ``context``.

    The app is named after ``context.name``, has CORS applied, carries the
    context as ``app.parade_context`` and registers ``parade_blueprint``.
    The app is also stored back on the context as ``context.webapp``.

    Returns:
        tuple: ``(app, socketio)``.
    """
    from flask import Flask
    from flask_cors import CORS
    from flask_socketio import SocketIO

    webapp = Flask(context.name)
    CORS(webapp)
    webapp.parade_context = context

    # Imported only after the app exists, exactly where the original did it.
    from ..api import parade_blueprint
    webapp.register_blueprint(parade_blueprint)

    sio = SocketIO(webapp)
    context.webapp = webapp
    return webapp, sio
项目:weblablib    作者:weblabdeusto    | 项目源码 | 文件源码
def create_socketio(self):
        """Create ``self.socketio`` and register three handlers on it.

        Each handler emits a ``'results'`` event on the ``/test`` namespace
        whose payload names the handler that fired.
        """
        self.socketio = SocketIO(self.app)

        def report(outcome):
            # Shared emit used by every handler below.
            self.socketio.emit('results', {'result': outcome}, namespace='/test')

        # NOTE(review): 'connect' is registered without a namespace while the
        # other two handlers use '/test' -- confirm this is intentional.
        @self.socketio.on('connect')
        @weblablib.socket_requires_login
        def on_connect():
            report('onconnected')

        @self.socketio.on('my-active-test', namespace='/test')
        @weblablib.socket_requires_active
        def active_message(message):
            report('onactive')

        @self.socketio.on('my-login-test', namespace='/test')
        @weblablib.socket_requires_login
        def login_message(message):
            report('onlogin')
项目:pycommunicate    作者:mincrmatt12    | 项目源码 | 文件源码
def __init__(self, app):
        """Set up the Socket.IO server, the send queue and the lookup tables.

        A ``SocketIO`` object is built around ``app.flask``, and
        ``create_handlers()`` is called last, once every attribute exists.

        :type app: pycommunicate.server.app.communicate.CommunicateApp
        """
        flask_app = app.flask
        self.socketio = flask_socketio.SocketIO(flask_app)
        self.send_queue = eventlet.queue.Queue()
        self.app = app

        # Both tables start empty.
        self.event_dispatchers = {}
        self.awaited_responses = {}

        self.create_handlers()
项目:flask-sqlalchemy-socketio-demo    作者:lukeyeager    | 项目源码 | 文件源码
def on_connect():
    """Print a notice that a client connected on the ``/`` namespace."""
    notice = 'SocketIO connect /'
    print(notice)
项目:flask-sqlalchemy-socketio-demo    作者:lukeyeager    | 项目源码 | 文件源码
def on_disconnect():
    """Print a notice that a client disconnected from the ``/`` namespace."""
    notice = 'SocketIO disconnect /'
    print(notice)
项目:flask-sqlalchemy-socketio-demo    作者:lukeyeager    | 项目源码 | 文件源码
def on_connect_browser():
    """Print a notice that a client connected on the ``/browser`` namespace."""
    notice = 'SocketIO connect /browser'
    print(notice)
项目:flask-sqlalchemy-socketio-demo    作者:lukeyeager    | 项目源码 | 文件源码
def on_disconnect_browser():
    """Print a notice that a client left the ``/browser`` namespace."""
    notice = 'SocketIO disconnect /browser'
    print(notice)
项目:flask-sqlalchemy-socketio-demo    作者:lukeyeager    | 项目源码 | 文件源码
def on_connect_worker():
    """Print a notice that a client connected on the ``/worker`` namespace."""
    notice = 'SocketIO connect /worker'
    print(notice)
项目:flask-socketio-example    作者:takiyu    | 项目源码 | 文件源码
def new_server(update_queue, response_queue, stop_page, port, secret_key):
    """Create the Flask/Socket.IO server and run it on ``port``.

    Args:
        update_queue: Receives the payload of every ``'update'`` event via
            ``put``.
        response_queue: When not ``None``, one item is taken from it with
            ``get`` after each update and emitted back as ``'response'``.
        stop_page: When truthy, a ``/stop`` route is registered that calls
            ``socketio.stop()``.
        port: Port passed to ``socketio.run``; the host is ``'0.0.0.0'``.
        secret_key: Value stored as the Flask ``SECRET_KEY``.
    """
    # --- application objects ---
    app = Flask(__name__, static_url_path='/static')
    app.config['SECRET_KEY'] = secret_key
    socketio = SocketIO(
        app,
        async_mode=ASYNC_MODE,
        logger=False,
        engineio_logger=False,
    )

    # --- HTTP routes ---
    @app.route('/')
    def __index():
        logger.info('Render page')
        return render_template('index.html', script="index.js")

    if stop_page:
        @app.route('/stop')
        def __stop():
            socketio.stop()
            logger.info('Server stop request')
            return 'This server is stopped'

    # --- Socket.IO events ---
    @socketio.on('connect', namespace=IO_NAMESPACE)
    def __on_connect():
        logger.info('New connection is established')

    @socketio.on('disconnect', namespace=IO_NAMESPACE)
    def __on_disconnect():
        logger.info('Connection is closed')

    @socketio.on('update', namespace=IO_NAMESPACE)
    def __on_update(data):
        update_queue.put(data)
        if response_queue is None:
            # No response channel configured: nothing to send back.
            return
        reply = response_queue.get()
        emit('response', reply)

    # --- serve ---
    logger.info('Start server on port %d' % port)
    socketio.run(app, host='0.0.0.0', port=port, debug=False, log_output=False)
    logger.info('Stop server on port %d' % port)
项目:dino    作者:thenetcircle    | 项目源码 | 文件源码
def create_app():
    """Build the Flask app, its ``Api`` wrapper and the Socket.IO server.

    Raises:
        RuntimeError: if no message queue type is configured, the
            configuration is non-empty and the ``TESTING`` key is falsy.

    Returns:
        tuple: ``(app, api, socketio)``.
    """
    _app = Flask(__name__)

    # used for encrypting cookies for handling sessions
    _app.config['SECRET_KEY'] = 'abc492ee-9739-11e6-a174-07f6b92d4a4b'

    message_queue_type = environ.env.config.get(ConfigKeys.TYPE, domain=ConfigKeys.QUEUE, default=None)
    if message_queue_type is None:
        # An empty config or a truthy TESTING key exempts the caller from
        # having to configure a queue type.
        config_is_empty = len(environ.env.config) == 0
        if not config_is_empty and not environ.env.config.get(ConfigKeys.TESTING):
            raise RuntimeError('no message queue type specified')

    queue_host = environ.env.config.get(ConfigKeys.HOST, domain=ConfigKeys.CACHE_SERVICE, default='')
    message_queue = 'redis://%s' % queue_host
    env_name = environ.env.config.get(ConfigKeys.ENVIRONMENT, default='test')
    message_channel = 'dino_%s' % env_name

    logger.info('message_queue: %s' % message_queue)

    _api = Api(_app)

    socketio_options = dict(
        logger=logger,
        engineio_logger=os.environ.get('DINO_DEBUG', '0') == '1',
        async_mode='eventlet',
        message_queue=message_queue,
        channel=message_channel,
    )
    _socketio = SocketIO(_app, **socketio_options)

    # preferably "emit" should be set during env creation, but the socketio object is not created until after env is
    environ.env.out_of_scope_emit = _socketio.emit

    _app.wsgi_app = ProxyFix(_app.wsgi_app)
    return _app, _api, _socketio
项目:dino    作者:thenetcircle    | 项目源码 | 文件源码
def create_app():
    """Build the Flask app and the Socket.IO server attached to it.

    Raises:
        RuntimeError: if no message queue type is configured, the
            configuration is non-empty and the ``TESTING`` key is falsy.

    Returns:
        tuple: ``(app, socketio)``.
    """
    _app = Flask(__name__)

    # used for encrypting cookies for handling sessions
    _app.config['SECRET_KEY'] = 'abc492ee-9739-11e6-a174-07f6b92d4a4b'

    message_queue_type = environ.env.config.get(ConfigKeys.TYPE, domain=ConfigKeys.QUEUE, default=None)
    if message_queue_type is None:
        # An empty config or a truthy TESTING key exempts the caller from
        # having to configure a queue type.
        config_is_empty = len(environ.env.config) == 0
        if not config_is_empty and not environ.env.config.get(ConfigKeys.TESTING):
            raise RuntimeError('no message queue type specified')

    queue_host = environ.env.config.get(ConfigKeys.HOST, domain=ConfigKeys.CACHE_SERVICE, default='')
    message_queue = 'redis://%s' % queue_host
    env_name = environ.env.config.get(ConfigKeys.ENVIRONMENT, default='test')
    message_channel = 'dino_%s' % env_name

    logger.info('message_queue: %s' % message_queue)

    socketio_options = dict(
        logger=logger,
        engineio_logger=os.environ.get('DINO_DEBUG', '0') == '1',
        async_mode='eventlet',
        message_queue=message_queue,
        channel=message_channel,
    )
    _socketio = SocketIO(_app, **socketio_options)

    # preferably "emit" should be set during env creation, but the socketio object is not created until after env is
    environ.env.out_of_scope_emit = _socketio.emit

    _app.wsgi_app = ProxyFix(_app.wsgi_app)
    return _app, _socketio
项目:dino    作者:thenetcircle    | 项目源码 | 文件源码
def create_app():
    """Build the admin Flask app and the Socket.IO server attached to it.

    Templates and static files are served from ``admin/templates/`` and
    ``admin/static/``.  ``ROOT_URL`` is read from the web configuration,
    defaulting to ``'/'``.

    Raises:
        RuntimeError: if no message queue type is configured, the
            configuration is non-empty and the ``TESTING`` key is falsy.

    Returns:
        tuple: ``(app, socketio)``.
    """
    _app = Flask(
        import_name=__name__,
        template_folder='admin/templates/',
        static_folder='admin/static/',
    )

    # used for encrypting cookies for handling sessions
    _app.config['SECRET_KEY'] = 'abc492ee-9739-11e6-a174-07f6b92d4a4b'
    root_url = environ.env.config.get(ConfigKeys.ROOT_URL, domain=ConfigKeys.WEB, default='/')
    _app.config['ROOT_URL'] = root_url

    message_queue_type = environ.env.config.get(ConfigKeys.TYPE, domain=ConfigKeys.QUEUE, default=None)
    if message_queue_type is None:
        # An empty config or a truthy TESTING key exempts the caller from
        # having to configure a queue type.
        config_is_empty = len(environ.env.config) == 0
        if not config_is_empty and not environ.env.config.get(ConfigKeys.TESTING):
            raise RuntimeError('no message queue type specified')

    queue_host = environ.env.config.get(ConfigKeys.HOST, domain=ConfigKeys.CACHE_SERVICE, default='')
    message_queue = 'redis://%s' % queue_host
    env_name = environ.env.config.get(ConfigKeys.ENVIRONMENT, default='test')
    message_channel = 'dino_%s' % env_name

    logger.info('message_queue: %s' % message_queue)

    socketio_options = dict(
        logger=logger,
        engineio_logger=os.environ.get('DINO_DEBUG', '0') == '1',
        async_mode='eventlet',
        message_queue=message_queue,
        channel=message_channel,
    )
    _socketio = SocketIO(_app, **socketio_options)

    # preferably "emit" should be set during env creation, but the socketio object is not created until after env is
    environ.env.out_of_scope_emit = _socketio.emit

    # ProxyFix is applied first; ReverseProxied wraps the result.
    proxied_wsgi = ProxyFix(_app.wsgi_app)
    _app.wsgi_app = ReverseProxied(proxied_wsgi)
    return _app, _socketio
项目:kytos    作者:kytos    | 项目源码 | 文件源码
def __init__(self, app_name, listen='0.0.0.0', port=8181):
        """Set up the Flask app and its Socket.IO server.

        The Flask root path is the ``../web-ui`` directory relative to this
        file, with static files served from ``dist`` under ``/dist``.

        Args:
            app_name(string): String representing a App Name
            listen (string): host name used by api server instance
            port (int): Port number used by api server instance
        """
        this_dir = os.path.dirname(os.path.abspath(__file__))
        self.flask_dir = os.path.join(this_dir, '../web-ui')
        self.log = logging.getLogger('api_server')

        self.listen, self.port = listen, port

        self.app = Flask(
            app_name,
            root_path=self.flask_dir,
            static_folder="dist",
            static_url_path="/dist",
        )
        self.server = SocketIO(self.app, async_mode='threading')
        self._enable_websocket_rooms()

        # Enable cross-origin resource sharing on the Flask app.
        CORS(self.app)

        # Disable strict trailing-slash handling on the URL map.
        self.app.url_map.strict_slashes = False

        # Refresh the web UI, without forcing it.
        self.update_web_ui(force=False)
项目:opendc-web-server    作者:atlarge-research    | 项目源码 | 文件源码
def receive_message(message):
    """Receive a SocketIO request, log its outcome and emit the response.

    The message is handed to ``_process_message``, which yields a
    ``(request, response)`` pair.  A one-line summary of the request
    method, path and response status is printed and flushed, and the
    response is then emitted as a ``'response'`` event in JSON form.

    Args:
        message: The incoming payload, passed unchanged to
            ``_process_message``.
    """

    (request, response) = _process_message(message)

    # A single parenthesised argument is valid both as a Python 2 print
    # statement and as a Python 3 print() call, and prints the same text.
    print('Socket:\t{} to `/{}` resulted in {}: {}'.format(
        request.method,
        request.path,
        response.status['code'],
        response.status['description']
    ))
    # Flush so the line shows up immediately even when stdout is buffered.
    sys.stdout.flush()

    flask_socketio.emit('response', response.to_JSON(), json=True)
项目:system_designer    作者:frnsys    | 项目源码 | 文件源码
def front(port, working_dir, redis_host):
    """start the frontend server

    Static files and templates are served from the ``static`` and
    ``templates`` directories under ``working_dir``.  The server listens on
    ``0.0.0.0:port`` and uses ``redis_host`` as its Socket.IO message queue.
    """
    # eventlet is patched in before flask_socketio is imported; the patching
    # is needed for flask_socketio message queue support.
    import eventlet
    eventlet.monkey_patch()
    from flask_socketio import SocketIO

    static_dir = os.path.abspath(os.path.join(working_dir, 'static'))
    template_dir = os.path.abspath(os.path.join(working_dir, 'templates'))

    app = front_app(static_folder=static_dir, template_folder=template_dir)
    sio = SocketIO(app, message_queue=redis_host)

    # debug is left at its default: it doesn't seem to work if debug=True
    sio.run(app, host='0.0.0.0', port=port)
项目:system_designer    作者:frnsys    | 项目源码 | 文件源码
def __init__(self, redis_host='redis://localhost:6379'):
        """Remember the Redis URL and create a SocketIO object that uses it.

        No Flask app is passed; ``redis_host`` is given as the
        ``message_queue``.
        """
        self.redis_host = redis_host
        queue_url = redis_host
        self.socketio = flask_socketio.SocketIO(message_queue=queue_url)
项目:system_designer    作者:frnsys    | 项目源码 | 文件源码
def __init__(self, redis_host='redis://localhost:6379', namespace='/simulation'):
        """Store the namespace and create a SocketIO object backed by Redis.

        Args:
            redis_host: URL of the message queue handed to ``SocketIO``.
            namespace: Value stored as ``self.namespace``.
        """
        self.namespace = namespace
        # The first positional parameter of SocketIO is the Flask app, so the
        # queue URL must be passed by keyword as ``message_queue``.
        self.socketio = SocketIO(message_queue=redis_host)
项目:image_servers    作者:takiyu    | 项目源码 | 文件源码
def new_server(viewer_queue, stop_page, port, secret_key):
    """Create the image-viewer Flask/Socket.IO server and run it on ``port``.

    Args:
        viewer_queue: When truthy, an ``ImageBufferingThread`` is started on
            it and every update it reports is broadcast to the viewers.
        stop_page: When truthy, a ``/stop`` route is registered that calls
            ``socketio.stop()``.
        port: Port passed to ``socketio.run``; the host is ``'0.0.0.0'``.
        secret_key: Value stored as the Flask ``SECRET_KEY``.
    """
    # create server
    app = Flask(__name__, static_url_path='/static')
    app.config['SECRET_KEY'] = secret_key
    # must be 'threading' for broadcast emitting
    socketio = SocketIO(app, async_mode='threading',
                        logger=False, engineio_logger=False)

    # Bound before the handlers are defined so the 'update' handler can tell
    # that no buffering thread exists instead of raising NameError.
    buffering_thread = None

    # routing
    @app.route('/')
    def __index():
        logger.info('Render viewer page')
        return render_template('index.html', script="index.js")

    if stop_page:
        @app.route('/stop')
        def __stop():
            socketio.stop()
            logger.info('Server stop request')
            return 'This server is stopped'

    @socketio.on('connect', namespace=IO_NAMESPACE)
    def __on_viewer_connect():
        logger.info('New viewer connection is established')

    @socketio.on('disconnect', namespace=IO_NAMESPACE)
    def __on_viewer_disconnect():
        logger.info('Viewer connection is closed')

    @socketio.on('update', namespace=IO_NAMESPACE)
    def __on_update():
        logger.info('Image updating request is received')
        if buffering_thread is None:
            # No viewer queue was supplied, so there is nothing buffered.
            # Presumably an empty list matches the shape get_data_all()
            # returns (a list of [tab, name, data]) -- TODO confirm.
            emit_data = []
        else:
            # get all of current data
            emit_data = buffering_thread.get_data_all()
        # emit all
        logger.debug('Emit for update all')
        emit('update', emit_data, namespace=IO_NAMESPACE)

    def update_event(tab, name, data):
        emit_data = [[tab, name, data]]  # single data
        # broadcast emit
        logger.debug('Broadcast emit for update (tab: %s, name: %s)' %
                     (str(tab), str(name)))
        socketio.emit('update', emit_data, namespace=IO_NAMESPACE)

    # create image updating thread
    if viewer_queue:
        logger.info('Start image buffering thread')
        buffering_thread = ImageBufferingThread(viewer_queue)
        buffering_thread.daemon = True
        buffering_thread.start()
        # NOTE(review): the callback is registered after start(); confirm no
        # update can be reported before registration.
        buffering_thread.register_update_event_func(update_event)

    # start server
    logger.info('Start server on port %d' % port)
    socketio.run(app, host='0.0.0.0', port=port, debug=False, log_output=False)
    logger.info('Stop server on port %d' % port)