HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime Class Reference

Public Member Functions

def __init__ (self, configParser, connectionBuilderLight=None)
 
def on_poll_timeout (self)
 
def onEventsHandler (self, event)
 
def forkBatch (self, loggingObj, event)
 
def sendClientResponse (self, clientRequestEvent, clientResponseObj)
 
def prepareDRCERequest (self, eventObj)
 
def processDRCERequest (self, taskExecuteRequest)
 
def sendToDRCERouter (self, request)
 
- Public Member Functions inherited from app.BaseServerManager.BaseServerManager
def __init__ (self, poller_manager=None, admin_connection=None, conectionLightBuilder=None, exceptionForward=False, dumpStatVars=True)
 constructor
 
def addConnection (self, name, connection)
 
def setEventHandler (self, eventType, handler)
 Set the event handler for eventType, replacing the current handler.
 
def send (self, connect_name, event)
 Send an event.
 
def reply (self, event, reply_event)
 Wrapper for sending an event in reply to another event.
 
def poll (self)
 Poll the connections; each message is received as a multipart message whose second part is a pickled Python object.
 
def process (self, event)
 Process an event: call the event handler set by the user, or on_unhandled_event if none is set.
 
def run (self)
 
def is_connection_registered (self, name)
 Check whether a connection was registered in this BaseServerManager instance.
 
def on_poll_timeout (self)
 Called every time a ConnectionTimeout exception arrives.
 
def on_unhandled_event (self, event)
 Called every time an event arrives whose event.eventType has no handler set.
 
def build_poller_list (self)
 
def clear_poller (self)
 
def onAdminState (self, event)
 onAdminState event handler; processes the admin SHUTDOWN command.
 
def onAdminFetchStatData (self, event)
 onAdminFetchStatData event handler; processes the admin fetch stat data command.
 
def onAdminSuspend (self, event)
 onAdminSuspend event handler; processes the admin suspend command.
 
def getStatDataFields (self, fields)
 getStatDataFields returns stat data from storage.
 
def getSystemStat (self)
 getSystemStat returns stat data for system indicators: RAMV, RAMR and CPU.
 
def getConfigVarsFields (self, fields)
 getConfigVarsFields returns config vars from storage.
 
def onAdminGetConfigVars (self, event)
 onAdminGetConfigVars event handler; processes the getConfigVars admin command, filling and returning the config vars array from internal storage.
 
def onAdminSetConfigVars (self, event)
 onAdminSetConfigVars event handler; processes the setConfigVars admin command.
 
def setConfigVars (self, setConfigVars)
 setConfigVars sets config vars in storage.
 
def sendAdminReadyEvent (self)
 Send a ready event to notify adminInterfaceService.
 
def createLogMsg (self, event)
 Form a string message from an event object.
 
def initStatFields (self, connect_name)
 Add a record to statFields.
 
def updateStatField (self, field_name, value, operation=STAT_FIELDS_OPERATION_ADD)
 Update the value of a stat field; the default operation is addition.
 
def processSpecialConfigVars (self, name, value)
 Process a special config var given by name and value.
 
def getLogLevel (self)
 Get the log level from the first existing logger.
 
def setLogLevel (self, level)
 Set the log level for all loggers.
 
def saveStatVarsDump (self)
 Save stat vars to a JSON file.
 
def loadStatVarsDump (self)
 Load stat vars from a JSON file.
 
def getStatVarsDumpFileName (self)
 Get the stat vars file name.
 
def createDBIDict (self, configParser)
 

Public Attributes

 serverName
 
 drceHost
 
 drcePort
 
 drceIdGenerator
 
 drceCommandConvertor
 
 totalTime
 
- Public Attributes inherited from app.BaseServerManager.BaseServerManager
 dumpStatVars
 
 poller_manager
 
 eventBuilder
 
 exit_flag
 
 pollTimeout
 
 connections
 
 event_handlers
 
 statFields
 stat fields container
 
 configVars
 
 exceptionForward
 

Static Public Attributes

int DRCE_REDUCER_TTL = 3000000
 
int REQUEST_ERROR_OBJECT_TYPE = 1
 
int REQUEST_ERROR_URLS_COUNT = 2
 
int REQUEST_ERROR_THREADS_NUMBER_EXCEEDED = 3
 
int CONFIG_DRCE_REQUEST_ROUTING_DEFAULT = 1
 
int CONFIG_BATCH_MAX_ITERATIONS_DEFAULT = 2
 
string CONFIG_SERVER = "server"
 
string CONFIG_DRCE_STARTER_NAME = "DRCEStarterName"
 
string CONFIG_DRCE_HOST = "DRCEHost"
 
string CONFIG_DRCE_PORT = "DRCEPort"
 
string CONFIG_DRCE_TIMEOUT = "DRCETimeout"
 
string CONFIG_DRCE_CRAWLER_APP_NAME = "DRCECrawlerAppName"
 
string CONFIG_BATCH_MAX_TIME = "BatchMaxExecutionTime"
 
string CONFIG_BATCH_MAX_URLS = "BatchMaxURLs"
 
string CONFIG_MAX_THREADS = "MaxThreads"
 
string CONFIG_POLLING_TIMEOUT = "PollingTimeout"
 
string CONFIG_DRCE_REQUEST_ROUTING = "DRCERequestRouting"
 
string CONFIG_BATCH_MAX_ITERATIONS = "BatchMaxIterations"
 
string REAL_TIME_CRAWL_THREAD_NAME_PREFIX = 'RtCrawl_'
 
- Static Public Attributes inherited from app.BaseServerManager.BaseServerManager
string ADMIN_CONNECT_ENDPOINT = "Admin"
 
string ADMIN_CONNECT_CLIENT = "Admin"
 
int POLL_TIMEOUT_DEFAULT = 3000
 
int STAT_FIELDS_OPERATION_ADD = 0
 
int STAT_FIELDS_OPERATION_SUB = 1
 
int STAT_FIELDS_OPERATION_SET = 2
 
int STAT_FIELDS_OPERATION_INIT = 3
 
string POLL_TIMEOUT_CONFIG_VAR_NAME = "POLL_TIMEOUT"
 
string LOG_LEVEL_CONFIG_VAR_NAME = "LOG_LEVEL"
 
string STAT_DUMPS_DEFAULT_DIR = "/tmp/"
 
string STAT_DUMPS_DEFAULT_NAME = "%APP_NAME%_%CLASS_NAME%_stat_vars.dump"
 
dictionary LOGGERS_NAMES = {APP_CONSTS.LOGGER_NAME, "dc", "dtm", "root", ""}
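The inherited STAT_FIELDS_OPERATION_* constants select how updateStatField combines a value with a stat field. The sketch below is a minimal model of that dispatch over a plain dict, assuming ADD adds, SUB subtracts, SET overwrites, and INIT sets a field only when it does not already exist. The INIT semantics are an assumption (one plausible reading, given that the constructor uses INIT for the persistent counters that loadStatVarsDump may restore, and SET for the live thread counters); update_stat_field and the field names are hypothetical, not the BaseServerManager implementation.

```python
# Operation codes copied from BaseServerManager's static attributes.
STAT_FIELDS_OPERATION_ADD = 0
STAT_FIELDS_OPERATION_SUB = 1
STAT_FIELDS_OPERATION_SET = 2
STAT_FIELDS_OPERATION_INIT = 3


def update_stat_field(stat_fields, field_name, value, operation=STAT_FIELDS_OPERATION_ADD):
    """Hypothetical helper: apply one stat-field operation to a dict in place."""
    if operation == STAT_FIELDS_OPERATION_ADD:
        stat_fields[field_name] = stat_fields.get(field_name, 0) + value
    elif operation == STAT_FIELDS_OPERATION_SUB:
        stat_fields[field_name] = stat_fields.get(field_name, 0) - value
    elif operation == STAT_FIELDS_OPERATION_SET:
        stat_fields[field_name] = value
    elif operation == STAT_FIELDS_OPERATION_INIT:
        # Assumed semantics: keep a value that already exists (e.g. restored from a dump).
        stat_fields.setdefault(field_name, value)
    else:
        raise ValueError("unknown stat field operation: %r" % (operation,))
    return stat_fields[field_name]


stats = {"batches_total": 7}  # pretend this value was restored from a dump
update_stat_field(stats, "batches_total", 0, STAT_FIELDS_OPERATION_INIT)  # stays 7
update_stat_field(stats, "threads", 0, STAT_FIELDS_OPERATION_SET)         # 0
update_stat_field(stats, "threads", 1)                                     # ADD: 1
update_stat_field(stats, "threads", 1, STAT_FIELDS_OPERATION_SUB)         # back to 0
```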
 

Detailed Description

Definition at line 51 of file BatchTasksManagerRealTime.py.

Constructor & Destructor Documentation

◆ __init__()

def dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.__init__ (self, configParser, connectionBuilderLight=None)

Definition at line 83 of file BatchTasksManagerRealTime.py.

83  def __init__(self, configParser, connectionBuilderLight=None):
84    super(BatchTasksManagerRealTime, self).__init__()
85 
86    # Instantiate the connection builder light if not set
87    if connectionBuilderLight is None:
88      connectionBuilderLight = ConnectionBuilderLight()
89 
90    # Get configuration settings
91    className = self.__class__.__name__
92    self.serverName = configParser.get(className, self.CONFIG_SERVER)
93 
94    # Create connections and raise bind or connect actions for the corresponding connection type
95    serverConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, self.serverName)
96    self.setEventHandler(DC_CONSTS.EVENT_TYPES.BATCH, self.onEventsHandler)
97 
98    # Initialize DRCE API
99    self.configVars[self.CONFIG_DRCE_TIMEOUT] = configParser.getint(className, self.CONFIG_DRCE_TIMEOUT)
100    self.drceHost = configParser.get(className, self.CONFIG_DRCE_HOST)
101    self.drcePort = configParser.get(className, self.CONFIG_DRCE_PORT)
102    # self.drceManager = DRCEManager()
103    # self.drceManager.activate_host(HostParams(self.drceHost, self.drcePort))
104    self.drceIdGenerator = UIDGenerator()
105    self.drceCommandConvertor = CommandConvertor()
106 
107    # Add connections to the polling set
108    self.addConnection(self.serverName, serverConnection)
109 
110    # Max URLs per batch
111    self.configVars[self.CONFIG_BATCH_MAX_URLS] = configParser.getint(className, self.CONFIG_BATCH_MAX_URLS)
112 
113    # Set crawler task app name
114    self.configVars[self.CONFIG_DRCE_CRAWLER_APP_NAME] = configParser.get(className, self.CONFIG_DRCE_CRAWLER_APP_NAME)
115    self.configVars[self.CONFIG_BATCH_MAX_TIME] = configParser.getint(className, self.CONFIG_BATCH_MAX_TIME)
116    self.configVars[self.CONFIG_DRCE_STARTER_NAME] = configParser.get(className, self.CONFIG_DRCE_STARTER_NAME)
117    self.configVars[self.CONFIG_MAX_THREADS] = configParser.getint(className, self.CONFIG_MAX_THREADS)
118    if configParser.has_option(className, self.CONFIG_DRCE_REQUEST_ROUTING):
119      self.configVars[self.CONFIG_DRCE_REQUEST_ROUTING] = configParser.getint(className,
120                                                                              self.CONFIG_DRCE_REQUEST_ROUTING)
121    else:
122      self.configVars[self.CONFIG_DRCE_REQUEST_ROUTING] = self.CONFIG_DRCE_REQUEST_ROUTING_DEFAULT
123    if configParser.has_option(className, self.CONFIG_BATCH_MAX_ITERATIONS):
124      self.configVars[self.CONFIG_BATCH_MAX_ITERATIONS] = configParser.getint(className,
125                                                                              self.CONFIG_BATCH_MAX_ITERATIONS)
126    else:
127      self.configVars[self.CONFIG_BATCH_MAX_ITERATIONS] = self.CONFIG_BATCH_MAX_ITERATIONS_DEFAULT
128 
129    # Batches counter init in stat vars
130    self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_TOTAL_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
131    # Failed batches counter init in stat vars
132    self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
133    # Batch URLs total counter init in stat vars
134    self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
135    # Failed batch URLs total counter init in stat vars
136    self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
137    # Average processing time init in stat vars
138    self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_TIME_AVG_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
139    # Number of running real-time crawling threads init in stat vars
140    self.updateStatField(DC_CONSTS.BATCHES_REALTIME_THREADS_NAME, 0, self.STAT_FIELDS_OPERATION_SET)
141    # Total number of real-time crawling threads created init in stat vars
142    self.updateStatField(DC_CONSTS.BATCHES_REALTIME_THREADS_CREATED_COUNTER_NAME, 0, self.STAT_FIELDS_OPERATION_SET)
143 
144    # Total processing time summed over all requests
145    self.totalTime = 0
146 
147    # Set the connections poll timeout (msec); defines the period of the HCE cluster monitoring cycle
148    self.configVars[self.POLL_TIMEOUT_CONFIG_VAR_NAME] = configParser.getint(className, self.CONFIG_POLLING_TIMEOUT)
149 
150 
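The constructor reads every setting from the config section named after the class and falls back to CONFIG_DRCE_REQUEST_ROUTING_DEFAULT (1) and CONFIG_BATCH_MAX_ITERATIONS_DEFAULT (2) only for DRCERequestRouting and BatchMaxIterations. A minimal sketch of the same lookup with Python 3's configparser; the section values are hypothetical, only the option names come from the CONFIG_* constants, and read_settings is an illustrative helper rather than part of HCE.

```python
import configparser

# The constructor looks the section up by self.__class__.__name__.
SECTION = "BatchTasksManagerRealTime"

# Hypothetical values; only the option names come from the CONFIG_* constants.
SAMPLE_INI = """
[BatchTasksManagerRealTime]
server = rt_server
DRCEHost = 127.0.0.1
DRCEPort = 5656
DRCETimeout = 60000
DRCECrawlerAppName = crawler-task
DRCEStarterName = starter
BatchMaxExecutionTime = 120
BatchMaxURLs = 10
MaxThreads = 4
PollingTimeout = 3000
"""

REQUIRED_STR = ("server", "DRCEHost", "DRCEPort", "DRCECrawlerAppName", "DRCEStarterName")
REQUIRED_INT = ("DRCETimeout", "BatchMaxExecutionTime", "BatchMaxURLs", "MaxThreads", "PollingTimeout")
# Optional options and the class defaults used when they are missing.
OPTIONAL_INT = (("DRCERequestRouting", 1), ("BatchMaxIterations", 2))


def read_settings(parser, section=SECTION):
    """Collect the options __init__ reads; a missing required option raises NoOptionError."""
    settings = {name: parser.get(section, name) for name in REQUIRED_STR}
    settings.update({name: parser.getint(section, name) for name in REQUIRED_INT})
    for name, default in OPTIONAL_INT:
        # has_option() plus a default, as in the constructor's if/else blocks.
        settings[name] = parser.getint(section, name) if parser.has_option(section, name) else default
    return settings


parser = configparser.ConfigParser()
parser.read_string(SAMPLE_INI)
settings = read_settings(parser)
```

Note that DRCEPort stays a string, exactly as the constructor reads it with get() rather than getint().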

Member Function Documentation

◆ forkBatch()

def dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.forkBatch (self, loggingObj, event)

Definition at line 244 of file BatchTasksManagerRealTime.py.

244  def forkBatch(self, loggingObj, event):
245    try:
246      global logger # pylint: disable=W0603
247      lock.acquire()
248      logger = loggingObj.getLogger(DC_CONSTS.LOGGER_NAME)
249      self.updateStatField(DC_CONSTS.BATCHES_REALTIME_THREADS_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
250      lock.release()
251      logger.info("THREAD_STARTED")
252      logger.debug("event:\n%s", Utils.varDump(event))
253      # Set start time
254      t = time.time()
255      # Set crawlerType to prevent wrong processing
256      event.eventObj.crawlerType = EventObjects.Batch.TYPE_REAL_TIME_CRAWLER
257      event.eventObj.dbMode = EventObjects.Batch.DB_MODE_R
258      if event.eventObj.maxIterations > self.configVars[self.CONFIG_BATCH_MAX_ITERATIONS]:
259        event.eventObj.maxIterations = self.configVars[self.CONFIG_BATCH_MAX_ITERATIONS]
260      # Prepare request
261      lock.acquire()
262      taskExecuteRequest = self.prepareDRCERequest(event.eventObj)
263      lock.release()
264      # Send the request and process the response
265      clientResponseObj = self.processDRCERequest(taskExecuteRequest)
266      logger.debug("ClientResponseObj object:\n" + Utils.varDump(clientResponseObj))
267      lock.acquire()
268      self.totalTime = self.totalTime + (time.time() - t)
269      self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_TIME_AVG_NAME,
270                           str(self.totalTime / float(1 + self.statFields[DC_CONSTS.BATCHES_CRAWL_COUNTER_TOTAL_NAME])),
271                           self.STAT_FIELDS_OPERATION_SET)
272      # Send response to client
273      self.sendClientResponse(event, clientResponseObj)
274      logger.info("THREAD_FINISHED")
275      logger.debug("clientResponseObj:\n%s", Utils.varDump(clientResponseObj))
276      lock.release()
277    except Exception as err:
278      msg = "Thread exception:" + str(err)
279      lock.acquire()
280      logger.error(msg)
281      clientResponseObj = EventObjects.ClientResponse()
282      clientResponseObj.errorCode = self.REQUEST_ERROR_OBJECT_TYPE
283      clientResponseObj.errorMessage = msg
284      self.sendClientResponse(event, clientResponseObj)
285      lock.release()
286    except: # pylint: disable=W0702
287      msg = "Unknown thread exception!"
288      lock.acquire()
289      logger.error(msg)
290      clientResponseObj = EventObjects.ClientResponse()
291      clientResponseObj.errorCode = self.REQUEST_ERROR_OBJECT_TYPE
292      clientResponseObj.errorMessage = msg
293      self.sendClientResponse(event, clientResponseObj)
294      lock.release()
295    # Decrement the thread counter
296    lock.acquire()
297    self.updateStatField(DC_CONSTS.BATCHES_REALTIME_THREADS_NAME, 1, self.STAT_FIELDS_OPERATION_SUB)
298    lock.release()
299 
300 
301 
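forkBatch pairs every lock.acquire() with a manual lock.release(). If an exception is raised while the lock is held (for example inside prepareDRCERequest), the except branch calls lock.acquire() again; if lock is a plain, non-reentrant threading.Lock (the listing does not show how it is created), that call blocks forever and the thread counter is never decremented. A sketch of the same bookkeeping written with `with lock:` and try/finally, assuming a module-level threading.Lock; RealTimeWorker and its process callback are hypothetical stand-ins, not HCE classes.

```python
import threading

lock = threading.Lock()  # assumption: the module-level lock is a plain Lock


class RealTimeWorker:
    """Hypothetical stand-in for the thread bookkeeping done in forkBatch."""

    def __init__(self, max_iterations=2):
        self.max_iterations = max_iterations  # CONFIG_BATCH_MAX_ITERATIONS_DEFAULT
        self.active_threads = 0
        self.errors = []

    def fork_batch(self, requested_iterations, process):
        with lock:
            self.active_threads += 1
        try:
            # Clamp the requested iteration count to the configured maximum.
            iterations = min(requested_iterations, self.max_iterations)
            return process(iterations)
        except Exception as err:  # report the failure instead of losing it
            with lock:
                self.errors.append("Thread exception:" + str(err))
            return None
        finally:
            # Runs on success and on failure, so the counter cannot drift upward.
            with lock:
                self.active_threads -= 1


def failing(_):
    raise RuntimeError("DRCE unreachable")


worker = RealTimeWorker()
ok = worker.fork_batch(5, lambda n: n * 10)  # clamped to 2, returns 20
bad = worker.fork_batch(1, failing)          # error recorded, returns None
```

Because the lock is released by the `with` block even when the body raises, the error path can safely take the lock again.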

◆ on_poll_timeout()

def dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.on_poll_timeout (self)

Definition at line 154 of file BatchTasksManagerRealTime.py.

154  def on_poll_timeout(self):
155    lock.acquire()
156    # self.updateStatField(DC_CONSTS.BATCHES_REALTIME_THREADS_NAME, threading.active_count(),
157    #                      self.STAT_FIELDS_OPERATION_SET)
158    # Correct the number of clients to fix some crashes
159    # if self.statFields[DC_CONSTS.BATCHES_REALTIME_THREADS_NAME] > threading.active_count():
160    #   self.updateStatField(DC_CONSTS.BATCHES_REALTIME_THREADS_NAME, threading.active_count(),
161    #                        self.STAT_FIELDS_OPERATION_SET)
162    # Count the running real-time crawl threads
163    n = 0
164    main_thread = threading.currentThread()
165    for t in threading.enumerate():
166      if t is not main_thread and t.getName().startswith(self.REAL_TIME_CRAWL_THREAD_NAME_PREFIX):
167        n += 1
168    if self.statFields[DC_CONSTS.BATCHES_REALTIME_THREADS_NAME] > n:
169      self.updateStatField(DC_CONSTS.BATCHES_REALTIME_THREADS_NAME, n, self.STAT_FIELDS_OPERATION_SET)
170 
171    lock.release()
172 
173 
174 
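on_poll_timeout only ever lowers the BATCHES_REALTIME_THREADS_NAME counter: it counts live threads whose names start with REAL_TIME_CRAWL_THREAD_NAME_PREFIX ('RtCrawl_') and overwrites the counter when the counter is larger, which presumably repairs drift left by threads that died without decrementing it. A standalone sketch of that correction with the standard threading module; count_prefixed_threads and reconcile are hypothetical helpers, and current_thread() and Thread.name are the Python 3 spellings of currentThread() and getName().

```python
import threading

PREFIX = "RtCrawl_"  # REAL_TIME_CRAWL_THREAD_NAME_PREFIX


def count_prefixed_threads(prefix=PREFIX):
    """Count live threads, other than the caller, whose name starts with prefix."""
    me = threading.current_thread()
    return sum(1 for t in threading.enumerate()
               if t is not me and t.name.startswith(prefix))


def reconcile(counter, prefix=PREFIX):
    """Return the corrected counter: lowered to the live count, never raised."""
    live = count_prefixed_threads(prefix)
    return live if counter > live else counter


release = threading.Event()
workers = [threading.Thread(target=release.wait, name=PREFIX + str(i)) for i in range(1, 3)]
for w in workers:
    w.start()
corrected = reconcile(5)  # counter claims 5, but only 2 RtCrawl_ threads are alive
unchanged = reconcile(1)  # a smaller counter is left as it is
release.set()
for w in workers:
    w.join()
```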

◆ onEventsHandler()

def dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.onEventsHandler (self, event)

Definition at line 178 of file BatchTasksManagerRealTime.py.

178  def onEventsHandler(self, event):
179    self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_TOTAL_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
180    if isinstance(event.eventObj, EventObjects.Batch):
181      if len(event.eventObj.items) > int(self.configVars[self.CONFIG_BATCH_MAX_URLS]):
182        clientResponseObj = EventObjects.ClientResponse()
183        clientResponseObj.errorCode = self.REQUEST_ERROR_URLS_COUNT
184        clientResponseObj.errorMessage = "Too many URLs requested: " + str(len(event.eventObj.items)) + \
185                                         ", at most " + str(self.configVars[self.CONFIG_BATCH_MAX_URLS]) + " allowed."
186      else:
187        if self.configVars[self.CONFIG_MAX_THREADS] == 0:
188          logger.info("Single thread processing started")
189          # Process batch in single thread
190          self.forkBatch(logging, event)
191          return
192        else:
193          if self.configVars[self.CONFIG_MAX_THREADS] > 0:
194            # if self.configVars[self.CONFIG_MAX_THREADS] > threading.active_count():
195            if self.configVars[self.CONFIG_MAX_THREADS] > self.statFields[DC_CONSTS.BATCHES_REALTIME_THREADS_NAME]:
196              # Process batch in a separate thread
197              logger.info("Forking new thread")
198              self.updateStatField(DC_CONSTS.BATCHES_REALTIME_THREADS_CREATED_COUNTER_NAME, 1,
199                                   self.STAT_FIELDS_OPERATION_ADD)
200              t1 = threading.Thread(target=self.forkBatch, args=(logging, event,))
201              t1.setName(self.REAL_TIME_CRAWL_THREAD_NAME_PREFIX + \
202                         str(self.statFields[DC_CONSTS.BATCHES_REALTIME_THREADS_CREATED_COUNTER_NAME]))
203              t1.start()
204              logger.info("New thread forked")
205              # self.updateStatField(DC_CONSTS.BATCHES_REALTIME_THREADS_NAME, threading.active_count(),
206              #                      self.STAT_FIELDS_OPERATION_SET)
207              return
208            else:
209              # Return overload error
210              clientResponseObj = EventObjects.ClientResponse()
211              clientResponseObj.errorCode = self.REQUEST_ERROR_THREADS_NUMBER_EXCEEDED
212              lock.acquire()
213              clientResponseObj.errorMessage = "Service overloaded, " + \
214                  str(self.statFields[DC_CONSTS.BATCHES_REALTIME_THREADS_NAME]) + " workers."
215              logger.error(clientResponseObj.errorMessage)
216              lock.release()
217          else:
218            # Return fake error
219            clientResponseObj = EventObjects.ClientResponse()
220            clientResponseObj.errorCode = self.REQUEST_ERROR_OBJECT_TYPE
221            clientResponseObj.errorMessage = "STUB fake error response"
222            logger.info(clientResponseObj.errorMessage)
223    else:
224      # Return error
225      clientResponseObj = EventObjects.ClientResponse()
226      clientResponseObj.errorCode = self.REQUEST_ERROR_OBJECT_TYPE
227      clientResponseObj.errorMessage = "Wrong requested object type " + str(type(event.eventObj)) + ", Batch expected."
228      logger.error(clientResponseObj.errorMessage)
229 
230    # Send response with error to client
231    self.sendClientResponse(event, clientResponseObj)
232 
233    # self.updateStatField(DC_CONSTS.BATCHES_REALTIME_THREADS_NAME, threading.active_count(),
234    #                      self.STAT_FIELDS_OPERATION_SET)
235 
236 
237 
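The branching in onEventsHandler reduces to an admission decision driven by BatchMaxURLs and MaxThreads: too many URLs yields REQUEST_ERROR_URLS_COUNT (2); MaxThreads == 0 processes the batch inline; a positive MaxThreads forks a thread while the running-thread counter is below the limit and otherwise answers REQUEST_ERROR_THREADS_NUMBER_EXCEEDED (3); a negative MaxThreads returns the stub error REQUEST_ERROR_OBJECT_TYPE (1). A pure-function sketch of that decision for a Batch event; admit_batch and its returned labels are hypothetical, and the non-Batch branch is omitted.

```python
REQUEST_ERROR_OBJECT_TYPE = 1
REQUEST_ERROR_URLS_COUNT = 2
REQUEST_ERROR_THREADS_NUMBER_EXCEEDED = 3


def admit_batch(items_count, max_urls, max_threads, active_threads):
    """Hypothetical helper mirroring onEventsHandler's checks for a Batch event.

    Returns ("inline", None), ("fork", None) or ("error", error_code).
    """
    if items_count > max_urls:
        return ("error", REQUEST_ERROR_URLS_COUNT)
    if max_threads == 0:
        return ("inline", None)  # forkBatch() runs in the polling thread
    if max_threads > 0:
        if max_threads > active_threads:
            return ("fork", None)  # a new RtCrawl_<n> thread
        return ("error", REQUEST_ERROR_THREADS_NUMBER_EXCEEDED)
    return ("error", REQUEST_ERROR_OBJECT_TYPE)  # negative MaxThreads: stub error
```

One consequence of the original design: the running-thread counter is incremented inside forkBatch, after the new thread starts, so events that arrive before freshly forked threads have updated it can all pass the check and the thread count can briefly exceed MaxThreads.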

◆ prepareDRCERequest()

def dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.prepareDRCERequest (self, eventObj)

Definition at line 320 of file BatchTasksManagerRealTime.py.

320  def prepareDRCERequest(self, eventObj):
321    # Create DRCE task Id
322    idGenerator = IDGenerator()
323    taskId = ctypes.c_uint32(zlib.crc32(idGenerator.get_connection_uid(), int(time.time()))).value
324 
325    # Prepare DRCE request object
326    taskExecuteStruct = TaskExecuteStruct()
327    taskExecuteStruct.command = self.configVars[self.CONFIG_DRCE_CRAWLER_APP_NAME]
328    if eventObj.id == 0:
329      eventObj.id = taskId
330    if eventObj.maxExecutionTime == 0:
331      mt = self.configVars[self.CONFIG_BATCH_MAX_TIME]
332    else:
333      mt = eventObj.maxExecutionTime
334    if int(mt) > int(self.configVars[self.CONFIG_DRCE_TIMEOUT] / 1000):
335      mt = int(self.configVars[self.CONFIG_DRCE_TIMEOUT]) / 1000
336    logger.debug("Custom max DRCE task execution set: %s", str(mt))
337    taskExecuteStruct.input = pickle.dumps(eventObj)
338    taskExecuteStruct.session = Session(Session.TMODE_SYNC, 0, int(mt) * 1000)
339    taskExecuteStruct.session.shell = self.configVars[self.CONFIG_DRCE_STARTER_NAME]
340    logger.debug("DRCE taskExecuteStruct:\n" + Utils.varDump(taskExecuteStruct))
341 
342    # Create DRCE TaskExecuteRequest object
343    taskExecuteRequest = TaskExecuteRequest(taskId)
344    # Set taskExecuteRequest fields
345    taskExecuteRequest.data = taskExecuteStruct
346    # For a Batch with at most one item, set the route from the DRCERequestRouting config value
347    if len(eventObj.items) < 2:
348      if self.configVars[self.CONFIG_DRCE_REQUEST_ROUTING] == 5:
349        taskExecuteRequest.route = DC_CONSTS.DRCE_REQUEST_ROUTING_RESOURCE_USAGE
350      if self.configVars[self.CONFIG_DRCE_REQUEST_ROUTING] == 1:
351        taskExecuteRequest.route = DC_CONSTS.DRCE_REQUEST_ROUTING_ROUND_ROBIN
352      if self.configVars[self.CONFIG_DRCE_REQUEST_ROUTING] == 0:
353        taskExecuteRequest.route = DC_CONSTS.DRCE_REQUEST_ROUTING_MULTICAST
354      if self.configVars[self.CONFIG_DRCE_REQUEST_ROUTING] == 4:
355        taskExecuteRequest.route = DC_CONSTS.DRCE_REQUEST_ROUTING_RND
356 
357    logger.debug("DRCE taskExecuteRequest:\n" + Utils.varDump(taskExecuteRequest))
358 
359    return taskExecuteRequest
360 
361 
362 
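prepareDRCERequest derives the task id as an unsigned 32-bit CRC32 of the connection UID seeded with the current Unix time, caps the execution time at DRCETimeout converted from milliseconds to seconds, and, for batches with fewer than two items, maps the DRCERequestRouting value to a route. A Python 3 sketch of those three steps; the UID bytes, the route labels, and the helper names are hypothetical (the real code uses IDGenerator, Session and the DC_CONSTS.DRCE_REQUEST_ROUTING_* constants). In Python 3, zlib.crc32 already returns an unsigned value, so the 0xFFFFFFFF mask only plays the role of ctypes.c_uint32.

```python
import time
import zlib

# Hypothetical labels standing in for the DC_CONSTS.DRCE_REQUEST_ROUTING_* values.
ROUTES = {0: "MULTICAST", 1: "ROUND_ROBIN", 4: "RND", 5: "RESOURCE_USAGE"}


def make_task_id(connection_uid, now=None):
    """CRC32 of the UID, seeded with the integer Unix time, as an unsigned 32-bit int."""
    seed = int(time.time() if now is None else now)
    return zlib.crc32(connection_uid, seed) & 0xFFFFFFFF


def effective_max_time(requested_sec, batch_max_time_sec, drce_timeout_msec):
    """0 means 'use BatchMaxExecutionTime'; never exceed the DRCE timeout in seconds."""
    mt = batch_max_time_sec if requested_sec == 0 else requested_sec
    return min(int(mt), drce_timeout_msec // 1000)


def pick_route(items_count, routing_config):
    """Route only batches with fewer than two items; None means the route stays unset."""
    if items_count < 2:
        return ROUTES.get(routing_config)
    return None


task_id = make_task_id(b"example-connection-uid", now=1400000000)
session_timeout_msec = effective_max_time(0, 120, 60000) * 1000  # 120 s capped to 60 s
```

Routing values other than 0, 1, 4 and 5 leave the route unset, matching the chain of independent if statements in the listing.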

◆ processDRCERequest()

def dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.processDRCERequest (self, taskExecuteRequest)

Definition at line 366 of file BatchTasksManagerRealTime.py.

366  def processDRCERequest(self, taskExecuteRequest):
367 
368    logger.info("Sending sync task id:" + str(taskExecuteRequest.id) + " to DRCE router!")
369    # Send request to DRCE Cluster router
370    response = self.sendToDRCERouter(taskExecuteRequest)
371    logger.info("Received response on sync task from DRCE router!")
372    logger.debug("Response: %s", Utils.varDump(response))
373 
374    # Create new client response object
375    clientResponse = EventObjects.ClientResponse()
376 
377    # Check response returned
378    if response is None:
379      clientResponse.errorCode = EventObjects.ClientResponse.STATUS_ERROR_NONE
380      clientResponse.errorMessage = "Response error, None returned from DRCE, possibly a timeout of " + \
381                                    str(self.configVars[self.CONFIG_DRCE_TIMEOUT]) + " msec!"
382      logger.error(clientResponse.errorMessage)
383      lock.acquire()
384      self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_FAULT_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
385      lock.release()
386    else:
387      if len(response.items) == 0:
388        clientResponse.errorCode = EventObjects.ClientResponse.STATUS_ERROR_EMPTY_LIST
389        clientResponse.errorMessage = "Response error, empty list returned from DRCE, possibly no nodes in the cluster!"
390        logger.error(clientResponse.errorMessage)
391        lock.acquire()
392        self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_FAULT_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
393        lock.release()
394      else:
395        for item in response.items:
396          # New ClientResponseItem object
397          clientResponseItem = EventObjects.ClientResponseItem(None)
398          # If the response item reports an error or the CLI application exited with a non-zero status
399          if item.error_code > 0 or item.exit_status > 0:
400            clientResponseItem.errorCode = clientResponseItem.STATUS_ERROR_DRCE
401            clientResponseItem.errorMessage = "error_message=" + item.error_message + \
402                                              ", error_code=" + str(item.error_code) + \
403                                              ", exit_status=" + str(item.exit_status) + \
404                                              ", stderror=" + str(item.stderror)
405            logger.error(clientResponseItem.errorMessage)
406            lock.acquire()
407            self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_FAULT_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
408            lock.release()
409          else:
410            # Try to restore the serialized response object from the dump
411            try:
412              clientResponseItem.itemObject = pickle.loads(item.stdout)
413              if clientResponseItem.itemObject is not None and isinstance(clientResponseItem.itemObject, list):
414                urlContents = len(clientResponseItem.itemObject)
415                lock.acquire()
416                self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_NAME, urlContents,
417                                     self.STAT_FIELDS_OPERATION_ADD)
418                lock.release()
419            except Exception as e:
420              clientResponseItem.errorCode = EventObjects.ClientResponseItem.STATUS_ERROR_RESTORE_OBJECT
421              clientResponseItem.errorMessage = EventObjects.ClientResponseItem.MSG_ERROR_RESTORE_OBJECT + "\n" + \
422                                                str(e) + "\nstdout=" + str(item.stdout) + \
423                                                ", stderror=" + str(item.stderror)
424              logger.error(clientResponseItem.errorMessage)
425              lock.acquire()
426              self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_FAULT_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
427              lock.release()
428          # Set all information fields in any case
429          clientResponseItem.id = item.id
430          clientResponseItem.host = item.host
431          clientResponseItem.port = item.port
432          clientResponseItem.node = item.node
433          clientResponseItem.time = item.time
434          # Add ClientResponseItem object
435          clientResponse.itemsList.append(clientResponseItem)
436 
437    return clientResponse
438 
439 
440 
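processDRCERequest classifies the DRCE reply at three levels: no response at all (probably a timeout), an empty item list (probably no node answered), and per-item results, where a positive error_code or exit_status marks the item as failed and otherwise stdout is unpickled into itemObject. A self-contained sketch of the per-item part only; DrceItem, classify_items and the status strings are hypothetical stand-ins for the DRCE response item and the EventObjects.ClientResponseItem status constants.

```python
import pickle
from dataclasses import dataclass


@dataclass
class DrceItem:
    """Hypothetical stand-in for one item of a DRCE response."""
    error_code: int = 0
    exit_status: int = 0
    stdout: bytes = b""
    stderror: str = ""


def classify_items(items):
    """Return (status, payload) pairs and the number of URL contents restored."""
    results, url_contents = [], 0
    for item in items:
        if item.error_code > 0 or item.exit_status > 0:
            results.append(("ERROR_DRCE", "exit_status=%d" % item.exit_status))
            continue
        try:
            obj = pickle.loads(item.stdout)  # the crawler task writes a pickled object to stdout
        except Exception as err:
            results.append(("ERROR_RESTORE_OBJECT", str(err)))
            continue
        if isinstance(obj, list):
            url_contents += len(obj)
        results.append(("OK", obj))
    return results, url_contents


items = [
    DrceItem(stdout=pickle.dumps(["url-1", "url-2"])),
    DrceItem(exit_status=1, stderror="crash"),
    DrceItem(stdout=b"not a pickle"),
]
results, url_contents = classify_items(items)
```

pickle.loads can execute code embedded in its input, so this scheme is only acceptable because stdout comes from the cluster's own crawler task, never from an untrusted source.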

◆ sendClientResponse()

def dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.sendClientResponse (self, clientRequestEvent, clientResponseObj)

Definition at line 306 of file BatchTasksManagerRealTime.py.

306  def sendClientResponse(self, clientRequestEvent, clientResponseObj):
307    # Prepare reply event
308    replyEvent = self.eventBuilder.build(DC_CONSTS.EVENT_TYPES.BATCH_RESPONSE, clientResponseObj)
309    # Send reply
310    self.reply(clientRequestEvent, replyEvent)
311    logger.info("Response to client sent")
312 
313 
314 

◆ sendToDRCERouter()

def dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.sendToDRCERouter (self, request)

Definition at line 445 of file BatchTasksManagerRealTime.py.

445  def sendToDRCERouter(self, request):
446    lock.acquire()
447    drceManager = DRCEManager()
448    drceManager.activate_host(HostParams(self.drceHost, self.drcePort))
449    lock.release()
450 
451    logger.info("DRCE router sending with timeout=" + str(self.configVars[self.CONFIG_DRCE_TIMEOUT]) + \
452                ", host:" + str(self.drceHost) + ", port:" + str(self.drcePort))
453    # Try to execute request
454    try:
455      response = drceManager.process(request, self.configVars[self.CONFIG_DRCE_TIMEOUT], self.DRCE_REDUCER_TTL)
456    except (ConnectionTimeout, TransportInternalErr, CommandExecutorErr) as err:
457      response = None
458      logger.error("DRCE router send error : " + str(err.message))
459 
460    logger.info("DRCE router sent!")
461 
462    lock.acquire()
463    drceManager.clear_host()
464    lock.release()
465 
466    return response
467 
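sendToDRCERouter creates a fresh DRCEManager for every request, activates the host under the lock, turns ConnectionTimeout, TransportInternalErr and CommandExecutorErr into a None response, and then clears the host. Any other exception from process() skips clear_host(). A sketch of the same acquire/use/release shape with try/finally so the cleanup always runs; FakeManager, send_to_router and the use of the built-in TimeoutError as the "expected" error are hypothetical stand-ins, not the HCE transport API.

```python
import threading

lock = threading.Lock()


class FakeManager:
    """Hypothetical stand-in for DRCEManager: tracks host state and can fail on demand."""

    def __init__(self, fail_with=None):
        self.fail_with = fail_with
        self.active = False

    def activate_host(self, host, port):
        self.active = True

    def process(self, request, timeout_msec, ttl):
        if self.fail_with is not None:
            raise self.fail_with
        return {"request": request, "timeout": timeout_msec}

    def clear_host(self):
        self.active = False


def send_to_router(manager, request, host="127.0.0.1", port=5656, timeout_msec=60000,
                   ttl=3000000, expected_errors=(TimeoutError,)):
    with lock:
        manager.activate_host(host, port)
    try:
        return manager.process(request, timeout_msec, ttl)
    except expected_errors:
        return None  # the caller reports "None returned from DRCE"
    finally:
        with lock:
            manager.clear_host()  # runs on success, expected and unexpected errors


m1 = FakeManager()
r1 = send_to_router(m1, "task-1")
m2 = FakeManager(fail_with=TimeoutError("no reply"))
r2 = send_to_router(m2, "task-2")
m3 = FakeManager(fail_with=ValueError("bad request"))
try:
    send_to_router(m3, "task-3")
except ValueError:
    pass  # unexpected errors still propagate, but only after the host was cleared
```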

Member Data Documentation

◆ CONFIG_BATCH_MAX_ITERATIONS

string dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.CONFIG_BATCH_MAX_ITERATIONS = "BatchMaxIterations"
static

Definition at line 73 of file BatchTasksManagerRealTime.py.

◆ CONFIG_BATCH_MAX_ITERATIONS_DEFAULT

int dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.CONFIG_BATCH_MAX_ITERATIONS_DEFAULT = 2
static

Definition at line 58 of file BatchTasksManagerRealTime.py.

◆ CONFIG_BATCH_MAX_TIME

string dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.CONFIG_BATCH_MAX_TIME = "BatchMaxExecutionTime"
static

Definition at line 68 of file BatchTasksManagerRealTime.py.

◆ CONFIG_BATCH_MAX_URLS

string dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.CONFIG_BATCH_MAX_URLS = "BatchMaxURLs"
static

Definition at line 69 of file BatchTasksManagerRealTime.py.

◆ CONFIG_DRCE_CRAWLER_APP_NAME

string dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.CONFIG_DRCE_CRAWLER_APP_NAME = "DRCECrawlerAppName"
static

Definition at line 67 of file BatchTasksManagerRealTime.py.

◆ CONFIG_DRCE_HOST

string dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.CONFIG_DRCE_HOST = "DRCEHost"
static

Definition at line 64 of file BatchTasksManagerRealTime.py.

◆ CONFIG_DRCE_PORT

string dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.CONFIG_DRCE_PORT = "DRCEPort"
static

Definition at line 65 of file BatchTasksManagerRealTime.py.

◆ CONFIG_DRCE_REQUEST_ROUTING

string dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.CONFIG_DRCE_REQUEST_ROUTING = "DRCERequestRouting"
static

Definition at line 72 of file BatchTasksManagerRealTime.py.

◆ CONFIG_DRCE_REQUEST_ROUTING_DEFAULT

int dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.CONFIG_DRCE_REQUEST_ROUTING_DEFAULT = 1
static

Definition at line 57 of file BatchTasksManagerRealTime.py.

◆ CONFIG_DRCE_STARTER_NAME

string dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.CONFIG_DRCE_STARTER_NAME = "DRCEStarterName"
static

Definition at line 63 of file BatchTasksManagerRealTime.py.

◆ CONFIG_DRCE_TIMEOUT

string dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.CONFIG_DRCE_TIMEOUT = "DRCETimeout"
static

Definition at line 66 of file BatchTasksManagerRealTime.py.

◆ CONFIG_MAX_THREADS

string dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.CONFIG_MAX_THREADS = "MaxThreads"
static

Definition at line 70 of file BatchTasksManagerRealTime.py.

◆ CONFIG_POLLING_TIMEOUT

string dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.CONFIG_POLLING_TIMEOUT = "PollingTimeout"
static

Definition at line 71 of file BatchTasksManagerRealTime.py.

◆ CONFIG_SERVER

string dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.CONFIG_SERVER = "server"
static

Definition at line 61 of file BatchTasksManagerRealTime.py.

◆ DRCE_REDUCER_TTL

int dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.DRCE_REDUCER_TTL = 3000000
static

Definition at line 53 of file BatchTasksManagerRealTime.py.

◆ drceCommandConvertor

dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.drceCommandConvertor

Definition at line 105 of file BatchTasksManagerRealTime.py.

◆ drceHost

dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.drceHost

Definition at line 100 of file BatchTasksManagerRealTime.py.

◆ drceIdGenerator

dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.drceIdGenerator

Definition at line 104 of file BatchTasksManagerRealTime.py.

◆ drcePort

dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.drcePort

Definition at line 101 of file BatchTasksManagerRealTime.py.

◆ REAL_TIME_CRAWL_THREAD_NAME_PREFIX

string dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.REAL_TIME_CRAWL_THREAD_NAME_PREFIX = 'RtCrawl_'
static

Definition at line 75 of file BatchTasksManagerRealTime.py.

◆ REQUEST_ERROR_OBJECT_TYPE

int dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.REQUEST_ERROR_OBJECT_TYPE = 1
static

Definition at line 54 of file BatchTasksManagerRealTime.py.

◆ REQUEST_ERROR_THREADS_NUMBER_EXCEEDED

int dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.REQUEST_ERROR_THREADS_NUMBER_EXCEEDED = 3
static

Definition at line 56 of file BatchTasksManagerRealTime.py.

◆ REQUEST_ERROR_URLS_COUNT

int dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.REQUEST_ERROR_URLS_COUNT = 2
static

Definition at line 55 of file BatchTasksManagerRealTime.py.

◆ serverName

dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.serverName

Definition at line 92 of file BatchTasksManagerRealTime.py.

◆ totalTime

dc.BatchTasksManagerRealTime.BatchTasksManagerRealTime.totalTime

Definition at line 145 of file BatchTasksManagerRealTime.py.


The documentation for this class was generated from the following file: