2 HCE project, Python bindings, Distributed Crawler application. 3 BatchTasksManager object and related classes definitions. 6 @author bgv bgv.hce@gmail.com 7 @link: http://hierarchical-cluster-engine.com/ 8 @copyright: Copyright © 2013-2016 IOIX Ukraine 9 @license: http://hierarchical-cluster-engine.com/license/ 20 import cPickle
as pickle
26 from app.Utils import SQLExpression, varDump
30 from dc
import EventObjects
# Identifiers of the two ways a DTM task can be checked.
# NOTE(review): presumably compared with the value of the
# "BatchQueueTaskCheckStateMethod" option to pick between the GET_TASK_STATUS
# and CHECK_TASK_STATE requests -- confirm against getDTMTaskState().
DTM_TASK_CHECK_STATE_METHOD_STATUS = 0
DTM_TASK_CHECK_STATE_METHOD_STATE = 1

# Fallback value paired by name with the "BatchMaxIterations" option.
CONFIG_BATCH_MAX_ITERATIONS_DEFAULT = 2

# Batch task type selectors; BATCH_TASK_TYPE_CRAWL is the default of
# getBatchTasksCount(batchType=...).
BATCH_TASK_TYPE_ALL = 0
BATCH_TASK_TYPE_CRAWL = 1
BATCH_TASK_TYPE_PURGE = 2
BATCH_TASK_TYPE_AGE = 3
# String constants of the batch tasks manager. The CONFIG_* values look like
# option names of the application's config section (NOTE(review): confirm
# against the configParser.get*() calls in __init__). Values are reproduced
# exactly as they appear in the source listing.

# Connections and general options.
CONFIG_SERVER = "server"
CONFIG_DTMD_HOST = "DTMDHost"
CONFIG_DTMD_PORT = "DTMDPort"
CONFIG_DTMD_TIMEOUT = "DTMDTimeout"
CONFIG_SITES_MANAGER_CLIENT = "clientSitesManager"
CONFIG_POLLING_TIMEOUT = "PollingTimeout"
CONFIG_DRCE_CRAWLER_APP_NAME = "DRCECrawlerAppName"
CONFIG_CRAWLED_URLS_STRATEGY = "CrawledURLStrategy"

# Regular crawling.
CONFIG_REGULAR_CRAWL_PERIOD = "RegularCrawlingPeriod"
CONFIG_REGULAR_CRAWL_MODE = "RegularCrawlingMode"
CONFIG_REGULAR_CRAWL_PROPAGATE_URLS = "RegularCrawlingPropagateURLs"

# DTM task names and types per batch kind (crawl, purge, aging).
CONFIG_TASK_DTM_NAME_CRAWLING = "BatchTaskDTMNameCrawl"
CONFIG_TASK_DTM_NAME_PURGING = "BatchTaskDTMNamePurge"
CONFIG_TASK_DTM_NAME_AGING = "BatchTaskDTMNameAging"
CONFIG_TASK_DTM_TYPE_CRAWLING = "BatchTaskDTMTypeCrawl"
CONFIG_TASK_DTM_TYPE_PURGING = "BatchTaskDTMTypePurge"
CONFIG_TASK_DTM_TYPE_AGING = "BatchTaskDTMTypeAging"

# Return of selected URLs.
CONFIG_RET_URLS_MAX_NUMBER = "ReturnURLsMaxNumber"
CONFIG_RET_URLS_PERIOD = "ReturnURLsPeriod"
CONFIG_RET_URLS_TTL = "ReturnURLsTTL"
CONFIG_RET_URLS_MODE = "ReturnURLsMode"

# Incremental crawling.
CONFIG_INCR_MIN_FREQ = "IncrMinFreq"
CONFIG_INCR_MAX_DEPTH = "IncrMaxDepth"
CONFIG_INCR_MAX_URL = "IncrMaxURLs"
CONFIG_INCR_CRAWL_PERIOD = "IncrPeriod"
CONFIG_INCR_CRAWL_MODE = "IncrMode"

# Crawl batch defaults and DTM tasks queue processing.
CONFIG_BATCH_DEFAULT_MAX_TIME = "BatchDefaultMaxExecutionTime"
CONFIG_BATCH_MAX_URLS = "BatchDefaultMaxURLs"
CONFIG_BATCH_ORDER_BY_URLS = "BatchDefaultOrderByURLs"
CONFIG_BATCH_MAX_TASKS = "BatchDefaultMaxTasks"
CONFIG_BATCH_QUEUE_PERIOD = "BatchQueueProcessingPeriod"
CONFIG_BATCH_QUEUE_TASK_TTL = "BatchQueueTaskTTL"
CONFIG_BATCH_QUEUE_TASK_CHECK_METHOD = "BatchQueueTaskCheckStateMethod"
CONFIG_BATCH_DEFAULT_STARTER = "BatchTask_STARTER"
CONFIG_BATCH_DEFAULT_CHECK_URLS_IN_ACTIVE_BATCHES = "BatchDefaultCheckURLsInActiveBatches"
CONFIG_BATCH_MAX_ITERATIONS = "BatchMaxIterations"
CONFIG_BATCH_FETCH_TYPE = "BatchDefaultFetchTypeOptions"
CONFIG_BATCH_MAX_TIME = "BatchMaxExecutionTime"
CONFIG_BATCH_REMOVE_UNPROCESSED_ITEMS = "RemoveUnprocessedBatchItems"

# Task strategy and auto-cleanup settings for batch tasks.
CONFIG_BATCH_DEFAULT_STRATEGY_IO_WAIT_MAX = "BatchTask_IO_WAIT_MAX"
CONFIG_BATCH_DEFAULT_STRATEGY_CPU_LOAD_MAX = "BatchTask_CPU_LOAD_MAX"
CONFIG_BATCH_DEFAULT_STRATEGY_RAM_FREE_MIN = "BatchTask_RAM_FREE_MIN"
CONFIG_BATCH_DEFAULT_STRATEGY_STRATEGY_RDELAY = "BatchTask_RDELAY"
CONFIG_BATCH_DEFAULT_STRATEGY_RETRY = "BatchTask_RETRY"
CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_TTL = "BatchTask_autocleanup_TTL"
CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_TYPE = "BatchTask_autocleanup_DeleteType"
CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_RETRIES = "BatchTask_autocleanup_DeleteRetries"
CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_STATE = "BatchTask_autocleanup_State"

# Purge batches.
CONFIG_PURGE_PERIOD = "PurgePeriod"
CONFIG_PURGE_MODE = "PurgeMode"
CONFIG_PURGE_BATCH_DEFAULT_MAX_TIME = "PurgeBatchDefaultMaxExecutionTime"
CONFIG_PURGE_BATCH_MAX_URLS = "PurgeBatchDefaultMaxURLs"
CONFIG_PURGE_BATCH_QUEUE_TASK_TTL = "PurgeBatchQueueTaskTTL"
CONFIG_PURGE_BATCH_DEFAULT_STARTER = "PurgeBatchTask_STARTER"
CONFIG_DRCE_DB_APP_NAME = "DRCEDBAppName"
CONFIG_PURGE_BATCH_MAX_TASKS = "PurgeBatchDefaultMaxTasks"

# Aging batches.
CONFIG_AGING_PERIOD = "AgingPeriod"
CONFIG_RESOURCE_AGING_MODE = "AgingMode"
CONFIG_AGE_BATCH_DEFAULT_MAX_TIME = "AgingBatchDefaultMaxExecutionTime"
CONFIG_AGE_BATCH_MAX_URLS_SITE = "AgingBatchDefaultMaxURLsSite"
CONFIG_AGE_BATCH_MAX_URLS_TOTAL = "AgingBatchDefaultMaxURLsTotal"
CONFIG_AGE_BATCH_MAX_SITES = "AgingBatchDefaultMaxSites"
CONFIG_AGE_BATCH_QUEUE_TASK_TTL = "AgingBatchQueueTaskTTL"
CONFIG_AGE_BATCH_DEFAULT_STARTER = "AgingBatchTask_STARTER"
CONFIG_AGE_BATCH_MAX_TASKS = "AgingBatchDefaultMaxTasks"
CONFIG_AGE_BATCH_URL_CRITERION = "AgingBatchURLsCriterion"
CONFIG_AGE_BATCH_SITE_CRITERION = "AgingBatchSitesCriterion"

# Cookie key names (not CONFIG_* option names).
# NOTE(review): presumably keys of the dict stored in event.cookie of URLFetch
# requests -- confirm in sendURLFetchRequest().
BATCH_FETCH_TYPE_COOKIE_NAME = "FetchType"
BATCH_ID_COOKIE_NAME = "batchId"
"batchId" 154 def __init__(self, configParser, connectionBuilderLight=None):
155 super(BatchTasksManager, self).
__init__()
158 if connectionBuilderLight
is None:
197 className = self.__class__.__name__
219 self.
configVars[DC_CONSTS.INCR_MAX_DEPTH_CONFIG_VAR_NAME] = configParser.getint(className,
230 serverConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, self.
serverName)
231 sitesManagerConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT, self.
clientSitesManagerName)
234 self.
dtmdConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT,
237 TRANSPORT_CONSTS.TCP_TYPE)
276 except ConfigParser.NoOptionError:
421 logger.debug(
"Periodic iteration started.")
429 logger.debug(
"URL_FETCH for regular crawling, now crawl tasks: %s, URLFetch counter:%s", str(crawlTasks),
437 self.
updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URL_FETCH_REQUESTS_NAME, 1,
440 self.
updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URL_FETCH_CANCELLED_NAME, 1,
442 logger.debug(
"Max crawl batch tasks %s>=%s in queue or URLFetch counter %s reached!",
446 logger.debug(
"Regular crawling disabled!")
452 logger.debug(
"Process DTM tasks queue!")
464 logger.debug(
"URLs return for crawling disabled!")
470 logger.debug(
"URL_FETCH for incremental crawling iteration!")
475 logger.debug(
"Incremental crawling disabled!")
484 logger.debug(
"Age batching, tasks: %s!", str(ageTasks))
494 logger.debug(
"Max age tasks %s reached", str(ageTasks))
496 logger.debug(
"Resources aging disabled")
505 logger.debug(
"Purge batching, tasks: %s!", str(purgeTasks))
515 logger.debug(
"Max purge tasks %s reached", str(purgeTasks))
517 logger.debug(
"Purging disabled!")
521 except Exception
as err:
522 logger.error(
"Exception: " + str(err.message) +
"\n" + Utils.getTracebackInfo())
524 logger.debug(
"Periodic iteration finished.")
538 batchURLPurge.siteLimits = (0, EventObjects.URLPurge.ALL_SITES)
541 logger.debug(
"DTM purge batch was set, taskId=%s", str(taskId))
543 batchURLPurge.queuedTs = time.time()
544 batchURLPurge.crawlerType = EventObjects.Batch.TYPE_PURGE
548 logger.error(
"Error send purge batch task to DTM!")
550 except Exception
as err:
551 logger.error(
"Exception: " + str(err.message) +
"\n" + Utils.getTracebackInfo())
571 batchURLAge.delayedType = EventObjects.DELAYED_OPERATION
574 logger.debug(
"DTM age batch was set, taskId=%s", str(taskId))
576 batchURLAge.queuedTs = time.time()
577 batchURLAge.crawlerType = EventObjects.Batch.TYPE_AGE
581 logger.error(
"Error send age batch task to DTM!")
583 except Exception
as err:
584 logger.error(
"Exception: " + str(err.message) +
"\n" + Utils.getTracebackInfo())
597 if self.
statFields[DC_CONSTS.BATCHES_CRAWL_COUNTER_URL_FETCH_REQUESTS_NAME] > 0:
598 self.
updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URL_FETCH_REQUESTS_NAME, 1,
601 logger.debug(
"Reply received on URL fetch: %s",
varDump(event))
602 if event.cookie
is not None and isinstance(event.cookie, dict):
603 crawlerType = event.cookie.get(
'type', EventObjects.Batch.TYPE_NORMAL_CRAWLER)
607 crawlerType = EventObjects.Batch.TYPE_NORMAL_CRAWLER
610 clientResponse = event.eventObj
611 if clientResponse.errorCode == EventObjects.ClientResponse.STATUS_OK:
612 if len(clientResponse.itemsList) > 0:
613 if event.cookie
is not None and\
614 (isinstance(event.cookie, dict)
and\
615 EventObjects.Batch.OPERATION_TYPE_NAME
in event.cookie
and\
616 (event.cookie[EventObjects.Batch.OPERATION_TYPE_NAME] == EventObjects.Batch.TYPE_NORMAL_CRAWLER
or\
617 event.cookie[EventObjects.Batch.OPERATION_TYPE_NAME] == EventObjects.Batch.TYPE_INCR_CRAWLER)):
620 if len(batch.items) > 0:
625 maxExecutionTime =
None 626 if fetchType
is not None and fetchType == EventObjects.Site.FETCH_TYPE_DYNAMIC:
632 logger.debug(
"DTM batch was set, taskId=%s", str(taskId))
634 batch.queuedTs = time.time()
635 if fetchType
is not None and fetchType == EventObjects.Site.FETCH_TYPE_DYNAMIC:
643 logger.error(
"Error send the Batch object to DTM!")
644 if crawlerType == EventObjects.Batch.TYPE_NORMAL_CRAWLER:
648 logger.debug(
"There is no items in batch, cancelled!")
653 logger.debug(str(urls) +
" URLs returned back to state NEW because processing TTL exceed.")
655 logger.debug(
"There is empty clientResponse.itemsList")
657 logger.error(
"URLFetch response error: " + str(clientResponse.errorCode) +
" : " + clientResponse.errorMessage)
658 except Exception
as err:
659 ExceptionLog.handler(logger, err,
"Exception:")
667 logger.debug(
"Reply received on URL update.")
668 clientResponse = event.eventObj
669 if clientResponse.errorCode == EventObjects.ClientResponse.STATUS_OK:
670 if len(clientResponse.itemsList) > 0:
671 for clientResponseItem
in clientResponse.itemsList:
672 if clientResponseItem.errorCode != EventObjects.ClientResponseItem.STATUS_OK:
673 logger.error(
"URLUpdate response error: " + str(clientResponseItem.errorCode) +
" : " + \
674 clientResponseItem.errorMessage +
", host:" + clientResponseItem.host +
", port:" + \
675 clientResponseItem.port +
", node:" + clientResponseItem.node +
"!")
677 logger.error(
"URLUpdate response empty list!")
679 logger.error(
"URLUpdate response error:" + str(clientResponse.errorCode) +
" : " + clientResponse.errorMessage)
680 except Exception
as err:
681 ExceptionLog.handler(logger, err,
"Exception:")
699 newTaskObj.setSessionVar(
"tmode", dtm.EventObjects.Task.TASK_MODE_ASYNCH)
703 if maxExecutionTime
is None:
706 mt = int(maxExecutionTime)
707 newTaskObj.setSessionVar(
"time_max", mt * 1000)
718 newTaskObj.setSessionVar(
"route", DC_CONSTS.DRCE_REQUEST_ROUTING_RND)
726 newTaskObj.setSessionVar(
"route", DC_CONSTS.DRCE_REQUEST_ROUTING_ROUND_ROBIN)
727 newTaskObj.setSessionVar(
"task_type", int(newTaskObj.type))
730 newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_IO_WAIT_MAX,
733 newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_CPU_LOAD_MAX,
736 newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_RAM_FREE,
739 newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_RDELAY,
742 newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_RETRY,
745 autoCleanupFields = {}
747 autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_TTL] = \
750 autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_DELETE_TYPE] = \
753 autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_DELETE_RETRIES] = \
756 autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_SSTATE] = \
758 if len(autoCleanupFields) > 0:
759 newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_autoCleanupFields, autoCleanupFields)
761 if not hasattr(batch,
'id')
or batch.id == 0:
762 batch.id = newTaskObj.id
764 newTaskObj.id = batch.id
767 newTaskObj.input = pickle.dumps(batch)
770 drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_PURGE, [batch])
772 drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_AGE, [batch])
773 newTaskObj.input = pickle.dumps(drceSyncTasksCoverObj)
775 newTaskEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.NEW_TASK, newTaskObj)
778 if generalResponse
is not None:
779 if generalResponse.errorCode == dtm.EventObjects.GeneralResponse.ERROR_OK:
781 taskId = newTaskObj.id
784 logger.error(
"DTM set batch task error: " + str(generalResponse.errorCode) +
" : " + \
785 generalResponse.errorMessage +
", statuses:" +
varDump(generalResponse))
787 logger.error(
"DTM set batch task response error, possible timeout!")
790 taskId = newTaskObj.id
805 batchItemsCounter = 0
806 batchItemsTotalCounter = 0
807 uniqueURLsCRCDic = {}
810 for item
in clientResponseItems:
811 if item.errorCode == EventObjects.ClientResponseItem.STATUS_OK:
812 if isinstance(item.itemObject, list):
813 for url
in item.itemObject:
814 batchItemsTotalCounter = batchItemsTotalCounter + 1
822 url.batchId = batchId
824 itemId = str(url.siteId) +
":" + str(url.urlMd5)
825 if itemId
not in uniqueURLsCRCDic:
826 uniqueURLsCRCDic[itemId] = batchItem
827 logger.debug(
"Insert batchItem: %s",
varDump(batchItem))
828 batch.items.append(batchItem)
829 batchItemsCounter = batchItemsCounter + 1
832 logger.debug(
"URL is under processing of batch %s, skipped from new batch", str(batchTaskId))
834 logger.error(
"Wrong object type in the itemObject.item: " + str(
type(url)) + \
835 " but 'URL' expected")
836 if batchItemsCounter > 0:
839 logger.error(
"Wrong object type in the ClientResponseItem.itemObject: " + str(
type(item.itemObject)) + \
840 " but 'list' expected")
842 logger.debug(
"ClientResponseItem error: " + str(item.errorCode) +
" : " + item.errorMessage)
844 logger.debug(
"Batch object created, items: " + str(batchItemsTotalCounter) +
" total, " + str(batchItemsCounter) + \
860 for batchItem
in taskBatch.items:
861 if batchItem.siteId == siteId
and batchItem.urlId == urlMd5:
876 batchItemsCounter = 0
877 batchItemsTotalCounter = 0
878 uniqueURLsCRCDic = {}
880 for item
in clientResponseItems:
881 if item.errorCode == EventObjects.ClientResponseItem.STATUS_OK:
882 if isinstance(item.itemObject, list):
883 for url
in item.itemObject:
884 batchItemsTotalCounter = batchItemsTotalCounter + 1
886 itemId = str(url.siteId) +
":" + str(url.urlMd5)
887 if itemId
not in uniqueURLsCRCDic:
888 uniqueURLsCRCDic[itemId] = 1
889 batchItemsCounter = batchItemsCounter + 1
891 uniqueURLsCRCDic[itemId] += 1
893 logger.error(
"Wrong object type in the itemObject.item: " + str(
type(url)) + \
894 " but 'URL' expected")
896 logger.error(
"Wrong object type in the ClientResponseItem.itemObject: " + str(
type(item.itemObject)) + \
897 " but 'list' expected")
899 logger.debug(
"ClientResponseItem error: " + str(item.errorCode) +
" : " + item.errorMessage)
901 logger.debug(
"Unique URLs: " + str(batchItemsCounter) +
", total URLs: " + str(batchItemsTotalCounter))
904 return batchItemsCounter
906 return batchItemsTotalCounter
916 EventObjects.URL.STATUS_SELECTED_CRAWLING)
919 urlUpdate.batchId = newDTMTaskObj.id
922 fetcherCondition =
'' 926 fetcherCondition =
' AND `FetchType`=' + str(fetchType)
927 if fetchType == EventObjects.Site.FETCH_TYPE_DYNAMIC
and\
931 sitesCriterions = {EventObjects.URLFetch.CRITERION_WHERE:
" `sites`.`State`=" + \
932 str(EventObjects.Site.STATE_ACTIVE) + \
933 " AND IFNULL((SELECT `Value` FROM `sites_properties` WHERE `Name`='MODES_FLAG') & 1, 1)<>0" + \
936 urlCriterions = {EventObjects.URLFetch.CRITERION_WHERE:
"`Status`=" + str(EventObjects.URL.STATUS_NEW) + \
939 EventObjects.URLFetch.CRITERION_LIMIT: limit}
943 urlFetch.algorithm = EventObjects.URLFetch.PROPORTIONAL_ALGORITHM
944 urlFetch.maxURLs = int(limit)
945 urlFetchEvent = self.
eventBuilder.build(DC_CONSTS.EVENT_TYPES.URL_FETCH, [urlFetch])
946 urlFetchEvent.cookie = {EventObjects.Batch.OPERATION_TYPE_NAME: EventObjects.Batch.TYPE_NORMAL_CRAWLER,
948 if fetchType
is not None:
952 logger.debug(
"The URLFetch request to SitesManager sent!")
954 if fetchType
is not None and fetchType == EventObjects.Site.FETCH_TYPE_DYNAMIC:
957 elif fetchType
is not None and fetchType == EventObjects.Site.FETCH_TYPE_STATIC:
972 EventObjects.URL.STATUS_SELECTED_CRAWLING_INCREMENTAL)
974 conditionStr =
''' `Status` in (%s, %s, %s, %s) AND `Depth`<=%s \ 975 AND `State`=0 AND timediff(`TcDate`,`LastModified`)>=0 AND (NOW() - `TcDate`) > timediff(`TcDate`, `LastModified`) 976 ''' % (EventObjects.URL.STATUS_CRAWLED, EventObjects.URL.STATUS_SELECTED_PROCESSING,
977 EventObjects.URL.STATUS_PROCESSING, EventObjects.URL.STATUS_PROCESSED,
978 self.
configVars[DC_CONSTS.INCR_MAX_DEPTH_CONFIG_VAR_NAME])
979 urlCriterions = {EventObjects.URLFetch.CRITERION_WHERE: conditionStr,
980 EventObjects.URLFetch.CRITERION_ORDER:
"`Depth` ASC, `MRate` DESC, `UDate` DESC",
981 EventObjects.URLFetch.CRITERION_LIMIT:
982 str(self.
configVars[DC_CONSTS.INCR_MAX_URLS_CONFIG_VAR_NAME])
985 siteCriterions = {EventObjects.URLFetch.CRITERION_WHERE:
"`State`=" + \
986 str(EventObjects.Site.STATE_ACTIVE) +
"`ID` in (SELECT `Site_Id` FROM `sites_properties` " + \
987 "WHERE `Name`='INCREMENTAL_CRAWLING' AND `Value`='1') " + \
988 "AND IFNULL((SELECT `Value` FROM `sites_properties` WHERE `Name`='MODES_FLAG') & 1, 1)<>0"}
992 urlFetch.algorithm = EventObjects.URLFetch.PROPORTIONAL_ALGORITHM
993 urlFetch.maxURLs = self.
configVars[DC_CONSTS.INCR_MAX_URLS_CONFIG_VAR_NAME]
994 urlFetchEvent = self.
eventBuilder.build(DC_CONSTS.EVENT_TYPES.URL_FETCH, [urlFetch])
995 urlFetchEvent.cookie = {
"type": EventObjects.Batch.TYPE_INCR_CRAWLER}
998 logger.debug(
"The URLFetch for incremental crawler request to SitesManager sent!")
1009 EventObjects.URL.STATUS_NEW)
1011 sitesCriterions = {EventObjects.URLFetch.CRITERION_WHERE:
"`State`=" + str(EventObjects.Site.STATE_ACTIVE)}
1012 urlCriterions = {EventObjects.URLFetch.CRITERION_WHERE:
"`Status` IN (" + \
1013 str(EventObjects.URL.STATUS_SELECTED_CRAWLING) +
"," + \
1014 str(EventObjects.URL.STATUS_CRAWLING) +
"," + \
1015 str(EventObjects.URL.STATUS_SELECTED_PROCESSING) +
"," + \
1016 str(EventObjects.URL.STATUS_PROCESSING) +
") AND DATE_ADD(UDate, INTERVAL " + \
1018 EventObjects.URLFetch.CRITERION_ORDER:
"`UDate` ASC",
1021 urlFetch.algorithm = EventObjects.URLFetch.PROPORTIONAL_ALGORITHM
1023 urlFetchEvent = self.
eventBuilder.build(DC_CONSTS.EVENT_TYPES.URL_FETCH, [urlFetch])
1024 urlFetchEvent.cookie = {EventObjects.Batch.OPERATION_TYPE_NAME: EventObjects.Batch.TYPE_URLS_RETURN}
1027 logger.debug(
"The URLFetch request to SitesManager sent!")
1039 logger.debug(
"Check state of taskId=" + str(taskId))
1041 checkStateEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE, checkTaskStateObj)
1043 logger.debug(
"DTM CheckTaskState request finished, taskId=" + str(taskId))
1046 taskState = eeResponseData.state
1049 logger.debug(
"Get status of taskId=" + str(taskId))
1051 getTasksStatusEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.GET_TASK_STATUS, getTasksStatusObj)
1053 logger.debug(
"DTM getTasksStatus request finished, taskId=" + str(taskId))
1054 if listTaskManagerFields
is not None and isinstance(listTaskManagerFields, list):
1055 if len(listTaskManagerFields) > 0:
1056 taskState = listTaskManagerFields[0].fields[
"state"]
1059 taskState = dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED
1060 logger.error(
"DTM getTasksStatus taskId=" + str(taskId) +
" returned empty fields array in response:\n" + \
1061 Utils.varDump(listTaskManagerFields))
1063 logger.error(
"DTM getTasksStatus taskId=" + str(taskId) +
" returned wrong data:\n" + \
1064 Utils.varDump(listTaskManagerFields))
1082 if hasattr(taskBatch,
'ttl'):
1088 if hasattr(taskBatch,
'items'):
1089 items = len(taskBatch.items)
1094 logger.debug(
"Batch in queue type: %s, taskId: %s, ttl: %s, items: %s", str(
type(taskBatch)), str(taskId),
1095 str(ttl), str(items))
1097 if batchState !=
None:
1098 logger.debug(
"Batch state: %s", str(batchState))
1102 deleteTaskEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.DELETE_TASK, deleteTaskObj)
1104 logger.debug(
"DTM DeleteTask request finished, taskId=" + str(taskId))
1105 if generalResponse
is not None:
1106 if generalResponse.errorCode == dtm.EventObjects.GeneralResponse.ERROR_OK:
1107 logger.debug(
"DTM task deleted, taskId=" + str(taskId))
1108 if batchState == dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED:
1109 logger.debug(
"batch:\n" +
varDump(taskBatch) +
"\n finished, taskId=" + str(taskId))
1112 logger.debug(
"batch:" +
varDump(taskBatch) +
" not finished, state=" + str(batchState))
1116 tmpQueue[taskId] = taskBatch
1117 logger.error(
"DTM delete task taskId=" + str(taskId) +
", error: " + str(generalResponse.errorCode) + \
1118 " : " + generalResponse.errorMessage +
", generalResponse:" +
varDump(generalResponse))
1122 tmpQueue[taskId] = taskBatch
1123 logger.error(
"DTM delete task error: wrong response or timeout, taskId=" + str(taskId) +
" still in queue!")
1125 logger.debug(
"DTM task still alive Id=" + str(taskId) +
" state=" + str(batchState))
1126 if time.time() - taskBatch.queuedTs > ttl:
1130 tmpQueue[taskId] = taskBatch
1131 logger.debug(
"DTM task Id=" + str(taskId) +
" still in queue")
1133 logger.error(
"DTM check task state error: wrong response or timeout, taskId=" + str(taskId) +
"!")
1134 if time.time() - taskBatch.queuedTs > ttl:
1135 logger.error(
"DTM task Id=" + str(taskId) +
" removed from queue by TTL:" + str(ttl))
1138 tmpQueue[taskId] = taskBatch
1140 logger.error(
"DTM task Id=" + str(taskId) +
" saved in queue.")
1150 if taskId
is not None and taskId > 0:
1153 deleteTaskObj.action = dtm.EventObjects.DeleteTask.ACTION_TERMINATE_TASK_AND_DELETE_DATA
1154 deleteTaskEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.DELETE_TASK, deleteTaskObj)
1156 logger.error(
"DTM task Id=" + str(taskId) +
" terminated and removed from queue by TTL:" + str(ttl) + \
1157 ", generalResponse=" + str(generalResponse) +
", batch=" + str(taskBatch))
1160 if incrementFaultsCounter:
1162 self.
updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_FAULT_NAME, len(taskBatch.items),
1164 if taskId
is not None and taskId > 0
and incrementFaultsCounter:
1181 self.
updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_CHECK_FAULT_NAME, 1,
1185 self.
updateStatField(DC_CONSTS.BATCHES_PURGE_COUNTER_CHECK_FAULT_NAME, 1,
1188 self.
updateStatField(DC_CONSTS.BATCHES_AGE_COUNTER_CHECK_FAULT_NAME, 1,
1198 self.
updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_DELETE_FAULT_NAME, 1,
1202 self.
updateStatField(DC_CONSTS.BATCHES_PURGE_COUNTER_DELETE_FAULT_NAME, 1,
1205 self.
updateStatField(DC_CONSTS.BATCHES_AGE_COUNTER_DELETE_FAULT_NAME, 1,
1216 deadStates = [dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED,
1217 dtm.EventObjects.EEResponseData.TASK_STATE_CRASHED,
1218 dtm.EventObjects.EEResponseData.TASK_STATE_TERMINATED,
1219 dtm.EventObjects.EEResponseData.TASK_STATE_UNDEFINED,
1220 dtm.EventObjects.EEResponseData.TASK_STATE_SET_ERROR,
1221 dtm.EventObjects.EEResponseData.TASK_STATE_TERMINATED_BY_DRCE_TTL,
1222 dtm.EventObjects.EEResponseData.TASK_STATE_SCHEDULE_TRIES_EXCEEDED
1225 if state
in deadStates
or (state == dtm.EventObjects.EEResponseData.TASK_STATE_NEW_SCHEDULED
and time.time() - queuedTs > ttl):
1245 logger.debug(
"Batches tasks in queue - total:%s, crawl:%s, purge:%s, age:%s", str(len(self.
dtmTasksQueue)),
1246 str(self.
statFields[DC_CONSTS.BATCHES_CRAWL_COUNTER_QUEUE_NAME]),
1247 str(self.
statFields[DC_CONSTS.BATCHES_PURGE_COUNTER_NAME]),
1248 str(self.
statFields[DC_CONSTS.BATCHES_AGE_COUNTER_NAME]))
1250 if tasksWithItems > 0:
1291 logger.debug(
"Send update URLs from batch: %s for all foreign hosts by the Batch_Id", str(taskBatch.id))
1294 logger.debug(
"Send delete URLs from batch: %s for all foreign hosts by the Batch_Id", str(taskBatch.id))
1297 logger.debug(
"Purge batch: %s finished!", str(taskBatch.id))
1299 logger.debug(
"Age batch: %s finished!", str(taskBatch.id))
1317 for i
in range(maxTries + 1):
1320 logger.error(
"DTMD request timeout reached " + str(timeout) +
"!")
1325 if retEvent !=
None:
1332 isinstance(retEvent.eventObj, list):
1333 if retEvent.uid == requestEvent.uid:
1334 ret = retEvent.eventObj
1337 logger.error(
"DTMD returned wrong object uid: " + str(retEvent.uid) +
" but " + \
1338 str(requestEvent.uid) +
" expected, iteration " + str(i) +
"!")
1340 logger.error(
"DTMD returned wrong object type: " + str(
type(retEvent.eventObj)) +
"!")
1342 logger.error(
"DTMD returned None event!")
1343 except Exception, e:
1344 logger.error(
"DTMD request execution exception: " + e.message +
"!")
1346 logger.debug(
"The DTMD request finished!")
1361 for batchItem
in batchItemsList:
1365 if batchState
is True:
1366 status = EventObjects.URL.STATUS_CRAWLED
1367 sqlExpression =
SQLExpression(
"`URLMd5`='" + str(batchItem.urlId) +
"' AND (" + \
1368 "(`Batch_Id`<>" + str(batchId) +
" AND `Status`=" + \
1369 str(EventObjects.URL.STATUS_NEW) +
")" + \
1370 " OR (`Batch_Id`=" + str(batchId) +
" AND `Status` IN (" + \
1371 str(EventObjects.URL.STATUS_SELECTED_CRAWLING) +
"," + \
1372 str(EventObjects.URL.STATUS_SELECTED_CRAWLING_INCREMENTAL) +
"))" + \
1373 ")" + notRootURLExpr)
1375 status = EventObjects.URL.STATUS_NEW
1376 sqlExpression =
SQLExpression(
"`URLMd5`='" + str(batchItem.urlId) +
"' AND `Status` IN (" +
1377 str(EventObjects.URL.STATUS_SELECTED_CRAWLING) +
"," + \
1378 str(EventObjects.URL.STATUS_SELECTED_CRAWLING_INCREMENTAL) +
")" + \
1381 urlUpdate =
EventObjects.URLUpdate(batchItem.siteId, batchItem.urlId, EventObjects.URLStatus.URL_TYPE_MD5,
1383 urlUpdate.processed = 0
1384 urlUpdate.crawled = 0
1385 urlUpdate.criterions[EventObjects.URLFetch.CRITERION_WHERE] = sqlExpression
1386 logger.debug(
"batch: %s, URLUpdate: %s", str(batchId),
varDump(urlUpdate))
1387 urlsList.append(urlUpdate)
1390 urlUpdateEvent = self.
eventBuilder.build(DC_CONSTS.EVENT_TYPES.URL_UPDATE, urlsList)
1393 logger.debug(
"The URLUpdate request to SitesManager sent!")
1405 for batchItem
in batchItemsList:
1406 sqlExpression =
SQLExpression(
"`ParentMd5`<>'' AND `URLMd5`='" + str(batchItem.urlId) +
"' AND `Batch_Id`<>" + \
1409 {EventObjects.URLFetch.CRITERION_WHERE:sqlExpression,
1410 EventObjects.URLFetch.CRITERION_LIMIT:1},
1411 reason=EventObjects.URLDelete.REASON_SELECT_TO_CRAWL_TTL)
1412 logger.debug(
"URLDelete: " +
varDump(urlDelete))
1413 urlsList.append(urlDelete)
1416 urlDeleteEvent = self.
eventBuilder.build(DC_CONSTS.EVENT_TYPES.URL_DELETE, urlsList)
1419 logger.debug(
"The URLDelete request to SitesManager sent!")
1428 logger.debug(
"Reply received on URL delete.")
1429 clientResponse = event.eventObj
1430 if clientResponse.errorCode == EventObjects.ClientResponse.STATUS_OK:
1431 if len(clientResponse.itemsList) > 0:
1432 for clientResponseItem
in clientResponse.itemsList:
1433 if clientResponseItem.errorCode != EventObjects.ClientResponseItem.STATUS_OK:
1434 logger.error(
"URLDelete response error: " + str(clientResponseItem.errorCode) +
" : " + \
1435 clientResponseItem.errorMessage +
", host:" + clientResponseItem.host +
", port:" + \
1436 clientResponseItem.port +
", node:" + clientResponseItem.node +
"!")
1438 logger.error(
"URLDelete response empty list!")
1440 logger.error(
"URLDelete response error:" + str(clientResponse.errorCode) +
" : " + clientResponse.errorMessage)
1441 except Exception
as err:
1442 ExceptionLog.handler(logger, err,
"Exception:")
1451 for batchItem
in batchItemsList:
1452 if isinstance(batchItem.urlObj,
EventObjects.URL)
and batchItem.urlObj.parentMd5 !=
'':
1453 urlObj = copy.deepcopy(batchItem.urlObj)
1454 urlObj.status = EventObjects.URL.STATUS_CRAWLED
1456 urlObj.processed = 0
1457 urlObj.contentType =
'' 1460 urlObj.errorMask = 0
1461 urlObj.crawlingTime = 0
1462 urlObj.processingTime = 0
1463 urlObj.totalTime = 0
1470 urlObj.rawContentMd5 =
"" 1473 urlObj.mRateCounter = 0
1474 urlObj.contentMask = EventObjects.URL.CONTENT_EMPTY
1476 urlObj.tagsCount = 0
1478 urlObj.urlUpdate =
None 1480 logger.debug(
"URLNew item: %s, batchId: %s", urlObj.urlMd5, str(urlObj.batchId))
1481 urlsList.append(urlObj)
1483 if len(urlsList) > 0:
1485 urlNewEvent = self.
eventBuilder.build(DC_CONSTS.EVENT_TYPES.URL_NEW, urlsList)
1488 logger.debug(
"The URLNew request to SitesManager sent!")
1497 logger.debug(
"Reply received on URL new.\n" + Utils.varDump(event))
1498 clientResponse = event.eventObj
1499 if clientResponse.errorCode == EventObjects.ClientResponse.STATUS_OK:
1500 if len(clientResponse.itemsList) > 0:
1501 for clientResponseItem
in clientResponse.itemsList:
1502 if clientResponseItem.errorCode != EventObjects.ClientResponseItem.STATUS_OK:
1503 logger.error(
"URLNew response error: " + str(clientResponseItem.errorCode) +
" : " + \
1504 clientResponseItem.errorMessage +
", host:" + clientResponseItem.host +
", port:" + \
1505 clientResponseItem.port +
", node:" + clientResponseItem.node +
"!")
1507 logger.error(
"URLNew response empty list!")
1509 logger.error(
"URLNew response error:" + str(clientResponse.errorCode) +
" : " + clientResponse.errorMessage)
1510 except Exception
as err:
1511 ExceptionLog.handler(logger, err,
"Exception:")
int BATCH_TASK_TYPE_CRAWL
int BATCH_TASK_TYPE_PURGE
string CONFIG_BATCH_DEFAULT_STRATEGY_RETRY
string CONFIG_BATCH_MAX_ITERATIONS
def __init__(self, configParser, connectionBuilderLight=None)
string CONFIG_REGULAR_CRAWL_PERIOD
def sendURLNew(self, batchItemsList)
string CONFIG_POLLING_TIMEOUT
string CONFIG_BATCH_MAX_URLS
string CONFIG_INCR_CRAWL_PERIOD
string CONFIG_TASK_DTM_TYPE_PURGING
string CONFIG_BATCH_MAX_TASKS
string CONFIG_BATCH_DEFAULT_MAX_TIME
def onURLDeleteResponse(self, event)
string CONFIG_AGE_BATCH_QUEUE_TASK_TTL
string CONFIG_INCR_MIN_FREQ
string CONFIG_PURGE_PERIOD
def getURLsCountFromClientResponseItems(self, clientResponseItems, unique=True)
string CONFIG_PURGE_BATCH_DEFAULT_STARTER
def updateStatField(self, field_name, value, operation=STAT_FIELDS_OPERATION_ADD)
Update the value of a stat field; the default operation is addition (sum).
string CONFIG_DRCE_CRAWLER_APP_NAME
def getBatchTasksCount(self, batchType=BATCH_TASK_TYPE_CRAWL)
int CONFIG_BATCH_MAX_ITERATIONS_DEFAULT
string CONFIG_TASK_DTM_TYPE_AGING
string CONFIG_CRAWLED_URLS_STRATEGY
NewTask event object, defines the Task object fields.
GeneralResponse event object, represents general state response for multipurpose usage.
string POLL_TIMEOUT_CONFIG_VAR_NAME
def processSelectedURLsReturn(self)
def makeBatchFromClientResponseItems(self, clientResponseItems, crawlerType, batchId=0)
string CONFIG_BATCH_DEFAULT_STARTER
def sendIncrURLRequest(self)
string CONFIG_INCR_MAX_DEPTH
def setEventHandler(self, eventType, handler)
Set the event handler for eventType, overwriting the current handler.
string CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_STATE
processSelectedURLsRetLastTs
def addConnection(self, name, connection)
string CONFIG_BATCH_FETCH_TYPE
def getBatchTaskIdByURL(self, siteId, urlMd5)
DeleteTask event object, to delete task from DTM application and from EE.
def processFinishedBatch(self, taskBatch)
def isDTMTaskDead(self, state, queuedTs, ttl)
string CONFIG_AGE_BATCH_DEFAULT_MAX_TIME
string CONFIG_BATCH_DEFAULT_CHECK_URLS_IN_ACTIVE_BATCHES
This is app base class for management server connection end-points and parallel transport messages pr...
string CONFIG_AGE_BATCH_MAX_SITES
int STAT_FIELDS_OPERATION_SET
def deleteDTMTaskFaultCountersUpdate(self, taskBatch)
def sendURLDelete(self, batchItemsList, batchId)
def poll(self)
poll function polling connections receive as multipart msg, the second argument is pickled pyobj ...
string CONFIG_RET_URLS_MODE
def onURLFetchResponse(self, event)
def on_poll_timeout(self)
string CONFIG_PURGE_BATCH_QUEUE_TASK_TTL
string CONFIG_AGE_BATCH_MAX_URLS_SITE
def finishDTMTaskFaultPostProcess(self, taskBatch, taskId=None, ttl=0, incrementFaultsCounter=True)
CheckTaskState event object, for check task status inside EE.
def onURLUpdateResponse(self, event)
int STAT_FIELDS_OPERATION_ADD
string CONFIG_BATCH_REMOVE_UNPROCESSED_ITEMS
string CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_TTL
string CONFIG_PURGE_BATCH_MAX_URLS
def checkDTMTaskFaultCountersUpdate(self, taskBatch)
string CONFIG_BATCH_ORDER_BY_URLS
Class hides the routines for building connection objects.
statFields
stat fields container
def send(self, connect_name, event)
send event
string CONFIG_BATCH_DEFAULT_STRATEGY_RAM_FREE_MIN
string CONFIG_TASK_DTM_NAME_PURGING
string CONFIG_DTMD_TIMEOUT
string CONFIG_PURGE_BATCH_MAX_TASKS
def dtmdRequestExecute(self, requestEvent, timeout, maxTries=100)
string CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_TYPE
string CONFIG_BATCH_QUEUE_PERIOD
string CONFIG_INCR_MAX_URL
string CONFIG_TASK_DTM_TYPE_CRAWLING
string CONFIG_AGE_BATCH_SITE_CRITERION
string CONFIG_AGING_PERIOD
string CONFIG_RET_URLS_TTL
string CONFIG_REGULAR_CRAWL_MODE
def sendURLFetchRequest(self)
string CONFIG_RET_URLS_PERIOD
string CONFIG_AGE_BATCH_DEFAULT_STARTER
def onURLNewResponse(self, event)
string CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_RETRIES
string CONFIG_TASK_DTM_NAME_AGING
processRegularCrawlLastTs
int DTM_TASK_CHECK_STATE_METHOD_STATUS
def sendURLUpdate(self, batchItemsList, batchId, batchState)
string CONFIG_BATCH_DEFAULT_STRATEGY_IO_WAIT_MAX
string CONFIG_INCR_CRAWL_MODE
GetTasksStatus event object, for check task status operation.
def sendBatchTaskToDTM(self, batch, maxExecutionTime=None)
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
string CONFIG_AGE_BATCH_URL_CRITERION
sendURLFetchRequestCounter
string CONFIG_AGE_BATCH_MAX_TASKS
string CONFIG_BATCH_QUEUE_TASK_TTL
string CONFIG_AGE_BATCH_MAX_URLS_TOTAL
int STAT_FIELDS_OPERATION_SUB
def updateDTMTasksQueueCounters(self, tasksWithItems=0, itemsTotal=0)
string CONFIG_BATCH_QUEUE_TASK_CHECK_METHOD
string CONFIG_SITES_MANAGER_CLIENT
def getDTMTaskState(self, taskId)
string BATCH_FETCH_TYPE_COOKIE_NAME
string CONFIG_BATCH_DEFAULT_STRATEGY_CPU_LOAD_MAX
string CONFIG_RET_URLS_MAX_NUMBER
EEResponseData event object, store task results data, returned from EE.
string BATCH_ID_COOKIE_NAME
string CONFIG_RESOURCE_AGING_MODE
def processDTMTasksQueue(self)
string CONFIG_BATCH_MAX_TIME
int STAT_FIELDS_OPERATION_INIT
string CONFIG_DRCE_DB_APP_NAME
string CONFIG_BATCH_DEFAULT_STRATEGY_STRATEGY_RDELAY
string CONFIG_PURGE_BATCH_DEFAULT_MAX_TIME
string CONFIG_REGULAR_CRAWL_PROPAGATE_URLS
string CONFIG_TASK_DTM_NAME_CRAWLING