From b17427ffb869718ef86cf2000bac0d6379b88775 Mon Sep 17 00:00:00 2001 From: TheSharpOwl Date: Thu, 5 Aug 2021 15:04:58 +0300 Subject: [PATCH 01/13] Fixed windows compile error. --- src/utils/backoff_performer.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/utils/backoff_performer.cpp b/src/utils/backoff_performer.cpp index cafa625e..e8eb3e86 100644 --- a/src/utils/backoff_performer.cpp +++ b/src/utils/backoff_performer.cpp @@ -26,10 +26,10 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ - -#include -#include #include "utils/backoff_performer.h" +#undef max // solves the windows issue which prevents using std::numeric_limits::max() +#include +#include using std::min; using std::numeric_limits; From 4430b9189f3944b00e95d19656f7e6570f618f9a Mon Sep 17 00:00:00 2001 From: TheSharpOwl Date: Thu, 5 Aug 2021 15:06:01 +0300 Subject: [PATCH 02/13] Set to use C++17. --- CMakeLists.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index c4e44e66..657fe47a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,6 +17,7 @@ set(RDKAFKA_MIN_VERSION "0.9.4") set(RDKAFKA_MIN_VERSION_HEX 0x00090400) if (NOT CMAKE_CXX_FLAGS) + set(CMAKE_CXX_STANDARD 17) # Set default compile flags for the project if(MSVC) # Don't always use Wall, since VC's /Wall is ridiculously verbose. @@ -28,7 +29,7 @@ if (NOT CMAKE_CXX_FLAGS) add_definitions("-DNOGDI=1") add_definitions("-DNOMINMAX=1") else() - set(CMAKE_CXX_FLAGS "-std=c++11 -Wall") + set(CMAKE_CXX_FLAGS "-Wall") endif() endif() From 709eb4d2ae064d75d2150ed609b7e14617cd95c4 Mon Sep 17 00:00:00 2001 From: TheSharpOwl Date: Thu, 5 Aug 2021 15:54:08 +0300 Subject: [PATCH 03/13] Replaced boost with cpp17 things in sources + removed undef max because it was useless actually. --- include/cppkafka/configuration.h | 8 ++++---- include/cppkafka/message.h | 4 ++-- include/cppkafka/utils/compacted_topic_processor.h | 14 +++++++------- include/cppkafka/utils/poll_strategy_base.h | 4 ++-- src/configuration.cpp | 2 +- src/message.cpp | 2 +- src/producer.cpp | 2 +- src/utils/backoff_performer.cpp | 3 +-- src/utils/poll_strategy_base.cpp | 4 ++-- 9 files changed, 21 insertions(+), 22 deletions(-) diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index c97f5a83..79e8e057 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -35,7 +35,7 @@ #include #include #include -#include +#include #include #include "topic_partition_list.h" #include "topic_configuration.h" @@ -226,12 +226,12 @@ class CPPKAFKA_API Configuration : public ConfigurationBase { /** * Gets the default topic configuration */ - const boost::optional& get_default_topic_configuration() const; + const std::optional& get_default_topic_configuration() const; /** * Gets the default topic configuration */ - boost::optional& get_default_topic_configuration(); + std::optional& get_default_topic_configuration(); private: using HandlePtr = ClonablePtr; @@ -240,7 +240,7 @@ class CPPKAFKA_API Configuration : public ConfigurationBase { static HandlePtr make_handle(rd_kafka_conf_t* ptr); HandlePtr handle_; - boost::optional default_topic_config_; + std::optional default_topic_config_; DeliveryReportCallback delivery_report_callback_; OffsetCommitCallback offset_commit_callback_; ErrorCallback error_callback_; diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index db5484ef..ace85622 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -34,7 +34,7 @@ #include #include #include -#include +#include #include #include "buffer.h" #include "macros.h" @@ -189,7 +189,7 @@ class CPPKAFKA_API Message { * * If calling rd_kafka_message_timestamp returns -1, then boost::none_t will be returned. */ - boost::optional get_timestamp() const; + std::optional get_timestamp() const; #if RD_KAFKA_VERSION >= RD_KAFKA_MESSAGE_LATENCY_SUPPORT_VERSION /** diff --git a/include/cppkafka/utils/compacted_topic_processor.h b/include/cppkafka/utils/compacted_topic_processor.h index 166dcfe8..9f7b54f3 100644 --- a/include/cppkafka/utils/compacted_topic_processor.h +++ b/include/cppkafka/utils/compacted_topic_processor.h @@ -34,7 +34,7 @@ #include #include #include -#include +#include #include "../buffer.h" #include "../consumer.h" #include "../macros.h" @@ -106,8 +106,8 @@ class CPPKAFKA_API CompactedTopicEvent { EventType type_; std::string topic_; int partition_; - boost::optional key_; - boost::optional value_; + std::optional key_; + std::optional value_; }; template @@ -121,12 +121,12 @@ class CPPKAFKA_API CompactedTopicProcessor { /** * Callback used for decoding key objects */ - using KeyDecoder = std::function(const Buffer&)>; + using KeyDecoder = std::function(const Buffer&)>; /** * Callback used for decoding value objects */ - using ValueDecoder = std::function(const Key& key, const Buffer&)>; + using ValueDecoder = std::function(const Key& key, const Buffer&)>; /** * Callback used for event handling @@ -276,10 +276,10 @@ void CompactedTopicProcessor::process_event() { Message message = consumer_.poll(); if (message) { if (!message.get_error()) { - boost::optional key = key_decoder_(message.get_key()); + std::optional key = key_decoder_(message.get_key()); if (key) { if (message.get_payload()) { - boost::optional value = value_decoder_(*key, message.get_payload()); + std::optional value = value_decoder_(*key, message.get_payload()); if (value) { // If there's a payload and we managed to parse the value, generate a // SET_ELEMENT event diff --git a/include/cppkafka/utils/poll_strategy_base.h b/include/cppkafka/utils/poll_strategy_base.h index e8d49287..8d019923 100644 --- a/include/cppkafka/utils/poll_strategy_base.h +++ b/include/cppkafka/utils/poll_strategy_base.h @@ -31,7 +31,7 @@ #define CPPKAFKA_POLL_STRATEGY_BASE_H #include -#include +#include #include "../queue.h" #include "../topic_partition_list.h" #include "poll_interface.h" @@ -45,7 +45,7 @@ namespace cppkafka { */ struct QueueData { Queue queue; - boost::any metadata; + std::any metadata; }; /** diff --git a/src/configuration.cpp b/src/configuration.cpp index 5a59c517..d29e0fa3 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -41,7 +41,7 @@ using std::move; using std::vector; using std::initializer_list; using std::chrono::milliseconds; -using boost::optional; +using std::optional; namespace cppkafka { diff --git a/src/message.cpp b/src/message.cpp index 103bae87..ca83040c 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -84,7 +84,7 @@ Message& Message::load_internal() { return *this; } -boost::optional Message::get_timestamp() const { +std::optional Message::get_timestamp() const { rd_kafka_timestamp_type_t type = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE; int64_t timestamp = rd_kafka_message_timestamp(handle_.get(), &type); if (timestamp == -1 || type == RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) { diff --git a/src/producer.cpp b/src/producer.cpp index af138d04..d016fb34 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -143,7 +143,7 @@ void Producer::do_produce(const Message& message, const Buffer& payload = message.get_payload(); const Buffer& key = message.get_key(); const int policy = static_cast(message_payload_policy_); - int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0; + int64_t duration = message.get_timestamp() ? message.get_timestamp().value().get_timestamp().count() : 0; auto result = rd_kafka_producev(get_handle(), RD_KAFKA_V_TOPIC(message.get_topic().data()), RD_KAFKA_V_PARTITION(message.get_partition()), diff --git a/src/utils/backoff_performer.cpp b/src/utils/backoff_performer.cpp index e8eb3e86..20de11f9 100644 --- a/src/utils/backoff_performer.cpp +++ b/src/utils/backoff_performer.cpp @@ -26,10 +26,9 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ -#include "utils/backoff_performer.h" -#undef max // solves the windows issue which prevents using std::numeric_limits::max() #include #include +#include "utils/backoff_performer.h" using std::min; using std::numeric_limits; diff --git a/src/utils/poll_strategy_base.cpp b/src/utils/poll_strategy_base.cpp index 15f75d8d..149accdd 100644 --- a/src/utils/poll_strategy_base.cpp +++ b/src/utils/poll_strategy_base.cpp @@ -36,7 +36,7 @@ namespace cppkafka { PollStrategyBase::PollStrategyBase(Consumer& consumer) : consumer_(consumer), - consumer_queue_(QueueData{consumer.get_consumer_queue(), boost::any()}) { + consumer_queue_(QueueData{consumer.get_consumer_queue(), std::any()}) { // get all currently active partition assignments TopicPartitionList assignment = consumer_.get_assignment(); on_assignment(assignment); @@ -93,7 +93,7 @@ void PollStrategyBase::assign(TopicPartitionList& partitions) { // populate partition queues for (const auto& partition : partitions) { // get the queue associated with this partition - partition_queues_.emplace(partition, QueueData{consumer_.get_partition_queue(partition), boost::any()}); + partition_queues_.emplace(partition, QueueData{consumer_.get_partition_queue(partition), std::any()}); } reset_state(); } From 03d046aef40710b5258478311225ebe1fc27e71f Mon Sep 17 00:00:00 2001 From: TheSharpOwl Date: Thu, 5 Aug 2021 15:58:20 +0300 Subject: [PATCH 04/13] Fixed CMakeLists. --- CMakeLists.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 657fe47a..a34e9177 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,7 +17,6 @@ set(RDKAFKA_MIN_VERSION "0.9.4") set(RDKAFKA_MIN_VERSION_HEX 0x00090400) if (NOT CMAKE_CXX_FLAGS) - set(CMAKE_CXX_STANDARD 17) # Set default compile flags for the project if(MSVC) # Don't always use Wall, since VC's /Wall is ridiculously verbose. @@ -32,6 +31,8 @@ if (NOT CMAKE_CXX_FLAGS) set(CMAKE_CXX_FLAGS "-Wall") endif() endif() +# Use c++17 +set(CMAKE_CXX_STANDARD 17) # Set output directories set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/lib) From 80d53a6bde6e03341da2c3baf4a2b2c60dac012a Mon Sep 17 00:00:00 2001 From: TheSharpOwl Date: Thu, 5 Aug 2021 15:58:53 +0300 Subject: [PATCH 05/13] Fixed a typo. --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index a34e9177..462212db 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,7 +31,7 @@ if (NOT CMAKE_CXX_FLAGS) set(CMAKE_CXX_FLAGS "-Wall") endif() endif() -# Use c++17 +# Use C++17 set(CMAKE_CXX_STANDARD 17) # Set output directories From e5358fa3f4d01701cb0509cffd8bd2cad653e119 Mon Sep 17 00:00:00 2001 From: TheSharpOwl Date: Thu, 5 Aug 2021 16:12:35 +0300 Subject: [PATCH 06/13] Returned original backoff_performer.cpp --- src/utils/backoff_performer.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/utils/backoff_performer.cpp b/src/utils/backoff_performer.cpp index 20de11f9..cafa625e 100644 --- a/src/utils/backoff_performer.cpp +++ b/src/utils/backoff_performer.cpp @@ -26,8 +26,9 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ -#include + #include +#include #include "utils/backoff_performer.h" using std::min; From 73aa4e5474d0bb3b799c54acd6e546e198a44a89 Mon Sep 17 00:00:00 2001 From: TheSharpOwl Date: Thu, 5 Aug 2021 16:51:36 +0300 Subject: [PATCH 07/13] Now it works without boost if you don't want the examples. --- CMakeLists.txt | 44 ++++++++++++++++++++++---------------------- src/CMakeLists.txt | 7 ++++++- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 462212db..06c6e278 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -104,24 +104,6 @@ if (NOT CPPKAFKA_PKGCONFIG_DIR) set(CPPKAFKA_PKGCONFIG_DIR share/pkgconfig) endif() -# Look for Boost (just need boost.optional headers here) -find_package(Boost REQUIRED ${FIND_PACKAGE_QUIET}) - -if (Boost_FOUND) - find_package(Boost COMPONENTS program_options ${FIND_PACKAGE_QUIET}) - set(Boost_USE_STATIC_LIBS ${CPPKAFKA_BOOST_STATIC_LIBS}) - set(Boost_USE_MULTITHREADED ${CPPKAFKA_BOOST_USE_MULTITHREADED}) - include_directories(${Boost_INCLUDE_DIRS}) - link_directories(${Boost_LIBRARY_DIRS}) - if (CPPKAFKA_CMAKE_VERBOSE) - message(STATUS "Boost include dir: ${Boost_INCLUDE_DIRS}") - message(STATUS "Boost library dir: ${Boost_LIBRARY_DIRS}") - message(STATUS "Boost use static libs: ${Boost_USE_STATIC_LIBS}") - message(STATUS "Boost is multi-threaded: ${CPPKAFKA_BOOST_USE_MULTITHREADED}") - message(STATUS "Boost libraries: ${Boost_LIBRARIES}") - endif() -endif() - # Try to find the RdKafka configuration file if present. # This will search default system locations as well as RdKafka_ROOT and RdKafka_Dir paths if specified. find_package(RdKafka ${FIND_PACKAGE_QUIET} CONFIG) @@ -142,10 +124,28 @@ add_subdirectory(src) add_subdirectory(include/cppkafka) # Examples target -if (NOT CPPKAFKA_DISABLE_EXAMPLES AND Boost_PROGRAM_OPTIONS_FOUND) - add_subdirectory(examples) -else() - message(STATUS "Disabling examples") +if (NOT CPPKAFKA_DISABLE_EXAMPLES) + # Look for Boost (just need boost.optional headers here) + find_package(Boost ${FIND_PACKAGE_QUIET} CONFIG) + if (Boost_FOUND) + option(CPPKAFKA_BOOST_STATIC_LIBS "Link with Boost static libraries." ON) + option(CPPKAFKA_BOOST_USE_MULTITHREADED "Use Boost multithreaded libraries." ON) + find_package(Boost COMPONENTS program_options ${FIND_PACKAGE_QUIET}) + set(Boost_USE_STATIC_LIBS ${CPPKAFKA_BOOST_STATIC_LIBS}) + set(Boost_USE_MULTITHREADED ${CPPKAFKA_BOOST_USE_MULTITHREADED}) + include_directories(${Boost_INCLUDE_DIRS}) + link_directories(${Boost_LIBRARY_DIRS}) + if (CPPKAFKA_CMAKE_VERBOSE) + message(STATUS "Boost include dir: ${Boost_INCLUDE_DIRS}") + message(STATUS "Boost library dir: ${Boost_LIBRARY_DIRS}") + message(STATUS "Boost use static libs: ${Boost_USE_STATIC_LIBS}") + message(STATUS "Boost is multi-threaded: ${CPPKAFKA_BOOST_USE_MULTITHREADED}") + message(STATUS "Boost libraries: ${Boost_LIBRARIES}") + endif() + add_subdirectory(examples) + else() + message(STATUS "Disabling examples") + endif() endif() # Add a target to generate API documentation using Doxygen diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5b8649b5..b9861a3b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -47,7 +47,12 @@ set_target_properties(${TARGET_NAME} PROPERTIES VERSION ${CPPKAFKA_VERSION} SOVERSION ${CPPKAFKA_VERSION}) # In CMake >= 3.15 Boost::boost == Boost::headers -target_link_libraries(${TARGET_NAME} PUBLIC RdKafka::rdkafka Boost::boost) +target_link_libraries(${TARGET_NAME} PUBLIC RdKafka::rdkafka) + +if(NOT CPPKAFKA_DISABLE_EXAMPLES) + target_link_libraries(${TARGET_NAME} PUBLIC Boost::boost) +endif() + if (WIN32) # On windows ntohs and related are in ws2_32 target_link_libraries(${TARGET_NAME} PUBLIC ws2_32.lib) From 77f0ea17617e9ce18963a85f1ed9a64ef8e4047c Mon Sep 17 00:00:00 2001 From: TheSharpOwl Date: Mon, 9 Aug 2021 17:51:50 +0300 Subject: [PATCH 08/13] Fixed syntax error on windows when using std::numeric_limits::max. --- src/utils/backoff_performer.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/utils/backoff_performer.cpp b/src/utils/backoff_performer.cpp index cafa625e..db62d353 100644 --- a/src/utils/backoff_performer.cpp +++ b/src/utils/backoff_performer.cpp @@ -27,10 +27,15 @@ * */ +#ifdef _WIN32 +#define NOMINMAX +#endif + #include #include #include "utils/backoff_performer.h" + using std::min; using std::numeric_limits; From 6213cd3a293f339fb177bec7164450e6ee5e3026 Mon Sep 17 00:00:00 2001 From: TheSharpOwl Date: Tue, 10 Aug 2021 11:05:22 +0300 Subject: [PATCH 09/13] Modified message.cpp to use the new feature. --- CMakeLists.txt | 65 ++++++++++++++++++++++--------------- include/cppkafka/cppkafka.h | 2 ++ include/cppkafka/message.h | 4 +-- src/CMakeLists.txt | 2 +- src/message.cpp | 2 +- 5 files changed, 44 insertions(+), 31 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 06c6e278..2c70039e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,6 +16,8 @@ set(CPPKAFKA_VERSION "${CPPKAFKA_VERSION_MAJOR}.${CPPKAFKA_VERSION_MINOR}.${CPPK set(RDKAFKA_MIN_VERSION "0.9.4") set(RDKAFKA_MIN_VERSION_HEX 0x00090400) +option(USE_CPP17 "Use C++17 features instead of Boost (still, if you don't have boost you can't build examples" OFF) + if (NOT CMAKE_CXX_FLAGS) # Set default compile flags for the project if(MSVC) @@ -31,8 +33,13 @@ if (NOT CMAKE_CXX_FLAGS) set(CMAKE_CXX_FLAGS "-Wall") endif() endif() -# Use C++17 -set(CMAKE_CXX_STANDARD 17) + +if(USE_CPP17) + set(CMAKE_CXX_STANDARD 17) + add_definitions("-D_USE_CPP17") +else() + set(CMAKE_CXX_STANDARD 11) +endif() # Set output directories set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/lib) @@ -104,32 +111,11 @@ if (NOT CPPKAFKA_PKGCONFIG_DIR) set(CPPKAFKA_PKGCONFIG_DIR share/pkgconfig) endif() -# Try to find the RdKafka configuration file if present. -# This will search default system locations as well as RdKafka_ROOT and RdKafka_Dir paths if specified. -find_package(RdKafka ${FIND_PACKAGE_QUIET} CONFIG) -set(RDKAFKA_TARGET_IMPORTS ${RdKafka_FOUND}) -if (NOT RdKafka_FOUND) - message(STATUS "RdKafkaConfig.cmake not found. Attempting to find module instead...") - find_package(RdKafka REQUIRED ${FIND_PACKAGE_QUIET} MODULE) - if (NOT RdKafka_FOUND) - message(FATAL_ERROR "RdKafka module not found. Please set RDKAFKA_ROOT to the install path or RDKAFKA_DIR pointing to the RdKafka configuration file location.") - else() - message(STATUS "RdKafka module found.") - endif() -else() - message(STATUS "RdKafka configuration file found: ${RdKafka_CONFIG}") -endif() - -add_subdirectory(src) -add_subdirectory(include/cppkafka) - -# Examples target -if (NOT CPPKAFKA_DISABLE_EXAMPLES) +if(NOT USE_CPP17 OR NOT CPPKAFKA_DISABLE_EXAMPLES OR NOT CPPKAFKA_DISABLE_TESTS) # Look for Boost (just need boost.optional headers here) - find_package(Boost ${FIND_PACKAGE_QUIET} CONFIG) + find_package(Boost REQUIRED ${FIND_PACKAGE_QUIET}) + if (Boost_FOUND) - option(CPPKAFKA_BOOST_STATIC_LIBS "Link with Boost static libraries." ON) - option(CPPKAFKA_BOOST_USE_MULTITHREADED "Use Boost multithreaded libraries." ON) find_package(Boost COMPONENTS program_options ${FIND_PACKAGE_QUIET}) set(Boost_USE_STATIC_LIBS ${CPPKAFKA_BOOST_STATIC_LIBS}) set(Boost_USE_MULTITHREADED ${CPPKAFKA_BOOST_USE_MULTITHREADED}) @@ -142,12 +128,37 @@ if (NOT CPPKAFKA_DISABLE_EXAMPLES) message(STATUS "Boost is multi-threaded: ${CPPKAFKA_BOOST_USE_MULTITHREADED}") message(STATUS "Boost libraries: ${Boost_LIBRARIES}") endif() + endif() + + # Examples target + if (Boost_PROGRAM_OPTIONS_FOUND) add_subdirectory(examples) else() message(STATUS "Disabling examples") endif() endif() +# Try to find the RdKafka configuration file if present. +# This will search default system locations as well as RdKafka_ROOT and RdKafka_Dir paths if specified. +find_package(RdKafka ${FIND_PACKAGE_QUIET} CONFIG) +set(RDKAFKA_TARGET_IMPORTS ${RdKafka_FOUND}) +if (NOT RdKafka_FOUND) + message(STATUS "RdKafkaConfig.cmake not found. Attempting to find module instead...") + find_package(RdKafka REQUIRED ${FIND_PACKAGE_QUIET} MODULE) + if (NOT RdKafka_FOUND) + message(FATAL_ERROR "RdKafka module not found. Please set RDKAFKA_ROOT to the install path or RDKAFKA_DIR pointing to the RdKafka configuration file location.") + else() + message(STATUS "RdKafka module found.") + endif() +else() + message(STATUS "RdKafka configuration file found: ${RdKafka_CONFIG}") +endif() + +add_subdirectory(src) +add_subdirectory(include/cppkafka) + + + # Add a target to generate API documentation using Doxygen find_package(Doxygen ${FIND_PACKAGE_QUIET}) if(DOXYGEN_FOUND) @@ -188,4 +199,4 @@ if(NOT TARGET uninstall) # Add uninstall target add_custom_target(uninstall COMMAND ${CMAKE_COMMAND} -P ${CMAKE_CURRENT_BINARY_DIR}/cmake_uninstall.cmake) -endif() +endif() \ No newline at end of file diff --git a/include/cppkafka/cppkafka.h b/include/cppkafka/cppkafka.h index 86ac366d..450b4a70 100644 --- a/include/cppkafka/cppkafka.h +++ b/include/cppkafka/cppkafka.h @@ -57,11 +57,13 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include #include diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index ace85622..8168f436 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -34,7 +34,7 @@ #include #include #include -#include +#include "utils/optional.h" #include #include "buffer.h" #include "macros.h" @@ -189,7 +189,7 @@ class CPPKAFKA_API Message { * * If calling rd_kafka_message_timestamp returns -1, then boost::none_t will be returned. */ - std::optional get_timestamp() const; + optional get_timestamp() const; #if RD_KAFKA_VERSION >= RD_KAFKA_MESSAGE_LATENCY_SUPPORT_VERSION /** diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b9861a3b..49d5bc1c 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -49,7 +49,7 @@ set_target_properties(${TARGET_NAME} PROPERTIES # In CMake >= 3.15 Boost::boost == Boost::headers target_link_libraries(${TARGET_NAME} PUBLIC RdKafka::rdkafka) -if(NOT CPPKAFKA_DISABLE_EXAMPLES) +if(NOT USE_CPP17 OR NOT CPPKAFKA_DISABLE_EXAMPLES OR NOT CPPKAFKA_DISABLE_TESTS) target_link_libraries(${TARGET_NAME} PUBLIC Boost::boost) endif() diff --git a/src/message.cpp b/src/message.cpp index ca83040c..a49f61ae 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -84,7 +84,7 @@ Message& Message::load_internal() { return *this; } -std::optional Message::get_timestamp() const { +optional Message::get_timestamp() const { rd_kafka_timestamp_type_t type = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE; int64_t timestamp = rd_kafka_message_timestamp(handle_.get(), &type); if (timestamp == -1 || type == RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) { From a9e276ab3f9c4dfabd5f6310ffd2b73e2f05d9fa Mon Sep 17 00:00:00 2001 From: TheSharpOwl Date: Tue, 10 Aug 2021 11:10:16 +0300 Subject: [PATCH 10/13] Fixed configuration header and source files. --- include/cppkafka/configuration.h | 8 ++++---- src/configuration.cpp | 1 - 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index 79e8e057..0015abbc 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -35,7 +35,7 @@ #include #include #include -#include +#include "utils/optional.h" #include #include "topic_partition_list.h" #include "topic_configuration.h" @@ -226,12 +226,12 @@ class CPPKAFKA_API Configuration : public ConfigurationBase { /** * Gets the default topic configuration */ - const std::optional& get_default_topic_configuration() const; + const optional& get_default_topic_configuration() const; /** * Gets the default topic configuration */ - std::optional& get_default_topic_configuration(); + optional& get_default_topic_configuration(); private: using HandlePtr = ClonablePtr; @@ -240,7 +240,7 @@ class CPPKAFKA_API Configuration : public ConfigurationBase { static HandlePtr make_handle(rd_kafka_conf_t* ptr); HandlePtr handle_; - std::optional default_topic_config_; + optional default_topic_config_; DeliveryReportCallback delivery_report_callback_; OffsetCommitCallback offset_commit_callback_; ErrorCallback error_callback_; diff --git a/src/configuration.cpp b/src/configuration.cpp index d29e0fa3..62f8cf1c 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -41,7 +41,6 @@ using std::move; using std::vector; using std::initializer_list; using std::chrono::milliseconds; -using std::optional; namespace cppkafka { From b73875ba20da9b6461315aee2017a12908a526d3 Mon Sep 17 00:00:00 2001 From: TheSharpOwl Date: Tue, 10 Aug 2021 11:24:33 +0300 Subject: [PATCH 11/13] Modified 2 more files + added the new headers (forgot to previously) --- include/cppkafka/utils/any.h | 14 ++++++++++++++ include/cppkafka/utils/compacted_topic_processor.h | 14 +++++++------- include/cppkafka/utils/optional.h | 14 ++++++++++++++ include/cppkafka/utils/poll_strategy_base.h | 4 ++-- 4 files changed, 37 insertions(+), 9 deletions(-) create mode 100644 include/cppkafka/utils/any.h create mode 100644 include/cppkafka/utils/optional.h diff --git a/include/cppkafka/utils/any.h b/include/cppkafka/utils/any.h new file mode 100644 index 00000000..0750ba16 --- /dev/null +++ b/include/cppkafka/utils/any.h @@ -0,0 +1,14 @@ + #ifdef _USE_CPP17 + #include + #else + #include + #endif + +namespace cppkafka +{ + #ifdef _USE_CPP17 + using any = std::any; + #else + using any = boost::any; + #endif +} \ No newline at end of file diff --git a/include/cppkafka/utils/compacted_topic_processor.h b/include/cppkafka/utils/compacted_topic_processor.h index 9f7b54f3..38f95628 100644 --- a/include/cppkafka/utils/compacted_topic_processor.h +++ b/include/cppkafka/utils/compacted_topic_processor.h @@ -34,7 +34,7 @@ #include #include #include -#include +#include "optional.h" #include "../buffer.h" #include "../consumer.h" #include "../macros.h" @@ -106,8 +106,8 @@ class CPPKAFKA_API CompactedTopicEvent { EventType type_; std::string topic_; int partition_; - std::optional key_; - std::optional value_; + optional key_; + optional value_; }; template @@ -121,12 +121,12 @@ class CPPKAFKA_API CompactedTopicProcessor { /** * Callback used for decoding key objects */ - using KeyDecoder = std::function(const Buffer&)>; + using KeyDecoder = std::function(const Buffer&)>; /** * Callback used for decoding value objects */ - using ValueDecoder = std::function(const Key& key, const Buffer&)>; + using ValueDecoder = std::function(const Key& key, const Buffer&)>; /** * Callback used for event handling @@ -276,10 +276,10 @@ void CompactedTopicProcessor::process_event() { Message message = consumer_.poll(); if (message) { if (!message.get_error()) { - std::optional key = key_decoder_(message.get_key()); + optional key = key_decoder_(message.get_key()); if (key) { if (message.get_payload()) { - std::optional value = value_decoder_(*key, message.get_payload()); + optional value = value_decoder_(*key, message.get_payload()); if (value) { // If there's a payload and we managed to parse the value, generate a // SET_ELEMENT event diff --git a/include/cppkafka/utils/optional.h b/include/cppkafka/utils/optional.h new file mode 100644 index 00000000..d8709e8e --- /dev/null +++ b/include/cppkafka/utils/optional.h @@ -0,0 +1,14 @@ +#ifdef _USE_CPP17 +#include +#else +#include +#endif + +namespace cppkafka +{ +#ifdef _USE_CPP17 + using std::optional; +#else + using boost::optional +#endif +} \ No newline at end of file diff --git a/include/cppkafka/utils/poll_strategy_base.h b/include/cppkafka/utils/poll_strategy_base.h index 8d019923..cc590344 100644 --- a/include/cppkafka/utils/poll_strategy_base.h +++ b/include/cppkafka/utils/poll_strategy_base.h @@ -31,7 +31,7 @@ #define CPPKAFKA_POLL_STRATEGY_BASE_H #include -#include +#include "any.h" #include "../queue.h" #include "../topic_partition_list.h" #include "poll_interface.h" @@ -45,7 +45,7 @@ namespace cppkafka { */ struct QueueData { Queue queue; - std::any metadata; + any metadata; }; /** From 1c91d3301fadb5d91a0e7542ed317f6590e904e5 Mon Sep 17 00:00:00 2001 From: TheSharpOwl Date: Tue, 10 Aug 2021 12:03:19 +0300 Subject: [PATCH 12/13] Fixed 3 more files. --- src/producer.cpp | 4 ++++ src/utils/backoff_performer.cpp | 1 - src/utils/poll_strategy_base.cpp | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/producer.cpp b/src/producer.cpp index d016fb34..6ab6ae18 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -143,7 +143,11 @@ void Producer::do_produce(const Message& message, const Buffer& payload = message.get_payload(); const Buffer& key = message.get_key(); const int policy = static_cast(message_payload_policy_); +#ifdef _USE_CPP17 int64_t duration = message.get_timestamp() ? message.get_timestamp().value().get_timestamp().count() : 0; +#else + int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0; +#endif auto result = rd_kafka_producev(get_handle(), RD_KAFKA_V_TOPIC(message.get_topic().data()), RD_KAFKA_V_PARTITION(message.get_partition()), diff --git a/src/utils/backoff_performer.cpp b/src/utils/backoff_performer.cpp index db62d353..27b43356 100644 --- a/src/utils/backoff_performer.cpp +++ b/src/utils/backoff_performer.cpp @@ -35,7 +35,6 @@ #include #include "utils/backoff_performer.h" - using std::min; using std::numeric_limits; diff --git a/src/utils/poll_strategy_base.cpp b/src/utils/poll_strategy_base.cpp index 149accdd..40df5e2f 100644 --- a/src/utils/poll_strategy_base.cpp +++ b/src/utils/poll_strategy_base.cpp @@ -93,7 +93,7 @@ void PollStrategyBase::assign(TopicPartitionList& partitions) { // populate partition queues for (const auto& partition : partitions) { // get the queue associated with this partition - partition_queues_.emplace(partition, QueueData{consumer_.get_partition_queue(partition), std::any()}); + partition_queues_.emplace(partition, QueueData{consumer_.get_partition_queue(partition), any()}); } reset_state(); } From e38438ca061c951013773020269b6182baa26cf3 Mon Sep 17 00:00:00 2001 From: TheSharpOwl Date: Tue, 10 Aug 2021 12:06:01 +0300 Subject: [PATCH 13/13] Fixed std::any to any. --- src/utils/poll_strategy_base.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/poll_strategy_base.cpp b/src/utils/poll_strategy_base.cpp index 40df5e2f..c295c00b 100644 --- a/src/utils/poll_strategy_base.cpp +++ b/src/utils/poll_strategy_base.cpp @@ -36,7 +36,7 @@ namespace cppkafka { PollStrategyBase::PollStrategyBase(Consumer& consumer) : consumer_(consumer), - consumer_queue_(QueueData{consumer.get_consumer_queue(), std::any()}) { + consumer_queue_(QueueData{consumer.get_consumer_queue(), any()}) { // get all currently active partition assignments TopicPartitionList assignment = consumer_.get_assignment(); on_assignment(assignment);