HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
dc.BatchTasksManagerProcess.BatchTasksManagerProcess Class Reference

Public Member Functions

def __init__ (self, configParser, connectionBuilderLight=None)
 
def on_poll_timeout (self)
 
def setProcessBatch (self)
 
def sendBatchTaskToDTM (self, batch)
 
def getDTMTaskState (self, taskId)
 
def processDTMTasksQueue (self)
 
def isDTMTaskDead (self, state)
 
def processFinishedBatch (self, taskBatch)
 
def dtmdRequestExecute (self, requestEvent, timeout, maxTries=100)
 
def sendURLUpdate (self, batchItemsList, batchId, batchState)
 
def sendURLDelete (self, batchItemsList, batchId)
 
def onURLUpdateResponse (self, event)
 
def onURLDeleteResponse (self, event)
 
- Public Member Functions inherited from app.BaseServerManager.BaseServerManager
def __init__ (self, poller_manager=None, admin_connection=None, conectionLightBuilder=None, exceptionForward=False, dumpStatVars=True)
 Constructor.
 
def addConnection (self, name, connection)
 
def setEventHandler (self, eventType, handler)
 Set event handler; replaces the current handler for eventType.
 
def send (self, connect_name, event)
 send event
 
def reply (self, event, reply_event)
 Wrapper for sending an event in reply to an event.
 
def poll (self)
 Poll the connections; messages are received as multipart, and the second part is a pickled Python object.
 
def process (self, event)
 Process an event: call the handler set by the user for its type, or on_unhandled_event if none is set.
 
def run (self)
 
def is_connection_registered (self, name)
 Check whether a connection was registered in this BaseServerManager instance.
 
def on_poll_timeout (self)
 Called every time a ConnectionTimeout exception arrives.
 
def on_unhandled_event (self, event)
 Called every time an event arrives for whose event type no handler is set.
 
def build_poller_list (self)
 
def clear_poller (self)
 
def onAdminState (self, event)
 onAdminState event handler; processes the admin SHUTDOWN command.
 
def onAdminFetchStatData (self, event)
 Event handler; processes an admin command.
 
def onAdminSuspend (self, event)
 Event handler; processes an admin command.
 
def getStatDataFields (self, fields)
 getStatDataFields returns stat data from storage
 
def getSystemStat (self)
 getSystemStat returns stat data for system indicators: RAMV, RAMR and CPU
 
def getConfigVarsFields (self, fields)
 getConfigVarsFields returns config vars from storage
 
def onAdminGetConfigVars (self, event)
 onAdminGetConfigVars event handler; processes the getConfigVars admin command, then fills and returns the config vars array from internal storage.
 
def onAdminSetConfigVars (self, event)
 onAdminSetConfigVars event handler; processes the setConfigVars admin command.
 
def setConfigVars (self, setConfigVars)
 Sets config vars in storage.
 
def sendAdminReadyEvent (self)
 send ready event to notify adminInterfaceService
 
def createLogMsg (self, event)
 Form a string message from the event object.
 
def initStatFields (self, connect_name)
 Add a record to statFields.
 
def updateStatField (self, field_name, value, operation=STAT_FIELDS_OPERATION_ADD)
 Update the value of a stat field; the default operation is add (sum).
 
def processSpecialConfigVars (self, name, value)
 send ready event to notify adminInterfaceService
 
def getLogLevel (self)
 Get log level from first of existing loggers.
 
def setLogLevel (self, level)
 Set log level for all loggers.
 
def saveStatVarsDump (self)
 Save stat vars in json file.
 
def loadStatVarsDump (self)
 Load stat vars from json file.
 
def getStatVarsDumpFileName (self)
 Get stat vars file name.
 
def createDBIDict (self, configParser)
 

Public Attributes

 serverName
 
 clientSitesManagerName
 
 dtmdConnection
 
 dtmTasksQueue
 
 processProcessingLastTs
 
 processBatchQueuelLastTs
 
- Public Attributes inherited from app.BaseServerManager.BaseServerManager
 dumpStatVars
 
 poller_manager
 
 eventBuilder
 
 exit_flag
 
 pollTimeout
 
 connections
 
 event_handlers
 
 statFields
 stat fields container
 
 configVars
 
 exceptionForward
 

Static Public Attributes

int DTM_TASK_CHECK_STATE_METHOD_STATUS = 0
 
int DTM_TASK_CHECK_STATE_METHOD_STATE = 1
 
string CONFIG_SERVER = "server"
 
string CONFIG_DTMD_HOST = "DTMDHost"
 
string CONFIG_DTMD_PORT = "DTMDPort"
 
string CONFIG_DTMD_TIMEOUT = "DTMDTimeout"
 
string CONFIG_POLLING_TIMEOUT = "PollingTimeout"
 
string CONFIG_SITES_MANAGER_CLIENT = "clientSitesManager"
 
string CONFIG_DRCE_PROCESSOR_APP_NAME = "DRCEProcessorAppName"
 
string CONFIG_DRCE_DB_APP_NAME = "DRCEDBAppName"
 
string CONFIG_PROCESS_PERIOD = "ProcessingPeriod"
 
string CONFIG_PROCESS_MODE = "ProcessingMode"
 
string CONFIG_BATCH_DEFAULT_MAX_TIME = "BatchDefaultMaxExecutionTime"
 
string CONFIG_BATCH_MAX_URLS = "BatchDefaultMaxURLs"
 
string CONFIG_BATCH_ORDER_BY_URLS = "BatchDefaultOrderByURLs"
 
string CONFIG_BATCH_MAX_TASKS = "BatchDefaultMaxTasks"
 
string CONFIG_BATCH_QUEUE_PERIOD = "BatchQueueProcessingPeriod"
 
string CONFIG_BATCH_QUEUE_TASK_TTL = "BatchQueueTaskTTL"
 
string CONFIG_BATCH_QUEUE_TASK_CHECK_METHOD = "BatchQueueTaskCheckStateMethod"
 
string CONFIG_BATCH_DEFAULT_STARTER = "BatchTask_STARTER"
 
string CONFIG_BATCH_WHERE_URLS = "BatchDefaultWhereURLs"
 
string CONFIG_BATCH_WHERE_SITES = "BatchDefaultWhereSites"
 
string CONFIG_BATCH_MAX_TIME = "BatchMaxExecutionTime"
 
string CONFIG_BATCH_REMOVE_UNPROCESSED_ITEMS = "RemoveUnprocessedBatchItems"
 
string CONFIG_BATCH_DEFAULT_STRATEGY_IO_WAIT_MAX = "BatchTask_IO_WAIT_MAX"
 
string CONFIG_BATCH_DEFAULT_STRATEGY_CPU_LOAD_MAX = "BatchTask_CPU_LOAD_MAX"
 
string CONFIG_BATCH_DEFAULT_STRATEGY_RAM_FREE_MIN = "BatchTask_RAM_FREE_MIN"
 
string CONFIG_BATCH_DEFAULT_STRATEGY_STRATEGY_RDELAY = "BatchTask_RDELAY"
 
string CONFIG_BATCH_DEFAULT_STRATEGY_RETRY = "BatchTask_RETRY"
 
string CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_TTL = "BatchTask_autocleanup_TTL"
 
string CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_TYPE = "BatchTask_autocleanup_DeleteType"
 
string CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_RETRIES = "BatchTask_autocleanup_DeleteRetries"
 
string CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_STATE = "BatchTask_autocleanup_State"
 
string CONFIG_TASK_DTM_NAME_PROCESS = "BatchTaskDTMNameProcess"
 
string CONFIG_TASK_DTM_TYPE_PROCESS = "BatchTaskDTMTypeProcess"
 
- Static Public Attributes inherited from app.BaseServerManager.BaseServerManager
string ADMIN_CONNECT_ENDPOINT = "Admin"
 
string ADMIN_CONNECT_CLIENT = "Admin"
 
int POLL_TIMEOUT_DEFAULT = 3000
 
int STAT_FIELDS_OPERATION_ADD = 0
 
int STAT_FIELDS_OPERATION_SUB = 1
 
int STAT_FIELDS_OPERATION_SET = 2
 
int STAT_FIELDS_OPERATION_INIT = 3
 
string POLL_TIMEOUT_CONFIG_VAR_NAME = "POLL_TIMEOUT"
 
string LOG_LEVEL_CONFIG_VAR_NAME = "LOG_LEVEL"
 
string STAT_DUMPS_DEFAULT_DIR = "/tmp/"
 
string STAT_DUMPS_DEFAULT_NAME = "%APP_NAME%_%CLASS_NAME%_stat_vars.dump"
 
set LOGGERS_NAMES = {APP_CONSTS.LOGGER_NAME, "dc", "dtm", "root", ""}
 

Detailed Description

Definition at line 41 of file BatchTasksManagerProcess.py.

Constructor & Destructor Documentation

◆ __init__()

def dc.BatchTasksManagerProcess.BatchTasksManagerProcess.__init__ (   self,
  configParser,
  connectionBuilderLight = None 
)

Definition at line 95 of file BatchTasksManagerProcess.py.

def __init__(self, configParser, connectionBuilderLight=None):
    super(BatchTasksManagerProcess, self).__init__()

    # Instantiate the connection builder light if not set
    if connectionBuilderLight is None:
        connectionBuilderLight = ConnectionBuilderLight()

    # Batches counter init in stat vars
    self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_TOTAL_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
    # Batches in queue counter init in stat vars
    self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_QUEUE_NAME, 0, self.STAT_FIELDS_OPERATION_SET)
    # Batches that fault processing counter init in stat vars
    self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
    # Batches that not empty counter init in stat vars
    self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_FILLED_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
    # Batches urls total counter init in stat vars
    self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_URLS_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
    # Fault batches urls total counter init in stat vars
    self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_URLS_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
    # Processing delete task requests fault counter name for stat variables
    self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_DELETE_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
    # Processing check task requests fault counter name for stat variables
    self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_CHECK_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
    # Processing batches fault TTL counter name for stat variables
    self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_FAULT_TTL_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
    # Processing batches cancelled counter name for stat variables
    self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_CANCELLED_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)

    # Get configuration settings
    className = self.__class__.__name__
    self.serverName = configParser.get(className, self.CONFIG_SERVER)
    self.clientSitesManagerName = configParser.get(className, self.CONFIG_SITES_MANAGER_CLIENT)
    # Configuration settings for DTMD server interaction
    self.configVars[self.CONFIG_DTMD_HOST] = configParser.get(className, self.CONFIG_DTMD_HOST)
    self.configVars[self.CONFIG_DTMD_PORT] = configParser.get(className, self.CONFIG_DTMD_PORT)
    self.configVars[self.CONFIG_DTMD_TIMEOUT] = configParser.getint(className, self.CONFIG_DTMD_TIMEOUT)

    # Max URLs per batch
    self.configVars[self.CONFIG_BATCH_MAX_URLS] = configParser.getint(className, self.CONFIG_BATCH_MAX_URLS)
    # Set connections poll timeout, defines period of HCE cluster monitoring cycle, msec
    self.configVars[self.POLL_TIMEOUT_CONFIG_VAR_NAME] = configParser.getint(className, self.CONFIG_POLLING_TIMEOUT)
    # Set crawler task app name
    self.configVars[self.CONFIG_DRCE_PROCESSOR_APP_NAME] = \
        configParser.get(className, self.CONFIG_DRCE_PROCESSOR_APP_NAME)
    self.configVars[self.CONFIG_BATCH_DEFAULT_MAX_TIME] = \
        configParser.getint(className, self.CONFIG_BATCH_DEFAULT_MAX_TIME)

    # TODO: experimental
    # Max execution time for batch
    self.configVars[self.CONFIG_BATCH_MAX_TIME] = configParser.getint(className, self.CONFIG_BATCH_MAX_TIME)
    # Remove unprocessed items for batch
    self.configVars[self.CONFIG_BATCH_REMOVE_UNPROCESSED_ITEMS] = \
        configParser.getint(className, self.CONFIG_BATCH_REMOVE_UNPROCESSED_ITEMS)

    # Create connections and raise bind or connect actions for correspondent connection type
    serverConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, self.serverName)
    sitesManagerConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT, self.clientSitesManagerName)

    # Init the DTMD connection
    self.dtmdConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT,
                                                       self.configVars[self.CONFIG_DTMD_HOST] + ":" + \
                                                       self.configVars[self.CONFIG_DTMD_PORT],
                                                       TRANSPORT_CONSTS.TCP_TYPE)

    # Add connections to the polling set
    self.addConnection(self.serverName, serverConnection)
    self.addConnection(self.clientSitesManagerName, sitesManagerConnection)

    # Set event handler for URL_UPDATE_RESPONSE event
    self.setEventHandler(DC_CONSTS.EVENT_TYPES.URL_UPDATE_RESPONSE, self.onURLUpdateResponse)
    # Set event handler for URL_DELETE_RESPONSE event
    self.setEventHandler(DC_CONSTS.EVENT_TYPES.URL_DELETE_RESPONSE, self.onURLDeleteResponse)

    # Initialize the DTM tasks queue, the key is taskId, the value is the Batch object
    self.dtmTasksQueue = {}

    # Processing init
    self.configVars[self.CONFIG_DRCE_DB_APP_NAME] = configParser.get(className, self.CONFIG_DRCE_DB_APP_NAME)
    self.configVars[self.CONFIG_PROCESS_PERIOD] = configParser.getint(className, self.CONFIG_PROCESS_PERIOD)
    self.processProcessingLastTs = time.time()

    # The Batch default order by criterion to fetch URLs
    self.configVars[self.CONFIG_BATCH_ORDER_BY_URLS] = configParser.get(className, self.CONFIG_BATCH_ORDER_BY_URLS)
    # The Batch max tasks in batch queue, if limit reached new batch tasks will not be started; zero means unlimited
    self.configVars[self.CONFIG_BATCH_MAX_TASKS] = configParser.getint(className, self.CONFIG_BATCH_MAX_TASKS)
    # The Batch queue processing init
    self.configVars[self.CONFIG_BATCH_QUEUE_PERIOD] = configParser.getint(className, self.CONFIG_BATCH_QUEUE_PERIOD)
    self.processBatchQueuelLastTs = time.time()
    # The Batch queue task TTL, sec
    self.configVars[self.CONFIG_BATCH_QUEUE_TASK_TTL] = configParser.getint(className, self.CONFIG_BATCH_QUEUE_TASK_TTL)
    # The Batch queue tasks state check method, see ini file comments
    self.configVars[self.CONFIG_BATCH_QUEUE_TASK_CHECK_METHOD] = \
        configParser.getint(className, self.CONFIG_BATCH_QUEUE_TASK_CHECK_METHOD)
    # The Batch DRCE task starter name
    self.configVars[self.CONFIG_BATCH_DEFAULT_STARTER] = configParser.get(className, self.CONFIG_BATCH_DEFAULT_STARTER)
    # The Batch task's strategy configuration parameters for DTM service load
    self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_IO_WAIT_MAX] = \
        configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_IO_WAIT_MAX)
    self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_CPU_LOAD_MAX] = \
        configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_CPU_LOAD_MAX)
    self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_RAM_FREE_MIN] = \
        configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_RAM_FREE_MIN)
    self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_STRATEGY_RDELAY] = \
        configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_STRATEGY_RDELAY)
    self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_RETRY] = \
        configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_RETRY)
    # The Batch DRCE tasks auto cleanup fields
    self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_TTL] = \
        configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_TTL)
    self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_TYPE] = \
        configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_TYPE)
    self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_RETRIES] = \
        configParser.get(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_RETRIES)
    self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_STATE] = \
        configParser.get(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_STATE)
    # The Batch default where criterion to fetch URLs
    self.configVars[self.CONFIG_BATCH_WHERE_URLS] = configParser.get(className, self.CONFIG_BATCH_WHERE_URLS)
    # The Batch default where criterion to fetch Sites
    self.configVars[self.CONFIG_BATCH_WHERE_SITES] = configParser.get(className, self.CONFIG_BATCH_WHERE_SITES)

    # Init the config processing mode and runtime values
    self.configVars[self.CONFIG_PROCESS_MODE] = configParser.getint(className, self.CONFIG_PROCESS_MODE)

    # Processing task DTM name
    self.configVars[self.CONFIG_TASK_DTM_NAME_PROCESS] = configParser.get(className, self.CONFIG_TASK_DTM_NAME_PROCESS)
    # Processing task DTM type
    self.configVars[self.CONFIG_TASK_DTM_TYPE_PROCESS] = configParser.getint(className,
                                                                             self.CONFIG_TASK_DTM_TYPE_PROCESS)
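
The constructor reads its settings from a config section named after the class (self.__class__.__name__), using get for strings and getint for integers. A minimal sketch of such a section follows. It assumes Python 3's standard configparser (the listing above targets Python 2), shows only a few of the keys, and every value is an invented placeholder rather than a project default. The helper load_settings is hypothetical.

```python
import configparser

# Hypothetical ini fragment: key names come from the CONFIG_* constants above,
# the values are illustrative placeholders only.
SAMPLE_INI = """
[BatchTasksManagerProcess]
server = BatchTasksManagerProcess
clientSitesManager = SitesManager
DTMDHost = 127.0.0.1
DTMDPort = 5501
DTMDTimeout = 5000
PollingTimeout = 3000
ProcessingPeriod = 60
ProcessingMode = 1
BatchDefaultMaxTasks = 4
BatchQueueProcessingPeriod = 30
BatchQueueTaskTTL = 3600
BatchQueueTaskCheckStateMethod = 0
"""


def load_settings(ini_text, section="BatchTasksManagerProcess"):
    """Read a few settings the same way the constructor does (get / getint)."""
    parser = configparser.ConfigParser()
    # Keep option names case-sensitive; configparser lower-cases them by default.
    parser.optionxform = str
    parser.read_string(ini_text)
    return {
        "DTMDHost": parser.get(section, "DTMDHost"),
        # The port stays a string, because the constructor concatenates it to the host.
        "DTMDPort": parser.get(section, "DTMDPort"),
        "DTMDTimeout": parser.getint(section, "DTMDTimeout"),
        "BatchDefaultMaxTasks": parser.getint(section, "BatchDefaultMaxTasks"),
        "BatchQueueTaskTTL": parser.getint(section, "BatchQueueTaskTTL"),
    }


settings = load_settings(SAMPLE_INI)
# Same "host:port" address form that is passed to connectionBuilderLight.build().
dtmd_address = settings["DTMDHost"] + ":" + settings["DTMDPort"]
```

A missing key raises configparser.NoOptionError in this sketch, so every option the constructor reads would have to be present in the section.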

Member Function Documentation

◆ dtmdRequestExecute()

def dc.BatchTasksManagerProcess.BatchTasksManagerProcess.dtmdRequestExecute (   self,
  requestEvent,
  timeout,
  maxTries = 100 
)

Definition at line 545 of file BatchTasksManagerProcess.py.

def dtmdRequestExecute(self, requestEvent, timeout, maxTries=100):
    ret = None
    if maxTries < 0:
        maxTries = 0

    try:
        # Send DTMD request
        self.dtmdConnection.send(requestEvent)

        for i in range(maxTries + 1):
            # Poll DTMD connection
            if self.dtmdConnection.poll(int(timeout)) == 0:
                logger.error("DTMD request timeout reached " + str(timeout) + "!")
                break
            else:
                # Recv DTMD response
                retEvent = self.dtmdConnection.recv()
                if retEvent is not None:
                    # Get response object
                    if type(retEvent.eventObj) == type(dtm.EventObjects.EEResponseData(0)) or\
                       type(retEvent.eventObj) == type(dtm.EventObjects.GeneralResponse()) or\
                       isinstance(retEvent.eventObj, list):
                        if retEvent.uid == requestEvent.uid:
                            ret = retEvent.eventObj
                            break
                        else:
                            logger.error("DTMD returned wrong object uid: " + str(retEvent.uid) + " but " + \
                                         str(requestEvent.uid) + " expected, iteration " + str(i) + "!")
                    else:
                        logger.error("DTMD returned wrong object type: " + str(type(retEvent.eventObj)) + "!")
                else:
                    logger.error("DTMD returned None event!")
    except Exception as e:
        logger.error("DTMD request execution exception: " + str(e) + "!")

    logger.debug("The DTMD request finished!")

    return ret
GeneralResponse event object, represents general state response for multipurpose usage.
EEResponseData event object, store task results data, returned from EE.
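
The core of dtmdRequestExecute is a send-once, poll-until-matching-uid loop. The sketch below isolates that pattern under simplifying assumptions: FakeEvent and FakeConnection are hypothetical in-memory stand-ins for the transport classes, request_execute is a hypothetical name, and the response type check from the listing is left out.

```python
from collections import deque


class FakeEvent(object):
    """Hypothetical stand-in for a transport event: a uid plus a payload."""

    def __init__(self, uid, eventObj):
        self.uid = uid
        self.eventObj = eventObj


class FakeConnection(object):
    """Hypothetical in-memory connection that replays queued replies."""

    def __init__(self, replies):
        self.replies = deque(replies)
        self.sent = []

    def send(self, event):
        self.sent.append(event)

    def poll(self, timeout):
        # Non-zero means a reply is ready; zero means the timeout expired.
        return 1 if self.replies else 0

    def recv(self):
        return self.replies.popleft()


def request_execute(connection, request, timeout, max_tries=100):
    """Send once, then read replies until one carries the request's uid."""
    connection.send(request)
    for _ in range(max(max_tries, 0) + 1):
        if connection.poll(int(timeout)) == 0:
            break  # timed out: give up, the result stays None
        reply = connection.recv()
        if reply is not None and reply.uid == request.uid:
            return reply.eventObj  # matching reply found
        # Otherwise the reply is stale or foreign: skip it and poll again.
    return None


conn = FakeConnection([FakeEvent(7, "stale"), FakeEvent(42, "answer")])
result = request_execute(conn, FakeEvent(42, None), timeout=100)
timed_out = request_execute(FakeConnection([]), FakeEvent(1, None), timeout=100)
```

Matching on uid is what lets the caller discard a late reply to an earlier request instead of mistaking it for the current answer.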

◆ getDTMTaskState()

def dc.BatchTasksManagerProcess.BatchTasksManagerProcess.getDTMTaskState (   self,
  taskId 
)

Definition at line 388 of file BatchTasksManagerProcess.py.

def getDTMTaskState(self, taskId):
    taskState = None

    if self.configVars[self.CONFIG_BATCH_QUEUE_TASK_CHECK_METHOD] == self.DTM_TASK_CHECK_STATE_METHOD_STATUS:
        # Check the task status on DRCE EE hce-node
        logger.debug("Check state of taskId=" + str(taskId))
        checkTaskStateObj = dtm.EventObjects.CheckTaskState(taskId)
        checkStateEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE, checkTaskStateObj)
        eeResponseData = self.dtmdRequestExecute(checkStateEvent, self.configVars[self.CONFIG_DTMD_TIMEOUT])
        logger.debug("DTM CheckTaskState request finished, taskId=" + str(taskId))
        if eeResponseData is not None and type(eeResponseData) == type(dtm.EventObjects.EEResponseData(0)):
            taskState = eeResponseData.state
    else:
        # Get task status on DTM service
        logger.debug("Get status of taskId=" + str(taskId))
        getTasksStatusObj = dtm.EventObjects.GetTasksStatus([taskId])
        getTasksStatusEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.GET_TASK_STATUS, getTasksStatusObj)
        listTaskManagerFields = self.dtmdRequestExecute(getTasksStatusEvent, self.configVars[self.CONFIG_DTMD_TIMEOUT])
        logger.debug("DTM getTasksStatus request finished, taskId=" + str(taskId))
        if listTaskManagerFields is not None and isinstance(listTaskManagerFields, list):
            if len(listTaskManagerFields) > 0:
                taskState = listTaskManagerFields[0].fields["state"]
            else:
                # Set TASK_STATE_FINISHED state to push task to delete from queue
                taskState = dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED
        else:
            logger.error("DTM getTasksStatus taskId=" + str(taskId) + " returned wrong data:\n" + \
                         Utils.varDump(listTaskManagerFields))

    return taskState
CheckTaskState event object, for check task status inside EE.
GetTasksStatus event object, for check task status operation.
EEResponseData event object, store task results data, returned from EE.

◆ isDTMTaskDead()

def dc.BatchTasksManagerProcess.BatchTasksManagerProcess.isDTMTaskDead (   self,
  state 
)

Definition at line 509 of file BatchTasksManagerProcess.py.

def isDTMTaskDead(self, state):
    ret = False
    if state == dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED or\
       state == dtm.EventObjects.EEResponseData.TASK_STATE_CRASHED or\
       state == dtm.EventObjects.EEResponseData.TASK_STATE_TERMINATED or\
       state == dtm.EventObjects.EEResponseData.TASK_STATE_UNDEFINED or\
       state == dtm.EventObjects.EEResponseData.TASK_STATE_SET_ERROR or\
       state == dtm.EventObjects.EEResponseData.TASK_STATE_TERMINATED_BY_DRCE_TTL or\
       state == dtm.EventObjects.EEResponseData.TASK_STATE_SCHEDULE_TRIES_EXCEEDED:
        ret = True

    return ret
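
The chain of comparisons in isDTMTaskDead is a membership test against a fixed group of terminal states. A possible equivalent is sketched below with a frozenset. The TaskState class is a hypothetical stand-in for the TASK_STATE_* constants of dtm.EventObjects.EEResponseData; its numeric values and the IN_PROGRESS member are invented for the demonstration.

```python
class TaskState(object):
    """Hypothetical stand-in for the TASK_STATE_* constants; values are invented."""
    FINISHED = 0
    IN_PROGRESS = 1  # hypothetical non-terminal state, used only for the demo
    CRASHED = 2
    TERMINATED = 3
    UNDEFINED = 4
    SET_ERROR = 5
    TERMINATED_BY_DRCE_TTL = 6
    SCHEDULE_TRIES_EXCEEDED = 7


# The seven states that the listing above treats as "dead".
DEAD_STATES = frozenset([
    TaskState.FINISHED,
    TaskState.CRASHED,
    TaskState.TERMINATED,
    TaskState.UNDEFINED,
    TaskState.SET_ERROR,
    TaskState.TERMINATED_BY_DRCE_TTL,
    TaskState.SCHEDULE_TRIES_EXCEEDED,
])


def is_task_dead(state):
    """Return True when the state is one of the terminal states."""
    return state in DEAD_STATES


crashed_is_dead = is_task_dead(TaskState.CRASHED)
running_is_dead = is_task_dead(TaskState.IN_PROGRESS)
```

Keeping the terminal states in one named collection would also let other methods reuse the same definition instead of repeating the comparison chain.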

◆ on_poll_timeout()

def dc.BatchTasksManagerProcess.BatchTasksManagerProcess.on_poll_timeout (   self)

Definition at line 229 of file BatchTasksManagerProcess.py.

def on_poll_timeout(self):
    logger.debug("Periodic iteration started.")
    try:
        # Process the Processing batch
        if self.configVars[self.CONFIG_PROCESS_PERIOD] > 0 and\
           time.time() - self.processProcessingLastTs > self.configVars[self.CONFIG_PROCESS_PERIOD]:
            self.processProcessingLastTs = time.time()
            if self.configVars[self.CONFIG_PROCESS_MODE] == 1:
                logger.debug("Processing batch cycle iteration started")
                if self.configVars[self.CONFIG_BATCH_MAX_TASKS] > len(self.dtmTasksQueue):
                    if self.setProcessBatch():
                        self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_TOTAL_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
                else:
                    self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_CANCELLED_NAME, 1,
                                         self.STAT_FIELDS_OPERATION_ADD)
                    logger.debug("Max processing batch tasks %s in queue reached, new batch is not created!",
                                 str(len(self.dtmTasksQueue)))
            else:
                logger.debug("Processing batch disabled!")

        # Process the DRCE Batch tasks queue
        if self.configVars[self.CONFIG_BATCH_QUEUE_PERIOD] > 0 and\
           time.time() - self.processBatchQueuelLastTs > self.configVars[self.CONFIG_BATCH_QUEUE_PERIOD]:
            self.processBatchQueuelLastTs = time.time()
            logger.debug("Process DTM tasks queue!")
            # Process the DTM tasks queue
            self.processDTMTasksQueue()

    except IOError as e:
        del e
    except Exception as err:
        ExceptionLog.handler(logger, err, "Exception:")

    logger.debug("Periodic iteration finished.")
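
Both branches of on_poll_timeout use the same gating idiom: the work runs only when its period is positive and more than that many seconds have passed since the last run. A minimal sketch of that idiom follows. PeriodGate is a hypothetical helper, not part of the project, and the injected clock exists only so that the example runs without sleeping.

```python
import time


class PeriodGate(object):
    """Lets an action run at most once per `period` seconds; period <= 0 disables it."""

    def __init__(self, period, clock=time.time):
        self.period = period
        self.clock = clock
        self.last_ts = clock()

    def ready(self):
        """Return True and restart the period if enough time has elapsed."""
        if self.period > 0 and self.clock() - self.last_ts > self.period:
            self.last_ts = self.clock()
            return True
        return False


# Demo with a fake clock: each step is the number of seconds after start
# at which a poll timeout callback happens.
fake_now = [1000.0]
gate = PeriodGate(period=30, clock=lambda: fake_now[0])
fired = []
for step in (10, 25, 31, 40, 62):
    fake_now[0] = 1000.0 + step
    if gate.ready():
        fired.append(step)

disabled_gate = PeriodGate(period=0, clock=lambda: fake_now[0])
```

Because the gate is only evaluated when the poll timeout fires, the effective granularity of both periods is bounded by the polling timeout.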

◆ onURLDeleteResponse()

def dc.BatchTasksManagerProcess.BatchTasksManagerProcess.onURLDeleteResponse (   self,
  event 
)

Definition at line 673 of file BatchTasksManagerProcess.py.

def onURLDeleteResponse(self, event):
    try:
        logger.debug("Reply received on URL delete.")
        clientResponse = event.eventObj
        if clientResponse.errorCode == EventObjects.ClientResponse.STATUS_OK:
            if len(clientResponse.itemsList) > 0:
                for clientResponseItem in clientResponse.itemsList:
                    if clientResponseItem.errorCode != EventObjects.ClientResponseItem.STATUS_OK:
                        logger.error("URLDelete response error: " + str(clientResponseItem.errorCode) + " : " + \
                                     clientResponseItem.errorMessage + ", host:" + clientResponseItem.host + ", port:" + \
                                     clientResponseItem.port + ", node:" + clientResponseItem.node + "!")
            else:
                logger.error("URLDelete response empty list!")
        else:
            logger.error("URLDelete response error:" + str(clientResponse.errorCode) + " : " + clientResponse.errorMessage)
    except Exception as err:
        ExceptionLog.handler(logger, err, "Exception:")

◆ onURLUpdateResponse()

def dc.BatchTasksManagerProcess.BatchTasksManagerProcess.onURLUpdateResponse (   self,
  event 
)

Definition at line 651 of file BatchTasksManagerProcess.py.

def onURLUpdateResponse(self, event):
    try:
        logger.debug("Reply received on URL update.")
        clientResponse = event.eventObj
        if clientResponse.errorCode == EventObjects.ClientResponse.STATUS_OK:
            if len(clientResponse.itemsList) > 0:
                for clientResponseItem in clientResponse.itemsList:
                    if clientResponseItem.errorCode != EventObjects.ClientResponseItem.STATUS_OK:
                        logger.error("URLUpdate response error: " + str(clientResponseItem.errorCode) + " : " + \
                                     clientResponseItem.errorMessage + ", host:" + clientResponseItem.host + ", port:" + \
                                     clientResponseItem.port + ", node:" + clientResponseItem.node + "!")
            else:
                logger.error("URLUpdate response empty list!")
        else:
            logger.error("URLUpdate response error:" + str(clientResponse.errorCode) + " : " + clientResponse.errorMessage)
    except Exception as err:
        ExceptionLog.handler(logger, err, "Exception:")
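
onURLUpdateResponse and onURLDeleteResponse differ only in the label used in their log messages. The sketch below shows one way the shared checks could be factored into a single helper. ResponseItem, Response, STATUS_OK and collect_response_errors are hypothetical stand-ins, not the project's EventObjects classes, and the helper returns the messages in addition to logging them so that the result is easy to inspect.

```python
import logging
from collections import namedtuple

logger = logging.getLogger("dc.sketch")   # hypothetical logger name
logger.addHandler(logging.NullHandler())  # keep the demo quiet

STATUS_OK = 0  # hypothetical value of the STATUS_OK constants

# Hypothetical stand-ins carrying only the fields the handlers read.
ResponseItem = namedtuple("ResponseItem", "errorCode errorMessage host port node")
Response = namedtuple("Response", "errorCode errorMessage itemsList")


def collect_response_errors(label, response):
    """Apply the checks shared by both handlers and return the error messages."""
    errors = []
    if response.errorCode != STATUS_OK:
        errors.append("%s response error: %s : %s"
                      % (label, response.errorCode, response.errorMessage))
    elif not response.itemsList:
        errors.append("%s response empty list!" % label)
    else:
        for item in response.itemsList:
            if item.errorCode != STATUS_OK:
                # %s formatting also copes with a numeric port value.
                errors.append("%s response error: %s : %s, host:%s, port:%s, node:%s!"
                              % (label, item.errorCode, item.errorMessage,
                                 item.host, item.port, item.node))
    for message in errors:
        logger.error(message)
    return errors


ok_item = ResponseItem(STATUS_OK, "", "h1", 5530, "n1")
bad_item = ResponseItem(3, "timeout", "h2", 5530, "n2")
update_errors = collect_response_errors("URLUpdate", Response(STATUS_OK, "", [ok_item, bad_item]))
delete_errors = collect_response_errors("URLDelete", Response(STATUS_OK, "", []))
```

With such a helper each handler would reduce to one call with its own label, wrapped in the same try/except as in the listings.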

◆ processDTMTasksQueue()

def dc.BatchTasksManagerProcess.BatchTasksManagerProcess.processDTMTasksQueue (   self)

Definition at line 424 of file BatchTasksManagerProcess.py.

def processDTMTasksQueue(self):
    tmpQueue = {}

    logger.debug("Process batch tasks in queue:" + str(len(self.dtmTasksQueue)))
    self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_QUEUE_NAME, len(self.dtmTasksQueue),
                         self.STAT_FIELDS_OPERATION_SET)

    # Process the DTM tasks queue
    for taskId, taskBatch in self.dtmTasksQueue.items():
        ttl = self.configVars[self.CONFIG_BATCH_QUEUE_TASK_TTL]
        batchState = self.getDTMTaskState(taskId)
        if batchState is not None:
            logger.debug("Process batch state " + str(batchState) + ", Id=" + str(taskId))
            if batchState == dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED or\
               batchState == dtm.EventObjects.EEResponseData.TASK_STATE_CRASHED or\
               batchState == dtm.EventObjects.EEResponseData.TASK_STATE_TERMINATED or\
               batchState == dtm.EventObjects.EEResponseData.TASK_STATE_UNDEFINED or\
               batchState == dtm.EventObjects.EEResponseData.TASK_STATE_SET_ERROR or\
               batchState == dtm.EventObjects.EEResponseData.TASK_STATE_SCHEDULE_TRIES_EXCEEDED:
                # Delete task in DTM and task's data in EE (DRCE)
                deleteTaskObj = dtm.EventObjects.DeleteTask(taskId)
                deleteTaskEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.DELETE_TASK, deleteTaskObj)
                generalResponse = self.dtmdRequestExecute(deleteTaskEvent, self.configVars[self.CONFIG_DTMD_TIMEOUT])
                logger.debug("DTM DeleteTask request finished, taskId=" + str(taskId))
                if generalResponse is not None:
                    if generalResponse.errorCode == dtm.EventObjects.GeneralResponse.ERROR_OK:
                        logger.debug("DTM task deleted, taskId=" + str(taskId))
                        # if batchState == dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED:
                        if self.isDTMTaskDead(batchState):
                            logger.debug("batch:\n" + varDump(taskBatch) + "\n finished, taskId=" + str(taskId))
                            self.processFinishedBatch(taskBatch)
                        else:
                            self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_FAULT_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
                            logger.debug("batch:" + varDump(taskBatch) + " not finished, state= " + str(batchState))
                            # TODO: Send update URLs of not finished batch on all nodes to get possibility to process them next time
                            # self.sendURLUpdate(taskBatch.items, taskBatch.id, False)
                    else:
                        # Save this batch to check it later
                        tmpQueue[taskId] = taskBatch
                        self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_DELETE_FAULT_NAME, 1,
                                             self.STAT_FIELDS_OPERATION_ADD)
                        logger.error("DTM delete task taskId=" + str(taskId) + " error: " + str(generalResponse.errorCode) + \
                                     " : " + generalResponse.errorMessage + ", statuses:" + varDump(generalResponse))
                else:
                    # Save this batch to check it later
                    tmpQueue[taskId] = taskBatch
                    self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_DELETE_FAULT_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
                    logger.error("DTM delete task error: wrong response or timeout, taskId=" + str(taskId) + "!")
            else:
                logger.debug("DTM task Id=" + str(taskId) + " state=" + str(batchState))
                if time.time() - taskBatch.QueuedTs > ttl:
                    # Terminate task and delete its data request
                    deleteTaskObj = dtm.EventObjects.DeleteTask(taskId)
                    deleteTaskObj.action = dtm.EventObjects.DeleteTask.ACTION_TERMINATE_TASK_AND_DELETE_DATA
                    deleteTaskEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.DELETE_TASK, deleteTaskObj)
                    generalResponse = self.dtmdRequestExecute(deleteTaskEvent, self.configVars[self.CONFIG_DTMD_TIMEOUT])
                    logger.error("DTM task Id=" + str(taskId) + " terminated and removed from queue by TTL:" + str(ttl))
                    # TODO: Send update URLs of not finished batch on all nodes to get possibility to crawl them next time
                    # self.sendURLUpdate(taskBatch.items, taskBatch.id, False)
                    self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_FAULT_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
                    self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_FAULT_TTL_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
                else:
                    # Save this batch to check it later
                    tmpQueue[taskId] = taskBatch
                    logger.debug("DTM task Id=" + str(taskId) + " still in queue")
        else:
            logger.error("DTM check task state error: wrong response or timeout, taskId=" + str(taskId) + "!")
            if time.time() - taskBatch.QueuedTs > ttl:
                logger.error("DTM task Id=" + str(taskId) + " removed from queue by TTL:" + str(ttl))
            else:
                # Save this batch to check it later
                tmpQueue[taskId] = taskBatch
                self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_CHECK_FAULT_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
                logger.error("DTM task Id=" + str(taskId) + " saved in queue.")

    self.dtmTasksQueue = tmpQueue
    self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_QUEUE_NAME, len(self.dtmTasksQueue),
                         self.STAT_FIELDS_OPERATION_SET)
    logger.debug("The DTM tasks queue processing finished, batch tasks in queue " + str(len(self.dtmTasksQueue)))
DeleteTask event object, to delete task from DTM application and from EE.
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
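
processDTMTasksQueue never deletes from the dictionary it iterates over. It copies the tasks that must be checked again into a temporary dictionary and swaps that in at the end. A simplified sketch of this rebuild-and-swap pattern follows. sweep_queue and its parameters are hypothetical, the queue maps task ids straight to queue timestamps instead of Batch objects, and the DTM delete request (with its own keep-on-failure branch) is left out.

```python
def sweep_queue(queue, get_state, is_dead, now, ttl):
    """Split a task queue into kept, finished and expired entries.

    queue     -- dict mapping task id to the timestamp at which it was queued
    get_state -- callable returning the task state, or None if the check failed
    is_dead   -- callable telling whether a state is terminal
    """
    kept, finished, expired = {}, [], []
    for task_id, queued_ts in queue.items():
        state = get_state(task_id)
        if state is not None and is_dead(state):
            finished.append(task_id)      # terminal state: leaves the queue
        elif now - queued_ts > ttl:
            expired.append(task_id)       # still alive or unknown, but too old
        else:
            kept[task_id] = queued_ts     # check again on the next pass
    return kept, finished, expired


# Demo data: all names and values are invented for illustration.
states = {"a": "finished", "b": "running", "c": "running", "d": None}
queue = {"a": 100, "b": 100, "c": 900, "d": 100}

kept, finished, expired = sweep_queue(
    queue,
    get_state=states.get,
    is_dead=lambda state: state == "finished",
    now=1000,
    ttl=500,
)
queue = kept  # swap the rebuilt queue in, as the method does with tmpQueue
```

Building a fresh dictionary avoids mutating the queue during iteration, and an entry left out of the new dictionary is dropped without any explicit delete.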

◆ processFinishedBatch()

def dc.BatchTasksManagerProcess.BatchTasksManagerProcess.processFinishedBatch (   self,
  taskBatch 
)

Definition at line 528 of file BatchTasksManagerProcess.py.

def processFinishedBatch(self, taskBatch):
    # if self.configVars[self.CONFIG_CRAWLED_URLS_STRATEGY] == 0:
    #   self.sendURLUpdate(taskBatch.items, taskBatch.id, True)
    #   logger.debug("Send update URLs from batch for all foreign hosts by the Batch_Id")
    # else:
    #   self.sendURLDelete(taskBatch.items, taskBatch.id)
    #   logger.debug("Send delete URLs from batch for all foreign hosts by the Batch_Id")
    pass
    # TODO: update or delete URLs from returned batch

◆ sendBatchTaskToDTM()

def dc.BatchTasksManagerProcess.BatchTasksManagerProcess.sendBatchTaskToDTM (   self,
  batch 
)

Definition at line 319 of file BatchTasksManagerProcess.py.

def sendBatchTaskToDTM(self, batch):
  taskId = 0
  # Prepare NewTask object
  appName = self.configVars[self.CONFIG_DRCE_DB_APP_NAME]
  newTaskObj = dtm.EventObjects.NewTask(appName)
  newTaskObj.name = self.configVars[self.CONFIG_TASK_DTM_NAME_PROCESS]
  newTaskObj.type = self.configVars[self.CONFIG_TASK_DTM_TYPE_PROCESS]
  newTaskObj.setSessionVar("tmode", dtm.EventObjects.Task.TASK_MODE_ASYNCH)
  newTaskObj.setSessionVar("shell", self.configVars[self.CONFIG_BATCH_DEFAULT_STARTER])
  newTaskObj.setSessionVar("time_max", int(self.configVars[self.CONFIG_BATCH_DEFAULT_MAX_TIME]) * 1000)
  newTaskObj.setSessionVar("task_type", int(newTaskObj.type))
  # Configure task's strategy
  if self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_IO_WAIT_MAX] > 0:
    newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_IO_WAIT_MAX,
                              self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_IO_WAIT_MAX])
  if self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_CPU_LOAD_MAX] > 0:
    newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_CPU_LOAD_MAX,
                              self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_CPU_LOAD_MAX])
  if self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_RAM_FREE_MIN] > 0:
    newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_RAM_FREE,
                              self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_RAM_FREE_MIN])
  if self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_STRATEGY_RDELAY] > 0:
    newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_RDELAY,
                              self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_STRATEGY_RDELAY])
  if self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_RETRY] > 0:
    newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_RETRY,
                              self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_RETRY])
  # Set auto cleanup fields
  autoCleanupFields = {}
  if self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_TTL] > -1:
    autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_TTL] = \
      int(self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_TTL]) * 1000
  if self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_TYPE] > -1:
    autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_DELETE_TYPE] = \
      self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_TYPE]
  if self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_RETRIES] > -1:
    autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_DELETE_RETRIES] = \
      self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_RETRIES]
  if self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_STATE] > -1:
    autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_SSTATE] = \
      self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_STATE]
  if len(autoCleanupFields) > 0:
    newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_autoCleanupFields, autoCleanupFields)
  batch.id = newTaskObj.id
  drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_FETCH, [batch])
  newTaskObj.input = pickle.dumps(drceSyncTasksCoverObj)
  newTaskEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.NEW_TASK, newTaskObj)
  generalResponse = self.dtmdRequestExecute(newTaskEvent, self.configVars[self.CONFIG_DTMD_TIMEOUT])
  if generalResponse is not None:
    if generalResponse.errorCode == dtm.EventObjects.GeneralResponse.ERROR_OK:
      # New crawling Batch task set successfully
      taskId = newTaskObj.id
    else:
      # Some error of task operation
      logger.error("DTM set batch task error: " + str(generalResponse.errorCode) + " : " + \
                   generalResponse.errorMessage + ", statuses:" + varDump(generalResponse))
  else:
    logger.error("DTM set batch task response error, possible timeout!")

  # TODO: return the Task Id any case error or not to check it state later
  taskId = newTaskObj.id

  return taskId

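The strategy section of the listing applies two different "is it set" tests: resource limits are copied only when greater than 0, while auto-cleanup fields are copied when greater than -1, so 0 is still a valid cleanup value. The sketch below isolates that rule under stated assumptions: `build_strategy`, the short dictionary keys and the output field names are all hypothetical, and the factor of 1000 on the TTL simply mirrors the listing (presumably a seconds-to-milliseconds conversion).

```python
def build_strategy(config):
    """Collect strategy variables using the same thresholds as the listing."""
    strategy = {}
    # Resource limits are copied only when the configured value is positive.
    for key in ("IO_WAIT_MAX", "CPU_LOAD_MAX", "RAM_FREE_MIN", "RDELAY", "RETRY"):
        if config.get(key, 0) > 0:
            strategy[key] = config[key]

    # Auto-cleanup fields treat -1 as "not set", so 0 is still a valid value.
    cleanup = {}
    if config.get("AUTOCLEANUP_TTL", -1) > -1:
        cleanup["ttl"] = int(config["AUTOCLEANUP_TTL"]) * 1000  # scaled by 1000, as in the listing
    for key, name in (("AUTOCLEANUP_DELETE_TYPE", "delete_type"),
                      ("AUTOCLEANUP_DELETE_RETRIES", "delete_retries"),
                      ("AUTOCLEANUP_STATE", "state")):
        if config.get(key, -1) > -1:
            cleanup[name] = config[key]
    if cleanup:
        strategy["autocleanup"] = cleanup
    return strategy


# Hypothetical configuration values.
result = build_strategy({"IO_WAIT_MAX": 0, "CPU_LOAD_MAX": 80, "RETRY": 3,
                         "AUTOCLEANUP_TTL": 0, "AUTOCLEANUP_STATE": -1})
print(result)
```

Here `IO_WAIT_MAX` is skipped because it is 0, whereas an `AUTOCLEANUP_TTL` of 0 is kept.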

◆ sendURLDelete()

def dc.BatchTasksManagerProcess.BatchTasksManagerProcess.sendURLDelete (self, batchItemsList, batchId)

Definition at line 626 of file BatchTasksManagerProcess.py.

def sendURLDelete(self, batchItemsList, batchId):
  urlsList = []

  # Prepare URLs list to delete
  for batchItem in batchItemsList:
    sqlExpression = SQLExpression("`ParentMd5`<>'' AND `URLMd5`='" + str(batchItem.urlId) + "' AND `Batch_Id`<>" + \
                                  str(batchId))
    urlDelete = EventObjects.URLDelete(batchItem.siteId, None, EventObjects.URLStatus.URL_TYPE_URL,
                                       {EventObjects.URLFetch.CRITERION_WHERE:sqlExpression,
                                        EventObjects.URLFetch.CRITERION_LIMIT:1},
                                       reason=EventObjects.URLDelete.REASON_SELECT_TO_PROCESS_TTL)
    logger.debug("URLDelete: " + varDump(urlDelete))
    urlsList.append(urlDelete)

  # Make URLDelete event
  urlDeleteEvent = self.eventBuilder.build(DC_CONSTS.EVENT_TYPES.URL_DELETE, urlsList)
  # Send request URLDelete to SitesManager
  self.send(self.clientSitesManagerName, urlDeleteEvent)
  logger.debug("The URLDelete request to SitesManager sent!")

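To make the delete criterion concrete, the sketch below reproduces only the string concatenation from the listing and prints the resulting WHERE clause. `url_delete_where`, the MD5 value and the batch id are hypothetical; the real code wraps the string in `SQLExpression` and passes it to `EventObjects.URLDelete`.

```python
def url_delete_where(url_md5, batch_id):
    """Build the WHERE clause text the listing passes to SQLExpression."""
    return ("`ParentMd5`<>'' AND `URLMd5`='" + str(url_md5) +
            "' AND `Batch_Id`<>" + str(batch_id))


# Hypothetical URL hash and batch id.
where = url_delete_where("0cc175b9c0f1b6a831c399e269772661", 42)
print(where)
```

The clause selects rows with a non-empty `ParentMd5` and the given URL hash that belong to a different batch.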

◆ sendURLUpdate()

def dc.BatchTasksManagerProcess.BatchTasksManagerProcess.sendURLUpdate (self, batchItemsList, batchId, batchState)

Definition at line 591 of file BatchTasksManagerProcess.py.

def sendURLUpdate(self, batchItemsList, batchId, batchState):
  urlsList = []

  # Prepare URLs list to update
  for batchItem in batchItemsList:
    # Set state value depends on update reason - crawled successfully or not
    if batchState is True:
      status = EventObjects.URL.STATUS_CRAWLED
      sqlExpression = SQLExpression("`URLMd5`='" + str(batchItem.urlId) + "' AND `Batch_Id` <>" + str(batchId) + \
                                    " AND `Status` NOT IN (" + str(EventObjects.URL.STATUS_CRAWLED) + \
                                    "," + str(EventObjects.URL.STATUS_SELECTED_PROCESSING) + ")")
    else:
      status = EventObjects.URL.STATUS_NEW
      sqlExpression = SQLExpression("`URLMd5`='" + str(batchItem.urlId) + "' AND `Status` IN (" +
                                    str(EventObjects.URL.STATUS_SELECTED_CRAWLING) + "," + \
                                    str(EventObjects.URL.STATUS_SELECTED_CRAWLING_INCREMENTAL) + ")")

    urlUpdate = EventObjects.URLUpdate(batchItem.siteId, batchItem.urlId, EventObjects.URLStatus.URL_TYPE_MD5,
                                       None, status)
    urlUpdate.criterions[EventObjects.URLFetch.CRITERION_WHERE] = sqlExpression
    logger.debug("URLUpdate: " + varDump(urlUpdate))
    urlsList.append(urlUpdate)

  # Make URLUpdate event
  urlUpdateEvent = self.eventBuilder.build(DC_CONSTS.EVENT_TYPES.URL_UPDATE, urlsList)
  # Send request URLUpdate to SitesManager
  self.send(self.clientSitesManagerName, urlUpdateEvent)
  logger.debug("The URLUpdate request to SitesManager sent!")

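The two branches of the listing pick both the new status and the WHERE clause from `batchState`. The following sketch shows that pairing in isolation. The numeric status codes are placeholders invented for illustration, since the real `EventObjects.URL` values do not appear on this page, and `url_update_plan` is a hypothetical helper.

```python
# Placeholder status codes; the real EventObjects.URL values are not shown here.
STATUS_NEW = 1
STATUS_SELECTED_CRAWLING = 2
STATUS_SELECTED_CRAWLING_INCREMENTAL = 3
STATUS_CRAWLED = 4
STATUS_SELECTED_PROCESSING = 5


def url_update_plan(url_md5, batch_id, batch_state):
    """Return (new status, WHERE clause) as chosen by the listing's two branches."""
    if batch_state is True:
        # Success: mark as crawled, skipping rows already crawled or being processed.
        status = STATUS_CRAWLED
        where = ("`URLMd5`='%s' AND `Batch_Id` <>%s AND `Status` NOT IN (%s,%s)"
                 % (url_md5, batch_id, STATUS_CRAWLED, STATUS_SELECTED_PROCESSING))
    else:
        # Failure: return rows that were selected for crawling back to "new".
        status = STATUS_NEW
        where = ("`URLMd5`='%s' AND `Status` IN (%s,%s)"
                 % (url_md5, STATUS_SELECTED_CRAWLING, STATUS_SELECTED_CRAWLING_INCREMENTAL))
    return status, where


ok_plan = url_update_plan("abc", 42, True)
fail_plan = url_update_plan("abc", 42, False)
print(ok_plan)
print(fail_plan)
```

Note that the failure branch does not use `batchId` at all, which matches the listing.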

◆ setProcessBatch()

def dc.BatchTasksManagerProcess.BatchTasksManagerProcess.setProcessBatch (self)

Definition at line 269 of file BatchTasksManagerProcess.py.

def setProcessBatch(self):
  ret = False

  try:
    # Create the URLUpdate object to set in progress of processing state for selected URLs
    urlUpdate = EventObjects.URLUpdate(0, "", EventObjects.URLStatus.URL_TYPE_MD5, None,
                                       EventObjects.URL.STATUS_SELECTED_PROCESSING)
    urlUpdate.tcDate = SQLExpression("NOW()")
    # Create URLFetch object with URLUpdate to update selected URLs state
    sCrit = "IFNULL((SELECT `Value` FROM `sites_properties` WHERE `Name`='PROCESS_WHERE_SITES'), " + \
            self.configVars[self.CONFIG_BATCH_WHERE_SITES] + ")"
    sitesCriterions = {EventObjects.URLFetch.CRITERION_WHERE: sCrit,
                       EventObjects.URLFetch.CRITERION_ORDER:"Priority DESC, TcDateProcess ASC"}
    uCrit = "IFNULL(%PROCESS_WHERE_URLS%, " + self.configVars[self.CONFIG_BATCH_WHERE_URLS] + ")"
    urlCriterions = {EventObjects.URLFetch.CRITERION_WHERE: uCrit,
                     EventObjects.URLFetch.CRITERION_ORDER: self.configVars[self.CONFIG_BATCH_ORDER_BY_URLS],
                     EventObjects.URLFetch.CRITERION_LIMIT: str(self.configVars[self.CONFIG_BATCH_MAX_URLS]),
                     EventObjects.URLFetch.CRITERION_SQL: {
                       "PROCESS_WHERE_URLS": "SELECT `Value` FROM `dc_sites`.`sites_properties` WHERE " + \
                                             "`Name`='PROCESS_WHERE_URLS' AND `Site_Id`=\"%SITE_ID%\" LIMIT 1"  # pylint: disable=C0301
                     }
                    }
    siteUpdate = EventObjects.SiteUpdate(0)
    siteUpdate.tcDateProcess = EventObjects.SQLExpression("Now()")
    urlFetch = EventObjects.URLFetch(None, urlCriterions, sitesCriterions, urlUpdate, siteUpdate)
    urlFetch.algorithm = EventObjects.URLFetch.PROPORTIONAL_ALGORITHM
    urlFetch.maxURLs = self.configVars[self.CONFIG_BATCH_MAX_URLS]
    taskId = self.sendBatchTaskToDTM(urlFetch)
    if taskId > 0:
      logger.debug("DTM process batch was set, taskId=%s", str(taskId))
      # Insert the Batch object in to the queue
      urlFetch.QueuedTs = time.time()
      urlFetch.crawlerType = EventObjects.Batch.TYPE_PROCESS
      self.dtmTasksQueue[taskId] = urlFetch
      self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_QUEUE_NAME, len(self.dtmTasksQueue),
                           self.STAT_FIELDS_OPERATION_SET)
      ret = True
    else:
      logger.error("Error send process batch task to DTM!")

  except Exception as err:
    ExceptionLog.handler(logger, err, "Exception:")

  return ret

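Both selection criteria in the listing follow the same pattern: an `IFNULL(...)` wrapper that prefers a per-site property and falls back to the configured default. The sketch below rebuilds just those two strings. `sites_where`, `urls_where` and the sample defaults are hypothetical, and the reading that `%PROCESS_WHERE_URLS%` is later substituted with the result of the `CRITERION_SQL` query is an assumption drawn from the listing, not something this page states.

```python
def sites_where(default_where):
    """Sites criterion: stored PROCESS_WHERE_SITES value, else the configured default."""
    return ("IFNULL((SELECT `Value` FROM `sites_properties` "
            "WHERE `Name`='PROCESS_WHERE_SITES'), " + default_where + ")")


def urls_where(default_where):
    """URLs criterion: placeholder for the per-site value, else the configured default."""
    return "IFNULL(%PROCESS_WHERE_URLS%, " + default_where + ")"


# Hypothetical defaults standing in for BatchDefaultWhereSites / BatchDefaultWhereURLs.
s_crit = sites_where("1=1")
u_crit = urls_where("`Status`=4")
print(s_crit)
print(u_crit)
```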

Member Data Documentation

◆ clientSitesManagerName

dc.BatchTasksManagerProcess.BatchTasksManagerProcess.clientSitesManagerName

Definition at line 126 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_DEFAULT_MAX_TIME

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_DEFAULT_MAX_TIME = "BatchDefaultMaxExecutionTime"
static

Definition at line 59 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_DEFAULT_STARTER

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_DEFAULT_STARTER = "BatchTask_STARTER"
static

Definition at line 66 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_RETRIES

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_RETRIES = "BatchTask_autocleanup_DeleteRetries"
static

Definition at line 80 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_TYPE

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_TYPE = "BatchTask_autocleanup_DeleteType"
static

Definition at line 79 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_STATE

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_STATE = "BatchTask_autocleanup_State"
static

Definition at line 81 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_TTL

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_TTL = "BatchTask_autocleanup_TTL"
static

Definition at line 78 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_DEFAULT_STRATEGY_CPU_LOAD_MAX

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_DEFAULT_STRATEGY_CPU_LOAD_MAX = "BatchTask_CPU_LOAD_MAX"
static

Definition at line 74 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_DEFAULT_STRATEGY_IO_WAIT_MAX

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_DEFAULT_STRATEGY_IO_WAIT_MAX = "BatchTask_IO_WAIT_MAX"
static

Definition at line 73 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_DEFAULT_STRATEGY_RAM_FREE_MIN

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_DEFAULT_STRATEGY_RAM_FREE_MIN = "BatchTask_RAM_FREE_MIN"
static

Definition at line 75 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_DEFAULT_STRATEGY_RETRY

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_DEFAULT_STRATEGY_RETRY = "BatchTask_RETRY"
static

Definition at line 77 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_DEFAULT_STRATEGY_STRATEGY_RDELAY

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_DEFAULT_STRATEGY_STRATEGY_RDELAY = "BatchTask_RDELAY"
static

Definition at line 76 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_MAX_TASKS

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_MAX_TASKS = "BatchDefaultMaxTasks"
static

Definition at line 62 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_MAX_TIME

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_MAX_TIME = "BatchMaxExecutionTime"
static

Definition at line 69 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_MAX_URLS

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_MAX_URLS = "BatchDefaultMaxURLs"
static

Definition at line 60 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_ORDER_BY_URLS

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_ORDER_BY_URLS = "BatchDefaultOrderByURLs"
static

Definition at line 61 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_QUEUE_PERIOD

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_QUEUE_PERIOD = "BatchQueueProcessingPeriod"
static

Definition at line 63 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_QUEUE_TASK_CHECK_METHOD

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_QUEUE_TASK_CHECK_METHOD = "BatchQueueTaskCheckStateMethod"
static

Definition at line 65 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_QUEUE_TASK_TTL

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_QUEUE_TASK_TTL = "BatchQueueTaskTTL"
static

Definition at line 64 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_REMOVE_UNPROCESSED_ITEMS

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_REMOVE_UNPROCESSED_ITEMS = "RemoveUnprocessedBatchItems"
static

Definition at line 70 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_WHERE_SITES

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_WHERE_SITES = "BatchDefaultWhereSites"
static

Definition at line 68 of file BatchTasksManagerProcess.py.

◆ CONFIG_BATCH_WHERE_URLS

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_BATCH_WHERE_URLS = "BatchDefaultWhereURLs"
static

Definition at line 67 of file BatchTasksManagerProcess.py.

◆ CONFIG_DRCE_DB_APP_NAME

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_DRCE_DB_APP_NAME = "DRCEDBAppName"
static

Definition at line 55 of file BatchTasksManagerProcess.py.

◆ CONFIG_DRCE_PROCESSOR_APP_NAME

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_DRCE_PROCESSOR_APP_NAME = "DRCEProcessorAppName"
static

Definition at line 54 of file BatchTasksManagerProcess.py.

◆ CONFIG_DTMD_HOST

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_DTMD_HOST = "DTMDHost"
static

Definition at line 48 of file BatchTasksManagerProcess.py.

◆ CONFIG_DTMD_PORT

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_DTMD_PORT = "DTMDPort"
static

Definition at line 49 of file BatchTasksManagerProcess.py.

◆ CONFIG_DTMD_TIMEOUT

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_DTMD_TIMEOUT = "DTMDTimeout"
static

Definition at line 50 of file BatchTasksManagerProcess.py.

◆ CONFIG_POLLING_TIMEOUT

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_POLLING_TIMEOUT = "PollingTimeout"
static

Definition at line 51 of file BatchTasksManagerProcess.py.

◆ CONFIG_PROCESS_MODE

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_PROCESS_MODE = "ProcessingMode"
static

Definition at line 57 of file BatchTasksManagerProcess.py.

◆ CONFIG_PROCESS_PERIOD

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_PROCESS_PERIOD = "ProcessingPeriod"
static

Definition at line 56 of file BatchTasksManagerProcess.py.

◆ CONFIG_SERVER

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_SERVER = "server"
static

Definition at line 47 of file BatchTasksManagerProcess.py.

◆ CONFIG_SITES_MANAGER_CLIENT

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_SITES_MANAGER_CLIENT = "clientSitesManager"
static

Definition at line 52 of file BatchTasksManagerProcess.py.

◆ CONFIG_TASK_DTM_NAME_PROCESS

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_TASK_DTM_NAME_PROCESS = "BatchTaskDTMNameProcess"
static

Definition at line 84 of file BatchTasksManagerProcess.py.

◆ CONFIG_TASK_DTM_TYPE_PROCESS

string dc.BatchTasksManagerProcess.BatchTasksManagerProcess.CONFIG_TASK_DTM_TYPE_PROCESS = "BatchTaskDTMTypeProcess"
static

Definition at line 86 of file BatchTasksManagerProcess.py.
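The `CONFIG_*` constants above are option names looked up through the `configParser` passed to the constructor. As a rough illustration of how such options might be read, the sketch below uses Python 3's standard `configparser`. The section name `BatchTasksManagerProcess`, every sample value and the choice of integer conversion are assumptions made for this example; the class's actual loading code is not shown on this page.

```python
import configparser

# Hypothetical INI content; section name and values are made up for illustration.
SAMPLE = """
[BatchTasksManagerProcess]
DTMDHost = 127.0.0.1
DTMDPort = 5501
DTMDTimeout = 5000
BatchDefaultMaxURLs = 100
BatchQueueTaskTTL = 600
"""

parser = configparser.ConfigParser()
parser.optionxform = str  # keep option names case-sensitive, as written above
parser.read_string(SAMPLE)

section = parser["BatchTasksManagerProcess"]
config_vars = {
    "DTMDHost": section.get("DTMDHost"),
    "DTMDPort": section.getint("DTMDPort"),
    "DTMDTimeout": section.getint("DTMDTimeout"),
    "BatchDefaultMaxURLs": section.getint("BatchDefaultMaxURLs"),
    "BatchQueueTaskTTL": section.getint("BatchQueueTaskTTL"),
}
print(config_vars)
```

Converting numeric options once at load time keeps later comparisons, such as the `> 0` and `> -1` checks in `sendBatchTaskToDTM`, purely numeric.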

◆ DTM_TASK_CHECK_STATE_METHOD_STATE

int dc.BatchTasksManagerProcess.BatchTasksManagerProcess.DTM_TASK_CHECK_STATE_METHOD_STATE = 1
static

Definition at line 44 of file BatchTasksManagerProcess.py.

◆ DTM_TASK_CHECK_STATE_METHOD_STATUS

int dc.BatchTasksManagerProcess.BatchTasksManagerProcess.DTM_TASK_CHECK_STATE_METHOD_STATUS = 0
static

Definition at line 43 of file BatchTasksManagerProcess.py.

◆ dtmdConnection

dc.BatchTasksManagerProcess.BatchTasksManagerProcess.dtmdConnection

Definition at line 154 of file BatchTasksManagerProcess.py.

◆ dtmTasksQueue

dc.BatchTasksManagerProcess.BatchTasksManagerProcess.dtmTasksQueue

Definition at line 169 of file BatchTasksManagerProcess.py.

◆ processBatchQueuelLastTs

dc.BatchTasksManagerProcess.BatchTasksManagerProcess.processBatchQueuelLastTs

Definition at line 182 of file BatchTasksManagerProcess.py.

◆ processProcessingLastTs

dc.BatchTasksManagerProcess.BatchTasksManagerProcess.processProcessingLastTs

Definition at line 174 of file BatchTasksManagerProcess.py.

◆ serverName

dc.BatchTasksManagerProcess.BatchTasksManagerProcess.serverName

Definition at line 125 of file BatchTasksManagerProcess.py.


The documentation for this class was generated from the following file: