diff --git a/tapeserver/daemon/ProcessManager.cpp b/tapeserver/daemon/ProcessManager.cpp index fd77f84f19395517eb0f73cae8565d68ad01a6ea..de7d12743a0f65399c685fa2d77de1cf7f95e042 100644 --- a/tapeserver/daemon/ProcessManager.cpp +++ b/tapeserver/daemon/ProcessManager.cpp @@ -65,6 +65,9 @@ int ProcessManager::run() { // The first statuses were updated at subprocess registration, so we do // not need an initialization. while(true) { + // Manage sigChild requests + auto sigChildStatus = runSigChildManagement(); + if (sigChildStatus.doExit) return sigChildStatus.exitCode; // Manage shutdown requests and completions. auto shutdownStatus = runShutdownManagement(); if (shutdownStatus.doExit) return shutdownStatus.exitCode; @@ -155,6 +158,27 @@ ProcessManager::RunPartStatus ProcessManager::runForkManagement() { return RunPartStatus(); } +ProcessManager::RunPartStatus ProcessManager::runSigChildManagement() { + // If any process received sigChild, we signal it to all processes + bool sigChild = std::count_if(m_subprocessHandlers.cbegin(), + m_subprocessHandlers.cend(), + [](const SubprocessAndStatus &i){return i.status.sigChild;}); + if (sigChild) { + for(auto & sp: m_subprocessHandlers) { sp.status = sp.handler->processSigChild(); } + } + // If all processes completed their shutdown, we can exit + bool shutdownComplete=true; + for (auto & sp: m_subprocessHandlers) { shutdownComplete &= sp.status.shutdownComplete; } + if (shutdownComplete) { + RunPartStatus ret; + ret.doExit = true; + ret.exitCode = EXIT_SUCCESS; + return ret; + } + return RunPartStatus(); +} + + void ProcessManager::runEventLoop() { // Compute the next timeout. Epoll expects milliseconds. std::chrono::time_point<std::chrono::steady_clock> nextTimeout = decltype(nextTimeout)::max(); @@ -165,7 +189,7 @@ void ProcessManager::runEventLoop() { sp.status = sp.handler->processTimeout(); // If the handler requested kill, shutdown or fork, we can go back to handlers, // which means we exit from the loop here. - if (sp.status.forkRequested || sp.status.killRequested || sp.status.shutdownRequested) return; + if (sp.status.forkRequested || sp.status.killRequested || sp.status.shutdownRequested || sp.status.sigChild) return; // If new timeout is still in the past, we overlook it // TODO: log if (sp.status.nextTimeout < std::chrono::steady_clock::now()) { diff --git a/tapeserver/daemon/ProcessManager.hpp b/tapeserver/daemon/ProcessManager.hpp index 871da2c33d06acf07dc391be10ca55d25a48dd8a..597cc05512d69b88f50cac8997a5f7d42b976032 100644 --- a/tapeserver/daemon/ProcessManager.hpp +++ b/tapeserver/daemon/ProcessManager.hpp @@ -65,6 +65,8 @@ private: RunPartStatus runKillManagement(); /// subpart for run(): handle forking RunPartStatus runForkManagement(); + /// subpart for run(): handle SIGCHLD + RunPartStatus runSigChildManagement(); /// subpart for run(): handle events and timeouts. Does not return any value. void runEventLoop(); }; diff --git a/tapeserver/daemon/ProcessManagerTests.cpp b/tapeserver/daemon/ProcessManagerTests.cpp index 4cd85fb1f0e0ba08ddbe7c17f4f800b0efdac391..7ea406be6611b2522ebff35707789d57d962cb3f 100644 --- a/tapeserver/daemon/ProcessManagerTests.cpp +++ b/tapeserver/daemon/ProcessManagerTests.cpp @@ -25,13 +25,18 @@ namespace unitTests { TEST(cta_Daemon, ProcessManager) { cta::tape::daemon::ProcessManager pm; - std::unique_ptr<EchoSubprocess> es(new EchoSubprocess("Echo subprocess", pm)); - // downcast pointer - std::unique_ptr<cta::tape::daemon::SubprocessHandler> sph = std::move(es); - pm.addHandler(std::move(sph)); - pm.run(); + { + std::unique_ptr<EchoSubprocess> es(new EchoSubprocess("Echo subprocess", pm)); + // downcast pointer + std::unique_ptr<cta::tape::daemon::SubprocessHandler> sph = std::move(es); + pm.addHandler(std::move(sph)); + pm.run(); + } + EchoSubprocess & es = dynamic_cast<EchoSubprocess&>(pm.at("Echo subprocess")); + ASSERT_TRUE(es.echoReceived()); } + // TODO: process manager managing several handlers. } //namespace unitTests \ No newline at end of file diff --git a/tapeserver/daemon/SignalHandler.cpp b/tapeserver/daemon/SignalHandler.cpp index bae42155762daa0cc146752160f8c30d6c5c4ec8..cd8417129a6fea79ea3009bda36209e09cdd5095 100644 --- a/tapeserver/daemon/SignalHandler.cpp +++ b/tapeserver/daemon/SignalHandler.cpp @@ -55,8 +55,11 @@ SignalHandler::~SignalHandler() { } SubprocessHandler::ProcessingStatus SignalHandler::getInitialStatus() { - // On initiation, we expect nothing but signals, i.e. default status. - return SubprocessHandler::ProcessingStatus(); + // On initiation, we expect nothing but signals, i.e. default status + // except we are considered shutdown at all times. + SubprocessHandler::ProcessingStatus ret; + ret.shutdownComplete = true; + return ret; } SubprocessHandler::ProcessingStatus SignalHandler::processEvent() { @@ -98,7 +101,9 @@ SubprocessHandler::ProcessingStatus SignalHandler::processEvent() { } break; case SIGCHLD: - // TODO: add SIGCHLD handling. + // We will request the processing of sigchild until it is acknowledged (by receiving + // it ourselves) + m_sigChildPending = true; break; } SubprocessHandler::ProcessingStatus ret; @@ -106,6 +111,7 @@ SubprocessHandler::ProcessingStatus SignalHandler::processEvent() { // for it ret.shutdownRequested = m_shutdownRequested && !m_shutdownAcknowlegded; ret.nextTimeout = m_shutdownStartTime+m_timeoutDuration; + ret.sigChild = m_sigChildPending; ret.shutdownComplete = true; // We are always ready to leave. return ret; } @@ -122,18 +128,33 @@ SubprocessHandler::ProcessingStatus SignalHandler::processTimeout() { // If we reach timeout, it means it's time to kill child processes SubprocessHandler::ProcessingStatus ret; ret.killRequested = true; + ret.shutdownComplete = true; + return ret; +} + +SubprocessHandler::ProcessingStatus SignalHandler::processSigChild() { + // Our sigchild is now acknowledged + m_sigChildPending = false; + SubprocessHandler::ProcessingStatus ret; + ret.shutdownRequested = m_shutdownRequested; + ret.shutdownComplete = true; + if (m_shutdownRequested) { + ret.nextTimeout = m_shutdownStartTime+m_timeoutDuration; + } return ret; } + SubprocessHandler::ProcessingStatus SignalHandler::shutdown() { // We received (back) our own shutdown: consider it acknowledged m_shutdownAcknowlegded = true; SubprocessHandler::ProcessingStatus ret; - ret.shutdownComplete = m_shutdownAcknowlegded; + ret.shutdownComplete = true; // if we ever asked for shutdown, we have a timeout if (m_shutdownRequested) { ret.nextTimeout = m_shutdownStartTime+m_timeoutDuration; } + ret.sigChild = m_sigChildPending; return ret; } diff --git a/tapeserver/daemon/SignalHandler.hpp b/tapeserver/daemon/SignalHandler.hpp index c2937cce1cdbee925a2278fd3425450fafb5dfb3..c7e7db6f0acd9c25bd5d17f8881efcf91b28f94d 100644 --- a/tapeserver/daemon/SignalHandler.hpp +++ b/tapeserver/daemon/SignalHandler.hpp @@ -35,6 +35,7 @@ public: SubprocessHandler::ProcessingStatus fork() override; SubprocessHandler::ProcessingStatus getInitialStatus() override; void kill() override; + SubprocessHandler::ProcessingStatus processSigChild() override; void prepareForFork() override; SubprocessHandler::ProcessingStatus processEvent() override; SubprocessHandler::ProcessingStatus processTimeout() override; @@ -49,6 +50,7 @@ private: int m_sigFd; bool m_shutdownRequested=false; bool m_shutdownAcknowlegded=false; + bool m_sigChildPending=false; std::chrono::time_point<std::chrono::steady_clock> m_shutdownStartTime= decltype(m_shutdownStartTime)::max(); std::chrono::nanoseconds m_timeoutDuration= diff --git a/tapeserver/daemon/SignalHandlerTests.cpp b/tapeserver/daemon/SignalHandlerTests.cpp index 6a777f0ac43a5fcadb36fa2b4de9f7399e266bf9..6b319f96969939d211483376f63a2b2f865c6249 100644 --- a/tapeserver/daemon/SignalHandlerTests.cpp +++ b/tapeserver/daemon/SignalHandlerTests.cpp @@ -71,4 +71,37 @@ TEST(cta_Daemon, SignalHandlerKill) { ASSERT_TRUE(ps.sawShutdown()); ASSERT_TRUE(ps.sawKill()); } -} \ No newline at end of file + +TEST(cta_Daemon, SignalHandlerSigChild) { + cta::tape::daemon::ProcessManager pm; + { + // Add the signal handler to the manager + std::unique_ptr<SignalHandler> sh(new SignalHandler(pm)); + // Set the timeout + sh->setTimeout(std::chrono::milliseconds(10)); + // downcast pointer + std::unique_ptr<SubprocessHandler> shSph = std::move(sh); + pm.addHandler(std::move(shSph)); + // Add the probe handler to the manager + std::unique_ptr<ProbeSubprocess> ps(new ProbeSubprocess()); + ps->shutdown(); + // downcast pointer + std::unique_ptr<SubprocessHandler> shPs = std::move(ps); + pm.addHandler(std::move(shPs)); + // This signal will be queued for the signal handler. + // Add the EchoSubprocess whose child will exit. + std::unique_ptr<EchoSubprocess> es(new EchoSubprocess("Echo", pm)); + es->setCrashingShild(true); + // downcast pointer + std::unique_ptr<SubprocessHandler> shEs = std::move(es); + pm.addHandler(std::move(shEs)); + } + pm.run(); + ProbeSubprocess &ps = dynamic_cast<ProbeSubprocess&>(pm.at("ProbeProcessHandler")); + ASSERT_TRUE(ps.sawShutdown()); + ASSERT_FALSE(ps.sawKill()); + ASSERT_TRUE(ps.sawSigChild()); + EchoSubprocess &es = dynamic_cast<EchoSubprocess&>(pm.at("Echo")); + ASSERT_FALSE(es.echoReceived()); +} +} diff --git a/tapeserver/daemon/SubprocessHandler.hpp b/tapeserver/daemon/SubprocessHandler.hpp index 7b07dfc83813143350bf032d2807b70814b601f1..00cd8ae7b0006ab3a64f37a80914ecaa0e07a05d 100644 --- a/tapeserver/daemon/SubprocessHandler.hpp +++ b/tapeserver/daemon/SubprocessHandler.hpp @@ -69,6 +69,7 @@ public: bool shutdownComplete = false; ///< Did this process complete its shutdown? bool killRequested = false; ///< Does the process handler require killing all processes bool forkRequested = false; ///< Does the procerss handler request to fork a new process? + bool sigChild = false; ///< Did the process see a SIGCHLD? /// Instant of the next timeout for the process handler. Defaults to end of times. std::chrono::time_point<std::chrono::steady_clock> nextTimeout=decltype(nextTimeout)::max(); /// A extra state variable used in the return value of fork() @@ -80,6 +81,8 @@ public: virtual ProcessingStatus processEvent() = 0; /** Function called to process timeouts. */ virtual ProcessingStatus processTimeout() = 0; + /** Function called when SIGCHLD is received */ + virtual ProcessingStatus processSigChild() = 0; /** Instructs the handler to initiate a clean shutdown and update its status */ virtual ProcessingStatus shutdown() = 0; /** Instructs the handler to kill its subprocess immediately. The process diff --git a/tapeserver/daemon/TestSubprocessHandlers.hpp b/tapeserver/daemon/TestSubprocessHandlers.hpp index 1791aab459ff5505a26b3115ebde1094e6511aab..a54b4f9636a5d41e32e9bfe40c4fe7e0c5e7a3e2 100644 --- a/tapeserver/daemon/TestSubprocessHandlers.hpp +++ b/tapeserver/daemon/TestSubprocessHandlers.hpp @@ -60,9 +60,12 @@ public: echo.counter = 666; std::string echoString; echoString.append((char*)&echo, sizeof(echo)); - m_socketPair.send(echoString); + try { + m_socketPair.send(echoString); + } catch (...) {} // Register the file descriptor. m_processManager.addFile(m_socketPair.getFdForAccess(cta::server::SocketPair::Side::child), this); + m_socketPairRegistered=true; ret.nextTimeout = m_subprocessLaunchTime + m_timeoutLength; ret.forkState = SubprocessHandler::ForkState::parent; return ret; @@ -73,6 +76,7 @@ public: } int runChild() override { + if (m_crashingChild) return EXIT_FAILURE; // Just wait forever for an echo request EchoRequestRepy echo; memset(&echo, '\0', sizeof(echo)); @@ -91,6 +95,18 @@ public: m_socketPair.send(echoString); return EXIT_SUCCESS; } + + SubprocessHandler::ProcessingStatus processSigChild() override { + // Check out child process's status. If the child process is still around, + // waitpid will return 0. Non zero if process completed (and status needs to + // be picked up) and -1 if the process is entirely gone. + if (!m_subprocessComplete && ::waitpid(m_childProcess, nullptr, WNOHANG)) { + m_subprocessComplete = true; + } + SubprocessHandler::ProcessingStatus ret; + ret.shutdownComplete = m_subprocessComplete; + return ret; + } void kill() override { // Send kill signal to child. @@ -99,31 +115,35 @@ public: SubprocessHandler::ProcessingStatus processEvent() override { // We expect to read from the child through the socket pair. - auto echoString = m_socketPair.receive(); - EchoRequestRepy echo; - auto size=echoString.copy((char*)&echo, sizeof(echo)); - if (size!=sizeof(echo)) { - std::stringstream err; - err << "In EchoSubprocess::processEvent(): unexpected size for echo: " - "expected: " << sizeof(echo) << " received: " << size; - throw cta::exception::Exception(err.str()); - } - if (echo.magic != 0xdeadbeef) { - std::stringstream err; - err << "In EchoSubprocess::processEvent(): unexpected magic number: " - "expected: " << std::hex << 0xdeadbeef << " received: " << echo.magic; - throw cta::exception::Exception(err.str()); - } - if (echo.counter != 667) { - std::stringstream err; - err << "In EchoSubprocess::processEvent(): unexpected counter: " - "expected: " << 667 << " received: " << echo.counter; - throw cta::exception::Exception(err.str()); - } + try { + auto echoString = m_socketPair.receive(); + EchoRequestRepy echo; + auto size=echoString.copy((char*)&echo, sizeof(echo)); + if (size!=sizeof(echo)) { + std::stringstream err; + err << "In EchoSubprocess::processEvent(): unexpected size for echo: " + "expected: " << sizeof(echo) << " received: " << size; + throw cta::exception::Exception(err.str()); + } + if (echo.magic != 0xdeadbeef) { + std::stringstream err; + err << "In EchoSubprocess::processEvent(): unexpected magic number: " + "expected: " << std::hex << 0xdeadbeef << " received: " << echo.magic; + throw cta::exception::Exception(err.str()); + } + if (echo.counter != 667) { + std::stringstream err; + err << "In EchoSubprocess::processEvent(): unexpected counter: " + "expected: " << 667 << " received: " << echo.counter; + throw cta::exception::Exception(err.str()); + } + m_echoReceived = true; + ::waitpid(m_childProcess, nullptr, 0); + m_subprocessComplete = true; + } catch (...) {} SubprocessHandler::ProcessingStatus ret; - ::waitpid(m_childProcess, nullptr, 0); - m_processManager.removeFile(m_socketPair.getFdForAccess(cta::server::SocketPair::Side::child)); - ret.shutdownComplete = true; + unregisterSocketpair(); + ret.shutdownComplete = m_subprocessComplete; return ret; } @@ -142,6 +162,7 @@ public: private: typedef cta::tape::daemon::SubprocessHandler SubprocessHandler; cta::server::SocketPair m_socketPair; + bool m_socketPairRegistered=false; cta::tape::daemon::ProcessManager & m_processManager; bool m_subprocesLaunched=false; bool m_subprocessComplete=false; @@ -152,6 +173,19 @@ private: uint32_t magic = 0xdeadbeef; uint32_t counter = 0; }; + + void unregisterSocketpair() { + if (m_socketPairRegistered) { + m_processManager.removeFile(m_socketPair.getFdForAccess(cta::server::SocketPair::Side::child)); + m_socketPairRegistered=false; + } + } + + bool m_crashingChild=false; + bool m_echoReceived=false; +public: + void setCrashingShild(bool doCrash) { m_crashingChild = doCrash; } + bool echoReceived() { return m_echoReceived; } }; /** A simple subprocess recording status changes and reporting them */ @@ -161,7 +195,9 @@ public: virtual ~ProbeSubprocess() {} SubprocessHandler::ProcessingStatus getInitialStatus() override { - return SubprocessHandler::ProcessingStatus(); + SubprocessHandler::ProcessingStatus ret; + ret.shutdownComplete = m_shutdownAsked && m_honorShutdown; + return ret; } void prepareForFork() override { } @@ -181,6 +217,14 @@ public: m_killAsked = true; } + SubprocessHandler::ProcessingStatus processSigChild() override { + m_sigChildReceived = true; + SubprocessHandler::ProcessingStatus ret; + ret.shutdownComplete = m_shutdownAsked && m_honorShutdown; + return ret; + } + + SubprocessHandler::ProcessingStatus processEvent() override { throw cta::exception::Exception("In ProbeSubprocess::processEvent(): should not have been called"); } @@ -197,12 +241,15 @@ public: bool sawKill() { return m_killAsked; } + bool sawSigChild() { return m_sigChildReceived; } + void setHonorShutdown(bool doHonor) { m_honorShutdown = doHonor; } private: bool m_shutdownAsked=false; bool m_killAsked=false; bool m_honorShutdown=true; + bool m_sigChildReceived=false; }; }