hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DRCETaskRequestTerminateExecutor.cpp
Go to the documentation of this file.
1 #include <signal.h>
2 #include <sys/wait.h>
3 #include <Poco/Timestamp.h>
4 #include <Poco/Logger.h>
5 #include <Poco/File.h>
6 
7 #include "Process.hpp"
10 #include "DRCEFileExtractor.hpp"
11 #include "DRCEFileBuilder.hpp"
12 #include "DRCEFileStream.hpp"
13 #include "DRCECleanupTask.hpp"
14 #include "DRCECommonTask.hpp"
15 #include "DRCEReadProcessData.hpp"
16 
17 namespace HCE
18 {
19 namespace drce
20 {
21 //-----------------------------------------------------------------------------
23  DRCENodeOptions& nodeOptions_, CustomMessage& message_, DRCEResourceMonitor& resourceMonitor_)
24 : inherited(asyncTaskQueue_, nodeOptions_, message_, resourceMonitor_)
25 {
26 }
27 //-----------------------------------------------------------------------------
29 {
30  return executeTerminate(pTaskRequest, inputJsonMessage, asyncTaskQueue, nodeOptions, message);
31 }
32 //-----------------------------------------------------------------------------
34  DRCEAsyncTasksQueue& asyncTaskQueue, DRCENodeOptions& nodeOptions, CustomMessage& message, const std::string& errorMsg, unsigned int errorCode,
36 {
37  DRCEResultDataItem resultDataItem = getResultDataItem(*pTaskRequest, nodeOptions, DRCETaskRequest::TaskState::UNDEFINED);
38 
39  try
40  {
41  DRCETaskRequestTerminate* pTaskRequestTerminate = dynamic_cast<DRCETaskRequestTerminate*>(pTaskRequest);
42  if (!pTaskRequestTerminate)
44 
45  if (pTaskRequestTerminate->getDelayValue() == 0 || pTaskRequestTerminate->getRepeatValue() == 0)
47 
48  resultDataItem.setState(static_cast<unsigned int>(DRCETaskRequest::TaskState::NOT_FOUND));
49 
50  if (!asyncTaskQueue.isExistAsyncTask(pTaskRequestTerminate->getTaskId()))
51  {
52  if (!loadStatusFile(asyncTaskQueue, nodeOptions, message, resultDataItem))
54  }
55 
56  AsyncTasks asyncTask = asyncTaskQueue.getAsyncTask(pTaskRequestTerminate->getTaskId());
57 
58  std::stringstream outMsg;
59  outMsg << "Request terminate taskId: " << pTaskRequestTerminate->getTaskId();
60  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
61 
63  DRCEInputJsonMessage inputMessage = DRCECommonTask::getRequestData(pTaskRequestTerminate->getTaskId(), nodeOptions, message);
64  DRCECommonTask::getSubtasksList(inputMessage, subtasks);
65 
66  // below block code need to checking
67  for (DRCECommonTask::SubtasksList::reverse_iterator iter=subtasks.rbegin();iter!=subtasks.rend();++iter)
68  {
69  asyncTaskQueue.addSubtask(pTaskRequestTerminate->getTaskId(), (*iter).first, (pTaskRequestTerminate->getCleanupFlag()==DRCETaskRequestTerminate::CleanupFlag::cfDelete));
70  asyncTaskQueue.cancelTask((*iter).first);
71  terminateTask((*iter).first, *pTaskRequestTerminate, asyncTaskQueue, nodeOptions, message, resultDataItem, errorMsg, errorCode, taskState);
72  }
73 
74  if (pTaskRequestTerminate->getCleanupFlag()==DRCETaskRequestTerminate::CleanupFlag::cfDelete)
75  {
76  asyncTaskQueue.setAllSubtaskCleanup(pTaskRequestTerminate->getTaskId());
77  }
78 
79  Poco::File statusFile(FileStream::getStatusFileName(nodeOptions.getTasksStatusDir(), pTaskRequestTerminate->getTaskId()));
80  if (statusFile.exists())
81  {
82  unsigned int errorCodeMainTask = errorCode;
83  std::string errorMessage = errorMsg;
84 
85  DRCEResultDataItem resultItem = DRCECommonTask::extractStatusResultDataItem(pTaskRequestTerminate->getTaskId(), nodeOptions, asyncTaskQueue);
86  if (resultItem.getErrorCode() == ERROR_TERMINATE_EXPIRED_TASK)
87  {
89  errorCodeMainTask = ERROR_TERMINATE_EXPIRED_TASK;
90  errorMessage = message(message_const::TASK_TIME_EXPIRED, asyncTask.timeMax);
91  taskState = DRCETaskRequest::TaskState::TERMINATED_BY_TTL;
92  }
93 
94  resultDataItem = DRCECommonTask::makeResultDataItem(inputMessage, nodeOptions, message,
95  DRCETaskRequest::RequestType::rtSetTaskExecute, taskState,
96  errorCode, errorMessage, 0, asyncTask.timeElapsed, resultDataItem.getExitStatus());
97  resultDataItem.setErrorCode(errorCodeMainTask);
98 
99  DRCECommonTask::saveResultData(resultDataItem, nodeOptions, message, asyncTaskQueue);
100  }
101 
103  std::map<unsigned int, bool> sub = asyncTaskQueue.getSubtasks(pTaskRequestTerminate->getTaskId());
104 
105  for (std::map<unsigned int, bool>::iterator iter=sub.begin();iter!=sub.end();++iter)
106  {
107  outMsg.str("");
108  outMsg << "DRCETaskRequestTerminateExecutor::executeTerminate cleanup() taskId: " << (*iter).first << " cleanup: " << std::boolalpha << (*iter).second;
109  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
110 
111  DRCECommonTask::cleanup((*iter).first, nodeOptions, message, (*iter).second);
112  }
113  }
114  catch(Poco::Exception& e)
115  {
116  resultDataItem.setErrorCode(e.code());
117  resultDataItem.setErrorMessage(e.message());
118  AsyncTasks asyncTask(asyncTaskQueue.getAsyncTask(pTaskRequest->getTaskId()));
119  asyncTask.timeElapsed = DRCECommonTask::getElapsedTimeMsec(Poco::Timestamp::fromEpochTime(asyncTask.timeStart));
120  asyncTask.state = DRCETaskRequest::TaskState::CRASHED;
121  if (asyncTaskQueue.isExistAsyncTask(pTaskRequest->getTaskId()))
122  asyncTaskQueue.safeChangeState(std::forward<AsyncTasks>(asyncTask), &resultDataItem, e.message(), e.code());
123  }
124  catch(std::exception& e)
125  {
126  resultDataItem.setErrorCode(ERROR_EXECUTE_TASK);
127  resultDataItem.setErrorMessage(e.what());
128  AsyncTasks asyncTask(asyncTaskQueue.getAsyncTask(pTaskRequest->getTaskId()));
129  asyncTask.timeElapsed = DRCECommonTask::getElapsedTimeMsec(Poco::Timestamp::fromEpochTime(asyncTask.timeStart));
130  asyncTask.state = DRCETaskRequest::TaskState::CRASHED;
131  if (asyncTaskQueue.isExistAsyncTask(pTaskRequest->getTaskId()))
132  asyncTaskQueue.safeChangeState(std::forward<AsyncTasks>(asyncTask), &resultDataItem, std::string(e.what()), ERROR_EXECUTE_TASK);
133  }
134  return resultDataItem;
135 }
136 //-----------------------------------------------------------------------------
137 void DRCETaskRequestTerminateExecutor::executeDefaultAlgorithm(DRCETaskRequestTerminate& taskRequestTerminate, pid_t pid, DRCEAsyncTasksQueue& asyncTaskQueue,
138  DRCENodeOptions& nodeOptions, CustomMessage& message, DRCEResultDataItem& resultDataItem) throw (Poco::Exception)
139 {
140  if (pid)
141  {
142  kill(pid, SIGTERM);
145  {
146  Poco::Thread::sleep(taskRequestTerminate.getDelayValue());
147 
148  std::string errorMsg;
149  for (unsigned int i=0;i<taskRequestTerminate.getRepeatValue();++i)
150  {
151  if (kill(pid, SIGKILL) != 0)
152  errorMsg = strerror(errno);
154  Poco::Thread::sleep(taskRequestTerminate.getDelayValue());
155  else
156  break;
157  }
158  if (!errorMsg.empty())
159  {
160  if (!loadStatusFile(asyncTaskQueue, nodeOptions, message, resultDataItem))
161  throw Poco::Exception(message(message_const::PROCESS_WAS_NOT_STOPPED, errorMsg)+
163  }
165  }
166  }
167 }
168 //-----------------------------------------------------------------------------
169 void DRCETaskRequestTerminateExecutor::executeCustomAlgorithm(DRCETaskRequestTerminate& taskRequestTerminate, pid_t pid, DRCEAsyncTasksQueue& asyncTaskQueue,
170  DRCENodeOptions& nodeOptions, CustomMessage& message, DRCEResultDataItem& resultDataItem) throw (Poco::Exception)
171 {
172  if (pid)
173  {
174  std::string errorMsg;
175  for (unsigned int i=0;i<taskRequestTerminate.getRepeatValue();++i)
176  {
177  if (kill(pid, taskRequestTerminate.getSignalValue()) != 0)
178  errorMsg = strerror(errno);
180  Poco::Thread::sleep(taskRequestTerminate.getDelayValue());
181  else
182  break;
183  }
184  if (!errorMsg.empty())
185  {
186  if (!loadStatusFile(asyncTaskQueue, nodeOptions, message, resultDataItem))
187  throw Poco::Exception(message(message_const::PROCESS_WAS_NOT_STOPPED, errorMsg)+
189  }
190  resultDataItem.setExitStatus(HCE::drce::drce_const::EXIT_STATUS_BASE+taskRequestTerminate.getSignalValue());
191  }
192 }
193 //-----------------------------------------------------------------------------
194 bool DRCETaskRequestTerminateExecutor::loadStatusFile(DRCEAsyncTasksQueue& asyncTaskQueue,
195  DRCENodeOptions& nodeOptions,
196  CustomMessage& message,
197  DRCEResultDataItem& resultDataItem)
198 {
199  bool result = false;
200  Poco::File statusFile(FileStream::getStatusFileName(nodeOptions.getTasksStatusDir(), resultDataItem.getRequestId()));
201  if (statusFile.exists())
202  {
203  try
204  {
205  AsyncTaskLocker lock(asyncTaskQueue, resultDataItem.getRequestId());
206  DataFileExtractor dataFileExtractor(message);
207  resultDataItem = dataFileExtractor.extract(statusFile.path());
208  result = true;
209  }
210  catch(Poco::Exception& e)
211  {
212  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
213  }
214  }
215  return result;
216 }
217 //-----------------------------------------------------------------------------
218 void DRCETaskRequestTerminateExecutor::terminateTask(unsigned int taskId,
219  DRCETaskRequestTerminate& taskRequestTerminate,
220  DRCEAsyncTasksQueue& asyncTaskQueue,
221  DRCENodeOptions& nodeOptions,
222  CustomMessage& message,
223  DRCEResultDataItem& resultDataItem,
224  const std::string& errorMsg,
225  unsigned int errorCode,
226  DRCETaskRequest::TaskState taskState) throw (Poco::Exception)
227 {
228  if (asyncTaskQueue.isExistAsyncTask(taskId))
229  {
230  AsyncTasks asyncTask(asyncTaskQueue.getAsyncTask(taskId));
231  resultDataItem.setRequestId(taskId);
232  resultDataItem.setPid(asyncTask.pid);
233  asyncTask.timeElapsed = Poco::Timestamp::fromEpochTime(asyncTask.timeStart).elapsed()/1000;
234 
235  std::vector<pid_t> descendants = DRCEReadProcessData::getAllProcessIds(asyncTask.pid);
236  switch(taskRequestTerminate.getAlgorithmType())
237  {
238  case DRCETaskRequestTerminate::AlgorithmType::atDefault:
239  {
240  executeDefaultAlgorithm(taskRequestTerminate, asyncTask.pid, asyncTaskQueue, nodeOptions, message, resultDataItem);
241  }
242  break;
243  case DRCETaskRequestTerminate::AlgorithmType::atCustom:
244  {
245  if (taskRequestTerminate.getSignalValue() == 0)
246  throw Poco::Exception(message(message_const::BAD_COMMAND_PARAMS), ERROR_BAD_COMMAND_PARAMS);
247  executeCustomAlgorithm(taskRequestTerminate, asyncTask.pid, asyncTaskQueue, nodeOptions, message, resultDataItem);
248  }
249  break;
250  default: throw Poco::Exception(message(message_const::WRONG_ALGORITHM_TYPE,
251  static_cast<unsigned int>(taskRequestTerminate.getAlgorithmType())), ERROR_BAD_COMMAND_PARAMS);
252  }
253 
254  std::stringstream outMsg;
255  outMsg << "descendants count = " << descendants.size() << " (";
256  for (size_t i=0;i<descendants.size();++i)
257  {
258  if (i!=0)
259  outMsg << ", ";
260  outMsg << descendants[i];
261  kill(descendants[i], SIGKILL);
262  }
263  outMsg << ") has been killed...";
264  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
265 
266  resultDataItem.setTime(asyncTask.timeElapsed);
267  asyncTask.state = taskState; //((errorCode == ERROR_TERMINATE_EXPIRED_TASK)?DRCETaskRequest::TaskState::TERMINATED_BY_TTL:DRCETaskRequest::TaskState::TERMINATED);
268  asyncTaskQueue.safeChangeState(std::forward<AsyncTasks>(asyncTask), &resultDataItem, errorMsg, errorCode);
269  // write to tasks log information about finished state of task
270  asyncTaskQueue.writeToTasksLog(drce_const::DRCE_TASKS_LOGGER_DELIMITER, drce_const::DRCE_TASKS_LOGGER_DELETED, asyncTask);
271  }
272  else
273  {
274  std::stringstream outMsg;
275  outMsg << "Task for terminate [ " << taskId << " ] alredy not exist in async queue";
276  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_DEBUG));
277  }
278 }
279 //-----------------------------------------------------------------------------
280 DRCEResultDataItem DRCETaskRequestTerminateExecutor::makeResultDataItem(DRCEInputJsonMessage& inputJsonMessage, unsigned int exitStatus)
281 {
282  return DRCECommonTask::makeResultDataItem(inputJsonMessage,
283  nodeOptions,
284  message,
285  DRCETaskRequest::RequestType::rtTerminateTask,
286  DRCETaskRequest::TaskState::TERMINATED,
287  NO_ERROR,
288  "",
289  0,
290  0,
291  exitStatus);
292 }
293 //-----------------------------------------------------------------------------
294 void DRCETaskRequestTerminateExecutor::prepareCleanup(unsigned int taskId, DRCEInputJsonMessage& inputJsonMessage)
295 {
296  if (taskId == inputJsonMessage.getRequestId())
297  {
298  Poco::SharedPtr<DRCETaskRequest> pTaskRequest = inputJsonMessage.getTaskRequest();
299  DRCETaskRequestSetExecute* pTaskRequestSetExecute = dynamic_cast<DRCETaskRequestSetExecute*>(pTaskRequest.get());
300  if (pTaskRequestSetExecute)
301  {
302  SessionOptions sessionOptions = pTaskRequestSetExecute->getSessionOptions();
303  sessionOptions.cleanup = SessionOptions::CleanupFlag::cfDelete;
304  pTaskRequestSetExecute->setSessionOptions(std::forward<SessionOptions>(sessionOptions));
305  pTaskRequest = pTaskRequestSetExecute;
306  inputJsonMessage.setTaskRequest(pTaskRequest);
307  }
308  }
309  else
310  {
311  for (size_t i=0;i<inputJsonMessage.getSubtasksCount();++i)
312  {
313  DRCEInputJsonMessage inputMessage(inputJsonMessage.getSubtaskItem(i));
314  prepareCleanup(taskId, inputMessage);
315  }
316  }
317 }
318 //-----------------------------------------------------------------------------
320  DRCEAsyncTasksQueue& asyncTaskQueue,
321  DRCENodeOptions& nodeOptions,
322  CustomMessage& message)
323 {
325  asyncTaskQueue,
326  nodeOptions,
327  message,
328  message(message_const::TASK_TIME_EXPIRED, asyncTask.timeMax),
330  DRCETaskRequest::TaskState::TERMINATED_BY_TTL);
331 }
332 //-----------------------------------------------------------------------------
333 void DRCETaskRequestTerminateExecutor::executeTerminate(unsigned int taskId, DRCEAsyncTasksQueue& asyncTaskQueue, DRCENodeOptions& nodeOptions, CustomMessage& message,
334  const std::string& errorMsg, unsigned int errorCode, DRCETaskRequest::TaskState taskState)
335 {
336  DRCETaskRequestTerminate taskRequestTerminate;
337  taskRequestTerminate.setTaskId(taskId);
338  taskRequestTerminate.setAlgorithmType(DRCETaskRequestTerminate::AlgorithmType::atCustom);
339  taskRequestTerminate.setDelayValue(100);
340  taskRequestTerminate.setRepeatValue(3);
341  taskRequestTerminate.setSignalValue(9);
342  taskRequestTerminate.setCleanupFlag(DRCETaskRequestTerminate::CleanupFlag::cfNotDelete);
343  DRCEInputJsonMessage inputJsonMessage;
344  DRCETaskRequestTerminateExecutor::executeTerminate(&taskRequestTerminate, inputJsonMessage, asyncTaskQueue, nodeOptions, message, errorMsg, errorCode, taskState);
345 }
346 //-----------------------------------------------------------------------------
347 //-----------------------------------------------------------------------------
348 } // end namespace drce
349 } // end namespace HCE