9 #include <Poco/TaskManager.h>
10 #include <Poco/Timestamp.h>
11 #include <Poco/File.h>
12 #include <Poco/Logger.h>
38 this->log(
"Started task: "+pNf->task()->name());
46 this->log(
"Cancelled task: "+pNf->task()->name());
53 this->log(
"Stopped task: "+pNf->task()->name());
// ProgressHandler::log
// Sends `msg` to the Poco root logger as a PRIO_INFORMATION message whose
// source is drce_const::moduleName.
// @param msg  text to be logged.
// NOTE(review): this listing is elided (the leading numbers look like original
// line numbers with gaps), so the braces and the opening `try` are not visible.
57 void ProgressHandler::log(
const std::string& msg)
61 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, msg, Poco::Message::Priority::PRIO_INFORMATION));
// A Poco::Exception is reported on std::cerr, prefixed with "ProgressHandler: ".
63 catch(Poco::Exception& e)
65 std::cerr <<
"ProgressHandler: " << e.displayText() <<
std::endl;
71 :
taskId(0), parentTaskId(0),
pid(0),
timeMax(0), timeStart(0), timeElapsed(0),
// AsyncTasks value constructor: copies each argument into the member of the
// same name (without the trailing underscore).
// @param taskId_        stored in taskId
// @param parentTaskId_  stored in parentTaskId
// @param pid_           stored in pid
// @param timeMax_       stored in timeMax
// @param timeStart_     stored in timeStart
// NOTE(review): the initializer list also uses timeElapsed_ and state_; their
// parameter declarations fall on lines that are not visible in this listing.
76 AsyncTasks::AsyncTasks(
unsigned int taskId_,
unsigned int parentTaskId_, pid_t pid_,
unsigned int timeMax_,
size_t timeStart_,
78 :
taskId(taskId_), parentTaskId(parentTaskId_),
pid(pid_),
timeMax(timeMax_), timeStart(timeStart_), timeElapsed(timeElapsed_),
// A freshly constructed record starts unlocked with default-constructed usageLimits.
79 state(state_), locked(false), usageLimits()
84 :
taskId(0), parentTaskId(0),
pid(0),
timeMax(0), timeStart(0), timeElapsed(0),
91 :
taskId(0), parentTaskId(0),
pid(0),
timeMax(0), timeStart(0), timeElapsed(0),
94 (*this) = std::forward<AsyncTasks>(rhs);
118 taskId = std::move(rhs.taskId);
120 pid = std::move(rhs.pid);
121 timeMax = std::move(rhs.timeMax);
124 state = std::move(rhs.state);
125 locked = std::move(rhs.locked);
139 state = DRCETaskRequest::TaskState::UNDEFINED;
147 <<
static_cast<unsigned int>(rhs.
state);
159 unsigned int state = 0;
170 : maxThreadCount(DEFAULT_MAX_CAPACITY), threadPool(), taskManager(threadPool), progressHandler(*this), thread(),
171 nodeOptions(nodeOptions_), notificationExecutor(notificationExecutor_), message(message_),
172 tasks(), subtasks(), mutex(), terminated(false), tasksQueueDumpPeriod(DEFAULT_QUEUE_DUMP_PERIOD), cleanupTaskManager(), syncTask(), resetErrorCodeStateNotification(false)
174 taskManager.addObserver(Poco::Observer<ProgressHandler, Poco::TaskStartedNotification>(progressHandler, &
ProgressHandler::onStarted));
175 taskManager.addObserver(Poco::Observer<ProgressHandler, Poco::TaskFinishedNotification>(progressHandler, &
ProgressHandler::onFinished));
176 taskManager.addObserver(Poco::Observer<ProgressHandler, Poco::TaskCancelledNotification>(progressHandler, &
ProgressHandler::onCancelled));
192 taskManager.cancelAll();
194 cleanupTaskManager.cancelAll();
199 taskManager.joinAll();
200 cleanupTaskManager.joinAll();
210 taskId = std::stoul(pTask->name());
212 catch(std::exception& e)
217 if (isExistAsyncTask(taskId))
220 taskManager.start(pTask);
230 const std::string taskName = std::to_string(taskId);
232 Poco::TaskManager::TaskList taskList = taskManager.taskList();
233 for (Poco::TaskManager::TaskList::iterator iter=taskList.begin();iter!=taskList.end();++iter)
235 if ((*iter)->name() == taskName)
242 catch(std::exception& e)
253 const std::string taskName = std::to_string(taskId);
254 Poco::TaskManager::TaskList taskList = taskManager.taskList();
255 for (Poco::TaskManager::TaskList::iterator iter=taskList.begin();iter!=taskList.end();++iter)
257 if ((*iter)->name() == taskName)
264 catch(std::exception& e)
272 if (threadCount < maxThreadCount)
273 threadPool.addCapacity(maxThreadCount - threadPool.capacity());
275 threadPool.addCapacity(static_cast<int>(threadCount) - threadPool.capacity());
280 return static_cast<unsigned int>(threadPool.capacity());
// Fragment of DRCEAsyncTasksQueue::changeState (identified by its trace message;
// the signature and several lines are not visible in this listing).
// Visible steps: obtain a result data item (from the task's data file or from
// pResultDataItem), stamp it with node and task fields, add it to resultData,
// update the stored task record and send a state notification, remove the task
// from the queue when its state is terminal, and build the data/status files.
286 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName,
"DRCEAsyncTasksQueue::changeState enter", Poco::Message::Priority::PRIO_TRACE));
// A zero timeStart is only traced here; what else the branch does is not visible.
// NOTE(review): "recieved" in the message below is a misspelling of "received";
// it is runtime text and is left untouched.
288 if (!asyncTask.timeStart)
291 "!!! DRCEAsyncTasksQueue::changeState recieved empty 'asyncTask.timeStart' !!!",
292 Poco::Message::Priority::PRIO_TRACE));
// outMsg is the buffer used for all log lines below.
300 std::stringstream outMsg;
// When the data file exists, the result item is loaded from it.
303 if (dataFile.exists())
308 resultDataItem = dataFileExtractor.
extract(dataFile.path());
// Extraction failure: log at DEBUG and mark the item as IN_PROGRESS instead.
310 catch(Poco::Exception& e)
313 outMsg <<
"!!!!! DataFileExtractor::extract(" << dataFile.path() <<
"): " << e.displayText();
314 outMsg <<
" SET STATE TO IN_PROGRESS";
315 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_DEBUG));
317 resultDataItem.
setState(static_cast<unsigned int>(DRCETaskRequest::TaskState::IN_PROGRESS));
// Copy the item supplied through pResultDataItem (the guarding condition is not
// visible in this listing).
323 resultDataItem = *pResultDataItem;
// Trace the task id, the task state, the item state and the parent task id.
// NOTE(review): outMsg was already written to above; whether it is cleared in
// between cannot be seen here.
327 outMsg <<
"DRCEAsyncTasksQueue::changeState taskId: " << asyncTask.taskId
328 <<
" asyncTasks.state: " << (int)asyncTask.state
329 <<
" resultDataItem.state: " << resultDataItem.
getState()
330 <<
" asyncTask.parentTaskId: " << asyncTask.parentTaskId;
331 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
// Stamp the item with this node's name/host/port ...
333 resultDataItem.
setNodeName(nodeOptions.getNodeName());
334 resultDataItem.
setNodeHost(nodeOptions.getNodeHost());
335 resultDataItem.
setNodePort(nodeOptions.getNodePort());
// ... and with the task's pid, state and elapsed time.
338 resultDataItem.
setPid(asyncTask.pid);
339 resultDataItem.
setState(static_cast<unsigned int>(asyncTask.state));
340 resultDataItem.
setTime(asyncTask.timeElapsed);
// std::forward with an explicit non-reference type yields an rvalue, so this
// hands resultDataItem to addDataItem as an rvalue (same cast as std::move).
347 resultData.
addDataItem(std::forward<DRCEResultDataItem>(resultDataItem));
// For a task with parentTaskId == 0: fetch the stored record and compare its
// state and pid with the incoming ones. The record is stored and a
// rtTaskStateNotification is sent below; the exact nesting of those statements
// is not visible in this listing.
350 if (!asyncTask.parentTaskId)
352 AsyncTasks prevStateAsyncTask = getAsyncTask(asyncTask.taskId);
353 if (prevStateAsyncTask.
state != asyncTask.state || prevStateAsyncTask.
pid != asyncTask.pid)
356 setAsyncTask(asyncTask);
358 executeNotification(resultData, DRCETaskRequest::RequestType::rtTaskStateNotification);
// States treated as terminal: the task is removed from the queue and the
// outcome of the removal is logged at DEBUG.
362 if ((asyncTask.state == DRCETaskRequest::TaskState::FINISHED) ||
363 (asyncTask.state == DRCETaskRequest::TaskState::TERMINATED) ||
364 (asyncTask.state == DRCETaskRequest::TaskState::CRASHED) ||
365 (asyncTask.state == DRCETaskRequest::TaskState::DELETED) ||
366 (asyncTask.state == DRCETaskRequest::TaskState::UNDEFINED) ||
367 (asyncTask.state == DRCETaskRequest::TaskState::TERMINATED_BY_TTL))
369 bool ret = removeAsyncTask(asyncTask.taskId);
371 outMsg <<
"TASK (ID: " << asyncTask.taskId <<
", PID: " << asyncTask.pid <<
", TIME: " << asyncTask.timeElapsed
372 <<
", STATE: " <<
static_cast<unsigned int>(asyncTask.state) <<
") WAS " << ((ret)?
"SUCCESS":
"FAILED") <<
" ERASED FROM QUEUE !!!";
373 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_DEBUG));
// Result files are built from resultData; presumably both builders run only
// when the state is not DELETED — confirm against the full source.
380 if (asyncTask.state != DRCETaskRequest::TaskState::DELETED)
383 DataFileBuilder dataFileBuilder(message, nodeOptions.getTasksDataDir());
384 dataFileBuilder.
build(resultData);
387 statusFileBuilder.
build(resultData);
396 changeState(std::forward<AsyncTasks>(asyncTask), pResultDataItem, errorMessage, errorCode);
398 catch(Poco::Exception& e)
402 catch(std::exception& e)
411 Poco::Mutex::ScopedLock lock(mutex);
413 Tasks::iterator iter = tasks.find(taskId);
414 if (iter != tasks.end())
416 asyncTask = (*iter).second;
423 Poco::Mutex::ScopedLock lock(mutex);
424 tasks[asyncTask.
taskId] = asyncTask;
429 Poco::Mutex::ScopedLock lock(mutex);
430 tasks[asyncTask.taskId] = std::forward<AsyncTasks>(asyncTask);
435 Poco::Mutex::ScopedLock lock(mutex);
442 Poco::Mutex::ScopedLock lock(mutex);
443 Tasks::iterator iter = tasks.find(taskId);
444 if (iter != tasks.end())
454 Poco::Mutex::ScopedLock lock(mutex);
455 return (tasks.find(taskId)!=tasks.end());
460 Poco::Mutex::ScopedLock lock(mutex);
461 Tasks::iterator iter = tasks.find(taskId);
462 if (iter != tasks.end())
464 (*iter).second.locked =
true;
475 Poco::Mutex::ScopedLock lock(mutex);
476 Tasks::iterator iter = tasks.find(taskId);
477 if (iter != tasks.end())
479 (*iter).second.locked =
false;
491 Poco::Mutex::ScopedLock lock(mutex);
492 Tasks::iterator iter = tasks.find(taskId);
493 if (iter != tasks.end())
495 result = (*iter).second.locked;
502 Poco::Mutex::ScopedLock lock(mutex);
508 Poco::Mutex::ScopedLock lock(mutex);
514 Poco::Mutex::ScopedLock lock(mutex);
515 tasksQueueDumpPeriod = tasksQueueDumpPeriod_;
520 Poco::Mutex::ScopedLock lock(mutex);
521 return tasksQueueDumpPeriod;
// Fragment of loadTasksQueue (identified by its log message; the signature and
// several lines are not visible in this listing).
// Under the queue mutex it reads the dump through fileStream, parses it line by
// line into task records, logs each record and stores it in `tasks`.
526 Poco::Mutex::ScopedLock lock(mutex);
530 if (fileStream.isOpen())
532 std::string fileContent;
// NOTE(review): if fileStream is a standard input stream, operator>> into a
// std::string stops at the first whitespace, so only the first token would
// reach the line loop below — confirm against the dump format.
533 fileStream >> fileContent;
534 std::istringstream istr(fileContent);
// One record per line; each line gets its own stream (the extraction into
// asyncTask is on lines not visible here).
536 while(std::getline(istr, line))
539 std::stringstream str(line);
// Log the parsed record at DEBUG.
542 std::stringstream outMsg;
543 outMsg <<
">>> loadTasksQueue\ttaskId = " << asyncTask.
taskId <<
" parentTaskId = " << asyncTask.parentTaskId <<
" pid = " << asyncTask.pid
544 <<
" timeMax = " << asyncTask.timeMax <<
" timeStart = " << asyncTask.timeStart <<
" state = " << (
unsigned int)(asyncTask.state);
545 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_DEBUG));
// The key is taken from a copy of the id rather than from asyncTask, which is
// passed as an rvalue in the same statement.
547 unsigned int taskId = asyncTask.taskId;
550 tasks[
taskId] = std::forward<AsyncTasks>(asyncTask);
// Alternative path (its condition is not visible): the record is stored with
// state UNDEFINED; insert() leaves an already present key unchanged.
554 asyncTask.state = DRCETaskRequest::TaskState::UNDEFINED;
555 tasks.insert(Tasks::value_type(asyncTask.taskId, asyncTask));
// The dump file is checked for existence; what is done with it is not visible.
561 Poco::File dumpFile(dumpFileName);
562 if (dumpFile.exists())
565 catch(Poco::Exception& e)
// Fragment of DRCEAsyncTasksQueue::saveTasksQueue (identified by its trace
// message; the signature and several lines are not visible in this listing).
// Under the queue mutex it walks `tasks` and writes one buffered string per
// entry to fileStream.
574 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName,
"DRCEAsyncTasksQueue::saveTasksQueue enter", Poco::Message::Priority::PRIO_TRACE));
576 Poco::Mutex::ScopedLock lock(mutex);
580 for (Tasks::iterator iter=tasks.begin();iter!=tasks.end();++iter)
// Per-entry buffer; the code that fills it is on a line not visible here.
582 std::stringstream ostr;
584 fileStream << ostr.str();
590 Poco::Mutex::ScopedLock lock(mutex);
591 notificationExecutor.
execute(json);
601 if (resetErrorCodeStateNotification)
603 for (
size_t i=0;i<itemCount;++i)
610 localResultData.
setDataItem(i, std::forward<DRCEResultDataItem>(resultDataItem));
614 for (
size_t i=0;i<itemCount;++i)
618 localResultData.
setDataItem(i, std::forward<DRCEResultDataItem>(resultDataItem));
623 if (requestType == DRCETaskRequest::RequestType::rtTaskStateNotification)
629 catch(Poco::Exception& e)
633 catch(std::exception& e)
641 Poco::Mutex::ScopedLock lock(mutex);
644 for (Tasks::iterator iter=tasks.begin();iter!=tasks.end();++iter)
649 if (statusFile.exists())
659 if (asyncTask.
state!=HCE::drce::DRCETaskRequest::TaskState::FINISHED)
662 resultData.
addDataItem(std::forward<DRCEResultDataItem>(resultDataItem));
664 catch(Poco::Exception& e)
666 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, e.message(), Poco::Message::Priority::PRIO_DEBUG));
677 cleanupTaskManager.start(
new DRCECleanupTask(taskId, nodeOptions, message, *
this));
679 catch(Poco::NoThreadAvailableException& e)
683 catch(Poco::Exception& e)
692 Poco::Mutex::ScopedLock lock(mutex);
693 for (Tasks::iterator iter=tasks.begin();iter!=tasks.end();++iter)
696 (*iter).second.timeStart,
697 (*iter).second.state,
698 SessionOptions::ThreadMode::tmAsync)));
705 std::string resultJson;
711 Poco::Message::Priority::PRIO_ERROR));
718 std::stringstream outMsg;
719 outMsg << taskStatus << delimiter
720 << asyncTask.
taskId << delimiter
722 <<
static_cast<unsigned int>(asyncTask.
state) << delimiter
723 << static_cast<unsigned int>(SessionOptions::ThreadMode::tmAsync);
// DRCEAsyncTasksQueue::setSyncTaskAsTerminated
// Marks the member `syncTask` as TERMINATED_BY_TTL, builds a result data item
// for it (starting from the task's data file when that file exists) and writes
// the data and status files.
// @param errorMessage  stored in the result item via setErrorMessage
// @param errorCode     stored in the result item via setErrorCode
// NOTE(review): this listing is elided; braces, `try` keywords and some
// declarations (e.g. dataFile, statusFileBuilder) are not visible.
727 void DRCEAsyncTasksQueue::setSyncTaskAsTerminated(
const std::string&
errorMessage,
unsigned int errorCode)
729 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName,
"DRCEAsyncTasksQueue::setSyncTaskAsTerminated enter", Poco::Message::Priority::PRIO_TRACE));
// NOTE(review): "recieved" in the message below is a misspelling of "received";
// it is runtime text and is left untouched.
734 "!!! DRCEAsyncTasksQueue::setSyncTaskAsTerminated recieved empty 'syncTask.timeStart' !!!",
735 Poco::Message::Priority::PRIO_TRACE));
739 syncTask.
state = DRCETaskRequest::TaskState::TERMINATED_BY_TTL;
741 DRCEResultData resultData;
742 DRCEResultDataItem resultDataItem;
// Start from the stored result when the data file exists.
745 if (dataFile.exists())
749 DataFileExtractor dataFileExtractor(message);
750 resultDataItem = dataFileExtractor.extract(dataFile.path());
// An extraction failure is only traced.
752 catch(Poco::Exception& e)
754 std::stringstream outMsg;
755 outMsg <<
"!!!!!DataFileExtractor::extract(" << dataFile.path() <<
"): " << e.displayText();
756 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
// Trace the size of the data file.
759 std::stringstream outMsg;
760 outMsg <<
">>>>> DataFileExtractor::extract Size of " << syncTask.
taskId <<
".data: " << dataFile.getSize();
761 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
// Stamp the item with this node's identity and the sync task's fields.
764 resultDataItem.setNodeName(nodeOptions.
getNodeName());
765 resultDataItem.setNodeHost(nodeOptions.
getNodeHost());
766 resultDataItem.setNodePort(nodeOptions.
getNodePort());
768 resultDataItem.setRequestId(syncTask.
taskId);
769 resultDataItem.setPid(syncTask.
pid);
770 resultDataItem.setState(static_cast<unsigned int>(DRCETaskRequest::TaskState::TERMINATED_BY_TTL));
// Time elapsed since syncTask.timeStart, divided by 1000. Poco documents
// Timestamp::elapsed() in microseconds, so this is presumably milliseconds —
// confirm.
771 resultDataItem.setTime(Poco::Timestamp::fromEpochTime(syncTask.
timeStart).elapsed()/1000);
772 resultDataItem.setErrorMessage(errorMessage);
773 resultDataItem.setErrorCode(errorCode);
// resultDataItem is a local; std::forward with an explicit non-reference type
// yields an rvalue, i.e. the same cast as std::move.
775 resultData.addDataItem(std::forward<DRCEResultDataItem>(resultDataItem));
// Build the data file and the status file from resultData.
780 DataFileBuilder dataFileBuilder(message, nodeOptions.
getTasksDataDir());
781 dataFileBuilder.build(resultData);
784 statusFileBuilder.build(resultData);
// Both handlers below only log the failure at PRIO_ERROR.
786 catch(Poco::Exception& e)
788 std::stringstream outMsg;
789 outMsg <<
"!!!!!FileBuilder::build: " << e.displayText();
790 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_ERROR));
792 catch(std::exception& e)
794 std::stringstream outMsg;
795 outMsg <<
"!!!!!FileBuilder::build: " << e.what();
796 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_ERROR));
// Fragment of DRCEAsyncTasksQueue::checkExpiredTime (identified by its trace
// message; the signature and several lines are not visible in this listing).
// Visible steps, all under the queue mutex:
//   1. erase queue entries whose timeStart, timeMax or taskId is zero;
//   2. walk a copy of the queue, compute each task's elapsed time and handle
//      tasks whose timeMax is exceeded;
//   3. do the same elapsed-time check for the member `syncTask`, sending
//      SIGKILL to the pids in `descendants`.
802 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName,
"DRCEAsyncTasksQueue::checkExpiredTime enter", Poco::Message::Priority::PRIO_TRACE));
804 Poco::Mutex::ScopedLock lock(mutex);
// Pass 1: the loop header has no increment; erase() supplies the next iterator
// for removed entries (the advance for kept entries is on a line not visible
// here).
806 for (Tasks::iterator iter=tasks.begin();iter!=tasks.end();)
808 std::stringstream outMsg;
809 if (!(*iter).second.timeStart || !(*iter).second.timeMax || !(*iter).second.taskId)
811 outMsg <<
"taskId: " << (*iter).second.taskId
812 <<
" pid: " << (*iter).second.pid
813 <<
" timeMax: " << (*iter).second.timeMax
814 <<
" timeStart: " << (*iter).second.timeStart
815 <<
" state: " << (*iter).second.state <<
" WAS FORCE ERASE";
816 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_DEBUG));
817 iter = tasks.erase(iter);
// Pass 2 iterates a copy of the map, so `iter` below refers to elements of
// taskList, not of `tasks`.
823 Tasks taskList = tasks;
824 for (Tasks::iterator iter=taskList.begin();iter!=taskList.end();++iter)
826 std::stringstream outMsg;
827 outMsg <<
"taskId: " << (*iter).second.taskId
828 <<
" pid: " << (*iter).second.pid
829 <<
" timeMax: " << (*iter).second.timeMax
830 <<
" timeStart: " << (*iter).second.timeStart
831 <<
" state: " << (*iter).second.state;
833 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
// Elapsed time since timeStart, scaled by 1/1000. Poco documents elapsed() in
// microseconds, so diff/1000 is presumably milliseconds — confirm.
// NOTE(review): timeElapsed is written on the copy's element here; how it gets
// back into `tasks` is not visible in this listing.
835 Poco::Timestamp::TimeDiff diff = Poco::Timestamp::fromEpochTime((*iter).second.timeStart).elapsed();
836 (*iter).second.timeElapsed =
static_cast<size_t>(diff/1000);
839 outMsg <<
"diff = " << (diff/1000);
840 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
// Expiry test: a non-zero timeMax that is smaller than the scaled elapsed time.
842 if ((*iter).second.timeMax &&
843 ((*iter).second.timeMax < (diff/1000)))
// `asyncTask` is declared on a line not visible here.
849 std::stringstream outMsg;
850 outMsg <<
"TASK (ID: " << asyncTask.
taskId <<
", PID: " << asyncTask.
pid <<
", TIME: " << asyncTask.
timeElapsed <<
") WAS TERMINATED AS EXPIRED !!!";
851 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_INFORMATION));
// Trace the subtask-cleanup flag reported by getSubtaskCleanup for this task.
854 outMsg <<
"DRCEAsyncTasksQueue::checkExpiredTime getSubtaskCleanup(" << asyncTask.
taskId <<
") = " << std::boolalpha <<
getSubtaskCleanup(asyncTask.
taskId);
855 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
// Step 3: the same elapsed-time computation for the member syncTask.
861 std::stringstream outMsg;
862 Poco::Timestamp::TimeDiff diff = Poco::Timestamp::fromEpochTime(syncTask.
timeStart).elapsed();
864 outMsg <<
"taskId: " << syncTask.
taskId
865 <<
" pid: " << syncTask.
pid
866 <<
" timeMax: " << syncTask.
timeMax
868 <<
" state: " << syncTask.
state;
869 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
872 outMsg <<
"diff = " << (diff/1000);
873 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
// Send SIGKILL to every pid in `descendants` (how that list is obtained is not
// visible in this listing).
883 for (
size_t i=0;i<descendants.size();++i)
884 kill(descendants[i], SIGKILL);
887 outMsg <<
"TASK (ID: " << syncTask.
taskId <<
", PID: " << syncTask.
pid <<
", TIME: " << (
unsigned int)(diff/1000) <<
") WAS TERMINATED AS EXPIRED !!!";
888 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_INFORMATION));
// Fragment of DRCEAsyncTasksQueue::checkUsageResources (identified by its trace
// message; the signature and several lines are not visible in this listing).
// Under the queue mutex it runs resourceLimitsChecker.checkLimits on a copy of
// every queued task; the catch handlers log the affected task as terminated for
// exceeding usage resources. A second handler does the same for the member
// `syncTask`, marks it terminated and sends SIGKILL to the pids in `descendants`.
896 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName,
"DRCEAsyncTasksQueue::checkUsageResources enter", Poco::Message::Priority::PRIO_TRACE));
898 Poco::Mutex::ScopedLock lock(mutex);
// Iterate a copy of the task map.
902 Tasks taskList = tasks;
903 for (Tasks::iterator iter=taskList.begin();iter!=taskList.end();++iter)
// Each task is checked against its own usageLimits.
907 resourceLimitsChecker.
checkLimits((*iter).second, (*iter).second.usageLimits);
// A Poco::Exception from the check is handled as "limits exceeded" for this
// task (further handling is on lines not visible here).
909 catch(Poco::Exception& e)
911 std::stringstream outMsg;
912 outMsg <<
"TASK (ID: " << (*iter).second.taskId <<
", PID: " << (*iter).second.pid <<
", TIME: " << (*iter).second.timeElapsed <<
") WAS TERMINATED AS EXCEEDED USAGE RESOURCES !!!";
913 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_INFORMATION));
// Handler for the sync task (the guarded call is not visible in this listing).
925 catch(Poco::Exception& e)
// Elapsed time since syncTask.timeStart; Poco documents elapsed() in
// microseconds, so diff/1000 is presumably milliseconds — confirm.
929 Poco::Timestamp::TimeDiff diff = Poco::Timestamp::fromEpochTime(syncTask.
timeStart).elapsed();
931 std::stringstream outMsg;
932 outMsg <<
"TASK (ID: " << syncTask.
taskId <<
", PID: " << syncTask.
pid <<
", TIME: " << (
unsigned int)(diff/1000) <<
") WAS TERMINATED AS EXCEEDED USAGE RESOURCES !!!";
933 Poco::Logger::root().log(Poco::Message(
drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_INFORMATION));
// The exception's message and code become the error message/code of the
// sync task's result.
936 setSyncTaskAsTerminated(e.message(), e.code());
// Send SIGKILL to every pid in `descendants`.
939 for (
size_t i=0;i<descendants.size();++i)
940 kill(descendants[i], SIGKILL);
948 Poco::Mutex::ScopedLock lock(mutex);
950 for (Tasks::iterator iter=tasks.begin();iter!=tasks.end(); ++iter)
952 if ((*iter).second.state == DRCETaskRequest::TaskState::IN_PROGRESS ||
953 (*iter).second.state == DRCETaskRequest::TaskState::SET_AS_NEW ||
954 (*iter).second.state == DRCETaskRequest::TaskState::QUEUED_TO_RUN)
957 asyncTask.
state = DRCETaskRequest::TaskState::TERMINATED;
965 kill(syncTask.
pid, SIGKILL);
972 const unsigned int periodSleep = 1000;
977 if (periodMs > periodSleep)
979 unsigned int repeat = periodMs/periodSleep;
980 for (
unsigned int i=0;i<repeat;++i)
982 Poco::Thread::sleep(periodSleep);
989 Poco::Thread::sleep(periodMs);
1001 catch(Poco::Exception& e)
1005 catch(std::exception& e)
1014 Poco::Mutex::ScopedLock lock(mutex);
1015 subtasks[
taskId][subtaskId] = needCleanup;
1020 Poco::Mutex::ScopedLock lock(mutex);
1021 for (SubTasks::iterator iter=subtasks.begin();iter!=subtasks.end();++iter)
1023 std::map<unsigned int, bool>::iterator it = (*iter).second.find(subtaskId);
1024 if (it!=(*iter).second.end())
1026 (*it).second = needCleanup;
1034 Poco::Mutex::ScopedLock lock(mutex);
1035 bool needCleanup =
false;
1036 for (SubTasks::iterator iter=subtasks.begin();iter!=subtasks.end();++iter)
1038 std::map<unsigned int, bool>::iterator it = (*iter).second.find(subtaskId);
1039 if (it!=(*iter).second.end())
1041 needCleanup = (*it).second;
1050 Poco::Mutex::ScopedLock lock(mutex);
1051 subtasks.erase(taskId);
1056 Poco::Mutex::ScopedLock lock(mutex);
1057 SubTasks::iterator iter = subtasks.find(taskId);
1058 if (iter!=subtasks.end())
1060 for (std::map<unsigned int, bool>::iterator iT=(*iter).second.begin();iT!=(*iter).second.end();++iT)
1062 (*iT).second =
true;
1069 Poco::Mutex::ScopedLock lock(mutex);
1070 return subtasks.begin();
1075 Poco::Mutex::ScopedLock lock(mutex);
1076 return subtasks.end();
1081 Poco::Mutex::ScopedLock lock(mutex);
1082 std::map<unsigned int, bool> sub;
1083 SubTasks::iterator iter = subtasks.find(taskId);
1084 if (iter!=subtasks.end())
1086 sub = (*iter).second;
1093 : asyncTasksQueue(asyncTasksQueue_),
taskId(taskId_)
1099 : asyncTasksQueue(asyncTasksQueue_),
taskId(asyncTask.
taskId)