in transform_binary_payload/src-payload-decoders/python/adeunis_ftd2.py [0:0]
def dict_from_payload(base64_input: str, fport: int = None):
""" Decodes a base64-encoded binary payload into JSON.
Parameters
----------
base64_input : str
Base64-encoded binary payload
fport: int
FPort as provided in the metadata. Please note the fport is optional and can have value "None", if not
provided by the LNS or invoking function.
If fport is None and binary decoder can not proceed because of that, it should should raise an exception.
Returns
-------
JSON object with key/value pairs of decoded attributes
"""
# Payload
# The size of the payload varies depending on the information that is send.
decoded = base64.b64decode(base64_input)
# Index for iterating over the payload bytes
byte_index = 0
# result dictionary
result = {}
# Printing the debug output
if DEBUG_OUTPUT:
print(f"Input: {decoded.hex().upper()}")
# The first byte shows the presence of information that is contained in the payload
# Byte 1 - Status
# True = Data is present in the payload
# False = Data is missing from the Payload
if len(decoded):
# getting the first byte
status_presence = decoded[byte_index]
status_presence_rssi_snr = status_presence & 0b00000001
if status_presence_rssi_snr == 0b1:
status_presence_rssi_snr = True
elif status_presence_rssi_snr == 0b0:
status_presence_rssi_snr = False
status_presence_battery_lvl = (status_presence & 0b00000010) >> 1
if status_presence_battery_lvl == 0b1:
status_presence_battery_lvl = True
elif status_presence_battery_lvl == 0b0:
status_presence_battery_lvl = False
status_presence_downlink = (status_presence & 0b00000100) >> 2
if status_presence_downlink == 0b1:
status_presence_downlink = True
elif status_presence_downlink == 0b0:
status_presence_downlink = False
status_presence_uplink = (status_presence & 0b00001000) >> 3
if status_presence_uplink == 0b1:
status_presence_uplink = True
elif status_presence_uplink == 0b0:
status_presence_uplink = False
status_presence_gps = (status_presence & 0b00010000) >> 4
if status_presence_gps == 0b1:
status_presence_gps = True
elif status_presence_gps == 0b0:
status_presence_gps = False
status_presence_trigger_button = (status_presence & 0b00100000) >> 5
if status_presence_trigger_button == 0b001:
status_presence_trigger_button = True
elif status_presence_trigger_button == 0b000:
status_presence_trigger_button = False
status_presence_trigger_acc = (status_presence & 0b01000000) >> 6
if status_presence_trigger_acc == 0b1:
status_presence_trigger_acc = True
elif status_presence_trigger_acc == 0b0:
status_presence_trigger_acc = False
status_presence_temp = (status_presence & 0b10000000) >> 7
if status_presence_temp == 0b1:
status_presence_temp = True
elif status_presence_temp == 0b0:
status_presence_temp = False
# adding the trigger action to the result
if status_presence_trigger_button:
result["trigger"] = "Button"
elif status_presence_trigger_acc:
result["trigger"] = "Accelerometer"
byte_index += 1
# Temperature - one Byte
if status_presence_temp and len(decoded) >= byte_index + 1:
result["temperature"] = helpers.bin8dec(decoded[byte_index])
byte_index += 1
# Latitude - 4 Bytes
if status_presence_gps and len(decoded) >= byte_index + 9:
lat_bcd_deg_tenth = (decoded[byte_index] & 0b11110000) >> 4
lat_bcd_deg_whole = decoded[byte_index] & 0b00001111
byte_index += 1
lat_bcd_min_tenth = (decoded[byte_index] & 0b11110000) >> 4
lat_bcd_min_whole = decoded[byte_index] & 0b00001111
byte_index += 1
lat_bcd_dec_tenth = (decoded[byte_index] & 0b11110000) >> 4
lat_bcd_dec_hundredth = decoded[byte_index] & 0b00001111
byte_index += 1
lat_bcd_dec_thousandth = (decoded[byte_index] & 0b11110000) >> 4
# lat_not_used = decoded[byte_index] & 0b00001110
# when the bit on the position 0 is 1 variable gets the value -1 else 1
lat_hemisphere = decoded[byte_index] & 0b00000001
if lat_hemisphere == 0b1:
lat_hemisphere = -1
elif lat_hemisphere == 0b0:
lat_hemisphere = 1
# format the latitude values in Decimal Degrees
result["latitude"] = lat_hemisphere * (
lat_bcd_deg_tenth * 10 + lat_bcd_deg_whole + (
lat_bcd_min_tenth * 10 + lat_bcd_min_whole + lat_bcd_dec_tenth * 0.1 + lat_bcd_dec_hundredth * 0.01
+ lat_bcd_dec_thousandth * 0.001) / 60)
byte_index += 1
# Longitude - four Bytes
long_bcd_deg_hundredth = (decoded[byte_index] & 0b11110000) >> 4
long_bcd_deg_tenth = decoded[byte_index] & 0b00001111
byte_index += 1
long_bcd_deg_whole = (decoded[byte_index] & 0b11110000) >> 4
long_bcd_min_tenth = decoded[byte_index] & 0b00001111
byte_index += 1
long_bcd_min_whole = (decoded[byte_index] & 0b11110000) >> 4
long_bcd_dec_tenth = decoded[byte_index] & 0b00001111
byte_index += 1
long_bcd_dec_hundredth = (decoded[byte_index] & 0b11110000) >> 4
# long_not_used = decoded[byte_index] & 0b00001110
# when the bit on the position 0 is 1 variable gets the value -1 else 1
long_hemisphere = decoded[byte_index] & 0b00000001
if long_hemisphere == 0b1:
long_hemisphere = -1
elif long_hemisphere == 0b0:
long_hemisphere = 1
# format the latitude values in Decimal Degrees
result["longitude"] = long_hemisphere * (
long_bcd_deg_hundredth * 100 + long_bcd_deg_tenth * 10 + long_bcd_deg_whole + (
long_bcd_min_tenth * 10 + long_bcd_min_whole + long_bcd_dec_tenth * 0.1 + long_bcd_dec_hundredth *
0.01) / 60)
byte_index += 1
# GPS quality - one Byte
result["gps_quality"] = {
"reception_scale": "Unknown",
"number_satellites": decoded[byte_index] & 0b00001111
}
reception_scale = (decoded[byte_index] & 0b11110000) >> 4
if reception_scale == 1:
result["gps_quality"]["reception_scale"] = "Good"
elif reception_scale == 2:
result["gps_quality"]["reception_scale"] = "Average"
elif reception_scale == 3:
result["gps_quality"]["reception_scale"] = "Poor"
byte_index += 1
# Uplink counter - one Byte
if status_presence_uplink and len(decoded) >= byte_index + 1:
result["uplink_frame_counter"] = decoded[byte_index]
byte_index += 1
# Downlink counter - one Byte
if status_presence_downlink and len(decoded) >= byte_index + 1:
result["downlink_frame_counter"] = decoded[byte_index]
byte_index += 1
# Battery level - two Bytes
if status_presence_battery_lvl and len(decoded) >= byte_index + 2:
result["battery_lvl"] = ((decoded[byte_index] << 8) | decoded[byte_index + 1])
byte_index += 2
# RSSI - one Byte
# SNR - one Byte
if status_presence_rssi_snr and len(decoded) >= byte_index + 2:
result["rssi/snr"] = {
"rssi_dbm": decoded[byte_index],
"snr_db": helpers.bin8dec(decoded[byte_index + 1])
}
if DEBUG_OUTPUT:
print(f"Output: {json.dumps(result, indent=2)}")
return result