write

in source/code/plugins/out_oms_changetracking_file.rb [497:610]


    def write(chunk)
      
      if !OMS::Configuration.load_configuration(omsadmin_conf_path, cert_path, key_path)
        raise 'Missing configuration. Make sure to onboard. Will continue to buffer data.'
      end

      
      datatypes = {}
      unmergable_records = []
      chunk.msgpack_each {|(tag, record)|
        if record.has_key?('DataType') and record.has_key?('IPName')
          key = "#{record['DataType']}.#{record['IPName']}".upcase

          if datatypes.has_key?(key)
            
            datatypes[key]['DataItems'].concat(record['DataItems'])
          else
            if record.has_key?('DataItems')
              datatypes[key] = record
            else
              unmergable_records << [key, record]
            end
          end
        else
          @log.warn "Missing DataType or IPName field in record from tag '#{tag}'"
        end
      }

      datatypes.each do |tag, records|
        handle_records(tag, records)
      end

      @log.trace "Handling #{unmergable_records.size} unmergeable records"
      unmergable_records.each { |key, record|
        handle_record(key, record)
      }

      save_content_location()
  end 



  private

    class ChunkErrorHandler
      include Configurable
      include PluginId
      include PluginLoggerMixin

      SecondaryName = "__ChunkErrorHandler__"

      Plugin.register_output(SecondaryName, self)

      def initialize
        @router = nil
      end

      def secondary_init(primary)
        @error_handlers = create_error_handlers @router
      end

      def start
        
      end

      def shutdown
        
      end

      def router=(r)
        @router = r
      end

      def write(chunk)
        chunk.msgpack_each {|(tag, record)|
          @error_handlers[tag].emit(record)
        }
      end
   
    private

      def create_error_handlers(router)
        nop_handler = NopErrorHandler.new
        Hash.new() { |hash, tag|
          etag = OMS::Common.create_error_tag tag
          hash[tag] = router.match?(etag) ?
                      ErrorHandler.new(router, etag) :
                      nop_handler
        }
      end

      class ErrorHandler
        def initialize(router, etag)
          @router = router
          @etag = etag
        end

        def emit(record)
          @router.emit(@etag, Fluent::Engine.now, record)
        end
      end

      class NopErrorHandler
        def emit(record)
          
        end
      end

   end 

 end 

end