hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
main.cpp
Go to the documentation of this file.
1 
23 #include "main.hpp"
24 
27 //-----------------------------------------------------------------------------
29 {
30  std::stringstream outMsg;
31  outMsg << "Application recieved signal number: " << sig;
32  Poco::Logger::root().log(Poco::Message("HCE cluster node", outMsg.str(), Poco::Message::Priority::PRIO_DEBUG));
33 
34  if (pHandlerAdmin)
35  {
36  pHandlerAdmin->stopClients();
37  pHandlerAdmin->shutdown();
38  }
39 }
40 //-----------------------------------------------------------------------------
41 void signalInstall(void)
42 {
43  struct sigaction newAction, oldAction;
44  newAction.sa_handler = applicationSignalHandler;
45  newAction.sa_flags = 0;
46  sigemptyset(&newAction.sa_mask);
47 
48  sigaction (SIGHUP, 0, &oldAction);
49  if (oldAction.sa_handler != SIG_IGN)
50  sigaction (SIGHUP, &newAction, 0);
51 
52  sigaction (SIGTERM, 0, &oldAction);
53  if (oldAction.sa_handler != SIG_IGN)
54  sigaction (SIGTERM, &newAction, 0);
55 
56  sigset_t blockMask;
57  sigemptyset (&blockMask);
58  sigaddset (&blockMask, SIGCHLD);
59  sigaddset (&blockMask, SIGTSTP);
60  sigaddset (&blockMask, SIGTTOU);
61  sigaddset (&blockMask, SIGTTIN);
62  sigprocmask (SIG_BLOCK, &blockMask, 0);
63 }
64 //-----------------------------------------------------------------------------
66 {
70 };
71 //-----------------------------------------------------------------------------
73 {
74  BgStatus result = BACKGROUND_ERROR;
75 
76  /* Fork off the parent process */
77  switch(fork())
78  {
79  case -1: exit(EXIT_FAILURE);
80  break;
81  case 0:
82  {
83  /* Change the file mode mask */
84  umask(0);
85 
86  /* Create a new SID for the child process */
87  if(setsid() != -1)
88  {
89  /* Change the current working directory */
90  if(chdir("/") != -1)
91  {
92  /* Close out the standard file descriptors */
93  if(close(STDIN_FILENO) != -1)
94  if(close(STDOUT_FILENO) != -1)
95  if(close(STDERR_FILENO) != -1)
96  if(open("/dev/null",O_RDONLY) != -1)
97  if(open("/dev/null",O_WRONLY) != -1)
98  if(dup(1) != -1)
99  result = BACKGRAUND_OK;
100  }
101  }
102  }
103  break;
104  /* If we got a good PID, then
105  we can exit the parent process. */
106  default: result = IGNORE_PARENT; // here instead 'exit(EXIT_SUCCESS);'
107  }
108  return result;
109 }
110 //-----------------------------------------------------------------------------
111 int main(int argc, char* argv[]){
113  std::map<std::string, std::string> actualOptions;
114  actualOptions.insert(std::pair<std::string, std::string>(NODE_CLI_NAME, ""));
115  actualOptions.insert(std::pair<std::string, std::string>(NODE_CLI_ADMIN, "tcp://*:5546"));
116  actualOptions.insert(std::pair<std::string, std::string>(NODE_CLI_CLIENT, "tcp://*:5556"));
117  actualOptions.insert(std::pair<std::string, std::string>(NODE_CLI_SERVER, "tcp://*:5557"));
118  actualOptions.insert(std::pair<std::string, std::string>(NODE_CLI_DATAMODE, "1"));
119  actualOptions.insert(std::pair<std::string, std::string>(NODE_CLI_ROLE, "router"));
120  actualOptions.insert(std::pair<std::string, std::string>(NODE_CLI_INI_FILE, ""));
121  actualOptions.insert(std::pair<std::string, std::string>(NODE_CLI_LOG_MODE, "hce-node_log.ini"));
122  actualOptions.insert(std::pair<std::string, std::string>(NODE_CLI_HELP, " "));
123  actualOptions.insert(std::pair<std::string, std::string>(NODE_CLI_VERSION, " "));
124  actualOptions.insert(std::pair<std::string, std::string>(NODE_CLI_DEMONIZE, "0"));
125 
127  std::map<std::string, int> nodeModes;
128  nodeModes.insert(std::pair<std::string, int>(NODE_CLI_ROLE_RMANAGER_RND, HCE::handlers::NODE_MODE_SHARD_RND));
129  nodeModes.insert(std::pair<std::string, int>(NODE_CLI_ROLE_ROUTER, HCE::handlers::NODE_MODE_ROUTER));
130  nodeModes.insert(std::pair<std::string, int>(NODE_CLI_ROLE_REPLICA, HCE::handlers::NODE_MODE_DATA));
131  nodeModes.insert(std::pair<std::string, int>(NODE_CLI_ROLE_RMANAGER, HCE::handlers::NODE_MODE_SHARD));
132  nodeModes.insert(std::pair<std::string, int>(NODE_CLI_ROLE_RMANAGER_ROUND_ROBIN, HCE::handlers::NODE_MODE_SHARD));
133  nodeModes.insert(std::pair<std::string, int>(NODE_CLI_ROLE_RMANAGER_RESOURCES_USAGE, HCE::handlers::NODE_MODE_SHARD_RU));
134  nodeModes.insert(std::pair<std::string, int>(NODE_CLI_ROLE_SMANAGER, HCE::handlers::NODE_MODE_PROXY));
135 
137  Poco::Util::OptionSet knownOptions;
138  knownOptions.addOption(Poco::Util::Option(NODE_CLI_NAME, "n", "node name", false, NODE_CLI_NAME));
139  knownOptions.addOption(Poco::Util::Option(NODE_CLI_ADMIN, "a", "admin listen zmq connection", false, NODE_CLI_ADMIN));
140  knownOptions.addOption(Poco::Util::Option(NODE_CLI_CLIENT, "c", "client listen zmq connection", false, NODE_CLI_CLIENT));
141  knownOptions.addOption(Poco::Util::Option(NODE_CLI_SERVER, "s", "server listen zmq connection", false, NODE_CLI_SERVER));
142  knownOptions.addOption(Poco::Util::Option(NODE_CLI_DATAMODE, "m", "mode 1 - regular, 0 - test only", false, NODE_CLI_DATAMODE));
143  knownOptions.addOption(Poco::Util::Option(NODE_CLI_ROLE, "r", "node role: router, smanager, rmanager, replica", false, NODE_CLI_ROLE));
144  knownOptions.addOption(Poco::Util::Option(NODE_CLI_INI_FILE, "i", "ini file name", false, NODE_CLI_INI_FILE));
145  knownOptions.addOption(Poco::Util::Option(NODE_CLI_LOG_MODE, "l", "log configuration file name", false, NODE_CLI_LOG_MODE));
146  knownOptions.addOption(Poco::Util::Option(NODE_CLI_HELP, "h", "display help", false, NODE_CLI_HELP));
147  knownOptions.addOption(Poco::Util::Option(NODE_CLI_VERSION, "v", "display version", false, NODE_CLI_VERSION));
148  knownOptions.addOption(Poco::Util::Option(NODE_CLI_DEMONIZE, "d", "demonize", false, NODE_CLI_DEMONIZE));
149 
151  Poco::Util::OptionProcessor op(knownOptions);
152  op.setUnixStyle(true);
153  for(int i = 1; i < argc; i++){
154  std::string name;
155  std::string value;
156  try{
157  if(op.process(std::string(argv[i]), name, value)){
158  if((name != NODE_CLI_HELP && !value.empty()) || (name == NODE_CLI_HELP && value.empty()) || (name == NODE_CLI_VERSION && value.empty()) ){
159  //std::cout << "[" << name << "]" << "=[" << value << "]" << std::endl;
160  std::map<std::string, std::string>::iterator actualOption;
161  actualOption = actualOptions.find(name);
162  if(actualOption != actualOptions.end()){
163  //std::cout << "found: " << actualOption->second << std::endl;
164  actualOption->second = value;
165  }
166  }
167  }
168  } catch(Poco::Exception& exc){
169  std::cerr << exc.displayText() << std::endl;
170  ::exit(1);
171  }
172  }
173 
174  std::map<std::string, std::string>::iterator actualOption;
176  actualOption = actualOptions.find(NODE_CLI_HELP);
177  if(actualOption != actualOptions.end() && actualOption->second == ""){
178  std::cout << "Version: " << APP_VERSION << "\nUsage: " << argv[0] << " [--" << NODE_CLI_HELP << "] [--" << NODE_CLI_ADMIN << "=<tcp://<admin_address|*:port>] [--" << NODE_CLI_CLIENT
179  << "=[tcp://<client_address:port]] [--" << NODE_CLI_SERVER << "=<tcp://<server_address|*:port>] [--" << NODE_CLI_NAME << "=<nodeName>] [--" << NODE_CLI_DATAMODE
180  << "=<0 - randomizer, 1 - real data source>] " << "[--" << NODE_CLI_ROLE << "=<router | smanager | rmanager | replica> [--" << NODE_CLI_INI_FILE << "=<iniFile>] [--"
181  << NODE_CLI_LOG_MODE << "=<logger config file name>] [--" << NODE_CLI_VERSION << "] [--" << NODE_CLI_DEMONIZE << "=<0 - without background, 1 - use background>]"
182  << std::endl;
183  ::exit(0);
184  }
185 
187  actualOption = actualOptions.find(NODE_CLI_VERSION);
188  if(actualOption != actualOptions.end() && actualOption->second == ""){
189  std::cout << APP_VERSION << std::endl;
190  ::exit(0);
191  }
192 
193  //If name not set - init with hostname
194  actualOption = actualOptions.find(NODE_CLI_NAME);
195  if(actualOption != actualOptions.end() && actualOption->second == ""){
196  //Set name from hostname
197  struct utsname name;
198  if(::uname(&name) > -1){
199  actualOption->second = name.nodename;
200  }
201  }
202 
203  /*
204  for(actualOption = actualOptions.begin(); actualOption != actualOptions.end(); ++actualOption){
205  std::cout << actualOption->first << "=[" << actualOption->second << "]" << std::endl;
206  }
207  */
208 
210  Poco::Util::IniFileConfiguration::Keys loggers;
211  loggers.push_back(HCE_MAIN_LOGGER);
212  //Init vars
214  std::string nodeName = actualOptions.find(NODE_CLI_NAME)->second;
216  std::string adminConnection = actualOptions.find(NODE_CLI_ADMIN)->second;
217  unsigned int nodeMode = nodeModes.find(actualOptions.find(NODE_CLI_ROLE)->second)->second;
219  std::string dataServerConnection = "0";
220  std::string dataClientConnection = "0";
221  std::string routerServerConnection = "0";
222  if(nodeMode == HCE::handlers::NODE_MODE_ROUTER){
223  //Init router
224  routerServerConnection = actualOptions.find(NODE_CLI_CLIENT)->second;
225  dataServerConnection = actualOptions.find(NODE_CLI_SERVER)->second;
226  }else if(nodeMode == HCE::handlers::NODE_MODE_DATA){
227  //Init replica
228  dataClientConnection = actualOptions.find(NODE_CLI_CLIENT)->second;
230  loggers.push_back(DRCE_TASKS_LOGGER);
231  }else{
232  //Init shard manager or replica manager
233  dataServerConnection = actualOptions.find(NODE_CLI_SERVER)->second;
234  dataClientConnection = actualOptions.find(NODE_CLI_CLIENT)->second;
235  }
237  Poco::Path iniFilePath(actualOptions.find(NODE_CLI_INI_FILE)->second);
238  std::string iniFile = iniFilePath.absolute().toString();
240  unsigned int clientDataMode = 1;
241  std::istringstream(actualOptions.find(NODE_CLI_DATAMODE)->second) >> clientDataMode;
243  std::string configFileName;
244  std::istringstream(actualOptions.find(NODE_CLI_LOG_MODE)->second) >> configFileName;
246 // Poco::Util::IniFileConfiguration::Keys loggers;
247 // loggers.push_back(HCE_MAIN_LOGGER);
248 // loggers.push_back(DRCE_TASKS_LOGGER);
250  HCE::handlers::LoggerConfigLoader loggerConfigLoader(nodeName, loggers);
251  if (!loggerConfigLoader.loadConfig(configFileName))
252  {
253  std::cerr << loggerConfigLoader.getErrorMsg() << std::endl;
254  std::cerr << "Logger ini file load error, possible file not found, not proper format or another cause..." << std::endl << "Node not started!" << std::endl;
255  ::exit(2);
256  }
257 
259  unsigned int clientDemonize = 1;
260  std::istringstream(actualOptions.find(NODE_CLI_DEMONIZE)->second) >> clientDemonize;
262  unsigned int DEBUG_LEVEL = loggerConfigLoader.getDebugLevel();
264  HCE::LoggerStream logger(nodeName);
265 
267  if (clientDemonize == 1) {
269  if (daemon(0, 0) == 0) {
270  logger.log(HCE::LoggerStream::Priority::PRIO_INFORMATION) << "Demonization success..." << HCE::flush;
271  }
272  else {
273  std::cerr << "Demonization fail: " << strerror(errno) << std::endl;
274  ::exit(3);
275  }
276 /*
277  BgStatus rc = gotoBackgraund();
278  if (rc == BACKGROUND_ERROR){
279  std::cerr << "Demonization fail: " << strerror(errno) << std::endl;
280  ::exit(3);
281  }
282  else if (rc == IGNORE_PARENT){
283  return 0;
284  }
285  else{
286  logger.log(HCE::LoggerStream::Priority::PRIO_INFORMATION) << "Demonization success..." << HCE::flush;
287  }
288 */
289  }
290 
292  signalInstall();
294  s_version_assert(2, 1);
296  int64_t startMark = s_clock();
298  srandom((unsigned)time(NULL));
300  zmq::context_t context(1);
301 
303  pthread_t threadDataServer = 0;
304  pthread_t threadDataProcessor = 0;
305  pthread_t threadDataClient = 0;
306  pthread_t threadDataReducer = 0;
307  pthread_t threadRouterServer = 0;
308  pthread_t threadAdmin = 0;
309 
311  logger.log(HCE::LoggerStream::Priority::PRIO_INFORMATION)
312  << std::setfill('0') << std::setw(8) << (int)(s_clock() - startMark) << " [] HCE cluster node, version " << APP_VERSION << HCE::flush;
313 
315  logger.log(HCE::LoggerStream::Priority::PRIO_INFORMATION)
316  << std::setfill('0') << std::setw(8) << (int)(s_clock() - startMark) << " [] ADMIN handler creation..." << HCE::flush;
317 
318  HCE::handlers::Admin handlerAdmin(std::string("Admin"), context, nodeName, adminConnection, startMark, static_cast<unsigned char>(DEBUG_LEVEL), iniFile);
319  pHandlerAdmin = &handlerAdmin;
321  if(::pthread_create(&threadAdmin, NULL, &HCE::handlers::Admin::main, &handlerAdmin)){
322  logger.log(HCE::LoggerStream::Priority::PRIO_FATAL) << "Error create thread [" << handlerAdmin.getName() << "]!" << HCE::flush;
323  }else{
325  switch(nodeMode){
326  case 0:
328  case 1:
329  case 4:
330  case 5: {
332  //Shard manager mode
333  logger.log(HCE::LoggerStream::Priority::PRIO_INFORMATION) << std::setfill('0') << std::setw(8) << (int)(s_clock() - startMark)
334  << " [] NODE IN SHARD MANAGER MODE - sends requests to all connected data nodes multicast way" << HCE::flush;
335  }else{
336  //Replica manager mode
337  logger.log(HCE::LoggerStream::Priority::PRIO_INFORMATION) << std::setfill('0') << std::setw(8) << (int)(s_clock() - startMark)
338  << " [] NODE IN REPLICA MANAGER MODE - sends requests to only one of connected data nodes round-robin way" << HCE::flush;
339  }
341  HCE::handlers::DataReducerProxy handlerDataReducerProxy(std::string("DataReducerProxy"), context, nodeName, nodeMode, startMark, (unsigned char)DEBUG_LEVEL, iniFile);
342  if(::pthread_create(&threadDataReducer, NULL, &HCE::handlers::DataReducerProxy::main, &handlerDataReducerProxy)){
343  logger.log(HCE::LoggerStream::Priority::PRIO_FATAL) << "Error create thread [" << handlerDataReducerProxy.getName() << "]!" << HCE::flush;
344  }else{
346  s_sleep(100);
348  HCE::handlers::DataServerProxy handlerDataServerProxy(std::string("DataServerProxy"), context, nodeName, dataServerConnection, nodeMode, startMark, (unsigned char)DEBUG_LEVEL, iniFile);
349  if(::pthread_create(&threadDataServer, NULL, &HCE::handlers::DataServerProxy::main, &handlerDataServerProxy)){
350  logger.log(HCE::LoggerStream::Priority::PRIO_FATAL) << "Error create thread [" << handlerDataServerProxy.getName() << "]!" << HCE::flush;
351  }else{
353  s_sleep(100);
355  HCE::handlers::DataClientProxy handlerDataClientProxy(std::string("DataClientProxy"), context, nodeName, dataClientConnection, startMark, (unsigned char)DEBUG_LEVEL, iniFile);
356  if(::pthread_create(&threadDataClient, NULL, &HCE::handlers::DataClientProxy::main, &handlerDataClientProxy)){
357  logger.log(HCE::LoggerStream::Priority::PRIO_FATAL) << "Error create thread [" << handlerDataClientProxy.getName() << "]!" << HCE::flush;
358  }else{
360  ::pthread_join(threadAdmin, NULL);
361  logger.log(HCE::LoggerStream::Priority::PRIO_DEBUG) << "The thread [" << pHandlerAdmin->getName() << "] was joined!" << HCE::flush;
362 
364  handlerDataClientProxy.shutdown();
365  handlerDataServerProxy.shutdown();
366  handlerDataReducerProxy.shutdown();
368  ::pthread_join(threadDataClient, NULL);
369  logger.log(HCE::LoggerStream::Priority::PRIO_DEBUG) << "The thread [" << handlerDataClientProxy.getName() << "] was joined!" << HCE::flush;
371  ::pthread_join(threadDataServer, NULL);
372  logger.log(HCE::LoggerStream::Priority::PRIO_DEBUG) << "The thread [" << handlerDataServerProxy.getName() << "] was joined!" << HCE::flush;
374  ::pthread_join(threadDataReducer, NULL);
375  logger.log(HCE::LoggerStream::Priority::PRIO_DEBUG) << "The thread [" << handlerDataReducerProxy.getName() << "] was joined!" << HCE::flush;
376  }
377  }
378  }
379  break;
380  }
381  case 2: {
383  logger.log(HCE::LoggerStream::Priority::PRIO_INFORMATION) << std::setfill('0') << std::setw(8) << (int)(s_clock() - startMark) << " [] NODE IN DATA MODE" << HCE::flush;
385  HCE::handlers::DataProcessorData handlerDataProcessorData(nodeName, std::string("DataProcessorData"), context, startMark, (unsigned char)DEBUG_LEVEL, iniFile);
386  if(::pthread_create(&threadDataProcessor, NULL, &HCE::handlers::DataProcessorData::main, &handlerDataProcessorData)){
387  logger.log(HCE::LoggerStream::Priority::PRIO_FATAL) << "Error create thread [" << handlerDataProcessorData.getName() << "]!" << HCE::flush;
388  }else{
390  s_sleep(100);
392  HCE::handlers::DataClientData handlerDataClientData(std::string("DataClientData"), context, nodeName, dataClientConnection, (unsigned char)clientDataMode, startMark,
393  (unsigned char)DEBUG_LEVEL, iniFile);
394  if(::pthread_create(&threadDataClient, NULL, &HCE::handlers::DataClientData::main, &handlerDataClientData)){
395  logger.log(HCE::LoggerStream::Priority::PRIO_FATAL) << "Error create thread [" << handlerDataClientData.getName() << "]!" << HCE::flush;
396  }else{
398  ::pthread_join(threadAdmin, NULL);
399  logger.log(HCE::LoggerStream::Priority::PRIO_DEBUG) << "The thread [" << handlerAdmin.getName() << "] was joined!" << HCE::flush;
400 
402  handlerDataClientData.shutdown();
404  ::pthread_join(threadDataClient, NULL);
405  logger.log(HCE::LoggerStream::Priority::PRIO_DEBUG) << "The thread [" << handlerDataClientData.getName() << "] was joined!" << HCE::flush;
406 
408  handlerDataProcessorData.shutdown();
410  ::pthread_join(threadDataProcessor, NULL);
411  logger.log(HCE::LoggerStream::Priority::PRIO_DEBUG) << "The thread [" << handlerDataProcessorData.getName() << "] was joined!" << HCE::flush;
412  }
413  }
414  break;
415  }
416  case 3: {
418  logger.log(HCE::LoggerStream::Priority::PRIO_INFORMATION) << std::setfill('0') << std::setw(8) << (int)(s_clock() - startMark) << " [] NODE IN ROUTER MODE" << HCE::flush;
420  HCE::handlers::DataReducerProxy handlerDataReducerProxy(std::string("DataReducerProxy"), context, nodeName, nodeMode, startMark, (unsigned char)DEBUG_LEVEL, iniFile);
421  if(::pthread_create(&threadDataReducer, NULL, &HCE::handlers::DataReducerProxy::main, &handlerDataReducerProxy)){
422  logger.log(HCE::LoggerStream::Priority::PRIO_FATAL) << "Error create thread [" << handlerDataReducerProxy.getName() << "]!" << HCE::flush;
423  }else{
425  s_sleep(100);
427  HCE::handlers::DataServerProxy handlerDataServerProxy(std::string("DataServerProxy"), context, nodeName, dataServerConnection, nodeMode, startMark, (unsigned char)DEBUG_LEVEL, iniFile);
428  if(::pthread_create(&threadDataServer, NULL, &HCE::handlers::DataServerProxy::main, &handlerDataServerProxy)){
429  logger.log(HCE::LoggerStream::Priority::PRIO_FATAL) << "Error create thread [" << handlerDataServerProxy.getName() << "]!" << HCE::flush;
430  }else{
432  s_sleep(100);
434  HCE::handlers::RouterServerProxy handlerRouterServerProxy(std::string("RouterServerProxy"), context, nodeName, routerServerConnection, startMark, (unsigned char)DEBUG_LEVEL, iniFile);
435  if(::pthread_create(&threadRouterServer, NULL, &HCE::handlers::RouterServerProxy::main, &handlerRouterServerProxy)){
436  logger.log(HCE::LoggerStream::Priority::PRIO_FATAL) << "Error create thread [" << handlerRouterServerProxy.getName() << "]!" << HCE::flush;
437  }else{
439  ::pthread_join(threadAdmin, NULL);
440  logger.log(HCE::LoggerStream::Priority::PRIO_DEBUG) << "The thread [" << handlerAdmin.getName() << "] was joined!" << HCE::flush;
442  handlerRouterServerProxy.shutdown();
443  handlerDataServerProxy.shutdown();
444  handlerDataReducerProxy.shutdown();
446  ::pthread_join(threadRouterServer, NULL);
447  logger.log(HCE::LoggerStream::Priority::PRIO_DEBUG) << "The thread [" << handlerRouterServerProxy.getName() << "] was joined!" << HCE::flush;
449  ::pthread_join(threadDataServer, NULL);
450  logger.log(HCE::LoggerStream::Priority::PRIO_DEBUG) << "The thread [" << handlerDataServerProxy.getName() << "] was joined!" << HCE::flush;
452  ::pthread_join(threadDataReducer, NULL);
453  logger.log(HCE::LoggerStream::Priority::PRIO_DEBUG) << "The thread [" << handlerDataReducerProxy.getName() << "] was joined!" << HCE::flush;
454  }
455  }
456  }
457  break;
458  }
459  default: {
460  logger.log(HCE::LoggerStream::Priority::PRIO_FATAL) << "Unsupported node mode" << HCE::flush;
461  break;
462  }
463  }
464  }
465 
466  logger.log(HCE::LoggerStream::Priority::PRIO_INFORMATION) << std::setfill('0') << std::setw(8) << (int)(s_clock() - startMark) << " [] Node was shutdown normally" << HCE::flush;
467 
468  return 0;
469 }