1 #include <gtest/gtest.h>
38 catch(Poco::Exception& e)
40 printError(
"Extract result data ("+e.message()+
")");
42 return resultDataItem;
// Builds the JSON request text for a root task plus SZ (= 6) subtasks.
// The root message is created by getInputJsonMessage() from the thread mode,
// cleanup flag, command line and time limit; the function returns the root
// message streamed into a std::stringstream.
// NOTE(review): the rest of the parameter list, the `subtasks` declaration and
// the code that attaches subtasks to the root are not visible in this excerpt.
45 std::string DRCECleanupTest::getInputJson(
unsigned int startTaskId,
51 DRCEInputJsonMessage inputJsonMessage = getInputJsonMessage(startTaskId, threadMode, cleanupFlag, commandLine, timeMax);
53 const unsigned int SZ = 6;
55 for (
unsigned int i=0;i<SZ;++i)
// Each subtask takes the next task id (pre-increment). Even indexes use
// tmAsync, odd indexes use tmSync; all of them pass CleanupFlag::cfNotDelete.
57 subtasks[i] = getInputJsonMessage(++startTaskId,
58 (!(i%2))?SessionOptions::ThreadMode::tmAsync:SessionOptions::ThreadMode::tmSync,
59 SessionOptions::CleanupFlag::cfNotDelete,
// Serialize the root message via its stream insertion operator.
73 std::stringstream jsonStream;
74 jsonStream << inputJsonMessage;
76 return jsonStream.str();
// Fragment of the routine that assembles and returns a DRCEInputJsonMessage for
// a "set task execute" request.
// NOTE(review): presumably this is DRCECleanupTest::getInputJsonMessage, the
// helper called by the getInputJson* builders; its signature is not visible
// here - confirm.
86 taskRequestSetExecute.
setRequestType(DRCETaskRequest::RequestType::rtSetTaskExecute);
// Branch on an empty command line; the branch bodies are not visible here.
89 if (commandLine.empty())
// Copy the caller's thread mode and cleanup flag into the session options.
98 sessionOptions.
tmode = threadMode;
99 sessionOptions.
cleanup = cleanupFlag;
// NOTE(review): std::forward with an explicit non-reference type is just an
// rvalue cast here; std::move would express the same intent more directly.
102 taskRequestSetExecute.
setSessionOptions(std::forward<SessionOptions>(sessionOptions));
// Serialize the task request into a stream.
104 std::stringstream jsonStream;
106 jsonStream << taskRequestSetExecute;
// The outgoing message carries the same request type as the task request.
110 inputJsonMessage.
setRequestType(DRCETaskRequest::RequestType::rtSetTaskExecute);
113 return inputJsonMessage;
// Builds the JSON request text for a root task with a two-level tree of six
// subtasks. The root uses taskThreadMode, every subtask uses
// subtasksThreadMode, and all messages pass CleanupFlag::cfNotDelete.
// Returns the root message streamed into a std::stringstream.
// NOTE(review): the rest of the parameter list is not visible in this excerpt.
116 std::string DRCECleanupTest::getInputJson(
unsigned int startTaskId,
120 DRCEInputJsonMessage inputJsonMessage = getInputJsonMessage(startTaskId, taskThreadMode, SessionOptions::CleanupFlag::cfNotDelete);
122 const unsigned int SZ = 6;
123 DRCEInputJsonMessage subtasks[SZ];
124 for (
unsigned int i=0;i<SZ;++i)
// Subtask i receives task id startTaskId + 1 + i (pre-increment of the
// by-value parameter).
126 subtasks[i] = getInputJsonMessage(++startTaskId, subtasksThreadMode, SessionOptions::CleanupFlag::cfNotDelete);
// Second level: subtasks 2,3 go under subtask 0 and subtasks 4,5 under subtask 1.
129 subtasks[0].addSubtaskItem(subtasks[2]);
130 subtasks[0].addSubtaskItem(subtasks[3]);
132 subtasks[1].addSubtaskItem(subtasks[4]);
133 subtasks[1].addSubtaskItem(subtasks[5]);
// First level: subtasks 0 and 1 are added to the root after their own children
// have been attached.
135 inputJsonMessage.addSubtaskItem(subtasks[0]);
136 inputJsonMessage.addSubtaskItem(subtasks[1]);
138 std::stringstream jsonStream;
139 jsonStream << inputJsonMessage;
141 return jsonStream.str();
// Body fragment of a request builder that creates a root task with a
// three-level tree of 14 subtasks and returns the serialized root message.
// NOTE(review): the function header is not visible here; presumably this is the
// getInputJson(startTaskId, taskThreadMode) overload used by the "Mix subtasks"
// tests - confirm.
146 DRCEInputJsonMessage inputJsonMessage = getInputJsonMessage(startTaskId, taskThreadMode, SessionOptions::CleanupFlag::cfNotDelete);
148 const unsigned int SZ = 14;
149 DRCEInputJsonMessage subtasks[SZ];
150 for (
unsigned int i=0;i<SZ;++i)
// Even indexes use tmAsync, odd indexes use tmSync; all pass cfNotDelete.
152 subtasks[i] = getInputJsonMessage(++startTaskId,
153 (!(i%2))?SessionOptions::ThreadMode::tmAsync:SessionOptions::ThreadMode::tmSync,
154 SessionOptions::CleanupFlag::cfNotDelete);
// Third level: subtasks 6..13 are attached in pairs under subtasks 2..5.
157 subtasks[2].addSubtaskItem(subtasks[6]);
158 subtasks[2].addSubtaskItem(subtasks[7]);
160 subtasks[3].addSubtaskItem(subtasks[8]);
161 subtasks[3].addSubtaskItem(subtasks[9]);
163 subtasks[4].addSubtaskItem(subtasks[10]);
164 subtasks[4].addSubtaskItem(subtasks[11]);
166 subtasks[5].addSubtaskItem(subtasks[12]);
167 subtasks[5].addSubtaskItem(subtasks[13]);
// Second level: subtasks 2,3 under subtask 0 and subtasks 4,5 under subtask 1.
170 subtasks[0].addSubtaskItem(subtasks[2]);
171 subtasks[0].addSubtaskItem(subtasks[3]);
173 subtasks[1].addSubtaskItem(subtasks[4]);
174 subtasks[1].addSubtaskItem(subtasks[5]);
// First level: subtasks 0 and 1 are added to the root last.
176 inputJsonMessage.addSubtaskItem(subtasks[0]);
177 inputJsonMessage.addSubtaskItem(subtasks[1]);
179 std::stringstream jsonStream;
180 jsonStream << inputJsonMessage;
182 return jsonStream.str();
// Builds the JSON request text for a root task with a three-level tree of 14
// subtasks whose cleanup flags alternate.
//   startTaskId    - id of the root task; subtask i gets startTaskId + 1 + i.
//   taskThreadMode - thread mode of the root task.
//   commandLine    - forwarded to getInputJsonMessage() for the root task.
//   timeMax        - forwarded to getInputJsonMessage() for the root task.
// Returns the root message streamed into a std::stringstream.
185 std::string DRCECleanupTest::getInputJsonCleanup(
unsigned int startTaskId,
SessionOptions::ThreadMode taskThreadMode,
const std::string& commandLine,
unsigned int timeMax)
// The root task itself always passes cfNotDelete.
187 DRCEInputJsonMessage inputJsonMessage = getInputJsonMessage(startTaskId, taskThreadMode, SessionOptions::CleanupFlag::cfNotDelete, commandLine, timeMax);
189 const unsigned int SZ = 14;
190 DRCEInputJsonMessage subtasks[SZ];
191 for (
unsigned int i=0;i<SZ;++i)
// Even indexes: tmAsync with cfDelete. Odd indexes: tmSync with cfNotDelete.
// NOTE(review): the remaining arguments of this call are not visible here.
193 subtasks[i] = getInputJsonMessage(++startTaskId,
194 (!(i%2))?SessionOptions::ThreadMode::tmAsync:SessionOptions::ThreadMode::tmSync,
195 (!(i%2))?SessionOptions::CleanupFlag::cfDelete:SessionOptions::CleanupFlag::cfNotDelete,
// Third level: subtasks 6..13 are attached in pairs under subtasks 2..5.
200 subtasks[2].addSubtaskItem(subtasks[6]);
201 subtasks[2].addSubtaskItem(subtasks[7]);
203 subtasks[3].addSubtaskItem(subtasks[8]);
204 subtasks[3].addSubtaskItem(subtasks[9]);
206 subtasks[4].addSubtaskItem(subtasks[10]);
207 subtasks[4].addSubtaskItem(subtasks[11]);
209 subtasks[5].addSubtaskItem(subtasks[12]);
210 subtasks[5].addSubtaskItem(subtasks[13]);
// Second level: subtasks 2,3 under subtask 0 and subtasks 4,5 under subtask 1.
213 subtasks[0].addSubtaskItem(subtasks[2]);
214 subtasks[0].addSubtaskItem(subtasks[3]);
216 subtasks[1].addSubtaskItem(subtasks[4]);
217 subtasks[1].addSubtaskItem(subtasks[5]);
// First level: subtasks 0 and 1 are added to the root last.
219 inputJsonMessage.addSubtaskItem(subtasks[0]);
220 inputJsonMessage.addSubtaskItem(subtasks[1]);
222 std::stringstream jsonStream;
223 jsonStream << inputJsonMessage;
225 return jsonStream.str();
// Body fragment of a request builder that creates a root task with a
// three-level tree of 14 subtasks, all sharing the caller's cleanup flag and
// command line, and returns the serialized root message.
// NOTE(review): the function header is not visible here; presumably this is the
// getInputJsonCleanup overload that takes an explicit cleanupFlag - confirm.
// The root task passes cfNotDelete regardless of cleanupFlag.
231 DRCEInputJsonMessage inputJsonMessage = getInputJsonMessage(startTaskId, taskThreadMode, SessionOptions::CleanupFlag::cfNotDelete, commandLine);
233 const unsigned int SZ = 14;
234 DRCEInputJsonMessage subtasks[SZ];
235 for (
unsigned int i=0;i<SZ;++i)
// Even indexes use tmAsync, odd indexes use tmSync; every subtask receives
// cleanupFlag and commandLine unchanged.
237 subtasks[i] = getInputJsonMessage(++startTaskId,
238 (!(i%2))?SessionOptions::ThreadMode::tmAsync:SessionOptions::ThreadMode::tmSync, cleanupFlag, commandLine);
// Third level: subtasks 6..13 are attached in pairs under subtasks 2..5.
241 subtasks[2].addSubtaskItem(subtasks[6]);
242 subtasks[2].addSubtaskItem(subtasks[7]);
244 subtasks[3].addSubtaskItem(subtasks[8]);
245 subtasks[3].addSubtaskItem(subtasks[9]);
247 subtasks[4].addSubtaskItem(subtasks[10]);
248 subtasks[4].addSubtaskItem(subtasks[11]);
250 subtasks[5].addSubtaskItem(subtasks[12]);
251 subtasks[5].addSubtaskItem(subtasks[13]);
// Second level: subtasks 2,3 under subtask 0 and subtasks 4,5 under subtask 1.
254 subtasks[0].addSubtaskItem(subtasks[2]);
255 subtasks[0].addSubtaskItem(subtasks[3]);
257 subtasks[1].addSubtaskItem(subtasks[4]);
258 subtasks[1].addSubtaskItem(subtasks[5]);
// First level: subtasks 0 and 1 are added to the root last.
260 inputJsonMessage.addSubtaskItem(subtasks[0]);
261 inputJsonMessage.addSubtaskItem(subtasks[1]);
263 std::stringstream jsonStream;
264 jsonStream << inputJsonMessage;
266 return jsonStream.str();
// Returns true only when the data, status and request files of a task all exist.
// NOTE(review): how the three file objects are derived from taskId and
// nodeOptions is not visible in this excerpt.
269 bool DRCECleanupTest::isExistTaskData(
unsigned int taskId, DRCENodeOptions& nodeOptions)
275 return (dataFile.exists() && statusFile.exists() && requestFile.exists());
// Returns true only when none of the data, status and request files of a task
// exist. This is not the plain negation of isExistTaskData(): when only some of
// the files are present both functions return false.
// NOTE(review): how the three file objects are derived from taskId and
// nodeOptions is not visible in this excerpt.
278 bool DRCECleanupTest::isNotExistTaskData(
unsigned int taskId, DRCENodeOptions& nodeOptions)
284 return (!dataFile.exists() && !statusFile.exists() && !requestFile.exists());
// Lists the entries of the directory dirName when it exists and iterates over
// them.
// NOTE(review): the loop body is not visible in this excerpt; given the function
// name it presumably removes each entry - confirm.
287 void DRCECleanupTest::cleanupDir(
const std::string& dirName)
289 Poco::File directory(dirName);
// A missing directory is skipped rather than treated as an error.
290 if (directory.exists())
292 std::vector<std::string> files;
293 directory.list(files);
// NOTE(review): the loop variable is declared by value, so every name is
// copied; a const reference would avoid the copy.
294 for (
const std::string
fileName : files)
305 #if CLEANUP_TEST == 1
// Test-body fragment (the test-case header is not visible in this excerpt):
// asynchronous root task with asynchronous subtasks, as named by the final
// printSuccess label.
318 fObj.setNodePort(
"NODE_PORT");
319 fObj.setNodeName(
"NODE_NAME");
321 unsigned int startTaskId = 600;
// Root and subtasks both use ThreadMode::tmAsync.
323 std::string sourceJson = getInputJson(startTaskId, SessionOptions::ThreadMode::tmAsync, SessionOptions::ThreadMode::tmAsync);
327 std::string resultJson = fObj.Process(sourceJson);
// Processing must leave no error flag, error code or error message.
329 ASSERT_FALSE(fObj.isError()) <<
printError(fObj);
330 ASSERT_EQ(fObj.getErrorCode(),
NO_ERROR);
331 ASSERT_TRUE(fObj.getErrorMsg().empty());
332 printSuccess(
"DRCEFunctionalObject Process");
341 ASSERT_TRUE(resultJsonMessage.
getErrorMsg().empty());
342 printSuccess(
"DRCEOutputJsonMessage unserialize");
// Poll every 5000 ms until fObj reports no asynchronous tasks, then wait a
// further 2000 ms before inspecting the results.
377 while (fObj.getAsyncTasksCount())
379 Poco::Thread::sleep(5000);
380 std::cout <<
"wait end task" <<
std::endl;
382 Poco::Thread::sleep(2000);
// The result item must report this node's identity and the FINISHED state.
388 ASSERT_EQ(resultDataItem.
getNodeName(), fObj.getNodeName());
389 ASSERT_EQ(resultDataItem.
getNodeHost(), fObj.getNodeHost());
390 ASSERT_EQ(resultDataItem.
getNodePort(), fObj.getNodePort());
391 ASSERT_EQ(resultDataItem.
getState(), DRCETaskRequest::TaskState::FINISHED);
// Task data must exist for ids startTaskId .. startTaskId+5.
// NOTE(review): the three-argument getInputJson creates a root plus six
// subtasks (ids up to startTaskId+6), so the last subtask is not checked here -
// confirm whether that is intended.
416 for (
unsigned int i=0;i<6;++i)
418 ASSERT_TRUE(isExistTaskData(startTaskId+i, nodeOptions));
421 printSuccess(
"!!! Test Async task and Async subtasks");
// Test-body fragment (the test-case header is not visible in this excerpt):
// asynchronous root task with synchronous subtasks, as named by the final
// printSuccess label.
430 fObj.setNodePort(
"NODE_PORT");
431 fObj.setNodeName(
"NODE_NAME");
433 unsigned int startTaskId = 600;
// Root uses ThreadMode::tmAsync, subtasks use ThreadMode::tmSync.
435 std::string sourceJson = getInputJson(startTaskId, SessionOptions::ThreadMode::tmAsync, SessionOptions::ThreadMode::tmSync);
439 std::string resultJson = fObj.Process(sourceJson);
// Processing must leave no error flag, error code or error message.
441 ASSERT_FALSE(fObj.isError()) <<
printError(fObj);
442 ASSERT_EQ(fObj.getErrorCode(),
NO_ERROR);
443 ASSERT_TRUE(fObj.getErrorMsg().empty());
444 printSuccess(
"DRCEFunctionalObject Process");
453 ASSERT_TRUE(resultJsonMessage.
getErrorMsg().empty());
454 printSuccess(
"DRCEOutputJsonMessage unserialize");
// Poll every 5000 ms until fObj reports no asynchronous tasks, then wait a
// further 2000 ms before inspecting the results.
491 while (fObj.getAsyncTasksCount())
493 Poco::Thread::sleep(5000);
494 std::cout <<
"wait end task" <<
std::endl;
496 Poco::Thread::sleep(2000);
// The result item must report this node's identity and the FINISHED state.
502 ASSERT_EQ(resultDataItem.
getNodeName(), fObj.getNodeName());
503 ASSERT_EQ(resultDataItem.
getNodeHost(), fObj.getNodeHost());
504 ASSERT_EQ(resultDataItem.
getNodePort(), fObj.getNodePort());
505 ASSERT_EQ(resultDataItem.
getState(), DRCETaskRequest::TaskState::FINISHED);
// Task data must exist for ids startTaskId .. startTaskId+5.
// NOTE(review): the request contains a root plus six subtasks (ids up to
// startTaskId+6), so the last subtask is not checked here - confirm.
524 for (
unsigned int i=0;i<6;++i)
526 ASSERT_TRUE(isExistTaskData(startTaskId+i, nodeOptions));
529 printSuccess(
"!!! Test Async task and Sync subtasks");
// Test-body fragment (the test-case header is not visible in this excerpt):
// synchronous root task with asynchronous subtasks, as named by the final
// printSuccess label.
538 fObj.setNodePort(
"NODE_PORT");
539 fObj.setNodeName(
"NODE_NAME");
541 unsigned int startTaskId = 600;
// Root uses ThreadMode::tmSync, subtasks use ThreadMode::tmAsync.
543 std::string sourceJson = getInputJson(startTaskId, SessionOptions::ThreadMode::tmSync, SessionOptions::ThreadMode::tmAsync);
547 std::string resultJson = fObj.Process(sourceJson);
// Processing must leave no error flag, error code or error message.
549 ASSERT_FALSE(fObj.isError()) <<
printError(fObj);
550 ASSERT_EQ(fObj.getErrorCode(),
NO_ERROR);
551 ASSERT_TRUE(fObj.getErrorMsg().empty());
552 printSuccess(
"DRCEFunctionalObject Process");
561 ASSERT_TRUE(resultJsonMessage.
getErrorMsg().empty());
562 printSuccess(
"DRCEOutputJsonMessage unserialize");
// Poll every 5000 ms until fObj reports no asynchronous tasks, then wait a
// further 2000 ms.
564 while (fObj.getAsyncTasksCount())
566 Poco::Thread::sleep(5000);
567 std::cout <<
"wait end task" <<
std::endl;
569 Poco::Thread::sleep(2000);
// Task data must exist for ids startTaskId .. startTaskId+5.
// NOTE(review): the request contains a root plus six subtasks (ids up to
// startTaskId+6), so the last subtask is not checked here - confirm.
601 for (
unsigned int i=0;i<6;++i)
603 ASSERT_TRUE(isExistTaskData(startTaskId+i, nodeOptions));
606 printSuccess(
"!!! Test Sync task and Async subtasks");
// Test-body fragment (the test-case header is not visible in this excerpt):
// synchronous root task with synchronous subtasks, as named by the final
// printSuccess label.
615 fObj.setNodePort(
"NODE_PORT");
616 fObj.setNodeName(
"NODE_NAME");
618 unsigned int startTaskId = 600;
// Root and subtasks both use ThreadMode::tmSync.
620 std::string sourceJson = getInputJson(startTaskId, SessionOptions::ThreadMode::tmSync, SessionOptions::ThreadMode::tmSync);
624 std::string resultJson = fObj.Process(sourceJson);
// Processing must leave no error flag, error code or error message.
626 ASSERT_FALSE(fObj.isError()) <<
printError(fObj);
627 ASSERT_EQ(fObj.getErrorCode(),
NO_ERROR);
628 ASSERT_TRUE(fObj.getErrorMsg().empty());
629 printSuccess(
"DRCEFunctionalObject Process");
638 ASSERT_TRUE(resultJsonMessage.
getErrorMsg().empty());
639 printSuccess(
"DRCEOutputJsonMessage unserialize");
// Poll every 5000 ms while fObj still reports asynchronous tasks or still holds
// the root task, then wait a further 5000 ms.
641 while (fObj.getAsyncTasksCount() || fObj.hasTask(startTaskId))
643 Poco::Thread::sleep(5000);
644 std::cout <<
"wait end task" <<
std::endl;
646 Poco::Thread::sleep(5000);
// Task data must exist for ids startTaskId .. startTaskId+5; a failure message
// names the offending id.
// NOTE(review): the request contains a root plus six subtasks (ids up to
// startTaskId+6), so the last subtask is not checked here - confirm.
673 for (
unsigned int i=0;i<6;++i)
675 ASSERT_TRUE(isExistTaskData((startTaskId+i), nodeOptions)) <<
" Task ID: " << (startTaskId+i);
678 printSuccess(
"!!! Test Sync task and Sync subtasks");
// Test-body fragment (the test-case header is not visible in this excerpt):
// asynchronous root task with mixed-mode subtasks, as named by the final
// printSuccess label.
687 fObj.setNodePort(
"NODE_PORT");
688 fObj.setNodeName(
"NODE_NAME");
690 unsigned int startTaskId = 600;
// Only the root thread mode is passed; this selects the two-argument
// getInputJson overload.
692 std::string sourceJson = getInputJson(startTaskId, SessionOptions::ThreadMode::tmAsync);
696 std::string resultJson = fObj.Process(sourceJson);
// Processing must leave no error flag, error code or error message.
698 ASSERT_FALSE(fObj.isError()) <<
printError(fObj);
699 ASSERT_EQ(fObj.getErrorCode(),
NO_ERROR);
700 ASSERT_TRUE(fObj.getErrorMsg().empty());
701 printSuccess(
"DRCEFunctionalObject Process");
710 ASSERT_TRUE(resultJsonMessage.
getErrorMsg().empty());
711 printSuccess(
"DRCEOutputJsonMessage unserialize");
// Poll every 5000 ms until fObj reports no asynchronous tasks, then wait a
// further 2000 ms before inspecting the results.
746 while (fObj.getAsyncTasksCount())
748 Poco::Thread::sleep(5000);
749 std::cout <<
"wait end task" <<
std::endl;
751 Poco::Thread::sleep(2000);
// The result item must report this node's identity and the FINISHED state.
757 ASSERT_EQ(resultDataItem.
getNodeName(), fObj.getNodeName());
758 ASSERT_EQ(resultDataItem.
getNodeHost(), fObj.getNodeHost());
759 ASSERT_EQ(resultDataItem.
getNodePort(), fObj.getNodePort());
760 ASSERT_EQ(resultDataItem.
getState(), DRCETaskRequest::TaskState::FINISHED);
// Task data must exist for ids startTaskId .. startTaskId+13.
779 for (
unsigned int i=0;i<14;++i)
781 ASSERT_TRUE(isExistTaskData(startTaskId+i, nodeOptions));
784 printSuccess(
"!!! Test Async task and Mix subtasks");
// Test-body fragment (the test-case header is not visible in this excerpt):
// synchronous root task with mixed-mode subtasks, as named by the final
// printSuccess label.
793 fObj.setNodePort(
"NODE_PORT");
794 fObj.setNodeName(
"NODE_NAME");
796 unsigned int startTaskId = 600;
// Only the root thread mode is passed; this selects the two-argument
// getInputJson overload.
798 std::string sourceJson = getInputJson(startTaskId, SessionOptions::ThreadMode::tmSync);
802 std::string resultJson = fObj.Process(sourceJson);
// Processing must leave no error flag, error code or error message.
804 ASSERT_FALSE(fObj.isError()) <<
printError(fObj);
805 ASSERT_EQ(fObj.getErrorCode(),
NO_ERROR);
806 ASSERT_TRUE(fObj.getErrorMsg().empty());
807 printSuccess(
"DRCEFunctionalObject Process");
816 ASSERT_TRUE(resultJsonMessage.
getErrorMsg().empty());
817 printSuccess(
"DRCEOutputJsonMessage unserialize");
// Poll every 5000 ms until fObj reports no asynchronous tasks, then wait a
// further 5000 ms.
819 while (fObj.getAsyncTasksCount())
821 Poco::Thread::sleep(5000);
822 std::cout <<
"wait end task" <<
std::endl;
824 Poco::Thread::sleep(5000);
// Task data must exist for ids startTaskId .. startTaskId+13.
855 for (
unsigned int i=0;i<14;++i)
857 ASSERT_TRUE(isExistTaskData(startTaskId+i, nodeOptions));
860 printSuccess(
"!!! Test Sync task and Mix subtasks");
// Test-body fragment (the test-case header is not visible in this excerpt):
// asynchronous root task with mixed-mode subtasks and alternating cleanup
// flags, as named by the final printSuccess label.
869 fObj.setNodePort(
"NODE_PORT");
870 fObj.setNodeName(
"NODE_NAME");
872 unsigned int startTaskId = 600;
// An empty command line is passed to the request builder.
874 std::string sourceJson = getInputJsonCleanup(startTaskId, SessionOptions::ThreadMode::tmAsync,
"");
878 std::string resultJson = fObj.Process(sourceJson);
// Processing must leave no error flag, error code or error message.
880 ASSERT_FALSE(fObj.isError()) <<
printError(fObj);
881 ASSERT_EQ(fObj.getErrorCode(),
NO_ERROR);
882 ASSERT_TRUE(fObj.getErrorMsg().empty());
883 printSuccess(
"DRCEFunctionalObject Process");
885 std::cout <<
"result JSON: " << resultJson <<
std::endl;
892 ASSERT_TRUE(resultJsonMessage.
getErrorMsg().empty());
893 printSuccess(
"DRCEOutputJsonMessage unserialize");
// Poll every 5000 ms while fObj still reports asynchronous tasks or still holds
// the root task, then wait a further 2000 ms.
895 while (fObj.getAsyncTasksCount() || fObj.hasTask(startTaskId))
897 Poco::Thread::sleep(5000);
898 std::cout <<
"wait end task" <<
std::endl;
900 Poco::Thread::sleep(2000);
// The result item must report this node's identity and the FINISHED state.
906 ASSERT_EQ(resultDataItem.
getNodeName(), fObj.getNodeName());
907 ASSERT_EQ(resultDataItem.
getNodeHost(), fObj.getNodeHost());
908 ASSERT_EQ(resultDataItem.
getNodePort(), fObj.getNodePort());
909 ASSERT_EQ(resultDataItem.
getState(), DRCETaskRequest::TaskState::FINISHED);
// Spot-check ids that getInputJsonCleanup creates with cfNotDelete: the root and
// the odd-indexed subtasks (offsets +2, +6 and +14 from startTaskId).
928 ASSERT_TRUE(isExistTaskData(startTaskId, nodeOptions));
929 ASSERT_TRUE(isExistTaskData(startTaskId+2, nodeOptions));
930 ASSERT_TRUE(isExistTaskData(startTaskId+6, nodeOptions));
931 ASSERT_TRUE(isExistTaskData(startTaskId+14, nodeOptions));
934 printSuccess(
"!!! Test Async task and Mix subtasks Cleanup");
// Test-body fragment (the test-case header is not visible in this excerpt):
// synchronous root task with mixed-mode subtasks and alternating cleanup flags,
// as named by the final printSuccess label.
943 fObj.setNodePort(
"NODE_PORT");
944 fObj.setNodeName(
"NODE_NAME");
946 unsigned int startTaskId = 600;
// An empty command line is passed to the request builder.
948 std::string sourceJson = getInputJsonCleanup(startTaskId, SessionOptions::ThreadMode::tmSync,
"");
952 std::string resultJson = fObj.Process(sourceJson);
// Processing must leave no error flag, error code or error message.
954 ASSERT_FALSE(fObj.isError()) <<
printError(fObj);
955 ASSERT_EQ(fObj.getErrorCode(),
NO_ERROR);
956 ASSERT_TRUE(fObj.getErrorMsg().empty());
957 printSuccess(
"DRCEFunctionalObject Process");
966 ASSERT_TRUE(resultJsonMessage.
getErrorMsg().empty());
967 printSuccess(
"DRCEOutputJsonMessage unserialize");
// Poll every 5000 ms until fObj reports no asynchronous tasks, then wait a
// further 2000 ms before inspecting the results.
969 while (fObj.getAsyncTasksCount())
971 Poco::Thread::sleep(5000);
972 std::cout <<
"wait end task" <<
std::endl;
974 Poco::Thread::sleep(2000);
// The result item must report this node's identity and the FINISHED state.
980 ASSERT_EQ(resultDataItem.
getNodeName(), fObj.getNodeName());
981 ASSERT_EQ(resultDataItem.
getNodeHost(), fObj.getNodeHost());
982 ASSERT_EQ(resultDataItem.
getNodePort(), fObj.getNodePort());
983 ASSERT_EQ(resultDataItem.
getState(), DRCETaskRequest::TaskState::FINISHED);
// Spot-check ids that getInputJsonCleanup creates with cfNotDelete: the root and
// the odd-indexed subtasks (offsets +2, +6 and +14 from startTaskId).
1002 ASSERT_TRUE(isExistTaskData(startTaskId, nodeOptions));
1003 ASSERT_TRUE(isExistTaskData(startTaskId+2, nodeOptions));
1004 ASSERT_TRUE(isExistTaskData(startTaskId+6, nodeOptions));
1005 ASSERT_TRUE(isExistTaskData(startTaskId+14, nodeOptions));
1008 printSuccess(
"!!! Test Sync task and Mix subtasks Cleanup");
// Test-body fragment (the test-case header is not visible in this excerpt):
// runs a task tree, then issues a "get task data" request with
// FetchType::ftDeleteDataAfterFetch and verifies that no task data remains.
1017 fObj.setNodePort(
"NODE_PORT");
1018 fObj.setNodeName(
"NODE_NAME");
1020 unsigned int startTaskId = 600;
// Step 1: submit the task tree with an empty command line.
1022 std::string sourceJson = getInputJsonCleanup(startTaskId, SessionOptions::ThreadMode::tmAsync,
"");
1026 std::string resultJson = fObj.Process(sourceJson);
1028 ASSERT_FALSE(fObj.isError()) <<
printError(fObj);
1029 ASSERT_EQ(fObj.getErrorCode(),
NO_ERROR);
1030 ASSERT_TRUE(fObj.getErrorMsg().empty());
1031 printSuccess(
"DRCEFunctionalObject Process");
1040 ASSERT_TRUE(resultJsonMessage.
getErrorMsg().empty());
1041 printSuccess(
"DRCEOutputJsonMessage unserialize");
// Poll every 5000 ms while fObj still reports asynchronous tasks or still holds
// the root task, then wait a further 5000 ms.
1043 while (fObj.getAsyncTasksCount() || fObj.hasTask(startTaskId))
1045 Poco::Thread::sleep(5000);
1046 std::cout <<
"wait end task" <<
std::endl;
1048 Poco::Thread::sleep(5000);
// Step 2: build the get-data request that asks for deletion after the fetch.
1053 taskRequestGetData.
setFetchType(DRCETaskRequestGetData::FetchType::ftDeleteDataAfterFetch);
1055 ASSERT_FALSE(taskRequestGetData.
isError());
1056 printSuccess(
"DRCETaskRequestGetData serialize");
1062 inputJsonMessage.
setRequestType(DRCETaskRequest::RequestType::rtGetTaskData);
1066 std::stringstream sourceJsonStream;
1067 sourceJsonStream << inputJsonMessage;
// Serializing the input message must not have recorded an error.
1071 ASSERT_FALSE(inputJsonMessage.isError()) <<
printError(inputJsonMessage);
1072 ASSERT_EQ(inputJsonMessage.getErrorCode(),
NO_ERROR);
1073 ASSERT_TRUE(inputJsonMessage.getErrorMsg().empty());
1074 printSuccess(
"DRCEInputJsonMessage serialize");
// Step 3: process the get-data request; resultJson is reused for the reply.
1076 resultJson = fObj.Process(sourceJsonStream.str());
1078 ASSERT_FALSE(fObj.isError()) <<
printError(fObj);
1079 ASSERT_EQ(fObj.getErrorCode(),
NO_ERROR);
1080 ASSERT_TRUE(fObj.getErrorMsg().empty());
1081 printSuccess(
"DRCEFunctionalObject Process");
1090 ASSERT_TRUE(resultJsonMessage.
getErrorMsg().empty());
1091 printSuccess(
"DRCEOutputJsonMessage unserialize");
// Step 4: after a 5000 ms pause, no task data may remain for ids
// startTaskId .. startTaskId+13; a failure message names the offending id.
1127 Poco::Thread::sleep(5000);
1128 for (
unsigned int i=0;i<14;++i)
1130 ASSERT_TRUE(isNotExistTaskData((startTaskId+i), nodeOptions)) <<
"Task ID: " << (startTaskId+i);
1134 printSuccess(
"!!! Test Request Get Data with Cleanup");
// Test-body fragment (the test-case header is not visible in this excerpt):
// runs a task tree, then issues a "delete task data" request and verifies that
// no task data remains.
1143 fObj.setNodePort(
"NODE_PORT");
1144 fObj.setNodeName(
"NODE_NAME");
// This test uses its own id range, starting at 700.
1146 unsigned int startTaskId = 700;
// Step 1: submit the task tree with a shell command line and timeMax = 100000.
1148 std::string sourceJson = getInputJsonCleanup(startTaskId, SessionOptions::ThreadMode::tmAsync,
"echo \"Hello\" && sleep 5 && ps ", 100000);
1152 std::string resultJson = fObj.Process(sourceJson);
1154 ASSERT_FALSE(fObj.isError()) <<
printError(fObj);
1155 ASSERT_EQ(fObj.getErrorCode(),
NO_ERROR);
1156 ASSERT_TRUE(fObj.getErrorMsg().empty());
1157 printSuccess(
"DRCEFunctionalObject Process");
1166 ASSERT_TRUE(resultJsonMessage.
getErrorMsg().empty());
1167 printSuccess(
"DRCEOutputJsonMessage unserialize");
// Poll every 10000 ms while fObj still reports asynchronous tasks or still
// holds the root task, then wait a further 10000 ms.
1169 while (fObj.getAsyncTasksCount() || fObj.hasTask(startTaskId))
1171 Poco::Thread::sleep(10000);
1172 std::cout <<
"wait end task" <<
std::endl;
1174 Poco::Thread::sleep(10000);
// Step 2: the delete-data request must have been built without error.
1180 ASSERT_FALSE(taskRequestDeleteData.
isError());
1181 printSuccess(
"DRCETaskRequestDeleteData serialize");
1187 inputJsonMessage.
setRequestType(DRCETaskRequest::RequestType::rtDeleteTaskData);
1191 std::stringstream sourceJsonStream;
1192 sourceJsonStream << inputJsonMessage;
// Serializing the input message must not have recorded an error.
1196 ASSERT_FALSE(inputJsonMessage.isError()) <<
printError(inputJsonMessage);
1197 ASSERT_EQ(inputJsonMessage.getErrorCode(),
NO_ERROR) <<
printError(inputJsonMessage);
1198 ASSERT_TRUE(inputJsonMessage.getErrorMsg().empty()) <<
printError(inputJsonMessage);
1199 printSuccess(
"DRCEInputJsonMessage serialize");
// Step 3: process the delete request; resultJson is reused for the reply.
1201 resultJson = fObj.Process(sourceJsonStream.str());
1203 ASSERT_FALSE(fObj.isError()) <<
printError(fObj);
1204 ASSERT_EQ(fObj.getErrorCode(),
NO_ERROR);
1205 ASSERT_TRUE(fObj.getErrorMsg().empty());
1206 printSuccess(
"DRCEFunctionalObject Process");
1208 std::cout <<
"result JSON: " << resultJson <<
std::endl;
1216 printSuccess(
"DRCEOutputJsonMessage unserialize");
// Step 4: after a 5000 ms pause, no task data may remain for ids
// startTaskId .. startTaskId+13; a failure message names the offending id.
1243 Poco::Thread::sleep(5000);
1244 for (
unsigned int i=0;i<14;++i)
1246 ASSERT_TRUE(isNotExistTaskData((startTaskId+i), nodeOptions)) <<
"Task ID: " << (startTaskId+i);
1250 printSuccess(
"!!! Test Request Delete Data with Cleanup");
// Test-body fragment (the test-case header is not visible in this excerpt):
// starts a task tree, terminates one subtask with CleanupFlag::cfDelete, and
// verifies that only the listed cleanup ids lose their task data.
1259 fObj.setNodePort(
"NODE_PORT");
1260 fObj.setNodeName(
"NODE_NAME");
// Root id 600; task 603 is the one that will be terminated.
1262 unsigned int startTaskId(600), terminateTaskId(603);
1263 const std::string commandLine =
"echo \"scale=400; 4*a(1)\" | bc -l && sleep 10 && ps xau";
// Step 1: submit the tree; every subtask is created with cfNotDelete.
1265 std::string sourceJson = getInputJsonCleanup(startTaskId, SessionOptions::ThreadMode::tmAsync, SessionOptions::CleanupFlag::cfNotDelete, commandLine);
1269 std::string resultJson = fObj.Process(sourceJson);
1271 ASSERT_FALSE(fObj.isError()) <<
printError(fObj);
1272 ASSERT_EQ(fObj.getErrorCode(),
NO_ERROR);
1273 ASSERT_TRUE(fObj.getErrorMsg().empty());
1274 printSuccess(
"DRCEFunctionalObject Process");
1283 ASSERT_TRUE(resultJsonMessage.
getErrorMsg().empty());
1284 printSuccess(
"DRCEOutputJsonMessage unserialize");
// Step 2: wait in 2000 ms steps for the status file of the task to terminate.
// NOTE(review): with `||` the loop keeps running while the status file is
// missing regardless of tryCount, and keeps running for the remaining tries
// even after the file exists; `&&` may have been intended. The tryCount
// declaration is not visible here - confirm.
1290 while (!statusFile.exists() || tryCount-->0)
1292 Poco::Thread::sleep(2000);
1293 std::cout <<
"Wait appear task (" << terminateTaskId <<
") in async queue ..." <<
std::endl;
1295 Poco::Thread::sleep(2000);
// Step 3: build the terminate request with the default algorithm and cfDelete.
1300 taskRequestTerminate.
setAlgorithmType(DRCETaskRequestTerminate::AlgorithmType::atDefault);
1304 taskRequestTerminate.
setCleanupFlag(DRCETaskRequestTerminate::CleanupFlag::cfDelete);
1306 ASSERT_FALSE(taskRequestTerminate.
isError());
1307 printSuccess(
"DRCETaskRequestTerminate serialize");
1313 inputJsonMessage.
setRequestType(DRCETaskRequest::RequestType::rtTerminateTask);
1317 std::stringstream sourceJsonStream;
1318 sourceJsonStream << inputJsonMessage;
// Serializing the input message must not have recorded an error.
1322 ASSERT_FALSE(inputJsonMessage.isError()) <<
printError(inputJsonMessage);
1323 ASSERT_EQ(inputJsonMessage.getErrorCode(),
NO_ERROR);
1324 ASSERT_TRUE(inputJsonMessage.getErrorMsg().empty());
1325 printSuccess(
"DRCEInputJsonMessage serialize");
// Step 4: process the terminate request; resultJson is reused for the reply.
1327 resultJson = fObj.Process(sourceJsonStream.str());
1329 ASSERT_FALSE(fObj.isError()) <<
printError(fObj);
1330 ASSERT_EQ(fObj.getErrorCode(),
NO_ERROR);
1331 ASSERT_TRUE(fObj.getErrorMsg().empty());
1332 printSuccess(
"DRCEFunctionalObject Process");
1334 std::cout <<
"result JSON: " << resultJson <<
std::endl;
1341 ASSERT_TRUE(resultJsonMessage.
getErrorMsg().empty());
1342 printSuccess(
"DRCEOutputJsonMessage unserialize");
// Step 5: poll every 5000 ms while fObj still reports asynchronous tasks or
// still holds the root task, then wait a further 5000 ms.
1344 while (fObj.getAsyncTasksCount() || fObj.hasTask(startTaskId))
1346 Poco::Thread::sleep(5000);
1347 std::cout <<
"wait end task" <<
std::endl;
1349 Poco::Thread::sleep(5000);
// Step 6: after a 10000 ms pause, the ids in cleanupTasks must have no task
// data. In the tree built by getInputJsonCleanup, 603 is subtasks[2] and
// 607/608 are its two children (subtasks[6] and subtasks[7]).
1377 Poco::Thread::sleep(10000);
1378 std::vector<unsigned int> cleanupTasks = {603, 607, 608};
1379 for (
unsigned int i=0;i<cleanupTasks.size();++i)
1381 ASSERT_TRUE(isNotExistTaskData(cleanupTasks[i], nodeOptions)) <<
"isNotExistTaskData cleanupTasks[" << i <<
"]: " << cleanupTasks[i];
// Step 7: walk ids startTaskId .. startTaskId+13, test each against
// cleanupTasks, and assert that task data exists.
// NOTE(review): the code between the membership test and the assertion is not
// visible; presumably ids found in cleanupTasks skip the assertion - confirm.
1384 for (
unsigned int i=0;i<14;++i)
1386 unsigned int taskId = startTaskId+i;
1387 bool isNotExist =
false;
// NOTE(review): this inner loop declares its own `i`, shadowing the outer index.
1388 for (
unsigned int i=0;i<cleanupTasks.size();++i)
1390 if (cleanupTasks[i]==taskId)
1398 ASSERT_TRUE(isExistTaskData(taskId, nodeOptions)) <<
" isExistTaskData taskId: " <<
taskId;
1403 printSuccess(
"!!! Test Request Terminate with Cleanup");
1412 fObj.setNodePort(
"NODE_PORT");
1413 fObj.setNodeName(
"NODE_NAME");
1415 unsigned int startTaskId = 600;
1417 DRCEInputJsonMessage inputJsonMessage = getInputJsonMessage(startTaskId, SessionOptions::ThreadMode::tmAsync, SessionOptions::CleanupFlag::cfNotDelete,
1418 "echo \"Hello\" && sleep 5 && ps xau",
1421 std::stringstream jsonStream;
1422 jsonStream << inputJsonMessage;
1424 ASSERT_FALSE(inputJsonMessage.isError()) <<
printError(inputJsonMessage);
1425 ASSERT_EQ(inputJsonMessage.getErrorCode(),
NO_ERROR) <<
printError(inputJsonMessage);
1426 ASSERT_TRUE(inputJsonMessage.getErrorMsg().empty()) <<
printError(inputJsonMessage);
1430 std::string resultJson = fObj.Process(jsonStream.str());
1432 ASSERT_FALSE(fObj.isError()) <<
printError(fObj);
1433 ASSERT_EQ(fObj.getErrorCode(),
NO_ERROR);
1434 ASSERT_TRUE(fObj.getErrorMsg().empty());
1435 printSuccess(
"DRCEFunctionalObject Process");
1444 ASSERT_TRUE(resultJsonMessage.
getErrorMsg().empty());
1445 printSuccess(
"DRCEOutputJsonMessage unserialize");
1447 while (fObj.getAsyncTasksCount() || fObj.hasTask(startTaskId))
1449 Poco::Thread::sleep(5000);
1450 std::cout <<
"wait end task" <<
std::endl;
1452 Poco::Thread::sleep(5000);
1458 ASSERT_FALSE(taskRequestDeleteData.
isError());
1459 printSuccess(
"DRCETaskRequestDeleteData serialize");
1461 inputJsonMessage.clear();
1462 inputJsonMessage.setRequestType(DRCETaskRequest::RequestType::rtDeleteTaskData);
1463 inputJsonMessage.setRequestData(json);
1464 inputJsonMessage.setRequestId(startTaskId);
1466 std::stringstream sourceJsonStream;
1467 sourceJsonStream << inputJsonMessage;
1471 ASSERT_FALSE(inputJsonMessage.isError()) <<
printError(inputJsonMessage);
1472 ASSERT_EQ(inputJsonMessage.getErrorCode(),
NO_ERROR) <<
printError(inputJsonMessage);
1473 ASSERT_TRUE(inputJsonMessage.getErrorMsg().empty()) <<
printError(inputJsonMessage);
1474 printSuccess(
"DRCEInputJsonMessage serialize");
1476 resultJson = fObj.Process(sourceJsonStream.str());
1478 ASSERT_FALSE(fObj.isError()) <<
printError(fObj);
1479 ASSERT_EQ(fObj.getErrorCode(),
NO_ERROR);
1480 ASSERT_TRUE(fObj.getErrorMsg().empty());
1481 printSuccess(
"DRCEFunctionalObject Process");
1491 printSuccess(
"DRCEOutputJsonMessage unserialize");