hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DataProcessorData.cpp
Go to the documentation of this file.
1 #include "DataProcessorData.hpp"
2 
3 namespace HCE {
4  namespace handlers {
5  DataProcessorData::DataProcessorData(const std::string& nodeName, std::string name, zmq::context_t& ctx, int64_t startedAt, unsigned char logPriority, const std::string& iniFile) :
6  Handler(name, ctx, nodeName, std::string(""), startedAt, logPriority, iniFile), _inprocDataSock(nullptr), _tasksStateNotificationSock(nullptr),
7  _sphinxFO(nullptr), _drceFO(nullptr), _tasksStateNotificationFunctor(nullptr) {
8 
9  setNodeName(nodeName);
10  setIniFile(iniFile);
12  initialize();
13  }
14 
16  deinitialize();
17  }
18 
20  logger.log(NODE_LOG_MODE_INFORMATION) << "Initialize..." << flush;
21 
23  _inprocDataSock = nullptr;
26 
29 
31  checkInit();
32 
35 
38 
41 
44 
47 
50 
53 
54  return true;
55  }
56 
58 
62 
63  if(_inprocDataSock){
66  }
67  delete _inprocDataSock;
68  _inprocDataSock = NULL;
69  }
70 
71  if(_sphinxFO){
72  delete _sphinxFO;
73  _sphinxFO = NULL;
74  }
75 
79  }
80 
84  }
85 
86  if(_drceFO){
87  delete _drceFO;
88  _drceFO = NULL;
89  }
90  }
91 
92  void DataProcessorData::setNodeName(const std::string& nodeName){
94  setClientIdentity(nodeName);
95  }
96 
97  std::string DataProcessorData::getNodeName(void) {
99  }
100 
101  void DataProcessorData::setStatClientResponses(unsigned long long statClientResponses){
102  statProperties.set(HCE::handlers::properties::STAT_CLIENT_RESPONSES, static_cast<Poco::UInt64>(statClientResponses));
103  }
104 
106  return static_cast<unsigned long long>(statProperties.get<Poco::UInt64>(HCE::handlers::properties::STAT_CLIENT_RESPONSES));
107  }
108 
109  void DataProcessorData::setStatSphinxRequestsSearch(unsigned long long statSphinxRequestsSearch){
110  statProperties.set(HCE::handlers::properties::STAT_SPHINX_REQUESTS_SEARCH, static_cast<Poco::UInt64>(statSphinxRequestsSearch));
111  }
112 
114  return static_cast<unsigned long long>(statProperties.get<Poco::UInt64>(HCE::handlers::properties::STAT_SPHINX_REQUESTS_SEARCH));
115  }
116 
117  void DataProcessorData::setStatSphinxRequestsAdmin(unsigned long long statSphinxRequestsAdmin){
118  statProperties.set(HCE::handlers::properties::STAT_SPHINX_REQUESTS_ADMIN, static_cast<Poco::UInt64>(statSphinxRequestsAdmin));
119  }
120 
122  return static_cast<unsigned long long>(statProperties.get<Poco::UInt64>(HCE::handlers::properties::STAT_SPHINX_REQUESTS_ADMIN));
123  }
124 
125  void DataProcessorData::setStatSphinxTimeTotalSearch(unsigned long long statSphinxTimeTotalSearch){
126  statProperties.set(HCE::handlers::properties::STAT_SPHINX_TIME_TOTAL_SEARCH, static_cast<Poco::UInt64>(statSphinxTimeTotalSearch));
127  }
128 
130  return static_cast<unsigned long long>(statProperties.get<Poco::UInt64>(HCE::handlers::properties::STAT_SPHINX_TIME_TOTAL_SEARCH));
131  }
132 
133  void DataProcessorData::setStatSphinxTimeTotalAdmin(unsigned long long statSphinxTimeTotalAdmin){
134  statProperties.set(HCE::handlers::properties::STAT_SPHINX_TIME_TOTAL_ADMIN, static_cast<Poco::UInt64>(statSphinxTimeTotalAdmin));
135  }
136 
138  return static_cast<unsigned long long>(statProperties.get<Poco::UInt64>(HCE::handlers::properties::STAT_SPHINX_TIME_TOTAL_ADMIN));
139  }
140 
141  void DataProcessorData::setStatDRCERequests(unsigned long long statDRCERequests){
142  statProperties.set(HCE::handlers::properties::STAT_DRCE_REQUESTS, static_cast<Poco::UInt64>(statDRCERequests));
143  }
144 
145  unsigned long long DataProcessorData::getStatDRCERequests(void){
146  return static_cast<unsigned long long>(statProperties.get<Poco::UInt64>(HCE::handlers::properties::STAT_DRCE_REQUESTS));
147  }
148 
149  void DataProcessorData::setStatDRCETimeTotal(unsigned long long statDRCETimeTotal){
150  statProperties.set(HCE::handlers::properties::STAT_DRCE_TIME_TOTAL, static_cast<Poco::UInt64>(statDRCETimeTotal));
151  }
152 
153  unsigned long long DataProcessorData::getStatDRCETimeTotal(void){
154  return static_cast<unsigned long long>(statProperties.get<Poco::UInt64>(HCE::handlers::properties::STAT_DRCE_TIME_TOTAL));
155  }
156 
157  void DataProcessorData::setDataProcessingMode(unsigned int dataProcessingMode){
159  }
160 
163  }
164 
165  void DataProcessorData::setDataProcessingFakeResults(unsigned int dataProcessingFakeResults){
167  }
168 
171  }
172 
173  void DataProcessorData::setDataProcessingFakeResultsMax(unsigned int dataProcessingFakeResultsMax){
175  }
176 
179  }
180 
181  void DataProcessorData::setNodeNumber(unsigned int NodeNumber){
183  }
184 
187  }
188 
189  void DataProcessorData::setStateNotificationHost(const std::string& stateNotificationHost){
191  }
192 
195  }
196 
197  void DataProcessorData::setStateNotificationPort(const std::string& stateNotificationPort){
199  }
200 
203  }
204 
209 
212 
215 
218 
221 
224 
227  }
228 
230  logger.log(NODE_LOG_MODE_INFORMATION) << "Dump node properties" << flush;
231 
243  }
244 
246  logger.log(NODE_LOG_MODE_INFORMATION) << "Dump Sphinx FO properties" << flush;
247  if (_sphinxFO){
257  }
258  }
259 
261  logger.log(NODE_LOG_MODE_INFORMATION) << "Dump DRCE FO properties" << flush;
262  if (_drceFO){
277 
280  for (HCE::drce::DRCEFunctionalObject::Dictionary::iterator iter=dictionary.begin();iter!=dictionary.end();++iter)
281  {
282  statProperties.set((*iter).first, (*iter).second);
283  }
284  }
285  }
286 
288  logger.log(NODE_LOG_MODE_INFORMATION) << "Restore node properties" << flush;
289 
292  }
293 
296  }
297 
300  }
301 
304  }
305 
308  }
309 
312  }
313 
316  }
317 
320  }
321 
324  }
325 
328  }
329 
332  }
333  }
334 
336  logger.log(NODE_LOG_MODE_INFORMATION) << "Restore Sphinx FO properties" << flush;
337  if (_sphinxFO){
338 
341  }
342 
345  }
346 
349  }
350 
353  }
354 
357  }
358 
361  }
362 
365  }
366 
369  }
370 
373  }
374  }
375  }
376 
378  logger.log(NODE_LOG_MODE_INFORMATION) << "Restore DRCE FO properties" << flush;
379  if (_drceFO){
380 
383  }
384 
387  }
388 
391  }
392 
395  }
396 
399  }
400 
403  }
404 
407  }
408 
411  }
412 
415  }
416 
419  }
420 
423  }
424 
427  }
428 
431  }
432 
435  }
436 
438  }
439  }
440 
442  bool ret = true;
443 
444  if(getStateNotificationHost() != "" && getStateNotificationPort() != ""){
445  //Init the tasks state notification update connection
447  std::string(""), std::string(""));
448  }else{
449  ret = false;
450  }
451 
452  return ret;
453  }
454 
455  void DataProcessorData::dataStorageInitSphinx(const std::string& iniFile){
457  std::string sphinx_home_dir = "";
458  std::string sphinx_default_index;
459  bool sphinx_start_searchd = NODE_FUNCTIONAL_SPHINX_NOT_START_SEARCHD;
460  bool sphinx_stop_searchd = NODE_FUNCTIONAL_SPHINX_NOT_STOP_SEARCHD;
461  unsigned int sphinx_ranker = NODE_FUNCTIONAL_SPHINX_RANKER_0;
462  std::string sphinx_ranker_expression = "";
463 
464  logger.log(NODE_LOG_MODE_INFORMATION) << "load Sphinx FO settings from " << iniFile << " file" << flush;
465 
467  try{
468  Poco::AutoPtr<Poco::Util::IniFileConfiguration> pConf(new Poco::Util::IniFileConfiguration(iniFile));
469  try{
470 
472  sphinx_home_dir = pConf->getString(HCE::handlers::properties::SPHINX_HOME_DIR);
474  }else{
475  sphinx_home_dir = handlerProperties.get<std::string>(HCE::handlers::properties::SPHINX_HOME_DIR);
476  }
477 
479  sphinx_default_index = pConf->getString(HCE::handlers::properties::SPHINX_DEFAULT_INDEX);
481  }else{
482  sphinx_default_index = handlerProperties.get<std::string>(HCE::handlers::properties::SPHINX_DEFAULT_INDEX);
483  }
484 
486  sphinx_start_searchd = pConf->getBool(HCE::handlers::properties::SPHINX_START_SEARCHD);
488  }else{
490  }
491 
493  sphinx_stop_searchd = pConf->getBool(HCE::handlers::properties::SPHINX_STOP_SEARCHD);
495  }else{
497  }
498 
500  sphinx_ranker = pConf->getInt(HCE::handlers::properties::SPHINX_RANKER);
502  }else{
503  sphinx_ranker = handlerProperties.get<unsigned int>(HCE::handlers::properties::SPHINX_RANKER);
504  }
505 
507  sphinx_ranker_expression = pConf->getString(HCE::handlers::properties::SPHINX_RANKER_EXPRESSION);
509  }else{
510  sphinx_ranker_expression = handlerProperties.get<std::string>(HCE::handlers::properties::SPHINX_RANKER_EXPRESSION);
511  }
512 
513  } catch(Poco::SyntaxException& e){
515  logger.log(NODE_LOG_MODE_ERROR) << NODE_FUNCTIONAL_ERROR_SPHINX << " data node ini file processing error: " << e.displayText() << flush;
516  }
517  } catch(Poco::Exception& e){
519  logger.log(NODE_LOG_MODE_ERROR) << " data node ini file access error: " << e.displayText() << flush;
520  }
521 
522  //If Sphinx home dir not set, define it as current user home dir
523  if(sphinx_home_dir == ""){
525  }else{
526  if(sphinx_home_dir[0] != NODE_INI_FILE_PATH_START_CHAR){
527  sphinx_home_dir = getUserHomeDir() + NODE_INI_FILE_PATH_DELIMITER_CHAR + sphinx_home_dir;
528  }
529  }
530 
531  logger.log(NODE_LOG_MODE_INFORMATION) << "Sphinx FO initialization:" << flush;
532  logger.log(NODE_LOG_MODE_INFORMATION) << "nodeName=" << getNodeName() << flush;
533  logger.log(NODE_LOG_MODE_INFORMATION) << "home_dir=" << sphinx_home_dir << flush;
534  logger.log(NODE_LOG_MODE_INFORMATION) << "default_index=" << sphinx_default_index << flush;
535  logger.log(NODE_LOG_MODE_INFORMATION) << "start_searchd=" << sphinx_start_searchd << flush;
536  logger.log(NODE_LOG_MODE_INFORMATION) << "stop_searchd=" << sphinx_stop_searchd << flush;
537  logger.log(NODE_LOG_MODE_INFORMATION) << "getNodeNumber()=" << getNodeNumber() << flush;
538  logger.log(NODE_LOG_MODE_INFORMATION) << "ranker=" << sphinx_ranker << flush;
539  logger.log(NODE_LOG_MODE_INFORMATION) << "ranker_expression=" << sphinx_ranker_expression << flush;
540 
542  _sphinxFO = new HCE::sphinx::SphinxFunctionalObject(getNodeName(), getNodeNumber(), sphinx_home_dir, sphinx_default_index, sphinx_start_searchd, sphinx_stop_searchd);
543  if(_sphinxFO){
544  if(_sphinxFO->getErrorCode()){
546  logger.log(NODE_LOG_MODE_ERROR) << NODE_FUNCTIONAL_ERROR_SPHINX << " Sphinx FO initialization error " << _sphinxFO->getErrorCode() << " : " << _sphinxFO->getErrorMsg()
548  }
549  }else{
552  }
553 
556 
558  _sphinxFO->setRankingMode(sphinx_ranker, sphinx_ranker_expression);
559 
560  logger.log(NODE_LOG_MODE_DEBUG) << "SPHO_INSTANTIATE [" << _sphinxFO->logMsg() << "]" << flush;
561  }
562 
563  void DataProcessorData::dataStorageInitDRCE(const std::string& iniFile){
565  std::string drce_home_dir = "";
566  std::string drce_node_host = "";
567  std::string drce_node_port = "";
568  std::string drce_tasks_data_dir = "";
569  std::string drce_tasks_status_dir = "";
570  unsigned int drce_max_threads = 32;
571  unsigned int drce_tasks_queue_dump_period = 10000; // time interval period in msec.
572  unsigned int drce_resources_monitor_period = 10000; // time interval period in msec.
573  bool drce_state_notification_reset_error_state = false;
574  std::string drce_env_vars = "";
575  size_t drce_resources_extractor_max_threads = 1024;
576  size_t drce_resources_extractor_max_processes = 1024;
577  size_t drce_resources_extractor_max_disk_size = 0;
578  size_t drce_resources_extractor_max_vram_size = 0;
579  size_t drce_resources_extractor_max_rram_size = 0;
580 
581  logger.log(NODE_LOG_MODE_INFORMATION) << "load DRCE FO settings from " << iniFile << " file" << flush;
582 
584  try{
585  Poco::AutoPtr<Poco::Util::IniFileConfiguration> pConf(new Poco::Util::IniFileConfiguration(iniFile));
586  try{
587 
589  drce_home_dir = pConf->getString(HCE::handlers::properties::DRCE_HOME_DIR);
590  }else{
591  drce_home_dir = handlerProperties.get<std::string>(HCE::handlers::properties::DRCE_HOME_DIR);
592  }
593 
595  drce_tasks_data_dir = pConf->getString(HCE::handlers::properties::DRCE_TASKS_DATA_DIR);
597  }else{
598  drce_tasks_data_dir = handlerProperties.get<std::string>(HCE::handlers::properties::DRCE_TASKS_DATA_DIR);
599  }
600 
602  drce_tasks_status_dir = pConf->getString(HCE::handlers::properties::DRCE_TASKS_STATUS_DIR);
604  }else{
605  drce_tasks_status_dir = handlerProperties.get<std::string>(HCE::handlers::properties::DRCE_TASKS_STATUS_DIR);
606  }
607 
609  drce_node_host = pConf->getString(HCE::handlers::properties::DRCE_NODE_HOST);
611  }else{
612  drce_node_host = handlerProperties.get<std::string>(HCE::handlers::properties::DRCE_NODE_HOST);
613  }
614 
616  drce_node_port = pConf->getString(HCE::handlers::properties::DRCE_NODE_PORT);
618  }else{
619  drce_node_port = handlerProperties.get<std::string>(HCE::handlers::properties::DRCE_NODE_PORT);
620  }
621 
623  drce_max_threads = pConf->getInt(HCE::handlers::properties::DRCE_MAX_THREADS);
625  }else{
626  drce_max_threads = handlerProperties.get<unsigned int>(HCE::handlers::properties::DRCE_MAX_THREADS);
627  }
628 
630  drce_tasks_queue_dump_period = pConf->getInt(HCE::handlers::properties::DRCE_TASKS_QUEUE_DUMP_PERIOD);
632  }else{
633  drce_tasks_queue_dump_period = handlerProperties.get<unsigned int>(HCE::handlers::properties::DRCE_TASKS_QUEUE_DUMP_PERIOD);
634  }
635 
637  drce_resources_monitor_period = pConf->getInt(HCE::handlers::properties::DRCE_RESOURCES_MONITOR_PERIOD);
639  }else{
640  drce_resources_monitor_period = handlerProperties.get<unsigned int>(HCE::handlers::properties::DRCE_RESOURCES_MONITOR_PERIOD);
641  }
642 
645  }else{
647  }
648 
651  }else{
653  }
654 
656  drce_state_notification_reset_error_state = pConf->getBool(HCE::handlers::properties::DRCE_STATE_NOTIFICATION_RESET_ERROR_CODE);
658  }else{
659  drce_state_notification_reset_error_state = handlerProperties.get<bool>(HCE::handlers::properties::DRCE_STATE_NOTIFICATION_RESET_ERROR_CODE);
660  }
661 
663  drce_env_vars = pConf->getString(HCE::handlers::properties::DRCE_ENV_VARS);
665  }else{
666  drce_env_vars = handlerProperties.get<std::string>(HCE::handlers::properties::DRCE_ENV_VARS);
667  }
668 
670  drce_resources_extractor_max_threads = pConf->getInt(HCE::handlers::properties::DRCE_RESOURCES_EXTRACTOR_MAX_THREADS);
672  }else{
673  drce_resources_extractor_max_threads = handlerProperties.get<size_t>(HCE::handlers::properties::DRCE_RESOURCES_EXTRACTOR_MAX_THREADS);
674  }
675 
677  drce_resources_extractor_max_processes = pConf->getInt(HCE::handlers::properties::DRCE_RESOURCES_EXTRACTOR_MAX_PROCESSES);
679  }else{
680  drce_resources_extractor_max_processes = handlerProperties.get<size_t>(HCE::handlers::properties::DRCE_RESOURCES_EXTRACTOR_MAX_PROCESSES);
681  }
682 
684  drce_resources_extractor_max_disk_size = pConf->getInt(HCE::handlers::properties::DRCE_RESOURCES_EXTRACTOR_MAX_DISK_SIZE);
686  }else{
687  drce_resources_extractor_max_disk_size = handlerProperties.get<size_t>(HCE::handlers::properties::DRCE_RESOURCES_EXTRACTOR_MAX_DISK_SIZE);
688  }
689 
691  drce_resources_extractor_max_vram_size = pConf->getInt(HCE::handlers::properties::DRCE_RESOURCES_EXTRACTOR_MAX_VRAM_SIZE);
693  }else{
694  drce_resources_extractor_max_vram_size = handlerProperties.get<size_t>(HCE::handlers::properties::DRCE_RESOURCES_EXTRACTOR_MAX_VRAM_SIZE);
695  }
696 
698  drce_resources_extractor_max_rram_size = pConf->getInt(HCE::handlers::properties::DRCE_RESOURCES_EXTRACTOR_MAX_RRAM_SIZE);
700  }else{
701  drce_resources_extractor_max_rram_size = handlerProperties.get<size_t>(HCE::handlers::properties::DRCE_RESOURCES_EXTRACTOR_MAX_RRAM_SIZE);
702  }
703 
704  } catch(Poco::SyntaxException& e){
706  logger.log(NODE_LOG_MODE_ERROR) << NODE_FUNCTIONAL_ERROR_DRCE << " data node ini file processing error: " << e.displayText() << flush;
707  }
708  } catch(Poco::Exception& e){
710  logger.log(NODE_LOG_MODE_ERROR) << " data node ini file access error: " << e.displayText() << flush;
711  }
712 
713  //If DRCE home dir not set, define it as current user home dir
714  if(drce_home_dir == ""){
716  }else{
717  //If first path character is not slash, this means that the path specified is relative to the home dir
718  if(drce_home_dir[0] != NODE_INI_FILE_PATH_START_CHAR){
719  drce_home_dir = getUserHomeDir() + NODE_INI_FILE_PATH_DELIMITER_CHAR + drce_home_dir;
720  }
721  }
722 
723  //If drce_tasks_data_dir not set
724  if(drce_tasks_data_dir == ""){
725  drce_tasks_data_dir = drce_home_dir + NODE_INI_FILE_PATH_DELIMITER_CHAR + NODE_FUNCTIONAL_DRCE_TASKS_DATA_DIR;
726  }else{
727  //If first path character is not slash, this means that the path specified is relative to the home dir
728  if(drce_tasks_data_dir[0] != NODE_INI_FILE_PATH_START_CHAR){
729  drce_tasks_data_dir = drce_home_dir + NODE_INI_FILE_PATH_DELIMITER_CHAR + NODE_FUNCTIONAL_DRCE_TASKS_DATA_DIR;
730  }
731  }
732 
733  //If drce_tasks_status_dir not set
734  if(drce_tasks_status_dir == ""){
735  drce_tasks_status_dir = drce_home_dir + NODE_INI_FILE_PATH_DELIMITER_CHAR + NODE_FUNCTIONAL_DRCE_TASKS_STATUS_DIR;
736  }else{
737  //If first path character is not slash, this means that the path specified is relative to the home dir
738  if(drce_tasks_status_dir[0] != NODE_INI_FILE_PATH_START_CHAR){
739  drce_tasks_status_dir = drce_home_dir + NODE_INI_FILE_PATH_DELIMITER_CHAR + NODE_FUNCTIONAL_DRCE_TASKS_STATUS_DIR;
740  }
741  }
742 
743  logger.log(NODE_LOG_MODE_INFORMATION) << "DRCE FO initialization:" << flush;
744  logger.log(NODE_LOG_MODE_INFORMATION) << "nodeName=" << getNodeName() << flush;
745  logger.log(NODE_LOG_MODE_INFORMATION) << "home_dir=" << drce_home_dir << flush;
746  logger.log(NODE_LOG_MODE_INFORMATION) << "drce_tasks_data_dir=" << drce_tasks_data_dir << flush;
747  logger.log(NODE_LOG_MODE_INFORMATION) << "drce_tasks_status_dir=" << drce_tasks_status_dir << flush;
748  logger.log(NODE_LOG_MODE_INFORMATION) << "drce_node_host=" << drce_node_host << flush;
749  logger.log(NODE_LOG_MODE_INFORMATION) << "drce_node_port=" << drce_node_port << flush;
750  logger.log(NODE_LOG_MODE_INFORMATION) << "drce_max_threads=" << drce_max_threads << flush;
751  logger.log(NODE_LOG_MODE_INFORMATION) << "drce_tasks_queue_dump_period=" << drce_tasks_queue_dump_period << flush;
752  logger.log(NODE_LOG_MODE_INFORMATION) << "drce_resources_monitor_period=" << drce_resources_monitor_period << flush;
753  logger.log(NODE_LOG_MODE_INFORMATION) << "state_notification_host=" << getStateNotificationHost() << flush;
754  logger.log(NODE_LOG_MODE_INFORMATION) << "state_notification_port=" << getStateNotificationPort() << flush;
755 
757 
759  _drceFO = new HCE::drce::DRCEFunctionalObject(getNodeName(), drce_home_dir, drce_tasks_data_dir, drce_tasks_status_dir, drce_node_host, drce_node_port);
760 
761  if(_drceFO){
762  if(_drceFO->getErrorCode()){
764  logger.log(NODE_LOG_MODE_FATAL) << NODE_FUNCTIONAL_ERROR_DRCE << " DRCE FO initialization error " << _drceFO->getErrorCode() << " : " << _drceFO->getErrorMsg()
767  }
768  }else{
771  }
772 
774  _drceFO->setMaxThreadCount(drce_max_threads);
776  _drceFO->setTasksQueueDumpPeriod(drce_tasks_queue_dump_period);
778  _drceFO->setResourceMonitorTimePeriod(drce_resources_monitor_period);
780  _drceFO->setEnvironments(drce_env_vars);
782  _drceFO->setResourceExtractorMaxThreadsCount(drce_resources_extractor_max_threads);
784  _drceFO->setResourceExtractorMaxProcessesCount(drce_resources_extractor_max_processes);
786  if (drce_resources_extractor_max_disk_size)
787  _drceFO->setResourceExtractorMaxDiskSize(drce_resources_extractor_max_disk_size);
789  if (drce_resources_extractor_max_vram_size)
790  _drceFO->setResourceExtractorMaxVramSize(drce_resources_extractor_max_vram_size);
792  if (drce_resources_extractor_max_rram_size)
793  _drceFO->setResourceExtractorMaxRramSize(drce_resources_extractor_max_rram_size);
794 
797  logger.log(NODE_LOG_MODE_DEBUG) << "Tasks state notification connection created!" << flush;
803  _drceFO->setResetErrorCodeStateNotification(drce_state_notification_reset_error_state);
804  logger.log(NODE_LOG_MODE_DEBUG) << "Tasks state notification functor is set!" << flush;
805  }
806 
807  logger.log(NODE_LOG_MODE_DEBUG) << "DRCE_FO_INSTANTIATE [" << _drceFO->logMsg() << "]" << flush;
808  }
809 
811  std::string homeDir;
812 
813  struct passwd *pw = getpwuid(getuid());
814  const char *homedir = pw->pw_dir;
815  if(homedir){
816  homeDir = homedir;
817  }
818 
819  return homeDir;
820  }
821 
822  void DataProcessorData::nodeInit(const std::string& iniFile){
824  unsigned int processingMode = NODE_DATA_PROCESSING_MODE_NORMAL;
825 
826  logger.log(NODE_LOG_MODE_INFORMATION) << "load node settings from " << iniFile << " file" << flush;
827 
829  try{
830  Poco::AutoPtr<Poco::Util::IniFileConfiguration> pConf(new Poco::Util::IniFileConfiguration(iniFile));
831  try{
832  //Set node number from ini
835 
836  //Set data processing mode from ini
838  processingMode = pConf->getInt(HCE::handlers::properties::DATA_PROCESSING_MODE);
839 
840  setDataProcessingMode(processingMode);
841 
842  if(processingMode > 0){
845  }
848  }
849  }
850  } catch(Poco::SyntaxException& e){
852  logger.log(NODE_LOG_MODE_ERROR) << "nodeInit() - data node ini file processing error: " << e.displayText() << flush;
853  }
854  } catch(Poco::Exception& e){
856  logger.log(NODE_LOG_MODE_ERROR) << "nodeInit() - data node ini file access error: " << e.displayText() << flush;
857  }
858 
860  setNodeNumber(0);
861 
864 
867  }
868 
871  }
872 
877  }
878 
879  std::string DataProcessorData::processDataRequest(const std::string& messageBody){
880  std::string ret;
881 
882  if(messageBody.size() > 0 && messageBody[0] == NODE_JSON_BEGIN_CHAR){
883  logger.log(NODE_LOG_MODE_TRACE) << "JSON processing..." << flush;
884 
886  ret = processDataRequestJson(messageBody);
887  }else{
888  logger.log(NODE_LOG_MODE_TRACE) << "PLAIN_TEXT processing..." << flush;
889 
891  ret = processDataRequestPlainText(messageBody);
892  }
893 
894  return ret;
895  }
896 
898  //Response json string
899  std::string ret;
900 
901  //Create default empty cover object for results
902  HCE::JsonMessageCover msgCoverOutEmpty;
903 
904  //Serialize default empty cover object
905  if(!msgCoverOutEmpty.serialize(ret)){
906  logger.log(NODE_LOG_MODE_DEBUG) << "Default empty cover object serialize error " << msgCoverOutEmpty.getErrorCode() << " : " << msgCoverOutEmpty.getErrorMsg() << flush;
907  }
908 
909  logger.log(NODE_LOG_MODE_DEBUG) << "Request json : " << messageBody << flush;
910 
911  //Unserialize request json
912  HCE::JsonMessageCover msgCoverIn(messageBody);
913 
914  if(msgCoverIn.getErrorCode()){
915  logger.log(NODE_LOG_MODE_ERROR) << "Data request cover json deserialization error : " << msgCoverIn.getErrorMsg() << flush;
916  }else{
917  logger.log(NODE_LOG_MODE_DEBUG) << "Json request object from cover: [" << msgCoverIn.getData() << "]" << flush;
918 
919  if(msgCoverIn.getType() == HCE::types::MessageType::mtSphinx){
920  //Process Sphinx request type
921  std::string sphinxResultDataJson = processDataRequestSphinx(msgCoverIn.getData());
922 
923  //Create cover object for results
924  HCE::JsonMessageCover msgCoverOut(msgCoverIn.getType(), sphinxResultDataJson, msgCoverIn.getTTL());
925 
926  //Serialize output cover object
927  if(!msgCoverOut.serialize(ret)){
929  logger.log(NODE_LOG_MODE_ERROR) << "Cover object for Sphinx request serialize error " << msgCoverOut.getErrorCode() << " : " << msgCoverOut.getErrorMsg() << flush;
930  }
931  }else{
932  if(msgCoverIn.getType() == HCE::types::MessageType::mtDrce){
933  //Process DRCE request type
934  std::string drceResultDataJson = processDataRequestDRCE(msgCoverIn.getData());
935 
936  //Create cover object for results
937  HCE::JsonMessageCover msgCoverOut(msgCoverIn.getType(), drceResultDataJson, msgCoverIn.getTTL());
938 
939  //Serialize output cover object
940  if(!msgCoverOut.serialize(ret)){
942  logger.log(NODE_LOG_MODE_ERROR) << "Cover object for DRCE request serialize error " << msgCoverOut.getErrorCode() << " : " << msgCoverOut.getErrorMsg() << flush;
943  }
944  }else{
945  logger.log(NODE_LOG_MODE_ERROR) << "Data request type error : type " << static_cast<int>(msgCoverIn.getType()) << " not supported" << flush;
946  }
947  }
948  }
949 
950  logger.log(NODE_LOG_MODE_DEBUG) << "Result json : " << ret << flush;
951 
952  return ret;
953  }
954 
956  //Create empty Sphinx result object to return it for farther processing by default
957  HCE::sphinx::SphinxDefaultJSON defaultJson;
958  std::string ret = defaultJson.getJSON();
959 
960  //Create message cover object and fill it with default results json to return it to upper level processing (reducing) in case of some error happened
961  HCE::JsonMessageCover msgCover(HCE::types::MessageType::mtSphinx, ret);
962  if(!msgCover.serialize(ret)){
964  logger.log(NODE_LOG_MODE_ERROR) << NODE_FUNCTIONAL_ERROR_SPHINX << " cover object from default results - serialization error "
965  << msgCover.getErrorCode() << " : " << msgCover.getErrorMsg() << flush;
966  }
967 
968  logger.log(NODE_LOG_MODE_DEBUG) << "Sphinx data handler request, json=[" << messageBody << "]" << flush;
969 
970  //If node data processing mode is normal - enter FO usage, else in simulation mode - return default results
973  HCE::sphinx::SphinxInputJsonMessage inputMsg(messageBody);
974  if(!inputMsg.isError()){
975  if(_sphinxFO->getErrorCode()){
977  logger.log(NODE_LOG_MODE_ERROR) << NODE_FUNCTIONAL_ERROR_SPHINX << " searcher state error " << _sphinxFO->getErrorCode() << " : " << _sphinxFO->getErrorMsg()
979 
982  }else{
984  HCE::sphinx::SphinxOutputJsonMessage outputMessage = _sphinxFO->Process(inputMsg);
985 
986  //TODO temporary for filters debug reason, need to be removed in release
988  //TODO
989 
990  if(_sphinxFO->getErrorCode()){
992  logger.log(NODE_LOG_MODE_ERROR) << NODE_FUNCTIONAL_ERROR_SPHINX << " search process error " << _sphinxFO->getErrorCode() << " : " << _sphinxFO->getErrorMsg()
996  }
997 
998  if(outputMessage.getErrorCode()){
1000  logger.log(NODE_LOG_MODE_ERROR) << NODE_FUNCTIONAL_ERROR_SPHINX << " output message error " << outputMessage.getErrorCode() << " : " << outputMessage.getErrorMessage() << flush;
1001  }
1002 
1003  logger.log(NODE_LOG_MODE_DEBUG) << "Sphinx time:" << outputMessage.getTime() << flush;
1004 
1006  ret = outputMessage.getData();
1007 
1010  }
1011  }else{
1013  logger.log(NODE_LOG_MODE_DEBUG) << NODE_FUNCTIONAL_ERROR_SPHINX << " deserialization error " << inputMsg.getErrorCode() << " : " << inputMsg.getErrorMsg() << flush;
1014  }
1015  }else{
1016  //Fake processing mode, random generator of results set
1017  //Create message cover object and fill it with default results json to return it to upper level processing (reducing) in case of some error happened
1019  //Return Sphinx results data json string of data from fake results generator
1020  ret = defaultJson.getJSON();
1021  }else{
1022  //TODO
1023  logger.log(NODE_LOG_MODE_ERROR) << "Error create fake results " << getDataProcessingFakeResults() << ", zero results returned for Sphinx" << flush;
1024  }
1025  }
1026 
1027  logger.log(NODE_LOG_MODE_DEBUG) << "Sphinx results json : " << ret << flush;
1028 
1029  //Register Sphinx requests number stat
1030  unsigned long long statSphinxRequestsSearch = getStatSphinxRequestsSearch();
1031  setStatSphinxRequestsSearch(++statSphinxRequestsSearch);
1032 
1033  return ret;
1034  }
1035 
1037  //Create empty DRCE result object to return it for farther processing by default
1038  HCE::drce::DRCEDefaultJSON defaultJson;
1039  std::string ret = defaultJson.getJSON();
1040 
1041  logger.log(NODE_LOG_MODE_DEBUG) << "DRCE data handler request, json=[" << messageBody << "]" << flush;
1042 
1043  //If node data processing mode is normal - enter FO usage, else in simulation mode - return default results
1046  HCE::drce::DRCEInputJsonMessage inputMsg(messageBody);
1047  if(!inputMsg.isError()){
1048  if(_drceFO->getErrorCode()){
1050  logger.log(NODE_LOG_MODE_ERROR) << NODE_FUNCTIONAL_ERROR_DRCE << " DRCE state error " << _drceFO->getErrorCode() << " : " << _drceFO->getErrorMsg()
1052 
1054  _drceFO->resetError();
1055  }else{
1057  HCE::drce::DRCEOutputJsonMessage outputMessage = _drceFO->Process(inputMsg);
1058 
1059  //TODO temporary for filters debug reason, need to be removed in release
1061 
1062  if(_drceFO->getErrorCode()){
1064  logger.log(NODE_LOG_MODE_ERROR) << NODE_FUNCTIONAL_ERROR_DRCE << " execution error " << _drceFO->getErrorCode() << " : " << _drceFO->getErrorMsg()
1066 
1068  _drceFO->resetError();
1069  }
1070 
1071  if(outputMessage.getErrorCode()){
1073  logger.log(NODE_LOG_MODE_ERROR) << NODE_FUNCTIONAL_ERROR_DRCE << " output message error " << outputMessage.getErrorCode() << " : " << outputMessage.getErrorMsg() << flush;
1074  }
1075  logger.log(NODE_LOG_MODE_DEBUG) << "DRCE time:" << _drceFO->getRequestTime() << flush;
1076 
1078  outputMessage.serialize(ret);
1079 
1082  }
1083  }else{
1085  logger.log(NODE_LOG_MODE_ERROR) << NODE_FUNCTIONAL_ERROR_DRCE << " deserialization error " << inputMsg.getErrorCode() << " : " << inputMsg.getErrorMsg() << flush;
1087  defaultJson.setJsonErrorCode(inputMsg.getErrorCode());
1088  defaultJson.setJsonErrorMessage(inputMsg.getErrorMsg());
1089  defaultJson.setJsonRequestType(inputMsg.getRequestType());
1090  //defaultJson.setJsonRequestId(inputMsg.getRequestId());
1091  defaultJson.setJsonNodeHost(_drceFO->getNodeHost());
1092  defaultJson.setJsonNodePort(_drceFO->getNodePort());
1093  defaultJson.setJsonNodeName(_drceFO->getNodeName());
1095  defaultJson.makeJSON();
1096  ret = defaultJson.getJSON();
1097  }
1098  }else{
1099  //Fake processing mode, random generator of results set
1100  //Create message cover object and fill it with default results json to return it to upper level processing (reducing) in case of some error happened
1101  if(defaultJson.makeJSON(getDataProcessingFakeResults())){
1102  //Return DRCE results data json string of data from fake results generator
1103  ret = defaultJson.getJSON();
1104  }else{
1105  logger.log(NODE_LOG_MODE_ERROR) << "Error create fake results " << getDataProcessingFakeResults() << ", zero results returned for DRCE" << flush;
1106  }
1107  }
1108 
1109  //TODO: test, need to be removed
1110  //ret = DRCE_FAKE_RESPONSE;
1111 
1112  logger.log(NODE_LOG_MODE_DEBUG) << "DRCE results json : " << ret << flush;
1113 
1115  unsigned long long statDRCERequests = getStatDRCERequests();
1116  setStatDRCERequests(++statDRCERequests);
1117 
1118  return ret;
1119  }
1120 
1123  unsigned int items_number;
1124  std::istringstream(messageBody) >> items_number;
1126  std::vector<unsigned long long> items(items_number);
1128  if(items_number > 0){
1129  for(unsigned long long i = 0; i < items.size(); ++i){
1130  items[i] = (rand() % RAND_MAX) + 1;
1131  }
1132  }else{
1133  items.resize(1);
1134  }
1135 
1136  logger.log(NODE_LOG_MODE_DEBUG) << "DATA items requested:" << items_number << ", items generated:" << items.size() << flush;
1137 
1139  sort(items.begin(), items.end(), std::less<unsigned long long>());
1140 
1141  logger.log(NODE_LOG_MODE_DEBUG) << "DATA sorted" << flush;
1143  std::ostringstream oss;
1144  std::copy(items.begin(), items.end() - 1, std::ostream_iterator<int>(oss, " "));
1145  oss << items.back();
1146  std::string body(oss.str());
1147 
1148  return body;
1149  }
1150 
1153  zmsg msg(*_inprocDataSock);
1154 
1155  logger.log(NODE_LOG_MODE_TRACE) << "MSG FROM inproc_data" << endl << msg.dump() << flush;
1157  std::string messageId((const char*)msg.pop_front().c_str());
1158  std::string messageBody((const char*)msg.pop_front().c_str());
1159  std::string messageRoute((msg.parts())?(const char*)msg.pop_front().c_str():"");
1160 
1161  unsigned long long statClientResponses = getStatClientResponses();
1162  setStatClientResponses(++statClientResponses);
1163 
1164  logger.log(NODE_LOG_MODE_DEBUG) << "DATA PROCESSOR RECEIVED message [" << messageId << "] body " << messageBody.size() << " bytes n=" << getStatClientResponses() << flush;
1165 
1167  std::string body = processDataRequest(messageBody);
1168 
1170  msg.append(messageId.c_str());
1171  msg.append(body.c_str());
1172  msg.append(messageRoute.c_str());
1173 
1174  logger.log(NODE_LOG_MODE_TRACE) << "DATA PROCESSOR RESPONSE message [" << messageId << "] body [" << body << "]" << flush;
1175 
1177  msg.send(*_inprocDataSock);
1178 
1179  logger.log(NODE_LOG_MODE_DEBUG) << "DATA PROCESSOR RESPONSE message [" << messageId << "] body " << body.size() << " bytes sent" << flush;
1180  }
1181 
1182  std::string DataProcessorData::processAdminCommands(std::string & command, std::string & parameters){
1183  std::string ret = NODE_ADMIN_ERROR_OK;
1184 
1185  logger.log(NODE_LOG_MODE_DEBUG) << "Admin command requested [" << command << "], parameters [" << parameters << "]" << flush;
1186 
1188  if(command == NODE_MSG_STAT){
1189  std::stringstream result;
1190  result << NODE_ADMIN_ERROR_OK << NODE_ADMIN_COMMAND_DELIMITER << "responses=" << getStatClientResponses() << NODE_ADMIN_COMMAND_DELIMITER << "avgSphinxSearchTime="
1193  << NODE_ADMIN_COMMAND_DELIMITER << "sphinxAdminRequests=" << getStatSphinxRequestsAdmin() << NODE_ADMIN_COMMAND_DELIMITER << "DRCERequests=" << getStatDRCERequests()
1194  << NODE_ADMIN_COMMAND_DELIMITER << "avgDRCETime=" << (getStatDRCETimeTotal() / (getStatDRCERequests() + 1));
1195  ret = result.str();
1196  }else if(command == NODE_MSG_PROPERTIES){
1198  ret = getPropertyInformation();
1199  }else if(command == NODE_MSG_SET_DATA_PROCESSING_MODE){
1200  std::stringstream result;
1201 
1203  unsigned int processingMode;
1204  std::istringstream(parameters) >> processingMode;
1205 
1207 
1208  //Change data processing mode
1209  setDataProcessingMode(processingMode);
1210 
1211  ret = result.str();
1212  }else if(command == NODE_MSG_SPHINX){
1214  ret = processAdminCommandsSphinx(parameters);
1215  }else if(command == NODE_MSG_DRCE){
1218  //ret = NODE_ADMIN_ERROR_OK + NODE_ADMIN_COMMAND_DELIMITER + DRCE_FAKE_RESPONSE;
1219  }else if((command == NODE_MSG_DRCE_SET_HOST) || (command == NODE_MSG_DRCE_GET_HOST)){
1222  if(command == NODE_MSG_DRCE_SET_HOST){
1223  //Change DRCE FO host
1224  _drceFO->setNodeHost(parameters);
1225  }
1226  }else if((command == NODE_MSG_DRCE_SET_PORT) || (command == NODE_MSG_DRCE_GET_PORT)){
1229  if(command == NODE_MSG_DRCE_SET_PORT){
1230  //Change DRCE FO port
1231  _drceFO->setNodePort(parameters);
1232  }
1233  }else if(command == NODE_MSG_DRCE_GET_TASKS){
1234  ret = _drceFO->getCurrentTasksQueue();
1235  }else if(command == NODE_MSG_DRCE_GET_TASKS_INFO){
1236  ret = _drceFO->getListInfoAllTasks();
1237  }else if(command == NODE_MSG_RESOURCE_USAGE){
1238  ret = _drceFO->getHostResourceUsage();
1239  }else{
1242  }
1243 
1244  return ret;
1245  }
1246 
1248  std::stringstream result;
1249  result << NODE_ADMIN_ERROR_OK
1251  << NODE_ADMIN_COMMAND_DELIMITER << "iniFile=" << getIniFile()
1252  << NODE_ADMIN_COMMAND_DELIMITER << "nodeName=" << getNodeName()
1253  << NODE_ADMIN_COMMAND_DELIMITER << "dataProcessingMode=" << getDataProcessingMode()
1254  << NODE_ADMIN_COMMAND_DELIMITER << "avgSphinxSearchTime=" << (getStatSphinxTimeTotalSearch() / (getStatSphinxRequestsSearch() + 1))
1255  << NODE_ADMIN_COMMAND_DELIMITER << "avgSphinxAdminTime=" << (getStatSphinxTimeTotalAdmin() / (getStatSphinxRequestsAdmin() + 1))
1256  << NODE_ADMIN_COMMAND_DELIMITER << "sphinxSearchRequests=" << getStatSphinxRequestsSearch()
1257  << NODE_ADMIN_COMMAND_DELIMITER << "sphinxAdminRequests=" << getStatSphinxRequestsAdmin()
1258  << NODE_ADMIN_COMMAND_DELIMITER << "sphinxDataStorageProperties=" << getDataStorageSphinxProperties()
1259  << NODE_ADMIN_COMMAND_DELIMITER << "drceDataStorageProperties=" << getDataStorageDRCEProperties()
1260  << NODE_ADMIN_COMMAND_DELIMITER << "DRCERequests=" << getStatDRCERequests()
1261  << NODE_ADMIN_COMMAND_DELIMITER << "avgDRCETime=" << (getStatDRCETimeTotal() / (getStatDRCERequests() + 1))
1262  << NODE_ADMIN_COMMAND_DELIMITER << "nodeNumber=" << getNodeNumber();
1263  return result.str();
1264  }
1265 
1267  std::stringstream ret;
1282 
1283  return ret.str();
1284  }
1285 
1287  std::stringstream ret;
1300 
1304 
1308 
1312 
1316 
1320 
1324 
1328 
1332 
1336 
1340 
1344  << "countAsyncTasksFail" << NODE_ADMIN_COMMAND_PROPERTY_ITEM_PAIR_DELIMITER << statVariables.countAsyncTasksFail;
1345 
1346  return ret.str();
1347  }
1348 
1351 
1352  logger.log(NODE_LOG_MODE_DEBUG) << NODE_MSG_SPHINX << " admin command requested [" << command << "]" << flush;
1353 
1355  HCE::sphinx::SphinxInputJsonMessage inputMsg(command);
1356  if(!inputMsg.isError()){
1357  if(_sphinxFO->getErrorCode()){
1358  logger.log(NODE_LOG_MODE_ERROR) << NODE_FUNCTIONAL_ERROR_SPHINX_ADMIN << " before process indexer state error "
1359  << _sphinxFO->getErrorCode() << " : " << _sphinxFO->getErrorMsg() << flush;
1361  _sphinxFO->resetError();
1362  }else{
1364  HCE::sphinx::SphinxOutputJsonMessage outputMessage = _sphinxFO->Process(inputMsg);
1365  //Sphinx FO error
1366  if(_sphinxFO->getErrorCode()){
1368  logger.log(NODE_LOG_MODE_ERROR) << NODE_FUNCTIONAL_ERROR_SPHINX_ADMIN << " after process indexer state error " << _sphinxFO->getErrorCode() << " : " << _sphinxFO->getErrorMsg()
1371  _sphinxFO->resetError();
1372  }
1373  //Output message error
1374  if(outputMessage.getErrorCode()){
1376  logger.log(NODE_LOG_MODE_ERROR) << NODE_FUNCTIONAL_ERROR_SPHINX_ADMIN << " output message error " << outputMessage.getErrorCode() << " : " << outputMessage.getErrorMessage() << flush;
1377  }
1378  //Get json message from output object
1379  if(!outputMessage.serialize(ret)){
1381  logger.log(NODE_LOG_MODE_ERROR) << NODE_FUNCTIONAL_ERROR_SPHINX_ADMIN << " output message serialization error " << outputMessage.getErrorCode() << " : "
1382  << outputMessage.getErrorMessage() << flush;
1383 
1386  }
1389  }
1390  }else{
1391  logger.log(NODE_LOG_MODE_ERROR) << NODE_FUNCTIONAL_ERROR_SPHINX_ADMIN << " deserialization error " << inputMsg.getErrorCode() << " : " << inputMsg.getErrorMsg() << flush;
1392  }
1393 
1395  unsigned long long statSphinxRequestsAdmin = getStatSphinxRequestsAdmin();
1396  setStatSphinxRequestsAdmin(++statSphinxRequestsAdmin);
1397 
1398  return ret;
1399  }
1400 
1404 
1406  while(getInProgress()){
1407  try{
1409  zmq::pollitem_t items[] = {{*_inprocDataSock, 0, ZMQ_POLLIN, 0}, {*_inprocAdminSock, 0, ZMQ_POLLIN, 0}};
1410 
1412  zmq::poll(items, 2, getPollTimeout());
1413 
1415  if(items[0].revents & ZMQ_POLLIN){
1417  }
1418 
1420  if(items[1].revents & ZMQ_POLLIN){
1422  }
1423 
1426 
1428  if (dumpRunTimeVariables()){
1432  dumpProperties();
1433  }
1434 
1435  }catch(zmq::error_t& error){
1436  if (EINTR!=error.num()){
1437  std::cout << "DataProcessorData E: " << error.what() << std::endl;
1438  }
1439  }catch(std::exception& e){
1440  std::cout << "DataProcessorData E: " << e.what() << std::endl;
1441  }catch(...){
1442  std::cout << "DataProcessorData E: Unknown exception" << std::endl;
1443  }
1444  }
1445 
1447  }else{
1448  logger.log(NODE_LOG_MODE_FATAL) << "DataProcessorData::process() aborted because not all required sockets created : " << NODE_CRITICAL_ERROR_MESSAGE << flush;
1450  }
1451 
1452  return NULL;
1453  }
1454 
1455  }
1456 }