in transform_binary_payload/src-payload-decoders/python/elsys.py [0:0]
def _uint_be(buf: bytes, start: int, size: int) -> int:
    """Return the unsigned big-endian integer made of ``size`` bytes at ``buf[start]``.

    Bytes are read one by one via indexing (not slicing) so that a buffer
    which ends too early raises IndexError instead of silently producing a
    shorter value.
    """
    value = 0
    for offset in range(size):
        value = (value << 8) | buf[start + offset]
    return value


def _int24_le(buf: bytes, start: int) -> int:
    """Return the signed little-endian 24-bit integer stored at ``buf[start]``."""
    value = buf[start] | (buf[start + 1] << 8) | (buf[start + 2] << 16)
    # Python integers are unbounded, so OR-ing in 0xFF << 24 does NOT
    # sign-extend the value; apply the two's-complement correction explicitly.
    if value & 0x800000:
        value -= 1 << 24
    return value


def dict_from_payload(base64_input: str, fport: int = None):
    """Decode a base64-encoded binary payload into a dictionary of readings.

    The payload is a sequence of fields. Each field starts with one type
    byte (compared against the module's ``TYPE_*`` constants) followed by a
    fixed number of data bytes that depends on the type.

    Args:
        base64_input: The payload, base64 encoded.
        fport: Accepted for interface compatibility; not read by this decoder.

    Returns:
        A dict mapping reading names (e.g. ``'temperature'``, ``'humidity'``,
        ``'gpsLat'``) to decoded values. Only fields present in the payload
        appear. ``'extTemp2'`` is a single float when the field occurs once
        and a list of floats when it occurs several times.

    Raises:
        Exception: If a type byte does not match any known field type.
        IndexError: If the payload ends in the middle of a field.
    """
    decoded = base64.b64decode(base64_input)

    if DEBUG_OUTPUT:
        print(f"Input: {decoded.hex().upper()}")

    # Output
    result = {}

    # Iterate over the payload; ``i`` always points at the next type byte.
    i = 0
    while i < len(decoded):
        field_type = decoded[i]

        if field_type == TYPE_TEMP:  # Temperature
            result['temperature'] = helpers.bin16dec(_uint_be(decoded, i + 1, 2)) / 10
            i += 3
        elif field_type == TYPE_RH:  # Humidity
            result['humidity'] = decoded[i + 1]
            i += 2
        elif field_type == TYPE_ACC:  # Acceleration X,Y,Z
            result['accX'] = helpers.bin8dec(decoded[i + 1])
            result['accY'] = helpers.bin8dec(decoded[i + 2])
            result['accZ'] = helpers.bin8dec(decoded[i + 3])
            i += 4
        elif field_type == TYPE_LIGHT:  # Light
            result['light'] = _uint_be(decoded, i + 1, 2)
            i += 3
        elif field_type == TYPE_MOTION:  # Motion (PIR)
            result['motion'] = decoded[i + 1]
            i += 2
        elif field_type == TYPE_CO2:  # CO2
            result['co2'] = _uint_be(decoded, i + 1, 2)
            i += 3
        elif field_type == TYPE_VDD:  # Int battery voltage
            result['vdd'] = _uint_be(decoded, i + 1, 2)
            i += 3
        elif field_type == TYPE_ANALOG1:  # Analog input 1
            result['analog1'] = _uint_be(decoded, i + 1, 2)
            i += 3
        elif field_type == TYPE_GPS:  # GPS lat & long
            # Two signed 24-bit little-endian values, each scaled by 1/10000.
            result['gpsLat'] = _int24_le(decoded, i + 1) / 10000
            result['gpsLong'] = _int24_le(decoded, i + 4) / 10000
            i += 7
        elif field_type == TYPE_PULSE1:  # Pulse input 1 rel
            result['pulse1'] = _uint_be(decoded, i + 1, 2)
            i += 3
        elif field_type == TYPE_PULSE1_ABS:  # Pulse input 1 abs
            result['pulse1Abs'] = _uint_be(decoded, i + 1, 4)
            i += 5
        elif field_type == TYPE_EXT_TEMP1:  # Ext temp 1
            result['extTemp1'] = helpers.bin16dec(_uint_be(decoded, i + 1, 2)) / 10
            i += 3
        elif field_type == TYPE_EXT_DIGITAL:  # Ext dig input 1
            result['extDigital'] = decoded[i + 1]
            i += 2
        elif field_type == TYPE_EXT_DISTANCE:  # Ext distance
            result['extDistance'] = _uint_be(decoded, i + 1, 2)
            i += 3
        elif field_type == TYPE_ACC_MOTION:  # Motion (Acc mov)
            result['accMotion'] = decoded[i + 1]
            i += 2
        elif field_type == TYPE_IR_TEMP:  # IR temp
            internal = helpers.bin16dec(_uint_be(decoded, i + 1, 2))
            external = helpers.bin16dec(_uint_be(decoded, i + 3, 2))
            result['irTempInt'] = internal / 10
            result['irTempExt'] = external / 10
            i += 5
        elif field_type == TYPE_OCCUPANCY:  # Occupancy
            result['occupancy'] = decoded[i + 1]
            i += 2
        elif field_type == TYPE_WATERLEAK:  # Ext water leak
            result['waterleak'] = decoded[i + 1]
            i += 2
        elif field_type == TYPE_GRIDEYE:  # Grideye data
            # One reference byte followed by 64 per-pixel bytes; each pixel
            # is the reference plus a tenth of its own byte value.
            ref = decoded[i + 1]
            result['grideye'] = [ref + decoded[i + 2 + j] / 10.0 for j in range(64)]
            i += 66
        elif field_type == TYPE_PRESSURE:  # Pressure
            result['pressure'] = _uint_be(decoded, i + 1, 4) / 1000
            i += 5
        elif field_type == TYPE_SOUND:  # Sound
            result['soundPeak'] = decoded[i + 1]
            result['soundAvg'] = decoded[i + 2]
            i += 3
        elif field_type == TYPE_PULSE2:  # Pulse input 2 rel
            result['pulse2'] = _uint_be(decoded, i + 1, 2)
            i += 3
        elif field_type == TYPE_PULSE2_ABS:  # Pulse input 2 abs
            result['pulse2Abs'] = _uint_be(decoded, i + 1, 4)
            i += 5
        elif field_type == TYPE_ANALOG2:  # Analog input 2
            result['analog2'] = _uint_be(decoded, i + 1, 2)
            i += 3
        elif field_type == TYPE_EXT_TEMP2:  # Ext temp 2
            reading = helpers.bin16dec(_uint_be(decoded, i + 1, 2)) / 10
            # This field may repeat within one payload: keep a plain float
            # for a single reading and switch to a list from the second on.
            if 'extTemp2' not in result:
                result['extTemp2'] = reading
            elif isinstance(result['extTemp2'], list):
                result['extTemp2'].append(reading)
            else:
                result['extTemp2'] = [result['extTemp2'], reading]
            i += 3
        elif field_type == TYPE_EXT_DIGITAL2:  # Ext dig input 2
            result['extDigital2'] = decoded[i + 1]
            i += 2
        elif field_type == TYPE_EXT_ANALOG_UV:  # Ext analog uV
            result['extAnalogUv'] = _uint_be(decoded, i + 1, 4)
            i += 5
        elif field_type == TYPE_DEBUG:  # Debug
            result['debug'] = _uint_be(decoded, i + 1, 4)
            i += 5
        elif field_type == TYPE_SETTINGS:  # Sensor settings
            i = len(decoded)  # just ignore sensor settings packets
        else:  # something is wrong with the data
            raise Exception(f"Data field type {hex(decoded[i])} not known.")

    if DEBUG_OUTPUT:
        print(f"Output: {json.dumps(result,indent=2)}")

    return result