def __init__(self, configParser, connectionBuilderLight=None):
    """Constructor: load configuration, initialize statistics, connections and event handlers.

    Every option is read from the config section named after the concrete class
    (``self.__class__.__name__``).

    Args:
        configParser: ConfigParser-style object (``get`` / ``getint`` /
            ``has_option``) holding this manager's section.
        connectionBuilderLight: factory whose ``build`` creates the transport
            connections; a new ``ConnectionBuilderLight()`` is used when None.

    Raises:
        ConfigParser.NoSectionError, ConfigParser.NoOptionError: a required
            section or option is missing. Only the batch fetch type and the
            batch max iterations options are optional.
        ValueError: an integer option is not an integer, or the batch fetch
            type option is not valid JSON.
    """
    super(BatchTasksManager, self).__init__()

    if connectionBuilderLight is None:
        connectionBuilderLight = ConnectionBuilderLight()

    className = self.__class__.__name__

    def readStr(option):
        # Raw string value of `option` from this class's config section.
        return configParser.get(className, option)

    def readInt(option):
        # Integer value of `option` from this class's config section.
        return configParser.getint(className, option)

    def initStatFields(statNames):
        # Register each counter, in order, with value 0 using the INIT operation.
        for statName in statNames:
            self.updateStatField(statName, 0, self.STAT_FIELDS_OPERATION_INIT)

    # --- Crawl batch statistics (the queue counter uses SET, all others INIT) ---
    self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_TOTAL_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
    self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_QUEUE_NAME, 0, self.STAT_FIELDS_OPERATION_SET)
    initStatFields((
        DC_CONSTS.BATCHES_CRAWL_COUNTER_FAULT_NAME,
        DC_CONSTS.BATCHES_CRAWL_COUNTER_FILLED_NAME,
        DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_NAME,
        DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_FAULT_NAME,
        DC_CONSTS.BATCHES_CRAWL_COUNTER_URL_FETCH_NAME,
        DC_CONSTS.BATCHES_CRAWL_COUNTER_URL_FETCH_CANCELLED_NAME,
        DC_CONSTS.BATCHES_CRAWL_COUNTER_DELETE_FAULT_NAME,
        DC_CONSTS.BATCHES_CRAWL_COUNTER_CHECK_FAULT_NAME,
        DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_RET_NAME,
        DC_CONSTS.BATCHES_CRAWL_COUNTER_URL_FETCH_INCR_NAME,
        DC_CONSTS.BATCHES_CRAWL_COUNTER_FAULT_TTL_NAME,
        DC_CONSTS.BATCHES_CRAWL_COUNTER_ITEMS_AVG_NAME,
        DC_CONSTS.BATCHES_CRAWL_COUNTER_FETCHER_DYNAMIC,
        DC_CONSTS.BATCHES_CRAWL_COUNTER_FETCHER_STATIC,
        DC_CONSTS.BATCHES_CRAWL_COUNTER_FETCHER_MIXED,
    ))

    # --- Endpoint names and DTM daemon settings ---
    self.serverName = readStr(self.CONFIG_SERVER)
    self.clientSitesManagerName = readStr(self.CONFIG_SITES_MANAGER_CLIENT)
    # Host and port are kept as strings: they are concatenated into "host:port" below.
    self.configVars[self.CONFIG_DTMD_HOST] = readStr(self.CONFIG_DTMD_HOST)
    self.configVars[self.CONFIG_DTMD_PORT] = readStr(self.CONFIG_DTMD_PORT)
    self.configVars[self.CONFIG_DTMD_TIMEOUT] = readInt(self.CONFIG_DTMD_TIMEOUT)

    # --- Crawl batch limits ---
    self.configVars[self.CONFIG_BATCH_MAX_URLS] = readInt(self.CONFIG_BATCH_MAX_URLS)
    self.configVars[self.CONFIG_BATCH_MAX_TIME] = readInt(self.CONFIG_BATCH_MAX_TIME)
    self.configVars[self.CONFIG_BATCH_REMOVE_UNPROCESSED_ITEMS] = \
        bool(readInt(self.CONFIG_BATCH_REMOVE_UNPROCESSED_ITEMS))

    self.configVars[self.POLL_TIMEOUT_CONFIG_VAR_NAME] = readInt(self.CONFIG_POLLING_TIMEOUT)

    # --- Incremental crawling (stored under DC_CONSTS keys, read from CONFIG_* options) ---
    self.configVars[DC_CONSTS.INCR_MIN_FREQ_CONFIG_VAR_NAME] = readInt(self.CONFIG_INCR_MIN_FREQ)
    self.configVars[DC_CONSTS.INCR_MAX_DEPTH_CONFIG_VAR_NAME] = readInt(self.CONFIG_INCR_MAX_DEPTH)
    self.configVars[DC_CONSTS.INCR_MAX_URLS_CONFIG_VAR_NAME] = readInt(self.CONFIG_INCR_MAX_URL)
    self.configVars[self.CONFIG_INCR_CRAWL_MODE] = readInt(self.CONFIG_INCR_CRAWL_MODE)

    self.configVars[self.CONFIG_DRCE_CRAWLER_APP_NAME] = readStr(self.CONFIG_DRCE_CRAWLER_APP_NAME)
    self.configVars[self.CONFIG_BATCH_DEFAULT_MAX_TIME] = readInt(self.CONFIG_BATCH_DEFAULT_MAX_TIME)

    # --- Transport connections ---
    serverConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, self.serverName)
    sitesManagerConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT,
                                                          self.clientSitesManagerName)
    dtmdAddress = self.configVars[self.CONFIG_DTMD_HOST] + ":" + self.configVars[self.CONFIG_DTMD_PORT]
    # NOTE(review): unlike the server and sites-manager connections, the DTMD
    # connection is kept on self and is not passed to addConnection here — confirm intended.
    self.dtmdConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT,
                                                       dtmdAddress,
                                                       TRANSPORT_CONSTS.TCP_TYPE)

    self.addConnection(self.serverName, serverConnection)
    self.addConnection(self.clientSitesManagerName, sitesManagerConnection)

    # --- Response event handlers ---
    self.setEventHandler(DC_CONSTS.EVENT_TYPES.URL_FETCH_RESPONSE, self.onURLFetchResponse)
    self.setEventHandler(DC_CONSTS.EVENT_TYPES.URL_UPDATE_RESPONSE, self.onURLUpdateResponse)
    self.setEventHandler(DC_CONSTS.EVENT_TYPES.URL_DELETE_RESPONSE, self.onURLDeleteResponse)
    self.setEventHandler(DC_CONSTS.EVENT_TYPES.URL_NEW_RESPONSE, self.onURLNewResponse)

    self.dtmTasksQueue = {}

    # --- Returned-URLs processing ---
    self.configVars[self.CONFIG_RET_URLS_MAX_NUMBER] = readInt(self.CONFIG_RET_URLS_MAX_NUMBER)
    self.configVars[self.CONFIG_RET_URLS_PERIOD] = readInt(self.CONFIG_RET_URLS_PERIOD)
    self.configVars[self.CONFIG_RET_URLS_TTL] = readInt(self.CONFIG_RET_URLS_TTL)
    self.configVars[self.CONFIG_RET_URLS_MODE] = readInt(self.CONFIG_RET_URLS_MODE)
    self.processSelectedURLsRetLastTs = time.time()

    # --- Incremental crawl period ---
    self.configVars[self.CONFIG_INCR_CRAWL_PERIOD] = readInt(self.CONFIG_INCR_CRAWL_PERIOD)
    self.processIncrCrawlLastTs = time.time()

    self.configVars[self.CONFIG_CRAWLED_URLS_STRATEGY] = readInt(self.CONFIG_CRAWLED_URLS_STRATEGY)
    self.configVars[self.CONFIG_BATCH_ORDER_BY_URLS] = readStr(self.CONFIG_BATCH_ORDER_BY_URLS)

    # Optional JSON option: missing -> empty dict. Only the lookup sits in the
    # try body, so NoOptionError can only come from the option being absent.
    try:
        batchFetchType = readStr(self.CONFIG_BATCH_FETCH_TYPE)
    except ConfigParser.NoOptionError:
        self.configVars[self.CONFIG_BATCH_FETCH_TYPE] = {}
    else:
        self.configVars[self.CONFIG_BATCH_FETCH_TYPE] = json.loads(batchFetchType)

    self.configVars[self.CONFIG_BATCH_MAX_TASKS] = readInt(self.CONFIG_BATCH_MAX_TASKS)

    # --- Regular crawling ---
    self.configVars[self.CONFIG_REGULAR_CRAWL_PERIOD] = readInt(self.CONFIG_REGULAR_CRAWL_PERIOD)
    self.configVars[self.CONFIG_REGULAR_CRAWL_MODE] = readInt(self.CONFIG_REGULAR_CRAWL_MODE)
    self.configVars[self.CONFIG_REGULAR_CRAWL_PROPAGATE_URLS] = \
        readInt(self.CONFIG_REGULAR_CRAWL_PROPAGATE_URLS)
    self.processRegularCrawlLastTs = time.time()

    # --- Batch queue processing ---
    self.configVars[self.CONFIG_BATCH_QUEUE_PERIOD] = readInt(self.CONFIG_BATCH_QUEUE_PERIOD)
    self.processBatchQueuelLastTs = time.time()
    self.configVars[self.CONFIG_BATCH_QUEUE_TASK_TTL] = readInt(self.CONFIG_BATCH_QUEUE_TASK_TTL)
    self.configVars[self.CONFIG_BATCH_QUEUE_TASK_CHECK_METHOD] = \
        readInt(self.CONFIG_BATCH_QUEUE_TASK_CHECK_METHOD)

    # --- Default batch task strategy ---
    self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_IO_WAIT_MAX] = \
        readInt(self.CONFIG_BATCH_DEFAULT_STRATEGY_IO_WAIT_MAX)
    self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_CPU_LOAD_MAX] = \
        readInt(self.CONFIG_BATCH_DEFAULT_STRATEGY_CPU_LOAD_MAX)
    self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_RAM_FREE_MIN] = \
        readInt(self.CONFIG_BATCH_DEFAULT_STRATEGY_RAM_FREE_MIN)
    self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_STRATEGY_RDELAY] = \
        readInt(self.CONFIG_BATCH_DEFAULT_STRATEGY_STRATEGY_RDELAY)
    self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_RETRY] = \
        readInt(self.CONFIG_BATCH_DEFAULT_STRATEGY_RETRY)

    self.configVars[self.CONFIG_BATCH_DEFAULT_STARTER] = readStr(self.CONFIG_BATCH_DEFAULT_STARTER)

    # --- Default batch task autocleanup ---
    self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_TTL] = \
        readInt(self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_TTL)
    self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_TYPE] = \
        readInt(self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_TYPE)
    # NOTE(review): DELETE_RETRIES and STATE are kept as raw strings (not getint) — confirm intended.
    self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_RETRIES] = \
        readStr(self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_RETRIES)
    self.configVars[self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_STATE] = \
        readStr(self.CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_STATE)

    self.sendURLFetchRequestCounter = 0
    self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URL_FETCH_REQUESTS_NAME, 0,
                         self.STAT_FIELDS_OPERATION_SET)

    self.configVars[self.CONFIG_BATCH_DEFAULT_CHECK_URLS_IN_ACTIVE_BATCHES] = \
        readInt(self.CONFIG_BATCH_DEFAULT_CHECK_URLS_IN_ACTIVE_BATCHES)

    # --- Purging ---
    self.configVars[self.CONFIG_PURGE_PERIOD] = readInt(self.CONFIG_PURGE_PERIOD)
    self.configVars[self.CONFIG_PURGE_BATCH_DEFAULT_MAX_TIME] = \
        readInt(self.CONFIG_PURGE_BATCH_DEFAULT_MAX_TIME)
    self.configVars[self.CONFIG_PURGE_BATCH_MAX_URLS] = readInt(self.CONFIG_PURGE_BATCH_MAX_URLS)
    self.configVars[self.CONFIG_PURGE_BATCH_QUEUE_TASK_TTL] = \
        readInt(self.CONFIG_PURGE_BATCH_QUEUE_TASK_TTL)
    self.configVars[self.CONFIG_PURGE_BATCH_DEFAULT_STARTER] = \
        readStr(self.CONFIG_PURGE_BATCH_DEFAULT_STARTER)
    self.configVars[self.CONFIG_DRCE_DB_APP_NAME] = readStr(self.CONFIG_DRCE_DB_APP_NAME)
    self.configVars[self.CONFIG_PURGE_BATCH_MAX_TASKS] = readInt(self.CONFIG_PURGE_BATCH_MAX_TASKS)
    self.configVars[self.CONFIG_PURGE_MODE] = readInt(self.CONFIG_PURGE_MODE)

    # Purge batch statistics (the batch counter uses SET, the others INIT).
    self.updateStatField(DC_CONSTS.BATCHES_PURGE_COUNTER_NAME, 0, self.STAT_FIELDS_OPERATION_SET)
    self.processPurgeLastTs = time.time()
    initStatFields((
        DC_CONSTS.BATCHES_PURGE_COUNTER_TOTAL_NAME,
        DC_CONSTS.BATCHES_PURGE_COUNTER_CANCELLED_NAME,
        DC_CONSTS.BATCHES_PURGE_COUNTER_ERROR_NAME,
        DC_CONSTS.BATCHES_PURGE_COUNTER_FAULT_NAME,
        DC_CONSTS.BATCHES_PURGE_COUNTER_CHECK_FAULT_NAME,
    ))

    # --- DTM task names and types ---
    self.configVars[self.CONFIG_TASK_DTM_NAME_CRAWLING] = readStr(self.CONFIG_TASK_DTM_NAME_CRAWLING)
    self.configVars[self.CONFIG_TASK_DTM_NAME_PURGING] = readStr(self.CONFIG_TASK_DTM_NAME_PURGING)
    self.configVars[self.CONFIG_TASK_DTM_TYPE_CRAWLING] = readInt(self.CONFIG_TASK_DTM_TYPE_CRAWLING)
    self.configVars[self.CONFIG_TASK_DTM_TYPE_PURGING] = readInt(self.CONFIG_TASK_DTM_TYPE_PURGING)

    # --- Resource aging ---
    self.configVars[self.CONFIG_AGING_PERIOD] = readInt(self.CONFIG_AGING_PERIOD)
    self.configVars[self.CONFIG_RESOURCE_AGING_MODE] = readInt(self.CONFIG_RESOURCE_AGING_MODE)
    self.configVars[self.CONFIG_TASK_DTM_NAME_AGING] = readStr(self.CONFIG_TASK_DTM_NAME_AGING)
    self.configVars[self.CONFIG_TASK_DTM_TYPE_AGING] = readInt(self.CONFIG_TASK_DTM_TYPE_AGING)
    self.configVars[self.CONFIG_AGE_BATCH_DEFAULT_MAX_TIME] = readInt(self.CONFIG_AGE_BATCH_DEFAULT_MAX_TIME)
    self.configVars[self.CONFIG_AGE_BATCH_MAX_URLS_SITE] = readInt(self.CONFIG_AGE_BATCH_MAX_URLS_SITE)
    self.configVars[self.CONFIG_AGE_BATCH_MAX_URLS_TOTAL] = readInt(self.CONFIG_AGE_BATCH_MAX_URLS_TOTAL)
    self.configVars[self.CONFIG_AGE_BATCH_MAX_SITES] = readInt(self.CONFIG_AGE_BATCH_MAX_SITES)
    self.configVars[self.CONFIG_AGE_BATCH_QUEUE_TASK_TTL] = readInt(self.CONFIG_AGE_BATCH_QUEUE_TASK_TTL)
    self.configVars[self.CONFIG_AGE_BATCH_DEFAULT_STARTER] = readStr(self.CONFIG_AGE_BATCH_DEFAULT_STARTER)
    self.configVars[self.CONFIG_AGE_BATCH_MAX_TASKS] = readInt(self.CONFIG_AGE_BATCH_MAX_TASKS)
    self.configVars[self.CONFIG_AGE_BATCH_URL_CRITERION] = readStr(self.CONFIG_AGE_BATCH_URL_CRITERION)
    self.configVars[self.CONFIG_AGE_BATCH_SITE_CRITERION] = readStr(self.CONFIG_AGE_BATCH_SITE_CRITERION)
    self.processAgingLastTs = time.time()

    # Aging batch statistics (the batch counter uses SET, the others INIT).
    self.updateStatField(DC_CONSTS.BATCHES_AGE_COUNTER_NAME, 0, self.STAT_FIELDS_OPERATION_SET)
    initStatFields((
        DC_CONSTS.BATCHES_AGE_COUNTER_TOTAL_NAME,
        DC_CONSTS.BATCHES_AGE_COUNTER_CANCELLED_NAME,
        DC_CONSTS.BATCHES_AGE_COUNTER_ERROR_NAME,
        DC_CONSTS.BATCHES_AGE_COUNTER_FAULT_NAME,
        DC_CONSTS.BATCHES_AGE_COUNTER_CHECK_FAULT_NAME,
    ))

    # Optional option: falls back to the class-level default when absent.
    if configParser.has_option(className, self.CONFIG_BATCH_MAX_ITERATIONS):
        self.configVars[self.CONFIG_BATCH_MAX_ITERATIONS] = readInt(self.CONFIG_BATCH_MAX_ITERATIONS)
    else:
        self.configVars[self.CONFIG_BATCH_MAX_ITERATIONS] = self.CONFIG_BATCH_MAX_ITERATIONS_DEFAULT