HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
UrlsToBatchTask.py
Go to the documentation of this file.
1 """
2 HCE project, Python bindings, Distributed Tasks Manager application.
3 Converter of the list of the URLs object from the URLFetch request to the Batch object.
4 Used for the processing batching as part of the regular processing on DC service.
5 
6 @package: dc
7 @file UrlsToBatchTask.py
8 @author Oleksii, bgv <developers.hce@gmail.com>, Alexander Vybornyh <alexander.hce.cluster@gmail.com>
9 @link: http://hierarchical-cluster-engine.com/
10 @copyright: Copyright &copy; 2013-2015 IOIX Ukraine
11 @license: http://hierarchical-cluster-engine.com/license/
12 @since: 0.1
13 """
14 
15 
16 import sys
17 import logging.config
18 import ConfigParser
19 import ctypes
20 import zlib
21 import time
22 try:
23  import cPickle as pickle
24 except ImportError:
25  import pickle
26 from cement.core import foundation
27 
28 from transport.IDGenerator import IDGenerator
29 from dc.EventObjects import Batch
30 from dc.EventObjects import BatchItem
31 import dc.EventObjects as dc_event
32 from app.Utils import varDump
33 from app.Utils import getHash
34 import app.Utils as Utils
35 import app.Consts as APP_CONSTS
36 
37 
38 
# # UrlsToBatchTask converts the list of URL objects received (pickled) on stdin from a
# URLFetch request into a Batch object, which is pickled to stdout.
# The class inherits from foundation.CementApp.
#
class UrlsToBatchTask(foundation.CementApp):

    # # Numeric exit code used when the produced batch contains no items
    STATUS_EMPTY_BATCH = 2

    # # Error / debug messages used in the class
    MSG_ERROR_EMPTY_CONFIG_FILE_NAME = "Config file name is empty."
    MSG_ERROR_WRONG_CONFIG_FILE_NAME = "Config file name is wrong"
    MSG_ERROR_LOAD_APP_CONFIG = "Error loading application config file."
    MSG_ERROR_READ_LOG_CONFIG = "Error read log config file."

    MSG_ERROR_EXIT_STATUS = "Execution"
    MSG_DEBUG_INPUT_PICKLE = "Input pickle: "
    MSG_DEBUG_INPUT_UNPICKLE = "Input unpickle: "
    MSG_DEBUG_LEN_URL_LIST = "Input url list count: "
    MSG_DEBUG_INPUT_URL_LIST = "Append url: "
    MSG_DEBUG_UNIQ_URL_LIST = "Append uniq url: "
    MSG_DEBUG_OUTPUT_BATCH_ITEM = "Output batch item: "
    MSG_DEBUG_OUTPUT_BATCH = "Output batch: "
    MSG_DEBUG_OUTPUT_PICKLE = "Output pickle: "
    MSG_DEBUG_SEND_PICKLE = "Send pickle. Done."
    MSG_ERROR_UNKNOWN_EXCEPTION = "Unknown exception!"
    MSG_DEBUG_EMPTY_BATCH = "Empty Batch, exit code " + str(STATUS_EMPTY_BATCH)

    # # Option names read from the application section of the config file
    URLS_TO_BATCH_TASK_OPTION_LOG = "log"


    # Mandatory
    class Meta(object):
        label = APP_CONSTS.URLS_TO_BATCH_TASK_APP_NAME

        def __init__(self):
            pass


    # # constructor
    def __init__(self):
        # call base class __init__ method
        foundation.CementApp.__init__(self)

        # logger is created later by __loadLogConfig()
        self.logger = None
        self.exitCode = APP_CONSTS.EXIT_SUCCESS


    # # setup application
    def setup(self):
        # call base class setup method
        foundation.CementApp.setup(self)


    # # run application: load configs, process stdin -> stdout, finish logging
    def run(self):
        # call base class run method
        foundation.CementApp.run(self)

        # call initialization application (raises on config errors)
        self.__initApp()

        # call internal processing; failures are reflected in self.exitCode
        self.process()

        # Finish logging
        self.logger.info(APP_CONSTS.LOGGER_DELIMITER_LINE)


    # # initialize application from config files
    #
    # @param - None
    # @return - None
    # @raise Exception - if no config file was passed on the command line
    def __initApp(self):
        if self.pargs.config:
            self.__loadLogConfig(self.__loadAppConfig(self.pargs.config))
        else:
            raise Exception(self.MSG_ERROR_LOAD_APP_CONFIG)


    # # load application config file
    #
    # @param configName - name of application config file
    # @return - log config file name ("" if the application section is absent)
    # @raise Exception - if the file cannot be read or parsed
    def __loadAppConfig(self, configName):
        # variable for result
        confLogFileName = ""

        try:
            config = ConfigParser.ConfigParser()
            # keep option names case-sensitive
            config.optionxform = str

            # read() returns the list of files successfully parsed
            readOk = config.read(configName)

            if not readOk:
                raise Exception(self.MSG_ERROR_WRONG_CONFIG_FILE_NAME + ": " + configName)

            if config.has_section(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME):
                # log config path comes from the 'log' option of the application section
                confLogFileName = str(config.get(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME,
                                                 self.URLS_TO_BATCH_TASK_OPTION_LOG))

        except Exception as err:
            raise Exception(self.MSG_ERROR_LOAD_APP_CONFIG + ' ' + str(err))

        return confLogFileName


    # # load log config file and create the logger
    #
    # @param configName - name of log config file
    # @return - None
    # @raise Exception - if the name is empty or logging cannot be configured
    def __loadLogConfig(self, configName):
        try:
            if isinstance(configName, str) and len(configName) == 0:
                raise Exception(self.MSG_ERROR_EMPTY_CONFIG_FILE_NAME)

            logging.config.fileConfig(configName)

            # call rotation log files and initialization logger
            self.logger = Utils.MPLogger().getLogger()

        except Exception as err:
            raise Exception(self.MSG_ERROR_READ_LOG_CONFIG + ' ' + str(err))


    # # read the whole raw input pickle from stdin
    #
    # @return - raw pickled data as read from stdin
    def getInputPickle(self):
        return sys.stdin.read()


    # # unpickle the input and extract its eventObject
    #
    # @param input_pickle - raw pickled data
    # @return - the eventObject attribute of the unpickled object
    def unpickleInput(self, input_pickle):
        # SECURITY NOTE: pickle.loads executes arbitrary code embedded in the data;
        # stdin must only ever be fed by a trusted DC/DTM process.
        input_unpickled = pickle.loads(input_pickle)
        self.logger.debug('>>> input_unpickled: ' + Utils.varDump(input_unpickled))

        input_unpickled_obj = input_unpickled.eventObject
        self.logger.debug(self.MSG_DEBUG_INPUT_UNPICKLE + '\n' + Utils.varDump(input_unpickled_obj))

        return input_unpickled_obj


    # # log and return the list of URL objects
    #
    # @param input_unpickled_obj - list of URL objects
    # @return - the same list
    def loadListOfURLs(self, input_unpickled_obj):
        list_of_url_obj = input_unpickled_obj
        self.logger.info(self.MSG_DEBUG_LEN_URL_LIST + str(len(list_of_url_obj)))
        self.logger.debug(self.MSG_DEBUG_INPUT_URL_LIST + varDump(list_of_url_obj))

        return list_of_url_obj


    # # remove URL objects with duplicate urlMd5, keeping the first occurrence and order
    #
    # @param list_of_url_obj - list of URL objects
    # @return - new list with unique urlMd5 values
    def getListOfUniqueURLs(self, list_of_url_obj):
        seen = set()
        list_of_uniq_urls = []
        for url_obj in list_of_url_obj:
            if url_obj.urlMd5 not in seen:
                seen.add(url_obj.urlMd5)
                list_of_uniq_urls.append(url_obj)
        self.logger.debug(self.MSG_DEBUG_UNIQ_URL_LIST + Utils.varDump(list_of_uniq_urls))

        return list_of_uniq_urls


    # # create a new batch id from a fresh connection uid
    #
    # @return - batch id (also stored in self.id)
    def createBatchId(self):
        idGenerator = IDGenerator()
        batch_id = self.id = getHash(idGenerator.get_connection_uid())

        return batch_id


    # # wrap every URL object into a BatchItem
    # NOTE: sets contentMask of each URL object in place.
    #
    # @param list_of_uniq_urls - list of URL objects
    # @return - list of BatchItem objects
    def createBatchItems(self, list_of_uniq_urls):
        list_of_batch_items = []
        for url_obj in list_of_uniq_urls:
            url_obj.contentMask = dc_event.URL.CONTENT_STORED_ON_DISK
            batch_item = BatchItem(url_obj.siteId, url_obj.urlMd5, url_obj)
            self.logger.debug(self.MSG_DEBUG_OUTPUT_BATCH_ITEM + Utils.varDump(batch_item))
            list_of_batch_items.append(batch_item)

        return list_of_batch_items


    # # build the output Batch object
    #
    # @param batch_id - batch id
    # @param list_of_batch_items - list of BatchItem objects
    # @return - Batch object
    def createOutputBatch(self, batch_id, list_of_batch_items):
        output_batch = Batch(batch_id, list_of_batch_items)
        self.logger.info("Output batch id: %s, items: %s", str(output_batch.id), str(len(output_batch.items)))
        self.logger.debug(self.MSG_DEBUG_OUTPUT_BATCH + varDump(output_batch))

        return output_batch


    # # serialize the output batch
    #
    # @param output_batch - Batch object
    # @return - pickled data
    def createOutputPickle(self, output_batch):
        return pickle.dumps(output_batch)


    # # write the pickled batch to stdout
    #
    # @param output_pickle - pickled data
    # @return - None
    def sendPickle(self, output_pickle):
        sys.stdout.write(output_pickle)
        # make sure the whole payload leaves the process buffer before we report success
        sys.stdout.flush()
        self.logger.debug(self.MSG_DEBUG_SEND_PICKLE)


    # # main processing: stdin pickle -> URL list -> Batch -> stdout pickle
    # Sets self.exitCode to STATUS_EMPTY_BATCH for an empty batch and to
    # APP_CONSTS.EXIT_FAILURE on any error.
    def process(self):
        try:
            input_pickle = self.getInputPickle()
            input_unpickled_obj = self.unpickleInput(input_pickle)
            list_of_url_obj = self.loadListOfURLs(input_unpickled_obj)
            # de-duplication via getListOfUniqueURLs() is intentionally disabled here
            list_of_uniq_urls = list_of_url_obj
            batch_id = self.createBatchId()

            self.logger.debug('>>> list_of_uniq_urls: ' + varDump(list_of_uniq_urls))

            list_of_batch_items = self.createBatchItems(list_of_uniq_urls)
            output_batch = self.createOutputBatch(batch_id, list_of_batch_items)
            output_pickle = self.createOutputPickle(output_batch)
            self.sendPickle(output_pickle)

            if len(output_batch.items) == 0:
                self.logger.debug(self.MSG_DEBUG_EMPTY_BATCH)
                self.exitCode = self.STATUS_EMPTY_BATCH

        except Exception as err:
            # set the failure code first, then record the error with its traceback
            # so the failure is not silently swallowed
            self.exitCode = APP_CONSTS.EXIT_FAILURE
            self.logger.error(self.MSG_ERROR_UNKNOWN_EXCEPTION + ' ' + str(err), exc_info=True)

def sendPickle(self, output_pickle)
def createBatchItems(self, list_of_uniq_urls)
def createOutputBatch(self, batch_id, list_of_batch_items)
def __loadLogConfig(self, configName)
def __loadAppConfig(self, configName)
def unpickleInput(self, input_pickle)
def loadListOfURLs(self, input_unpickled_obj)
def createOutputPickle(self, output_batch)
def getListOfUniqueURLs(self, list_of_url_obj)
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
IDGenerator is used to generate unique id for connections.
Definition: IDGenerator.py:15
def getHash(strBuf, binSize=32, digestType=0, fixedMode=0, valLimit=18446744073709552000L)
Definition: Utils.py:1649