HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
ResourcesStateMonitor.py
Go to the documentation of this file.
1 '''
2 HCE project, Python bindings, Distributed Tasks Manager application.
3 ResourcesStateMonitor object and related classes definitions.
4 This object monitors the resources of the HCE cluster.
5 It periodically reads the cluster schema from a json schema file and makes a DRCE synch task request to all "replica" or
6 "shard" nodes. The DRCE task request is prepared and stored in an external json file that is read on each
7 monitoring cycle. The DRCE synch task runs a set of Linux commands that collect system information such as
8 average CPU load, disk and memory usage. This information is parsed and system resource usage indicators are derived
9 from it. These indicators are used to update the resources database that is used for task planning.
10 
11 
12 @package: dtm
13 @author bgv bgv.hce@gmail.com
14 @link: http://hierarchical-cluster-engine.com/
15 @copyright: Copyright © 2013-2014 IOIX Ukraine
16 @license: http://hierarchical-cluster-engine.com/license/
17 @since: 0.1
18 '''
19 
20 
21 import json
22 import logging
23 import re
24 
25 import admin.Command
26 import admin.Node
28 from app.BaseServerManager import BaseServerManager
29 from app.LogFormatter import LogFormatterEvent
30 from drce.CommandConvertor import CommandConvertor
31 import dtm.EventObjects
32 from transport.ConnectionBuilderLight import ConnectionBuilderLight
33 from transport.UIDGenerator import UIDGenerator
34 import dtm.Constants as DTM_CONSTS
35 import transport.Consts as TRANSPORT_CONSTS
36 import app.Utils
37 from app.Utils import varDump
38 
39 # Logger initialization: the shared DTM logger name (DTM_CONSTS.LOGGER_NAME) is used instead of this module's __name__
40 # logger = logging.getLogger(__name__)
41 logger = logging.getLogger(DTM_CONSTS.LOGGER_NAME)
42 
43 
44 
45 # #The ResourcesStateMonitor class monitors the resources of the HCE cluster.
46 #
47 # It periodically reads the cluster schema from a json schema file and makes a DRCE synch task request to all "replica" or
48 # "shard" nodes. The DRCE task request is prepared and stored in an external json file that is read on each
49 # monitoring cycle. The DRCE synch task runs a set of Linux commands that collect system information such as
50 # average CPU load, disk and memory usage. This information is parsed and system resource usage indicators are derived
51 # from it. These indicators are used to update the resources database that is used for task planning.
52 #
53 #
55 
56  # Configuration settings option names, read by __init__ from the config section named after the class
57  CONFIG_RESOURCES_MANAGER_CLIENT = "clientResourcesManager"  # name of the client connection to the ResourcesManager
58  CONFIG_HCE_NODE_ADMIN_TIMEOUT = "HCENodeAdminTimeout"  # int, timeout passed to makeRequest() in sendToHCENodeAdmin
59  CONFIG_UPDATE_RESOURCES_DRCE_JSON = "FetchResourcesStateDRCEJsonFile"  # presumably the DRCE request json file path (its read is not visible in this listing)
60  CONFIG_HCE_CLUSTER_SCHEMA_FILE = "HCEClusterSchemaFile"  # path of the HCE cluster schema json file
61  CONFIG_POLLING_TIMEOUT = "PollingTimeout"  # int, connections poll timeout = period of the monitoring cycle
62  # Message texts written to the log on errors
63  ERROR_UPDATE_RESOURCES = "Not all resources updated!"
64  ERROR_JSON_DECODE = "Json decode error!"
65  ERROR_RESOURCES_LISTS_NOT_EQUAL = "Lists of sent and received messages not equal!"
66  ERROR_READ_FILE = "Error read file"
67  ERROR_READ_DRCE_JSON_FILE = "Error read DRCE update resources state request json file"
68  ERROR_HCE_RESPONSE_PROCESSING_EXCEPTION = "HCE node Admin API response processing exception"
69  ERROR_HCE_CLUSTER_SCHEMA_STRUCTURE_WRONG = "Wrong structure of HCE cluster schema object"
70  ERROR_HCE_NODE_REQUEST_ERROR = "HCE node request error"
71  ERROR_HCE_RESPONSE_PROCESSING_FORMAT_ADMIN = "HCE node response format error, cant to split"
72  ERROR_HCE_RESPONSE_PROCESSING_NO_RESOURCE_IN_RESPONSE = "No resource data found in response or response with error"
73  ERROR_NO_ITEMS_IN_DRCE_RESPONSE = "No items in DRCE response"
74  ERROR_DRCE_RESPONSE_ERROR_CODE = "DRCE response with error"
75  ERROR_RE_PARSE_NO_MATCHES = "RE parse results from DRCE node, no matches!"
76 
77 
78  # #Constructor: reads the configuration, creates the client connection to the ResourcesManager and
79  # registers the UPDATE_RESOURCES_DATA_RESPONSE event handler
80  #
81  # @param configParser config parser object; options are read from the section named after the class
82  # @param connectionBuilderLight connection builder light; a new ConnectionBuilderLight is created when None
83  #
84  def __init__(self, configParser, connectionBuilderLight=None):
85  super(ResourcesStateMonitor, self).__init__()
86 
87  # Instantiate the connection builder light if not set
88  if connectionBuilderLight == None:
89  connectionBuilderLight = ConnectionBuilderLight()
90 
91  className = self.__class__.__name__
92 
93  # Get configuration settings (NOTE(review): listing line 95 is missing; it presumably sets self.resourcesUpdateDRCERequestJsonFile, read by on_poll_timeout)
94  self.clientResourcesManagerName = configParser.get(className, self.CONFIG_RESOURCES_MANAGER_CLIENT)
96  self.hceClusterSchemaFile = configParser.get(className, self.CONFIG_HCE_CLUSTER_SCHEMA_FILE)
97 
98  # Set connections poll timeout, defines period of HCE cluster monitoring cycle (on_poll_timeout)
99  # self.pollTimeout = configParser.getint(className, self.CONFIG_POLLING_TIMEOUT)
100  self.configVars[self.POLL_TIMEOUT_CONFIG_VAR_NAME] = configParser.getint(className, self.CONFIG_POLLING_TIMEOUT)
101 
102  # Create connections and raise bind or connect actions for correspondent connection type
103  resourcesManagerConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT,
105  # NOTE(review): the remaining build() arguments (listing line 104) are missing from this listing
106  # Add connections to the polling set
107  self.addConnection(self.clientResourcesManagerName, resourcesManagerConnection)
108 
109  # Set event handler for UPDATE_RESOURCES_DATA_RESPONSE event
110  self.setEventHandler(DTM_CONSTS.EVENT_TYPES.UPDATE_RESOURCES_DATA_RESPONSE, self.onUpdateResourcesDataResponse)
111 
112  # Initialize HCE node Admin API: request timeout used by sendToHCENodeAdmin (NOTE(review): listing line 114 is missing)
113  self.hceNodeAdminTimeout = configParser.getint(className, self.CONFIG_HCE_NODE_ADMIN_TIMEOUT)
115 
116  # Initialize unique Id generator (NOTE(review): statement not shown; sendToHCENodeAdmin uses self.drceIdGenerator)
118  # Initialize DRCE commands convertor (NOTE(review): statement not shown; on_poll_timeout uses self.drceCommandConvertor)
120  logger.debug("Constructor passed")
121 
122 
123  # #onUpdateResourcesDataResponse event handler: checks the per-resource statuses of an UPDATE_RESOURCES_DATA response
124  # (registered in __init__; NOTE(review): the def line, listing line 126, is missing from this listing)
125  # @param event response event; event.eventObj holds the `statuses` list, event.cookie the list of sent Resource objects
127  # Get the general response object from the event
128  generalResponse = event.eventObj
129  # Get list of sent resources to update (presumably the cookie set by sendUpdateResourceDataRequest)
130  resourcesToUpdate = event.cookie
131  if len(resourcesToUpdate) == len(generalResponse.statuses):
132  errorObjects = []
133  for i in range(len(generalResponse.statuses)):
134  if generalResponse.statuses[i] == False:  # statuses are matched to the sent resources by position
135  errorObjects.append(resourcesToUpdate[i])
136  logger.error(LogFormatterEvent(event, errorObjects, self.ERROR_UPDATE_RESOURCES))  # NOTE(review): indentation is lost here - confirm this is logged only when errorObjects is non-empty
137  else:
138  logger.error(LogFormatterEvent(event, [], self.ERROR_RESOURCES_LISTS_NOT_EQUAL + "\n" +
139  "send " + str(len(resourcesToUpdate)) + " returned " +
140  str(len(generalResponse.statuses))
141  ))
142 
143 
144 
145  # #Events wait timeout handler, called on the timeout state of the connections polling. Runs one periodical cycle of
146  # HCE cluster resources state monitoring: the DRCE request is sent to every connected replica/shard node of the
147  # cluster schema, each response is parsed into a Resource object and the collected objects are sent to the ResourcesManager
148  def on_poll_timeout(self):
149  # Initialize list of resources objects to update
150  resources = []
151 
152  # Get HCE Cluster schema (None if the file cannot be read or decoded)
153  clusterSchemaObj = self.getObjectFromJsonFile(self.hceClusterSchemaFile)
154  # Get the DRCE request (json text) that fetches the resources state; the file is re-read on every cycle
155  drceRequestJson = self.loadFromFile(self.resourcesUpdateDRCERequestJsonFile)
156  if drceRequestJson is not None:
157  # Get connected data nodes from schema
158  nodes = self.getConnectedNodesFromSchema(clusterSchemaObj)
159  # For each DRCE node execute HCE node admin request
160  for node in nodes:
161  # Send request to HCE node Admin API
162  rawResponse = self.sendToHCENodeAdmin(node.host, node.port, drceRequestJson)
163  if rawResponse is not None:
164  try:
165  # Split admin response parts; the DRCE json response is taken from the second part
166  logger.debug("Raw node response: " + str(rawResponse) + ", type = " + str(type(rawResponse)))
167  parts = rawResponse.split(admin.Constants.COMMAND_DELIM)
168  if len(parts) > 1:
169  # Convert DRCE json protocol response to TaskResponse object
170  taskResponse = self.drceCommandConvertor.from_json(parts[1])
171  logger.debug("Received taskResponse object: " + str(vars(taskResponse)))
172  # Get resource info data from the TaskResponse object and create Resource object
173  resource = self.getResourceFromTaskResponse(taskResponse)
174  logger.debug("Received Resource object: " + str(varDump(resource)))
175  # Collect resource if valid data detected
176  if resource is not None:
177  resources.append(resource)
178  else:  # NOTE(review): the body of this branch (listing line 179) is missing from this listing
180  else:  # NOTE(review): the body of this branch (listing line 181) is missing from this listing
182  except Exception, e:  # NOTE(review): Python 2 only syntax, e.message is deprecated; listing line 184 (rest of the call below) is missing
183  logger.error(self.ERROR_HCE_RESPONSE_PROCESSING_EXCEPTION + " : " + str(e.message) + "\n" + \
185 
186  # Send update for collected resources objects
187  if len(resources) > 0:
188  self.sendUpdateResourceDataRequest(resources)
189  else:  # NOTE(review): indentation is lost; given its message this presumably pairs with `if drceRequestJson is not None`
190  logger.error(self.ERROR_READ_DRCE_JSON_FILE + " " + self.resourcesUpdateDRCERequestJsonFile)
191 
192 
193 
# #Get the Resource object created on the basis of information from the TaskResponse object of DRCE get
# resources data request
#
# Only the first item of the TaskResponse is used. Its stdout must contain, in this order, the markers
# ---CPU LA---, ---cpu cores---, ---vmstat---, ---processes---, ---threads max---, ---threads actual---,
# ---RAM---, ---Disk---, ---uptime--- and ---END---; the text following each marker is parsed below.
#
# @param taskResponseObjects The TaskResponse objects container (its `items` list is used)
# @return the Resource object if success, or None if there are no items, the first item has a negative
#         error code, or stdout does not contain the expected markers.
#         ValueError/IndexError raised while parsing a matched section propagate to the caller.
def getResourceFromTaskResponse(self, taskResponseObjects):
    if len(taskResponseObjects.items) == 0:
        logger.error(self.ERROR_NO_ITEMS_IN_DRCE_RESPONSE)
        return None

    item = taskResponseObjects.items[0]

    # Non-negative error codes are accepted, anything else is treated as a failed request.
    # Lazy %s logging: a None error_message must not raise inside the error path.
    if not item.error_code >= 0:
        logger.error("%s : %s : %s", self.ERROR_DRCE_RESPONSE_ERROR_CODE, item.error_code, item.error_message)
        return None

    resource = dtm.EventObjects.Resource(item.host + ":" + str(item.port))
    resource.nodeName = item.node
    resource.state = dtm.EventObjects.Resource.STATE_ACTIVE

    reTemplate = (r"---CPU LA---(.*)"
                  r"---cpu cores---(.*)"
                  r"---vmstat---(.*)"
                  r"---processes---(.*)"
                  r"---threads max---(.*)"
                  r"---threads actual---(.*)"
                  r"---RAM---(.*)"
                  r"---Disk---(.*)"
                  r"---uptime---(.*)"
                  r"---END---(.*)")
    groupsItems = re.match(reTemplate, item.stdout, re.M | re.I | re.S)
    if not groupsItems:
        # Lazy %s logging: None nodeName/stdout/stderror must not raise inside the error path
        logger.error("%s, node:%s\nstdout:\n%s\nstderr:\n%s", self.ERROR_RE_PARSE_NO_MATCHES, resource.nodeName,
                     item.stdout, item.stderror)
        return None

    # Group 1 (CPU LA) and group 5 (threads max) are captured but not used

    # CPU cores: a single integer
    resource.cpuCores = int(groupsItems.group(2))

    # vmstat: the line at index 4 of the section is split on whitespace; column 14 is taken as
    # CPU idle % and column 15 as IO wait %
    vmstatColumns = groupsItems.group(3).split("\n")[4].split()
    cpuIdle = int(vmstatColumns[14])
    # CPU load is the complement of the idle time, clamped to [0, 100]
    resource.cpu = max(0, min(100, 100 - cpuIdle))
    resource.io = int(vmstatColumns[15])

    # Processes and actual threads: single integers
    resource.processes = int(groupsItems.group(4).strip())
    resource.threads = int(groupsItems.group(6).strip())

    # RAM: fixed-width columns of the lines at index 2 (RAM) and index 4 (swap) of the section.
    # NOTE(review): RAM values are multiplied by 1000 while Disk values use 1024 - confirm the intended unit.
    ramLines = groupsItems.group(7).split("\n")
    memLine = ramLines[2]
    resource.ramR = int(memLine[4:18].strip()) * 1000
    resource.ramRU = int(memLine[19:29].strip()) * 1000
    # Cached memory is not counted as used
    ramCached = int(memLine[63:].strip()) * 1000
    resource.ramRU = resource.ramRU - ramCached

    swapLine = ramLines[4]
    resource.swap = int(swapLine[5:18].strip()) * 1000
    resource.swapU = int(swapLine[19:29].strip()) * 1000

    # Disk: the line at index 2 of the section is split on whitespace; column 1 is the total and
    # column 2 the used size
    diskColumns = groupsItems.group(8).split("\n")[2].split()
    resource.disk = int(diskColumns[1]) * 1024
    resource.diskU = int(diskColumns[2]) * 1024

    # Virtual RAM is not collected by this request and is reported as 0
    resource.ramV = 0
    resource.ramVU = 0

    return resource
317 
318 
319 
# #Send the UpdateResourceData request with the collected resources to the ResourcesManager object
#
# The list is passed both as the event object and as the event cookie; onUpdateResourcesDataResponse
# reads event.cookie as the list of sent resources when it checks the returned statuses.
#
# @param resourcesList The list of Resource objects to send
def sendUpdateResourceDataRequest(self, resourcesList):
    request = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.UPDATE_RESOURCES_DATA, resourcesList)
    request.cookie = resourcesList
    self.send(self.clientResourcesManagerName, request)
329 
330 
331 
# #Get the list of admin.Node objects of the connected data nodes from the cluster schema representation
#
# A node is selected when its "role" is "replica" or "shard" and it has a "connection" key. The admin port
# is the third ':'-separated field of the node's "admin" value (presumably a "tcp://host:port" address).
#
# @param clusterSchemaObj The schema dict object (None or a wrongly structured object gives an empty list)
# @return the list of admin.Node objects; a schema structure error is logged, and a malformed node entry
#         is logged and skipped without stopping the scan of the remaining entries
def getConnectedNodesFromSchema(self, clusterSchemaObj):
    nodes = []

    # Not sure in structure integrity: the "cluster" -> "nodes" path must exist and be iterable
    try:
        schemaNodes = list(clusterSchemaObj["cluster"]["nodes"])
    except Exception as e:
        logger.error(self.ERROR_HCE_CLUSTER_SCHEMA_STRUCTURE_WRONG + " : " + str(e))
        return nodes

    for item in schemaNodes:
        try:
            if item["role"] in ("replica", "shard") and 'connection' in item:
                adminParts = item["admin"].split(":")
                if len(adminParts) > 2:
                    nodes.append(admin.Node.Node(item["host"], adminParts[2]))
        except Exception as e:
            # One malformed entry must not hide the connected nodes listed after it
            logger.error(self.ERROR_HCE_CLUSTER_SCHEMA_STRUCTURE_WRONG + " : " + str(e))

    return nodes
350 
351 
352 
# #Send a DRCE admin command to the HCE node Admin API and return the raw response body
#
# @param host HCE node host
# @param port HCE node admin port
# @param messageParameters DRCE request string, passed as the single parameter of the DRCE admin command
# @return the raw body of the HCE Admin API response, or None if makeRequest() raised or returned None
def sendToHCENodeAdmin(self, host, port, messageParameters):
    logger.debug("sendToHCENodeAdmin() use Host: '%s' and Port '%s'", str(host), str(port))

    # Build the DRCE admin command message addressed to the node
    node = admin.Node.Node(host, port)
    command = admin.Command.Command(admin.Constants.COMMAND_NAMES.DRCE,
                                    [messageParameters],
                                    admin.Constants.ADMIN_HANDLER_TYPES.DATA_PROCESSOR_DATA)
    requestBody = command.generateBody()
    message = {admin.Constants.STRING_MSGID_NAME: self.drceIdGenerator.get_uid(),
               admin.Constants.STRING_BODY_NAME: requestBody}

    response = None
    try:
        logger.debug("!!! Before makeRequest() msg_id = %s", str(message[admin.Constants.STRING_MSGID_NAME]))
        response = self.hceNodeManagerRequest.makeRequest(node, message, self.hceNodeAdminTimeout)
    except Exception as e:
        # str(e) instead of the deprecated e.message, which is empty for multi-argument exceptions
        logger.error(self.ERROR_HCE_NODE_REQUEST_ERROR + " : " + str(e))

    return None if response is None else response.getBody()
384 
385 
386 
# #Load a json file and decode it
#
# @param filePath The file path name
# @return the decoded json object (normally a dict), or None when the file cannot be read or its
#         content is not valid json
def getObjectFromJsonFile(self, filePath):
    content = self.loadFromFile(filePath)
    if content is None:
        # Read errors are reported by loadFromFile itself
        return None

    try:
        return json.loads(str(content))
    except ValueError:
        logger.error(self.ERROR_JSON_DECODE)
        # Dump the undecodable text for troubleshooting
        logger.debug(content)
    return None
407 
408 
409 
# #Read the whole content of a text file
#
# Used for both the HCE cluster schema json file and the DRCE resources state request json file.
#
# @param filePath The file pathname
# @return the file content or None if the file cannot be opened or read (the error is logged)
def loadFromFile(self, filePath):
    try:
        # `with` closes the handle deterministically instead of relying on garbage collection
        with open(filePath, 'r') as fileObj:
            return fileObj.read()
    except IOError:
        logger.error(self.ERROR_READ_FILE + " " + filePath)
        return None
425 
426 
427 
def __init__(self, configParser, connectionBuilderLight=None)
Log formatter event, defines the object to format message string.
Definition: LogFormatter.py:16
def makeRequest(self, node, message, commandTimeout=None)
makeRequest main class method; it gets node and message params and interacts with the transport layer...
NodeManagerRequest class contains all data needed for sending admin-level requests.
Command class contains "command" data and processing methods.
Definition: Command.py:16
def setEventHandler(self, eventType, handler)
Set event handler; overwrites the current handler for eventType.
def addConnection(self, name, connection)
This is app base class for management server connection end-points and parallel transport messages pr...
Resource event object, represents the resource's data fields.
UIDGenerator is used to generate unique message id.
Definition: UIDGenerator.py:14
Class hides routines of building connection objects.
def send(self, connect_name, event)
send event
def getResourceFromTaskResponse(self, taskResponseObjects)
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
Convertor used to convert Task*Request objects to json and TaskResponse objects from json.
Definition: join.py:1
def sendToHCENodeAdmin(self, host, port, messageParameters)
def getTracebackInfo(linesNumberMax=None)
Definition: Utils.py:218