2 HCE project, Python bindings, Distributed Tasks Manager application. 3 Converts the list of URL objects from the URLFetch request into a Batch object. 4 Used for batch processing as part of the regular processing on the DC service. 7 @file UrlsToBatchTask.py 8 @author Oleksii, bgv <developers.hce@gmail.com>, Alexander Vybornyh <alexander.hce.cluster@gmail.com> 9 @link: http://hierarchical-cluster-engine.com/ 10 @copyright: Copyright © 2013-2015 IOIX Ukraine 11 @license: http://hierarchical-cluster-engine.com/license/ 23 import cPickle
as pickle
26 from cement.core
import foundation
45 STATUS_EMPTY_BATCH = 2
48 MSG_ERROR_EMPTY_CONFIG_FILE_NAME =
"Config file name is empty." 49 MSG_ERROR_WRONG_CONFIG_FILE_NAME =
"Config file name is wrong" 50 MSG_ERROR_LOAD_APP_CONFIG =
"Error loading application config file." 51 MSG_ERROR_READ_LOG_CONFIG =
"Error read log config file." 53 MSG_ERROR_EXIT_STATUS =
"Execution" 54 MSG_DEBUG_INPUT_PICKLE =
"Input pickle: " 55 MSG_DEBUG_INPUT_UNPICKLE =
"Input unpickle: " 56 MSG_DEBUG_LEN_URL_LIST =
"Input url list count: " 57 MSG_DEBUG_INPUT_URL_LIST =
"Append url: " 58 MSG_DEBUG_UNIQ_URL_LIST =
"Append uniq url: " 59 MSG_DEBUG_OUTPUT_BATCH_ITEM =
"Output batch item: " 60 MSG_DEBUG_OUTPUT_BATCH =
"Output batch: " 61 MSG_DEBUG_OUTPUT_PICKLE =
"Output pickle: " 62 MSG_DEBUG_SEND_PICKLE =
"Send pickle. Done." 63 MSG_ERROR_UNKNOWN_EXCEPTION =
"Unknown exception!" 64 MSG_DEBUG_EMPTY_BATCH =
"Empty Batch, exit code " + str(STATUS_EMPTY_BATCH)
67 URLS_TO_BATCH_TASK_OPTION_LOG =
"log" 72 label = APP_CONSTS.URLS_TO_BATCH_TASK_APP_NAME
80 foundation.CementApp.__init__(self)
89 foundation.CementApp.setup(self)
95 foundation.CementApp.run(self)
104 self.
logger.info(APP_CONSTS.LOGGER_DELIMITER_LINE)
112 if self.pargs.config:
127 config = ConfigParser.ConfigParser()
128 config.optionxform = str
130 readOk = config.read(configName)
135 if config.has_section(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME):
136 confLogFileName = str(config.get(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME,
139 except Exception, err:
142 return confLogFileName
151 if isinstance(configName, str)
and len(configName) == 0:
154 logging.config.fileConfig(configName)
159 except Exception, err:
165 input_pickle = sys.stdin.read()
173 input_unpickled = pickle.loads(input_pickle)
174 self.
logger.debug(
'>>> input_unpickled: ' + Utils.varDump(input_unpickled))
176 input_unpickled_obj = input_unpickled.eventObject
179 return input_unpickled_obj
183 list_of_url_obj = input_unpickled_obj
187 return list_of_url_obj
192 list_of_uniq_urls = [url_obj
for url_obj
in list_of_url_obj
if url_obj.urlMd5
not in seen
and 193 not seen.add(url_obj.urlMd5)]
196 return list_of_uniq_urls
202 batch_id = self.
id =
getHash(idGenerator.get_connection_uid())
208 list_of_batch_items = []
209 for url_obj
in list_of_uniq_urls:
210 url_obj.contentMask = dc_event.URL.CONTENT_STORED_ON_DISK
211 site_id = url_obj.siteId
212 url_id = url_obj.urlMd5
213 batch_item =
BatchItem(site_id, url_id, url_obj)
215 list_of_batch_items.append(batch_item)
217 return list_of_batch_items
221 output_batch =
Batch(batch_id, list_of_batch_items)
222 self.
logger.info(
"Output batch id: %s, items: %s", str(output_batch.id), str(len(output_batch.items)))
229 output_pickle = pickle.dumps(output_batch)
236 sys.stdout.write(output_pickle)
246 list_of_uniq_urls = list_of_url_obj
249 self.
logger.debug(
'>>> list_of_uniq_urls: ' +
varDump(list_of_uniq_urls))
256 if len(output_batch.items) == 0:
261 self.
exitCode = APP_CONSTS.EXIT_FAILURE
string URLS_TO_BATCH_TASK_OPTION_LOG
def sendPickle(self, output_pickle)
def createBatchItems(self, list_of_uniq_urls)
string MSG_DEBUG_EMPTY_BATCH
def createOutputBatch(self, batch_id, list_of_batch_items)
string MSG_ERROR_EMPTY_CONFIG_FILE_NAME
def __loadLogConfig(self, configName)
def __loadAppConfig(self, configName)
string MSG_DEBUG_OUTPUT_BATCH_ITEM
string MSG_DEBUG_INPUT_URL_LIST
string MSG_DEBUG_INPUT_UNPICKLE
def unpickleInput(self, input_pickle)
string MSG_DEBUG_UNIQ_URL_LIST
string MSG_ERROR_WRONG_CONFIG_FILE_NAME
def loadListOfURLs(self, input_unpickled_obj)
def createOutputPickle(self, output_batch)
string MSG_ERROR_READ_LOG_CONFIG
string MSG_ERROR_LOAD_APP_CONFIG
string MSG_DEBUG_SEND_PICKLE
def getListOfUniqueURLs(self, list_of_url_obj)
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
string MSG_DEBUG_OUTPUT_BATCH
IDGenerator is used to generate unique ids for connections.
def getHash(strBuf, binSize=32, digestType=0, fixedMode=0, valLimit=18446744073709552000L)
string MSG_DEBUG_LEN_URL_LIST