4 @link: http://hierarchical-cluster-engine.com/ 5 @copyright: Copyright © 2013-2014 IOIX Ukraine 6 @license: http://hierarchical-cluster-engine.com/license/ 12 from collections
import namedtuple
13 from functools
import partial
14 from datetime
import datetime
20 from Constants
import EVENT_TYPES, LOGGER_NAME
21 from TaskBackLogScheme
import TaskBackLogScheme
22 from TaskLog
import TaskLog
23 from TaskLogScheme
import TaskLogScheme
24 from EventObjects
import GeneralResponse, DeleteTask, TaskManagerFields, EEResponseData, FetchTaskData, Task
25 from EventObjects
import DeleteTaskData, DeleteEEResponseData, AvailableTaskIds, GetTasksStatus
36 from sqlalchemy
import text
38 logger = logging.getLogger(LOGGER_NAME)
41 TaskStep = namedtuple(
"TaskStep",
"ok_callback err_callback desc eventNewTask")
44 TaskRecord = namedtuple(
"TaskRecord",
"tasksteps event responseEventType")
49 STEP_SEND_TO_TASKS_DATA_MANAGER = 1
50 STEP_ADD_TO_INTERNAL_STRUCTURES = 2
51 STEP_SEND_TO_SCHEDULER = 3
61 Exception.__init__(self, message)
71 TASKS_DATA_MANAGER_CLIENT =
"clientTasksDataManager" 72 SCHEDULER_CLIENT =
"clientScheduler" 73 CONFIG_TIME_SLOT_PERIOD =
"timeSlotPeriod" 74 AUTO_CLEANUP_TIME_SLOT_PERIOD =
"autoCleanUpSlotPeriod" 76 VAR_TASKS_TOTAL =
"tasks_total" 77 VAR_TASKS_TOTAL_DEL =
"tasks_total_del" 78 VAR_TASKS_TIME_SUM =
"tasks_time_sum" 79 VAR_TASKS_TIME_COUNT =
"tasks_time_count" 80 VAR_TASKS_TIME_AVG =
"tasks_time_avg" 81 VAR_TASKS_TIME_MIN =
"tasks_time_min" 82 VAR_TASKS_TIME_MAX =
"tasks_time_max" 83 VAR_TASKS_ERRORS =
"tasks_errors" 84 VAR_TASKS_RETRIES =
"tasks_retries" 85 VAR_TASKS_RETRIES_DEL =
"tasks_retries_del" 86 VAR_TASKS_DELETE_TRIES =
"tasks_delete_tries" 89 CLEANUP_TABLES_LIST = [
'task_back_log_scheme',
91 'scheduler_task_scheme',
98 def __init__(self, configParser, connectBuilderLight, pollerManager=None):
99 super(TasksManager, self).
__init__(pollerManager)
108 serverConnection = connectBuilderLight.build(consts.SERVER_CONNECT, serverAddr)
109 tasksDataManagerConnection = connectBuilderLight.build(consts.CLIENT_CONNECT, tasksDataManagerAddr)
110 schedulerConnection = connectBuilderLight.build(consts.CLIENT_CONNECT, schedulerAddr)
153 isClearOnStart = configParser.get(self.
cfg_section, DTM_CONSTS.CLEAR_ON_START)
180 if isClearOnStart ==
"True":
183 if hasattr(suspendedTasks,
'__iter__')
and len(suspendedTasks) > 0
and suspendedTasks[0]
is not None:
184 for task
in suspendedTasks:
185 logger.debug(
">>> Start suspend task to delete id=%s", task.id)
186 task.state = EEResponseData.TASK_STATE_FINISHED
187 task.deleteTaskId = 0
188 self.
dbi.update(task,
"id=%s" % str(task.id))
190 for task
in suspendedTasks:
192 delEvent = self.
eventBuilder.build(EVENT_TYPES.DELETE_TASK, deleteObj)
194 except DBIErr
as err:
195 logger.error(
">>> Some DBI error in TasksManager.cleanUpOnStart [" + str(err.message) +
"]")
204 logger.debug(
"Insert pendingTasks[] item " + str(event.uid))
205 logger.debug(
"New Task event " + str(event.eventObj.id))
207 if hasattr(dbiRecords,
'__iter__')
and len(dbiRecords) > 0
and dbiRecords[0]
is not None:
208 logger.debug(
"Task Id already exist")
210 responseEvent.uid = event.uid
211 self.
reply(event, responseEvent)
214 operation_steps[0].ok_callback()
215 except Exception
as err:
216 ExceptionLog.handler(logger, err,
"Exception:")
226 dataManagerEvent = self.
eventBuilder.build(dataManagerEventType, event.eventObj)
227 dataManagerEvent.uid = event.uid
230 desc = STEP_SEND_TO_TASKS_DATA_MANAGER
231 first =
TaskStep(ok_callback, err_callback, desc, event)
235 desc = STEP_ADD_TO_INTERNAL_STRUCTURES
236 second =
TaskStep(ok_callback, err_callback, desc, event)
238 newEvent = self.
eventBuilder.build(EVENT_TYPES.SCHEDULE_TASK, event.eventObj)
239 newEvent.uid = event.uid
243 desc = STEP_SEND_TO_SCHEDULER
244 third =
TaskStep(ok_callback, err_callback, desc, event)
248 desc = STEP_UPDATE_STATE
249 four =
TaskStep(ok_callback, err_callback, desc, event)
254 ret = [first, second, third, four]
264 logger.debug(
"New task data id = " + str(event.eventObj.id) +
" type = " + str(
type(event.eventObj)))
265 newTask = event.eventObj
269 taskLog.state = EEResponseData.TASK_STATE_NEW_DATA_STORED
270 taskLog.cDate = datetime.now()
272 taskLog.name = newTask.name
273 taskLog.type = newTask.type
274 if "time_max" in newTask.session:
275 taskLog.pTimeMax = newTask.session[
"time_max"]
276 if newTask.autoCleanupFields
is not None:
277 if "TTL" in newTask.autoCleanupFields
and newTask.autoCleanupFields[
"TTL"]
is not None:
278 newTask.autoCleanupFields[
"TTL"] = newTask.autoCleanupFields[
"TTL"] + time.time()
279 taskLog.autoCleanupFields = pickle.dumps(newTask.autoCleanupFields)
280 if taskLog.deleteTaskId ==
None:
288 responseEvent.uid = event.uid
292 except DBIErr
as err:
293 logger.error(
">>> Some DBI error in TasksManager.addNewTaskData [" + str(err.message) +
"]")
295 responseEvent.uid = event.uid
305 newTask = event.eventObj
307 taskLog.state = EEResponseData.TASK_STATE_NEW_SCHEDULED
311 responseEvent.uid = event.uid
315 except DBIErr
as err:
316 logger.error(
">>> Some DBI error in TasksManager.finishNewTaskData [" + str(err.message) +
"]")
318 responseEvent.uid = event.uid
339 updateTask = event.eventObj
344 updateTask.files =
None 349 except TaskNoPresentErr
as err:
350 logger.error(err.message)
352 except Exception
as err:
353 ExceptionLog.handler(logger, err,
"Exception:")
355 responseEvent = self.
eventBuilder.build(EVENT_TYPES.UPDATE_TASK_RESPONSE, response)
356 self.
reply(event, responseEvent)
368 except DBIErr
as err:
369 logger.error(
">>> Some DBI error in TasksManager.onDeleteTask [" + str(err.message) +
"]")
371 if hasattr(dbiRecords,
'__iter__')
and len(dbiRecords) > 0
and dbiRecords[0]
is not None:
372 ret = dbiRecords[0].id
383 if event.eventObj.deleteTaskId == DeleteTask.GROUP_DELETE:
386 except DBIErr
as err:
387 logger.error(
">>> Some DBI error in TasksManager.onDeleteTask [" + str(err.message) +
"]")
390 if hasattr(dbiRecords,
'__iter__')
and len(dbiRecords) > 0
and dbiRecords[0]
is not None:
392 for record
in dbiRecords:
394 taskLog.id = record.id
397 delObject = copy.copy(event.eventObj)
398 delObject.deleteTaskId = taskLog.id
399 delObject.id = localDeleteObj.id
400 delEvent = self.
eventBuilder.build(EVENT_TYPES.DELETE_TASK, delObject)
408 responseEvent = self.
eventBuilder.build(EVENT_TYPES.DELETE_TASK_RESPONSE, response)
409 self.
reply(event, responseEvent)
419 deleteTask = event.eventObj
421 taskLog.id = deleteTask.deleteTaskId
424 if len(dbiRecords) > 0:
426 logger.debug(
"Update pendingTasks[] item " + str(event.uid))
428 operation_steps[0].ok_callback()
431 raise DBIErr(dbi_consts.DBI_SUCCESS_CODE + 1,
"Task id=" + str(taskLog.id) +
" is absent in taskBackLog")
433 raise DBIErr(dbi_consts.DBI_SUCCESS_CODE + 2,
"Task id=" + str(taskLog.id) +
" is deleted by other task")
434 except DBIErr
as err:
435 logger.error(
">>> Some DBI error in TasksManager.simpleDeleteTask [" + str(err.message) +
"]")
437 response.statuses.append(err.errCode)
438 except Exception
as err:
439 ExceptionLog.handler(logger, err,
"Exception:")
449 if not (hasattr(suspendedTasks,
'__iter__')
and len(suspendedTasks) > 0
and suspendedTasks[0]
is not None):
451 responseEvent = self.
eventBuilder.build(EVENT_TYPES.DELETE_TASK_RESPONSE, response)
453 logger.debug(
">>> Send Group delete back")
462 logger.debug(
"GetTaskStatus received: " + Utils.varDump(event))
465 getTaskStatus = event.eventObj
467 for taskId
in getTaskStatus.ids:
472 if (hasattr(lookBackTaskLogScheme,
'__iter__')
and len(lookBackTaskLogScheme) > 0):
475 results.append(taskManagerFields)
476 elif type(getTaskStatus.strategy)
is types.DictType
and GetTasksStatus.LOG_STRATEGY
in \
477 getTaskStatus.strategy
and getTaskStatus.strategy[GetTasksStatus.LOG_STRATEGY] == GetTasksStatus.CHECK_LOG_YES:
478 lookTaskLogScheme = self.
dbi.fetch(
TaskLogScheme(TaskLog),
"id=%s" % taskId)
479 if (hasattr(lookTaskLogScheme,
'__iter__')
and len(lookTaskLogScheme) > 0):
480 for foundTask
in lookTaskLogScheme:
483 results.append(taskManagerFields)
484 except DBIErr
as err:
485 logger.error(
">>> Some DBI error in TasksManager.onGetTaskStatus [" + str(err.message) +
"]")
487 responseEvent = self.
eventBuilder.build(EVENT_TYPES.GET_TASK_STATUS_RESPONSE, results)
488 self.
reply(event, responseEvent)
489 except Exception
as err:
490 ExceptionLog.handler(logger, err,
"Exception:")
498 fetchResultsCacheTask = event.eventObj
499 for taskId
in fetchResultsCacheTask.ids:
504 except TaskNoPresentErr
as err:
505 logger.error(err.message)
507 responseEvent = self.
eventBuilder.build(EVENT_TYPES.GET_TASK_STATUS_RESPONSE, response)
508 self.
reply(event, responseEvent)
509 except Exception
as err:
510 ExceptionLog.handler(logger, err,
"Exception:")
518 getTaskManagerFields = event.eventObj
519 taskId = getTaskManagerFields.id
523 if len(lookTaskLogScheme) > 0
and lookTaskLogScheme[0]
is not None:
526 responseEvent = self.
eventBuilder.build(EVENT_TYPES.GET_TASK_FIELDS_RESPONSE, taskManagerFields)
527 self.
reply(event, responseEvent)
528 except DBIErr
as err:
529 logger.error(
">>> Some DBI error in TasksManager.onGetTaskStatus [" + str(err.message) +
"]")
530 except Exception
as err:
531 ExceptionLog.handler(logger, err,
"Exception:")
539 updateTaskFields = event.eventObj
542 updateTaskFields.fields = self.
clearEmptyFields(updateTaskFields.fields, updateTaskFields.id)
546 if taskLog.state == EEResponseData.TASK_STATE_SET_ERROR
or taskLog.state == EEResponseData.TASK_STATE_CRASHED:
548 if taskLog.state == EEResponseData.TASK_STATE_DELETED
or taskLog.state == EEResponseData.TASK_STATE_TERMINATED:
549 logger.debug(
"Delete task is deleted, call cleanUpTask() [%s]", str(taskLog.state))
551 if "deleteTaskId" in updateTaskFields.fields
and updateTaskFields.fields[
"deleteTaskId"] !=
None and \
552 updateTaskFields.fields[
"deleteTaskId"] > 0:
554 taskLogTerm.id = int(updateTaskFields.fields[
"deleteTaskId"])
555 taskLogTerm.state = int(updateTaskFields.fields[
"deleteTaskState"])
556 taskLogBackSchemeTerm = self.
dbi.update(
TaskBackLogScheme(taskLogTerm),
"id=%s" % taskLogTerm.id)
557 if taskLogTerm.state == EEResponseData.TASK_STATE_DELETED
or \
558 taskLogTerm.state == EEResponseData.TASK_STATE_TERMINATED:
559 logger.debug(
"Task to delete is terminated, call cleanUpTask()")
562 logger.debug(
"Task to delete is not terminated, state: " + str(taskLogTerm.state))
564 logger.debug(
"Task to delete is None or Empty")
565 generalResponse.errorCode = DeleteTask.RESPONSE_CODE_DRCE_ERROR
566 generalResponse.errorMessage =
"EEManager returns empty deleteTaskId" 568 elif taskLog.state == EEResponseData.TASK_STATE_SET_ERROR:
569 logger.debug(
"Delete task by state=TASK_STATE_SET_ERROR, id = " + str(updateTaskFields.id))
573 logger.debug(
"State is: " + str(taskLog.state))
575 except DBIErr
as err:
576 logger.error(
">>> Some DBI error in TasksManager.onUpdateTaskField [" + str(err.message) +
"]")
577 generalResponse.errorCode = DeleteTask.RESPONSE_CODE_DBI_ERROR
578 generalResponse.errorMessage = (
"DBIError=%s" % str(err.message))
579 except Exception
as err:
580 ExceptionLog.handler(logger, err,
"Exception:")
581 generalResponse.errorCode = DeleteTask.RESPONSE_CODE_UNKNOWN_ERROR
582 generalResponse.errorMessage = (
"Unknown Error=%s" % str(err.message))
583 responseEvent = self.
eventBuilder.build(EVENT_TYPES.UPDATE_TASK_FIELDS_RESPONSE, generalResponse)
584 self.
reply(event, responseEvent)
592 logger.debug(
"onFetchTaskDataResponse income id = " + str(event.eventObj.id))
596 if event.eventObj !=
None:
597 if event.eventObj.strategy !=
None and Task.STRATEGY_RETRY
in event.eventObj.strategy:
599 logger.debug(
"onFetchTaskDataResponse tempBackTaskLogScheme len = " + str(len(tempBackTaskLogScheme)))
600 if type(tempBackTaskLogScheme) == types.ListType
and len(tempBackTaskLogScheme) > 0
and \
601 event.eventObj.strategy[Task.STRATEGY_RETRY] > (tempBackTaskLogScheme[0].tries + 1):
604 if tempBackTaskLogScheme[0].deleteTaskId != 0:
608 logger.debug(
"onFetchTaskDataResponse cleanup income id = " + str(event.eventObj.id))
610 tempBackTaskLogScheme.id = event.eventObj.id
614 logger.debug(
"onFetchTaskDataResponse rescheduler income id = " + str(event.eventObj.id))
618 operation_steps[0].ok_callback()
619 except DBIErr
as err:
620 logger.error(
">>> Some DBI error in TasksManager.onFetchTaskDataResponse [" + str(err.message) +
"]")
627 logger.debug(
"restoreTaskSteps id = " + str(taskId))
629 new_event = self.
eventBuilder.build(EVENT_TYPES.FETCH_TASK_DATA, fetchTaskData)
638 logger.debug(
">>> Start static recalculationg")
671 if taskBackLogScheme
is not None and len(taskBackLogScheme) > 0:
674 taskBackLogScheme[0].fDate = datetime.now()
676 if taskLogScheme.pTime
is not None:
679 customQuery =
"select count(*) from %s where id = '%s'" \
680 % (str(taskLogScheme.__tablename__), str(taskLogScheme.id))
681 logger.debug(
"!!! customQuery: %s", str(customQuery))
682 customResponse = self.
dbi.sqlCustom(customQuery)
683 logger.debug(
"!!! customResponse: %s",
varDump(customResponse))
684 if len(customResponse) > 0
and len(customResponse[0]) > 0
and int(customResponse[0][0]) > 0:
685 logger.debug(
"!!! taskId = %s already exist. Try delete from table.", str(taskLogScheme.id))
686 self.
dbi.delete(taskLogScheme,
"id=%s" % taskLogScheme.id)
688 self.
dbi.insert(taskLogScheme)
689 except DBIErr
as err:
690 logger.error(
">>> Some DBI error in TasksManager.cleanUpTask moving backlog/log " +
691 "operation [" + str(err.message) +
"]")
692 self.
dbi.delete(taskBackLogScheme[0],
"id=%s" % taskLogScheme.id)
694 logger.debug(
"Delete from tasksQueue[] item " + str(taskLogScheme.id))
698 except TaskNoPresentErr
as err:
699 logger.error(err.message)
700 except DBIErr
as err:
701 logger.error(
">>> Some DBI error in TasksManager.cleanUpTask [" + str(err.message) +
"]")
703 logger.error(
"The input taskBackLogScheme is None or empty: " + Utils.varDump(taskBackLogScheme))
716 new_event = self.
eventBuilder.build(EVENT_TYPES.DELETE_TASK_DATA, deleteTaskData)
719 new_event = self.
eventBuilder.build(EVENT_TYPES.DELETE_EE_DATA, deleteEEResponseData)
722 new_event = self.
eventBuilder.build(EVENT_TYPES.DELETE_TASK, deleteTask)
732 if hasattr(tempBackTaskLogScheme,
'__iter__')
and len(tempBackTaskLogScheme) > 0:
733 tempBackTaskLogScheme[0].tries += incr
734 tempBackTaskLogScheme[0].state = newState
735 self.
dbi.update(tempBackTaskLogScheme[0],
"id=%s" % localId)
737 logger.error(
"Error can't fetch record from TaskBackLog with task id = " + str(localId))
745 logger.debug(
"onTasksManagerGeneralResponse, event:" + str(event.uid) +
"\n" + Utils.varDump(event))
747 except Exception
as err:
748 ExceptionLog.handler(logger, err,
"Exception:")
755 generalResponse = event.eventObj
757 if generalResponse.errorCode == GeneralResponse.ERROR_OK:
760 logger.debug(
"Update pendingTasks[] item " + str(event.uid))
762 logger.debug(
"Insert pendingTasks[] item " + str(event.uid))
769 logger.debug(
"Delete 1 from pendingTasks[] item " + str(event.uid))
772 if self.
pendingTasks[event.uid].tasksteps[0].err_callback:
773 self.
pendingTasks[event.uid].tasksteps[0].err_callback()
775 logger.debug(
"Delete 2 from pendingTasks[] item " + str(event.uid))
778 logger.debug(
"processOperationStepr: pending event " + str(event.uid) +
" " + str(self.
pendingTasks))
787 if self.
pendingTasks[event.uid].responseEventType !=
None:
788 generalResponse = event.eventObj
800 replyEvent = self.
eventBuilder.build(EVENT_TYPES.FETCH_TASK_RESULTS_RESPONSE, event.eventObj)
802 logger.debug(
"Delete from fetchEvents[] item " + str(event.uid))
805 logger.error(
"onFetchResultResponse no such event.uid" + str(event.uid))
806 except Exception
as err:
807 ExceptionLog.handler(logger, err,
"Exception:")
811 def checkDBIState(self): 812 if self.dbi.getErrorCode() != dbi_consts.DBI_SUCCESS_CODE: 813 logger.debug("DBI error:" + str(self.dbi.getErrorCode()) + " : " + self.dbi.getErrorMsg()) 814 raise DBIErr(self.dbi.getErrorCode(), self.dbi.getErrorMsg()) 823 taskLog.id = taskObj.id
824 if "deleteTaskId" in taskObj.__dict__:
825 taskLog.deleteTaskId = taskObj.deleteTaskId
834 deleteEvent = self.
eventBuilder.build(EVENT_TYPES.DELETE_TASK_DATA, deleteTaskData)
835 logger.debug(
"Delete from tasksQueue[] item " + str(event.eventObj.id))
845 raise TaskNoPresentErr(
"The task is not present in tasksManager id=" + str(taskId))
854 attributes = [attr
for attr
in dir(taskLog)
if not attr.startswith(
'__')
and not attr.startswith(
'_')]
855 for attr
in attributes:
856 taskManagerFields.fields[attr] = getattr(taskLog, attr,
None)
857 return taskManagerFields
866 for attr
in [attr
for attr
in dir(taskLog)
if not attr.startswith(
'__')
and not attr.startswith(
'_')]:
868 setattr(taskLog, attr, fields[attr])
880 if lookBackTaskLogScheme !=
None and len(lookBackTaskLogScheme) > 0
and lookBackTaskLogScheme[0] !=
None:
881 if DTM_CONSTS.DRCE_FIELDS.HOST
in dir(lookBackTaskLogScheme[0])
and \
882 getattr(lookBackTaskLogScheme[0], DTM_CONSTS.DRCE_FIELDS.HOST) !=
None and \
883 getattr(lookBackTaskLogScheme[0], DTM_CONSTS.DRCE_FIELDS.HOST) !=
"":
884 ret[DTM_CONSTS.DRCE_FIELDS.HOST] =
None 885 logger.debug(str(DTM_CONSTS.DRCE_FIELDS.HOST) +
" is not empty, set to None")
886 if DTM_CONSTS.DRCE_FIELDS.PORT
in dir(lookBackTaskLogScheme[0])
and \
887 getattr(lookBackTaskLogScheme[0], DTM_CONSTS.DRCE_FIELDS.PORT) !=
None and \
888 getattr(lookBackTaskLogScheme[0], DTM_CONSTS.DRCE_FIELDS.PORT) !=
"":
889 ret[DTM_CONSTS.DRCE_FIELDS.PORT] =
None 890 logger.debug(str(DTM_CONSTS.DRCE_FIELDS.PORT) +
" is not empty, set to None")
893 if (ret[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_IN_PROGRESS
and\
894 getattr(lookBackTaskLogScheme[0], DTM_CONSTS.DRCE_FIELDS.STATE) == EEResponseData.TASK_STATE_FINISHED)
or\
895 (ret[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_NEW
and\
896 getattr(lookBackTaskLogScheme[0], DTM_CONSTS.DRCE_FIELDS.STATE) == EEResponseData.TASK_STATE_IN_PROGRESS)
or\
897 (ret[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_FINISHED
and\
898 getattr(lookBackTaskLogScheme[0], DTM_CONSTS.DRCE_FIELDS.STATE) == \
899 EEResponseData.TASK_STATE_SCHEDULED_TO_DELETE)
or\
900 (ret[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_IN_PROGRESS
and\
901 getattr(lookBackTaskLogScheme[0], DTM_CONSTS.DRCE_FIELDS.STATE) == \
902 EEResponseData.TASK_STATE_SCHEDULED_TO_DELETE)
or\
903 (ret[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_NEW
and\
904 getattr(lookBackTaskLogScheme[0], DTM_CONSTS.DRCE_FIELDS.STATE) == EEResponseData.TASK_STATE_FINISHED):
907 logger.debug(
"Aborted update with wrong state value for task " + str(taskId) +
"\nfields to update: " + \
908 str(fields) +
"\nfields in db: " + Utils.varDump(lookBackTaskLogScheme[0].
_getTaskLog()))
909 except DBIErr
as err:
910 logger.error(
">>> Some DBI error in TasksManager.clearEmptyFields [" + str(err.message) +
"]")
917 logger.debug(
"Event: " + str(event))
924 logger.debug(
"Event: " + str(event))
934 logger.debug(
"Event: " + str(event))
946 SELECT_TEMPLATE_STR =
"SELECT %s from %s%s" 947 if event.eventObj.criterions
is not None:
950 if event.eventObj.fetchAdditionalFields:
952 clause = (SELECT_TEMPLATE_STR % (
'*', event.eventObj.tableName, additionWhere))
954 clause = (SELECT_TEMPLATE_STR % (
'`id`', event.eventObj.tableName, additionWhere))
956 logger.debug(
">>> Get tasks SQL == " + clause)
960 for record
in results:
961 if len(resultList) >= event.eventObj.fetchNum:
963 resultList.append(record.id)
964 if event.eventObj.fetchAdditionalFields:
965 tasksList.append(record._getTaskLog())
967 logger.debug(
"No tasks selected for auto check state")
968 except DBIErr
as err:
969 logger.error(
">>> Some DBI error in TasksManager.onFetchAvailableTasks [" + str(err.message) +
"]")
970 except Exception
as err:
971 ExceptionLog.handler(logger, err,
"Exception:")
973 responseEvent = self.
eventBuilder.build(EVENT_TYPES.AVAILABLE_TASK_IDS_RESPONSE, res)
974 self.
reply(event, responseEvent)
982 logger.debug(
">>> Start cleanup taskId=" + str(taskData.id))
986 if "DeleteType" in autoCleanupFields
and autoCleanupFields[
"DeleteType"]
is not None:
987 deleteObj.action = autoCleanupFields[
"DeleteType"]
988 if "DeleteRetries" in autoCleanupFields
and autoCleanupFields[
"DeleteRetries"]
is not None:
989 deleteObj.strategy[
"RETRY"] = autoCleanupFields[
"DeleteRetries"]
990 delEvent = self.
eventBuilder.build(EVENT_TYPES.DELETE_TASK, deleteObj)
992 logger.debug(
">>> Clear autocleanup field taskId=" + str(taskData.id))
993 taskData.autoCleanupFields =
None 994 self.
dbi.update(taskData,
"id=%s" % str(taskData.id))
995 except TaskNoPresentErr
as err:
996 logger.error(err.message)
997 except DBIErr
as err:
998 logger.error(
">>> Some DBI error in TasksManager.deleteOnCleanUp [" + str(err.message) +
"]")
1004 logger.debug(
">>> Start cleanup method id=" + str(taskId))
1007 if hasattr(suspendedTasks,
'__iter__'):
1008 for taskData
in suspendedTasks:
1009 if taskData
is not None and taskData.autoCleanupFields
is not None and str(taskData.autoCleanupFields) !=
"":
1010 autoCleanupFields = pickle.loads(str(taskData.autoCleanupFields))
1011 if type(autoCleanupFields) == types.DictType:
1012 if taskData.id == taskId
and "State" in autoCleanupFields
and autoCleanupFields[
"State"]
is not None and \
1013 autoCleanupFields[
"State"] == taskData.state:
1016 if "TTL" in autoCleanupFields
and autoCleanupFields[
"TTL"]
is not None and \
1017 autoCleanupFields[
"TTL"] < time.time():
1020 logger.debug(
">>> End cleanup method [Bad kvdb fetching]")
1021 except DBIErr
as err:
1022 logger.error(
">>> Some DBI error in TasksManager.checkCleanUp [" + str(err.message) +
"]")
1028 localTimestamp = time.time()
1036 logger.debug(
"Cleanup tables started")
1039 customQuery =
"DELETE FROM %s WHERE 1" % tableName
1041 self.
dbi.session.execute(
text(customQuery))
1042 self.
dbi.session.commit()
1044 logger.debug(
"Cleanup of '%s' successfully finished", tableName)
1046 except sqlalchemy.exc.SQLAlchemyError, err:
1047 self.
dbi.session.rollback()
1048 logger.error(
"Cleanup of '%s' failed. %s", tableName, str(err))
1049 except Exception, err:
1050 logger.error(
"Cleanup of '%s' failed. %s", tableName, str(err))
1052 logger.debug(
"Cleanup tables finished")
string VAR_TASKS_TIME_MIN
def reply(self, event, reply_event)
wrapper for sending an event in reply to a received event
def createDBIDict(self, configParser)
string AUTO_CLEANUP_TIME_SLOT_PERIOD
def simpleDeleteTask(self, event, responseEventType)
def createTaskManagerFields(self, taskLog)
string TASKS_DATA_MANAGER_CLIENT
def checkCleanUp(self, taskId=None)
def onDeleteTask(self, event)
def cleanUpTask(self, taskBackLogScheme)
def createOperationSteps(self, event, dataManagerEventType, onlyLastTwo=False)
def restoreTaskSteps(self, taskId)
def updateStatField(self, field_name, value, operation=STAT_FIELDS_OPERATION_ADD)
update the value of a stat field - the default operation is add (sum)
def createTaskLog(self, taskObj)
def cleanUpOnStart(self, isClearOnStart)
def onGetTaskStatus(self, event)
def sendGroupDeleteResponse(self)
def processSchedulerFailure(self, event)
GeneralResponse event object, represents general state response for multipurpose usage.
def createTaskLogFromDic(self, fields)
AvailableTaskIds event object, for returning all available task ids.
def generateCriterionSQL(criterions, additionWhere=None, siteId=None)
string POLL_TIMEOUT_CONFIG_VAR_NAME
def clearEmptyFields(self, fields, taskId)
def getStatDataFields(self, fields)
getStatDataFields returns stat data from storage
def replyGeneralResponse(self, event)
def newTaskRollback(self, event)
def cleanAfterDBIErr(self, event)
def finishNewTaskData(self, event)
def setEventHandler(self, eventType, handler)
set the event handler; overwrites the current handler for eventType
Class describes structures of task item used in TaskManager.
def onTasksManagerGeneralResponse(self, event)
def addConnection(self, name, connection)
def getDeletedTask(self, taskLog)
DeleteTaskData event object, to delete task's data in the storage.
def checkTaskPresence(self, taskId)
DeleteTask event object, to delete task from DTM application and from EE.
string VAR_TASKS_TOTAL_DEL
def onFetchAvailableTasks(self, event)
This is app base class for management server connection end-points and parallel transport messages pr...
int STAT_FIELDS_OPERATION_SET
string VAR_TASKS_DELETE_TRIES
def updateTaskBackLogToSchedulerStep(self, localId, incr, newState)
def onFetchResultsCache(self, event)
def processOperationStep(self, event)
int STAT_FIELDS_OPERATION_ADD
string CONFIG_TIME_SLOT_PERIOD
def onGetTaskFields(self, event)
def send(self, connect_name, event)
send event
def deleteOnCleanUp(self, taskData, autoCleanupFields)
def onDeleteTaskResponse(self, event)
def onFetchResultResponse(self, event)
def cleanUpTaskNetworkOperation(self, taskId, delFromTDMData)
def addNewTaskData(self, event)
DeleteEEResponseData event object, to delete EE response data from the storage.
def onUpdateTask(self, event)
TaskManagerFields event object, for returning task field values.
FetchTaskData event object, to fetch task data from the storage.
def statFieldsRecalculate(self, taskLogScheme)
string VAR_TASKS_TIME_SUM
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
string VAR_TASKS_TIME_MAX
def processTasksDataManagerFailure(self, event)
def __init__(self, configParser, connectBuilderLight, pollerManager=None)
def onUpdateTaskField(self, event)
string VAR_TASKS_RETRIES_DEL
def on_poll_timeout(self)
def onNewTask(self, event)
string VAR_TASKS_TIME_AVG
string VAR_TASKS_TIME_COUNT
def __init__(self, message)
int STAT_FIELDS_OPERATION_INIT
def onFetchTaskDataResponse(self, event)