hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DataClientProxy.cpp
Go to the documentation of this file.
1 #include "DataClientProxy.hpp"
2 #include "ServiceMessages.hpp"
3 #include "DRCEResourceUsage.hpp"
5 
6 namespace HCE {
7  namespace handlers {
8 
9  DataClientProxy::DataClientProxy(std::string name, zmq::context_t& ctx, const std::string& clientIdentity, std::string connectionString, int64_t startedAt,
10  unsigned char logPriority, const std::string& iniFile) :
11  Handler(name, ctx, clientIdentity, connectionString, startedAt, logPriority, iniFile), DataNodeOptions(Handler::handlerProperties),
12  _shiftCounter(), _clientsQueue(){
13 
14  setIniFile(iniFile),
16  initialize();
17  }
18 
20  deinitialize();
21  }
22 
24  logger.log(NODE_LOG_MODE_INFORMATION) << "Initialize client [" << getClientIdentity() << "] with connection [" << getConnectionString() << "]" << flush;
25 
27  _inprocProxySock = nullptr;
28  _inprocReducerOutSock = nullptr;
29  _dataClientSock = nullptr;
31 
34 
36  setHeartbeatAt(s_clock() + getHeartbeatDelay());
38  setHeartbeatedAt(s_clock());
39 
42  setStatRequests(0);
43 
46 
49 
52 
55 
58 
59  return true;
60  }
61 
63  if(_inprocProxySock){
66  }
67  delete _inprocProxySock;
68  _inprocProxySock = NULL;
69  }
70 
74  }
75  delete _inprocReducerOutSock;
76  _inprocReducerOutSock = NULL;
77  }
78 
79  if(_dataClientSock){
84  }
85  delete _dataClientSock;
86  _dataClientSock = NULL;
87  }
88 
90  }
91 
92  void DataClientProxy::setHeartbeatAt(int64_t heartbeatAt){
94  }
95 
98  }
99 
100  void DataClientProxy::setHeartbeatedAt(int64_t heartbeatedAt){
102  }
103 
106  }
107 
108  void DataClientProxy::setStatRequests(unsigned int statRequests){
110  }
111 
114  }
115 
116  void DataClientProxy::setStatResponses(unsigned int statResponses){
118  }
119 
122  }
123 
126  }
127 
129 
130  std::string addr((msg.address())?msg.address():"");
131  if(addr.compare(0, NODE_MSG_HEARTBEAT.length(), NODE_MSG_HEARTBEAT)==0){
132  logger.log(NODE_LOG_MODE_DEBUG) << "RECEIVED HEARTBEAT from server" << flush;
133 
135  setHeartbeatedAt(s_clock());
136 
140  sendHeartbeat();
141  logger.log(NODE_LOG_MODE_TRACE) << "HEARBEAT_MODE_IMMEDIATE_AFTER_HB || HEARBEAT_MODE_MIXED" << flush;
142  }
143  }else{
144  logger.log(NODE_LOG_MODE_TRACE) << "ERROR: wrong control message received" << endl << msg.dump() << flush;
145  }
146  }
147 
149  unsigned int statRequests = getStatRequests();
150  setStatRequests(++statRequests);
151 
152  logger.log(NODE_LOG_MODE_TRACE) << "MSG FROM dataclient" << endl << msg.dump() << flush;
153  logger.log(NODE_LOG_MODE_DEBUG) << "RECEIVED REQUEST from upper server, n=" << getStatRequests() << flush;
154 
156  msg.send(*_inprocProxySock);
157 
158  logger.log(NODE_LOG_MODE_DEBUG) << "REQUEST SENT to inproc_proxy socket" << flush;
159 
162  setHeartbeatedAt(s_clock());
166  setHeartbeatAt(s_clock() + getHeartbeatDelay());
167  }
168  }
169 
172  zmsg clientMsg(*_dataClientSock);
173 
174  if(clientMsg.parts() == NODE_MSG_CMD_MIN_FIELDS){
176  processControlMessage(clientMsg);
177  }else{
179  processDataMessage(clientMsg);
180  }
181 
183  setHeartbeatedAt(s_clock());
184  }
185 
189 
190  if (msg.parts() == NODE_MSG_CMD_MIN_FIELDS)
191  {
192  char* addrp = msg.address();
193  std::string addr ((addrp)?addrp:"");
194  if (addr.compare(0, NODE_MSG_ROUTES.length(), NODE_MSG_ROUTES)==0)
195  {
196  s_send(*_dataClientSock, addr); // resend message to server
197  }
198  }
199  else
200  {
201  unsigned int statResponses = getStatResponses();
202  setStatResponses(++statResponses);
203 
204  logger.log(NODE_LOG_MODE_TRACE) << "MSG FROM inproc_reducer_out" << endl << msg.dump() << flush;
205  logger.log(NODE_LOG_MODE_DEBUG) << "RESPONSE SENT to dataclient socket from proxy node to upper server, n=" << getStatResponses() << flush;
206 
208  msg.send(*_dataClientSock);
209  }
210  }
211 
213 
215  zmsg msg(*_inprocProxySock);
216  logger.log(NODE_LOG_MODE_TRACE) << " MSG FROM inproc_proxy" << endl << msg.dump() << flush;
217 
218  std::string addr((msg.address())?msg.address():"");
219 
220  if (addr.compare(0, NODE_MSG_ROUTES.length(), NODE_MSG_ROUTES)==0)
221  {
222  RouteMessage routeMessage(_clientsQueue);
223  routeMessage.extract(NODE_MSG_ROUTES, addr);
224  if (routeMessage.isError())
225  logger.log(NODE_LOG_MODE_ERROR) << "Operation extract ROUTE MESSAGE has error: " << routeMessage.getErrorMsg() << flush;
226  else
227  {
228  logger.log(NODE_LOG_MODE_TRACE) << "Operation extract ROUTE MESSAGE was made SUCCESSFULLY" << flush;
229  }
230  }
231  }
232 
236  if(s_clock() > getHeartbeatAt() && _dataClientSockConnected){
237  sendHeartbeat();
238 
239  logger.log(NODE_LOG_MODE_DEBUG) << "SENT HEARTBIT from client [" << getClientIdentity() << "] to server" << flush;
240  }
244 
245  logger.log(NODE_LOG_MODE_TRACE) << "DataClientProxy::heartbit HEARBEAT_MODE_ADAPTIVE delta = " << (s_clock()-getHeartbeatedAt()+getHeartbeatDelay())
246  << " _shiftCounter.getShift()= " << _shiftCounter.getShift() << flush;
247 
248  size_t delta = s_clock()-getHeartbeatAt()+getHeartbeatDelay();
250 
251  sendHeartbeat();
253 
254  logger.log(NODE_LOG_MODE_DEBUG) << "SENT HEARTBIT from client [" << getClientIdentity() << "] to server getHeartbeatAt() = '"
255  << Poco::DateTimeFormatter::format(Poco::LocalDateTime(Poco::Timestamp(Poco::Timestamp::fromEpochTime(getHeartbeatAt()/1000))), "%Y-%m-%d %H:%M:%S") << "'" << flush;
256  }
258  setHeartbeatedAt(s_clock());
259  }
260  }
261 
266  HCE::drce::ResourceUsageConverter resourceUsageConverter(resourceUsage);
268  HCE::handlers::HeartbeatMessage heartbeatMessage(_clientsQueue, resourceUsageConverter.toString());
269  std::string message = heartbeatMessage.build(NODE_MSG_HEARTBEAT);
270 
271  logger.log(NODE_LOG_MODE_TRACE) << "SENT HEARTBIT MESSAGE: " << message << flush;
273  s_send(*_dataClientSock, message);
274 
276  setHeartbeatAt(s_clock() + getHeartbeatDelay());
277  }
278 
280 
282 
284  logger.log(NODE_LOG_MODE_INFORMATION) << "RECONNECT client [" << getClientIdentity() << "] because HEARTBIT not received during time of "
285  << (getHeartbeatTimeout()+_shiftCounter.getShift()) << " msec" << flush;
286 
289 
291  setHeartbeatedAt(s_clock());
292 
294  setHeartbeatAt(s_clock() + getHeartbeatDelay());
295  }
296  }else{ // HEARBEAT_MODE_ADAPTIVE
297 
299 
300  logger.log(NODE_LOG_MODE_TRACE) << "DataClientProxy::externalConnectionRefresh (s_clock()-getHeartbeatedAt())= " << (s_clock()-getHeartbeatedAt()) <<
301  " getHeartbeatTimeout()= " << getHeartbeatTimeout() << " _shiftCounter.getShift()= " << _shiftCounter.getShift() << flush;
302 
304  logger.log(NODE_LOG_MODE_INFORMATION) << "RECONNECT client [" << getClientIdentity() << "] because HEARTBIT not received during time of "
305  << (getHeartbeatTimeout()+_shiftCounter.getShift()) << " msec" << flush;
306 
309 
310  //Reset time mark
311  setHeartbeatedAt(s_clock());
312 
314  setHeartbeatAt(s_clock() + getHeartbeatDelay());
315  }
316  }
317  }
318 
321  std::stringstream result;
322  result << NODE_ADMIN_ERROR_OK
324  << NODE_ADMIN_COMMAND_DELIMITER << "responses=" << getStatResponses()
325  << NODE_ADMIN_COMMAND_DELIMITER << "clientIdentity=" << getClientIdentity()
326  << NODE_ADMIN_COMMAND_DELIMITER << "heartbeatAt=" << static_cast<uint64_t>(getHeartbeatAt())
327  << NODE_ADMIN_COMMAND_DELIMITER << "heartbeatedAt=" << static_cast<uint64_t>(getHeartbeatedAt());
328  return result.str();
329  }
330 
331  std::string DataClientProxy::processAdminCommands(std::string& command, std::string& parameters){
332  std::string ret = NODE_ADMIN_ERROR_OK;
333 
334  logger.log(NODE_LOG_MODE_DEBUG) << "Admin command requested [" << command << "], parameters [" << parameters << "]" << flush;
335 
337  if(command == NODE_MSG_STAT){
338  std::stringstream result;
339  result << NODE_ADMIN_ERROR_OK << NODE_ADMIN_COMMAND_DELIMITER << "requests=" << getStatRequests() << ",responses=" << getStatResponses();
340  ret = result.str();
341  }else if(command == NODE_MSG_REBUILD_CLIENT_CONNECTION){
344  }else if(command == NODE_MSG_DISCONNECT_CLIENT_CONNECTION){
347  _dataClientSockConnected = false;
348  }else if(command == NODE_MSG_UPDATE_SCHEMA){
349  setConnectionString(parameters);
350  }else if(command == NODE_MSG_PROPERTIES){
352  ret = getPropertyInformation();
353  }else if(command == NODE_MSG_HEARTBEAT_DELAY_GET){
355  }else if(command == NODE_MSG_HEARTBEAT_DELAY_SET){
357  unsigned int heartbeatDelay = HEARTBEAT_DELAY;
359  std::istringstream(parameters) >> heartbeatDelay;
362  setHeartbeatDelay(heartbeatDelay*1000);
363  }else if(command == NODE_MSG_HEARTBEAT_TIMEOUT_GET){
365  }else if(command == NODE_MSG_HEARTBEAT_TIMEOUT_SET){
367  int hbTimeout = HEARTBEAT_TIMEOUT;
369  std::istringstream(parameters) >> hbTimeout;
372  setHeartbeatTimeout(static_cast<unsigned int>(hbTimeout*1000));
373  }else if(command == NODE_MSG_HEARTBEAT_MODE_GET){
375  }else if(command == NODE_MSG_HEARTBEAT_MODE_SET){
377  int hbMode = HEARBEAT_MODE_DEFAULT;
379  std::istringstream(parameters) >> hbMode;
382  setHeartbeatMode(static_cast<unsigned int>(hbMode));
383  }else{
386  }
387 
388  return ret;
389  }
390 
393 
394  while(getInProgress()){
395  try {
397  std::vector<zmq::pollitem_t> items;
399  items ={{*_dataClientSock, 0, ZMQ_POLLIN, 0},{*_inprocReducerOutSock, 0, ZMQ_POLLIN, 0},{*_inprocAdminSock, 0, ZMQ_POLLIN, 0},{*_inprocProxySock, 0, ZMQ_POLLIN, 0}}; //add new socket
400  } else{
401  items ={{*_inprocReducerOutSock, 0, ZMQ_POLLIN, 0},{*_inprocAdminSock, 0, ZMQ_POLLIN, 0},{*_inprocProxySock, 0, ZMQ_POLLIN, 0}}; //add new socket inproc_proxy
402  }
403 
405  zmq::poll(&items[0], items.size(), getPollTimeout());
406 
409  if(items[0].revents & ZMQ_POLLIN){
411  }
412  }
413 
415  if(items[items.size() - 3].revents & ZMQ_POLLIN){
417  }
418 
420  if(items[items.size() - 2].revents & ZMQ_POLLIN){
422  }
423 
425  if(items[items.size() - 1].revents & ZMQ_POLLIN){
427  }
428 
430  heartbit();
431 
434 
437 
440  }catch(zmq::error_t& error){
441  if (EINTR!=error.num()){
442  std::cout << "DataClientProxy E: " << error.what() << std::endl;
443  }
444  }catch(std::exception& e){
445  std::cout << "DataClientProxy E: " << e.what() << std::endl;
446  }catch(...){
447  std::cout << "DataClientProxy E: Unknown exception" << std::endl;
448  }
449  }
451 
452  return NULL;
453  }
454 
455  }
456 }