4 HCE project, Python bindings, Distributed Tasks Manager application. 5 SocialModule is a module class that provides the main functionality for calling the social task module. 7 @package: dc_postprocessor 9 @author Alexander Vybornyh <alexander.hce.cluster@gmail.com> 10 @link: http://hierarchical-cluster-engine.com/ 11 @copyright: Copyright © 2013-2017 IOIX Ukraine 12 @license: http://hierarchical-cluster-engine.com/license/ 17 import cPickle
as pickle
26 from subprocess
import Popen
27 from subprocess
import PIPE
# NOTE(review): the original indentation is not recoverable from this listing;
# these constants may be class attributes rather than module globals - confirm
# against the original file before merging.

# Property / parameter names. PARAM_USER_PROXY is the parameter that the
# ERROR_MSG_LOAD_USER_PROXY message reports as being loaded from a site
# property; the other two are presumably property keys - verify at the use
# sites.
SOCIAL_RATE_PROPERTY_NAME = 'SOCIAL_RATE'
USER_PROXY_PROPERTY_NAME = 'USER_PROXY'
PARAM_USER_PROXY = 'user_proxy'

# Option names. The execution type is read from the config file (see
# ERROR_MSG_EXECUTION_TYPE); the remaining names are presumably looked up the
# same way - TODO confirm.
OPTION_EXECUTION_LOCAL = 'executionLocal'
OPTION_EXECUTION_REMOTE = 'executionRemote'
OPTION_EXECUTION_TYPE = 'executionType'
OPTION_DB_TASK_INI = 'db_task_ini'

# Numeric execution type codes; local execution is the default.
EXECUTION_TYPE_LOCAL = 0
EXECUTION_TYPE_REMOTE = 1
EXECUTION_TYPE_DEFAULT = EXECUTION_TYPE_LOCAL

# Names identifying the temporary input and output files, plus the list of
# both names.
TMP_INPUT_FILE_NAME = 'in'
TMP_OUTPUT_FILE_NAME = 'out'
TMP_FILE_NAMES_LIST = [TMP_INPUT_FILE_NAME, TMP_OUTPUT_FILE_NAME]
60 MACRO_INPUT_FILE =
'%INPUT_FILE%' 61 MACRO_OUTPUT_FILE =
'%OUTPUT_FILE%' 62 MACRO_USER_NAME =
'%USER_NAME%' 65 ERROR_MSG_INITIALIZATION_CALLBACK =
"Error initialization of callback function for get config options." 66 ERROR_MSG_INITIALIZATION_LOGGER =
"Error initialization of self.logger." 67 ERROR_MSG_EXECUTION_TYPE =
"Wrong execution type ( %s ) was got from config file." 68 ERROR_MSG_EXECUTION_CMD_EMPTY =
"Execution command line is empty." 69 ERROR_MSG_CREATION_DBTASK_WRAPPER =
"Creation DBTaskWrapper failed. Error: %s" 70 ERROR_MSG_LOAD_USER_PROXY =
"Load parameter '" + PARAM_USER_PROXY +
"' from site property failed. Error: %s" 74 def __init__(self, getConfigOption=None, log=None):
75 PostProcessingModuleClass.__init__(self, getConfigOption, log)
104 configParser = ConfigParser.ConfigParser()
107 except Exception, err:
120 executionType = int(self.
getConfigOption(sectionName=self.__class__.__name__,
149 self.
logger.debug(
"Popen: %s", str(cmd))
150 process = Popen(cmd, stdout=PIPE, stdin=PIPE, stderr=PIPE, shell=
True, close_fds=
True, executable=
'/bin/bash')
151 self.
logger.debug(
"process.communicate(), len(inputStream)=" + str(len(inputStream)))
152 (output, err) = process.communicate(input=inputStream)
153 self.
logger.debug(
"Process std_error=: %s", str(err))
154 self.
logger.debug(
"Process output len=:" + str(len(output)))
155 exitCode = process.wait()
156 self.
logger.debug(
"Process response exitCode: %s", str(exitCode))
158 return output, exitCode
169 files[name] = tempfile.NamedTemporaryFile(delete=
False)
179 if isinstance(tempFiles, dict):
180 for f
in tempFiles.values():
181 if f
is not None and os.path.isfile(f.name):
218 ret = templateCmdLine
221 if isinstance(tempFiles, dict):
257 self.
logger.debug(
"!!! source: %s", str(userProxyJsonWrapper.getSource()))
258 self.
logger.debug(
"!!! proxies: %s", str(userProxyJsonWrapper.getProxies()))
260 if userProxyJsonWrapper.getSource() == UserProxyJsonWrapper.SOURCE_DATABASE:
261 self.
logger.debug(
"Getting proxies list from DB.")
263 self.
logger.debug(
"!!! batchItem.siteId: %s", str(batchItem.siteId))
266 proxiesList = proxyWrapper.getEnaibledProxies(batchItem.siteId)
267 self.
logger.debug(
"!!! type: %s, proxiesList: %s", str(
type(proxiesList)), str(proxiesList))
269 userProxyJsonWrapper.addProxyList(proxiesList)
270 userProxyJsonWrapper.setSource(UserProxyJsonWrapper.SOURCE_PROPERTY)
271 self.
logger.debug(
"!!! userProxyJsonWrapper.getProxies(): %s", str(userProxyJsonWrapper.getProxies()))
272 self.
logger.debug(
"!!! userProxyJsonWrapper.getSource(): %s", str(userProxyJsonWrapper.getSource()))
276 except Exception, err:
279 self.
logger.debug(
"!!! batchItem.properties: %s", str(batchItem.properties))
291 if isinstance(batch, Batch):
294 for i
in xrange(len(batch.items)):
298 if len(localBatchItems) > 0:
299 localBatch =
Batch(batchId=batch.id)
300 localBatch.items = localBatchItems
301 self.
logger.debug(
"Accumulated %s items from %s total for send to SocialTask",
302 str(len(localBatchItems)), str(len(batch.items)))
305 self.
logger.debug(
"Recived %s items from SocialTask", str(len(localBatch.items)))
309 for i
in xrange(len(batch.items)):
310 for batchItem
in localBatch.items:
311 if batch.items[i].urlId == batchItem.urlId
and batch.items[i].siteId == batchItem.siteId:
312 batch.items[i] = batchItem
313 self.
logger.debug(
"Found result for %s", str(batch.items[i].urlId))
317 self.
logger.debug(
"Found results for %s items", str(foundCount))
333 self.
logger.debug(
"!!! tempFiles: %s", str(tempFiles))
335 if self.
cmd is None or self.
cmd ==
"":
340 self.
logger.debug(
"!!! template cmd: %s", str(self.
cmd))
342 self.
logger.debug(
"!!! execute cmd: %s", str(cmd))
345 self.
logger.debug(
"!!! output: %s", str(output))
347 if int(exitCode) == 0:
350 except Exception, err:
string TMP_INPUT_FILE_NAME
def __fillUserProxyData(self, batchItem)
string OPTION_EXECUTION_TYPE
string ERROR_MSG_CREATION_DBTASK_WRAPPER
int EXECUTION_TYPE_DEFAULT
def executeCommand(self, cmd, inputStream='')
string ERROR_MSG_EXECUTION_TYPE
def __init__(self, getConfigOption=None, log=None)
def __makeInputFile(self, tempFiles, inputBatch)
string OPTION_EXECUTION_REMOTE
string ERROR_MSG_INITIALIZATION_CALLBACK
def executeSocialTask(self, inputBatch)
string ERROR_MSG_EXECUTION_CMD_EMPTY
string OPTION_EXECUTION_LOCAL
string TMP_OUTPUT_FILE_NAME
int EXECUTION_TYPE_REMOTE
string SOCIAL_RATE_PROPERTY_NAME
def __createTemporaryFiles(self)
def processBatch(self, batch)
string ERROR_MSG_INITIALIZATION_LOGGER
def __removeTemporaryFiles(self, tempFiles)
def __readOutputFile(self, tempFiles)
def __makeCmdLine(self, tempFiles, templateCmdLine)
string OPTION_DB_TASK_INI
string ERROR_MSG_LOAD_USER_PROXY