2 HCE project, Python bindings, Distributed Tasks Manager application. 3 RTCFinalizer Class content main functional for finalize realtime crawling. 7 @author Oleksii <developers.hce@gmail.com>, bgv, Alexander Vybornyh <alexander.hce.cluster@gmail.com> 8 @link: http://hierarchical-cluster-engine.com/ 9 @copyright: Copyright © 2013-2015 IOIX Ukraine 10 @license: http://hierarchical-cluster-engine.com/license/ 18 from cement.core
import foundation
36 MSG_ERROR_PARSE_CMD_PARAMS =
"Error parse command line parameters." 37 MSG_ERROR_EMPTY_CONFIG_FILE_NAME =
"Config file name is empty." 38 MSG_ERROR_WRONG_CONFIG_FILE_NAME =
"Config file name is wrong" 39 MSG_ERROR_LOAD_APP_CONFIG =
"Error loading application config file." 40 MSG_ERROR_READ_LOG_CONFIG =
"Error read log config file." 41 MSG_ERROR_READ_DB_TASK_CONFIG =
"Error read db-task config file." 44 MSG_ERROR_DELETE_URL =
"Delete url has failed" 45 MSG_DELETE_URL_OK =
"URL was deleted" 48 FINALIZER_OPTION_LOG =
"log" 49 FINALIZER_OPTION_DB_TASK_INI =
"db_task_ini" 53 label = DC_CRAWLER_CONSTS.RTC_FINALIZER_APP_NAME
60 foundation.CementApp.__init__(self)
75 foundation.CementApp.setup(self)
81 foundation.CementApp.run(self)
90 self.
logger.info(APP_CONSTS.LOGGER_DELIMITER_LINE)
99 confLogFileName, confDBTaskName = self.
__loadAppConfig(self.pargs.config)
104 self.
rb = self.pargs.rb
105 self.
rc = int(self.pargs.rc)
if self.pargs.rc
is not None else None 118 config = ConfigParser.ConfigParser()
119 config.optionxform = str
121 readOk = config.read(configName)
126 if config.has_section(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME):
127 confLogFileName = str(config.get(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME, self.
FINALIZER_OPTION_LOG))
128 confDBTaskName = str(config.get(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME, \
131 except Exception, err:
134 return confLogFileName, confDBTaskName
143 if isinstance(configName, str)
and len(configName) == 0:
146 logging.config.fileConfig(configName)
151 except Exception, err:
162 if isinstance(configName, str)
and len(configName) > 0:
164 config = ConfigParser.ConfigParser()
165 config.optionxform = str
166 config.read(configName)
168 except Exception, err:
177 input_pickled_object = sys.stdin.read()
181 input_data = (pickle.loads(input_pickled_object))
185 self.
batch = input_data
188 except Exception, err:
189 raise Exception(
'getBatchFromInput error: ' + str(err))
193 urlContentRequest = []
194 num_of_items = len(self.
items)
195 self.
logger.debug(
"Num of items in batch: <<%s>>" % (num_of_items))
197 for item
in self.
items:
199 urlContentRequest.append(
None)
200 self.
logger.debug(
"Item is None.")
203 url = item.urlObj.url
205 self.
logger.debug(
"Item #%s: siteId: <<%s>>, urlId: <<%s>>, url: <<%s>>" % (item_no, siteId, urlId, url))
206 _urlContentRequest = dc_event.URLContentRequest(siteId, url)
207 _urlContentRequest.dbFieldsList = [
"Status",
"Crawled",
"Processed",
"ContentType",
"Charset",
"ErrorMask", \
208 "CrawlingTime",
"ProcessingTime",
"HTTPCode",
"Size",
"LinksI",
"LinksE", \
209 "RawContentMd5",
"LastModified",
"CDate",
"UDate",
"TagsMask",
"TagsCount", \
210 "PDate",
"ContentURLMd5",
"Batch_Id"]
211 urlContentRequest.append(_urlContentRequest)
212 item_no = item_no + 1
214 drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_CONTENT, urlContentRequest)
215 responseDRCESyncTasksCover = self.
dbTask.
process(drceSyncTasksCoverObj)
226 if self.
rb is not None:
227 self.
logger.debug(
"batchSaveFile is = " + str(self.
rb))
230 for item
in self.
items:
231 if item.siteObj
is not None and item.siteObj.fetchType == BaseFetcher.TYP_AUTO:
232 if item.urlPutObj
is not None and contentCheck.lookMetricsinContent(item.urlPutObj):
233 self.
logger.debug(
">>> start checkUrlPutObj")
235 toRecrawl = contentCheck.checkUrlPutObj(item.urlPutObj, contentCheck.CHECK_TYPE_SIMPLE, metricsApplying)
237 self.
logger.debug(
">>> start urlObj")
238 toRecrawl = contentCheck.checkUrlObj(item.urlObj)
240 urlCleanup = dc_event.URLCleanup(item.urlObj.siteId, item.urlObj.url)
241 urlCleanup.urlType = dc_event.URLStatus.URL_TYPE_MD5
242 urlCleanup.url = item.urlObj.urlMd5
243 urlCleanupList.append(urlCleanup)
244 item.siteObj.fetchType = BaseFetcher.TYP_DYNAMIC
245 item.urlObj.status = dc_event.URL.STATUS_SELECTED_CRAWLING
246 item.urlObj.crawled = 0
247 item.urlObj.urlPut =
None 248 item.urlPutObj =
None 249 if self.
rc is not None:
251 if len(urlCleanupList) > 0:
252 drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_CLEANUP, urlCleanupList)
254 fd = open(self.
rb,
"w")
256 pickleObj = pickle.dumps(self.
batch)
264 for item
in self.
items:
265 url = item.urlObj.url
266 if item.urlPutObj
is not None:
269 if len(item.urlObj.attributes) > 0:
270 self.
logger.debug(
"item.urlPutObj.attributes: %s",
varDump(item.urlObj.attributes))
271 attributes = item.urlObj.attributes
272 except Exception, err:
273 self.
logger.
error(
"load attributes failed: %s", str(err))
275 if item.urlPutObj.putDict[
"cDate"]
is not None:
276 contents = [dc_event.Content(item.urlPutObj.putDict[
"data"], item.urlPutObj.putDict[
"cDate"],
277 dc_event.Content.CONTENT_PROCESSOR_CONTENT)]
279 contents = [dc_event.Content(item.urlPutObj.putDict[
"data"],
280 typeId=dc_event.Content.CONTENT_PROCESSOR_CONTENT)]
285 if item.urlObj.urlPut
is not None and isFetchRawContent
is not None and int(isFetchRawContent) == 1:
286 rawContents = [dc_event.Content(item.urlObj.urlPut.putDict[
"data"], item.urlObj.urlPut.putDict[
"cDate"],
287 dc_event.Content.CONTENT_RAW_CONTENT)]
288 urlContentResponse = dc_event.URLContentResponse(url, rawContents, processedContents=contents)
289 urlContentResponse.status = 7
290 urlContentResponse.urlMd5 = item.urlObj.urlMd5
291 urlContentResponse.siteId = item.siteId
292 urlContentResponse.contentURLMd5 = item.urlObj.contentURLMd5
293 urlContentResponse.rawContentMd5 = item.urlObj.rawContentMd5
294 urlContentResponse.attributes = attributes
295 urlContentResponse.dbFields = {
"Status":item.urlObj.status,
296 "Crawled":item.urlObj.crawled,
297 "Processed":item.urlObj.processed,
298 "ContentType":item.urlObj.contentType,
299 "Charset":item.urlObj.charset,
300 "ErrorMask":item.urlObj.errorMask,
301 "CrawlingTime":item.urlObj.crawlingTime,
302 "ProcessingTime":item.urlObj.processingTime,
303 "HttpCode":item.urlObj.httpCode,
304 "Size":item.urlObj.size,
305 "LinksI":item.urlObj.linksI,
306 "LinksE":item.urlObj.linksE,
307 "RawContentMd5":item.urlObj.rawContentMd5,
308 "LastModified":item.urlObj.lastModified,
309 "CDate":item.urlObj.CDate,
310 "UDate":item.urlObj.UDate,
311 "TagsMask":item.urlObj.tagsMask,
312 "TagsCount":item.urlObj.tagsCount,
313 "PDate":item.urlObj.pDate,
314 "ContentURLMd5":item.urlObj.contentURLMd5,
315 "BatchId":item.urlObj.batchId}
317 if item.urlPutObj
is not None and "properties" in item.urlPutObj.putDict:
318 urlContentResponse.itemProperties = item.urlPutObj.putDict[
"properties"]
327 if self.
batch.crawlerType == dc_event.Batch.TYPE_REAL_TIME_CRAWLER:
328 self.
logger.debug(
"Real-Time crawling batch")
332 self.
logger.debug(
"Regular crawling batch")
340 items = self.
batch.items
341 urlDeleteRequest = []
342 num_of_items = len(items)
343 self.
logger.debug(
"Num of items to delete in batch: <<%s>>" % (num_of_items))
345 for item
in self.
items:
349 url = item.urlObj.url
351 self.
logger.debug(
"Delete item #%s: siteId: <<%s>>, urlId: <<%s>>, url: <<%s>>" % (item_no, siteId, urlId, url))
352 urlDeleteRequest.append(dc_event.URLDelete(siteId, url, reason=dc_event.URLDelete.REASON_RT_FINALIZER))
353 item_no = item_no + 1
355 drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_DELETE, urlDeleteRequest)
356 responseDRCESyncTasksCover = self.
dbTask.
process(drceSyncTasksCoverObj)
357 urlDeleteResponse = responseDRCESyncTasksCover.eventObject
358 self.
logger.debug(
"urlDeleteResponse: %s",
varDump(urlDeleteResponse))
359 for status
in urlDeleteResponse.statuses:
363 self.
exitCode = APP_CONSTS.EXIT_FAILURE
374 if batchItem.properties
is not None and propName
in batchItem.properties:
375 ret = batchItem.properties[propName]
376 elif batchItem.siteObj
is not None and batchItem.siteObj.properties
is not None:
377 for elem
in batchItem.siteObj.properties:
378 if elem[
"name"] == propName:
string MSG_ERROR_WRONG_CONFIG_FILE_NAME
def saveBatchToFile(self)
string MSG_ERROR_EMPTY_CONFIG_FILE_NAME
string FINALIZER_OPTION_LOG
def __loadLogConfig(self, configName)
def __loadDBTaskConfig(self, configName)
string MSG_ERROR_DELETE_URL
def getBatchFromInput(self)
string MSG_ERROR_READ_LOG_CONFIG
def __loadAppConfig(self, configName)
def selectSiteProperty(self, batchItem, propName)
string FINALIZER_OPTION_DB_TASK_INI
string MSG_ERROR_READ_DB_TASK_CONFIG
string MSG_ERROR_LOAD_APP_CONFIG
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
def getURLContentFromBatch(self)
def deleteURLContent(self)