14 #ifndef REDUCERFUNCTIONALOBJECT_HPP_
15 #define REDUCERFUNCTIONALOBJECT_HPP
19 #include <Poco/SharedPtr.h>
37 _reducingHandler = reducingHandlerBuilder.
build();
38 _rejectedMessages = 0;
41 void accumulate(std::string& jsonReducingInputMessage,
unsigned long long itemId)
43 if( _reducingHandler->isTaskTerminatedByTTL( itemId ) ) {
48 std::string emptyMsg =
"";
49 Poco::SharedPtr<HCE::reduce_types::ReducingInputMessage>reducingInputMessage(
55 catch(HCE::sphinx::reduce_task::WrongJSONStructureException& e){
59 _reducingHandler->accumulateReducingData(itemId, reducingInputMessage);
66 return _reducingHandler->isTaskExpired( itemId, ttl ) || _reducingHandler->getAccumulateReducingPartsNumberBy(itemId) == nodesCount;
68 catch(HCE::sphinx::reduce_task::NotFoundByKeyException& e){
75 Poco::SharedPtr<HCE::reduce_types::ReducingOutputMessage>
reduce(
unsigned long long itemId)
77 Poco::SharedPtr<HCE::reduce_types::ReducingOutputMessage>reducingOutputMessage = _reducingHandler->makeReducing(itemId);
78 _reducingHandler->deleteReducingTaskBy(itemId);
80 return reducingOutputMessage;
84 return _reducingHandler->getTasksNumber();
96 return _reducingHandler->cleanupExpiredTasksByTTL( maxRemoveTasks, ttl );
100 return _reducingHandler->cleanupExpiredTasksByTTLQueue( maxRemoveItems, minTerminateTime );
105 _reducingHandler->getExceededTTLTasks( maxTaskNumber, exceededTTLTasks, ttl );
109 return _rejectedMessages;
113 Poco::SharedPtr<HCE::reduce::ReducingHandler<> >_reducingHandler;
117 int _rejectedMessages;