hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
ReducingTaskTTLManager.hpp
Go to the documentation of this file.
1 
14 #ifndef REDUCINGTASKTTLMANAGER_HPP_
15 #define REDUCINGTASKTTLMANAGER_HPP_
16 
17 #include <map>
18 #include <vector>
19 #include <set>
20 #include <Poco/Timestamp.h>
21 
22 namespace HCE
23 {
24 namespace reduce
25 {
26 
// Multiplier applied to TTL / age arguments before they are handed to
// Poco::Timestamp::isElapsed() (presumably converts milliseconds to
// microseconds -- TODO confirm the unit callers pass in).
27 const int ONE_THOUSAND_MICROSECONDS = 1000;
// Default value of the `ttl` override parameter of cleanupExpiredTasksByTTL()
// and isTaskExpired(); an override that is not greater than 0 makes those
// paths fall back to the TTL stored with each task.
28 const unsigned int USE_TASK_TTL_VALUE = 0;
29 
31 public:
32  void addTaskTTL( unsigned long long taskId, unsigned int ttl ) {
33  //if task is already terminated, don't add it
34  if( isTaskTerminatedByTTL( taskId ) ) {
35  return;
36  }
37 
38  auto iter = _runningTasks.find( taskId );
39 
40  if( iter == _runningTasks.end() ) {
41  //insert the task TTL info
42  _runningTasks[ taskId ] = std::make_pair( Poco::Timestamp(), ttl );
43  } else {
44  //simply update the TTL value if the task is already inserted
45  iter->second.second = ttl;
46  }
47  }
48 
49  void removeTaskTTL( unsigned long long taskId ) {
50  auto iter = _runningTasks.find( taskId );
51  if( iter != _runningTasks.end() ) {
52  //if it is expired by TTL, move it to the terminated TTL queue
53  if( iter->second.first.isElapsed( (iter->second.second) * ONE_THOUSAND_MICROSECONDS ) ) {
54  addTerminatedTask(taskId );
55  }
56  _runningTasks.erase( iter );
57  }
58  }
59 
60  unsigned int getTaskTTL( unsigned long long taskId ) const {
61  auto iter = _runningTasks.find( taskId );
62  if( iter != _runningTasks.end() ) {
63  return iter->second.second;
64  }
65  return 0;
66  }
67 
68 
69  int cleanupExpiredTasksByTTL(unsigned int maxRemoveTasks, unsigned int ttl = USE_TASK_TTL_VALUE) {
70  std::vector< unsigned long long > expiredTasks;
71 
72  getExceededTTLTasks( maxRemoveTasks, expiredTasks, ttl );
73 
74  for( int i = 0, n = expiredTasks.size(); i < n; i++ ) {
75  _runningTasks.erase( expiredTasks[i] );
76  //insert into the terminated tasks
77  addTerminatedTask( expiredTasks[i] );
78  }
79 
80  return expiredTasks.size();
81  }
82 
83  bool isTaskTerminatedByTTL( unsigned long long taskId ) const {
84  return _terminatedTasks.find( taskId ) != _terminatedTasks.end();
85  }
86 
87  unsigned int cleanupExpiredTasksByTTLQueue( unsigned int maxRemoveItems, unsigned int minTerminateTime ) {
88  unsigned int i = 0, n = _terminatedTasksTime.size();
89 
90  for( ; i < n && i < maxRemoveItems && _terminatedTasksTime[i].second.isElapsed( minTerminateTime * ONE_THOUSAND_MICROSECONDS ); i++ );
91 
92  if( i > 0 ) {
93  for(unsigned int j = 0; j < i; j++ ) {
94  _terminatedTasks.erase( _terminatedTasksTime[j].first );
95  }
96  _terminatedTasksTime.erase( _terminatedTasksTime.begin(), _terminatedTasksTime.begin() + i );
97  }
98  return i;
99  }
100 
101  bool isTaskExpired( unsigned long long taskId, unsigned int ttl = USE_TASK_TTL_VALUE) {
102  std::map< unsigned long long, std::pair< Poco::Timestamp, unsigned int > >::const_iterator iter = _runningTasks.find( taskId );
103 
104  return (iter == _runningTasks.end() )? false: iter->second.first.isElapsed( ( ( ttl > 0 )?ttl:iter->second.second ) * ONE_THOUSAND_MICROSECONDS );
105  }
106 
107  void getExceededTTLTasks( unsigned int maxTaskNumber, std::vector<unsigned long long>& exceededTTLTasks, unsigned int ttl = 0 ) const {
108  std::map< unsigned long long, std::pair< Poco::Timestamp, unsigned int > >::const_iterator iter;
109  unsigned int taskCount = 0;
110 
111  for( iter = _runningTasks.begin(); iter != _runningTasks.end(); ++iter ) {
112  if( iter->second.first.isElapsed( (( ttl > 0 )?ttl: iter->second.second )*ONE_THOUSAND_MICROSECONDS ) ) {
113  exceededTTLTasks.push_back( iter->first );
114  taskCount++;
115  if( taskCount >= maxTaskNumber ) {
116  break;
117  }
118  }
119  }
120  }
121 private:
122  void addTerminatedTask( unsigned long long taskId ) {
123  _terminatedTasks.insert( taskId );
124  _terminatedTasksTime.push_back( std::make_pair( taskId, Poco::Timestamp() ) );
125  }
126 private:
// Running tasks keyed by task id. The timestamp is the one constructed when
// addTaskTTL() first inserted the entry; the unsigned value is the task TTL.
127  //map between <taskId, <task create time, task ttl > >
128  std::map< unsigned long long, std::pair< Poco::Timestamp, unsigned int > > _runningTasks;
// Ids recorded by addTerminatedTask(); queried by isTaskTerminatedByTTL().
129  std::set< unsigned long long > _terminatedTasks;
// Appended to by addTerminatedTask(), so entries are in insertion order;
// cleanupExpiredTasksByTTLQueue() removes entries from the front.
130  //vector of <taskid, task terminate time>
131  std::vector< std::pair< unsigned long long, Poco::Timestamp > > _terminatedTasksTime;
132 
133 };
134 
135 }
136 }
137 
138 #endif/*REDUCINGTASKTTLMANAGER_HPP_*/