HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager Class Reference

Public Member Functions

def __init__ (self, configParser, connectionBuilderLight=None)
 
def onExecuteTask (self, event)
 
def onFetchTaskDataResponse (self, event)
 
def processUpdateTaskFields (self, operationType, eeResponseData, cookie=None)
 
def processNewTask (self, newTaskObj)
 
def sendToDRCERouter (self, request)
 
def convertToEEResponse (self, response)
 
def convertToTaskResponse (self, rawResponse)
 
def onCheckTaskState (self, event)
 
def onFetchTaskResults (self, event)
 
def onDeleteTaskResults (self, event)
 
def sendToHCENodeAdmin (self, host, port, messageParameters)
 
def sendGetTaskManagerFieldsRequest (self, taskId, cookieData=None)
 
def processFetchTaskResults (self, event)
 
def processCheckTaskState (self, event)
 
def createTaskDeleteRequest (self, deleteTaskObj)
 
def checkDelTaskState (self, state, action)
 
def processDeleteTask (self, event)
 
def createGeneralResponse (self, errCode, errMessage, errLog)
 
def processDeleteTaskResults (self, event)
 
def onGetTaskManagerFieldsResponse (self, event)
 
def onInsertEEDataResponse (self, event)
 
def onUpdateTasksFieldsResponse (self, event)
 
def onDeleteEEDataResponse (self, event)
 
- Public Member Functions inherited from app.BaseServerManager.BaseServerManager
def __init__ (self, poller_manager=None, admin_connection=None, conectionLightBuilder=None, exceptionForward=False, dumpStatVars=True)
 Constructor.
 
def addConnection (self, name, connection)
 
def setEventHandler (self, eventType, handler)
 Set the event handler for eventType, replacing any handler currently set.
 
def send (self, connect_name, event)
 Send an event.
 
def reply (self, event, reply_event)
 Wrapper for sending an event in reply to an event.
 
def poll (self)
 Poll the connections. Messages are received as multipart; the second part of the message is a pickled Python object.
 
def process (self, event)
 Process an event: call the handler set by the user for its type, or on_unhandled_event if none is set.
 
def run (self)
 
def is_connection_registered (self, name)
 Check whether a connection is registered in this BaseServerManager instance.
 
def on_poll_timeout (self)
 Called every time a ConnectionTimeout exception is raised.
 
def on_unhandled_event (self, event)
 Called every time an event arrives whose type (event.eventType) has no handler set.
 
def build_poller_list (self)
 
def clear_poller (self)
 
def onAdminState (self, event)
 onAdminState event handler; processes the admin SHUTDOWN command.
 
def onAdminFetchStatData (self, event)
 onAdminState event handler; processes an admin command.
 
def onAdminSuspend (self, event)
 onAdminState event handler; processes an admin command.
 
def getStatDataFields (self, fields)
 getStatDataFields returns stat data from storage.
 
def getSystemStat (self)
 getSystemStat returns stat data for the system indicators RAMV, RAMR and CPU.
 
def getConfigVarsFields (self, fields)
 getConfigVarsFields returns config vars from storage.
 
def onAdminGetConfigVars (self, event)
 onAdminGetConfigVars event handler; processes the getConfigVars admin command, then fills and returns the config vars array from internal storage.
 
def onAdminSetConfigVars (self, event)
 onAdminSetConfigVars event handler; processes the setConfigVars admin command.
 
def setConfigVars (self, setConfigVars)
 processSetConfigVars sets config vars in storage.
 
def sendAdminReadyEvent (self)
 Send a ready event to notify adminInterfaceService.
 
def createLogMsg (self, event)
 Form a string message from an event object.
 
def initStatFields (self, connect_name)
 Add a record to statFields.
 
def updateStatField (self, field_name, value, operation=STAT_FIELDS_OPERATION_ADD)
 Update the value of a stat field; the default operation is add (sum).
 
def processSpecialConfigVars (self, name, value)
 Send a ready event to notify adminInterfaceService.
 
def getLogLevel (self)
 Get the log level from the first of the existing loggers.
 
def setLogLevel (self, level)
 Set the log level for all loggers.
 
def saveStatVarsDump (self)
 Save stat vars to a JSON file.
 
def loadStatVarsDump (self)
 Load stat vars from a JSON file.
 
def getStatVarsDumpFileName (self)
 Get the stat vars file name.
 
def createDBIDict (self, configParser)
 

Public Attributes

 serverName
 
 clientTasksManagerName
 
 clientTasksDataManagerName
 
 drceHost
 
 drcePort
 
 drceTimeout
 
 drceManager
 
 drceIdGenerator
 
 drceCommandConvertor
 
 hceNodeAdminTimeout
 
 hceNodeManagerRequest
 
- Public Attributes inherited from app.BaseServerManager.BaseServerManager
 dumpStatVars
 
 poller_manager
 
 eventBuilder
 
 exit_flag
 
 pollTimeout
 
 connections
 
 event_handlers
 
 statFields
 Stat fields container.
 
 configVars
 
 exceptionForward
 

Static Public Attributes

string CONFIG_SERVER = "server"
 
string CONFIG_TASKS_MANAGER_CLIENT = "clientTasksManager"
 
string CONFIG_TASKS_MANAGER_DATA_CLIENT = "clientTasksDataManager"
 
string CONFIG_DRCE_HOST = "DRCEHost"
 
string CONFIG_DRCE_PORT = "DRCEPort"
 
string CONFIG_DRCE_TIMEOUT = "DRCETimeout"
 
string CONFIG_HCE_NODE_ADMIN_TIMEOUT = "HCENodeAdminTimeout"
 
string ERROR_MSG_DRCE_ROUTER_NEW_TASK = "DRCE Router request error!"
 
string ERROR_HCE_RESPONSE_PROCESSING_EXCEPTION = "HCE node Admin API response processing exception"
 
string ERROR_HCE_RESPONSE_PROCESSING_SPLIT = "HCE node Admin API response processing can't to split status code"
 
string ERROR_INSERT_EE_DATA = "Error insert EE response data operation"
 
string ERROR_UPDATE_TASKS_FIELDS = "Update tasks fields error"
 
string ERROR_WRONG_OBJECT_TYPE = "Wrong object type from TasksDataManager"
 
string ERROR_EE_RESPONSE_OBJECT_TYPE_OR_RESPONSE_ERROR = "EEResponseData object error or wrong response structure"
 
string ERROR_HCE_ADMIN_REQUEST_ERROR = "HCE Admin request error"
 
int ERROR_DELETE_TASK_RESULTS = 1
 
string ERROR_DELETE_TASK_RESULTS_MESSAGE = "Delete task results error of EE request response or TaskManager!"
 
int OPERATION_NEW_TASK = 0
 
int OPERATION_DELETE_TASK = 1
 
int OPERATION_CHECK_STATE = 2
 
int OPERATION_FETCH_RESULTS = 3
 
- Static Public Attributes inherited from app.BaseServerManager.BaseServerManager
string ADMIN_CONNECT_ENDPOINT = "Admin"
 
string ADMIN_CONNECT_CLIENT = "Admin"
 
int POLL_TIMEOUT_DEFAULT = 3000
 
int STAT_FIELDS_OPERATION_ADD = 0
 
int STAT_FIELDS_OPERATION_SUB = 1
 
int STAT_FIELDS_OPERATION_SET = 2
 
int STAT_FIELDS_OPERATION_INIT = 3
 
string POLL_TIMEOUT_CONFIG_VAR_NAME = "POLL_TIMEOUT"
 
string LOG_LEVEL_CONFIG_VAR_NAME = "LOG_LEVEL"
 
string STAT_DUMPS_DEFAULT_DIR = "/tmp/"
 
string STAT_DUMPS_DEFAULT_NAME = "%APP_NAME%_%CLASS_NAME%_stat_vars.dump"
 
set LOGGERS_NAMES = {APP_CONSTS.LOGGER_NAME, "dc", "dtm", "root", ""}
 

Detailed Description

Definition at line 52 of file ExecutionEnvironmentManager.py.

Constructor & Destructor Documentation

◆ __init__()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.__init__ (   self,
  configParser,
  connectionBuilderLight = None 
)

Definition at line 87 of file ExecutionEnvironmentManager.py.

def __init__(self, configParser, connectionBuilderLight=None):
    super(ExecutionEnvironmentManager, self).__init__()

    # Instantiate the connection builder light if not set
    if connectionBuilderLight is None:
        connectionBuilderLight = ConnectionBuilderLight()

    className = self.__class__.__name__

    # Get configuration settings
    self.serverName = configParser.get(className, self.CONFIG_SERVER)
    self.clientTasksManagerName = configParser.get(className, self.CONFIG_TASKS_MANAGER_CLIENT)
    self.clientTasksDataManagerName = configParser.get(className, self.CONFIG_TASKS_MANAGER_DATA_CLIENT)

    # Create connections and perform the bind or connect action for the corresponding connection type
    serverConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, self.serverName)
    tasksManagerConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT, self.clientTasksManagerName)
    tasksDataManagerConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT,
                                                              self.clientTasksDataManagerName)
    # Add connections to the polling set
    self.addConnection(self.serverName, serverConnection)
    self.addConnection(self.clientTasksManagerName, tasksManagerConnection)
    self.addConnection(self.clientTasksDataManagerName, tasksDataManagerConnection)

    # Set handlers
    # Set event handler for EXECUTE_TASK event
    self.setEventHandler(DTM_CONSTS.EVENT_TYPES.EXECUTE_TASK, self.onExecuteTask)
    # Set event handler for CHECK_TASK_STATE event
    self.setEventHandler(DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE, self.onCheckTaskState)
    # Set event handler for FETCH_TASK_RESULTS event
    self.setEventHandler(DTM_CONSTS.EVENT_TYPES.FETCH_TASK_RESULTS, self.onFetchTaskResults)
    # Set event handler for FETCH_TASK_DATA_RESPONSE event
    self.setEventHandler(DTM_CONSTS.EVENT_TYPES.FETCH_TASK_DATA_RESPONSE, self.onFetchTaskDataResponse)
    # Set event handler for GET_TASK_FIELDS_RESPONSE event
    self.setEventHandler(DTM_CONSTS.EVENT_TYPES.GET_TASK_FIELDS_RESPONSE, self.onGetTaskManagerFieldsResponse)
    # Set event handler for INSERT_EE_DATA_RESPONSE event
    self.setEventHandler(DTM_CONSTS.EVENT_TYPES.INSERT_EE_DATA_RESPONSE, self.onInsertEEDataResponse)
    # Set event handler for UPDATE_TASK_FIELDS_RESPONSE event
    self.setEventHandler(DTM_CONSTS.EVENT_TYPES.UPDATE_TASK_FIELDS_RESPONSE, self.onUpdateTasksFieldsResponse)
    # Set event handler for DELETE_TASK_RESULTS event
    self.setEventHandler(DTM_CONSTS.EVENT_TYPES.DELETE_TASK_RESULTS, self.onDeleteTaskResults)
    # Set event handler for DELETE_EE_DATA_RESPONSE event
    self.setEventHandler(DTM_CONSTS.EVENT_TYPES.DELETE_EE_DATA_RESPONSE, self.onDeleteEEDataResponse)

    # Initialize DRCE API
    self.drceHost = configParser.get(className, self.CONFIG_DRCE_HOST)
    self.drcePort = configParser.get(className, self.CONFIG_DRCE_PORT)
    self.drceTimeout = configParser.getint(className, self.CONFIG_DRCE_TIMEOUT)
    hostParams = HostParams(self.drceHost, self.drcePort)
    self.drceManager = DRCEManager()
    self.drceManager.activate_host(hostParams)
    self.drceIdGenerator = UIDGenerator()
    self.drceCommandConvertor = CommandConvertor()

    # Initialize HCE node Admin API
    self.hceNodeAdminTimeout = configParser.getint(className, self.CONFIG_HCE_NODE_ADMIN_TIMEOUT)
    self.hceNodeManagerRequest = admin.NodeManagerRequest.NodeManagerRequest()
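The constructor reads seven options from a configuration section named after the class. A minimal sketch of such a section follows, parsed with the Python 3 standard-library configparser (the listing above targets Python 2). The option names come from the CONFIG_* constants; every value (connection names, host, port, timeouts) and the helper load_ee_settings are made-up placeholders for illustration, not documented defaults.

```python
import configparser

# Hypothetical configuration text: option names match the CONFIG_* constants,
# all values are illustrative placeholders.
SAMPLE_INI = """
[ExecutionEnvironmentManager]
server = ExecutionEnvironmentManager
clientTasksManager = TasksManager
clientTasksDataManager = TasksDataManager
DRCEHost = 127.0.0.1
DRCEPort = 5656
DRCETimeout = 30000
HCENodeAdminTimeout = 10000
"""


def load_ee_settings(text, section="ExecutionEnvironmentManager"):
    """Read the options the constructor asks for (hypothetical helper)."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return {
        "serverName": parser.get(section, "server"),
        "clientTasksManagerName": parser.get(section, "clientTasksManager"),
        "clientTasksDataManagerName": parser.get(section, "clientTasksDataManager"),
        "drceHost": parser.get(section, "DRCEHost"),
        # The listing reads the port with get(), so it stays a string.
        "drcePort": parser.get(section, "DRCEPort"),
        "drceTimeout": parser.getint(section, "DRCETimeout"),
        "hceNodeAdminTimeout": parser.getint(section, "HCENodeAdminTimeout"),
    }


settings = load_ee_settings(SAMPLE_INI)
```

Only the two timeouts are converted to integers, mirroring the get/getint calls in the listing.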

Member Function Documentation

◆ checkDelTaskState()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.checkDelTaskState (   self,
  state,
  action 
)

Definition at line 618 of file ExecutionEnvironmentManager.py.

def checkDelTaskState(self, state, action):
    ret = False
    if state == EEResponseData.TASK_STATE_FINISHED or\
       state == EEResponseData.TASK_STATE_CRASHED or\
       state == EEResponseData.TASK_STATE_TERMINATED or\
       state == EEResponseData.TASK_STATE_TERMINATED_BY_DRCE_TTL or\
       state == EEResponseData.TASK_STATE_DELETED or\
       state == EEResponseData.TASK_STATE_UNDEFINED or\
       state == EEResponseData.TASK_STATE_SET_ERROR or\
       state == EEResponseData.TASK_STATE_NOT_FOUND or\
       state == EEResponseData.TASK_STATE_SCHEDULE_TRIES_EXCEEDED or\
       action == EventObjects.DeleteTask.ACTION_TERMINATE_TASK_AND_DELETE_DATA:
        if action != EventObjects.DeleteTask.ACTION_DELETE_ON_DTM:
            ret = True
    return ret
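The check above reduces to one rule: a delete request is sent to the node when the task is in a non-running state or the action forces termination, unless the action is "delete on DTM only". A compact sketch of the same predicate using set membership follows; the constant values and the name can_delete_on_node are hypothetical stand-ins, since the real EEResponseData.TASK_STATE_* and EventObjects.DeleteTask.ACTION_* values are not shown on this page, and only three of the nine states are modelled.

```python
# Hypothetical stand-ins for EEResponseData.TASK_STATE_* (values are assumptions).
FINISHED, CRASHED, TERMINATED, IN_PROGRESS = "finished", "crashed", "terminated", "in_progress"
# The listing checks nine states; three are enough to show the idea.
DELETABLE_STATES = frozenset([FINISHED, CRASHED, TERMINATED])

# Hypothetical stand-ins for EventObjects.DeleteTask.ACTION_* (values are assumptions).
ACTION_DELETE_TASK_DATA = "delete_data"
ACTION_TERMINATE_TASK_AND_DELETE_DATA = "terminate_and_delete"
ACTION_DELETE_ON_DTM = "delete_on_dtm"


def can_delete_on_node(state, action):
    """Return True when a delete request should be sent to the node."""
    eligible = state in DELETABLE_STATES or action == ACTION_TERMINATE_TASK_AND_DELETE_DATA
    return eligible and action != ACTION_DELETE_ON_DTM
```

A running task is therefore only touched when termination is requested explicitly, and ACTION_DELETE_ON_DTM never reaches the node.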

◆ convertToEEResponse()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.convertToEEResponse (   self,
  response 
)

Definition at line 342 of file ExecutionEnvironmentManager.py.

def convertToEEResponse(self, response):
    # Check response on validity, None if timeout reached
    if response is not None and len(response.items) > 0:
        eeR = EEResponseData(response.items[0].id)
        # Fill eeR with fields from the returned object from EE
        eeR.type = response.items[0].type
        eeR.errorCode = response.items[0].error_code
        eeR.errorMessage = response.items[0].error_message
        eeR.state = response.items[0].state
        eeR.pId = response.items[0].pid
        eeR.requestTime = response.items[0].time
        eeR.nodeHost = response.items[0].host
        eeR.nodePort = response.items[0].port
        eeR.nodeName = response.items[0].node
        eeR.stdout = response.items[0].stdout
        eeR.stderr = response.items[0].stderror
        eeR.exitStatus = response.items[0].exit_status
        eeR.taskTime = response.items[0].time
        eeR.files = response.items[0].files
        eeR.fields = response.items[0].fields
        logger.debug("To EEResponseData converted!")
    else:
        # Fill eeR with error state info if timeout reached
        eeR = EEResponseData(0)
        eeR.errorCode = EEResponseData.ERROR_CODE_TIMEOUT
        eeR.errorMessage = EEResponseData.ERROR_MESSAGE_TIMEOUT
        logger.error(self.ERROR_EE_RESPONSE_OBJECT_TYPE_OR_RESPONSE_ERROR)

    return eeR
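Two behaviours of this conversion are worth noting: only the first item of the response is used, and a missing or empty response is turned into a timeout result with id 0. The sketch below isolates that behaviour with plain SimpleNamespace objects; the numeric error codes, the message text and the name to_ee_response are assumptions, and only four of the copied fields are shown.

```python
from types import SimpleNamespace

# Assumed values; the real EEResponseData constants are not shown on this page.
ERROR_CODE_TIMEOUT = 1
ERROR_MESSAGE_TIMEOUT = "timeout"


def to_ee_response(response):
    """Map the first response item to a result object, or build a timeout result."""
    if response is not None and len(response.items) > 0:
        item = response.items[0]  # any further items are ignored
        return SimpleNamespace(id=item.id, errorCode=item.error_code,
                               errorMessage=item.error_message, state=item.state)
    # None or an empty item list is treated as a timeout
    return SimpleNamespace(id=0, errorCode=ERROR_CODE_TIMEOUT,
                           errorMessage=ERROR_MESSAGE_TIMEOUT, state=None)


ok = to_ee_response(SimpleNamespace(items=[
    SimpleNamespace(id=5, error_code=0, error_message="", state=3)]))
timed_out = to_ee_response(None)
```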

◆ convertToTaskResponse()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.convertToTaskResponse (   self,
  rawResponse 
)

Definition at line 378 of file ExecutionEnvironmentManager.py.

def convertToTaskResponse(self, rawResponse):
    taskResponse = None

    # Parse response status
    items = rawResponse.split(admin.Constants.COMMAND_DELIM)
    if len(items) > 1:
        # Convert DRCE JSON protocol response to TaskResponse object
        try:
            taskResponse = self.drceCommandConvertor.from_json(items[1])
            logger.debug("To taskResponse converted!")
        except Exception as e:
            # str() guards against exception classes without a docstring (__doc__ is None)
            logger.error(self.ERROR_HCE_RESPONSE_PROCESSING_EXCEPTION + " : " + str(e.__doc__) + " : " + str(e.message))
    else:
        logger.error(self.ERROR_HCE_RESPONSE_PROCESSING_SPLIT)

    return taskResponse
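The raw admin response is expected to carry a status part and a JSON body separated by a delimiter; the body is the second element after splitting. A minimal sketch of that parsing step follows, with the standard json module standing in for CommandConvertor.from_json. The tab delimiter and the name parse_admin_response are assumptions: the actual value of admin.Constants.COMMAND_DELIM is not shown on this page.

```python
import json

# Assumption: the real admin.Constants.COMMAND_DELIM value is not documented here.
COMMAND_DELIM = "\t"


def parse_admin_response(raw_response, delim=COMMAND_DELIM):
    """Return the decoded JSON body, or None when the response cannot be parsed."""
    items = raw_response.split(delim)
    if len(items) <= 1:
        # No delimiter found: the status code could not be split from the body
        return None
    try:
        return json.loads(items[1])
    except ValueError:
        # Malformed JSON body (json.JSONDecodeError is a ValueError subclass)
        return None
```

As in the listing, both failure paths yield None, so the caller has to treat None as "no usable response".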

◆ createGeneralResponse()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.createGeneralResponse (   self,
  errCode,
  errMessage,
  errLog 
)

Definition at line 731 of file ExecutionEnvironmentManager.py.

def createGeneralResponse(self, errCode, errMessage, errLog):
    generalResponseObj = EventObjects.GeneralResponse()
    generalResponseObj.errorCode = errCode
    generalResponseObj.errorMessage = errMessage
    logger.debug(errLog)
    return generalResponseObj

◆ createTaskDeleteRequest()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.createTaskDeleteRequest (   self,
  deleteTaskObj 
)

Definition at line 599 of file ExecutionEnvironmentManager.py.

def createTaskDeleteRequest(self, deleteTaskObj):
    ret = None
    if deleteTaskObj.action == EventObjects.DeleteTask.ACTION_DELETE_TASK_DATA:
        # Prepare DRCE request object to delete the task's data
        ret = TaskDeleteRequest(deleteTaskObj.deleteTaskId)
    elif deleteTaskObj.action == EventObjects.DeleteTask.ACTION_TERMINATE_TASK_AND_DELETE_DATA:
        # Prepare DRCE request object to terminate the task and delete its data (default init)
        ret = TaskTerminateRequest(deleteTaskObj.deleteTaskId)
    else:
        # Prepare DRCE request object to terminate the task and leave its data
        ret = TaskTerminateRequest(deleteTaskObj.deleteTaskId)
        ret.data["cleanup"] = DRCE_CONSTS.TERMINATE_DATA_SAVE
    return ret

◆ onCheckTaskState()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.onCheckTaskState (   self,
  event 
)

Definition at line 400 of file ExecutionEnvironmentManager.py.

def onCheckTaskState(self, event):
    try:
        # Get event object
        checkTaskStateObj = event.eventObj
        # Get TaskManager fields
        self.sendGetTaskManagerFieldsRequest(checkTaskStateObj.id, ("onCheckTaskState", event))
    except Exception as err:
        tbi = Utils.getTracebackInfo()
        logger.error("Exception: " + str(err.message) + "\n" + tbi)

◆ onDeleteEEDataResponse()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.onDeleteEEDataResponse (   self,
  event 
)

Definition at line 870 of file ExecutionEnvironmentManager.py.

def onDeleteEEDataResponse(self, event):
    try:
        # Get the general response object from event
        generalResponse = event.eventObj
        if generalResponse.errorCode != EventObjects.GeneralResponse.ERROR_OK:
            logger.error(LogFormatterEvent(event, [], self.ERROR_UPDATE_TASKS_FIELDS))

        if event.cookie is not None and event.cookie[0] == "processDeleteTaskResults":
            # Continue the onDeleteTaskResults handling
            generalResponseObj = EventObjects.GeneralResponse()
            generalResponseObj.statuses = (event.cookie[2], generalResponse.errorCode)
            if generalResponse.errorCode != EventObjects.GeneralResponse.ERROR_OK or\
               event.cookie[2] != EEResponseData.ERROR_CODE_OK:
                generalResponseObj.errorCode = self.ERROR_DELETE_TASK_RESULTS
                generalResponseObj.errorMessage = self.ERROR_DELETE_TASK_RESULTS_MESSAGE
            # Prepare delete task results reply event
            deleteTaskResultsReplyEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.DELETE_TASK_RESULTS_RESPONSE,
                                                                  generalResponseObj)
            self.reply(event.cookie[1], deleteTaskResultsReplyEvent)
    except Exception as err:
        tbi = Utils.getTracebackInfo()
        logger.error("Exception: " + str(err.message) + "\n" + tbi)

◆ onDeleteTaskResults()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.onDeleteTaskResults (   self,
  event 
)

Definition at line 430 of file ExecutionEnvironmentManager.py.

def onDeleteTaskResults(self, event):
    try:
        # Get event object
        deleteTaskResultsObj = event.eventObj
        # Send the GetTaskManagerFields request
        self.sendGetTaskManagerFieldsRequest(deleteTaskResultsObj.id, ("onDeleteTaskResults", event))
    except Exception as err:
        tbi = Utils.getTracebackInfo()
        logger.error("Exception: " + str(err.message) + "\n" + tbi)

◆ onExecuteTask()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.onExecuteTask (   self,
  event 
)

Definition at line 149 of file ExecutionEnvironmentManager.py.

def onExecuteTask(self, event):
    try:
        # Get event object
        executeTasks = event.eventObj
        # Request TasksDataManager to get the task's data
        fetchTaskData = EventObjects.FetchTaskData(executeTasks.id)
        fetchTaskDataEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.FETCH_TASK_DATA, fetchTaskData)
        # Send request FetchTaskData to TasksDataManager
        self.send(self.clientTasksDataManagerName, fetchTaskDataEvent)
        logger.info("Sent request FetchTaskData to TasksDataManager, id=" + str(executeTasks.id))
    except Exception as err:
        logger.error("Exception: " + str(err.message) + "\n" + Utils.getTracebackInfo())

◆ onFetchTaskDataResponse()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.onFetchTaskDataResponse (   self,
  event 
)

Definition at line 167 of file ExecutionEnvironmentManager.py.

def onFetchTaskDataResponse(self, event):
    try:
        # Get event object
        obj = event.eventObj
        # The object type is not known in advance, dispatch on it
        if type(obj) == EventObjects.NewTask:
            logger.info("New task processing started, id=" + str(obj.id))
            # Process NewTask action
            self.processNewTask(obj)
            logger.info("New task processing finished, id=" + str(obj.id))
        else:
            if type(obj) == EventObjects.DeleteTask:
                logger.info("Delete task processing started, id=" + str(obj.id))
                # Process DeleteTask
                # Make GetTaskManagerFields request
                self.sendGetTaskManagerFieldsRequest(obj.deleteTaskId, ("onFetchTaskDataResponse", obj))
                logger.debug("GetTaskManagerFields request sent, id=" + str(obj.id))
            else:
                logger.error("Wrong type received from TasksDataManager!")
                logger.error(LogFormatterEvent(event, [], self.ERROR_WRONG_OBJECT_TYPE + " [" + obj.__class__.__name__ + "]" +
                                               " received but [" + str(EventObjects.NewTask) + "] or [" +
                                               str(EventObjects.DeleteTask) + "] expected!"))
    except Exception as err:
        tbi = Utils.getTracebackInfo()
        logger.error("Exception: " + str(err.message) + "\n" + tbi)

◆ onFetchTaskResults()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.onFetchTaskResults (   self,
  event 
)

Definition at line 415 of file ExecutionEnvironmentManager.py.

def onFetchTaskResults(self, event):
    try:
        # Get event object
        fetchTaskResultsObj = event.eventObj
        # Send the GetTaskManagerFields request
        self.sendGetTaskManagerFieldsRequest(fetchTaskResultsObj.id, ("onFetchTaskResults", event))
    except Exception as err:
        tbi = Utils.getTracebackInfo()
        logger.error("Exception: " + str(err.message) + "\n" + tbi)

◆ onGetTaskManagerFieldsResponse()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.onGetTaskManagerFieldsResponse (   self,
  event 
)

Definition at line 814 of file ExecutionEnvironmentManager.py.

def onGetTaskManagerFieldsResponse(self, event):
    try:
        if event.cookie is not None and event.cookie[0] == "onFetchTaskResults":
            # Continue the onFetchTaskResults handling
            self.processFetchTaskResults(event)

        if event.cookie is not None and event.cookie[0] == "onCheckTaskState":
            # Continue the onCheckTaskState handling
            self.processCheckTaskState(event)

        if event.cookie is not None and event.cookie[0] == "onFetchTaskDataResponse":
            # Continue DeleteTask
            self.processDeleteTask(event)

        if event.cookie is not None and event.cookie[0] == "onDeleteTaskResults":
            # Continue the onDeleteTaskResults handling
            self.processDeleteTaskResults(event)
    except Exception as err:
        tbi = Utils.getTracebackInfo()
        logger.error("Exception: " + str(err.message) + "\n" + tbi)
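The handler above implements a continuation pattern: the first element of the cookie names the handler that issued the GetTaskManagerFields request, and the response is routed to the matching process* method. The same routing can be sketched as a table lookup. In this sketch events are plain dictionaries and the continuations return the name of the method they stand for; both are simplifications assumed for illustration, and make_dispatcher is a hypothetical helper, not part of the class.

```python
def make_dispatcher(continuations):
    """Build a dispatch function from a {cookie tag: continuation} table."""
    def dispatch(event):
        cookie = event.get("cookie")
        if cookie is None:
            return None  # nothing to continue
        handler = continuations.get(cookie[0])
        if handler is None:
            return None  # unknown tag, silently ignored as in the listing
        return handler(event)
    return dispatch


# Cookie tags are taken from the listing; the lambdas only report which
# method would be called.
dispatch = make_dispatcher({
    "onFetchTaskResults": lambda event: "processFetchTaskResults",
    "onCheckTaskState": lambda event: "processCheckTaskState",
    "onFetchTaskDataResponse": lambda event: "processDeleteTask",
    "onDeleteTaskResults": lambda event: "processDeleteTaskResults",
})
```

A table keeps the tag-to-continuation mapping in one place, at the cost of departing from the listing's chain of independent if statements.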

◆ onInsertEEDataResponse()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.onInsertEEDataResponse (   self,
  event 
)

Definition at line 840 of file ExecutionEnvironmentManager.py.

def onInsertEEDataResponse(self, event):
    try:
        # Get the general response object from event
        generalResponse = event.eventObj
        if generalResponse.errorCode != EventObjects.GeneralResponse.ERROR_OK:
            logger.error(LogFormatterEvent(event, [], self.ERROR_INSERT_EE_DATA))
    except Exception as err:
        tbi = Utils.getTracebackInfo()
        logger.error("Exception: " + str(err.message) + "\n" + tbi)

◆ onUpdateTasksFieldsResponse()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.onUpdateTasksFieldsResponse (   self,
  event 
)

Definition at line 855 of file ExecutionEnvironmentManager.py.

def onUpdateTasksFieldsResponse(self, event):
    try:
        # Get the general response object from event
        generalResponse = event.eventObj
        if generalResponse.errorCode != EventObjects.GeneralResponse.ERROR_OK:
            logger.error(LogFormatterEvent(event, [], self.ERROR_UPDATE_TASKS_FIELDS))
    except Exception as err:
        tbi = Utils.getTracebackInfo()
        logger.error("Exception: " + str(err.message) + "\n" + tbi)

◆ processCheckTaskState()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.processCheckTaskState (   self,
  event 
)

Definition at line 549 of file ExecutionEnvironmentManager.py.

def processCheckTaskState(self, event):
    # Get event object
    taskManagerFields = event.eventObj

    # Check whether the task is found
    if len(taskManagerFields.fields) > 0 and taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST] is not None and\
       taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST] != "" and\
       taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT] is not None and\
       taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT] != "":

        # Check task state
        checkTaskStateObj = event.cookie[1].eventObj
        # Prepare the messageBodyJson for DRCE request
        taskCheckRequest = TaskCheckRequest(checkTaskStateObj.id, checkTaskStateObj.type)
        messageBodyJson = self.drceCommandConvertor.to_json(taskCheckRequest)
        # Make HCE node admin request
        logger.debug("Make HCE node admin request!")
        logger.debug("(line 566) Call sendToHCENodeAdmin() use Host: '%s' and Port '%s'",
                     str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST]),
                     str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT]))

        rawResponse = self.sendToHCENodeAdmin(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST],
                                              taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT],
                                              messageBodyJson)
        eeResponseData = self.convertToEEResponse(self.convertToTaskResponse(rawResponse))
        if eeResponseData.errorCode == EEResponseData.ERROR_CODE_OK:
            if eeResponseData.nodeHost == "" or eeResponseData.nodePort == "":
                logger.error(str(vars(eeResponseData)))
                logger.debug("Received Host or Port is empty!")
            # Update TaskFields in the TasksManager
            logger.debug("Update TaskFields in the TasksManager!")
            self.processUpdateTaskFields(self.OPERATION_CHECK_STATE, eeResponseData)
    else:
        eeResponseData = EEResponseData(taskManagerFields.id)
        eeResponseData.state = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
        eeResponseData.errorCode = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
        eeResponseData.errorMessage = EEResponseData.ERROR_MESSAGE_TASK_NOT_FOUND
        eeResponseData.exitStatus = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
        logger.error("Empty mandatory fields received from TasksManager:\n%s", Utils.varDump(taskManagerFields))
    # Prepare check task reply event
    checkTaskStateReplyEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE_RESPONSE,
                                                       eeResponseData)
    self.reply(event.cookie[1], checkTaskStateReplyEvent)
    logger.debug("Check task reply event sent!")
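Before contacting a node, the method requires that the task's stored fields contain a non-empty host and port; otherwise it answers with a "task not found" result. The four-part condition can be sketched as a small helper. The key names "host" and "port" and the function has_node_address are assumptions (the values of DTM_CONSTS.DRCE_FIELDS.HOST and PORT are not shown on this page), and unlike the listing the sketch uses dict.get, so a missing key counts as empty instead of raising KeyError.

```python
def has_node_address(fields, host_key="host", port_key="port"):
    """Return True when both host and port are present and non-empty."""
    if len(fields) == 0:
        return False  # no fields at all: the task was not found
    # None and the empty string both count as "not set"
    return all(fields.get(key) not in (None, "") for key in (host_key, port_key))
```

For example, has_node_address({"host": "10.0.0.5", "port": "5546"}) holds, while an empty dictionary or an empty host string does not.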

◆ processDeleteTask()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.processDeleteTask (   self,
  event 
)

Definition at line 638 of file ExecutionEnvironmentManager.py.

def processDeleteTask(self, event):
    # Get event object
    taskManagerFields = event.eventObj
    deleteTaskObj = event.cookie[1]

    # Check whether the task is found
    if len(taskManagerFields.fields) > 0:
        eeResponseData = None
        deletedTasksState = taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] if \
                            DTM_CONSTS.DRCE_FIELDS.STATE in taskManagerFields.fields else None
        # Check task data
        if taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST] is not None and\
           taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST] != "" and\
           taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT] is not None and\
           taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT] != "":
            if self.checkDelTaskState(deletedTasksState, deleteTaskObj.action):
                taskDeleteRequest = self.createTaskDeleteRequest(deleteTaskObj)
                messageBodyJson = self.drceCommandConvertor.to_json(taskDeleteRequest)
                logger.debug("Send TaskDeleteRequest to HCE node Admin API, taskId=" + str(deleteTaskObj.deleteTaskId))
                logger.debug("(line 657) Call sendToHCENodeAdmin() use Host: '%s' and Port '%s'",
                             str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST]),
                             str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT]))

                rawResponse = self.sendToHCENodeAdmin(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST],
                                                      taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT],
                                                      messageBodyJson)
                logger.debug("TaskDeleteRequest rawResponse=[" + rawResponse + "]")
                if rawResponse != "":
                    eeResponseData = self.convertToEEResponse(self.convertToTaskResponse(rawResponse))
                    logger.debug("TaskDeleteRequest response taskId=" + str(deleteTaskObj.deleteTaskId) + \
                                 ", state:" + str(eeResponseData.state) + \
                                 ", exitStatus:" + str(eeResponseData.exitStatus) + \
                                 ", stdout: " + eeResponseData.stdout + ", stderr:" + eeResponseData.stderr)
                    eeResponseData.id = deleteTaskObj.id
                    #!!!Replace the unsupported states like UNDEFINED or NOTFOUND to push to remove this task from TasksManager
                    if eeResponseData.state != EEResponseData.TASK_STATE_IN_PROGRESS and\
                       eeResponseData.state != EEResponseData.TASK_STATE_NEW:
                        logger.debug("Task state was substituted from " + str(eeResponseData.state) + " to " + \
                                     str(EEResponseData.TASK_STATE_DELETED) + ", taskId=" + str(deleteTaskObj.deleteTaskId))
                        # eeResponseData.state = EEResponseData.TASK_STATE_DELETED
                    # Substitute state for task to delete and deleted task
                    eeResponseData.nodeName = ""
                    eeResponseData.nodeHost = ""
                    eeResponseData.nodePort = 0
                    eeResponseData.state = EEResponseData.TASK_STATE_DELETED
                    if eeResponseData.errorCode > 0:
                        logger.debug("TaskDelete request response error:" + str(eeResponseData.errorCode) + " : " + \
                                     eeResponseData.errorMessage)
                        eeResponseData.state = EEResponseData.TASK_STATE_SET_ERROR
                        eeResponseData.deleteTaskId = deleteTaskObj.deleteTaskId
                        eeResponseData.deleteTaskState = None
                    else:
                        eeResponseData.deleteTaskId = deleteTaskObj.deleteTaskId
                        eeResponseData.deleteTaskState = eeResponseData.state
                else:
                    # Set state as DELETED to push the TasksManager to delete task's data on DTM, save the data file in DRCE node
                    eeResponseData = EEResponseData(deleteTaskObj.id)
                    eeResponseData.deleteTaskId = deleteTaskObj.deleteTaskId
                    eeResponseData.deleteTaskState = EEResponseData.TASK_STATE_DELETED
                    eeResponseData.state = EEResponseData.TASK_STATE_DELETED
                    logger.error("TaskDeleteRequest response fault, taskId=" + str(deleteTaskObj.deleteTaskId) + "!")
            else:
                eeResponseData = EEResponseData(deleteTaskObj.id)
                eeResponseData.deleteTaskId = deleteTaskObj.deleteTaskId
                if deleteTaskObj.action == EventObjects.DeleteTask.ACTION_DELETE_ON_DTM:
                    eeResponseData.state = EEResponseData.TASK_STATE_DELETED
                    eeResponseData.deleteTaskState = EEResponseData.TASK_STATE_DELETED
                    logger.debug("Deleted task only on DTM")
                    # eeResponseData.deleteTaskState = deletedTasksState
                else:
                    eeResponseData.state = EEResponseData.TASK_STATE_SET_ERROR
                    logger.debug("Deleted task " + str(eeResponseData.deleteTaskId) + " has bad[Not deleted state] " +
                                 str(deletedTasksState) + " delete error")
        else:
            # Set state as TERMINATED to push the TasksManager to delete task's data on DTM, save the data file in DRCE node
            msg = "Host or Port is empty! Set state as TERMINATED to push delete task, deleteTaskObj.id=" + \
                  str(deleteTaskObj.id) + ", deleteTaskObj.deleteTaskId=" + str(deleteTaskObj.deleteTaskId)
            eeResponseData = EEResponseData(deleteTaskObj.id)
            eeResponseData.errorCode = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
            eeResponseData.errorMessage = msg
            eeResponseData.deleteTaskId = deleteTaskObj.deleteTaskId
            eeResponseData.deleteTaskState = EEResponseData.TASK_STATE_TERMINATED
            eeResponseData.state = EEResponseData.TASK_STATE_TERMINATED
            logger.error(msg)

        if eeResponseData is not None:
            # Update TaskFields in the TasksManager
            self.processUpdateTaskFields(self.OPERATION_DELETE_TASK, eeResponseData)
            logger.debug("Update TaskFields in the TasksManager!")
    else:
        logger.error("Empty fields received from TasksManager for DeleteTask!")

◆ processDeleteTaskResults()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.processDeleteTaskResults (self, event)

Definition at line 742 of file ExecutionEnvironmentManager.py.

742   def processDeleteTaskResults(self, event):
743     # Get event object
744     taskManagerFields = event.eventObj
745     # General response event object, set only in the error case
746     generalResponseObj = None
747     hceNodeAdminRequestState = EEResponseData.ERROR_CODE_OK
748
749     # Check whether the task was found
750     if len(taskManagerFields.fields) > 0:
751       # Check task data
752       if taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_FINISHED or\
753          taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_CRASHED or\
754          taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_TERMINATED or\
755          taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_TERMINATED_BY_DRCE_TTL or\
756          taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_DELETED or\
757          taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_UNDEFINED or\
758          taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_NOT_FOUND:
759         deleteTaskResultsObj = event.cookie[1].eventObj
760         # Prepare the messageBodyJson for DRCE request
761         logger.info("Send TaskGetDataRequest with DELETE data, taskId=" + str(deleteTaskResultsObj.id))
762         # TODO: Use GetDataRequest to delete results, need to be replaced with native command later
763         taskDeleteRequest = TaskDeleteRequest(deleteTaskResultsObj.id)
764         messageBodyJson = self.drceCommandConvertor.to_json(taskDeleteRequest)
765
766         logger.debug("(line 766) Call sendToHCENodeAdmin() use Host: '%s' and Port '%s'",
767                      str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST]),
768                      str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT]))
769
770         rawResponse = self.sendToHCENodeAdmin(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST],
771                                               taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT],
772                                               messageBodyJson)
773         logger.info("TaskDeleteResultsRequest rawResponse=[" + rawResponse + "]")
774         if rawResponse != "":
775           eeResponseData = self.convertToEEResponse(self.convertToTaskResponse(rawResponse))
776           if eeResponseData.errorCode == EEResponseData.ERROR_CODE_OK:
777             logger.info("Response received from EE")
778           else:
779             generalResponseObj = self.createGeneralResponse(EventObjects.DeleteTaskResults.DRCE_ERROR,
780                                                             EventObjects.DeleteTaskResults.DRCE_ERROR_MESSAGE,
781                                                             "DRCE error error=" + str(eeResponseData.errorCode))
782         else:
783           generalResponseObj = self.createGeneralResponse(EventObjects.DeleteTaskResults.EMPRY_RAW_ERROR,
784                                                           EventObjects.DeleteTaskResults.EMPRY_RAW_ERROR_MESSAGE, "Empty rawResponse")
785       else:
786         # The requested operation cannot be performed on DRCE because the task is not in a proper state
787         generalResponseObj = self.createGeneralResponse(EventObjects.DeleteTaskResults.TASK_STATE_ERROR,
788                                                         EventObjects.DeleteTaskResults.TASK_STATE_ERROR_MESSAGE,
789                                                         "Wrong task state " + str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE]) +
790                                                         ", can't cleanup!")
791     else:
792       generalResponseObj = self.createGeneralResponse(EventObjects.DeleteTaskResults.TASK_NOT_FOUND_ERROR,
793                                                       EventObjects.DeleteTaskResults.TASK_NOT_FOUND_ERROR_MESSAGE,
794                                                       "Empty fields from TasksManager for DeleteTaskResults, possible task not found!")
795     # Send DELETE_EE_DATA request to the TasksDataManager
796     if generalResponseObj is None:
797       logger.debug("Send DELETE_EE_DATA request to the TasksDataManager!")
798       deleteEEResponseDataObj = EventObjects.DeleteEEResponseData(taskManagerFields.id)
799       deleteEEResponseDataEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.DELETE_EE_DATA, deleteEEResponseDataObj)
800       deleteEEResponseDataEvent.cookie = ("processDeleteTaskResults", event.cookie[1], hceNodeAdminRequestState)
801       self.send(self.clientTasksDataManagerName, deleteEEResponseDataEvent)
802       logger.debug("Sent request DeleteEEResponseDataObj to TasksDataManager!")
803     else:
804       deleteTaskResultsReplyEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.DELETE_TASK_RESULTS_RESPONSE,
805                                                             generalResponseObj)
806       self.reply(event.cookie[1], deleteTaskResultsReplyEvent)
807       logger.info("Send response GeneralResponse to ClientInterfaceService!")
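The state check at lines 752-758 compares one field against a fixed list of states. A minimal sketch of the same test written as a set-membership check follows. The state values, the `"state"` key and the helper name `can_delete_results` are hypothetical stand-ins, not taken from `EEResponseData` or `DTM_CONSTS`.

```python
# Hypothetical stand-ins for the EEResponseData.TASK_STATE_* constants;
# the real values are not shown in this reference.
TASK_STATE_FINISHED = "finished"
TASK_STATE_CRASHED = "crashed"
TASK_STATE_TERMINATED = "terminated"
TASK_STATE_TERMINATED_BY_DRCE_TTL = "terminated_by_drce_ttl"
TASK_STATE_DELETED = "deleted"
TASK_STATE_UNDEFINED = "undefined"
TASK_STATE_NOT_FOUND = "not_found"
TASK_STATE_IN_PROGRESS = "in_progress"  # example of a state outside the list

# States in which the listing allows the results to be deleted.
DELETABLE_STATES = frozenset([
    TASK_STATE_FINISHED,
    TASK_STATE_CRASHED,
    TASK_STATE_TERMINATED,
    TASK_STATE_TERMINATED_BY_DRCE_TTL,
    TASK_STATE_DELETED,
    TASK_STATE_UNDEFINED,
    TASK_STATE_NOT_FOUND,
])


def can_delete_results(fields, state_key="state"):
    """Return True when the task state in `fields` permits result deletion."""
    return fields.get(state_key) in DELETABLE_STATES
```

A single named set keeps the list of accepted states in one place, which may be easier to compare with the similar but not identical list used by processFetchTaskResults().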

◆ processFetchTaskResults()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.processFetchTaskResults (self, event)

Definition at line 491 of file ExecutionEnvironmentManager.py.

491   def processFetchTaskResults(self, event):
492     # Get event object
493     taskManagerFields = event.eventObj
494
495     # Check whether the task was found
496     if len(taskManagerFields.fields) > 0:
497       # Check task state
498       if taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_FINISHED or\
499          taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_CRASHED or\
500          taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_TERMINATED or\
501          taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_TERMINATED_BY_DRCE_TTL or\
502          taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_NEW:
503         fetchTaskResultsObj = event.cookie[1].eventObj
504         # Prepare the messageBodyJson for DRCE request
505         taskGetDataRequest = TaskGetDataRequest(fetchTaskResultsObj.id, fetchTaskResultsObj.type)
506         messageBodyJson = self.drceCommandConvertor.to_json(taskGetDataRequest)
507
508         logger.debug("(line 508) Call sendToHCENodeAdmin() use Host: '%s' and Port '%s'",
509                      str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST]),
510                      str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT]))
511
512         rawResponse = self.sendToHCENodeAdmin(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST],
513                                               taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT],
514                                               messageBodyJson)
515         eeResponseData = self.convertToEEResponse(self.convertToTaskResponse(rawResponse))
516         logger.debug("Response received from EE for onFetchTaskResults!")
517         if eeResponseData.errorCode == EEResponseData.ERROR_CODE_OK and \
518            EEResponseData.TASK_STATE_FINISHED:
519           # Update the EEResponseData object in the TasksDataManager container
520           logger.debug("Update EEResponseDtata object in the TasksDataManager container sent!")
521           insertEEDataEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.INSERT_EE_DATA, eeResponseData)
522           self.send(self.clientTasksDataManagerName, insertEEDataEvent)
523         # Update TaskFields in the TasksManager
524         logger.debug("Update TaskFields in the TasksManager!")
525         self.processUpdateTaskFields(self.OPERATION_FETCH_RESULTS, eeResponseData)
526       else:
527         # The requested operation cannot be performed on DRCE because the task is not in a proper state
528         eeResponseData = EEResponseData(taskManagerFields.id)
529         eeResponseData.state = taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE]
530         logger.debug("Wrong task state " + str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE]) +
531                      " for FetchTaskResults operation!")
532     else:
533       eeResponseData = EEResponseData(0)
534       eeResponseData.state = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
535       eeResponseData.errorCode = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
536       eeResponseData.errorMessage = EEResponseData.ERROR_MESSAGE_TASK_NOT_FOUND
537       eeResponseData.exitStatus = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
538       logger.error("Empty mandatory fields received from TasksManager:\n%s", Utils.varDump(taskManagerFields))
539     # Prepare fetch task results reply event
540     fetchTaskResultsReplyEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.FETCH_TASK_RESULTS_RESPONSE,
541                                                          eeResponseData)
542     self.reply(event.cookie[1], fetchTaskResultsReplyEvent)
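In the condition at lines 517-518 the right-hand operand of `and` is the constant `EEResponseData.TASK_STATE_FINISHED` itself, not a comparison, so Python evaluates only its truthiness. The sketch below illustrates the difference between that form and an explicit comparison. All values are hypothetical, and the source does not say whether a comparison against the response state was intended.

```python
# Hypothetical values; the real constants are not shown in this reference.
ERROR_CODE_OK = 0
TASK_STATE_FINISHED = 3   # assumed to be non-zero
TASK_STATE_CRASHED = 4


def check_as_written(error_code, state):
    # `state` is never consulted: the bare constant is only tested for truthiness.
    return bool(error_code == ERROR_CODE_OK and TASK_STATE_FINISHED)


def check_with_comparison(error_code, state):
    # Variant that compares the response state explicitly.
    return error_code == ERROR_CODE_OK and state == TASK_STATE_FINISHED
```

With these assumed values, `check_as_written` accepts any state whenever the error code is OK, while `check_with_comparison` accepts only the finished state.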

◆ processNewTask()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.processNewTask (self, newTaskObj)

Definition at line 272 of file ExecutionEnvironmentManager.py.

272   def processNewTask(self, newTaskObj):
273     # Prepare DRCE request object
274     taskExecuteStruct = TaskExecuteStruct()
275     taskExecuteStruct.command = newTaskObj.command
276     taskExecuteStruct.files = newTaskObj.files
277     taskExecuteStruct.input = newTaskObj.input
278     # Set session
279     taskExecuteStruct.session = Session(newTaskObj.session["tmode"], newTaskObj.session["type"],
280                                         newTaskObj.session["time_max"])
281     taskExecuteStruct.session.password = newTaskObj.session["password"]
282     taskExecuteStruct.session.port = newTaskObj.session[DTM_CONSTS.DRCE_FIELDS.PORT]
283     taskExecuteStruct.session.shell = newTaskObj.session["shell"]
284     taskExecuteStruct.session.timeout = newTaskObj.session["timeout"]
285     taskExecuteStruct.session.user = newTaskObj.session["user"]
286     taskExecuteStruct.session.home_directory = newTaskObj.session["home_directory"]
287     taskExecuteStruct.session.environment = newTaskObj.session["environment"]
288     # Set limits
289     taskExecuteStruct.limits = newTaskObj.limits
290     # Create DRCE TaskExecuteRequest object
291     taskExecuteRequest = TaskExecuteRequest(newTaskObj.id)
292     # Set taskExecuteRequest fields
293     taskExecuteRequest.data = taskExecuteStruct
294     # If the session has a non-empty route field, set a custom route
295     if "route" in newTaskObj.session and newTaskObj.session["route"] is not None and newTaskObj.session["route"] != "":
296       taskExecuteRequest.route = newTaskObj.session["route"]
297     if "task_type" in newTaskObj.session and newTaskObj.session["task_type"] is not None and\
298        newTaskObj.session["task_type"] != "":
299       taskExecuteRequest.task_type = newTaskObj.session["task_type"]
300     logger.debug("Sending task to DRCE router, id=" + str(newTaskObj.id) + ", route:" + str(taskExecuteRequest.route))
301     # Send request to DRCE Cluster router
302     response = self.sendToDRCERouter(taskExecuteRequest)
303     logger.debug("Received from DRCE router, id=" + str(newTaskObj.id))
304     # Convert response to EEResponse object
305     eeResponseData = self.convertToEEResponse(response)
306     logger.debug("Response body:\n" + Utils.varDump(response))
307     logger.debug("eeResponseData object:\n" + Utils.varDump(eeResponseData))
308     # Check that the returned task Id matches the requested one (it may be missing if the request timed out)
309     if eeResponseData.id == newTaskObj.id:
310       # eeResponseData.id = newTaskObj.id
311       # Update TaskFields in the TasksManager
312       logger.debug("Process update TaskFields in the TasksManager, id=" + str(newTaskObj.id))
313       self.processUpdateTaskFields(self.OPERATION_NEW_TASK, eeResponseData)
314     else:
315       logger.error("Wrong task Id= " + str(eeResponseData.id) + " returned from DRCE, expected id=" + \
316                    str(newTaskObj.id) + ". TasksManager's fields not updated, task state not changed. Response:\n" +
317                    Utils.varDump(eeResponseData))
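Lines 295-299 apply the same rule to two optional session keys: the value is copied only when the key is present and the value is neither None nor an empty string. A minimal sketch of that rule as a reusable helper follows. The `Request` class and both function names are hypothetical illustrations, not part of the DTM API.

```python
def non_empty(session, key):
    """Return session[key] if it is present and neither None nor "", else None."""
    value = session.get(key)
    if value is None or value == "":
        return None
    return value


class Request(object):
    """Hypothetical stand-in for TaskExecuteRequest with two optional fields."""
    route = None
    task_type = None


def apply_optional_fields(request, session, keys=("route", "task_type")):
    # Copy only the keys that carry a usable value; leave the defaults otherwise.
    for key in keys:
        value = non_empty(session, key)
        if value is not None:
            setattr(request, key, value)
    return request


example = apply_optional_fields(Request(), {"route": "node-1", "task_type": ""})
```

Here `example.route` is set from the session while `example.task_type` keeps its default, because the empty string is treated as absent.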

◆ processUpdateTaskFields()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.processUpdateTaskFields (self, operationType, eeResponseData, cookie=None)

Definition at line 201 of file ExecutionEnvironmentManager.py.

201   def processUpdateTaskFields(self, operationType, eeResponseData, cookie=None):
202     logger.debug("eeResponseData:" + str(vars(eeResponseData)))
203     # Update task's fields on TasksManager
204     updateTaskFields = EventObjects.UpdateTaskFields(eeResponseData.id)
205     # If no errors in response from EE
206     if eeResponseData.errorCode == EEResponseData.ERROR_CODE_OK:
207       logger.debug("EE returned OK result!")
208       # Fill fields to update
209       updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] = eeResponseData.state
210       updateTaskFields.fields["pId"] = eeResponseData.pId
211       updateTaskFields.fields["nodeName"] = eeResponseData.nodeName
212       updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST] = eeResponseData.nodeHost
213       updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT] = eeResponseData.nodePort
214       if DTM_CONSTS.DRCE_FIELDS.URRAM in eeResponseData.fields:
215         updateTaskFields.fields["uRRAM"] = eeResponseData.fields[DTM_CONSTS.DRCE_FIELDS.URRAM]
216       if DTM_CONSTS.DRCE_FIELDS.UVRAM in eeResponseData.fields:
217         updateTaskFields.fields["uVRAM"] = eeResponseData.fields[DTM_CONSTS.DRCE_FIELDS.UVRAM]
218       if DTM_CONSTS.DRCE_FIELDS.UCPU in eeResponseData.fields:
219         updateTaskFields.fields["uCPU"] = eeResponseData.fields[DTM_CONSTS.DRCE_FIELDS.UCPU]
220       if DTM_CONSTS.DRCE_FIELDS.UTHREADS in eeResponseData.fields:
221         updateTaskFields.fields["uThreads"] = eeResponseData.fields[DTM_CONSTS.DRCE_FIELDS.UTHREADS]
222     else:
223       logger.error("EE error returned!")
224       if operationType == self.OPERATION_NEW_TASK:
225         # For new task operation
226         updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] = EEResponseData.TASK_STATE_SET_ERROR
227         updateTaskFields.fields["sDate"] = datetime.now()
228         if eeResponseData.type == EEResponseData.REQUEST_TYPE_SET:
229           updateTaskFields.fields["rDate"] = updateTaskFields.fields["sDate"]
230       else:
231         # if hasattr(eeResponseData, "deleteTaskId"):
232         if operationType == self.OPERATION_DELETE_TASK:
233           # For the DeleteTask action, which uses a fake task Id
234           updateTaskFields.fields["rDate"] = datetime.now()
235           updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] = EEResponseData.TASK_STATE_TERMINATED
236         else:
237           # For all other operations
238           if hasattr(eeResponseData, DTM_CONSTS.DRCE_FIELDS.STATE):
239             # If EE returned a state, use it for the update
240             updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] = eeResponseData.state
241           else:
242             # EE error, timeout or another failure with no state returned
243             updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] = EEResponseData.TASK_STATE_UNDEFINED
244             logger.debug("No state field update, task Id: " + str(eeResponseData.id) + ", treated as UNDEFINED!")
245
246     if operationType == self.OPERATION_DELETE_TASK:
247       if hasattr(eeResponseData, "deleteTaskId"):
248         updateTaskFields.fields["deleteTaskId"] = eeResponseData.deleteTaskId
249       # If EE has not found the task, push it to be deleted on TasksManager
250       # if hasattr(eeResponseData, "deleteTaskState"):
251       #   if eeResponseData.deleteTaskState == EEResponseData.TASK_STATE_NOT_FOUND:
252       #     updateTaskFields.fields["deleteTaskState"] = EEResponseData.TASK_STATE_TERMINATED
253       #   else:
254       #     updateTaskFields.fields["deleteTaskState"] = eeResponseData.deleteTaskState
255       if hasattr(eeResponseData, "deleteTaskState"):
256         updateTaskFields.fields["deleteTaskState"] = EEResponseData.TASK_STATE_TERMINATED
257
258     logger.debug("Fields to update:\n" + Utils.varDump(updateTaskFields))
259     # Create update event
260     updateTaskFieldsEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.UPDATE_TASK_FIELDS, updateTaskFields)
261     if cookie is not None:
262       updateTaskFieldsEvent.cookie = cookie
263     # Send update event to TasksManager
264     self.send(self.clientTasksManagerName, updateTaskFieldsEvent)
265     logger.debug("Fields sent to update!")
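Lines 214-221 copy four optional resource-usage fields from the EE response only when they are present. A minimal sketch of the same copy driven by a mapping table follows. The source-side key names stand in for the `DTM_CONSTS.DRCE_FIELDS.*` values, which are not shown in this reference, and the helper name is hypothetical. The target-side names are the ones used in the listing.

```python
# Assumed DRCE field name -> TasksManager field name used in the listing.
OPTIONAL_FIELDS = {
    "urram": "uRRAM",
    "uvram": "uVRAM",
    "ucpu": "uCPU",
    "uthreads": "uThreads",
}


def copy_optional_fields(source, target, mapping=OPTIONAL_FIELDS):
    """Copy each mapped field that exists in `source` into `target`."""
    for src_key, dst_key in mapping.items():
        if src_key in source:
            target[dst_key] = source[src_key]
    return target


copied = copy_optional_fields({"ucpu": 12, "unrelated": 1}, {})
```

Fields missing from the response are simply left out of the update, which matches the behaviour of the four separate `if` statements.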

◆ sendGetTaskManagerFieldsRequest()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.sendGetTaskManagerFieldsRequest (self, taskId, cookieData=None)

Definition at line 474 of file ExecutionEnvironmentManager.py.

474   def sendGetTaskManagerFieldsRequest(self, taskId, cookieData=None):
475     # Get TaskManager fields
476     # Prepare synch GetTaskFields request to the TasksManager
477     getTaskManagerFieldsObj = EventObjects.GetTaskManagerFields(taskId)
478     getTaskManagerFieldsEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.GET_TASK_FIELDS,
479                                                         getTaskManagerFieldsObj)
480     if cookieData is not None:
481       getTaskManagerFieldsEvent.cookie = cookieData
482
483     self.send(self.clientTasksManagerName, getTaskManagerFieldsEvent)
484     logger.debug("GetTaskManagerFields sent!")

◆ sendToDRCERouter()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.sendToDRCERouter (self, request)

Definition at line 325 of file ExecutionEnvironmentManager.py.

325   def sendToDRCERouter(self, request):
326     logger.debug("DRCE router sending request\n" + Utils.varDump(request))
327     # Try to execute request
328     try:
329       response = self.drceManager.process(request, self.drceTimeout)
330     except (ConnectionTimeout, TransportInternalErr, CommandExecutorErr) as err:
331       response = None
332       logger.error(err)
333
334     return response
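sendToDRCERouter() converts a fixed set of transport exceptions into a None result, so the caller must be prepared for a missing response. A minimal sketch of that pattern follows. The exception classes here are local stand-ins with the same names as in the listing, and `send_or_none` and the two fake transports are hypothetical.

```python
import logging

logger = logging.getLogger(__name__)


# Local stand-ins for the exception types caught in the listing.
class ConnectionTimeout(Exception):
    pass


class TransportInternalErr(Exception):
    pass


def send_or_none(process, request, timeout):
    """Call process(request, timeout); return None on a known transport error."""
    try:
        return process(request, timeout)
    except (ConnectionTimeout, TransportInternalErr) as err:
        logger.error(err)
        return None


def echo_transport(request, timeout):
    # Fake transport that always succeeds.
    return {"echo": request}


def failing_transport(request, timeout):
    # Fake transport that always times out.
    raise ConnectionTimeout("no reply within %s ms" % timeout)
```

Exceptions outside the listed tuple still propagate, so only anticipated transport failures are mapped to None.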

◆ sendToHCENodeAdmin()

def dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.sendToHCENodeAdmin (self, host, port, messageParameters)

Definition at line 448 of file ExecutionEnvironmentManager.py.

448   def sendToHCENodeAdmin(self, host, port, messageParameters):
449     # Execute EE node admin request
450     node = admin.Node.Node(host, port)
451     params = [messageParameters]
452     try:
453       command = admin.Command.Command(admin.Constants.COMMAND_NAMES.DRCE,
454                                       params,
455                                       admin.Constants.ADMIN_HANDLER_TYPES.DATA_PROCESSOR_DATA,
456                                       self.hceNodeAdminTimeout)
457       requestBody = command.generateBody()
458       message = {admin.Constants.STRING_MSGID_NAME : self.drceIdGenerator.get_uid(),
459                  admin.Constants.STRING_BODY_NAME : requestBody}
460       response = self.hceNodeManagerRequest.makeRequest(node, message, self.hceNodeAdminTimeout)
461       logger.debug("Response from HCE node Admin received!")
462
463       return response.getBody()
464     except Exception as e:
465       # Any failure is logged and reported to the caller as an empty string
466       logger.error(self.ERROR_HCE_ADMIN_REQUEST_ERROR + " : " + str(e))
467       return ""

Member Data Documentation

◆ clientTasksDataManagerName

dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.clientTasksDataManagerName

Definition at line 99 of file ExecutionEnvironmentManager.py.

◆ clientTasksManagerName

dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.clientTasksManagerName

Definition at line 98 of file ExecutionEnvironmentManager.py.

◆ CONFIG_DRCE_HOST

string dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.CONFIG_DRCE_HOST = "DRCEHost"
static

Definition at line 58 of file ExecutionEnvironmentManager.py.

◆ CONFIG_DRCE_PORT

string dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.CONFIG_DRCE_PORT = "DRCEPort"
static

Definition at line 59 of file ExecutionEnvironmentManager.py.

◆ CONFIG_DRCE_TIMEOUT

string dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.CONFIG_DRCE_TIMEOUT = "DRCETimeout"
static

Definition at line 60 of file ExecutionEnvironmentManager.py.

◆ CONFIG_HCE_NODE_ADMIN_TIMEOUT

string dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.CONFIG_HCE_NODE_ADMIN_TIMEOUT = "HCENodeAdminTimeout"
static

Definition at line 61 of file ExecutionEnvironmentManager.py.

◆ CONFIG_SERVER

string dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.CONFIG_SERVER = "server"
static

Definition at line 55 of file ExecutionEnvironmentManager.py.

◆ CONFIG_TASKS_MANAGER_CLIENT

string dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.CONFIG_TASKS_MANAGER_CLIENT = "clientTasksManager"
static

Definition at line 56 of file ExecutionEnvironmentManager.py.

◆ CONFIG_TASKS_MANAGER_DATA_CLIENT

string dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.CONFIG_TASKS_MANAGER_DATA_CLIENT = "clientTasksDataManager"
static

Definition at line 57 of file ExecutionEnvironmentManager.py.
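The CONFIG_* constants above are option names that the constructor presumably looks up through its `configParser` argument. The sketch below shows how such options might be laid out in an INI section and read. The section name, all option values, the integer types and the helper `load_settings` are assumptions for illustration only. It uses the Python 3 `configparser` module, whereas the documented code targets Python 2.

```python
import configparser

# Assumed section name; the real one is not shown in this reference.
SECTION = "ExecutionEnvironmentManager"

# Illustrative values only.
SAMPLE = """
[ExecutionEnvironmentManager]
server = ExecutionEnvironmentManager
clientTasksManager = TasksManager
clientTasksDataManager = TasksDataManager
DRCEHost = 127.0.0.1
DRCEPort = 5656
DRCETimeout = 5000
HCENodeAdminTimeout = 10000
"""


def load_settings(text):
    """Parse the INI text and return the options named by the CONFIG_* constants."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return {
        "serverName": parser.get(SECTION, "server"),
        "clientTasksManagerName": parser.get(SECTION, "clientTasksManager"),
        "clientTasksDataManagerName": parser.get(SECTION, "clientTasksDataManager"),
        "drceHost": parser.get(SECTION, "DRCEHost"),
        "drcePort": parser.getint(SECTION, "DRCEPort"),
        "drceTimeout": parser.getint(SECTION, "DRCETimeout"),
        "hceNodeAdminTimeout": parser.getint(SECTION, "HCENodeAdminTimeout"),
    }


settings = load_settings(SAMPLE)
```

The dictionary keys mirror the instance attributes listed in this section (`drceHost`, `drcePort`, `drceTimeout`, `hceNodeAdminTimeout` and the client names), though the actual mapping performed by the constructor is not shown here.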

◆ drceCommandConvertor

dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.drceCommandConvertor

Definition at line 139 of file ExecutionEnvironmentManager.py.

◆ drceHost

dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.drceHost

Definition at line 132 of file ExecutionEnvironmentManager.py.

◆ drceIdGenerator

dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.drceIdGenerator

Definition at line 138 of file ExecutionEnvironmentManager.py.

◆ drceManager

dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.drceManager

Definition at line 136 of file ExecutionEnvironmentManager.py.

◆ drcePort

dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.drcePort

Definition at line 133 of file ExecutionEnvironmentManager.py.

◆ drceTimeout

dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.drceTimeout

Definition at line 134 of file ExecutionEnvironmentManager.py.

◆ ERROR_DELETE_TASK_RESULTS

int dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.ERROR_DELETE_TASK_RESULTS = 1
static

Definition at line 73 of file ExecutionEnvironmentManager.py.

◆ ERROR_DELETE_TASK_RESULTS_MESSAGE

string dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.ERROR_DELETE_TASK_RESULTS_MESSAGE = "Delete task results error of EE request response or TaskManager!"
static

Definition at line 74 of file ExecutionEnvironmentManager.py.

◆ ERROR_EE_RESPONSE_OBJECT_TYPE_OR_RESPONSE_ERROR

string dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.ERROR_EE_RESPONSE_OBJECT_TYPE_OR_RESPONSE_ERROR = "EEResponseData object error or wrong response structure"
static

Definition at line 70 of file ExecutionEnvironmentManager.py.

◆ ERROR_HCE_ADMIN_REQUEST_ERROR

string dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.ERROR_HCE_ADMIN_REQUEST_ERROR = "HCE Admin request error"
static

Definition at line 71 of file ExecutionEnvironmentManager.py.

◆ ERROR_HCE_RESPONSE_PROCESSING_EXCEPTION

string dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.ERROR_HCE_RESPONSE_PROCESSING_EXCEPTION = "HCE node Admin API response processing exception"
static

Definition at line 65 of file ExecutionEnvironmentManager.py.

◆ ERROR_HCE_RESPONSE_PROCESSING_SPLIT

string dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.ERROR_HCE_RESPONSE_PROCESSING_SPLIT = "HCE node Admin API response processing can't to split status code"
static

Definition at line 66 of file ExecutionEnvironmentManager.py.

◆ ERROR_INSERT_EE_DATA

string dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.ERROR_INSERT_EE_DATA = "Error insert EE response data operation"
static

Definition at line 67 of file ExecutionEnvironmentManager.py.

◆ ERROR_MSG_DRCE_ROUTER_NEW_TASK

string dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.ERROR_MSG_DRCE_ROUTER_NEW_TASK = "DRCE Router request error!"
static

Definition at line 64 of file ExecutionEnvironmentManager.py.

◆ ERROR_UPDATE_TASKS_FIELDS

string dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.ERROR_UPDATE_TASKS_FIELDS = "Update tasks fields error"
static

Definition at line 68 of file ExecutionEnvironmentManager.py.

◆ ERROR_WRONG_OBJECT_TYPE

string dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.ERROR_WRONG_OBJECT_TYPE = "Wrong object type from TasksDataManager"
static

Definition at line 69 of file ExecutionEnvironmentManager.py.

◆ hceNodeAdminTimeout

dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.hceNodeAdminTimeout

Definition at line 142 of file ExecutionEnvironmentManager.py.

◆ hceNodeManagerRequest

dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.hceNodeManagerRequest

Definition at line 143 of file ExecutionEnvironmentManager.py.

◆ OPERATION_CHECK_STATE

int dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.OPERATION_CHECK_STATE = 2
static

Definition at line 78 of file ExecutionEnvironmentManager.py.

◆ OPERATION_DELETE_TASK

int dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.OPERATION_DELETE_TASK = 1
static

Definition at line 77 of file ExecutionEnvironmentManager.py.

◆ OPERATION_FETCH_RESULTS

int dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.OPERATION_FETCH_RESULTS = 3
static

Definition at line 79 of file ExecutionEnvironmentManager.py.

◆ OPERATION_NEW_TASK

int dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.OPERATION_NEW_TASK = 0
static

Definition at line 76 of file ExecutionEnvironmentManager.py.

◆ serverName

dtm.ExecutionEnvironmentManager.ExecutionEnvironmentManager.serverName

Definition at line 97 of file ExecutionEnvironmentManager.py.


The documentation for this class was generated from the following file: