HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
ResponseExtractor.py
Go to the documentation of this file.
1 #coding: utf-8
2 """
3 HCE project, Python bindings, DC service utility.
4 ResponseExtractor utility main application class.
5 
6 @package: DC utility
7 @file ResponseExtractor.py
8 @author bgv <developers.hce@gmail.com>
9 @link: http://hierarchical-cluster-engine.com/
10 @copyright: Copyright (c) 2015 IOIX Ukraine
11 @license: http://hierarchical-cluster-engine.com/license/
12 @since: 0.1
13 """
14 
15 
16 import sys
17 import json
18 import base64
19 import time
20 import logging.config
21 import ConfigParser
22 from cement.core import foundation
23 
24 import app.Utils as Utils
25 from app.Utils import ExceptionLog
26 import app.Consts as APP_CONSTS
27 import dc_crawler.Constants as DC_CRAWLER_CONSTS
28 
29 
30 
33 class ResponseExtractor(foundation.CementApp):
34 
35 
36  MSG_ERROR_PARSE_CMD_PARAMS = "Error parse command line parameters."
37  MSG_ERROR_EMPTY_CONFIG_FILE_NAME = "Config file name is empty."
38  MSG_ERROR_WRONG_CONFIG_FILE_NAME = "Config file name is wrong"
39  MSG_ERROR_LOAD_APP_CONFIG = "Error loading application config file."
40  MSG_ERROR_READ_LOG_CONFIG = "Error read log config file."
41  MSG_ERROR_PROCESSING_REQUEST = "Error processing input data."
42 
43  CONFIG_OPTION_ITEM_DELIMITER = "itemDelimiter"
44 
45  FROMAT_AUTO = -1
46  FROMAT_INTERNAL = 0
47  FROMAT_NEWS = 1
48  FROMAT_RSS_FEED = 2
49 
50  RESULTS_FORMAT_CSV_LINE = 0
51  RESULTS_FORMAT_FIELD_LINE = 1
52  RESULTS_FORMAT_JSON = 2
53 
54  # Options from config file
55  CONFIG_OPTION_LOG = 'log'
56 
57 
58  # Mandatory
59  class Meta(object):
60  label = DC_CRAWLER_CONSTS.RTC_PREPROCESSOR_APP_NAME
61  def __init__(self):
62  pass
63 
64 
65 
66  def __init__(self):
67  # call base class __init__ method
68  foundation.CementApp.__init__(self)
69 
70  self.logger = None
71  self.batch = None
72  self.exitCode = APP_CONSTS.EXIT_SUCCESS
75  self.extendedLog = False
77  self.itemDelimiter = "\n"
78 
79 
80  def setup(self):
81  # call base class setup method
82  foundation.CementApp.setup(self)
83  self.args.add_argument('-c', '--config', action='store', metavar='config_file', help='config ini-file')
84  self.args.add_argument('-i', '--input', action='store', metavar='input_json_file',
85  help='input json file of the URL_CONTENT response, if omitted the stdin read used')
86  self.args.add_argument('-o', '--output', action='store', metavar='output_file, if omitted the stdout print used',
87  help='input file, if omitted the stdout write used')
88  self.args.add_argument('-f', '--format', action='store', metavar='input_json_file_format',
89  help='input json file buffer format: -1 - auto (default if omitted) 0 - internal,' + \
90  ' 1 - news, 2 - rss-feed')
91  self.args.add_argument('-m', '--maxitems', action='store', metavar='max_items',
92  help='max items number to read')
93  self.args.add_argument('-s', '--start', action='store', metavar='start_from',
94  help='start from item')
95  self.args.add_argument('-e', '--extended', action='store', metavar='extended',
96  help='extended log with additional debug information')
97  self.args.add_argument('-t', '--tags', action='store', metavar='tags',
98  help='csv tags fields names list, all fields from response if omitted')
99  self.args.add_argument('-r', '--results', action='store', metavar='results',
100  help='results format: 0 - csv fields names list one item per line, ' + \
101  '1 - fields list one field per line, 2 - json (default if omitted)')
102 
103 
104 
108  def __loadAppConfig(self, configName):
109  #variable for result
110  confLogFileName = ""
111 
112  try:
113  config = ConfigParser.ConfigParser()
114  config.optionxform = str
115 
116  readOk = config.read(configName)
117 
118  if len(readOk) == 0:
119  raise Exception(self.MSG_ERROR_WRONG_CONFIG_FILE_NAME + ": " + configName)
120 
121  if config.has_section(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME):
122  confLogFileName = str(config.get(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME, self.CONFIG_OPTION_LOG))
123  #Init response item delimiter
124  self.itemDelimiter = str(config.get(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME,
126  except Exception, err:
127  raise Exception(self.MSG_ERROR_LOAD_APP_CONFIG + ' ' + str(err))
128 
129  return confLogFileName
130 
131 
132 
136  def initLogger(self, configName):
137  try:
138  if isinstance(configName, str) and len(configName) == 0:
139  raise Exception(self.MSG_ERROR_EMPTY_CONFIG_FILE_NAME)
140 
141  logging.config.fileConfig(configName)
142 
143  #call rotation log files and initialization logger
144  self.logger = Utils.MPLogger().getLogger()
145 
146  except Exception, err:
147  raise Exception(self.MSG_ERROR_READ_LOG_CONFIG + ' ' + str(err))
148 
149 
150 
154  def readFile(self, templateFile):
155  with open(templateFile, 'r') as f:
156  ret = f.read()
157 
158  return ret
159 
160 
161 
165  def writeFile(self, fileName, outBuffer):
166  with open(fileName, 'w') as f:
167  f.write(outBuffer)
168 
169 
170 
171  def run(self):
172  # call base class run method
173  foundation.CementApp.run(self)
174 
175  startTime = time.time()
176 
177  if self.pargs.config:
178  self.initLogger(self.__loadAppConfig(self.pargs.config))
179  else:
180  raise Exception(self.MSG_ERROR_LOAD_APP_CONFIG)
181 
182  if self.pargs.input:
183  inputBuffer = self.readFile(self.pargs.input)
184  else:
185  inputBuffer = sys.stdin.read()
186 
187  if self.pargs.format:
188  inputFormat = int(self.pargs.format)
189  else:
190  inputFormat = -1
191 
192  if self.pargs.maxitems:
193  maxItems = int(self.pargs.maxitems)
194  else:
195  maxItems = -1
196  if self.pargs.start:
197  startFrom = int(self.pargs.start)
198  else:
199  startFrom = 0
200 
201  self.extendedLog = bool(int(self.pargs.extended))
202 
203  if self.pargs.tags:
204  tags = self.pargs.tags.split(',')
205  else:
206  tags = None
207 
208  if self.pargs.results:
209  self.results = int(self.pargs.results)
210 
211  # call processing
212  outputBuffer = self.process(inputBuffer, inputFormat, maxItems, startFrom, tags)
213 
214  self.logger.info("Total time: %s", str(time.time() - startTime))
215 
216  if self.pargs.output:
217  self.writeFile(self.pargs.output, outputBuffer)
218  else:
219  print outputBuffer
220  sys.stdout.flush()
221 
222  # Finish logging
223  self.logger.info(APP_CONSTS.LOGGER_DELIMITER_LINE)
224 
225 
226 
231  def getNewsItem(self, contentObj, tags=None):
232  ret = {}
233 
234  if self.extendedLog:
235  self.logger.debug("News format item processing:\n%s", str(contentObj))
236 
237  if tags is None:
238  ret = contentObj
239  else:
240  #For each tag in the content object
241  for tagName in contentObj:
242  if tagName in tags:
243  ret[tagName] = self.getTagValueByName(tagName, contentObj, self.FROMAT_NEWS)
244 
245  return ret
246 
247 
248 
253  def getDefaultItem(self, contentObj, tags=None):
254  ret = {}
255 
256  if 'default' in contentObj and 'data' in contentObj['default'] and 'tagList' in contentObj['default']['data'] and\
257  isinstance(contentObj['default']['data']['tagList'], list) and len(contentObj['default']['data']['tagList']) > 0:
258  contentObj = contentObj['default']['data']['tagList'][0]
259  else:
260  raise Exception('Wrong format of the contentObj, structure checks not passed!')
261 
262  if self.extendedLog:
263  self.logger.debug("Internal format item processing:\n%s", str(contentObj))
264 
265  #For each tag in the content object
266  for tagItem in contentObj:
267  tagName = tagItem['name']
268  if tagName in tags:
269  tagValue = self.getTagValueByName(tagName, contentObj, self.FROMAT_INTERNAL)
270  ret[tagName] = tagValue
271 
272  return ret
273 
274 
275 
281  def getTagValueByName(self, tagName, item, responseFormat):
282  ret = ''
283 
284  if responseFormat == self.FROMAT_NEWS:
285  if tagName in item:
286  ret = item[tagName].decode('string_escape')
287  else:
288  if self.extendedLog:
289  self.logger.debug("Tag `%s` not found as News format, empty value assumed", tagName)
290  elif responseFormat == self.FROMAT_INTERNAL:
291  found = False
292  for tag in item:
293  if tag['name'] == tagName:
294  found = True
295  if len(tag['data']) > 0:
296  ret = tag['data'][0].decode('string_escape')
297  break
298  if not found and self.extendedLog:
299  self.logger.debug("Tag `%s` not found as internal format, empty value assumed", tagName)
300  else:
301  if self.extendedLog:
302  self.logger.debug("Format %s not supported", str(responseFormat))
303 
304  return ret
305 
306 
307 
315  def parse(self, inputObject, inputFormat, maxItems=-1, startFrom=0, tags=None):
316  ret = []
317 
318  i = 0
319  s = 0
320 
321  #For all items in URLContent response accumulate doc and toc buffers
322  for item in inputObject["itemsList"][0]["itemObject"]:
323  if maxItems > -1 and i == maxItems:
324  break
325 
326  if startFrom > 0 and s < startFrom:
327  s += 1
328  continue
329 
330  if len(item["processedContents"]) > 0:
331  try:
332  contentObj = json.loads(base64.b64decode(item["processedContents"][0]["buffer"]))
333  except Exception, err:
334  self.logger.error("Error get contentObj or cDate: %s, possible wrong json in buffer:\n%s", str(err),
335  str(item["processedContents"][0]["buffer"]))
336  continue
337 
338  if inputFormat < 0:
339  inputFormatLocal = self.detectFormat(contentObj)
340  if inputFormatLocal is None:
341  self.logger.info("Unsupported item object format or empty list:\n%s", str(contentObj))
342  continue
343  try:
344  if inputFormatLocal == self.FROMAT_INTERNAL:
345  #Process default internal object format
346  item = self.getDefaultItem(contentObj, tags)
347  elif inputFormatLocal == self.FROMAT_NEWS:
348  #Process NEWS format of response
349  item = self.getNewsItem(contentObj[0], tags)
350  elif inputFormatLocal == self.FROMAT_RSS_FEED:
351  #TODO: process RSS-FEED format of response
352  continue
353  ret.append(item)
354  i += 1
355  except (KeyboardInterrupt, SystemExit):
356  raise
357  except Exception, err:
358  self.logger.error("Error process item: %s, contentObj:\n%s", str(err), str(contentObj))
359  self.logger.debug("%s", Utils.getTracebackInfo())
360  continue
361 
362  self.logResultedStatistics(inputObject, i)
363 
364  return ret
365 
366 
367 
371  def logResultedStatistics(self, inputObject, items):
372  self.logger.debug("Items detected %s, output: %s", str(len(inputObject["itemsList"][0]["itemObject"])), str(items))
373 
374 
375 
379  def detectFormat(self, contentObj):
380  #self.logger.info("Type `%s`, contentObj:\n%s", str(type(contentObj)), str(contentObj))
381  #Auto-detect format
382  if isinstance(contentObj, dict):
383  #Default internal format
384  inputFormat = self.FROMAT_INTERNAL
385  elif isinstance(contentObj, list) and len(contentObj) > 0:
386  #News format
387  inputFormat = self.FROMAT_NEWS
388  else:
389  #Unsupported format
390  inputFormat = None
391 
392  return inputFormat
393 
394 
395 
399  def jsonLoadsSafe(self, jsonString):
400  ret = None
401 
402  try:
403  if jsonString is not None:
404  ret = json.loads(jsonString)
405  except Exception, err:
406  self.logger.error("Error pars json: %s\n%s", str(err), jsonString)
407 
408  return ret
409 
410 
411 
419  def process(self, inputBuffer, inputFormat=FROMAT_AUTO, maxItems=-1, startFrom=0, tags=None):
420  ret = ''
421 
422  try:
423  self.logger.debug("Processing started, tags: %s", str(tags))
424 
425  inputObject = json.loads(inputBuffer)
426 
427  items = self.parse(inputObject, inputFormat, maxItems, startFrom, tags)
428 
429  #Make output string buffer
430  if self.results == self.RESULTS_FORMAT_JSON:
431  ret = json.dumps(items, indent=2, ensure_ascii=False)
432  elif self.results == self.RESULTS_FORMAT_CSV_LINE:
433  for item in items:
434  buf = ''
435  for tagName in item:
436  buf += tagName + '=' + item[tagName] + ','
437  if buf[:-1].strip() != '':
438  ret += buf[:-1] + self.itemDelimiter.replace("\\n", "\n")
439  elif self.results == self.RESULTS_FORMAT_FIELD_LINE:
440  for item in items:
441  buf = ''
442  for tagName in item:
443  buf += tagName + '=' + item[tagName] + "\n"
444  if buf[:-1].strip() != '':
445  ret += buf[:-1] + self.itemDelimiter.replace("\\n", "\n")
446  else:
447  pass
448 
449  except Exception, err:
450  ExceptionLog.handler(self.logger, err, "Error:")
451  raise Exception(self.MSG_ERROR_PROCESSING_REQUEST + ' ' + str(err))
452 
453  return ret
454 
def writeFile(self, fileName, outBuffer)
Write file.
def jsonLoadsSafe(self, jsonString)
Parse json and return dict if okay or None if not.
def getTagValueByName(self, tagName, item, responseFormat)
Get the tag value from the response item by the tag name; an empty string is returned if the tag is not present.
def getNewsItem(self, contentObj, tags=None)
Make output content by substitution of the template's parts.
def getDefaultItem(self, contentObj, tags=None)
Make output content by substitution of the template's parts.
def process(self, inputBuffer, inputFormat=FROMAT_AUTO, maxItems=-1, startFrom=0, tags=None)
process main operations
def detectFormat(self, contentObj)
Detects the format of response object.
def __loadAppConfig(self, configName)
load application config file
ResponseExtractor Class main functional, class inherits from foundation.CementApp.
-mask-info
def readFile(self, templateFile)
Read file.
def logResultedStatistics(self, inputObject, items)
Log the number of items detected in the input and the number of items written to the output.
def parse(self, inputObject, inputFormat, maxItems=-1, startFrom=0, tags=None)
Parse the input json and make output collection.
def initLogger(self, configName)
load log config file