4 #include <Poco/AutoPtr.h>
5 #include <Poco/Logger.h>
30 :
inherited(asyncTaskQueue_, nodeOptions_, message_, resourceMonitor_)
41 if (!pTaskReqiestSetExecute)
52 pTaskReqiestSetExecute->
setSessionOptions(std::forward<SessionOptions>(sessionOptions));
69 std::to_string(pTaskReqiestSetExecute->
getTaskId()),
72 (*pTaskReqiestSetExecute),
80 pTaskReqiestSetExecute,
83 catch(Poco::NoThreadAvailableException& e)
85 resultDataItem =
getResultDataItem(*pTaskRequest, DRCETaskRequest::TaskState::NOT_SET_AS_NEW);
89 catch(Poco::Exception& e)
91 resultDataItem =
getResultDataItem(*pTaskRequest, DRCETaskRequest::TaskState::NOT_SET_AS_NEW);
95 catch(std::exception& e)
97 resultDataItem =
getResultDataItem(*pTaskRequest, DRCETaskRequest::TaskState::NOT_SET_AS_NEW);
101 return resultDataItem;
111 if (pCommonTask==
nullptr)
114 if (pTaskReqiestSetExecute==
nullptr)
118 pCommonTask->cleanup(pTaskReqiestSetExecute->getTaskId(), nodeOptions, message,
true);
121 if (pTaskReqiestSetExecute->getSessionOptions().cleanup != SessionOptions::CleanupFlag::cfNotDelete &&
122 pTaskReqiestSetExecute->getSessionOptions().cleanup != SessionOptions::CleanupFlag::cfDelete)
124 static_cast<unsigned int>(pTaskReqiestSetExecute->getSessionOptions().cleanup)),
128 if (threadMode != SessionOptions::ThreadMode::tmSync &&
129 threadMode != SessionOptions::ThreadMode::tmAsync)
133 pCommonTask->saveRequest(inputJsonMessage, nodeOptions, message);
137 case SessionOptions::ThreadMode::tmSync:
140 resourceMonitor.addSizeInputBufferSyncTasks(pTaskReqiestSetExecute->getInputStream().length());
146 syncTask.
timeMax = pTaskReqiestSetExecute->getSessionOptions().timeMax;
147 syncTask.
taskId = pTaskReqiestSetExecute->getTaskId();
149 syncTask.
usageLimits = pTaskReqiestSetExecute->getResourceLimits().usageLimits;
151 DRCECommonTask::ApplyPidHandler applyPidHandler(asyncTaskQueue, syncTask, DRCETaskRequest::TaskState::IN_PROGRESS, SessionOptions::ThreadMode::tmSync);
152 resultDataItem = pCommonTask->execute(applyPidHandler);
154 if (!pCommonTask->isCancelled())
157 "DRCETaskRequestSetExecuteExecutor::executeTask before executeSubtasks() SessionOptions::ThreadMode::tmSync",
158 Poco::Message::Priority::PRIO_TRACE));
160 pCommonTask->executeSubtasks(resultDataItem);
162 if (!pCommonTask->isCancelled())
167 pCommonTask->waitUpdateAllTasks(resultDataItem.
getSubtaskItem(i).
getRequestId(), nodeOptions, message, asyncTaskQueue, pCommonTask);
172 if (pCommonTask->isCancelled())
175 unsigned int errorCodeMainTask =
NO_ERROR;
189 errorCode = resultItem.getErrorCode();
190 errorCodeMainTask = resultItem.getErrorCode();
191 errorMessage = resultItem.getErrorMessage();
195 resultDataItem = pCommonTask->makeResultDataItem(inputJsonMessage, nodeOptions, message,
196 DRCETaskRequest::RequestType::rtSetTaskExecute, taskState,
199 pCommonTask->saveResultData(resultDataItem);
203 if (!pCommonTask->getParentTaskId())
206 "DRCETaskRequestSetExecuteExecutor::executeTask asyncTaskQueue.startCleanupTask",
207 Poco::Message::Priority::PRIO_TRACE));
208 asyncTaskQueue.startCleanupTask(inputJsonMessage.getRequestId());
212 if (dataFile.exists())
216 DataFileExtractor dataFileExtractor(message);
217 resultDataItem = dataFileExtractor.extract(dataFile.path());
219 catch(Poco::Exception& e)
221 std::stringstream outMsg;
222 outMsg <<
"!!!!! DataFileExtractor::extract(" << dataFile.path() <<
"): " << e.displayText();
223 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_DEBUG));
227 pCommonTask->release();
231 resourceMonitor.addSizeOutputBufferSyncTasks(resultDataItem.
getStdoutStream().length());
232 resourceMonitor.addTimeSyncTask(resultDataItem.
getTime());
234 resourceMonitor.incrementCountSyncTasks();
236 resourceMonitor.incrementCountSyncTasksFail();
239 case SessionOptions::ThreadMode::tmAsync:
242 resourceMonitor.addSizeInputBufferAsyncTasks(pTaskReqiestSetExecute->getInputStream().length());
245 asyncTaskQueue.startTask(pCommonTask);
246 resultDataItem = getResultDataItem(inputJsonMessage, DRCETaskRequest::TaskState::SET_AS_NEW);
249 resourceMonitor.incrementCountAsyncTasks();
254 return resultDataItem;