From b25dc9bcc2baddbdd9fc17981062486029eaa6e5 Mon Sep 17 00:00:00 2001 From: Grzegorz Mazur Date: Sat, 13 Nov 2021 21:24:53 +0100 Subject: [PATCH] get rid of zmpp closes #338 --- cyacas/yacas-kernel/CMakeLists.txt | 6 +- cyacas/yacas-kernel/include/yacas_engine.hpp | 6 +- cyacas/yacas-kernel/include/yacas_kernel.hpp | 24 ++-- cyacas/yacas-kernel/src/yacas_engine.cpp | 29 ++--- cyacas/yacas-kernel/src/yacas_kernel.cpp | 113 +++++++++---------- 5 files changed, 85 insertions(+), 93 deletions(-) diff --git a/cyacas/yacas-kernel/CMakeLists.txt b/cyacas/yacas-kernel/CMakeLists.txt index fe1600aa7..da9b0988a 100644 --- a/cyacas/yacas-kernel/CMakeLists.txt +++ b/cyacas/yacas-kernel/CMakeLists.txt @@ -16,12 +16,10 @@ # # + find_path (ZEROMQ_INCLUDE_DIR zmq.hpp) find_library (ZEROMQ_LIBRARY NAMES zmq) -find_path (ZMQPP_INCLUDE_DIR zmqpp.hpp) -find_library (ZMQPP_LIBRARY NAMES zmqpp) - find_path (JSONCPP_INCLUDE_DIR json.h) find_library (JSONCPP_LIBRARY NAMES jsoncpp) @@ -31,6 +29,6 @@ find_package (Boost REQUIRED date_time filesystem) include_directories (include) add_executable (yacas-kernel src/main.cpp src/yacas_kernel.cpp src/yacas_engine.cpp src/hmac_sha256.cpp src/base64.cpp) -target_link_libraries (yacas-kernel libyacas ${ZMQPP_LIBRARY} ${ZEROMQ_LIBRARY} ${JSONCPP_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY} ${Boost_LIBRARIES} pthread ${CMAKE_DL_LIBS}) +target_link_libraries (yacas-kernel libyacas ${ZEROMQ_LIBRARY} ${JSONCPP_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY} ${Boost_LIBRARIES} pthread ${CMAKE_DL_LIBS}) install (TARGETS yacas-kernel DESTINATION ${CMAKE_INSTALL_BINDIR}) diff --git a/cyacas/yacas-kernel/include/yacas_engine.hpp b/cyacas/yacas-kernel/include/yacas_engine.hpp index 16a1ac800..b0929ece8 100644 --- a/cyacas/yacas-kernel/include/yacas_engine.hpp +++ b/cyacas/yacas-kernel/include/yacas_engine.hpp @@ -27,7 +27,7 @@ #include "yacas/yacas.h" -#include +#include #include #include @@ -40,7 +40,7 @@ class YacasEngine { public: YacasEngine(const std::string& scripts_path, - const zmqpp::context& ctx, + zmq::context_t& ctx, const std::string& endpoint = "inproc://engine"); ~YacasEngine(); @@ -65,7 +65,7 @@ class YacasEngine { std::thread* _worker_thread; - zmqpp::socket _socket; + zmq::socket_t _socket; std::atomic _shutdown; }; diff --git a/cyacas/yacas-kernel/include/yacas_kernel.hpp b/cyacas/yacas-kernel/include/yacas_kernel.hpp index 91d36ac01..1cc4eabf9 100644 --- a/cyacas/yacas-kernel/include/yacas_kernel.hpp +++ b/cyacas/yacas-kernel/include/yacas_kernel.hpp @@ -30,8 +30,8 @@ #include #include -#include - +#include +#include #include #include @@ -60,12 +60,12 @@ class YacasKernel : NonCopyable { class Request : NonCopyable { public: - Request(const Session& session, const zmqpp::message& msg); + Request(const Session& session, const zmq::multipart_t& msg); const Json::Value& header() const { return _header; } const Json::Value& content() const { return _content; } - void reply(zmqpp::socket&, + void reply(zmq::socket_t&, const std::string& type, const Json::Value& content) const; @@ -78,19 +78,19 @@ class YacasKernel : NonCopyable { }; void _handle_shell(const std::shared_ptr& request); - void _handle_engine(const zmqpp::message& msg); + void _handle_engine(const zmq::multipart_t& msg); Session _session; - zmqpp::context _ctx; + zmq::context_t _ctx; - zmqpp::socket _hb_socket; - zmqpp::socket _iopub_socket; - zmqpp::socket _control_socket; - zmqpp::socket _stdin_socket; - zmqpp::socket _shell_socket; + zmq::socket_t _hb_socket; + zmq::socket_t _iopub_socket; + zmq::socket_t _control_socket; + zmq::socket_t _stdin_socket; + zmq::socket_t _shell_socket; - zmqpp::socket _engine_socket; + zmq::socket_t _engine_socket; unsigned long _execution_count; diff --git a/cyacas/yacas-kernel/src/yacas_engine.cpp b/cyacas/yacas-kernel/src/yacas_engine.cpp index 6ed60ca3b..3226b113c 100644 --- a/cyacas/yacas-kernel/src/yacas_engine.cpp +++ b/cyacas/yacas-kernel/src/yacas_engine.cpp @@ -22,15 +22,16 @@ * Created on November 7, 2015, 12:52 PM */ -#include - #include "yacas_engine.hpp" +#include +#include + YacasEngine::YacasEngine(const std::string& scripts_path, - const zmqpp::context& ctx, + zmq::context_t& ctx, const std::string& endpoint) : _yacas(_side_effects), - _socket(ctx, zmqpp::socket_type::pair), + _socket(ctx, zmq::socket_type::pair), _shutdown(false) { _yacas.Evaluate(std::string("DefaultDirectory(\"") + scripts_path + @@ -90,11 +91,11 @@ void YacasEngine::_worker() Json::Value calculate_content; calculate_content["id"] = Json::Value::UInt64(ti.id); calculate_content["expr"] = ti.expr; - zmqpp::message status_msg; - status_msg << "calculate" - << Json::writeString(Json::StreamWriterBuilder(), - calculate_content); - _socket.send(status_msg); + zmq::multipart_t status_msg; + status_msg.addstr("calculate"); + status_msg.addstr(Json::writeString(Json::StreamWriterBuilder(), + calculate_content)); + status_msg.send(_socket); _side_effects.clear(); _side_effects.str(""); @@ -111,10 +112,10 @@ void YacasEngine::_worker() result_content["side_effects"] = _side_effects.str(); - zmqpp::message result_msg; - result_msg << "result" - << Json::writeString(Json::StreamWriterBuilder(), - result_content); - _socket.send(result_msg); + zmq::multipart_t result_msg; + result_msg.addstr("result"); + result_msg.addstr(Json::writeString(Json::StreamWriterBuilder(), + result_content)); + result_msg.send(_socket); } } diff --git a/cyacas/yacas-kernel/src/yacas_kernel.cpp b/cyacas/yacas-kernel/src/yacas_kernel.cpp index d8f35914f..f2667d02c 100644 --- a/cyacas/yacas-kernel/src/yacas_kernel.cpp +++ b/cyacas/yacas-kernel/src/yacas_kernel.cpp @@ -30,8 +30,6 @@ #include #include -#include -#include #include #include #include @@ -48,12 +46,12 @@ namespace { YacasKernel::YacasKernel(const std::string& scripts_path, const Json::Value& config) : _session(config["key"].asString()), - _hb_socket(_ctx, zmqpp::socket_type::reply), - _iopub_socket(_ctx, zmqpp::socket_type::publish), - _control_socket(_ctx, zmqpp::socket_type::router), - _stdin_socket(_ctx, zmqpp::socket_type::router), - _shell_socket(_ctx, zmqpp::socket_type::router), - _engine_socket(_ctx, zmqpp::socket_type::pair), + _hb_socket(_ctx, zmq::socket_type::rep), + _iopub_socket(_ctx, zmq::socket_type::pub), + _control_socket(_ctx, zmq::socket_type::router), + _stdin_socket(_ctx, zmq::socket_type::router), + _shell_socket(_ctx, zmq::socket_type::router), + _engine_socket(_ctx, zmq::socket_type::pair), _execution_count(1), _engine(scripts_path, _ctx, "inproc://engine"), _tex_output(true), @@ -82,33 +80,35 @@ YacasKernel::YacasKernel(const std::string& scripts_path, void YacasKernel::run() { - zmqpp::poller poller; - - poller.add(_hb_socket); - poller.add(_control_socket); - poller.add(_stdin_socket); - poller.add(_shell_socket); - poller.add(_iopub_socket); - poller.add(_engine_socket); + std::vector items{ + {_hb_socket, 0, ZMQ_POLLIN, 0}, + {_control_socket, 0, ZMQ_POLLIN, 0}, + {_stdin_socket, 0, ZMQ_POLLIN, 0}, + {_shell_socket, 0, ZMQ_POLLIN, 0}, + {_iopub_socket, 0, ZMQ_POLLIN, 0}, + {_engine_socket, 0, ZMQ_POLLIN, 0} + }; for (;;) { - poller.poll(); + zmq::poll(items); + + std::cerr << "toratoratora" << std::endl; - if (poller.has_input(_hb_socket)) { - zmqpp::message msg; - _hb_socket.receive(msg); + if (items[0].revents & ZMQ_POLLIN) { // heartbeat + zmq::message_t msg; + _hb_socket.recv(msg); _hb_socket.send(msg); } - if (poller.has_input(_shell_socket)) { - zmqpp::message msg; - _shell_socket.receive(msg); + if (items[3].revents & ZMQ_POLLIN) { // shell + zmq::multipart_t msg; + msg.recv(_shell_socket); _handle_shell(std::make_shared(_session, msg)); } - if (poller.has_input(_control_socket)) { - zmqpp::message msg; - _control_socket.receive(msg); + if (items[1].revents & ZMQ_POLLIN) { // control + zmq::multipart_t msg; + msg.recv(_control_socket); Request request(_session, msg); if (request.header()["msg_type"].asString() == "shutdown_request") @@ -118,14 +118,14 @@ void YacasKernel::run() if (_shutdown) return; - if (poller.has_input(_stdin_socket)) { - zmqpp::message msg; - _stdin_socket.receive(msg); + if (items[2].revents & ZMQ_POLLIN) { // stdin + zmq::message_t msg; + _stdin_socket.recv(msg); } - if (poller.has_input(_engine_socket)) { - zmqpp::message msg; - _engine_socket.receive(msg); + if (items[5].revents & ZMQ_POLLIN) { // engine + zmq::multipart_t msg; + msg.recv(_engine_socket); _handle_engine(msg); } } @@ -138,17 +138,13 @@ YacasKernel::Session::Session(const std::string& key) : } YacasKernel::Request::Request(const Session& session, - const zmqpp::message& msg) : + const zmq::multipart_t& msg) : _session(session) { - std::string header_buf; - msg.get(header_buf, 3); - std::string parent_header_buf; - msg.get(parent_header_buf, 4); - std::string metadata_buf; - msg.get(metadata_buf, 5); - std::string content_buf; - msg.get(content_buf, 6); + const std::string header_buf{msg.peekstr(3)}; + const std::string parent_header_buf{msg.peekstr(4)}; + const std::string metadata_buf{msg.peekstr(5)}; + const std::string content_buf{msg.peekstr(6)}; HMAC_SHA256 auth(_session.auth()); @@ -157,13 +153,12 @@ YacasKernel::Request::Request(const Session& session, auth.update(metadata_buf); auth.update(content_buf); - std::string signature_buf; - msg.get(signature_buf, 2); + std::string signature_buf{msg.peekstr(2)}; if (auth.hexdigest() != signature_buf) throw std::runtime_error("invalid signature"); - msg.get(_identities_buf, 0); + _identities_buf = msg.peekstr(0); Json::Reader reader; @@ -172,7 +167,7 @@ YacasKernel::Request::Request(const Session& session, reader.parse(metadata_buf, _metadata); } -void YacasKernel::Request::reply(zmqpp::socket& socket, +void YacasKernel::Request::reply(zmq::socket_t& socket, const std::string& msg_type, const Json::Value& content) const { @@ -199,16 +194,17 @@ void YacasKernel::Request::reply(zmqpp::socket& socket, auth.update(metadata_buf); auth.update(content_buf); - zmqpp::message msg; - msg.add(_identities_buf); - msg.add(""); - msg.add(auth.hexdigest()); - msg.add(header_buf); - msg.add(parent_header_buf); - msg.add(metadata_buf); - msg.add(content_buf); + zmq::multipart_t msg; - socket.send(msg); + msg.addstr(_identities_buf); + msg.addstr(""); + msg.addstr(auth.hexdigest()); + msg.addstr(header_buf); + msg.addstr(parent_header_buf); + msg.addstr(metadata_buf); + msg.addstr(content_buf); + + msg.send(socket); } void YacasKernel::_handle_shell(const std::shared_ptr& request) @@ -316,13 +312,10 @@ void YacasKernel::_handle_shell(const std::shared_ptr& request) } } -void YacasKernel::_handle_engine(const zmqpp::message& msg) +void YacasKernel::_handle_engine(const zmq::multipart_t& msg) { - std::string msg_type; - msg.get(msg_type, 0); - - std::string content_buf; - msg.get(content_buf, 1); + std::string msg_type{msg.peekstr(0)}; + std::string content_buf{msg.peekstr(1)}; Json::Value content; Json::Reader().parse(content_buf, content);