代码拉取完成,页面将自动刷新
import sys
import os
import datetime
import threading
from PySide6.QtCore import QObject
from PySide6.QtWidgets import *
from PySide6.QtGui import *
from PySide6.QtCore import *
from PySide6.QtNetwork import *
# UDP client class
class UdpClient(QThread):
    """Worker thread that polls a ``QUdpSocket`` and reports activity.

    Every status line (listening started, datagram received, datagram sent,
    failures) is published through the ``OnShowMessage`` signal as a string.
    """

    # Carries one human-readable status line per emission.
    OnShowMessage = Signal(str)
    _localHost = None
    _localPort = 50000
    _socket = None
    _flag = True
    # Pause between polls while no datagram is pending, in milliseconds.
    _pollIntervalMs = 10

    def __init__(self, localHost: QHostAddress, localPort: int = 50000) -> None:
        """Store the local endpoint and create the (still unbound) socket.

        localHost: local address to bind to when the thread starts.
        localPort: local port to bind to when the thread starts.
        """
        super(UdpClient, self).__init__()
        self._localHost = localHost
        self._localPort = localPort
        self._socket = QUdpSocket()
        # Armed here rather than in run() so that an exit() issued right
        # after start() cannot be overwritten by the thread starting late.
        self._flag = True

    def run(self):
        """Bind the socket and poll for datagrams until ``exit()`` is called."""
        # Work on a local reference: exit() clears self._socket from another
        # thread, and the loop must not dereference None in between.
        sock = self._socket
        if sock is None:
            return
        if not sock.bind(self._localHost, self._localPort):
            # Do not claim to be listening when the bind was refused.
            self.OnShowMessage.emit(f'绑定端口失败:{self._localPort}({sock.errorString()})')
            return
        self.OnShowMessage.emit(f'开始监听端口:{self._localPort}')
        while self._flag:
            if not sock.hasPendingDatagrams():
                # Yield the CPU instead of spinning while the socket is idle.
                self.msleep(self._pollIntervalMs)
                continue
            length = sock.pendingDatagramSize()
            # Always dequeue the datagram, even an empty one; otherwise it
            # stays pending and later datagrams are never reached.
            payload = sock.readDatagram(max(length, 0))[0]
            if length > 0:
                msg = payload.data().decode('utf-8', errors='replace')
                self.OnShowMessage.emit(f'收到消息:{msg}')

    def exit(self):
        """Stop the polling loop, then close and release the socket."""
        self._flag = False
        # Let run() leave its loop before the socket is closed under it.
        if self.isRunning():
            self.wait(1000)
        if self._socket is not None:
            self._socket.close()
            self._socket = None
        super().exit()

    def Send(self, msg: str, targetHost: QHostAddress, targetPort: int):
        '''
        Send a message.
        msg: message text
        targetHost: destination address (unicast, multicast or broadcast)
        targetPort: destination port'''
        sock = self._socket
        if sock is None or len(msg) == 0:
            return
        # Encode explicitly so non-ASCII text matches the UTF-8 decoding
        # applied to received datagrams in run().
        payload = QByteArray(msg.encode('utf-8'))
        if sock.writeDatagram(payload, targetHost, targetPort) < 0:
            # writeDatagram reports failure with a negative return value.
            self.OnShowMessage.emit(f'发送失败:{sock.errorString()}')
            return
        self.OnShowMessage.emit(f'向[{targetHost.toString()}:{targetPort}]发送消息:{msg}')
class Demo(QWidget):
    """UDP test panel: a toolbar for the endpoints plus receive/send panes.

    The toolbar holds the local port, target IP and target port fields and a
    Start/Stop toggle button. Below it sit a read-only log of messages and an
    editor with a Send button.
    """

    server = None
    # Active UdpClient, or None until Start has been pressed once.
    udp = None
    portValidator = QIntValidator()
    # 65535 is the largest valid port number.
    portValidator.setRange(1000, 65535)
    ipValidator = QRegularExpressionValidator(QRegularExpression('^((2[0-4]\\d|25[0-5]|[01]?\\d\\d?)\\.){3}(2[0-4]\\d|25[0-5]|[01]?\\d\\d?)$'))
    socketClients = {}

    def __init__(self, parent=None):
        """Build the widget tree and wire the button signals.

        parent: optional parent widget, forwarded to ``QWidget``.
        """
        super().__init__(parent)
        # NOTE(review): this attribute shadows QWidget.parent(); kept because
        # code outside this view may read it as an attribute.
        self.parent = parent
        self.udp = None

        layout = QVBoxLayout()

        # Toolbar: endpoint fields and the Start/Stop toggle.
        toolBar = QToolBar()
        toolBar.addWidget(QLabel('LocalPort:'))
        self.localPort = QLineEdit()
        self.localPort.setValidator(self.portValidator)
        toolBar.addWidget(self.localPort)
        toolBar.addWidget(QLabel('TargetIp:'))
        self.targetIp = QLineEdit()
        self.targetIp.setValidator(self.ipValidator)
        toolBar.addWidget(self.targetIp)
        toolBar.addWidget(QLabel('TargetPort:'))
        self.targetPort = QLineEdit()
        self.targetPort.setValidator(self.portValidator)
        toolBar.addWidget(self.targetPort)
        self.btnStart = QPushButton('Start')
        self.btnStart.setAutoFillBackground(True)
        self.btnStart.setFlat(True)
        self.btnStart.clicked.connect(self.Start)
        toolBar.addWidget(self.btnStart)
        layout.addWidget(toolBar)

        groupLayoutMain = QGridLayout()

        # Left pane: read-only log fed by ShowMessage.
        gp = QGroupBox('Received Message')
        gply = QVBoxLayout()
        self.receivedMessage = QTextEdit()
        self.receivedMessage.setReadOnly(True)
        gply.addWidget(self.receivedMessage)
        gp.setLayout(gply)
        groupLayoutMain.addWidget(gp, 0, 0)

        # Right pane: message editor and Send button.
        gp = QGroupBox('Send Message')
        gply = QVBoxLayout()
        self.sendMessage = QTextEdit()
        gply.addWidget(self.sendMessage)
        btn = QPushButton('Send')
        btn.clicked.connect(self.Send)
        gply.addWidget(btn)
        gp.setLayout(gply)
        groupLayoutMain.addWidget(gp, 0, 1)

        layout.addLayout(groupLayoutMain)
        self.setLayout(layout)

    @staticmethod
    def _parsePort(text):
        """Return ``text`` as a port number in 1..65535, or None if it is not one."""
        try:
            port = int(text)
        except ValueError:
            return None
        if 0 < port <= 65535:
            return port
        return None

    def _setStartButton(self, color, caption):
        """Recolour the Start/Stop button and change its caption."""
        palette = self.btnStart.palette()
        palette.setColor(QPalette.ColorRole.Button, color)
        self.btnStart.setPalette(palette)
        self.btnStart.setText(caption)

    def Start(self):
        """Toggle the UDP client: start it on 'Start', stop it on 'Stop'."""
        if self.btnStart.text() == "Start":
            # The validator still lets partial or out-of-range text through,
            # so the value is range-checked again before binding.
            localPort = self._parsePort(self.localPort.text())
            if localPort is None:
                return
            self.udp = UdpClient(QHostAddress(QHostAddress.SpecialAddress.AnyIPv4), localPort)
            self.udp.OnShowMessage.connect(self.ShowMessage)
            self.udp.start()
            self._setStartButton(Qt.GlobalColor.green, 'Stop')
        else:
            if self.udp is not None:
                self.udp.exit()
                # Give the worker a moment to finish before it can be replaced.
                self.udp.wait(1000)
            self._setStartButton(Qt.GlobalColor.lightGray, 'Start')

    def ShowMessage(self, msg):
        """Append ``msg`` to the log pane, prefixed with the current HH:MM:SS time."""
        self.receivedMessage.append(f'{datetime.datetime.now().strftime("%H:%M:%S")} {msg}')

    def Send(self):
        """Send the editor text to the target endpoint; do nothing if input is incomplete."""
        if self.udp is None:
            return
        msg = self.sendMessage.toPlainText()
        targetIp = self.targetIp.text()
        targetPort = self._parsePort(self.targetPort.text())
        if len(msg) == 0 or len(targetIp) == 0 or targetPort is None:
            return
        targetHost = QHostAddress(targetIp)
        # Partially typed addresses may not parse; skip rather than send.
        if targetHost.isNull():
            return
        self.udp.Send(msg, targetHost, targetPort)
def runDemo(parent):
    """Create a ``Demo`` widget with ``parent`` as its parent and return it."""
    return Demo(parent)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。