9 _reducingHandler = reducingHandlerBuilder.
build();
10 _rejectedMessages = 0;
17 unsigned long long itemId)
19 if( _reducingHandler->isTaskTerminatedByTTL( itemId ) ) {
24 std::string emptyMsg =
"";
25 Poco::SharedPtr<HCE::reduce_types::ReducingInputMessage>reducingInputMessage(
29 reducingInputMessage = _pocoJSONReducingInputMessageConvertor.
30 convertToReducingInputMessageFrom(jsonReducingInputMessage);
37 _reducingHandler->accumulateReducingData(itemId, reducingInputMessage);
46 return _reducingHandler->isTaskExpired( itemId, ttl ) ||
47 _reducingHandler->getAccumulateReducingPartsNumberBy(itemId) == nodesCount;
59 Poco::SharedPtr<HCE::reduce_types::ReducingOutputMessage>reducingOutputMessage =
60 _reducingHandler->makeReducing(itemId);
61 _reducingHandler->deleteReducingTaskBy(itemId);
63 return reducingOutputMessage;
68 return _reducingHandler->getTasksNumber();
85 return _reducingHandler->cleanupExpiredTasksByTTL( maxRemoveTasks, ttl );
89 unsigned int maxRemoveItems,
90 unsigned int minTerminateTime )
92 return _reducingHandler->cleanupExpiredTasksByTTLQueue( maxRemoveItems, minTerminateTime );
96 std::vector<unsigned long long>& exceededTTLTasks,
99 _reducingHandler->getExceededTTLTasks( maxTaskNumber, exceededTTLTasks, ttl );
105 return _rejectedMessages;