2 Created on Feb 25, 2014 15 from datetime
import datetime
32 logger = logging.getLogger(APP_CONSTS.LOGGER_NAME)
39 ADMIN_CONNECT_ENDPOINT =
"Admin" 40 ADMIN_CONNECT_CLIENT =
"Admin" 41 POLL_TIMEOUT_DEFAULT = 3000
43 STAT_FIELDS_OPERATION_ADD = 0
44 STAT_FIELDS_OPERATION_SUB = 1
45 STAT_FIELDS_OPERATION_SET = 2
46 STAT_FIELDS_OPERATION_INIT = 3
48 POLL_TIMEOUT_CONFIG_VAR_NAME =
"POLL_TIMEOUT" 49 LOG_LEVEL_CONFIG_VAR_NAME =
"LOG_LEVEL" 51 STAT_DUMPS_DEFAULT_DIR =
"/tmp/" 52 STAT_DUMPS_DEFAULT_NAME =
"%APP_NAME%_%CLASS_NAME%_stat_vars.dump" 54 LOGGERS_NAMES = {APP_CONSTS.LOGGER_NAME,
"dc",
"dtm",
"root",
""}
58 def __init__(self, poller_manager=None, admin_connection=None, conectionLightBuilder=None, exceptionForward=False, \
60 threading.Thread.__init__(self)
62 logger.info(
"Thread-based class instance constructor begin!")
66 if poller_manager
is None:
90 conectLightBuilder = conectionLightBuilder
91 admin_connect = admin_connection
92 if conectLightBuilder
is None:
94 if admin_connect
is None:
111 logger.info(
"Thread-based class instance constructor end!")
134 def send(self, connect_name, event):
136 logger.debug(
"Send to " + str(connect_name) +
"\n" + self.
createLogMsg(event))
140 self.
updateStatField(connect_name +
"_send_bytes", sys.getsizeof(event))
142 logger.error(
"Unregistered connection [" + str(connect_name) +
"] network transport event!")
145 except EnvironmentError
as e:
147 except Exception, err:
148 logger.error(
"Error `%s`", str(err))
157 def reply(self, event, reply_event):
158 reply_event.uid = event.uid
159 reply_event.connect_identity = event.connect_identity
160 reply_event.cookie = event.cookie
161 self.
send(event.connect_name, reply_event)
168 connect_names = dict()
179 except ConnectionTimeout:
181 except TransportInternalErr
as err:
182 logger.error(
"ZMQ transport error: " + str(err.message))
185 except Exception
as err:
186 ExceptionLog.handler(logger, err,
"Polling error:")
193 except EnvironmentError
as e:
195 except Exception
as err:
197 ExceptionLog.handler(logger, err,
"Call of on_poll_timeout() error:")
202 for name
in connect_names:
206 if isinstance(event, Response):
208 event = self.
eventBuilder.build(EVENT_TYPES.SERVER_TCP_RAW, event)
209 event.connect_name = name
213 logger.error(
"Unregistered connection [" + str(name) +
"] network transport event!")
216 except EnvironmentError
as e:
218 except Exception
as err:
219 ExceptionLog.handler(logger, err,
"Event processing error:")
234 self.
updateStatField(event.connect_name +
"_recv_bytes", sys.getsizeof(event))
241 except EnvironmentError
as e:
255 logger.error(
"Unhandled exception in thread-based class : " + str(e.message) +
"\n" + \
256 Utils.getTracebackInfo())
258 logger.error(
"Exception forwarded.")
289 logStr =
"Got UNHANDLED EVENT\n" + self.
createLogMsg(event)
309 adminState = event.eventObj
310 className = self.__class__.__name__
311 response =
AdminState(className, AdminState.STATE_SHUTDOWN)
313 if adminState.command == AdminState.STATE_SHUTDOWN
and adminState.className == className:
314 logger.info(
"Has successfully shutdown!")
317 logger.error(
"Got unsupported admin command [" + str(adminState.command) +
"] for " + str(adminState.className))
318 response =
AdminState(className, AdminState.STATE_ERROR)
322 responseEvent = self.
eventBuilder.build(EVENT_TYPES.ADMIN_STATE_RESPONSE, response)
323 self.
reply(event, responseEvent)
332 adminStatData = event.eventObj
333 if adminStatData.className == self.__class__.__name__:
336 err_msg =
"Got wrong admin class name [" + adminStatData.className +
"]" 338 logger.error(err_msg + str(adminStatData.className))
342 responseEvent = self.
eventBuilder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA_RESPONSE, adminStatData)
343 self.
reply(event, responseEvent)
352 responseObj =
GeneralResponse(GeneralResponse.ERROR_OK, (
">>> Suspend Processed " + str(self.__class__.__name__)))
355 logger.debug(
">>> SUSPEND BASE processed class=" + str(self.__class__.__name__))
359 responseEvent = self.
eventBuilder.build(EVENT_TYPES.ADMIN_SUSPEND_RESPONSE, responseObj)
360 self.
reply(event, responseEvent)
371 for field_name
in fields:
373 fields[field_name] = self.
statFields[field_name]
374 if isinstance(fields, dict):
384 fields = {
'RAMV':0,
'RAMR':0,
'CPUU':0,
'CPUS':0,
'THREADS':0}
386 py = psutil.Process(os.getpid())
388 fields[
'RAMV'] = m.vms
389 fields[
'RAMR'] = m.rss
391 fields[
'CPUU'] = c.user
392 fields[
'CPUS'] = c.system
393 fields[
'THREADS'] = py.num_threads()
394 except Exception
as e:
405 if len(fields) == 0
or "*" in fields:
408 for field_name
in fields:
410 fields[field_name] = self.
configVars[field_name]
421 getConfigVars = event.eventObj
422 if getConfigVars.className == self.__class__.__name__:
423 if len(getConfigVars.fields) > 0
and "*" not in getConfigVars.fields:
424 for fieldName
in getConfigVars.fields:
426 getConfigVars.fields[fieldName] = self.
configVars[fieldName]
428 getConfigVars.fields[fieldName] =
None 433 logger.error(
"Wrong admin class name [" + str(getConfigVars.className) +
"]")
437 responseEvent = self.
eventBuilder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA_RESPONSE, getConfigVars)
438 self.
reply(event, responseEvent)
447 setConfigVars = event.eventObj
448 responseEvent = self.
eventBuilder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA_RESPONSE,
450 self.
reply(event, responseEvent)
459 if setConfigVars.className == self.__class__.__name__:
460 for fieldName
in setConfigVars.fields:
463 self.
configVars[fieldName] = setConfigVars.fields[fieldName]
466 setConfigVars.fields[fieldName] =
None 468 self.
configVars[fieldName] = setConfigVars.fields[fieldName]
471 logger.error(
"Wrong admin class name [" + str(setConfigVars.className) +
"]")
482 className = self.__class__.__name__
483 ready =
AdminState(className, AdminState.STATE_READY)
484 readyEvent = self.
eventBuilder.build(EVENT_TYPES.ADMIN_STATE_RESPONSE, ready)
494 logMsg =
"event:\n" + str(vars(event)) +
"\neventObj:\n" 495 if event.eventObj
and hasattr(event.eventObj,
"__dict__"):
496 logMsg = logMsg + str(vars(event.eventObj))
498 logMsg = logMsg + str(event.eventObj)
507 if connect_name +
"_send_cnt" not in self.
statFields:
508 self.
statFields[connect_name +
"_send_cnt"] = 0
509 if connect_name +
"_recv_cnt" not in self.
statFields:
510 self.
statFields[connect_name +
"_recv_cnt"] = 0
511 if connect_name +
"_send_bytes" not in self.
statFields:
512 self.
statFields[connect_name +
"_send_bytes"] = 0
513 if connect_name +
"_recv_bytes" not in self.
statFields:
514 self.
statFields[connect_name +
"_recv_bytes"] = 0
535 logger.error(
"Stat update: key is not valid " + field_name)
552 except Exception
as err:
554 ExceptionLog.handler(logger, err,
"Exception:")
566 for name
in logging.Logger.manager.loggerDict.keys():
567 if isinstance(logging.Logger.manager.loggerDict[name], logging.Logger)
and name
in self.
LOGGERS_NAMES:
568 level = logging.Logger.manager.loggerDict[name].getEffectiveLevel()
582 for name
in logging.Logger.manager.loggerDict.keys():
583 if isinstance(logging.Logger.manager.loggerDict[name], logging.Logger)
and name
in self.
LOGGERS_NAMES:
584 logging.Logger.manager.loggerDict[name].setLevel(level)
585 ll = logging.getLogger(
"")
596 with open(name,
"w")
as f:
597 f.write(json.dumps(self.
statFields, indent=4))
598 except Exception
as err:
600 ExceptionLog.handler(logger, err,
"Error save stat vars to file `" + name +
"`: ")
612 if os.path.exists(name):
613 with open(name,
'r') as f: 620 except Exception
as err:
622 ExceptionLog.handler(logger, err,
"Error load stat vars from file `" + name +
"`: ")
623 logger.error(str(data))
632 appName = os.path.splitext(os.path.basename(sys.argv[0]))[0]
634 replace(
"%CLASS_NAME%", self.__class__.__name__)
641 return dict(configParser.items(DTM_CONSTS.DB_CONFIG_SECTION))
def getConfigVarsFields(self, fields)
getConfigVarsFields returns config vars from storage
def getStatVarsDumpFileName(self)
Get stat vars file name.
def reply(self, event, reply_event)
wrapper for sending event in reply for event
def createDBIDict(self, configParser)
def setLogLevel(self, level)
Set log level for all loggers.
def saveStatVarsDump(self)
Save stat vars in json file.
def is_connection_registered(self, name)
check is a connection was registered in a instance of BaseServerManager i object
def process(self, event)
process event call the event handler method that was set by user or on_unhandled_event method if not ...
def initStatFields(self, connect_name)
add record in statFields
def getSystemStat(self)
getSystemStat returns stat data for system indicators: RAMV, RAMR and CPU
def updateStatField(self, field_name, value, operation=STAT_FIELDS_OPERATION_ADD)
update values of stat field - default sum
def onAdminState(self, event)
onAdminState event handler process admin SHUTDOWN command
GeneralResponse event object, represents general state response for multipurpose usage.
def createLogMsg(self, event)
from string message from event object
string POLL_TIMEOUT_CONFIG_VAR_NAME
def getStatDataFields(self, fields)
getStatDataFields returns stat data from storage
def setEventHandler(self, eventType, handler)
set event handler rewrite the current handler for eventType
def addConnection(self, name, connection)
This is app base class for management server connection end-points and parallel transport messages pr...
def loadStatVarsDump(self)
Load stat vars in json file.
int STAT_FIELDS_OPERATION_SET
def setConfigVars(self, setConfigVars)
processSetConfigVars sets config vars in storage
def poll(self)
poll function polling connections receive as multipart msg, the second argument is pickled pyobj ...
def on_poll_timeout(self)
function will call every time when ConnectionTimeout exception arrive
def __init__(self, poller_manager=None, admin_connection=None, conectionLightBuilder=None, exceptionForward=False, dumpStatVars=True)
constructor
int STAT_FIELDS_OPERATION_ADD
AdminState event object, for admin manage change application state commands, like shutdown...
string ADMIN_CONNECT_CLIENT
Class hides routines of bulding connection objects.
statFields
stat fields container
def send(self, connect_name, event)
send event
def build_poller_list(self)
string ADMIN_CONNECT_ENDPOINT
def processSpecialConfigVars(self, name, value)
send ready event to notify adminInterfaceService
def onAdminFetchStatData(self, event)
onAdminState event handler process admin command
def getLogLevel(self)
Get log level from first of existing loggers.
def onAdminGetConfigVars(self, event)
onAdminGetConfigVars event handler process getConfigVars admin command, fill and return config vars a...
def sendAdminReadyEvent(self)
send ready event to notify adminInterfaceService
int STAT_FIELDS_OPERATION_SUB
string LOG_LEVEL_CONFIG_VAR_NAME
def onAdminSetConfigVars(self, event)
onAdminSetConfigVars event handler process setConfigVars admin command
def on_unhandled_event(self, event)
function will call every time when arrive doesn't set handler for event type of event.evenType
int STAT_FIELDS_OPERATION_INIT
string STAT_DUMPS_DEFAULT_NAME
def onAdminSuspend(self, event)
onAdminState event handler process admin command
string STAT_DUMPS_DEFAULT_DIR