hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
ReducerFunctionalObjectSphinxTest.cpp
Go to the documentation of this file.
1 /*
2  * ReducerFunctionalObjectSphinxTest.cpp
3  *
4  * Created on: Oct 12, 2013
5  * Author: Steven
6  */
7 
8 #include <gtest/gtest.h>
10 #include "SphinxResultSorterIf.hpp"
11 #include "SphinxResultData.hpp"
12 #include "EncodeDecodeBase64.hpp"
13 #include "ReducingInputMessage.hpp"
15 #include <vector>
16 #include <Poco/Thread.h>
17 
18 using namespace std;
23 using namespace HCE::sphinx::reduce_task;
24 
// Builds the JSON text of a ReducingInputMessage that the tests feed to the reducer.
// A default-constructed SphinxResultData is passed through
// buildInputJSONFromSphinxResultData() to obtain a string, which is wrapped in a
// ReducingInputMessage together with the literal 1 and milliseconds_ttl; the message
// is then converted with convertor.convertToJSONFrom() and returned.
// NOTE(review): the literal 1 is presumably the task id (the fixture also uses taskId = 1) - confirm.
// NOTE(review): `convertor` is not declared in the visible lines; listing line 26 is absent from this extract.
25 std::string buildJsonReducingInputMessage( int milliseconds_ttl ) {
27  SphinxResultData result;
28  std::string str_result = buildInputJSONFromSphinxResultData(result);
29 
30  Poco::SharedPtr<ReducingInputMessage> inputMessage( new ReducingInputMessage(1, milliseconds_ttl, str_result ) );
31 
32  return convertor.convertToJSONFrom( inputMessage );
33 }
34 
// Test fixture shared by every TEST_F in this file.
// NOTE(review): the member `obj` used below is not declared in the visible lines;
// listing line 54 is absent from this extract - presumably it declares the reducer under test.
35 class ReducerFunctionalObjectSphinxTest: public ::testing::Test {
36 protected:
 // Runs before each test: sets taskId to 1 and accumulates one message, built with a
 // milliseconds_ttl of 200, into obj under that task id.
37  virtual void SetUp(){
38  taskId = 1;
39  std::string jsonReducingInputMessage = buildJsonReducingInputMessage( 200 );
40  obj.accumulate( jsonReducingInputMessage, taskId );
41  }
42 
 // Intentionally empty: no explicit per-test cleanup is performed here.
43  virtual void TearDown(){
44  }
45 
 // Blocks the calling thread via Poco::Thread::sleep() for the given number of milliseconds.
46  void sleep_milliseconds( unsigned int milliseconds ) {
47  Poco::Thread::sleep( milliseconds );
48  }
49 
 // Returns its argument unchanged (converted from int to unsigned int); it only labels
 // the unit of a literal at the call sites.
50  unsigned int milliseconds( int millis ) {
51  return millis;
52  }
53 
 // Task id assigned in SetUp() and passed to the obj calls in the tests.
55  unsigned long long taskId;
56 };
57 
58 
// Asserts that isCompleteTask() reports the task accumulated in SetUp as complete when
// the second argument is 1, both with an explicit third argument of 200 and without it.
// NOTE(review): the second argument is presumably the expected number of node responses
// (SetUp accumulated exactly one message) - confirm against isCompleteTask()'s declaration.
59 TEST_F(ReducerFunctionalObjectSphinxTest, taskCompleteByNodeCount ) {
60  ASSERT_TRUE( obj.isCompleteTask( taskId, 1, milliseconds( 200 ) ) );
61  ASSERT_TRUE( obj.isCompleteTask( taskId, 1 ) );
62 }
63 
// Sleeps 10 ms, then asserts that isCompleteTask() returns true when called with a
// second argument of 2 and a third argument of 10.
// NOTE(review): the third argument is presumably a TTL override in milliseconds, so the
// task counts as complete because that TTL has elapsed - confirm.
64 TEST_F(ReducerFunctionalObjectSphinxTest, taskCompleteByTTLParam ) {
65  sleep_milliseconds( 10 );
66  ASSERT_TRUE( obj.isCompleteTask( taskId, 2, milliseconds( 10 ) ) );
67 }
68 
// Waits past the 200 ms TTL that SetUp put into the accumulated message, then asserts
// that isCompleteTask() returns true when called without an explicit TTL argument.
// The sleep is 201 ms rather than exactly 200 ms so the result does not depend on how
// the TTL boundary is compared; the other task-TTL tests in this file
// (ExpiredTasksInTTLQueue, RejectTerminateTaskAdd, AddTaskAfterCleanupTTLQueue) use 201 ms as well.
69 TEST_F(ReducerFunctionalObjectSphinxTest, taskCompleteByTaskTTL )
70 {
71  sleep_milliseconds( 201 );
72  ASSERT_TRUE( obj.isCompleteTask( taskId, 2 ) );
73 }
74 
// Without sleeping, asserts that isCompleteTask() returns false when called with a
// second argument of 2 and a third argument of 300.
// NOTE(review): presumably false because only one message was accumulated and the
// 300 ms TTL argument has not elapsed - confirm.
75 TEST_F(ReducerFunctionalObjectSphinxTest, taskNotCompleteByTTLParam )
76 {
77  ASSERT_FALSE( obj.isCompleteTask( taskId, 2, milliseconds( 300 ) ) );
78 }
79 
// Without sleeping, asserts that isCompleteTask() returns false when called with a
// second argument of 2 and no explicit TTL argument.
// NOTE(review): presumably the 200 ms TTL from the message built in SetUp applies here - confirm.
80 TEST_F(ReducerFunctionalObjectSphinxTest, taskNotCompleteByTaskTTL )
81 {
82  ASSERT_FALSE( obj.isCompleteTask( taskId, 2 ) );
83 }
84 
// NOTE(review): the TEST_F header for this body (listing line 85) is missing from this extract.
// Sleeps 10 ms, then expects cleanupExpiredTasksByTTL( 10, 10 ) to return 1.
// NOTE(review): the meaning of the first argument (10) is not visible here - confirm
// against the declaration of cleanupExpiredTasksByTTL().
86 {
87  sleep_milliseconds( 10 );
88 
89  EXPECT_EQ( obj.cleanupExpiredTasksByTTL( 10, milliseconds( 10 ) ), 1 );
90 }
91 
// NOTE(review): the TEST_F header for this body (listing line 92) is missing from this extract.
// Sleeps 200 ms, then expects cleanupExpiredTasksByTTL( 10, 0 ) to return 1.
// NOTE(review): a second argument of 0 presumably selects the task's own TTL (200 ms,
// set in SetUp) - confirm. The sleep equals that TTL exactly, whereas other tests in
// this file wait 201 ms; check whether the boundary comparison makes this timing-sensitive.
93 {
94  sleep_milliseconds( 200 );
95 
96  EXPECT_EQ( obj.cleanupExpiredTasksByTTL( 10, 0 ), 1 );
97 }
98 
// Without sleeping, expects cleanupExpiredTasksByTTL( 10, 100 ) to return 0.
// NOTE(review): presumably nothing is removed because the 100 ms TTL argument has not
// elapsed since SetUp - confirm.
99 TEST_F(ReducerFunctionalObjectSphinxTest, NotCleanupByTaskParam )
100 {
101  EXPECT_EQ( obj.cleanupExpiredTasksByTTL( 10, milliseconds( 100 ) ), 0 );
102 }
103 
// NOTE(review): the TEST_F header for this body (listing line 104) is missing from this extract.
// Sleeps 10 ms, then expects cleanupExpiredTasksByTTL( 10, 0 ) to return 0.
// NOTE(review): presumably the task's own 200 ms TTL applies when the second argument
// is 0, so nothing has expired yet - confirm.
105 {
106  sleep_milliseconds( 10 );
107 
108  EXPECT_EQ( obj.cleanupExpiredTasksByTTL( 10, 0 ), 0 );
109 }
110 
// NOTE(review): the TEST_F header for this body (listing line 111) is missing from this extract.
// Sleeps 10 ms, calls getExceededTTLTasks( 10, tasks, 10 ) and asserts that exactly one
// entry was placed in `tasks`.
// NOTE(review): the meaning of the first argument (10) is not visible here - confirm.
112 {
113  std::vector< unsigned long long > tasks;
114 
115  sleep_milliseconds( 10 );
116 
117  obj.getExceededTTLTasks( 10, tasks, milliseconds( 10 ) );
118 
119  ASSERT_TRUE( tasks.size() == 1 );
120 
121 }
122 
// NOTE(review): the TEST_F header for this body (listing line 123) is missing from this extract.
// Sleeps 200 ms, calls getExceededTTLTasks( 10, tasks, 0 ) and asserts that exactly one
// entry was placed in `tasks`.
// NOTE(review): a third argument of 0 presumably selects the task's own TTL (200 ms,
// set in SetUp) - confirm. The sleep equals that TTL exactly; other tests wait 201 ms.
124 {
125  sleep_milliseconds( 200 );
126 
127  std::vector< unsigned long long > tasks;
128 
129  obj.getExceededTTLTasks( 10, tasks, 0 );
130 
131  ASSERT_TRUE( tasks.size() == 1 );
132 
133 }
134 
135 
// NOTE(review): the TEST_F header for this body (listing line 136) is missing from this extract.
// Sleeps 100 ms, calls getExceededTTLTasks( 10, tasks, 0 ) and asserts that `tasks`
// stays empty.
// NOTE(review): presumably the task's own 200 ms TTL applies when the third argument
// is 0, so after 100 ms nothing has exceeded it - confirm.
137 {
138 
139  sleep_milliseconds( 100 );
140 
141  std::vector< unsigned long long > tasks;
142 
143  obj.getExceededTTLTasks( 10, tasks, 0 );
144 
145  ASSERT_TRUE( tasks.size() == 0 );
146 
147 }
148 
// NOTE(review): the TEST_F header for this body (listing line 149) is missing from this extract.
// Sleeps 10 ms, calls getExceededTTLTasks( 10, tasks, 50 ) and asserts that `tasks`
// stays empty.
// NOTE(review): presumably the 50 ms TTL argument has not elapsed after a 10 ms wait - confirm.
150 {
151  sleep_milliseconds( 10 );
152 
153  std::vector< unsigned long long > tasks;
154 
155  obj.getExceededTTLTasks( 10, tasks, milliseconds( 50 ) );
156 
157  ASSERT_TRUE( tasks.size() == 0 );
158 
159 }
160 
// Asserts that the task is not yet complete (second argument 2, third argument 10, no
// sleep beforehand), reduces it anyway, and then asserts that
// cleanupExpiredTasksByTTLQueue( 10, 50 ) returns 0.
// NOTE(review): presumably reduce() moves the task id into a TTL queue whose entries
// only become removable after the given 50 ms - confirm against the reducer implementation.
// The unused local vector `tasks` that used to be declared here has been removed.
161 TEST_F(ReducerFunctionalObjectSphinxTest, NoExpiredTasksInTTLQueue )
162 {
165  ASSERT_FALSE( obj.isCompleteTask( taskId, 2, milliseconds( 10 ) ) );
166  obj.reduce( taskId );
167  ASSERT_EQ( obj.cleanupExpiredTasksByTTLQueue( 10, milliseconds( 50 ) ), 0U );
168 }
169 
// Waits 201 ms (past the 200 ms TTL set in SetUp), asserts that the task is complete,
// and reduces it. Immediately afterwards cleanupExpiredTasksByTTLQueue( 10, 10 ) must
// return 0; after a further 10 ms sleep the same call must return 1.
// NOTE(review): presumably reduce() places the task id in a TTL queue and the second
// argument is the queue TTL in milliseconds - confirm against the reducer implementation.
// The unused local vector `tasks` that used to be declared here has been removed.
170 TEST_F(ReducerFunctionalObjectSphinxTest, ExpiredTasksInTTLQueue )
171 {
172  sleep_milliseconds( 201 );
173 
176  ASSERT_TRUE( obj.isCompleteTask( taskId, 2 ) );
177  obj.reduce( taskId );
178  ASSERT_EQ( obj.cleanupExpiredTasksByTTLQueue( 10, milliseconds( 10 ) ), 0U );
179  sleep_milliseconds( 10 );
180  ASSERT_EQ( obj.cleanupExpiredTasksByTTLQueue( 10, milliseconds( 10 ) ), 1U );
181 }
182 
// Waits 201 ms, asserts that the task is complete, and reduces it. getRejectedMessages()
// is expected to be 0 at that point; after accumulating another message under the same
// task id it is expected to be 1.
// NOTE(review): presumably a message for an already reduced task is rejected rather
// than accumulated - confirm against the reducer implementation.
183 TEST_F(ReducerFunctionalObjectSphinxTest, RejectTerminateTaskAdd )
184 {
185  sleep_milliseconds( 201 );
186 
187  ASSERT_TRUE( obj.isCompleteTask( taskId, 2, milliseconds( 10 ) ) );
188  obj.reduce( taskId );
189  EXPECT_EQ( obj.getRejectedMessages(), 0 );
190  std::string jsonReducingInputMessage = buildJsonReducingInputMessage( 200 );
191  obj.accumulate( jsonReducingInputMessage, taskId );
192  EXPECT_EQ( obj.getRejectedMessages(), 1 );
193 }
194 
// Waits 201 ms, asserts that the task is complete, reduces it, and checks that a
// message accumulated right afterwards raises getRejectedMessages() from 0 to 1.
// Then sleeps 10 ms, expects cleanupExpiredTasksByTTLQueue( 10, 10 ) to return 1, and
// accumulates the same message again; the rejected count is expected to stay at 1.
// NOTE(review): presumably the second accumulate is accepted because the cleanup
// removed the task id from the TTL queue - confirm against the reducer implementation.
195 TEST_F(ReducerFunctionalObjectSphinxTest, AddTaskAfterCleanupTTLQueue )
196 {
197  sleep_milliseconds( 201 );
198 
199  ASSERT_TRUE( obj.isCompleteTask( taskId, 2, milliseconds( 1 ) ) );
200  obj.reduce( taskId );
201  EXPECT_EQ( obj.getRejectedMessages(), 0 );
202  std::string jsonReducingInputMessage = buildJsonReducingInputMessage( 100 );
203  obj.accumulate( jsonReducingInputMessage, taskId );
204  EXPECT_EQ( obj.getRejectedMessages(), 1 );
205 
206  sleep_milliseconds( 10 );
207  EXPECT_EQ( obj.cleanupExpiredTasksByTTLQueue( 10, milliseconds( 10 ) ), 1U );
208  obj.accumulate( jsonReducingInputMessage, taskId );
209  EXPECT_EQ( obj.getRejectedMessages(), 1 );
210 }
211 
212