HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
Scheduler.py
Go to the documentation of this file.
1 '''
2 @package: dtm
3 @author igor
4 @link: http://hierarchical-cluster-engine.com/
5 @copyright: Copyright © 2013-2014 IOIX Ukraine
6 @license: http://hierarchical-cluster-engine.com/license/
7 @since: 0.1
8 '''
9 import time
10 import datetime
11 import logging
12 import math
13 #import pickle
14 try:
15  import cPickle as pickle
16 except ImportError:
17  import pickle
18 from sqlalchemy.exc import SQLAlchemyError
19 
20 from app.BaseServerManager import BaseServerManager
21 from dbi.dbi import DBI
22 from dbi.dbi import DBIErr
23 from dtm.EventObjects import DeleteTask
24 
25 from Constants import EVENT_TYPES
26 from EventObjects import ScheduledTask, GeneralResponse, Task, NewTask, GetScheduledTasksResponse, UpdateTaskFields
27 from EventObjects import EEResponseData
28 from SchedulerTask import SchedulerTask
29 from SchedulerTaskScheme import SchedulerTaskScheme
30 import dbi.dbi as row_dbi
31 import transport.Consts as consts
32 import dtm.Constants as DTM_CONSTS
33 
34 
# Module-level logger, named by DTM_CONSTS.LOGGER_NAME.
logger = logging.getLogger(DTM_CONSTS.LOGGER_NAME)
37 
38 
class LogicErr(Exception):
  """Class is used to inform about a scheduler logic error.

  Carries a numeric errCode next to the message; scheduler handlers forward
  both into a GeneralResponse.
  """
  # Error code the scheduler passes when raising logic errors
  ERR_CODE = 1101

  def __init__(self, errCode, message):
    """
    @param errCode - numeric error code (the scheduler passes LogicErr.ERR_CODE)
    @param message - human readable error description
    """
    super(LogicErr, self).__init__(message)
    self.errCode = errCode
    # Handlers read err.message. BaseException.message is deprecated since Python 2.6
    # and does not exist in Python 3, so store it explicitly.
    self.message = message
47 
48 
49 
50 
53 
# Configuration option names looked up in the scheduler's config section.
# SERVER, RESOURCES_MANAGER_CLIENT and CLIENT_INTERFACE_SERVICE_CLIENT also name the connections.
SERVER = "server"
RESOURCES_MANAGER_CLIENT = "clientResourcesManager"
CLIENT_INTERFACE_SERVICE_CLIENT = "clientClientInterfaceService"
TIME_SLOT_PERIOD = "timeSlotPeriod"
MAX_TASKS = "maxTasksPerTimeSlot"

#@todo move to suitable place
# Operation error codes together with the messages sent alongside them
OPERATION_ERR, OPERATION_ERR_MSG = 1024, "Previous task operations is not finished"
RESOURCES_EXCEED_ERR, RESOURCES_EXCEED_ERR_MSG = 1025, "Resources are exceed"
68 
69 
70 
def __init__(self, configParser, connectBuilderLight, pollerManager=None):
  """Constructor: initialise all connections, event handlers and the schedule DB.

  @param configParser - config parser holding a section named after this class
  @param connectBuilderLight - builder used to create the transport connections
  @param pollerManager - optional poller manager handed to the base class
  """
  super(Scheduler, self).__init__(pollerManager)

  self.cfg_section = self.__class__.__name__
  self.tasksToUpdate = list()

  def option(name):
    return configParser.get(self.cfg_section, name)

  serverAddr = option(self.SERVER)
  resourcesManagerAddr = option(self.RESOURCES_MANAGER_CLIENT)
  clientInterfaceServiceAddr = option(self.CLIENT_INTERFACE_SERVICE_CLIENT)
  self.maxTasks = int(option(self.MAX_TASKS))
  # value of the timeSlot used in scheduling tasks
  self.timeSlot = int(option(self.TIME_SLOT_PERIOD))

  # All three connections are built first, then registered, in this order
  connections = (
    (self.SERVER, connectBuilderLight.build(consts.SERVER_CONNECT, serverAddr)),
    (self.RESOURCES_MANAGER_CLIENT, connectBuilderLight.build(consts.CLIENT_CONNECT, resourcesManagerAddr)),
    (self.CLIENT_INTERFACE_SERVICE_CLIENT,
     connectBuilderLight.build(consts.CLIENT_CONNECT, clientInterfaceServiceAddr, real_connect=False)),
  )
  for connectionName, connection in connections:
    self.addConnection(connectionName, connection)

  handlers = (
    # TasksManager events
    (EVENT_TYPES.SCHEDULE_TASK, self.onNewTask),
    (EVENT_TYPES.UPDATE_TASK, self.onUpdateTask),
    (EVENT_TYPES.DELETE_TASK, self.onDeleteTask),
    (EVENT_TYPES.GET_SCHEDULED_TASKS, self.onGetSheduledTasks),
    (EVENT_TYPES.UPDATE_TASK_FIELDS_RESPONSE, self.onUpdateTaskFieldsResponse),
    # ResourcesManager events
    (EVENT_TYPES.GET_AVG_RESOURCES_RESPONSE, self.onAVGResourcesResponse),
  )
  for eventType, handler in handlers:
    self.setEventHandler(eventType, handler)

  # Pending GET_SCHEDULED_TASKS events waiting for the averaged resources, keyed by event uid
  self.waitResourcesEvents = dict()
  # Task ids with an operation in progress; update/delete requests for them are refused.
  # NOTE(review): nothing in this listing adds entries (addPendingEvent has it commented out).
  self.waitResourcesTasks = dict()

  # db contains schedule table
  self.dbi = DBI(self.createDBIDict(configParser))

  if option(DTM_CONSTS.CLEAR_ON_START) == "True":
    closedTask = SchedulerTask()
    closedTask.state = ScheduledTask.STATE_CLOSED
    try:
      # "id=id" matches every row, so every scheduled task is closed
      self.dbi.update(SchedulerTaskScheme(closedTask), "id=id")
    except DBIErr as err:
      logger.error(">>> Some DBI error in Scheduler.__init__ [" + str(err.message) + "]")
129 
130 
131 
def createDBIDict(self, configParser):
  """Create the DBI configuration dict (option name -> value).

  @param configParser - config parser holding the DTM_CONSTS.DB_CONFIG_SECTION section
  @return dict built from that section's items
  """
  dbSection = DTM_CONSTS.DB_CONFIG_SECTION
  return {optionName: optionValue for optionName, optionValue in configParser.items(dbSection)}
136 
137 
138 
def onNewTask(self, event):
  """onNewTask event handler: add a task to the schedule or update its entry.

  Tasks whose session contains "DATE" are planned by planTaskTimeToRun. Other
  tasks are validated by checkCorrectTaskType and stored by modifyTaskInSchedule.
  A SCHEDULE_TASK_RESPONSE is always sent back.

  @param event - event whose eventObj is the task object
  """
  task = event.eventObj
  response = GeneralResponse()
  try:
    if "DATE" in task.session:
      plannedResponse = self.planTaskTimeToRun(task)
      # planTaskTimeToRun may return None; never reply without a response object
      if plannedResponse is not None:
        response = plannedResponse
      else:
        response = GeneralResponse(LogicErr.ERR_CODE, "Task planning failed")
    else:
      self.checkCorrectTaskType(task)
      self.modifyTaskInSchedule(task)
    self.reschedulingTasks()
  except (DBIErr, LogicErr) as err:
    # LogicErr is caught here too, so the log must not claim it is a DBI error
    logger.error(">>> Some DBI or logic error in Scheduler.onNewTask [" + str(err.message) + "]")
    response = GeneralResponse(err.errCode, err.message)
  finally:
    responseEvent = self.eventBuilder.build(EVENT_TYPES.SCHEDULE_TASK_RESPONSE, response)
    self.reply(event, responseEvent)
158 
159 
160 
#@todo for testing purpose
def planTaskTimeToRun(self, task):
  """Plan a task to run at the time given by task.session["DATE"] (test helper).

  Counts the tasks already planned in the time slot that contains the requested
  time (the count is only logged). It then stores the task with rtime set to the
  requested time.

  @param task - task object; task.session["DATE"] uses the "%Y-%m-%d %H:%M:%S,%f" format
  @return GeneralResponse() on success, GeneralResponse(errCode, message) on a DBI error,
          None when the session has no "DATE"
  """
  if "DATE" not in task.session:  #@todo only for test
    return None

  plannedTime = self.getPlannedRunTime(task.session["DATE"])
  # Borders (ms) of the time slot that contains plannedTime
  leftBorderMs = int(math.floor(plannedTime / self.timeSlot)) * self.timeSlot
  rightBorderMs = leftBorderMs + self.timeSlot
  try:
    # `scheme` was used without a visible definition; build the schedule table scheme here
    scheme = SchedulerTaskScheme(SchedulerTask())
    schemeType = type(scheme)
    taskInSlotNumber = len(row_dbi.db.session.query(schemeType).
                           filter(schemeType.rTime <= rightBorderMs).
                           filter(schemeType.rTime >= leftBorderMs).
                           filter_by(state=ScheduledTask.STATE_PLANNED).all())
    logger.debug("taskInSlotNumber=" + str(taskInSlotNumber))

    #@todo add more complex condition for planning
    task.rtime = plannedTime
    self.modifyTaskInSchedule(task)

    return GeneralResponse()

  except DBIErr as err:
    logger.error(">>> Some DBI error in Scheduler.planTaskTimeToRun [" + str(err.message) + "]")
    # Report the failure instead of returning None (same shape as the other handlers)
    return GeneralResponse(err.errCode, err.message)
182 
183 
184 
def onUpdateTask(self, event):
  """onUpdateTask event handler.

  If the task id is in waitResourcesTasks, the update is refused with an
  operation processing error. Otherwise the event is handled exactly like onNewTask.

  @param event - event whose eventObj is the update task object
  """
  taskId = event.eventObj.id
  if taskId in self.waitResourcesTasks:
    self.sendOperationProcessingError(event)
    return
  self.onNewTask(event)
193 
194 
195 
def onDeleteTask(self, event):
  """onDeleteTask event handler: remove a task from the schedule.

  Replies with SCHEDULE_TASK_RESPONSE. If the task id is in waitResourcesTasks,
  an operation processing error is sent instead.

  @param event - event whose eventObj is the DeleteTask object
  """
  deleteTask = event.eventObj
  if deleteTask.id in self.waitResourcesTasks:
    self.sendOperationProcessingError(event)
    return

  response = GeneralResponse()
  try:
    self.deleteTaskFromSchedule(deleteTask)
    self.reschedulingTasks()
  except DBIErr as err:
    # Name this handler in the log (the message previously said Scheduler.__init__)
    logger.error(">>> Some DBI error in Scheduler.onDeleteTask [" + str(err.message) + "]")
    response = GeneralResponse(err.errCode, err.message)
  finally:
    responseEvent = self.eventBuilder.build(EVENT_TYPES.SCHEDULE_TASK_RESPONSE, response)
    self.reply(event, responseEvent)
213 
214 
215 
def onGetSheduledTasks(self, event):
  """onGetSheduledTasks event handler.

  Parks the request as a pending event and asks the resources manager for the
  current averaged resources. The answer is handled by onAVGResourcesResponse
  under the same uid.

  @param event - GET_SCHEDULED_TASKS event
  """
  self.addPendingEvent(event)
  resourcesRequest = self.eventBuilder.build(EVENT_TYPES.GET_AVG_RESOURCES, None)
  # Reuse the request uid so the response can be matched to the pending event
  resourcesRequest.uid = event.uid
  self.send(self.RESOURCES_MANAGER_CLIENT, resourcesRequest)
225 
226 
227 
def onAVGResourcesResponse(self, event):
  """onAVGResourcesResponse event handler.

  Answers the GET_SCHEDULED_TASKS request parked by onGetSheduledTasks under the
  same uid. It selects planned tasks whose rTime lies before the right border of
  the current time slot, ordered by rTime, priority and tries, and limited to
  maxTasks when maxTasks > 0.

  - Tasks that isPossibleToRun accepts are marked STATE_INPROGRESS and their ids
    are returned.
  - The other tasks get tries incremented and are either queued in tasksToUpdate
    or re-planned.

  The reply is always sent and the pending event always removed, even if
  processing fails part-way.

  @param event - event whose eventObj is the averaged resources state
  """
  resourcesAVG = event.eventObj
  if event.uid not in self.waitResourcesEvents:
    logger.error("get resourceAVG for non exist event " + str(event.uid))
    return

  getScheduledTasks = self.waitResourcesEvents[event.uid].eventObj
  ids = list()
  try:
    #@todo add correct select condition
    scheduledTimeSlot = getScheduledTasks.timeSlot
    # get right border of current time slot
    curTime = self.getTimeSinceEpoch(datetime.datetime.now())
    rightBorderMs = int(math.floor(curTime / scheduledTimeSlot)) * scheduledTimeSlot + scheduledTimeSlot
    clause = "SELECT * FROM scheduler_task_scheme WHERE rTime < " + str(rightBorderMs) + " AND state=" + \
             str(ScheduledTask.STATE_PLANNED) + " ORDER BY rTime ASC, priority DESC, tries DESC"
    if self.maxTasks > 0:
      clause = clause + " LIMIT " + str(self.maxTasks)
    # `scheme` was used without a visible definition; build the schedule table scheme here
    scheme = SchedulerTaskScheme(SchedulerTask())
    taskSchemes = self.dbi.sql(scheme, clause)
    if hasattr(taskSchemes, '__iter__'):
      # Loop-invariant values, used only in debug logging
      rb = datetime.datetime.utcfromtimestamp(rightBorderMs / 1000)
      ct = datetime.datetime.utcfromtimestamp(curTime / 1000)
      for taskScheme in taskSchemes:
        # pylint: disable-msg=W0212
        schedulerTask = taskScheme._getSchedulerTask()
        if schedulerTask.rTime > 0:
          rt = datetime.datetime.utcfromtimestamp(schedulerTask.rTime / 1000)
          logger.debug("Task " + str(schedulerTask.id) + " selected by rTime=" + str(rt) +
                       ", rborder=" + str(rb) + ", now=" + str(ct))
        # NOTE(security): unpickles data read from the scheduler DB; only safe while that DB is trusted
        task_strategy = pickle.loads(str(schedulerTask.strategy))
        if self.isPossibleToRun(resourcesAVG, task_strategy):
          ids.append(schedulerTask.id)
          schedulerTask.state = ScheduledTask.STATE_INPROGRESS
          self.dbi.update(SchedulerTaskScheme(schedulerTask), "id=%s" % str(schedulerTask.id))
        else:
          logger.debug("Run isn't possible id=%s tries=%s", str(schedulerTask.id), str(schedulerTask.tries))
          schedulerTask.tries += 1
          schedulerTask.state = self.stateRecalculate(task_strategy, schedulerTask)
          # NOTE(review): SDELAY is tested but RDELAY is read, which looks like a typo - confirm.
          # The explicit RDELAY membership test avoids the KeyError raised when RDELAY is absent.
          zeroRetryDelay = Task.STRATEGY_SDELAY in task_strategy and \
              Task.STRATEGY_RDELAY in task_strategy and task_strategy[Task.STRATEGY_RDELAY] == 0
          if schedulerTask.state == ScheduledTask.STATE_CLOSED or zeroRetryDelay:
            # Reported later by taskUpdateProcess as "schedule tries exceeded"
            self.tasksToUpdate.append(schedulerTask)
          else:
            schedulerTask.rTime = self.rTimeCalc(schedulerTask, task_strategy, True)
          # Persist the new tries/state (and rTime) of the task that could not run
          self.dbi.update(SchedulerTaskScheme(schedulerTask), "id=%s" % str(schedulerTask.id))
      #@todo maybe need some state for task isn't selected on execution
      if self.maxTasks > 0 and len(taskSchemes) == self.maxTasks:
        logger.debug(">>> Scheduled tasks limit reached LIMIT = " + str(self.maxTasks))
  except SQLAlchemyError as err:
    logger.critical(str(err.message))
    row_dbi.db.session.rollback()  # pylint: disable-all
  except DBIErr as err:
    logger.critical(">>> Some DBI error in Scheduler.onAVGResourcesResponse [" + str(err.message) + "]")
  finally:
    # Always answer with the ids collected so far (they are already marked in progress)
    # and drop the pending event, so an unexpected error cannot leave the requester waiting.
    getScheduledTasksResponse = GetScheduledTasksResponse(ids)
    responseEvent = self.eventBuilder.build(EVENT_TYPES.GET_SCHEDULED_TASKS_RESPONSE, getScheduledTasksResponse)
    self.reply(self.waitResourcesEvents[event.uid], responseEvent)
    self.deletePendingEvent(event)
  self.taskUpdateProcess()
289 
290 
291 
def onUpdateTaskFieldsResponse(self, event):  # pylint: disable=W0613
  """Handler for the UPDATE_TASK_FIELDS_RESPONSE event.

  Removes the head of tasksToUpdate (the task whose update was sent last by
  taskUpdateProcess), then starts the update for the next queued task, if any.

  @param event - response event (not used)
  """
  logger.debug("Task send update response, tasks len = " + str(len(self.tasksToUpdate)))
  if self.tasksToUpdate:
    self.tasksToUpdate.pop(0)
  self.taskUpdateProcess()
298 
299 
300 
def taskUpdateProcess(self):
  """Start the task-fields update for the first queued task, if any.

  Sends UPDATE_TASK_FIELDS with state TASK_STATE_SCHEDULE_TRIES_EXCEEDED for
  tasksToUpdate[0] to the client interface service. The entry stays queued until
  onUpdateTaskFieldsResponse removes it.
  """
  if not self.tasksToUpdate:
    return
  updateTaskFields = UpdateTaskFields(self.tasksToUpdate[0].id)
  updateTaskFields.fields["state"] = EEResponseData.TASK_STATE_SCHEDULE_TRIES_EXCEEDED
  updateEvent = self.eventBuilder.build(EVENT_TYPES.UPDATE_TASK_FIELDS, updateTaskFields)
  self.send(self.CLIENT_INTERFACE_SERVICE_CLIENT, updateEvent)
  logger.debug("Task send to update id = " + str(self.tasksToUpdate[0].id))
310 
311 
312 
def rTimeCalc(self, schedulerTask, task_strategy, replanned):
  """Recalculate the run time (rTime) of a task, in ms since epoch.

  The result is "now" (getTimeSinceEpoch) plus a delay from the task strategy:
  STRATEGY_RDELAY when the task is being re-planned, STRATEGY_SDELAY otherwise.
  A missing or non-positive delay leaves the result at "now". The delay is added
  to a millisecond value, so it is presumably in ms - confirm.

  @param schedulerTask - scheduler task; only its current rTime is read (for logging)
  @param task_strategy - task strategy mapping
  @param replanned - True when re-planning a task that could not run
  @return new rTime in milliseconds
  """
  logger.debug("Old rTime=" + str(datetime.datetime.utcfromtimestamp(schedulerTask.rTime / 1000)))

  # Get NOW time
  ret = self.getTimeSinceEpoch()
  if replanned:
    if Task.STRATEGY_RDELAY in task_strategy and int(task_strategy[Task.STRATEGY_RDELAY]) > 0:
      ret += int(task_strategy[Task.STRATEGY_RDELAY])
      logger.debug("New (RDELAY) rTime=" + str(datetime.datetime.utcfromtimestamp(ret / 1000)))
  elif Task.STRATEGY_SDELAY in task_strategy and int(task_strategy[Task.STRATEGY_SDELAY]) > 0:
    ret += int(task_strategy[Task.STRATEGY_SDELAY])
    logger.debug("New (SDELAY) rTime=" + str(datetime.datetime.utcfromtimestamp(ret / 1000)))

  return ret
338 
339 
340 
def addPendingEvent(self, event):
  """Add a pending event to the auxiliary structures.

  Only waitResourcesEvents (keyed by event.uid) is updated; waitResourcesTasks
  is deliberately left untouched.

  @param event - event waiting for the averaged resources answer
  """
  pendingEvents = self.waitResourcesEvents
  pendingEvents[event.uid] = event
347 
348 
349 
def deletePendingEvent(self, event):
  """Delete a pending event from the auxiliary structures.

  Removes event.uid from waitResourcesEvents; raises KeyError if it is not there.

  @param event - event whose uid was registered by addPendingEvent
  """
  self.waitResourcesEvents.pop(event.uid)
356 
357 
358 
def stateRecalculate(self, task_strategy, schedulerTask):
  """Recalculate the state field of a scheduler task.

  @param task_strategy - task strategy mapping
  @param schedulerTask - scheduler task with its current state and tries
  @return ScheduledTask.STATE_CLOSED once tries reach STRATEGY_RETRY (when set),
          otherwise the task's current state
  """
  retriesExhausted = Task.STRATEGY_RETRY in task_strategy and \
      task_strategy[Task.STRATEGY_RETRY] <= schedulerTask.tries
  return ScheduledTask.STATE_CLOSED if retriesExhausted else schedulerTask.state
367 
368 
369 
def sendOperationProcessingError(self, event):
  """Send an operation processing error in reply to event.

  Used when a previous operation on the task has not finished. It replies with a
  SCHEDULE_TASK_RESPONSE carrying OPERATION_ERR / OPERATION_ERR_MSG.

  @param event - the refused request event
  """
  response = GeneralResponse(self.OPERATION_ERR, self.OPERATION_ERR_MSG)
  responseEvent = self.eventBuilder.build(EVENT_TYPES.SCHEDULE_TASK_RESPONSE, response)
  self.reply(event, responseEvent)
376 
377 
378 
def deleteTaskFromSchedule(self, deleteTask, schedulerTask=None):
  """Delete a task from the schedule.

  @param deleteTask - object whose id selects the schedule row to delete
  @param schedulerTask - optional SchedulerTask used to build the scheme; when omitted,
                         a new one carrying deleteTask.id is created
  """
  # Identity test: None is a singleton and must not go through __eq__
  if schedulerTask is None:
    schedulerTask = SchedulerTask()
    schedulerTask.id = deleteTask.id
  self.dbi.delete(SchedulerTaskScheme(schedulerTask), "id=%s" % deleteTask.id)
386 
387 
388 
def reschedulingTasks(self):
  """Rescheduling hook called after changes to the schedule.

  Not implemented yet: does nothing and returns None.
  """
  #@todo implement strategy
  return None
393 
394 
395 
def modifyTaskInSchedule(self, task):
  """Add a task to the schedule, or update its existing schedule entry.

  NewTask and DeleteTask objects are inserted; any other task updates the row
  with the same id. The entry is stored with:
  - state STATE_PLANNED;
  - the pickled strategy;
  - rTime taken from task.rtime when that attribute exists, otherwise from rTimeCalc;
  - priority from STRATEGY_PRIORITY, defaulting to 0.

  @param task - task object with id and strategy attributes
  """
  strategy = task.strategy
  schedulerTask = SchedulerTask()
  schedulerTask.id = task.id
  schedulerTask.state = ScheduledTask.STATE_PLANNED
  schedulerTask.strategy = pickle.dumps(strategy)

  # In this file only planTaskTimeToRun sets task.rtime (@todo only for tests)
  if hasattr(task, "rtime"):
    schedulerTask.rTime = task.rtime
  else:
    schedulerTask.rTime = self.rTimeCalc(schedulerTask, strategy, False)

  schedulerTask.priority = strategy[Task.STRATEGY_PRIORITY] if Task.STRATEGY_PRIORITY in strategy else 0

  scheme = SchedulerTaskScheme(schedulerTask)
  if isinstance(task, (NewTask, DeleteTask)):
    self.dbi.insert(scheme)
  else:
    self.dbi.update(scheme, "id = %s" % schedulerTask.id)
417 
418 
419 
def checkCorrectTaskType(self, task):
  """Check that the requested operation matches the current schedule state.

  NewTask/DeleteTask objects must not be in the schedule yet. Any other task
  object must already be scheduled.

  @param task - task object with an id
  @raise LogicErr when the operation does not match the schedule state
  """
  scheduledRows = self.dbi.fetch(SchedulerTaskScheme(SchedulerTask()), "id = %s" % task.id)
  isInSchedule = len(scheduledRows) > 0
  addsTask = isinstance(task, (NewTask, DeleteTask))
  if addsTask and isInSchedule:
    raise LogicErr(LogicErr.ERR_CODE, "Task is already in schedule")
  if not addsTask and not isInSchedule:
    raise LogicErr(LogicErr.ERR_CODE, "Task is wrong type:" + str(type(task)))
431 
432 
433 
def isPossibleToRun(self, resourcesAVG, task_strategy):
  """Check whether a task can run by comparing its strategy limits with actual resources.

  The optional strategy keys are checked in this order: "CPU" (threads per CPU
  core), "CPU_LOAD_MAX", "IO_WAIT_MAX" and "RAM_FREE". The first failing limit is
  logged at debug level and False is returned. Absent keys impose no limit.

  @param resourcesAVG - averaged resources (cpuCores, threads, cpu, io, ramR, ramRU)
  @param task_strategy - task strategy mapping
  @return True when no limit is violated, otherwise False
  """
  if "CPU" in task_strategy and resourcesAVG.cpuCores > 0:
    threadsPerCore = resourcesAVG.threads / resourcesAVG.cpuCores
    if threadsPerCore <= task_strategy["CPU"]:
      logger.debug("CPU limit %s %s", str(threadsPerCore), str(task_strategy["CPU"]))
      return False

  if "CPU_LOAD_MAX" in task_strategy and resourcesAVG.cpu >= task_strategy["CPU_LOAD_MAX"]:
    logger.debug("CPU_LOAD_MAX limit %s %s", str(resourcesAVG.cpu), str(task_strategy["CPU_LOAD_MAX"]))
    return False

  if "IO_WAIT_MAX" in task_strategy and resourcesAVG.io > task_strategy["IO_WAIT_MAX"]:
    logger.debug("IO_WAIT_MAX limit %s %s", str(resourcesAVG.io), str(task_strategy["IO_WAIT_MAX"]))
    return False

  # The RAM check only applies when both RAM figures are reported (> 0)
  if "RAM_FREE" in task_strategy and resourcesAVG.ramR > 0 and resourcesAVG.ramRU > 0:
    freeRam = resourcesAVG.ramR - resourcesAVG.ramRU
    if freeRam < task_strategy["RAM_FREE"]:
      logger.debug("RAM_FREE limit %s < %s", str(freeRam), str(task_strategy["RAM_FREE"]))
      return False

  return True
465 
466 
467 
def getTimeSinceEpoch(self, date=None):
  """Get time since epoch in milliseconds.

  @param date - naive datetime to convert; defaults to datetime.datetime.now()
  @return int milliseconds between 1970-01-01 00:00:00 and date (fractional ms truncated)
  """
  # NOTE(review): now() is local time, while the epoch comes from utcfromtimestamp(0),
  # so the default result is shifted by the host's UTC offset unless the host runs in UTC.
  moment = datetime.datetime.now() if date is None else date
  delta = moment - datetime.datetime.utcfromtimestamp(0)  #@todo move to init
  wholeSeconds = delta.days * 86400 + delta.seconds
  return int(wholeSeconds * 1000 + delta.microseconds / 1000.0)
476 
477 
478 
def getPlannedRunTime(self, date_string):
  """Get planned run time.

  @param date_string - time in "%Y-%m-%d %H:%M:%S,%f" format, e.g. "2014-05-01 12:00:00,000"
  @return the parsed time converted by getTimeSinceEpoch (milliseconds)
  """
  return self.getTimeSinceEpoch(datetime.datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S,%f"))
485 
def reply(self, event, reply_event)
wrapper for sending event in reply for event
UpdateTaskFields event object, for update task fields operation.
def onNewTask(self, event)
onNewTask event handler
Definition: Scheduler.py:141
string CLIENT_INTERFACE_SERVICE_CLIENT
Definition: Scheduler.py:65
def sendOperationProcessingError(self, event)
send operation processing error
Definition: Scheduler.py:372
def onDeleteTask(self, event)
onDeleteTask event handler
Definition: Scheduler.py:198
dbi
db contains schedule table
Definition: Scheduler.py:119
def taskUpdateProcess(self)
method starts the task-fields update process
Definition: Scheduler.py:303
def addPendingEvent(self, event)
add pending event in all auxiliary structures
Definition: Scheduler.py:343
def getTimeSinceEpoch(self, date=None)
get time since epoch in millisec
Definition: Scheduler.py:470
def __init__(self, configParser, connectBuilderLight, pollerManager=None)
constructor initialise all connections and event handlers
Definition: Scheduler.py:73
GeneralResponse event object, represents general state response for multipurpose usage.
timeSlot
value of the timeSlot used in scheduling tasks
Definition: Scheduler.py:86
string RESOURCES_MANAGER_CLIENT
Definition: Scheduler.py:64
Class is used to inform about logic error.
Definition: Scheduler.py:41
def setEventHandler(self, eventType, handler)
set event handler rewrite the current handler for eventType
Class describes structures of task item used in Scheduler.
def addConnection(self, name, connection)
This is app base class for management server connection end-points and parallel transport messages pr...
def deleteTaskFromSchedule(self, deleteTask, schedulerTask=None)
delete task from schedule
Definition: Scheduler.py:381
Definition: dbi.py:1
def modifyTaskInSchedule(self, task)
add task in schedule
Definition: Scheduler.py:398
def __init__(self, errCode, message)
Definition: Scheduler.py:44
def onUpdateTaskFieldsResponse(self, event)
method handler for UpdateTaskFieldsResponse event
Definition: Scheduler.py:293
def send(self, connect_name, event)
send event
def onAVGResourcesResponse(self, event)
onAVGResourcesResponse event handler
Definition: Scheduler.py:230
def checkCorrectTaskType(self, task)
check that the logic of the required operation corresponds with the schedule state, to prevent a wrong operation (update ...
Definition: Scheduler.py:423
def rTimeCalc(self, schedulerTask, task_strategy, replanned)
method recalculates new value of rTime field
Definition: Scheduler.py:316
def getPlannedRunTime(self, date_string)
get planned run time
Definition: Scheduler.py:482
def stateRecalculate(self, task_strategy, schedulerTask)
recalculates state field
Definition: Scheduler.py:362
def createDBIDict(self, configParser)
create dict config (dict object)
Definition: Scheduler.py:133
def onGetSheduledTasks(self, event)
onGetSheduledTasks event handler
Definition: Scheduler.py:218
The Scheduler object implements algorithms of tasks scheduling.
Definition: Scheduler.py:52
def reschedulingTasks(self)
rescheduling schedule after all changes in schedule
Definition: Scheduler.py:390
def isPossibleToRun(self, resourcesAVG, task_strategy)
Check whether the task can run by comparing the required limits with the actual resources.
Definition: Scheduler.py:438
def deletePendingEvent(self, event)
delete pending event from all auxiliary structures
Definition: Scheduler.py:352
def planTaskTimeToRun(self, task)
Definition: Scheduler.py:162
def onUpdateTask(self, event)
onUpdateTask event handler
Definition: Scheduler.py:187
GetScheduledTasksResponse event object, to return list of task from the Scheduler.