HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
SocialModule.py
Go to the documentation of this file.
1 # coding: utf-8
2 
3 """
4 HCE project, Python bindings, Distributed Tasks Manager application.
5 SocialModule is a module class that provides the main functionality for calling the social task module.
6 
7 @package: dc_postprocessor
8 @file SocialModule.py
9 @author Alexander Vybornyh <alexander.hce.cluster@gmail.com>
10 @link: http://hierarchical-cluster-engine.com/
11 @copyright: Copyright &copy; 2013-2017 IOIX Ukraine
12 @license: http://hierarchical-cluster-engine.com/license/
13 @since: 0.1
14 """
15 
16 try:
17  import cPickle as pickle
18 except ImportError:
19  import pickle
20 
21 import os
22 import json
23 import getpass
24 import tempfile
25 import ConfigParser
26 from subprocess import Popen
27 from subprocess import PIPE
28 
29 from dc.EventObjects import Batch
30 from dc_postprocessor.PostProcessingModuleClass import PostProcessingModuleClass
31 from dc_crawler.DBTasksWrapper import DBTasksWrapper
32 from dc_crawler.DBProxyWrapper import DBProxyWrapper
33 from dc_crawler.UserProxyJsonWrapper import UserProxyJsonWrapper
34 
35 
36 # This class is a run-at-once module that calls the social task module.
38 
39  # # Constants for property 'SOCIAL_RATE'
40  SOCIAL_RATE_PROPERTY_NAME = 'SOCIAL_RATE'
41  USER_PROXY_PROPERTY_NAME = 'USER_PROXY'
42  PARAM_USER_PROXY = 'user_proxy'
43 
44  # # Constants for options read from the config file
45  OPTION_EXECUTION_LOCAL = 'executionLocal'
46  OPTION_EXECUTION_REMOTE = 'executionRemote'
47  OPTION_EXECUTION_TYPE = 'executionType'
48  OPTION_DB_TASK_INI = 'db_task_ini'
49 
50  EXECUTION_TYPE_LOCAL = 0
51  EXECUTION_TYPE_REMOTE = 1
52  EXECUTION_TYPE_DEFAULT = EXECUTION_TYPE_LOCAL
53 
54  # Constants used in class
55  TMP_INPUT_FILE_NAME = 'in'
56  TMP_OUTPUT_FILE_NAME = 'out'
57  TMP_FILE_NAMES_LIST = [TMP_INPUT_FILE_NAME, TMP_OUTPUT_FILE_NAME]
58 
59  # Constants of used macro
60  MACRO_INPUT_FILE = '%INPUT_FILE%'
61  MACRO_OUTPUT_FILE = '%OUTPUT_FILE%'
62  MACRO_USER_NAME = '%USER_NAME%'
63 
64  # Constants of error messages
65  ERROR_MSG_INITIALIZATION_CALLBACK = "Error initialization of callback function for get config options."
66  ERROR_MSG_INITIALIZATION_LOGGER = "Error initialization of self.logger."
67  ERROR_MSG_EXECUTION_TYPE = "Wrong execution type ( %s ) was got from config file."
68  ERROR_MSG_EXECUTION_CMD_EMPTY = "Execution command line is empty."
69  ERROR_MSG_CREATION_DBTASK_WRAPPER = "Creation DBTaskWrapper failed. Error: %s"
70  ERROR_MSG_LOAD_USER_PROXY = "Load parameter '" + PARAM_USER_PROXY + "' from site property failed. Error: %s"
71 
72 
73  # Default initialization
def __init__(self, getConfigOption=None, log=None):
    """Create the module; the command line and the DB wrapper stay unresolved.

    @param getConfigOption - callback used to read options from the config file,
                             forwarded to PostProcessingModuleClass
    @param log - logger object, forwarded to PostProcessingModuleClass
    """
    PostProcessingModuleClass.__init__(self, getConfigOption, log)

    # Both members are filled in later by init().
    self.cmd = self.dbWrapper = None
79 
80 
81  # # initialization
82  #
83  # @param - None
84  # @return - None
def init(self):
    """Check the injected dependencies, then resolve self.cmd and self.dbWrapper.

    @return - None
    @raise Exception when the config callback or the logger is not set
    """
    # Pick the first failed precondition; the callback is checked before the logger.
    if self.getConfigOption is None:
        errorMessage = self.ERROR_MSG_INITIALIZATION_CALLBACK
    elif self.logger is None:
        errorMessage = self.ERROR_MSG_INITIALIZATION_LOGGER
    else:
        errorMessage = None

    if errorMessage is not None:
        raise Exception(errorMessage)

    self.cmd = self.__getCmd()
    self.dbWrapper = self.__getDBWrapper()
94 
95 
96  # # get DBTasksWrapper instance
97  #
98  # @param - None
99  # @return DBTasksWrapper instance
def __getDBWrapper(self):
    """Build a DBTasksWrapper from the ini file named in the module config section.

    The ini file name is read from option OPTION_DB_TASK_INI of the section
    named after this class.

    @return DBTasksWrapper instance
    @raise Exception with ERROR_MSG_CREATION_DBTASK_WRAPPER when anything fails
    """
    try:
        iniFileName = self.getConfigOption(sectionName=self.__class__.__name__,
                                           optionName=self.OPTION_DB_TASK_INI)
        configParser = ConfigParser.ConfigParser()
        # ConfigParser.read() skips files it cannot open without raising, so a wrong
        # path only shows up when DBTasksWrapper looks for its options.
        configParser.read(iniFileName)
        return DBTasksWrapper(configParser)
    except Exception as err:
        raise Exception(self.ERROR_MSG_CREATION_DBTASK_WRAPPER % str(err))
111 
112 
113  # # get command line using parameters from config file
114  #
115  # @param - None
116  # @return - command line options as string
def __getCmd(self):
    """Read the command line template for the configured execution type.

    The execution type is read from option OPTION_EXECUTION_TYPE (default
    EXECUTION_TYPE_DEFAULT) and selects which option holds the command line.

    @return command line template as string
    @raise Exception when the execution type is unknown or the command line
           is missing, empty or consists of whitespace only
    @raise ValueError when the execution type option is not an integer
    """
    sectionName = self.__class__.__name__
    executionType = int(self.getConfigOption(sectionName=sectionName,
                                             optionName=self.OPTION_EXECUTION_TYPE,
                                             defaultValue=self.EXECUTION_TYPE_DEFAULT))

    # Execution type -> name of the option that stores its command line.
    optionNames = {self.EXECUTION_TYPE_LOCAL: self.OPTION_EXECUTION_LOCAL,
                   self.EXECUTION_TYPE_REMOTE: self.OPTION_EXECUTION_REMOTE}
    if executionType not in optionNames:
        raise Exception(self.ERROR_MSG_EXECUTION_TYPE % str(executionType))

    ret = self.getConfigOption(sectionName=sectionName,
                               optionName=optionNames[executionType],
                               defaultValue='')

    # Reject None and blank values here instead of failing later on every batch.
    if not ret or not ret.strip():
        raise Exception(self.ERROR_MSG_EXECUTION_CMD_EMPTY)

    return ret
141 
142 
143  # # execute command line command
144  #
145  # @param cmd - command line string
146  # @param inputStream - input stream to popen
147  # @return result object of execution and exit code
def executeCommand(self, cmd, inputStream=''):
    """Run a command line through /bin/bash and collect its standard output.

    @param cmd - command line string
    @param inputStream - data passed to the standard input of the process
    @return tuple of (standard output of the process, exit code)
    """
    debug = self.logger.debug

    debug("Popen: %s", str(cmd))
    child = Popen(cmd, stdout=PIPE, stdin=PIPE, stderr=PIPE, shell=True, close_fds=True, executable='/bin/bash')

    debug("process.communicate(), len(inputStream)=" + str(len(inputStream)))
    stdoutData, stderrData = child.communicate(input=inputStream)
    debug("Process std_error=: %s", str(stderrData))
    debug("Process output len=:" + str(len(stdoutData)))

    exitCode = child.wait()
    debug("Process response exitCode: %s", str(exitCode))

    return stdoutData, exitCode
159 
160 
161  # # create temporary files
162  #
163  # @param - None
164  # @return dict of handlers temporary files (key - name, value -handler)
def __createTemporaryFiles(self):
    """Create one named temporary file per entry of TMP_FILE_NAMES_LIST.

    The files are created with delete=False, so they stay on disk after
    their handles are closed and have to be removed explicitly.

    @return dict of temporary file handles (key - name, value - handle)
    """
    # variable for result
    files = {}
    for name in self.TMP_FILE_NAMES_LIST:
        files[name] = tempfile.NamedTemporaryFile(delete=False)

    return files
172 
173 
174  # # remove temporary files
175  #
176  # @param tempFiles - temporary files dictionary
177  # @return - None
def __removeTemporaryFiles(self, tempFiles):
    """Close and delete every temporary file of the dictionary.

    @param tempFiles - temporary files dictionary (key - name, value - handle);
                       any other type is ignored
    @return - None
    """
    if not isinstance(tempFiles, dict):
        return

    for handle in tempFiles.values():
        if handle is None:
            continue

        # Unlinking alone leaves the descriptor open; close first. Closing a
        # handle that is already closed is harmless.
        try:
            handle.close()
        except (IOError, OSError) as err:
            self.logger.debug("Close of temporary file '%s' failed: %s", str(handle.name), str(err))

        if os.path.isfile(handle.name):
            os.unlink(handle.name)
183 
184 
185  # # make input file
186  #
187  # @param tempFiles - dict of temporary file handles (key - name, value - handle)
188  # @param inputBatch - input batch instance
189  # @return - None
def __makeInputFile(self, tempFiles, inputBatch):
    """Write the pickled input batch to the temporary input file and close it.

    Nothing is done when tempFiles is not a dict or has no input file entry.

    @param tempFiles - dict of temporary file handles
    @param inputBatch - input batch instance
    @return - None
    """
    if not isinstance(tempFiles, dict) or self.TMP_INPUT_FILE_NAME not in tempFiles:
        return

    inputFile = tempFiles[self.TMP_INPUT_FILE_NAME]
    try:
        inputFile.write(pickle.dumps(inputBatch))
    finally:
        # Close even when pickling or writing fails so the handle is not leaked;
        # on success this also flushes the data to disk.
        inputFile.close()
195 
196 
197  # # read output file
198  #
199  # @param tempFiles - dict of temporary file handles (key - name, value - handle)
200  # @return extracted batch object
def __readOutputFile(self, tempFiles):
    """Read the temporary output file and unpickle its content.

    @param tempFiles - dict of temporary file handles
    @return unpickled object, or None when the output file handle is None
    """
    outputFile = tempFiles[self.TMP_OUTPUT_FILE_NAME]
    if outputFile is None:
        return None

    try:
        # NOTE(review): pickle.loads executes whatever the file content describes;
        # this is only safe while the output file is written by a trusted command.
        return pickle.loads(outputFile.read())
    finally:
        # Close even when reading or unpickling fails so the handle is not leaked.
        outputFile.close()
209 
210 
211  # # make command line for social task
212  #
213  # @param tempFiles - dict of temporary file handles (key - name, value - handle)
214  # @param templateCmdLine - template of cmd
215  # @return command line string ready for execution
def __makeCmdLine(self, tempFiles, templateCmdLine):
    """Substitute the file and user name macros of the command line template.

    Each macro is recognized in its upper-case and in its lower-case spelling.

    @param tempFiles - dict of temporary file handles
    @param templateCmdLine - template of the command line
    @return command line string ready for execution
    """
    cmdLine = templateCmdLine

    # set temporary file names
    if isinstance(tempFiles, dict):
        fileMacros = ((self.MACRO_INPUT_FILE, self.TMP_INPUT_FILE_NAME),
                      (self.MACRO_OUTPUT_FILE, self.TMP_OUTPUT_FILE_NAME))
        for macro, fileKey in fileMacros:
            for token in (macro.upper(), macro.lower()):
                if token not in cmdLine:
                    continue
                # The handle is looked up only when its macro is actually present.
                handle = tempFiles[fileKey]
                if handle is not None:
                    cmdLine = cmdLine.replace(token, handle.name)

    # set user name
    for token in (self.MACRO_USER_NAME.upper(), self.MACRO_USER_NAME.lower()):
        if token in cmdLine:
            cmdLine = cmdLine.replace(token, getpass.getuser())

    return cmdLine
242 
243 
244  # # fill user proxy data each batch item
245  #
246  # @param batchItem - batch item for update
247  # @return the same batch item, updated when the proxies list was loaded from DB
def __fillUserProxyData(self, batchItem):
    """Resolve the 'user_proxy' parameter of the SOCIAL_RATE property of one item.

    When the parameter names the database as its source, the proxies list is
    requested from the DB for the item site, added to the wrapper, the source
    is switched to SOURCE_PROPERTY and the property is serialized back.
    Any error is logged and the item is returned as it is at that moment.

    @param batchItem - batch item for update
    @return the same batch item instance
    """
    # Cheap substring test on the raw JSON text before parsing it.
    if self.PARAM_USER_PROXY in batchItem.properties[self.SOCIAL_RATE_PROPERTY_NAME]:
        try:
            socialRateProperties = json.loads(batchItem.properties[self.SOCIAL_RATE_PROPERTY_NAME])

            if self.PARAM_USER_PROXY in socialRateProperties:
                self.logger.debug("!!! user_proxy: %s", str(socialRateProperties[self.PARAM_USER_PROXY]))
                userProxyJsonWrapper = UserProxyJsonWrapper(socialRateProperties[self.PARAM_USER_PROXY])
                self.logger.debug("!!! source: %s", str(userProxyJsonWrapper.getSource()))
                self.logger.debug("!!! proxies: %s", str(userProxyJsonWrapper.getProxies()))

                if userProxyJsonWrapper.getSource() == UserProxyJsonWrapper.SOURCE_DATABASE:
                    self.logger.debug("Getting proxies list from DB.")

                    self.logger.debug("!!! batchItem.siteId: %s", str(batchItem.siteId))

                    proxyWrapper = DBProxyWrapper(self.dbWrapper)
                    proxiesList = proxyWrapper.getEnaibledProxies(batchItem.siteId)
                    self.logger.debug("!!! type: %s, proxiesList: %s", str(type(proxiesList)), str(proxiesList))

                    userProxyJsonWrapper.addProxyList(proxiesList)
                    userProxyJsonWrapper.setSource(UserProxyJsonWrapper.SOURCE_PROPERTY)
                    self.logger.debug("!!! userProxyJsonWrapper.getProxies(): %s", str(userProxyJsonWrapper.getProxies()))
                    self.logger.debug("!!! userProxyJsonWrapper.getSource(): %s", str(userProxyJsonWrapper.getSource()))

                    # NOTE(review): the serialized value carries the DB proxies only if
                    # UserProxyJsonWrapper edits the passed-in structure in place - confirm.
                    batchItem.properties[self.SOCIAL_RATE_PROPERTY_NAME] = json.dumps(socialRateProperties)

        except Exception as err:
            self.logger.error(self.ERROR_MSG_LOAD_USER_PROXY, str(err))

        self.logger.debug("!!! batchItem.properties: %s", str(batchItem.properties))

    return batchItem
283 
284 
285  # # process batch item interface method
286  #
287  # @param batch - batch instance
288  # @return the input batch with items replaced by the results of the social task
def processBatch(self, batch):
    """Send the items that carry the SOCIAL_RATE property to the social task.

    Items returned by the social task replace the items of the input batch
    that have the same urlId and siteId. An object that is not a Batch is
    reported as an error and returned untouched.

    @param batch - batch instance
    @return the input batch object
    """
    if not isinstance(batch, Batch):
        self.logger.error("Input object has type: %s", str(type(batch)))
        return batch

    # accumulate batch items for send to social task processing
    selectedItems = [self.__fillUserProxyData(item) for item in batch.items
                     if self.SOCIAL_RATE_PROPERTY_NAME in item.properties]

    if selectedItems:
        socialBatch = Batch(batchId=batch.id)
        socialBatch.items = selectedItems
        self.logger.debug("Accumulated %s items from %s total for send to SocialTask",
                          str(len(selectedItems)), str(len(batch.items)))

        socialBatch = self.executeSocialTask(socialBatch)
        self.logger.debug("Recived %s items from SocialTask", str(len(socialBatch.items)))

        # update batch items after processing of the social task
        foundCount = 0
        for index, sourceItem in enumerate(batch.items):
            for processedItem in socialBatch.items:
                sameUrl = sourceItem.urlId == processedItem.urlId
                if sameUrl and sourceItem.siteId == processedItem.siteId:
                    batch.items[index] = processedItem
                    self.logger.debug("Found result for %s", str(processedItem.urlId))
                    foundCount += 1
                    # Only the first matching result is used for each item.
                    break

        self.logger.debug("Found results for %s items", str(foundCount))

    return batch
323 
324 
325  # # Execute social task processing
326  #
327  # @param inputBatch - input batch instance
328  # @return output batch object
def executeSocialTask(self, inputBatch):
    """Run the social task command on a batch exchanged through temporary files.

    The input batch is pickled to a temporary input file, the configured
    command line is executed, and on exit code 0 the temporary output file
    is unpickled. Every failure is logged and the input batch is returned
    instead. The temporary files are always removed.

    @param inputBatch - input batch instance
    @return output batch object, or inputBatch when the task failed
    """
    # variable for result
    ret = inputBatch
    tempFiles = self.__createTemporaryFiles()
    self.logger.debug("!!! tempFiles: %s", str(tempFiles))
    try:
        if not self.cmd:
            raise Exception(self.ERROR_MSG_EXECUTION_CMD_EMPTY)

        self.__makeInputFile(tempFiles, inputBatch)

        self.logger.debug("!!! template cmd: %s", str(self.cmd))
        cmd = self.__makeCmdLine(tempFiles, self.cmd)
        self.logger.debug("!!! execute cmd: %s", str(cmd))

        output, exitCode = self.executeCommand(cmd)
        self.logger.debug("!!! output: %s", str(output))

        if int(exitCode) == 0:
            result = self.__readOutputFile(tempFiles)
            # Callers read .items of the result right away, so anything that is
            # not a Batch is rejected in favour of the input batch.
            if isinstance(result, Batch):
                ret = result
            else:
                self.logger.error("SocialTask returned unexpected object type: %s", str(type(result)))
        else:
            self.logger.error("SocialTask finished with exit code: %s", str(exitCode))

    except Exception as err:
        self.logger.error(str(err))
    finally:
        self.__removeTemporaryFiles(tempFiles)

    return ret
def executeCommand(self, cmd, inputStream='')
def __init__(self, getConfigOption=None, log=None)
Definition: SocialModule.py:74
def __makeInputFile(self, tempFiles, inputBatch)
logger
-mask-info
def __makeCmdLine(self, tempFiles, templateCmdLine)
getConfigOption