HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
RTCPreprocessor.py
Go to the documentation of this file.
1 """
2 HCE project, Python bindings, Distributed Tasks Manager application.
3 The RTCPreprocessor class contains the main functionality of the preprocessor for real-time crawling.
4 
5 @package: dc_crawler
6 @file RTCPreprocessor.py
7 @author Alex <developers.hce@gmail.com>, Alexander Vybornyh <alexander.hce.cluster@gmail.com>
8 @link: http://hierarchical-cluster-engine.com/
9 @copyright: Copyright &copy; 2013-2014 IOIX Ukraine
10 @license: http://hierarchical-cluster-engine.com/license/
11 @since: 0.1
12 """
13 
14 
15 import sys
16 import os
17 import pickle
18 import logging.config
19 import ConfigParser
20 
21 import argparse
22 
23 from app.Utils import varDump
24 import app.Utils as Utils
25 import app.Consts as APP_CONSTS
26 import dc_crawler.Constants as DC_CRAWLER_CONSTS
27 from cement.core import foundation
28 
29 APP_NAME = "RTCPreprocessor"
30 
31 
class RTCPreprocessor(foundation.CementApp):
    """Preprocessor application for real-time crawling.

    Reads a pickled batch object from stdin and, when both the
    ``DRCE_NODE_NUMBER`` and ``DRCE_NODES_TOTAL`` environment variables are
    set, keeps only one slice of ``batch.items`` (selected by the node
    number). The resulting pickled batch is written to stdout.

    Inherits from ``foundation.CementApp``.
    """

    # Error messages.
    MSG_ERROR_PARSE_CMD_PARAMS = "Error parse command line parameters."
    MSG_ERROR_EMPTY_CONFIG_FILE_NAME = "Config file name is empty."
    MSG_ERROR_WRONG_CONFIG_FILE_NAME = "Config file name is wrong"
    MSG_ERROR_LOAD_APP_CONFIG = "Error loading application config file."
    MSG_ERROR_READ_LOG_CONFIG = "Error read log config file."

    # Names of the environment variables read by getEnvVars().
    DRCE_NODE_NUMBER = "DRCE_NODE_NUMBER"
    DRCE_NODES_TOTAL = "DRCE_NODES_TOTAL"

    # Numeric exit code stored when at least one environment variable is unset.
    ERROR_EMPTY_ENV_VARS = 2

    # Name of the config-file option holding the log config file name.
    PREPROCESSOR_OPTION_LOG = "log"

    # Mandatory
    class Meta(object):
        label = DC_CRAWLER_CONSTS.RTC_PREPROCESSOR_APP_NAME

        def __init__(self):
            pass

    def __init__(self):
        """Initialize the base application and the instance state."""
        # call base class __init__ method
        foundation.CementApp.__init__(self)

        self.logger = None
        self.batch = None
        self.exitCode = APP_CONSTS.EXIT_SUCCESS
        self.pickled_object = None
        # Defaults used when the environment does not provide the values.
        self.envVars = {
            self.DRCE_NODES_TOTAL: 1,
            self.DRCE_NODE_NUMBER: 1,
        }

    def setup(self):
        """Set up the application by delegating to the base class."""
        # call base class setup method
        foundation.CementApp.setup(self)

    def run(self):
        """Run the application: initialize, process the batch, close the log."""
        # call base class run method
        foundation.CementApp.run(self)

        # call initialization application
        self.__initApp()

        # call internal processing
        self.process()

        # Finish logging
        self.logger.info(APP_CONSTS.LOGGER_DELIMITER_LINE)

    def __initApp(self):
        """Initialize the application from its config files.

        Raises:
            Exception: if no config file was passed on the command line, or
                if loading the application or log config fails.
        """
        if self.pargs.config:
            self.__loadLogConfig(self.__loadAppConfig(self.pargs.config))
        else:
            raise Exception(self.MSG_ERROR_LOAD_APP_CONFIG)

    def __loadAppConfig(self, configName):
        """Load the application config file.

        Args:
            configName: path of the application config file.

        Returns:
            The log config file name read from the application section, or
            an empty string when that section is absent.

        Raises:
            Exception: if the file cannot be read or parsed.
        """
        confLogFileName = ""

        try:
            config = ConfigParser.ConfigParser()
            # Keep option names case-sensitive.
            config.optionxform = str

            # read() returns the list of files parsed successfully.
            if not config.read(configName):
                raise Exception(self.MSG_ERROR_WRONG_CONFIG_FILE_NAME + ": " + configName)

            if config.has_section(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME):
                confLogFileName = str(config.get(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME,
                                                 self.PREPROCESSOR_OPTION_LOG))

        except Exception as err:
            raise Exception(self.MSG_ERROR_LOAD_APP_CONFIG + ' ' + str(err))

        return confLogFileName

    def __loadLogConfig(self, configName):
        """Load the log config file and create the application logger.

        Args:
            configName: path of the logging config file.

        Raises:
            Exception: if the name is an empty string or the logging
                configuration cannot be applied.
        """
        try:
            if isinstance(configName, str) and not configName:
                raise Exception(self.MSG_ERROR_EMPTY_CONFIG_FILE_NAME)

            logging.config.fileConfig(configName)

            # call rotation log files and initialization logger
            self.logger = Utils.MPLogger().getLogger()

        except Exception as err:
            raise Exception(self.MSG_ERROR_READ_LOG_CONFIG + ' ' + str(err))

    def getBatchFromInput(self):
        """Read the whole of stdin into ``self.pickled_object``."""
        self.pickled_object = sys.stdin.read()

    def cutBatch(self):
        """Unpickle the batch and keep only the items slice for this node.

        When the batch holds more than one item, the items are split into
        ``DRCE_NODES_TOTAL`` slices and the slice at index
        ``DRCE_NODE_NUMBER - 1`` replaces ``batch.items``; the reduced batch
        is then re-pickled into ``self.pickled_object``.
        """
        # NOTE(review): pickle.loads() executes arbitrary code from its input;
        # stdin must only ever carry trusted data here.
        self.batch = pickle.loads(self.pickled_object)
        self.logger.info("Before id:%s items: %s", str(self.batch.id), str(len(self.batch.items)))
        self.logger.debug("self.batch: %s", varDump(self.batch))

        if len(self.batch.items) > 1:
            splitted_items = self.split(self.batch.items, int(self.envVars[self.DRCE_NODES_TOTAL]))
            self.logger.debug("Input items: %s", str(self.batch.items))
            self.logger.debug("Splitted items: %s", str(splitted_items))
            # NOTE(review): a node number of 0 yields index -1, i.e. the last
            # slice; a number above DRCE_NODES_TOTAL raises IndexError.
            self.batch.items = splitted_items[int(self.envVars[self.DRCE_NODE_NUMBER]) - 1]
            self.logger.debug("Output items: %s", str(self.batch.items))
            self.logger.debug("Output batch: %s", varDump(self.batch))
            self.pickled_object = pickle.dumps(self.batch)

        self.logger.info("After id:%s items: %s", str(self.batch.id), str(len(self.batch.items)))

    def split(self, arr, count):
        """Split ``arr`` into ``count`` interleaved slices.

        Slice ``i`` holds the elements at positions ``i, i + count, ...``.

        Args:
            arr: sequence to split.
            count: number of slices to produce.

        Returns:
            A list of ``count`` slices (empty list when ``count`` is not
            positive).
        """
        return [arr[i::count] for i in range(count)]

    def sendBatch(self):
        """Write ``self.pickled_object`` followed by a newline to stdout."""
        # Same output as the Python 2 ``print`` statement, in a form that is
        # also valid syntax on Python 3.
        sys.stdout.write(str(self.pickled_object) + "\n")
        sys.stdout.flush()

    def getEnvVars(self):
        """Fill ``self.envVars`` from the process environment.

        A variable that is unset or empty keeps its default value and sets
        ``self.exitCode`` to ``ERROR_EMPTY_ENV_VARS``.
        """
        for key in self.envVars:
            value = os.environ.get(key, "")
            if value:
                self.envVars[key] = value
                self.logger.debug("os.environ[%s]: set to <<%s>>", key, self.envVars[key])
            else:
                self.logger.debug("os.environ[%s]: not set. Use default value: <<%s>>",
                                  key, self.envVars[key])
                self.exitCode = self.ERROR_EMPTY_ENV_VARS

    def process(self):
        """Read the batch, reduce it when possible and send it on.

        The batch is reduced only when every environment variable was found;
        otherwise it is forwarded unchanged. Any exception sets
        ``self.exitCode`` to ``APP_CONSTS.EXIT_FAILURE`` and is logged.
        """
        try:
            self.getBatchFromInput()
            self.getEnvVars()
            if self.exitCode != self.ERROR_EMPTY_ENV_VARS:
                self.logger.info("The batch possible will be reduced")
                self.cutBatch()
            else:
                self.logger.info("The batch will not be reduced")
            self.sendBatch()
        except Exception as err:
            self.exitCode = APP_CONSTS.EXIT_FAILURE
            # Record the cause instead of failing silently; the logger may
            # still be None if process() is called before initialization.
            if self.logger is not None:
                self.logger.error("Batch preprocessing failed: %s", str(err))
199 
def __loadAppConfig(self, configName)
load application config file
int ERROR_EMPTY_ENV_VARS
Numeric constants used as exit codes.
The RTCPreprocessor class contains the main functionality of the preprocessor for real-time crawling; the class inherits from foundation.CementApp.
string DRCE_NODE_NUMBER
Constants used in the class.
def __loadLogConfig(self, configName)
load log config file
def __initApp(self)
initialize application from config files
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
string PREPROCESSOR_OPTION_LOG
Constants used for option names from the config file.