in tools/tftp_tester.py [0:0]
def send_and_expect(self, packet, expect, cur):
    """Send ``packet`` and wait for a reply of type ``expect``.

    The packet is (re)sent and a reply is awaited until an acceptable
    reply arrives or ``self.retries`` receive timeouts have occurred.
    Two fault-injection lists on the instance are honoured:

    * ``self.failsend``: if ``cur`` is listed, one occurrence is removed
      and the send is skipped, simulating a lost outgoing packet.
    * ``self.failreceive``: if the packet number of a received reply is
      listed, one occurrence is removed, the rest of ``self.timeout`` is
      slept away and the reply is treated as a receive timeout.  A ``-1``
      entry is a placeholder that is replaced by the number of the final
      (short) DATA packet once that packet is seen.

    A reply is accepted when its type equals ``expect`` and either
    ``expect`` is ``TFTP.OACK`` or ``expect`` is ``TFTP.DATA`` and the
    packet number is ``cur + 1``.  A reply of the expected type that
    fails this check is silently ignored and the packet is sent again
    without consuming a retry.

    Side effects: ``self.port`` is set to the source port of every reply
    received, and the spinner is advanced when ``self.verbose`` is set.

    Args:
        packet: Packet handed to ``self.send``.
        expect: Packet type that the reply must have.
        cur: Current block number; the next DATA packet must carry
            ``cur + 1``.

    Returns:
        The raw reply as returned by ``self.sock.recvfrom``.

    Raises:
        TftpException: If an ERROR packet is received, or if no packet
            at all was received before the retries ran out.
    """
    # Stays None until recvfrom() succeeds at least once, so a transfer
    # that never gets a reply fails with a clear error instead of an
    # UnboundLocalError at the final return.
    answer = None
    retries = 0
    while retries < self.retries:
        begin = time.time()
        if cur in self.failsend:  # pretend we sent a packet which was lost
            self.failsend.remove(cur)
        else:
            self.send(packet)
        try:
            # 4 extra bytes on top of the block size for the packet header
            answer, sender_addr = self.sock.recvfrom(self.blksize + 4)
            self.port = sender_addr[1]
            if self.verbose:
                self.spinner.show()
            num = get_packet_num(answer)
            packet_type = get_packet_type(answer)
            # is this the last packet?
            is_last = (
                packet_type == TFTP.DATA
                and len(get_packet_data(answer)) < self.actual_blksize
            )
            # replace -1 with the actual packet number in failreceive
            # this allows us to use the same construction for all packets
            if is_last and (-1 in self.failreceive):
                self.failreceive[self.failreceive.index(-1)] = num
            # pretend we didn't receive any message
            if num in self.failreceive:
                self.failreceive.remove(num)
                remaining = self.timeout - (time.time() - begin)
                # time.sleep() raises ValueError for a negative duration,
                # which happens if the reply took longer than the timeout.
                time.sleep(max(0.0, remaining))
                raise socket.timeout()
            # if it's the next DATA or an OACK, we're good
            if packet_type == expect:
                if (expect == TFTP.DATA and num == cur + 1) or (
                    expect == TFTP.OACK
                ):
                    return answer
            elif packet_type == TFTP.ERROR:
                # message text sits after the 4-byte header; the final
                # byte is dropped before decoding
                raise TftpException(answer[4:-1].decode("ascii"))
            else:
                print("\nUnexpected packet received. Ignoring")
        except socket.timeout:
            retries += 1
    if answer is None:
        raise TftpException(
            "No response received after {} attempts".format(self.retries)
        )
    # NOTE(review): retries ran out but a packet was received earlier; it
    # is returned as before even though it was not accepted above.
    # Confirm whether callers rely on this or whether it should raise.
    return answer