HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
BatchTasksManager.py
1 '''
2 HCE project, Python bindings, Distributed Crawler application.
3 BatchTasksManager object and related classes definitions.
4 
5 @package: dc
6 @author bgv bgv.hce@gmail.com
7 @link: http://hierarchical-cluster-engine.com/
8 @copyright: Copyright © 2013-2016 IOIX Ukraine
9 @license: http://hierarchical-cluster-engine.com/license/
10 @since: 0.1
11 '''
12 
13 import time
14 import ConfigParser
15 import json
16 import random
17 
18 # import pickle
19 try:
20  import cPickle as pickle
21 except ImportError:
22  import pickle
23 
24 import copy
25 from app.BaseServerManager import BaseServerManager
26 from app.Utils import SQLExpression, varDump
27 from app.Utils import ExceptionLog
28 import app.Utils as Utils # pylint: disable=F0401
29 import dc.Constants as DC_CONSTS
30 from dc import EventObjects
31 from transport.ConnectionBuilderLight import ConnectionBuilderLight
32 import transport.Consts as TRANSPORT_CONSTS
33 import dtm.EventObjects
34 import dtm.Constants as DTM_CONSTS
35 
36 # Logger initialization
37 logger = Utils.MPLogger().getLogger()
38 
39 
40 # #The BatchTasksManager class implements the main crawling logic of the DC application.
41 #
42 # This object implements the main crawling batch algorithms of the DC application.
43 # It supports the complete DTM protocol: it sends requests to and processes responses from the DTM server, operates
44 # tasks and monitors their state.
45 #
46 class BatchTasksManager(BaseServerManager):
47 
48  DTM_TASK_CHECK_STATE_METHOD_STATUS = 0
49  DTM_TASK_CHECK_STATE_METHOD_STATE = 1
50  CONFIG_BATCH_MAX_ITERATIONS_DEFAULT = 2
51 
52  BATCH_TASK_TYPE_ALL = 0
53  BATCH_TASK_TYPE_CRAWL = 1
54  BATCH_TASK_TYPE_PURGE = 2
55  BATCH_TASK_TYPE_AGE = 3
56 
57  # Common configuration settings options names
58  CONFIG_SERVER = "server"
59  CONFIG_DTMD_HOST = "DTMDHost"
60  CONFIG_DTMD_PORT = "DTMDPort"
61  CONFIG_DTMD_TIMEOUT = "DTMDTimeout"
62  CONFIG_SITES_MANAGER_CLIENT = "clientSitesManager"
63  CONFIG_POLLING_TIMEOUT = "PollingTimeout"
64  CONFIG_DRCE_CRAWLER_APP_NAME = "DRCECrawlerAppName"
65  CONFIG_CRAWLED_URLS_STRATEGY = "CrawledURLStrategy"
66  CONFIG_REGULAR_CRAWL_PERIOD = "RegularCrawlingPeriod"
67  CONFIG_REGULAR_CRAWL_MODE = "RegularCrawlingMode"
68  CONFIG_REGULAR_CRAWL_PROPAGATE_URLS = "RegularCrawlingPropagateURLs"
69  # Crawling task DTM name
70  CONFIG_TASK_DTM_NAME_CRAWLING = "BatchTaskDTMNameCrawl"
71  # Purging task DTM name
72  CONFIG_TASK_DTM_NAME_PURGING = "BatchTaskDTMNamePurge"
73  # Aging task DTM name
74  CONFIG_TASK_DTM_NAME_AGING = "BatchTaskDTMNameAging"
75  # Crawling task DTM type
76  CONFIG_TASK_DTM_TYPE_CRAWLING = "BatchTaskDTMTypeCrawl"
77  # Purging task DTM type
78  CONFIG_TASK_DTM_TYPE_PURGING = "BatchTaskDTMTypePurge"
79  # Aging task DTM type
80  CONFIG_TASK_DTM_TYPE_AGING = "BatchTaskDTMTypeAging"
81 
82  # Return URLs back state NEW configuration
83  CONFIG_RET_URLS_MAX_NUMBER = "ReturnURLsMaxNumber"
84  CONFIG_RET_URLS_PERIOD = "ReturnURLsPeriod"
85  CONFIG_RET_URLS_TTL = "ReturnURLsTTL"
86  CONFIG_RET_URLS_MODE = "ReturnURLsMode"
87 
88  # The incremental crawling configuration
89  CONFIG_INCR_MIN_FREQ = "IncrMinFreq"
90  CONFIG_INCR_MAX_DEPTH = "IncrMaxDepth"
91  CONFIG_INCR_MAX_URL = "IncrMaxURLs"
92  CONFIG_INCR_CRAWL_PERIOD = "IncrPeriod"
93  CONFIG_INCR_CRAWL_MODE = "IncrMode"
94 
95  # The batch configuration
96  CONFIG_BATCH_DEFAULT_MAX_TIME = "BatchDefaultMaxExecutionTime"
97  CONFIG_BATCH_MAX_URLS = "BatchDefaultMaxURLs"
98  CONFIG_BATCH_ORDER_BY_URLS = "BatchDefaultOrderByURLs"
99  CONFIG_BATCH_MAX_TASKS = "BatchDefaultMaxTasks"
100  CONFIG_BATCH_QUEUE_PERIOD = "BatchQueueProcessingPeriod"
101  CONFIG_BATCH_QUEUE_TASK_TTL = "BatchQueueTaskTTL"
102  CONFIG_BATCH_QUEUE_TASK_CHECK_METHOD = "BatchQueueTaskCheckStateMethod"
103  CONFIG_BATCH_DEFAULT_STARTER = "BatchTask_STARTER"
104  CONFIG_BATCH_DEFAULT_CHECK_URLS_IN_ACTIVE_BATCHES = "BatchDefaultCheckURLsInActiveBatches"
105  CONFIG_BATCH_MAX_ITERATIONS = "BatchMaxIterations"
106  CONFIG_BATCH_FETCH_TYPE = "BatchDefaultFetchTypeOptions"
107  CONFIG_BATCH_MAX_TIME = "BatchMaxExecutionTime"
108  CONFIG_BATCH_REMOVE_UNPROCESSED_ITEMS = "RemoveUnprocessedBatchItems"
109 
110  # The task's strategy configuration parameters for the DTM service
111  CONFIG_BATCH_DEFAULT_STRATEGY_IO_WAIT_MAX = "BatchTask_IO_WAIT_MAX"
112  CONFIG_BATCH_DEFAULT_STRATEGY_CPU_LOAD_MAX = "BatchTask_CPU_LOAD_MAX"
113  CONFIG_BATCH_DEFAULT_STRATEGY_RAM_FREE_MIN = "BatchTask_RAM_FREE_MIN"
114  CONFIG_BATCH_DEFAULT_STRATEGY_STRATEGY_RDELAY = "BatchTask_RDELAY"
115  CONFIG_BATCH_DEFAULT_STRATEGY_RETRY = "BatchTask_RETRY"
116  CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_TTL = "BatchTask_autocleanup_TTL"
117  CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_TYPE = "BatchTask_autocleanup_DeleteType"
118  CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_RETRIES = "BatchTask_autocleanup_DeleteRetries"
119  CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_STATE = "BatchTask_autocleanup_State"
120 
121  # The purge algorithm configuration
122  CONFIG_PURGE_PERIOD = "PurgePeriod"
123  CONFIG_PURGE_MODE = "PurgeMode"
124  CONFIG_PURGE_BATCH_DEFAULT_MAX_TIME = "PurgeBatchDefaultMaxExecutionTime"
125  CONFIG_PURGE_BATCH_MAX_URLS = "PurgeBatchDefaultMaxURLs"
126  CONFIG_PURGE_BATCH_QUEUE_TASK_TTL = "PurgeBatchQueueTaskTTL"
127  CONFIG_PURGE_BATCH_DEFAULT_STARTER = "PurgeBatchTask_STARTER"
128  CONFIG_DRCE_DB_APP_NAME = "DRCEDBAppName"
129  CONFIG_PURGE_BATCH_MAX_TASKS = "PurgeBatchDefaultMaxTasks"
130 
131  # Aging config names
132  CONFIG_AGING_PERIOD = "AgingPeriod"
133  CONFIG_RESOURCE_AGING_MODE = "AgingMode"
134  CONFIG_AGE_BATCH_DEFAULT_MAX_TIME = "AgingBatchDefaultMaxExecutionTime"
135  CONFIG_AGE_BATCH_MAX_URLS_SITE = "AgingBatchDefaultMaxURLsSite"
136  CONFIG_AGE_BATCH_MAX_URLS_TOTAL = "AgingBatchDefaultMaxURLsTotal"
137  CONFIG_AGE_BATCH_MAX_SITES = "AgingBatchDefaultMaxSites"
138  CONFIG_AGE_BATCH_QUEUE_TASK_TTL = "AgingBatchQueueTaskTTL"
139  CONFIG_AGE_BATCH_DEFAULT_STARTER = "AgingBatchTask_STARTER"
140  CONFIG_AGE_BATCH_MAX_TASKS = "AgingBatchDefaultMaxTasks"
141  CONFIG_AGE_BATCH_URL_CRITERION = "AgingBatchURLsCriterion"
142  CONFIG_AGE_BATCH_SITE_CRITERION = "AgingBatchSitesCriterion"
143 
144  BATCH_FETCH_TYPE_COOKIE_NAME = "FetchType"
145  BATCH_ID_COOKIE_NAME = "batchId"
146 
147 
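# A minimal sketch of the corresponding configuration section (the values shown are
# illustrative assumptions, not project defaults); the section name must match the class
# name, since the constructor reads every option via configParser.get()/getint() on the
# className section:
#
#   [BatchTasksManager]
#   server = BatchTasksManagerServer
#   clientSitesManager = ClientSitesManager
#   DTMDHost = 127.0.0.1
#   DTMDPort = 5656
#   DTMDTimeout = 5000
#   PollingTimeout = 3000
#   BatchDefaultMaxURLs = 100
#   BatchMaxExecutionTime = 3600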
148  # #constructor
149  # initialize fields
150  #
151  # @param configParser config parser object
152  # @param connectionBuilderLight connection builder light object
153  #
154  def __init__(self, configParser, connectionBuilderLight=None):
155  super(BatchTasksManager, self).__init__()
156 
157  # Instantiate the connection builder light if not set
158  if connectionBuilderLight is None:
159  connectionBuilderLight = ConnectionBuilderLight()
160 
161  # Batches counter init in stat vars
162  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_TOTAL_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
163  # Batches in queue counter init in stat vars
164  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_QUEUE_NAME, 0, self.STAT_FIELDS_OPERATION_SET)
165  # Batches that fault processing counter init in stat vars
166  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
167  # Batches that not empty counter init in stat vars
168  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_FILLED_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
169  # Batches urls total counter init in stat vars
170  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
171  # Fault batches urls total counter init in stat vars
172  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
173  # Crawling URL_FETCH requests counter name for stat variables
174  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URL_FETCH_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
175  # Crawling cancelled URL_FETCH requests counter name for stat variables
176  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URL_FETCH_CANCELLED_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
177  # Crawling delete task requests fault counter name for stat variables
178  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_DELETE_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
179  # Crawling check task requests fault counter name for stat variables
180  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_CHECK_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
181  # Crawling batches urls returned counter name for stat variables
182  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_RET_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
183  # Crawling incremental URL_FETCH requests counter name for stat variables
184  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URL_FETCH_INCR_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
185  # Crawling batches fault TTL counter name for stat variables
186  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_FAULT_TTL_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
187  # Batches items/urls average counter init in stat vars
188  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_ITEMS_AVG_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
189  # Batches dynamic fetcher's batches counter init in stat vars
190  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_FETCHER_DYNAMIC, 0, self.STAT_FIELDS_OPERATION_INIT)
191  # Batches static fetcher's batches counter init in stat vars
192  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_FETCHER_STATIC, 0, self.STAT_FIELDS_OPERATION_INIT)
193  # Batches mixed static and dynamic fetcher's batches counter init in stat vars
194  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_FETCHER_MIXED, 0, self.STAT_FIELDS_OPERATION_INIT)
195 
196  # Get configuration settings
197  className = self.__class__.__name__
198  self.serverName = configParser.get(className, self.CONFIG_SERVER)
199  self.clientSitesManagerName = configParser.get(className, self.CONFIG_SITES_MANAGER_CLIENT)
200  # Configuration settings for DTMD server interaction
201  self.configVars[self.CONFIG_DTMD_HOST] = configParser.get(className, self.CONFIG_DTMD_HOST)
202  self.configVars[self.CONFIG_DTMD_PORT] = configParser.get(className, self.CONFIG_DTMD_PORT)
203  self.configVars[self.CONFIG_DTMD_TIMEOUT] = configParser.getint(className, self.CONFIG_DTMD_TIMEOUT)
204 
205  # Max URLs per batch
206  self.configVars[self.CONFIG_BATCH_MAX_URLS] = configParser.getint(className, self.CONFIG_BATCH_MAX_URLS)
207 
208  # Max execution time for batch
209  self.configVars[self.CONFIG_BATCH_MAX_TIME] = configParser.getint(className, self.CONFIG_BATCH_MAX_TIME)
210  # Remove unprocessed items for batch
211  self.configVars[self.CONFIG_BATCH_REMOVE_UNPROCESSED_ITEMS] = \
212  bool(configParser.getint(className, self.CONFIG_BATCH_REMOVE_UNPROCESSED_ITEMS))
213 
214 
215  # Set connections poll timeout, defines period of HCE cluster monitoring cycle, msec
216  self.configVars[self.POLL_TIMEOUT_CONFIG_VAR_NAME] = configParser.getint(className, self.CONFIG_POLLING_TIMEOUT)
217  # Configuration settings for incremental crawling
218  self.configVars[DC_CONSTS.INCR_MIN_FREQ_CONFIG_VAR_NAME] = configParser.getint(className, self.CONFIG_INCR_MIN_FREQ)
219  self.configVars[DC_CONSTS.INCR_MAX_DEPTH_CONFIG_VAR_NAME] = configParser.getint(className,
220  self.CONFIG_INCR_MAX_DEPTH)
221  self.configVars[DC_CONSTS.INCR_MAX_URLS_CONFIG_VAR_NAME] = configParser.getint(className, self.CONFIG_INCR_MAX_URL)
222  self.configVars[self.CONFIG_INCR_CRAWL_MODE] = configParser.getint(className, self.CONFIG_INCR_CRAWL_MODE)
223 
224  # Set crawler task app name
225  self.configVars[self.CONFIG_DRCE_CRAWLER_APP_NAME] = configParser.get(className, self.CONFIG_DRCE_CRAWLER_APP_NAME)
226  self.configVars[self.CONFIG_BATCH_DEFAULT_MAX_TIME] = \
227  configParser.getint(className, self.CONFIG_BATCH_DEFAULT_MAX_TIME)
228 
229  # Create connections and raise bind or connect actions for correspondent connection type
230  serverConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, self.serverName)
231  sitesManagerConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT, self.clientSitesManagerName)
232 
233  # Init the DTMD connection
234  self.dtmdConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT,
235  self.configVars[self.CONFIG_DTMD_HOST] + ":" + \
236  self.configVars[self.CONFIG_DTMD_PORT],
237  TRANSPORT_CONSTS.TCP_TYPE)
238 
239  # Add connections to the polling set
240  self.addConnection(self.serverName, serverConnection)
241  self.addConnection(self.clientSitesManagerName, sitesManagerConnection)
242 
243  # Set event handler for URL_FETCH_RESPONSE event
244  self.setEventHandler(DC_CONSTS.EVENT_TYPES.URL_FETCH_RESPONSE, self.onURLFetchResponse)
245  # Set event handler for URL_UPDATE_RESPONSE event
246  self.setEventHandler(DC_CONSTS.EVENT_TYPES.URL_UPDATE_RESPONSE, self.onURLUpdateResponse)
247  # Set event handler for URL_DELETE_RESPONSE event
248  self.setEventHandler(DC_CONSTS.EVENT_TYPES.URL_DELETE_RESPONSE, self.onURLDeleteResponse)
249  # Set event handler for URL_NEW_RESPONSE event
250  self.setEventHandler(DC_CONSTS.EVENT_TYPES.URL_NEW_RESPONSE, self.onURLNewResponse)
251 
252  # Initialize the DTM tasks queue, the key is taskId, the value is the Batch object
253  self.dtmTasksQueue = {}
254 
255  # Init config vars storage for return URLs selected for processing
256  self.configVars[self.CONFIG_RET_URLS_MAX_NUMBER] = configParser.getint(className, self.CONFIG_RET_URLS_MAX_NUMBER)
257  self.configVars[self.CONFIG_RET_URLS_PERIOD] = configParser.getint(className, self.CONFIG_RET_URLS_PERIOD)
258  self.configVars[self.CONFIG_RET_URLS_TTL] = configParser.getint(className, self.CONFIG_RET_URLS_TTL)
259  self.configVars[self.CONFIG_RET_URLS_MODE] = configParser.getint(className, self.CONFIG_RET_URLS_MODE)
260  self.processSelectedURLsRetLastTs = time.time()
261 
262  # Incremental crawling init
263  self.configVars[self.CONFIG_INCR_CRAWL_PERIOD] = configParser.getint(className, self.CONFIG_INCR_CRAWL_PERIOD)
264  self.processIncrCrawlLastTs = time.time()
265 
266  # Crawled URLs strategy for batch
267  self.configVars[self.CONFIG_CRAWLED_URLS_STRATEGY] = configParser.getint(className,
268  self.CONFIG_CRAWLED_URLS_STRATEGY)
269  # Batch default order by criterion to fetch URLs
270  self.configVars[self.CONFIG_BATCH_ORDER_BY_URLS] = configParser.get(className, self.CONFIG_BATCH_ORDER_BY_URLS)
271 
272  # Batch default fetch type options
273  try:
274  self.configVars[self.CONFIG_BATCH_FETCH_TYPE] = configParser.get(className, self.CONFIG_BATCH_FETCH_TYPE)
275  self.configVars[self.CONFIG_BATCH_FETCH_TYPE] = json.loads(self.configVars[self.CONFIG_BATCH_FETCH_TYPE])
276  except ConfigParser.NoOptionError:
277  self.configVars[self.CONFIG_BATCH_FETCH_TYPE] = {}
278 
279  # Max tasks in batch queue, if limit reached new batch tasks will not be started; zero means unlimited
280  self.configVars[self.CONFIG_BATCH_MAX_TASKS] = configParser.getint(className, self.CONFIG_BATCH_MAX_TASKS)
281  # Regular crawling init
282  self.configVars[self.CONFIG_REGULAR_CRAWL_PERIOD] = configParser.getint(className, self.CONFIG_REGULAR_CRAWL_PERIOD)
283  self.configVars[self.CONFIG_REGULAR_CRAWL_MODE] = configParser.getint(className, self.CONFIG_REGULAR_CRAWL_MODE)
284  self.configVars[self.CONFIG_REGULAR_CRAWL_PROPAGATE_URLS] = \
285  configParser.getint(className, self.CONFIG_REGULAR_CRAWL_PROPAGATE_URLS)
286 
287  self.processRegularCrawlLastTs = time.time()
288  # The batch queue processing init
289  self.configVars[self.CONFIG_BATCH_QUEUE_PERIOD] = configParser.getint(className, self.CONFIG_BATCH_QUEUE_PERIOD)
290  self.processBatchQueuelLastTs = time.time()
291  # The batch queue task TTL, sec
292  self.configVars[self.CONFIG_BATCH_QUEUE_TASK_TTL] = configParser.getint(className, self.CONFIG_BATCH_QUEUE_TASK_TTL)
293  # The batch queue tasks state check method, see ini file comments
294  self.configVars[self.CONFIG_BATCH_QUEUE_TASK_CHECK_METHOD] = \
295  configParser.getint(className, self.CONFIG_BATCH_QUEUE_TASK_CHECK_METHOD)
296 
297  # The task's strategy configuration parameters for DTM service load
298  self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_IO_WAIT_MAX] = \
299  configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_IO_WAIT_MAX)
300  self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_CPU_LOAD_MAX] = \
301  configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_CPU_LOAD_MAX)
302  self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_RAM_FREE_MIN] = \
303  configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_RAM_FREE_MIN)
304  self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_STRATEGY_RDELAY] = \
305  configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_STRATEGY_RDELAY)
306  self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_RETRY] = \
307  configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_RETRY)
308 
309  # The DRCE task starter name
310  self.configVars[self.CONFIG_BATCH_DEFAULT_STARTER] = configParser.get(className, self.CONFIG_BATCH_DEFAULT_STARTER)
311 
312  # The auto cleanup fields
313  self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_TTL] = \
314  configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_TTL)
315  self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_TYPE] = \
316  configParser.getint(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_TYPE)
317  self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_RETRIES] = \
318  configParser.get(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_RETRIES)
319  self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_STATE] = \
320  configParser.get(className, self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_STATE)
321 
322  # URLFetch requests counter init
324  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URL_FETCH_REQUESTS_NAME, 0, self.STAT_FIELDS_OPERATION_SET)
325 
326  # Check URLs in active batches before add in to new batch init
327  self.configVars[self.CONFIG_BATCH_DEFAULT_CHECK_URLS_IN_ACTIVE_BATCHES] = \
328  configParser.getint(className, self.CONFIG_BATCH_DEFAULT_CHECK_URLS_IN_ACTIVE_BATCHES)
329 
330  # Purge algorithm init
331  self.configVars[self.CONFIG_PURGE_PERIOD] = configParser.getint(className, self.CONFIG_PURGE_PERIOD)
332  self.configVars[self.CONFIG_PURGE_BATCH_DEFAULT_MAX_TIME] = \
333  configParser.getint(className, self.CONFIG_PURGE_BATCH_DEFAULT_MAX_TIME)
334  self.configVars[self.CONFIG_PURGE_BATCH_MAX_URLS] = configParser.getint(className, self.CONFIG_PURGE_BATCH_MAX_URLS)
335  self.configVars[self.CONFIG_PURGE_BATCH_QUEUE_TASK_TTL] = \
336  configParser.getint(className, self.CONFIG_PURGE_BATCH_QUEUE_TASK_TTL)
337  self.configVars[self.CONFIG_PURGE_BATCH_DEFAULT_STARTER] = \
338  configParser.get(className, self.CONFIG_PURGE_BATCH_DEFAULT_STARTER)
339  self.configVars[self.CONFIG_DRCE_DB_APP_NAME] = configParser.get(className, self.CONFIG_DRCE_DB_APP_NAME)
340  self.configVars[self.CONFIG_PURGE_BATCH_MAX_TASKS] = \
341  configParser.getint(className, self.CONFIG_PURGE_BATCH_MAX_TASKS)
342  self.configVars[self.CONFIG_PURGE_MODE] = configParser.getint(className, self.CONFIG_PURGE_MODE)
343 
344  # Purge batches in queue counter init in stat vars
345  self.updateStatField(DC_CONSTS.BATCHES_PURGE_COUNTER_NAME, 0, self.STAT_FIELDS_OPERATION_SET)
346  self.processPurgeLastTs = time.time()
347  # Purge total batches counter name for stat variables
348  self.updateStatField(DC_CONSTS.BATCHES_PURGE_COUNTER_TOTAL_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
349  # Purge batches cancelled DRCE task requests counter name for stat variables
350  self.updateStatField(DC_CONSTS.BATCHES_PURGE_COUNTER_CANCELLED_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
351  # Purge batches DRCE task requests error counter name for stat variables
352  self.updateStatField(DC_CONSTS.BATCHES_PURGE_COUNTER_ERROR_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
353  # Purge batches DRCE task requests execution faults counter name for stat variables
354  self.updateStatField(DC_CONSTS.BATCHES_PURGE_COUNTER_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
355  # Purge batches DRCE task check state faults counter name for stat variables
356  self.updateStatField(DC_CONSTS.BATCHES_PURGE_COUNTER_CHECK_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
357 
358  # Crawling task DTM name
359  self.configVars[self.CONFIG_TASK_DTM_NAME_CRAWLING] = configParser.get(className,
360  self.CONFIG_TASK_DTM_NAME_CRAWLING)
361  # Purging task DTM name
362  self.configVars[self.CONFIG_TASK_DTM_NAME_PURGING] = configParser.get(className,
363  self.CONFIG_TASK_DTM_NAME_PURGING)
364  # Crawling task DTM type
365  self.configVars[self.CONFIG_TASK_DTM_TYPE_CRAWLING] = configParser.getint(className,
366  self.CONFIG_TASK_DTM_TYPE_CRAWLING)
367  # Purging task DTM type
368  self.configVars[self.CONFIG_TASK_DTM_TYPE_PURGING] = configParser.getint(className,
369  self.CONFIG_TASK_DTM_TYPE_PURGING)
370 
371  # Init default resource aging settings
372  self.configVars[self.CONFIG_AGING_PERIOD] = configParser.getint(className,
373  self.CONFIG_AGING_PERIOD)
374  self.configVars[self.CONFIG_RESOURCE_AGING_MODE] = configParser.getint(className,
375  self.CONFIG_RESOURCE_AGING_MODE)
376  self.configVars[self.CONFIG_TASK_DTM_NAME_AGING] = configParser.get(className, self.CONFIG_TASK_DTM_NAME_AGING)
377  self.configVars[self.CONFIG_TASK_DTM_TYPE_AGING] = configParser.getint(className, self.CONFIG_TASK_DTM_TYPE_AGING)
378  self.configVars[self.CONFIG_AGE_BATCH_DEFAULT_MAX_TIME] = configParser.getint(className,
379  self.CONFIG_AGE_BATCH_DEFAULT_MAX_TIME) # pylint: disable=C0301
380  self.configVars[self.CONFIG_AGE_BATCH_MAX_URLS_SITE] = configParser.getint(className,
381  self.CONFIG_AGE_BATCH_MAX_URLS_SITE)
382  self.configVars[self.CONFIG_AGE_BATCH_MAX_URLS_TOTAL] = configParser.getint(className,
383  self.CONFIG_AGE_BATCH_MAX_URLS_TOTAL)
384  self.configVars[self.CONFIG_AGE_BATCH_MAX_SITES] = configParser.getint(className, self.CONFIG_AGE_BATCH_MAX_SITES)
385  self.configVars[self.CONFIG_AGE_BATCH_QUEUE_TASK_TTL] = configParser.getint(className,
386  self.CONFIG_AGE_BATCH_QUEUE_TASK_TTL)
387  self.configVars[self.CONFIG_AGE_BATCH_DEFAULT_STARTER] = configParser.get(className,
388  self.CONFIG_AGE_BATCH_DEFAULT_STARTER)
389  self.configVars[self.CONFIG_AGE_BATCH_MAX_TASKS] = configParser.getint(className, self.CONFIG_AGE_BATCH_MAX_TASKS)
390  self.configVars[self.CONFIG_AGE_BATCH_URL_CRITERION] = configParser.get(className,
391  self.CONFIG_AGE_BATCH_URL_CRITERION)
392  self.configVars[self.CONFIG_AGE_BATCH_SITE_CRITERION] = configParser.get(className,
393  self.CONFIG_AGE_BATCH_SITE_CRITERION)
394  self.processAgingLastTs = time.time()
395 
396  # Age batches in queue counter init in stat vars
397  self.updateStatField(DC_CONSTS.BATCHES_AGE_COUNTER_NAME, 0, self.STAT_FIELDS_OPERATION_SET)
398  # Age total batches counter name for stat variables
399  self.updateStatField(DC_CONSTS.BATCHES_AGE_COUNTER_TOTAL_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
400  # Age batches cancelled DRCE task requests counter name for stat variables
401  self.updateStatField(DC_CONSTS.BATCHES_AGE_COUNTER_CANCELLED_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
402  # Age batches DRCE task requests error counter name for stat variables
403  self.updateStatField(DC_CONSTS.BATCHES_AGE_COUNTER_ERROR_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
404  # Age batches DRCE task requests execution faults counter name for stat variables
405  self.updateStatField(DC_CONSTS.BATCHES_AGE_COUNTER_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
406  # Age batches DRCE task check state faults counter name for stat variables
407  self.updateStatField(DC_CONSTS.BATCHES_AGE_COUNTER_CHECK_FAULT_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
408 
409  if configParser.has_option(className, self.CONFIG_BATCH_MAX_ITERATIONS):
410  self.configVars[self.CONFIG_BATCH_MAX_ITERATIONS] = configParser.getint(className,
411  self.CONFIG_BATCH_MAX_ITERATIONS)
412  else:
413  self.configVars[self.CONFIG_BATCH_MAX_ITERATIONS] = self.CONFIG_BATCH_MAX_ITERATIONS_DEFAULT
414 
415 
416 
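# A usage sketch (hypothetical wiring, not taken from the project sources): the manager is
# built from the application configuration and an optional ConnectionBuilderLight, and the
# BaseServerManager base class is assumed to drive the polling loop that calls
# on_poll_timeout() below.
#
#   config = ConfigParser.ConfigParser()
#   config.read("dc.ini")                # assumed configuration file name
#   manager = BatchTasksManager(config)  # ConnectionBuilderLight() is created by default
#   manager.start()                      # assumed BaseServerManager lifecycle method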
417  # #Events wait timeout handler for the timeout state of the connections polling. Executes a periodic check of DTM
418  # tasks and initiates the main crawling iteration cycle.
419  #
420  def on_poll_timeout(self):
421  logger.debug("Periodic iteration started.")
422  try:
423  # Process regular crawl batching
424  if self.configVars[self.CONFIG_REGULAR_CRAWL_PERIOD] > 0 and\
425  time.time() - self.processRegularCrawlLastTs > self.configVars[self.CONFIG_REGULAR_CRAWL_PERIOD]:
426  self.processRegularCrawlLastTs = time.time()
427  if self.configVars[self.CONFIG_REGULAR_CRAWL_MODE] == 1:
428  crawlTasks = self.getBatchTasksCount(self.BATCH_TASK_TYPE_CRAWL)
429  logger.debug("URL_FETCH for regular crawling, now crawl tasks: %s, URLFetch counter:%s", str(crawlTasks),
430  str(self.sendURLFetchRequestCounter))
431  # Process the first step of crawling iteration
432  if self.configVars[self.CONFIG_BATCH_MAX_TASKS] > crawlTasks and\
434  self.sendURLFetchRequest()
435  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URL_FETCH_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
437  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URL_FETCH_REQUESTS_NAME, 1,
438  self.STAT_FIELDS_OPERATION_ADD)
439  else:
440  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URL_FETCH_CANCELLED_NAME, 1,
441  self.STAT_FIELDS_OPERATION_ADD)
442  logger.debug("Max crawl batch tasks %s>=%s in queue or URLFetch counter %s reached!",
443  str(crawlTasks), str(self.configVars[self.CONFIG_BATCH_MAX_TASKS]),
444  str(self.sendURLFetchRequestCounter))
445  else:
446  logger.debug("Regular crawling disabled!")
447 
448  # Process Batch queue
449  if self.configVars[self.CONFIG_BATCH_QUEUE_PERIOD] > 0 and\
450  time.time() - self.processBatchQueuelLastTs > self.configVars[self.CONFIG_BATCH_QUEUE_PERIOD]:
451  self.processBatchQueuelLastTs = time.time()
452  logger.debug("Process DTM tasks queue!")
453  # Process the DTM tasks queue
454  self.processDTMTasksQueue()
455 
456  # Process periodic selected for crawling URLs return
457  if self.configVars[self.CONFIG_RET_URLS_PERIOD] > 0 and\
458  time.time() - self.processSelectedURLsRetLastTs > self.configVars[self.CONFIG_RET_URLS_PERIOD]:
459  if self.configVars[self.CONFIG_RET_URLS_MODE] == 1:
460  logger.debug("Now time to perform URLs return, interval " + str(self.configVars[self.CONFIG_RET_URLS_PERIOD]))
461  self.processSelectedURLsRetLastTs = time.time()
463  else:
464  logger.debug("URLs return for crawling disabled!")
465 
466  # TODO: Process incremental crawling (not tested)
467  if self.configVars[self.CONFIG_INCR_CRAWL_PERIOD] > 0 and\
468  time.time() - self.processIncrCrawlLastTs > self.configVars[self.CONFIG_INCR_CRAWL_PERIOD]:
469  self.processIncrCrawlLastTs = time.time()
470  logger.debug("URL_FETCH for incremental crawling iteration!")
471  if self.configVars[self.CONFIG_INCR_CRAWL_MODE] == 1:
472  # Process the first step of incremental crawling iteration
473  self.sendIncrURLRequest()
474  else:
475  logger.debug("Incremental crawling disabled!")
476 
477  # Process aging resources batching
478  if self.configVars[self.CONFIG_AGING_PERIOD] > 0 and\
479  time.time() - self.processAgingLastTs > (int(self.configVars[self.CONFIG_AGING_PERIOD]) * 60):
480  if self.configVars[self.CONFIG_RESOURCE_AGING_MODE] == 1:
481  logger.debug("Now time to perform aging, interval %s", str(self.configVars[self.CONFIG_AGING_PERIOD]))
482  self.processAgingLastTs = time.time()
483  ageTasks = self.getBatchTasksCount(self.BATCH_TASK_TYPE_AGE)
484  logger.debug("Age batching, tasks: %s!", str(ageTasks))
485  if self.configVars[self.CONFIG_AGE_BATCH_MAX_TASKS] > ageTasks:
486  if self.setAgeBatch():
487  self.updateStatField(DC_CONSTS.BATCHES_AGE_COUNTER_NAME,
489  self.updateStatField(DC_CONSTS.BATCHES_AGE_COUNTER_TOTAL_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
490  else:
491  self.updateStatField(DC_CONSTS.BATCHES_AGE_COUNTER_ERROR_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
492  else:
493  self.updateStatField(DC_CONSTS.BATCHES_AGE_COUNTER_CANCELLED_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
494  logger.debug("Max age tasks %s reached", str(ageTasks))
495  else:
496  logger.debug("Resources aging disabled")
497 
498  # Process purge resources batching
499  if self.configVars[self.CONFIG_PURGE_PERIOD] > 0 and\
500  time.time() - self.processPurgeLastTs > (int(self.configVars[self.CONFIG_PURGE_PERIOD]) * 60):
501  self.processPurgeLastTs = time.time()
502  logger.debug("Now time to perform purging, interval %s", str(self.configVars[self.CONFIG_PURGE_PERIOD]))
503  if self.configVars[self.CONFIG_PURGE_MODE] == 1:
504  purgeTasks = self.getBatchTasksCount(self.BATCH_TASK_TYPE_PURGE)
505  logger.debug("Purge batching, tasks: %s!", str(purgeTasks))
506  if self.configVars[self.CONFIG_PURGE_BATCH_MAX_TASKS] > purgeTasks:
507  if self.setPurgeBatch():
508  self.updateStatField(DC_CONSTS.BATCHES_PURGE_COUNTER_NAME,
510  self.updateStatField(DC_CONSTS.BATCHES_PURGE_COUNTER_TOTAL_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
511  else:
512  self.updateStatField(DC_CONSTS.BATCHES_PURGE_COUNTER_ERROR_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
513  else:
514  self.updateStatField(DC_CONSTS.BATCHES_PURGE_COUNTER_CANCELLED_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
515  logger.debug("Max purge tasks %s reached", str(purgeTasks))
516  else:
517  logger.debug("Purging disabled!")
518 
519  except IOError as e:
520  del e
521  except Exception as err:
522  logger.error("Exception: " + str(err.message) + "\n" + Utils.getTracebackInfo())
523 
524  logger.debug("Periodic iteration finished.")
525 
526 
527 
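# Every periodic activity in on_poll_timeout() above shares one guard pattern, sketched here
# with illustrative names; note that the regular crawling and batch queue periods are
# compared in seconds, while the aging and purge periods are configured in minutes and
# multiplied by 60 before the comparison.
#
#   if self.configVars[PERIOD_OPTION] > 0 and time.time() - lastProcessedTs > periodSeconds:
#     lastProcessedTs = time.time()
#     if self.configVars[MODE_OPTION] == 1:
#       ...run the corresponding batching step...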
528  # #Create a new purge batch, send it for execution as an asynchronous DRCE task and insert it into the batches queue
529  #
530  #
531  def setPurgeBatch(self):
532  ret = False
533 
534  try:
535  crit = {EventObjects.URLPurge.CRITERION_LIMIT:str(self.configVars[self.CONFIG_PURGE_BATCH_MAX_URLS])}
536  batchURLPurge = EventObjects.URLPurge(None, None, EventObjects.URLStatus.URL_TYPE_MD5, crit)
537  # Process all sites from first listed by SHOW TABLES
538  batchURLPurge.siteLimits = (0, EventObjects.URLPurge.ALL_SITES)
539  taskId = self.sendBatchTaskToDTM(batchURLPurge)
540  if taskId > 0:
541  logger.debug("DTM purge batch was set, taskId=%s", str(taskId))
542  # Insert the Batch object in to the queue
543  batchURLPurge.queuedTs = time.time()
544  batchURLPurge.crawlerType = EventObjects.Batch.TYPE_PURGE
545  self.dtmTasksQueue[taskId] = batchURLPurge
546  ret = True
547  else:
548  logger.error("Error send purge batch task to DTM!")
549 
550  except Exception as err:
551  logger.error("Exception: " + str(err.message) + "\n" + Utils.getTracebackInfo())
552 
553  return ret
554 
555 
556 
557  # #Create a new age batch, send it for execution as an asynchronous DRCE task and insert it into the batches queue
558  #
559  #
560  def setAgeBatch(self):
561  ret = False
562 
563  try:
564  # Set default criterions
565  urlsCriterions = {EventObjects.URLAge.CRITERION_LIMIT:str(self.configVars[self.CONFIG_AGE_BATCH_MAX_URLS_SITE]),
566  EventObjects.URLAge.CRITERION_WHERE:str(self.configVars[self.CONFIG_AGE_BATCH_URL_CRITERION])}
567  sitesCriterions = {EventObjects.URLAge.CRITERION_WHERE:str(self.configVars[self.CONFIG_AGE_BATCH_SITE_CRITERION])}
568 
569  batchURLAge = EventObjects.URLAge(urlsCriterions, sitesCriterions)
570  batchURLAge.maxURLs = int(self.configVars[self.CONFIG_AGE_BATCH_MAX_URLS_TOTAL])
571  batchURLAge.delayedType = EventObjects.DELAYED_OPERATION
572  taskId = self.sendBatchTaskToDTM(batchURLAge)
573  if taskId > 0:
574  logger.debug("DTM age batch was set, taskId=%s", str(taskId))
575  # Insert the Batch object in to the queue
576  batchURLAge.queuedTs = time.time()
577  batchURLAge.crawlerType = EventObjects.Batch.TYPE_AGE
578  self.dtmTasksQueue[taskId] = batchURLAge
579  ret = True
580  else:
581  logger.error("Error send age batch task to DTM!")
582 
583  except Exception as err:
584  logger.error("Exception: " + str(err.message) + "\n" + Utils.getTracebackInfo())
585 
586  return ret
587 
588 
589 
590  # #onURLFetchResponse event handler
591  #
592  # @param event instance of Event object
593  def onURLFetchResponse(self, event):
594  try:
595  if self.sendURLFetchRequestCounter > 0:
597  if self.statFields[DC_CONSTS.BATCHES_CRAWL_COUNTER_URL_FETCH_REQUESTS_NAME] > 0:
598  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URL_FETCH_REQUESTS_NAME, 1,
600 
601  logger.debug("Reply received on URL fetch: %s", varDump(event))
602  if event.cookie is not None and isinstance(event.cookie, dict):
603  crawlerType = event.cookie.get('type', EventObjects.Batch.TYPE_NORMAL_CRAWLER)
604  fetchType = event.cookie.get(self.BATCH_FETCH_TYPE_COOKIE_NAME, None)
605  batchId = event.cookie.get(self.BATCH_ID_COOKIE_NAME, 0)
606  else:
607  crawlerType = EventObjects.Batch.TYPE_NORMAL_CRAWLER
608  fetchType = None
609  batchId = 0
610  clientResponse = event.eventObj
611  if clientResponse.errorCode == EventObjects.ClientResponse.STATUS_OK:
612  if len(clientResponse.itemsList) > 0:
613  if event.cookie is not None and\
614  (isinstance(event.cookie, dict) and\
615  EventObjects.Batch.OPERATION_TYPE_NAME in event.cookie and\
616  (event.cookie[EventObjects.Batch.OPERATION_TYPE_NAME] == EventObjects.Batch.TYPE_NORMAL_CRAWLER or\
617  event.cookie[EventObjects.Batch.OPERATION_TYPE_NAME] == EventObjects.Batch.TYPE_INCR_CRAWLER)):
618  # Process response for main crawling URLFetch request
619  batch = self.makeBatchFromClientResponseItems(clientResponse.itemsList, crawlerType, batchId)
620  if len(batch.items) > 0:
622  isinstance(batch, EventObjects.Batch):
623  # Execute URLNew and insert URLs from the Batch in CRAWLED state to stop redundant crawling
624  self.sendURLNew(batch.items)
625  maxExecutionTime = None
626  if fetchType is not None and fetchType == EventObjects.Site.FETCH_TYPE_DYNAMIC:
627  if 'dfetcher_BatchMaxExecutionTime' in self.configVars[self.CONFIG_BATCH_FETCH_TYPE]:
628  maxExecutionTime = self.configVars[self.CONFIG_BATCH_FETCH_TYPE]['dfetcher_BatchMaxExecutionTime']
629  # Send New Batch task to DTM
630  taskId = self.sendBatchTaskToDTM(batch, maxExecutionTime)
631  if taskId > 0:
632  logger.debug("DTM batch was set, taskId=%s", str(taskId))
633  # Insert the Batch object in to the queue
634  batch.queuedTs = time.time()
635  if fetchType is not None and fetchType == EventObjects.Site.FETCH_TYPE_DYNAMIC:
636  if 'dfetcher_BatchQueueTaskTTL' in self.configVars[self.CONFIG_BATCH_FETCH_TYPE]:
637  ttl = self.configVars[self.CONFIG_BATCH_FETCH_TYPE]['dfetcher_BatchQueueTaskTTL']
638  batch.ttl = ttl
639  self.dtmTasksQueue[taskId] = batch
640  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_QUEUE_NAME, len(self.dtmTasksQueue),
641  self.STAT_FIELDS_OPERATION_SET)
642  else:
643  logger.error("Error send the Batch object to DTM!")
644  if crawlerType == EventObjects.Batch.TYPE_NORMAL_CRAWLER:
645  # Update URLs state back to New to get possibility to select them next time
646  self.sendURLUpdate(batch.items, batch.id, False)
647  else:
648  logger.debug("There is no items in batch, cancelled!")
649  else:
650  # Process response for URLs return URLFetch request
651  urls = self.getURLsCountFromClientResponseItems(clientResponse.itemsList)
652  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_RET_NAME, urls, self.STAT_FIELDS_OPERATION_ADD)
653  logger.debug(str(urls) + " URLs returned back to state NEW because processing TTL exceed.")
654  else:
655  logger.debug("There is empty clientResponse.itemsList")
656  else:
657  logger.error("URLFetch response error: " + str(clientResponse.errorCode) + " : " + clientResponse.errorMessage)
658  except Exception as err:
659  ExceptionLog.handler(logger, err, "Exception:")
660 
661 
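# The URL_FETCH request and its response are correlated through the event cookie, a plain
# dict; a sketch of the shape set by sendURLFetchRequest() and read back in
# onURLFetchResponse() above:
#
#   cookie = {EventObjects.Batch.OPERATION_TYPE_NAME: EventObjects.Batch.TYPE_NORMAL_CRAWLER,
#             self.BATCH_ID_COOKIE_NAME: urlUpdate.batchId,
#             self.BATCH_FETCH_TYPE_COOKIE_NAME: fetchType}  # set only when a splitter is configured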
662  # #onURLUpdateResponse event handler
663  #
664  # @param event instance of Event object
665  def onURLUpdateResponse(self, event):
666  try:
667  logger.debug("Reply received on URL update.")
668  clientResponse = event.eventObj
669  if clientResponse.errorCode == EventObjects.ClientResponse.STATUS_OK:
670  if len(clientResponse.itemsList) > 0:
671  for clientResponseItem in clientResponse.itemsList:
672  if clientResponseItem.errorCode != EventObjects.ClientResponseItem.STATUS_OK:
673  logger.error("URLUpdate response error: " + str(clientResponseItem.errorCode) + " : " + \
674  clientResponseItem.errorMessage + ", host:" + clientResponseItem.host + ", port:" + \
675  clientResponseItem.port + ", node:" + clientResponseItem.node + "!")
676  else:
677  logger.error("URLUpdate response empty list!")
678  else:
679  logger.error("URLUpdate response error:" + str(clientResponse.errorCode) + " : " + clientResponse.errorMessage)
680  except Exception as err:
681  ExceptionLog.handler(logger, err, "Exception:")
682 
683 
684  # #Send the batch as a new asynchronous task to the DTM service
685  # @param batch the Batch, URLPurge or URLAge object to execute
686  # @param maxExecutionTime max execution time for the hce-node DRCE asynchronous task
687  # @return the task Id
688  def sendBatchTaskToDTM(self, batch, maxExecutionTime=None):
689  taskId = 0
690  # Prepare NewTask object
691  if isinstance(batch, EventObjects.Batch):
692  # Crawl batch
693  appName = self.configVars[self.CONFIG_DRCE_CRAWLER_APP_NAME]
694  else:
695  # Purge or Age batch
696  appName = self.configVars[self.CONFIG_DRCE_DB_APP_NAME]
697  newTaskObj = dtm.EventObjects.NewTask(appName)
698  # Set DRCE task name and type
699  newTaskObj.setSessionVar("tmode", dtm.EventObjects.Task.TASK_MODE_ASYNCH)
700  if isinstance(batch, EventObjects.Batch):
701  # Crawl batch
702  newTaskObj.setSessionVar("shell", self.configVars[self.CONFIG_BATCH_DEFAULT_STARTER])
703  if maxExecutionTime is None:
704  mt = int(self.configVars[self.CONFIG_BATCH_DEFAULT_MAX_TIME])
705  else:
706  mt = int(maxExecutionTime)
707  newTaskObj.setSessionVar("time_max", mt * 1000)
708  newTaskObj.name = self.configVars[self.CONFIG_TASK_DTM_NAME_CRAWLING]
709  newTaskObj.type = self.configVars[self.CONFIG_TASK_DTM_TYPE_CRAWLING]
710  else:
711  if isinstance(batch, EventObjects.URLPurge):
712  # Purge batch
713  newTaskObj.setSessionVar("shell", self.configVars[self.CONFIG_PURGE_BATCH_DEFAULT_STARTER])
714  newTaskObj.setSessionVar("time_max", int(self.configVars[self.CONFIG_PURGE_BATCH_DEFAULT_MAX_TIME]) * 1000)
715  newTaskObj.name = self.configVars[self.CONFIG_TASK_DTM_NAME_PURGING]
716  newTaskObj.type = self.configVars[self.CONFIG_TASK_DTM_TYPE_PURGING]
717  # Set route round-robin to prevent default resource-usage balancing for purging tasks
718  newTaskObj.setSessionVar("route", DC_CONSTS.DRCE_REQUEST_ROUTING_RND)
719  else:
720  # Age batch
721  newTaskObj.setSessionVar("shell", self.configVars[self.CONFIG_AGE_BATCH_DEFAULT_STARTER])
722  newTaskObj.setSessionVar("time_max", int(self.configVars[self.CONFIG_AGE_BATCH_DEFAULT_MAX_TIME]) * 1000)
723  newTaskObj.name = self.configVars[self.CONFIG_TASK_DTM_NAME_AGING]
724  newTaskObj.type = self.configVars[self.CONFIG_TASK_DTM_TYPE_AGING]
725  # Set route round-robin to prevent default resource-usage balancing for aging tasks
726  newTaskObj.setSessionVar("route", DC_CONSTS.DRCE_REQUEST_ROUTING_ROUND_ROBIN)
727  newTaskObj.setSessionVar("task_type", int(newTaskObj.type))
728  # Configure task's strategy
730  newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_IO_WAIT_MAX,
731  self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_IO_WAIT_MAX])
733  newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_CPU_LOAD_MAX,
734  self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_CPU_LOAD_MAX])
736  newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_RAM_FREE,
737  self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_RAM_FREE_MIN])
739  newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_RDELAY,
740  self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_STRATEGY_RDELAY])
742  newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_RETRY,
743  self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_RETRY])
744  # Set auto cleanup fields
745  autoCleanupFields = {}
747  autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_TTL] = \
748  self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_TTL]
750  autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_DELETE_TYPE] = \
751  self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_TYPE]
753  autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_DELETE_RETRIES] = \
754  self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_RETRIES]
756  autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_SSTATE] = \
757  self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_STATE]
758  if len(autoCleanupFields) > 0:
759  newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_autoCleanupFields, autoCleanupFields)
760  # Set task Id
761  if not hasattr(batch, 'id') or batch.id == 0:
762  batch.id = newTaskObj.id
763  else:
764  newTaskObj.id = batch.id
765  # Set task's input object string stream
766  if isinstance(batch, EventObjects.Batch):
767  newTaskObj.input = pickle.dumps(batch)
768  else:
769  if isinstance(batch, EventObjects.URLPurge):
770  drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_PURGE, [batch])
771  else:
772  drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_AGE, [batch])
773  newTaskObj.input = pickle.dumps(drceSyncTasksCoverObj)
774  # Set task's event
775  newTaskEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.NEW_TASK, newTaskObj)
776  # Execute task
777  generalResponse = self.dtmdRequestExecute(newTaskEvent, self.configVars[self.CONFIG_DTMD_TIMEOUT])
778  if generalResponse is not None:
779  if generalResponse.errorCode == dtm.EventObjects.GeneralResponse.ERROR_OK:
780  # New crawling Batch task set successfully
781  taskId = newTaskObj.id
782  else:
783  # Some error of task operation
784  logger.error("DTM set batch task error: " + str(generalResponse.errorCode) + " : " + \
785  generalResponse.errorMessage + ", statuses:" + varDump(generalResponse))
786  else:
787  logger.error("DTM set batch task response error, possible timeout!")
788 
789  # TODO: return the Task Id in any case, error or not, to check its state later
790  taskId = newTaskObj.id
791 
792  return taskId
793 
794 
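# The task input prepared above is a pickled object: a Batch for crawl tasks, or a
# DRCESyncTasksCover wrapping the URLPurge or URLAge object for purge and age tasks. A
# minimal decoding sketch for the executor side, assuming the input stream arrives unchanged
# (processCrawlBatch and processDBBatch are hypothetical handlers, not project functions):
#
#   obj = pickle.loads(taskInput)
#   if isinstance(obj, EventObjects.Batch):
#     processCrawlBatch(obj)
#   elif isinstance(obj, DC_CONSTS.DRCESyncTasksCover):
#     processDBBatch(obj)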
795 
796  # #make the Batch object from the ClientResponse object items
797 
798  # @param clientResponseItems The list of ClientResponseItem objects
799  # @return the Batch object instance
800  def makeBatchFromClientResponseItems(self, clientResponseItems, crawlerType, batchId=0):
801  batch = EventObjects.Batch(batchId, None, crawlerType)
802  batch.maxIterations = self.configVars[self.CONFIG_BATCH_MAX_ITERATIONS]
803  batch.maxExecutionTime = self.configVars[self.CONFIG_BATCH_MAX_TIME]
804  batch.removeUnprocessedItems = self.configVars[self.CONFIG_BATCH_REMOVE_UNPROCESSED_ITEMS]
805  batchItemsCounter = 0
806  batchItemsTotalCounter = 0
807  uniqueURLsCRCDic = {}
808  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_TOTAL_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
809 
810  for item in clientResponseItems:
811  if item.errorCode == EventObjects.ClientResponseItem.STATUS_OK:
812  if isinstance(item.itemObject, list):
813  for url in item.itemObject:
814  batchItemsTotalCounter = batchItemsTotalCounter + 1
815  if isinstance(url, EventObjects.URL):
816  # Check is this URL under processing in another active batch
817  if self.configVars[self.CONFIG_BATCH_DEFAULT_CHECK_URLS_IN_ACTIVE_BATCHES] == 1:
818  batchTaskId = self.getBatchTaskIdByURL(url.siteId, url.urlMd5)
819  else:
820  batchTaskId = 0
821  if batchTaskId == 0:
822  url.batchId = batchId
823  batchItem = EventObjects.BatchItem(url.siteId, url.urlMd5, url)
824  itemId = str(url.siteId) + ":" + str(url.urlMd5)
825  if itemId not in uniqueURLsCRCDic:
826  uniqueURLsCRCDic[itemId] = batchItem
827  logger.debug("Insert batchItem: %s", varDump(batchItem))
828  batch.items.append(batchItem)
829  batchItemsCounter = batchItemsCounter + 1
830  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
831  else:
832  logger.debug("URL is under processing of batch %s, skipped from new batch", str(batchTaskId))
833  else:
834  logger.error("Wrong object type in the itemObject.item: " + str(type(url)) + \
835  " but 'URL' expected")
836  if batchItemsCounter > 0:
837  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_FILLED_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
838  else:
839  logger.error("Wrong object type in the ClientResponseItem.itemObject: " + str(type(item.itemObject)) + \
840  " but 'list' expected")
841  else:
842  logger.debug("ClientResponseItem error: " + str(item.errorCode) + " : " + item.errorMessage)
843 
844  logger.debug("Batch object created, items: " + str(batchItemsTotalCounter) + " total, " + str(batchItemsCounter) + \
845  " added!")
846 
847  return batch
848 
849 
850 
851  # #Find the Id of the active batch task that already contains the given URL
852  #
853  # @param siteId The site Id md5
854  # @param urlMd5 The URL Id md5
855  # @return the batch Id if URL is under processing and zero if not
856  def getBatchTaskIdByURL(self, siteId, urlMd5):
857  batchTaskId = 0
858  for taskId, taskBatch in self.dtmTasksQueue.items():
859  if isinstance(taskBatch, EventObjects.Batch):
860  for batchItem in taskBatch.items:
861  if batchItem.siteId == siteId and batchItem.urlId == urlMd5:
862  batchTaskId = taskId
863  break
864  if batchTaskId > 0:
865  break
866 
867  return batchTaskId
868 
869 
870 
871  # #Enumerate URLs number in the ClientResponse object items
872  #
873  # @param clientResponseItems The list of ClientResponseItem objects
874  # @return the unique URLs number
875  def getURLsCountFromClientResponseItems(self, clientResponseItems, unique=True):
876  batchItemsCounter = 0
877  batchItemsTotalCounter = 0
878  uniqueURLsCRCDic = {}
879 
880  for item in clientResponseItems:
881  if item.errorCode == EventObjects.ClientResponseItem.STATUS_OK:
882  if isinstance(item.itemObject, list):
883  for url in item.itemObject:
884  batchItemsTotalCounter = batchItemsTotalCounter + 1
885  if isinstance(url, EventObjects.URL):
886  itemId = str(url.siteId) + ":" + str(url.urlMd5)
887  if itemId not in uniqueURLsCRCDic:
888  uniqueURLsCRCDic[itemId] = 1
889  batchItemsCounter = batchItemsCounter + 1
890  else:
891  uniqueURLsCRCDic[itemId] += 1
892  else:
893  logger.error("Wrong object type in the itemObject.item: " + str(type(url)) + \
894  " but 'URL' expected")
895  else:
896  logger.error("Wrong object type in the ClientResponseItem.itemObject: " + str(type(item.itemObject)) + \
897  " but 'list' expected")
898  else:
899  logger.debug("ClientResponseItem error: " + str(item.errorCode) + " : " + item.errorMessage)
900 
901  logger.debug("Unique URLs: " + str(batchItemsCounter) + ", total URLs: " + str(batchItemsTotalCounter))
902 
903  if unique:
904  return batchItemsCounter
905  else:
906  return batchItemsTotalCounter
907 
908 
909 
910  # #Send URLFetch request
911  #
912  #
913  def sendURLFetchRequest(self):
914  # Process the first step of crawling iteration
915  urlUpdate = EventObjects.URLUpdate(0, "", EventObjects.URLStatus.URL_TYPE_MD5, None,
916  EventObjects.URL.STATUS_SELECTED_CRAWLING)
917  urlUpdate.tcDate = SQLExpression("NOW()")
918  newDTMTaskObj = dtm.EventObjects.NewTask('')
919  urlUpdate.batchId = newDTMTaskObj.id
920  limit = str(self.configVars[self.CONFIG_BATCH_MAX_URLS])
921  fetchType = None
922  fetcherCondition = ''
923  if 'splitter' in self.configVars[self.CONFIG_BATCH_FETCH_TYPE]:
924  random.seed()
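# random.random() + 1 is uniform on [1, 2); comparing it with the configured 'splitter'
# threshold selects fetch type 2 with probability (2 - splitter) and type 1 otherwise,
# e.g. splitter=1.5 splits the batches roughly evenly between the two fetcher types.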
925  fetchType = int((random.random() + 1) > float(self.configVars[self.CONFIG_BATCH_FETCH_TYPE]['splitter'])) + 1
926  fetcherCondition = ' AND `FetchType`=' + str(fetchType)
927  if fetchType == EventObjects.Site.FETCH_TYPE_DYNAMIC and\
928  'dfetcher_BatchDefaultMaxURLs' in self.configVars[self.CONFIG_BATCH_FETCH_TYPE]:
929  limit = self.configVars[self.CONFIG_BATCH_FETCH_TYPE]['dfetcher_BatchDefaultMaxURLs']
930  # Create URLFetch object with URLUpdate to update selected URLs state
931  sitesCriterions = {EventObjects.URLFetch.CRITERION_WHERE: " `sites`.`State`=" + \
932  str(EventObjects.Site.STATE_ACTIVE) + \
933  " AND IFNULL((SELECT `Value` FROM `sites_properties` WHERE `Name`='MODES_FLAG') & 1, 1)<>0" + \
934  fetcherCondition
935  }
936  urlCriterions = {EventObjects.URLFetch.CRITERION_WHERE: "`Status`=" + str(EventObjects.URL.STATUS_NEW) + \
937  " AND `State`=0",
938  EventObjects.URLFetch.CRITERION_ORDER: self.configVars[self.CONFIG_BATCH_ORDER_BY_URLS],
939  EventObjects.URLFetch.CRITERION_LIMIT: limit}
940  siteUpdate = EventObjects.SiteUpdate(0)
941  siteUpdate.tcDate = EventObjects.SQLExpression("Now()")
942  urlFetch = EventObjects.URLFetch(None, urlCriterions, sitesCriterions, urlUpdate, siteUpdate)
943  urlFetch.algorithm = EventObjects.URLFetch.PROPORTIONAL_ALGORITHM
944  urlFetch.maxURLs = int(limit)
945  urlFetchEvent = self.eventBuilder.build(DC_CONSTS.EVENT_TYPES.URL_FETCH, [urlFetch])
946  urlFetchEvent.cookie = {EventObjects.Batch.OPERATION_TYPE_NAME: EventObjects.Batch.TYPE_NORMAL_CRAWLER,
947  self.BATCH_ID_COOKIE_NAME: urlUpdate.batchId}
948  if fetchType is not None:
949  urlFetchEvent.cookie[self.BATCH_FETCH_TYPE_COOKIE_NAME] = fetchType
950  # Send request URLFetch to SitesManager
951  self.send(self.clientSitesManagerName, urlFetchEvent)
952  logger.debug("The URLFetch request to SitesManager sent!")
953 
954  if fetchType is not None and fetchType == EventObjects.Site.FETCH_TYPE_DYNAMIC:
955  # Batches dynamic fetcher's batches counter init in stat vars
956  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_FETCHER_DYNAMIC, 1, self.STAT_FIELDS_OPERATION_ADD)
957  elif fetchType is not None and fetchType == EventObjects.Site.FETCH_TYPE_STATIC:
958  # Batches static fetcher's batches counter init in stat vars
959  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_FETCHER_STATIC, 1, self.STAT_FIELDS_OPERATION_ADD)
960  else:
961  # Batches mixed static and dynamic fetcher's batches counter init in stat vars
962  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_FETCHER_MIXED, 1, self.STAT_FIELDS_OPERATION_ADD)
963 
964 
965 
966  # #Send URLFetch for incremental crawling request
967  #
968  #
969  def sendIncrURLRequest(self):
970  # set the new state URL.STATUS_SELECTED_CRAWLING_INCREMENTAL and use it to update the selected URLs
971  urlUpdate = EventObjects.URLUpdate(0, "", EventObjects.URLStatus.URL_TYPE_MD5, None,
972  EventObjects.URL.STATUS_SELECTED_CRAWLING_INCREMENTAL)
973  # Create URLFetch object
974  conditionStr = ''' `Status` in (%s, %s, %s, %s) AND `Depth`<=%s \
975  AND `State`=0 AND timediff(`TcDate`,`LastModified`)>=0 AND (NOW() - `TcDate`) > timediff(`TcDate`, `LastModified`)
976  ''' % (EventObjects.URL.STATUS_CRAWLED, EventObjects.URL.STATUS_SELECTED_PROCESSING,
977  EventObjects.URL.STATUS_PROCESSING, EventObjects.URL.STATUS_PROCESSED, # pylint: disable=C0330
978  self.configVars[DC_CONSTS.INCR_MAX_DEPTH_CONFIG_VAR_NAME]) # pylint: disable=C0330
979  urlCriterions = {EventObjects.URLFetch.CRITERION_WHERE: conditionStr,
980  EventObjects.URLFetch.CRITERION_ORDER: "`Depth` ASC, `MRate` DESC, `UDate` DESC",
981  EventObjects.URLFetch.CRITERION_LIMIT:
982  str(self.configVars[DC_CONSTS.INCR_MAX_URLS_CONFIG_VAR_NAME])
983  }
984 
985  siteCriterions = {EventObjects.URLFetch.CRITERION_WHERE: "`State`=" + \
986  str(EventObjects.Site.STATE_ACTIVE) + "`ID` in (SELECT `Site_Id` FROM `sites_properties` " + \
987  "WHERE `Name`='INCREMENTAL_CRAWLING' AND `Value`='1') " + \
988  "AND IFNULL((SELECT `Value` FROM `sites_properties` WHERE `Name`='MODES_FLAG') & 1, 1)<>0"} # pylint: disable=C0301
989  siteUpdate = EventObjects.SiteUpdate(0)
990  siteUpdate.tcDate = EventObjects.SQLExpression("Now()")
991  urlFetch = EventObjects.URLFetch(None, urlCriterions, siteCriterions, urlUpdate, siteUpdate)
992  urlFetch.algorithm = EventObjects.URLFetch.PROPORTIONAL_ALGORITHM
993  urlFetch.maxURLs = self.configVars[DC_CONSTS.INCR_MAX_URLS_CONFIG_VAR_NAME]
994  urlFetchEvent = self.eventBuilder.build(DC_CONSTS.EVENT_TYPES.URL_FETCH, [urlFetch])
995  urlFetchEvent.cookie = {"type": EventObjects.Batch.TYPE_INCR_CRAWLER}
996  # Send request URLFetch to SitesManager
997  self.send(self.clientSitesManagerName, urlFetchEvent)
998  logger.debug("The URLFetch for incremental crawler request to SitesManager sent!")
999  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URL_FETCH_INCR_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
1000 
1001 
1002 
1003  # #Process the request to return URLs selected for crawling back to state NEW
1004  #
1005  #
1007  # Update to NEW state to return URLs in to the queue
1008  urlUpdate = EventObjects.URLUpdate(0, "", EventObjects.URLStatus.URL_TYPE_MD5, None,
1009  EventObjects.URL.STATUS_NEW)
1010  # Create URLFetch object with URLUpdate to update selected URLs state
1011  sitesCriterions = {EventObjects.URLFetch.CRITERION_WHERE: "`State`=" + str(EventObjects.Site.STATE_ACTIVE)}
1012  urlCriterions = {EventObjects.URLFetch.CRITERION_WHERE: "`Status` IN (" + \
1013  str(EventObjects.URL.STATUS_SELECTED_CRAWLING) + "," + \
1014  str(EventObjects.URL.STATUS_CRAWLING) + "," + \
1015  str(EventObjects.URL.STATUS_SELECTED_PROCESSING) + "," + \
1016  str(EventObjects.URL.STATUS_PROCESSING) + ") AND DATE_ADD(UDate, INTERVAL " + \
1017  str(self.configVars[self.CONFIG_RET_URLS_TTL]) + " MINUTE) < NOW()",
1018  EventObjects.URLFetch.CRITERION_ORDER: "`UDate` ASC",
1019  EventObjects.URLFetch.CRITERION_LIMIT: str(self.configVars[self.CONFIG_RET_URLS_MAX_NUMBER])}
1020  urlFetch = EventObjects.URLFetch(None, urlCriterions, sitesCriterions, urlUpdate)
1021  urlFetch.algorithm = EventObjects.URLFetch.PROPORTIONAL_ALGORITHM
1022  urlFetch.maxURLs = self.configVars[self.CONFIG_RET_URLS_MAX_NUMBER]
1023  urlFetchEvent = self.eventBuilder.build(DC_CONSTS.EVENT_TYPES.URL_FETCH, [urlFetch])
1024  urlFetchEvent.cookie = {EventObjects.Batch.OPERATION_TYPE_NAME: EventObjects.Batch.TYPE_URLS_RETURN}
1025  # Send request URLFetch to SitesManager
1026  self.send(self.clientSitesManagerName, urlFetchEvent)
1027  logger.debug("The URLFetch request to SitesManager sent!")
1028 
1029 
1030 
1031  # #Get the DTM task state using one of two methods of the DTM service
1032  #
1033  # @param taskId Id of DTM task
1034  def getDTMTaskState(self, taskId):
1035  taskState = None
1036 
1037  if self.configVars[self.CONFIG_BATCH_QUEUE_TASK_CHECK_METHOD] == self.DTM_TASK_CHECK_STATE_METHOD_STATE:
1038  # Check the task status on DRCE EE hce-node
1039  logger.debug("Check state of taskId=" + str(taskId))
1040  checkTaskStateObj = dtm.EventObjects.CheckTaskState(taskId)
1041  checkStateEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE, checkTaskStateObj)
1042  eeResponseData = self.dtmdRequestExecute(checkStateEvent, self.configVars[self.CONFIG_DTMD_TIMEOUT])
1043  logger.debug("DTM CheckTaskState request finished, taskId=" + str(taskId))
1044  # if eeResponseData is not None and type(eeResponseData) == type(dtm.EventObjects.EEResponseData(0)):
1045  if eeResponseData is not None and isinstance(eeResponseData, dtm.EventObjects.EEResponseData):
1046  taskState = eeResponseData.state
1047  else:
1048  # Get task status on DTM service
1049  logger.debug("Get status of taskId=" + str(taskId))
1050  getTasksStatusObj = dtm.EventObjects.GetTasksStatus([taskId])
1051  getTasksStatusEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.GET_TASK_STATUS, getTasksStatusObj)
1052  listTaskManagerFields = self.dtmdRequestExecute(getTasksStatusEvent, self.configVars[self.CONFIG_DTMD_TIMEOUT])
1053  logger.debug("DTM getTasksStatus request finished, taskId=" + str(taskId))
1054  if listTaskManagerFields is not None and isinstance(listTaskManagerFields, list):
1055  if len(listTaskManagerFields) > 0:
1056  taskState = listTaskManagerFields[0].fields["state"]
1057  else:
1058  # Set TASK_STATE_FINISHED state to push task to delete from queue
1059  taskState = dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED
1060  logger.error("DTM getTasksStatus taskId=" + str(taskId) + " returned empty fields array in response:\n" + \
1061  Utils.varDump(listTaskManagerFields))
1062  else:
1063  logger.error("DTM getTasksStatus taskId=" + str(taskId) + " returned wrong data:\n" + \
1064  Utils.varDump(listTaskManagerFields))
1065 
1066  return taskState
1067 
1068 
1069 
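# Processing flow for each queued task below: the task state is requested through
# getDTMTaskState(); dead tasks are deleted on the DTM side and then post-processed either
# as finished or as faulted via finishDTMTaskFaultPostProcess(); alive tasks whose TTL has
# expired are terminated, and tasks whose state check or delete request fails are kept in
# the queue (until their TTL expires) to be retried on the next iteration.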
1070  # #Process the DTM tasks queue
1071  #
1072  #
1073  def processDTMTasksQueue(self):
1074  tmpQueue = {}
1076  itemsTotal = 0
1077  tasksWithItems = 0
1078 
1079  # Process the DTM tasks queue
1080  for taskId, taskBatch in self.dtmTasksQueue.items():
1081  if isinstance(taskBatch, EventObjects.Batch):
1082  if hasattr(taskBatch, 'ttl'):
1083  ttl = taskBatch.ttl
1084  else:
1085  ttl = self.configVars[self.CONFIG_BATCH_QUEUE_TASK_TTL]
1086  else:
1088  if hasattr(taskBatch, 'items'):
1089  items = len(taskBatch.items)
1090  tasksWithItems += 1
1091  else:
1092  items = 0
1093  itemsTotal += items
1094  logger.debug("Batch in queue type: %s, taskId: %s, ttl: %s, items: %s", str(type(taskBatch)), str(taskId),
1095  str(ttl), str(items))
1096  batchState = self.getDTMTaskState(taskId)
1097  if batchState != None:
1098  logger.debug("Batch state: %s", str(batchState))
1099  if self.isDTMTaskDead(batchState, taskBatch.queuedTs, ttl):
1100  # Delete task in DTM and task's data in EE (DRCE)
1101  deleteTaskObj = dtm.EventObjects.DeleteTask(taskId)
1102  deleteTaskEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.DELETE_TASK, deleteTaskObj)
1103  generalResponse = self.dtmdRequestExecute(deleteTaskEvent, self.configVars[self.CONFIG_DTMD_TIMEOUT])
1104  logger.debug("DTM DeleteTask request finished, taskId=" + str(taskId))
1105  if generalResponse is not None:
1106  if generalResponse.errorCode == dtm.EventObjects.GeneralResponse.ERROR_OK:
1107  logger.debug("DTM task deleted, taskId=" + str(taskId))
1108  if batchState == dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED:
1109  logger.debug("batch:\n" + varDump(taskBatch) + "\n finished, taskId=" + str(taskId))
1110  self.processFinishedBatch(taskBatch)
1111  else:
1112  logger.debug("batch:" + varDump(taskBatch) + " not finished, state=" + str(batchState))
1113  self.finishDTMTaskFaultPostProcess(taskBatch)
1114  else:
1115  # Save this batch to check it later
1116  tmpQueue[taskId] = taskBatch
1117  logger.error("DTM delete task taskId=" + str(taskId) + ", error: " + str(generalResponse.errorCode) + \
1118  " : " + generalResponse.errorMessage + ", generalResponse:" + varDump(generalResponse))
1119  self.deleteDTMTaskFaultCountersUpdate(taskBatch)
1120  else:
1121  # Save this batch to check it later
1122  tmpQueue[taskId] = taskBatch
1123  logger.error("DTM delete task error: wrong response or timeout, taskId=" + str(taskId) + " still in queue!")
1124  else:
1125  logger.debug("DTM task still alive Id=" + str(taskId) + " state=" + str(batchState))
1126  if time.time() - taskBatch.queuedTs > ttl:
1127  self.finishDTMTaskFaultPostProcess(taskBatch, taskId, ttl, False)
1128  else:
1129  # Save this batch to check it later
1130  tmpQueue[taskId] = taskBatch
1131  logger.debug("DTM task Id=" + str(taskId) + " still in queue")
1132  else:
1133  logger.error("DTM check task state error: wrong response or timeout, taskId=" + str(taskId) + "!")
1134  if time.time() - taskBatch.queuedTs > ttl:
1135  logger.error("DTM task Id=" + str(taskId) + " removed from queue by TTL:" + str(ttl))
1136  else:
1137  # Save this batch to check it later
1138  tmpQueue[taskId] = taskBatch
1139  self.checkDTMTaskFaultCountersUpdate(taskBatch)
1140  logger.error("DTM task Id=" + str(taskId) + " saved in queue.")
1141 
1142  self.dtmTasksQueue = tmpQueue
1143  self.updateDTMTasksQueueCounters(tasksWithItems, itemsTotal)
1144 
1145 
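The queue sweep above rebuilds self.dtmTasksQueue on every pass: dead tasks that were successfully deleted from DTM are dropped, everything else is copied into a temporary dictionary that replaces the queue. Below is a minimal standalone sketch of that pattern (not project code; is_dead() is a hypothetical stand-in for the getDTMTaskState()/isDTMTaskDead() pair):

import time

# Standalone sketch of the "rebuild the queue on each sweep" pattern:
# survivors are copied into a temporary dict that replaces the queue,
# dead or expired tasks are simply not copied.
def sweep_queue(tasks_queue, ttl, is_dead):
    tmp_queue = {}
    for task_id, task in tasks_queue.items():
        if is_dead(task):
            # here the real code sends a DeleteTask request to DTM/EE
            print("task %s finished or dead, removed from queue" % task_id)
        elif time.time() - task["queuedTs"] > ttl:
            print("task %s dropped from queue by TTL %s" % (task_id, ttl))
        else:
            tmp_queue[task_id] = task  # keep the batch to re-check on the next pass
    return tmp_queue

if __name__ == "__main__":
    now = time.time()
    queue = {1: {"queuedTs": now - 3600}, 2: {"queuedTs": now}}
    queue = sweep_queue(queue, ttl=600, is_dead=lambda t: False)
    print(sorted(queue.keys()))  # -> [2]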
1146  # #Post-process a DTM task that did not finish correctly: optionally terminate it, update fault counters for the corresponding task type and return its URLs for re-crawl
1147  #
1148  #
1149  def finishDTMTaskFaultPostProcess(self, taskBatch, taskId=None, ttl=0, incrementFaultsCounter=True):
1150  if taskId is not None and taskId > 0:
1151  # Terminate task and delete it's data request
1152  deleteTaskObj = dtm.EventObjects.DeleteTask(taskId)
1153  deleteTaskObj.action = dtm.EventObjects.DeleteTask.ACTION_TERMINATE_TASK_AND_DELETE_DATA
1154  deleteTaskEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.DELETE_TASK, deleteTaskObj)
1155  generalResponse = self.dtmdRequestExecute(deleteTaskEvent, self.configVars[self.CONFIG_DTMD_TIMEOUT])
1156  logger.error("DTM task Id=" + str(taskId) + " terminated and removed from queue by TTL:" + str(ttl) + \
1157  ", generalResponse=" + str(generalResponse) + ", batch=" + str(taskBatch))
1158 
1159  if isinstance(taskBatch, EventObjects.Batch):
1160  if incrementFaultsCounter:
1161  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_FAULT_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
1162  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_FAULT_NAME, len(taskBatch.items),
1163  self.STAT_FIELDS_OPERATION_ADD)
1164  if taskId is not None and taskId > 0 and incrementFaultsCounter:
1165  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_FAULT_TTL_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
1166  # Send URL updates for the not finished batch to all nodes so the URLs can be crawled again later
1167  self.sendURLUpdate(taskBatch.items, taskBatch.id, False)
1168  else:
1169  if isinstance(taskBatch, EventObjects.URLPurge):
1170  self.updateStatField(DC_CONSTS.BATCHES_PURGE_COUNTER_FAULT_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
1171  else:
1172  self.updateStatField(DC_CONSTS.BATCHES_AGE_COUNTER_FAULT_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
1173 
1174 
1175 
1176  # #Update DTM task state-check fault counters for the corresponding task type
1177  #
1178  #
1179  def checkDTMTaskFaultCountersUpdate(self, taskBatch):
1180  if isinstance(taskBatch, EventObjects.Batch):
1181  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_CHECK_FAULT_NAME, 1,
1182  self.STAT_FIELDS_OPERATION_ADD)
1183  else:
1184  if isinstance(taskBatch, EventObjects.URLPurge):
1185  self.updateStatField(DC_CONSTS.BATCHES_PURGE_COUNTER_CHECK_FAULT_NAME, 1,
1186  self.STAT_FIELDS_OPERATION_ADD)
1187  else:
1188  self.updateStatField(DC_CONSTS.BATCHES_AGE_COUNTER_CHECK_FAULT_NAME, 1,
1189  self.STAT_FIELDS_OPERATION_ADD)
1190 
1191 
1192 
1193  # #Update DTM task delete fault counters for the corresponding task type
1194  #
1195  #
1196  def deleteDTMTaskFaultCountersUpdate(self, taskBatch):
1197  if isinstance(taskBatch, EventObjects.Batch):
1198  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_DELETE_FAULT_NAME, 1,
1199  self.STAT_FIELDS_OPERATION_ADD)
1200  else:
1201  if isinstance(taskBatch, EventObjects.URLPurge):
1202  self.updateStatField(DC_CONSTS.BATCHES_PURGE_COUNTER_DELETE_FAULT_NAME, 1,
1203  self.STAT_FIELDS_OPERATION_ADD)
1204  else:
1205  self.updateStatField(DC_CONSTS.BATCHES_AGE_COUNTER_DELETE_FAULT_NAME, 1,
1206  self.STAT_FIELDS_OPERATION_ADD)
1207 
1208 
1209 
1210  # #Check whether a DTM task is dead by verifying its state code; returns True if the task is dead, False otherwise
1211  #
1212  #
1213  def isDTMTaskDead(self, state, queuedTs, ttl):
1214  ret = False
1215 
1216  deadStates = [dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED,
1217  dtm.EventObjects.EEResponseData.TASK_STATE_CRASHED,
1218  dtm.EventObjects.EEResponseData.TASK_STATE_TERMINATED,
1219  dtm.EventObjects.EEResponseData.TASK_STATE_UNDEFINED,
1220  dtm.EventObjects.EEResponseData.TASK_STATE_SET_ERROR,
1221  dtm.EventObjects.EEResponseData.TASK_STATE_TERMINATED_BY_DRCE_TTL,
1222  dtm.EventObjects.EEResponseData.TASK_STATE_SCHEDULE_TRIES_EXCEEDED
1223  ]
1224 
1225  if state in deadStates or (state == dtm.EventObjects.EEResponseData.TASK_STATE_NEW_SCHEDULED and time.time() - queuedTs > ttl):
1226  ret = True
1227 
1228  return ret
1229 
1230 
1231 
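A standalone illustration of the check above, using hypothetical numeric state codes in place of the real dtm.EventObjects.EEResponseData constants: a task counts as dead when its state is in the terminal set, or when it is still only scheduled but has outlived its TTL.

import time

# Hypothetical numeric state codes standing in for dtm.EventObjects.EEResponseData.*
TASK_STATE_NEW_SCHEDULED = 1
TASK_STATE_FINISHED = 4
TASK_STATE_CRASHED = 5
TASK_STATE_TERMINATED = 6

DEAD_STATES = frozenset([TASK_STATE_FINISHED, TASK_STATE_CRASHED, TASK_STATE_TERMINATED])

def is_task_dead(state, queued_ts, ttl):
    # Terminal states are always dead; a task stuck in NEW_SCHEDULED longer
    # than its TTL is treated as dead as well.
    if state in DEAD_STATES:
        return True
    return state == TASK_STATE_NEW_SCHEDULED and time.time() - queued_ts > ttl

print(is_task_dead(TASK_STATE_FINISHED, time.time(), 60))             # True
print(is_task_dead(TASK_STATE_NEW_SCHEDULED, time.time() - 120, 60))  # True, TTL exceeded
print(is_task_dead(TASK_STATE_NEW_SCHEDULED, time.time(), 60))        # False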
1232  # #Update DTM tasks queue counters
1233  #
1234  #
1235  def updateDTMTasksQueueCounters(self, tasksWithItems=0, itemsTotal=0):
1236  # Update number of crawl batches
1237  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_QUEUE_NAME,
1238  self.getBatchTasksCount(self.BATCH_TASK_TYPE_CRAWL), self.STAT_FIELDS_OPERATION_SET)
1239  # Update number of purge batches
1240  self.updateStatField(DC_CONSTS.BATCHES_PURGE_COUNTER_NAME, self.getBatchTasksCount(self.BATCH_TASK_TYPE_PURGE),
1241  self.STAT_FIELDS_OPERATION_SET)
1242  # Update number of age batches
1243  self.updateStatField(DC_CONSTS.BATCHES_AGE_COUNTER_NAME, self.getBatchTasksCount(self.BATCH_TASK_TYPE_AGE),
1244  self.STAT_FIELDS_OPERATION_SET)
1245  logger.debug("Batches tasks in queue - total:%s, crawl:%s, purge:%s, age:%s", str(len(self.dtmTasksQueue)),
1246  str(self.statFields[DC_CONSTS.BATCHES_CRAWL_COUNTER_QUEUE_NAME]),
1247  str(self.statFields[DC_CONSTS.BATCHES_PURGE_COUNTER_NAME]),
1248  str(self.statFields[DC_CONSTS.BATCHES_AGE_COUNTER_NAME]))
1249 
1250  if tasksWithItems > 0:
1251  # Update average number of items in queued batches
1252  self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_ITEMS_AVG_NAME,
1253  itemsTotal / tasksWithItems, self.STAT_FIELDS_OPERATION_SET)
1254 
1255 
1256 
1257  # #Calculate number of batches
1258  #
1259  # @param batchType type of batches: crawl, purge or age
1260  #
1261  def getBatchTasksCount(self, batchType=BATCH_TASK_TYPE_CRAWL):
1262  ret = 0
1263 
1264  for taskId, taskBatch in self.dtmTasksQueue.items(): # pylint: disable=W0612
1265  if batchType == self.BATCH_TASK_TYPE_CRAWL:
1266  if isinstance(taskBatch, EventObjects.Batch):
1267  ret = ret + 1
1268  else:
1269  if batchType == self.BATCH_TASK_TYPE_PURGE:
1270  if isinstance(taskBatch, EventObjects.URLPurge):
1271  ret = ret + 1
1272  else:
1273  if batchType == self.BATCH_TASK_TYPE_AGE:
1274  if isinstance(taskBatch, EventObjects.URLAge):
1275  ret = ret + 1
1276  else:
1277  ret = ret + 1
1278 
1279  return ret
1280 
1281 
1282 
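The counting above dispatches on the class of the queued object. A simplified standalone sketch follows; the classes and the string task-type keys are hypothetical placeholders for EventObjects.Batch/URLPurge/URLAge and the numeric BATCH_TASK_TYPE_* constants:

# Hypothetical placeholders for EventObjects.Batch / URLPurge / URLAge.
class Batch(object):
    pass

class URLPurge(object):
    pass

class URLAge(object):
    pass

def count_tasks(tasks_queue, task_type):
    # task_type mirrors the BATCH_TASK_TYPE_* convention: crawl, purge, age or all.
    type_map = {"crawl": Batch, "purge": URLPurge, "age": URLAge}
    if task_type == "all":
        return len(tasks_queue)
    return sum(1 for task in tasks_queue.values()
               if isinstance(task, type_map[task_type]))

queue = {1: Batch(), 2: URLPurge(), 3: Batch(), 4: URLAge()}
print(count_tasks(queue, "crawl"))  # 2
print(count_tasks(queue, "purge"))  # 1
print(count_tasks(queue, "all"))    # 4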
1283  # #Do post-processing after a batch has finished successfully
1284  #
1285  # @param taskBatch the Batch, URLPurge or URLAge object
1286  #
1287  def processFinishedBatch(self, taskBatch):
1288  if isinstance(taskBatch, EventObjects.Batch):
1289  if self.configVars[self.CONFIG_CRAWLED_URLS_STRATEGY] == 0:
1290  self.sendURLUpdate(taskBatch.items, taskBatch.id, True)
1291  logger.debug("Send update URLs from batch: %s for all foreign hosts by the Batch_Id", str(taskBatch.id))
1292  else:
1293  self.sendURLDelete(taskBatch.items, taskBatch.id)
1294  logger.debug("Send delete URLs from batch: %s for all foreign hosts by the Batch_Id", str(taskBatch.id))
1295  else:
1296  if isinstance(taskBatch, EventObjects.URLPurge):
1297  logger.debug("Purge batch: %s finished!", str(taskBatch.id))
1298  else:
1299  logger.debug("Age batch: %s finished!", str(taskBatch.id))
1300 
1301 
1302 
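processFinishedBatch() switches on the CrawledURLStrategy setting: 0 propagates the crawled state to URL copies on other hosts, any other value deletes the leftover copies. A tiny standalone sketch of that dispatch; the callbacks are hypothetical stand-ins for sendURLUpdate()/sendURLDelete():

def process_finished_batch(batch_items, batch_id, crawled_urls_strategy,
                           send_url_update, send_url_delete):
    if crawled_urls_strategy == 0:
        send_url_update(batch_items, batch_id, True)   # propagate CRAWLED state
    else:
        send_url_delete(batch_items, batch_id)         # delete leftover copies

actions = []
process_finished_batch(["item1"], 42, 0,
                       send_url_update=lambda items, bid, ok: actions.append(("update", bid, ok)),
                       send_url_delete=lambda items, bid: actions.append(("delete", bid)))
print(actions)  # [('update', 42, True)]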
1303  # #Execute DTMD task request
1304  #
1305  # @param requestEvent The request event to send to DTMD
1306  # @param timeout The DTMD request timeout
1307  #
1308  def dtmdRequestExecute(self, requestEvent, timeout, maxTries=100):
1309  ret = None
1310  if maxTries < 0:
1311  maxTries = 0
1312 
1313  try:
1314  # Send DTMD request
1315  self.dtmdConnection.send(requestEvent)
1316 
1317  for i in range(maxTries + 1):
1318  # Poll DTMD connection
1319  if self.dtmdConnection.poll(int(timeout)) == 0:
1320  logger.error("DTMD request timeout reached " + str(timeout) + "!")
1321  break
1322  else:
1323  # Recv DTMD response
1324  retEvent = self.dtmdConnection.recv()
1325  if retEvent != None:
1326  # Get response object
1327  # if type(retEvent.eventObj) == type(dtm.EventObjects.EEResponseData(0)) or\
1328  # type(retEvent.eventObj) == type(dtm.EventObjects.GeneralResponse()) or\
1329  # isinstance(retEvent.eventObj, list):
1330  if isinstance(retEvent.eventObj, dtm.EventObjects.EEResponseData) or\
1331  isinstance(retEvent.eventObj, dtm.EventObjects.GeneralResponse) or\
1332  isinstance(retEvent.eventObj, list):
1333  if retEvent.uid == requestEvent.uid:
1334  ret = retEvent.eventObj
1335  break
1336  else:
1337  logger.error("DTMD returned wrong object uid: " + str(retEvent.uid) + " but " + \
1338  str(requestEvent.uid) + " expected, iteration " + str(i) + "!")
1339  else:
1340  logger.error("DTMD returned wrong object type: " + str(type(retEvent.eventObj)) + "!")
1341  else:
1342  logger.error("DTMD returned None event!")
1343  except Exception, e:
1344  logger.error("DTMD request execution exception: " + e.message + "!")
1345 
1346  logger.debug("The DTMD request finished!")
1347 
1348  return ret
1349 
1350 
1351 
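The request loop above can be summarized as: send once, then poll and receive up to maxTries + 1 times, accepting only a response whose uid matches the request. The sketch below is standalone; FakeConnection is a hypothetical stub, not the project transport API:

import collections

Event = collections.namedtuple("Event", ["uid", "eventObj"])

class FakeConnection(object):
    # Hypothetical stub mimicking the send/poll/recv interface used above.
    def __init__(self, responses):
        self._responses = list(responses)

    def send(self, event):
        pass

    def poll(self, timeout):
        return 1 if self._responses else 0  # 0 means timeout

    def recv(self):
        return self._responses.pop(0)

def request_execute(connection, request, timeout, max_tries=100):
    connection.send(request)
    for _ in range(max(max_tries, 0) + 1):
        if connection.poll(timeout) == 0:
            break                        # timeout reached, give up
        response = connection.recv()
        if response is not None and response.uid == request.uid:
            return response.eventObj     # matching response found
        # otherwise: wrong uid or empty event, try the next message
    return None

conn = FakeConnection([Event(uid="other", eventObj=None),
                       Event(uid="req-1", eventObj={"errorCode": 0})])
print(request_execute(conn, Event(uid="req-1", eventObj=None), timeout=500))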
1352  # #Send URL update for batch URLs
1353  #
1354  # @param batchItemsList List of BatchItem objects
1355  # @param batchId the Batch Id
1356  # @param batchState batch completion state: True - mark URLs as crawled, False - return URLs to the NEW state
1357  def sendURLUpdate(self, batchItemsList, batchId, batchState):
1358  urlsList = []
1359 
1360  # Prepare URLs list to update
1361  for batchItem in batchItemsList:
1362  # notRootURLExpr = " AND `ParentMd5`<>''"
1363  notRootURLExpr = ''
1364  # Set status value depending on the update reason - crawled successfully or not
1365  if batchState is True:
1366  status = EventObjects.URL.STATUS_CRAWLED
1367  sqlExpression = SQLExpression("`URLMd5`='" + str(batchItem.urlId) + "' AND (" + \
1368  "(`Batch_Id`<>" + str(batchId) + " AND `Status`=" + \
1369  str(EventObjects.URL.STATUS_NEW) + ")" + \
1370  " OR (`Batch_Id`=" + str(batchId) + " AND `Status` IN (" + \
1371  str(EventObjects.URL.STATUS_SELECTED_CRAWLING) + "," + \
1372  str(EventObjects.URL.STATUS_SELECTED_CRAWLING_INCREMENTAL) + "))" + \
1373  ")" + notRootURLExpr)
1374  else:
1375  status = EventObjects.URL.STATUS_NEW
1376  sqlExpression = SQLExpression("`URLMd5`='" + str(batchItem.urlId) + "' AND `Status` IN (" +
1377  str(EventObjects.URL.STATUS_SELECTED_CRAWLING) + "," + \
1378  str(EventObjects.URL.STATUS_SELECTED_CRAWLING_INCREMENTAL) + ")" + \
1379  notRootURLExpr)
1380 
1381  urlUpdate = EventObjects.URLUpdate(batchItem.siteId, batchItem.urlId, EventObjects.URLStatus.URL_TYPE_MD5,
1382  None, status)
1383  urlUpdate.processed = 0
1384  urlUpdate.crawled = 0
1385  urlUpdate.criterions[EventObjects.URLFetch.CRITERION_WHERE] = sqlExpression
1386  logger.debug("batch: %s, URLUpdate: %s", str(batchId), varDump(urlUpdate))
1387  urlsList.append(urlUpdate)
1388 
1389  # Make URLUpdate event
1390  urlUpdateEvent = self.eventBuilder.build(DC_CONSTS.EVENT_TYPES.URL_UPDATE, urlsList)
1391  # Send request URLUpdate to SitesManager
1392  self.send(self.clientSitesManagerName, urlUpdateEvent)
1393  logger.debug("The URLUpdate request to SitesManager sent!")
1394 
1395 
1396 
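The two WHERE expressions built above differ only in which statuses they match. A standalone sketch of the string construction follows, with plain integers standing in for the EventObjects.URL.STATUS_* constants (the numeric values are assumptions for illustration only):

# Assumed numeric values standing in for EventObjects.URL.STATUS_* constants.
STATUS_NEW = 1
STATUS_SELECTED_CRAWLING = 2
STATUS_SELECTED_CRAWLING_INCREMENTAL = 8

def build_url_update_where(url_md5, batch_id, crawled_ok):
    if crawled_ok:
        # Mark as CRAWLED: either a NEW copy from another batch, or the copy
        # selected for crawling by this batch.
        return ("`URLMd5`='%s' AND ((`Batch_Id`<>%s AND `Status`=%s)"
                " OR (`Batch_Id`=%s AND `Status` IN (%s,%s)))"
                % (url_md5, batch_id, STATUS_NEW, batch_id,
                   STATUS_SELECTED_CRAWLING, STATUS_SELECTED_CRAWLING_INCREMENTAL))
    # Return to NEW: only rows still stuck in a SELECTED_CRAWLING state.
    return ("`URLMd5`='%s' AND `Status` IN (%s,%s)"
            % (url_md5, STATUS_SELECTED_CRAWLING, STATUS_SELECTED_CRAWLING_INCREMENTAL))

print(build_url_update_where("abc123", 42, True))
print(build_url_update_where("abc123", 42, False))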
1397  # #Send URL delete for batch URLs that were not crawled and remain in the SELECTED_FOR_CRAWLING state (2)
1398  #
1399  # @param batchItemsList List of BatchItem objects
1400  # @param batchId Id of the batch
1401  def sendURLDelete(self, batchItemsList, batchId):
1402  urlsList = []
1403 
1404  # Prepare URLs list to delete
1405  for batchItem in batchItemsList:
1406  sqlExpression = SQLExpression("`ParentMd5`<>'' AND `URLMd5`='" + str(batchItem.urlId) + "' AND `Batch_Id`<>" + \
1407  str(batchId))
1408  urlDelete = EventObjects.URLDelete(batchItem.siteId, None, EventObjects.URLStatus.URL_TYPE_URL,
1409  {EventObjects.URLFetch.CRITERION_WHERE:sqlExpression,
1410  EventObjects.URLFetch.CRITERION_LIMIT:1},
1411  reason=EventObjects.URLDelete.REASON_SELECT_TO_CRAWL_TTL)
1412  logger.debug("URLDelete: " + varDump(urlDelete))
1413  urlsList.append(urlDelete)
1414 
1415  # Make URLDelete event
1416  urlDeleteEvent = self.eventBuilder.build(DC_CONSTS.EVENT_TYPES.URL_DELETE, urlsList)
1417  # Send request URLDelete to SitesManager
1418  self.send(self.clientSitesManagerName, urlDeleteEvent)
1419  logger.debug("The URLDelete request to SitesManager sent!")
1420 
1421 
1422 
1423  # #onURLDeleteResponse event handler
1424  #
1425  # @param event instance of Event object
1426  def onURLDeleteResponse(self, event):
1427  try:
1428  logger.debug("Reply received on URL delete.")
1429  clientResponse = event.eventObj
1430  if clientResponse.errorCode == EventObjects.ClientResponse.STATUS_OK:
1431  if len(clientResponse.itemsList) > 0:
1432  for clientResponseItem in clientResponse.itemsList:
1433  if clientResponseItem.errorCode != EventObjects.ClientResponseItem.STATUS_OK:
1434  logger.error("URLDelete response error: " + str(clientResponseItem.errorCode) + " : " + \
1435  clientResponseItem.errorMessage + ", host:" + clientResponseItem.host + ", port:" + \
1436  clientResponseItem.port + ", node:" + clientResponseItem.node + "!")
1437  else:
1438  logger.error("URLDelete response empty list!")
1439  else:
1440  logger.error("URLDelete response error:" + str(clientResponse.errorCode) + " : " + clientResponse.errorMessage)
1441  except Exception as err:
1442  ExceptionLog.handler(logger, err, "Exception:")
1443 
1444 
1445  # #Send URLNew for batch URLs to insert them in the CRAWLED state on all hosts and block redundant crawling
1446  #
1447  # @param batchItemsList List of BatchItem objects
1448  def sendURLNew(self, batchItemsList):
1449  urlsList = []
1450  # Prepare URLs list to insert
1451  for batchItem in batchItemsList:
1452  if isinstance(batchItem.urlObj, EventObjects.URL) and batchItem.urlObj.parentMd5 != '':
1453  urlObj = copy.deepcopy(batchItem.urlObj)
1454  urlObj.status = EventObjects.URL.STATUS_CRAWLED
1455  urlObj.crawled = 0
1456  urlObj.processed = 0
1457  urlObj.contentType = ''
1458  urlObj.charset = ''
1459  urlObj.batch_Id = 0
1460  urlObj.errorMask = 0
1461  urlObj.crawlingTime = 0
1462  urlObj.processingTime = 0
1463  urlObj.totalTime = 0
1464  urlObj.httpCode = 0
1465  urlObj.size = 0
1466  urlObj.linksI = 0
1467  urlObj.linksE = 0
1468  urlObj.freq = 0
1469  urlObj.depth = 0
1470  urlObj.rawContentMd5 = ""
1471  urlObj.eTag = ""
1472  urlObj.mRate = 0.0
1473  urlObj.mRateCounter = 0
1474  urlObj.contentMask = EventObjects.URL.CONTENT_EMPTY
1475  urlObj.tagsMask = 0
1476  urlObj.tagsCount = 0
1477  urlObj.pDate = SQLExpression("NULL")
1478  urlObj.urlUpdate = None
1479  # logger.debug("URLNew item: " + varDump(urlObj))
1480  logger.debug("URLNew item: %s, batchId: %s", urlObj.urlMd5, str(urlObj.batchId))
1481  urlsList.append(urlObj)
1482 
1483  if len(urlsList) > 0:
1484  # Make URLNew event
1485  urlNewEvent = self.eventBuilder.build(DC_CONSTS.EVENT_TYPES.URL_NEW, urlsList)
1486  # Send request URLNew to SitesManager
1487  self.send(self.clientSitesManagerName, urlNewEvent)
1488  logger.debug("The URLNew request to SitesManager sent!")
1489 
1490 
1491 
1492  # #onURLNewResponse event handler
1493  #
1494  # @param event instance of Event object
1495  def onURLNewResponse(self, event):
1496  try:
1497  logger.debug("Reply received on URL new.\n" + Utils.varDump(event))
1498  clientResponse = event.eventObj
1499  if clientResponse.errorCode == EventObjects.ClientResponse.STATUS_OK:
1500  if len(clientResponse.itemsList) > 0:
1501  for clientResponseItem in clientResponse.itemsList:
1502  if clientResponseItem.errorCode != EventObjects.ClientResponseItem.STATUS_OK:
1503  logger.error("URLNew response error: " + str(clientResponseItem.errorCode) + " : " + \
1504  clientResponseItem.errorMessage + ", host:" + clientResponseItem.host + ", port:" + \
1505  clientResponseItem.port + ", node:" + clientResponseItem.node + "!")
1506  else:
1507  logger.error("URLNew response empty list!")
1508  else:
1509  logger.error("URLNew response error:" + str(clientResponse.errorCode) + " : " + clientResponse.errorMessage)
1510  except Exception as err:
1511  ExceptionLog.handler(logger, err, "Exception:")