#include "ServerApplication.h"

#include <thread>
#include <mutex>
#include <condition_variable>

#include "include/base/cef_callback.h"
#include "include/wrapper/cef_closure_task.h"
#include "include/cef_app.h"
#include "include/cef_base.h"

#include "log/Log.h"
#include "Utils.h"
#include "ServerHandler.h"
#include "ServerHandlerContext.h"
#include "RpcExecutor.h"
#include "DebugInfo.h"

#include <sstream>
#include "browser/RemoteBrowser.h"
#include "handlers/app/RemoteAppHandler.h"

#if defined(OS_LINUX)
#include <X11/Xlib.h>
#endif

#include <regex>

using namespace apache::thrift;
using namespace thrift_codegen;

namespace {
bool TRACE_HANDLERS_LIFESPAN = false;
std::regex * TRACE_THRIFT_MESSAGES_REGEXP = nullptr;
}

class MyServerProcessor : public ServerProcessor {
 public:
  MyServerProcessor(::std::shared_ptr<ServerHandler> iface) : ServerProcessor(iface), myHandler(iface) {}

  bool process(std::shared_ptr<protocol::TProtocol> in,
               std::shared_ptr<protocol::TProtocol> out,
               void* connectionContext) override {
    std::string fname;
    protocol::TMessageType mtype;
    int32_t seqid;
    in->readMessageBegin(fname, mtype, seqid);

    if (mtype != protocol::T_CALL && mtype != protocol::T_ONEWAY) {
      Log::error("received invalid message type %d from client", mtype);
      return false;
    }

    myIsProcessing = true;
    myStartDispatch = std::chrono::steady_clock::now();
    myFuncName = fname;

    if (TRACE_THRIFT_MESSAGES_REGEXP != nullptr) {
      std::smatch m;
      if(std::regex_match(fname, m, *TRACE_THRIFT_MESSAGES_REGEXP))
        Log::trace("\t process %s", fname.c_str());
    }

    const bool dispatchResult = dispatchCall(in.get(), out.get(), fname, seqid, connectionContext);
    myIsProcessing = false;
    return dispatchResult;
  }

  const std::shared_ptr<ServerHandler> getServerHandler() const { return myHandler; }
  bool isProcessing() const { return myIsProcessing; }
  std::chrono::steady_clock::time_point getStartDispatch() const { return myStartDispatch; }
  std::string getFuncName() const { return myFuncName; }

 private:
  const std::shared_ptr<ServerHandler> myHandler;

  volatile bool myIsProcessing = false;
  std::chrono::steady_clock::time_point myStartDispatch;
  std::string myFuncName;
};

class MyServerProcessorFactory : public ::apache::thrift::TProcessorFactory {
 public:
  MyServerProcessorFactory() noexcept {}

  ::std::shared_ptr< ::apache::thrift::TProcessor > getProcessor(const ::apache::thrift::TConnectionInfo& connInfo) override {
    ServerHandler * ptrServerHandler = new ServerHandler;
    if (TRACE_HANDLERS_LIFESPAN)
      Log::trace("Created ServerHandler: %p", ptrServerHandler);
    const int cooldownMs =
        ServerApplication::instance().getCmdArgs().getOpenTransportCooldownMs();
    if (cooldownMs > 0) {
      // system 'cooldown' (otherwise pipe transport may not open on Windows)
      std::this_thread::sleep_for(std::chrono::milliseconds(cooldownMs));
    }

    std::shared_ptr<ServerHandler> handler(ptrServerHandler);
    std::shared_ptr<MyServerProcessor> processor(new MyServerProcessor(handler), [&](MyServerProcessor* p){
      if (TRACE_HANDLERS_LIFESPAN)
        Log::trace("Release ServerHandler: %p", p->getServerHandler().get());
      const bool isMaster = p->getServerHandler()->isMaster();
      bool needStartShuttingDown = false;
      {
        Lock lock(myMutex);
        myProcessors.erase(p);
        if (isMaster) {
          needStartShuttingDown = true;
          for (auto processor: myProcessors)
            if (processor->getServerHandler()->isMaster()) {
              needStartShuttingDown = false;
              break;
            }
        }
      }

      delete p;

      if (needStartShuttingDown)
        ServerApplication::instance().startShuttingDown();
    });

    Lock lock(myMutex);
    myProcessors.insert(processor.get());
    return processor;
  }

  void forEach(std::function<void(const MyServerProcessor*)> visitor);

  bool hasMaster();
  std::shared_ptr<ServerHandlerContext> findCtx(int connectionId);

 protected:
  std::recursive_mutex myMutex;
  std::set<const MyServerProcessor*> myProcessors;
};

std::shared_ptr<ServerHandlerContext> MyServerProcessorFactory::findCtx(int connectionId) {
  Lock lock(myMutex);
  for (auto p: myProcessors)
    if (p->getServerHandler()->getCid() == connectionId)
      return p->getServerHandler()->getCtx();
  return nullptr;
}

void MyServerProcessorFactory::forEach(std::function<void(const MyServerProcessor*)> visitor) {
  Lock lock(myMutex);
  for (const auto& h: myProcessors)
    visitor(h);
}

struct cancelled_error {};

class CancellationPoint {
 public:
  CancellationPoint() : myStop(false) {}

  void cancel() {
    std::unique_lock<std::mutex> lock(myMutex);
    myStop = true;
    myCond.notify_all();
  }

  template <typename P>
  void wait(const P& period) {
    std::unique_lock<std::mutex> lock(myMutex);
    if (myStop || myCond.wait_for(lock, period) == std::cv_status::no_timeout)
      throw cancelled_error();
  }

 private:
  bool myStop;
  std::mutex myMutex;
  std::condition_variable myCond;
};

ServerApplication ServerApplication::ourInstance;

ServerApplication::ServerApplication() {}

ServerApplication::~ServerApplication() {
  if (myAppHandler != nullptr)
    myAppHandler->Release();
}

#if defined(OS_MAC)
namespace CefUtils {
  bool loadCefFramework();
}
#endif

void ServerApplication::onBeforeExit() {
  myStopWatcher->cancel();
}

bool ServerApplication::init(int argc, char* argv[]) {
  myTimeStart = std::chrono::steady_clock::now();
  if (!myCmdArgs.init(argc, argv)) {
    Log::debug("Show help and exit.");
    return false;
  }

  Log::info("Init ServerApplication with transport: %s.", myCmdArgs.getTransportDesc().c_str());

  myFactory = std::make_shared<MyServerProcessorFactory>();

#if defined(OS_MAC)
  const std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
  if (!CefUtils::loadCefFramework()) {
    Log::error("Can't load CEF framework library.");
    return false;
  }

  const std::chrono::steady_clock::time_point t1 = std::chrono::steady_clock::now();
  if (Log::isDebugEnabled()) {
    auto d1 = std::chrono::duration_cast<std::chrono::milliseconds>(t1 - t0);
    Log::debug("Loaded CEF framework library, spent %d ms", (int)d1.count());
  }
#elif defined(OS_LINUX)
  XInitThreads();
#endif

  CefSettings settings;
  myCmdArgs.prepareCefSettings(&settings);
  myAppHandler = new RemoteAppHandler(myCmdArgs.myChromiumSwitches, settings, myCmdArgs.myCustomSchemes);
  myAppHandler->AddRef();

  // Read constants from env
  TRACE_HANDLERS_LIFESPAN = getBoolEnv("CEF_SERVER_TRACE_HANDLERS_LIFESPAN");
  const char* envTraceMessagesRegexp = getenv("CEF_SERVER_TRACE_THRIFT_MESSAGES_REGEXP");
  if (envTraceMessagesRegexp != nullptr)
    TRACE_THRIFT_MESSAGES_REGEXP = new std::regex(envTraceMessagesRegexp);

  // Init watcher thread
  myStopWatcher = std::make_shared<CancellationPoint>();
  myThreadWatcher = std::thread([&]() {
    Log::setThreadName("Watcher");
    const std::chrono::milliseconds timeoutWatchMs(getLongEnv("CEF_SERVER_timeoutWatchMs", 5000));
    std::chrono::milliseconds timeoutDebugLogMs(getLongEnv("CEF_SERVER_TimeoutStacktraceLogMs", 30 * 1000));
    std::chrono::milliseconds timeoutExecutionMs(getLongEnv("CEF_SERVER_ourTimeoutExecutionMs", 5 * 1000));
    std::chrono::milliseconds timeoutShuttingDownMs(getLongEnv("CEF_SERVER_ourTimeoutShuttingDownMs", 25 * 1000));

    std::chrono::steady_clock::time_point lastDebugLog = std::chrono::steady_clock::now() - timeoutDebugLogMs;

    while (true) {
      try {
        myStopWatcher->wait(std::chrono::milliseconds(timeoutWatchMs));
      } catch (const cancelled_error&) {
        Log::debug("Watcher thread was stopped.");
        return 0;
      }

      std::this_thread::sleep_for(timeoutWatchMs);
      const std::chrono::time_point now(std::chrono::steady_clock::now());
      myFactory->forEach([&](const MyServerProcessor* p){
        if (!p || !p->getServerHandler()) {
          Log::error("Can't be: !p || !p->getServerHandler()");
          return;
        }
        if (!p->getServerHandler()->getCtx())
          return;
        enum {
          ServerHandler,
          JavaService,
          JavaServiceIO
        };

        using namespace std::chrono_literals;
        std::chrono::duration<float, std::micro> execTimes[] = {0us, 0us, 0us};

        const std::chrono::time_point now(std::chrono::steady_clock::now());
        // 1. Check ServerHandler timings
        if (p->isProcessing())
          execTimes[ServerHandler] = now - p->getStartDispatch();

        // 2. Check JavaService timings
        std::shared_ptr<RpcExecutor> rpcExecutor = p->getServerHandler()->getCtx()->javaService();
        if (rpcExecutor && rpcExecutor->isProcessing())
          execTimes[JavaService] = now - rpcExecutor->getProcessingStart();
          //printDebugIfNecessary(,);

        std::shared_ptr<RpcExecutor> rpcExecutorIO = p->getServerHandler()->getCtx()->javaServiceIO();
        if (rpcExecutorIO && rpcExecutorIO->isProcessing())
          execTimes[JavaServiceIO] = now - rpcExecutorIO->getProcessingStart();

          //printDebugIfNecessary(rpcExecutorIO->getProcessingName(),
        const bool isTimeoutServerHandler = execTimes[ServerHandler] > timeoutExecutionMs;
        const bool isTimeoutJavaService = execTimes[JavaService] > timeoutExecutionMs;
        const bool isTimeoutJavaServiceIO = execTimes[JavaServiceIO] > timeoutExecutionMs;
        const bool needPrintDebugLog = (now - lastDebugLog) >= timeoutDebugLogMs;
        if (isTimeoutServerHandler || isTimeoutJavaService || isTimeoutJavaServiceIO) {
          if (needPrintDebugLog) {
            Log::warn("Execution time exceeds timeout. Probably deadlock occurred. Events:");
            if (isTimeoutServerHandler)
              Log::info("Java2Native function '%s' executes more than %d seconds",
                        p->getFuncName().c_str(), (int)(std::chrono::duration_cast<std::chrono::seconds>(execTimes[ServerHandler])).count());
            else if (p->isProcessing())
              Log::info("Java2Native function '%s' was started.", p->getFuncName().c_str());
            else
              Log::info("Java2Native function '%s' was finished.", p->getFuncName().c_str());

            if (isTimeoutJavaService)
              Log::info("Native2Java function '%s' executes more than %d seconds",
                        rpcExecutor->getProcessingName().c_str(), (int)(std::chrono::duration_cast<std::chrono::seconds>(execTimes[JavaService])).count());
            else if (rpcExecutor->isProcessing())
              Log::info("Native2Java function '%s' was started.", rpcExecutor->getProcessingName().c_str());
            else
              Log::info("Native2Java function '%s' was finished.", rpcExecutor->getProcessingName().c_str());

            if (isTimeoutJavaServiceIO)
              Log::info("Native2Java IO function '%s' executes more than %d seconds",
                        rpcExecutorIO->getProcessingName().c_str(), (int)(std::chrono::duration_cast<std::chrono::seconds>(execTimes[JavaServiceIO])).count());
            else if (rpcExecutorIO->isProcessing())
              Log::info("Native2Java IO function '%s' was started.", rpcExecutorIO->getProcessingName().c_str());
            else
              Log::info("Native2Java IO function '%s' was finished.", rpcExecutorIO->getProcessingName().c_str());
          }
        }
      });

      // 3. Check application timings
      bool needShutdownHard = false;
      {
        Lock lock(myMutexState);
        if (myState >= SS_SHUTTING_DOWN) {
          std::chrono::duration<float, std::micro> elapsed = now - myTimeStartShuttingDown;
          if (elapsed > timeoutShuttingDownMs) {
            Log::warn("Start hard shutdown (elapsed %d ms)", std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count());
            myState = SS_SHUTDOWN;
            needShutdownHard = true;
          }
        }
      }
      if (needShutdownHard) {
        CefPostTask(TID_UI, base::BindOnce(CefQuitMessageLoop));
        Log::debug("CefQuitMessageLoop is posted (to be executed on UI thread), wait a little before exit...");
        std::this_thread::sleep_for(std::chrono::milliseconds(2000));
        Log::debug("Buy [%s]!", myCmdArgs.getTransportDesc().c_str());
        std::exit(0);
      }
    }
  });
  myThreadWatcher.detach();
  return true;
}

std::shared_ptr<apache::thrift::TProcessorFactory> ServerApplication::getProcessorFactory() const {
  return myFactory;
}

void ServerApplication::startShuttingDown() {
  {
    Lock lock(myMutexState);
    if (myState >= SS_SHUTTING_DOWN)
      return;

    myState = SS_SHUTTING_DOWN;
    myTimeStartShuttingDown = std::chrono::steady_clock::now();
  }

  myThreadShutdown = std::thread([&]() {
    Log::setThreadName("Shutdown");
    std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
    const std::chrono::milliseconds timeout(getLongEnv("CEF_SERVER_timeoutShutdownMs", 15000));

    while (std::chrono::steady_clock::now() - start < timeout) {
      std::this_thread::sleep_for(std::chrono::milliseconds(1000));
      const std::chrono::time_point now(std::chrono::steady_clock::now());

      if (RemoteBrowser::getAllBrowsersCount() == 0) {
        {
          Lock lock(myMutexState);
          myState = SS_SHUTDOWN;
        }
        CefPostTask(TID_UI, base::BindOnce(CefQuitMessageLoop));
        Log::debug("CefQuitMessageLoop will be invoked now (on TID_UI).");
        return;
      }

      if (Log::isTraceEnabled()) {
        std::vector<int> remainingBids = RemoteBrowser::enumAllBrowsers();
        std::stringstream ss;
        for (int bid : remainingBids)
          ss << bid << ", ";
        Log::trace("Server is waiting for closing remaining browsers with bids: %s", ss.str().c_str());
      }
    }
    Log::trace("Exit shutdown-loop because of timeout.");
    CefPostTask(TID_UI, base::BindOnce(CefQuitMessageLoop)); // not sure that it is necessary here..
  });
  myThreadShutdown.detach();
}

std::string ServerApplication::getRootPath() const {
  return myAppHandler->getRootPath();
}

bool ServerApplication::isDefaultRoot() const {
  return myAppHandler->isDefaultRoot();
}

const std::chrono::steady_clock::time_point& ServerApplication::getStartTime() const {
  return myTimeStart;
}

std::string ServerApplication::getState() {
  Lock lock(myMutexState);
  switch (myState) {
    case SS_NEW: return "SS_NEW";
    case SS_SHUTDOWN: return "SHUTDOWN";
    case SS_SHUTTING_DOWN: return "SHUTTING_DOWN";
    default: return "UNKNOWN_STATE";
  }
}

std::string ServerApplication::getStateWithDetails() {
  std::stringstream ss;

  ss << "Server state: " << getState() << std::endl;

  ss << "Server handlers:" << std::endl;
  myFactory->forEach([&](const MyServerProcessor* p) {
    if (!p || !p->getServerHandler()) return; // can't be
    ss << p->getServerHandler()->getDebugInfo(1);
  });
  ss << std::endl;

  ss << "RemoteObject factories:" << std::endl;
  ss << DebugInfo::getInfo(1);

  ss << "Measures:" << std::endl;
  ss << DebugInfo::getMeasures(1);

  return ss.str();
}

std::shared_ptr<ServerHandlerContext> ServerApplication::getCtx(int connectionId) {
    return myFactory->findCtx(connectionId);
}
