HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
TasksStateUpdateService.py
Go to the documentation of this file.
1 '''
2 HCE project, Python bindings, Distributed Tasks Manager application.
3 TasksStateUpdateService object and related classes definitions.
4 This object acts as listener of updates of tasks states inside DRCE Execution Environment.
5 The DRCE Functional objects callback connects to this service and send update message when task changes its state.
6 This call initiated by DRCE node FO watchdog.
7 
8 
9 @package: dtm
10 @author bgv bgv.hce@gmail.com
11 @link: http://hierarchical-cluster-engine.com/
12 @copyright: Copyright © 2013-2014 IOIX Ukraine
13 @license: http://hierarchical-cluster-engine.com/license/
14 @since: 0.1
15 '''
16 
17 
18 from datetime import datetime
19 import logging
20 import time
21 from app.BaseServerManager import BaseServerManager
22 from app.LogFormatter import LogFormatterEvent
23 from drce.CommandConvertor import CommandConvertor
24 from transport.Connection import ConnectionParams
25 from transport.ConnectionBuilder import ConnectionBuilder
26 from transport.ConnectionBuilderLight import ConnectionBuilderLight
27 from transport.IDGenerator import IDGenerator
28 from transport.Response import Response
29 from transport.UIDGenerator import UIDGenerator
30 import app.Utils as Utils # pylint: disable=F0401
31 import dtm.EventObjects
32 import app.SQLCriterions
33 
34 import dtm.Constants as DTM_CONSTS
35 import transport.Consts as TRANSPORT_CONSTS
36 
37 #Logger initialization
38 #logger = logging.getLogger(__name__)
39 logger = logging.getLogger(DTM_CONSTS.LOGGER_NAME)
40 
41 
42 
50 
51  #Configuration settings options names
52  CONFIG_SERVER_HOST = "serverHost"
53  CONFIG_SERVER_PORT = "serverPort"
54  CONFIG_TASKS_MANAGER_CLIENT = "clientTasksManager"
55  CONFIG_EE_MANAGER = "clientExecutionEnvironmentManager"
56  CONFIG_CHECK_STATE_NUM = "checkStateNum"
57  CONFIG_CHECK_STATE_INTERVAL = "checkStateInterval"
58  CONFIG_FETCH_TASK_NUM = "checkStateTasks"
59 
60  ERROR_HCE_RESPONSE_PROCESSING_EXCEPTION = "Update request error, possible wrong json format!"
61  ERROR_TASK_FIELDS_UPDATE = "Update of task fields response error."
62 
63  UPDATE_TYPE_TASK_STATE = 100
64  UPDATE_TYPE_RESOURCES_STATE = 101
65 
66 
72  def __init__(self, configParser, connectionBuilderLight=None, connectionBuilder=None):
73  super(TasksStateUpdateService, self).__init__()
74  self.expect_response = Response(["sock_identity", "id", "body"])
75 
76  #Instantiate the connection builder light if not set
77  if connectionBuilderLight is None:
78  connectionBuilderLight = ConnectionBuilderLight()
79  #Instantiate the connection builder if not set
80  if connectionBuilder is None:
81  connectionBuilder = ConnectionBuilder(IDGenerator())
82 
83  className = self.__class__.__name__
84 
85  #Get configuration settings
86  self.clientTasksManagerName = configParser.get(className, self.CONFIG_TASKS_MANAGER_CLIENT)
87  self.serverHost = configParser.get(className, self.CONFIG_SERVER_HOST)
88  self.serverPort = configParser.get(className, self.CONFIG_SERVER_PORT)
89  self.clientExecutionEnvironmentManager = configParser.get(className, self.CONFIG_EE_MANAGER)
90 
91  #Create connections and raise bind or connect actions for correspondent connection type
92  tasksManagerConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT, self.clientTasksManagerName)
93  tcpServerConnection = connectionBuilder.build(TRANSPORT_CONSTS.DATA_CONNECT_TYPE,
95  TRANSPORT_CONSTS.SERVER_CONNECT)
96  eeManagerConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT,
98 
99  #Add connections to the polling set
100  self.addConnection(self.clientTasksManagerName, tasksManagerConnection)
101  self.addConnection(self.serverHost, tcpServerConnection)
102  self.addConnection(self.clientExecutionEnvironmentManager, eeManagerConnection)
103 
104  #Set event handler for EXECUTE_TASK event
105  self.setEventHandler(DTM_CONSTS.EVENT_TYPES.UPDATE_TASK_FIELDS_RESPONSE, self.onUpdateTaskFieldsResponse)
106  self.setEventHandler(DTM_CONSTS.EVENT_TYPES.SERVER_TCP_RAW, self.onTCPServerRequest)
107  self.setEventHandler(DTM_CONSTS.EVENT_TYPES.AVAILABLE_TASK_IDS_RESPONSE, self.onReceiveAllTaskIds)
108 
109  #Initialize unique Id generator
111  #Initialize DRCE commands convertor
113 
114  #variables for auto send CheckState to EEManager
115 
118  self.checkStateInterval = configParser.getint(className, self.CONFIG_CHECK_STATE_INTERVAL)
119  self.checkStateNum = configParser.getint(className, self.CONFIG_CHECK_STATE_NUM)
120  self.fetchTaskNum = configParser.getint(className, self.CONFIG_FETCH_TASK_NUM)
121 
122  # Set connections poll timeout, defines period of tasks state checks
124  configParser.getint(className, self.POLL_TIMEOUT_CONFIG_VAR_NAME)
125 
126 
129 
130  logger.debug("Construction finished!")
131 
132 
133 
134 
137  def onUpdateTaskFieldsResponse(self, event):
138  #Get task Id from event
139  generalResponse = event.eventObj
140  #Log error
141  if generalResponse.errorCode != dtm.EventObjects.GeneralResponse.ERROR_OK:
142  logger.error(LogFormatterEvent(event, [], self.ERROR_TASK_FIELDS_UPDATE))
143 
144  logger.debug("Update tasks state response finished!")
145 
146 
147 
148 
152  def onTCPServerRequest(self, event):
153  logger.debug("Update request received!")
154 
155  #Get request raw buffer from eventObj and convert if to the DRCE response object
156  rawDRCEJsonResponse = event.eventObj.get_body()
157  try:
158  #Convert DRCE jason protocol response to TaskResponse object
159  taskResponse = self.drceCommandConvertor.from_json(rawDRCEJsonResponse)
160  logger.debug("rawDRCEJsonResponse:\n" + str(rawDRCEJsonResponse) + "\nObject:\n" + Utils.varDump(taskResponse))
161  #Update task data on TasksManager object
162  self.processUpdateTaskFields(taskResponse)
163  except Exception, e:
164  logger.error(self.ERROR_HCE_RESPONSE_PROCESSING_EXCEPTION + " : " + str(e.message) + " : \n" + \
165  rawDRCEJsonResponse)
166 
167  logger.debug("TCP update request processing finished!")
168 
169 
170 
171 
174  def onReceiveAllTaskIds(self, event):
175  logger.debug("Available tasks list from TasksManager received, items " + str(len(event.eventObj.ids)))
176  self.taskIdsForCheckState += event.eventObj.ids
177  if len(self.taskIdsForCheckState) > 0:
178  self.tryCheckTasksState()
179 
180 
181 
182 
183  def on_poll_timeout(self):
184  logger.debug("Possible time to check state of tasks, interval " + str(self.checkStateInterval) + "!")
185  if time.time() - self.lastCheckStateTs > self.checkStateInterval:
186  logger.debug("Now time to check state of tasks, interval " + str(self.checkStateInterval) + "!")
187  self.lastCheckStateTs = time.time()
188  self.tryCheckTasksState()
189 
190 
191 
192 
195  if self.taskIdsForCheckState:
196  logger.debug("Tasks to check " + str(len(self.taskIdsForCheckState)))
197  for taskId in self.taskIdsForCheckState[:self.checkStateNum]:
198  req = dtm.EventObjects.CheckTaskState(taskId)
199  event = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE, req)
200  event.cookie = self.__class__.__name__
201  self.send(self.clientExecutionEnvironmentManager, event)
202  logger.debug("Task " + str(taskId) + " sent to check state to EEManager")
204  else:
205  logger.debug("Get available tasks list from TasksManager")
207  if req.criterions is not None:
208  req.criterions[app.SQLCriterions.CRITERION_WHERE] = "deleteTaskId = 0 AND state < 100"
209  event = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.FETCH_AVAILABLE_TASK_IDS, req)
210  self.send(self.clientTasksManagerName, event)
211 
212 
213 
214 
217  def processUpdateTaskFields(self, taskResponse):
218  #taskResponse is an instance of drce.Commands.TaskResponse
219  #it may contains 0 or many response items
220  if len(taskResponse.items) == 0:
221  logger.error("Received empty update request from drce node!")
222  else:
223  # Tasks state update notification
224  for resItem in taskResponse.items:
225  # If no errors in response from EE
226  # TODO: strange, in case of error still continue to use resItem object...
227  if resItem.error_code != dtm.EventObjects.EEResponseData.ERROR_CODE_OK:
228  logger.error("Update request item from node %s, error: %s :%s",
229  resItem.node, resItem.error_code, resItem.error_message)
230  # Update task's fields object for the TasksManager
231  updateTaskFields = dtm.EventObjects.UpdateTaskFields(resItem.id)
232  # Fill fields to update
233  updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.HOST] = resItem.host
234  updateTaskFields.fields[DTM_CONSTS.DRCE_FIELDS.PORT] = resItem.port
235 
236  if resItem.type == self.UPDATE_TYPE_RESOURCES_STATE:
237  # Resources state update notification, the "state" field possible is not valid
238  logger.debug("Resources state update notification")
239  updateTaskFields.fields["state"] = None
240  else:
241  # Task state update notification, the "state" field is valid
242  logger.debug("Task state update notification")
243  updateTaskFields.fields["state"] = resItem.state
244 
245  updateTaskFields.fields["pId"] = resItem.pid
246  updateTaskFields.fields["nodeName"] = resItem.node
247  updateTaskFields.fields["pTime"] = resItem.time
248  if resItem.state == dtm.EventObjects.EEResponseData.TASK_STATE_NEW:
249  updateTaskFields.fields["rDate"] = datetime.now()
250  else:
251  if resItem.state == dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED:
252  updateTaskFields.fields["fDate"] = datetime.now()
253  #Fix due the TasksManager does not recognize UNDEFINED or TERMINATED state
254  if resItem.state == dtm.EventObjects.EEResponseData.TASK_STATE_UNDEFINED or\
255  resItem.state == dtm.EventObjects.EEResponseData.TASK_STATE_TERMINATED:
256  updateTaskFields.fields["state"] = dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED
257 
258  if DTM_CONSTS.DRCE_FIELDS.URRAM in resItem.fields:
259  updateTaskFields.fields["uRRAM"] = resItem.fields[DTM_CONSTS.DRCE_FIELDS.URRAM]
260  if DTM_CONSTS.DRCE_FIELDS.UVRAM in resItem.fields:
261  updateTaskFields.fields["uVRAM"] = resItem.fields[DTM_CONSTS.DRCE_FIELDS.UVRAM]
262  if DTM_CONSTS.DRCE_FIELDS.UCPU in resItem.fields:
263  updateTaskFields.fields["uCPU"] = resItem.fields[DTM_CONSTS.DRCE_FIELDS.UCPU]
264  if DTM_CONSTS.DRCE_FIELDS.UTHREADS in resItem.fields:
265  updateTaskFields.fields["uThreads"] = resItem.fields[DTM_CONSTS.DRCE_FIELDS.UTHREADS]
266 
267  #Create update event
268  updateTaskFieldsEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.UPDATE_TASK_FIELDS, updateTaskFields)
269  #Send update event to TasksManager
270  self.send(self.clientTasksManagerName, updateTaskFieldsEvent)
271  logger.debug("Update TasksManager fields for task " + str(resItem.id) + " finished!")
272 
273  logger.debug("Update TasksManager fields for all tasks finished!")
UpdateTaskFields event object, for update task fields operation.
Log formatter event, defines the object to format message string.
Definition: LogFormatter.py:16
def onUpdateTaskFieldsResponse(self, event)
onUpdateTaskFieldsResponse event handler
def processUpdateTaskFields(self, taskResponse)
Send UpdateTasksData event to the TasksManager.
It&#39;s a wrapper similar to zmsg.hpp in sense of encapsulation of hce response message structure...
Definition: Response.py:20
def setEventHandler(self, eventType, handler)
set event handler rewrite the current handler for eventType
def addConnection(self, name, connection)
This is app base class for management server connection end-points and parallel transport messages pr...
lastCheckStateTs
timestamp of last send CheckState request
def onTCPServerRequest(self, event)
onTCPServerRequest handler of TCP server requests.
CheckTaskState event object, for check task status inside EE.
UIDGenerator is used to generate unique message id.
Definition: UIDGenerator.py:14
The TasksStateUpdateService class, is a listener of tasks state updates from DRCE FO of cluster nodes...
def __init__(self, configParser, connectionBuilderLight=None, connectionBuilder=None)
constructor initialize fields
Class hides routines of bulding connection objects.
def send(self, connect_name, event)
send event
def on_poll_timeout(self)
on_poll_timeout handler, now just send CheckState to EEManager
def onReceiveAllTaskIds(self, event)
onReceiveAllTaskIds handler for receive running tasks from TasksManager
def tryCheckTasksState(self)
send CheckState message to EEManager if don&#39;t have cached task id, then fetch from TasksManager ...
The builder is used to encapsulation routine of creation various type of connections.
FetchAvailabelTaskIds event object, for fetch available task id.
IDGenerator is used to generate unique id for connections.
Definition: IDGenerator.py:15
Convertor which used to convert Task*Reques to json and TaskResponse from json.