Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 2 additions & 14 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)

# --- dependencies --
find_package(
Boost 1.68
COMPONENTS date_time
thread
chrono
random
REQUIRED)

# Try to find Boost::system
find_package(Boost COMPONENTS system)
if (NOT Boost_SYSTEM_FOUND)
message(WARNING "Boost::system not found, some examples may not compile")
endif()
find_package(Boost 1.68 CONFIG REQUIRED COMPONENTS headers)

include(FindAzmqLibzmq.cmake)

Expand All @@ -42,7 +30,7 @@ find_package(Threads REQUIRED)
add_library(${PROJECT_NAME} INTERFACE)
add_library(Azmq::${PROJECT_NAME} ALIAS ${PROJECT_NAME})

target_link_libraries(${PROJECT_NAME} INTERFACE Azmq::libzmq Boost::boost ${Boost_SYSTEM_LIBRARY} ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(${PROJECT_NAME} INTERFACE Azmq::libzmq Boost::headers Threads::Threads)
target_include_directories(${PROJECT_NAME} INTERFACE "$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>"
"$<INSTALL_INTERFACE:include>")

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ which supports C++11. Currently this has been tested with -
* Microsoft Visual Studio 2013 on Windows Server 2008 R2

Library dependencies are -
* Boost 1.68 or later
* Boost 1.68 or later (headers only; no compiled Boost libraries required)
* ZeroMQ 4.0.x

Tests and example code require -
Expand Down
8 changes: 2 additions & 6 deletions azmqConfig.cmake
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
include(CMakeFindDependencyMacro)
find_package(Boost 1.68 COMPONENTS date_time thread chrono random REQUIRED)
find_package(Boost 1.68 COMPONENTS system)
if (NOT Boost_SYSTEM_FOUND)
message(WARNING "Boost::system not found")
endif()
find_dependency(Boost 1.68 CONFIG COMPONENTS headers)

include(${CMAKE_CURRENT_LIST_DIR}/FindAzmqLibzmq.cmake)
set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
find_package(Threads REQUIRED)
find_dependency(Threads)
include("${CMAKE_CURRENT_LIST_DIR}/azmqTargets.cmake")
85 changes: 54 additions & 31 deletions test/socket/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ TEST_CASE( "Async send/receive copies buffer refs", "[socket_ops]" ) {
ioservice.run();
}

std::string endpoint() const {
return sock.endpoint();
}

boost::system::error_code get_error() const {
return error;
}
Expand Down Expand Up @@ -279,10 +283,11 @@ TEST_CASE( "Async send/receive copies buffer refs", "[socket_ops]" ) {
}
};

static const char ENDPOINT[] = "tcp://127.0.0.1:9998";

std::string endpoint = "tcp://127.0.0.1:*";
Sender sender;
sender.bind(ENDPOINT);
sender.bind(endpoint);
endpoint = sender.endpoint();
REQUIRE(!endpoint.empty());
sender.start_send();

Receiver receiver;
Expand All @@ -293,8 +298,8 @@ TEST_CASE( "Async send/receive copies buffer refs", "[socket_ops]" ) {
sender.run();
});

std::thread receiverthread([&receiver]() {
receiver.connect(ENDPOINT);
std::thread receiverthread([&receiver, &endpoint]() {
receiver.connect(endpoint);
receiver.start_receive();
receiver.run();
});
Expand Down Expand Up @@ -610,8 +615,10 @@ TEST_CASE( "Socket Monitor", "[socket]" ) {
ioc_m.run();
});

server->bind("tcp://127.0.0.1:9998");
client->connect("tcp://127.0.0.1:9998");
server->bind("tcp://127.0.0.1:*");
auto endpoint = server->endpoint();
REQUIRE(!endpoint.empty());
client->connect(endpoint);

bounce(*client, *server);

Expand All @@ -627,27 +634,39 @@ TEST_CASE( "Socket Monitor", "[socket]" ) {
t.join();

{
size_t i = 0;
CHECK(client_monitor.events_.at(i++).e == ZMQ_EVENT_CONNECT_DELAYED);
CHECK(client_monitor.events_.at(i++).e == ZMQ_EVENT_CONNECTED);
auto has_event = [&client_monitor](uint16_t wanted) {
for (auto const& ev : client_monitor.events_) {
if (ev.e == wanted)
return true;
}
return false;
};

CHECK(has_event(ZMQ_EVENT_CONNECT_DELAYED));
CHECK(has_event(ZMQ_EVENT_CONNECTED));
#ifdef ZMQ_EVENT_HANDSHAKE_SUCCEEDED
CHECK(client_monitor.events_.at(i++).e == ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
CHECK(has_event(ZMQ_EVENT_HANDSHAKE_SUCCEEDED));
#endif
CHECK(client_monitor.events_.at(i++).e == ZMQ_EVENT_MONITOR_STOPPED);
REQUIRE(client_monitor.events_.size() == i);
}

{
size_t i = 0;
CHECK(server_monitor.events_.at(i++).e == ZMQ_EVENT_LISTENING);
CHECK(server_monitor.events_.at(i++).e == ZMQ_EVENT_ACCEPTED);
#ifdef ZMQ_EVENT_HANDSHAKE_SUCCEEDED
CHECK(server_monitor.events_.at(i++).e == ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
#endif
CHECK(server_monitor.events_.at(i++).e == ZMQ_EVENT_CLOSED);
CHECK(server_monitor.events_.at(i++).e == ZMQ_EVENT_MONITOR_STOPPED);
REQUIRE(server_monitor.events_.size() == i);
}
CHECK(has_event(ZMQ_EVENT_MONITOR_STOPPED));
}

{
auto has_event = [&server_monitor](uint16_t wanted) {
for (auto const& ev : server_monitor.events_) {
if (ev.e == wanted)
return true;
}
return false;
};

CHECK(has_event(ZMQ_EVENT_LISTENING));
CHECK(has_event(ZMQ_EVENT_ACCEPTED));
#ifdef ZMQ_EVENT_HANDSHAKE_SUCCEEDED
CHECK(has_event(ZMQ_EVENT_HANDSHAKE_SUCCEEDED));
#endif
CHECK(has_event(ZMQ_EVENT_CLOSED));
CHECK(has_event(ZMQ_EVENT_MONITOR_STOPPED));
}
}

TEST_CASE( "Attach Method", "[socket]" ) {
Expand All @@ -657,18 +676,20 @@ TEST_CASE( "Attach Method", "[socket]" ) {

std::vector<std::string> elems;

azmq::attach(s, split(elems, "@inproc://myendpoint,tcp://127.0.0.1:5556,inproc://others", is_any_of(",")), true);
azmq::attach(s, split(elems, "@inproc://myendpoint,tcp://127.0.0.1:*,inproc://others", is_any_of(",")), true);
REQUIRE(s.endpoint() == "inproc://others");
}

TEST_CASE( "Pub/Sub", "[socket]" ) {
boost::asio::io_context ioc;
azmq::sub_socket subscriber(ioc);
subscriber.connect("tcp://127.0.0.1:5556");
subscriber.set_option(azmq::socket::subscribe("FOO"));

azmq::pub_socket publisher(ioc);
publisher.bind("tcp://127.0.0.1:5556");
publisher.bind("tcp://127.0.0.1:*");
auto endpoint = publisher.endpoint();
REQUIRE(!endpoint.empty());
subscriber.connect(endpoint);

std::this_thread::sleep_for(std::chrono::seconds(1));

Expand Down Expand Up @@ -713,10 +734,12 @@ TEST_CASE( "Loopback", "[socket]" ) {
boost::asio::io_context ioc_c;

azmq::socket sb(ioc_b, ZMQ_ROUTER);
sb.bind("tcp://127.0.0.1:5560");
sb.bind("tcp://127.0.0.1:*");
auto endpoint = sb.endpoint();
REQUIRE(!endpoint.empty());

azmq::socket sc(ioc_c, ZMQ_DEALER);
sc.connect("tcp://127.0.0.1:5560");
sc.connect(endpoint);

size_t ct = 100000;
state s(ct);
Expand Down
24 changes: 21 additions & 3 deletions test/socket_ops/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include <array>
#include <chrono>
#include <cerrno>
#include <random>
#include <thread>

#define CATCH_CONFIG_MAIN
Expand All @@ -27,9 +29,25 @@ TEST_CASE( "Tcp Dynamic Binding Expressions", "[socket_ops]" ) {
auto sb = azmq::detail::socket_ops::create_socket(ctx, ZMQ_ROUTER, ec);
REQUIRE(ec == boost::system::error_code());

std::string uri{ "tcp://127.0.0.1:5560" };
azmq::detail::socket_ops::bind(sb, uri, ec);
REQUIRE(ec == boost::system::error_code());
// Bind to an explicit numeric port (but pick one dynamically) to avoid
// flakiness when tests run in parallel.
std::mt19937 rng{std::random_device{}()};
std::uniform_int_distribution<int> port_dist(49152, 65535);
std::string uri;
bool bound = false;
for (int attempt = 0; attempt < 50; ++attempt) {
uri = std::string("tcp://127.0.0.1:") + std::to_string(port_dist(rng));
azmq::detail::socket_ops::bind(sb, uri, ec);
if (!ec) {
bound = true;
break;
}
if (ec.value() == EADDRINUSE) {
continue;
}
FAIL(ec.message());
}
REQUIRE(bound);
azmq::detail::socket_ops::unbind(sb, uri, ec);
REQUIRE(ec == boost::system::error_code());
std::this_thread::sleep_for(std::chrono::milliseconds(100));
Expand Down
Loading