HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
BaseServerManager.py
Go to the documentation of this file.
1 '''
2 Created on Feb 25, 2014
3 
4 @author: igor, bgv
5 '''
6 
7 
8 import logging
9 import sys
10 import threading
11 import json
12 import os
13 import psutil
14 
15 from datetime import datetime
16 from app.Utils import ExceptionLog
17 import app.Utils as Utils # pylint: disable=F0401
18 import app.Consts as APP_CONSTS
19 from app.PollerManager import PollerManager
20 from dtm.Constants import EVENT_TYPES #@todo move the module in transport
21 from dtm.EventObjects import AdminState
22 from dtm.EventObjects import GeneralResponse
23 import dtm.Constants as DTM_CONSTS
24 from transport.Connection import ConnectionTimeout, TransportInternalErr
25 from transport.ConnectionBuilderLight import ConnectionBuilderLight
26 from transport.Event import EventBuilder
27 from transport.Response import Response
28 import transport.Consts as trasnport_consts
29 
30 
31 # Logger initialization
32 logger = logging.getLogger(APP_CONSTS.LOGGER_NAME)
33 
34 
35 
38 class BaseServerManager(threading.Thread):
39  ADMIN_CONNECT_ENDPOINT = "Admin"
40  ADMIN_CONNECT_CLIENT = "Admin"
41  POLL_TIMEOUT_DEFAULT = 3000
42 
43  STAT_FIELDS_OPERATION_ADD = 0
44  STAT_FIELDS_OPERATION_SUB = 1
45  STAT_FIELDS_OPERATION_SET = 2
46  STAT_FIELDS_OPERATION_INIT = 3
47 
48  POLL_TIMEOUT_CONFIG_VAR_NAME = "POLL_TIMEOUT"
49  LOG_LEVEL_CONFIG_VAR_NAME = "LOG_LEVEL"
50 
51  STAT_DUMPS_DEFAULT_DIR = "/tmp/"
52  STAT_DUMPS_DEFAULT_NAME = "%APP_NAME%_%CLASS_NAME%_stat_vars.dump"
53 
54  LOGGERS_NAMES = {APP_CONSTS.LOGGER_NAME, "dc", "dtm", "root", ""}
55 
56 
58  def __init__(self, poller_manager=None, admin_connection=None, conectionLightBuilder=None, exceptionForward=False, \
59  dumpStatVars=True):
60  threading.Thread.__init__(self)
61 
62  logger.info("Thread-based class instance constructor begin!")
63 
64  self.dumpStatVars = dumpStatVars
65 
66  if poller_manager is None:
68  else:
69  self.poller_manager = poller_manager
70 
72 
73  self.exit_flag = False
75 
76  #map {"name":connection}
77  self.connections = dict()
78  #map {event_type:handler}
79  self.event_handlers = dict()
80 
81 
83  self.statFields = dict()
84  self.loadStatVarsDump()
85 
86  #Config fields container
87  self.configVars = dict()
88 
89  #by default -add client admin connection
90  conectLightBuilder = conectionLightBuilder
91  admin_connect = admin_connection
92  if conectLightBuilder is None:
93  conectLightBuilder = ConnectionBuilderLight()
94  if admin_connect is None:
95  admin_connect = conectLightBuilder.build(trasnport_consts.CLIENT_CONNECT, self.ADMIN_CONNECT_ENDPOINT)
96 
97  self.addConnection(self.ADMIN_CONNECT_CLIENT, admin_connect)
98  #Set event handler for ADMIN_STATE event
99  self.setEventHandler(EVENT_TYPES.ADMIN_STATE, self.onAdminState)
100  self.setEventHandler(EVENT_TYPES.ADMIN_FETCH_STAT_DATA, self.onAdminFetchStatData)
101  self.setEventHandler(EVENT_TYPES.ADMIN_GET_CONFIG_VARS, self.onAdminGetConfigVars)
102  self.setEventHandler(EVENT_TYPES.ADMIN_SET_CONFIG_VARS, self.onAdminSetConfigVars)
103  self.setEventHandler(EVENT_TYPES.ADMIN_SUSPEND, self.onAdminSuspend)
104  self.sendAdminReadyEvent()
105  #Set exception forwarding behavior, True - means forward exceptions farther, False - handle locally
106  self.exceptionForward = exceptionForward
107  #Init log level in config vars storage
109  #Init start date in stat vars
110  self.updateStatField(APP_CONSTS.START_DATE_NAME, datetime.now().__str__(), self.STAT_FIELDS_OPERATION_SET)
111  logger.info("Thread-based class instance constructor end!")
112 
113 
114  #@param name mane of connection
115  #@param connection instance of Connection
116  def addConnection(self, name, connection):
117  self.initStatFields(name)
118  self.connections[name] = connection
119 
120 
121 
126  def setEventHandler(self, eventType, handler):
127  self.event_handlers[eventType] = handler
128 
129 
130 
134  def send(self, connect_name, event):
135  try:
136  logger.debug("Send to " + str(connect_name) + "\n" + self.createLogMsg(event))
137  if self.is_connection_registered(connect_name):
138  self.connections[connect_name].send(event)
139  self.updateStatField(connect_name + "_send_cnt", 1)
140  self.updateStatField(connect_name + "_send_bytes", sys.getsizeof(event))
141  else:
142  logger.error("Unregistered connection [" + str(connect_name) + "] network transport event!")
143  except IOError as e:
144  del e
145  except EnvironmentError as e:
146  del e
147  except Exception, err:
148  logger.error("Error `%s`", str(err))
149  except: # pylint: disable=W0702
150  pass
151 
152 
153 
157  def reply(self, event, reply_event):
158  reply_event.uid = event.uid
159  reply_event.connect_identity = event.connect_identity
160  reply_event.cookie = event.cookie
161  self.send(event.connect_name, reply_event)
162 
163 
164 
167  def poll(self):
168  connect_names = dict()
169  timedout = False
170 
171  try:
172  if self.POLL_TIMEOUT_CONFIG_VAR_NAME in self.configVars and \
173  int(self.configVars[self.POLL_TIMEOUT_CONFIG_VAR_NAME]) > 0:
174  timeout = int(self.configVars[self.POLL_TIMEOUT_CONFIG_VAR_NAME])
175  else:
176  timeout = self.pollTimeout
177  #Try to poll all registered connections
178  connect_names = self.poller_manager.poll(timeout) # pylint: disable=R0204
179  except ConnectionTimeout:
180  timedout = True
181  except TransportInternalErr as err:
182  logger.error("ZMQ transport error: " + str(err.message))
183  except IOError as e:
184  del e
185  except Exception as err:
186  ExceptionLog.handler(logger, err, "Polling error:")
187 
188  if timedout is True:
189  try:
190  self.on_poll_timeout()
191  except IOError as e:
192  del e
193  except EnvironmentError as e:
194  del e
195  except Exception as err:
196  try:
197  ExceptionLog.handler(logger, err, "Call of on_poll_timeout() error:")
198  except IOError as e:
199  del e
200  else:
201  #Read data from sockets of connections list returned
202  for name in connect_names:
203  try:
204  if self.is_connection_registered(name):
205  event = self.connections[name].recv()
206  if isinstance(event, Response):
207  #process data from Connection
208  event = self.eventBuilder.build(EVENT_TYPES.SERVER_TCP_RAW, event)
209  event.connect_name = name
210  #Process received event
211  self.process(event)
212  else:
213  logger.error("Unregistered connection [" + str(name) + "] network transport event!")
214  except IOError as e:
215  del e
216  except EnvironmentError as e:
217  del e
218  except Exception as err:
219  ExceptionLog.handler(logger, err, "Event processing error:")
220 
221 
222 
226  def process(self, event):
227  try:
228  try:
229  logger.debug("Got " + self.createLogMsg(event))
230  except IOError as e:
231  del e
232 
233  self.updateStatField(event.connect_name + "_recv_cnt", 1)
234  self.updateStatField(event.connect_name + "_recv_bytes", sys.getsizeof(event))
235  if event.eventType in self.event_handlers:
236  self.event_handlers[event.eventType](event)
237  else:
238  self.on_unhandled_event(event)
239  except IOError as e:
240  del e
241  except EnvironmentError as e:
242  del e
243 
244 
245  def run(self):
246  while not self.exit_flag:
247  try:
248  self.build_poller_list()
249  self.poll()
250  self.clear_poller()
251  except IOError as e:
252  del e
253  except Exception, e:
254  try:
255  logger.error("Unhandled exception in thread-based class : " + str(e.message) + "\n" + \
256  Utils.getTracebackInfo())
257  if self.exceptionForward:
258  logger.error("Exception forwarded.")
259  raise e
260  except IOError as e:
261  del e
262 
263  self.saveStatVarsDump()
264 
265 
266 
267 
272  def is_connection_registered(self, name):
273  if name in self.connections:
274  return True
275  return False
276 
277 
278 
280  def on_poll_timeout(self):
281  pass
282 
283 
284 
288  def on_unhandled_event(self, event):
289  logStr = "Got UNHANDLED EVENT\n" + self.createLogMsg(event)
290  logger.debug(logStr)
291 
292 
293  #common for all manager => in separate class
294  def build_poller_list(self):
295  for item in self.connections:
296  self.poller_manager.add(self.connections[item], item)
297 
298 
299  def clear_poller(self):
300  for item in self.connections:
301  self.poller_manager.remove(self.connections[item])
302 
303 
304 
308  def onAdminState(self, event):
309  adminState = event.eventObj
310  className = self.__class__.__name__
311  response = AdminState(className, AdminState.STATE_SHUTDOWN)
312  try:
313  if adminState.command == AdminState.STATE_SHUTDOWN and adminState.className == className:
314  logger.info("Has successfully shutdown!")
315  self.exit_flag = True
316  else:
317  logger.error("Got unsupported admin command [" + str(adminState.command) + "] for " + str(adminState.className))
318  response = AdminState(className, AdminState.STATE_ERROR)
319  except IOError as e:
320  del e
321 
322  responseEvent = self.eventBuilder.build(EVENT_TYPES.ADMIN_STATE_RESPONSE, response)
323  self.reply(event, responseEvent)
324 
325 
326 
327 
331  def onAdminFetchStatData(self, event):
332  adminStatData = event.eventObj
333  if adminStatData.className == self.__class__.__name__:
334  adminStatData.fields = self.getStatDataFields(adminStatData.fields)
335  else:
336  err_msg = "Got wrong admin class name [" + adminStatData.className + "]"
337  try:
338  logger.error(err_msg + str(adminStatData.className))
339  except IOError as e:
340  del e
341 
342  responseEvent = self.eventBuilder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA_RESPONSE, adminStatData)
343  self.reply(event, responseEvent)
344 
345 
346 
347 
351  def onAdminSuspend(self, event):
352  responseObj = GeneralResponse(GeneralResponse.ERROR_OK, (">>> Suspend Processed " + str(self.__class__.__name__)))
353 
354  try:
355  logger.debug(">>> SUSPEND BASE processed class=" + str(self.__class__.__name__))
356  except IOError as e:
357  del e
358 
359  responseEvent = self.eventBuilder.build(EVENT_TYPES.ADMIN_SUSPEND_RESPONSE, responseObj)
360  self.reply(event, responseEvent)
361 
362 
363 
367  def getStatDataFields(self, fields):
368  if len(fields) == 0:
369  fields = self.statFields
370  else:
371  for field_name in fields:
372  if field_name in self.statFields:
373  fields[field_name] = self.statFields[field_name]
374  if isinstance(fields, dict):
375  fields.update(self.getSystemStat())
376  return fields
377 
378 
379 
383  def getSystemStat(self):
384  fields = {'RAMV':0, 'RAMR':0, 'CPUU':0, 'CPUS':0, 'THREADS':0}
385  try:
386  py = psutil.Process(os.getpid())
387  m = py.memory_info()
388  fields['RAMV'] = m.vms
389  fields['RAMR'] = m.rss
390  c = py.cpu_times()
391  fields['CPUU'] = c.user
392  fields['CPUS'] = c.system
393  fields['THREADS'] = py.num_threads()
394  except Exception as e:
395  del e
396 
397  return fields
398 
399 
400 
404  def getConfigVarsFields(self, fields):
405  if len(fields) == 0 or "*" in fields:
406  fields = self.configVars
407  else:
408  for field_name in fields:
409  if field_name in self.configVars:
410  fields[field_name] = self.configVars[field_name]
411 
412  return fields
413 
414 
415 
416 
420  def onAdminGetConfigVars(self, event):
421  getConfigVars = event.eventObj
422  if getConfigVars.className == self.__class__.__name__:
423  if len(getConfigVars.fields) > 0 and "*" not in getConfigVars.fields:
424  for fieldName in getConfigVars.fields:
425  if fieldName in self.configVars:
426  getConfigVars.fields[fieldName] = self.configVars[fieldName]
427  else:
428  getConfigVars.fields[fieldName] = None
429  else:
430  getConfigVars.fields = self.configVars
431  else:
432  try:
433  logger.error("Wrong admin class name [" + str(getConfigVars.className) + "]")
434  except IOError as e:
435  del e
436 
437  responseEvent = self.eventBuilder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA_RESPONSE, getConfigVars)
438  self.reply(event, responseEvent)
439 
440 
441 
442 
446  def onAdminSetConfigVars(self, event):
447  setConfigVars = event.eventObj
448  responseEvent = self.eventBuilder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA_RESPONSE,
449  self.setConfigVars(setConfigVars))
450  self.reply(event, responseEvent)
451 
452 
453 
454 
458  def setConfigVars(self, setConfigVars):
459  if setConfigVars.className == self.__class__.__name__:
460  for fieldName in setConfigVars.fields:
461  if fieldName in self.configVars:
462  if type(self.configVars[fieldName]) == type(setConfigVars.fields[fieldName]): # pylint: disable=C0123
463  self.configVars[fieldName] = setConfigVars.fields[fieldName]
464  self.processSpecialConfigVars(fieldName, setConfigVars.fields[fieldName])
465  else:
466  setConfigVars.fields[fieldName] = None
467  else:
468  self.configVars[fieldName] = setConfigVars.fields[fieldName]
469  else:
470  try:
471  logger.error("Wrong admin class name [" + str(setConfigVars.className) + "]")
472  except IOError as e:
473  del e
474 
475  return setConfigVars
476 
477 
478 
479 
482  className = self.__class__.__name__
483  ready = AdminState(className, AdminState.STATE_READY)
484  readyEvent = self.eventBuilder.build(EVENT_TYPES.ADMIN_STATE_RESPONSE, ready)
485  self.send(self.ADMIN_CONNECT_CLIENT, readyEvent)
486 
487 
488 
489 
493  def createLogMsg(self, event):
494  logMsg = "event:\n" + str(vars(event)) + "\neventObj:\n"
495  if event.eventObj and hasattr(event.eventObj, "__dict__"):
496  logMsg = logMsg + str(vars(event.eventObj))
497  else:
498  logMsg = logMsg + str(event.eventObj)
499  return logMsg
500 
501 
502 
503 
506  def initStatFields(self, connect_name):
507  if connect_name + "_send_cnt" not in self.statFields:
508  self.statFields[connect_name + "_send_cnt"] = 0
509  if connect_name + "_recv_cnt" not in self.statFields:
510  self.statFields[connect_name + "_recv_cnt"] = 0
511  if connect_name + "_send_bytes" not in self.statFields:
512  self.statFields[connect_name + "_send_bytes"] = 0
513  if connect_name + "_recv_bytes" not in self.statFields:
514  self.statFields[connect_name + "_recv_bytes"] = 0
515 
516 
517 
518 
522  def updateStatField(self, field_name, value, operation=STAT_FIELDS_OPERATION_ADD):
523  if field_name in self.statFields:
524  if operation == self.STAT_FIELDS_OPERATION_ADD:
525  self.statFields[field_name] += int(value)
526  elif operation == self.STAT_FIELDS_OPERATION_SUB:
527  self.statFields[field_name] -= int(value)
528  elif operation == self.STAT_FIELDS_OPERATION_SET:
529  self.statFields[field_name] = value
530  else:
531  if operation == self.STAT_FIELDS_OPERATION_SET or operation == self.STAT_FIELDS_OPERATION_INIT:
532  self.statFields[field_name] = value
533  else:
534  try:
535  logger.error("Stat update: key is not valid " + field_name)
536  except IOError as e:
537  del e
538 
539 
540 
541 
545  def processSpecialConfigVars(self, name, value):
546  try:
547  if name == self.LOG_LEVEL_CONFIG_VAR_NAME:
548  #logger.setLevel(value)
549  self.setLogLevel(value)
550  except IOError as e:
551  del e
552  except Exception as err:
553  try:
554  ExceptionLog.handler(logger, err, "Exception:")
555  except IOError as e:
556  del e
557 
558 
559 
560 
564  def getLogLevel(self):
565  level = None
566  for name in logging.Logger.manager.loggerDict.keys():
567  if isinstance(logging.Logger.manager.loggerDict[name], logging.Logger) and name in self.LOGGERS_NAMES:
568  level = logging.Logger.manager.loggerDict[name].getEffectiveLevel()
569  break
570  else:
571  pass
572 
573  return level
574 
575 
576 
577 
581  def setLogLevel(self, level):
582  for name in logging.Logger.manager.loggerDict.keys():
583  if isinstance(logging.Logger.manager.loggerDict[name], logging.Logger) and name in self.LOGGERS_NAMES:
584  logging.Logger.manager.loggerDict[name].setLevel(level)
585  ll = logging.getLogger("")
586  ll.setLevel(level)
587 
588 
589 
590 
592  def saveStatVarsDump(self):
593  if self.dumpStatVars:
594  try:
595  name = self.getStatVarsDumpFileName()
596  with open(name, "w") as f:
597  f.write(json.dumps(self.statFields, indent=4))
598  except Exception as err:
599  try:
600  ExceptionLog.handler(logger, err, "Error save stat vars to file `" + name + "`: ")
601  except IOError as e:
602  del e
603 
604 
605 
606 
608  def loadStatVarsDump(self):
609  if self.dumpStatVars:
610  try:
611  name = self.getStatVarsDumpFileName()
612  if os.path.exists(name):
613  with open(name, 'r') as f:
614  data = f.read()
615  self.statFields = json.loads(str(data))
616  #APP_CONSTS.START_DATE_NAME
617  self.updateStatField(APP_CONSTS.START_DATE_NAME, datetime.now().__str__(), self.STAT_FIELDS_OPERATION_SET)
618  except IOError as e:
619  del e
620  except Exception as err:
621  try:
622  ExceptionLog.handler(logger, err, "Error load stat vars from file `" + name + "`: ")
623  logger.error(str(data))
624  except IOError as e:
625  del e
626 
627 
628 
629 
632  appName = os.path.splitext(os.path.basename(sys.argv[0]))[0]
633  return self.STAT_DUMPS_DEFAULT_DIR + self.STAT_DUMPS_DEFAULT_NAME.replace("%APP_NAME%", appName).\
634  replace("%CLASS_NAME%", self.__class__.__name__)
635 
636 
637  # #create dict config (dict object)
638  #
639  def createDBIDict(self, configParser):
640  # get section
641  return dict(configParser.items(DTM_CONSTS.DB_CONFIG_SECTION))
def getConfigVarsFields(self, fields)
getConfigVarsFields returns config vars from storage
def getStatVarsDumpFileName(self)
Get stat vars file name.
def reply(self, event, reply_event)
wrapper for sending event in reply for event
def setLogLevel(self, level)
Set log level for all loggers.
def saveStatVarsDump(self)
Save stat vars in json file.
def is_connection_registered(self, name)
check is a connection was registered in a instance of BaseServerManager i object
def process(self, event)
process event call the event handler method that was set by user or on_unhandled_event method if not ...
def initStatFields(self, connect_name)
add record in statFields
def getSystemStat(self)
getSystemStat returns stat data for system indicators: RAMV, RAMR and CPU
def updateStatField(self, field_name, value, operation=STAT_FIELDS_OPERATION_ADD)
update values of stat field - default sum
def onAdminState(self, event)
onAdminState event handler process admin SHUTDOWN command
GeneralResponse event object, represents general state response for multipurpose usage.
def createLogMsg(self, event)
from string message from event object
def getStatDataFields(self, fields)
getStatDataFields returns stat data from storage
def setEventHandler(self, eventType, handler)
set event handler rewrite the current handler for eventType
def addConnection(self, name, connection)
This is app base class for management server connection end-points and parallel transport messages pr...
def loadStatVarsDump(self)
Load stat vars in json file.
def setConfigVars(self, setConfigVars)
processSetConfigVars sets config vars in storage
def poll(self)
poll function polling connections receive as multipart msg, the second argument is pickled pyobj ...
def on_poll_timeout(self)
function will call every time when ConnectionTimeout exception arrive
def __init__(self, poller_manager=None, admin_connection=None, conectionLightBuilder=None, exceptionForward=False, dumpStatVars=True)
constructor
AdminState event object, for admin manage change application state commands, like shutdown...
Class hides routines of bulding connection objects.
def send(self, connect_name, event)
send event
def processSpecialConfigVars(self, name, value)
send ready event to notify adminInterfaceService
def onAdminFetchStatData(self, event)
onAdminState event handler process admin command
def getLogLevel(self)
Get log level from first of existing loggers.
def onAdminGetConfigVars(self, event)
onAdminGetConfigVars event handler process getConfigVars admin command, fill and return config vars a...
def sendAdminReadyEvent(self)
send ready event to notify adminInterfaceService
def onAdminSetConfigVars(self, event)
onAdminSetConfigVars event handler process setConfigVars admin command
def on_unhandled_event(self, event)
function will call every time when arrive doesn't set handler for event type of event.evenType
def onAdminSuspend(self, event)
onAdminState event handler process admin command