send_to_pubsub

in abap-sdk/ZGOOG_SDK_UTILITIES/zgoog_bq_cdc_with_pubsub/src/zgoog_cl_pubsub_to_bq.clas.abap [0:0]


  METHOD send_to_pubsub.

    " The table below shows the list of supported BQ Data types and
    " how the data in IATB IT_DATA should be formatted

    " IMPORTANT - Note special requirement for DATE, TIME and TIMESTAMP fields
    " Data must be already in BigQuery format in the itab, different from SAP data types

    "------------------------------------------------------------------------------------
    " BigQuery Data Type | Itab Field Type (format)       | Sample Values               |
    "------------------------------------------------------------------------------------
    " INTEGER            | INT1, INT2, INT4, INT8         | 0                           |
    "------------------------------------------------------------------------------------
    " FLOAT              | FLTP                           | 1.2                         |
    "-----------------------------------------------------------------------------------
    " NUMERIC            | CURR, DEC, QUAN, DECFLOAT16    | 300.45                      |
    "------------------------------------------------------------------------------------
    " STRING             | NUMC, RAW, CHAR, STRING        | This is a string            |
    "                    | RAWSTRING, LRAW                |                             |
    "------------------------------------------------------------------------------------
    " DATE               | CHAR, STRING (YYYY-MM-DD)      | 2024-04-01                  |
    "------------------------------------------------------------------------------------
    " TIME               | CHAR, STRING (HH:MM:SS)        | 23:10:09                    |
    "------------------------------------------------------------------------------------
    " TIMESTAMP          | CHAR, STRING                   |                             |
    "                    | (YYYY-MM-DDTHH:MM:SS.µµµµµµZ)  | 2024-09-14T11:00:00.123456Z |
    "------------------------------------------------------------------------------------

    IF gs_config-datasource <> iv_datasource.

      READ TABLE gt_config INTO gs_config
           WITH TABLE KEY datasource = iv_datasource.
      IF sy-subrc IS NOT INITIAL.
        ev_ret_code = 461.
        ev_err_text = 'Config not found in table ZGOOG_PUBSUB_BQ'.
        RETURN.
      ENDIF.

      DATA: lo_root TYPE REF TO cx_root.

      TRY.
          CREATE OBJECT go_pubsub
            EXPORTING
              iv_key_name = gs_config-keyname.

        CATCH cx_root INTO lo_root.
          ev_ret_code = 461.
          ev_err_text = lo_root->get_text( ).
          RETURN.
      ENDTRY.

    ENDIF.

    IF go_pubsub IS INITIAL.
      RETURN.
    ENDIF.

    DATA:
      ls_input_sap   TYPE /goog/cl_pubsub_v1=>ty_023,
      ls_message_sap TYPE /goog/cl_pubsub_v1=>ty_025,
      ls_input_bq    TYPE /goog/cl_pubsub_v1=>ty_023,
      ls_message_bq  TYPE /goog/cl_pubsub_v1=>ty_025.

    FIELD-SYMBOLS: <ls_row> TYPE any.

    DATA: lv_json_obj_sap TYPE string.

    DATA: lv_timestamp TYPE timestampl.
    GET TIME STAMP FIELD lv_timestamp.

    DATA:  lv_sequence_number TYPE string.
    lv_sequence_number = lv_timestamp.

    REPLACE ALL OCCURRENCES OF '.' IN  lv_sequence_number WITH space.
    lv_sequence_number = lv_sequence_number(19).
    CONDENSE lv_sequence_number NO-GAPS.

    LOOP AT it_data ASSIGNING <ls_row>.
      lv_json_obj_sap = /ui2/cl_json=>serialize( data = <ls_row>
                         pretty_name = /ui2/cl_json=>pretty_mode-low_case ).

      IF iv_set_cdc_fields IS NOT INITIAL.
        IF iv_cdc_field IS INITIAL AND
           iv_cdc_del_value IS INITIAL.
          ASSIGN 'UPSERT' TO FIELD-SYMBOL(<lv_change_type>).
        ELSE.
          ASSIGN COMPONENT iv_cdc_field OF STRUCTURE <ls_row>
          TO FIELD-SYMBOL(<lv_cdc_field>).

          IF <lv_cdc_field> = iv_cdc_del_value.
            ASSIGN 'DELETE' TO <lv_change_type>.
          ELSE.
            ASSIGN 'UPSERT' TO <lv_change_type>.
          ENDIF.
        ENDIF.

        IF iv_change_sequence_number_fld IS NOT INITIAL.
          ASSIGN COMPONENT iv_change_sequence_number_fld OF STRUCTURE <ls_row>
          TO FIELD-SYMBOL(<lv_change_sequence_number>).
        ELSE.
          ASSIGN lv_sequence_number TO <lv_change_sequence_number>.
        ENDIF.


        DATA: lv_json_len TYPE i.

        lv_json_len = strlen( lv_json_obj_sap ) - 1.

        lv_json_obj_sap = lv_json_obj_sap(lv_json_len) &&
                          ',"_CHANGE_TYPE":"' &&
                          <lv_change_type> &&
                          '","_CHANGE_SEQUENCE_NUMBER":' &&
                          <lv_change_sequence_number> && '}'.

      ENDIF.

      ls_message_sap-data = cl_http_utility=>encode_base64( unencoded = lv_json_obj_sap ).
      APPEND ls_message_sap TO ls_input_sap-messages.
    ENDLOOP.


    TRY.
        CALL METHOD go_pubsub->publish_topics
          EXPORTING
            iv_p_projects_id = CONV #( go_pubsub->gv_project_id )
            iv_p_topics_id   = CONV #( gs_config-pubsub_topic )
            is_input         = ls_input_sap
          IMPORTING
            ev_ret_code      = ev_ret_code
            ev_err_text      = ev_err_text
            es_err_resp      = ev_err_resp.
      CATCH cx_root INTO lo_root.
        ev_ret_code = 461.
        ev_err_text = lo_root->get_text( ).
    ENDTRY.