hce-node application 1.4.3
HCE Hierarchical Cluster Engine node application
DataReducerProxy.cpp — source listing extracted from the generated documentation (some original lines are missing from the extraction).
1 #include "DataReducerProxy.hpp"
2 
3 namespace HCE {
4  namespace handlers {
5 
6  DataReducerProxy::DataReducerProxy(std::string name, zmq::context_t& ctx, const std::string& clientIdentity, unsigned char nodeMode, int64_t startedAt,
7  unsigned char logPriority, const std::string& iniFile) :
8  Handler(name, ctx, clientIdentity, std::string(""), startedAt, logPriority, iniFile),
9  _inprocReducerInSock(nullptr), _inprocReducerOutSock(nullptr),
10  _reducerTtlTasksCleanupAt(TTL_TASKS_CLEANUP_INTERVAL_DEFAULT), _extendedReducerFO(NULL){
11 
12  setNodeMode(nodeMode);
13  initialize();
14  }
15 
// Destructor — fragment: the signature line (presumably
// DataReducerProxy::~DataReducerProxy()) was lost in extraction.
// Releases sockets and reducer state via deinitialize().
  deinitialize();
 }
19 
// DataReducerProxy::initialize() — fragment: the signature (line 20) and
// several statements were lost in extraction. What survives logs startup,
// initializes the reducer engine and stat counters, then creates the two
// inproc DEALER sockets; returns true on success.
 logger.log(NODE_LOG_MODE_INFORMATION) << "Initialize..." << flush;

 // Initialize the reducer functional object / engine.
 reducerInit();

 // Reset the request statistics counter.
 setStatRequests(0);

 //TODO: must be replaced with parent methods for client and server sockets rebuild
 // Inbound inproc socket; the bind/connect call between these lines was lost
 // in extraction — the log message suggests a bind to "inproc_reducer_in".
 _inprocReducerInSock = new zmq::socket_t(_context, ZMQ_DEALER);
 logger.log(NODE_LOG_MODE_DEBUG) << "Bound to inproc_reducer_in" << flush;

 // Outbound inproc socket; bind/connect call likewise lost in extraction.
 _inprocReducerOutSock = new zmq::socket_t(_context, ZMQ_DEALER);
 logger.log(NODE_LOG_MODE_DEBUG) << "Bound to inproc_reducer_out" << flush;

 return true;
 }
52 
// DataReducerProxy::deinitialize() — fragment: the signature and the opening
// if(...)/socket-close lines were lost in extraction. The surviving code
// deletes both inproc sockets, clears pending responses and frees the
// extended reducer functional object.
  }
  delete _inprocReducerInSock;    // free inbound inproc socket
  _inprocReducerInSock = NULL;
  }
  // closing brace of a lost if-block guarding the outbound socket close
  }
  delete _inprocReducerOutSock;   // free outbound inproc socket
  _inprocReducerOutSock = NULL;
  }

  _responses.clear();             // drop partially accumulated plain-text responses

  // presumably guarded by if(_extendedReducerFO){ — opening line lost in extraction
  delete _extendedReducerFO;
  _extendedReducerFO = NULL;
  }
  }
76 
// Stat/mode accessors — fragment: the one-line bodies (and the getter
// signatures) were lost in extraction; only signatures and closing braces remain.

 void DataReducerProxy::setStatRequests(unsigned int statRequests){
 // body lost in extraction — presumably _statRequests = statRequests;
 }

 // closing brace of getStatRequests() — signature and return lost in extraction
 }

 void DataReducerProxy::setStatResponses(unsigned int statResponses){
 // body lost in extraction — presumably _statResponses = statResponses;
 }

 // closing brace of getStatResponses() — signature and return lost in extraction
 }

 void DataReducerProxy::setClientsNumber(unsigned int clientsNumber){
 // body lost in extraction — presumably _clientsNumber = clientsNumber;
 }

 // closing brace of getClientsNumber() — signature and return lost in extraction
 }

 void DataReducerProxy::setNodeMode(unsigned char nodeMode){
 // body lost in extraction — presumably _nodeMode = nodeMode;
 }

 unsigned char DataReducerProxy::getNodeMode(void){
 // body (return statement) lost in extraction — presumably return _nodeMode;
 }
108 
// DataReducerProxy::reducerInit() — fragment: the signature and the
// construction of _extendedReducerFO (lines 108-111) were lost in
// extraction; presumably the FO is allocated just above this check.

 if(_extendedReducerFO){
 logger.log(NODE_LOG_MODE_INFORMATION) << " Sphinx reducer FO instance created." << flush;
 }else{
 // error-logging line for a failed FO creation lost in extraction
 }
 }
120 
 void DataReducerProxy::defaultReducerResponseItemCreate(std::string& messageId, std::string& messageBody){
 // Creates a new accumulator entry for messageId seeded with the first body.
 // The allocation line (122) was lost in extraction — presumably
 // DefaultReducerResponseItem* ri = new DefaultReducerResponseItem(...);
 if(ri){
 ri->bodies = messageBody;   // first fragment becomes the initial bodies string
 ri->count = 1;              // one client response accumulated so far
 _responses[messageId] = ri; // register under the message id for later lookup
 }else{
 logger.log(NODE_LOG_MODE_ERROR) << "ERROR create new response item" << flush;
 }
 }
134 
135  void DataReducerProxy::defaultReducerResponseItemAccumulate(std::map<std::string, DefaultReducerResponseItem*>::iterator& DefaultReducerResponseItemIterator, std::string& messageBody){
137  DefaultReducerResponseItemIterator->second->bodies += " " + messageBody;
138  DefaultReducerResponseItemIterator->second->count++;
139  }
140 
141  std::string DataReducerProxy::defaultReducerResponseItemReduce(std::map<std::string, DefaultReducerResponseItem*>::iterator& DefaultReducerResponseItemIterator){
143  std::stringstream ss(DefaultReducerResponseItemIterator->second->bodies);
144  std::vector<unsigned long long> items((std::istream_iterator<unsigned long long>(ss)), std::istream_iterator<unsigned long long>());
146  sort(items.begin(), items.end(), std::less<unsigned long long>());
148  items.erase(std::unique(items.begin(), items.end()), items.end());
150  std::ostringstream oss;
151  std::copy(items.begin(), items.end() - 1, std::ostream_iterator<unsigned long long>(oss, " "));
152  oss << items.back();
153  std::string body(oss.str());
154 
155  return body;
156  }
157 
 void DataReducerProxy::defaultReducerReducePlainText(std::string& messageId, std::string& messageBody){
 // Accumulates plain-text response fragments per messageId; once the number of
 // accumulated fragments reaches the expected clients number, reduces them and
 // forwards the result to the inproc_reducer_out socket.
 zmsg msg(messageId.c_str());

 // Look up an existing accumulator for this message id.
 std::map<std::string, DefaultReducerResponseItem*>::iterator DefaultReducerResponseItemIterator = _responses.find(messageId);
 if(DefaultReducerResponseItemIterator == _responses.end()){
 // First fragment for this message — start a new accumulator.
 defaultReducerResponseItemCreate(messageId, messageBody);
 }else{
 // Subsequent fragment — append the body and bump the counter.
 defaultReducerResponseItemAccumulate(DefaultReducerResponseItemIterator, messageBody);
 // All expected client responses collected?
 if(DefaultReducerResponseItemIterator->second->count == getClientsNumber()){
 // Merge, sort and de-duplicate the accumulated ids.
 std::string body = defaultReducerResponseItemReduce(DefaultReducerResponseItemIterator);

 msg.append(body.c_str());

 logger.log(NODE_LOG_MODE_TRACE) << "MSG TO inproc_reducer_out" << endl << msg.dump() << flush;

 // Send the reduced response upstream.
 msg.send(*_inprocReducerOutSock);

 unsigned int statResponses = getStatResponses();
 setStatResponses(++statResponses);

 logger.log(NODE_LOG_MODE_DEBUG) << "RESPONSE from plain text reducer to inproc_reducer_out socket for upper server sent, message [" << messageId << "], size:" << body.size()
 << " bytes, accumulated:" << DefaultReducerResponseItemIterator->second->count << ", n=" << getStatResponses() << flush;

 // Free the accumulator and remove it from the pending map.
 delete DefaultReducerResponseItemIterator->second;

 _responses.erase(DefaultReducerResponseItemIterator);
 }
 }
 }
197 
 void DataReducerProxy::reduceJson(std::string& messageId, std::string& messageBody){
 // Accumulates JSON response parts in the extended (Sphinx) reducer FO and,
 // once all client responses for the task have arrived, reduces them, wraps
 // the result in a JSON message cover and sends it to inproc_reducer_out.
 //Generate itemId on the basis of messageId string
 Poco::Checksum checksum(Poco::Checksum::TYPE_CRC32);
 checksum.update(messageId);
 unsigned long long itemId = checksum.checksum();

 // NOTE: "clietns" typo below is in the runtime log string; preserved as-is.
 logger.log(NODE_LOG_MODE_DEBUG) << "MERGE_JSON2: [" << messageBody << "], messageId=" << messageId << ", clietns: " << getClientsNumber() << ", itemId: " << itemId << flush;

 _extendedReducerFO->accumulate(messageBody, itemId);
 // Lines 207-208 were lost in extraction — presumably an accumulate-error
 // check (if(...){ log ... ) whose closing brace is the one below.
 }

 // Reduce only when all expected client responses have been accumulated.
 if(_extendedReducerFO->isCompleteTask(itemId, (int)getClientsNumber()) == true){
 Poco::SharedPtr<HCE::reduce_types::ReducingOutputMessage> reducingOutputMessage = _extendedReducerFO->reduce(itemId);

 logger.log(NODE_LOG_MODE_DEBUG) << "Reducer output json [" << reducingOutputMessage->getBody() << "], type: " << static_cast<int>(reducingOutputMessage->getProcessingType()) << flush;

 //Response message body json string
 std::string reducerResultJson;
 HCE::sphinx::SphinxDefaultJSON defaultJson;

 if(reducingOutputMessage->getProcessingErrorCode()){
 //Error of reducer processing, return default empty result
 logger.log(NODE_LOG_MODE_DEBUG) << NODE_FUNCTIONAL_ERROR_SPHINX_REDUCER << " reduce task error " << reducingOutputMessage->getProcessingErrorCode() << " : "
 << reducingOutputMessage->getProcessingErrorMsg() << flush;
 //Create empty Sphinx result object to return it for farther processing
 reducerResultJson = defaultJson.getJSON();
 }else{
 //Get body to return to upper level reducer
 reducerResultJson = reducingOutputMessage->getBody();
 }

 //Create message cover object to return it to upper level processing (reducing)
 HCE::JsonMessageCover msgCover(reducingOutputMessage->getProcessingType(), reducerResultJson, reducingOutputMessage->getTTL());
 if(!msgCover.serialize(reducerResultJson)){
 // serialization failed — fall back to the default empty Sphinx result
 logger.log(NODE_LOG_MODE_ERROR) << NODE_FUNCTIONAL_ERROR_SPHINX << " cover searialization error " << msgCover.getErrorCode() << " : " << msgCover.getErrorMsg() << flush;

 //Create empty Sphinx result object to return it for farther processing
 reducerResultJson = defaultJson.getJSON();
 }

 logger.log(NODE_LOG_MODE_DEBUG) << "Reducer covered json : " << reducerResultJson << flush;

 // Build the response message: message id frame plus the covered JSON body.
 zmsg msg(messageId.c_str());
 msg.append(reducerResultJson.c_str());

 logger.log(NODE_LOG_MODE_TRACE) << "MSG TO inproc_reducer_out" << endl << msg.dump() << flush;

 // Send the reduced response upstream.
 msg.send(*_inprocReducerOutSock);

 unsigned int statResponses = getStatResponses();
 setStatResponses(++statResponses);

 logger.log(NODE_LOG_MODE_DEBUG) << "RESPONSE from json reducer to inproc_reducer_out socket for upper server sent, message [" << messageId << "], size:" << reducerResultJson.size()
 << " bytes, items accumulated:" << /*DefaultReducerResponseItemIterator->second->count <<*/", time:" << reducingOutputMessage->getProcessingElapsedTime()
 << ", n=" << getStatResponses() << flush;
 }else{
 //TODO: possible call for garbage collector or log some timing stats when not all messages are accumulated for the reduce task
 // The tail of this log statement (line 262, presumably << ... << flush;)
 // was lost in extraction.
 logger.log(NODE_LOG_MODE_DEBUG) << "Reducer task is not complete : messageId=" << messageId << ", clietns: " << getClientsNumber() << ", itemId: " << itemId << "\n"
 }
 }
265 
// Request handler for the inproc_reducer_in socket — fragment: the signature
// (lines 266-268) was lost in extraction; `msg` is presumably a zmsg just
// received from the inproc_reducer_in socket.

 logger.log(NODE_LOG_MODE_TRACE) << "MSG FROM inproc_reducer_in" << endl << msg.dump() << flush;

 unsigned int statRequests = getStatRequests();
 setStatRequests(++statRequests);

 // Unpack the four request frames: message id, body, route and clients number.
 unsigned int clientsNumber = 0;
 std::string messageId((const char*)msg.pop_front().c_str());
 std::string messageBody((const char*)msg.pop_front().c_str());
 std::string messageRoute((const char*)msg.pop_front().c_str());
 std::istringstream((const char*)msg.pop_front().c_str()) >> clientsNumber;
 setClientsNumber(clientsNumber);

 logger.log(NODE_LOG_MODE_DEBUG) << "REQUEST from inproc_reducer_in, nodeMode=" << static_cast<unsigned int>(getNodeMode()) << ", message [" << messageId
 << "], size:" << messageBody.size() << ", clients:" << getClientsNumber() << " , n=" << getStatRequests() << flush;

 // Outer condition (lines 286-287) lost in extraction — presumably a
 // node-mode check selecting reduce vs. pass-through processing.
 if(messageBody.size() > 0 && messageBody[0] == NODE_JSON_BEGIN_CHAR){
 // JSON payload — run the extended (Sphinx) reducer.
 reduceJson(messageId, messageBody);
 }else{
 // Plain-text branch — the call (line 291) was lost in extraction,
 // presumably defaultReducerReducePlainText(messageId, messageBody);
 }
 }else{
 // Pass-through/proxy mode: repack the frames (lines 294-295, presumably a
 // msg.clear() or similar, lost in extraction) and forward upstream unchanged.
 msg.append(messageId.c_str());
 msg.append(messageBody.c_str());
 msg.append(messageRoute.c_str());

 unsigned int statResponses = getStatResponses();
 setStatResponses(++statResponses);

 logger.log(NODE_LOG_MODE_DEBUG) << "RESPONSE to inproc_reducer_out socket from proxy node redirector to upper server sent, message [" << messageId << "], size:" << messageBody.size()
 << ", n=" << getStatResponses() << flush;
 logger.log(NODE_LOG_MODE_TRACE) << "MSG TO inproc_reducer_out" << endl << msg.dump() << flush;

 // The actual send (lines 307-308) was lost in extraction — presumably
 // msg.send(*_inprocReducerOutSock);
 }
 }
311 
 std::string DataReducerProxy::processAdminCommands(std::string & command, std::string & parameters){
 // Dispatches administration commands and returns the textual result.
 // @param command    admin command name (one of the NODE_MSG_* constants)
 // @param parameters command parameters; format depends on the command
 // @return result string, NODE_ADMIN_ERROR_OK-prefixed on success
 std::string ret = NODE_ADMIN_ERROR_OK;

 logger.log(NODE_LOG_MODE_DEBUG) << "Admin command requested [" << command << "], parameters [" << parameters << "]" << flush;

 if(command == NODE_MSG_STAT){
 std::stringstream result;
 // The first part of this streaming statement (line 320, presumably
 // result << NODE_ADMIN_ERROR_OK << ... "requests="/"responses=" fields)
 // was lost in extraction.
 << NODE_ADMIN_COMMAND_DELIMITER << "clientsNumber=" << getClientsNumber();
 ret = result.str();
 }else if(command == NODE_MSG_PROPERTIES){
 // Return the full properties report.
 ret = getPropertyInformation();
/*
 }else if(command == NODE_MSG_GET_DATA_PROCESSING_MODE){
 std::stringstream result;

 result << NODE_ADMIN_ERROR_OK << NODE_ADMIN_COMMAND_DELIMITER << std::to_string(static_cast<unsigned int>(getNodeMode()));

 ret = result.str();
 }else if(command == NODE_MSG_SET_DATA_PROCESSING_MODE){
 std::stringstream result;

 unsigned int processingMode;
 std::istringstream(parameters) >> processingMode;

 result << NODE_ADMIN_ERROR_OK << NODE_ADMIN_COMMAND_DELIMITER << std::to_string(static_cast<unsigned int>(getNodeMode()));

 //Change data processing mode
 getNodeMode() = static_cast<unsigned char>(processingMode);

 ret = result.str();
*/
 }else if(command == NODE_MSG_MANAGER_MODE_GET){
 ret = NODE_ADMIN_ERROR_OK + NODE_ADMIN_COMMAND_DELIMITER + NODE_ADMIN_PROPERTY_NAME_MMODE + "=" + std::to_string(static_cast<unsigned int>(getNodeMode()));
 }else if(command == NODE_MSG_MANAGER_MODE_SET){
 int nodeMode = getNodeMode();
 //Parse node mode from parameters (original comment said "log priority" — copy-paste slip)
 std::istringstream(parameters) >> nodeMode;
 //Make response (reports the mode before the change)
 ret = NODE_ADMIN_ERROR_OK + NODE_ADMIN_COMMAND_DELIMITER + NODE_ADMIN_PROPERTY_NAME_MMODE + "=" + std::to_string(static_cast<unsigned int>(getNodeMode()));
 //Set new node mode
 setNodeMode(static_cast<unsigned char>(nodeMode));
 }else{
 // Unknown-command handling (lines 359-360) lost in extraction —
 // presumably ret is set to an "unsupported command" error here.
 }

 return ret;
 }
365 
// TTL-cleanup helper — fragment: the signature (line 366) was lost in
// extraction. Periodically purges reduce tasks whose TTL expired.
 if(s_clock() > _reducerTtlTasksCleanupAt){

 // The cleanup calls (lines 369-371) were lost in extraction — presumably
 // `tasks` and `tasksItems` come from _extendedReducerFO TTL-cleanup methods.

 if(tasks > 0 || tasksItems){
 logger.log(NODE_LOG_MODE_TRACE) << "TTL terminated tasks and queue cleaned, " << tasks << " tasks, " << tasksItems << " queue items removed" << flush;
 }

 // Rescheduling of _reducerTtlTasksCleanupAt (lines 376-377) lost in extraction.
 }
 }
380 
// Statistics report builder — fragment: the signature (lines 381-382,
// presumably a get*Information() method returning std::string) was lost
// in extraction. Builds a delimiter-separated key=value stat line.
 std::stringstream result;
 result << NODE_ADMIN_ERROR_OK
 // "requests=..." field (line 385) lost in extraction.
 << NODE_ADMIN_COMMAND_DELIMITER << "responses=" << getStatResponses()
 << NODE_ADMIN_COMMAND_DELIMITER << "clientsNumber=" << getClientsNumber()
 << NODE_ADMIN_COMMAND_DELIMITER << "nodeMode=" << static_cast<unsigned int>(getNodeMode())
 // One further field (line 389) lost in extraction.
 << NODE_ADMIN_COMMAND_DELIMITER << "reducerRejectedMessages=" << _extendedReducerFO->getRejectedMessages();
 return result.str();
 }
393 
// DataReducerProxy::process() — main poll loop; fragment: the signature
// (lines 394-396) and the socket-validity check that opens the outer
// if/else were lost in extraction.

 // Run until the handler is asked to stop.
 while(getInProgress()){
 try {
 // Poll the reducer-in and admin inproc sockets for incoming messages.
 zmq::pollitem_t items[] = {{*_inprocReducerInSock, 0, ZMQ_POLLIN, 0}, {*_inprocAdminSock, 0, ZMQ_POLLIN, 0}};

 zmq::poll(items, 2, getPollTimeout());

 // Reducer request ready — the handler call (line 409) was lost in extraction.
 if(items[0].revents & ZMQ_POLLIN){
 }

 // Admin request ready — the handler call (line 414) was lost in extraction.
 if(items[1].revents & ZMQ_POLLIN){
 }

 // Periodic work (lines 417-425, e.g. TTL tasks cleanup) lost in extraction.

 }catch(zmq::error_t& error){
 // EINTR is expected on signal interruption; anything else is reported.
 if (EINTR!=error.num()){
 std::cout << "DataReducerProxy E: " << error.what() << std::endl;
 }
 }catch(std::exception& e){
 std::cout << "DataReducerProxy E: " << e.what() << std::endl;
 }catch(...){
 std::cout << "DataReducerProxy E: Unknown exception" << std::endl;
 }
 }

 // Shutdown logging (line 437) lost in extraction.
 }else{
 // Required sockets were not created — abort the processing loop.
 logger.log(NODE_LOG_MODE_FATAL) << "DataReducerProxy::process() aborted because not all required sockets created : " << NODE_CRITICAL_ERROR_MESSAGE << flush;
 }

 return NULL;
 }
445 
446  }
447 }