hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DRCEResourceMonitor.cpp
Go to the documentation of this file.
1 #include <utility>
2 #include <Poco/Logger.h>
3 #include <Poco/Timestamp.h>
4 
5 #include "DRCEResultData.hpp"
7 #include "DRCEMessageConst.hpp"
8 #include "Process.hpp"
11 
12 namespace HCE
13 {
14 namespace drce
15 {
16 //-----------------------------------------------------------------------------
18 : Poco::Runnable(), thread(), mutex(), terminated(false), timePeriod(DEFAULT_TIME_PERIOD),
19  asyncTasksQueue(asyncTasksQueue_), message(message_), statVariables(), counterStatVariables(), avgCounts()
20 {
22  thread.start(*this);
23 }
24 //-----------------------------------------------------------------------------
26 {
27  terminate();
28  thread.join();
30 }
31 //-----------------------------------------------------------------------------
33 {
34  Poco::Mutex::ScopedLock lock(mutex);
35  terminated = true;
36 }
37 //-----------------------------------------------------------------------------
39 {
40  Poco::Mutex::ScopedLock lock(mutex);
41  return terminated;
42 }
43 //-----------------------------------------------------------------------------
44 void DRCEResourceMonitor::setTimePeriod(unsigned int timePeriod_)
45 {
46  Poco::Mutex::ScopedLock lock(mutex);
47  timePeriod=timePeriod_;
48 }
49 //-----------------------------------------------------------------------------
51 {
52  Poco::Mutex::ScopedLock lock(mutex);
53  return timePeriod;
54 }
55 //-----------------------------------------------------------------------------
56 void DRCEResourceMonitor::setStatVariables(StatVariables&& rhs)
57 {
58  Poco::Mutex::ScopedLock lock(mutex);
59  statVariables = std::forward<StatVariables>(rhs);
60 }
61 //-----------------------------------------------------------------------------
63 {
64  Poco::Mutex::ScopedLock lock(mutex);
65  return statVariables;
66 }
67 //-----------------------------------------------------------------------------
68 void DRCEResourceMonitor::dump(StatVariables& statVariables, AvgCounts& avgCounts)
69 {
70  Poco::Mutex::ScopedLock lock(mutex);
71  statVariables = this->statVariables;
72  avgCounts = this->avgCounts;
73 }
74 //-----------------------------------------------------------------------------
75 void DRCEResourceMonitor::restore(const StatVariables& statVariables, const AvgCounts& avgCounts)
76 {
77  Poco::Mutex::ScopedLock lock(mutex);
78  this->statVariables = statVariables;
79  this->counterStatVariables = statVariables;
80  this->avgCounts = avgCounts;
81 }
82 //-----------------------------------------------------------------------------
84 {
85  fillFieldsArray(resultData, asyncTasksQueue, message, this);
86 }
87 //-----------------------------------------------------------------------------
88 void DRCEResourceMonitor::fillFieldsArray(DRCEResultData& resultData, DRCEAsyncTasksQueue& asyncTasksQueue_, CustomMessage& message_, DRCEResourceMonitor* pResourceMonitor)
89 {
90  for (size_t i=0;i<resultData.getItemsCount();++i)
91  {
92  DRCEResultDataItem resultDataItem(resultData.getDataItem(i));
93  if (fillFieldsArray(resultDataItem, asyncTasksQueue_, message_, pResourceMonitor))
94  resultData.setDataItem(i, resultDataItem);
95  }
96 }
97 //-----------------------------------------------------------------------------
// Fills `resultDataItem` with resource-usage fields for the async task whose
// request id the item carries.
//
// Returns false (after logging at trace level) when the task is not in
// `asyncTasksQueue_`, when its pid is zero, or when no process with that pid
// exists; returns false (after logging at error level) when a std::exception
// is caught while building the fields. Returns true otherwise.
//
// `pResourceMonitor` may be null; when it is non-null the task's CPU and
// (non-zero) resident memory values are also added to its statistics.
98 bool DRCEResourceMonitor::fillFieldsArray(DRCEResultDataItem& resultDataItem, DRCEAsyncTasksQueue& asyncTasksQueue_, CustomMessage& message_, DRCEResourceMonitor* pResourceMonitor)
99 {
// Guard 1: the task must still be present in the queue.
100  if (!asyncTasksQueue_.isExistAsyncTask(resultDataItem.getRequestId()))
101  {
102  std::stringstream outMsg;
103  outMsg << "DRCEResourceMonitor::fillFieldsArray task [ " << resultDataItem.getRequestId() << " ] already not exist...";
104  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
105  return false;
106  }
107 
108  AsyncTasks asyncTask = asyncTasksQueue_.getAsyncTask(resultDataItem.getRequestId());
109 
// Guard 2: a zero pid means there is no process to inspect.
110  if (!asyncTask.pid)
111  {
112  std::stringstream outMsg;
113  outMsg << "DRCEResourceMonitor::fillFieldsArray task [ " << resultDataItem.getRequestId() << " ] pid is empty ...";
114  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
115  return false;
116  }
117 
// Guard 3: the process must exist. The `asyncTask.pid &&` part is always
// true here because a zero pid has already returned above.
118  if (asyncTask.pid && !HCE::Process::isExistProcess(asyncTask.pid))
119  {
120  std::stringstream outMsg;
121  outMsg << "DRCEResourceMonitor::fillFieldsArray task [ " << resultDataItem.getRequestId() << " ] has pid [ " << asyncTask.pid << " ] but process already not exist ...";
122  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
123  return false;
124  }
125 
// Collect the per-task values (cpu, vram, rram, threads) used below.
126  ProcessInfoExtractor processInfoExtractor;
127  ProcessInfoExtractor::FieldsArray fieldsArray = processInfoExtractor.extract(asyncTask);
128 
129  std::stringstream outMsg;
130  outMsg << "DRCEResourceMonitor::fillFieldsArray task [ " << resultDataItem.getRequestId() << " ] has state: " << static_cast<int>(asyncTask.state);
131  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
132 
133  try
134  {
// Per-task fields.
136  std::map<std::string, std::string> fields;
137  fields.insert(std::map<std::string, std::string>::value_type(drce_const::taskCpu, trim(fieldsArray.cpu)));
138  fields.insert(std::map<std::string, std::string>::value_type(drce_const::taskVram, std::to_string(fieldsArray.vram)));
139  fields.insert(std::map<std::string, std::string>::value_type(drce_const::taskRram, std::to_string(fieldsArray.rram)));
140  fields.insert(std::map<std::string, std::string>::value_type(drce_const::taskThreads, std::to_string(fieldsArray.threads)));
141 
// Host-wide fields. NOTE(review): `resourceUsage` is declared on a line that
// is not visible in this listing (original lines 135/142 are missing).
143  fields.insert(std::map<std::string, std::string>::value_type(drce_const::hostCpu, trim(resourceUsage.cpu)));
144  fields.insert(std::map<std::string, std::string>::value_type(drce_const::hostIowait, trim(resourceUsage.iowait)));
145  fields.insert(std::map<std::string, std::string>::value_type(drce_const::hostVram, trim(resourceUsage.vramUsedPercent)));
146  fields.insert(std::map<std::string, std::string>::value_type(drce_const::hostRram, trim(resourceUsage.rramUsedPercent)));
147  fields.insert(std::map<std::string, std::string>::value_type(drce_const::hostDisk, trim(resourceUsage.diskUsedPercent)));
148  fields.insert(std::map<std::string, std::string>::value_type(drce_const::hostProcesses, trim(resourceUsage.processesPercent)));
149  fields.insert(std::map<std::string, std::string>::value_type(drce_const::hostThreads, trim(resourceUsage.threadsPercent)));
150  fields.insert(std::map<std::string, std::string>::value_type(drce_const::hostLoadAverage, trim(resourceUsage.loadAverage)));
151 
152  resultDataItem.setFields(fields);
153 
154  // save values Cpu and Memory usage
155  if (pResourceMonitor)
156  {
157  pResourceMonitor->addCpuUsageAsyncTasks(fieldsArray.cpu);
// Memory usage is only recorded when the resident size is non-zero.
158  if (fieldsArray.rram)
159  pResourceMonitor->addMemoryUsageAsyncTasks(fieldsArray.rram);
160  }
161  }
162  catch(std::exception& e)
163  {
// Any std::exception raised above is logged and reported as failure.
164  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, message_(message_const::ERROR, e.what()), Poco::Message::Priority::PRIO_ERROR));
165  return false;
166  }
167  return true;
168 }
169 //-----------------------------------------------------------------------------
171 {
172  const unsigned int periodSleep = 1000; // msec
173  Poco::Timestamp tsMinuteCounter;
174  size_t countSyncTasks(0), countAsyncTasks(0);
175 
176  while(!isTerminated())
177  {
178  unsigned int periodMs = getTimePeriod();
179  if (periodMs > periodSleep)
180  {
181  unsigned int repeat = periodMs/periodSleep; // msec to sec
182  for (unsigned int i=0;i<repeat;++i)
183  {
184  Poco::Thread::sleep(periodSleep);
185  if (isTerminated())
186  return;
187  countTasksForMinute(tsMinuteCounter, countSyncTasks, countAsyncTasks);
188  }
189  }
190  else
191  {
192  Poco::Thread::sleep(periodMs);
193  countTasksForMinute(tsMinuteCounter, countSyncTasks, countAsyncTasks);
194  }
195  try
196  {
197  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, "!!! DRCEResourceMonitor::run !!!", Poco::Message::Priority::PRIO_TRACE));
198 
200  DRCEResultData resultData = asyncTasksQueue.getTasksAsResultData();
201  if (resultData.getItemsCount())
202  {
206  fillFieldsArray(resultData);
207  asyncTasksQueue.executeNotification(resultData, DRCETaskRequest::RequestType::rtResourceMonitorNotification);
208  }
210  setStatVariables(std::forward<StatVariables>(recountStatVariables()));
211  }
212  catch(Poco::Exception& e)
213  {
214  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, message(message_const::ERROR, e.displayText()), Poco::Message::Priority::PRIO_ERROR));
215  }
216  catch(std::exception& e)
217  {
218  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, message(message_const::ERROR, e.what()), Poco::Message::Priority::PRIO_ERROR));
219  }
220  }
221 }
222 //-----------------------------------------------------------------------------
224 {
225  addStatValue<double>(static_cast<double>(value), counterStatVariables.timeAsyncTask, avgCounts.timeAsyncTaskCount);
226 }
227 //-----------------------------------------------------------------------------
229 {
230  addStatValue<double>(static_cast<double>(value), counterStatVariables.timeSyncTask, avgCounts.timeSyncTaskCount);
231 }
232 //-----------------------------------------------------------------------------
234 {
235  addStatValue(value, counterStatVariables.sizeInputBufferSyncTasks, avgCounts.sizeInputBufferSyncTasksCount);
236 }
237 //-----------------------------------------------------------------------------
239 {
240  addStatValue(value, counterStatVariables.sizeInputBufferAsyncTasks, avgCounts.sizeInputBufferAsyncTasksCount);
241 }
242 //-----------------------------------------------------------------------------
244 {
245  addStatValue(value, counterStatVariables.sizeOutputBufferSyncTasks, avgCounts.sizeOutputBufferSyncTasksCount);
246 }
247 //-----------------------------------------------------------------------------
249 {
250  addStatValue(value, counterStatVariables.sizeOutputBufferAsyncTasks, avgCounts.sizeOutputBufferAsyncTasksCount);
251 }
252 //-----------------------------------------------------------------------------
254 {
255  addStatValue<double>(value, counterStatVariables.cpuUsageAsyncTasks, avgCounts.cpuUsageAsyncTasksCount);
256 }
257 //-----------------------------------------------------------------------------
259 {
260  addStatValue(value, counterStatVariables.memoryUsageAsyncTasks, avgCounts.memoryUsageAsyncTasksCount);
261 }
262 //-----------------------------------------------------------------------------
264 {
265  addStatValue<double>(value, counterStatVariables.countAsyncTasksForMinute, avgCounts.countAsyncTasksForMinuteCount);
266 }
267 //-----------------------------------------------------------------------------
269 {
270  addStatValue<double>(value, counterStatVariables.countSyncTasksForMinute, avgCounts.countSyncTasksForMinuteCount);
271 }
272 //-----------------------------------------------------------------------------
274 {
275  ++(counterStatVariables.countSyncTasks);
276 }
277 //-----------------------------------------------------------------------------
279 {
280  ++(counterStatVariables.countAsyncTasks);
281 }
282 //-----------------------------------------------------------------------------
284 {
285  ++(counterStatVariables.countSyncTasksFail);
286 }
287 //-----------------------------------------------------------------------------
289 {
290  ++(counterStatVariables.countAsyncTasksFail);
291 }
292 //-----------------------------------------------------------------------------
293 template <class T> void DRCEResourceMonitor::addStatValue(T value, StatVariableData<T>& statVariableData, size_t& count)
294 {
295  if (value)
296  {
297  if (statVariableData.min)
298  {
299  if (value < statVariableData.min)
300  statVariableData.min = value;
301  }
302  else
303  statVariableData.min = value;
304 
305  if (statVariableData.max)
306  {
307  if(value > statVariableData.max)
308  statVariableData.max = value;
309  }
310  else
311  statVariableData.max = value;
312 
313  statVariableData.avg += value;
314  ++count;
315  }
316 }
317 //-----------------------------------------------------------------------------
318 void DRCEResourceMonitor::countTasksForMinute(Poco::Timestamp& ts, size_t& countSyncTasks, size_t& countAsyncTasks)
319 {
320  const size_t oneMinute = 60000000; // in microseconds
321  if (ts.isElapsed(oneMinute) || !countSyncTasks || !countAsyncTasks)
322  {
323  // update count sync and async tasks if one minute was elapsed
324  addCountSyncTasksForMinute(counterStatVariables.countSyncTasks-countSyncTasks);
325  countSyncTasks = counterStatVariables.countSyncTasks;
326  addCountAsyncTasksForMinute(counterStatVariables.countAsyncTasks-countAsyncTasks);
327  countAsyncTasks = counterStatVariables.countAsyncTasks;
328  }
329 }
330 //-----------------------------------------------------------------------------
331 StatVariables DRCEResourceMonitor::recountStatVariables(void)
332 {
333  StatVariables localStatVariables(counterStatVariables);
334 /*
335  std::cout << "avgCounts.timeAsyncTaskCount = " << localStatVariables.timeAsyncTask.avg << "/" << avgCounts.timeAsyncTaskCount << std::endl;
336  std::cout << "avgCounts.timeSyncTaskCount = " << localStatVariables.timeSyncTask.avg << "/" << avgCounts.timeSyncTaskCount << std::endl;
337  std::cout << "avgCounts.sizeInputBufferSyncTasksCount = " << localStatVariables.sizeInputBufferSyncTasks.avg << "/" << avgCounts.sizeInputBufferSyncTasksCount << std::endl;
338  std::cout << "avgCounts.sizeOutputBufferSyncTasksCount = " << localStatVariables.sizeOutputBufferSyncTasks.avg << "/" << avgCounts.sizeOutputBufferSyncTasksCount << std::endl;
339  std::cout << "avgCounts.sizeInputBufferAsyncTasksCount = " << localStatVariables.sizeInputBufferAsyncTasks.avg << "/" << avgCounts.sizeInputBufferAsyncTasksCount << std::endl;
340  std::cout << "avgCounts.sizeOutputBufferAsyncTasksCount = " << localStatVariables.sizeOutputBufferAsyncTasks.avg << "/" << avgCounts.sizeOutputBufferAsyncTasksCount << std::endl;
341  std::cout << "avgCounts.cpuUsageAsyncTasksCount = " << localStatVariables.cpuUsageAsyncTasks.avg << "/" << avgCounts.cpuUsageAsyncTasksCount << std::endl;
342  std::cout << "avgCounts.memoryUsageAsyncTasksCount = " << localStatVariables.memoryUsageAsyncTasks.avg << "/" << avgCounts.memoryUsageAsyncTasksCount << std::endl;
343  std::cout << "avgCounts.countSyncTasksForMinuteCount = " << localStatVariables.countSyncTasksForMinute.avg << "/" << avgCounts.countSyncTasksForMinuteCount << std::endl;
344  std::cout << "avgCounts.countAsyncTasksForMinuteCount = " << localStatVariables.countAsyncTasksForMinute.avg << "/" << avgCounts.countAsyncTasksForMinuteCount << std::endl;
345 */
346  localStatVariables.timeAsyncTask.avg /= (avgCounts.timeAsyncTaskCount)?avgCounts.timeAsyncTaskCount:1;
347  localStatVariables.timeSyncTask.avg /= (avgCounts.timeSyncTaskCount)?avgCounts.timeSyncTaskCount:1;
348  localStatVariables.sizeInputBufferSyncTasks.avg /= (avgCounts.sizeInputBufferSyncTasksCount)?avgCounts.sizeInputBufferSyncTasksCount:1;
349  localStatVariables.sizeOutputBufferSyncTasks.avg /= (avgCounts.sizeOutputBufferSyncTasksCount)?avgCounts.sizeOutputBufferSyncTasksCount:1;
350  localStatVariables.sizeInputBufferAsyncTasks.avg /= (avgCounts.sizeInputBufferAsyncTasksCount)?avgCounts.sizeInputBufferAsyncTasksCount:1;
351  localStatVariables.sizeOutputBufferAsyncTasks.avg /= (avgCounts.sizeOutputBufferAsyncTasksCount)?avgCounts.sizeOutputBufferAsyncTasksCount:1;
352  localStatVariables.cpuUsageAsyncTasks.avg /= (avgCounts.cpuUsageAsyncTasksCount)?avgCounts.cpuUsageAsyncTasksCount:1;
353  localStatVariables.memoryUsageAsyncTasks.avg /= (avgCounts.memoryUsageAsyncTasksCount)?avgCounts.memoryUsageAsyncTasksCount:1;
354  localStatVariables.countSyncTasksForMinute.avg /= (avgCounts.countSyncTasksForMinuteCount)?avgCounts.countSyncTasksForMinuteCount:1;
355  localStatVariables.countAsyncTasksForMinute.avg /= (avgCounts.countAsyncTasksForMinuteCount)?avgCounts.countAsyncTasksForMinuteCount:1;
356 
357  return localStatVariables;
358 }
359 //-----------------------------------------------------------------------------
360 //-----------------------------------------------------------------------------
361 } // end namespace drce
362 } // end namespace HCE