1 Star 0 Fork 0

Da yang/Socketio

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
app.py 2.05 KB
一键复制 编辑 原始数据 按行查看 历史
Da yang 提交于 2021-03-13 19:31 +08:00 . c
import json
import os
import time
from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO, emit
from flask_sqlalchemy import SQLAlchemy
import pymysql
# ---------------------------------------------------------------------------
# Application, Socket.IO and database setup.
# ---------------------------------------------------------------------------
app = Flask(__name__)

# The signing key is regenerated from os.urandom on every process start.
# Earlier revisions also assigned a literal placeholder key and repeated this
# assignment a second time; both were dead writes and have been removed.
app.config['SECRET_KEY'] = os.urandom(24)

# Only the last URI assigned before SQLAlchemy(app) is used, so the single
# effective value (the `sokio` schema) is kept; a previous assignment pointing
# at a different schema was silently overwritten and has been removed.
# NOTE(review): credentials are hard-coded here -- consider loading them from
# the environment or a config file that is not committed.
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:123456@127.0.0.1:3306/sokio'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True

# Socket.IO server bound to the Flask app; every origin is allowed to connect.
socketio = SocketIO()
socketio.init_app(app, cors_allowed_origins='*')

# Imported after the app/socketio objects exist, as in the original ordering.
from sshd import ssh

# Socket.IO namespace shared by the push route and all event handlers below.
name_space = '/dcenter'

db = SQLAlchemy(app)
class Hosts(db.Model):
    """Database model describing one host entry.

    Columns:
        id: auto-incrementing integer primary key.
        hostname: host name, up to 50 characters.
        ipaddr: address string, up to 50 characters.
        Groups: group label, up to 50 characters.
    """

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    hostname = db.Column(db.String(50))
    ipaddr = db.Column(db.String(50))
    Groups = db.Column(db.String(50))
@app.route('/')
def index():
    """Create any missing tables, then render the ``index.html`` template."""
    # NOTE(review): create_all() runs on every request to this route;
    # consider moving it to application start-up.
    db.create_all()
    page = render_template('index.html')
    return page
@app.route('/push')
def push_once():
    """Push the host names of all ``Hosts`` rows to Socket.IO clients.

    Emits a single ``dcenter`` event on the ``name_space`` namespace whose
    payload is a list of ``{'host': <hostname>}`` dicts, one per row.

    Returns:
        The literal string ``'done!'`` as the HTTP response body.
    """
    event_name = 'dcenter'
    payload = [{'host': host.hostname} for host in Hosts.query.all()]

    # A server-level emit (outside an event handler) already goes to every
    # client in the namespace, so no `broadcast` flag is passed; newer
    # Flask-SocketIO / python-socketio releases reject that keyword.
    socketio.emit(event_name, payload, namespace=name_space)
    return 'done!'
@socketio.on('connect', namespace=name_space)
def connected_msg():
    """Log to stdout when a client connects on ``name_space``."""
    notice = 'client connected.'
    print(notice)
@socketio.on('disconnect', namespace=name_space)
def disconnect_msg():
    """Log to stdout when a client disconnects from ``name_space``."""
    notice = 'client disconnected.'
    print(notice)
@socketio.on('my_event', namespace=name_space)
def mtest_message(message):
    """Echo the ``data`` field of a ``my_event`` payload back to the sender.

    Args:
        message: Payload sent by the client. Expected to be a dict with a
            ``data`` key; anything else is tolerated and echoed as ``None``.

    Emits:
        ``my_response`` with ``{'data': <data or None>, 'count': 1}``.
    """
    print(message)

    # The payload comes from the client, so do not assume its shape: a
    # non-dict payload or a missing 'data' key would otherwise raise inside
    # the handler.
    data = message.get('data') if isinstance(message, dict) else None
    emit('my_response', {'data': data, 'count': 1})
if __name__ == '__main__':
    # Start the Socket.IO-aware development server on all interfaces.
    # NOTE(review): debug=True together with host 0.0.0.0 makes the debug
    # server reachable from other machines -- confirm this is intended.
    run_options = {'host': '0.0.0.0', 'port': 5000, 'debug': True}
    socketio.run(app, **run_options)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/shyanglin/Socketio.git
git@gitee.com:shyanglin/Socketio.git
shyanglin
Socketio
Socketio
master

搜索帮助