hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
Admin.cpp
Go to the documentation of this file.
1 #include "Admin.hpp"
3 
4 namespace HCE {
5  namespace handlers {
6 
// Routing codes for admin requests. Admin::processAdminRequest() initialises its result to
// ADMIN_REQUEST_TYPE_UNDEFINED and returns it; the code that consumes that result forwards
// ADMIN_REQUEST_TYPE_EXTERNAL messages to _inprocAdminSock and sends every other result
// (INTERNAL or UNDEFINED) to _adminServerSock.
7  const unsigned char ADMIN_REQUEST_TYPE_UNDEFINED = 0;
8  const unsigned char ADMIN_REQUEST_TYPE_EXTERNAL = 1;
9  const unsigned char ADMIN_REQUEST_TYPE_INTERNAL = 2;
10 
// Constructs the admin handler.
// All seven arguments are forwarded unchanged to the Handler base-class constructor; the two
// pointers this class later deletes in deinitialize() (_adminServerSock, _clientsManagerInproc)
// start out null, and initialize() is then called from the constructor body.
// NOTE(review): the value returned by initialize() is ignored here.
// NOTE(review): listing line 15 is not present in this source view.
11  Admin::Admin(std::string name, zmq::context_t& ctx, const std::string& clientIdentity, std::string connectionString,
12  int64_t startedAt, unsigned char logPriority, const std::string& iniFile) :
13  Handler(name, ctx, clientIdentity, connectionString, startedAt, logPriority, iniFile), _adminServerSock(nullptr), _clientsManagerInproc(nullptr){
14 
16  initialize();
17  }
18 
// NOTE(review): the signature line of this body (listing line 19) is missing from this view.
// The body only calls deinitialize(), so this is presumably the Admin destructor - confirm
// against the real Admin.cpp.
20  deinitialize();
21  }
22 
// Initialises the handler; called from the constructor.
// Visible behaviour: logs the client identity and connection string at INFORMATION level and
// returns `true`.
// NOTE(review): listing lines 25-41 are only partially present in this view; going by the
// surviving comment, the elided code presumably creates and binds the admin server socket and
// aborts on bind failure - confirm against the real source.
// NOTE(review): the function is declared `int` but returns the bool literal `true`
// (converted to 1); callers cannot distinguish error codes from this value.
23  int Admin::initialize(void){
24  logger.log(NODE_LOG_MODE_INFORMATION) << "Initialize client [" << getClientIdentity() << "] with connection [" << getConnectionString() << "]" << flush;
25 
28 
31  //Fatal error, can't bind admin port, abort
33  }
34 
37 
41 
42  return true;
43  }
44 
// Releases what this handler allocated: deletes _adminServerSock and resets it to NULL,
// clears _queueInproc, and deletes _clientsManagerInproc.
// NOTE(review): listing lines 47-48 and 56 are missing from this view; the unmatched closing
// braces below indicate elided `if` headers (and possibly socket close calls) - confirm.
// NOTE(review): _clientsManagerInproc is not reset to null after `delete` (listing lines 57-58
// are contiguous), unlike _adminServerSock; a second call would depend on the elided guard.
45  void Admin::deinitialize(void){
46  if(_adminServerSock){
49  }
50  delete _adminServerSock;
51  _adminServerSock = NULL;
52  }
53 
54  _queueInproc.clear();
55 
57  delete _clientsManagerInproc;
58  }
59  }
60 
// Setter for the total-requests statistic; processAdminRequest() calls it with the previous
// value plus one after each handled request.
// NOTE(review): the body (listing line 62) is missing from this view; presumably it stores
// statRequestsTotal in a member - confirm.
61  void Admin::setStatRequestsTotal(unsigned int statRequestsTotal){
63  }
64 
// Getter for the total-requests statistic; its value is reported as "requestsTotal=" by
// getPropertyInformation() and incremented by processAdminRequest().
// NOTE(review): the body (listing line 66) is missing from this view; as shown, the function
// has no return statement - confirm against the real source.
65  unsigned int Admin::getStatRequestsTotal(void){
67  }
68 
// Receives one message from _inprocAdminSock and dispatches it by its number of parts.
// NOTE(review): the signature line (listing lines 69-70) and the bodies of the first two
// branches (listing lines 77-78, 80-81) are missing from this view, so the function name and
// the handlers invoked cannot be stated from here.
71  zmsg msg(*_inprocAdminSock);
72 
73  logger.log(NODE_LOG_MODE_TRACE) << "MSG FROM inproc_admin\n" << msg.dump() << flush;
74 
    // Exactly NODE_MSG_DATA_MIN_FIELDS parts and more-than-minimum parts are handled separately.
76  if(msg.parts() == NODE_MSG_DATA_MIN_FIELDS){
79  }else if(msg.parts() > NODE_MSG_DATA_MIN_FIELDS){
82  }else{
    // Fewer parts than the minimum: the message is only logged, nothing is sent back.
    // NOTE(review): this error is logged at TRACE level, so it is invisible at normal log levels.
83  logger.log(NODE_LOG_MODE_TRACE) << "ERROR of message structure inproc_admin, wrong fields number " << msg.parts()
84  << endl << msg.dump() << flush;
85  }
86  }
87 
// Registers an inproc client with _clientsManagerInproc and logs the event at DEBUG level.
// @param identity identity string of the client that announced itself.
// The identity is removed before it is appended, presumably so that a client that re-sends
// READY does not end up registered twice (depends on the manager's remove/append semantics).
88  void Admin::registerClientInproc(const std::string& identity){
89  _clientsManagerInproc->remove(identity);
90  _clientsManagerInproc->append(identity);
91  logger.log(NODE_LOG_MODE_DEBUG) << "READY message inproc from client [" << identity << "] received" << flush;
92  }
93 
// Handles a control message received from an inproc client: either a READY announcement or a
// property update; anything else is logged as invalid.
// NOTE(review): the signature line (listing lines 94-95) is missing from this view; `msg` is
// presumably a zmsg& parameter - confirm.
    // First frame is the sender identity; it is removed from the message.
96  std::string clientIdentity((const char*)msg.pop_front().c_str());
    // Copy of the address frame, with a null address mapped to the empty string.
97  std::string addr((msg.address())?msg.address():"");
98 
    // NOTE(review): this comparison uses the raw msg.address() value rather than the null-safe
    // copy `addr` built on the previous line. If address() can return null (the guard above
    // suggests it can), comparing it here is unsafe; `addr == NODE_MSG_READY` looks intended.
99  if(msg.address() == NODE_MSG_READY){
100  registerClientInproc(clientIdentity);
    // Property message: addr starts with the NODE_MSG_PROPERTIES prefix.
101  }else if(addr.compare(0, NODE_MSG_PROPERTIES.length(), NODE_MSG_PROPERTIES)==0){
103  HCE::handlers::PropertyMessage propertyMessage;
104  propertyMessage.extract(addr);
105  if (!propertyMessage.isError()){
    // Store the extracted property for this client in the clients manager.
106  _clientsManagerInproc->refresh(clientIdentity, propertyMessage.getProperty());
107  logger.log(NODE_LOG_MODE_TRACE) << "Received property message from client [" << clientIdentity << "] " << flush;
108  }else{
110  logger.log(NODE_LOG_MODE_ERROR) << "Operation extract property message from client [" << clientIdentity << "] has error: " << propertyMessage.getErrorMsg() << flush;
111  logger.log(NODE_LOG_MODE_TRACE) << msg.dump() << flush;
112  }
113  }else{
115  logger.log(NODE_LOG_MODE_ERROR) << "INVALID inproc control message from client [" << clientIdentity << "]" << flush;
116  logger.log(NODE_LOG_MODE_TRACE) << msg.dump() << flush;
117  }
118  }
119 
// Splits an admin request body into its fields.
// @param command raw request body.
// @return the substrings of `command` separated by NODE_ADMIN_COMMAND_DELIMITER, in order.
// Because the split is done with std::getline, empty fields between two delimiters are kept,
// but a trailing delimiter does not produce a final empty field and an empty `command`
// yields an empty vector. processAdminRequest() requires at least three fields, so a request
// whose third field is empty and last is counted as having only two.
// NOTE(review): listing lines 121 and 123 are missing from this view.
120  std::vector<std::string> Admin::parseAdminRequest(const std::string& command){
122  std::vector<std::string> parts;
124  std::istringstream partsStream(command);
125  std::string tmpString;
126  while(std::getline(partsStream, tmpString, NODE_ADMIN_COMMAND_DELIMITER)){
127  parts.push_back(tmpString);
128  }
129 
130  return parts;
131  }
132 
// Parses one request received from the admin server socket and rewrites `msg` in place into
// either a command for an inproc handler or a response for the admin client.
// @param msg request message; its first three frames (client identity, message id, body) are
//            popped, then new frames are appended depending on the routing decision.
// @return an ADMIN_REQUEST_TYPE_* code used by the caller to choose the outgoing socket.
// The body is split by parseAdminRequest() into at least three items, which the log messages
// below label as: [0] target handler, [1] command, [2] parameters.
// NOTE(review): `ret` is initialised to ADMIN_REQUEST_TYPE_UNDEFINED and no assignment to it is
// present in this view; the listing skips lines in every branch (e.g. 188, 201, 209-210, 213,
// 224, 242-243), which presumably hold the `ret = ...` assignments and, for the wrong-format
// case, the error response frame - confirm against the real source.
133  unsigned char Admin::processAdminRequest(zmsg& msg){
134  unsigned char ret = ADMIN_REQUEST_TYPE_UNDEFINED;
135 
    // Frame 1: identity of the admin client that sent the request.
137  std::string clientIdentity((const char*)msg.pop_front().c_str());
138 
    // Frame 2: message id, echoed back in every response. Frame 3: the request body.
140  std::string messageId((const char*)msg.pop_front().c_str());
142  std::string messageBody((const char*)msg.pop_front().c_str());
143  logger.log(NODE_LOG_MODE_DEBUG)<< "RECEIVED REQUEST from adminserver client [" << clientIdentity << "], message [" << messageId << "] body " << messageBody << flush;
144 
146  std::vector<std::string> requestItems = parseAdminRequest(messageBody);
    // Fewer than three items is only logged here; the matching `else` further down builds the reply.
147  if (requestItems.size() < 3){
148  logger.log(NODE_LOG_MODE_ERROR) << "Received admin request has wrong items count = " << requestItems.size() << flush;
149  }
150 
152  if(requestItems.size() > 2){
153  logger.log(NODE_LOG_MODE_TRACE) << "Admin::processAdminRequest requestItems[0]: " << requestItems[0] << flush;
154  logger.log(NODE_LOG_MODE_TRACE) << "Admin::processAdminRequest requestItems[1]: " << requestItems[1] << flush;
155  logger.log(NODE_LOG_MODE_TRACE) << "Admin::processAdminRequest requestItems[2]: " << requestItems[2] << flush;
156 
    // Interpret the parameters item; a parse error is logged but processing continues.
158  AdminCommandParameters adminCommandParameters(requestItems[2]);
159  if (adminCommandParameters.isError()){
160  logger.log(NODE_LOG_MODE_ERROR) << "Operation extract of command parameters has error: " << adminCommandParameters.getErrorMsg() << flush;
161  }else{
162  logger.log(NODE_LOG_MODE_TRACE) << "Admin::processAdminRequest adminCommandParameters.getParameters(): " << adminCommandParameters.getParameters() << flush;
163  logger.log(NODE_LOG_MODE_TRACE) << "Admin::processAdminRequest adminCommandParameters.getRealtime(): " << adminCommandParameters.getRealtime() << flush;
164  logger.log(NODE_LOG_MODE_TRACE) << "Admin::processAdminRequest adminCommandParameters.isRealtime(): " << std::boolalpha << adminCommandParameters.isRealtime() << flush;
165  }
166 
    // For a PROPERTIES command given as JSON, forward only the extracted parameters value.
167  if (adminCommandParameters.isJsonString() && requestItems[1]==NODE_MSG_PROPERTIES)
168  requestItems[2] = adminCommandParameters.getParameters();
169 
    // Case 1 - forward to a registered inproc handler. The condition is equivalent to:
    // handler is registered AND (command is not PROPERTIES OR the request is realtime).
170  if(((_clientsManagerInproc->exists(std::string(requestItems[0].c_str())) && requestItems[1]!=NODE_MSG_PROPERTIES) ||
171  (_clientsManagerInproc->exists(std::string(requestItems[0].c_str())) && requestItems[1]==NODE_MSG_PROPERTIES && adminCommandParameters.isRealtime()))){
172 
    // Outgoing frames, in order: target handler, message id, command, parameters, and the
    // admin client identity.
175  msg.append(requestItems[0].c_str());
177  msg.append(messageId.c_str());
179  msg.append(requestItems[1].c_str());
181  msg.append(requestItems[2].c_str());
183  msg.append(clientIdentity.c_str());
184 
    // NOTE(review): the message ends with " bytes" but no size value is streamed before it.
185  logger.log(NODE_LOG_MODE_DEBUG) << "External admin command message prepared for client [" << requestItems[0].c_str() << "], command ["
186  << requestItems[1].c_str() << "], params [" << requestItems[2].c_str() << "] " << " bytes" << flush;
187 
189 
    // Case 2 - handler is registered and the request is not realtime. Given case 1 above, this
    // is only reached for PROPERTIES commands: answer from the properties held by the manager.
190  }else if (_clientsManagerInproc->exists(std::string(requestItems[0].c_str())) && !adminCommandParameters.isRealtime()){
191 
193  std::string responseMsg = _clientsManagerInproc->getProperty(requestItems[0]);
195  msg.append(clientIdentity.c_str());
197  msg.append(messageId.c_str());
199  msg.append(responseMsg.c_str());
200 
202 
    // Case 3 - no handler named, or the admin handler itself: execute the command locally.
203  }else if(requestItems[0] == "" || requestItems[0] == NODE_ADMIN_HANDLER){
204 
206  std::string responseMsg = processAdminCommands(requestItems[1], requestItems[2]);
207 
    // An empty response from processAdminCommands() is treated as "command not supported".
208  if(responseMsg == ""){
211  logger.log(NODE_LOG_MODE_DEBUG) << "Internal admin command [" << requestItems[1].c_str() << "], params [" << requestItems[2].c_str() << "] " << " not supported" << flush;
212  }else{
214  logger.log(NODE_LOG_MODE_DEBUG) << "Internal admin command processed successfully" << flush;
215  }
217  msg.append(clientIdentity.c_str());
219  msg.append(messageId.c_str());
221  msg.append(responseMsg.c_str());
222 
    // Case 4 - a handler was named but is not registered: reply with a handler-not-found error.
223  }else{
225  std::string responseMsg = NODE_ADMIN_ERROR_ERROR + NODE_ADMIN_COMMAND_DELIMITER + NODE_ADMIN_ERROR_HANDLER_NOT_FOUND + " [" + requestItems[0] + "]";
226 
227  logger.log(NODE_LOG_MODE_DEBUG) << "Handler [" << requestItems[0] << "] not registered or not connected to admin server." << flush;
228 
230  msg.append(clientIdentity.c_str());
232  msg.append(messageId.c_str());
234  msg.append(responseMsg.c_str());
235  }
236  }else{
    // Wrong item count: only the client identity and message id frames are visible here; the
    // response body frame is presumably appended on the elided listing lines 242-243.
239  msg.append(clientIdentity.c_str());
241  msg.append(messageId.c_str());
244 
245  logger.log(NODE_LOG_MODE_DEBUG) << "ADMIN REQUEST MESSAGE ERROR - items " << requestItems.size() << " " << NODE_ADMIN_ERROR_WRONG_FORMAT << flush;
246  }
247 
248  logger.log(NODE_LOG_MODE_TRACE) << "MSG FROM processAdminRequest():" << endl << msg.dump() << flush;
249 
    // Count every request, whatever its outcome.
251  unsigned int statRequestsTotal = getStatRequestsTotal();
252  setStatRequestsTotal(++statRequestsTotal);
253 
254  return ret;
255  }
256 
// Sends a STOP message to every entry of _queueInproc.
// For each entry a message is built from this handler's identity, wrapped with the entry's
// identity as destination, followed by NODE_MSG_STOP as both message id and body and
// NODE_ADMIN_HANDLER as the source identity, and sent on _inprocAdminSock.
// NOTE(review): the log text names "_adminServerSock", but the send above it uses
// _inprocAdminSock.
// NOTE(review): in this view _queueInproc is only cleared (deinitialize) and iterated here,
// while clients are registered in _clientsManagerInproc; confirm where _queueInproc is filled.
// NOTE(review): listing lines 258, 260 and 267 are missing from this view.
257  void Admin::stopClients(void){
259  for(std::vector<ClientWorkerItem>::iterator it = _queueInproc.begin(); it < _queueInproc.end(); ++it){
261  zmsg clientMsg(getClientIdentity().c_str()); //Source handler identity
262  clientMsg.wrap(it->identity.c_str(), NULL); //Destination handler identity
263  clientMsg.append(NODE_MSG_STOP.c_str()); //Message Id
264  clientMsg.append(NODE_MSG_STOP.c_str()); //Message body
265  clientMsg.append(NODE_ADMIN_HANDLER.c_str()); //Message source identity
266 
268  clientMsg.send(*_inprocAdminSock);
269 
270  logger.log(NODE_LOG_MODE_DEBUG) << "STOP REQUEST SENT to _adminServerSock for client [" << it->identity << "]" << flush;
271  }
272  }
273 
// Executes a command addressed to the admin handler itself.
// @param command    command name; compared against the NODE_MSG_* command constants.
// @param parameters command argument string; parsed as an unsigned number where needed.
// @return the response string, or "" when the command is not recognised (the caller logs an
//         empty result as "not supported").
// NOTE(review): the listing skips lines in most branches (e.g. 317, 323-325, 329, 335-336,
// 340, 346-348). As shown, the *_GET branches are empty and the *_SET branches never assign
// `ret`; the elided lines presumably build those responses - confirm against the real source.
// NOTE(review): the numeric parameters are read with `istringstream >> value` into a variable
// pre-loaded with the current setting. Since C++11 a failed extraction from non-numeric text
// stores 0, so the current value only survives an empty/whitespace parameter string unless the
// elided lines validate the input.
274  std::string Admin::processAdminCommands(std::string& command, std::string& parameters){
275  std::string ret = "";
276 
277  logger.log(NODE_LOG_MODE_DEBUG) << "Admin internal command [" << command << "], parameters [" << parameters << "]" << flush;
278 
    // SHUTDOWN: optional delay taken from `parameters`, then stop all clients and end the
    // processing loop by clearing the in-progress flag.
280  if(command == NODE_MSG_SHUTDOWN){
282  unsigned int timeout = 0;
283  std::istringstream(parameters) >> timeout;
284  if(timeout > 0){
286  s_sleep(timeout);
287  }
288 
290  stopClients();
291 
293  setInProgress(false);
294 
295  ret = NODE_ADMIN_ERROR_OK;
296 
297  logger.log(NODE_LOG_MODE_DEBUG) << "Done with timeout " << timeout << " ms" << flush;
    // ECHO: replies OK followed by "command=<echo command name>".
298  }else if(command == NODE_MSG_ECHO){
299  std::stringstream result;
300  result << NODE_ADMIN_ERROR_OK << NODE_ADMIN_COMMAND_DELIMITER << "command=" << NODE_MSG_ECHO;
301  ret = result.str();
302 
303  logger.log(NODE_LOG_MODE_DEBUG) << "Done" << flush;
    // TIME: replies OK followed by "time=<s_clock() - getStartedAt()>", truncated to unsigned int.
304  }else if(command == NODE_MSG_TIME){
305  std::stringstream timeStream;
306  timeStream << (unsigned int)(s_clock() - getStartedAt());
307 
308  std::stringstream result;
309  result << NODE_ADMIN_ERROR_OK << NODE_ADMIN_COMMAND_DELIMITER << "time=" << timeStream.str();
310  ret = result.str();
311 
312  logger.log(NODE_LOG_MODE_DEBUG) << "Done" << flush;
    // PROPERTIES: replies with this handler's property string.
313  }else if(command == NODE_MSG_PROPERTIES){
315  ret = getPropertyInformation();
316  }else if(command == NODE_MSG_LOG_LEVEL_GET){
    // LOG_LEVEL_SET: applies the parsed priority to this handler and to the Poco root logger.
318  }else if(command == NODE_MSG_LOG_LEVEL_SET){
320  unsigned int logp = getLogPriority();
321  //Parse log priority from parameters
322  std::istringstream(parameters) >> logp;
326  setLogPriority(logp);
327  Poco::Logger::root().setLevel(logp);
328  }else if(command == NODE_MSG_POLL_TIMEOUT_GET){
    // POLL_TIMEOUT_SET: the parsed value is multiplied by 1000 before being stored, presumably
    // a unit conversion for the poll call - confirm which units getPollTimeout() uses, since
    // the default loaded from getPollTimeout() is multiplied as well.
330  }else if(command == NODE_MSG_POLL_TIMEOUT_SET){
332  unsigned int pollTimeout = getPollTimeout();
334  std::istringstream(parameters) >> pollTimeout;
337  //Set new poll timeout value
338  setPollTimeout(pollTimeout*1000);
339  }else if(command == NODE_MSG_DUMP_INTERVAL_GET){
    // DUMP_INTERVAL_SET: same pattern as the poll timeout, including the *1000 scaling.
341  }else if(command == NODE_MSG_DUMP_INTERVAL_SET){
343  unsigned int dumpInterval = getDumpInterval();
345  std::istringstream(parameters) >> dumpInterval;
349  setDumpInterval(dumpInterval*1000);
350  }
351 
352  return ret;
353  }
354 
// Builds the response for the PROPERTIES admin command.
// @return NODE_ADMIN_ERROR_OK, a delimiter, the result of getProperties(), then
//         "clientIdentity=<identity>" and "requestsTotal=<count>".
// NOTE(review): no delimiter is streamed between getProperties() and "clientIdentity=";
// presumably getProperties() already ends with one - confirm.
// NOTE(review): listing line 360 is missing from this view and sits inside the stream chain,
// so one more field may be emitted between clientIdentity and requestsTotal.
356  std::string Admin::getPropertyInformation(void){
357  std::stringstream result;
358  result << NODE_ADMIN_ERROR_OK
359  << NODE_ADMIN_COMMAND_DELIMITER << getProperties() << "clientIdentity=" << getClientIdentity()
361  << NODE_ADMIN_COMMAND_DELIMITER << "requestsTotal=" << getStatRequestsTotal();
362  return result.str();
363  }
364 
// Runs processAdminRequest() on `msg` and sends the rewritten message to the socket selected
// by the returned routing code.
// NOTE(review): the signature line (listing line 365) is missing from this view; since the
// admin-server receive code calls processDataMessage(msg), this is presumably
// Admin::processDataMessage(zmsg& msg) - confirm.
366  unsigned char destinationType = processAdminRequest(msg);
367 
    // EXTERNAL: the command is forwarded to an inproc handler for execution.
368  if(destinationType == ADMIN_REQUEST_TYPE_EXTERNAL){
370  msg.send(*_inprocAdminSock);
371 
372  logger.log(NODE_LOG_MODE_DEBUG) << "REQUEST SENT to inproc_admin socket to execute externally" << flush;
    // INTERNAL: the response was already produced locally; return it to the admin client.
373  }else if(destinationType == ADMIN_REQUEST_TYPE_INTERNAL){
375  msg.send(*_adminServerSock);
376 
377  logger.log(NODE_LOG_MODE_DEBUG) << "REQUEST was executed internally" << flush;
    // Any other code: the message is still sent back on the admin server socket; only the log
    // text differs from the INTERNAL case.
378  }else{
380  msg.send(*_adminServerSock);
381 
382  logger.log(NODE_LOG_MODE_DEBUG) << "REQUEST execution error : wrong format of requested command or another parse error" << flush;
383  }
384  }
385 
// Relays a response received from an inproc handler back to the admin client that issued the
// original request.
// NOTE(review): the signature line (listing lines 386-387) is missing from this view; `msg`
// is presumably a zmsg& parameter - confirm.
    // Incoming frames, in order: responding handler identity, message id, response body, and
    // the identity of the admin client that sent the request.
388  std::string clientIdentity((const char*)msg.pop_front().c_str());
390  std::string messageId((const char*)msg.pop_front().c_str());
392  std::string messageBody((const char*)msg.pop_front().c_str());
394  std::string sourceClientIdentity((const char*)msg.pop_front().c_str());
395 
396  logger.log(NODE_LOG_MODE_DEBUG) << "RECEIVED RESPONSE from inproc client [" << clientIdentity << "], message [" << messageId << "] body [" << messageBody << "]" << flush;
397 
    // Outgoing frames: admin client identity, message id, response body. The responding
    // handler's identity is used for logging only and is not forwarded.
399  zmsg responseMsg;
401  responseMsg.append(sourceClientIdentity.c_str());
403  responseMsg.append(messageId.c_str());
405  responseMsg.append(messageBody.c_str());
406 
408  responseMsg.send(*_adminServerSock);
409 
410  logger.log(NODE_LOG_MODE_DEBUG) << "RESPONSE SENT to '_adminServerSock'" << flush;
411  logger.log(NODE_LOG_MODE_DEBUG) << "===================================" << flush;
412  logger.log(NODE_LOG_MODE_DEBUG) << "sourceClientIdentity: '" << sourceClientIdentity << "'" << flush;
413  logger.log(NODE_LOG_MODE_DEBUG) << "messageId: '" << messageId << "'" << flush;
414  logger.log(NODE_LOG_MODE_DEBUG) << "messageBody: '" << messageBody << "'" << flush;
415  logger.log(NODE_LOG_MODE_DEBUG) << "===================================" << flush;
416  }
417 
// Receives one message from _adminServerSock and passes it to processDataMessage() when it
// has more than NODE_MSG_DATA_MIN_FIELDS parts; otherwise the message is only logged.
// NOTE(review): the signature line (listing lines 418-419) is missing from this view, so the
// function name cannot be stated from here.
420  zmsg msg(*_adminServerSock);
421 
422  logger.log(NODE_LOG_MODE_TRACE) << "MSG FROM adminserver" << endl << msg.dump() << flush;
423 
425  if(msg.parts() > NODE_MSG_DATA_MIN_FIELDS){
427  processDataMessage(msg);
428  }else{
    // Malformed request: no reply is sent to the admin client from here.
    // NOTE(review): this error is logged at TRACE level, so it is invisible at normal log levels.
429  logger.log(NODE_LOG_MODE_TRACE) << "ERROR of message structure adminserver, wrong fields number " << msg.parts()
430  << endl << msg.dump() << flush;
431  }
432  }
433 
// Main loop of the admin handler: while getInProgress() is true, polls _adminServerSock and
// _inprocAdminSock for readable messages with a timeout of getPollTimeout().
// @return always NULL.
// NOTE(review): several listing lines are missing from this view (435-436, 449, 454, 457-458,
// 472, 477). The `}else{` after the loop shows the loop is guarded by an elided `if`,
// presumably a check that both sockets exist, and the bodies of the two POLLIN branches
// (presumably the per-socket message handlers) are elided as well - confirm.
434  void* Admin::process(void){
437 
439  while(getInProgress()){
440  try {
    // items[0] is the admin server socket, items[1] the inproc admin socket.
442  zmq::pollitem_t items[] = {{*_adminServerSock, 0, ZMQ_POLLIN, 0}, {*_inprocAdminSock, 0, ZMQ_POLLIN, 0}};
443 
445  zmq::poll(items, 2, getPollTimeout());
446 
448  if(items[0].revents & ZMQ_POLLIN){
450  }
451 
453  if(items[1].revents & ZMQ_POLLIN){
455  }
456 
459 
    // Exceptions never leave the loop: an interrupted call (EINTR) is ignored silently, every
    // other failure is reported and polling continues.
    // NOTE(review): these reports go to std::cout rather than to `logger`.
460  }catch(zmq::error_t& error){
461  if (EINTR!=error.num()){
462  std::cout << "Admin E: " << error.what() << std::endl;
463  }
464  }catch(std::exception& e){
465  std::cout << "Admin E: " << e.what() << std::endl;
466  }catch(...){
467  std::cout << "Admin E: Unknown exception" << std::endl;
468  }
469 
470  }
471 
473  }else{
474  logger.log(NODE_LOG_MODE_FATAL) << "Admin::process() aborted because not all required sockets created : " << NODE_CRITICAL_ERROR_MESSAGE << flush;
475 
476  //TODO: possible more intelligent termination implementation
478  }
479 
480  return NULL;
481  }
482 
483  }
484 }