in source/code/plugins/out_oms_changetracking_file.rb [497:610]
def write(chunk)
if !OMS::Configuration.load_configuration(omsadmin_conf_path, cert_path, key_path)
raise 'Missing configuration. Make sure to onboard. Will continue to buffer data.'
end
datatypes = {}
unmergable_records = []
chunk.msgpack_each {|(tag, record)|
if record.has_key?('DataType') and record.has_key?('IPName')
key = "#{record['DataType']}.#{record['IPName']}".upcase
if datatypes.has_key?(key)
datatypes[key]['DataItems'].concat(record['DataItems'])
else
if record.has_key?('DataItems')
datatypes[key] = record
else
unmergable_records << [key, record]
end
end
else
@log.warn "Missing DataType or IPName field in record from tag '#{tag}'"
end
}
datatypes.each do |tag, records|
handle_records(tag, records)
end
@log.trace "Handling #{unmergable_records.size} unmergeable records"
unmergable_records.each { |key, record|
handle_record(key, record)
}
save_content_location()
end
private
class ChunkErrorHandler
include Configurable
include PluginId
include PluginLoggerMixin
SecondaryName = "__ChunkErrorHandler__"
Plugin.register_output(SecondaryName, self)
def initialize
@router = nil
end
def secondary_init(primary)
@error_handlers = create_error_handlers @router
end
def start
end
def shutdown
end
def router=(r)
@router = r
end
def write(chunk)
chunk.msgpack_each {|(tag, record)|
@error_handlers[tag].emit(record)
}
end
private
def create_error_handlers(router)
nop_handler = NopErrorHandler.new
Hash.new() { |hash, tag|
etag = OMS::Common.create_error_tag tag
hash[tag] = router.match?(etag) ?
ErrorHandler.new(router, etag) :
nop_handler
}
end
class ErrorHandler
def initialize(router, etag)
@router = router
@etag = etag
end
def emit(record)
@router.emit(@etag, Fluent::Engine.now, record)
end
end
class NopErrorHandler
def emit(record)
end
end
end
end
end