1 Star 0 Fork 0

GaussianPrince/paxos cpp

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
network.h 7.66 KB
一键复制 编辑 原始数据 按行查看 历史
#ifndef __NETWORK_H__
#define __NETWORK_H__
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <algorithm>
#include <cerrno>
#include <condition_variable>
#include <cstddef>
#include <cstring>
#include <exception>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <unordered_map>
#include <vector>
#include "message.h"
/**
 * Exception type describing a failed network operation.
 *
 * Stores the numeric error code and a human-readable message supplied by the
 * thrower, and exposes that message through std::exception::what() so that
 * handlers catching `const std::exception &` see it as well.
 */
class NetworkError : public std::exception {
public:
    int err;          ///< Numeric error code passed to the constructor.
    std::string msg;  ///< Human-readable description passed to the constructor.

    /**
     * @param err Numeric error code to store.
     * @param msg Description to store and to report from what().
     */
    NetworkError(int err, const std::string &msg) : err(err), msg(msg) {}

    /// @return The stored message; valid for as long as this object lives.
    const char *what() const noexcept override { return msg.c_str(); }
};
class Network {
public:
int fdServer;
int id;
std::unordered_map<int, int> fdClientMap;
std::unordered_map<int, struct sockaddr_in> neighbors;
fd_set clients;
int maxFd;
std::vector<Message> msgVec;
std::mutex mtx;
std::condition_variable_any condEmpty;
public:
Network()
: fdServer(-1), id(-1), neighbors(), fdClientMap(), clients(), maxFd(0),
condEmpty(), mtx() {}
Network(const int id,
const std::unordered_map<int, struct sockaddr_in> &neighbors)
: fdServer(-1), id(id), neighbors(neighbors), fdClientMap(), clients(),
maxFd(0), condEmpty(), mtx() {}
int openTcpServer() {
std::stringstream ss;
fdServer = socket(AF_INET, SOCK_STREAM, 0);
if (fdServer < 0) {
ss << fdServer << " socket error: " << std::string(strerror(errno));
std::cout << ss.str();
return fdServer;
}
int flags = fcntl(fdServer, F_GETFL, 0);
int ret = fcntl(fdServer, F_SETFL, flags | O_NONBLOCK);
if (ret < 0) {
ss << ret << " fcntl error: " << std::string(strerror(errno));
std::cout << ss.str();
return ret;
}
flags = 1;
ret = setsockopt(fdServer, SOL_SOCKET, SO_REUSEADDR,
(const void *)&flags, sizeof(flags));
if (ret < 0) {
ss << ret
<< " SO_REUSEADDR error: " << std::string(strerror(errno));
std::cout << ss.str();
return ret;
}
ret = setsockopt(fdServer, SOL_SOCKET, SO_REUSEPORT,
(const void *)&flags, sizeof(flags));
if (ret < 0) {
ss << ret
<< " SO_REUSEPORT error: " << std::string(strerror(errno));
std::cout << ss.str();
return ret;
}
struct sockaddr addr;
if (neighbors.find(id) == neighbors.end()) {
ss << ret << " id error: " << std::to_string(id);
std::cout << ss.str();
return -1;
}
memcpy(&addr, &neighbors[id], sizeof(struct sockaddr));
ret = bind(fdServer, &addr, sizeof(struct sockaddr));
if (ret < 0) {
ss << ret << " bind error: " + std::string(strerror(errno));
std::cout << ss.str();
return ret;
}
ret = listen(fdServer, 1024);
if (ret < 0) {
ss << ret << " listen error: " << std::string(strerror(errno));
std::cout << ss.str();
return ret;
}
maxFd = std::max(maxFd, fdServer);
return ret;
}
int closeTcpServer() {
int ret = close(fdServer);
fdServer = -1;
return ret;
}
int openTcpClient() {
std::stringstream ss;
for (auto &neighbor : neighbors) {
if (neighbor.first == id) {
continue;
}
int fdClient = socket(AF_INET, SOCK_STREAM, 0);
if (fdClient < 0) {
ss << fdClient
<< " socket error: " + std::string(strerror(errno));
std::cout << ss.str();
return fdClient;
}
int flags = fcntl(fdClient, F_GETFL, 0);
int ret = fcntl(fdClient, F_SETFL, flags | O_NONBLOCK);
if (ret < 0) {
ss << ret << " fcntl error: " << std::string(strerror(errno));
std::cout << ss.str();
return ret;
}
struct sockaddr addr;
memcpy(&addr, &neighbor.second, sizeof(struct sockaddr));
ret = connect(fdClient, &addr, sizeof(struct sockaddr));
if (ret < 0) {
if (errno != EINPROGRESS) {
ss << ret << " connect error "
<< std::string(strerror(errno));
std::cout << ss.str();
return ret;
}
}
fdClientMap[neighbor.first] = fdClient;
}
return 0;
}
int closeTcpClient() {
int ret = 0;
for (auto &fdClient : fdClientMap) {
ret = close(fdClient.second);
}
return ret;
}
int sendTcpMsg(Message &msg) {
if (fdClientMap.find(msg.to) == fdClientMap.end()) {
return -1;
}
int fdClient = fdClientMap[msg.to];
fd_set wset;
FD_ZERO(&wset);
FD_SET(fdClient, &wset);
struct timeval timeout {
1, 0
};
int ret = select(fdClient + 1, nullptr, &wset, nullptr, &timeout);
if (ret <= 0) {
return ret;
}
std::vector<char> buf;
ret = msg.msgToBuf(buf);
ret = send(fdClient, &buf[0], buf.size(), 0);
if (ret != buf.size()) {
ret = -1;
}
return ret;
}
int recvLoop() {
int ret;
fd_set rset;
if (fdServer <= STDERR_FILENO) {
return -1;
}
while (1) {
rset = clients;
FD_SET(fdServer, &rset);
struct timeval timeout {
1, 0
};
ret = select(maxFd + 1, &rset, nullptr, nullptr, &timeout);
if (ret <= 0) {
continue;
}
for (int i = 0; i <= maxFd; i++) {
if (!FD_ISSET(i, &rset)) {
continue;
}
if (i == fdServer) {
int conn = accept(fdServer, nullptr, nullptr);
maxFd = std::max(conn, maxFd);
FD_SET(conn, &clients);
std::stringstream ss;
ss << id << " new conn " << conn << "," << strerror(errno)
<< "\n";
std::cout << ss.str();
} else {
std::vector<char> buf(sizeof(int) * 3, 0);
ret = recv(i, &buf[0], sizeof(int) * 3, 0);
if (ret != sizeof(int) * 3) {
continue;
}
buf.resize(sizeof(int) * 3 +
ntohl(*(unsigned int *)(&buf[sizeof(int)])),
0);
ret = recv(i, &buf[sizeof(int) * 3],
buf.size() - sizeof(int) * 3, 0);
if (ret != buf.size() - sizeof(int) * 3) {
continue;
}
Message msg;
ret = msg.bufToMsg(buf);
if (ret != buf.size()) {
continue;
}
equeueTcpMsg(msg);
}
}
}
return ret;
}
int equeueTcpMsg(Message &msg) {
std::lock_guard<std::mutex> lock(mtx);
msgVec.emplace_back(msg);
condEmpty.notify_one();
return msgVec.size();
}
int dequeueTcpMsg(std::vector<Message> &msgOutVec) {
std::unique_lock<std::mutex> lock(mtx);
condEmpty.wait(mtx, [&]() { return !(msgVec.size() == 0); });
msgOutVec = msgVec;
msgVec.clear();
return msgOutVec.size();
}
int recvTcpMsg(std::vector<Message> &msgOutVec) {
return dequeueTcpMsg(msgOutVec);
}
};
#endif
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/GaussianPrince/paxos-cpp.git
git@gitee.com:GaussianPrince/paxos-cpp.git
GaussianPrince
paxos-cpp
paxos cpp
master

搜索帮助