HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
ExecutionEnvironmentManager.py
Go to the documentation of this file.
1 '''
2 HCE project, Python bindings, Distributed Tasks Manager application.
3 ExecutionEnvironmentManager object and related classes definitions.
4 
5 @package: dtm
6 @author bgv bgv.hce@gmail.com
7 @link: http://hierarchical-cluster-engine.com/
8 @copyright: Copyright © 2013-2014 IOIX Ukraine
9 @license: http://hierarchical-cluster-engine.com/license/
10 @since: 0.1
11 '''
12 
13 
14 from datetime import datetime
15 import logging
16 
18 from app.BaseServerManager import BaseServerManager
19 from app.LogFormatter import LogFormatterEvent
20 from drce.CommandConvertor import CommandConvertor
21 from drce.Commands import Session
22 from drce.Commands import TaskCheckRequest
23 from drce.Commands import TaskExecuteRequest
24 from drce.Commands import TaskExecuteStruct
25 from drce.Commands import TaskGetDataRequest
26 from drce.Commands import TaskTerminateRequest
27 from drce.Commands import TaskDeleteRequest
28 from drce.DRCEManager import ConnectionTimeout, TransportInternalErr, CommandExecutorErr
29 from drce.DRCEManager import DRCEManager
30 from drce.DRCEManager import HostParams
31 from dtm import EventObjects
32 from transport.ConnectionBuilderLight import ConnectionBuilderLight
33 from transport.UIDGenerator import UIDGenerator
34 import app.Utils as Utils # pylint: disable=F0401
35 from EventObjects import EEResponseData
36 import dtm.Constants as DTM_CONSTS
37 import transport.Consts as TRANSPORT_CONSTS
38 import drce.Consts as DRCE_CONSTS
39 
40 
41 # Logger initialization
42 # logger = logging.getLogger(__name__)
# Module-wide logger; it is looked up by the shared name DTM_CONSTS.LOGGER_NAME instead of this module's __name__.
43 logger = logging.getLogger(DTM_CONSTS.LOGGER_NAME)
44 
45 
46 # #The ExecutionEnvironmentManager class, is a main interface of DTM application and Execution Environment.
47 #
48 # This object is the main interface between the DTM application and the Execution Environment. It operates on Task
49 # units and executes requests to the DRCE Cluster and to HCE nodes using both API levels. It supports the complete set
50 # of DRCE protocol requests and processes responses from the DRCE cluster. It also updates task-related data in the
51 # tasks data storage containers and in the TasksManager runtime data.
53 
54  # Configuration settings options names
55  CONFIG_SERVER = "server"
56  CONFIG_TASKS_MANAGER_CLIENT = "clientTasksManager"
57  CONFIG_TASKS_MANAGER_DATA_CLIENT = "clientTasksDataManager"
58  CONFIG_DRCE_HOST = "DRCEHost"
59  CONFIG_DRCE_PORT = "DRCEPort"
60  CONFIG_DRCE_TIMEOUT = "DRCETimeout"
61  CONFIG_HCE_NODE_ADMIN_TIMEOUT = "HCENodeAdminTimeout"
62 
63 
# Error message texts; the ones referenced in the visible methods are written to the log.
64  ERROR_MSG_DRCE_ROUTER_NEW_TASK = "DRCE Router request error!"
65  ERROR_HCE_RESPONSE_PROCESSING_EXCEPTION = "HCE node Admin API response processing exception"
66  ERROR_HCE_RESPONSE_PROCESSING_SPLIT = "HCE node Admin API response processing can't to split status code"
67  ERROR_INSERT_EE_DATA = "Error insert EE response data operation"
68  ERROR_UPDATE_TASKS_FIELDS = "Update tasks fields error"
69  ERROR_WRONG_OBJECT_TYPE = "Wrong object type from TasksDataManager"
70  ERROR_EE_RESPONSE_OBJECT_TYPE_OR_RESPONSE_ERROR = "EEResponseData object error or wrong response structure"
71  ERROR_HCE_ADMIN_REQUEST_ERROR = "HCE Admin request error"
72 
# Error code / message pair for a failed delete-task-results operation.
73  ERROR_DELETE_TASK_RESULTS = 1
74  ERROR_DELETE_TASK_RESULTS_MESSAGE = "Delete task results error of EE request response or TaskManager!"
75 
# Operation kinds passed as the operationType argument of processUpdateTaskFields().
76  OPERATION_NEW_TASK = 0
77  OPERATION_DELETE_TASK = 1
78  OPERATION_CHECK_STATE = 2
79  OPERATION_FETCH_RESULTS = 3
80 
81  # #constructor
82  # initialize fields
83  #
84  # @param configParser config parser object
85  # @param connectBuilderLight connection builder light
86  #
87  def __init__(self, configParser, connectionBuilderLight=None):
# Reads the connection names and the DRCE / HCE node Admin settings from the config section named after this
# class, builds one server and two client connections, adds them to the polling set and binds the event handlers.
# @param configParser object providing get(section, option) and getint(section, option)
# @param connectionBuilderLight connection factory; a ConnectionBuilderLight is created when it is not given
88  super(ExecutionEnvironmentManager, self).__init__()
89 
90  # Instantiate the connection builder light if not set
# NOTE(review): `is None` is the idiomatic identity test for this check.
91  if connectionBuilderLight == None:
92  connectionBuilderLight = ConnectionBuilderLight()
93 
# The class name doubles as the configuration section name for every option read below.
94  className = self.__class__.__name__
95 
96  # Get configuration settings
97  self.serverName = configParser.get(className, self.CONFIG_SERVER)
98  self.clientTasksManagerName = configParser.get(className, self.CONFIG_TASKS_MANAGER_CLIENT)
99  self.clientTasksDataManagerName = configParser.get(className, self.CONFIG_TASKS_MANAGER_DATA_CLIENT)
100 
101  # Create connections and raise bind or connect actions for correspondent connection type
102  serverConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, self.serverName)
103  tasksManagerConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT, self.clientTasksManagerName)
104  tasksDataManagerConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT,
# NOTE(review): the listing skips the next source line, so the last argument and the closing parenthesis of the call
# above are not shown - presumably self.clientTasksDataManagerName; confirm against the original file.
106  # Add connections to the polling set
107  self.addConnection(self.serverName, serverConnection)
108  self.addConnection(self.clientTasksManagerName, tasksManagerConnection)
109  self.addConnection(self.clientTasksDataManagerName, tasksDataManagerConnection)
110 
111  # Set handlers
112  # Set event handler for EXECUTE_TASK event
113  self.setEventHandler(DTM_CONSTS.EVENT_TYPES.EXECUTE_TASK, self.onExecuteTask)
114  # Set event handler for CHECK_TASK_STATE event
115  self.setEventHandler(DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE, self.onCheckTaskState)
116  # Set event handler for FETCH_TASK_RESULTS event
117  self.setEventHandler(DTM_CONSTS.EVENT_TYPES.FETCH_TASK_RESULTS, self.onFetchTaskResults)
118  # Set event handler for FETCH_TASK_DATA_RESPONSE event
119  self.setEventHandler(DTM_CONSTS.EVENT_TYPES.FETCH_TASK_DATA_RESPONSE, self.onFetchTaskDataResponse)
120  # Set event handler for GET_TASK_FIELDS_RESPONSE event
121  self.setEventHandler(DTM_CONSTS.EVENT_TYPES.GET_TASK_FIELDS_RESPONSE, self.onGetTaskManagerFieldsResponse)
122  # Set event handler for INSERT_EE_DATA_RESPONSE event
123  self.setEventHandler(DTM_CONSTS.EVENT_TYPES.INSERT_EE_DATA_RESPONSE, self.onInsertEEDataResponse)
124  # Set event handler for UPDATE_TASK_FIELDS_RESPONSE event
125  self.setEventHandler(DTM_CONSTS.EVENT_TYPES.UPDATE_TASK_FIELDS_RESPONSE, self.onUpdateTasksFieldsResponse)
126  # Set event handler for DELETE_TASK_RESULTS event
127  self.setEventHandler(DTM_CONSTS.EVENT_TYPES.DELETE_TASK_RESULTS, self.onDeleteTaskResults)
128  # Set event handler for DELETE_EE_DATA_RESPONSE event
129  self.setEventHandler(DTM_CONSTS.EVENT_TYPES.DELETE_EE_DATA_RESPONSE, self.onDeleteEEDataResponse)
130 
131  # Initialize DRCE API
# The port is read with get() while the timeout is read with getint().
132  self.drceHost = configParser.get(className, self.CONFIG_DRCE_HOST)
133  self.drcePort = configParser.get(className, self.CONFIG_DRCE_PORT)
134  self.drceTimeout = configParser.getint(className, self.CONFIG_DRCE_TIMEOUT)
135  hostParams = HostParams(self.drceHost, self.drcePort)
# NOTE(review): a source line is missing from the listing here; self.drceManager is used on the next line but its
# creation is not shown - presumably a DRCEManager instance; confirm.
137  self.drceManager.activate_host(hostParams)
# NOTE(review): two source lines are missing from the listing here; other methods use self.drceIdGenerator and
# self.drceCommandConvertor, whose initialization is not visible.
140 
141  # Initialize HCE node Admin API
142  self.hceNodeAdminTimeout = configParser.getint(className, self.CONFIG_HCE_NODE_ADMIN_TIMEOUT)
# NOTE(review): the following source line is missing from the listing; self.hceNodeManagerRequest, used by
# sendToHCENodeAdmin(), is not initialized in the visible code.
144 
145 
146  # #ExecuteTask event handler
147  #
148  # @param event instance of Event object
def onExecuteTask(self, event):
    """Handle the EXECUTE_TASK event by asking for the task's data.

    A FetchTaskData object is created for the task id carried by the incoming event, wrapped into a
    FETCH_TASK_DATA event and sent over the TasksDataManager client connection. Any exception is logged
    together with the traceback and is not propagated to the caller.

    @param event instance of Event object; event.eventObj.id is used as the task id
    """
    try:
        executeTasks = event.eventObj
        fetchTaskData = EventObjects.FetchTaskData(executeTasks.id)
        fetchTaskDataEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.FETCH_TASK_DATA, fetchTaskData)
        self.send(self.clientTasksDataManagerName, fetchTaskDataEvent)
        logger.info("Sent request FetchTaskData to TasksDataManager, id=" + str(executeTasks.id))
    except Exception as err:
        # str(err) is used because BaseException.message is deprecated: it is empty unless the exception was
        # built with exactly one argument and it does not exist at all on Python 3.
        logger.error("Exception: " + str(err) + "\n" + Utils.getTracebackInfo())
161 
162 
163 
164  # #FetchTaskDataResponse event handler
165  #
166  # @param event instance of Event object
def onFetchTaskDataResponse(self, event):
    """Handle the FETCH_TASK_DATA_RESPONSE event.

    The object carried by the event decides what happens:
      - EventObjects.NewTask: the task is passed to processNewTask();
      - EventObjects.DeleteTask: the TasksManager fields of obj.deleteTaskId are requested, with
        ("onFetchTaskDataResponse", obj) attached as the request cookie;
      - anything else: an error is logged.
    Any exception is logged together with the traceback and is not propagated to the caller.

    @param event instance of Event object
    """
    try:
        obj = event.eventObj
        objType = type(obj)
        if objType == EventObjects.NewTask:
            logger.info("New task processing started, id=" + str(obj.id))
            self.processNewTask(obj)
            logger.info("New task processing finished, id=" + str(obj.id))
        elif objType == EventObjects.DeleteTask:
            logger.info("Delete task processing started, id=" + str(obj.id))
            # The DeleteTask object itself travels in the cookie, so the reply handling gets it back
            self.sendGetTaskManagerFieldsRequest(obj.deleteTaskId, ("onFetchTaskDataResponse", obj))
            logger.debug("GetTaskManagerFields request sent, id=" + str(obj.id))
        else:
            logger.error("Wrong type received from TasksDataManager!")
            logger.error(LogFormatterEvent(event, [], self.ERROR_WRONG_OBJECT_TYPE + " [" + obj.__class__.__name__ +
                                           "]" + " received but [" + str(EventObjects.NewTask) + "] or [" +
                                           str(EventObjects.DeleteTask) + "] expected!"))
    except Exception as err:
        tbi = Utils.getTracebackInfo()
        # str(err) is used because BaseException.message is deprecated: it is empty unless the exception was
        # built with exactly one argument and it does not exist at all on Python 3.
        logger.error("Exception: " + str(err) + "\n" + tbi)
192 
193 
194 
195  # #Send UpdateTasksData event to the TasksManager
196  #
197  # @param operationType is a name of action that is cause of task's fields update, can be OPERATION_NEW_TASK,
198  # OPERATION_DELETE_TASK, OPERATION_CHECK_STATE, OPERATION_FETCH_RESULTS
199  # @param eeResponseDataObj The EE response data object
200  # @param cookie is used for the event
201  def processUpdateTaskFields(self, operationType, eeResponseData, cookie=None):
# Builds an EventObjects.UpdateTaskFields for eeResponseData.id, fills its `fields` dict either from the EE response
# (errorCode equal to EEResponseData.ERROR_CODE_OK) or with fallback state / date values (any other errorCode), and
# sends it to the TasksManager client connection as an UPDATE_TASK_FIELDS event.
# NOTE(review): block nesting is not recoverable from this flattened listing; which `if` each `else:` below pairs
# with, and how deep the trailing statements sit, must be confirmed against the indented source.
202  logger.debug("eeResponseData:" + str(vars(eeResponseData)))
203  # Update task's fields on TasksManager
204  updateTaskFields = EventObjects.UpdateTaskFields(eeResponseData.id)
205  # If no errors in response from EE
206  if eeResponseData.errorCode == EEResponseData.ERROR_CODE_OK:
207  logger.debug("EE returned OK result!")
208  # Fill fields to update
209  updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] = eeResponseData.state
210  updateTaskFields.fields["pId"] = eeResponseData.pId
211  updateTaskFields.fields["nodeName"] = eeResponseData.nodeName
212  updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST] = eeResponseData.nodeHost
213  updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT] = eeResponseData.nodePort
# Resource usage values are copied only when the EE response carries them.
214  if DTM_CONSTS.DRCE_FIELDS.URRAM in eeResponseData.fields:
215  updateTaskFields.fields["uRRAM"] = eeResponseData.fields[DTM_CONSTS.DRCE_FIELDS.URRAM]
216  if DTM_CONSTS.DRCE_FIELDS.UVRAM in eeResponseData.fields:
217  updateTaskFields.fields["uVRAM"] = eeResponseData.fields[DTM_CONSTS.DRCE_FIELDS.UVRAM]
218  if DTM_CONSTS.DRCE_FIELDS.UCPU in eeResponseData.fields:
219  updateTaskFields.fields["uCPU"] = eeResponseData.fields[DTM_CONSTS.DRCE_FIELDS.UCPU]
220  if DTM_CONSTS.DRCE_FIELDS.UTHREADS in eeResponseData.fields:
221  updateTaskFields.fields["uThreads"] = eeResponseData.fields[DTM_CONSTS.DRCE_FIELDS.UTHREADS]
222  else:
223  logger.error("EE error returned!")
224  if operationType == self.OPERATION_NEW_TASK:
225  # For new task operation
226  updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] = EEResponseData.TASK_STATE_SET_ERROR
227  updateTaskFields.fields["sDate"] = datetime.now()
# For a REQUEST_TYPE_SET response "rDate" gets the same timestamp as "sDate".
228  if eeResponseData.type == EEResponseData.REQUEST_TYPE_SET:
229  updateTaskFields.fields["rDate"] = updateTaskFields.fields["sDate"]
230  else:
231  # if hasattr(eeResponseData, "deleteTaskId"):
232  if operationType == self.OPERATION_DELETE_TASK:
233  # For DeleteTask action fake task Id
234  updateTaskFields.fields["rDate"] = datetime.now()
235  updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] = EEResponseData.TASK_STATE_TERMINATED
236  else:
237  # For all another operations
# The attribute looked up here is the one named by the value of DTM_CONSTS.DRCE_FIELDS.STATE.
238  if hasattr(eeResponseData, DTM_CONSTS.DRCE_FIELDS.STATE):
239  # If EE returned state - set this state to update
240  updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] = eeResponseData.state
241  else:
242  # If EE error, timeout or another kind and no state returned
243  updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] = EEResponseData.TASK_STATE_UNDEFINED
244  logger.debug("No state field update, task Id: " + str(eeResponseData.id) + ", treated as UNDEFINED!")
245 
246  if operationType == self.OPERATION_DELETE_TASK:
247  if hasattr(eeResponseData, "deleteTaskId"):
248  updateTaskFields.fields["deleteTaskId"] = eeResponseData.deleteTaskId
249  # If EE has not found task - push it to delete on TasksManager
250  # if hasattr(eeResponseData, "deleteTaskState"):
251  # if eeResponseData.deleteTaskState == EEResponseData.TASK_STATE_NOT_FOUND:
252  # updateTaskFields.fields["deleteTaskState"] = EEResponseData.TASK_STATE_TERMINATED
253  # else:
254  # updateTaskFields.fields["deleteTaskState"] = eeResponseData.deleteTaskState
# The value of eeResponseData.deleteTaskState is not read: its presence alone sets TASK_STATE_TERMINATED.
255  if hasattr(eeResponseData, "deleteTaskState"):
256  updateTaskFields.fields["deleteTaskState"] = EEResponseData.TASK_STATE_TERMINATED
257 
258  logger.debug("Fields to update:\n" + Utils.varDump(updateTaskFields))
259  # Create update event
260  updateTaskFieldsEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.UPDATE_TASK_FIELDS, updateTaskFields)
# The optional cookie is attached to the outgoing event unchanged.
261  if cookie is not None:
262  updateTaskFieldsEvent.cookie = cookie
263  # Send update event to TasksManager
264  self.send(self.clientTasksManagerName, updateTaskFieldsEvent)
265  logger.debug("Fields sent to update!")
266 
267 
268 
269  # #NewTasks action processor
270  #
271  # @param newTaskObj object
def processNewTask(self, newTaskObj):
    """Send a new task to the DRCE router and push the outcome to the TasksManager.

    A TaskExecuteRequest is assembled from the command, files, input, session and limits of newTaskObj,
    sent with sendToDRCERouter() and the answer is converted with convertToEEResponse(). The task fields
    are updated through processUpdateTaskFields() only when the id of the converted response equals
    newTaskObj.id; otherwise an error is logged and nothing is sent to the TasksManager.

    @param newTaskObj the new task object (EventObjects.NewTask in the caller visible in this file)
    """
    sessionOptions = newTaskObj.session

    # DRCE session: three constructor arguments, the remaining options are plain attributes
    drceSession = Session(sessionOptions["tmode"], sessionOptions["type"], sessionOptions["time_max"])
    sessionAttributes = (("password", "password"),
                         ("port", DTM_CONSTS.DRCE_FIELDS.PORT),
                         ("shell", "shell"),
                         ("timeout", "timeout"),
                         ("user", "user"),
                         ("home_directory", "home_directory"),
                         ("environment", "environment"))

    # Payload of the DRCE request
    executeStruct = TaskExecuteStruct()
    executeStruct.command = newTaskObj.command
    executeStruct.files = newTaskObj.files
    executeStruct.input = newTaskObj.input
    executeStruct.session = drceSession
    for attributeName, optionName in sessionAttributes:
        setattr(drceSession, attributeName, sessionOptions[optionName])
    executeStruct.limits = newTaskObj.limits

    executeRequest = TaskExecuteRequest(newTaskObj.id)
    executeRequest.data = executeStruct

    # Optional session entries override the request defaults only when present, not None and not empty
    if "route" in sessionOptions:
        customRoute = sessionOptions["route"]
        if customRoute is not None and customRoute != "":
            executeRequest.route = customRoute
    if "task_type" in sessionOptions:
        customTaskType = sessionOptions["task_type"]
        if customTaskType is not None and customTaskType != "":
            executeRequest.task_type = customTaskType

    logger.debug("Sending task to DRCE router, id=" + str(newTaskObj.id) + ", route:" + str(executeRequest.route))
    routerResponse = self.sendToDRCERouter(executeRequest)
    logger.debug("Received from DRCE router, id=" + str(newTaskObj.id))

    convertedResponse = self.convertToEEResponse(routerResponse)
    logger.debug("Response body:\n" + Utils.varDump(routerResponse))
    logger.debug("eeResponseData object:\n" + Utils.varDump(convertedResponse))

    if convertedResponse.id == newTaskObj.id:
        logger.debug("Process update TaskFields in the TasksManager, id=" + str(newTaskObj.id))
        self.processUpdateTaskFields(self.OPERATION_NEW_TASK, convertedResponse)
        return

    # A response for another task id is only reported, the task state is left untouched
    logger.error("Wrong task Id= " + str(convertedResponse.id) + " returned from DRCE, expected id=" +
                 str(newTaskObj.id) + ". TasksManager's fields not updated, task state not changed. Response:\n" +
                 Utils.varDump(convertedResponse))
318 
319 
320 
321  # #Send a request to the DRCE Router
322  #
323  # @param request the DRCE request object to process
324  # @return the response of the DRCE manager, or None if the request failed
def sendToDRCERouter(self, request):
    """Pass a request to the DRCE manager and hand back its response.

    ConnectionTimeout, TransportInternalErr and CommandExecutorErr are logged and turned into a None
    result; any other exception propagates to the caller.

    @param request the DRCE request object
    @return the value returned by drceManager.process(), or None after one of the errors listed above
    """
    logger.debug("DRCE router sending request\n" + Utils.varDump(request))
    try:
        return self.drceManager.process(request, self.drceTimeout)
    except (ConnectionTimeout, TransportInternalErr, CommandExecutorErr) as err:
        logger.error(err)
    return None
335 
336 
337 
338  # #Converts DRCE TaskResponse object to the EEResponseData object
339  #
340  # @param response drce.Commands.TaskResponse object
341  # @return EEResponseData object instance
def convertToEEResponse(self, response):
    """Convert a DRCE task response into an EEResponseData object.

    Only the first element of response.items is used. When response is None or has no items, an
    EEResponseData created with 0 is returned with the timeout error code and message set.

    @param response object with an `items` sequence, or None
    @return EEResponseData object instance
    """
    if response is None or len(response.items) == 0:
        # Nothing usable came back - reported as a timeout
        # NOTE(review): the source listing skips two lines right after this fill-in; confirm that no statement
        # (a log call, for example) was lost here.
        timeoutResult = EEResponseData(0)
        timeoutResult.errorCode = EEResponseData.ERROR_CODE_TIMEOUT
        timeoutResult.errorMessage = EEResponseData.ERROR_MESSAGE_TIMEOUT
        return timeoutResult

    firstItem = response.items[0]
    converted = EEResponseData(firstItem.id)
    # (EEResponseData attribute, response item attribute); `time` feeds both requestTime and taskTime
    attributeMap = (("type", "type"),
                    ("errorCode", "error_code"),
                    ("errorMessage", "error_message"),
                    ("state", "state"),
                    ("pId", "pid"),
                    ("requestTime", "time"),
                    ("nodeHost", "host"),
                    ("nodePort", "port"),
                    ("nodeName", "node"),
                    ("stdout", "stdout"),
                    ("stderr", "stderror"),
                    ("exitStatus", "exit_status"),
                    ("taskTime", "time"),
                    ("files", "files"),
                    ("fields", "fields"))
    for targetName, sourceName in attributeMap:
        setattr(converted, targetName, getattr(firstItem, sourceName))
    logger.debug("To EEResponseData converted!")

    return converted
371 
372 
373 
374  # #Converts raw HCE Admin API response to DRCE TaskResponse object
375  #
376  # @param rawResponse HCE Admin API response raw buffer
377  # @return drce.Commands.TaskResponse object
def convertToTaskResponse(self, rawResponse):
    """Convert a raw HCE node Admin API response into a task response object.

    The raw buffer is split on admin.Constants.COMMAND_DELIM and the second part is decoded with
    drceCommandConvertor.from_json(). A buffer that is None, empty or has no delimiter is reported as a
    split error; a decoding failure is logged. In both cases None is returned.

    @param rawResponse HCE Admin API response raw buffer
    @return the object produced by drceCommandConvertor.from_json(), or None on any failure
    """
    taskResponse = None

    # None is treated like an empty buffer instead of failing on .split()
    items = rawResponse.split(admin.Constants.COMMAND_DELIM) if rawResponse else []
    if len(items) > 1:
        # Convert DRCE JSON protocol response to TaskResponse object
        try:
            taskResponse = self.drceCommandConvertor.from_json(items[1])
            logger.debug("To taskResponse converted!")
        except Exception as e:
            # e.__doc__ is None for exception classes without a docstring, so it is passed through str()
            # to keep the concatenation from raising inside the handler; str(e) replaces the deprecated
            # e.message, which is empty for multi-argument exceptions and missing on Python 3.
            logger.error(self.ERROR_HCE_RESPONSE_PROCESSING_EXCEPTION + " : " + str(e.__doc__) + " : " + str(e))
    else:
        logger.error(self.ERROR_HCE_RESPONSE_PROCESSING_SPLIT)

    return taskResponse
394 
395 
396 
397  # #onCheckTaskState event handler
398  #
399  # @param event instance of Event object
def onCheckTaskState(self, event):
    """Handle the CHECK_TASK_STATE event.

    The TasksManager fields of the task are requested first; the incoming event travels in the request
    cookie as ("onCheckTaskState", event), which is where processCheckTaskState() later reads it from.
    Any exception is logged together with the traceback and is not propagated to the caller.

    @param event instance of Event object; event.eventObj.id is used as the task id
    """
    try:
        checkTaskStateObj = event.eventObj
        self.sendGetTaskManagerFieldsRequest(checkTaskStateObj.id, ("onCheckTaskState", event))
    except Exception as err:
        tbi = Utils.getTracebackInfo()
        # str(err) is used because BaseException.message is deprecated: it is empty unless the exception was
        # built with exactly one argument and it does not exist at all on Python 3.
        logger.error("Exception: " + str(err) + "\n" + tbi)
409 
410 
411 
412  # #onFetchTaskResults event handler
413  #
414  # @param event instance of Event object
def onFetchTaskResults(self, event):
    """Handle the FETCH_TASK_RESULTS event.

    The TasksManager fields of the task are requested first; the incoming event travels in the request
    cookie as ("onFetchTaskResults", event), which is where processFetchTaskResults() later reads it from.
    Any exception is logged together with the traceback and is not propagated to the caller.

    @param event instance of Event object; event.eventObj.id is used as the task id
    """
    try:
        fetchTaskResultsObj = event.eventObj
        self.sendGetTaskManagerFieldsRequest(fetchTaskResultsObj.id, ("onFetchTaskResults", event))
    except Exception as err:
        tbi = Utils.getTracebackInfo()
        # str(err) is used because BaseException.message is deprecated: it is empty unless the exception was
        # built with exactly one argument and it does not exist at all on Python 3.
        logger.error("Exception: " + str(err) + "\n" + tbi)
424 
425 
426 
427  # #onDeleteTaskResults event handler
428  #
429  # @param event instance of Event object
def onDeleteTaskResults(self, event):
    """Handle the DELETE_TASK_RESULTS event.

    The TasksManager fields of the task are requested first; the incoming event travels in the request
    cookie as ("onDeleteTaskResults", event). Any exception is logged together with the traceback and is
    not propagated to the caller.

    @param event instance of Event object; event.eventObj.id is used as the task id
    """
    try:
        deleteTaskResultsObj = event.eventObj
        self.sendGetTaskManagerFieldsRequest(deleteTaskResultsObj.id, ("onDeleteTaskResults", event))
    except Exception as err:
        tbi = Utils.getTracebackInfo()
        # str(err) is used because BaseException.message is deprecated: it is empty unless the exception was
        # built with exactly one argument and it does not exist at all on Python 3.
        logger.error("Exception: " + str(err) + "\n" + tbi)
439 
440 
441 
442  # #Send to EE transport node admin connection
443  #
444  # @param host HCE node host
445  # @param port HCE node port
446  # @param messageParameters HCE node Admin request message parameters string
447  # @return the raw body of HCE Admin API response if success or empty string if not
def sendToHCENodeAdmin(self, host, port, messageParameters):
    """Send a DRCE command to an HCE node through the node Admin API.

    The message parameters are wrapped into an admin DRCE command addressed to the data processor
    handler, sent with hceNodeManagerRequest.makeRequest() and the body of the response is returned.
    Any exception raised while building or sending the command is logged and turned into an empty
    string result; creation of the node object happens before that protection starts.

    @param host HCE node host
    @param port HCE node port
    @param messageParameters HCE node Admin request message parameters string
    @return the raw body of HCE Admin API response if success or empty string if not
    """
    node = admin.Node.Node(host, port)
    params = [messageParameters]
    try:
        command = admin.Command.Command(admin.Constants.COMMAND_NAMES.DRCE,
                                        params,
                                        admin.Constants.ADMIN_HANDLER_TYPES.DATA_PROCESSOR_DATA,
                                        self.hceNodeAdminTimeout)
        requestBody = command.generateBody()
        message = {admin.Constants.STRING_MSGID_NAME: self.drceIdGenerator.get_uid(),
                   admin.Constants.STRING_BODY_NAME: requestBody}
        response = self.hceNodeManagerRequest.makeRequest(node, message, self.hceNodeAdminTimeout)
        logger.debug("Response from HCE node Admin received!")

        return response.getBody()
    except Exception as e:
        # `except ... as` matches the handlers elsewhere in this file and is valid on Python 2.6+ and 3;
        # str(e) replaces the deprecated e.message, which is empty for multi-argument exceptions.
        logger.error(self.ERROR_HCE_ADMIN_REQUEST_ERROR + " : " + str(e))
        return ""
467 
468 
469 
470  # #Send GetTaskManager request
471  #
472  # @param taskId task Id
473  # @param cookieData data that will be copied from send event to reply
def sendGetTaskManagerFieldsRequest(self, taskId, cookieData=None):
    """Ask the TasksManager for the fields of a task.

    A GET_TASK_FIELDS event carrying EventObjects.GetTaskManagerFields(taskId) is built and sent over
    the TasksManager client connection.

    @param taskId task Id
    @param cookieData value stored in the cookie of the outgoing event when it is not None
    """
    requestEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.GET_TASK_FIELDS,
                                           EventObjects.GetTaskManagerFields(taskId))
    if cookieData is not None:
        requestEvent.cookie = cookieData

    self.send(self.clientTasksManagerName, requestEvent)
    logger.debug("GetTaskManagerFields sent!")
485 
486 
487 
488  # #processFetchTaskResult action
489  #
490  # @param event received from TasksManager
491  def processFetchTaskResults(self, event):
# Handles the TasksManager fields reply that belongs to a FetchTaskResults request: for a task in one of the
# accepted states a TaskGetDataRequest is sent to the HCE node given by the task's host / port fields, and an
# EEResponseData object is sent back as a FETCH_TASK_RESULTS_RESPONSE reply to the event kept in event.cookie[1].
# NOTE(review): block nesting is not recoverable from this flattened listing; in particular confirm whether the
# processUpdateTaskFields() call belongs to the `if eeResponseData.errorCode == ...` body or follows it.
492  # Get event object
493  taskManagerFields = event.eventObj
494 
495  # Check is task found
496  if len(taskManagerFields.fields) > 0:
497  # Check task state
498  if taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_FINISHED or\
499  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_CRASHED or\
500  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_TERMINATED or\
501  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_TERMINATED_BY_DRCE_TTL or\
502  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_NEW:
# cookie[1] is the original FetchTaskResults event stored by onFetchTaskResults().
503  fetchTaskResultsObj = event.cookie[1].eventObj
504  # Prepare the messageBodyJson for DRCE request
505  taskGetDataRequest = TaskGetDataRequest(fetchTaskResultsObj.id, fetchTaskResultsObj.type)
506  messageBodyJson = self.drceCommandConvertor.to_json(taskGetDataRequest)
507 
508  logger.debug("(line 508) Call sendToHCENodeAdmin() use Host: '%s' and Port '%s'",
509  str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST]),
510  str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT]))
511 
512  rawResponse = self.sendToHCENodeAdmin(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST],
513  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT],
514  messageBodyJson)
515  eeResponseData = self.convertToEEResponse(self.convertToTaskResponse(rawResponse))
516  logger.debug("Response received from EE for onFetchTaskResults!")
# NOTE(review): the second operand below is the bare constant EEResponseData.TASK_STATE_FINISHED, so only its
# truthiness is tested; a comparison with eeResponseData.state may have been intended - confirm before changing.
517  if eeResponseData.errorCode == EEResponseData.ERROR_CODE_OK and \
518  EEResponseData.TASK_STATE_FINISHED:
519  # Update EEResponseDtata object in the TasksDataManager container
520  logger.debug("Update EEResponseDtata object in the TasksDataManager container sent!")
521  insertEEDataEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.INSERT_EE_DATA, eeResponseData)
522  self.send(self.clientTasksDataManagerName, insertEEDataEvent)
523  # Update TaskFields in the TasksManager
524  logger.debug("Update TaskFields in the TasksManager!")
525  self.processUpdateTaskFields(self.OPERATION_FETCH_RESULTS, eeResponseData)
526  else:
527  # Return state, requested operation can't to be done on DRCE cause task is not in proper state
528  eeResponseData = EEResponseData(taskManagerFields.id)
529  eeResponseData.state = taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE]
530  logger.debug("Wrong task state " + str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE]) +
531  " for FetchTaskResults operation!")
532  else:
533  eeResponseData = EEResponseData(0)
# NOTE(review): `state` receives an ERROR_CODE_* constant here rather than a TASK_STATE_* one - confirm intent.
534  eeResponseData.state = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
535  eeResponseData.errorCode = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
536  eeResponseData.errorMessage = EEResponseData.ERROR_MESSAGE_TASK_NOT_FOUND
537  eeResponseData.exitStatus = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
538  logger.error("Empty mandatory fields received from TasksManager:\n%s", Utils.varDump(taskManagerFields))
539  # Prepare check task reply event
540  fetchTaskResultsReplyEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.FETCH_TASK_RESULTS_RESPONSE,
541  eeResponseData)
542  self.reply(event.cookie[1], fetchTaskResultsReplyEvent)
543 
544 
545 
546  # #processCheckTaskState action
547  #
548  # @param event received from TasksManager
549  def processCheckTaskState(self, event):
# Handles the TasksManager fields reply that belongs to a CheckTaskState request: when the task fields contain a
# non-empty host and port, a TaskCheckRequest is sent to that HCE node; an EEResponseData object is sent back as
# a CHECK_TASK_STATE_RESPONSE reply to the event kept in event.cookie[1].
# NOTE(review): block nesting is not recoverable from this flattened listing; in particular confirm whether the
# processUpdateTaskFields() call belongs to the `if eeResponseData.errorCode == ...` body or follows it.
550  # Get event object
551  taskManagerFields = event.eventObj
552 
553  # Check is task found
554  if len(taskManagerFields.fields) > 0 and taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST] is not None and\
555  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST] != "" and\
556  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT] is not None and\
557  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT] != "":
558 
559  # Check task state
# cookie[1] is the original CheckTaskState event stored by onCheckTaskState().
560  checkTaskStateObj = event.cookie[1].eventObj
561  # Prepare the messageBodyJson for DRCE request
562  taskCheckRequest = TaskCheckRequest(checkTaskStateObj.id, checkTaskStateObj.type)
563  messageBodyJson = self.drceCommandConvertor.to_json(taskCheckRequest)
564  # Make HCE node admin request
565  logger.debug("Make HCE node admin request!")
566  logger.debug("(line 566) Call sendToHCENodeAdmin() use Host: '%s' and Port '%s'",
567  str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST]),
568  str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT]))
569 
570  rawResponse = self.sendToHCENodeAdmin(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST],
571  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT],
572  messageBodyJson)
573  eeResponseData = self.convertToEEResponse(self.convertToTaskResponse(rawResponse))
574  if eeResponseData.errorCode == EEResponseData.ERROR_CODE_OK:
# An OK response without node host or port is only logged here.
575  if eeResponseData.nodeHost == "" or eeResponseData.nodePort == "":
576  logger.error(str(vars(eeResponseData)))
577  logger.debug("Received Host or Port is empty!")
578  # Update TaskFields in the TasksManager
579  logger.debug("Update TaskFields in the TasksManager!")
580  self.processUpdateTaskFields(self.OPERATION_CHECK_STATE, eeResponseData)
581  else:
582  eeResponseData = EEResponseData(taskManagerFields.id)
# NOTE(review): `state` receives an ERROR_CODE_* constant here rather than a TASK_STATE_* one - confirm intent.
583  eeResponseData.state = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
584  eeResponseData.errorCode = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
585  eeResponseData.errorMessage = EEResponseData.ERROR_MESSAGE_TASK_NOT_FOUND
586  eeResponseData.exitStatus = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
587  logger.error("Empty mandatory fields received from TasksManager:\n%s", Utils.varDump(taskManagerFields))
588  # Prepare check task reply event
589  checkTaskStateReplyEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE_RESPONSE,
590  eeResponseData)
591  self.reply(event.cookie[1], checkTaskStateReplyEvent)
592  logger.debug("Check task reply event sent!")
593 
594 
595 
596  # #createTaskDeleteRequest creates and returns TaskTerminateRequest or TaskDeleteRequest objects
597  #
598  # @param deleteTaskObj - deleted tasks object
def createTaskDeleteRequest(self, deleteTaskObj):
    """Build the DRCE request that matches the action of a delete task object.

    ACTION_DELETE_TASK_DATA gives a TaskDeleteRequest, ACTION_TERMINATE_TASK_AND_DELETE_DATA gives a
    TaskTerminateRequest left as constructed, and every other action gives a TaskTerminateRequest whose
    data["cleanup"] is set to DRCE_CONSTS.TERMINATE_DATA_SAVE.

    @param deleteTaskObj object providing `action` and `deleteTaskId`
    @return TaskDeleteRequest or TaskTerminateRequest instance
    """
    requestedAction = deleteTaskObj.action

    if requestedAction == EventObjects.DeleteTask.ACTION_DELETE_TASK_DATA:
        # Only the task's data has to be removed
        return TaskDeleteRequest(deleteTaskObj.deleteTaskId)

    if requestedAction == EventObjects.DeleteTask.ACTION_TERMINATE_TASK_AND_DELETE_DATA:
        # Terminate and delete the data - the request is used with its default initialization
        return TaskTerminateRequest(deleteTaskObj.deleteTaskId)

    # Terminate but leave the task's data in place
    terminateRequest = TaskTerminateRequest(deleteTaskObj.deleteTaskId)
    terminateRequest.data["cleanup"] = DRCE_CONSTS.TERMINATE_DATA_SAVE
    return terminateRequest
612 
613 
614  # #checkDelTaskState checks the state of the task to be deleted
615  #
616  # @param state state of the task to be deleted
617  # @param action requested delete action
618  # @return bool value - whether the task can be deleted or not
def checkDelTaskState(self, state, action):
    """Tell whether a delete request may be issued for a task.

    The answer is True when the action is not ACTION_DELETE_ON_DTM and either the state is one of the
    states listed below or the action is ACTION_TERMINATE_TASK_AND_DELETE_DATA.

    @param state state of the task to be deleted
    @param action requested delete action
    @return True if the task can be deleted, False otherwise
    """
    acceptedStates = (EEResponseData.TASK_STATE_FINISHED,
                      EEResponseData.TASK_STATE_CRASHED,
                      EEResponseData.TASK_STATE_TERMINATED,
                      EEResponseData.TASK_STATE_TERMINATED_BY_DRCE_TTL,
                      EEResponseData.TASK_STATE_DELETED,
                      EEResponseData.TASK_STATE_UNDEFINED,
                      EEResponseData.TASK_STATE_SET_ERROR,
                      EEResponseData.TASK_STATE_NOT_FOUND,
                      EEResponseData.TASK_STATE_SCHEDULE_TRIES_EXCEEDED)
    eligible = state in acceptedStates or action == EventObjects.DeleteTask.ACTION_TERMINATE_TASK_AND_DELETE_DATA
    if not eligible:
        return False
    # A DTM-only delete never results in a request to the node
    return action != EventObjects.DeleteTask.ACTION_DELETE_ON_DTM
633 
634 
635  # #processDeleteTask action
636  #
637  # @param event received from TasksManager
638  def processDeleteTask(self, event):
  # Continues DeleteTask handling once TasksManager has returned the fields of the task to delete.
  # Reads the task fields from event.eventObj and the DeleteTask object from event.cookie[1].
  # When host and port are set and checkDelTaskState() accepts the stored state and the requested
  # action, a TaskDeleteRequest is sent through sendToHCENodeAdmin() and the reply is converted to an
  # EEResponseData. Whenever an EEResponseData was produced it is handed to
  # processUpdateTaskFields(self.OPERATION_DELETE_TASK, ...). Nothing is returned.
  #
  # @param event received from TasksManager
639  # Get the task fields returned by TasksManager and the DeleteTask object carried in the cookie
640  taskManagerFields = event.eventObj
641  deleteTaskObj = event.cookie[1]
642 
643  # Proceed only when TasksManager returned a non-empty fields collection (task found)
644  if len(taskManagerFields.fields) > 0:
645  eeResponseData = None
  # Stored state of the task to delete, or None when the STATE key is absent
646  deletedTasksState = taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] if \
647  DTM_CONSTS.DRCE_FIELDS.STATE in taskManagerFields.fields else None
  # NOTE(review): HOST and PORT are read with [] while STATE above is guarded with `in`; a missing
  # key would raise KeyError here - confirm both keys are always present in the fields.
648  # Check task data: a node request is possible only with a non-empty host and port
649  if taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST] is not None and\
650  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST] != "" and\
651  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT] is not None and\
652  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT] != "":
653  if self.checkDelTaskState(deletedTasksState, deleteTaskObj.action):
  # State/action combination accepted: build the request and serialize it to JSON for the node
654  taskDeleteRequest = self.createTaskDeleteRequest(deleteTaskObj)
655  messageBodyJson = self.drceCommandConvertor.to_json(taskDeleteRequest)
656  logger.debug("Send TaskDeleteRequest to HCE node Admin API, taskId=" + str(deleteTaskObj.deleteTaskId))
657  logger.debug("(line 657) Call sendToHCENodeAdmin() use Host: '%s' and Port '%s'",
658  str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST]),
659  str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT]))
660 
661  rawResponse = self.sendToHCENodeAdmin(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST],
662  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT],
663  messageBodyJson)
  # NOTE(review): the concatenation below assumes rawResponse is a string; the log text also
  # misspells "TaskDeleteRequest".
664  logger.debug("TaskDeleteequest rawResponse=[" + rawResponse + "]")
  # An empty string is treated as a failed request (see the else branch below)
665  if rawResponse != "":
666  eeResponseData = self.convertToEEResponse(self.convertToTaskResponse(rawResponse))
667  logger.debug("TaskDeleteRequest response taskId=" + str(deleteTaskObj.deleteTaskId) + \
668  ", state:" + str(eeResponseData.state) + \
669  ", exitStatus:" + str(eeResponseData.exitStatus) + \
670  ", stdout: " + eeResponseData.stdout + ", stderr:" + eeResponseData.stderr)
  # The response is reported under the id of the delete task itself, not of the deleted task
671  eeResponseData.id = deleteTaskObj.id
672  #!!!Replace the unsupported states like UNDEFINED or NOTFOUND to push to remove this tasks from TasksManager
673  if eeResponseData.state != EEResponseData.TASK_STATE_IN_PROGRESS and\
674  eeResponseData.state != EEResponseData.TASK_STATE_NEW:
675  logger.debug("Task state was substituted from " + str(eeResponseData.state) + " to " + \
676  str(EEResponseData.TASK_STATE_DELETED) + ", taskId=" + str(deleteTaskObj.deleteTaskId))
677  # eeResponseData.state = EEResponseData.TASK_STATE_DELETED
  # NOTE(review): the only state assignment directly under the condition above is commented out,
  # while the debug text reports a substitution; confirm whether the four assignments below are meant
  # to run for every state or only for states other than IN_PROGRESS/NEW.
678  # Substitute state for task to delete and deleted task
679  eeResponseData.nodeName = ""
680  eeResponseData.nodeHost = ""
681  eeResponseData.nodePort = 0
682  eeResponseData.state = EEResponseData.TASK_STATE_DELETED
  # A positive error code from the node overrides the state with TASK_STATE_SET_ERROR
683  if eeResponseData.errorCode > 0:
684  logger.debug("TaskDelete request response error:" + str(eeResponseData.errorCode) + " : " + \
685  eeResponseData.errorMessage)
686  eeResponseData.state = EEResponseData.TASK_STATE_SET_ERROR
687  eeResponseData.deleteTaskId = deleteTaskObj.deleteTaskId
688  eeResponseData.deleteTaskState = None
689  else:
690  eeResponseData.deleteTaskId = deleteTaskObj.deleteTaskId
691  eeResponseData.deleteTaskState = eeResponseData.state
692  else:
693  # Empty response from the node: report TASK_STATE_DELETED for both the delete task and the deleted
  # task to push the TasksManager to delete task's data on DTM, save the dat file in DRCE node
  # (this comment previously said TERMINATED; the code assigns TASK_STATE_DELETED)
694  eeResponseData = EEResponseData(deleteTaskObj.id)
695  eeResponseData.deleteTaskId = deleteTaskObj.deleteTaskId
696  eeResponseData.deleteTaskState = EEResponseData.TASK_STATE_DELETED
697  eeResponseData.state = EEResponseData.TASK_STATE_DELETED
698  logger.error("TaskDeleteRequest response fault, taskId=" + str(deleteTaskObj.deleteTaskId) + "!")
699  else:
  # checkDelTaskState() rejected the state/action pair: no request is sent to the node
700  eeResponseData = EEResponseData(deleteTaskObj.id)
701  eeResponseData.deleteTaskId = deleteTaskObj.deleteTaskId
702  if deleteTaskObj.action == EventObjects.DeleteTask.ACTION_DELETE_ON_DTM:
703  eeResponseData.state = EEResponseData.TASK_STATE_DELETED
704  eeResponseData.deleteTaskState = EEResponseData.TASK_STATE_DELETED
705  logger.debug("Deleted task only on DTM")
706 # eeResponseData.deleteTaskState = deletedTasksState
707  else:
708  eeResponseData.state = EEResponseData.TASK_STATE_SET_ERROR
709  logger.debug("Deleted task " + str(eeResponseData.deleteTaskId) + " has bad[Not deleted state] " +
710  str(deletedTasksState) + " delete error")
711  else:
712  # Set state as TERMINATED to push the TasksManager to delete task's data on DTM, save the dat file in DRCE node
713  msg = "Host or Port is empty! Set state as TERMINATED to push delete task, deleteTaskObj.id=" + \
714  str(deleteTaskObj.id) + ", deleteTaskObj.deleteTaskId=" + str(deleteTaskObj.deleteTaskId)
715  eeResponseData = EEResponseData(deleteTaskObj.id)
716  eeResponseData.errorCode = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
717  eeResponseData.errorMessage = msg
718  eeResponseData.deleteTaskId = deleteTaskObj.deleteTaskId
719  eeResponseData.deleteTaskState = EEResponseData.TASK_STATE_TERMINATED
720  eeResponseData.state = EEResponseData.TASK_STATE_TERMINATED
721  logger.error(msg)
722 
723  if eeResponseData is not None:
724  # Update TaskFields in the TasksManager
725  self.processUpdateTaskFields(self.OPERATION_DELETE_TASK, eeResponseData)
726  logger.debug("Update TaskFields in the TasksManager!")
727  else:
  # Empty fields collection: only logged, no update is sent to TasksManager
728  logger.error("Empty fields received from TasksManager for DeleteTask!")
729 
730 
731  def createGeneralResponse(self, errCode, errMessage, errLog):
732  generalResponseObj = EventObjects.GeneralResponse()
733  generalResponseObj.errorCode = errCode
734  generalResponseObj.errorMessage = errMessage
735  logger.debug(errLog)
736  return generalResponseObj
737 
738 
739  # #processDeleteTaskResults action
740  #
741  # @param event received from TasksManager
742  def processDeleteTaskResults(self, event):
743  # Get event object
744  taskManagerFields = event.eventObj
745  # New General response event object for error case
746  generalResponseObj = None
747  hceNodeAdminRequestState = EEResponseData.ERROR_CODE_OK
748 
749  # Check is task found
750  if len(taskManagerFields.fields) > 0:
751  # Check task data
752  if taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_FINISHED or\
753  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_CRASHED or\
754  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_TERMINATED or\
755  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_TERMINATED_BY_DRCE_TTL or\
756  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_DELETED or\
757  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_UNDEFINED or\
758  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_NOT_FOUND:
759  deleteTaskResultsObj = event.cookie[1].eventObj
760  # Prepare the messageBodyJson for DRCE request
761  logger.info("Send TaskGetDataRequest with DELETE data, taskId=" + str(deleteTaskResultsObj.id))
762  # TODO: Use GetDataRequest to delete results, need to be replaced with native command later
763  taskDeleteRequest = TaskDeleteRequest(deleteTaskResultsObj.id)
764  messageBodyJson = self.drceCommandConvertor.to_json(taskDeleteRequest)
765 
766  logger.debug("(line 766) Call sendToHCENodeAdmin() use Host: '%s' and Port '%s'",
767  str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST]),
768  str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT]))
769 
770  rawResponse = self.sendToHCENodeAdmin(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST],
771  taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT],
772  messageBodyJson)
773  logger.info("TaskDeleteResultsRequest rawResponse=[" + rawResponse + "]")
774  if rawResponse != "":
775  eeResponseData = self.convertToEEResponse(self.convertToTaskResponse(rawResponse))
776  if eeResponseData.errorCode == EEResponseData.ERROR_CODE_OK:
777  logger.info("Response received from EE")
778  else:
779  generalResponseObj = self.createGeneralResponse(EventObjects.DeleteTaskResults.DRCE_ERROR,
780  EventObjects.DeleteTaskResults.DRCE_ERROR_MESSAGE,
781  "DRCE error error=" + str(eeResponseData.errorCode))
782  else:
783  generalResponseObj = self.createGeneralResponse(EventObjects.DeleteTaskResults.EMPRY_RAW_ERROR,
784  EventObjects.DeleteTaskResults.EMPRY_RAW_ERROR_MESSAGE, "Empty rawResponse")
785  else:
786  # Return state, requested operation can't to be done on DRCE cause task is not in proper state
787  generalResponseObj = self.createGeneralResponse(EventObjects.DeleteTaskResults.TASK_STATE_ERROR,
788  EventObjects.DeleteTaskResults.TASK_STATE_ERROR_MESSAGE,
789  "Wrong task state " + str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE]) +
790  ", can't cleanup!")
791  else:
792  generalResponseObj = self.createGeneralResponse(EventObjects.DeleteTaskResults.TASK_NOT_FOUND_ERROR,
793  EventObjects.DeleteTaskResults.TASK_NOT_FOUND_ERROR_MESSAGE,
794  "Empty fields from TasksManager for DeleteTaskResults, possible task not found!")
795  # Send DELETE_EE_DATA request to the TasksDataManager
796  if generalResponseObj == None:
797  logger.debug("Send DELETE_EE_DATA request to the TasksDataManager!")
798  deleteEEResponseDataObj = EventObjects.DeleteEEResponseData(taskManagerFields.id)
799  deleteEEResponseDataEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.DELETE_EE_DATA, deleteEEResponseDataObj)
800  deleteEEResponseDataEvent.cookie = ("processDeleteTaskResults", event.cookie[1], hceNodeAdminRequestState)
801  self.send(self.clientTasksDataManagerName, deleteEEResponseDataEvent)
802  logger.debug("Sent request DeleteEEResponseDataObj to TasksDataManager!")
803  else:
804  deleteTaskResultsReplyEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.DELETE_TASK_RESULTS_RESPONSE,
805  generalResponseObj)
806  self.reply(event.cookie[1], deleteTaskResultsReplyEvent)
807  logger.info("Send response GeneralResponse to ClientInterfaceService!")
808 
809 
810 
811  # #onGetTaskManagerFieldsResponse event handler
812  #
813  # @param event instance of Event object
815  try:
816  if event.cookie is not None and event.cookie[0] == "onFetchTaskResults":
817  # Continue the onFetchTaskResults handling
818  self.processFetchTaskResults(event)
819 
820  if event.cookie is not None and event.cookie[0] == "onCheckTaskState":
821  # Continue the onCheckTaskState handling
822  self.processCheckTaskState(event)
823 
824  if event.cookie is not None and event.cookie[0] == "onFetchTaskDataResponse":
825  # Continue DeleteTask
826  self.processDeleteTask(event)
827 
828  if event.cookie is not None and event.cookie[0] == "onDeleteTaskResults":
829  # Continue DeleteTask
830  self.processDeleteTaskResults(event)
831  except Exception as err:
832  tbi = Utils.getTracebackInfo()
833  logger.error("Exception: " + str(err.message) + "\n" + tbi)
834 
835 
836 
837  # #onInsertEEDataResponse event handler, process the response from the TasksDataManager object
838  #
839  # @param event instance of Event object
840  def onInsertEEDataResponse(self, event):
841  try:
842  # Get task Id from event
843  generalResponse = event.eventObj
844  if generalResponse.errorCode != EventObjects.GeneralResponse.ERROR_OK:
845  logger.error(LogFormatterEvent(event, [], self.ERROR_INSERT_EE_DATA))
846  except Exception as err:
847  tbi = Utils.getTracebackInfo()
848  logger.error("Exception: " + str(err.message) + "\n" + tbi)
849 
850 
851 
852  # #onUpdateTasksFieldsResponse event handler, process the response from the TasksManager object
853  #
854  # @param event instance of Event object
855  def onUpdateTasksFieldsResponse(self, event):
856  try:
857  # Get task Id from event
858  generalResponse = event.eventObj
859  if generalResponse.errorCode != EventObjects.GeneralResponse.ERROR_OK:
860  logger.error(LogFormatterEvent(event, [], self.ERROR_UPDATE_TASKS_FIELDS))
861  except Exception as err:
862  tbi = Utils.getTracebackInfo()
863  logger.error("Exception: " + str(err.message) + "\n" + tbi)
864 
865 
866 
867  # #onDeleteEEDataResponse event handler, process the response from the TasksDataManager object
868  #
869  # @param event instance of Event object
870  def onDeleteEEDataResponse(self, event):
871  try:
872  # Get task Id from event
873  generalResponse = event.eventObj
874  if generalResponse.errorCode != EventObjects.GeneralResponse.ERROR_OK:
875  logger.error(LogFormatterEvent(event, [], self.ERROR_UPDATE_TASKS_FIELDS))
876 
877  if event.cookie is not None and event.cookie[0] == "processDeleteTaskResults":
878  # Continue the onDeleteTaskResults handling
879  generalResponseObj = EventObjects.GeneralResponse()
880  generalResponseObj.statuses = (event.cookie[2], generalResponse.errorCode)
881  if generalResponse.errorCode != EventObjects.GeneralResponse.ERROR_OK or\
882  event.cookie[2] != EEResponseData.ERROR_CODE_OK:
883  generalResponseObj.errorCode = self.ERROR_DELETE_TASK_RESULTS
884  generalResponseObj.errorMessage = self.ERROR_DELETE_TASK_RESULTS_MESSAGE
885  # Prepare delete task results reply event for error case
886  deleteTaskResultsReplyEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.DELETE_TASK_RESULTS_RESPONSE,
887  generalResponseObj)
888  self.reply(event.cookie[1], deleteTaskResultsReplyEvent)
889  except Exception as err:
890  tbi = Utils.getTracebackInfo()
891  logger.error("Exception: " + str(err.message) + "\n" + tbi)
892 
def reply(self, event, reply_event)
wrapper for sending event in reply for event
UpdateTaskFields event object, for update task fields operation.
Log formatter event, defines the object to format message string.
Definition: LogFormatter.py:16
def makeRequest(self, node, message, commandTimeout=None)
makeRequest main class method, it gets node and message params, interact with transport layer...
Check task request.
Definition: Commands.py:169
Delete task request.
Definition: Commands.py:208
def process(self, event)
process event call the event handler method that was set by user or on_unhandled_event method if not ...
NodeManagerRequest class contains all data needed for sending admin-level requests.
wrapper for TaskExecuteStruct
Definition: Commands.py:87
NewTask event object, defines the Task object fields.
GeneralResponse event object, represents general state response for multipurpose usage.
Command class contains "command" data and processing methods.
Definition: Command.py:16
def processUpdateTaskFields(self, operationType, eeResponseData, cookie=None)
def setEventHandler(self, eventType, handler)
set event handler rewrite the current handler for eventType
def addConnection(self, name, connection)
DeleteTask event object, to delete task from DTM application and from EE.
This is app base class for management server connection end-points and parallel transport messages pr...
wrapper for task request
Definition: Commands.py:158
wrapper for Session fields array of execute task
Definition: Commands.py:32
UIDGenerator is used to generate unique message id.
Definition: UIDGenerator.py:14
GetTaskManagerFields event object, for get task fields values operation.
Class hides the routines of building connection objects.
def send(self, connect_name, event)
send event
DeleteEEResponseData event object, to delete EE response data from the storage.
def __init__(self, configParser, connectionBuilderLight=None)
FetchTaskData event object, to fetch task data from the storage.
Convertor which is used to convert Task*Request objects to JSON and TaskResponse from JSON.
EEResponseData event object, store task results data, returned from EE.
Get task's data request.
Definition: Commands.py:180
Terminate task request.
Definition: Commands.py:191