hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
ReducerFunctionalObjectTest.cpp
Go to the documentation of this file.
1 /*
2  * ReducerFunctionalObjectTest.cpp
3  *
4  * Created on: Oct 12, 2013
5  * Author: Steven
6  */
7 
8 #include <gtest/gtest.h>
10 #include "SphinxResultSorterIf.hpp"
11 #include "SphinxResultData.hpp"
12 #include "EncodeDecodeBase64.hpp"
13 #include "ReducingInputMessage.hpp"
15 #include <vector>
16 #include <Poco/Thread.h>
17 
18 using namespace std;
19 using namespace HCE;
20 using namespace HCE::reduce;
25 using namespace HCE::sphinx::reduce_task;
26 
27 std::string buildJsonReducingInputMessage( int milliseconds_ttl ) {
29  SphinxResultData result;
30  std::string str_result = buildInputJSONFromSphinxResultData(result);
31 
32  Poco::SharedPtr<ReducingInputMessage> inputMessage( new ReducingInputMessage(types::MessageType::mtSphinx,
33  milliseconds_ttl, str_result ) );
34 
35  return convertor.convertToJSONFrom( inputMessage );
36 }
37 
38 class ReducerFunctionalObjectTest: public ::testing::Test {
39 protected:
40  virtual void SetUp(){
41  taskId = 1;
42  std::string jsonReducingInputMessage = buildJsonReducingInputMessage( 200 );
43  obj.accumulate( jsonReducingInputMessage, taskId );
44  }
45 
46  virtual void TearDown(){
47  }
48 
49  void sleep_milliseconds( unsigned int milliseconds ) {
50  Poco::Thread::sleep( milliseconds );
51  }
52 
53  unsigned int milliseconds( int millis ) {
54  return millis;
55  }
56 
58  unsigned long long taskId;
59 };
60 
61 
62 TEST_F(ReducerFunctionalObjectTest, taskCompleteByNodeCount ) {
63  ASSERT_TRUE( obj.isCompleteTask( taskId, 1, milliseconds( 200 ) ) );
64  ASSERT_TRUE( obj.isCompleteTask( taskId, 1 ) );
65 }
66 
67 TEST_F(ReducerFunctionalObjectTest, taskCompleteByTTLParam ) {
68  sleep_milliseconds( 10 );
69  ASSERT_TRUE( obj.isCompleteTask( taskId, 2, milliseconds( 10 ) ) );
70 }
71 
72 TEST_F(ReducerFunctionalObjectTest, taskCompleteByTaskTTL )
73 {
74  sleep_milliseconds( 200 );
75  ASSERT_TRUE( obj.isCompleteTask( taskId, 2 ) );
76 }
77 
78 TEST_F(ReducerFunctionalObjectTest, taskNotCompleteByTTLParam )
79 {
80  ASSERT_FALSE( obj.isCompleteTask( taskId, 2, milliseconds( 300 ) ) );
81 }
82 
83 TEST_F(ReducerFunctionalObjectTest, taskNotCompleteByTaskTTL )
84 {
85  ASSERT_FALSE( obj.isCompleteTask( taskId, 2 ) );
86 }
87 
88 TEST_F(ReducerFunctionalObjectTest, cleanupByTTLParam )
89 {
90  sleep_milliseconds( 10 );
91 
92  EXPECT_EQ( obj.cleanupExpiredTasksByTTL( 10, milliseconds( 10 ) ), 1 );
93 }
94 
96 {
97  sleep_milliseconds( 200 );
98 
99  EXPECT_EQ( obj.cleanupExpiredTasksByTTL( 10, 0 ), 1 );
100 }
101 
102 TEST_F(ReducerFunctionalObjectTest, NotCleanupByTaskParam )
103 {
104  EXPECT_EQ( obj.cleanupExpiredTasksByTTL( 10, milliseconds( 100 ) ), 0 );
105 }
106 
107 TEST_F(ReducerFunctionalObjectTest, NotCleanupByTaskTTL )
108 {
109  sleep_milliseconds( 10 );
110 
111  EXPECT_EQ( obj.cleanupExpiredTasksByTTL( 10, 0 ), 0 );
112 }
113 
114 TEST_F(ReducerFunctionalObjectTest, ExceededByTTLParam )
115 {
116  std::vector< unsigned long long > tasks;
117 
118  sleep_milliseconds( 10 );
119 
120  obj.getExceededTTLTasks( 10, tasks, milliseconds( 10 ) );
121 
122  ASSERT_TRUE( tasks.size() == 1 );
123 
124 }
125 
127 {
128  sleep_milliseconds( 200 );
129 
130  std::vector< unsigned long long > tasks;
131 
132  obj.getExceededTTLTasks( 10, tasks, 0 );
133 
134  ASSERT_TRUE( tasks.size() == 1 );
135 
136 }
137 
138 
139 TEST_F(ReducerFunctionalObjectTest, NotExceededByTaskTTL )
140 {
141 
142  sleep_milliseconds( 100 );
143 
144  std::vector< unsigned long long > tasks;
145 
146  obj.getExceededTTLTasks( 10, tasks, 0 );
147 
148  ASSERT_TRUE( tasks.size() == 0 );
149 
150 }
151 
152 TEST_F(ReducerFunctionalObjectTest, NotExceededByTTLParam )
153 {
154  sleep_milliseconds( 10 );
155 
156  std::vector< unsigned long long > tasks;
157 
158  obj.getExceededTTLTasks( 10, tasks, milliseconds( 50 ) );
159 
160  ASSERT_TRUE( tasks.size() == 0 );
161 
162 }
163 
164 TEST_F(ReducerFunctionalObjectTest, NoExpiredTasksInTTLQueue )
165 {
166  std::vector< unsigned long long > tasks;
167 
168  ASSERT_FALSE( obj.isCompleteTask( taskId, 2, milliseconds( 10 ) ) );
169  obj.reduce( taskId );
170  ASSERT_EQ( obj.cleanupExpiredTasksByTTLQueue( 10, milliseconds( 50 ) ), 0U );
171 }
172 
173 TEST_F(ReducerFunctionalObjectTest, ExpiredTasksInTTLQueue )
174 {
175  sleep_milliseconds( 201 );
176 
177  std::vector< unsigned long long > tasks;
178 
179  ASSERT_TRUE( obj.isCompleteTask( taskId, 2 ) );
180  obj.reduce( taskId );
181  ASSERT_EQ( obj.cleanupExpiredTasksByTTLQueue( 10, milliseconds( 10 ) ), 0U );
182  sleep_milliseconds( 10 );
183  ASSERT_EQ( obj.cleanupExpiredTasksByTTLQueue( 10, milliseconds( 10 ) ), 1U );
184 }
185 
186 TEST_F(ReducerFunctionalObjectTest, RejectTerminateTaskAdd )
187 {
188  sleep_milliseconds( 201 );
189 
190  ASSERT_TRUE( obj.isCompleteTask( taskId, 2, milliseconds( 10 ) ) );
191  obj.reduce( taskId );
192  EXPECT_EQ( obj.getRejectedMessages(), 0 );
193  std::string jsonReducingInputMessage = buildJsonReducingInputMessage( 200 );
194  obj.accumulate( jsonReducingInputMessage, taskId );
195  EXPECT_EQ( obj.getRejectedMessages(), 1 );
196 }
197 
198 TEST_F(ReducerFunctionalObjectTest, AddTaskAfterCleanupTTLQueue )
199 {
200  sleep_milliseconds( 201 );
201 
202  ASSERT_TRUE( obj.isCompleteTask( taskId, 2, milliseconds( 1 ) ) );
203  obj.reduce( taskId );
204  EXPECT_EQ( obj.getRejectedMessages(), 0 );
205  std::string jsonReducingInputMessage = buildJsonReducingInputMessage( 100 );
206  obj.accumulate( jsonReducingInputMessage, taskId );
207  EXPECT_EQ( obj.getRejectedMessages(), 1 );
208 
209  sleep_milliseconds( 10 );
210  EXPECT_EQ( obj.cleanupExpiredTasksByTTLQueue( 10, milliseconds( 10 ) ), 1U );
211  obj.accumulate( jsonReducingInputMessage, taskId );
212  EXPECT_EQ( obj.getRejectedMessages(), 1 );
213 }
214 
215