HCE project C++ developers source code library  1.1.1
HCE project developer library
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
ReducerFunctionalObject.cpp
Go to the documentation of this file.
2 
3 namespace HCE{
4 namespace reduce{
5 
7 {
8  ReducingHandlerBuilder reducingHandlerBuilder;
9  _reducingHandler = reducingHandlerBuilder.build();
10  _rejectedMessages = 0;
11  errCode = 0;
12 }
13 
15 
16 void ReducerFunctionalObject::accumulate(std::string& jsonReducingInputMessage,
17  unsigned long long itemId)
18 {
19  if( _reducingHandler->isTaskTerminatedByTTL( itemId ) ) {
20  _rejectedMessages ++;
21  return;
22  }
23 
24  std::string emptyMsg = "";
25  Poco::SharedPtr<HCE::reduce_types::ReducingInputMessage>reducingInputMessage(
27  HCE::types::MessageType::mtNull, HCE::reduce_types::FAKE_TTL, emptyMsg));
28  try{
29  reducingInputMessage = _pocoJSONReducingInputMessageConvertor.
30  convertToReducingInputMessageFrom(jsonReducingInputMessage);
31  errCode = OK;
32  }
34  errCode = WRONG_JSON_STRUCT_ERROR;
35  errMsg = WRONG_JSON_STRUCT_DESCR;
36  }
37  _reducingHandler->accumulateReducingData(itemId, reducingInputMessage);
38 }
39 
40 bool ReducerFunctionalObject::isCompleteTask(unsigned long long itemId, int nodesCount,
41  unsigned int ttl)
42 {
43  try{
44  errCode = OK;
45  return _reducingHandler->isTaskExpired( itemId, ttl ) ||
46  _reducingHandler->getAccumulateReducingPartsNumberBy(itemId) == nodesCount;
47  }
49  errCode = NOT_FOUND_ITEM_ID_ERROR;
50  errMsg = NOT_FOUND_ITEM_ID_DESCR;
51  }
52  return false;
53 }
54 
55 Poco::SharedPtr<HCE::reduce_types::ReducingOutputMessage>ReducerFunctionalObject::
56  reduce(unsigned long long itemId)
57 {
58  Poco::SharedPtr<HCE::reduce_types::ReducingOutputMessage>reducingOutputMessage =
59  _reducingHandler->makeReducing(itemId);
60  _reducingHandler->deleteReducingTaskBy(itemId);
61  errCode = OK;
62  return reducingOutputMessage;
63 }
64 
66 {
67  return _reducingHandler->getTasksNumber();
68 }
69 
71 {
72  return errCode;
73 }
74 
76 {
77  return errMsg;
78 }
79 
80 int ReducerFunctionalObject::cleanupExpiredTasksByTTL(unsigned int maxRemoveTasks,
81  unsigned int ttl)
82 {
83  return _reducingHandler->cleanupExpiredTasksByTTL( maxRemoveTasks, ttl );
84 }
85 
87  unsigned int maxRemoveItems,
88  unsigned int minTerminateTime )
89 {
90  return _reducingHandler->cleanupExpiredTasksByTTLQueue( maxRemoveItems, minTerminateTime );
91 }
92 
93 void ReducerFunctionalObject::getExceededTTLTasks(unsigned int maxTaskNumber,
94  std::vector<unsigned long long>& exceededTTLTasks,
95  unsigned int ttl)
96 {
97  _reducingHandler->getExceededTTLTasks( maxTaskNumber, exceededTTLTasks, ttl );
98 }
99 
101 {
102  return _rejectedMessages;
103 }
104 
105 }
106 }
107