#include #include "config.hpp" #include "net.hpp" NodeIO::NodeIO(tcp::socket &&socket) : sock(std::move(socket)) { } uint8_t *NodeIO::request_frame() { if (frames_available.empty()) { // Allocate a new frame. Note that this memory will (at this // time) never get deallocated. In theory, we could deallocate // it in return_frame, but if a certain number of frames were // allocated here, it means we had that much data in flight // (queued but not accepted for sending by the OS), and we're // likely to need that much again. Subsequent messages will // _reuse_ the allocated data, though, so the used memory won't // grow forever, and will be limited to the amount of in-flight // data needed. return new uint8_t[MAXCHUNKSIZE]; } // Copy the pointer to the frame out of the deque and remove it from // the deque. Note this is _not_ taking the address of the element // *in* the deque (and then popping it, which would invalidate that // pointer). frame_deque_lock.lock(); uint8_t *frame = frames_available.back(); frames_available.pop_back(); frame_deque_lock.unlock(); return frame; } void NodeIO::return_frame(uint8_t *frame) { if (!frame) return; // We push the frame back on to the end of the deque so that it will // be the next one used. This may lead to better cache behaviour? frame_deque_lock.lock(); frames_available.push_back(frame); frame_deque_lock.unlock(); } void NodeIO::send_header_data(uint64_t header, uint8_t *data, size_t len) { std::vector tosend; // Put the header into the deque so it's in memory at a stable // address during the async write header_deque_lock.lock(); headers_inflight.push_back(header); uint64_t *headerp = &(headers_inflight.back()); header_deque_lock.unlock(); tosend.push_back(boost::asio::buffer(headerp, 5)); if (data != NULL && len > 0) { tosend.push_back(boost::asio::buffer(data, len)); } boost::asio::async_write(sock, tosend, [this, headerp, data](boost::system::error_code, std::size_t){ // When the write completes, pop the header from the deque // (which should now be in the front) header_deque_lock.lock(); assert(!headers_inflight.empty() && &(headers_inflight.front()) == headerp); headers_inflight.pop_front(); header_deque_lock.unlock(); // And return the frame return_frame(data); }); } void NodeIO::send_epoch(uint32_t epoch_num) { uint64_t header = (uint64_t(epoch_num) << 8) + 0x00; send_header_data(header, NULL, 0); } void NodeIO::send_message_header(uint32_t tot_message_len) { uint64_t header = (uint64_t(tot_message_len) << 8) + 0x01; send_header_data(header, NULL, 0); // If we're sending a new message header, we have to have finished // sending the previous message. assert(chunksize_inflight == msgsize_inflight); msgsize_inflight = tot_message_len; chunksize_inflight = 0; } void NodeIO::send_chunk(uint8_t *data, uint32_t chunk_len) { assert(chunk_len <= MAXCHUNKSIZE); uint64_t header = (uint64_t(chunk_len) << 8) + 0x02; send_header_data(header, data, chunk_len); chunksize_inflight += chunk_len; assert(chunksize_inflight <= msgsize_inflight); } bool NodeIO::recv_header(uint64_t &header) { header = 0; try { boost::asio::read(sock, boost::asio::buffer(&header, 5)); } catch (...) { return false; } return true; } bool NodeIO::recv_chunk(uint64_t header, uint8_t *&data, size_t &len) { len = 0; data = NULL; assert((header & 0xff) == 0x02); size_t datalen = header >> 8; if (datalen > MAXCHUNKSIZE) { return false; } try { boost::asio::read(sock, boost::asio::buffer(receive_frame, datalen)); } catch (...) { return false; } data = receive_frame; len = datalen; return true; } NetIO::NetIO(boost::asio::io_context &io_context, const Config &config) : conf(config), myconf(config.nodes[config.my_node_num]) { num_nodes = conf.nodes.size(); nodeios.resize(num_nodes); me = conf.my_node_num; // Node number n will accept connections from nodes 0, ..., n-1 and // make connections to nodes n+1, ..., num_nodes-1. This is all // single threaded, but it doesn't deadlock because node 0 isn't // waiting for any incoming connections, so it immediately makes // outgoing connections. When it connects to node 1, that node // accepts its (only) incoming connection, and then starts making // its outgoing connections, etc. tcp::resolver resolver(io_context); tcp::acceptor acceptor(io_context, resolver.resolve(myconf.listenhost, myconf.listenport)->endpoint()); for(size_t i=0; i= num_nodes) { std::cerr << "Received bad node number\n"; } else { nodeios[node_num].emplace(std::move(nodesock)); #ifdef VERBOSE_NET std::cerr << "Received connection from " << config.nodes[node_num].name << "\n"; #endif } } for(size_t i=me+1; i