26 std::string connectionString,
unsigned char nodeMode, int64_t startedAt,
27 unsigned char logPriority,
const std::string& iniFile) :
29 _previousClientNumber(0), calculatorAlgorithmData(), resourceUsageLimitsData(),
30 typeCalculatorAlgorithm(
CalculatorAlgorithm::Types::caDefault), _resourceUsageManager(logger){
121 if (!resourcesUsageAlg0Weights.empty()) {
142 if (!resourcesUsageLimits.empty()) {
383 std::string addr ((addrp)?addrp:
"");
385 std::vector<ClientWorkerItem> clientQueue;
397 std::string clientIdentity((
const char*)msg.
pop_front().c_str());
398 std::string messageId((
const char*)msg.
pop_front().c_str());
400 std::string messageRoute((msg.
parts())?(
const char*)msg.
pop_front().c_str():
"");
404 <<
" bytes, n=" << ++statRequests <<
flush;
425 std::string json(messageRoute);
426 size_t pos = json.find_last_of(
'}');
430 if (pos!=std::string::npos){
431 json = json.substr(0, pos+1);
443 if (!requestRouteMessage.
isError()){
445 std::vector<std::string> identityNames = std::forward<std::vector<std::string> >(requestRouteMessage.
getNodesNames());
461 nodeMode = managerRole;
466 if (!identityNames.empty()){
467 std::vector<std::string> routes;
468 for (
const std::string& routeName:routeNames){
469 for (
const std::string& identityName:identityNames){
471 <<
" identityName: " << identityName <<
" isEqual: " << std::boolalpha << (routeName==identityName) << flush;
472 if (routeName==identityName){
473 routes.push_back(identityName);
490 for (
size_t i=0;i<routeNames.size();++i){
494 if(routeNames.empty()){
503 std::stringstream routes;
520 std::string identity;
522 std::vector<ClientWorkerItem> clientsQueue;
523 for (
size_t i=0;i<routeNames.size();++i){
532 if (clientsQueue.empty()){
545 if (identity.empty()){
556 if (!clientsQueue.empty())
557 identity = clientsQueue[0].identity;
561 if (identity.empty()){
579 if (!resourceUsageLimitsChecker.
safeCheck(resourceData, errorMsg)) {
584 std::stringstream routes;
593 logger.
log(
NODE_LOG_MODE_DEBUG) <<
"REQUEST SENT_RR to inproc_proxy socket from [" << identity <<
"], message [" << messageId <<
"] body "
610 std::random_device rd;
611 std::mt19937_64 gen(rd());
612 std::uniform_real_distribution<double> dis(0, maxNumber);
614 return static_cast<size_t>(dis(gen));
619 std::string identity;
620 size_t queueIndex = 0;
626 if (clients.empty() || clients.size()==1){
633 if (!clients.empty()){
635 std::vector<ClientWorkerItem> clientsQueue(clients);
636 if (clientsQueue.size() > 1){
641 if (!clientsQueue.empty()){
645 identity = clientsQueue[0].identity;
647 for (queueIndex=0;queueIndex<clients.size();++queueIndex){
648 if (clients[queueIndex].identity == identity){
650 previousClientNumber = queueIndex;
661 size_t& previousClientNumber,
668 for (
size_t i=0;i<clients.size();++i){
669 if (!clients[i].resources.empty()){
673 nodeResourceDataList.
addItem(std::forward<HCE::handlers::CalculatorNodeResourceData>(resourceData));
681 if (!identity.empty()){
683 std::vector<double> listWeights = calculatorNodeExecution.
getListWeights(algorithmData, resourceUsageCollection);
691 for (
size_t i=0;i<clients.size();++i){
692 if (clients[i].identity==identity){
693 previousClientNumber = i;
701 const std::string& coverMessageBody,
703 const std::string& messageId,
704 const std::string& messageRoute,
709 msg.
append(messageId.c_str());
711 msg.
append(messageBody.c_str());
713 msg.
append(messageRoute.c_str());
715 std::stringstream clientsNumberStream;
718 msg.
append(clientsNumberStream.str().c_str());
726 clientMsg.
wrap(identity.c_str(), NULL);
727 clientMsg.
append(messageId.c_str());
728 clientMsg.
append(messageBody.c_str());
729 clientMsg.
append(messageRoute.c_str());
736 if (!iniFile.empty()){
741 Poco::AutoPtr<Poco::Util::IniFileConfiguration> pConf(
new Poco::Util::IniFileConfiguration(iniFile));
792 }
catch(Poco::SyntaxException& e){
796 }
catch(Poco::Exception& e){
917 std::string clientIdentity((
const char*)msg.
pop_front().c_str());
920 std::string body((msg.
body())?msg.
body():
"");
930 std::vector<ClientWorkerItem> clientQueue;
933 if (heartbeatMessage.
isError())
967 std::string clientIdentity((
const char*)msg.
pop_front().c_str());
976 std::string messageId((
const char*)msg.
pop_front().c_str());
983 std::string messageRoute((msg.
parts())?(
const char*)msg.
pop_front().c_str():
"");
989 std::stringstream clientsNumberStream;
991 if (pos!=std::string::npos && (pos+1)!=std::string::npos) {
992 unsigned int clientsNumber = 0;
993 std::istringstream(messageRoute.substr(pos+1)) >> clientsNumber;
995 clientsNumberStream << clientsNumber;
1000 messageRoute = messageRoute.substr(0, pos);
1005 logger.
log(
NODE_LOG_MODE_TRACE) <<
"DataServerProxy::processDataMessage messageRoute: '" << messageRoute <<
"', clientsNumberStream: '" << clientsNumberStream.str() <<
"'" <<
flush;
1008 msg.
append(messageId.c_str());
1013 msg.
append(messageRoute.c_str());
1016 msg.
append(clientsNumberStream.str().c_str());
1026 const std::string& messageRoute,
1030 std::string resultMessageBody;
1034 case HCE::types::MessageType::mtSphinx: {
1038 if (responseType==EmptyResponseType::ertNoClients){
1042 }
else if(responseType==EmptyResponseType::ertBadNodeMode){
1046 }
else if(responseType==EmptyResponseType::ertNotEnoughResourceLimits){
1052 if (!errorMsg.empty()){
1057 errorMessage.str());
1060 case HCE::types::MessageType::mtDrce: {
1064 if (responseType==EmptyResponseType::ertNoClients){
1069 else if(responseType==EmptyResponseType::ertBadNodeMode){
1073 }
else if(responseType==EmptyResponseType::ertNotEnoughResourceLimits){
1079 if (!errorMsg.empty()){
1083 HCE::drce::DRCETaskRequest::TaskState::NOT_SET_AS_NEW,
1085 errorMessage.str());
1091 std::string jsonMsg;
1092 msgCover.
setData(resultMessageBody);
1128 std::stringstream result;
1147 unsigned int nodeMode =
static_cast<unsigned int>(
getNodeMode());
1149 std::istringstream(parameters) >> nodeMode;
1153 setNodeMode(static_cast<unsigned char>(nodeMode));
1160 std::istringstream(parameters) >> resourcesUsageCollectedSize;
1173 std::istringstream(parameters) >> purgeMode;
1184 std::istringstream(parameters) >> hbDelay;
1194 std::istringstream(parameters) >> hbTimeout;
1205 std::istringstream(parameters) >> hbMode;
1211 std::vector<ClientWorkerItem> clientsQueue;
1215 ret = routeMessage.
build();
1218 std::string json =
"";
1222 resourceUsageSerializator.
serialize(json);
1223 if (resourceUsageSerializator.
isError())
1235 std::stringstream result;
1253 return result.str();
1290 << Poco::DateTimeFormatter::format(Poco::LocalDateTime(Poco::Timestamp(Poco::Timestamp::fromEpochTime(
_heartbeatAt/1000))),
"%Y-%m-%d %H:%M:%S") <<
"'" <<
flush;
1297 it->shiftCounter.responceTimeStamp(it->rtimestamp);
1301 <<
" _shiftCounter.getShift()= " << it->shiftCounter.getShift() <<
flush;
1309 it->shiftCounter.reset();
1312 << Poco::DateTimeFormatter::format(Poco::LocalDateTime(Poco::Timestamp(Poco::Timestamp::fromEpochTime(
_heartbeatAt/1000))),
"%Y-%m-%d %H:%M:%S") <<
"'" <<
flush;
1332 msg.
wrap(identity.c_str(), NULL);
1341 std::vector<zmq::pollitem_t> items;
1343 items ={{*
_dataServerSock, 0, ZMQ_POLLIN, 0},{*
_inprocProxySock, 0, ZMQ_POLLIN, 0},{*
_inprocAdminSock, 0, ZMQ_POLLIN, 0}};
1353 if(items[0].revents & ZMQ_POLLIN){
1359 if(items[items.size() - 2].revents & ZMQ_POLLIN){
1364 if(items[items.size() - 1].revents & ZMQ_POLLIN){
1386 if (EINTR!=error.
num()){
1387 std::cout <<
"DataServerProxy E: " << error.
what() <<
std::endl;
1389 }
catch(std::exception& e){
1390 std::cout <<
"DataServerProxy E: " << e.what() <<
std::endl;
1392 std::cout <<
"DataServerProxy E: Unknown exception" <<
std::endl;