hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
zmq.hpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2009-2011 250bpm s.r.o.
3  Copyright (c) 2011 Botond Ballo
4  Copyright (c) 2007-2009 iMatix Corporation
5 
6  Permission is hereby granted, free of charge, to any person obtaining a copy
7  of this software and associated documentation files (the "Software"), to
8  deal in the Software without restriction, including without limitation the
9  rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10  sell copies of the Software, and to permit persons to whom the Software is
11  furnished to do so, subject to the following conditions:
12 
13  The above copyright notice and this permission notice shall be included in
14  all copies or substantial portions of the Software.
15 
16  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22  IN THE SOFTWARE.
23 */
24 
25 #ifndef __ZMQ_HPP_INCLUDED__
26 #define __ZMQ_HPP_INCLUDED__
27 
28 #include <zmq.h>
29 
30 #include <algorithm>
31 #include <cassert>
32 #include <cstring>
33 #include <exception>
34 
// Detect whether the compiler supports C++11 rvalue references and deleted
// functions.
//
//   ZMQ_HAS_RVALUE_REFS   defined only when rvalue references are usable;
//                         it enables the move constructors/assignments below.
//   ZMQ_DELETED_FUNCTION  defined on every path: expands to "= delete" when
//                         the compiler supports it and to nothing otherwise,
//                         so it can always be appended to a declaration.
#if (defined(__GNUC__) && (__GNUC__ > 4 || \
      (__GNUC__ == 4 && __GNUC_MINOR__ > 2)) && \
      defined(__GXX_EXPERIMENTAL_CXX0X__))
    #define ZMQ_HAS_RVALUE_REFS
    #define ZMQ_DELETED_FUNCTION = delete
#elif defined(__clang__)
    #if __has_feature(cxx_rvalue_references)
        #define ZMQ_HAS_RVALUE_REFS
    #endif

    #if __has_feature(cxx_deleted_functions)
        #define ZMQ_DELETED_FUNCTION = delete
    #else
        // Fallback: the macro is used unconditionally further down, so it
        // must still be defined (as empty) when "= delete" is unavailable.
        #define ZMQ_DELETED_FUNCTION
    #endif
#elif defined(_MSC_VER) && (_MSC_VER >= 1600)
    #define ZMQ_HAS_RVALUE_REFS
    #define ZMQ_DELETED_FUNCTION
#else
    #define ZMQ_DELETED_FUNCTION
#endif
55 
// In order to prevent unused variable warnings when building in non-debug
// mode use this macro to make assertions.
//
// With NDEBUG undefined it is a plain assert(). With NDEBUG defined the
// expression is still evaluated (so variables used only in the assertion
// count as used), and the result is explicitly discarded through a cast to
// void so the statement itself does not raise an "unused value" warning.
#ifndef NDEBUG
#   define ZMQ_ASSERT(expression) assert(expression)
#else
#   define ZMQ_ASSERT(expression) ((void)(expression))
#endif
63 
64 namespace zmq
65 {
66 
67  typedef zmq_free_fn free_fn;
68  typedef zmq_pollitem_t pollitem_t;
69 
70  class error_t : public std::exception
71  {
72  public:
73 
74  error_t () : errnum (zmq_errno ()) {}
75 
76  virtual const char *what () const throw ()
77  {
78  return zmq_strerror (errnum);
79  }
80 
81  int num () const
82  {
83  return errnum;
84  }
85 
86  private:
87 
88  int errnum;
89  };
90 
91  inline int poll (zmq_pollitem_t *items_, int nitems_, long timeout_ = -1)
92  {
93  int rc = zmq_poll (items_, nitems_, timeout_);
94  if (rc < 0)
95  throw error_t ();
96  return rc;
97  }
98 
99  inline void version (int *major_, int *minor_, int *patch_)
100  {
101  zmq_version (major_, minor_, patch_);
102  }
103 
104  class message_t
105  {
106  friend class socket_t;
107 
108  public:
109 
110  inline message_t ()
111  {
112  int rc = zmq_msg_init (&msg);
113  if (rc != 0)
114  throw error_t ();
115  }
116 
117  inline explicit message_t (size_t size_)
118  {
119  int rc = zmq_msg_init_size (&msg, size_);
120  if (rc != 0)
121  throw error_t ();
122  }
123 
124  inline message_t (void *data_, size_t size_, free_fn *ffn_,
125  void *hint_ = NULL)
126  {
127  int rc = zmq_msg_init_data (&msg, data_, size_, ffn_, hint_);
128  if (rc != 0)
129  throw error_t ();
130  }
131 
132 #ifdef ZMQ_HAS_RVALUE_REFS
133  inline message_t (message_t &&rhs) : msg (rhs.msg)
134  {
135  int rc = zmq_msg_init (&rhs.msg);
136  if (rc != 0)
137  throw error_t ();
138  }
139 
140  inline message_t &operator = (message_t &&rhs)
141  {
142  std::swap (msg, rhs.msg);
143  return *this;
144  }
145 #endif
146 
147  inline ~message_t ()
148  {
149  int rc = zmq_msg_close (&msg);
150  ZMQ_ASSERT (rc == 0);
151  }
152 
153  inline void rebuild ()
154  {
155  int rc = zmq_msg_close (&msg);
156  if (rc != 0)
157  throw error_t ();
158  rc = zmq_msg_init (&msg);
159  if (rc != 0)
160  throw error_t ();
161  }
162 
163  inline void rebuild (size_t size_)
164  {
165  int rc = zmq_msg_close (&msg);
166  if (rc != 0)
167  throw error_t ();
168  rc = zmq_msg_init_size (&msg, size_);
169  if (rc != 0)
170  throw error_t ();
171  }
172 
173  inline void rebuild (void *data_, size_t size_, free_fn *ffn_,
174  void *hint_ = NULL)
175  {
176  int rc = zmq_msg_close (&msg);
177  if (rc != 0)
178  throw error_t ();
179  rc = zmq_msg_init_data (&msg, data_, size_, ffn_, hint_);
180  if (rc != 0)
181  throw error_t ();
182  }
183 
184  inline void move (message_t *msg_)
185  {
186  int rc = zmq_msg_move (&msg, &(msg_->msg));
187  if (rc != 0)
188  throw error_t ();
189  }
190 
191  inline void copy (message_t *msg_)
192  {
193  int rc = zmq_msg_copy (&msg, &(msg_->msg));
194  if (rc != 0)
195  throw error_t ();
196  }
197 
198  inline void *data ()
199  {
200  return zmq_msg_data (&msg);
201  }
202 
203  inline const void* data () const
204  {
205  return zmq_msg_data (const_cast<zmq_msg_t*>(&msg));
206  }
207 
208  inline size_t size () const
209  {
210  return zmq_msg_size (const_cast<zmq_msg_t*>(&msg));
211  }
212 
213  private:
214 
215  // The underlying message
216  zmq_msg_t msg;
217 
218  // Disable implicit message copying, so that users won't use shared
219  // messages (less efficient) without being aware of the fact.
220  message_t (const message_t&);
221  void operator = (const message_t&);
222  };
223 
224  class context_t
225  {
226  friend class socket_t;
227 
228  public:
229 
230  inline explicit context_t (int io_threads_)
231  {
232  ptr = zmq_init (io_threads_);
233  if (ptr == NULL)
234  throw error_t ();
235  }
236 
237 #ifdef ZMQ_HAS_RVALUE_REFS
238  inline context_t (context_t &&rhs) : ptr (rhs.ptr)
239  {
240  rhs.ptr = NULL;
241  }
242  inline context_t &operator = (context_t &&rhs)
243  {
244  std::swap (ptr, rhs.ptr);
245  return *this;
246  }
247 #endif
248 
249  inline ~context_t ()
250  {
251  close();
252  }
253 
254  inline void close()
255  {
256  if (ptr == NULL)
257  return;
258  int rc = zmq_term (ptr);
259  ZMQ_ASSERT (rc == 0);
260  ptr = NULL;
261  }
262 
263  // Be careful with this, it's probably only useful for
264  // using the C api together with an existing C++ api.
265  // Normally you should never need to use this.
266  inline operator void* () const //bgv const added
267  {
268  return ptr;
269  }
270 
271  private:
272 
273  void *ptr;
274 
275  context_t (const context_t&);
276  void operator = (const context_t&);
277  };
278 
279  class socket_t
280  {
281  public:
282 
283  inline socket_t (context_t &context_, int type_)
284  {
285  ptr = zmq_socket (context_.ptr, type_);
286  if (ptr == NULL)
287  throw error_t ();
288  }
289 
290 #ifdef ZMQ_HAS_RVALUE_REFS
291  inline socket_t(socket_t&& rhs) : ptr(rhs.ptr)
292  {
293  rhs.ptr = NULL;
294  }
295  inline socket_t& operator=(socket_t&& rhs)
296  {
297  std::swap(ptr, rhs.ptr);
298  return *this;
299  }
300 #endif
301 
302  inline ~socket_t ()
303  {
304  close();
305  }
306 
307  inline operator void* () const //bgv const added
308  {
309  return ptr;
310  }
311 
312  inline void close()
313  {
314  if(ptr == NULL)
315  // already closed
316  return ;
317 
318  int lingerPeriod = 0;
319  int rc = zmq_setsockopt (ptr, ZMQ_LINGER, &lingerPeriod, sizeof(lingerPeriod)); //set linger period added by alexv
320  ZMQ_ASSERT (rc == 0);
321 
322  rc = zmq_close (ptr);
323  ZMQ_ASSERT (rc == 0);
324  ptr = 0 ;
325  }
326 
327  inline void setsockopt (int option_, const void *optval_,
328  size_t optvallen_)
329  {
330  int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_);
331  if (rc != 0)
332  throw error_t ();
333  }
334 
335  inline void getsockopt (int option_, void *optval_,
336  size_t *optvallen_)
337  {
338  int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_);
339  if (rc != 0)
340  throw error_t ();
341  }
342 
343  inline void bind (const char *addr_)
344  {
345  int rc = zmq_bind (ptr, addr_);
346  if (rc != 0)
347  throw error_t ();
348  }
349 
350  inline void connect (const char *addr_)
351  {
352  int rc = zmq_connect (ptr, addr_);
353  if (rc != 0)
354  throw error_t ();
355  }
356 
357  inline bool connected() const //bgv const added
358  {
359  return(ptr != NULL);
360  }
361 
362  inline size_t send (const void *buf_, size_t len_, int flags_ = 0)
363  {
364  int nbytes = 0;
365  do
366  {
367  nbytes = zmq_send (ptr, buf_, len_, flags_);
368  if (nbytes >= 0)
369  return (size_t) nbytes;
370  if (zmq_errno () == EAGAIN)
371  return 0;
372  }while ((nbytes == -1) && (zmq_errno() == EINTR));
373  throw error_t ();
374  }
375 
376  inline bool send (message_t &msg_, int flags_ = 0)
377  {
378  int nbytes = 0;
379  do
380  {
381  nbytes = zmq_msg_send (&(msg_.msg), ptr, flags_);
382  if (nbytes >= 0)
383  return true;
384  if (zmq_errno () == EAGAIN)
385  return false;
386  } while ((nbytes == -1) && (zmq_errno() == EINTR));
387  throw error_t ();
388  }
389 
390  inline size_t recv (void *buf_, size_t len_, int flags_ = 0)
391  {
392  int nbytes = zmq_recv (ptr, buf_, len_, flags_);
393  if (nbytes >= 0)
394  return (size_t) nbytes;
395  if (zmq_errno () == EAGAIN)
396  return 0;
397  throw error_t ();
398  }
399 
400  inline bool recv (message_t *msg_, int flags_ = 0)
401  {
402  int nbytes = zmq_msg_recv (&(msg_->msg), ptr, flags_);
403  if (nbytes >= 0)
404  return true;
405  if (zmq_errno () == EAGAIN)
406  return false;
407  throw error_t ();
408  }
409 
410  private:
411 
412  void *ptr;
413 
415  void operator = (const socket_t&) ZMQ_DELETED_FUNCTION;
416  };
417 
418 }
419 
420 #endif