hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DRCECommonTask.cpp
Go to the documentation of this file.
1 #include <string.h>
2 #include <errno.h>
3 #include <list>
4 #include <Poco/Logger.h>
5 #include <Poco/File.h>
6 
7 #include "DRCEMessageConst.hpp"
9 #include "DRCEError.hpp"
10 #include "DRCEFileStream.hpp"
11 #include "DRCECommonTask.hpp"
12 #include "DRCEFileExtractor.hpp"
13 #include "DRCEFileBuilder.hpp"
15 
16 namespace HCE
17 {
18 namespace drce
19 {
20 //-----------------------------------------------------------------------------
22  DRCETaskRequest::TaskState allowedTaskState_, SessionOptions::ThreadMode threadMode_)
23 : asyncTasksQueue(asyncTasksQueue_), asyncTask(asyncTask_), allowedTaskState(allowedTaskState_), threadMode(threadMode_)
24 {
25 }
26 //-----------------------------------------------------------------------------
28 {
29  if (threadMode == SessionOptions::ThreadMode::tmAsync)
30  {
31  if (asyncTasksQueue.isExistAsyncTask(asyncTask.taskId))
32  {
33  AsyncTasks task = asyncTasksQueue.getAsyncTask(asyncTask.taskId);
34  if (task.state == allowedTaskState)
35  {
36  asyncTask.pid = pid;
37  asyncTask.timeElapsed = DRCECommonTask::getElapsedTimeMsec(Poco::Timestamp::fromEpochTime(asyncTask.timeStart));
38  asyncTasksQueue.safeChangeState(std::forward<AsyncTasks>(AsyncTasks(asyncTask)), nullptr);
39  }
40  }
41  }
42  else if (threadMode == SessionOptions::ThreadMode::tmSync)
43  {
44  asyncTask.pid = pid;
45  asyncTask.timeElapsed = DRCECommonTask::getElapsedTimeMsec(Poco::Timestamp::fromEpochTime(asyncTask.timeStart));
46  }
47 }
48 //-----------------------------------------------------------------------------
49 //-----------------------------------------------------------------------------
50 DRCECommonTask::DRCECommonTask(const std::string& taskName_, DRCENodeOptions& nodeOptions_, CustomMessage& message_,
51  const DRCETaskRequestSetExecute& taskRequestSetExecute_, DRCEAsyncTasksQueue& asyncTasksQueue_,
52  DRCEResourceMonitor& resourceMonitor_, const DRCEInputJsonMessage& inputJsonMessage_, unsigned int parentTaskId_)
53 : inherited(taskName_), nodeOptions(nodeOptions_), message(message_), taskRequestSetExecute(taskRequestSetExecute_),
54  asyncTasksQueue(asyncTasksQueue_), resourceMonitor(resourceMonitor_), inputJsonMessage(inputJsonMessage_), parentTaskId(parentTaskId_)
55 {
56 }
57 //-----------------------------------------------------------------------------
58 void DRCECommonTask::makeFilesBefore(DRCEFilesList& filesList) throw (Poco::Exception)
59 {
60  const size_t filesCount = filesList.getFilesCount();
61 
62  for (size_t i=0;i<filesCount;++i)
63  {
64  const std::string fileName = filesList.getFileItem(i).name;
65 
66  if ((filesList.getFileItem(i).actionType & FileItem::CREATE_BEFORE_EXEC_AND_WRITE_DATA)!=0)
67  {
68  std::ofstream ofs(fileName.c_str(), std::fstream::trunc|std::fstream::binary);
69  if (!ofs)
70  throw Poco::Exception(message(message_const::CREATE_FILE, fileName) , ERROR_CREATE_FILE_BEFORE_EXEC);
71 
72  ofs << (((filesList.getFileItem(i).actionType & FileItem::ENCODE_DECODE_CONTENT_BASE64)!=0)?DRCEDecodeBase64(filesList.getFileItem(i).data):filesList.getFileItem(i).data);
73  ofs.close();
74  }
75  if ((filesList.getFileItem(i).actionType & FileItem::DELETE_BEFORE_EXEC)!=0)
76  {
77  if (remove(fileName.c_str())!=0)
78  throw Poco::Exception(message(message_const::DELETE_FILE, fileName)+
79  message(message_const::ERROR, strerror(errno)),
81  }
82  }
83 }
84 //-----------------------------------------------------------------------------
86 {
87  const size_t filesCount = filesList.getFilesCount();
88  for (size_t i=0;i<filesCount;++i)
89  {
90  const std::string fileName = filesList.getFileItem(i).name;
91 
92  try
93  {
95  {
96  std::ifstream ifs;
97  ifs.open(fileName, std::fstream::binary);
98  if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
99  throw Poco::Exception(message(message_const::READ_FILE, fileName), ERROR_READ_FILE_AFTER_EXEC);
100 
101  std::string fileContent;
102  ifs.seekg(0, std::ios::end);
103  fileContent.resize(ifs.tellg());
104  ifs.seekg(0, std::ios::beg);
105  ifs.read(const_cast<char*>(fileContent.c_str()), fileContent.size());
106  ifs.close();
107 
108  FileItem fileItem;
109  fileItem.actionType = filesList.getFileItem(i).actionType;
110  fileItem.name = filesList.getFileItem(i).name;
111  fileItem.data = (((filesList.getFileItem(i).actionType & FileItem::ENCODE_DECODE_CONTENT_BASE64)!=0)?DRCEEncodeBase64(fileContent):fileContent);
112  resultDataItem.addFileItem(std::forward<FileItem>(fileItem));
113  }
114  if ((filesList.getFileItem(i).actionType & FileItem::DELETE_AFTER_EXEC)!=0)
115  {
116  if (remove(fileName.c_str())!=0)
117  throw Poco::Exception(message(message_const::DELETE_FILE, fileName)+
118  message(message_const::ERROR, strerror(errno)),
120  }
121  }
122  catch(Poco::Exception& e)
123  {
124  std::stringstream errMsg(resultDataItem.getErrorMessage());
125  if (!errMsg.str().empty())
126  errMsg << ", ";
127  errMsg << e.message();
128  resultDataItem.setErrorMessage(errMsg.str());
129 
130  resultDataItem.setErrorCode(e.code());
131  }
132  }
133 }
134 //-----------------------------------------------------------------------------
135 void DRCECommonTask::cleanup(bool needCleanup)
136 {
138 }
139 //-----------------------------------------------------------------------------
140 void DRCECommonTask::cleanup(unsigned int taskId, DRCENodeOptions& nodeOptions, CustomMessage& message, bool needCleanup)
141 {
142  Poco::File requetFile(FileStream::getRequestFileName(nodeOptions.getTasksDataDir(), taskId));
143  if (requetFile.exists())
144  {
148  DRCECommonTask::cleanup(inputJsonMessage, nodeOptions, message, needCleanup);
149  }
150  else
151  {
152  if (needCleanup)
153  {
160  }
161  }
162 }
163 //-----------------------------------------------------------------------------
164 void DRCECommonTask::cleanup(DRCEInputJsonMessage& inputJsonMessage, DRCENodeOptions& nodeOptions, CustomMessage& message, bool needCleanup)
165 {
166  try
167  {
168  DRCETaskRequestSetExecute* pTaskRequestSetExecute = dynamic_cast<DRCETaskRequestSetExecute*>(inputJsonMessage.getTaskRequest().get());
169  if (pTaskRequestSetExecute)
170  {
171  if (!needCleanup && pTaskRequestSetExecute->getSessionOptions().cleanup == SessionOptions::CleanupFlag::cfDelete)
172  needCleanup = true;
173 
174  if (needCleanup)
175  {
176  for (size_t i=0;i<inputJsonMessage.getSubtasksCount();++i)
177  {
178  DRCEInputJsonMessage inputMessage = inputJsonMessage.getSubtaskItem(i);
179  DRCECommonTask::cleanup(inputMessage, nodeOptions, message, needCleanup);
180  }
188  const DRCEFilesList& filesList = static_cast<const DRCEFilesList&>(*pTaskRequestSetExecute);
189  for (size_t i=0;i<filesList.getFilesCount();++i)
190  {
192  }
193  }
194  }
195  }
196  catch(Poco::Exception& e)
197  {
198  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
199  }
200 }
201 //-----------------------------------------------------------------------------
202 void DRCECommonTask::safeCleanup(const std::string& path)
203 {
204  try
205  {
206  Poco::File file(path);
207  if (file.exists())
208  {
209  file.remove(true);
210 
211  std::stringstream outMsg;
212  outMsg << "DRCECommonTask::safeCleanup path: " << path;
213  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
214  }
215  }
216  catch(Poco::Exception& e)
217  {
218  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
219  }
220 }
221 //-----------------------------------------------------------------------------
222 size_t DRCECommonTask::getElapsedTimeMsec(const Poco::Timestamp& tsStart)
223 {
224  Poco::Timestamp tsStop;
225  Poco::Timestamp::TimeDiff diff = tsStop-tsStart;
226  return static_cast<size_t>(diff/1000);
227 }
228 //-----------------------------------------------------------------------------
230 {
231  std::stringstream outMsg;
232  outMsg << "DRCECommonTask::executeSubtasks enter taskId: " << resultDataItem.getRequestId();
233  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
234 
236  {
238  for (size_t i=0;i<inputJsonMessage.getSubtasksCount();++i)
239  {
240  Poco::SharedPtr<DRCETaskRequest> pTaskRequest = inputJsonMessage.getSubtaskItem(i).getTaskRequest();
241  if (!pTaskRequest.isNull() && !isCancelled())
242  {
244  pTaskRequest->setRequestType(inputMessage.getRequestType());
245  pTaskRequest->setTaskId(inputMessage.getRequestId());
246  pTaskRequest->setParentTaskId(inputJsonMessage.getRequestId());
247 
248  DRCEResultDataItem resultItem = taskRequestSetExecuteExecutor.execute(pTaskRequest.get(), inputMessage);
249  resultDataItem.addSubtaskItem(std::forward<DRCEResultDataItem>(resultItem));
250  }
251  }
252  }
253 
254  if (!isCancelled())
255  {
256  try
257  {
258  AsyncTaskLocker lock(asyncTasksQueue, resultDataItem.getRequestId());
260  DRCEResultData resultData;
261  resultData.addDataItem(resultDataItem);
262  dataFileBuilder.build(resultData);
263  }
264  catch(Poco::Exception& e)
265  {
266  std::stringstream outMsg;
267  outMsg << "DRCECommonTask::executeSubtasks dataFileBuilder.build " << e.displayText();
268  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_ERROR));
269  throw;
270  }
271  catch(std::exception& e)
272  {
273  std::stringstream outMsg;
274  outMsg << "DRCECommonTask::executeSubtasks dataFileBuilder.build " << e.what();
275  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_ERROR));
276  throw;
277  }
278  }
279 }
280 //-----------------------------------------------------------------------------
282  DRCENodeOptions& nodeOptions,
283  CustomMessage& message,
284  DRCEAsyncTasksQueue& asyncTasksQueue,
285  unsigned int& progressCount,
286  DRCECommonTask* pCommonTask)
287 {
288  const std::string dataFileName = FileStream::getDataFileName(nodeOptions.getTasksDataDir(), resultDataItem.getRequestId());
289  Poco::File dataFile(dataFileName);
290  if (dataFile.exists())
291  {
292  AsyncTaskLocker lock(asyncTasksQueue, resultDataItem.getRequestId());
293  try
294  {
295  std::stringstream outMsg;
296  outMsg << "DRCECommonTask::updateTasks taskId: " << resultDataItem.getRequestId();
297  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
298 
299  try
300  {
301  DataFileExtractor dataFileExtractor(message);
302  resultDataItem = dataFileExtractor.extract(dataFileName);
303  }
304  catch(Poco::Exception& e)
305  {
306  std::stringstream outMsg;
307  outMsg << "DRCECommonTask::updateTasks dataFileExtractor.extract: " << e.displayText();
308  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_DEBUG));
309  }
310  catch(std::exception& e)
311  {
312  std::stringstream outMsg;
313  outMsg << "DRCECommonTask::updateTasks dataFileExtractor.extract: " << e.what();
314  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_DEBUG));
315  }
316 
317  if (resultDataItem.getState()==static_cast<unsigned int>(DRCETaskRequest::TaskState::SET_AS_NEW) ||
318  resultDataItem.getState()==static_cast<unsigned int>(DRCETaskRequest::TaskState::IN_PROGRESS) ||
319  resultDataItem.getState()==static_cast<unsigned int>(DRCETaskRequest::TaskState::QUEUED_TO_RUN))
320  ++progressCount;
321 
322  // restart task if it continue hold in queue to run
323  if (resultDataItem.getState()==static_cast<unsigned int>(DRCETaskRequest::TaskState::QUEUED_TO_RUN))
324  {
325  outMsg.str("");
326  outMsg << "DRCECommonTask::updateTasks pCommonTask->executeSubtasks taskId: " << resultDataItem.getRequestId();
327  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
328 
329  if (pCommonTask)
330  pCommonTask->executeSubtasks(resultDataItem);
331  }
332 
333  for (size_t i=0;i<resultDataItem.getSubtasksCount();++i)
334  {
335  DRCEResultDataItem resultItem(resultDataItem.getSubtaskItem(i));
336  unsigned int subtasksProgressCount = 0;
337  updateTasks(resultItem, nodeOptions, message, asyncTasksQueue, subtasksProgressCount, pCommonTask);
338  resultDataItem.setSubtaskItem(i, std::forward<DRCEResultDataItem>(resultItem));
339  }
340 
341  DRCEResultData resultData;
342  resultData.addDataItem(resultDataItem);
343  DataFileBuilder dataFileBuilder(message, nodeOptions.getTasksDataDir());
344  dataFileBuilder.build(resultData);
345 
346  StatusFileBuilder statusFileBuilder(message, nodeOptions.getTasksStatusDir());
347  statusFileBuilder.build(resultData);
348  }
349  catch(Poco::Exception& e)
350  {
351  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, message(message_const::ERROR, e.displayText()), Poco::Message::Priority::PRIO_ERROR));
352  }
353  catch(std::exception& e)
354  {
355  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, message(message_const::ERROR, e.what()), Poco::Message::Priority::PRIO_ERROR));
356  }
357  }
358 }
359 //-----------------------------------------------------------------------------
360 void DRCECommonTask::updateTasks(DRCEResultDataItem& resultDataItem, unsigned int& progressCount)
361 {
362  updateTasks(resultDataItem, nodeOptions, message, asyncTasksQueue, progressCount, this);
363 }
364 //-----------------------------------------------------------------------------
366 {
367  if (inputJsonMessage.getRequestId())
368  {
369  DRCETaskRequestSetExecute* pTaskRequestSetExecute = dynamic_cast<DRCETaskRequestSetExecute*>(inputJsonMessage.getTaskRequest().get());
370  if (pTaskRequestSetExecute)
371  {
372  subtasks.push_back(std::make_pair(inputJsonMessage.getRequestId(), pTaskRequestSetExecute->getSessionOptions().cleanup));
373 
374  std::stringstream outMsg;
375  outMsg << "DRCECommonTask::getSubtasksList taskId: " << inputJsonMessage.getRequestId()
376  << " cleanup: " << std::boolalpha << (pTaskRequestSetExecute->getSessionOptions().cleanup==SessionOptions::CleanupFlag::cfDelete);
377  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
378 
379  for (size_t i=0;i<inputJsonMessage.getSubtasksCount();++i)
380  {
381  DRCEInputJsonMessage inputMessage(inputJsonMessage.getSubtaskItem(i));
382  getSubtasksList(inputMessage, subtasks);
383  }
384  }
385  }
386 }
387 //-----------------------------------------------------------------------------
389 {
391  try
392  {
393  Poco::File requestFile(FileStream::getRequestFileName(nodeOptions.getTasksDataDir(), taskId));
394  if (requestFile.exists())
395  {
396  RequestFileExtractor requestFileExtractor(message);
397  inputJsonMessage = requestFileExtractor.extract(requestFile.path());
398  }
399  }
400  catch(Poco::Exception& e)
401  {
402  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
403  }
404  catch(std::exception& e)
405  {
406  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.what(), Poco::Message::Priority::PRIO_ERROR));
407  }
408  return inputJsonMessage;
409 }
410 //-----------------------------------------------------------------------------
412  DRCENodeOptions& nodeOptions,
413  CustomMessage& message,
414  DRCEAsyncTasksQueue& asyncTasksQueue,
415  Poco::Task* pTask)
416 {
417  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, "DRCECommonTask::waitUpdateAllTasks enter", Poco::Message::Priority::PRIO_TRACE));
418 
419  const std::string dataFileName = FileStream::getDataFileName(nodeOptions.getTasksDataDir(), taskId);
420  Poco::File dataFile(dataFileName);
421 
422  std::stringstream outMsg;
423  outMsg << "DRCECommonTask::waitUpdateAllTasks '" << dataFileName << "' isExist = " << std::boolalpha << dataFile.exists() << std::endl;
424  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
425 
426  if (dataFile.exists())
427  {
428  DataFileExtractor dataFileExtractor(message);
429  try
430  {
431  DRCEResultDataItem resultDataItem = dataFileExtractor.extract(dataFileName);
432  unsigned int progressCount = 0;
433  DRCECommonTask::updateTasks(resultDataItem, nodeOptions, message, asyncTasksQueue, progressCount);
434  while (progressCount)
435  {
436  progressCount = 0;
437  Poco::Thread::sleep(1000);
438  DRCECommonTask::updateTasks(resultDataItem, nodeOptions, message, asyncTasksQueue, progressCount);
439 
440 
441  std::stringstream outMsg;
442  outMsg << "DRCECommonTask::waitUpdateAllTasks taskId: " << resultDataItem.getRequestId() << " progressCount: " << progressCount;
443  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
444 
445  if (pTask)
446  if (pTask->isCancelled())
447  break;
448  }
449  }
450  catch(Poco::Exception& e)
451  {
452  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_DEBUG));
453  }
454  }
455 }
456 //-----------------------------------------------------------------------------
457 void DRCECommonTask::waitEndAllTask(DRCEInputJsonMessage& inputJsonMessage, DRCEAsyncTasksQueue& asyncTasksQueue, Poco::Task* pTask)
458 {
459  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, "DRCECommonTask::waitEndAllTask enter", Poco::Message::Priority::PRIO_TRACE));
460 
461  SubtasksList subtasks;
462  getSubtasksList(inputJsonMessage, subtasks);
463 
464  for (SubtasksList::iterator iter=subtasks.begin();iter!=subtasks.end();++iter)
465  {
466  if ((*iter).first == inputJsonMessage.getRequestId())
467  {
468  subtasks.erase(iter);
469  break;
470  }
471  }
472 
473  if (!subtasks.empty())
474  {
475  bool needWait = true;
476  while (needWait)
477  {
478  for (size_t i=0;i<subtasks.size();++i)
479  {
480  if (asyncTasksQueue.isExistAsyncTask(subtasks[i].first))
481  {
482  std::stringstream outMsg;
483  outMsg << "DRCECommonTask::waitEndAllTask (" << i << ") asyncTasksQueue.isExistAsyncTask(" <<subtasks[i].first <<")";
484  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, outMsg.str(), Poco::Message::Priority::PRIO_TRACE));
485 
486  break;
487  }
488  if ((i+1)==subtasks.size())
489  return;
490  }
491  Poco::Thread::sleep(1000);
492  if (pTask)
493  if (pTask->isCancelled())
494  break;
495  }
496  }
497 }
498 //-----------------------------------------------------------------------------
500 {
502 }
503 //-----------------------------------------------------------------------------
505 {
507 }
508 //-----------------------------------------------------------------------------
510 {
511  try
512  {
513  RequestFileBuilder requestFileBuilder(message, nodeOptions.getTasksDataDir());
514  requestFileBuilder.build(inputJsonMessage);
515  }
516  catch(Poco::Exception& e)
517  {
518  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
519  }
520  catch(std::exception& e)
521  {
522  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.what(), Poco::Message::Priority::PRIO_ERROR));
523  }
524 }
525 //-----------------------------------------------------------------------------
526 void DRCECommonTask::saveRequest(unsigned int taskId, DRCENodeOptions& nodeOptions, CustomMessage& message)
527 {
528  try
529  {
530  Poco::File requestFile(FileStream::getRequestFileName(nodeOptions.getTasksDataDir(), taskId));
531  if (requestFile.exists())
532  {
533  RequestFileExtractor requestFileExtractor(message);
534  DRCEInputJsonMessage inputJsonMessage = requestFileExtractor.extract(requestFile.path());
535 
536  saveRequest(inputJsonMessage, nodeOptions, message);
537  }
538  }
539  catch(Poco::Exception& e)
540  {
541  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
542  }
543  catch(std::exception& e)
544  {
545  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.what(), Poco::Message::Priority::PRIO_ERROR));
546  }
547 }
548 //-----------------------------------------------------------------------------
549 void DRCECommonTask::saveRequest(DRCEInputJsonMessage& inputJsonMessage, DRCENodeOptions& nodeOptions, CustomMessage& message)
550 {
551  try
552  {
553  RequestFileBuilder requestFileBuilder(message, nodeOptions.getTasksDataDir());
554  requestFileBuilder.build(inputJsonMessage);
555  }
556  catch(Poco::Exception& e)
557  {
558  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
559  }
560  catch(std::exception& e)
561  {
562  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.what(), Poco::Message::Priority::PRIO_ERROR));
563  }
564 
565  for (size_t i=0;i<inputJsonMessage.getSubtasksCount();++i)
566  {
567  DRCEInputJsonMessage inputMessage(inputJsonMessage.getSubtaskItem(i));
568  saveRequest(inputMessage, nodeOptions, message);
569  }
570 }
571 //-----------------------------------------------------------------------------
572 void DRCECommonTask::saveResultData(DRCEResultDataItem& resultDataItem, DRCENodeOptions& nodeOptions, CustomMessage& message, DRCEAsyncTasksQueue& asyncTasksQueue)
573 {
574  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, "DRCECommonTask::saveResultData enter", Poco::Message::Priority::PRIO_TRACE));
575 
576  try
577  {
578  AsyncTaskLocker lock(asyncTasksQueue, resultDataItem.getRequestId());
579 
580  DRCEResultData resultData;
581  resultData.addDataItem(resultDataItem);
582  DataFileBuilder dataFileBuilder(message, nodeOptions.getTasksDataDir());
583  dataFileBuilder.build(resultData);
584 
585  StatusFileBuilder statusFileBuilder(message, nodeOptions.getTasksStatusDir());
586  statusFileBuilder.build(resultData);
587  }
588  catch(Poco::Exception& e)
589  {
590  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
591  }
592  catch(std::exception& e)
593  {
594  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.what(), Poco::Message::Priority::PRIO_ERROR));
595  }
596 
597  for (size_t i=0;i<resultDataItem.getSubtasksCount();++i)
598  {
599  DRCEResultDataItem resultItem(resultDataItem.getSubtaskItem(i));
600  saveResultData(resultItem, nodeOptions, message, asyncTasksQueue);
601  }
602 }
603 //-----------------------------------------------------------------------------
606  unsigned int pid, size_t timeElapsed, unsigned int exitStatus)
607 {
608  DRCEResultDataItem resultDataItem;
609 
610  for (size_t i=0;i<inputJsonMessage.getSubtasksCount();++i)
611  {
612  DRCEInputJsonMessage inputMessage(inputJsonMessage.getSubtaskItem(i));
613  DRCEResultDataItem resultItem = makeResultDataItem(inputMessage, nodeOptions, message, requestType, taskState, errorCode, errorMessage, 0, timeElapsed, exitStatus);
614  resultDataItem.addSubtaskItem(resultItem);
615  }
616 
617  resultDataItem.setTime(timeElapsed);
618  resultDataItem.setPid(pid);
619  resultDataItem.setErrorCode(errorCode);
620  resultDataItem.setErrorMessage(errorMessage);
621  resultDataItem.setRequestId(inputJsonMessage.getRequestId());
622  resultDataItem.setRequestType(requestType);
623  resultDataItem.setState(static_cast<unsigned int>(taskState));
624  resultDataItem.setExitStatus(exitStatus);
625 
626  Poco::File statusFile(FileStream::getStatusFileName(nodeOptions.getTasksStatusDir(), inputJsonMessage.getRequestId()));
627  if (statusFile.exists())
628  {
629  try
630  {
631  DataFileExtractor dataFileExtractor(message);
632  DRCEResultDataItem resultItem = dataFileExtractor.extract(statusFile.path());
633 
634  resultDataItem.setPid(resultItem.getPid());
635  resultDataItem.setTime(resultItem.getTime());
636  resultDataItem.setRequestId(resultItem.getRequestId());
637  resultDataItem.setRequestType(requestType);
638  resultDataItem.setState(static_cast<unsigned int>(taskState));
639  }
640  catch(Poco::Exception& e)
641  {
642  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_TRACE));
643  }
644  }
645 
646  resultDataItem.setNodeName(nodeOptions.getNodeName());
647  resultDataItem.setNodeHost(nodeOptions.getNodeHost());
648  resultDataItem.setNodePort(nodeOptions.getNodePort());
649 
650  return resultDataItem;
651 }
652 //-----------------------------------------------------------------------------
653 unsigned int DRCECommonTask::getHostParentTaskId(unsigned int taskId, DRCEAsyncTasksQueue& asyncTasksQueue)
654 {
655  unsigned int parentTaskId = taskId;
656 
657  try
658  {
659  if (asyncTasksQueue.isExistAsyncTask(taskId))
660  {
661  AsyncTasks asyncTask = asyncTasksQueue.getAsyncTask(taskId);
662  if (asyncTask.parentTaskId)
663  parentTaskId = getHostParentTaskId(asyncTask.parentTaskId, asyncTasksQueue);
664  else
665  parentTaskId = asyncTask.taskId;
666  }
667  }
668  catch(Poco::Exception& e)
669  {
670  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
671  }
672  return parentTaskId;
673 }
674 //-----------------------------------------------------------------------------
676 {
677  DRCEResultDataItem resultDataItem;
678  Poco::File dataFile(fileName);
679  if (dataFile.exists())
680  {
681  try
682  {
683  MessagesCollection messagesCollection(message_const::messages);
684  CustomMessage message(messagesCollection);
685  DataFileExtractor dataFileExctractor(message);
686  resultDataItem = dataFileExctractor.extract(dataFile.path());
687  }
688  catch(Poco::Exception& e)
689  {
690  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
691  }
692  }
693  return resultDataItem;
694 }
695 //-----------------------------------------------------------------------------
696 // cppcheck-suppress unusedFunction
698  DRCENodeOptions& nodeOptions,
699  DRCEAsyncTasksQueue& asyncTasksQueue)
700 {
701  AsyncTaskLocker lock(asyncTasksQueue, taskId);
703 }
704 //-----------------------------------------------------------------------------
705 // cppcheck-suppress unusedFunction
707  DRCENodeOptions& nodeOptions,
708  DRCEAsyncTasksQueue& asyncTasksQueue)
709 {
710  AsyncTaskLocker lock(asyncTasksQueue, taskId);
712 }
713 //-----------------------------------------------------------------------------
714 //-----------------------------------------------------------------------------
715 } // end namespace drce
716 } // end namespace HCE