1 Star 0 Fork 0

沉舟/tDAL

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
tBluetoothHelper.py 5.13 KB
一键复制 编辑 原始数据 按行查看 历史
#!/usr/bin/env python3 /*设定脚本文件直接能执行*/
# -*- coding:utf-8 -*- /*设置编码*/
"""
@Author: zhuo
@Software: PyCharm
@File: tBluetoothHelper.py
@Time: 2025/3/21 9:33
@Function:tBluetoothHelper:蓝牙通信模块
"""
# 导入模块
import time
import threading
import bluetooth
from PySide6.QtCore import Signal, QObject
class BlueToothSignal(QObject):
# 自定义信号传递的数据类型object
get_devices = Signal(object)
class ctBluetoothHelper:
"""
支持多设备连接的蓝牙客户端通用类,用于发现设备、建立连接及数据通信
注意:PyBluez主要支持Windows和Linux,macOS兼容性较差
"""
def __init__(self):
# 存储多个设备连接 {mac: socket}
self.connections = {}
# RFCOMM端口号(通常1-30)
self.port = 1
# 初始化蓝牙端口刷新线程
self.__m_thread_refresh = None
self.thread_refresh_flag = False
# 初始化蓝牙设备列表
self.devices = []
# 初始化蓝牙信号对象
self.signal = BlueToothSignal()
# 启动扫描线程
def f_scanDeviceThreadFunc(self, continuous=True):
"""
function: 启动扫描线程
in: None
out: None
return: None
others: Start Scan Thread Func
"""
if self.__m_thread_refresh and self.__m_thread_refresh.is_alive():
self.thread_refresh_flag = False
self.__m_thread_refresh.join()
self.__m_thread_refresh = threading.Thread(
target=self.__f_scanDevicesFunc,
args=(5, True, continuous)
)
self.__m_thread_refresh.daemon = True
self.thread_refresh_flag = True
self.__m_thread_refresh.start()
# 核心扫描逻辑
def __f_scanDevicesFunc(self, timeout=5, lookup_names=True, continuous=True):
"""
function: 核心扫描逻辑
in: None
out: None
return: None
others: Scan Devices Func
"""
while self.thread_refresh_flag:
try:
# 实时更新设备列表
new_devices = bluetooth.discover_devices(
duration=timeout,
lookup_names=lookup_names,
flush_cache=True
)
# 仅当设备变化时发送信号
if new_devices != self.devices:
self.devices = new_devices
self.signal.get_devices.emit(new_devices)
if not continuous:
break
time.sleep(0.5) # 增加扫描频率
except Exception as e:
print(f"扫描出错: {str(e)}")
# 连接到指定蓝牙设备
def f_connectFunc(self, target_address, port=1):
"""
function: 连接到指定蓝牙设备
in: target_address: 目标设备蓝牙地址(MAC地址)
port: RFCOMM端口号
out:
return: bool: 连接是否成功
others: Connect Func
"""
try:
if target_address in self.connections:
print(f"设备 {target_address} 已连接")
return True
sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
sock.settimeout(10)
sock.connect((target_address, port))
self.connections[target_address] = sock
print(f"连接成功 {target_address}")
return True
except Exception as e:
print(f"连接失败 {target_address}: {str(e)}")
return False
# 发送数据到已连接设备
def f_sendDataFunc(self, target_address, data):
"""
function: 发送数据到已连接设备
in: target_address: 目标设备蓝牙地址(MAC地址)
data: 要发送的数据
out: None
return: bool: 发送是否成功
others: Send Data Func
"""
if target_address not in self.connections:
print(f"设备 {target_address} 未连接")
return False
try:
if isinstance(data, str):
data = data.encode()
self.connections[target_address].send(data)
return True
except Exception as e:
print(f"发送失败 {target_address}: {str(e)}")
self.f_disconnectFunc(target_address)
return False
# 断开连接
def f_disconnectFunc(self, target_address=None):
"""
function: 断开连接
in: target_address: 目标设备蓝牙地址(MAC地址)
out: None
return: None
others: Disconnect Func
"""
if target_address:
if target_address in self.connections:
self.connections[target_address].close()
del self.connections[target_address]
else:
for addr in list(self.connections.keys()):
self.connections[addr].close()
del self.connections[addr]
def __del__(self):
self.f_disconnectFunc()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/gitee_chenzhou/tDAL.git
git@gitee.com:gitee_chenzhou/tDAL.git
gitee_chenzhou
tDAL
tDAL
VFDTS_V3.5

搜索帮助