hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DRCEAsyncTasksQueue.hpp
Go to the documentation of this file.
1 
14 #ifndef DRCE_ASYNC_TASKS_QUEUE_HPP
15 #define DRCE_ASYNC_TASKS_QUEUE_HPP
16 
17 #include <map>
18 #include <list>
19 #include <Poco/SharedPtr.h>
20 #include <Poco/Observer.h>
21 #include <Poco/Task.h>
22 #include <Poco/Thread.h>
23 #include <Poco/Mutex.h>
24 #include <Poco/ThreadPool.h>
25 #include <Poco/TaskManager.h>
26 #include <Poco/TaskNotification.h>
27 
28 #include "DRCETaskRequest.hpp"
29 #include "DRCENodeOptions.hpp"
31 #include "CustomMessage.hpp"
33 #include "DRCEResourceLimits.hpp"
34 
35 namespace HCE
36 {
37 namespace drce
38 {
39 class DRCEAsyncTasksQueue;
40 class DRCEResultDataItem;
41 class DRCEResultData;
42 class DRCECommonTask;
43 class DRCETasksQueue;
44 //-----------------------------------------------------------------------------
46 {
47 public:
48  ProgressHandler(DRCEAsyncTasksQueue& asyncTasksQueue_) : asyncTasksQueue(asyncTasksQueue_) {};
49 
50  void onStarted(Poco::TaskStartedNotification* pNf);
51  void onFinished(Poco::TaskFinishedNotification* pNf);
52  void onCancelled(Poco::TaskCancelledNotification* pNf);
53 private:
54  static void log(const std::string& msg);
55 
56  DRCEAsyncTasksQueue& asyncTasksQueue;
57 };
58 //-----------------------------------------------------------------------------
59 //-----------------------------------------------------------------------------
60 struct AsyncTasks
61 {
62 public:
63  AsyncTasks(void);
64  AsyncTasks(unsigned int taskId_,
65  unsigned int parentTaskId_,
66  pid_t pid_,
67  unsigned int timeMax_,
68  size_t timeStart_,
69  size_t timeElapsed_,
71  AsyncTasks(const AsyncTasks& rhs);
72  AsyncTasks(AsyncTasks&& rhs);
73 
74  AsyncTasks& operator=(const AsyncTasks& rhs);
76 
77  void clear(void);
78 
79  unsigned int taskId; // task ID
80  unsigned int parentTaskId; // parent task ID
81  unsigned int pid; // task process ID
82  unsigned int timeMax; // max time allowed for execution
83  size_t timeStart; // time start of task for execution
84  size_t timeElapsed; // time elapsed for task
86  bool locked; // flag for locked access to structure on disk
87  ResourceUsageLimits usageLimits; // resource limits during execution of task
88 
89  friend std::ostream& operator<<(std::ostream& os, const AsyncTasks& rhs);
90  friend std::istream& operator>>(std::istream& is, AsyncTasks& rhs);
91 };
92 //-----------------------------------------------------------------------------
93 //-----------------------------------------------------------------------------
94 class DRCEAsyncTasksQueue : Poco::Runnable
95 {
96  enum {DEFAULT_MAX_CAPACITY=16};
97  enum {DEFAULT_QUEUE_DUMP_PERIOD=10000};
98 public:
99  typedef std::map<unsigned int, AsyncTasks> Tasks;
100  typedef std::map<unsigned int, std::map<unsigned int, bool> > SubTasks;
102 
103  DRCEAsyncTasksQueue(DRCENodeOptions& nodeOptions_,
104  DRCENotificationExecutor& notificationExecutor_,
105  CustomMessage& message_);
106  ~DRCEAsyncTasksQueue(void);
107 
108  void cancelAll(void);
109  void joinAll(void);
110  void startTask(Poco::Task* pTask) throw (Poco::Exception);
111  bool hasTask(unsigned int taskId);
112  void cancelTask(unsigned int taskId);
113 
114  void setMaxThreadCount(unsigned int threadCount);
115  unsigned int getMaxThreadCount(void) const;
116 
117  void makeCanceledTasks(void);
118  void checkExpiredTime(void);
119  void checkUsageResources(void);
120 
121  AsyncTasks getAsyncTask(unsigned int taskId);
122  void setAsyncTask(const AsyncTasks& asyncTask);
123  void setAsyncTask(AsyncTasks&& asyncTask);
124  size_t getAsyncTasksCount(void);
125  bool isExistAsyncTask(unsigned int taskId);
126 
127  bool removeAsyncTask(unsigned int taskId);
128 
129  void lockAsyncTask(unsigned int taskId);
130  void lockAsyncTask(const AsyncTasks& asyncTask);
131  void unlockAsyncTask(unsigned int taskId);
132  void unlockAsyncTask(const AsyncTasks& asyncTask);
133  bool isLockedAsyncTask(unsigned int taskId);
134 
135  void run(void);
136 
137  void terminate(void);
138  bool isTerminated(void);
139 
140  void changeState(AsyncTasks&& asyncTask, DRCEResultDataItem* pResultDataItem=nullptr,
141  const std::string& errorMessage="", unsigned int errorCode=0) throw (Poco::Exception);
142 
143  void safeChangeState(AsyncTasks&& asyncTask, DRCEResultDataItem* pResultDataItem=nullptr,
144  const std::string& errorMessage="", unsigned int errorCode=0);
145 
146  void setTasksQueueDumpPeriod(unsigned int tasksQueueDumpPeriod_); // period in msec
147  unsigned int getTasksQueueDumpPeriod(void);
148 
149  void loadTasksQueue(void);
150  void saveTasksQueue(void);
151 
152  void executeNotification(const std::string& json);
153  void executeNotification(const DRCEResultData& resultData, DRCETaskRequest::RequestType requestType);
154 
155  void setResetErrorCodeStateNotification(bool resetErrorCodeStateNotification_) {resetErrorCodeStateNotification=resetErrorCodeStateNotification_;}
156  bool getResetErrorCodeStateNotification(void) const {return resetErrorCodeStateNotification;}
157 
159  void startCleanupTask(unsigned int taskId);
160 
162  std::string getCurrentTasksQueueAsString(void);
163 
164  void writeToTasksLog(const std::string& delimiter, const std::string& taskStatus, const AsyncTasks& asyncTask);
165 
166  AsyncTasks& getSyncTasks(void) {return syncTask;}
167 
168  void addSubtask(unsigned int taskId, unsigned int subtaskId, bool needCleanup);
169  void setSubtaskCleanup(unsigned int subtaskId, bool needCleanup);
170  bool getSubtaskCleanup(unsigned int subtaskId);
171  void removeSubtasks(unsigned int taskId);
172  void setAllSubtaskCleanup(unsigned int taskId);
173  SubTasks::const_iterator getSubtasksBegin(void);
174  SubTasks::const_iterator getSubtasksEnd(void);
175  std::map<unsigned int, bool> getSubtasks(unsigned int taskId);
176 private:
177  unsigned int maxThreadCount;
178  Poco::ThreadPool threadPool;
179  Poco::TaskManager taskManager;
180  ProgressHandler progressHandler;
181  Poco::Thread thread;
182  DRCENodeOptions& nodeOptions;
183  DRCENotificationExecutor& notificationExecutor;
184  CustomMessage& message;
185 
186  Tasks tasks;
187  SubTasks subtasks;
188  Poco::Mutex mutex;
189  volatile bool terminated;
190  unsigned int tasksQueueDumpPeriod;
191 
192  Poco::TaskManager cleanupTaskManager;
193  SyncTasks syncTask; // task data use in sync mode
194 
195  bool resetErrorCodeStateNotification;
196 private:
197  void setSyncTaskAsTerminated(const std::string& errorMessage, unsigned int errorCode);
198 };
199 //-----------------------------------------------------------------------------
200 //-----------------------------------------------------------------------------
202 {
203 public:
204  AsyncTaskLocker(DRCEAsyncTasksQueue& asyncTasksQueue_, unsigned int taskId_);
205  AsyncTaskLocker(DRCEAsyncTasksQueue& asyncTasksQueue_, const AsyncTasks& asyncTask);
206  ~AsyncTaskLocker(void);
207 private:
208  DRCEAsyncTasksQueue& asyncTasksQueue;
209  unsigned taskId;
210 };
211 //-----------------------------------------------------------------------------
212 //-----------------------------------------------------------------------------
213 } // end namespace drce
214 } // end namespace HCE
215 
216 #endif // DRCE_ASYNC_TASKS_QUEUE_HPP