8 #include <gtest/gtest.h>
16 #include <Poco/Thread.h>
20 using namespace HCE::reduce;
25 using namespace HCE::sphinx::reduce_task;
32 Poco::SharedPtr<ReducingInputMessage> inputMessage(
new ReducingInputMessage(types::MessageType::mtSphinx,
33 milliseconds_ttl, str_result ) );
43 obj.accumulate( jsonReducingInputMessage,
taskId );
50 Poco::Thread::sleep( milliseconds );
63 ASSERT_TRUE( obj.isCompleteTask(
taskId, 1, milliseconds( 200 ) ) );
64 ASSERT_TRUE( obj.isCompleteTask(
taskId, 1 ) );
68 sleep_milliseconds( 10 );
69 ASSERT_TRUE( obj.isCompleteTask(
taskId, 2, milliseconds( 10 ) ) );
74 sleep_milliseconds( 200 );
75 ASSERT_TRUE( obj.isCompleteTask(
taskId, 2 ) );
80 ASSERT_FALSE( obj.isCompleteTask(
taskId, 2, milliseconds( 300 ) ) );
85 ASSERT_FALSE( obj.isCompleteTask(
taskId, 2 ) );
90 sleep_milliseconds( 10 );
92 EXPECT_EQ( obj.cleanupExpiredTasksByTTL( 10, milliseconds( 10 ) ), 1 );
97 sleep_milliseconds( 200 );
99 EXPECT_EQ( obj.cleanupExpiredTasksByTTL( 10, 0 ), 1 );
104 EXPECT_EQ( obj.cleanupExpiredTasksByTTL( 10, milliseconds( 100 ) ), 0 );
109 sleep_milliseconds( 10 );
111 EXPECT_EQ( obj.cleanupExpiredTasksByTTL( 10, 0 ), 0 );
116 std::vector< unsigned long long > tasks;
118 sleep_milliseconds( 10 );
120 obj.getExceededTTLTasks( 10, tasks, milliseconds( 10 ) );
122 ASSERT_TRUE( tasks.size() == 1 );
128 sleep_milliseconds( 200 );
130 std::vector< unsigned long long > tasks;
132 obj.getExceededTTLTasks( 10, tasks, 0 );
134 ASSERT_TRUE( tasks.size() == 1 );
142 sleep_milliseconds( 100 );
144 std::vector< unsigned long long > tasks;
146 obj.getExceededTTLTasks( 10, tasks, 0 );
148 ASSERT_TRUE( tasks.size() == 0 );
154 sleep_milliseconds( 10 );
156 std::vector< unsigned long long > tasks;
158 obj.getExceededTTLTasks( 10, tasks, milliseconds( 50 ) );
160 ASSERT_TRUE( tasks.size() == 0 );
166 std::vector< unsigned long long > tasks;
168 ASSERT_FALSE( obj.isCompleteTask(
taskId, 2, milliseconds( 10 ) ) );
170 ASSERT_EQ( obj.cleanupExpiredTasksByTTLQueue( 10, milliseconds( 50 ) ), 0U );
175 sleep_milliseconds( 201 );
177 std::vector< unsigned long long > tasks;
179 ASSERT_TRUE( obj.isCompleteTask(
taskId, 2 ) );
181 ASSERT_EQ( obj.cleanupExpiredTasksByTTLQueue( 10, milliseconds( 10 ) ), 0U );
182 sleep_milliseconds( 10 );
183 ASSERT_EQ( obj.cleanupExpiredTasksByTTLQueue( 10, milliseconds( 10 ) ), 1U );
188 sleep_milliseconds( 201 );
190 ASSERT_TRUE( obj.isCompleteTask(
taskId, 2, milliseconds( 10 ) ) );
192 EXPECT_EQ( obj.getRejectedMessages(), 0 );
194 obj.accumulate( jsonReducingInputMessage,
taskId );
195 EXPECT_EQ( obj.getRejectedMessages(), 1 );
200 sleep_milliseconds( 201 );
202 ASSERT_TRUE( obj.isCompleteTask(
taskId, 2, milliseconds( 1 ) ) );
204 EXPECT_EQ( obj.getRejectedMessages(), 0 );
206 obj.accumulate( jsonReducingInputMessage,
taskId );
207 EXPECT_EQ( obj.getRejectedMessages(), 1 );
209 sleep_milliseconds( 10 );
210 EXPECT_EQ( obj.cleanupExpiredTasksByTTLQueue( 10, milliseconds( 10 ) ), 1U );
211 obj.accumulate( jsonReducingInputMessage,
taskId );
212 EXPECT_EQ( obj.getRejectedMessages(), 1 );