HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
TasksManager.py
Go to the documentation of this file.
1 '''
2 @package: dtm
3 @author igor, bgv
4 @link: http://hierarchical-cluster-engine.com/
5 @copyright: Copyright © 2013-2014 IOIX Ukraine
6 @license: http://hierarchical-cluster-engine.com/license/
7 @since: 0.1
8 '''
9 
10 
11 import logging
12 from collections import namedtuple
13 from functools import partial
14 from datetime import datetime
15 import time
16 import copy
17 from app.BaseServerManager import BaseServerManager
18 from dbi.dbi import DBI
19 from dbi.dbi import DBIErr
20 from Constants import EVENT_TYPES, LOGGER_NAME
21 from TaskBackLogScheme import TaskBackLogScheme
22 from TaskLog import TaskLog
23 from TaskLogScheme import TaskLogScheme
24 from EventObjects import GeneralResponse, DeleteTask, TaskManagerFields, EEResponseData, FetchTaskData, Task
25 from EventObjects import DeleteTaskData, DeleteEEResponseData, AvailableTaskIds, GetTasksStatus
26 import dbi.Constants as dbi_consts
27 import app.SQLCriterions
28 import transport.Consts as consts
29 import dtm.Constants as DTM_CONSTS
30 import app.Utils as Utils # pylint: disable=F0401
31 import pickle
32 import types
33 from app.Utils import ExceptionLog
34 from app.Utils import varDump
35 import sqlalchemy
36 from sqlalchemy import text
37 
38 logger = logging.getLogger(LOGGER_NAME)
39 
40 # tuple describes one step of processing sequence
# ok_callback starts the step; err_callback (may be None) is called when the step is answered
# with an error; desc holds one of the STEP_* identifiers below; eventNewTask is the event the
# step was built for.
41 TaskStep = namedtuple("TaskStep", "ok_callback err_callback desc eventNewTask")
42 
43 # one processing sequence
# tasksteps is the list of steps still to run, event is the originating request and
# responseEventType is the event type of the final reply (None means no reply is sent).
44 TaskRecord = namedtuple("TaskRecord", "tasksteps event responseEventType")
45 
46 
47 
48 # #@todo move these constants to a more suitable place
# Step identifiers stored in TaskStep.desc by createOperationSteps.
49 STEP_SEND_TO_TASKS_DATA_MANAGER = 1
50 STEP_ADD_TO_INTERNAL_STRUCTURES = 2
51 STEP_SEND_TO_SCHEDULER = 3
52 STEP_UPDATE_STATE = 4
53 
54 # #Exception class that signals an operation on a task
55 # which is not present in TaskManager.tasksQueue
56 #
class TaskNoPresentErr(Exception):
    """Signals an operation on a task that is not present in TasksManager.tasksQueue.

    ERR_CODE is the numeric code the event handlers put into the GeneralResponse
    they send back; ``message`` is the text given to the constructor.
    """

    ERR_CODE = 101

    def __init__(self, message):
        """Store the error text.

        @param message human readable description of the missing task
        """
        super(TaskNoPresentErr, self).__init__(message)
        # The handlers in this module read err.message. BaseException only offers
        # that attribute on Python 2 (deprecated there), so it is set explicitly to
        # keep the handlers working on every interpreter version.
        self.message = message
62 
63 
64 
65 # #It is a main object that manages tasks
66 #
68 
69 # Configuration settings options names
# SERVER, TASKS_DATA_MANAGER_CLIENT and SCHEDULER_CLIENT are also used as the connection
# names passed to addConnection()/send() in this class.
70 SERVER = "server"
71 TASKS_DATA_MANAGER_CLIENT = "clientTasksDataManager"
72 SCHEDULER_CLIENT = "clientScheduler"
73 CONFIG_TIME_SLOT_PERIOD = "timeSlotPeriod"
74 AUTO_CLEANUP_TIME_SLOT_PERIOD = "autoCleanUpSlotPeriod"
75 
# Names of the statistic fields; the tasks_time_* ones are maintained by statFieldsRecalculate.
76 VAR_TASKS_TOTAL = "tasks_total"
77 VAR_TASKS_TOTAL_DEL = "tasks_total_del"
78 VAR_TASKS_TIME_SUM = "tasks_time_sum"
79 VAR_TASKS_TIME_COUNT = "tasks_time_count"
80 VAR_TASKS_TIME_AVG = "tasks_time_avg"
81 VAR_TASKS_TIME_MIN = "tasks_time_min"
82 VAR_TASKS_TIME_MAX = "tasks_time_max"
83 VAR_TASKS_ERRORS = "tasks_errors"
84 VAR_TASKS_RETRIES = "tasks_retries"
85 VAR_TASKS_RETRIES_DEL = "tasks_retries_del"
86 VAR_TASKS_DELETE_TRIES = "tasks_delete_tries"
87 
88 # Tables list for cleanup
89 CLEANUP_TABLES_LIST = ['task_back_log_scheme',
90 'tasks_data_table',
91 'scheduler_task_scheme',
92 'ee_responses_table',
93 'resources_table']
94 
95  # #constructor
96  # initialise all connections and event handlers
97  #
98  def __init__(self, configParser, connectBuilderLight, pollerManager=None):
99  super(TasksManager, self).__init__(pollerManager)
100 
101  self.cfg_section = self.__class__.__name__
102 
104  serverAddr = configParser.get(self.cfg_section, self.SERVER)
105  tasksDataManagerAddr = configParser.get(self.cfg_section, self.TASKS_DATA_MANAGER_CLIENT)
106  schedulerAddr = configParser.get(self.cfg_section, self.SCHEDULER_CLIENT)
107 
108  serverConnection = connectBuilderLight.build(consts.SERVER_CONNECT, serverAddr)
109  tasksDataManagerConnection = connectBuilderLight.build(consts.CLIENT_CONNECT, tasksDataManagerAddr)
110  schedulerConnection = connectBuilderLight.build(consts.CLIENT_CONNECT, schedulerAddr)
111 
112  self.addConnection(self.SERVER, serverConnection)
113  self.addConnection(self.TASKS_DATA_MANAGER_CLIENT, tasksDataManagerConnection)
114  self.addConnection(self.SCHEDULER_CLIENT, schedulerConnection)
115 
116  # server events
117  self.setEventHandler(EVENT_TYPES.NEW_TASK, self.onNewTask)
118  self.setEventHandler(EVENT_TYPES.UPDATE_TASK, self.onUpdateTask)
119  self.setEventHandler(EVENT_TYPES.GET_TASK_STATUS, self.onGetTaskStatus)
120  self.setEventHandler(EVENT_TYPES.FETCH_RESULTS_CACHE, self.onFetchResultsCache)
121  self.setEventHandler(EVENT_TYPES.DELETE_TASK, self.onDeleteTask)
122  self.setEventHandler(EVENT_TYPES.GET_TASK_FIELDS, self.onGetTaskFields)
123  self.setEventHandler(EVENT_TYPES.UPDATE_TASK_FIELDS, self.onUpdateTaskField)
124  self.setEventHandler(EVENT_TYPES.FETCH_AVAILABLE_TASK_IDS, self.onFetchAvailableTasks)
125  # tasksDataManager, Scheduler events - new,update, delete
126  self.setEventHandler(EVENT_TYPES.NEW_TASK_RESPONSE, self.onTasksManagerGeneralResponse)
127  self.setEventHandler(EVENT_TYPES.SCHEDULE_TASK_RESPONSE, self.onTasksManagerGeneralResponse)
128  self.setEventHandler(EVENT_TYPES.UPDATE_TASK_RESPONSE, self.onTasksManagerGeneralResponse)
129  self.setEventHandler(EVENT_TYPES.FETCH_TASK_RESULTS_RESPONSE, self.onFetchResultResponse)
130  self.setEventHandler(EVENT_TYPES.DELETE_TASK_RESPONSE, self.onDeleteTaskResponse)
131  self.setEventHandler(EVENT_TYPES.DELETE_EE_DATA_RESPONSE, self.onDeleteTaskResponse)
132  self.setEventHandler(EVENT_TYPES.DELETE_TASK_DATA_RESPONSE, self.onDeleteTaskResponse)
133  self.setEventHandler(EVENT_TYPES.FETCH_TASK_DATA_RESPONSE, self.onFetchTaskDataResponse)
134 
135 
136  # #@var tasksQueue
137  # map task.id => task without files field
138  self.tasksQueue = {}
139 
140  # #@var pendingTasks
141  # contains all pending tasks in task manager (operation steps)
142  # map event.uid => list of operations steps
143  self.pendingTasks = {}
144 
145  # #@var dbi
146  # db contains two tables log and backlog
147  self.dbi = DBI(self.createDBIDict(configParser))
148 
149  # #@var fetchEvents
150  # map event.uid => event
151  self.fetchEvents = {}
152 
153  isClearOnStart = configParser.get(self.cfg_section, DTM_CONSTS.CLEAR_ON_START)
154  self.cleanUpOnStart(isClearOnStart)
155 
156  # get time slot period
157  self.configVars[self.POLL_TIMEOUT_CONFIG_VAR_NAME] = configParser.getint(self.cfg_section,
159  if self.configVars[self.POLL_TIMEOUT_CONFIG_VAR_NAME] is not None:
161  self.cleanUpTimeout = configParser.getint(self.cfg_section, self.AUTO_CLEANUP_TIME_SLOT_PERIOD)
174 
175 
176 # #cleanUpOnStart method cleans up the tasks queue before start
177  #
178  # @isClearOnStart param - make cleanup or not
179 def cleanUpOnStart(self, isClearOnStart):
# Start-up cleanup of tasks left in the back log table.
# Nothing happens unless isClearOnStart is exactly the string "True".
180 if isClearOnStart == "True":
181 try:
# "id=id" is passed as the fetch criterion; presumably it selects every record - TODO confirm.
182 suspendedTasks = self.dbi.fetch(TaskBackLogScheme(TaskLog), "id=id")
183 if hasattr(suspendedTasks, '__iter__') and len(suspendedTasks) > 0 and suspendedTasks[0] is not None:
# First pass: store state FINISHED and deleteTaskId 0 for every fetched task.
184 for task in suspendedTasks:
185 logger.debug(">>> Start suspend task to delete id=%s", task.id)
186 task.state = EEResponseData.TASK_STATE_FINISHED
187 task.deleteTaskId = 0
188 self.dbi.update(task, "id=%s" % str(task.id))
189 
# Second pass: hand a locally built DELETE_TASK event for each task to onDeleteTask.
190 for task in suspendedTasks:
191 deleteObj = DeleteTask(task.id)
192 delEvent = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK, deleteObj)
193 self.onDeleteTask(delEvent)
# Only DBIErr is handled here: it is logged and not re-raised.
194 except DBIErr as err:
195 logger.error(">>> Some DBI error in TasksManager.cleanUpOnStart [" + str(err.message) + "]")
196 
197 
198  # #onNewTask event handler
199  #
200  # @param event instance of Event object
201 def onNewTask(self, event):
# Handles NEW_TASK: builds the operation steps and looks the task id up in the back log table.
# A known id is answered at once; otherwise the sequence is stored in pendingTasks under
# event.uid and its first step is started.
202 try:
203 operation_steps = self.createOperationSteps(event, EVENT_TYPES.NEW_TASK)
204 logger.debug("Insert pendingTasks[] item " + str(event.uid))
205 logger.debug("New Task event " + str(event.eventObj.id))
206 dbiRecords = self.dbi.fetch(TaskBackLogScheme(TaskLog), "id=%s" % str(event.eventObj.id))
207 if hasattr(dbiRecords, '__iter__') and len(dbiRecords) > 0 and dbiRecords[0] is not None:
208 logger.debug("Task Id already exist")
# NOTE(review): the duplicate-id reply carries a default GeneralResponse() - confirm that the
# client can tell it apart from an accepted task.
209 responseEvent = self.eventBuilder.build(EVENT_TYPES.NEW_TASK_RESPONSE, GeneralResponse())
210 responseEvent.uid = event.uid
211 self.reply(event, responseEvent)
212 else:
# The final reply of the sequence will use NEW_TASK_RESPONSE.
213 self.pendingTasks[event.uid] = TaskRecord(operation_steps, event, EVENT_TYPES.NEW_TASK_RESPONSE)
214 operation_steps[0].ok_callback()
# Any exception is passed to ExceptionLog.handler and not re-raised.
215 except Exception as err:
216 ExceptionLog.handler(logger, err, "Exception:")
217 self.checkCleanUp()
218 
219 
220 # #creates the operation steps for adding a new task
221  #
222  # @param event instance of Event object
223  # @return list of TaskStep objects
def createOperationSteps(self, event, dataManagerEventType, onlyLastTwo=False):
    """Build the ordered list of TaskStep items for one operation sequence.

    The full sequence is: send to the tasks data manager, add to the internal
    structures, send to the scheduler, update the stored state.  With
    onlyLastTwo set only the scheduler and state-update steps are produced.

    @param event instance of Event object that started the sequence
    @param dataManagerEventType event type of the message for the tasks data manager
    @param onlyLastTwo when True the first two steps are left out
    @return list of TaskStep objects
    """
    steps = []

    if not onlyLastTwo:
        # Step 1: forward the payload to the tasks data manager under the same uid.
        storeEvent = self.eventBuilder.build(dataManagerEventType, event.eventObj)
        storeEvent.uid = event.uid
        steps.append(TaskStep(ok_callback=partial(self.send, self.TASKS_DATA_MANAGER_CLIENT, storeEvent),
                              err_callback=None,
                              desc=STEP_SEND_TO_TASKS_DATA_MANAGER,
                              eventNewTask=event))
        # Step 2: register the task locally.
        steps.append(TaskStep(ok_callback=partial(self.addNewTaskData, event),
                              err_callback=partial(self.cleanAfterDBIErr, event),
                              desc=STEP_ADD_TO_INTERNAL_STRUCTURES,
                              eventNewTask=event))

    # Step 3: hand the task to the scheduler, again keeping the uid of the request.
    scheduleEvent = self.eventBuilder.build(EVENT_TYPES.SCHEDULE_TASK, event.eventObj)
    scheduleEvent.uid = event.uid
    steps.append(TaskStep(ok_callback=partial(self.send, self.SCHEDULER_CLIENT, scheduleEvent),
                          err_callback=partial(self.newTaskRollback, event),
                          desc=STEP_SEND_TO_SCHEDULER,
                          eventNewTask=event))

    # Step 4: store the new state once the scheduler accepted the task.
    steps.append(TaskStep(ok_callback=partial(self.finishNewTaskData, event),
                          err_callback=partial(self.newTaskRollback, event),
                          desc=STEP_UPDATE_STATE,
                          eventNewTask=event))
    return steps
256 
257 
258 
259 # #creates new task processing sequence
260  #
261  # @param event instance of Event object
262  def addNewTaskData(self, event):
263  try:
264  logger.debug("New task data id = " + str(event.eventObj.id) + " type = " + str(type(event.eventObj)))
265  newTask = event.eventObj
266  newTask.files = None
267  self.tasksQueue[newTask.id] = newTask
268  taskLog = self.createTaskLog(newTask)
269  taskLog.state = EEResponseData.TASK_STATE_NEW_DATA_STORED
270  taskLog.cDate = datetime.now()
271  taskLog.tries = 0
272  taskLog.name = newTask.name
273  taskLog.type = newTask.type
274  if "time_max" in newTask.session:
275  taskLog.pTimeMax = newTask.session["time_max"]
276  if newTask.autoCleanupFields is not None:
277  if "TTL" in newTask.autoCleanupFields and newTask.autoCleanupFields["TTL"] is not None:
278  newTask.autoCleanupFields["TTL"] = newTask.autoCleanupFields["TTL"] + time.time()
279  taskLog.autoCleanupFields = pickle.dumps(newTask.autoCleanupFields)
280  if taskLog.deleteTaskId == None:
282  else:
284  self.dbi.insert(TaskBackLogScheme(taskLog))
285 
286  # go to the next step
287  responseEvent = self.eventBuilder.build(EVENT_TYPES.GENERAL_RESPONSE, GeneralResponse())
288  responseEvent.uid = event.uid
289 
290  self.processOperationStep(responseEvent)
291 
292  except DBIErr as err:
293  logger.error(">>> Some DBI error in TasksManager.addNewTaskData [" + str(err.message) + "]")
294  responseEvent = self.eventBuilder.build(EVENT_TYPES.GENERAL_RESPONSE, GeneralResponse(err.errCode, err.message))
295  responseEvent.uid = event.uid
296  self.processOperationStep(responseEvent)
297 
298 
299 
300  # #finished processing new task sequence
301  #
302  # @param event instance of Event object
def finishNewTaskData(self, event):
    """Store the TASK_STATE_NEW_SCHEDULED state and move the sequence on.

    The result of the update is wrapped into a GENERAL_RESPONSE event carrying
    the uid of the request and passed to processOperationStep; a DBIErr is
    logged and reported the same way with its error code and message.

    @param event instance of Event object
    """
    def advance(result):
        # Feed the outcome of this step back into the operation sequence.
        stepEvent = self.eventBuilder.build(EVENT_TYPES.GENERAL_RESPONSE, result)
        stepEvent.uid = event.uid
        self.processOperationStep(stepEvent)

    try:
        scheduledTask = event.eventObj
        record = self.createTaskLog(scheduledTask)
        record.state = EEResponseData.TASK_STATE_NEW_SCHEDULED
        self.dbi.update(TaskBackLogScheme(record), "id = %s" % scheduledTask.id)
        advance(GeneralResponse())
    except DBIErr as err:
        logger.error(">>> Some DBI error in TasksManager.finishNewTaskData [" + str(err.message) + "]")
        advance(GeneralResponse(err.errCode, err.message))
320 
321 
322 
323  # #rollback all changes after scheduler error
324  #
325  # @param event instance of Event object
def newTaskRollback(self, event):
    """Roll a new-task sequence back after a scheduler error.

    Runs processSchedulerFailure first and cleanAfterDBIErr second, both with
    the event that started the sequence.

    @param event instance of Event object
    """
    self.processSchedulerFailure(event)
    # TODO: discuss the rescheduling strategy
    self.cleanAfterDBIErr(event)
330 
331 
332 
333  # #onUpdateTask event handler
334  #
335  # @param event instance of Event object
def onUpdateTask(self, event):
    """Handle UPDATE_TASK for a task that is already in tasksQueue.

    The incoming event is forwarded to the tasks data manager, the task is put
    into tasksQueue with its files field cleared, and an UPDATE_TASK event for
    the stored task is sent to the scheduler.  An UPDATE_TASK_RESPONSE is always
    sent back from the finally clause.

    @param event instance of Event object
    """
    try:
        response = GeneralResponse()
        task = event.eventObj
        self.checkTaskPresence(task.id)

        # The back log is not touched here: only Task.id is used from it.
        self.send(self.TASKS_DATA_MANAGER_CLIENT, event)
        task.files = None
        self.tasksQueue[task.id] = task
        self.send(self.SCHEDULER_CLIENT,
                  self.eventBuilder.build(EVENT_TYPES.UPDATE_TASK, self.tasksQueue[task.id]))
    except TaskNoPresentErr as err:
        logger.error(err.message)
        response = GeneralResponse(TaskNoPresentErr.ERR_CODE, err.message)
    except Exception as err:
        # NOTE(review): the response stays the default GeneralResponse() on this
        # path - confirm that an unexpected failure should be reported this way.
        ExceptionLog.handler(logger, err, "Exception:")
    finally:
        self.reply(event, self.eventBuilder.build(EVENT_TYPES.UPDATE_TASK_RESPONSE, response))
357 
358 
359 
360  # #getDeletedTask checks deleted task in kvdb storage
361  #
362  # @param taskLog - incoming deleted task taskLog
363  # return exist deleteTask id or none
def getDeletedTask(self, taskLog):
    """Look for a back-log record whose deleteTaskId equals taskLog.id.

    @param taskLog object whose id identifies the task being deleted
    @return id of the first matching record; None when nothing matches or when
            the lookup fails with DBIErr (the error is logged, not raised)
    """
    try:
        records = self.dbi.fetch(TaskBackLogScheme(TaskLog), "deleteTaskId=%s" % taskLog.id)
    except DBIErr as err:
        # The message used to name onDeleteTask, which pointed readers of the
        # log at the wrong method.
        logger.error(">>> Some DBI error in TasksManager.getDeletedTask [" + str(err.message) + "]")
        return None

    if hasattr(records, '__iter__') and len(records) > 0 and records[0] is not None:
        return records[0].id
    return None
374 
375 
376 
377  # #onDeleteTask event handler
378  #
379  # @param event instance of Event object
380 def onDeleteTask(self, event):
# Handles DELETE_TASK. A reply is sent from here only when `response` ends up set; otherwise
# no reply is produced by this method.
381 response = None
382 # branch for multi-deleting
383 if event.eventObj.deleteTaskId == DeleteTask.GROUP_DELETE:
384 try:
385 dbiRecords = self.dbi.fetch(TaskBackLogScheme(TaskLog), "deleteTaskId=0")
386 except DBIErr as err:
387 logger.error(">>> Some DBI error in TasksManager.onDeleteTask [" + str(err.message) + "]")
# NOTE(review): a DBI failure is answered with a default GeneralResponse() - confirm intent.
388 response = GeneralResponse()
389 else:
390 if hasattr(dbiRecords, '__iter__') and len(dbiRecords) > 0 and dbiRecords[0] is not None:
# Remember the request in groupDeleteResponseEvent.
391 self.groupDeleteResponseEvent = event
392 for record in dbiRecords:
393 taskLog = self.createTaskLog(event.eventObj)
394 taskLog.id = record.id
# Skip records for which getDeletedTask already finds a deleting task.
395 if self.getDeletedTask(taskLog) is None:
396 localDeleteObj = DeleteTask(taskLog.id)
# Work on a shallow copy so the request object of the group event is left unchanged.
397 delObject = copy.copy(event.eventObj)
398 delObject.deleteTaskId = taskLog.id
399 delObject.id = localDeleteObj.id
400 delEvent = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK, delObject)
# No response type is given for the per-task deletes.
401 self.simpleDeleteTask(delEvent, None)
402 else:
403 response = GeneralResponse()
404 # branch for simple-deleting
405 else:
406 response = self.simpleDeleteTask(event, EVENT_TYPES.DELETE_TASK_RESPONSE)
407 if response != None:
408 responseEvent = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK_RESPONSE, response)
409 self.reply(event, responseEvent)
410 
411 
412 
413  # #makes delete Task operation for simple task
414  #
415  # @param event instance of Event object
416  def simpleDeleteTask(self, event, responseEventType):
417  response = None
418  try:
419  deleteTask = event.eventObj
420  taskLog = self.createTaskLog(deleteTask)
421  taskLog.id = deleteTask.deleteTaskId
422  if self.getDeletedTask(taskLog) is None:
423  dbiRecords = self.dbi.fetch(TaskBackLogScheme(TaskLog), "id=%s" % taskLog.id)
424  if len(dbiRecords) > 0:
425  operation_steps = self.createOperationSteps(event, EVENT_TYPES.NEW_TASK)
426  logger.debug("Update pendingTasks[] item " + str(event.uid))
427  self.pendingTasks[event.uid] = TaskRecord(operation_steps, event, responseEventType)
428  operation_steps[0].ok_callback()
430  else:
431  raise DBIErr(dbi_consts.DBI_SUCCESS_CODE + 1, "Task id=" + str(taskLog.id) + " is absent in taskBackLog")
432  else:
433  raise DBIErr(dbi_consts.DBI_SUCCESS_CODE + 2, "Task id=" + str(taskLog.id) + " is deleted by other task")
434  except DBIErr as err:
435  logger.error(">>> Some DBI error in TasksManager.simpleDeleteTask [" + str(err.message) + "]")
436  response = GeneralResponse(GeneralResponse.ERROR_OK, err.message)
437  response.statuses.append(err.errCode)
438  except Exception as err:
439  ExceptionLog.handler(logger, err, "Exception:")
440  return response
441 
442 
443 
444  # #sendGroupDeleteResponse looks records in kvdb and sends back response on group delete operation
445  #
447  if self.groupDeleteResponseEvent is not None:
448  suspendedTasks = self.dbi.fetch(TaskBackLogScheme(TaskLog), "id=id")
449  if not (hasattr(suspendedTasks, '__iter__') and len(suspendedTasks) > 0 and suspendedTasks[0] is not None):
450  response = GeneralResponse()
451  responseEvent = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK_RESPONSE, response)
452  self.reply(self.groupDeleteResponseEvent, responseEvent)
453  logger.debug(">>> Send Group delete back")
454  self.groupDeleteResponseEvent = None
455 
456 
457 
458  # #onGetTaskStatus event handler
459  #
460  # @param event instance of Event object
461 def onGetTaskStatus(self, event):
# Handles GET_TASK_STATUS: collects TaskManagerFields for the ids in event.eventObj.ids and
# replies with the collected list in a GET_TASK_STATUS_RESPONSE event.
462 logger.debug("GetTaskStatus received: " + Utils.varDump(event))
463 try:
464 # #as discuss don't check states
465 getTaskStatus = event.eventObj
466 results = list()
467 for taskId in getTaskStatus.ids:
468 try:
# The back log table is consulted first.
469 lookBackTaskLogScheme = self.dbi.fetch(TaskBackLogScheme(TaskLog), "id=%s" % taskId)
470 # #always return list
471 taskManagerFields = TaskManagerFields([taskId])
472 if (hasattr(lookBackTaskLogScheme, '__iter__') and len(lookBackTaskLogScheme) > 0):
473 # pylint: disable-msg=W0212
474 taskManagerFields = self.createTaskManagerFields(lookBackTaskLogScheme[0]._getTaskLog())
475 results.append(taskManagerFields)
# The log table is searched only when the request strategy is a dict whose LOG_STRATEGY entry
# equals CHECK_LOG_YES. types.DictType exists on Python 2 only.
476 elif type(getTaskStatus.strategy) is types.DictType and GetTasksStatus.LOG_STRATEGY in \
477 getTaskStatus.strategy and getTaskStatus.strategy[GetTasksStatus.LOG_STRATEGY] == GetTasksStatus.CHECK_LOG_YES:
478 lookTaskLogScheme = self.dbi.fetch(TaskLogScheme(TaskLog), "id=%s" % taskId)
479 if (hasattr(lookTaskLogScheme, '__iter__') and len(lookTaskLogScheme) > 0):
480 for foundTask in lookTaskLogScheme:
481 # pylint: disable-msg=W0212
482 taskManagerFields = self.createTaskManagerFields(foundTask._getTaskLog())
483 results.append(taskManagerFields)
# A DBI failure for one id is logged and recorded as None in the result list.
484 except DBIErr as err:
485 logger.error(">>> Some DBI error in TasksManager.onGetTaskStatus [" + str(err.message) + "]")
486 results.append(None)
487 responseEvent = self.eventBuilder.build(EVENT_TYPES.GET_TASK_STATUS_RESPONSE, results)
488 self.reply(event, responseEvent)
489 except Exception as err:
490 ExceptionLog.handler(logger, err, "Exception:")
491 
492 
493  # #onFetchResultsCache event handler
494  #
495  # @param event instance of Event object
496 def onFetchResultsCache(self, event):
# Handles FETCH_RESULTS_CACHE: checks the requested ids with checkTaskPresence, keeps the
# event in fetchEvents under its uid and forwards it to the tasks data manager.
497 try:
498 fetchResultsCacheTask = event.eventObj
499 for taskId in fetchResultsCacheTask.ids:
500 self.checkTaskPresence(taskId)
501 # store events for reply
502 self.fetchEvents[event.uid] = event
503 self.send(self.TASKS_DATA_MANAGER_CLIENT, event)
504 except TaskNoPresentErr as err:
505 logger.error(err.message)
506 response = GeneralResponse(TaskNoPresentErr.ERR_CODE, err.message)
# NOTE(review): the error reply is built as GET_TASK_STATUS_RESPONSE although this handler
# serves fetch-results requests - looks like a copy/paste slip, confirm the intended type.
507 responseEvent = self.eventBuilder.build(EVENT_TYPES.GET_TASK_STATUS_RESPONSE, response)
508 self.reply(event, responseEvent)
# Other exceptions are only passed to ExceptionLog.handler; no reply is sent from that branch.
509 except Exception as err:
510 ExceptionLog.handler(logger, err, "Exception:")
511 
512 
513  # #onGetTaskFields event handler
514  #
515  # @param event instance of Event object
516 def onGetTaskFields(self, event):
# Handles GET_TASK_FIELDS for the single id in event.eventObj.id and replies with a
# GET_TASK_FIELDS_RESPONSE event carrying a TaskManagerFields object.
517 try:
518 getTaskManagerFields = event.eventObj
519 taskId = getTaskManagerFields.id
520 
# Default answer: a TaskManagerFields built from the id alone.
521 taskManagerFields = TaskManagerFields([taskId])
522 lookTaskLogScheme = self.dbi.fetch(TaskBackLogScheme(TaskLog), "id=%s" % taskId)
# NOTE(review): unlike onGetTaskStatus there is no hasattr(..., '__iter__') guard before len().
523 if len(lookTaskLogScheme) > 0 and lookTaskLogScheme[0] is not None:
524 # pylint: disable-msg=W0212
525 taskManagerFields = self.createTaskManagerFields(lookTaskLogScheme[0]._getTaskLog())
526 responseEvent = self.eventBuilder.build(EVENT_TYPES.GET_TASK_FIELDS_RESPONSE, taskManagerFields)
527 self.reply(event, responseEvent)
# Both except branches only log; no reply is sent from them.
# NOTE(review): the message below names onGetTaskStatus, not this method.
528 except DBIErr as err:
529 logger.error(">>> Some DBI error in TasksManager.onGetTaskStatus [" + str(err.message) + "]")
530 except Exception as err:
531 ExceptionLog.handler(logger, err, "Exception:")
532 
533 
534  # #onUpdateTaskField event handler
535  #
536  # @param event instance of Event object
537  def onUpdateTaskField(self, event):
538  generalResponse = GeneralResponse()
539  updateTaskFields = event.eventObj
540  try:
541  # Removing "host" and "port" fields from updateTaskFields.fields and check is state value to update valid
542  updateTaskFields.fields = self.clearEmptyFields(updateTaskFields.fields, updateTaskFields.id)
543 
544  taskLog = self.createTaskLogFromDic(updateTaskFields.fields)
545  taskLogBackScheme = self.dbi.update(TaskBackLogScheme(taskLog), "id=%s" % updateTaskFields.id)
546  if taskLog.state == EEResponseData.TASK_STATE_SET_ERROR or taskLog.state == EEResponseData.TASK_STATE_CRASHED:
548  if taskLog.state == EEResponseData.TASK_STATE_DELETED or taskLog.state == EEResponseData.TASK_STATE_TERMINATED:
549  logger.debug("Delete task is deleted, call cleanUpTask() [%s]", str(taskLog.state))
550  self.cleanUpTask(taskLogBackScheme)
551  if "deleteTaskId" in updateTaskFields.fields and updateTaskFields.fields["deleteTaskId"] != None and \
552  updateTaskFields.fields["deleteTaskId"] > 0:
553  taskLogTerm = TaskLog()
554  taskLogTerm.id = int(updateTaskFields.fields["deleteTaskId"])
555  taskLogTerm.state = int(updateTaskFields.fields["deleteTaskState"])
556  taskLogBackSchemeTerm = self.dbi.update(TaskBackLogScheme(taskLogTerm), "id=%s" % taskLogTerm.id)
557  if taskLogTerm.state == EEResponseData.TASK_STATE_DELETED or \
558  taskLogTerm.state == EEResponseData.TASK_STATE_TERMINATED:
559  logger.debug("Task to delete is terminated, call cleanUpTask()")
560  self.cleanUpTask(taskLogBackSchemeTerm)
561  else:
562  logger.debug("Task to delete is not terminated, state: " + str(taskLogTerm.state))
563  else:
564  logger.debug("Task to delete is None or Empty")
565  generalResponse.errorCode = DeleteTask.RESPONSE_CODE_DRCE_ERROR
566  generalResponse.errorMessage = "EEManager returns empty deleteTaskId"
568  elif taskLog.state == EEResponseData.TASK_STATE_SET_ERROR:
569  logger.debug("Delete task by state=TASK_STATE_SET_ERROR, id = " + str(updateTaskFields.id))
570  self.cleanUpTaskNetworkOperation(updateTaskFields.id, False)
571  self.restoreTaskSteps(updateTaskFields.id)
572  else:
573  logger.debug("State is: " + str(taskLog.state))
574  self.checkCleanUp(updateTaskFields.id)
575  except DBIErr as err:
576  logger.error(">>> Some DBI error in TasksManager.onUpdateTaskField [" + str(err.message) + "]")
577  generalResponse.errorCode = DeleteTask.RESPONSE_CODE_DBI_ERROR
578  generalResponse.errorMessage = ("DBIError=%s" % str(err.message))
579  except Exception as err:
580  ExceptionLog.handler(logger, err, "Exception:")
581  generalResponse.errorCode = DeleteTask.RESPONSE_CODE_UNKNOWN_ERROR
582  generalResponse.errorMessage = ("Unknown Error=%s" % str(err.message))
583  responseEvent = self.eventBuilder.build(EVENT_TYPES.UPDATE_TASK_FIELDS_RESPONSE, generalResponse)
584  self.reply(event, responseEvent)
585 
586 
587 
588  # #onFetchTaskDataResponse callback, receive response from TDM
589  #
590  # @param event - incoming event
591  def onFetchTaskDataResponse(self, event):
592  logger.debug("onFetchTaskDataResponse income id = " + str(event.eventObj.id))
593  taskLog = TaskLog()
594  isCleanUp = True
595  try:
596  if event.eventObj != None:
597  if event.eventObj.strategy != None and Task.STRATEGY_RETRY in event.eventObj.strategy:
598  tempBackTaskLogScheme = self.dbi.fetch(TaskBackLogScheme(taskLog), "id=%s" % event.eventObj.id)
599  logger.debug("onFetchTaskDataResponse tempBackTaskLogScheme len = " + str(len(tempBackTaskLogScheme)))
600  if type(tempBackTaskLogScheme) == types.ListType and len(tempBackTaskLogScheme) > 0 and \
601  event.eventObj.strategy[Task.STRATEGY_RETRY] > (tempBackTaskLogScheme[0].tries + 1):
602  isCleanUp = False
604  if tempBackTaskLogScheme[0].deleteTaskId != 0:
607  if isCleanUp:
608  logger.debug("onFetchTaskDataResponse cleanup income id = " + str(event.eventObj.id))
609  tempBackTaskLogScheme = TaskBackLogScheme(TaskLog())
610  tempBackTaskLogScheme.id = event.eventObj.id
611  self.cleanUpTask([tempBackTaskLogScheme])
612  # self.updateTaskBackLogToSchedulerStep(event.eventObj.id, 1, EEResponseData.TASK_STATE_SET_ERROR)
613  else:
614  logger.debug("onFetchTaskDataResponse rescheduler income id = " + str(event.eventObj.id))
615  self.updateTaskBackLogToSchedulerStep(event.eventObj.id, 1, EEResponseData.TASK_STATE_NEW_DATA_STORED)
616  operation_steps = self.createOperationSteps(event, None, True)
617  self.pendingTasks[event.uid] = TaskRecord(operation_steps, None, None)
618  operation_steps[0].ok_callback()
619  except DBIErr as err:
620  logger.error(">>> Some DBI error in TasksManager.onFetchTaskDataResponse [" + str(err.message) + "]")
621 
622 
623  # #restoreTaskSteps send FETCH_TASK_DATA request to the TDM
624  #
625  # @param taskId - task Id
def restoreTaskSteps(self, taskId):
    """Send a FETCH_TASK_DATA request for the given task to the tasks data manager.

    @param taskId id of the task whose data is requested
    """
    logger.debug("restoreTaskSteps id = " + str(taskId))
    request = self.eventBuilder.build(EVENT_TYPES.FETCH_TASK_DATA, FetchTaskData(taskId))
    self.send(self.TASKS_DATA_MANAGER_CLIENT, request)
631 
632 
633  # #statFieldsRecalculate method recalculates value for :"tasks_time_avg", "tasks_time_sum",
634  # "tasks_time_count", "tasks_time_min", "tasks_time_max" statistic fields
635  #
636  # @param taskLogScheme instance of TaskLogScheme object
637  def statFieldsRecalculate(self, taskLogScheme):
638  logger.debug(">>> Start static recalculationg")
639  localDict = {}
640  localDict[self.VAR_TASKS_TIME_MIN] = None
641  localDict = self.getStatDataFields(localDict)
642  if localDict is not None and self.VAR_TASKS_TIME_MIN in localDict and localDict[self.VAR_TASKS_TIME_MIN] \
643  is not None:
644  if taskLogScheme.pTime < localDict[self.VAR_TASKS_TIME_MIN] or localDict[self.VAR_TASKS_TIME_MIN] == 0:
645  self.updateStatField(self.VAR_TASKS_TIME_MIN, taskLogScheme.pTime, self.STAT_FIELDS_OPERATION_SET)
646  localDict = {}
647  localDict[self.VAR_TASKS_TIME_MAX] = None
648  localDict = self.getStatDataFields(localDict)
649  if localDict is not None and self.VAR_TASKS_TIME_MAX in localDict and localDict[self.VAR_TASKS_TIME_MAX] \
650  is not None:
651  if taskLogScheme.pTime > localDict[self.VAR_TASKS_TIME_MAX]:
652  self.updateStatField(self.VAR_TASKS_TIME_MAX, taskLogScheme.pTime, self.STAT_FIELDS_OPERATION_SET)
653 
654  self.updateStatField(self.VAR_TASKS_TIME_SUM, taskLogScheme.pTime, self.STAT_FIELDS_OPERATION_ADD)
656  localDict = {}
657  localDict[self.VAR_TASKS_TIME_SUM] = None
658  localDict[self.VAR_TASKS_TIME_COUNT] = None
659 
660  localDict = self.getStatDataFields(localDict)
661  if localDict is not None and self.VAR_TASKS_TIME_SUM in localDict and localDict[self.VAR_TASKS_TIME_SUM] is not \
662  None and self.VAR_TASKS_TIME_COUNT in localDict and localDict[self.VAR_TASKS_TIME_COUNT] is not None:
664  localDict[self.VAR_TASKS_TIME_SUM] / localDict[self.VAR_TASKS_TIME_COUNT], self.STAT_FIELDS_OPERATION_SET)
665 
666 
667  # #made all necessary manipulations to move to no active state
668  #
669  # @param taskBackLogScheme instance of TaskBackLogScheme object
670 def cleanUpTask(self, taskBackLogScheme):
# Moves a task from the back log table to the log table and drops it from tasksQueue.
# taskBackLogScheme is used as a sequence; only its first element is processed.
671 if taskBackLogScheme is not None and len(taskBackLogScheme) > 0:
672 # pylint: disable-msg=W0212
673 try:
# Set fDate on the record, then build the log-table object from it.
674 taskBackLogScheme[0].fDate = datetime.now()
675 taskLogScheme = TaskLogScheme(taskBackLogScheme[0]._getTaskLog())
# Statistic fields are recalculated only when a pTime value is present.
676 if taskLogScheme.pTime is not None:
677 self.statFieldsRecalculate(taskLogScheme)
678 try:
# Count the rows that already carry this id in the log table; an existing row is deleted
# before the insert.
679 customQuery = "select count(*) from %s where id = '%s'" \
680 % (str(taskLogScheme.__tablename__), str(taskLogScheme.id))
681 logger.debug("!!! customQuery: %s", str(customQuery))
682 customResponse = self.dbi.sqlCustom(customQuery)
683 logger.debug("!!! customResponse: %s", varDump(customResponse))
684 if len(customResponse) > 0 and len(customResponse[0]) > 0 and int(customResponse[0][0]) > 0:
685 logger.debug("!!! taskId = %s already exist. Try delete from table.", str(taskLogScheme.id))
686 self.dbi.delete(taskLogScheme, "id=%s" % taskLogScheme.id)
687 
688 self.dbi.insert(taskLogScheme)
# A DBI failure while writing the log table is logged and does not propagate.
689 except DBIErr as err:
690 logger.error(">>> Some DBI error in TasksManager.cleanUpTask moving backlog/log " +
691 "operation [" + str(err.message) + "]")
692 self.dbi.delete(taskBackLogScheme[0], "id=%s" % taskLogScheme.id)
693 # Delete task from queue
694 logger.debug("Delete from tasksQueue[] item " + str(taskLogScheme.id))
695 self.cleanUpTaskNetworkOperation(taskLogScheme.id, True)
# checkTaskPresence is called before the del; TaskNoPresentErr is caught and logged below.
696 self.checkTaskPresence(taskLogScheme.id)
697 del self.tasksQueue[taskLogScheme.id]
698 except TaskNoPresentErr as err:
699 logger.error(err.message)
700 except DBIErr as err:
701 logger.error(">>> Some DBI error in TasksManager.cleanUpTask [" + str(err.message) + "]")
702 else:
703 logger.error("The input taskBackLogScheme is None or empty: " + Utils.varDump(taskBackLogScheme))
704 
705 
706 # #method cleans up task data in the parallel threads/modules
707  #
708  # @param taskId - task's id
709  # @param delFromTDMData - bool, make TDM data deleting or not
710 def cleanUpTaskNetworkOperation(self, taskId, delFromTDMData):
# Sends delete requests for one task: DELETE_TASK_DATA and DELETE_EE_DATA go to the tasks
# data manager client, DELETE_TASK goes to the scheduler client.
# NOTE(review): confirm whether DELETE_EE_DATA is meant to depend on delFromTDMData as well.
711 deleteTask = DeleteTask(0, taskId)
712 if delFromTDMData:
713 deleteTaskData = DeleteTaskData(taskId)
714 deleteEEResponseData = DeleteEEResponseData(taskId)
715 if delFromTDMData:
716 new_event = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK_DATA, deleteTaskData)
717 self.send(self.TASKS_DATA_MANAGER_CLIENT, new_event)
718 # Delete EE Data
719 new_event = self.eventBuilder.build(EVENT_TYPES.DELETE_EE_DATA, deleteEEResponseData)
720 self.send(self.TASKS_DATA_MANAGER_CLIENT, new_event)
721 # Delete task
722 new_event = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK, deleteTask)
723 self.send(self.SCHEDULER_CLIENT, new_event)
724 
725 
726 # #updateTaskBackLogToSchedulerStep method updates 2 fields (tries, state) in BackTaskLog table
727 #
728 # @param localId - id of the task record; incr - value added to tries; newState - state to store
def updateTaskBackLogToSchedulerStep(self, localId, incr, newState):
    """Increase the tries counter and store a new state for one back-log record.

    An error is logged when no record can be fetched for the id.

    @param localId id of the task record in the back log table
    @param incr value added to the record's tries field
    @param newState value written to the record's state field
    """
    criterion = "id=%s" % localId
    rows = self.dbi.fetch(TaskBackLogScheme(TaskLog()), criterion)
    if not (hasattr(rows, '__iter__') and len(rows) > 0):
        logger.error("Error can't fetch record from TaskBackLog with task id = " + str(localId))
        return

    # Only the first fetched record is updated.
    row = rows[0]
    row.tries += incr
    row.state = newState
    self.dbi.update(row, criterion)
738 
739 
740  # #onTasksManagerGeneralResponse event handler
741  #
742  # @param event instance of Event object
def onTasksManagerGeneralResponse(self, event):
  """Event handler: log the incoming event and pass it to processOperationStep.

  Any exception raised while handling is reported through ExceptionLog.handler
  and is not propagated to the caller.

  @param event instance of Event object
  """
  try:
    logger.debug("onTasksManagerGeneralResponse, event:" + str(event.uid) + "\n" + Utils.varDump(event))
    self.processOperationStep(event)
  except Exception as err:
    ExceptionLog.handler(logger, err, "Exception:")
749 
750 
751  # #function process
752  #
753  # @param event instance of Event object
def processOperationStep(self, event):
  """Advance or finish the pending operation sequence registered under event.uid.

  - Unknown uid: only a debug record is written.
  - ERROR_OK with more than one step left: the first step is dropped from the
    stored TaskRecord and ok_callback of the new first step is called.
  - ERROR_OK on the last step: the general response is replied and the
    pending item is removed.
  - Any other error code: err_callback of the current step is called (when
    set), the general response is replied and the pending item is removed.

  @param event instance of Event object, event.eventObj carries errorCode
  """
  generalResponse = event.eventObj
  if event.uid not in self.pendingTasks:
    logger.debug("processOperationStepr: pending event " + str(event.uid) + " " + str(self.pendingTasks))
    return

  if generalResponse.errorCode == GeneralResponse.ERROR_OK:
    if len(self.pendingTasks[event.uid].tasksteps) > 1:
      # the uid is known to be present here, so this is always an update
      logger.debug("Update pendingTasks[] item " + str(event.uid))
      current = self.pendingTasks[event.uid]
      self.pendingTasks[event.uid] = TaskRecord(current.tasksteps[1:], current.event, current.responseEventType)
      self.pendingTasks[event.uid].tasksteps[0].ok_callback()
    else:
      self.replyGeneralResponse(event)
      logger.debug("Delete 1 from pendingTasks[] item " + str(event.uid))
      del self.pendingTasks[event.uid]
  else:
    errCallback = self.pendingTasks[event.uid].tasksteps[0].err_callback
    if errCallback:
      errCallback()
    self.replyGeneralResponse(event)
    logger.debug("Delete 2 from pendingTasks[] item " + str(event.uid))
    del self.pendingTasks[event.uid]
779 
780 
781 
782  # #send GeneralResponse reply event at the end of operation queue
783  # or while get err response during execution the queue
784  #
785  # @param event instance of Event object
def replyGeneralResponse(self, event):
  """Reply to the originating event of a pending operation with event.eventObj.

  Nothing is sent when the pending record has no responseEventType. A missing
  event.uid in self.pendingTasks raises KeyError.

  @param event instance of Event object
  """
  pending = self.pendingTasks[event.uid]
  # identity test: None is the "no reply wanted" marker
  if pending.responseEventType is not None:
    responseEvent = self.eventBuilder.build(pending.responseEventType, event.eventObj)
    self.reply(pending.event, responseEvent)
791 
792 
793 
794  # #onFetchResultResponse event handler
795  #
796  # @param event instance of Event object
def onFetchResultResponse(self, event):
  """Event handler: forward fetched task results to the stored requester.

  The request event stored in self.fetchEvents under event.uid receives a
  FETCH_TASK_RESULTS_RESPONSE built from event.eventObj, after which the
  stored item is removed. An unknown uid is logged as an error. Exceptions
  are reported through ExceptionLog.handler and not propagated.

  @param event instance of Event object
  """
  try:
    uid = event.uid
    if uid not in self.fetchEvents:
      logger.error("onFetchResultResponse no such event.uid" + str(uid))
      return
    replyEvent = self.eventBuilder.build(EVENT_TYPES.FETCH_TASK_RESULTS_RESPONSE, event.eventObj)
    self.reply(self.fetchEvents[uid], replyEvent)
    logger.debug("Delete from fetchEvents[] item " + str(uid))
    del self.fetchEvents[uid]
  except Exception as err:
    ExceptionLog.handler(logger, err, "Exception:")
808 
809 
810  '''
811  def checkDBIState(self):
812  if self.dbi.getErrorCode() != dbi_consts.DBI_SUCCESS_CODE:
813  logger.debug("DBI error:" + str(self.dbi.getErrorCode()) + " : " + self.dbi.getErrorMsg())
814  raise DBIErr(self.dbi.getErrorCode(), self.dbi.getErrorMsg())
815  '''
816 
817 
818  # #creates taskLog object from EventObjects.Task object
819  #
820  # @param taskObj instance of EventObjects.Task object
def createTaskLog(self, taskObj):
  """Create a TaskLog carrying the id (and deleteTaskId, if any) of taskObj.

  deleteTaskId is copied only when it is present in taskObj.__dict__.

  @param taskObj instance of EventObjects.Task object
  @return new TaskLog instance
  """
  result = TaskLog()
  result.id = taskObj.id
  instanceAttrs = taskObj.__dict__
  if "deleteTaskId" in instanceAttrs:
    result.deleteTaskId = taskObj.deleteTaskId
  return result
827 
828 
829 
830  # #clean up all structures after error in dbi object
831  #
def cleanAfterDBIErr(self, event):
  """Drop the task from tasksQueue and ask the data manager to delete its data.

  The DELETE_TASK_DATA event is built first, then the tasksQueue item keyed
  by event.eventObj.id is removed (KeyError if absent), then the event is
  sent to TASKS_DATA_MANAGER_CLIENT.

  @param event instance of Event object, event.eventObj.id is the task id
  """
  taskId = event.eventObj.id
  deleteEvent = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK_DATA, DeleteTaskData(taskId))
  logger.debug("Delete from tasksQueue[] item " + str(taskId))
  del self.tasksQueue[taskId]
  self.send(self.TASKS_DATA_MANAGER_CLIENT, deleteEvent)
838 
839 
840 
841  # #check existence task in the manager(in processing mode)
842  #
def checkTaskPresence(self, taskId):
  """Raise TaskNoPresentErr unless taskId is a key of self.tasksQueue.

  @param taskId - task's id
  """
  if taskId in self.tasksQueue:
    return
  raise TaskNoPresentErr("The task is not present in tasksManager id=" + str(taskId))
846 
847 
848 
849  # #create TaskManagerFields object from TaskLog object
850  #
851  # @param taskLog instance of TaskLog object
def createTaskManagerFields(self, taskLog):
  """Build a TaskManagerFields object from the public attributes of taskLog.

  Every name returned by dir(taskLog) that does not start with an underscore
  is copied into the fields dictionary of the result.

  @param taskLog instance of TaskLog object
  @return TaskManagerFields created with taskLog.id
  """
  result = TaskManagerFields(taskLog.id)
  publicNames = [name for name in dir(taskLog) if not name.startswith('_')]
  for name in publicNames:
    result.fields[name] = getattr(taskLog, name, None)
  return result
858 
859 
860 
861  # #create TaskLog object and fill its properties from input dict
862  #
863  # @param fields instance of dic object
def createTaskLogFromDic(self, fields):
  """Create a TaskLog and set its public attributes from a dictionary.

  Only names that dir() reports on a fresh TaskLog and that do not start with
  an underscore are taken from fields; other keys are ignored.

  @param fields instance of dict object
  @return new TaskLog instance
  """
  result = TaskLog()
  publicNames = [name for name in dir(result) if not name.startswith('_')]
  for name in publicNames:
    if name not in fields:
      continue
    setattr(result, name, fields[name])
  return result
870 
871 
872 
873  # #set empty fields to None ("host" and "port" field now)
874  #
875  # @param fields instance of dic object
def clearEmptyFields(self, fields, taskId):
  """Adjust an update dictionary against the stored TaskBackLog record.

  When a record with the given id is fetched:
  - host and port entries of fields are set to None if the stored record
    already holds a value that is neither None nor an empty string;
  - the whole update is replaced by an empty dictionary if it would move the
    state along one of the rejected (new state, stored state) pairs below.
  The fields dictionary passed in is modified in place. A DBIErr is logged
  and the (possibly partly adjusted) dictionary is returned.

  @param fields instance of dict object with the values to update
  @param taskId - task's id used in the fetch criterion
  @return fields itself, or an empty dict when the update is aborted
  """
  ret = fields
  try:
    rows = self.dbi.fetch(TaskBackLogScheme(TaskLog), "id=%s" % taskId)
    if rows != None and len(rows) > 0 and rows[0] != None:
      stored = rows[0]

      for fieldName in (DTM_CONSTS.DRCE_FIELDS.HOST, DTM_CONSTS.DRCE_FIELDS.PORT):
        if fieldName not in dir(stored):
          continue
        storedValue = getattr(stored, fieldName)
        if storedValue != None and storedValue != "":
          ret[fieldName] = None
          logger.debug(str(fieldName) + " is not empty, set to None")

      # TODO: temporary fix for wrong update state from finished to in progress or from in progress to new
      stateKey = DTM_CONSTS.DRCE_FIELDS.STATE
      rejectedTransitions = (
          (EEResponseData.TASK_STATE_IN_PROGRESS, EEResponseData.TASK_STATE_FINISHED),
          (EEResponseData.TASK_STATE_NEW, EEResponseData.TASK_STATE_IN_PROGRESS),
          (EEResponseData.TASK_STATE_FINISHED, EEResponseData.TASK_STATE_SCHEDULED_TO_DELETE),
          (EEResponseData.TASK_STATE_IN_PROGRESS, EEResponseData.TASK_STATE_SCHEDULED_TO_DELETE),
          (EEResponseData.TASK_STATE_NEW, EEResponseData.TASK_STATE_FINISHED),
      )
      for incomingState, storedState in rejectedTransitions:
        # stored state is read only after the incoming state has matched
        if ret[stateKey] == incomingState and getattr(stored, stateKey) == storedState:
          ret = {}
          # pylint: disable-msg=W0212
          logger.debug("Aborted update with wrong state value for task " + str(taskId) + "\nfields to update: " + \
                       str(fields) + "\nfields in db: " + Utils.varDump(stored._getTaskLog()))
          break
  except DBIErr as err:
    logger.error(">>> Some DBI error in TasksManager.clearEmptyFields [" + str(err.message) + "]")
  return ret
912 
913 
914 
def processTasksDataManagerFailure(self, event):
  """Placeholder handler: only writes the event to the debug log.

  NOTE(review): the def line was missing in this listing; the name is taken
  from the class member index and the sibling processSchedulerFailure -
  confirm against the original file.

  @param event instance of Event object
  """
  # discuss
  logger.debug("Event: " + str(event))
918 
919 
920 
def processSchedulerFailure(self, event):
  """Placeholder handler: only writes the event to the debug log.

  @param event instance of Event object
  """
  # discuss rescheduling strategy and etc
  message = "Event: " + str(event)
  logger.debug(message)
925 
926 
927 
928  # #onDeleteTaskResponse event handler
929  #
930  # @param event instance of Event object
def onDeleteTaskResponse(self, event):
  """Event handler placeholder: only writes the event to the debug log.

  @param event instance of Event object
  """
  # discuss
  message = "Event: " + str(event)
  logger.debug(message)
935 
936 
937 
938  # #onFetchAvailableTasks event handler
939  # fetch oldest running tasks from DB
940  # @param event instance of Event object
def onFetchAvailableTasks(self, event):
  """Event handler: select task ids from the requested table and reply with them.

  A "SELECT ... from <tableName><where>" statement is built from the request
  (event.eventObj) and run through self.dbi.sql. At most fetchNum ids are
  collected; when fetchAdditionalFields is set, the task log of every
  collected record is returned as well. DBIErr and other exceptions are
  logged, and the reply is sent in any case with whatever was collected.

  @param event instance of Event object
  """
  resultList = []
  tasksList = None
  try:
    request = event.eventObj
    additionWhere = ""
    if request.criterions is not None:
      additionWhere = app.SQLCriterions.generateCriterionSQL(request.criterions)

    if request.fetchAdditionalFields:
      # a list (instead of None) tells the receiver that full records were requested
      tasksList = []
      columns = '*'
    else:
      columns = '`id`'
    clause = "SELECT %s from %s%s" % (columns, request.tableName, additionWhere)

    logger.debug(">>> Get tasks SQL == " + clause)
    results = self.dbi.sql(TaskBackLogScheme(TaskLog), clause)
    if not len(results) > 0:
      logger.debug("No tasks selected for auto check state")
    else:
      for record in results:
        if len(resultList) >= request.fetchNum:
          break
        resultList.append(record.id)
        if request.fetchAdditionalFields:
          tasksList.append(record._getTaskLog())
  except DBIErr as err:
    logger.error(">>> Some DBI error in TasksManager.onFetchAvailableTasks [" + str(err.message) + "]")
  except Exception as err:
    ExceptionLog.handler(logger, err, "Exception:")

  responseEvent = self.eventBuilder.build(EVENT_TYPES.AVAILABLE_TASK_IDS_RESPONSE,
                                          AvailableTaskIds(resultList, tasksList))
  self.reply(event, responseEvent)
975 
976 
977  # #deleteOnCleanUp method creates DeleteTask object and start tasks deleting process, on autoCleanUp event
978  #
979  # @param taskData - incoming tasks data (fetching from kvdb)
980  # @param autoCleanupFields - tasks autoCleanUp fields
def deleteOnCleanUp(self, taskData, autoCleanupFields):
  """Start deleting of one task on autoCleanUp and clear its autocleanup field.

  A DeleteTask object is created for taskData.id, optionally adjusted by the
  "DeleteType" and "DeleteRetries" entries, and passed to onDeleteTask as a
  DELETE_TASK event. After that taskData.autoCleanupFields is set to None and
  the record is updated in the storage. TaskNoPresentErr and DBIErr are
  logged and not propagated.

  @param taskData - incoming tasks data (fetching from kvdb)
  @param autoCleanupFields - tasks autoCleanUp fields
  """
  taskId = taskData.id
  logger.debug(">>> Start cleanup taskId=" + str(taskId))
  try:
    self.checkTaskPresence(taskId)
    deleteObj = DeleteTask(taskId)

    deleteType = autoCleanupFields.get("DeleteType")
    if deleteType is not None:
      deleteObj.action = deleteType

    deleteRetries = autoCleanupFields.get("DeleteRetries")
    if deleteRetries is not None:
      deleteObj.strategy["RETRY"] = deleteRetries

    self.onDeleteTask(self.eventBuilder.build(EVENT_TYPES.DELETE_TASK, deleteObj))

    logger.debug(">>> Clear autocleanup field taskId=" + str(taskId))
    taskData.autoCleanupFields = None
    self.dbi.update(taskData, "id=%s" % str(taskId))
  except TaskNoPresentErr as err:
    logger.error(err.message)
  except DBIErr as err:
    logger.error(">>> Some DBI error in TasksManager.deleteOnCleanUp [" + str(err.message) + "]")
999 
1000 
1001  # #checkCleanUp method check tasks on autocleanup condition
1002  #
def checkCleanUp(self, taskId=None):
  """Check stored tasks against their autocleanup conditions and delete matches.

  Every fetched record with a non-empty autoCleanupFields value is unpickled;
  when the result is a dictionary the task is passed to deleteOnCleanUp if
  - the record id equals taskId and the "State" entry equals the record's
    state, or
  - the "TTL" entry is lower than the current time.time().
  A record whose autoCleanupFields cannot be unpickled is logged and skipped,
  so it does not stop the check of the remaining records. DBIErr is logged
  and not propagated.

  @param taskId - task's id for the state condition, None to check TTL only
  """
  logger.debug(">>> Start cleanup method id=" + str(taskId))
  try:
    suspendedTasks = self.dbi.fetch(TaskBackLogScheme(TaskLog), "id=id")
    if not hasattr(suspendedTasks, '__iter__'):
      logger.debug(">>> End cleanup method [Bad kvdb fetching]")
      return

    for taskData in suspendedTasks:
      if taskData is None or taskData.autoCleanupFields is None or str(taskData.autoCleanupFields) == "":
        continue

      # NOTE(review): pickle.loads executes arbitrary code from crafted input;
      # this is only acceptable while the storage content is fully trusted.
      try:
        autoCleanupFields = pickle.loads(str(taskData.autoCleanupFields))
      except Exception as err:  # pylint: disable=W0703
        logger.error(">>> Can't unpickle autoCleanupFields for taskId=" + str(taskData.id) + " [" + str(err) + "]")
        continue

      if not isinstance(autoCleanupFields, dict):
        continue

      if taskData.id == taskId and autoCleanupFields.get("State") is not None and \
         autoCleanupFields["State"] == taskData.state:
        self.deleteOnCleanUp(taskData, autoCleanupFields)
        continue

      if autoCleanupFields.get("TTL") is not None and autoCleanupFields["TTL"] < time.time():
        self.deleteOnCleanUp(taskData, autoCleanupFields)
  except DBIErr as err:
    logger.error(">>> Some DBI error in TasksManager.checkCleanUp [" + str(err.message) + "]")
1023 
1024 
1025  # #function will call every time when ConnectionTimeout exception arrive
1026  #
def on_poll_timeout(self):
  """Run checkCleanUp when more than cleanUpTimeout passed since the last run.

  prevCleanUpTime is moved to the current time only when the check was run.
  """
  now = time.time()
  nextRunAfter = self.prevCleanUpTime + self.cleanUpTimeout
  if not nextRunAfter < now:
    return
  self.checkCleanUp()
  self.prevCleanUpTime = now
1032 
1033 
1034  # Cleanup dtm tables
def cleanupTables(self):
  """Delete all rows from every table listed in self.CLEANUP_TABLES_LIST.

  Each table is cleaned and committed separately through self.dbi.session.
  A SQLAlchemy error rolls the session back and is logged; any other
  exception is only logged. In both cases the loop goes on with the next
  table.
  """
  logger.debug("Cleanup tables started")

  for tableName in self.CLEANUP_TABLES_LIST:
    customQuery = "DELETE FROM %s WHERE 1" % tableName
    try:
      self.dbi.session.execute(text(customQuery))
      self.dbi.session.commit()

      logger.debug("Cleanup of '%s' successfully finished", tableName)

    # "except ... as" is the form the rest of the file uses and is valid on Python 2.6+ and 3
    except sqlalchemy.exc.SQLAlchemyError as err:
      self.dbi.session.rollback()
      logger.error("Cleanup of '%s' failed. %s", tableName, str(err))
    except Exception as err:
      logger.error("Cleanup of '%s' failed. %s", tableName, str(err))

  logger.debug("Cleanup tables finished")
def reply(self, event, reply_event)
wrapper for sending event in reply for event
def simpleDeleteTask(self, event, responseEventType)
def createTaskManagerFields(self, taskLog)
def checkCleanUp(self, taskId=None)
def onDeleteTask(self, event)
def cleanUpTask(self, taskBackLogScheme)
def createOperationSteps(self, event, dataManagerEventType, onlyLastTwo=False)
def restoreTaskSteps(self, taskId)
def updateStatField(self, field_name, value, operation=STAT_FIELDS_OPERATION_ADD)
update values of stat field - default sum
def createTaskLog(self, taskObj)
def cleanUpOnStart(self, isClearOnStart)
def onGetTaskStatus(self, event)
def processSchedulerFailure(self, event)
GeneralResponse event object, represents general state response for multipurpose usage.
def createTaskLogFromDic(self, fields)
AvailableTaskIds event object, for return all available task id.
def generateCriterionSQL(criterions, additionWhere=None, siteId=None)
def clearEmptyFields(self, fields, taskId)
def getStatDataFields(self, fields)
getStatDataFields returns stat data from storage
def replyGeneralResponse(self, event)
def newTaskRollback(self, event)
def cleanAfterDBIErr(self, event)
def finishNewTaskData(self, event)
def setEventHandler(self, eventType, handler)
set event handler rewrite the current handler for eventType
Class describes structures of task item used in TaskManager.
Definition: TaskLog.py:10
def onTasksManagerGeneralResponse(self, event)
def addConnection(self, name, connection)
def getDeletedTask(self, taskLog)
DeleteTaskData event object, to delete task's data in the storage.
def checkTaskPresence(self, taskId)
DeleteTask event object, to delete task from DTM application and from EE.
def onFetchAvailableTasks(self, event)
This is app base class for management server connection end-points and parallel transport messages pr...
def updateTaskBackLogToSchedulerStep(self, localId, incr, newState)
Definition: dbi.py:1
def onFetchResultsCache(self, event)
def processOperationStep(self, event)
def onGetTaskFields(self, event)
def send(self, connect_name, event)
send event
def deleteOnCleanUp(self, taskData, autoCleanupFields)
def onDeleteTaskResponse(self, event)
def onFetchResultResponse(self, event)
def cleanUpTaskNetworkOperation(self, taskId, delFromTDMData)
def addNewTaskData(self, event)
DeleteEEResponseData event object, to delete EE response data from the storage.
def onUpdateTask(self, event)
TaskManagerFields event object, for return task fields values.
FetchTaskData event object, to fetch task data from the storage.
def statFieldsRecalculate(self, taskLogScheme)
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
def processTasksDataManagerFailure(self, event)
def __init__(self, configParser, connectBuilderLight, pollerManager=None)
Definition: TasksManager.py:98
def onUpdateTaskField(self, event)
def onFetchTaskDataResponse(self, event)