2 HCE project, Python bindings, Distributed Tasks Manager application. 3 TasksStateUpdateService object and related class definitions. 4 This object acts as a listener for task state updates inside the DRCE Execution Environment. 5 The DRCE Functional Object callback connects to this service and sends an update message when a task changes its state. 6 This call is initiated by the DRCE node FO watchdog. 10 @author bgv bgv.hce@gmail.com 11 @link: http://hierarchical-cluster-engine.com/ 12 @copyright: Copyright © 2013-2014 IOIX Ukraine 13 @license: http://hierarchical-cluster-engine.com/license/ 18 from datetime
import datetime
39 logger = logging.getLogger(DTM_CONSTS.LOGGER_NAME)
52 CONFIG_SERVER_HOST =
"serverHost" 53 CONFIG_SERVER_PORT =
"serverPort" 54 CONFIG_TASKS_MANAGER_CLIENT =
"clientTasksManager" 55 CONFIG_EE_MANAGER =
"clientExecutionEnvironmentManager" 56 CONFIG_CHECK_STATE_NUM =
"checkStateNum" 57 CONFIG_CHECK_STATE_INTERVAL =
"checkStateInterval" 58 CONFIG_FETCH_TASK_NUM =
"checkStateTasks" 60 ERROR_HCE_RESPONSE_PROCESSING_EXCEPTION =
"Update request error, possible wrong json format!" 61 ERROR_TASK_FIELDS_UPDATE =
"Update of task fields response error." 63 UPDATE_TYPE_TASK_STATE = 100
64 UPDATE_TYPE_RESOURCES_STATE = 101
72 def __init__(self, configParser, connectionBuilderLight=None, connectionBuilder=None):
73 super(TasksStateUpdateService, self).
__init__()
77 if connectionBuilderLight
is None:
80 if connectionBuilder
is None:
83 className = self.__class__.__name__
92 tasksManagerConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT, self.
clientTasksManagerName)
93 tcpServerConnection = connectionBuilder.build(TRANSPORT_CONSTS.DATA_CONNECT_TYPE,
95 TRANSPORT_CONSTS.SERVER_CONNECT)
96 eeManagerConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT,
130 logger.debug(
"Construction finished!")
139 generalResponse = event.eventObj
141 if generalResponse.errorCode != dtm.EventObjects.GeneralResponse.ERROR_OK:
144 logger.debug(
"Update tasks state response finished!")
153 logger.debug(
"Update request received!")
156 rawDRCEJsonResponse = event.eventObj.get_body()
160 logger.debug(
"rawDRCEJsonResponse:\n" + str(rawDRCEJsonResponse) +
"\nObject:\n" + Utils.varDump(taskResponse))
167 logger.debug(
"TCP update request processing finished!")
175 logger.debug(
"Available tasks list from TasksManager received, items " + str(len(event.eventObj.ids)))
184 logger.debug(
"Possible time to check state of tasks, interval " + str(self.
checkStateInterval) +
"!")
186 logger.debug(
"Now time to check state of tasks, interval " + str(self.
checkStateInterval) +
"!")
199 event = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE, req)
200 event.cookie = self.__class__.__name__
202 logger.debug(
"Task " + str(taskId) +
" sent to check state to EEManager")
205 logger.debug(
"Get available tasks list from TasksManager")
207 if req.criterions
is not None:
208 req.criterions[app.SQLCriterions.CRITERION_WHERE] =
"deleteTaskId = 0 AND state < 100" 209 event = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.FETCH_AVAILABLE_TASK_IDS, req)
220 if len(taskResponse.items) == 0:
221 logger.error(
"Received empty update request from drce node!")
224 for resItem
in taskResponse.items:
227 if resItem.error_code != dtm.EventObjects.EEResponseData.ERROR_CODE_OK:
228 logger.error(
"Update request item from node %s, error: %s :%s",
229 resItem.node, resItem.error_code, resItem.error_message)
233 updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST] = resItem.host
234 updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT] = resItem.port
238 logger.debug(
"Resources state update notification")
239 updateTaskFields.fields[
"state"] =
None 242 logger.debug(
"Task state update notification")
243 updateTaskFields.fields[
"state"] = resItem.state
245 updateTaskFields.fields[
"pId"] = resItem.pid
246 updateTaskFields.fields[
"nodeName"] = resItem.node
247 updateTaskFields.fields[
"pTime"] = resItem.time
248 if resItem.state == dtm.EventObjects.EEResponseData.TASK_STATE_NEW:
249 updateTaskFields.fields[
"rDate"] = datetime.now()
251 if resItem.state == dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED:
252 updateTaskFields.fields[
"fDate"] = datetime.now()
254 if resItem.state == dtm.EventObjects.EEResponseData.TASK_STATE_UNDEFINED
or\
255 resItem.state == dtm.EventObjects.EEResponseData.TASK_STATE_TERMINATED:
256 updateTaskFields.fields[
"state"] = dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED
258 if DTM_CONSTS.DRCE_FIELDS.URRAM
in resItem.fields:
259 updateTaskFields.fields[
"uRRAM"] = resItem.fields[DTM_CONSTS.DRCE_FIELDS.URRAM]
260 if DTM_CONSTS.DRCE_FIELDS.UVRAM
in resItem.fields:
261 updateTaskFields.fields[
"uVRAM"] = resItem.fields[DTM_CONSTS.DRCE_FIELDS.UVRAM]
262 if DTM_CONSTS.DRCE_FIELDS.UCPU
in resItem.fields:
263 updateTaskFields.fields[
"uCPU"] = resItem.fields[DTM_CONSTS.DRCE_FIELDS.UCPU]
264 if DTM_CONSTS.DRCE_FIELDS.UTHREADS
in resItem.fields:
265 updateTaskFields.fields[
"uThreads"] = resItem.fields[DTM_CONSTS.DRCE_FIELDS.UTHREADS]
268 updateTaskFieldsEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.UPDATE_TASK_FIELDS, updateTaskFields)
271 logger.debug(
"Update TasksManager fields for task " + str(resItem.id) +
" finished!")
273 logger.debug(
"Update TasksManager fields for all tasks finished!")
string CONFIG_SERVER_PORT
UpdateTaskFields event object, for update task fields operation.
string CONFIG_FETCH_TASK_NUM
string CONFIG_CHECK_STATE_NUM
clientExecutionEnvironmentManager
string ERROR_TASK_FIELDS_UPDATE
def onUpdateTaskFieldsResponse(self, event)
onUpdateTaskFieldsResponse event handler
def processUpdateTaskFields(self, taskResponse)
Send UpdateTasksData event to the TasksManager.
int UPDATE_TYPE_RESOURCES_STATE
string CONFIG_CHECK_STATE_INTERVAL
string POLL_TIMEOUT_CONFIG_VAR_NAME
It's a wrapper similar to zmsg.hpp in the sense of encapsulation of the hce response message structure...
def setEventHandler(self, eventType, handler)
Set the event handler; replaces the current handler for eventType.
def addConnection(self, name, connection)
This is the app base class for management of server connection end-points and parallel transport messages pr...
lastCheckStateTs
timestamp of the last sent CheckState request
def onTCPServerRequest(self, event)
onTCPServerRequest handler of TCP server requests.
taskIdsForCheckState
list of task ids for check state
CheckTaskState event object, for check task status inside EE.
UIDGenerator is used to generate unique message id.
The TasksStateUpdateService class is a listener of task state updates from the DRCE FO of cluster nodes...
def __init__(self, configParser, connectionBuilderLight=None, connectionBuilder=None)
Constructor; initializes fields.
Class hides the routines for building connection objects.
def send(self, connect_name, event)
send event
string CONFIG_TASKS_MANAGER_CLIENT
string ERROR_HCE_RESPONSE_PROCESSING_EXCEPTION
def on_poll_timeout(self)
on_poll_timeout handler; currently just sends CheckState to EEManager
def onReceiveAllTaskIds(self, event)
onReceiveAllTaskIds handler for receiving running tasks from TasksManager
def tryCheckTasksState(self)
Send CheckState message to EEManager; if there are no cached task ids, fetch them from TasksManager ...
The builder is used to encapsulate the routine of creating various types of connections.
FetchAvailabelTaskIds event object, for fetching available task ids.
IDGenerator is used to generate unique id for connections.
Converter which is used to convert Task*Request to JSON and TaskResponse from JSON.
string CONFIG_SERVER_HOST