hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DataClientData.cpp
Go to the documentation of this file.
1 #include "DataClientData.hpp"
3 #include "ServiceMessages.hpp"
4 
5 namespace HCE {
6  namespace handlers {
7 
// Constructs the data-client handler.
// name, ctx, clientIdentity, connectionString, startedAt, logPriority and iniFile are forwarded to the
// Handler base; DataNodeOptions is built from Handler::handlerProperties; _shiftCounter is value-initialized.
// The body then calls setIniFile(iniFile), setDataClientSockMode(dataClientMode) and initialize().
// NOTE(review): listing line 15 (between setDataClientSockMode and initialize) is absent from this
// extract - check the original file for a statement or comment at that position.
8  DataClientData::DataClientData(std::string name, zmq::context_t& ctx, const std::string& clientIdentity, std::string connectionString, unsigned char dataClientMode, int64_t startedAt,
9  unsigned char logPriority, const std::string& iniFile) :
10  Handler(name, ctx, clientIdentity, connectionString, startedAt, logPriority, iniFile), DataNodeOptions(Handler::handlerProperties),
11  _shiftCounter(){
12 
13  setIniFile(iniFile);
14  setDataClientSockMode(dataClientMode);
16  initialize();
17  }
18 
20  deinitialize();
21  }
22 
24  logger.log(NODE_LOG_MODE_INFORMATION) << "Initialize client [" << getClientIdentity() << "] with connection [" << getConnectionString() << "]" << flush;
25 
27  _inprocDataSock = NULL;
28  _dataClientSock = NULL;
30 
33 
35  setHeartbeatAt(s_clock() + getHeartbeatDelay());
37  setHeartbeatedAt(s_clock());
38 
40  setPstatsAt(s_clock() + STAT_INTERVAL);
41 
43  checkInit();
44 
47 
50 
53 
54  return true;
55  }
56 
58  if(_inprocDataSock){
59  delete _inprocDataSock;
60  }
61  _inprocDataSock = NULL;
62 
63  if(_dataClientSock){
66  delete _dataClientSock;
67  }
68  _dataClientSock = NULL;
69 
71  }
72 
// Setter taking the data-client socket mode as an unsigned char.
// NOTE(review): the body (listing line 74) is missing from this extract; presumably it stores
// dataClientSockMode in a member - confirm against the original source.
73  void DataClientData::setDataClientSockMode(unsigned char dataClientSockMode){
75  }
76 
79  }
80 
// Setter taking the requests counter as an unsigned int.
// NOTE(review): the body (listing line 82) is missing from this extract; presumably it stores
// statRequests in a member - confirm against the original source.
81  void DataClientData::setStatRequests(unsigned int statRequests){
83  }
84 
// Getter returning the requests counter as an unsigned int.
// NOTE(review): the return statement (listing line 86) is missing from this extract; presumably it
// returns the value written by setStatRequests - confirm against the original source.
85  unsigned int DataClientData::getStatRequests(void){
87  }
88 
// Setter taking the responses counter as an unsigned int.
// NOTE(review): the body (listing line 90) is missing from this extract; presumably it stores
// statResponses in a member - confirm against the original source.
89  void DataClientData::setStatResponses(unsigned int statResponses){
91  }
92 
95  }
96 
// Setter taking the accumulated requests size as an unsigned int.
// NOTE(review): the body (listing line 98) is missing from this extract; presumably it stores
// statRequestsSize in a member - confirm against the original source (units are not visible here).
97  void DataClientData::setStatRequestsSize(unsigned int statRequestsSize){
99  }
100 
103  }
104 
// Setter taking the accumulated responses size as an unsigned int.
// NOTE(review): the body (listing line 106) is missing from this extract; presumably it stores
// statResponsesSize in a member - confirm against the original source (units are not visible here).
105  void DataClientData::setStatResponsesSize(unsigned int statResponsesSize){
107  }
108 
111  }
112 
// Setter taking the processed-requests counter as an unsigned int.
// NOTE(review): the body (listing line 114) is missing from this extract; presumably it stores
// statRequestsProcessed in a member - confirm against the original source.
113  void DataClientData::setStatRequestsProcessed(unsigned int statRequestsProcessed){
115  }
116 
119  }
120 
// Setter taking the processed-responses counter as an unsigned int.
// NOTE(review): the body (listing line 122) is missing from this extract; presumably it stores
// statResponsesProcessed in a member - confirm against the original source.
121  void DataClientData::setStatResponsesProcessed(unsigned int statResponsesProcessed){
123  }
124 
127  }
128 
// Setter taking the processed-requests size as an unsigned int.
// NOTE(review): the body (listing line 130) is missing from this extract; presumably it stores
// statRequestsProcessedSize in a member - confirm against the original source.
129  void DataClientData::setStatRequestsProcessedSize(unsigned int statRequestsProcessedSize){
131  }
132 
135  }
136 
// Setter taking the processed-responses size as an unsigned int.
// NOTE(review): the body (listing line 138) is missing from this extract; presumably it stores
// statResponsesProcessedSize in a member - confirm against the original source.
137  void DataClientData::setStatResponsesProcessedSize(unsigned int statResponsesProcessedSize){
139  }
140 
143  }
144 
// Setter taking the QPS statistic as a float.
// NOTE(review): the body (listing line 146) is missing from this extract; presumably it stores
// statQPS in a member - confirm against the original source.
145  void DataClientData::setStatQPS(float statQPS){
147  }
148 
151  }
152 
// Setter taking the RPS statistic as a float.
// NOTE(review): the body (listing line 154) is missing from this extract; presumably it stores
// statRPS in a member - confirm against the original source.
153  void DataClientData::setStatRPS(float statRPS){
155  }
156 
159  }
160 
// Setter taking the requests bandwidth statistic as a float.
// NOTE(review): the body (listing line 162) is missing from this extract; presumably it stores
// statRequestsBandwidth in a member - confirm against the original source (units are not visible here).
161  void DataClientData::setStatRequestsBandwidth(float statRequestsBandwidth){
163  }
164 
167  }
168 
// Setter taking the responses bandwidth statistic as a float.
// NOTE(review): the body (listing line 170) is missing from this extract; presumably it stores
// statResponsesBandwidth in a member - confirm against the original source (units are not visible here).
169  void DataClientData::setStatResponsesBandwidth(float statResponsesBandwidth){
171  }
172 
175  }
176 
// Setter taking the next statistics timestamp as an int64_t.
// Elsewhere in this listing it is called with s_clock() + STAT_INTERVAL.
// NOTE(review): the body (listing line 178) is missing from this extract; presumably it stores
// pstatsAt in a member - confirm against the original source.
177  void DataClientData::setPstatsAt(int64_t pstatsAt){
179  }
180 
183  }
184 
// Setter taking the next heartbeat timestamp as an int64_t.
// Elsewhere in this listing it is called with s_clock() + getHeartbeatDelay().
// NOTE(review): the body (listing line 186) is missing from this extract; presumably it stores
// heartbeatAt in a member - confirm against the original source.
185  void DataClientData::setHeartbeatAt(int64_t heartbeatAt){
187  }
188 
191  }
192 
// Setter taking the last-heartbeat timestamp as an int64_t.
// Elsewhere in this listing it is called with s_clock().
// NOTE(review): the body (listing line 194) is missing from this extract; presumably it stores
// heartbeatedAt in a member - confirm against the original source.
193  void DataClientData::setHeartbeatedAt(int64_t heartbeatedAt){
195  }
196 
199  }
200 
204  setStatRequests(0);
205 
207  setStatResponses(0);
208 
211 
214 
217 
220 
223 
226 
228  setStatQPS(0);
229 
231  setStatRPS(0);
232 
235 
238  }
239 
241  const std::string connectionString = getConnectionString();
242  if(connectionString != "" && connectionString != "0"){
244  }else{
245  logger.log(NODE_LOG_MODE_INFORMATION) << "Client connection was not rebuilt due connection string empty value [" << connectionString << "]" << flush;
246  }
247  }
248 
251  zmsg clientMsg(*_dataClientSock);
252 
253  std::string addr((clientMsg.address())?clientMsg.address():"");
254 
255  logger.log(NODE_LOG_MODE_TRACE) << "DataClientData::handleExternalMessage addr: " << addr << flush;
256 
258  if(clientMsg.parts() == NODE_MSG_CMD_MIN_FIELDS){
260  if(addr.compare(0, NODE_MSG_HEARTBEAT.length(), NODE_MSG_HEARTBEAT)==0){
261  logger.log(NODE_LOG_MODE_DEBUG) << "HEARTBEAT message from server" << flush;
262 
264  setHeartbeatedAt(s_clock());
265 
269  sendHeartbeat();
270 
271  logger.log(NODE_LOG_MODE_TRACE) << "HEARBEAT_MODE_IMMEDIATE_AFTER_HB || HEARBEAT_MODE_MIXED" << flush;
272  }
273 
274  }else{
275  logger.log(NODE_LOG_MODE_TRACE) << "ERROR control message" << endl << clientMsg.dump() << flush;
276  }
277  }else{
279  logger.log(NODE_LOG_MODE_TRACE) << "MSG FROM dataclient" << endl << clientMsg.dump() << flush;
280 
282  std::string clientIdentity((const char*)clientMsg.pop_front().c_str());
283  std::string messageId((const char*)clientMsg.pop_front().c_str());
284  std::string messageBody((const char*)clientMsg.pop_front().c_str());
285  std::string messageRoute((clientMsg.parts())?(const char*)clientMsg.pop_front().c_str():"");
286 
287  logger.log(NODE_LOG_MODE_TRACE) << "DataClientData::handleExternalMessage Recieved body: \"" << messageBody << "\"" << flush;
288  logger.log(NODE_LOG_MODE_TRACE) << "DataClientData::handleExternalMessage Recieved route: \"" << messageRoute << "\"" << flush;
289 
290  unsigned int statRequests = getStatRequests();
291  setStatRequests(++statRequests);
293 
294  logger.log(NODE_LOG_MODE_DEBUG) << "RECEIVED REQUEST from upper server [" << clientIdentity << "], message [" << messageId << "] body " << messageBody.size()
295  << " bytes n=" << getStatRequests() << flush;
296 
298  zmsg msg(messageId.c_str());
300  msg.append(messageBody.c_str());
302  msg.append(messageRoute.c_str());
303 
304  if(getDataClientSockMode() == 0){
306  msg.send(*_dataClientSock);
307  unsigned int statResponses = getStatResponses();
308  setStatResponses(++statResponses);
310 
311  logger.log(NODE_LOG_MODE_TRACE) << " MSG TO dataclient" << endl << msg.dump() << flush;
312  logger.log(NODE_LOG_MODE_DEBUG) << " RESPONSE to dataclient socket from end-point node to upper server [" << clientIdentity << "] sent, n=" << getStatResponses() << flush;
313 
314  }else{
316  msg.send(*_inprocDataSock);
317  logger.log(NODE_LOG_MODE_DEBUG) << "REQUEST SENT to inproc_data socket" << flush;
318  }
319 
321  setHeartbeatedAt(s_clock());
322 
324 
328  setHeartbeatAt(s_clock() + getHeartbeatDelay());
329  }
330  }
331  }
332 
335  zmsg msg(*_inprocDataSock);
336 
337  logger.log(NODE_LOG_MODE_TRACE) << "MSG FROM inproc_data" << endl << msg.dump() << flush;
338 
340  msg.send(*_dataClientSock);
341 
342  unsigned int statResponses = getStatResponses();
343  setStatResponses(++statResponses);
345 
346  logger.log(NODE_LOG_MODE_DEBUG) << "RESPONSE to dataclient socket from data node to upper server sent, n=" << getStatResponses() << flush;
347 
349  setHeartbeatAt(s_clock() + getHeartbeatDelay());
350  }
351 
355  if((s_clock() > getHeartbeatAt()) && _dataClientSockConnected){
356 
358  sendHeartbeat();
359 
360  logger.log(NODE_LOG_MODE_TRACE) << "HEARBEAT_MODE_FIXED_TIME || HEARBEAT_MODE_MIXED" << flush;
361  logger.log(NODE_LOG_MODE_DEBUG) << "SENT HEARTBIT from client [" << getClientIdentity() << "] to server" << flush;
362  }
366 
367  logger.log(NODE_LOG_MODE_TRACE) << "DataClientData::heartbit HEARBEAT_MODE_ADAPTIVE delta = " << (s_clock()- getHeartbeatAt()+getHeartbeatDelay())
368  << " _shiftCounter.getShift()= " << _shiftCounter.getShift() << flush;
369 
370  size_t delta = s_clock()-getHeartbeatAt()+getHeartbeatDelay();
373  sendHeartbeat();
376 
377  logger.log(NODE_LOG_MODE_DEBUG) << "SENT HEARTBIT from client [" << getClientIdentity() << "] to server getHeartbeatAt() = '"
378  << Poco::DateTimeFormatter::format(Poco::LocalDateTime(Poco::Timestamp(Poco::Timestamp::fromEpochTime(getHeartbeatAt()/1000))), "%Y-%m-%d %H:%M:%S") << "'" << flush;
379  }
381  setHeartbeatedAt(s_clock());
382  }
383  }
384 
387 // zmsg msg(NODE_MSG_HEARTBEAT.c_str());
388 // msg.append(getResourceUsageMessage().c_str());
389 // msg.send(*_dataClientSock);
390 
391  std::vector<ClientWorkerItem> queue;
392  HCE::handlers::HeartbeatMessage heartbeatMessage(queue, getResourceUsageMessage());
393  std::string message = heartbeatMessage.build(NODE_MSG_HEARTBEAT);
394  s_send(*_dataClientSock, message);
395 
396  logger.log(NODE_LOG_MODE_TRACE) << "SENT HEARTBIT MESSAGE: " << message << flush;
397 
399  setHeartbeatAt(s_clock() + getHeartbeatDelay());
400  }
401 
406  HCE::drce::ResourceUsageConverter resourceUsageConverter(resourceUsage);
407  return resourceUsageConverter.toString();
408  }
409 
411  if(s_clock() > getPstatsAt()){
412  setStatQPS((getStatRequests() - getStatRequestsProcessed()) / (float)((STAT_INTERVAL) / 1000.0));
413  setStatRPS(((getStatResponses() - getStatResponsesProcessed()) / (float)((STAT_INTERVAL) / 1000.0)));
416 
421 
422  logger.log(NODE_LOG_MODE_DEBUG) << "requests=" << getStatRequests() << ", responses=" << getStatResponses() << ", qps=" << getStatQPS() << ", rps=" << getStatRPS()
423  << ", requestsSize=" << getStatRequestsSize() << ", responsesSize=" << getStatResponsesSize() << ", requestsBandwidth="
424  << getStatRequestsBandwidth() << ", responsesBandwidth=" << getStatResponsesBandwidth() << flush;
425 
427  setPstatsAt(s_clock() + STAT_INTERVAL);
428  }
429  }
430 
432  std::stringstream result;
433  result << NODE_ADMIN_ERROR_OK
435  << NODE_ADMIN_COMMAND_DELIMITER << "responses=" << getStatResponses()
436  << NODE_ADMIN_COMMAND_DELIMITER << "requestsSize=" << getStatRequestsSize()
437  << NODE_ADMIN_COMMAND_DELIMITER << "responsesSize=" << getStatResponsesSize()
438  << NODE_ADMIN_COMMAND_DELIMITER << "clientIdentity=" << getClientIdentity()
439  << NODE_ADMIN_COMMAND_DELIMITER << "dataClientMode=" << static_cast<int>(getDataClientSockMode())
440  << NODE_ADMIN_COMMAND_DELIMITER << "pstatsAt=" << static_cast<uint64_t>(getPstatsAt())
441  << NODE_ADMIN_COMMAND_DELIMITER << "heartbeatAt=" << static_cast<uint64_t>(getHeartbeatAt())
442  << NODE_ADMIN_COMMAND_DELIMITER << "heartbeatedAt=" << static_cast<uint64_t>(getHeartbeatedAt())
443  << NODE_ADMIN_COMMAND_DELIMITER << "statQPS=" << getStatQPS()
444  << NODE_ADMIN_COMMAND_DELIMITER << "statRPS=" << getStatRPS()
445  << NODE_ADMIN_COMMAND_DELIMITER << "statRequestsBandwidth=" << getStatRequestsBandwidth()
446  << NODE_ADMIN_COMMAND_DELIMITER << "statResponsesBandwidth=" << getStatResponsesBandwidth();
447  return result.str();
448  }
449 
451 
453 
455  logger.log(NODE_LOG_MODE_INFORMATION) << "RECONNECT client [" << getClientIdentity() << "] because HEARTBIT not received during time of "
456  << (getHeartbeatTimeout()+_shiftCounter.getShift()) << " msec" << flush;
457 
460 
462  setHeartbeatedAt(s_clock());
463 
465  setHeartbeatAt(s_clock() + getHeartbeatDelay());
466  }
467  }else{ // HEARBEAT_MODE_ADAPTIVE
468 
470 
471  logger.log(NODE_LOG_MODE_TRACE) << "DataClientData::externalConnectionRefresh (s_clock()-getHeartbeatedAt())= " << (s_clock()-getHeartbeatedAt())
472  << " getHeartbeatTimeout()= " << getHeartbeatTimeout() << " _shiftCounter.getShift()= " << _shiftCounter.getShift() << flush;
473 
475  logger.log(NODE_LOG_MODE_INFORMATION) << "RECONNECT client [" << getClientIdentity() << "] because HEARTBIT not received during time of "
476  << (getHeartbeatTimeout()+_shiftCounter.getShift()) << " msec" << flush;
477 
480 
482  setHeartbeatedAt(s_clock());
483 
485 
487  setHeartbeatAt(s_clock() + getHeartbeatDelay());
488  }
489  }
490  }
491 
// Handles one admin command: compares `command` against the NODE_MSG_* constants in an if/else-if
// chain and returns the reply string.
//   command    - admin command name.
//   parameters - command argument text; parsed as a number by the *_SET branches and passed as-is
//                to setConnectionString() by the UPDATE_SCHEMA branch.
//   returns    - `ret`, which starts as NODE_ADMIN_ERROR_OK and, in the lines visible here, is
//                reassigned only by the STAT and PROPERTIES branches.
// NOTE(review): many listing lines inside this function (e.g. 497, 504-505, 507-508, 516, 526, 536,
// 546-547) are absent from this extract, so branches that look empty below (REBUILD, the *_GET
// commands, the final else) may contain statements in the original file - confirm before relying on this.
492  std::string DataClientData::processAdminCommands(std::string& command, std::string& parameters){
493  std::string ret = NODE_ADMIN_ERROR_OK;
494 
495  logger.log(NODE_LOG_MODE_DEBUG) << "Admin command requested [" << command << "], parameters [" << parameters << "]" << flush;
496 
498  if(command == NODE_MSG_STAT){
// STAT reply: OK code, delimiter, then the request/response counters and their sizes.
499  std::stringstream result;
500  result << NODE_ADMIN_ERROR_OK << NODE_ADMIN_COMMAND_DELIMITER << "requests=" << getStatRequests() << ",responses=" << getStatResponses()
501  << ",requests_size=" << getStatRequestsSize() << ",responses_size=" << getStatResponsesSize();
502  ret = result.str();
503  }else if(command == NODE_MSG_REBUILD_CLIENT_CONNECTION){
506  }else if(command == NODE_MSG_DISCONNECT_CLIENT_CONNECTION){
// Only the flag reset is visible for this branch; any socket handling would be on the missing lines.
509  _dataClientSockConnected = false;
510  }else if(command == NODE_MSG_UPDATE_SCHEMA){
511  setConnectionString(parameters);
512  }else if(command == NODE_MSG_PROPERTIES){
514  ret = getPropertyInformation();
515  }else if(command == NODE_MSG_HEARTBEAT_DELAY_GET){
517  }else if(command == NODE_MSG_HEARTBEAT_DELAY_SET){
// The value is initialized to HEARTBEAT_DELAY, then read from `parameters`, then multiplied by 1000.
// NOTE(review): no stream-state check is visible after the extraction, and the *1000 presumably
// converts seconds to milliseconds - confirm both against the original source.
519  unsigned int heartbeatDelay = HEARTBEAT_DELAY;
521  std::istringstream(parameters) >> heartbeatDelay;
524  setHeartbeatDelay(heartbeatDelay*1000);
525  }else if(command == NODE_MSG_HEARTBEAT_TIMEOUT_GET){
527  }else if(command == NODE_MSG_HEARTBEAT_TIMEOUT_SET){
// Same pattern as the delay setter: initialize to HEARTBEAT_TIMEOUT, parse, multiply by 1000.
529  unsigned int heartbeatTimeout = HEARTBEAT_TIMEOUT;
531  std::istringstream(parameters) >> heartbeatTimeout;
534  setHeartbeatTimeout(heartbeatTimeout*1000);
535  }else if(command == NODE_MSG_HEARTBEAT_MODE_GET){
537  }else if(command == NODE_MSG_HEARTBEAT_MODE_SET){
// The mode is parsed as a signed int and cast to unsigned int before being applied.
539  int hbMode = HEARBEAT_MODE_DEFAULT;
541  std::istringstream(parameters) >> hbMode;
544  setHeartbeatMode(static_cast<unsigned int>(hbMode));
545  }else{
548  }
549 
550  return ret;
551  }
552 
555 
556  long delay = getPollTimeout();
557 
558  while(getInProgress()){
559  try{
561  std::vector<zmq::pollitem_t> items;
563  items ={{*_dataClientSock, 0, ZMQ_POLLIN, 0},{*_inprocDataSock, 0, ZMQ_POLLIN, 0},{*_inprocAdminSock, 0, ZMQ_POLLIN, 0}};
564  } else{
565  items ={{*_inprocDataSock, 0, ZMQ_POLLIN, 0},{*_inprocAdminSock, 0, ZMQ_POLLIN, 0}};
566  }
567 
569  if (delay){
570  delay = getPollTimeout();
571  }
572 
574  zmq::poll(&items[0], items.size(), delay);
575 
578  if(items[0].revents & ZMQ_POLLIN){
580  }
581  }
582 
584  if(items[items.size() - 2].revents & ZMQ_POLLIN){
586  }
587 
589  if(items[items.size() - 1].revents & ZMQ_POLLIN){
591  if(getInProgress() == false){
592  delay = 0;
593  setInProgress(true);
594  continue;
595  }
596  }
597 
598  if(getInProgress() == true && delay == 0){
599  setInProgress(false);
600  continue;
601  }
602 
604  heartbit();
605 
608 
611 
614 
617 
618  }catch(zmq::error_t& error){
619  if (EINTR!=error.num()){
620  std::cout << "DataClientData E: " << error.what() << std::endl;
621  }
622  }catch(std::exception& e){
623  std::cout << "DataClientData E: " << e.what() << std::endl;
624  }catch(...){
625  std::cout << "DataClientData E: Unknown exception" << std::endl;
626  }
627  }
628 
630 
631  return NULL;
632  }
633  }
634 }