in chatlearn/runtime/executor.py [0:0]
def get_merged_data(self, queues, encode=True, micro_batch_index=None, model_node=None, trainable=False):
    """Gather one item per queue, all tagged with the same micro-batch index.

    For every queue the method first looks in the per-node buffer
    ``self.merged_buffer[model_node][index]`` for an item already stored
    under the wanted micro-batch index. Otherwise it polls the queue,
    decodes each item with ``decode_data`` and parks items belonging to
    other micro-batches in that buffer until the wanted one shows up.

    Args:
        queues: Sequence of queue-like objects exposing ``qsize()`` and
            ``get()``.
        encode: When true, the result is wrapped with ``encode_data``.
        micro_batch_index: Micro-batch to collect. When ``None``, the index
            of the first item decoded from a queue is adopted and used for
            all remaining queues.
        model_node: Key selecting the buffer inside ``self.merged_buffer``.
        trainable: When false, a decoded payload that is a ``list`` is
            reduced to its last element.

    Returns:
        ``encode_data(<micro-batch index>, <items>)`` if ``encode`` is true,
        otherwise the plain list of items, positionally aligned with
        ``queues``.
    """
    # Stays None until the caller's value or the first decoded item fixes it.
    target_mb = micro_batch_index
    collected = [None] * len(queues)
    node_buffer = self.merged_buffer[model_node]

    for slot, source in enumerate(queues):
        if slot not in node_buffer:
            node_buffer[slot] = {}

        # Fast path: the item was parked here by an earlier read.
        if target_mb in node_buffer[slot]:
            collected[slot] = node_buffer[slot].pop(target_mb)
            continue

        while True:
            if source.qsize() == 0:
                # Queue is empty: keep polling (no sleep), but re-check the
                # buffer on every pass.
                # NOTE(review): presumably another caller can stash this
                # micro-batch concurrently -- confirm against callers.
                if target_mb in node_buffer[slot]:
                    collected[slot] = node_buffer[slot].pop(target_mb)
                    break
                continue

            mb, payload = decode_data(source.get())
            if target_mb is None:
                target_mb = mb
            if isinstance(payload, list) and not trainable:
                payload = payload[-1]
            if mb == target_mb:
                collected[slot] = payload
                break
            # Belongs to a different micro-batch: park it for a later call.
            node_buffer[slot][mb] = payload

    return encode_data(target_mb, collected) if encode else collected