in tools/scraper/amqp_detail.py [0:0]
def __init__(self, _router, _common):
    """Sort one router's parsed AMQP frames into connection/session/link detail.

    Pass 1 visits each frame of each connection in ``_router.conn_list`` and
    files it with the connection, the session (looked up by channel) or the
    link (looked up by name or handle) it refers to, bumping the error
    counters of each level as it goes.
    Pass 2 indexes every session's disposition frames by delivery id.

    NOTE(review): the listing this was rewritten from carried no indentation;
    the block nesting below was reconstructed from the control flow and
    should be confirmed against the checked-in file.

    :param _router: object supplying ``conn_list``, ``conn_id()`` and
        ``conn_to_frame_map``; stored as ``self.rtr``
    :param _common: stored as ``self.comn``; not read by this method
    """
    self.rtr = _router
    self.comn = _common

    # conn_details - AMQP analysis
    #   key= connection id '1', '2'
    #   val= ConnectionDetail
    #   holds, per connection, its sessions and, per session, its links
    self.conn_details = {}

    # ---- Pass 1: attribute every frame to a connection, session or link ----
    for conn in self.rtr.conn_list:
        conn_key = self.rtr.conn_id(conn)
        conn_rec = ConnectionDetail(conn_key, self.rtr, conn)
        self.conn_details[conn_key] = conn_rec

        for frame in self.rtr.conn_to_frame_map[conn_key]:
            perf = frame.data.name
            if frame.data.amqp_error:
                conn_rec.counts.errors += 1

            # Empty-named, open and close frames are kept at connection
            # level; everything past this point needs a session.
            if perf in ('', 'open', 'close'):
                conn_rec.unaccounted_frame_list.append(frame)
                continue

            # Assume in/out channels are the same for the time being
            channel = frame.data.channel
            session = conn_rec.FindSession(channel)
            if session is None:
                # First frame seen on this channel: open a new session record.
                session = SessionDetail(len(conn_rec.session_list), conn_rec,
                                        conn_rec.GetSeqNo(), frame.datetime)
                conn_rec.session_list.append(session)
                # EndChannel runs before the channel is (re)mapped; presumably
                # it retires whatever was on the channel - TODO confirm.
                conn_rec.EndChannel(channel)
                conn_rec.chan_map[channel] = session
                session.direction = frame.data.direction
                session.channel = channel
            if frame.data.amqp_error:
                session.counts.errors += 1

            if perf in ('begin', 'end', 'disposition'):
                # Accumulate to current session.
                # begin was handled above; disposition needs no action here.
                session.session_frame_list.append(frame)
                if perf == 'end':
                    # The first end only marks the session half closed;
                    # an end on a half-closed session ends the channel.
                    if not session.half_closed:
                        session.half_closed = True
                    else:
                        conn_rec.EndChannel(frame.data.channel)

            elif perf == 'attach':
                handle = frame.data.handle  # proton local handle
                short_name = frame.data.link_short_name
                unique_name = short_name + "_" + str(handle)
                had_error = frame.data.amqp_error
                link = session.FindLinkByName(short_name, unique_name, frame)
                # If the name lookup itself flagged an error on this frame
                # (ambiguous link name), propagate it to session/connection.
                if not had_error and frame.data.amqp_error:
                    conn_rec.counts.errors += 1
                    session.counts.errors += 1
                if link is None:
                    # Creating a new link from scratch resulting in a
                    # half attached link pair
                    link = LinkDetail(len(session.link_list), session,
                                      session.GetSeqNo(), short_name, frame.datetime)
                    session.link_list.append(link)
                    session.link_name_to_detail_map[unique_name] = link
                    session.link_name_conflict_map[short_name] = link
                    link.display_name = frame.data.link_short_name_popup
                    link.direction = frame.data.direction
                    link.is_receiver = frame.data.role == "receiver"
                    if link.is_receiver:
                        link.first_address = frame.data.source
                    else:
                        link.first_address = frame.data.target
                if frame.data.amqp_error:
                    link.counts.errors += 1

                if frame.data.direction_is_in():
                    # peer is creating link
                    link.input_handle = handle
                    session.DetachInputHandle(handle)
                    session.input_handle_link_map[handle] = link
                else:
                    # local is creating link
                    link.output_handle = handle
                    session.DetachOutputHandle(handle)
                    session.output_handle_link_map[handle] = link

                # Record the settings carried by this attach for its role.
                if frame.data.is_receiver:
                    link.rcv_settle_mode = frame.data.rcv_settle_mode
                    link.receiver_source_address = frame.data.source
                    link.receiver_class = frame.data.link_class
                else:
                    link.snd_settle_mode = frame.data.snd_settle_mode
                    link.sender_target_address = frame.data.target
                    link.sender_class = frame.data.link_class
                link.frame_list.append(frame)

            elif perf == 'detach':
                owner = conn_rec.FindSession(channel)
                if owner is None:
                    conn_rec.unaccounted_frame_list.append(frame)
                    continue
                handle = frame.data.handle
                # Resolve the link first, then release the handle.
                link = owner.FindLinkByHandle(handle, frame.data.direction_is_in())
                owner.DetachHandle(handle, frame.data.direction_is_in())
                if link is not None:
                    if frame.data.amqp_error:
                        link.counts.errors += 1
                    link.frame_list.append(frame)
                else:
                    # No link on that handle: keep the frame with the session.
                    owner.session_frame_list.append(frame)

            elif perf in ('transfer', 'flow'):
                owner = conn_rec.FindSession(channel)
                if owner is None:
                    conn_rec.unaccounted_frame_list.append(frame)
                    frame.no_parent_link = True
                    continue
                handle = frame.data.handle
                link = owner.FindLinkByHandle(handle, frame.data.direction_is_in())
                if link is not None:
                    if frame.data.amqp_error:
                        link.counts.errors += 1
                    link.frame_list.append(frame)
                else:
                    # No link on that handle: keep the frame with the session
                    # and flag it as parentless.
                    owner.session_frame_list.append(frame)
                    frame.no_parent_link = True

    # ---- Pass 2: identify and index dispositions ----
    for conn in self.rtr.conn_list:
        conn_rec = self.conn_details[self.rtr.conn_id(conn)]
        for session in conn_rec.session_list:
            # for each disposition add state to the matching disposition map
            for disp in session.session_frame_list:
                if disp.data.name != "disposition":
                    continue

                # Four maps per session: direction ("<-" vs anything else)
                # crossed with the role flag carried by the frame.
                inbound = disp.data.direction == "<-"
                if disp.data.is_receiver:
                    disp_map = (session.rx_rcvr_disposition_map if inbound
                                else session.tx_rcvr_disposition_map)
                else:
                    disp_map = (session.rx_sndr_disposition_map if inbound
                                else session.tx_sndr_disposition_map)

                # first/last are parsed with base 0, so prefixed literals
                # such as 0x.. are accepted; the range is inclusive.
                first_id = int(disp.data.first, 0)
                last_id = int(disp.data.last, 0)
                for delivery in range(first_id, last_id + 1):
                    key = str(delivery)
                    if key in disp_map:
                        earlier = disp_map[key]
                        # An existing non-terminal ("state=@received") entry
                        # may be overwritten silently by another non-terminal
                        # or by a terminal disposition. Overwriting a terminal
                        # disposition is reported.
                        if "state=@received" not in earlier.line:
                            sys.stderr.write("ERROR: Delivery ID collision in disposition map. connid:%s, \n" %
                                             (disp.data.conn_id))
                            sys.stderr.write("  old: %s, %s\n" % (earlier.fid, earlier.line))
                            sys.stderr.write("  new: %s, %s\n" % (disp.fid, disp.line))
                    disp_map[key] = disp