3 #include <Poco/Timestamp.h>
4 #include <Poco/Logger.h>
24 :
inherited(asyncTaskQueue_, nodeOptions_, message_, resourceMonitor_)
42 if (!pTaskRequestTerminate)
48 resultDataItem.
setState(static_cast<unsigned int>(DRCETaskRequest::TaskState::NOT_FOUND));
52 if (!loadStatusFile(asyncTaskQueue, nodeOptions, message, resultDataItem))
58 std::stringstream outMsg;
59 outMsg <<
"Request terminate taskId: " << pTaskRequestTerminate->
getTaskId();
60 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
67 for (DRCECommonTask::SubtasksList::reverse_iterator iter=subtasks.rbegin();iter!=subtasks.rend();++iter)
69 asyncTaskQueue.
addSubtask(pTaskRequestTerminate->
getTaskId(), (*iter).first, (pTaskRequestTerminate->
getCleanupFlag()==DRCETaskRequestTerminate::CleanupFlag::cfDelete));
71 terminateTask((*iter).first, *pTaskRequestTerminate, asyncTaskQueue, nodeOptions, message, resultDataItem, errorMsg, errorCode, taskState);
74 if (pTaskRequestTerminate->
getCleanupFlag()==DRCETaskRequestTerminate::CleanupFlag::cfDelete)
80 if (statusFile.exists())
82 unsigned int errorCodeMainTask =
errorCode;
91 taskState = DRCETaskRequest::TaskState::TERMINATED_BY_TTL;
95 DRCETaskRequest::RequestType::rtSetTaskExecute, taskState,
96 errorCode, errorMessage, 0, asyncTask.timeElapsed, resultDataItem.
getExitStatus());
103 std::map<unsigned int, bool> sub = asyncTaskQueue.
getSubtasks(pTaskRequestTerminate->
getTaskId());
105 for (std::map<unsigned int, bool>::iterator iter=sub.begin();iter!=sub.end();++iter)
108 outMsg <<
"DRCETaskRequestTerminateExecutor::executeTerminate cleanup() taskId: " << (*iter).first <<
" cleanup: " << std::boolalpha << (*iter).second;
109 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
114 catch(Poco::Exception& e)
120 asyncTask.state = DRCETaskRequest::TaskState::CRASHED;
122 asyncTaskQueue.
safeChangeState(std::forward<AsyncTasks>(asyncTask), &resultDataItem, e.message(), e.code());
124 catch(std::exception& e)
130 asyncTask.state = DRCETaskRequest::TaskState::CRASHED;
134 return resultDataItem;
137 void DRCETaskRequestTerminateExecutor::executeDefaultAlgorithm(DRCETaskRequestTerminate& taskRequestTerminate, pid_t
pid, DRCEAsyncTasksQueue& asyncTaskQueue,
138 DRCENodeOptions& nodeOptions, CustomMessage& message, DRCEResultDataItem& resultDataItem)
throw (Poco::Exception)
146 Poco::Thread::sleep(taskRequestTerminate.getDelayValue());
149 for (
unsigned int i=0;i<taskRequestTerminate.getRepeatValue();++i)
151 if (kill(
pid, SIGKILL) != 0)
152 errorMsg = strerror(errno);
154 Poco::Thread::sleep(taskRequestTerminate.getDelayValue());
158 if (!errorMsg.empty())
160 if (!loadStatusFile(asyncTaskQueue, nodeOptions, message, resultDataItem))
169 void DRCETaskRequestTerminateExecutor::executeCustomAlgorithm(DRCETaskRequestTerminate& taskRequestTerminate, pid_t
pid, DRCEAsyncTasksQueue& asyncTaskQueue,
170 DRCENodeOptions& nodeOptions, CustomMessage& message, DRCEResultDataItem& resultDataItem)
throw (Poco::Exception)
175 for (
unsigned int i=0;i<taskRequestTerminate.getRepeatValue();++i)
177 if (kill(
pid, taskRequestTerminate.getSignalValue()) != 0)
178 errorMsg = strerror(errno);
180 Poco::Thread::sleep(taskRequestTerminate.getDelayValue());
184 if (!errorMsg.empty())
186 if (!loadStatusFile(asyncTaskQueue, nodeOptions, message, resultDataItem))
194 bool DRCETaskRequestTerminateExecutor::loadStatusFile(DRCEAsyncTasksQueue& asyncTaskQueue,
195 DRCENodeOptions& nodeOptions,
196 CustomMessage& message,
197 DRCEResultDataItem& resultDataItem)
201 if (statusFile.exists())
205 AsyncTaskLocker lock(asyncTaskQueue, resultDataItem.getRequestId());
206 DataFileExtractor dataFileExtractor(message);
207 resultDataItem = dataFileExtractor.extract(statusFile.path());
210 catch(Poco::Exception& e)
212 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
218 void DRCETaskRequestTerminateExecutor::terminateTask(
unsigned int taskId,
219 DRCETaskRequestTerminate& taskRequestTerminate,
220 DRCEAsyncTasksQueue& asyncTaskQueue,
221 DRCENodeOptions& nodeOptions,
222 CustomMessage& message,
223 DRCEResultDataItem& resultDataItem,
224 const std::string& errorMsg,
225 unsigned int errorCode,
228 if (asyncTaskQueue.isExistAsyncTask(
taskId))
230 AsyncTasks asyncTask(asyncTaskQueue.getAsyncTask(
taskId));
231 resultDataItem.setRequestId(
taskId);
232 resultDataItem.setPid(asyncTask.pid);
233 asyncTask.timeElapsed = Poco::Timestamp::fromEpochTime(asyncTask.timeStart).elapsed()/1000;
236 switch(taskRequestTerminate.getAlgorithmType())
238 case DRCETaskRequestTerminate::AlgorithmType::atDefault:
240 executeDefaultAlgorithm(taskRequestTerminate, asyncTask.pid, asyncTaskQueue, nodeOptions, message, resultDataItem);
243 case DRCETaskRequestTerminate::AlgorithmType::atCustom:
245 if (taskRequestTerminate.getSignalValue() == 0)
247 executeCustomAlgorithm(taskRequestTerminate, asyncTask.pid, asyncTaskQueue, nodeOptions, message, resultDataItem);
254 std::stringstream outMsg;
255 outMsg <<
"descendants count = " << descendants.size() <<
" (";
256 for (
size_t i=0;i<descendants.size();++i)
260 outMsg << descendants[i];
261 kill(descendants[i], SIGKILL);
263 outMsg <<
") has been killed...";
264 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
266 resultDataItem.setTime(asyncTask.timeElapsed);
267 asyncTask.state = taskState;
268 asyncTaskQueue.safeChangeState(std::forward<AsyncTasks>(asyncTask), &resultDataItem, errorMsg, errorCode);
274 std::stringstream outMsg;
275 outMsg <<
"Task for terminate [ " <<
taskId <<
" ] alredy not exist in async queue";
276 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_DEBUG));
280 DRCEResultDataItem DRCETaskRequestTerminateExecutor::makeResultDataItem(DRCEInputJsonMessage& inputJsonMessage,
unsigned int exitStatus)
285 DRCETaskRequest::RequestType::rtTerminateTask,
286 DRCETaskRequest::TaskState::TERMINATED,
294 void DRCETaskRequestTerminateExecutor::prepareCleanup(
unsigned int taskId, DRCEInputJsonMessage& inputJsonMessage)
296 if (taskId == inputJsonMessage.getRequestId())
298 Poco::SharedPtr<DRCETaskRequest> pTaskRequest = inputJsonMessage.getTaskRequest();
299 DRCETaskRequestSetExecute* pTaskRequestSetExecute =
dynamic_cast<DRCETaskRequestSetExecute*
>(pTaskRequest.get());
300 if (pTaskRequestSetExecute)
302 SessionOptions
sessionOptions = pTaskRequestSetExecute->getSessionOptions();
303 sessionOptions.cleanup = SessionOptions::CleanupFlag::cfDelete;
304 pTaskRequestSetExecute->setSessionOptions(std::forward<SessionOptions>(sessionOptions));
305 pTaskRequest = pTaskRequestSetExecute;
306 inputJsonMessage.setTaskRequest(pTaskRequest);
311 for (
size_t i=0;i<inputJsonMessage.getSubtasksCount();++i)
313 DRCEInputJsonMessage inputMessage(inputJsonMessage.getSubtaskItem(i));
314 prepareCleanup(taskId, inputMessage);
330 DRCETaskRequest::TaskState::TERMINATED_BY_TTL);
338 taskRequestTerminate.
setAlgorithmType(DRCETaskRequestTerminate::AlgorithmType::atCustom);
342 taskRequestTerminate.
setCleanupFlag(DRCETaskRequestTerminate::CleanupFlag::cfNotDelete);