in streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/internal/HttpWriterTask.scala [60:99]
def buildRequest(url: String): Request = {
val method = httpMethods.filter(x => url.startsWith(x)).head
val uriAndParams = url.drop(method.length + 3).split("\\?")
val uri = uriAndParams.head
val builder = method match {
case HttpGet.METHOD_NAME => asyncHttpClient.prepareGet(uri)
case HttpDelete.METHOD_NAME => asyncHttpClient.prepareDelete(uri)
case HttpOptions.METHOD_NAME => asyncHttpClient.prepareOptions(uri)
case HttpTrace.METHOD_NAME => asyncHttpClient.prepareTrace(uri)
case HttpPost.METHOD_NAME => asyncHttpClient.preparePost(uri)
case HttpPatch.METHOD_NAME => asyncHttpClient.preparePatch(uri)
case HttpPut.METHOD_NAME => asyncHttpClient.preparePut(uri)
}
if (header != null && header.nonEmpty) {
header.foreach { case (k, v) => builder.setHeader(k, v) }
}
Try(uriAndParams(1).trim).getOrElse(null) match {
case null =>
case params =>
val paramMap = new util.HashMap[String, String]()
params
.split("&")
.foreach(
x => {
val param = x.split("=")
paramMap.put(param.head, param.last)
})
if (paramMap.nonEmpty) {
builder.setHeader(HttpHeaders.Names.CONTENT_TYPE, HttpHeaders.Values.APPLICATION_JSON)
val json = JsonUtils.write(paramMap)
builder.setBody(json.getBytes)
}
}
builder.setRequestTimeout(thresholdConf.timeout).build()
}