hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
ReducerFunctionalObjectSphinx.hpp
Go to the documentation of this file.
1 
14 #ifndef REDUCERFUNCTIONALOBJECT_HPP_
15 #define REDUCERFUNCTIONALOBJECT_HPP
16 
19 #include <Poco/SharedPtr.h>
20 
21 namespace HCE
22 {
23 namespace reduce
24 {
25 
26 const int OK = 0;
27 const int WRONG_JSON_STRUCT_ERROR = 1;
28 const int NOT_FOUND_ITEM_ID_ERROR = 2;
29 const std::string WRONG_JSON_STRUCT_DESCR = "err string";
30 const std::string NOT_FOUND_ITEM_ID_DESCR = "err string2";
31 
33  public:
35  {
36  ReducingHandlerBuilder reducingHandlerBuilder;
37  _reducingHandler = reducingHandlerBuilder.build();
38  _rejectedMessages = 0;
39  }
40 
41  void accumulate(std::string& jsonReducingInputMessage, unsigned long long itemId)
42  {
43  if( _reducingHandler->isTaskTerminatedByTTL( itemId ) ) {
44  _rejectedMessages ++;
45  return;
46  }
47 
48  std::string emptyMsg = "";
49  Poco::SharedPtr<HCE::reduce_types::ReducingInputMessage>reducingInputMessage(
51  try{
52  reducingInputMessage = _pocoJSONReducingInputMessageConvertor.convertToReducingInputMessageFrom(jsonReducingInputMessage);
53  errCode = OK;
54  }
55  catch(HCE::sphinx::reduce_task::WrongJSONStructureException& e){
56  errCode = WRONG_JSON_STRUCT_ERROR;
57  errMsg = WRONG_JSON_STRUCT_DESCR;
58  }
59  _reducingHandler->accumulateReducingData(itemId, reducingInputMessage);
60  }
61 
62  bool isCompleteTask(unsigned long long itemId, int nodesCount, unsigned int ttl = USE_TASK_TTL_VALUE)
63  {
64  try{
65  errCode = OK;
66  return _reducingHandler->isTaskExpired( itemId, ttl ) || _reducingHandler->getAccumulateReducingPartsNumberBy(itemId) == nodesCount;
67  }
68  catch(HCE::sphinx::reduce_task::NotFoundByKeyException& e){
69  errCode = NOT_FOUND_ITEM_ID_ERROR;
70  errMsg = NOT_FOUND_ITEM_ID_DESCR;
71  }
72  return false;
73  }
74 
75  Poco::SharedPtr<HCE::reduce_types::ReducingOutputMessage> reduce(unsigned long long itemId)
76  {
77  Poco::SharedPtr<HCE::reduce_types::ReducingOutputMessage>reducingOutputMessage = _reducingHandler->makeReducing(itemId);
78  _reducingHandler->deleteReducingTaskBy(itemId);
79  errCode = OK;
80  return reducingOutputMessage;
81  }
82 
83  unsigned int getTasksNumber(){
84  return _reducingHandler->getTasksNumber();
85  }
86 
87  int getErrCode(){
88  return errCode;
89  }
90 
91  std::string getErrMsg(){
92  return errMsg;
93  }
94 
95  int cleanupExpiredTasksByTTL(unsigned int maxRemoveTasks = 1, unsigned int ttl = USE_TASK_TTL_VALUE) {
96  return _reducingHandler->cleanupExpiredTasksByTTL( maxRemoveTasks, ttl );
97  }
98 
99  unsigned int cleanupExpiredTasksByTTLQueue( unsigned int maxRemoveItems, unsigned int minTerminateTime ) {
100  return _reducingHandler->cleanupExpiredTasksByTTLQueue( maxRemoveItems, minTerminateTime );
101  }
102 
103  void getExceededTTLTasks( unsigned int maxTaskNumber, std::vector<unsigned long long>& exceededTTLTasks,
104  unsigned int ttl = USE_TASK_TTL_VALUE) {
105  _reducingHandler->getExceededTTLTasks( maxTaskNumber, exceededTTLTasks, ttl );
106  }
107 
108  int getRejectedMessages() const {
109  return _rejectedMessages;
110  }
111 
112  private:
113  Poco::SharedPtr<HCE::reduce::ReducingHandler<> >_reducingHandler;
114  HCE::reduce_convertors::PocoJSONReducingInputMessageConvertor _pocoJSONReducingInputMessageConvertor;
115  int errCode;
116  std::string errMsg;
117  int _rejectedMessages;
118 };
119 
120 }
121 }
122 
123 #endif