winss
pipe_server.hpp
Go to the documentation of this file.
1 /*
2  * Copyright 2016-2017 Morgan Stanley
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef LIB_WINSS_PIPE_SERVER_HPP_
18 #define LIB_WINSS_PIPE_SERVER_HPP_
19 
20 #include <windows.h>
21 #include <vector>
22 #include <utility>
23 #include <map>
24 #include "easylogging/easylogging++.hpp"
25 #include "wait_multiplexer.hpp"
26 #include "pipe_name.hpp"
27 #include "pipe_instance.hpp"
28 #include "handle_wrapper.hpp"
29 #include "not_owning_ptr.hpp"
30 
31 namespace winss {
38 };
39 
47 template<typename TPipeInstance>
48 class PipeServer {
49  protected:
/** Set once by Stop(); StartClient() opens no further pipes while true. */
50  bool stopping = false;
/**
 * True while a pipe instance has been created and is still waiting for a
 * client; cleared when that client connects or the waiting instance is
 * removed.
 */
51  bool open = false;
/** A mapping of pipe handles to the instances that own them. */
54  std::map<winss::HandleWrapper, TPipeInstance> instances;
/**
 * Open a new named pipe for a new client to connect to.
 *
 * Loops until the server is stopping or a pipe instance is already open.
 * A failed CreateNamedPipe is retried on the next iteration with no delay.
 * On success the instance is stored in `instances` under its handle, a
 * triggered callback forwarding to Triggered(handle) is registered with the
 * multiplexer and `open` is set.
 *
 * NOTE(review): listing line 69 (the lambda's parameter list, which has to
 * declare `h`) is absent from this extract.
 */
62  void StartClient() {
63  while (!stopping && !open) {
64  TPipeInstance instance;
65  if (instance.CreateNamedPipe(pipe_name)) {
// Fetch the handle before the instance is moved into the map.
66  winss::HandleWrapper handle = instance.GetHandle();
67  instances.emplace(handle, std::move(instance));
68  multiplexer->AddTriggeredCallback(handle, [this](
70  this->Triggered(h);
71  });
// Ends the loop: only one unconnected instance is kept at a time.
72  open = true;
73  VLOG(6) << "Pipe server clients: " << instances.size();
74  }
75  }
76  }
77 
81  void Stop() {
82  if (!stopping) {
83  stopping = true;
84  for (auto it = instances.begin(); it != instances.end(); ++it) {
85  if (!it->second.IsConnected()) {
86  it->second.Closing();
87  }
88  }
89  }
90  }
91 
/**
 * The event handler for the pipe server.
 *
 * Looks up the instance owning the handle and acts on its overlapped
 * result: REMOVE tears the instance down and opens a replacement pipe if
 * needed; SKIP only re-registers the callback; any other result either
 * completes the connection (Connected hook) or is forwarded to the
 * Triggered(TPipeInstance*) hook.
 *
 * NOTE(review): listing line 115 (the lambda's parameter list, which has to
 * declare `h`) is absent from this extract.
 *
 * \param handle The handle that was signalled.
 */
97  void Triggered(const winss::HandleWrapper& handle) {
98  auto it = instances.find(handle);
99  if (it != instances.end()) {
100  winss::OverlappedResult result = it->second.GetOverlappedResult();
101  if (result == REMOVE) {
// The removed instance was the one still waiting for a client, so the
// server is no longer accepting until StartClient() opens a new pipe.
102  if (!it->second.IsConnected()) {
103  VLOG(1) << "Pipe server client did not connect (closing)";
104  open = false;
105  }
106  it->second.DisconnectNamedPipe();
107  it->second.Close();
// `it` is invalid after erase; nothing below this line may use it.
108  instances.erase(it);
109  VLOG(6) << "Pipe server clients: " << instances.size();
110  StartClient();
111  return;
112  }
113 
// Register again for the next event on this handle (presumably triggered
// callbacks fire only once -- confirm against WaitMultiplexer).
114  multiplexer->AddTriggeredCallback(handle, [this](
116  this->Triggered(h);
117  });
118 
119  if (result == SKIP) {
120  return;
121  }
122 
// SetConnected() returning true is treated as "the client has just
// connected": notify the subclass and open a pipe for the next client.
123  if (it->second.SetConnected()) {
124  Connected(&it->second);
125  open = false;
126  StartClient();
127  } else {
128  Triggered(&it->second);
129  }
130  } else {
131  VLOG(6) << "Pipe instance not found";
132  }
133  }
134 
/**
 * Called when a client is connected. The base implementation does nothing.
 *
 * \param instance The pipe instance whose client connected.
 */
140  virtual void Connected(TPipeInstance* instance) {}
141 
/**
 * Called when an event is triggered on an instance that did not just
 * connect. The base implementation does nothing.
 *
 * \param instance The pipe instance the event belongs to.
 */
147  virtual void Triggered(TPipeInstance* instance) {}
148 
149  public:
155  explicit PipeServer(const PipeServerConfig& config) :
156  pipe_name(config.pipe_name), multiplexer(config.multiplexer) {
157  multiplexer->AddInitCallback([this](winss::WaitMultiplexer&) {
158  this->StartClient();
159  });
160 
161  multiplexer->AddStopCallback([this](winss::WaitMultiplexer&) {
162  this->Stop();
163  });
164  }
165 
// A pipe server can be neither copy- nor move-constructed.
166  PipeServer(const PipeServer&) = delete;
167  PipeServer(PipeServer&&) = delete;
/**
 * Gets if the pipe server is accepting a new connection.
 *
 * \return The current value of the `open` flag.
 */
174  virtual bool IsAccepting() const {
175  return open;
176  }
177 
/**
 * Gets if the pipe server is stopping.
 *
 * \return True once Stop() has run.
 */
183  virtual bool IsStopping() const {
184  return stopping;
185  }
186 
/**
 * Gets the current instance count.
 *
 * \return The number of entries in `instances`, connected or not.
 */
192  virtual size_t InstanceCount() const {
193  return instances.size();
194  }
195 
// A pipe server can be neither copy- nor move-assigned.
196  PipeServer& operator=(const PipeServer&) = delete;
197  PipeServer& operator=(PipeServer&&) = delete;
202  virtual ~PipeServer() {
203  for (auto it = instances.begin(); it != instances.end(); ++it) {
204  it->second.DisconnectNamedPipe();
205  it->second.Close();
206  }
207  }
208 };
209 
215 template<typename TPipeInstance>
216 class OutboundPipeServerTmpl : public PipeServer<TPipeInstance> {
217  private:
225  void Connected(TPipeInstance* instance) {
226  Send(instance, { 0 }); // Send null char to signal connected
227  }
228 
234  void Triggered(TPipeInstance* instance) {
235  if (instance->FinishWrite()) {
236  instance->Write();
237  } else {
238  instance->Read();
239  }
240  }
241 
249  bool Send(TPipeInstance* instance, const std::vector<char>& data) {
250  if (instance->Queue(data)) {
251  return instance->Write();
252  }
253 
254  return false;
255  }
256 
257  public:
/**
 * Create an outbound pipe server; the config is passed unchanged to the
 * PipeServer base constructor.
 *
 * \param config The pipe server config.
 */
258  explicit OutboundPipeServerTmpl(const PipeServerConfig& config) :
259  winss::PipeServer<TPipeInstance>::PipeServer(config) {}
260 
265 
271  virtual bool Send(const std::vector<char>& data) {
272  bool sent = true;
273  for (auto it = instances.begin(); it != instances.end(); ++it) {
274  if (!Send(&it->second, data)) {
275  sent = false;
276  }
277  }
278 
279  return sent;
280  }
281 
// An outbound pipe server can be neither copy- nor move-assigned.
283  OutboundPipeServerTmpl& operator=(const OutboundPipeServerTmpl&) = delete;
285  OutboundPipeServerTmpl& operator=(OutboundPipeServerTmpl&&) = delete;
286 };
287 
292 
297  public:
/**
 * Called when the server has received data from a pipe client.
 *
 * \param data The received bytes.
 * \return True to keep receiving; the inbound server removes a listener
 *         that returns false.
 */
303  virtual bool Received(const std::vector<char>& data) = 0;
304 
307 };
308 
314 template<typename TPipeInstance>
315 class InboundPipeServerTmpl : public PipeServer<TPipeInstance> {
316  private:
/** Listeners notified of received data; held through non-owning pointers. */
317  std::vector<winss::NotOwningPtr<PipeServerReceiveListener>> listeners;
318 
326  void Connected(TPipeInstance* instance) {
327  instance->Read();
328  }
329 
335  void Triggered(TPipeInstance* instance) {
336  if (instance->FinishRead()) {
337  Notify(instance);
338  }
339 
340  instance->Read();
341  }
342 
346  void Notify(TPipeInstance* instance) {
347  std::vector<char> buff = instance->SwapBuffer();
348 
349  if (!buff.empty()) {
350  auto it = listeners.begin();
351  while (it != listeners.end()) {
352  if ((*it)->Received(buff)) {
353  ++it;
354  } else {
355  it = listeners.erase(it);
356  }
357  }
358  }
359  }
360 
361  public:
/**
 * Create an inbound pipe server; the config is passed unchanged to the
 * PipeServer base constructor.
 *
 * \param config The pipe server config.
 */
362  explicit InboundPipeServerTmpl(const PipeServerConfig& config) :
363  winss::PipeServer<TPipeInstance>::PipeServer(config) {}
364 
369 
/**
 * Add a listener to the pipe server for receive events.
 *
 * The listener is appended to `listeners`; no check for duplicates is made.
 *
 * NOTE(review): listing line 374 (the parameter declaration for `listener`)
 * is absent from this extract.
 *
 * \param listener The listener to add.
 */
373  virtual void AddListener(
375  listeners.push_back(listener);
376  }
377 
// An inbound pipe server can be neither copy- nor move-assigned.
379  InboundPipeServerTmpl& operator=(const InboundPipeServerTmpl&) = delete;
381  InboundPipeServerTmpl& operator=(InboundPipeServerTmpl&&) = delete;
382 };
383 
388 } // namespace winss
389 
390 #endif // LIB_WINSS_PIPE_SERVER_HPP_
A wrapper for a Windows HANDLE.
Definition: handle_wrapper.hpp:39
InboundPipeServerTmpl(const PipeServerConfig &config)
Definition: pipe_server.hpp:362
virtual size_t InstanceCount() const
Gets the current instance count.
Definition: pipe_server.hpp:192
OverlappedResult
The result of the overlapped operation.
Definition: pipe_instance.hpp:30
A listener for server receiving data from pipe clients.
Definition: pipe_server.hpp:296
void StartClient()
Open a new named pipe for a new client to connect to.
Definition: pipe_server.hpp:62
virtual void AddListener(winss::NotOwningPtr< PipeServerReceiveListener > listener)
Add a listener to the pipe server for receive events.
Definition: pipe_server.hpp:373
winss::PipeName pipe_name
Definition: pipe_server.hpp:36
winss::NotOwningPtr< winss::WaitMultiplexer > multiplexer
The event multiplexer for the named pipe server.
Definition: pipe_server.hpp:56
virtual ~PipeServer()
Disconnect all clients and close all connections.
Definition: pipe_server.hpp:202
Pipe names are based on file system paths.
Definition: pipe_name.hpp:32
virtual void AddStopCallback(Callback callback)
Add a stop callback.
Definition: wait_multiplexer.cpp:60
An inbound pipe server.
Definition: pipe_server.hpp:315
Definition: case_ignore.hpp:23
virtual bool Send(const std::vector< char > &data)
Send the given data to all instances.
Definition: pipe_server.hpp:271
virtual void AddTriggeredCallback(const winss::HandleWrapper &handle, TriggeredCallback callback)
Add a triggered callback for when an event happens on the given handle.
Definition: wait_multiplexer.cpp:43
OutboundPipeServerTmpl< winss::OutboundPipeInstance > OutboundPipeServer
Concrete outbound pipe server implementation.
Definition: pipe_server.hpp:291
Config for a named pipe server.
Definition: pipe_server.hpp:35
Wait till next result.
Definition: pipe_instance.hpp:33
void Stop()
Stop the pipe server.
Definition: pipe_server.hpp:81
virtual void Triggered(TPipeInstance *instance)
Called when an event is triggered.
Definition: pipe_server.hpp:147
winss::NotOwningPtr< winss::WaitMultiplexer > multiplexer
Definition: pipe_server.hpp:37
std::map< winss::HandleWrapper, TPipeInstance > instances
A mapping of handles to instances.
Definition: pipe_server.hpp:54
InboundPipeServerTmpl< winss::InboundPipeInstance > InboundPipeServer
Concrete inbound pipe server implementation.
Definition: pipe_server.hpp:387
Base named pipe server.
Definition: pipe_server.hpp:48
A HANDLE wait multiplexer.
Definition: wait_multiplexer.hpp:70
winss::PipeName pipe_name
The name of the pipe.
Definition: pipe_server.hpp:57
virtual void AddInitCallback(Callback callback)
Add an initialization callback.
Definition: wait_multiplexer.cpp:37
An outbound pipe server.
Definition: pipe_server.hpp:216
virtual ~PipeServerReceiveListener()
Default destructor.
Definition: pipe_server.hpp:306
virtual bool IsStopping() const
Gets if the pipe server is stopping.
Definition: pipe_server.hpp:183
virtual void Connected(TPipeInstance *instance)
Called when a client is connected.
Definition: pipe_server.hpp:140
PipeServer(const PipeServerConfig &config)
Create a new pipe instance with the given config.
Definition: pipe_server.hpp:155
Close client.
Definition: pipe_instance.hpp:31
virtual bool IsAccepting() const
Gets if the pipe server is accepting a new connection.
Definition: pipe_server.hpp:174
void Triggered(const winss::HandleWrapper &handle)
The event handler for the pipe server.
Definition: pipe_server.hpp:97
OutboundPipeServerTmpl(const PipeServerConfig &config)
Definition: pipe_server.hpp:258