winss
supervise.hpp
Go to the documentation of this file.
1 /*
2  * Copyright 2016-2017 Morgan Stanley
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef LIB_WINSS_SUPERVISE_SUPERVISE_HPP_
18 #define LIB_WINSS_SUPERVISE_SUPERVISE_HPP_
19 
20 #include <windows.h>
21 #include <filesystem>
22 #include <vector>
23 #include <chrono>
24 #include <string>
25 #include "easylogging/easylogging++.hpp"
26 #include "../handle_wrapper.hpp"
27 #include "../windows_interface.hpp"
28 #include "../filesystem_interface.hpp"
29 #include "../wait_multiplexer.hpp"
30 #include "../not_owning_ptr.hpp"
31 #include "../environment.hpp"
32 #include "../path_mutex.hpp"
33 #include "../process.hpp"
34 #include "../utils.hpp"
35 
36 namespace fs = std::experimental::filesystem;
37 
38 namespace winss {
43  std::chrono::system_clock::time_point time;
44  std::chrono::system_clock::time_point last;
46  bool is_up;
48  int up_count;
50  int exit_code;
51  DWORD pid;
52 };
53 
60  RUN,
61  END,
65 };
66 
71  public:
79  virtual bool Notify(SuperviseNotification notification,
80  const SuperviseState& state) = 0;
81 
85  virtual ~SuperviseListener() {}
86 };
87 
94 template<typename TMutex, typename TProcess>
96  protected:
99  TMutex mutex;
100  TProcess process;
101  fs::path service_dir;
102  SuperviseState state{};
104  std::vector<winss::NotOwningPtr<winss::SuperviseListener>> listeners;
105  int exiting = 0;
106  bool waiting = false;
111  virtual void Init() {
112  if (mutex.HasLock()) {
113  return;
114  }
115 
116  if (!FILESYSTEM.ChangeDirectory(service_dir)) {
117  LOG(ERROR)
118  << "The directory '"
119  << service_dir
120  << "' does not exist.";
121  multiplexer->Stop(kFatalExitCode);
122  return;
123  }
124 
125  if (!mutex.Lock()) {
126  multiplexer->Stop(kMutexTaken);
127  return;
128  }
129 
130  state.time = std::chrono::system_clock::now();
131  state.last = state.time;
132 
133  if (FILESYSTEM.FileExists(kDownFile)) {
134  state.initially_up = false;
135  state.remaining_count = 0;
136  }
137 
138  NotifyAll(START);
139  Triggered(false);
140  }
141 
142 
150  virtual DWORD GetFinishTimeout() const {
151  std::string timeout_finish = FILESYSTEM.Read(kTimeoutFinishFile);
152 
153  if (timeout_finish.empty()) {
154  return kCommandTimeout;
155  }
156 
157  return std::strtoul(timeout_finish.data(), nullptr, 10);
158  }
159 
166  virtual bool Start(const std::string& file_name) {
167  process.Close();
168 
169  std::string cmd = FILESYSTEM.Read(file_name);
170 
171  if (cmd.empty()) {
172  return false;
173  }
174 
175  std::string expanded = winss::Utils::ExpandEnvironmentVariables(cmd);
176  winss::EnvironmentDir env_dir(service_dir / fs::path(kEnvDir));
177 
178  winss::ProcessParams params{ expanded, true };
179  params.env = &env_dir;
180  bool created = process.Create(params);
181  state.pid = process.GetProcessId();
182  return created;
183  }
184 
190  virtual bool StartRun() {
191  if (state.remaining_count == 0) {
192  // Return true to prevent false positive unable to spawn message.
193  return true;
194  }
195 
196  VLOG(2) << "Starting run process";
197 
198  state.up_count++;
199  state.is_run_process = true;
200 
201  WINDOWS.SetEnvironmentVariable(kRunExitCodeEnvName, nullptr);
202  if (Start(kRunFile)) {
203  multiplexer->AddTriggeredCallback(process.GetHandle(), [this](
204  winss::WaitMultiplexer& m, const winss::HandleWrapper& handle) {
205  this->Triggered(false);
206  });
207  state.is_run_process = true;
208  state.is_up = true;
209  state.exit_code = 0;
210  if (state.remaining_count > 0) {
211  state.remaining_count--;
212  }
213  NotifyAll(RUN);
214  return true;
215  }
216 
217  return false;
218  }
219 
225  virtual bool StartFinish() {
226  VLOG(2) << "Starting finish process";
227 
228  state.is_run_process = false;
229 
230  WINDOWS.SetEnvironmentVariable(kRunExitCodeEnvName,
231  std::to_string(state.exit_code).c_str());
232 
233  if (Start(kFinishFile)) {
234  multiplexer->AddTriggeredCallback(process.GetHandle(), [this](
235  winss::WaitMultiplexer& m, const winss::HandleWrapper& handle) {
236  this->Triggered(false);
237  });
238  DWORD timeout = GetFinishTimeout();
239  if (timeout > 0) {
240  multiplexer->AddTimeoutCallback(timeout,
241  [this](winss::WaitMultiplexer&) {
242  Triggered(true);
243  }, kTimeoutGroup);
244  waiting = true;
245  }
246  state.is_up = true;
247  return true;
248  }
249 
250  return false;
251  }
252 
258  virtual void NotifyAll(winss::SuperviseNotification notification) {
259  state.time = std::chrono::system_clock::now();
260 
261  switch (notification) {
262  case RUN:
263  state.last = state.time;
264  break;
265  case END:
266  state.last = state.time;
267  break;
268  }
269 
270  auto it = listeners.begin();
271  while (it != listeners.end()) {
272  if ((*it)->Notify(notification, state)) {
273  ++it;
274  } else {
275  it = listeners.erase(it);
276  }
277  }
278  }
279 
287  virtual void Triggered(bool timeout) {
288  if (waiting && !timeout) {
289  multiplexer->RemoveTimeoutCallback(kTimeoutGroup);
290  }
291 
292  waiting = false;
293  int restart = 0;
294  DWORD wait = kBusyWait;
295 
296  if (state.is_up) {
297  if (state.is_run_process) {
298  VLOG(2) << "Run process ended";
299 
300  state.is_up = false;
301  state.pid = 0;
302 
303  NotifyAll(END);
304 
305  if (exiting) {
306  state.exit_code = kSignaledExitCode;
307  } else {
308  state.exit_code = process.GetExitCode();
309  }
310 
311  if (!StartFinish()) {
312  restart = 2;
313  }
314  } else {
315  if (timeout) {
316  process.Terminate();
317  return;
318  }
319 
320  VLOG(2) << "Finish process ended";
321 
322  state.is_up = false;
323  state.pid = 0;
324 
325  if (process.GetExitCode() == kDownExitCode) {
326  state.remaining_count = 0;
327  NotifyAll(BROKEN);
328  }
329 
330  restart = 2;
331  }
332  } else if (!Complete()) {
333  if (!StartRun()) {
334  restart = 1;
335  wait = kRunFailedWait;
336  LOG(WARNING) << "Unable to spawn ./run - waiting 10 seconds";
337  }
338  }
339 
340  if (restart > 1) {
341  NotifyAll(FINISHED);
342  }
343 
344  if (restart && !Complete() && state.remaining_count != 0) {
345  VLOG(2) << "Waiting for: " << wait;
346  waiting = true;
347  multiplexer->AddTimeoutCallback(wait,
348  [this](winss::WaitMultiplexer&) {
349  Triggered(true);
350  }, kTimeoutGroup);
351  }
352  }
353 
362  virtual bool Complete() {
363  switch (exiting) {
364  case 0:
365  return false;
366  case 1:
367  ++exiting;
368 
369  if (!multiplexer->IsStopping()) {
370  multiplexer->Stop(0);
371  }
372 
373  NotifyAll(EXIT);
374  default:
375  return true;
376  }
377  }
378 
382  virtual void Stop() {
383  if (exiting) {
384  return;
385  }
386 
387  exiting = 1;
388  Down();
389  }
390 
391  public:
/** Multiplexer stop code used by Init() when the mutex cannot be locked. */
392  static const int kMutexTaken = 100;
/** Multiplexer stop code used by Init() when the service directory cannot be entered. */
393  static const int kFatalExitCode = 111;
/** Exit code recorded for the run process when it ends while the supervisor is exiting. */
394  static const int kSignaledExitCode = 256;
/** Finish process exit code which stops further restarts and raises BROKEN. */
395  static const int kDownExitCode = 125;
/** Finish timeout used when the timeout-finish file has no content. */
396  static const DWORD kCommandTimeout = 5000;
/** Default wait before Triggered() is called again for a restart. */
397  static const DWORD kBusyWait = 1000;
/** Wait used instead of kBusyWait when the run process could not be spawned. */
398  static const DWORD kRunFailedWait = 10000;
/** Name passed to the mutex together with the service directory. */
400  static constexpr const char kMutexName[10] = "supervise";
/** File which Start() reads to obtain the run command. */
401  static constexpr const char kRunFile[4] = "run";
/** File which Start() reads to obtain the finish command. */
403  static constexpr const char kFinishFile[7] = "finish";
/** File whose existence makes Init() leave the run process down. */
404  static constexpr const char kDownFile[5] = "down";
/** Directory, relative to the service directory, used as the process environment. */
405  static constexpr const char kEnvDir[4] = "env";
/** File read by GetFinishTimeout(). */
407  static constexpr const char kTimeoutFinishFile[15] = "timeout-finish";
/** Group name used for all timeout callbacks added by the supervisor. */
409  static constexpr const char kTimeoutGroup[10] = "supervise";
/** Environment variable which carries the run exit code to the finish process. */
411  static constexpr const char kRunExitCodeEnvName[24] =
412  "SUPERVISE_RUN_EXIT_CODE";
413 
421  const fs::path& service_dir) : multiplexer(multiplexer),
422  mutex(service_dir, kMutexName), service_dir(service_dir) {
423  state.is_run_process = true;
424  state.is_up = false;
425  state.initially_up = true;
426  state.up_count = 0;
427  state.remaining_count = -1;
428  state.exit_code = 0;
429  state.pid = 0;
430 
431  multiplexer->AddInitCallback([this](winss::WaitMultiplexer&) {
432  this->Init();
433  });
434 
435  multiplexer->AddStopCallback([this](winss::WaitMultiplexer&) {
436  this->Stop();
437  });
438  }
439 
440  SuperviseTmpl(const SuperviseTmpl&) = delete;
441  SuperviseTmpl(SuperviseTmpl&&) = delete;
448  virtual const SuperviseState& GetState() const {
449  return state;
450  }
451 
457  virtual void AddListener(
459  listeners.push_back(listener);
460  }
461 
465  virtual void Up() {
466  if (!mutex.HasLock() || exiting) {
467  return;
468  }
469 
470  VLOG(3) << "Start supervised process if not started";
471  state.remaining_count = -1;
472  if (!state.is_up) {
473  Triggered(false);
474  }
475  }
476 
481  virtual void Once() {
482  if (!mutex.HasLock() || exiting) {
483  return;
484  }
485 
486  VLOG(3) << "Run supervised process once";
487  if (!state.is_up) {
488  state.remaining_count = 1;
489  Triggered(false);
490  } else {
491  state.remaining_count = 0;
492  }
493  }
494 
498  virtual void OnceAtMost() {
499  if (!mutex.HasLock() || exiting) {
500  return;
501  }
502 
503  VLOG(3) << "Run supervised process at least once";
504  state.remaining_count = 0;
505  }
506 
510  virtual void Down() {
511  if (!mutex.HasLock()) {
512  return;
513  }
514 
515  state.remaining_count = 0;
516  Term();
517  }
518 
522  virtual void Kill() {
523  if (!mutex.HasLock()) {
524  return;
525  }
526 
527  VLOG(3) << "Kill supervised process if not stopped";
528  if (state.is_up && state.is_run_process) {
529  process.Terminate();
530  }
531  }
532 
536  virtual void Term() {
537  if (!mutex.HasLock()) {
538  return;
539  }
540 
541  VLOG(3) << "Stop supervised process if not stopped";
542  if (state.is_up && state.is_run_process) {
543  process.SendBreak();
544  }
545  }
546 
550  virtual void Exit() {
551  if (!mutex.HasLock() || exiting) {
552  return;
553  }
554 
555  VLOG(3) << "Exit supervisor when supervised process goes down";
556  state.remaining_count = 0;
557  exiting = 1;
558 
559  if (!state.is_up) {
560  Triggered(false);
561  }
562  }
563 
564  SuperviseTmpl& operator=(const SuperviseTmpl&) = delete;
565  SuperviseTmpl& operator=(SuperviseTmpl&&) = delete;
566 };
567 
572 } // namespace winss
573 
574 #endif // LIB_WINSS_SUPERVISE_SUPERVISE_HPP_
SuperviseTmpl(winss::NotOwningPtr< winss::WaitMultiplexer > multiplexer, const fs::path &service_dir)
Supervise constructor.
Definition: supervise.hpp:420
A wrapper for a Windows HANDLE.
Definition: handle_wrapper.hpp:39
The supervisor class template.
Definition: supervise.hpp:95
virtual void OnceAtMost()
Signals the supervisor to only run once if it is already running.
Definition: supervise.hpp:498
Run process has started.
Definition: supervise.hpp:60
Parameters to start a Windows process.
Definition: process.hpp:29
A directory where each file is an environment variable.
Definition: environment.hpp:58
bool is_run_process
Definition: supervise.hpp:45
#define WINDOWS
Definition: windows_interface.hpp:25
DWORD pid
Definition: supervise.hpp:51
Unknown notification.
Definition: supervise.hpp:58
virtual void Up()
Signals the supervisor to go into the up state.
Definition: supervise.hpp:465
virtual void Init()
Initializes the supervisor.
Definition: supervise.hpp:111
virtual bool IsStopping() const
Gets if the multiplexer is stopping.
Definition: wait_multiplexer.cpp:201
virtual void Down()
Signals the supervisor to be in the down state.
Definition: supervise.hpp:510
virtual void Stop()
Signal the supervisor to exit.
Definition: supervise.hpp:382
std::chrono::system_clock::time_point time
Definition: supervise.hpp:43
virtual bool Complete()
Tests exiting value.
Definition: supervise.hpp:362
virtual void AddStopCallback(Callback callback)
Add a stop callback.
Definition: wait_multiplexer.cpp:60
Definition: case_ignore.hpp:23
int up_count
Definition: supervise.hpp:48
Supervisor starting.
Definition: supervise.hpp:59
bool is_up
Definition: supervise.hpp:46
std::vector< winss::NotOwningPtr< winss::SuperviseListener > > listeners
The supervisor listeners.
Definition: supervise.hpp:104
virtual void AddTimeoutCallback(DWORD timeout, Callback callback, std::string group="")
Add a timeout item which given the timeout period will call the callback if it is not removed before ...
Definition: wait_multiplexer.cpp:50
virtual void AddTriggeredCallback(const winss::HandleWrapper &handle, TriggeredCallback callback)
Add a triggered callback for when an event happens on the given handle.
Definition: wait_multiplexer.cpp:43
virtual ~SuperviseListener()
Default virtual destructor.
Definition: supervise.hpp:85
winss::NotOwningPtr< winss::WaitMultiplexer > multiplexer
The event multiplexer for the supervisor.
Definition: supervise.hpp:98
fs::path service_dir
The service directory.
Definition: supervise.hpp:101
virtual void Once()
Signals the supervisor to be in the up state and when the process exits then leave it down...
Definition: supervise.hpp:481
std::chrono::system_clock::time_point last
Definition: supervise.hpp:44
virtual void AddListener(winss::NotOwningPtr< winss::SuperviseListener > listener)
Adds a supervisor listener to the list of listeners.
Definition: supervise.hpp:457
virtual void Triggered(bool timeout)
Event triggered handler.
Definition: supervise.hpp:287
The state of the supervisor.
Definition: supervise.hpp:42
int remaining_count
Definition: supervise.hpp:49
virtual bool StartFinish()
Starts the finish file process.
Definition: supervise.hpp:225
int exit_code
Definition: supervise.hpp:50
Supervisor exiting.
Definition: supervise.hpp:64
winss::Environment * env
The process environment.
Definition: process.hpp:36
Run process has ended.
Definition: supervise.hpp:61
virtual DWORD GetFinishTimeout() const
Gets the finish timeout value.
Definition: supervise.hpp:150
A HANDLE wait multiplexer.
Definition: wait_multiplexer.hpp:70
virtual void Stop(int code)
Stops the multiplexer with the given code if one has not already been set.
Definition: wait_multiplexer.cpp:189
virtual void NotifyAll(winss::SuperviseNotification notification)
Notify all the listeners with the given event.
Definition: supervise.hpp:258
virtual void AddInitCallback(Callback callback)
Add an initialization callback.
Definition: wait_multiplexer.cpp:37
#define FILESYSTEM
Definition: filesystem_interface.hpp:26
TProcess process
The supervised process.
Definition: supervise.hpp:100
The supervisor listener.
Definition: supervise.hpp:70
Definition: supervise.hpp:62
Permanent failure.
Definition: supervise.hpp:63
SuperviseNotification
The supervisor events which can occur.
Definition: supervise.hpp:57
virtual bool StartRun()
Starts the run file process.
Definition: supervise.hpp:190
SuperviseTmpl< winss::PathMutex, winss::Process > Supervise
Concrete supervise implementation.
Definition: supervise.hpp:571
TMutex mutex
The supervisor global mutex.
Definition: supervise.hpp:99
virtual bool RemoveTimeoutCallback(std::string group)
Removes the timeout call back for the given group.
Definition: wait_multiplexer.cpp:77
virtual void Kill()
Kills the supervised process.
Definition: supervise.hpp:522
virtual void Exit()
Signals the supervisor to exit.
Definition: supervise.hpp:550
virtual bool Start(const std::string &file_name)
Starts the process defined in the given file.
Definition: supervise.hpp:166
virtual void Term()
Sends a CTRL+BREAK to the supervised process.
Definition: supervise.hpp:536
bool initially_up
Definition: supervise.hpp:47
virtual const SuperviseState & GetState() const
Gets the current supervisor state.
Definition: supervise.hpp:448
static std::string ExpandEnvironmentVariables(const std::string &value)
Expand the given string with environment variables.
Definition: utils.cpp:31