Providers/Modules/Plugins/VMInsights/plugin/VMInsightsEngine.rb (275 lines of code) (raw):
# frozen_string_literal: true
module VMInsights
require_relative 'VMInsightsDataCollector.rb'
class MetricsEngine
require 'json'
require 'thread'
# Builds an engine in the stopped state: no polling thread exists until
# #start creates one.
def initialize
  @thread = nil
end
# Starts polling by creating a PollingThread from the given configuration.
#
# config - a Configuration; its poll_interval, computer, log and
#          data_collector are handed to the PollingThread.
# cb     - required block; passed to the PollingThread as its callback.
#
# Returns nil.
# Raises ArgumentError when config is nil, is not a Configuration, or no
# block is given; raises RuntimeError when a thread is already held.
def start(config, &cb)
  # The checks run in this order so the first failing one decides the error.
  if config.nil?
    raise ArgumentError, 'config is nil'
  elsif !config.kind_of?(Configuration)
    raise ArgumentError, "config is not kind of #{Configuration}"
  elsif cb.nil?
    raise ArgumentError, 'proc required'
  elsif !@thread.nil?
    raise RuntimeError, 'already started'
  end

  @thread = PollingThread.new(
    config.poll_interval,
    config.computer,
    config.log,
    config.data_collector,
    cb
  )
  nil
end
# Asks the polling thread (if any) to stop and forgets it once its #stop
# reports a truthy result. Does nothing when no thread is held.
# Returns nil.
def stop
  unless @thread.nil?
    stopped = @thread.stop
    @thread = nil if stopped
  end
end
# Reports whether polling is active.
#
# Returns false when no thread is held; otherwise returns whatever the
# thread's #status returns (truthy while the thread is alive, falsy once it
# has finished).
def running?
  return false if @thread.nil?

  @thread.status
end
# Settings handed to MetricsEngine#start.
#
# poll_interval defaults to 60 (seconds) and can be changed through
# #poll_interval=; computer, log and data_collector are fixed at construction.
class Configuration
  attr_reader :poll_interval, :computer, :data_collector, :log

  # computer       - stored as given (not validated here)
  # log            - logger; must be truthy
  # data_collector - must be a kind of IDataCollector
  #
  # Raises ArgumentError when log is nil/false, or (with the offending class
  # name as message) when data_collector is not an IDataCollector.
  def initialize(computer, log, data_collector)
    raise ArgumentError unless log
    unless data_collector.kind_of?(IDataCollector)
      raise ArgumentError, "#{data_collector.class.name}"
    end

    @computer = computer
    @log = log
    @data_collector = data_collector
    @poll_interval = 60 # seconds
  end

  # Sets the polling interval in seconds.
  # Raises ArgumentError unless v is a real Numeric that is at least 1.
  def poll_interval=(v)
    acceptable = v.kind_of?(Numeric) && v.real? && v >= 1
    raise ArgumentError unless acceptable

    @poll_interval = v
  end
end # class Configuration
private
class PollingThread < Thread
# Creates the polling thread; the block given to super is the thread body.
#
# interval       - seconds between polls (added to the running schedule)
# computer       - passed to MetricTuple.computer before polling starts
# log            - logger; #info is called when the loop starts and stops
# data_collector - its #baseline seeds the cumulative CPU counters
# cb             - callback stored for #protected_yield
#
# The schedule is kept on the monotonic clock rather than Time.now. With the
# wall clock, a backwards clock step would stretch the computed sleep by the
# size of the step (silencing all metrics for that long) and a forward step
# would collapse the schedule; the monotonic clock is immune to both.
def initialize(interval, computer, log, data_collector, cb)
  @mutex = Mutex.new
  @condvar = ConditionVariable.new
  @run = true
  @log = log
  @data_collector = data_collector
  @cb = cb
  @saved_exception = SavedException.new
  @saved_cpu_exception = SavedException.new
  @cummulative_data = CummulativeData.new
  super() {
    @cummulative_data.initialize_from_baseline @data_collector.baseline
    MetricTuple.computer computer
    begin
      @log.info "Starting polling loop at #{interval} second interval"
      # try to keep as close to the polling interval, independent of
      # any delay waking up from the sleep, collecting data, or
      # emitting the message.
      expected_wakeup_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      while @run do
        expected_wakeup_time += interval
        now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        if expected_wakeup_time <= now
          # Already late: poll right away and restart the schedule from now.
          expected_wakeup_time = now
        else
          sleep_time = expected_wakeup_time - now
          @mutex.synchronize {
            # @run is re-checked under the mutex so a #stop that has already
            # cleared the flag and broadcast is not slept through.
            @condvar.wait(@mutex, sleep_time) if @run
          }
        end
        yield_metrics_message() if @run
      end
    ensure
      @log.info "Stopping polling"
    end
  }
end
# Signals the polling loop to finish and waits for the thread to end.
#
# Clears @run and broadcasts on the condition variable under the mutex, then
# joins for up to 5 seconds; if the join times out the thread is terminated.
# Returns the result of join, or of terminate when the join timed out.
def stop
  @mutex.synchronize do
    @run = false
    @condvar.broadcast
  end

  joined = join(5)
  joined || terminate
end
# Runs one polling cycle: gathers data, converts it to a message and hands
# it to the callback through #protected_yield.
#
# Any StandardError raised by that pipeline is logged (backtrace, then
# message) by the inner handler. The outer handler therefore only runs when
# that logging itself raises; it logs the failure unless @saved_exception
# reports it as a repeat, and then remembers it.
def yield_metrics_message()
  begin
    protected_yield(data_to_message(gather_data))
  rescue => std
    @log.error_backtrace std.backtrace
    @log.error std.message
  end
rescue => std
  # SystemCallError is a StandardError, so this single clause covers both
  # of the cases that were previously handled with identical code.
  if @saved_exception.same(std)
    "#{__FILE__}(#{__LINE__}): #{Time.now}"
  else
    @log.error std.message
    @saved_exception = SavedException.new(std)
  end
end
# Invokes the stored callback with the given arguments.
#
# Returns the callback's result. If the callback raises a StandardError the
# error and its backtrace are logged and true is returned instead.
def protected_yield(*args)
  begin
    @cb.call(*args)
  rescue => ex
    @log.error "Unexpected exception #{ex.inspect}"
    @log.error_backtrace ex.backtrace
    # if there was an exception, assume next steps should happen so as not
    # to have it keep happening
    true
  end
end
# Collects one sample of every metric family.
#
# Brackets the collection with start_sample/end_sample and calls, in order,
# liveness, available_memory_mb, processor, logical_disks and network,
# appending every yielded metric to the result. A family that raises
# IDataCollector::Unavailable is logged at debug level; any other
# StandardError from a family is logged as an error. Either way the
# remaining families still run.
#
# Returns the Array of collected metrics. If start_sample, end_sample or the
# logging itself raises, the error is logged and the return value is whatever
# @log.debug_backtrace returns.
def gather_data
  @log.debug "Gather Data" # Don't delete, used in unit test
  samples = []
  start_sample
  %i[liveness available_memory_mb processor logical_disks network].each do |family|
    begin
      send(family) { |metric| samples << metric }
    rescue IDataCollector::Unavailable => un
      @log.debug "#{family}: #{un.message}"
      @log.debug_backtrace un.backtrace
    rescue => ex
      @log.error "Unexpected exception #{ex.inspect}"
      @log.error_backtrace ex.backtrace
    end
  end
  end_sample
  samples
rescue NoMemoryError, StandardError => ex
  # SystemCallError is a StandardError, so it is handled here as well.
  @log.error ex.message
  @log.debug_backtrace
end
# Converts gathered data into the message handed to the callback.
# Currently the data is passed through untouched.
def data_to_message(data)
  data
end
# Tells the data collector that a sampling pass is beginning.
# Returns whatever the collector's #start_sample returns.
def start_sample
  @data_collector.start_sample
end
# Yields the heartbeat metric ("Computer"/"Heartbeat" with value 1).
# Returns nil.
def liveness
  heartbeat = MetricTuple.factory("Computer", "Heartbeat", 1)
  yield heartbeat
  nil
end
# Yields the "Memory"/"AvailableMB" metric.
#
# Reads the free and total figures from the collector's
# get_available_memory_kb, converts both with mb_from_kb, and attaches the
# converted total as the memorySizeMB tag. Returns nil.
def available_memory_mb
  free_kb, total_kb = @data_collector.get_available_memory_kb
  free_mb = mb_from_kb(free_kb)
  total_mb = mb_from_kb(total_kb)
  size_tag = { "#{MetricTuple::Origin}/memorySizeMB" => total_mb }
  yield MetricTuple.factory("Memory", "AvailableMB", free_mb, size_tag)
  nil
end
# Yields the "Processor"/"UtilizationPercentage" metric for the interval
# since the previous sample, tagged with the collector's CPU count.
#
# Raises IDataCollector::Unavailable when the total-time delta is zero.
# Any StandardError raised while reading the CPU count, building the metric
# or yielding it is swallowed; it is logged unless @saved_cpu_exception
# reports it as a repeat, and is then remembered. Returns nil.
def processor
  raw_total, raw_idle = @data_collector.get_cpu_idle
  elapsed, idle_elapsed = @cummulative_data.get_cpu_time_delta(raw_total, raw_idle)
  raise IDataCollector::Unavailable.new("total time delta is zero") if elapsed.zero?

  begin
    cpu_count = @data_collector.get_number_of_cpus
    idle_fraction = (idle_elapsed * 1.0) / (elapsed * 1.0)
    busy_percent = 100.0 * (1.0 - idle_fraction)
    cpu_tag = { "#{MetricTuple::Origin}/totalCpus" => cpu_count }
    yield MetricTuple.factory("Processor", "UtilizationPercentage", busy_percent, cpu_tag)
  rescue => ex
    unless @saved_cpu_exception.same(ex)
      @log.error ex.message
      @log.debug_backtrace
      @saved_cpu_exception = SavedException.new(ex)
    end
  end
  nil
end
# Yields the LogicalDisk metrics for every filesystem the collector reports.
#
# Per filesystem, always: Status (1), FreeSpacePercentage and FreeSpaceMB
# (whole megabytes, with the disk size attached as the diskSizeMB tag).
# Then, from get_disk_stats for the filesystem's device, per-second rates for
# reads, writes and transfers and for bytes read, written and total. A rate
# is skipped when its counter is nil; the combined rates need both counters.
# IDataCollector::Unavailable raised while producing the rates is logged at
# debug level and the loop continues with the next filesystem. Returns nil.
def logical_disks
  bytes_per_mb = 1024 * 1024
  @data_collector.get_filesystems.each do |fs|
    mount_tag = { "#{MetricTuple::Origin}/mountId" => fs.mount_point }
    yield MetricTuple.factory("LogicalDisk", "Status", 1, mount_tag)

    free_percent = (100.0 * fs.free_space_in_bytes) / fs.size_in_bytes
    yield MetricTuple.factory("LogicalDisk", "FreeSpacePercentage", free_percent, mount_tag)

    free_mb = fs.free_space_in_bytes / bytes_per_mb
    size_tag = mount_tag.merge({ "#{MetricTuple::Origin}/diskSizeMB" => fs.size_in_bytes / bytes_per_mb })
    yield MetricTuple.factory("LogicalDisk", "FreeSpaceMB", free_mb, size_tag)

    begin
      stats = @data_collector.get_disk_stats(fs.device_name)
      seconds = stats.delta_time.to_f

      read_ops = stats.reads
      write_ops = stats.writes
      unless read_ops.nil?
        yield MetricTuple.factory("LogicalDisk", "ReadsPerSecond", read_ops / seconds, mount_tag)
      end
      unless write_ops.nil?
        yield MetricTuple.factory("LogicalDisk", "WritesPerSecond", write_ops / seconds, mount_tag)
      end
      if !read_ops.nil? && !write_ops.nil?
        yield MetricTuple.factory("LogicalDisk", "TransfersPerSecond", (read_ops + write_ops) / seconds, mount_tag)
      end

      # Byte counters are read only after the operation rates were emitted.
      read_bytes = stats.bytes_read
      written_bytes = stats.bytes_written
      unless read_bytes.nil?
        yield MetricTuple.factory("LogicalDisk", "ReadBytesPerSecond", read_bytes / seconds, mount_tag)
      end
      unless written_bytes.nil?
        yield MetricTuple.factory("LogicalDisk", "WriteBytesPerSecond", written_bytes / seconds, mount_tag)
      end
      if !read_bytes.nil? && !written_bytes.nil?
        yield MetricTuple.factory("LogicalDisk", "BytesPerSecond", (read_bytes + written_bytes) / seconds, mount_tag)
      end
    rescue IDataCollector::Unavailable => un
      @log.debug "#{fs.device_name}: #{un.message}"
      @log.debug_backtrace un.backtrace
    end
  end
  nil
end
# Yields, for every entry returned by the collector's get_net_stats, a
# "ReadBytesPerSecond" metric (from bytes_received) followed by a
# "WriteBytesPerSecond" metric (from bytes_sent). Returns nil.
def network
  @data_collector.get_net_stats.each do |nic|
    received = make_network_metric(nic.delta_time, nic.device, "ReadBytesPerSecond", nic.bytes_received)
    yield received
    sent = make_network_metric(nic.delta_time, nic.device, "WriteBytesPerSecond", nic.bytes_sent)
    yield sent
  end
  nil
end
# Tells the data collector that the sampling pass is finished.
# Returns whatever the collector's #end_sample returns.
def end_sample
  @data_collector.end_sample
end
private
# Builds a "Network" metric whose value is bytes divided by delta_time (both
# converted to Float), tagged with the device id and the raw byte count.
#
# delta_time - divisor for the rate
# dev        - value of the networkDeviceId tag
# name       - metric name
# bytes      - byte count; also stored unmodified in the bytes tag
def make_network_metric(delta_time, dev, name, bytes)
  rate = bytes.to_f / delta_time.to_f
  tags = {
    "#{MetricTuple::Origin}/networkDeviceId" => dev,
    "#{MetricTuple::Origin}/bytes" => bytes
  }
  MetricTuple.factory("Network", name, rate, tags)
end
# Remembers an exception for one hour so that repeats of it can be
# recognised (callers use this to avoid logging the same failure again).
class SavedException
  # ex - the exception to remember; nil (the default) remembers nothing.
  def initialize(ex = nil)
    @ex = ex
    @timeout = Time.now + 3600 # one hour from creation
  end

  # Truthy only when an exception was saved, it compares equal (==) to ex,
  # and the one-hour window has not yet elapsed.
  def same(ex)
    if @ex
      (@ex == ex) && (Time.now < @timeout)
    else
      @ex
    end
  end
end
# Converts kilobytes to megabytes; dividing by 1024.0 makes the result a
# Float for Integer and Float inputs.
def mb_from_kb(kb)
  kb / 1024.0
end
# Keeps the last seen cumulative CPU counters so that per-interval deltas
# can be derived from successive readings.
class CummulativeData
  # Both counters start at zero.
  def initialize
    @total_time = 0
    @idle_time = 0
  end

  # Seeds the counters from baseline[:total_time] and baseline[:idle];
  # an entry that is nil leaves the corresponding counter unchanged.
  def initialize_from_baseline(baseline)
    seed_total = baseline[:total_time]
    if !seed_total.nil?
      @total_time = seed_total
    end
    seed_idle = baseline[:idle]
    if !seed_idle.nil?
      @idle_time = seed_idle
    end
  end

  # Returns [total_time_delta, idle_delta] relative to the stored counters
  # and stores the new readings for the next call.
  def get_cpu_time_delta(total_time, idle)
    total_delta = total_time - @total_time
    @total_time = total_time
    idle_delta = idle - @idle_time
    @idle_time = idle
    [total_delta, idle_delta]
  end
end
# Factory for metric records.
#
# Note that .factory returns a plain Hash, not a MetricTuple instance. The
# computer name is class-wide state set through .computer.
class MetricTuple < Hash
  Origin = "vm.azm.ms"
  @@computer = nil

  # Sets the computer name added to every record built afterwards; nil
  # removes it. Raises ArgumentError unless name is a String or nil.
  def self.computer(name)
    unless name.nil? || name.kind_of?(String)
      raise ArgumentError, "name must be a string or nil"
    end
    @@computer = name
  end

  # Builds one metric record.
  #
  # namespace, name, value - stored under :Namespace, :Name and :Value
  # tags                   - Hash, stored JSON-encoded under :Tags
  #
  # The record also carries :Origin, a UTC :CollectionTime formatted as
  # "%FT%TZ", and :Computer when a computer name has been set.
  # Raises ArgumentError when tags is not a Hash.
  def self.factory(namespace, name, value, tags = {})
    raise ArgumentError, "tags (#{tags.class}) must be a Hash" unless tags.kind_of? Hash

    record = {
      :Origin => Origin,
      :Namespace => namespace,
      :Name => name,
      :Value => value,
      :Tags => JSON.generate({}.merge(tags)),
      :CollectionTime => Time.new.utc.strftime("%FT%TZ")
    }
    record[:Computer] = @@computer if @@computer
    record
  end
end # class MetricTuple
end # class PollingThread
end # class
end #module