HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
ftest_PostprocessorTask.py
Go to the documentation of this file.
1 #!/usr/bin/python
2 # coding: utf-8
3 
4 try:
5  import cPickle as pickle
6 except ImportError:
7  import pickle
8 
9 import threading
10 import os
11 import sys
12 import ConfigParser
13 import base64
14 import logging
15 import json
16 import time
17 from subprocess import Popen
18 from subprocess import PIPE
19 from cement.core import foundation
20 from dc.EventObjects import URL
21 from dc.EventObjects import Batch
22 from dc.EventObjects import BatchItem
23 from dc.EventObjects import URLContentResponse
24 import app.Consts as APP_CONSTS
25 import app.Utils as Utils
26 from app.Utils import varDump
27 from dc_postprocessor.PostprocessorTask import PostprocessorTask
28 from dc_postprocessor.PostProcessingApplicationClass import PostProcessingApplicationClass
29 from dc_postprocessor.LinkResolver import LinkResolver
30 
31 
def getLogger():
    """Return the 'console' logger, emitting DEBUG and above through a StreamHandler.

    logging.getLogger() returns the same Logger object for the same name, so
    the handler created here is attached only on the first call. Attaching a
    fresh handler on every call would make each record print once per call.

    @return logging.Logger named 'console' with level DEBUG
    """
    # create logger
    logger = logging.getLogger('console')
    logger.setLevel(logging.DEBUG)

    # If a previous call already attached our handler (see marker below), reuse it.
    for handler in logger.handlers:
        if getattr(handler, 'ftestConsoleHandler', False):
            return logger

    # create console handler and set level to debug
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    # Marker so repeated calls can recognize the handler this function added.
    ch.ftestConsoleHandler = True

    # create formatter and add it to ch
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch.setFormatter(formatter)

    # add ch to logger
    logger.addHandler(ch)

    return logger
52 
53 
def getFileLogger():
    """Configure logging from the rt ini file and return the MPLogger logger.

    Temporarily changes into the parent directory so the relative config path
    resolves, then always restores the caller's working directory.

    @return logger returned by Utils.MPLogger().getLogger()
    """
    # logging.config is a submodule; a plain 'import logging' does not load it.
    import logging.config

    configName = '../ini/postprocessor_task_log-rt.ini'

    retval = os.getcwd()
    os.chdir('..')
    try:
        # read config (relative to the parent directory entered above)
        logging.config.fileConfig(configName)

        # create logger
        log = Utils.MPLogger().getLogger()
        # log = logging.getLogger(APP_CONSTS.LOGGER_NAME)
    finally:
        # Restore the original working directory even if config loading fails.
        os.chdir(retval)

    return log
69 
70 
# # execute command line command
#
# @param cmd - command line string; run through the shell (shell=True), so it
#              may contain ';', redirections, etc.
# @param inputStream - data passed to the process stdin via communicate(); None sends nothing
# @param log - optional logger for debug tracing; None disables tracing
# @return tuple (output, exitCode): captured stdout and the process exit code
def executeCommand(cmd, inputStream=None, log=None):
    # basestring exists only on Python 2; fall back to the Python 3 types so the
    # length check below cannot raise NameError.
    try:
        stringTypes = basestring  # pylint: disable=undefined-variable
    except NameError:
        stringTypes = (str, bytes)

    if log is not None:
        log.debug("Popen: %s", str(cmd))
    # NOTE: shell=True is deliberate (callers pass strings like 'cd ..; ... > file');
    # never pass untrusted text as cmd.
    process = Popen(cmd, stdout=PIPE, stdin=PIPE, stderr=PIPE, shell=True, close_fds=True)
    # Check log first so the type test is skipped entirely when not tracing.
    if log is not None and isinstance(inputStream, stringTypes):
        log.debug("process.communicate(), len(inputStream)=" + str(len(inputStream)))
    # communicate() writes stdin, drains stdout/stderr together and waits for exit.
    (output, err) = process.communicate(input=inputStream)
    if log is not None:
        log.debug("Process std_error: %s", str(err))
        log.debug("Process output len =" + str(len(output)))
    exitCode = process.wait()
    if log is not None:
        log.debug("Process response exitCode: %s", str(exitCode))

    return output, exitCode
91 
92 
def generateBatch(id=0):
    """Build a one-item Batch for a fixed retrip.jp external-link URL.

    @param id - used both as the site id of the URL/BatchItem and as the Batch id
    @return Batch containing a single BatchItem
    """
    targetUrl = 'https://retrip.jp/external-link/?article_content_id=482406'
    urlObj = URL(id, targetUrl)

    # Processed content is a base64-encoded JSON object that holds the link itself.
    encodedContent = base64.b64encode(json.dumps({'link':targetUrl}))
    contentResponse = URLContentResponse(url=targetUrl, processedContents=[encodedContent])

    item = BatchItem(siteId=id, urlId=urlObj.urlMd5, urlObj=urlObj, urlContentResponse=contentResponse)
    # Per-item properties; presumably read by the link resolver to pick the HTTP
    # method for matching URLs -- TODO confirm against LinkResolver.
    item.properties = {"LINK_RESOLVE":{"method":{"retrip.jp/external-link":"GET"}}}
    # item.properties = {}
    return Batch(id, batchItems=[item])
106 
107 
def generateInputStream(id=0):
    """Return a pickled test Batch, suitable as postprocessor_task.py --inputFile data.

    @param id - batch/site id forwarded to generateBatch(); defaults to 0
    @return pickle-serialized Batch
    """
    return pickle.dumps(generateBatch(id=id))
110 
111 
def test(id=0, log=None):
    """Run postprocessor_task.py once on a pickled test batch.

    Each call uses its own input/output temp files derived from id. The
    __main__ block runs several of these concurrently, and shared file names
    would let one thread truncate the input while another's subprocess is
    still reading it.

    @param id - test/thread identifier; selects the temp file names
    @param log - optional logger for debug tracing
    """
    inputFile = '/tmp/input_%s.tmp' % str(id)
    outputFile = '/tmp/output_%s.tmp' % str(id)

    cmd = "cd ..; ../bin/postprocessor_task.py --config=../ini/postprocessor_task-rt.ini --inputFile=%s > %s" % (str(inputFile), str(outputFile))

    # Pickle data is bytes: write in binary mode and close the file before the run.
    with open(inputFile, 'wb') as f:
        f.write(generateInputStream())

    output, exitCode = executeCommand(cmd, log=log)

    if log is not None:
        # stdout is redirected to outputFile by cmd, so output is normally empty.
        log.debug("len(output) = %s", str(len(output)))
        log.debug("exitCode: %s", str(exitCode))
        log.debug("===Finish===")
129 
130 
def threadRun(id=0, log=None):
    """Thread entry point: announce start, run one test(), announce stop.

    @param id - thread identifier, printed and forwarded to test()
    @param log - optional logger forwarded to test()
    """
    threadLabel = "Thread ID = %s" % str(id)
    sys.stdout.write(threadLabel + " started.\n")
    test(id=id, log=log)
    sys.stdout.write(threadLabel + " stopped.\n")
138 
139 
if __name__ == '__main__':
    # Console logger shared by all worker threads.
    consoleLogger = getLogger()
    # consoleLogger = getFileLogger()

    testCount = 5
    workers = []

    # Start one test thread per index.
    for index in xrange(testCount):
        worker = threading.Thread(target=threadRun, kwargs={'id':index, 'log':consoleLogger})
        workers.append(worker)
        worker.start()

    # Wait for every worker to finish.
    for worker in workers:
        worker.join()
    # #test(id=1, log=logger)
def executeCommand(cmd, inputStream=None, log=None)
Definition: join.py:1