17 #ifndef __HCE_CLIENTS_QUEUE_MANAGER_INCLUDED__
18 #define __HCE_CLIENTS_QUEUE_MANAGER_INCLUDED__
// Constructor fragment (the signature is not visible in this excerpt).
// The member-initializer list zeroes the "refresh not found" counter and stores the purge mode argument.
31 _refreshNotFoundCounter(0), _purgeMode(purgeMode),
// The resource usage manager is assigned in the constructor body rather than in the initializer list.
// NOTE(review): whether this class owns the manager cannot be determined from here.
35 _resourceUsageManager=resourceUsageManager;
// Adds a worker with the given identity to the queue.
//   identity      - identity string compared against the `identity` field of queued items
//   checkPresence - defaults to true; NOTE(review): presumably gates the duplicate scan below,
//                   the statement that reads it is not visible in this excerpt
39 void append(
const std::string &identity,
bool checkPresence =
true){
// Linear scan of the queue for an item whose identity equals `identity`.
44 for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
45 if(it->identity.compare(identity) == 0){
// A `worker` item (its construction is not visible here) is appended at the back of the queue.
56 _queue->push_back(worker);
// Adds a worker to the queue, recording a resource usage report given as a string.
//   identity      - identity string compared against the `identity` field of queued items
//   route         - route items for the worker; its use is not visible in this excerpt
//   resources     - resource usage report; ignored by the visible code when empty
//   checkPresence - defaults to true; NOTE(review): presumably gates the duplicate scan, confirm in the full body
64 void append(
const std::string& identity,
const std::vector<ClientWorkerItem>& route,
const std::string& resources,
bool checkPresence =
true){
// Linear scan for an item that already has this identity.
69 for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
70 if(it->identity.compare(identity) == 0){
// For a matching item: a non-empty report is handed to the resource usage manager, but only
// when a manager pointer is set. The manager receives the item's identity, the item's stored
// resources, the new report and the configured maximum collection size.
72 if (!resources.empty() && _resourceUsageManager){
73 _resourceUsageManager->
add(it->identity, it->resources, resources, _maxResourcesUsageCollectedSize);
// Same guarded call for the `worker` item that is about to be queued.
86 if (!resources.empty() && _resourceUsageManager){
87 _resourceUsageManager->
add(worker.
identity, worker.
resources, resources, _maxResourcesUsageCollectedSize);
// The `worker` item is appended at the back of the queue.
89 _queue->push_back(worker);
// Adds a worker to the queue with an already-built resource usage collection.
//   identity      - identity string compared against the `identity` field of queued items
//   route         - route items copied into the new worker
//   resources     - resource usage collection stored on the item
//   checkPresence - defaults to true; NOTE(review): presumably gates the duplicate scan, confirm in the full body
97 void append(
const std::string& identity,
const std::vector<ClientWorkerItem>& route,
const ResourceUsageCollection& resources,
bool checkPresence =
true){
// Linear scan for an item that already has this identity.
102 for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
103 if(it->identity.compare(identity) == 0){
// A matching item has its stored resources overwritten by the supplied collection.
105 it->resources = resources;
// The `worker` item takes a copy of the supplied route.
116 worker.
route = route;
// The `worker` item is appended at the back of the queue.
118 _queue->push_back(worker);
// Appends every item of another worker list to this queue.
//   queue - items to add; each one is forwarded by identity, route and resources
126 void append(
const std::vector<ClientWorkerItem>& queue){
127 for (
size_t i=0;i<queue.size();++i)
// Forwards to a three-argument append overload, leaving checkPresence at its default (true).
// NOTE(review): the overload chosen depends on the type of ClientWorkerItem::resources, which
// is declared outside this excerpt.
129 append(queue[i].identity, queue[i].route, queue[i].resources);
// Adds a worker to the queue together with a property string.
//   identity      - identity string compared against the `identity` field of queued items
//   property      - value stored in the item's `property` field
//   checkPresence - defaults to true; NOTE(review): presumably gates the duplicate scan, confirm in the full body
134 void append(
const std::string& identity,
const std::string& property,
bool checkPresence =
true){
// Linear scan for an item that already has this identity.
139 for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
140 if(it->identity.compare(identity) == 0){
// A matching item has its property replaced by the supplied value.
141 it->property = property;
// The `worker` item is appended at the back of the queue.
153 _queue->push_back(worker);
// Removes the queue item whose identity equals `identity`.
//   identity - identity string compared against the `identity` field of queued items
161 void remove(
const std::string &identity){
162 for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
163 if(it->identity.compare(identity) == 0){
// erase() returns the iterator to the element that followed the erased one, and the loop
// header still applies ++it afterwards.
// NOTE(review): unless the statement after this one leaves the loop (not visible in this
// excerpt), the following element is skipped, and erasing the last element would increment
// the end iterator. Confirm a break/return follows.
164 it = _queue->erase(it);
// Fragment of a function whose signature is not visible in this excerpt.
// Scans the queue for an item whose identity equals `identity`.
174 for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
175 if(it->identity.compare(identity) == 0){
// The matching item gets a new expiry (current clock plus the heartbeat timeout) and a new
// response timestamp. s_clock() is called separately for each assignment.
176 it->expiry = s_clock() + _heartbeatTimeout;
177 it->rtimestamp = s_clock();
// NOTE(review): presumably reached only when no item matched; the surrounding condition is not visible here.
188 _refreshNotFoundCounter++;
// Refreshes the queue item with the given identity and records an optional resource report.
//   identity  - identity string compared against the `identity` field of queued items
//   route     - route items forwarded to append() below
//   resources - resource usage report, defaults to an empty string; ignored by the visible code when empty
// Returns bool; the return statements are not visible in this excerpt.
195 bool refresh(
const std::string &identity,
const std::vector<ClientWorkerItem>& route,
const std::string& resources=
""){
198 for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
199 if(it->identity.compare(identity) == 0){
// The matching item gets a new expiry (current clock plus the heartbeat timeout) and a new
// response timestamp. s_clock() is called separately for each assignment.
200 it->expiry = s_clock() + _heartbeatTimeout;
201 it->rtimestamp = s_clock();
// A non-empty report is handed to the resource usage manager, but only when a manager pointer is set.
203 if (!resources.empty() && _resourceUsageManager){
204 _resourceUsageManager->
add(it->identity, it->resources, resources, _maxResourcesUsageCollectedSize);
// Adds the worker through the string-resources append overload and bumps the not-found counter.
// NOTE(review): presumably this is the path for an identity that was not found; the guarding
// condition is not visible here.
214 append(identity, route, resources);
216 _refreshNotFoundCounter++;
// Updates the property of the queue item with the given identity.
//   identity - identity string compared against the `identity` field of queued items
//   property - value stored in the item's `property` field
// Returns bool; the return statements are not visible in this excerpt.
223 bool refresh(
const std::string& identity,
const std::string& property){
226 for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
227 if(it->identity.compare(identity) == 0){
// A matching item has its property replaced by the supplied value.
228 it->property = property;
// Adds the worker through the (identity, property) append overload and bumps the not-found counter.
// NOTE(review): presumably this is the path for an identity that was not found; the guarding
// condition is not visible here.
237 append(identity, property);
239 _refreshNotFoundCounter++;
// Fragment of a function whose signature is not visible in this excerpt.
// Walks the queue, erases items judged expired, and writes a JSON-like record for each into `ret`.
246 std::stringstream
ret(
"");
// The clock is read once and reused for every item in the loop.
248 int64_t clock = s_clock();
249 for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it != _queue->end(); ++it){
// The item's shift counter is fed the last response timestamp and recounted against the heartbeat timeout.
251 it->shiftCounter.responceTimeStamp(it->rtimestamp);
252 it->shiftCounter.recount(_heartbeatTimeout);
// An item counts as expired when the clock is past its expiry plus one heartbeat timeout plus
// the shift reported by its shift counter.
255 if(clock > it->expiry+_heartbeatTimeout + it->shiftCounter.getShift()){
256 ret <<
"{\"identity\":\"" << it->identity <<
"\",\"expiry\":" << it->expiry
257 <<
",\"count\":" << it->count <<
",\"frequency\":" << it->frequency
258 <<
",\"shift\":" << it->shiftCounter.getShift()
260 it->shiftCounter.reset();
// erase() returns the iterator after the removed element; stepping back by one lets the
// loop's ++it land on that element.
// NOTE(review): when the removed element is the first one, this computes begin() - 1, which
// is outside the valid iterator range of the vector. Confirm and consider a loop form that
// advances only when nothing was erased.
263 it = _queue->erase(it) - 1;
// Checks the queue for an item whose identity equals `identity`.
//   identity - identity string compared against the `identity` field of queued items
// Returns bool; the return statements are not visible in this excerpt.
272 bool exists(
const std::string &identity){
// Linear scan with an exact string comparison.
275 for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
276 if(it->identity.compare(identity) == 0){
// Fragment of a function whose signature is not visible in this excerpt.
// Builds a comma-separated text dump of the manager state.
287 std::stringstream stream;
// Header fields: purge counter, refresh-not-found counter and queue size, each followed by a comma.
288 stream << _purgeCounter <<
"," << _refreshNotFoundCounter <<
"," << _queue->size() <<
",";
// One record per queued item: six space-separated fields followed by a comma.
289 for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
290 stream << it->identity <<
" " << it->count <<
" " << it->frequency <<
" " << it->expiry <<
" " << it->cpu <<
" " << it->iowait <<
",";
// Fragment: calls serialize() on `adminClientsSerializator`, passing `json`.
// NOTE(review): both names are declared outside this excerpt; whether `json` is an input or an
// output argument cannot be determined from here.
299 adminClientsSerializator.
serialize(json);
// Fragment of a function whose signature is not visible in this excerpt.
// Adds each queued item's `frequency` to its `count`.
305 for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
306 it->count += it->frequency;
// Fragment of a function whose signature is not visible in this excerpt.
// Scans the queue for an item whose identity equals `identity`; the action taken on a match is not visible here.
313 for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
314 if(it->identity.compare(identity) == 0){
// Fragment of a function whose signature is not visible in this excerpt.
// Looks up the item with the given identity and copies its `property` into `result`.
324 for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
325 if(it->identity.compare(identity) == 0){
326 result = it->property;
// Fragment of a function whose signature is not visible in this excerpt.
// Looks up the item with the given identity and copies its `resources` into `result`.
336 for(std::vector<ClientWorkerItem>::iterator it = _queue->begin(); it < _queue->end(); ++it){
337 if(it->identity.compare(identity) == 0){
338 result = it->resources;
// Bodies of simple accessors; their signatures are not visible in this excerpt.
// Stores a new purge mode.
352 _purgeMode = purgeMode;
// Returns the heartbeat timeout.
357 return _heartbeatTimeout;
// Stores a new heartbeat timeout.
362 _heartbeatTimeout = heartbeatTimeout;
// Returns the maximum size passed to the resource usage manager's add() calls.
367 return _maxResourcesUsageCollectedSize;
// Stores a new maximum size for collected resource usage.
372 _maxResourcesUsageCollectedSize = maxResourcesUsageCollectedSize;
// Worker queue, held through a raw pointer.
// NOTE(review): allocation and release of this vector are not visible in this excerpt; confirm ownership.
377 std::vector<ClientWorkerItem>* _queue;
// Incremented by the refresh functions; initialised to 0 in the constructor.
379 unsigned long long _refreshNotFoundCounter;
// Purge mode, set from the constructor argument and by a setter.
380 unsigned int _purgeMode;
// Written out first in the statistics dump; its updates are not visible in this excerpt.
381 unsigned long long _purgeCounter;
// Added to s_clock() to compute an item's expiry, and used again in the expiry check.
382 unsigned int _heartbeatTimeout;
// Passed as the size limit to the resource usage manager's add() calls.
385 unsigned int _maxResourcesUsageCollectedSize;