in transform_binary_payload/src-payload-decoders/python/adeunis_dc_v2.py [0:0]
# Human-readable frame descriptions, keyed by the frame code (first payload byte).
# Codes that are not listed here are reported as "Invalid frame code".
_FRAME_TYPES = {
    0x10: "0x10 Dry Contacts 2 configuration",
    0x20: "0x20 Configuration",
    0x2F: "0x2f Downlink ack",
    0x30: "0x30 Dry Contacts 2 keep alive",
    0x33: "0x33 Set register status",
    0x40: "0x40 Dry Contacts 2 data",
    0x59: "0x59 Dry Contacts 2 time counting data",
}

# Result keys of the four channels of a 0x40 data frame, in payload order.
_CHANNEL_KEYS = ("channelA", "channelB", "channelC", "channelD")


def dict_from_payload(base64_input: str, fport: int = None) -> dict:
    """Decode a base64-encoded Adeunis Dry Contacts 2 payload into a dictionary.

    Parameters
    ----------
    base64_input : str
        Base64-encoded binary payload.
    fport : int
        FPort as provided in the metadata. The fport is optional and may be
        None if it is not provided by the LNS or the invoking function.
        This decoder does not use it.

    Returns
    -------
    dict
        Decoded attributes. An empty payload yields an empty dictionary.
        Otherwise the result holds "type" and "status"; a 0x40 data frame
        additionally holds "channelA".."channelD" and, when the timestamp
        flag is set and the four timestamp bytes are present, "timestamp"
        (the raw 32-bit big-endian value from the payload).

    Raises
    ------
    IndexError
        If the payload is non-empty but has no status byte, or if a 0x40
        data frame is shorter than 11 bytes.
    """
    # The size of the payload varies depending on the information that is sent.
    decoded = base64.b64decode(base64_input)
    result = {}

    if DEBUG_OUTPUT:
        print(f"Input: {decoded.hex().upper()}")

    # An empty payload carries no frame code, so there is nothing to decode.
    if not decoded:
        return result

    # Byte 0 - frame code
    frame_code = decoded[0]
    result["type"] = _FRAME_TYPES.get(frame_code, "Invalid frame code")

    # Byte 1 - status
    status = decoded[1]
    config = bool(status & 0b00000001)      # bit0 - Configuration Done
    low_bat = bool(status & 0b00000010)     # bit1 - Low Battery
    timestamp = bool(status & 0b00000100)   # bit2 - Timestamp
    app_flag_1 = bool(status & 0b00001000)  # bit3 - AppFlag1
    app_flag_2 = bool(status & 0b00010000)  # bit4 - AppFlag2
    frame_counter = (status & 0b11100000) >> 5  # bit5-7 - Frame Counter

    # "noError" is reported only when none of the five flag bits is set.
    no_error = not (config or low_bat or timestamp or app_flag_1 or app_flag_2)

    result["status"] = {
        "frameCounter": frame_counter,
        "noError": no_error,
        "lowBattery": low_bat,
        "configurationDone": config,
        "timestamp": timestamp,
    }

    # Only the 0x40 data frame carries channel information.
    if frame_code != 0x40:
        return result

    # Byte 10 - details: two bits per channel, starting at bit0 for channel A
    # (even bit = current state, odd bit = previous frame state).
    details = decoded[10]
    for index, key in enumerate(_CHANNEL_KEYS):
        # Bytes 2-9 hold one big-endian 16-bit value per channel.
        offset = 2 + 2 * index
        result[key] = {
            "value": (decoded[offset] << 8) | decoded[offset + 1],
            "currentState": bool(details & (1 << (2 * index))),
            "previousFrameState": bool(details & (1 << (2 * index + 1))),
        }

    # Bytes 11-14 - timestamp (EPOCH 2013). All four bytes must be present;
    # a shorter frame with the timestamp flag set is decoded without it.
    if timestamp and len(decoded) >= 15:
        result["timestamp"] = int.from_bytes(decoded[11:15], "big")

    return result