HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
dc_processor.ProcessorTask.ProcessorTask Class Reference
Inheritance diagram for dc_processor.ProcessorTask.ProcessorTask:
Collaboration diagram for dc_processor.ProcessorTask.ProcessorTask:

Classes

class  Meta
 

Public Member Functions

def __init__ (self)
 
def setup (self)
 
def run (self)
 
def createUniqueMultiItemsUrl (self, url, counter)
 
def getPropValueFromSiteProperties (self, batchItemDict, propName)
 
def loadSiteProperties (self, site, url, batchItem, batchItemDict)
 
def getProcessorCmd (self, processorName)
 
def getProcessedContent (self, template, scraperResponse, errorMask)
 
def isAllowedSiteLimits (self, siteObj, accumulatedBatchItems)
 
def readScraperOutputData (self, batchItem, scraperOutputData, siteObj)
 
def process (self, scraperInputObject, batchItem, batchItemDict)
 
def getRawContentFromFS (self, batchItem, batchItemDict)
 
def convertRawContentCharset (self, batchItemDict)
 
def updateURLCharset (self, batchItem, charset)
 
def readFilters (self, site)
 
def updateProcessedURL (self, batchItem, batchItemDict)
 
def processContentHash (self, batchItemDict)
 
def stickHashedContents (self, listHashedTags, scraperResponse)
 
def extendProcessorProperties (self, batchItemDict, siteProperties)
 
def extendTemplateFromSource (self, batchItemDict)
 
def parseTemplate (self, batchItem, batchItemDict)
 
def removeTemplateElementsByCondition (self, template, batchItemDict)
 
def convertTemplateFormat (self, batchItem, batchItemDict)
 
def mapResponseProcessedContent (self, template, processedContent, removeTrailingComma, entry, processorProperties)
 
def mapResponse (self, template, crawlingTime, scraperResponse, processorProperties)
 
def mapResponseAdditionSubstitutes (self, buf, errorMask)
 
def addAdditionalValue (self, buf, name, value)
 
def reduceResponse (self, processingTamplatesDict, templateSelectType, batchItemDict)
 
def processTask (self, batchItem, batchItemDict, withoutProcess=False)
 
def putContent (self, batchItem, processedContent, batchItemDict)
 
def updateURL (self, batchItem, errorMask=None)
 
def isDisabledSite (self, site)
 
def isOverlimitMaxResources (self, site, url)
 
def readSiteFromDB (self, batchItem)
 
def loadSite (self, batchItem)
 
def loadURL (self, batchItem)
 
def extendBatchItemsWithChain (self, batchItems)
 
def templateMetricsCalculate (self, template, scraperResponse)
 
def processBatchItemScrapyStep (self, batchItem)
 
def resolveProcessorNameByContentType (self, urlContentType, batchItemDict)
 
def resortProcessedContentsByMetrics (self, batchItemDict, sortedMetric)
 
def processBatchItemTemplateSelectStep (self, batchItem, batchItemDict)
 
def mergeChains (self, chainElem, batchItem, batchItemDict, delimiter=' ')
 
def setDefaultInternalForChainContents (self, chainDict)
 
def processBatchItemChainSelectStep (self, batchItem, batchItemDict, chainDict)
 
def processBatchItemTemplateFillStep (self, batchItem, batchItemDict)
 
def processBatchItemURLContentStep (self, batchItem, batchItemDict)
 
def processBatchItems (self, inputItems)
 
def processBatch (self)
 
def loadConfig (self)
 
def loadLogConfigFile (self)
 
def loadOptions (self)
 
def filtersApply (self, localValue, wrapper, siteId, fields=None, opCode=Filters.OC_RE, stage=Filters.STAGE_ALL, defaultRet=False)
 
def getExitCode (self)
 
def putUrlsMultiItems (self, batchItems)
 
def getRawContent (self, siteId, url)
 
def putRawContentOfType (self, batchItems, rawContentData, contentRequestType)
 
def putRawContentsMultiItems (self, siteId, url, batchItems)
 
def signalHandlerTimer (self, signum, frame)
 

Public Attributes

 exit_code
 
 logger
 
 template
 
 raw_content
 
 DBConnector
 
 url
 
 process_time
 
 batchItem
 
 site_table
 
 raw_data_dir
 
 filters
 
 processorName
 
 batchSites
 
 scraper_response
 
 input_batch
 
 db_task_ini
 
 wrapper
 
 htmlRecover
 
 objFilters
 
 hashed_content
 
 algorithmsModel
 
 algorithmsModule
 
 algorithmsClass
 
 accumulatedBatchItems
 
 groupResponses
 
 sourceTemplateExtractor
 
 accumulateProcessing
 
 localTemplate
 
 normMask
 
 maxExecutionTimeReached
 
 maxExecutionTimeValue
 
 removeUnprocessedItems
 
 config
 

Detailed Description

Definition at line 126 of file ProcessorTask.py.

Constructor & Destructor Documentation

◆ __init__()

def dc_processor.ProcessorTask.ProcessorTask.__init__ (   self)

Definition at line 137 of file ProcessorTask.py.

# Constructor: initializes every public attribute to an inert default.
# Real initialization (config, logger, DB wrapper) happens later in the
# load*/setup steps, so attributes start as None / empty containers.
137  def __init__(self):
138  # call base class __init__ method
139  foundation.CementApp.__init__(self)
140  self.exit_code = EXIT_SUCCESS
141  self.logger = None
142  self.template = None
143  self.raw_content = None
144  self.DBConnector = None
145  self.url = None
146  self.process_time = None
147  self.batchItem = None
148  self.site_table = None
149  self.raw_data_dir = None
150  self.filters = None
151  self.processorName = None
152  self.batchSites = set()
153  self.scraper_response = None
154  self.input_batch = None
155  self.db_task_ini = None
156  self.wrapper = None
157  self.htmlRecover = False
# per-siteId cache of Filters objects (populated lazily in filtersApply)
158  self.objFilters = {}
159  self.hashed_content = None
160  self.algorithmsModel = APP_CONSTS.APP_USAGE_MODEL_PROCESS
161  self.algorithmsModule = None
162  self.algorithmsClass = None
163  self.accumulatedBatchItems = []
164  self.groupResponses = {}
165  self.sourceTemplateExtractor = None
166  self.accumulateProcessing = False
167  self.localTemplate = None
168  self.normMask = UrlNormalizator.NORM_NONE
169  # for support max execution time
170  self.maxExecutionTimeReached = False
171  self.maxExecutionTimeValue = 0
172  self.removeUnprocessedItems = False
173 
174 
def __init__(self)
constructor
Definition: UIDGenerator.py:19

Member Function Documentation

◆ addAdditionalValue()

def dc_processor.ProcessorTask.ProcessorTask.addAdditionalValue (   self,
  buf,
  name,
  value 
)

Definition at line 1166 of file ProcessorTask.py.

def addAdditionalValue(self, buf, name, value):
  """Insert an extra name/value pair into a JSON document held in *buf*.

  When *buf* parses as a JSON object the pair is added (overwriting an
  existing key of the same name) and the re-serialized string is returned.
  On any parse/serialize failure the error is logged and *buf* is returned
  unchanged.
  """
  result = buf
  try:
    parsed = json.loads(buf)
    if isinstance(parsed, dict):
      parsed[name] = value
    result = json.dumps(parsed)
  except Exception as err:
    self.logger.error(str(err))
    self.logger.info(Utils.getTracebackInfo())
  return result
1182 
1183 
-mask-info
Here is the caller graph for this function:

◆ convertRawContentCharset()

def dc_processor.ProcessorTask.ProcessorTask.convertRawContentCharset (   self,
  batchItemDict 
)

Definition at line 701 of file ProcessorTask.py.

# Ensures batchItemDict carries a 'charset' (detecting one with ICU when it
# is absent) and decodes batchItemDict["rawContent"] into unicode text.
# On any failure the item is flagged with ERROR_BAD_ENCODING and charset is
# forced to 'utf-8'.
701  def convertRawContentCharset(self, batchItemDict):
702  try:
703  # self.logger.info("self.raw_content: %s" % str(self.raw_content))
704  if 'charset' in batchItemDict:
705  self.logger.debug("Charset incoming is: %s", batchItemDict["charset"])
706  else:
707  self.logger.debug("Charset not defined in batchItemDict!")
708  batchItemDict["charset"] = icu.CharsetDetector(batchItemDict["rawContent"]).detect().getName() # pylint: disable=E1101
709  self.logger.debug("Charset detected with icu is: %s", batchItemDict["charset"])
710  if batchItemDict["charset"] != 'utf-8':
711  self.logger.debug("Content charset decode ignore errors, incoming len: %s",
712  str(len(batchItemDict["rawContent"])))
# NOTE(review): the content is decoded as 'utf-8' (errors ignored) even when
# the incoming/detected charset is something else - confirm this is intended
# (the crawler is expected to have re-encoded content to UTF-8, per comment below).
713  batchItemDict["rawContent"] = batchItemDict["rawContent"].decode('utf-8', 'ignore')
714  self.logger.debug("Content after decoding len: %s", str(len(batchItemDict["rawContent"])))
715  except Exception, err:
716  ExceptionLog.handler(self.logger, err, MSG_ERROR_CONVERT_RAW_CONTENT_CHARSET, (), \
717  {ExceptionLog.LEVEL_NAME_ERROR:ExceptionLog.LEVEL_VALUE_DEBUG})
718  batchItemDict["errorMask"] |= APP_CONSTS. ERROR_BAD_ENCODING
719  # Crawler must set content encoding as UTF-8 or it is not changed if natural is
720  batchItemDict["charset"] = 'utf-8'
721 
722 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ convertTemplateFormat()

def dc_processor.ProcessorTask.ProcessorTask.convertTemplateFormat (   self,
  batchItem,
  batchItemDict 
)

Definition at line 993 of file ProcessorTask.py.

# Wraps a legacy flat template into the newer multi-template structure
# ({"templates": [{priority, mandatory, is_filled, tags}]}) and mirrors the
# converted value into batchItem.properties["template"].
993  def convertTemplateFormat(self, batchItem, batchItemDict):
994  self.logger.debug("Template before convertion: %s", varDump(batchItemDict["template"]))
# only a shallow copy is taken: the old template object itself becomes the
# "tags" payload of the single generated template entry
995  template = copy.copy(batchItemDict["template"])
996  batchItemDict["template"] = {"templates":[{"priority":100, "mandatory":1, "is_filled":0, "tags":template}]}
997  batchItem.properties["template"] = batchItemDict["template"]
998  self.logger.debug("Template after convertion: %s", varDump(batchItemDict["template"]))
999 
1000 
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
Here is the call graph for this function:
Here is the caller graph for this function:

◆ createUniqueMultiItemsUrl()

def dc_processor.ProcessorTask.ProcessorTask.createUniqueMultiItemsUrl (   self,
  url,
  counter 
)

Definition at line 212 of file ProcessorTask.py.

def createUniqueMultiItemsUrl(self, url, counter):
  """Build a unique per-item URL by appending '#<counter>' as a fragment."""
  return "%s#%s" % (url, counter)
214 
215 
Here is the caller graph for this function:

◆ extendBatchItemsWithChain()

def dc_processor.ProcessorTask.ProcessorTask.extendBatchItemsWithChain (   self,
  batchItems 
)

Definition at line 1454 of file ProcessorTask.py.

# Expands the incoming batch item list with extra items for URL chains.
# For every item that has no chainId yet, the stored chain content (a
# base64-encoded, newline-separated list of urlMd5 values) is fetched via
# the DBTasks wrapper; each referenced URL whose status can be resolved is
# appended as a deep-copied batch item sharing a freshly allocated chainId.
# Returns the extended list (original items preserved, in order).
1454  def extendBatchItemsWithChain(self, batchItems):
1455  ret = []
1456  chainIndex = 0
1457  chainIncrement = 0
# first pass: start numbering new chains above the highest chainId present
1458  for batchItem in batchItems:
1459  if batchItem.urlObj.chainId is not None and batchItem.urlObj.chainId > chainIndex:
1460  chainIndex = batchItem.urlObj.chainId + 1
1461  self.logger.debug(">>> Started chainId is = " + str(chainIndex))
1462  for batchItem in batchItems:
1463  chainIncrement = 0
1464  ret.append(batchItem)
1465  if batchItem.urlObj.chainId is None:
# fetch the chain blob stored for this URL
1466  urlContent = dc_event.URLContentRequest(batchItem.siteId, batchItem.urlObj.url)
1467  urlContent.contentTypeMask = dc_event.URLContentRequest.CONTENT_TYPE_CHAIN
1468  urlContent.contentTypeMask |= dc_event.URLContentRequest.CONTENT_TYPE_RAW_FIRST
1469  urlContent.urlMd5 = batchItem.urlObj.urlMd5
1470  drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_CONTENT, [urlContent])
1471  responseDRCESyncTasksCover = self.wrapper.process(drceSyncTasksCoverObj)
# NOTE: 'row' is re-used below for the URL_STATUS response as well
1472  row = responseDRCESyncTasksCover.eventObject
1473  if row is not None and len(row) > 0 and len(row[0].rawContents) > 0:
1474  self.logger.debug(">>> Started chainId , yes chain list")
1475  chainBuf = row[0].rawContents[0].buffer
1476  self.logger.debug(">>> buff = " + str(chainBuf))
1477  try:
1478  chainBuf = base64.b64decode(chainBuf)
1479  except Exception:
1480  self.logger.debug(">>> chain buf exception")
1481  chainBuf = None
1482  if chainBuf is not None:
1483  splitterList = chainBuf.split("\n")
1484  self.logger.debug(">>> Started chainId , yes chain list len is = " + str(len(splitterList)))
1485  for i in xrange(0, len(splitterList)):
1486  urlMd5 = splitterList[i].strip()
1487  self.logger.debug(">>> extract chain MD5 = " + urlMd5)
# resolve the chained URL's status object to clone a batch item from it
1488  urlObjEvent = dc_event.URLStatus(batchItem.siteId, urlMd5)
1489  urlObjEvent.urlType = dc_event.URLStatus.URL_TYPE_MD5
1490  drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_STATUS, [urlObjEvent])
1491  responseDRCESyncTasksCover = self.wrapper.process(drceSyncTasksCoverObj)
1492  row = responseDRCESyncTasksCover.eventObject
1493  if row is not None and len(row) > 0:
# chainIncrement flips to 1 once any chain item was added, so chainIndex
# advances by at most one per source item (see increment below)
1494  chainIncrement = 1
1495  batchItem.urlObj.chainId = chainIndex
1496  newBatchitem = copy.deepcopy(batchItem)
1497  newBatchitem.urlId = urlMd5
1498  newBatchitem.urlObj = row[0]
1499  newBatchitem.urlObj.chainId = chainIndex
1500  newBatchitem.urlObj.contentMask = dc_event.URL.CONTENT_STORED_ON_DISK
1501  ret.append(newBatchitem)
1502  self.logger.debug(">>> added new chain item, urlMd5 = " + str(urlMd5) + " ChainId = " + \
1503  str(newBatchitem.urlObj.chainId))
1504  chainIndex += chainIncrement
1505  return ret
1506 
1507 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ extendProcessorProperties()

def dc_processor.ProcessorTask.ProcessorTask.extendProcessorProperties (   self,
  batchItemDict,
  siteProperties 
)

Definition at line 885 of file ProcessorTask.py.

# Enriches the JSON-encoded batchItemDict["processorProperties"] in place
# with values derived from site properties: a user agent taken from the
# HTTP_HEADERS property, SCRAPER_METRICS, SCRAPER_SCRAPY_PRECONFIGURED and
# the parent URL md5. Keys already present in processorProperties win; any
# failure is logged at debug level and leaves the value unchanged.
885  def extendProcessorProperties(self, batchItemDict, siteProperties):
886  if "processorProperties" in batchItemDict:
887  try:
888  localJson = json.loads(batchItemDict["processorProperties"])
889  for elem in siteProperties:
890  if elem["name"] == "HTTP_HEADERS" and "EXTRACTOR_USER_AGENT" not in localJson:
891  try:
892  localHeaders = json.loads(elem["value"])
# case-insensitive search for a "useragent" header key
893  for key in localHeaders:
894  if key.lower() == "useragent":
895  localJson["EXTRACTOR_USER_AGENT"] = localHeaders[key]
896  break
897  except Exception:
898  self.logger.debug(">>> Bad json value in the siteProperties[\"HTTP_HEADERS\"]")
899  elif elem["name"] == "SCRAPER_METRICS" and "metrics" not in localJson:
900  localJson["metrics"] = elem["value"]
901  elif elem["name"] == "SCRAPER_SCRAPY_PRECONFIGURED" and "SCRAPER_SCRAPY_PRECONFIGURED" not in localJson:
902  localJson["SCRAPER_SCRAPY_PRECONFIGURED"] = elem["value"]
903  if "url" in batchItemDict and batchItemDict["url"] is not None and "parentMd5" not in localJson:
904  localJson["parentMd5"] = batchItemDict["url"].parentMd5
905  batchItemDict["processorProperties"] = json.dumps(localJson)
906  except Exception as err:
907  self.logger.debug(">>> Something wrong with processorProperties = " + str(err))
908 
909 
Here is the caller graph for this function:

◆ extendTemplateFromSource()

def dc_processor.ProcessorTask.ProcessorTask.extendTemplateFromSource (   self,
  batchItemDict 
)

Definition at line 914 of file ProcessorTask.py.

# Merges a template extracted from the page source (TEMPLATE_SOURCE) into the
# batch item's current template and returns the merged template dict.
914  def extendTemplateFromSource(self, batchItemDict):
915  ret = batchItemDict["template"]
916  additionData = {}
# NOTE(review): additionData is populated here but never passed to
# loadTemplateFromSource below - looks unused; confirm intent.
917  additionData["parentMD5"] = batchItemDict["url"].parentMd5
918  self.sourceTemplateExtractor = SourceTemplateExtractor()
919  additionTemplate = self.sourceTemplateExtractor.loadTemplateFromSource(batchItemDict["TEMPLATE_SOURCE"],
920  None, batchItemDict["rawContent"],
921  batchItemDict["url"].url)
922  if "templates" in ret:
923  for additionTemplateElem in additionTemplate:
924  for templateElem in ret["templates"]:
925  if hasattr(additionTemplateElem, "name") and hasattr(templateElem, "name") and \
926  additionTemplateElem.name == templateElem.name:
# NOTE(review): rebinding the loop variable 'templateElem' does NOT replace
# the element inside ret["templates"]; if in-list replacement was intended
# for name-matched templates this is a bug - confirm before relying on it.
# (Matched addition elements are still dropped via additionTemplateElem = None.)
927  templateElem = copy.deepcopy(additionTemplateElem)
928  additionTemplateElem = None
929  break
930  if additionTemplateElem is not None:
931  ret["templates"].append(additionTemplateElem)
932  return ret
933 
934 
Here is the caller graph for this function:

◆ filtersApply()

def dc_processor.ProcessorTask.ProcessorTask.filtersApply (   self,
  localValue,
  wrapper,
  siteId,
  fields = None,
  opCode = Filters.OC_RE,
  stage = Filters.STAGE_ALL,
  defaultRet = False 
)

Definition at line 2186 of file ProcessorTask.py.

2186  defaultRet=False):
2187  ret = defaultRet
2188  fValue = Utils.generateReplacementDict()
2189  localFilters = None
2190  if siteId in self.objFilters:
2191  localFilters = self.objFilters[siteId]
2192 
2193  if localFilters is None:
2194  localFilters = Filters(self.filters, wrapper, siteId, 0, fields, opCode, stage)
2195  self.objFilters[siteId] = localFilters
2196 
2197  self.logger.debug("Filters with stage (" + str(stage) + ") count = " + \
2198  str(localFilters.searchFiltersWithStage(stage)) + \
2199  '\nfields: ' + str(fields))
2200 
2201  if localFilters.isExistStage(stage):
2202  self.logger.debug(">>> value before filter include = " + localValue[:255] + ' . . . ')
2203  fResult = localFilters.filterAll(stage, fValue, Filters.LOGIC_OR, localValue, 1)
2204  self.logger.debug(">>> filter result - " + str(fResult))
2205 
2206  ret = False
2207  for elem in fResult:
2208  self.logger.debug('elem = ' + str(elem) + ' type: ' + str(type(elem)))
2209  if elem > 0:
2210  ret = True
2211  break
2212 
2213  if ret is True:
2214  self.logger.debug(">>> value before filter exclude = " + localValue[:255] + ' . . . ')
2215  fResult = localFilters.filterAll(stage, fValue, Filters.LOGIC_OR, localValue, -1)
2216  self.logger.debug(">>> filter result - " + str(fResult))
2217  for elem in fResult:
2218  self.logger.debug('elem = ' + str(elem) + ' type: ' + str(type(elem)))
2219  if elem < 0:
2220  ret = False
2221  break
2222 
2223  return ret
2224 
2225 
Here is the caller graph for this function:

◆ getExitCode()

def dc_processor.ProcessorTask.ProcessorTask.getExitCode (   self)

Definition at line 2231 of file ProcessorTask.py.

def getExitCode(self):
  """Return the exit code accumulated during batch processing."""
  return self.exit_code
2233 
2234 
Here is the caller graph for this function:

◆ getProcessedContent()

def dc_processor.ProcessorTask.ProcessorTask.getProcessedContent (   self,
  template,
  scraperResponse,
  errorMask 
)

Definition at line 401 of file ProcessorTask.py.

# Serializes the processed content for output. For real-time crawler batches
# with a pre-built "response" in the template, that response (with error-mask
# substitutes applied) is encoded directly; otherwise a dict with "default",
# "internal" and "custom" sections is assembled from scraperResponse and
# JSON-encoded.
401  def getProcessedContent(self, template, scraperResponse, errorMask):
402  if self.input_batch.crawlerType == dc_event.Batch.TYPE_REAL_TIME_CRAWLER and "response" in template:
403  localResponse = self.mapResponseAdditionSubstitutes(template["response"][0], errorMask)
404  ret = encode(localResponse)
405  else:
406  resultDict = {}
407  if scraperResponse is not None and scraperResponse.processedContent is not None:
# special case: stringify "content_encoded" tag data when the template's
# output format is json
408  if "data" in scraperResponse.processedContent["default"].data and \
409  "tagList" in scraperResponse.processedContent["default"].data["data"] and \
410  len(scraperResponse.processedContent["default"].data["data"]["tagList"]) > 0:
411  tagList = scraperResponse.processedContent["default"].data["data"]["tagList"][0]
412  # self.logger.debug("!!! tagList: %s, type: %s", varDump(tagList), str(type(tagList)))
413 
414  for index, tag in enumerate(tagList):
415  # self.logger.debug("!!!tagList[%s]: %s", str(index), varDump(tag))
416  if "name" in tag and tag["name"] == "content_encoded" and "data" in tag and \
417  "output_format" in template and "name" in template["output_format"] and \
418  template["output_format"]["name"] == 'json':
# NOTE(review): each pass overwrites tagList[index]["data"], so only the
# LAST element of tag["data"] survives as a string - confirm intended.
419  for elem in tag["data"]:
420  tagList[index]["data"] = str(elem)
421  # self.logger.debug("!!! elem: %s", varDump(elem))
422 
# .get() materializes the content before its .data is read
423  scraperResponse.processedContent["default"].get()
424  resultDict["default"] = scraperResponse.processedContent["default"].data
425  resultDict["internal"] = []
426  resultDict["custom"] = []
427  buf = []
428  for content in scraperResponse.processedContent["internal"]:
429  content.get()
430  buf.append(content.data)
431  resultDict["internal"] = buf
432  resultDict["custom"] = scraperResponse.processedContent["custom"]
433  ret = encode(json.dumps(resultDict, ensure_ascii=False))
434  return ret
435 
436 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ getProcessorCmd()

def dc_processor.ProcessorTask.ProcessorTask.getProcessorCmd (   self,
  processorName 
)

Definition at line 362 of file ProcessorTask.py.

def getProcessorCmd(self, processorName):
  """Resolve how to run the processor selected for the current batch item.

  In the APP_USAGE_MODEL_PROCESS model the result is a shell command line
  string (python binary + processor script + config file); in any other
  usage model it is a dict with "AppClass"/"AppConfig" keys used for
  in-process instantiation. None, PROCESSOR_EMPTY and unknown names fall
  back to the default scraper.
  """
  if self.algorithmsModel == APP_CONSTS.APP_USAGE_MODEL_PROCESS:
    # support PROCESSOR_NAME; the original assigned the default scraper
    # command twice (unconditionally, then again for None/PROCESSOR_EMPTY) -
    # folded here into a single else branch with identical behavior.
    if processorName == CONSTS.PROCESSOR_STORE:
      cmd = CONSTS.PYTHON_BINARY + " " + CONSTS.STORE_PROCESSOR_BINARY + " " + CONSTS.STORE_PROCESSOR_CFG
    elif processorName == CONSTS.PROCESSOR_FEED_PARSER:
      # Use default scraper for rss_feed site
      cmd = CONSTS.PYTHON_BINARY + " " + CONSTS.PROCESSOR_FEED_PARSER_BINARY + " " + CONSTS.PROCESSOR_FEED_PARSER_CFG
    elif processorName == CONSTS.PROCESSOR_SCRAPER_MULTI_ITEMS:
      cmd = CONSTS.PYTHON_BINARY + " " + CONSTS.SCRAPER_MULTI_ITEMS_BINARY + " " + CONSTS.SCRAPER_MULTI_ITEMS_CFG
    elif processorName == CONSTS.PROCESSOR_SCRAPER_CUSTOM:
      cmd = CONSTS.PYTHON_BINARY + " " + CONSTS.SCRAPER_CUSTOM_BINARY + " " + CONSTS.SCRAPER_CUSTOM_CFG
    else:
      # covers None, PROCESSOR_EMPTY and any unknown processor name
      cmd = CONSTS.PYTHON_BINARY + " " + CONSTS.SCRAPER_BINARY + " " + CONSTS.SCRAPER_CFG
    self.logger.debug(MSG_INFO_PROCESSOR_CMD + cmd)
  else:
    cmd = {}
    if processorName == CONSTS.PROCESSOR_STORE:
      cmd["AppClass"] = CONSTS.STORE_APP_CLASS_NAME
      cmd["AppConfig"] = CONSTS.STORE_APP_CLASS_CFG
    elif processorName == CONSTS.PROCESSOR_FEED_PARSER:
      cmd["AppClass"] = CONSTS.PROCESSOR_FEED_PARSER_CLASS_NAME
      cmd["AppConfig"] = CONSTS.PROCESSOR_FEED_PARSER_CLASS_CFG
    elif processorName == CONSTS.PROCESSOR_SCRAPER_MULTI_ITEMS:
      cmd["AppClass"] = CONSTS.SCRAPER_MULTI_ITEMS_APP_CLASS_NAME
      cmd["AppConfig"] = CONSTS.SCRAPER_MULTI_ITEMS_APP_CLASS_CFG
    elif processorName == CONSTS.PROCESSOR_SCRAPER_CUSTOM:
      cmd["AppClass"] = CONSTS.SCRAPER_CUSTOM_JSON_APP_CLASS_NAME
      cmd["AppConfig"] = CONSTS.SCRAPER_CUSTOM_JSON_APP_CLASS_CFG
    else:
      cmd["AppClass"] = CONSTS.SCRAPER_APP_CLASS_NAME
      cmd["AppConfig"] = CONSTS.SCRAPER_APP_CLASS_CFG

  return cmd
397 
398 
Here is the caller graph for this function:

◆ getPropValueFromSiteProperties()

def dc_processor.ProcessorTask.ProcessorTask.getPropValueFromSiteProperties (   self,
  batchItemDict,
  propName 
)

Definition at line 221 of file ProcessorTask.py.

def getPropValueFromSiteProperties(self, batchItemDict, propName):
  """Look up *propName* in the site's property list.

  Returns the first matching property's "value", or None when the site is
  absent from batchItemDict or no property carries that name.
  """
  if "site" not in batchItemDict:
    return None
  for siteProperty in batchItemDict["site"].properties:
    if siteProperty["name"] == propName:
      return siteProperty["value"]
  return None
229 
230 
Here is the caller graph for this function:

◆ getRawContent()

def dc_processor.ProcessorTask.ProcessorTask.getRawContent (   self,
  siteId,
  url 
)

Definition at line 2261 of file ProcessorTask.py.

# Fetches every stored content flavour for a URL via the DBTasks wrapper and
# returns a 4-tuple (rawContent, dynamicContent, headerContent,
# requestsContent); each element is the stored buffer or None when absent.
2261  def getRawContent(self, siteId, url):
2262  # #Local function for check and print of data
2263  #
2264  # @param rawContentData - raw content data
2265  # @param rawContentName - raw content name
2266  def printContentStatus(rawContentData, rawContentName):
2267  if rawContentData is not None:
2268  self.logger.debug("Some %s content size %s on disk", rawContentName, str(len(rawContentData)))
2269  else:
2270  self.logger.debug("NO %s content on disk", rawContentName)
2271 
2272  # variables for result
2273  rawContent = None
2274  dynamicContent = None
2275  headerContent = None
2276  requestsContent = None
2277 
# request every content type in one round trip
2278  urlContentObj = dc_event.URLContentRequest(siteId, url, \
2279  dc_event.URLContentRequest.CONTENT_TYPE_RAW_LAST + \
2280  dc_event.URLContentRequest. CONTENT_TYPE_RAW + \
2281  dc_event.URLContentRequest.CONTENT_TYPE_DYNAMIC + \
2282  dc_event.URLContentRequest.CONTENT_TYPE_HEADERS + \
2283  dc_event.URLContentRequest.CONTENT_TYPE_REQUESTS)
2284 
2285  rawContentData = self.wrapper.urlContent([urlContentObj])
2286 
2287  if rawContentData is not None and len(rawContentData) > 0:
2288  if rawContentData[0].headers is not None and len(rawContentData[0].headers) > 0 and \
2289  rawContentData[0].headers[0] is not None:
2290  headerContent = rawContentData[0].headers[0].buffer
2291 
2292  if rawContentData[0].requests is not None and len(rawContentData[0].requests) > 0 and \
2293  rawContentData[0].requests[0] is not None:
2294  requestsContent = rawContentData[0].requests[0].buffer
2295 
# rawContents[0] holds the raw page, rawContents[1] the dynamic content
2296  if rawContentData[0].rawContents is not None:
2297  if len(rawContentData[0].rawContents) > 0 and rawContentData[0].rawContents[0] is not None:
2298  rawContent = rawContentData[0].rawContents[0].buffer
2299  else:
2300  pass
2301  if len(rawContentData[0].rawContents) > 1 and rawContentData[0].rawContents[1] is not None:
2302  dynamicContent = rawContentData[0].rawContents[1].buffer
2303  else:
2304  pass
2305 
2306  printContentStatus(rawContent, 'raw')
2307  printContentStatus(dynamicContent, 'dynamic')
2308  printContentStatus(headerContent, 'header')
2309  printContentStatus(requestsContent, 'requests')
2310 
2311  return rawContent, dynamicContent, headerContent, requestsContent
2312 
2313 
Here is the caller graph for this function:

◆ getRawContentFromFS()

def dc_processor.ProcessorTask.ProcessorTask.getRawContentFromFS (   self,
  batchItem,
  batchItemDict 
)

Definition at line 650 of file ProcessorTask.py.

# Obtains the raw content for a batch item: either directly from the
# URLPut object already attached to the item (base64-decoding it when it is
# CONTENT_RAW_CONTENT), or by fetching it from storage via the DBTasks
# wrapper (the tidied variant when the "htmlRecover" property is "1").
# On failure the item is flagged with ERROR_MASK_MISSED_RAW_CONTENT_ON_DISK
# and a ProcessorException is raised.
650  def getRawContentFromFS(self, batchItem, batchItemDict):
651  ret = None
652  try:
653  # Check if Real-Time crawling with ONLY_PROCESSING algorithm
654  # if self.input_batch.crawlerType == dc_event.Batch.TYPE_REAL_TIME_CRAWLER and \
655  # batchItem.urlObj.urlPut is not None:
656  if batchItem.urlObj.urlPut is not None:
657  ret = batchItem.urlObj.urlPut.putDict["data"]
658  if batchItem.urlObj.urlPut.contentType == dc_event.Content.CONTENT_RAW_CONTENT:
659  ret = base64.b64decode(ret)
660  else:
661  if "htmlRecover" in batchItemDict and batchItemDict["htmlRecover"] is not None and \
662  batchItemDict["htmlRecover"] == "1":
663  urlContentObj = dc_event.URLContentRequest(batchItem.siteId, batchItem.urlObj.url,
664  dc_event.URLContentRequest.CONTENT_TYPE_RAW_LAST + \
665  dc_event.URLContentRequest.CONTENT_TYPE_TIDY)
666  urlContentObj.urlMd5 = batchItem.urlObj.urlMd5
667  ret = self.wrapper.urlContent([urlContentObj])
668  if len(ret[0].rawContents) > 0:
669  self.logger.debug(">>> YES tidy on disk")
670  else:
671  self.logger.debug(">>> NO tidy on disk")
672  else:
673  urlContentObj = dc_event.URLContentRequest(batchItem.siteId, batchItem.urlObj.url,
674  dc_event.URLContentRequest.CONTENT_TYPE_RAW_LAST + \
675  dc_event.URLContentRequest.CONTENT_TYPE_RAW)
676  urlContentObj.urlMd5 = batchItem.urlObj.urlMd5
677  ret = self.wrapper.urlContent([urlContentObj])
# Decode buffer and cache it on the item as a URLPut so later steps can
# re-use it without another storage round trip.
# NOTE(review): whether this decode block also applies to the htmlRecover
# branch above is not clear from this listing (indentation flattened) -
# confirm against the original source.
678  # Decode buffer
679  if ret is not None and len(ret) > 0 and ret[0].rawContents is not None and len(ret[0].rawContents) > 0:
680  putDict = {}
681  putDict["id"] = batchItem.urlId
682  putDict["data"] = ret[0].rawContents[0].buffer
683  putDict["cDate"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
684  batchItem.urlObj.urlPut = dc_event.URLPut(batchItem.siteId, batchItem.urlId,
685  dc_event.Content.CONTENT_RAW_CONTENT, putDict)
686  ret = base64.b64decode(ret[0].rawContents[0].buffer)
687  self.logger.debug("Some raw content size %s on disk", str(len(ret)))
688  else:
689  ret = None
690  self.logger.debug("NO raw content on disk, raw_content: %s", str(ret))
691 
692  except Exception as err:
693  ExceptionLog.handler(self.logger, err, MSG_ERROR_GET_RAW_CONTENT_FROM_DB, (err))
694  batchItemDict["errorMask"] |= APP_CONSTS.ERROR_MASK_MISSED_RAW_CONTENT_ON_DISK
695  raise ProcessorException(MSG_ERROR_GET_RAW_CONTENT_FROM_DB)
696  return ret
697 
698 
Here is the caller graph for this function:

◆ isAllowedSiteLimits()

def dc_processor.ProcessorTask.ProcessorTask.isAllowedSiteLimits (   self,
  siteObj,
  accumulatedBatchItems 
)

Definition at line 442 of file ProcessorTask.py.

# Checks the accumulated batch items against the site's limits (maxURLs,
# maxResources, maxResourceSize, maxErrors). On the first violated limit the
# LAST accumulated item's errorMask is flagged and False is returned;
# True means all limits are respected.
442  def isAllowedSiteLimits(self, siteObj, accumulatedBatchItems):
443 
444  self.logger.debug("!!! siteObj.maxURLs = " + str(siteObj.maxURLs))
445  self.logger.debug("!!! siteObj.maxResources = " + str(siteObj.maxResources))
446  self.logger.debug("!!! siteObj.maxErrors = " + str(siteObj.maxErrors))
447  self.logger.debug("!!! siteObj.maxResourceSize = " + str(siteObj.maxResourceSize))
448 
449  if siteObj.maxURLs > 0 and siteObj.maxURLs < len(accumulatedBatchItems):
450  accumulatedBatchItems[-1].urlObj.errorMask |= APP_CONSTS.ERROR_MAX_ITEMS
451  self.logger.debug("Max URLs is reached! Urls count= %s. Site maxURLs: %s ", len(accumulatedBatchItems),
452  siteObj.maxURLs)
453  return False
454 
# NOTE: maxResources is compared against the item COUNT here, same as
# maxURLs above (only the error flag and message differ).
455  if siteObj.maxResources > 0 and siteObj.maxResources < len(accumulatedBatchItems):
456  accumulatedBatchItems[-1].urlObj.errorMask |= APP_CONSTS.ERROR_MASK_SITE_MAX_RESOURCES_SIZE
457  self.logger.debug("Max resources size is reached! resources count= %s. Site maxResources: %s ",
458  len(accumulatedBatchItems), siteObj.maxResources)
459  return False
460 
461  errors = 0
462  for batchItem in accumulatedBatchItems:
463  self.logger.debug("!!! batchItem.urlObj.errorMask = " + str(batchItem.urlObj.errorMask))
464  if batchItem.urlObj.errorMask != APP_CONSTS.ERROR_OK:
465  errors += 1
466 
467  if 'data' in batchItem.urlObj.urlPut.putDict:
468  resourcesSize = len(batchItem.urlObj.urlPut.putDict['data'])
469  self.logger.debug("Resource size = " + str(resourcesSize))
470 
# NOTE(review): resourcesSize is only assigned when 'data' is present; if
# this size check can run for an item without 'data' it would read a stale
# or undefined value (exact nesting unclear in this listing) - confirm.
471  if siteObj.maxResourceSize > 0 and siteObj.maxResourceSize <= resourcesSize:
472  accumulatedBatchItems[-1].urlObj.errorMask |= APP_CONSTS.ERROR_RESPONSE_SIZE_ERROR
473  self.logger.debug("Max resource size is reached! resource size = %s. Site maxResourceSize: %s ",
474  resourcesSize, siteObj.maxResourceSize)
475  return False
476 
477 
478  if siteObj.maxErrors > 0 and siteObj.maxErrors <= errors:
479  accumulatedBatchItems[-1].urlObj.errorMask |= APP_CONSTS.ERROR_SITE_MAX_ERRORS
480  self.logger.debug("Max errors is reached! Errors count= %s. Site maxErrors: %s ",
481  errors, siteObj.maxErrors)
482  return False
483 
484  return True
485 
486 
Here is the caller graph for this function:

◆ isDisabledSite()

def dc_processor.ProcessorTask.ProcessorTask.isDisabledSite (   self,
  site 
)

Definition at line 1365 of file ProcessorTask.py.

def isDisabledSite(self, site):
  """Return True when the site's state marks it as disabled."""
  return site.state == Site.STATE_DISABLED
1367 
1368 
Here is the caller graph for this function:

◆ isOverlimitMaxResources()

def dc_processor.ProcessorTask.ProcessorTask.isOverlimitMaxResources (   self,
  site,
  url 
)

Definition at line 1374 of file ProcessorTask.py.

def isOverlimitMaxResources(self, site, url):
  """Return True when the site already stores more contents than maxResources.

  The limit applies only on first processing of a URL (url.processed == 0);
  re-crawling is never limited, and maxResources == 0 disables the check.
  """
  overlimit = (site.maxResources > 0 and
               site.contents > site.maxResources and
               url.processed == 0)
  if overlimit:
    self.logger.debug("Site maxResources number is reached! Site contents is: %s. Site maxResources: %s ",
                      str(site.contents), str(site.maxResources))
  return overlimit
1384 
1385 
Here is the caller graph for this function:

◆ loadConfig()

def dc_processor.ProcessorTask.ProcessorTask.loadConfig (   self)

Definition at line 2131 of file ProcessorTask.py.

# Loads the application configuration file named by the --config CLI option
# into self.config; raises when the option is missing or parsing fails.
2131  def loadConfig(self):
2132  try:
2133  if self.pargs.config:
2134  self.config = ConfigParser.ConfigParser()
# preserve option-name case (default ConfigParser lower-cases keys)
2135  self.config.optionxform = str
2136  self.config.read(self.pargs.config)
2137  else:
2138  raise Exception(MSG_ERROR_EMPTY_CONFIG_FILE_NAME)
2139  except Exception, err:
2140  raise Exception(MSG_ERROR_LOAD_CONFIG + ' ' + str(err))
2141 
2142 
Here is the caller graph for this function:

◆ loadLogConfigFile()

def dc_processor.ProcessorTask.ProcessorTask.loadLogConfigFile (   self)

Definition at line 2146 of file ProcessorTask.py.

# Configures logging from the file named in the [Application] "log" option
# and initializes self.logger; raises on any failure.
2146  def loadLogConfigFile(self):
2147  try:
2148  log_conf_file = self.config.get("Application", "log")
2149  logging.config.fileConfig(log_conf_file)
2150  # Logger initialization
2151  self.logger = Utils.MPLogger().getLogger()
2152  except Exception, err:
2153  raise Exception(MSG_ERROR_LOAD_LOG_CONFIG_FILE + str(err))
2154 
2155 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ loadOptions()

def dc_processor.ProcessorTask.ProcessorTask.loadOptions (   self)

Definition at line 2159 of file ProcessorTask.py.

# Reads this application's config section (raw_data_dir, db-task_ini,
# optional algorithmsModel) and builds the DBTasksWrapper from the db-task
# ini file; raises on any mandatory option failure.
2159  def loadOptions(self):
2160  try:
2161  self.raw_data_dir = self.config.get(self.__class__.__name__, "raw_data_dir")
2162  self.db_task_ini = self.config.get(self.__class__.__name__, "db-task_ini")
# algorithmsModel is optional: keep the constructor default when absent
2163  try:
2164  self.algorithmsModel = self.config.getint(self.__class__.__name__, "algorithmsModel")
2165  except Exception as err:
2166  pass
2167  # Add support operations updateCollectedURLs and removeURLs
2168  cfgParser = ConfigParser.ConfigParser()
2169  cfgParser.read(self.db_task_ini)
2170  self.wrapper = DBTasksWrapper(cfgParser)
2171  except Exception, err:
2172  raise Exception(MSG_ERROR_LOAD_OPTIONS + str(err))
2173 
2174 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ loadSite()

def dc_processor.ProcessorTask.ProcessorTask.loadSite (   self,
  batchItem 
)

Definition at line 1411 of file ProcessorTask.py.

def loadSite(self, batchItem):
    """Fetch the site object referenced by a batch item.

    An empty siteId is mapped to the default site "0" before the DB
    lookup.  When the batch item carries its own site object, its fields
    overwrite those of the loaded site via rewriteFields().

    Returns the loaded site object, or None when the DB returned nothing.
    Re-raises any lookup error after logging it.
    """
    loadedSite = None
    try:
        # Empty siteId denotes the default site.
        if not len(batchItem.siteId):
            batchItem.siteId = "0"
        loadedSite = self.readSiteFromDB(batchItem)
        hasOverride = loadedSite is not None and batchItem.siteObj is not None
        if hasOverride:
            loadedSite.rewriteFields(batchItem.siteObj)
    except Exception as err:
        ExceptionLog.handler(self.logger, err, MSG_ERROR_LOAD_SITE_DATA)
        raise err

    return loadedSite
1425 
1426 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ loadSiteProperties()

def dc_processor.ProcessorTask.ProcessorTask.loadSiteProperties (   self,
  site,
  url,
  batchItem,
  batchItemDict 
)

Definition at line 234 of file ProcessorTask.py.

def loadSiteProperties(self, site, url, batchItem, batchItemDict):
    """Merge batch-item properties into site properties and unpack the
    recognized ones into batchItemDict.

    First pass: properties arriving on the batch item override the
    same-named entries in site.properties; unknown names are appended.
    Second pass: well-known property names (PROCESS_CTYPES, TIMEZONE,
    template, PROCESSOR_NAME, ...) are copied into batchItemDict.
    Finally, defaults (None) are filled in for the keys downstream code
    always reads, and the template is deep-copied so later mutation does
    not leak back into the shared site properties.

    Raises:
      ProcessorException / Exception: re-raised after being logged.
    """
    batchItemDict["template"] = ""
    # self.logger.debug("site.properties: " + str(site.properties))
    try:
        # Update site properties from batch item properties
        keys = [localProperty["name"] for localProperty in site.properties]
        # self.logger.debug("keys: %s" % str(keys))
        for key in batchItem.properties.keys():
            if key in keys:
                # Batch-item value wins over the existing site value.
                for localProperty in site.properties:
                    if localProperty["name"] == key:
                        self.logger.debug("%s exist in site properties. Rewrite property", key)
                        # self.logger.debug("Old value: %s" % varDump(localProperty["value"]))
                        localProperty["value"] = batchItem.properties[key]
                        # self.logger.debug("New value: %s" % varDump(localProperty["value"]))
            else:
                self.logger.debug("%s don't exist in site properties. Add property", key)
                site.properties.append({"name": key, "value": batchItem.properties[key], "URLMd5": batchItem.urlId})
        # self.logger.debug("Updated site's properties: " + str(site.properties))

        # Unpack recognized properties into batchItemDict.
        for localProperty in site.properties:
            # self.logger.debug('>>> localProperty ' + str(localProperty["name"]) + ' => ' + str(localProperty["value"]))

            # PROCESS_CTYPES
            if localProperty["name"] == "PROCESS_CTYPES":
                batchItemDict["processCTypes"] = localProperty["value"]
                self.logger.debug("PROCESS_CTYPES: " + str(batchItemDict["processCTypes"]))

            # CONTENT_TYPE_MAP
            elif localProperty["name"] == "CONTENT_TYPE_MAP":
                batchItemDict["contentTypeMap"] = localProperty["value"]
                self.logger.debug("CONTENT_TYPE_MAP: " + str(batchItemDict["contentTypeMap"]))

            # TIMEZONE
            elif localProperty["name"] == "TIMEZONE":
                batchItemDict["timezone"] = localProperty["value"]
                self.logger.debug("TIMEZONE: " + str(batchItemDict["timezone"]))

            # TEXT_STATS — the only property coerced to int here.
            elif localProperty["name"] == "TEXT_STATS":
                batchItemDict["textStatus"] = int(localProperty["value"])
                self.logger.debug("TEXT_STATS: " + str(batchItemDict["textStatus"]))

            # TEMPLATE_SOURCE
            elif localProperty["name"] == "TEMPLATE_SOURCE":
                batchItemDict["TEMPLATE_SOURCE"] = localProperty["value"]
                self.logger.debug("TEMPLATE_SOURCE: " + str(batchItemDict["TEMPLATE_SOURCE"]))

            # PROCESSOR_PROPERTIES
            elif localProperty["name"] == "PROCESSOR_PROPERTIES":
                batchItemDict["processorProperties"] = localProperty["value"]
                self.logger.debug("PROCESSOR_PROPERTIES: " + str(batchItemDict["processorProperties"]))

            # CONTENT_HASH
            elif localProperty["name"] == "CONTENT_HASH":
                batchItemDict["contentHash"] = localProperty["value"]
                self.logger.debug("CONTENT_HASH: " + str(batchItemDict["contentHash"]))

            # HTML_RECOVER
            elif localProperty["name"] == "HTML_RECOVER":
                batchItemDict["htmlRecover"] = localProperty["value"]
                self.logger.debug("HTML_RECOVER: " + str(batchItemDict["htmlRecover"]))

            # URL_NORMALIZE_MASK_PROCESSOR
            elif localProperty["name"] == "URL_NORMALIZE_MASK_PROCESSOR":
                batchItemDict["urlNormalizeMaskProcessor"] = localProperty["value"]
                self.logger.debug("URL_NORMALIZE_MASK_PROCESSOR: " + str(batchItemDict["urlNormalizeMaskProcessor"]))

            # template
            elif localProperty["name"] == "template":
                batchItemDict["template"] = localProperty["value"]
                self.logger.debug("Template: " + varDump(batchItemDict["template"]))

            # PROCESSOR_NAME_REPLACE
            elif localProperty["name"] == "PROCESSOR_NAME_REPLACE":
                batchItemDict["PROCESSOR_NAME_REPLACE"] = localProperty["value"]
                self.logger.debug("PROCESSOR_NAME_REPLACE: " + str(batchItemDict["PROCESSOR_NAME_REPLACE"]))

            # PROCESSOR_NAME
            elif localProperty["name"] == "PROCESSOR_NAME":
                batchItemDict["processorName"] = localProperty["value"]
                self.logger.debug("PROCESSOR_NAME: " + str(batchItemDict["processorName"]))

            # HTTP_REDIRECT_LINK
            elif localProperty["name"] == "HTTP_REDIRECT_LINK":
                batchItemDict["HTTP_REDIRECT_LINK"] = localProperty["value"]
                self.logger.debug("HTTP_REDIRECT_LINK: " + str(batchItemDict["HTTP_REDIRECT_LINK"]))

        # debug info
        self.logger.debug("HASH: %s, URL: %s", str(hashlib.md5(app.Utils.UrlParser.generateDomainUrl(url.url)).hexdigest()), url.url)
        # Fill in defaults for the keys downstream code always reads.
        if "processorProperties" not in batchItemDict:
            batchItemDict["processorProperties"] = None
        if "processCTypes" in batchItemDict:
            self.logger.debug("PROCESS_CTYPES: " + str(batchItemDict["processCTypes"]))
        if "timezone" in batchItemDict:
            self.logger.debug("TIMEZONE: " + str(batchItemDict["timezone"]))
        else:
            batchItemDict["timezone"] = None
        if "textStatus" in batchItemDict:
            self.logger.debug("TEXT_STATS: " + str(batchItemDict["textStatus"]))

        if "processorName" in batchItemDict:
            self.logger.debug("PROCESSOR_NAME: " + str(batchItemDict["processorName"]))
        else:
            batchItemDict["processorName"] = None

        if "contentHash" in batchItemDict:
            self.logger.debug("CONTENT_HASH: " + str(batchItemDict["contentHash"]))
        else:
            batchItemDict["contentHash"] = None
        # Accumulate mode reuses the template carried on the batch item.
        if "template" in batchItem.properties and self.accumulateProcessing:
            batchItemDict["template"] = batchItem.properties["template"]
            self.logger.debug(">>> Reproduce template for accumulate batchItem")
        if "template" in batchItemDict:
            # Deep copy so later template mutation does not alter site.properties.
            batchItemDict["template"] = copy.deepcopy(batchItemDict["template"])
            # self.logger.debug("Template: " + varDump(batchItemDict["template"]))
    except ProcessorException, err:
        ExceptionLog.handler(self.logger, err, MSG_INFO_LOAD_SITE_PROPERTIES)
        raise err
    except Exception as err:
        ExceptionLog.handler(self.logger, err, MSG_INFO_LOAD_SITE_PROPERTIES)
        raise err
358 
359 
def generateDomainUrl(url)
Definition: Utils.py:533
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
Here is the call graph for this function:
Here is the caller graph for this function:

◆ loadURL()

def dc_processor.ProcessorTask.ProcessorTask.loadURL (   self,
  batchItem 
)

Definition at line 1430 of file ProcessorTask.py.

def loadURL(self, batchItem):
    """Load the URL status object for a batch item via the DB wrapper.

    Builds a URLStatus event (keyed by urlId as MD5) and sends it through
    DBTasksWrapper.  The first row of the response is returned.

    Raises:
      ProcessorException: when the URLStatus response is empty (the URL
        does not exist); re-raised after logging.
    """
    ret = None
    urlStatus = dc_event.URLStatus(batchItem.siteId, batchItem.urlId)
    urlStatus.urlType = dc_event.URLStatus.URL_TYPE_MD5
    drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_STATUS, [urlStatus])
    responseDRCESyncTasksCover = self.wrapper.process(drceSyncTasksCoverObj)
    row = responseDRCESyncTasksCover.eventObject
    try:
        if len(row):
            ret = row[0]
        else:  # throw if the url doesn't exist
            raise ProcessorException(">>> URLStatus return empty response, urlId=" + batchItem.urlId)
    except Exception as err:  # normalized from legacy "except E, err" form
        ExceptionLog.handler(self.logger, err, MSG_ERROR_LOAD_URL_DATA, (ret))
        raise err

    self.logger.debug(">>> Url object: " + varDump(ret))

    return ret
1449 
1450 
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
Here is the call graph for this function:
Here is the caller graph for this function:

◆ mapResponse()

def dc_processor.ProcessorTask.ProcessorTask.mapResponse (   self,
  template,
  crawlingTime,
  scraperResponse,
  processorProperties 
)

Definition at line 1095 of file ProcessorTask.py.

def mapResponse(self, template, crawlingTime, scraperResponse, processorProperties):
    """Render the scraper's processed contents into template["response"].

    Concatenates output_format header/items_header, one filled "item"
    entry per processed content (default + internal), items_footer and
    footer; each rendered document is appended to template["response"].
    Renders beyond the first are also appended to
    scraperResponse.processedContent["custom"].  When there are no
    processed contents at all, a single document is rendered from the
    bare item pattern.
    """
    # I. TODO reduce phase
    # self.logger.debug("Output format: %s" % varDump(template["output_format"]))
    localProcessedContents = []
    if scraperResponse is not None:
        localProcessedContents.append(scraperResponse.processedContent["default"])
        localProcessedContents.extend(scraperResponse.processedContent["internal"])
    template["response"] = []

    # II. Fill header
    localResponse = template["output_format"]["header"]

    # III. Fill item
    localResponse = localResponse + template["output_format"]["items_header"]

    entry = template["output_format"]["item"]
    removeTrailingComma = entry.endswith(",")
    i = 0
    for localProcessedContent in localProcessedContents:
        # Work on copies so the base pattern/header stay pristine per iteration.
        internalEntry = copy.deepcopy(entry)
        internalLocalResponse = copy.deepcopy(localResponse)
        if localProcessedContent is not None:
            internalEntry = self.mapResponseProcessedContent(template, localProcessedContent, removeTrailingComma,
                                                            internalEntry, processorProperties)

        # crawlingTime is in milliseconds; substitute as seconds.
        internalEntry = internalEntry.replace("%crawler_time%", str(crawlingTime / 1000.0))
        if removeTrailingComma and len(internalEntry) > 0 and internalEntry.endswith(","):
            internalEntry = internalEntry[:-1]

        # NOTE(review): this substitutes into the outer `entry` pattern rather
        # than `internalEntry`, so %errors_mask% only reaches the output via
        # the zero-content branch below — looks like it was meant to operate
        # on internalEntry; confirm before changing.
        entry = self.mapResponseAdditionSubstitutes(entry, scraperResponse.errorMask)
        internalLocalResponse = internalLocalResponse + internalEntry + template["output_format"]["items_footer"]
        # IV. Fill footer
        internalLocalResponse = internalLocalResponse + template["output_format"]["footer"]

        template["response"].append(internalLocalResponse)
        if i > 0:
            # Every render after the first is also exposed as "custom" content.
            scraperResponse.processedContent["custom"].append(internalLocalResponse)
        i += 1

    if len(localProcessedContents) == 0:
        # No processed contents: render one document from the bare pattern.
        entry = entry.replace("%crawler_time%", str(crawlingTime / 1000.0))
        if removeTrailingComma and len(entry) > 0 and entry.endswith(","):
            entry = entry[:-1]

        if scraperResponse is not None:
            entry = self.mapResponseAdditionSubstitutes(entry, scraperResponse.errorMask)
        localResponse = localResponse + entry + template["output_format"]["items_footer"]
        # IV. Fill footer
        localResponse = localResponse + template["output_format"]["footer"]
        template["response"].append(localResponse)

    # self.logger.debug("Output response: %s" % varDump(template["response"]))
1147 
1148 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ mapResponseAdditionSubstitutes()

def dc_processor.ProcessorTask.ProcessorTask.mapResponseAdditionSubstitutes (   self,
  buf,
  errorMask 
)

Definition at line 1151 of file ProcessorTask.py.

def mapResponseAdditionSubstitutes(self, buf, errorMask):
    """Expand additional placeholders in a rendered buffer.

    Currently only %errors_mask% is substituted, with the stringified
    error mask.  Returns the buffer with all substitutions applied.
    """
    substitutions = {"%errors_mask%": str(errorMask)}
    result = buf
    for placeholder, replacement in substitutions.items():
        result = re.sub(placeholder, replacement, result)

    return result
1158 
1159 
Here is the caller graph for this function:

◆ mapResponseProcessedContent()

def dc_processor.ProcessorTask.ProcessorTask.mapResponseProcessedContent (   self,
  template,
  processedContent,
  removeTrailingComma,
  entry,
  processorProperties 
)

Definition at line 1003 of file ProcessorTask.py.

def mapResponseProcessedContent(self, template, processedContent, removeTrailingComma, entry, processorProperties):
    """Fill the output "item" pattern with one processed content's tags.

    With an empty template tag list, the generic item pattern is expanded
    once per extracted tag and the unique expansions are joined.  Then
    per-tag placeholders (%name%, %name_extractor%, %name_xpath%, lang
    suffixes, %scraper_time%, %metrics%) are substituted.  When the entry
    collapsed to "", the whole processed content is emitted as JSON.

    Returns the filled entry string.
    """
    if len(template["tags"]) == 0:
        entries = []
        for data in processedContent.data["data"]["tagList"][0]:
            # Expand the generic pattern for this tag's name.
            ei = re.sub("%tag_name%_%extractor_value%", "%" + data["name"] + "_extractor%", entry)
            ei = re.sub("%tag_value%", "%" + data["name"] + "%", ei)
            ei = re.sub("%tag_name%", data["name"], ei)

            if ei != "":
                entries.append(ei)

        if len(entries) > 0:
            if template["output_format"]["name"] == "json":
                itemDelimiter = ","
            else:
                itemDelimiter = " "

            # # make unique
            entries = list(set(entries))
            entry = itemDelimiter.join(entries)
            if removeTrailingComma and len(entry) > 0 and entry.endswith(","):
                entry = entry[:-1]
        else:
            entry = ""

    langTagsNames = []
    properties = {}
    try:
        properties = json.loads(processorProperties)
    except Exception as err:  # malformed processorProperties: log and continue empty
        self.logger.error(str(err))

    if CONSTS.LANG_PROP_NAME in properties:
        langDetector = ScraperLangDetector(properties[CONSTS.LANG_PROP_NAME])
        langTagsNames = langDetector.getLangTagsNames()
        # self.logger.debug("langTagsNames: %s", str(langTagsNames))

    if entry != "":
        # self.logger.debug("Output processedContent.data.tagList: %s", varDump(processedContent.data["data"]["tagList"]))
        for data in processedContent.data["data"]["tagList"][0]:
            # self.logger.debug("Output processedContent.data: %s" % varDump(data))
            pattern = "%" + data["name"] + "%"
            if entry.find(pattern) != -1:
                for item in data["data"]:
                    if item is None or item == "":
                        continue
                    entry = entry.replace(pattern, item)

            if "extractor" in data and "name" in data:
                entry = entry.replace("%" + data["name"] + "_extractor" + "%", str(data["extractor"]))

            if "xpath" in data and "name" in data and data["data"] and len(data["data"]) > 0:
                xpathData = str(data["xpath"])
                if template["output_format"]["name"] == "json":
                    # JSON-escape the xpath, then drop the enclosing quotes.
                    xpathData = json.dumps(xpathData).strip('"')
                entry = entry.replace("%" + data["name"] + "_xpath" + "%", xpathData)
            elif "xpath" in data and "name" in data:
                # No data extracted: leave the placeholder in place (no-op replace).
                entry = entry.replace("%" + data["name"] + "_xpath" + "%", "%" + data["name"] + "_xpath" + "%")

            if "lang" in data and "lang_suffix" in data and "name" in data:
                entry = entry.replace("%" + data["name"] + data["lang_suffix"] + "%", str(data["lang"]))

            if "summary_lang" in data and "lang_suffix" in data:
                entry = entry.replace("%" + data["lang_suffix"] + "%", str(data["summary_lang"]))

            for langTagName in langTagsNames:
                if "lang" in data and "name" in data and data["name"] in langTagName:
                    if "%" + langTagName + "%" in entry:
                        entry = entry.replace("%" + langTagName + "%", str(data["lang"]))
                    else:
                        entry = self.addAdditionalValue(entry, str(langTagName), str(data["lang"]))

        if "time" in processedContent.data:
            entry = entry.replace("%scraper_time%", str(processedContent.data["time"]))
    else:
        entry = json.dumps({"default": processedContent.data}, ensure_ascii=False)

    localMetrics = json.dumps(processedContent.metrics)
    # BUGFIX: original conditions were `len(s) > 0 and s[0] == '"' or s[0] == "'"`,
    # which parses as (A and B) or C — the single-quote test escaped the length
    # guard and could raise IndexError on an empty string.  The quote tests are
    # now grouped under the guard; practical behavior for json.dumps output
    # (which never starts with a single quote) is unchanged.
    if len(localMetrics) > 0 and (localMetrics[0] == '"' or localMetrics[0] == '\''):
        localMetrics = localMetrics[1:]
    if len(localMetrics) > 0 and (localMetrics[-1] == '"' or localMetrics[-1] == '\''):
        localMetrics = localMetrics[0:-1]
    entry = entry.replace("%metrics%", localMetrics)

    return entry
1091 
1092 
-mask-info
Here is the call graph for this function:
Here is the caller graph for this function:

◆ mergeChains()

def dc_processor.ProcessorTask.ProcessorTask.mergeChains (   self,
  chainElem,
  batchItem,
  batchItemDict,
  delimiter = ' ' 
)

Definition at line 1828 of file ProcessorTask.py.

def mergeChains(self, chainElem, batchItem, batchItemDict, delimiter=' '):  # pylint: disable=W0612,W0613
    """Merge this batch item's scraped tag data into its URL chain entry.

    Driven by the chain site's URL_CHAIN property (a JSON object with
    "tags_name" and optional "delimiter"): for each source tag whose name
    passes the tags_name filter, its first data value is appended to the
    matching destination tag's first value (joined by the configured or
    default delimiter), or the whole tag is deep-copied in when no match
    exists.  Extractor names seen per tag are tracked in
    chainElem["extractors"] to avoid duplicate extractor concatenation.
    """
    self.logger.debug(">>> mergeChains")
    scraperResponseDest = chainElem["batchItemDict"]["scraperResponse"][0]
    scraperResponseSrc = batchItemDict["scraperResponse"][0]
    if "site" in chainElem["batchItemDict"]:
        sitePropetries = chainElem["batchItemDict"]["site"].properties
        self.logger.debug(">>> mergeChains sitePropetries = " + str(sitePropetries))
        for sitePropElem in sitePropetries:
            if sitePropElem["name"] == "URL_CHAIN" and sitePropElem["value"] is not None:
                urlChainDict = json.loads(sitePropElem["value"])
                if "tags_name" in urlChainDict:
                    self.logger.debug(">>> mergeChains URL_CHAIN = " + str(urlChainDict))

                    for srcData in scraperResponseSrc.processedContent["default"].data["data"]["tagList"][0]:
                        isAppended = False
                        self.logger.debug(">>> mergeChains srcData = " + str(srcData["name"]))
                        # tags_name == None means "merge every tag".
                        if urlChainDict["tags_name"] is None or srcData["name"] in urlChainDict["tags_name"]:
                            for destData in scraperResponseDest.processedContent["default"].data["data"]["tagList"][0]:
                                if srcData["name"] == destData["name"]:
                                    # destData["data"].extend(srcData["data"])
                                    if "delimiter" in urlChainDict and urlChainDict["delimiter"] is not None:
                                        localDelimiter = urlChainDict["delimiter"]
                                    else:
                                        # NOTE(review): DEFSULT_CHAIN_DELIMITER (sic) is a
                                        # module-level constant defined elsewhere in this file.
                                        localDelimiter = DEFSULT_CHAIN_DELIMITER
                                    destData["data"][0] += localDelimiter
                                    destData["data"][0] += srcData["data"][0]
                                    # Only append the extractor name if not already recorded.
                                    if "extractors" in chainElem and srcData["name"] in chainElem["extractors"] and \
                                        srcData["extractor"] not in chainElem["extractors"][srcData["name"]]:
                                        destData["extractor"] += delimiter
                                        destData["extractor"] += srcData["extractor"]
                                    isAppended = True
                                    break
                            if not isAppended:
                                # No matching destination tag: import the source tag wholesale.
                                scraperResponseDest.processedContent["default"].data["data"]["tagList"][0].\
                                    append(copy.deepcopy(srcData))
                                isAppended = True
                            if isAppended:
                                # Record the extractor name for duplicate suppression.
                                if "extractors" not in chainElem:
                                    chainElem["extractors"] = {}
                                if srcData["name"] not in chainElem["extractors"]:
                                    chainElem["extractors"][srcData["name"]] = []
                                if srcData["extractor"] not in chainElem["extractors"][srcData["name"]]:
                                    chainElem["extractors"][srcData["name"]].append(srcData["extractor"])
1871 
1872 
Here is the caller graph for this function:

◆ parseTemplate()

def dc_processor.ProcessorTask.ProcessorTask.parseTemplate (   self,
  batchItem,
  batchItemDict 
)

Definition at line 939 of file ProcessorTask.py.

def parseTemplate(self, batchItem, batchItemDict):
    """Deserialize and normalize the processing template for a batch item.

    A non-empty string template is parsed as JSON.  New-format templates
    (containing a "templates" key) are kept as-is and deep-copied onto the
    batch item; old-format ones go through convertTemplateFormat().
    Outside accumulate mode, the template is then optionally extended from
    TEMPLATE_SOURCE and filtered by removeTemplateElementsByCondition().
    """
    # self.logger.debug("template: %s" % varDump(batchItemDict["template"]))
    # self.logger.debug("type of template is = " + str(type(batchItemDict["template"])))
    rawTemplate = batchItemDict["template"]
    if isinstance(rawTemplate, basestring) and rawTemplate != "":
        batchItemDict["template"] = json.loads(rawTemplate)

    if "templates" in batchItemDict["template"]:
        # New template format: nothing to convert, just keep a copy on the item.
        self.logger.debug("NEW template format")
        if "template" not in batchItem.properties:
            batchItem.properties["template"] = copy.deepcopy(batchItemDict["template"])
    else:
        # Convert old template format to the new one.
        self.logger.debug("OLD template format")
        self.convertTemplateFormat(batchItem, batchItemDict)

    if self.accumulateProcessing:
        return

    if "TEMPLATE_SOURCE" in batchItemDict:
        batchItemDict["template"] = self.extendTemplateFromSource(batchItemDict)
    batchItemDict["template"] = self.removeTemplateElementsByCondition(batchItemDict["template"], batchItemDict)
958 
959 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ process()

def dc_processor.ProcessorTask.ProcessorTask.process (   self,
  scraperInputObject,
  batchItem,
  batchItemDict 
)

Definition at line 561 of file ProcessorTask.py.

def process(self, scraperInputObject, batchItem, batchItemDict):
    """Run the scraper/processor for one batch item and collect its result.

    Two execution models, selected by self.algorithmsModel:
    process model — pickles the input, spawns the processor command via
    Popen and unpickles its stdout; module model — imports the processor
    class once, reuses the instance, and calls run() in-process.  Either
    way the scraper response's error mask is OR-ed into the batch item
    and accumulated multi-item batch items are collected.

    Returns a Results(exitCode, output, err, scraperResponse) tuple.

    Raises:
      ProcessorException: on child-process failure, module init/run
        failure, or any other error (wrapped with traceback info).
    """
    try:
        # sleep to reduce system load
        time.sleep(batchItemDict["url"].processingDelay / 1000.0)

        # self.logger.debug('batchItemDict: ' + varDump(batchItemDict))
        self.logger.debug('batchItemDict["processorName"]: ' + varDump(batchItemDict["processorName"]))

        cmd = self.getProcessorCmd(batchItemDict["processorName"])

        self.logger.debug('cmd: ' + varDump(cmd))

        # NOTE: `err` is reused below both for the child's stderr and as the
        # exception binding in the inner/outer except clauses.
        err = ""
        scraperResponse = None
        if self.algorithmsModel == APP_CONSTS.APP_USAGE_MODEL_PROCESS:
            # Child-process model: exchange pickled objects over stdin/stdout.
            inputPickledObject = pickle.dumps(scraperInputObject)
            Utils.storePickleOnDisk(inputPickledObject, ENV_PROCESSOR_STORE_PATH, "processor.out." + batchItem.urlId)
            self.logger.debug("The process Popen() algorithms usage model")
            self.logger.debug("Popen: %s", str(cmd))
            process = Popen(cmd, stdout=PIPE, stdin=PIPE, stderr=PIPE, shell=True, close_fds=True)
            self.logger.debug("process.communicate(), len(input_pickled_object)=" + str(len(inputPickledObject)))
            (output, err) = process.communicate(input=inputPickledObject)
            self.logger.debug("Process std_error=: %s", str(err))
            self.logger.debug("Process output len=:" + str(len(output)))
            exitCode = process.wait()
            self.logger.debug("Process response exitCode: %s", str(exitCode))
            if exitCode == EXIT_FAILURE:
                self.logger.error("Process has failed!")
                raise ProcessorException("Scraper has failed.")
            # self.scraper_response = pickle.loads(output)
            # NOTE(review): pickle.loads on child output is trusted-IPC only;
            # never feed it externally supplied data.
            scraperOutputData = pickle.loads(output)
            # self.logger.debug("scraperOutputData: %s", varDump(scraperOutputData))

            scraperResponse, accumulatedBatchItems = self.readScraperOutputData(batchItem, scraperOutputData,
                                                                                batchItemDict["site"])
            self.accumulatedBatchItems += accumulatedBatchItems
        else:
            # In-process model: import the processor class once and reuse it.
            self.logger.debug("The module import algorithms usage model")
            if self.algorithmsModule is None:
                self.logger.debug("Initialize algorithm module and class instantiate")
                import importlib
                self.logger.debug("importlib.import_module(dc_processor." + cmd["AppClass"] + ")")
                self.algorithmsModule = importlib.import_module("dc_processor." + cmd["AppClass"])
                self.logger.debug("Module dc_processor." + cmd["AppClass"] + " imported")
                # create the app
                try:
                    self.algorithmsClass = getattr(self.algorithmsModule, cmd["AppClass"])(APP_CONSTS.APP_USAGE_MODEL_MODULE,
                                                                                          cmd["AppConfig"],
                                                                                          self.logger, scraperInputObject)
                    self.algorithmsClass.setup()
                except Exception as err:
                    raise ProcessorException("Module initialization failed: " + str(err) + "\n" + Utils.getTracebackInfo())
            else:
                # Use instance from previous call
                self.algorithmsClass.input_data = scraperInputObject
            try:
                self.algorithmsClass.run()
                exitCode = self.algorithmsClass.getExitCode()

                self.logger.debug('type(self.algorithmsClass.output_data): ' + str(type(self.algorithmsClass.output_data)))
                self.logger.debug('self.algorithmsClass.output_data: ' + str(self.algorithmsClass.output_data))

                scraperResponse, accumulatedBatchItems = self.readScraperOutputData(batchItem,
                                                                                   self.algorithmsClass.output_data,
                                                                                   batchItemDict["site"])
                self.accumulatedBatchItems += accumulatedBatchItems

                output = pickle.dumps(scraperResponse)
            except Exception as err:
                raise ProcessorException("Algorithm module has failed: " + str(err) + "\n" + Utils.getTracebackInfo())

        self.logger.info(MSG_INFO_PROCESSOR_EXIT_CODE + str(exitCode))

        if scraperResponse is not None:
            # self.logger.debug("scraper_response: %s", varDump(scraperResponse))

            batchItemDict["errorMask"] |= scraperResponse.errorMask

    except Exception as err:
        ExceptionLog.handler(self.logger, err, MSG_ERROR_PROCESS)
        raise ProcessorException(MSG_ERROR_PROCESS + " : " + str(err) + "\n" + Utils.getTracebackInfo())

    return Results(exitCode, output, err, scraperResponse)
644 
645 
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
-mask-info
Here is the call graph for this function:
Here is the caller graph for this function:

◆ processBatch()

def dc_processor.ProcessorTask.ProcessorTask.processBatch (   self)

Definition at line 2036 of file ProcessorTask.py.

def processBatch(self):
    """Read a pickled batch from stdin, process it, and write it to stdout.

    Pipeline: unpickle the incoming batch, arm the SIGALRM execution
    timer when maxExecutionTime is set, process the batch items (with
    chain extension), enforce the maxItems limit on accumulated
    multi-items, process those accumulated items, then pickle and write
    the combined batch to stdout and recalculate DB counters.

    Raises:
      Exception: wrapping ProcessorException, DatabaseException, or any
        other error, after logging.
    """
    try:
        start_batch_time = time.time()

        # read pickled batch object from stdin and unpickle it
        # NOTE(review): pickle.loads on stdin is trusted-IPC only.
        input_pickled_object = sys.stdin.read()
        input_batch = pickle.loads(input_pickled_object)

        # self.logger.info("Incoming batch id: %s, items: %s", str(input_batch.id), str(len(input_batch.items)))
        # self.logger.debug("input_batch: %s", varDump(input_batch))

        app.Profiler.messagesList.append("Batch.id=" + str(input_batch.id))
        self.input_batch = input_batch
        Utils.storePickleOnDisk(input_pickled_object, ENV_PROCESSOR_STORE_PATH, "processor.in." +
                                str(self.input_batch.id))

        # Arm the watchdog timer when a max execution time is configured.
        if int(self.input_batch.maxExecutionTime) > 0:
            self.maxExecutionTimeValue = self.input_batch.maxExecutionTime
            signal.signal(signal.SIGALRM, self.signalHandlerTimer)
            signal.alarm(self.maxExecutionTimeValue)
            self.removeUnprocessedItems = bool(self.input_batch.removeUnprocessedItems)
            self.logger.debug("Set maxExecutionTime = %s, removeUnprocessedItems = %s",
                              str(self.maxExecutionTimeValue), str(self.removeUnprocessedItems))

        #----------------------------------------------------------------------------------------
        batchItems = self.input_batch.items
        batchItems = self.extendBatchItemsWithChain(batchItems)
        self.logger.info(">>> batchItems len = " + str(len(batchItems)))
        batchItems = self.processBatchItems(batchItems)
        #----------------------------------------------------------------------------------------
        # Main processing over every url from list of urls in the batch object
        batchItems = [localItem for localItem in batchItems if localItem is not None]

        self.logger.debug('len(batchItems) = ' + str(len(batchItems)))
        self.logger.debug('len(self.accumulatedBatchItems) = ' + str(len(self.accumulatedBatchItems)))

        self.accumulateProcessing = True
        # check allowed limits: truncate accumulated multi-items over maxItems
        # and flag the last surviving item with ERROR_MAX_ITEMS.
        if self.input_batch.maxItems is not None and \
            int(self.input_batch.maxItems) < len(self.accumulatedBatchItems) + len(batchItems):
            self.accumulatedBatchItems = self.accumulatedBatchItems[0: (int(self.input_batch.maxItems) - len(batchItems))]
            self.accumulatedBatchItems[-1].urlObj.errorMask |= APP_CONSTS.ERROR_MAX_ITEMS
            self.logger.debug("Truncated scraper responces list because over limit 'maxItems' = " + \
                              str(self.input_batch.maxItems) + " set errorMask = " + str(APP_CONSTS.ERROR_MAX_ITEMS))

        # Non-real-time crawls persist the accumulated multi-item URLs/contents.
        if self.input_batch.crawlerType != dc_event.Batch.TYPE_REAL_TIME_CRAWLER:
            self.logger.debug('>>> call putUrlsMultiItems()')
            self.putUrlsMultiItems(self.accumulatedBatchItems)
            if len(batchItems) > 0:
                self.logger.debug('>>> call putRawContentsMultiItems()')
                self.putRawContentsMultiItems(batchItems[0].siteId, batchItems[0].urlObj.url, self.accumulatedBatchItems)

        self.accumulatedBatchItems = self.processBatchItems(self.accumulatedBatchItems)
        self.accumulatedBatchItems = [localItem for localItem in self.accumulatedBatchItems if localItem is not None]

        batchItems.extend(self.accumulatedBatchItems)

        # self.logger.debug("Output batch items: " + varDump(batchItems))

        # TODO: for what this difference ???
        # if input_batch.crawlerType == dc_event.Batch.TYPE_REAL_TIME_CRAWLER:
        #   input_batch.items = batchItems
        #   process_task_batch = input_batch
        # else:
        #   process_task_batch = Batch(input_batch.id, batchItems)

        input_batch.items = batchItems

        # self.logger.info("Outgoing batch id: %s, items: %s", str(input_batch.id), str(len(input_batch.items)))
        # self.logger.debug("%s", varDump(input_batch))

        # send response to the stdout
        output_pickled_object = pickle.dumps(input_batch)  # #process_task_batch)
        Utils.storePickleOnDisk(output_pickled_object, ENV_PROCESSOR_STORE_PATH, "processor.out." + str(input_batch.id))
        sys.stdout.write(output_pickled_object)
        sys.stdout.flush()

        # Update db counters
        self.wrapper.fieldsRecalculating(self.batchSites)

        batch_processing_time = int((time.time() - start_batch_time) * 1000)
        self.logger.debug("Batch processing time: %s msec.", str(batch_processing_time))
    except ProcessorException, err:
        ExceptionLog.handler(self.logger, err, MSG_INFO_PROCESS_BATCH)
        raise Exception(MSG_INFO_PROCESS_BATCH + " : " + str(err))
    except DatabaseException, err:
        ExceptionLog.handler(self.logger, err, MSG_INFO_PROCESS_BATCH)
        raise Exception(MSG_INFO_PROCESS_BATCH + " : " + str(err))
    except Exception as err:
        ExceptionLog.handler(self.logger, err, MSG_INFO_PROCESS_BATCH, (err))
        raise Exception(MSG_ERROR_PROCESS_BATCH + " : " + str(err))
2127 
2128 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ processBatchItemChainSelectStep()

def dc_processor.ProcessorTask.ProcessorTask.processBatchItemChainSelectStep (   self,
  batchItem,
  batchItemDict,
  chainDict 
)

Definition at line 1889 of file ProcessorTask.py.

def processBatchItemChainSelectStep(self, batchItem, batchItemDict, chainDict):
    """Chain-selection step: fold a batch item into its URL chain entry.

    Items without a chainId are left alone.  A chained item is marked for
    removal from the batch (dellBatch) and either registered as a new
    chain (deep copies of item and dict), used to seed the chain's
    scraperResponse when the chain has none, or merged into the existing
    chain via mergeChains().
    """
    self.logger.debug(">>> 3 step")
    chainId = batchItem.urlObj.chainId
    self.logger.debug(">>> Chain Id = " + str(chainId) + " Md5= " + batchItem.urlObj.urlMd5)
    if chainId is None:
        return

    # Chained items are removed from the outgoing batch later.
    batchItemDict["dellBatch"] = True

    if chainId not in chainDict:
        # First item of this chain: register deep copies as the chain head.
        self.logger.debug(">>> new Chain Id = " + str(chainId) + " Md5= " + batchItem.urlObj.urlMd5)
        chainDict[chainId] = {"batchItem": copy.deepcopy(batchItem),
                              "batchItemDict": copy.deepcopy(batchItemDict)}
        return

    self.logger.debug(">>> old Chain Id = " + str(chainId) + " Md5= " + batchItem.urlObj.urlMd5)
    hasResponse = ("scraperResponse" in batchItemDict and
                   len(batchItemDict["scraperResponse"]) > 0 and
                   batchItemDict["scraperResponse"][0] is not None)
    if not hasResponse:
        self.logger.debug(">>> no or empty scraperResponse for current BatchItem")
        return

    chainBucket = chainDict[chainId]["batchItemDict"]
    chainEmpty = ("scraperResponse" not in chainBucket or
                  len(chainBucket["scraperResponse"]) == 0 or
                  chainBucket["scraperResponse"][0] is None)
    if chainEmpty:
        # Chain has no usable response yet: adopt this item's response.
        chainBucket["scraperResponse"] = batchItemDict["scraperResponse"]
    else:
        self.mergeChains(chainDict[chainId], batchItem, batchItemDict)
1910 
1911 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ processBatchItems()

def dc_processor.ProcessorTask.ProcessorTask.processBatchItems (   self,
  inputItems 
)

Definition at line 1955 of file ProcessorTask.py.

1955  def processBatchItems(self, inputItems):
1956  #----------------------------------------------------------------------------------------
1957  localBatchItems = inputItems
1958  batchProcessingData = []
1959  chainDict = {}
1960 
1961  try:
1962  for batchItem in localBatchItems:
1963  self.logger.debug("!!! urlId: %s, maxExecutionTimeReached = %s", str(batchItem.urlId),
1964  str(self.maxExecutionTimeReached))
1965 
1966  if self.maxExecutionTimeReached:
1967  self.logger.debug("Maximum execution time %ss reached, news extraction loop interrupted!",
1968  str(self.maxExecutionTimeValue))
1969  self.logger.debug("!!! ERROR_MAX_EXECUTION_TIME !!! Set errorMask = %s",
1970  str(APP_CONSTS.ERROR_MAX_EXECUTION_TIME))
1971 
1972  if self.removeUnprocessedItems:
1973  if len(batchProcessingData) > 0:
1974  batchProcessingData[-1]["errorMask"] = APP_CONSTS.ERROR_MAX_EXECUTION_TIME
1975  break
1976  else:
1977  elem = {}
1978  elem["errorMask"] |= APP_CONSTS.ERROR_MAX_EXECUTION_TIME
1979  elem["batchItem"] = batchItem
1980  batchProcessingData.append(elem)
1981  continue
1982 
1983  batchProcessingData.append(self.processBatchItemScrapyStep(batchItem))
1984 
1985  for i in xrange(0, len(batchProcessingData)):
1986  # self.logger.debug(">>> i = " + str(i))
1987  if "batchItem" not in batchProcessingData[i] or batchProcessingData[i]["batchItem"] is not None:
1988  # self.logger.debug(">>> i = " + str(i))
1989  self.processBatchItemTemplateSelectStep(localBatchItems[i], batchProcessingData[i])
1990 
1991  for i in xrange(0, len(batchProcessingData)):
1992  if "batchItem" not in batchProcessingData[i] or batchProcessingData[i]["batchItem"] is not None:
1993  sortedMetric = self.getPropValueFromSiteProperties(batchProcessingData[i], "DEFAULT_METRIC")
1994  if sortedMetric is not None:
1995  self.resortProcessedContentsByMetrics(batchProcessingData[i], sortedMetric)
1996 
1997  for i in xrange(0, len(batchProcessingData)):
1998  if "batchItem" not in batchProcessingData[i] or batchProcessingData[i]["batchItem"] is not None:
1999  self.processBatchItemChainSelectStep(localBatchItems[i], batchProcessingData[i], chainDict)
2000 
2001  self.setDefaultInternalForChainContents(chainDict)
2002 
2003  newBatchProcessingData = []
2004  newBatchitems = []
2005  for i in xrange(0, len(batchProcessingData)):
2006  if "dellBatch" not in batchProcessingData[i] or not batchProcessingData[i]["dellBatch"]:
2007  newBatchitems.append(localBatchItems[i])
2008  newBatchProcessingData.append(batchProcessingData[i])
2009  else:
2010  self.updateProcessedURL(localBatchItems[i], batchProcessingData[i])
2011  localBatchItems = newBatchitems
2012  batchProcessingData = newBatchProcessingData
2013  for key in chainDict:
2014  localBatchItems.append(chainDict[key]["batchItem"])
2015  batchProcessingData.append(chainDict[key]["batchItemDict"])
2016 
2017  for i in xrange(0, len(batchProcessingData)):
2018  if "batchItem" not in batchProcessingData[i] or batchProcessingData[i]["batchItem"] is not None:
2019  self.processBatchItemTemplateFillStep(localBatchItems[i], batchProcessingData[i])
2020 
2021  for i in xrange(0, len(batchProcessingData)):
2022  self.processBatchItemURLContentStep(localBatchItems[i], batchProcessingData[i])
2023 
2024  except Exception, err:
2025  ExceptionLog.handler(self.logger, err, MSG_INFO_PROCESS_BATCH_ITEM)
2026  if len(localBatchItems) > 0:
2027  localBatchItems[-1].urlObj.errorMask = APP_CONSTS.ERROR_PROCESSOR_BATCH_ITEM_PROCESS
2028 
2029  ret = localBatchItems
2030  #----------------------------------------------------------------------------------------
2031  return ret
2032 
2033 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ processBatchItemScrapyStep()

def dc_processor.ProcessorTask.ProcessorTask.processBatchItemScrapyStep (   self,
  batchItem 
)

Definition at line 1528 of file ProcessorTask.py.

def processBatchItemScrapyStep(self, batchItem):
    # Step 1 of batch processing: load site/URL state, run pre-processor
    # filters, fetch raw content from the FS and run the scraper task.
    # Returns a per-item state dict ("ret") carried through all later steps;
    # ret["batchItem"] set to None marks the item as discarded.
    self.logger.debug(">>> 1 step")
    ret = {}
    ret["errorMask"] = batchItem.urlObj.errorMask
    ret["processedTime"] = time.time()
    try:
        self.batchSites.update([batchItem.urlObj.siteId])
        ret["site"] = self.loadSite(batchItem)
        # Check if Real-Time crawling
        # if self.input_batch.crawlerType == dc_event.Batch.TYPE_REAL_TIME_CRAWLER and \
        # (self.input_batch.dbMode & dc_event.Batch.DB_MODE_W == 0):
        if self.input_batch.dbMode & dc_event.Batch.DB_MODE_W == 0:
            self.wrapper.affect_db = False
        if batchItem.urlObj is not None:
            ret["url"] = batchItem.urlObj
        else:
            ret["url"] = self.loadURL(batchItem)

        # Save raw content charset to dict
        self.logger.debug("Save charset '" + str(ret["url"].charset) + "' to dict")
        ret["charset"] = ret["url"].charset

        # Check current state of site
        if self.isDisabledSite(ret["site"]):
            raise ProcessorException("Site state is not active! Actual site state is: %s. " % ret["site"].state)

        # Check overlimit max resources limits
        if self.isOverlimitMaxResources(ret["site"], ret["url"]):
            ret["errorMask"] |= APP_CONSTS.ERROR_MASK_SITE_MAX_RESOURCES_NUMBER
            return ret

        # Already-processed, error-free resource with tags: decide whether a
        # real-time request needs reprocessing at all.
        if ret["url"].processed != 0 and ret["url"].errorMask == ERROR_MASK_NO_ERRORS and \
           ret["url"].tagsCount > 0 and ret["errorMask"] == ERROR_MASK_NO_ERRORS:
            self.logger.debug("Real time crawling. Check reprocessing.")
            self.logger.debug("Batch item properties: %s", json.dumps(batchItem.properties))

            # PROCESSOR_NAME == "NONE" means crawling-only mode: discard the item.
            if ("PROCESSOR_NAME" in batchItem.properties) and \
               (batchItem.properties["PROCESSOR_NAME"] == "NONE"):
                self.logger.debug("RealTime Crawling: Only crawling mode. Exit.")
                ret["batchItem"] = None
                return ret

            self.logger.debug("batchItem.properties: %s", json.dumps(batchItem.properties))

            if "PROCESSOR_NAME" in batchItem.properties:
                ret["processorName"] = batchItem.properties["PROCESSOR_NAME"]
            elif "processorName" in batchItem.properties:
                ret["processorName"] = batchItem.properties["processorName"]

            # Explicit "do not reprocess" property: discard the cached item.
            if (CONSTS.REPROCESS_KEY in batchItem.properties) and \
               (batchItem.properties[CONSTS.REPROCESS_KEY] == CONSTS.REPROCESS_VALUE_NO):
                self.logger.debug("RealTime Crawling: Cashed resource. Resource crawled and errorMask is empty." +
                                  "Don't need to reprocess.")
                ret["batchItem"] = None
                return ret
            else:
                self.logger.debug("RealTime Crawling: Cashed resource. Resource crawled and errorMask is emppty but " +
                                  "properties reprocess is Yes or empty. Send to reprocessing.")

        # (1) Apply SQLExpression filter before processor for zero site
        if batchItem.siteId == '0':
            self.logger.debug("Check SQLExpression filter for zero site ...")
            # Expose URL object attributes (upper-cased names) to the filter.
            fields = {}
            for key, value in batchItem.urlObj.__dict__.items():
                fields[key.upper()] = value

            if self.filtersApply('', self.wrapper, batchItem.siteId,
                                 fields, Filters.OC_SQLE, Filters.STAGE_BEFORE_PROCESSOR, True):
                self.logger.debug("SQLExpression filter for zero site checked - SUCCESS")
            else:
                self.logger.debug("SQLExpression filter for zero site checked - Fail")
                ret["errorMask"] |= APP_CONSTS.ERROR_PROCESSOR_FILTERS_BREAK
                ret["batchItem"] = None
                return ret
        # (1) END

        # No per-item properties supplied: inherit all properties from the site.
        if len(batchItem.properties.keys()) == 0:
            self.logger.debug('>>> property len(batchItem.properties.keys()) == 0')
            # self.logger.debug('>>> loadSiteProperties ret["site"].properties: ' + varDump(ret['site'].properties))

            for localProperty in ret["site"].properties:
                batchItem.properties[localProperty["name"]] = copy.deepcopy(localProperty["value"])
                # self.logger.debug('>>> copy property ' + str(localProperty["name"]) + ' = ' \
                # + varDump(localProperty["value"]))

        self.loadSiteProperties(ret["site"], ret["url"], batchItem, ret)  # #

        if "urlNormalizeMaskProcessor" in ret:
            self.normMask = int(ret["urlNormalizeMaskProcessor"])

        # Content type not in the site's allowed PROCESS_CTYPES list: try the
        # optional CONTENT_TYPE_MAP before flagging the item unsupported.
        if "processCTypes" in ret and ret["url"].contentType not in ret["processCTypes"]:
            self.logger.debug('>>>> ret["url"].contentType = ' + str(ret["url"].contentType))
            self.logger.debug('>>>> ret["processCTypes"] = ' + str(ret["processCTypes"]))

            isOkContentType = False
            try:
                if "contentTypeMap" in ret:
                    contentTypeMap = json.loads(ret["contentTypeMap"])
                    # NOTE(review): this membership test is duplicated just below;
                    # the first occurrence only emits the debug line.
                    if ret["processCTypes"] in contentTypeMap:
                        self.logger.debug('>>>> Found in ret["contentTypeMap"] = ' + str(contentTypeMap))

                    if ret["processCTypes"] in contentTypeMap:
                        if ret["url"].contentType == contentTypeMap[ret["processCTypes"]]:
                            self.logger.debug('>>>> Good!!!')
                            isOkContentType = True

            except Exception, err:
                self.logger.debug("Fail loads of 'CONTENT_TYPE_MAP': " + str(err))

            if not isOkContentType:
                ret["errorMask"] |= APP_CONSTS.ERROR_MASK_SITE_UNSUPPORTED_CONTENT_TYPE
                ret["batchItem"] = batchItem
                self.logger.error("url ContentType not matched! url.contentType: '%s', site_properties.PROCESS_CTYPES: '%s'",
                                  str(ret["url"].contentType), ret["processCTypes"])
                if self.input_batch.crawlerType == dc_event.Batch.TYPE_REAL_TIME_CRAWLER and \
                   (self.input_batch.dbMode & dc_event.Batch.DB_MODE_W > 0):
                    self.updateURL(batchItem)

        self.resolveProcessorNameByContentType(ret["url"].contentType, ret)
        # Check if content stored on disk
        if batchItem.urlObj.contentMask != dc_event.URL.CONTENT_STORED_ON_DISK:
            self.logger.debug(">>> Content not found on disk. Exit.")
        elif batchItem.urlObj.httpCode != 200:
            self.logger.debug(">>> HTTP Code != 200. Code == " + str(batchItem.urlObj.httpCode) + ". Exit")
        # No on-disk content or bad HTTP code: update URL state and discard.
        if batchItem.urlObj.contentMask != dc_event.URL.CONTENT_STORED_ON_DISK or batchItem.urlObj.httpCode != 200:
            self.updateURL(batchItem)
            ret["batchItem"] = None
            self.logger.debug("Exit. batchItem.urlObj.contentMask = " + str(batchItem.urlObj.contentMask) + \
                              " batchItem.urlObj.httpCode = " + str(batchItem.urlObj.httpCode))
            return ret
        self.updateURL(batchItem)

        # (2) Apply filter before processor to 'URL.url'
        self.logger.debug("Check filter to 'url' use regular expression ...")
        if self.filtersApply(batchItem.urlObj.url, self.wrapper, batchItem.siteId,
                             None, Filters.OC_RE, Filters.STAGE_BEFORE_PROCESSOR, True):
            self.logger.debug("Filter to 'url' use regular expression checked - SUCCESS")
        else:
            self.logger.debug("Filter to 'url' use regular expression checked - Fail")
            ret["errorMask"] |= APP_CONSTS.ERROR_PROCESSOR_FILTERS_BREAK
            ret["batchItem"] = None
            return ret
        # (2) END

        # Get raw content
        ret["rawContent"] = self.getRawContentFromFS(batchItem, ret)

        if ret["rawContent"] is not None:
            # (3) Apply filter before processor to 'raw content'
            self.logger.debug("Check filter to 'raw content' use regular expression (STAGE_BEFORE_PROCESSOR)...")
            if self.filtersApply(ret["rawContent"], self.wrapper, batchItem.siteId,
                                 None, Filters.OC_RE, Filters.STAGE_BEFORE_PROCESSOR, True):
                self.logger.debug("Filter to 'raw content' use regular expression checked - SUCCESS")
            else:
                self.logger.debug("Filter to 'raw content' use regular expression checked - Fail")
                ret["errorMask"] |= APP_CONSTS.ERROR_PROCESSOR_FILTERS_BREAK
                ret["batchItem"] = None
                return ret
            # (3) END

            # Parse templates
            self.parseTemplate(batchItem, ret)

            # if "HTTP_REDIRECT_LINK" in ret and int(ret["HTTP_REDIRECT_LINK"]) > 0:
            # self.logger.debug('>>>>> headerContent batchItem.siteId: ' + str(batchItem.siteId))
            # self.logger.debug('>>>>> headerContent batchItem.urlObj.url: ' + str(batchItem.urlObj.url))
            # headerContent = self.getHeaderContent(batchItem.siteId, batchItem.urlObj.url)
            # urlValue = self.getLocationFromHeaderContent(headerContent)
            # self.logger.debug('>>>>> headerContent urlValue: ' + str(urlValue))
            # if urlValue is not None:
            # pass
            # batchItem.urlObj.url = urlValue
            # batchItem.urlObj.urlMd5 = hashlib.md5(batchItem.urlObj.url).hexdigest()
            # batchItem.urlObj.parentMd5 = batchItem.urlObj.urlMd5

            # (3) Apply filter 'STAGE_AFTER_DOM_PRE' to 'raw content'
            self.logger.debug("Check filter to 'raw content' use regular expression ('STAGE_AFTER_DOM_PRE')...")
            if self.filtersApply(ret["rawContent"], self.wrapper, batchItem.siteId, \
                                 None, Filters.OC_RE, Filters.STAGE_AFTER_DOM_PRE, True):
                self.logger.debug("Filter to 'raw content' use regular expression checked - SUCCESS")
            else:
                self.logger.debug("Filter to 'raw content' use regular expression checked - Fail")
                ret["errorMask"] |= APP_CONSTS.ERROR_PROCESSOR_FILTERS_BREAK
                ret["batchItem"] = None
                return ret
            # (3) END

            # Run the actual scraping and content hashing.
            self.convertRawContentCharset(ret)
            self.processTask(batchItem, ret)
            localContentHash = self.processContentHash(ret)
            if localContentHash is not None:
                ret["contentURLMd5"] = localContentHash

            # self.logger.debug("!!! batchItem.properties: " + varDump(batchItem.properties))
            # Optional post-processing site update driven by SQL field expressions.
            if APP_CONSTS.SQL_EXPRESSION_FIELDS_UPDATE_PROCESSOR in batchItem.properties:
                self.logger.debug("!!! Found '" + str(APP_CONSTS.SQL_EXPRESSION_FIELDS_UPDATE_PROCESSOR) + \
                                  "' in batchItem.properties")

                # Blank out every attribute so only evaluated fields get written.
                localSiteUpdate = dc_event.SiteUpdate(batchItem.siteId)
                for attr in localSiteUpdate.__dict__:
                    if hasattr(localSiteUpdate, attr):
                        setattr(localSiteUpdate, attr, None)

                localSiteUpdate.id = batchItem.siteId
                localSiteUpdate.updateType = dc_event.SiteUpdate.UPDATE_TYPE_UPDATE

                # Evaluate 'Site' class values if neccessary
                changedFieldsDict = FieldsSQLExpressionEvaluator.execute(batchItem.properties, self.wrapper, ret["site"],
                                                                         None, self.logger,
                                                                         APP_CONSTS.SQL_EXPRESSION_FIELDS_UPDATE_PROCESSOR)
                # Update 'Site' class values (timestamps are managed by the DB).
                for name, value in changedFieldsDict.items():
                    if hasattr(localSiteUpdate, name) and value is not None and name not in ['CDate', 'UDate', 'tcDate']:
                        setattr(localSiteUpdate, name, value)

                localSiteUpdate.errorMask = SQLExpression(("`ErrorMask` | %s" % ret["site"].errorMask))

                updatedCount = self.wrapper.siteNewOrUpdate(siteObject=localSiteUpdate, stype=dc_event.SiteUpdate)
                self.logger.debug("!!! Use property '" + str(APP_CONSTS.SQL_EXPRESSION_FIELDS_UPDATE_PROCESSOR) + \
                                  "' updated " + str(updatedCount) + " rows.")

    except DatabaseException, err:
        ExceptionLog.handler(self.logger, err, MSG_INFO_PROCESS_BATCH_ITEM, (ret))
        ret["errorMask"] = ret["errorMask"] | APP_CONSTS.ERROR_DATABASE_ERROR
    except ProcessorException, err:
        ExceptionLog.handler(self.logger, err, MSG_INFO_PROCESS_BATCH_ITEM, (ret))
        ret["errorMask"] = ret["errorMask"] | APP_CONSTS.ERROR_PROCESSOR_BATCH_ITEM_PROCESS
    except Exception as err:
        ExceptionLog.handler(self.logger, err, MSG_INFO_PROCESS_BATCH_ITEM, (ret))
        ret["errorMask"] = ret["errorMask"] | APP_CONSTS.ERROR_PROCESSOR_BATCH_ITEM_PROCESS

    # #self.logger.debug(">>> NEW DICT IS = " + str(ret))
    return ret
1762 
1763 
-mask-info
Here is the call graph for this function:
Here is the caller graph for this function:

◆ processBatchItemTemplateFillStep()

def dc_processor.ProcessorTask.ProcessorTask.processBatchItemTemplateFillStep (   self,
  batchItem,
  batchItemDict 
)

Definition at line 1915 of file ProcessorTask.py.

1915  def processBatchItemTemplateFillStep(self, batchItem, batchItemDict): # pylint: disable=W0613
1916  self.logger.debug(">>> 4 step")
1917  try:
1918  if "template" in batchItemDict:
1919  # self.logger.debug(">>> template is = " + str(batchItemDict["template"]))
1920  if "output_format" in batchItemDict["template"]:
1921  self.mapResponse(batchItemDict["template"], batchItemDict["url"].crawlingTime,
1922  batchItemDict["scraperResponse"][0] if "scraperResponse" in batchItemDict and \
1923  len(batchItemDict["scraperResponse"]) > 0 else None,
1924  batchItemDict["processorProperties"])
1925  else:
1926  self.logger.debug(">>> wrong no output_format field for batch template")
1927 
1928  batchItemDict["processedContent"] = self.getProcessedContent(batchItemDict["template"],
1929  batchItemDict["scraperResponse"][0] if \
1930  "scraperResponse" in batchItemDict and \
1931  len(batchItemDict["scraperResponse"]) > 0 \
1932  else None,
1933  batchItemDict["errorMask"])
1934  else:
1935  self.logger.debug(">>> wrong !!! empty batchItemDict[\"template\"]")
1936  except Exception, err:
1937  self.logger.error("Template error: " + str(err) + "\nSiteId: " + str(batchItem.siteId) + \
1938  "\nurl: " + batchItem.urlObj.url + "\nurlMD5: " + str(batchItem.urlObj.urlMd5))
1939  self.logger.debug(Utils.getTracebackInfo())
1940  batchItemDict["errorMask"] = batchItemDict["errorMask"] | APP_CONSTS.ERROR_TEMPLATE_SOURCE
1941 
1942 
-mask-info
Here is the call graph for this function:
Here is the caller graph for this function:

◆ processBatchItemTemplateSelectStep()

def dc_processor.ProcessorTask.ProcessorTask.processBatchItemTemplateSelectStep (   self,
  batchItem,
  batchItemDict 
)

Definition at line 1798 of file ProcessorTask.py.

def processBatchItemTemplateSelectStep(self, batchItem, batchItemDict):  # pylint: disable=W0613
    # Step 2 of batch processing: pair every enabled template with its
    # scraper response (matched by position), compute template metrics and
    # reduce the candidates to one selected response via reduceResponse().
    self.logger.debug(">>> 2 step")
    if "template" not in batchItemDict or "templates" not in batchItemDict["template"]:
        self.logger.debug(">>> wrong !!! empty batchItemDict[\"template\"][\"templates\"]")
        return
    if "scraperResponse" not in batchItemDict or len(batchItemDict["scraperResponse"]) == 0:
        self.logger.debug(">>> no scraperResponse or scraperResponse is empty")
        return

    templateRoot = batchItemDict["template"]
    responses = batchItemDict["scraperResponse"]
    selectType = templateRoot["select"] if "select" in templateRoot else "first_nonempty"
    candidatePairs = []
    self.logger.debug(">>> Tmpl Len = " + str(len(templateRoot["templates"])))
    self.logger.debug(">>> Responce Len = " + str(len(responses)))
    # Templates and responses are positionally aligned, so disabled templates
    # still consume their slot in the responses list.
    for idx, candidateTemplate in enumerate(templateRoot["templates"]):
        if "state" in candidateTemplate and not bool(int(candidateTemplate["state"])):
            self.logger.debug(">>> Template disable")
            continue
        self.templateMetricsCalculate(candidateTemplate, responses[idx])
        candidatePairs.append([copy.deepcopy(candidateTemplate), responses[idx]])
    self.reduceResponse(candidatePairs, selectType, batchItemDict)
1821 
1822 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ processBatchItemURLContentStep()

def dc_processor.ProcessorTask.ProcessorTask.processBatchItemURLContentStep (   self,
  batchItem,
  batchItemDict 
)

Definition at line 1946 of file ProcessorTask.py.

def processBatchItemURLContentStep(self, batchItem, batchItemDict):  # pylint: disable=W0613
    # Step 5 of batch processing: normalize a bare template dict into the
    # canonical {"templates": ...} shape, store the processed content (when
    # present) and finally update the URL's processed state.
    self.logger.debug(">>> 5 step")
    if "template" in batchItemDict:
        if "templates" not in batchItemDict["template"]:
            batchItemDict["template"] = {"templates": batchItemDict["template"]}
    content = batchItemDict["processedContent"] if "processedContent" in batchItemDict else None
    if content is not None:
        self.putContent(batchItem, content, batchItemDict)
    self.updateProcessedURL(batchItem, batchItemDict)
1953 
1954 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ processContentHash()

def dc_processor.ProcessorTask.ProcessorTask.processContentHash (   self,
  batchItemDict 
)

Definition at line 836 of file ProcessorTask.py.

def processContentHash(self, batchItemDict):
    # Compute the site's configured content hash over the first scraper
    # response.  The rule (JSON in "contentHash") supplies the tag list and
    # the hashing algorithm; returns the hash value, or None when no rule
    # applies or calculation fails with a ProcessorException.
    hashValue = None
    try:
        hashRule = batchItemDict["contentHash"] if "contentHash" in batchItemDict else None
        if hashRule is not None and "scraperResponse" in batchItemDict and len(batchItemDict["scraperResponse"]) > 0:
            self.logger.debug(">>> Site has content hash rule: %s", str(hashRule))
            ruleObj = json.loads(hashRule)
            # self.list_hashed_tags = self.contentHash["tags"].split(",")
            tagNames = ruleObj["tags"].split(",")
            batchItemDict["contentURLMd5Algorithm"] = ruleObj["algorithm"]
            self.logger.debug(">>> List hashed tags: %s", str(tagNames))
            joinedContent = self.stickHashedContents(tagNames, batchItemDict["scraperResponse"][0])
            hashValue = ContentHashCalculator.hashCalculate(joinedContent, int(batchItemDict["contentURLMd5Algorithm"]))
        else:
            self.logger.debug(">>> Site hasn't content hash rule")
    except ProcessorException as err:
        ExceptionLog.handler(self.logger, err, MSG_ERROR_CALC_CONTENT_HASH, (), \
                             {ExceptionLog.LEVEL_NAME_ERROR:ExceptionLog.LEVEL_VALUE_DEBUG})
    return hashValue
855 
856 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ processTask()

def dc_processor.ProcessorTask.ProcessorTask.processTask (   self,
  batchItem,
  batchItemDict,
  withoutProcess = False 
)

Definition at line 1215 of file ProcessorTask.py.

def processTask(self, batchItem, batchItemDict, withoutProcess=False):
    """Run the scraper over every enabled template of the batch item.

    Fills batchItemDict["scraperResponse"] with one entry per template
    (None for disabled or failed templates).  Short-circuits for the
    "raw-data" algorithm, which stores the raw content verbatim.  On
    failure ERROR_MASK_SCRAPER_ERROR is set; a ProcessorException is
    additionally re-raised to the caller.
    """
    try:
        # If the processing algorithm is "raw-data" (bgv): store raw content and exit.
        if batchItemDict["processorProperties"] is not None:
            processorProperties = json.loads(batchItemDict["processorProperties"])
            if "algorithm" in processorProperties:
                algorithm = processorProperties["algorithm"]
                if "algorithm_name" in algorithm and algorithm["algorithm_name"] == "raw-data":
                    self.logger.debug("raw-data algorithm defined!")
                    batchItemDict["processedContent"] = encode(batchItemDict["rawContent"])
                    self.putContent(batchItem, batchItemDict["processedContent"], batchItemDict)
                    return
        self.extendProcessorProperties(batchItemDict, batchItemDict["site"].properties)
        # self.logger.debug("processorProperties = " + str(batchItemDict["processorProperties"]))
        if "charset" in batchItemDict:
            self.updateURLCharset(batchItem, batchItemDict["charset"])
        if "site" in batchItemDict:
            self.readFilters(batchItemDict["site"])

        # Step I: reorder templates by priority (descending), but only when
        # every template carries a "priority" key.
        checkPriority = True
        for elem in batchItemDict["template"]["templates"]:
            if "priority" not in elem:
                checkPriority = False
                break
        if checkPriority:
            batchItemDict["template"]["templates"] = sorted(batchItemDict["template"]["templates"],
                                                            key=lambda template: template["priority"], reverse=True)

        # Step II: process each template, unless responses were already supplied.
        if "scraperResponse" not in batchItemDict:
            if batchItem.urlContentResponse is not None and batchItem.urlContentResponse.processedContents is not None:
                batchItemDict["scraperResponse"] = batchItem.urlContentResponse.processedContents
            else:
                batchItemDict["scraperResponse"] = []

            for self.localTemplate in batchItemDict["template"]["templates"]:
                # Disabled templates keep their slot with a None response so
                # responses stay positionally aligned with templates.
                if "state" in self.localTemplate and not bool(int(self.localTemplate["state"])):
                    self.logger.debug(">>> Template disable")
                    batchItemDict["scraperResponse"].append(None)
                    continue
                # Anything can happen but we don't bother what happens - just go further.
                if not withoutProcess:
                    try:
                        scraperInputObject = ScraperInData(batchItemDict["url"].url, batchItem.urlId, batchItem.siteId,
                                                          batchItemDict["rawContent"], self.localTemplate["tags"],
                                                          self.filters, batchItemDict["url"].lastModified,
                                                          batchItemDict["timezone"], self.input_batch.id,
                                                          self.input_batch.dbMode, batchItem,
                                                          batchItemDict["processorProperties"],
                                                          self.localTemplate["output_format"] if "output_format" in \
                                                          self.localTemplate else None)
                        result = self.process(scraperInputObject, batchItem, batchItemDict)
                        batchItemDict["scraperResponse"].append(result.scraperResponse)
                    # BUGFIX: was `except Exception() as err:` - an exception
                    # *instance*, not a class, so scraper errors were never
                    # caught here; catch the Exception class instead.
                    except Exception as err:
                        self.logger.error("Some error in template processing : " + str(err))
                        batchItemDict["scraperResponse"].append(None)
            self.localTemplate = None
        else:
            self.logger.debug("'scraperResponse' found in batchItemDict!!!")
    except ProcessorException as err:
        ExceptionLog.handler(self.logger, err, MSG_ERROR_PROCESS_TASK)
        batchItemDict["errorMask"] |= APP_CONSTS.ERROR_MASK_SCRAPER_ERROR
        raise err
    except Exception as err:
        ExceptionLog.handler(self.logger, err, MSG_ERROR_PROCESS_TASK, (err))
        batchItemDict["errorMask"] |= APP_CONSTS.ERROR_MASK_SCRAPER_ERROR
1294 
1295 
-mask-info
Here is the caller graph for this function:

◆ putContent()

def dc_processor.ProcessorTask.ProcessorTask.putContent (   self,
  batchItem,
  processedContent,
  batchItemDict 
)

Definition at line 1298 of file ProcessorTask.py.

def putContent(self, batchItem, processedContent, batchItemDict):
    # Deliver processed content: attach it to the batch item for the output
    # batch, then either defer the URLPut on the batch item (real-time
    # crawling without DB write mode) or push it to the DB wrapper right away.

    # add to batchItem processed content for result output batch
    batchItem.urlContentResponse = dc_event.URLContentResponse(None, processedContents=[processedContent])

    # URLPut
    # Create new URLPut object
    putDict = {}
    putDict["id"] = batchItem.urlId
    putDict["data"] = processedContent
    putDict["cDate"] = SQLExpression("NOW()")  # # datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    urlPut = dc_event.URLPut(batchItem.siteId, batchItem.urlId, dc_event.Content.CONTENT_PROCESSOR_CONTENT, putDict)
    # Check if Real-Time crawling without DB write access and no URLPut queued
    # yet: keep the URLPut on the batch item instead of writing to the DB.
    if self.input_batch.crawlerType == dc_event.Batch.TYPE_REAL_TIME_CRAWLER and \
       (self.input_batch.dbMode & dc_event.Batch.DB_MODE_W == 0) and batchItem.urlPutObj is None:

        if "TEMPLATE_SOURCE" in batchItemDict or self.accumulateProcessing:
            putDict["properties"] = copy.deepcopy(batchItem.properties)
            if "template" in batchItemDict:
                # Normalize "templates" to a one-element list (dict -> [dict],
                # list -> [first element]) and clear any accumulated responses
                # before persisting the template in the put properties.
                if isinstance(batchItemDict["template"]["templates"], types.DictType):
                    putDict["properties"]["template"]["templates"] = [copy.deepcopy(batchItemDict["template"]["templates"])]
                elif isinstance(batchItemDict["template"]["templates"], types.ListType) and \
                     len(batchItemDict["template"]["templates"]) > 0:
                    putDict["properties"]["template"]["templates"] = [copy.deepcopy(batchItemDict["template"]["templates"][0])]
                if len(putDict["properties"]["template"]["templates"]) > 0:
                    putDict["properties"]["template"]["templates"][0]["response"] = []

        batchItem.urlPutObj = urlPut

    else:
        # Regular mode: send the URLPut through the DB wrapper immediately.
        drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_PUT, [urlPut])
        try:
            responseDRCESyncTasksCover = self.wrapper.process(drceSyncTasksCoverObj)
            if responseDRCESyncTasksCover.eventType == DC_CONSTS.EVENT_TYPES.URL_PUT_RESPONSE:
                for obj in responseDRCESyncTasksCover.eventObject:
                    self.logger.debug("URL_PUT_RESPONSE: %s", varDump(obj))
            else:
                self.logger.error("URL_PUT_RESPONSE >>> Wrong response type")
        except DatabaseException, err:
            self.logger.error('PutContent error: ' + str(err))
            batchItemDict["errorMask"] |= APP_CONSTS.ERROR_DATABASE_ERROR
1339 
1340 
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
-mask-info
Here is the call graph for this function:
Here is the caller graph for this function:

◆ putRawContentOfType()

def dc_processor.ProcessorTask.ProcessorTask.putRawContentOfType (   self,
  batchItems,
  rawContentData,
  contentRequestType 
)

Definition at line 2320 of file ProcessorTask.py.

def putRawContentOfType(self, batchItems, rawContentData, contentRequestType):
    # Store one flavour of raw content (raw/dynamic/headers/requests) for
    # every batch item via the DB wrapper.  A None payload is stored as an
    # empty string so each item still gets a record of the requested type.
    content = '' if rawContentData is None else rawContentData

    urlPuts = []
    for item in batchItems:
        record = {
            "id": item.urlObj.urlMd5,
            "data": content,
            "cDate": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
        urlPuts.append(dc_event.URLPut(item.siteId,
                                       item.urlObj.urlMd5,
                                       contentRequestType,
                                       record))

    self.wrapper.putURLContent(urlPuts)
2341 
2342 
Here is the caller graph for this function:

◆ putRawContentsMultiItems()

def dc_processor.ProcessorTask.ProcessorTask.putRawContentsMultiItems (   self,
  siteId,
  url,
  batchItems 
)

Definition at line 2349 of file ProcessorTask.py.

def putRawContentsMultiItems(self, siteId, url, batchItems):
    # Fetch every stored content flavour for the given site/url and re-store
    # each one for all multi-item batch items.
    self.logger.debug(">>> putRawContentsMultiItems enter...")

    rawContent, dynamicContent, headerContent, requestsContent = self.getRawContent(siteId, url)

    contentPlan = [(rawContent, dc_event.Content.CONTENT_RAW_CONTENT),
                   (dynamicContent, dc_event.Content.CONTENT_DYNAMIC_CONTENT),
                   (headerContent, dc_event.Content.CONTENT_HEADERS_CONTENT),
                   (requestsContent, dc_event.Content.CONTENT_REQUESTS_CONTENT)]
    for contentData, contentType in contentPlan:
        self.putRawContentOfType(batchItems, contentData, contentType)
2359 
2360 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ putUrlsMultiItems()

def dc_processor.ProcessorTask.ProcessorTask.putUrlsMultiItems (   self,
  batchItems 
)

Definition at line 2239 of file ProcessorTask.py.

def putUrlsMultiItems(self, batchItems):
    # Insert the URL objects of all multi-item batch items into the DB via
    # the wrapper.  Returns True when the wrapper reports a truthy result.
    urlObjects = []
    for item in batchItems:
        urlObjects.append(item.urlObj)
        self.logger.debug(">>> putUrlsMultiItems url: " + str(item.urlObj.url))

    self.logger.debug(">>> putUrlsMultiItems params: " + varDump(urlObjects))

    insertResult = self.wrapper.urlNew(urlObjects)
    self.logger.debug(">>> putUrlsMultiItems result: " + str(insertResult))
    self.logger.debug(">>> bool(result): " + str(bool(insertResult)))

    return bool(insertResult)
2254 
2255 
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
Here is the call graph for this function:
Here is the caller graph for this function:

◆ readFilters()

def dc_processor.ProcessorTask.ProcessorTask.readFilters (   self,
  site 
)

Definition at line 740 of file ProcessorTask.py.

def readFilters(self, site):
    # Cache the given site's filter definitions on this instance for use by
    # subsequent scraper calls.
    siteFilters = site.filters
    self.filters = siteFilters
    self.logger.debug("sites_filters: " + varDump(siteFilters))
743 
744 
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
Here is the call graph for this function:
Here is the caller graph for this function:

◆ readScraperOutputData()

def dc_processor.ProcessorTask.ProcessorTask.readScraperOutputData (   self,
  batchItem,
  scraperOutputData,
  siteObj 
)

Definition at line 493 of file ProcessorTask.py.

def readScraperOutputData(self, batchItem, scraperOutputData, siteObj):
    """Split scraper output into a primary response plus extra batch items.

    When the scraper returned a list, the first element becomes the main
    scraperResponse and every further element is wrapped into a deep copy
    of batchItem (multi-item scraping): each copy gets a unique URL, fresh
    MD5/status/date fields, and optionally the locally-selected template.
    Accumulation stops early when site limits are exceeded.

    @param batchItem        the batch item being processed (template source)
    @param scraperOutputData either a single response object or a list of them
    @param siteObj          site object used for the limits check
    @return (scraperResponse, accumulatedBatchItems) tuple
    """
    # variables for result
    scraperResponse = None
    accumulatedBatchItems = []

    if isinstance(scraperOutputData, list):
        # First element of the list is the response for the original item.
        if len(scraperOutputData) > 0:
            scraperResponse = scraperOutputData[0]
        else:
            pass

        if len(scraperOutputData) > 1:
            # Remaining elements become synthetic batch items of their own.
            for scraperOutputItem in scraperOutputData[1:]:

                localBatchItem = copy.deepcopy(batchItem)
                localBatchItem.urlContentResponse = dc_event.URLContentResponse(None, None, [scraperOutputItem])

                # Derive a unique per-item URL from the parent URL and the
                # 1-based index of this extra item.
                localBatchItem.urlObj.url = self.createUniqueMultiItemsUrl(localBatchItem.urlObj.url, \
                                                                           len(accumulatedBatchItems) + 1)

                localBatchItem.urlObj.urlMd5 = hashlib.md5(localBatchItem.urlObj.url).hexdigest()

                # Link back to the parent URL and mark as already processed.
                localBatchItem.urlObj.parentMd5 = batchItem.urlObj.urlMd5
                localBatchItem.urlObj.status = dc_event.URL.STATUS_PROCESSED
                localBatchItem.urlObj.crawled = 1
                localBatchItem.urlObj.processed += 1
                localBatchItem.urlObj.type = dc_event.URL.TYPE_SINGLE
                localBatchItem.urlObj.errorMask = scraperOutputItem.errorMask
                localBatchItem.urlObj.CDate = localBatchItem.urlObj.UDate = localBatchItem.urlObj.tcDate = \
                    datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

                localBatchItem.urlId = localBatchItem.urlObj.urlMd5

                self.logger.debug('self.localTemplate: ' + varDump(self.localTemplate))
                self.logger.debug('localBatchItem.properties["template"]: ' + varDump(localBatchItem.properties["template"]))

                # Replace the item's template set with the locally selected
                # template when one exists and the item carries a template
                # container with a "templates" list.
                if self.localTemplate is not None and localBatchItem.properties is not None\
                  and "template" in localBatchItem.properties and "templates" in localBatchItem.properties["template"]:
                    localBatchItem.properties["template"] = {}
                    localBatchItem.properties["template"]["templates"] = []
                    localBatchItem.properties["template"]["templates"].append(self.localTemplate)
                elif self.localTemplate is not None and "template" in localBatchItem.properties:
                    self.logger.debug('localBatchItem.properties["template"]: ' + \
                                      varDump(localBatchItem.properties["template"]))

                accumulatedBatchItems.append(localBatchItem)
                # Check allowed site limits; stop accumulating extra items
                # as soon as the limits are exceeded.
                if not self.isAllowedSiteLimits(siteObj, accumulatedBatchItems):
                    self.logger.debug("!!! Not allowed site limits. len(accumulatedBatchItems) = " + \
                                      str(len(accumulatedBatchItems)))
                    break

            self.logger.debug('---> 1 ----')

    else:
        # Non-list output: use it directly as the single response.
        scraperResponse = scraperOutputData
        self.logger.debug('---> 2 ----')

    # self.logger.debug("scraperResponse: %s", varDump(scraperResponse))

    return scraperResponse, accumulatedBatchItems
554 
555 
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
Here is the call graph for this function:
Here is the caller graph for this function:

◆ readSiteFromDB()

def dc_processor.ProcessorTask.ProcessorTask.readSiteFromDB (   self,
  batchItem 
)

Definition at line 1388 of file ProcessorTask.py.

def readSiteFromDB(self, batchItem):
    """Load the Site object for batchItem.siteId from the db-task service.

    Sends a SITE_STATUS event through self.wrapper and returns the event
    object from the response.

    @param batchItem batch item whose siteId identifies the site
    @return the site object, or None if the request produced no result
    @raise DatabaseException / Exception re-raised after logging
    """
    ret = None
    try:
        # Add support db-task Site
        siteStatus = dc_event.SiteStatus(batchItem.siteId)
        drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.SITE_STATUS, siteStatus)
        responseDRCESyncTasksCover = self.wrapper.process(drceSyncTasksCoverObj)
        site = responseDRCESyncTasksCover.eventObject
        ret = site

        # self.logger.debug('>>> readSiteFromDB site: ' + varDump(site))
    except DatabaseException, err:
        ExceptionLog.handler(self.logger, err, MSG_ERROR_READ_SITE_FROM_DB)
        raise err
    except Exception, err:
        ExceptionLog.handler(self.logger, err, MSG_ERROR_READ_SITE_FROM_DB)
        raise err

    return ret
1407 
1408 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ reduceResponse()

def dc_processor.ProcessorTask.ProcessorTask.reduceResponse (   self,
  processingTamplatesDict,
  templateSelectType,
  batchItemDict 
)

Definition at line 1186 of file ProcessorTask.py.

def reduceResponse(self, processingTamplatesDict, templateSelectType, batchItemDict):
    """Select one (template, scraperResponse) pair from the processed candidates.

    Selection rules: an empty mandatory template wins immediately and sets
    ERROR_MANDATORY_TEMPLATE in the error mask; otherwise the pair with the
    largest contentsCount ("first_nonempty" mode) or contentsLen (other
    modes) wins.  The winner is written into batchItemDict["template"] and
    batchItemDict["scraperResponse"].

    @param processingTamplatesDict iterable of (template, scraperResponse) pairs
    @param templateSelectType      selection strategy, e.g. "first_nonempty"
    @param batchItemDict           per-item state dict, mutated with the result
    """
    maxVal = -1
    ret = None
    templeteElem = None
    for elem in processingTamplatesDict:
        templeteElem = elem[0]
        # A mandatory template that produced no content is a hard error:
        # flag it and select this pair immediately.
        if "mandatory" in templeteElem and templeteElem["mandatory"] == 1 and "isEmpty" in templeteElem and \
           templeteElem["isEmpty"]:
            batchItemDict["errorMask"] |= APP_CONSTS.ERROR_MANDATORY_TEMPLATE
            ret = elem
            break
        if "contentsCount" in templeteElem and "contentsLen" in templeteElem:
            if templateSelectType == "first_nonempty":
                # Prefer the template that filled the most tags.
                if templeteElem["contentsCount"] > maxVal:
                    maxVal = templeteElem["contentsCount"]
                    ret = elem
            elif templeteElem["contentsLen"] > maxVal:
                # Other modes: prefer the template with the longest content.
                maxVal = templeteElem["contentsLen"]
                ret = elem
        else:
            # No metrics available — fall back to taking the pair as-is.
            ret = elem
    if ret is not None:
        batchItemDict["template"] = ret[0]
        batchItemDict["scraperResponse"] = []
        batchItemDict["scraperResponse"].append(ret[1])
1211 
1212 
Here is the caller graph for this function:

◆ removeTemplateElementsByCondition()

def dc_processor.ProcessorTask.ProcessorTask.removeTemplateElementsByCondition (   self,
  template,
  batchItemDict 
)

Definition at line 964 of file ProcessorTask.py.

def removeTemplateElementsByCondition(self, template, batchItemDict):
    """Filter template["templates"] by each element's URL "condition".

    Elements without a "condition" are always kept.  Elements with a
    URL-type condition are kept only when the named field of the batch
    item's url object matches the condition's regex pattern.

    @param template      template dict, mutated in place and returned
    @param batchItemDict per-item state dict providing the "url" object
    @return the (possibly filtered) template dict
    """
    ret = template
    if "templates" in ret:
        keptElements = []
        for element in ret["templates"]:
            if "condition" not in element:
                keptElements.append(element)
                continue
            matched = False
            condition = element["condition"]
            if condition["type"] == CONSTS.TEMPLATE_CONDITION_TYPE_URL and "url" in batchItemDict:
                urlObj = batchItemDict["url"]
                if hasattr(urlObj, condition["field"]):
                    fieldValue = getattr(urlObj, condition["field"], None)
                    if fieldValue is not None:
                        try:
                            if re.compile(condition["pattern"]).match(str(fieldValue)) is not None:
                                matched = True
                        except Exception as excp:
                            self.logger.debug(">>> Some wrong with RE. in ret condition; err = " + str(excp))
            if matched:
                keptElements.append(element)
        ret["templates"] = keptElements
    return ret
987 
988 
Here is the caller graph for this function:

◆ resolveProcessorNameByContentType()

def dc_processor.ProcessorTask.ProcessorTask.resolveProcessorNameByContentType (   self,
  urlContentType,
  batchItemDict 
)

Definition at line 1767 of file ProcessorTask.py.

def resolveProcessorNameByContentType(self, urlContentType, batchItemDict):
    """Override batchItemDict["processorName"] based on the URL content type.

    The "PROCESSOR_NAME_REPLACE" property is expected to be a JSON object
    mapping processor names to lists of content types; the first processor
    whose list contains urlContentType is selected.  Malformed JSON is
    logged and ignored.

    @param urlContentType content type string of the fetched URL
    @param batchItemDict  per-item state dict, mutated with "processorName"
    """
    if "PROCESSOR_NAME_REPLACE" in batchItemDict:
        self.logger.debug(">>> PROCESSOR_NAME_REPLACE is; " + batchItemDict["PROCESSOR_NAME_REPLACE"])
        try:
            localJson = json.loads(batchItemDict["PROCESSOR_NAME_REPLACE"])
            if isinstance(localJson, dict):
                for elem in localJson:
                    # isinstance(..., list) replaces the deprecated,
                    # Python-2-only types.ListType; behavior is identical
                    # on Python 2 and also works on Python 3.
                    if isinstance(localJson[elem], list) and urlContentType in localJson[elem]:
                        batchItemDict["processorName"] = elem
                        self.logger.debug("Resolved processor name: " + str(elem))
                        break
        except Exception as excp:
            self.logger.debug(">>> PROCESSOR_NAME_REPLACE bad json;" + str(excp))
1780 
1781 
Here is the caller graph for this function:

◆ resortProcessedContentsByMetrics()

def dc_processor.ProcessorTask.ProcessorTask.resortProcessedContentsByMetrics (   self,
  batchItemDict,
  sortedMetric 
)

Definition at line 1785 of file ProcessorTask.py.

def resortProcessedContentsByMetrics(self, batchItemDict, sortedMetric):
    """Re-sort each scraper response's "internal" processed contents by the
    given metric and promote the best-ranked one to "default".

    @param batchItemDict per-item state dict holding "scraperResponse"
    @param sortedMetric  metric name passed to Metrics.sortElementsByMetric
    """
    if "scraperResponse" in batchItemDict:
        for scraperResponse in batchItemDict["scraperResponse"]:
            if scraperResponse is not None:
                sortedContents = Metrics.sortElementsByMetric(scraperResponse.processedContent["internal"], sortedMetric)
                scraperResponse.processedContent["internal"] = sortedContents
                # Guard against an empty result list before indexing [0]:
                # the original code raised IndexError when "internal" was
                # empty.  "default" is left untouched in that case.
                if len(sortedContents) > 0 and len(sortedContents[0]) > 0:
                    scraperResponse.processedContent["default"] = sortedContents[0]
1793 
1794 
Here is the caller graph for this function:

◆ run()

def dc_processor.ProcessorTask.ProcessorTask.run (   self)

Definition at line 184 of file ProcessorTask.py.

def run(self):
    """Application entry point: run the Cement app, load configuration,
    logging and options, process the input batch, then close the log."""
    # call base class run method
    foundation.CementApp.run(self)

    # config section
    self.loadConfig()

    # load scraper
    # self.loadScraper()

    # load logger config file
    self.loadLogConfigFile()

    # load mandatory options
    self.loadOptions()

    # make processing
    self.processBatch()

    # Finish logging
    self.logger.info(APP_CONSTS.LOGGER_DELIMITER_LINE)
205 
206 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ setDefaultInternalForChainContents()

def dc_processor.ProcessorTask.ProcessorTask.setDefaultInternalForChainContents (   self,
  chainDict 
)

Definition at line 1876 of file ProcessorTask.py.

def setDefaultInternalForChainContents(self, chainDict):
    """Collapse every chain element's "internal" processed contents down to
    just its "default" content (when a default exists and "internal" is
    non-empty)."""
    self.logger.debug(">>> 3.1 reformatted Chain internal")
    for chainElem in chainDict.values():
        processedContent = chainElem["batchItemDict"]["scraperResponse"][0].processedContent
        hasDefault = processedContent["default"] is not None
        hasInternal = len(processedContent["internal"]) > 0
        if hasDefault and hasInternal:
            processedContent["internal"] = [processedContent["default"]]
1883 
1884 
Here is the caller graph for this function:

◆ setup()

def dc_processor.ProcessorTask.ProcessorTask.setup (   self)

Definition at line 177 of file ProcessorTask.py.

def setup(self):
    """Delegate application setup to the Cement framework base class."""
    # call base class setup method
    foundation.CementApp.setup(self)
180 
181 
Here is the caller graph for this function:

◆ signalHandlerTimer()

def dc_processor.ProcessorTask.ProcessorTask.signalHandlerTimer (   self,
  signum,
  frame 
)

Definition at line 2365 of file ProcessorTask.py.

def signalHandlerTimer(self, signum, frame):
    """Timer signal handler: flag that the maximum execution time elapsed.

    @param signum number of the trapped signal (logged)
    @param frame  current stack frame (unused)
    """
    del frame  # unused; deleted to make that explicit
    self.logger.debug("Signal %s - timer trapped!", str(signum))
    self.maxExecutionTimeReached = True
2369 
Here is the caller graph for this function:

◆ stickHashedContents()

def dc_processor.ProcessorTask.ProcessorTask.stickHashedContents (   self,
  listHashedTags,
  scraperResponse 
)

Definition at line 861 of file ProcessorTask.py.

def stickHashedContents(self, listHashedTags, scraperResponse):
    """Concatenate (space-separated) the data of all tags whose names appear
    in listHashedTags; the resulting string feeds the content hash.

    @param listHashedTags  tag names to include in the hash input
    @param scraperResponse response whose default content's tag list is read
    @return concatenated tag data string (may be empty)
    """
    chunks = []
    defaultData = scraperResponse.processedContent["default"].data["data"]
    tagsList = defaultData["tagList"][0] if len(defaultData["tagList"]) > 0 else []
    for tag in tagsList:
        if tag["name"] not in listHashedTags:
            self.logger.debug(">>> Tag: %s not added to hash", tag["name"])
            continue
        self.logger.debug(">>> Tag name added to hash: %s", str(tag["name"]))
        # self.logger.debug(">>> Tag value added to hash: %s", str(tag["data"]))
        if isinstance(tag["data"], basestring):
            chunks.append(tag["data"])
            chunks.append(' ')
        elif isinstance(tag["data"], list):
            for dataElem in tag["data"]:
                chunks.append(str(dataElem))
                chunks.append(' ')
    return "".join(chunks)
880 
881 
Here is the caller graph for this function:

◆ templateMetricsCalculate()

def dc_processor.ProcessorTask.ProcessorTask.templateMetricsCalculate (   self,
  template,
  scraperResponse 
)

Definition at line 1511 of file ProcessorTask.py.

def templateMetricsCalculate(self, template, scraperResponse):
    """Accumulate simple content metrics into the template dict.

    Sets "contentsLen" (total length of all non-None data items),
    "contentsCount" (number of tag entries that contributed any length)
    and "isEmpty" (False once any entry contributed).

    @param template        template dict, mutated in place
    @param scraperResponse response whose default content's tag list is measured
    """
    template["contentsCount"] = 0
    template["contentsLen"] = 0
    template["isEmpty"] = True
    tagEntries = scraperResponse.processedContent["default"].data["data"]["tagList"][0]
    for tagEntry in tagEntries:
        lengthBefore = template["contentsLen"]
        for dataItem in tagEntry["data"]:
            if dataItem is not None:
                template["contentsLen"] += len(dataItem)
        # Count the entry only when it actually added some content.
        if template["contentsLen"] > lengthBefore:
            template["contentsCount"] += 1
            template["isEmpty"] = False
1523 
1524 
Here is the caller graph for this function:

◆ updateProcessedURL()

def dc_processor.ProcessorTask.ProcessorTask.updateProcessedURL (   self,
  batchItem,
  batchItemDict 
)

Definition at line 748 of file ProcessorTask.py.

def updateProcessedURL(self, batchItem, batchItemDict):
    """Persist the post-processing state of a URL back to the db-task service.

    Fills batchItem.urlObj with the processing results (status, error mask,
    tag statistics, publication date, timing) and sends a corresponding
    dc_event.URLUpdate through self.wrapper.  For feed-parser batches the
    same update (with this item's error mask) is replicated for every URL
    listed in the batch item's feed property.

    @param batchItem     the processed batch item; its urlObj is mutated
    @param batchItemDict per-item state ("errorMask", "processedTime",
                         "scraperResponse", optional "contentURLMd5", ...)
    @raise Exception     re-raised after logging if the DB update fails
    """
    # check url state
    state = dc_event.URL.STATE_ENABLED
    # elapsed processing time, converted to milliseconds
    processedTime = int((time.time() - batchItemDict["processedTime"]) * 1000)
    if "contentURLMd5" not in batchItemDict:
        batchItemDict["contentURLMd5"] = ""

    # Take tag statistics and pubdate from the first scraper response when
    # one exists; otherwise fall back to neutral values.
    if "scraperResponse" in batchItemDict and batchItemDict["scraperResponse"] is not None and \
       len(batchItemDict["scraperResponse"]) > 0 and batchItemDict["scraperResponse"][0] is not None:
        tagsMask = batchItemDict["scraperResponse"][0].tagsMask or 0
        tagsCount = batchItemDict["scraperResponse"][0].tagsCount or 0
        pubdate = batchItemDict["scraperResponse"][0].pubdate or None
        # Validate pubdate value from scraper response and return as None if value is incorrect
        self.logger.debug('pubdate from scraper response: ' + str(pubdate))
        if pubdate is not None and isinstance(pubdate, basestring):
            pubdate = DateTimeType.parse(pubdate)
            if pubdate is not None:
                pubdate = pubdate.strftime("%Y-%m-%d %H:%M:%S")
    else:
        tagsMask = 0
        tagsCount = 0
        pubdate = None

    self.logger.debug('>>> updateProcessedURL pubdate = ' + str(pubdate) + ' type: ' + str(type(pubdate)))

    try:
        urlUpdateList = []
        # Mirror the final state onto the in-memory url object first.
        batchItem.urlObj.status = dc_event.URL.STATUS_PROCESSED
        batchItem.urlObj.state = state
        batchItem.urlObj.errorMask = batchItemDict["errorMask"]
        batchItem.urlObj.tagsMask = tagsMask
        batchItem.urlObj.tagsCount = tagsCount
        batchItem.urlObj.totalTime = SQLExpression(("`TotalTime` + %s" % str(processedTime)))
        batchItem.urlObj.processingTime = processedTime
        batchItem.urlObj.pDate = pubdate
        batchItem.urlObj.UDate = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        batchItem.urlObj.tcDate = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        batchItem.urlObj.contentURLMd5 = batchItemDict["contentURLMd5"]
        batchItem.urlObj.processed += 1

        # Build the URLUpdate event; SQL expressions make the DB do the
        # accumulation (error mask, total time, processed counter) itself.
        urlUpdateObj = dc_event.URLUpdate(batchItem.siteId, batchItem.urlId, dc_event.URLStatus.URL_TYPE_MD5, \
                                          normalizeMask=self.normMask)
        urlUpdateObj.status = batchItem.urlObj.status
        urlUpdateObj.state = batchItem.urlObj.state
        urlUpdateObj.errorMask = SQLExpression("`ErrorMask` + %s" % str(batchItem.urlObj.errorMask))
        urlUpdateObj.tagsMask = batchItem.urlObj.tagsMask
        urlUpdateObj.tagsCount = batchItem.urlObj.tagsCount
        urlUpdateObj.totalTime = SQLExpression(("`TotalTime` + %s" % str(batchItem.urlObj.processingTime)))
        urlUpdateObj.processingTime = batchItem.urlObj.processingTime
        urlUpdateObj.pDate = batchItem.urlObj.pDate
        urlUpdateObj.UDate = SQLExpression("NOW()")
        urlUpdateObj.tcDate = SQLExpression("NOW()")
        urlUpdateObj.contentURLMd5 = batchItem.urlObj.contentURLMd5
        urlUpdateObj.processed = SQLExpression("`Processed`+1")
        urlUpdateObj.crawled = batchItem.urlObj.crawled

        urlUpdateList.append(urlUpdateObj)
        self.logger.debug('>>>> urlUpdateList: ' + varDump(urlUpdateList))

        # Feed parser: replicate the update for every URL the feed produced.
        if ("processorName" in batchItemDict) and (batchItemDict["processorName"] == CONSTS.PROCESSOR_FEED_PARSER) and \
           (dc_event.BatchItem.PROP_FEED in batchItem.properties):
            for url, value in batchItem.properties[dc_event.BatchItem.PROP_FEED].items():
                del url
                urlUpdateObjLocal = copy.deepcopy(urlUpdateObj)
                urlUpdateObjLocal.urlMd5 = value["urlMd5"]
                urlUpdateObjLocal.errorMask = batchItemDict["errorMask"]
                urlUpdateList.append(urlUpdateObjLocal)

        # Evaluate 'URL' class values use sql expression if necessary
        changedFieldsDict = FieldsSQLExpressionEvaluator.execute(batchItem.properties, self.wrapper, None,
                                                                 batchItem.urlObj, self.logger,
                                                                 APP_CONSTS.SQL_EXPRESSION_FIELDS_UPDATE_PROCESSOR)
        # Update URL values
        if isinstance(changedFieldsDict, dict):
            for name, value in changedFieldsDict.items():
                if hasattr(urlUpdateObj, name):
                    setattr(urlUpdateObj, name, value)

        result = self.wrapper.urlUpdate(urlUpdateList)
        del result
    except Exception as err:
        ExceptionLog.handler(self.logger, err, MSG_ERROR_UPDATE_PROCESSED_URL)
        raise err
832 
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
Here is the call graph for this function:
Here is the caller graph for this function:

◆ updateURL()

def dc_processor.ProcessorTask.ProcessorTask.updateURL (   self,
  batchItem,
  errorMask = None 
)

Definition at line 1343 of file ProcessorTask.py.

def updateURL(self, batchItem, errorMask=None):
    """Mark the batch item's URL as currently being processed in the DB.

    Sets STATUS_PROCESSING, the current batch id, refreshes tcDate and,
    when given, the error mask.  The Processed counter is deliberately not
    touched here (see the note below).

    @param batchItem batch item identifying the site/url to update
    @param errorMask optional error mask to store alongside the status
    @raise Exception re-raised after logging if the update fails
    """
    try:
        # updated by Oleksii
        # if insert into kv db wasn't successfully
        # not update Processed counter
        urlUpdateObj = dc_event.URLUpdate(batchItem.siteId, batchItem.urlId, dc_event.URLStatus.URL_TYPE_MD5, \
                                          normalizeMask=self.normMask)
        urlUpdateObj.tcDate = SQLExpression("NOW()")
        urlUpdateObj.status = dc_event.URL.STATUS_PROCESSING
        urlUpdateObj.batchId = self.input_batch.id
        if errorMask is not None:
            urlUpdateObj.errorMask = errorMask
        self.wrapper.urlUpdate(urlUpdateObj)
    except Exception as err:
        # NOTE(review): MSG_ERROR_LOAD_SITE_DATA looks like a copy-paste from
        # the site-loading path — confirm whether a URL-update-specific
        # message constant was intended here.
        ExceptionLog.handler(self.logger, err, MSG_ERROR_LOAD_SITE_DATA)
        raise err
1359 
1360 
def updateURL(input_url, site)
Here is the caller graph for this function:

◆ updateURLCharset()

def dc_processor.ProcessorTask.ProcessorTask.updateURLCharset (   self,
  batchItem,
  charset 
)

Definition at line 726 of file ProcessorTask.py.

def updateURLCharset(self, batchItem, charset):
    """Store the detected content charset for the batch item's URL.

    @param batchItem batch item identifying the site/url to update
    @param charset   charset name to persist on the URL record
    @raise Exception re-raised after logging if the update fails
    """
    try:
        urlUpdateObj = dc_event.URLUpdate(batchItem.siteId, batchItem.urlId, dc_event.URLStatus.URL_TYPE_MD5, \
                                          normalizeMask=self.normMask)
        # urlUpdateObj.UDate = SQLExpression("NOW()")
        urlUpdateObj.charset = charset
        self.wrapper.urlUpdate(urlUpdateObj)
    except Exception as err:
        ExceptionLog.handler(self.logger, err, MSG_ERROR_UPDATE_URL_CHARSET)
        raise err
736 
737 
Here is the caller graph for this function:

Member Data Documentation

◆ accumulatedBatchItems

dc_processor.ProcessorTask.ProcessorTask.accumulatedBatchItems

Definition at line 163 of file ProcessorTask.py.

◆ accumulateProcessing

dc_processor.ProcessorTask.ProcessorTask.accumulateProcessing

Definition at line 166 of file ProcessorTask.py.

◆ algorithmsClass

dc_processor.ProcessorTask.ProcessorTask.algorithmsClass

Definition at line 162 of file ProcessorTask.py.

◆ algorithmsModel

dc_processor.ProcessorTask.ProcessorTask.algorithmsModel

Definition at line 160 of file ProcessorTask.py.

◆ algorithmsModule

dc_processor.ProcessorTask.ProcessorTask.algorithmsModule

Definition at line 161 of file ProcessorTask.py.

◆ batchItem

dc_processor.ProcessorTask.ProcessorTask.batchItem

Definition at line 147 of file ProcessorTask.py.

◆ batchSites

dc_processor.ProcessorTask.ProcessorTask.batchSites

Definition at line 152 of file ProcessorTask.py.

◆ config

dc_processor.ProcessorTask.ProcessorTask.config

Definition at line 2134 of file ProcessorTask.py.

◆ db_task_ini

dc_processor.ProcessorTask.ProcessorTask.db_task_ini

Definition at line 155 of file ProcessorTask.py.

◆ DBConnector

dc_processor.ProcessorTask.ProcessorTask.DBConnector

Definition at line 144 of file ProcessorTask.py.

◆ exit_code

dc_processor.ProcessorTask.ProcessorTask.exit_code

Definition at line 140 of file ProcessorTask.py.

◆ filters

dc_processor.ProcessorTask.ProcessorTask.filters

Definition at line 150 of file ProcessorTask.py.

◆ groupResponses

dc_processor.ProcessorTask.ProcessorTask.groupResponses

Definition at line 164 of file ProcessorTask.py.

◆ hashed_content

dc_processor.ProcessorTask.ProcessorTask.hashed_content

Definition at line 159 of file ProcessorTask.py.

◆ htmlRecover

dc_processor.ProcessorTask.ProcessorTask.htmlRecover

Definition at line 157 of file ProcessorTask.py.

◆ input_batch

dc_processor.ProcessorTask.ProcessorTask.input_batch

Definition at line 154 of file ProcessorTask.py.

◆ localTemplate

dc_processor.ProcessorTask.ProcessorTask.localTemplate

Definition at line 167 of file ProcessorTask.py.

◆ logger

dc_processor.ProcessorTask.ProcessorTask.logger

Definition at line 141 of file ProcessorTask.py.

◆ maxExecutionTimeReached

dc_processor.ProcessorTask.ProcessorTask.maxExecutionTimeReached

Definition at line 170 of file ProcessorTask.py.

◆ maxExecutionTimeValue

dc_processor.ProcessorTask.ProcessorTask.maxExecutionTimeValue

Definition at line 171 of file ProcessorTask.py.

◆ normMask

dc_processor.ProcessorTask.ProcessorTask.normMask

Definition at line 168 of file ProcessorTask.py.

◆ objFilters

dc_processor.ProcessorTask.ProcessorTask.objFilters

Definition at line 158 of file ProcessorTask.py.

◆ process_time

dc_processor.ProcessorTask.ProcessorTask.process_time

Definition at line 146 of file ProcessorTask.py.

◆ processorName

dc_processor.ProcessorTask.ProcessorTask.processorName

Definition at line 151 of file ProcessorTask.py.

◆ raw_content

dc_processor.ProcessorTask.ProcessorTask.raw_content

Definition at line 143 of file ProcessorTask.py.

◆ raw_data_dir

dc_processor.ProcessorTask.ProcessorTask.raw_data_dir

Definition at line 149 of file ProcessorTask.py.

◆ removeUnprocessedItems

dc_processor.ProcessorTask.ProcessorTask.removeUnprocessedItems

Definition at line 172 of file ProcessorTask.py.

◆ scraper_response

dc_processor.ProcessorTask.ProcessorTask.scraper_response

Definition at line 153 of file ProcessorTask.py.

◆ site_table

dc_processor.ProcessorTask.ProcessorTask.site_table

Definition at line 148 of file ProcessorTask.py.

◆ sourceTemplateExtractor

dc_processor.ProcessorTask.ProcessorTask.sourceTemplateExtractor

Definition at line 165 of file ProcessorTask.py.

◆ template

dc_processor.ProcessorTask.ProcessorTask.template

Definition at line 142 of file ProcessorTask.py.

◆ url

dc_processor.ProcessorTask.ProcessorTask.url

Definition at line 145 of file ProcessorTask.py.

◆ wrapper

dc_processor.ProcessorTask.ProcessorTask.wrapper

Definition at line 156 of file ProcessorTask.py.


The documentation for this class was generated from the following file: