hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DRCEAsyncTasksQueue.cpp
Go to the documentation of this file.
1 #include <iostream>
2 #include <fstream>
3 #include <sstream>
4 #include <utility>
5 #include <iterator>
6 #include <signal.h>
7 #include <dirent.h>
8 #include <sys/types.h>
9 #include <Poco/TaskManager.h>
10 #include <Poco/Timestamp.h>
11 #include <Poco/File.h>
12 #include <Poco/Logger.h>
13 
14 #include "Process.hpp"
15 #include "DRCEAsyncTasksQueue.hpp"
16 #include "DRCEFileStream.hpp"
18 #include "DRCEMessageConst.hpp"
19 #include "DRCEError.hpp"
20 #include "DRCEResultData.hpp"
21 #include "DRCEFileBuilder.hpp"
22 #include "DRCECommonTask.hpp"
23 #include "DRCEFileExtractor.hpp"
24 #include "DRCECleanupTask.hpp"
25 #include "DRCETasksQueue.hpp"
26 #include "DRCEReadProcessData.hpp"
29 
30 namespace HCE
31 {
32 namespace drce
33 {
34 //-----------------------------------------------------------------------------
35 void ProgressHandler::onStarted(Poco::TaskStartedNotification* pNf)
36 {
37  if (pNf->task())
38  this->log("Started task: "+pNf->task()->name());
39  pNf->release();
40 }
41 
42 //-----------------------------------------------------------------------------
43 void ProgressHandler::onCancelled(Poco::TaskCancelledNotification* pNf)
44 {
45  if (pNf->task())
46  this->log("Cancelled task: "+pNf->task()->name());
47  pNf->release();
48 }
49 //-----------------------------------------------------------------------------
50 void ProgressHandler::onFinished(Poco::TaskFinishedNotification* pNf)
51 {
52  if (pNf->task())
53  this->log("Stopped task: "+pNf->task()->name());
54  pNf->release();
55 }
56 //-----------------------------------------------------------------------------
57 void ProgressHandler::log(const std::string& msg)
58 {
59  try
60  {
61  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, msg, Poco::Message::Priority::PRIO_INFORMATION));
62  }
63  catch(Poco::Exception& e)
64  {
65  std::cerr << "ProgressHandler: " << e.displayText() << std::endl;
66  }
67 }
68 //-----------------------------------------------------------------------------
69 //-----------------------------------------------------------------------------
71 : taskId(0), parentTaskId(0), pid(0), timeMax(0), timeStart(0), timeElapsed(0),
72  state(DRCETaskRequest::TaskState::FINISHED), locked(false), usageLimits()
73 {
74 }
75 //-----------------------------------------------------------------------------
76 AsyncTasks::AsyncTasks(unsigned int taskId_, unsigned int parentTaskId_, pid_t pid_, unsigned int timeMax_, size_t timeStart_,
77  size_t timeElapsed_, DRCETaskRequest::TaskState state_)
78 : taskId(taskId_), parentTaskId(parentTaskId_), pid(pid_), timeMax(timeMax_), timeStart(timeStart_), timeElapsed(timeElapsed_),
79  state(state_), locked(false), usageLimits()
80 {
81 }
82 //-----------------------------------------------------------------------------
84 : taskId(0), parentTaskId(0), pid(0), timeMax(0), timeStart(0), timeElapsed(0),
85  state(DRCETaskRequest::TaskState::FINISHED), locked(false), usageLimits()
86 {
87  (*this) = rhs;
88 }
89 //-----------------------------------------------------------------------------
91 : taskId(0), parentTaskId(0), pid(0), timeMax(0), timeStart(0), timeElapsed(0),
92  state(DRCETaskRequest::TaskState::FINISHED), locked(false), usageLimits()
93 {
94  (*this) = std::forward<AsyncTasks>(rhs);
95 }
96 //-----------------------------------------------------------------------------
98 {
99  if (this!=&rhs)
100  {
101  taskId = rhs.taskId;
103  pid = rhs.pid;
104  timeMax = rhs.timeMax;
105  timeStart = rhs.timeStart;
106  timeElapsed = rhs.timeElapsed;
107  state = rhs.state;
108  locked = rhs.locked;
109  usageLimits = rhs.usageLimits;
110  }
111  return *this;
112 }
113 //-----------------------------------------------------------------------------
115 {
116  if (this!=&rhs)
117  {
118  taskId = std::move(rhs.taskId);
119  parentTaskId = std::move(rhs.parentTaskId);
120  pid = std::move(rhs.pid);
121  timeMax = std::move(rhs.timeMax);
122  timeStart = std::move(rhs.timeStart);
123  timeElapsed = std::move(rhs.timeElapsed);
124  state = std::move(rhs.state);
125  locked = std::move(rhs.locked);
126  usageLimits = std::move(rhs.usageLimits);
127  }
128  return *this;
129 }
130 //-----------------------------------------------------------------------------
132 {
133  taskId = 0;
134  parentTaskId = 0;
135  pid = 0;
136  timeMax = 0;
137  timeStart = 0;
138  timeElapsed = 0;
139  state = DRCETaskRequest::TaskState::UNDEFINED;
140  locked = false;
141  usageLimits.clear();
142 }
143 //-----------------------------------------------------------------------------
144 std::ostream& operator<<(std::ostream& os, const AsyncTasks& rhs)
145 {
146  os << rhs.taskId << " " << rhs.parentTaskId << " " << rhs.pid << " " << rhs.timeMax << " " << rhs.timeStart << " " << rhs.timeElapsed << " "
147  << static_cast<unsigned int>(rhs.state);
148  return os;
149 }
150 //-----------------------------------------------------------------------------
151 std::istream& operator>>(std::istream& is, AsyncTasks& rhs)
152 {
153  is >> rhs.taskId;
154  is >> rhs.parentTaskId;
155  is >> rhs.pid;
156  is >> rhs.timeMax;
157  is >> rhs.timeStart;
158  is >> rhs.timeElapsed;
159  unsigned int state = 0;
160  is >> state;
161  rhs.state = static_cast<DRCETaskRequest::TaskState>(state);
162 
163  return is;
164 }
165 //-----------------------------------------------------------------------------
166 //-----------------------------------------------------------------------------
168  DRCENotificationExecutor& notificationExecutor_,
169  CustomMessage& message_)
170 : maxThreadCount(DEFAULT_MAX_CAPACITY), threadPool(), taskManager(threadPool), progressHandler(*this), thread(),
171  nodeOptions(nodeOptions_), notificationExecutor(notificationExecutor_), message(message_),
172  tasks(), subtasks(), mutex(), terminated(false), tasksQueueDumpPeriod(DEFAULT_QUEUE_DUMP_PERIOD), cleanupTaskManager(), syncTask(), resetErrorCodeStateNotification(false)
173 {
174  taskManager.addObserver(Poco::Observer<ProgressHandler, Poco::TaskStartedNotification>(progressHandler, &ProgressHandler::onStarted));
175  taskManager.addObserver(Poco::Observer<ProgressHandler, Poco::TaskFinishedNotification>(progressHandler, &ProgressHandler::onFinished));
176  taskManager.addObserver(Poco::Observer<ProgressHandler, Poco::TaskCancelledNotification>(progressHandler, &ProgressHandler::onCancelled));
177  loadTasksQueue();
178  thread.start(*this);
179 }
180 //-----------------------------------------------------------------------------
182 {
183  cancelAll();
184  joinAll();
185 
186  terminate(); // terminate 'watch dogs' thread
187  thread.join();
188 }
189 //-----------------------------------------------------------------------------
191 {
192  taskManager.cancelAll();
194  cleanupTaskManager.cancelAll();
195 }
196 //-----------------------------------------------------------------------------
198 {
199  taskManager.joinAll();
200  cleanupTaskManager.joinAll();
201 }
202 //-----------------------------------------------------------------------------
203 void DRCEAsyncTasksQueue::startTask(Poco::Task* pTask) throw (Poco::Exception)
204 {
205  if (pTask)
206  {
207  unsigned int taskId = 0;
208  try
209  {
210  taskId = std::stoul(pTask->name());
211  }
212  catch(std::exception& e)
213  {
214  throw Poco::Exception(message(message_const::BAD_TASK_ID, e.what()), ERROR_CREATE_NEW_TASK);
215  }
216 
217  if (isExistAsyncTask(taskId))
218  throw Poco::Exception(message(message_const::TASK_CONTINUES_TO_RUN), ERROR_CREATE_NEW_TASK);
219 
220  taskManager.start(pTask);
221  }
222 }
223 //-----------------------------------------------------------------------------
224 // cppcheck-suppress unusedFunction
226 {
227  bool result = false;
228  try
229  {
230  const std::string taskName = std::to_string(taskId);
231 
232  Poco::TaskManager::TaskList taskList = taskManager.taskList();
233  for (Poco::TaskManager::TaskList::iterator iter=taskList.begin();iter!=taskList.end();++iter)
234  {
235  if ((*iter)->name() == taskName)
236  {
237  result = true;
238  break;
239  }
240  }
241  }
242  catch(std::exception& e)
243  {
244  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, message(message_const::SEARCH_TASK, e.what()), Poco::Message::Priority::PRIO_ERROR));
245  }
246  return result;
247 }
248 //-----------------------------------------------------------------------------
250 {
251  try
252  {
253  const std::string taskName = std::to_string(taskId);
254  Poco::TaskManager::TaskList taskList = taskManager.taskList();
255  for (Poco::TaskManager::TaskList::iterator iter=taskList.begin();iter!=taskList.end();++iter)
256  {
257  if ((*iter)->name() == taskName)
258  {
259  (*iter)->cancel();
260  break;
261  }
262  }
263  }
264  catch(std::exception& e)
265  {
266  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, message(message_const::CANCEL_TASK, e.what()), Poco::Message::Priority::PRIO_ERROR));
267  }
268 }
269 //-----------------------------------------------------------------------------
270 void DRCEAsyncTasksQueue::setMaxThreadCount(unsigned int threadCount)
271 {
272  if (threadCount < maxThreadCount)
273  threadPool.addCapacity(maxThreadCount - threadPool.capacity());
274  else
275  threadPool.addCapacity(static_cast<int>(threadCount) - threadPool.capacity());
276 }
277 //-----------------------------------------------------------------------------
279 {
280  return static_cast<unsigned int>(threadPool.capacity());
281 }
282 //-----------------------------------------------------------------------------
284  const std::string& errorMessage, unsigned int errorCode) throw (Poco::Exception)
285 {
286  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, "DRCEAsyncTasksQueue::changeState enter", Poco::Message::Priority::PRIO_TRACE));
287 
288  if (!asyncTask.timeStart)
289  {
290  Poco::Logger::root().log(Poco::Message(drce_const::moduleName,
291  "!!! DRCEAsyncTasksQueue::changeState recieved empty 'asyncTask.timeStart' !!!",
292  Poco::Message::Priority::PRIO_TRACE));
293  return;
294  }
295 
296  AsyncTaskLocker(*this, asyncTask);
297 
298  DRCEResultData resultData;
299  DRCEResultDataItem resultDataItem;
300  std::stringstream outMsg;
301 
302  Poco::File dataFile(FileStream::getDataFileName(nodeOptions.getTasksDataDir(), asyncTask.taskId));
303  if (dataFile.exists())
304  {
305  try
306  {
307  DataFileExtractor dataFileExtractor(message);
308  resultDataItem = dataFileExtractor.extract(dataFile.path());
309  }
310  catch(Poco::Exception& e)
311  {
312  outMsg.str("");
313  outMsg << "!!!!! DataFileExtractor::extract(" << dataFile.path() << "): " << e.displayText();
314  outMsg << " SET STATE TO IN_PROGRESS";
315  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_DEBUG));
316 
317  resultDataItem.setState(static_cast<unsigned int>(DRCETaskRequest::TaskState::IN_PROGRESS));
318  }
319  }
320 
321  if (pResultDataItem)
322  {
323  resultDataItem = *pResultDataItem;
324  }
325 
326  outMsg.str("");
327  outMsg << "DRCEAsyncTasksQueue::changeState taskId: " << asyncTask.taskId
328  << " asyncTasks.state: " << (int)asyncTask.state
329  << " resultDataItem.state: " << resultDataItem.getState()
330  << " asyncTask.parentTaskId: " << asyncTask.parentTaskId;
331  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
332 
333  resultDataItem.setNodeName(nodeOptions.getNodeName());
334  resultDataItem.setNodeHost(nodeOptions.getNodeHost());
335  resultDataItem.setNodePort(nodeOptions.getNodePort());
336 
337  resultDataItem.setRequestId(asyncTask.taskId);
338  resultDataItem.setPid(asyncTask.pid);
339  resultDataItem.setState(static_cast<unsigned int>(asyncTask.state));
340  resultDataItem.setTime(asyncTask.timeElapsed);
341 
342  if (!errorMessage.empty())
343  resultDataItem.setErrorMessage(errorMessage);
344  if (errorCode!=NO_ERROR)
345  resultDataItem.setErrorCode(errorCode);
346 
347  resultData.addDataItem(std::forward<DRCEResultDataItem>(resultDataItem));
348 
349  // execute notification only for `main` task and only if tasks has been changed state
350  if (!asyncTask.parentTaskId)
351  {
352  AsyncTasks prevStateAsyncTask = getAsyncTask(asyncTask.taskId);
353  if (prevStateAsyncTask.state != asyncTask.state || prevStateAsyncTask.pid != asyncTask.pid)
354  {
355  // change information about state and other property async task
356  setAsyncTask(asyncTask);
357  // execute notification use new state
358  executeNotification(resultData, DRCETaskRequest::RequestType::rtTaskStateNotification);
359  }
360  }
361 
362  if ((asyncTask.state == DRCETaskRequest::TaskState::FINISHED) ||
363  (asyncTask.state == DRCETaskRequest::TaskState::TERMINATED) ||
364  (asyncTask.state == DRCETaskRequest::TaskState::CRASHED) ||
365  (asyncTask.state == DRCETaskRequest::TaskState::DELETED) ||
366  (asyncTask.state == DRCETaskRequest::TaskState::UNDEFINED) ||
367  (asyncTask.state == DRCETaskRequest::TaskState::TERMINATED_BY_TTL))
368  {
369  bool ret = removeAsyncTask(asyncTask.taskId);
370  outMsg.str("");
371  outMsg << "TASK (ID: " << asyncTask.taskId << ", PID: " << asyncTask.pid << ", TIME: " << asyncTask.timeElapsed
372  << ", STATE: " << static_cast<unsigned int>(asyncTask.state) << ") WAS " << ((ret)?"SUCCESS":"FAILED") << " ERASED FROM QUEUE !!!";
373  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_DEBUG));
374 
375 
376  // write to tasks log information about finished state of task
378  }
379 
380  if (asyncTask.state != DRCETaskRequest::TaskState::DELETED)
381  {
382  // rewrite content of `data` and `status` files
383  DataFileBuilder dataFileBuilder(message, nodeOptions.getTasksDataDir());
384  dataFileBuilder.build(resultData);
385 
386  StatusFileBuilder statusFileBuilder(message, nodeOptions.getTasksStatusDir());
387  statusFileBuilder.build(resultData);
388  }
389 }
390 //-----------------------------------------------------------------------------
392  const std::string& errorMessage, unsigned int errorCode)
393 {
394  try
395  {
396  changeState(std::forward<AsyncTasks>(asyncTask), pResultDataItem, errorMessage, errorCode);
397  }
398  catch(Poco::Exception& e)
399  {
400  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, message(message_const::CHANGE_STATE, e.displayText()), Poco::Message::Priority::PRIO_ERROR));
401  }
402  catch(std::exception& e)
403  {
404  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, message(message_const::CHANGE_STATE, e.what()), Poco::Message::Priority::PRIO_ERROR));
405  }
406 }
407 //-----------------------------------------------------------------------------
408 //-----------------------------------------------------------------------------
410 {
411  Poco::Mutex::ScopedLock lock(mutex);
412  AsyncTasks asyncTask;
413  Tasks::iterator iter = tasks.find(taskId);
414  if (iter != tasks.end())
415  {
416  asyncTask = (*iter).second;
417  }
418  return asyncTask;
419 }
420 //-----------------------------------------------------------------------------
422 {
423  Poco::Mutex::ScopedLock lock(mutex);
424  tasks[asyncTask.taskId] = asyncTask;
425 }
426 //-----------------------------------------------------------------------------
428 {
429  Poco::Mutex::ScopedLock lock(mutex);
430  tasks[asyncTask.taskId] = std::forward<AsyncTasks>(asyncTask);
431 }
432 //-----------------------------------------------------------------------------
434 {
435  Poco::Mutex::ScopedLock lock(mutex);
436  return tasks.size();
437 }
438 //-----------------------------------------------------------------------------
440 {
441  bool result = false;
442  Poco::Mutex::ScopedLock lock(mutex);
443  Tasks::iterator iter = tasks.find(taskId);
444  if (iter != tasks.end())
445  {
446  tasks.erase(iter);
447  result = true;
448  }
449  return result;
450 }
451 //-----------------------------------------------------------------------------
453 {
454  Poco::Mutex::ScopedLock lock(mutex);
455  return (tasks.find(taskId)!=tasks.end());
456 }
457 //-----------------------------------------------------------------------------
459 {
460  Poco::Mutex::ScopedLock lock(mutex);
461  Tasks::iterator iter = tasks.find(taskId);
462  if (iter != tasks.end())
463  {
464  (*iter).second.locked = true;
465  }
466 }
467 //-----------------------------------------------------------------------------
469 {
470  lockAsyncTask(asyncTask.taskId);
471 }
472 //-----------------------------------------------------------------------------
474 {
475  Poco::Mutex::ScopedLock lock(mutex);
476  Tasks::iterator iter = tasks.find(taskId);
477  if (iter != tasks.end())
478  {
479  (*iter).second.locked = false;
480  }
481 }
482 //-----------------------------------------------------------------------------
484 {
485  unlockAsyncTask(asyncTask.taskId);
486 }
487 //-----------------------------------------------------------------------------
489 {
490  bool result = false;
491  Poco::Mutex::ScopedLock lock(mutex);
492  Tasks::iterator iter = tasks.find(taskId);
493  if (iter != tasks.end())
494  {
495  result = (*iter).second.locked;
496  }
497  return result;
498 }
499 //-----------------------------------------------------------------------------
501 {
502  Poco::Mutex::ScopedLock lock(mutex);
503  terminated = true;
504 }
505 //-----------------------------------------------------------------------------
507 {
508  Poco::Mutex::ScopedLock lock(mutex);
509  return terminated;
510 }
511 //-----------------------------------------------------------------------------
512 void DRCEAsyncTasksQueue::setTasksQueueDumpPeriod(unsigned int tasksQueueDumpPeriod_)
513 {
514  Poco::Mutex::ScopedLock lock(mutex);
515  tasksQueueDumpPeriod = tasksQueueDumpPeriod_;
516 }
517 //-----------------------------------------------------------------------------
519 {
520  Poco::Mutex::ScopedLock lock(mutex);
521  return tasksQueueDumpPeriod;
522 }
523 //-----------------------------------------------------------------------------
525 {
526  Poco::Mutex::ScopedLock lock(mutex);
527 
528  const std::string dumpFileName = FileStream::getDumpFileName(nodeOptions.getTasksStatusDir(), nodeOptions.getNodeName());
529  FileStream fileStream(dumpFileName, FileStream::read);
530  if (fileStream.isOpen())
531  {
532  std::string fileContent;
533  fileStream >> fileContent;
534  std::istringstream istr(fileContent);
535  std::string line;
536  while(std::getline(istr, line))
537  {
538  AsyncTasks asyncTask;
539  std::stringstream str(line);
540  str >> asyncTask;
541 
542  std::stringstream outMsg;
543  outMsg << ">>> loadTasksQueue\ttaskId = " << asyncTask.taskId << " parentTaskId = " << asyncTask.parentTaskId << " pid = " << asyncTask.pid
544  << " timeMax = " << asyncTask.timeMax << " timeStart = " << asyncTask.timeStart << " state = " << (unsigned int)(asyncTask.state);
545  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_DEBUG));
546 
547  unsigned int taskId = asyncTask.taskId;
548  if (HCE::Process::isExistProcess(asyncTask.pid))
549  {
550  tasks[taskId] = std::forward<AsyncTasks>(asyncTask); // add task to queue
551  }
552  else
553  {
554  asyncTask.state = DRCETaskRequest::TaskState::UNDEFINED;
555  tasks.insert(Tasks::value_type(asyncTask.taskId, asyncTask));
556  }
557  }
558  fileStream.close();
559  try
560  {
561  Poco::File dumpFile(dumpFileName);
562  if (dumpFile.exists())
563  dumpFile.remove();
564  }
565  catch(Poco::Exception& e)
566  {
567  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, dumpFileName+", "+message(message_const::ERROR, e.displayText()), Poco::Message::Priority::PRIO_ERROR));
568  }
569  }
570 }
571 //-----------------------------------------------------------------------------
573 {
574  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, "DRCEAsyncTasksQueue::saveTasksQueue enter", Poco::Message::Priority::PRIO_TRACE));
575 
576  Poco::Mutex::ScopedLock lock(mutex);
577 
578  FileStream fileStream(FileStream::getDumpFileName(nodeOptions.getTasksStatusDir(), nodeOptions.getNodeName()), FileStream::write);
579 
580  for (Tasks::iterator iter=tasks.begin();iter!=tasks.end();++iter)
581  {
582  std::stringstream ostr;
583  ostr << ((*iter).second) << std::endl;
584  fileStream << ostr.str();
585  }
586 }
587 //-----------------------------------------------------------------------------
588 void DRCEAsyncTasksQueue::executeNotification(const std::string& json)
589 {
590  Poco::Mutex::ScopedLock lock(mutex);
591  notificationExecutor.execute(json);
592 }
593 //-----------------------------------------------------------------------------
595 {
596  try
597  {
598  DRCEResultData localResultData(resultData);
599  const size_t itemCount = resultData.getItemsCount();
600 
601  if (resetErrorCodeStateNotification)
602  {
603  for (size_t i=0;i<itemCount;++i)
604  {
605  DRCEResultDataItem resultDataItem = localResultData.getDataItem(i);
606  if (resultDataItem.getErrorCode() != ERROR_TERMINATE_EXPIRED_TASK)
607  {
608  resultDataItem.setErrorCode(NO_ERROR);
609  resultDataItem.setErrorMessage("");
610  localResultData.setDataItem(i, std::forward<DRCEResultDataItem>(resultDataItem));
611  }
612  }
613  }
614  for (size_t i=0;i<itemCount;++i)
615  {
616  DRCEResultDataItem resultDataItem = localResultData.getDataItem(i);
617  resultDataItem.setRequestType(requestType);
618  localResultData.setDataItem(i, std::forward<DRCEResultDataItem>(resultDataItem));
619  }
620 
621  if (itemCount)
622  {
623  if (requestType == DRCETaskRequest::RequestType::rtTaskStateNotification)
625  else
627  }
628  }
629  catch(Poco::Exception& e)
630  {
631  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, message(message_const::ERROR_EXECUTE_NOTIFICATION, e.message()), Poco::Message::Priority::PRIO_ERROR));
632  }
633  catch(std::exception& e)
634  {
635  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, message(message_const::ERROR_EXECUTE_NOTIFICATION, e.what()), Poco::Message::Priority::PRIO_ERROR));
636  }
637 }
638 //-----------------------------------------------------------------------------
640 {
641  Poco::Mutex::ScopedLock lock(mutex);
642  DRCEResultData resultData;
643 
644  for (Tasks::iterator iter=tasks.begin();iter!=tasks.end();++iter)
645  {
647  DataFileExtractor dataFileExtractor(message);
648  Poco::File statusFile(FileStream::getStatusFileName(nodeOptions.getTasksStatusDir(), (*iter).second.taskId));
649  if (statusFile.exists())
650  {
651  try
652  {
653  DRCEResultDataItem resultDataItem = dataFileExtractor.extract(statusFile.path());
654  AsyncTasks asyncTask = getAsyncTask((*iter).second.taskId);
655  asyncTask.state = static_cast<HCE::drce::DRCETaskRequest::TaskState>(resultDataItem.getState());
656  asyncTask.timeElapsed = DRCECommonTask::getElapsedTimeMsec(Poco::Timestamp::fromEpochTime(asyncTask.timeStart));
657  setAsyncTask(std::forward<AsyncTasks>(asyncTask));
658 
659  if (asyncTask.state!=HCE::drce::DRCETaskRequest::TaskState::FINISHED)
660  resultDataItem.setTime(asyncTask.timeElapsed);
661 
662  resultData.addDataItem(std::forward<DRCEResultDataItem>(resultDataItem));
663  }
664  catch(Poco::Exception& e)
665  {
666  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.message(), Poco::Message::Priority::PRIO_DEBUG));
667  }
668  }
669  }
670  return resultData;
671 }
672 //-----------------------------------------------------------------------------
674 {
675  try
676  {
677  cleanupTaskManager.start(new DRCECleanupTask(taskId, nodeOptions, message, *this));
678  }
679  catch(Poco::NoThreadAvailableException& e)
680  {
681  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, message(message_const::START_CLEANUP_TASK_ERROR, e.displayText()), Poco::Message::Priority::PRIO_ERROR));
682  }
683  catch(Poco::Exception& e)
684  {
685  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, message(message_const::START_CLEANUP_TASK_ERROR, e.displayText()), Poco::Message::Priority::PRIO_ERROR));
686  }
687 }
688 //-----------------------------------------------------------------------------
690 {
691  DRCETasksQueue taskQueue;
692  Poco::Mutex::ScopedLock lock(mutex);
693  for (Tasks::iterator iter=tasks.begin();iter!=tasks.end();++iter)
694  {
695  taskQueue.addItem(std::forward<TaskStatusData>(TaskStatusData((*iter).second.taskId,
696  (*iter).second.timeStart,
697  (*iter).second.state,
698  SessionOptions::ThreadMode::tmAsync)));
699  }
700  return taskQueue;
701 }
702 //-----------------------------------------------------------------------------
704 {
705  std::string resultJson;
706  DRCETasksQueue taskQueue = std::forward<DRCETasksQueue>(getCurrentTasksQueue());
707  if (!taskQueue.serialize(resultJson))
708  {
709  Poco::Logger::root().log(Poco::Message(drce_const::moduleName,
711  Poco::Message::Priority::PRIO_ERROR));
712  }
713  return resultJson;
714 }
715 //-----------------------------------------------------------------------------
716 void DRCEAsyncTasksQueue::writeToTasksLog(const std::string& delimiter, const std::string& taskStatus, const AsyncTasks& asyncTask)
717 {
718  std::stringstream outMsg;
719  outMsg << taskStatus << delimiter
720  << asyncTask.taskId << delimiter
721  << asyncTask.timeStart << delimiter
722  << static_cast<unsigned int>(asyncTask.state) << delimiter
723  << static_cast<unsigned int>(SessionOptions::ThreadMode::tmAsync);
724  Poco::Logger::get(drce_const::DRCE_TASKS_LOGGER).log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_INFORMATION));
725 }
726 //-----------------------------------------------------------------------------
727 void DRCEAsyncTasksQueue::setSyncTaskAsTerminated(const std::string& errorMessage, unsigned int errorCode)
728 {
729  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, "DRCEAsyncTasksQueue::setSyncTaskAsTerminated enter", Poco::Message::Priority::PRIO_TRACE));
730 
731  if (!syncTask.timeStart)
732  {
733  Poco::Logger::root().log(Poco::Message(drce_const::moduleName,
734  "!!! DRCEAsyncTasksQueue::setSyncTaskAsTerminated recieved empty 'syncTask.timeStart' !!!",
735  Poco::Message::Priority::PRIO_TRACE));
736  return;
737  }
738 
739  syncTask.state = DRCETaskRequest::TaskState::TERMINATED_BY_TTL;
740 
741  DRCEResultData resultData;
742  DRCEResultDataItem resultDataItem;
743 
744  Poco::File dataFile(FileStream::getDataFileName(nodeOptions.getTasksDataDir(), syncTask.taskId));
745  if (dataFile.exists())
746  {
747  try
748  {
749  DataFileExtractor dataFileExtractor(message);
750  resultDataItem = dataFileExtractor.extract(dataFile.path());
751  }
752  catch(Poco::Exception& e)
753  {
754  std::stringstream outMsg;
755  outMsg << "!!!!!DataFileExtractor::extract(" << dataFile.path() << "): " << e.displayText();
756  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
757  }
758 
759  std::stringstream outMsg;
760  outMsg << ">>>>> DataFileExtractor::extract Size of " << syncTask.taskId << ".data: " << dataFile.getSize();
761  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
762  }
763 
764  resultDataItem.setNodeName(nodeOptions.getNodeName());
765  resultDataItem.setNodeHost(nodeOptions.getNodeHost());
766  resultDataItem.setNodePort(nodeOptions.getNodePort());
767 
768  resultDataItem.setRequestId(syncTask.taskId);
769  resultDataItem.setPid(syncTask.pid);
770  resultDataItem.setState(static_cast<unsigned int>(DRCETaskRequest::TaskState::TERMINATED_BY_TTL));
771  resultDataItem.setTime(Poco::Timestamp::fromEpochTime(syncTask.timeStart).elapsed()/1000);
772  resultDataItem.setErrorMessage(errorMessage);
773  resultDataItem.setErrorCode(errorCode);
774 
775  resultData.addDataItem(std::forward<DRCEResultDataItem>(resultDataItem));
776 
777  try
778  {
779  // rewrite content of `data` and `status` files
780  DataFileBuilder dataFileBuilder(message, nodeOptions.getTasksDataDir());
781  dataFileBuilder.build(resultData);
782 
783  StatusFileBuilder statusFileBuilder(message, nodeOptions.getTasksStatusDir());
784  statusFileBuilder.build(resultData);
785  }
786  catch(Poco::Exception& e)
787  {
788  std::stringstream outMsg;
789  outMsg << "!!!!!FileBuilder::build: " << e.displayText();
790  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_ERROR));
791  }
792  catch(std::exception& e)
793  {
794  std::stringstream outMsg;
795  outMsg << "!!!!!FileBuilder::build: " << e.what();
796  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_ERROR));
797  }
798 }
799 //-----------------------------------------------------------------------------
801 {
802  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, "DRCEAsyncTasksQueue::checkExpiredTime enter", Poco::Message::Priority::PRIO_TRACE));
803 
804  Poco::Mutex::ScopedLock lock(mutex);
805 
806  for (Tasks::iterator iter=tasks.begin();iter!=tasks.end();)
807  {
808  std::stringstream outMsg;
809  if (!(*iter).second.timeStart || !(*iter).second.timeMax || !(*iter).second.taskId)
810  {
811  outMsg << "taskId: " << (*iter).second.taskId
812  << " pid: " << (*iter).second.pid
813  << " timeMax: " << (*iter).second.timeMax
814  << " timeStart: " << (*iter).second.timeStart
815  << " state: " << (*iter).second.state << " WAS FORCE ERASE";
816  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_DEBUG));
817  iter = tasks.erase(iter);
818  }
819  else
820  ++iter;
821  }
822 
823  Tasks taskList = tasks;
824  for (Tasks::iterator iter=taskList.begin();iter!=taskList.end();++iter)
825  {
826  std::stringstream outMsg;
827  outMsg << "taskId: " << (*iter).second.taskId
828  << " pid: " << (*iter).second.pid
829  << " timeMax: " << (*iter).second.timeMax
830  << " timeStart: " << (*iter).second.timeStart
831  << " state: " << (*iter).second.state;
832 
833  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
834 
835  Poco::Timestamp::TimeDiff diff = Poco::Timestamp::fromEpochTime((*iter).second.timeStart).elapsed();
836  (*iter).second.timeElapsed = static_cast<size_t>(diff/1000);
837 
838  outMsg.str("");
839  outMsg << "diff = " << (diff/1000);
840  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
841 
842  if ((*iter).second.timeMax &&
843  ((*iter).second.timeMax < (diff/1000)))
844  {
845  AsyncTasks asyncTask((*iter).second);
847  DRCETaskRequestTerminateExecutor::terminateExpiredTask((*iter).second, *this, nodeOptions, message);
848 
849  std::stringstream outMsg;
850  outMsg << "TASK (ID: " << asyncTask.taskId << ", PID: " << asyncTask.pid << ", TIME: " << asyncTask.timeElapsed << ") WAS TERMINATED AS EXPIRED !!!";
851  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_INFORMATION));
852 
853  outMsg.str("");
854  outMsg << "DRCEAsyncTasksQueue::checkExpiredTime getSubtaskCleanup(" << asyncTask.taskId << ") = " << std::boolalpha << getSubtaskCleanup(asyncTask.taskId);
855  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
856  }
857  }
858 
859  if (syncTask.timeStart && syncTask.taskId)
860  {
861  std::stringstream outMsg;
862  Poco::Timestamp::TimeDiff diff = Poco::Timestamp::fromEpochTime(syncTask.timeStart).elapsed();
863 
864  outMsg << "taskId: " << syncTask.taskId
865  << " pid: " << syncTask.pid
866  << " timeMax: " << syncTask.timeMax
867  << " timeStart: " << syncTask.timeStart
868  << " state: " << syncTask.state;
869  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
870 
871  outMsg.str("");
872  outMsg << "diff = " << (diff/1000);
873  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
874 
875  if (syncTask.timeMax && (syncTask.timeMax < (diff/1000)))
876  {
877  if (syncTask.pid)
878  {
879  // make sync task to expired state
880  setSyncTaskAsTerminated(message(message_const::TASK_TIME_EXPIRED, syncTask.timeMax) , ERROR_TERMINATE_EXPIRED_TASK);
881  // kill all processes of sync task
882  std::vector<pid_t> descendants = DRCEReadProcessData::getAllProcessIds(syncTask.pid);
883  for (size_t i=0;i<descendants.size();++i)
884  kill(descendants[i], SIGKILL);
885  // log termination
886  outMsg.str("");
887  outMsg << "TASK (ID: " << syncTask.taskId << ", PID: " << syncTask.pid << ", TIME: " << (unsigned int)(diff/1000) << ") WAS TERMINATED AS EXPIRED !!!";
888  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_INFORMATION));
889  }
890  }
891  }
892 }
893 //-----------------------------------------------------------------------------
// Checks resource usage of every queued asynchronous task and of the current
// synchronous task; a task for which the limits checker throws Poco::Exception
// is terminated and the event is logged.
// NOTE(review): the signature line (listing line 894) is absent from this listing;
// the name checkUsageResources is taken from the trace message below.
895 {
896  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, "DRCEAsyncTasksQueue::checkUsageResources enter", Poco::Message::Priority::PRIO_TRACE));
897 
     // The queue mutex is held for the whole check.
898  Poco::Mutex::ScopedLock lock(mutex);
899 
900  DRCEResourceLimitsChecker resourceLimitsChecker(message);
901 
     // Iterate over a copy of the task map, not the map itself.
     // NOTE(review): presumably because executeTerminate() may modify `tasks` — confirm.
902  Tasks taskList = tasks;
903  for (Tasks::iterator iter=taskList.begin();iter!=taskList.end();++iter)
904  {
905  try
906  {
907  resourceLimitsChecker.checkLimits((*iter).second, (*iter).second.usageLimits);
908  }
909  catch(Poco::Exception& e)
910  {
     // The exception raised by checkLimits() supplies the message and code passed to the terminate request.
911  std::stringstream outMsg;
912  outMsg << "TASK (ID: " << (*iter).second.taskId << ", PID: " << (*iter).second.pid << ", TIME: " << (*iter).second.timeElapsed << ") WAS TERMINATED AS EXCEEDED USAGE RESOURCES !!!";
913  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_INFORMATION));
914 
915  DRCETaskRequestTerminateExecutor::executeTerminate((*iter).second.taskId, *this, nodeOptions, message, e.message(), e.code());
916  }
917  }
918 
     // The synchronous task is checked only when both its start time and its id are non-zero.
919  if (syncTask.timeStart && syncTask.taskId)
920  {
921  try
922  {
923  resourceLimitsChecker.checkLimits(syncTask, syncTask.usageLimits);
924  }
925  catch(Poco::Exception& e)
926  {
     // Nothing is logged or killed when the sync task has no pid recorded.
927  if (syncTask.pid)
928  {
     // Poco::Timestamp::elapsed() reports microseconds, so diff/1000 is milliseconds.
929  Poco::Timestamp::TimeDiff diff = Poco::Timestamp::fromEpochTime(syncTask.timeStart).elapsed();
930  // log termination
931  std::stringstream outMsg;
932  outMsg << "TASK (ID: " << syncTask.taskId << ", PID: " << syncTask.pid << ", TIME: " << (unsigned int)(diff/1000) << ") WAS TERMINATED AS EXCEEDED USAGE RESOURCES !!!";
933  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_INFORMATION));
934 
935  // mark the sync task as terminated, using the exception's message and code
936  setSyncTaskAsTerminated(e.message(), e.code());
937  // send SIGKILL to every pid returned by getAllProcessIds() for the sync task
     // NOTE(review): presumably the returned list includes syncTask.pid itself — confirm.
938  std::vector<pid_t> descendants = DRCEReadProcessData::getAllProcessIds(syncTask.pid);
939  for (size_t i=0;i<descendants.size();++i)
940  kill(descendants[i], SIGKILL);
941  }
942  }
943  }
944 }
945 //-----------------------------------------------------------------------------
// Marks every asynchronous task that is still IN_PROGRESS, SET_AS_NEW or
// QUEUED_TO_RUN as TERMINATED, saves the queue, then kills and clears the
// synchronous task.
// NOTE(review): the signature line (listing line 946) is absent from this listing,
// so the function name and parameters cannot be confirmed from here.
947 {
948  Poco::Mutex::ScopedLock lock(mutex);
949 
950  for (Tasks::iterator iter=tasks.begin();iter!=tasks.end(); ++iter)
951  {
952  if ((*iter).second.state == DRCETaskRequest::TaskState::IN_PROGRESS ||
953  (*iter).second.state == DRCETaskRequest::TaskState::SET_AS_NEW ||
954  (*iter).second.state == DRCETaskRequest::TaskState::QUEUED_TO_RUN)
955  {
     // Work on a copy of the record: set the final state and elapsed time, then store it back.
956  AsyncTasks asyncTask((*iter).second);
957  asyncTask.state = DRCETaskRequest::TaskState::TERMINATED;
958  asyncTask.timeElapsed = DRCECommonTask::getElapsedTimeMsec(Poco::Timestamp::fromEpochTime(asyncTask.timeStart));
     // std::forward<AsyncTasks> on a non-reference type casts the local copy to an rvalue.
     // NOTE(review): setAsyncTask() is called while iterating `tasks` — confirm it does not invalidate `iter`.
959  setAsyncTask(std::forward<AsyncTasks>(asyncTask));
960  }
961  }
962  saveTasksQueue();
963 
     // NOTE(review): only syncTask.pid is signalled here, whereas other code in this file
     // kills every pid returned by DRCEReadProcessData::getAllProcessIds() — confirm this is intended.
964  if (syncTask.pid)
965  kill(syncTask.pid, SIGKILL);
966  syncTask.clear();
967 }
968 //-----------------------------------------------------------------------------
969 // cppcheck-suppress unusedFunction
// Loop that saves the tasks queue once per dump period until isTerminated()
// becomes true. Exceptions thrown while saving are logged and the loop continues.
// NOTE(review): the signature line (listing line 970) is absent from this listing,
// so the function name cannot be confirmed from here.
971 {
972  const unsigned int periodSleep = 1000; // msec
973 
974  while(!isTerminated())
975  {
     // The period is re-read on every iteration.
976  unsigned int periodMs = getTasksQueueDumpPeriod();
977  if (periodMs > periodSleep)
978  {
     // Long periods are slept in periodSleep-sized slices so termination is noticed between slices.
     // Integer division drops the remainder (e.g. 1500 msec sleeps one 1000 msec slice).
979  unsigned int repeat = periodMs/periodSleep; // number of periodSleep slices
980  for (unsigned int i=0;i<repeat;++i)
981  {
982  Poco::Thread::sleep(periodSleep);
983  if (isTerminated())
984  return;
985  }
986  }
987  else
988  {
     // NOTE(review): a period of 0 gives sleep(0) on every iteration — presumably a busy loop; confirm 0 cannot occur.
989  Poco::Thread::sleep(periodMs);
990  }
991  try
992  {
     // NOTE(review): listing numbering jumps from 992 to 995, so statements may be missing here.
995 
996  if (isTerminated())
997  return;
998 
999  saveTasksQueue();
1000  }
1001  catch(Poco::Exception& e)
1002  {
1003  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, message(message_const::ERROR, e.displayText()), Poco::Message::Priority::PRIO_ERROR));
1004  }
1005  catch(std::exception& e)
1006  {
1007  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, message(message_const::ERROR, e.what()), Poco::Message::Priority::PRIO_ERROR));
1008  }
1009  }
1010 }
1011 //-----------------------------------------------------------------------------
1012 void DRCEAsyncTasksQueue::addSubtask(unsigned int taskId, unsigned int subtaskId, bool needCleanup)
1013 {
1014  Poco::Mutex::ScopedLock lock(mutex);
1015  subtasks[taskId][subtaskId] = needCleanup;
1016 }
1017 //-----------------------------------------------------------------------------
1018 void DRCEAsyncTasksQueue::setSubtaskCleanup(unsigned int subtaskId, bool needCleanup)
1019 {
1020  Poco::Mutex::ScopedLock lock(mutex);
1021  for (SubTasks::iterator iter=subtasks.begin();iter!=subtasks.end();++iter)
1022  {
1023  std::map<unsigned int, bool>::iterator it = (*iter).second.find(subtaskId);
1024  if (it!=(*iter).second.end())
1025  {
1026  (*it).second = needCleanup;
1027  break;
1028  }
1029  }
1030 }
1031 //-----------------------------------------------------------------------------
1032 bool DRCEAsyncTasksQueue::getSubtaskCleanup(unsigned int subtaskId)
1033 {
1034  Poco::Mutex::ScopedLock lock(mutex);
1035  bool needCleanup = false;
1036  for (SubTasks::iterator iter=subtasks.begin();iter!=subtasks.end();++iter)
1037  {
1038  std::map<unsigned int, bool>::iterator it = (*iter).second.find(subtaskId);
1039  if (it!=(*iter).second.end())
1040  {
1041  needCleanup = (*it).second;
1042  break;
1043  }
1044  }
1045  return needCleanup;
1046 }
1047 //-----------------------------------------------------------------------------
// Removes the subtasks entry stored under `taskId`, under the queue mutex.
// NOTE(review): the signature line (listing line 1048) is absent from this listing,
// so the function name and the declaration of `taskId` cannot be confirmed from here.
1049 {
1050  Poco::Mutex::ScopedLock lock(mutex);
1051  subtasks.erase(taskId);
1052 }
1053 //-----------------------------------------------------------------------------
// Sets the flag of every subtask registered under `taskId` to true, under the
// queue mutex. Does nothing when `taskId` has no subtasks entry.
// NOTE(review): the signature line (listing line 1054) is absent from this listing,
// so the function name and the declaration of `taskId` cannot be confirmed from here.
1055 {
1056  Poco::Mutex::ScopedLock lock(mutex);
1057  SubTasks::iterator iter = subtasks.find(taskId);
1058  if (iter!=subtasks.end())
1059  {
1060  for (std::map<unsigned int, bool>::iterator iT=(*iter).second.begin();iT!=(*iter).second.end();++iT)
1061  {
      // The flag is the same bool that addSubtask() stores as needCleanup.
1062  (*iT).second = true;
1063  }
1064  }
1065 }
1066 //-----------------------------------------------------------------------------
1067 DRCEAsyncTasksQueue::SubTasks::const_iterator DRCEAsyncTasksQueue::getSubtasksBegin(void)
1068 {
1069  Poco::Mutex::ScopedLock lock(mutex);
1070  return subtasks.begin();
1071 }
1072 //-----------------------------------------------------------------------------
1073 DRCEAsyncTasksQueue::SubTasks::const_iterator DRCEAsyncTasksQueue::getSubtasksEnd(void)
1074 {
1075  Poco::Mutex::ScopedLock lock(mutex);
1076  return subtasks.end();
1077 }
1078 //-----------------------------------------------------------------------------
1079 std::map<unsigned int, bool> DRCEAsyncTasksQueue::getSubtasks(unsigned int taskId)
1080 {
1081  Poco::Mutex::ScopedLock lock(mutex);
1082  std::map<unsigned int, bool> sub;
1083  SubTasks::iterator iter = subtasks.find(taskId);
1084  if (iter!=subtasks.end())
1085  {
1086  sub = (*iter).second;
1087  }
1088  return sub;
1089 }
1090 //-----------------------------------------------------------------------------
1091 //-----------------------------------------------------------------------------
1092 AsyncTaskLocker::AsyncTaskLocker(DRCEAsyncTasksQueue& asyncTasksQueue_, unsigned int taskId_)
1093 : asyncTasksQueue(asyncTasksQueue_), taskId(taskId_)
1094 {
1095  asyncTasksQueue.lockAsyncTask(taskId);
1096 }
1097 //-----------------------------------------------------------------------------
// Constructor variant that takes an `asyncTask` object: stores the queue and
// asyncTask.taskId, then calls lockAsyncTask(asyncTask) on the queue.
// NOTE(review): the signature line (listing line 1098) is absent from this listing,
// so the parameter declarations cannot be confirmed from here.
1099 : asyncTasksQueue(asyncTasksQueue_), taskId(asyncTask.taskId)
1100 {
1101  asyncTasksQueue.lockAsyncTask(asyncTask);
1102 }
1103 //-----------------------------------------------------------------------------
// Calls unlockAsyncTask(taskId) on the stored queue.
// NOTE(review): the signature line (listing line 1104) is absent from this listing;
// presumably this is the AsyncTaskLocker destructor, releasing the task locked
// by the constructors — confirm.
1105 {
1106  asyncTasksQueue.unlockAsyncTask(taskId);
1107 }
1108 //-----------------------------------------------------------------------------
1109 //-----------------------------------------------------------------------------
1110 } // end namespace drce
1111 } // end namespace HCE