HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
RTCFinalizer.py
Go to the documentation of this file.
1 """
2 HCE project, Python bindings, Distributed Tasks Manager application.
3 The RTCFinalizer class contains the main functionality for finalizing real-time crawling.
4 
5 @package: dc_crawler
6 @file RTCFinalizer.py
7 @author Oleksii <developers.hce@gmail.com>, bgv, Alexander Vybornyh <alexander.hce.cluster@gmail.com>
8 @link: http://hierarchical-cluster-engine.com/
9 @copyright: Copyright &copy; 2013-2015 IOIX Ukraine
10 @license: http://hierarchical-cluster-engine.com/license/
11 @since: 0.1
12 """
13 
14 import sys
15 import pickle
16 import logging.config
17 import ConfigParser
18 from cement.core import foundation
19 import app.Consts as APP_CONSTS
20 import app.Utils as Utils
21 from app.Utils import varDump
22 from app.ContentCheck import ContentCheck
23 import dc.EventObjects as dc_event
24 import dc.Constants as DC_CONSTS
25 from dc_db.TasksManager import TasksManager as DBTasksManager
26 import dc_crawler.Constants as DC_CRAWLER_CONSTS
27 from dc_crawler.Fetcher import BaseFetcher
28 
29 
30 # # The RTCFinalizer class contains the main functionality for finalizing real-time crawling;
31 # the class inherits from foundation.CementApp
32 #
33 class RTCFinalizer(foundation.CementApp):
34 
35  # # Constants error messages used in class
36  MSG_ERROR_PARSE_CMD_PARAMS = "Error parse command line parameters."
37  MSG_ERROR_EMPTY_CONFIG_FILE_NAME = "Config file name is empty."
38  MSG_ERROR_WRONG_CONFIG_FILE_NAME = "Config file name is wrong"
39  MSG_ERROR_LOAD_APP_CONFIG = "Error loading application config file."
40  MSG_ERROR_READ_LOG_CONFIG = "Error read log config file."
41  MSG_ERROR_READ_DB_TASK_CONFIG = "Error read db-task config file."
42 
43  # # Constants: log messages
44  MSG_ERROR_DELETE_URL = "Delete url has failed"
45  MSG_DELETE_URL_OK = "URL was deleted"
46 
47  # # Constants: option names read from the config file
48  FINALIZER_OPTION_LOG = "log"
49  FINALIZER_OPTION_DB_TASK_INI = "db_task_ini"
50 
51  # Mandatory
52  class Meta(object):
53  label = DC_CRAWLER_CONSTS.RTC_FINALIZER_APP_NAME
54  def __init__(self):
55  pass
56 
57  # #constructor
58  def __init__(self):
59  # call base class __init__ method
60  foundation.CementApp.__init__(self)
61 
62  self.logger = None
63  self.batch = None
64  self.items = None
65  self.exitCode = APP_CONSTS.EXIT_SUCCESS
66  self.urlContentResponse = None
67  self.rb = None
68  self.rc = None
69  self.dbTask = None
70 
71 
72  # # setup application
73  def setup(self):
74  # call base class setup method
75  foundation.CementApp.setup(self)
76 
77 
78  # # run application
79  def run(self):
80  # call base class run method
81  foundation.CementApp.run(self)
82 
83  # call initialization application
84  self.__initApp()
85 
86  # call internal processing
87  self.process()
88 
89  # Finish logging
90  self.logger.info(APP_CONSTS.LOGGER_DELIMITER_LINE)
91 
92 
93  # #initialize application from config files
94  #
95  # @param - None
96  # @return - None
97  def __initApp(self):
98  if self.pargs.config:
99  confLogFileName, confDBTaskName = self.__loadAppConfig(self.pargs.config)
100  self.__loadLogConfig(confLogFileName)
101  self.dbTask = DBTasksManager(self.__loadDBTaskConfig(confDBTaskName))
102  else:
103  raise Exception(self.MSG_ERROR_LOAD_APP_CONFIG)
104  self.rb = self.pargs.rb
105  self.rc = int(self.pargs.rc) if self.pargs.rc is not None else None
106 
107 
108  # #load application config file
109  #
110  # @param configName - name of application config file
111  # @return - log config file name and db-task config file name
112  def __loadAppConfig(self, configName):
113  # variable for result
114  confLogFileName = ""
115  confDBTaskName = ""
116 
117  try:
118  config = ConfigParser.ConfigParser()
119  config.optionxform = str
120 
121  readOk = config.read(configName)
122 
123  if len(readOk) == 0:
124  raise Exception(self.MSG_ERROR_WRONG_CONFIG_FILE_NAME + ": " + configName)
125 
126  if config.has_section(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME):
127  confLogFileName = str(config.get(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME, self.FINALIZER_OPTION_LOG))
128  confDBTaskName = str(config.get(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME, \
130 
131  except Exception, err:
132  raise Exception(self.MSG_ERROR_LOAD_APP_CONFIG + ' ' + str(err))
133 
134  return confLogFileName, confDBTaskName
135 
136 
137  # #load log config file
138  #
139  # @param configName - name of log rtc-finalizer config file
140  # @return - None
141  def __loadLogConfig(self, configName):
142  try:
143  if isinstance(configName, str) and len(configName) == 0:
144  raise Exception(self.MSG_ERROR_EMPTY_CONFIG_FILE_NAME)
145 
146  logging.config.fileConfig(configName)
147 
148  # call rotation log files and initialization logger
149  self.logger = Utils.MPLogger().getLogger()
150 
151  except Exception, err:
152  raise Exception(self.MSG_ERROR_READ_LOG_CONFIG + ' ' + str(err))
153 
154 
155  # #load db-task config file
156  #
157  # @param configName - name of the db-task config file
158  # @return - config parser instance
159  def __loadDBTaskConfig(self, configName):
160  # return config parser
161  config = None
162  if isinstance(configName, str) and len(configName) > 0:
163  try:
164  config = ConfigParser.ConfigParser()
165  config.optionxform = str
166  config.read(configName)
167 
168  except Exception, err:
169  raise Exception(self.MSG_ERROR_READ_DB_TASK_CONFIG + ' ' + str(err))
170 
171  return config
172 
173 
174  def getBatchFromInput(self):
175  try:
176  # read pickled batch object from stdin and unpickle it
177  input_pickled_object = sys.stdin.read()
178  # self.logger.debug("input_pickled_object: %s", varDump(input_pickled_object))
179 
180  # print input_pickled_object
181  input_data = (pickle.loads(input_pickled_object))
182  # self.logger.debug("input_data: %s", varDump(input_data))
183 
184  # print("Batch item: siteId: %s, urlId: %s" %(input_data.siteId, input_data.urlId))
185  self.batch = input_data
186  self.items = self.batch.items
187 # self.logger.debug("Batch: %s", varDump(self.batch, stringifyType=0, maxDepth=10))
188  except Exception, err:
189  raise Exception('getBatchFromInput error: ' + str(err))
190 
191 
192  def getURLContent(self):
193  urlContentRequest = []
194  num_of_items = len(self.items)
195  self.logger.debug("Num of items in batch: <<%s>>" % (num_of_items))
196  item_no = 1
197  for item in self.items:
198  if not item:
199  urlContentRequest.append(None)
200  self.logger.debug("Item is None.")
201  else:
202  siteId = item.siteId
203  url = item.urlObj.url
204  urlId = item.urlId
205  self.logger.debug("Item #%s: siteId: <<%s>>, urlId: <<%s>>, url: <<%s>>" % (item_no, siteId, urlId, url))
206  _urlContentRequest = dc_event.URLContentRequest(siteId, url)
207  _urlContentRequest.dbFieldsList = ["Status", "Crawled", "Processed", "ContentType", "Charset", "ErrorMask", \
208  "CrawlingTime", "ProcessingTime", "HTTPCode", "Size", "LinksI", "LinksE", \
209  "RawContentMd5", "LastModified", "CDate", "UDate", "TagsMask", "TagsCount", \
210  "PDate", "ContentURLMd5", "Batch_Id"]
211  urlContentRequest.append(_urlContentRequest)
212  item_no = item_no + 1
213  self.dbTask.dbTaskMode = self.batch.dbMode
214  drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_CONTENT, urlContentRequest)
215  responseDRCESyncTasksCover = self.dbTask.process(drceSyncTasksCoverObj)
216  self.urlContentResponse = responseDRCESyncTasksCover.eventObject
217  self.logger.debug("urlContentResponse: %s", varDump(obj=self.urlContentResponse, strTypeMaxLen=5000))
218 
219 
220  def sendURLContent(self):
221  print pickle.dumps(self.urlContentResponse)
222  sys.stdout.flush()
223 
224 
225  def saveBatchToFile(self):
226  if self.rb is not None:
227  self.logger.debug("batchSaveFile is = " + str(self.rb))
228  urlCleanupList = []
229  contentCheck = ContentCheck()
230  for item in self.items:
231  if item.siteObj is not None and item.siteObj.fetchType == BaseFetcher.TYP_AUTO:
232  if item.urlPutObj is not None and contentCheck.lookMetricsinContent(item.urlPutObj):
233  self.logger.debug(">>> start checkUrlPutObj")
234  metricsApplying = self.selectSiteProperty(item, "FINALIZER_METRICS")
235  toRecrawl = contentCheck.checkUrlPutObj(item.urlPutObj, contentCheck.CHECK_TYPE_SIMPLE, metricsApplying)
236  else:
237  self.logger.debug(">>> start urlObj")
238  toRecrawl = contentCheck.checkUrlObj(item.urlObj)
239  if not toRecrawl:
240  urlCleanup = dc_event.URLCleanup(item.urlObj.siteId, item.urlObj.url)
241  urlCleanup.urlType = dc_event.URLStatus.URL_TYPE_MD5
242  urlCleanup.url = item.urlObj.urlMd5
243  urlCleanupList.append(urlCleanup)
244  item.siteObj.fetchType = BaseFetcher.TYP_DYNAMIC
245  item.urlObj.status = dc_event.URL.STATUS_SELECTED_CRAWLING
246  item.urlObj.crawled = 0
247  item.urlObj.urlPut = None
248  item.urlPutObj = None
249  if self.rc is not None:
250  self.exitCode = self.rc
251  if len(urlCleanupList) > 0:
252  drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_CLEANUP, urlCleanupList)
253  self.dbTask.process(drceSyncTasksCoverObj)
254  fd = open(self.rb, "w")
255  if fd is not None:
256  pickleObj = pickle.dumps(self.batch)
257  fd.write(pickleObj)
258  fd.close()
259 
260 
262  self.urlContentResponse = []
263  attributes = []
264  for item in self.items:
265  url = item.urlObj.url
266  if item.urlPutObj is not None:
267  # self.logger.debug("item.urlPutObj.putDict.data: %s", varDump(item.urlPutObj.putDict["data"]))
268  try:
269  if len(item.urlObj.attributes) > 0:
270  self.logger.debug("item.urlPutObj.attributes: %s", varDump(item.urlObj.attributes))
271  attributes = item.urlObj.attributes
272  except Exception, err:
273  self.logger.error("load attributes failed: %s", str(err))
274 
275  if item.urlPutObj.putDict["cDate"] is not None:
276  contents = [dc_event.Content(item.urlPutObj.putDict["data"], item.urlPutObj.putDict["cDate"],
277  dc_event.Content.CONTENT_PROCESSOR_CONTENT)]
278  else:
279  contents = [dc_event.Content(item.urlPutObj.putDict["data"],
280  typeId=dc_event.Content.CONTENT_PROCESSOR_CONTENT)]
281  else:
282  contents = []
283  rawContents = None
284  isFetchRawContent = self.selectSiteProperty(item, "FETCH_RAW_CONTENT")
285  if item.urlObj.urlPut is not None and isFetchRawContent is not None and int(isFetchRawContent) == 1:
286  rawContents = [dc_event.Content(item.urlObj.urlPut.putDict["data"], item.urlObj.urlPut.putDict["cDate"],
287  dc_event.Content.CONTENT_RAW_CONTENT)]
288  urlContentResponse = dc_event.URLContentResponse(url, rawContents, processedContents=contents)
289  urlContentResponse.status = 7
290  urlContentResponse.urlMd5 = item.urlObj.urlMd5
291  urlContentResponse.siteId = item.siteId
292  urlContentResponse.contentURLMd5 = item.urlObj.contentURLMd5
293  urlContentResponse.rawContentMd5 = item.urlObj.rawContentMd5
294  urlContentResponse.attributes = attributes
295  urlContentResponse.dbFields = {"Status":item.urlObj.status,
296  "Crawled":item.urlObj.crawled,
297  "Processed":item.urlObj.processed,
298  "ContentType":item.urlObj.contentType,
299  "Charset":item.urlObj.charset,
300  "ErrorMask":item.urlObj.errorMask,
301  "CrawlingTime":item.urlObj.crawlingTime,
302  "ProcessingTime":item.urlObj.processingTime,
303  "HttpCode":item.urlObj.httpCode,
304  "Size":item.urlObj.size,
305  "LinksI":item.urlObj.linksI,
306  "LinksE":item.urlObj.linksE,
307  "RawContentMd5":item.urlObj.rawContentMd5,
308  "LastModified":item.urlObj.lastModified,
309  "CDate":item.urlObj.CDate,
310  "UDate":item.urlObj.UDate,
311  "TagsMask":item.urlObj.tagsMask,
312  "TagsCount":item.urlObj.tagsCount,
313  "PDate":item.urlObj.pDate,
314  "ContentURLMd5":item.urlObj.contentURLMd5,
315  "BatchId":item.urlObj.batchId}
316 
317  if item.urlPutObj is not None and "properties" in item.urlPutObj.putDict:
318  urlContentResponse.itemProperties = item.urlPutObj.putDict["properties"]
319 
320  self.urlContentResponse.append(urlContentResponse)
321  self.logger.debug("urlContentResponse: %s", varDump(obj=self.urlContentResponse, strTypeMaxLen=5000))
322 
323 
324  def process(self):
325  self.getBatchFromInput()
326  # Check is Real-Time crawling
327  if self.batch.crawlerType == dc_event.Batch.TYPE_REAL_TIME_CRAWLER:
328  self.logger.debug("Real-Time crawling batch")
330  self.deleteURLContent()
331  else:
332  self.logger.debug("Regular crawling batch")
333  self.getURLContent()
334  self.saveBatchToFile()
335  if self.exitCode != self.rc:
336  self.sendURLContent()
337 
338 
339  def deleteURLContent(self):
340  items = self.batch.items
341  urlDeleteRequest = []
342  num_of_items = len(items)
343  self.logger.debug("Num of items to delete in batch: <<%s>>" % (num_of_items))
344  item_no = 1
345  for item in self.items:
346  if item is None:
347  continue
348  siteId = item.siteId
349  url = item.urlObj.url
350  urlId = item.urlId
351  self.logger.debug("Delete item #%s: siteId: <<%s>>, urlId: <<%s>>, url: <<%s>>" % (item_no, siteId, urlId, url))
352  urlDeleteRequest.append(dc_event.URLDelete(siteId, url, reason=dc_event.URLDelete.REASON_RT_FINALIZER))
353  item_no = item_no + 1
354  self.dbTask.dbTaskMode = self.batch.dbMode
355  drceSyncTasksCoverObj = DC_CONSTS.DRCESyncTasksCover(DC_CONSTS.EVENT_TYPES.URL_DELETE, urlDeleteRequest)
356  responseDRCESyncTasksCover = self.dbTask.process(drceSyncTasksCoverObj)
357  urlDeleteResponse = responseDRCESyncTasksCover.eventObject
358  self.logger.debug("urlDeleteResponse: %s", varDump(urlDeleteResponse))
359  for status in urlDeleteResponse.statuses:
360  if status:
361  self.logger.debug(self.MSG_DELETE_URL_OK)
362  else:
363  self.exitCode = APP_CONSTS.EXIT_FAILURE
364  self.logger.debug(self.MSG_ERROR_DELETE_URL)
365 
366 
367  # #selectSiteProperty method reads and returns specific siteProperty specified in propName
368  #
369  # @param batchItem incoming batchItem
370  # @param propName name of returned property
371  # @return specific siteProperty's value
372  def selectSiteProperty(self, batchItem, propName):
373  ret = None
374  if batchItem.properties is not None and propName in batchItem.properties:
375  ret = batchItem.properties[propName]
376  elif batchItem.siteObj is not None and batchItem.siteObj.properties is not None:
377  for elem in batchItem.siteObj.properties:
378  if elem["name"] == propName:
379  ret = elem["value"]
380  break
381  return ret
def __loadLogConfig(self, configName)
def __loadDBTaskConfig(self, configName)
def __loadAppConfig(self, configName)
def selectSiteProperty(self, batchItem, propName)
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
-mask-info