HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
BatchTasksManagerProcess.py
Go to the documentation of this file.
1 '''
2 HCE project, Python bindings, Distributed Crawler application.
3 BatchTasksManagerProcess object and related classes definitions.
4 
5 @package: dc
6 @author bgv bgv.hce@gmail.com
7 @link: http://hierarchical-cluster-engine.com/
8 @copyright: Copyright © 2015 IOIX Ukraine
9 @license: http://hierarchical-cluster-engine.com/license/
10 @since: 0.1
11 '''
12 
13 
14 import time
15 try:
16  import cPickle as pickle
17 except ImportError:
18  import pickle
19 
20 import dc.Constants as DC_CONSTS
21 from dc import EventObjects
22 from transport.ConnectionBuilderLight import ConnectionBuilderLight
23 import transport.Consts as TRANSPORT_CONSTS
24 import dtm.EventObjects
25 import dtm.Constants as DTM_CONSTS
26 from app.BaseServerManager import BaseServerManager
27 from app.Utils import SQLExpression, varDump
28 from app.Utils import ExceptionLog
29 import app.Utils as Utils # pylint: disable=F0401
30 
31 # Logger initialization
32 logger = Utils.MPLogger().getLogger()
33 
34 
35 # #The BatchTasksManagerProcess class, the main batch processing logic of the DC application.
36 #
37 # This object implements the main processing batches algorithm of the DC application.
38 # It supports DTM protocol requests and processes responses from the DTM server, operates with tasks and monitors
39 # tasks state.
40 #
42 
43  DTM_TASK_CHECK_STATE_METHOD_STATUS = 0
44  DTM_TASK_CHECK_STATE_METHOD_STATE = 1
45 
46  # Configuration settings options names
47  CONFIG_SERVER = "server"
48  CONFIG_DTMD_HOST = "DTMDHost"
49  CONFIG_DTMD_PORT = "DTMDPort"
50  CONFIG_DTMD_TIMEOUT = "DTMDTimeout"
51  CONFIG_POLLING_TIMEOUT = "PollingTimeout"
52  CONFIG_SITES_MANAGER_CLIENT = "clientSitesManager"
53 
54  CONFIG_DRCE_PROCESSOR_APP_NAME = "DRCEProcessorAppName"
55  CONFIG_DRCE_DB_APP_NAME = "DRCEDBAppName"
56  CONFIG_PROCESS_PERIOD = "ProcessingPeriod"
57  CONFIG_PROCESS_MODE = "ProcessingMode"
58 
59  CONFIG_BATCH_DEFAULT_MAX_TIME = "BatchDefaultMaxExecutionTime"
60  CONFIG_BATCH_MAX_URLS = "BatchDefaultMaxURLs"
61  CONFIG_BATCH_ORDER_BY_URLS = "BatchDefaultOrderByURLs"
62  CONFIG_BATCH_MAX_TASKS = "BatchDefaultMaxTasks"
63  CONFIG_BATCH_QUEUE_PERIOD = "BatchQueueProcessingPeriod"
64  CONFIG_BATCH_QUEUE_TASK_TTL = "BatchQueueTaskTTL"
65  CONFIG_BATCH_QUEUE_TASK_CHECK_METHOD = "BatchQueueTaskCheckStateMethod"
66  CONFIG_BATCH_DEFAULT_STARTER = "BatchTask_STARTER"
67  CONFIG_BATCH_WHERE_URLS = "BatchDefaultWhereURLs"
68  CONFIG_BATCH_WHERE_SITES = "BatchDefaultWhereSites"
69  CONFIG_BATCH_MAX_TIME = "BatchMaxExecutionTime"
70  CONFIG_BATCH_REMOVE_UNPROCESSED_ITEMS = "RemoveUnprocessedBatchItems"
71 
72  # The tasks's strategy configuration parameters for DTM service
73  CONFIG_BATCH_DEFAULT_STRATEGY_IO_WAIT_MAX = "BatchTask_IO_WAIT_MAX"
74  CONFIG_BATCH_DEFAULT_STRATEGY_CPU_LOAD_MAX = "BatchTask_CPU_LOAD_MAX"
75  CONFIG_BATCH_DEFAULT_STRATEGY_RAM_FREE_MIN = "BatchTask_RAM_FREE_MIN"
76  CONFIG_BATCH_DEFAULT_STRATEGY_STRATEGY_RDELAY = "BatchTask_RDELAY"
77  CONFIG_BATCH_DEFAULT_STRATEGY_RETRY = "BatchTask_RETRY"
78  CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_TTL = "BatchTask_autocleanup_TTL"
79  CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_TYPE = "BatchTask_autocleanup_DeleteType"
80  CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_RETRIES = "BatchTask_autocleanup_DeleteRetries"
81  CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_STATE = "BatchTask_autocleanup_State"
82 
83  # Processing task DTM name
84  CONFIG_TASK_DTM_NAME_PROCESS = "BatchTaskDTMNameProcess"
85  # Processing task DTM type
86  CONFIG_TASK_DTM_TYPE_PROCESS = "BatchTaskDTMTypeProcess"
87 
88 
89  # #constructor
90  # initialize fields
91  #
92  # @param configParser config parser object
93  # @param connectBuilderLight connection builder light
94  #
95  def __init__(self, configParser, connectionBuilderLight=None):
96  super(BatchTasksManagerProcess, self).__init__()
97 
98  # Instantiate the connection builder light if not set
99  if connectionBuilderLight is None:
100  connectionBuilderLight = ConnectionBuilderLight()
101 
102  # Batches counter init in stat vars
103  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_TOTAL_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
104  # Batches in queue counter init in stat vars
105  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_QUEUE_NAME, 0, self.STAT_FIELDS_OPERATION_SET)
106  # Batches that fault processing counter init in stat vars
107  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
108  # Batches that not empty counter init in stat vars
109  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_FILLED_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
110  # Batches urls total counter init in stat vars
111  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_URLS_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
112  # Fault batches urls total counter init in stat vars
113  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_URLS_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
114  # Processing delete task requests fault counter name for stat variables
115  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_DELETE_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
116  # Processing check task requests fault counter name for stat variables
117  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_CHECK_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
118  # Processing batches fault TTL counter name for stat variables
119  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_FAULT_TTL_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
120  # Processing batches cancelled counter name for stat variables
121  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_CANCELLED_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
122 
123  # Get configuration settings
124  className = self.__class__.__name__
125  self.serverName = configParser.get(className, self.CONFIG_SERVER)
126  self.clientSitesManagerName = configParser.get(className, self.CONFIG_SITES_MANAGER_CLIENT)
127  # Configuration settings for DTMD server interaction
128  self.configVars[self.CONFIG_DTMD_HOST] = configParser.get(className, self.CONFIG_DTMD_HOST)
129  self.configVars[self.CONFIG_DTMD_PORT] = configParser.get(className, self.CONFIG_DTMD_PORT)
130  self.configVars[self.CONFIG_DTMD_TIMEOUT] = configParser.getint(className, self.CONFIG_DTMD_TIMEOUT)
131 
132  # Max URLs per batch
133  self.configVars[self.CONFIG_BATCH_MAX_URLS] = configParser.getint(className, self.CONFIG_BATCH_MAX_URLS)
134  # Set connections poll timeout, defines period of HCE cluster monitoring cycle, msec
135  self.configVars[self.POLL_TIMEOUT_CONFIG_VAR_NAME] = configParser.getint(className, self.CONFIG_POLLING_TIMEOUT)
136  # Set crawler task app name
138  configParser.get(className, self.CONFIG_DRCE_PROCESSOR_APP_NAME)
140  configParser.getint(className, self.CONFIG_BATCH_DEFAULT_MAX_TIME)
141 
142  # #TODO experimental
143  # Max execution time for batch
144  self.configVars[self.CONFIG_BATCH_MAX_TIME] = configParser.getint(className, self.CONFIG_BATCH_MAX_TIME)
145  # Remove unprocessed items for batch
147  configParser.getint(className, self.CONFIG_BATCH_REMOVE_UNPROCESSED_ITEMS)
148 
149  # Create connections and raise bind or connect actions for correspondent connection type
150  serverConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, self.serverName)
151  sitesManagerConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT, self.clientSitesManagerName)
152 
153  # Init the DTMD connection
154  self.dtmdConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT,
155  self.configVars[self.CONFIG_DTMD_HOST] + ":" + \
156  self.configVars[self.CONFIG_DTMD_PORT],
157  TRANSPORT_CONSTS.TCP_TYPE)
158 
159  # Add connections to the polling set
160  self.addConnection(self.serverName, serverConnection)
161  self.addConnection(self.clientSitesManagerName, sitesManagerConnection)
162 
163  # Set event handler for URL_UPDATE_RESPONSE event
164  self.setEventHandler(DC_CONSTS.EVENT_TYPES.URL_UPDATE_RESPONSE, self.onURLUpdateResponse)
165  # Set event handler for URL_DELETE_RESPONSE event
166  self.setEventHandler(DC_CONSTS.EVENT_TYPES.URL_DELETE_RESPONSE, self.onURLDeleteResponse)
167 
168  # Initialize the DTM tasks queue, the key is taskId, the value is the Batch object
169  self.dtmTasksQueue = {}
170 
171  # Processing init
172  self.configVars[self.CONFIG_DRCE_DB_APP_NAME] = configParser.get(className, self.CONFIG_DRCE_DB_APP_NAME)
173  self.configVars[self.CONFIG_PROCESS_PERIOD] = configParser.getint(className, self.CONFIG_PROCESS_PERIOD)
174  self.processProcessingLastTs = time.time()
175 
176  # The Batch default order by criterion to fetch URLs
177  self.configVars[self.CONFIG_BATCH_ORDER_BY_URLS] = configParser.get(className, self.CONFIG_BATCH_ORDER_BY_URLS)
178  # The Batch max tasks in batch queue, if limit reached new batch tasks will not be started; zero means unlimited
179  self.configVars[self.CONFIG_BATCH_MAX_TASKS] = configParser.getint(className, self.CONFIG_BATCH_MAX_TASKS)
180  # The Batch queue processing init
181  self.configVars[self.CONFIG_BATCH_QUEUE_PERIOD] = configParser.getint(className, self.CONFIG_BATCH_QUEUE_PERIOD)
182  self.processBatchQueuelLastTs = time.time()
183  # The Batch queue task TTL, sec
184  self.configVars[self.CONFIG_BATCH_QUEUE_TASK_TTL] = configParser.getint(className, self.CONFIG_BATCH_QUEUE_TASK_TTL)
185  # The Batch queue tasks state check method, see ini file comments
187  configParser.getint(className, self.CONFIG_BATCH_QUEUE_TASK_CHECK_METHOD)
188  # The Batch DRCE task starter name
189  self.configVars[self.CONFIG_BATCH_DEFAULT_STARTER] = configParser.get(className, self.CONFIG_BATCH_DEFAULT_STARTER)
190  # The Batch tasks's strategy configuration parameters for DTM service load
192  configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_IO_WAIT_MAX)
194  configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_CPU_LOAD_MAX)
196  configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_RAM_FREE_MIN)
198  configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_STRATEGY_RDELAY)
200  configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_RETRY)
201  # The Batch DRCE tasks auto cleanup fields
203  configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_TTL)
205  configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_TYPE)
207  configParser.get(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_RETRIES)
209  configParser.get(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_STATE)
210  # The Batch default where criterion to fetch URLs
211  self.configVars[self.CONFIG_BATCH_WHERE_URLS] = configParser.get(className, self.CONFIG_BATCH_WHERE_URLS)
212  # The Batch default where criterion to fetch Sites
213  self.configVars[self.CONFIG_BATCH_WHERE_SITES] = configParser.get(className, self.CONFIG_BATCH_WHERE_SITES)
214 
215  # Init the config processing mode and runtime values
216  self.configVars[self.CONFIG_PROCESS_MODE] = configParser.getint(className, self.CONFIG_PROCESS_MODE)
217 
218  # Processing task DTM name
219  self.configVars[self.CONFIG_TASK_DTM_NAME_PROCESS] = configParser.get(className, self.CONFIG_TASK_DTM_NAME_PROCESS)
220  # Processing task DTM type
221  self.configVars[self.CONFIG_TASK_DTM_TYPE_PROCESS] = configParser.getint(className,
223 
224 
225 
226  # #Events wait timeout handler, for timeout state of the connections polling. Executes periodical check of DTM tasks
227  # and initiate the main processing batching iteration cycle
228  #
229  def on_poll_timeout(self):
230  logger.debug("Periodic iteration started.")
231  try:
232  # Process the Processing batch
233  if self.configVars[self.CONFIG_PROCESS_PERIOD] > 0 and\
234  time.time() - self.processProcessingLastTs > self.configVars[self.CONFIG_PROCESS_PERIOD]:
235  self.processProcessingLastTs = time.time()
236  if self.configVars[self.CONFIG_PROCESS_MODE] == 1:
237  logger.debug("Processing batch cycle iteration started")
238  if self.configVars[self.CONFIG_BATCH_MAX_TASKS] > len(self.dtmTasksQueue):
239  if self.setProcessBatch():
240  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_TOTAL_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
241  else:
242  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_CANCELLED_NAME, 1,
244  logger.debug("Max processing batch tasks %s in queue reached, new batch is not created!",
245  str(len(self.dtmTasksQueue)))
246  else:
247  logger.debug("Processing batch disabled!")
248 
249  # Process the DRCE Batch tasks queue
250  if self.configVars[self.CONFIG_BATCH_QUEUE_PERIOD] > 0 and\
251  time.time() - self.processBatchQueuelLastTs > self.configVars[self.CONFIG_BATCH_QUEUE_PERIOD]:
252  self.processBatchQueuelLastTs = time.time()
253  logger.debug("Process DTM tasks queue!")
254  # Process the DTM tasks queue
255  self.processDTMTasksQueue()
256 
257  except IOError as e:
258  del e
259  except Exception as err:
260  ExceptionLog.handler(logger, err, "Exception:")
261 
262  logger.debug("Periodic iteration finished.")
263 
264 
265 
266  # #Create new process batch, send it to execute as asynchronous DRCE task and insert in to the batches queue
267  #
268  #
269  def setProcessBatch(self):
270  ret = False
271 
272  try:
273  # Create the URLUpdate object to set in progress of processing state for selected URLs
274  urlUpdate = EventObjects.URLUpdate(0, "", EventObjects.URLStatus.URL_TYPE_MD5, None,
275  EventObjects.URL.STATUS_SELECTED_PROCESSING)
276  urlUpdate.tcDate = SQLExpression("NOW()")
277  # Create URLFetch object with URLUpdate to update selected URLs state
278  sCrit = "IFNULL((SELECT `Value` FROM `sites_properties` WHERE `Name`='PROCESS_WHERE_SITES'), " + \
279  self.configVars[self.CONFIG_BATCH_WHERE_SITES] + ")"
280  sitesCriterions = {EventObjects.URLFetch.CRITERION_WHERE: sCrit,
281  EventObjects.URLFetch.CRITERION_ORDER:"Priority DESC, TcDateProcess ASC"}
282  uCrit = "IFNULL(%PROCESS_WHERE_URLS%, " + self.configVars[self.CONFIG_BATCH_WHERE_URLS] + ")"
283  urlCriterions = {EventObjects.URLFetch.CRITERION_WHERE: uCrit,
284  EventObjects.URLFetch.CRITERION_ORDER: self.configVars[self.CONFIG_BATCH_ORDER_BY_URLS],
285  EventObjects.URLFetch.CRITERION_LIMIT: str(self.configVars[self.CONFIG_BATCH_MAX_URLS]),
286  EventObjects.URLFetch.CRITERION_SQL: {
287  "PROCESS_WHERE_URLS": "SELECT `Value` FROM `dc_sites`.`sites_properties` WHERE " + \
288  "`Name`='PROCESS_WHERE_URLS' AND `Site_Id`=\"%SITE_ID%\" LIMIT 1" # pylint: disable=C0301
289  }
290  }
291  siteUpdate = EventObjects.SiteUpdate(0)
292  siteUpdate.tcDateProcess = EventObjects.SQLExpression("Now()")
293  urlFetch = EventObjects.URLFetch(None, urlCriterions, sitesCriterions, urlUpdate, siteUpdate)
294  urlFetch.algorithm = EventObjects.URLFetch.PROPORTIONAL_ALGORITHM
295  urlFetch.maxURLs = self.configVars[self.CONFIG_BATCH_MAX_URLS]
296  taskId = self.sendBatchTaskToDTM(urlFetch)
297  if taskId > 0:
298  logger.debug("DTM process batch was set, taskId=%s", str(taskId))
299  # Insert the Batch object in to the queue
300  urlFetch.QueuedTs = time.time()
301  urlFetch.crawlerType = EventObjects.Batch.TYPE_PROCESS
302  self.dtmTasksQueue[taskId] = urlFetch
303  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_QUEUE_NAME, len(self.dtmTasksQueue),
305  ret = True
306  else:
307  logger.error("Error send process batch task to DTM!")
308 
309  except Exception as err:
310  ExceptionLog.handler(logger, err, "Exception:")
311 
312  return ret
313 
314 
315 
316  # #Send the batch to the DTM service as a new asynchronous task and return the Id of that task
317  # @param batch The batch object, it is pickled in to the input of the new DTM task
318  #
319  def sendBatchTaskToDTM(self, batch):
320  taskId = 0
321  # Prepare NewTask object
322  appName = self.configVars[self.CONFIG_DRCE_DB_APP_NAME]
323  newTaskObj = dtm.EventObjects.NewTask(appName)
324  newTaskObj.name = self.configVars[self.CONFIG_TASK_DTM_NAME_PROCESS]
325  newTaskObj.type = self.configVars[self.CONFIG_TASK_DTM_TYPE_PROCESS]
326  newTaskObj.setSessionVar("tmode", dtm.EventObjects.Task.TASK_MODE_ASYNCH)
327  newTaskObj.setSessionVar("shell", self.configVars[self.CONFIG_BATCH_DEFAULT_STARTER])
328  newTaskObj.setSessionVar("time_max", int(self.configVars[self.CONFIG_BATCH_DEFAULT_MAX_TIME]) * 1000)
329  newTaskObj.setSessionVar("task_type", int(newTaskObj.type))
330  # Configure task's strategy
332  newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_IO_WAIT_MAX,
335  newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_CPU_LOAD_MAX,
338  newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_RAM_FREE,
341  newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_RDELAY,
344  newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_RETRY,
346  # Set auto cleanup fields
347  autoCleanupFields = {}
349  autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_TTL] = \
352  autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_DELETE_TYPE] = \
355  autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_DELETE_RETRIES] = \
358  autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_SSTATE] = \
360  if len(autoCleanupFields) > 0:
361  newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_autoCleanupFields, autoCleanupFields)
362  batch.id = newTaskObj.id
363  drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_FETCH, [batch])
364  newTaskObj.input = pickle.dumps(drceSyncTasksCoverObj)
365  newTaskEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.NEW_TASK, newTaskObj)
366  generalResponse = self.dtmdRequestExecute(newTaskEvent, self.configVars[self.CONFIG_DTMD_TIMEOUT])
367  if generalResponse is not None:
368  if generalResponse.errorCode == dtm.EventObjects.GeneralResponse.ERROR_OK:
369  # New crawling Batch task set successfully
370  taskId = newTaskObj.id
371  else:
372  # Some error of task operation
373  logger.error("DTM set batch task error: " + str(generalResponse.errorCode) + " : " + \
374  generalResponse.errorMessage + ", statuses:" + varDump(generalResponse))
375  else:
376  logger.error("DTM set batch task response error, possible timeout!")
377 
378  # TODO: return the Task Id any case error or not to check it state later
379  taskId = newTaskObj.id
380 
381  return taskId
382 
383 
384 
385  # #Get the DTM task state by involving one of two methods on DTM service
386  #
387  # @param taskId Id of DTM task
388  def getDTMTaskState(self, taskId):
389  taskState = None
390 
392  # Check the task status on DRCE EE hce-node
393  logger.debug("Check state of taskId=" + str(taskId))
394  checkTaskStateObj = dtm.EventObjects.CheckTaskState(taskId)
395  checkStateEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE, checkTaskStateObj)
396  eeResponseData = self.dtmdRequestExecute(checkStateEvent, self.configVars[self.CONFIG_DTMD_TIMEOUT])
397  logger.debug("DTM CheckTaskState request finished, taskId=" + str(taskId))
398  if eeResponseData is not None and type(eeResponseData) == type(dtm.EventObjects.EEResponseData(0)):
399  taskState = eeResponseData.state
400  else:
401  # Get task status on DTM service
402  logger.debug("Get status of taskId=" + str(taskId))
403  getTasksStatusObj = dtm.EventObjects.GetTasksStatus([taskId])
404  getTasksStatusEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.GET_TASK_STATUS, getTasksStatusObj)
405  listTaskManagerFields = self.dtmdRequestExecute(getTasksStatusEvent, self.configVars[self.CONFIG_DTMD_TIMEOUT])
406  logger.debug("DTM getTasksStatus request finished, taskId=" + str(taskId))
407  if listTaskManagerFields is not None and isinstance(listTaskManagerFields, list):
408  if len(listTaskManagerFields) > 0:
409  taskState = listTaskManagerFields[0].fields["state"]
410  else:
411  # Set TASK_STATE_FINISHED state to push task to delete from queue
412  taskState = dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED
413  else:
414  logger.error("DTM getTasksStatus taskId=" + str(taskId) + " returned wrong data:\n" + \
415  Utils.varDump(listTaskManagerFields))
416 
417  return taskState
418 
419 
420 
421  # #Process the DTM tasks queue
422  #
423  #
425  tmpQueue = {}
426 
427  logger.debug("Process batch tasks in queue:" + str(len(self.dtmTasksQueue)))
428  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_QUEUE_NAME, len(self.dtmTasksQueue),
430 
431  # Process the DTM tasks queue
432  for taskId, taskBatch in self.dtmTasksQueue.items():
433  ttl = self.configVars[self.CONFIG_BATCH_QUEUE_TASK_TTL]
434  batchState = self.getDTMTaskState(taskId)
435  if batchState != None:
436  logger.debug("Process batch state " + str(batchState) + ", Id=" + str(taskId))
437  if batchState == dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED or\
438  batchState == dtm.EventObjects.EEResponseData.TASK_STATE_CRASHED or\
439  batchState == dtm.EventObjects.EEResponseData.TASK_STATE_TERMINATED or\
440  batchState == dtm.EventObjects.EEResponseData.TASK_STATE_UNDEFINED or\
441  batchState == dtm.EventObjects.EEResponseData.TASK_STATE_SET_ERROR or\
442  batchState == dtm.EventObjects.EEResponseData.TASK_STATE_SCHEDULE_TRIES_EXCEEDED:
443  # Delete task in DTM and task's data in EE (DRCE)
444  deleteTaskObj = dtm.EventObjects.DeleteTask(taskId)
445  deleteTaskEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.DELETE_TASK, deleteTaskObj)
446  generalResponse = self.dtmdRequestExecute(deleteTaskEvent, self.configVars[self.CONFIG_DTMD_TIMEOUT])
447  logger.debug("DTM DeleteTask request finished, taskId=" + str(taskId))
448  if generalResponse is not None:
449  if generalResponse.errorCode == dtm.EventObjects.GeneralResponse.ERROR_OK:
450  logger.debug("DTM task deleted, taskId=" + str(taskId))
451  # if batchState == dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED:
452  if self.isDTMTaskDead(batchState):
453  logger.debug("batch:\n" + varDump(taskBatch) + "\n finished, taskId=" + str(taskId))
454  self.processFinishedBatch(taskBatch)
455  else:
456  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_FAULT_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
457  logger.debug("batch:" + varDump(taskBatch) + " not finished, state= " + str(batchState))
458  # TODO: Send update URLs of not finished batch on all nodes to get possibility to process them next time
459  # self.sendURLUpdate(taskBatch.items, taskBatch.id, False)
460  else:
461  # Save this batch to check it later
462  tmpQueue[taskId] = taskBatch
463  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_DELETE_FAULT_NAME, 1,
465  logger.error("DTM delete task taskId=" + str(taskId) + " error: " + str(generalResponse.errorCode) + \
466  " : " + generalResponse.errorMessage + ", statuses:" + varDump(generalResponse))
467  else:
468  # Save this batch to check it later
469  tmpQueue[taskId] = taskBatch
470  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_DELETE_FAULT_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
471  logger.error("DTM delete task error: wrong response or timeout, taskId=" + str(taskId) + "!")
472  else:
473  logger.debug("DTM task Id=" + str(taskId) + " state=" + str(batchState))
474  if time.time() - taskBatch.QueuedTs > ttl:
475  # Terminate task and delete it's data request
476  deleteTaskObj = dtm.EventObjects.DeleteTask(taskId)
477  deleteTaskObj.action = dtm.EventObjects.DeleteTask.ACTION_TERMINATE_TASK_AND_DELETE_DATA
478  deleteTaskEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.DELETE_TASK, deleteTaskObj)
479  generalResponse = self.dtmdRequestExecute(deleteTaskEvent, self.configVars[self.CONFIG_DTMD_TIMEOUT])
480  logger.error("DTM task Id=" + str(taskId) + " terminated and removed from queue by TTL:" + str(ttl))
481  # TODO: Send update URLs of not finished batch on all nodes to get possibility to crawl them next time
482  # self.sendURLUpdate(taskBatch.items, taskBatch.id, False)
483  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_FAULT_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
484  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_FAULT_TTL_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
485  else:
486  # Save this batch to check it later
487  tmpQueue[taskId] = taskBatch
488  logger.debug("DTM task Id=" + str(taskId) + " still in queue")
489  else:
490  logger.error("DTM check task state error: wrong response or timeout, taskId=" + str(taskId) + "!")
491  if time.time() - taskBatch.QueuedTs > ttl:
492  logger.error("DTM task Id=" + str(taskId) + " removed from queue by TTL:" + str(ttl))
493  else:
494  # Save this batch to check it later
495  tmpQueue[taskId] = taskBatch
496  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_CHECK_FAULT_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
497  logger.error("DTM task Id=" + str(taskId) + " saved in queue.")
498 
499  self.dtmTasksQueue = tmpQueue
500  self.updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_QUEUE_NAME, len(self.dtmTasksQueue),
502  logger.debug("The DTM tasks queue processing finished, batch tasks in queue " + str(len(self.dtmTasksQueue)))
503 
504 
505 
506  # #Check is DTM task dead by state code verification, returns True if dead or False if not
507  #
508  #
def isDTMTaskDead(self, state):
    """Tell whether a DTM task state code is one of the states treated here as "dead".

    @param state task state code, compared with the EEResponseData.TASK_STATE_* constants
    @return True if state equals one of the states listed below, False otherwise
    """
    responseData = dtm.EventObjects.EEResponseData
    deadStates = (
        responseData.TASK_STATE_FINISHED,
        responseData.TASK_STATE_CRASHED,
        responseData.TASK_STATE_TERMINATED,
        responseData.TASK_STATE_UNDEFINED,
        responseData.TASK_STATE_SET_ERROR,
        responseData.TASK_STATE_TERMINATED_BY_DRCE_TTL,
        responseData.TASK_STATE_SCHEDULE_TRIES_EXCEEDED,
    )

    return state in deadStates
521 
522 
523 
524  # #Do some post batch processing after batch was successfully finished
525  #
526  # @param taskBatch the Batch object
527  #
def processFinishedBatch(self, taskBatch):
    """Post-processing hook for a finished batch; at present it is a placeholder that does nothing.

    @param taskBatch the Batch object
    @return None
    """
    # TODO: update or delete URLs from returned batch
    # The disabled draft of this method chose, by a CONFIG_CRAWLED_URLS_STRATEGY setting, between
    # sendURLUpdate(taskBatch.items, taskBatch.id, True) and sendURLDelete(taskBatch.items, taskBatch.id).
    return None
537 
538 
539 
540  # #Execute DTMD task request
541  #
542  # @param requestEvent The request event to send to DTMD
543  # @param timeout The DTMD request timeout
544  #
def dtmdRequestExecute(self, requestEvent, timeout, maxTries=100):
    """Send a request event to DTMD and wait for the response that carries the same uid.

    After the request is sent the DTMD connection is polled up to maxTries + 1 times. A received event
    is accepted only if its eventObj is an EEResponseData, a GeneralResponse or a list, and its uid
    equals the uid of the request; any other event is logged and the polling continues. A poll timeout
    stops the waiting at once.

    @param requestEvent The request event to send to DTMD
    @param timeout The DTMD request timeout, converted to int and passed to the connection poll()
    @param maxTries The max number of additional poll attempts, negative value is treated as zero
    @return the eventObj of the matched response, or None on timeout, exhausted tries or exception
    """
    ret = None
    maxTries = max(maxTries, 0)

    try:
        # Send DTMD request
        self.dtmdConnection.send(requestEvent)

        for i in range(maxTries + 1):
            # Poll DTMD connection
            if self.dtmdConnection.poll(int(timeout)) == 0:
                logger.error("DTMD request timeout reached " + str(timeout) + "!")
                break

            # Recv DTMD response
            retEvent = self.dtmdConnection.recv()
            if retEvent is None:
                logger.error("DTMD returned None event!")
                continue

            # Exact type comparison is kept as it was, to not widen the set of accepted response objects
            eventObj = retEvent.eventObj
            if not (type(eventObj) == type(dtm.EventObjects.EEResponseData(0)) or
                    type(eventObj) == type(dtm.EventObjects.GeneralResponse()) or
                    isinstance(eventObj, list)):
                logger.error("DTMD returned wrong object type: " + str(type(eventObj)) + "!")
                continue

            if retEvent.uid == requestEvent.uid:
                ret = eventObj
                break

            # Response of some another request, skip it and poll again
            logger.error("DTMD returned wrong object uid: " + str(retEvent.uid) + " but " +
                         str(requestEvent.uid) + " expected, iteration " + str(i) + "!")
    except Exception as e:
        # str(e) instead of e.message: the message attribute is deprecated and can be absent or not a string
        logger.error("DTMD request execution exception: " + str(e) + "!")

    logger.debug("The DTMD request finished!")

    return ret
583 
584 
585 
586  # #Send URL update for batch URLs
587  #
588  # @param batchItemsList List of BatchItem objects
589  # @param batchId the Batch Id
590  # @param batchState state of batch operation False - means return URLs to New state, True - means set crawled state
def sendURLUpdate(self, batchItemsList, batchId, batchState):
    """Build one URLUpdate per batch item and send them all to the SitesManager in a single URL_UPDATE event.

    When batchState is True the target status is STATUS_CRAWLED and the WHERE criterion selects the URL
    by md5 with another Batch_Id and a status that is not CRAWLED or SELECTED_PROCESSING. In any other
    case the target status is STATUS_NEW and the criterion selects the URL by md5 with the status
    SELECTED_CRAWLING or SELECTED_CRAWLING_INCREMENTAL.

    @param batchItemsList List of BatchItem objects
    @param batchId the Batch Id
    @param batchState state of batch operation False - means return URLs to New state, True - means set crawled state
    """
    setCrawled = batchState is True
    updatesList = []

    for item in batchItemsList:
        if setCrawled:
            newStatus = EventObjects.URL.STATUS_CRAWLED
            whereText = "".join(("`URLMd5`='", str(item.urlId), "' AND `Batch_Id` <>", str(batchId),
                                 " AND `Status` NOT IN (", str(EventObjects.URL.STATUS_CRAWLED),
                                 ",", str(EventObjects.URL.STATUS_SELECTED_PROCESSING), ")"))
        else:
            newStatus = EventObjects.URL.STATUS_NEW
            whereText = "".join(("`URLMd5`='", str(item.urlId), "' AND `Status` IN (",
                                 str(EventObjects.URL.STATUS_SELECTED_CRAWLING), ",",
                                 str(EventObjects.URL.STATUS_SELECTED_CRAWLING_INCREMENTAL), ")"))
        whereExpression = SQLExpression(whereText)

        updateObj = EventObjects.URLUpdate(item.siteId, item.urlId, EventObjects.URLStatus.URL_TYPE_MD5,
                                           None, newStatus)
        updateObj.criterions[EventObjects.URLFetch.CRITERION_WHERE] = whereExpression
        logger.debug("URLUpdate: " + varDump(updateObj))
        updatesList.append(updateObj)

    # One event carries the whole list of updates
    updateEvent = self.eventBuilder.build(DC_CONSTS.EVENT_TYPES.URL_UPDATE, updatesList)
    self.send(self.clientSitesManagerName, updateEvent)
    logger.debug("The URLUpdate request to SitesManager sent!")
619 
620 
621 
622  # #Send URL delete for batch URLs that is not processed and stays in SELECTED_FOR_PROCESS state (5)
623  #
624  # @param batchItemsList List of BatchItem objects
625  # @param batchId Id of the batch
def sendURLDelete(self, batchItemsList, batchId):
    """Build one URLDelete per batch item and send them all to the SitesManager in a single URL_DELETE event.

    Each delete request is limited to one row and selects the URL by md5 with a not empty ParentMd5 and
    a Batch_Id different from batchId; the delete reason is REASON_SELECT_TO_PROCESS_TTL.

    @param batchItemsList List of BatchItem objects
    @param batchId Id of the batch
    """
    deletesList = []

    for item in batchItemsList:
        whereText = "".join(("`ParentMd5`<>'' AND `URLMd5`='", str(item.urlId), "' AND `Batch_Id`<>",
                             str(batchId)))
        criterions = {
            EventObjects.URLFetch.CRITERION_WHERE: SQLExpression(whereText),
            EventObjects.URLFetch.CRITERION_LIMIT: 1,
        }
        deleteObj = EventObjects.URLDelete(item.siteId, None, EventObjects.URLStatus.URL_TYPE_URL, criterions,
                                           reason=EventObjects.URLDelete.REASON_SELECT_TO_PROCESS_TTL)
        logger.debug("URLDelete: " + varDump(deleteObj))
        deletesList.append(deleteObj)

    # One event carries the whole list of deletes
    deleteEvent = self.eventBuilder.build(DC_CONSTS.EVENT_TYPES.URL_DELETE, deletesList)
    self.send(self.clientSitesManagerName, deleteEvent)
    logger.debug("The URLDelete request to SitesManager sent!")
645 
646 
647 
648  # #onURLUpdateResponse event handler
649  #
650  # @param event instance of Event object
def onURLUpdateResponse(self, event):
    """Handle the URL_UPDATE_RESPONSE event: only logs the errors reported in the response.

    Logs the general response error, the empty items list, or each item that has a not OK error code.
    Any exception raised during handling is passed to ExceptionLog.handler and is not propagated.

    @param event instance of Event object, its eventObj is read as the client response
    """
    try:
        logger.debug("Reply received on URL update.")
        clientResponse = event.eventObj
        if clientResponse.errorCode != EventObjects.ClientResponse.STATUS_OK:
            logger.error("URLUpdate response error:%s : %s", clientResponse.errorCode, clientResponse.errorMessage)
        elif not clientResponse.itemsList:
            logger.error("URLUpdate response empty list!")
        else:
            for responseItem in clientResponse.itemsList:
                if responseItem.errorCode != EventObjects.ClientResponseItem.STATUS_OK:
                    # Logger arguments instead of "+": a not string field (for example numeric port) can not
                    # raise TypeError here and hide the reported error
                    logger.error("URLUpdate response error: %s : %s, host:%s, port:%s, node:%s!",
                                 responseItem.errorCode, responseItem.errorMessage, responseItem.host,
                                 responseItem.port, responseItem.node)
    except Exception as err:
        ExceptionLog.handler(logger, err, "Exception:")
668 
669 
670  # #onURLDeleteResponse event handler
671  #
672  # @param event instance of Event object
def onURLDeleteResponse(self, event):
    """Handle the URL_DELETE_RESPONSE event: only logs the errors reported in the response.

    Logs the general response error, the empty items list, or each item that has a not OK error code.
    Any exception raised during handling is passed to ExceptionLog.handler and is not propagated.

    @param event instance of Event object, its eventObj is read as the client response
    """
    try:
        logger.debug("Reply received on URL delete.")
        clientResponse = event.eventObj
        if clientResponse.errorCode != EventObjects.ClientResponse.STATUS_OK:
            logger.error("URLDelete response error:%s : %s", clientResponse.errorCode, clientResponse.errorMessage)
        elif not clientResponse.itemsList:
            logger.error("URLDelete response empty list!")
        else:
            for responseItem in clientResponse.itemsList:
                if responseItem.errorCode != EventObjects.ClientResponseItem.STATUS_OK:
                    # Logger arguments instead of "+": a not string field (for example numeric port) can not
                    # raise TypeError here and hide the reported error
                    logger.error("URLDelete response error: %s : %s, host:%s, port:%s, node:%s!",
                                 responseItem.errorCode, responseItem.errorMessage, responseItem.host,
                                 responseItem.port, responseItem.node)
    except Exception as err:
        ExceptionLog.handler(logger, err, "Exception:")
690 
def __init__(self, configParser, connectionBuilderLight=None)
def updateStatField(self, field_name, value, operation=STAT_FIELDS_OPERATION_ADD)
update values of stat field - default sum
NewTask event object, defines the Task object fields.
GeneralResponse event object, represents general state response for multipurpose usage.
def setEventHandler(self, eventType, handler)
set event handler rewrite the current handler for eventType
def addConnection(self, name, connection)
def sendURLUpdate(self, batchItemsList, batchId, batchState)
DeleteTask event object, to delete task from DTM application and from EE.
This is app base class for management server connection end-points and parallel transport messages pr...
def poll(self)
poll function polling connections receive as multipart msg, the second argument is pickled pyobj ...
CheckTaskState event object, for check task status inside EE.
Class hides routines of bulding connection objects.
def send(self, connect_name, event)
send event
def dtmdRequestExecute(self, requestEvent, timeout, maxTries=100)
GetTasksStatus event object, for check task status operation.
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
EEResponseData event object, store task results data, returned from EE.