HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
ContentUpdater.py
Go to the documentation of this file.
1 # coding: utf-8
2 
3 """
4 HCE project, Python bindings, Distributed Tasks Manager application.
5 Content updater tool: main functionality.
6 
7 @package: app
8 @file ContentUpdater.py
9 @author Alexander Vybornyh <alexander.hce.cluster@gmail.com>
10 @link: http://hierarchical-cluster-engine.com/
11 @copyright: Copyright © 2013-2016 IOIX Ukraine
12 @license: http://hierarchical-cluster-engine.com/license/
13 @since: 0.1
14 """
15 
16 try:
17  import cPickle as pickle
18 except ImportError:
19  import pickle
20 
21 import json
22 import sys
23 import logging.config
24 import ConfigParser
25 from cement.core import foundation
26 
27 from dc.EventObjects import Attribute
28 import dc.EventObjects as dc_event
29 from app.Utils import varDump
30 import app.Utils as Utils
31 import app.Consts as APP_CONSTS
32 from app.Utils import SQLExpression
33 from app.Utils import getTracebackInfo
34 from app.Exceptions import DatabaseException
35 from dc_crawler.DBTasksWrapper import DBTasksWrapper
36 import dc_db.Constants as DB_CONSTS
37 
38 
39 # This object is a run at once application
class ContentUpdater(foundation.CementApp):
    # # Run-at-once application.
    #
    # Reads a pickled batch object from stdin, stores its processed contents
    # and/or attributes through a DBTasksWrapper instance and writes the pickled
    # batch back to stdout. The final status is kept in self.exitCode.

    # # Constants: error messages used in class
    MSG_ERROR_EMPTY_CONFIG_FILE_NAME = "Config file name is empty."
    MSG_ERROR_WRONG_CONFIG_FILE_NAME = "Config file name is wrong"
    MSG_ERROR_LOAD_APP_CONFIG = "Error loading application config file."
    MSG_ERROR_READ_LOG_CONFIG = "Error read log config file."
    MSG_ERROR_MISSED_SECTION = "Missed mandatory section '%s'"
    MSG_ERROR_DATABASE_OPERATION = "Database operation has error: %s"
    MSG_ERROR_UPDATE_PROCESSED_CONTENTS = "Update processed contents has error: %s"

    # # Constants: debug messages
    MSG_DEBUG_INPUT_PICKLE = "Input pickle: "
    MSG_DEBUG_INPUT_UNPICKLED = "input unpickled: "
    MSG_DEBUG_OUTPUT_BATCH = "Output batch: "
    MSG_DEBUG_OUTPUT_PICKLE = "Output pickle: "
    MSG_DEBUG_SEND_PICKLE = "Send pickle. Done."

    # # Name of the attribute that carries the error message
    ATTRIBUTE_ERROR_MESSAGE_NAME = 'errorMessage'

    # Mandatory
    class Meta(object):
        label = APP_CONSTS.CONTENT_UPDATER_APP_NAME

        def __init__(self):
            pass


    # # Internal class holding the options read from the application config file
    class ConfigOptions(object):
        # # Constants: option names used in the config file
        CONTENT_UPDATER_OPTION_LOG = "log"
        CONTENT_UPDATER_OPTION_DB_TASK_INI = "db_task_ini"

        # # Store option values
        #
        # @param confLogFileName - name of the logging config file
        # @param dbTaskIniFile - name of the db-task ini file
        def __init__(self, confLogFileName=None, dbTaskIniFile=None):
            self.confLogFileName = confLogFileName
            self.dbTaskIniFile = dbTaskIniFile


    # # Initialize default fields
    def __init__(self):
        # call base class __init__ method
        foundation.CementApp.__init__(self)
        self.exitCode = APP_CONSTS.EXIT_SUCCESS
        self.logger = None
        self.dbWrapper = None
        self.errorMsg = None


    # # setup application
    def setup(self):
        # call base class setup method
        foundation.CementApp.setup(self)


    # # run application
    def run(self):
        # call base class run method
        foundation.CementApp.run(self)

        # initialize the application (config, logger, db wrapper)
        self.__initApp()

        # internal processing; failures are reported through self.exitCode
        self.process()

        # Finish logging
        self.logger.info(APP_CONSTS.LOGGER_DELIMITER_LINE)


    # # initialize application from config files
    #
    # @param - None
    # @return - None
    # @raise Exception - if no config file name was passed or a config cannot be loaded
    def __initApp(self):
        if not self.pargs.config:
            raise Exception(self.MSG_ERROR_LOAD_APP_CONFIG)

        # load app config
        configOptions = self.__loadAppConfig(self.pargs.config)

        # load log config
        self.__loadLogConfig(configOptions.confLogFileName)

        # set attribute values of application
        self.dbWrapper = self.__createDBTasksWrapper(configOptions.dbTaskIniFile)

        # an error message on the command line switches process() to the
        # "attributes only" mode
        if self.pargs.error:
            self.errorMsg = str(self.pargs.error)


    # # load application config file
    #
    # @param configName - name of application config file
    # @return - instance of ConfigOptions class
    # @raise Exception - if the file cannot be read or the mandatory section/options are missing
    def __loadAppConfig(self, configName):
        try:
            config = ConfigParser.ConfigParser()
            # keep option names case-sensitive
            config.optionxform = str

            # read() returns the list of successfully parsed files
            if not config.read(configName):
                raise Exception(self.MSG_ERROR_WRONG_CONFIG_FILE_NAME + ": " + configName)

            section = APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME
            if not config.has_section(section):
                raise Exception(self.MSG_ERROR_MISSED_SECTION % str(section))

            optionsClass = ContentUpdater.ConfigOptions
            configOptions = optionsClass(
                str(config.get(section, optionsClass.CONTENT_UPDATER_OPTION_LOG)),
                str(config.get(section, optionsClass.CONTENT_UPDATER_OPTION_DB_TASK_INI)))

        except Exception as err:
            raise Exception(self.MSG_ERROR_LOAD_APP_CONFIG + ' ' + str(err))

        return configOptions


    # # load log config file
    #
    # @param configName - name of the logging config file
    # @return - None
    # @raise Exception - if the name is an empty string or the logging config cannot be applied
    def __loadLogConfig(self, configName):
        try:
            if isinstance(configName, str) and not configName:
                raise Exception(self.MSG_ERROR_EMPTY_CONFIG_FILE_NAME)

            logging.config.fileConfig(configName)

            # obtain the application logger
            self.logger = Utils.MPLogger().getLogger()

        except Exception as err:
            raise Exception(self.MSG_ERROR_READ_LOG_CONFIG + ' ' + str(err))


    # # create dbtask wrapper instance
    #
    # @param configName - dbtask ini file
    # @return instance of DBTasksWrapper class
    # @raise Exception - if the name is empty, the file cannot be read or the wrapper cannot be created
    def __createDBTasksWrapper(self, configName):
        try:
            if configName == "":
                raise Exception(self.MSG_ERROR_EMPTY_CONFIG_FILE_NAME)

            config = ConfigParser.ConfigParser()
            # keep option names case-sensitive
            config.optionxform = str

            if not config.read(configName):
                raise Exception(self.MSG_ERROR_WRONG_CONFIG_FILE_NAME + ": " + configName)

            dbTasksWrapper = DBTasksWrapper(config)

        except Exception as err:
            raise Exception(self.MSG_ERROR_LOAD_APP_CONFIG + ' ' + str(err))

        return dbTasksWrapper


    # # get input pickle from stdin
    #
    # @param - None
    # @return inputPickle - input pickle object
    def getInputPickle(self):
        return sys.stdin.read()


    # # unpickle input object
    #
    # @param inputPickle - input pickle object
    # @return - the unpickled object
    def unpickleInput(self, inputPickle):
        # NOTE(review): pickle.loads() can execute arbitrary code; the data read
        # from stdin must come from a trusted producer only.
        return pickle.loads(inputPickle)


    # # create output pickle object
    #
    # @param outputBatch - output batch
    # @return outputPickle - output pickle object
    def createOutputPickle(self, outputBatch):
        return pickle.dumps(outputBatch)


    # # send pickle
    #
    # @param outputPickle - output pickle object
    # @return - None
    def sendPickle(self, outputPickle):
        sys.stdout.write(outputPickle)
        self.logger.debug(self.MSG_DEBUG_SEND_PICKLE)


    # # main process handler
    #
    # Any exception is logged and converted into APP_CONSTS.EXIT_FAILURE.
    #
    # @param - None
    # @return None
    def process(self):
        try:
            inputBatchObj = self.unpickleInput(self.getInputPickle())

            if self.errorMsg is None:
                self.updateProcessedContents(inputBatchObj)
            else:
                self.updateAttributesOnly(inputBatchObj)

            self.sendPickle(self.createOutputPickle(inputBatchObj))
        except Exception as err:
            self.logger.error(str(err))
            self.exitCode = APP_CONSTS.EXIT_FAILURE


    # # execute database operations with affect_db temporarily forced to True
    #
    # The previous affect_db value is restored even if an operation raises.
    #
    # @param attributes - list of Attribute objects to put
    # @param urlPuts - list of URLPut objects to put, None to skip putURLContent()
    # @return - None
    def __executeDBOperations(self, attributes, urlPuts=None):
        affectDB = self.dbWrapper.affect_db
        self.dbWrapper.affect_db = True
        try:
            if urlPuts is not None:
                self.dbWrapper.putURLContent(urlPuts)
            self.dbWrapper.putAttributes(attributes)
        finally:
            self.dbWrapper.affect_db = affectDB

        self.logger.debug('Database operations executed...')


    # # update processed contents
    #
    # @param inputBatch - input batch
    # @return - None
    def updateProcessedContents(self, inputBatch):
        urlPuts = []
        attributes = []

        self.logger.debug("The processing of batch Id = %s started", str(inputBatch.id))

        for batchItem in inputBatch.items:
            response = batchItem.urlContentResponse
            if response is None:
                continue

            for processedContent in response.processedContents:
                self.logger.debug("!!! processedContent: %s", varDump(processedContent, stringifyType=0))
                try:
                    # create URLPut object
                    putDict = {
                        "id": batchItem.urlId,
                        "data": processedContent,
                        "cDate": SQLExpression("NOW()"),
                    }
                    # accumulate URLPut objects
                    urlPuts.append(dc_event.URLPut(batchItem.siteId,
                                                   batchItem.urlId,
                                                   dc_event.Content.CONTENT_PROCESSOR_CONTENT,
                                                   putDict))
                except Exception as err:
                    self.logger.error(self.MSG_ERROR_UPDATE_PROCESSED_CONTENTS, str(err))
                    self.logger.debug(getTracebackInfo())

            try:
                # accumulate attributes; each item is a JSON-encoded dict
                for attrJson in response.attributes:
                    attrDict = json.loads(attrJson)
                    # the value is stored re-serialized as JSON
                    # (the 'encoding' argument exists in Python 2 json only)
                    attrValue = json.dumps(attrDict['value'], ensure_ascii=False, encoding='utf-8')

                    attributes.append(Attribute(siteId=attrDict['siteId'],
                                                name=attrDict['name'],
                                                urlMd5=attrDict['urlMd5'],
                                                value=attrValue))

                if attributes:
                    self.logger.debug("Made attributes: %s", varDump(attributes))

            except Exception as err:
                self.logger.error("Make attributes error: %s", str(err))
                self.logger.debug(getTracebackInfo())

        try:
            self.__executeDBOperations(attributes, urlPuts)
        # DatabaseException is listed explicitly in case it does not derive from Exception
        except (DatabaseException, Exception) as err:
            self.logger.error(self.MSG_ERROR_DATABASE_OPERATION, str(err))
            self.logger.debug(getTracebackInfo())

        self.logger.debug("The processing of batch Id = %s finished", str(inputBatch.id))


    # # update attributes by error message
    #
    # Stores self.errorMsg as an attribute for every item of the batch.
    #
    # @param inputBatch - input batch
    # @return - None
    def updateAttributesOnly(self, inputBatch):
        attributes = []

        for batchItem in inputBatch.items:
            self.logger.debug("batchItem: %s", varDump(batchItem))
            self.logger.debug("batchItem.urlContentResponse: %s", varDump(batchItem.urlContentResponse))

            try:
                dbConnection = self.dbWrapper.dbTask.dbConnections[DB_CONSTS.PRIMARY_DB_ID]
                # accumulate attributes
                # NOTE(review): 'name' mirrors the Attribute(...) call in
                # updateProcessedContents() - confirm against dc.EventObjects.Attribute
                attributes.append(Attribute(siteId=batchItem.siteId,
                                            name=self.ATTRIBUTE_ERROR_MESSAGE_NAME,
                                            urlMd5=batchItem.urlId,
                                            value=dbConnection.escape_string(str(self.errorMsg))))

                self.logger.debug("Made attributes: %s", varDump(attributes))
            except Exception as err:
                self.logger.error("Make attributes error: %s", str(err))
                self.logger.debug(getTracebackInfo())

        try:
            self.__executeDBOperations(attributes)
        # other exception types propagate to the caller
        except DatabaseException as err:
            self.logger.error(self.MSG_ERROR_DATABASE_OPERATION, str(err))
            self.logger.debug(getTracebackInfo())
def updateProcessedContents(self, inputBatch)
def sendPickle(self, outputPickle)
def createOutputPickle(self, outputBatch)
def updateAttributesOnly(self, inputBatch)
def __loadAppConfig(self, configName)
def __init__(self, confLogFileName=None, dbTaskIniFile=None)
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
def __createDBTasksWrapper(self, configName)
-mask-info
def getTracebackInfo(linesNumberMax=None)
Definition: Utils.py:218
def unpickleInput(self, inputPickle)
def __loadLogConfig(self, configName)