hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
main2.cpp
Go to the documentation of this file.
1 #include <ReducingHandler.h>
2 #include <ReducingPartsCountersStorage.h>
3 #include <ReducingOutputMessageBuilder.h>
4 #include <ConsoleSupportNotifier.h>
5 #include <ReducingExceptionsTranslator.h>
6 #include <SphinxTaskReducersManagerBuilder.h>
7 #include <PocoJSONReducingInputMessageConvertor.h>
8 #include <PocoJSONReducingOutputMessageConvertor.h>
9 
10 using namespace HCE::reduce;
11 using namespace HCE::reduce_types;
12 using namespace HCE::reduce_convertors;
13 using namespace HCE::sphinx;
14 using namespace HCE::sphinx::reduce_task;
15 using namespace Poco;
16 using namespace std;
17 
18 class Processor{
19  public:
21  {
22  // wrap in builder
23  SphinxTaskReducersManagerBuilder<> sphinxTaskReducersManagerBuilder;
24  SharedPtr<ReducingPartsCountersStorage> reducingPartsCountersStorage(new ReducingPartsCountersStorage());
25  SharedPtr<SphinxTaskReducersManagerIf<SphinxReduceJobBuilder<> > >sphinxTaskReducersManager =
26  sphinxTaskReducersManagerBuilder.build();
27  SharedPtr<ReducingExceptionsTranslator>reducingExceptionsTranslator(new ReducingExceptionsTranslator());
28  SharedPtr<ReducingOutputMessageBuilder>reducingOutputMessageBuilder(new ReducingOutputMessageBuilder(reducingExceptionsTranslator));
29  SharedPtr<SupportNotifierIf> supportNotifier(new ConsoleSupportNotifier());
30 
31  reducingHandler.assign(new ReducingHandler<>(reducingPartsCountersStorage, sphinxTaskReducersManager,
32  reducingOutputMessageBuilder, supportNotifier));
33  }
34 
35  std::string process(std::string& json_reducing_input_message, int itemId)
36  {
37  //may be class members
38  PocoJSONReducingInputMessageConvertor pocoJSONReducingInputMessageConvertor;
39  PocoJSONReducingOutputMessageConvertor pocoJSONReducingOutputMessageConvertor;
40 
41  SharedPtr<ReducingInputMessage>reducingInputMessage;
42  SharedPtr<ReducingOutputMessage> reducingOutputMessage;
43  std::string json_reducing_output_message = "";
44 
45  int nodesCount = 3; // may be parameters
46 
47  try{
48  reducingInputMessage = pocoJSONReducingInputMessageConvertor.convertToReducingInputMessageFrom(json_reducing_input_message);
49  }
51  std::string ret_string = "some error in json struct";
52  return ret_string;
53  }
54 
55  reducingHandler->accumulateReducingData(itemId, reducingInputMessage);
56 
57  try{
58  if(reducingHandler->getAccumulateReducingPartsNumberBy(itemId) == nodesCount){
59  reducingOutputMessage = reducingHandler->makeReducing(itemId);
60  reducingHandler->deleteReducingTaskBy(itemId);
61  }
62  }
63  catch(NotFoundByKeyException& e){
64  std::string ret_string = "some logic error";
65  return ret_string;
66  }
67 
68  try{
69  json_reducing_output_message = pocoJSONReducingOutputMessageConvertor.convertToJSONFrom(reducingOutputMessage);
70  }
72  std::string ret_string = "some error in json struct";
73  return ret_string;
74  }
75  return json_reducing_output_message;
76  }
77 
78  private:
79  SharedPtr<ReducingHandler<> >reducingHandler;
80 };
81 
82 
83 //work only with c++ objects
85  public:
87  {
88  // wrap in builder
89  SphinxTaskReducersManagerBuilder<> sphinxTaskReducersManagerBuilder;
90  SharedPtr<ReducingPartsCountersStorage> reducingPartsCountersStorage(new ReducingPartsCountersStorage());
91  SharedPtr<SphinxTaskReducersManagerIf<SphinxReduceJobBuilder<> > >sphinxTaskReducersManager =
92  sphinxTaskReducersManagerBuilder.build();
93  SharedPtr<ReducingExceptionsTranslator>reducingExceptionsTranslator(new ReducingExceptionsTranslator());
94  SharedPtr<ReducingOutputMessageBuilder>reducingOutputMessageBuilder(new ReducingOutputMessageBuilder(reducingExceptionsTranslator));
95  SharedPtr<SupportNotifierIf> supportNotifier(new ConsoleSupportNotifier());
96 
97  reducingHandler.assign(new ReducingHandler<>(reducingPartsCountersStorage, sphinxTaskReducersManager,
98  reducingOutputMessageBuilder, supportNotifier));
99  }
100 
101  // throws NotFoundByKeyException
102  Poco::SharedPtr<ReducingOutputMessage> process(Poco::SharedPtr<ReducingInputMessage>& reducingInputMessage, int itemId)
103  {
104 
105  int nodesCount = 3; // may be parameters
106  SharedPtr<ReducingOutputMessage> reducingOutputMessage;
107 
108  reducingHandler->accumulateReducingData(itemId, reducingInputMessage);
109  if(reducingHandler->getAccumulateReducingPartsNumberBy(itemId) == nodesCount){
110  reducingOutputMessage = reducingHandler->makeReducing(itemId);
111  reducingHandler->deleteReducingTaskBy(itemId);
112  }
113  return reducingOutputMessage;
114  }
115 
116  private:
117  SharedPtr<ReducingHandler<> >reducingHandler;
118 };
119 
120 
121 
122 int main(int argc, char** argv)
123 {
124  Processor processor;
125 
126  std::string json_sphinx_result_with_type = "";
127  int itemId = 3;
128 
129  std::string reducingResult = processor.process(json_sphinx_result_with_type, itemId);
130 
131  std::cout << reducingResult << endl;
132 
133  //
134  ProcessorWithObjects processorWithObjects;
135  Poco::SharedPtr<HCE::reduce_types::ReducingInputMessage>reducingInputMessage;
136 
137  try{
138  Poco::SharedPtr<HCE::reduce_types::ReducingOutputMessage> reducingOutputMessage = processorWithObjects.process(reducingInputMessage, itemId);
139  // ...
140  }
141  catch(NotFoundByKeyException& e){
142  std::cout << e.what() << std::endl;
143  return 1;
144  }
145 
146  return 0;
147 }