8 #include <gtest/gtest.h>
16 #include <Poco/Thread.h>
23 using namespace HCE::sphinx::reduce_task;
// NOTE(review): only selected lines of the original file are visible here (the
// leading numbers look like original line numbers); the enclosing helper
// signatures and the declarations of obj, taskId, milliseconds_ttl, str_result
// and jsonReducingInputMessage are not shown — confirm against the full file.
// Builds a ReducingInputMessage from (1, milliseconds_ttl, str_result) and hands
// ownership of it to a Poco::SharedPtr.
30 Poco::SharedPtr<ReducingInputMessage> inputMessage(
new ReducingInputMessage(1, milliseconds_ttl, str_result ) );
// Passes the message held in jsonReducingInputMessage to the object under test
// for the given taskId.
40 obj.accumulate( jsonReducingInputMessage,
taskId );
// Delegates to Poco::Thread::sleep with the caller-supplied `milliseconds` value.
47 Poco::Thread::sleep( milliseconds );
// isCompleteTask checks. NOTE(review): the test headers and the setup that
// accumulates messages are not visible here; the second argument is presumably
// the expected message count and the optional third a TTL value — confirm
// against the class declaration.
// With a second argument of 1, the task must be reported complete both with an
// explicit milliseconds( 200 ) third argument and without a third argument.
60 ASSERT_TRUE( obj.isCompleteTask(
taskId, 1, milliseconds( 200 ) ) );
61 ASSERT_TRUE( obj.isCompleteTask(
taskId, 1 ) );
// With a second argument of 2: after sleeping 10 ms the call with
// milliseconds( 10 ) must return true.
65 sleep_milliseconds( 10 );
66 ASSERT_TRUE( obj.isCompleteTask(
taskId, 2, milliseconds( 10 ) ) );
// After sleeping 200 ms the two-argument call must return true.
71 sleep_milliseconds( 200 );
72 ASSERT_TRUE( obj.isCompleteTask(
taskId, 2 ) );
// No sleep is visible before these two calls; both must return false.
77 ASSERT_FALSE( obj.isCompleteTask(
taskId, 2, milliseconds( 300 ) ) );
82 ASSERT_FALSE( obj.isCompleteTask(
taskId, 2 ) );
// cleanupExpiredTasksByTTL expectations. NOTE(review): the meaning of the first
// argument (10) and of a second argument of 0 is not visible here — presumably a
// batch limit and "use the stored TTL"; confirm against the declaration.
// After a 10 ms sleep, a call with milliseconds( 10 ) is expected to return 1.
87 sleep_milliseconds( 10 );
89 EXPECT_EQ( obj.cleanupExpiredTasksByTTL( 10, milliseconds( 10 ) ), 1 );
// After a 200 ms sleep, a call with a second argument of 0 is expected to return 1.
94 sleep_milliseconds( 200 );
96 EXPECT_EQ( obj.cleanupExpiredTasksByTTL( 10, 0 ), 1 );
// No sleep is visible before this call; with milliseconds( 100 ) it is expected
// to return 0.
101 EXPECT_EQ( obj.cleanupExpiredTasksByTTL( 10, milliseconds( 100 ) ), 0 );
// After only a 10 ms sleep, a call with a second argument of 0 is expected to
// return 0.
106 sleep_milliseconds( 10 );
108 EXPECT_EQ( obj.cleanupExpiredTasksByTTL( 10, 0 ), 0 );
// getExceededTTLTasks checks: each group passes a local `tasks` vector to the
// call and asserts on its size afterwards. NOTE(review): the repeated `tasks`
// declarations presumably live in separate test bodies whose braces are not
// visible here; the role of the first argument (10) is also not shown.
113 std::vector< unsigned long long > tasks;
// After a 10 ms sleep, a call with milliseconds( 10 ) must leave exactly one
// entry in `tasks`.
115 sleep_milliseconds( 10 );
117 obj.getExceededTTLTasks( 10, tasks, milliseconds( 10 ) );
119 ASSERT_TRUE( tasks.size() == 1 );
// After a 200 ms sleep, a call with a third argument of 0 must leave exactly one
// entry in `tasks`.
125 sleep_milliseconds( 200 );
127 std::vector< unsigned long long > tasks;
129 obj.getExceededTTLTasks( 10, tasks, 0 );
131 ASSERT_TRUE( tasks.size() == 1 );
// After a 100 ms sleep, a call with a third argument of 0 must leave `tasks`
// empty.
139 sleep_milliseconds( 100 );
141 std::vector< unsigned long long > tasks;
143 obj.getExceededTTLTasks( 10, tasks, 0 );
145 ASSERT_TRUE( tasks.size() == 0 );
// After a 10 ms sleep, a call with milliseconds( 50 ) must leave `tasks` empty.
151 sleep_milliseconds( 10 );
153 std::vector< unsigned long long > tasks;
155 obj.getExceededTTLTasks( 10, tasks, milliseconds( 50 ) );
157 ASSERT_TRUE( tasks.size() == 0 );
// cleanupExpiredTasksByTTLQueue checks. NOTE(review): test headers and setup are
// not visible here; `tasks` is declared but not used in the visible lines.
163 std::vector< unsigned long long > tasks;
// No sleep is visible before this pair: the task must be reported incomplete and
// the cleanup call with milliseconds( 50 ) must return 0.
165 ASSERT_FALSE( obj.isCompleteTask(
taskId, 2, milliseconds( 10 ) ) );
167 ASSERT_EQ( obj.cleanupExpiredTasksByTTLQueue( 10, milliseconds( 50 ) ), 0U );
// After a 201 ms sleep the two-argument isCompleteTask call must return true.
172 sleep_milliseconds( 201 );
174 std::vector< unsigned long long > tasks;
176 ASSERT_TRUE( obj.isCompleteTask(
taskId, 2 ) );
// The first cleanup call right after that must return 0; the same call repeated
// after a further 10 ms sleep must return 1.
178 ASSERT_EQ( obj.cleanupExpiredTasksByTTLQueue( 10, milliseconds( 10 ) ), 0U );
179 sleep_milliseconds( 10 );
180 ASSERT_EQ( obj.cleanupExpiredTasksByTTLQueue( 10, milliseconds( 10 ) ), 1U );
// getRejectedMessages checks. NOTE(review): test headers and setup are not
// visible here; the two groups below presumably belong to separate test bodies.
// After a 201 ms sleep the task must be reported complete with
// milliseconds( 10 ).
185 sleep_milliseconds( 201 );
187 ASSERT_TRUE( obj.isCompleteTask(
taskId, 2, milliseconds( 10 ) ) );
// The rejected-message counter is expected to be 0 before, and 1 after, one more
// accumulate call for the same taskId.
189 EXPECT_EQ( obj.getRejectedMessages(), 0 );
191 obj.accumulate( jsonReducingInputMessage,
taskId );
192 EXPECT_EQ( obj.getRejectedMessages(), 1 );
// Same sequence with milliseconds( 1 ) passed to isCompleteTask.
197 sleep_milliseconds( 201 );
199 ASSERT_TRUE( obj.isCompleteTask(
taskId, 2, milliseconds( 1 ) ) );
201 EXPECT_EQ( obj.getRejectedMessages(), 0 );
203 obj.accumulate( jsonReducingInputMessage,
taskId );
204 EXPECT_EQ( obj.getRejectedMessages(), 1 );
// After a further 10 ms sleep the queue cleanup is expected to return 1; a
// subsequent accumulate call is expected to leave the rejected counter at 1.
206 sleep_milliseconds( 10 );
207 EXPECT_EQ( obj.cleanupExpiredTasksByTTLQueue( 10, milliseconds( 10 ) ), 1U );
208 obj.accumulate( jsonReducingInputMessage,
taskId );
209 EXPECT_EQ( obj.getRejectedMessages(), 1 );