3 @author Alexey, bgv <developers.hce@gmail.com> 4 @link http://hierarchical-cluster-engine.com/ 5 @copyright Copyright © 2013 IOIX Ukraine 6 @license http://hierarchical-cluster-engine.com/license/ 7 @package HCE project node API 17 from cement.core
import foundation
28 APP_NAME =
"ProcessorFeedParser" 43 def __init__(self, usageModel=APP_CONSTS.APP_USAGE_MODEL_PROCESS, configFile=None, logger=None, inputData=None):
44 if usageModel == APP_CONSTS.APP_USAGE_MODEL_PROCESS:
46 foundation.CementApp.__init__(self)
74 if self.
usageModel == APP_CONSTS.APP_USAGE_MODEL_PROCESS:
76 foundation.CementApp.setup(self)
82 if self.
usageModel == APP_CONSTS.APP_USAGE_MODEL_PROCESS:
84 foundation.CementApp.run(self)
104 if self.
usageModel == APP_CONSTS.APP_USAGE_MODEL_PROCESS:
106 self.
logger.info(APP_CONSTS.LOGGER_DELIMITER_LINE)
117 extractor = self.__class__.__name__
119 for tag
in self.
entry[
"entry"]:
120 tagValue = self.
entry[
"entry"][tag]
121 if isinstance(tagValue, (str, unicode)):
123 tagValue = Utils.UrlNormalizator.entitiesEncode(tagValue)
126 elif isinstance(tagValue, int)
or isinstance(tagValue, bool)
or isinstance(tagValue, float):
127 tagValue = str(tagValue)
128 elif isinstance(tagValue, dict):
129 names = {
"url",
"name",
"value"}
132 if name
in tagValue
and isinstance(tagValue[name], (str, unicode)):
134 tagValueNew = Utils.UrlNormalizator.entitiesEncode(tagValue[name])
136 tagValueNew = tagValue[name]
138 if tagValueNew
is None:
141 tagValue = tagValueNew
142 elif isinstance(tagValue, list)
and len(tagValue) > 0:
144 if isinstance(tagValue[0], dict):
145 names = {
"href":
",",
"url":
",",
"name":
",",
"term":
",",
"value":
" "}
147 if name
in tagValue[0]:
149 for item
in tagValue:
150 if name
in item
and isinstance(item[name], (str, unicode)):
152 tv.append(Utils.UrlNormalizator.entitiesEncode(item[name].strip()))
154 tv.append(item[name].strip())
155 tagValueNew = names[name].
join(tv)
157 if tagValueNew
is None:
160 tagValue = tagValueNew
162 self.
logger.debug(
"Unsupported tag '%s' value type: %s", str(tag),
varDump(tagValue))
165 article.tags[tag] = {
"data":[tagValue],
"name":tag,
"xpath":
"",
"extractor":extractor}
168 if "parent_rss_feed" in self.
entry:
169 parent_rss_feed = json.dumps(self.
entry[
"parent_rss_feed"])
170 if parent_rss_feed[0] ==
'"':
171 parent_rss_feed = parent_rss_feed[1:]
172 if parent_rss_feed[-1] ==
'"':
173 parent_rss_feed = parent_rss_feed[:-1]
174 article.tags[
"parent_rss_feed"] = {
"data":[parent_rss_feed],
175 "name":
"parent_rss_feed",
"xpath":
"",
"extractor":extractor}
178 if "parent_rss_feed_urlMd5" in self.
entry:
179 article.tags[
"parent_rss_feed_urlMd5"] = {
"data":[self.
entry[
"parent_rss_feed_urlMd5"]],
180 "name":
"parent_rss_feed_urlMd5",
"xpath":
"",
"extractor":extractor}
183 article.tagsCount = len(article.tags.keys())
186 article.finish = time.time()
203 except ValueError, err:
204 ExceptionLog.handler(self.
logger, err,
'Bad raw content:', (self.
input_data.raw_content), \
205 {ExceptionLog.LEVEL_NAME_ERROR:ExceptionLog.LEVEL_VALUE_DEBUG})
226 if CONSTS.PUBLISHED
in self.
article.tags:
228 self.
pubdate = parse(self.
article.tags[CONSTS.PUBLISHED][
"data"][0]).strftime(
'%Y-%m-%d %H:%M:%S')
230 self.
logger.debug(
"Resource %s hasn't publish date" % str(self.
article.tags[CONSTS.TAG_LINK][
"data"]))
235 self.
logger.debug(
"Resource hasn't raw content. Exit.")
242 if self.
db_engine == CONSTS.MYSQL_ENGINE:
243 query_prefix =
"REPLACE INTO `contents_" + str(self.
input_data.siteId
if len(self.
input_data.siteId)
else 0)
246 self.
logger.info(
"db_name: " + query_prefix)
255 for key
in data.keys():
257 buf[key] = data[key].data
258 ret =
encode(json.dumps(buf))
259 self.
logger.info(
"Result buffer: %s" % ret)
267 tags_count = int(result[
"default"].tagsCount)
268 self.
logger.info(
"Tags count: %s" % str(tags_count))
269 if int(tags_count) > 0:
270 self.
logger.info(
"Tags count OK")
281 query = query_prefix +
"`(`id`,`data`, CDate) VALUES('" + result[
"default"].resId +
"', '" + data +
"', NOW())" 286 options[
"query"] = query
289 if self.
db_engine == CONSTS.MYSQL_ENGINE:
291 options[
"dbHost"] = self.dbHost
292 options[
"dbPort"] = self.dbPort
293 options[
"dbUser"] = self.dbUser
294 options[
"dbPWD"] = self.dbPWD
295 options[
"MYSQLDBName"] = self.dc_contents_db
306 if self.
usageModel == APP_CONSTS.APP_USAGE_MODEL_PROCESS:
308 self.
input_data = pickle.loads(sys.stdin.read())
312 self.
logger.info(
"Incoming data:%s", msgStr)
313 app.Profiler.messagesList.append(msgStr)
323 self.
logger.debug(
"scraperResponse:%s\n",
varDump(scraperResponse))
325 if self.
usageModel == APP_CONSTS.APP_USAGE_MODEL_PROCESS:
326 print pickle.dumps(scraperResponse)
330 except Exception
as err:
331 ExceptionLog.handler(self.
logger, err, CONSTS.MSG_ERROR_PROCESS)
341 from dc_processor.base_extractor import BaseExtractor 342 # check content's presence in response 343 if self.scraper_response.tagsMask & BaseExtractor.tagsMask[CONSTS.SUMMARY_DETAIL]: 345 tags = json.loads(self.scraper_response.processedContent)["data"]["tagList"] 347 if tag["name"]==CONSTS.SUMMARY_DETAIL: 348 content = tag["data"]["value"] 349 self.logger.debug("content: %s" % str(content)) 350 from scrapy.selector import Selector 351 sel = Selector(text=self.raw_content) 352 xpath_list = sel.xpath("//*[contains(., '" + content + "')]").extract() 353 self.logger.debug("xpath_list: %s" % str(xpath_list)) 364 self.
config = ConfigParser.ConfigParser()
365 self.
config.optionxform = str
366 if self.
usageModel == APP_CONSTS.APP_USAGE_MODEL_PROCESS:
367 if self.pargs.config:
368 self.
config.read(self.pargs.config)
371 except Exception
as err:
372 raise Exception(CONSTS.MSG_ERROR_LOAD_CONFIG +
" : " + str(err))
381 if self.
usageModel == APP_CONSTS.APP_USAGE_MODEL_PROCESS:
382 log_conf_file = self.
config.get(
"Application",
"log")
383 logging.config.fileConfig(log_conf_file)
385 except Exception
as err:
386 raise Exception(CONSTS.MSG_ERROR_LOAD_CONFIG +
" : " + str(err))
394 class_name = self.__class__.__name__
401 if self.
config.has_option(
"sqlite",
"PRAGMA_synchronous"):
403 if self.
config.has_option(
"sqlite",
"PRAGMA_journal_mode"):
405 if self.
config.has_option(
"sqlite",
"PRAGMA_temp_store"):
409 if self.
config.has_option(class_name,
"db_engine"):
412 except Exception
as err:
413 print CONSTS.MSG_ERROR_LOAD_OPTIONS + err.message
def getDataBuffer(self, data)
getDataBuffer prepares the data buffer
def getQueryPrefix(self)
getQueryPrefix builds the query prefix for the contents DB table (presumably per DB engine — confirm against the method body).
def __init__(self, usageModel=APP_CONSTS.APP_USAGE_MODEL_PROCESS, configFile=None, logger=None, inputData=None)
def loadLogConfigFile(self)
def storeXPath(self)
storeXPath
def putArticleToDB(self, result)
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)