6 ReducingHandler::ReducingHandler(
7 const Poco::SharedPtr<ReducingPartsCountersStorageIf>& reducingPartsCountersStorage,
8 const Poco::SharedPtr<ReduceAdditionsStorageIf>& reduceAdditionsStorage,
9 const Poco::SharedPtr<TaskReducersManagerFactoryIf> &taskReducersManagerFactory,
10 const Poco::SharedPtr<ReducingOutputMessageBuilderIf>& reducingOutputMessageBuilder,
11 const Poco::SharedPtr<SupportNotifierIf>& supportNotifier)
13 _reducingPartsCountersStorage = reducingPartsCountersStorage;
14 _reduceAdditionsStorage = reduceAdditionsStorage;
15 _taskReducersManagerFactory = taskReducersManagerFactory;
16 _reducingOutputMessageBuilder = reducingOutputMessageBuilder;
17 _supportNotifier = supportNotifier;
23 Poco::SharedPtr<reduce_types::ReducingInputMessage>& reducingInputMessage)
26 if (not(_reducingPartsCountersStorage->findBy(itemId))){
27 createNewTaskReducersManager(reducingInputMessage->getProcessingType());
28 _reduceAdditionsStorage->addWith(itemId,
29 _reduceAdditionsStorage->findBy(reducingInputMessage->getProcessingType()));
30 _reducingPartsCountersStorage->addReducingPartsCounterWithKey(itemId);
31 _reduceAdditionsStorage->findBy(reducingInputMessage->getProcessingType())->createReducer(itemId);
33 _reducingPartsCountersStorage->incrementBy(itemId);
34 _taskTTLManager.
addTaskTTL( itemId, reducingInputMessage->getTTL() );
35 _reduceAdditionsStorage->findBy(itemId)->addDataInReducer(itemId, reducingInputMessage->getBody());
38 _supportNotifier->notifyNotFoundItemException(e);
41 _supportNotifier->notifyWrongJSONStructure(itemId, reducingInputMessage);
48 unsigned int taskTTL = _taskTTLManager.
getTaskTTL( itemId );
52 std::string reducingJSON =_reduceAdditionsStorage->findBy(itemId)->runReduceTaskForTaskId(itemId);
53 return _reducingOutputMessageBuilder->build(msgTypes, taskTTL, reducingJSON);
56 _supportNotifier->notifyNotFoundItemException(e);
57 return _reducingOutputMessageBuilder->build(e);
64 _reducingPartsCountersStorage->deleteBy(itemId);
65 _reduceAdditionsStorage->findBy(itemId)->deleteReducerBy(itemId);
66 _reduceAdditionsStorage->deleteBy(itemId);
69 _supportNotifier->notifyNotFoundItemException(e);
75 return _reducingPartsCountersStorage->getAccumulateReducingParts(itemId);
80 return _reducingPartsCountersStorage->getTotalTasksNumber();
100 unsigned int minTerminateTime )
106 std::vector<unsigned long long>& exceededTTLTasks,
107 unsigned int ttl)
const
115 _reduceAdditionsStorage->findBy(reducerType);
117 catch(NotFoundByKeyException &e){
118 _reduceAdditionsStorage->addWith(reducerType,
119 _taskReducersManagerFactory->build(reducerType));