9 _reducingHandler = reducingHandlerBuilder.
build();
10 _rejectedMessages = 0;
17 unsigned long long itemId)
19 if( _reducingHandler->isTaskTerminatedByTTL( itemId ) ) {
24 std::string emptyMsg =
"";
25 Poco::SharedPtr<HCE::reduce_types::ReducingInputMessage>reducingInputMessage(
29 reducingInputMessage = _pocoJSONReducingInputMessageConvertor.
30 convertToReducingInputMessageFrom(jsonReducingInputMessage);
37 _reducingHandler->accumulateReducingData(itemId, reducingInputMessage);
45 return _reducingHandler->isTaskExpired( itemId, ttl ) ||
46 _reducingHandler->getAccumulateReducingPartsNumberBy(itemId) == nodesCount;
58 Poco::SharedPtr<HCE::reduce_types::ReducingOutputMessage>reducingOutputMessage =
59 _reducingHandler->makeReducing(itemId);
60 _reducingHandler->deleteReducingTaskBy(itemId);
62 return reducingOutputMessage;
67 return _reducingHandler->getTasksNumber();
83 return _reducingHandler->cleanupExpiredTasksByTTL( maxRemoveTasks, ttl );
87 unsigned int maxRemoveItems,
88 unsigned int minTerminateTime )
90 return _reducingHandler->cleanupExpiredTasksByTTLQueue( maxRemoveItems, minTerminateTime );
94 std::vector<unsigned long long>& exceededTTLTasks,
97 _reducingHandler->getExceededTTLTasks( maxTaskNumber, exceededTTLTasks, ttl );
102 return _rejectedMessages;