hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
RouterServerProxy.cpp
Go to the documentation of this file.
1 #include "RouterServerProxy.hpp"
2 #include "ServiceMessages.hpp"
3 
4 namespace HCE {
5  namespace handlers {
6 
7  RouterServerProxy::RouterServerProxy(std::string name, zmq::context_t& ctx, const std::string& clientIdentity, std::string connectionString,
8  int64_t startedAt, unsigned char logPriority, const std::string& iniFile) :
9  Handler(name, ctx, clientIdentity, connectionString, startedAt, logPriority, iniFile){
10 
12  initialize();
13  }
14 
16  deinitialize();
17  }
18 
21  _inprocProxySock = NULL;
22  _inprocReducerOutSock = NULL;
23  _routerServerSock = NULL;
24 
26  checkInit();
27 
29  setPstatsAt(s_clock() + LOG_STATISTIC_PERIOD);
30 
31  logger.log(NODE_LOG_MODE_INFORMATION) << "Initialize client [" << getClientIdentity() << "] with connection [" << getConnectionString() << "]" << flush;
32 
35 
42 
43  return true;
44  }
45 
47  if(_inprocProxySock){
48  delete _inprocProxySock;
49  }
51  delete _inprocReducerOutSock;
52  }
54  delete _routerServerSock;
55  }
56  _inprocProxySock = NULL;
57  _inprocReducerOutSock = NULL;
58  _routerServerSock = NULL;
59  _responses.clear();
60  }
61 
62  void RouterServerProxy::setStatRequests(unsigned int statRequests){
64  }
65 
68  }
69 
70  void RouterServerProxy::setStatResponses(unsigned int statResponses){
72  }
73 
76  }
77 
78  void RouterServerProxy::setStatRequestsSize(unsigned int statRequestsSize){
80  }
81 
84  }
85 
86  void RouterServerProxy::setStatResponsesSize(unsigned int statResponsesSize){
88  }
89 
92  }
93 
94  void RouterServerProxy::setStatRequestsProcessed(unsigned int statRequestsProcessed){
96  }
97 
100  }
101 
102  void RouterServerProxy::setStatResponsesProcessed(unsigned int statResponsesProcessed){
104  }
105 
108  }
109 
110  void RouterServerProxy::setStatRequestsProcessedSize(unsigned int statRequestsProcessedSize){
112  }
113 
116  }
117 
118  void RouterServerProxy::setStatResponsesProcessedSize(unsigned int statResponsesProcessedSize){
120  }
121 
124  }
125 
126  void RouterServerProxy::setStatQPS(float statQPS){
128  }
129 
132  }
133 
134  void RouterServerProxy::setStatRPS(float statRPS){
136  }
137 
140  }
141 
142  void RouterServerProxy::setStatRequestsBandwidth(float statRequestsBandwidth){
144  }
145 
148  }
149 
150  void RouterServerProxy::setStatResponsesBandwidth(float statResponsesBandwidth){
152  }
153 
156  }
157 
158  void RouterServerProxy::setPstatsAt(int64_t pstatsAt){
160  }
161 
164  }
165 
169  setStatRequests(0);
170 
172  setStatResponses(0);
173 
176 
179 
182 
185 
188 
191 
193  setStatQPS(0);
194 
196  setStatRPS(0);
197 
200 
203  }
204 
208  logger.log(NODE_LOG_MODE_TRACE) << " MSG FROM inproc_reducer_out" << endl << msg.dump() << flush;
209 
210  unsigned int statResponses = getStatResponses();
211  setStatResponses(++statResponses);
212 
214  std::string messageId((const char*)msg.pop_front().c_str());
215  std::string messageBody((const char*)msg.pop_front().c_str());
216  std::string messageRoute((msg.parts())?(const char*)msg.pop_front().c_str():"");
217 
219  logger.log(NODE_LOG_MODE_DEBUG) << " RECEIVED RESPONSE from inproc_reducer_out message [" << messageId << "] body " << messageBody.size() << " bytes, n=" << getStatResponses() << flush;
221  std::vector<std::string> parts;
223  std::istringstream partsStream(messageId);
224  std::string tmpString;
225 
226  while(std::getline(partsStream, tmpString, NODE_ROUTER_MSG_DELIMITER)){
227  parts.push_back(tmpString);
228  }
229  if(parts.size() > 1){
231  msg.append(parts[0].c_str());
232  msg.append(parts[1].c_str());
233  msg.append(messageBody.c_str());
234 
235  logger.log(NODE_LOG_MODE_TRACE) << "MSG TO routerserver" << endl << msg.dump() << flush;
236 
238  msg.send(*_routerServerSock);
239 
240  logger.log(NODE_LOG_MODE_DEBUG) << "RESPONSE SENT to routerserver socket from proxy node, n=" << getStatResponses() << flush;
241  }else{
243  logger.log(NODE_LOG_MODE_ERROR) << "ERROR : Wrong message_Id format [" << messageId << "] received from client" << flush;
244  }
245  }
246 
249  zmsg clientMsg(*_routerServerSock);
250 
252  if(clientMsg.parts() > NODE_MSG_DATA_MIN_FIELDS){
253  logger.log(NODE_LOG_MODE_TRACE) << "MSG FROM routerserver\n" << clientMsg.dump() << flush;
254 
255  unsigned int statRequest = getStatRequests();
256  setStatRequests(++statRequest);
257 
259  std::string clientIdentity((const char*)clientMsg.pop_front().c_str());
260  std::string messageId((const char*)clientMsg.pop_front().c_str());
261  std::string messageBody((const char*)clientMsg.pop_front().c_str());
262  std::string messageRoute((clientMsg.parts())?(const char*)clientMsg.pop_front().c_str():"");
263 
264  logger.log(NODE_LOG_MODE_DEBUG) << "RouterServerProxy::handleExternalMessage Recieved body: \"" << messageBody << "\"" << flush;
265  logger.log(NODE_LOG_MODE_DEBUG) << "RouterServerProxy::handleExternalMessage Recieved route: \"" << messageRoute << "\"" << flush;
266 
268  logger.log(NODE_LOG_MODE_DEBUG) << "RECEIVED REQUEST from router`s client [" << getClientIdentity() << "], message [" << messageId << "] body "
269  << messageBody.size() << " bytes, n=" << getStatRequests() << flush;
270 
271  logger.log(NODE_LOG_MODE_DEBUG) << "RouterServerProxy::handleExternalMessage Recieved route: \"" << messageRoute << "\"" << flush;
272 
274  clientMsg.clear();
275  clientMsg.append(clientIdentity.c_str());
276  clientMsg.append((clientIdentity + NODE_ROUTER_MSG_DELIMITER + messageId).c_str());
277  clientMsg.append(messageBody.c_str());
278  clientMsg.append(messageRoute.c_str());
279 
281  logger.log(NODE_LOG_MODE_DEBUG) << " MSG TO inproc_proxy" << endl << clientMsg.dump() << flush;
282  }
283 
285  clientMsg.send(*_inprocProxySock);
286 
287  logger.log(NODE_LOG_MODE_DEBUG) << " REQUEST SENT to inproc_proxy socket, n=" << getStatRequests() << flush;
288  }else{
289  logger.log(NODE_LOG_MODE_ERROR) << "ERROR MESSAGE RECEIVED from routerserver, parts=" << clientMsg.parts() << ", n=" << getStatRequests()
290  << endl << clientMsg.dump() << flush;
291  }
292  }
293 
295 
297  zmsg msg(*_inprocProxySock);
298  logger.log(NODE_LOG_MODE_DEBUG) << " MSG FROM inproc_proxy" << endl << msg.dump() << flush;
299 
300  std::string addr((msg.address())?msg.address():"");
301  if (addr.compare(0, NODE_MSG_ROUTES.length(), NODE_MSG_ROUTES)==0)
302  {
303  std::vector<ClientWorkerItem> clients;
304  RouteMessage routeMessage(clients);
305  routeMessage.extract(NODE_MSG_ROUTES, addr);
306  if (routeMessage.isError())
307  logger.log(NODE_LOG_MODE_ERROR) << "Operation extract ROUTE MESSAGE has error: " << routeMessage.getErrorMsg() << flush;
308  else
309  {
310  std::vector<ClientWorkerItem> clientsQueue;
311  RouteMessage routesMsg(clientsQueue);
312  ClientsQueueManager clientsManager(&clientsQueue, 0);
313  clientsManager.refresh(getClientIdentity(), clients, "Here must be resources information...");
314  std::string route = routesMsg.build();
315  if (routesMsg.isError())
316  logger.log(NODE_LOG_MODE_ERROR) << "Operation build ROUTE MESSAGE has error: " << routesMsg.getErrorMsg() << flush;
317  else
318  {
319  logger.log(NODE_LOG_MODE_TRACE) << "RouterServerProxy::handleRouteMessage ROUTE_TABLE: " << route << flush;
320  s_send(*_routerServerSock, route);
321  }
322  }
323  }
324  }
325 
326  std::string RouterServerProxy::processAdminCommands(std::string& command, std::string& parameters){
327  std::string ret = NODE_ADMIN_ERROR_OK;
328 
329  logger.log(NODE_LOG_MODE_DEBUG) << "Admin command requested [" << command << "], parameters [" << parameters << "]" << flush;
330 
332  if(command == NODE_MSG_STAT){
333  std::stringstream result;
334  result << NODE_ADMIN_ERROR_OK << NODE_ADMIN_COMMAND_DELIMITER << "requests=" << getStatRequests() << ",responses=" << getStatResponses() << ",requestsSize=" << getStatRequestsSize()
335  << ",response_size=" << getStatResponsesSize();
336  ret = result.str();
337  }else if(command == NODE_MSG_rebuildServerConnection){
340  }else if(command == NODE_MSG_DISCONNECT_SERVER_CONNECTION){
343  }else if(command == NODE_MSG_UPDATE_SCHEMA){
344  setConnectionString(parameters);
345  }else if(command == NODE_MSG_PROPERTIES){
347  ret = getPropertyInformation();
348  }else{
351  }
352 
353  return ret;
354  }
355 
357  if(s_clock() > getPstatsAt()){
358  setStatQPS((getStatRequests() - getStatRequestsProcessed()) / (float)((STAT_INTERVAL) / 1000.0));
362 
367 
368  logger.log(NODE_LOG_MODE_DEBUG) << "requests=" << getStatRequests() << ", responses=" << getStatResponses() << ", qps=" << getStatQPS() << ", rps=" << getStatRPS()
369  << ", requestsSize=" << getStatRequestsSize() << ", responsesSize=" << getStatResponsesSize() << ", requestsBandwidth=" << getStatRequestsBandwidth()
370  << ", responsesBandwidth=" << getStatResponsesBandwidth() << flush;
371 
372  setPstatsAt(s_clock() + LOG_STATISTIC_PERIOD);
373  }
374  }
375 
378  std::stringstream result;
379  result << NODE_ADMIN_ERROR_OK
381  << NODE_ADMIN_COMMAND_DELIMITER << "requestsSize=" << getStatRequestsSize() << NODE_ADMIN_COMMAND_DELIMITER << "responseSize=" << getStatResponsesSize()
382  << NODE_ADMIN_COMMAND_DELIMITER << "clientIdentity=" << getClientIdentity() << NODE_ADMIN_COMMAND_DELIMITER //<< "heartbeatAt" << static_cast<uint64_t>(_heartbeatAt)
383  << NODE_ADMIN_COMMAND_DELIMITER << "pstatsAt=" << static_cast<uint64_t>(getPstatsAt()) << NODE_ADMIN_COMMAND_DELIMITER << "statQPS=" << getStatQPS()
384  << NODE_ADMIN_COMMAND_DELIMITER << "statRPS=" << getStatRPS() << NODE_ADMIN_COMMAND_DELIMITER << "statRequestsBandwidth=" << getStatRequestsBandwidth()
385  << NODE_ADMIN_COMMAND_DELIMITER << "statResponsesBandwidth=" << getStatResponsesBandwidth();
386  return result.str();
387  }
388 
391 
392  while(getInProgress()){
393  try {
395  std::vector<zmq::pollitem_t> items;
397  items ={{*_routerServerSock, 0, ZMQ_POLLIN, 0},{*_inprocReducerOutSock, 0, ZMQ_POLLIN, 0},{*_inprocAdminSock, 0, ZMQ_POLLIN, 0},{*_inprocProxySock, 0, ZMQ_POLLIN, 0}};
398  } else{
399  items ={{*_inprocReducerOutSock, 0, ZMQ_POLLIN, 0},{*_inprocAdminSock, 0, ZMQ_POLLIN, 0},{*_inprocProxySock, 0, ZMQ_POLLIN, 0}};
400  }
401 
403  zmq::poll(&items[0], items.size(), getPollTimeout());
404 
407  if(items[0].revents & ZMQ_POLLIN){
409  }
410  }
411 
413  if(items[items.size() - 3].revents & ZMQ_POLLIN){
415  }
416 
418  if(items[items.size() - 2].revents & ZMQ_POLLIN){
420  }
421 
423  if(items[items.size() - 1].revents & ZMQ_POLLIN){
425  }
426 
429 
432 
435 
436  }catch(zmq::error_t& error){
437  if (EINTR!=error.num()){
438  std::cout << "RouterServerProxy E: " << error.what() << std::endl;
439  }
440  }catch(std::exception& e){
441  std::cout << "RouterServerProxy E: " << e.what() << std::endl;
442  }catch(...){
443  std::cout << "RouterServerProxy E: Unknown exception" << std::endl;
444  }
445  }
446 
448 
449  return NULL;
450  }
451 
452  }
453 }