hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
Handler.hpp
Go to the documentation of this file.
1 
15 #ifndef __HCE_HANDLER_INCLUDED__
16 #define __HCE_HANDLER_INCLUDED__
17 
18 #include <bitset>
19 
20 #include "zmsg.hpp"
21 #include "HandlerTypes.hpp"
22 #include "ClientsQueueManager.hpp"
24 
25 //POCO date time formatter dep.
26 #include <Poco/Timestamp.h>
27 #include <Poco/DateTimeFormatter.h>
28 #include <Poco/Logger.h>
29 #include <Poco/AutoPtr.h>
30 #include <Poco/Configurable.h>
31 #include <Poco/Util/IniFileConfiguration.h>
32 #include <Poco/String.h>
33 #include <Poco/Path.h>
34 #include <Poco/File.h>
35 
36 #include "LoggerStream.hpp"
37 #include "ServiceMessages.hpp"
38 #include "HandlerProperties.hpp"
39 
40 namespace HCE {
41  namespace handlers {
42 
43  class Handler {
44  public:
45  Handler(const std::string& name, zmq::context_t& context, const std::string& clientIdentity, const std::string& connectionString,
46  int64_t startedAt, unsigned char logPriority, const std::string& iniFile="") :
55  initHandler(iniFile, logger);
56 
57  setName(name);
58  setClientIdentity(clientIdentity);
59  setStartedAt(startedAt);
60  setLogPriority(logPriority);
63  setConnectionString(connectionString);
66  if (getDumpAllowRestore() > 0){
69  logger.log(HCE::handlers::NODE_LOG_MODE_ERROR) << "Load handler properties has error: " << handlerProperties.getErrorMsg() << flush;
70  }else{
71  logger.log(HCE::handlers::NODE_LOG_MODE_DEBUG) << "Load handler properties - SUCCESS" << flush;
72  }
75  logger.log(HCE::handlers::NODE_LOG_MODE_ERROR) << "Load stat properties has error: " << statProperties.getErrorMsg() << flush;
76  }else{
77  logger.log(HCE::handlers::NODE_LOG_MODE_DEBUG) << "Load stat properties - SUCCESS" << flush;
78  }
79  }
80  }
81 
82  virtual ~Handler(){
85  logger.log(HCE::handlers::NODE_LOG_MODE_DEBUG) << "Save handler properties has error: " << handlerProperties.getErrorMsg() << flush;
86  }
87 
90  logger.log(HCE::handlers::NODE_LOG_MODE_DEBUG) << "Save stat properties has error: " << statProperties.getErrorMsg() << flush;
91  }
92 
93  if(_inprocAdminSock){
96  }
97  delete _inprocAdminSock;
98  _inprocAdminSock = NULL;
99  }
100  }
101 
102  static void* main(void *that){
103  return (static_cast<Handler*>(that))->process();
104  }
105 
106  void shutdown(void){
107  _inProgress = false;
108  }
109 
111  std::string getName(void) {
113  }
114 
115  protected:
116 
118  void setName(const std::string& name){
120  }
121 
123  void setClientIdentity(const std::string& clientIdentity){
125  }
126 
128  std::string getClientIdentity(void) {
130  }
131 
133  void setStartedAt(int64_t startedAt) {
135  }
136 
138  int64_t getStartedAt(void) {
140  }
141 
143  void setLogPriority(unsigned int logPriority) {
145  }
146 
148  unsigned int getLogPriority(void) {
150  }
151 
153  void setIniFile(const std::string& iniFile){
155  }
156 
158  std::string getIniFile(void){
160  }
161 
163  void setBindTriesMaxNumber(const unsigned int& bindTriesMaxNumber){
165  }
166 
168  unsigned int getBindTriesMaxNumber(void) {
170  }
171 
173  void setBindTriesDelay(const unsigned int& bindTriesDelay){
175  }
176 
178  unsigned int getBindTriesDelay(void) {
180  }
181 
183  void setConnectionString(const std::string& connectionString){
185  }
186 
188  std::string getConnectionString(void) {
190  }
191 
193  void setInProgress(bool in_progress){
194  _inProgress = in_progress;
195  }
196 
198  bool getInProgress(void) const{
199  return _inProgress;
200  }
201 
203  void setPollTimeout(unsigned int pollTimeout){
204  if (pollTimeout>=MIN_ALLOWED_POLL_TIMEOUT){
206  }
207  }
208 
210  unsigned int getPollTimeout(void) {
212  }
213 
215  void setPropertyInterval(unsigned int propertyInterval){
216  if (propertyInterval >= MIN_ALLOWED_PROPERTY_INTERVAL){
218  }
219  }
220 
222  unsigned int getPropertyInterval(void) {
224  }
225 
227  void setDumpInterval(unsigned int dumpInterval){
228  if (dumpInterval >= MIN_ALLOWED_DUMP_INTERVAL){
230  _dumpIntervalAt = s_clock() + dumpInterval;
231  }
232  }
233 
235  unsigned int getDumpInterval(void) {
237  }
238 
240  void setDumpAllowRestore(unsigned int dumpAllowRestore){
242  }
243 
245  unsigned int getDumpAllowRestore(void) {
247  }
248 
249  void logFlush(unsigned int priority = NODE_LOG_MODE_DEFAULT){
250  if(priority == getLogPriority() || priority == NODE_LOG_MODE_DEFAULT){
251  //Fill time mark
252  std::string timeMark = Poco::DateTimeFormatter::format(Poco::Timestamp(static_cast<Poco::Timestamp::TimeVal>(s_clock() * 1000)), _logTimeFormat);
253 
254  //Backup log message
255  std::string message = log.str();
256 
257  //Reset log accumulator
258  log.str(std::string());
259  log.clear();
260 
261  //Create log context
262  if(_logMask[0]){
263  //Out time diff
264  log << std::setfill('0') << std::setw(8) << ((unsigned int)(s_clock() - getStartedAt())) << " ";
265  }
266  //Out three more log components
267  log << ((_logMask[1] ? timeMark : "") + (_logMask[2] ? "[" + getName() + "] " : "") + (_logMask[3] ? message : "")) << std::endl;
268 
269  //Out final log context
270  std::cout << log.str();
271  std::cout.flush();
272  }
273 
274  //Reset log accumulator
275  log.str(std::string());
276  log.clear();
277  }
278 
280  void initHandler(const std::string& iniFile, LoggerStream& logger){
281  if (!iniFile.empty()){
282  logger.log(NODE_LOG_MODE_INFORMATION) << "load node settings from '" << iniFile << "' file" << flush;
283 
284  setIniFile(iniFile);
285 
286  std::string dumpDir = HCE::handlers::DUMP_DIR;
288  try
289  {
290  Poco::AutoPtr<Poco::Util::IniFileConfiguration> pConf(new Poco::Util::IniFileConfiguration(iniFile));
291  try
292  {
296  dumpDir = pConf->getString(HCE::handlers::properties::DUMP_DIR);
297  }
298  catch(Poco::SyntaxException& e)
299  {
301  logger.log(NODE_LOG_MODE_ERROR) << "nodeInit() - data node options ini file processing error: '" << e.displayText() << "'" << flush;
302  }
303  }
304  catch(Poco::Exception& e)
305  {
307  logger.log(NODE_LOG_MODE_ERROR) << "nodeInit() - data node options ini file access error: '" << e.displayText() << "'" << flush;
308  }
312 
315 
318 
319  _dumpDir = dumpDir;
320 
325  }
326  }
327 
 // Handles one message on the in-process admin socket: builds a zmsg from the socket,
 // logs its dump at trace level, passes it to processAdminMessage() and sends the same
 // zmsg object back over the same socket.
 // NOTE(review): _inprocAdminSock is dereferenced without a null check here - confirm that
 // callers only invoke this after the admin socket has been created.
328  void handleAdminMessage(void){
 // zmsg is declared in zmsg.hpp (not visible here); presumably this constructor receives the pending message
330  zmsg msg(*_inprocAdminSock);
331 
332  logger.log(NODE_LOG_MODE_TRACE) << " MSG FROM inproc_admin\n" << msg.dump() << flush;
333 
335  processAdminMessage(msg);
336 
 // The message object handed to processAdminMessage() is what gets sent back
338  msg.send(*_inprocAdminSock);
339  }
340 
343  std::string messageId((const char*)msg.pop_front().c_str());
345  std::string messageCommand((const char*)msg.pop_front().c_str());
347  std::string messageParams((const char*)msg.pop_front().c_str());
349  std::string messageSrcIdentity((const char*)msg.pop_front().c_str());
350 
351  logger.log(NODE_LOG_MODE_DEBUG) << "RECEIVED admin message from inproc_admin id [" << messageId << "], body [" << messageCommand << "], params ["
352  << messageParams << "], source identity [" << messageSrcIdentity << "]" << flush;
353 
355  std::string response = NODE_ADMIN_ERROR_OK + NODE_ADMIN_COMMAND_DELIMITER;
356 
357  if(messageCommand == NODE_MSG_ECHO){
358  response += NODE_MSG_ECHO + NODE_ADMIN_COMMAND_DELIMITER + getName();
359  }else if(messageCommand == NODE_MSG_LOG_LEVEL_GET){
360  response += NODE_ADMIN_PROPERTY_NAME_LOG + "=" + std::to_string(getLogPriority()) + NODE_ADMIN_COMMAND_DELIMITER + getName();
361  }else if(messageCommand == NODE_MSG_LOG_LEVEL_SET){
363  unsigned int logp = getLogPriority();
364 
365  std::istringstream(messageParams) >> logp;
366  response += NODE_ADMIN_PROPERTY_NAME_LOG + "=" + std::to_string(getLogPriority()) + NODE_ADMIN_COMMAND_DELIMITER + getName();
367  setLogPriority(logp);
368  Poco::Logger::root().setLevel(getLogPriority());
369  }else if(messageCommand == NODE_MSG_POLL_TIMEOUT_GET){
370  response += NODE_ADMIN_PROPERTY_POLL_TIMEOUT + "=" + std::to_string(getPollTimeout()/1000);
371  }else if(messageCommand == NODE_MSG_POLL_TIMEOUT_SET){
373  unsigned int pollTimeout = getPollTimeout();
374  //Parse poll timeout from parameters
375  std::istringstream(messageParams) >> pollTimeout;
376  //Make response
377  response += NODE_ADMIN_PROPERTY_POLL_TIMEOUT + "=" + std::to_string(getPollTimeout()/1000);
378  //Set new poll timeout value
379  setPollTimeout(pollTimeout*1000);
380  }else if(messageCommand == NODE_MSG_PROPERTY_INTERVAL_GET){
381  response += NODE_ADMIN_PROPERTY_PROPERTY_INTERVAL + "=" + std::to_string(getPropertyInterval()/1000);
382  }else if(messageCommand == NODE_MSG_PROPERTY_INTERVAL_SET){
384  unsigned int propertyInterval = getPropertyInterval();
385  //Parse property interval from parameters
386  std::istringstream(messageParams) >> propertyInterval;
387  //Make response
388  response += NODE_ADMIN_PROPERTY_PROPERTY_INTERVAL + "=" + std::to_string(getPropertyInterval()/1000);
389  //Set new property interval value
390  setPropertyInterval(propertyInterval*1000);
391  }else if(messageCommand == NODE_MSG_DUMP_INTERVAL_GET){
392  response += NODE_ADMIN_PROPERTY_DUMP_INTERVAL + "=" + std::to_string(getDumpInterval()/1000);
393  }else if(messageCommand == NODE_MSG_DUMP_INTERVAL_SET){
395  unsigned int dumpInterval = getDumpInterval();
396  //Parse interval value from parameters
397  std::istringstream(messageParams) >> dumpInterval;
398  //Make response
399  response += NODE_ADMIN_PROPERTY_DUMP_INTERVAL + "=" + std::to_string(getDumpInterval()/1000);
400  //Set new dump interval value
401  setDumpInterval(dumpInterval*1000);
402  }else if(messageCommand == NODE_MSG_STOP){
404  setInProgress(false);
405  }else{
407  response = processAdminCommands(messageCommand, messageParams);
408  }
409 
411  msg.append(messageId.c_str());
413  msg.append(response.c_str());
415  msg.append(messageSrcIdentity.c_str());
416  }
417 
418  virtual std::string processAdminCommands(std::string& command, std::string& parameters){
419  std::string ret("DEFAULT stub for command processor!");
420 
421  return ret;
422  }
423 
425  if(_inprocAdminSock){
428  }
429  delete _inprocAdminSock;
430  _inprocAdminSock = NULL;
431  }
432 
434  _inprocAdminSock = new zmq::socket_t(_context, ZMQ_DEALER);
435  if(_inprocAdminSock){
437  _inprocAdminSock->setsockopt(ZMQ_IDENTITY, getName().c_str(), getName().size());
440  logger.log(NODE_LOG_MODE_DEBUG) << "Connected inproc_admin as [" << getName() << "]" << flush;
442  logger.log(NODE_LOG_MODE_DEBUG) << "Sending ready message to admin..." << flush;
444  logger.log(NODE_LOG_MODE_DEBUG) << "Sent ready message to admin" << flush;
445  }else{
446  logger.log(NODE_LOG_MODE_FATAL) << "Admin client socket instantiate : " << NODE_CRITICAL_ERROR_MESSAGE << flush;
447  ::abort();
448  }
449  }
450 
 // Replaces the socket behind sock_p with a new socket of the given type and binds it to connectionString.
 // An existing socket is closed (when it reports connected()) and deleted first.
 // bind() is attempted up to getBindTriesMaxNumber() times; each failure is logged and followed by
 // s_sleep(getBindTriesDelay()).
 // Parameters:
 //   sock_p           - socket pointer, replaced in place
 //   connectionString - endpoint passed to bind()
 //   type             - socket type passed to the zmq::socket_t constructor
 // Returns true when a bind() call succeeded, false otherwise.
451  bool rebuildServerConnection(zmq::socket_t*& sock_p, const std::string& connectionString, const int type){
452  bool bound = false;
453 
455  if(sock_p != NULL){
456  //TODO: experimental
457  if(sock_p->connected()){
458  sock_p->close();
 // Pause after close before the object is deleted (s_sleep argument presumably in milliseconds - confirm)
459  s_sleep(100);
460  }
461  delete sock_p;
462  sock_p = NULL;
463  }
464 
466  sock_p = new zmq::socket_t(_context, type);
 // NOTE(review): a plain new-expression reports failure by throwing, so the else branch below
 // looks unreachable unless a non-throwing operator new is in use - confirm.
467  if(sock_p != NULL){
469  unsigned int counter = 0;
470  while(counter < getBindTriesMaxNumber()){
471  try{
472  sock_p->bind(connectionString.c_str());
473  bound = true;
474  break;
475  } catch(zmq::error_t &e){
476  logger.log(NODE_LOG_MODE_ERROR) << "Bind exeption : " << e.what() << flush;
 // Only failed attempts are counted; the delay is applied after every failure, including the last one
477  counter++;
478  s_sleep(getBindTriesDelay());
479  }
480  }
481  }else{
482  logger.log(NODE_LOG_MODE_FATAL) << "Server connection socket type " << type << " instantiate : " << NODE_CRITICAL_ERROR_MESSAGE << flush;
484  }
485 
 // The outcome is written as two pieces: the endpoint first, then "success" or "error";
 // none of the visible statements passes flush to the logger
486  logger.log(NODE_LOG_MODE_DEBUG) << "Bind to [" << connectionString << "] ";
487  if(bound){
488  logger.log(NODE_LOG_MODE_DEBUG) << "success";
489  }else{
490  logger.log(NODE_LOG_MODE_DEBUG) << "error";
491  }
493 
494  return bound;
495  }
496 
497  void rebuildClientConnection(zmq::socket_t*& sock_p, const std::string& connectionString, const int type, const std::string& identity=""){
498  rebuildClientConnection(sock_p, connectionString, type, identity, -1);
499  }
500 
501  bool rebuildClientConnection(zmq::socket_t*& sock_p, const std::string& connectionString, const int type, const std::string& identity, int linger, const std::string& readyMessage =
502  "", const std::string& byeMessage = NODE_MSG_BYE){
503  bool connected = false;
504 
506  if(sock_p != NULL){
507  if(sock_p->connected()){
508  if(byeMessage != ""){
510  sendByeMessage(sock_p, identity, byeMessage);
511  }
512 
513  logger.log(NODE_LOG_MODE_DEBUG) << "rebuildServerConnection()::close();" << flush;
514  sock_p->close();
515  }
516  delete sock_p;
517  sock_p = NULL;
518  }
519 
521  sock_p = new zmq::socket_t(_context, type);
522  if(sock_p != NULL){
523  if(identity != ""){
525  sock_p->setsockopt(ZMQ_IDENTITY, identity.c_str(), identity.size());
526  }
527 
529  unsigned int counter = 0;
530  while(counter < getBindTriesMaxNumber()){
531  try{
532  sock_p->connect(connectionString.c_str());
533  connected = true;
534  break;
535  } catch(zmq::error_t &e){
536  logger.log(NODE_LOG_MODE_ERROR) << "Connect [" << connectionString << "] warning : " << e.what() << flush;
537  counter++;
538  s_sleep(getBindTriesDelay());
539  }
540  }
541  }else{
542  logger.log(NODE_LOG_MODE_FATAL) << "Client connection socket type " << type << " instantiate : " << NODE_CRITICAL_ERROR_MESSAGE << flush;
544  }
545 
546  logger.log(NODE_LOG_MODE_INFORMATION) << "Connect to [" << connectionString << "] ";
547  if(connected){
548  logger.log(NODE_LOG_MODE_INFORMATION) << "success" << flush;
549 
550  if(linger != -1){
552  sock_p->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
553  logger.log(NODE_LOG_MODE_DEBUG) << "Linger value " << linger << " for [" << connectionString << "] set" << flush;
554  }
555 
556  if(readyMessage != ""){
558  s_send(*sock_p, readyMessage);
559  logger.log(NODE_LOG_MODE_INFORMATION) << "READY message sent from client [" << identity << "] to [" << connectionString << "]" << flush;
560  }
561  }else{
562  logger.log(NODE_LOG_MODE_DEBUG) << "error" << flush;
563  }
564 
565  return connected;
566  }
567 
 // Sends byeMessage on the given socket with s_send() and logs that it was sent.
 // Parameters:
 //   sock_p     - socket to send on; dereferenced without a null check
 //                (NOTE(review): confirm every caller passes a non-null socket)
 //   identity   - used only in the log line; getName() is logged instead when it is empty
 //   byeMessage - text to send, NODE_MSG_BYE by default
568  void sendByeMessage(zmq::socket_t*& sock_p, const std::string& identity="", const std::string& byeMessage = NODE_MSG_BYE){
570  s_send(*sock_p, byeMessage);
571  logger.log(NODE_LOG_MODE_INFORMATION) << "BYE message sent from client [" << ((identity.empty())?getName():identity) << "]" << flush;
572  }
573 
574  std::string getProperties(void) {
575  std::stringstream result;
576  result << "name=" << getName() << NODE_ADMIN_COMMAND_DELIMITER << "startedAt=" << getStartedAt() << NODE_ADMIN_COMMAND_DELIMITER << "logPriority=" << static_cast<int>(getLogPriority())
577  << NODE_ADMIN_COMMAND_DELIMITER << "connectionRebuildTriesMax=" << getBindTriesMaxNumber() << NODE_ADMIN_COMMAND_DELIMITER << "connectionRebuildTriesDelay="
579  return result.str();
580  }
581 
583  virtual std::string getPropertyInformation(void){
584  std::string ret("DEFAULT stub for property informaion!");
585  return ret;
586  }
587 
590  if(s_clock() > _propertyIntervalAt){
591 
593  std::string message = propertyMessage.build();
594  if (!propertyMessage.isError()){
595  s_send(*_inprocAdminSock, message);
596 
597  if (getName() == "DataServerProxy"){
599  logger.log(NODE_LOG_MODE_TRACE) << "Send 'Periodic Properties' from DataServerProxy to _inprocAdminSock" << flush;
600  }
601  }else{
602  logger.log(NODE_LOG_MODE_ERROR) << "Operation build property message has error: " << propertyMessage.getErrorMsg() << flush;
603  }
605  }
606  }
607 
609  void createDirectories(const std::string& name){
610  Poco::File dirPath(name);
611  try{
612  if (!dirPath.exists()){
613  dirPath.createDirectories();
614  }
615  }catch(Poco::Exception& e){
616  logger.log(NODE_LOG_MODE_DEBUG) << "Create directories '" << name << "' return: " << e.message() << flush;
617  }
618  }
619 
621  std::string getDumpFileName(void) {
623  std::ostringstream ostr;
624  ostr << _dumpDir << Poco::Path::separator() << getClientIdentity() << "_" << getName() << ".json";
625  return ostr.str();
626  }
627 
629  std::string getStatFileName(void) {
631  std::ostringstream ostr;
632  ostr << _dumpDir << Poco::Path::separator() << getClientIdentity() << "_" << getName() << ".stat_vars";
633  return ostr.str();
634  }
635 
637  bool dumpProperties(void){
640  logger.log(HCE::handlers::NODE_LOG_MODE_DEBUG) << "Save handler properties has error: " << handlerProperties.getErrorMsg() << flush;
641  }
644  logger.log(HCE::handlers::NODE_LOG_MODE_DEBUG) << "Save stat properties has error: " << statProperties.getErrorMsg() << flush;
645  }
647  }
648 
651  bool result = false;
652  if(s_clock() > _dumpIntervalAt && getDumpInterval() > 0){
653  result = dumpProperties();
654  _dumpIntervalAt = s_clock() + getDumpInterval();
655  }
656  return result;
657  }
658 
659  virtual int initialize(void) = 0;
660  virtual void deinitialize(void) = 0;
661  virtual void* process(void) = 0;
662 
663  void handleInternalMessage(void);
664  void handleExternalMessage(void);
665 
666  void processControlMessage(zmsg& msg);
667  void processDataMessage(zmsg& msg);
668 
669  void logPeriodicStatistics(void);
670  void heartbit(void);
671  void registerClient(const std::string& identity);
672 
675 
678 
681 
684 
685  //Log components mask
686  std::bitset<4> _logMask; // remove ??
687  //Log time format POCO specification
688  std::string _logTimeFormat; // remove ??
689  //Log stream buffer (used only in method logFlush)
690  std::stringstream log; // remove ??
691 
692  //Logger stream, now used as the log buffer
694 
696  bool _inProgress;
697 
699  int64_t _propertyIntervalAt;
700 
702  int64_t _dumpIntervalAt;
703 
705  std::string _dumpDir;
706  };
707 
708  }
709 }
710 
711 #endif /* __HCE_HANDLER_INCLUDED__ */