1 #include <ReducingHandler.h>
2 #include <ReducingPartsCountersStorage.h>
3 #include <ReducingOutputMessageBuilder.h>
4 #include <ConsoleSupportNotifier.h>
5 #include <ReducingExceptionsTranslator.h>
6 #include <SphinxTaskReducersManagerBuilder.h>
7 #include <PocoJSONReducingInputMessageConvertor.h>
8 #include <PocoJSONReducingOutputMessageConvertor.h>
10 using namespace HCE::reduce;
11 using namespace HCE::reduce_types;
12 using namespace HCE::reduce_convertors;
13 using namespace HCE::sphinx;
14 using namespace HCE::sphinx::reduce_task;
25 SharedPtr<SphinxTaskReducersManagerIf<SphinxReduceJobBuilder<> > >sphinxTaskReducersManager =
26 sphinxTaskReducersManagerBuilder.
build();
31 reducingHandler.assign(
new ReducingHandler<>(reducingPartsCountersStorage, sphinxTaskReducersManager,
32 reducingOutputMessageBuilder, supportNotifier));
35 std::string process(std::string& json_reducing_input_message,
int itemId)
41 SharedPtr<ReducingInputMessage>reducingInputMessage;
42 SharedPtr<ReducingOutputMessage> reducingOutputMessage;
43 std::string json_reducing_output_message =
"";
51 std::string ret_string =
"some error in json struct";
55 reducingHandler->accumulateReducingData(itemId, reducingInputMessage);
58 if(reducingHandler->getAccumulateReducingPartsNumberBy(itemId) == nodesCount){
59 reducingOutputMessage = reducingHandler->makeReducing(itemId);
60 reducingHandler->deleteReducingTaskBy(itemId);
64 std::string ret_string =
"some logic error";
69 json_reducing_output_message = pocoJSONReducingOutputMessageConvertor.
convertToJSONFrom(reducingOutputMessage);
72 std::string ret_string =
"some error in json struct";
75 return json_reducing_output_message;
79 SharedPtr<ReducingHandler<> >reducingHandler;
91 SharedPtr<SphinxTaskReducersManagerIf<SphinxReduceJobBuilder<> > >sphinxTaskReducersManager =
92 sphinxTaskReducersManagerBuilder.
build();
97 reducingHandler.assign(
new ReducingHandler<>(reducingPartsCountersStorage, sphinxTaskReducersManager,
98 reducingOutputMessageBuilder, supportNotifier));
102 Poco::SharedPtr<ReducingOutputMessage> process(Poco::SharedPtr<ReducingInputMessage>& reducingInputMessage,
int itemId)
106 SharedPtr<ReducingOutputMessage> reducingOutputMessage;
108 reducingHandler->accumulateReducingData(itemId, reducingInputMessage);
109 if(reducingHandler->getAccumulateReducingPartsNumberBy(itemId) == nodesCount){
110 reducingOutputMessage = reducingHandler->makeReducing(itemId);
111 reducingHandler->deleteReducingTaskBy(itemId);
113 return reducingOutputMessage;
117 SharedPtr<ReducingHandler<> >reducingHandler;
122 int main(
int argc,
char** argv)
126 std::string json_sphinx_result_with_type =
"";
129 std::string reducingResult = processor.
process(json_sphinx_result_with_type, itemId);
131 std::cout << reducingResult <<
endl;
135 Poco::SharedPtr<HCE::reduce_types::ReducingInputMessage>reducingInputMessage;
138 Poco::SharedPtr<HCE::reduce_types::ReducingOutputMessage> reducingOutputMessage = processorWithObjects.
process(reducingInputMessage, itemId);