HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
dtm.Scheduler.Scheduler Class Reference

The Scheduler object implements the task scheduling algorithms.


Public Member Functions

def __init__ (self, configParser, connectBuilderLight, pollerManager=None)
 Constructor; initialises all connections and event handlers.
 
def createDBIDict (self, configParser)
 Creates the DBI configuration dict from the database section of the config.
 
def onNewTask (self, event)
 onNewTask event handler.
 
def planTaskTimeToRun (self, task)
 
def onUpdateTask (self, event)
 onUpdateTask event handler.
 
def onDeleteTask (self, event)
 onDeleteTask event handler.
 
def onGetSheduledTasks (self, event)
 onGetSheduledTasks event handler.
 
def onAVGResourcesResponse (self, event)
 onAVGResourcesResponse event handler.
 
def onUpdateTaskFieldsResponse (self, event)
 Handler for the UPDATE_TASK_FIELDS_RESPONSE event.
 
def taskUpdateProcess (self)
 Starts the task update process.
 
def rTimeCalc (self, schedulerTask, task_strategy, replanned)
 Recalculates the value of the rTime field.
 
def addPendingEvent (self, event)
 Adds a pending event to all auxiliary structures.
 
def deletePendingEvent (self, event)
 Deletes a pending event from all auxiliary structures.
 
def stateRecalculate (self, task_strategy, schedulerTask)
 Recalculates the state field.
 
def sendOperationProcessingError (self, event)
 Sends an operation processing error.
 
def deleteTaskFromSchedule (self, deleteTask, schedulerTask=None)
 Deletes a task from the schedule.
 
def reschedulingTasks (self)
 Reschedules the schedule after all changes to it.
 
def modifyTaskInSchedule (self, task)
 Adds a task to the schedule, or updates a task that is already there.
 
def checkCorrectTaskType (self, task)
 Checks that the requested operation is consistent with the schedule state; prevents wrong operations (updating an absent task, inserting the same task again).
 
def isPossibleToRun (self, resourcesAVG, task_strategy)
 Checks whether a task can run by comparing its required limits with the actual resources.
 
def getTimeSinceEpoch (self, date=None)
 Returns the time since the epoch in milliseconds.
 
def getPlannedRunTime (self, date_string)
 Returns the planned run time.
 
- Public Member Functions inherited from app.BaseServerManager.BaseServerManager
def __init__ (self, poller_manager=None, admin_connection=None, conectionLightBuilder=None, exceptionForward=False, dumpStatVars=True)
 Constructor.
 
def addConnection (self, name, connection)
 
def setEventHandler (self, eventType, handler)
 Sets the event handler, overwriting the current handler for eventType.
 
def send (self, connect_name, event)
 Sends an event.
 
def reply (self, event, reply_event)
 Wrapper that sends an event in reply to another event.
 
def poll (self)
 Polls the connections; a message is received as multipart, and its second part is a pickled Python object.
 
def process (self, event)
 Processes an event: calls the handler set by the user, or on_unhandled_event if no handler is set.
 
def run (self)
 
def is_connection_registered (self, name)
 Checks whether a connection is registered in this BaseServerManager instance.
 
def on_poll_timeout (self)
 Called every time a ConnectionTimeout exception is raised.
 
def on_unhandled_event (self, event)
 Called every time an event arrives for whose type no handler is set.
 
def build_poller_list (self)
 
def clear_poller (self)
 
def onAdminState (self, event)
 onAdminState event handler; processes the admin SHUTDOWN command.
 
def onAdminFetchStatData (self, event)
 onAdminFetchStatData event handler; processes an admin command.
 
def onAdminSuspend (self, event)
 onAdminSuspend event handler; processes an admin command.
 
def getStatDataFields (self, fields)
 Returns stat data from storage.
 
def getSystemStat (self)
 Returns stat data for the system indicators RAMV, RAMR and CPU.
 
def getConfigVarsFields (self, fields)
 Returns config vars from storage.
 
def onAdminGetConfigVars (self, event)
 onAdminGetConfigVars event handler; processes the getConfigVars admin command, then fills and returns the config vars array from internal storage.
 
def onAdminSetConfigVars (self, event)
 onAdminSetConfigVars event handler; processes the setConfigVars admin command.
 
def setConfigVars (self, setConfigVars)
 Sets config vars in storage.
 
def sendAdminReadyEvent (self)
 Sends a ready event to notify adminInterfaceService.
 
def createLogMsg (self, event)
 Forms a string message from the event object.
 
def initStatFields (self, connect_name)
 Adds a record to statFields.
 
def updateStatField (self, field_name, value, operation=STAT_FIELDS_OPERATION_ADD)
 Updates the value of a stat field; the default operation is add (sum).
 
def processSpecialConfigVars (self, name, value)
 Sends a ready event to notify adminInterfaceService.
 
def getLogLevel (self)
 Gets the log level from the first of the existing loggers.
 
def setLogLevel (self, level)
 Sets the log level for all loggers.
 
def saveStatVarsDump (self)
 Saves stat vars to a JSON file.
 
def loadStatVarsDump (self)
 Loads stat vars from a JSON file.
 
def getStatVarsDumpFileName (self)
 Gets the stat vars file name.
 
def createDBIDict (self, configParser)
 

Public Attributes

 cfg_section
 
 tasksToUpdate
 
 maxTasks
 
 timeSlot
 Value of the time slot used when scheduling tasks.
 
 waitResourcesEvents
 
 waitResourcesTasks
 
 dbi
 Database that contains the schedule table.
 
- Public Attributes inherited from app.BaseServerManager.BaseServerManager
 dumpStatVars
 
 poller_manager
 
 eventBuilder
 
 exit_flag
 
 pollTimeout
 
 connections
 
 event_handlers
 
 statFields
 Stat fields container.
 
 configVars
 
 exceptionForward
 

Static Public Attributes

int OPERATION_ERR = 1024
 
string OPERATION_ERR_MSG = "Previous task operations is not finished"
 
int RESOURCES_EXCEED_ERR = 1025
 
string RESOURCES_EXCEED_ERR_MSG = "Resources are exceed"
 
string SERVER = "server"
 
string RESOURCES_MANAGER_CLIENT = "clientResourcesManager"
 
string CLIENT_INTERFACE_SERVICE_CLIENT = "clientClientInterfaceService"
 
string TIME_SLOT_PERIOD = "timeSlotPeriod"
 
string MAX_TASKS = "maxTasksPerTimeSlot"
 
- Static Public Attributes inherited from app.BaseServerManager.BaseServerManager
string ADMIN_CONNECT_ENDPOINT = "Admin"
 
string ADMIN_CONNECT_CLIENT = "Admin"
 
int POLL_TIMEOUT_DEFAULT = 3000
 
int STAT_FIELDS_OPERATION_ADD = 0
 
int STAT_FIELDS_OPERATION_SUB = 1
 
int STAT_FIELDS_OPERATION_SET = 2
 
int STAT_FIELDS_OPERATION_INIT = 3
 
string POLL_TIMEOUT_CONFIG_VAR_NAME = "POLL_TIMEOUT"
 
string LOG_LEVEL_CONFIG_VAR_NAME = "LOG_LEVEL"
 
string STAT_DUMPS_DEFAULT_DIR = "/tmp/"
 
string STAT_DUMPS_DEFAULT_NAME = "%APP_NAME%_%CLASS_NAME%_stat_vars.dump"
 
dictionary LOGGERS_NAMES = {APP_CONSTS.LOGGER_NAME, "dc", "dtm", "root", ""}
 

Detailed Description

The Scheduler object implements the task scheduling algorithms.

Definition at line 52 of file Scheduler.py.

Constructor & Destructor Documentation

◆ __init__()

def dtm.Scheduler.Scheduler.__init__ (   self,
  configParser,
  connectBuilderLight,
  pollerManager = None 
)

Constructor; initialises all connections and event handlers.

Definition at line 73 of file Scheduler.py.

 73   def __init__(self, configParser, connectBuilderLight, pollerManager=None):
 74     super(Scheduler, self).__init__(pollerManager)
 75 
 76     self.cfg_section = self.__class__.__name__
 77     self.tasksToUpdate = list()
 78 
 79     serverAddr = configParser.get(self.cfg_section, self.SERVER)
 80     resourcesManagerAddr = configParser.get(self.cfg_section, self.RESOURCES_MANAGER_CLIENT)
 81     clientInterfaceServiceAddr = configParser.get(self.cfg_section, self.CLIENT_INTERFACE_SERVICE_CLIENT)
 82     self.maxTasks = int(configParser.get(self.cfg_section, self.MAX_TASKS))
 83 
 84 
 86     self.timeSlot = int(configParser.get(self.cfg_section, self.TIME_SLOT_PERIOD))
 87 
 88     serverConnection = connectBuilderLight.build(consts.SERVER_CONNECT, serverAddr)
 89     resourcesManagerConnection = connectBuilderLight.build(consts.CLIENT_CONNECT, resourcesManagerAddr)
 90     clientInterfaceServiceConnection = connectBuilderLight.build(consts.CLIENT_CONNECT, clientInterfaceServiceAddr,
 91                                                                  real_connect=False)
 92 
 93     self.addConnection(self.SERVER, serverConnection)
 94     self.addConnection(self.RESOURCES_MANAGER_CLIENT, resourcesManagerConnection)
 95     self.addConnection(self.CLIENT_INTERFACE_SERVICE_CLIENT, clientInterfaceServiceConnection)
 96 
 97     #TasksManager events
 98     self.setEventHandler(EVENT_TYPES.SCHEDULE_TASK, self.onNewTask)
 99     self.setEventHandler(EVENT_TYPES.UPDATE_TASK, self.onUpdateTask)
100     self.setEventHandler(EVENT_TYPES.DELETE_TASK, self.onDeleteTask)
101     self.setEventHandler(EVENT_TYPES.GET_SCHEDULED_TASKS, self.onGetSheduledTasks)
102     self.setEventHandler(EVENT_TYPES.UPDATE_TASK_FIELDS_RESPONSE, self.onUpdateTaskFieldsResponse)
103 
104     #ResourcesManager
105     self.setEventHandler(EVENT_TYPES.GET_AVG_RESOURCES_RESPONSE, self.onAVGResourcesResponse)
106 
107 
110     self.waitResourcesEvents = dict()
111 
112 
115     self.waitResourcesTasks = dict()
116 
117 
119     self.dbi = DBI(self.createDBIDict(configParser))
120 
121     isClearOnStart = configParser.get(self.cfg_section, DTM_CONSTS.CLEAR_ON_START)
122     if isClearOnStart == "True":
123       schedulerTask = SchedulerTask()
124       schedulerTask.state = ScheduledTask.STATE_CLOSED
125       try:
126         self.dbi.update(SchedulerTaskScheme(schedulerTask), "id=id")
127       except DBIErr as err:
128         logger.error(">>> Some DBI error in Scheduler.__init__ [" + str(err.message) + "]")
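
The constructor reads its settings from a configuration section named after the class (Scheduler). The sketch below shows what such a section could look like and how the numeric options are read. It uses Python 3's standard configparser, whereas the listing appears to target Python 2. The option names come from the static attributes SERVER, RESOURCES_MANAGER_CLIENT, CLIENT_INTERFACE_SERVICE_CLIENT, TIME_SLOT_PERIOD and MAX_TASKS; every address and number is a made-up placeholder, not a value from the project.

```python
import configparser

# Hypothetical configuration text. Only the option names are taken from the
# Scheduler constants; the values are placeholders for illustration.
SAMPLE_INI = """
[Scheduler]
server = Scheduler
clientResourcesManager = ResourcesManager
clientClientInterfaceService = ClientInterfaceService
timeSlotPeriod = 60000
maxTasksPerTimeSlot = 10
"""

parser = configparser.ConfigParser()
parser.optionxform = str  # keep option names case-sensitive
parser.read_string(SAMPLE_INI)

section = "Scheduler"  # the constructor uses self.__class__.__name__
server_addr = parser.get(section, "server")
time_slot = int(parser.get(section, "timeSlotPeriod"))
max_tasks = int(parser.get(section, "maxTasksPerTimeSlot"))
```

Both numeric options are converted with int(), as in the constructor, so a non-numeric value raises ValueError at start-up rather than later during scheduling.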

Member Function Documentation

◆ addPendingEvent()

def dtm.Scheduler.Scheduler.addPendingEvent (   self,
  event 
)

Adds a pending event to all auxiliary structures.

Parameters
event: instance of the Event object

Definition at line 343 of file Scheduler.py.

343   def addPendingEvent(self, event):
344     #task = event.eventObj
345     #self.waitResourcesTasks[task.id] = True
346     self.waitResourcesEvents[event.uid] = event

◆ checkCorrectTaskType()

def dtm.Scheduler.Scheduler.checkCorrectTaskType (   self,
  task 
)

Checks that the requested operation is consistent with the schedule state; prevents wrong operations (updating an absent task, inserting the same task again).

Parameters
task: instance of a class inherited from Task

Definition at line 423 of file Scheduler.py.

423   def checkCorrectTaskType(self, task):
424     result = self.dbi.fetch(SchedulerTaskScheme(SchedulerTask()), "id = %s" % task.id)
425     if isinstance(task, (NewTask, DeleteTask)):
426       if len(result) > 0:
427         raise LogicErr(LogicErr.ERR_CODE, "Task is already in schedule")
428     else:
429       if len(result) == 0:
430         raise LogicErr(LogicErr.ERR_CODE, "Task is wrong type:" + str(type(task)))
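
The consistency rule can be illustrated without a database. In this minimal sketch the boolean is_insert stands in for the isinstance check in the listing and rows_found for the length of the fetch result; LogicError and check_operation are hypothetical names, not part of the project.

```python
class LogicError(Exception):
    """Hypothetical stand-in for the project's LogicErr."""


def check_operation(is_insert, rows_found):
    """Reject an insert of a scheduled task and an update of an absent one."""
    if is_insert:
        if rows_found > 0:
            raise LogicError("Task is already in schedule")
    elif rows_found == 0:
        raise LogicError("Task is absent, cannot update")


check_operation(is_insert=True, rows_found=0)   # new task, not scheduled yet
check_operation(is_insert=False, rows_found=1)  # update of a scheduled task
```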

◆ createDBIDict()

def dtm.Scheduler.Scheduler.createDBIDict (   self,
  configParser 
)

Creates the DBI configuration dict from the database section of the config.

Definition at line 133 of file Scheduler.py.

133   def createDBIDict(self, configParser):
134     #get section
135     return dict(configParser.items(DTM_CONSTS.DB_CONFIG_SECTION))

◆ deletePendingEvent()

def dtm.Scheduler.Scheduler.deletePendingEvent (   self,
  event 
)

Deletes a pending event from all auxiliary structures.

Parameters
event: instance of the Event object

Definition at line 352 of file Scheduler.py.

352   def deletePendingEvent(self, event):
353     #taskId = self.waitResourcesEvents[event.uid].eventObj.id
354     #del self.waitResourcesTasks[taskId]
355     del self.waitResourcesEvents[event.uid]

◆ deleteTaskFromSchedule()

def dtm.Scheduler.Scheduler.deleteTaskFromSchedule (   self,
  deleteTask,
  schedulerTask = None 
)

Deletes a task from the schedule.

Parameters
deleteTask: instance of the DeleteTask object
schedulerTask: optional SchedulerTask instance; if None, a new one is created

Definition at line 381 of file Scheduler.py.

381   def deleteTaskFromSchedule(self, deleteTask, schedulerTask=None):
382     if schedulerTask == None:
383       schedulerTask = SchedulerTask()
384       schedulerTask.id = deleteTask.id
385     self.dbi.delete(SchedulerTaskScheme(schedulerTask), "id=%s" % deleteTask.id)

◆ getPlannedRunTime()

def dtm.Scheduler.Scheduler.getPlannedRunTime (   self,
  date_string 
)

Returns the planned run time.

Parameters
date_string: string containing a date in the strict format "%Y-%m-%d %H:%M:%S,%f"
Returns
the planned run time in milliseconds since the epoch (integer)

Definition at line 482 of file Scheduler.py.

482   def getPlannedRunTime(self, date_string):
483     planed_time = datetime.datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S,%f")
484     return self.getTimeSinceEpoch(planed_time)
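
The strict format expected by getPlannedRunTime separates the fractional seconds with a comma. A minimal sketch of the parsing step, using only the standard library; the helper name parse_planned_time and the sample date are illustrative assumptions.

```python
import datetime

# Format string taken from the listing of getPlannedRunTime.
DATE_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def parse_planned_time(date_string):
    """Parse a date string in the strict format; raises ValueError otherwise."""
    return datetime.datetime.strptime(date_string, DATE_FORMAT)


planned = parse_planned_time("2015-03-01 12:30:00,250")
```

A string without the comma-separated fraction, such as "2015-03-01 12:30:00", does not match the format and raises ValueError.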

◆ getTimeSinceEpoch()

def dtm.Scheduler.Scheduler.getTimeSinceEpoch (   self,
  date = None 
)

Returns the time since the epoch in milliseconds.

Parameters
date: datetime object to convert; if None, the current time from datetime.datetime.now() is used
Returns
the number of milliseconds (integer)

Definition at line 470 of file Scheduler.py.

470   def getTimeSinceEpoch(self, date=None):
471     if date is None:
472       date = datetime.datetime.now()
473     epoch = datetime.datetime.utcfromtimestamp(0) #@todo move to init
474     delta = date - epoch #datetime.datetime.now() - epoch
475     return int(((delta.days * 24 * 60 * 60 + delta.seconds) * 1000 + delta.microseconds / 1000.0))
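
The conversion can be tried in isolation. The sketch below repeats the arithmetic of the listing as a free function; the name time_since_epoch_ms is an assumption. Note that the subtraction works on naive datetime objects: the epoch is 1970-01-01 00:00:00 and the default date comes from the local clock, so the default result is shifted by the local UTC offset.

```python
import datetime

# Naive epoch, equal to what datetime.datetime.utcfromtimestamp(0) returns.
EPOCH = datetime.datetime(1970, 1, 1)


def time_since_epoch_ms(date=None):
    """Milliseconds between the naive epoch and a naive datetime."""
    if date is None:
        date = datetime.datetime.now()  # local time, as in the listing
    delta = date - EPOCH
    seconds = delta.days * 24 * 60 * 60 + delta.seconds
    return int(seconds * 1000 + delta.microseconds / 1000.0)


one_day = time_since_epoch_ms(datetime.datetime(1970, 1, 2))
```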

◆ isPossibleToRun()

def dtm.Scheduler.Scheduler.isPossibleToRun (   self,
  resourcesAVG,
  task_strategy 
)

Checks whether a task can run by comparing its required limits with the actual resources.

Parameters
resourcesAVG: instance of the ResourcesAVG object
task_strategy: the task's strategy dict; the limits CPU, CPU_LOAD_MAX, IO_WAIT_MAX and RAM_FREE are checked when present
Returns
True if the actual resources satisfy every limit present in task_strategy, False otherwise

Definition at line 438 of file Scheduler.py.

438   def isPossibleToRun(self, resourcesAVG, task_strategy):
439     ret = True
440     if ret and "CPU" in task_strategy and resourcesAVG.cpuCores > 0:
441       if (resourcesAVG.threads / resourcesAVG.cpuCores) <= task_strategy["CPU"]:
442         logger.debug("CPU limit %s %s", str(resourcesAVG.threads / resourcesAVG.cpuCores),
443                      str(task_strategy["CPU"]))
444         ret = False
445 
446     if ret and "CPU_LOAD_MAX" in task_strategy:
447       if resourcesAVG.cpu >= task_strategy["CPU_LOAD_MAX"]:
448         logger.debug("CPU_LOAD_MAX limit %s %s", str(resourcesAVG.cpu), str(task_strategy["CPU_LOAD_MAX"]))
449         ret = False
450 
451     if ret and "IO_WAIT_MAX" in task_strategy:
452       if resourcesAVG.io > task_strategy["IO_WAIT_MAX"]:
453         logger.debug("IO_WAIT_MAX limit %s %s", str(resourcesAVG.io), str(task_strategy["IO_WAIT_MAX"]))
454         ret = False
455 
456     if ret and "RAM_FREE" in task_strategy:
457       #logger.debug("FREE RAM: %s, LIMIT %s", str(resourcesAVG.ramR - resourcesAVG.ramRU), str(task_strategy["RAM_FREE"]))
458       if resourcesAVG.ramR > 0 and resourcesAVG.ramRU > 0 and \
459          (resourcesAVG.ramR - resourcesAVG.ramRU) < task_strategy["RAM_FREE"]:
460         logger.debug("RAM_FREE limit %s < %s", str(resourcesAVG.ramR - resourcesAVG.ramRU),
461                      str(task_strategy["RAM_FREE"]))
462         ret = False
463 
464     return ret
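
The checks can be exercised outside the class. The sketch below mirrors the comparisons of the listing with a namedtuple as a hypothetical stand-in for ResourcesAVG; only the fields read by isPossibleToRun are modelled and all numbers are invented. It is written for Python 3, where threads / cpuCores is true division, while under Python 2 the same expression on integers would truncate.

```python
from collections import namedtuple

# Hypothetical stand-in for ResourcesAVG with only the fields the check reads.
Resources = namedtuple("Resources", "cpuCores threads cpu io ramR ramRU")


def is_possible_to_run(res, strategy):
    """Return False as soon as one limit present in strategy is violated."""
    if "CPU" in strategy and res.cpuCores > 0:
        if res.threads / res.cpuCores <= strategy["CPU"]:
            return False
    if "CPU_LOAD_MAX" in strategy and res.cpu >= strategy["CPU_LOAD_MAX"]:
        return False
    if "IO_WAIT_MAX" in strategy and res.io > strategy["IO_WAIT_MAX"]:
        return False
    if "RAM_FREE" in strategy and res.ramR > 0 and res.ramRU > 0:
        if res.ramR - res.ramRU < strategy["RAM_FREE"]:
            return False
    return True


node = Resources(cpuCores=4, threads=16, cpu=35.0, io=2.0, ramR=8000, ramRU=6000)
can_run = is_possible_to_run(node, {"CPU_LOAD_MAX": 50, "RAM_FREE": 1000})
```

A limit that is absent from the strategy is never checked, so an empty strategy always allows the run.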

◆ modifyTaskInSchedule()

def dtm.Scheduler.Scheduler.modifyTaskInSchedule (   self,
  task 
)

Adds a task to the schedule, or updates a task that is already there.

Parameters
task: instance of the Task object

Definition at line 398 of file Scheduler.py.

398   def modifyTaskInSchedule(self, task):
399     schedulerTask = SchedulerTask()
400     schedulerTask.id = task.id
401     schedulerTask.state = ScheduledTask.STATE_PLANNED
402     schedulerTask.strategy = pickle.dumps(task.strategy)
403 
404     if hasattr(task, "rtime"): #@todo only for tests
405       schedulerTask.rTime = task.rtime
406     else:
407       schedulerTask.rTime = self.rTimeCalc(schedulerTask, task.strategy, False)
408 
409     priority = 0
410     if Task.STRATEGY_PRIORITY in task.strategy:
411       priority = task.strategy[Task.STRATEGY_PRIORITY]
412     schedulerTask.priority = priority
413     if isinstance(task, (NewTask, DeleteTask)):
414       self.dbi.insert(SchedulerTaskScheme(schedulerTask))
415     else:
416       self.dbi.update(SchedulerTaskScheme(schedulerTask), "id = %s" % schedulerTask.id)
◆ onAVGResourcesResponse()

def dtm.Scheduler.Scheduler.onAVGResourcesResponse (   self,
  event 
)

onAVGResourcesResponse event handler. Handles the response to the GET_AVG_RESOURCES request sent by onGetSheduledTasks: selects the planned tasks whose rTime lies before the right border of the current time slot and replies with the ids of those that can run with the reported resources.

Parameters
event: instance of the Event object

Definition at line 230 of file Scheduler.py.

230   def onAVGResourcesResponse(self, event):
231     #get response on event sending in onGetSheduledTasks
232     resourcesAVG = event.eventObj
233     if event.uid in self.waitResourcesEvents:
234       getScheduledTasks = self.waitResourcesEvents[event.uid].eventObj
235       ids = list()
236       try:
237         #@todo add correct select condition
238         scheduledTimeSlot = getScheduledTasks.timeSlot
239         #get right border of current time slot
240         curTime = self.getTimeSinceEpoch(datetime.datetime.now())
241         rightBorderMs = int(math.floor(curTime / scheduledTimeSlot)) * scheduledTimeSlot + scheduledTimeSlot
242         clause = "SELECT * FROM scheduler_task_scheme WHERE rTime < " + str(rightBorderMs) + " AND state=" + \
243                  str(ScheduledTask.STATE_PLANNED) + " ORDER BY rTime ASC, priority DESC, tries DESC"
244         if self.maxTasks > 0:
245           clause = clause + " LIMIT " + str(self.maxTasks)
246         scheme = SchedulerTaskScheme(SchedulerTask())
247         taskSchemes = self.dbi.sql(scheme, clause)
248         if hasattr(taskSchemes, '__iter__'):
249           for taskScheme in taskSchemes:
250             # pylint: disable-msg=W0212
251             schedulerTask = taskScheme._getSchedulerTask()
252             if schedulerTask.rTime > 0:
253               rt = datetime.datetime.utcfromtimestamp(schedulerTask.rTime / 1000)
254               rb = datetime.datetime.utcfromtimestamp(rightBorderMs / 1000)
255               ct = datetime.datetime.utcfromtimestamp(curTime / 1000)
256               logger.debug("Task " + str(schedulerTask.id) + " selected by rTime=" + str(rt) + \
257                            ", rborder=" + str(rb) + ", now=" + str(ct))
258             task_strategy = pickle.loads(str(schedulerTask.strategy))
259             if self.isPossibleToRun(resourcesAVG, task_strategy):
260               ids.append(schedulerTask.id)
261               schedulerTask.state = ScheduledTask.STATE_INPROGRESS
262               self.dbi.update(SchedulerTaskScheme(schedulerTask), "id=%s" % str(schedulerTask.id))
263             else:
264               logger.debug("Run isn't possible id=%s tries=%s", str(schedulerTask.id), str(schedulerTask.tries))
265               schedulerTask.tries += 1
266               schedulerTask.state = self.stateRecalculate(task_strategy, schedulerTask)
267               if schedulerTask.state == ScheduledTask.STATE_CLOSED or \
268                  (Task.STRATEGY_SDELAY in task_strategy and task_strategy[Task.STRATEGY_RDELAY] == 0):
269                 self.tasksToUpdate.append(schedulerTask)
270               else:
271                 schedulerTask.rTime = self.rTimeCalc(schedulerTask, task_strategy, True)
272               self.dbi.update(SchedulerTaskScheme(schedulerTask), "id=%s" % str(schedulerTask.id))
273             #@todo maybe need some state for task isn't selected on execution
274           if self.maxTasks > 0 and len(taskSchemes) == self.maxTasks:
275             logger.debug(">>> Scheduled tasks limit reached LIMIT = " + str(self.maxTasks))
276       except SQLAlchemyError as err:
277         logger.critical(str(err.message))
278         row_dbi.db.session.rollback() # pylint: disable-all
279       except DBIErr as err:
280         logger.critical(">>> Some DBI error in Scheduler.onAVGResourcesResponse [" + str(err.message) + "]")
281 
282       getScheduledTasksResponse = GetScheduledTasksResponse(ids)
283       responseEvent = self.eventBuilder.build(EVENT_TYPES.GET_SCHEDULED_TASKS_RESPONSE , getScheduledTasksResponse)
284       self.reply(self.waitResourcesEvents[event.uid], responseEvent)
285       self.deletePendingEvent(event)
286       self.taskUpdateProcess()
287     else:
288       logger.error("get resourceAVG for non exist event " + str(event.uid))
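
The selection query is built from the right border of the current time slot and the optional task limit. The sketch below isolates that step as a pure function so the resulting clause can be inspected. The function name and the sample arguments are assumptions; in particular, the numeric value passed for the planned state is a placeholder, since the value of ScheduledTask.STATE_PLANNED is not shown in this reference.

```python
import math


def build_select_clause(cur_time_ms, time_slot_ms, planned_state, max_tasks):
    """Return (right border in ms, SQL text) for the planned-task selection."""
    right_border_ms = int(math.floor(cur_time_ms / time_slot_ms)) * time_slot_ms + time_slot_ms
    clause = ("SELECT * FROM scheduler_task_scheme WHERE rTime < " + str(right_border_ms) +
              " AND state=" + str(planned_state) +
              " ORDER BY rTime ASC, priority DESC, tries DESC")
    if max_tasks > 0:  # a limit of 0 or less means "no limit"
        clause += " LIMIT " + str(max_tasks)
    return right_border_ms, clause


border, sql = build_select_clause(125000, 60000, planned_state=1, max_tasks=10)
```

The ordering runs the earliest rTime first and, for equal rTime, prefers higher priority and then tasks with more previous tries.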

◆ onDeleteTask()

def dtm.Scheduler.Scheduler.onDeleteTask (   self,
  event 
)

onDeleteTask event handler.

Parameters
event: instance of the Event object

Definition at line 198 of file Scheduler.py.

198   def onDeleteTask(self, event):
199     deleteTask = event.eventObj
200     if deleteTask.id not in self.waitResourcesTasks:
201       try:
202         response = GeneralResponse()
203         self.deleteTaskFromSchedule(deleteTask)
204         self.reschedulingTasks()
205       except DBIErr as err:
206         logger.error(">>> Some DBI error in Scheduler.__init__ [" + str(err.message) + "]")
207         response = GeneralResponse(err.errCode, err.message)
208       finally:
209         responseEvent = self.eventBuilder.build(EVENT_TYPES.SCHEDULE_TASK_RESPONSE , response)
210         self.reply(event, responseEvent)
211     else:
212       self.sendOperationProcessingError(event)
◆ onGetSheduledTasks()

def dtm.Scheduler.Scheduler.onGetSheduledTasks (   self,
  event 
)

onGetSheduledTasks event handler. Registers the event as pending and requests the current average resources from the ResourcesManager; the response is handled by onAVGResourcesResponse.

Parameters
event: instance of the Event object

Definition at line 218 of file Scheduler.py.

218   def onGetSheduledTasks(self, event):
219     #get the current resources state
220     self.addPendingEvent(event)
221     getAVGResourcesEvent = self.eventBuilder.build(EVENT_TYPES.GET_AVG_RESOURCES, None)
222 
223     getAVGResourcesEvent.uid = event.uid
224     self.send(self.RESOURCES_MANAGER_CLIENT, getAVGResourcesEvent)
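
The handler relies on a simple correlation pattern: the incoming request is stored under its uid, the outgoing resources request reuses the same uid, and the response is matched back through that uid. A minimal sketch of the pattern with a plain dict; the class PendingRequests and its method names are hypothetical and only model the role of waitResourcesEvents.

```python
class PendingRequests(object):
    """Keeps requests that wait for a response, keyed by their uid."""

    def __init__(self):
        self._events = {}

    def add(self, uid, event):
        self._events[uid] = event

    def take(self, uid):
        """Return and forget the request for uid, or None if it is unknown."""
        return self._events.pop(uid, None)


pending = PendingRequests()
pending.add("uid-1", {"type": "GET_SCHEDULED_TASKS"})
matched = pending.take("uid-1")     # response with a known uid
unmatched = pending.take("uid-2")   # response nobody waits for
```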

◆ onNewTask()

def dtm.Scheduler.Scheduler.onNewTask (   self,
  event 
)

onNewTask event handler.

Parameters
event: instance of the Event object

Definition at line 141 of file Scheduler.py.

141   def onNewTask(self, event):
142     task = event.eventObj
143     try:
144       response = GeneralResponse()
145       if "DATE" in task.session:
146         response = self.planTaskTimeToRun(task)
147       else:
148         self.checkCorrectTaskType(task)
149         self.modifyTaskInSchedule(task)
150         self.reschedulingTasks()
151 
152     except (DBIErr, LogicErr) as err:
153       logger.error(">>> Some DBI error in Scheduler.onNewTask [" + str(err.message) + "]")
154       response = GeneralResponse(err.errCode, err.message)
155     finally:
156       responseEvent = self.eventBuilder.build(EVENT_TYPES.SCHEDULE_TASK_RESPONSE , response)
157       self.reply(event, responseEvent)
◆ onUpdateTask()

def dtm.Scheduler.Scheduler.onUpdateTask (   self,
  event 
)

onUpdateTask event handler.

Parameters
event: instance of the Event object

Definition at line 187 of file Scheduler.py.

187   def onUpdateTask(self, event):
188     updateTask = event.eventObj
189     if updateTask.id not in self.waitResourcesTasks:
190       self.onNewTask(event)
191     else:
192       self.sendOperationProcessingError(event)

◆ onUpdateTaskFieldsResponse()

def dtm.Scheduler.Scheduler.onUpdateTaskFieldsResponse (   self,
  event 
)

Handler for the UPDATE_TASK_FIELDS_RESPONSE event: removes the first task from tasksToUpdate and continues the update process.

Definition at line 293 of file Scheduler.py.

293   def onUpdateTaskFieldsResponse(self, event): # pylint: disable=W0613
294     logger.debug("Task send update response, tasks len = " + str(len(self.tasksToUpdate)))
295     if len(self.tasksToUpdate) > 0:
296       del self.tasksToUpdate[0]
297       self.taskUpdateProcess()

◆ planTaskTimeToRun()

def dtm.Scheduler.Scheduler.planTaskTimeToRun (   self,
  task 
)

Plans a task for the time given in task.session["DATE"].

Definition at line 162 of file Scheduler.py.

162   def planTaskTimeToRun(self, task):
163     if "DATE" in task.session: #@todo only for test
164       date_string = task.session["DATE"]
165       plannedTime = self.getPlannedRunTime(date_string)
166       leftBorderMs = int(math.floor(plannedTime / self.timeSlot)) * self.timeSlot
167       rightBorderMs = leftBorderMs + self.timeSlot
168       try:
169         scheme = SchedulerTaskScheme(SchedulerTask())
170         taskInSlotNumber = len(row_dbi.db.session.query(type(scheme)).filter(type(scheme).rTime <= rightBorderMs).\
171                                filter(type(scheme).rTime >= leftBorderMs).filter_by(state=ScheduledTask.STATE_PLANNED).all())
172         logger.debug("taskInSlotNumber=" + str(taskInSlotNumber))
173 
174         #@todo add more complex condition for planning
175         task.rtime = plannedTime
176         self.modifyTaskInSchedule(task)
177 
178         return GeneralResponse()
179 
180       except DBIErr as err:
181         logger.error(">>> Some DBI error in Scheduler.planTaskTimeToRun [" + str(err.message) + "]")
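
The time-slot borders used above follow from rounding the planned time down to a multiple of the slot length. A minimal sketch of that arithmetic; the function name slot_borders and the sample numbers are illustrative, with a one-minute slot expressed in milliseconds.

```python
import math


def slot_borders(planned_time_ms, time_slot_ms):
    """Return the (left, right) borders of the slot that contains the time."""
    left = int(math.floor(planned_time_ms / time_slot_ms)) * time_slot_ms
    return left, left + time_slot_ms


left_ms, right_ms = slot_borders(125000, 60000)
```

With these numbers the planned time of 125000 ms falls into the slot from 120000 ms to 180000 ms. A time that lies exactly on a border belongs to the slot that starts there.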

◆ reschedulingTasks()

def dtm.Scheduler.Scheduler.reschedulingTasks (   self)

Reschedules the schedule after all changes to it. Currently a stub; the strategy is not implemented yet.

Definition at line 390 of file Scheduler.py.

390   def reschedulingTasks(self):
391     #@todo implement strategy
392     pass

◆ rTimeCalc()

def dtm.Scheduler.Scheduler.rTimeCalc (   self,
  schedulerTask,
  task_strategy,
  replanned 
)

Recalculates the value of the rTime field.

Parameters
schedulerTask: incoming schedulerTask
task_strategy: incoming strategy dict
replanned: True when the task is replanned after a run was not possible (the RDELAY strategy value is added), False on initial planning (the SDELAY strategy value is added)
Returns
the new rTime in milliseconds since the epoch

Definition at line 316 of file Scheduler.py.

316   def rTimeCalc(self, schedulerTask, task_strategy, replanned):
317     logger.debug("Old rTime=" + str(datetime.datetime.utcfromtimestamp(schedulerTask.rTime / 1000)))
318 
319     # Get NOW time
320     ret = self.getTimeSinceEpoch()
321     #logger.debug("Now time=" + str(datetime.datetime.utcfromtimestamp(ret / 1000)))
322     if replanned:
323       if Task.STRATEGY_RDELAY in task_strategy and int(task_strategy[Task.STRATEGY_RDELAY]) > 0:
324         ret += int(task_strategy[Task.STRATEGY_RDELAY])
325         logger.debug("New (RDELAY) rTime=" + str(datetime.datetime.utcfromtimestamp(ret / 1000)))
326     else:
327       if Task.STRATEGY_SDELAY in task_strategy and int(task_strategy[Task.STRATEGY_SDELAY]) > 0:
328         ret += int(task_strategy[Task.STRATEGY_SDELAY])
329         logger.debug("New (SDELAY) rTime=" + str(datetime.datetime.utcfromtimestamp(ret / 1000)))
330 
331     '''
332     if Task.STRATEGY_RDELAY in task_strategy and int(task_strategy[Task.STRATEGY_RDELAY]) > 0:
333       ret += int(task_strategy[Task.STRATEGY_RDELAY])
334       logger.debug("New rTime=" + str(datetime.datetime.utcfromtimestamp(ret / 1000)))
335     '''
336 
337     return ret
◆ sendOperationProcessingError()

def dtm.Scheduler.Scheduler.sendOperationProcessingError (   self,
  event 
)

Sends an operation processing error (OPERATION_ERR) in reply to the event.

Parameters
event: instance of the Event object

Definition at line 372 of file Scheduler.py.

372   def sendOperationProcessingError(self, event):
373     response = GeneralResponse(self.OPERATION_ERR, self.OPERATION_ERR_MSG)
374     responseEvent = self.eventBuilder.build(EVENT_TYPES.SCHEDULE_TASK_RESPONSE , response)
375     self.reply(event, responseEvent)
◆ stateRecalculate()

def dtm.Scheduler.Scheduler.stateRecalculate (   self,
  task_strategy,
  schedulerTask 
)

Recalculates the state field.

Parameters
task_strategy: the task's strategy dict
schedulerTask: task object
Returns
ScheduledTask.STATE_CLOSED if the number of tries has reached the retry limit of the strategy, otherwise the current state

Definition at line 362 of file Scheduler.py.

362   def stateRecalculate(self, task_strategy, schedulerTask):
363     ret = schedulerTask.state
364     if Task.STRATEGY_RETRY in task_strategy and task_strategy[Task.STRATEGY_RETRY] <= schedulerTask.tries:
365       ret = ScheduledTask.STATE_CLOSED
366     return ret
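
The retry rule reduces to one comparison. In the sketch below the state names and the key "RETRY" are placeholders, since the values of the ScheduledTask states and of Task.STRATEGY_RETRY are not shown in this reference.

```python
# Placeholder state values, for illustration only.
STATE_PLANNED = "planned"
STATE_CLOSED = "closed"


def state_recalculate(strategy, state, tries, retry_key="RETRY"):
    """Close the task once its tries reach the retry limit, if one is set."""
    if retry_key in strategy and strategy[retry_key] <= tries:
        return STATE_CLOSED
    return state


still_planned = state_recalculate({"RETRY": 3}, STATE_PLANNED, tries=2)
closed = state_recalculate({"RETRY": 3}, STATE_PLANNED, tries=3)
```

A strategy without a retry limit never closes the task through this rule.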

◆ taskUpdateProcess()

def dtm.Scheduler.Scheduler.taskUpdateProcess (   self)

Starts the task update process: sends an UPDATE_TASK_FIELDS event for the first task in tasksToUpdate, setting its state to TASK_STATE_SCHEDULE_TRIES_EXCEEDED.

Definition at line 303 of file Scheduler.py.

303   def taskUpdateProcess(self):
304     if len(self.tasksToUpdate) > 0:
305       updateTaskFields = UpdateTaskFields(self.tasksToUpdate[0].id)
306       updateTaskFields.fields["state"] = EEResponseData.TASK_STATE_SCHEDULE_TRIES_EXCEEDED
307       event = self.eventBuilder.build(EVENT_TYPES.UPDATE_TASK_FIELDS, updateTaskFields)
308       self.send(self.CLIENT_INTERFACE_SERVICE_CLIENT, event)
309       logger.debug("Task send to update id = " + str(self.tasksToUpdate[0].id))
Member Data Documentation

◆ cfg_section

dtm.Scheduler.Scheduler.cfg_section

Definition at line 76 of file Scheduler.py.

◆ CLIENT_INTERFACE_SERVICE_CLIENT

string dtm.Scheduler.Scheduler.CLIENT_INTERFACE_SERVICE_CLIENT = "clientClientInterfaceService"
static

Definition at line 65 of file Scheduler.py.

◆ dbi

dtm.Scheduler.Scheduler.dbi

Database that contains the schedule table.

Definition at line 119 of file Scheduler.py.

◆ MAX_TASKS

string dtm.Scheduler.Scheduler.MAX_TASKS = "maxTasksPerTimeSlot"
static

Definition at line 67 of file Scheduler.py.

◆ maxTasks

dtm.Scheduler.Scheduler.maxTasks

Definition at line 82 of file Scheduler.py.

◆ OPERATION_ERR

int dtm.Scheduler.Scheduler.OPERATION_ERR = 1024
static

Definition at line 56 of file Scheduler.py.

◆ OPERATION_ERR_MSG

string dtm.Scheduler.Scheduler.OPERATION_ERR_MSG = "Previous task operations is not finished"
static

Definition at line 57 of file Scheduler.py.

◆ RESOURCES_EXCEED_ERR

int dtm.Scheduler.Scheduler.RESOURCES_EXCEED_ERR = 1025
static

Definition at line 59 of file Scheduler.py.

◆ RESOURCES_EXCEED_ERR_MSG

string dtm.Scheduler.Scheduler.RESOURCES_EXCEED_ERR_MSG = "Resources are exceed"
static

Definition at line 60 of file Scheduler.py.

◆ RESOURCES_MANAGER_CLIENT

string dtm.Scheduler.Scheduler.RESOURCES_MANAGER_CLIENT = "clientResourcesManager"
static

Definition at line 64 of file Scheduler.py.

◆ SERVER

string dtm.Scheduler.Scheduler.SERVER = "server"
static

Definition at line 63 of file Scheduler.py.

◆ tasksToUpdate

dtm.Scheduler.Scheduler.tasksToUpdate

Definition at line 77 of file Scheduler.py.

◆ TIME_SLOT_PERIOD

string dtm.Scheduler.Scheduler.TIME_SLOT_PERIOD = "timeSlotPeriod"
static

Definition at line 66 of file Scheduler.py.

◆ timeSlot

dtm.Scheduler.Scheduler.timeSlot

Value of the time slot used when scheduling tasks.

Definition at line 86 of file Scheduler.py.

◆ waitResourcesEvents

dtm.Scheduler.Scheduler.waitResourcesEvents

Definition at line 110 of file Scheduler.py.

◆ waitResourcesTasks

dtm.Scheduler.Scheduler.waitResourcesTasks

Definition at line 115 of file Scheduler.py.


The documentation for this class was generated from the following file: