hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DRCETaskThreadingModeTest.cpp
Go to the documentation of this file.
1 #include <gtest/gtest.h>
2 #include <fstream>
3 #include <utility>
4 #include <vector>
5 #include <Poco/File.h>
6 
7 #include "DRCEError.hpp"
8 #include "DRCESessionOptions.hpp"
11 #include "DRCEFunctionalObject.hpp"
13 #include "DRCEResultData.hpp"
14 #include "DRCEPrintStatus.hpp"
15 #include "DRCEFileStream.hpp"
21 #include "DRCEStressTest.hpp"
22 #include "DRCECleanupTest.hpp"
23 #include "DRCETasksQueue.hpp"
24 #include "DRCEFileExtractor.hpp"
25 #include "CustomMessage.hpp"
26 #include "DRCEMessageConst.hpp"
27 #include "DRCEListAllTasks.hpp"
28 #include "DRCEResourceLimits.hpp"
29 
30 namespace HCE
31 {
32 namespace drce
33 {
34 namespace tests
35 {
36 //-----------------------------------------------------------------------------
37 static size_t count = 0;
39 {
40  std::string operator()(const std::string& data)
41  {
42  ++count;
43  return data+"\n";
44  }
45 };
46 static TestFunctor testFunctor;
47 //-----------------------------------------------------------------------------
48 //-----------------------------------------------------------------------------
50 {
51  DRCENodeOptions nodeOptions("", "", "", "");
52  nodeOptions.setNodeHost("NODE_HOST");
53  nodeOptions.setNodePort("NODE_PORT");
54  nodeOptions.setNodeName("NODE_NAME");
55  if (homeDir.empty())
56  {
57  char* dir = get_current_dir_name();
58  nodeOptions.setHomeDir(dir);
59  free(dir);
60  dir = nullptr;
61  }
62  nodeOptions.setTasksDataDir(nodeOptions.getHomeDir()+"/data");
63  nodeOptions.setTasksStatusDir(nodeOptions.getHomeDir()+"/status");
64  mkdir(nodeOptions.getTasksDataDir().c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
65  mkdir(nodeOptions.getTasksStatusDir().c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
66  return nodeOptions;
67 }
68 //-----------------------------------------------------------------------------
70 {
72  return Poco::File(fileName).exists();
73 }
74 //-----------------------------------------------------------------------------
75 //-----------------------------------------------------------------------------
77 {
78  const std::string requestJson = "{\"session\":{\"type\":0,\"tmode\":1,\"cleanup\":0,\"time_max\":5000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[{\"LINES\":\"80\",\"HOSTNAME\":\"node22\"}]},\"limits\":{\"proc_max\":400},\"command\":\"sort\",\"input\":\"def\\nabc\\n123\\n\",\"files\":[{\"name\":\"file1\",\"data\":\"test content of file1\\n\",\"action\":13},{\"name\":\"file2\",\"data\":\"test content of file2\\n\",\"action\":1}]}";
79 // const std::string requestJson = "{\"session\":{\"type\":0,\"cleanup\":1,\"tmode\":1,\"time_max\":10000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[{}]},\"command\":\"echo \\\"scale=4000; 4*a(1)\\\" | bc -l && ps xau && ls -la\",\"input\":\"\",\"files\":[]}";
80 
81  // set request for execute task
82  DRCEInputJsonMessage inputJsonMessage;
83  inputJsonMessage.setRequestType(DRCETaskRequest::RequestType::rtSetTaskExecute);
84  inputJsonMessage.setRequestData(requestJson);
85  inputJsonMessage.setRequestId(requestId);
86 
87  std::stringstream sourceJson;
88 // sourceJson << inputJsonMessage;
89  std::string json;
90  inputJsonMessage.serialize(json);
91  sourceJson.str(json);
92 
93 // std::cout << "source JSON: " << sourceJson.str() << std::endl;
94 
95  ASSERT_FALSE(inputJsonMessage.isError()) << printError(inputJsonMessage);
96  ASSERT_EQ(inputJsonMessage.getErrorCode(), NO_ERROR);
97  ASSERT_TRUE(inputJsonMessage.getErrorMsg().empty());
98  printSuccess("DRCEInputJsonMessage serialize");
99 
100  fObj.setNodeHost("NODE_HOST");
101  fObj.setNodePort("NODE_PORT");
102  fObj.setNodeName("NODE_NAME");
103  fObj.setTasksQueueDumpPeriod(2000);
104 
105  std::string resultJson = fObj.Process(sourceJson.str());
106 
107  ASSERT_FALSE(fObj.isError()) << printError(fObj);
108  ASSERT_EQ(fObj.getErrorCode(), NO_ERROR);
109  ASSERT_TRUE(fObj.getErrorMsg().empty());
110  printSuccess("DRCEFunctionalObject Process");
111 
112 // std::cout << "result JSON: " << resultJson << std::endl;
113 
114  // check right result (unserialize and compare two objects)
115  DRCEOutputJsonMessage resultJsonMessage(resultJson);
116 
117  ASSERT_FALSE(resultJsonMessage.isError()) << printError(resultJsonMessage);
118  ASSERT_EQ(resultJsonMessage.getErrorCode(), NO_ERROR);
119  ASSERT_TRUE(resultJsonMessage.getErrorMsg().empty());
120  printSuccess("DRCEOutputJsonMessage unserialize");
121 
122  ASSERT_EQ(resultJsonMessage.getResultData().getItemsCount(), 1);
123  ASSERT_FALSE(resultJsonMessage.getResultData().getDataItem(0).getStdoutStream().empty());
124  ASSERT_TRUE(resultJsonMessage.getResultData().getDataItem(0).getStderrStream().empty());
125  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodeName(), fObj.getNodeName());
126  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getExitStatus(), 0);
127  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getFilesCount(), 1);
128  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getFileItem(0).name, "file1");
129  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getFileItem(0).data, "test content of file1\n");
130  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getFileItem(0).actionType, 13);
131 
133  printSuccess("!!! Execute sync request");
134 }
135 //-----------------------------------------------------------------------------
137 {
138  std::string requestJson;
139  if (requestData.empty())
140  requestJson = "{\"session\":{\"type\":0,\"tmode\":2,\"time_max\":40000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[{\"LINES\":\"80\",\"HOSTNAME\":\"node22\"}]},\"command\":\"sort\",\"input\":\"def\\nabc\\n123\\n\",\"files\":[{\"name\":\"file1\",\"data\":\"test content of file1\\n\",\"action\":13},{\"name\":\"file2\",\"data\":\"test content of file2\\n\",\"action\":1}]}";
141  else
142  requestJson = requestData;
143 
144 // std::cout << "json: " << requestJson << std::endl;
145 
146  // set request for execute task
147  DRCEInputJsonMessage inputJsonMessage;
148  inputJsonMessage.setRequestType(DRCETaskRequest::RequestType::rtSetTaskExecute);
149  inputJsonMessage.setRequestData(requestJson);
150  inputJsonMessage.setRequestId(requestId);
151 
152  std::stringstream sourceJson;
153 // sourceJson << inputJsonMessage;
154  std::string json;
155  inputJsonMessage.serialize(json);
156  sourceJson.str(json);
157 
158 // std::cout << "source JSON: " << sourceJson.str() << std::endl;
159 
160  ASSERT_FALSE(inputJsonMessage.isError()) << printError(inputJsonMessage);
161  ASSERT_EQ(inputJsonMessage.getErrorCode(), NO_ERROR);
162  ASSERT_TRUE(inputJsonMessage.getErrorMsg().empty());
163  printSuccess("DRCEInputJsonMessage serialize");
164 
165  std::string resultJson = fObj.Process(sourceJson.str());
166 
167  ASSERT_FALSE(fObj.isError()) << printError(fObj);
168  ASSERT_EQ(fObj.getErrorCode(), NO_ERROR);
169  ASSERT_TRUE(fObj.getErrorMsg().empty());
170  printSuccess("DRCEFunctionalObject Process");
171 
172  std::cout << "result JSON: " << resultJson << std::endl;
173 
174  // check right result (unserialize and compare two objects)
175  DRCEOutputJsonMessage resultJsonMessage(resultJson);
176 
177  ASSERT_FALSE(resultJsonMessage.isError()) << printError(resultJsonMessage);
178  ASSERT_EQ(resultJsonMessage.getErrorCode(), NO_ERROR);
179  ASSERT_TRUE(resultJsonMessage.getErrorMsg().empty());
180  printSuccess("DRCEOutputJsonMessage unserialize");
181 
182 // std::cout << "ItemsCount = " << resultJsonMessage.getResultData().getItemsCount() << std::endl;
183 // for (size_t i=0;i<resultJsonMessage.getResultData().getItemsCount();++i)
184 // {
185 // std::cout << "stderr: " << resultJsonMessage.getResultData().getDataItem(i).getStderrStream() << std::endl;
186 // std::cout << "stdout: " << resultJsonMessage.getResultData().getDataItem(i).getStdoutStream() << std::endl;
187 // }
188 
189 // DRCECleanupTest::cleanupTestData(fObj);
190  printSuccess("!!! Set task to async execute request");
191 }
192 //-----------------------------------------------------------------------------
194 {
195  std::string requestJson;
196  if (requestData.empty())
197  requestJson = "{\"session\":{\"type\":0,\"tmode\":1,\"time_max\":40000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[]},\"command\":\"sort\",\"input\":\"def\\nabc\\n123\\n\",\"files\":[]}";
198  else
199  requestJson = requestData;
200 
201  // set request for execute task
202  DRCEInputJsonMessage inputJsonMessage;
203  inputJsonMessage.setRequestType(DRCETaskRequest::RequestType::rtSetTaskExecute);
204  inputJsonMessage.setRequestData(requestJson);
205  inputJsonMessage.setRequestId(requestId);
206 
207  std::stringstream sourceJson;
208  sourceJson << inputJsonMessage;
209 
210 // std::cout << "source JSON: " << sourceJson.str() << std::endl;
211 
212  ASSERT_FALSE(inputJsonMessage.isError()) << printError(inputJsonMessage);
213  ASSERT_EQ(inputJsonMessage.getErrorCode(), NO_ERROR);
214  ASSERT_TRUE(inputJsonMessage.getErrorMsg().empty());
215  printSuccess("DRCEInputJsonMessage serialize");
216 
217  std::string resultJson = fObj.Process(sourceJson.str());
218 
219  std::cout << "SetTaskExecute resultJson: " << resultJson << std::endl;
220 
221  ASSERT_FALSE(fObj.isError()) << printError(fObj);
222  ASSERT_EQ(fObj.getErrorCode(), NO_ERROR);
223  ASSERT_TRUE(fObj.getErrorMsg().empty());
224  printSuccess("DRCEFunctionalObject Process");
225 
226  Poco::Thread::sleep(5);
227 /*
228  while (fObj.getAsyncTasksCount() || fObj.hasTask(requestId))
229  {
230  std::cout << "isExistTask on HDD: " << std::boolalpha << DRCETaskThreadingModeTest::isExistTask(fObj, requestId) << std::endl;
231 
232  Poco::Thread::sleep(10000);
233  std::cout << "wait the end ...." << std::endl;
234  }
235  Poco::Thread::sleep(5000);
236 */
237  // Check result use Simple flag
238  std::string json;
239  DRCETaskRequestCheckState taskRequestCheckState;
240  taskRequestCheckState.setCheckType(DRCETaskRequestCheckState::CheckType::ctSimple);
241  taskRequestCheckState.serialize(json);
242  ASSERT_FALSE(taskRequestCheckState.isError());
243  printSuccess("DRCETaskRequestCheckState serialize");
244 
245  std::cout << "json: " << json << std::endl;
246 
247  // set request for execute task
248  inputJsonMessage.clear();
249  inputJsonMessage.setRequestType(DRCETaskRequest::RequestType::rtCheckTaskState);
250  inputJsonMessage.setRequestData(json);
251  inputJsonMessage.setRequestId(requestId);
252 
253  sourceJson.str("");
254  sourceJson << inputJsonMessage;
255 
256 // std::cout << "source JSON: " << sourceJson.str() << std::endl;
257 
258  ASSERT_FALSE(inputJsonMessage.isError()) << printError(inputJsonMessage);
259  ASSERT_EQ(inputJsonMessage.getErrorCode(), NO_ERROR);
260  ASSERT_TRUE(inputJsonMessage.getErrorMsg().empty());
261  printSuccess("DRCEInputJsonMessage serialize");
262 
263  resultJson = fObj.Process(sourceJson.str());
264 
265  ASSERT_FALSE(fObj.isError()) << printError(fObj);
266  ASSERT_EQ(fObj.getErrorCode(), NO_ERROR);
267  ASSERT_TRUE(fObj.getErrorMsg().empty());
268  printSuccess("DRCEFunctionalObject Process");
269 
270  std::cout << "CheckTaskState result JSON: " << resultJson << std::endl;
271 
272  // check right result (unserialize and compare two objects)
273  DRCEOutputJsonMessage resultJsonMessage(resultJson);
274 
275  ASSERT_FALSE(resultJsonMessage.isError()) << printError(resultJsonMessage);
276  ASSERT_EQ(resultJsonMessage.getErrorCode(), NO_ERROR);
277  ASSERT_TRUE(resultJsonMessage.getErrorMsg().empty());
278  printSuccess("DRCEOutputJsonMessage unserialize");
279 
280  ASSERT_EQ(resultJsonMessage.getResultData().getItemsCount(), 1);
281  ASSERT_TRUE(resultJsonMessage.getResultData().getDataItem(0).getStdoutStream().empty());
282  ASSERT_TRUE(resultJsonMessage.getResultData().getDataItem(0).getStderrStream().empty());
283  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodeName(), fObj.getNodeName());
284  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodeHost(), fObj.getNodeHost());
285  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodePort(), fObj.getNodePort());
286 
287  printSuccess("Check state task use Simple flag");
288 
289  // Check result alredy ended task
290  taskRequestCheckState.clear();
291  taskRequestCheckState.setCheckType(DRCETaskRequestCheckState::CheckType::ctExtended);
292  taskRequestCheckState.serialize(json);
293  ASSERT_FALSE(taskRequestCheckState.isError());
294  printSuccess("DRCETaskRequestCheckState serialize");
295 
296 // std::cout << "json: " << json << std::endl;
297  //**********************
298  while (fObj.getAsyncTasksCount() || fObj.hasTask(requestId))
299  {
300  std::cout << "isExistTask on HDD: " << std::boolalpha << DRCETaskThreadingModeTest::isExistTask(fObj, requestId) << std::endl;
301 
302  Poco::Thread::sleep(10000);
303  std::cout << "wait the end ...." << std::endl;
304  }
305  //***********************
306 /*
307  // set request for execute task
308  inputJsonMessage.clear();
309  inputJsonMessage.setRequestType(DRCETaskRequest::RequestType::rtCheckTaskState);
310  inputJsonMessage.setRequestData(json);
311  inputJsonMessage.setRequestId(requestId);
312 
313  sourceJson.str("");
314  sourceJson << inputJsonMessage;
315 
316 // std::cout << "source JSON: " << sourceJson.str() << std::endl;
317 
318  ASSERT_FALSE(inputJsonMessage.isError()) << printError(inputJsonMessage);
319  ASSERT_EQ(inputJsonMessage.getErrorCode(), NO_ERROR);
320  ASSERT_TRUE(inputJsonMessage.getErrorMsg().empty());
321  printSuccess("DRCEInputJsonMessage serialize");
322 
323  resultJson = fObj.Process(sourceJson.str());
324 
325  ASSERT_FALSE(fObj.isError()) << printError(fObj);
326  ASSERT_EQ(fObj.getErrorCode(), NO_ERROR);
327  ASSERT_TRUE(fObj.getErrorMsg().empty());
328  printSuccess("DRCEFunctionalObject Process");
329 
330 // std::cout << "result JSON: " << resultJson << std::endl;
331 
332  // check right result (unserialize and compare two objects)
333  resultJsonMessage.unserialize(resultJson);
334 
335  ASSERT_FALSE(resultJsonMessage.isError()) << printError(resultJsonMessage);
336  ASSERT_EQ(resultJsonMessage.getErrorCode(), NO_ERROR);
337  ASSERT_TRUE(resultJsonMessage.getErrorMsg().empty());
338  printSuccess("DRCEOutputJsonMessage unserialize");
339 
340  ASSERT_EQ(resultJsonMessage.getResultData().getItemsCount(), 1);
341  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getStdoutStream(), "123\nabc\ndef\n");
342  ASSERT_TRUE(resultJsonMessage.getResultData().getDataItem(0).getStderrStream().empty());
343  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getExitStatus(), 0);
344  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodeName(), fObj.getNodeName());
345  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodeHost(), fObj.getNodeHost());
346  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodePort(), fObj.getNodePort());
347 */
349  printSuccess("!!! Check state task request after execute");
350 }
351 //-----------------------------------------------------------------------------
353 {
354  const std::string requestJson = "{\"session\":{\"type\":0,\"tmode\":2,\"time_max\":40000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[]},\"command\":\"sort\",\"input\":\"def\\nabc\\n123\\n\",\"files\":[]}";
355 
356  // set request for execute task
357  DRCEInputJsonMessage inputJsonMessage;
358  inputJsonMessage.setRequestType(DRCETaskRequest::RequestType::rtSetTaskExecute);
359  inputJsonMessage.setRequestData(requestJson);
360  inputJsonMessage.setRequestId(requestId);
361 
362  std::stringstream sourceJson;
363  sourceJson << inputJsonMessage;
364 
365 // std::cout << "source JSON: " << sourceJson.str() << std::endl;
366 
367  ASSERT_FALSE(inputJsonMessage.isError()) << printError(inputJsonMessage);
368  ASSERT_EQ(inputJsonMessage.getErrorCode(), NO_ERROR);
369  ASSERT_TRUE(inputJsonMessage.getErrorMsg().empty());
370  printSuccess("DRCEInputJsonMessage serialize");
371 
372  std::string resultJson = fObj.Process(sourceJson.str());
373 
374  ASSERT_FALSE(fObj.isError()) << printError(fObj);
375  ASSERT_EQ(fObj.getErrorCode(), NO_ERROR);
376  ASSERT_TRUE(fObj.getErrorMsg().empty());
377  printSuccess("DRCEFunctionalObject Process");
378 
379 // std::cout << "resultJson: " << resultJson << std::endl;
380 
381  while (fObj.getAsyncTasksCount() || fObj.hasTask(requestId))
382  {
383  Poco::Thread::sleep(10000);
384  std::cout << "wait the end ...." << std::endl;
385  }
386  Poco::Thread::sleep(5000);
387 
388  std::string json;
389  DRCETaskRequestGetData taskRequestGetData;
390  taskRequestGetData.setFetchType(DRCETaskRequestGetData::FetchType::ftNotDeleteDataAfterFetch);
391  taskRequestGetData.serialize(json);
392  ASSERT_FALSE(taskRequestGetData.isError());
393  printSuccess("DRCETaskRequestGetData serialize");
394 
395 // std::cout << "json: " << json << std::endl;
396 
397  // set request for execute task
398  inputJsonMessage.clear();
399  inputJsonMessage.setRequestType(DRCETaskRequest::RequestType::rtGetTaskData);
400  inputJsonMessage.setRequestData(json);
401  inputJsonMessage.setRequestId(requestId);
402 
403  sourceJson.str("");
404  sourceJson << inputJsonMessage;
405 
406 // std::cout << "source JSON: " << sourceJson.str() << std::endl;
407 
408  ASSERT_FALSE(inputJsonMessage.isError()) << printError(inputJsonMessage);
409  ASSERT_EQ(inputJsonMessage.getErrorCode(), NO_ERROR);
410  ASSERT_TRUE(inputJsonMessage.getErrorMsg().empty());
411  printSuccess("DRCEInputJsonMessage serialize");
412 
413  resultJson = fObj.Process(sourceJson.str());
414 
415  ASSERT_FALSE(fObj.isError()) << printError(fObj);
416  ASSERT_EQ(fObj.getErrorCode(), NO_ERROR);
417  ASSERT_TRUE(fObj.getErrorMsg().empty());
418  printSuccess("DRCEFunctionalObject Process");
419 
420 // std::cout << "result JSON: " << resultJson << std::endl;
421 
422  // check right result (unserialize and compare two objects)
423  DRCEOutputJsonMessage resultJsonMessage(resultJson);
424 
425  ASSERT_FALSE(resultJsonMessage.isError()) << printError(resultJsonMessage);
426  ASSERT_EQ(resultJsonMessage.getErrorCode(), NO_ERROR);
427  ASSERT_TRUE(resultJsonMessage.getErrorMsg().empty());
428  printSuccess("DRCEOutputJsonMessage unserialize");
429 
430  ASSERT_EQ(resultJsonMessage.getResultData().getItemsCount(), 1);
431  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getStdoutStream(), "123\nabc\ndef\n");
432  ASSERT_TRUE(resultJsonMessage.getResultData().getDataItem(0).getStderrStream().empty());
433  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getExitStatus(), 0);
434  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodeName(), fObj.getNodeName());
435  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodeHost(), fObj.getNodeHost());
436  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodePort(), fObj.getNodePort());
437 
439  printSuccess("!!! Get data task request after execute");
440 }
441 //-----------------------------------------------------------------------------
444 {
445 // std::string requestJson = "{\"session\":{\"type\":0,\"tmode\":2,\"time_max\":80000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[{}]},\"command\":\"echo \\\"scale=400; 4*a(1)\\\" | bc -l && sleep 10 && ps xau\",\"input\":\"\",\"files\":[{\"name\":\"file1\",\"data\":\"test content of file1\\n\",\"action\":13},{\"name\":\"file2\",\"data\":\"test content of file2\\n\",\"action\":1}]}";
446  std::string requestJson = "{\"session\":{\"type\":0,\"cleanup\":0,\"tmode\":2,\"time_max\":80000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[{}]},\"command\":\"echo \\\"scale=40; 4*a(1)\\\" | bc -l && /home/alexander/workspace/sleeper/Debug/sleeper 10 && echo Hello && /home/alexander/workspace/sleeper/Debug/sleeper 10\",\"input\":\"\",\"files\":[]}";
447 
448  // set request for execute task
449  DRCEInputJsonMessage inputJsonMessage;
450  inputJsonMessage.setRequestType(DRCETaskRequest::RequestType::rtSetTaskExecute);
451  inputJsonMessage.setRequestData(requestJson);
452  inputJsonMessage.setRequestId(requestId);
453 
454  std::stringstream sourceJson;
455  sourceJson << inputJsonMessage;
456 
457 // std::cout << "source JSON: " << sourceJson.str() << std::endl;
458 
459  ASSERT_FALSE(inputJsonMessage.isError()) << printError(inputJsonMessage);
460  ASSERT_EQ(inputJsonMessage.getErrorCode(), NO_ERROR);
461  ASSERT_TRUE(inputJsonMessage.getErrorMsg().empty());
462  printSuccess("DRCEInputJsonMessage serialize");
463 
464  std::string resultJson = fObj.Process(sourceJson.str());
465 
466 // std::cout << "result JSON: " << resultJson << std::endl;
467 
468  ASSERT_FALSE(fObj.isError()) << printError(fObj);
469  ASSERT_EQ(fObj.getErrorCode(), NO_ERROR);
470  ASSERT_TRUE(fObj.getErrorMsg().empty());
471  printSuccess("DRCEFunctionalObject Process");
472 
473  Poco::Thread::sleep(5000);
474 
475  std::string json;
476  DRCETaskRequestTerminate taskRequestTerminate;
477  taskRequestTerminate.setTaskId(requestId);
478  taskRequestTerminate.setAlgorithmType(algorithmType);
479  taskRequestTerminate.setDelayValue(1000);
480  taskRequestTerminate.setRepeatValue(3);
481  taskRequestTerminate.setSignalValue(9);
482  taskRequestTerminate.setCleanupFlag(cleanupFlag);
483  taskRequestTerminate.serialize(json);
484  ASSERT_FALSE(taskRequestTerminate.isError());
485  printSuccess("DRCETaskRequestTerminate serialize");
486 
487 // std::cout << "json: " << json << std::endl;
488 
489  // set request for execute task
490  inputJsonMessage.clear();
491  inputJsonMessage.setRequestType(DRCETaskRequest::RequestType::rtTerminateTask);
492  inputJsonMessage.setRequestData(json);
493  inputJsonMessage.setRequestId(requestId);
494 
495  sourceJson.str("");
496  sourceJson << inputJsonMessage;
497 
498 // std::cout << "source JSON: " << sourceJson.str() << std::endl;
499 
500  ASSERT_FALSE(inputJsonMessage.isError()) << printError(inputJsonMessage);
501  ASSERT_EQ(inputJsonMessage.getErrorCode(), NO_ERROR);
502  ASSERT_TRUE(inputJsonMessage.getErrorMsg().empty());
503  printSuccess("DRCEInputJsonMessage serialize");
504 
505  resultJson = fObj.Process(sourceJson.str());
506 
507  ASSERT_FALSE(fObj.isError()) << printError(fObj);
508  ASSERT_EQ(fObj.getErrorCode(), NO_ERROR);
509  ASSERT_TRUE(fObj.getErrorMsg().empty());
510  printSuccess("DRCEFunctionalObject Process");
511 
512  std::cout << "result JSON: " << resultJson << std::endl;
513 
514  // check right result (unserialize and compare two objects)
515  DRCEOutputJsonMessage resultJsonMessage(resultJson);
516 
517  ASSERT_FALSE(resultJsonMessage.isError()) << printError(resultJsonMessage);
518  ASSERT_EQ(resultJsonMessage.getErrorCode(), NO_ERROR);
519  ASSERT_TRUE(resultJsonMessage.getErrorMsg().empty());
520  printSuccess("DRCEOutputJsonMessage unserialize");
521 
522  ASSERT_EQ(resultJsonMessage.getResultData().getItemsCount(), 1);
523  ASSERT_TRUE(resultJsonMessage.getResultData().getDataItem(0).getStdoutStream().empty());
524  ASSERT_TRUE(resultJsonMessage.getResultData().getDataItem(0).getStderrStream().empty());
525  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getExitStatus(), (128+taskRequestTerminate.getSignalValue()));
526  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodeName(), fObj.getNodeName());
527  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodeHost(), fObj.getNodeHost());
528  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodePort(), fObj.getNodePort());
529  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getState(), DRCETaskRequest::TaskState::TERMINATED);
530 
531  while (fObj.getAsyncTasksCount())
532  {
533  Poco::Thread::sleep(5000);
534  std::cout << "wait the end ...." << std::endl;
535  }
536 
538  printSuccess("!!! Terminate task request");
539 }
540 //-----------------------------------------------------------------------------
542 {
543  const std::string requestJson = "{\"session\":{\"type\":0,\"tmode\":2,\"time_max\":40000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[]},\"command\":\"sort\",\"input\":\"def\\nabc\\n123\\n\",\"files\":[]}";
544 
545  // set request for execute task
546  DRCEInputJsonMessage inputJsonMessage;
547  inputJsonMessage.setRequestType(DRCETaskRequest::RequestType::rtSetTaskExecute);
548  inputJsonMessage.setRequestData(requestJson);
549  inputJsonMessage.setRequestId(requestId);
550 
551  std::stringstream sourceJson;
552  sourceJson << inputJsonMessage;
553 
554 // std::cout << "source JSON: " << sourceJson.str() << std::endl;
555 
556  ASSERT_FALSE(inputJsonMessage.isError()) << printError(inputJsonMessage);
557  ASSERT_EQ(inputJsonMessage.getErrorCode(), NO_ERROR);
558  ASSERT_TRUE(inputJsonMessage.getErrorMsg().empty());
559  printSuccess("DRCEInputJsonMessage serialize");
560 
561  std::string resultJson = fObj.Process(sourceJson.str());
562 
563  ASSERT_FALSE(fObj.isError()) << printError(fObj);
564  ASSERT_EQ(fObj.getErrorCode(), NO_ERROR);
565  ASSERT_TRUE(fObj.getErrorMsg().empty());
566  printSuccess("DRCEFunctionalObject Process");
567 
568  while (fObj.getAsyncTasksCount())
569  {
570  Poco::Thread::sleep(5000);
571  std::cout << "wait the end ...." << std::endl;
572  }
573  Poco::Thread::sleep(2000);
574 
575  std::string json;
576  DRCETaskRequestDeleteData taskRequestDeleteData;
577  taskRequestDeleteData.serialize(json);
578  ASSERT_FALSE(taskRequestDeleteData.isError());
579  printSuccess("DRCETaskRequestDeleteData serialize");
580 
581 // std::cout << "json: " << json << std::endl;
582 
583  // set request for execute task
584  inputJsonMessage.clear();
585  inputJsonMessage.setRequestType(DRCETaskRequest::RequestType::rtDeleteTaskData);
586  inputJsonMessage.setRequestData(json);
587  inputJsonMessage.setRequestId(requestId);
588 
589  sourceJson.str("");
590  sourceJson << inputJsonMessage;
591 
592 // std::cout << "source JSON: " << sourceJson.str() << std::endl;
593 
594  ASSERT_FALSE(inputJsonMessage.isError()) << printError(inputJsonMessage);
595  ASSERT_EQ(inputJsonMessage.getErrorCode(), NO_ERROR);
596  ASSERT_TRUE(inputJsonMessage.getErrorMsg().empty());
597  printSuccess("DRCEInputJsonMessage serialize");
598 
599  resultJson = fObj.Process(sourceJson.str());
600 
601  ASSERT_FALSE(fObj.isError()) << printError(fObj);
602  ASSERT_EQ(fObj.getErrorCode(), NO_ERROR);
603  ASSERT_TRUE(fObj.getErrorMsg().empty());
604  printSuccess("DRCEFunctionalObject Process");
605 
606 // std::cout << "result JSON: " << resultJson << std::endl;
607 
608  // check right result (unserialize and compare two objects)
609  DRCEOutputJsonMessage resultJsonMessage(resultJson);
610 
611  ASSERT_FALSE(resultJsonMessage.isError()) << printError(resultJsonMessage);
612  ASSERT_EQ(resultJsonMessage.getErrorCode(), NO_ERROR);
613  ASSERT_TRUE(resultJsonMessage.getErrorMsg().empty());
614  printSuccess("DRCEOutputJsonMessage unserialize");
615 
616  if (!resultJsonMessage.getResultData().getDataItem(0).getErrorMessage().empty())
617  std::cout << "getErrorMessage: " << resultJsonMessage.getResultData().getDataItem(0).getErrorMessage() << std::endl;
618 
619  ASSERT_EQ(resultJsonMessage.getResultData().getItemsCount(), 1);
620  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getErrorCode(), NO_ERROR);
621  ASSERT_TRUE(resultJsonMessage.getResultData().getDataItem(0).getErrorMessage().empty());
622  ASSERT_TRUE(resultJsonMessage.getResultData().getDataItem(0).getStdoutStream().empty());
623  ASSERT_TRUE(resultJsonMessage.getResultData().getDataItem(0).getStderrStream().empty());
624  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getState(), static_cast<unsigned int>(HCE::drce::DRCETaskRequest::TaskState::DELETED))
625  << "State: " << resultJsonMessage.getResultData().getDataItem(0).getState();
626  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getRequestId(), requestId);
627  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getRequestType(), HCE::drce::DRCETaskRequest::RequestType::rtDeleteTaskData);
628  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getFilesCount(), 0);
629  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getExitStatus(), 0);
630  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodeName(), fObj.getNodeName());
631  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodeHost(), fObj.getNodeHost());
632  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodePort(), fObj.getNodePort());
633 
634 // std::cout << "log: " << fObj.logMsg() << std::endl;
636  printSuccess("!!! Delete task data request");
637 }
638 //-----------------------------------------------------------------------------
639 //-----------------------------------------------------------------------------
641 {
642  fObj.setTasksQueueDumpPeriod(1000);
643  fObj.setResourceMonitorTimePeriod(1000);
644  fObj.setNodeHost("NODE_HOST");
645  fObj.setNodePort("NODE_PORT");
646  fObj.setNodeName("NODE_NAME");
647  fObj.setNotificationFunctor(testFunctor);
648 // DRCETaskThreadingModeTest::testSetTaskToExecuteRequest(fObj, requestId);
649 
650  const std::string requestJson = "{\"session\":{\"type\":0,\"tmode\":2,\"time_max\":90000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[{\"LINES\":\"80\",\"HOSTNAME\":\"node22\"}]},\"command\":\"echo \\\"scale=4000; 4*a(1)\\\" | bc -l && sleep 5 && ps xau\",\"input\":\"\",\"files\":[]}";
651 
652  const unsigned int repeat = 1;
653  fObj.setMaxThreadCount(repeat);
654  for (size_t i=0;i<repeat;++i)
655  {
656  std::cout << ">>>>> " << i << ") test enter !!!!!\n";
657  DRCETaskThreadingModeTest::testSetTaskToExecuteRequest(fObj, (requestId+i), requestJson);
658 // DRCETaskThreadingModeTest::testExecuteSyncRequest(fObj, (requestId+i));
659  std::cout << ">>>>> " << i << ") test leave !!!!!\n";
660 
661  std::cout << "fObj.getAsyncTasksCount(): " << fObj.getAsyncTasksCount() << std::endl;
662  Poco::Thread::sleep(500);
663  }
664 // Poco::Thread::sleep(2000);
665  std::stringstream logMsg;
666  while (fObj.getAsyncTasksCount())
667  {
668  Poco::Thread::sleep(5000);
669 // logMsg << fObj.logMsg();
670 // std::cout << "LOG: " << fObj.logMsg() << std::endl;
671  }
672  Poco::Thread::sleep(2000);
673 // std::cout << "log: " << logMsg.str() << std::endl;
674 
675 // ASSERT_EQ(count, (2*repeat));
676 // ASSERT_FALSE(logMsg.str().empty());
677 
679  printSuccess("!!! Test Execute Notification Functor");
680 }
681 //-----------------------------------------------------------------------------
682 //-----------------------------------------------------------------------------
684 {
685  std::string requestJson = "{\"session\":{\"type\":0,\"tmode\":1,\"time_max\":8000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[{}]},\"command\":\"echo \\\"Hello\\\" && sleep 10 && ls -la && sleep 10 && ps auxf\",\"input\":\"\",\"files\":[]}";
686 
687  fObj.setNodeHost("NODE_HOST");
688  fObj.setNodePort("NODE_PORT");
689  fObj.setNodeName("NODE_NAME");
690  fObj.setNotificationFunctor(testFunctor);
691 
692  DRCETaskThreadingModeTest::testSetTaskToExecuteRequest(fObj, requestId, requestJson);
693 
694  std::stringstream logMsg;
695  while (fObj.getAsyncTasksCount())
696  {
697  Poco::Thread::sleep(1000);
698 // logMsg << fObj.logMsg();
699  }
700 // std::cout << "log: " << logMsg.str() << std::endl;
701 // ASSERT_FALSE(logMsg.str().empty());
702 
704  printSuccess("!!! Test Terminate Expired Sync Task");
705 }
706 //-----------------------------------------------------------------------------
708 {
709  std::string requestJson = "{\"session\":{\"type\":0,\"tmode\":2,\"cleanup\":1,\"time_max\":8000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[{}]},\"command\":\"echo \\\"Hello\\\" && sleep 20 && ls -la && sleep 10 && ps auxf\",\"input\":\"\",\"files\":[]}";
710 
711  fObj.setNodeHost("NODE_HOST");
712  fObj.setNodePort("NODE_PORT");
713  fObj.setNodeName("NODE_NAME");
714  fObj.setNotificationFunctor(testFunctor);
715 
716  for (size_t i=0;i<3;++i)
717  DRCETaskThreadingModeTest::testSetTaskToExecuteRequest(fObj, requestId+i, requestJson);
718 
719  std::stringstream logMsg;
720  while (fObj.getAsyncTasksCount())
721  {
722  Poco::Thread::sleep(1000);
723 // logMsg << fObj.logMsg();
724  }
725 // std::cout << "log: " << logMsg.str() << std::endl;
726 // ASSERT_FALSE(logMsg.str().empty());
727 
729  printSuccess("!!! Test Terminate Expired Async Task");
730 }
731 //-----------------------------------------------------------------------------
732 //-----------------------------------------------------------------------------
734 {
735  DRCETaskRequestSetExecute taskRequestSetExecute;
736 
738  sessionOptions.sessionType = SessionOptions::SessionType::stHostShell;
739  sessionOptions.homeDir = fObj.getHomeDir();
740  sessionOptions.tmode = threadMode;
741  sessionOptions.timeMax = timeMax;
742  sessionOptions.cleanup = SessionOptions::CleanupFlag::cfNotDelete;
743 
744  taskRequestSetExecute.setSessionOptions(std::forward<SessionOptions>(sessionOptions));
745  taskRequestSetExecute.setCommandLine("sort");
746  taskRequestSetExecute.setInputStream("def\nabc\n123\n");
747 
748  std::stringstream sourceJson;
749  sourceJson << taskRequestSetExecute;
750 
751  ASSERT_FALSE(taskRequestSetExecute.isError()) << printError(taskRequestSetExecute);
752  ASSERT_EQ(taskRequestSetExecute.getErrorCode(), NO_ERROR);
753  ASSERT_TRUE(taskRequestSetExecute.getErrorMsg().empty());
754  printSuccess("DRCETaskRequestSetExecute serialize");
755 
756  DRCEInputJsonMessage inputJsonMessage;
757  inputJsonMessage.setRequestType(DRCETaskRequest::RequestType::rtSetTaskExecute);
758  inputJsonMessage.setRequestData(sourceJson.str());
759  inputJsonMessage.setRequestId(requestId);
760 
761  sourceJson.str("");
762  sourceJson << inputJsonMessage;
763 
764  ASSERT_FALSE(inputJsonMessage.isError()) << printError(inputJsonMessage);
765  ASSERT_EQ(inputJsonMessage.getErrorCode(), NO_ERROR);
766  ASSERT_TRUE(inputJsonMessage.getErrorMsg().empty());
767  printSuccess("DRCEInputJsonMessage serialize");
768 
769 // std::cout << "IN JSON: " << sourceJson.str() << std::endl;
770 
771  std::string resultJson = fObj.Process(sourceJson.str());
772 
773  ASSERT_FALSE(fObj.isError()) << printError(fObj);
774  ASSERT_EQ(fObj.getErrorCode(), NO_ERROR);
775  ASSERT_TRUE(fObj.getErrorMsg().empty());
776  printSuccess("DRCEFunctionalObject Process");
777 
778 // std::cout << "OUT JSON: " << resultJson << std::endl;
779 
780  DRCEOutputJsonMessage resultJsonMessage(resultJson);
781 
782  ASSERT_FALSE(resultJsonMessage.isError()) << printError(resultJsonMessage);
783  ASSERT_EQ(resultJsonMessage.getErrorCode(), NO_ERROR);
784  ASSERT_TRUE(resultJsonMessage.getErrorMsg().empty());
785  printSuccess("DRCEOutputJsonMessage unserialize");
786 
787  ASSERT_EQ(resultJsonMessage.getResultData().getItemsCount(), 1);
788  if (threadMode==SessionOptions::ThreadMode::tmSync)
789  {
790  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getStdoutStream(), "123\nabc\ndef\n");
791  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getState(), DRCETaskRequest::TaskState::FINISHED);
792  }
793  else
794  {
795  if (resultJsonMessage.getResultData().getDataItem(0).getState() == DRCETaskRequest::TaskState::FINISHED)
796  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getStdoutStream(), "123\nabc\ndef\n");
797  else
798  {
799  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getStdoutStream(), "");
800  ASSERT_TRUE(resultJsonMessage.getResultData().getDataItem(0).getState()==DRCETaskRequest::TaskState::SET_AS_NEW ||
801  resultJsonMessage.getResultData().getDataItem(0).getState()==DRCETaskRequest::TaskState::IN_PROGRESS);
802  }
803  }
804 
805  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getStderrStream(), "");
806  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getErrorCode(), 0);
807  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getErrorMessage(), "");
808  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getExitStatus(), 0);
809  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodeHost(), fObj.getNodeHost());
810  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodeName(), fObj.getNodeName());
811  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodePort(), fObj.getNodePort());
812  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getRequestId(), requestId);
813  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getRequestType(), DRCETaskRequest::RequestType::rtSetTaskExecute);
814 
815  if (threadMode==SessionOptions::ThreadMode::tmAsync)
816  {
817  while (fObj.getAsyncTasksCount() || fObj.hasTask(requestId))
818  {
819  Poco::Thread::sleep(5000);
820  std::cout << "wait the end ...." << std::endl;
821  }
822  Poco::Thread::sleep(2000);
823 
824  DRCEResultDataItem resultDataItem = DRCECleanupTest::extractResultData(requestId, fObj);
825 
826  ASSERT_EQ(resultDataItem.getStdoutStream(), "123\nabc\ndef\n");
827  }
829  printSuccess("Test Request Time Limits");
830 }
831 //-----------------------------------------------------------------------------
832 //-----------------------------------------------------------------------------
834 {
835  const std::string requestJson = "{\"session\":{\"type\":0,\"tmode\":2,\"cleanup\":1,\"time_max\":120000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[]},\"command\":\"echo \\\"scale=4000; 4*a(1)\\\" | bc -l && sleep 60 && ps xau\",\"input\":\"\",\"files\":[]}";
836 
837  fObj.setMaxThreadCount(repeatCount);
838  fObj.setResourceMonitorTimePeriod(10000);
839 // fObj.setLogStream(std::cout);
840 
841  Poco::Timestamp now;
842 
843  for (unsigned int i=0;i<repeatCount;++i)
844  {
845 // std::cout << i << ") test enter\n";
846  DRCETaskThreadingModeTest::testSetTaskToExecuteRequest(fObj, (i+requestId), requestJson);
847 // std::cout << i << ") test leave\n===========\n";
848 // std::cout << "fObj.getAsyncTasksCount(): " << fObj.getAsyncTasksCount() << std::endl;
849  }
850 
851  while (fObj.getAsyncTasksCount())
852  {
853  Poco::Thread::sleep(5000);
854 // std::cout << "wait the end ...." << std::endl;
855  }
856  Poco::Thread::sleep(2000);
857  Poco::Timestamp::TimeDiff diff = now.elapsed();
858 
859  std::cout << "Count tasks = " << repeatCount << std::endl;
860  std::cout << "All time: " << (diff/1000) << " msec." << std::endl;
862 }
863 //-----------------------------------------------------------------------------
864 void DRCETaskThreadingModeTest::testMaxThreadLimits(DRCEFunctionalObject& fObj, unsigned int requestId, unsigned int maxAllowedThreadsCount, unsigned int repeatCount)
865 {
866 // const std::string requestJson = "{\"session\":{\"type\":0,\"tmode\":2,\"time_max\":40000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[]},\"command\":\"sort\",\"input\":\"def\\nabc\\n123\\n\",\"files\":[]}";
867  const std::string requestJson = "{\"session\":{\"type\":0,\"tmode\":2,\"cleanup\":1,\"time_max\":40000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[]},\"command\":\"sleep 20\",\"input\":\"\",\"files\":[]}";
868 
869 // fObj.setNotificationFunctor(testFunctor);
870 
871  fObj.setMaxThreadCount(maxAllowedThreadsCount);
872  Poco::Timestamp now;
873 
874  for (unsigned int i=0;i<repeatCount;++i)
875  {
876  std::cout << i << ") test enter\n";
877  DRCETaskThreadingModeTest::testSetTaskToExecuteRequest(fObj, (i+requestId), requestJson);
878  std::cout << i << ") test leave\n===========\n";
879  std::cout << "fObj.getAsyncTasksCount(): " << fObj.getAsyncTasksCount() << std::endl;
880  }
881 
882  while (fObj.getAsyncTasksCount())
883  {
884  Poco::Thread::sleep(5000);
885  std::cout << "wait the end ...." << std::endl;
886  }
887  Poco::Thread::sleep(2000);
888 // std::cout << "log: " << fObj.logMsg() << std::endl;
889 
890  Poco::Timestamp::TimeDiff diff = now.elapsed();
891 
892  std::cout << "All threads = " << repeatCount << " each can sleep 20 sec."<< std::endl;
893  std::cout << "All time: " << (diff/1000) << " msec." << std::endl;
895 }
896 //-----------------------------------------------------------------------------
897 //-----------------------------------------------------------------------------
899 {
901  DRCEFunctionalObject fObj(nodeOptions.getNodeName(), nodeOptions.getHomeDir(), nodeOptions.getTasksDataDir(), nodeOptions.getTasksStatusDir());
902 
903 // const std::string requestJson = "{\"session\":{\"type\":0,\"tmode\":1,\"time_max\":0,\"cleanup\":1,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[{\"LINES\":\"80\",\"HOSTNAME\":\"node22\"}]},\"command\":\"sort\",\"input\":\"def\\nabc\\n123\\n\",\"files\":[{\"name\":\"file1\",\"data\":\"test content of file1\\n\",\"action\":13},{\"name\":\"file2\",\"data\":\"test content of file2\\n\",\"action\":1}]}";
904  const std::string requestJson = "{\"session\":{\"type\":0,\"tmode\":2,\"time_max\":40000,\"cleanup\":1,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[{\"LINES\":\"80\",\"HOSTNAME\":\"node22\"}]},\"command\":\"sort\",\"input\":\"def\\nabc\\n123\\n\",\"files\":[{\"name\":\"file1\",\"data\":\"test content of file1\\n\",\"action\":13},{\"name\":\"file2\",\"data\":\"test content of file2\\n\",\"action\":1}]}";
905 // const std::string requestJson = "{\"session\":{\"type\":0,\"tmode\":2,\"time_max\":40000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[{\"LINES\":\"80\",\"HOSTNAME\":\"node22\"}]},\"command\":\"sort\",\"input\":\"def\\nabc\\n123\\n\",\"files\":[{\"name\":\"file1\",\"data\":\"test content of file1\\n\",\"action\":13},{\"name\":\"file2\",\"data\":\"test content of file2\\n\",\"action\":1}]}";
907 
908  while (fObj.getAsyncTasksCount())
909  {
910  Poco::Thread::sleep(2000);
911  std::cout << "wait the end ...." << std::endl;
912  }
913  Poco::Thread::sleep(5000);
914 
915  bool res = DRCETaskThreadingModeTest::isExistTask(fObj, 567);
916  ASSERT_EQ(res, false);
917 
919  printSuccess("!!! Test request local shell async cleanup");
920 }
921 //-----------------------------------------------------------------------------
923 {
925  DRCEFunctionalObject fObj(nodeOptions.getNodeName(), nodeOptions.getHomeDir(), nodeOptions.getTasksDataDir(), nodeOptions.getTasksStatusDir());
926  fObj.setNodeHost("NODE_HOST");
927  fObj.setNodePort("NODE_PORT");
928  fObj.setNodeName("NODE_NAME");
929  fObj.setTasksQueueDumpPeriod(1000);
930  const unsigned int taskId = 567;
931 
932  DRCETaskThreadingModeTest::testTerminateTaskRequest(fObj, taskId, DRCETaskRequestTerminate::AlgorithmType::atDefault, DRCETaskRequestTerminate::CleanupFlag::cfNotDelete);
933  bool res = DRCETaskThreadingModeTest::isExistTask(fObj, taskId);
934  ASSERT_EQ(res, true);
935 
936  DRCETaskThreadingModeTest::testTerminateTaskRequest(fObj, taskId, DRCETaskRequestTerminate::AlgorithmType::atDefault, DRCETaskRequestTerminate::CleanupFlag::cfDelete);
937  int waitRepeat = 10;
938  while (res != false && --waitRepeat>0)
939  {
940  Poco::Thread::sleep(5000);
941  res = DRCETaskThreadingModeTest::isExistTask(fObj, taskId);
942  std::cout << "task " << taskId << " awhile exist...\n";
943  }
944  ASSERT_EQ(res, false);
945 
947  printSuccess("!!! Test Terminate task request with cleanup");
948 }
949 //-----------------------------------------------------------------------------
950 //-----------------------------------------------------------------------------
952 {
954  DRCEFunctionalObject fObj(nodeOptions.getNodeName(), nodeOptions.getHomeDir(), nodeOptions.getTasksDataDir(), nodeOptions.getTasksStatusDir());
955  fObj.setNodeHost("NODE_HOST");
956  fObj.setNodePort("NODE_PORT");
957  fObj.setNodeName("NODE_NAME");
958 
959  fObj.setNotificationFunctor(testFunctor);
960  fObj.setTasksQueueDumpPeriod(1000);
961 
962  unsigned int taskId = 567;
963 
964  DRCETaskThreadingModeTest treadingModetest;
965  treadingModetest.setNotificationFunctor(treadingModetest);
966  fObj.setNotificationFunctor(treadingModetest);
967 
969 
970 // std::string requestJson = "{\"session\":{\"type\":0,\"cleanup\":0,\"tmode\":2,\"time_max\":200,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[{}]},\"command\":\"/home/alexander/workspace/sleeper/Debug/sleeper 10\",\"input\":\"\",\"files\":[]}";
971 
972 // const std::string requestJson = "{\"session\":{\"type\":0,\"cleanup\":0,\"tmode\":2,\"time_max\":10000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[{\"LINES\":\"120\",\"PAGES\":\"52\"}]},\"command\":\"env\",\"input\":\"\",\"files\":[]}";
973 // const std::string enviroments = "{\"DRCE_NODE\":\"1\",\"DRCE_NODE_COUNT\":\"4\"}";
974 // fObj.setEnvironments(enviroments);
975 
977 
978  while (fObj.getAsyncTasksCount())
979  {
980  Poco::Thread::sleep(5000);
981  std::cout << "wait the end ...." << std::endl;
982  }
983  Poco::Thread::sleep(5000);
984 // std::cout << "log: " << fObj.logMsg() << std::endl;
985 
987 
989 
991 
992  DRCETaskThreadingModeTest::testTerminateTaskRequest(fObj, taskId, DRCETaskRequestTerminate::AlgorithmType::atDefault, DRCETaskRequestTerminate::CleanupFlag::cfDelete);
993  DRCETaskThreadingModeTest::testTerminateTaskRequest(fObj, taskId, DRCETaskRequestTerminate::AlgorithmType::atCustom);
994 
996 
999 
1002 
1003  DRCETaskThreadingModeTest::testTaskRequestTimeLimits(fObj, taskId, SessionOptions::ThreadMode::tmSync, 40000);
1004  DRCETaskThreadingModeTest::testTaskRequestTimeLimits(fObj, taskId, SessionOptions::ThreadMode::tmAsync, 40000);
1005  DRCETaskThreadingModeTest::testTaskRequestTimeLimits(fObj, taskId, SessionOptions::ThreadMode::tmSync, 0);
1006  DRCETaskThreadingModeTest::testTaskRequestTimeLimits(fObj, taskId, SessionOptions::ThreadMode::tmAsync, 0);
1007 
1008  DRCETaskThreadingModeTest::testTaskRequestExpiredTime(fObj, taskId, SessionOptions::ThreadMode::tmSync, SessionOptions::CleanupFlag::cfNotDelete);
1009  DRCETaskThreadingModeTest::testTaskRequestExpiredTime(fObj, taskId, SessionOptions::ThreadMode::tmAsync, SessionOptions::CleanupFlag::cfNotDelete);
1010  DRCETaskThreadingModeTest::testTaskRequestExpiredTime(fObj, taskId, SessionOptions::ThreadMode::tmSync, SessionOptions::CleanupFlag::cfDelete);
1011  DRCETaskThreadingModeTest::testTaskRequestExpiredTime(fObj, taskId, SessionOptions::ThreadMode::tmAsync, SessionOptions::CleanupFlag::cfDelete);
1012 }
1013 //-----------------------------------------------------------------------------
1015 {
1017  DRCECleanupTest::cleanupTestData(nodeOptions);
1018  DRCEFunctionalObject fObj(nodeOptions.getNodeName(), nodeOptions.getHomeDir(), nodeOptions.getTasksDataDir(), nodeOptions.getTasksStatusDir());
1019 
1020  unsigned int requestCount = 10;
1021  DRCEStressTest::testMultipleRequests(fObj, SessionOptions::SessionType::stHostShell, requestCount);
1022  DRCEStressTest::testRequestsTimeDelay(fObj, SessionOptions::SessionType::stHostShell, requestCount, requestCount);
1023  DRCEStressTest::testResourceLimits(fObj, SessionOptions::SessionType::stHostShell, SessionOptions::ThreadMode::tmAsync);
1024  DRCEStressTest::testResourceLimits(fObj, SessionOptions::SessionType::stHostShell, SessionOptions::ThreadMode::tmSync);
1025  DRCEStressTest::testResourceUsageLimits(fObj, SessionOptions::SessionType::stHostShell, SessionOptions::ThreadMode::tmAsync);
1026  DRCEStressTest::testResourceUsageLimits(fObj, SessionOptions::SessionType::stHostShell, SessionOptions::ThreadMode::tmSync);
1027 
1028  DRCECleanupTest::cleanupTestData(nodeOptions);
1029 }
1030 //-----------------------------------------------------------------------------
1032 {
1034  DRCECleanupTest::cleanupTestData(nodeOptions);
1035  DRCEFunctionalObject fObj(nodeOptions.getNodeName(), nodeOptions.getHomeDir(), nodeOptions.getTasksDataDir(), nodeOptions.getTasksStatusDir());
1036  fObj.setNodeHost("NODE_HOST");
1037  fObj.setNodePort("NODE_PORT");
1038  fObj.setNodeName("NODE_NAME");
1039  fObj.setResourceMonitorTimePeriod(1000);
1040 // fObj.setNotificationFunctor(testFunctor);
1041 
1042  const std::string syncRequestJson = "{\"session\":{\"type\":0,\"tmode\":1,\"time_max\":40000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[{\"LINES\":\"80\",\"HOSTNAME\":\"node22\"}]},\"limits\":{\"proc_max\":400},\"command\":\"sort\",\"input\":\"def\\nabc\\n123\\n\",\"files\":[{\"name\":\"file1\",\"data\":\"test content of file1\\n\",\"action\":5},{\"name\":\"file2\",\"data\":\"test content of file2\\n\",\"action\":1}]}";
1043  const std::string shortRequestJson = "{\"session\":{\"type\":0,\"tmode\":2,\"time_max\":40000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[{\"LINES\":\"80\",\"HOSTNAME\":\"node22\"}]},\"command\":\"sort\",\"input\":\"def\\nabc\\n123\\n\",\"files\":[{\"name\":\"file1\",\"data\":\"test content of file1\\n\",\"action\":5},{\"name\":\"file2\",\"data\":\"test content of file2\\n\",\"action\":1}]}";
1044  const std::string expiredRequestJson = "{\"session\":{\"type\":0,\"tmode\":2,\"time_max\":5000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[{}]},\"command\":\"sleep 20\",\"input\":\"\",\"files\":[{\"name\":\"file1\",\"data\":\"test content of file1\\n\",\"action\":5},{\"name\":\"file2\",\"data\":\"test content of file2\\n\",\"action\":1}]}";
1045  const std::string longRequestJson = "{\"session\":{\"type\":0,\"tmode\":2,\"cleanup\":1,\"time_max\":120000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[]},\"command\":\"echo \\\"scale=4000; 4*a(1)\\\" | bc -l && sleep 60 && ps xau\",\"input\":\"\",\"files\":[]}";
1046  const std::string bcJson = "{\"session\":{\"type\":0,\"tmode\":2,\"cleanup\":1,\"time_max\":120000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[]},\"command\":\"echo \\\"scale=4000; 4*a(1)\\\" | bc -l && echo \\\"scale=400; 4*a(1)\\\" | bc -l && sleep 10\",\"input\":\"\",\"files\":[]}";
1047 
1048  const size_t requestCount = 10;
1049 
1050  const unsigned int offsetTaskId = 500;
1051  std::vector<std::string> jsonList = /*{bcJson, bcJson, bcJson, bcJson};*/{syncRequestJson, shortRequestJson, expiredRequestJson, longRequestJson};
1052  std::vector<std::pair<unsigned int, std::string> > tasks;
1053  unsigned int count = 0;
1054  while(count < requestCount)
1055  {
1056  for (size_t k=0;k<jsonList.size();++k)
1057  {
1058  tasks.push_back(std::pair<unsigned int, std::string>(++count+offsetTaskId, jsonList[k]));
1059  if (count >= requestCount)
1060  break;
1061  }
1062  }
1063 
1064  for (size_t i=0;i<requestCount;++i)
1065  {
1066  DRCETaskThreadingModeTest::testSetTaskToExecuteRequest(fObj, tasks[i].first, tasks[i].second);
1067  }
1068 
1069  do
1070  {
1071  Poco::Thread::sleep(5000);
1072 
1073  StatVariables vars = fObj.getStatVariables();
1074  std::cout << std::string(80, '=') << std::endl;
1075 
1076  std::cout << " timeAsyncTask: " << vars.timeAsyncTask.min << "\t" << vars.timeAsyncTask.avg << "\t" << vars.timeAsyncTask.max << std::endl;
1077  std::cout << " timeSyncTask: " << vars.timeSyncTask.min << "\t" << vars.timeSyncTask.avg << "\t" << vars.timeSyncTask.max << std::endl;
1078  std::cout << " sizeInputBufferSyncTasks: " << vars.sizeInputBufferSyncTasks.min << "\t" << vars.sizeInputBufferSyncTasks.avg << "\t" << vars.sizeInputBufferSyncTasks.max << std::endl;
1079  std::cout << " sizeOutputBufferSyncTasks: " << vars.sizeOutputBufferSyncTasks.min << "\t" << vars.sizeOutputBufferSyncTasks.avg << "\t" << vars.sizeOutputBufferSyncTasks.max << std::endl;
1080  std::cout << " sizeInputBufferAsyncTasks: " << vars.sizeInputBufferAsyncTasks.min << "\t" << vars.sizeInputBufferAsyncTasks.avg << "\t" << vars.sizeInputBufferAsyncTasks.max << std::endl;
1081  std::cout << "sizeOutputBufferAsyncTasks: " << vars.sizeOutputBufferAsyncTasks.min << "\t" << vars.sizeOutputBufferAsyncTasks.avg << "\t" << vars.sizeOutputBufferAsyncTasks.max << std::endl;
1082  std::cout << " cpuUsageAsyncTasks: " << vars.cpuUsageAsyncTasks.min << "\t" << vars.cpuUsageAsyncTasks.avg << "\t" << vars.cpuUsageAsyncTasks.max << std::endl;
1083  std::cout << " memoryUsageAsyncTasks: " << vars.memoryUsageAsyncTasks.min << "\t" << vars.memoryUsageAsyncTasks.avg << "\t" << vars.memoryUsageAsyncTasks.max << std::endl;
1084  std::cout << " countSyncTasksForMinute: " << vars.countSyncTasksForMinute.min << "\t" << vars.countSyncTasksForMinute.avg << "\t" << vars.countSyncTasksForMinute.max << std::endl;
1085  std::cout << " countAsyncTasksForMinute: " << vars.countAsyncTasksForMinute.min << "\t" << vars.countAsyncTasksForMinute.avg << "\t" << vars.countAsyncTasksForMinute.max << std::endl;
1086 
1087  std::cout << " countSyncTasks: " << vars.countSyncTasks << std::endl;
1088  std::cout << " countAsyncTasks: " << vars.countAsyncTasks << std::endl;
1089  std::cout << " countSyncTasksFail: " << vars.countSyncTasksFail << std::endl;
1090  std::cout << " countAsyncTasksFail: " << vars.countAsyncTasksFail << std::endl;
1091 
1092  std::cout << std::string(80, '=') << std::endl;
1093 
1094  }while (fObj.getAsyncTasksCount());
1095 
1096  DRCECleanupTest::cleanupTestData(nodeOptions);
1097 }
1098 //-----------------------------------------------------------------------------
1100 {
1102  DRCEFunctionalObject fObj(nodeOptions.getNodeName(), nodeOptions.getHomeDir(), nodeOptions.getTasksDataDir(), nodeOptions.getTasksStatusDir());
1103  fObj.setNodeHost("NODE_HOST");
1104  fObj.setNodePort("NODE_PORT");
1105  fObj.setNodeName("NODE_NAME");
1106  const std::string requestJson = "{\"session\":{\"type\":0,\"tmode\":2,\"cleanup\":0,\"time_max\":120000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[]},\"command\":\"echo \\\"scale=40; 4*a(1)\\\" | bc -l && sleep 10 && ps xau\",\"input\":\"\",\"files\":[]}";
1107 // const std::string requestJson = "{\"session\":{\"type\":0,\"tmode\":2,\"time_max\":40000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[]},\"command\":\"sort\",\"input\":\"def\\nabc\\n123\\n\",\"files\":[]}";
1108 
1109 
1110  const unsigned int requestCount = 5;
1111  const unsigned int startTaskId = 500;
1112 
1113  for (unsigned int i=0;i<requestCount;++i)
1114  {
1115  // set request for execute task
1116  DRCEInputJsonMessage inputJsonMessage;
1117  inputJsonMessage.setRequestType(DRCETaskRequest::RequestType::rtSetTaskExecute);
1118  inputJsonMessage.setRequestData(requestJson);
1119  inputJsonMessage.setRequestId(startTaskId+i);
1120 
1121  std::stringstream sourceJson;
1122  sourceJson << inputJsonMessage;
1123 
1124  ASSERT_FALSE(inputJsonMessage.isError()) << printError(inputJsonMessage);
1125  ASSERT_EQ(inputJsonMessage.getErrorCode(), NO_ERROR);
1126  ASSERT_TRUE(inputJsonMessage.getErrorMsg().empty());
1127 
1128  std::string resultJson = fObj.Process(sourceJson.str());
1129 
1130  ASSERT_FALSE(fObj.isError()) << printError(fObj);
1131  ASSERT_EQ(fObj.getErrorCode(), NO_ERROR);
1132  ASSERT_TRUE(fObj.getErrorMsg().empty());
1133 
1134  Poco::Thread::sleep(100);
1135 // std::cout << "currentTasksQueue: " << fObj.getCurrentTasksQueue() << std::endl << std::endl;
1136  }
1137 
1138  std::string currentTasksQueue = fObj.getCurrentTasksQueue();
1139  DRCETasksQueue tasksQueue;
1140  tasksQueue.unserialize(currentTasksQueue);
1141 
1142  ASSERT_FALSE(tasksQueue.isError()) << printError(tasksQueue);
1143 
1144  ASSERT_EQ(tasksQueue.getItemsCount(), requestCount);
1145 
1146  for (unsigned int i=0;i<tasksQueue.getItemsCount();++i)
1147  {
1148  ASSERT_EQ(tasksQueue.getItem(i).id, (startTaskId+i));
1149  ASSERT_TRUE(tasksQueue.getItem(i).sdate > 0);
1150  ASSERT_TRUE(tasksQueue.getItem(i).state==DRCETaskRequest::TaskState::SET_AS_NEW ||
1151  tasksQueue.getItem(i).state==DRCETaskRequest::TaskState::IN_PROGRESS ||
1152  tasksQueue.getItem(i).state==DRCETaskRequest::TaskState::FINISHED||
1153  tasksQueue.getItem(i).state==DRCETaskRequest::TaskState::CRASHED);
1154  ASSERT_EQ(tasksQueue.getItem(i).tmode, SessionOptions::ThreadMode::tmAsync);
1155  }
1156 
1157  while (fObj.getAsyncTasksCount() || fObj.hasTask(startTaskId))
1158  {
1159  Poco::Thread::sleep(5000);
1160 // std::cout << "currentTasksQueue: " << fObj.getCurrentTasksQueue() << std::endl << std::endl;
1161  }
1162  Poco::Thread::sleep(2000);
1163 
1164  currentTasksQueue = fObj.getCurrentTasksQueue();
1165  tasksQueue.unserialize(currentTasksQueue);
1166 
1167  ASSERT_FALSE(tasksQueue.isError()) << printError(tasksQueue);
1168  ASSERT_EQ(tasksQueue.getItemsCount(), 0);
1169 
1170  DRCECleanupTest::cleanupTestData(nodeOptions);
1171  printSuccess("!!! Test Getting Current Tasks Queue");
1172 }
1173 //-----------------------------------------------------------------------------
1175 {
1177  DRCEFunctionalObject fObj(nodeOptions.getNodeName(), nodeOptions.getHomeDir(), nodeOptions.getTasksDataDir(), nodeOptions.getTasksStatusDir());
1178  fObj.setNodeHost("NODE_HOST");
1179  fObj.setNodePort("NODE_PORT");
1180  fObj.setNodeName("NODE_NAME");
1181 
1182  const std::string requestJson = "{\"session\":{\"type\":0,\"cleanup\":0,\"tmode\":2,\"time_max\":40000,\"port\":0,\"user\":\"\",\"password\":\"\",\"shell\":\"\",\"environment\":[]},\"command\":\"sort\",\"input\":\"def\\nabc\\n123\\n\",\"files\":[]}";
1183 
1184  const unsigned int requestCount = 5;
1185  const unsigned int startTaskId = 700;
1186 
1187  for (unsigned int i=0;i<requestCount;++i)
1188  {
1189  // set request for execute task
1190  DRCEInputJsonMessage inputJsonMessage;
1191  inputJsonMessage.setRequestType(DRCETaskRequest::RequestType::rtSetTaskExecute);
1192  inputJsonMessage.setRequestData(requestJson);
1193  inputJsonMessage.setRequestId(startTaskId+i);
1194 
1195  std::stringstream sourceJson;
1196  sourceJson << inputJsonMessage;
1197 
1198  ASSERT_FALSE(inputJsonMessage.isError()) << printError(inputJsonMessage);
1199  ASSERT_EQ(inputJsonMessage.getErrorCode(), NO_ERROR);
1200  ASSERT_TRUE(inputJsonMessage.getErrorMsg().empty());
1201 
1202  std::string resultJson = fObj.Process(sourceJson.str());
1203 
1204  ASSERT_FALSE(fObj.isError()) << printError(fObj);
1205  ASSERT_EQ(fObj.getErrorCode(), NO_ERROR);
1206  ASSERT_TRUE(fObj.getErrorMsg().empty());
1207  }
1208 
1209  while (fObj.getAsyncTasksCount())
1210  {
1211  Poco::Thread::sleep(2000);
1212  }
1213  Poco::Thread::sleep(2000);
1214  std::string listAllTasks = fObj.getListInfoAllTasks();
1215 
1216 // std::cout << "listAllTasks: " << listAllTasks << std::endl << std::endl;
1217 
1218  DRCEListAllTasks tasks;
1219  tasks.unserialize(listAllTasks);
1220 
1221  ASSERT_FALSE(tasks.isError()) << printError(tasks);
1222  ASSERT_EQ(tasks.getErrorCode(), NO_ERROR) << printError(tasks);
1223  ASSERT_TRUE(tasks.getErrorMsg().empty()) << printError(tasks);
1224 
1225  ASSERT_TRUE(tasks.getItemsCount() >= requestCount) << "ItemsCount: " << tasks.getItemsCount();
1226 
1227  std::vector<unsigned int> ids;
1228  for (unsigned int i=0;i<requestCount;++i)
1229  ids.push_back(startTaskId+i);
1230 
1231  unsigned int found = 0;
1232  for (unsigned int i=0;i<tasks.getItemsCount();++i)
1233  {
1234  for (unsigned int k=0;k<ids.size();++k)
1235  {
1236  if (tasks.getItem(i).taskId == ids[k])
1237  {
1238  ASSERT_FALSE(tasks.getItem(i).infoData.empty());
1239  ASSERT_TRUE(tasks.getItem(i).timeModified > 0);
1240  ++found;
1241  break;
1242  }
1243  }
1244  }
1245  ASSERT_EQ(found, requestCount) << "Count found of tasks: " << found;
1246 
1247  for (unsigned int i=0;i<ids.size();++i)
1248  {
1249  bool res = DRCETaskThreadingModeTest::isExistTask(fObj, ids[i]);
1250  ASSERT_EQ(res, true) << "Task data not found" << ids[i];
1251  }
1252 
1253  DRCECleanupTest::cleanupTestData(nodeOptions);
1254  printSuccess("!!! Test Getting Current Tasks Queue");
1255 }
1256 //-----------------------------------------------------------------------------
1257 //-----------------------------------------------------------------------------
1259  unsigned int requestId,
1260  SessionOptions::ThreadMode threadMode,
1261  SessionOptions::CleanupFlag cleanupFlag)
1262 {
1263  const std::string commandLine = "echo \"Hello\" && sleep 10 && ps xau && sleep 10 && ls -la";
1264 // const std::string commandLine = "\"scale=4000; 4*a(1)\" | bc -l && ps xau && ls -la";
1265  std::string sourceJson = DRCECleanupTest::getInputJson(requestId, threadMode, cleanupFlag, commandLine, 5000);
1266 
1267 // std::cout << "sourceJson: " << sourceJson << std::endl;
1268 
1269  std::string resultJson = fObj.Process(sourceJson);
1270 
1271  ASSERT_FALSE(fObj.isError()) << printError(fObj);
1272  ASSERT_EQ(fObj.getErrorCode(), NO_ERROR);
1273  ASSERT_TRUE(fObj.getErrorMsg().empty());
1274  printSuccess("DRCEFunctionalObject Process");
1275 
1276 // std::cout << "result JSON: " << resultJson << std::endl;
1277 
1278  // check right result (unserialize and compare two objects)
1279  DRCEOutputJsonMessage resultJsonMessage(resultJson);
1280 
1281  ASSERT_FALSE(resultJsonMessage.isError()) << printError(resultJsonMessage);
1282  ASSERT_EQ(resultJsonMessage.getErrorCode(), NO_ERROR);
1283  ASSERT_TRUE(resultJsonMessage.getErrorMsg().empty());
1284  printSuccess("DRCEOutputJsonMessage unserialize");
1285 
1286  if (threadMode==SessionOptions::ThreadMode::tmSync)
1287  {
1288  ASSERT_EQ(resultJsonMessage.getResultData().getItemsCount(), 1);
1289  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getRequestId(), requestId);
1290  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getErrorCode(), ERROR_TERMINATE_EXPIRED_TASK);
1291  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodeName(), "NODE_NAME");
1292  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodeHost(), "NODE_HOST");
1293  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getNodePort(), "NODE_PORT");
1294  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getState(), static_cast<unsigned int>(DRCETaskRequest::TaskState::TERMINATED_BY_TTL));
1295  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getSubtasksCount(), 2);
1296 
1297  ASSERT_FALSE(resultJsonMessage.getResultData().getDataItem(0).getSubtaskItem(0).getErrorMessage().empty());
1299  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getSubtaskItem(0).getNodeName(), "NODE_NAME");
1300  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getSubtaskItem(0).getNodeHost(), "NODE_HOST");
1301  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getSubtaskItem(0).getNodePort(), "NODE_PORT");
1302  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getSubtaskItem(0).getState(), static_cast<unsigned int>(DRCETaskRequest::TaskState::TERMINATED_BY_TTL));
1303  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getSubtaskItem(0).getSubtasksCount(), 2);
1304 
1305  ASSERT_FALSE(resultJsonMessage.getResultData().getDataItem(0).getSubtaskItem(1).getErrorMessage().empty());
1307  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getSubtaskItem(1).getNodeName(), "NODE_NAME");
1308  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getSubtaskItem(1).getNodeHost(), "NODE_HOST");
1309  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getSubtaskItem(1).getNodePort(), "NODE_PORT");
1310  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getSubtaskItem(1).getState(), static_cast<unsigned int>(DRCETaskRequest::TaskState::TERMINATED_BY_TTL));
1311  ASSERT_EQ(resultJsonMessage.getResultData().getDataItem(0).getSubtaskItem(1).getSubtasksCount(), 2);
1312 
1313  printSuccess("!!! Test Task Request Expired Time for Sync mode");
1314  }
1315  else
1316  {
1317  while (fObj.getAsyncTasksCount() || fObj.hasTask(requestId))
1318  {
1319  Poco::Thread::sleep(5000);
1320  }
1321 
1322  Poco::File dataFile(FileStream::getDataFileName(fObj.getTasksDataDir(), requestId));
1323 
1324  ASSERT_EQ((cleanupFlag==SessionOptions::CleanupFlag::cfDelete), !dataFile.exists());
1325 
1326  if (dataFile.exists())
1327  {
1328  DRCEResultDataItem resultDataItem = DRCECleanupTest::extractResultData(requestId, fObj);
1329 
1330  ASSERT_EQ(resultDataItem.getRequestId(), requestId);
1331  ASSERT_EQ(resultDataItem.getErrorCode(), ERROR_TERMINATE_EXPIRED_TASK) << "ErrorCode: " << resultDataItem.getErrorCode();
1332  ASSERT_FALSE(resultDataItem.getErrorMessage().empty());
1333  ASSERT_EQ(resultDataItem.getNodeName(), fObj.getNodeName());
1334  ASSERT_EQ(resultDataItem.getNodeHost(), fObj.getNodeHost());
1335  ASSERT_EQ(resultDataItem.getNodePort(), fObj.getNodePort());
1336  ASSERT_EQ(resultDataItem.getState(), static_cast<unsigned int>(DRCETaskRequest::TaskState::TERMINATED_BY_TTL));
1337  ASSERT_EQ(resultDataItem.getSubtasksCount(), 2);
1338 
1340  resultDataItem.getSubtaskItem(0).getErrorCode()==NO_ERROR) << "ErrorCode: " << resultDataItem.getSubtaskItem(0).getErrorCode();
1341 
1343  ASSERT_FALSE(resultDataItem.getSubtaskItem(0).getErrorMessage().empty());
1344  else
1345  ASSERT_TRUE(resultDataItem.getSubtaskItem(0).getErrorMessage().empty());
1346 
1347  ASSERT_EQ(resultDataItem.getSubtaskItem(0).getNodeName(), fObj.getNodeName());
1348  ASSERT_EQ(resultDataItem.getSubtaskItem(0).getNodeHost(), fObj.getNodeHost());
1349  ASSERT_EQ(resultDataItem.getSubtaskItem(0).getNodePort(), fObj.getNodePort());
1350  ASSERT_TRUE(resultDataItem.getSubtaskItem(0).getState()==static_cast<unsigned int>(DRCETaskRequest::TaskState::TERMINATED_BY_TTL))
1351  << "State: " << resultDataItem.getSubtaskItem(0).getState();
1352 
1353  ASSERT_EQ(resultDataItem.getSubtaskItem(0).getSubtasksCount(), 2);
1354 
1356  resultDataItem.getSubtaskItem(1).getErrorCode()==NO_ERROR) << "ErrorCode: " << resultDataItem.getSubtaskItem(1).getErrorCode();
1357 
1359  ASSERT_FALSE(resultDataItem.getSubtaskItem(1).getErrorMessage().empty());
1360  else
1361  ASSERT_TRUE(resultDataItem.getSubtaskItem(1).getErrorMessage().empty());
1362 
1363  ASSERT_EQ(resultDataItem.getSubtaskItem(1).getNodeName(), fObj.getNodeName());
1364  ASSERT_EQ(resultDataItem.getSubtaskItem(1).getNodeHost(), fObj.getNodeHost());
1365  ASSERT_EQ(resultDataItem.getSubtaskItem(1).getNodePort(), fObj.getNodePort());
1366  ASSERT_EQ(resultDataItem.getSubtaskItem(1).getState(), static_cast<unsigned int>(DRCETaskRequest::TaskState::TERMINATED_BY_TTL))
1367  << "State: " << resultDataItem.getSubtaskItem(1).getState();
1368 
1369  ASSERT_EQ(resultDataItem.getSubtaskItem(1).getSubtasksCount(), 2);
1370  }
1371  printSuccess("!!! Test Task Request Expired Time for Async mode");
1372  }
1374 }
1375 //-----------------------------------------------------------------------------
1376 //-----------------------------------------------------------------------------
1377 } // namespace tests
1378 } // namespace drce
1379 } // namespace HCE