1 Star 3 Fork 1

BayMax98/MQTT_Client

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
mqtt_main.c 11.42 KB
一键复制 编辑 原始数据 按行查看 历史
乐嘉公司笔记本 提交于 2022-09-06 17:26 +08:00 . 优化变量命名
#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "pthread.h"
#include "unistd.h"
#include "config.h"
#include "subcribe.h"
#include "connect.h"
#include "publish.h"
// User event codes. mqtt_control_thread() posts one of these through
// mqtt_control_set_event() and mqtt_controller() reads it back with
// mqtt_control_get_event().
// No event pending (also the initial value of `event`).
#define USER_EVENT_NONE 0
// Posted for terminal command 'p'/'P'.
#define USER_EVENT_PUBLISH_CUR_TIME 1
// Posted for terminal command 's'/'S'.
#define USER_EVENT_SUBCRIBE_TOPIC 2
// Posted for terminal command 'c'/'C'.
#define USER_EVENT_CONNECT_MQTT 3
// Posted for terminal command 'd'/'D'.
#define USER_EVENT_DISCONN_MQTT 4
// Posted for terminal command 'r'/'R'.
#define USER_EVENT_RECOVER_MQTT_PARAS 5
// Posted for terminal command 'q'/'Q'. The identifier is misspelled ("EVETN")
// but is referenced under this exact name elsewhere in the file.
#define USER_EVETN_EXIT_PROGRAM 0XFFFF
// Handle of the thread created in main() that runs config_mqtt_read_thread().
static pthread_t mqttRead_thread;
// Handle of the thread created in main() that runs mqtt_control_thread().
static pthread_t mqttControl_thread;
// Guards every access to `event`; initialised in main() with pthread_mutex_init().
static pthread_mutex_t mutex_event;
// Most recently posted user event code.
static int event=USER_EVENT_NONE;
// Receive buffer that config_mqtt_read_thread() fills and then clears after each read.
static char ReadBuff[MAX_RECEIVE_BUFF_SIZE];
// Post a user event code.
// eve: the event code to store in the file-scope `event` variable.
// The store is performed while holding mutex_event.
void mqtt_control_set_event(int eve)
{
    pthread_mutex_t *const guard = &mutex_event;

    pthread_mutex_lock(guard);
    event = eve;
    pthread_mutex_unlock(guard);
}
int mqtt_control_get_event()
{
int eve = 0;
pthread_mutex_lock(&mutex_event);
eve = event;
pthread_mutex_unlock(&mutex_event);
return eve;
}
// Build a two-byte keep-alive (PINGREQ) frame and write it to the current socket.
// The frame has no variable header and no payload.
void keep_alive_send_frame()
{
    // Zero-initialised scratch buffer; only the first two bytes are sent.
    char pingFrame[50] = {0};

    pingFrame[0] = MQTT_MSG_PINGREQ; // packet type, reserved flag bits left empty
    pingFrame[1] = 0;                // remaining length is 0

    config_mqtt_write(config_get_cur_socket(), pingFrame, 2);
}
// Parse one PUBLISH frame received from the MQTT server and print its flags,
// topic and payload.
//
// message: raw frame bytes, starting at the fixed-header byte.
// len:     number of valid bytes in message.
//
// Frame layout handled here:
//   byte 0            packet type (high nibble) and flags (low nibble):
//                     bit 0 = retain, bits 1-2 = qos, bit 3 = dup
//   bytes 1..pos      "remaining length", 1 to 4 bytes, 7 bits per byte, least
//                     significant group first, bit 7 set when another byte follows
//   next 2 bytes      topic length, big-endian
//   topic bytes
//   2 bytes           packet identifier, present only when qos != 0
//   payload           everything up to the end of the frame
//
// Nothing is returned. A malformed or truncated frame prints an error line
// and is dropped. A topic or payload longer than the local print buffers is
// truncated for printing instead of overflowing them.
void mqtt_anlysis_publish_frame(char* message,int len)
{
    // Work on unsigned bytes so values >= 0x80 are not sign-extended.
    const unsigned char *frame = (const unsigned char *)message;
    char topic[100];
    char topicMsg[200];
    int pos = 0;        // index of the last "remaining length" byte
    int multiplier = 1; // weight of the current 7-bit group: 1, 128, 128^2, 128^3
    int remaining = 0;  // decoded "remaining length" field

    if(message == NULL || len < 2)
    {
        printf("mqtt_read_anlysis():error! invalid frame!!1\n");
        return;
    }

    const int flags = frame[0] & 0x0F;
    const int qos = (flags & 0x06) >> 1;
    const int dup = (flags & 0x08) >> 3;
    const int retain = flags & 0x01;

    do
    {
        pos++;
        // The field is at most 4 bytes long and must lie inside the buffer.
        if(pos > 4 || pos >= len)
        {
            printf("mqtt_read_anlysis():error! invalid frame!!1\n");
            return;
        }
        remaining += (frame[pos] & 0x7F) * multiplier;
        multiplier *= 128;
    }while(frame[pos] & 0x80);

    // One past the last byte of this frame. If fewer bytes were received than
    // the header announces, parse only what is actually in the buffer.
    int end = pos + 1 + remaining;
    if(end > len)
    {
        end = len;
    }

    const int topicStart = pos + 3; // first byte after the 2-byte topic length
    if(topicStart > end)
    {
        printf("mqtt_read_anlysis():error! invalid frame!!1\n");
        return;
    }
    const int topicLen = (frame[pos+1] << 8) | frame[pos+2];

    int msgPos = topicStart + topicLen;
    if(qos)
    {
        msgPos += 2; // skip the 2-byte packet identifier
    }
    if(msgPos > end)
    {
        printf("mqtt_read_anlysis():error! invalid frame!!1\n");
        return;
    }
    const int payloadLen = end - msgPos;

    // Clamp both copies so the buffers always keep a terminating NUL.
    const size_t topicCopy = (topicLen < (int)sizeof(topic)) ? (size_t)topicLen : sizeof(topic) - 1;
    const size_t payloadCopy = (payloadLen < (int)sizeof(topicMsg)) ? (size_t)payloadLen : sizeof(topicMsg) - 1;

    memset(topic,0,sizeof(topic));
    memset(topicMsg,0,sizeof(topicMsg));
    memcpy(topic,message + topicStart,topicCopy);
    memcpy(topicMsg,message + msgPos,payloadCopy);

    printf("mqtt_anlysis_publish_frame(): result is:\n");
    printf("************************qos: %d,dup: %d,retain %d\n",qos,dup,retain);
    printf("**********************topic: %s\n",topic);
    printf("*******************topicMsg: %s\n",topicMsg);
}
// Classify one frame received from the MQTT server by its packet type (high
// nibble of byte 0) and react to it.
//
// message: raw frame bytes.
// len:     number of valid bytes in message.
//
// - PINGRESP / SUBACK: only logged.
// - CONNACK: byte 3 is treated as the result code. Zero marks the link as
//   NETSTA_CONN_MQTT; any other value terminates the process with exit(1).
// - PUBLISH: forwarded to mqtt_anlysis_publish_frame().
// - anything else: logged as unsupported.
//
// An empty buffer (NULL or len <= 0) and a CONNACK shorter than 4 bytes are
// reported and ignored, so the parser never reads bytes that were not received.
void mqtt_read_analysis(char* message,int len)
{
    if(message == NULL || len <= 0)
    {
        printf("mqtt_read_analysis():nothing to analyse, len is %d\n",len);
        return;
    }
    printf("\n***********************************************\n");
    switch ((message[0] & 0XF0))
    {
        case MQTT_MSG_PINGRESP:
            printf("mqtt_read_analysis():get keep alive response frame\n");
            break;
        case MQTT_MSG_SUBACK:
            printf("mqtt_read_analysis():get subcribe ack frame\n");
            break;
        case MQTT_MSG_CONNACK:
            printf("mqtt_read_analysis():get connect ack frame\n");
            if(len < 4)
            {
                // The result code lives in byte 3; without it nothing can be decided.
                printf("mqtt_read_analysis():connect ack frame too short, len is %d\n",len);
            }
            else if(message[3] != 0)
            {
                printf("mqtt_read_analysis():connect error! error code is %d\n",message[3]);
                exit(1);
            }
            else
            {
                config_set_connect_sta(NETSTA_CONN_MQTT);
                printf("mqtt_read_analysis():connect to mqtt server successfully!\n");
            }
            break;
        case MQTT_MSG_PUBLISH:
            printf("mqtt_read_analysis():get published topic frame\n");
            mqtt_anlysis_publish_frame(message,len);
            break;
        default:
            printf("mqtt_read_analysis():unsuppoted msg\n");
            break;
    }
    printf("***********************************************\n\n");
}
// Thread body: read data from the MQTT server and hand it to mqtt_read_analysis().
//
// While the link state is NETSTA_NONE the thread polls every 2 seconds.
// Otherwise it reads into ReadBuff (leaving the last byte free), optionally
// hex-dumps the data when PRINT_ORIG_SERVER_DATA is set, analyses it and
// clears the buffer for the next read.
// Never returns in practice; the trailing return only satisfies the signature.
void * config_mqtt_read_thread()
{
    while(1)
    {
        if(config_get_connect_sta() == NETSTA_NONE)
        {
            sleep(2);
            continue;
        }

        int str_len = config_mqtt_read(config_get_cur_socket(),ReadBuff,sizeof(ReadBuff)-1);
        if(str_len <= 0)
        {
            // NOTE(review): assumes config_mqtt_read() follows the read(2)
            // convention (<= 0 means error or no data) - confirm against config.c.
            // Skip the dump and analysis, and back off instead of spinning.
            printf("config_mqtt_read_thread():read returned %d, nothing to analyse\n",str_len);
            sleep(1);
            continue;
        }

        if(PRINT_ORIG_SERVER_DATA)
        {
            printf("config_mqtt_read_thread():Message from server :\n");
            config_printf_hex(ReadBuff,str_len);
        }
        mqtt_read_analysis(ReadBuff,str_len);//analyse the data read from the mqtt server
        memset(ReadBuff,0,sizeof(ReadBuff));
    }
    return NULL;
}
// Thread body: read single-character commands from the terminal and post the
// matching user event with mqtt_control_set_event().
//
//   q/Q -> USER_EVETN_EXIT_PROGRAM        p/P -> USER_EVENT_PUBLISH_CUR_TIME
//   s/S -> USER_EVENT_SUBCRIBE_TOPIC      d/D -> USER_EVENT_DISCONN_MQTT
//   c/C -> USER_EVENT_CONNECT_MQTT        r/R -> USER_EVENT_RECOVER_MQTT_PARAS
//
// Newlines are skipped silently; any other character is echoed and ignored.
// Returns NULL once standard input reaches end-of-file or fails, because no
// further command can arrive after that.
void * mqtt_control_thread()
{
    while(1)
    {
        // getchar() blocks this thread. Keep the result in an int so that EOF
        // stays distinguishable from every valid character.
        int cmd = getchar();
        if(cmd == EOF)
        {
            printf("mqtt_control_thread():stdin closed, stop reading commands\n");
            break;
        }
        if(cmd == '\n')
        {
            continue;
        }
        printf("mqtt_control_thread():get cmd \'%c\'\n",cmd);
        switch(cmd)
        {
            case 'q':
            case 'Q':
                mqtt_control_set_event(USER_EVETN_EXIT_PROGRAM);
                break;
            case 'p':
            case 'P':
                mqtt_control_set_event(USER_EVENT_PUBLISH_CUR_TIME);
                break;
            case 'S':
            case 's':
                mqtt_control_set_event(USER_EVENT_SUBCRIBE_TOPIC);
                break;
            case 'D':
            case 'd':
                mqtt_control_set_event(USER_EVENT_DISCONN_MQTT);
                break;
            case 'C':
            case 'c':
                mqtt_control_set_event(USER_EVENT_CONNECT_MQTT);
                break;
            case 'R':
            case 'r':
                mqtt_control_set_event(USER_EVENT_RECOVER_MQTT_PARAS);
                break;
            default:
                break;
        }
    }
    return NULL;
}
// Tear down the current link, re-run config_init(0XFF,NULL) and connect again.
// The link state is set to NETSTA_NONE first. The state captured before that
// decides how the old link is closed: a plain socket link is closed with
// config_tcp_socket_disconnect(), an MQTT link with mqtt_disconnect().
// The process exits with status 1 if the new connection attempt fails.
void mqtt_recover_handle()
{
    const EM_NETSTA previousSta = config_get_connect_sta();

    config_set_connect_sta(NETSTA_NONE);

    // Close whatever was open before; nothing to do when already idle.
    if(previousSta == NETSTA_CONN_SOCKET)
    {
        config_tcp_socket_disconnect(config_get_cur_socket());
    }
    else if(previousSta == NETSTA_CONN_MQTT)
    {
        mqtt_disconnect(config_get_cur_socket());
    }

    config_init(0XFF,NULL);

    if(mqtt_connect() != -1)
    {
        return;
    }
    printf("mqtt_recover_handle():mqtt connect to default mqtt server failed\n");
    exit(1);
}
//the controller to control the program function
//'C'->connect to mqtt server
//'D'->disconnect to mqtt server
//'S'->subcribe the designated topic which is defined as:TOPIC_SUBCIRBE
//'P'->publish the designated topic which is defined as:TOPIC_PUBLISH
//'Q'->exit the program
//'R'->recover the link paraments as default paraments that are defined as SOCKET_IP_DEFAULT and SOCKET_PORT_DEFAULT
// and reconnect to the default mqtt server
void mqtt_controller()
{
int aliveCnt = 0;
int event = mqtt_control_get_event();
if(event == USER_EVENT_CONNECT_MQTT)//connect mqtt server
{
if(config_get_connect_sta()==NETSTA_NONE)
{
printf("mqtt_controller():is going to connect to mqqt server\n");
if(mqtt_connect() == -1)//connect to mqtt server
{
printf("mqtt connect failed!\n");
exit(1);
}
}
else
{
printf("mqtt_controller():is already connected to mqtt server\n");
}
}
else if(event == USER_EVENT_DISCONN_MQTT)
{
if(config_get_connect_sta()==NETSTA_NONE)
{
printf("mqtt_controller():event USER_EVENT_DISCONN_MQTT, is already disconnected\n");
}
else
{
mqtt_disconnect(config_get_cur_socket());
}
}
else if(event == USER_EVENT_PUBLISH_CUR_TIME && config_get_connect_sta()==NETSTA_CONN_MQTT)//publish cur time
{
printf("mqtt_controller():is going to publish cur time\n");
publish_publish_topic(0,config_get_cur_socket());
aliveCnt = 0;
}
else if(event == USER_EVENT_SUBCRIBE_TOPIC && config_get_connect_sta()==NETSTA_CONN_MQTT)//subscribe test topic
{
if(sucribe_subcribe_topic(0,config_get_cur_socket())==-1)
{
printf("publish topic failed\n");
}
}
else if(event == USER_EVETN_EXIT_PROGRAM)//exit program
{
printf("mqtt_controller():is going to exit program\n");
exit(1);
}
else if(event == USER_EVENT_RECOVER_MQTT_PARAS)//recover the link paraments and reconnect
{
printf("mqtt_controller():is going to recover link paraments\n");
mqtt_recover_handle();
}
mqtt_control_set_event(USER_EVENT_NONE);
aliveCnt++;
//when connect,send keep alive frame periodic
if(aliveCnt >= (KEEP_ALIVE/2) && config_get_connect_sta()==NETSTA_CONN_MQTT)
{
printf("mqtt_controller():is going to send alive frame\n");
keep_alive_send_frame();
aliveCnt = 0;
}
}
// Program entry point.
// Initialises the event mutex and the link parameters from the command line,
// starts the server-read thread and the terminal-control thread, then runs
// mqtt_controller() once per second forever.
// A thread that fails to start is only reported; the program keeps running.
int main(int argc,char *argv[])
{
    pthread_mutex_init(&mutex_event,NULL);
    config_init(argc,argv);//initialise the link parameters

    //thread that receives data from the mqtt server
    if(pthread_create(&mqttRead_thread,NULL,config_mqtt_read_thread,NULL) != 0)
    {
        printf("mqtt read thread creat fail\n");
    }

    //thread that reads control commands from the terminal
    if(pthread_create(&mqttControl_thread,NULL,mqtt_control_thread,NULL) != 0)
    {
        printf("mqtt_control_thread creat fail\n");
    }

    //drive the mqtt client from the single-character commands
    //(C, D, S, P, Q, R) entered on the terminal
    for(;;)
    {
        mqtt_controller();
        sleep(1);
    }
    return 0;
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/Baymax98/mqtt_-client.git
git@gitee.com:Baymax98/mqtt_-client.git
Baymax98
mqtt_-client
MQTT_Client
master

搜索帮助