diff --git a/samples/fbuffer/BufferTask.cpp b/samples/fbuffer/BufferTask.cpp index f3d237e..48071d0 100644 --- a/samples/fbuffer/BufferTask.cpp +++ b/samples/fbuffer/BufferTask.cpp @@ -73,6 +73,7 @@ bool BufferTask::ParseLine(rapidjson::Value &line) { if (i == -1) { if (not value.IsString()) { DARWIN_LOG_ERROR("BufferTask::ParseLine:: the source given must be a string."); + return false; } this->_input_line["source"] = value.GetString(); i++; diff --git a/samples/fbuffer/BufferThread.cpp b/samples/fbuffer/BufferThread.cpp index 266552a..59e2c09 100644 --- a/samples/fbuffer/BufferThread.cpp +++ b/samples/fbuffer/BufferThread.cpp @@ -31,8 +31,7 @@ bool BufferThread::Main() { if (len >= 0 && len < this->_connector->GetRequiredLogLength()){ DARWIN_LOG_DEBUG("BufferThread::Main:: Not enough log in Redis, wait for more"); - continue; - } else if (len<0 || !this->_connector->REDISPopLogs(len, logs, redis_list)) { + } else if (len < 0 || !this->_connector->REDISPopLogs(len, logs, redis_list)) { DARWIN_LOG_ERROR("BufferThread::Main:: Error when querying Redis on list: " + redis_list + " for source: '" + redis_config.first + "'"); continue; } else { @@ -43,6 +42,12 @@ bool BufferThread::Main() { DARWIN_LOG_DEBUG("BufferThread::Main:: Removed " + std::to_string(logs.size()) + " elements from redis"); } } + + // Set an expiration on the redis key, to purge if threads/filters are stopped or configuration is modified + // The expiration MUST be over the interval period + if (not this->_connector->REDISSetExpiry(redis_list, this->_interval + 60)) { + DARWIN_LOG_WARNING("BufferThread::Main:: Could not set an expiration on key '" + redis_list + "'"); + } } return true; diff --git a/samples/fbuffer/Connectors/AConnector.cpp b/samples/fbuffer/Connectors/AConnector.cpp index a4c7614..447c649 100644 --- a/samples/fbuffer/Connectors/AConnector.cpp +++ b/samples/fbuffer/Connectors/AConnector.cpp @@ -37,16 +37,16 @@ std::vector> AConnector::GetRedisLists() con return this->_redis_lists; } -bool AConnector::ParseData(std::string fieldname) { +bool AConnector::ParseData(std::map &input_line, std::string fieldname, std::string &entry) { DARWIN_LOGGER; - if (this->_input_line.find(fieldname) == this->_input_line.end()) { + if (input_line.find(fieldname) == input_line.end()) { DARWIN_LOG_ERROR("AConnector::ParseData '" + fieldname + "' is missing in the input line. Output ignored."); return false; } - if (not this->_entry.empty()) { - this->_entry += ";"; + if (not entry.empty()) { + entry += ";"; } - this->_entry += this->_input_line[fieldname]; + entry += input_line[fieldname]; return true; } @@ -87,7 +87,7 @@ bool AConnector::PrepareKeysInRedis(){ return ret; } - + bool AConnector::REDISAddEntry(const std::string &entry, const std::string &list_name) { DARWIN_LOGGER; DARWIN_LOG_DEBUG("AConnector::REDISAddEntry:: Add data in Redis..."); @@ -143,6 +143,21 @@ long long int AConnector::REDISListLen(const std::string &list_name) noexcept { return result; } +bool AConnector::REDISSetExpiry(const std::string &key, unsigned int expiry) { + DARWIN_LOGGER; + DARWIN_LOG_DEBUG("AConnector::REDISSetExpiry:: reseting expiration for key " + key); + + long long int result = 0; + + darwin::toolkit::RedisManager& redis = darwin::toolkit::RedisManager::GetInstance(); + + if(redis.Query(std::vector{"EXPIRE", key, std::to_string(expiry)}, result, true) != REDIS_REPLY_INTEGER) { + DARWIN_LOG_ERROR("AConnector::REDISSetExpiry:: not the expected Redis response"); + return false; + } + return result == 1; +} + bool AConnector::REDISPopLogs(long long int len, std::vector &logs, const std::string &list_name) noexcept { DARWIN_LOGGER; DARWIN_LOG_DEBUG("AConnector::REDISPopLogs:: Querying Redis for logs..."); @@ -247,7 +262,7 @@ bool AConnector::SendToFilter(std::vector &logs) { packet->certitude_size = certitude_size; packet->filter_code = GetFilterCode(); packet->body_size = data.size(); - + std::vector uuid = darwin::uuid::GenUuid(); memcpy(packet->evt_id, uuid.data(), 16); DARWIN_LOG_DEBUG("AConnector::SendToFilter:: Sending header + data"); @@ -267,11 +282,11 @@ long AConnector::GetFilterCode() noexcept { return DARWIN_FILTER_BUFFER; } -std::string AConnector::GetSource() { +std::string AConnector::GetSource(std::map &input_line) { DARWIN_LOGGER; - if (this->_input_line.find("source") == this->_input_line.end()) { + if (input_line.find("source") == input_line.end()) { DARWIN_LOG_ERROR("AConnector::GetSource:: 'source' is missing in the input line. Output ignored."); return std::string(); } - return this->_input_line["source"]; + return input_line["source"]; } diff --git a/samples/fbuffer/Connectors/AConnector.hpp b/samples/fbuffer/Connectors/AConnector.hpp index df42ce1..8b718b4 100644 --- a/samples/fbuffer/Connectors/AConnector.hpp +++ b/samples/fbuffer/Connectors/AConnector.hpp @@ -26,7 +26,7 @@ class AConnector { /// ///\class AConnector - public: +public: ///\brief Unique constructor. It contains all stuff needed to ensure REDIS and output Filter communication /// ///\param io_context The boost::asio::io_context used by the Server. Needed for communication with output Filter. @@ -45,7 +45,6 @@ class AConnector { ///\brief Virtual default constructor virtual ~AConnector() = default; - public: ///\brief Get the interval set in the connector. /// ///\return this->_interval @@ -79,9 +78,17 @@ class AConnector { ///\return true on success, false otherwise. virtual bool REDISAddEntry(const std::string &entry, const std::string &list_name); + ///\brief (Re)set the expiration on a Redis key + /// + /// \param key The name of the key to set the expiry on + /// \param expiry The value to set as expiration for the key + /// + /// \return true if expiry was set successfuly, false if the key did not exist or the command failed + bool REDISSetExpiry(const std::string &key, unsigned int expiry); + ///\brief Get the logs from the Redis List /// - /// \param len THe number of elements to pick up in the list + /// \param len The number of elements to pick up in the list /// \param logs the vector used to store our logs /// /// \return true on success, false otherwise. @@ -99,12 +106,14 @@ class AConnector { ///\return true on success, false otherwise. virtual bool REDISReinsertLogs(std::vector &logs, const std::string &list_name); - ///\brief This function extracts from _input_line some data, format it and add it to _entry. + ///\brief This function extracts from input_line some data, format it and add it to entry. /// - ///\param fieldname The name of the data to retrieve + ///\param input_line The map of each entry name with its value + ///\param fieldname The name of the entry to retrieve + ///\param entry The variable to store the resulting value to /// ///\return true on success, false otherwise. - bool ParseData(std::string fieldname); + bool ParseData(std::map &input_line, std::string fieldname, std::string &entry); ///\brief Sets the connection with the output Filter and sends logs to it. /// @@ -113,7 +122,11 @@ class AConnector { ///\return true on success, false otherwise. bool SendToFilter(std::vector &logs); - public: // Functions that needs to be implemented by children + + // ########################################################## + // ### Functions that needs to be implemented by children ### + // ########################################################## + ///\brief This function sends data to the REDIS storage. It must be overrode as each filter doesn't need the same data. /// /// It should fill _entry with the datas to send as REDISAddEntry is picking from it. @@ -123,13 +136,13 @@ class AConnector { ///\return true on success, false otherwise. virtual bool ParseInputForRedis(std::map &input_line) = 0; - private: +private: ///\brief Get the Buffer filter code /// ///\return the Buffer filter code long GetFilterCode() noexcept; - protected: +protected: ///\brief this virtual function "jsonifies" the vector of strings into a single string. /// By default it performs no other actions on the data but CAN be overrode if needed. (e.g fAnomalyConnector) /// @@ -139,10 +152,12 @@ class AConnector { ///\return True on success (formatting successful), False otherwise. virtual bool FormatDataToSendToFilter(std::vector &logs, std::string &formatted); - ///\brief Extracts the source of this->_input_line input + ///\brief Extracts the "source" entry from input_line + /// + ///\param input_line The map containing the entries' name and value /// ///\return The source of the current input line - std::string GetSource(); + std::string GetSource(std::map &input_line); // Used to link with the correct task darwin::outputType _filter_type; @@ -162,12 +177,6 @@ class AConnector { // The different REDIS lists used to store data depending to source before sending to filter std::vector> _redis_lists; - // Temporarily used to between formating and adding data to REDIS - std::string _entry; - - // Temporarily used to store an input in a form that the connectors can pick what they need - std::map _input_line; - // The number of log lines in REDIS needed to send to the output Filter unsigned int _required_log_lines; }; \ No newline at end of file diff --git a/samples/fbuffer/Connectors/SumConnector.cpp b/samples/fbuffer/Connectors/SumConnector.cpp index 3bfe7c7..f767f97 100644 --- a/samples/fbuffer/Connectors/SumConnector.cpp +++ b/samples/fbuffer/Connectors/SumConnector.cpp @@ -17,18 +17,17 @@ SumConnector::SumConnector(boost::asio::io_context &context, std::string &filter bool SumConnector::ParseInputForRedis(std::map &input_line) { - this->_input_line = input_line; - this->_entry.clear(); + std::string entry; - std::string source = this->GetSource(); + std::string source = this->GetSource(input_line); - if (not this->ParseData("decimal")) + if (not this->ParseData(input_line, "decimal", entry)) return false; for (const auto &redis_config : this->_redis_lists) { // If the source in the input is equal to the source in the redis list, or the redis list's source is empty if (not redis_config.first.compare(source) or redis_config.first.empty()) - this->REDISAddEntry(this->_entry, redis_config.second); + this->REDISAddEntry(entry, redis_config.second); } return true; } @@ -115,20 +114,21 @@ bool SumConnector::REDISPopLogs(long long int len __attribute__((unused)), std:: redis_reply = redis.Query(std::vector{"GETSET", sum_name, "0"}, result, true); if (redis_reply == REDIS_REPLY_NIL) { - DARWIN_LOG_INFO("SumConnector:: REDISPopLogs:: key '" + sum_name + "' does not exist (yet?)"); - return false; + DARWIN_LOG_DEBUG("SumConnector:: REDISPopLogs:: key '" + sum_name + "' did not exist"); + result_string = "0"; } else if(redis_reply != REDIS_REPLY_STRING) { DARWIN_LOG_ERROR("SumConnector::REDISPopLogs:: Not the expected Redis response"); return false; } - - try { - result_string = std::any_cast(result); - } - catch (const std::bad_any_cast&) { - DARWIN_LOG_ERROR("SumConnector:REDISPopLogs:: Impossible to cast redis response into a string."); - return false; + else { + try { + result_string = std::any_cast(result); + } + catch (const std::bad_any_cast&) { + DARWIN_LOG_ERROR("SumConnector:REDISPopLogs:: Impossible to cast redis response into a string."); + return false; + } } DARWIN_LOG_DEBUG("SumConnector::REDISPopLogs:: Got '" + result_string + "' from Redis"); @@ -151,7 +151,7 @@ long long int SumConnector::REDISListLen(const std::string &sum_name) noexcept { redis_reply = redis.Query(std::vector{"GET", sum_name}, result_string, true); if (redis_reply == REDIS_REPLY_NIL) { - DARWIN_LOG_INFO("SumConnector::REDISListLen:: key '" + sum_name + "' does not exist (yet?)"); + DARWIN_LOG_DEBUG("SumConnector::REDISListLen:: key '" + sum_name + "' does not exist (yet?)"); } else if (redis_reply == REDIS_REPLY_STRING) { result = strtold(result_string.c_str(), NULL); @@ -163,7 +163,7 @@ long long int SumConnector::REDISListLen(const std::string &sum_name) noexcept { } else { DARWIN_LOG_ERROR("SumConnector::REDISListLen:: Error while querying key '" + sum_name + "'"); - result = 0.0L; + return -1; } // Return an absolute rounded value for the double, cap with the maximum value of a long long int diff --git a/samples/fbuffer/Connectors/fAnomalyConnector.cpp b/samples/fbuffer/Connectors/fAnomalyConnector.cpp index f248fe3..b1295cb 100644 --- a/samples/fbuffer/Connectors/fAnomalyConnector.cpp +++ b/samples/fbuffer/Connectors/fAnomalyConnector.cpp @@ -43,24 +43,23 @@ bool fAnomalyConnector::FormatDataToSendToFilter(std::vector &logs, } bool fAnomalyConnector::ParseInputForRedis(std::map &input_line) { - this->_input_line = input_line; - this->_entry.clear(); + std::string entry; - std::string source = this->GetSource(); + std::string source = this->GetSource(input_line); - if (not this->ParseData("net_src_ip")) + if (not this->ParseData(input_line, "net_src_ip", entry)) return false; - if (not this->ParseData("net_dst_ip")) + if (not this->ParseData(input_line, "net_dst_ip", entry)) return false; - if (not this->ParseData("net_dst_port")) + if (not this->ParseData(input_line, "net_dst_port", entry)) return false; - if (not this->ParseData("ip_proto")) + if (not this->ParseData(input_line, "ip_proto", entry)) return false; for (const auto &redis_config : this->_redis_lists) { // If the source in the input is equal to the source in the redis list, or the redis list's source is "" if (not redis_config.first.compare(source) or redis_config.first.empty()) - this->REDISAddEntry(this->_entry, redis_config.second); + this->REDISAddEntry(entry, redis_config.second); } return true; } diff --git a/samples/fbuffer/Connectors/fSofaConnector.cpp b/samples/fbuffer/Connectors/fSofaConnector.cpp index 2a634b8..2698afe 100644 --- a/samples/fbuffer/Connectors/fSofaConnector.cpp +++ b/samples/fbuffer/Connectors/fSofaConnector.cpp @@ -11,26 +11,25 @@ fSofaConnector::fSofaConnector(boost::asio::io_context &context, std::string &fi AConnector(context, darwin::SOFA, filter_socket_path, interval, redis_lists, minLogLen) {} bool fSofaConnector::ParseInputForRedis(std::map &input_line) { - this->_input_line = input_line; - this->_entry.clear(); + std::string entry; - std::string source = this->GetSource(); + std::string source = this->GetSource(input_line); - if (not this->ParseData("ip")) + if (not this->ParseData(input_line, "ip", entry)) return false; - if (not this->ParseData("hostname")) + if (not this->ParseData(input_line, "hostname", entry)) return false; - if (not this->ParseData("os")) + if (not this->ParseData(input_line, "os", entry)) return false; - if (not this->ParseData("proto")) + if (not this->ParseData(input_line, "proto", entry)) return false; - if (not this->ParseData("port")) + if (not this->ParseData(input_line, "port", entry)) return false; for (const auto &redis_config : this->_redis_lists) { // If the source in the input is equal to the source in the redis list, or the redis list's source is "" if (not redis_config.first.compare(source) or redis_config.first.empty()) - this->REDISAddEntry(this->_entry, redis_config.second); + this->REDISAddEntry(entry, redis_config.second); } return true; } \ No newline at end of file diff --git a/toolkit/AThread.cpp b/toolkit/AThread.cpp index 9c46944..17af39c 100644 --- a/toolkit/AThread.cpp +++ b/toolkit/AThread.cpp @@ -10,7 +10,7 @@ #include "AThread.hpp" #include "Logger.hpp" -AThread::AThread(int interval) : +AThread::AThread(int interval) : _interval(interval), _thread(), _is_stop(false) {} @@ -21,14 +21,16 @@ void AThread::ThreadMain() { std::mutex mtx; std::unique_lock lck(mtx); - while (!(this->_is_stop)) { - if (!this->Main()) { - DARWIN_LOG_DEBUG("AThread::ThreadMain:: Error in main function, stopping the thread"); - _is_stop = true; - break; - } + while (not this->_is_stop) { // Wait for notification or until timeout this->_cv.wait_for(lck, std::chrono::seconds(_interval)); + if (not _is_stop) { + if (not this->Main()) { + DARWIN_LOG_DEBUG("AThread::ThreadMain:: Error in main function, stopping the thread"); + _is_stop = true; + break; + } + } } } diff --git a/toolkit/AThread.hpp b/toolkit/AThread.hpp index b9a38ed..e5a57a7 100644 --- a/toolkit/AThread.hpp +++ b/toolkit/AThread.hpp @@ -22,7 +22,7 @@ class AThread { /// ///\class AThread - public: +public: ///\brief Unique constructor, creates a thread and immediately calls ThreadMain. /// ///\param interval The interval in seconds between two calls of Main by ThreadMain @@ -47,11 +47,11 @@ class AThread { ///\return Override MUST return true on success and false otherwise virtual bool Main() = 0; - - private: +protected: /// Interval, set by the ctor, between two calls of Main by ThreadMain int _interval; +private: /// The actual thread std::thread _thread;