6 Handler(name, ctx, nodeName, std::string(
""), startedAt, logPriority, iniFile), _inprocDataSock(nullptr), _tasksStateNotificationSock(nullptr),
7 _sphinxFO(nullptr), _drceFO(nullptr), _tasksStateNotificationFunctor(nullptr) {
280 for (HCE::drce::DRCEFunctionalObject::Dictionary::iterator iter=dictionary.begin();iter!=dictionary.end();++iter)
447 std::string(
""), std::string(
""));
457 std::string sphinx_home_dir =
"";
458 std::string sphinx_default_index;
462 std::string sphinx_ranker_expression =
"";
468 Poco::AutoPtr<Poco::Util::IniFileConfiguration> pConf(
new Poco::Util::IniFileConfiguration(iniFile));
513 }
catch(Poco::SyntaxException& e){
517 }
catch(Poco::Exception& e){
523 if(sphinx_home_dir ==
""){
// Local defaults for the DRCE settings. The string values start empty and the
// path-like ones (drce_home_dir, drce_tasks_data_dir, drce_tasks_status_dir)
// are compared against "" further down; the three *_max_*_size limits start at
// 0 and are tested for non-zero further down.
// NOTE(review): presumably these are overwritten from iniFile by the
// IniFileConfiguration created right after — the assignments are not visible
// in this excerpt, confirm against the full function.
565 std::string drce_home_dir =
"";
566 std::string drce_node_host =
"";
567 std::string drce_node_port =
"";
568 std::string drce_tasks_data_dir =
"";
569 std::string drce_tasks_status_dir =
"";
// Numeric defaults. NOTE(review): the units of the two *_period values are not
// stated anywhere in the visible code — TODO confirm (milliseconds?).
570 unsigned int drce_max_threads = 32;
571 unsigned int drce_tasks_queue_dump_period = 10000;
572 unsigned int drce_resources_monitor_period = 10000;
573 bool drce_state_notification_reset_error_state =
false;
574 std::string drce_env_vars =
"";
// Resource-extractor limits: threads/processes default to 1024, the size
// limits default to 0.
575 size_t drce_resources_extractor_max_threads = 1024;
576 size_t drce_resources_extractor_max_processes = 1024;
577 size_t drce_resources_extractor_max_disk_size = 0;
578 size_t drce_resources_extractor_max_vram_size = 0;
579 size_t drce_resources_extractor_max_rram_size = 0;
585 Poco::AutoPtr<Poco::Util::IniFileConfiguration> pConf(
new Poco::Util::IniFileConfiguration(iniFile));
704 }
catch(Poco::SyntaxException& e){
708 }
catch(Poco::Exception& e){
714 if(drce_home_dir ==
""){
724 if(drce_tasks_data_dir ==
""){
734 if(drce_tasks_status_dir ==
""){
786 if (drce_resources_extractor_max_disk_size)
789 if (drce_resources_extractor_max_vram_size)
792 if (drce_resources_extractor_max_rram_size)
// Look up the home directory of the real user id of this process.
// NOTE(review): getpwuid() returns a null pointer when no matching entry
// exists or on error; pw is dereferenced on the next line without a check,
// so a missing passwd entry would crash here. The returned struct may also be
// overwritten by later getpw* calls, so homedir should be copied if it must
// outlive them.
813 struct passwd *pw = getpwuid(getuid());
814 const char *homedir = pw->pw_dir;
830 Poco::AutoPtr<Poco::Util::IniFileConfiguration> pConf(
new Poco::Util::IniFileConfiguration(iniFile));
842 if(processingMode > 0){
850 }
catch(Poco::SyntaxException& e){
854 }
catch(Poco::Exception& e){
919 if(msgCoverIn.
getType() == HCE::types::MessageType::mtSphinx){
927 if(!msgCoverOut.serialize(ret)){
929 logger.
log(
NODE_LOG_MODE_ERROR) <<
"Cover object for Sphinx request serialize error " << msgCoverOut.getErrorCode() <<
" : " << msgCoverOut.getErrorMsg() <<
flush;
932 if(msgCoverIn.
getType() == HCE::types::MessageType::mtDrce){
940 if(!msgCoverOut.serialize(ret)){
942 logger.
log(
NODE_LOG_MODE_ERROR) <<
"Cover object for DRCE request serialize error " << msgCoverOut.getErrorCode() <<
" : " << msgCoverOut.getErrorMsg() <<
flush;
1006 ret = outputMessage.
getData();
// Parse the requested item count from messageBody, fill a vector of that size
// with pseudo-random values, sort it ascending and join it into a
// space-separated string.
// NOTE(review): items_number is declared uninitialized and the stream state is
// not checked after extraction; see the note on failed extraction semantics
// before relying on its value for a non-numeric messageBody.
// NOTE(review): the vector is sized directly from the parsed value — if
// messageBody can come from a remote peer, a large number means a large
// allocation. TODO confirm whether an upper bound is enforced elsewhere.
1123 unsigned int items_number;
1124 std::istringstream(messageBody) >> items_number;
1126 std::vector<unsigned long long> items(items_number);
1128 if(items_number > 0){
1129 for(
unsigned long long i = 0; i < items.size(); ++i){
// rand() % RAND_MAX is in [0, RAND_MAX - 1], so each item is in [1, RAND_MAX].
1130 items[i] = (rand() % RAND_MAX) + 1;
1139 sort(items.begin(), items.end(), std::less<unsigned long long>());
1143 std::ostringstream oss;
// All elements but the last are written through ostream_iterator<int>, i.e.
// each unsigned long long is converted to int before printing, while the last
// element is streamed as unsigned long long. With values limited to
// [1, RAND_MAX] the conversion does not change the printed text, but the
// iterator type does not match the element type.
// NOTE(review): items.end() - 1 and items.back() require a non-empty vector;
// whether these lines sit inside the items_number > 0 guard is not visible in
// this excerpt — TODO confirm.
1144 std::copy(items.begin(), items.end() - 1, std::ostream_iterator<int>(oss,
" "));
1145 oss << items.back();
1146 std::string body(oss.str());
1157 std::string messageId((
const char*)msg.
pop_front().c_str());
1159 std::string messageRoute((msg.
parts())?(
const char*)msg.
pop_front().c_str():
"");
1170 msg.
append(messageId.c_str());
1171 msg.
append(body.c_str());
1172 msg.
append(messageRoute.c_str());
1189 std::stringstream result;
1200 std::stringstream result;
// Parse the processing mode number from the textual parameters.
// NOTE(review): processingMode is declared without an initializer and the
// stream state is not checked. Under C++11 and later a failed extraction
// stores 0; under older library semantics the variable would be left
// untouched (i.e. indeterminate). TODO confirm the language standard in use or
// initialize explicitly.
1203 unsigned int processingMode;
1204 std::istringstream(parameters) >> processingMode;
1248 std::stringstream result;
1263 return result.str();
1267 std::stringstream
ret;
1287 std::stringstream
ret;
1415 if(items[0].revents & ZMQ_POLLIN){
1420 if(items[1].revents & ZMQ_POLLIN){
1436 if (EINTR!=error.
num()){
1437 std::cout <<
"DataProcessorData E: " << error.
what() <<
std::endl;
1439 }
catch(std::exception& e){
1440 std::cout <<
"DataProcessorData E: " << e.what() <<
std::endl;
1442 std::cout <<
"DataProcessorData E: Unknown exception" <<
std::endl;