hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DRCETaskRequestSetExecuteExecutor.cpp
Go to the documentation of this file.
1 #include <fstream>
2 #include <sys/wait.h>
3 #include <iostream>
4 #include <Poco/AutoPtr.h>
5 #include <Poco/Logger.h>
6 #include <Poco/File.h>
7 
8 #include "DRCEError.hpp"
9 #include "DRCEMessageConst.hpp"
12 #include "DRCETaskFactory.hpp"
13 #include "DRCETaskRequest.hpp"
14 #include "DRCETaskShellHost.hpp"
15 #include "DRCETaskSsh.hpp"
16 #include "DRCEFileBuilder.hpp"
17 #include "StarterExecutor.hpp"
18 #include "DRCEFileExtractor.hpp"
19 #include "DRCEFileStream.hpp"
20 
21 #include "CommandExecutor.hpp"
22 
23 namespace HCE
24 {
25 namespace drce
26 {
27 //-----------------------------------------------------------------------------
29  DRCENodeOptions& nodeOptions_, CustomMessage& message_, DRCEResourceMonitor& resourceMonitor_)
30 : inherited(asyncTaskQueue_, nodeOptions_, message_, resourceMonitor_)
31 {
32 }
33 //-----------------------------------------------------------------------------
35 {
36  DRCEResultDataItem resultDataItem;
37 
38  try
39  {
40  DRCETaskRequestSetExecute* pTaskReqiestSetExecute = dynamic_cast<DRCETaskRequestSetExecute*>(pTaskRequest);
41  if (!pTaskReqiestSetExecute)
43 
44  if (!pTaskReqiestSetExecute->getInputStream().empty())
45  if (pTaskReqiestSetExecute->getCommandLine().empty())
47 
48  if (pTaskReqiestSetExecute->getSessionOptions().homeDir.empty())
49  {
50  SessionOptions sessionOptions = pTaskReqiestSetExecute->getSessionOptions();
51  sessionOptions.homeDir = this->nodeOptions.getHomeDir();
52  pTaskReqiestSetExecute->setSessionOptions(std::forward<SessionOptions>(sessionOptions));
53  }
54 
55  if (!pTaskReqiestSetExecute->getSessionOptions().shellName.empty())
56  {
57  if (!HCE::StarterExecutor::isExistShellScript(pTaskReqiestSetExecute->getSessionOptions().homeDir, pTaskReqiestSetExecute->getSessionOptions().shellName))
59 
60  if (!HCE::StarterExecutor::canExecuteShellScript(pTaskReqiestSetExecute->getSessionOptions().homeDir, pTaskReqiestSetExecute->getSessionOptions().shellName))
62  }
63 
64  DRCEResourceLimitsChecker resourceLimitsChecker(message);
65  resourceLimitsChecker.checkLimits(pTaskReqiestSetExecute->getResourceLimits());
66 
67  DRCETaskFactory taskFactory;
68  DRCECommonTask* pCommonTask = taskFactory.create(pTaskReqiestSetExecute->getSessionOptions().sessionType,
69  std::to_string(pTaskReqiestSetExecute->getTaskId()),
71  message,
72  (*pTaskReqiestSetExecute),
75  inputJsonMessage,
76  pTaskRequest->getParentTaskId());
77 
78  resultDataItem = executeTask(pTaskReqiestSetExecute->getSessionOptions().tmode,
79  pCommonTask,
80  pTaskReqiestSetExecute,
81  inputJsonMessage);
82  }
83  catch(Poco::NoThreadAvailableException& e)
84  {
85  resultDataItem = getResultDataItem(*pTaskRequest, DRCETaskRequest::TaskState::NOT_SET_AS_NEW);
86  resultDataItem.setErrorCode(ERROR_CREATE_NEW_TASK);
87  resultDataItem.setErrorMessage(e.displayText());
88  }
89  catch(Poco::Exception& e)
90  {
91  resultDataItem = getResultDataItem(*pTaskRequest, DRCETaskRequest::TaskState::NOT_SET_AS_NEW);
92  resultDataItem.setErrorCode(e.code());
93  resultDataItem.setErrorMessage(e.message());
94  }
95  catch(std::exception& e)
96  {
97  resultDataItem = getResultDataItem(*pTaskRequest, DRCETaskRequest::TaskState::NOT_SET_AS_NEW);
98  resultDataItem.setErrorCode(ERROR_EXECUTE_TASK);
99  resultDataItem.setErrorMessage(e.what());
100  }
101  return resultDataItem;
102 }
103 //-----------------------------------------------------------------------------
// Runs a prepared task for a "set execute" request in the requested thread mode
// and returns the resulting data item.
//
// Steps visible in this listing:
//   1. reject a null pCommonTask or pTaskReqiestSetExecute;
//   2. call pCommonTask->cleanup() for the request's task id;
//   3. reject a cleanup flag other than cfNotDelete/cfDelete and a thread mode
//      other than tmSync/tmAsync;
//   4. store the request through pCommonTask->saveRequest();
//   5. tmSync : fill the queue's SyncTasks record, call execute() and
//               executeSubtasks() inline, build a replacement result if the task
//               was cancelled, optionally start the cleanup task, reload the
//               result from the task data file if that file exists, release the
//               task and update the resource monitor counters;
//      tmAsync: pass the task to asyncTaskQueue.startTask() and return an item
//               built with state SET_AS_NEW.
//
// @param threadMode              tmSync or tmAsync; any other value throws
// @param pCommonTask             task object to run; must not be null
// @param pTaskReqiestSetExecute  request supplying task id, session options,
//                                input stream and resource limits; must not be null
// @param inputJsonMessage        incoming message; passed to saveRequest(),
//                                makeResultDataItem() and getResultDataItem(), and
//                                its request id is passed to startCleanupTask()
// @return the result data item of the sync run, or the SET_AS_NEW item for async
// @throws Poco::Exception with code ERROR_EXECUTE_TASK for a null pointer or an
//         unsupported thread mode; also thrown for a bad cleanup flag (the code
//         argument of that throw is not visible in this listing)
//
// NOTE(review): this listing skips several of its own line numbers (117, 120,
// 125, 127, 132, 148, 177, 182, 185); the statements on those lines are not
// visible here, so the notes below describe only what is shown.
// NOTE(review): pCommonTask->release() is called only at the end of the tmSync
// branch; the validation throws below leave without it - confirm who releases
// the task on those paths and after startTask() in the tmAsync branch.
// NOTE(review): `throw (Poco::Exception)` is a dynamic exception specification
// (deprecated in C++11, removed in C++17); it has to stay in sync with the
// declaration, which is not part of this file view.
104 DRCEResultDataItem DRCETaskRequestSetExecuteExecutor::executeTask(SessionOptions::ThreadMode threadMode,
105  DRCECommonTask* pCommonTask,
106  DRCETaskRequestSetExecute* pTaskReqiestSetExecute,
107  DRCEInputJsonMessage& inputJsonMessage) throw (Poco::Exception)
108 {
109  DRCEResultDataItem resultDataItem;
110 
     // both pointers are dereferenced below, so fail early on null
111  if (pCommonTask==nullptr)
112  throw Poco::Exception(message(message_const::EMPTY_OBJECT_INSTANCE), ERROR_EXECUTE_TASK);
113 
114  if (pTaskReqiestSetExecute==nullptr)
115  throw Poco::Exception(message(message_const::EMPTY_OBJECT_INSTANCE), ERROR_EXECUTE_TASK);
116 
     // NOTE(review): meaning of the trailing `true` argument is not visible here - confirm against DRCECommonTask::cleanup()
118  pCommonTask->cleanup(pTaskReqiestSetExecute->getTaskId(), nodeOptions, message, true);
119 
     // only cfNotDelete and cfDelete are accepted as cleanup flags
     // NOTE(review): listing line 125 (the rest of this throw expression) is missing from this extract
121  if (pTaskReqiestSetExecute->getSessionOptions().cleanup != SessionOptions::CleanupFlag::cfNotDelete &&
122  pTaskReqiestSetExecute->getSessionOptions().cleanup != SessionOptions::CleanupFlag::cfDelete)
123  throw Poco::Exception(message(message_const::CLEANUP_BAD_FLAG,
124  static_cast<unsigned int>(pTaskReqiestSetExecute->getSessionOptions().cleanup)),
126 
     // only tmSync and tmAsync are accepted, so the switch below never reaches its default label
128  if (threadMode != SessionOptions::ThreadMode::tmSync &&
129  threadMode != SessionOptions::ThreadMode::tmAsync)
130  throw Poco::Exception(message(message_const::WRONG_TYPE_THREAD_MODE, static_cast<unsigned int>(threadMode)), ERROR_EXECUTE_TASK);
131 
     // store the request before running it, in both thread modes
133  pCommonTask->saveRequest(inputJsonMessage, nodeOptions, message);
134 
135  switch(threadMode)
136  {
137  case SessionOptions::ThreadMode::tmSync:
138  {
139  // save the length of stdin
140  resourceMonitor.addSizeInputBufferSyncTasks(pTaskReqiestSetExecute->getInputStream().length());
141 
142  // running task in sync mode
     // syncTask is a reference to the queue's SyncTasks record, so the writes below change the queue's own data
143  DRCEAsyncTasksQueue::SyncTasks& syncTask = asyncTaskQueue.getSyncTasks();
144  Poco::Timestamp now;
145  syncTask.timeStart = now.epochTime();
146  syncTask.timeMax = pTaskReqiestSetExecute->getSessionOptions().timeMax;
147  syncTask.taskId = pTaskReqiestSetExecute->getTaskId();
149  syncTask.usageLimits = pTaskReqiestSetExecute->getResourceLimits().usageLimits;
150 
151  DRCECommonTask::ApplyPidHandler applyPidHandler(asyncTaskQueue, syncTask, DRCETaskRequest::TaskState::IN_PROGRESS, SessionOptions::ThreadMode::tmSync);
152  resultDataItem = pCommonTask->execute(applyPidHandler);
153 
     // subtasks run only if the main task was not cancelled
154  if (!pCommonTask->isCancelled())
155  {
156  Poco::Logger::root().log(Poco::Message(drce_const::moduleName,
157  "DRCETaskRequestSetExecuteExecutor::executeTask before executeSubtasks() SessionOptions::ThreadMode::tmSync",
158  Poco::Message::Priority::PRIO_TRACE));
159  // block running subtasks
160  pCommonTask->executeSubtasks(resultDataItem);
161 
     // cancellation is checked again because executeSubtasks() has run in between
162  if (!pCommonTask->isCancelled())
163  {
164  // update all subtasks
165  for (size_t i=0;i<resultDataItem.getSubtasksCount();++i)
166  {
167  pCommonTask->waitUpdateAllTasks(resultDataItem.getSubtaskItem(i).getRequestId(), nodeOptions, message, asyncTaskQueue, pCommonTask);
168  }
169  }
170  }
171 
     // cancelled (by execute() or by executeSubtasks()): rebuild the result from the stored status item
172  if (pCommonTask->isCancelled())
173  {
174  unsigned int errorCode = NO_ERROR;
175  unsigned int errorCodeMainTask = NO_ERROR;
176  std::string errorMessage;
     // NOTE(review): listing line 177 is missing; `taskState`, used below, is presumably declared there - confirm
178 
179  DRCEResultDataItem resultItem = DRCECommonTask::extractStatusResultDataItem(pTaskReqiestSetExecute->getTaskId(), nodeOptions, asyncTaskQueue);
180  if (resultItem.getErrorCode() == ERROR_TERMINATE_EXPIRED_TASK)
181  {
     // NOTE(review): listing lines 182 and 185 of this branch are missing; in the visible lines errorCode stays NO_ERROR
183  errorCodeMainTask = ERROR_TERMINATE_EXPIRED_TASK;
184  errorMessage = message(message_const::TASK_TIME_EXPIRED, syncTask.timeMax);
186  }
187  else
188  {
     // any other error: copy code, message and state from the stored status item
189  errorCode = resultItem.getErrorCode();
190  errorCodeMainTask = resultItem.getErrorCode();
191  errorMessage = resultItem.getErrorMessage();
192  taskState = static_cast<DRCETaskRequest::TaskState>(resultItem.getState());
193  }
194 
     // errorCode goes into makeResultDataItem(); errorCodeMainTask is then set on the built item itself
195  resultDataItem = pCommonTask->makeResultDataItem(inputJsonMessage, nodeOptions, message,
196  DRCETaskRequest::RequestType::rtSetTaskExecute, taskState,
197  errorCode, errorMessage, syncTask.pid, syncTask.timeElapsed);
198  resultDataItem.setErrorCode(errorCodeMainTask);
199  pCommonTask->saveResultData(resultDataItem);
200  }
201 
202  // start task for cleanup tasks data if necessary
     // done only when getParentTaskId() evaluates to false (presumably a top-level task - confirm)
203  if (!pCommonTask->getParentTaskId())
204  {
205  Poco::Logger::root().log(Poco::Message(drce_const::moduleName,
206  "DRCETaskRequestSetExecuteExecutor::executeTask asyncTaskQueue.startCleanupTask",
207  Poco::Message::Priority::PRIO_TRACE));
208  asyncTaskQueue.startCleanupTask(inputJsonMessage.getRequestId());
209  }
210 
     // if the task's data file exists, the result read from it replaces resultDataItem;
     // when extract() throws, the previous resultDataItem is kept and the error is only logged at PRIO_DEBUG
211  Poco::File dataFile(FileStream::getDataFileName(nodeOptions.getTasksDataDir(), syncTask.taskId));
212  if (dataFile.exists())
213  {
214  try
215  {
216  DataFileExtractor dataFileExtractor(message);
217  resultDataItem = dataFileExtractor.extract(dataFile.path());
218  }
219  catch(Poco::Exception& e)
220  {
221  std::stringstream outMsg;
222  outMsg << "!!!!! DataFileExtractor::extract(" << dataFile.path() << "): " << e.displayText();
223  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_DEBUG));
224  }
225  }
226  // free resources
     // syncTask.taskId was already used for the data file name above, so clearing the record here is safe for this function
227  pCommonTask->release();
228  syncTask.clear();
229 
230  // save the length of stdout and the execution time
231  resourceMonitor.addSizeOutputBufferSyncTasks(resultDataItem.getStdoutStream().length());
232  resourceMonitor.addTimeSyncTask(resultDataItem.getTime());
233  // increment count of sync tasks
234  resourceMonitor.incrementCountSyncTasks();
     // any error code other than NO_ERROR counts the run as failed
235  if (resultDataItem.getErrorCode()!=NO_ERROR)
236  resourceMonitor.incrementCountSyncTasksFail();
237  }
238  break;
239  case SessionOptions::ThreadMode::tmAsync:
240  {
241  // save the length of stdin
242  resourceMonitor.addSizeInputBufferAsyncTasks(pTaskReqiestSetExecute->getInputStream().length());
243 
244  // running task in async mode
     // the returned item is built with state SET_AS_NEW right after startTask(); no execution result is read here
245  asyncTaskQueue.startTask(pCommonTask);
246  resultDataItem = getResultDataItem(inputJsonMessage, DRCETaskRequest::TaskState::SET_AS_NEW);
247 
248  // increment count of async tasks
249  resourceMonitor.incrementCountAsyncTasks();
250  }
251  break;
     // not reachable: other thread modes were rejected by the check before saveRequest()
252  default:;
253  }
254  return resultDataItem;
255 }
256 //-----------------------------------------------------------------------------
257 //-----------------------------------------------------------------------------
258 } // end namespace drce
259 } // end namespace HCE