java/com/jetbrains/cef/remote/CefServer.java (353 lines of code) (raw):
package com.jetbrains.cef.remote;
import com.jetbrains.cef.remote.browser.RemoteBrowser;
import com.jetbrains.cef.remote.browser.RemoteClient;
import com.jetbrains.cef.remote.thrift.TException;
import com.jetbrains.cef.remote.thrift.server.TServer;
import com.jetbrains.cef.remote.thrift.server.TThreadPoolServer;
import com.jetbrains.cef.remote.thrift.transport.TServerTransport;
import com.jetbrains.cef.remote.thrift_codegen.ClientHandlers;
import org.cef.CefApp;
import org.cef.CefSettings;
import org.cef.handler.CefAppHandler;
import org.cef.misc.CefLog;
import org.cef.misc.Utils;
import java.io.File;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
public class CefServer {
// NOTE: TeamCity runs tests in parallel with downloading (and other processes), and because of that
// first CEF initialization takes a long time (more than 15 sec in 1% of test runs). So use a large constant here.
private static final int WAIT_FOR_SERVER_START_SEC = Utils.getInteger("JCEF_WAIT_FOR_SERVER_START_SEC", 60);
private static final int WAIT_FOR_SERVER_EXIT_SEC = Utils.getInteger("JCEF_WAIT_FOR_SERVER_EXIT_SEC", 10);
private static HashSet<CefServer> ourInstances = new HashSet<>();
private final ThriftTransport myThriftServer;
private final File myServerExe;
private final boolean myConnectAsMaster;
private ThriftTransport myThriftBackward;
private final CefParams myParams;
private CefApp myCefApp = null;
// Fields for cef-handlers execution on java side
private Thread myClientHandlersThread;
private TServer myClientHandlersServer;
private TServerTransport myClientHandlersTransport;
private final RpcContext myRpc;
private final ClientHandlersImpl myClientHandlersImpl;
private volatile boolean myIsConnected = false;
private volatile boolean myIsContextInitialized = false;
private volatile boolean myIsDisconnected = false;
private volatile boolean myIsCrashed = false;
private final LinkedList<Runnable> myDelayedActions = new LinkedList<>();
private Runnable myDisconnectionCallback = null;
public final Map<Integer, RemoteClient> cid2Client = new ConcurrentHashMap<>();
public final Map<Integer, RemoteBrowser> bid2Browser = new ConcurrentHashMap<>();
public CefServer(ThriftTransport transport, String[] args, CefSettings settings) {
this(NativeServerManager.getServerExe(), transport, args, settings);
}
public CefServer(File serverExe, ThriftTransport transport, String[] args, CefSettings settings) {
this(serverExe, transport, args, settings, true);
}
public CefServer(File serverExe, ThriftTransport transport, String[] args, CefSettings settings, boolean connectAsMaster) {
myThriftServer = transport;
myServerExe = serverExe;
myConnectAsMaster = connectAsMaster;
myParams = new CefParams(settings, args);
myRpc = new RpcContext(this);
myClientHandlersImpl = new ClientHandlersImpl(myRpc);
CefServer otherRunningInstance = findInstance(myParams, true);
if (otherRunningInstance != null)
CefLog.Error("Found running CefServer instance for params:\n%s", myParams);
ourInstances.add(this);
CefLog.Debug("Created CefServer instance '%s' with transport '%s'. Params:\n%s", toStringShort(), transport, myParams);
}
public static CefServer findInstance(CefParams params, boolean onlyRunning) {
if (params == null)
return null;
CefLog.Debug("Find for params: " + params);
synchronized (ourInstances) {
for (CefServer s: ourInstances) {
if (onlyRunning) {
if (s.myIsDisconnected)
continue;
final CefApp app = s.getCefApp();
if (app != null && app.isShuttingDown())
continue;
}
if (s.myParams.isAlmostEqual(params))
return s;
}
return null;
}
}
public static CefServer findRunningInstance(String[] args, CefSettings settings) {
return findInstance(new CefParams(settings, args), true);
}
public static int getInstancesCount() {
synchronized (ourInstances) {
return ourInstances.size();
}
}
public static void logInstances() {
if (CefLog.IsDebugEnabled()) {
CefLog.Debug("Available CefServer instances: ");
synchronized (ourInstances) {
for (CefServer s : ourInstances)
CefLog.Debug(s.toStringDetailed());
}
}
}
public ThriftTransport getThriftServer() { return myThriftServer; }
public String[] getArgs() { return myParams.args; }
public CefSettings getCefSettings() { return myParams.settings; }
public String toStringShort() { return myThriftServer.toStringShort(); }
public String toStringDetailed() { return "CefServer_" + myThriftServer.toStringShort() + ", params: " + myParams.toString(); }
public CefApp getCefApp() { return myCefApp; }
public void setCefApp(CefApp cefApp) { myCefApp = cefApp; }
public void setDisconnectionCallback(Runnable disconnectionCallback) { myDisconnectionCallback = disconnectionCallback; }
// Connects to CefServer and start cef-handlers service.
// Should be executed in bg thread.
// NOTE: appHandler is necessary for (1) custom schemes, (2) onContextInitialized callback
public boolean start(CefAppHandler appHandler) {
try {
if (!CefApp.isRemoteEnabled())
return false;
final String runningRoot = NativeServerManager.isRunning(myThriftServer);
if (runningRoot != null) {
// NOTE: pipe-names/ports are unique for each client process, so we can go here only when custom transport is specified manually.
CefLog.Info("Going to connect with already running cef_server: transport '%s', root '%s'", myThriftServer, runningRoot);
} else {
final boolean success = ServerStarter.startProcessAndWait(myServerExe, myThriftServer, appHandler, myParams.args, myParams.settings, false, null, WAIT_FOR_SERVER_START_SEC*1000l);
if (!success)
return false;
}
if (!connect(appHandler == null ? null : appHandler::onContextInitialized)) {
CefLog.Error("CefServer.connect() fails, can't initialize thrift client for native server.");
return false;
}
return true;
} catch (Throwable e) {
CefLog.Error("RuntimeException in CefServer.start: %s", e.getMessage());
return false;
} finally {
synchronized (myDelayedActions) {
myDelayedActions.clear();
}
}
}
// returns true when server is connected and action was executed immediately
public boolean onConnected(Runnable r, String name, boolean first) {
synchronized (myDelayedActions) {
if (myIsConnected) {
if (r != null)
r.run();
return true;
}
if (r != null) {
if (first)
myDelayedActions.addFirst(r);
else
myDelayedActions.addLast(r);
CefLog.Debug("Delay action '%s' until server connected (first=%s).", name, String.valueOf(first));
}
return false;
}
}
public RpcContext getRpcContext() { return myRpc; }
public RemoteClient createClient() {
return new RemoteClient(myRpc);
}
public String getVersion() {
if (myIsConnected)
return myRpc.execObj(r->r.getServerInfo("version"));
return "unknown(not connected)";
}
private boolean connect(Runnable onContextInitialized) {
myClientHandlersImpl.setOnContextInitialized(() -> {
myIsContextInitialized = true;
if (onContextInitialized != null)
onContextInitialized.run();
});
int cid = -1;
try {
// 1. Start server for cef-handlers execution. Open transport for rpc-handlers
try {
CefLog.Debug("Initialize CefServer, open server transport.");
myRpc.openTransport(myThriftServer);
} catch (TException x) {
CefLog.Error("TException when opening server %s : %s", myThriftServer.isTcp() ? "tcp-socket" : "pipe", x.getMessage());
return false;
}
CefLog.Info("cef_server version: %s", (String)myRpc.execObj(r->r.getServerInfo("version")));
// 2. Open transport for backward rpc calls (from native to java)
if (ThriftTransport.isTcpUsed())
myThriftBackward = new ThriftTransport(ThriftTransport.findFreePort(null));
else {
// Use unique suffix for pipe name to avoid filename collisions.
final String suffix = "" + System.currentTimeMillis();
myThriftBackward = new ThriftTransport(ThriftTransport.getJavaHandlersPipe(suffix));
}
try {
myClientHandlersTransport = myThriftBackward.createServerTransport();
} catch (Exception e) {
CefLog.Error("Exception when opening backward transport '%s'", myThriftBackward.toString(), e.getMessage());
return false;
}
// 3. Start service for backward rpc calls (from native to java)
ClientHandlers.Processor processor = new ClientHandlers.Processor(myClientHandlersImpl);
TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(myClientHandlersTransport)
.processor(processor).executorService(new ThreadPoolExecutor(3, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() {
final AtomicLong count = new AtomicLong();
public Thread newThread(Runnable r) {
final String name = String.format("CefHandlers-execution-%d", this.count.getAndIncrement());
Thread thread = new Thread(()->{
r.run();
CefServer.this.onCefHandlersThreadFinished();
}, name);
thread.setDaemon(true);
return thread;
}
}));
myClientHandlersServer = new TThreadPoolServer(serverArgs);
myClientHandlersThread = new Thread(()-> myClientHandlersServer.serve());
myClientHandlersThread.setName("CefHandlers-listening");
myClientHandlersThread.start();
// 4. Connect to CefServer.
cid = myRpc.connect(myThriftBackward, myConnectAsMaster);
if (cid == -1) {
CefLog.Error("Can't connect to '%s', cid==-1", this.toStringDetailed());
return false;
}
CefLog.Debug("Connected to '%s', cid=%d", this.toStringDetailed(), cid);
} catch (Throwable e) {
CefLog.Error("RuntimeException in CefServer.connect: %s", e.getMessage());
return false;
} finally {
synchronized (myDelayedActions) {
if (cid != -1) {
myIsConnected = true;
myDelayedActions.forEach(r -> r.run());
}
myDelayedActions.clear();
}
}
return true;
}
public void stop() {
CefLog.Debug("Disconnect from native server '%s'.", myThriftServer);
disconnect();
if (WAIT_FOR_SERVER_EXIT_SEC > 0) {
if (myIsCrashed)
CefLog.Debug("Server [%s] was crashed, will skip waiting for stop.", myThriftServer);
else {
CefLog.Debug("Waiting for server [%s] stop (max %d sec).", myThriftServer, WAIT_FOR_SERVER_EXIT_SEC);
final long startMs = System.currentTimeMillis();
final Thread t = new Thread(() -> {
boolean stopped = NativeServerManager.waitForStopped(myThriftServer, WAIT_FOR_SERVER_EXIT_SEC * 1000);
if (stopped)
CefLog.Info("Server [%s] was stopped in %d ms.", myThriftServer, System.currentTimeMillis() - startMs);
else
CefLog.Error("Can't stop server [%s] in %d seconds.", myThriftServer, WAIT_FOR_SERVER_EXIT_SEC);
}, "CEF-shutdown-thread");
t.setDaemon(false);
t.start();
}
}
synchronized (ourInstances) {
ourInstances.remove(this);
}
}
void onRpcThriftException(TException e) {
onDisconnected(e);
}
void onCefHandlersThreadFinished() {
onDisconnected(null);
}
private void disconnect() {
myIsConnected = false;
myIsDisconnected = true;
myRpc.close();
if (myClientHandlersServer != null) {
myClientHandlersServer.stop();
myClientHandlersServer = null;
}
if (myClientHandlersTransport != null) {
myClientHandlersTransport.close();
myClientHandlersTransport = null;
}
}
private void onDisconnected(TException e) {
if (!myIsConnected)
return;
//
// Check CefApp state and notify the user if necessary.
//
final CefApp app = myCefApp;
if (app == null) {
if (e == null)
CefLog.Error("CefApp of server '%s' is null when disconnection happened.", toStringShort());
else
CefLog.Error("CefApp of server '%s' is null, thrift exception: %s", toStringShort(), e.getMessage());
} else if (!app.isShuttingDown()) {
myIsCrashed = true;
if (e == null)
CefLog.Error("CefApp of server '%s' isn't shutting down, but java-client is disconnected", toStringShort());
else
CefLog.Error("CefApp of server '%s' isn't shutting down, but java-client is disconnected, thrift exception: %s", toStringShort(), e.getMessage());
if (myDisconnectionCallback != null)
myDisconnectionCallback.run();
}
//
// Set the 'isConnected' flag and perform close actions.
//
disconnect();
}
public static TServer startTestHandlersService(ThriftTransport backward, CountDownLatch finished) {
// Start dummy service for backward rpc calls (from native to java)
TServerTransport transport;
try {
transport = backward.createServerTransport();
} catch (Exception e) {
CefLog.Error("Exception when opening transport '%s' for test-client, error: %s", backward.toString(), e.getMessage());
return null;
}
ClientHandlers.Processor processor = new ClientHandlers.Processor(new ClientHandlersDummy());
TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(transport)
.processor(processor).executorService(new ThreadPoolExecutor(2, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() {
final AtomicLong count = new AtomicLong();
public Thread newThread(Runnable r) {
final String name = String.format("CefHandlers(dummy)-execution-%d", this.count.getAndIncrement());
Thread thread = new Thread(r, name);
return thread;
}
}));
TServer result = new TThreadPoolServer(serverArgs) {
@Override
public void stop() {
super.stop();
transport.close();
}
};
Thread t = new Thread(()-> {
try {
result.serve();
} catch (Throwable e) {
throw e;
} finally {
if (finished != null)
finished.countDown();
}
});
t.setName("CefHandlers(dummy)-listening");
t.start();
return result;
}
//
// Convenience methods
//
public void exec(RpcExecutor.Rpc r) {
myRpc.exec(r);
}
public <T> T execObj(RpcExecutor.RpcObj<T> r) {
return myRpc.execObj(r);
}
public static class CefParams {
final String[] args;
final Set<String> argsSet = new HashSet<>();
final CefSettings settings;
public CefParams(CefSettings settings, String[] args) {
this.args = args == null ? new String[0] : Arrays.copyOf(args, args.length);
this.settings = settings == null ? new CefSettings() : settings.clone();
if (args != null && args.length > 0) {
for (String arg: args)
if (arg != null)
argsSet.add(arg.trim());
}
}
public boolean isAlmostEqual(CefParams cp) {
if (!argsSet.equals(cp.argsSet))
return false;
return settings.isAlmostEqual(cp.settings);
}
@Override
public String toString() {
return argsSet + "| " + settings.getDescription();
}
}
}