|
@@ -3,11 +3,137 @@
|
|
|
#include "config.hpp"
|
|
|
#include "net.hpp"
|
|
|
|
|
|
+NodeIO::NodeIO(tcp::socket &&socket) : sock(std::move(socket))
|
|
|
+{
|
|
|
+}
|
|
|
+
|
|
|
+uint8_t *NodeIO::request_frame()
|
|
|
+{
|
|
|
+ if (frames_available.empty()) {
|
|
|
+ // Allocate a new frame. Note that this memory will (at this
|
|
|
+ // time) never get deallocated. In theory, we could deallocate
|
|
|
+ // it in return_frame, but if a certain number of frames were
|
|
|
+ // allocated here, it means we had that much data in flight
|
|
|
+ // (queued but not accepted for sending by the OS), and we're
|
|
|
+ // likely to need that much again. Subsequent messages will
|
|
|
+ // _reuse_ the allocated data, though, so the used memory won't
|
|
|
+ // grow forever, and will be limited to the amount of in-flight
|
|
|
+ // data needed.
|
|
|
+ return new uint8_t[MAXCHUNKSIZE];
|
|
|
+ }
|
|
|
+ // Copy the pointer to the frame out of the deque and remove it from
|
|
|
+ // the deque. Note this is _not_ taking the address of the element
|
|
|
+ // *in* the deque (and then popping it, which would invalidate that
|
|
|
+ // pointer).
|
|
|
+ frame_deque_lock.lock();
|
|
|
+ uint8_t *frame = frames_available.back();
|
|
|
+ frames_available.pop_back();
|
|
|
+ frame_deque_lock.unlock();
|
|
|
+ return frame;
|
|
|
+}
|
|
|
+
|
|
|
+void NodeIO::return_frame(uint8_t *frame)
|
|
|
+{
|
|
|
+ if (!frame) return;
|
|
|
+
|
|
|
+ // We push the frame back on to the end of the deque so that it will
|
|
|
+ // be the next one used. This may lead to better cache behaviour?
|
|
|
+ frame_deque_lock.lock();
|
|
|
+ frames_available.push_back(frame);
|
|
|
+ frame_deque_lock.unlock();
|
|
|
+}
|
|
|
+
|
|
|
+void NodeIO::send_header_data(uint64_t header, uint8_t *data, size_t len)
|
|
|
+{
|
|
|
+ std::vector<boost::asio::const_buffer> tosend;
|
|
|
+
|
|
|
+ // Put the header into the deque so it's in memory at a stable
|
|
|
+ // address during the async write
|
|
|
+ header_deque_lock.lock();
|
|
|
+ headers_inflight.push_back(header);
|
|
|
+
|
|
|
+ uint64_t *headerp = &(headers_inflight.back());
|
|
|
+ header_deque_lock.unlock();
|
|
|
+ tosend.push_back(boost::asio::buffer(headerp, 5));
|
|
|
+ if (data != NULL && len > 0) {
|
|
|
+ tosend.push_back(boost::asio::buffer(data, len));
|
|
|
+ }
|
|
|
+ boost::asio::async_write(sock, tosend,
|
|
|
+ [this, headerp, data](boost::system::error_code, std::size_t){
|
|
|
+ // When the write completes, pop the header from the deque
|
|
|
+ // (which should now be in the front)
|
|
|
+ header_deque_lock.lock();
|
|
|
+ assert(!headers_inflight.empty() &&
|
|
|
+ &(headers_inflight.front()) == headerp);
|
|
|
+ headers_inflight.pop_front();
|
|
|
+ header_deque_lock.unlock();
|
|
|
+ // And return the frame
|
|
|
+ return_frame(data);
|
|
|
+ });
|
|
|
+}
|
|
|
+
|
|
|
+void NodeIO::send_epoch(uint32_t epoch_num)
|
|
|
+{
|
|
|
+ uint64_t header = (uint64_t(epoch_num) << 8) + 0x00;
|
|
|
+ send_header_data(header, NULL, 0);
|
|
|
+}
|
|
|
+
|
|
|
+void NodeIO::send_message_header(uint32_t tot_message_len)
|
|
|
+{
|
|
|
+ uint64_t header = (uint64_t(tot_message_len) << 8) + 0x01;
|
|
|
+ send_header_data(header, NULL, 0);
|
|
|
+ // If we're sending a new message header, we have to have finished
|
|
|
+ // sending the previous message.
|
|
|
+ assert(chunksize_inflight == msgsize_inflight);
|
|
|
+ msgsize_inflight = tot_message_len;
|
|
|
+ chunksize_inflight = 0;
|
|
|
+}
|
|
|
+
|
|
|
+void NodeIO::send_chunk(uint8_t *data, uint32_t chunk_len)
|
|
|
+{
|
|
|
+ assert(chunk_len <= MAXCHUNKSIZE);
|
|
|
+ uint64_t header = (uint64_t(chunk_len) << 8) + 0x02;
|
|
|
+ send_header_data(header, data, chunk_len);
|
|
|
+ chunksize_inflight += chunk_len;
|
|
|
+ assert(chunksize_inflight <= msgsize_inflight);
|
|
|
+}
|
|
|
+
|
|
|
+bool NodeIO::recv_header(uint64_t &header)
|
|
|
+{
|
|
|
+ header = 0;
|
|
|
+ try {
|
|
|
+ boost::asio::read(sock, boost::asio::buffer(&header, 5));
|
|
|
+ } catch (...) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+}
|
|
|
+
|
|
|
+bool NodeIO::recv_chunk(uint64_t header, uint8_t *&data, size_t &len)
|
|
|
+{
|
|
|
+ len = 0;
|
|
|
+ data = NULL;
|
|
|
+ assert((header & 0xff) == 0x02);
|
|
|
+ size_t datalen = header >> 8;
|
|
|
+ if (datalen > MAXCHUNKSIZE) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ boost::asio::read(sock,
|
|
|
+ boost::asio::buffer(receive_frame, datalen));
|
|
|
+ } catch (...) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ data = receive_frame;
|
|
|
+ len = datalen;
|
|
|
+ return true;
|
|
|
+}
|
|
|
+
|
|
|
NetIO::NetIO(boost::asio::io_context &io_context, const Config &config)
|
|
|
: conf(config), myconf(config.nodes[config.my_node_num])
|
|
|
{
|
|
|
num_nodes = conf.nodes.size();
|
|
|
- nodesockets.resize(num_nodes);
|
|
|
+ nodeios.resize(num_nodes);
|
|
|
me = conf.my_node_num;
|
|
|
|
|
|
// Node number n will accept connections from nodes 0, ..., n-1 and
|
|
@@ -38,7 +164,7 @@ NetIO::NetIO(boost::asio::io_context &io_context, const Config &config)
|
|
|
if (node_num >= num_nodes) {
|
|
|
std::cerr << "Received bad node number\n";
|
|
|
} else {
|
|
|
- nodesockets[node_num] = std::move(nodesock);
|
|
|
+ nodeios[node_num].emplace(std::move(nodesock));
|
|
|
#ifdef VERBOSE_NET
|
|
|
std::cerr << "Received connection from " <<
|
|
|
config.nodes[node_num].name << "\n";
|
|
@@ -66,7 +192,7 @@ NetIO::NetIO(boost::asio::io_context &io_context, const Config &config)
|
|
|
unsigned short node_num = (unsigned short)me;
|
|
|
boost::asio::write(nodesock,
|
|
|
boost::asio::buffer(&node_num, sizeof(node_num)));
|
|
|
- nodesockets[i] = std::move(nodesock);
|
|
|
+ nodeios[i].emplace(std::move(nodesock));
|
|
|
#ifdef VERBOSE_NET
|
|
|
std::cerr << "Connected to " << config.nodes[i].name << "\n";
|
|
|
#endif
|