HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
PostprocessorTask.py
Go to the documentation of this file.
1 # coding: utf-8
2 
3 """
4 HCE project, Python bindings, Distributed Tasks Manager application.
5 PostprocessorTask class is derived from the cement application and implements the main functionality for postprocessing.
6 
7 @package: dc_postprocessor
8 @file PostprocessorTask.py
9 @author Alexander Vybornyh <alexander.hce.cluster@gmail.com>
10 @link: http://hierarchical-cluster-engine.com/
11 @copyright: Copyright &copy; 2013-2017 IOIX Ukraine
12 @license: http://hierarchical-cluster-engine.com/license/
13 @since: 0.1
14 """
15 
16 import importlib
17 
18 from dc_postprocessor.PostProcessingApplicationClass import PostProcessingApplicationClass
19 import app.Consts as APP_CONSTS
20 
21 
22 # This object is a run-once application
24 
25  # Constants used in class
26  CONFIG_OPTION_MODULES_IMPORT = 'modulesImport'
27  CONFIG_OPTION_MODULES_ORDER = 'modulesOrder'
28 
29  PACKAGE_MAME = 'dc_postprocessor'
30 
31  # Constants of error messages
32  ERROR_MSG_LOAD_CONFIG_OPTIONS = "Error load config options. Error: %s"
33  ERROR_MSG_INSTANTIATE_MODULES = "Error instantiate modules. Error: %s"
34 
35  # Constant messages used in class
36  MSG_PROCESSING_STARTED = "Postprocessing for batch ID = %s started."
37  MSG_PROCESSING_FINISHED = "Postprocessing for batch ID = %s finished."
38 
39  # Mandatory
40  class Meta(object):
41  label = APP_CONSTS.POST_PROCESSOR_APP_NAME
42  def __init__(self):
43  # print self.label
44  pass
45 
46  # Default initialization
47  def __init__(self):
48  PostProcessingApplicationClass.__init__(self)
49 
50 # self.logger = None
51  self.modules = []
52  self.modulesImport = None
53  self.modulesOrder = None
54 
55 
56  # # intialization application
57  #
58  # @param - None
59  # @return - None
60  def init(self):
61  self.inputBatch()
62  self.loadConfig()
63  self.instantiateModules()
64 
65 
66  # # setup application
67  #
68  # @param - None
69  # @return - None
70  def setup(self):
71  PostProcessingApplicationClass.setup(self)
72 
73 
74  # # run application
75  #
76  # @param - None
77  # @return - None
78  def run(self):
79  PostProcessingApplicationClass.run(self)
80 
81  self.init()
82  self.process()
83  self.finalize()
84  # finish logging
85  self.logger.info(APP_CONSTS.LOGGER_DELIMITER_LINE)
86 
87 
88  # # finalize application
89  #
90  # @param - None
91  # @return - None
92  def finalize(self):
93  self.outputBatch()
94 
95 
96  # # instantiate modules
97  #
98  # @param - None
99  # @return - None
101  for m in self.modulesImport:
102  try:
103  im = importlib.import_module(self.PACKAGE_MAME + '.' + m)
104  mi = getattr(im, m)(self.getConfigOption, self.logger)
105  mi.init()
106  self.modules.append(mi)
107  except Exception, err:
108  self.logger.error(self.ERROR_MSG_INSTANTIATE_MODULES % str(err))
109 
110 
111 # # #load log config file
112 # #
113 # # @return - None
114 # def loadLogConfig(self, configName):
115 #
116 # try:
117 # if not isinstance(configName, str) or len(configName) == 0:
118 # raise Exception(self.MSG_ERROR_EMPTY_CONFIG_FILE_NAME)
119 #
120 # logging.config.fileConfig(configName)
121 #
122 # # call rotation log files and initialization logger
123 # self.logger = Utils.MPLogger().getLogger()
124 # # self.logger = logging.getLogger(APP_CONSTS.LOGGER_NAME)
125 #
126 # except Exception, err:
127 # raise Exception(self.MSG_ERROR_READ_LOG_CONFIG + ' ' + str(err))
128 
129 
130  # # load config options
131  #
132  # @param - None
133  # @return - None
134  def loadConfig(self):
135  try:
136 # self.loadLogConfig(self.getConfigOption(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME, self.CONFIG_OPTION_LOG))
137 
138 
139  self.modulesImport = self.getConfigOption(self.__class__.__name__, self.CONFIG_OPTION_MODULES_IMPORT).split(',')
140  self.modulesOrder = self.getConfigOption(self.__class__.__name__, self.CONFIG_OPTION_MODULES_ORDER).split(',')
141 
142  self.modulesImport = [moduleName for moduleName in self.modulesImport if moduleName != ""]
143  self.modulesOrder = [moduleName for moduleName in self.modulesOrder if moduleName != ""]
144 
145  self.logger.debug("self.modulesImport: %s", str(self.modulesImport))
146  self.logger.debug("self.modulesOrder: %s", str(self.modulesOrder))
147 
148  except Exception, err:
149  raise Exception(self.ERROR_MSG_LOAD_CONFIG_OPTIONS % str(err))
150 
151 
152  # # main process block
153  #
154  # @param - None
155  # @return - None
156  def process(self):
157 
158  self.logger.debug(self.MSG_PROCESSING_STARTED, str(self.batch.id))
159  # process batch execution
160  for moduleName in self.modulesOrder:
161  for moduleInstance in self.modules:
162  # if isinstance(moduleInstance, moduleName):
163  if moduleName in str(type(moduleInstance)):
164  self.batch = moduleInstance.processBatch(self.batch)
165 
166  # process batch item execution
167  for i in xrange(len(self.batch.items)):
168  for moduleName in self.modulesOrder:
169  for moduleInstance in self.modules:
170  # if isinstance(moduleInstance, moduleName):
171  if moduleName in str(type(moduleInstance)):
172  self.batch.items[i] = moduleInstance.processBatchItem(self.batch.items[i])
173 
174  self.logger.debug(self.MSG_PROCESSING_FINISHED, str(self.batch.id))
def inputBatch(self)
batch
def outputBatch(self)
logger
def getConfigOption(self, sectionName, optionName, defaultValue=None)
-mask-info