4 @link: http://hierarchical-cluster-engine.com/ 5 @copyright: Copyright © 2013-2014 IOIX Ukraine 6 @license: http://hierarchical-cluster-engine.com/license/ 15 import cPickle
as pickle
18 from sqlalchemy.exc
import SQLAlchemyError
25 from Constants
import EVENT_TYPES
26 from EventObjects
import ScheduledTask, GeneralResponse, Task, NewTask, GetScheduledTasksResponse, UpdateTaskFields
27 from EventObjects
import EEResponseData
28 from SchedulerTask
import SchedulerTask
29 from SchedulerTaskScheme
import SchedulerTaskScheme
36 logger = logging.getLogger(DTM_CONSTS.LOGGER_NAME)
46 Exception.__init__(self, message)
57 OPERATION_ERR_MSG =
"Previous task operations is not finished" 59 RESOURCES_EXCEED_ERR = 1025
60 RESOURCES_EXCEED_ERR_MSG =
"Resources are exceed" 64 RESOURCES_MANAGER_CLIENT =
"clientResourcesManager" 65 CLIENT_INTERFACE_SERVICE_CLIENT =
"clientClientInterfaceService" 66 TIME_SLOT_PERIOD =
"timeSlotPeriod" 67 MAX_TASKS =
"maxTasksPerTimeSlot" 73 def __init__(self, configParser, connectBuilderLight, pollerManager=None):
74 super(Scheduler, self).
__init__(pollerManager)
88 serverConnection = connectBuilderLight.build(consts.SERVER_CONNECT, serverAddr)
89 resourcesManagerConnection = connectBuilderLight.build(consts.CLIENT_CONNECT, resourcesManagerAddr)
90 clientInterfaceServiceConnection = connectBuilderLight.build(consts.CLIENT_CONNECT, clientInterfaceServiceAddr,
121 isClearOnStart = configParser.get(self.
cfg_section, DTM_CONSTS.CLEAR_ON_START)
122 if isClearOnStart ==
"True":
124 schedulerTask.state = ScheduledTask.STATE_CLOSED
127 except DBIErr
as err:
128 logger.error(
">>> Some DBI error in Scheduler.__init__ [" + str(err.message) +
"]")
135 return dict(configParser.items(DTM_CONSTS.DB_CONFIG_SECTION))
142 task = event.eventObj
145 if "DATE" in task.session:
152 except (DBIErr, LogicErr)
as err:
153 logger.error(
">>> Some DBI error in Scheduler.onNewTask [" + str(err.message) +
"]")
156 responseEvent = self.
eventBuilder.build(EVENT_TYPES.SCHEDULE_TASK_RESPONSE , response)
157 self.
reply(event, responseEvent)
163 if "DATE" in task.session:
164 date_string = task.session[
"DATE"]
167 rightBorderMs = leftBorderMs + self.
timeSlot 170 taskInSlotNumber = len(row_dbi.db.session.query(
type(scheme)).filter(
type(scheme).rTime <= rightBorderMs).\
171 filter(
type(scheme).rTime >= leftBorderMs).filter_by(state=ScheduledTask.STATE_PLANNED).all())
172 logger.debug(
"taskInSlotNumber=" + str(taskInSlotNumber))
175 task.rtime = plannedTime
180 except DBIErr
as err:
181 logger.error(
">>> Some DBI error in Scheduler.planTaskTimeToRun [" + str(err.message) +
"]")
188 updateTask = event.eventObj
199 deleteTask = event.eventObj
205 except DBIErr
as err:
206 logger.error(
">>> Some DBI error in Scheduler.__init__ [" + str(err.message) +
"]")
209 responseEvent = self.
eventBuilder.build(EVENT_TYPES.SCHEDULE_TASK_RESPONSE , response)
210 self.
reply(event, responseEvent)
221 getAVGResourcesEvent = self.
eventBuilder.build(EVENT_TYPES.GET_AVG_RESOURCES,
None)
223 getAVGResourcesEvent.uid = event.uid
232 resourcesAVG = event.eventObj
238 scheduledTimeSlot = getScheduledTasks.timeSlot
241 rightBorderMs = int(math.floor(curTime / scheduledTimeSlot)) * scheduledTimeSlot + scheduledTimeSlot
242 clause =
"SELECT * FROM scheduler_task_scheme WHERE rTime < " + str(rightBorderMs) +
" AND state=" + \
243 str(ScheduledTask.STATE_PLANNED) +
" ORDER BY rTime ASC, priority DESC, tries DESC" 245 clause = clause +
" LIMIT " + str(self.
maxTasks)
247 taskSchemes = self.
dbi.
sql(scheme, clause)
248 if hasattr(taskSchemes,
'__iter__'):
249 for taskScheme
in taskSchemes:
251 schedulerTask = taskScheme._getSchedulerTask()
252 if schedulerTask.rTime > 0:
253 rt = datetime.datetime.utcfromtimestamp(schedulerTask.rTime / 1000)
254 rb = datetime.datetime.utcfromtimestamp(rightBorderMs / 1000)
255 ct = datetime.datetime.utcfromtimestamp(curTime / 1000)
256 logger.debug(
"Task " + str(schedulerTask.id) +
" selected by rTime=" + str(rt) + \
257 ", rborder=" + str(rb) +
", now=" + str(ct))
258 task_strategy = pickle.loads(str(schedulerTask.strategy))
260 ids.append(schedulerTask.id)
261 schedulerTask.state = ScheduledTask.STATE_INPROGRESS
264 logger.debug(
"Run isn't possible id=%s tries=%s", str(schedulerTask.id), str(schedulerTask.tries))
265 schedulerTask.tries += 1
267 if schedulerTask.state == ScheduledTask.STATE_CLOSED
or \
268 (Task.STRATEGY_SDELAY
in task_strategy
and task_strategy[Task.STRATEGY_RDELAY] == 0):
271 schedulerTask.rTime = self.
rTimeCalc(schedulerTask, task_strategy,
True)
275 logger.debug(
">>> Scheduled tasks limit reached LIMIT = " + str(self.
maxTasks))
276 except SQLAlchemyError
as err:
277 logger.critical(str(err.message))
278 row_dbi.db.session.rollback()
279 except DBIErr
as err:
280 logger.critical(
">>> Some DBI error in Scheduler.onAVGResourcesResponse [" + str(err.message) +
"]")
283 responseEvent = self.
eventBuilder.build(EVENT_TYPES.GET_SCHEDULED_TASKS_RESPONSE , getScheduledTasksResponse)
288 logger.error(
"get resourceAVG for non exist event " + str(event.uid))
294 logger.debug(
"Task send update response, tasks len = " + str(len(self.
tasksToUpdate)))
306 updateTaskFields.fields[
"state"] = EEResponseData.TASK_STATE_SCHEDULE_TRIES_EXCEEDED
307 event = self.
eventBuilder.build(EVENT_TYPES.UPDATE_TASK_FIELDS, updateTaskFields)
309 logger.debug(
"Task send to update id = " + str(self.
tasksToUpdate[0].id))
316 def rTimeCalc(self, schedulerTask, task_strategy, replanned):
317 logger.debug(
"Old rTime=" + str(datetime.datetime.utcfromtimestamp(schedulerTask.rTime / 1000)))
323 if Task.STRATEGY_RDELAY
in task_strategy
and int(task_strategy[Task.STRATEGY_RDELAY]) > 0:
324 ret += int(task_strategy[Task.STRATEGY_RDELAY])
325 logger.debug(
"New (RDELAY) rTime=" + str(datetime.datetime.utcfromtimestamp(ret / 1000)))
327 if Task.STRATEGY_SDELAY
in task_strategy
and int(task_strategy[Task.STRATEGY_SDELAY]) > 0:
328 ret += int(task_strategy[Task.STRATEGY_SDELAY])
329 logger.debug(
"New (SDELAY) rTime=" + str(datetime.datetime.utcfromtimestamp(ret / 1000)))
332 if Task.STRATEGY_RDELAY in task_strategy and int(task_strategy[Task.STRATEGY_RDELAY]) > 0: 333 ret += int(task_strategy[Task.STRATEGY_RDELAY]) 334 logger.debug("New rTime=" + str(datetime.datetime.utcfromtimestamp(ret / 1000))) 363 ret = schedulerTask.state
364 if Task.STRATEGY_RETRY
in task_strategy
and task_strategy[Task.STRATEGY_RETRY] <= schedulerTask.tries:
365 ret = ScheduledTask.STATE_CLOSED
374 responseEvent = self.
eventBuilder.build(EVENT_TYPES.SCHEDULE_TASK_RESPONSE , response)
375 self.
reply(event, responseEvent)
382 if schedulerTask ==
None:
384 schedulerTask.id = deleteTask.id
400 schedulerTask.id = task.id
401 schedulerTask.state = ScheduledTask.STATE_PLANNED
402 schedulerTask.strategy = pickle.dumps(task.strategy)
404 if hasattr(task,
"rtime"):
405 schedulerTask.rTime = task.rtime
407 schedulerTask.rTime = self.
rTimeCalc(schedulerTask, task.strategy,
False)
410 if Task.STRATEGY_PRIORITY
in task.strategy:
411 priority = task.strategy[Task.STRATEGY_PRIORITY]
412 schedulerTask.priority = priority
413 if isinstance(task, (NewTask, DeleteTask)):
425 if isinstance(task, (NewTask, DeleteTask)):
427 raise LogicErr(LogicErr.ERR_CODE,
"Task is already in schedule")
430 raise LogicErr(LogicErr.ERR_CODE,
"Task is wrong type:" + str(
type(task)))
440 if ret
and "CPU" in task_strategy
and resourcesAVG.cpuCores > 0:
441 if (resourcesAVG.threads / resourcesAVG.cpuCores) <= task_strategy[
"CPU"]:
442 logger.debug(
"CPU limit %s %s", str(resourcesAVG.threads / resourcesAVG.cpuCores),
443 str(task_strategy[
"CPU"]))
446 if ret
and "CPU_LOAD_MAX" in task_strategy:
447 if resourcesAVG.cpu >= task_strategy[
"CPU_LOAD_MAX"]:
448 logger.debug(
"CPU_LOAD_MAX limit %s %s", str(resourcesAVG.cpu), str(task_strategy[
"CPU_LOAD_MAX"]))
451 if ret
and "IO_WAIT_MAX" in task_strategy:
452 if resourcesAVG.io > task_strategy[
"IO_WAIT_MAX"]:
453 logger.debug(
"IO_WAIT_MAX limit %s %s", str(resourcesAVG.io), str(task_strategy[
"IO_WAIT_MAX"]))
456 if ret
and "RAM_FREE" in task_strategy:
458 if resourcesAVG.ramR > 0
and resourcesAVG.ramRU > 0
and \
459 (resourcesAVG.ramR - resourcesAVG.ramRU) < task_strategy[
"RAM_FREE"]:
460 logger.debug(
"RAM_FREE limit %s < %s", str(resourcesAVG.ramR - resourcesAVG.ramRU),
461 str(task_strategy[
"RAM_FREE"]))
472 date = datetime.datetime.now()
473 epoch = datetime.datetime.utcfromtimestamp(0)
475 return int(((delta.days * 24 * 60 * 60 + delta.seconds) * 1000 + delta.microseconds / 1000.0))
483 planed_time = datetime.datetime.strptime(date_string,
"%Y-%m-%d %H:%M:%S,%f")
def reply(self, event, reply_event)
wrapper for sending event in reply for event
def createDBIDict(self, configParser)
UpdateTaskFields event object, for update task fields operation.
def onNewTask(self, event)
onNewTask event handler
string CLIENT_INTERFACE_SERVICE_CLIENT
def sendOperationProcessingError(self, event)
send operation processing error
def onDeleteTask(self, event)
onDeleteTask event handler
dbi
db contains schedule table
def taskUpdateProcess(self)
method start task delete process
def addPendingEvent(self, event)
add pending event in all auxiliary structures
def getTimeSinceEpoch(self, date=None)
get time since epoch in milliseconds
def __init__(self, configParser, connectBuilderLight, pollerManager=None)
constructor; initialises all connections and event handlers
GeneralResponse event object, represents general state response for multipurpose usage.
timeSlot
value of the timeSlot used in scheduling tasks
string RESOURCES_MANAGER_CLIENT
Class is used to inform about logic error.
def setEventHandler(self, eventType, handler)
set event handler; overwrites the current handler for eventType
Class describes structures of task item used in Scheduler.
def addConnection(self, name, connection)
This is app base class for management server connection end-points and parallel transport messages pr...
def deleteTaskFromSchedule(self, deleteTask, schedulerTask=None)
delete task from schedule
def modifyTaskInSchedule(self, task)
add task in schedule
def __init__(self, errCode, message)
def onUpdateTaskFieldsResponse(self, event)
method handler for DeleteTskResponse event
def send(self, connect_name, event)
send event
def onAVGResourcesResponse(self, event)
onAVGResourcesResponse event handler
def checkCorrectTaskType(self, task)
check that the logic of the required operation corresponds with the schedule state; prevents wrong operation (update ...
def rTimeCalc(self, schedulerTask, task_strategy, replanned)
method recalculates new value of rTime field
def getPlannedRunTime(self, date_string)
get planned run time
def stateRecalculate(self, task_strategy, schedulerTask)
recalculates the state field
def createDBIDict(self, configParser)
create dict config (dict object)
def onGetSheduledTasks(self, event)
onGetSheduledTasks event handler
The Scheduler object implements algorithms of tasks scheduling.
def reschedulingTasks(self)
rescheduling schedule after all changes in schedule
def isPossibleToRun(self, resourcesAVG, task_strategy)
Check whether the task can be run by comparing the required limits with the actual resources.
def deletePendingEvent(self, event)
delete pending event from all auxiliary structures
def planTaskTimeToRun(self, task)
def onUpdateTask(self, event)
onUpdateTask event handler
GetScheduledTasksResponse event object, to return list of task from the Scheduler.