HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
CrawlerTask.py
Go to the documentation of this file.
1 # coding: utf-8
2 
3 """
4 HCE project, Python bindings, Distributed Tasks Manager application.
5 CrawlerTask application class definitions.
6 
7 @package: dc
8 @file CrawlerTask.py
9 @author scorp <developers.hce@gmail.com>, Alexander Vybornyh <alexander.hce.cluster@gmail.com>
10 @copyright: Copyright &copy; 2013-2017 IOIX Ukraine
11 @license: http://hierarchical-cluster-engine.com/license/
12 @since: 0.1
13 """
14 import os.path
15 import sys
16 import hashlib
17 import datetime
18 
19 try:
20  import cPickle as pickle
21 except ImportError:
22  import pickle
23 
24 import time
25 from time import strftime
26 
27 from collections import namedtuple
28 
29 import signal
30 import logging.config
31 import ConfigParser
32 import re
33 import types
34 import copy
35 import json
36 import base64
37 import urlparse
38 import urllib
39 import requests
40 import lxml.etree
41 import tidylib
42 from dateutil.parser import parse
43 from cement.core import foundation
44 from dc_crawler.CollectURLs import CollectURLs
45 from dc_crawler.CollectProperties import CollectProperties
46 from dc_crawler.DetectModified import DetectModified
47 from dc_crawler.DetectModified import NotModifiedException
48 from dc_crawler.Fetcher import BaseFetcher
49 from dc_crawler.FetcherType import FetcherType
50 from dc_crawler.Fetcher import Response
51 from dc_crawler.Fetcher import SeleniumFetcher
52 from dc_crawler.CrawledResource import CrawledResource
53 from dc_crawler.UrlSchema import UrlSchema
54 from dc_crawler.Exceptions import SyncronizeException
55 from dc_crawler.Exceptions import InternalCrawlerException
56 from dc_crawler.Exceptions import CrawlerFilterException
57 from dc_crawler.DBTasksWrapper import DBTasksWrapper
58 from dc_crawler.RobotsParser import RobotsParser
59 from dc_crawler.RefererHeaderResolver import RefererHeaderResolver
60 from dc_crawler.HTTPCookieResolver import HTTPCookieResolver
61 from dc_crawler.RequestsRedirectWrapper import RequestsRedirectWrapper
62 from dc_crawler.ResourceProcess import ResourceProcess
63 from dc_crawler.URLProcess import URLProcess
64 from dc_crawler.DBProxyWrapper import DBProxyWrapper
65 from dc_crawler.HTTPProxyResolver import HTTPProxyResolver
66 import dc_crawler.Constants as CONSTS
67 from app.Utils import varDump
68 from app.Utils import PathMaker
69 from app.Filters import Filters
70 from app.Utils import SQLExpression
71 from app.LFSDataStorage import LFSDataStorage
72 from app.HostRequestStorage import HostRequestStorage
73 from app.DateTimeType import DateTimeType
74 from app.Exceptions import SeleniumFetcherException
75 from app.Exceptions import UrlAvailableException
76 from app.Exceptions import DatabaseException
77 from app.Exceptions import ProxyException
78 from app.Utils import ExceptionLog
79 from app.Utils import UrlNormalizator
80 from app.Utils import strToProxy
81 from app.FieldsSQLExpressionEvaluator import FieldsSQLExpressionEvaluator
82 from app.ContentEvaluator import ContentEvaluator
83 import app.Profiler
84 import app.Consts as APP_CONSTS
85 import app.Utils as Utils # pylint: disable=F0401
86 from app.UrlNormalize import UrlNormalize
87 import dc.Constants as DC_CONSTS
88 import dc.EventObjects as dc_event
89 from dc.EventObjects import Site
90 from dc.EventObjects import Batch
91 import dc_processor.Constants as PCONSTS
92 
93 DB_SITES = "dc_sites"
94 DB_URLS = "dc_urls"
95 
96 MSG_ERROR_LOAD_CONFIG = "Error loading config file. Exiting. "
97 MSG_ERROR_LOAD_OPTIONS = "Error loading options. Exiting. "
98 MSG_ERROR_LOAD_LOG_CONFIG_FILE = "Can't load logging config file. Exiting. "
99 MSG_ERROR_LOAD_SITE_DATA = "Can't load site data: "
100 MSG_ERROR_UPDATE_SITE_DATA = "Can't update site data: "
101 MSG_ERROR_LOAD_URL_DATA = "Can't load url data: "
102 MSG_ERROR_PROCESS_BATCH_ITEM = "Can't process batch item "
103 MSG_ERROR_WRITE_CRAWLED_DATA = "Can't write crawled data "
104 MSG_ERROR_COLLECT_URLS = "Can't collect urls "
105 MSG_ERROR_ADD_URL_TO_BATCH_ITEM = "Can't add url to batch item "
106 MSG_ERROR_LOAD_SITE_PROPERTIES = "Can't load site properties "
107 MSG_ERROR_CRAWL_SITE = "Can't crawl site "
108 MSG_ERROR_CHECK_SITE = "Site didn't pass the site check "
109 MSG_ERROR_GET_DIR = "Can't get dir "
110 MSG_ERROR_READ_SITE_FROM_DB = "Can't read site data from db"
111 MSG_ERROR_EMPTY_RESPONSE_SIZE = "Empty response"
112 MSG_ERROR_NOT_EXIST_ANY_VALID_PROXY = "No valid proxy exists"
113 MSG_ERROR_EMPTY_CONFIG_FILE_NAME = "Config file name is empty."
114 MSG_ERROR_WRONG_CONFIG_FILE_NAME = "Config file name is wrong: %s"
115 MSG_ERROR_LOAD_APP_CONFIG = "Error loading application config file. %s"
116 MSG_ERROR_EXTRACT_BASE_URL = "Extract base url failed. Error: %s"
117 
118 
119 MSG_INFO_PROCESS_BATCH = "ProcessBatch "
120 MSG_INFO_STORE_COOKIES_FILE = "Store cookies file on disk."
121 
122 MSG_DEBUG_NON_PROCESSING = "ProcessorName is NONE. Exclude batch item from further processing."
123 
124 SITE_MD5_EMPTY = "d41d8cd98f00b204e9800998ecf8427e"
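# Note: this value equals hashlib.md5("").hexdigest(), the MD5 of an empty string
# (presumably used to flag empty site content).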
125 
126 DEFAULT_MAX_SIZE = 1000000
127 EMPTY_RESPONSE_SIZE = "0"
128 
129 APP_NAME = "crawler-task"
130 
131 HTTP_COOKIE = "HTTP_COOKIE"
132 DEFAULT_HTTP_COOKIE = ""
133 HTTP_HEADERS = "HTTP_HEADERS"
134 DEFAULT_HTTP_HEADER = ""
135 
136 DC_URLS_DB_NAME = "dc_urls"
137 DC_URLS_TABLE_PREFIX = "urls_"
138 DC_SITES_DB_NAME = "dc_sites"
139 DC_SITES_PROPERTIES_TABLE_NAME = "sites_properties"
140 DC_SITES_TABLE_NAME = "sites"
141 DC_URLS_TABLE_NAME = "urls"
142 COOKIES_FILE_POSTFIX = ".cookies.txt"
143 
144 NON_PROCESSING = "NONE"
145 
146 HTTP_REDIRECT = "<Response [301]>"
147 HTML_REDIRECT = ""
148 MAX_HTTP_REDIRECTS_UNLIMITED = 0
149 MAX_HTML_REDIRECTS_UNLIMITED = 0
150 META_XPATH = "//meta[contains(@content, 'url')]/@content"
151 
152 Results = namedtuple("Results", "exit_code, output, err")
153 
154 ROBOTS_PATTERN = re.compile(r'(https?://[^/]+).*', re.I)
155 
156 TEXT_CONTENT_TYPE_PATTERN = re.compile('text', re.I)
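# Illustrative examples of the patterns above:
#   ROBOTS_PATTERN.match('http://example.com/some/page').group(1) -> 'http://example.com'
#   TEXT_CONTENT_TYPE_PATTERN.match('text/html; charset=utf-8') -> truthy (text content type)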
157 
158 ENV_CRAWLER_STORE_PATH = "ENV_CRAWLER_STORE_PATH"
159 # SiteProperties = namedtuple("http_header", "http_cookie")
160 # CrawlResponse = namedtuple("html_content", "html_header", "html_request")
161 # named tuple for filters
162 # Filter = namedtuple("Filter", "pattern, pattern_name, type, state")
163 
164 DETECT_MIME_MAIN_CONTENT = "1"
165 RECOVER_IF_FAILED = "2"
166 
167 EXIT_SUCCESS = 0
168 EXIT_FAILURE = 1
169 
170 # #The CrawlerTask class is an interface for fetching content from the web
171 #
172 # This object is a run-once application
173 class CrawlerTask(foundation.CementApp):
174 
175  # Dict mapping error masks to http codes
  # NOTE: APP_CONSTS.ERROR_FETCHER_INTERNAL and APP_CONSTS.ERROR_FETCH_FORBIDDEN each appear twice
  # as keys below; in a dict literal only the last entry for a duplicate key is kept.
176  errorMaskHttpCodeDict = { \
177  APP_CONSTS.ERROR_FETCH_INVALID_URL : SeleniumFetcher.ERROR_NAME_NOT_RESOLVED, \
178  APP_CONSTS.ERROR_FETCHER_INTERNAL : SeleniumFetcher.ERROR_FATAL, \
179  APP_CONSTS.ERROR_FETCHER_INTERNAL : SeleniumFetcher.ERROR_GENERAL, \
180  APP_CONSTS.ERROR_FETCH_TOO_MANY_REDIRECTS : SeleniumFetcher.ERROR_TOO_MANY_REDIRECTS, \
181  APP_CONSTS.ERROR_FETCH_CONNECTION_ERROR : SeleniumFetcher.ERROR_PROXY_CONNECTION_FAILED, \
182  APP_CONSTS.ERROR_FETCH_CONNECTION_TIMEOUT : SeleniumFetcher.ERROR_CONNECTION_TIMED_OUT, \
183  APP_CONSTS.ERROR_FETCH_FORBIDDEN : SeleniumFetcher.ERROR_TUNNEL_CONNECTION_FAILED, \
184  APP_CONSTS.ERROR_EMPTY_RESPONSE : SeleniumFetcher.ERROR_EMPTY_RESPONSE, \
185  APP_CONSTS.ERROR_FETCH_FORBIDDEN : SeleniumFetcher.ERROR_SERVICE_UNAVAILABLE, \
186  APP_CONSTS.ERROR_NOT_EXIST_ANY_VALID_PROXY : SeleniumFetcher.ERROR_PROXY_CONNECTION_FAILED, \
187  APP_CONSTS.ERROR_FETCH_HTTP_ERROR : SeleniumFetcher.ERROR_PROXY_CONNECTION_FAILED
188  }
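 # Illustrative sketch mirroring the SeleniumFetcherException handler in crawl() below:
 #   if err.code in self.errorMaskHttpCodeDict:
 #     httpCode = self.errorMaskHttpCodeDict[err.code]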
189 
190  # Configuration settings options names
191  DB_HOST_CFG_NAME = "db_host"
192  DB_PORT_CFG_NAME = "db_port"
193  DB_USER_CFG_NAME = "db_user"
194  DB_PWD_CFG_NAME = "db_pwd"
195  DB_SITES_CFG_NAME = "db_dc_sites"
196  DB_URLS_CFG_NAME = "db_dc_urls"
197 
198  RAW_DATA_DIR = "raw_data_dir"
199  SITE_TEMPLATES = "dc_site_template"
200  KEY_VALUE_STORAGE_DIR = "key_value_storage_dir"
201  DB_DATA_DIR = "db_data_dir"
202  URL_SCHEMA_DIR = "url_schema_data_dir"
203  URLS_XPATH_LIST_FILE = "urls_xpath_list_file"
204 
205  # Constants used in class
206  HOST_ALIVE_CHECK_NAME = 'HOST_ALIVE_CHECK'
207  HOST_ALIVE_CHECK_PROXY_NAME = 'HOST_ALIVE_CHECK_PROXY'
208  DEFAULT_PROTOCOL_PREFIX = 'http://'
209 
210  SEARCH_BASE_URL_PATTERN = r'<base[^>]+href="([^">]+)"'
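 # Illustrative example: re.search(SEARCH_BASE_URL_PATTERN, '<base target="_self" href="http://example.com/">')
 # captures 'http://example.com/' as group(1).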
211 
212  # Mandatory
213  class Meta(object):
214  label = APP_NAME
215  def __init__(self):
216  # self.site_table_row = None
217  self.urlXpathList = {}
218 
219 
220  # #constructor
221  # initialize default fields
222  def __init__(self):
223  # call base class __init__ method
224  foundation.CementApp.__init__(self)
226  self.raw_data_dir = None
227  self.batchItem = None
228  self.siteTable = None
229  self.logger = None
230  self.loggerDefault = None
231  self.urlTable = None
232  self.batch = None
233  self.defaultHeaderFile = None
234  self.defaultCookieFile = None
235  self.kvDbDir = None
237  self.dbWrapper = None
238  self.store_items = []
239  self.url = None
240  self.max_fetch_time = CONSTS.FETCHER_TIME_LIMIT_MAX
242  self.exit_code = EXIT_SUCCESS
243  self.keep_old_resources = False
246  self.siteHTTPHeaders = None
247  self.tidyOptions = {'numeric-entities': 1, 'char-encoding': 'utf8'}
248  self.headerFileDir = None
249  self.robotsFileDir = None
250  self.fileHeaders = None
251  self.siteHeaders = None
254  self.feedItems = None
256  self.errorMask = APP_CONSTS.ERROR_OK
257  self.httpApplyHeaders = None
258  self.chainDict = {}
259  self.chainIndex = 0
262  self.dir = None
264  self.crawledResource = None
265  self.realUrl = None
266  self.res = None
267  self.crawledTime = None
268  self.storeHttpRequest = True
269  self.store_http_headers = True
270  self.headersDict = {}
271  self.postForms = {}
272  self.headers = None
273  self.cookie = ''
274  self.proxies = None
275  self.authName = None
276  self.authPwd = None
277  self.external_url = None
279  self.htmlRecover = None
280  self.autoDetectMime = None
282  self.siteProperties = {}
283  self.needStoreMime = None
285  self.processorName = None
286  self.storeCookies = True
287  self.dom = None
288  self.max_http_redirects = CONSTS.MAX_HTTP_REDIRECTS_LIMIT
289  self.max_html_redirects = CONSTS.MAX_HTML_REDIRECTS_LIMIT
290  self.urlXpathList = {}
291  self.urlTempalteRegular = None
295  self.detectModified = None
296  self.site = None
297  self.urlSchemaDataDir = None
298  self.proxyResolver = None
299  self.normMask = UrlNormalizator.NORM_DEFAULT
300  self.urlsXpathList = None
301  self.cookieResolver = None
302  # status update variables from 'URL_SCHEMA'
306  self.feedUrl = {}
307  self.resetVars()
308  # to support max execution time
312 
313 
314  # #setup
315  # setup application
316  def setup(self):
317  # call base class setup method
318  foundation.CementApp.setup(self)
319 
320 
321  # #run
322  # run application
323  def run(self):
324  # call base class run method
325  foundation.CementApp.run(self)
326 
327  # config section
328  self.loadConfig()
329 
330  # load logger config file
331  self.loadLogConfigFile()
332 
333  # load mandatory options
334  self.loadOptions()
335 
336  # load key-value db
337  self.loadKeyValueDB()
338 
339  # make processing batch data
340  self.processBatch()
341 
342  # Finish logging
343  self.logger.info(APP_CONSTS.LOGGER_DELIMITER_LINE)
344 
345 
346  # #fetchFileHeader reads from file storage and returns the fileHeader element
347  #
348  # @return the loaded header element, or None if it could not be loaded
349  # FIXME: move it to a separate module (like URLCollector)
350  def fetchFileHeader(self, url, siteId):
351  ret = None
352  if self.headerFileDir is not None and self.headerFileDir:
353  host = Utils.UrlParser.getDomain(url)
354  if host is not None and self.siteHeaders is not None:
355  ret = self.hTTPHeadersStorage.loadElement(self.headerFileDir, host, siteId, self.siteHeaders)
356  return ret
357 
358 
359  # #saveFileHeader saves the fileHeader element to file storage for the given url and siteId
360  #
361  # FIXME: move it to a separate module (like URLCollector)
363  def saveFileHeader(self, url, siteId, localFileHeaders):
364  if self.headerFileDir is not None:
365  auth = urlparse.urlsplit(url.strip())[1]
366  host = re.search('([^@]*@)?([^:]*):?(.*)', auth).groups()[1]
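 # Illustrative example: for auth == 'user:pass@www.example.com:8080' the expression above yields
 # host == 'www.example.com' (the first group is the optional 'user:pass@' part, the third is the port).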
367  if host is not None:
368  self.hTTPHeadersStorage.saveElement(self.headerFileDir, host, siteId, localFileHeaders)
369 
370 
371  # #collectURLs collects URLs from the response body and fills new batch items for batch iterations
372  #
373  # @return whether URLs were collected successfully
374  def collectURLs(self):
375  collectURLsResult = False
376  if True or self.dom is not None:
377  collectURLs = CollectURLs(isAbortedByTTL=self.isAbortedByTTL)
378  collectURLs.crawledResource = self.crawledResource
379  collectURLs.url = self.url
380  collectURLs.dom = self.dom
381  collectURLs.realUrl = self.realUrl
382  collectURLs.baseUrl = self.batchItem.baseUrl
383  collectURLs.processorName = self.processorName
384  collectURLs.batchItem = self.batchItem
385  collectURLs.urlXpathList = self.urlXpathList
386  collectURLs.siteProperties = self.siteProperties
387  collectURLs.site = self.site
388  collectURLs.autoRemoveProps = self.auto_remove_props
389  collectURLs.autoDetectMime = self.autoDetectMime
390  collectURLs.processContentTypes = self.processContentTypes
391  collectURLs.postForms = self.postForms
392  collectURLs.urlProcess = self.urlProcess
393  collectURLs.urlsXpathList = self.urlsXpathList
394 
395  self.logger.debug("!!! self.batchItem.baseUrl = %s" , str(self.batchItem.baseUrl))
396 
397  if self.batch.crawlerType != dc_event.Batch.TYPE_REAL_TIME_CRAWLER:
398  collectURLs.dbWrapper = self.dbWrapper
399 
400  if "ROBOTS_COLLECT" not in self.siteProperties or int(self.siteProperties["ROBOTS_COLLECT"]) > 0:
401  collectURLs.robotsParser = self.robotsParser
402 
403  # get proxy name
404  proxyName = CrawlerTask.getProxyName(siteProperties=self.siteProperties,
405  siteId=self.site.id,
406  url=self.url.url,
407  dbWrapper=self.dbWrapper,
408  logger=self.logger)
409 
411  self.url.url,
412  HTTPCookieResolver.STAGE_REGULAR)
413 
414  collectURLsResult, internalLinks, externalLinks, urlObjects, self.feedItems, chainUrls = \
415  collectURLs.process(self.crawledResource.http_code,
416  not self.batch.dbMode & dc_event.Batch.DB_MODE_W,
417  self.httpApplyHeaders,
418  proxyName)
419 
420  self.logger.debug("!!! internalLinks (%s): %s", str(len(internalLinks)), str(internalLinks))
421  self.logger.debug("!!! externalLinks (%s): %s", str(len(externalLinks)), str(externalLinks))
422 
423  if self.dom is not None and collectURLsResult and self.collect_additional_prop and (len(internalLinks) > 0 or\
424  len(externalLinks)):
425  collectPropertiesObj = CollectProperties()
426  collectPropertiesObj.siteId = Utils.autoFillSiteId(self.batchItem.siteId, self.logger)
427  collectPropertiesObj.kvDbDir = self.kvDbDir
428  collectPropertiesObj.res = self.res
429  collectPropertiesObj.batchItem = self.batchItem
430  collectPropertiesObj.realUrl = self.realUrl
431  collectPropertiesObj.process(self.dom, internalLinks, externalLinks)
432 
433  # Fill new batch items in self.collectURLsItems
434  if urlObjects is not None and len(urlObjects) > 0 and \
435  (self.batchItem.depth > 0 or self.processorName == PCONSTS.PROCESSOR_RSS):
436  self.fillItemsFromIterations(urlObjects)
437  if chainUrls is not None and len(chainUrls) > 0:
438  self.fillItemsFromIterationsWithChain(chainUrls, self.batchItem)
439 
440 
441  # #writeData write the response body and headers to file
442  #
443  # @param
444  def writeData(self):
445  if self.dbWrapper is None:
446  self.logger.info("self.dbWrapper is None")
447  return
448 
449  # check whether the content needs to be stored on disk
450  if self.needStoreMime is not None and self.needStoreMime != '*':
451  if self.crawledResource.content_type.lower() not in self.needStoreMime:
452  self.logger.info("Content not set on disk because a conent-type `%s` is not in MIME types list:\n%s",
453  str(self.crawledResource.content_type), str(self.needStoreMime))
454  return
455 
456  urlPut_list = []
457 
458  if TEXT_CONTENT_TYPE_PATTERN.match(self.crawledResource.content_type):
459  # save UTF-8 encoding text for text content types
460  if self.crawledResource.dynamic_fetcher_type:
461  raw_unicode_content = self.crawledResource.meta_content
462  else:
463  raw_unicode_content = self.crawledResource.html_content
464  if raw_unicode_content:
465  if self.crawledResource.charset is not None and self.crawledResource.charset.lower() != "utf-8":
466  self.logger.debug('Decoding content charset: ' + str(self.crawledResource.charset))
467  try:
468  raw_unicode_content = raw_unicode_content.decode(self.crawledResource.charset)
469  except Exception, err:
470  self.logger.debug("Decoding content charset error, type: '" + str(type(raw_unicode_content)) + \
471  "', length: " + str(len(raw_unicode_content)) + " to charset: '" + \
472  str(self.crawledResource.charset) + "', error message: " + str(err))
473  putDict = {'data': base64.b64encode(raw_unicode_content)}
474  contentType = dc_event.Content.CONTENT_RAW_CONTENT
475  urlPut_list.append(dc_event.URLPut(self.batchItem.siteId, self.batchItem.urlId, contentType, putDict))
476 
477  # save tidy recovered file
478  # localDom = None
479  # if self.htmlRecover is not None and self.htmlRecover == '2':
480  # localDom = self.resourceProcess.domParser(None, raw_unicode_content, 200,
481  # self.crawledResource.charset if self.crawledResource \
482  # is not None else None)
483  # if self.htmlRecover is not None and (self.htmlRecover == '1' or self.htmlRecover == '2' and localDom is None):
484  if self.htmlRecover is not None and (self.htmlRecover == '1' or self.htmlRecover == '2'):
485  tidy_content = tidylib.tidy_document(raw_unicode_content, self.tidyOptions)[0]
486  if self.crawledResource.charset is not None and self.crawledResource.charset.lower() != "utf-8":
487  self.logger.debug('Decoding tidy content charset: ' + str(self.crawledResource.charset))
488  try:
489  tidy_content = tidy_content.decode(self.crawledResource.charset)
490  except Exception, err:
491  self.logger.debug("Decoding tidy content charset error, type: '" + str(type(tidy_content)) + \
492  "', length: " + str(len(tidy_content)) + " to charset: '" + \
493  str(self.crawledResource.charset) + "', error message: " + str(err))
494  putDict = {'data': base64.b64encode(tidy_content)}
495  contentType = dc_event.Content.CONTENT_TIDY_CONTENT
496  urlPut_list.append(dc_event.URLPut(self.batchItem.siteId, self.batchItem.urlId, contentType, putDict))
497  else:
498  # save origin binary data for non-text content types
499  if self.crawledResource.binary_content is not None:
500  putDict = {'data': base64.b64encode(self.crawledResource.binary_content)}
501  contentType = dc_event.Content.CONTENT_RAW_CONTENT
502  urlPut_list.append(dc_event.URLPut(self.batchItem.siteId, self.batchItem.urlId, contentType, putDict))
503 
504  # save rendered file
505  if self.crawledResource.dynamic_fetcher_type and self.crawledResource.html_content:
506  putDict = {"data": base64.b64encode(self.crawledResource.html_content)}
507  contentType = dc_event.Content.CONTENT_DYNAMIC_CONTENT
508  urlPut_list.append(dc_event.URLPut(self.batchItem.siteId, self.batchItem.urlId, contentType, putDict))
509 
510  self.logger.debug('!!! self.crawledResource.response_header = ' + str(self.crawledResource.response_header))
511 # self.logger.debug('!!! self.httpApplyHeaders = ' + str(self.httpApplyHeaders))
512 # self.logger.debug('!!! self.crawledResource.html_request = ' + str(self.crawledResource.html_request))
513 # self.logger.debug('!!! Before change self.httpApplyHeaders = ' + str(self.httpApplyHeaders))
514  # ##self.httpApplyHeaders = self.crawledResource.response_header # #???
515 # self.logger.debug('!!! After change self.httpApplyHeaders = ' + str(self.httpApplyHeaders))
516  # html header
517  if self.store_http_headers and self.crawledResource.response_header:
518  putDict = {"data": base64.b64encode(self.crawledResource.response_header)}
519  contentType = dc_event.Content.CONTENT_HEADERS_CONTENT
520  urlPut_list.append(dc_event.URLPut(self.batchItem.siteId, self.batchItem.urlId, contentType, putDict))
521 
522  # html request
523  if self.storeHttpRequest and self.crawledResource.html_request:
524  putDict = {"data": base64.b64encode(self.crawledResource.html_request)}
525  contentType = dc_event.Content.CONTENT_REQUESTS_CONTENT
526  urlPut_list.append(dc_event.URLPut(self.batchItem.siteId, self.batchItem.urlId, contentType, putDict))
527 
528  # Write raw content on disk via db-task
529  # Check if Real-Time crawling
530  self.dbWrapper.putURLContent(urlPut_list)
531 
532  # change url's contentMask
533  self.url.contentMask = dc_event.URL.CONTENT_STORED_ON_DISK
534  self.batchItem.urlObj.contentMask = dc_event.URL.CONTENT_STORED_ON_DISK
535 
536 
537  # #getDir prepare dir
538  #
539  # @param
540  def getDir(self):
541  if len(self.batchItem.siteId):
542  self.dir = os.path.join(self.raw_data_dir, self.batchItem.siteId, PathMaker(self.batchItem.urlId).getDir())
543  else:
544  self.dir = os.path.join(self.raw_data_dir, "0", PathMaker(self.batchItem.urlId).getDir())
545 
546 
547  # #makeDir prepare dir
548  #
549  # @param
550  def makeDir(self):
551  self.logger.debug('!!! makeDir() enter .... self.dir = ' + str(self.dir))
552  if not os.path.exists(self.dir):
553  os.makedirs(self.dir)
554 
555  if not os.path.isdir(self.dir):
556  self.errorMask = self.errorMask | APP_CONSTS.ERROR_WRITE_FILE_ERROR
557  self.updateURLForFailed(APP_CONSTS.ERROR_WRITE_FILE_ERROR)
558  raise Exception("path %s exists, but is not a directory" % (self.dir,))
559 
560 
561  # #updateURLForFailed
562  #
563  # @param error_bit BitMask of error
564  def updateURLForFailed(self, errorBit, httpCode=CONSTS.HTTP_CODE_400, status=dc_event.URL.STATUS_CRAWLED, \
565  updateUdate=True):
566  self.urlProcess.siteId = self.batchItem.siteId
567  self.urlProcess.updateURLForFailed(errorBit, self.batchItem, httpCode, status, updateUdate)
568  if self.crawledResource is not None:
569  self.crawledResource.http_code = httpCode
570  self.writeData()
571 
572 
573  # #httpRequestWrapper method makes the http request, using the detectModified object if it is present
574  #
575  # @param url - http requests url
576  # @param headers - http headers, using in http request
577  # @param auth - authentication credentials
578  # @param postData - data using in post request
579  # @param urlObj - url's Obj
580  # @param incomingContent - raw content that is externally provided
581  # @param macroCode - list of the strings with the source code to be executed at the dynamic fetcher,
582  # for example JavaScript
583  # @param proxyName - proxy name
584  # @return http resource (http response)
585  def httpRequestWrapper(self, url, headers, auth, postData, urlObj, incomingContent, macroCode=None, proxyName=None):
586  ret = None
587  # (1) Fetcher type detection section
588  localFetchType = copy.deepcopy(self.site.fetchType)
589  if localFetchType == BaseFetcher.TYP_DYNAMIC:
590  if urlObj is not None and urlObj.parentMd5 is not None and \
591  "DYNAMIC_FETCH_ONLY_FOR_ROOT_URL" in self.siteProperties and \
592  int(self.siteProperties["DYNAMIC_FETCH_ONLY_FOR_ROOT_URL"]) > 0:
593  if urlObj.parentMd5 == "":
594  localFetchType = BaseFetcher.TYP_DYNAMIC
595  else:
596  localFetchType = BaseFetcher.TYP_NORMAL
597  elif localFetchType == BaseFetcher.TYP_AUTO:
598  localFetchType = BaseFetcher.TYP_NORMAL
599  # if self.curBatchIterations == 2 and self.processorName == PCONSTS.PROCESSOR_RSS and \
600  # localFetchType == BaseFetcher.TYP_DYNAMIC:
601  if urlObj is not None and urlObj.parentMd5 is not None and urlObj.parentMd5 == "" and \
602  self.processorName == PCONSTS.PROCESSOR_RSS and localFetchType == BaseFetcher.TYP_DYNAMIC:
603  localFetchType = BaseFetcher.TYP_NORMAL
604 
605  if urlObj is not None and "FETCHER_TYPE" in self.siteProperties and self.siteProperties["FETCHER_TYPE"]:
606  fetchResType = FetcherType.getFromProperty(self.siteProperties["FETCHER_TYPE"], urlObj.url, self.logger)
607  if fetchResType is not None:
608  localFetchType = fetchResType
609 
610  # (1) END
611  self.logger.debug(">>> FetchType before applying = " + str(localFetchType))
612  if self.detectModified is not None:
613  self.logger.debug(">>> self.detectModified.modifiedSettings = " + str(self.detectModified.modifiedSettings))
614  self.logger.debug(">>> self.urlProcess.urlObj.lastModified = " + str(self.urlProcess.urlObj.lastModified))
615 
616  if self.detectModified is None or self.detectModified.modifiedSettings is None or \
617  (urlObj is not None and urlObj.crawled == 0):
618  if incomingContent is None:
619 # self.logger.debug(">>> Filters() (1) self.site.filters: " + varDump(self.site.filters))
620 # localFilters = Filters(None, self.dbWrapper, self.batchItem.siteId, 0, \
621 # None, Filters.OC_RE)
622  # ret = BaseFetcher.get_fetcher(localFetchType).\
623  # open(url, external_url=self.external_url, timeout=int(self.url.httpTimeout) / 1000.0, headers=headers,
624  # allow_redirects=self.allow_http_redirects, proxies=self.proxies, auth=auth, data=postData,
625  # logger=self.logger, allowed_content_types=self.processContentTypes,
626  # max_resource_size=self.site.maxResourceSize, max_redirects=self.max_http_redirects,
627  # filters=localFilters, depth=urlObj.depth, macro=macroCode)
628 
629  fetcher = BaseFetcher.get_fetcher(localFetchType, self.dbWrapper, self.site.id)
630  if "CONNECTION_TIMEOUT" in self.siteProperties:
631  fetcher.connectionTimeout = float(self.siteProperties["CONNECTION_TIMEOUT"])
632  else:
633  fetcher.connectionTimeout = CONSTS.CONNECTION_TIMEOUT
634 
635  if self.external_url is not None and \
636  (isinstance(self.external_url, str) or isinstance(self.external_url, unicode)):
637  self.logger.debug('self.external_url: ' + str(self.external_url) + ' url: ' + str(url))
638  if '%URL%' in self.external_url:
639  url = self.external_url.replace('%URL%', urllib.quote(url))
640  self.logger.debug('New url: ' + str(url))
641  tm = int(self.url.httpTimeout) / 1000.0
642  if isinstance(self.url.httpTimeout, float):
643  tm += float('0' + str(self.url.httpTimeout).strip()[str(self.url.httpTimeout).strip().find('.'):])
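 # Illustrative example: httpTimeout is in milliseconds, so 3000 gives tm == 3.0 seconds;
 # for a float value such as 3000.5 the fractional part is appended as seconds, giving tm == 3.5.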
644 
645  cookieStage = HTTPCookieResolver.STAGE_REDIRECT
646  if self.processorName == PCONSTS.PROCESSOR_RSS:
647  cookieStage = cookieStage | HTTPCookieResolver.STAGE_RSS
648  headers = self.updateHeadersByCookies(headers, url, cookieStage)
649 
650  self.logger.debug("!!! Before fetcher.open() for url: %s", str(url))
651  self.logger.debug("!!! Before fetcher.open() self.site.maxResourceSize = %s", str(self.site.maxResourceSize))
652 
653  ret = fetcher.open(url, timeout=tm, headers=headers,
654  allow_redirects=self.allow_http_redirects, proxies=strToProxy(proxyName), auth=auth,
655  data=postData, log=self.logger, allowed_content_types=self.processContentTypes,
656  max_resource_size=self.site.maxResourceSize, max_redirects=self.max_http_redirects,
657  filters=None if urlObj.parentMd5 == "" else self.site.filters,
658  depth=urlObj.depth, macro=macroCode)
659  else:
660  # ret = BaseFetcher.get_fetcher(BaseFetcher.TYP_CONTENT).open(url, inputContent=incomingContent)
661  fetcher = BaseFetcher.get_fetcher(BaseFetcher.TYP_CONTENT, self.dbWrapper, self.site.id)
662  if "CONNECTION_TIMEOUT" in self.siteProperties:
663  fetcher.connectionTimeout = float(self.siteProperties["CONNECTION_TIMEOUT"])
664  else:
665  fetcher.connectionTimeout = CONSTS.CONNECTION_TIMEOUT
666  ret = fetcher.open(url, inputContent=incomingContent, log=self.logger)
667  else:
668  self.detectModified.expiredData = urlObj.CDate
669  self.detectModified.eTags = urlObj.eTag
670  self.detectModified.prevContentLen = urlObj.size
671  self.detectModified.prevContentCrc32 = urlObj.rawContentMd5
672  self.detectModified.prevContentDate = urlObj.CDate
673  httpParams = {}
674  httpParams["url"] = url
675  httpParams["externalUrl"] = self.external_url
676  tm = int(self.url.httpTimeout) / 1000.0
677  if isinstance(self.url.httpTimeout, float):
678  tm += float('0' + str(self.url.httpTimeout).strip()[str(self.url.httpTimeout).strip().find('.'):])
679  httpParams["httpTimeout"] = tm
680  httpParams["httpHeader"] = headers
681  httpParams["allowHttpRedirects"] = self.allow_http_redirects
682  httpParams["proxies"] = self.proxies
683  httpParams["auth"] = auth
684  httpParams["postData"] = postData
685  httpParams["processContentTypes"] = self.processContentTypes
686  httpParams["maxResourceSize"] = self.site.maxResourceSize
687  httpParams["maxHttpRedirects"] = self.max_http_redirects
688  ret = self.detectModified.makeHTTPRequest(localFetchType, httpParams)
689 
690  if self.detectModified.isNotModified():
691  self.logger.debug("!!! self.detectModified.isNotModified() ret.status_code: %s !!!", str(ret.status_code))
692  if ret is not None:
693  self.res = ret
694  raise NotModifiedException("Detect resource not modified state", ret.status_code)
695 
696  return ret
697 
698 
699  # #processRotatedHeaders gets a dict() of rotated headers with the lowest usage frequency and updates the frequencies
700  #
701  # Fills self.httpApplyHeaders with the effective set of request headers
702  def processRotatedHeaders(self, url):
703  self.fileHeaders = self.fetchFileHeader(url, self.batchItem.siteId)
704  rotatedHeaders = self.hTTPHeadersStorage.fetchLowFreqHeaders(self.fileHeaders, self.siteHeaders)
705  self.httpApplyHeaders = {}
706  # Initialize with single-value string-type (non-rotated) headers
707  for h in self.headersDict:
708  if isinstance(self.headersDict[h], basestring):
709  self.httpApplyHeaders[h] = self.headersDict[h]
710  # Add rotated headers, possibly overwriting some of those initialized before
711  for h in rotatedHeaders:
712  self.fileHeaders[h[0]][h[1]] += 1
713  self.httpApplyHeaders[h[0]] = h[1]
714  self.saveFileHeader(url, self.batchItem.siteId, self.fileHeaders)
715 # self.logger.debug("self.fileHeaders:\n%s\nEffective headers:\n%s", str(self.fileHeaders),
716 # str(self.httpApplyHeaders))
717  # Overwrite the referer header with a custom per-site property rule definition
718  self.refererHeaderResolver.\
719  resolveRefererHeader(self.httpApplyHeaders, int(self.siteProperties["REFERER_SELF_URL"])
720  if "REFERER_SELF_URL" in self.siteProperties else RefererHeaderResolver.MODE_SIMPLE, url,
721  self.batchItem.siteId, self.url.parentMd5, self.dbWrapper)
722 
723 
724  # #crawl site
725  #
726  # @return should continue to write data and collect URLs
727  def crawl(self, incomingContent):
728  self.crawledResource = None
729  # delay
730  self.logger.debug("Make request delay " + str(self.url.requestDelay / 1000.0) + " sec.")
731  time.sleep(self.url.requestDelay / 1000.0)
732  # (1) Decode url - [UrlProcess]
733  # TODO move to - [UrlProcess]
734  self.logger.debug("!!! self.url.url = '%s'", str(self.url.url))
735 
736  self.urlProcess.url = self.url.url
737  self.urlProcess.site = self.site
738  # (1) END
739  url = self.urlProcess.getRealUrl()
740  self.realUrl = url
741  startTime = time.time()
742 
743  self.processRotatedHeaders(url)
744 
745  self.logger.debug("!!! url = '%s'", str(url))
746 
747  macro = None
748  if 'FETCHER_MACRO' in self.siteProperties and self.siteProperties['FETCHER_MACRO'] is not None\
749  and self.siteProperties['FETCHER_MACRO'] != '':
750  try:
751  macro = json.loads(self.siteProperties['FETCHER_MACRO'])
752  except Exception, err:
753  self.logger.error("Initialization of macro error: %s, source: %s", str(err),
754  str(self.siteProperties['FETCHER_MACRO']))
755  self.errorMask = self.errorMask | APP_CONSTS.ERROR_MACRO_DESERIALIZATION
756  self.updateSiteParams(APP_CONSTS.ERROR_MACRO_DESERIALIZATION)
757  self.updateURLForFailed(self.errorMask)
758  return False
759 
760  try:
761  # (2) Makes auth tuple - []
762  # TODO move to - []
763  if self.authName and self.authPwd:
764  auth = (self.authName, self.authPwd)
765  self.logger.info("using http basic auth %s:%s", self.authName, self.authPwd)
766  else:
767  auth = None
768  # (2) END
769  # (3) Resolve HTTP Method (move convertToHttpDateFmt to Utils) - [UrlProcess]
770  # TODO move to - [UrlProcess]
771  self.urlProcess.urlObj = self.url
772  postData = self.urlProcess.resolveHTTP(self.postForms, self.httpApplyHeaders)
773  # (3) END
774 
775  # Workaround
776  url = self.urlProcess.urlTemplateApply(url, self.batch.crawlerType, self.urlTempalteRegular,
779 
780  self.logger.debug("!!! urlTemplateApply() return url = '%s'", str(url))
781 
782  # Check whether the url is available
783  if not CrawlerTask.isAvailableUrl(siteProperties=self.siteProperties, url=url, logger=self.logger):
784  self.logger.debug("Host '%s' is not available!", str(url))
785  raise UrlAvailableException("Host '%s' is not available!" % str(url))
786 
787  # Check robots mode use proxy if allowed
788  if "ROBOTS_MODE" not in self.siteProperties or int(self.siteProperties["ROBOTS_MODE"]) > 0:
789  self.logger.debug("Robots.txt obey mode ON")
790 
791  # get proxy name
792  proxyName = CrawlerTask.getProxyName(siteProperties=self.siteProperties,
793  siteId=self.site.id,
794  url=self.url.url,
795  dbWrapper=self.dbWrapper,
796  logger=self.logger)
797 
798  if self.robotsParser.loadRobots(url, self.batchItem.siteId, self.httpApplyHeaders, proxyName):
800  url,
801  HTTPCookieResolver.STAGE_ROBOTS)
802 
803  isAllowed, retUserAgent = self.robotsParser.checkUrlByRobots(url, self.batchItem.siteId,
804  self.httpApplyHeaders)
805  if not isAllowed:
806  self.logger.debug(">>> URL " + url + " is NOT Allowed by user-agent:" + str(retUserAgent))
807  self.errorMask = self.errorMask | APP_CONSTS.ERROR_ROBOTS_NOT_ALLOW
808  self.updateSiteParams(APP_CONSTS.ERROR_ROBOTS_NOT_ALLOW)
809  self.updateURLForFailed(self.errorMask)
810  return False
811 
812  # (3.2) HTTP Fetcher with html redirect resolving
813  localUrl = url
814  prevUrl = localUrl
815  res = self.makeDefaultResponse(Response())
816 
817  self.logger.debug("!!! localUrl = '%s'", str(localUrl))
818 
819  retriesCount = HTTPProxyResolver.getTriesCount(self.siteProperties)
820  proxyTriesCount = 0
821  proxyName = None
822  for count in range(0, retriesCount + 1):
823  self.logger.debug("retriesCount = %s, count = %s", str(retriesCount), str(count))
824  # check is exceeded tries count
825  HTTPProxyResolver.checkTriesCount(siteProperties=self.siteProperties, currentTriesCount=proxyTriesCount)
826 
827  # get proxy name
828  proxyName = CrawlerTask.getProxyName(siteProperties=self.siteProperties,
829  siteId=self.site.id,
830  url=self.url.url,
831  dbWrapper=self.dbWrapper,
832  logger=self.logger)
833 
834  # increment counter
835  if proxyName is not None:
836  proxyTriesCount += 1
837 
838  try:
839  self.logger.debug("Use headers: %s type: %s", str(self.httpApplyHeaders), str(type(self.httpApplyHeaders)))
840  self.logger.info("start to fetch: %s", localUrl)
841  res = self.httpRequestWrapper(localUrl, self.httpApplyHeaders, auth, postData, self.url, incomingContent, \
842  macro, proxyName)
843  except SeleniumFetcherException, err:
844  self.logger.debug("!!! httpRequestWrapper return error: %s", str(err))
845  CrawlerTask.addProxyFaults(siteProperties=self.siteProperties,
846  siteId=self.site.id,
847  proxyName=proxyName,
848  dbWrapper=self.dbWrapper)
849  continue
850 
851  # Check raw content use patterns
852  if CrawlerTask.isNeedRotateProxy(siteProperties=self.siteProperties,
853  siteId=self.site.id,
854  proxyName=proxyName,
855  dbWrapper=self.dbWrapper,
856  rawContent=res.rendered_unicode_content):
857  self.logger.debug('Necessary rotate proxy. Go to the next...')
858  continue
859 
860  if res is not None and res.error_mask != APP_CONSTS.ERROR_OK:
861  self.logger.debug("res.error_mask = %s", str(res.error_mask))
862  continue
863  elif res is None or self.max_html_redirects is None or \
864  self.max_html_redirects < CONSTS.MAX_HTML_REDIRECTS_LIMIT or \
865  not self.allow_html_redirects:
866  break
867  elif self.max_html_redirects > 0 and self.htmlRedirects >= self.max_html_redirects:
868  self.logger.debug("Max html redirects reached %s>=%s", str(self.htmlRedirects), str(self.max_html_redirects))
869  self.errorMask = self.errorMask | APP_CONSTS.ERROR_MAX_ALLOW_HTML_REDIRECTS
870  self.updateURLForFailed(APP_CONSTS.ERROR_MAX_ALLOW_HTML_REDIRECTS)
871  return False
872  elif res.rendered_unicode_content is not None:
873  if 'content-type' in res.headers and res.headers['content-type'].find('text/html') > -1:
874  prevUrl = localUrl
875 
876  if self.site.fetchType == BaseFetcher.TYP_DYNAMIC:
877  res.rendered_unicode_content = Utils.eraseNoScript(res.rendered_unicode_content)
878 
879  try:
880  localUrl = Utils.getHTMLRedirectUrl(res.rendered_unicode_content, self.logger)
881  except Exception, err:
882  self.logger.error("Error: %s", str(err))
883  self.logger.info(Utils.getTracebackInfo())
884 
885  self.logger.debug("!!! HTML redirect to '%s'", str(localUrl))
886  if localUrl is None or localUrl == '':
887  break
888  elif res.status_code != CONSTS.HTTP_CODE_200 and res.status_code not in CONSTS.REDIRECT_HTTP_CODES:
889  self.logger.debug("!!! Url skipped, because http code = '%s'", str(res.status_code))
890  localUrl = None
891  break
892  else:
893  # HTML Redirect section
894  collectURLs = CollectURLs()
895  isAllowedByFilter = collectURLs.filtersApply(self.site.filters, localUrl, 0, self.dbWrapper, \
896  self.batchItem.siteId, None, \
897  Filters.OC_RE, Filters.STAGE_COLLECT_URLS)
898  if not isAllowedByFilter:
899  localUrl = urlparse.urljoin(prevUrl, localUrl)
900 
901  localUrl = dc_event.URL(0, localUrl, normalizeMask=self.normMask).getURL(self.normMask)
902  self.logger.debug("HTML redirect: %s, is allowed by filters: %s", localUrl, str(bool(isAllowedByFilter)))
903  self.htmlRedirects += 1
904  else:
905  break
906  else:
907  break
908  # (4) Check and update url in db-task - [UrlProcess]
909  # TODO move to - [UrlProcess]
910  # (OLD if version) if res.headers is None:
911  if res is not None and res.error_mask != 0:
912  self.logger.debug("Positive res.error_mask: %s", str(res.error_mask))
913  self.updateURLForFailed(res.error_mask)
914  self.errorMask = self.errorMask | res.error_mask
915  return False
916 
917  if res is not None and res.headers is not None and "content-length" in res.headers and \
918  res.headers["content-length"] == EMPTY_RESPONSE_SIZE:
919  self.logger.debug('Zero content-length!')
920  self.errorMask = self.errorMask | APP_CONSTS.ERROR_EMPTY_RESPONSE
921  self.updateURLForFailed(self.errorMask)
922  return False
923  # (4) END
924 
925  # save res to self, will use it in next steps
926  # (5) Create and fill resource struct - [ResourceProcess]
927  # TODO move to - [ResourceProcess]
928  self.res = res
929  self.logger.info("!!! response code: '%s'", str(self.res.status_code))
930  self.logger.info("!!! response cookies: '%s'", str(self.res.cookies))
931  # use charset to improve encoding detect
932  self.crawledTime = time.time()
933  self.resourceProcess.urlObj = self.url
934  resource = self.resourceProcess.generateResource(startTime, res, self.headers, self.crawledTime,
935  self.defaultIcrCrawlTime,
936  self.siteProperties["CONTENT_TYPE_MAP"] if \
937  "CONTENT_TYPE_MAP" in self.siteProperties else None)
938 
939  # Execution 'REPLACE' property if necessary
940  if APP_CONSTS.REPLACEMENT_CONTENT_DATA in self.siteProperties:
941  self.logger.debug("!!! Found property 'REPLACE' !!!")
942 
943  self.res.rendered_unicode_content = ContentEvaluator.executeReplace(
944  dbWrapper=self.dbWrapper,
945  siteId=self.batchItem.siteId,
946  propertyString=self.siteProperties[APP_CONSTS.REPLACEMENT_CONTENT_DATA],
947  contentData=self.res.rendered_unicode_content)
948 
949  self.res.content_size = len(self.res.rendered_unicode_content)
950 
951 
952  # collect cookies
953  self.cookieResolver.addCookie(url, resource.cookies)
954 
955  resource.dynamic_fetcher_type = res.dynamic_fetcher_type
956  resource.dynamic_fetcher_result_type = res.dynamic_fetcher_result_type
957  self.crawledResource = resource
958  # (5) END
959 
960  # Save last modified value to detect modified object
961  if self.detectModified is not None:
962  self.detectModified.lastModified = self.crawledResource.last_modified
963 
964  if self.crawledResource.http_code >= CONSTS.HTTP_CODE_400:
965  self.errorMask = self.errorMask | APP_CONSTS.ERROR_HTTP_ERROR
966  # Add error mask about forbidden fetch
967  if self.crawledResource.http_code == CONSTS.HTTP_CODE_403:
968  self.errorMask = APP_CONSTS.ERROR_FETCH_FORBIDDEN
969 
970  self.updateSiteParams(self.errorMask)
971  self.updateURLForFailed(self.errorMask, self.crawledResource.http_code)
972  return False
973 
974  # Added by Oleksii
975  # support of HTTP_REDIRECTS_MAX
976  # (7) Check response for redirect count [???]
977  self.checkResponse()
978 
979  if not self.allow_http_redirects:
980  self.errorMask = self.errorMask | APP_CONSTS.ERROR_MAX_ALLOW_HTTP_REDIRECTS
981  self.updateSiteParams(APP_CONSTS.ERROR_MAX_ALLOW_HTTP_REDIRECTS)
982  self.updateURLForFailed(self.errorMask) # #, self.crawledResource.http_code)
983  return False
984  # (7) END
985 
986  # (8) Check Content size and response code - [ResourceProcess]
987  # TODO move to - [ResourceProcess]
988  self.resourceProcess.resource = resource
989  self.resourceProcess.batchItem = self.batchItem
990  if not self.resourceProcess.checkResourcesResponse(res, self.site.maxResourceSize, self.updateSiteParams):
991  self.errorMask = self.errorMask | resource.error_mask
992  self.updateURLForFailed(self.errorMask, res.status_code) # #, self.crawledResource.http_code)
993  return False
994  # (8) END
995 
996  # # Block handlers 'STAGE_BEFORE_DOM_PRE'
997  self.logger.debug("+++++++++++++++++++++++++++++++++++++")
998  self.logger.debug("Block handlers 'STAGE_BEFORE_DOM_PRE'")
999  collectURLs = CollectURLs()
1000  self.logger.debug("self.site.filters: " + varDump(self.site.filters))
1001  # Create class Filters instance for check 'raw content' use regular expressions
1002  localFilters = Filters(None, self.dbWrapper, self.batchItem.siteId, 0,
1003  None, Filters.OC_RE, Filters.STAGE_BEFORE_DOM_PRE, Filters.SELECT_SUBJECT_RAW_CONTENT)
1004 
1005  self.logger.debug('>>> localFilters.filters: ' + varDump(localFilters.filters))
1006  self.logger.debug(">>> isExistStage('STAGE_BEFORE_DOM_PRE'): " + \
1007  str(localFilters.isExistStage(Filters.STAGE_BEFORE_DOM_PRE)))
1008  # (9) Check RAW content text regular expression
1009  if localFilters.isExistStage(Filters.STAGE_BEFORE_DOM_PRE):
1010  self.logger.debug("Check RAW content text regular expression ...")
1011  if collectURLs.filtersApply(None, resource.binary_content, 0, self.dbWrapper,
1012  self.batchItem.siteId, None, Filters.OC_RE,
1013  Filters.STAGE_BEFORE_DOM_PRE, Filters.SELECT_SUBJECT_RAW_CONTENT, True):
1014  self.logger.debug("RAW content text regular expression check SUCCESS")
1015  else:
1016  self.logger.debug("RAW content text regular expression check FAILED")
1017  self.errorMask = self.errorMask | APP_CONSTS.ERROR_CRAWLER_FILTERS_BREAK
1018  self.updateURLForFailed(self.errorMask)
1019  return False
1020  # (9) END
1021 
1022  # Create class Filters instance for check 'headers' use regular expressions
1023  localFilters = Filters(None, self.dbWrapper, self.batchItem.siteId, 0,
1024  None, Filters.OC_RE, Filters.STAGE_BEFORE_DOM_PRE, Filters.SELECT_SUBJECT_HEADERS_ALL)
1025 
1026  self.logger.debug('>>> localFilters.filters: ' + varDump(localFilters.filters))
1027  self.logger.debug(">>> isExistStage('STAGE_BEFORE_DOM_PRE'): " + \
1028  str(localFilters.isExistStage(Filters.STAGE_BEFORE_DOM_PRE)))
1029  # (10) Check HTTP headers by name with a text regular expression
1030  if localFilters.isExistStage(Filters.STAGE_BEFORE_DOM_PRE):
1031  self.logger.debug("Check HTTP headers by name text regular expression check ...")
1032  if collectURLs.filtersApply(None, resource.response_header, 0, self.dbWrapper,
1033  self.batchItem.siteId, None, Filters.OC_RE,
1034  Filters.STAGE_BEFORE_DOM_PRE, Filters.SELECT_SUBJECT_HEADERS_ALL, True):
1035  self.logger.debug("HTTP headers by name text regular expression check SUCCESS")
1036  else:
1037  self.logger.debug("HTTP headers by name text regular expression check FAILED")
1038  self.errorMask = self.errorMask | APP_CONSTS.ERROR_CRAWLER_FILTERS_BREAK
1039  self.updateURLForFailed(self.errorMask)
1040  return False
1041  # (10) END
1042 
1043  # (11) Check the last modified datetime value (date comparison)
1044  self.logger.debug("Check Last modified datetime value date comparison check ...")
1045  self.logger.debug('resource.last_modified = ' + str(resource.last_modified))
1046 
1047  localFilters = Filters(None, self.dbWrapper, self.batchItem.siteId, 0,
1048  {'PDATE':str(resource.last_modified)}, Filters.OC_SQLE, Filters.STAGE_BEFORE_DOM_PRE,
1049  Filters.SELECT_SUBJECT_LAST_MODIFIED)
1050 
1051  self.logger.debug('>>> localFilters.filters: ' + varDump(localFilters.filters))
1052  self.logger.debug(">>> isExistStage('STAGE_BEFORE_DOM_PRE'): " + \
1053  str(localFilters.isExistStage(Filters.STAGE_BEFORE_DOM_PRE)))
1054 
1055  if localFilters.isExistStage(Filters.STAGE_BEFORE_DOM_PRE):
1056  if collectURLs.filtersApply(None, '', 0, self.dbWrapper, self.batchItem.siteId,
1057  {'PDATE':str(resource.last_modified)}, Filters.OC_SQLE,
1058  Filters.STAGE_BEFORE_DOM_PRE, Filters.SELECT_SUBJECT_LAST_MODIFIED, True):
1059  self.logger.debug("Last modified datetime value date comparison check SUCCESS")
1060  else:
1061  self.logger.debug("Last modified datetime value date comparison check FAILED")
1062  self.errorMask = self.errorMask | APP_CONSTS.ERROR_CRAWLER_FILTERS_BREAK
1063  self.updateURLForFailed(self.errorMask)
1064  return False
1065  # (11) END
1066 
1067 
1068  except (requests.exceptions.Timeout, requests.exceptions.ReadTimeout, requests.exceptions.ConnectTimeout), err:
1069  self.errorMask = self.errorMask | APP_CONSTS.ERROR_REQUEST_TIMEOUT
1070  self.updateURLForFailed(APP_CONSTS.ERROR_REQUEST_TIMEOUT)
1071  self.res = self.makeDefaultResponse(self.res)
1072  return False
1073  except requests.exceptions.InvalidURL:
1074  self.errorMask = self.errorMask | APP_CONSTS.ERROR_BAD_URL
1075  self.updateURLForFailed(APP_CONSTS.ERROR_BAD_URL)
1076  self.res = self.makeDefaultResponse(self.res)
1077  return False
1078  except requests.exceptions.TooManyRedirects:
1079  self.errorMask = self.errorMask | APP_CONSTS.ERROR_FETCH_TOO_MANY_REDIRECTS
1080  self.updateURLForFailed(APP_CONSTS.ERROR_FETCH_TOO_MANY_REDIRECTS)
1081  self.res = self.makeDefaultResponse(self.res)
1082  return False
1083  except requests.exceptions.ChunkedEncodingError:
1084  self.errorMask = self.errorMask | APP_CONSTS.ERROR_PAGE_CONVERT_ERROR
1085  self.updateURLForFailed(APP_CONSTS.ERROR_PAGE_CONVERT_ERROR)
1086  self.res = self.makeDefaultResponse(self.res)
1087  return False
1088  except requests.exceptions.ConnectionError:
1089  self.errorMask = self.errorMask | APP_CONSTS.ERROR_CONNECTION_ERROR
1090  self.updateURLForFailed(APP_CONSTS.ERROR_CONNECTION_ERROR)
1091  self.res = self.makeDefaultResponse(self.res)
1092  return False
1093  except requests.exceptions.ContentDecodingError:
1094  self.errorMask = self.errorMask | APP_CONSTS.ERROR_PAGE_CONVERT_ERROR
1095  self.updateURLForFailed(APP_CONSTS.ERROR_PAGE_CONVERT_ERROR)
1096  self.res = self.makeDefaultResponse(self.res)
1097  return False
1098  except lxml.etree.XMLSyntaxError: # pylint: disable=E1101
1099  self.logger.debug("XML HTML syntax error")
1100  self.errorMask = self.errorMask | APP_CONSTS.ERROR_DTD_INVALID
1101  self.updateURLForFailed(APP_CONSTS.ERROR_DTD_INVALID)
1102  self.res = self.makeDefaultResponse(self.res)
1103  return False
1104  except ProxyException, err:
1105  self.logger.debug('self.errorMask = ' + str(self.errorMask) + ' err.code = ' + str(err.code) + \
1106  ' err.statusUpdate = ' + str(err.statusUpdate))
1107  status = dc_event.URL.STATUS_CRAWLED
1108  if err.statusUpdate is not None:
1109  status = err.statusUpdate
1110  self.logger.debug('Set status update = ' + str(status))
1111  self.errorMask = self.errorMask | err.code
1112  self.updateURLForFailed(self.errorMask, SeleniumFetcher.ERROR_PROXY_CONNECTION_FAILED, status)
1113  self.res = self.makeDefaultResponse(self.res, SeleniumFetcher.ERROR_PROXY_CONNECTION_FAILED)
1114  return False
1115  except SeleniumFetcherException, err:
1116  self.logger.error("Selenium fetcher error: " + str(err) + ' code = ' + str(err.code))
1117  httpCode = CONSTS.HTTP_CODE_400
1118  if err.code in self.errorMaskHttpCodeDict:
1119  httpCode = self.errorMaskHttpCodeDict[err.code]
1120  self.errorMask = self.errorMask | err.code
1121  self.updateURLForFailed(self.errorMask, httpCode)
1122  self.res = self.makeDefaultResponse(self.res, httpCode)
1123  return False
1124  except UrlAvailableException, err:
1125  self.errorMask = self.errorMask | APP_CONSTS.ERROR_CONNECTION_ERROR
1126  self.updateURLForFailed(APP_CONSTS.ERROR_CONNECTION_ERROR)
1127  self.res = self.makeDefaultResponse(self.res)
1128  return False
1129  except requests.exceptions.HTTPError, err:
1130  self.errorMask = self.errorMask | APP_CONSTS.ERROR_FETCH_HTTP_ERROR
1131  self.updateURLForFailed(APP_CONSTS.ERROR_FETCH_HTTP_ERROR)
1132  self.res = self.makeDefaultResponse(self.res)
1133  return False
1134  except requests.exceptions.URLRequired, err:
1135  self.errorMask = self.errorMask | APP_CONSTS.ERROR_FETCH_INVALID_URL
1136  self.updateURLForFailed(APP_CONSTS.ERROR_FETCH_INVALID_URL)
1137  self.res = self.makeDefaultResponse(self.res)
1138  return False
1139  except requests.exceptions.RequestException, err:
1140  self.errorMask = self.errorMask | APP_CONSTS.ERROR_FETCH_AMBIGUOUS_REQUEST
1141  self.updateURLForFailed(APP_CONSTS.ERROR_FETCH_AMBIGUOUS_REQUEST)
1142  self.res = self.makeDefaultResponse(self.res)
1143  return False
1144  except CrawlerFilterException, err:
1145  self.errorMask = self.errorMask | APP_CONSTS.ERROR_CRAWLER_FILTERS_BREAK
1146  self.updateURLForFailed(APP_CONSTS.ERROR_CRAWLER_FILTERS_BREAK)
1147  self.res = self.makeDefaultResponse(self.res)
1148  return False
1149  except NotModifiedException, err:
1150  status = dc_event.URL.STATUS_CRAWLED
1151  updateUDate = True
1152  if self.detectModified is not None:
1153  status, updateUDate = self.detectModified.notModifiedStateProcessing(self.batchItem.siteId, self.realUrl,
1154  self.dbWrapper, status, updateUDate)
1155  self.logger.debug("!!! URL is NOT MODIFIED. Update httpCode = %s, status = %s, updateUDate = %s",
1156  str(err.httpCode), str(status), str(updateUDate))
1157 
1158  self.updateURLForFailed(self.errorMask, err.httpCode, status, updateUDate)
1159  self.res = self.makeDefaultResponse(self.res, err.httpCode)
1160  return False
1161  except DatabaseException, err:
1162  self.errorMask = self.errorMask | APP_CONSTS.ERROR_DATABASE_ERROR
1163  self.updateURLForFailed(APP_CONSTS.ERROR_DATABASE_ERROR)
1164  self.res = self.makeDefaultResponse(self.res)
1165  return False
1166  except InternalCrawlerException, err:
1167  self.errorMask = self.errorMask | APP_CONSTS.ERROR_FETCHER_INTERNAL
1168  self.updateURLForFailed(APP_CONSTS.ERROR_FETCHER_INTERNAL)
1169  self.res = self.makeDefaultResponse(self.res)
1170  return False
1171  except Exception, err:
1172  self.errorMask = self.errorMask | APP_CONSTS.ERROR_GENERAL_CRAWLER
1173  self.updateURLForFailed(APP_CONSTS.ERROR_GENERAL_CRAWLER)
1174  ExceptionLog.handler(self.logger, err, "Crawler fatal error.", (err), \
1175  {ExceptionLog.LEVEL_NAME_ERROR:ExceptionLog.LEVEL_VALUE_DEBUG})
1176  return False
1177 
1178  return True
1179 
1180 
1181  # # Make default response
1182  #
1183  # @param response - instance of Response
1184  # @param httpCode - HTTP code
1185  # @return response - updated instance of Response
1186  def makeDefaultResponse(self, response, httpCode=CONSTS.HTTP_CODE_400):
1187 
1188  if response is None:
1189  response = Response()
1190  # set default values for response
1191  response.status_code = httpCode
1192  response.unicode_content = ""
1193  response.str_content = ""
1194  response.rendered_unicode_content = ""
1195  response.content_size = 0
1196  response.encoding = ""
1197  response.headers = {"content-length": 0, "content-type": ""}
1198  response.meta_res = ""
1199 
1200  return response
1201 
1202 
1203  # updateAVGSpeed updates the sites.AVGSpeed property
1204  def updateAVGSpeed(self):
1205  if self.res.status_code == CONSTS.HTTP_CODE_304:
1206  # not modified will return empty response body
1207  return
1208 
1209  avgSpeed = (self.site.avgSpeed * self.site.avgSpeedCounter + self.crawledResource.bps)\
1210  / (self.site.avgSpeedCounter + 1)
1211  self.site.avgSpeed = avgSpeed
1212  self.site.avgSpeedCounter += 1
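 # Worked example (illustrative): with avgSpeed == 100 bps over avgSpeedCounter == 4 samples and a new
 # measurement of 200 bps, the running average becomes (100 * 4 + 200) / (4 + 1) == 120 bps.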
1213 
1214  if self.dbWrapper is not None:
1215  localSiteUpdate = dc_event.SiteUpdate(self.batchItem.siteId)
1216  for attr in localSiteUpdate.__dict__:
1217  if hasattr(localSiteUpdate, attr):
1218  setattr(localSiteUpdate, attr, None)
1219  localSiteUpdate.id = self.batchItem.siteId
1220  localSiteUpdate.avgSpeed = avgSpeed
1221  localSiteUpdate.avgSpeedCounter = SQLExpression("`AVGSpeedCounter` + 1")
1222  self.dbWrapper.siteNewOrUpdate(localSiteUpdate)
1223 
1224 
1225  # saveCookies
1226  def saveCookies(self):
1227  self.logger.debug(MSG_INFO_STORE_COOKIES_FILE)
1228  timePostfix = ""
1229  if self.keep_old_resources:
1230  timePostfix = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
1231  self.logger.debug("self.storeCookies = %s", str(self.storeCookies))
1232  if self.storeCookies:
1233  # #cookies_str = [key + ": " + value + "; " for (key, value) in self.crawledResource.cookies.items()]
1234 # self.logger.debug("self.res.headers: '%s'", str(self.res.headers))
1235 # self.logger.debug("self.crawledResource.cookies: '%s'", str(self.crawledResource.cookies))
1236 
1237  cookies_str = ''
1238  if RequestsRedirectWrapper.RESPONSE_COOKIE_HEADER_NAME in self.res.headers:
1239  # set raw cookies string from header
1240  cookies_str = self.res.headers[RequestsRedirectWrapper.RESPONSE_COOKIE_HEADER_NAME]
1241 
1242  elif isinstance(self.crawledResource.cookies, dict):
1243  # set cookies from cookies dictionary
1244  cookies_str = ''.join([key + ": " + value + "; " for (key, value) in self.crawledResource.cookies.items()])
1245 
1246  self.makeDir()
1247  self.logger.debug("Response cookies string: %s", str(cookies_str))
1248  self.logger.debug("self.batchItem.urlId: %s", str(self.batchItem.urlId))
1249  self.logger.debug("timePostfix: %s", str(timePostfix))
1250  if timePostfix == "":
1251  base_path = os.path.join(self.dir, self.batchItem.urlId)
1252  else:
1253  base_path = os.path.join(self.dir, self.batchItem.urlId + "_" + str(timePostfix))
1254  cookies_file_name = base_path + COOKIES_FILE_POSTFIX
1255  with open(cookies_file_name, "wb") as f:
1256  # #f.write(''.join(sorted(cookies_str)))
1257  f.write(cookies_str)
1258 
1259 
1260  # #prepare request headers and cookie
1261  #
1262  # @param site - site object
1263  # TODO move to - [SiteProcess]
1265  # TODO not sure it is right to fetch headers and cookie
1266  # by `Name` = 'headers'/'cookie'
1267  self.storeHttpRequest = True
1268  self.store_http_headers = True
1269  self.headersDict = {}
1270  self.postForms = {}
1271  self.headers = None
1272  self.cookie = ''
1273  self.proxies = None
1274  self.authName = None
1275  self.authPwd = None
1276  self.external_url = None
1277  self.auto_remove_props = {}
1278  self.htmlRecover = None
1279  self.autoDetectMime = None
1280  self.processContentTypes = []
1281  cookie = None
1282 
1283  try:
1284  # stub
1285  # self.logger.debug("self.site.properties: %s", varDump(self.site.properties))
1286 
1287  # Update site properties from batch item properties
1288  keys = [localProperty["name"] for localProperty in self.site.properties]
1289  # self.logger.debug("keys: %s" % str(keys))
1290  for key in self.batchItem.properties.keys():
1291  if key in keys:
1292  for localProperty in self.site.properties:
1293  if localProperty["name"] == key:
1294  self.logger.debug("%s present in site properties. Rewrite localProperty" % key)
1295  localProperty["value"] = self.batchItem.properties[key]
1296  else:
1297  self.logger.debug("%s not present in site properties. Add localProperty" % key)
1298  self.site.properties.append({"name":key, "value":self.batchItem.properties[key],
1299  "URLMd5":self.batchItem.urlId})
1300  # self.logger.debug("Updated site's properties: " + str(self.site.properties))
1301  self.siteProperties = {}
1302  for item in self.site.properties:
1303  self.siteProperties[item["name"]] = item["value"]
1304  self.initHTTPHeaders()
1305  if 'HTTP_COOKIE' in self.siteProperties and self.siteProperties['HTTP_COOKIE'] != "":
1306  self.cookieResolver = HTTPCookieResolver(self.siteProperties['HTTP_COOKIE'])
1307  else:
1308  self.cookieResolver = None
1309 
1310  if 'STORE_HTTP_REQUEST' in self.siteProperties:
1311  self.storeHttpRequest = int(self.siteProperties['STORE_HTTP_REQUEST']) != 0
1312  if 'STORE_HTTP_HEADERS' in self.siteProperties:
1313  self.store_http_headers = int(self.siteProperties['STORE_HTTP_HEADERS']) != 0
1314  if 'MIME_TYPE_STORE_ON_DISK' in self.siteProperties:
1315  allowMimes = self.siteProperties['MIME_TYPE_STORE_ON_DISK']
1316  if allowMimes is not None and allowMimes != '' and allowMimes != '*':
1317  self.needStoreMime = set([mime.lower() for mime in allowMimes.split(',')])
1318  if 'HTTP_AUTH_NAME' in self.siteProperties:
1319  self.authName = self.siteProperties['HTTP_AUTH_NAME']
1320  if 'HTTP_AUTH_PWD' in self.siteProperties:
1321  self.authPwd = self.siteProperties['HTTP_AUTH_PWD']
1322  for key in self.siteProperties.keys():
1323  if key.startswith('HTTP_POST_FORM_'):
1324  self.postForms[key[15:]] = self.siteProperties[key]
1325  if 'EXTERNAL_URL' in self.siteProperties:
1326  self.external_url = self.siteProperties['EXTERNAL_URL'] # pylint: disable=W0201
1327  if 'HTML_RECOVER' in self.siteProperties:
1328  self.htmlRecover = self.siteProperties['HTML_RECOVER'] # pylint: disable=W0201
1329  if 'MIME_TYPE_AUTO_DETECT' in self.siteProperties:
1330  self.autoDetectMime = self.siteProperties['MIME_TYPE_AUTO_DETECT'] # pylint: disable=W0201
1331  if 'ALLOWED_CTYPES' in self.siteProperties:
1332  self.processContentTypes = self.siteProperties['ALLOWED_CTYPES'].lower().split(',') # pylint: disable=W0201
1333  if 'PROCESSOR_NAME' in self.siteProperties:
1334  self.processorName = self.siteProperties['PROCESSOR_NAME']
1335  if DC_CONSTS.SITE_PROP_SAVE_COOKIES in self.siteProperties:
1336  self.storeCookies = int(self.siteProperties[DC_CONSTS.SITE_PROP_SAVE_COOKIES]) > 0
1337  if DC_CONSTS.SITE_PROP_AUTO_REMOVE_RESOURCES in self.siteProperties:
1338  self.auto_remove_props[DC_CONSTS.SITE_PROP_AUTO_REMOVE_RESOURCES] = \
1339  self.siteProperties[DC_CONSTS.SITE_PROP_AUTO_REMOVE_RESOURCES]
1340  if DC_CONSTS.SITE_PROP_AUTO_REMOVE_ORDER in self.siteProperties:
1341  self.auto_remove_props[DC_CONSTS.SITE_PROP_AUTO_REMOVE_ORDER] = \
1342  self.siteProperties[DC_CONSTS.SITE_PROP_AUTO_REMOVE_ORDER]
1343  if DC_CONSTS.SITE_PROP_AUTO_REMOVE_WHERE in self.siteProperties:
1344  self.auto_remove_props[DC_CONSTS.SITE_PROP_AUTO_REMOVE_WHERE] = \
1345  self.siteProperties[DC_CONSTS.SITE_PROP_AUTO_REMOVE_WHERE]
1346  if DC_CONSTS.SITE_PROP_AUTO_REMOVE_WHERE_ACTIVE in self.siteProperties:
1347  self.auto_remove_props[DC_CONSTS.SITE_PROP_AUTO_REMOVE_WHERE_ACTIVE] = \
1348  self.siteProperties[DC_CONSTS.SITE_PROP_AUTO_REMOVE_WHERE_ACTIVE]
1349  if 'URL_NORMALIZE_MASK' in self.siteProperties:
1350  self.normMask = int(self.siteProperties['URL_NORMALIZE_MASK'])
1351  if APP_CONSTS.URL_NORMALIZE in self.siteProperties:
1352  self.normMask = UrlNormalize.getNormalizeMask(siteProperties=self.siteProperties, defaultValue=self.normMask)
1353  if self.urlProcess is not None:
1354  self.urlProcess.normMask = self.normMask
1355  if 'HTTP_REDIRECTS_MAX' in self.siteProperties:
1356  self.max_http_redirects = int(self.siteProperties['HTTP_REDIRECTS_MAX'])
1357  if 'HTML_REDIRECTS_MAX' in self.siteProperties:
1358  self.max_html_redirects = int(self.siteProperties['HTML_REDIRECTS_MAX'])
1359  if 'COLLECT_URLS_XPATH_LIST' in self.siteProperties:
1360  self.urlXpathList = json.loads(self.siteProperties['COLLECT_URLS_XPATH_LIST'])
1361  if 'URL_TEMPLATE_REGULAR' in self.siteProperties:
1362  self.urlTempalteRegular = self.siteProperties['URL_TEMPLATE_REGULAR']
1363  if 'URL_TEMPLATE_REALTIME' in self.siteProperties:
1364  self.urlTempalteRealtime = self.siteProperties['URL_TEMPLATE_REALTIME']
1365  if 'URL_TEMPLATE_REGULAR_URLENCODE' in self.siteProperties:
1366  self.urlTempalteRegularEncode = self.siteProperties['URL_TEMPLATE_REGULAR_URLENCODE']
1367  if 'URL_TEMPLATE_REALTIME_URLENCODE' in self.siteProperties:
1368  self.urlTempalteRealtimeEncode = self.siteProperties['URL_TEMPLATE_REALTIME_URLENCODE']
1369  if 'PROTOCOLS' in self.siteProperties:
1370  if self.urlProcess is not None:
1371  self.urlProcess.setProtocols(self.siteProperties['PROTOCOLS'])
1372  if 'DETECT_MODIFIED' in self.siteProperties:
1373  self.detectModified = DetectModified(self.siteProperties['DETECT_MODIFIED'])
1374  if 'CONTENT_TYPE_MAP' in self.siteProperties:
1375  try:
1376  self.siteProperties['CONTENT_TYPE_MAP'] = json.loads(self.siteProperties['CONTENT_TYPE_MAP'])
1377  except Exception:
1378  self.siteProperties['CONTENT_TYPE_MAP'] = {}
1379 
1380  if self.defaultHeaderFile is not None:
1381  # read request headers from crawler-task_headers.txt file
1382  self.headers = CrawlerTask.readSmallFileContent(self.defaultHeaderFile)
1383 
1384  if self.cookieResolver is None:
1385  # read request cookies from crawler-task_headers.txt file
1386  cookie = CrawlerTask.readSmallFileContent(self.defaultCookieFile)
1387 
1388  if cookie is not None and cookie != "":
1389  if cookie.lower().startswith('cookie:'):
1390  self.cookie = cookie[len('cookie:'):]
1391  else:
1392  self.cookie = cookie
1393 
1394  # TODO: algorithm is not clear
1395  for header in self.headers.splitlines():
1396  if not header:
1397  continue
1398  try:
1399  key, value = header[:header.index(':')].strip(), header[header.index(':') + len(':'):].strip()
1400  except Exception:
1401  self.logger.debug("header:%s", header)
1402  continue
1403  if key[0] != '#':
1404  self.headersDict[key] = value
1405  if self.cookie != "":
1406  self.headersDict['Cookie'] = self.cookie
1407 
1408  self.logger.debug("proxies: %s", self.proxies)
1409  self.urlProcess.siteProperties = self.siteProperties
1410  except Exception, err:
1411  ExceptionLog.handler(self.logger, err, MSG_ERROR_LOAD_SITE_PROPERTIES)
1412  self.errorMask |= APP_CONSTS.ERROR_CRAWLER_FATAL_INITIALIZATION_PROJECT_ERROR
1413 
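The block above first merges the batch item properties over the site properties and then maps the well-known property names onto instance fields. A small sketch of the merge step with made-up property names, assuming the same list-of-dicts layout used by self.site.properties:

# Sketch with assumed data; mirrors the merge loop in readSiteProperties().
siteProps = [{"name": "HTTP_AUTH_NAME", "value": "user", "URLMd5": ""}]
batchItemProps = {"HTTP_AUTH_NAME": "other", "STORE_HTTP_REQUEST": "0"}
keys = [p["name"] for p in siteProps]
for key in batchItemProps.keys():
    if key in keys:
        for p in siteProps:
            if p["name"] == key:
                p["value"] = batchItemProps[key]  # batch item value overrides the site value
    else:
        siteProps.append({"name": key, "value": batchItemProps[key]})
# Flatten into the dict form used as self.siteProperties
siteProperties = dict((p["name"], p["value"]) for p in siteProps)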
1414 
1415  # # initHTTPHeaders initialize the HTTP request headers set
1416  #
1417  def initHTTPHeaders(self):
1418  if 'HTTP_HEADERS' in self.siteProperties and self.siteProperties['HTTP_HEADERS'] != "":
1419  lh = self.siteProperties['HTTP_HEADERS']
1420  if lh.startswith('file://'):
1421  lh = Utils.loadFromFileByReference(fileReference=lh, loggerObj=self.logger)
1422  self.siteHeaders = {'User-Agent': [h.strip() for h in lh.split("\n") if len(h) > 0 and h[0] != '#']}
1423  else:
1424  self.siteHeaders = self.hTTPHeadersStorage.extractSiteStorageElement(lh)
1425  for lh in self.siteHeaders:
1426  if isinstance(self.siteHeaders[lh], basestring):
1427  if self.siteHeaders[lh].startswith('file://'):
1428  self.siteHeaders[lh] = Utils.loadFromFileByReference(fileReference=self.siteHeaders[lh], loggerObj=self.logger)
1429  else:
1430  self.siteHeaders[lh] = [self.siteHeaders[lh]]
1431 
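initHTTPHeaders() accepts the HTTP_HEADERS property either as a 'file://' reference or as a headers storage element; in the file case each non-comment line becomes one User-Agent candidate. A sketch of that parsing, assuming the file content has already been loaded into a string:

# Sketch (assumed content): one User-Agent per line, '#' marks a commented-out line.
lh = "Mozilla/5.0 (X11; Linux x86_64)\n# disabled agent\ncurl/7.50.1"
siteHeaders = {'User-Agent': [h.strip() for h in lh.split("\n") if len(h) > 0 and h[0] != '#']}
# -> {'User-Agent': ['Mozilla/5.0 (X11; Linux x86_64)', 'curl/7.50.1']}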
1432 
1433 # # # # resolveProxy resolve http proxies for current url
1434 # # #
1435 # # def resolveProxy(self, tmpProxies):
1436 # # ret = None
1437 # # if self.url is not None:
1438 # # self.proxyResolver = ProxyResolver(self.siteProperties, self.dbWrapper, self.batchItem.siteId, self.url.url)
1439 # # result = self.proxyResolver.getProxy(tmpProxies)
1440 # # if result is not None:
1441 # # ret = ("http", result[0], result[1], None, None)
1442 # # return ret
1443 
1444 
1445  # # readSmallFileContent read small file content
1446  #
1447  # @param path the file path to read
1448  @staticmethod
1449  def readSmallFileContent(path):
1450  with open(path, 'r') as f:
1451  return ''.join(f.readlines())
1452 
1453 
1454  # #update site params
1455  # update site params
1456  # @param mask ErrorMask
1457  # @param is_suspend should set state to suspend for this site
1458  def updateSiteParams(self, mask, is_suspend=False):
1459  ret = False
1460  if self.dbWrapper is None:
1461  ret = True
1462  else:
1463  try:
1464  localSiteUpdate = dc_event.SiteUpdate(self.batchItem.siteId)
1465  for attr in localSiteUpdate.__dict__:
1466  if hasattr(localSiteUpdate, attr):
1467  setattr(localSiteUpdate, attr, None)
1468 
1469  # TODO: possibly redundant for the URLFetch algorithm and needs to be removed
1470  if mask:
1471  # localSiteUpdate.errors = SQLExpression("`Errors` + 1")
1472  localSiteUpdate.errorMask = SQLExpression(("`ErrorMask` | %s" % mask))
1473  if is_suspend:
1474  localSiteUpdate.state = Site.STATE_SUSPENDED
1475 
1476  localSiteUpdate.id = self.batchItem.siteId
1477  localSiteUpdate.updateType = dc_event.SiteUpdate.UPDATE_TYPE_UPDATE
1478  updated_count = self.dbWrapper.siteNewOrUpdate(siteObject=localSiteUpdate, stype=dc_event.SiteUpdate)
1479  if updated_count > 0:
1480  ret = True
1481  except DatabaseException, err:
1482  ExceptionLog.handler(self.logger, err, MSG_ERROR_UPDATE_SITE_DATA, (err))
1483  raise err
1484  except Exception, err:
1485  ExceptionLog.handler(self.logger, err, MSG_ERROR_UPDATE_SITE_DATA, (err))
1486  raise err
1487 
1488  return ret
1489 
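updateSiteParams() clears every field of the SiteUpdate object and then sets only the fields to change, with ErrorMask expressed as a server-side SQL fragment so the OR happens in the database rather than in Python. A sketch of the fragment construction; the mask value is illustrative:

# Sketch: the update carries an SQL fragment instead of a computed Python value.
mask = 4096  # illustrative error mask bit
fragment = "`ErrorMask` | %s" % mask   # -> "`ErrorMask` | 4096"
# In the real code this string is wrapped into app.Utils.SQLExpression before
# being assigned to localSiteUpdate.errorMask.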
1490 
1491  # # update site use sql expression evaluator
1492  #
1493  # @param - None
1494  # @return ret - True if success or False otherwise
1495  def updateSite(self):
1496  ret = False
1497 
1498  if self.dbWrapper is None:
1499  ret = True
1500  else:
1501  if self.site is not None and APP_CONSTS.SQL_EXPRESSION_FIELDS_UPDATE_CRAWLER in self.siteProperties:
1502  try:
1503  localSiteUpdate = dc_event.SiteUpdate(self.batchItem.siteId)
1504  for attr in localSiteUpdate.__dict__:
1505  if hasattr(localSiteUpdate, attr):
1506  setattr(localSiteUpdate, attr, None)
1507 
1508  # Evaluate 'Site' class values if necessary
1509  changedFieldsDict = FieldsSQLExpressionEvaluator.execute(self.siteProperties, self.dbWrapper, self.site,
1510  None, self.logger,
1511  APP_CONSTS.SQL_EXPRESSION_FIELDS_UPDATE_CRAWLER)
1512  # Update 'Site' class values
1513  for name, value in changedFieldsDict.items():
1514  if hasattr(localSiteUpdate, name) and value is not None and name not in ['CDate', 'UDate', 'tcDate']:
1515  setattr(localSiteUpdate, name, value)
1516 
1517  localSiteUpdate.errorMask = SQLExpression(("`ErrorMask` | %s" % self.site.errorMask))
1518  localSiteUpdate.id = self.batchItem.siteId
1519  localSiteUpdate.updateType = dc_event.SiteUpdate.UPDATE_TYPE_UPDATE
1520 
1521  updatedCount = self.dbWrapper.siteNewOrUpdate(siteObject=localSiteUpdate, stype=dc_event.SiteUpdate)
1522  self.logger.debug('Updated ' + str(updatedCount) + ' rows.')
1523  if updatedCount > 0:
1524  ret = True
1525  except DatabaseException, err:
1526  self.logger.error("Update 'Site' failed, error: %s", str(err))
1527  raise err
1528  except Exception, err:
1529  self.logger.error("Update 'Site' failed, error: %s", str(err))
1530  raise err
1531 
1532  return ret
1533 
1534 
1535  # #Check several common conditions
1536  #
1537  # @return Boolean that means - should continue execute this BatchItem or not
1538  def commonChecks(self, urlObj):
1539  if self.site is None or urlObj is None:
1540  self.logger.error('Error: self.site or urlObj is None!')
1541  return False
1542  # self.logger.debug("urlObj:" + str(urlObj))
1543 
1544  if((self.batch.crawlerType != dc_event.Batch.TYPE_REAL_TIME_CRAWLER) and (self.site.state != Site.STATE_ACTIVE)) \
1545  or ((self.batch.crawlerType == dc_event.Batch.TYPE_REAL_TIME_CRAWLER) and (self.site.state == Site.STATE_DISABLED)):
1546  self.logger.debug("Warning: Batch CrawlerType: %s, site state is %s but is not STATE_ACTIVE!"
1547  % (str(self.batch.crawlerType), str(self.site.state)))
1548  self.errorMask = self.errorMask | APP_CONSTS.ERROR_MASK_SITE_STATE
1549  return False
1550 
1551  if not self.isRootURL(urlObj):
1552  if self.site.maxErrors > 0 and self.site.errors > self.site.maxErrors:
1553  self.logger.debug("Site max errors: %s limit: %s is reached", str(self.site.errors), str(self.site.maxErrors))
1554  # TODO: possibly improve the suspend logic with a property defining whether to suspend the site when the limit is reached
1555  # self.updateSiteParams(CONSTS.ERROR_SITE_MAX_ERRORS, True)
1556  self.errorMask = self.errorMask | APP_CONSTS.ERROR_SITE_MAX_ERRORS
1557  self.updateSiteParams(APP_CONSTS.ERROR_SITE_MAX_ERRORS)
1558  return False
1559 
1560  return True
1561 
1562 
1563  # #Is URL object a root URL
1564  #
1565  # @param urlObj
1566  # @param urlString
1567  # @return True if URL object a root URL or False
1568  def isRootURL(self, urlObj, urlString=None):
1569  ret = False
1570 
1571  if urlString is None:
1572  if urlObj.parentMd5 == '':
1573  ret = True
1574  else:
1575  if urlString == '':
1576  ret = True
1577 
1578  return ret
1579 
1580 
1581  # # load site structure
1582  # loads the site object to crawl
1583  # @param batch - batch object instance
1584  def loadSite(self, batch):
1585  try:
1586  # # FIXED alexv 2015-11-11
1587  # if not len(self.batchItem.siteId):
1588  # self.batchItem.siteId = "0"
1589 
1590  if batch.crawlerType != dc_event.Batch.TYPE_REAL_TIME_CRAWLER: # # FIXED alexv 2017-07-24
1591  self.readSiteFromDB()
1592 
1593  # If the site does not exist then read the site for siteId=0
1594  if self.site.id == SITE_MD5_EMPTY and bool(self.useZeroSiteIdSiteNotExists):
1595  self.logger.debug("Site not found. Assume a site id as: `0`")
1596  self.batchItem.siteId = '0'
1597  self.batchItem.urlObj.siteId = '0'
1598  self.readSiteFromDB()
1599  else:
1600  # Create empty Site object
1601  self.site = dc_event.Site("")
1602  self.site.id = '0'
1603 
1604  # self.logger.debug(">>> site before = " + varDump(self.site))
1605  if self.site is not None and self.batchItem.siteObj is not None:
1606  self.site.rewriteFields(self.batchItem.siteObj)
1607  # self.logger.debug(">>> site after = " + varDump(self.site))
1608  except Exception as err:
1609  ExceptionLog.handler(self.logger, err, MSG_ERROR_LOAD_SITE_DATA, (err))
1610  self.errorMask |= APP_CONSTS.ERROR_CRAWLER_FATAL_INITIALIZATION_PROJECT_ERROR
1611  raise err
1612 
1613 
1614  # #readSiteFromDB
1615  # read the site object from the DB
1616  # (fills self.site from the SiteStatus response)
1617  def readSiteFromDB(self):
1618  siteStatus = dc_event.SiteStatus(self.batchItem.siteId)
1619  drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.SITE_STATUS, siteStatus)
1620  responseDRCESyncTasksCover = self.dbWrapper.process(drceSyncTasksCoverObj)
1621  self.site = None
1622  self.siteTable = DC_SITES_TABLE_NAME
1623  try:
1624  self.site = responseDRCESyncTasksCover.eventObject
1625  except Exception as err:
1626  ExceptionLog.handler(self.logger, err, MSG_ERROR_READ_SITE_FROM_DB, (err))
1627  raise Exception
1628 
1629 
1630  # #loadURL
1631  # load the URL object for the current batch item
1632  # and synchronize it with the DB
1633  def loadURL(self):
1634  # do siteId
1635 
1636  if len(self.batchItem.siteId):
1637  self.urlTable = DC_URLS_TABLE_PREFIX + self.batchItem.siteId
1638  else:
1639  self.urlTable = DC_URLS_TABLE_PREFIX + "0"
1640  self.logger.debug("db backend Table for siteId %s is: %s" % (self.batchItem.siteId, self.urlTable))
1641  self.urlProcess.siteId = self.batchItem.siteId
1642  self.urlProcess.urlTable = self.urlTable
1643  self.url = self.batchItem.urlObj
1644  self.urlProcess.urlDBSync(self.batchItem, self.batch.crawlerType, self.site.recrawlPeriod,
1645  self.auto_remove_props)
1646  if self.urlProcess.isUpdateCollection:
1647  self.updateCollectedURLs()
1648 
1649 
1650  # #resetVars
1651  def resetVars(self):
1652  self.needStoreMime = None # pylint: disable=F0401,W0201
1653  self.site = None # pylint: disable=F0401,W0201
1654  self.headersDict = None # pylint: disable=F0401,W0201
1655  self.store_http_headers = True # pylint: disable=F0401,W0201
1656  self.cookie = None # pylint: disable=F0401,W0201
1657  self.proxies = None # pylint: disable=F0401,W0201
1658  self.realUrl = None # pylint: disable=F0401,W0201
1659  self.url = None
1660  self.crawledResource = None
1661  self.headers = None # pylint: disable=F0401,W0201
1662  self.crawledTime = None # pylint: disable=F0401,W0201
1663  self.storeHttpRequest = True # pylint: disable=F0401,W0201
1664  self.dir = None # pylint: disable=F0401,W0201
1665  self.kvConnector = None # pylint: disable=F0401,W0201
1666  self.kvCursor = None # pylint: disable=F0401,W0201
1667  self.processorName = None # pylint: disable=F0401,W0201
1668  self.auto_remove_props = None # pylint: disable=F0401,W0201
1669  self.storeCookies = True # pylint: disable=F0401,W0201
1670  self.allow_http_redirects = True
1671  self.allow_html_redirects = True # pylint: disable=F0401,W0201
1672  self.httpRedirects = 0 # pylint: disable=F0401,W0201
1673  self.htmlRedirects = 0 # pylint: disable=F0401,W0201
1674  self.max_http_redirects = CONSTS.MAX_HTTP_REDIRECTS_LIMIT # pylint: disable=F0401,W0201
1675  self.max_html_redirects = CONSTS.MAX_HTML_REDIRECTS_LIMIT # pylint: disable=F0401,W0201
1676  self.dom = None
1677  self.res = None # pylint: disable=F0401,W0201
1678  self.postForms = None # pylint: disable=F0401,W0201
1679  self.authName = None # pylint: disable=F0401,W0201
1680  self.authPwd = None # pylint: disable=F0401,W0201
1681  self.siteProperties = {} # pylint: disable=F0401,W0201
1682  self.urlXpathList = {} # pylint: disable=F0401,W0201
1683  self.errorMask = APP_CONSTS.ERROR_OK
1684  self.feed = None # pylint: disable=F0401,W0201
1685  self.feedItems = None
1686  self.urlTempalteRegular = None # pylint: disable=F0401,W0201
1687  self.urlTempalteRealtime = None # pylint: disable=F0401,W0201
1688  self.urlTempalteRegularEncode = None # pylint: disable=F0401,W0201
1689  self.urlTempalteRealtimeEncode = None # pylint: disable=F0401,W0201
1690  self.detectModified = None # pylint: disable=F0401,W0201
1691  self.collectURLsItems = []
1692  self.schemaBatchItems = []
1693 
1694 
1695  # #updateBatchItemAfterCarwling fills some fields of the batch item and URL objects after crawling
1696  def updateBatchItemAfterCarwling(self, status=dc_event.URL.STATUS_CRAWLED):
1697  self.urlProcess.urlObj = self.url
1698  self.urlProcess.siteId = self.batchItem.siteId
1699  self.logger.debug("set siteId = '" + str(self.urlProcess.siteId) + "' from 'updateBatchItemAfterCarwling'")
1700 
1701  # Update status value accord to dict HTTP codes
1702  if "HTTP_CODE_STATUS_UPDATE" in self.siteProperties and self.siteProperties["HTTP_CODE_STATUS_UPDATE"] != "":
1703  self.logger.debug('!!!!!! HTTP_CODE_STATUS_UPDATE !!!!! ')
1704  # Default value
1705  status = dc_event.URL.STATUS_CRAWLED
1706  try:
1707  statusDict = json.loads(self.siteProperties["HTTP_CODE_STATUS_UPDATE"])
1708  self.logger.debug('!!!!!! statusDict: ' + str(statusDict))
1709  if str(self.crawledResource.http_code) in statusDict:
1710  self.logger.debug("Change status from (%s) to (%s), because http_code = %s", str(status), \
1711  str(statusDict[str(self.crawledResource.http_code)]), str(self.crawledResource.http_code))
1712  status = int(statusDict[str(self.crawledResource.http_code)])
1713  except Exception, err:
1714  self.logger.error("Load property 'HTTP_CODE_STATUS_UPDATE' has error: " + str(err))
1715 
1716  if status is not None:
1717  self.batchItem.urlObj.status = dc_event.URL.STATUS_CRAWLED
1718  self.batchItem.urlObj.crawlingTime = self.crawledResource.crawling_time
1719  self.batchItem.urlObj.errorMask |= self.crawledResource.error_mask
1720  if self.batchItem.urlObj.charset == "":
1721  self.batchItem.urlObj.charset = self.crawledResource.charset
1722  if self.batchItem.urlObj.httpCode == 0:
1723  self.batchItem.urlObj.httpCode = self.crawledResource.http_code
1724  if self.batchItem.urlObj.contentType == "":
1725  self.batchItem.urlObj.contentType = self.crawledResource.content_type
1726  self.batchItem.urlObj.crawled = 1
1727  if self.batchItem.urlObj.size == 0:
1728  self.batchItem.urlObj.size = self.res.content_size
1729  self.batchItem.urlObj.CDate = self.batchItem.urlObj.UDate = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
1730  self.batchItem.urlObj.tagsCount = 0
1731  self.batchItem.urlObj.tagsMask = 0
1732 
1733  if self.urlProcess.siteProperties is None:
1734  self.urlProcess.siteProperties = self.siteProperties
1735 
1736  self.urlProcess.updateCrawledURL(self.crawledResource, self.batchItem, self.res.content_size, status)
1737 
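The HTTP_CODE_STATUS_UPDATE site property is a JSON map from an HTTP code (as a string key) to the URL status to store after crawling. A sketch of a possible property value and its evaluation; the numeric status values are placeholders, the real constants come from dc.EventObjects.URL:

# Sketch with assumed values; mirrors the status override in updateBatchItemAfterCarwling().
import json
STATUS_CRAWLED = 4  # placeholder for dc_event.URL.STATUS_CRAWLED
propertyValue = '{"404": 7, "503": 1}'  # hypothetical HTTP_CODE_STATUS_UPDATE value
statusDict = json.loads(propertyValue)
http_code = 404
status = STATUS_CRAWLED
if str(http_code) in statusDict:
    status = int(statusDict[str(http_code)])  # -> 7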
1738 
1739  # #process batch item
1740  # the batch item is the one from the list of batch items within the batch object
1741  # (the current item is taken from self.batchItem, no parameters)
1742  def processBatchItem(self):
1743  self.resetVars()
1744  self.logger.info("Batch item START, siteId: %s, urlId: %s, self.batch.crawlerType: %s",
1745  self.batchItem.siteId, self.batchItem.urlId, str(self.batch.crawlerType))
1746  isResourceCrawled = False
1747  nextStep = True
1748  detectedMime = None
1749  mpLogger = Utils.MPLogger()
1750  self.urlProcess.siteId = self.batchItem.siteId
1751 
1752  try:
1753  # self.logger.debug("BatchItem: %s" % varDump(self.batchItem))
1754  self.loadSite(self.batch)
1755 
1756  # prepare headers and cookie
1757  self.readSiteProperties()
1758 
1759  if 'LOGGER' in self.siteProperties and self.siteProperties['LOGGER'] is not None and\
1760  self.siteProperties['LOGGER'] != '':
1761  self.logger.info("Switch logger to dedicated project %s log", str(self.batchItem.siteId))
1762  self.setLogConfigFileProject(mpLogger, self.batchItem.siteId, self.siteProperties['LOGGER'])
1763 
1764  # self.batchItem = self.changeBatchItemByUrlSchema(self.batchItem, self.batch.id)
1765  self.setChainId(self.batchItem)
1766  if self.robotsParser is not None:
1767  self.robotsParser.\
1768  initFiends(None, (bool(int(self.siteProperties['ROBOTS_CACHE'])) if \
1769  'ROBOTS_CACHE' in self.siteProperties else False),
1770  self.robotsFileDir)
1771  # Check if Real-Time crawling
1772  if self.batch.crawlerType == dc_event.Batch.TYPE_REAL_TIME_CRAWLER:
1773  if self.batchItem.urlObj.urlPut is not None:
1774  self.batchItem.urlObj.contentMask = dc_event.URL.CONTENT_STORED_ON_DISK
1775  # save tidy recovered file
1776  if "data" in self.batchItem.urlObj.urlPut.putDict:
1777  raw_unicode_content = self.batchItem.urlObj.urlPut.putDict["data"]
1778  localDom = None
1779  if self.htmlRecover is not None and self.htmlRecover == "2":
1780  localDom = self.resourceProcess.\
1781  domParser(None, raw_unicode_content, CONSTS.HTTP_CODE_200,
1782  self.crawledResource.charset if self.crawledResource is not None else None)
1783  if self.htmlRecover is not None and (self.htmlRecover == "1" or self.htmlRecover == "2" and \
1784  localDom is None):
1785  tidy_content = tidylib.tidy_document(raw_unicode_content, self.tidyOptions)[0]
1786  self.batchItem.urlObj.urlPut.putDict["data"] = base64.b64encode(tidy_content)
1787  # TODO: need for comments
1788  if self.batch.dbMode & dc_event.Batch.DB_MODE_W == 0:
1789  self.url = self.batchItem.urlObj # pylint: disable=W0201
1790  else:
1791  self.loadURL()
1792 
1793  if not self.commonChecks(self.url):
1794  self.logger.debug("Common checks failed!")
1795  nextStep = False
1796  # self.batchItem = None
1797  # return self.batchItem
1798 
1799  if nextStep and "HTTP_FREQ_LIMITS" in self.siteProperties:
1800  hostRequestStorage = HostRequestStorage(self.siteProperties["HTTP_FREQ_LIMITS"])
1801  if hostRequestStorage.checkHost(None, self.url.url, self.batchItem.siteId) == HostRequestStorage.ITEM_BREAK:
1802  self.logger.debug(">>> Skip url [%s] by http requests freq", self.url.url)
1803  self.url.status = dc_event.URL.STATUS_NEW
1804  self.url.errorMask = APP_CONSTS.ERROR_NO_TIME_WINDOW
1805  self.url.httpCode = CONSTS.HTTP_CODE_400
1806  self.urlProcess.updateURLStatus(self.batchItem.urlId, self.url.status)
1807  nextStep = False
1808 
1809  # Check if Real-Time crawling resource already crawled
1810  if nextStep and self.batch.crawlerType == dc_event.Batch.TYPE_REAL_TIME_CRAWLER:
1811  # TODO: strange condition: self.url.tagsCount > 0
1812  if self.url.crawled > 0 and self.url.errorMask == APP_CONSTS.ERROR_OK and self.url.tagsCount > 0 and\
1813  self.errorMask == APP_CONSTS.ERROR_OK:
1814  self.logger.debug("RealTime Crawling: Cashed resource. Resource crawled and error mask is empty")
1815  if PCONSTS.RECRAWL_KEY not in self.batchItem.properties or \
1816  int(self.batchItem.properties[PCONSTS.RECRAWL_KEY]) == PCONSTS.RECRAWL_VALUE_NO:
1817  self.logger.debug("Item not need to be recrawled.")
1818  # set contentMask stored on disk
1819  self.batchItem.urlObj.contentMask = dc_event.URL.CONTENT_STORED_ON_DISK
1820  # return self.batchItem
1821  nextStep = False
1822  else:
1823  self.logger.debug("Property `recrawl` = %s. Item recrawling." % \
1824  str(self.batchItem.properties[PCONSTS.RECRAWL_KEY]))
1825 
1826  if nextStep:
1827  # reset error mask for current crawled url
1828  self.urlProcess.resetErrorMask(self.batchItem)
1829  # update state to crawling
1830  self.urlProcess.urlObj = self.url
1831  self.urlProcess.updateURL(self.batchItem, self.batch.id)
1832  self.logger.debug('Start to crawl item')
1833  isResourceCrawled = self.crawl(self.batchItem.urlObj.urlPut.putDict["data"] if \
1834  (self.batchItem.urlObj.urlPut is not None and \
1835  self.batchItem.urlObj.urlPut.putDict is not None and "data" in \
1836  self.batchItem.urlObj.urlPut.putDict) else None)
1837 
1838  self.logger.debug("After crawl() isResourceCrawled: %s", str(isResourceCrawled))
1839 
1840  if self.crawledResource is not None and isResourceCrawled:
1841  # TODO move to - [SiteProcess]
1842  self.updateAVGSpeed()
1843  # Build DOM object
1844  if self.batchItem.urlObj.type == dc_event.URL.TYPE_SINGLE:
1845  self.logger.debug('URL type single, do not parse and build DOM, set self.dom = None')
1846  self.dom = None
1847  else:
1848  self.logger.debug('Build DOM, call domParser()\n self.crawledResource.charset = ' + \
1849  str(self.crawledResource.charset))
1850  self.dom = self.resourceProcess.domParser(self.htmlRecover, self.crawledResource.html_content,
1851  self.crawledResource.http_code,
1852  self.crawledResource.charset if self.crawledResource is not None\
1853  else None)
1854  # # check dom value
1855  if self.dom is None:
1856  self.errorMask |= APP_CONSTS.ERROR_PARSE_ERROR
1857  self.updateURLForFailed(self.errorMask)
1858 
1859  # Set pDate accord to site property 'PDATE_SOURCE_MASK'
1860  self.logger.debug('>>>>> self.crawledResource.last_modified = ' + str(self.crawledResource.last_modified))
1861 
1862  self.logger.debug('>>> Before getPubdateUseSourceMask() self.batchItem.urlObj.pDate = ' + \
1863  str(self.batchItem.urlObj.pDate))
1864  self.batchItem.urlObj.pDate = self.getPubdateUseSourceMask(self.siteProperties, \
1865  self.crawledResource, \
1866  self.batchItem.urlObj)
1867  self.logger.debug('>>> After getPubdateUseSourceMask() self.batchItem.urlObj.pDate = ' + \
1868  str(self.batchItem.urlObj.pDate))
1869 
1870  # Add 'feed_url' to headers
1871  if self.processorName == PCONSTS.PROCESSOR_FEED_PARSER or self.processorName == PCONSTS.PROCESSOR_RSS:
1872  parentUrl = self.refererHeaderResolver.fetchParentUrl(self.batchItem.siteId,
1873  self.url.parentMd5,
1874  self.dbWrapper)
1875  self.logger.debug("!!! parentUrl: %s", str(parentUrl))
1876  self.logger.debug("!!! self.url.parentMd5: %s", str(self.url.parentMd5))
1877  self.logger.debug("!!! self.url.url: %s", str(self.url.url))
1878  self.logger.debug("!!! self.feedUrl: %s", str(self.feedUrl))
1879 
1880  if self.url.parentMd5 == "":
1881  self.feedUrl[hashlib.md5(self.url.url).hexdigest()] = self.url.url
1882  elif self.url.parentMd5 != "" and parentUrl is None:
1883  if self.url.parentMd5 in self.feedUrl:
1884  parentUrl = self.feedUrl[self.url.parentMd5]
1885 
1886  if parentUrl is not None and parentUrl != "":
1887  self.addFeedUrlToHeader(self.crawledResource, parentUrl)
1888 
1889  # Add 'base_url' to batch item and header
1890  self.batchItem.baseUrl = self.extractBaseUrl(self.crawledResource.html_content, self.batchItem.urlObj.url)
1891  self.addBaseUrlToHeader(self.crawledResource, self.batchItem.baseUrl)
1892 
1893  self.getDir()
1894  try:
1895  if self.processorName != PCONSTS.PROCESSOR_FEED_PARSER:
1896  self.writeData()
1897  try:
1898  self.saveCookies()
1899  except Exception, err:
1900  self.logger.error("saveCookies fail: %s\n%s", str(err), Utils.getTracebackInfo())
1901 
1902  if self.autoDetectMime is not None and \
1903  ResourceProcess.isAllowedReplaceMimeType(self.autoDetectMime, self.batchItem.urlObj):
1904  detectedMime = self.resourceProcess.\
1905  mimeDetectByContent(self.crawledResource, self.siteProperties["CONTENT_TYPE_MAP"] if \
1906  "CONTENT_TYPE_MAP" in self.siteProperties else None, self.batchItem.urlObj)
1907  except Exception, err:
1908  self.errorMask |= APP_CONSTS.ERROR_WRITE_FILE_ERROR
1909  self.updateURLForFailed(self.errorMask)
1910  self.logger.error(MSG_ERROR_WRITE_CRAWLED_DATA + ': ' + str(err))
1911  else:
1912  nextStep = False
1913 
1914  if nextStep:
1915  self.logger.debug("Enter in collectURLs()")
1916  self.collectURLs()
1917 
1918  if self.batchItem.urlObj.chainId is not None and self.batchItem.urlObj.chainId in self.chainDict and \
1919  self.chainDict[self.batchItem.urlObj.chainId]["batchItem"].urlId == self.batchItem.urlId:
1920  self.updateBatchItemAfterCarwling(None)
1921  self.logger.debug(">>> ChainId, update URL without status")
1922  else:
1923  self.updateBatchItemAfterCarwling()
1924  # self.logger.debug('self.feed: %s, self.feedItems: %s', str(self.feed), str(self.feedItems))
1925  # TODO move to - [UrlProcess]
1926  self.urlProcess.siteId = self.batchItem.siteId
1927  # self.logger.debug("!!! detectedMime = %s, self.autoDetectMime = %s ",
1928  # str(detectedMime), str(self.autoDetectMime))
1929  if self.res is not None:
1930  self.urlProcess.updateCollectTimeAndMime(detectedMime, self.batchItem, self.crawledTime,
1931  self.autoDetectMime, self.res.headers, self.res.str_content)
1932  else:
1933  self.urlProcess.updateCollectTimeAndMime(detectedMime, self.batchItem, self.crawledTime,
1934  self.autoDetectMime)
1935  # (1) END
1936 
1937  # # Update site properties if necessary
1938  if APP_CONSTS.SQL_EXPRESSION_FIELDS_UPDATE_CRAWLER in self.siteProperties:
1939  self.updateSite()
1940 
1941  except DatabaseException, err:
1942  self.errorMask |= APP_CONSTS.ERROR_DATABASE_ERROR
1943  ExceptionLog.handler(self.logger, err, MSG_ERROR_PROCESS_BATCH_ITEM, (err))
1944  except SyncronizeException, err:
1945  self.errorMask |= APP_CONSTS.ERROR_SYNCHRONIZE_URL_WITH_DB
1946  ExceptionLog.handler(self.logger, err, MSG_ERROR_PROCESS_BATCH_ITEM, (err), \
1947  {ExceptionLog.LEVEL_NAME_ERROR:ExceptionLog.LEVEL_VALUE_DEBUG})
1948  except Exception as err:
1949  ExceptionLog.handler(self.logger, err, MSG_ERROR_PROCESS_BATCH_ITEM, (err))
1950 
1951  if 'LOGGER' in self.siteProperties and self.siteProperties['LOGGER'] is not None and\
1952  self.siteProperties['LOGGER'] != '':
1953  self.logger.info("Switch logger back to default from dedicated for project %s", str(self.batchItem.siteId))
1954  self.setLogConfigFileDefault(mpLogger)
1955  self.logger.info("Switched logger back to default from dedicated for project %s", str(self.batchItem.siteId))
1956 
1957  if self.dbWrapper is not None:
1958  # Update db counters
1959  self.logger.debug('>>>>> Before self.dbWrapper.fieldsRecalculating([self.batchItem.siteId])')
1960  self.dbWrapper.fieldsRecalculating([self.batchItem.siteId])
1961  self.logger.info("Batch item FINISH, siteId: %s, urlId: %s" % (self.batchItem.siteId, self.batchItem.urlId))
1962 
1963 
1964  # #process batch
1965  # the main processing of the batch object
1966  def processBatch(self):
1967  try:
1968  input_pickled_object = sys.stdin.read()
1969  # self.logger.debug("len(input_pickle)=%i", len(input_pickled_object))
1970  input_batch = pickle.loads(input_pickled_object)
1971 
1972  if input_batch.crawlerType != dc_event.Batch.TYPE_REAL_TIME_CRAWLER:
1973  self.urlProcess.dbWrapper = self.dbWrapper
1974  self.resourceProcess.dbWrapper = self.dbWrapper
1975 
1976  app.Profiler.messagesList.append("Batch.id: " + str(input_batch.id))
1977  self.logger.info("Input batch id: %s, items: %s", str(input_batch.id), str(len(input_batch.items)))
1978  # self.logger.debug("input_batch:\n" + varDump(input_batch))
1979  self.logger.debug("len before (batch_items)=%i", len(input_batch.items))
1980  self.generateBatchitemsByURLSchema(input_batch)
1981  self.logger.debug("len after (batch_items)=%i", len(input_batch.items))
1982 
1983  if int(input_batch.maxExecutionTime) > 0:
1984  self.maxExecutionTimeValue = input_batch.maxExecutionTime
1985  signal.signal(signal.SIGALRM, self.signalHandlerTimer)
1986  signal.alarm(self.maxExecutionTimeValue)
1987  self.removeUnprocessedItems = bool(input_batch.removeUnprocessedItems)
1988  self.logger.debug("Set maxExecutionTime = %s, removeUnprocessedItems = %s",
1989  str(self.maxExecutionTimeValue), str(self.removeUnprocessedItems))
1990 
1991  self.batch = input_batch
1992  Utils.storePickleOnDisk(input_pickled_object, ENV_CRAWLER_STORE_PATH, "crawler.in." + str(self.batch.id))
1993  batch_items = []
1994  self.curBatchIterations = 1
1995  maxBatchIterations = input_batch.maxIterations
1996  # input_batch.crawlerType == dc_event.Batch.TYPE_REAL_TIME_CRAWLER:
1997  # TODO: temporary set as 2 to test w/o support on client side
1998  # maxBatchIterations = 2
1999  self.logger.debug("maxBatchIterations=%s", str(maxBatchIterations))
2000  if self.batch.dbMode & dc_event.Batch.DB_MODE_W == 0:
2001  if self.dbWrapper is not None:
2002  self.dbWrapper.affect_db = False
2003  while True:
2004  self.curBatchIterations += 1
2005  beforeItems = 0
2006  self.logger.debug("self.curBatchIterations=%s, beforeItems=%s", str(self.curBatchIterations), str(beforeItems))
2007  for index, batchItem in enumerate(self.batch.items):
2008  # check max execution time
2009  if self.maxExecutionTimeReached:
2010  self.logger.debug("Maximum execution time %ss reached, process batch items loop interrupted!",
2011  str(self.maxExecutionTimeValue))
2012  self.errorMask = APP_CONSTS.ERROR_MAX_EXECUTION_TIME
2013  if self.removeUnprocessedItems:
2014  break
2015  else:
2016  batch_items.append(batchItem)
2017  continue
2018 
2019  if batchItem.urlObj.status == dc_event.URL.STATUS_NEW or \
2020  batchItem.urlObj.status == dc_event.URL.STATUS_SELECTED_CRAWLING or \
2021  batchItem.urlObj.status == dc_event.URL.STATUS_SELECTED_CRAWLING_INCREMENTAL:
2022  self.errorMask = batchItem.urlObj.errorMask
2023  batchItem.urlObj.status = dc_event.URL.STATUS_CRAWLING
2024  self.batchItem = batchItem
2025  self.logger.debug('========== log flush for batchId = ' + str(self.batch.id) +
2026  ' batchItem index = ' + str(index))
2027  Utils.loggerFlush(self.logger) # flush logger
2028  self.processBatchItem()
2029  self.logger.debug('========== log flush for batchId = ' + str(self.batch.id))
2030  Utils.loggerFlush(self.logger) # flush logger
2031  self.fillChainUrlMD5List(self.batchItem)
2032  # Currently we update URL.ErrorMask for each url in batch
2033  self.updateUrlObjInBatchItem(batchItem.urlObj)
2034 
2035  if self.processorName != PCONSTS.PROCESSOR_RSS and not self.batchItem:
2036  self.logger.debug('!!! Before self.updateBatchItem(self.batchItem)')
2037  # self.updateBatchItem(batchItem)
2038  self.updateBatchItem(self.batchItem)
2039  input_batch.errorMask |= self.errorMask
2040  elif self.processorName != PCONSTS.PROCESSOR_RSS:
2041  self.logger.debug('!!! Before self.updateBatchItem(batchItem)')
2042  # self.updateBatchItem(self.batchItem)
2043  # batch_items.append(self.batchItem)
2044  self.updateBatchItem(batchItem)
2045  batch_items.append(batchItem)
2046 
2047  # Extend self.store_items with new batch items if iterations >1 and not last iteration
2048  if maxBatchIterations > 1 and self.curBatchIterations <= maxBatchIterations:
2049  self.batchItemsExtendUnique(self.store_items, self.collectURLsItems)
2050  beforeItems += len(self.collectURLsItems)
2051 
2052  # self.logger.debug("!!! self.collectURLsItems: " + varDump(self.collectURLsItems))
2053  # Exit the batch cycling if no one new item added or max iterations reached
2054  if self.curBatchIterations > maxBatchIterations or beforeItems == 0:
2055  self.logger.debug("Exit from batching iteration:" + \
2056  "self.curBatchIterations=%s, maxBatchIterations=%s, beforeItems=%s, self.store_items=%s",
2057  str(self.curBatchIterations), str(maxBatchIterations), str(beforeItems),
2058  str(len(self.store_items)))
2059  break
2060  else:
2061  self.batch.items = self.store_items
2062  # self.logger.debug("Next batching iteration %s\n%s", str(self.curBatchIterations), varDump(self.store_items))
2063  self.logger.debug("Next batching iteration %s, len(self.store_items): %s", str(self.curBatchIterations),
2064  str(len(self.store_items)))
2065 
2066  if input_batch.crawlerType == dc_event.Batch.TYPE_REAL_TIME_CRAWLER:
2067  process_task_batch = input_batch
2068  process_task_batch.items = self.store_items
2069  else:
2070  process_task_batch = Batch(input_batch.id, batch_items)
2071  process_task_batch.errorMask |= self.errorMask
2072  self.saveChainStorageData()
2073  if self.processorName == PCONSTS.PROCESSOR_RSS and len(process_task_batch.items) == 0 and \
2074  self.batchItem is not None:
2075  self.logger.debug("RSS empty!")
2076  if self.batchItem.urlObj is not None:
2077  self.batchItem.urlObj.errorMask |= APP_CONSTS.ERROR_RSS_EMPTY
2078  process_task_batch.items.append(self.batchItem)
2079 
2080  # self.logger.debug("output_batch:\n" + varDump(process_task_batch))
2081  self.logger.info("Out batch id: %s, items: %s", str(process_task_batch.id), str(len(process_task_batch.items)))
2082  output_pickled_object = pickle.dumps(process_task_batch)
2083  Utils.storePickleOnDisk(output_pickled_object, ENV_CRAWLER_STORE_PATH, "crawler.out." + str(self.batch.id))
2084  sys.stdout.write(output_pickled_object)
2085  sys.stdout.flush()
2086  except Exception, err:
2087  ExceptionLog.handler(self.logger, err, 'Batch processing failed!', (err))
2088 
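processBatch() is driven entirely through standard streams: a pickled Batch arrives on stdin and a pickled Batch with the processed items is written back to stdout. A minimal stand-alone sketch of that round trip with a stub payload instead of a real dc.EventObjects.Batch:

# Sketch: same pickle-over-stdio contract as processBatch(), with a stub object.
import sys
try:
    import cPickle as pickle
except ImportError:
    import pickle

input_pickled_object = sys.stdin.read()
batch = pickle.loads(input_pickled_object)
# ... the crawler would iterate batch.items here ...
sys.stdout.write(pickle.dumps(batch))
sys.stdout.flush()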
2089 
2090  # #generateBatchitemsByURLSchema method adds new items into the batch.items list
2091  #
2092  # @param batch - incoming batch
2093  def generateBatchitemsByURLSchema(self, batch):
2094 
2095  additionBatchItems = []
2096  for batchItem in batch.items:
2097  self.batchItem = batchItem
2098  self.resetVars()
2099  self.loadSite(batch)
2100  self.readSiteProperties()
2101  self.batchItem.siteProperties = copy.deepcopy(self.siteProperties)
2102  batchItem = self.changeBatchItemByUrlSchema(self.batchItem, batch.id)
2103 
2104  self.logger.debug("batchItem.urlObj.url: " + str(batchItem.urlObj.url))
2105 
2106  self.logger.debug('>>> len(additionBatchItems) = ' + str(len(additionBatchItems)) + \
2107  ' len(self.schemaBatchItems) = ' + str(len(self.schemaBatchItems)))
2108 
2109  for elem in self.schemaBatchItems:
2110  self.logger.debug("url: " + str(elem.urlObj.url))
2111 
2112  additionBatchItems += self.schemaBatchItems
2113  tmpBatchItems = []
2114  for elem in additionBatchItems:
2115  if elem.urlId not in [e.urlId for e in tmpBatchItems]:
2116  tmpBatchItems.append(elem)
2117 
2118  batch.items += tmpBatchItems
2119 
2120  self.logger.debug("len(batch.items) = " + str(len(batch.items)) + \
2121  " len(tmpBatchItems) = " + str(len(tmpBatchItems)))
2122 
2123  # TODO maxItems, maxItems <= len(batch.items)
2124  if batch.maxItems is not None and int(batch.maxItems) < len(batch.items):
2125  batch.items = batch.items[0: int(batch.maxItems)]
2126  batch.items[-1].urlObj.errorMask |= APP_CONSTS.ERROR_MAX_ITEMS
2127 
2128 
2129  # #changeBatchItemByUrlSchema generates new URLs by urlSchema and replaces batchItem.urlObj if the URL was changed
2130  #
2131  # @param batchItem - incoming batchItem, batchId - the batch id
2132  # @return batchItem with changed urlObj or untouched batchItem
2133  def changeBatchItemByUrlSchema(self, batchItem, batchId):
2134  # self.schemaBatchItems
2135  if "URLS_SCHEMA" in self.siteProperties:
2136  urlSchema = UrlSchema(self.siteProperties["URLS_SCHEMA"], batchItem.urlObj.siteId, self.urlSchemaDataDir)
2137  newUrls = urlSchema.generateUrlSchema(batchItem.urlObj.url)
2138  self.logger.debug("Generated new urls count = %s", str(len(newUrls)))
2139 
2140  # check limits
2141  if self.site.maxURLs > 0 and len(newUrls) >= self.site.maxURLs:
2142  newUrls = list(newUrls)[:self.site.maxURLs]  # keep as list, newUrls[0] is used below
2143  self.logger.debug("Site maxURLs = %s limit reached.", str(self.site.maxURLs))
2144 
2145  if self.site.maxResources > 0 and len(newUrls) >= self.site.maxResources:
2146  newUrls = list(newUrls)[:self.site.maxResources]  # keep as list, newUrls[0] is used below
2147  self.logger.debug("Site maxResources = %s limit reached.", str(self.site.maxResources))
2148 
2149  # url update
2150  if len(newUrls) > 0:
2151  self.logger.debug("Url was changed. From %s to %s", batchItem.urlObj.url, newUrls[0])
2152 
2153  if self.dbWrapper is not None:
2154  urlUpdateObj = dc_event.URLUpdate(siteId=batchItem.urlObj.siteId, urlString=batchItem.urlObj.url,
2155  normalizeMask=UrlNormalizator.NORM_NONE)
2156  urlUpdateObj.urlMd5 = batchItem.urlObj.urlMd5
2157  urlUpdateObj.batchId = batchId
2158  urlUpdateObj.crawled = SQLExpression("`Crawled`+1")
2159  urlUpdateObj.processed = 0
2160  urlUpdateObj.status = dc_event.URL.STATUS_CRAWLED
2161  urlUpdateObj.size = 0
2162  urlUpdateObj.contentType = ""
2163  result = self.dbWrapper.urlUpdate(urlUpdateObj)
2164  self.logger.debug("urlUpdate() return result: " + str(result))
2165 
2166  batchItem.urlObj.url = newUrls[0]
2167  batchItem.urlObj.parentMd5 = batchItem.urlObj.urlMd5
2168  if urlSchema.externalError != APP_CONSTS.ERROR_OK:
2169  batchItem.urlObj.errorMask |= urlSchema.externalError
2170  batchItem.urlId = batchItem.urlObj.urlMd5
2171 
2172  if self.dbWrapper is not None:
2173  result = self.dbWrapper.urlNew([batchItem.urlObj])
2174  self.logger.debug("urlNew() return result: " + str(result))
2175 
2176  if len(newUrls) > 1:
2177  for newUrl in newUrls[1:]:
2178  localBatchItem = copy.deepcopy(batchItem)
2179  localBatchItem.urlObj.batchId = 0
2180  localBatchItem.urlObj.status = dc_event.URL.STATUS_NEW
2181  localBatchItem.urlObj.url = newUrl
2182  localBatchItem.urlObj.urlMd5 = hashlib.md5(newUrl).hexdigest()
2183  localBatchItem.urlObj.parentMd5 = batchItem.urlObj.urlMd5
2184  localBatchItem.urlId = localBatchItem.urlObj.urlMd5
2185  localBatchItem.urlObj.CDate = str(datetime.datetime.now())
2186  localBatchItem.urlObj.errorMask = 0
2187  localBatchItem.urlObj.tagsCount = 0
2188  localBatchItem.urlObj.tagsMask = 0
2189  localBatchItem.urlObj.crawled = 0
2190  localBatchItem.urlObj.processed = 0
2191  localBatchItem.urlObj.size = 0
2192  localBatchItem.urlObj.contentType = ""
2193  localBatchItem.urlObj.rawContentMd5 = ""
2194  localBatchItem.urlObj.state = dc_event.URL.STATE_ENABLED
2195 
2196  if urlSchema.externalError != APP_CONSTS.ERROR_OK:
2197  localBatchItem.urlObj.errorMask |= urlSchema.externalError
2198  self.schemaBatchItems.append(localBatchItem)
2199 
2200  if self.dbWrapper is not None:
2201  # result = self.dbWrapper.urlNew([elem.urlObj for elem in self.schemaBatchItems])
2202  for elem in self.schemaBatchItems:
2203  result = self.dbWrapper.urlNew([elem.urlObj])
2204  self.logger.debug("urlNew() for urls list return result: " + str(result))
2205  if int(result) == 0: # necessary update url
2206  urlUpdateObj = dc_event.URLUpdate(siteId=elem.urlObj.siteId, urlString=elem.urlObj.url,
2207  normalizeMask=UrlNormalizator.NORM_NONE)
2208  urlUpdateObj.urlMd5 = elem.urlObj.urlMd5
2209  urlUpdateObj.parentMd5 = batchItem.urlObj.urlMd5
2210  urlUpdateObj.batchId = 0 # #batchId
2211  urlUpdateObj.status = dc_event.URL.STATUS_NEW
2212  urlUpdateObj.UDate = SQLExpression("NOW()")
2213  urlUpdateObj.errorMask = 0
2214  urlUpdateObj.tagsCount = 0
2215  urlUpdateObj.tagsMask = 0
2216  urlUpdateObj.crawled = SQLExpression("`Crawled`+1")
2217  urlUpdateObj.processed = 0
2218  urlUpdateObj.size = 0
2219  urlUpdateObj.contentType = ""
2220  urlUpdateObj.rawContentMd5 = ""
2221  urlUpdateObj.state = dc_event.URL.STATE_ENABLED
2222 
2223  result = self.dbWrapper.urlUpdate(urlUpdateObj)
2224  self.logger.debug("urlUpdate() for urls list return result: " + str(result))
2225 
2226  # # Apply 'batch_insert' property
2227  if urlSchema.batchInsert == UrlSchema.BATCH_INSERT_ALL_NEW_ITEMS:
2228  self.logger.debug("UrlSchema use 'batch_insert' as 'BATCH_INSERT_ALL_NEW_ITEMS'")
2229  elif urlSchema.batchInsert == UrlSchema.BATCH_INSERT_ONLY_FIRST_ITEM:
2230  self.logger.debug("UrlSchema use 'batch_insert' as 'BATCH_INSERT_ONLY_FIRST_ITEM'")
2231  if len(self.schemaBatchItems) > 0:
2232  self.schemaBatchItems = self.schemaBatchItems[0:1]
2233  elif urlSchema.batchInsert == UrlSchema.BATCH_INSERT_NO_ONE_ITEMS:
2234  self.logger.debug("UrlSchema use 'batch_insert' as 'BATCH_INSERT_NO_ONE_ITEMS'")
2235  self.schemaBatchItems = []
2236  else:
2237  self.logger.error("UrlSchema use 'batch_insert' an unsupported value: " + str(urlSchema.batchInsert))
2238 
2239  return batchItem
2240 
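When the URLS_SCHEMA property expands one URL into several, the first generated URL replaces the original urlObj and each remaining one becomes a new batch item whose urlMd5/urlId is the MD5 hex digest of the URL string. A sketch of that identifier derivation; the URLs are illustrative:

# Sketch (Python 2, like the module itself): deriving urlMd5 values for generated URLs.
import hashlib
newUrls = ["http://example.com/list?page=1", "http://example.com/list?page=2"]
urlMd5List = [hashlib.md5(u).hexdigest() for u in newUrls]
# the first URL replaces batchItem.urlObj.url, the rest become new batch items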
2241 
2242  # #updateUrlObjInBatchItem update urlObj's fields in batchitem
2243  #
2244  # @param urlObj - reference to batchItem.urlObj
2245  #
2246  def updateUrlObjInBatchItem(self, urlObj):
2247  if urlObj is not None:
2248  urlObj.errorMask |= self.errorMask
2249  self.logger.debug("Set error_mask: %s", str(urlObj.errorMask))
2250  if self.crawledResource is not None:
2251  urlObj.httpCode = self.crawledResource.http_code
2252  self.logger.debug("Set HTTP Code: %s, contentType: %s", str(self.crawledResource.http_code),
2253  str(self.crawledResource.content_type))
2254  if self.crawledResource.content_type is not None and \
2255  self.crawledResource.content_type != dc_event.URL.CONTENT_TYPE_UNDEFINED:
2256  urlObj.contentType = self.crawledResource.content_type
2257 
2258 
2259  # #updateBatchItem
2260  #
2261  def updateBatchItem(self, batchItem):
2262  # update batchItem properties
2263  # RSS feed items -> batch items
2264  if batchItem is not None:
2265  self.logger.debug("self.processorName: %s", varDump(self.processorName))
2266 
2267  if self.processorName == PCONSTS.PROCESSOR_FEED_PARSER:
2268  if self.feedItems is not None and len(self.feedItems) > 0:
2269  self.logger.debug("len(self.feedItems): %s", str(len(self.feedItems)))
2270  batchItem = self.createBatchItemsFromFeedItems(batchItem)
2271  elif self.url is not None:
2272  self.logger.debug("Before: batchItem urlObj errorMask: %s, url ErrorMask: %s" % (batchItem.urlObj.errorMask,
2273  self.url.errorMask))
2274  batchItem.urlObj.errorMask |= self.url.errorMask
2275  batchItem.urlObj.errorMask |= self.errorMask
2276  self.logger.debug("After: batchItem urlObj errorMask: %s, url ErrorMask: %s" % (batchItem.urlObj.errorMask,
2277  self.url.errorMask))
2278  if isinstance(batchItem, types.ListType):
2279  self.batchItemsExtendUnique(self.store_items, batchItem)
2280  else:
2281  self.batchItemsExtendUnique(self.store_items, [batchItem], False)
2282  if self.feedItems is not None:
2283  self.logger.debug("self.feedItems: %s, self.store_items: %s", str(len(self.feedItems)),
2284  str(len(self.store_items)))
2285  else:
2286  self.logger.debug(">>> wrong !!! updateBatchItem, batchItem is None")
2287 
2288 
2289  # #createBatchItemsFromFeedItems
2290  #
2291  def createBatchItemsFromFeedItems(self, parentBatchItem):
2292  self.logger.debug("!!! createBatchItemsFromFeedItems() enter ... self.crawledResource: " + \
2293  varDump(self.crawledResource))
2294  items = []
2295  for elem in self.feedItems:
2296  if self.batch.maxItems > len(items): # add only while the allowed items limit is not reached
2297  urlMd5 = elem["urlMd5"]
2298  self.logger.debug("URLMD5: %s" % str(urlMd5))
2299  self.logger.debug("value: %s" % str(elem))
2300  siteId = self.batchItem.siteId
2301  urlObj = elem["urlObj"]
2302 
2303  # serialize content
2304  elem.pop("urlObj", None)
2305 
2306  # list of objects must be converted to string before json dumps
2307  dates = ["published_parsed", "updated_parsed"]
2308  for date in dates:
2309  if date in elem["entry"] and elem["entry"][date] is not None:
2310  elem["entry"][date] = strftime("%a, %d %b %Y %H:%M:%S +0000", elem["entry"][date])
2311  elem["urlMd5"] = urlMd5
2312  elem["entry"] = dict(elem["entry"])
2313  # store raw content on disk
2314  saveBatchItemUrlId = self.batchItem.urlId
2315  try:
2316  self.crawledResource = CrawledResource() # pylint: disable=W0201
2317  self.crawledResource.binary_content = json.dumps(elem)
2318  self.batchItem.urlId = urlMd5
2319  self.getDir()
2320  self.writeData()
2321  self.batchItem.urlId = saveBatchItemUrlId
2322  except Exception, err:
2323  ExceptionLog.handler(self.logger, err, "Can't save object on disk. Reason:", (), \
2324  {ExceptionLog.LEVEL_NAME_ERROR:ExceptionLog.LEVEL_VALUE_DEBUG})
2325  self.batchItem.urlId = saveBatchItemUrlId
2326  continue
2327 
2328  urlObj.contentMask = dc_event.URL.CONTENT_STORED_ON_DISK
2329  if parentBatchItem is not None:
2330  batchItem = copy.deepcopy(parentBatchItem)
2331  batchItem.siteId = siteId
2332  batchItem.urlId = urlMd5
2333  batchItem.urlObj = urlObj
2334  else:
2335  batchItem = dc_event.BatchItem(siteId, urlMd5, urlObj)
2336  batchItem.urlObj.urlPut = None
2337  items.append(batchItem)
2338 
2339  return items
2340 
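createBatchItemsFromFeedItems() serializes every feed entry to JSON before writing it to disk, so feedparser-style struct_time values have to be rendered as strings first. A sketch of that conversion; the time tuple below is illustrative:

# Sketch: rendering a feedparser-like time tuple the way the loop above does.
from time import strftime
published_parsed = (2017, 7, 24, 10, 30, 0, 0, 205, 0)  # assumed struct_time-like tuple
published = strftime("%a, %d %b %Y %H:%M:%S +0000", published_parsed)
# -> "Mon, 24 Jul 2017 10:30:00 +0000"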
2341 
2342  # #load config from file
2343  # load from cli argument or default config file
2344  def loadConfig(self):
2345  try:
2346  self.config = ConfigParser.ConfigParser()
2347  self.config.optionxform = str
2348  if self.pargs.config:
2349  self.config.read(self.pargs.config)
2350  except:
2351  print MSG_ERROR_LOAD_CONFIG
2352  raise
2353 
2354 
2355  # #loadKeyValueDB load key-value db
2356  #
2357  #
2358  def loadKeyValueDB(self):
2359  try:
2360  className = self.__class__.__name__
2361  self.kvDbDir = self.config.get(className, self.DB_DATA_DIR)
2362  except Exception, err:
2363  ExceptionLog.handler(self.logger, err, "Error load KVDB config option: %s", self.DB_DATA_DIR)
2364  raise
2365 
2366 
2367  # #load logging
2368  # load logging configuration (log file, log level, filters)
2369  #
2371  try:
2372  log_conf_file = self.config.get("Application", "log")
2373  logging.config.fileConfig(log_conf_file)
2374  self.logger = Utils.MPLogger().getLogger()
2375  self.loggerDefault = self.logger
2376  except:
2377  print MSG_ERROR_LOAD_LOG_CONFIG_FILE
2378  raise
2379 
2380 
2381  # #load project/site-specific logging
2382  # load logging configuration (log file, log level, filters) for project-specific file name
2383  #
2384  def setLogConfigFileProject(self, mpLogger, projectId, propertyStr): # pylint: disable=W0622
2385  if propertyStr is not None and propertyStr != '':
2386  try:
2387  propertyObj = json.loads(propertyStr)
2388  if 'suffix' in propertyObj:
2389  suffix = propertyObj['suffix'].replace('%PROJECT_ID%', projectId)
2390  self.logger = mpLogger.getLogger(fileNameSuffix=suffix)
2391  else:
2392  self.logger.debug("Suffix field not found for project %s in property: %s", str(projectId), str(propertyObj))
2393  except Exception, err:
2394  self.logger.error("Error set project-specific logger: %s", str(err))
2395  else:
2396  self.logger.debug("Wrong or empty file name suffix, project %s logger not set: %s", str(projectId), \
2397  str(propertyStr))
2398 
2399 
2400  # #set default logging
2401  #
2402  def setLogConfigFileDefault(self, mpLogger):
2403  # self.logger = self.loggerDefault
2404  try:
2405  self.logger = mpLogger.getLogger(restore=True)
2406  except Exception, err:
2407  self.logger.error("Error set default logger: %s", str(err))
2408 
2409 
2410  # #load mandatory options
2411  # load mandatory options
2412  #
2413  def loadOptions(self):
2414  try:
2415  self.raw_data_dir = self.config.get(self.__class__.__name__, "raw_data_dir")
2416  self.defaultHeaderFile = self.config.get(self.__class__.__name__, "headers_file")
2417  self.defaultCookieFile = self.config.get(self.__class__.__name__, "cookie_file")
2418  self.defaultIcrCrawlTime = self.config.getfloat(self.__class__.__name__, "default_icr_crawl_time")
2419  self.headerFileDir = self.config.get(self.__class__.__name__, "header_file_dir")
2420  self.robotsFileDir = self.config.get(self.__class__.__name__, "robots_file_dir")
2421  dbTaskIni = self.config.get(self.__class__.__name__, "db-task_ini")
2422  self.urlSchemaDataDir = self.config.get(self.__class__.__name__, self.URL_SCHEMA_DIR)
2423  # Read urls xpath list from file as json and make loads
2424  self.urlsXpathList = \
2425  json.loads(open(self.config.get(self.__class__.__name__, self.URLS_XPATH_LIST_FILE), 'r').read())
2426 
2427  if self.config.has_option(self.__class__.__name__, "useZeroSiteIdSiteNotExists"):
2428  try:
2429  self.useZeroSiteIdSiteNotExists = \
2430  bool(int(self.config.get(self.__class__.__name__, "useZeroSiteIdSiteNotExists")))
2431  except Exception:
2432  self.useZeroSiteIdSiteNotExists = False
2433  # Add support operations updateCollectedURLs and removeURLs
2434 # cfgParser = ConfigParser.ConfigParser()
2435 # cfgParser.read(db_task_ini)
2436 # self.dbWrapper = DBTasksWrapper(cfgParser)
2437  self.dbWrapper = self.__createDBTasksWrapper(dbTaskIni)
2438  # does call collectAddtionalProp and collectProperties methods
2439  self.max_fetch_time = self.config.getint(self.__class__.__name__, "max_fetch_time")
2440  # keep old resources on disk
2441  if self.config.has_option(self.__class__.__name__, "keep_old_resources"):
2442  self.keep_old_resources = self.config.getboolean(self.__class__.__name__, "keep_old_resources")
2443  self.collect_additional_prop = self.config.getboolean(self.__class__.__name__, "collect_additional_prop")
2444  # self.urlProcess = URLProcess()
2445  #self.urlProcess.dbWrapper = None ### self.dbWrapper #####
2446  # self.resourceProcess = ResourceProcess()
2447  #self.resourceProcess.dbWrapper = None ####self.dbWrapper ####
2448  except Exception, err:
2449  ExceptionLog.handler(self.logger, err, "Error load config options:")
2450  raise Exception('CRAWLER FATAL INITIALIZATION INI ERROR: ' + str(err))
2451 
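loadOptions() pulls everything from the section named after the class in the application ini file. A hedged sketch of reading a couple of the same options with ConfigParser; the file name and option values are assumptions, only the option names mirror the code above:

# Sketch (assumed ini path and values): reading crawler options the way loadOptions() does.
import ConfigParser
config = ConfigParser.ConfigParser()
config.optionxform = str
config.read("crawler-task.ini")  # hypothetical config file name
raw_data_dir = config.get("CrawlerTask", "raw_data_dir")
keep_old_resources = False
if config.has_option("CrawlerTask", "keep_old_resources"):
    keep_old_resources = config.getboolean("CrawlerTask", "keep_old_resources")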
2452 
2453  # #checkResponse checkMaxHttpRedirects
2454  #
2455  def checkResponse(self):
2456  self.logger.debug("Requests response history: %s", str(self.res.redirects))
2457  # calculate http redirect
2458  if self.res.redirects and HTTP_REDIRECT in self.res.redirects:
2459  self.httpRedirects += 1
2460  self.logger.debug("http redirects: %s, http max redirects: %s", str(self.httpRedirects),
2461  str(self.max_http_redirects))
2462  # check http redirect
2463  if self.max_http_redirects and self.max_http_redirects != MAX_HTTP_REDIRECTS_UNLIMITED and \
2464  self.httpRedirects >= self.max_http_redirects:
2465  self.allow_http_redirects = False
2466  self.logger.debug("http redirect limit was reached! Max http redirects: %s, encountered http redirects: %s." %
2467  (str(self.max_http_redirects), str(self.httpRedirects)))
2468  else:
2469  self.allow_http_redirects = True # pylint: disable=W0201
2470 
2471 
2472  # #calcLastModified
2473  #
2474  def calcLastModified(self, resource, res):
2475  if resource.http_code == CONSTS.HTTP_CODE_304:
2476  resource.last_modified = self.url.tcDate
2477  elif 'Last-Modified' in res.headers:
2478  resource.last_modified = res.headers['Last-Modified']
2479  resource.last_modified = parse(resource.last_modified).strftime('%Y-%m-%d %H:%M:%S')
2480  elif 'Date' in res.headers:
2481  resource.last_modified = res.headers['Date']
2482  resource.last_modified = parse(resource.last_modified).strftime('%Y-%m-%d %H:%M:%S')
2483  else:
2484  resource.last_modified = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(time.time() - self.defaultIcrCrawlTime))
2485  self.logger.debug("LastModified date: %s", str(resource.last_modified))
2486 
2487  return str(resource.last_modified)
2488 
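calcLastModified() normalizes whichever date source is available (Last-Modified, Date, or a "now minus default_icr_crawl_time" fallback) into the '%Y-%m-%d %H:%M:%S' format. A sketch of the two conversions; the header value and the 3600-second fallback window are illustrative:

# Sketch: header date parsing and the fallback path used by calcLastModified().
import time
from dateutil.parser import parse
header_value = "Tue, 15 Nov 1994 12:45:26 GMT"          # assumed Last-Modified header
last_modified = parse(header_value).strftime('%Y-%m-%d %H:%M:%S')
fallback = time.strftime('%Y-%m-%d %H:%M:%S',
                         time.gmtime(time.time() - 3600))  # 3600s stands in for defaultIcrCrawlTime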
2489 
2490  # #fillItemsFromIterations
2491  #
2492  # Optionally resets and fills the list of BatchItems with new instances initialized from the URL objects list
2493  # @param urlObjects the list of the URL objects
2494  # @param siteObject the site object
2495  # @param reset the Boolean defines reset the items list before fill, True means reset
2496  def fillItemsFromIterations(self, urlObjects=None, siteObject=None, reset=True):
2497  if reset:
2498  self.collectURLsItems = []
2499 
2500  if urlObjects is not None:
2501  if siteObject is None:
2502  siteObjectLocal = self.batchItem.siteObj
2503  siteIdLocal = self.batchItem.siteId
2504  else:
2505  siteObjectLocal = siteObject
2506  siteIdLocal = siteObject.id
2507 
2508  for urlObject in urlObjects:
2509  self.logger.debug("Create new batch item, URLMd5: %s, siteId: %s", urlObject.urlMd5, siteIdLocal)
2510  batchItem = dc_event.BatchItem(siteId=siteIdLocal, urlId=urlObject.urlMd5, urlObj=urlObject,
2511  siteObj=siteObjectLocal)
2512  batchItem.properties = self.batchItem.properties
2513  self.collectURLsItems.append(batchItem)
2514 
2515 
2516  # #batchItemsExtendUnique
2517  #
2518  # Fills the batch items list with unique items from another list
2519  # @param destinationBatchItems the destination BatchItem objects list
2520  # @param sourceBatchItems the source BatchItem objects list
2521  def batchItemsExtendUnique(self, destinationBatchItems, sourceBatchItems, lookIncomingBatch=True, urlType=1):
2522  for srcBatchItem in sourceBatchItems:
2523  inList = False
2524  if lookIncomingBatch:
2525  for dstBatchItem in self.batch.items:
2526  if srcBatchItem.siteId == dstBatchItem.siteId and srcBatchItem.urlId == dstBatchItem.urlId:
2527  self.logger.debug("batchItemsExtendUnique baseItems duplicate " + srcBatchItem.urlId + " " +
2528  dstBatchItem.urlId)
2529  inList = True
2530  break
2531  if not inList:
2532  for dstBatchItem in destinationBatchItems:
2533  if srcBatchItem.siteId == dstBatchItem.siteId and srcBatchItem.urlId == dstBatchItem.urlId:
2534  self.logger.debug("batchItemsExtendUnique duplicate " + srcBatchItem.urlId + " " + dstBatchItem.urlId)
2535  inList = True
2536  break
2537  if not inList:
2538  self.logger.debug("batchItemsExtendUnique added, urlId: %s", srcBatchItem.urlId)
2539  srcBatchItem.urlObj.type = urlType
2540  # check max allowed limits of resources
2541  if self.batch.maxItems is not None and int(self.batch.maxItems) <= len(destinationBatchItems):
2542  destinationBatchItems = destinationBatchItems[0: self.batch.maxItems]
2543  destinationBatchItems[-1].urlObj.errorMask |= APP_CONSTS.ERROR_MAX_ITEMS
2544  self.logger.debug("Set ErrorMask: %s", str(destinationBatchItems[-1].urlObj.errorMask))
2545  else:
2546  destinationBatchItems.append(srcBatchItem)
2547 
2548 
2549  # #updateCollectedURLs
2550  #
2551  def updateCollectedURLs(self):
2552  self.dbWrapper.collectedURLsRecalculating(Utils.autoFillSiteId(self.batchItem.siteId, self.logger))
2553 
2554 
2555  # #setChainId
2556  #
2557  # @param batchItem incoming batch item
2558  def setChainId(self, batchItem):
2559  if batchItem.urlObj.chainId is None and "URL_CHAIN" in self.siteProperties and \
2560  self.siteProperties["URL_CHAIN"] is not None:
2561  batchItem.urlObj.chainId = self.chainIndex
2562  self.chainIndex += 1
2563 
2564 
2565  # #fillItemsFromIterationsWithChain
2566  #
2567  # @param urlObjects the list of the URL objects
2568  def fillItemsFromIterationsWithChain(self, urlObjects, batchItem):
2569  if urlObjects is not None and len(urlObjects) > 0:
2570  if batchItem.urlObj.chainId not in self.chainDict:
2571  self.chainDict[batchItem.urlObj.chainId] = {}
2572  self.chainDict[batchItem.urlObj.chainId]["batchItem"] = batchItem
2573  self.chainDict[batchItem.urlObj.chainId]["chainUrlMD5List"] = []
2574  for urlObj in urlObjects:
2575  urlObj.chainId = batchItem.urlObj.chainId
2576  self.fillItemsFromIterations(urlObjects, None, False)
2577 
2578 
2579  # #fillChainUrlMD5List
2580  #
2581  # @param batchItem incoming batch item
2582  def fillChainUrlMD5List(self, batchItem):
2583  if batchItem.urlObj.chainId is not None and batchItem.urlObj.chainId in self.chainDict:
2584  localChainElem = self.chainDict[batchItem.urlObj.chainId]
2585  if batchItem.urlObj.urlMd5 != localChainElem["batchItem"].urlObj.urlMd5 and \
2586  batchItem.urlObj.urlMd5 not in localChainElem["chainUrlMD5List"]:
2587  localChainElem["chainUrlMD5List"].append(batchItem.urlObj.urlMd5)
2588 
2589 
2590  # # Check availability of host
2591  #
2592  # @param url - input url for check
2593  # @param parameters - input parameters dict
2594  # @param logger - logger instance
2595  # @param timeout - timeout value
2596  # @return boolean flag True if success, otherwise False
2597  @staticmethod
2598  def isHostAvailable(url, parameters, logger=None, timeout=0.5):
2599  ret = True
2600  if logger is not None:
2601  logger.debug('isHostAvailable url: ' + str(url) + ', parameters: ' + str(parameters))
2602  try:
2603  if not isinstance(url, basestring) or url == "":
2604  raise Exception("Bad parameter 'url'")
2605 
2606  if 'method' in parameters and int(parameters['method']) == 0:
2607  # from urlparse import urlparse
2608  pr = urlparse.urlparse(url)
2609  # print str(pr)
2610  pr = pr.netloc.split(':')
2611  if len(pr) == 1:
2612  port = 80
2613  else:
2614  port = int(pr[1])
2615  host = pr[0]
2616  # print host, port
2617  if 'domain_name_resolve' in parameters and int(parameters['domain_name_resolve']) == 1:
2618  import socket
2619  ai = socket.getaddrinfo(host, port, 0, 0, socket.IPPROTO_TCP)
2620  # print ai
2621  if 'connect_resolve' in parameters and int(parameters['connect_resolve']) == 1:
2622  if 'connection_timeout' in parameters and float(parameters['connection_timeout']) > 0:
2623  timeout = float(parameters['connection_timeout'])
2624  for item in ai:
2625  af, socktype, proto, canonname, sa = item # pylint: disable=W0612
2626  s = socket.socket(af, socktype, proto)
2627  s.settimeout(float(timeout))
2628  try:
2629  s.connect(sa)
2630  except Exception, err:
2631  ret = False
2632  # print str(sa), str(err)
2633  if logger is not None:
2634  logger.debug("Host %s, timeout %f connect check error: %s", str(sa), str(timeout), str(err))
2635  # logger.debug("Traceback: \n" + str(Utils.getTracebackInfo()))
2636  continue
2637  s.close()
2638  ret = True
2639  break
2640 
2641  except Exception, err:
2642  ret = False
2643  # print str(err)
2644  if logger is not None:
2645  logger.debug("Host %s availability check error: %s", str(url), str(err))
2646  # logger.debug("Traceback: \n" + str(Utils.getTracebackInfo()))
2647 
2648  return ret
2649 
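# Minimal illustrative call sketch: the parameters dict understood by isHostAvailable()
# above. The keys mirror those read in the method body; the values are example settings,
# not project defaults.
import logging

host_check_params = {
    'method': 0,                # 0 selects the resolve/connect check implemented above
    'domain_name_resolve': 1,   # resolve the host name via socket.getaddrinfo()
    'connect_resolve': 1,       # additionally attempt a TCP connect to the resolved address
    'connection_timeout': 0.5,  # seconds, overrides the default timeout argument
}
is_up = CrawlerTask.isHostAvailable('http://example.com/', host_check_params, logging.getLogger())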
2650 
2651  # #saveChainStorageData
2652  #
2653  def saveChainStorageData(self):
2654  urlPutList = []
2655  for localChainKay in self.chainDict:
2656  localChainElem = self.chainDict[localChainKay]
2657  saveBuf = '\n'.join(localChainElem["chainUrlMD5List"])
2658  saveBuf = saveBuf.strip()
2659  putDict = {"data": base64.b64encode(saveBuf)}
2660  urlPutObj = dc_event.URLPut(localChainElem["batchItem"].siteId, localChainElem["batchItem"].urlObj.urlMd5, \
2661  dc_event.Content.CONTENT_CHAIN_PARTS, putDict)
2662  urlPutList.append(urlPutObj)
2663  if len(urlPutList) > 0:
2664  self.dbWrapper.putURLContent(urlPutList)
2665 
2666  self.urlProcess.siteId = localChainElem["batchItem"].siteId
2667  self.urlProcess.updateURLStatus(localChainElem["batchItem"].urlId)
2668 
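# Minimal illustrative sketch: the chain payload written by saveChainStorageData() above
# is a base64-encoded, newline-separated list of URL MD5 hashes, so decoding is symmetrical.
# The hash values below are examples only.
import base64

encoded = base64.b64encode('\n'.join(['d41d8cd98f00b204e9800998ecf8427e',
                                      '0cc175b9c0f1b6a831c399e269772661']))
chain_url_md5_list = base64.b64decode(encoded).split('\n')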
2669 
2670  # #Get pubdate according to 'PDATE_SOURCES_MASK'
2671  #
2672  # @param siteProperties - site properties instance
2673  # @param crawledResource - CrawledResource instance
2674  # @param urlObj - class URL instance
2675  # @return datetime as string in iso format
2676  def getPubdateUseSourceMask(self, siteProperties, crawledResource, urlObj):
2677  # variable for result
2678  ret = None
2679  try:
2680  if siteProperties is not None and crawledResource is not None:
2681  # default values
2682  pdateSourceMask = APP_CONSTS.PDATE_SOURCES_MASK_BIT_DEFAULT
2683  pdateSourceMaskOverwrite = APP_CONSTS.PDATE_SOURCES_MASK_OVERWRITE_DEFAULT
2684 
2685  # get value 'PDATE_SOURCES_MASK' from site properties
2686  if APP_CONSTS.PDATE_SOURCES_MASK_PROP_NAME in siteProperties:
2687  pdateSourceMask = int(siteProperties[APP_CONSTS.PDATE_SOURCES_MASK_PROP_NAME])
2688 
2689  # get value 'PDATE_SOURCES_MASK_OVERWRITE' from site properties
2690  if APP_CONSTS.PDATE_SOURCES_MASK_OVERWRITE_PROP_NAME in siteProperties:
2691  pdateSourceMaskOverwrite = int(siteProperties[APP_CONSTS.PDATE_SOURCES_MASK_OVERWRITE_PROP_NAME])
2692 
2693  self.logger.debug('pdateSourceMask = %s, pdateSourceMaskOverwrite = %s',
2694  str(pdateSourceMask), str(pdateSourceMaskOverwrite))
2695  # self.logger.debug('crawledResource.response_header = ' + str(crawledResource.response_header))
2696  self.logger.debug('crawledResource.last_modified = ' + str(crawledResource.last_modified))
2697 
2698  # Extracted from URL name (not implemented)
2699  if pdateSourceMask & APP_CONSTS.PDATE_SOURCES_MASK_URL_NAME: # reserved
2700  if pdateSourceMaskOverwrite & APP_CONSTS.PDATE_SOURCES_MASK_URL_NAME: # ON
2701  pass
2702  else: # OFF
2703  pass
2704 
2705  # URL object "pDate" field (assumed to have been taken from the RSS feed)
2706  if pdateSourceMask & APP_CONSTS.PDATE_SOURCES_MASK_RSS_FEED:
2707  if (pdateSourceMaskOverwrite & APP_CONSTS.PDATE_SOURCES_MASK_RSS_FEED and ret is None) or \
2708  not pdateSourceMaskOverwrite & APP_CONSTS.PDATE_SOURCES_MASK_RSS_FEED:
2709  try:
2710  dt = DateTimeType.parse(urlObj.pDate, True, self.logger, False)
2711  if dt is not None:
2712  ret = dt.strftime("%Y-%m-%d %H:%M:%S")
2713  self.addPubdateRssFeedToHeader(crawledResource, ret)
2714  except TypeError:
2715  self.logger.debug("Unsupported date format: <%s>", Utils.varDump(urlObj.pDate))
2716 
2717  self.logger.debug('pubdate from rss feed: ' + str(ret))
2718 
2719 
2720  # URL object "Date" field (assumed to have been taken from the web server's HTTP response header)
2721  if pdateSourceMask & APP_CONSTS.PDATE_SOURCES_MASK_HTTP_DATE and 'date' in crawledResource.response_header:
2722  if (pdateSourceMaskOverwrite & APP_CONSTS.PDATE_SOURCES_MASK_HTTP_DATE and ret is None) or \
2723  not pdateSourceMaskOverwrite & APP_CONSTS.PDATE_SOURCES_MASK_HTTP_DATE:
2724  value = self.extractValueFromHeader(crawledResource.response_header, 'date')
2725  dt = DateTimeType.parse(value, True, self.logger)
2726  if dt is not None:
2727  ret = dt.strftime('%Y-%m-%d %H:%M:%S')
2728  self.logger.debug('pubdate from http header: ' + str(ret))
2729 
2730  # URL object "lastModified" field (assumed to have been taken from the web server's HTTP response header)
2731  if pdateSourceMask & APP_CONSTS.PDATE_SOURCES_MASK_HTTP_LAST_MODIFIED:
2732  if (pdateSourceMaskOverwrite & APP_CONSTS.PDATE_SOURCES_MASK_HTTP_LAST_MODIFIED and ret is None) or \
2733  not pdateSourceMaskOverwrite & APP_CONSTS.PDATE_SOURCES_MASK_HTTP_LAST_MODIFIED:
2734  d = None
2735  if 'last-modified' in crawledResource.response_header:
2736  value = self.extractValueFromHeader(crawledResource.response_header, 'last-modified')
2737  d = DateTimeType.parse(value, True, self.logger)
2738  else:
2739  d = DateTimeType.parse(crawledResource.last_modified, True, self.logger)
2740 
2741  if d is not None:
2742  ret = d.strftime('%Y-%m-%d %H:%M:%S')
2743  self.logger.debug('pubdate from last modified: ' + str(ret))
2744 
2745  except Exception, err:
2746  self.logger.error('Error: ' + str(err))
2747 
2748  return ret
2749 
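# Minimal illustrative sketch: configuring the pubdate source masks read by
# getPubdateUseSourceMask() above. The bit and property-name constants come from
# app.Consts; only the combination pattern is shown here.
import app.Consts as APP_CONSTS

siteProperties = {}
# enable pubdate extraction from the RSS feed and from the HTTP 'Date' header
siteProperties[APP_CONSTS.PDATE_SOURCES_MASK_PROP_NAME] = \
    str(APP_CONSTS.PDATE_SOURCES_MASK_RSS_FEED | APP_CONSTS.PDATE_SOURCES_MASK_HTTP_DATE)
# with the overwrite bit set, the HTTP 'Date' source is used only while no earlier
# source has already produced a value
siteProperties[APP_CONSTS.PDATE_SOURCES_MASK_OVERWRITE_PROP_NAME] = \
    str(APP_CONSTS.PDATE_SOURCES_MASK_HTTP_DATE)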
2750 
2751  # #Extract field from response header string
2752  #
2753  # @param responseHeader - input response header as string or as a list of header lines
2754  # @param name - name of the header field to extract
2755  # @return value as string extracted from response header
2756  def extractValueFromHeader(self, responseHeader, name):
2757  # variable for result
2758  ret = ''
2759  if isinstance(responseHeader, str) or isinstance(responseHeader, unicode):
2760  responseHeader = responseHeader.split('\r\n')
2761 
2762  # self.logger.debug('responseHeader: ' + str(responseHeader))
2763  for elem in responseHeader:
2764  begPos = elem.find(name)
2765  endPos = elem.find(':')
2766  if begPos > -1 and endPos > -1:
2767  foundName = elem[begPos:endPos].strip()
2768  self.logger.debug('foundName: %s', str(foundName))
2769  if foundName == name:
2770  ret = elem[endPos + 1:].strip()
2771  self.logger.debug("value extracted from field '%s': %s", name, str(ret))
2772  break
2773 
2774  return ret
2775 
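# Minimal illustrative sketch: extractValueFromHeader() above accepts either a raw
# '\r\n'-separated header string or a list of header lines; the field name must match
# the stored case exactly (e.g. 'date', 'last-modified' as used above). 'task' is a
# hypothetical CrawlerTask instance.
raw_header = 'HTTP/1.1 200 OK\r\ndate: Tue, 15 Nov 1994 12:45:26 GMT\r\ncontent-type: text/html'
# task.extractValueFromHeader(raw_header, 'date')  ->  'Tue, 15 Nov 1994 12:45:26 GMT'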
2776 
2777  # # Add rss feed pubdate to the crawled resource header
2778  # @param crawledResource - crawled resource
2779  # @param pubdateRssFeed - pubdate taken from the rss feed
2780  def addPubdateRssFeedToHeader(self, crawledResource, pubdateRssFeed):
2781  if crawledResource is not None and pubdateRssFeed is not None:
2782  crawledResource.response_header = (crawledResource.response_header + '\r\n' +
2783  CONSTS.pubdateRssFeedHeaderName + ': ' + str(pubdateRssFeed + '+0000'))
2784 
2785 
2786  # # Add feed url to rss feed crawler resource header
2787  # @param crawledResource - crawled resource
2788  # @param feedUrl - feed url
2789  def addFeedUrlToHeader(self, crawledResource, feedUrl):
2790  if crawledResource is not None and feedUrl is not None:
2791  crawledResource.response_header = (crawledResource.response_header + '\r\n' +
2792  CONSTS.rssFeedUrlHeaderName + ': ' + str(feedUrl))
2793 
2794 
2795  # # Add base url to crawler resource header
2796  # @param crawledResource - crawled resource
2797  # @param baseUrl - base url
2798  def addBaseUrlToHeader(self, crawledResource, baseUrl):
2799  if crawledResource is not None and baseUrl is not None:
2800  crawledResource.response_header = (crawledResource.response_header + '\r\n' +
2801  CONSTS.baseUrlHeaderName + ': ' + str(baseUrl))
2802 
2803 
2804  # # Extract base url
2805  #
2806  # @param htmlContent - html content to search for a base url
2807  # @param default - default url value returned when no base url is found
2808  # @return base url
2809  def extractBaseUrl(self, htmlContent, default):
2810  # variable for result
2811  ret = default
2812 
2813  try:
2814  if isinstance(htmlContent, basestring):
2815  urlsList = re.findall(pattern=self.SEARCH_BASE_URL_PATTERN, string=htmlContent, flags=re.M + re.I + re.U)
2816  if len(urlsList) > 0:
2817  ret = urlsList[0]
2818 
2819  except Exception, err:
2820  self.logger.error(MSG_ERROR_EXTRACT_BASE_URL, str(err))
2821 
2822  return ret
2823 
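# Minimal illustrative sketch: extractBaseUrl() above searches the page for an HTML
# <base> tag via the class constant SEARCH_BASE_URL_PATTERN (defined elsewhere in this
# file); the pattern below is an assumed stand-in used only for this sketch.
import re

SEARCH_BASE_URL_PATTERN = '<base\\s+href\\s*=\\s*["\'](.+?)["\']'  # assumed pattern
html = '<html><head><base href="http://example.com/docs/"></head><body></body></html>'
found = re.findall(pattern=SEARCH_BASE_URL_PATTERN, string=html, flags=re.M + re.I + re.U)
base_url = found[0] if found else 'http://example.com/'  # fall back to the default url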
2824 
2825  # # Update headers by cached cookies
2826  #
2827  # @param headers - headers values dict
2828  # @param url - url string
2829  # @param stage - stage of apply cookies
2830  # @return updated headers object
2831  def updateHeadersByCookies(self, headers, url, stage):
2832 
2833 # self.logger.debug('!!! stage = %s, headers: %s', str(stage), str(headers))
2834  if headers is not None:
2835  headers = RequestsRedirectWrapper.updateHeadersByCookies(headers, url, self.cookieResolver, stage)
2836 
2837  return headers
2838 
2839 
2840  # # host alive property common handler
2841  #
2842  # @param propertyName - property name
2843  # @param siteProperties - site properties
2844  # @param url - input url for check
2845  # @param logger - logger instance
2846  @staticmethod
2847  def hostAliveHandler(propertyName, siteProperties, url, logger=None):
2848  # variable for result
2849  ret = True
2850  if propertyName in siteProperties:
2851  try:
2852  if logger is not None:
2853  logger.debug("Property '%s' found in site properties", str(propertyName))
2854 
2855  parameters = json.loads(siteProperties[propertyName])
2856  if logger is not None:
2857  logger.debug("Property '%s' successfully got from json", str(propertyName))
2858 
2859  ret = CrawlerTask.isHostAvailable(url, parameters, logger)
2860  except Exception, err:
2861  if logger is not None:
2862  logger.error("Try getting '%s' was fail: %s", str(propertyName), str(err))
2863 
2864  return ret
2865 
2866 
2867  # # check whether the url is available or not
2868  #
2869  # @param siteProperties - site properties
2870  # @param url - input url for check
2871  # @param logger - logger instance
2872  # @return True - if available or False otherwise
2873  @staticmethod
2874  def isAvailableUrl(siteProperties, url, logger=None):
2875  return CrawlerTask.hostAliveHandler(propertyName=CrawlerTask.HOST_ALIVE_CHECK_NAME,
2876  siteProperties=siteProperties,
2877  url=url,
2878  logger=logger)
2879 
2880 
2881  # # check whether the proxy is available or not
2882  #
2883  # @param siteProperties - site properties
2884  # @param proxyName - input proxy name for check
2885  # @param logger - logger instance
2886  # @return True - if available or False otherwise
2887  @staticmethod
2888  def isAvailableProxy(siteProperties, proxyName, logger=None):
2889  return CrawlerTask.hostAliveHandler(propertyName=CrawlerTask.HOST_ALIVE_CHECK_PROXY_NAME,
2890  siteProperties=siteProperties,
2891  url=CrawlerTask.DEFAULT_PROTOCOL_PREFIX + proxyName,
2892  logger=logger)
2893 
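# Minimal illustrative sketch: the host-alive checks above read a JSON string from the
# site properties and pass the decoded dict to isHostAvailable(). The property keys are
# the class attributes referenced above; the JSON payload is an example configuration.
import json

siteProperties = {
    CrawlerTask.HOST_ALIVE_CHECK_NAME: json.dumps({'method': 0,
                                                   'domain_name_resolve': 1,
                                                   'connect_resolve': 1,
                                                   'connection_timeout': 1.0})
}
available = CrawlerTask.isAvailableUrl(siteProperties, 'http://example.com/', logger=None)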
2894 
2895  # # Get proxy name
2896  #
2897  # @param siteProperties - sites property dict
2898  # @param siteId - site id
2899  # @param url - resource url used to check the allowed domain
2900  # @param dbWrapper - DBTaskWrapper instance
2901  # @param logger - logger instance
2902  # @return - proxy name as string or None
2903  @staticmethod
2904  def getProxyName(siteProperties, siteId, url, dbWrapper, logger):
2905  # variable for result
2906  proxyName = None
2907 
2908  # create DBProxyWrapper instance if necessary
2909  dbProxyWrapper = None
2910  if HTTPProxyResolver.USER_PROXY_PROPERTY_NAME in siteProperties and dbWrapper is not None:
2911  dbProxyWrapper = DBProxyWrapper(dbWrapper)
2912 
2913  for triesCount in xrange(HTTPProxyResolver.getTriesCount(siteProperties)):
2914  proxyName = HTTPProxyResolver.getProxy(siteProperties=siteProperties,
2915  siteId=siteId,
2916  url=url,
2917  dbProxyWrapper=dbProxyWrapper)
2918 
2919  # Check proxy availability using the host-alive parameters
2920  if proxyName is not None and not CrawlerTask.isAvailableProxy(siteProperties=siteProperties,
2921  proxyName=proxyName,
2922  logger=logger):
2923 
2924  logger.debug("Tries count = %s. Proxy: '%s' is not available!!!", str(triesCount), str(proxyName))
2925  HTTPProxyResolver.addFaults(siteProperties=siteProperties,
2926  siteId=siteId,
2927  proxyName=proxyName,
2928  dbProxyWrapper=dbProxyWrapper)
2929  else:
2930  logger.debug("Tries count = %s. Proxy: '%s' is available!!!", str(triesCount), str(proxyName))
2931  break
2932 
2933  return proxyName
2934 
2935 
2936  # # add proxy faults
2937  #
2938  # @param siteProperties - sites property dict
2939  # @param siteId - site ID value for request
2940  # @param proxyName - proxy host name
2941  # @param dbWrapper - DBTaskWrapper instance
2942  # @return - None
2943  @staticmethod
2944  def addProxyFaults(siteProperties, siteId, proxyName, dbWrapper):
2945  # create DBProxyWrapper instance if necessary
2946  dbProxyWrapper = None
2947  if HTTPProxyResolver.USER_PROXY_PROPERTY_NAME in siteProperties and dbWrapper is not None:
2948  dbProxyWrapper = DBProxyWrapper(dbWrapper)
2949 
2950  # add faults
2951  if proxyName is not None:
2952  HTTPProxyResolver.addFaults(siteProperties=siteProperties,
2953  siteId=siteId,
2954  proxyName=proxyName,
2955  dbProxyWrapper=dbProxyWrapper)
2956 
2957 
2958  # # check whether it is necessary to rotate the proxy
2959  #
2960  # @param siteProperties - sites property dict
2961  # @param siteId - site ID value for request
2962  # @param proxyName - proxy host name
2963  # @param dbWrapper - DBTaskWrapper instance
2964  # @param rawContent - raw content of the fetched resource
2965  # @return True if success or False - otherwise
2966  @staticmethod
2967  def isNeedRotateProxy(siteProperties, siteId, proxyName, dbWrapper, rawContent):
2968  # variable for result
2969  ret = False
2970 
2971  # create DBProxyWrapper instance if necessary
2972  dbProxyWrapper = None
2973  if HTTPProxyResolver.USER_PROXY_PROPERTY_NAME in siteProperties and dbWrapper is not None:
2974  dbProxyWrapper = DBProxyWrapper(dbWrapper)
2975 
2976  if proxyName is not None:
2977  ret = HTTPProxyResolver.isNeedRotateProxy(siteProperties=siteProperties,
2978  siteId=siteId,
2979  proxyName=proxyName,
2980  dbProxyWrapper=dbProxyWrapper,
2981  rawContent=rawContent)
2982 
2983  return ret
2984 
2985 
2986  # #Timer alarm signal handler
2987  #
2988  # @param signum
2989  # @param frame
2990  def signalHandlerTimer(self, signum, frame):
2991  del frame
2992  self.maxExecutionTimeReached = True
2993  self.logger.debug("Signal %s - timer trapped!", str(signum))
2994 
2995 
2996  # # Check whether the max execution time reached flag is set
2997  #
2998  # @param - None
2999  # @return True if set or False otherwise
3000  def isAbortedByTTL(self):
3001  return self.maxExecutionTimeReached
3002 
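# Minimal illustrative sketch: one way the execution-time limit could be wired to
# signalHandlerTimer()/isAbortedByTTL() above with the standard signal module (already
# imported by this file). 'task' and the timeout value are hypothetical; the real wiring
# lives elsewhere in the application.
import signal

MAX_EXECUTION_TIME = 300  # seconds, example value
# signal.signal(signal.SIGALRM, task.signalHandlerTimer)
# signal.alarm(MAX_EXECUTION_TIME)
# ...and inside long-running loops:
# if task.isAbortedByTTL():
#     pass  # stop crawling gracefully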
3003 
3004  # # create dbtask wrapper instance
3005  #
3006  # @param configName - dbtask ini file
3007  # @return instance of DBTasksWrapper class
3008  def __createDBTasksWrapper(self, configName):
3009  # variable for result
3010  dbTasksWrapper = None
3011  try:
3012  if configName == "":
3013  raise Exception(MSG_ERROR_EMPTY_CONFIG_FILE_NAME)
3014 
3015  config = ConfigParser.ConfigParser()
3016  config.optionxform = str
3017 
3018  readOk = config.read(configName)
3019 
3020  if len(readOk) == 0:
3021  raise Exception(MSG_ERROR_WRONG_CONFIG_FILE_NAME % configName)
3022 
3023  dbTasksWrapper = DBTasksWrapper(config)
3024 
3025  except Exception, err:
3026  raise Exception(MSG_ERROR_LOAD_APP_CONFIG % str(err))
3027 
3028  return dbTasksWrapper
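# Minimal illustrative sketch: the config-loading pattern used by __createDBTasksWrapper()
# above, shown standalone. optionxform = str preserves option-name case, and
# ConfigParser.read() returns the list of files successfully parsed, so an empty list
# signals a missing or unreadable ini file. The file name is an example value.
import ConfigParser

config = ConfigParser.ConfigParser()
config.optionxform = str
parsed = config.read('dbtasks.ini')
if len(parsed) == 0:
    raise Exception("Wrong config file name: 'dbtasks.ini'")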