hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DRCEFunctionalObject.cpp
Go to the documentation of this file.
1 #include <unistd.h>
2 #include <fstream>
3 #include <Poco/Logger.h>
4 
7 #include "DRCEDefaultJSON.hpp"
8 #include "DRCEMessageConst.hpp"
14 #include "DRCEReadProcessData.hpp"
15 #include "DRCEListAllTasks.hpp"
19 
20 namespace HCE
21 {
22 namespace drce
23 {
24 //-----------------------------------------------------------------------------
// Constructs the functional object from the node identity and directory settings.
// DRCENodeOptions receives an empty string as its second argument; homeDir_ is
// applied afterwards through setHomeDir() in the constructor body.
// The members messagesCollection, message, notificationExecutor, asyncTaskQueue,
// resourceMonitor and requestTime are set up in the initializer list.
// NOTE(review): *this is handed to asyncTaskQueue while the object is still under
// construction - confirm the queue only stores the reference at that point.
25 DRCEFunctionalObject::DRCEFunctionalObject(const std::string& nodeName_, const std::string& homeDir_,
26  const std::string& tasksDataDir_, const std::string& tasksStatusDir_,
27  const std::string& nodeHost_, const std::string& nodePort_)
28 : DRCEFunctionalObjectBase(), DRCENodeOptions(nodeName_, "", tasksDataDir_, tasksStatusDir_, nodeHost_, nodePort_),
29  messagesCollection(message_const::messages), message(messagesCollection), notificationExecutor(),
30  asyncTaskQueue(*this, notificationExecutor, message), resourceMonitor(asyncTaskQueue, message), requestTime(0)
31 {
 // The bool result of setHomeDir() is not checked here.
32  setHomeDir(homeDir_);
 // NOTE(review): listing lines 33-34 are absent from this extract; the body may
 // contain further statements that are not visible here.
35 }
36 //-----------------------------------------------------------------------------
38 {
41 }
42 //-----------------------------------------------------------------------------
44 {
46 }
47 //-----------------------------------------------------------------------------
49 {
51 }
52 //-----------------------------------------------------------------------------
54 {
56 }
57 //-----------------------------------------------------------------------------
59 {
61 }
62 //-----------------------------------------------------------------------------
64 {
66 }
67 //-----------------------------------------------------------------------------
69 {
71 }
72 //-----------------------------------------------------------------------------
74 {
76 }
77 //-----------------------------------------------------------------------------
79 {
81 }
82 //-----------------------------------------------------------------------------
84 {
86 }
87 //-----------------------------------------------------------------------------
89 {
91 }
92 //-----------------------------------------------------------------------------
93 std::string DRCEFunctionalObject::makeErrorMessage(const std::string& inputMessage,
94  unsigned int state,
95  unsigned int errorCode,
96  const std::string& errorMessage)
97 {
98  HCE::drce::DRCEInputJsonMessage inputJsonMessage(inputMessage);
99  HCE::drce::DRCEResultDataItem resultDataItem;
100  resultDataItem.setRequestId(inputJsonMessage.getRequestId());
101  resultDataItem.setState(state);
102  resultDataItem.setErrorCode(errorCode);
103  resultDataItem.setErrorMessage(errorMessage);
104 
105  HCE::drce::DRCEResultData resultData;
106  resultData.addDataItem(std::forward<HCE::drce::DRCEResultDataItem>(resultDataItem));
107 
108  HCE::drce::DRCEOutputJsonMessage outputJsonMessage;
109  outputJsonMessage.setResultData(std::forward<HCE::drce::DRCEResultData>(resultData));
110 
111  std::string resultJsonMessage;
112  if (!outputJsonMessage.serialize(resultJsonMessage))
113  {
114  std::stringstream errMsg;
115  errMsg << message_const::serializationOutputJsonMessageError << ": " << outputJsonMessage.getErrorMsg();
116  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, errMsg.str(), Poco::Message::Priority::PRIO_ERROR));
117  }
118  return resultJsonMessage;
119 }
120 //-----------------------------------------------------------------------------
121 void DRCEFunctionalObject::setNotificationFunctor(std::function<std::string(const std::string&)> notificationFunctor_)
122 {
123  notificationExecutor.setNotificationFunctor(notificationFunctor_);
124 }
125 //-----------------------------------------------------------------------------
// Switches the process working directory to homeDir_ via chdir() and, on success,
// remembers it in the homeDir member. On failure the error flag and an error
// message (built from strerror(errno)) are set instead.
// Returns true when no error is recorded after the call (i.e. !_isError).
126 bool DRCEFunctionalObject::setHomeDir(const std::string& homeDir_)
127 {
 // Start from a clean error state so the return value reflects this call only.
128  resetError();
129  if (chdir(homeDir_.c_str())==0)
130  {
131  homeDir=homeDir_;
132  }
133  else
134  {
135  isError(true);
 // NOTE(review): listing line 136 is absent from this extract.
 // NOTE(review): errno is read only after isError(true) and whatever sits on the
 // missing line; if any of those calls can modify errno the message would be
 // wrong - consider capturing errno immediately after chdir().
137  setErrorMsg(message(message_const::SET_HOME_DIR)+message(message_const::ERROR, strerror(errno)));
138  }
139  return !_isError;
140 }
141 //-----------------------------------------------------------------------------
143 {
144  isError(false);
146  setErrorMsg("");
147 }
148 //-----------------------------------------------------------------------------
149 void DRCEFunctionalObject::setMaxThreadCount(unsigned int threadCount)
150 {
151  asyncTaskQueue.setMaxThreadCount(threadCount);
152 }
153 //-----------------------------------------------------------------------------
155 {
156  return asyncTaskQueue.getMaxThreadCount();
157 }
158 //-----------------------------------------------------------------------------
159 // cppcheck-suppress unusedFunction
161 {
162  asyncTaskQueue.saveTasksQueue();
163 }
164 //-----------------------------------------------------------------------------
165 // cppcheck-suppress unusedFunction
167 {
168  asyncTaskQueue.loadTasksQueue();
169 }
170 //-----------------------------------------------------------------------------
172 {
173  asyncTaskQueue.setTasksQueueDumpPeriod(periodMs);
174 }
175 //-----------------------------------------------------------------------------
177 {
178  return asyncTaskQueue.getTasksQueueDumpPeriod();
179 }
180 //-----------------------------------------------------------------------------
181 // cppcheck-suppress unusedFunction
183 {
184  resourceMonitor.setTimePeriod(timePeriod_);
185 }
186 //-----------------------------------------------------------------------------
187 // cppcheck-suppress unusedFunction
189 {
190  return resourceMonitor.getTimePeriod();
191 }
192 //-----------------------------------------------------------------------------
193 // cppcheck-suppress unusedFunction
194 void DRCEFunctionalObject::setResetErrorCodeStateNotification(bool resetErrorCodeStateNotification_)
195 {
196  asyncTaskQueue.setResetErrorCodeStateNotification(resetErrorCodeStateNotification_);
197 }
198 //-----------------------------------------------------------------------------
199 // cppcheck-suppress unusedFunction
201 {
202  return asyncTaskQueue.getResetErrorCodeStateNotification();
203 }
204 //-----------------------------------------------------------------------------
206 {
207  StatVariables statVariables;
208  AvgCounts avgCounts;
209  resourceMonitor.dump(statVariables, avgCounts);
210 
211  StatVariablesConverter statVariablesConverter(statVariables);
212  Dictionary statVars = statVariablesConverter.toDictionary();
213 
214  AvgCountsConverter avgCountsConverter(avgCounts);
215  Dictionary avgVars = avgCountsConverter.toDictionary();
216 
217  statVars.insert(avgVars.begin(), avgVars.end());
218  return statVars;
219 }
220 //-----------------------------------------------------------------------------
222 {
223  StatVariablesConverter statVariablesConverter(dictionary);
224  StatVariables statVariables = statVariablesConverter.toObject();
225 
226  AvgCountsConverter avgCountsConverter(dictionary);
227  AvgCounts avgCounts = avgCountsConverter.toObject();
228 
229  resourceMonitor.restore(statVariables, avgCounts);
230 }
231 //-----------------------------------------------------------------------------
232 // cppcheck-suppress unusedFunction
234 {
235  return resourceMonitor.getStatVariables();
236 }
237 //-----------------------------------------------------------------------------
239 {
240  return asyncTaskQueue.getAsyncTasksCount();
241 }
242 //-----------------------------------------------------------------------------
244 {
245  return asyncTaskQueue.hasTask(taskId);
246 }
247 //-----------------------------------------------------------------------------
249 {
250  return asyncTaskQueue.getCurrentTasksQueueAsString();
251 }
252 //-----------------------------------------------------------------------------
254 {
255  std::string result;
256  DRCEListAllTasks listAllTasks;
257  if (listAllTasks.rebuild(getTasksStatusDir()))
258  {
259  result = listAllTasks.toString();
260  }
261 
262  if (listAllTasks.isError())
263  {
264  Poco::Logger::root().log(Poco::Message(drce_const::moduleName,
265  message(message_const::GET_LIST_ALL_TASKS_ERROR, listAllTasks.getErrorMsg()),
266  Poco::Message::Priority::PRIO_ERROR));
267  }
268  return result;
269 }
270 //-----------------------------------------------------------------------------
272 {
273  std::string result;
275  try
276  {
278  HCE::drce::ResourceUsageSerializator resourceUsageSerializator(resourceUsage);
279  resourceUsageSerializator.serialize(result);
280  if (resourceUsageSerializator.isError())
281  {
282  Poco::Logger::root().log(Poco::Message(drce_const::moduleName,
283  message(message_const::GET_RESOURCE_USAGE_ERROR, resourceUsageSerializator.getErrorMsg()),
284  Poco::Message::Priority::PRIO_ERROR));
285  }
286  }
287  catch(Poco::Exception& e)
288  {
289  Poco::Logger::root().log(Poco::Message(drce_const::moduleName,
290  message(message_const::GET_RESOURCE_USAGE_ERROR, e.message()),
291  Poco::Message::Priority::PRIO_ERROR));
292  }
293  return result;
294 }
295 //-----------------------------------------------------------------------------
296 std::string DRCEFunctionalObject::getDefaultJSON(const std::string& jsonErrorMessage, unsigned int jsonErrorCode)
297 {
298  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, message(message_const::CREATE_DEFAULT_JSON), Poco::Message::Priority::PRIO_TRACE));
299 
300  DRCEDefaultJSON defaultJson;
301  defaultJson.setJsonErrorMessage(jsonErrorMessage);
302  defaultJson.setJsonErrorCode(jsonErrorCode);
303  defaultJson.setJsonNodeName(nodeName);
304  defaultJson.setJsonNodeHost(nodeHost);
305  defaultJson.setJsonNodePort(nodePort);
306  defaultJson.makeJSON();
307 
308  return defaultJson.getJSON();
309 }
310 //-----------------------------------------------------------------------------
// Handles one request given as a JSON string and returns the response JSON.
// The input is wrapped in a DRCEInputJsonMessage, processed by the overload that
// takes that message, and the resulting DRCEOutputJsonMessage is serialized.
// Any Poco::Exception (including the ones thrown below for parse or
// serialization failures) is recorded in the error state and logged.
// Returns the serialized response, or getDefaultJSON(errorMsg, errorCode) when
// the serialized string is empty.
311 std::string DRCEFunctionalObject::Process(const std::string& json)
312 {
313  resetError();
314  std::string resJson;
315 
316  try
317  {
318  DRCEInputJsonMessage inputJsonMessage(json);
 // A message that failed to parse is reported through the common catch below.
319  if (inputJsonMessage.isError())
320  throw Poco::Exception(inputJsonMessage.getErrorMsg(), PARSE_ERROR);
321 
322  DRCEOutputJsonMessage outputMessage = Process(inputJsonMessage);
323 
 // Serialization failure is likewise turned into an exception carrying the
 // output message's own error text and code.
324  if (!outputMessage.serialize(resJson))
325  throw Poco::Exception(outputMessage.getErrorMsg(), outputMessage.getErrorCode());
326  }
327  catch(Poco::Exception& e)
328  {
329  isError(true);
330  errorMsg = e.message();
331  errorCode = e.code();
332 
 // NOTE(review): listing line 334 is absent from this extract, so the log
 // statement below appears cut in half (line 333 ends with '+').
333  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, message(message_const::ERROR_MSG, errorMsg)+
335  Poco::Message::Priority::PRIO_ERROR));
336  }
 // Fall back to the default error document when nothing was serialized.
337  return (resJson.empty())?getDefaultJSON(errorMsg, errorCode):resJson;
338 }
339 //-----------------------------------------------------------------------------
341 {
342  resetError();
343  requestTime = 0;
344  Poco::Timestamp tsStart;
345  DRCEOutputJsonMessage outputMessage;
346  DRCEResultData resultData;
347  DRCEResultDataItem resultDataItem;
348 
349  if (!process(inputJsonMessage, resultDataItem))
350  {
351  resultDataItem = DRCETaskRequestExecutor::getResultDataItem(inputJsonMessage.getRequestId(),
352  inputJsonMessage.getRequestType(),
353  *this,
354  DRCETaskRequest::TaskState::UNDEFINED);
355  }
356  resultData.addDataItem(std::forward<DRCEResultDataItem>(resultDataItem));
357  outputMessage.setResultData(resultData);
358 
359  requestTime = getElapsedTimeMsec(tsStart);
360 
361  return outputMessage;
362 }
363 //-----------------------------------------------------------------------------
// Executes the task request carried by inputJsonMessage and stores the outcome
// in resultDataItem.
// Returns true when the executor ran without throwing; false when an exception
// was caught, in which case the error state of this object is set and the
// exception is logged with PRIO_ERROR.
364 bool DRCEFunctionalObject::process(DRCEInputJsonMessage& inputJsonMessage, DRCEResultDataItem& resultDataItem)
365 {
366  bool result = false;
367  try
368  {
369  Poco::SharedPtr<DRCETaskRequest> pTaskRequest = inputJsonMessage.getTaskRequest();
 // A null request is reported with the WRONG_TYPE_TASK_REQUEST message.
370  if (pTaskRequest.isNull())
371  throw Poco::Exception(message(message_const::WRONG_TYPE_TASK_REQUEST));
372 
 // The executor is chosen by the factory from the message's request type.
373  Poco::SharedPtr<DRCETaskRequestExecutor> pTaskRequestExecutor = DRCETaskRequestExecutorFactory::create(inputJsonMessage.getRequestType(),
374  asyncTaskQueue, *this, message, resourceMonitor);
375 
 // A null executor is reported with the same message as a null request.
376  if (pTaskRequestExecutor.isNull())
377  throw Poco::Exception(message(message_const::WRONG_TYPE_TASK_REQUEST));
378 
 // The request id of the message is used as the task id.
379  pTaskRequest->setTaskId(inputJsonMessage.getRequestId());
380 
 // applyEnvironments() is called only for rtSetTaskExecute requests.
381  if (pTaskRequest->getRequestType() == DRCETaskRequest::RequestType::rtSetTaskExecute)
382  applyEnvironments(pTaskRequest.get());
383 
384  resultDataItem = pTaskRequestExecutor->execute(pTaskRequest.get(), inputJsonMessage);
385 
386  result = true;
387  }
388  catch(Poco::Exception& e)
389  {
390  isError(true);
391  setErrorCode(e.code());
392  setErrorMsg(e.message());
393  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.displayText(), Poco::Message::Priority::PRIO_ERROR));
394  }
395  catch(std::exception& e)
396  {
397  isError(true);
 // NOTE(review): listing line 398 is absent from this extract.
399  setErrorMsg(e.what());
400  Poco::Logger::root().log(Poco::Message(drce_const::moduleName, e.what(), Poco::Message::Priority::PRIO_ERROR));
401  }
402 
403  return result;
404 }
405 //-----------------------------------------------------------------------------
406 //-----------------------------------------------------------------------------
407 } // namespace drce
408 } // namespace HCE