HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
ProcessorFeedParser.py
1 """@package docstring
2  @file ProcessorFeedParser.py
3  @author Alexey, bgv <developers.hce@gmail.com>
4  @link http://hierarchical-cluster-engine.com/
5  @copyright Copyright &copy; 2013 IOIX Ukraine
6  @license http://hierarchical-cluster-engine.com/license/
7  @package HCE project node API
8  @since 0.1
9 """
10 
11 import time
12 import pickle
13 import sys
14 import json
15 import logging.config
16 import ConfigParser
17 from cement.core import foundation
18 import dc_processor.Constants as CONSTS
19 from dc_processor.scraper_result import Result
20 from dc_processor.scraper_utils import encode
21 from dc_processor.ScraperResponse import ScraperResponse
22 import app.Utils as Utils # pylint: disable=F0401
23 from app.Utils import varDump
24 from app.Utils import ExceptionLog
25 import app.Profiler
26 import app.Consts as APP_CONSTS
27 
28 APP_NAME = "ProcessorFeedParser"
29 
30 #
31 class ProcessorFeedParser(foundation.CementApp):
32 
33 
34  # Mandatory
35  class Meta(object):
36  label = APP_NAME
37  def __init__(self):
38  pass
39 
40 
41  # #constructor
42  # initialize default fields
43  def __init__(self, usageModel=APP_CONSTS.APP_USAGE_MODEL_PROCESS, configFile=None, logger=None, inputData=None):
44  if usageModel == APP_CONSTS.APP_USAGE_MODEL_PROCESS:
45  # call base class __init__ method
46  foundation.CementApp.__init__(self)
47 
48  self.exit_code = CONSTS.EXIT_SUCCESS
49  self.logger = logger
50  self.config_db_dir = None
51  self.sqliteTimeout = CONSTS.SQLITE_TIMEOUT
52  self.entry = None
53  self.tagsCount = None
54  self.tagsMask = None
55  self.pubdate = None
56  self.processedContent = None
57  self.store_xpath = False
58  self.db_engine = CONSTS.MYSQL_ENGINE
59  self.article = None
60  self.input_data = inputData
61  self.articles_tbl = None
62  self.PRAGMA_synchronous = None
63  self.PRAGMA_journal_mode = None
64  self.PRAGMA_temp_store = None
65  self.usageModel = usageModel
66  self.configFile = configFile
67  self.output_data = None
68 
69 
70 
71  # #setup
72  # setup application
73  def setup(self):
74  if self.usageModel == APP_CONSTS.APP_USAGE_MODEL_PROCESS:
75  # call base class setup method
76  foundation.CementApp.setup(self)
77 
78 
79  # #run
80  # run application
81  def run(self):
82  if self.usageModel == APP_CONSTS.APP_USAGE_MODEL_PROCESS:
83  # call base class run method
84  foundation.CementApp.run(self)
85 
86  # config section
87  self.loadConfig()
88 
89  # load logger config file
90  self.loadLogConfigFile()
91 
92  # load sqlite db backend
93  # self.loadSqliteDBBackend()
94 
95  # sqlite
96  # self.loadDBBackend()
97 
98  # options
99  self.loadOptions()
100 
101  # Do the applied algorithm's job
102  self.processBatch()
103 
104  if self.usageModel == APP_CONSTS.APP_USAGE_MODEL_PROCESS:
105  # Finish logging
106  self.logger.info(APP_CONSTS.LOGGER_DELIMITER_LINE)
107 
108 
109 
110  # #create article
111  # build a Result object from the parsed feed entry
112  #
113  def createArticle(self):
114  #self.logger.debug("self.entry: %s" % varDump(self.entry))
115 
116  article = Result(self.config, self.entry["urlMd5"])
117  extractor = self.__class__.__name__
118 
119  for tag in self.entry["entry"]:
120  tagValue = self.entry["entry"][tag]
121  if isinstance(tagValue, (str, unicode)):
122  if tag == 'link':
123  tagValue = Utils.UrlNormalizator.entitiesEncode(tagValue)
124  else:
125  tagValue = tagValue
126  elif isinstance(tagValue, int) or isinstance(tagValue, bool) or isinstance(tagValue, float):
127  tagValue = str(tagValue)
128  elif isinstance(tagValue, dict):
129  names = {"url", "name", "value"}
130  tagValueNew = None
131  for name in names:
132  if name in tagValue and isinstance(tagValue[name], (str, unicode)):
133  if name == 'url':
134  tagValueNew = Utils.UrlNormalizator.entitiesEncode(tagValue[name])
135  else:
136  tagValueNew = tagValue[name]
137  break
138  if tagValueNew is None:
139  tagValue = ""
140  else:
141  tagValue = tagValueNew
142  elif isinstance(tagValue, list) and len(tagValue) > 0:
143  tagValueNew = None
144  if isinstance(tagValue[0], dict):
145  names = {"href":",", "url":",", "name":",", "term":",", "value":" "} # pylint: disable=R0204
146  for name in names:
147  if name in tagValue[0]:
148  tv = []
149  for item in tagValue:
150  if name in item and isinstance(item[name], (str, unicode)):
151  if name == 'href':
152  tv.append(Utils.UrlNormalizator.entitiesEncode(item[name].strip()))
153  else:
154  tv.append(item[name].strip())
155  tagValueNew = names[name].join(tv)
156  break
157  if tagValueNew is None:
158  tagValue = ""
159  else:
160  tagValue = tagValueNew
161  else:
162  self.logger.debug("Unsupported tag '%s' value type: %s", str(tag), varDump(tagValue))
163  tagValue = ""
164 
165  article.tags[tag] = {"data":[tagValue], "name":tag, "xpath":"", "extractor":extractor}
166 
167  #parent rss feed tag
168  if "parent_rss_feed" in self.entry:
169  parent_rss_feed = json.dumps(self.entry["parent_rss_feed"])
170  if parent_rss_feed[0] == '"':
171  parent_rss_feed = parent_rss_feed[1:]
172  if parent_rss_feed[-1] == '"':
173  parent_rss_feed = parent_rss_feed[:-1]
174  article.tags["parent_rss_feed"] = {"data":[parent_rss_feed],
175  "name":"parent_rss_feed", "xpath":"", "extractor":extractor}
176 
177  #parent rss feed urlMd5 tag
178  if "parent_rss_feed_urlMd5" in self.entry:
179  article.tags["parent_rss_feed_urlMd5"] = {"data":[self.entry["parent_rss_feed_urlMd5"]],
180  "name":"parent_rss_feed_urlMd5", "xpath":"", "extractor":extractor}
181 
182  #tags count
183  article.tagsCount = len(article.tags.keys())
184 
185  # Finish time
186  article.finish = time.time()
187 
188  # Set final article
189  self.article = article
190  #self.logger.debug("article: %s" % varDump(article))
191 
192 
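Illustrative note: the tag-normalization rules of createArticle() are easier to read in isolation. The sketch below re-implements them on an invented sample entry (normalizeTagValue and the sample data are assumptions; the entity encoding applied to link/url/href values by Utils.UrlNormalizator is omitted, and the real method iterates a set, so its lookup order is not guaranteed):

# Simplified re-implementation of the value normalization, for illustration only.
def normalizeTagValue(tagValue):
    if isinstance(tagValue, basestring):
        return tagValue
    if isinstance(tagValue, (int, bool, float)):
        return str(tagValue)
    if isinstance(tagValue, dict):
        # take the first of "url"/"name"/"value" that holds a string
        for name in ("url", "name", "value"):
            if isinstance(tagValue.get(name), basestring):
                return tagValue[name]
        return ""
    if isinstance(tagValue, list) and tagValue and isinstance(tagValue[0], dict):
        # join the same field of every list item with the separator used above
        for name, sep in (("href", ","), ("url", ","), ("name", ","), ("term", ","), ("value", " ")):
            if name in tagValue[0]:
                return sep.join(item[name].strip() for item in tagValue
                                if isinstance(item.get(name), basestring))
    return ""

entry = {"title": "Sample item",
         "author_detail": {"name": "John Doe"},
         "tags": [{"term": "python"}, {"term": "rss"}]}
for tag in entry:
    print tag, "->", normalizeTagValue(entry[tag])   # e.g. tags -> python,rss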
193 
194  # #parse feed
195  # parse the raw content as a JSON feed entry and build the article
196  #
197  def parseFeed(self):
198  ret = True
199  try:
200  self.entry = json.loads(self.input_data.raw_content)
201  self.createArticle()
202  #self.putArticleToDB({"default":self.article})
203  except ValueError as err:
204  ExceptionLog.handler(self.logger, err, 'Bad raw content:', (self.input_data.raw_content), \
205  {ExceptionLog.LEVEL_NAME_ERROR:ExceptionLog.LEVEL_VALUE_DEBUG})
206  ret = False
207  return ret
208 
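Illustrative note: parseFeed() expects input_data.raw_content to hold a JSON document describing a single feed entry. A plausible minimal payload, inferred from the keys read in createArticle() (all values are invented):

import json

raw_content = json.dumps({
    "urlMd5": "0cc175b9c0f1b6a831c399e269772661",
    "parent_rss_feed": "http://example.com/rss.xml",
    "parent_rss_feed_urlMd5": "92eb5ffee6ae2fec3ad71c777531578f",
    "entry": {
        "title": "Sample item",
        "link": "http://example.com/item-1",
        "published": "Mon, 06 Sep 2021 12:00:00 GMT",
        "links": [{"href": "http://example.com/item-1"}]
    }
})
entry = json.loads(raw_content)        # what self.entry holds after parseFeed()
print entry["entry"]["title"]          # Sample item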
209 
210  # #main content processing
211  # parse the feed, fill the tag statistics and normalize the publish date
212  #
213  def process(self):
214  self.logger.debug("URL: %s" % str(self.input_data.url))
215  self.logger.debug("URLMd5: %s" % str(self.input_data.urlId))
216  self.logger.debug("SiteId: %s" % str(self.input_data.siteId))
217  if self.parseFeed():
218  self.tagsCount = self.article.tagsCount
219  self.tagsMask = self.article.tagsMask
220  # TODO: the "default" level looks odd; it probably should be removed and a common object shared by all scrapers used instead
221  self.article.get()
222  self.processedContent = {"default":self.article}
223  self.processedContent["internal"] = [self.article]
224  self.processedContent["custom"] = []
225  # correct pubdate
226  if CONSTS.PUBLISHED in self.article.tags:
227  from dateutil.parser import parse
228  self.pubdate = parse(self.article.tags[CONSTS.PUBLISHED]["data"][0]).strftime('%Y-%m-%d %H:%M:%S')
229  else:
230  self.logger.debug("Resource %s has no publish date" % str(self.article.tags[CONSTS.TAG_LINK]["data"]))
231  # for ml
232  if self.store_xpath:
233  self.storeXPath()
234  else:
235  self.logger.debug("Resource has no raw content. Exit.")
236 
237 
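Illustrative note: the pubdate correction in process() relies on dateutil's flexible date parsing; a standalone sketch of that conversion (the input strings are invented):

from dateutil.parser import parse

# Typical RSS/Atom date strings normalized to the '%Y-%m-%d %H:%M:%S' form used above.
for raw in ("Mon, 06 Sep 2021 12:00:00 GMT", "2021-09-06T12:00:00+02:00"):
    print parse(raw).strftime('%Y-%m-%d %H:%M:%S')   # 2021-09-06 12:00:00
# Note: strftime drops the timezone, so the stored value is the source's wall-clock time.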
238 
240  def getQueryPrefix(self):
241  query_prefix = None
242  if self.db_engine == CONSTS.MYSQL_ENGINE:
243  query_prefix = "REPLACE INTO `contents_" + str(self.input_data.siteId if len(self.input_data.siteId) else 0)
244  else:
245  self.logger.info("DB Backend %s not supported" % str(self.db_engine))
246  self.logger.info("query_prefix: " + str(query_prefix))
247  return query_prefix
248 
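Illustrative note: for the MySQL backend the prefix is a per-site REPLACE statement; with an invented siteId of "123" it evaluates to the string below, and putArticleToDB() later appends the closing backtick, column list and VALUES clause:

siteId = "123"                                            # invented sample value
query_prefix = "REPLACE INTO `contents_" + str(siteId if len(siteId) else 0)
print query_prefix                                        # REPLACE INTO `contents_123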
249 
250 
253  def getDataBuffer(self, data):
254  buf = {}
255  for key in data.keys():
256  data[key].get()
257  buf[key] = data[key].data
258  ret = encode(json.dumps(buf))
259  self.logger.info("Result buffer: %s" % ret)
260  return ret
261 
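Illustrative note: getDataBuffer() serializes the .data attribute of each Result-like value; the stand-in class below only mimics the two members it touches (it is not the real dc_processor.scraper_result.Result):

import json

class SimpleResult(object):
    # Stand-in: in the real Result, get() refreshes the serializable .data attribute.
    def __init__(self, data):
        self.data = data
    def get(self):
        return self.data

results = {"default": SimpleResult({"tagsCount": 3, "tags": {"title": "Sample"}})}
buf = {}
for key in results.keys():
    results[key].get()
    buf[key] = results[key].data
print json.dumps(buf)    # the string that encode() then post-processes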
262 
263  # TODO: obsolete functionality, seems it should be removed
264  def putArticleToDB(self, result):
265  # Check if Content extracted
266  self.logger.info("Result: %s" % varDump(result))
267  tags_count = int(result["default"].tagsCount)
268  self.logger.info("Tags count: %s" % str(tags_count))
269  if int(tags_count) > 0:
270  self.logger.info("Tags count OK")
271 
272  # Get DB table name
273  query_prefix = self.getQueryPrefix()
274 
275  # Prepare query
276  # Firstly, prepare data buffer
277  data = self.getDataBuffer(result)
278  # Then, insert data buffer to the query
279  #query = query_prefix + "`(`id`,`data`, CDate) VALUES('" \
280  # + result.resId + "', '" + encode(result.get()) + "', strftime('%Y-%m-%d %H:%M:%f', 'now', 'localtime'))"
281  query = query_prefix + "`(`id`,`data`, CDate) VALUES('" + result["default"].resId + "', '" + data + "', NOW())"
282 
283  # Initialize options for DB Backend
284  options = {}
285  # General options
286  options["query"] = query
287  self.logger.info("DB Backend %s" % str(self.db_engine))
288 
289  if self.db_engine == CONSTS.MYSQL_ENGINE:
290  # MYSQL options
291  options["dbHost"] = self.dbHost # pylint: disable=E1101
292  options["dbPort"] = self.dbPort # pylint: disable=E1101
293  options["dbUser"] = self.dbUser # pylint: disable=E1101
294  options["dbPWD"] = self.dbPWD # pylint: disable=E1101
295  options["MYSQLDBName"] = self.dc_contents_db # pylint: disable=E1101
296  #TODO: remove obsolete functionality
297  #Utils.putContentToMYSQLDB(result, options)
298  else:
299  self.logger.info("DB Backend %s not supported" % str(self.db_engine))
300 
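Design note (an assumption, not part of this module): the query above is built by string concatenation, so if the MySQL path is re-enabled it is safer to bind the id and JSON buffer as parameters. A hedged sketch using the MySQLdb driver with invented credentials and table name:

import MySQLdb   # MySQL-python driver; its availability here is an assumption

conn = MySQLdb.connect(host="127.0.0.1", port=3306, user="dc_user",
                       passwd="secret", db="dc_contents")
cursor = conn.cursor()
# Identifiers cannot be bound, so the table name is still interpolated,
# but the id and data values are passed as bound parameters.
query = "REPLACE INTO `contents_123` (`id`, `data`, CDate) VALUES (%s, %s, NOW())"
cursor.execute(query, ("0cc175b9c0f1b6a831c399e269772661", '{"default": {}}'))
conn.commit()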
301 
302  # #process batch
303  # the main processing of the batch object
304  def processBatch(self):
305  try:
306  if self.usageModel == APP_CONSTS.APP_USAGE_MODEL_PROCESS:
307  # read pickled batch object from stdin and unpickle it
308  self.input_data = pickle.loads(sys.stdin.read())
309 
310  msgStr = "siteId=" + str(self.input_data.siteId) + ", urlId=" + str(self.input_data.urlId) + \
311  ", url=" + str(self.input_data.url)
312  self.logger.info("Incoming data:%s", msgStr)
313  app.Profiler.messagesList.append(msgStr)
314  self.logger.debug("self.input_data:%s\n", varDump(self.input_data))
315 
316 
317  self.process()
318 
319  self.logger.info("Resulting tagsCount=%s, tagsMask=%s, pubdate=%s", str(self.tagsCount), str(self.tagsMask),
320  str(self.pubdate))
321 
322  scraperResponse = ScraperResponse(self.tagsCount, self.tagsMask, self.pubdate, self.processedContent)
323  self.logger.debug("scraperResponse:%s\n", varDump(scraperResponse))
324 
325  if self.usageModel == APP_CONSTS.APP_USAGE_MODEL_PROCESS:
326  print pickle.dumps(scraperResponse)
327  sys.stdout.flush()
328  else:
329  self.output_data = scraperResponse
330  except Exception as err:
331  ExceptionLog.handler(self.logger, err, CONSTS.MSG_ERROR_PROCESS)
332  self.exit_code = CONSTS.EXIT_FAILURE
333  raise Exception(err)
334 
335 
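Illustrative note: in the process usage model the parent writes one pickled input object to this script's stdin and reads one pickled ScraperResponse from its stdout. The round trip below shows only the serialization side of that protocol; FeedInput is a stand-in carrying just the attributes processBatch()/process() read, while the real object comes from the dc_processor batch machinery and must be importable on both ends:

import pickle

class FeedInput(object):
    # Stand-in input object: siteId, urlId, url and raw_content are the
    # attributes this scraper actually reads.
    def __init__(self, siteId, urlId, url, raw_content):
        self.siteId, self.urlId, self.url, self.raw_content = siteId, urlId, url, raw_content

payload = pickle.dumps(FeedInput("123", "0cc175b9c0f1b6a831c399e269772661",
                                 "http://example.com/rss.xml",
                                 '{"urlMd5": "0cc175b9c0f1b6a831c399e269772661", "entry": {"title": "Sample"}}'))
# processBatch() does the mirror image: pickle.loads(sys.stdin.read()) on input,
# print pickle.dumps(scraperResponse) on output.
restored = pickle.loads(payload)
print restored.url                     # http://example.com/rss.xml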
336 
338  def storeXPath(self):
339  ret = False
340  '''
341  from dc_processor.base_extractor import BaseExtractor
342  # check content's presence in response
343  if self.scraper_response.tagsMask & BaseExtractor.tagsMask[CONSTS.SUMMARY_DETAIL]:
344  content = None
345  tags = json.loads(self.scraper_response.processedContent)["data"]["tagList"]
346  for tag in tags:
347  if tag["name"]==CONSTS.SUMMARY_DETAIL:
348  content = tag["data"]["value"]
349  self.logger.debug("content: %s" % str(content))
350  from scrapy.selector import Selector
351  sel = Selector(text=self.raw_content)
352  xpath_list = sel.xpath("//*[contains(., '" + content + "')]").extract()
353  self.logger.debug("xpath_list: %s" % str(xpath_list))
354  ''' # pylint: disable=W0105
355 
356  return ret
357 
358 
359 
360  # #load config from file
361  # load from cli argument or default config file
362  def loadConfig(self):
363  try:
364  self.config = ConfigParser.ConfigParser()
365  self.config.optionxform = str
366  if self.usageModel == APP_CONSTS.APP_USAGE_MODEL_PROCESS:
367  if self.pargs.config:
368  self.config.read(self.pargs.config)
369  else:
370  self.config.read(self.configFile)
371  except Exception as err:
372  raise Exception(CONSTS.MSG_ERROR_LOAD_CONFIG + " : " + str(err))
373 
374 
375 
376  # #load logging
377  # load logging configuration (log file, log level, filters)
378  #
379  def loadLogConfigFile(self):
380  try:
381  if self.usageModel == APP_CONSTS.APP_USAGE_MODEL_PROCESS:
382  log_conf_file = self.config.get("Application", "log")
383  logging.config.fileConfig(log_conf_file)
384  self.logger = Utils.MPLogger().getLogger()
385  except Exception as err:
386  raise Exception(CONSTS.MSG_ERROR_LOAD_CONFIG + " : " + str(err))
387 
388 
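Illustrative note: loadLogConfigFile() hands the path found under the [Application] "log" option to the standard logging.config.fileConfig() loader, so that file uses the stock fileConfig INI layout. A minimal example (path and names are invented):

[loggers]
keys=root

[handlers]
keys=fileHandler

[formatters]
keys=simple

[logger_root]
level=DEBUG
handlers=fileHandler

[handler_fileHandler]
class=FileHandler
level=DEBUG
formatter=simple
args=('/var/log/dc/processor-feed-parser.log',)

[formatter_simple]
format=%(asctime)s %(levelname)s %(message)s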
389 
390  # #load mandatory options
391  # load mandatory options
392  #
393  def loadOptions(self):
394  class_name = self.__class__.__name__
395  try:
396  self.config_db_dir = self.config.get(class_name, "config_db_dir")
397  self.articles_tbl = self.config.get("sqlite", "articles_tbl")
398  self.sqliteTimeout = self.config.getint("sqlite", "timeout")
399 
400  # support sqlite optimizer options
401  if self.config.has_option("sqlite", "PRAGMA_synchronous"):
402  self.PRAGMA_synchronous = self.config.get("sqlite", "PRAGMA_synchronous")
403  if self.config.has_option("sqlite", "PRAGMA_journal_mode"):
404  self.PRAGMA_journal_mode = self.config.get("sqlite", "PRAGMA_journal_mode")
405  if self.config.has_option("sqlite", "PRAGMA_temp_store"):
406  self.PRAGMA_temp_store = self.config.get("sqlite", "PRAGMA_temp_store")
407 
408  # support db backend
409  if self.config.has_option(class_name, "db_engine"):
410  self.db_engine = self.config.get(class_name, "db_engine")
411 
412  except Exception as err:
413  print CONSTS.MSG_ERROR_LOAD_OPTIONS + str(err)
414  raise
415 
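Illustrative note: putting the options above together, a minimal application config that loadConfig()/loadOptions() would accept could look like the following (every path and value is invented; the db_engine literal must match CONSTS.MYSQL_ENGINE as defined in dc_processor.Constants):

[Application]
log = /etc/dc/processor-feed-parser-log.ini

[ProcessorFeedParser]
config_db_dir = /var/lib/dc/db-task
db_engine = mysql

[sqlite]
articles_tbl = articles
timeout = 30
PRAGMA_synchronous = OFF
PRAGMA_journal_mode = MEMORY
PRAGMA_temp_store = MEMORY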
416 
417  def getExitCode(self):
418  return self.exit_code
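Usage sketch (assumptions flagged inline): besides the stdin/stdout process model, the constructor accepts a usage model that keeps everything in-process. The constant name APP_USAGE_MODEL_MODULE below is an assumption about app.Consts, FeedInput is the same stand-in idea as in the processBatch example, and the config path is invented:

import logging

logging.basicConfig(level=logging.DEBUG)

class FeedInput(object):
    def __init__(self, siteId, urlId, url, raw_content):
        self.siteId, self.urlId, self.url, self.raw_content = siteId, urlId, url, raw_content

parser = ProcessorFeedParser(usageModel=APP_CONSTS.APP_USAGE_MODEL_MODULE,   # assumed constant name
                             configFile="/etc/dc/processor-feed-parser.ini",
                             logger=logging.getLogger(APP_NAME),
                             inputData=FeedInput("123", "0cc175b9c0f1b6a831c399e269772661",
                                                 "http://example.com/rss.xml",
                                                 '{"urlMd5": "0cc175b9c0f1b6a831c399e269772661", "entry": {"title": "Sample"}}'))
parser.setup()
parser.run()
print parser.output_data     # ScraperResponse with tagsCount, tagsMask, pubdate, processedContent
print parser.getExitCode()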