hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
ReducingHandler.cpp
Go to the documentation of this file.
1 #include "ReducingHandler.hpp"
2 
3 namespace HCE{
4 namespace reduce{
5 
6 ReducingHandler::ReducingHandler(
7  const Poco::SharedPtr<ReducingPartsCountersStorageIf>& reducingPartsCountersStorage,
8  const Poco::SharedPtr<ReduceAdditionsStorageIf>& reduceAdditionsStorage,
9  const Poco::SharedPtr<TaskReducersManagerFactoryIf> &taskReducersManagerFactory,
10  const Poco::SharedPtr<ReducingOutputMessageBuilderIf>& reducingOutputMessageBuilder,
11  const Poco::SharedPtr<SupportNotifierIf>& supportNotifier)
12 {
13  _reducingPartsCountersStorage = reducingPartsCountersStorage;
14  _reduceAdditionsStorage = reduceAdditionsStorage;
15  _taskReducersManagerFactory = taskReducersManagerFactory;
16  _reducingOutputMessageBuilder = reducingOutputMessageBuilder;
17  _supportNotifier = supportNotifier;
18  }
19 
21 
22 void ReducingHandler::accumulateReducingData(unsigned long long itemId,
23  Poco::SharedPtr<reduce_types::ReducingInputMessage>& reducingInputMessage)
24 {
25  try{
26  if (not(_reducingPartsCountersStorage->findBy(itemId))){
27  createNewTaskReducersManager(reducingInputMessage->getProcessingType());
28  _reduceAdditionsStorage->addWith(itemId,
29  _reduceAdditionsStorage->findBy(reducingInputMessage->getProcessingType()));
30  _reducingPartsCountersStorage->addReducingPartsCounterWithKey(itemId);
31  _reduceAdditionsStorage->findBy(reducingInputMessage->getProcessingType())->createReducer(itemId);
32  }
33  _reducingPartsCountersStorage->incrementBy(itemId);
34  _taskTTLManager.addTaskTTL( itemId, reducingInputMessage->getTTL() );
35  _reduceAdditionsStorage->findBy(itemId)->addDataInReducer(itemId, reducingInputMessage->getBody());
36  }
37  catch(NotFoundByKeyException& e){
38  _supportNotifier->notifyNotFoundItemException(e);
39  }
41  _supportNotifier->notifyWrongJSONStructure(itemId, reducingInputMessage);
42  }
43 }
44 
45 Poco::SharedPtr<reduce_types::ReducingOutputMessage>ReducingHandler::makeReducing(unsigned long long itemId)
46 {
47  try{
48  unsigned int taskTTL = _taskTTLManager.getTaskTTL( itemId );
49  //remove the task to avoid the task will be moved to terminated task queue if the reducing task spends too long time
50  _taskTTLManager.removeTaskTTL( itemId );
51  types::MessageType msgTypes = _reduceAdditionsStorage->findBy(itemId)->getType();
52  std::string reducingJSON =_reduceAdditionsStorage->findBy(itemId)->runReduceTaskForTaskId(itemId);
53  return _reducingOutputMessageBuilder->build(msgTypes, taskTTL, reducingJSON);
54  }
55  catch(NotFoundByKeyException& e){
56  _supportNotifier->notifyNotFoundItemException(e);
57  return _reducingOutputMessageBuilder->build(e);
58  }
59 }
60 
61 void ReducingHandler::deleteReducingTaskBy(unsigned long long itemId)
62 {
63  try{
64  _reducingPartsCountersStorage->deleteBy(itemId);
65  _reduceAdditionsStorage->findBy(itemId)->deleteReducerBy(itemId);
66  _reduceAdditionsStorage->deleteBy(itemId);
67  }
68  catch(NotFoundByKeyException& e){
69  _supportNotifier->notifyNotFoundItemException(e);
70  }
71 }
72 
74 {
75  return _reducingPartsCountersStorage->getAccumulateReducingParts(itemId);
76 }
77 
79 {
80  return _reducingPartsCountersStorage->getTotalTasksNumber();
81 }
82 
83 bool ReducingHandler::isTaskTerminatedByTTL( unsigned long long taskId ) const
84 {
85  return _taskTTLManager.isTaskTerminatedByTTL( taskId );
86 }
87 
88 bool ReducingHandler::isTaskExpired( unsigned long long taskId, unsigned int ttl)
89 {
90  return _taskTTLManager.isTaskExpired( taskId, ttl );
91 }
92 
93 int ReducingHandler::cleanupExpiredTasksByTTL(unsigned int maxRemoveTasks,
94  unsigned int ttl)
95 {
96  return _taskTTLManager.cleanupExpiredTasksByTTL( maxRemoveTasks, ttl );
97 }
98 
99 unsigned int ReducingHandler::cleanupExpiredTasksByTTLQueue(unsigned int maxRemoveItems,
100  unsigned int minTerminateTime )
101 {
102  return _taskTTLManager.cleanupExpiredTasksByTTLQueue(maxRemoveItems, minTerminateTime );
103 }
104 
105 void ReducingHandler::getExceededTTLTasks(unsigned int maxTaskNumber,
106  std::vector<unsigned long long>& exceededTTLTasks,
107  unsigned int ttl) const
108 {
109  _taskTTLManager.getExceededTTLTasks( maxTaskNumber, exceededTTLTasks, ttl );
110 }
111 
112 void ReducingHandler::createNewTaskReducersManager(HCE::types::MessageType reducerType)
113 {
114  try{
115  _reduceAdditionsStorage->findBy(reducerType);
116  }
117  catch(NotFoundByKeyException &e){
118  _reduceAdditionsStorage->addWith(reducerType,
119  _taskReducersManagerFactory->build(reducerType));
120  }
121 }
122 
123 }
124 }
125