HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings. 2.0.0-chaika
Hierarchical Cluster Engine Python language binding
prepairer_single_urls.py
#!/usr/bin/python


"""
HCE project, Python bindings, Distributed Tasks Manager application.
Single URLs preparer: reads URLs from stdin, registers them in the DC
service and prints the resulting pickled Batch objects to stdout.

@package: dc
@file prepairer_single_urls.py
@author Oleksii <developers.hce@gmail.com>
@link: http://hierarchical-cluster-engine.com/
@copyright: Copyright &copy; 2013-2014 IOIX Ukraine
@license: http://hierarchical-cluster-engine.com/license/
@since: 0.1
"""
16 
17 import ppath
18 from ppath import sys
19 
20 import hashlib
21 import md5
22 import time
23 import json
24 import logging
25 from subprocess import Popen
26 from subprocess import PIPE
27 import MySQLdb as mdb
28 import MySQLdb.cursors
29 from contextlib import closing
30 
31 import pickle
32 import urllib
33 from urlparse import urlparse
34 from dc.EventObjects import Batch
35 from dc.EventObjects import BatchItem
36 from dc.EventObjects import Site
37 from dc.EventObjects import SiteUpdate
38 from dc.EventObjects import URL
39 from dc.EventObjects import URLUpdate
40 from dc.EventObjects import SiteFilter
41 
logging.basicConfig(filename="prepairer.log", filemode="w")
logger = logging.getLogger("Prepairer")
logger.setLevel("DEBUG")

db_connector = None
dc_sites_db_connect = None
dc_urls_db_connect = None

# site URL -> raw template source, and md5 of the cut site URL -> escaped template
site_templates_dic = {}
templates_dic = {}


def readTemplatesFromMySQL():
    query = "SELECT sites_urls.URL, sites_properties.`Value` FROM `sites_properties` INNER JOIN sites_urls ON sites_urls.Site_Id = sites_properties.Site_Id AND sites_properties.Name = 'template'"
    print query
    rows = executeQuery(dc_sites_db_connect, query)
    return rows


def cutURL(url):
    # Reduce a URL to the tail of its host name: the last three dot-separated
    # labels, or the last two when the third-from-last label is "www" (or the
    # host has only two labels). Any port suffix is dropped.
    a = urlparse(url).netloc.split(":")[0].split(".")
    arr = None
    b = None
    if len(a) > 2 and a[-3] != "www":
        arr = a[-3:]
        b = str(arr[-3] + "." + arr[-2] + "." + arr[-1])
    else:
        arr = a[-2:]
        b = str(arr[-2] + "." + arr[-1])
    return b

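# Illustrative results of cutURL() (examples added for clarity, not part of
# the original sources):
#   cutURL("http://www.example.com:8080/path")  -> "example.com"
#   cutURL("http://news.example.com/path")      -> "news.example.com"
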
def generateTemplates():
    templates = readTemplatesFromMySQL()
    print templates
    for template in templates:
        print template
        site_templates_dic[template["URL"]] = template["Value"]
    with open("sites_templates_dic", "w") as f:
        f.write(json.dumps(site_templates_dic))
    for (key, value) in site_templates_dic.items():
        url = cutURL(key)
        url_md5 = hashlib.md5(url).hexdigest()
        templates_dic[url_md5] = MySQLdb.escape_string(value)


def executeQuery(db_connector, query):
    try:
        with closing(db_connector.cursor(MySQLdb.cursors.DictCursor)) as cursor:
            cursor.execute(query)
            db_connector.commit()
            return cursor.fetchall()
    except mdb.Error:  # @todo logging in db_task
        db_connector.rollback()
        raise


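# Note (added for clarity): DictCursor returns each row as a dict keyed by
# column name, which is why readTemplatesFromMySQL() above can address the
# result fields as template["URL"] and template["Value"].
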
def loadDBBackend():
    global db_connector
    global dc_sites_db_connect
    global dc_urls_db_connect

    dbHost = "127.0.0.1"
    dbPort = 3306
    dbUser = "hce"
    dbPWD = "hce12345"

    db_dc_sites = "dc_sites"
    db_dc_urls = "dc_urls"

    dc_sites_db_connect = mdb.connect(dbHost, dbUser, dbPWD, db_dc_sites, dbPort)
    dc_urls_db_connect = mdb.connect(dbHost, dbUser, dbPWD, db_dc_urls, dbPort)


def createSiteObj(input_url):

    # strip input url
    input_url = input_url.strip()

    # root url
    root_url = input_url

    # get url for md5
    norm_url = cutURL(input_url)

    # create site
    site = Site(norm_url)

    # create site filters
    site_filter_pattern = ".*" + norm_url + ".*"
    site_filters = SiteFilter(site.id, site_filter_pattern)

    # create site properties templates
    if site.id in templates_dic:
        site.properties["template"] = templates_dic[site.id]

    # fill site
    # site.urls = [root_url]
    site.urls = []
    site.filters = [site_filters]
    # site.maxResources = 5
    # site.maxURLs = 5
    # site.state = Site.STATE_SUSPENDED
    # site.filters = [SiteFilter(site.id, "(.*)")]
    return site

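# Illustrative outcome (example added for clarity): for the input
# "http://news.example.com/a.html", createSiteObj() builds a Site for
# "news.example.com" with an empty urls list and one SiteFilter built from
# the pattern ".*news.example.com.*".
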
def createURLObj(site, input_url):
    url = URL(site.id, input_url)
    url.status = URL.STATUS_SELECTED_CRAWLING
    url.type = URL.TYPE_SINGLE
    return url


def addSite(site):
    file_name = "site_" + str(site.id) + ".json"
    with open(file_name, "w") as f:
        f.write(site.toJSON())
    cmd = "./dc-client.py --config=../ini/dc-client.ini --command=SITE_NEW --file=./%s" % file_name
    process = Popen(cmd, stdout=PIPE, stdin=PIPE, shell=True, close_fds=True)
    (output, err) = process.communicate()
    exit_code = process.wait()
    with open("dc-client_new_site_output.txt", "w") as f:
        f.write(output)
    return exit_code


def addURL(url, site):
    file_name = "url_" + str(site.id) + ".json"
    with open(file_name, "w") as f:
        f.write("[" + url.toJSON() + "]")
    cmd = "./dc-client.py --config=../ini/dc-client.ini --command=URL_NEW --file=./%s" % file_name
    process = Popen(cmd, stdout=PIPE, stdin=PIPE, shell=True, close_fds=True)
    (output, err) = process.communicate()
    exit_code = process.wait()
    with open("dc-client_new_url_output.txt", "w") as f:
        f.write(output)
    return exit_code


def updateURL(input_url, site):
    url_updated = URLUpdate(site.id, input_url)
    url_updated.status = URL.STATUS_SELECTED_CRAWLING
    # url_updated.type = URL.TYPE_SINGLE
    file_name = "url_" + str(url_updated.urlMd5) + ".json"
    with open(file_name, "w") as f:
        f.write("[" + url_updated.toJSON() + "]")
    cmd = "./dc-client.py --config=../ini/dc-client.ini --command=URL_UPDATE --file=./%s" % file_name
    process = Popen(cmd, stdout=PIPE, stdin=PIPE, shell=True, close_fds=True)
    (output, err) = process.communicate()
    exit_code = process.wait()
    with open("dc-client_update_url_output.txt", "w") as f:
        f.write(output)
    return url_updated


def updateSite(site):
    site_updated = SiteUpdate(site.id)
    site_updated.state = Site.STATE_ACTIVE
    file_name = "updated_site_" + str(site_updated.id) + ".json"
    with open(file_name, "w") as f:
        f.write(site_updated.toJSON())
    cmd = "./dc-client.py --config=../ini/dc-client.ini --command=SITE_UPDATE --file=./%s" % file_name
    process = Popen(cmd, stdout=PIPE, stdin=PIPE, shell=True, close_fds=True)
    (output, err) = process.communicate()
    exit_code = process.wait()
    with open("dc-client_update_site_output.txt", "w") as f:
        f.write(output)


def processBatch():
    # input_url = sys.stdin.read()[0:-1]
    for input_url in sys.stdin:
        input_url = input_url.strip()
        logger.debug(input_url)
        # site_url = "http://" + urlparse(urllib.unquote(input_url).decode('utf8')).hostname
        # site = createSiteObj(site_url)
        site = createSiteObj(input_url)
        with open(site.id, "w") as f:
            f.write(input_url)
        url = createURLObj(site, input_url)
        addSite(site)
        addURL(url, site)
        # updateURL(input_url, site)
        # updateSite(site)
        time.sleep(1)
        # bItem = BatchItem(site.id, url_updated.urlMd5)
        bItem = BatchItem(site.id, url.urlMd5, url)
        url_list = [bItem]
        input_object = Batch(11, url_list)
        input_pickled_object = pickle.dumps(input_object)
        print input_pickled_object


if __name__ == "__main__":
    # loadDBBackend()
    # generateTemplates()
    processBatch()
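
# Example invocation (a sketch, assuming a deployed DC service with
# dc-client.py and its ini file available at the relative paths used above):
#   echo "http://news.example.com/a.html" | ./prepairer_single_urls.py > batch.pickle
# Each input line is registered via SITE_NEW/URL_NEW and one pickled Batch
# per URL is written to stdout.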