1528 def processBatchItemScrapyStep(self, batchItem):
# Runs the pre-processing checks for one batch item and, when they pass, calls
# self.processTask() for it. Results are accumulated in the `ret` dict (keys seen
# here: errorMask, processedTime, site, url, charset, processorName, batchItem,
# rawContent, contentURLMd5); the exception handlers at the end fold failures
# into ret["errorMask"].
# NOTE(review): this listing carries embedded line numbers with gaps (e.g. 1530,
# 1533, 1543, 1568, 1590, 1657). The statements that create `ret` and `fields`,
# open the `try` blocks, and the `else:` / early-exit lines are not visible here,
# so branch nesting described in the comments below is hedged - confirm against
# the original file.
1529 self.logger.debug(
">>> 1 step")
# Start from the error mask already recorded on the URL object and stamp the time.
1531 ret[
"errorMask"] = batchItem.urlObj.errorMask
1532 ret[
"processedTime"] = time.time()
# Record the item's site id in self.batchSites and load the site object.
1534 self.batchSites.update([batchItem.urlObj.siteId])
1535 ret[
"site"] = self.loadSite(batchItem)
# When the batch's dbMode lacks the DB_MODE_W bit, switch off wrapper.affect_db.
1539 if self.input_batch.dbMode & dc_event.Batch.DB_MODE_W == 0:
1540 self.wrapper.affect_db =
False 1541 if batchItem.urlObj
is not None:
# Use the URL object carried by the batch item when it has one ...
1542 ret[
"url"] = batchItem.urlObj
# ... otherwise load it with self.loadURL() (presumably the else branch; the
# "else:" line and the nesting of the test above are not visible here).
1544 ret[
"url"] = self.loadURL(batchItem)
# Copy the URL's charset into ret["charset"].
1547 self.logger.debug(
"Save charset '" + str(ret[
"url"].charset) +
"' to dict")
1548 ret[
"charset"] = ret[
"url"].charset
# A site reported as disabled aborts the item with ProcessorException.
1551 if self.isDisabledSite(ret[
"site"]):
1552 raise ProcessorException(
"Site state is not active! Actual site state is: %s. " % ret[
"site"].state)
# Set the SITE_MAX_RESOURCES_NUMBER error bit when isOverlimitMaxResources() is true.
1555 if self.isOverlimitMaxResources(ret[
"site"], ret[
"url"]):
1556 ret[
"errorMask"] |= APP_CONSTS.ERROR_MASK_SITE_MAX_RESOURCES_NUMBER
# URL already processed, no error bits on the URL or in ret, and tagsCount > 0:
# decide whether the item has to be reprocessed.
1559 if ret[
"url"].processed != 0
and ret[
"url"].errorMask == ERROR_MASK_NO_ERRORS
and \
1560 ret[
"url"].tagsCount > 0
and ret[
"errorMask"] == ERROR_MASK_NO_ERRORS:
1561 self.logger.debug(
"Real time crawling. Check reprocessing.")
1562 self.logger.debug(
"Batch item properties: %s", json.dumps(batchItem.properties))
# PROCESSOR_NAME == "NONE" is treated as crawl-only (per the log message):
# ret["batchItem"] is cleared.
1564 if (
"PROCESSOR_NAME" in batchItem.properties)
and \
1565 (batchItem.properties[
"PROCESSOR_NAME"] ==
"NONE"):
1566 self.logger.debug(
"RealTime Crawling: Only crawling mode. Exit.")
1567 ret[
"batchItem"] =
None 1570 self.logger.debug(
"batchItem.properties: %s", json.dumps(batchItem.properties))
# Take the processor name from the item properties; either key spelling is accepted,
# with "PROCESSOR_NAME" checked first.
1572 if "PROCESSOR_NAME" in batchItem.properties:
1573 ret[
"processorName"] = batchItem.properties[
"PROCESSOR_NAME"]
1574 elif "processorName" in batchItem.properties:
1575 ret[
"processorName"] = batchItem.properties[
"processorName"]
# REPROCESS_KEY equal to REPROCESS_VALUE_NO: ret["batchItem"] is cleared and nothing
# is reprocessed; per the second log message, any other value or a missing key sends
# the item on to reprocessing.
1577 if (CONSTS.REPROCESS_KEY
in batchItem.properties)
and \
1578 (batchItem.properties[CONSTS.REPROCESS_KEY] == CONSTS.REPROCESS_VALUE_NO):
1579 self.logger.debug(
"RealTime Crawling: Cashed resource. Resource crawled and errorMask is empty." +
1580 "Don't need to reprocess.")
1581 ret[
"batchItem"] =
None 1584 self.logger.debug(
"RealTime Crawling: Cashed resource. Resource crawled and errorMask is emppty but " +
1585 "properties reprocess is Yes or empty. Send to reprocessing.")
# Site id '0': evaluate the SQL-expression filter (Filters.OC_SQLE) for this item.
1588 if batchItem.siteId ==
'0':
1589 self.logger.debug(
"Check SQLExpression filter for zero site ...")
# Filter input: the URL object's attributes, keyed by upper-cased attribute name.
1591 for key, value
in batchItem.urlObj.__dict__.items():
1592 fields[key.upper()] = value
1594 if self.filtersApply(
'', self.wrapper, batchItem.siteId,
1595 fields, Filters.OC_SQLE, Filters.STAGE_BEFORE_PROCESSOR,
True):
1596 self.logger.debug(
"SQLExpression filter for zero site checked - SUCCESS")
# Filter failure (presumably the else branch): set the FILTERS_BREAK error bit and
# clear ret["batchItem"].
1598 self.logger.debug(
"SQLExpression filter for zero site checked - Fail")
1599 ret[
"errorMask"] |= APP_CONSTS.ERROR_PROCESSOR_FILTERS_BREAK
1600 ret[
"batchItem"] =
None 1604 if len(batchItem.properties.keys()) == 0:
1605 self.logger.debug(
'>>> property len(batchItem.properties.keys()) == 0')
# Copy each site property into batchItem.properties by name, deep-copying the value.
# NOTE(review): presumably only when the item has no properties of its own - the
# nesting of this loop is not visible here.
1608 for localProperty
in ret[
"site"].properties:
1609 batchItem.properties[localProperty[
"name"]] = copy.deepcopy(localProperty[
"value"])
# loadSiteProperties() receives ret; presumably it fills in the keys tested below
# ("urlNormalizeMaskProcessor", "processCTypes", "contentTypeMap") - helper not visible.
1613 self.loadSiteProperties(ret[
"site"], ret[
"url"], batchItem, ret)
# Optional normalization mask: stored on self as an int when present in ret.
1615 if "urlNormalizeMaskProcessor" in ret:
1616 self.normMask = int(ret[
"urlNormalizeMaskProcessor"])
# The URL's content type is not in processCTypes: try the contentTypeMap fallback
# before flagging the item as unsupported.
1618 if "processCTypes" in ret
and ret[
"url"].contentType
not in ret[
"processCTypes"]:
1619 self.logger.debug(
'>>>> ret["url"].contentType = ' + str(ret[
"url"].contentType))
1620 self.logger.debug(
'>>>> ret["processCTypes"] = ' + str(ret[
"processCTypes"]))
1622 isOkContentType =
False 1624 if "contentTypeMap" in ret:
# contentTypeMap is parsed from a JSON string and looked up with processCTypes as key.
1625 contentTypeMap = json.loads(ret[
"contentTypeMap"])
1626 if ret[
"processCTypes"]
in contentTypeMap:
1627 self.logger.debug(
'>>>> Found in ret["contentTypeMap"] = ' + str(contentTypeMap))
# NOTE(review): same membership test as just above. The content type is accepted when
# the mapped value equals the URL's content type.
1629 if ret[
"processCTypes"]
in contentTypeMap:
1630 if ret[
"url"].contentType == contentTypeMap[ret[
"processCTypes"]]:
1631 self.logger.debug(
'>>>> Good!!!')
1632 isOkContentType =
True 1634 except Exception, err:
# Any failure while reading the map is only logged at debug level.
1635 self.logger.debug(
"Fail loads of 'CONTENT_TYPE_MAP': " + str(err))
# Still not accepted: set the UNSUPPORTED_CONTENT_TYPE bit, keep the item in
# ret["batchItem"] and log the mismatch as an error.
1637 if not isOkContentType:
1638 ret[
"errorMask"] |= APP_CONSTS.ERROR_MASK_SITE_UNSUPPORTED_CONTENT_TYPE
1639 ret[
"batchItem"] = batchItem
1640 self.logger.
error(
"url ContentType not matched! url.contentType: '%s', site_properties.PROCESS_CTYPES: '%s'",
1641 str(ret[
"url"].contentType), ret[
"processCTypes"])
# updateURL() is called here only for a real-time crawler batch with the DB_MODE_W bit set.
1642 if self.input_batch.crawlerType == dc_event.Batch.TYPE_REAL_TIME_CRAWLER
and \
1643 (self.input_batch.dbMode & dc_event.Batch.DB_MODE_W > 0):
1644 self.updateURL(batchItem)
# NOTE(review): line 1645 is not visible, so it is unclear whether the branch above
# exits before this call.
1646 self.resolveProcessorNameByContentType(ret[
"url"].contentType, ret)
# The item is rejected unless its content is stored on disk and the HTTP code is 200;
# the first two branches only log which condition failed.
1648 if batchItem.urlObj.contentMask != dc_event.URL.CONTENT_STORED_ON_DISK:
1649 self.logger.debug(
">>> Content not found on disk. Exit.")
1650 elif batchItem.urlObj.httpCode != 200:
1651 self.logger.debug(
">>> HTTP Code != 200. Code == " + str(batchItem.urlObj.httpCode) +
". Exit")
# Either condition failing: call updateURL(), clear ret["batchItem"] and log both values.
1652 if batchItem.urlObj.contentMask != dc_event.URL.CONTENT_STORED_ON_DISK
or batchItem.urlObj.httpCode != 200:
1653 self.updateURL(batchItem)
1654 ret[
"batchItem"] =
None 1655 self.logger.debug(
"Exit. batchItem.urlObj.contentMask = " + str(batchItem.urlObj.contentMask) + \
1656 " batchItem.urlObj.httpCode = " + str(batchItem.urlObj.httpCode))
# NOTE(review): line 1657 is not visible, so it is unclear whether this updateURL()
# call is an else branch or follows an early exit.
1658 self.updateURL(batchItem)
# Regular-expression filter (Filters.OC_RE) on the URL string, stage STAGE_BEFORE_PROCESSOR.
1662 self.logger.debug(
"Check filter to 'url' use regular expression ...")
1663 if self.filtersApply(batchItem.urlObj.url, self.wrapper, batchItem.siteId,
1664 None, Filters.OC_RE, Filters.STAGE_BEFORE_PROCESSOR,
True):
1665 self.logger.debug(
"Filter to 'url' use regular expression checked - SUCCESS")
# Filter failure (presumably the else branch): set FILTERS_BREAK and clear ret["batchItem"].
1667 self.logger.debug(
"Filter to 'url' use regular expression checked - Fail")
1668 ret[
"errorMask"] |= APP_CONSTS.ERROR_PROCESSOR_FILTERS_BREAK
1669 ret[
"batchItem"] =
None 1674 ret[
"rawContent"] = self.getRawContentFromFS(batchItem, ret)
# ret["rawContent"] comes from getRawContentFromFS(); the checks below apply when it is not None.
1676 if ret[
"rawContent"]
is not None:
# Regular-expression filter on the raw content, stage STAGE_BEFORE_PROCESSOR.
1678 self.logger.debug(
"Check filter to 'raw content' use regular expression (STAGE_BEFORE_PROCESSOR)...")
1679 if self.filtersApply(ret[
"rawContent"], self.wrapper, batchItem.siteId,
1680 None, Filters.OC_RE, Filters.STAGE_BEFORE_PROCESSOR,
True):
1681 self.logger.debug(
"Filter to 'raw content' use regular expression checked - SUCCESS")
# Filter failure (presumably the else branch): set FILTERS_BREAK and clear ret["batchItem"].
1683 self.logger.debug(
"Filter to 'raw content' use regular expression checked - Fail")
1684 ret[
"errorMask"] |= APP_CONSTS.ERROR_PROCESSOR_FILTERS_BREAK
1685 ret[
"batchItem"] =
None 1690 self.parseTemplate(batchItem, ret)
# Same raw-content filter again after parseTemplate(), now for stage STAGE_AFTER_DOM_PRE.
1705 self.logger.debug(
"Check filter to 'raw content' use regular expression ('STAGE_AFTER_DOM_PRE')...")
1706 if self.filtersApply(ret[
"rawContent"], self.wrapper, batchItem.siteId, \
1707 None, Filters.OC_RE, Filters.STAGE_AFTER_DOM_PRE,
True):
1708 self.logger.debug(
"Filter to 'raw content' use regular expression checked - SUCCESS")
# Filter failure (presumably the else branch): set FILTERS_BREAK and clear ret["batchItem"].
1710 self.logger.debug(
"Filter to 'raw content' use regular expression checked - Fail")
1711 ret[
"errorMask"] |= APP_CONSTS.ERROR_PROCESSOR_FILTERS_BREAK
1712 ret[
"batchItem"] =
None 1716 self.convertRawContentCharset(ret)
# Run the processing task, then store the content hash in ret["contentURLMd5"] when
# processContentHash() returns one.
1717 self.processTask(batchItem, ret)
1718 localContentHash = self.processContentHash(ret)
1719 if localContentHash
is not None:
1720 ret[
"contentURLMd5"] = localContentHash
# Optional site update, done only when the SQL_EXPRESSION_FIELDS_UPDATE_PROCESSOR
# property is present on the item.
1723 if APP_CONSTS.SQL_EXPRESSION_FIELDS_UPDATE_PROCESSOR
in batchItem.properties:
1724 self.logger.debug(
"!!! Found '" + str(APP_CONSTS.SQL_EXPRESSION_FIELDS_UPDATE_PROCESSOR) + \
1725 "' in batchItem.properties")
# Build a SiteUpdate and reset every instance attribute to None before setting id and
# updateType (presumably so only explicitly set fields are written - confirm against
# SiteUpdate).
1727 localSiteUpdate = dc_event.SiteUpdate(batchItem.siteId)
1728 for attr
in localSiteUpdate.__dict__:
1729 if hasattr(localSiteUpdate, attr):
1730 setattr(localSiteUpdate, attr,
None)
1732 localSiteUpdate.id = batchItem.siteId
1733 localSiteUpdate.updateType = dc_event.SiteUpdate.UPDATE_TYPE_UPDATE
1736 changedFieldsDict = FieldsSQLExpressionEvaluator.execute(batchItem.properties, self.wrapper, ret[
"site"],
1738 APP_CONSTS.SQL_EXPRESSION_FIELDS_UPDATE_PROCESSOR)
# Apply the evaluated fields, skipping None values, names SiteUpdate does not have,
# and the CDate / UDate / tcDate fields.
1740 for name, value
in changedFieldsDict.items():
1741 if hasattr(localSiteUpdate, name)
and value
is not None and name
not in [
'CDate',
'UDate',
'tcDate']:
1742 setattr(localSiteUpdate, name, value)
# errorMask is passed as an SQLExpression that ORs the `ErrorMask` column with the
# loaded site's errorMask value.
1744 localSiteUpdate.errorMask = SQLExpression((
"`ErrorMask` | %s" % ret[
"site"].errorMask))
1746 updatedCount = self.wrapper.siteNewOrUpdate(siteObject=localSiteUpdate, stype=dc_event.SiteUpdate)
1747 self.logger.debug(
"!!! Use property '" + str(APP_CONSTS.SQL_EXPRESSION_FIELDS_UPDATE_PROCESSOR) + \
1748 "' updated " + str(updatedCount) +
" rows.")
# NOTE(review): "except X, err" is Python 2-only syntax; the last handler below
# already uses the "as" form, so the three handlers are inconsistent.
# Database failures are logged and set the ERROR_DATABASE_ERROR bit.
1750 except DatabaseException, err:
1751 ExceptionLog.handler(self.logger, err, MSG_INFO_PROCESS_BATCH_ITEM, (ret))
1752 ret[
"errorMask"] = ret[
"errorMask"] | APP_CONSTS.ERROR_DATABASE_ERROR
# ProcessorException and any other exception are logged the same way and both set
# the ERROR_PROCESSOR_BATCH_ITEM_PROCESS bit.
1753 except ProcessorException, err:
1754 ExceptionLog.handler(self.logger, err, MSG_INFO_PROCESS_BATCH_ITEM, (ret))
1755 ret[
"errorMask"] = ret[
"errorMask"] | APP_CONSTS.ERROR_PROCESSOR_BATCH_ITEM_PROCESS
1756 except Exception
as err:
1757 ExceptionLog.handler(self.logger, err, MSG_INFO_PROCESS_BATCH_ITEM, (ret))
1758 ret[
"errorMask"] = ret[
"errorMask"] | APP_CONSTS.ERROR_PROCESSOR_BATCH_ITEM_PROCESS