2 Star 10 Fork 7

终點起點/PySide6-UI-Demo

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
TCP.py 13.25 KB
一键复制 编辑 原始数据 按行查看 历史
终點起點 提交于 2024-08-19 09:07 +08:00 . 实现TCP和UDP通讯
import sys
import os
import datetime
import threading
from PySide6.QtCore import QObject
from PySide6.QtWidgets import *
from PySide6.QtGui import *
from PySide6.QtCore import *
from PySide6.QtNetwork import *
# TCP client class
class TcpClient(QThread):
    """TCP client that reports connection state and traffic through Qt signals.

    The QTcpSocket is created in the constructor, i.e. in the thread that
    creates this object. Qt sockets are event driven and must only be used
    from the thread they belong to, so incoming data is handled through the
    socket's ``readyRead`` signal instead of being polled from ``run()``.
    The QThread base class, ``start()`` and ``exit()`` are kept so existing
    callers keep working.

    Signals:
        OnShowMessage(str): human readable log line.
        OnConnected(): the socket reached the connected state.
        OnDisConnected(): the socket was disconnected.
    """

    OnShowMessage = Signal(str)
    OnConnected = Signal()
    OnDisConnected = Signal()
    _localPort = 50000
    _serverHost = None
    _serverPort = 1
    _client = None
    _flag = True
    # Never assigned anywhere in this class; kept for backward compatibility.
    _readThread = None
    _isConnected = False

    def __init__(self, serverHost: QHostAddress, serverPort: int, localPort: int = 50000) -> None:
        """Create the socket, bind it to *localPort* and start connecting.

        Args:
            serverHost: address of the server to connect to.
            serverPort: TCP port of the server.
            localPort: local port the socket is bound to before connecting.
        """
        super(TcpClient, self).__init__()
        self._localPort = localPort
        self._serverHost = serverHost
        self._serverPort = serverPort
        self._client = QTcpSocket()
        # Hook up the handlers before the connection attempt is started so
        # no state change can be missed.
        self._client.connected.connect(self.SocketConnected)
        self._client.disconnected.connect(self.SocketDisConnected)
        self._client.readyRead.connect(self._readPending)
        self._client.bind(QHostAddress(QHostAddress.SpecialAddress.AnyIPv4), self._localPort)
        self._client.connectToHost(self._serverHost, self._serverPort)

    def IsConnected(self):
        """Return True while the socket is connected to the server."""
        return self._isConnected

    def run(self):
        """Keep the thread alive until ``exit()`` is called.

        The default QThread.run() runs the thread's event loop, which idles
        without consuming CPU and is ended by ``QThread.exit()``. Socket I/O
        is not done here; see ``_readPending``.
        """
        self._flag = True
        super().run()

    def exit(self):
        """Close the socket and stop the thread."""
        self._flag = False
        self._isConnected = False
        if self._readThread is not None:
            self._readThread.exit()
        if self._client is not None:
            self._client.close()
            self._client = None
        super().exit()
        # Give the thread a moment to finish so the object is not destroyed
        # while its thread is still running.
        if self.isRunning():
            self.wait(1000)

    def Send(self, msg):
        """Send *msg* to the server; does nothing without a socket or for an empty message.

        Args:
            msg: text (sent UTF-8 encoded) or a bytes-like payload.
        """
        if self._client is None or len(msg) == 0:
            return
        # Encode text explicitly so the payload is well-defined bytes.
        payload = msg.encode('utf-8') if isinstance(msg, str) else msg
        self._client.write(QByteArray(payload))
        # Report the peer (server) endpoint, not the local address.
        self.OnShowMessage.emit(f'向[{self._client.peerAddress().toString()}:{self._client.peerPort()}]发送消息:{msg}')

    def SocketConnected(self):
        """Slot for the socket's ``connected`` signal."""
        self._isConnected = True
        # Notify listeners
        self.OnShowMessage.emit(f'与[{self._serverHost.toString()}:{self._serverPort}]建立连接')
        self.OnConnected.emit()

    def SocketDisConnected(self):
        """Slot for the socket's ``disconnected`` signal."""
        self._isConnected = False
        if self._readThread is not None:
            self._readThread.exit()
        self.OnShowMessage.emit(f'与[{self._serverHost.toString()}:{self._serverPort}]断开连接')
        self.OnDisConnected.emit()

    def SocketRead(self, msg):
        """Publish a message received from the server as a log line."""
        self.OnShowMessage.emit(f'收到[{self._serverHost.toString()}:{self._serverPort}]消息:{msg}')

    def _readPending(self):
        """Slot for ``readyRead``: read everything available and publish it."""
        sock = self._client
        # exit() may already have dropped the socket.
        if sock is None or sock.bytesAvailable() <= 0:
            return
        self.SocketRead(sock.readAll().toStdString())
class TcpServer(QThread):
    """TCP server that tracks its clients and reports activity through Qt signals.

    The QTcpServer is created in the constructor, i.e. in the thread that
    creates this object. Qt sockets are event driven and must only be used
    from the thread they belong to, so new connections and incoming data are
    handled through the ``newConnection`` / ``readyRead`` / ``disconnected``
    signals instead of being polled from ``run()``. The QThread base class,
    ``start()`` and ``exit()`` are kept so existing callers keep working.

    Clients are identified by the string ``"<peer address>:<peer port>"``.

    Signals:
        OnShowMessage(str): human readable log line.
        OnClientConnected(str): id of a newly accepted client.
        OnClientDisConnected(str): id of a client that went away.
    """

    OnShowMessage = Signal(str)
    OnClientConnected = Signal(str)
    OnClientDisConnected = Signal(str)
    _host = None
    _port = 1
    _server = None
    _isListening = False
    # Replaced by a per-instance dict in __init__; a dict literal here would
    # be shared by every TcpServer instance.
    _clients = None
    _flag = True

    def __init__(self, host: QHostAddress, port: int = 60000):
        """Create the server object; call ``Listen()`` to start accepting.

        Args:
            host: address to listen on.
            port: TCP port to listen on.
        """
        super(TcpServer, self).__init__()
        self._host = host
        self._port = port
        self._clients = {}
        self._server = QTcpServer()
        self._server.newConnection.connect(self._acceptPending)

    def IsListening(self):
        """Return True if the last ``Listen()`` succeeded and ``exit()`` was not called since."""
        return self._isListening

    def Listen(self):
        """Start listening on the configured host and port and report the outcome."""
        self._isListening = self._server.listen(self._host, self._port)
        if self._isListening:
            self.OnShowMessage.emit(f'开始监听端口:{self._port}')
        else:
            # Do not announce success when the port could not be opened.
            self.OnShowMessage.emit(f'监听端口:{self._port}失败:{self._server.errorString()}')

    def run(self):
        """Keep the thread alive until ``exit()`` is called.

        The default QThread.run() runs the thread's event loop, which idles
        without consuming CPU and is ended by ``QThread.exit()``. Socket I/O
        is not done here; see ``_acceptPending`` and ``_readClient``.
        """
        self._flag = True
        super().run()

    def exit(self):
        """Stop listening, close every client connection and stop the thread."""
        self._flag = False
        self._isListening = False
        # Iterate over a copy: closing a socket can trigger SocketDisconnect,
        # which removes the entry from the dict.
        for client in list(self._clients.values()):
            client.close()
        self._clients.clear()
        self._server.close()
        self.OnShowMessage.emit(f'结束监听')
        super().exit()
        # Give the thread a moment to finish so the object is not destroyed
        # while its thread is still running.
        if self.isRunning():
            self.wait(1000)

    def SocketConnect(self, clientId):
        """Announce that the client *clientId* has connected."""
        self.OnClientConnected.emit(clientId)
        self.OnShowMessage.emit(f'{clientId}连接')

    def OnReceive(self, msg):
        """Publish *msg* unchanged as a log line."""
        self.OnShowMessage.emit(msg)

    def SocketDisconnect(self, clientId):
        """Announce that *clientId* has disconnected and forget its socket."""
        self.OnClientDisConnected.emit(clientId)
        self.OnShowMessage.emit(f'{clientId}断开')
        # The handlers used to be *called* while wiring the signals, which made
        # every client look disconnected right after connecting. Now that this
        # only runs on a real disconnect, the entry can be removed safely.
        socket = self._clients.pop(clientId, None)
        if socket is not None:
            socket.deleteLater()

    def Send(self, clientId, msg):
        """Send *msg* to one client; does nothing for an empty message or unknown id.

        Args:
            clientId: ``"<address>:<port>"`` id of the target client.
            msg: text (sent UTF-8 encoded) or a bytes-like payload.
        """
        if len(msg) == 0 or clientId not in self._clients:
            return
        self._clients[clientId].write(QByteArray(self._toPayload(msg)))
        self.OnShowMessage.emit(f'向[{clientId}]发送消息:{msg}')

    def SendBroadcast(self, msg):
        """Send *msg* to every connected client.

        Args:
            msg: text (sent UTF-8 encoded) or a bytes-like payload.
        """
        payload = self._toPayload(msg)
        for client in list(self._clients.values()):
            clientId = f'{client.peerAddress().toString()}:{client.peerPort()}'
            client.write(QByteArray(payload))
            self.OnShowMessage.emit(f'向[{clientId}]发送消息:{msg}')

    @staticmethod
    def _toPayload(msg):
        """Return *msg* as bytes-like data, encoding text as UTF-8."""
        return msg.encode('utf-8') if isinstance(msg, str) else msg

    def _acceptPending(self):
        """Slot for ``newConnection``: register every pending client socket."""
        while self._server.hasPendingConnections():
            socket = self._server.nextPendingConnection()
            if socket is None:
                break
            clientId = f'{socket.peerAddress().toString()}:{socket.peerPort()}'
            self._clients[clientId] = socket
            # Bind the id as a default argument so each socket keeps its own id.
            socket.readyRead.connect(lambda cid=clientId: self._readClient(cid))
            socket.disconnected.connect(lambda cid=clientId: self.SocketDisconnect(cid))
            # An accepted socket is already connected, so announce it directly
            # instead of waiting for a ``connected`` signal.
            self.SocketConnect(clientId)

    def _readClient(self, clientId):
        """Slot for a client's ``readyRead``: read and publish its data."""
        client = self._clients.get(clientId)
        if client is None:
            return
        msg = client.readAll().toStdString()
        if len(msg) > 0:
            self.OnShowMessage.emit(f'接收[{clientId}]消息:{msg}')
class Demo(QWidget):
    """Demo widget with a TCP server panel (left) and a TCP client panel (right)."""

    server = None
    # Declared here so ClientConnect/ClientSend can test it before the first
    # connection attempt instead of raising AttributeError.
    client = None
    portValidator = QIntValidator()
    # 65535 is the largest valid TCP port number.
    portValidator.setRange(1000, 65535)
    ipValidator = QRegularExpressionValidator(QRegularExpression('^((2[0-4]\\d|25[0-5]|[01]?\\d\\d?)\\.){3}(2[0-4]\\d|25[0-5]|[01]?\\d\\d?)$'))
    socketClients = {}

    def __init__(self, parent=None):
        """Build both panels and place them next to each other.

        Args:
            parent: optional parent widget.
        """
        super().__init__(parent)
        # NOTE(review): this attribute shadows QWidget.parent(); kept as-is
        # because code outside this file may read it.
        self.parent = parent
        layout = QGridLayout()
        layout.addWidget(self._buildServerGroup(), 0, 0)
        layout.addWidget(self._buildClientGroup(), 0, 1)
        self.setLayout(layout)

    def _buildServerGroup(self):
        """Create the 'TCP Server' group: port bar, client list, log and send box."""
        group = QGroupBox('TCP Server')
        groupLayout = QVBoxLayout()

        toolBar = QToolBar()
        toolBar.addWidget(QLabel('Port:'))
        self.port = QLineEdit()
        self.port.setValidator(self.portValidator)
        toolBar.addWidget(self.port)
        self.btnListen = QPushButton('Listen')
        self.btnListen.setAutoFillBackground(True)
        self.btnListen.setFlat(True)
        self.btnListen.clicked.connect(self.Listen)
        toolBar.addWidget(self.btnListen)
        groupLayout.addWidget(toolBar)

        groupLayoutMain = QGridLayout()
        ly = QVBoxLayout()

        gp = QGroupBox('Clients')
        gply = QVBoxLayout()
        self.clientListView = QListView()
        self.clientListData = QStandardItemModel()
        self.clientListView.setModel(self.clientListData)
        gply.addWidget(self.clientListView)
        gp.setLayout(gply)
        ly.addWidget(gp)

        gp = QGroupBox('Message')
        gply = QVBoxLayout()
        self.serverMessage = QTextEdit()
        self.serverMessage.setReadOnly(True)
        gply.addWidget(self.serverMessage)
        gp.setLayout(gply)
        ly.addWidget(gp)
        groupLayoutMain.addLayout(ly, 0, 0, 2, 1)

        gp = QGroupBox('Send Message')
        gply = QVBoxLayout()
        self.serverSend = QTextEdit()
        gply.addWidget(self.serverSend)
        btn = QPushButton('Send')
        btn.clicked.connect(self.ServerSend)
        gply.addWidget(btn)
        gp.setLayout(gply)
        groupLayoutMain.addWidget(gp, 0, 1, 2, 1)

        groupLayout.addLayout(groupLayoutMain)
        group.setLayout(groupLayout)
        return group

    def _buildClientGroup(self):
        """Create the 'TCP Client' group: connection bar, log and send box."""
        group = QGroupBox('TCP Client')
        groupLayout = QVBoxLayout()

        toolBar = QToolBar()
        toolBar.addWidget(QLabel('LocalPort:'))
        self.clientPort = QLineEdit()
        self.clientPort.setValidator(self.portValidator)
        toolBar.addWidget(self.clientPort)
        toolBar.addWidget(QLabel('ServerIp:'))
        self.serverIp = QLineEdit()
        self.serverIp.setValidator(self.ipValidator)
        toolBar.addWidget(self.serverIp)
        toolBar.addWidget(QLabel('ServerPort:'))
        self.serverPort = QLineEdit()
        self.serverPort.setValidator(self.portValidator)
        toolBar.addWidget(self.serverPort)
        self.btnConnect = QPushButton('Connect')
        self.btnConnect.setAutoFillBackground(True)
        self.btnConnect.setFlat(True)
        self.btnConnect.clicked.connect(self.ClientConnect)
        toolBar.addWidget(self.btnConnect)
        groupLayout.addWidget(toolBar)

        groupLayoutMain = QGridLayout()

        gp = QGroupBox('Message')
        gply = QVBoxLayout()
        self.clientMessage = QTextEdit()
        self.clientMessage.setReadOnly(True)
        gply.addWidget(self.clientMessage)
        gp.setLayout(gply)
        groupLayoutMain.addWidget(gp, 0, 0)

        gp = QGroupBox('Send Message')
        gply = QVBoxLayout()
        self.clientSend = QTextEdit()
        gply.addWidget(self.clientSend)
        btn = QPushButton('Send')
        btn.clicked.connect(self.ClientSend)
        gply.addWidget(btn)
        gp.setLayout(gply)
        groupLayoutMain.addWidget(gp, 0, 1)

        groupLayout.addLayout(groupLayoutMain)
        group.setLayout(groupLayout)
        return group

    def _markButton(self, button, color, text):
        """Set the background colour and caption of a state button."""
        palette = button.palette()
        palette.setColor(QPalette.ColorRole.Button, color)
        button.setPalette(palette)
        button.setText(text)

    def Listen(self):
        """Toggle the server: stop it when listening, otherwise start it on the entered port."""
        if self.server is not None and self.server.IsListening():
            self.server.exit()
            self.clientListData.clear()
            self._markButton(self.btnListen, Qt.GlobalColor.lightGray, 'Listen')
            return
        if len(self.port.text()) == 0:
            return
        port = int(self.port.text())
        self.server = TcpServer(QHostAddress(QHostAddress.SpecialAddress.AnyIPv4), port)
        self.server.OnShowMessage.connect(self.ShowServerMessage)
        self.server.OnClientConnected.connect(self.ShowConnected)
        self.server.OnClientDisConnected.connect(self.ShowDisConnected)
        self.server.Listen()
        # Only switch to the running state when the port was really opened
        # (e.g. it may already be in use).
        if not self.server.IsListening():
            return
        self.server.start()
        self._markButton(self.btnListen, Qt.GlobalColor.green, 'Stop')

    def ShowServerMessage(self, msg):
        """Append a time-stamped line to the server log."""
        self.serverMessage.append(f'{datetime.datetime.now().strftime("%H:%M:%S")} {msg}')

    def ShowConnected(self, client):
        """Add a client id to the client list."""
        self.clientListData.appendRow(QStandardItem(client))

    def ShowDisConnected(self, client):
        """Remove a client id from the client list."""
        # Look the entry up by text: a freshly created QStandardItem is not
        # part of the model, so it can never be used to find the row.
        for item in self.clientListData.findItems(client):
            self.clientListData.removeRow(item.row())

    def ServerSend(self):
        """Send the text of the server send box to the selected client, or to all if none is selected."""
        msg = self.serverSend.toPlainText()
        if len(msg) == 0 or self.server is None:
            return
        if self.clientListView.currentIndex().row() == -1:
            # Broadcast to every client
            self.server.SendBroadcast(msg)
        else:
            # Send to the selected client only
            clientId = self.clientListData.itemFromIndex(self.clientListView.currentIndex()).text()
            self.server.Send(clientId, msg)

    # TCP client connect / disconnect
    def ClientConnect(self):
        """Toggle the client: connect using the entered values, or disconnect."""
        if self.btnConnect.text() == "Connect":
            if len(self.serverIp.text()) == 0 or len(self.serverPort.text()) == 0:
                return
            # A previous attempt that never connected leaves the button on
            # "Connect" with its thread still running and its local port
            # bound; shut it down before replacing it.
            if self.client is not None:
                self.client.exit()
                self.client.wait(1000)
            ip = self.serverIp.text()
            port = int(self.serverPort.text())
            if len(self.clientPort.text()) == 0:
                self.client = TcpClient(QHostAddress(ip), port)
            else:
                localPort = int(self.clientPort.text())
                self.client = TcpClient(QHostAddress(ip), port, localPort)
            self.client.OnConnected.connect(self.ClientConnected)
            self.client.OnDisConnected.connect(self.ClientDisConnected)
            self.client.OnShowMessage.connect(self.ShowClientMessage)
            self.client.start()
        elif self.client is not None:
            self.client.exit()

    def ShowClientMessage(self, msg):
        """Append a time-stamped line to the client log."""
        self.clientMessage.append(f'{datetime.datetime.now().strftime("%H:%M:%S")} {msg}')

    def ClientSend(self):
        """Send the text of the client send box if the client is connected."""
        msg = self.clientSend.toPlainText()
        if self.client is None or len(msg) == 0 or not self.client.IsConnected():
            return
        self.client.Send(msg)

    def ClientConnected(self):
        """Show the connected state on the connect button."""
        self._markButton(self.btnConnect, Qt.GlobalColor.green, 'DisConnect')

    def ClientDisConnected(self):
        """Show the disconnected state on the connect button."""
        self._markButton(self.btnConnect, Qt.GlobalColor.lightGray, 'Connect')
def runDemo(parent):
    """Create the TCP demo widget with *parent* as its parent and return it."""
    return Demo(parent)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/johnjiangw/pyside6-ui-demo.git
git@gitee.com:johnjiangw/pyside6-ui-demo.git
johnjiangw
pyside6-ui-demo
PySide6-UI-Demo
master

搜索帮助