2 HCE project, Python bindings, Distributed Tasks Manager application. 3 ExecutionEnvironmentManager object and related classes definitions. 6 @author bgv bgv.hce@gmail.com 7 @link: http://hierarchical-cluster-engine.com/ 8 @copyright: Copyright © 2013-2014 IOIX Ukraine 9 @license: http://hierarchical-cluster-engine.com/license/ 14 from datetime
import datetime
28 from drce.DRCEManager import ConnectionTimeout, TransportInternalErr, CommandExecutorErr
31 from dtm
import EventObjects
35 from EventObjects
import EEResponseData
43 logger = logging.getLogger(DTM_CONSTS.LOGGER_NAME)
55 CONFIG_SERVER =
"server" 56 CONFIG_TASKS_MANAGER_CLIENT =
"clientTasksManager" 57 CONFIG_TASKS_MANAGER_DATA_CLIENT =
"clientTasksDataManager" 58 CONFIG_DRCE_HOST =
"DRCEHost" 59 CONFIG_DRCE_PORT =
"DRCEPort" 60 CONFIG_DRCE_TIMEOUT =
"DRCETimeout" 61 CONFIG_HCE_NODE_ADMIN_TIMEOUT =
"HCENodeAdminTimeout" 64 ERROR_MSG_DRCE_ROUTER_NEW_TASK =
"DRCE Router request error!" 65 ERROR_HCE_RESPONSE_PROCESSING_EXCEPTION =
"HCE node Admin API response processing exception" 66 ERROR_HCE_RESPONSE_PROCESSING_SPLIT =
"HCE node Admin API response processing can't to split status code" 67 ERROR_INSERT_EE_DATA =
"Error insert EE response data operation" 68 ERROR_UPDATE_TASKS_FIELDS =
"Update tasks fields error" 69 ERROR_WRONG_OBJECT_TYPE =
"Wrong object type from TasksDataManager" 70 ERROR_EE_RESPONSE_OBJECT_TYPE_OR_RESPONSE_ERROR =
"EEResponseData object error or wrong response structure" 71 ERROR_HCE_ADMIN_REQUEST_ERROR =
"HCE Admin request error" 73 ERROR_DELETE_TASK_RESULTS = 1
74 ERROR_DELETE_TASK_RESULTS_MESSAGE =
"Delete task results error of EE request response or TaskManager!" 76 OPERATION_NEW_TASK = 0
77 OPERATION_DELETE_TASK = 1
78 OPERATION_CHECK_STATE = 2
79 OPERATION_FETCH_RESULTS = 3
87 def __init__(self, configParser, connectionBuilderLight=None):
88 super(ExecutionEnvironmentManager, self).
__init__()
91 if connectionBuilderLight ==
None:
94 className = self.__class__.__name__
102 serverConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, self.
serverName)
103 tasksManagerConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT, self.
clientTasksManagerName)
104 tasksDataManagerConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT,
152 executeTasks = event.eventObj
155 fetchTaskDataEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.FETCH_TASK_DATA, fetchTaskData)
158 logger.info(
"Sent request FetchTaskData to TasksDataManager, id=" + str(executeTasks.id))
159 except Exception
as err:
160 logger.error(
"Exception: " + str(err.message) +
"\n" + Utils.getTracebackInfo())
173 logger.info(
"New task processing started, id=" + str(obj.id))
176 logger.info(
"New task processing finished, id=" + str(obj.id))
179 logger.info(
"Delete task processing started, id=" + str(obj.id))
183 logger.debug(
"GetTaskManagerFields request sent, id=" + str(obj.id))
185 logger.error(
"Wrong type received from TasksDataManager!")
189 except Exception
as err:
190 tbi = Utils.getTracebackInfo()
191 logger.error(
"Exception: " + str(err.message) +
"\n" + tbi)
202 logger.debug(
"eeResponseData:" + str(vars(eeResponseData)))
206 if eeResponseData.errorCode == EEResponseData.ERROR_CODE_OK:
207 logger.debug(
"EE returned OK result!")
209 updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] = eeResponseData.state
210 updateTaskFields.fields[
"pId"] = eeResponseData.pId
211 updateTaskFields.fields[
"nodeName"] = eeResponseData.nodeName
212 updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST] = eeResponseData.nodeHost
213 updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT] = eeResponseData.nodePort
214 if DTM_CONSTS.DRCE_FIELDS.URRAM
in eeResponseData.fields:
215 updateTaskFields.fields[
"uRRAM"] = eeResponseData.fields[DTM_CONSTS.DRCE_FIELDS.URRAM]
216 if DTM_CONSTS.DRCE_FIELDS.UVRAM
in eeResponseData.fields:
217 updateTaskFields.fields[
"uVRAM"] = eeResponseData.fields[DTM_CONSTS.DRCE_FIELDS.UVRAM]
218 if DTM_CONSTS.DRCE_FIELDS.UCPU
in eeResponseData.fields:
219 updateTaskFields.fields[
"uCPU"] = eeResponseData.fields[DTM_CONSTS.DRCE_FIELDS.UCPU]
220 if DTM_CONSTS.DRCE_FIELDS.UTHREADS
in eeResponseData.fields:
221 updateTaskFields.fields[
"uThreads"] = eeResponseData.fields[DTM_CONSTS.DRCE_FIELDS.UTHREADS]
223 logger.error(
"EE error returned!")
226 updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] = EEResponseData.TASK_STATE_SET_ERROR
227 updateTaskFields.fields[
"sDate"] = datetime.now()
228 if eeResponseData.type == EEResponseData.REQUEST_TYPE_SET:
229 updateTaskFields.fields[
"rDate"] = updateTaskFields.fields[
"sDate"]
234 updateTaskFields.fields[
"rDate"] = datetime.now()
235 updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] = EEResponseData.TASK_STATE_TERMINATED
238 if hasattr(eeResponseData, DTM_CONSTS.DRCE_FIELDS.STATE):
240 updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] = eeResponseData.state
243 updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] = EEResponseData.TASK_STATE_UNDEFINED
244 logger.debug(
"No state field update, task Id: " + str(eeResponseData.id) +
", treated as UNDEFINED!")
247 if hasattr(eeResponseData,
"deleteTaskId"):
248 updateTaskFields.fields[
"deleteTaskId"] = eeResponseData.deleteTaskId
255 if hasattr(eeResponseData,
"deleteTaskState"):
256 updateTaskFields.fields[
"deleteTaskState"] = EEResponseData.TASK_STATE_TERMINATED
258 logger.debug(
"Fields to update:\n" + Utils.varDump(updateTaskFields))
260 updateTaskFieldsEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.UPDATE_TASK_FIELDS, updateTaskFields)
261 if cookie
is not None:
262 updateTaskFieldsEvent.cookie = cookie
265 logger.debug(
"Fields sent to update!")
275 taskExecuteStruct.command = newTaskObj.command
276 taskExecuteStruct.files = newTaskObj.files
277 taskExecuteStruct.input = newTaskObj.input
279 taskExecuteStruct.session =
Session(newTaskObj.session[
"tmode"], newTaskObj.session[
"type"],
280 newTaskObj.session[
"time_max"])
281 taskExecuteStruct.session.password = newTaskObj.session[
"password"]
282 taskExecuteStruct.session.port = newTaskObj.session[DTM_CONSTS.DRCE_FIELDS.PORT]
283 taskExecuteStruct.session.shell = newTaskObj.session[
"shell"]
284 taskExecuteStruct.session.timeout = newTaskObj.session[
"timeout"]
285 taskExecuteStruct.session.user = newTaskObj.session[
"user"]
286 taskExecuteStruct.session.home_directory = newTaskObj.session[
"home_directory"]
287 taskExecuteStruct.session.environment = newTaskObj.session[
"environment"]
289 taskExecuteStruct.limits = newTaskObj.limits
293 taskExecuteRequest.data = taskExecuteStruct
295 if "route" in newTaskObj.session
and newTaskObj.session[
"route"]
is not None and newTaskObj.session[
"route"] !=
"":
296 taskExecuteRequest.route = newTaskObj.session[
"route"]
297 if "task_type" in newTaskObj.session
and newTaskObj.session[
"task_type"]
is not None and\
298 newTaskObj.session[
"task_type"] !=
"":
299 taskExecuteRequest.task_type = newTaskObj.session[
"task_type"]
300 logger.debug(
"Sending task to DRCE router, id=" + str(newTaskObj.id) +
", route:" + str(taskExecuteRequest.route))
303 logger.debug(
"Received from DRCE router, id=" + str(newTaskObj.id))
306 logger.debug(
"Response body:\n" + Utils.varDump(response))
307 logger.debug(
"eeResponseData object:\n" + Utils.varDump(eeResponseData))
309 if eeResponseData.id == newTaskObj.id:
312 logger.debug(
"Process update TaskFields in the TasksManager, id=" + str(newTaskObj.id))
315 logger.error(
"Wrong task Id= " + str(eeResponseData.id) +
" returned from DRCE, expected id=" + \
316 str(newTaskObj.id) +
". TasksManager's fields not updated, task state not changed. Response:\n" +
317 Utils.varDump(eeResponseData))
326 logger.debug(
"DRCE router sending request\n" + Utils.varDump(request))
330 except (ConnectionTimeout, TransportInternalErr, CommandExecutorErr)
as err:
344 if response
is not None and len(response.items) > 0:
347 eeR.type = response.items[0].type
348 eeR.errorCode = response.items[0].error_code
349 eeR.errorMessage = response.items[0].error_message
350 eeR.state = response.items[0].state
351 eeR.pId = response.items[0].pid
352 eeR.requestTime = response.items[0].time
353 eeR.nodeHost = response.items[0].host
354 eeR.nodePort = response.items[0].port
355 eeR.nodeName = response.items[0].node
356 eeR.stdout = response.items[0].stdout
357 eeR.stderr = response.items[0].stderror
358 eeR.exitStatus = response.items[0].exit_status
359 eeR.taskTime = response.items[0].time
360 eeR.files = response.items[0].files
361 eeR.fields = response.items[0].fields
362 logger.debug(
"To EEResponseData converted!")
366 eeR.errorCode = EEResponseData.ERROR_CODE_TIMEOUT
367 eeR.errorMessage = EEResponseData.ERROR_MESSAGE_TIMEOUT
382 items = rawResponse.split(admin.Constants.COMMAND_DELIM)
387 logger.debug(
"To taskResponse converted!")
403 checkTaskStateObj = event.eventObj
406 except Exception
as err:
407 tbi = Utils.getTracebackInfo()
408 logger.error(
"Exception: " + str(err.message) +
"\n" + tbi)
418 fetchTaskResultsObj = event.eventObj
421 except Exception
as err:
422 tbi = Utils.getTracebackInfo()
423 logger.error(
"Exception: " + str(err.message) +
"\n" + tbi)
433 deleteTaskResultsObj = event.eventObj
436 except Exception
as err:
437 tbi = Utils.getTracebackInfo()
438 logger.error(
"Exception: " + str(err.message) +
"\n" + tbi)
451 params = [messageParameters]
455 admin.Constants.ADMIN_HANDLER_TYPES.DATA_PROCESSOR_DATA,
457 requestBody = command.generateBody()
458 message = {admin.Constants.STRING_MSGID_NAME : self.
drceIdGenerator.get_uid(),
459 admin.Constants.STRING_BODY_NAME : requestBody}
461 logger.debug(
"Response from HCE node Admin received!")
463 return response.getBody()
478 getTaskManagerFieldsEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.GET_TASK_FIELDS,
479 getTaskManagerFieldsObj)
480 if cookieData
is not None:
481 getTaskManagerFieldsEvent.cookie = cookieData
484 logger.debug(
"GetTaskManagerFields sent!")
493 taskManagerFields = event.eventObj
496 if len(taskManagerFields.fields) > 0:
498 if taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_FINISHED
or\
499 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_CRASHED
or\
500 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_TERMINATED
or\
501 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_TERMINATED_BY_DRCE_TTL
or\
502 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_NEW:
503 fetchTaskResultsObj = event.cookie[1].eventObj
505 taskGetDataRequest =
TaskGetDataRequest(fetchTaskResultsObj.id, fetchTaskResultsObj.type)
508 logger.debug(
"(line 508) Call sendToHCENodeAdmin() use Host: '%s' and Port '%s'",
509 str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST]),
510 str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT]))
512 rawResponse = self.
sendToHCENodeAdmin(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST],
513 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT],
516 logger.debug(
"Response received from EE for onFetchTaskResults!")
517 if eeResponseData.errorCode == EEResponseData.ERROR_CODE_OK
and \
518 EEResponseData.TASK_STATE_FINISHED:
520 logger.debug(
"Update EEResponseDtata object in the TasksDataManager container sent!")
521 insertEEDataEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.INSERT_EE_DATA, eeResponseData)
524 logger.debug(
"Update TaskFields in the TasksManager!")
529 eeResponseData.state = taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE]
530 logger.debug(
"Wrong task state " + str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE]) +
531 " for FetchTaskResults operation!")
534 eeResponseData.state = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
535 eeResponseData.errorCode = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
536 eeResponseData.errorMessage = EEResponseData.ERROR_MESSAGE_TASK_NOT_FOUND
537 eeResponseData.exitStatus = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
538 logger.error(
"Empty mandatory fields received from TasksManager:\n%s", Utils.varDump(taskManagerFields))
540 fetchTaskResultsReplyEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.FETCH_TASK_RESULTS_RESPONSE,
542 self.
reply(event.cookie[1], fetchTaskResultsReplyEvent)
551 taskManagerFields = event.eventObj
554 if len(taskManagerFields.fields) > 0
and taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST]
is not None and\
555 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST] !=
"" and\
556 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT]
is not None and\
557 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT] !=
"":
560 checkTaskStateObj = event.cookie[1].eventObj
562 taskCheckRequest =
TaskCheckRequest(checkTaskStateObj.id, checkTaskStateObj.type)
565 logger.debug(
"Make HCE node admin request!")
566 logger.debug(
"(line 566) Call sendToHCENodeAdmin() use Host: '%s' and Port '%s'",
567 str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST]),
568 str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT]))
570 rawResponse = self.
sendToHCENodeAdmin(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST],
571 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT],
574 if eeResponseData.errorCode == EEResponseData.ERROR_CODE_OK:
575 if eeResponseData.nodeHost ==
"" or eeResponseData.nodePort ==
"":
576 logger.error(str(vars(eeResponseData)))
577 logger.debug(
"Received Host or Port is empty!")
579 logger.debug(
"Update TaskFields in the TasksManager!")
583 eeResponseData.state = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
584 eeResponseData.errorCode = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
585 eeResponseData.errorMessage = EEResponseData.ERROR_MESSAGE_TASK_NOT_FOUND
586 eeResponseData.exitStatus = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
587 logger.error(
"Empty mandatory fields received from TasksManager:\n%s", Utils.varDump(taskManagerFields))
589 checkTaskStateReplyEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE_RESPONSE,
591 self.
reply(event.cookie[1], checkTaskStateReplyEvent)
592 logger.debug(
"Check task reply event sent!")
601 if deleteTaskObj.action == EventObjects.DeleteTask.ACTION_DELETE_TASK_DATA:
604 elif deleteTaskObj.action == EventObjects.DeleteTask.ACTION_TERMINATE_TASK_AND_DELETE_DATA:
610 ret.data[
"cleanup"] = DRCE_CONSTS.TERMINATE_DATA_SAVE
620 if state == EEResponseData.TASK_STATE_FINISHED
or\
621 state == EEResponseData.TASK_STATE_CRASHED
or\
622 state == EEResponseData.TASK_STATE_TERMINATED
or\
623 state == EEResponseData.TASK_STATE_TERMINATED_BY_DRCE_TTL
or\
624 state == EEResponseData.TASK_STATE_DELETED
or\
625 state == EEResponseData.TASK_STATE_UNDEFINED
or\
626 state == EEResponseData.TASK_STATE_SET_ERROR
or\
627 state == EEResponseData.TASK_STATE_NOT_FOUND
or\
628 state == EEResponseData.TASK_STATE_SCHEDULE_TRIES_EXCEEDED
or\
629 action == EventObjects.DeleteTask.ACTION_TERMINATE_TASK_AND_DELETE_DATA:
630 if action != EventObjects.DeleteTask.ACTION_DELETE_ON_DTM:
640 taskManagerFields = event.eventObj
641 deleteTaskObj = event.cookie[1]
644 if len(taskManagerFields.fields) > 0:
645 eeResponseData =
None 646 deletedTasksState = taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE]
if \
647 DTM_CONSTS.DRCE_FIELDS.STATE
in taskManagerFields.fields
else None 649 if taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST]
is not None and\
650 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST] !=
"" and\
651 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT]
is not None and\
652 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT] !=
"":
656 logger.debug(
"Send TaskDeleteRequest to HCE node Admin API, taskId=" + str(deleteTaskObj.deleteTaskId))
657 logger.debug(
"(line 657) Call sendToHCENodeAdmin() use Host: '%s' and Port '%s'",
658 str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST]),
659 str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT]))
661 rawResponse = self.
sendToHCENodeAdmin(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST],
662 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT],
664 logger.debug(
"TaskDeleteequest rawResponse=[" + rawResponse +
"]")
665 if rawResponse !=
"":
667 logger.debug(
"TaskDeleteRequest response taskId=" + str(deleteTaskObj.deleteTaskId) + \
668 ", state:" + str(eeResponseData.state) + \
669 ", exitStatus:" + str(eeResponseData.exitStatus) + \
670 ", stdout: " + eeResponseData.stdout +
", stderr:" + eeResponseData.stderr)
671 eeResponseData.id = deleteTaskObj.id
673 if eeResponseData.state != EEResponseData.TASK_STATE_IN_PROGRESS
and\
674 eeResponseData.state != EEResponseData.TASK_STATE_NEW:
675 logger.debug(
"Task state was substituted from " + str(eeResponseData.state) +
" to " + \
676 str(EEResponseData.TASK_STATE_DELETED) +
", taskId=" + str(deleteTaskObj.deleteTaskId))
679 eeResponseData.nodeName =
"" 680 eeResponseData.nodeHost =
"" 681 eeResponseData.nodePort = 0
682 eeResponseData.state = EEResponseData.TASK_STATE_DELETED
683 if eeResponseData.errorCode > 0:
684 logger.debug(
"TaskDelete request response error:" + str(eeResponseData.errorCode) +
" : " + \
685 eeResponseData.errorMessage)
686 eeResponseData.state = EEResponseData.TASK_STATE_SET_ERROR
687 eeResponseData.deleteTaskId = deleteTaskObj.deleteTaskId
688 eeResponseData.deleteTaskState =
None 690 eeResponseData.deleteTaskId = deleteTaskObj.deleteTaskId
691 eeResponseData.deleteTaskState = eeResponseData.state
695 eeResponseData.deleteTaskId = deleteTaskObj.deleteTaskId
696 eeResponseData.deleteTaskState = EEResponseData.TASK_STATE_DELETED
697 eeResponseData.state = EEResponseData.TASK_STATE_DELETED
698 logger.error(
"TaskDeleteRequest response fault, taskId=" + str(deleteTaskObj.deleteTaskId) +
"!")
701 eeResponseData.deleteTaskId = deleteTaskObj.deleteTaskId
702 if deleteTaskObj.action == EventObjects.DeleteTask.ACTION_DELETE_ON_DTM:
703 eeResponseData.state = EEResponseData.TASK_STATE_DELETED
704 eeResponseData.deleteTaskState = EEResponseData.TASK_STATE_DELETED
705 logger.debug(
"Deleted task only on DTM")
708 eeResponseData.state = EEResponseData.TASK_STATE_SET_ERROR
709 logger.debug(
"Deleted task " + str(eeResponseData.deleteTaskId) +
" has bad[Not deleted state] " +
710 str(deletedTasksState) +
" delete error")
713 msg =
"Host or Port is empty! Set state as TERMINATED to push delete task, deleteTaskObj.id=" + \
714 str(deleteTaskObj.id) +
", deleteTaskObj.deleteTaskId=" + str(deleteTaskObj.deleteTaskId)
716 eeResponseData.errorCode = EEResponseData.ERROR_CODE_TASK_NOT_FOUND
717 eeResponseData.errorMessage = msg
718 eeResponseData.deleteTaskId = deleteTaskObj.deleteTaskId
719 eeResponseData.deleteTaskState = EEResponseData.TASK_STATE_TERMINATED
720 eeResponseData.state = EEResponseData.TASK_STATE_TERMINATED
723 if eeResponseData
is not None:
726 logger.debug(
"Update TaskFields in the TasksManager!")
728 logger.error(
"Empty fields received from TasksManager for DeleteTask!")
733 generalResponseObj.errorCode = errCode
734 generalResponseObj.errorMessage = errMessage
736 return generalResponseObj
744 taskManagerFields = event.eventObj
746 generalResponseObj =
None 747 hceNodeAdminRequestState = EEResponseData.ERROR_CODE_OK
750 if len(taskManagerFields.fields) > 0:
752 if taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_FINISHED
or\
753 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_CRASHED
or\
754 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_TERMINATED
or\
755 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_TERMINATED_BY_DRCE_TTL
or\
756 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_DELETED
or\
757 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_UNDEFINED
or\
758 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE] == EEResponseData.TASK_STATE_NOT_FOUND:
759 deleteTaskResultsObj = event.cookie[1].eventObj
761 logger.info(
"Send TaskGetDataRequest with DELETE data, taskId=" + str(deleteTaskResultsObj.id))
766 logger.debug(
"(line 766) Call sendToHCENodeAdmin() use Host: '%s' and Port '%s'",
767 str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST]),
768 str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT]))
770 rawResponse = self.
sendToHCENodeAdmin(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST],
771 taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT],
773 logger.info(
"TaskDeleteResultsRequest rawResponse=[" + rawResponse +
"]")
774 if rawResponse !=
"":
776 if eeResponseData.errorCode == EEResponseData.ERROR_CODE_OK:
777 logger.info(
"Response received from EE")
780 EventObjects.DeleteTaskResults.DRCE_ERROR_MESSAGE,
781 "DRCE error error=" + str(eeResponseData.errorCode))
784 EventObjects.DeleteTaskResults.EMPRY_RAW_ERROR_MESSAGE,
"Empty rawResponse")
788 EventObjects.DeleteTaskResults.TASK_STATE_ERROR_MESSAGE,
789 "Wrong task state " + str(taskManagerFields.fields[DTM_CONSTS.DRCE_FIELDS.STATE]) +
793 EventObjects.DeleteTaskResults.TASK_NOT_FOUND_ERROR_MESSAGE,
794 "Empty fields from TasksManager for DeleteTaskResults, possible task not found!")
796 if generalResponseObj ==
None:
797 logger.debug(
"Send DELETE_EE_DATA request to the TasksDataManager!")
799 deleteEEResponseDataEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.DELETE_EE_DATA, deleteEEResponseDataObj)
800 deleteEEResponseDataEvent.cookie = (
"processDeleteTaskResults", event.cookie[1], hceNodeAdminRequestState)
802 logger.debug(
"Sent request DeleteEEResponseDataObj to TasksDataManager!")
804 deleteTaskResultsReplyEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.DELETE_TASK_RESULTS_RESPONSE,
806 self.
reply(event.cookie[1], deleteTaskResultsReplyEvent)
807 logger.info(
"Send response GeneralResponse to ClientInterfaceService!")
816 if event.cookie
is not None and event.cookie[0] ==
"onFetchTaskResults":
820 if event.cookie
is not None and event.cookie[0] ==
"onCheckTaskState":
824 if event.cookie
is not None and event.cookie[0] ==
"onFetchTaskDataResponse":
828 if event.cookie
is not None and event.cookie[0] ==
"onDeleteTaskResults":
831 except Exception
as err:
832 tbi = Utils.getTracebackInfo()
833 logger.error(
"Exception: " + str(err.message) +
"\n" + tbi)
843 generalResponse = event.eventObj
844 if generalResponse.errorCode != EventObjects.GeneralResponse.ERROR_OK:
846 except Exception
as err:
847 tbi = Utils.getTracebackInfo()
848 logger.error(
"Exception: " + str(err.message) +
"\n" + tbi)
858 generalResponse = event.eventObj
859 if generalResponse.errorCode != EventObjects.GeneralResponse.ERROR_OK:
861 except Exception
as err:
862 tbi = Utils.getTracebackInfo()
863 logger.error(
"Exception: " + str(err.message) +
"\n" + tbi)
873 generalResponse = event.eventObj
874 if generalResponse.errorCode != EventObjects.GeneralResponse.ERROR_OK:
877 if event.cookie
is not None and event.cookie[0] ==
"processDeleteTaskResults":
880 generalResponseObj.statuses = (event.cookie[2], generalResponse.errorCode)
881 if generalResponse.errorCode != EventObjects.GeneralResponse.ERROR_OK
or\
882 event.cookie[2] != EEResponseData.ERROR_CODE_OK:
886 deleteTaskResultsReplyEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.DELETE_TASK_RESULTS_RESPONSE,
888 self.
reply(event.cookie[1], deleteTaskResultsReplyEvent)
889 except Exception
as err:
890 tbi = Utils.getTracebackInfo()
891 logger.error(
"Exception: " + str(err.message) +
"\n" + tbi)
def convertToEEResponse(self, response)
def reply(self, event, reply_event)
wrapper for sending event in reply for event
UpdateTaskFields event object, for update task fields operation.
def processCheckTaskState(self, event)
def onUpdateTasksFieldsResponse(self, event)
int OPERATION_FETCH_RESULTS
string ERROR_HCE_RESPONSE_PROCESSING_EXCEPTION
def makeRequest(self, node, message, commandTimeout=None)
makeRequest main class method, it gets node and message params, interact with transport layer...
string ERROR_HCE_ADMIN_REQUEST_ERROR
string ERROR_INSERT_EE_DATA
string CONFIG_DRCE_TIMEOUT
def process(self, event)
process event call the event handler method that was set by user or on_unhandled_event method if not ...
NodeManagerRequest class contents all data needed for admin level's request sending.
wrapper for TaskExecuteStruct
string CONFIG_HCE_NODE_ADMIN_TIMEOUT
NewTask event object, defines the Task object fields.
GeneralResponse event object, represents general state response for multipurpose usage.
def processDeleteTask(self, event)
def processDeleteTaskResults(self, event)
Command class contents "commad" data and processing methods.
string ERROR_DELETE_TASK_RESULTS_MESSAGE
def onFetchTaskDataResponse(self, event)
def processUpdateTaskFields(self, operationType, eeResponseData, cookie=None)
string CONFIG_TASKS_MANAGER_CLIENT
def setEventHandler(self, eventType, handler)
set event handler rewrite the current handler for eventType
def createTaskDeleteRequest(self, deleteTaskObj)
def addConnection(self, name, connection)
DeleteTask event object, to delete task from DTM application and from EE.
def onExecuteTask(self, event)
This is app base class for management server connection end-points and parallel transport messages pr...
int OPERATION_DELETE_TASK
wrapper for Session fields array of execute task
def onGetTaskManagerFieldsResponse(self, event)
def processFetchTaskResults(self, event)
def createGeneralResponse(self, errCode, errMessage, errLog)
UIDGenerator is used to generate unique message id.
def sendToDRCERouter(self, request)
GetTaskManagerFields event object, for get task fields values operation.
Class hides routines of bulding connection objects.
def send(self, connect_name, event)
send event
string CONFIG_TASKS_MANAGER_DATA_CLIENT
string ERROR_UPDATE_TASKS_FIELDS
int OPERATION_CHECK_STATE
string ERROR_EE_RESPONSE_OBJECT_TYPE_OR_RESPONSE_ERROR
DeleteEEResponseData event object, to delete EE response data from the storage.
def __init__(self, configParser, connectionBuilderLight=None)
FetchTaskData event object, to fetch task data from the storage.
def sendGetTaskManagerFieldsRequest(self, taskId, cookieData=None)
string ERROR_WRONG_OBJECT_TYPE
def sendToHCENodeAdmin(self, host, port, messageParameters)
int ERROR_DELETE_TASK_RESULTS
def onDeleteTaskResults(self, event)
def convertToTaskResponse(self, rawResponse)
def onCheckTaskState(self, event)
Convertor which used to convert Task*Reques to json and TaskResponse from json.
EEResponseData event object, store task results data, returned from EE.
def onDeleteEEDataResponse(self, event)
def checkDelTaskState(self, state, action)
def onInsertEEDataResponse(self, event)
Get task's data request.
def onFetchTaskResults(self, event)
clientTasksDataManagerName
def processNewTask(self, newTaskObj)
string ERROR_HCE_RESPONSE_PROCESSING_SPLIT