in source/code/plugins/out_oms_blob.rb [62:317]
# Builds the HTTP PUT request used to send +msg+ to a blob URI.
#
# uri        - URI object; only +uri.request_uri+ is read here.
# msg        - String payload; becomes the request body, and its byte size
#              becomes the Content-Length header.
# request_id - String sent in the X-Request-ID header.
# file_path  - optional; when non-nil it is sent as x-ms-meta-FilePath.
#
# Returns the Net::HTTP::Put request (not yet sent).
# Raises OMS::RetryRequestException, with the request id appended to the
# message, when one is raised while the request is being built.
def create_blob_put_request(uri, msg, request_id, file_path = nil)
  headers = {}

  # Adds a header only when the value is non-empty after to_s; the value
  # itself is stored untouched.
  add_if_present = lambda do |name, value|
    headers[OMS::CaseSensitiveString.new(name)] = value unless value.to_s.empty?
  end

  headers[OMS::CaseSensitiveString.new("x-ms-meta-TimeZoneid")] = OMS::Common.get_current_timezone
  headers[OMS::CaseSensitiveString.new("x-ms-meta-ComputerName")] = OMS::Common.get_hostname
  headers[OMS::CaseSensitiveString.new("x-ms-meta-FilePath")] = file_path unless file_path.nil?

  add_if_present.call("x-ms-AzureResourceId", OMS::Configuration.azure_resource_id)
  # azure_region is only read when the configuration object exposes it.
  azure_region = OMS::Configuration.azure_region if defined?(OMS::Configuration.azure_region)
  add_if_present.call("x-ms-AzureRegion", azure_region)
  add_if_present.call("x-ms-OMSCloudId", OMS::Configuration.omscloud_id)
  add_if_present.call("x-ms-UUID", OMS::Configuration.uuid)

  headers[OMS::CaseSensitiveString.new("X-Request-ID")] = request_id
  headers["Content-Type"] = "application/octet-stream"
  headers["Content-Length"] = msg.bytesize.to_s
  headers[OMS::CaseSensitiveString.new("x-ms-version")] = "2016-05-31"

  req = Net::HTTP::Put.new(uri.request_uri, headers)
  req.body = msg
  return req
rescue OMS::RetryRequestException => e
  OMS::Log.error_once("HTTP error for Request-ID: #{request_id} Error: #{e}")
  # `raise e.message, "..."` would hand Kernel#raise a String as the exception
  # class and fail with TypeError; re-raise the same exception type instead,
  # keeping the original message and adding the request id.
  raise e, "#{e.message} Request-ID: #{request_id}"
end
# Requests blob upload details from the ODS blob endpoint and returns the
# parsed JSON response.
#
# container_type   - sent as "ContainerType".
# data_type        - base data type id.
# custom_data_type - optional; when non-nil it is appended to data_type
#                    as "data_type.custom_data_type".
# suffix           - not read directly here; it is visible to the evaluated
#                    url_suffix_template.
#
# Returns whatever JSON.parse yields for the response body.
def request_blob_json(container_type, data_type, custom_data_type, suffix)
  # The parameter names and data_type_id are kept as-is on purpose: the
  # template below is evaluated in this method's binding and may refer to them.
  data_type_id = custom_data_type.nil? ? data_type : "#{data_type}.#{custom_data_type}"

  # NOTE(review): eval runs url_suffix_template as Ruby code. Presumably the
  # template comes from trusted plugin configuration - confirm before reuse.
  url_suffix = eval(url_suffix_template)

  payload = {
    "ContainerType" => container_type,
    "DataTypeId" => data_type_id,
    "ExpiryDuration" => blob_uri_expiry,
    "Suffix" => url_suffix,
    "SkipScanningQueue" => true,
    "SupportWriteOnlyBlob" => true
  }
  retry_header = {
    OMS::CaseSensitiveString.new('x-ms-client-request-retry-count') => "#{@num_errors}"
  }

  endpoint_path = OMS::Configuration.get_blob_ods_endpoint.path
  # Third argument is the positional compress flag; it is passed as false.
  ods_request = OMS::Common.create_ods_request(endpoint_path, payload, false, retry_header)
  connection = OMS::Common.create_ods_http(OMS::Configuration.get_blob_ods_endpoint, @proxy_config)
  raw_body = OMS::Common.start_request(ods_request, connection)

  # Drop bytes that cannot be represented in UTF-8 before parsing.
  JSON.parse(raw_body.encode(Encoding::UTF_8, :invalid => :replace, :undef => :replace, :replace => ""))
end
# Fetches the blob description via request_blob_json and unpacks the three
# fields the upload path needs.
#
# Returns [blob_uri, blocks_committed, blob_size]:
#   blob_uri         - URI parsed from "Uri", or nil (after logging an error)
#                      when the key is absent.
#   blocks_committed - value of "CommittedBlockList", or [] when the key is
#                      absent or its value is nil.
#   blob_size        - value of "Size", or 0 when the key is absent.
def get_blob_uri_and_committed_blocks(container_type, data_type, custom_data_type, suffix)
  details = request_blob_json(container_type, data_type, custom_data_type, suffix)

  location = nil
  if details.key?("Uri")
    location = URI.parse(details["Uri"])
  else
    @log.error "JSON from BLOB does not contain a URI"
  end

  # Only a missing key or an explicit nil falls back to the empty list.
  committed = details.key?("CommittedBlockList") ? details["CommittedBlockList"] : nil
  committed = [] if committed.nil?

  # A present "Size" is passed through unchanged, even when it is nil.
  size = details.key?("Size") ? details["Size"] : 0

  return location, committed, size
end
# Uploads +msgs+ as one or more blocks and commits them to the blob.
#
# uri              - blob URI handed to upload_block / commit_blocks.
# msgs             - collection of records; each non-empty record is written
#                    followed by "\r\n".
# file_path        - forwarded to commit_blocks.
# blocks_committed - ids of blocks already committed; forwarded to
#                    commit_blocks so they stay part of the blob.
# chunk_size       - maximum number of BYTES per uploaded block.
#
# Returns 0 when there is nothing to send, otherwise [bytes_sent, etag].
# Raises ArgumentError when chunk_size is not positive.
def append_blob(uri, msgs, file_path, blocks_committed, chunk_size = 100_000_000)
  raise ArgumentError, "chunk_size must be positive" unless chunk_size > 0
  return 0 if msgs.size == 0

  msg = ''.dup
  msgs.each { |s| msg << "#{s}\r\n" if s.to_s.length > 0 }

  # Sizes are measured in bytes, not characters: the request Content-Length
  # and the blob size this value is later added to are byte counts, so a
  # character count is wrong as soon as the data holds multibyte characters.
  data_size = msg.bytesize
  return 0 if data_size == 0

  blocks_uncommitted = []
  if data_size <= chunk_size
    blocks_uncommitted << upload_block(uri, msg)
  else
    # Split on byte boundaries so no block exceeds chunk_size bytes. A
    # multibyte character may straddle two blocks; the blocks are uploaded in
    # order and committed as one list.
    offset = 0
    while offset < data_size
      blocks_uncommitted << upload_block(uri, msg.byteslice(offset, chunk_size))
      offset += chunk_size
    end
  end

  etag = commit_blocks(uri, blocks_committed, blocks_uncommitted, file_path)
  return data_size, etag
end
# Uploads +msg+ as a single uncommitted block of the blob at +uri+.
#
# uri - blob URI that already carries a query string; "comp=block" and the
#       block id are appended to it with "&".
# msg - String payload of the block.
#
# Returns the Base64 block id, for later use in the block list.
def upload_block(uri, msg)
  # strict_encode64 emits no line feeds. Base64.encode64 appends "\n", which
  # would end up inside the request URL and in the returned block id. A UUID
  # encodes to 48 alphanumeric characters, so no URL escaping is needed.
  base64_blockid = Base64.strict_encode64(SecureRandom.uuid)
  request_id = SecureRandom.uuid
  append_uri = URI.parse("#{uri}&comp=block&blockid=#{base64_blockid}")

  @log.debug("uploading block request_id=#{request_id}, blockid=#{base64_blockid}")
  @log.trace("blockid=#{base64_blockid} block=#{msg}")

  put_block_req = create_blob_put_request(append_uri, msg, request_id, nil)
  http = OMS::Common.create_secure_http(append_uri, @proxy_config)
  OMS::Common.start_request(put_block_req, http)

  return base64_blockid
end
# Commits a block list to the blob at +uri+ and returns the resulting ETag.
#
# uri                - blob URI that already carries a query string;
#                      "comp=blocklist" is appended to it with "&".
# blocks_committed   - block ids written as <Committed> elements.
# blocks_uncommitted - block ids written as <Uncommitted> elements.
# file_path          - forwarded to create_blob_put_request.
#
# Returns the ETag with double quotes removed, or "" (after logging an
# error) when the response carries no "etag" header.
def commit_blocks(uri, blocks_committed, blocks_uncommitted, file_path)
  doc = REXML::Document.new("<BlockList />")
  # Committed ids are listed first, then the newly uploaded ones.
  blocks_committed.each { |blockid| doc.root.add_element(REXML::Element.new("Committed").add_text(blockid)) }
  blocks_uncommitted.each { |blockid| doc.root.add_element(REXML::Element.new("Uncommitted").add_text(blockid)) }
  commit_msg = doc.to_s

  blocklist_uri = URI.parse("#{uri}&comp=blocklist")
  request_id = SecureRandom.uuid
  put_blocklist_req = create_blob_put_request(blocklist_uri, commit_msg, request_id, file_path)
  http = OMS::Common.create_secure_http(blocklist_uri, @proxy_config)
  # Positional flags: ignore404 = false, return_entire_response = true.
  response = OMS::Common.start_request(put_blocklist_req, http, false, true)

  headers = response.to_hash
  unless headers.has_key?("etag")
    # The original log call had an unterminated string literal here, which
    # made the file unparsable.
    @log.error("Cannot extract ETag from BLOB response")
    return ""
  end

  etag_quoted = headers["etag"]
  # The header value may be wrapped in an array; use its first entry.
  etag_quoted = etag_quoted[0] if etag_quoted.is_a?(Array)
  return etag_quoted.delete('"')
end
# Sends a BLOB_UPLOAD_NOTIFICATION record to the notification ODS endpoint.
#
# uri              - blob URI; its query and fragment are cleared IN PLACE
#                    before it is reported as "BlobUrl".
# data_type        - base data type id.
# custom_data_type - optional; when non-nil it is appended to data_type as
#                    "data_type.custom_data_type".
# offset_blob_size - reported as "StartOffset".
# sent_size        - added to offset_blob_size to form "FileSize".
# etag             - reported as "Etag".
#
# Returns the value returned by OMS::Common.start_request.
def notify_blob_upload_complete(uri, data_type, custom_data_type, offset_blob_size, sent_size, etag)
  original_type_id = custom_data_type.nil? ? data_type : "#{data_type}.#{custom_data_type}"

  # Strip everything after the path from the caller's URI object.
  uri.query = nil
  uri.fragment = nil

  notification = {
    "metadata-TimeZoneId" => OMS::Common.get_current_timezone,
    "DataType" => "BLOB_UPLOAD_NOTIFICATION",
    "IPName" => "",
    "DataItems" => [
      {
        "BlobUrl" => uri.to_s,
        "OriginalDataTypeId" => original_type_id,
        "StartOffset" => offset_blob_size,
        "FileSize" => (offset_blob_size + sent_size),
        "Etag" => etag
      }
    ]
  }

  endpoint_path = OMS::Configuration.notify_blob_ods_endpoint.path
  # Third argument is the positional compress flag; it is passed as false.
  ods_request = OMS::Common.create_ods_request(endpoint_path, notification, false)
  connection = OMS::Common.create_ods_http(OMS::Configuration.notify_blob_ods_endpoint, @proxy_config)
  OMS::Common.start_request(ods_request, connection)
end
# Writes a one-line JSON status record for the ODSIngestionBlob operation.
#
# success - written as a string via "%s" (e.g. true -> "true").
# message - free text; JSON-escaped so quotes, backslashes or newlines in it
#           cannot break the document.
# fn      - path of the status file; defaults to the agent's log directory.
#
# Best effort: any error while building or writing the record is logged at
# debug level and swallowed.
def write_status_file(success, message, fn = '/var/opt/microsoft/omsagent/log/ODSIngestionBlob.status')
  begin
    # String#to_json yields a quoted, escaped JSON string, so the format
    # string carries no quotes around the message placeholder.
    status = '{ "operation": "ODSIngestionBlob", "success": "%s", "message": %s }' % [success, message.to_s.to_json]
    File.open(fn, 'w') { |file| file.write(status) }
  rescue => e
    @log.debug "Error:'#{e}'"
  end
end