添加项目文件。

This commit is contained in:
CaiXiang
2025-06-09 09:09:25 +08:00
parent 75b909652e
commit 88acb23465
1054 changed files with 615623 additions and 0 deletions

View File

@@ -0,0 +1,79 @@
#pragma once
#include <string>
#include <map>
#include <memory>
#include <pthread.h>
#include "mongoose.h"
#include "LockQueue.hpp"
#include <time.h>
#include <functional>
#include <sstream>
using namespace std;
#ifndef _WIN64
#define Logi(fmt, ...) {time_t timep; time(&timep); struct tm *p = localtime(&timep); printf("%4d-%02d-%02d %02d:%02d:%02d [%llu][I] ", 1900+p->tm_year, 1+p->tm_mon, p->tm_mday, p-> tm_hour, p->tm_min, p->tm_sec, (unsigned long long)pthread_self());printf(fmt, ##__VA_ARGS__);printf("\n");fflush(stdout);}
#define Logw(fmt, ...) {time_t timep; time(&timep); struct tm *p = localtime(&timep); printf("%4d-%02d-%02d %02d:%02d:%02d [%llu][W] ", 1900+p->tm_year, 1+p->tm_mon, p->tm_mday, p-> tm_hour, p->tm_min, p->tm_sec, (unsigned long long)pthread_self());printf(fmt, ##__VA_ARGS__);printf("\n");fflush(stdout);}
#define Loge(fmt, ...) {time_t timep; time(&timep); struct tm *p = localtime(&timep); printf("%4d-%02d-%02d %02d:%02d:%02d [%llu][E] ", 1900+p->tm_year, 1+p->tm_mon, p->tm_mday, p-> tm_hour, p->tm_min, p->tm_sec, (unsigned long long)pthread_self());printf(fmt, ##__VA_ARGS__);printf("\n");fflush(stdout);}
#else
#define Logi(fmt, ...) {time_t timep; time(&timep); struct tm *p = localtime(&timep); printf("%4d-%02d-%02d %02d:%02d:%02d [I] ", 1900+p->tm_year, 1+p->tm_mon, p->tm_mday, p-> tm_hour, p->tm_min, p->tm_sec );printf(fmt, ##__VA_ARGS__);printf("\n");fflush(stdout);}
#define Logw(fmt, ...) {time_t timep; time(&timep); struct tm *p = localtime(&timep); printf("%4d-%02d-%02d %02d:%02d:%02d [W] ", 1900+p->tm_year, 1+p->tm_mon, p->tm_mday, p-> tm_hour, p->tm_min, p->tm_sec );printf(fmt, ##__VA_ARGS__);printf("\n");fflush(stdout);}
#define Loge(fmt, ...) {time_t timep; time(&timep); struct tm *p = localtime(&timep); printf("%4d-%02d-%02d %02d:%02d:%02d [E] ", 1900+p->tm_year, 1+p->tm_mon, p->tm_mday, p-> tm_hour, p->tm_min, p->tm_sec );printf(fmt, ##__VA_ARGS__);printf("\n");fflush(stdout);}
#endif
#define W_HTTP_GET (1 << 0)
#define W_HTTP_POST (1 << 1)
#define W_HTTP_PUT (1 << 2)
#define W_HTTP_DELETE (1 << 3)
#define W_HTTP_HEAD (1 << 4)
#define W_HTTP_ALL 0xFF
using HttpChunkQueue = LockQueue<string *>;
using HttpSendQueue = LockQueue<string *>;
struct HttpReqMsg
{
mg_connection *httpConnection = nullptr;
string method; // GET POST PUT DELETE
string uri;
map<string, string> querys; // the params in uri
string proto; // http version
// the params in header, all key letters are converted to lowercase, eg "Content-Length" change to "content-length"
map<string, string> headers;
string body;
int64_t totalBodySize;
shared_ptr<HttpChunkQueue> chunkQueue;
shared_ptr<HttpSendQueue> sendQueue;
int64_t recvChunkSize = 0;
bool finishRecvChunk = false;
bool isKeepingAlive = false;
int64_t lastKeepAliveTime = 0;
};
using HttpCbFun = std::function<void(shared_ptr<HttpReqMsg> &)>;
using HttpFilterFun = std::function<bool(shared_ptr<HttpReqMsg> &)>;
class IHttpServer
{
public:
IHttpServer(){}
virtual ~IHttpServer(){}
virtual bool init(int maxEventThreadNum) = 0;
virtual bool startHttp(string strip, int port) = 0;
virtual bool startHttps(int port, string certPath, string keyPath) = 0;
virtual bool stop() = 0;
virtual bool run() = 0;
virtual bool isRunning() = 0;
virtual void addHttpApi(const string &uri, HttpCbFun fun, int httpMethods) = 0;
virtual void addChunkHttpApi(const string &uri, HttpCbFun fun, int httpMethods) = 0;
virtual void setHttpFilter(HttpFilterFun filter) = 0;
virtual void forceCloseHttpConnection(shared_ptr<HttpReqMsg> httpMsg) = 0;
virtual void httpReplyJson(shared_ptr<HttpReqMsg> httpMsg, int httpCode, string head, string body) = 0;
virtual void addSendMsgToQueue(shared_ptr<HttpReqMsg> httpMsg, const char* data, int len) = 0;
virtual void addSendMsgToQueue(shared_ptr<HttpReqMsg> httpMsg, string *sendMsg) = 0;
virtual string formJsonBody(int code, string message, string body="") = 0;
virtual bool isClientDisconnect(shared_ptr<HttpReqMsg> httpMsg) = 0;
virtual shared_ptr<string> deQueueHttpChunk(shared_ptr<HttpReqMsg> httpMsg) = 0;
virtual bool addStaticWebDir(const string &dir, const string &header = "") = 0;
};

View File

@@ -0,0 +1,90 @@
#pragma once
#include <mutex>
#include <iostream>
template <typename T>
class LockQueue
{
public:
LockQueue()
{
QueueNode *node = new QueueNode();
node->next = nullptr;
// head->next is the first node, _tail point to last node, not _tail->next
_head = node;
_tail = _head;
};
virtual ~LockQueue()
{
clear();
delete _head;
_head = nullptr;
_tail = nullptr;
};
struct QueueNode
{
T value;
QueueNode *next;
};
bool enQueue(T data)
{
QueueNode *node = new (std::nothrow) QueueNode();
if (!node)
{
return false;
}
node->value = data;
node->next = nullptr;
std::unique_lock<std::mutex> locker(_mutex);
_tail->next = node;
_tail = node;
_queueSize++;
return true;
}
bool deQueue(T &data)
{
std::unique_lock<std::mutex> locker(_mutex);
QueueNode *currentFirstNode = _head->next;
if (!currentFirstNode)
{
return false;
}
_head->next = currentFirstNode->next;
data = currentFirstNode->value;
delete currentFirstNode;
_queueSize--;
if (_queueSize == 0)
{
_tail = _head;
}
return true;
}
int64_t size()
{
return _queueSize;
}
void clear()
{
T data;
while(deQueue(data));
}
bool empty()
{
return (_queueSize <= 0);
}
private:
QueueNode *_head;
QueueNode *_tail;
int64_t _queueSize = 0;
std::mutex _mutex;
};

View File

@@ -0,0 +1,965 @@
#include "WHttpServer.h"
#include <unistd.h>
#include <assert.h>
//#include "stdafx.h"
WHttpServer::WHttpServer()
{
mg_mgr_init(&_mgr);
}
WHttpServer::~WHttpServer()
{
stop();
delete _threadPool;
_threadPool = nullptr;
}
//实现usleep
void WHttpServer::usleep(unsigned long usec)
{
HANDLE timer;
LARGE_INTEGER interval;
interval.QuadPart = (10 * usec);
timer = CreateWaitableTimer(NULL, TRUE, NULL);
SetWaitableTimer(timer, &interval, 0, NULL, NULL, 0);
WaitForSingleObject(timer, INFINITE);
CloseHandle(timer);
}
bool WHttpServer::init(int maxEventThreadNum)
{
std::unique_lock<mutex> locker(_httpLocker);
_threadPool = new WThreadPool();
_threadPool->setMaxThreadNum(maxEventThreadNum);
return true;
}
bool WHttpServer::startHttp(string strIp, int port)
{
std::unique_lock<mutex> locker(_httpLocker);
if (!_threadPool)
{
Logw("WHttpServer::StartHttp do not init");
return false;
}
if (_httpPort != -1)
{
Logw("WHttpServer::StartHttp http server is already start port:%d", _httpPort);
return false;
}
std::stringstream sstream;
sstream << strIp << ":" << port;
_httpCbMsg.httpServer = this;
_httpCbMsg.httpsFlag = false;
mg_connection *serverConn = mg_http_listen(&_mgr, sstream.str().c_str(), WHttpServer::recvHttpRequestCallback, (void *)&_httpCbMsg);
if (!serverConn)
{
Logw("WHttpServer::StartHttp http server start failed: %s", sstream.str().c_str());
return false;
}
Logi("WHttpServer::StartHttp http server start success: %s", sstream.str().c_str());
_httpPort = port;
return true;
}
bool WHttpServer::startHttps(int port, string certPath, string keyPath)
{
std::unique_lock<mutex> locker(_httpLocker);
if (!_threadPool)
{
Logw("WHttpServer::StartHttps do not init");
return false;
}
if (_httpsPort != -1)
{
Logw("WHttpServer::StartHttps https server is already start port:%d", _httpsPort);
return false;
}
_certPath = certPath;
_keyPath = keyPath;
std::stringstream sstream;
sstream << "https://0.0.0.0:" << port;
_httpsCbMsg.httpServer = this;
_httpsCbMsg.httpsFlag = true;
mg_connection *serverConn = mg_http_listen(&_mgr, sstream.str().c_str(), WHttpServer::recvHttpRequestCallback, (void *)&_httpsCbMsg);
if (!serverConn)
{
Logw("WHttpServer::StartHttps https server start failed: %s", sstream.str().c_str());
return false;
}
Logi("WHttpServer::StartHttps https server start success: %s", sstream.str().c_str());
_httpsPort = port;
return true;
}
bool WHttpServer::stop()
{
std::unique_lock<mutex> locker(_httpLocker);
if (_httpPort == -1 && _httpsPort == -1)
{
return true;
}
_httpPort = -1;
_httpsPort = -1;
usleep(100*1000); // make sure run() can not call mg_mgr_poll
mg_mgr_free(&_mgr);
reset();
return true;
}
bool WHttpServer::run()
{
if (_httpPort == -1 && _httpsPort == -1)
{
usleep(1000);
return false;
}
sendHttpMsgPoll();
mg_mgr_poll(&_mgr, 1);
return true;
}
bool WHttpServer::isRunning()
{
return (_httpPort != -1 || _httpsPort != -1);
}
void WHttpServer::addHttpApi(const string &uri, HttpCbFun fun, int httpMethods)
{
HttpApiData httpApiData;
httpApiData.httpCbFun = fun;
httpApiData.httpMethods = httpMethods;
_httpApiMap[uri] = httpApiData;
}
void WHttpServer::addChunkHttpApi(const string &uri, HttpCbFun fun, int httpMethods)
{
HttpApiData httpApiData;
httpApiData.httpCbFun = fun;
httpApiData.httpMethods = httpMethods;
_chunkHttpApiMap[uri] = httpApiData;
}
void WHttpServer::setHttpFilter(HttpFilterFun filter)
{
_httpFilterFun = filter;
}
void WHttpServer::forceCloseHttpConnection(shared_ptr<HttpReqMsg> httpMsg)
{
mg_connection *conn = httpMsg->httpConnection;
conn->is_closing = 1;
}
void WHttpServer::closeHttpConnection(struct mg_connection *conn, bool isDirectClose)
{
if (conn->label[W_FD_STATUS_BIT] == HTTP_NORMAL_CLOSE)
{
return;
}
if (isDirectClose)
{
conn->is_draining = 1;
}
conn->label[W_FD_STATUS_BIT] = HTTP_NORMAL_CLOSE;
}
std::set<string> WHttpServer::getSupportMethods(int httpMethods)
{
std::set<string> methodsSet;
if (httpMethods & W_HTTP_GET)
{
methodsSet.insert("GET");
}
if (httpMethods & W_HTTP_GET)
{
methodsSet.insert("GET");
}
if (httpMethods & W_HTTP_POST)
{
methodsSet.insert("POST");
}
if (httpMethods & W_HTTP_PUT)
{
methodsSet.insert("PUT");
}
if (httpMethods & W_HTTP_DELETE)
{
methodsSet.insert("DELETE");
}
if (httpMethods & W_HTTP_HEAD)
{
methodsSet.insert("HEAD");
}
return methodsSet;
}
bool WHttpServer::handleStaticWebDir(shared_ptr<HttpReqMsg> httpMsg, HttpStaticWebDir &webDir)
{
string filePath = webDir.dirPath + httpMsg->uri;
FILE *file = fopen(filePath.c_str(), "r");
if (!file)
{
// Logw("WHttpServer::handleStaticWebDir can not open file:%s", filePath.c_str());
// httpReplyJson(httpMsg, 500, "", formJsonBody(101, "can not find this file"));
return false;
}
struct stat statbuf;
stat(filePath.c_str(), &statbuf);
int64_t fileSize = statbuf.st_size;
stringstream sstream;
if (httpMsg->method == "HEAD")
{
formStaticWebDirResHeader(sstream, httpMsg, webDir, filePath, 200);
sstream << "Content-Length: " << fileSize << "\r\n";
sstream << "\r\n"; // 空行表示http头部完成
addSendMsgToQueue(httpMsg, sstream.str().c_str(), sstream.str().size());
}
else
{
if (httpMsg->headers.find("range") != httpMsg->headers.end())
{
string rangeStr = httpMsg->headers["range"];
int64_t startByte = 0, endByte = 0;
parseRangeStr(rangeStr, startByte, endByte, fileSize);
startByte = startByte < 0 ? 0 : startByte;
endByte = (endByte > fileSize - 1) ? (fileSize - 1) : endByte;
int64_t contentLength = endByte - startByte + 1;
contentLength = contentLength < 0 ? 0 : contentLength;
if (contentLength < fileSize)
{
formStaticWebDirResHeader(sstream, httpMsg, webDir, filePath, 206);
}
else
{
formStaticWebDirResHeader(sstream, httpMsg, webDir, filePath, 200);
}
sstream << "Content-Range: bytes " << startByte << "-" << endByte << "/" << fileSize << "\r\n";
sstream << "Accept-Ranges: bytes\r\n";
sstream << "Content-Length: " << contentLength << "\r\n";
sstream << "\r\n";
addSendMsgToQueue(httpMsg, sstream.str().c_str(), sstream.str().size());
readStaticWebFile(httpMsg, file, contentLength, startByte);
}
else
{
formStaticWebDirResHeader(sstream, httpMsg, webDir, filePath, 200);
sstream << "Content-Length: " << fileSize << "\r\n";
sstream << "\r\n";
addSendMsgToQueue(httpMsg, sstream.str().c_str(), sstream.str().size());
readStaticWebFile(httpMsg, file, fileSize, 0);
}
}
fclose(file);
if (httpMsg->isKeepingAlive)
{
httpMsg->lastKeepAliveTime = getSysTickCountInMilliseconds();
}
return true;
}
void WHttpServer::formStaticWebDirResHeader(stringstream &sstream, shared_ptr<HttpReqMsg> &httpMsg, HttpStaticWebDir &webDir,
string &filePath, int code)
{
sstream << "HTTP/1.1 "<< code << " " << mg_http_status_code_str(code) << "\r\n";
sstream << "Content-Type: " << guess_content_type(filePath.c_str()) << "\r\n";
map<string, string> &reqHeaders = httpMsg->headers;
if (httpMsg->isKeepingAlive)
{
sstream << "Connection: " << "keep-alive" << "\r\n";
}
else if ((reqHeaders.find("connection") != reqHeaders.end()) && (reqHeaders["connection"] == "keep-alive"))
{
if (_currentKeepAliveNum < MAX_KEEP_ALIVE_NUM)
{
sstream << "Connection: " << "keep-alive" << "\r\n";
_currentKeepAliveNum++;
httpMsg->isKeepingAlive = true;
}
else
{
sstream << "Connection: " << "close" << "\r\n";
}
}
if (!webDir.header.empty())
{
sstream << webDir.header;
}
// sstream << "Content-Disposition: attachment;filename=" << fileName << "\r\n";
}
void WHttpServer::readStaticWebFile(shared_ptr<HttpReqMsg> httpMsg, FILE *file, int64_t contentSize, int64_t startByte)
{
int64_t currentReadSize = 0;
int64_t maxPerReadSize = 1024*1024;
int64_t perReadSize = contentSize > maxPerReadSize ? maxPerReadSize : contentSize;
int64_t remainSize;
uint64_t currentMs = 0;
uint64_t lastWriteMs = getSysTickCountInMilliseconds();
fseek(file, startByte, SEEK_SET);
while((remainSize = contentSize - currentReadSize) > 0 && isRunning())
{
if (isClientDisconnect(httpMsg))
{
Logw("WHttpServer::readStaticWebFile http client close the connection actively");
break;
}
// 为了防止发送队列里的数据太大,占用大量内存,当发送队列里面的数据达到一定量,先等待
if (httpMsg->sendQueue->size() >= HTTP_SEND_QUEUE_SIZE)
{
currentMs = getSysTickCountInMilliseconds();
if (currentMs - lastWriteMs > MAX_DOWNLOAD_PAUSE_TIME * 1000)
{
Logi("WHttpServer::readStaticWebFile download file timeout %s", httpMsg->uri.c_str());
forceCloseHttpConnection(httpMsg);
return;
}
usleep(1000);
continue;
}
string *fileStr = new string();
fileStr->resize(perReadSize);
int64_t currentWantReadSize = remainSize > perReadSize ? perReadSize : remainSize;
int64_t readSize = fread((char *)fileStr->c_str(), 1, currentWantReadSize, file);
currentReadSize += readSize;
if (readSize == 0)
{
Logw("WHttpServer::readStaticWebFile read size is 0");
delete fileStr;
break;
}
if (readSize != perReadSize)
{
fileStr->resize(readSize);
}
addSendMsgToQueue(httpMsg, fileStr);
lastWriteMs = getSysTickCountInMilliseconds();
}
}
void WHttpServer::parseRangeStr(string rangeStr, int64_t &startByte, int64_t &endByte, int64_t fileSize)
{
startByte = 0;
endByte = 0;
size_t equalMarkIndex = rangeStr.find('=');
size_t lineMarkIndex = rangeStr.find('-');
if (equalMarkIndex == string::npos || lineMarkIndex == string::npos)
{
return;
}
startByte = stoll(rangeStr.substr(equalMarkIndex + 1, lineMarkIndex - equalMarkIndex - 1));
if (lineMarkIndex == rangeStr.size() - 1)
{
endByte = fileSize - 1;
}
else
{
endByte = stoll(rangeStr.substr(lineMarkIndex + 1));
}
}
void WHttpServer::reset()
{
_currentKeepAliveNum = 0;
}
void WHttpServer::logHttpRequestMsg(mg_connection *conn, mg_http_message *httpCbData)
{
if (httpCbData->message.len < 1024)
{
Logi("WHttpServer::logHttpRequestMsg %s request id:%ld, message: %s", conn->is_tls ? "https" : "http", conn->id, httpCbData->message.ptr);
}
else
{
char msg[1024] = {0};
memcpy(msg, httpCbData->message.ptr, 1024);
Logi("WHttpServer::logHttpRequestMsg %s request id:%ld, message: %s", conn->is_tls ? "https" : "http", conn->id, msg);
}
}
void WHttpServer::httpReplyJson(shared_ptr<HttpReqMsg> httpMsg, int httpCode, string head, string body)
{
stringstream sstream;
sstream << "HTTP/1.1 " << httpCode << " " << mg_http_status_code_str(httpCode) << "\r\n";
sstream << "Content-Type: application/json\r\n";
if (!head.empty())
{
sstream << head;
}
sstream << "Content-Length: " << body.size() << "\r\n\r\n";
sstream << body;
string data = sstream.str();
// xy_sync_mg_send(conn, data.c_str(), data.size());
addSendMsgToQueue(httpMsg, data.c_str(), data.size());
}
void WHttpServer::addSendMsgToQueue(shared_ptr<HttpReqMsg> httpMsg, const char *data, int len)
{
string *sendMsg = new string();
sendMsg->resize(len);
memcpy((char *)sendMsg->c_str(), data, len);
bool res = httpMsg->sendQueue->enQueue(sendMsg);
assert(res);
}
void WHttpServer::addSendMsgToQueue(shared_ptr<HttpReqMsg> httpMsg, string *sendMsg)
{
bool res = httpMsg->sendQueue->enQueue(sendMsg);
assert(res);
}
string WHttpServer::formJsonBody(int code, string message, string body)
{
stringstream sstream;
sstream << "{";
sstream << R"("code":)" << code << ",";
sstream << R"("message":")" << message << R"(")" << ",";
sstream << R"("url":")" << body << R"(")" ;
sstream << "}";
return sstream.str();
}
bool WHttpServer::isClientDisconnect(shared_ptr<HttpReqMsg> httpMsg)
{
return (httpMsg->httpConnection->label[W_CLIENT_CLOSE_BIT] == 1);
}
shared_ptr<string> WHttpServer::deQueueHttpChunk(shared_ptr<HttpReqMsg> httpMsg)
{
string *res = nullptr;
httpMsg->chunkQueue->deQueue(res);
return shared_ptr<string>(res);
}
bool WHttpServer::addStaticWebDir(const string &dir, const string &header)
{
char tempDir[PATH_MAX];
if (!realpath(dir.c_str(), tempDir))
{
Loge("WHttpServer::addStaticWebDir the dir path is wrong: %s", dir.c_str());
return false;
}
if (!mg_is_dir(tempDir))
{
Loge("WHttpServer::addStaticWebDir is not dir: %s", dir.c_str());
return false;
}
HttpStaticWebDir staticDir;
staticDir.dirPath = tempDir;
staticDir.header = header;
_staticDirVect.push_back(staticDir);
return true;
}
void WHttpServer::recvHttpRequest(mg_connection *conn, int msgType, void *msgData, void *cbData)
{
if (_httpPort == -1 && _httpsPort == -1)
{
return;
}
HttpCbMsg *cbMsg = (HttpCbMsg *)cbData;
int64_t fd = (int64_t)conn->fd;
if (msgType == MG_EV_ACCEPT && cbMsg->httpsFlag)
{
struct mg_tls_opts opts;
opts.ca = nullptr;
opts.cert = _certPath.c_str();
opts.certkey = _keyPath.c_str();
opts.ciphers = nullptr;
opts.srvname.ptr = nullptr;
opts.srvname.len = 0;
Logi("WHttpServer::recvHttpRequest https connect come id:%ld", conn->id);
mg_tls_init(conn, &opts);
}
else if (msgType == MG_EV_HTTP_MSG)
{
struct mg_http_message *httpCbData = (struct mg_http_message *) msgData;
if (isValidHttpChunk(httpCbData))
{
return;
}
logHttpRequestMsg(conn, httpCbData);
if (httpCbData->head.len > HTTP_MAX_HEAD_SIZE)
{
mg_http_reply(conn, 500, "", formJsonBody(HTTP_BEYOND_HEAD_SIZE, "head size beyond 2M").c_str());
closeHttpConnection(conn, true);
return;
}
HttpApiData cbApiData;
if (!findHttpCbFun(httpCbData, cbApiData))
{
if ((mg_vcasecmp(&(httpCbData->method), "GET") != 0) && (mg_vcasecmp(&(httpCbData->method), "HEAD") != 0))
{
mg_http_reply(conn, 400, "", formJsonBody(HTTP_UNKNOWN_REQUEST, "unknown request").c_str());
closeHttpConnection(conn, true);
return;
}
else
{
cbApiData.findStaticFileFlag = true;
}
}
shared_ptr<HttpReqMsg> httpMsg = nullptr;
// if keep-alive fd, erase last http msg
if (_workingMsgMap.find(fd) != _workingMsgMap.end())
{
releaseHttpReqMsg(_workingMsgMap[fd]);
_workingMsgMap.erase(fd);
httpMsg = parseHttpMsg(conn, httpCbData);
httpMsg->isKeepingAlive = true;
_workingMsgMap[fd] = httpMsg;
}
else
{
httpMsg = parseHttpMsg(conn, httpCbData);
}
_workingMsgMap[fd] = httpMsg;
_threadPool->concurrentRun(&WHttpServer::handleHttpMsg, this, std::ref(_workingMsgMap[fd]), cbApiData);
}
else if (msgType == MG_EV_HTTP_CHUNK)
{
struct mg_http_message *httpCbData = (struct mg_http_message *) msgData;
HttpApiData chunkCbApiData;
if (!findChunkHttpCbFun(httpCbData, chunkCbApiData))
{
return;
}
if (httpCbData->head.len > HTTP_MAX_HEAD_SIZE)
{
mg_http_reply(conn, 500, "", formJsonBody(HTTP_BEYOND_HEAD_SIZE, "head size beyond 2M").c_str());
closeHttpConnection(conn, true);
return;
}
if (_workingMsgMap.find(fd) == _workingMsgMap.end())
{
logHttpRequestMsg(conn, httpCbData);
shared_ptr<HttpReqMsg> httpMsg = parseHttpMsg(conn, httpCbData, true);
_workingMsgMap[fd] = httpMsg;
_threadPool->concurrentRun(&WHttpServer::handleChunkHttpMsg, this, std::ref(_workingMsgMap[fd]), chunkCbApiData);
}
else
{
shared_ptr<HttpReqMsg> httpMsg = _workingMsgMap[fd];
enQueueHttpChunk(httpMsg, httpCbData);
}
}
else if (msgType == MG_EV_CLOSE)
{
Logi("WHttpServer::RecvHttpRequest http disconnect id:%ld", conn->id);
if (conn->label[W_VALID_CONNECT_BIT] != 1)
{
return;
}
if (conn->label[W_FD_STATUS_BIT] == HTTP_NORMAL_CLOSE)
{
releaseHttpReqMsg(_workingMsgMap[fd]);
_workingMsgMap.erase(fd);
return;
}
conn->label[W_CLIENT_CLOSE_BIT] = 1;
while(conn->label[W_FD_STATUS_BIT] == HTTP_IN_USE)
{
usleep(1);
}
releaseHttpReqMsg(_workingMsgMap[fd]);
_workingMsgMap.erase(fd);
}
}
void WHttpServer::handleHttpMsg(shared_ptr<HttpReqMsg> &httpMsg, HttpApiData httpCbData)
{
if (httpCbData.findStaticFileFlag)
{
bool findFlag = false;
for(int i = 0; i < (int)_staticDirVect.size(); i++)
{
if (handleStaticWebDir(httpMsg, _staticDirVect[i]))
{
findFlag = true;
break;
}
}
if (!findFlag)
{
httpReplyJson(httpMsg, 400, "", formJsonBody(HTTP_UNKNOWN_REQUEST, "unknown request").c_str());
}
}
else
{
if (_httpFilterFun && !_httpFilterFun(httpMsg))
{
closeHttpConnection(httpMsg->httpConnection);
return;
}
set<string> methods = getSupportMethods(httpCbData.httpMethods);
if (methods.find(httpMsg->method) == methods.end())
{
httpReplyJson(httpMsg, 400, "", formJsonBody(HTTP_UNKNOWN_REQUEST, "do not support this method"));
closeHttpConnection(httpMsg->httpConnection);
return;
}
httpCbData.httpCbFun(httpMsg);
}
if (httpMsg->isKeepingAlive)
{
httpMsg->httpConnection->label[W_FD_STATUS_BIT] = HTTP_NOT_USE;
}
else
{
closeHttpConnection(httpMsg->httpConnection);
}
}
void WHttpServer::handleChunkHttpMsg(shared_ptr<HttpReqMsg> &httpMsg, HttpApiData chunkHttpCbData)
{
if (_httpFilterFun && !_httpFilterFun(httpMsg))
{
closeHttpConnection(httpMsg->httpConnection);
return;
}
set<string> methods = getSupportMethods(chunkHttpCbData.httpMethods);
if (methods.find(httpMsg->method) == methods.end())
{
httpReplyJson(httpMsg, 400, "", formJsonBody(HTTP_UNKNOWN_REQUEST, "do not support this method"));
closeHttpConnection(httpMsg->httpConnection);
return;
}
chunkHttpCbData.httpCbFun(httpMsg);
closeHttpConnection(httpMsg->httpConnection);
}
void WHttpServer::sendHttpMsgPoll()
{
static int64_t pollCount = 0;
pollCount++;
int64_t currentTime = -1;
if (pollCount % 1000 == 0)
{
currentTime = getSysTickCountInMilliseconds();
}
std::map<int64_t, shared_ptr<HttpReqMsg>>::iterator it;
for (it = _workingMsgMap.begin(); it != _workingMsgMap.end(); it++)
{
shared_ptr<HttpReqMsg> httpMsg = it->second;
mg_connection *conn = httpMsg->httpConnection;
// identify if keep-alive timeout
if (httpMsg->isKeepingAlive && (currentTime != -1) && (conn->label[W_FD_STATUS_BIT] == HTTP_NOT_USE))
{
if ((currentTime - httpMsg->lastKeepAliveTime > KEEP_ALIVE_TIME * 1000))
{
conn->label[W_FD_STATUS_BIT] = HTTP_NORMAL_CLOSE;
_currentKeepAliveNum--;
}
}
if ((conn->label[W_FD_STATUS_BIT] == HTTP_NORMAL_CLOSE) && (httpMsg->sendQueue->size() == 0))
{
conn->is_draining = 1;
continue;
}
if (conn->send.len > SEND_BUF_SIZE_BOUNDARY)
{
continue;
}
if (httpMsg->sendQueue->size() == 0)
{
continue;
}
shared_ptr<string> sendMsg = deQueueHttpSendMsg(httpMsg);
assert(sendMsg.get());
mg_send(conn, (const void *)sendMsg->c_str(), sendMsg->size());
}
}
shared_ptr<string> WHttpServer::deQueueHttpSendMsg(shared_ptr<HttpReqMsg> httpMsg)
{
string *sendMsg = nullptr;
httpMsg->sendQueue->deQueue(sendMsg);
return shared_ptr<string>(sendMsg);
}
bool WHttpServer::findChunkHttpCbFun(mg_http_message *httpCbData, HttpApiData &cbApiData)
{
bool res = false;
for (auto it = _chunkHttpApiMap.begin(); it != _chunkHttpApiMap.end(); it++)
{
if (httpCbData->uri.len < it->first.size())
{
continue;
}
size_t cmpSize = it->first.size();
if (strncmp(it->first.c_str(), httpCbData->uri.ptr, cmpSize) == 0)
{
if (((it->first)[cmpSize - 1] == '/') || (httpCbData->uri.len == cmpSize) ||
(httpCbData->uri.len > cmpSize && httpCbData->uri.ptr[cmpSize] == '/'))
{
cbApiData = it->second;
res = true;
break;
}
}
}
return res;
}
bool WHttpServer::isValidHttpChunk(mg_http_message *httpCbData)
{
bool res = false;
for (auto it = _chunkHttpApiMap.begin(); it != _chunkHttpApiMap.end(); it++)
{
if (httpCbData->uri.len < it->first.size())
{
continue;
}
size_t cmpSize = it->first.size();
if (strncmp(it->first.c_str(), httpCbData->uri.ptr, cmpSize) == 0)
{
if (((it->first)[cmpSize - 1] == '/') || (httpCbData->uri.len == cmpSize) ||
(httpCbData->uri.len > cmpSize && httpCbData->uri.ptr[cmpSize] == '/'))
{
res = true;
break;
}
}
}
return res;
}
bool WHttpServer::findHttpCbFun(mg_http_message *httpCbData, HttpApiData &cbApiData)
{
bool res = false;
for (auto it = _httpApiMap.begin(); it != _httpApiMap.end(); it++)
{
if (httpCbData->uri.len < it->first.size())
{
continue;
}
size_t cmpSize = it->first.size();
if (strncmp(it->first.c_str(), httpCbData->uri.ptr, cmpSize) == 0)
{
if (((it->first)[cmpSize - 1] == '/') || (httpCbData->uri.len == cmpSize) ||
(httpCbData->uri.len > cmpSize && httpCbData->uri.ptr[cmpSize] == '/'))
{
cbApiData = it->second;
res = true;
break;
}
}
}
return res;
}
shared_ptr<HttpReqMsg> WHttpServer::parseHttpMsg(mg_connection *conn, mg_http_message *httpCbData, bool chunkFlag)
{
shared_ptr<HttpReqMsg> res = shared_ptr<HttpReqMsg>(new HttpReqMsg());
res->httpConnection = conn;
conn->label[W_VALID_CONNECT_BIT] = 1;
conn->label[W_FD_STATUS_BIT] = HTTP_IN_USE;
res->sendQueue = shared_ptr<HttpSendQueue>(new HttpSendQueue());
res->method.resize(httpCbData->method.len);
memcpy((char*)res->method.c_str(), httpCbData->method.ptr, httpCbData->method.len);
toUpperString(res->method);
res->uri.resize(httpCbData->uri.len);
memcpy((char*)res->uri.c_str(), httpCbData->uri.ptr, httpCbData->uri.len);
string queryKey = "";
string queryValue = "";
bool valueFlag = false;
for (int i = 0; i < (int)httpCbData->query.len; i++)
{
if (httpCbData->query.ptr[i] == '=')
{
valueFlag = true;
continue;
}
else if(httpCbData->query.ptr[i] == '&')
{
valueFlag = false;
res->querys[queryKey] = queryValue;
queryKey.clear();
queryValue.clear();
continue;
}
if (!valueFlag)
{
queryKey.append(1, httpCbData->query.ptr[i]);
}
else
{
queryValue.append(1, httpCbData->query.ptr[i]);
}
}
res->querys[queryKey] = queryValue;
res->proto.resize(httpCbData->proto.len);
memcpy((char*)res->proto.c_str(), httpCbData->proto.ptr, httpCbData->proto.len);
for(int i = 0; i < MG_MAX_HTTP_HEADERS; i++)
{
if (httpCbData->headers[i].name.len == 0 || !httpCbData->headers[i].name.ptr)
{
break;
}
string name;
string value;
name.resize(httpCbData->headers[i].name.len);
value.resize(httpCbData->headers[i].value.len);
memcpy((char*)name.c_str(), httpCbData->headers[i].name.ptr, httpCbData->headers[i].name.len);
memcpy((char*)value.c_str(), httpCbData->headers[i].value.ptr, httpCbData->headers[i].value.len);
toLowerString(name);
res->headers[name] = value;
// std::cout << "show headers, " << name << ": " << value << endl;
}
if (res->headers.find("content-length") != res->headers.end())
{
res->totalBodySize = (int64_t)stoll(res->headers["content-length"]);
}
else
{
Logi("WHttpServer::ParseHttpMsg request id:%ld have no content-length", conn->id);
res->totalBodySize = httpCbData->body.len;
}
if (chunkFlag)
{
res->chunkQueue = shared_ptr<HttpChunkQueue>(new HttpChunkQueue());
string *chunk = new string();
chunk->resize(httpCbData->chunk.len);
memcpy((char*)chunk->c_str(), httpCbData->chunk.ptr, httpCbData->chunk.len);
conn->recv.len -= httpCbData->chunk.len;
res->chunkQueue->enQueue(chunk);
res->recvChunkSize += httpCbData->chunk.len;
res->finishRecvChunk = (res->recvChunkSize >= res->totalBodySize);
}
else
{
res->body.resize(httpCbData->body.len);
memcpy((char*)res->body.c_str(), httpCbData->body.ptr, httpCbData->body.len);
}
return res;
}
void WHttpServer::enQueueHttpChunk(shared_ptr<HttpReqMsg> httpMsg, mg_http_message *httpCbData)
{
string *chunk = new string();
chunk->resize(httpCbData->chunk.len);
memcpy((char*)chunk->c_str(), httpCbData->chunk.ptr, httpCbData->chunk.len);
httpMsg->httpConnection->recv.len -= httpCbData->chunk.len;
bool res = httpMsg->chunkQueue->enQueue(chunk);
assert(res);
if (httpMsg->chunkQueue->size() > CHUNK_QUEUE_SIZE_BOUNDARY)
{
usleep(500);
}
/*
while(!httpMsg->chunkQueue->enQueue(chunk))
{
usleep(500);
}
*/
httpMsg->recvChunkSize += httpCbData->chunk.len;
httpMsg->finishRecvChunk = (httpMsg->recvChunkSize >= httpMsg->totalBodySize);
}
void WHttpServer::releaseHttpReqMsg(shared_ptr<HttpReqMsg> httpMsg)
{
while (httpMsg->chunkQueue.get() && httpMsg->chunkQueue->size() > 0)
{
string *res = nullptr;
httpMsg->chunkQueue->deQueue(res);
delete res;
}
while (httpMsg->sendQueue.get() && httpMsg->sendQueue->size() > 0)
{
string *res = nullptr;
httpMsg->sendQueue->deQueue(res);
delete res;
}
}
void WHttpServer::toLowerString(string &str)
{
for(int i = 0; i < (int)str.size(); i++)
{
str[i] = tolower(str[i]);
}
}
void WHttpServer::toUpperString(string &str)
{
for(int i = 0; i < (int)str.size(); i++)
{
str[i] = toupper(str[i]);
}
}
void WHttpServer::recvHttpRequestCallback(mg_connection *conn, int msgType, void *msgData, void *cbData)
{
HttpCbMsg *cbMsg = (HttpCbMsg *)cbData;
cbMsg->httpServer->recvHttpRequest(conn, msgType, msgData, cbData);
}
uint64_t WHttpServer::getSysTickCountInMilliseconds()
{
/* timespec time;
int ret = clock_gettime(CLOCK_MONOTONIC, &time);
if (ret != 0)
{
printf("get clock error!\n");
}
uint64_t result = ((uint64_t)time.tv_sec) * 1000 + ((uint64_t)time.tv_nsec) / 1000000;
return result;*/
return 0;
}

113
3rdparty/whttp-server-core/WHttpServer.h vendored Normal file
View File

@@ -0,0 +1,113 @@
#pragma once
#include "IHttpServer.h"
#include <mutex>
#include <set>
#include <vector>
#include <time.h>
#include "WThreadPool.h"
#include <atomic>
#define HTTP_SEND_QUEUE_SIZE 3
#define SEND_BUF_SIZE_BOUNDARY (3 * 1024 * 1024)
#define CHUNK_QUEUE_SIZE_BOUNDARY 2000
#define HTTP_MAX_HEAD_SIZE (2 * 1024 * 1024)
#define HTTP_UNKNOWN_REQUEST 100
#define HTTP_BEYOND_HEAD_SIZE 101
#define MAX_KEEP_ALIVE_NUM 100
#define KEEP_ALIVE_TIME 5 // 5s
#define MAX_DOWNLOAD_PAUSE_TIME 60 // 60s
class WHttpServer;
struct HttpCbMsg
{
WHttpServer *httpServer = nullptr;
bool httpsFlag = false;
};
struct HttpApiData
{
HttpCbFun httpCbFun = nullptr;
int httpMethods = -1;
bool findStaticFileFlag = false;
};
struct HttpStaticWebDir
{
string dirPath = "";
string header = "";
};
class WHttpServer: public IHttpServer
{
public:
WHttpServer();
virtual ~WHttpServer();
virtual bool init(int maxEventThreadNum);
virtual bool startHttp(string strIp, int port);
virtual bool startHttps(int port, string certPath, string keyPath);
virtual bool stop();
virtual bool run();
virtual bool isRunning();
virtual void addHttpApi(const string &uri, HttpCbFun fun, int httpMethods);
virtual void addChunkHttpApi(const string &uri, HttpCbFun fun, int httpMethods);
virtual void setHttpFilter(HttpFilterFun filter);
virtual void forceCloseHttpConnection(shared_ptr<HttpReqMsg> httpMsg);
virtual void httpReplyJson(shared_ptr<HttpReqMsg> httpMsg, int httpCode, string head, string body);
virtual void addSendMsgToQueue(shared_ptr<HttpReqMsg> httpMsg, const char* data, int len);
virtual void addSendMsgToQueue(shared_ptr<HttpReqMsg> httpMsg, string *sendMsg);
virtual string formJsonBody(int code, string message, string body = "");
virtual bool isClientDisconnect(shared_ptr<HttpReqMsg> httpMsg);
virtual shared_ptr<string> deQueueHttpChunk(shared_ptr<HttpReqMsg> httpMsg);
virtual bool addStaticWebDir(const string &dir, const string &header = "");
static void toLowerString(string &str);
static void toUpperString(string &str);
void usleep(unsigned long usec);
private:
volatile int _httpPort = -1;
volatile int _httpsPort = -1;
std::mutex _httpLocker;
struct mg_mgr _mgr;
string _certPath = "";
string _keyPath = "";
HttpCbMsg _httpCbMsg;
HttpCbMsg _httpsCbMsg;
std::map<int64_t, shared_ptr<HttpReqMsg>> _workingMsgMap;
WThreadPool *_threadPool = nullptr;
std::map<string, HttpApiData> _httpApiMap;
std::map<string, HttpApiData> _chunkHttpApiMap;
HttpFilterFun _httpFilterFun = nullptr;
vector<HttpStaticWebDir> _staticDirVect;
std::atomic<int> _currentKeepAliveNum {0};
void recvHttpRequest(struct mg_connection *conn, int msgType, void *msgData, void *cbData);
void handleHttpMsg(shared_ptr<HttpReqMsg> &httpMsg, HttpApiData httpCbData);
void handleChunkHttpMsg(shared_ptr<HttpReqMsg> &httpMsg, HttpApiData chunkHttpCbData);
void sendHttpMsgPoll();
shared_ptr<string> deQueueHttpSendMsg(shared_ptr<HttpReqMsg> httpMsg);
bool findHttpCbFun(mg_http_message *httpCbData, HttpApiData &cbApiData);
bool findChunkHttpCbFun(mg_http_message *httpCbData, HttpApiData &cbApiData);
bool isValidHttpChunk(mg_http_message *httpCbData);
shared_ptr<HttpReqMsg> parseHttpMsg(struct mg_connection *conn, struct mg_http_message *httpCbData, bool chunkFlag = false);
void enQueueHttpChunk(shared_ptr<HttpReqMsg> httpMsg, mg_http_message *httpCbData);
void releaseHttpReqMsg(shared_ptr<HttpReqMsg> httpMsg);
void closeHttpConnection(struct mg_connection *conn, bool isDirectClose = false);
std::set<string> getSupportMethods(int httpMethods);
bool handleStaticWebDir(shared_ptr<HttpReqMsg> httpMsg, HttpStaticWebDir &webDir);
void formStaticWebDirResHeader(stringstream &sstream, shared_ptr<HttpReqMsg> &httpMsg, HttpStaticWebDir &webDir,
string &filePath, int code);
void readStaticWebFile(shared_ptr<HttpReqMsg> httpMsg, FILE *file, int64_t contentLength,
int64_t startByte);
void parseRangeStr(string rangeStr, int64_t &startByte, int64_t &endByte, int64_t fileSize);
void reset();
void logHttpRequestMsg(mg_connection *conn, mg_http_message *httpCbData);
static void recvHttpRequestCallback(struct mg_connection *conn, int msgType, void *msgData, void *cbData);
static uint64_t getSysTickCountInMilliseconds();
};

View File

@@ -0,0 +1,264 @@
#include "WThreadPool.h"
using namespace std;
shared_ptr<WThreadPool> WThreadPool::s_threadPool;
std::mutex WThreadPool::s_globleMutex;
WThreadPool::WThreadPool()
{
_mgrThread = make_shared<thread>(&WThreadPool::managerThread, this);
}
WThreadPool::~WThreadPool()
{
stop();
}
WThreadPool *WThreadPool::globalInstance()
{
if (!s_threadPool.get())
{
unique_lock<mutex> locker(s_globleMutex);
if (!s_threadPool.get())
{
s_threadPool = make_shared<WThreadPool>();
}
}
return s_threadPool.get();
}
void WThreadPool::setMaxThreadNum(int maxNum)
{
if (maxNum > WPOOL_MAX_THREAD_NUM)
{
maxNum = WPOOL_MAX_THREAD_NUM;
}
else if (maxNum < WPOOL_MIN_THREAD_NUM)
{
maxNum = WPOOL_MIN_THREAD_NUM;
}
_maxThreadNum = maxNum;
}
bool WThreadPool::waitForDone(int waitMs)
{
int waitedMs = 0;
while(_busyThreadNum != 0 || !_eventQueue.empty())
{
this_thread::sleep_for(chrono::milliseconds(1));
waitedMs++;
if (waitMs > 0 && waitedMs >= waitMs)
{
return false;
}
}
return true;
}
void WThreadPool::enQueueEvent(EventFun fun)
{
bool res = _eventQueue.enQueue(fun);
assert(res);
}
EventFun WThreadPool::deQueueEvent()
{
EventFun fun;
if (_eventQueue.deQueue(fun))
{
return fun;
}
else
{
return nullptr;
}
return fun;
}
void WThreadPool::run()
{
{
unique_lock<mutex> locker(_threadIsRunMutex);
_threadIsRunMap[this_thread::get_id()] = true;
}
while (!_exitAllFlag)
{
{
unique_lock<mutex> locker(_workMutex);
if (_eventQueue.empty() && !_exitAllFlag)
{
_workCondVar.wait(locker);
}
if (_reduceThreadNum > 0)
{
_reduceThreadNum--;
break;
}
}
_busyThreadNum++;
while (!_exitAllFlag)
{
EventFun fun = deQueueEvent();
if (!fun)
{
break;
}
fun();
}
_busyThreadNum--;
}
{
unique_lock<mutex> locker(_threadIsRunMutex);
_threadIsRunMap[this_thread::get_id()] = false;
}
}
void WThreadPool::stop()
{
_exitAllFlag = true;
{
unique_lock<mutex> locker(_mgrMutex);
_mgrCondVar.notify_all();
}
if (_mgrThread->joinable())
{
_mgrThread->join();
}
}
void WThreadPool::managerThread()
{
startWorkThread();
while (!_exitAllFlag)
{
{
unique_lock<mutex> locker(_mgrMutex);
auto now = std::chrono::system_clock::now();
if (((int)_workThreadList.size() >= _maxThreadNum ||
_eventQueue.size() < ((int)_workThreadList.size() - _busyThreadNum - ADD_THREAD_BOUNDARY)) && !_exitAllFlag)
{
_mgrCondVar.wait_until(locker, now + chrono::seconds(WPOOL_MANAGE_SECONDS));
}
}
if (_exitAllFlag)
{
break;
}
adjustWorkThread();
// WThreadPool_log("get here to show work thread num:%d", _workThreadList.size());
}
stopWorkThread();
}
void WThreadPool::startWorkThread()
{
for (int i = 0; i < _minThreadNum; i++)
{
shared_ptr<thread> threadPtr = make_shared<thread>(&WThreadPool::run, this);
_workThreadList.emplace_back(threadPtr);
}
}
void WThreadPool::stopWorkThread()
{
{
unique_lock<mutex> locker(_mgrMutex);
_workCondVar.notify_all();
}
for (auto it = _workThreadList.begin(); it != _workThreadList.end(); it++)
{
if ((*it)->joinable())
{
(*it)->join();
}
}
_workThreadList.clear();
_threadIsRunMap.clear();
_eventQueue.clear();
}
void WThreadPool::adjustWorkThread()
{
int queueSize = _eventQueue.size();
int busyThreadNum = _busyThreadNum;
int liveThreadNum = _workThreadList.size();
int maxThreadNum = _maxThreadNum;
int stepThreadNum = _stepThreadNum;
int minThreadNum = _minThreadNum;
// if rest thread can not run all task concurrently, add the thread
if ((liveThreadNum < maxThreadNum) && (queueSize >= (liveThreadNum - busyThreadNum - ADD_THREAD_BOUNDARY)))
{
int restAllAddNum = maxThreadNum - liveThreadNum;
int addThreadNum = restAllAddNum > stepThreadNum ? stepThreadNum : restAllAddNum;
for (int i = 0; i < addThreadNum; i++)
{
shared_ptr<thread> threadPtr = make_shared<thread>(&WThreadPool::run, this);
_workThreadList.emplace_back(threadPtr);
}
}
else if ((liveThreadNum > minThreadNum) && (busyThreadNum*2 < liveThreadNum))
{
int resAllReduceNum = liveThreadNum - minThreadNum;
int reduceThreadNum = resAllReduceNum > stepThreadNum ? stepThreadNum : resAllReduceNum;
_reduceThreadNum = reduceThreadNum;
int findExitThreadNum = 0;
do
{
if (_exitAllFlag)
{
return;
}
for (int i = 0; i < (reduceThreadNum - findExitThreadNum); i++)
{
_workCondVar.notify_one();
}
this_thread::sleep_for(chrono::milliseconds(1));
{
unique_lock<mutex> locker(_threadIsRunMutex);
for (auto it = _workThreadList.begin(); it != _workThreadList.end();)
{
std::thread::id threadId = (*it)->get_id();
auto threadIdIt = _threadIsRunMap.find(threadId);
if ((threadIdIt != _threadIsRunMap.end()) && (_threadIsRunMap[threadId] == false))
{
findExitThreadNum++;
_threadIsRunMap.erase(threadIdIt);
(*it)->join();
_workThreadList.erase(it++);
}
else
{
it++;
}
}
}
if (findExitThreadNum < reduceThreadNum)
{
this_thread::sleep_for(chrono::milliseconds(1));
}
/*
WThreadPool_log("get here 3 to show findExitThreadNum:%d, reduceThreadNum:%d, _reduceThreadNum:%d", findExitThreadNum, reduceThreadNum, (int)_reduceThreadNum);
for (auto it = _workThreadList.begin(); it != _workThreadList.end(); it++)
{
WThreadPool_log("work thread pid:%lld", (*it)->get_id());
}
for (auto it = _threadIsRunMap.begin(); it != _threadIsRunMap.end(); it++)
{
WThreadPool_log("it->first:%lld, it->second:%d", it->first, it->second);
}
*/
} while(!(findExitThreadNum >= reduceThreadNum && _reduceThreadNum <= 0));
}
}

View File

@@ -0,0 +1,90 @@
#pragma once
#include <functional>
#include <mutex>
#include <list>
#include <thread>
#include <memory>
#include <atomic>
#include <stdio.h>
#include <map>
#include <sstream>
#include <condition_variable>
#include "LockQueue.hpp"
#include <assert.h>
#define WThreadPool_log(fmt, ...) {printf(fmt, ##__VA_ARGS__);printf("\n");fflush(stdout);}
#define WPOOL_MIN_THREAD_NUM 4
#define WPOOL_MAX_THREAD_NUM 256
#define WPOOL_MANAGE_SECONDS 20
#define ADD_THREAD_BOUNDARY 1
using EventFun = std::function<void ()>;
using int64 = long long int;
class WThreadPool
{
public:
WThreadPool();
virtual ~WThreadPool();
static WThreadPool * globalInstance();
void setMaxThreadNum(int maxNum);
bool waitForDone(int waitMs = -1);
template<typename Func, typename ...Arguments >
void concurrentRun(Func func, Arguments... args) {
EventFun queunFun = std::bind(func, args...);
enQueueEvent(queunFun);
if (((int)_workThreadList.size() < _maxThreadNum) &&
(_eventQueue.size() >= ((int)_workThreadList.size() - _busyThreadNum - ADD_THREAD_BOUNDARY)))
{
_mgrCondVar.notify_one();
}
_workCondVar.notify_one();
}
template<typename T> static int64_t threadIdToint64(T threadId)
{
std::string stid;
stid.resize(32);
snprintf((char *)stid.c_str(), 32, "%lld", threadId);
long long int tid = std::stoll(stid);
return tid;
}
private:
int _minThreadNum = WPOOL_MIN_THREAD_NUM;
int _maxThreadNum = 8;
std::atomic<int> _busyThreadNum = {0};
int _stepThreadNum = 4;
volatile bool _exitAllFlag = false;
std::atomic<int> _reduceThreadNum = {0};
std::shared_ptr<std::thread> _mgrThread;
LockQueue<EventFun> _eventQueue;
std::list<std::shared_ptr<std::thread>> _workThreadList;
std::mutex _threadIsRunMutex;
std::map<std::thread::id, bool> _threadIsRunMap;
std::condition_variable _workCondVar;
std::mutex _workMutex;
std::condition_variable _mgrCondVar;
std::mutex _mgrMutex;
static std::shared_ptr<WThreadPool> s_threadPool;
static std::mutex s_globleMutex;
void enQueueEvent(EventFun fun);
EventFun deQueueEvent();
void run();
void managerThread();
void stop();
void startWorkThread();
void stopWorkThread();
void adjustWorkThread();
};

4583
3rdparty/whttp-server-core/mongoose.cpp vendored Normal file

File diff suppressed because it is too large Load Diff

1007
3rdparty/whttp-server-core/mongoose.h vendored Normal file

File diff suppressed because it is too large Load Diff

6
3rdparty/whttp-server-core/unistd.h vendored Normal file
View File

@@ -0,0 +1,6 @@
#ifndef _UNISTD_H
#define _UNISTD_H
#include <io.h>
#include <process.h>
#endif /* _UNISTD_H */