HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
UrlFetchJsonToDBTaskConvertor.py
Go to the documentation of this file.
1 """
2 HCE project, Python bindings, Distributed Tasks Manager application.
3 Converter of the list of the URLs object from the URLFetch request to the DBTask.
4 
5 @package: app
6 @file UrlFetchJsonToDBTaskConvertor.py
7 @author Oleksii <developers.hce@gmail.com>, Alexander Vybornyh <alexander.hce.cluster@gmail.com>
8 @link: http://hierarchical-cluster-engine.com/
9 @copyright: Copyright &copy; 2013-2015 IOIX Ukraine
10 @license: http://hierarchical-cluster-engine.com/license/
11 @since: 0.1
12 """
13 
14 
15 import sys
16 import logging.config
17 import ConfigParser
18 import json
19 from subprocess import Popen
20 from subprocess import PIPE
21 try:
22  import cPickle as pickle
23 except ImportError:
24  import pickle
25 from cement.core import foundation
26 
27 import dc.Constants as DC_CONSTS
28 from dcc.DCCObjectsSerializator import DCCObjectsSerializator
29 from app.Utils import varDump
30 import app.Utils as Utils
31 import app.Consts as APP_CONSTS
32 
33 
34 # # UrlFetchToJsonDBTaskConvertor class: main functionality for converting the list of URL objects
35 # from the URLFetch request to the DBTask; the class inherits from foundation.CementApp
36 #
37 class UrlFetchToJsonDBTaskConvertor(foundation.CementApp):
38 
39  # # Constants used in class
40  CMD_DEFAULT = "cd ~/hce-node-bundle/api/python/bin && ./db-task.py --c=../ini/db-task.ini"
41 
42  # # Constants error messages used in class
43  MSG_ERROR_EMPTY_CONFIG_FILE_NAME = "Config file name is empty."
44  MSG_ERROR_WRONG_CONFIG_FILE_NAME = "Config file name is wrong"
45  MSG_ERROR_LOAD_APP_CONFIG = "Error loading application config file."
46  MSG_ERROR_READ_LOG_CONFIG = "Error read log config file."
47  MSG_ERROR_EXIT_STATUS = "Exit failure. "
48 
49  MSG_DEBUG_INPUT_PICKLE = "Input pickle: "
50  MSG_DEBUG_OUTPUT_PICKLE = "Output pickle: "
51  MSG_DEBUG_SEND_PICKLE = "Send pickle. Done."
52 
53  MSG_INFO_PROCESSOR_EXIT_CODE = "Scraper exit_code: "
54  MSG_INFO_PROCESSOR_OUTPUT = "Scraper output: "
55  MSG_INFO_PROCESSOR_ERROR = "Scraper err: "
56 
57  # # Constants naming the options used from the config file
58  URLS_FETCH_JSON_TO_DBTASK_OPTION_LOG = "log"
59  URLS_FETCH_JSON_TO_DBTASK_OPTION_CMD = "cmd"
60 
61 
62  # Mandatory
class Meta(object):
    """Options holder for the application (marked as mandatory in the original).

    Only the application label is defined; it is taken from
    APP_CONSTS.URLS_FETCH_JSON_TO_DBTASK_APP_NAME.
    """

    label = APP_CONSTS.URLS_FETCH_JSON_TO_DBTASK_APP_NAME

    def __init__(self):
        """Nothing to initialise."""
67 
68 
69  # #constructor
def __init__(self):
    """Create the application object in its default state.

    Initialises the cement base class, then sets the db-task command to
    CMD_DEFAULT, the exit code to APP_CONSTS.EXIT_SUCCESS and leaves the
    logger unset (it is created later, when the log config is loaded).
    """
    # Initialise the cement base class first, as the original does.
    foundation.CementApp.__init__(self)

    # Defaults; the three assignments are independent of each other.
    self.cmd = self.CMD_DEFAULT
    self.exitCode = APP_CONSTS.EXIT_SUCCESS
    self.logger = None
77 
78 
79  # # setup application
def setup(self):
    """Set the application up by delegating to foundation.CementApp.setup.

    @return - None
    """
    foundation.CementApp.setup(self)
83 
84 
85  # # run application
def run(self):
    """Run the application from start to finish.

    Steps, in order: the cement base class run(), initialisation from the
    config files, the processing step, and a closing delimiter line in the log.

    @return - None
    """
    foundation.CementApp.run(self)

    # Config and logger must be ready before process() writes to the log.
    self.__initApp()
    self.process()

    # Mark the end of this run in the log.
    self.logger.info(APP_CONSTS.LOGGER_DELIMITER_LINE)
98 
99 
100  # #initialize application from config files
101  #
102  # @param - None
103  # @return - None
def __initApp(self):
    """Initialise the application from its config files.

    Reads the application config named by the parsed `config` argument and
    passes the log config file name it returns on to the log config loader.

    @return - None
    @raise Exception - MSG_ERROR_LOAD_APP_CONFIG when no config argument was given
    """
    if not self.pargs.config:
        raise Exception(self.MSG_ERROR_LOAD_APP_CONFIG)

    logConfigName = self.__loadAppConfig(self.pargs.config)
    self.__loadLogConfig(logConfigName)
109 
110 
111  # #load application config file
112  #
113  # @param configName - name of application config file
114  # @return - log config file name
115  def __loadAppConfig(self, configName):
116  # variable for result
117  confLogFileName = ""
118 
119  try:
120  config = ConfigParser.ConfigParser()
121  config.optionxform = str
122 
123  readOk = config.read(configName)
124 
125  if len(readOk) == 0:
126  raise Exception(self.MSG_ERROR_WRONG_CONFIG_FILE_NAME + ": " + configName)
127 
128  if config.has_section(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME):
129  confLogFileName = str(config.get(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME,
131  self.cmd = str(config.get(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME,
133 
134  except Exception, err:
135  raise Exception(self.MSG_ERROR_LOAD_APP_CONFIG + ' ' + str(err))
136 
137  return confLogFileName
138 
139 
140  # #load log config file
141  #
142  # @param configName - name of log rtc-finalizer config file
143  # @return - None
def __loadLogConfig(self, configName):
    """Load the logging configuration and create the application logger.

    @param configName - name of the log config file
    @return - None
    @raise Exception - MSG_ERROR_READ_LOG_CONFIG followed by the text of the
                       underlying error, when configName is an empty string
                       or when configuring logging / creating the logger fails
    """
    try:
        # An empty name is rejected explicitly; the error is re-wrapped by the
        # handler below like any other failure.
        if isinstance(configName, str) and not configName:
            raise Exception(self.MSG_ERROR_EMPTY_CONFIG_FILE_NAME)

        logging.config.fileConfig(configName)

        # The logger itself comes from the project's MPLogger helper.
        self.logger = Utils.MPLogger().getLogger()

    # "except ... as" is valid from Python 2.6 on and is the only form that
    # Python 3 accepts; the old "except Exception, err" form is 2.x only.
    except Exception as err:
        raise Exception(self.MSG_ERROR_READ_LOG_CONFIG + ' ' + str(err))
156 
157 
def getURLFetchJson(self):
    """Read the URLFetch request from standard input and parse it as JSON.

    The raw text is written to the debug log (through varDump) before parsing.

    @return - the object produced by json.loads for the text read from stdin
    """
    rawRequest = sys.stdin.read()

    dumpedRequest = varDump(rawRequest)
    self.logger.debug(self.MSG_DEBUG_INPUT_PICKLE + '\n' + dumpedRequest)

    parsedRequest = json.loads(rawRequest)
    return parsedRequest
163 
164 
def createOutputPickle(self, inputUrlFetchJson):
    """Build the pickled db-task request from the parsed URLFetch JSON.

    The JSON is deserialized with DCCObjectsSerializator.URLFetchDeserialize,
    wrapped together with the URL_FETCH event type in a
    DC_CONSTS.DRCESyncTasksCover object and pickled. Both the event object and
    the resulting pickle are written to the debug log.

    @param inputUrlFetchJson - parsed URLFetch request
    @return - pickled DRCESyncTasksCover object
    """
    urlFetchEventType = DC_CONSTS.EVENT_TYPES.URL_FETCH

    fetchEvent = DCCObjectsSerializator().URLFetchDeserialize(inputUrlFetchJson)
    self.logger.debug(self.MSG_DEBUG_OUTPUT_PICKLE + '\n' + str(fetchEvent))

    tasksCover = DC_CONSTS.DRCESyncTasksCover(urlFetchEventType, fetchEvent)
    pickledCover = pickle.dumps(tasksCover)
    self.logger.debug(self.MSG_DEBUG_OUTPUT_PICKLE + '\n' + str(pickledCover))

    return pickledCover
175 
176 
def sendToDbTask(self, outputPickle):
    """Run the db-task command, feed it the pickle and return what it prints.

    The command in self.cmd is started through the shell, outputPickle is
    written to its stdin and its stdout is collected. The exit code of the
    command is stored in self.exitCode.

    @param outputPickle - pickled request to pass to the command on stdin
    @return - raw stdout of the command
    @raise Exception - whatever pickle.loads raises when the command's stdout
                       is not a valid pickle; the exit code and raw output are
                       written to the error log before the exception propagates
    """
    # NOTE(review): self.cmd can come from the config file and is executed
    # with shell=True, so it must only ever hold a trusted command line.
    child = Popen(self.cmd, stdout=PIPE, stdin=PIPE, shell=True, close_fds=True)
    # stderr is not piped, so `err` is always None here.
    (output, err) = child.communicate(input=outputPickle)
    self.exitCode = child.wait()

    summary = self.MSG_INFO_PROCESSOR_EXIT_CODE + str(self.exitCode) + '\n' + \
        self.MSG_INFO_PROCESSOR_OUTPUT + str(output) + '\n' + \
        self.MSG_INFO_PROCESSOR_ERROR + str(err)

    try:
        # NOTE(review): unpickling can execute arbitrary code; this is only
        # acceptable while the command behind self.cmd is trusted.
        decodedOutput = str(pickle.loads(output))
    except Exception:
        # The reply is not a valid pickle (for example the command failed and
        # printed nothing). Record the exit code and the raw output first,
        # otherwise they would be lost, then let the caller see the error.
        self.logger.error(summary)
        raise

    self.logger.info(summary + '\n' + decodedOutput)

    return output
186 
187 
def sendPickle(self, output_pickle):
    """Write the resulting pickle to standard output.

    A debug record is logged once the data has been written.

    @param output_pickle - data to write to stdout
    @return - None
    """
    stdoutStream = sys.stdout
    stdoutStream.write(output_pickle)

    self.logger.debug(self.MSG_DEBUG_SEND_PICKLE)
191 
192 
193  def process(self):
194 
195  self.logger.info('self.cmd: ' + str(self.cmd))
196  try:
197  # chain of necessary calls for processing
199  except Exception, err:
200  self.logger.error(self.MSG_ERROR_EXIT_STATUS + str(err))
201  self.exitCode = APP_CONSTS.EXIT_FAILURE
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
-mask-info