in skywalking/agent/protocol/grpc.py [0:0]
def report_snapshot(self, queue: Queue, block: bool = True):
    """Drain profiling snapshots from ``queue`` and report them as one stream.

    A generator is handed to ``self.profile_channel.report``; each item it
    yields is a ``ThreadSnapshot`` built from one queued snapshot. The
    generator finishes when ``queue.get`` raises ``Empty`` or when the
    ``config.agent_queue_timeout`` budget, counted from the first pull, has
    been used up, so a continuously fed queue cannot keep one report call
    running indefinitely.

    Args:
        queue: Queue of snapshot objects exposing ``task_id``,
            ``trace_segment_id``, ``time``, ``sequence`` and ``stack_list``.
        block: Forwarded to ``queue.get``; when false the queue is polled
            without waiting.

    Raises:
        grpc.RpcError: Re-raised after ``self.on_error()`` has been called.
    """
    # Local import: elapsed time is measured on the monotonic clock so that
    # wall-clock adjustments can neither extend nor shorten the budget.
    from time import monotonic

    def generator():
        started_at = None  # set on the first pull; None means "not started"

        while True:
            timeout = config.agent_queue_timeout  # type: int
            if started_at is None:
                # The first pass always checks the queue with the full timeout.
                started_at = monotonic()
            else:
                # Elapsed time is truncated to whole seconds, as before.
                timeout -= int(monotonic() - started_at)
                if timeout <= 0:
                    # Budget exhausted: stop instead of being fed continuously.
                    return

            try:
                snapshot = queue.get(block=block, timeout=timeout)  # type: TracingThreadSnapshot
            except Empty:
                return

            # Acknowledge the item as soon as it has been dequeued.
            queue.task_done()

            yield ThreadSnapshot(
                taskId=str(snapshot.task_id),
                traceSegmentId=str(snapshot.trace_segment_id),
                time=int(snapshot.time),
                sequence=int(snapshot.sequence),
                stack=ThreadStack(codeSignatures=snapshot.stack_list),
            )

    try:
        self.profile_channel.report(generator())
    except grpc.RpcError:
        self.on_error()
        raise