|
@@ -10,24 +10,51 @@ import nacl.public
|
|
|
import network
|
|
|
import dirauth
|
|
|
|
|
|
-class CircuitCellMsg(network.NetMsg):
|
|
|
+class RelayNetMsg(network.NetMsg):
|
|
|
+ """The subclass of NetMsg for messages between relays and either
|
|
|
+ relays or clients."""
|
|
|
+
|
|
|
+
|
|
|
+class RelayGetConsensusMsg(RelayNetMsg):
|
|
|
+ """The subclass of RelayNetMsg for fetching the consensus."""
|
|
|
+
|
|
|
+
|
|
|
+class RelayConsensusMsg(RelayNetMsg):
|
|
|
+ """The subclass of RelayNetMsg for returning the consensus."""
|
|
|
+
|
|
|
+ def __init__(self, consensus):
|
|
|
+ self.consensus = consensus
|
|
|
+
|
|
|
+
|
|
|
+class RelayRandomHopMsg(RelayNetMsg):
|
|
|
+ """A message used for testing, that hops from relay to relay
|
|
|
+ randomly until its TTL expires."""
|
|
|
+
|
|
|
+ def __init__(self, ttl):
|
|
|
+ self.ttl = ttl
|
|
|
+
|
|
|
+ def __str__(self):
|
|
|
+ return "RandomHop TTL=%d" % self.ttl
|
|
|
+
|
|
|
+
|
|
|
+class CircuitCellMsg(RelayNetMsg):
|
|
|
"""Send a message tagged with a circuit id."""
|
|
|
- def __init__(self, circuitid, msg):
|
|
|
+ def __init__(self, circuitid, cell):
|
|
|
self.circid = circuitid
|
|
|
- self.msg = msg
|
|
|
+ self.cell = cell
|
|
|
|
|
|
def __str__(self):
|
|
|
- return "C%d:%s" % (self.circid, self.msg)
|
|
|
+ return "C%d:%s" % (self.circid, self.cell)
|
|
|
|
|
|
|
|
|
-class MultiplexedCircuitConnection(network.Connection):
|
|
|
- """A class representing a connection between a relay and either a
|
|
|
+class Channel(network.Connection):
|
|
|
+ """A class representing a channel between a relay and either a
|
|
|
client or a relay, transporting cells from various circuits."""
|
|
|
def __init__(self):
|
|
|
super().__init__()
|
|
|
- # The CellRelay managing this MultiplexedCircuitConnection
|
|
|
+ # The CellRelay managing this Channel
|
|
|
self.cellrelay = None
|
|
|
- # The MultiplexedCircuitConnection at the other end
|
|
|
+ # The Channel at the other end
|
|
|
self.peer = None
|
|
|
# The function to call when the connection closes
|
|
|
self.closer = lambda: 0
|
|
@@ -41,52 +68,101 @@ class MultiplexedCircuitConnection(network.Connection):
|
|
|
self.peer.closed()
|
|
|
self.closed()
|
|
|
|
|
|
- def send_cell(self, circid, msg):
|
|
|
+ def send_cell(self, circid, cell):
|
|
|
"""Send the given message, tagged for the given circuit id."""
|
|
|
- cell = CircuitCellMsg(circid, msg)
|
|
|
- self.peer.received(self.cellrelay.myaddr, cell)
|
|
|
+ msg = CircuitCellMsg(circid, cell)
|
|
|
+ self.send_msg(msg)
|
|
|
+
|
|
|
+ def send_msg(self, msg):
|
|
|
+ """Send the given NetMsg on the channel."""
|
|
|
+ self.peer.received(self.cellrelay.myaddr, msg)
|
|
|
|
|
|
- def received(self, peeraddr, cell):
|
|
|
- """Callback when a cell is received from the network."""
|
|
|
- circid, msg = cell.circid, cell.msg
|
|
|
- print("received", msg, "on circuit", circid, "from", peeraddr)
|
|
|
+ def received(self, peeraddr, msg):
|
|
|
+ """Callback when a message is received from the network."""
|
|
|
+ if isinstance(msg, CircuitCellMsg):
|
|
|
+ circid, cell = msg.circid, msg.cell
|
|
|
+ self.cellrelay.received_cell(circid, cell, peeraddr, self.peer)
|
|
|
+ else:
|
|
|
+ self.cellrelay.received_msg(msg, peeraddr, self.peer)
|
|
|
|
|
|
|
|
|
class CellRelay:
|
|
|
- """The class that manages the connections to other relays and
|
|
|
- clients. Relays and clients both use this class to both create
|
|
|
- on-demand connections to relays, to gracefully handle the closing of
|
|
|
- connections, and to handle commands received over the
|
|
|
- connections."""
|
|
|
-
|
|
|
- def __init__(self, myaddr):
|
|
|
- # A dictionary of MultiplexedCircuitConnections to other hosts,
|
|
|
- # indexed by NetAddr
|
|
|
- self.connections = dict()
|
|
|
+ """The class that manages the channels to other relays and clients.
|
|
|
+ Relays and clients both use this class to both create on-demand
|
|
|
+ channels to relays, to gracefully handle the closing of channels,
|
|
|
+ and to handle commands received over the channels."""
|
|
|
+
|
|
|
+ def __init__(self, myaddr, dirauthaddrs):
|
|
|
+ # A dictionary of Channels to other hosts, indexed by NetAddr
|
|
|
+ self.channels = dict()
|
|
|
self.myaddr = myaddr
|
|
|
+ self.dirauthaddrs = dirauthaddrs
|
|
|
+ self.consensus = None
|
|
|
|
|
|
- def get_connection_to(self, addr):
|
|
|
- """Get the MultiplexedCircuitConnection connected to the given
|
|
|
- NetAddr, creating one if none exists right now."""
|
|
|
- if addr in self.connections:
|
|
|
- return self.connections[addr]
|
|
|
+ def get_consensus(self):
|
|
|
+ """Download a fresh consensus from a random dirauth."""
|
|
|
+ a = random.choice(self.dirauthaddrs)
|
|
|
+ c = network.thenetwork.connect(self, a)
|
|
|
+ self.consensus = c.getconsensus()
|
|
|
+ print('consensus downloaded:', self.consensus)
|
|
|
+ c.close()
|
|
|
+
|
|
|
+ def add_channel(self, channel, peeraddr):
|
|
|
+ """Add the given channel to the list of channels we are
|
|
|
+ managing. If we are already managing a channel to the same
|
|
|
+ peer, close it first."""
|
|
|
+ if peeraddr in self.channels:
|
|
|
+ self.channels[peeraddr].close()
|
|
|
+
|
|
|
+ channel.cellrelay = self
|
|
|
+ self.channels[peeraddr] = channel
|
|
|
+
|
|
|
+ def get_channel_to(self, addr):
|
|
|
+ """Get the Channel connected to the given NetAddr, creating one
|
|
|
+ if none exists right now."""
|
|
|
+ if addr in self.channels:
|
|
|
+ return self.channels[addr]
|
|
|
|
|
|
# Create the new connection
|
|
|
newconn = network.thenetwork.connect(self.myaddr, addr)
|
|
|
- self.connections[addr] = newconn
|
|
|
- newconn.closer = lambda: self.connections.pop(addr)
|
|
|
+ self.channels[addr] = newconn
|
|
|
+ newconn.closer = lambda: self.channels.pop(addr)
|
|
|
newconn.cellrelay = self
|
|
|
|
|
|
return newconn
|
|
|
|
|
|
+ def received_msg(self, msg, peeraddr, peer):
|
|
|
+ """Callback when a NetMsg not specific to a circuit is
|
|
|
+ received."""
|
|
|
+ print("Node %s received msg %s from %s" % (self.myaddr, msg, peeraddr))
|
|
|
+ if isinstance(msg, RelayRandomHopMsg):
|
|
|
+ if msg.ttl > 0:
|
|
|
+ # Pick a random next hop from the consensus
|
|
|
+ nexthop = random.choice(self.consensus.consdict['relays'])
|
|
|
+ nextaddr = nexthop.descdict['addr']
|
|
|
+ self.send_msg(RelayRandomHopMsg(msg.ttl-1), nextaddr)
|
|
|
+
|
|
|
+
|
|
|
+ def received_cell(self, circid, cell, peeraddr, peer):
|
|
|
+ """Callback with a circuit-specific cell is received."""
|
|
|
+ print("Node %s received cell on circ %d: %s from %s" % (self.myaddr, circid, cell, peeraddr))
|
|
|
+
|
|
|
+ def send_msg(self, msg, peeraddr):
|
|
|
+ """Send a message to the peer with the given address."""
|
|
|
+ conn = self.get_channel_to(peeraddr)
|
|
|
+ conn.send_msg(msg)
|
|
|
+
|
|
|
+ def send_cell(self, circid, cell, peeraddr):
|
|
|
+ """Send a cell on the given circuit to the peer with the given
|
|
|
+ address."""
|
|
|
+ conn = self.get_channel_to(peeraddr)
|
|
|
+ conn.send_cell(circid, cell)
|
|
|
+
|
|
|
|
|
|
class Relay(network.Server):
|
|
|
"""The class representing an onion relay."""
|
|
|
|
|
|
def __init__(self, dirauthaddrs, bw, flags):
|
|
|
- self.consensus = None
|
|
|
- self.dirauthaddrs = dirauthaddrs
|
|
|
-
|
|
|
# Create the identity and onion keys
|
|
|
self.idkey = nacl.signing.SigningKey.generate()
|
|
|
self.onionkey = nacl.public.PrivateKey.generate()
|
|
@@ -104,7 +180,7 @@ class Relay(network.Server):
|
|
|
network.thenetwork.wantepochticks(self, True)
|
|
|
|
|
|
# Create the CellRelay connection manager
|
|
|
- self.cellrelay = CellRelay(self.netaddr)
|
|
|
+ self.cellrelay = CellRelay(self.netaddr, dirauthaddrs)
|
|
|
|
|
|
self.uploaddesc()
|
|
|
|
|
@@ -112,10 +188,7 @@ class Relay(network.Server):
|
|
|
# Download the new consensus, which will have been created
|
|
|
# already since the dirauths' epoch_ending callbacks happened
|
|
|
# before the relays'.
|
|
|
- a = random.choice(self.dirauthaddrs)
|
|
|
- c = network.thenetwork.connect(self, a)
|
|
|
- self.consensus = c.getconsensus()
|
|
|
- c.close()
|
|
|
+ self.cellrelay.get_consensus()
|
|
|
|
|
|
def newepoch(self, epoch):
|
|
|
self.uploaddesc()
|
|
@@ -136,23 +209,26 @@ class Relay(network.Server):
|
|
|
descmsg = dirauth.DirAuthUploadDescMsg(desc)
|
|
|
|
|
|
# Upload them
|
|
|
- for a in self.dirauthaddrs:
|
|
|
+ for a in self.cellrelay.dirauthaddrs:
|
|
|
c = network.thenetwork.connect(self, a)
|
|
|
c.sendmsg(descmsg)
|
|
|
c.close()
|
|
|
|
|
|
def connected(self, peer):
|
|
|
"""Callback invoked when someone (client or relay) connects to
|
|
|
- us. Create a pair of linked MultiplexedCircuitConnections and
|
|
|
- return the peer half to the peer."""
|
|
|
+ us. Create a pair of linked Channels and return the peer half
|
|
|
+ to the peer."""
|
|
|
|
|
|
# Create the linked pair
|
|
|
- peerconn = MultiplexedCircuitConnection()
|
|
|
- ourconn = MultiplexedCircuitConnection()
|
|
|
- peerconn.peer = ourconn
|
|
|
- ourconn.peer = peerconn
|
|
|
+ peerchannel = Channel()
|
|
|
+ ourchannel = Channel()
|
|
|
+ peerchannel.peer = ourchannel
|
|
|
+ ourchannel.peer = peerchannel
|
|
|
+
|
|
|
+ # Add our channel to the CellRelay
|
|
|
+ self.cellrelay.add_channel(ourchannel, peer)
|
|
|
|
|
|
- return peerconn
|
|
|
+ return peerchannel
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
# Start some dirauths
|
|
@@ -174,6 +250,12 @@ if __name__ == '__main__':
|
|
|
bw = int(200000-(200000-25000)/3*math.log10(x))
|
|
|
relays.append(Relay(dirauthaddrs, bw, 0))
|
|
|
|
|
|
+ # The fallback relays are a hardcoded list of about 5% of the
|
|
|
+ # relays, used by clients for bootstrapping
|
|
|
+ numfallbackrelays = int(numrelays * 0.05) + 1
|
|
|
+ fallbackrelays = random.sample(relays, numfallbackrelays)
|
|
|
+ network.thenetwork.setfallbackrelays(fallbackrelays)
|
|
|
+
|
|
|
# Tick the epoch
|
|
|
network.thenetwork.nextepoch()
|
|
|
|
|
@@ -181,14 +263,12 @@ if __name__ == '__main__':
|
|
|
|
|
|
print('ticked; epoch=', network.thenetwork.getepoch())
|
|
|
|
|
|
- c = relays[3].cellrelay.get_connection_to(relays[3].consensus.consdict['relays'][5].descdict['addr'])
|
|
|
-
|
|
|
- c.send_cell(1, network.StringNetMsg("test"))
|
|
|
- c.close()
|
|
|
- c2 = relays[3].cellrelay.get_connection_to(relays[3].consensus.consdict['relays'][6].descdict['addr'])
|
|
|
- c = relays[3].cellrelay.get_connection_to(relays[3].consensus.consdict['relays'][5].descdict['addr'])
|
|
|
- c.send_cell(2, network.StringNetMsg("cell"))
|
|
|
- c3 = relays[3].cellrelay.get_connection_to(relays[3].consensus.consdict['relays'][1].descdict['addr'])
|
|
|
- c = relays[3].cellrelay.get_connection_to(relays[3].consensus.consdict['relays'][5].descdict['addr'])
|
|
|
- c.send_cell(3, network.StringNetMsg("again"))
|
|
|
- c.close()
|
|
|
+ relays[3].cellrelay.send_msg(RelayRandomHopMsg(30), relays[5].netaddr)
|
|
|
+
|
|
|
+ # See what channels exist
|
|
|
+ for r in relays:
|
|
|
+ print("%s: %s" % (r.netaddr, [ str(k) for k in r.cellrelay.channels.keys()]))
|
|
|
+ #relays[3].cellrelay.send_cell(1, network.StringNetMsg("test"), relays[3].consensus.consdict['relays'][5].descdict['addr'])
|
|
|
+ #relays[3].cellrelay.send_cell(2, network.StringNetMsg("cell"), relays[3].consensus.consdict['relays'][6].descdict['addr'])
|
|
|
+ #relays[3].cellrelay.send_cell(2, network.StringNetMsg("again"), relays[3].consensus.consdict['relays'][1].descdict['addr'])
|
|
|
+ #relays[3].cellrelay.send_cell(2, network.StringNetMsg("and again"), relays[3].consensus.consdict['relays'][5].descdict['addr'])
|