hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DRCETaskRequestGetDataExecutor.cpp
Go to the documentation of this file.
1 #include <string.h>
2 #include <errno.h>
3 
4 #include "DRCEFileStream.hpp"
7 #include "DRCEFileExtractor.hpp"
8 
11 
12 namespace HCE
13 {
14 namespace drce
15 {
16 //-----------------------------------------------------------------------------
18  DRCENodeOptions& nodeOptions_, CustomMessage& message_, DRCEResourceMonitor& resourceMonitor_)
19 : inherited(asyncTaskQueue_, nodeOptions_, message_, resourceMonitor_)
20 {
21 }
22 //-----------------------------------------------------------------------------
// Processes a task request that is expected to be a DRCETaskRequestGetData and
// returns the local DRCEResultDataItem describing the outcome.
// NOTE(review): the function signature (listing line 23) is not present in this
// extract; from the body, `pTaskRequest` is a pointer that is dereferenced and
// dynamic_cast to DRCETaskRequestGetData*.
// Poco::Exception and std::exception thrown inside the try block do not leave
// this function: they are recorded in the returned item (error code, error
// message, state CRASHED).
24 {
  // Initial result item, obtained from getResultDataItem() with TaskState::UNDEFINED.
25  DRCEResultDataItem resultDataItem = getResultDataItem(*pTaskRequest, DRCETaskRequest::TaskState::UNDEFINED);
26 
27  try
28  {
  // Checked downcast: yields a null pointer when the request is not a DRCETaskRequestGetData.
29  DRCETaskRequestGetData* pTaskRequestGetData = dynamic_cast<DRCETaskRequestGetData*>(pTaskRequest);
  // NOTE(review): the statement guarded by this null check (listing line 31) is
  // missing from this extract — presumably a throw; verify against the original file.
30  if (!pTaskRequestGetData)
32 
  // Only ftDeleteDataAfterFetch and ftNotDeleteDataAfterFetch are accepted; any other
  // fetch type is rejected with a Poco::Exception carrying ERROR_BAD_COMMAND_PARAMS.
33  if (pTaskRequestGetData->getFetchType()!=DRCETaskRequestGetData::FetchType::ftDeleteDataAfterFetch &&
34  pTaskRequestGetData->getFetchType()!=DRCETaskRequestGetData::FetchType::ftNotDeleteDataAfterFetch)
35  throw Poco::Exception(message(message_const::WRONG_FETCH_TYPE, static_cast<unsigned int>(pTaskRequestGetData->getFetchType())), ERROR_BAD_COMMAND_PARAMS);
36 
  // Case 1: the task id is known to the async task queue.
37  if (asyncTaskQueue.isExistAsyncTask(pTaskRequestGetData->getTaskId()))
38  {
39  DRCETaskRequest::TaskState taskState = asyncTaskQueue.getAsyncTask(pTaskRequestGetData->getTaskId()).state;
  // The queue state is copied into the result only for IN_PROGRESS, SET_AS_NEW and
  // QUEUED_TO_RUN; for any other state the result item is left as initialised above.
40  if (taskState == DRCETaskRequest::TaskState::IN_PROGRESS ||
41  taskState == DRCETaskRequest::TaskState::SET_AS_NEW ||
42  taskState == DRCETaskRequest::TaskState::QUEUED_TO_RUN)
43  {
44  resultDataItem.setState(taskState);
45  }
46  }
  // Case 2: the task id is not in the async task queue — read its data file instead.
47  else
48  {
  // Data file name derived from the configured tasks data directory and the task id.
49  const std::string dataFileName = FileStream::getDataFileName(nodeOptions.getTasksDataDir(), pTaskRequestGetData->getTaskId());
50 
  // NOTE(review): resultDataItem is assigned as a whole from extract() below, and the
  // catch handlers set CRASHED, so this NOT_FOUND state appears to be overwritten on
  // every visible path — confirm the intent.
51  resultDataItem.setState(static_cast<unsigned int>(DRCETaskRequest::TaskState::NOT_FOUND));
52 
  // `lock` is not referenced again; presumably its lifetime protects the task until the
  // end of this scope — TODO confirm against AsyncTaskLocker.
53  AsyncTaskLocker lock(asyncTaskQueue, pTaskRequestGetData->getTaskId());
54  // read the data file content into the result item
55  DataFileExtractor dataFileExtractor(message);
56  resultDataItem = dataFileExtractor.extract(dataFileName);
57 
58  // delete the task data after the fetch if the request asked for it
59  if (pTaskRequestGetData->getFetchType() == DRCETaskRequestGetData::FetchType::ftDeleteDataAfterFetch)
60  {
  // NOTE(review): the meaning of the trailing `true` argument is not visible in this file.
61  DRCECommonTask::cleanup(pTaskRequestGetData->getTaskId(), nodeOptions, message, true);
62  }
63  }
64  }
  // Poco exceptions carry their own error code: copy code and message into the result.
65  catch(Poco::Exception& e)
66  {
67  resultDataItem.setErrorCode(e.code());
68  resultDataItem.setErrorMessage(e.message());
69  resultDataItem.setState(static_cast<unsigned int>(DRCETaskRequest::TaskState::CRASHED));
70  }
  // Any other std::exception is reported under the fixed ERROR_EXECUTE_TASK code.
71  catch(std::exception& e)
72  {
73  resultDataItem.setErrorCode(ERROR_EXECUTE_TASK);
74  resultDataItem.setErrorMessage(e.what());
75  resultDataItem.setState(static_cast<unsigned int>(DRCETaskRequest::TaskState::CRASHED));
76  }
  // The result item is returned by value on both the success and the error paths.
77  return resultDataItem;
78 }
79 //-----------------------------------------------------------------------------
80 //-----------------------------------------------------------------------------
81 } // end namespace drce
82 } // end namespace HCE