create_blob_put_request

in source/code/plugins/out_oms_blob.rb [62:317]


    def create_blob_put_request(uri, msg, request_id, file_path = nil)
      headers = {}

      headers[OMS::CaseSensitiveString.new("x-ms-meta-TimeZoneid")] = OMS::Common.get_current_timezone
      headers[OMS::CaseSensitiveString.new("x-ms-meta-ComputerName")] = OMS::Common.get_hostname
      if !file_path.nil?
        headers[OMS::CaseSensitiveString.new("x-ms-meta-FilePath")] = file_path
      end

      azure_resource_id = OMS::Configuration.azure_resource_id
      if !azure_resource_id.to_s.empty?
        headers[OMS::CaseSensitiveString.new("x-ms-AzureResourceId")] = azure_resource_id
      end
      
      azure_region = OMS::Configuration.azure_region if defined?(OMS::Configuration.azure_region)
      if !azure_region.to_s.empty?
        headers[OMS::CaseSensitiveString.new("x-ms-AzureRegion")] = azure_region
      end
      
      omscloud_id = OMS::Configuration.omscloud_id
      if !omscloud_id.to_s.empty?
        headers[OMS::CaseSensitiveString.new("x-ms-OMSCloudId")] = omscloud_id
      end

      uuid = OMS::Configuration.uuid
      if !uuid.to_s.empty?
        headers[OMS::CaseSensitiveString.new("x-ms-UUID")] = uuid
      end

      headers[OMS::CaseSensitiveString.new("X-Request-ID")] = request_id

      headers["Content-Type"] = "application/octet-stream"
      headers["Content-Length"] = msg.bytesize.to_s

      
      headers[OMS::CaseSensitiveString.new("x-ms-version")] = "2016-05-31"

      req = Net::HTTP::Put.new(uri.request_uri, headers)
      req.body = msg
      return req
    rescue OMS::RetryRequestException => e
        OMS::Log.error_once("HTTP error for Request-ID: #{request_id} Error: #{e}")
        raise e.message, "Request-ID: #{request_id}"
    end 

    
    
    
    
    
    
    
    
    def request_blob_json(container_type, data_type, custom_data_type, suffix)
      data_type_id = data_type
      if !custom_data_type.nil?
        data_type_id = "#{data_type}.#{custom_data_type}"
      end

      url_suffix = eval(url_suffix_template)

      data = {
        "ContainerType" => container_type,
        "DataTypeId" => data_type_id,
        "ExpiryDuration" => blob_uri_expiry,
        "Suffix" => url_suffix,
        "SkipScanningQueue" => true,
        "SupportWriteOnlyBlob" => true
      }

      extra_headers = {
        OMS::CaseSensitiveString.new('x-ms-client-request-retry-count') => "#{@num_errors}"
      }
      req = OMS::Common.create_ods_request(OMS::Configuration.get_blob_ods_endpoint.path, data, compress=false, extra_headers)

      ods_http = OMS::Common.create_ods_http(OMS::Configuration.get_blob_ods_endpoint, @proxy_config)
      body = OMS::Common.start_request(req, ods_http)

      
      clean_body = body.encode(Encoding::UTF_8, :invalid => :replace, :undef => :replace, :replace => "")
      return JSON.parse(clean_body)
    end 

    
    
    
    
    
    
    
    
    
    def get_blob_uri_and_committed_blocks(container_type, data_type, custom_data_type, suffix)
      blob_json = request_blob_json(container_type, data_type, custom_data_type, suffix)

      if blob_json.has_key?("Uri")
        blob_uri = URI.parse(blob_json["Uri"])
      else
        @log.error "JSON from BLOB does not contain a URI"
        blob_uri = nil
      end
      if blob_json.has_key?("CommittedBlockList") and !blob_json["CommittedBlockList"].nil?
        blocks_committed = blob_json["CommittedBlockList"]
      else
        blocks_committed = []
      end
      if blob_json.has_key?("Size")
        blob_size = blob_json["Size"]
      else
        blob_size = 0
      end

      return blob_uri, blocks_committed, blob_size
    end 

    
    
    
    
    
    def append_blob(uri, msgs, file_path, blocks_committed)
      if msgs.size == 0
        return 0
      end

      
      msg = ''
      msgs.each { |s| msg << "#{s}\r\n" if s.to_s.length > 0 }
      dataSize = msg.length

      if dataSize == 0
        return 0
      end

      
      
      chunk_size = 100000000
      blocks_uncommitted = []
      if msg.to_s.length <= chunk_size
        blocks_uncommitted << upload_block(uri, msg)
      else
        while msg.to_s.length > 0 do
          chunk = msg.slice!(0, chunk_size)
          blocks_uncommitted << upload_block(uri, chunk)
        end
      end

      
      etag = commit_blocks(uri, blocks_committed, blocks_uncommitted, file_path)
      return dataSize, etag
    end 

    
    
    
    
    
    
    def upload_block(uri, msg)
      base64_blockid = Base64.encode64(SecureRandom.uuid)
      request_id = SecureRandom.uuid
      append_uri = URI.parse("#{uri.to_s}&comp=block&blockid=#{base64_blockid}")

      @log.debug("uploading block request_id=#{request_id}, blockid=#{base64_blockid}")
      @log.trace("blockid=#{base64_blockid} block=#{msg}")

      put_block_req = create_blob_put_request(append_uri, msg, request_id, nil)
      http = OMS::Common.create_secure_http(append_uri, @proxy_config)
      OMS::Common.start_request(put_block_req, http)

      return base64_blockid
    end 

    
    
    
    
    
    
    
    def commit_blocks(uri, blocks_committed, blocks_uncommitted, file_path)
      doc = REXML::Document.new "<BlockList />"
      blocks_committed.each { |blockid| doc.root.add_element(REXML::Element.new("Committed").add_text(blockid)) }
      blocks_uncommitted.each { |blockid| doc.root.add_element(REXML::Element.new("Uncommitted").add_text(blockid)) }

      commit_msg = doc.to_s

      blocklist_uri = URI.parse("#{uri.to_s}&comp=blocklist")
      request_id = SecureRandom.uuid
      put_blocklist_req = create_blob_put_request(blocklist_uri, commit_msg, request_id, file_path)
      http = OMS::Common.create_secure_http(blocklist_uri, @proxy_config)
      response = OMS::Common.start_request(put_blocklist_req, http, ignore404 = false, return_entire_response = true)

      headers = response.to_hash
      if headers.has_key?("etag")
        etag_quoted = headers["etag"]
        if etag_quoted.is_a?(Array)
          etag_quoted = etag_quoted[0]
        end
        etag = etag_quoted.gsub(/"/, "")
      else
        @log.error("Cannot extract ETag from BLOB response 
        etag = ""
      end
      return etag
    end 

    
    
    
    
    
    
    
    
    def notify_blob_upload_complete(uri, data_type, custom_data_type, offset_blob_size, sent_size, etag)
      data_type_id = data_type
      if !custom_data_type.nil?
        data_type_id = "#{data_type}.#{custom_data_type}"
      end

      
      uri.fragment = uri.query = nil

      data = {
        
        "metadata-TimeZoneId" => OMS::Common.get_current_timezone,
        "DataType" => "BLOB_UPLOAD_NOTIFICATION",
        "IPName" => "",
        "DataItems" => [
          {
            "BlobUrl" => uri.to_s,
            "OriginalDataTypeId" => data_type_id,
            "StartOffset" => offset_blob_size,
            "FileSize" => (offset_blob_size + sent_size),
            "Etag" => etag
          }
        ]
      }

      req = OMS::Common.create_ods_request(OMS::Configuration.notify_blob_ods_endpoint.path, data, compress=false)

      ods_http = OMS::Common.create_ods_http(OMS::Configuration.notify_blob_ods_endpoint, @proxy_config)
      body = OMS::Common.start_request(req, ods_http)
    end 
    
    def write_status_file(success, message)
      fn = '/var/opt/microsoft/omsagent/log/ODSIngestionBlob.status'
      status = '{ "operation": "ODSIngestionBlob", "success": "%s", "message": "%s" }' % [success, message]
      begin
        File.open(fn,'w') { |file| file.write(status) }
      rescue => e
        @log.debug "Error:'#{e}'"
      end
    end