worker/src/reporter/reporter_grpc.rs (88 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use anyhow::anyhow;
use skywalking::reporter::{CollectItemConsume, CollectItemProduce, grpc::GrpcReporter};
use std::time::Duration;
use tokio::time::sleep;
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Identity};
use tracing::{debug, info, warn};
/// Settings used by [`run_reporter`] to reach the SkyWalking gRPC server.
pub struct GrpcReporterConfiguration {
/// Server address appended to `http://` or `https://` to form the endpoint
/// URL. When TLS is enabled, the text before the first `:` is also used as
/// the TLS domain name.
pub server_addr: String,
/// Authentication value handed to the reporter's `with_authentication`;
/// an empty string means no authentication is configured.
pub authentication: String,
/// Selects the `https` scheme and applies a TLS configuration to the
/// endpoint when `true`; otherwise plain `http` is used.
pub enable_tls: bool,
/// Path of the PEM client certificate chain. Only used when TLS is enabled
/// and `ssl_key_path` is non-empty as well.
pub ssl_cert_chain_path: String,
/// Path of the PEM client private key. Only used when TLS is enabled and
/// `ssl_cert_chain_path` is non-empty as well.
pub ssl_key_path: String,
/// Path of the PEM CA certificate added to the TLS configuration. Only used
/// when TLS is enabled; an empty string skips it.
pub ssl_trusted_ca_path: String,
}
/// Connects to the SkyWalking server described by `config` and runs the gRPC
/// reporter until its reporting task finishes.
///
/// `producer` and `consumer` are handed to the reporter unchanged. A
/// non-empty `config.authentication` is installed on the reporter before
/// reporting starts. Failures reported through the status handle are only
/// logged as warnings.
///
/// # Errors
///
/// Returns an error when the endpoint cannot be created from `config`, or
/// when the spawned reporting task completes with an error.
pub async fn run_reporter(
    config: GrpcReporterConfiguration, producer: impl CollectItemProduce,
    consumer: impl CollectItemConsume,
) -> anyhow::Result<()> {
    // Establish the channel first; `connect` only returns once it succeeds.
    let channel = connect(create_endpoint(&config).await?).await;

    let reporter = GrpcReporter::new_with_pc(channel, producer, consumer);
    let reporter = if config.authentication.is_empty() {
        reporter
    } else {
        reporter.with_authentication(config.authentication)
    };

    info!("Worker is ready...");

    let reporting = reporter.reporting().await;
    let task = reporting
        .with_status_handle(|message, status| {
            warn!(?status, "Collect failed: {}", message);
        })
        .spawn();

    // Wait for the reporting task and surface its failure, if any.
    if let Err(err) = task.await {
        return Err(anyhow!("Tracer reporting failed: {:?}", err));
    }
    Ok(())
}
async fn create_endpoint(config: &GrpcReporterConfiguration) -> anyhow::Result<Endpoint> {
let scheme = if config.enable_tls { "https" } else { "http" };
let url = format!("{}://{}", scheme, config.server_addr);
debug!(url, "Create Endpoint");
let mut endpoint = Endpoint::from_shared(url)?;
debug!(
enable_tls = config.enable_tls,
ssl_trusted_ca_path = config.ssl_trusted_ca_path,
ssl_key_path = config.ssl_key_path,
ssl_cert_chain_path = config.ssl_cert_chain_path,
"Skywalking TLS info"
);
if config.enable_tls {
let domain_name = config.server_addr.split(':').next().unwrap_or_default();
debug!(domain_name, "Configure TLS domain");
let mut tls = ClientTlsConfig::new().domain_name(domain_name);
let ssl_trusted_ca_path = &config.ssl_trusted_ca_path;
if !ssl_trusted_ca_path.is_empty() {
debug!(ssl_trusted_ca_path, "Configure TLS CA");
let ca_cert = tokio::fs::read(&config.ssl_trusted_ca_path).await?;
let ca_cert = Certificate::from_pem(ca_cert);
tls = tls.ca_certificate(ca_cert);
}
let ssl_key_path = &config.ssl_key_path;
let ssl_cert_chain_path = &config.ssl_cert_chain_path;
if !ssl_key_path.is_empty() && !ssl_cert_chain_path.is_empty() {
debug!(ssl_trusted_ca_path, "Configure mTLS");
let client_cert = tokio::fs::read(&config.ssl_cert_chain_path).await?;
let client_key = tokio::fs::read(&config.ssl_key_path).await?;
let client_identity = Identity::from_pem(client_cert, client_key);
tls = tls.identity(client_identity);
}
endpoint = endpoint.tls_config(tls)?;
}
Ok(endpoint)
}
/// Connects to `endpoint`, retrying until a channel is established.
///
/// Each failed attempt is logged as a warning and followed by a 10 second
/// pause, so this function only returns once a connection succeeds.
#[tracing::instrument(skip_all)]
async fn connect(endpoint: Endpoint) -> Channel {
    let retry_delay = Duration::from_secs(10);

    loop {
        let err = match endpoint.connect().await {
            Ok(channel) => {
                let uri = endpoint.uri().to_string();
                info!(uri = uri.as_str(), "Skywalking server connected");
                return channel;
            }
            Err(err) => err,
        };

        warn!(?err, "Connect to skywalking server failed, retry after 10s");
        sleep(retry_delay).await;
    }
}