|
@@ -157,7 +157,12 @@ class Sink(Peer):
|
|
|
if self.repetitions == 0 or len(self.tt.data) == 0:
|
|
|
debug("no verification required - no data")
|
|
|
return 0
|
|
|
- self.inbuf += self.s.recv(len(data) - len(self.inbuf))
|
|
|
+ inp = self.s.recv(len(data) - len(self.inbuf))
|
|
|
+ debug("Verify: received %d bytes"% len(inp))
|
|
|
+ if len(inp) == 0:
|
|
|
+ debug("EOF on fd %s" % self.fd())
|
|
|
+ return -1
|
|
|
+ self.inbuf += inp
|
|
|
debug("successfully received (bytes=%d)" % len(self.inbuf))
|
|
|
while len(self.inbuf) >= len(data):
|
|
|
assert(len(self.inbuf) <= len(data) or self.repetitions > 1)
|
|
@@ -213,6 +218,7 @@ class Source(Peer):
|
|
|
self.state = self.CONNECTING
|
|
|
dest = self.proxy or self.dest
|
|
|
try:
|
|
|
+ debug("socket %d connecting to %r..."%(self.fd(),dest))
|
|
|
self.s.connect(dest)
|
|
|
except socket.error as e:
|
|
|
if e[0] != errno.EINPROGRESS:
|
|
@@ -224,7 +230,12 @@ class Source(Peer):
|
|
|
>0 if more data needs to be read or written
|
|
|
"""
|
|
|
if self.state == self.CONNECTING_THROUGH_PROXY:
|
|
|
- self.inbuf += self.s.recv(8 - len(self.inbuf))
|
|
|
+ inp = self.s.recv(8 - len(self.inbuf))
|
|
|
+ debug("-- connecting through proxy, got %d bytes"%len(inp))
|
|
|
+ if len(inp) == 0:
|
|
|
+ debug("EOF on fd %d"%self.fd())
|
|
|
+ return -1
|
|
|
+ self.inbuf += inp
|
|
|
if len(self.inbuf) == 8:
|
|
|
if ord(self.inbuf[0]) == 0 and ord(self.inbuf[1]) == 0x5a:
|
|
|
debug("proxy handshake successful (fd=%d)" % self.fd())
|
|
@@ -248,8 +259,15 @@ class Source(Peer):
|
|
|
return self.want_to_write() # Keep us around for writing if needed
|
|
|
|
|
|
def want_to_write(self):
|
|
|
- return (self.state == self.CONNECTING or len(self.outbuf) > 0 or
|
|
|
- (self.repetitions > 0 and len(self.data) > 0))
|
|
|
+ if self.state == self.CONNECTING:
|
|
|
+ return True
|
|
|
+ if len(self.outbuf) > 0:
|
|
|
+ return True
|
|
|
+ if (self.state == self.CONNECTED and
|
|
|
+ self.repetitions > 0 and
|
|
|
+ len(self.data) > 0):
|
|
|
+ return True
|
|
|
+ return False
|
|
|
|
|
|
def on_writable(self):
|
|
|
"""Invoked when the socket becomes writable.
|
|
@@ -289,7 +307,7 @@ class Source(Peer):
|
|
|
debug("successfully sent (bytes=%d)" % n)
|
|
|
self._sent_no_bytes = 0
|
|
|
else:
|
|
|
- debug("BUG: sent no bytes")
|
|
|
+ debug("BUG: sent no bytes (out of %d; state is %s)"% (len(self.outbuf), self.state))
|
|
|
self._sent_no_bytes += 1
|
|
|
# We can't retry too fast, otherwise clients burn all their HSDirs
|
|
|
if self._sent_no_bytes >= 2:
|
|
@@ -365,6 +383,7 @@ class TrafficTester():
|
|
|
# debug("rset %s wset %s" % (rset, wset))
|
|
|
sets = select.select(rset, wset, [], 1)
|
|
|
if all(len(s) == 0 for s in sets):
|
|
|
+ debug("Decrementing timeout.")
|
|
|
self.timeout -= 1
|
|
|
continue
|
|
|
|
|
@@ -374,6 +393,7 @@ class TrafficTester():
|
|
|
continue
|
|
|
p = self.peers[fd]
|
|
|
n = p.on_readable()
|
|
|
+ debug("On read, fd %d for %s said %d"%(fd, p, n))
|
|
|
if n > 0:
|
|
|
# debug("need %d more octets from fd %d" % (n, fd))
|
|
|
pass
|
|
@@ -381,6 +401,7 @@ class TrafficTester():
|
|
|
self.tests.success()
|
|
|
self.remove(p)
|
|
|
else: # Failure.
|
|
|
+ debug("Got a failure reading fd %d for %s" % (fd,p))
|
|
|
self.tests.failure()
|
|
|
if p.is_sink():
|
|
|
print("verification failed!")
|
|
@@ -390,9 +411,11 @@ class TrafficTester():
|
|
|
p = self.peers.get(fd)
|
|
|
if p is not None: # Might have been removed above.
|
|
|
n = p.on_writable()
|
|
|
+ debug("On write, fd %d said %d"%(fd, n))
|
|
|
if n == 0:
|
|
|
self.remove(p)
|
|
|
elif n < 0:
|
|
|
+ debug("Got a failure writing fd %d for %s" % (fd,p))
|
|
|
self.tests.failure()
|
|
|
self.remove(p)
|
|
|
|
|
@@ -406,6 +429,8 @@ class TrafficTester():
|
|
|
if not debug_flag:
|
|
|
sys.stdout.write('\n')
|
|
|
sys.stdout.flush()
|
|
|
+ debug("Done with run(); all_done == %s and failure_count == %s"
|
|
|
+ %(self.tests.all_done(), self.tests.failure_count()))
|
|
|
return self.tests.all_done() and self.tests.failure_count() == 0
|
|
|
|
|
|
|