hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
zhelpers.hpp
Go to the documentation of this file.
1 /* =========================================================================
2  zhelpers.h - ZeroMQ helpers for example applications
3 
4  Copyright (c) 1991-2010 iMatix Corporation and contributors
5 
6  This is free software; you can redistribute it and/or modify it under
7  the terms of the Lesser GNU General Public License as published by
8  the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  This software is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  Lesser GNU General Public License for more details.
15 
16  You should have received a copy of the Lesser GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18  =========================================================================
19 */
20 
21 // Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
22 
23 
24 #ifndef __ZHELPERS_HPP_INCLUDED__
25 #define __ZHELPERS_HPP_INCLUDED__
26 
27 // Include a bunch of headers that we will need in the examples
28 
29 #include "zmq.hpp"
30 
31 #include <iostream>
32 #include <iomanip>
33 #include <string>
34 #include <sstream>
35 
36 #include <sys/time.h>
37 #include <unistd.h>
38 #include <time.h>
39 #include <assert.h>
40 #include <pthread.h>
41 #include <stdlib.h> // random() RAND_MAX
42 #include <stdio.h>
43 #include <stdarg.h>
44 #include <signal.h>
45 
//  Windows/MSVC compatibility shim: supplies the integer type names the
//  rest of this header relies on. Only active when __WINDOWS__ is defined.
#if defined (__WINDOWS__)
    typedef __int64 int64_t;
    typedef unsigned int uint;
    typedef unsigned long ulong;
#endif
52 
//  Provide random number from 0..(num-1)
//
//  The scaling is done in double precision. With the usual RAND_MAX of
//  2^31-1 a float cannot hold random()'s largest results exactly, so float
//  arithmetic could round the quotient up and yield num itself, one past
//  the documented range. The whole expansion is parenthesized so that it
//  can be used safely inside larger expressions.
#define within(num) ((int) ((double) (num) * random () / (RAND_MAX + 1.0)))
55 
56 // Receive 0MQ string from socket and convert into string
57 static std::string
58 s_recv (zmq::socket_t & socket) {
59 
60  zmq::message_t message;
61  socket.recv(&message);
62 
63  return std::string(static_cast<char*>(message.data()), message.size());
64 }
65 
66 // Convert string to 0MQ string and send to socket
67 static bool
68 s_send (zmq::socket_t & socket, const std::string & string) {
69 
70  zmq::message_t message(string.size());
71  memcpy(message.data(), string.data(), string.size());
72 
73  bool rc = false;
74  try {
75  rc = socket.send(message);
76  } catch (zmq::error_t& error) { // added by alexv 2014-11-11
77  std::cout << "E: " << error.what() << std::endl;
78  }
79  return (rc);
80 }
81 
82 // Sends string as 0MQ string, as multipart non-terminal
83 static bool
84 s_sendmore (zmq::socket_t & socket, const std::string & string) {
85 
86  zmq::message_t message(string.size());
87  memcpy(message.data(), string.data(), string.size());
88 
89  bool rc = socket.send(message, ZMQ_SNDMORE);
90  return (rc);
91 }
92 
93 // Receives all message parts from socket, prints neatly
94 //
95 static void
96 s_dump (zmq::socket_t & socket)
97 {
98  std::cout << "----------------------------------------" << std::endl;
99 
100  while (1) {
101  // Process all parts of the message
102 
103  zmq::message_t message;
104  socket.recv(&message);
105 
106  // Dump the message as text or binary
107  int size = message.size();
108  std::string data(static_cast<char*>(message.data()), size);
109 
110  bool is_text = true;
111 
112  int char_nbr;
113  //unsigned char byte;
114  for (char_nbr = 0; char_nbr < size; char_nbr++) {
115  //byte = data [char_nbr];
116  //if (byte < 32 || byte > 127)
117  //bgv
118  if (((unsigned char) data [char_nbr]) < 32 || ((unsigned char) data [char_nbr]) > 127)
119  is_text = false;
120  }
121 
122  std::cout << "[" << std::setfill('0') << std::setw(3) << size << "]";
123 
124  for (char_nbr = 0; char_nbr < size; char_nbr++) {
125  if (is_text) {
126  std::cout << (char)data [char_nbr];
127  } else {
128  std::cout << std::setfill('0') << std::setw(2)
129  << std::hex << (unsigned int) data [char_nbr];
130  }
131  }
132  std::cout << std::endl;
133 
134  int64_t more; // Multipart detection
135  size_t more_size = sizeof (more);
136  socket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
137 
138  if (!more)
139  break; // Last message part
140  }
141 }
142 
143 // Set simple random printable identity on socket
144 //
145 inline std::string
147 {
148  std::stringstream ss;
149  ss << std::hex << std::uppercase
150  << std::setw(4) << std::setfill('0') << within (0x10000) << "-"
151  << std::setw(4) << std::setfill('0') << within (0x10000);
152  socket.setsockopt(ZMQ_IDENTITY, ss.str().c_str(), ss.str().length());
153  return ss.str();
154 }
155 
156 // Report 0MQ version number
157 //
158 static void
159 s_version (void)
160 {
161  int major, minor, patch;
162  zmq_version (&major, &minor, &patch);
163  std::cout << "Current 0MQ version is " << major << "." << minor << "." << patch << std::endl;
164 }
165 
166 static void
167 s_version_assert (int want_major, int want_minor)
168 {
169  int major, minor, patch;
170  zmq_version (&major, &minor, &patch);
171  if (major < want_major
172  || (major == want_major && minor < want_minor)) {
173  std::cout << "Current 0MQ version is " << major << "." << minor << std::endl;
174  std::cout << "Application needs at least " << want_major << "." << want_minor
175  << " - cannot continue" << std::endl;
176  exit (EXIT_FAILURE);
177  }
178 }
179 
//  Return current system clock as milliseconds
//
//  Both branches report wall-clock time as milliseconds since the Unix
//  epoch (1970-01-01 00:00:00 UTC), so differences between two calls are
//  meaningful across minute boundaries.
static int64_t
s_clock (void)
{
#if (defined (__WINDOWS__))
    //  FILETIME counts 100-nanosecond ticks since 1601-01-01 UTC.
    FILETIME ft;
    GetSystemTimeAsFileTime (&ft);
    unsigned __int64 ticks =
        ((unsigned __int64) ft.dwHighDateTime << 32) | ft.dwLowDateTime;
    //  10000 ticks per millisecond; 11644473600000 ms separate 1601
    //  from 1970.
    return (int64_t) (ticks / 10000) - 11644473600000LL;
#else
    struct timeval tv;
    gettimeofday (&tv, NULL);
    //  Widen the seconds to 64 bits before scaling so the multiplication
    //  cannot overflow where time_t is only 32 bits wide.
    return (int64_t) tv.tv_sec * 1000 + (int64_t) tv.tv_usec / 1000; //bgv
#endif
}
197 
//  Suspend the calling thread for msecs milliseconds. On Windows this is
//  a call to Sleep(); elsewhere the interval is split into whole seconds
//  and leftover nanoseconds and handed to nanosleep() once.
static void
s_sleep (int msecs)
{
#if (defined (__WINDOWS__))
    Sleep (msecs);
#else
    const int whole_seconds = msecs / 1000;
    const int leftover_msecs = msecs % 1000;

    struct timespec delay;
    delay.tv_nsec = leftover_msecs * 1000000;
    delay.tv_sec = whole_seconds;
    nanosleep (&delay, NULL);
#endif
}
211 
//  Print a timestamped line to stdout.
//
//  The output is the local time formatted as "yy-mm-dd HH:MM:SS ", then
//  the printf-style message built from format and the variadic arguments,
//  then a newline.
static void
s_console (const char *format, ...)
{
    time_t curtime = time (NULL);
    struct tm *loctime = localtime (&curtime);

    //  "yy-mm-dd HH:MM:SS " is 18 characters plus the terminating NUL, so
    //  a small stack buffer is enough; no heap allocation is needed.
    char formatted [20];

    //  localtime() may return NULL, and strftime() returns 0 when it could
    //  not produce the text; in either case skip the prefix instead of
    //  printing an indeterminate buffer.
    if (loctime != NULL
    &&  strftime (formatted, sizeof (formatted), "%y-%m-%d %H:%M:%S ", loctime) > 0)
        printf ("%s", formatted);

    va_list argptr;
    va_start (argptr, format);
    vprintf (format, argptr);
    va_end (argptr);
    printf ("\n");
}
228 
//  ---------------------------------------------------------------------
//  Signal handling
//
//  Call s_catch_signals() in your application at startup, and then exit
//  your main loop if s_interrupted is ever 1. Works especially well with
//  zmq_poll.

//  Set to 1 by s_signal_handler. Declared volatile sig_atomic_t because
//  that is the object type a signal handler may portably write to and
//  that the interrupted code is guaranteed to re-read.
static volatile sig_atomic_t s_interrupted = 0;

//  Signal handler: only records that a signal arrived. The signal number
//  is not used, so the parameter is left unnamed.
static void s_signal_handler (int /* signal_value */)
{
    s_interrupted = 1;
}
241 
242 static void s_catch_signals ()
243 {
244  struct sigaction action;
245  action.sa_handler = s_signal_handler;
246  action.sa_flags = 0;
247  sigemptyset (&action.sa_mask);
248  sigaction (SIGINT, &action, NULL);
249  sigaction (SIGTERM, &action, NULL);
250 }
251 
252 #endif