hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
ReducerFunctionalObject.cpp
Go to the documentation of this file.
2 
3 namespace HCE{
4 namespace reduce{
5 
// NOTE(review): the line that should carry this definition's signature is not
// present in this listing (the embedded numbering jumps from 5 to 7).
// Presumably this is the ReducerFunctionalObject constructor - confirm against
// the class declaration.
//
// Initialises the object's state: obtains a reducing handler from a locally
// created ReducingHandlerBuilder and zeroes the counters below.
7 {
8  ReducingHandlerBuilder reducingHandlerBuilder;
 // The builder is only needed long enough to produce the handler.
9  _reducingHandler = reducingHandlerBuilder.build();
 // No messages have been rejected yet.
10  _rejectedMessages = 0;
 // Start with error code 0 (other methods in this file assign OK on success;
 // whether OK == 0 is not visible here - TODO confirm).
11  errCode = 0;
12 }
13 
15 
// Accepts one JSON reducing input message for the task identified by itemId
// and hands the resulting message object to the reducing handler.
//   jsonReducingInputMessage - JSON text given to the message converter
//   itemId                   - task identifier used for the TTL check and for
//                              accumulation in the handler
// Updates errCode (and errMsg on the failure path). Returns nothing.
16 void ReducerFunctionalObject::accumulate(std::string& jsonReducingInputMessage,
17  unsigned long long itemId)
18 {
 // Task already terminated by TTL: count the message as rejected and stop.
 // errCode / errMsg are left untouched on this path.
19  if( _reducingHandler->isTaskTerminatedByTTL( itemId ) ) {
20  _rejectedMessages ++;
21  return;
22  }
23 
 // Placeholder message built from mtNull, FAKE_TTL and an empty string; it is
 // overwritten below when the conversion succeeds.
24  std::string emptyMsg = "";
25  Poco::SharedPtr<HCE::reduce_types::ReducingInputMessage>reducingInputMessage(
 // NOTE(review): listing line 26 is missing here. The two closing parentheses
 // on the next line suggest an allocation expression (presumably
 // `new ...ReducingInputMessage(`) was dropped - confirm against the real file.
27  HCE::types::MessageType::mtNull, HCE::reduce_types::FAKE_TTL, emptyMsg));
28  try{
 // Convert the JSON text into a message object; mark success afterwards.
29  reducingInputMessage = _pocoJSONReducingInputMessageConvertor.
30  convertToReducingInputMessageFrom(jsonReducingInputMessage);
31  errCode = OK;
32  }
 // NOTE(review): listing line 33 is missing here. It presumably held the
 // `catch` clause that opens the block below; the caught exception type cannot
 // be determined from this listing.
34  errCode = WRONG_JSON_STRUCT_ERROR;
35  errMsg = WRONG_JSON_STRUCT_DESCR;
36  }
 // Reached after both the success and the failure path above, so the handler
 // is called even when conversion failed (presumably with the placeholder
 // message - verify this is intended).
37  _reducingHandler->accumulateReducingData(itemId, reducingInputMessage);
38 }
39 
// Reports whether the task identified by itemId can be considered complete.
//   itemId     - task identifier
//   nodesCount - value compared against the number of accumulated parts
//   ttl        - value passed to the handler's expiry check
// Returns true when the handler reports the task as expired for the given ttl,
// or when the accumulated parts number equals nodesCount. Returns false when
// the error path below is taken.
40 // cppcheck-suppress unusedFunction
41 bool ReducerFunctionalObject::isCompleteTask(unsigned long long itemId, int nodesCount,
42  unsigned int ttl)
43 {
44  try{
 // errCode is set to OK before the handler is queried, so it stays OK
 // whenever the expression below evaluates without leaving the try block.
45  errCode = OK;
46  return _reducingHandler->isTaskExpired( itemId, ttl ) ||
47  _reducingHandler->getAccumulateReducingPartsNumberBy(itemId) == nodesCount;
48  }
 // NOTE(review): listing line 49 is missing here. It presumably held the
 // `catch` clause that opens the block below; the caught exception type cannot
 // be determined from this listing.
50  errCode = NOT_FOUND_ITEM_ID_ERROR;
51  errMsg = NOT_FOUND_ITEM_ID_DESCR;
52  }
 // Only reached via the error path above.
53  return false;
54 }
55 
56 Poco::SharedPtr<HCE::reduce_types::ReducingOutputMessage>ReducerFunctionalObject::
57  reduce(unsigned long long itemId)
58 {
59  Poco::SharedPtr<HCE::reduce_types::ReducingOutputMessage>reducingOutputMessage =
60  _reducingHandler->makeReducing(itemId);
61  _reducingHandler->deleteReducingTaskBy(itemId);
62  errCode = OK;
63  return reducingOutputMessage;
64 }
65 
// NOTE(review): the signature line for this definition is not present in this
// listing (the embedded numbering jumps from 65 to 67); confirm the method name
// and return type against the class declaration.
//
// Returns whatever the reducing handler reports from getTasksNumber().
67 {
68  return _reducingHandler->getTasksNumber();
69 }
70 
// NOTE(review): the signature line for this definition is not present in this
// listing (the embedded numbering jumps from 71 to 73); confirm the method name
// and return type against the class declaration.
//
// Returns the current value of errCode, which other methods in this file set
// to OK or to an error constant.
71 // cppcheck-suppress unusedFunction
73 {
74  return errCode;
75 }
76 
// NOTE(review): the signature line for this definition is not present in this
// listing (the embedded numbering jumps from 76 to 78); confirm the method name
// and return type against the class declaration.
//
// Returns the current value of errMsg, which other methods in this file assign
// on their error paths.
78 {
79  return errMsg;
80 }
81 
82 int ReducerFunctionalObject::cleanupExpiredTasksByTTL(unsigned int maxRemoveTasks,
83  unsigned int ttl)
84 {
85  return _reducingHandler->cleanupExpiredTasksByTTL( maxRemoveTasks, ttl );
86 }
87 
// NOTE(review): the first line of this definition's signature (return type and
// method name) is not present in this listing - the embedded numbering jumps
// from 87 to 89. Presumably the method shares the name of the handler call
// below; confirm against the class declaration.
//
// Forwards both arguments unchanged to the reducing handler's
// cleanupExpiredTasksByTTLQueue() and returns its result.
89  unsigned int maxRemoveItems,
90  unsigned int minTerminateTime )
91 {
92  return _reducingHandler->cleanupExpiredTasksByTTLQueue( maxRemoveItems, minTerminateTime );
93 }
94 
95 void ReducerFunctionalObject::getExceededTTLTasks(unsigned int maxTaskNumber,
96  std::vector<unsigned long long>& exceededTTLTasks,
97  unsigned int ttl)
98 {
99  _reducingHandler->getExceededTTLTasks( maxTaskNumber, exceededTTLTasks, ttl );
100 }
101 
// NOTE(review): the signature line for this definition is not present in this
// listing (the embedded numbering jumps from 102 to 104); confirm the method
// name and return type against the class declaration.
//
// Returns the current value of _rejectedMessages, the counter that accumulate()
// increments when a task is already terminated by TTL.
102 // cppcheck-suppress unusedFunction
104 {
105  return _rejectedMessages;
106 }
107 
108 }
109 }
110