HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
dtm.TasksManager.TasksManager Class Reference
Inheritance diagram for dtm.TasksManager.TasksManager:
Collaboration diagram for dtm.TasksManager.TasksManager:

Public Member Functions

def __init__ (self, configParser, connectBuilderLight, pollerManager=None)
 
def cleanUpOnStart (self, isClearOnStart)
 
def onNewTask (self, event)
 
def createOperationSteps (self, event, dataManagerEventType, onlyLastTwo=False)
 
def addNewTaskData (self, event)
 
def finishNewTaskData (self, event)
 
def newTaskRollback (self, event)
 
def onUpdateTask (self, event)
 
def getDeletedTask (self, taskLog)
 
def onDeleteTask (self, event)
 
def simpleDeleteTask (self, event, responseEventType)
 
def sendGroupDeleteResponse (self)
 
def onGetTaskStatus (self, event)
 
def onFetchResultsCache (self, event)
 
def onGetTaskFields (self, event)
 
def onUpdateTaskField (self, event)
 
def onFetchTaskDataResponse (self, event)
 
def restoreTaskSteps (self, taskId)
 
def statFieldsRecalculate (self, taskLogScheme)
 
def cleanUpTask (self, taskBackLogScheme)
 
def cleanUpTaskNetworkOperation (self, taskId, delFromTDMData)
 
def updateTaskBackLogToSchedulerStep (self, localId, incr, newState)
 
def onTasksManagerGeneralResponse (self, event)
 
def processOperationStep (self, event)
 
def replyGeneralResponse (self, event)
 
def onFetchResultResponse (self, event)
 
def createTaskLog (self, taskObj)
 
def cleanAfterDBIErr (self, event)
 
def checkTaskPresence (self, taskId)
 
def createTaskManagerFields (self, taskLog)
 
def createTaskLogFromDic (self, fields)
 
def clearEmptyFields (self, fields, taskId)
 
def processTasksDataManagerFailure (self, event)
 
def processSchedulerFailure (self, event)
 
def onDeleteTaskResponse (self, event)
 
def onFetchAvailableTasks (self, event)
 
def deleteOnCleanUp (self, taskData, autoCleanupFields)
 
def checkCleanUp (self, taskId=None)
 
def on_poll_timeout (self)
 
def cleanupTables (self)
 
- Public Member Functions inherited from app.BaseServerManager.BaseServerManager
def __init__ (self, poller_manager=None, admin_connection=None, conectionLightBuilder=None, exceptionForward=False, dumpStatVars=True)
 constructor More...
 
def addConnection (self, name, connection)
 
def setEventHandler (self, eventType, handler)
 set event handler rewrite the current handler for eventType More...
 
def send (self, connect_name, event)
 send event More...
 
def reply (self, event, reply_event)
 wrapper for sending event in reply for event More...
 
def poll (self)
 poll function polling connections receive as multipart msg, the second argument is pickled pyobj More...
 
def process (self, event)
 process event call the event handler method that was set by user or on_unhandled_event method if not set More...
 
def run (self)
 
def is_connection_registered (self, name)
 check is a connection was registered in a instance of BaseServerManager i object More...
 
def on_poll_timeout (self)
 function will call every time when ConnectionTimeout exception arrive More...
 
def on_unhandled_event (self, event)
 function will call every time when arrive doesn't set handler for event type of event.evenType More...
 
def build_poller_list (self)
 
def clear_poller (self)
 
def onAdminState (self, event)
 onAdminState event handler process admin SHUTDOWN command More...
 
def onAdminFetchStatData (self, event)
 onAdminState event handler process admin command More...
 
def onAdminSuspend (self, event)
 onAdminState event handler process admin command More...
 
def getStatDataFields (self, fields)
 getStatDataFields returns stat data from storage More...
 
def getSystemStat (self)
 getSystemStat returns stat data for system indicators: RAMV, RAMR and CPU More...
 
def getConfigVarsFields (self, fields)
 getConfigVarsFields returns config vars from storage More...
 
def onAdminGetConfigVars (self, event)
 onAdminGetConfigVars event handler process getConfigVars admin command, fill and return config vars array from internal storage More...
 
def onAdminSetConfigVars (self, event)
 onAdminSetConfigVars event handler process setConfigVars admin command More...
 
def setConfigVars (self, setConfigVars)
 processSetConfigVars sets config vars in storage More...
 
def sendAdminReadyEvent (self)
 send ready event to notify adminInterfaceService More...
 
def createLogMsg (self, event)
 from string message from event object More...
 
def initStatFields (self, connect_name)
 add record in statFields More...
 
def updateStatField (self, field_name, value, operation=STAT_FIELDS_OPERATION_ADD)
 update values of stat field - default sum More...
 
def processSpecialConfigVars (self, name, value)
 send ready event to notify adminInterfaceService More...
 
def getLogLevel (self)
 Get log level from first of existing loggers. More...
 
def setLogLevel (self, level)
 Set log level for all loggers. More...
 
def saveStatVarsDump (self)
 Save stat vars in json file. More...
 
def loadStatVarsDump (self)
 Load stat vars in json file. More...
 
def getStatVarsDumpFileName (self)
 Get stat vars file name. More...
 
def createDBIDict (self, configParser)
 

Public Attributes

 cfg_section
 
 groupDeleteResponseEvent
 
 tasksQueue
 
 pendingTasks
 
 dbi
 
 fetchEvents
 
 cleanUpTimeout
 
 prevCleanUpTime
 
- Public Attributes inherited from app.BaseServerManager.BaseServerManager
 dumpStatVars
 
 poller_manager
 
 eventBuilder
 
 exit_flag
 
 pollTimeout
 
 connections
 
 event_handlers
 
 statFields
 stat fields container More...
 
 configVars
 
 exceptionForward
 

Static Public Attributes

string SERVER = "server"
 
string TASKS_DATA_MANAGER_CLIENT = "clientTasksDataManager"
 
string SCHEDULER_CLIENT = "clientScheduler"
 
string CONFIG_TIME_SLOT_PERIOD = "timeSlotPeriod"
 
string AUTO_CLEANUP_TIME_SLOT_PERIOD = "autoCleanUpSlotPeriod"
 
string VAR_TASKS_TOTAL = "tasks_total"
 
string VAR_TASKS_TOTAL_DEL = "tasks_total_del"
 
string VAR_TASKS_TIME_SUM = "tasks_time_sum"
 
string VAR_TASKS_TIME_COUNT = "tasks_time_count"
 
string VAR_TASKS_TIME_AVG = "tasks_time_avg"
 
string VAR_TASKS_TIME_MIN = "tasks_time_min"
 
string VAR_TASKS_TIME_MAX = "tasks_time_max"
 
string VAR_TASKS_ERRORS = "tasks_errors"
 
string VAR_TASKS_RETRIES = "tasks_retries"
 
string VAR_TASKS_RETRIES_DEL = "tasks_retries_del"
 
string VAR_TASKS_DELETE_TRIES = "tasks_delete_tries"
 
list CLEANUP_TABLES_LIST
 
- Static Public Attributes inherited from app.BaseServerManager.BaseServerManager
string ADMIN_CONNECT_ENDPOINT = "Admin"
 
string ADMIN_CONNECT_CLIENT = "Admin"
 
int POLL_TIMEOUT_DEFAULT = 3000
 
int STAT_FIELDS_OPERATION_ADD = 0
 
int STAT_FIELDS_OPERATION_SUB = 1
 
int STAT_FIELDS_OPERATION_SET = 2
 
int STAT_FIELDS_OPERATION_INIT = 3
 
string POLL_TIMEOUT_CONFIG_VAR_NAME = "POLL_TIMEOUT"
 
string LOG_LEVEL_CONFIG_VAR_NAME = "LOG_LEVEL"
 
string STAT_DUMPS_DEFAULT_DIR = "/tmp/"
 
string STAT_DUMPS_DEFAULT_NAME = "%APP_NAME%_%CLASS_NAME%_stat_vars.dump"
 
dictionary LOGGERS_NAMES = {APP_CONSTS.LOGGER_NAME, "dc", "dtm", "root", ""}
 

Detailed Description

Definition at line 67 of file TasksManager.py.

Constructor & Destructor Documentation

◆ __init__()

def dtm.TasksManager.TasksManager.__init__ (   self,
  configParser,
  connectBuilderLight,
  pollerManager = None 
)

Definition at line 98 of file TasksManager.py.

98  def __init__(self, configParser, connectBuilderLight, pollerManager=None):
99  super(TasksManager, self).__init__(pollerManager)
100 
101  self.cfg_section = self.__class__.__name__
102 
103  self.groupDeleteResponseEvent = None
104  serverAddr = configParser.get(self.cfg_section, self.SERVER)
105  tasksDataManagerAddr = configParser.get(self.cfg_section, self.TASKS_DATA_MANAGER_CLIENT)
106  schedulerAddr = configParser.get(self.cfg_section, self.SCHEDULER_CLIENT)
107 
108  serverConnection = connectBuilderLight.build(consts.SERVER_CONNECT, serverAddr)
109  tasksDataManagerConnection = connectBuilderLight.build(consts.CLIENT_CONNECT, tasksDataManagerAddr)
110  schedulerConnection = connectBuilderLight.build(consts.CLIENT_CONNECT, schedulerAddr)
111 
112  self.addConnection(self.SERVER, serverConnection)
113  self.addConnection(self.TASKS_DATA_MANAGER_CLIENT, tasksDataManagerConnection)
114  self.addConnection(self.SCHEDULER_CLIENT, schedulerConnection)
115 
116  # server events
117  self.setEventHandler(EVENT_TYPES.NEW_TASK, self.onNewTask)
118  self.setEventHandler(EVENT_TYPES.UPDATE_TASK, self.onUpdateTask)
119  self.setEventHandler(EVENT_TYPES.GET_TASK_STATUS, self.onGetTaskStatus)
120  self.setEventHandler(EVENT_TYPES.FETCH_RESULTS_CACHE, self.onFetchResultsCache)
121  self.setEventHandler(EVENT_TYPES.DELETE_TASK, self.onDeleteTask)
122  self.setEventHandler(EVENT_TYPES.GET_TASK_FIELDS, self.onGetTaskFields)
123  self.setEventHandler(EVENT_TYPES.UPDATE_TASK_FIELDS, self.onUpdateTaskField)
124  self.setEventHandler(EVENT_TYPES.FETCH_AVAILABLE_TASK_IDS, self.onFetchAvailableTasks)
125  # tasksDataManager, Scheduler events - new,update, delete
126  self.setEventHandler(EVENT_TYPES.NEW_TASK_RESPONSE, self.onTasksManagerGeneralResponse)
127  self.setEventHandler(EVENT_TYPES.SCHEDULE_TASK_RESPONSE, self.onTasksManagerGeneralResponse)
128  self.setEventHandler(EVENT_TYPES.UPDATE_TASK_RESPONSE, self.onTasksManagerGeneralResponse)
129  self.setEventHandler(EVENT_TYPES.FETCH_TASK_RESULTS_RESPONSE, self.onFetchResultResponse)
130  self.setEventHandler(EVENT_TYPES.DELETE_TASK_RESPONSE, self.onDeleteTaskResponse)
131  self.setEventHandler(EVENT_TYPES.DELETE_EE_DATA_RESPONSE, self.onDeleteTaskResponse)
132  self.setEventHandler(EVENT_TYPES.DELETE_TASK_DATA_RESPONSE, self.onDeleteTaskResponse)
133  self.setEventHandler(EVENT_TYPES.FETCH_TASK_DATA_RESPONSE, self.onFetchTaskDataResponse)
134 
135 
136  # #@var tasksQueue
137  # map task.id => task without files field
138  self.tasksQueue = {}
139 
140  # #@var pendingTasks
141  # contains all pending tasks in task manager (operation steps)
142  # map event.uid => list of operations steps
143  self.pendingTasks = {}
144 
145  # #@var dbi
146  # db contains two tables log and backlog
147  self.dbi = DBI(self.createDBIDict(configParser))
148 
149  # #@var fetchEvents
150  # map event.uid => event
151  self.fetchEvents = {}
152 
153  isClearOnStart = configParser.get(self.cfg_section, DTM_CONSTS.CLEAR_ON_START)
154  self.cleanUpOnStart(isClearOnStart)
155 
156  # get time slot period
157  self.configVars[self.POLL_TIMEOUT_CONFIG_VAR_NAME] = configParser.getint(self.cfg_section,
158  self.CONFIG_TIME_SLOT_PERIOD)
159  if self.configVars[self.POLL_TIMEOUT_CONFIG_VAR_NAME] is not None:
160  self.configVars[self.POLL_TIMEOUT_CONFIG_VAR_NAME] = self.configVars[self.POLL_TIMEOUT_CONFIG_VAR_NAME] * 1000
161  self.cleanUpTimeout = configParser.getint(self.cfg_section, self.AUTO_CLEANUP_TIME_SLOT_PERIOD)
162  self.prevCleanUpTime = 0
163  self.updateStatField(self.VAR_TASKS_TOTAL, 0, self.STAT_FIELDS_OPERATION_INIT)
164  self.updateStatField(self.VAR_TASKS_TOTAL_DEL, 0, self.STAT_FIELDS_OPERATION_INIT)
165  self.updateStatField(self.VAR_TASKS_TIME_SUM, 0, self.STAT_FIELDS_OPERATION_INIT)
166  self.updateStatField(self.VAR_TASKS_TIME_COUNT, 0, self.STAT_FIELDS_OPERATION_INIT)
167  self.updateStatField(self.VAR_TASKS_TIME_AVG, 0, self.STAT_FIELDS_OPERATION_INIT)
168  self.updateStatField(self.VAR_TASKS_TIME_MIN, 0, self.STAT_FIELDS_OPERATION_INIT)
169  self.updateStatField(self.VAR_TASKS_TIME_MAX, 0, self.STAT_FIELDS_OPERATION_INIT)
170  self.updateStatField(self.VAR_TASKS_ERRORS, 0, self.STAT_FIELDS_OPERATION_INIT)
171  self.updateStatField(self.VAR_TASKS_RETRIES, 0, self.STAT_FIELDS_OPERATION_INIT)
172  self.updateStatField(self.VAR_TASKS_RETRIES_DEL, 0, self.STAT_FIELDS_OPERATION_INIT)
173  self.updateStatField(self.VAR_TASKS_DELETE_TRIES, 0, self.STAT_FIELDS_OPERATION_INIT)
174 
175 
def __init__(self)
constructor
Definition: UIDGenerator.py:19

Member Function Documentation

◆ addNewTaskData()

def dtm.TasksManager.TasksManager.addNewTaskData (   self,
  event 
)

Definition at line 262 of file TasksManager.py.

262  def addNewTaskData(self, event):
263  try:
264  logger.debug("New task data id = " + str(event.eventObj.id) + " type = " + str(type(event.eventObj)))
265  newTask = event.eventObj
266  newTask.files = None
267  self.tasksQueue[newTask.id] = newTask
268  taskLog = self.createTaskLog(newTask)
269  taskLog.state = EEResponseData.TASK_STATE_NEW_DATA_STORED
270  taskLog.cDate = datetime.now()
271  taskLog.tries = 0
272  taskLog.name = newTask.name
273  taskLog.type = newTask.type
274  if "time_max" in newTask.session:
275  taskLog.pTimeMax = newTask.session["time_max"]
276  if newTask.autoCleanupFields is not None:
277  if "TTL" in newTask.autoCleanupFields and newTask.autoCleanupFields["TTL"] is not None:
278  newTask.autoCleanupFields["TTL"] = newTask.autoCleanupFields["TTL"] + time.time()
279  taskLog.autoCleanupFields = pickle.dumps(newTask.autoCleanupFields)
280  if taskLog.deleteTaskId == None:
281  self.updateStatField(self.VAR_TASKS_TOTAL, 1, self.STAT_FIELDS_OPERATION_ADD)
282  else:
283  self.updateStatField(self.VAR_TASKS_TOTAL_DEL, 1, self.STAT_FIELDS_OPERATION_ADD)
284  self.dbi.insert(TaskBackLogScheme(taskLog))
285 
286  # go to the next step
287  responseEvent = self.eventBuilder.build(EVENT_TYPES.GENERAL_RESPONSE, GeneralResponse())
288  responseEvent.uid = event.uid
289 
290  self.processOperationStep(responseEvent)
291 
292  except DBIErr as err:
293  logger.error(">>> Some DBI error in TasksManager.addNewTaskData [" + str(err.message) + "]")
294  responseEvent = self.eventBuilder.build(EVENT_TYPES.GENERAL_RESPONSE, GeneralResponse(err.errCode, err.message))
295  responseEvent.uid = event.uid
296  self.processOperationStep(responseEvent)
297 
298 
299 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ checkCleanUp()

def dtm.TasksManager.TasksManager.checkCleanUp (   self,
  taskId = None 
)

Definition at line 1003 of file TasksManager.py.

1003  def checkCleanUp(self, taskId=None):
1004  logger.debug(">>> Start cleanup method id=" + str(taskId))
1005  try:
1006  suspendedTasks = self.dbi.fetch(TaskBackLogScheme(TaskLog), "id=id")
1007  if hasattr(suspendedTasks, '__iter__'):
1008  for taskData in suspendedTasks:
1009  if taskData is not None and taskData.autoCleanupFields is not None and str(taskData.autoCleanupFields) != "":
1010  autoCleanupFields = pickle.loads(str(taskData.autoCleanupFields))
1011  if type(autoCleanupFields) == types.DictType:
1012  if taskData.id == taskId and "State" in autoCleanupFields and autoCleanupFields["State"] is not None and \
1013  autoCleanupFields["State"] == taskData.state:
1014  self.deleteOnCleanUp(taskData, autoCleanupFields)
1015  continue
1016  if "TTL" in autoCleanupFields and autoCleanupFields["TTL"] is not None and \
1017  autoCleanupFields["TTL"] < time.time():
1018  self.deleteOnCleanUp(taskData, autoCleanupFields)
1019  else:
1020  logger.debug(">>> End cleanup method [Bad kvdb fetching]")
1021  except DBIErr as err:
1022  logger.error(">>> Some DBI error in TasksManager.checkCleanUp [" + str(err.message) + "]")
1023 
1024 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ checkTaskPresence()

def dtm.TasksManager.TasksManager.checkTaskPresence (   self,
  taskId 
)

Definition at line 843 of file TasksManager.py.

843  def checkTaskPresence(self, taskId):
844  if not taskId in self.tasksQueue:
845  raise TaskNoPresentErr("The task is not present in tasksManager id=" + str(taskId))
846 
847 
848 
Here is the caller graph for this function:

◆ cleanAfterDBIErr()

def dtm.TasksManager.TasksManager.cleanAfterDBIErr (   self,
  event 
)

Definition at line 832 of file TasksManager.py.

832  def cleanAfterDBIErr(self, event):
833  deleteTaskData = DeleteTaskData(event.eventObj.id)
834  deleteEvent = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK_DATA, deleteTaskData)
835  logger.debug("Delete from tasksQueue[] item " + str(event.eventObj.id))
836  del self.tasksQueue[event.eventObj.id]
837  self.send(self.TASKS_DATA_MANAGER_CLIENT, deleteEvent)
838 
839 
840 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ cleanUpOnStart()

def dtm.TasksManager.TasksManager.cleanUpOnStart (   self,
  isClearOnStart 
)

Definition at line 179 of file TasksManager.py.

179  def cleanUpOnStart(self, isClearOnStart):
180  if isClearOnStart == "True":
181  try:
182  suspendedTasks = self.dbi.fetch(TaskBackLogScheme(TaskLog), "id=id")
183  if hasattr(suspendedTasks, '__iter__') and len(suspendedTasks) > 0 and suspendedTasks[0] is not None:
184  for task in suspendedTasks:
185  logger.debug(">>> Start suspend task to delete id=%s", task.id)
186  task.state = EEResponseData.TASK_STATE_FINISHED
187  task.deleteTaskId = 0
188  self.dbi.update(task, "id=%s" % str(task.id))
189 
190  for task in suspendedTasks:
191  deleteObj = DeleteTask(task.id)
192  delEvent = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK, deleteObj)
193  self.onDeleteTask(delEvent)
194  except DBIErr as err:
195  logger.error(">>> Some DBI error in TasksManager.cleanUpOnStart [" + str(err.message) + "]")
196 
197 
Here is the call graph for this function:

◆ cleanupTables()

def dtm.TasksManager.TasksManager.cleanupTables (   self)

Definition at line 1035 of file TasksManager.py.

1035  def cleanupTables(self):
1036  logger.debug("Cleanup tables started")
1037 
1038  for tableName in self.CLEANUP_TABLES_LIST:
1039  customQuery = "DELETE FROM %s WHERE 1" % tableName
1040  try:
1041  self.dbi.session.execute(text(customQuery))
1042  self.dbi.session.commit()
1043 
1044  logger.debug("Cleanup of '%s' successfully finished", tableName)
1045 
1046  except sqlalchemy.exc.SQLAlchemyError, err:
1047  self.dbi.session.rollback()
1048  logger.error("Cleanup of '%s' failed. %s", tableName, str(err))
1049  except Exception, err:
1050  logger.error("Cleanup of '%s' failed. %s", tableName, str(err))
1051 
1052  logger.debug("Cleanup tables finished")

◆ cleanUpTask()

def dtm.TasksManager.TasksManager.cleanUpTask (   self,
  taskBackLogScheme 
)

Definition at line 670 of file TasksManager.py.

670  def cleanUpTask(self, taskBackLogScheme):
671  if taskBackLogScheme is not None and len(taskBackLogScheme) > 0:
672  # pylint: disable-msg=W0212
673  try:
674  taskBackLogScheme[0].fDate = datetime.now()
675  taskLogScheme = TaskLogScheme(taskBackLogScheme[0]._getTaskLog())
676  if taskLogScheme.pTime is not None:
677  self.statFieldsRecalculate(taskLogScheme)
678  try:
679  customQuery = "select count(*) from %s where id = '%s'" \
680  % (str(taskLogScheme.__tablename__), str(taskLogScheme.id))
681  logger.debug("!!! customQuery: %s", str(customQuery))
682  customResponse = self.dbi.sqlCustom(customQuery)
683  logger.debug("!!! customResponse: %s", varDump(customResponse))
684  if len(customResponse) > 0 and len(customResponse[0]) > 0 and int(customResponse[0][0]) > 0:
685  logger.debug("!!! taskId = %s already exist. Try delete from table.", str(taskLogScheme.id))
686  self.dbi.delete(taskLogScheme, "id=%s" % taskLogScheme.id)
687 
688  self.dbi.insert(taskLogScheme)
689  except DBIErr as err:
690  logger.error(">>> Some DBI error in TasksManager.cleanUpTask moving backlog/log " +
691  "operation [" + str(err.message) + "]")
692  self.dbi.delete(taskBackLogScheme[0], "id=%s" % taskLogScheme.id)
693  # Delete task from queue
694  logger.debug("Delete from tasksQueue[] item " + str(taskLogScheme.id))
695  self.cleanUpTaskNetworkOperation(taskLogScheme.id, True)
696  self.checkTaskPresence(taskLogScheme.id)
697  del self.tasksQueue[taskLogScheme.id]
698  except TaskNoPresentErr as err:
699  logger.error(err.message)
700  except DBIErr as err:
701  logger.error(">>> Some DBI error in TasksManager.cleanUpTask [" + str(err.message) + "]")
702  else:
703  logger.error("The input taskBackLogScheme is None or empty: " + Utils.varDump(taskBackLogScheme))
704 
705 
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
Here is the call graph for this function:
Here is the caller graph for this function:

◆ cleanUpTaskNetworkOperation()

def dtm.TasksManager.TasksManager.cleanUpTaskNetworkOperation (   self,
  taskId,
  delFromTDMData 
)

Definition at line 710 of file TasksManager.py.

710  def cleanUpTaskNetworkOperation(self, taskId, delFromTDMData):
711  deleteTask = DeleteTask(0, taskId)
712  if delFromTDMData:
713  deleteTaskData = DeleteTaskData(taskId)
714  deleteEEResponseData = DeleteEEResponseData(taskId)
715  if delFromTDMData:
716  new_event = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK_DATA, deleteTaskData)
717  self.send(self.TASKS_DATA_MANAGER_CLIENT, new_event)
718  # Delete EE Data
719  new_event = self.eventBuilder.build(EVENT_TYPES.DELETE_EE_DATA, deleteEEResponseData)
720  self.send(self.TASKS_DATA_MANAGER_CLIENT, new_event)
721  # Delete task
722  new_event = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK, deleteTask)
723  self.send(self.SCHEDULER_CLIENT, new_event)
724 
725 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ clearEmptyFields()

def dtm.TasksManager.TasksManager.clearEmptyFields (   self,
  fields,
  taskId 
)

Definition at line 876 of file TasksManager.py.

876  def clearEmptyFields(self, fields, taskId):
877  ret = fields
878  try:
879  lookBackTaskLogScheme = self.dbi.fetch(TaskBackLogScheme(TaskLog), "id=%s" % taskId)
880  if lookBackTaskLogScheme != None and len(lookBackTaskLogScheme) > 0 and lookBackTaskLogScheme[0] != None:
881  if DTM_CONSTS.DRCE_FIELDS.HOST in dir(lookBackTaskLogScheme[0]) and \
882  getattr(lookBackTaskLogScheme[0], DTM_CONSTS.DRCE_FIELDS.HOST) != None and \
883  getattr(lookBackTaskLogScheme[0], DTM_CONSTS.DRCE_FIELDS.HOST) != "":
884  ret[DTM_CONSTS.DRCE_FIELDS.HOST] = None
885  logger.debug(str(DTM_CONSTS.DRCE_FIELDS.HOST) + " is not empty, set to None")
886  if DTM_CONSTS.DRCE_FIELDS.PORT in dir(lookBackTaskLogScheme[0]) and \
887  getattr(lookBackTaskLogScheme[0], DTM_CONSTS.DRCE_FIELDS.PORT) != None and \
888  getattr(lookBackTaskLogScheme[0], DTM_CONSTS.DRCE_FIELDS.PORT) != "":
889  ret[DTM_CONSTS.DRCE_FIELDS.PORT] = None
890  logger.debug(str(DTM_CONSTS.DRCE_FIELDS.PORT) + " is not empty, set to None")
891 
892  # TODO: temporary fix for wrong update state from finished to in progress or from in progress to new
893  if (ret[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_IN_PROGRESS and\
894  getattr(lookBackTaskLogScheme[0], DTM_CONSTS.DRCE_FIELDS.STATE) == EEResponseData.TASK_STATE_FINISHED) or\
895  (ret[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_NEW and\
896  getattr(lookBackTaskLogScheme[0], DTM_CONSTS.DRCE_FIELDS.STATE) == EEResponseData.TASK_STATE_IN_PROGRESS) or\
897  (ret[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_FINISHED and\
898  getattr(lookBackTaskLogScheme[0], DTM_CONSTS.DRCE_FIELDS.STATE) == \
899  EEResponseData.TASK_STATE_SCHEDULED_TO_DELETE) or\
900  (ret[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_IN_PROGRESS and\
901  getattr(lookBackTaskLogScheme[0], DTM_CONSTS.DRCE_FIELDS.STATE) == \
902  EEResponseData.TASK_STATE_SCHEDULED_TO_DELETE) or\
903  (ret[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_NEW and\
904  getattr(lookBackTaskLogScheme[0], DTM_CONSTS.DRCE_FIELDS.STATE) == EEResponseData.TASK_STATE_FINISHED):
905  ret = {}
906  # pylint: disable-msg=W0212
907  logger.debug("Aborted update with wrong state value for task " + str(taskId) + "\nfields to update: " + \
908  str(fields) + "\nfields in db: " + Utils.varDump(lookBackTaskLogScheme[0]._getTaskLog()))
909  except DBIErr as err:
910  logger.error(">>> Some DBI error in TasksManager.clearEmptyFields [" + str(err.message) + "]")
911  return ret
912 
913 
914 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ createOperationSteps()

def dtm.TasksManager.TasksManager.createOperationSteps (   self,
  event,
  dataManagerEventType,
  onlyLastTwo = False 
)

Definition at line 224 of file TasksManager.py.

224  def createOperationSteps(self, event, dataManagerEventType, onlyLastTwo=False):
225  if not onlyLastTwo:
226  dataManagerEvent = self.eventBuilder.build(dataManagerEventType, event.eventObj)
227  dataManagerEvent.uid = event.uid
228  ok_callback = partial(self.send, self.TASKS_DATA_MANAGER_CLIENT, dataManagerEvent)
229  err_callback = None
230  desc = STEP_SEND_TO_TASKS_DATA_MANAGER
231  first = TaskStep(ok_callback, err_callback, desc, event)
232 
233  ok_callback = partial(self.addNewTaskData, event)
234  err_callback = partial(self.cleanAfterDBIErr, event)
235  desc = STEP_ADD_TO_INTERNAL_STRUCTURES
236  second = TaskStep(ok_callback, err_callback, desc, event)
237 
238  newEvent = self.eventBuilder.build(EVENT_TYPES.SCHEDULE_TASK, event.eventObj)
239  newEvent.uid = event.uid
240 
241  ok_callback = partial(self.send, self.SCHEDULER_CLIENT, newEvent)
242  err_callback = partial(self.newTaskRollback, event)
243  desc = STEP_SEND_TO_SCHEDULER
244  third = TaskStep(ok_callback, err_callback, desc, event)
245 
246  ok_callback = partial(self.finishNewTaskData, event)
247  err_callback = partial(self.newTaskRollback, event)
248  desc = STEP_UPDATE_STATE
249  four = TaskStep(ok_callback, err_callback, desc, event)
250 
251  if onlyLastTwo:
252  ret = [third, four]
253  else:
254  ret = [first, second, third, four]
255  return ret
256 
257 
258 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ createTaskLog()

def dtm.TasksManager.TasksManager.createTaskLog (   self,
  taskObj 
)

Definition at line 821 of file TasksManager.py.

821  def createTaskLog(self, taskObj):
822  taskLog = TaskLog()
823  taskLog.id = taskObj.id
824  if "deleteTaskId" in taskObj.__dict__:
825  taskLog.deleteTaskId = taskObj.deleteTaskId
826  return taskLog
827 
828 
829 
Here is the caller graph for this function:

◆ createTaskLogFromDic()

def dtm.TasksManager.TasksManager.createTaskLogFromDic (   self,
  fields 
)

Definition at line 864 of file TasksManager.py.

864  def createTaskLogFromDic(self, fields):
865  taskLog = TaskLog()
866  for attr in [attr for attr in dir(taskLog) if not attr.startswith('__') and not attr.startswith('_')]:
867  if attr in fields:
868  setattr(taskLog, attr, fields[attr])
869  return taskLog
870 
871 
872 
Here is the caller graph for this function:

◆ createTaskManagerFields()

def dtm.TasksManager.TasksManager.createTaskManagerFields (   self,
  taskLog 
)

Definition at line 852 of file TasksManager.py.

852  def createTaskManagerFields(self, taskLog):
853  taskManagerFields = TaskManagerFields(taskLog.id)
854  attributes = [attr for attr in dir(taskLog) if not attr.startswith('__') and not attr.startswith('_')]
855  for attr in attributes:
856  taskManagerFields.fields[attr] = getattr(taskLog, attr, None)
857  return taskManagerFields
858 
859 
860 
Here is the caller graph for this function:

◆ deleteOnCleanUp()

def dtm.TasksManager.TasksManager.deleteOnCleanUp (   self,
  taskData,
  autoCleanupFields 
)

Definition at line 981 of file TasksManager.py.

981  def deleteOnCleanUp(self, taskData, autoCleanupFields):
982  logger.debug(">>> Start cleanup taskId=" + str(taskData.id))
983  try:
984  self.checkTaskPresence(taskData.id)
985  deleteObj = DeleteTask(taskData.id)
986  if "DeleteType" in autoCleanupFields and autoCleanupFields["DeleteType"] is not None:
987  deleteObj.action = autoCleanupFields["DeleteType"]
988  if "DeleteRetries" in autoCleanupFields and autoCleanupFields["DeleteRetries"] is not None:
989  deleteObj.strategy["RETRY"] = autoCleanupFields["DeleteRetries"]
990  delEvent = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK, deleteObj)
991  self.onDeleteTask(delEvent)
992  logger.debug(">>> Clear autocleanup field taskId=" + str(taskData.id))
993  taskData.autoCleanupFields = None
994  self.dbi.update(taskData, "id=%s" % str(taskData.id))
995  except TaskNoPresentErr as err:
996  logger.error(err.message)
997  except DBIErr as err:
998  logger.error(">>> Some DBI error in TasksManager.deleteOnCleanUp [" + str(err.message) + "]")
999 
1000 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ finishNewTaskData()

def dtm.TasksManager.TasksManager.finishNewTaskData (   self,
  event 
)

Definition at line 303 of file TasksManager.py.

303  def finishNewTaskData(self, event):
304  try:
305  newTask = event.eventObj
306  taskLog = self.createTaskLog(newTask)
307  taskLog.state = EEResponseData.TASK_STATE_NEW_SCHEDULED
308  self.dbi.update(TaskBackLogScheme(taskLog), "id = %s" % newTask.id)
309 
310  responseEvent = self.eventBuilder.build(EVENT_TYPES.GENERAL_RESPONSE, GeneralResponse())
311  responseEvent.uid = event.uid
312 
313  self.processOperationStep(responseEvent)
314 
315  except DBIErr as err:
316  logger.error(">>> Some DBI error in TasksManager.finishNewTaskData [" + str(err.message) + "]")
317  responseEvent = self.eventBuilder.build(EVENT_TYPES.GENERAL_RESPONSE, GeneralResponse(err.errCode, err.message))
318  responseEvent.uid = event.uid
319  self.processOperationStep(responseEvent)
320 
321 
322 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ getDeletedTask()

def dtm.TasksManager.TasksManager.getDeletedTask (   self,
  taskLog 
)

Definition at line 364 of file TasksManager.py.

364  def getDeletedTask(self, taskLog):
365  ret = None
366  try:
367  dbiRecords = self.dbi.fetch(TaskBackLogScheme(TaskLog), "deleteTaskId=%s" % taskLog.id)
368  except DBIErr as err:
369  logger.error(">>> Some DBI error in TasksManager.onDeleteTask [" + str(err.message) + "]")
370  else:
371  if hasattr(dbiRecords, '__iter__') and len(dbiRecords) > 0 and dbiRecords[0] is not None:
372  ret = dbiRecords[0].id
373  return ret
374 
375 
376 
Here is the caller graph for this function:

◆ newTaskRollback()

def dtm.TasksManager.TasksManager.newTaskRollback (   self,
  event 
)

Definition at line 326 of file TasksManager.py.

326  def newTaskRollback(self, event):
327  self.processSchedulerFailure(event)
328  # #@todo disccuss rescheduling strategy
329  self.cleanAfterDBIErr(event)
330 
331 
332 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ on_poll_timeout()

def dtm.TasksManager.TasksManager.on_poll_timeout (   self)

Definition at line 1027 of file TasksManager.py.

1027  def on_poll_timeout(self):
1028  localTimestamp = time.time()
1029  if self.prevCleanUpTime + self.cleanUpTimeout < localTimestamp:
1030  self.checkCleanUp()
1031  self.prevCleanUpTime = localTimestamp
1032 
1033 
Here is the call graph for this function:

◆ onDeleteTask()

def dtm.TasksManager.TasksManager.onDeleteTask (   self,
  event 
)

Definition at line 380 of file TasksManager.py.

380  def onDeleteTask(self, event):
381  response = None
382  # branch for multi-deleting
383  if event.eventObj.deleteTaskId == DeleteTask.GROUP_DELETE:
384  try:
385  dbiRecords = self.dbi.fetch(TaskBackLogScheme(TaskLog), "deleteTaskId=0")
386  except DBIErr as err:
387  logger.error(">>> Some DBI error in TasksManager.onDeleteTask [" + str(err.message) + "]")
388  response = GeneralResponse()
389  else:
390  if hasattr(dbiRecords, '__iter__') and len(dbiRecords) > 0 and dbiRecords[0] is not None:
391  self.groupDeleteResponseEvent = event
392  for record in dbiRecords:
393  taskLog = self.createTaskLog(event.eventObj)
394  taskLog.id = record.id
395  if self.getDeletedTask(taskLog) is None:
396  localDeleteObj = DeleteTask(taskLog.id)
397  delObject = copy.copy(event.eventObj)
398  delObject.deleteTaskId = taskLog.id
399  delObject.id = localDeleteObj.id
400  delEvent = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK, delObject)
401  self.simpleDeleteTask(delEvent, None)
402  else:
403  response = GeneralResponse()
404  # branch for simple-deleting
405  else:
406  response = self.simpleDeleteTask(event, EVENT_TYPES.DELETE_TASK_RESPONSE)
407  if response != None:
408  responseEvent = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK_RESPONSE, response)
409  self.reply(event, responseEvent)
410 
411 
412 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ onDeleteTaskResponse()

def dtm.TasksManager.TasksManager.onDeleteTaskResponse (   self,
  event 
)

Definition at line 931 of file TasksManager.py.

931  def onDeleteTaskResponse(self, event):
932  # discuss
933  # pass
934  logger.debug("Event: " + str(event))
935 
936 
937 

◆ onFetchAvailableTasks()

def dtm.TasksManager.TasksManager.onFetchAvailableTasks (   self,
  event 
)

Definition at line 941 of file TasksManager.py.

941  def onFetchAvailableTasks(self, event):
942  resultList = []
943  tasksList = None
944  try:
945  additionWhere = ""
946  SELECT_TEMPLATE_STR = "SELECT %s from %s%s"
947  if event.eventObj.criterions is not None:
948  additionWhere = app.SQLCriterions.generateCriterionSQL(event.eventObj.criterions)
949 
950  if event.eventObj.fetchAdditionalFields:
951  tasksList = []
952  clause = (SELECT_TEMPLATE_STR % ('*', event.eventObj.tableName, additionWhere))
953  else:
954  clause = (SELECT_TEMPLATE_STR % ('`id`', event.eventObj.tableName, additionWhere))
955 
956  logger.debug(">>> Get tasks SQL == " + clause)
957  results = self.dbi.sql(TaskBackLogScheme(TaskLog), clause)
958  # logger.debug(self.dbi)
959  if len(results) > 0:
960  for record in results:
961  if len(resultList) >= event.eventObj.fetchNum:
962  break
963  resultList.append(record.id)
964  if event.eventObj.fetchAdditionalFields:
965  tasksList.append(record._getTaskLog())
966  else:
967  logger.debug("No tasks selected for auto check state")
968  except DBIErr as err:
969  logger.error(">>> Some DBI error in TasksManager.onFetchAvailableTasks [" + str(err.message) + "]")
970  except Exception as err:
971  ExceptionLog.handler(logger, err, "Exception:")
972  res = AvailableTaskIds(resultList, tasksList)
973  responseEvent = self.eventBuilder.build(EVENT_TYPES.AVAILABLE_TASK_IDS_RESPONSE, res)
974  self.reply(event, responseEvent)
975 
976 
def generateCriterionSQL(criterions, additionWhere=None, siteId=None)
Here is the call graph for this function:

◆ onFetchResultResponse()

def dtm.TasksManager.TasksManager.onFetchResultResponse (   self,
  event 
)

Definition at line 797 of file TasksManager.py.

797  def onFetchResultResponse(self, event):
798  try:
799  if event.uid in self.fetchEvents:
800  replyEvent = self.eventBuilder.build(EVENT_TYPES.FETCH_TASK_RESULTS_RESPONSE, event.eventObj)
801  self.reply(self.fetchEvents[event.uid], replyEvent)
802  logger.debug("Delete from fetchEvents[] item " + str(event.uid))
803  del self.fetchEvents[event.uid]
804  else:
805  logger.error("onFetchResultResponse no such event.uid" + str(event.uid))
806  except Exception as err:
807  ExceptionLog.handler(logger, err, "Exception:")
808 
809 
Here is the call graph for this function:

◆ onFetchResultsCache()

def dtm.TasksManager.TasksManager.onFetchResultsCache (   self,
  event 
)

Definition at line 496 of file TasksManager.py.

496  def onFetchResultsCache(self, event):
497  try:
498  fetchResultsCacheTask = event.eventObj
499  for taskId in fetchResultsCacheTask.ids:
500  self.checkTaskPresence(taskId)
501  # store events for reply
502  self.fetchEvents[event.uid] = event
503  self.send(self.TASKS_DATA_MANAGER_CLIENT, event)
504  except TaskNoPresentErr as err:
505  logger.error(err.message)
506  response = GeneralResponse(TaskNoPresentErr.ERR_CODE, err.message)
507  responseEvent = self.eventBuilder.build(EVENT_TYPES.GET_TASK_STATUS_RESPONSE, response)
508  self.reply(event, responseEvent)
509  except Exception as err:
510  ExceptionLog.handler(logger, err, "Exception:")
511 
512 
Here is the call graph for this function:

◆ onFetchTaskDataResponse()

def dtm.TasksManager.TasksManager.onFetchTaskDataResponse (   self,
  event 
)

Definition at line 591 of file TasksManager.py.

591  def onFetchTaskDataResponse(self, event):
592  logger.debug("onFetchTaskDataResponse income id = " + str(event.eventObj.id))
593  taskLog = TaskLog()
594  isCleanUp = True
595  try:
596  if event.eventObj != None:
597  if event.eventObj.strategy != None and Task.STRATEGY_RETRY in event.eventObj.strategy:
598  tempBackTaskLogScheme = self.dbi.fetch(TaskBackLogScheme(taskLog), "id=%s" % event.eventObj.id)
599  logger.debug("onFetchTaskDataResponse tempBackTaskLogScheme len = " + str(len(tempBackTaskLogScheme)))
600  if type(tempBackTaskLogScheme) == types.ListType and len(tempBackTaskLogScheme) > 0 and \
601  event.eventObj.strategy[Task.STRATEGY_RETRY] > (tempBackTaskLogScheme[0].tries + 1):
602  isCleanUp = False
603  self.updateStatField(self.VAR_TASKS_RETRIES, 1, self.STAT_FIELDS_OPERATION_ADD)
604  if tempBackTaskLogScheme[0].deleteTaskId != 0:
605  self.updateStatField(self.VAR_TASKS_DELETE_TRIES, 1, self.STAT_FIELDS_OPERATION_ADD)
606  self.updateStatField(self.VAR_TASKS_RETRIES_DEL, 1, self.STAT_FIELDS_OPERATION_ADD)
607  if isCleanUp:
608  logger.debug("onFetchTaskDataResponse cleanup income id = " + str(event.eventObj.id))
609  tempBackTaskLogScheme = TaskBackLogScheme(TaskLog())
610  tempBackTaskLogScheme.id = event.eventObj.id
611  self.cleanUpTask([tempBackTaskLogScheme])
612  # self.updateTaskBackLogToSchedulerStep(event.eventObj.id, 1, EEResponseData.TASK_STATE_SET_ERROR)
613  else:
614  logger.debug("onFetchTaskDataResponse rescheduler income id = " + str(event.eventObj.id))
615  self.updateTaskBackLogToSchedulerStep(event.eventObj.id, 1, EEResponseData.TASK_STATE_NEW_DATA_STORED)
616  operation_steps = self.createOperationSteps(event, None, True)
617  self.pendingTasks[event.uid] = TaskRecord(operation_steps, None, None)
618  operation_steps[0].ok_callback()
619  except DBIErr as err:
620  logger.error(">>> Some DBI error in TasksManager.onFetchTaskDataResponse [" + str(err.message) + "]")
621 
622 
Here is the call graph for this function:

◆ onGetTaskFields()

def dtm.TasksManager.TasksManager.onGetTaskFields (   self,
  event 
)

Definition at line 516 of file TasksManager.py.

516  def onGetTaskFields(self, event):
517  try:
518  getTaskManagerFields = event.eventObj
519  taskId = getTaskManagerFields.id
520 
521  taskManagerFields = TaskManagerFields([taskId])
522  lookTaskLogScheme = self.dbi.fetch(TaskBackLogScheme(TaskLog), "id=%s" % taskId)
523  if len(lookTaskLogScheme) > 0 and lookTaskLogScheme[0] is not None:
524  # pylint: disable-msg=W0212
525  taskManagerFields = self.createTaskManagerFields(lookTaskLogScheme[0]._getTaskLog())
526  responseEvent = self.eventBuilder.build(EVENT_TYPES.GET_TASK_FIELDS_RESPONSE, taskManagerFields)
527  self.reply(event, responseEvent)
528  except DBIErr as err:
529  logger.error(">>> Some DBI error in TasksManager.onGetTaskStatus [" + str(err.message) + "]")
530  except Exception as err:
531  ExceptionLog.handler(logger, err, "Exception:")
532 
533 
Here is the call graph for this function:

◆ onGetTaskStatus()

def dtm.TasksManager.TasksManager.onGetTaskStatus (   self,
  event 
)

Definition at line 461 of file TasksManager.py.

461  def onGetTaskStatus(self, event):
462  logger.debug("GetTaskStatus received: " + Utils.varDump(event))
463  try:
464  # #as discuss don't check states
465  getTaskStatus = event.eventObj
466  results = list()
467  for taskId in getTaskStatus.ids:
468  try:
469  lookBackTaskLogScheme = self.dbi.fetch(TaskBackLogScheme(TaskLog), "id=%s" % taskId)
470  # #always return list
471  taskManagerFields = TaskManagerFields([taskId])
472  if (hasattr(lookBackTaskLogScheme, '__iter__') and len(lookBackTaskLogScheme) > 0):
473  # pylint: disable-msg=W0212
474  taskManagerFields = self.createTaskManagerFields(lookBackTaskLogScheme[0]._getTaskLog())
475  results.append(taskManagerFields)
476  elif type(getTaskStatus.strategy) is types.DictType and GetTasksStatus.LOG_STRATEGY in \
477  getTaskStatus.strategy and getTaskStatus.strategy[GetTasksStatus.LOG_STRATEGY] == GetTasksStatus.CHECK_LOG_YES:
478  lookTaskLogScheme = self.dbi.fetch(TaskLogScheme(TaskLog), "id=%s" % taskId)
479  if (hasattr(lookTaskLogScheme, '__iter__') and len(lookTaskLogScheme) > 0):
480  for foundTask in lookTaskLogScheme:
481  # pylint: disable-msg=W0212
482  taskManagerFields = self.createTaskManagerFields(foundTask._getTaskLog())
483  results.append(taskManagerFields)
484  except DBIErr as err:
485  logger.error(">>> Some DBI error in TasksManager.onGetTaskStatus [" + str(err.message) + "]")
486  results.append(None)
487  responseEvent = self.eventBuilder.build(EVENT_TYPES.GET_TASK_STATUS_RESPONSE, results)
488  self.reply(event, responseEvent)
489  except Exception as err:
490  ExceptionLog.handler(logger, err, "Exception:")
491 
492 
Here is the call graph for this function:

◆ onNewTask()

def dtm.TasksManager.TasksManager.onNewTask (   self,
  event 
)

Definition at line 201 of file TasksManager.py.

201  def onNewTask(self, event):
202  try:
203  operation_steps = self.createOperationSteps(event, EVENT_TYPES.NEW_TASK)
204  logger.debug("Insert pendingTasks[] item " + str(event.uid))
205  logger.debug("New Task event " + str(event.eventObj.id))
206  dbiRecords = self.dbi.fetch(TaskBackLogScheme(TaskLog), "id=%s" % str(event.eventObj.id))
207  if hasattr(dbiRecords, '__iter__') and len(dbiRecords) > 0 and dbiRecords[0] is not None:
208  logger.debug("Task Id already exist")
209  responseEvent = self.eventBuilder.build(EVENT_TYPES.NEW_TASK_RESPONSE, GeneralResponse())
210  responseEvent.uid = event.uid
211  self.reply(event, responseEvent)
212  else:
213  self.pendingTasks[event.uid] = TaskRecord(operation_steps, event, EVENT_TYPES.NEW_TASK_RESPONSE)
214  operation_steps[0].ok_callback()
215  except Exception as err:
216  ExceptionLog.handler(logger, err, "Exception:")
217  self.checkCleanUp()
218 
219 
Here is the call graph for this function:

◆ onTasksManagerGeneralResponse()

def dtm.TasksManager.TasksManager.onTasksManagerGeneralResponse (   self,
  event 
)

Definition at line 743 of file TasksManager.py.

743  def onTasksManagerGeneralResponse(self, event):
744  try:
745  logger.debug("onTasksManagerGeneralResponse, event:" + str(event.uid) + "\n" + Utils.varDump(event))
746  self.processOperationStep(event)
747  except Exception as err:
748  ExceptionLog.handler(logger, err, "Exception:")
749 
750 
Here is the call graph for this function:

◆ onUpdateTask()

def dtm.TasksManager.TasksManager.onUpdateTask (   self,
  event 
)

Definition at line 336 of file TasksManager.py.

336  def onUpdateTask(self, event):
337  try:
338  response = GeneralResponse()
339  updateTask = event.eventObj
340  self.checkTaskPresence(updateTask.id)
341 
342  # don't need to update backlog?? because we use only Task.id
343  self.send(self.TASKS_DATA_MANAGER_CLIENT, event)
344  updateTask.files = None
345  self.tasksQueue[updateTask.id] = updateTask
346  schedulerEvent = self.eventBuilder.build(EVENT_TYPES.UPDATE_TASK, self.tasksQueue[updateTask.id])
347  self.send(self.SCHEDULER_CLIENT, schedulerEvent)
348 
349  except TaskNoPresentErr as err:
350  logger.error(err.message)
351  response = GeneralResponse(TaskNoPresentErr.ERR_CODE, err.message)
352  except Exception as err:
353  ExceptionLog.handler(logger, err, "Exception:")
354  finally:
355  responseEvent = self.eventBuilder.build(EVENT_TYPES.UPDATE_TASK_RESPONSE, response)
356  self.reply(event, responseEvent)
357 
358 
359 
Here is the call graph for this function:

◆ onUpdateTaskField()

def dtm.TasksManager.TasksManager.onUpdateTaskField (   self,
  event 
)

Definition at line 537 of file TasksManager.py.

537  def onUpdateTaskField(self, event):
538  generalResponse = GeneralResponse()
539  updateTaskFields = event.eventObj
540  try:
541  # Removing "host" and "port" fields from updateTaskFields.fields and check is state value to update valid
542  updateTaskFields.fields = self.clearEmptyFields(updateTaskFields.fields, updateTaskFields.id)
543 
544  taskLog = self.createTaskLogFromDic(updateTaskFields.fields)
545  taskLogBackScheme = self.dbi.update(TaskBackLogScheme(taskLog), "id=%s" % updateTaskFields.id)
546  if taskLog.state == EEResponseData.TASK_STATE_SET_ERROR or taskLog.state == EEResponseData.TASK_STATE_CRASHED:
547  self.updateStatField(self.VAR_TASKS_ERRORS, 1, self.STAT_FIELDS_OPERATION_ADD)
548  if taskLog.state == EEResponseData.TASK_STATE_DELETED or taskLog.state == EEResponseData.TASK_STATE_TERMINATED:
549  logger.debug("Delete task is deleted, call cleanUpTask() [%s]", str(taskLog.state))
550  self.cleanUpTask(taskLogBackScheme)
551  if "deleteTaskId" in updateTaskFields.fields and updateTaskFields.fields["deleteTaskId"] != None and \
552  updateTaskFields.fields["deleteTaskId"] > 0:
553  taskLogTerm = TaskLog()
554  taskLogTerm.id = int(updateTaskFields.fields["deleteTaskId"])
555  taskLogTerm.state = int(updateTaskFields.fields["deleteTaskState"])
556  taskLogBackSchemeTerm = self.dbi.update(TaskBackLogScheme(taskLogTerm), "id=%s" % taskLogTerm.id)
557  if taskLogTerm.state == EEResponseData.TASK_STATE_DELETED or \
558  taskLogTerm.state == EEResponseData.TASK_STATE_TERMINATED:
559  logger.debug("Task to delete is terminated, call cleanUpTask()")
560  self.cleanUpTask(taskLogBackSchemeTerm)
561  else:
562  logger.debug("Task to delete is not terminated, state: " + str(taskLogTerm.state))
563  else:
564  logger.debug("Task to delete is None or Empty")
565  generalResponse.errorCode = DeleteTask.RESPONSE_CODE_DRCE_ERROR
566  generalResponse.errorMessage = "EEManager returns empty deleteTaskId"
567  self.sendGroupDeleteResponse()
568  elif taskLog.state == EEResponseData.TASK_STATE_SET_ERROR:
569  logger.debug("Delete task by state=TASK_STATE_SET_ERROR, id = " + str(updateTaskFields.id))
570  self.cleanUpTaskNetworkOperation(updateTaskFields.id, False)
571  self.restoreTaskSteps(updateTaskFields.id)
572  else:
573  logger.debug("State is: " + str(taskLog.state))
574  self.checkCleanUp(updateTaskFields.id)
575  except DBIErr as err:
576  logger.error(">>> Some DBI error in TasksManager.onUpdateTaskField [" + str(err.message) + "]")
577  generalResponse.errorCode = DeleteTask.RESPONSE_CODE_DBI_ERROR
578  generalResponse.errorMessage = ("DBIError=%s" % str(err.message))
579  except Exception as err:
580  ExceptionLog.handler(logger, err, "Exception:")
581  generalResponse.errorCode = DeleteTask.RESPONSE_CODE_UNKNOWN_ERROR
582  generalResponse.errorMessage = ("Unknown Error=%s" % str(err.message))
583  responseEvent = self.eventBuilder.build(EVENT_TYPES.UPDATE_TASK_FIELDS_RESPONSE, generalResponse)
584  self.reply(event, responseEvent)
585 
586 
587 
Here is the call graph for this function:

◆ processOperationStep()

def dtm.TasksManager.TasksManager.processOperationStep (   self,
  event 
)

Definition at line 754 of file TasksManager.py.

754  def processOperationStep(self, event):
755  generalResponse = event.eventObj
756  if event.uid in self.pendingTasks:
757  if generalResponse.errorCode == GeneralResponse.ERROR_OK:
758  if len(self.pendingTasks[event.uid].tasksteps) > 1:
759  if event.uid in self.pendingTasks:
760  logger.debug("Update pendingTasks[] item " + str(event.uid))
761  else:
762  logger.debug("Insert pendingTasks[] item " + str(event.uid))
763  self.pendingTasks[event.uid] = TaskRecord(self.pendingTasks[event.uid].tasksteps[1:],
764  self.pendingTasks[event.uid].event,
765  self.pendingTasks[event.uid].responseEventType)
766  self.pendingTasks[event.uid].tasksteps[0].ok_callback()
767  else:
768  self.replyGeneralResponse(event)
769  logger.debug("Delete 1 from pendingTasks[] item " + str(event.uid))
770  del self.pendingTasks[event.uid]
771  else:
772  if self.pendingTasks[event.uid].tasksteps[0].err_callback:
773  self.pendingTasks[event.uid].tasksteps[0].err_callback()
774  self.replyGeneralResponse(event)
775  logger.debug("Delete 2 from pendingTasks[] item " + str(event.uid))
776  del self.pendingTasks[event.uid]
777  else:
778  logger.debug("processOperationStepr: pending event " + str(event.uid) + " " + str(self.pendingTasks))
779 
780 
781 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ processSchedulerFailure()

def dtm.TasksManager.TasksManager.processSchedulerFailure (   self,
  event 
)

Definition at line 921 of file TasksManager.py.

921  def processSchedulerFailure(self, event):
922  # discuss rescheduling strategy and etc
923  # pass
924  logger.debug("Event: " + str(event))
925 
926 
927 
Here is the caller graph for this function:

◆ processTasksDataManagerFailure()

def dtm.TasksManager.TasksManager.processTasksDataManagerFailure (   self,
  event 
)

Definition at line 915 of file TasksManager.py.

915  def processTasksDataManagerFailure(self, event):
916  # discuss
917  logger.debug("Event: " + str(event))
918 
919 
920 

◆ replyGeneralResponse()

def dtm.TasksManager.TasksManager.replyGeneralResponse (   self,
  event 
)

Definition at line 786 of file TasksManager.py.

786  def replyGeneralResponse(self, event):
787  if self.pendingTasks[event.uid].responseEventType != None:
788  generalResponse = event.eventObj
789  responseEvent = self.eventBuilder.build(self.pendingTasks[event.uid].responseEventType, generalResponse)
790  self.reply(self.pendingTasks[event.uid].event, responseEvent)
791 
792 
793 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ restoreTaskSteps()

def dtm.TasksManager.TasksManager.restoreTaskSteps (   self,
  taskId 
)

Definition at line 626 of file TasksManager.py.

626  def restoreTaskSteps(self, taskId):
627  logger.debug("restoreTaskSteps id = " + str(taskId))
628  fetchTaskData = FetchTaskData(taskId)
629  new_event = self.eventBuilder.build(EVENT_TYPES.FETCH_TASK_DATA, fetchTaskData)
630  self.send(self.TASKS_DATA_MANAGER_CLIENT, new_event)
631 
632 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ sendGroupDeleteResponse()

def dtm.TasksManager.TasksManager.sendGroupDeleteResponse (   self)

Definition at line 446 of file TasksManager.py.

446  def sendGroupDeleteResponse(self):
447  if self.groupDeleteResponseEvent is not None:
448  suspendedTasks = self.dbi.fetch(TaskBackLogScheme(TaskLog), "id=id")
449  if not (hasattr(suspendedTasks, '__iter__') and len(suspendedTasks) > 0 and suspendedTasks[0] is not None):
450  response = GeneralResponse()
451  responseEvent = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK_RESPONSE, response)
452  self.reply(self.groupDeleteResponseEvent, responseEvent)
453  logger.debug(">>> Send Group delete back")
454  self.groupDeleteResponseEvent = None
455 
456 
457 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ simpleDeleteTask()

def dtm.TasksManager.TasksManager.simpleDeleteTask (   self,
  event,
  responseEventType 
)

Definition at line 416 of file TasksManager.py.

416  def simpleDeleteTask(self, event, responseEventType):
417  response = None
418  try:
419  deleteTask = event.eventObj
420  taskLog = self.createTaskLog(deleteTask)
421  taskLog.id = deleteTask.deleteTaskId
422  if self.getDeletedTask(taskLog) is None:
423  dbiRecords = self.dbi.fetch(TaskBackLogScheme(TaskLog), "id=%s" % taskLog.id)
424  if len(dbiRecords) > 0:
425  operation_steps = self.createOperationSteps(event, EVENT_TYPES.NEW_TASK)
426  logger.debug("Update pendingTasks[] item " + str(event.uid))
427  self.pendingTasks[event.uid] = TaskRecord(operation_steps, event, responseEventType)
428  operation_steps[0].ok_callback()
429  self.updateStatField(self.VAR_TASKS_DELETE_TRIES, 1, self.STAT_FIELDS_OPERATION_ADD)
430  else:
431  raise DBIErr(dbi_consts.DBI_SUCCESS_CODE + 1, "Task id=" + str(taskLog.id) + " is absent in taskBackLog")
432  else:
433  raise DBIErr(dbi_consts.DBI_SUCCESS_CODE + 2, "Task id=" + str(taskLog.id) + " is deleted by other task")
434  except DBIErr as err:
435  logger.error(">>> Some DBI error in TasksManager.simpleDeleteTask [" + str(err.message) + "]")
436  response = GeneralResponse(GeneralResponse.ERROR_OK, err.message)
437  response.statuses.append(err.errCode)
438  except Exception as err:
439  ExceptionLog.handler(logger, err, "Exception:")
440  return response
441 
442 
443 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ statFieldsRecalculate()

def dtm.TasksManager.TasksManager.statFieldsRecalculate (   self,
  taskLogScheme 
)

Definition at line 637 of file TasksManager.py.

637  def statFieldsRecalculate(self, taskLogScheme):
638  logger.debug(">>> Start static recalculationg")
639  localDict = {}
640  localDict[self.VAR_TASKS_TIME_MIN] = None
641  localDict = self.getStatDataFields(localDict)
642  if localDict is not None and self.VAR_TASKS_TIME_MIN in localDict and localDict[self.VAR_TASKS_TIME_MIN] \
643  is not None:
644  if taskLogScheme.pTime < localDict[self.VAR_TASKS_TIME_MIN] or localDict[self.VAR_TASKS_TIME_MIN] == 0:
645  self.updateStatField(self.VAR_TASKS_TIME_MIN, taskLogScheme.pTime, self.STAT_FIELDS_OPERATION_SET)
646  localDict = {}
647  localDict[self.VAR_TASKS_TIME_MAX] = None
648  localDict = self.getStatDataFields(localDict)
649  if localDict is not None and self.VAR_TASKS_TIME_MAX in localDict and localDict[self.VAR_TASKS_TIME_MAX] \
650  is not None:
651  if taskLogScheme.pTime > localDict[self.VAR_TASKS_TIME_MAX]:
652  self.updateStatField(self.VAR_TASKS_TIME_MAX, taskLogScheme.pTime, self.STAT_FIELDS_OPERATION_SET)
653 
654  self.updateStatField(self.VAR_TASKS_TIME_SUM, taskLogScheme.pTime, self.STAT_FIELDS_OPERATION_ADD)
655  self.updateStatField(self.VAR_TASKS_TIME_COUNT, 1, self.STAT_FIELDS_OPERATION_ADD)
656  localDict = {}
657  localDict[self.VAR_TASKS_TIME_SUM] = None
658  localDict[self.VAR_TASKS_TIME_COUNT] = None
659 
660  localDict = self.getStatDataFields(localDict)
661  if localDict is not None and self.VAR_TASKS_TIME_SUM in localDict and localDict[self.VAR_TASKS_TIME_SUM] is not \
662  None and self.VAR_TASKS_TIME_COUNT in localDict and localDict[self.VAR_TASKS_TIME_COUNT] is not None:
663  self.updateStatField(self.VAR_TASKS_TIME_AVG, \
664  localDict[self.VAR_TASKS_TIME_SUM] / localDict[self.VAR_TASKS_TIME_COUNT], self.STAT_FIELDS_OPERATION_SET)
665 
666 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ updateTaskBackLogToSchedulerStep()

def dtm.TasksManager.TasksManager.updateTaskBackLogToSchedulerStep (   self,
  localId,
  incr,
  newState 
)

Definition at line 729 of file TasksManager.py.

729  def updateTaskBackLogToSchedulerStep(self, localId, incr, newState):
730  taskLog = TaskLog()
731  tempBackTaskLogScheme = self.dbi.fetch(TaskBackLogScheme(taskLog), "id=%s" % localId)
732  if hasattr(tempBackTaskLogScheme, '__iter__') and len(tempBackTaskLogScheme) > 0:
733  tempBackTaskLogScheme[0].tries += incr
734  tempBackTaskLogScheme[0].state = newState
735  self.dbi.update(tempBackTaskLogScheme[0], "id=%s" % localId)
736  else:
737  logger.error("Error can't fetch record from TaskBackLog with task id = " + str(localId))
738 
739 
Here is the caller graph for this function:

Member Data Documentation

◆ AUTO_CLEANUP_TIME_SLOT_PERIOD

string dtm.TasksManager.TasksManager.AUTO_CLEANUP_TIME_SLOT_PERIOD = "autoCleanUpSlotPeriod"
static

Definition at line 74 of file TasksManager.py.

◆ cfg_section

dtm.TasksManager.TasksManager.cfg_section

Definition at line 101 of file TasksManager.py.

◆ CLEANUP_TABLES_LIST

list dtm.TasksManager.TasksManager.CLEANUP_TABLES_LIST
static
Initial value:
= ['task_back_log_scheme',
'tasks_data_table',
'scheduler_task_scheme',
'ee_responses_table',
'resources_table']

Definition at line 89 of file TasksManager.py.

◆ cleanUpTimeout

dtm.TasksManager.TasksManager.cleanUpTimeout

Definition at line 161 of file TasksManager.py.

◆ CONFIG_TIME_SLOT_PERIOD

string dtm.TasksManager.TasksManager.CONFIG_TIME_SLOT_PERIOD = "timeSlotPeriod"
static

Definition at line 73 of file TasksManager.py.

◆ dbi

dtm.TasksManager.TasksManager.dbi

Definition at line 147 of file TasksManager.py.

◆ fetchEvents

dtm.TasksManager.TasksManager.fetchEvents

Definition at line 151 of file TasksManager.py.

◆ groupDeleteResponseEvent

dtm.TasksManager.TasksManager.groupDeleteResponseEvent

Definition at line 103 of file TasksManager.py.

◆ pendingTasks

dtm.TasksManager.TasksManager.pendingTasks

Definition at line 143 of file TasksManager.py.

◆ prevCleanUpTime

dtm.TasksManager.TasksManager.prevCleanUpTime

Definition at line 162 of file TasksManager.py.

◆ SCHEDULER_CLIENT

string dtm.TasksManager.TasksManager.SCHEDULER_CLIENT = "clientScheduler"
static

Definition at line 72 of file TasksManager.py.

◆ SERVER

string dtm.TasksManager.TasksManager.SERVER = "server"
static

Definition at line 70 of file TasksManager.py.

◆ TASKS_DATA_MANAGER_CLIENT

string dtm.TasksManager.TasksManager.TASKS_DATA_MANAGER_CLIENT = "clientTasksDataManager"
static

Definition at line 71 of file TasksManager.py.

◆ tasksQueue

dtm.TasksManager.TasksManager.tasksQueue

Definition at line 138 of file TasksManager.py.

◆ VAR_TASKS_DELETE_TRIES

string dtm.TasksManager.TasksManager.VAR_TASKS_DELETE_TRIES = "tasks_delete_tries"
static

Definition at line 86 of file TasksManager.py.

◆ VAR_TASKS_ERRORS

string dtm.TasksManager.TasksManager.VAR_TASKS_ERRORS = "tasks_errors"
static

Definition at line 83 of file TasksManager.py.

◆ VAR_TASKS_RETRIES

string dtm.TasksManager.TasksManager.VAR_TASKS_RETRIES = "tasks_retries"
static

Definition at line 84 of file TasksManager.py.

◆ VAR_TASKS_RETRIES_DEL

string dtm.TasksManager.TasksManager.VAR_TASKS_RETRIES_DEL = "tasks_retries_del"
static

Definition at line 85 of file TasksManager.py.

◆ VAR_TASKS_TIME_AVG

string dtm.TasksManager.TasksManager.VAR_TASKS_TIME_AVG = "tasks_time_avg"
static

Definition at line 80 of file TasksManager.py.

◆ VAR_TASKS_TIME_COUNT

string dtm.TasksManager.TasksManager.VAR_TASKS_TIME_COUNT = "tasks_time_count"
static

Definition at line 79 of file TasksManager.py.

◆ VAR_TASKS_TIME_MAX

string dtm.TasksManager.TasksManager.VAR_TASKS_TIME_MAX = "tasks_time_max"
static

Definition at line 82 of file TasksManager.py.

◆ VAR_TASKS_TIME_MIN

string dtm.TasksManager.TasksManager.VAR_TASKS_TIME_MIN = "tasks_time_min"
static

Definition at line 81 of file TasksManager.py.

◆ VAR_TASKS_TIME_SUM

string dtm.TasksManager.TasksManager.VAR_TASKS_TIME_SUM = "tasks_time_sum"
static

Definition at line 78 of file TasksManager.py.

◆ VAR_TASKS_TOTAL

string dtm.TasksManager.TasksManager.VAR_TASKS_TOTAL = "tasks_total"
static

Definition at line 76 of file TasksManager.py.

◆ VAR_TASKS_TOTAL_DEL

string dtm.TasksManager.TasksManager.VAR_TASKS_TOTAL_DEL = "tasks_total_del"
static

Definition at line 77 of file TasksManager.py.


The documentation for this class was generated from the following file: