def get_merged_data()

in chatlearn/runtime/executor.py [0:0]


    def get_merged_data(self, queues, encode=True, micro_batch_index=None, model_node=None, trainable=False):
        """Gather one item per queue, all belonging to the same micro-batch.

        Items read from a queue whose micro-batch index differs from the one
        being assembled are parked in ``self.merged_buffer[model_node][i]``
        (``i`` is the queue's position in ``queues``), keyed by their
        micro-batch index, so a later call can pick them up.

        Args:
            queues: Sequence of queue objects exposing ``qsize()`` and ``get()``.
            encode: When true, return ``encode_data(micro_batch, items)``
                instead of the bare list of items.
            micro_batch_index: Micro-batch to assemble. When ``None``, the
                index of the first item decoded from a queue is adopted.
            model_node: Key selecting the buffer inside ``self.merged_buffer``.
            trainable: When false, an item that is a ``list`` is reduced to
                its last element.

        Returns:
            The list of items (one per queue, in queue order), or the result
            of ``encode_data`` on it when ``encode`` is true.
        """
        wanted_mb = micro_batch_index
        node_buffer = self.merged_buffer[model_node]
        collected = [None] * len(queues)

        for slot, source in enumerate(queues):
            if slot not in node_buffer:
                node_buffer[slot] = {}

            # Fast path: the wanted micro-batch was parked by an earlier read.
            if wanted_mb in node_buffer[slot]:
                collected[slot] = node_buffer[slot].pop(wanted_mb)
                continue

            resolved = False
            while not resolved:
                # Poll while the queue is empty, re-checking the buffer on each
                # pass. NOTE(review): presumably another caller sharing this
                # buffer may park the wanted entry meanwhile -- confirm.
                while source.qsize() == 0:
                    if wanted_mb in node_buffer[slot]:
                        collected[slot] = node_buffer[slot].pop(wanted_mb)
                        resolved = True
                        break
                else:
                    # Reached only when the poll loop ended without ``break``,
                    # i.e. the queue reported at least one item.
                    mb, payload = decode_data(source.get())
                    if wanted_mb is None:
                        wanted_mb = mb
                    if not trainable and isinstance(payload, list):
                        payload = payload[-1]
                    if mb == wanted_mb:
                        collected[slot] = payload
                        resolved = True
                    else:
                        # Belongs to a different micro-batch: park it for later.
                        node_buffer[slot][mb] = payload

        return encode_data(wanted_mb, collected) if encode else collected