HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
prepairer_regular_urls.py
Go to the documentation of this file.
1 #!/usr/bin/python
2 
3 
4 """
5 HCE project, Python bindings, Distributed Tasks Manager application.
6 Event objects definitions.
7 
8 @package: dc
@file prepairer_regular_urls.py
10 @author Oleksii <developers.hce@gmail.com>
11 @link: http://hierarchical-cluster-engine.com/
12 @copyright: Copyright &copy; 2013-2014 IOIX Ukraine
13 @license: http://hierarchical-cluster-engine.com/license/
14 @since: 0.1
15 """
16 
17 import ppath
18 from ppath import sys
19 
20 import md5
21 import time
22 import logging
23 from subprocess import Popen
24 from subprocess import PIPE
25 import MySQLdb as mdb
26 import MySQLdb.cursors
27 from contextlib import closing
28 
29 import pickle
30 import urllib
31 from urlparse import urlparse
32 from dc.EventObjects import Batch
33 from dc.EventObjects import BatchItem
34 from dc.EventObjects import Site
35 from dc.EventObjects import SiteUpdate
36 from dc.EventObjects import URL
37 from dc.EventObjects import URLUpdate
38 from dc.EventObjects import SiteFilter
39 
40 logging.basicConfig(filename="prepairer.log", filemode="w")
41 logger = logging.getLogger("Prepairer")
42 logger.setLevel("DEBUG")
43 
44 db_connector = None
45 dc_sites_db_connect = None
46 dc_urls_db_connect = None
47 
48 
49 def executeQuery(db_connector, query):
50  try:
51  with closing(db_connector.cursor(MySQLdb.cursors.DictCursor)) as cursor:
52  cursor.execute(query)
53  db_connector.commit()
54  return cursor.fetchall()
55  except mdb.Error as err: # @todo logging in db_task
56  db_connector.rollback()
57  raise
58 
59 
61  global db_connector
62  global dc_sites_db_connect
63  global dc_urls_db_connect
64 
65  dbHost = "127.0.0.1"
66  dbPort = 3306
67  dbUser = "hce"
68  dbPWD = "hce12345"
69 
70  db_dc_sites = "dc_sites"
71  db_dc_urls = "dc_urls"
72 
73  dc_sites_db_connect = mdb.connect(dbHost, dbUser, dbPWD, db_dc_sites, dbPort)
74  dc_urls_db_connect = mdb.connect(dbHost, dbUser, dbPWD, db_dc_urls, dbPort)
75 
76 
77 def createSiteObj(input_url):
78  site = Site(input_url)
79  # site.urls = []
80  # site.maxResources = 5
81  # site.maxURLs = 5
82  # site.state = Site.STATE_SUSPENDED
83  # site.filters = [SiteFilter(site.id, "(.*)")]
84  return site
85 
86 
87 def createURLObj(site, input_url):
88  url = URL(site.id, input_url)
89  url.status = URL.STATUS_SELECTED_CRAWLING
90  url.type = URL.TYPE_SINGLE
91  return url
92 
93 
94 def addSite(site):
95  file_name = "site_" + str(site.id) + ".json"
96  open(file_name, "w").write(site.toJSON())
97  cmd = "./dc-client.py --config=../ini/dc-client.ini --command=SITE_NEW --file=./%s" % file_name
98  process = Popen(cmd, stdout=PIPE, stdin=PIPE, shell=True, close_fds=True)
99  (output, err) = process.communicate()
100  exit_code = process.wait()
101  open("dc-client_new_site_output.txt", "w").write(output)
102  return exit_code
103 
104 
105 def addURL(url, site):
106  file_name = "url_" + str(site.id) + ".json"
107  open(file_name, "w").write("[" + url.toJSON() + "]")
108  cmd = "./dc-client.py --config=../ini/dc-client.ini --command=URL_NEW --file=./%s" % file_name
109  process = Popen(cmd, stdout=PIPE, stdin=PIPE, shell=True, close_fds=True)
110  (output, err) = process.communicate()
111  exit_code = process.wait()
112  open("dc-client_new_url_output.txt", "w").write(output)
113  return exit_code
114 
115 
116 def updateURL(input_url, site):
117  url_updated = URLUpdate(site.id, input_url)
118  url_updated.status = URL.STATUS_SELECTED_CRAWLING
119  url_updated.type = URL.TYPE_SINGLE
120  file_name = "url_" + str(url_updated.urlMd5) + ".json"
121  open(file_name, "w").write("[" + url_updated.toJSON() + "]")
122  cmd = "./dc-client.py --config=../ini/dc-client.ini --command=URL_UPDATE --file=./%s" % file_name
123  process = Popen(cmd, stdout=PIPE, stdin=PIPE, shell=True, close_fds=True)
124  (output, err) = process.communicate()
125  exit_code = process.wait()
126  open("dc-client_update_url_output.txt", "w").write(output)
127  return url_updated
128 
129 
130 def updateSite(site):
131  site_updated = SiteUpdate(site.id)
132  site_updated.state = Site.STATE_ACTIVE
133  file_name = "updated_site_" + str(site_updated.id) + ".json"
134  open(file_name, "w").write(site_updated.toJSON())
135  cmd = "./dc-client.py --config=../ini/dc-client.ini --command=SITE_UPDATE --file=./%s" % file_name
136  process = Popen(cmd, stdout=PIPE, stdin=PIPE, shell=True, close_fds=True)
137  (output, err) = process.communicate()
138  exit_code = process.wait()
139  open("dc-client_update_site_output.txt", "w").write(output)
140 
141 
143  # input_url = sys.stdin.read()[0:-1]
144  for input_url in sys.stdin:
145  # input_url = input_url.strip()
146  logger.debug(input_url)
147  # site_url = "http://" + urlparse(urllib.unquote(input_url).decode('utf8')).hostname
148  # site = createSiteObj(site_url)
149  site = createSiteObj(input_url)
150  open(site.id, "w").write(input_url)
151  url = createURLObj(site, input_url)
152  addSite(site)
153  # addURL(url, site)
154  # updateURL(input_url, site)
155  # updateSite(site)
156  time.sleep(1)
157  # bItem = BatchItem(site.id, url_updated.urlMd5)
158  bItem = BatchItem(site.id, site.id, url)
159  url_list = [bItem]
160  input_object = Batch(11, url_list)
161  input_pickled_object = pickle.dumps(input_object)
162  print input_pickled_object
163 
164 
165 if __name__ == "__main__":
166  loadDBBackend()
167  processBatch()
# Generated cross-reference index (documentation-export residue), kept as
# comments so the module stays importable:
#   def updateURL(input_url, site)
#   def createURLObj(site, input_url)
#   def executeQuery(db_connector, query)