Files
2023-10-13 14:01:41 +00:00

157 lines
4.6 KiB
C++

#include "tcp_server.h"
#include <atomic>
#include <net/if.h>
#include <pthread.h>
#include <signal.h>
#ifdef LOG_TAG
#undef LOG_TAG
#endif
#define LOG_TAG "aiqtool"
TCPServer::~TCPServer()
{
SaveExit();
}
void TCPServer::SaveExit()
{
quit_.store(true, std::memory_order_release);
if (accept_thread_ && accept_thread_->joinable())
accept_thread_->join();
std::for_each(recv_threads_.begin(), recv_threads_.end(), [](const std::unique_ptr<std::thread>& thrd) {
if (thrd && thrd->joinable())
thrd->join();
});
close(sockfd);
sockfd = -1;
}
int TCPServer::Send(int cilent_socket, char* buff, int size)
{
return send(cilent_socket, buff, size, 0);
}
int TCPServer::Recvieve(int cilent_socket)
{
sigset_t set;
sigemptyset(&set);
sigaddset(&set, SIGQUIT);
sigaddset(&set, SIGINT);
sigaddset(&set, SIGTERM);
pthread_sigmask(SIG_BLOCK, &set, NULL);
std::thread::id threadID = std::this_thread::get_id();
LOG_DEBUG("TCPServer::Recvieve enter %d\n", cilent_socket);
char buffer[MAXPACKETSIZE];
int size = sizeof(buffer);
struct timeval interval = {3, 0};
setsockopt(cilent_socket, SOL_SOCKET, SO_RCVTIMEO, (char*)&interval, sizeof(struct timeval));
while (!quit_.load()) {
int length = recv(cilent_socket, buffer, size, 0);
if (length == 0) {
LOG_DEBUG("socket recvieve exit\n");
break;
} else if (length < 0 && errno == EAGAIN) {
// LOG_INFO("socket recvieve failed\n");
continue;
} else if (length < 0) {
break;
}
LOG_DEBUG("socket recvieve length: %d\n", length);
if (callback_) {
callback_(cilent_socket, buffer, length);
}
}
LOG_DEBUG("TCPServer::Recvieve exit %d\n", cilent_socket);
close(cilent_socket);
cilent_socket = -1;
//
recv_threads_finished_id_.push_back(threadID);
LOG_DEBUG("TCPServer::recv_threads_finished_id_ len: %d\n", recv_threads_finished_id_.size());
return 0;
}
void TCPServer::Accepted()
{
LOG_INFO("TCPServer::Accepted\n");
sigset_t set;
sigemptyset(&set);
sigaddset(&set, SIGQUIT);
sigaddset(&set, SIGINT);
sigaddset(&set, SIGTERM);
pthread_sigmask(SIG_BLOCK, &set, NULL);
struct timeval interval = {1, 0};
setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char*)&interval, sizeof(struct timeval));
int reuseTrue = 1;
setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuseTrue, sizeof(int));
while (!quit_) {
int cilent_socket;
socklen_t sosize = sizeof(clientAddress);
cilent_socket = accept(sockfd, (struct sockaddr*)&clientAddress, &sosize);
if (cilent_socket < 0) {
if (errno != EAGAIN && errno != EINTR) {
LOG_ERROR("Error socket accept failed %d %d\n", cilent_socket, errno);
break;
}
continue;
}
LOG_DEBUG("socket accept ip %s\n", inet_ntoa(clientAddress.sin_addr));
recv_threads_.push_back(
std::unique_ptr<std::thread>(new std::thread(&TCPServer::Recvieve, this, cilent_socket)));
LOG_DEBUG("socket accept close\n");
}
close(sockfd);
sockfd = -1;
exited_.store(true, std::memory_order_release);
LOG_DEBUG("socket accept exit\n");
}
int TCPServer::Process(int port)
{
exited_.store(false, std::memory_order_release);
LOG_DEBUG("TCPServer::Process\n");
int opt = 1;
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
LOG_ERROR("Failed to create socket with tunner");
exited_.store(true, std::memory_order_release);
return -1;
}
memset(&serverAddress, 0, sizeof(serverAddress));
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
LOG_ERROR("Error setsockopt\n");
exited_.store(true, std::memory_order_release);
return -1;
}
serverAddress.sin_family = AF_INET;
serverAddress.sin_addr.s_addr = htonl(INADDR_ANY);
serverAddress.sin_port = htons(port);
if ((::bind(sockfd, (struct sockaddr*)&serverAddress, sizeof(serverAddress))) < 0) {
LOG_ERROR("Error bind\n");
exited_.store(true, std::memory_order_release);
return -1;
}
if (listen(sockfd, 5) < 0) {
LOG_ERROR("Error listen\n");
exited_.store(true, std::memory_order_release);
return -1;
}
if (accept_thread_) {
// SaveExit();
}
quit_.store(false, std::memory_order_release);
accept_thread_ = std::unique_ptr<std::thread>(new std::thread(&TCPServer::Accepted, this));
return 0;
}