HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
BatchTasksManagerRealTime.py
Go to the documentation of this file.
1 '''
2 HCE project, Python bindings, Distributed Crawler application.
3 BatchTasksManagerRealTime object and related classes definitions.
4 
5 @package: dc
6 @author bgv bgv.hce@gmail.com
7 @link: http://hierarchical-cluster-engine.com/
8 @copyright: Copyright © 2014 IOIX Ukraine
9 @license: http://hierarchical-cluster-engine.com/license/
10 @since: 0.1
11 '''
12 
13 
14 import ctypes
15 import zlib
16 import logging
17 import time
18 import threading
19 
20 try:
21  import cPickle as pickle
22 except ImportError:
23  import pickle
24 
25 from app.BaseServerManager import BaseServerManager
26 # from app.Utils import varDump
27 import app.Utils as Utils # pylint: disable=F0401
28 from dc import EventObjects
29 import dc.Constants as DC_CONSTS
30 from drce.CommandConvertor import CommandConvertor
31 from drce.Commands import Session
32 from drce.Commands import TaskExecuteRequest
33 from drce.Commands import TaskExecuteStruct
34 from drce.DRCEManager import ConnectionTimeout, TransportInternalErr, CommandExecutorErr
35 from drce.DRCEManager import DRCEManager
36 from drce.DRCEManager import HostParams
37 import transport.Consts as TRANSPORT_CONSTS
38 from transport.ConnectionBuilderLight import ConnectionBuilderLight
39 from transport.IDGenerator import IDGenerator
40 from transport.UIDGenerator import UIDGenerator
41 
42 # Logger initialization
43 logger = Utils.MPLogger().getLogger()
44 lock = threading.Lock()
45 
46  # #The BatchTasksManagerRealTime class implements the crawling logic of the DC application for on-demand requests.
47 #
48 # This object is a crawling batches algorithm of DC service for real time on demand crawling requests.
49 # It uses the DRCE protocol API to send asynchronous tasks directly to the hce-node cluster router
50 #
52 
53  DRCE_REDUCER_TTL = 3000000
54  REQUEST_ERROR_OBJECT_TYPE = 1
55  REQUEST_ERROR_URLS_COUNT = 2
56  REQUEST_ERROR_THREADS_NUMBER_EXCEEDED = 3
57  CONFIG_DRCE_REQUEST_ROUTING_DEFAULT = 1
58  CONFIG_BATCH_MAX_ITERATIONS_DEFAULT = 2
59 
60  # Configuration settings options names
61  CONFIG_SERVER = "server"
62 
63  CONFIG_DRCE_STARTER_NAME = "DRCEStarterName"
64  CONFIG_DRCE_HOST = "DRCEHost"
65  CONFIG_DRCE_PORT = "DRCEPort"
66  CONFIG_DRCE_TIMEOUT = "DRCETimeout"
67  CONFIG_DRCE_CRAWLER_APP_NAME = "DRCECrawlerAppName"
68  CONFIG_BATCH_MAX_TIME = "BatchMaxExecutionTime"
69  CONFIG_BATCH_MAX_URLS = "BatchMaxURLs"
70  CONFIG_MAX_THREADS = "MaxThreads"
71  CONFIG_POLLING_TIMEOUT = "PollingTimeout"
72  CONFIG_DRCE_REQUEST_ROUTING = "DRCERequestRouting"
73  CONFIG_BATCH_MAX_ITERATIONS = "BatchMaxIterations"
74 
75  REAL_TIME_CRAWL_THREAD_NAME_PREFIX = 'RtCrawl_'
76 
77  # #constructor
78  # initialize fields
79  #
80  # @param configParser config parser object
81  # @param connectBuilderLight connection builder light
82  #
83  def __init__(self, configParser, connectionBuilderLight=None):
84  super(BatchTasksManagerRealTime, self).__init__()
85 
86  # Instantiate the connection builder light if not set
87  if connectionBuilderLight is None:
88  connectionBuilderLight = ConnectionBuilderLight()
89 
90  # Get configuration settings
91  className = self.__class__.__name__
92  self.serverName = configParser.get(className, self.CONFIG_SERVER)
93 
94  # Create connections and raise bind or connect actions for correspondent connection type
95  serverConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, self.serverName)
96  self.setEventHandler(DC_CONSTS.EVENT_TYPES.BATCH, self.onEventsHandler)
97 
98  # Initialize DRCE API
99  self.configVars[self.CONFIG_DRCE_TIMEOUT] = configParser.getint(className, self.CONFIG_DRCE_TIMEOUT)
100  self.drceHost = configParser.get(className, self.CONFIG_DRCE_HOST)
101  self.drcePort = configParser.get(className, self.CONFIG_DRCE_PORT)
102  # self.drceManager = DRCEManager()
103  # self.drceManager.activate_host(HostParams(self.drceHost, self.drcePort))
106 
107  # Add connections to the polling set
108  self.addConnection(self.serverName, serverConnection)
109 
110  # Max URLs per batch
111  self.configVars[self.CONFIG_BATCH_MAX_URLS] = configParser.getint(className, self.CONFIG_BATCH_MAX_URLS)
112 
113  # Set crawler task app name
114  self.configVars[self.CONFIG_DRCE_CRAWLER_APP_NAME] = configParser.get(className, self.CONFIG_DRCE_CRAWLER_APP_NAME)
115  self.configVars[self.CONFIG_BATCH_MAX_TIME] = configParser.getint(className, self.CONFIG_BATCH_MAX_TIME)
116  self.configVars[self.CONFIG_DRCE_STARTER_NAME] = configParser.get(className, self.CONFIG_DRCE_STARTER_NAME)
117  self.configVars[self.CONFIG_MAX_THREADS] = configParser.getint(className, self.CONFIG_MAX_THREADS)
118  if configParser.has_option(className, self.CONFIG_DRCE_REQUEST_ROUTING):
119  self.configVars[self.CONFIG_DRCE_REQUEST_ROUTING] = configParser.getint(className,
121  else:
123  if configParser.has_option(className, self.CONFIG_BATCH_MAX_ITERATIONS):
124  self.configVars[self.CONFIG_BATCH_MAX_ITERATIONS] = configParser.getint(className,
126  else:
128 
129  # Batches counter init in stat vars
130  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_TOTAL_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
131  # Batches that fault processing counter init in stat vars
132  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
133  # Batches urls total counter init in stat vars
134  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
135  # Fault batches urls total counter init in stat vars
136  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
137  # Avg processing time init in stat vars
138  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_TIME_AVG_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
139  # Crawling batches real-time threads number init in stat vars
140  self.updateStatField(DC_CONSTS.BATCHES_REALTIME_THREADS_NAME, 0, self.STAT_FIELDS_OPERATION_SET)
141  # Crawling batches real-time threads created total number init in stat vars
142  self.updateStatField(DC_CONSTS.BATCHES_REALTIME_THREADS_CREATED_COUNTER_NAME, 0, self.STAT_FIELDS_OPERATION_SET)
143 
144  # Total time of processing sum from all request
145  self.totalTime = 0
146 
147  # Set connections poll timeout, defines period of HCE cluster monitoring cycle, msec
148  self.configVars[self.POLL_TIMEOUT_CONFIG_VAR_NAME] = configParser.getint(className, self.CONFIG_POLLING_TIMEOUT)
149 
150 
151  # #Events wait timeout handler, for timeout state of the connections polling.
152  # Executes update of workers threads stat counter
153  #
def on_poll_timeout(self):
    """Poll-timeout hook: resynchronise the real-time worker threads gauge.

    Counts the live threads whose name starts with
    REAL_TIME_CRAWL_THREAD_NAME_PREFIX (the calling thread excluded) and,
    when the BATCHES_REALTIME_THREADS_NAME stat field is greater than that
    count, lowers the field to it. The field is never raised here.
    """
    workerPrefix = self.REAL_TIME_CRAWL_THREAD_NAME_PREFIX
    # The context manager releases the lock even if the stat update raises;
    # a bare acquire()/release() pair would leave the lock held forever.
    with lock:
        caller = threading.current_thread()
        liveWorkers = sum(1 for worker in threading.enumerate()
                          if worker is not caller and worker.name.startswith(workerPrefix))
        if self.statFields[DC_CONSTS.BATCHES_REALTIME_THREADS_NAME] > liveWorkers:
            self.updateStatField(DC_CONSTS.BATCHES_REALTIME_THREADS_NAME, liveWorkers,
                                 self.STAT_FIELDS_OPERATION_SET)
172 
173 
174 
175  # #onEventsHandler event handler
176  #
177  # @param event instance of Event object
def onEventsHandler(self, event):
    """Handle a BATCH event: run the batch or answer the client with an error.

    The total batches counter is incremented for every event. A valid Batch
    is processed in the calling thread when MaxThreads is 0, or in a new
    thread named REAL_TIME_CRAWL_THREAD_NAME_PREFIX + <created counter> when
    MaxThreads is positive and above the current workers count; in both
    cases forkBatch() replies to the client itself. In every other case a
    ClientResponse carrying an error code and message is sent from here.

    @param event instance of Event object; event.eventObj is expected to be
                 an EventObjects.Batch
    """
    self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_TOTAL_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)

    batch = event.eventObj
    maxThreads = self.configVars[self.CONFIG_MAX_THREADS]

    if not isinstance(batch, EventObjects.Batch):
        errorCode = self.REQUEST_ERROR_OBJECT_TYPE
        # str() is required: concatenating the type object itself raises TypeError.
        errorMessage = "Wrong requested object type " + str(type(batch)) + ", Batch expected."
        logger.error(errorMessage)
    elif len(batch.items) > int(self.configVars[self.CONFIG_BATCH_MAX_URLS]):
        errorCode = self.REQUEST_ERROR_URLS_COUNT
        # This branch used to reuse the "wrong object type" text; report the real reason.
        errorMessage = "Too many URLs in batch: " + str(len(batch.items)) + ", maximum allowed is " + \
                       str(self.configVars[self.CONFIG_BATCH_MAX_URLS]) + "."
        logger.error(errorMessage)
    elif maxThreads == 0:
        logger.info("Single thread processing started")
        # Process batch in the calling thread; forkBatch() sends the reply.
        self.forkBatch(logging, event)
        return
    elif maxThreads > 0:
        # Read the gauge once, under the lock the worker threads use to update it.
        with lock:
            activeWorkers = self.statFields[DC_CONSTS.BATCHES_REALTIME_THREADS_NAME]
        if maxThreads > activeWorkers:
            logger.info("Forking new thread")
            # NOTE(review): the operation argument was cut from this listing (line 199);
            # ADD is updateStatField()'s default operation - confirm against the original.
            self.updateStatField(DC_CONSTS.BATCHES_REALTIME_THREADS_CREATED_COUNTER_NAME, 1,
                                 self.STAT_FIELDS_OPERATION_ADD)
            workerName = self.REAL_TIME_CRAWL_THREAD_NAME_PREFIX + \
                         str(self.statFields[DC_CONSTS.BATCHES_REALTIME_THREADS_CREATED_COUNTER_NAME])
            worker = threading.Thread(target=self.forkBatch, name=workerName, args=(logging, event,))
            worker.start()
            logger.info("New thread forked")
            return
        # Return overload error
        errorCode = self.REQUEST_ERROR_THREADS_NUMBER_EXCEEDED
        errorMessage = "Service overloaded, " + str(activeWorkers) + " workers."
        logger.error(errorMessage)
    else:
        # Negative MaxThreads: return fake (stub) error
        errorCode = self.REQUEST_ERROR_OBJECT_TYPE
        errorMessage = "STUB fake error response"
        logger.info(errorMessage)

    # Send response with error to client
    clientResponseObj = EventObjects.ClientResponse()
    clientResponseObj.errorCode = errorCode
    clientResponseObj.errorMessage = errorMessage
    self.sendClientResponse(event, clientResponseObj)
232 
233  # self.updateStatField(DC_CONSTS.BATCHES_REALTIME_THREADS_NAME, threading.active_count(),
234  # self.STAT_FIELDS_OPERATION_SET)
235 
236 
237 
238  # #forkBatch processes a batch request in the calling thread and returns the response to the client application
239  #
240  # @param loggingObj logging object used to obtain the logger (getLogger() is called on it)
241  # @param event client request event
242  # @return nothing; the result or the error is delivered to the client with sendClientResponse()
243  # def forkBatch(self, event, loggerObj):
def forkBatch(self, loggingObj, event):
    """Process one batch request and reply to the client.

    Rebinds the module-level logger from loggingObj, marks the batch as a
    real-time one (crawlerType, dbMode), caps maxIterations by the
    configured limit, executes the DRCE request, updates the average time
    stat field and sends the response to the client. Any exception is
    reported to the client as a ClientResponse with
    REQUEST_ERROR_OBJECT_TYPE. The worker threads gauge is incremented on
    entry and decremented on exit.

    Every locked section uses "with lock": the lock is not re-entrant, so
    an exception raised while it was held made the error handler block
    forever on its own lock.acquire().

    @param loggingObj object providing getLogger(), e.g. the logging module
    @param event client request event; event.eventObj is the Batch
    """
    global logger  # pylint: disable=W0603

    def replyWithError(message):
        # Log the failure and deliver it to the client as an error response.
        with lock:
            logger.error(message)
            errorResponse = EventObjects.ClientResponse()
            errorResponse.errorCode = self.REQUEST_ERROR_OBJECT_TYPE
            errorResponse.errorMessage = message
            self.sendClientResponse(event, errorResponse)

    # Set once the gauge is incremented, so it is never decremented without it.
    workerCounted = False
    try:
        with lock:
            logger = loggingObj.getLogger(DC_CONSTS.LOGGER_NAME)
            self.updateStatField(DC_CONSTS.BATCHES_REALTIME_THREADS_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
            workerCounted = True
        logger.info("THREAD_STARTED")
        logger.debug("event:\n%s", Utils.varDump(event))
        # Set start time
        startTime = time.time()
        # Set crawlerType to prevent wrong processing
        event.eventObj.crawlerType = EventObjects.Batch.TYPE_REAL_TIME_CRAWLER
        event.eventObj.dbMode = EventObjects.Batch.DB_MODE_R
        maxIterations = self.configVars[self.CONFIG_BATCH_MAX_ITERATIONS]
        if event.eventObj.maxIterations > maxIterations:
            event.eventObj.maxIterations = maxIterations
        # Prepare request
        with lock:
            taskExecuteRequest = self.prepareDRCERequest(event.eventObj)
        # Process send request (takes the lock itself where needed)
        clientResponseObj = self.processDRCERequest(taskExecuteRequest)
        logger.debug("ClientResponseObj object:\n" + Utils.varDump(clientResponseObj))
        with lock:
            self.totalTime = self.totalTime + (time.time() - startTime)
            # NOTE(review): the operation argument was cut from this listing (line 271);
            # SET is assumed because the value is a formatted average - confirm against the original.
            self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_TIME_AVG_NAME,
                                 str(self.totalTime / float(1 + self.statFields[DC_CONSTS.BATCHES_CRAWL_COUNTER_TOTAL_NAME])),
                                 self.STAT_FIELDS_OPERATION_SET)
            # Send response to client
            self.sendClientResponse(event, clientResponseObj)
            logger.info("THREAD_FINISHED")
            logger.debug("clientResponseObj:\n%s", Utils.varDump(clientResponseObj))
    except Exception as err:
        replyWithError("Thread exception:" + str(err))
    except:  # pylint: disable=W0702
        # Kept from the original: anything that is not an Exception subclass is
        # also reported to the client instead of silently killing the worker.
        replyWithError("Unknown thread exception!")
    finally:
        # Decrement of counter of threads, also when the error reply itself failed
        if workerCounted:
            with lock:
                self.updateStatField(DC_CONSTS.BATCHES_REALTIME_THREADS_NAME, 1, self.STAT_FIELDS_OPERATION_SUB)
299 
300 
301 
302  # #sendClientResponse sends response to client by request event
303  #
304  # @param clientRequestEvent client request event to reply to
305  # @param clientResponseObj response object to send to the client; the method returns nothing
def sendClientResponse(self, clientRequestEvent, clientResponseObj):
    """Wrap clientResponseObj in a BATCH_RESPONSE event and reply with it.

    @param clientRequestEvent the client request event being answered
    @param clientResponseObj the object carried by the reply event
    """
    # Build the reply event and send it back in one step.
    self.reply(
        clientRequestEvent,
        self.eventBuilder.build(DC_CONSTS.EVENT_TYPES.BATCH_RESPONSE, clientResponseObj)
    )
    logger.info("Response to client sent")
312 
313 
314 
315  # #Prepare DRCE request
316  #
317  # @param eventObj the Batch object of the request; it is pickled into the task input
318  # @note eventObj.id is set to the generated task Id when it is 0
319  # @return the TaskExecuteRequest object instance
320  def prepareDRCERequest(self, eventObj):
 # Builds the DRCE TaskExecuteRequest for one batch: generates a task Id, fills a
 # TaskExecuteStruct (command, pickled eventObj as input, synchronous Session, shell),
 # selects the route and returns the request. Side effect: eventObj.id is set to the
 # generated task Id when it is 0.
 # NOTE(review): indentation is lost in this listing, so the nesting of the conditional
 # statements below cannot be verified here - confirm against the original file.
321  # Create DRCE task Id
322  idGenerator = IDGenerator()
 # Task Id is the CRC32 of a connection uid, seeded with the current time and
 # converted to an unsigned 32-bit value.
323  taskId = ctypes.c_uint32(zlib.crc32(idGenerator.get_connection_uid(), int(time.time()))).value
324 
325  # Prepare DRCE request object
326  taskExecuteStruct = TaskExecuteStruct()
327  taskExecuteStruct.command = self.configVars[self.CONFIG_DRCE_CRAWLER_APP_NAME]
328  if eventObj.id == 0:
329  eventObj.id = taskId
 # Max execution time: the configured BatchMaxExecutionTime when the batch does not set one.
330  if eventObj.maxExecutionTime == 0:
331  mt = self.configVars[self.CONFIG_BATCH_MAX_TIME]
332  else:
333  mt = eventObj.maxExecutionTime
 # Clamp to DRCETimeout / 1000 (presumably msec to sec - TODO confirm units).
 # NOTE(review): cannot tell from this listing whether the clamp and the debug message
 # belong to the "else" branch above or apply to both branches.
334  if int(mt) > int(self.configVars[self.CONFIG_DRCE_TIMEOUT] / 1000):
335  mt = int(self.configVars[self.CONFIG_DRCE_TIMEOUT]) / 1000
336  logger.debug("Custom max DRCE task execution set: %s", str(mt))
337  taskExecuteStruct.input = pickle.dumps(eventObj)
 # Third Session argument is mt scaled back by 1000.
338  taskExecuteStruct.session = Session(Session.TMODE_SYNC, 0, int(mt) * 1000)
339  taskExecuteStruct.session.shell = self.configVars[self.CONFIG_DRCE_STARTER_NAME]
340  logger.debug("DRCE taskExecuteStruct:\n" + Utils.varDump(taskExecuteStruct))
341 
342  # Create DRCE TaskExecuteRequest object
343  taskExecuteRequest = TaskExecuteRequest(taskId)
344  # Set taskExecuteRequest fields
345  taskExecuteRequest.data = taskExecuteStruct
346  # Set route from the DRCERequestRouting setting: 5 - resource usage, 1 - round robin, 0 - multicast, 4 - random
 # NOTE(review): presumably all four tests are nested under the "fewer than 2 items" check - confirm.
 # For any other setting value the route attribute is not assigned here.
347  if len(eventObj.items) < 2:
348  if self.configVars[self.CONFIG_DRCE_REQUEST_ROUTING] == 5:
349  taskExecuteRequest.route = DC_CONSTS.DRCE_REQUEST_ROUTING_RESOURCE_USAGE
350  if self.configVars[self.CONFIG_DRCE_REQUEST_ROUTING] == 1:
351  taskExecuteRequest.route = DC_CONSTS.DRCE_REQUEST_ROUTING_ROUND_ROBIN
352  if self.configVars[self.CONFIG_DRCE_REQUEST_ROUTING] == 0:
353  taskExecuteRequest.route = DC_CONSTS.DRCE_REQUEST_ROUTING_MULTICAST
354  if self.configVars[self.CONFIG_DRCE_REQUEST_ROUTING] == 4:
355  taskExecuteRequest.route = DC_CONSTS.DRCE_REQUEST_ROUTING_RND
356 
357  logger.debug("DRCE taskExecuteRequest:\n" + Utils.varDump(taskExecuteRequest))
358 
359  return taskExecuteRequest
360 
361 
362 
363  # #Request action processor for DRCE DB cluster
364  #
365  # @param taskExecuteRequest object
def processDRCERequest(self, taskExecuteRequest):
    """Send the task to the DRCE router and convert the answer to a ClientResponse.

    A None response or an empty items list yields a ClientResponse with an
    error code and message and increments the faulted batches counter.
    Otherwise one ClientResponseItem is appended per response item: items
    with a positive error_code or exit_status, or whose stdout cannot be
    unpickled, carry an error and increment the faulted URLs counter;
    unpickled lists add their length to the URLs counter.

    Stat fields are updated under "with lock" so the lock is released even
    when an update raises.

    @param taskExecuteRequest the TaskExecuteRequest object to execute
    @return EventObjects.ClientResponse instance
    """
    logger.info("Sending sync task id:" + str(taskExecuteRequest.id) + " to DRCE router!")
    # Send request to DRCE Cluster router
    response = self.sendToDRCERouter(taskExecuteRequest)
    logger.info("Received response on sync task from DRCE router!")
    logger.debug("Response: %s", Utils.varDump(response))

    # Create new client response object
    clientResponse = EventObjects.ClientResponse()

    if response is None:
        clientResponse.errorCode = EventObjects.ClientResponse.STATUS_ERROR_NONE
        clientResponse.errorMessage = "Response error, None returned from DRCE, possible timeout " + \
                                      str(self.configVars[self.CONFIG_DRCE_TIMEOUT]) + " msec!"
        logger.error(clientResponse.errorMessage)
        with lock:
            self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_FAULT_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
        return clientResponse

    if not response.items:
        clientResponse.errorCode = EventObjects.ClientResponse.STATUS_ERROR_EMPTY_LIST
        clientResponse.errorMessage = "Response error, empty list returned from DRCE, possible no one node in cluster!"
        logger.error(clientResponse.errorMessage)
        with lock:
            self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_FAULT_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
        return clientResponse

    for item in response.items:
        # New ClientResponseItem object
        clientResponseItem = EventObjects.ClientResponseItem(None)
        # If some error in response item or cli application exit status
        if item.error_code > 0 or item.exit_status > 0:
            clientResponseItem.errorCode = clientResponseItem.STATUS_ERROR_DRCE
            clientResponseItem.errorMessage = "error_message=" + item.error_message + \
                                              ", error_code=" + str(item.error_code) + \
                                              ", exit_status=" + str(item.exit_status) + \
                                              ", stderror=" + str(item.stderror)
            logger.error(clientResponseItem.errorMessage)
            with lock:
                self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_FAULT_NAME, 1,
                                     self.STAT_FIELDS_OPERATION_ADD)
        else:
            # Try to restore serialized response object from dump
            try:
                # NOTE(security): unpickling executes whatever the payload encodes; this is
                # only safe while every node of the cluster is trusted.
                clientResponseItem.itemObject = pickle.loads(item.stdout)
                if isinstance(clientResponseItem.itemObject, list):
                    urlContents = len(clientResponseItem.itemObject)
                    with lock:
                        # NOTE(review): the operation argument was cut from this listing (line 417);
                        # ADD is updateStatField()'s default operation - confirm against the original.
                        self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_NAME, urlContents,
                                             self.STAT_FIELDS_OPERATION_ADD)
            except Exception as e:
                clientResponseItem.errorCode = EventObjects.ClientResponseItem.STATUS_ERROR_RESTORE_OBJECT
                # Exception.message is deprecated and missing on Python 3; fall back to the exception itself.
                clientResponseItem.errorMessage = EventObjects.ClientResponseItem.MSG_ERROR_RESTORE_OBJECT + "\n" + \
                                                  str(getattr(e, "message", e)) + "\nstdout=" + str(item.stdout) + \
                                                  ", stderror=" + str(item.stderror)
                logger.error(clientResponseItem.errorMessage)
                with lock:
                    self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_FAULT_NAME, 1,
                                         self.STAT_FIELDS_OPERATION_ADD)
        # Set all information fields in any case
        clientResponseItem.id = item.id
        clientResponseItem.host = item.host
        clientResponseItem.port = item.port
        clientResponseItem.node = item.node
        clientResponseItem.time = item.time
        # Add ClientResponseItem object
        clientResponse.itemsList.append(clientResponseItem)

    return clientResponse
438 
439 
440 
441  # #Send a request to the DRCE router and wait for the response
442  #
443  # @param request the request object to send
444  # @return EEResponseData object instance, or None on connection timeout, transport or command executor error
def sendToDRCERouter(self, request):
    """Execute one request on the DRCE router through a fresh DRCEManager.

    A new DRCEManager is created and activated for (drceHost, drcePort) on
    every call, the request is processed with the configured DRCETimeout
    and DRCE_REDUCER_TTL, and the host is cleared afterwards. Clearing is
    done in a "finally" clause so the manager is released even when
    process() raises an error that is not handled here.

    @param request the request object passed to DRCEManager.process()
    @return the response returned by DRCEManager.process(), or None when
            ConnectionTimeout, TransportInternalErr or CommandExecutorErr
            was raised
    """
    timeout = self.configVars[self.CONFIG_DRCE_TIMEOUT]

    with lock:
        drceManager = DRCEManager()
        drceManager.activate_host(HostParams(self.drceHost, self.drcePort))

    try:
        logger.info("DRCE router sending with timeout=" + str(timeout) + \
                    ", host:" + str(self.drceHost) + ", port:" + str(self.drcePort))
        # Try to execute request
        try:
            response = drceManager.process(request, timeout, self.DRCE_REDUCER_TTL)
        except (ConnectionTimeout, TransportInternalErr, CommandExecutorErr) as err:
            response = None
            # Exception.message is deprecated and missing on Python 3; fall back to the exception itself.
            logger.error("DRCE router send error : " + str(getattr(err, "message", err)))

        logger.info("DRCE router sent!")
    finally:
        with lock:
            drceManager.clear_host()

    return response
467 
def reply(self, event, reply_event)
wrapper for sending event in reply for event
def __init__(self, configParser, connectionBuilderLight=None)
def updateStatField(self, field_name, value, operation=STAT_FIELDS_OPERATION_ADD)
update values of stat field - default sum
wrapper for TaskExecuteStruct
Definition: Commands.py:87
def setEventHandler(self, eventType, handler)
set event handler rewrite the current handler for eventType
def sendClientResponse(self, clientRequestEvent, clientResponseObj)
def addConnection(self, name, connection)
This is app base class for management server connection end-points and parallel transport messages pr...
wrapper for task request
Definition: Commands.py:158
wrapper for Session fields array of execute task
Definition: Commands.py:32
UIDGenerator is used to generate unique message id.
Definition: UIDGenerator.py:14
Class hides routines of building connection objects.
IDGenerator is used to generate unique id for connections.
Definition: IDGenerator.py:15
Convertor which is used to convert Task*Request to json and TaskResponse from json.