in src/sagemaker_xgboost_container/dmlc_patch/tracker.py [0:0]
def assign_rank(self, rank, wait_conn, tree_map, parent_map, ring_map):
    """Send this worker its rank and topology, then negotiate its peer links.

    The exchange over ``self.sock`` is, in order: the rank, the parent rank,
    the world size (``len(tree_map)``), the number of tree neighbours
    followed by each neighbour rank, then the previous and next ring
    neighbours (``-1`` when a ring neighbour is absent or is the worker
    itself).

    The method then loops: it reads the set of ranks the worker reports
    (presumably the links it already holds -- confirm against the worker
    side of the protocol), and for every remaining neighbour that is present
    in ``wait_conn`` it sends that peer's host, port and rank. The worker
    answers with an error count; a non-zero count restarts the round.

    Args:
        rank: Rank assigned to this worker; stored on ``self.rank``.
        wait_conn: Mapping of rank to an entry exposing ``host``, ``port``
            and ``wait_accept``. Mutated in place: ``wait_accept`` is
            decremented for each peer handed to the worker, and entries
            that reach zero are removed.
        tree_map: Mapping of rank to an iterable of tree-neighbour ranks.
        parent_map: Mapping of rank to its parent rank.
        ring_map: Mapping of rank to a ``(prev, next)`` pair of ring
            neighbours.

    Returns:
        The list of ranks removed from ``wait_conn`` during this call.

    Raises:
        AssertionError: If the worker reports a rank that is not one of its
            tree or ring neighbours. Raised explicitly (rather than through
            an ``assert`` statement) so the check still runs under
            ``python -O``.
    """
    logger.debug("Assigning rank: {}".format(rank))
    self.rank = rank
    nnset = set(tree_map[rank])
    rprev, rnext = ring_map[rank]
    self.sock.sendint(rank)
    # send parent rank
    self.sock.sendint(parent_map[rank])
    logger.debug("Parent rank: {}".format(parent_map[rank]))
    # send world size
    self.sock.sendint(len(tree_map))
    logger.debug("World view: {}".format(len(tree_map)))
    # send the tree neighbours: the count first, then each rank
    self.sock.sendint(len(nnset))
    for r in nnset:
        self.sock.sendint(r)
        logger.debug("Link: {}".format(r))
    # send prev link; ring neighbours join nnset only after the tree
    # neighbours above have been sent, so they are not counted twice
    if rprev != -1 and rprev != rank:
        nnset.add(rprev)
        self.sock.sendint(rprev)
        logger.debug("Rprev: {}".format(rprev))
    else:
        self.sock.sendint(-1)
    # send next link
    if rnext != -1 and rnext != rank:
        nnset.add(rnext)
        self.sock.sendint(rnext)
        logger.debug("Rnext: {}".format(rnext))
    else:
        self.sock.sendint(-1)
    while True:
        ngood = self.sock.recvint()
        goodset = {self.sock.recvint() for _ in range(ngood)}
        logger.debug("In goodset: {}".format(goodset))
        # Data read from the socket is validated explicitly: an ``assert``
        # statement would be stripped when Python runs with -O.
        unexpected = goodset - nnset
        if unexpected:
            raise AssertionError(
                "worker with rank {} reported ranks outside its neighbour "
                "set: {}".format(rank, unexpected)
            )
        badset = nnset - goodset
        # neighbours still missing that are already present in wait_conn
        conset = [r for r in badset if r in wait_conn]
        logger.debug("In conset: {}".format(conset))
        # missing neighbours that are not in wait_conn yet
        num_pending = len(badset) - len(conset)
        self.sock.sendint(len(conset))
        self.sock.sendint(num_pending)
        for r in conset:
            self.sock.sendstr(wait_conn[r].host)
            self.sock.sendint(wait_conn[r].port)
            self.sock.sendint(r)
        nerr = self.sock.recvint()
        if nerr != 0:
            # the worker reported errors; run the negotiation round again
            continue
        self.port = self.sock.recvint()
        rmset = []
        # all connections were successfully set up
        for r in conset:
            wait_conn[r].wait_accept -= 1
            if wait_conn[r].wait_accept == 0:
                rmset.append(r)
        for r in rmset:
            wait_conn.pop(r, None)
        self.wait_accept = num_pending
        return rmset