HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
TasksExecutor.py
Go to the documentation of this file.
1 """@package docstring
2  @file TasksExecutor.py
3  @author Oleksii <developers.hce@gmail.com>
4  @link http://hierarchical-cluster-engine.com/
5  @copyright Copyright &copy; 2013 IOIX Ukraine
6  @license http://hierarchical-cluster-engine.com/license/
7  @package HCE project node API
8  @since 0.1
9 """
10 
11 
12 import logging
13 import time
14 
15 from app.BaseServerManager import BaseServerManager
16 
17 from EventObjects import ExecuteTask
18 from EventObjects import GetScheduledTasks
19 import dtm.Constants as DTM_CONSTS
20 import transport.Consts as TRANSPORT_CONSTS
21 
22 
# Name of the configuration section this module reads its options from.
CONFIG_SECTION = "TasksExecutor"


# Module-level logger, obtained under the logger name defined in dtm.Constants.
logger = logging.getLogger(DTM_CONSTS.LOGGER_NAME)
30 
31 
36 
# NOTE(review): the class header was lost in the extracted listing. The class
# name is taken from the super(TasksExecutor, self) calls below; the base class
# is assumed to be the imported BaseServerManager -- confirm against the repo.
class TasksExecutor(BaseServerManager):
    """Fetch scheduled tasks from the scheduler and pass them on for execution.

    On every poll timeout a GET_SCHEDULED_TASKS request is sent over the
    "scheduler" connection (at most one request is outstanding at a time).
    When the GET_SCHEDULED_TASKS_RESPONSE arrives, one EXECUTE_TASK event per
    returned task id is sent over the "executionEnvironmentManager" connection.
    Polling is skipped while the suspend flag in ``statFields`` is set.
    """

    # Option name, inside CONFIG_SECTION, of the time slot period.
    CONFIG_TIME_SLOT_PERIOD = "timeSlotPeriod"
    # Key in self.statFields under which the suspend flag is kept.
    STAT_SUSPEND_STATE = "suspendState"

    def __del__(self):
        """Destructor placeholder; intentionally does nothing."""
        pass

    def __init__(self, configParser, connectBuilderLight):
        """Set up both client connections, the response handler and the state.

        Args:
            configParser: configuration object; its ``get(section, option)``
                and ``getint(section, option)`` methods are used.
            connectBuilderLight: connection factory; its
                ``build(connectionType, target)`` method is used.
        """
        super(TasksExecutor, self).__init__()

        # Connection targets of the two peers, as configured.
        eemTarget = configParser.get(CONFIG_SECTION, "ExecutionEnvironmentManager")
        schedulerTarget = configParser.get(CONFIG_SECTION, "Scheduler")

        # Build one client-side connection per peer.
        eemConnection = connectBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT, eemTarget)
        schedulerConnection = connectBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT,
                                                        schedulerTarget)

        # Names under which the connections are registered and later addressed
        # in self.send().
        self.executionEnvironmentManager = "executionEnvironmentManager"
        self.scheduler = "scheduler"

        self.addConnection(self.executionEnvironmentManager, eemConnection)
        self.addConnection(self.scheduler, schedulerConnection)

        # Scheduler responses are handled by onSchedulerRoute().
        self.setEventHandler(DTM_CONSTS.EVENT_TYPES.GET_SCHEDULED_TASKS_RESPONSE,
                             self.onSchedulerRoute)

        # Not referenced by any method of this class.
        # NOTE(review): confirm whether the base class or outside code uses it
        # before removing.
        self.processEvents = dict()

        # The time slot period doubles as the poll timeout.
        # NOTE(review): the second argument was lost in the extracted listing;
        # it is reconstructed from the CONFIG_TIME_SLOT_PERIOD constant --
        # confirm against the repository.
        self.configVars[self.POLL_TIMEOUT_CONFIG_VAR_NAME] = configParser.getint(
            CONFIG_SECTION, self.CONFIG_TIME_SLOT_PERIOD)

        # Despite its name, True means "no request is pending, the next poll
        # may send one"; False means a request is awaiting its response.
        self.isReqSended = True

        # Timestamp taken when the last request was sent; None until then.
        self.old = None

        self.statFields[self.STAT_SUSPEND_STATE] = False

    def onSchedulerRoute(self, event):
        """Handle a GET_SCHEDULED_TASKS_RESPONSE event.

        Sends one EXECUTE_TASK event per id in ``event.eventObj.ids`` to the
        execution environment manager connection, sleeps for the computed
        delay and then re-enables polling.

        Args:
            event: response event; ``event.eventObj.ids`` is iterated.
        """
        for taskId in event.eventObj.ids:
            eemEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.EXECUTE_TASK,
                                               ExecuteTask(taskId))
            self.send(self.executionEnvironmentManager, eemEvent)

        # NOTE(review): indentation was lost in the listing; the delay and the
        # flag reset are placed after the loop, otherwise an empty id list
        # would never re-enable polling -- confirm.
        requestedAt = self.old
        period = self.configVars[self.POLL_TIMEOUT_CONFIG_VAR_NAME]
        # Skip the delay when no request timestamp exists yet (a response that
        # arrived before the first poll) or when the period is not positive
        # (the modulo would divide by zero or yield a negative sleep).
        if requestedAt is not None and period > 0:
            # time.time() replaces time.clock(): clock() measures CPU time on
            # Unix rather than elapsed time and was removed in Python 3.8.
            elapsed = time.time() - requestedAt
            # NOTE(review): arithmetic kept as in the original -- elapsed is in
            # seconds while the result is divided by 1000 as if it were
            # milliseconds; verify the intended units.
            time.sleep((elapsed % period) / 1000.0)

        # Allow on_poll_timeout() to send the next request.
        self.isReqSended = True

    def on_poll_timeout(self):
        """Send a GET_SCHEDULED_TASKS request unless suspended or one is pending."""
        if self.statFields[self.STAT_SUSPEND_STATE]:
            return
        if not self.isReqSended:
            # A request is already awaiting its response; do nothing.
            return

        # Remember when the request went out; onSchedulerRoute() uses it.
        self.old = time.time()
        request = GetScheduledTasks(self.configVars[self.POLL_TIMEOUT_CONFIG_VAR_NAME])
        requestEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.GET_SCHEDULED_TASKS,
                                               request)
        self.send(self.scheduler, requestEvent)
        # Block further requests until the response has been handled.
        self.isReqSended = False

    def onAdminSuspend(self, event):
        """Record the suspend flag, then delegate to the base class handler.

        Args:
            event: admin event; when ``event.eventObj`` is not None, its
                ``isSuspend()`` result is stored in ``statFields``.
        """
        if event.eventObj is not None:
            self.statFields[self.STAT_SUSPEND_STATE] = event.eventObj.isSuspend()
        # NOTE(review): indentation was lost in the listing; the base handler
        # is assumed to run unconditionally -- confirm.
        super(TasksExecutor, self).onAdminSuspend(event)
def on_poll_timeout(self)
This function is called every time a ConnectionTimeout exception arrives.
ExecuteTask event object, to set task to execute on EE.
def __del__(self)
destructor just in case
def setEventHandler(self, eventType, handler)
Set the event handler, replacing the current handler for eventType.
def addConnection(self, name, connection)
This is app base class for management server connection end-points and parallel transport messages pr...
The Tasks Executor object. The main job of this object is the selection of scheduled tasks from the schedule...
def onSchedulerRoute(self, event)
Handler that routes all events to the TasksManager.
GetScheduledTasks event object, to get tasks per time slot range from the Scheduler.
def send(self, connect_name, event)
send event
def onAdminSuspend(self, event)
onAdminState event handler; processes the admin command.
def __init__(self, configParser, connectBuilderLight)
Constructor; initialises all connections and event handlers.