4 HCE project, Python bindings, Distributed Tasks Manager application. 5 Content updater tools main functional. 8 @file ContentUpdater.py 9 @author Alexander Vybornyh <alexander.hce.cluster@gmail.com> 10 @link: http://hierarchical-cluster-engine.com/ 11 @copyright: Copyright © 2013-2016 IOIX Ukraine 12 @license: http://hierarchical-cluster-engine.com/license/ 17 import cPickle
as pickle
25 from cement.core
import foundation
42 MSG_ERROR_EMPTY_CONFIG_FILE_NAME =
"Config file name is empty." 43 MSG_ERROR_WRONG_CONFIG_FILE_NAME =
"Config file name is wrong" 44 MSG_ERROR_LOAD_APP_CONFIG =
"Error loading application config file." 45 MSG_ERROR_READ_LOG_CONFIG =
"Error read log config file." 46 MSG_ERROR_MISSED_SECTION =
"Missed mandatory section '%s'" 47 MSG_ERROR_DATABASE_OPERATION =
"Database operation has error: %s" 48 MSG_ERROR_UPDATE_PROCESSED_CONTENTS =
"Update processed contents has error: %s" 50 MSG_DEBUG_INPUT_PICKLE =
"Input pickle: " 51 MSG_DEBUG_INPUT_UNPICKLED =
"input unpickled: " 52 MSG_DEBUG_OUTPUT_BATCH =
"Output batch: " 53 MSG_DEBUG_OUTPUT_PICKLE =
"Output pickle: " 54 MSG_DEBUG_SEND_PICKLE =
"Send pickle. Done." 56 ATTRIBUTE_ERROR_MESSAGE_NAME =
'errorMessage' 60 label = APP_CONSTS.CONTENT_UPDATER_APP_NAME
68 CONTENT_UPDATER_OPTION_LOG =
"log" 69 CONTENT_UPDATER_OPTION_DB_TASK_INI =
"db_task_ini" 71 def __init__(self, confLogFileName=None, dbTaskIniFile=None):
79 foundation.CementApp.__init__(self)
89 foundation.CementApp.setup(self)
95 foundation.CementApp.run(self)
104 self.
logger.info(APP_CONSTS.LOGGER_DELIMITER_LINE)
112 if self.pargs.config:
126 self.
errorMsg = str(self.pargs.error)
137 config = ConfigParser.ConfigParser()
138 config.optionxform = str
140 readOk = config.read(configName)
145 if not config.has_section(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME):
149 str(config.get(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME,
150 ContentUpdater.ConfigOptions.CONTENT_UPDATER_OPTION_LOG)),
151 str(config.get(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME,
152 ContentUpdater.ConfigOptions.CONTENT_UPDATER_OPTION_DB_TASK_INI)))
154 except Exception, err:
166 if isinstance(configName, str)
and len(configName) == 0:
169 logging.config.fileConfig(configName)
174 except Exception, err:
184 dbTasksWrapper =
None 189 config = ConfigParser.ConfigParser()
190 config.optionxform = str
192 readOk = config.read(configName)
199 except Exception, err:
202 return dbTasksWrapper
210 inputPickle = sys.stdin.read()
220 inputUnpickled = pickle.loads(inputPickle)
223 return inputUnpickled
232 outputPickle = pickle.dumps(outputBatch)
243 sys.stdout.write(outputPickle)
261 except Exception, err:
263 self.
exitCode = APP_CONSTS.EXIT_FAILURE
275 self.
logger.debug(
"The processing of batch Id = %s started", str(inputBatch.id))
277 for batchItem
in inputBatch.items:
281 if batchItem.urlContentResponse
is not None:
282 for processedContent
in batchItem.urlContentResponse.processedContents:
284 self.
logger.debug(
"!!! processedContent: %s",
varDump(processedContent, stringifyType=0))
288 putDict[
"id"] = batchItem.urlId
289 putDict[
"data"] = processedContent
292 urlPut = dc_event.URLPut(batchItem.siteId,
294 dc_event.Content.CONTENT_PROCESSOR_CONTENT,
298 urlPuts.append(urlPut)
300 except Exception, err:
312 for attrJson
in batchItem.urlContentResponse.attributes:
313 attrDict = json.loads(attrJson)
317 attrValue = json.dumps(attrDict[
'value'], ensure_ascii=
False, encoding=
'utf-8')
319 attribute =
Attribute(siteId=attrDict[
'siteId'],
320 name=attrDict[
'name'],
321 urlMd5=attrDict[
'urlMd5'],
324 attributes.append(attribute)
326 if len(attributes) > 0:
327 self.
logger.debug(
"Made attributes: %s",
varDump(attributes))
329 except Exception, err:
330 self.
logger.
error(
"Make attributes error: %s", str(err))
341 self.
logger.debug(
'Database operations executed...')
343 except DatabaseException, err:
346 except Exception, err:
350 self.
logger.debug(
"The processing of batch Id = %s finished", str(inputBatch.id))
360 for batchItem
in inputBatch.items:
362 self.
logger.debug(
"batchItem.urlContentResponse: %s",
varDump(batchItem.urlContentResponse))
366 attributes.append(
Attribute(siteId=batchItem.siteId,
368 urlMd5=batchItem.urlId,
369 value=self.
dbWrapper.dbTask.dbConnections[DB_CONSTS.PRIMARY_DB_ID].\
372 self.
logger.debug(
"Made attributes: %s",
varDump(attributes))
373 except Exception, err:
374 self.
logger.
error(
"Make attributes error: %s", str(err))
384 self.
logger.debug(
'Database operations executed...')
386 except DatabaseException, err:
string ATTRIBUTE_ERROR_MESSAGE_NAME
string MSG_ERROR_LOAD_APP_CONFIG
string MSG_ERROR_DATABASE_OPERATION
string MSG_ERROR_READ_LOG_CONFIG
def updateProcessedContents(self, inputBatch)
def sendPickle(self, outputPickle)
string MSG_ERROR_UPDATE_PROCESSED_CONTENTS
def createOutputPickle(self, outputBatch)
def updateAttributesOnly(self, inputBatch)
def __loadAppConfig(self, configName)
def __init__(self, confLogFileName=None, dbTaskIniFile=None)
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
string MSG_DEBUG_SEND_PICKLE
def __createDBTasksWrapper(self, configName)
string MSG_ERROR_MISSED_SECTION
string MSG_ERROR_WRONG_CONFIG_FILE_NAME
def getTracebackInfo(linesNumberMax=None)
string MSG_ERROR_EMPTY_CONFIG_FILE_NAME
def unpickleInput(self, inputPickle)
def __loadLogConfig(self, configName)