def dict_from_payload()

in transform_binary_payload/src-payload-decoders/python/adeunis_dc_v2.py [0:0]


def dict_from_payload(base64_input: str, fport: int = None):
    """ Decodes a base64-encoded binary payload into JSON.
        Parameters
        ----------
        base64_input : str
            Base64-encoded binary payload
        fport: int
            FPort as provided in the metadata. Please note the fport is optional and can have value "None", if not
            provided by the LNS or invoking function.
            If  fport is None and binary decoder can not proceed because of that, it should should raise an exception.
        Returns
        -------
        JSON object with key/value pairs of decoded attributes
    """
    # Payload
    # The size of the payload varies depending on the information that is send.
    decoded = base64.b64decode(base64_input)
    # Index for iterating over the payload bytes
    byte_index = 0
    # result dictionary
    result = {}

    # Printing the debug output
    if DEBUG_OUTPUT:
        print(f"Input: {decoded.hex().upper()}")

    # The first byte shows the Frame code
    # Byte 1 - Frame code
    # 0x10 Product configuration, 0x20 Network configuration, 0x2F Set state or pulse ACK, 0x30 Keep alive frame
    # 0x40 Data frame, 0x59 Time counting frame, 0x31 Response to Get register request,
    # 0x33 Response to Set register request
    if len(decoded):
        # getting the first byte
        frame_code = decoded[0]

        # getting the frame type
        if frame_code == 0x10:
            result["type"] = "0x10 Dry Contacts 2 configuration"
        elif frame_code == 0x20:
            result["type"] = "0x20 Configuration"
        elif frame_code == 0x2F:
            result["type"] = "0x2f Downlink ack"
        elif frame_code == 0x30:
            result["type"] = "0x30 Dry Contacts 2 keep alive"
        elif frame_code == 0x33:
            result["type"] = "0x33 Set register status"
        elif frame_code == 0x40:
            result["type"] = "0x40 Dry Contacts 2 data"
        elif frame_code == 0x59:
            result["type"] = "0x59 Dry Contacts 2 time counting data"
        else:
            result["type"] = "Invalid frame code"

        # Status - one Byte
        status = decoded[1]

        # bit0 - Configuration Done
        config = status & 0b00000001
        if config & 0b1:
            config = True
        else:
            config = False

        # bit1 - Low Battery
        low_bat = (status & 0b00000010) >> 1
        if low_bat & 0b1:
            low_bat = True
        else:
            low_bat = False

        # bit2 - Timestamp
        timestamp = (status & 0b00000100) >> 2
        if timestamp & 0b1:
            timestamp = True
        else:
            timestamp = False

        # bit3 - AppFlag1
        app_flag_1 = (status & 0b00001000) >> 3
        if app_flag_1 & 0b1:
            app_flag_1 = True
        else:
            app_flag_1 = False

        # bit4 - AppFlag2
        app_flag_2 = (status & 0b00010000) >> 4
        if app_flag_2 & 0b1:
            app_flag_2 = True
        else:
            app_flag_2 = False

        # bit5-6 - Frame Counter
        frame_counter = (status & 0b11100000) >> 5

        # check if there is no error
        no_error = not (config or low_bat or timestamp or app_flag_1 or app_flag_2)

        # building the result
        result["status"] = {
            "frameCounter": frame_counter,
            "noError": no_error,
            "lowBattery": low_bat,
            "configurationDone": config,
            "timestamp": timestamp
        }

        if frame_code == 0x40:
            # channel a - byte 2&3
            channel_a = ((decoded[2] << 8) | decoded[3])
            result["channelA"] = {
                "value": channel_a
            }

            # channel b - byte 4&5
            channel_b = ((decoded[4] << 8) | decoded[5])
            result["channelB"] = {
                "value": channel_b
            }

            # channel c - byte 6&7
            channel_c = ((decoded[6] << 8) | decoded[7])
            result["channelC"] = {
                "value": channel_c
            }

            # channel d - byte 8&9
            channel_d = ((decoded[8] << 8) | decoded[9])
            result["channelD"] = {
                "value": channel_d
            }

            # details - byte 10
            # Define precisely the input/output state
            details = decoded[10]

            # channel a
            channel_a_current_state = details & 0b00000001
            if channel_a_current_state & 0b1:
                result["channelA"]["currentState"] = True
            else:
                result["channelA"]["currentState"] = False

            channel_a_previous_frame = (details & 0b00000010) >> 1
            if channel_a_previous_frame & 0b1:
                result["channelA"]["previousFrameState"] = True
            else:
                result["channelA"]["previousFrameState"] = False

            # channel b
            channel_b_current_state = (details & 0b00000100) >> 2
            if channel_b_current_state & 0b1:
                result["channelB"]["currentState"] = True
            else:
                result["channelB"]["currentState"] = False

            channel_b_previous_frame = (details & 0b00001000) >> 3
            if channel_b_previous_frame & 0b1:
                result["channelB"]["previousFrameState"] = True
            else:
                result["channelB"]["previousFrameState"] = False

            # channel c
            channel_c_current_state = (details & 0b00010000) >> 4
            if channel_c_current_state & 0b1:
                result["channelC"]["currentState"] = True
            else:
                result["channelC"]["currentState"] = False

            channel_c_previous_frame = (details & 0b00100000) >> 5
            if channel_c_previous_frame & 0b1:
                result["channelC"]["previousFrameState"] = True
            else:
                result["channelC"]["previousFrameState"] = False

            # channel d
            channel_d_current_state = (details & 0b01000000) >> 6
            if channel_d_current_state & 0b1:
                result["channelD"]["currentState"] = True
            else:
                result["channelD"]["currentState"] = False

            channel_d_previous_frame = (details & 0b10000000) >> 7
            if channel_d_previous_frame & 0b1:
                result["channelD"]["previousFrameState"] = True
            else:
                result["channelD"]["previousFrameState"] = False

            # timestamp in EPOCH 2013
            if timestamp and len(decoded) > 10:
                timestamp_data = ((decoded[11] << 24) | decoded[12] << 16 | decoded[13] << 8 | decoded[14])
                result["timestamp"] = timestamp_data

        return result