winss
pipe_client.hpp
Go to the documentation of this file.
1 /*
2  * Copyright 2016-2017 Morgan Stanley
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef LIB_WINSS_PIPE_CLIENT_HPP_
18 #define LIB_WINSS_PIPE_CLIENT_HPP_
19 
20 #include <windows.h>
21 #include <functional>
22 #include <vector>
23 #include "easylogging/easylogging++.hpp"
24 #include "wait_multiplexer.hpp"
25 #include "pipe_name.hpp"
26 #include "handle_wrapper.hpp"
27 #include "pipe_server.hpp"
28 #include "not_owning_ptr.hpp"
29 
30 namespace winss {
38 };
39 
48 template<typename TInstance, typename TListener>
49 class PipeClient {
50  protected:
51  bool stopping = false;
53  TInstance instance;
58  std::vector<winss::NotOwningPtr<TListener>> listeners;
59 
63  virtual void Triggered() {}
64 
68  virtual void Connected() {}
69 
75  void TellAll(
76  const std::function<bool(TListener&)>& func) {
77  auto it = listeners.begin();
78  while (it != listeners.end()) {
79  if (func(**it)) {
80  ++it;
81  } else {
82  it = listeners.erase(it);
83  }
84  }
85  }
86 
92  void Triggered(const winss::HandleWrapper& handle) {
93  if (handle == instance.GetHandle()) {
94  winss::OverlappedResult result = instance.GetOverlappedResult();
95  if (result == REMOVE) {
96  if (instance.Close()) {
97  TellAll([](TListener& listener) {
98  return listener.Disconnected();
99  });
100  }
101  return;
102  }
103 
104  multiplexer->AddTriggeredCallback(handle, [this](
106  Triggered(handle);
107  });
108 
109  if (result != SKIP) {
110  Triggered();
111  }
112  }
113  }
114 
115  public:
121  explicit PipeClient(const PipeClientConfig& config) :
122  pipe_name(config.pipe_name), multiplexer(config.multiplexer) {
123  }
124 
125  PipeClient(const PipeClient&) = delete;
126  PipeClient(PipeClient&&) = delete;
134  listeners.push_back(listener);
135  }
136 
142  virtual bool IsStopping() const {
143  return stopping;
144  }
145 
149  virtual void Connect() {
150  if (!stopping && !instance.IsConnected()) {
151  multiplexer->AddStopCallback([this](winss::WaitMultiplexer&) {
152  this->Stop();
153  });
154 
155  if (instance.CreateFile(pipe_name)) {
156  multiplexer->AddTriggeredCallback(instance.GetHandle(),
157  [this](winss::WaitMultiplexer&,
158  const winss::HandleWrapper& handle) {
159  this->Triggered(handle);
160  });
161  if (instance.SetConnected()) {
162  Connected();
163  }
164  } else {
165  TellAll([](TListener& listener) {
166  return listener.Disconnected();
167  });
168  }
169  }
170  }
171 
175  virtual void Stop() {
176  if (!stopping) {
177  stopping = true;
178  instance.Closing();
179  }
180  }
181 
182  PipeClient& operator=(const PipeClient&) = delete;
183  PipeClient& operator=(PipeClient&&) = delete;
188  virtual ~PipeClient() {
189  if (instance.Close()) {
190  TellAll([](TListener& listener) {
191  return listener.Disconnected();
192  });
193  }
194  }
195 };
196 
201  public:
207  virtual bool Connected() = 0;
208 
214  virtual bool Disconnected() = 0;
215 
218 };
219 
224  public:
230  virtual bool WriteComplete() = 0;
231 
234 };
235 
240  public:
247  virtual bool Received(const std::vector<char>& message) = 0;
248 
251 };
252 
258 template<typename TInstance>
259 class OutboundPipeClientTmpl : public PipeClient<TInstance,
260  PipeClientSendListener> {
261  private:
265  void Connected() {
266  instance.Read();
267 
268  TellAll([](winss::PipeClientSendListener& listener) {
269  return listener.Connected();
270  });
271  }
272 
276  void Triggered() {
277  if (instance.FinishWrite()) {
278  instance.Write();
279  } else {
280  bool was_writting = instance.IsWriting();
281 
282  instance.Read();
283 
284  if (was_writting) {
285  TellAll([](winss::PipeClientSendListener& listener) {
286  return listener.WriteComplete();
287  });
288  }
289  }
290  }
291 
292  public:
298  explicit OutboundPipeClientTmpl(const PipeClientConfig& config) :
300  ::PipeClient(config) {}
301 
306 
315  virtual bool Send(const std::vector<char>& data) {
316  if (instance.Queue(data)) {
317  return instance.Write();
318  }
319 
320  return false;
321  }
322 
324  OutboundPipeClientTmpl& operator=(const OutboundPipeClientTmpl&) = delete;
326  OutboundPipeClientTmpl& operator=(OutboundPipeClientTmpl&&) = delete;
327 };
328 
333 
343 template<typename TInstance>
344 class InboundPipeClientTmpl : public PipeClient<TInstance,
345  PipeClientReceiveListener> {
346  private:
347  bool handshake = false;
352  void Connected() {
353  instance.Read();
354  }
355 
359  void Triggered() {
360  if (instance.FinishRead()) {
361  Notify();
362  }
363 
364  instance.Read();
365  }
366 
370  void Notify() {
371  std::vector<char> buff = instance.SwapBuffer();
372 
373  if (!handshake) {
374  auto pos = std::find(buff.begin(), buff.end(), 0);
375  if (pos != buff.end()) {
376  VLOG(6) << "Inbound pipe handshake complete";
377  handshake = true;
378 
379  TellAll([](winss::PipeClientReceiveListener& listener) {
380  return listener.Connected();
381  });
382 
383  buff.erase(buff.begin(), pos);
384  if (buff.empty()) {
385  return;
386  }
387  } else {
388  VLOG(1)
389  << "Inbound pipe handshake failed (expected null byte)";
390  return;
391  }
392  }
393 
394  TellAll([buff](PipeClientReceiveListener& listener) {
395  return listener.Received(buff);
396  });
397  }
398 
399  public:
405  explicit InboundPipeClientTmpl(const PipeClientConfig& config) :
407  ::PipeClient(config) {}
408 
413 
415  InboundPipeClientTmpl& operator=(const InboundPipeClientTmpl&) = delete;
417  InboundPipeClientTmpl& operator=(InboundPipeClientTmpl&&) = delete;
418 };
419 
424 } // namespace winss
425 
426 #endif // LIB_WINSS_PIPE_CLIENT_HPP_
virtual ~PipeClientConnectionListener()
Default destructor.
Definition: pipe_client.hpp:217
virtual void Connected()
Called when the client is connected.
Definition: pipe_client.hpp:68
A wrapper for a Windows HANDLE.
Definition: handle_wrapper.hpp:39
winss::PipeName pipe_name
The pipe instance.
Definition: pipe_client.hpp:54
OverlappedResult
The result of the overlapped operation.
Definition: pipe_instance.hpp:30
InboundPipeClientTmpl(const PipeClientConfig &config)
Creates an inbound pipe client with the given config.
Definition: pipe_client.hpp:405
virtual bool IsStopping() const
Gets if the pipe client is stopping.
Definition: pipe_client.hpp:142
virtual void Stop()
Stop the pipe client.
Definition: pipe_client.hpp:175
InboundPipeClientTmpl< winss::InboundPipeInstance > InboundPipeClient
A concrete inbound pipe client.
Definition: pipe_client.hpp:423
OutboundPipeClientTmpl(const PipeClientConfig &config)
Creates an outbound pipe client with the given config.
Definition: pipe_client.hpp:298
virtual ~PipeClientSendListener()
Default destructor.
Definition: pipe_client.hpp:233
A listener for pipe client received data events.
Definition: pipe_client.hpp:239
void TellAll(const std::function< bool(TListener &)> &func)
Call a function against all listeners.
Definition: pipe_client.hpp:75
An outbound pipe client.
Definition: pipe_client.hpp:259
winss::NotOwningPtr< winss::WaitMultiplexer > multiplexer
The event multiplexer for the named pipe client.
Definition: pipe_client.hpp:37
Pipe names are based on file system paths.
Definition: pipe_name.hpp:32
virtual void AddStopCallback(Callback callback)
Add a stop callback.
Definition: wait_multiplexer.cpp:60
virtual void Connect()
Start the connection process to the pipe server.
Definition: pipe_client.hpp:149
Definition: case_ignore.hpp:23
virtual bool Connected()=0
Called when the pipe client is connected.
virtual bool WriteComplete()=0
Called when the pipe client has finished sending data.
virtual void AddTriggeredCallback(const winss::HandleWrapper &handle, TriggeredCallback callback)
Add a triggered callback for when an event happens on the given handle.
Definition: wait_multiplexer.cpp:43
PipeClient(const PipeClientConfig &config)
Creates a pipe client with the given config.
Definition: pipe_client.hpp:121
An inbound pipe client.
Definition: pipe_client.hpp:344
Config for a named pipe client.
Definition: pipe_client.hpp:34
Wait till next result.
Definition: pipe_instance.hpp:33
virtual void AddListener(winss::NotOwningPtr< TListener > listener)
Add a listener to the client.
Definition: pipe_client.hpp:133
winss::NotOwningPtr< winss::WaitMultiplexer > multiplexer
The event multiplexer for the pipe client.
Definition: pipe_client.hpp:56
OutboundPipeClientTmpl< winss::OutboundPipeInstance > OutboundPipeClient
A concrete outbound pipe client.
Definition: pipe_client.hpp:332
TInstance instance
Marked if stopping the client.
Definition: pipe_client.hpp:53
A HANDLE wait multiplexer.
Definition: wait_multiplexer.hpp:70
void Triggered(const winss::HandleWrapper &handle)
Event handler for the pipe client.
Definition: pipe_client.hpp:92
A listener for pipe client connection events.
Definition: pipe_client.hpp:200
std::vector< winss::NotOwningPtr< TListener > > listeners
Listeners for the pipe client.
Definition: pipe_client.hpp:58
winss::PipeName pipe_name
The name of the named pipe.
Definition: pipe_client.hpp:35
virtual bool Received(const std::vector< char > &message)=0
Called when the pipe client has received data.
Close client.
Definition: pipe_instance.hpp:31
A listener for pipe client send complete events.
Definition: pipe_client.hpp:223
Base named pipe client.
Definition: pipe_client.hpp:49
virtual ~PipeClient()
Close the pipe client and notify listeners.
Definition: pipe_client.hpp:188
virtual void Triggered()
Called when an event is triggered.
Definition: pipe_client.hpp:63
virtual bool Send(const std::vector< char > &data)
Sends the given list of bytes to the pipe server.
Definition: pipe_client.hpp:315
virtual ~PipeClientReceiveListener()
Default destructor.
Definition: pipe_client.hpp:250