hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
zmsg.hpp
Go to the documentation of this file.
1 /* =========================================================================
2  zmsg.hpp
3 
4  Multipart message class for example applications.
5 
6  Follows the ZFL class conventions and is further developed as the ZFL
7  zfl_msg class. See http://zfl.zeromq.org for more details.
8 
9  -------------------------------------------------------------------------
10  Copyright (c) 1991-2010 iMatix Corporation <www.imatix.com>
11  Copyright other contributors as noted in the AUTHORS file.
12 
13  This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
14 
15  This is free software; you can redistribute it and/or modify it under the
16  terms of the GNU Lesser General Public License as published by the Free
17  Software Foundation; either version 3 of the License, or (at your option)
18  any later version.
19 
20  This software is distributed in the hope that it will be useful, but
21  WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABIL-
22  ITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
23  Public License for more details.
24 
25  You should have received a copy of the GNU Lesser General Public License
26  along with this program. If not, see <http://www.gnu.org/licenses/>.
27  =========================================================================
28 
29  Andreas Hoelzlwimmer <andreas.hoelzlwimmer@fh-hagenberg.at>
30  */
31 
32 #ifndef __ZMSG_H_INCLUDED__
33 #define __ZMSG_H_INCLUDED__
34 
35 #include "zhelpers.hpp"
36 
37 #include <vector>
38 #include <string>
39 #include <stdarg.h>
40 
41 class zmsg {
42 public:
43  typedef std::basic_string<unsigned char> ustring;
44 
45  zmsg() {
46  }
47 
48  // --------------------------------------------------------------------------
49  // Constructor, sets initial body
50  zmsg(char const *body) {
51  body_set(body);
52  }
53 
54  // -------------------------------------------------------------------------
55  // Constructor, sets initial body and sends message to socket
56  zmsg(char const *body, zmq::socket_t &socket) {
57  body_set(body);
58  send(socket);
59  }
60 
61  // --------------------------------------------------------------------------
62  // Constructor, calls first receive automatically
63  zmsg(zmq::socket_t &socket) {
64  recv(socket);
65  }
66 
67  // --------------------------------------------------------------------------
68  // Copy Constructor, equivalent to zmsg_dup
69  zmsg(const zmsg &msg) {
70  m_part_data.resize(msg.m_part_data.size());
71  std::copy(msg.m_part_data.begin(), msg.m_part_data.end(), m_part_data.begin());
72  }
73 
74  virtual ~zmsg() {
75  clear();
76  }
77 
78  // --------------------------------------------------------------------------
79  // Erases all messages
80  void clear() {
81  m_part_data.clear();
82  }
83 
84  void set_part(size_t part_nbr, unsigned char *data) {
85  if (part_nbr < m_part_data.size()) {
86  m_part_data[part_nbr] = data;
87  }
88  }
89 
90  bool recv(zmq::socket_t & socket) {
91  clear();
92  while (1) {
93  zmq::message_t message(0);
94  try {
95  if (!socket.recv(&message, 0)) {
96  return false;
97  }
98  //} catch (zmq::error_t error) {
99  } catch (zmq::error_t& error) { //bgv fixed : catch by reference : 2013-04-02
100  std::cout << "E: " << error.what() << std::endl;
101  return false;
102  }
103  //ustring data = (unsigned char*) message.data();
104  ustring data((unsigned char*) message.data(), message.size()); //bgv fixed 2012-04-02
105  //std::cerr << "recv: \"" << (unsigned char*) message.data() << "\", size " << message.size() << std::endl;
106  if (message.size() == 17 && ((unsigned char *) message.data())[0] == 0) {
107  char *uuidstr = encode_uuid((unsigned char*) message.data());
108  push_back(uuidstr);
109  } else {
110  data[message.size()] = 0;
111  push_back((char *) data.c_str());
112  }
113  /*
114  if(m_part_data.size() == 1){
115  std::cout << "!!![" << m_part_data[0].size() << "][" << m_part_data[0].c_str() << "]!!!" << std::endl;
116  }
117  */
118  int64_t more = 0;
119  size_t more_size = sizeof(more);
120  try {
121  socket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
122  if (!more) {
123  break;
124  }
125  } catch(zmq::error_t& error) { // added by alexv 2014-11-10
126  std::cout << "E: " << error.what() << std::endl;
127  return false;
128  }
129  }
130  return true;
131  }
132 
133  void send(zmq::socket_t & socket) {
134  for (size_t part_nbr = 0; part_nbr < m_part_data.size(); part_nbr++) {
135  zmq::message_t message;
136  ustring data = m_part_data[part_nbr];
137  if (data.size() == 33 && data[0] == '@') {
138  unsigned char * uuidbin = decode_uuid((char *) data.c_str());
139  message.rebuild(17);
140  memcpy(message.data(), uuidbin, 17);
141  delete[] uuidbin; //bgv [] added because array deleted
142  } else {
143  message.rebuild(data.size());
144  memcpy(message.data(), data.c_str(), data.size());
145  }
146  try {
147  socket.send(message, part_nbr < m_part_data.size() - 1 ? ZMQ_SNDMORE : 0);
148  //} catch (zmq::error_t error) {
149  } catch (zmq::error_t& error) { //bgv fixed : catch by reference : 2013-04-02
150  assert(error.num() != 0);
151  }
152  }
153  clear();
154  }
155 
156  size_t parts() const { //bgv const added
157  return m_part_data.size();
158  }
159 
160  void body_set(const char *body) {
161  if (!m_part_data.empty()) {
162  m_part_data.erase(m_part_data.end() - 1);
163  }
164  push_back((char*) body);
165  }
166 
167  void body_fmt(const char *format, ...) {
168  char value[255 + 1];
169  va_list args;
170 
171  va_start(args, format);
172  vsnprintf(value, 255, format, args);
173  va_end(args);
174 
175  body_set(value);
176  }
177 
178  char * body() {
179  if (!m_part_data.empty())
180  return ((char *) m_part_data[m_part_data.size() - 1].c_str());
181  else
182  return 0;
183  }
184 
185  // zmsg_push
186  void push_front(char *part) {
187  m_part_data.insert(m_part_data.begin(), (unsigned char*) part);
188  }
189 
190  // zmsg_append
191  void push_back(char *part) {
192  m_part_data.push_back((unsigned char*) part);
193  }
194 
195  // --------------------------------------------------------------------------
196  // Formats 17-byte UUID as 33-char string starting with '@'
197  // Lets us print UUIDs as C strings and use them as addresses
198  //
199  static char *
200  encode_uuid(unsigned char *data) {
201  static char hex_char[] = "0123456789ABCDEF";
202 
203  assert(data[0] == 0);
204  char *uuidstr = new char[34];
205  uuidstr[0] = '@';
206  int byte_nbr;
207  for (byte_nbr = 0; byte_nbr < 16; byte_nbr++) {
208  uuidstr[byte_nbr * 2 + 1] = hex_char[data[byte_nbr + 1] >> 4];
209  uuidstr[byte_nbr * 2 + 2] = hex_char[data[byte_nbr + 1] & 15];
210  }
211  uuidstr[33] = 0;
212  return (uuidstr);
213  }
214 
215  // --------------------------------------------------------------------------
216  // Formats 17-byte UUID as 33-char string starting with '@'
217  // Lets us print UUIDs as C strings and use them as addresses
218  //
219  static unsigned char *
220  decode_uuid(char *uuidstr) {
221  static char hex_to_bin[128] = { -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, /* */
222  -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, /* */
223  -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, /* */
224  0, 1, 2, 3, 4, 5, 6, 7, 8, 9, -1, -1, -1, -1, -1, -1, /* 0..9 */
225  -1, 10, 11, 12, 13, 14, 15, -1, -1, -1, -1, -1, -1, -1, -1, -1, /* A..F */
226  -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, /* */
227  -1, 10, 11, 12, 13, 14, 15, -1, -1, -1, -1, -1, -1, -1, -1, -1, /* a..f */
228  -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1 }; /* */
229 
230  assert(strlen(uuidstr) == 33);
231  assert(uuidstr[0] == '@');
232  unsigned char *data = new unsigned char[17];
233  int byte_nbr;
234  data[0] = 0;
235  for (byte_nbr = 0; byte_nbr < 16; byte_nbr++)
236  data[byte_nbr + 1] = (hex_to_bin[uuidstr[byte_nbr * 2 + 1] & 127] << 4) + (hex_to_bin[uuidstr[byte_nbr * 2 + 2] & 127]);
237 
238  return (data);
239  }
240 
241  // zmsg_pop
243  if (m_part_data.empty()) {
244  return 0;
245  }
246  ustring part = m_part_data.front();
247  m_part_data.erase(m_part_data.begin());
248  return part;
249  }
250 
251  void append(const char *part) {
252  assert(part);
253  push_back((char*) part);
254  }
255 
256  char *address() {
257  if (!m_part_data.empty()) {
258  return (char*) m_part_data[0].c_str();
259  } else {
260  return 0;
261  }
262  }
263 
264  void wrap(const char *address, const char *delim) {
265  if (delim) {
266  push_front((char*) delim);
267  }
268  push_front((char*) address);
269  }
270 
271  char * unwrap() {
272  if (m_part_data.empty()) {
273  return NULL;
274  }
275  char *addr = (char*) pop_front().c_str();
276  if (address() && *address() == 0) {
277  pop_front();
278  }
279  return addr;
280  }
281 
282  unsigned int size() {
283  unsigned int size=0;
284 
285  for (unsigned int part_nbr = 0; part_nbr < m_part_data.size(); part_nbr++) {
286  size+=m_part_data[part_nbr].size();
287  }
288  return size;
289  }
290 
291  std::string dump(bool outToCErr=false) const { //bgv modified
292  std::stringstream ress;
293 
294  ress << "--------------------------------------" << std::endl;
295  for (unsigned int part_nbr = 0; part_nbr < m_part_data.size(); part_nbr++) {
296  ustring data = m_part_data[part_nbr];
297  // Dump the message as text or binary
298  int is_text = 1;
299  for (unsigned int char_nbr = 0; char_nbr < data.size(); char_nbr++)
300  if (data[char_nbr] < 32 || data[char_nbr] > 127)
301  is_text = 0;
302 
303  ress << "[" << std::setw(3) << std::setfill('0') << (int) data.size() << "] ";
304  for (unsigned int char_nbr = 0; char_nbr < data.size(); char_nbr++) {
305  if (is_text) {
306  ress << (char) data[char_nbr];
307  } else {
308  ress << std::hex << std::setw(2) << std::setfill('0') << (short int) data[char_nbr];
309  }
310  }
311  ress << std::endl;
312  }
313  ress << "--------------------------------------" << std::endl << std::endl;
314 
315  if (outToCErr){
316  std::cerr << ress.str();
317  }
318 
319  return ress.str();
320  }
321 
322  static int test(int verbose) {
323  zmq::context_t context(1);
324  zmq::socket_t output(context, ZMQ_DEALER);
325  try {
326  output.bind("ipc://zmsg_selftest.ipc");
327  //} catch (zmq::error_t error) {
328  } catch (zmq::error_t& error) { //bgv fixed : catch by reference : 2013-04-02
329  assert(error.num() != 0);
330  }
331  zmq::socket_t input(context, ZMQ_ROUTER);
332  try {
333  input.connect("ipc://zmsg_selftest.ipc");
334  //} catch (zmq::error_t error) {
335  } catch (zmq::error_t& error) { //bgv fixed : catch by reference : 2013-04-02
336  assert(error.num() != 0);
337  }
338 
339  zmsg zm;
340  zm.body_set((char *) "Hello");
341  assert(strcmp(zm.body(), "Hello") == 0);
342 
343  zm.send(output);
344  assert(zm.parts() == 0);
345 
346  zm.recv(input);
347  assert(zm.parts() == 2);
348  if (verbose) {
349  zm.dump();
350  }
351 
352  assert(strcmp(zm.body(), "Hello") == 0);
353 
354  zm.clear();
355  zm.body_set("Hello");
356  zm.wrap("address1", "");
357  zm.wrap("address2", 0);
358  assert(zm.parts() == 4);
359  zm.send(output);
360 
361  zm.recv(input);
362  if (verbose) {
363  zm.dump();
364  }
365  assert(zm.parts() == 5);
366  assert(strlen(zm.address()) == 33);
367  zm.unwrap();
368  assert(strcmp(zm.address(), "address2") == 0);
369  zm.body_fmt("%c%s", 'W', "orld");
370  zm.send(output);
371 
372  zm.recv(input);
373  zm.unwrap();
374  assert(zm.parts() == 4);
375  assert(strcmp(zm.body(), "World") == 0);
376  std::string part = zm.unwrap();
377  assert(part.compare("address2") == 0);
378 
379  // Pull off address 1, check that empty part was dropped
380  part = zm.unwrap();
381  assert(part.compare("address1") == 0);
382  assert(zm.parts() == 1);
383 
384  // Check that message body was correctly modified
385  part = (char*) zm.pop_front().c_str();
386  assert(part.compare("World") == 0);
387  assert(zm.parts() == 0);
388 
389  // Check append method
390  zm.append("Hello");
391  zm.append("World!");
392  assert(zm.parts() == 2);
393  assert(strcmp(zm.body(), "World!") == 0);
394 
395  zm.clear();
396  assert(zm.parts() == 0);
397 
398  std::cout << "OK" << std::endl;
399  return 0;
400  }
401 
402 private:
403  std::vector<ustring> m_part_data;
404 };
405 
406 #endif /* ZMSG_H_ */