// Builds a one-line message containing the signal number held in `sig` and
// sends it to Poco's root logger at debug priority, with "HCE cluster node"
// as the message source.
// NOTE(review): the enclosing function header is outside this excerpt, so
// where `sig` comes from cannot be confirmed here. If this code runs inside
// a signal handler, stream formatting and logging are not async-signal-safe
// - verify.
// NOTE(review): "recieved" is a misspelling of "received"; it is runtime
// text, so it is left untouched here.
30 std::stringstream outMsg;
31 outMsg <<
"Application recieved signal number: " << sig;
32 Poco::Logger::root().log(Poco::Message(
"HCE cluster node", outMsg.str(), Poco::Message::Priority::PRIO_DEBUG));
// Signal-disposition setup (fragment).
// newAction is prepared with no flags and an empty handler-time signal mask.
// NOTE(review): the line that assigns newAction.sa_handler is not visible in
// this excerpt - confirm it is set before newAction is installed below.
43 struct sigaction newAction, oldAction;
45 newAction.sa_flags = 0;
46 sigemptyset(&newAction.sa_mask);
// Read the current SIGHUP disposition (null "new action" means query only)
// and install newAction only when SIGHUP is not currently being ignored.
48 sigaction (SIGHUP, 0, &oldAction);
49 if (oldAction.sa_handler != SIG_IGN)
50 sigaction (SIGHUP, &newAction, 0);
// Same query-then-install sequence for SIGTERM.
52 sigaction (SIGTERM, 0, &oldAction);
53 if (oldAction.sa_handler != SIG_IGN)
54 sigaction (SIGTERM, &newAction, 0);
// Collect SIGCHLD, SIGTSTP, SIGTTOU and SIGTTIN into blockMask and add them
// to the set of blocked signals (SIG_BLOCK); the previous mask is not saved.
// NOTE(review): the declaration of blockMask is outside this excerpt.
57 sigemptyset (&blockMask);
58 sigaddset (&blockMask, SIGCHLD);
59 sigaddset (&blockMask, SIGTSTP);
60 sigaddset (&blockMask, SIGTTOU);
61 sigaddset (&blockMask, SIGTTIN);
62 sigprocmask (SIG_BLOCK, &blockMask, 0);
// Error branch of a switch: terminates the process with EXIT_FAILURE.
// NOTE(review): the switch expression is not visible; presumably the result
// of fork() - confirm.
79 case -1: exit(EXIT_FAILURE);
// Standard-stream redirection written as a chain of nested ifs: each step is
// attempted only when the previous call did not return -1. The three
// standard descriptors are closed first, then /dev/null is opened read-only
// and write-only.
// NOTE(review): the statement guarded by the last condition, and any error
// handling for a failed step, lie outside this excerpt.
93 if(close(STDIN_FILENO) != -1)
94 if(close(STDOUT_FILENO) != -1)
95 if(close(STDERR_FILENO) != -1)
96 if(open(
"/dev/null",O_RDONLY) != -1)
97 if(open(
"/dev/null",O_WRONLY) != -1)
// Program entry point (only part of the body is visible in this excerpt).
// argc / argv are the raw command-line arguments.
111 int main(
int argc,
char* argv[]){
// Table of effective option values, pre-filled with defaults and keyed by the
// NODE_CLI_* constants (defined elsewhere). Values found on the command line
// overwrite these entries later. std::map::insert does not replace an
// existing key, so the keys are expected to be distinct.
113 std::map<std::string, std::string> actualOptions;
// Name defaults to the empty string.
114 actualOptions.insert(std::pair<std::string, std::string>(
NODE_CLI_NAME,
""));
// Default endpoints for the admin, client and server options.
115 actualOptions.insert(std::pair<std::string, std::string>(
NODE_CLI_ADMIN,
"tcp://*:5546"));
116 actualOptions.insert(std::pair<std::string, std::string>(
NODE_CLI_CLIENT,
"tcp://*:5556"));
117 actualOptions.insert(std::pair<std::string, std::string>(
NODE_CLI_SERVER,
"tcp://*:5557"));
// Data mode defaults to "1" and role to "router".
118 actualOptions.insert(std::pair<std::string, std::string>(
NODE_CLI_DATAMODE,
"1"));
119 actualOptions.insert(std::pair<std::string, std::string>(
NODE_CLI_ROLE,
"router"));
// Ini file defaults to the empty string; the log-mode option defaults to the
// file name "hce-node_log.ini".
120 actualOptions.insert(std::pair<std::string, std::string>(
NODE_CLI_INI_FILE,
""));
121 actualOptions.insert(std::pair<std::string, std::string>(
NODE_CLI_LOG_MODE,
"hce-node_log.ini"));
// Help and version default to a single space (not the empty string).
122 actualOptions.insert(std::pair<std::string, std::string>(
NODE_CLI_HELP,
" "));
123 actualOptions.insert(std::pair<std::string, std::string>(
NODE_CLI_VERSION,
" "));
// Demonize defaults to "0".
124 actualOptions.insert(std::pair<std::string, std::string>(
NODE_CLI_DEMONIZE,
"0"));
// Lookup table from role name to numeric node mode.
// NOTE(review): the statements that fill nodeModes are not visible here.
127 std::map<std::string, int> nodeModes;
// Set of options the command-line parser recognises.
137 Poco::Util::OptionSet knownOptions;
// Registers the role option: full name NODE_CLI_ROLE, short name "r",
// not required, argument name NODE_CLI_ROLE.
// NOTE(review): registrations of the other options are outside this excerpt.
143 knownOptions.addOption(Poco::Util::Option(
NODE_CLI_ROLE,
"r",
"node role: router, smanager, rmanager, replica",
false,
NODE_CLI_ROLE));
// Parser over knownOptions using Unix-style option syntax.
151 Poco::Util::OptionProcessor
op(knownOptions);
152 op.setUnixStyle(
true);
// Walk every argument after the program name.
153 for(
int i = 1; i < argc; i++){
// process() yields the option name and value when argv[i] is a known option.
// NOTE(review): `name` and `value` are declared on lines not shown here.
157 if(op.process(std::string(argv[i]), name, value)){
// Overwrite the default only when the parsed name exists in actualOptions;
// names not present in the table are skipped by this visible code.
160 std::map<std::string, std::string>::iterator actualOption;
161 actualOption = actualOptions.find(name);
162 if(actualOption != actualOptions.end()){
164 actualOption->second = value;
// A Poco exception raised while parsing is reported on stderr.
// NOTE(review): whether execution continues or aborts afterwards is not
// visible in this excerpt.
168 }
catch(Poco::Exception& exc){
169 std::cerr << exc.displayText() <<
std::endl;
// Post-parse checks on individual options (fragment).
// NOTE(review): the find() calls that position actualOption before each of
// the tests below are not visible, so which option each test inspects cannot
// be stated from this excerpt.
174 std::map<std::string, std::string>::iterator actualOption;
// Taken when the looked-up option exists and its value is the empty string.
177 if(actualOption != actualOptions.end() && actualOption->second ==
""){
// Part of the usage text describing the data-mode, role and ini-file options.
180 <<
"=<0 - randomizer, 1 - real data source>] " <<
"[--" <<
NODE_CLI_ROLE <<
"=<router | smanager | rmanager | replica> [--" <<
NODE_CLI_INI_FILE <<
"=<iniFile>] [--"
188 if(actualOption != actualOptions.end() && actualOption->second ==
""){
195 if(actualOption != actualOptions.end() && actualOption->second ==
""){
// When the option is empty and uname() succeeds, the system node name is
// used as the option value.
// NOTE(review): `name` here is passed to ::uname, so it is a different
// variable from the option-name string used in the parsing loop - its
// declaration is not shown.
198 if(::uname(&name) > -1){
199 actualOption->second = name.nodename;
// Container for logger key names from an ini configuration.
// NOTE(review): its use is not visible in this excerpt.
210 Poco::Util::IniFileConfiguration::Keys loggers;
// Admin endpoint taken from the option table.
216 std::string adminConnection = actualOptions.find(
NODE_CLI_ADMIN)->second;
// Translate the role string into its numeric mode.
// NOTE(review): both find() results are dereferenced without comparing to
// end(); a role string missing from nodeModes would dereference end()
// unless it is validated on lines not shown - confirm. The mapped int is
// also converted to unsigned int here.
217 unsigned int nodeMode = nodeModes.find(actualOptions.find(
NODE_CLI_ROLE)->second)->second;
// Connection strings start out as "0".
// NOTE(review): they are presumably reassigned per role on lines not shown.
219 std::string dataServerConnection =
"0";
220 std::string dataClientConnection =
"0";
221 std::string routerServerConnection =
"0";
// Absolute form of the ini file path (iniFilePath is built on lines not shown).
238 std::string iniFile = iniFilePath.absolute().toString();
// Parse the data-mode option as an unsigned integer; the stream state is not
// checked after extraction.
240 unsigned int clientDataMode = 1;
241 std::istringstream(actualOptions.find(
NODE_CLI_DATAMODE)->second) >> clientDataMode;
// Logger configuration file name taken from the log-mode option. Extraction
// with >> stops at the first whitespace, so a value containing spaces is cut
// short.
243 std::string configFileName;
244 std::istringstream(actualOptions.find(
NODE_CLI_LOG_MODE)->second) >> configFileName;
// On a failed logger configuration load, print the loader's error text and a
// fixed "Node not started!" notice to stderr.
// NOTE(review): the exit/return that presumably follows is not visible.
251 if (!loggerConfigLoader.loadConfig(configFileName))
253 std::cerr << loggerConfigLoader.getErrorMsg() <<
std::endl;
254 std::cerr <<
"Logger ini file load error, possible file not found, not proper format or another cause..." <<
std::endl <<
"Node not started!" <<
std::endl;
// Parse the demonize option. The local starts at 1, while the option table
// default is the string "0", so a successful parse of the default yields 0.
259 unsigned int clientDemonize = 1;
260 std::istringstream(actualOptions.find(
NODE_CLI_DEMONIZE)->second) >> clientDemonize;
// Debug level as reported by the logger configuration loader.
262 unsigned int DEBUG_LEVEL = loggerConfigLoader.getDebugLevel();
// Detach into the background only when the demonize option equals 1.
267 if (clientDemonize == 1) {
// daemon(0, 0): both arguments zero, i.e. neither the working directory nor
// the standard streams are preserved (see daemon(3)).
269 if (daemon(0, 0) == 0) {
270 logger.
log(HCE::LoggerStream::Priority::PRIO_INFORMATION) <<
"Demonization success..." <<
HCE::flush;
// Failure report with the errno text.
// NOTE(review): presumably the else branch of the daemon() check - the
// `else` line itself is not visible.
273 std::cerr <<
"Demonization fail: " << strerror(errno) <<
std::endl;
// NOTE(review): s_version_assert and s_clock are helpers defined elsewhere;
// presumably a library version check and a clock reading - confirm.
294 s_version_assert(2, 1);
// Reference point for the elapsed-time prefix in later log lines.
296 int64_t startMark = s_clock();
// Seed random() from the current wall-clock time.
298 srandom((
unsigned)
time(NULL));
// Thread identifiers for each handler, initialised to 0.
303 pthread_t threadDataServer = 0;
304 pthread_t threadDataProcessor = 0;
305 pthread_t threadDataClient = 0;
306 pthread_t threadDataReducer = 0;
307 pthread_t threadRouterServer = 0;
308 pthread_t threadAdmin = 0;
// Startup banner: the s_clock() difference since startMark, zero-padded to
// width 8, followed by the application version.
311 logger.
log(HCE::LoggerStream::Priority::PRIO_INFORMATION)
312 << std::setfill(
'0') << std::setw(8) << (int)(s_clock() - startMark) <<
" [] HCE cluster node, version " <<
APP_VERSION <<
HCE::flush;
315 logger.
log(HCE::LoggerStream::Priority::PRIO_INFORMATION)
316 << std::setfill(
'0') << std::setw(8) << (int)(s_clock() - startMark) <<
" [] ADMIN handler creation..." <<
HCE::flush;
// Construct the admin handler as a local object; DEBUG_LEVEL is narrowed to
// unsigned char for the constructor.
// NOTE(review): `context` and `nodeName` are defined on lines not shown.
318 HCE::handlers::Admin handlerAdmin(std::string(
"Admin"), context, nodeName, adminConnection, startMark, static_cast<unsigned char>(DEBUG_LEVEL), iniFile);
// Store the address of the local handler in pHandlerAdmin (declared outside
// this excerpt); the pointer is only valid while handlerAdmin is alive.
319 pHandlerAdmin = &handlerAdmin;
// Fatal log for a failed thread start.
// NOTE(review): the thread-creation call and its failure test are not
// visible in this excerpt.
322 logger.
log(HCE::LoggerStream::Priority::PRIO_FATAL) <<
"Error create thread [" << handlerAdmin.
getName() <<
"]!" <<
HCE::flush;
// Manager-mode branch (fragment): the two log statements below announce the
// shard-manager and the replica-manager mode.
// NOTE(review): the conditions selecting between them are not visible.
333 logger.
log(HCE::LoggerStream::Priority::PRIO_INFORMATION) << std::setfill(
'0') << std::setw(8) << (int)(s_clock() - startMark)
334 <<
" [] NODE IN SHARD MANAGER MODE - sends requests to all connected data nodes multicast way" <<
HCE::flush;
337 logger.
log(HCE::LoggerStream::Priority::PRIO_INFORMATION) << std::setfill(
'0') << std::setw(8) << (int)(s_clock() - startMark)
338 <<
" [] NODE IN REPLICA MANAGER MODE - sends requests to only one of connected data nodes round-robin way" <<
HCE::flush;
// Handlers built for this branch: reducer proxy, data-server proxy and
// data-client proxy. Each is followed by a fatal log for a failed thread
// start; the thread-creation calls themselves are on lines not shown.
341 HCE::handlers::DataReducerProxy handlerDataReducerProxy(std::string(
"DataReducerProxy"), context, nodeName, nodeMode, startMark, (
unsigned char)DEBUG_LEVEL, iniFile);
343 logger.
log(HCE::LoggerStream::Priority::PRIO_FATAL) <<
"Error create thread [" << handlerDataReducerProxy.
getName() <<
"]!" <<
HCE::flush;
348 HCE::handlers::DataServerProxy handlerDataServerProxy(std::string(
"DataServerProxy"), context, nodeName, dataServerConnection, nodeMode, startMark, (
unsigned char)DEBUG_LEVEL, iniFile);
350 logger.
log(HCE::LoggerStream::Priority::PRIO_FATAL) <<
"Error create thread [" << handlerDataServerProxy.
getName() <<
"]!" <<
HCE::flush;
355 HCE::handlers::DataClientProxy handlerDataClientProxy(std::string(
"DataClientProxy"), context, nodeName, dataClientConnection, startMark, (
unsigned char)DEBUG_LEVEL, iniFile);
357 logger.
log(HCE::LoggerStream::Priority::PRIO_FATAL) <<
"Error create thread [" << handlerDataClientProxy.
getName() <<
"]!" <<
HCE::flush;
// Wait for the threads in the order admin, data client, data server,
// reducer. The pthread_join return values are discarded, so the "was joined"
// message is logged regardless of the join result.
360 ::pthread_join(threadAdmin, NULL);
361 logger.
log(HCE::LoggerStream::Priority::PRIO_DEBUG) <<
"The thread [" << pHandlerAdmin->
getName() <<
"] was joined!" <<
HCE::flush;
368 ::pthread_join(threadDataClient, NULL);
369 logger.
log(HCE::LoggerStream::Priority::PRIO_DEBUG) <<
"The thread [" << handlerDataClientProxy.
getName() <<
"] was joined!" <<
HCE::flush;
371 ::pthread_join(threadDataServer, NULL);
372 logger.
log(HCE::LoggerStream::Priority::PRIO_DEBUG) <<
"The thread [" << handlerDataServerProxy.
getName() <<
"] was joined!" <<
HCE::flush;
374 ::pthread_join(threadDataReducer, NULL);
375 logger.
log(HCE::LoggerStream::Priority::PRIO_DEBUG) <<
"The thread [" << handlerDataReducerProxy.
getName() <<
"] was joined!" <<
HCE::flush;
// Data-node branch (fragment): announce the mode, then build the data
// processor and the data client handlers.
// NOTE(review): the branch condition and the thread-creation calls are on
// lines not shown; only the fatal logs for failed thread starts are visible.
383 logger.
log(HCE::LoggerStream::Priority::PRIO_INFORMATION) << std::setfill(
'0') << std::setw(8) << (int)(s_clock() - startMark) <<
" [] NODE IN DATA MODE" <<
HCE::flush;
// Unlike the other handlers in this file, nodeName is passed first and the
// handler name second here.
385 HCE::handlers::DataProcessorData handlerDataProcessorData(nodeName, std::string(
"DataProcessorData"), context, startMark, (
unsigned char)DEBUG_LEVEL, iniFile);
387 logger.
log(HCE::LoggerStream::Priority::PRIO_FATAL) <<
"Error create thread [" << handlerDataProcessorData.
getName() <<
"]!" <<
HCE::flush;
// clientDataMode (parsed from the data-mode option) is narrowed to
// unsigned char for the data client handler.
392 HCE::handlers::DataClientData handlerDataClientData(std::string(
"DataClientData"), context, nodeName, dataClientConnection, (
unsigned char)clientDataMode, startMark,
393 (
unsigned char)DEBUG_LEVEL, iniFile);
395 logger.
log(HCE::LoggerStream::Priority::PRIO_FATAL) <<
"Error create thread [" << handlerDataClientData.
getName() <<
"]!" <<
HCE::flush;
// Wait for the admin thread, then the data client thread; join results are
// not checked.
398 ::pthread_join(threadAdmin, NULL);
399 logger.
log(HCE::LoggerStream::Priority::PRIO_DEBUG) <<
"The thread [" << handlerAdmin.
getName() <<
"] was joined!" <<
HCE::flush;
404 ::pthread_join(threadDataClient, NULL);
405 logger.
log(HCE::LoggerStream::Priority::PRIO_DEBUG) <<
"The thread [" << handlerDataClientData.
getName() <<
"] was joined!" <<
HCE::flush;
// The data processor is told to shut down explicitly before its thread is
// joined.
408 handlerDataProcessorData.
shutdown();
410 ::pthread_join(threadDataProcessor, NULL);
411 logger.
log(HCE::LoggerStream::Priority::PRIO_DEBUG) <<
"The thread [" << handlerDataProcessorData.
getName() <<
"] was joined!" <<
HCE::flush;
// Router branch (fragment): announce the mode, then build the reducer proxy,
// data-server proxy and router-server proxy handlers.
// NOTE(review): the branch condition and the thread-creation calls are on
// lines not shown; only the fatal logs for failed thread starts are visible.
418 logger.
log(HCE::LoggerStream::Priority::PRIO_INFORMATION) << std::setfill(
'0') << std::setw(8) << (int)(s_clock() - startMark) <<
" [] NODE IN ROUTER MODE" <<
HCE::flush;
420 HCE::handlers::DataReducerProxy handlerDataReducerProxy(std::string(
"DataReducerProxy"), context, nodeName, nodeMode, startMark, (
unsigned char)DEBUG_LEVEL, iniFile);
422 logger.
log(HCE::LoggerStream::Priority::PRIO_FATAL) <<
"Error create thread [" << handlerDataReducerProxy.
getName() <<
"]!" <<
HCE::flush;
427 HCE::handlers::DataServerProxy handlerDataServerProxy(std::string(
"DataServerProxy"), context, nodeName, dataServerConnection, nodeMode, startMark, (
unsigned char)DEBUG_LEVEL, iniFile);
429 logger.
log(HCE::LoggerStream::Priority::PRIO_FATAL) <<
"Error create thread [" << handlerDataServerProxy.
getName() <<
"]!" <<
HCE::flush;
434 HCE::handlers::RouterServerProxy handlerRouterServerProxy(std::string(
"RouterServerProxy"), context, nodeName, routerServerConnection, startMark, (
unsigned char)DEBUG_LEVEL, iniFile);
436 logger.
log(HCE::LoggerStream::Priority::PRIO_FATAL) <<
"Error create thread [" << handlerRouterServerProxy.
getName() <<
"]!" <<
HCE::flush;
// Wait for the admin thread first; the join result is not checked.
439 ::pthread_join(threadAdmin, NULL);
440 logger.
log(HCE::LoggerStream::Priority::PRIO_DEBUG) <<
"The thread [" << handlerAdmin.
getName() <<
"] was joined!" <<
HCE::flush;
// The router-server proxy is told to shut down explicitly before its thread
// is joined.
442 handlerRouterServerProxy.
shutdown();
446 ::pthread_join(threadRouterServer, NULL);
447 logger.
log(HCE::LoggerStream::Priority::PRIO_DEBUG) <<
"The thread [" << handlerRouterServerProxy.
getName() <<
"] was joined!" <<
HCE::flush;
// Then the data-server and reducer threads are joined, in that order.
449 ::pthread_join(threadDataServer, NULL);
450 logger.
log(HCE::LoggerStream::Priority::PRIO_DEBUG) <<
"The thread [" << handlerDataServerProxy.
getName() <<
"] was joined!" <<
HCE::flush;
452 ::pthread_join(threadDataReducer, NULL);
453 logger.
log(HCE::LoggerStream::Priority::PRIO_DEBUG) <<
"The thread [" << handlerDataReducerProxy.
getName() <<
"] was joined!" <<
HCE::flush;
// Fatal log emitted for a node mode that none of the branches handle.
// NOTE(review): the enclosing default/else line is not visible.
460 logger.
log(HCE::LoggerStream::Priority::PRIO_FATAL) <<
"Unsupported node mode" <<
HCE::flush;
// Final informational line with the zero-padded elapsed s_clock() value.
// NOTE(review): the return statement and closing brace of main are outside
// this excerpt.
466 logger.
log(HCE::LoggerStream::Priority::PRIO_INFORMATION) << std::setfill(
'0') << std::setw(8) << (int)(s_clock() - startMark) <<
" [] Node was shutdown normally" <<
HCE::flush;