2 HCE project, Python bindings, Distributed Tasks Manager application. 3 ResourcesStateMonitor object and related class definitions. 4 This object acts as a monitor of the resources of the HCE cluster. 5 It periodically reads the cluster schema from a JSON schema file and makes a DRCE synch task request for all "replica" or 6 "shard" nodes. The DRCE task request is prepared and stored in an external JSON file that is read each time a 7 monitoring cycle happens. The DRCE synch task acts as a set of Linux commands and collects system information like 8 AVG CPU, disk, memory usage and so on. This information is parsed and some system resource usage indicators are detected. 9 These indicators update the resource database that is used for task planning. 13 @author bgv bgv.hce@gmail.com 14 @link: http://hierarchical-cluster-engine.com/ 15 @copyright: Copyright © 2013-2014 IOIX Ukraine 16 @license: http://hierarchical-cluster-engine.com/license/ 41 logger = logging.getLogger(DTM_CONSTS.LOGGER_NAME)
57 CONFIG_RESOURCES_MANAGER_CLIENT =
"clientResourcesManager" 58 CONFIG_HCE_NODE_ADMIN_TIMEOUT =
"HCENodeAdminTimeout" 59 CONFIG_UPDATE_RESOURCES_DRCE_JSON =
"FetchResourcesStateDRCEJsonFile" 60 CONFIG_HCE_CLUSTER_SCHEMA_FILE =
"HCEClusterSchemaFile" 61 CONFIG_POLLING_TIMEOUT =
"PollingTimeout" 63 ERROR_UPDATE_RESOURCES =
"Not all resources updated!" 64 ERROR_JSON_DECODE =
"Json decode error!" 65 ERROR_RESOURCES_LISTS_NOT_EQUAL =
"Lists of sent and received messages not equal!" 66 ERROR_READ_FILE =
"Error read file" 67 ERROR_READ_DRCE_JSON_FILE =
"Error read DRCE update resources state request json file" 68 ERROR_HCE_RESPONSE_PROCESSING_EXCEPTION =
"HCE node Admin API response processing exception" 69 ERROR_HCE_CLUSTER_SCHEMA_STRUCTURE_WRONG =
"Wrong structure of HCE cluster schema object" 70 ERROR_HCE_NODE_REQUEST_ERROR =
"HCE node request error" 71 ERROR_HCE_RESPONSE_PROCESSING_FORMAT_ADMIN =
"HCE node response format error, cant to split" 72 ERROR_HCE_RESPONSE_PROCESSING_NO_RESOURCE_IN_RESPONSE =
"No resource data found in response or response with error" 73 ERROR_NO_ITEMS_IN_DRCE_RESPONSE =
"No items in DRCE response" 74 ERROR_DRCE_RESPONSE_ERROR_CODE =
"DRCE response with error" 75 ERROR_RE_PARSE_NO_MATCHES =
"RE parse results from DRCE node, no matches!" 84 def __init__(self, configParser, connectionBuilderLight=None):
85 super(ResourcesStateMonitor, self).
__init__()
88 if connectionBuilderLight ==
None:
91 className = self.__class__.__name__
103 resourcesManagerConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT,
120 logger.debug(
"Constructor passed")
128 generalResponse = event.eventObj
130 resourcesToUpdate = event.cookie
131 if len(resourcesToUpdate) == len(generalResponse.statuses):
133 for i
in range(len(generalResponse.statuses)):
134 if generalResponse.statuses[i] ==
False:
135 errorObjects.append(resourcesToUpdate[i])
139 "send " + str(len(resourcesToUpdate)) +
" returned " +
140 str(len(generalResponse.statuses))
156 if drceRequestJson
is not None:
163 if rawResponse
is not None:
166 logger.debug(
"Raw node response: " + str(rawResponse) +
", type = " + str(
type(rawResponse)))
167 parts = rawResponse.split(admin.Constants.COMMAND_DELIM)
171 logger.debug(
"Received taskResponse object: " + str(vars(taskResponse)))
174 logger.debug(
"Received Resource object: " + str(
varDump(resource)))
176 if resource
is not None:
177 resources.append(resource)
187 if len(resources) > 0:
202 if len(taskResponseObjects.items) > 0:
205 if taskResponseObjects.items[0].error_code == 0
or taskResponseObjects.items[0].error_code > 0:
207 str(taskResponseObjects.items[0].port))
208 resource.nodeName = taskResponseObjects.items[0].node
209 resource.state = dtm.EventObjects.Resource.STATE_ACTIVE
211 reTemplate1 =
r"---CPU LA---(.*)"\
212 "---cpu cores---(.*)"\
214 "---processes---(.*)"\
215 "---threads max---(.*)"\
216 "---threads actual---(.*)"\
221 groupsItems = re.match (reTemplate1, taskResponseObjects.items[0].stdout, re.M | re.I | re.S)
228 cpuCores = int(groupsItems.group(2))
230 resource.cpuCores = cpuCores
233 lines = (
' '.
join(groupsItems.group(3).split(
"\n")[4].split())).split(
' ')
234 cpuIdle = int(lines[14])
236 cpuLoad = 100 - cpuIdle
243 resource.cpu = cpuLoad
246 lines = (
' '.
join(groupsItems.group(3).split(
"\n")[4].split())).split(
' ')
247 cpuIO = int(lines[15])
252 processes = int(groupsItems.group(4).lstrip().rstrip())
254 resource.processes = processes
261 threadsActual = int(groupsItems.group(6).lstrip().rstrip())
263 resource.threads = threadsActual
266 lines = groupsItems.group(7).split(
"\n")
268 ramTotal = int(lines[2][4: 18].lstrip()) * 1000
270 resource.ramR = ramTotal
272 ramUsed = int(lines[2][19: 29].lstrip()) * 1000
274 resource.ramRU = ramUsed
276 ramCached = int(lines[2][63:].lstrip()) * 1000
278 resource.ramRU = resource.ramRU - ramCached
281 swapTotal = int(lines[4][5: 18].lstrip()) * 1000
283 resource.swap = swapTotal
285 swapUsed = int(lines[4][19: 29].lstrip()) * 1000
287 resource.swapU = swapUsed
290 lines = groupsItems.group(8).split(
"\n")
291 lines = (
' '.
join(lines[2].split())).split(
" ")
293 diskTotal = int(lines[1]) * 1024
295 resource.disk = diskTotal
297 diskUsed = int(lines[2]) * 1024
299 resource.diskU = diskUsed
307 taskResponseObjects.items[0].stdout +
"\nstderr:\n" + taskResponseObjects.items[0].stderror)
311 " : " + taskResponseObjects.items[0].error_message)
326 updateResourceDataEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.UPDATE_RESOURCES_DATA, resourcesList)
327 updateResourceDataEvent.cookie = resourcesList
341 for item
in clusterSchemaObj[
"cluster"][
"nodes"]:
342 if (item[
"role"] ==
"replica" or item[
"role"] ==
"shard")
and (
'connection' in item):
343 port = item[
"admin"].split(
":")
362 logger.debug(
"sendToHCENodeAdmin() use Host: '%s' and Port '%s'", str(host), str(port))
366 params = [messageParameters]
369 admin.Constants.ADMIN_HANDLER_TYPES.DATA_PROCESSOR_DATA
371 requestBody = command.generateBody()
372 message = {admin.Constants.STRING_MSGID_NAME : self.
drceIdGenerator.get_uid(),
373 admin.Constants.STRING_BODY_NAME : requestBody}
375 logger.debug(
"!!! Before makeRequest() msg_id = %s", str(message[admin.Constants.STRING_MSGID_NAME]))
383 return response.getBody()
397 if schemaJsonString
is not None:
401 schemaDic = json.loads(str(schemaJsonString))
404 logger.debug(schemaJsonString)
420 fileContent = open(filePath,
'r').read()
string ERROR_HCE_NODE_REQUEST_ERROR
string ERROR_HCE_CLUSTER_SCHEMA_STRUCTURE_WRONG
def __init__(self, configParser, connectionBuilderLight=None)
def makeRequest(self, node, message, commandTimeout=None)
makeRequest is the main class method; it gets the node and message params and interacts with the transport layer...
string ERROR_UPDATE_RESOURCES
string ERROR_HCE_RESPONSE_PROCESSING_FORMAT_ADMIN
NodeManagerRequest class contains all the data needed for sending an admin-level request.
def sendUpdateResourceDataRequest(self, resourcesList)
string CONFIG_HCE_CLUSTER_SCHEMA_FILE
string POLL_TIMEOUT_CONFIG_VAR_NAME
def onUpdateResourcesDataResponse(self, event)
Command class contains "command" data and processing methods.
string ERROR_HCE_RESPONSE_PROCESSING_NO_RESOURCE_IN_RESPONSE
def setEventHandler(self, eventType, handler)
Sets the event handler, overwriting the current handler for eventType.
string CONFIG_HCE_NODE_ADMIN_TIMEOUT
def addConnection(self, name, connection)
string ERROR_DRCE_RESPONSE_ERROR_CODE
string CONFIG_UPDATE_RESOURCES_DRCE_JSON
resourcesUpdateDRCERequestJsonFile
def getConnectedNodesFromSchema(self, clusterSchemaObj)
This is app base class for management server connection end-points and parallel transport messages pr...
def on_poll_timeout(self)
Resource event object; represents the resource's data fields.
string ERROR_RESOURCES_LISTS_NOT_EQUAL
UIDGenerator is used to generate unique message id.
Class hides the routines of building connection objects.
def send(self, connect_name, event)
send event
clientResourcesManagerName
string ERROR_HCE_RESPONSE_PROCESSING_EXCEPTION
string CONFIG_RESOURCES_MANAGER_CLIENT
string ERROR_READ_DRCE_JSON_FILE
def getResourceFromTaskResponse(self, taskResponseObjects)
string CONFIG_POLLING_TIMEOUT
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
def getObjectFromJsonFile(self, filePath)
string ERROR_RE_PARSE_NO_MATCHES
string ERROR_NO_ITEMS_IN_DRCE_RESPONSE
Convertor which is used to convert Task*Request objects to JSON and TaskResponse objects from JSON.
def sendToHCENodeAdmin(self, host, port, messageParameters)
def getTracebackInfo(linesNumberMax=None)
def loadFromFile(self, filePath)