run_job_and_wait

in source/code/plugins/oms_common.rb [1053:1152]


    def run_job_and_wait(&block)
      read_io, write_io = IO.pipe

      run_garbage_collection

      pid = fork do
        ["SIGHUP", "SIGTERM"].each do |sig|
          Signal.trap(sig) { log "Child process ##{Process.pid} receiving #{sig}\n"; exit }
        end

        read_io.close 
        result = {}
        begin
          yield_ret = yield
          trace "yield_ret=#{yield_ret}"

          yield_telemetry = []
          if yield_ret.is_a?(Array)
            yield_ret.each { |data|
              operation, source, event = UNKOWN, UNKOWN, nil
              event = data[:event] if data.is_a?(Hash) && data.key?(:event)
              
              source = data[:source] if data.is_a?(Hash) && data.key?(:source)
              if event.is_a?(String)
                operation = "LOG_ERROR"
              elsif event.is_a?(Hash) && event.key?(:op)
                operation = event[:op]
              end

              if event != nil and source != UNKOWN
                yield_telemetry.push({'operation': operation, 'event': event, 'source': source})
              end
            }
          end
          result[:telemetry] = yield_telemetry
          result[:return] = yield_ret
        rescue => e 
          result[:exception] = {'class': e.class.name, 'msg': e.message, 'backtrace': e.backtrace}
        end
        
        Process.exit(false) if Process.ppid == 1

        trace "write_io <= #{result}"
        write_io.write(result)
      end

      add_process_to_cache(pid)
      write_io.close 
      
      read = read_io.read
      results = eval(read)
      trace "Receiving read=#{read} results=#{results}"

      read_io.close
      Process.waitpid(pid)
      remove_process_from_cache(pid)

      unless results.is_a?(Hash)
        log "results is not a hash, results=#{results}"
        return results
      end

      
      if results.key?(:exception) and results[:exception] != nil
        ex_class_name = results[:exception][:class]
        ex_msg = results[:exception][:msg]
        ex_backtrace = results[:exception][:backtrace]
        if Object.const_defined?(ex_class_name)
          ex = Object.const_get(ex_class_name, Class.new).new(ex_msg)
          ex.set_backtrace(ex_backtrace)
          raise ex
        else
          error "exception not found '#{ex_class_name}', we will raise a generic exception with msg='#{ex_msg}'"
          ex = StandardError.new(ex_msg)
          ex.set_backtrace(ex_backtrace)
          raise ex
        end
      end

      
      if results.key?(:telemetry) and results[:telemetry] != nil
        results[:telemetry].each do |item|
          operation, event, source = item[:operation], item[:event], item[:source]
          debug "Handling operation=#{operation}, source=#{source}, event=#{event}"
          if operation === LOG_ERROR
            @log.error(event)
          else
            OMS::Telemetry.push_back_qos_event(source, event)
          end
        end
      end

      
      if results.key?(:return) and results[:return] != nil
        return results[:return]
      end

      return nil
    end