hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DataServerProxy.cpp
Go to the documentation of this file.
1 #include <stdlib.h>
2 #include <time.h>
3 #include <random>
4 
5 #include "DataServerProxy.hpp"
6 #include "ServiceMessages.hpp"
7 //DRCE FO
9 #include "DRCEMessageConst.hpp"
10 #include "DRCEError.hpp"
11 //Sphinx FO
13 #include "SphinxMessageConst.hpp"
14 #include "SphinxError.hpp"
15 //Message cover object
16 #include "JsonMessageCover.hpp"
17 //Resource extractor
19 //Resource usage checker
21 
22 namespace HCE {
23  namespace handlers {
24 
// Constructor.
// Passes name, ctx, clientIdentity, connectionString, startedAt, logPriority and
// iniFile on to Handler, builds DataNodeOptions from Handler::handlerProperties,
// starts _previousClientNumber at 0, default-constructs calculatorAlgorithmData
// and resourceUsageLimitsData, selects CalculatorAlgorithm::Types::caDefault and
// hands the logger to _resourceUsageManager.
// The body then applies nodeMode via setNodeMode() and calls initialize().
// NOTE(review): listing line 33 is not present in this view - confirm against
// the real source that nothing else runs between setNodeMode() and initialize().
25  DataServerProxy::DataServerProxy(std::string name, zmq::context_t& ctx, const std::string& clientIdentity,
26  std::string connectionString, unsigned char nodeMode, int64_t startedAt,
27  unsigned char logPriority, const std::string& iniFile) :
28  Handler(name, ctx, clientIdentity, connectionString, startedAt, logPriority, iniFile), DataNodeOptions(Handler::handlerProperties),
29  _previousClientNumber(0), calculatorAlgorithmData(), resourceUsageLimitsData(),
30  typeCalculatorAlgorithm(CalculatorAlgorithm::Types::caDefault), _resourceUsageManager(logger){
31 
32  setNodeMode(nodeMode);
34  initialize();
35  }
36 
38  deinitialize();
39  }
40 
42  logger.log(NODE_LOG_MODE_INFORMATION) << "Initialize client [" << getClientIdentity() << "] with connection [" << getConnectionString() << "]" << flush;
43 
45  _inprocProxySock = NULL;
46  _inprocReducerInSock = NULL;
47  _dataServerSock = NULL;
48 
51 
53  _heartbeatAt = s_clock() + getHeartbeatDelay();
54 
56  _pstatsAt = s_clock() + STAT_INTERVAL;
57 
60 
62  checkInit();
63 
69 
72 
79 
80  return true;
81  }
82 
84  if(_inprocProxySock){
85  delete _inprocProxySock;
86  }
88  delete _inprocReducerInSock;
89  }
90  if(_dataServerSock){
91  delete _dataServerSock;
92  }
93  _inprocProxySock = NULL;
94  _inprocReducerInSock = NULL;
95  _dataServerSock = NULL;
96  _clientsQueue.clear();
97  if(_clientsManager){
98  delete _clientsManager;
99  }
101  }
102 
103  void DataServerProxy::setNodeMode(unsigned char nodeMode){
105  }
106 
107  unsigned char DataServerProxy::getNodeMode(void){
109  }
110 
// Stores the numeric algorithm id in typeCalculatorAlgorithm.
// The value is cast to CalculatorAlgorithm::Types without any range check.
// NOTE(review): listing line 113 is not present in this view.
111  void DataServerProxy::setResourcesUsageAlg(unsigned int resourcesUsageAlg){
112  typeCalculatorAlgorithm = static_cast<CalculatorAlgorithm::Types>(resourcesUsageAlg);
114  }
115 
118  }
119 
// Accepts the algorithm-0 weights as a string; an empty string is ignored.
// A non-empty string is fed to a DictionaryConverter<std::string, double>.
// NOTE(review): listing lines 123-124, which use the converter, are not present
// in this view - presumably they store the converted weights; confirm.
120  void DataServerProxy::setResourcesUsageAlg0Weights(const std::string& resourcesUsageAlg0Weights){
121  if (!resourcesUsageAlg0Weights.empty()) {
122  HCE::handlers::DictionaryConverter<std::string, double> converter(resourcesUsageAlg0Weights);
125  }
126  }
127 
130  }
131 
// Forwards the order value to calculatorAlgorithmData.setOrder().
// NOTE(review): listing line 134 is not present in this view.
132  void DataServerProxy::setResourcesUsageOrder(unsigned int resourcesUsageOrder){
133  calculatorAlgorithmData.setOrder(resourcesUsageOrder);
135  }
136 
139  }
140 
// Accepts the resource usage limits as a string; an empty string is ignored.
// A non-empty string is fed to a DictionaryConverter<std::string, double>.
// NOTE(review): listing lines 144-145, which use the converter, are not present
// in this view - presumably they store the converted limits; confirm.
141  void DataServerProxy::setResourcesUsageLimits(const std::string& resourcesUsageLimits){
142  if (!resourcesUsageLimits.empty()) {
143  HCE::handlers::DictionaryConverter<std::string, double> converter(resourcesUsageLimits);
146  }
147  }
148 
151  }
152 
153  void DataServerProxy::setResourcesUsageCollectedSize(unsigned int resourcesUsageCollectedSize){
155  }
156 
159  }
160 
161  void DataServerProxy::setResourcesUsageMaxAllowedRange(double resourcesUsageMaxAllowedRange){
163  }
164 
167  }
168 
169  void DataServerProxy::setResourcesExtractorMaxThreads(unsigned int resourcesExtractorMaxThreads){
172  }
173 
176  }
177 
178  void DataServerProxy::setResourcesExtractorMaxProcesses(unsigned int resourcesExtractorMaxProcesses){
181  }
182 
185  }
186 
// Passes the maximum disk size to the ResourceUsageExtractor instance
// obtained through getInstance().
// NOTE(review): listing line 188 is not present in this view.
187  void DataServerProxy::setResourcesExtractorMaxDiskSize(unsigned int resourcesExtractorMaxDiskSize){
189  HCE::drce::ResourceUsageExtractor::getInstance().setMaxDiskSize(resourcesExtractorMaxDiskSize);
190  }
191 
194  }
195 
// Passes the maximum VRAM size to the ResourceUsageExtractor instance
// obtained through getInstance().
// NOTE(review): listing line 197 is not present in this view.
196  void DataServerProxy::setResourcesExtractorMaxVramSize(unsigned int resourcesExtractorMaxVramSize){
198  HCE::drce::ResourceUsageExtractor::getInstance().setMaxVramSize(resourcesExtractorMaxVramSize);
199  }
200 
203  }
204 
// Passes the maximum RRAM size to the ResourceUsageExtractor instance
// obtained through getInstance().
// NOTE(review): listing line 206 is not present in this view.
205  void DataServerProxy::setResourcesExtractorMaxRramSize(unsigned int resourcesExtractorMaxRramSize){
207  HCE::drce::ResourceUsageExtractor::getInstance().setMaxRramSize(resourcesExtractorMaxRramSize);
208  }
209 
212  }
213 
215  void DataServerProxy::setStatClientRequests(unsigned int statClientRequests){
217  }
218 
221  }
222 
223  void DataServerProxy::setStatClientResponses(unsigned int statClientResponses){
225  }
226 
229  }
230 
231  void DataServerProxy::setStatRequests(unsigned int statRequests){
233  }
234 
237  }
238 
239  void DataServerProxy::setStatClientRequestsSize(unsigned int statClientRequestsSize){
241  }
242 
245  }
246 
247  void DataServerProxy::setStatClientResponsesSize(unsigned int statClientResponsesSize){
249  }
250 
253  }
254 
255  void DataServerProxy::setStatRequestsSize(unsigned int statRequestsSize){
257  }
258 
261  }
262 
263  void DataServerProxy::setPeriodicStatClientRequests(unsigned int periodicStatClientRequests){
265  }
266 
269  }
270 
271  void DataServerProxy::setPeriodicStatClientResponses(unsigned int periodicStatClientResponses){
273  }
274 
277  }
278 
279  void DataServerProxy::setStatRequestsProcessed(unsigned int statRequestsProcessed){
281  }
282 
285  }
286 
287  void DataServerProxy::setStatRequestsProcessedSize(unsigned int statRequestsProcessedSize){
289  }
290 
293  }
294 
295  void DataServerProxy::setPeriodicStatClientRequestsSize(unsigned int periodicStatClientRequestsSize){
297  }
298 
301  }
302 
303  void DataServerProxy::setPeriodicStatClientResponsesSize(unsigned int periodicStatClientResponsesSize){
305  }
306 
309  }
310 
311  void DataServerProxy::setStatQPSClient(float statQPSClient){
313  }
314 
317  }
318 
319  void DataServerProxy::setStatQPSServer(float statQPSServer){
321  }
322 
325  }
326 
327  void DataServerProxy::setStatRPSClient(float statRPSClient){
329  }
330 
333  }
334 
335  void DataServerProxy::setStatRPSServer(float statRPSServer){
337  }
338 
341  }
342 
343  void DataServerProxy::setStatRequestsBandwidthClient(float statRequestsBandwidthClient){
345  }
346 
349  }
350 
351  void DataServerProxy::setStatRequestsBandwidthServer(float statRequestsBandwidthServer){
353  }
354 
357  }
358 
359  void DataServerProxy::setStatResponsesBandwidthClient(float statResponsesBandwidthClient){
361  }
362 
365  }
366 
367  void DataServerProxy::setStatResponsesBandwidthServer(float statResponsesBandwidthServer){
369  }
370 
373  }
374 
377  zmsg msg(*_inprocProxySock);
378  logger.log(NODE_LOG_MODE_TRACE) << "MSG FROM inproc_proxy" << endl << msg.dump() << flush;
379 
380  if (msg.parts() == NODE_MSG_CMD_MIN_FIELDS){
381 
382  char* addrp = msg.address();
383  std::string addr ((addrp)?addrp:"");
384  if (addr.compare(0, NODE_MSG_ROUTES.length(), NODE_MSG_ROUTES)==0){
385  std::vector<ClientWorkerItem> clientQueue;
386  RouteMessage routeMessage(clientQueue);
387  routeMessage.extract(NODE_MSG_ROUTES, addr);
388 
389  _clientsManager->append(clientQueue);
390 
391  RouteMessage routeMsg(_clientsQueue);
392 
393  s_send(*_inprocReducerInSock, routeMsg.build(NODE_MSG_ROUTES));
394  }
395  }else{
397  std::string clientIdentity((const char*)msg.pop_front().c_str());
398  std::string messageId((const char*)msg.pop_front().c_str());
399  std::string messageBody((const char*)msg.pop_front().c_str());
400  std::string messageRoute((msg.parts())?(const char*)msg.pop_front().c_str():"");
401 
402  unsigned int statRequests = getStatRequests();
403  logger.log(NODE_LOG_MODE_DEBUG) << "RECEIVED REQUEST from upper server [" << clientIdentity << "], message [" << messageId << "] body " << messageBody.size()
404  << " bytes, n=" << ++statRequests << flush;
405  setStatRequests(statRequests);
406 
407  if(_clientsQueue.empty()){
409  sendEmptyResponseMessage(msg, messageBody, EmptyResponseType::ertNoClients, messageId, messageRoute);
410  logger.log(NODE_LOG_MODE_DEBUG) << "No client`s connection registered!" << flush;
411  logger.log(NODE_LOG_MODE_TRACE) << msg.dump() << flush;
412  }else{
414  std::vector<std::string> routeNames = std::forward<std::vector<std::string> >(RouteMessage::extractRouteNames(_clientsQueue));
415 
416  unsigned int nodeMode(getNodeMode());
417  CalculatorAlgorithm::Types resourcesUsageAlgorithm = CalculatorAlgorithm::Types::caDefault;
420 
421  if (!messageRoute.empty() && nodeMode != NODE_MODE_ROUTER){
422  logger.log(NODE_LOG_MODE_TRACE) << "!!! DataServerProxy::handleInternalMessage >>> (!messageRoute.empty() && nodeMode != NODE_MODE_ROUTER) <<<" << flush;
423 
425  std::string json(messageRoute);
426  size_t pos = json.find_last_of('}');
427 
428  logger.log(NODE_LOG_MODE_TRACE) << "!!! DataServerProxy::handleInternalMessage pos = " << pos << flush;
429 
430  if (pos!=std::string::npos){
431  json = json.substr(0, pos+1);
432 
433  logger.log(NODE_LOG_MODE_TRACE) << "!!! DataServerProxy::handleInternalMessage json(1): '" << json << "'" << flush;
434  }else{
435  json.clear();
436  }
438  if (!json.empty()){
439  logger.log(NODE_LOG_MODE_TRACE) << "!!! DataServerProxy::handleInternalMessage json(2): '" << json << "'" << flush;
440 
442  RequestRouteMessage requestRouteMessage(json); //deserialize route json
443  if (!requestRouteMessage.isError()){
444  unsigned int managerRole = requestRouteMessage.getManagerRole();
445  std::vector<std::string> identityNames = std::forward<std::vector<std::string> >(requestRouteMessage.getNodesNames());
447  resourcesUsageAlgorithm = (static_cast<CalculatorAlgorithm::Types>(requestRouteMessage.getResourcesUsageAlgorithm()) != typeCalculatorAlgorithm)?
450  if (!requestRouteMessage.getResourcesUsageAlgorithmWeights().empty()){
452  algorithmData = converter.toObject().cast<HCE::handlers::CalculatorAlgorithmData>();
453  }
455  if (!requestRouteMessage.getResourcesUsageLimits().empty()){
457  limitsData = converter.toObject().cast<HCE::handlers::ResourceUsageLimitsData>();
458  }
460  if (managerRole == NODE_MODE_PROXY || managerRole == NODE_MODE_SHARD || managerRole == NODE_MODE_SHARD_RND || managerRole == NODE_MODE_SHARD_RU)
461  nodeMode = managerRole;
462 
463  logger.log(NODE_LOG_MODE_TRACE) << "!!! DataServerProxy::handleInternalMessage routeNames.size() = " << routeNames.size() << flush;
464  logger.log(NODE_LOG_MODE_TRACE) << "!!! DataServerProxy::handleInternalMessage identityNames.size() = " << identityNames.size() << flush;
465 
466  if (!identityNames.empty()){
467  std::vector<std::string> routes;
468  for (const std::string& routeName:routeNames){
469  for (const std::string& identityName:identityNames){
470  logger.log(NODE_LOG_MODE_TRACE) << "!!! DataServerProxy::handleInternalMessage routeName: " << routeName
471  << " identityName: " << identityName << " isEqual: " << std::boolalpha << (routeName==identityName) << flush;
472  if (routeName==identityName){
473  routes.push_back(identityName);
474  }
475  }
476  }
477  routeNames = routes;
478  }
479  }else{
480  logger.log(NODE_LOG_MODE_DEBUG) << "!!! DataServerProxy::handleInternalMessage ErrorMsg: " << requestRouteMessage.getErrorMsg() << flush;
481  logger.log(NODE_LOG_MODE_DEBUG) << "!!! DataServerProxy::handleInternalMessage messageRoute: " << messageRoute << flush;
482  }
483  }
484  }
485 
486  logger.log(NODE_LOG_MODE_TRACE) << "!!! DataServerProxy::handleInternalMessage getNodeMode(): " << getNodeMode() << flush;
487  logger.log(NODE_LOG_MODE_TRACE) << "!!! DataServerProxy::handleInternalMessage nodeMode: " << nodeMode << flush;
488  logger.log(NODE_LOG_MODE_TRACE) << "!!! DataServerProxy::handleInternalMessage routeNames.size() = " << routeNames.size() << flush;
489 
490  for (size_t i=0;i<routeNames.size();++i){
491  logger.log(NODE_LOG_MODE_TRACE) << "!!! DataServerProxy::handleInternalMessage routeNames[" << i << "] = " << routeNames[i] << flush;
492  }
493 
494  if(routeNames.empty()){
496  sendEmptyResponseMessage(msg, messageBody, EmptyResponseType::ertNoClients, messageId, messageRoute);
497  logger.log(NODE_LOG_MODE_DEBUG) << "No valid route names!" << flush;
498  logger.log(NODE_LOG_MODE_TRACE) << msg.dump() << flush;
499 
501  }else if(nodeMode == NODE_MODE_PROXY || nodeMode == NODE_MODE_ROUTER){
502 
503  std::stringstream routes;
504  routes << messageRoute << ROUTE_LIST_COUNT_DELIMITER << _clientsQueue.size();
506  for (size_t i=0;i<_clientsQueue.size();++i){
508  sendResponseMessage(_clientsQueue[i].identity, messageId, messageBody, routes.str());
509 
510  unsigned int statClientRequests = getStatClientRequests();
511  setStatClientRequests(++statClientRequests);
513 
514  logger.log(NODE_LOG_MODE_DEBUG) << "REQUEST SENT_ALL to dataserver socket for client [" << _clientsQueue[i].identity
515  << "], message [" << messageId << "] body " << messageBody.size() << ", n=" << getStatClientRequests() << flush;
516  }
517  }else{
518 
519  if(nodeMode == NODE_MODE_SHARD || nodeMode == NODE_MODE_SHARD_RND || nodeMode == NODE_MODE_SHARD_RU) {
520  std::string identity;
522  std::vector<ClientWorkerItem> clientsQueue;
523  for (size_t i=0;i<routeNames.size();++i){
524  for (size_t k=0;k<_clientsQueue.size();++k){
525  if (routeNames[i] == _clientsQueue[k].identity){
526  clientsQueue.push_back(ClientWorkerItem(_clientsQueue[k]));
527  break;
528  }
529  }
530  }
531 
532  if (clientsQueue.empty()){
534  sendEmptyResponseMessage(msg, messageBody, EmptyResponseType::ertNoClients, messageId, messageRoute);
535  logger.log(NODE_LOG_MODE_DEBUG) << "No valid route names!" << flush;
536  logger.log(NODE_LOG_MODE_TRACE) << msg.dump() << flush;
537  }else{
538 
539  if (nodeMode == NODE_MODE_SHARD_RND){
542  }else if(nodeMode == NODE_MODE_SHARD_RU){
544  identity = getIdentityResourceUsage(clientsQueue, _previousClientNumber, resourcesUsageAlgorithm, algorithmData);
545  if (identity.empty()){
547  }
549  logger.log(NODE_LOG_MODE_DEBUG) << "Choosen identity: '" << identity << "'" << flush;
550  }else{
552  std::sort(clientsQueue.begin(), clientsQueue.end(), ClientWorkerItem::sortByFrequencyPredicate);
554  _clientsManager->incrementFrequency(clientsQueue[0].identity);
556  if (!clientsQueue.empty())
557  identity = clientsQueue[0].identity;
558  }
559 
560 
561  if (identity.empty()){
563  sendEmptyResponseMessage(msg, messageBody, EmptyResponseType::ertNoClients, messageId, messageRoute);
564  logger.log(NODE_LOG_MODE_DEBUG) << "Empty client name cannot be use for routing balance!" << flush;
565  logger.log(NODE_LOG_MODE_TRACE) << msg.dump() << flush;
566  }else{
567 
568  logger.log(NODE_LOG_MODE_DEBUG) << "resourcesUsageAlgorithm = " << (unsigned int)resourcesUsageAlgorithm << flush;
569  logger.log(NODE_LOG_MODE_DEBUG) << "algorithmData = " << HCE::handlers::DictionaryConverter<std::string, double>(algorithmData).toString() << flush;
570  logger.log(NODE_LOG_MODE_DEBUG) << "limitsData = " << HCE::handlers::DictionaryConverter<std::string, double>(limitsData).toString() << flush;
571 
572  ResourceUsageCollection resourceUsageCollection = _clientsManager->getResources(identity);
573 
576 
577  std::string errorMsg;
578  HCE::handlers::ResourceUsageLimitsChecker resourceUsageLimitsChecker(limitsData);
579  if (!resourceUsageLimitsChecker.safeCheck(resourceData, errorMsg)) {
581  sendEmptyResponseMessage(msg, messageBody, EmptyResponseType::ertNotEnoughResourceLimits, messageId, messageRoute, errorMsg);
582  logger.log(NODE_LOG_MODE_TRACE) << "Node '" << identity << "' has not allowed resources usage limits, message skipped" << endl << msg.dump() << flush;
583  }else{
584  std::stringstream routes;
585  routes << messageRoute << ROUTE_LIST_COUNT_DELIMITER << 1;
587  sendResponseMessage(identity, messageId, messageBody, routes.str());
588 
589  unsigned int statClientRequests = getStatClientRequests();
590  setStatClientRequests(++statClientRequests);
592 
593  logger.log(NODE_LOG_MODE_DEBUG) << "REQUEST SENT_RR to inproc_proxy socket from [" << identity << "], message [" << messageId << "] body "
594  << messageBody.size() << ", n=" << getStatClientRequests() << flush;
595  }
596  }
597  }
598  }else{
600  sendEmptyResponseMessage(msg, messageBody, EmptyResponseType::ertBadNodeMode, messageId, messageRoute);
601  logger.log(NODE_LOG_MODE_TRACE) << "Node mode value " << getNodeMode() << " not supported, message skipped" << endl << msg.dump() << flush;
602  }
603  }
604  }
605  }
606  }
607 
608  size_t DataServerProxy::getRandomNodeNumber(size_t maxNumber){
610  std::random_device rd;
611  std::mt19937_64 gen(rd());
612  std::uniform_real_distribution<double> dis(0, maxNumber);
614  return static_cast<size_t>(dis(gen));
615  }
616 
617  std::string DataServerProxy::getIdentityRandom(std::vector<ClientWorkerItem>& clients, size_t& previousClientNumber){
618  logger.log(NODE_LOG_MODE_TRACE) << "NODE_MODE_SHARD_RND clients count = " << clients.size() << " previousClientNumber = " << _previousClientNumber << flush;
619  std::string identity;
620  size_t queueIndex = 0;
621  do{
622  queueIndex = getRandomNodeNumber(clients.size());
623 
624  logger.log(NODE_LOG_MODE_TRACE) << "NODE_MODE_SHARD_RND getRandomNodeNumber() return queueIndex = " << queueIndex << flush;
625 
626  if (clients.empty() || clients.size()==1){
627  logger.log(NODE_LOG_MODE_TRACE) << "NODE_MODE_SHARD_RND do-while broken because: clients count==1" << flush;
628  break;
629  }
630 
631  }while(queueIndex==_previousClientNumber);
632 
633  if (!clients.empty()){
634 
635  std::vector<ClientWorkerItem> clientsQueue(clients);
636  if (clientsQueue.size() > 1){
638  clientsQueue.erase(clientsQueue.begin()+_previousClientNumber);
639  }
640 
641  if (!clientsQueue.empty()){
643  std::sort(clientsQueue.begin(), clientsQueue.end(), ClientWorkerItem::sortByFrequencyPredicate);
645  identity = clientsQueue[0].identity;
646 
647  for (queueIndex=0;queueIndex<clients.size();++queueIndex){
648  if (clients[queueIndex].identity == identity){
650  previousClientNumber = queueIndex;
651  break;
652  }
653  }
654  }
655  }
657  return identity;
658  }
659 
// Chooses a client identity for NODE_MODE_SHARD_RU routing from resource usage data.
//
// clients               candidates; entries with empty `resources` are not offered to the calculator
// previousClientNumber  out: set to the index in `clients` whose identity equals the result, if any
// typeAlgorithm         algorithm type handed to CalculatorNodeExecution
// algorithmData         algorithm parameters handed to calculate() and getListWeights()
// returns               the identity produced by calculate(); may be empty
//
// NOTE(review): listing lines 670 and 686 are not present in this view. Line 686
// evidently opens the conditional that guards identity.clear() below (the visible
// braces do not balance without it); its condition cannot be seen here - confirm
// against the real source before changing this function.
660  std::string DataServerProxy::getIdentityResourceUsage(std::vector<ClientWorkerItem>& clients,
661  size_t& previousClientNumber,
662  CalculatorAlgorithm::Types typeAlgorithm,
663  const CalculatorAlgorithmData& algorithmData){
664 
665  logger.log(NODE_LOG_MODE_TRACE) << "NODE_MODE_SHARD_RU client count = " << clients.size() << flush;
666 
// Build one CalculatorNodeResourceData per client that reported resources,
// from the average computed by _resourceUsageManager.
667  HCE::handlers::NodeResourceDataList nodeResourceDataList;
668  for (size_t i=0;i<clients.size();++i){
669  if (!clients[i].resources.empty()){
671  HCE::handlers::DictionaryConverter<std::string, double> converter(_resourceUsageManager.calculateAverage(clients[i].identity, clients[i].resources));
672  HCE::handlers::CalculatorNodeResourceData resourceData = converter.toObject().cast<HCE::handlers::CalculatorNodeResourceData, std::string>(clients[i].identity);
673  nodeResourceDataList.addItem(std::forward<HCE::handlers::CalculatorNodeResourceData>(resourceData));
674 
675  logger.log(NODE_LOG_MODE_TRACE) << i << ") [ " << clients[i].identity << " ] avg: '" << converter.toString() << "'" << flush;
676  }
677  }
678 
// Let the calculator pick an identity, then record the weights computed for
// that identity's resources in _resourceUsageManager.
679  HCE::handlers::CalculatorNodeExecution calculatorNodeExecution(typeAlgorithm);
680  std::string identity = calculatorNodeExecution.calculate(algorithmData, nodeResourceDataList, _resourceUsageManager);
681  if (!identity.empty()){
682  ResourceUsageCollection resourceUsageCollection = _clientsManager->getResources(identity);
683  std::vector<double> listWeights = calculatorNodeExecution.getListWeights(algorithmData, resourceUsageCollection);
684  _resourceUsageManager.setListWeights(identity, listWeights);
685 
687  identity.clear();
688  }
689  }
690 
// Report the position of the chosen identity in the caller's list; if the
// identity is empty or not found, previousClientNumber is left unchanged.
691  for (size_t i=0;i<clients.size();++i){
692  if (clients[i].identity==identity){
693  previousClientNumber = i;
694  break;
695  }
696  }
697  return identity;
698  }
699 
701  const std::string& coverMessageBody,
702  EmptyResponseType responseType,
703  const std::string& messageId,
704  const std::string& messageRoute,
705  const std::string& errorMsg){
707  std::string messageBody = makeEmptyResponseMessage(coverMessageBody, responseType, messageRoute, errorMsg);
709  msg.append(messageId.c_str());
711  msg.append(messageBody.c_str());
713  msg.append(messageRoute.c_str());
715  std::stringstream clientsNumberStream;
716  clientsNumberStream << _clientsQueue.size();
718  msg.append(clientsNumberStream.str().c_str());
721  }
722 
723  void DataServerProxy::sendResponseMessage(const std::string& identity, const std::string& messageId, const std::string& messageBody, const std::string& messageRoute){
725  zmsg clientMsg(getClientIdentity().c_str());
726  clientMsg.wrap(identity.c_str(), NULL);
727  clientMsg.append(messageId.c_str());
728  clientMsg.append(messageBody.c_str());
729  clientMsg.append(messageRoute.c_str());
731  clientMsg.send(*_dataServerSock);
732  }
733 
// Loads node settings from an ini file.
// Nothing visible runs when iniFile is empty. Otherwise the file is opened
// through Poco::Util::IniFileConfiguration and individual settings are read
// inside their own try blocks (only the RESOURCES_USAGE_ALG read is visible).
// The visible catch handlers log Poco::SyntaxException and Poco::Exception at
// ERROR level. Afterwards the effective values are logged at INFORMATION level.
// NOTE(review): most of this body (the other property reads and the handlers
// that belong to the bare closing braces) is not present in this view; treat
// the structure below as a skeleton and confirm against the real source.
734  void DataServerProxy::nodeInit(const std::string& iniFile){
735 
736  if (!iniFile.empty()){
737  logger.log(NODE_LOG_MODE_INFORMATION) << "load node settings from '" << iniFile << "' file" << flush;
738 
740  try{
741  Poco::AutoPtr<Poco::Util::IniFileConfiguration> pConf(new Poco::Util::IniFileConfiguration(iniFile));
742  try{
745  setResourcesUsageAlg(static_cast<unsigned int>(pConf->getInt(HCE::handlers::properties::RESOURCES_USAGE_ALG)));
746  }
750  }
751 
755  }
759  }
760 
764  }
765 
769  }
770 
774  }
775 
778  }
779 
782  }
783 
786  }
787 
790  }
791 
792  } catch(Poco::SyntaxException& e){
794  logger.log(NODE_LOG_MODE_ERROR) << "nodeInit() - data node ini file processing error: " << e.displayText() << flush;
795  }
796  } catch(Poco::Exception& e){
798  logger.log(NODE_LOG_MODE_ERROR) << "nodeInit() - data node ini file access error: " << e.displayText() << flush;
799  }
800 
804  }
805 
808  }
809 
812  }
813 
816  }
817 
820  }
821 
824  }
825 
828  }
829 
// Summary of the settings currently in effect.
830  logger.log(NODE_LOG_MODE_INFORMATION) << "node.resources_usage_order: " << calculatorAlgorithmData.getOrder() << flush;
831  logger.log(NODE_LOG_MODE_INFORMATION) << "node.resources_usage_alg: " << static_cast<unsigned int>(typeCalculatorAlgorithm) << flush;
832  logger.log(NODE_LOG_MODE_INFORMATION) << "node.resources_usage_alg0_weights: " << getResourcesUsageAlg0Weights() << flush;
833  logger.log(NODE_LOG_MODE_INFORMATION) << "node.resources_usage_limits: " << getResourcesUsageLimits() << flush;
834  logger.log(NODE_LOG_MODE_INFORMATION) << "node.resources_usage_collected_size: " << getResourcesUsageCollectedSize() << flush;
835  logger.log(NODE_LOG_MODE_INFORMATION) << "node.resources_usage_max_allowed_range: " << getResourcesUsageMaxAllowedRange() << flush;
836  logger.log(NODE_LOG_MODE_INFORMATION) << "drce.drce_resources_extractor_max_threads: " << getResourcesExtractorMaxThreads() << flush;
837  logger.log(NODE_LOG_MODE_INFORMATION) << "drce.drce_resources_extractor_max_processes: " << getResourcesExtractorMaxProcesses() << flush;
838  logger.log(NODE_LOG_MODE_INFORMATION) << "drce.drce_resources_extractor_max_disk_size: " << getResourcesExtractorMaxDiskSize() << flush;
839  logger.log(NODE_LOG_MODE_INFORMATION) << "drce.drce_resources_extractor_max_vram_size: " << getResourcesExtractorMaxVramSize() << flush;
840  logger.log(NODE_LOG_MODE_INFORMATION) << "drce.drce_resources_extractor_max_rram_size: " << getResourcesExtractorMaxRramSize() << flush;
841  }
842  }
843 
848 
851 
853  setStatRequests(0);
854 
857 
860 
863 
866 
869 
872 
875 
878 
881 
883  setStatQPSClient(0);
884 
886  setStatQPSServer(0);
887 
889  setStatRPSClient(0);
890 
892  setStatRPSServer(0);
893 
896 
899 
902 
905  }
906 
907  void DataServerProxy::registerClient(const std::string& identity){
908  _clientsManager->remove(identity);
909  _clientsManager->append(identity, false);
910 
911  logger.log(NODE_LOG_MODE_DEBUG) << "READY message from client [" << identity << "] received" << flush;
912  }
913 
915 
917  std::string clientIdentity((const char*)msg.pop_front().c_str());
918 
919  std::string addr((msg.address())?msg.address():"");
920  std::string body((msg.body())?msg.body():"");
921 
922  logger.log(NODE_LOG_MODE_TRACE) << "DataServerProxy::processControlMessage clientIdentity: " << clientIdentity << flush;
923  logger.log(NODE_LOG_MODE_TRACE) << "DataServerProxy::processControlMessage addr: " << addr << flush;
924  logger.log(NODE_LOG_MODE_TRACE) << "DataServerProxy::processControlMessage body: " << body << flush;
925 
926  if(addr == NODE_MSG_READY){
927  registerClient(clientIdentity);
928  }else if(addr.compare(0, NODE_MSG_HEARTBEAT.length(), NODE_MSG_HEARTBEAT)==0){
929 
930  std::vector<ClientWorkerItem> clientQueue;
931  HCE::handlers::HeartbeatMessage heartbeatMessage(clientQueue);
932  heartbeatMessage.extract(addr, NODE_MSG_HEARTBEAT);
933  if (heartbeatMessage.isError())
934  logger.log(NODE_LOG_MODE_ERROR) << "Operation extract heartbeat message has error: " << heartbeatMessage.getErrorMsg() << flush;
935  else
936  {
937  _clientsManager->refresh(clientIdentity, clientQueue, heartbeatMessage.getResources());
938 
939  logger.log(NODE_LOG_MODE_TRACE) << "DataServerProxy::processControlMessage _clientsManager->refresh(" << clientIdentity << ")" << flush;
940 
941  RouteMessage routeMsg(_clientsQueue);
942  std::string route = routeMsg.build(NODE_MSG_ROUTES);
943  if (routeMsg.isError())
944  logger.log(NODE_LOG_MODE_ERROR) << "Operation build route message has error: " << routeMsg.getErrorMsg() << flush;
945  else
946  {
947  logger.log(NODE_LOG_MODE_TRACE) << "DataServerProxy::processControlMessage route: " << route << flush;
948  s_send(*_inprocProxySock, route);
949  }
950  }
951  logger.log(NODE_LOG_MODE_DEBUG) << "HEARTBEAT message from client [" << clientIdentity << "] received" << flush;
952 
953  }else if(addr == NODE_MSG_BYE){
955  _clientsManager->remove(clientIdentity);
956 
957  logger.log(NODE_LOG_MODE_DEBUG) << "BYE message from client [" << clientIdentity << "] received" << flush;
958  }else{
960  logger.log(NODE_LOG_MODE_DEBUG) << "INVALID command message from client [" << clientIdentity << "]" << flush;
961  logger.log(NODE_LOG_MODE_TRACE) << msg.dump() << flush;
962  }
963  }
964 
967  std::string clientIdentity((const char*)msg.pop_front().c_str());
968 
969  if(!_clientsManager->refresh(clientIdentity)){
970  logger.log(NODE_LOG_MODE_DEBUG) << "MSG STRUCTURE ERROR wrong client identity [" << clientIdentity << "]"
971  << endl << "Clients:[" << _clientsManager->getDump() << "]" << flush;
972  logger.log(NODE_LOG_MODE_TRACE) << msg.dump() << flush;
973  }
974 
976  std::string messageId((const char*)msg.pop_front().c_str());
978  std::string messageBody((const char*)msg.pop_front().c_str());
980  logger.log(NODE_LOG_MODE_DEBUG) << "RECEIVED RESPONSE from down client [" << clientIdentity << "], message [" << messageId << "] body " << messageBody.size()
981  << " bytes n=" << getStatClientResponses() << flush;
982 
983  std::string messageRoute((msg.parts())?(const char*)msg.pop_front().c_str():"");
984 
985  logger.log(NODE_LOG_MODE_TRACE) << "DataServerProxy::processDataMessage messageRoute: " << messageRoute << flush;
986 
987  size_t pos = messageRoute.find_last_of(ROUTE_LIST_COUNT_DELIMITER);
989  std::stringstream clientsNumberStream;
990 
991  if (pos!=std::string::npos && (pos+1)!=std::string::npos) {
992  unsigned int clientsNumber = 0;
993  std::istringstream(messageRoute.substr(pos+1)) >> clientsNumber;
994  if (clientsNumber && clientsNumber < _clientsQueue.size()) {
995  clientsNumberStream << clientsNumber;
996  }
997  else
998  clientsNumberStream << _clientsQueue.size();
999 
1000  messageRoute = messageRoute.substr(0, pos);
1001  }else{
1002  clientsNumberStream << _clientsQueue.size();
1003  }
1004 
1005  logger.log(NODE_LOG_MODE_TRACE) << "DataServerProxy::processDataMessage messageRoute: '" << messageRoute << "', clientsNumberStream: '" << clientsNumberStream.str() << "'" << flush;
1006 
1008  msg.append(messageId.c_str());
1010  msg.append(messageBody.c_str());
1011 
1013  msg.append(messageRoute.c_str());
1014 
1016  msg.append(clientsNumberStream.str().c_str());
1017 
1019  msg.send(*_inprocReducerInSock);
1020 
1021  logger.log(NODE_LOG_MODE_DEBUG) << "RESPONSE SENT to inproc_reducer_in socket, message [" << messageId << "] body " << messageBody.size() << " bytes sent" << flush;
1022  }
1023 
1025  EmptyResponseType responseType,
1026  const std::string& messageRoute,
1027  const std::string& errorMsg){
1028 
1029  HCE::JsonMessageCover msgCover(messageBody);
1030  std::string resultMessageBody;
1031  std::stringstream errorMessage;
1032 
1033  switch(msgCover.getType()){
1034  case HCE::types::MessageType::mtSphinx: {
1035 
1036  unsigned int errorCode = HCE::sphinx::NO_ERROR;
1037 
1038  if (responseType==EmptyResponseType::ertNoClients){
1042  }else if(responseType==EmptyResponseType::ertBadNodeMode){
1046  }else if(responseType==EmptyResponseType::ertNotEnoughResourceLimits){
1050  }
1051 
1052  if (!errorMsg.empty()){
1053  errorMessage << ". " << errorMsg;
1054  }
1055  resultMessageBody = HCE::sphinx::SphinxFunctionalObject::makeErrorMessage(msgCover.getData(),
1056  errorCode,
1057  errorMessage.str());
1058  }
1059  break;
1060  case HCE::types::MessageType::mtDrce: {
1061 
1062  unsigned int errorCode = HCE::drce::NO_ERROR;
1063 
1064  if (responseType==EmptyResponseType::ertNoClients){
1068  }
1069  else if(responseType==EmptyResponseType::ertBadNodeMode){
1073  }else if(responseType==EmptyResponseType::ertNotEnoughResourceLimits){
1077  }
1078 
1079  if (!errorMsg.empty()){
1080  errorMessage << ". " << errorMsg;
1081  }
1082  resultMessageBody = HCE::drce::DRCEFunctionalObject::makeErrorMessage(msgCover.getData(),
1083  HCE::drce::DRCETaskRequest::TaskState::NOT_SET_AS_NEW,
1084  errorCode,
1085  errorMessage.str());
1086  }
1087  break;
1088  default:;
1089  }
1090 
1091  std::string jsonMsg;
1092  msgCover.setData(resultMessageBody);
1093  if(!msgCover.serialize(jsonMsg)){
1094  logger.log(NODE_LOG_MODE_ERROR) << "Serialization error of cover object from empty results " << msgCover.getErrorCode() << " : " << msgCover.getErrorMsg() << flush;
1095  }
1096 
1097  return jsonMsg;
1098  }
1099 
1102  zmsg msg(*_dataServerSock);
1103 
1104  logger.log(NODE_LOG_MODE_TRACE) << "MSG FROM dataserver" << endl << msg.dump() << flush;
1105 
1107  if(msg.parts() == NODE_MSG_DATA_MIN_FIELDS){
1109  processControlMessage(msg);
1110  }else if(msg.parts() > NODE_MSG_DATA_MIN_FIELDS){
1112  unsigned int statClientResponses = getStatClientResponses();
1113  setStatClientResponses(++statClientResponses);
1115  processDataMessage(msg);
1116  }else{
1117  logger.log(NODE_LOG_MODE_TRACE) << "ERROR of message structure, wrong fields number " << msg.parts() << endl << msg.dump() << flush;
1118  }
1119  }
1120 
// Executes a single admin command and returns the textual response.
//
// @param command    Command name; compared against the NODE_MSG_* constants.
// @param parameters Raw parameter string; numeric setters parse it with
//                   std::istringstream, NODE_MSG_UPDATE_SCHEMA passes it on
//                   unchanged as the new connection string.
// @return Response string. It is initialised to NODE_ADMIN_ERROR_OK and is
//         replaced only by the branches that assign `ret`.
//
// NOTE(review): several branches below show no statements in this listing
// (rebuild/disconnect server connection, the *_GET heartbeat commands, the
// resources-collected-size getter, the final else). Their bodies appear to be
// missing from this view, so nothing is documented for them here.
1121  std::string DataServerProxy::processAdminCommands(std::string& command, std::string& parameters){
1122  std::string ret = NODE_ADMIN_ERROR_OK;
1123 
1124  logger.log(NODE_LOG_MODE_DEBUG) << "Admin command requested [" << command << "], parameters [" << parameters << "]" << flush;
1125 
1127  if(command == NODE_MSG_STAT){
// Statistics report: status code, delimiter, then comma-separated key=value
// pairs. Note the key for the server-side request counter is spelled
// "Requests" with a capital R.
1128  std::stringstream result;
1129  result << NODE_ADMIN_ERROR_OK << NODE_ADMIN_COMMAND_DELIMITER << "clientRequests=" << getStatClientRequests() << ",clientResponses=" << getStatClientResponses() << ",clientRequestsSize="
1130  << getStatClientRequestsSize() << ",clientResponsesSize=" << getStatClientResponsesSize() << ",Requests=" << getStatRequests() << ",requestsSize=" << getStatRequestsSize();
1131  ret = result.str();
1132  }else if(command == NODE_MSG_rebuildServerConnection){
1135  }else if(command == NODE_MSG_DISCONNECT_SERVER_CONNECTION){
1138  }else if(command == NODE_MSG_UPDATE_SCHEMA){
// Only the stored connection string is replaced in the visible code.
1139  setConnectionString(parameters);
1140  }else if(command == NODE_MSG_PROPERTIES){
1142  ret = getPropertyInformation();
1143  }else if(command == NODE_MSG_MANAGER_MODE_GET){
1144  ret = NODE_ADMIN_ERROR_OK + NODE_ADMIN_COMMAND_DELIMITER + NODE_ADMIN_PROPERTY_NAME_MMODE + "=" + std::to_string(static_cast<unsigned int>(getNodeMode()));
1145  }else if(command == NODE_MSG_MANAGER_MODE_SET){
// The local is seeded with the current mode before the parse attempt.
1147  unsigned int nodeMode = static_cast<unsigned int>(getNodeMode());
1148  //Parse manager mode from parameters
1149  std::istringstream(parameters) >> nodeMode;
1150  //Make response
// NOTE(review): the response is built before setNodeMode() runs, so it carries
// the mode that was in effect before this command - confirm this is intended.
1151  ret = NODE_ADMIN_ERROR_OK + NODE_ADMIN_COMMAND_DELIMITER + NODE_ADMIN_PROPERTY_NAME_MMODE + "=" + std::to_string(static_cast<unsigned int>(getNodeMode()));
1152  //Set new manager mode
1153  setNodeMode(static_cast<unsigned char>(nodeMode));
1154  }else if(command == NODE_MSG_MANAGER_RESOURCES_COLLECTED_SIZE_GET){
1156  }else if(command == NODE_MSG_MANAGER_RESOURCES_COLLECTED_SIZE_SET){
1158  unsigned int resourcesUsageCollectedSize = getResourcesUsageCollectedSize();
1159  //Parse resources usage collected size from parameters
1160  std::istringstream(parameters) >> resourcesUsageCollectedSize;
1161  //Make response
1163  //Set new resources usage collected size
// The same limit is pushed to the resource usage manager, the clients manager
// and this object's own stored setting.
1164  _resourceUsageManager.setMaxSize(resourcesUsageCollectedSize);
1165  _clientsManager->setMaxResourcesUsageCollectedSize(resourcesUsageCollectedSize);
1166  setResourcesUsageCollectedSize(resourcesUsageCollectedSize);
1167  }else if(command == NODE_MSG_MANAGER_PURGE_MODE_GET){
// NOTE(review): the purge mode is reported under NODE_ADMIN_PROPERTY_NAME_MMODE,
// the same key used for the manager mode above - confirm this is intended.
1168  ret = NODE_ADMIN_ERROR_OK + NODE_ADMIN_COMMAND_DELIMITER + NODE_ADMIN_PROPERTY_NAME_MMODE + "=" + std::to_string(static_cast<unsigned int>(_clientsManager->getPurgeMode()));
1169  }else if(command == NODE_MSG_MANAGER_PURGE_MODE_SET){
1171  int purgeMode = 0;
1173  std::istringstream(parameters) >> purgeMode;
// As with the manager mode setter, the response is composed before the new
// value is applied.
1175  ret = NODE_ADMIN_ERROR_OK + NODE_ADMIN_COMMAND_DELIMITER + NODE_ADMIN_PROPERTY_NAME_MMODE + "=" + std::to_string(static_cast<unsigned int>(_clientsManager->getPurgeMode()));
1177  _clientsManager->setPurgeMode(purgeMode);
1178  }else if(command == NODE_MSG_HEARTBEAT_DELAY_GET){
1180  }else if(command == NODE_MSG_HEARTBEAT_DELAY_SET){
1182  int hbDelay = HEARTBEAT_DELAY;
1184  std::istringstream(parameters) >> hbDelay;
// The parsed value is multiplied by 1000 before storing (presumably seconds to
// milliseconds - TODO confirm). No range check is visible; a negative value
// would wrap in the cast to unsigned int.
1187  setHeartbeatDelay(static_cast<unsigned int>(hbDelay*1000));
1188  }else if(command == NODE_MSG_HEARTBEAT_TIMEOUT_GET){
1190  }else if(command == NODE_MSG_HEARTBEAT_TIMEOUT_SET){
1192  int hbTimeout = HEARTBEAT_TIMEOUT;
1194  std::istringstream(parameters) >> hbTimeout;
// Same scaling and unsigned cast as the heartbeat delay setter.
1197  setHeartbeatTimeout(static_cast<unsigned int>(hbTimeout*1000));
1199  }else if(command == NODE_MSG_HEARTBEAT_MODE_GET){
1201  }else if(command == NODE_MSG_HEARTBEAT_MODE_SET){
1203  int hbMode = HEARBEAT_MODE_DEFAULT;
1205  std::istringstream(parameters) >> hbMode;
1208  setHeartbeatMode(static_cast<unsigned int>(hbMode));
1209  }else if(command == NODE_MSG_ROUTES){
// Route report: a temporary queue is filled by a local ClientsQueueManager via
// refresh(own identity, _clientsQueue); RouteMessage was constructed over that
// same temporary queue and builds the response from it.
1211  std::vector<ClientWorkerItem> clientsQueue;
1212  RouteMessage routeMessage(clientsQueue);
1213  ClientsQueueManager clientsManager(&clientsQueue, 0);
1214  clientsManager.refresh(getClientIdentity(), _clientsQueue);
1215  ret = routeMessage.build();
1216  }else if(command == NODE_MSG_RESOURCE_USAGE){
// NOTE(review): `resourceUsage` is used below but its declaration is not
// visible in this listing.
1218  std::string json = "";
1221  HCE::drce::ResourceUsageSerializator resourceUsageSerializator(resourceUsage);
1222  resourceUsageSerializator.serialize(json);
// A serialization error is only logged; `json` is returned as the response
// either way.
1223  if (resourceUsageSerializator.isError())
1224  logger.log(NODE_LOG_MODE_ERROR) << "Get resource usage error: "<< resourceUsageSerializator.getErrorMsg() << HCE::flush;
1225  ret = json;
1226  }else{
1229  }
1230  return ret;
1231  }
1232 
// Builds the properties/statistics report as a single string and returns it.
// NOTE(review): the function signature is not visible in this listing;
// presumably this is the getPropertyInformation() called for
// NODE_MSG_PROPERTIES - confirm.
//
// Layout: NODE_ADMIN_ERROR_OK, a delimiter, the output of getProperties(), then
// key=value entries separated by NODE_ADMIN_COMMAND_DELIMITER.
// NOTE(review): no delimiter is written between getProperties() and
// "clientRequests="; presumably getProperties() ends with one - verify.
1235  std::stringstream result;
1236  result << NODE_ADMIN_ERROR_OK
1237  << NODE_ADMIN_COMMAND_DELIMITER << getProperties() << "clientRequests=" << getStatClientRequests()
1238  << NODE_ADMIN_COMMAND_DELIMITER << "clientResponses=" << getStatClientResponses()
1239  << NODE_ADMIN_COMMAND_DELIMITER << "clientRequestsSize=" << getStatClientRequestsSize()
1240  << NODE_ADMIN_COMMAND_DELIMITER << "clientResponsesSize=" << getStatClientResponsesSize()
1241  << NODE_ADMIN_COMMAND_DELIMITER << "requests=" << getStatRequests()
1242  << NODE_ADMIN_COMMAND_DELIMITER << "requestsSize=" << getStatRequestsSize()
1243  << NODE_ADMIN_COMMAND_DELIMITER << "nodeMode=" << static_cast<int>(getNodeMode())
1245  << NODE_ADMIN_COMMAND_DELIMITER << "statQPSClient=" << getStatQPSClient()
1246  << NODE_ADMIN_COMMAND_DELIMITER << "statQPSServer=" << getStatQPSServer()
1247  << NODE_ADMIN_COMMAND_DELIMITER << "statRPSClient=" << getStatRPSClient()
1248  << NODE_ADMIN_COMMAND_DELIMITER << "statRPSServer=" << getStatRPSServer()
1249  << NODE_ADMIN_COMMAND_DELIMITER << "statRequestsBandwidthClient=" << getStatRequestsBandwidthClient()
1250  << NODE_ADMIN_COMMAND_DELIMITER << "statRequestsBandwidthServer=" << getStatRequestsBandwidthServer()
1251  << NODE_ADMIN_COMMAND_DELIMITER << "statResponsesBandwidthClient=" << getStatResponsesBandwidthClient()
1252  << NODE_ADMIN_COMMAND_DELIMITER << "statResponsesBandwidthServer=" << getStatResponsesBandwidthServer();
1253  return result.str();
1254  }
1255 
// Periodic statistics logging: does nothing until s_clock() passes _pstatsAt,
// then writes one DEBUG line with the counters and schedules the next run
// STAT_INTERVAL later.
// NOTE(review): the function signature and several statements between the
// visible lines are not shown in this listing; only what is visible is
// described here.
1258  if(s_clock() > _pstatsAt){
1259 
// The server RPS statistic is reset to zero on every reporting tick.
1263  setStatRPSServer(0);
1268 
1275 
1276  logger.log(NODE_LOG_MODE_DEBUG) << "SRequests=" << getStatRequests() << ", CRequests=" << getStatClientRequests() << ", CResponses=" << getStatClientResponses()
1277  << ", SQPS=" << getStatQPSServer() << ", CQPS=" << getStatQPSClient() << ", CRPS=" << getStatRPSClient() << ", SRequestsSize=" << getStatRequestsSize()
1278  << ", CRequestsSize=" << getStatClientRequestsSize() << ", CResponsesSize=" << getStatClientResponsesSize()
1279  << ", SRequestsBandwidth=" << getStatRequestsBandwidthServer() << ", CRequestsBandwidth=" << getStatRequestsBandwidthClient()
1280  << ", CResponsesBandwidth=" << getStatResponsesBandwidthClient() << flush;
1281 
1282  //Shift next log stat action time
1283  _pstatsAt = s_clock() + STAT_INTERVAL;
1284  }
1285  }
1286 
// Heartbeat tick: logs the current heartbeat mode and next due time on every
// call, and once s_clock() passes _heartbeatAt sends heartbeats to the entries
// of _clientsQueue, then schedules the next tick getHeartbeatDelay() later.
// NOTE(review): the function signature is not visible in this listing, and
// neither is the `if` that pairs with the `}else{` below. Which condition
// selects the per-client branch versus the send-to-all branch cannot be told
// from here (presumably the heartbeat mode - confirm).
1288 
1289  logger.log(NODE_LOG_MODE_TRACE) << "DataServerProxy::heartbit HeartbeatMode: " << getHeartbeatMode() << " _heartbeatAt = '"
1290  << Poco::DateTimeFormatter::format(Poco::LocalDateTime(Poco::Timestamp(Poco::Timestamp::fromEpochTime(_heartbeatAt/1000))), "%Y-%m-%d %H:%M:%S") << "'" << flush;
1291 
1293  if(s_clock() > _heartbeatAt){
1295 
// First branch: each client is evaluated individually.
1296  for(std::vector<ClientWorkerItem>::iterator it = _clientsQueue.begin(); it < _clientsQueue.end(); ++it){
// The client's shift counter is fed its rtimestamp and recounted against the
// heartbeat timeout before the decision below.
1297  it->shiftCounter.responceTimeStamp(it->rtimestamp);
1298  it->shiftCounter.recount(getHeartbeatTimeout());
1299 
1300  logger.log(NODE_LOG_MODE_TRACE) << "[ " << it->identity << " ] delta = " << abs(s_clock()-_heartbeatAt+getHeartbeatDelay())
1301  << " _shiftCounter.getShift()= " << it->shiftCounter.getShift() << flush;
1302 
// delta = now - _heartbeatAt + heartbeat delay. A heartbeat is sent only when
// it exceeds the timeout plus this client's shift.
// NOTE(review): the result is stored in a size_t, whereas the log line above
// wraps the same expression in abs() - confirm it cannot be negative here.
1303  size_t delta = s_clock()-_heartbeatAt+getHeartbeatDelay();
1304  if (delta > (getHeartbeatTimeout()+it->shiftCounter.getShift())){
1305 
1307  sendHeartbeat(it->identity);
1309  it->shiftCounter.reset();
1310 
1311  logger.log(NODE_LOG_MODE_DEBUG) << "SENT HEARTBIT from client [" << getClientIdentity() << "] to server _heartbeatAt = '"
1312  << Poco::DateTimeFormatter::format(Poco::LocalDateTime(Poco::Timestamp(Poco::Timestamp::fromEpochTime(_heartbeatAt/1000))), "%Y-%m-%d %H:%M:%S") << "'" << flush;
1313  }
1314  }
1316  }else{
1317 
// Second branch: a heartbeat is sent to every client unconditionally.
1318  for(std::vector<ClientWorkerItem>::iterator it = _clientsQueue.begin(); it != _clientsQueue.end(); ++it){
1320  sendHeartbeat(it->identity);
1321  logger.log(NODE_LOG_MODE_DEBUG) << "HEARTBEAT message for client [" << it->identity.c_str() << "] sent by timer reason" << flush;
1322  }
1323  }
// Schedule the next heartbeat tick.
1325  _heartbeatAt = s_clock() + getHeartbeatDelay();
1326  }
1327  }
1328 
// Sends one heartbeat message addressed to the given identity over the data
// server socket.
//
// @param identity Identity string the message is wrapped with.
//
// The message body is NODE_MSG_HEARTBEAT; wrap() is called with the identity
// and a NULL second argument (presumably "no delimiter frame" - confirm
// against zmsg::wrap).
// NOTE(review): _dataServerSock is dereferenced without a null check in this
// function.
1329  void DataServerProxy::sendHeartbeat(const std::string& identity){
1331  zmsg msg(NODE_MSG_HEARTBEAT.c_str());
1332  msg.wrap(identity.c_str(), NULL);
1333  msg.send(*_dataServerSock);
1334  }
1335 
// Main worker loop: runs while getInProgress() is true, polling the sockets,
// running the heartbeat tick and purging clients on every iteration. Returns
// NULL once the loop ends.
// NOTE(review): the function signature is not visible in this listing, and the
// bodies of the three POLLIN checks plus several statements after heartbit()
// are not shown either; only the visible code is described.
1338 
1339  while(getInProgress()){
1340  try {
// The poll set is rebuilt on every iteration. The data server socket is
// included (as items[0]) only while it reports connected(). The proxy and
// admin inproc sockets are always the last two entries, which is why they are
// addressed from the end of the vector below.
1341  std::vector<zmq::pollitem_t> items;
1342  if(_dataServerSock->connected()){
1343  items ={{*_dataServerSock, 0, ZMQ_POLLIN, 0},{*_inprocProxySock, 0, ZMQ_POLLIN, 0},{*_inprocAdminSock, 0, ZMQ_POLLIN, 0}};
1344  } else{
1345  items ={{*_inprocProxySock, 0, ZMQ_POLLIN, 0},{*_inprocAdminSock, 0, ZMQ_POLLIN, 0}};
1346  }
1347 
1349  zmq::poll(&items[0], items.size(), getPollTimeout());
1350 
// Data server socket readiness; connected() is re-checked because items[0] is
// the data server socket only in the three-entry layout.
1352  if(_dataServerSock->connected()){
1353  if(items[0].revents & ZMQ_POLLIN){
1355  }
1356  }
1357 
// Second-to-last entry: _inprocProxySock.
1359  if(items[items.size() - 2].revents & ZMQ_POLLIN){
1361  }
1362 
// Last entry: _inprocAdminSock.
1364  if(items[items.size() - 1].revents & ZMQ_POLLIN){
1366  }
1367 
1369  heartbit();
1370 
1373 
1376 
1379 
// purge() returns a string; a non-empty result is logged at INFORMATION level.
1381  std::string r = _clientsManager->purge();
1382  if(r != ""){
1383  logger.log(NODE_LOG_MODE_INFORMATION) << "PURGED_CLIENTS: " << r << flush;
1384  }
1385  }catch(zmq::error_t& error){
// EINTR from ZeroMQ is ignored silently. Every other exception is printed to
// std::cout (not to the logger) and the loop continues.
1386  if (EINTR!=error.num()){
1387  std::cout << "DataServerProxy E: " << error.what() << std::endl;
1388  }
1389  }catch(std::exception& e){
1390  std::cout << "DataServerProxy E: " << e.what() << std::endl;
1391  }catch(...){
1392  std::cout << "DataServerProxy E: Unknown exception" << std::endl;
1393  }
1394  }
1395 
1397 
1398  return NULL;
1399  }
1400 
1401  }
1402 }