2 HCE project, Python bindings, Distributed Crawler application. 3 BatchTasksManagerProcess object and related classes definitions. 6 @author bgv bgv.hce@gmail.com 7 @link: http://hierarchical-cluster-engine.com/ 8 @copyright: Copyright © 2015 IOIX Ukraine 9 @license: http://hierarchical-cluster-engine.com/license/ 16 import cPickle
as pickle
21 from dc
import EventObjects
27 from app.Utils import SQLExpression, varDump
43 DTM_TASK_CHECK_STATE_METHOD_STATUS = 0
44 DTM_TASK_CHECK_STATE_METHOD_STATE = 1
47 CONFIG_SERVER =
"server" 48 CONFIG_DTMD_HOST =
"DTMDHost" 49 CONFIG_DTMD_PORT =
"DTMDPort" 50 CONFIG_DTMD_TIMEOUT =
"DTMDTimeout" 51 CONFIG_POLLING_TIMEOUT =
"PollingTimeout" 52 CONFIG_SITES_MANAGER_CLIENT =
"clientSitesManager" 54 CONFIG_DRCE_PROCESSOR_APP_NAME =
"DRCEProcessorAppName" 55 CONFIG_DRCE_DB_APP_NAME =
"DRCEDBAppName" 56 CONFIG_PROCESS_PERIOD =
"ProcessingPeriod" 57 CONFIG_PROCESS_MODE =
"ProcessingMode" 59 CONFIG_BATCH_DEFAULT_MAX_TIME =
"BatchDefaultMaxExecutionTime" 60 CONFIG_BATCH_MAX_URLS =
"BatchDefaultMaxURLs" 61 CONFIG_BATCH_ORDER_BY_URLS =
"BatchDefaultOrderByURLs" 62 CONFIG_BATCH_MAX_TASKS =
"BatchDefaultMaxTasks" 63 CONFIG_BATCH_QUEUE_PERIOD =
"BatchQueueProcessingPeriod" 64 CONFIG_BATCH_QUEUE_TASK_TTL =
"BatchQueueTaskTTL" 65 CONFIG_BATCH_QUEUE_TASK_CHECK_METHOD =
"BatchQueueTaskCheckStateMethod" 66 CONFIG_BATCH_DEFAULT_STARTER =
"BatchTask_STARTER" 67 CONFIG_BATCH_WHERE_URLS =
"BatchDefaultWhereURLs" 68 CONFIG_BATCH_WHERE_SITES =
"BatchDefaultWhereSites" 69 CONFIG_BATCH_MAX_TIME =
"BatchMaxExecutionTime" 70 CONFIG_BATCH_REMOVE_UNPROCESSED_ITEMS =
"RemoveUnprocessedBatchItems" 73 CONFIG_BATCH_DEFAULT_STRATEGY_IO_WAIT_MAX =
"BatchTask_IO_WAIT_MAX" 74 CONFIG_BATCH_DEFAULT_STRATEGY_CPU_LOAD_MAX =
"BatchTask_CPU_LOAD_MAX" 75 CONFIG_BATCH_DEFAULT_STRATEGY_RAM_FREE_MIN =
"BatchTask_RAM_FREE_MIN" 76 CONFIG_BATCH_DEFAULT_STRATEGY_STRATEGY_RDELAY =
"BatchTask_RDELAY" 77 CONFIG_BATCH_DEFAULT_STRATEGY_RETRY =
"BatchTask_RETRY" 78 CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_TTL =
"BatchTask_autocleanup_TTL" 79 CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_TYPE =
"BatchTask_autocleanup_DeleteType" 80 CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_RETRIES =
"BatchTask_autocleanup_DeleteRetries" 81 CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_STATE =
"BatchTask_autocleanup_State" 84 CONFIG_TASK_DTM_NAME_PROCESS =
"BatchTaskDTMNameProcess" 86 CONFIG_TASK_DTM_TYPE_PROCESS =
"BatchTaskDTMTypeProcess" 95 def __init__(self, configParser, connectionBuilderLight=None):
96 super(BatchTasksManagerProcess, self).
__init__()
99 if connectionBuilderLight
is None:
124 className = self.__class__.__name__
150 serverConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, self.
serverName)
151 sitesManagerConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT, self.
clientSitesManagerName)
154 self.
dtmdConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.CLIENT_CONNECT,
157 TRANSPORT_CONSTS.TCP_TYPE)
230 logger.debug(
"Periodic iteration started.")
237 logger.debug(
"Processing batch cycle iteration started")
242 self.
updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_CANCELLED_NAME, 1,
244 logger.debug(
"Max processing batch tasks %s in queue reached, new batch is not created!",
247 logger.debug(
"Processing batch disabled!")
253 logger.debug(
"Process DTM tasks queue!")
259 except Exception
as err:
260 ExceptionLog.handler(logger, err,
"Exception:")
262 logger.debug(
"Periodic iteration finished.")
275 EventObjects.URL.STATUS_SELECTED_PROCESSING)
278 sCrit =
"IFNULL((SELECT `Value` FROM `sites_properties` WHERE `Name`='PROCESS_WHERE_SITES'), " + \
280 sitesCriterions = {EventObjects.URLFetch.CRITERION_WHERE: sCrit,
281 EventObjects.URLFetch.CRITERION_ORDER:
"Priority DESC, TcDateProcess ASC"}
283 urlCriterions = {EventObjects.URLFetch.CRITERION_WHERE: uCrit,
286 EventObjects.URLFetch.CRITERION_SQL: {
287 "PROCESS_WHERE_URLS":
"SELECT `Value` FROM `dc_sites`.`sites_properties` WHERE " + \
288 "`Name`='PROCESS_WHERE_URLS' AND `Site_Id`=\"%SITE_ID%\" LIMIT 1" 294 urlFetch.algorithm = EventObjects.URLFetch.PROPORTIONAL_ALGORITHM
298 logger.debug(
"DTM process batch was set, taskId=%s", str(taskId))
300 urlFetch.QueuedTs = time.time()
301 urlFetch.crawlerType = EventObjects.Batch.TYPE_PROCESS
307 logger.error(
"Error send process batch task to DTM!")
309 except Exception
as err:
310 ExceptionLog.handler(logger, err,
"Exception:")
326 newTaskObj.setSessionVar(
"tmode", dtm.EventObjects.Task.TASK_MODE_ASYNCH)
329 newTaskObj.setSessionVar(
"task_type", int(newTaskObj.type))
332 newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_IO_WAIT_MAX,
335 newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_CPU_LOAD_MAX,
338 newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_RAM_FREE,
341 newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_RDELAY,
344 newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_RETRY,
347 autoCleanupFields = {}
349 autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_TTL] = \
352 autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_DELETE_TYPE] = \
355 autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_DELETE_RETRIES] = \
358 autoCleanupFields[dtm.EventObjects.Task.STRATEGY_AUTOCLEANUP_SSTATE] = \
360 if len(autoCleanupFields) > 0:
361 newTaskObj.setStrategyVar(dtm.EventObjects.Task.STRATEGY_autoCleanupFields, autoCleanupFields)
362 batch.id = newTaskObj.id
363 drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_FETCH, [batch])
364 newTaskObj.input = pickle.dumps(drceSyncTasksCoverObj)
365 newTaskEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.NEW_TASK, newTaskObj)
367 if generalResponse
is not None:
368 if generalResponse.errorCode == dtm.EventObjects.GeneralResponse.ERROR_OK:
370 taskId = newTaskObj.id
373 logger.error(
"DTM set batch task error: " + str(generalResponse.errorCode) +
" : " + \
374 generalResponse.errorMessage +
", statuses:" +
varDump(generalResponse))
376 logger.error(
"DTM set batch task response error, possible timeout!")
379 taskId = newTaskObj.id
393 logger.debug(
"Check state of taskId=" + str(taskId))
395 checkStateEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE, checkTaskStateObj)
397 logger.debug(
"DTM CheckTaskState request finished, taskId=" + str(taskId))
399 taskState = eeResponseData.state
402 logger.debug(
"Get status of taskId=" + str(taskId))
404 getTasksStatusEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.GET_TASK_STATUS, getTasksStatusObj)
406 logger.debug(
"DTM getTasksStatus request finished, taskId=" + str(taskId))
407 if listTaskManagerFields
is not None and isinstance(listTaskManagerFields, list):
408 if len(listTaskManagerFields) > 0:
409 taskState = listTaskManagerFields[0].fields[
"state"]
412 taskState = dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED
414 logger.error(
"DTM getTasksStatus taskId=" + str(taskId) +
" returned wrong data:\n" + \
415 Utils.varDump(listTaskManagerFields))
427 logger.debug(
"Process batch tasks in queue:" + str(len(self.
dtmTasksQueue)))
435 if batchState !=
None:
436 logger.debug(
"Process batch state " + str(batchState) +
", Id=" + str(taskId))
437 if batchState == dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED
or\
438 batchState == dtm.EventObjects.EEResponseData.TASK_STATE_CRASHED
or\
439 batchState == dtm.EventObjects.EEResponseData.TASK_STATE_TERMINATED
or\
440 batchState == dtm.EventObjects.EEResponseData.TASK_STATE_UNDEFINED
or\
441 batchState == dtm.EventObjects.EEResponseData.TASK_STATE_SET_ERROR
or\
442 batchState == dtm.EventObjects.EEResponseData.TASK_STATE_SCHEDULE_TRIES_EXCEEDED:
445 deleteTaskEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.DELETE_TASK, deleteTaskObj)
447 logger.debug(
"DTM DeleteTask request finished, taskId=" + str(taskId))
448 if generalResponse
is not None:
449 if generalResponse.errorCode == dtm.EventObjects.GeneralResponse.ERROR_OK:
450 logger.debug(
"DTM task deleted, taskId=" + str(taskId))
453 logger.debug(
"batch:\n" +
varDump(taskBatch) +
"\n finished, taskId=" + str(taskId))
457 logger.debug(
"batch:" +
varDump(taskBatch) +
" not finished, state= " + str(batchState))
462 tmpQueue[taskId] = taskBatch
463 self.
updateStatField(DC_CONSTS.BATCHES_PROCESS_COUNTER_DELETE_FAULT_NAME, 1,
465 logger.error(
"DTM delete task taskId=" + str(taskId) +
" error: " + str(generalResponse.errorCode) + \
466 " : " + generalResponse.errorMessage +
", statuses:" +
varDump(generalResponse))
469 tmpQueue[taskId] = taskBatch
471 logger.error(
"DTM delete task error: wrong response or timeout, taskId=" + str(taskId) +
"!")
473 logger.debug(
"DTM task Id=" + str(taskId) +
" state=" + str(batchState))
474 if time.time() - taskBatch.QueuedTs > ttl:
477 deleteTaskObj.action = dtm.EventObjects.DeleteTask.ACTION_TERMINATE_TASK_AND_DELETE_DATA
478 deleteTaskEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.DELETE_TASK, deleteTaskObj)
480 logger.error(
"DTM task Id=" + str(taskId) +
" terminated and removed from queue by TTL:" + str(ttl))
487 tmpQueue[taskId] = taskBatch
488 logger.debug(
"DTM task Id=" + str(taskId) +
" still in queue")
490 logger.error(
"DTM check task state error: wrong response or timeout, taskId=" + str(taskId) +
"!")
491 if time.time() - taskBatch.QueuedTs > ttl:
492 logger.error(
"DTM task Id=" + str(taskId) +
" removed from queue by TTL:" + str(ttl))
495 tmpQueue[taskId] = taskBatch
497 logger.error(
"DTM task Id=" + str(taskId) +
" saved in queue.")
502 logger.debug(
"The DTM tasks queue processing finished, batch tasks in queue " + str(len(self.
dtmTasksQueue)))
511 if state == dtm.EventObjects.EEResponseData.TASK_STATE_FINISHED
or\
512 state == dtm.EventObjects.EEResponseData.TASK_STATE_CRASHED
or\
513 state == dtm.EventObjects.EEResponseData.TASK_STATE_TERMINATED
or\
514 state == dtm.EventObjects.EEResponseData.TASK_STATE_UNDEFINED
or\
515 state == dtm.EventObjects.EEResponseData.TASK_STATE_SET_ERROR
or\
516 state == dtm.EventObjects.EEResponseData.TASK_STATE_TERMINATED_BY_DRCE_TTL
or\
517 state == dtm.EventObjects.EEResponseData.TASK_STATE_SCHEDULE_TRIES_EXCEEDED:
554 for i
in range(maxTries + 1):
557 logger.error(
"DTMD request timeout reached " + str(timeout) +
"!")
566 isinstance(retEvent.eventObj, list):
567 if retEvent.uid == requestEvent.uid:
568 ret = retEvent.eventObj
571 logger.error(
"DTMD returned wrong object uid: " + str(retEvent.uid) +
" but " + \
572 str(requestEvent.uid) +
" expected, iteration " + str(i) +
"!")
574 logger.error(
"DTMD returned wrong object type: " + str(
type(retEvent.eventObj)) +
"!")
576 logger.error(
"DTMD returned None event!")
578 logger.error(
"DTMD request execution exception: " + e.message +
"!")
580 logger.debug(
"The DTMD request finished!")
595 for batchItem
in batchItemsList:
597 if batchState
is True:
598 status = EventObjects.URL.STATUS_CRAWLED
599 sqlExpression =
SQLExpression(
"`URLMd5`='" + str(batchItem.urlId) +
"' AND `Batch_Id` <>" + str(batchId) + \
600 " AND `Status` NOT IN (" + str(EventObjects.URL.STATUS_CRAWLED) + \
601 "," + str(EventObjects.URL.STATUS_SELECTED_PROCESSING) +
")")
603 status = EventObjects.URL.STATUS_NEW
604 sqlExpression =
SQLExpression(
"`URLMd5`='" + str(batchItem.urlId) +
"' AND `Status` IN (" +
605 str(EventObjects.URL.STATUS_SELECTED_CRAWLING) +
"," + \
606 str(EventObjects.URL.STATUS_SELECTED_CRAWLING_INCREMENTAL) +
")")
608 urlUpdate =
EventObjects.URLUpdate(batchItem.siteId, batchItem.urlId, EventObjects.URLStatus.URL_TYPE_MD5,
610 urlUpdate.criterions[EventObjects.URLFetch.CRITERION_WHERE] = sqlExpression
611 logger.debug(
"URLUpdate: " +
varDump(urlUpdate))
612 urlsList.append(urlUpdate)
615 urlUpdateEvent = self.
eventBuilder.build(DC_CONSTS.EVENT_TYPES.URL_UPDATE, urlsList)
618 logger.debug(
"The URLUpdate request to SitesManager sent!")
630 for batchItem
in batchItemsList:
631 sqlExpression =
SQLExpression(
"`ParentMd5`<>'' AND `URLMd5`='" + str(batchItem.urlId) +
"' AND `Batch_Id`<>" + \
634 {EventObjects.URLFetch.CRITERION_WHERE:sqlExpression,
635 EventObjects.URLFetch.CRITERION_LIMIT:1},
636 reason=EventObjects.URLDelete.REASON_SELECT_TO_PROCESS_TTL)
637 logger.debug(
"URLDelete: " +
varDump(urlDelete))
638 urlsList.append(urlDelete)
641 urlDeleteEvent = self.
eventBuilder.build(DC_CONSTS.EVENT_TYPES.URL_DELETE, urlsList)
644 logger.debug(
"The URLDelete request to SitesManager sent!")
653 logger.debug(
"Reply received on URL update.")
654 clientResponse = event.eventObj
655 if clientResponse.errorCode == EventObjects.ClientResponse.STATUS_OK:
656 if len(clientResponse.itemsList) > 0:
657 for clientResponseItem
in clientResponse.itemsList:
658 if clientResponseItem.errorCode != EventObjects.ClientResponseItem.STATUS_OK:
659 logger.error(
"URLUpdate response error: " + str(clientResponseItem.errorCode) +
" : " + \
660 clientResponseItem.errorMessage +
", host:" + clientResponseItem.host +
", port:" + \
661 clientResponseItem.port +
", node:" + clientResponseItem.node +
"!")
663 logger.error(
"URLUpdate response empty list!")
665 logger.error(
"URLUpdate response error:" + str(clientResponse.errorCode) +
" : " + clientResponse.errorMessage)
666 except Exception
as err:
667 ExceptionLog.handler(logger, err,
"Exception:")
675 logger.debug(
"Reply received on URL delete.")
676 clientResponse = event.eventObj
677 if clientResponse.errorCode == EventObjects.ClientResponse.STATUS_OK:
678 if len(clientResponse.itemsList) > 0:
679 for clientResponseItem
in clientResponse.itemsList:
680 if clientResponseItem.errorCode != EventObjects.ClientResponseItem.STATUS_OK:
681 logger.error(
"URLDelete response error: " + str(clientResponseItem.errorCode) +
" : " + \
682 clientResponseItem.errorMessage +
", host:" + clientResponseItem.host +
", port:" + \
683 clientResponseItem.port +
", node:" + clientResponseItem.node +
"!")
685 logger.error(
"URLDelete response empty list!")
687 logger.error(
"URLDelete response error:" + str(clientResponse.errorCode) +
" : " + clientResponse.errorMessage)
688 except Exception
as err:
689 ExceptionLog.handler(logger, err,
"Exception:")
string CONFIG_POLLING_TIMEOUT
def __init__(self, configParser, connectionBuilderLight=None)
def sendBatchTaskToDTM(self, batch)
def processFinishedBatch(self, taskBatch)
def processDTMTasksQueue(self)
string CONFIG_PROCESS_PERIOD
string CONFIG_BATCH_MAX_TIME
def isDTMTaskDead(self, state)
def updateStatField(self, field_name, value, operation=STAT_FIELDS_OPERATION_ADD)
update values of stat field - default sum
def onURLDeleteResponse(self, event)
string CONFIG_DRCE_PROCESSOR_APP_NAME
string CONFIG_SITES_MANAGER_CLIENT
string CONFIG_BATCH_DEFAULT_STRATEGY_IO_WAIT_MAX
NewTask event object, defines the Task object fields.
GeneralResponse event object, represents general state response for multipurpose usage.
string POLL_TIMEOUT_CONFIG_VAR_NAME
string CONFIG_BATCH_WHERE_URLS
def setEventHandler(self, eventType, handler)
set event handler rewrite the current handler for eventType
def addConnection(self, name, connection)
def sendURLUpdate(self, batchItemsList, batchId, batchState)
string CONFIG_BATCH_QUEUE_TASK_TTL
DeleteTask event object, to delete task from DTM application and from EE.
string CONFIG_DTMD_TIMEOUT
This is app base class for management server connection end-points and parallel transport messages pr...
def setProcessBatch(self)
int STAT_FIELDS_OPERATION_SET
def getDTMTaskState(self, taskId)
def poll(self)
poll function polling connections receive as multipart msg, the second argument is pickled pyobj ...
CheckTaskState event object, for check task status inside EE.
int STAT_FIELDS_OPERATION_ADD
string CONFIG_TASK_DTM_NAME_PROCESS
int DTM_TASK_CHECK_STATE_METHOD_STATUS
string CONFIG_BATCH_DEFAULT_STRATEGY_CPU_LOAD_MAX
string CONFIG_BATCH_DEFAULT_STRATEGY_STRATEGY_RDELAY
Class hides routines of bulding connection objects.
string CONFIG_BATCH_DEFAULT_STRATEGY_RETRY
def send(self, connect_name, event)
send event
string CONFIG_BATCH_QUEUE_PERIOD
string CONFIG_BATCH_MAX_TASKS
string CONFIG_BATCH_MAX_URLS
string CONFIG_BATCH_DEFAULT_STRATEGY_RAM_FREE_MIN
string CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_TYPE
def dtmdRequestExecute(self, requestEvent, timeout, maxTries=100)
string CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_DELETE_RETRIES
GetTasksStatus event object, for check task status operation.
string CONFIG_BATCH_QUEUE_TASK_CHECK_METHOD
string CONFIG_DRCE_DB_APP_NAME
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
def onURLUpdateResponse(self, event)
string CONFIG_PROCESS_MODE
def sendURLDelete(self, batchItemsList, batchId)
string CONFIG_BATCH_ORDER_BY_URLS
def on_poll_timeout(self)
EEResponseData event object, store task results data, returned from EE.
string CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_TTL
string CONFIG_BATCH_DEFAULT_MAX_TIME
string CONFIG_BATCH_DEFAULT_STRATEGY_AUTOCLEANUP_STATE
string CONFIG_BATCH_WHERE_SITES
string CONFIG_BATCH_DEFAULT_STARTER
string CONFIG_TASK_DTM_TYPE_PROCESS
int STAT_FIELDS_OPERATION_INIT
string CONFIG_BATCH_REMOVE_UNPROCESSED_ITEMS