"""
HCE project, Python bindings, Distributed Tasks Manager application.
Event objects definitions.

@author Oleksii, bgv <developers.hce@gmail.com>, Alexander Vybornyh <alexander.hce.cluster@gmail.com>
@link: http://hierarchical-cluster-engine.com/
@copyright: Copyright © 2013-2014 IOIX Ukraine
@license: http://hierarchical-cluster-engine.com/license/
"""

import cPickle as pickle
from subprocess import Popen
from subprocess import PIPE
from collections import namedtuple
from cement.core import foundation
APP_NAME = "processor-task"

DC_URLS_DB_NAME = "dc_urls"
DC_URLS_TABLE_PREFIX = "urls_"
DC_SITES_DB_NAME = "dc_sites"
DC_SITES_TABLE_NAME = "sites"
DC_URLS_TABLE_NAME = "urls"
DC_SITES_PROPERTIES_TABLE_NAME = "sites_properties"

MSG_ERROR_PROCESS_BATCH_ITEM = "Error process batch item "
MSG_ERROR_PROCESS_BATCH = "Error process batch. "
MSG_ERROR_LOAD_CONFIG = "Error loading config file."
MSG_ERROR_EMPTY_CONFIG_FILE_NAME = "Config file name is empty."
MSG_ERROR_LOAD_LOG_CONFIG_FILE = "Error loading logging config file. Exiting."
MSG_ERROR_LOAD_URL_DATA = "Can't load url data: "
MSG_ERROR_LOAD_SITE_DATA = "Error load site data: "
MSG_ERROR_READ_SITE_FROM_DB = "Error read site data from db"
MSG_ERROR_PROCESS_TASK = "Can't process task "
MSG_ERROR_SERIALISE_RESULT = "Error serialize result "
MSG_ERROR_GET_SITE_FILE_DB = "Error get site file db "
MSG_ERROR_UPDATE_RECORD = "Error update record "
MSG_ERROR_UPDATE_PROCESSED_URL = "Error update processed url "
MSG_ERROR_UPDATE_URL_CHARSET = "Error update url charset "
MSG_ERROR_GET_RAW_CONTENT_FROM_DB = "Error get raw content from disk "
MSG_ERROR_PROCESS = "Error process "
MSG_ERROR_LOAD_SITE_PROPERTIES = "Error load site properties "
MSG_ERROR_CHECK_SITE = "Site check is not passed. "
MSG_ERROR_LOAD_OPTIONS = "Error load options. "
MSG_ERROR_CONVERT_RAW_CONTENT_CHARSET = "Cannot convert raw content charset. "
MSG_ERROR_UPDATE_SITE_RESOURCES = "Error update site resources. "
MSG_ERROR_EMPTY_BATCH = "Error read input pickle from stdin."
MSG_ERROR_CHECK_CONTENT_HASH = "Fail to check content hash"
MSG_ERROR_CALC_CONTENT_HASH = "Fail to calc content hash"
MSG_ERROR_CHECK_CONTENT_HASH_DUPLICATE = "Can't check content hash duplicate"

MSG_INFO_PROCESSOR_CMD = "Processor cmd: "
MSG_INFO_LOAD_SITE_PROPERTIES = "Mismatch load site properties "
MSG_INFO_PROCESS_BATCH = "Skipped process batch. "
MSG_INFO_PROCESS_BATCH_ITEM = "Skipped process batch item "
MSG_INFO_PROCESSOR_EXIT_CODE = "Scraper exit_code: "
MSG_INFO_PROCESSOR_OUTPUT = "Scraper output: "
MSG_INFO_PROCESSOR_ERROR = "Scraper err: "

ERROR_MASK_NO_ERRORS = 0
ERROR_MASK_SITE_OK = 0

URLS_OF_MEDIA_CONTENT = 1
ENV_PROCESSOR_STORE_PATH = "ENV_PROCESSOR_STORE_PATH"
SCRAPER_RESPONSE_ATTR_NAME = 'scraperResponse'
DEFSULT_CHAIN_DELIMITER = ' '

Results = namedtuple("Results", "exit_code, output, err, scraperResponse")
foundation.CementApp.__init__(self)

foundation.CementApp.setup(self)

foundation.CementApp.run(self)

self.logger.info(APP_CONSTS.LOGGER_DELIMITER_LINE)

return url + '#' + str(counter)
if "site" in batchItemDict:
  for prop in batchItemDict["site"].properties:
    if prop["name"] == propName:

batchItemDict["template"] = ""

keys = [localProperty["name"] for localProperty in site.properties]

for key in batchItem.properties.keys():
  for localProperty in site.properties:
    if localProperty["name"] == key:
      self.logger.debug("%s exist in site properties. Rewrite property", key)
      localProperty["value"] = batchItem.properties[key]

  self.logger.debug("%s don't exist in site properties. Add property", key)
  site.properties.append({"name": key, "value": batchItem.properties[key], "URLMd5": batchItem.urlId})
for localProperty in site.properties:
  if localProperty["name"] == "PROCESS_CTYPES":
    batchItemDict["processCTypes"] = localProperty["value"]
    self.logger.debug("PROCESS_CTYPES: " + str(batchItemDict["processCTypes"]))
  elif localProperty["name"] == "CONTENT_TYPE_MAP":
    batchItemDict["contentTypeMap"] = localProperty["value"]
    self.logger.debug("CONTENT_TYPE_MAP: " + str(batchItemDict["contentTypeMap"]))
  elif localProperty["name"] == "TIMEZONE":
    batchItemDict["timezone"] = localProperty["value"]
    self.logger.debug("TIMEZONE: " + str(batchItemDict["timezone"]))
  elif localProperty["name"] == "TEXT_STATS":
    batchItemDict["textStatus"] = int(localProperty["value"])
    self.logger.debug("TEXT_STATS: " + str(batchItemDict["textStatus"]))
  elif localProperty["name"] == "TEMPLATE_SOURCE":
    batchItemDict["TEMPLATE_SOURCE"] = localProperty["value"]
    self.logger.debug("TEMPLATE_SOURCE: " + str(batchItemDict["TEMPLATE_SOURCE"]))
  elif localProperty["name"] == "PROCESSOR_PROPERTIES":
    batchItemDict["processorProperties"] = localProperty["value"]
    self.logger.debug("PROCESSOR_PROPERTIES: " + str(batchItemDict["processorProperties"]))
  elif localProperty["name"] == "CONTENT_HASH":
    batchItemDict["contentHash"] = localProperty["value"]
    self.logger.debug("CONTENT_HASH: " + str(batchItemDict["contentHash"]))
  elif localProperty["name"] == "HTML_RECOVER":
    batchItemDict["htmlRecover"] = localProperty["value"]
    self.logger.debug("HTML_RECOVER: " + str(batchItemDict["htmlRecover"]))
  elif localProperty["name"] == "URL_NORMALIZE_MASK_PROCESSOR":
    batchItemDict["urlNormalizeMaskProcessor"] = localProperty["value"]
    self.logger.debug("URL_NORMALIZE_MASK_PROCESSOR: " + str(batchItemDict["urlNormalizeMaskProcessor"]))
  elif localProperty["name"] == "template":
    batchItemDict["template"] = localProperty["value"]
    self.logger.debug("Template: " + varDump(batchItemDict["template"]))
  elif localProperty["name"] == "PROCESSOR_NAME_REPLACE":
    batchItemDict["PROCESSOR_NAME_REPLACE"] = localProperty["value"]
    self.logger.debug("PROCESSOR_NAME_REPLACE: " + str(batchItemDict["PROCESSOR_NAME_REPLACE"]))
  elif localProperty["name"] == "PROCESSOR_NAME":
    batchItemDict["processorName"] = localProperty["value"]
    self.logger.debug("PROCESSOR_NAME: " + str(batchItemDict["processorName"]))
  elif localProperty["name"] == "HTTP_REDIRECT_LINK":
    batchItemDict["HTTP_REDIRECT_LINK"] = localProperty["value"]
    self.logger.debug("HTTP_REDIRECT_LINK: " + str(batchItemDict["HTTP_REDIRECT_LINK"]))
if "processorProperties" not in batchItemDict:
  batchItemDict["processorProperties"] = None

if "processCTypes" in batchItemDict:
  self.logger.debug("PROCESS_CTYPES: " + str(batchItemDict["processCTypes"]))

if "timezone" in batchItemDict:
  self.logger.debug("TIMEZONE: " + str(batchItemDict["timezone"]))
else:
  batchItemDict["timezone"] = None

if "textStatus" in batchItemDict:
  self.logger.debug("TEXT_STATS: " + str(batchItemDict["textStatus"]))

if "processorName" in batchItemDict:
  self.logger.debug("PROCESSOR_NAME: " + str(batchItemDict["processorName"]))
else:
  batchItemDict["processorName"] = None

if "contentHash" in batchItemDict:
  self.logger.debug("CONTENT_HASH: " + str(batchItemDict["contentHash"]))
else:
  batchItemDict["contentHash"] = None

batchItemDict["template"] = batchItem.properties["template"]
self.logger.debug(">>> Reproduce template for accumulate batchItem")
if "template" in batchItemDict:
  batchItemDict["template"] = copy.deepcopy(batchItemDict["template"])

except ProcessorException as err:
  ExceptionLog.handler(self.logger, err, MSG_INFO_LOAD_SITE_PROPERTIES)
except Exception as err:
  ExceptionLog.handler(self.logger, err, MSG_INFO_LOAD_SITE_PROPERTIES)
cmd = CONSTS.PYTHON_BINARY + " " + CONSTS.SCRAPER_BINARY + " " + CONSTS.SCRAPER_CFG
if processorName is None or processorName == CONSTS.PROCESSOR_EMPTY:
  cmd = CONSTS.PYTHON_BINARY + " " + CONSTS.SCRAPER_BINARY + " " + CONSTS.SCRAPER_CFG
elif processorName == CONSTS.PROCESSOR_STORE:
  cmd = CONSTS.PYTHON_BINARY + " " + CONSTS.STORE_PROCESSOR_BINARY + " " + CONSTS.STORE_PROCESSOR_CFG
elif processorName == CONSTS.PROCESSOR_FEED_PARSER:
  cmd = CONSTS.PYTHON_BINARY + " " + CONSTS.PROCESSOR_FEED_PARSER_BINARY + " " + CONSTS.PROCESSOR_FEED_PARSER_CFG
elif processorName == CONSTS.PROCESSOR_SCRAPER_MULTI_ITEMS:
  cmd = CONSTS.PYTHON_BINARY + " " + CONSTS.SCRAPER_MULTI_ITEMS_BINARY + " " + CONSTS.SCRAPER_MULTI_ITEMS_CFG
elif processorName == CONSTS.PROCESSOR_SCRAPER_CUSTOM:
  cmd = CONSTS.PYTHON_BINARY + " " + CONSTS.SCRAPER_CUSTOM_BINARY + " " + CONSTS.SCRAPER_CUSTOM_CFG
self.logger.debug(MSG_INFO_PROCESSOR_CMD + cmd)

if processorName == CONSTS.PROCESSOR_STORE:
  cmd["AppClass"] = CONSTS.STORE_APP_CLASS_NAME
  cmd["AppConfig"] = CONSTS.STORE_APP_CLASS_CFG
elif processorName == CONSTS.PROCESSOR_FEED_PARSER:
  cmd["AppClass"] = CONSTS.PROCESSOR_FEED_PARSER_CLASS_NAME
  cmd["AppConfig"] = CONSTS.PROCESSOR_FEED_PARSER_CLASS_CFG
elif processorName == CONSTS.PROCESSOR_SCRAPER_MULTI_ITEMS:
  cmd["AppClass"] = CONSTS.SCRAPER_MULTI_ITEMS_APP_CLASS_NAME
  cmd["AppConfig"] = CONSTS.SCRAPER_MULTI_ITEMS_APP_CLASS_CFG
elif processorName == CONSTS.PROCESSOR_SCRAPER_CUSTOM:
  cmd["AppClass"] = CONSTS.SCRAPER_CUSTOM_JSON_APP_CLASS_NAME
  cmd["AppConfig"] = CONSTS.SCRAPER_CUSTOM_JSON_APP_CLASS_CFG
else:
  cmd["AppClass"] = CONSTS.SCRAPER_APP_CLASS_NAME
  cmd["AppConfig"] = CONSTS.SCRAPER_APP_CLASS_CFG
if self.input_batch.crawlerType == dc_event.Batch.TYPE_REAL_TIME_CRAWLER and "response" in template:
  ret = encode(localResponse)

if scraperResponse is not None and scraperResponse.processedContent is not None:
  if "data" in scraperResponse.processedContent["default"].data and \
     "tagList" in scraperResponse.processedContent["default"].data["data"] and \
     len(scraperResponse.processedContent["default"].data["data"]["tagList"]) > 0:
    tagList = scraperResponse.processedContent["default"].data["data"]["tagList"][0]
    for index, tag in enumerate(tagList):
      if "name" in tag and tag["name"] == "content_encoded" and "data" in tag and \
         "output_format" in template and "name" in template["output_format"] and \
         template["output_format"]["name"] == 'json':
        for elem in tag["data"]:
          tagList[index]["data"] = str(elem)

  scraperResponse.processedContent["default"].get()
  resultDict["default"] = scraperResponse.processedContent["default"].data
  resultDict["internal"] = []
  resultDict["custom"] = []
  for content in scraperResponse.processedContent["internal"]:
    buf.append(content.data)
  resultDict["internal"] = buf
  resultDict["custom"] = scraperResponse.processedContent["custom"]
  ret = encode(json.dumps(resultDict, ensure_ascii=False))
self.logger.debug("!!! siteObj.maxURLs = " + str(siteObj.maxURLs))
self.logger.debug("!!! siteObj.maxResources = " + str(siteObj.maxResources))
self.logger.debug("!!! siteObj.maxErrors = " + str(siteObj.maxErrors))
self.logger.debug("!!! siteObj.maxResourceSize = " + str(siteObj.maxResourceSize))

if siteObj.maxURLs > 0 and siteObj.maxURLs < len(accumulatedBatchItems):
  accumulatedBatchItems[-1].urlObj.errorMask |= APP_CONSTS.ERROR_MAX_ITEMS
  self.logger.debug("Max URLs is reached! Urls count= %s. Site maxURLs: %s ", len(accumulatedBatchItems),
                    siteObj.maxURLs)

if siteObj.maxResources > 0 and siteObj.maxResources < len(accumulatedBatchItems):
  accumulatedBatchItems[-1].urlObj.errorMask |= APP_CONSTS.ERROR_MASK_SITE_MAX_RESOURCES_SIZE
  self.logger.debug("Max resources size is reached! resources count= %s. Site maxResources: %s ",
                    len(accumulatedBatchItems), siteObj.maxResources)

for batchItem in accumulatedBatchItems:
  self.logger.debug("!!! batchItem.urlObj.errorMask = " + str(batchItem.urlObj.errorMask))
  if batchItem.urlObj.errorMask != APP_CONSTS.ERROR_OK:

  if 'data' in batchItem.urlObj.urlPut.putDict:
    resourcesSize = len(batchItem.urlObj.urlPut.putDict['data'])
    self.logger.debug("Resource size = " + str(resourcesSize))

  if siteObj.maxResourceSize > 0 and siteObj.maxResourceSize <= resourcesSize:
    accumulatedBatchItems[-1].urlObj.errorMask |= APP_CONSTS.ERROR_RESPONSE_SIZE_ERROR
    self.logger.debug("Max resource size is reached! resource size = %s. Site maxResourceSize: %s ",
                      resourcesSize, siteObj.maxResourceSize)

if siteObj.maxErrors > 0 and siteObj.maxErrors <= errors:
  accumulatedBatchItems[-1].urlObj.errorMask |= APP_CONSTS.ERROR_SITE_MAX_ERRORS
  self.logger.debug("Max errors is reached! Errors count= %s. Site maxErrors: %s ",
                    errors, siteObj.maxErrors)
scraperResponse = None
accumulatedBatchItems = []

if isinstance(scraperOutputData, list):
  if len(scraperOutputData) > 0:
    scraperResponse = scraperOutputData[0]

  if len(scraperOutputData) > 1:
    for scraperOutputItem in scraperOutputData[1:]:
      localBatchItem = copy.deepcopy(batchItem)
      localBatchItem.urlContentResponse = dc_event.URLContentResponse(None, None, [scraperOutputItem])
                                          len(accumulatedBatchItems) + 1)
      localBatchItem.urlObj.urlMd5 = hashlib.md5(localBatchItem.urlObj.url).hexdigest()
      localBatchItem.urlObj.parentMd5 = batchItem.urlObj.urlMd5
      localBatchItem.urlObj.status = dc_event.URL.STATUS_PROCESSED
      localBatchItem.urlObj.crawled = 1
      localBatchItem.urlObj.processed += 1
      localBatchItem.urlObj.type = dc_event.URL.TYPE_SINGLE
      localBatchItem.urlObj.errorMask = scraperOutputItem.errorMask
      localBatchItem.urlObj.CDate = localBatchItem.urlObj.UDate = localBatchItem.urlObj.tcDate = \
          datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
      localBatchItem.urlId = localBatchItem.urlObj.urlMd5

      self.logger.debug('localBatchItem.properties["template"]: ' + varDump(localBatchItem.properties["template"]))
      if self.localTemplate is not None and localBatchItem.properties is not None \
         and "template" in localBatchItem.properties and "templates" in localBatchItem.properties["template"]:
        localBatchItem.properties["template"] = {}
        localBatchItem.properties["template"]["templates"] = []
        localBatchItem.properties["template"]["templates"].append(self.localTemplate)
      elif self.localTemplate is not None and "template" in localBatchItem.properties:
        self.logger.debug('localBatchItem.properties["template"]: ' + \
                          varDump(localBatchItem.properties["template"]))

      accumulatedBatchItems.append(localBatchItem)

      self.logger.debug("!!! Not allowed site limits. len(accumulatedBatchItems) = " + \
                        str(len(accumulatedBatchItems)))

  self.logger.debug('---> 1 ----')

  scraperResponse = scraperOutputData
  self.logger.debug('---> 2 ----')

return scraperResponse, accumulatedBatchItems
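# A condensed, self-contained sketch of the accumulation step above: each extra
# scraper output item becomes a derived batch item whose URL gets a '#<counter>'
# suffix and whose urlMd5/urlId are recomputed from that URL.  The helper name is
# illustrative; the field assignments mirror the code above.
def deriveBatchItemSketch(batchItem, scraperOutputItem, counter):
  import copy
  import hashlib
  import datetime
  localBatchItem = copy.deepcopy(batchItem)
  localBatchItem.urlObj.url = batchItem.urlObj.url + '#' + str(counter)
  localBatchItem.urlObj.urlMd5 = hashlib.md5(localBatchItem.urlObj.url).hexdigest()
  localBatchItem.urlObj.parentMd5 = batchItem.urlObj.urlMd5
  localBatchItem.urlObj.errorMask = scraperOutputItem.errorMask
  localBatchItem.urlObj.CDate = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  localBatchItem.urlId = localBatchItem.urlObj.urlMd5
  return localBatchItem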
def process(self, scraperInputObject, batchItem, batchItemDict):
  time.sleep(batchItemDict["url"].processingDelay / 1000.0)
  self.logger.debug('batchItemDict["processorName"]: ' + varDump(batchItemDict["processorName"]))
  scraperResponse = None

  inputPickledObject = pickle.dumps(scraperInputObject)
  Utils.storePickleOnDisk(inputPickledObject, ENV_PROCESSOR_STORE_PATH, "processor.out." + batchItem.urlId)
  self.logger.debug("The process Popen() algorithms usage model")
  self.logger.debug("Popen: %s", str(cmd))
  process = Popen(cmd, stdout=PIPE, stdin=PIPE, stderr=PIPE, shell=True, close_fds=True)
  self.logger.debug("process.communicate(), len(input_pickled_object)=" + str(len(inputPickledObject)))
  (output, err) = process.communicate(input=inputPickledObject)
  self.logger.debug("Process std_error=: %s", str(err))
  self.logger.debug("Process output len=:" + str(len(output)))
  exitCode = process.wait()
  self.logger.debug("Process response exitCode: %s", str(exitCode))
  if exitCode == EXIT_FAILURE:

  scraperOutputData = pickle.loads(output)
                                   batchItemDict["site"])

  self.logger.debug("The module import algorithms usage model")
  self.logger.debug("Initialize algorithm module and class instantiate")
  self.logger.debug("importlib.import_module(dc_processor." + cmd["AppClass"] + ")")
  self.algorithmsModule = importlib.import_module("dc_processor." + cmd["AppClass"])
  self.logger.debug("Module dc_processor." + cmd["AppClass"] + " imported")
                     self.logger, scraperInputObject)
  except Exception as err:
    raise ProcessorException("Module initialization failed: " + str(err) + "\n" + Utils.getTracebackInfo())

                     batchItemDict["site"])
  output = pickle.dumps(scraperResponse)
  except Exception as err:
    raise ProcessorException("Algorithm module has failed: " + str(err) + "\n" + Utils.getTracebackInfo())

  self.logger.info(MSG_INFO_PROCESSOR_EXIT_CODE + str(exitCode))
  if scraperResponse is not None:
    batchItemDict["errorMask"] |= scraperResponse.errorMask

  except Exception as err:
    ExceptionLog.handler(self.logger, err, MSG_ERROR_PROCESS)
    raise ProcessorException(MSG_ERROR_PROCESS + " : " + str(err) + "\n" + Utils.getTracebackInfo())

  return Results(exitCode, output, err, scraperResponse)
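# A minimal standalone sketch of the Popen() "algorithms usage model" implemented
# in process() above: the input object is pickled, written to the child's stdin,
# and the child's stdout is unpickled back into a response object.  The command
# string and the failure exit code below are assumptions for illustration only.
def runProcessorChildSketch(cmd, inputObject, exitFailure=1):
  import cPickle as childPickle
  from subprocess import Popen, PIPE
  childProcess = Popen(cmd, stdout=PIPE, stdin=PIPE, stderr=PIPE, shell=True, close_fds=True)
  # communicate() feeds the pickled input to stdin and collects stdout/stderr
  output, err = childProcess.communicate(input=childPickle.dumps(inputObject))
  exitCode = childProcess.wait()
  if exitCode == exitFailure:
    raise RuntimeError("child processor failed: " + str(err))
  return childPickle.loads(output)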
if batchItem.urlObj.urlPut is not None:
  ret = batchItem.urlObj.urlPut.putDict["data"]
  if batchItem.urlObj.urlPut.contentType == dc_event.Content.CONTENT_RAW_CONTENT:
    ret = base64.b64decode(ret)

if "htmlRecover" in batchItemDict and batchItemDict["htmlRecover"] is not None and \
   batchItemDict["htmlRecover"] == "1":
  urlContentObj = dc_event.URLContentRequest(batchItem.siteId, batchItem.urlObj.url,
                                             dc_event.URLContentRequest.CONTENT_TYPE_RAW_LAST + \
                                             dc_event.URLContentRequest.CONTENT_TYPE_TIDY)
  urlContentObj.urlMd5 = batchItem.urlObj.urlMd5
  ret = self.wrapper.urlContent([urlContentObj])
  if len(ret[0].rawContents) > 0:
    self.logger.debug(">>> YES tidy on disk")

    self.logger.debug(">>> NO tidy on disk")
    urlContentObj = dc_event.URLContentRequest(batchItem.siteId, batchItem.urlObj.url,
                                               dc_event.URLContentRequest.CONTENT_TYPE_RAW_LAST + \
                                               dc_event.URLContentRequest.CONTENT_TYPE_RAW)
    urlContentObj.urlMd5 = batchItem.urlObj.urlMd5
    ret = self.wrapper.urlContent([urlContentObj])

if ret is not None and len(ret) > 0 and ret[0].rawContents is not None and len(ret[0].rawContents) > 0:
  putDict["id"] = batchItem.urlId
  putDict["data"] = ret[0].rawContents[0].buffer
  putDict["cDate"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  batchItem.urlObj.urlPut = dc_event.URLPut(batchItem.siteId, batchItem.urlId,
                                            dc_event.Content.CONTENT_RAW_CONTENT, putDict)
  ret = base64.b64decode(ret[0].rawContents[0].buffer)
  self.logger.debug("Some raw content size %s on disk", str(len(ret)))

  self.logger.debug("NO raw content on disk, raw_content: %s", str(ret))
except Exception as err:
  ExceptionLog.handler(self.logger, err, MSG_ERROR_GET_RAW_CONTENT_FROM_DB, (err))
  batchItemDict["errorMask"] |= APP_CONSTS.ERROR_MASK_MISSED_RAW_CONTENT_ON_DISK
if 'charset' in batchItemDict:
  self.logger.debug("Charset incoming is: %s", batchItemDict["charset"])
else:
  self.logger.debug("Charset not defined in batchItemDict!")
  batchItemDict["charset"] = icu.CharsetDetector(batchItemDict["rawContent"]).detect().getName()
  self.logger.debug("Charset detected with icu is: %s", batchItemDict["charset"])
if batchItemDict["charset"] != 'utf-8':
  self.logger.debug("Content charset decode ignore errors, incoming len: %s",
                    str(len(batchItemDict["rawContent"])))
  batchItemDict["rawContent"] = batchItemDict["rawContent"].decode('utf-8', 'ignore')
  self.logger.debug("Content after decoding len: %s", str(len(batchItemDict["rawContent"])))
except Exception as err:
  ExceptionLog.handler(self.logger, err, MSG_ERROR_CONVERT_RAW_CONTENT_CHARSET, (), \
                       {ExceptionLog.LEVEL_NAME_ERROR: ExceptionLog.LEVEL_VALUE_DEBUG})
  batchItemDict["errorMask"] |= APP_CONSTS.ERROR_BAD_ENCODING
  batchItemDict["charset"] = 'utf-8'

urlUpdateObj = dc_event.URLUpdate(batchItem.siteId, batchItem.urlId, dc_event.URLStatus.URL_TYPE_MD5, \
urlUpdateObj.charset = charset
self.wrapper.urlUpdate(urlUpdateObj)
except Exception as err:
  ExceptionLog.handler(self.logger, err, MSG_ERROR_UPDATE_URL_CHARSET)
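# A minimal sketch of the charset handling pattern above, assuming PyICU is
# importable as `icu` (the detector call mirrors the one used in this module; the
# helper name is illustrative).  As above, non-utf-8 content is decoded with
# errors ignored.
def detectAndDecodeSketch(rawContent):
  import icu
  charset = icu.CharsetDetector(rawContent).detect().getName()
  if charset != 'utf-8':
    rawContent = rawContent.decode('utf-8', 'ignore')
  return rawContent, charset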
state = dc_event.URL.STATE_ENABLED
processedTime = int((time.time() - batchItemDict["processedTime"]) * 1000)
if "contentURLMd5" not in batchItemDict:
  batchItemDict["contentURLMd5"] = ""

if "scraperResponse" in batchItemDict and batchItemDict["scraperResponse"] is not None and \
   len(batchItemDict["scraperResponse"]) > 0 and batchItemDict["scraperResponse"][0] is not None:
  tagsMask = batchItemDict["scraperResponse"][0].tagsMask or 0
  tagsCount = batchItemDict["scraperResponse"][0].tagsCount or 0
  pubdate = batchItemDict["scraperResponse"][0].pubdate or None

  self.logger.debug('pubdate from scraper response: ' + str(pubdate))
  if pubdate is not None and isinstance(pubdate, basestring):
    pubdate = DateTimeType.parse(pubdate)
    if pubdate is not None:
      pubdate = pubdate.strftime("%Y-%m-%d %H:%M:%S")

  self.logger.debug('>>> updateProcessedURL pubdate = ' + str(pubdate) + ' type: ' + str(type(pubdate)))

batchItem.urlObj.status = dc_event.URL.STATUS_PROCESSED
batchItem.urlObj.state = state
batchItem.urlObj.errorMask = batchItemDict["errorMask"]
batchItem.urlObj.tagsMask = tagsMask
batchItem.urlObj.tagsCount = tagsCount
batchItem.urlObj.totalTime = SQLExpression(("`TotalTime` + %s" % str(processedTime)))
batchItem.urlObj.processingTime = processedTime
batchItem.urlObj.pDate = pubdate
batchItem.urlObj.UDate = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
batchItem.urlObj.tcDate = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
batchItem.urlObj.contentURLMd5 = batchItemDict["contentURLMd5"]
batchItem.urlObj.processed += 1

urlUpdateObj = dc_event.URLUpdate(batchItem.siteId, batchItem.urlId, dc_event.URLStatus.URL_TYPE_MD5, \
urlUpdateObj.status = batchItem.urlObj.status
urlUpdateObj.state = batchItem.urlObj.state
urlUpdateObj.errorMask = SQLExpression("`ErrorMask` + %s" % str(batchItem.urlObj.errorMask))
urlUpdateObj.tagsMask = batchItem.urlObj.tagsMask
urlUpdateObj.tagsCount = batchItem.urlObj.tagsCount
urlUpdateObj.totalTime = SQLExpression(("`TotalTime` + %s" % str(batchItem.urlObj.processingTime)))
urlUpdateObj.processingTime = batchItem.urlObj.processingTime
urlUpdateObj.pDate = batchItem.urlObj.pDate
urlUpdateObj.contentURLMd5 = batchItem.urlObj.contentURLMd5
urlUpdateObj.crawled = batchItem.urlObj.crawled

urlUpdateList.append(urlUpdateObj)
self.logger.debug('>>>> urlUpdateList: ' + varDump(urlUpdateList))

if ("processorName" in batchItemDict) and (batchItemDict["processorName"] == CONSTS.PROCESSOR_FEED_PARSER) and \
   (dc_event.BatchItem.PROP_FEED in batchItem.properties):
  for url, value in batchItem.properties[dc_event.BatchItem.PROP_FEED].items():
    urlUpdateObjLocal = copy.deepcopy(urlUpdateObj)
    urlUpdateObjLocal.urlMd5 = value["urlMd5"]
    urlUpdateObjLocal.errorMask = batchItemDict["errorMask"]
    urlUpdateList.append(urlUpdateObjLocal)

changedFieldsDict = FieldsSQLExpressionEvaluator.execute(batchItem.properties, self.wrapper, None,
                                                         batchItem.urlObj, self.logger,
                                                         APP_CONSTS.SQL_EXPRESSION_FIELDS_UPDATE_PROCESSOR)

if isinstance(changedFieldsDict, dict):
  for name, value in changedFieldsDict.items():
    if hasattr(urlUpdateObj, name):
      setattr(urlUpdateObj, name, value)

result = self.wrapper.urlUpdate(urlUpdateList)

except Exception as err:
  ExceptionLog.handler(self.logger, err, MSG_ERROR_UPDATE_PROCESSED_URL)
if "contentHash" in batchItemDict and batchItemDict["contentHash"] is not None \
   and "scraperResponse" in batchItemDict and len(batchItemDict["scraperResponse"]) > 0:
  self.logger.debug(">>> Site has content hash rule: %s", str(batchItemDict["contentHash"]))
  localContentHash = json.loads(batchItemDict["contentHash"])
  listHashedTags = localContentHash["tags"].split(",")
  batchItemDict["contentURLMd5Algorithm"] = localContentHash["algorithm"]
  self.logger.debug(">>> List hashed tags: %s", str(listHashedTags))
  ret = ContentHashCalculator.hashCalculate(localBuf, int(batchItemDict["contentURLMd5Algorithm"]))

  self.logger.debug(">>> Site hasn't content hash rule")
except ProcessorException as err:
  ExceptionLog.handler(self.logger, err, MSG_ERROR_CALC_CONTENT_HASH, (), \
                       {ExceptionLog.LEVEL_NAME_ERROR: ExceptionLog.LEVEL_VALUE_DEBUG})

if len(scraperResponse.processedContent["default"].data["data"]["tagList"]) > 0:
  tagsList = scraperResponse.processedContent["default"].data["data"]["tagList"][0]

  if tag["name"] in listHashedTags:
    self.logger.debug(">>> Tag name added to hash: %s", str(tag["name"]))
    if isinstance(tag["data"], basestring):

    elif isinstance(tag["data"], list):
      for elem in tag["data"]:

    self.logger.debug(">>> Tag: %s not added to hash", tag["name"])
if "processorProperties" in batchItemDict:
  localJson = json.loads(batchItemDict["processorProperties"])
  for elem in siteProperties:
    if elem["name"] == "HTTP_HEADERS" and "EXTRACTOR_USER_AGENT" not in localJson:
      localHeaders = json.loads(elem["value"])
      for key in localHeaders:
        if key.lower() == "useragent":
          localJson["EXTRACTOR_USER_AGENT"] = localHeaders[key]

      self.logger.debug(">>> Bad json value in the siteProperties[\"HTTP_HEADERS\"]")
    elif elem["name"] == "SCRAPER_METRICS" and "metrics" not in localJson:
      localJson["metrics"] = elem["value"]
    elif elem["name"] == "SCRAPER_SCRAPY_PRECONFIGURED" and "SCRAPER_SCRAPY_PRECONFIGURED" not in localJson:
      localJson["SCRAPER_SCRAPY_PRECONFIGURED"] = elem["value"]
  if "url" in batchItemDict and batchItemDict["url"] is not None and "parentMd5" not in localJson:
    localJson["parentMd5"] = batchItemDict["url"].parentMd5
  batchItemDict["processorProperties"] = json.dumps(localJson)
except Exception as err:
  self.logger.debug(">>> Something wrong with processorProperties = " + str(err))
ret = batchItemDict["template"]

additionData["parentMD5"] = batchItemDict["url"].parentMd5
             None, batchItemDict["rawContent"],
             batchItemDict["url"].url)
if "templates" in ret:
  for additionTemplateElem in additionTemplate:
    for templateElem in ret["templates"]:
      if hasattr(additionTemplateElem, "name") and hasattr(templateElem, "name") and \
         additionTemplateElem.name == templateElem.name:
        templateElem = copy.deepcopy(additionTemplateElem)
        additionTemplateElem = None

    if additionTemplateElem is not None:
      ret["templates"].append(additionTemplateElem)

if isinstance(batchItemDict["template"], basestring) and batchItemDict["template"] != "":
  batchItemDict["template"] = json.loads(batchItemDict["template"])
if "templates" in batchItemDict["template"]:
  self.logger.debug("NEW template format")
  if "template" not in batchItem.properties:
    batchItem.properties["template"] = copy.deepcopy(batchItemDict["template"])

  self.logger.debug("OLD template format")

if "TEMPLATE_SOURCE" in batchItemDict:

if "templates" in ret:
  for templateElement in ret["templates"]:
    if "condition" in templateElement:
      if templateElement["condition"]["type"] == CONSTS.TEMPLATE_CONDITION_TYPE_URL:
        if "url" in batchItemDict:
          compareObj = batchItemDict["url"]
          if hasattr(compareObj, templateElement["condition"]["field"]):
            fieldValue = getattr(compareObj, templateElement["condition"]["field"], None)
            if fieldValue is not None:
              if re.compile(templateElement["condition"]["pattern"]).match(str(fieldValue)) is not None:

    except Exception as excp:
      self.logger.debug(">>> Some wrong with RE. in ret condition; err = " + str(excp))

    newTemplateList.append(templateElement)
  ret["templates"] = newTemplateList

self.logger.debug("Template before convertion: %s", varDump(batchItemDict["template"]))
template = copy.copy(batchItemDict["template"])
batchItemDict["template"] = {"templates": [{"priority": 100, "mandatory": 1, "is_filled": 0, "tags": template}]}
batchItem.properties["template"] = batchItemDict["template"]
self.logger.debug("Template after convertion: %s", varDump(batchItemDict["template"]))
if len(template["tags"]) == 0:

for data in processedContent.data["data"]["tagList"][0]:
  ei = re.sub("%tag_name%_%extractor_value%", "%" + data["name"] + "_extractor%", entry)
  ei = re.sub("%tag_value%", "%" + data["name"] + "%", ei)
  ei = re.sub("%tag_name%", data["name"], ei)

if len(entries) > 0:
  if template["output_format"]["name"] == "json":

entries = list(set(entries))
entry = itemDelimiter.join(entries)
if removeTrailingComma and len(entry) > 0 and entry.endswith(","):

properties = json.loads(processorProperties)
except Exception as err:

if CONSTS.LANG_PROP_NAME in properties:
  langTagsNames = langDetector.getLangTagsNames()

for data in processedContent.data["data"]["tagList"][0]:
  pattern = "%" + data["name"] + "%"
  if entry.find(pattern) != -1:
    for item in data["data"]:
      if item is None or item == "":
        continue
      entry = entry.replace(pattern, item)

  if "extractor" in data and "name" in data:
    entry = entry.replace("%" + data["name"] + "_extractor" + "%", str(data["extractor"]))

  if "xpath" in data and "name" in data and data["data"] and len(data["data"]) > 0:
    xpathData = str(data["xpath"])
    if template["output_format"]["name"] == "json":
      xpathData = json.dumps(xpathData).strip('"')
    entry = entry.replace("%" + data["name"] + "_xpath" + "%", xpathData)
  elif "xpath" in data and "name" in data:
    entry = entry.replace("%" + data["name"] + "_xpath" + "%", "%" + data["name"] + "_xpath" + "%")

  if "lang" in data and "lang_suffix" in data and "name" in data:
    entry = entry.replace("%" + data["name"] + data["lang_suffix"] + "%", str(data["lang"]))

  if "summary_lang" in data and "lang_suffix" in data:
    entry = entry.replace("%" + data["lang_suffix"] + "%", str(data["summary_lang"]))

  for langTagName in langTagsNames:
    if "lang" in data and "name" in data and data["name"] in langTagName:
      if "%" + langTagName + "%" in entry:
        entry = entry.replace("%" + langTagName + "%", str(data["lang"]))

if "time" in processedContent.data:
  entry = entry.replace("%scraper_time%", str(processedContent.data["time"]))

entry = json.dumps({"default": processedContent.data}, ensure_ascii=False)

localMetrics = json.dumps(processedContent.metrics)
if len(localMetrics) > 0 and (localMetrics[0] == '"' or localMetrics[0] == '\''):
  localMetrics = localMetrics[1:]
if len(localMetrics) > 0 and (localMetrics[-1] == '"' or localMetrics[-1] == '\''):
  localMetrics = localMetrics[0:-1]
entry = entry.replace("%metrics%", localMetrics)
def mapResponse(self, template, crawlingTime, scraperResponse, processorProperties):
  localProcessedContents = []
  if scraperResponse is not None:
    localProcessedContents.append(scraperResponse.processedContent["default"])
    localProcessedContents.extend(scraperResponse.processedContent["internal"])
  template["response"] = []

  localResponse = template["output_format"]["header"]
  localResponse = localResponse + template["output_format"]["items_header"]
  entry = template["output_format"]["item"]
  removeTrailingComma = entry.endswith(",")

  for localProcessedContent in localProcessedContents:
    internalEntry = copy.deepcopy(entry)
    internalLocalResponse = copy.deepcopy(localResponse)
    if localProcessedContent is not None:
                              internalEntry, processorProperties)

    internalEntry = internalEntry.replace("%crawler_time%", str(crawlingTime / 1000.0))
    if removeTrailingComma and len(internalEntry) > 0 and internalEntry.endswith(","):
      internalEntry = internalEntry[:-1]

    internalLocalResponse = internalLocalResponse + internalEntry + template["output_format"]["items_footer"]
    internalLocalResponse = internalLocalResponse + template["output_format"]["footer"]
    template["response"].append(internalLocalResponse)
    scraperResponse.processedContent["custom"].append(internalLocalResponse)

  if len(localProcessedContents) == 0:
    entry = entry.replace("%crawler_time%", str(crawlingTime / 1000.0))
    if removeTrailingComma and len(entry) > 0 and entry.endswith(","):

    if scraperResponse is not None:

    localResponse = localResponse + entry + template["output_format"]["items_footer"]
    localResponse = localResponse + template["output_format"]["footer"]
    template["response"].append(localResponse)
substituteDict = {"%errors_mask%": str(errorMask)}
for key in substituteDict:
  ret = re.sub(key, substituteDict[key], ret)

fieldsDict = json.loads(buf)
if isinstance(fieldsDict, dict):
  fieldsDict[name] = value
ret = json.dumps(fieldsDict)
except Exception as err:
  self.logger.info(Utils.getTracebackInfo())
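# A minimal sketch of the placeholder substitution pattern used by the template
# code in this module: every "%name%" key of a substitution dict is replaced in
# the rendered entry.  str.replace() is an illustrative simplification of the
# re.sub() loop above; the helper name is not part of the original module.
def substitutePlaceholdersSketch(entry, substituteDict):
  for key in substituteDict:
    entry = entry.replace(key, substituteDict[key])
  return entry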
def reduceResponse(self, processingTamplatesDict, templateSelectType, batchItemDict):
  for elem in processingTamplatesDict:
    templeteElem = elem[0]
    if "mandatory" in templeteElem and templeteElem["mandatory"] == 1 and "isEmpty" in templeteElem and \
       templeteElem["isEmpty"]:
      batchItemDict["errorMask"] |= APP_CONSTS.ERROR_MANDATORY_TEMPLATE

    if "contentsCount" in templeteElem and "contentsLen" in templeteElem:
      if templateSelectType == "first_nonempty":
        if templeteElem["contentsCount"] > maxVal:
          maxVal = templeteElem["contentsCount"]

      elif templeteElem["contentsLen"] > maxVal:
        maxVal = templeteElem["contentsLen"]

  batchItemDict["template"] = ret[0]
  batchItemDict["scraperResponse"] = []
  batchItemDict["scraperResponse"].append(ret[1])
def processTask(self, batchItem, batchItemDict, withoutProcess=False):
  if batchItemDict["processorProperties"] is not None:
    processorProperties = json.loads(batchItemDict["processorProperties"])
    if "algorithm" in processorProperties:
      algorithm = processorProperties["algorithm"]
      if "algorithm_name" in algorithm and algorithm["algorithm_name"] == "raw-data":
        self.logger.debug("raw-data algorithm defined!")
        batchItemDict["processedContent"] = encode(batchItemDict["rawContent"])
        self.putContent(batchItem, batchItemDict["processedContent"], batchItemDict)

  if "charset" in batchItemDict:

  if "site" in batchItemDict:

  checkPriority = True
  for elem in batchItemDict["template"]["templates"]:
    if "priority" not in elem:
      checkPriority = False

  batchItemDict["template"]["templates"] = sorted(batchItemDict["template"]["templates"],
                                                  key=lambda template: template["priority"], reverse=True)

  if "scraperResponse" not in batchItemDict:
    if batchItem.urlContentResponse is not None and batchItem.urlContentResponse.processedContents is not None:
      batchItemDict["scraperResponse"] = batchItem.urlContentResponse.processedContents

      batchItemDict["scraperResponse"] = []

    for self.localTemplate in batchItemDict["template"]["templates"]:
      self.logger.debug(">>> Template disable")
      batchItemDict["scraperResponse"].append(None)

      if not withoutProcess:
        scraperInputObject = ScraperInData(batchItemDict["url"].url, batchItem.urlId, batchItem.siteId,
                                           self.filters, batchItemDict["url"].lastModified,
                                           batchItemDict["processorProperties"],
        result = self.process(scraperInputObject, batchItem, batchItemDict)
        batchItemDict["scraperResponse"].append(result.scraperResponse)
      except Exception as err:
        self.logger.error("Some error in template processing : " + str(err))
        batchItemDict["scraperResponse"].append(None)

    self.logger.debug("'scraperResponse' found in batchItemDict!!!")
  except ProcessorException as err:
    ExceptionLog.handler(self.logger, err, MSG_ERROR_PROCESS_TASK)
    batchItemDict["errorMask"] |= APP_CONSTS.ERROR_MASK_SCRAPER_ERROR
  except Exception as err:
    ExceptionLog.handler(self.logger, err, MSG_ERROR_PROCESS_TASK, (err))
    batchItemDict["errorMask"] |= APP_CONSTS.ERROR_MASK_SCRAPER_ERROR
def putContent(self, batchItem, processedContent, batchItemDict):
  batchItem.urlContentResponse = dc_event.URLContentResponse(None, processedContents=[processedContent])

  putDict["id"] = batchItem.urlId
  putDict["data"] = processedContent
  urlPut = dc_event.URLPut(batchItem.siteId, batchItem.urlId, dc_event.Content.CONTENT_PROCESSOR_CONTENT, putDict)

  if self.input_batch.crawlerType == dc_event.Batch.TYPE_REAL_TIME_CRAWLER and \
     (self.input_batch.dbMode & dc_event.Batch.DB_MODE_W == 0) and batchItem.urlPutObj is None:
    putDict["properties"] = copy.deepcopy(batchItem.properties)
    if "template" in batchItemDict:
      if isinstance(batchItemDict["template"]["templates"], types.DictType):
        putDict["properties"]["template"]["templates"] = [copy.deepcopy(batchItemDict["template"]["templates"])]
      elif isinstance(batchItemDict["template"]["templates"], types.ListType) and \
           len(batchItemDict["template"]["templates"]) > 0:
        putDict["properties"]["template"]["templates"] = [copy.deepcopy(batchItemDict["template"]["templates"][0])]
      if len(putDict["properties"]["template"]["templates"]) > 0:
        putDict["properties"]["template"]["templates"][0]["response"] = []

    batchItem.urlPutObj = urlPut

  drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_PUT, [urlPut])
  responseDRCESyncTasksCover = self.wrapper.process(drceSyncTasksCoverObj)
  if responseDRCESyncTasksCover.eventType == DC_CONSTS.EVENT_TYPES.URL_PUT_RESPONSE:
    for obj in responseDRCESyncTasksCover.eventObject:

    self.logger.error("URL_PUT_RESPONSE >>> Wrong response type")
  except DatabaseException as err:
    self.logger.error('PutContent error: ' + str(err))
    batchItemDict["errorMask"] |= APP_CONSTS.ERROR_DATABASE_ERROR
urlUpdateObj = dc_event.URLUpdate(batchItem.siteId, batchItem.urlId, dc_event.URLStatus.URL_TYPE_MD5, \
urlUpdateObj.status = dc_event.URL.STATUS_PROCESSING
if errorMask is not None:
  urlUpdateObj.errorMask = errorMask
self.wrapper.urlUpdate(urlUpdateObj)
except Exception as err:
  ExceptionLog.handler(self.logger, err, MSG_ERROR_LOAD_SITE_DATA)

return bool(site.state == Site.STATE_DISABLED)

if site.maxResources > 0 and site.contents > site.maxResources and url.processed == 0:
  self.logger.debug("Site maxResources number is reached! Site contents is: %s. Site maxResources: %s ",
                    str(site.contents), str(site.maxResources))

siteStatus = dc_event.SiteStatus(batchItem.siteId)
drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.SITE_STATUS, siteStatus)
responseDRCESyncTasksCover = self.wrapper.process(drceSyncTasksCoverObj)
site = responseDRCESyncTasksCover.eventObject
except DatabaseException as err:
  ExceptionLog.handler(self.logger, err, MSG_ERROR_READ_SITE_FROM_DB)
except Exception as err:
  ExceptionLog.handler(self.logger, err, MSG_ERROR_READ_SITE_FROM_DB)
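# A small illustrative helper (assumed name) that condenses the
# DRCESyncTasksCover request/response round trip repeated above: build a cover
# object for an event type, push it through the DB-task wrapper and return the
# eventObject payload of the response.
def fetchEventObjectSketch(wrapper, eventType, payload):
  drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(eventType, payload)
  responseDRCESyncTasksCover = wrapper.process(drceSyncTasksCoverObj)
  return responseDRCESyncTasksCover.eventObject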
if not len(batchItem.siteId):
  batchItem.siteId = "0"

if site is not None and batchItem.siteObj is not None:
  site.rewriteFields(batchItem.siteObj)
except Exception as err:
  ExceptionLog.handler(self.logger, err, MSG_ERROR_LOAD_SITE_DATA)

urlStatus = dc_event.URLStatus(batchItem.siteId, batchItem.urlId)
urlStatus.urlType = dc_event.URLStatus.URL_TYPE_MD5
drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_STATUS, [urlStatus])
responseDRCESyncTasksCover = self.wrapper.process(drceSyncTasksCoverObj)
row = responseDRCESyncTasksCover.eventObject

raise ProcessorException(">>> URLStatus return empty response, urlId=" + batchItem.urlId)
except Exception as err:
  ExceptionLog.handler(self.logger, err, MSG_ERROR_LOAD_URL_DATA, (ret))

for batchItem in batchItems:
  if batchItem.urlObj.chainId is not None and batchItem.urlObj.chainId > chainIndex:
    chainIndex = batchItem.urlObj.chainId + 1
self.logger.debug(">>> Started chainId is = " + str(chainIndex))
for batchItem in batchItems:
  ret.append(batchItem)
  if batchItem.urlObj.chainId is None:
    urlContent = dc_event.URLContentRequest(batchItem.siteId, batchItem.urlObj.url)
    urlContent.contentTypeMask = dc_event.URLContentRequest.CONTENT_TYPE_CHAIN
    urlContent.contentTypeMask |= dc_event.URLContentRequest.CONTENT_TYPE_RAW_FIRST
    urlContent.urlMd5 = batchItem.urlObj.urlMd5
    drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_CONTENT, [urlContent])
    responseDRCESyncTasksCover = self.wrapper.process(drceSyncTasksCoverObj)
    row = responseDRCESyncTasksCover.eventObject
    if row is not None and len(row) > 0 and len(row[0].rawContents) > 0:
      self.logger.debug(">>> Started chainId , yes chain list")
      chainBuf = row[0].rawContents[0].buffer
      self.logger.debug(">>> buff = " + str(chainBuf))
      chainBuf = base64.b64decode(chainBuf)

      self.logger.debug(">>> chain buf exception")

    if chainBuf is not None:
      splitterList = chainBuf.split("\n")
      self.logger.debug(">>> Started chainId , yes chain list len is = " + str(len(splitterList)))
      for i in xrange(0, len(splitterList)):
        urlMd5 = splitterList[i].strip()
        self.logger.debug(">>> extract chain MD5 = " + urlMd5)
        urlObjEvent = dc_event.URLStatus(batchItem.siteId, urlMd5)
        urlObjEvent.urlType = dc_event.URLStatus.URL_TYPE_MD5
        drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_STATUS, [urlObjEvent])
        responseDRCESyncTasksCover = self.wrapper.process(drceSyncTasksCoverObj)
        row = responseDRCESyncTasksCover.eventObject
        if row is not None and len(row) > 0:
          batchItem.urlObj.chainId = chainIndex
          newBatchitem = copy.deepcopy(batchItem)
          newBatchitem.urlId = urlMd5
          newBatchitem.urlObj = row[0]
          newBatchitem.urlObj.chainId = chainIndex
          newBatchitem.urlObj.contentMask = dc_event.URL.CONTENT_STORED_ON_DISK
          ret.append(newBatchitem)
          self.logger.debug(">>> added new chain item, urlMd5 = " + str(urlMd5) + " ChainId = " + \
                            str(newBatchitem.urlObj.chainId))
      chainIndex += chainIncrement
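# A simplified sketch of the chain-buffer parsing above: the stored chain content
# is a base64-encoded, newline-separated list of URL MD5 identifiers.  The helper
# name is illustrative.
def parseChainBufferSketch(chainBuf):
  import base64
  decoded = base64.b64decode(chainBuf)
  return [line.strip() for line in decoded.split("\n") if line.strip()]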
template["contentsCount"] = 0
template["contentsLen"] = 0
template["isEmpty"] = True
for data in scraperResponse.processedContent["default"].data["data"]["tagList"][0]:
  saveLen = template["contentsLen"]
  for item in data["data"]:
    if item is not None:
      template["contentsLen"] += len(item)
  if template["contentsLen"] > saveLen:
    template["contentsCount"] += 1
    template["isEmpty"] = False

self.logger.debug(">>> 1 step")
ret["errorMask"] = batchItem.urlObj.errorMask
ret["processedTime"] = time.time()
self.batchSites.update([batchItem.urlObj.siteId])
ret["site"] = self.loadSite(batchItem)

if self.input_batch.dbMode & dc_event.Batch.DB_MODE_W == 0:
  self.wrapper.affect_db = False
if batchItem.urlObj is not None:
  ret["url"] = batchItem.urlObj

  ret["url"] = self.loadURL(batchItem)

self.logger.debug("Save charset '" + str(ret["url"].charset) + "' to dict")
ret["charset"] = ret["url"].charset

raise ProcessorException("Site state is not active! Actual site state is: %s. " % ret["site"].state)

ret["errorMask"] |= APP_CONSTS.ERROR_MASK_SITE_MAX_RESOURCES_NUMBER

if ret["url"].processed != 0 and ret["url"].errorMask == ERROR_MASK_NO_ERRORS and \
   ret["url"].tagsCount > 0 and ret["errorMask"] == ERROR_MASK_NO_ERRORS:
  self.logger.debug("Real time crawling. Check reprocessing.")
  self.logger.debug("Batch item properties: %s", json.dumps(batchItem.properties))
  if ("PROCESSOR_NAME" in batchItem.properties) and \
     (batchItem.properties["PROCESSOR_NAME"] == "NONE"):
    self.logger.debug("RealTime Crawling: Only crawling mode. Exit.")
    ret["batchItem"] = None

  self.logger.debug("batchItem.properties: %s", json.dumps(batchItem.properties))
  if "PROCESSOR_NAME" in batchItem.properties:
    ret["processorName"] = batchItem.properties["PROCESSOR_NAME"]
  elif "processorName" in batchItem.properties:
    ret["processorName"] = batchItem.properties["processorName"]

  if (CONSTS.REPROCESS_KEY in batchItem.properties) and \
     (batchItem.properties[CONSTS.REPROCESS_KEY] == CONSTS.REPROCESS_VALUE_NO):
    self.logger.debug("RealTime Crawling: Cashed resource. Resource crawled and errorMask is empty." +
                      "Don't need to reprocess.")
    ret["batchItem"] = None

    self.logger.debug("RealTime Crawling: Cashed resource. Resource crawled and errorMask is emppty but " +
                      "properties reprocess is Yes or empty. Send to reprocessing.")

if batchItem.siteId == '0':
  self.logger.debug("Check SQLExpression filter for zero site ...")
  for key, value in batchItem.urlObj.__dict__.items():
    fields[key.upper()] = value

                 fields, Filters.OC_SQLE, Filters.STAGE_BEFORE_PROCESSOR, True):
    self.logger.debug("SQLExpression filter for zero site checked - SUCCESS")

    self.logger.debug("SQLExpression filter for zero site checked - Fail")
    ret["errorMask"] |= APP_CONSTS.ERROR_PROCESSOR_FILTERS_BREAK
    ret["batchItem"] = None

if len(batchItem.properties.keys()) == 0:
  self.logger.debug('>>> property len(batchItem.properties.keys()) == 0')
  for localProperty in ret["site"].properties:
    batchItem.properties[localProperty["name"]] = copy.deepcopy(localProperty["value"])

if "urlNormalizeMaskProcessor" in ret:
  self.normMask = int(ret["urlNormalizeMaskProcessor"])

if "processCTypes" in ret and ret["url"].contentType not in ret["processCTypes"]:
  self.logger.debug('>>>> ret["url"].contentType = ' + str(ret["url"].contentType))
  self.logger.debug('>>>> ret["processCTypes"] = ' + str(ret["processCTypes"]))
  isOkContentType = False
  if "contentTypeMap" in ret:
    contentTypeMap = json.loads(ret["contentTypeMap"])
    if ret["processCTypes"] in contentTypeMap:
      self.logger.debug('>>>> Found in ret["contentTypeMap"] = ' + str(contentTypeMap))
    if ret["processCTypes"] in contentTypeMap:
      if ret["url"].contentType == contentTypeMap[ret["processCTypes"]]:
        self.logger.debug('>>>> Good!!!')
        isOkContentType = True
  except Exception as err:
    self.logger.debug("Fail loads of 'CONTENT_TYPE_MAP': " + str(err))

  if not isOkContentType:
    ret["errorMask"] |= APP_CONSTS.ERROR_MASK_SITE_UNSUPPORTED_CONTENT_TYPE
    ret["batchItem"] = batchItem
    self.logger.error("url ContentType not matched! url.contentType: '%s', site_properties.PROCESS_CTYPES: '%s'",
                      str(ret["url"].contentType), ret["processCTypes"])

if self.input_batch.crawlerType == dc_event.Batch.TYPE_REAL_TIME_CRAWLER and \
   (self.input_batch.dbMode & dc_event.Batch.DB_MODE_W > 0):
  if batchItem.urlObj.contentMask != dc_event.URL.CONTENT_STORED_ON_DISK:
    self.logger.debug(">>> Content not found on disk. Exit.")
  elif batchItem.urlObj.httpCode != 200:
    self.logger.debug(">>> HTTP Code != 200. Code == " + str(batchItem.urlObj.httpCode) + ". Exit")
  if batchItem.urlObj.contentMask != dc_event.URL.CONTENT_STORED_ON_DISK or batchItem.urlObj.httpCode != 200:
    ret["batchItem"] = None
    self.logger.debug("Exit. batchItem.urlObj.contentMask = " + str(batchItem.urlObj.contentMask) + \
                      " batchItem.urlObj.httpCode = " + str(batchItem.urlObj.httpCode))

self.logger.debug("Check filter to 'url' use regular expression ...")
             None, Filters.OC_RE, Filters.STAGE_BEFORE_PROCESSOR, True):
  self.logger.debug("Filter to 'url' use regular expression checked - SUCCESS")

  self.logger.debug("Filter to 'url' use regular expression checked - Fail")
  ret["errorMask"] |= APP_CONSTS.ERROR_PROCESSOR_FILTERS_BREAK
  ret["batchItem"] = None

if ret["rawContent"] is not None:
  self.logger.debug("Check filter to 'raw content' use regular expression (STAGE_BEFORE_PROCESSOR)...")
               None, Filters.OC_RE, Filters.STAGE_BEFORE_PROCESSOR, True):
    self.logger.debug("Filter to 'raw content' use regular expression checked - SUCCESS")

    self.logger.debug("Filter to 'raw content' use regular expression checked - Fail")
    ret["errorMask"] |= APP_CONSTS.ERROR_PROCESSOR_FILTERS_BREAK
    ret["batchItem"] = None

self.logger.debug("Check filter to 'raw content' use regular expression ('STAGE_AFTER_DOM_PRE')...")
             None, Filters.OC_RE, Filters.STAGE_AFTER_DOM_PRE, True):
  self.logger.debug("Filter to 'raw content' use regular expression checked - SUCCESS")

  self.logger.debug("Filter to 'raw content' use regular expression checked - Fail")
  ret["errorMask"] |= APP_CONSTS.ERROR_PROCESSOR_FILTERS_BREAK
  ret["batchItem"] = None

if localContentHash is not None:
  ret["contentURLMd5"] = localContentHash

if APP_CONSTS.SQL_EXPRESSION_FIELDS_UPDATE_PROCESSOR in batchItem.properties:
  self.logger.debug("!!! Found '" + str(APP_CONSTS.SQL_EXPRESSION_FIELDS_UPDATE_PROCESSOR) + \
                    "' in batchItem.properties")
  localSiteUpdate = dc_event.SiteUpdate(batchItem.siteId)
  for attr in localSiteUpdate.__dict__:
    if hasattr(localSiteUpdate, attr):
      setattr(localSiteUpdate, attr, None)

  localSiteUpdate.id = batchItem.siteId
  localSiteUpdate.updateType = dc_event.SiteUpdate.UPDATE_TYPE_UPDATE

  changedFieldsDict = FieldsSQLExpressionEvaluator.execute(batchItem.properties, self.wrapper, ret["site"],
                                                           APP_CONSTS.SQL_EXPRESSION_FIELDS_UPDATE_PROCESSOR)

  for name, value in changedFieldsDict.items():
    if hasattr(localSiteUpdate, name) and value is not None and name not in ['CDate', 'UDate', 'tcDate']:
      setattr(localSiteUpdate, name, value)

  localSiteUpdate.errorMask = SQLExpression(("`ErrorMask` | %s" % ret["site"].errorMask))

  updatedCount = self.wrapper.siteNewOrUpdate(siteObject=localSiteUpdate, stype=dc_event.SiteUpdate)
  self.logger.debug("!!! Use property '" + str(APP_CONSTS.SQL_EXPRESSION_FIELDS_UPDATE_PROCESSOR) + \
                    "' updated " + str(updatedCount) + " rows.")

except DatabaseException as err:
  ExceptionLog.handler(self.logger, err, MSG_INFO_PROCESS_BATCH_ITEM, (ret))
  ret["errorMask"] = ret["errorMask"] | APP_CONSTS.ERROR_DATABASE_ERROR
except ProcessorException as err:
  ExceptionLog.handler(self.logger, err, MSG_INFO_PROCESS_BATCH_ITEM, (ret))
  ret["errorMask"] = ret["errorMask"] | APP_CONSTS.ERROR_PROCESSOR_BATCH_ITEM_PROCESS
except Exception as err:
  ExceptionLog.handler(self.logger, err, MSG_INFO_PROCESS_BATCH_ITEM, (ret))
  ret["errorMask"] = ret["errorMask"] | APP_CONSTS.ERROR_PROCESSOR_BATCH_ITEM_PROCESS
if "PROCESSOR_NAME_REPLACE" in batchItemDict:
  self.logger.debug(">>> PROCESSOR_NAME_REPLACE is; " + batchItemDict["PROCESSOR_NAME_REPLACE"])
  localJson = json.loads(batchItemDict["PROCESSOR_NAME_REPLACE"])
  if isinstance(localJson, dict):
    for elem in localJson:
      if isinstance(localJson[elem], types.ListType) and urlContentType in localJson[elem]:
        batchItemDict["processorName"] = elem
        self.logger.debug("Resolved processor name: " + str(elem))
  except Exception as excp:
    self.logger.debug(">>> PROCESSOR_NAME_REPLACE bad json;" + str(excp))

if "scraperResponse" in batchItemDict:
  for scraperResponse in batchItemDict["scraperResponse"]:
    if scraperResponse is not None:
      sortedContents = Metrics.sortElementsByMetric(scraperResponse.processedContent["internal"], sortedMetric)
      scraperResponse.processedContent["internal"] = sortedContents
      if len(sortedContents[0]) > 0:
        scraperResponse.processedContent["default"] = sortedContents[0]

self.logger.debug(">>> 2 step")
if "template" in batchItemDict and "templates" in batchItemDict["template"]:
  if "scraperResponse" in batchItemDict and len(batchItemDict["scraperResponse"]) > 0:
    templateSelectType = \
        batchItemDict["template"]["select"] if "select" in batchItemDict["template"] else "first_nonempty"
    processingTemplatesDict = []
    self.logger.debug(">>> Tmpl Len = " + str(len(batchItemDict["template"]["templates"])))
    self.logger.debug(">>> Responce Len = " + str(len(batchItemDict["scraperResponse"])))
    for localTemplate in batchItemDict["template"]["templates"]:
      if "state" in localTemplate and not bool(int(localTemplate["state"])):
        self.logger.debug(">>> Template disable")

      processingTemplatesDict.append([copy.deepcopy(localTemplate), batchItemDict["scraperResponse"][i]])

    self.reduceResponse(processingTemplatesDict, templateSelectType, batchItemDict)

    self.logger.debug(">>> no scraperResponse or scraperResponse is empty")

  self.logger.debug(">>> wrong !!! empty batchItemDict[\"template\"][\"templates\"]")
def mergeChains(self, chainElem, batchItem, batchItemDict, delimiter=' '):
  self.logger.debug(">>> mergeChains")
  scraperResponseDest = chainElem["batchItemDict"]["scraperResponse"][0]
  scraperResponseSrc = batchItemDict["scraperResponse"][0]
  if "site" in chainElem["batchItemDict"]:
    sitePropetries = chainElem["batchItemDict"]["site"].properties
    self.logger.debug(">>> mergeChains sitePropetries = " + str(sitePropetries))
    for sitePropElem in sitePropetries:
      if sitePropElem["name"] == "URL_CHAIN" and sitePropElem["value"] is not None:
        urlChainDict = json.loads(sitePropElem["value"])
        if "tags_name" in urlChainDict:
          self.logger.debug(">>> mergeChains URL_CHAIN = " + str(urlChainDict))

  for srcData in scraperResponseSrc.processedContent["default"].data["data"]["tagList"][0]:
    self.logger.debug(">>> mergeChains srcData = " + str(srcData["name"]))
    if urlChainDict["tags_name"] is None or srcData["name"] in urlChainDict["tags_name"]:
      for destData in scraperResponseDest.processedContent["default"].data["data"]["tagList"][0]:
        if srcData["name"] == destData["name"]:
          if "delimiter" in urlChainDict and urlChainDict["delimiter"] is not None:
            localDelimiter = urlChainDict["delimiter"]

            localDelimiter = DEFSULT_CHAIN_DELIMITER
          destData["data"][0] += localDelimiter
          destData["data"][0] += srcData["data"][0]
          if "extractors" in chainElem and srcData["name"] in chainElem["extractors"] and \
             srcData["extractor"] not in chainElem["extractors"][srcData["name"]]:
            destData["extractor"] += delimiter
            destData["extractor"] += srcData["extractor"]

      scraperResponseDest.processedContent["default"].data["data"]["tagList"][0].\
          append(copy.deepcopy(srcData))

    if "extractors" not in chainElem:
      chainElem["extractors"] = {}
    if srcData["name"] not in chainElem["extractors"]:
      chainElem["extractors"][srcData["name"]] = []
    if srcData["extractor"] not in chainElem["extractors"][srcData["name"]]:
      chainElem["extractors"][srcData["name"]].append(srcData["extractor"])

self.logger.debug(">>> 3.1 reformatted Chain internal")
for chainElem in chainDict.values():
  scraperResponseDest = chainElem["batchItemDict"]["scraperResponse"][0]
  if scraperResponseDest.processedContent["default"] is not None and \
     len(scraperResponseDest.processedContent["internal"]) > 0:
    scraperResponseDest.processedContent["internal"] = [scraperResponseDest.processedContent["default"]]

self.logger.debug(">>> 3 step")
self.logger.debug(">>> Chain Id = " + str(batchItem.urlObj.chainId) + " Md5= " + batchItem.urlObj.urlMd5)
if batchItem.urlObj.chainId is not None:
  batchItemDict["dellBatch"] = True
  if batchItem.urlObj.chainId in chainDict:
    self.logger.debug(">>> old Chain Id = " + str(batchItem.urlObj.chainId) + " Md5= " + batchItem.urlObj.urlMd5)
    if "scraperResponse" in batchItemDict and len(batchItemDict["scraperResponse"]) > 0 and \
       batchItemDict["scraperResponse"][0] is not None:
      if "scraperResponse" not in chainDict[batchItem.urlObj.chainId]["batchItemDict"] or \
         len(chainDict[batchItem.urlObj.chainId]["batchItemDict"]["scraperResponse"]) == 0 or \
         chainDict[batchItem.urlObj.chainId]["batchItemDict"]["scraperResponse"][0] is None:
        chainDict[batchItem.urlObj.chainId]["batchItemDict"]["scraperResponse"] = batchItemDict["scraperResponse"]

        self.mergeChains(chainDict[batchItem.urlObj.chainId], batchItem, batchItemDict)

      self.logger.debug(">>> no or empty scraperResponse for current BatchItem")

    self.logger.debug(">>> new Chain Id = " + str(batchItem.urlObj.chainId) + " Md5= " + batchItem.urlObj.urlMd5)
    chainDict[batchItem.urlObj.chainId] = {"batchItem": copy.deepcopy(batchItem),
                                           "batchItemDict": copy.deepcopy(batchItemDict)}
    self.logger.debug(">>> 4 step")
    # ...
    if "template" in batchItemDict:
      # ...
      if "output_format" in batchItemDict["template"]:
        self.mapResponse(batchItemDict["template"], batchItemDict["url"].crawlingTime,
                         batchItemDict["scraperResponse"][0] if "scraperResponse" in batchItemDict and \
                         len(batchItemDict["scraperResponse"]) > 0 else None,
                         batchItemDict["processorProperties"])
      else:
        self.logger.debug(">>> wrong no output_format field for batch template")
      # (the enclosing call is elided in the listing; reconstructed here from the
      #  getProcessedContent(template, scraperResponse, errorMask) prototype listed below,
      #  with the assignment target left out because it is not shown)
      self.getProcessedContent(batchItemDict["template"],
                               batchItemDict["scraperResponse"][0] if \
                               "scraperResponse" in batchItemDict and \
                               len(batchItemDict["scraperResponse"]) > 0 else None,
                               batchItemDict["errorMask"])
    else:
      self.logger.debug(">>> wrong !!! empty batchItemDict[\"template\"]")
  except Exception, err:
    self.logger.error("Template error: " + str(err) + "\nSiteId: " + str(batchItem.siteId) + \
                      "\nurl: " + batchItem.urlObj.url + "\nurlMD5: " + str(batchItem.urlObj.urlMd5))
    self.logger.debug(Utils.getTracebackInfo())
    batchItemDict["errorMask"] = batchItemDict["errorMask"] | APP_CONSTS.ERROR_TEMPLATE_SOURCE
    self.logger.debug(">>> 5 step")
    if "template" in batchItemDict and "templates" not in batchItemDict["template"]:
      batchItemDict["template"] = {"templates": batchItemDict["template"]}
    if "processedContent" in batchItemDict and batchItemDict["processedContent"] is not None:
      self.putContent(batchItem, batchItemDict["processedContent"], batchItemDict)
    localBatchItems = inputItems
    batchProcessingData = []
    # ...
    for batchItem in localBatchItems:
      self.logger.debug("!!! urlId: %s, maxExecutionTimeReached = %s", str(batchItem.urlId),
                        # ... (second format argument elided in the listing)
      # ...
      self.logger.debug("Maximum execution time %ss reached, news extraction loop interrupted!",
                        # ... (format argument elided in the listing)
      self.logger.debug("!!! ERROR_MAX_EXECUTION_TIME !!! Set errorMask = %s",
                        str(APP_CONSTS.ERROR_MAX_EXECUTION_TIME))
      # ...
      if len(batchProcessingData) > 0:
        batchProcessingData[-1]["errorMask"] = APP_CONSTS.ERROR_MAX_EXECUTION_TIME
      # ...
      elem["errorMask"] |= APP_CONSTS.ERROR_MAX_EXECUTION_TIME
      elem["batchItem"] = batchItem
      batchProcessingData.append(elem)
    # ...
    for i in xrange(0, len(batchProcessingData)):
      # ...
      if "batchItem" not in batchProcessingData[i] or batchProcessingData[i]["batchItem"] is not None:
        # ...
    for i in xrange(0, len(batchProcessingData)):
      if "batchItem" not in batchProcessingData[i] or batchProcessingData[i]["batchItem"] is not None:
        # ...
    if sortedMetric is not None:
      # ...
      for i in xrange(0, len(batchProcessingData)):
        if "batchItem" not in batchProcessingData[i] or batchProcessingData[i]["batchItem"] is not None:
          # ...
    newBatchProcessingData = []
    # ...
    for i in xrange(0, len(batchProcessingData)):
      if "dellBatch" not in batchProcessingData[i] or not batchProcessingData[i]["dellBatch"]:
        newBatchitems.append(localBatchItems[i])
        newBatchProcessingData.append(batchProcessingData[i])
    # ...
    localBatchItems = newBatchitems
    batchProcessingData = newBatchProcessingData
    for key in chainDict:
      localBatchItems.append(chainDict[key]["batchItem"])
      batchProcessingData.append(chainDict[key]["batchItemDict"])
    # ...
    for i in xrange(0, len(batchProcessingData)):
      if "batchItem" not in batchProcessingData[i] or batchProcessingData[i]["batchItem"] is not None:
        # ...
    for i in xrange(0, len(batchProcessingData)):
      # ...
    except Exception, err:
      ExceptionLog.handler(self.logger, err, MSG_INFO_PROCESS_BATCH_ITEM)
      if len(localBatchItems) > 0:
        localBatchItems[-1].urlObj.errorMask = APP_CONSTS.ERROR_PROCESSOR_BATCH_ITEM_PROCESS
    # ...
    ret = localBatchItems
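    # Illustration only, not part of the original module: a compact equivalent of the
    # "dellBatch" handling reconstructed above. Items that were merged into a chain are
    # dropped from the two parallel lists, then one representative item per chain is
    # appended back.
    def dropChainedItems(localBatchItems, batchProcessingData, chainDict):
      kept = [(item, data) for item, data in zip(localBatchItems, batchProcessingData)
              if not data.get("dellBatch", False)]
      items = [item for item, _ in kept]
      datas = [data for _, data in kept]
      for chainElem in chainDict.values():
        items.append(chainElem["batchItem"])
        datas.append(chainElem["batchItemDict"])
      return items, datas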
      start_batch_time = time.time()
      # ...
      input_pickled_object = sys.stdin.read()
      input_batch = pickle.loads(input_pickled_object)
      # ...
      app.Profiler.messagesList.append("Batch.id=" + str(input_batch.id))
      # ...
      Utils.storePickleOnDisk(input_pickled_object, ENV_PROCESSOR_STORE_PATH, "processor.in." +
                              # ... (file name suffix elided in the listing)
      # ...
      self.logger.debug("Set maxExecutionTime = %s, removeUnprocessedItems = %s",
                        # ... (format arguments elided in the listing)
      # ...
      self.logger.info(">>> batchItems len = " + str(len(batchItems)))
      # ...
      batchItems = [localItem for localItem in batchItems if localItem is not None]
      # ...
      self.logger.debug('len(batchItems) = ' + str(len(batchItems)))
      # ...
      self.logger.debug("Truncated scraper responces list because over limit 'maxItems' = " + \
                        str(self.input_batch.maxItems) + " set errorMask = " + str(APP_CONSTS.ERROR_MAX_ITEMS))
      # ...
      if self.input_batch.crawlerType != dc_event.Batch.TYPE_REAL_TIME_CRAWLER:
        self.logger.debug('>>> call putUrlsMultiItems()')
        # ...
        if len(batchItems) > 0:
          self.logger.debug('>>> call putRawContentsMultiItems()')
          # ...
      input_batch.items = batchItems
      # ...
      output_pickled_object = pickle.dumps(input_batch)
      Utils.storePickleOnDisk(output_pickled_object, ENV_PROCESSOR_STORE_PATH,
                              "processor.out." + str(input_batch.id))
      sys.stdout.write(output_pickled_object)
      # ...
      batch_processing_time = int((time.time() - start_batch_time) * 1000)
      self.logger.debug("Batch processing time: %s msec.", str(batch_processing_time))
    except ProcessorException, err:
      ExceptionLog.handler(self.logger, err, MSG_INFO_PROCESS_BATCH)
      raise Exception(MSG_INFO_PROCESS_BATCH + " : " + str(err))
    except DatabaseException, err:
      ExceptionLog.handler(self.logger, err, MSG_INFO_PROCESS_BATCH)
      raise Exception(MSG_INFO_PROCESS_BATCH + " : " + str(err))
    except Exception as err:
      ExceptionLog.handler(self.logger, err, MSG_INFO_PROCESS_BATCH, (err))
      raise Exception(MSG_ERROR_PROCESS_BATCH + " : " + str(err))
      if self.pargs.config:
        # ...
        self.config.optionxform = str
        self.config.read(self.pargs.config)
      else:
        raise Exception(MSG_ERROR_EMPTY_CONFIG_FILE_NAME)
    except Exception, err:
      raise Exception(MSG_ERROR_LOAD_CONFIG + ' ' + str(err))

    # ...
      log_conf_file = self.config.get("Application", "log")
      logging.config.fileConfig(log_conf_file)
      # ...
    except Exception, err:
      raise Exception(MSG_ERROR_LOAD_LOG_CONFIG_FILE + str(err))

    # ...
    except Exception as err:
      # ...
      cfgParser = ConfigParser.ConfigParser()
      # ...
    except Exception, err:
      raise Exception(MSG_ERROR_LOAD_OPTIONS + str(err))
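    # Illustration only, not part of the original module: the config loading above relies on
    # ConfigParser with optionxform = str so option names keep their original case (they are
    # lower-cased by default). The file name below is hypothetical; the "Application"/"log"
    # option matches the code above.
    import ConfigParser

    def loadAppConfig(fileName="processor_task.ini"):
      cfg = ConfigParser.ConfigParser()
      cfg.optionxform = str                  # preserve option-name case
      if not cfg.read(fileName):
        raise Exception(MSG_ERROR_LOAD_CONFIG)
      return cfg.get("Application", "log")   # e.g. path of the logging config file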
  def filtersApply(self, localValue, wrapper, siteId, fields=None, opCode=Filters.OC_RE, stage=Filters.STAGE_ALL, \
                   defaultRet=False):  # trailing parameter restored from the prototype list below
    # ...
    fValue = Utils.generateReplacementDict()
    # ...
    if localFilters is None:
      localFilters = Filters(self.filters, wrapper, siteId, 0, fields, opCode, stage)
    # ...
    self.logger.debug("Filters with stage (" + str(stage) + ") count = " + \
                      str(localFilters.searchFiltersWithStage(stage)) + \
                      '\nfields: ' + str(fields))
    # ...
    if localFilters.isExistStage(stage):
      self.logger.debug(">>> value before filter include = " + localValue[:255] + ' . . . ')
      fResult = localFilters.filterAll(stage, fValue, Filters.LOGIC_OR, localValue, 1)
      self.logger.debug(">>> filter result - " + str(fResult))
      # ...
      for elem in fResult:
        self.logger.debug('elem = ' + str(elem) + ' type: ' + str(type(elem)))
        # ...
      self.logger.debug(">>> value before filter exclude = " + localValue[:255] + ' . . . ')
      fResult = localFilters.filterAll(stage, fValue, Filters.LOGIC_OR, localValue, -1)
      self.logger.debug(">>> filter result - " + str(fResult))
      for elem in fResult:
        self.logger.debug('elem = ' + str(elem) + ' type: ' + str(type(elem)))
        # ...
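    # Note (not part of the original source): as reconstructed above, filtersApply() runs two
    # passes over the same value for the requested stage, an include pass (filterAll(..., 1))
    # and an exclude pass (filterAll(..., -1)), both evaluated with Filters.LOGIC_OR, and logs
    # each raw filter result; the decision about the final return value is in the elided lines.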
    for batchItem in batchItems:
      params.append(batchItem.urlObj)
      self.logger.debug(">>> putUrlsMultiItems url: " + str(batchItem.urlObj.url))
    # ...
    self.logger.debug(">>> putUrlsMultiItems params: " + varDump(params))
    # ...
    result = self.wrapper.urlNew(params)
    self.logger.debug(">>> putUrlsMultiItems result: " + str(result))
    self.logger.debug(">>> bool(result): " + str(bool(result)))
    def printContentStatus(rawContentData, rawContentName):
      if rawContentData is not None:
        self.logger.debug("Some %s content size %s on disk", rawContentName, str(len(rawContentData)))
      else:
        self.logger.debug("NO %s content on disk", rawContentName)

    # ...
    dynamicContent = None
    headerContent = None
    requestsContent = None
    # ...
    urlContentObj = dc_event.URLContentRequest(siteId, url, \
                                               dc_event.URLContentRequest.CONTENT_TYPE_RAW_LAST + \
                                               dc_event.URLContentRequest.CONTENT_TYPE_RAW + \
                                               dc_event.URLContentRequest.CONTENT_TYPE_DYNAMIC + \
                                               dc_event.URLContentRequest.CONTENT_TYPE_HEADERS + \
                                               dc_event.URLContentRequest.CONTENT_TYPE_REQUESTS)
    # ...
    rawContentData = self.wrapper.urlContent([urlContentObj])
    # ...
    if rawContentData is not None and len(rawContentData) > 0:
      if rawContentData[0].headers is not None and len(rawContentData[0].headers) > 0 and \
         rawContentData[0].headers[0] is not None:
        headerContent = rawContentData[0].headers[0].buffer
      if rawContentData[0].requests is not None and len(rawContentData[0].requests) > 0 and \
         rawContentData[0].requests[0] is not None:
        requestsContent = rawContentData[0].requests[0].buffer
      if rawContentData[0].rawContents is not None:
        if len(rawContentData[0].rawContents) > 0 and rawContentData[0].rawContents[0] is not None:
          rawContent = rawContentData[0].rawContents[0].buffer
        # ...
        if len(rawContentData[0].rawContents) > 1 and rawContentData[0].rawContents[1] is not None:
          dynamicContent = rawContentData[0].rawContents[1].buffer
    # ...
    printContentStatus(rawContent, 'raw')
    printContentStatus(dynamicContent, 'dynamic')
    printContentStatus(headerContent, 'header')
    printContentStatus(requestsContent, 'requests')
    # ...
    return rawContent, dynamicContent, headerContent, requestsContent
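    # Illustration only, not part of the original module: a typical call site for
    # getRawContent() as reconstructed above. The siteId and url values are hypothetical.
    #   rawContent, dynamicContent, headerContent, requestsContent = \
    #       self.getRawContent("1234", "http://example.com/page.html")
    #   contentBuffer = rawContent if rawContent is not None else dynamicContent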
    if rawContentData is not None:
      rawContent = rawContentData
      # ...
      for batchItem in batchItems:
        # ...
        putDict["id"] = batchItem.urlObj.urlMd5
        putDict["data"] = rawContent
        putDict["cDate"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        urlPutList.append(dc_event.URLPut(batchItem.siteId,
                                          batchItem.urlObj.urlMd5,
                                          # ... (remaining constructor arguments elided in the listing)
      # ...
      self.wrapper.putURLContent(urlPutList)
    self.logger.debug(">>> putRawContentsMultiItems enter...")
    # ...
    rawContent, dynamicContent, headerContent, requestsContent = self.getRawContent(siteId, url)
    # ...
    self.putRawContentOfType(batchItems, dynamicContent, dc_event.Content.CONTENT_DYNAMIC_CONTENT)
    self.putRawContentOfType(batchItems, headerContent, dc_event.Content.CONTENT_HEADERS_CONTENT)
    self.putRawContentOfType(batchItems, requestsContent, dc_event.Content.CONTENT_REQUESTS_CONTENT)

    # ...
    self.logger.debug("Signal %s - timer trapped!", str(signum))
def loadSite(self, batchItem)
def generateDomainUrl(url)
def mergeChains(self, chainElem, batchItem, batchItemDict, delimiter=' ')
def extendTemplateFromSource(self, batchItemDict)
def convertTemplateFormat(self, batchItem, batchItemDict)
def reduceResponse(self, processingTamplatesDict, templateSelectType, batchItemDict)
def processBatchItemChainSelectStep(self, batchItem, batchItemDict, chainDict)
def processBatchItemTemplateSelectStep(self, batchItem, batchItemDict)
def extendProcessorProperties(self, batchItemDict, siteProperties)
def createUniqueMultiItemsUrl(self, url, counter)
def updateURL(self, batchItem, errorMask=None)
def resolveProcessorNameByContentType(self, urlContentType, batchItemDict)
def processBatchItems(self, inputItems)
def processTask(self, batchItem, batchItemDict, withoutProcess=False)
def processBatchItemTemplateFillStep(self, batchItem, batchItemDict)
def mapResponseProcessedContent(self, template, processedContent, removeTrailingComma, entry, processorProperties)
def removeTemplateElementsByCondition(self, template, batchItemDict)
def mapResponse(self, template, crawlingTime, scraperResponse, processorProperties)
def getProcessedContent(self, template, scraperResponse, errorMask)
def process(self, scraperInputObject, batchItem, batchItemDict)
def filtersApply(self, localValue, wrapper, siteId, fields=None, opCode=Filters.OC_RE, stage=Filters.STAGE_ALL, defaultRet=False)
def convertRawContentCharset(self, batchItemDict)
def updateProcessedURL(self, batchItem, batchItemDict)
def isDisabledSite(self, site)
def extendBatchItemsWithChain(self, batchItems)
def parseTemplate(self, batchItem, batchItemDict)
def getPropValueFromSiteProperties(self, batchItemDict, propName)
def stickHashedContents(self, listHashedTags, scraperResponse)
def processBatchItemURLContentStep(self, batchItem, batchItemDict)
def loadSiteProperties(self, site, url, batchItem, batchItemDict)
def putRawContentsMultiItems(self, siteId, url, batchItems)
def getRawContent(self, siteId, url)
def isOverlimitMaxResources(self, site, url)
def mapResponseAdditionSubstitutes(self, buf, errorMask)
def signalHandlerTimer(self, signum, frame)
def updateURLCharset(self, batchItem, charset)
def putUrlsMultiItems(self, batchItems)
def processBatchItemScrapyStep(self, batchItem)
def readScraperOutputData(self, batchItem, scraperOutputData, siteObj)
def processContentHash(self, batchItemDict)
def templateMetricsCalculate(self, template, scraperResponse)
def isAllowedSiteLimits(self, siteObj, accumulatedBatchItems)
def addAdditionalValue(self, buf, name, value)
def readSiteFromDB(self, batchItem)
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
def putRawContentOfType(self, batchItems, rawContentData, contentRequestType)
def getRawContentFromFS(self, batchItem, batchItemDict)
def resortProcessedContentsByMetrics(self, batchItemDict, sortedMetric)
def setDefaultInternalForChainContents(self, chainDict)
def loadURL(self, batchItem)
def getProcessorCmd(self, processorName)
def putContent(self, batchItem, processedContent, batchItemDict)
def loadLogConfigFile(self)
def readFilters(self, site)