src/com/facebook/buck/logd/client/LogdClient.java (99 lines of code) (raw):
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.buck.logd.client;
import com.facebook.buck.logd.LogDaemonException;
import com.facebook.buck.logd.proto.CreateLogRequest;
import com.facebook.buck.logd.proto.CreateLogResponse;
import com.facebook.buck.logd.proto.LogMessage;
import com.facebook.buck.logd.proto.LogType;
import com.facebook.buck.logd.proto.LogdServiceGrpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.rpc.Status;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* Given a host and port number, this client is used to make a connection and stream logs to logD
* server
*/
public class LogdClient implements LogDaemonClient {
private static final Logger LOG = LogManager.getLogger();
private static final int TIME_OUT_SECONDS = 5;
private final ManagedChannel channel;
private final LogdServiceGrpc.LogdServiceBlockingStub blockingStub;
private final LogdServiceGrpc.LogdServiceStub asyncStub;
private Map<Integer, StreamObserver<Status>> responseObservers = new ConcurrentHashMap<>();
private Map<Integer, StreamObserver<LogMessage>> requestObservers = new ConcurrentHashMap<>();
private Map<Integer, String> fileIdToPath = new ConcurrentHashMap<>();
private StreamObserverFactory streamObserverFactory;
/**
* Constructs a LogdClient with the provided hostname and port number.
*
* @param host host name
* @param port port number
*/
public LogdClient(String host, int port) {
this(host, port, new DefaultStreamObserverFactory());
}
/**
* Constructs a LogdClient with the provided hostname, port number and an implementation of
* StreamObserverFactory.
*
* @param host host name
* @param port port number
* @param streamObserverFactory an implementation of StreamObserverFactory
*/
public LogdClient(String host, int port, StreamObserverFactory streamObserverFactory) {
this(ManagedChannelBuilder.forAddress(host, port).usePlaintext(), streamObserverFactory);
LOG.info("Channel established to {} at port {}", host, port);
}
/**
* Constructs a LogdClient with the provided channel
*
* @param channelBuilder a channel to LogD server
*/
@VisibleForTesting
public LogdClient(
ManagedChannelBuilder<?> channelBuilder, StreamObserverFactory streamObserverFactory) {
channel = channelBuilder.build();
blockingStub = LogdServiceGrpc.newBlockingStub(channel);
asyncStub = LogdServiceGrpc.newStub(channel);
this.streamObserverFactory = streamObserverFactory;
}
@Override
public void shutdown() {
try {
LOG.info(
"Awaiting termination of channel to logD server. Waiting for up to {} seconds...",
TIME_OUT_SECONDS);
channel.shutdown().awaitTermination(TIME_OUT_SECONDS, TimeUnit.SECONDS);
if (!channel.isTerminated()) {
LOG.warn(
"Channel is still open after shutdown request and {} seconds timeout. Shutting down forcefully...",
TIME_OUT_SECONDS);
channel.shutdownNow();
LOG.info("Successfully shut down LogD client.");
}
} catch (InterruptedException e) {
channel.shutdownNow();
LOG.info("Shutdown interrupted. Shutting down LogD client forcefully...");
}
}
@Override
public int createLogFile(String path, LogType logType) throws LogDaemonException {
CreateLogResponse response;
try {
response =
blockingStub.createLogFile(
CreateLogRequest.newBuilder().setLogFilePath(path).setLogType(logType).build());
int logdFileId = response.getLogId();
fileIdToPath.put(logdFileId, path);
return logdFileId;
} catch (StatusRuntimeException e) {
LOG.error("LogD failed to return response with a file identifier: " + e.getStatus(), e);
throw new LogDaemonException(
e, "LogD failed to create a log file at %s, of type %s", path, logType);
}
}
@Override
public StreamObserver<LogMessage> openLog(int logFileId, String logContent)
throws LogDaemonException {
// Client calls this method with the returned generated id from calling createLogFile
// logD server will then return the client with a requestObserver which observes and processes
// incoming logs from client i.e. subsequent logs are sent via requestObserver.onNext(...)
// Upon receiving a client's request to close the requestObserver, logD server will send a
// response back via a responseObserver confirming that logs have been written and close
// the responseObserver.
responseObservers.computeIfAbsent(
logFileId,
newLogFileId -> streamObserverFactory.createStreamObserver(fileIdToPath.get(newLogFileId)));
StreamObserver<LogMessage> requestObserver =
requestObservers.computeIfAbsent(
logFileId, newLogFileId -> asyncStub.openLog(responseObservers.get(newLogFileId)));
try {
LogMessage logMessage =
LogMessage.newBuilder().setLogId(logFileId).setLogMessage(logContent).build();
requestObserver.onNext(logMessage);
return requestObserver;
} catch (Exception e) {
requestObserver.onError(e);
throw new LogDaemonException(
e, "Failed to establish a log stream to logD at %s", fileIdToPath.get(logFileId));
}
}
}