4 #include <Poco/Logger.h>
23 : asyncTasksQueue(asyncTasksQueue_), asyncTask(asyncTask_), allowedTaskState(allowedTaskState_), threadMode(threadMode_)
29 if (threadMode == SessionOptions::ThreadMode::tmAsync)
34 if (task.
state == allowedTaskState)
42 else if (threadMode == SessionOptions::ThreadMode::tmSync)
60 const size_t filesCount = filesList.getFilesCount();
62 for (
size_t i=0;i<filesCount;++i)
64 const std::string
fileName = filesList.getFileItem(i).name;
68 std::ofstream ofs(fileName.c_str(), std::fstream::trunc|std::fstream::binary);
77 if (
remove(fileName.c_str())!=0)
88 for (
size_t i=0;i<filesCount;++i)
97 ifs.open(fileName, std::fstream::binary);
98 if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
101 std::string fileContent;
102 ifs.seekg(0, std::ios::end);
103 fileContent.resize(ifs.tellg());
104 ifs.seekg(0, std::ios::beg);
105 ifs.read(const_cast<char*>(fileContent.c_str()), fileContent.size());
112 resultDataItem.
addFileItem(std::forward<FileItem>(fileItem));
116 if (
remove(fileName.c_str())!=0)
122 catch(Poco::Exception& e)
125 if (!errMsg.str().empty())
127 errMsg << e.message();
143 if (requetFile.exists())
169 if (pTaskRequestSetExecute)
196 catch(Poco::Exception& e)
198 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
206 Poco::File file(path);
211 std::stringstream outMsg;
212 outMsg <<
"DRCECommonTask::safeCleanup path: " << path;
213 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
216 catch(Poco::Exception& e)
218 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
224 Poco::Timestamp tsStop;
225 Poco::Timestamp::TimeDiff diff = tsStop-tsStart;
226 return static_cast<size_t>(diff/1000);
231 std::stringstream outMsg;
232 outMsg <<
"DRCECommonTask::executeSubtasks enter taskId: " << resultDataItem.
getRequestId();
233 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
241 if (!pTaskRequest.isNull() && !isCancelled())
249 resultDataItem.
addSubtaskItem(std::forward<DRCEResultDataItem>(resultItem));
262 dataFileBuilder.build(resultData);
264 catch(Poco::Exception& e)
266 std::stringstream outMsg;
267 outMsg <<
"DRCECommonTask::executeSubtasks dataFileBuilder.build " << e.displayText();
268 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_ERROR));
271 catch(std::exception& e)
273 std::stringstream outMsg;
274 outMsg <<
"DRCECommonTask::executeSubtasks dataFileBuilder.build " << e.what();
275 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_ERROR));
285 unsigned int& progressCount,
289 Poco::File dataFile(dataFileName);
290 if (dataFile.exists())
295 std::stringstream outMsg;
296 outMsg <<
"DRCECommonTask::updateTasks taskId: " << resultDataItem.
getRequestId();
297 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
302 resultDataItem = dataFileExtractor.
extract(dataFileName);
304 catch(Poco::Exception& e)
306 std::stringstream outMsg;
307 outMsg <<
"DRCECommonTask::updateTasks dataFileExtractor.extract: " << e.displayText();
308 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_DEBUG));
310 catch(std::exception& e)
312 std::stringstream outMsg;
313 outMsg <<
"DRCECommonTask::updateTasks dataFileExtractor.extract: " << e.what();
314 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_DEBUG));
317 if (resultDataItem.
getState()==
static_cast<unsigned int>(DRCETaskRequest::TaskState::SET_AS_NEW) ||
318 resultDataItem.
getState()==
static_cast<unsigned int>(DRCETaskRequest::TaskState::IN_PROGRESS) ||
319 resultDataItem.
getState()==
static_cast<unsigned int>(DRCETaskRequest::TaskState::QUEUED_TO_RUN))
323 if (resultDataItem.
getState()==
static_cast<unsigned int>(DRCETaskRequest::TaskState::QUEUED_TO_RUN))
326 outMsg <<
"DRCECommonTask::updateTasks pCommonTask->executeSubtasks taskId: " << resultDataItem.
getRequestId();
327 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
336 unsigned int subtasksProgressCount = 0;
337 updateTasks(resultItem, nodeOptions, message, asyncTasksQueue, subtasksProgressCount, pCommonTask);
338 resultDataItem.
setSubtaskItem(i, std::forward<DRCEResultDataItem>(resultItem));
344 dataFileBuilder.
build(resultData);
347 statusFileBuilder.
build(resultData);
349 catch(Poco::Exception& e)
353 catch(std::exception& e)
370 if (pTaskRequestSetExecute)
374 std::stringstream outMsg;
375 outMsg <<
"DRCECommonTask::getSubtasksList taskId: " << inputJsonMessage.
getRequestId()
376 <<
" cleanup: " << std::boolalpha << (pTaskRequestSetExecute->
getSessionOptions().
cleanup==SessionOptions::CleanupFlag::cfDelete);
377 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
394 if (requestFile.exists())
397 inputJsonMessage = requestFileExtractor.
extract(requestFile.path());
400 catch(Poco::Exception& e)
402 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
404 catch(std::exception& e)
406 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, e.what(), Poco::Message::Priority::PRIO_ERROR));
417 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName,
"DRCECommonTask::waitUpdateAllTasks enter", Poco::Message::Priority::PRIO_TRACE));
420 Poco::File dataFile(dataFileName);
422 std::stringstream outMsg;
423 outMsg <<
"DRCECommonTask::waitUpdateAllTasks '" << dataFileName <<
"' isExist = " << std::boolalpha << dataFile.exists() <<
std::endl;
424 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
426 if (dataFile.exists())
432 unsigned int progressCount = 0;
434 while (progressCount)
437 Poco::Thread::sleep(1000);
441 std::stringstream outMsg;
442 outMsg <<
"DRCECommonTask::waitUpdateAllTasks taskId: " << resultDataItem.
getRequestId() <<
" progressCount: " << progressCount;
443 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
446 if (pTask->isCancelled())
450 catch(Poco::Exception& e)
452 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_DEBUG));
459 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName,
"DRCECommonTask::waitEndAllTask enter", Poco::Message::Priority::PRIO_TRACE));
464 for (SubtasksList::iterator iter=subtasks.begin();iter!=subtasks.end();++iter)
468 subtasks.erase(iter);
473 if (!subtasks.empty())
475 bool needWait =
true;
478 for (
size_t i=0;i<subtasks.size();++i)
482 std::stringstream outMsg;
483 outMsg <<
"DRCECommonTask::waitEndAllTask (" << i <<
") asyncTasksQueue.isExistAsyncTask(" <<subtasks[i].first <<
")";
484 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
488 if ((i+1)==subtasks.size())
491 Poco::Thread::sleep(1000);
493 if (pTask->isCancelled())
514 requestFileBuilder.
build(inputJsonMessage);
516 catch(Poco::Exception& e)
518 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
520 catch(std::exception& e)
522 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, e.what(), Poco::Message::Priority::PRIO_ERROR));
531 if (requestFile.exists())
536 saveRequest(inputJsonMessage, nodeOptions, message);
539 catch(Poco::Exception& e)
541 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
543 catch(std::exception& e)
545 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, e.what(), Poco::Message::Priority::PRIO_ERROR));
554 requestFileBuilder.
build(inputJsonMessage);
556 catch(Poco::Exception& e)
558 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
560 catch(std::exception& e)
562 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, e.what(), Poco::Message::Priority::PRIO_ERROR));
574 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName,
"DRCECommonTask::saveResultData enter", Poco::Message::Priority::PRIO_TRACE));
583 dataFileBuilder.
build(resultData);
586 statusFileBuilder.
build(resultData);
588 catch(Poco::Exception& e)
590 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
592 catch(std::exception& e)
594 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, e.what(), Poco::Message::Priority::PRIO_ERROR));
600 saveResultData(resultItem, nodeOptions, message, asyncTasksQueue);
606 unsigned int pid,
size_t timeElapsed,
unsigned int exitStatus)
617 resultDataItem.
setTime(timeElapsed);
618 resultDataItem.
setPid(pid);
623 resultDataItem.
setState(static_cast<unsigned int>(taskState));
627 if (statusFile.exists())
638 resultDataItem.
setState(static_cast<unsigned int>(taskState));
640 catch(Poco::Exception& e)
642 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_TRACE));
650 return resultDataItem;
665 parentTaskId = asyncTask.
taskId;
668 catch(Poco::Exception& e)
670 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
678 Poco::File dataFile(fileName);
679 if (dataFile.exists())
686 resultDataItem = dataFileExctractor.
extract(dataFile.path());
688 catch(Poco::Exception& e)
690 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
693 return resultDataItem;