hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DRCETaskShellHost.cpp
Go to the documentation of this file.
1 #include <fstream>
2 #include <list>
3 #include <sys/wait.h>
4 #include <Poco/SharedPtr.h>
5 #include <Poco/Timestamp.h>
6 #include <Poco/Logger.h>
7 #include <Poco/Thread.h>
8 #include <Poco/TaskManager.h>
9 #include <Poco/File.h>
10 
11 #include "CommandExecutor.hpp"
12 #include "DRCEMessageConst.hpp"
13 #include "DRCEError.hpp"
14 #include "DRCEFileStream.hpp"
15 #include "DRCEAsyncTasksQueue.hpp"
16 #include "DRCETaskShellHost.hpp"
18 #include "DRCEFileExtractor.hpp"
20 #include "DRCECleanupTask.hpp"
21 
22 namespace HCE
23 {
24 namespace drce
25 {
26 //-----------------------------------------------------------------------------
// Constructor: forwards every argument, unchanged and in the same order, to
// inherited(...) in the member-initializer list. The constructor body itself
// is empty, so no additional state is set up here.
27 DRCETaskShellHost::DRCETaskShellHost(const std::string& taskName_, DRCENodeOptions& nodeOptions_, CustomMessage& message_,
28  const DRCETaskRequestSetExecute& taskRequestSetExecute_, DRCEAsyncTasksQueue& asyncTasksQueue_,
29  DRCEResourceMonitor& resourceMonitor_, const DRCEInputJsonMessage& inputJsonMessage_, unsigned int parentTaskId_)
30 : inherited(taskName_, nodeOptions_, message_, taskRequestSetExecute_, asyncTasksQueue_, resourceMonitor_, inputJsonMessage_, parentTaskId_)
31 {
32 }
33 //-----------------------------------------------------------------------------
34 // cppcheck-suppress unusedFunction
36 {
37  if (isCancelled())
38  return;
39 
40  Poco::Timestamp now;
41  std::time_t timeStart = now.epochTime();
42 
43  DRCEResultDataItem resultDataItem = DRCETaskRequestSetExecuteExecutor::getResultDataItem(inputJsonMessage, nodeOptions, DRCETaskRequest::TaskState::SET_AS_NEW);
44  AsyncTasks asyncTask(0, parentTaskId, 0, taskRequestSetExecute.getSessionOptions().timeMax, timeStart, 0, DRCETaskRequest::TaskState::SET_AS_NEW);
46 
47  try
48  {
49  unsigned int taskId = std::stoul(name());
50 
54  for (size_t i=0;i<subtasks.size();++i)
55  {
56  asyncTasksQueue.addSubtask(taskId, subtasks[i].first, (subtasks[i].second==SessionOptions::CleanupFlag::cfDelete));
57  }
58 
60  asyncTask.taskId = taskId;
61  asyncTask.parentTaskId = parentTaskId;
62  asyncTasksQueue.safeChangeState(std::forward<AsyncTasks>(AsyncTasks(asyncTask)), &resultDataItem);
63 
64  // write a record to the tasks log that a new task has appeared
66 
67  asyncTask.state = DRCETaskRequest::TaskState::IN_PROGRESS;
68  asyncTasksQueue.safeChangeState(std::forward<AsyncTasks>(AsyncTasks(asyncTask)), &resultDataItem);
69 
70  ApplyPidHandler applyPidHandler(asyncTasksQueue, asyncTask, DRCETaskRequest::TaskState::IN_PROGRESS, SessionOptions::ThreadMode::tmAsync);
71 
72  resultDataItem = execute(applyPidHandler);
73 
74  if (!isCancelled())
75  {
76  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, "DRCETaskShellHost::runTask before executeSubtasks()", Poco::Message::Priority::PRIO_TRACE));
77  // block running subtasks
78  executeSubtasks(resultDataItem);
79 
80  if (!isCancelled())
81  {
82  for (size_t i=0;i<resultDataItem.getSubtasksCount();++i)
83  {
85  }
86  }
87  }
88  // save the stdout length and the execution time (for the resource monitor)
90  resourceMonitor.addTimeAsyncTask(resultDataItem.getTime());
91  // increment the count of failed sync tasks
92  if (resultDataItem.getErrorCode()!=NO_ERROR)
94 
95  std::stringstream outMsg;
96  outMsg << "DRCETaskShellHost::runTask isCancelled = " << std::boolalpha << isCancelled();
97  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
98 
99  if (!isCancelled())
100  {
101  asyncTask.state = DRCETaskRequest::TaskState::FINISHED;
102  asyncTask.timeElapsed = DRCECommonTask::getElapsedTimeMsec(Poco::Timestamp::fromEpochTime(asyncTask.timeStart));
103  asyncTasksQueue.safeChangeState(std::forward<AsyncTasks>(AsyncTasks(asyncTask)), &resultDataItem);
104 
105  outMsg.str("");
106  outMsg << ">>>>> FINISHED (" << taskId << ")";
107  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
108  }
109  else
110  {
111  asyncTask.state = DRCETaskRequest::TaskState::TERMINATED;
112  asyncTask.timeElapsed = DRCECommonTask::getElapsedTimeMsec(Poco::Timestamp::fromEpochTime(asyncTask.timeStart));
113 
114  outMsg.str("");
115  outMsg << ">>>>> TERMINATED (" << taskId << ")\n";
116  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
117 
119  if (statusFile.exists())
120  {
121  unsigned int errorCode = NO_ERROR;
122  unsigned int errorCodeMainTask = NO_ERROR;
123  std::string errorMessage;
124  DRCETaskRequest::TaskState taskState = DRCETaskRequest::TaskState::TERMINATED;
125 
127  if (resultItem.getErrorCode() == ERROR_TERMINATE_EXPIRED_TASK)
128  {
130  errorCodeMainTask = ERROR_TERMINATE_EXPIRED_TASK;
131  errorMessage = message(message_const::TASK_TIME_EXPIRED, asyncTask.timeMax);
132  taskState = DRCETaskRequest::TaskState::TERMINATED_BY_TTL;
133  }
134  else
135  {
136  errorCode = resultItem.getErrorCode();
137  errorCodeMainTask = resultItem.getErrorCode();
138  errorMessage = resultItem.getErrorMessage();
139  taskState = static_cast<DRCETaskRequest::TaskState>(resultItem.getState());
140  }
141 
143  DRCETaskRequest::RequestType::rtSetTaskExecute, taskState,
144  errorCode, errorMessage, asyncTask.pid, asyncTask.timeElapsed);
145  resultDataItem.setErrorCode(errorCodeMainTask);
146  resultDataItem.setTime(asyncTask.timeElapsed);
147  }
148  }
149 
152 
154  if (!getParentTaskId())
155  {
156 
157  outMsg.str("");
158  outMsg << "DRCETaskShellHost::runTask getSubtaskCleanup(" << taskId << ") = " << std::boolalpha << asyncTasksQueue.getSubtaskCleanup(taskId);
159  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
160 
162  std::map<unsigned int, bool> sub = asyncTasksQueue.getSubtasks(taskId);
163 
164  for (std::map<unsigned int, bool>::iterator iter=sub.begin();iter!=sub.end();++iter)
165  {
166  outMsg.str("");
167  outMsg << "DRCETaskShellHost::runTask cleanup() taskId: " << (*iter).first << " cleanup: " << std::boolalpha << (*iter).second;
168  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
169 
170  DRCECommonTask::cleanup((*iter).first, nodeOptions, message, (*iter).second);
171  }
172  }
173  }
174  catch(Poco::Exception& e)
175  {
176  resultDataItem.setErrorCode(e.code());
177  resultDataItem.setErrorMessage(e.message());
178  asyncTask.state = DRCETaskRequest::TaskState::CRASHED;
179  asyncTask.timeElapsed = DRCECommonTask::getElapsedTimeMsec(Poco::Timestamp::fromEpochTime(asyncTask.timeStart));
180  if (asyncTasksQueue.isExistAsyncTask(asyncTask.taskId))
181  asyncTasksQueue.safeChangeState(std::forward<AsyncTasks>(AsyncTasks(asyncTask)), &resultDataItem, e.message(), e.code());
183  }
184  catch(std::exception& e)
185  {
186  resultDataItem.setErrorCode(ERROR_EXECUTE_TASK);
187  resultDataItem.setErrorMessage(e.what());
188  asyncTask.state = DRCETaskRequest::TaskState::CRASHED;
189  asyncTask.timeElapsed = DRCECommonTask::getElapsedTimeMsec(Poco::Timestamp::fromEpochTime(asyncTask.timeStart));
190  if (asyncTasksQueue.isExistAsyncTask(asyncTask.taskId))
191  asyncTasksQueue.safeChangeState(std::forward<AsyncTasks>(AsyncTasks(asyncTask)), &resultDataItem, e.what(), ERROR_COMMAND_EXECUTION);
193  }
194 
195  if (asyncTask.state != DRCETaskRequest::TaskState::TERMINATED)
196  {
197  // write to tasks log information about finished state of task
199  }
200 }
201 //-----------------------------------------------------------------------------
203 {
204  saveRequestData();
205 
206  Poco::Timestamp tsStart;
207  makeFilesBefore(taskRequestSetExecute);
208 
209  DRCEResultDataItem resultDataItem;
210  Poco::SharedPtr<CommandResultData> pCommandResultData = nullptr;
211  if (!taskRequestSetExecute.getCommandLine().empty())
212  {
213  Command command(taskRequestSetExecute.getCommandLine(),
214  taskRequestSetExecute.getSessionOptions().homeDir,
215  taskRequestSetExecute.getInputStream(),
216  taskRequestSetExecute.getSessionOptions().environments,
217  taskRequestSetExecute.getSessionOptions().shellName);
218 
219  CommandExecutor commandExecutor;
220  pCommandResultData = commandExecutor.execute(command, fn, taskRequestSetExecute.getSessionOptions().timeMax);
221 
222  if (pCommandResultData.isNull())
223  throw Poco::Exception(message(message_const::EXECUTE_SHELL_HOST_COMMAND_FAIL));
224 
225  resultDataItem.setStdoutStream(pCommandResultData->getOutStream().str());
226  resultDataItem.setStderrStream(pCommandResultData->getErrStream().str());
227  resultDataItem.setExitStatus(pCommandResultData->getExitStatus());
228  resultDataItem.setPid(pCommandResultData->getProcessId());
229 
230  if (commandExecutor.getStatusExecution()==StarterExecutor::StatusExecution::seTerminated)
231  {
232  resultDataItem.setState(static_cast<unsigned int>(DRCETaskRequest::TaskState::TERMINATED));
233  resultDataItem.setErrorMessage(message(message_const::TASK_TIME_EXPIRED));
235  this->cancel();
236  }
237  else
238  resultDataItem.setState(static_cast<unsigned int>(DRCETaskRequest::TaskState::FINISHED));
239  }
240  else
241  resultDataItem.setState(static_cast<unsigned int>(DRCETaskRequest::TaskState::FINISHED));
242 
243  resultDataItem.setRequestId(taskRequestSetExecute.getTaskId());
244  resultDataItem.setRequestType(DRCETaskRequest::RequestType::rtSetTaskExecute);
245  resultDataItem.setNodeName(nodeOptions.getNodeName());
246  resultDataItem.setNodeHost(nodeOptions.getNodeHost());
247  resultDataItem.setNodePort(nodeOptions.getNodePort());
248 
249  makeFilesAfter(taskRequestSetExecute, resultDataItem);
250 
251  if (!pCommandResultData.isNull())
252  {
253  if (pCommandResultData->getIsError())
254  throw Poco::Exception(message(message_const::SHELL_HOST_COMMAND_RETURN_ERROR)+
255  message(message_const::ERROR, pCommandResultData->getErrStream().str()), ERROR_COMMAND_EXECUTION);
256  }
257 
258  // record the elapsed execution time in the result
259  resultDataItem.setTime(getElapsedTimeMsec(tsStart));
260 
261  return resultDataItem;
262 }
263 //-----------------------------------------------------------------------------
264 //-----------------------------------------------------------------------------
265 } // end namespace drce
266 } // end namespace HCE