hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
ClientsQueueManager.hpp
Go to the documentation of this file.
1 
17 #ifndef __HCE_CLIENTS_QUEUE_MANAGER_INCLUDED__
18 #define __HCE_CLIENTS_QUEUE_MANAGER_INCLUDED__
19 
20 #include "HandlerTypes.hpp"
22 #include "ResourceUsageManager.hpp"
23 
24 namespace HCE {
25  namespace handlers {
26 
28  public:
29  //Purge mode: 0 - purge client from queue if expired, 1 - not purge but increment counter only
30  ClientsQueueManager(std::vector<ClientWorkerItem>* queue, unsigned int purgeMode, HCE::handlers::ResourceUsageManager* resourceUsageManager=nullptr) :
31  _refreshNotFoundCounter(0), _purgeMode(purgeMode),
32  _purgeCounter(0), _heartbeatTimeout(HEARTBEAT_TIMEOUT),
33  _maxResourcesUsageCollectedSize(HCE::handlers::RESOURCES_USAGE_COLLECTED_SIZE){
34  _queue = queue;
35  _resourceUsageManager=resourceUsageManager;
36  }
37 
39  void append(const std::string &identity, bool checkPresence = true){
40  bool found = false;
41 
42  if(checkPresence){
43  //Check allworkers by identity
44  for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
45  if(it->identity.compare(identity) == 0){
46  found = true;
47  break;
48  }
49  }
50  }
51 
52  //Not found
53  if(!found){
54  //Add new worker
55  ClientWorkerItem worker(identity, (s_clock() + _heartbeatTimeout));
56  _queue->push_back(worker);
57 
58  //Reset counters to balance requests proportional way after new worker insertion
60  }
61  }
62 
64  void append(const std::string& identity, const std::vector<ClientWorkerItem>& route, const std::string& resources, bool checkPresence = true){
65  bool found = false;
66 
67  if(checkPresence){
68  //Check allworkers by identity
69  for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
70  if(it->identity.compare(identity) == 0){
71  it->route = route;
72  if (!resources.empty() && _resourceUsageManager){
73  _resourceUsageManager->add(it->identity, it->resources, resources, _maxResourcesUsageCollectedSize);
74  }
75  found = true;
76  break;
77  }
78  }
79  }
80 
81  //Not found
82  if(!found){
83  //Add new worker
84  ClientWorkerItem worker(identity, (s_clock() + getHeartbeatTimeout()));
85  worker.route = route;
86  if (!resources.empty() && _resourceUsageManager){
87  _resourceUsageManager->add(worker.identity, worker.resources, resources, _maxResourcesUsageCollectedSize);
88  }
89  _queue->push_back(worker);
90 
91  //Reset counters to balance requests proportional way after new worker insertion
93  }
94  }
95 
97  void append(const std::string& identity, const std::vector<ClientWorkerItem>& route, const ResourceUsageCollection& resources, bool checkPresence = true){
98  bool found = false;
99 
100  if(checkPresence){
101  //Check allworkers by identity
102  for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
103  if(it->identity.compare(identity) == 0){
104  it->route = route;
105  it->resources = resources;
106  found = true;
107  break;
108  }
109  }
110  }
111 
112  //Not found
113  if(!found){
114  //Add new worker
115  ClientWorkerItem worker(identity, (s_clock() + getHeartbeatTimeout()));
116  worker.route = route;
117  worker.resources = resources;
118  _queue->push_back(worker);
119 
120  //Reset counters to balance requests proportional way after new worker insertion
122  }
123  }
124 
126  void append(const std::vector<ClientWorkerItem>& queue){
127  for (size_t i=0;i<queue.size();++i)
128  {
129  append(queue[i].identity, queue[i].route, queue[i].resources);
130  }
131  }
132 
134  void append(const std::string& identity, const std::string& property, bool checkPresence = true){
135  bool found = false;
136 
137  if(checkPresence){
138  //Check allworkers by identity
139  for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
140  if(it->identity.compare(identity) == 0){
141  it->property = property;
142  found = true;
143  break;
144  }
145  }
146  }
147 
148  //Not found
149  if(!found){
150  //Add new worker
151  ClientWorkerItem worker(identity, (s_clock() + getHeartbeatTimeout()));
152  worker.property = property;
153  _queue->push_back(worker);
154 
155  //Reset counters to balance requests proportional way after new worker insertion
157  }
158  }
159 
161  void remove(const std::string &identity){
162  for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
163  if(it->identity.compare(identity) == 0){
164  it = _queue->erase(it);
165  break;
166  }
167  }
168  }
169 
171  bool refresh(const std::string &identity){
172  bool found = false;
173 
174  for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
175  if(it->identity.compare(identity) == 0){
176  it->expiry = s_clock() + _heartbeatTimeout;
177  it->rtimestamp = s_clock();
178  found = true;
179  break;
180  }
181  }
182 
183  //TODO: experimental
184  if(!found){
185  //If not found - add as new and fresh
186  append(identity);
187 
188  _refreshNotFoundCounter++;
189  }
190 
191  return found;
192  }
193 
195  bool refresh(const std::string &identity, const std::vector<ClientWorkerItem>& route, const std::string& resources=""){
196  bool found = false;
197 
198  for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
199  if(it->identity.compare(identity) == 0){
200  it->expiry = s_clock() + _heartbeatTimeout;
201  it->rtimestamp = s_clock();
202  it->route = route;
203  if (!resources.empty() && _resourceUsageManager){
204  _resourceUsageManager->add(it->identity, it->resources, resources, _maxResourcesUsageCollectedSize);
205  }
206  found = true;
207  break;
208  }
209  }
210 
211  //TODO: experimental
212  if(!found){
213  //If not found - add as new and fresh
214  append(identity, route, resources);
215 
216  _refreshNotFoundCounter++;
217  }
218 
219  return found;
220  }
221 
223  bool refresh(const std::string& identity, const std::string& property){
224  bool found = false;
225 
226  for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
227  if(it->identity.compare(identity) == 0){
228  it->property = property;
229  found = true;
230  break;
231  }
232  }
233 
234  //TODO: experimental
235  if(!found){
236  //If not found - add as new and fresh
237  append(identity, property);
238 
239  _refreshNotFoundCounter++;
240  }
241  return found;
242  }
243 
245  std::string purge(void){
246  std::stringstream ret("");
247 
248  int64_t clock = s_clock();
249  for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it != _queue->end(); ++it){
250 
251  it->shiftCounter.responceTimeStamp(it->rtimestamp);
252  it->shiftCounter.recount(_heartbeatTimeout);
253 
254 // if(clock > it->expiry){ //old
255  if(clock > it->expiry+_heartbeatTimeout + it->shiftCounter.getShift()){
256  ret << "{\"identity\":\"" << it->identity << "\",\"expiry\":" << it->expiry
257  << ",\"count\":" << it->count << ",\"frequency\":" << it->frequency
258  << ",\"shift\":" << it->shiftCounter.getShift()
259  << "},";
260  it->shiftCounter.reset();
261 
262  if(_purgeMode == 0){
263  it = _queue->erase(it) - 1;
264  }
265  _purgeCounter++;
266  }
267  }
268  return ret.str();
269  }
270 
272  bool exists(const std::string &identity){
273  bool ret = false;
274 
275  for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
276  if(it->identity.compare(identity) == 0){
277  ret = true;
278  break;
279  }
280  }
281 
282  return ret;
283  }
284 
286  std::string getDump(void) const {
287  std::stringstream stream;
288  stream << _purgeCounter << "," << _refreshNotFoundCounter << "," << _queue->size() << ",";
289  for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
290  stream << it->identity << " " << it->count << " " << it->frequency << " " << it->expiry << " " << it->cpu << " " << it->iowait << ",";
291  }
292  return stream.str();
293  }
294 
296  std::string getAdminClients(void){
297  std::string json;
298  AdminClientsSerializator adminClientsSerializator(*_queue, _purgeCounter, _refreshNotFoundCounter);
299  adminClientsSerializator.serialize(json);
300  return json;
301  }
302 
304  void resetFrequencies(void){
305  for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
306  it->count += it->frequency;
307  it->frequency = 0;
308  }
309  }
310 
312  void incrementFrequency(const std::string &identity){
313  for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
314  if(it->identity.compare(identity) == 0){
315  ++(it->frequency);
316  break;
317  }
318  }
319  }
320 
322  std::string getProperty(const std::string &identity){
323  std::string result;
324  for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
325  if(it->identity.compare(identity) == 0){
326  result = it->property;
327  break;
328  }
329  }
330  return result;
331  }
332 
334  ResourceUsageCollection getResources(const std::string &identity){
336  for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
337  if(it->identity.compare(identity) == 0){
338  result = it->resources;
339  break;
340  }
341  }
342  return result;
343  }
344 
346  unsigned int getPurgeMode(void) const{
347  return _purgeMode;
348  }
349 
351  void setPurgeMode(const unsigned int purgeMode){
352  _purgeMode = purgeMode;
353  }
354 
356  unsigned int getHeartbeatTimeout(void) const{
357  return _heartbeatTimeout;
358  }
359 
361  void setHeartbeatTimeout(unsigned int heartbeatTimeout){
362  _heartbeatTimeout = heartbeatTimeout;
363  }
364 
366  unsigned int getMaxResourcesUsageCollectedSize(void) const{
367  return _maxResourcesUsageCollectedSize;
368  }
369 
371  void setMaxResourcesUsageCollectedSize(unsigned int maxResourcesUsageCollectedSize){
372  _maxResourcesUsageCollectedSize = maxResourcesUsageCollectedSize;
373  }
374 
375  private:
377  std::vector<ClientWorkerItem>* _queue;
379  unsigned long long _refreshNotFoundCounter;
380  unsigned int _purgeMode;
381  unsigned long long _purgeCounter;
382  unsigned int _heartbeatTimeout;
383 
385  unsigned int _maxResourcesUsageCollectedSize;
387  HCE::handlers::ResourceUsageManager* _resourceUsageManager;
388  };
389  }
390 }
391 
392 #endif /* __HCE_CLIENTS_QUEUE_MANAGER_INCLUDED__ */