From 97de6580f4f2900363634afcf43c5e075f6299a2 Mon Sep 17 00:00:00 2001 From: Eduardo Dantas Date: Tue, 23 Sep 2025 17:58:16 -0300 Subject: [PATCH] Refactor HTTP and WebSocket to use ixwebsocket Replaces custom ASIO-based HTTP and WebSocket implementations with ixwebsocket library for HTTP requests and WebSocket connections. Updates CMake configuration to include ixwebsocket, simplifies code, and improves maintainability and cross-platform support. --- CMakePresets.json | 3 +- src/CMakeLists.txt | 6 + src/framework/luaengine/luavaluecasts.cpp | 10 + src/framework/net/protocolhttp.cpp | 1294 ++++++--------------- src/framework/net/protocolhttp.h | 160 +-- src/framework/pch.h | 5 + vcpkg.json | 1 + 7 files changed, 401 insertions(+), 1078 deletions(-) diff --git a/CMakePresets.json b/CMakePresets.json index 14d89ec0ff..81a32cae8c 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -96,7 +96,8 @@ "DEBUG_LOG": "ON", "ASAN_ENABLED": "ON", "BUILD_STATIC_LIBRARY": "OFF", - "VCPKG_TARGET_TRIPLET": "x64-windows" + "VCPKG_TARGET_TRIPLET": "x64-windows", + "SPEED_UP_BUILD_UNITY": "OFF" }, "architecture": { "value": "x64", diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 9018197b46..30e44f10aa 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -192,6 +192,7 @@ find_package(pugixml CONFIG REQUIRED) find_package(ZLIB REQUIRED) find_package(httplib CONFIG REQUIRED) find_package(fmt CONFIG REQUIRED) +find_package(ixwebsocket CONFIG REQUIRED) if(APPLE) # Required for Physfs @@ -520,7 +521,9 @@ if(MSVC) OpenAL::OpenAL LibLZMA::LibLZMA winmm.lib + bcrypt.lib pugixml::pugixml + ixwebsocket::ixwebsocket fmt::fmt-header-only ) elseif(ANDROID) @@ -569,6 +572,7 @@ elseif(ANDROID) android log pugixml::pugixml + ixwebsocket::ixwebsocket fmt::fmt-header-only ) @@ -616,6 +620,7 @@ elseif(WASM) # OpenAL::OpenAL (using emscripten openal api) LibLZMA::LibLZMA pugixml::pugixml + ixwebsocket::ixwebsocket ZLIB::ZLIB OpenSSL::SSL OpenSSL::Crypto @@ -699,6 +704,7 @@ else() # Linux OpenAL::OpenAL LibLZMA::LibLZMA pugixml::pugixml + ixwebsocket::ixwebsocket ZLIB::ZLIB OpenSSL::SSL OpenSSL::Crypto diff --git a/src/framework/luaengine/luavaluecasts.cpp b/src/framework/luaengine/luavaluecasts.cpp index a163207e32..08bddcd83d 100644 --- a/src/framework/luaengine/luavaluecasts.cpp +++ b/src/framework/luaengine/luavaluecasts.cpp @@ -67,6 +67,16 @@ bool luavalue_cast(const int index, double& d) } // string +int push_luavalue(const char* cstr) +{ + if (!cstr) { + g_lua.pushNil(); + } else { + g_lua.pushString(cstr); + } + return 1; +} + int push_luavalue(const std::string_view str) { g_lua.pushString(str); diff --git a/src/framework/net/protocolhttp.cpp b/src/framework/net/protocolhttp.cpp index 9ef174ff1a..4b76ac371d 100644 --- a/src/framework/net/protocolhttp.cpp +++ b/src/framework/net/protocolhttp.cpp @@ -23,16 +23,21 @@ #include #include +#include #include +#include #include "protocolhttp.h" Http g_http; +std::shared_ptr g_ixHttpClient; void Http::init() { m_working = true; - m_thread = std::thread([this] { m_ios.run(); }); + if (!g_ixHttpClient) { + g_ixHttpClient = std::make_shared(true); + } } void Http::terminate() @@ -40,1057 +45,488 @@ void Http::terminate() if (!m_working) return; m_working = false; - for (const auto& ws : m_websockets) { - ws.second->close(); - } - for (const auto& op : m_operations) { - if (const auto& session = op.second->session.lock()) - session->close(); - } - m_guard.reset(); - if (!m_thread.joinable()) { - stdext::millisleep(100); - m_ios.stop(); - } - m_thread.join(); -} -int Http::get(const std::string& url, int timeout) -{ - if (!timeout) // lua is not working with default values - timeout = 5; - int operationId = m_operationId++; + std::vector> websockets; + std::vector> requests; - asio::post(m_ios, [&, url, timeout, operationId] { - auto result = std::make_shared(); - result->url = url; - result->operationId = operationId; - m_operations[operationId] = result; - const auto& session = std::make_shared(m_ios, url, m_userAgent, m_enable_time_out_on_read_write, m_custom_header, timeout, - false, true, result, [this, operationId](HttpResult_ptr result) { - bool finished = result->finished; - g_dispatcher.addEvent([result, finished] { - if (!finished) { - g_lua.callGlobalField("g_http", "onGetProgress", result->operationId, result->url, result->progress); - return; - } - g_lua.callGlobalField("g_http", "onGet", result->operationId, result->url, result->error, result->response); - }); - if (finished) { - m_operations.erase(operationId); + { + std::lock_guard lock(m_mutex); + for (auto& entry : m_websockets) { + websockets.push_back(entry.second); + } + for (auto& entry : m_operations) { + const auto& result = entry.second; + if (!result) + continue; + result->canceled = true; + if (result->request) { + requests.push_back(result->request); } - }); - result->session = session; - session->start(); - }); + } + m_websockets.clear(); + m_operations.clear(); + m_downloads.clear(); + } - return operationId; -} + for (const auto& request : requests) { + if (request) { + request->cancel = true; + } + } -int Http::post(const std::string& url, const std::string& data, int timeout, bool isJson, bool checkContentLength) -{ - if (!timeout) // lua is not working with default values - timeout = 5; - if (data.empty()) { - g_logger.error("Invalid post request for {}, empty data, use get instead", url); - return -1; + for (const auto& websocket : websockets) { + if (websocket) { + websocket->close(); + } } - int operationId = m_operationId++; - asio::post(m_ios, [&, url, data, timeout, isJson, checkContentLength, operationId] { - auto result = std::make_shared(); - result->url = url; - result->operationId = operationId; - result->postData = data; - m_operations[operationId] = result; - const auto& session = std::make_shared(m_ios, url, m_userAgent, m_enable_time_out_on_read_write, m_custom_header, timeout, - isJson, checkContentLength, result, [this, operationId](HttpResult_ptr result) { - bool finished = result->finished; - g_dispatcher.addEvent([result, finished] { - if (!finished) { - g_lua.callGlobalField("g_http", "onPostProgress", result->operationId, result->url, result->progress); - return; - } - g_lua.callGlobalField("g_http", "onPost", result->operationId, result->url, result->error, result->response); - }); - if (finished) { - m_operations.erase(operationId); - } - }); - result->session = session; - session->start(); - }); - return operationId; + g_ixHttpClient.reset(); } -int Http::download(const std::string& url, const std::string& path, int timeout) +int Http::get(const std::string& url, int timeout) { - if (!timeout) // lua is not working with default values + if (!timeout) timeout = 5; - int operationId = m_operationId++; - asio::post(m_ios, [&, url, path, timeout, operationId] { - auto result = std::make_shared(); - result->url = url; - result->operationId = operationId; - m_operations[operationId] = result; - const auto& session = std::make_shared(m_ios, url, m_userAgent, m_enable_time_out_on_read_write, m_custom_header, timeout, - false, true, result, [&, path](HttpResult_ptr result) { - if (!result->finished) { - g_dispatcher.addEvent([result] { - g_lua.callGlobalField("g_http", "onDownloadProgress", result->operationId, result->url, result->progress, result->speed); - }); - return; - } + if (!g_ixHttpClient) { + g_ixHttpClient = std::make_shared(true); + } - auto checksum = g_crypt.crc32(result->response, false); - g_dispatcher.addEvent([this, result, path, checksum] { - if (result->error.empty()) { - if (!path.empty() && path[0] == '/') - m_downloads[path.substr(1)] = result; - else - m_downloads[path] = result; - } - g_lua.callGlobalField("g_http", "onDownload", result->operationId, result->url, result->error, path, checksum); - }); + const int operationId = m_operationId++; + auto result = std::make_shared(); + result->url = url; + result->operationId = operationId; - m_operations.erase(operationId); - }); - result->session = session; - session->start(); - }); + { + std::lock_guard lock(m_mutex); + m_operations[operationId] = result; + } - return operationId; -} + auto request = g_ixHttpClient->createRequest(url, ix::HttpClient::kGet); + request->connectTimeout = timeout; + if (m_enable_time_out_on_read_write) { + request->transferTimeout = timeout; + } + request->followRedirects = true; + request->extraHeaders["User-Agent"] = m_userAgent; + for (const auto& header : m_custom_header) { + request->extraHeaders[header.first] = header.second; + } -int Http::ws(const std::string& url, int timeout) -{ - if (!timeout) // lua is not working with default values - timeout = 5; - int operationId = m_operationId++; + result->request = request; - asio::post(m_ios, [&, url, timeout, operationId] { - auto result = std::make_shared(); - result->url = url; - result->operationId = operationId; - m_operations[operationId] = result; - const auto& session = std::make_shared(m_ios, url, m_userAgent, m_enable_time_out_on_read_write, timeout, result, [&, result](WebsocketCallbackType type, std::string message) { - g_dispatcher.addEvent([result, type, message] { - if (type == WebsocketCallbackType::OPEN) { - g_lua.callGlobalField("g_http", "onWsOpen", result->operationId, message); - } else if (type == WebsocketCallbackType::MESSAGE) { - g_lua.callGlobalField("g_http", "onWsMessage", result->operationId, message); - } else if (type == WebsocketCallbackType::CLOSE) { - g_lua.callGlobalField("g_http", "onWsClose", result->operationId, message); - } else if (type == WebsocketCallbackType::ERROR_) { - g_lua.callGlobalField("g_http", "onWsError", result->operationId, message); - } - }); - if (type == WebsocketCallbackType::CLOSE) { - m_websockets.erase(result->operationId); + request->onProgressCallback = [this, result](int current, int total) { + if (result->finished || result->canceled) + return false; + + result->progress = computeProgress(current, total); + g_dispatcher.addEvent([result] { + if (!result->finished) { + g_lua.callGlobalField("g_http", "onGetProgress", result->operationId, result->url, result->progress); } }); - m_websockets[result->operationId] = session; - session->start(); - }); - - return operationId; -} + return true; + }; -bool Http::wsSend(int operationId, const std::string& message) -{ - asio::post(m_ios, [&, operationId, message] { - const auto wit = m_websockets.find(operationId); - if (wit == m_websockets.end()) { - return; + const auto callback = [this, operationId, result](const ix::HttpResponsePtr& response) { + result->finished = true; + if (response) { + result->status = response->statusCode; + result->response = response->body; + result->size = static_cast(response->body.size()); + result->progress = 100; } - wit->second->send(message); - }); - return true; -} + result->error = describeHttpError(response, result); -bool Http::wsClose(const int operationId) -{ - cancel(operationId); - return true; -} - -bool Http::cancel(int id) -{ - asio::post(m_ios, [&, id] { - const auto wit = m_websockets.find(id); - if (wit != m_websockets.end()) { - wit->second->close(); + { + std::lock_guard lock(m_mutex); + m_operations.erase(operationId); } - const auto it = m_operations.find(id); - if (it == m_operations.end()) - return; - if (it->second->canceled) - return; - const auto& session = it->second->session.lock(); - if (session) - session->close(); - }); - return true; -} -void HttpSession::start() -{ - instance_uri = parseURI(m_url); - const asio::ip::tcp::resolver::query query_resolver(instance_uri.domain, instance_uri.port); - - if (m_result->postData == "") { - m_request.append("GET " + instance_uri.query + " HTTP/1.1\r\n"); - m_request.append("Host: " + instance_uri.domain + "\r\n"); - m_request.append("User-Agent: " + m_agent + "\r\n"); - m_request.append("Accept: */*\r\n"); - for (const auto& ch : m_custom_header) { - m_request.append(ch.first + ch.second + "\r\n"); - } - m_request.append("Connection: close\r\n\r\n"); - } else { - m_request.append("POST " + instance_uri.query + " HTTP/1.1\r\n"); - m_request.append("Host: " + instance_uri.domain + "\r\n"); - m_request.append("User-Agent: " + m_agent + "\r\n"); - m_request.append("Accept: */*\r\n"); - for (const auto& ch : m_custom_header) { - m_request.append(ch.first + ch.second + "\r\n"); - } - if (m_isJson) { - m_request.append("Content-Type: application/json\r\n"); - } else { - m_request.append("Content-Type: application/x-www-form-urlencoded\r\n"); + g_dispatcher.addEvent([result] { + g_lua.callGlobalField("g_http", "onGet", result->operationId, result->url, result->error, result->response); + }); + }; + + if (!g_ixHttpClient->performRequest(request, callback)) { + result->finished = true; + result->error = "http_error::queue"; + { + std::lock_guard lock(m_mutex); + m_operations.erase(operationId); } - m_request.append("Content-Length: " + std::to_string(m_result->postData.size()) + "\r\n"); - m_request.append("Connection: close\r\n\r\n"); - m_request.append(m_result->postData); + g_dispatcher.addEvent([result] { + g_lua.callGlobalField("g_http", "onGet", result->operationId, result->url, result->error, result->response); + }); } - m_resolver.async_resolve( - query_resolver, - [sft = shared_from_this()]( - const std::error_code& ec, asio::ip::tcp::resolver::iterator iterator) { - sft->on_resolve(ec, std::move(iterator)); - }); + return operationId; } -void HttpSession::on_resolve(const std::error_code& ec, asio::ip::tcp::resolver::iterator iterator) +int Http::post(const std::string& url, const std::string& data, int timeout, bool isJson, bool /*checkContentLength*/) { - if (ec) { - onError("HttpSession unable to resolve " + m_url + ": " + ec.message()); - return; - } - - std::error_code _ec; - if (instance_uri.port == "443") { - while (iterator != asio::ip::tcp::resolver::iterator()) { - m_ssl.lowest_layer().close(); - m_ssl.lowest_layer().connect(*iterator++, _ec); - if (!_ec) { - const std::error_code __ec; - on_connect(__ec); - break; - } - } - } else { - while (iterator != asio::ip::tcp::resolver::iterator()) { - m_socket.close(); - m_socket.connect(*iterator++, _ec); - if (!_ec) { - const std::error_code __ec; - on_connect(__ec); - break; - } - } + if (!timeout) + timeout = 5; + if (data.empty()) { + g_logger.error("Invalid post request for {}, empty data, use get instead", url); + return -1; } - if (_ec) { - onError("HttpSession unable to resolve " + m_url + ": " + ec.message()); - return; + if (!g_ixHttpClient) { + g_ixHttpClient = std::make_shared(true); } - m_timer.cancel(); - m_timer.expires_after(std::chrono::seconds(m_timeout)); - m_timer.async_wait([sft = shared_from_this()](const std::error_code& ec) {sft->onTimeout(ec); }); -} + const int operationId = m_operationId++; + auto result = std::make_shared(); + result->url = url; + result->operationId = operationId; + result->postData = data; -void HttpSession::on_connect(const std::error_code& ec) -{ - if (ec) { - onError("HttpSession unable to connect " + m_url + ": " + ec.message()); - return; + { + std::lock_guard lock(m_mutex); + m_operations[operationId] = result; } - if (instance_uri.port == "443") { - m_ssl.set_verify_mode(asio::ssl::verify_peer); - m_ssl.set_verify_callback([](bool, const asio::ssl::verify_context&) { return true; }); - if (!SSL_set_tlsext_host_name(m_ssl.native_handle(), instance_uri.domain.c_str())) { - const std::error_code _ec{ static_cast(ERR_get_error()), asio::error::get_ssl_category() }; - onError("HttpSession on SSL_set_tlsext_host_name unable to handshake " + m_url + ": " + _ec.message()); - return; - } - - m_ssl.async_handshake(asio::ssl::stream_base::client, - [sft = shared_from_this()](const std::error_code& ec) { - if (ec) { - sft->onError("HttpSession unable to handshake " + sft->m_url + ": " + ec.message()); - return; - } - sft->on_write(); - }); + auto request = g_ixHttpClient->createRequest(url, ix::HttpClient::kPost); + request->connectTimeout = timeout; + if (m_enable_time_out_on_read_write) { + request->transferTimeout = timeout; + } + request->followRedirects = true; + request->body = data; + request->extraHeaders["User-Agent"] = m_userAgent; + request->extraHeaders["Accept"] = "*/*"; + request->extraHeaders["Connection"] = "close"; + if (isJson) { + request->extraHeaders["Content-Type"] = "application/json"; } else { - on_write(); + request->extraHeaders["Content-Type"] = "application/x-www-form-urlencoded"; } - - m_timer.cancel(); - m_timer.expires_after(std::chrono::seconds(m_timeout)); - m_timer.async_wait([sft = shared_from_this()](const std::error_code& ec) {sft->onTimeout(ec); }); -} - -void HttpSession::on_write() -{ - if (instance_uri.port == "443") { - async_write(m_ssl, asio::buffer(m_request), [sft = shared_from_this()] - (const std::error_code& ec, const size_t bytes) { sft->on_request_sent(ec, bytes); }); - } else { - async_write(m_socket, asio::buffer(m_request), [sft = shared_from_this()] - (const std::error_code& ec, const size_t bytes) {sft->on_request_sent(ec, bytes); }); + for (const auto& header : m_custom_header) { + request->extraHeaders[header.first] = header.second; } - m_timer.cancel(); - m_timer.expires_after(std::chrono::seconds(m_timeout)); - m_timer.async_wait([sft = shared_from_this()](const std::error_code& ec) {sft->onTimeout(ec); }); -} + result->request = request; -void HttpSession::on_request_sent(const std::error_code& ec, size_t /*bytes_transferred*/) -{ - if (ec) { - onError("HttpSession error on sending request " + m_url + ": " + ec.message()); - return; - } + request->onProgressCallback = [this, result](int current, int total) { + if (result->finished || result->canceled) + return false; - if (instance_uri.port == "443") { - async_read_until( - m_ssl, m_response, "\r\n\r\n", - [this](const std::error_code& ec, const size_t size) { - if (ec) { - onError("HttpSession error receiving header " + m_url + ": " + ec.message()); - return; - } - std::string header( - buffers_begin(m_response.data()), - buffers_begin(m_response.data()) + size); - m_response.consume(size); - - const size_t pos = header.find("Content-Length: "); - if (pos != std::string::npos) { - const size_t len = std::strtoul( - header.c_str() + pos + sizeof("Content-Length: ") - 1, - nullptr, 10); - m_result->size = len - m_response.size(); + result->progress = computeProgress(current, total); + g_dispatcher.addEvent([result] { + if (!result->finished) { + g_lua.callGlobalField("g_http", "onPostProgress", result->operationId, result->url, result->progress); } + }); + return true; + }; - async_read(m_ssl, m_response, - asio::transfer_at_least(1), - [sft = shared_from_this()]( - const std::error_code& ec, const size_t bytes) { - sft->on_read(ec, bytes); - }); + const auto callback = [this, operationId, result](const ix::HttpResponsePtr& response) { + result->finished = true; + if (response) { + result->status = response->statusCode; + result->response = response->body; + result->size = static_cast(response->body.size()); + result->progress = 100; + } + result->error = describeHttpError(response, result); + + { + std::lock_guard lock(m_mutex); + m_operations.erase(operationId); + } + + g_dispatcher.addEvent([result] { + g_lua.callGlobalField("g_http", "onPost", result->operationId, result->url, result->error, result->response); }); - } else { - async_read_until( - m_socket, m_response, "\r\n\r\n", - [this](const std::error_code& ec, const size_t size) { - if (ec) { - onError("HttpSession error receiving header " + m_url + ": " + ec.message()); - return; - } - std::string header( - buffers_begin(m_response.data()), - buffers_begin(m_response.data()) + size); - m_response.consume(size); - - const size_t pos = header.find("Content-Length: "); - if (pos != std::string::npos) { - const size_t len = std::strtoul( - header.c_str() + pos + sizeof("Content-Length: ") - 1, - nullptr, 10); - m_result->size = len - m_response.size(); - } else if (m_checkContentLength) { - onError("HttpSession error receiving header " + m_url + ": " + "Content-Length not found"); - return; - } + }; - async_read(m_socket, m_response, - asio::transfer_at_least(1), - [sft = shared_from_this()]( - const std::error_code& ec, const size_t bytes) { - sft->on_read(ec, bytes); - }); + if (!g_ixHttpClient->performRequest(request, callback)) { + result->finished = true; + result->error = "http_error::queue"; + { + std::lock_guard lock(m_mutex); + m_operations.erase(operationId); + } + g_dispatcher.addEvent([result] { + g_lua.callGlobalField("g_http", "onPost", result->operationId, result->url, result->error, result->response); }); } - m_timer.cancel(); - m_timer.expires_after(std::chrono::seconds(m_timeout)); - m_timer.async_wait([sft = shared_from_this()](const std::error_code& ec) {sft->onTimeout(ec); }); + return operationId; } -void HttpSession::on_read(const std::error_code& ec, const size_t bytes_transferred) +int Http::download(const std::string& url, const std::string& path, int timeout) { - auto on_done_read = [this] { - m_timer.cancel(); - const auto& data = m_response.data(); - m_result->response.append(buffers_begin(data), buffers_end(data)); - m_result->finished = true; - m_callback(m_result); - }; + if (!timeout) + timeout = 5; - if (ec && ec != asio::error::eof) { - onError("HttpSession unable to on_read " + m_url + ": " + ec.message()); - return; + if (!g_ixHttpClient) { + g_ixHttpClient = std::make_shared(true); } - sum_bytes_response += bytes_transferred; - sum_bytes_speed_response += bytes_transferred; - - if (stdext::millis() > m_last_progress_update) { - m_result->speed = (sum_bytes_speed_response) / ((stdext::millis() - (m_last_progress_update - 100))); + const int operationId = m_operationId++; + auto result = std::make_shared(); + result->url = url; + result->operationId = operationId; - m_result->progress = (static_cast(sum_bytes_response) / m_result->size) * 100; - m_last_progress_update = stdext::millis() + 100; - sum_bytes_speed_response = 0; - m_callback(m_result); + { + std::lock_guard lock(m_mutex); + m_operations[operationId] = result; } + auto request = g_ixHttpClient->createRequest(url, ix::HttpClient::kGet); + request->connectTimeout = timeout; if (m_enable_time_out_on_read_write) { - m_timer.expires_after(std::chrono::seconds(m_timeout)); - m_timer.async_wait([sft = shared_from_this()](const std::error_code& ec) {sft->onTimeout(ec); }); - } else { - m_timer.cancel(); + request->transferTimeout = timeout; + } + request->followRedirects = true; + request->extraHeaders["User-Agent"] = m_userAgent; + for (const auto& header : m_custom_header) { + request->extraHeaders[header.first] = header.second; } - if (instance_uri.port == "443") { - async_read(m_ssl, m_response, - asio::transfer_at_least(1), - [sft = shared_from_this(), on_done_read]( - const std::error_code& ec, const size_t bytes) { - if (bytes > 0) { - sft->on_read(ec, bytes); - } else { - on_done_read(); - } - }); - } else { - async_read(m_socket, m_response, - asio::transfer_at_least(1), - [sft = shared_from_this(), on_done_read]( - const std::error_code& ec, const size_t bytes) { - if (bytes > 0) { - sft->on_read(ec, bytes); - } else { - on_done_read(); + result->request = request; + + const auto lastUpdate = std::make_shared(stdext::millis()); + const auto lastBytes = std::make_shared(0); + + request->onProgressCallback = [this, result, lastUpdate, lastBytes](int current, int total) { + if (result->finished || result->canceled) + return false; + + const ticks_t now = stdext::millis(); + const ticks_t elapsed = now - *lastUpdate; + if (elapsed > 0) { + result->speed = ((current - *lastBytes) * 1000) / elapsed; + *lastUpdate = now; + *lastBytes = current; + } + result->progress = computeProgress(current, total); + + g_dispatcher.addEvent([result] { + if (!result->finished) { + g_lua.callGlobalField("g_http", "onDownloadProgress", result->operationId, result->url, result->progress, result->speed); } }); - } -} + return true; + }; -void HttpSession::close() -{ - m_result->canceled = true; - g_logger.error("HttpSession close"); - if (instance_uri.port == "443") { - m_ssl.async_shutdown( - [sft = shared_from_this()]( - std::error_code ec) { - if (ec == asio::error::eof) { - ec = {}; - } + const auto callback = [this, operationId, result, path](const ix::HttpResponsePtr& response) { + result->finished = true; + if (response) { + result->status = response->statusCode; + result->response = response->body; + result->size = static_cast(response->body.size()); + result->progress = 100; + } + result->error = describeHttpError(response, result); + + const auto checksum = g_crypt.crc32(result->response, false); + + { + std::lock_guard lock(m_mutex); + m_operations.erase(operationId); + } - if (ec) { - sft->onError("shutdown " + sft->m_url + ": " + ec.message()); + g_dispatcher.addEvent([this, result, path, checksum] { + if (result->error.empty()) { + std::string normalizedPath = path; + if (!normalizedPath.empty() && normalizedPath[0] == '/') + normalizedPath = normalizedPath.substr(1); + m_downloads[normalizedPath] = result; } + g_lua.callGlobalField("g_http", "onDownload", result->operationId, result->url, result->error, path, checksum); }); - } else { - std::error_code ec; - m_socket.shutdown(asio::ip::tcp::socket::shutdown_both, ec); + }; - // not_connected happens sometimes so don't bother reporting it. - if (ec && ec != asio::error::not_connected) { - onError("shutdown " + m_url + ": " + ec.message()); + if (!g_ixHttpClient->performRequest(request, callback)) { + result->finished = true; + result->error = "http_error::queue"; + const auto checksum = g_crypt.crc32(result->response, false); + { + std::lock_guard lock(m_mutex); + m_operations.erase(operationId); } + g_dispatcher.addEvent([this, result, path, checksum] { + g_lua.callGlobalField("g_http", "onDownload", result->operationId, result->url, result->error, path, checksum); + }); } -} - -void HttpSession::onTimeout(const std::error_code& ec) -{ - if (!ec) { - onError(fmt::format("HttpSession ontimeout {}", ec.message())); - } -} -void HttpSession::onError(const std::string& ec, const std::string& /*details*/) const -{ - g_logger.error("{}", ec); - m_result->error = fmt::format("{}", ec); - m_result->finished = true; - m_callback(m_result); + return operationId; } -void WebsocketSession::start() +int Http::ws(const std::string& url, int timeout) { - instance_uri = parseURI(m_url); - const asio::ip::tcp::resolver::query query_resolver(instance_uri.domain, instance_uri.port); - - m_request.append("GET " + instance_uri.query + " HTTP/1.1\r\n"); - m_request.append("Host: " + instance_uri.domain + ":" + instance_uri.port + "\r\n"); - m_request.append("Upgrade: websocket\r\n"); - m_request.append("Connection: Upgrade\r\n"); - m_request.append("Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"); - m_request.append("Sec-WebSocket-Version: 13\r\n"); - m_request.append("\r\n"); - - m_resolver.async_resolve( - query_resolver, - [sft = shared_from_this()]( - const std::error_code& ec, asio::ip::tcp::resolver::iterator iterator) { - sft->on_resolve(ec, std::move(iterator)); - }); -} + if (!timeout) + timeout = 5; -void WebsocketSession::on_resolve(const std::error_code& ec, asio::ip::tcp::resolver::iterator iterator) -{ - if (ec) { - onError("WebsocketSession unable to resolve " + m_url + ": " + ec.message()); - return; - } + const int operationId = m_operationId++; + auto result = std::make_shared(); + result->url = url; + result->operationId = operationId; - std::error_code _ec; - if (instance_uri.port == "443") { - while (iterator != asio::ip::tcp::resolver::iterator()) { - m_ssl.lowest_layer().close(); - m_ssl.lowest_layer().connect(*iterator++, _ec); - if (!_ec) { - const std::error_code __ec; - on_connect(__ec); - break; - } - } - } else { - while (iterator != asio::ip::tcp::resolver::iterator()) { - m_socket.close(); - m_socket.connect(*iterator++, _ec); - if (!_ec) { - const std::error_code __ec; - on_connect(__ec); - break; - } - } + { + std::lock_guard lock(m_mutex); + m_operations[operationId] = result; } - if (_ec) { - onError("WebsocketSession unable to resolve " + m_url + ": " + ec.message()); - return; - } + auto websocket = std::make_shared(); + websocket->setUrl(url); + websocket->setHandshakeTimeout(timeout); + websocket->setPingInterval(10); + websocket->disableAutomaticReconnection(); - m_timer.cancel(); - m_timer.expires_after(std::chrono::seconds(m_timeout)); - m_timer.async_wait([sft = shared_from_this()](const std::error_code& ec) {sft->onTimeout(ec); }); -} + ix::WebSocketHttpHeaders headers; + headers["User-Agent"] = m_userAgent; + copyHeaders(m_custom_header, headers); + websocket->setExtraHeaders(headers); -void WebsocketSession::on_connect(const std::error_code& ec) -{ - if (ec) { - onError("WebsocketSession unable to on_connect " + m_url + ": " + ec.message()); - return; - } + websocket->setOnMessageCallback([this, operationId, result](const ix::WebSocketMessagePtr& msg) { + if (!msg) + return; - if (instance_uri.port == "443") { - std::error_code _ec; - m_ssl.handshake(asio::ssl::stream_base::client, _ec); - if (_ec) { - onError("WebsocketSession unable to handshake " + m_url + ": " + _ec.message()); + if (msg->type == ix::WebSocketMessageType::Open) { + result->connected = true; + g_dispatcher.addEvent([result] { + g_lua.callGlobalField("g_http", "onWsOpen", result->operationId, "code::websocket_open"); + }); return; } - async_write( - m_ssl, asio::buffer(m_request), - [sft = shared_from_this()]( - const std::error_code& ec, const size_t bytes) { - sft->on_request_sent(ec, bytes); - }); - } else { - async_write( - m_socket, asio::buffer(m_request), - [sft = shared_from_this()]( - const std::error_code& ec, const size_t bytes) { - sft->on_request_sent(ec, bytes); - }); - } - m_timer.cancel(); - m_timer.expires_after(std::chrono::seconds(m_timeout)); - m_timer.async_wait([sft = shared_from_this()](const std::error_code& ec) {sft->onTimeout(ec); }); -} + if (msg->type == ix::WebSocketMessageType::Message) { + const std::string payload = msg->str; + g_dispatcher.addEvent([result, payload] { + g_lua.callGlobalField("g_http", "onWsMessage", result->operationId, payload); + }); + return; + } -void WebsocketSession::on_request_sent(const std::error_code& ec, size_t /*bytes_transferred*/) -{ - if (ec) { - onError("WebsocketSession error on sending request " + m_url + ": " + ec.message()); - return; - } + if (msg->type == ix::WebSocketMessageType::Error) { + result->error = msg->errorInfo.reason; + const std::string errorReason = fmt::format("close_code::error {}", result->error); + g_dispatcher.addEvent([result, errorReason] { + g_lua.callGlobalField("g_http", "onWsError", result->operationId, errorReason); + }); + } - if (instance_uri.port == "443") { - async_read_until( - m_ssl, m_response, "\r\n\r\n", - [this](const std::error_code& ec, const size_t size) { - if (ec) { - onError("WebsocketSession error receiving header " + m_url + ": " + ec.message()); - return; - } - std::string header( - buffers_begin(m_response.data()), - buffers_begin(m_response.data()) + size); - m_response.consume(size); - - //TODO: Local variable 'websocket_accept' is only assigned but never accessed - /*size_t pos = header.find("Sec-WebSocket-Accept: "); - std::string websocket_accept; - if (pos != std::string::npos) { - websocket_accept = header.c_str() + pos + sizeof("Sec-WebSocket-Accept: ") - 1; - }*/ - - if (!m_closed && m_ssl.lowest_layer().is_open()) { - m_handshake_complete = true; - m_callback(WebsocketCallbackType::OPEN, "code::websocket_open"); - m_timer.cancel(); - - while (!m_pending_messages.empty() && !m_closed) { - auto msg = m_pending_messages.front(); - m_pending_messages.pop(); - send(msg.first, msg.second); - } - - async_read(m_ssl, m_response, - asio::transfer_at_least(1), - [sft = shared_from_this()]( - const std::error_code& ec, const size_t bytes) { - sft->on_read(ec, bytes); - }); - } - }); - } else { - async_read_until( - m_socket, m_response, "\r\n\r\n", - [this](const std::error_code& ec, const size_t size) { - if (ec) { - onError("WebsocketSession error receiving header " + m_url + ": " + ec.message()); - return; - } - std::string header( - buffers_begin(m_response.data()), - buffers_begin(m_response.data()) + size); - m_response.consume(size); - - //TODO: Local variable 'websocket_accept' is only assigned but never accessed - /*size_t pos = header.find("Sec-WebSocket-Accept: "); - std::string websocket_accept; - if (pos != std::string::npos) { - websocket_accept = header.c_str() + pos + sizeof("Sec-WebSocket-Accept: ") - 1; - }*/ - - if (!m_closed && m_socket.is_open()) { - m_handshake_complete = true; - m_callback(WebsocketCallbackType::OPEN, "code::websocket_open"); - m_timer.cancel(); - - while (!m_pending_messages.empty() && !m_closed) { - auto msg = m_pending_messages.front(); - m_pending_messages.pop(); - send(msg.first, msg.second); - } - - async_read(m_socket, m_response, - asio::transfer_at_least(1), - [sft = shared_from_this()]( - const std::error_code& ec, const size_t bytes) { - sft->on_read(ec, bytes); - }); + if (msg->type == ix::WebSocketMessageType::Close) { + const std::string closeMessage = "close_code::normal"; + { + std::lock_guard lock(m_mutex); + m_websockets.erase(operationId); + m_operations.erase(operationId); } - }); - } -} - -void WebsocketSession::on_write(const std::error_code& ec, size_t /*bytes_transferred*/) -{ - if (ec) { - onError("WebsocketSession unable to on_write " + m_url + ": " + ec.message()); - return; - } + g_dispatcher.addEvent([result, closeMessage] { + g_lua.callGlobalField("g_http", "onWsClose", result->operationId, closeMessage); + }); + } + }); - if (m_enable_time_out_on_read_write) { - m_timer.expires_after(std::chrono::seconds(m_timeout)); - m_timer.async_wait([sft = shared_from_this()](const std::error_code& ec) {sft->onTimeout(ec); }); - } else { - m_timer.cancel(); - } + websocket->start(); - if (!m_sendQueue.empty()) { - m_sendQueue.pop(); + { + std::lock_guard lock(m_mutex); + m_websockets[operationId] = websocket; } - if (instance_uri.port == "443") { - if (!m_sendQueue.empty()) - async_write(m_ssl, asio::buffer(m_sendQueue.front()), [sft = shared_from_this()](const std::error_code& ec, const size_t bytes) { - sft->on_write(ec, bytes); - }); - } else { - if (!m_sendQueue.empty()) - async_write( - m_socket, asio::buffer(m_sendQueue.front()), - [sft = shared_from_this()]( - const std::error_code& ec, const size_t bytes) { - sft->on_write(ec, bytes); - }); - } + return operationId; } -void WebsocketSession::on_read(const std::error_code& ec, const size_t bytes_transferred) +bool Http::wsSend(int operationId, const std::string& message) { - if (ec && ec != asio::error::eof) { - if (ec != asio::error::operation_aborted && ec != asio::error::not_connected) { - onError("WebsocketSession unable to on_read " + m_url + ": " + ec.message()); + std::shared_ptr websocket; + { + std::lock_guard lock(m_mutex); + const auto it = m_websockets.find(operationId); + if (it != m_websockets.end()) { + websocket = it->second; } - return; } - if (m_closed || !m_handshake_complete) { - return; - } + if (!websocket) + return false; - stdext::millisleep(100); - if (bytes_transferred > 0) { - const auto& data = m_response.data(); - std::string received_data(buffers_begin(data), buffers_begin(data) + bytes_transferred); - m_read_buffer.append(received_data); - m_response.consume(bytes_transferred); - - while (m_read_buffer.size() >= 2) { - size_t frame_start = 0; - - const uint8_t fin_opcode = static_cast(m_read_buffer[frame_start]); - const uint8_t mask_len = static_cast(m_read_buffer[frame_start + 1]); - - const uint8_t opcode = fin_opcode & 0x0F; - const bool masked = (mask_len & 0x80) != 0; - uint64_t payload_length = mask_len & 0x7F; - - size_t header_size = 2; - - if (payload_length == 126) { - if (m_read_buffer.size() < frame_start + 4) break; - payload_length = (static_cast(m_read_buffer[frame_start + 2]) << 8) | - static_cast(m_read_buffer[frame_start + 3]); - header_size += 2; - } else if (payload_length == 127) { - if (m_read_buffer.size() < frame_start + 10) break; - payload_length = 0; - for (int i = 0; i < 8; i++) { - payload_length = (payload_length << 8) | static_cast(m_read_buffer[frame_start + 2 + i]); - } - header_size += 8; - } + websocket->send(message); + return true; +} - if (masked) { - header_size += 4; - } - if (m_read_buffer.size() < frame_start + header_size + payload_length) { - break; - } - std::string payload; - if (payload_length > 0) { - size_t payload_start = frame_start + header_size; - - if (masked) { - std::array mask; - for (int i = 0; i < 4; i++) { - mask[i] = static_cast(m_read_buffer[frame_start + header_size - 4 + i]); - } - - payload.reserve(payload_length); - for (uint64_t i = 0; i < payload_length; i++) { - uint8_t masked_byte = static_cast(m_read_buffer[payload_start + i]); - payload.push_back(static_cast(masked_byte ^ mask[i % 4])); - } - } else { - payload = m_read_buffer.substr(payload_start, payload_length); - } - } +bool Http::wsClose(int operationId) +{ + cancel(operationId); + return true; +} - if (opcode == 0x8) { - close(); - return; - } else if (opcode == 0x9) { - send(payload, 0x8A); - } else if (opcode == 0xA) { - // - } else if (opcode == 0x1 || opcode == 0x2 || opcode == 0x0) { - if (!payload.empty()) { - m_callback(WebsocketCallbackType::MESSAGE, payload); - } - } - m_read_buffer.erase(0, frame_start + header_size + payload_length); +bool Http::cancel(int id) +{ + std::shared_ptr websocket; + HttpResult_ptr result; + + { + std::lock_guard lock(m_mutex); + const auto wit = m_websockets.find(id); + if (wit != m_websockets.end()) { + websocket = wit->second; } - } - if (!m_closed) { - if (instance_uri.port == "443") { - if (m_ssl.lowest_layer().is_open()) { - async_read(m_ssl, m_response, - asio::transfer_at_least(1), - [sft = shared_from_this()]( - const std::error_code& ec, const size_t bytes) { - sft->on_read(ec, bytes); - }); - } - } else { - if (m_socket.is_open()) { - async_read(m_socket, m_response, - asio::transfer_at_least(1), - [sft = shared_from_this()]( - const std::error_code& ec, const size_t bytes) { - sft->on_read(ec, bytes); - }); - } + const auto it = m_operations.find(id); + if (it != m_operations.end()) { + result = it->second; } } -} -void WebsocketSession::on_close(const std::error_code& ec) -{ - if (!ec) { - onError("WebsocketSession unable to on_close " + m_url + ": " + ec.message()); - return; + if (websocket) { + websocket->close(); } - m_closed = true; - m_callback(WebsocketCallbackType::CLOSE, "close_code::normal"); -} - -void WebsocketSession::onError(const std::string& ec, const std::string& /*details*/) -{ - g_logger.error("WebsocketSession error {}", ec); - m_closed = true; - m_callback(WebsocketCallbackType::ERROR_, "close_code::error " + ec); -} -void WebsocketSession::onTimeout(const std::error_code& ec) -{ - if (!ec) { - g_logger.error("WebsocketSession ontimeout {}", ec.message()); - m_closed = true; - m_callback(WebsocketCallbackType::ERROR_, "close_code::ontimeout " + ec.message()); - close(); + if (result && !result->canceled) { + result->canceled = true; + if (result->request) { + result->request->cancel = true; + } } + + return true; } -void WebsocketSession::send(const std::string& data, const uint8_t ws_opcode) +std::string Http::describeHttpError(const ix::HttpResponsePtr& response, const HttpResult_ptr& result) { - if (!m_handshake_complete || m_closed) { - if (!m_closed) { - m_pending_messages.emplace(data, ws_opcode); - } - return; - } - bool socket_open = false; - if (instance_uri.port == "443") { - socket_open = m_ssl.lowest_layer().is_open(); - } else { - socket_open = m_socket.is_open(); + if (!response) { + return "http_error::no_response"; } - if (!socket_open) { - onError("WebsocketSession attempted to send on closed socket " + m_url); - return; + if (response->errorCode == ix::HttpErrorCode::Cancelled || (result && result->canceled)) { + return "canceled"; } - std::vector ws_frame; - std::array mask; - std::uniform_int_distribution dist(0, 255); - std::random_device rd; - for (auto c = 0; c < 4; c++) - mask[c] = static_cast(dist(rd)); - - const size_t length = data.size(); - - if (ws_opcode == 0) { - /* - 0x81 in binary format is 1000 0001 - 1... .... = Fin: True - .000 .... = Reserved: 0x0 - .... 0001 = Opcode: Text (1) - */ - ws_frame.push_back(0x81); - - /* - 0x82 in binary format is 1000 0010 - 1... .... = Fin: True - .000 .... = Reserved: 0x0 - .... 0010 = Opcode: Binary (1) - - ws_frame.push_back(0x82); - */ - } else { - ws_frame.push_back(ws_opcode); + if (response->errorCode != ix::HttpErrorCode::Ok) { + if (!response->errorMsg.empty()) { + return response->errorMsg; + } + return fmt::format("http_error_code::{}", static_cast(response->errorCode)); } - /* - ... size < 126 ... - 128 in binary is 1000 0000 the first bit represent a mask. - now the other 7 bits represent a size payload - - 1000 0000 - +1000 0011 - =1000 0011 - - ... - 1... .... = Mask: True - .000 0011 = Payload length: 3 - ... - - ... size < 65535 ... - 128 -> 1000 0000 - 126 -> +0111 1110 - 254 -> =1111 1110 - - 1... .... = Mask: True - .111 1110 = Payload length: 126 Extended Payload Length (16 bits) - Extended Payload length (16 bits): 276 - */ - if (length < 126) { - /* - 7 bit length, 1 bit is to mask - ... - 1... .... = Mask: True - .000 0011 = Payload length: 3 - ... - */ - ws_frame.push_back(length + 128); - } else { - size_t num_bytes; - if (length < 65535) { // 16 bit length - /* - 7 bit length, 1 bit is to mask - 1111 1110 == 254 - ... - 1... .... = Mask: True - .111 1110 = Payload length: 126 Extended Payload Length (16 bits) - Extended Payload length (16 bits): 276 - ... - */ - num_bytes = 2; - ws_frame.push_back(126 + 128); - } else { // 64 bit length - /* - 7 bit length, 1 bit is to mask - 1111 1111 == 255 - ... - 1... .... = Mask: True - .111 1111 = Payload length: 127 Extended Payload Length (64 bits) - Extended Payload length (64 bits): 273299 - ... - */ - num_bytes = 8; - ws_frame.push_back(127 + 128); + if (response->statusCode < 200 || response->statusCode > 299) { + if (!response->errorMsg.empty()) { + return fmt::format("http_status::{} {}", response->statusCode, response->errorMsg); } - - for (auto c = num_bytes - 1; c != static_cast(-1); c--) - ws_frame.push_back((static_cast(length) >> (8 * c)) % 256); + return fmt::format("http_status::{}", response->statusCode); } - // add mask, the size of mask is 32bits - for (auto c = 0; c < 4; c++) - ws_frame.push_back(static_cast(mask[c])); - - // the payload use a mask with xor - for (size_t c = 0; c < length; c++) - ws_frame.push_back(data.at(c) ^ mask[c % 4]); - - m_sendQueue.emplace(ws_frame.begin(), ws_frame.end()); - - if (m_sendQueue.size() > 1) - return; + return std::string(); +} - if (instance_uri.port == "443") { - async_write( - m_ssl, asio::buffer(m_sendQueue.front()), - [sft = shared_from_this()]( - const std::error_code& ec, const size_t bytes) { - sft->on_write(ec, bytes); - }); - } else { - async_write( - m_socket, asio::buffer(m_sendQueue.front()), - [sft = shared_from_this()]( - const std::error_code& ec, const size_t bytes) { - sft->on_write(ec, bytes); - }); +int Http::computeProgress(const int current, const int total) +{ + if (total <= 0) { + return 0; } + const double value = (static_cast(current) / static_cast(total)) * 100.0; + return std::clamp(static_cast(value), 0, 100); } -void WebsocketSession::close() +void Http::copyHeaders(const std::unordered_map& source, ix::WebSocketHttpHeaders& target) { - if (!m_closed) { - m_closed = true; - bool socket_open = false; - if (instance_uri.port == "443") { - socket_open = m_ssl.lowest_layer().is_open(); - } else { - socket_open = m_socket.is_open(); - } - - if (socket_open && m_handshake_complete) { - /* - 0x88 in binary format is 1000 1000 - 1... .... = Fin: True - .000 .... = Reserved: 0x0 - .... 1000 = Opcode: Connection Close (8) - */ - try { - send("", 0x88); - } catch (...) { - // - } - } - - if (instance_uri.port == "443") { - try { - if (m_ssl.lowest_layer().is_open()) { - std::error_code ec; - m_ssl.lowest_layer().close(ec); - m_ssl.async_shutdown( - [sft = shared_from_this()]( - std::error_code ec) { - if (ec == asio::error::eof) { - ec = {}; - } - if (ec && ec != asio::error::not_connected && ec != asio::error::operation_aborted) { - sft->onError("shutdown " + sft->m_url + ": " + ec.message()); - } - }); - } - } catch (...) { - // - } - } else { - try { - if (m_socket.is_open()) { - std::error_code ec; - m_socket.shutdown(asio::ip::tcp::socket::shutdown_both, ec); - m_socket.close(ec); - if (ec && ec != asio::error::not_connected && ec != asio::error::operation_aborted) { - onError("shutdown " + m_url + ": " + ec.message()); - } - } - } catch (...) { - // - } - } + for (const auto& header : source) { + target[header.first] = header.second; } -} \ No newline at end of file +} diff --git a/src/framework/net/protocolhttp.h b/src/framework/net/protocolhttp.h index 075f857275..d97ecf836c 100644 --- a/src/framework/net/protocolhttp.h +++ b/src/framework/net/protocolhttp.h @@ -25,16 +25,14 @@ #include #include -#include +#include +#include -#include -#include +#include +#include #include - // result -class HttpSession; - struct HttpResult { std::string url; @@ -50,151 +48,14 @@ struct HttpResult std::string postData; std::string response; std::string error; - std::weak_ptr session; + std::shared_ptr request; }; using HttpResult_ptr = std::shared_ptr; -using HttpResult_cb = std::function; - -// session - -class HttpSession : public std::enable_shared_from_this -{ -public: - - HttpSession(asio::io_service& service, std::string url, std::string agent, - const bool& enable_time_out_on_read_write, - const std::unordered_map& custom_header, - const int timeout, const bool isJson, const bool checkContentLength, HttpResult_ptr result, HttpResult_cb callback) : - m_service(service), - m_url(std::move(url)), - m_agent(std::move(agent)), - m_enable_time_out_on_read_write(enable_time_out_on_read_write), - m_custom_header(custom_header), - m_timeout(timeout), - m_isJson(isJson), - m_checkContentLength(checkContentLength), - m_result(std::move(result)), - m_callback(std::move(callback)), - m_socket(service), - m_resolver(service), - m_timer(service) - { - assert(m_callback != nullptr); - assert(m_result != nullptr); - }; - void start(); - void cancel() const { onError("canceled"); } - void close(); - -private: - asio::io_service& m_service; - std::string m_url; - std::string m_agent; - bool m_enable_time_out_on_read_write; - std::unordered_map m_custom_header; - int m_timeout; - bool m_isJson; - bool m_checkContentLength; - HttpResult_ptr m_result; - HttpResult_cb m_callback; - asio::ip::tcp::socket m_socket; - asio::ip::tcp::resolver m_resolver; - asio::steady_timer m_timer; - ParsedURI instance_uri; - - asio::ssl::context m_context{ asio::ssl::context::tlsv12_client }; - asio::ssl::stream m_ssl{ m_service, m_context }; - - std::string m_request; - asio::streambuf m_response; - int sum_bytes_response = 0; - int sum_bytes_speed_response = 0; - ticks_t m_last_progress_update = stdext::millis(); - - void on_resolve(const std::error_code& ec, asio::ip::tcp::resolver::iterator iterator); - void on_connect(const std::error_code& ec); - - void on_request_sent(const std::error_code& ec, size_t bytes_transferred); - - void on_write(); - void on_read(const std::error_code& ec, size_t bytes_transferred); - - void onTimeout(const std::error_code& ec); - void onError(const std::string& ec, const std::string& details = "") const; -}; - -// web socket -enum class WebsocketCallbackType { OPEN, MESSAGE, ERROR_, CLOSE }; -using WebsocketSession_cb = std::function; - -class WebsocketSession : public std::enable_shared_from_this -{ -public: - - WebsocketSession(asio::io_service& service, std::string url, std::string agent, - const bool& enable_time_out_on_read_write, const int timeout, HttpResult_ptr result, WebsocketSession_cb callback) : - m_service(service), - m_url(std::move(url)), - m_agent(std::move(agent)), - m_enable_time_out_on_read_write(enable_time_out_on_read_write), - m_timeout(timeout), - m_result(std::move(result)), - m_callback(std::move(callback)), - m_timer(service), - m_socket(service), - m_resolver(service) - { - assert(m_callback != nullptr); - assert(m_result != nullptr); - }; - - void start(); - void send(const std::string& data, uint8_t ws_opcode = 0); - void close(); - -private: - asio::io_service& m_service; - std::string m_url; - std::string m_agent; - std::string m_read_buffer; - std::queue> m_pending_messages; - bool m_enable_time_out_on_read_write; - int m_timeout; - HttpResult_ptr m_result; - WebsocketSession_cb m_callback; - asio::steady_timer m_timer; - asio::ip::tcp::socket m_socket; - asio::ip::tcp::resolver m_resolver; - bool m_closed{ false }; - bool m_handshake_complete{ false }; - ParsedURI instance_uri; - - asio::ssl::context m_context{ asio::ssl::context::tlsv12_client }; - asio::ssl::stream m_ssl{ m_service, m_context }; - - std::queue m_sendQueue; - - std::string m_request; - asio::streambuf m_response; - - void on_resolve(const std::error_code& ec, asio::ip::tcp::resolver::iterator iterator); - void on_connect(const std::error_code& ec); - void on_request_sent(const std::error_code& ec, size_t bytes_transferred); - - void on_write(const std::error_code& ec, size_t bytes_transferred); - void on_read(const std::error_code& ec, size_t bytes_transferred); - - void on_close(const std::error_code& ec); - void onTimeout(const std::error_code& ec); - void onError(const std::string& ec, const std::string& details = ""); -}; class Http { public: - Http() : m_guard(make_work_guard(m_ios)) {} - void init(); void terminate(); @@ -227,17 +88,20 @@ class Http void setEnableTimeOutOnReadWrite(const bool enable_time_out_on_read_write) { m_enable_time_out_on_read_write = enable_time_out_on_read_write; } private: + std::string describeHttpError(const ix::HttpResponsePtr& response, const HttpResult_ptr& result); + int computeProgress(const int current, const int total); + void copyHeaders(const std::unordered_map& source, ix::WebSocketHttpHeaders& target); + bool m_working = false; bool m_enable_time_out_on_read_write = false; int m_operationId = 1; - std::thread m_thread; - asio::io_context m_ios{}; - asio::executor_work_guard m_guard; std::unordered_map m_operations; - std::unordered_map> m_websockets; + std::unordered_map> m_websockets; std::unordered_map m_downloads; std::string m_userAgent = "Mozilla/5.0"; std::unordered_map m_custom_header; + std::mutex m_mutex; }; extern Http g_http; +extern std::shared_ptr g_ixHttpClient; diff --git a/src/framework/pch.h b/src/framework/pch.h index cc6cfd42a6..598c0794a6 100644 --- a/src/framework/pch.h +++ b/src/framework/pch.h @@ -52,6 +52,11 @@ #include #include +#include +#include +#include +#include + // FMT #include #include diff --git a/vcpkg.json b/vcpkg.json index d50f22ec79..b594eef902 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -21,6 +21,7 @@ "zlib", "bshoshany-thread-pool", "fmt", + "ixwebsocket", { "name": "luajit", "platform": "!android & !wasm32"