Browse Source

Merge remote-tracking branch 'tor-github/pr/28'

teor 4 years ago
parent
commit
49dceb9690
3 changed files with 323 additions and 296 deletions
  1. 2 1
      lib/chutney/TorNet.py
  2. 318 291
      lib/chutney/Traffic.py
  3. 3 4
      scripts/chutney_tests/verify.py

+ 2 - 1
lib/chutney/TorNet.py

@@ -1406,10 +1406,11 @@ def runConfigFile(verb, data):
     if verb in getTests():
         test_module = importlib.import_module("chutney_tests.{}".format(verb))
         try:
-            return test_module.run_test(network)
+            run_test = test_module.run_test
         except AttributeError as e:
             print("Error running test {!r}: {}".format(verb, e))
             return False
+        return run_test(network)
 
     # tell the user we don't know what their verb meant
     if not hasattr(network, verb):

+ 318 - 291
lib/chutney/Traffic.py

@@ -24,14 +24,44 @@ from __future__ import print_function
 
 import sys
 import socket
-import select
 import struct
 import errno
 import time
 import os
 
+import asyncore
+import asynchat
+
 from chutney.Debug import debug_flag, debug
 
+def note(s):
+    sys.stderr.write("NOTE: %s\n"%s)
+def warn(s):
+    sys.stderr.write("WARN: %s\n"%s)
+
+UNIQ_CTR = 0
+def uniq(s):
+    global UNIQ_CTR
+    UNIQ_CTR += 1
+    return "%s-%s"%(s,UNIQ_CTR)
+
+if sys.version_info[0] >= 3:
+    def byte_to_int(b):
+        return b
+else:
+    def byte_to_int(b):
+        return ord(b)
+
+def addr_to_family(addr):
+    for family in [socket.AF_INET, socket.AF_INET6]:
+        try:
+            socket.inet_pton(family, addr)
+            return family
+        except (socket.error, OSError):
+            pass
+
+    return socket.AF_INET
+
 def socks_cmd(addr_port):
     """
     Return a SOCKS command for connecting to addr_port.
@@ -54,27 +84,49 @@ def socks_cmd(addr_port):
         dnsname = dnsname.encode("ascii")
     return struct.pack('!BBH', ver, cmd, port) + addr + user + dnsname
 
-
 class TestSuite(object):
 
     """Keep a tab on how many tests are pending, how many have failed
     and how many have succeeded."""
 
     def __init__(self):
+        self.tests = {}
         self.not_done = 0
         self.successes = 0
         self.failures = 0
+        self.teststatus = {}
 
-    def add(self):
-        self.not_done += 1
+    def note(self, testname, status):
+        self.teststatus[testname] = status
 
-    def success(self):
-        self.not_done -= 1
-        self.successes += 1
-
-    def failure(self):
-        self.not_done -= 1
-        self.failures += 1
+    def add(self, name):
+        note("Registering %s"%name)
+        if name not in self.tests:
+            debug("Registering %s"%name)
+            self.not_done += 1
+            self.tests[name] = 'not done'
+        else:
+            warn("... already registered!")
+
+    def success(self, name):
+        note("Success for %s"%name)
+        if self.tests[name] == 'not done':
+            debug("Succeeded %s"%name)
+            self.tests[name] = 'success'
+            self.not_done -= 1
+            self.successes += 1
+        else:
+            warn("... status was %s"%self.tests.get(name))
+
+    def failure(self, name):
+        note("Failure for %s"%name)
+        if self.tests[name] == 'not done':
+            debug("Failed %s"%name)
+            self.tests[name] = 'failure'
+            self.not_done -= 1
+            self.failures += 1
+        else:
+            warn("... status was %s"%self.tests.get(name))
 
     def failure_count(self):
         return self.failures
@@ -83,107 +135,131 @@ class TestSuite(object):
         return self.not_done == 0
 
     def status(self):
-        return('%d/%d/%d' % (self.not_done, self.successes, self.failures))
-
-
-class Peer(object):
-
-    "Base class for Listener, Source and Sink."
-    LISTENER = 1
-    SOURCE = 2
-    SINK = 3
-
-    def __init__(self, ptype, tt, s=None):
-        self.type = ptype
-        self.tt = tt  # TrafficTester
-        if s is not None:
-            self.s = s
-        else:
-            self.s = socket.socket()
-            self.s.setblocking(False)
-
-    def fd(self):
-        return self.s.fileno()
-
-    def is_source(self):
-        return self.type == self.SOURCE
-
-    def is_sink(self):
-        return self.type == self.SINK
-
-
-class Listener(Peer):
+        return('%s: %d/%d/%d' % (self.tests, self.not_done, self.successes,
+                                 self.failures))
 
+class Listener(asyncore.dispatcher):
     "A TCP listener, binding, listening and accepting new connections."
 
     def __init__(self, tt, endpoint):
-        super(Listener, self).__init__(Peer.LISTENER, tt)
-        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        self.s.bind(endpoint)
-        self.s.listen(0)
-
-    def accept(self):
-        newsock, endpoint = self.s.accept()
-        debug("new client from %s:%s (fd=%d)" %
-              (endpoint[0], endpoint[1], newsock.fileno()))
-        self.tt.add(Sink(self.tt, newsock))
+        asyncore.dispatcher.__init__(self, map=tt.socket_map)
+        self.create_socket(addr_to_family(endpoint[0]), socket.SOCK_STREAM)
+        self.set_reuse_addr()
+        self.bind(endpoint)
+        self.listen(0)
+        self.tt = tt
+
+    def writable(self):
+        return False
 
+    def handle_accept(self):
+        # deprecated in python 3.2
+        pair = self.accept()
+        if pair is not None:
+            newsock, endpoint = pair
+            debug("new client from %s:%s (fd=%d)" %
+                  (endpoint[0], endpoint[1], newsock.fileno()))
+            self.tt.add_responder(newsock)
 
-class Sink(Peer):
+    def fileno(self):
+        return self.socket.fileno()
 
-    "A data sink, reading from its peer and verifying the data."
+class DataSource(object):
+    """A data source generates some number of bytes of data, and then
+       returns None.
 
-    def __init__(self, tt, s):
-        super(Sink, self).__init__(Peer.SINK, tt, s)
-        self.inbuf = b''
-        self.repetitions = self.tt.repetitions
+       For convenience, it conforms to the 'producer' api.
+    """
+    def __init__(self, data, repetitions=1):
+        self.data = data
+        self.repetitions = repetitions
+        self.sent_any = False
 
-    def on_readable(self):
-        """Invoked when the socket becomes readable.
-        Return 0 on finished, successful verification.
-               -1 on failed verification
-               >0 if more data needs to be read
-        """
-        return self.verify(self.tt.data)
+    def copy(self):
+        assert not self.sent_any
+        return DataSource(self.data, self.repetitions)
 
-    def verify(self, data):
-        # shortcut read when we don't ever expect any data
-        if self.repetitions == 0 or len(self.tt.data) == 0:
-            debug("no verification required - no data")
-            return 0
-        inp = self.s.recv(len(data) - len(self.inbuf))
-        debug("Verify: received %d bytes"% len(inp))
-        if len(inp) == 0:
-            debug("EOF on fd %s" % self.fd())
-            return -1
-        self.inbuf += inp
-        debug("successfully received (bytes=%d)" % len(self.inbuf))
-        while len(self.inbuf) >= len(data):
-            assert(len(self.inbuf) <= len(data) or self.repetitions > 1)
-            if self.inbuf[:len(data)] != data:
-                debug("receive comparison failed (bytes=%d)" % len(data))
-                return -1  # Failed verification.
-            # if we're not debugging, print a dot every dot_repetitions reps
-            elif (not debug_flag and self.tt.dot_repetitions > 0 and
-                  self.repetitions % self.tt.dot_repetitions == 0):
-                sys.stdout.write('.')
-                sys.stdout.flush()
-            # repeatedly check data against self.inbuf if required
-            debug("receive comparison success (bytes=%d)" % len(data))
-            self.inbuf = self.inbuf[len(data):]
-            debug("receive leftover bytes (bytes=%d)" % len(self.inbuf))
+    def more(self):
+        self.sent_any = True
+        if self.repetitions > 0:
             self.repetitions -= 1
-            debug("receive remaining repetitions (reps=%d)" % self.repetitions)
-        if self.repetitions == 0 and len(self.inbuf) == 0:
-            debug("successful verification")
-        # calculate the actual length of data remaining, including reps
-        debug("receive remaining bytes (bytes=%d)"
-              % (self.repetitions*len(data) - len(self.inbuf)))
-        return self.repetitions*len(data) - len(self.inbuf)
+            return self.data
+
+        return None
+
+class DataChecker(object):
+    """A data checker verifies its input against bytes in a stream."""
+    def __init__(self, source):
+        self.source = source
+        self.pending = b""
+        self.succeeded = False
+        self.failed = False
+
+    def consume(self, inp):
+        if self.failed:
+            return
+        if self.succeeded and len(inp):
+            self.succeeded = False
+            self.failed = True
+            return
+
+        while len(inp):
+            n = min(len(inp), len(self.pending))
+            if inp[:n] != self.pending[:n]:
+                self.failed = True
+                return
+            inp = inp[n:]
+            self.pending = self.pending[n:]
+            if not self.pending:
+                self.pending = self.source.more()
+
+                if self.pending is None:
+                    if len(inp):
+                        self.failed = True
+                    else:
+                        self.succeeded = True
+                    return
+
+class Sink(asynchat.async_chat):
+    "A data sink, reading from its peer and verifying the data."
+    def __init__(self, sock, tt):
+        asynchat.async_chat.__init__(self, sock, map=tt.socket_map)
+        self.set_terminator(None)
+        self.tt = tt
+        self.data_checker = DataChecker(tt.data_source.copy())
+        self.testname = uniq("recv-data")
 
+    def get_test_names(self):
+        return [ self.testname ]
 
-class Source(Peer):
+    def collect_incoming_data(self, inp):
+        # shortcut read when we don't ever expect any data
 
+        debug("successfully received (bytes=%d)" % len(inp))
+        self.data_checker.consume(inp)
+        if self.data_checker.succeeded:
+            debug("successful verification")
+            self.close()
+            self.tt.success(self.testname)
+        elif self.data_checker.failed:
+            debug("receive comparison failed")
+            self.tt.failure(self.testname)
+            self.close()
+
+    def fileno(self):
+        return self.socket.fileno()
+
+class CloseSourceProducer:
+    """Helper: when this producer is returned, a source is successful."""
+    def __init__(self, source):
+        self.source = source
+
+    def more(self):
+        self.source.note("Flushed")
+        self.source.sent_ok()
+        return b""
+
+class Source(asynchat.async_chat):
     """A data source, connecting to a TCP server, optionally over a
     SOCKS proxy, sending data."""
     NOT_CONNECTED = 0
@@ -191,137 +267,110 @@ class Source(Peer):
     CONNECTING_THROUGH_PROXY = 2
     CONNECTED = 5
 
-    def __init__(self, tt, server, buf, proxy=None, repetitions=1):
-        super(Source, self).__init__(Peer.SOURCE, tt)
-        self.state = self.NOT_CONNECTED
-        self.data = buf
-        self.outbuf = b''
+    def __init__(self, tt, server, proxy=None):
+        asynchat.async_chat.__init__(self, map=tt.socket_map)
+        self.data_source = tt.data_source.copy()
         self.inbuf = b''
         self.proxy = proxy
-        self.repetitions = repetitions
-        self._sent_no_bytes = 0
-        # sanity checks
-        if len(self.data) == 0:
-            self.repetitions = 0
-        if self.repetitions == 0:
-            self.data = {}
-        self.connect(server)
-
-    def connect(self, endpoint):
-        self.dest = endpoint
+        self.server = server
+        self.tt = tt
+        self.testname = uniq("send-data")
+
+        self.set_terminator(None)
+        dest = (self.proxy or self.server)
+        self.create_socket(addr_to_family(dest[0]), socket.SOCK_STREAM)
+        debug("socket %d connecting to %r..."%(self.fileno(),dest))
         self.state = self.CONNECTING
-        dest = self.proxy or self.dest
-        try:
-            debug("socket %d connecting to %r..."%(self.fd(),dest))
-            self.s.connect(dest)
-        except socket.error as e:
-            if e.errno != errno.EINPROGRESS:
-                raise
-
-    def on_readable(self):
-        """Invoked when the socket becomes readable.
-        Return -1 on failure
-               >0 if more data needs to be read or written
-        """
+        self.connect(dest)
+
+    def get_test_names(self):
+        return [ self.testname ]
+
+    def sent_ok(self):
+        self.tt.success(self.testname)
+
+    def note(self, s):
+        self.tt.tests.note(self.testname, s)
+
+    def handle_connect(self):
+        if self.proxy:
+            self.state = self.CONNECTING_THROUGH_PROXY
+            self.note("connected, sending socks handshake")
+            self.push(socks_cmd(self.server))
+        else:
+            self.state = self.CONNECTED
+            self.push_output()
+
+    def collect_incoming_data(self, data):
+        self.inbuf += data
         if self.state == self.CONNECTING_THROUGH_PROXY:
-            inp = self.s.recv(8 - len(self.inbuf))
-            debug("-- connecting through proxy, got %d bytes"%len(inp))
-            if len(inp) == 0:
-                debug("EOF on fd %d"%self.fd())
-                return -1
-            self.inbuf += inp
-            if len(self.inbuf) == 8:
+            if len(self.inbuf) >= 8:
                 if self.inbuf[:2] == b'\x00\x5a':
-                    debug("proxy handshake successful (fd=%d)" % self.fd())
+                    self.note("proxy handshake successful")
                     self.state = self.CONNECTED
-                    self.inbuf = b''
-                    debug("successfully connected (fd=%d)" % self.fd())
-                    # if we have no reps or no data, skip sending actual data
-                    if self.want_to_write():
-                        return 1    # Keep us around for writing.
-                    else:
-                        # shortcut write when we don't ever expect any data
-                        debug("no connection required - no data")
-                        return 0
+                    debug("successfully connected (fd=%d)" % self.fileno())
+                    self.inbuf = self.inbuf[8:]
+                    self.push_output()
                 else:
                     debug("proxy handshake failed (0x%x)! (fd=%d)" %
-                          (ord(self.inbuf[1]), self.fd()))
+                          (byte_to_int(self.inbuf[1]), self.fileno()))
                     self.state = self.NOT_CONNECTED
-                    return -1
-            assert(8 - len(self.inbuf) > 0)
-            return 8 - len(self.inbuf)
-        return self.want_to_write()  # Keep us around for writing if needed
-
-    def want_to_write(self):
-        if self.state == self.CONNECTING:
-            return True
-        if len(self.outbuf) > 0:
-            return True
-        if (self.state == self.CONNECTED and
-            self.repetitions > 0 and
-            len(self.data) > 0):
-            return True
-        return False
+                    self.close()
 
-    def on_writable(self):
-        """Invoked when the socket becomes writable.
-        Return 0 when done writing
-               -1 on failure (like connection refused)
-               >0 if more data needs to be written
-        """
-        if self.state == self.CONNECTING:
-            if self.proxy is None:
-                self.state = self.CONNECTED
-                debug("successfully connected (fd=%d)" % self.fd())
-            else:
-                self.state = self.CONNECTING_THROUGH_PROXY
-                self.outbuf = socks_cmd(self.dest)
-                # we write socks_cmd() to the proxy, then read the response
-                # if we get the correct response, we're CONNECTED
-        if self.state == self.CONNECTED:
-            # repeat self.data into self.outbuf if required
-            if (len(self.outbuf) < len(self.data) and self.repetitions > 0):
-                self.outbuf += self.data
-                self.repetitions -= 1
-                debug("adding more data to send (bytes=%d)" % len(self.data))
-                debug("now have data to send (bytes=%d)" % len(self.outbuf))
-                debug("send repetitions remaining (reps=%d)"
-                      % self.repetitions)
-        try:
-            n = self.s.send(self.outbuf)
-        except socket.error as e:
-            if e.errno == errno.ECONNREFUSED:
-                debug("connection refused (fd=%d)" % self.fd())
-                return -1
-            raise
-        # sometimes, this debug statement prints 0
-        # it should print length of the data sent
-        # but the code works as long as this doesn't keep on happening
-        if n > 0:
-            debug("successfully sent (bytes=%d)" % n)
-            self._sent_no_bytes = 0
-        else:
-            debug("BUG: sent no bytes (out of %d; state is %s)"% (len(self.outbuf), self.state))
-            self._sent_no_bytes += 1
-            # We can't retry too fast, otherwise clients burn all their HSDirs
-            if self._sent_no_bytes >= 2:
-                print("Sent no data %d times. Stalled." %
-                      (self._sent_no_bytes))
-                return -1
-            time.sleep(5)
-        self.outbuf = self.outbuf[n:]
-        if self.state == self.CONNECTING_THROUGH_PROXY:
-            return 1  # Keep us around.
-        debug("bytes remaining on outbuf (bytes=%d)" % len(self.outbuf))
-        # calculate the actual length of data remaining, including reps
-        # When 0, we're being removed.
-        debug("bytes remaining overall (bytes=%d)"
-              % (self.repetitions*len(self.data) + len(self.outbuf)))
-        return self.repetitions*len(self.data) + len(self.outbuf)
+    def push_output(self):
+        self.note("pushed output")
+        self.push_with_producer(self.data_source)
+
+        self.push_with_producer(CloseSourceProducer(self))
+
+    def fileno(self):
+        return self.socket.fileno()
+
+class EchoServer(asynchat.async_chat):
+    def __init__(self, sock, tt):
+        asynchat.async_chat.__init__(self, sock, map=tt.socket_map)
+        self.set_terminator(None)
+        self.tt = tt
+        self.am_closing = False
+
+    def collect_incoming_data(self, data):
+        self.push(data)
 
+class EchoClient(Source):
+    def __init__(self, tt, server, proxy=None):
+        Source.__init__(self, tt, server, proxy)
+        self.data_checker = DataChecker(tt.data_source.copy())
+        self.testname_check = uniq("check")
+        self.am_closing = False
 
-class TrafficTester():
+    def enote(self, s):
+        self.tt.tests.note(self.testname_check, s)
 
+    def get_test_names(self):
+        return [ self.testname, self.testname_check ]
+
+    def collect_incoming_data(self, data):
+        if self.state == self.CONNECTING_THROUGH_PROXY:
+            Source.collect_incoming_data(self, data)
+            if self.state == self.CONNECTING_THROUGH_PROXY:
+                return
+            data = self.inbuf
+            self.inbuf = b""
+
+        self.data_checker.consume(data)
+        self.enote("consumed some")
+
+        if self.data_checker.succeeded:
+            self.enote("successful verification")
+            debug("successful verification")
+            self.close()
+            self.tt.success(self.testname_check)
+        elif self.data_checker.failed:
+            debug("receive comparison failed")
+            self.tt.failure(self.testname_check)
+            self.close()
+
+class TrafficTester(object):
     """
     Hang on select.select() and dispatch to Sources and Sinks.
     Time out after self.timeout seconds.
@@ -332,101 +381,79 @@ class TrafficTester():
 
     def __init__(self,
                  endpoint,
-                 data={},
+                 data=b"",
                  timeout=3,
                  repetitions=1,
-                 dot_repetitions=0):
+                 dot_repetitions=0,
+                 chat_type="Echo"):
+        if chat_type == "Echo":
+            self.client_class = EchoClient
+            self.responder_class = EchoServer
+        else:
+            self.client_class = Source
+            self.responder_class = Sink
+
+        self.socket_map = {}
+
         self.listener = Listener(self, endpoint)
         self.pending_close = []
         self.timeout = timeout
         self.tests = TestSuite()
-        self.data = data
-        self.repetitions = repetitions
+        self.data_source = DataSource(data, repetitions)
+
         # sanity checks
-        if len(self.data) == 0:
-            self.repetitions = 0
-        if self.repetitions == 0:
-            self.data = {}
         self.dot_repetitions = dot_repetitions
-        debug("listener fd=%d" % self.listener.fd())
-        self.peers = {}  # fd:Peer
+        debug("listener fd=%d" % self.listener.fileno())
 
-    def sinks(self):
-        return self.get_by_ptype(Peer.SINK)
+    def add(self, item):
+        """Register a single item."""
+        # We used to hold on to these items for their fds, but now
+        # asyncore manages them for us.
+        if hasattr(item, "get_test_names"):
+            for name in item.get_test_names():
+                self.tests.add(name)
 
-    def sources(self):
-        return self.get_by_ptype(Peer.SOURCE)
+    def add_client(self, server, proxy=None):
+        source = self.client_class(self, server, proxy)
+        self.add(source)
 
-    def get_by_ptype(self, ptype):
-        return list(filter(lambda p: p.type == ptype, self.peers.values()))
+    def add_responder(self, socket):
+        sink = self.responder_class(socket, self)
+        self.add(sink)
 
-    def add(self, peer):
-        self.peers[peer.fd()] = peer
-        if peer.is_source():
-            self.tests.add()
+    def success(self, name):
+        """Declare that a single test has passed."""
+        self.tests.success(name)
 
-    def remove(self, peer):
-        self.peers.pop(peer.fd())
-        self.pending_close.append(peer.s)
+    def failure(self, name):
+        """Declare that a single test has failed."""
+        self.tests.failure(name)
 
     def run(self):
-        while not self.tests.all_done() and self.timeout > 0:
-            rset = [self.listener.fd()] + list(self.peers)
-            wset = [p.fd() for p in
-                    filter(lambda x: x.want_to_write(), self.sources())]
-            # debug("rset %s wset %s" % (rset, wset))
-            sets = select.select(rset, wset, [], 1)
-            if all(len(s) == 0 for s in sets):
-                debug("Decrementing timeout.")
-                self.timeout -= 1
-                continue
-
-            for fd in sets[0]:  # readable fd's
-                if fd == self.listener.fd():
-                    self.listener.accept()
-                    continue
-                p = self.peers[fd]
-                n = p.on_readable()
-                debug("On read, fd %d for %s said %d"%(fd, p, n))
-                if n > 0:
-                    # debug("need %d more octets from fd %d" % (n, fd))
-                    pass
-                elif n == 0:  # Success.
-                    self.tests.success()
-                    self.remove(p)
-                else:       # Failure.
-                    debug("Got a failure reading fd %d for %s" % (fd,p))
-                    self.tests.failure()
-                    if p.is_sink():
-                        print("verification failed!")
-                    self.remove(p)
-
-            for fd in sets[1]:  # writable fd's
-                p = self.peers.get(fd)
-                if p is not None:  # Might have been removed above.
-                    n = p.on_writable()
-                    debug("On write, fd %d said %d"%(fd, n))
-                    if n == 0:
-                        self.remove(p)
-                    elif n < 0:
-                        debug("Got a failure writing fd %d for %s" % (fd,p))
-                        self.tests.failure()
-                        self.remove(p)
-
-        for fd in self.peers:
-            peer = self.peers[fd]
-            debug("peer fd=%d never pending close, never read or wrote" % fd)
-            self.pending_close.append(peer.s)
-        self.listener.s.close()
-        for s in self.pending_close:
-            s.close()
+        start = now = time.time()
+        end = time.time() + self.timeout
+        DUMP_TEST_STATUS_INTERVAL=0.5
+        dump_at = start+DUMP_TEST_STATUS_INTERVAL
+        while now < end and not self.tests.all_done():
+            # run only one iteration at a time, with a nice short timeout, so we
+            # can actually detect completion and timeouts.
+            asyncore.loop(0.2, False, self.socket_map, 1)
+            now = time.time()
+            if now > dump_at:
+                debug("Test status: %s"%self.tests.status())
+                dump_at += DUMP_TEST_STATUS_INTERVAL
+
         if not debug_flag:
             sys.stdout.write('\n')
             sys.stdout.flush()
         debug("Done with run(); all_done == %s and failure_count == %s"
               %(self.tests.all_done(), self.tests.failure_count()))
-        return self.tests.all_done() and self.tests.failure_count() == 0
 
+        note("Status:\n%s"%self.tests.teststatus)
+
+        self.listener.close()
+
+        return self.tests.all_done() and self.tests.failure_count() == 0
 
 def main():
     """Test the TrafficTester by sending and receiving some data."""
@@ -435,7 +462,7 @@ def main():
 
     tt = TrafficTester(bind_to, DATA)
     # Don't use a proxy for self-testing, so that we avoid tor entirely
-    tt.add(Source(tt, bind_to, DATA))
+    tt.add_client(bind_to)
     success = tt.run()
 
     if success:

+ 3 - 4
scripts/chutney_tests/verify.py

@@ -155,8 +155,7 @@ def _configure_exits(tt, bind_to, tmpdata, reps, client_list, exit_list,
                      'localhost', op._env['socksport']))
             for _ in range(connection_count):
                 proxy = ('localhost', int(op._env['socksport']))
-                tt.add(chutney.Traffic.Source(tt, bind_to, tmpdata, proxy,
-                                              reps))
+                tt.add_client(bind_to, proxy)
     return exit_path_node_count
 
 
@@ -188,8 +187,8 @@ def _configure_hs(tt, tmpdata, reps, client_list, hs_list, HS_PORT,
                      'localhost', client._env['socksport']))
             for _ in range(connection_count):
                 proxy = ('localhost', int(client._env['socksport']))
-                tt.add(chutney.Traffic.Source(tt, hs_bind_to, tmpdata,
-                                              proxy, reps))
+                tt.add_client(hs_bind_to, proxy)
+
     return hs_path_node_count