HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
URLFetchTask.py
Go to the documentation of this file.
1 '''
2 @package: dc
3 @author igor
4 @link: http://hierarchical-cluster-engine.com/
5 @copyright: Copyright © 2013-2014 IOIX Ukraine
6 @license: http://hierarchical-cluster-engine.com/license/
7 @since: 0.1
8 '''
9 
10 import MySQLdb as mdb
11 from app.Utils import UrlNormalizator
12 import app.Utils as Utils # pylint: disable=F0401
13 from dc_db.URLUpdateTask import URLUpdateTask
14 from dc_db.SiteUpdateTask import SiteUpdateTask
15 from dc_db.BaseTask import BaseTask
16 from dc_db.AttrFetchTask import AttrFetchTask
17 import dc_db.Constants as Constants
18 import dc.EventObjects
19 
# Module-level logger used throughout this module, obtained from app.Utils.MPLogger
logger = Utils.MPLogger().getLogger()
21 
22 
23 # #process urlFetch event
25 
26 
# #constructor
#
# @param keyValueStorageDir passed to URLUpdateTask and SiteUpdateTask
# @param rawDataDir passed to URLUpdateTask and SiteUpdateTask
# @param dBDataTask passed to URLUpdateTask and SiteUpdateTask
# @param dcSiteTemplate passed to SiteUpdateTask
# @param keyValueDefaultFile passed to SiteUpdateTask
# @param dcStatTemplate passed to SiteUpdateTask
# @param dcLogTemplate passed to SiteUpdateTask
# @param mutexLockTTL TTL handed to dbLock() when a URLFetch requests locking
def __init__(self, keyValueStorageDir, rawDataDir, dBDataTask, dcSiteTemplate, keyValueDefaultFile,
             dcStatTemplate, dcLogTemplate, mutexLockTTL=Constants.DEFAULT_LOCK_TTL):
    super(URLFetchTask, self).__init__()
    # per-pass state of the proportional algorithm (see processProportial/getURLFromURLTable)
    self.quantMaxUrls, self.localSiteList = 0, []
    self.mutexLockTTL = mutexLockTTL
    # helper tasks used to update fetched urls and sites
    self.uRLUpdateTask = URLUpdateTask(keyValueStorageDir, rawDataDir, dBDataTask)
    siteTaskArgs = (dcSiteTemplate, keyValueDefaultFile, keyValueStorageDir, rawDataDir,
                    dBDataTask, dcStatTemplate, dcLogTemplate, None)
    self.siteUpdateTask = SiteUpdateTask(*siteTaskArgs)
38 
39 
# #make all necessary actions to fetch urls data from db
#
# Every URLFetch is handled by _processUrlFetch(). When urlFetch.isLocking is set, the db mutex
# Constants.FETCH_LOCK_NAME is held for the whole operation on that URLFetch. It is always released
# afterwards, including when an exception is raised.
#
# @param urlFetches list of URLFetch objects
# @param queryCallback function for queries execution
# @return list of URL objects
# @raise any exception is logged and re-raised
def process(self, urlFetches, queryCallback):
    ret = []
    try:
        for urlFetch in urlFetches:
            ret.extend(self._processUrlFetch(urlFetch, queryCallback))
    # mdb.Error has to be tested before Exception: it is an Exception subclass, so placed after
    # `except Exception` its handler could never run
    except mdb.Error as err:  # pylint: disable=E1101
        logger.error('mdb.Error: %s', str(err))
        raise
    except Exception as err:
        logger.error('Exception: %s', str(err))
        raise
    except BaseException:
        logger.error('Unknown type error')
        raise
    return ret


# #process one URLFetch object, holding the db fetch mutex around it when urlFetch.isLocking is set
#
# @param urlFetch URLFetch object
# @param queryCallback function for queries execution
# @return list of URL objects selected by this urlFetch, with their attributes filled
def _processUrlFetch(self, urlFetch, queryCallback):
    needToLock = urlFetch.isLocking
    # acquired outside the try: if dbLock fails there is nothing to release
    if needToLock:
        self.dbLock(Constants.FETCH_LOCK_NAME, queryCallback, urlFetch.lockIterationTimeout, self.mutexLockTTL)
    try:
        urls = []
        # @todo add more complex case
        if urlFetch.algorithm is None or urlFetch.algorithm == dc.EventObjects.URLFetch.DEFAULT_ALGORITHM:
            urls = self.processSimple(urlFetch, self.uRLUpdateTask, queryCallback)
        elif urlFetch.algorithm == dc.EventObjects.URLFetch.PROPORTIONAL_ALGORITHM:
            urls = self.processProportial(urlFetch, self.uRLUpdateTask, queryCallback)

        # attributes are fetched once per url, using the attributeNames of the urlFetch that selected it
        attributeNames = urlFetch.attributeNames if hasattr(urlFetch, 'attributeNames') else ['*']
        for url in urls:
            url.attributes = AttrFetchTask.fetchUrlsAttributesByNames(url.siteId,
                                                                      url.urlMd5,
                                                                      queryCallback,
                                                                      attributeNames)
    finally:
        if needToLock:
            self.dbUnlock(Constants.FETCH_LOCK_NAME, queryCallback)
    return urls
85 
86 
# #processProportial implements the proportional fetch algorithm
#
# urlFetch.maxURLs is split evenly between the selected sites, with at least 1 url per site per pass.
# Each pass sets the limit [offset, quantMaxUrls] via fillCriterionLimits() and reads urls of the
# current sites. Only the sites that getURLFromURLTable() put into self.localSiteList (those that
# returned exactly quantMaxUrls rows) are kept for the next pass, and the offset is advanced by
# quantMaxUrls. Passes stop when maxURLs urls are collected or no site is left.
#
# @param urlFetch incoming UrlFetch object (its sitesList and urlsCriterions are modified)
# @param uRLUpdateTask incoming URLUpdateTask object
# @param queryCallback function for queries execution
# @return list of URL objects
def processProportial(self, urlFetch, uRLUpdateTask, queryCallback):
    ret = []
    if urlFetch.maxURLs is not None and urlFetch.maxURLs > 0:
        urlFetch.sitesList = self.fillSiteList(urlFetch.sitesList, queryCallback, urlFetch.sitesCriterions)
        # TODO: per-site limits clearing (limitsClearing, month ... mins) is not implemented yet
        if len(urlFetch.sitesList) > 0:
            # floor division keeps the per-site quota an integer (it is used as an SQL LIMIT and
            # compared with row counts) under both Python 2 and Python 3
            self.quantMaxUrls = max(urlFetch.maxURLs // len(urlFetch.sitesList), 1)
            offset = 0
            while len(ret) < urlFetch.maxURLs and len(urlFetch.sitesList) > 0:
                self.fillCriterionLimits(urlFetch, offset)
                self.localSiteList = []
                logger.debug(">>> Debug-1 = %s %s %s %s",
                             str(urlFetch.urlsCriterions[dc.EventObjects.URLFetch.CRITERION_LIMIT][0]),
                             str(urlFetch.urlsCriterions[dc.EventObjects.URLFetch.CRITERION_LIMIT][1]),
                             str(urlFetch.maxURLs),
                             str(self.quantMaxUrls))
                ret.extend(self.getURLFromURLTable(urlFetch, len(ret), queryCallback))
                urlFetch.sitesList = self.localSiteList
                offset += self.quantMaxUrls
        else:
            logger.debug(">>> UrlFetch proportional >>> Empty SiteList")
    self.updateUrl(ret, urlFetch, uRLUpdateTask, queryCallback)
    return ret
120 
121 
# #fillCriterionLimits stores the paging limit [offset, self.quantMaxUrls] in urlFetch.urlsCriterions
#
# The value goes under the URLFetch.CRITERION_LIMIT key. urlsCriterions is first replaced by an empty
# dict when it is None.
#
# @param urlFetch incoming UrlFetch object (modified in place)
# @param offset offset of the first url to take (processProportial advances it by quantMaxUrls per pass)
# @return None
def fillCriterionLimits(self, urlFetch, offset):
    criterions = urlFetch.urlsCriterions
    if criterions is None:
        criterions = urlFetch.urlsCriterions = {}
    criterions[dc.EventObjects.URLFetch.CRITERION_LIMIT] = [offset, self.quantMaxUrls]
130 
131 
# #processSimple implements the simple (default) fetch algorithm
#
# Replaces urlFetch.sitesList with the sites returned by fillSiteList(). It then reads urls of those
# sites with getURLFromURLTable(), starting from a zero global counter, and passes them to updateUrl().
#
# @param urlFetch incoming UrlFetch object (its sitesList is replaced)
# @param uRLUpdateTask incoming URLUpdateTask object
# @param queryCallback function for queries execution
# @return list of URL objects
def processSimple(self, urlFetch, uRLUpdateTask, queryCallback):
    selectedSites = self.fillSiteList(urlFetch.sitesList, queryCallback, urlFetch.sitesCriterions)
    urlFetch.sitesList = selectedSites
    # TODO: per-site limits clearing (limitsClearing, month ... mins) is not implemented yet
    fetchedUrls = self.getURLFromURLTable(urlFetch, 0, queryCallback)
    self.updateUrl(fetchedUrls, urlFetch, uRLUpdateTask, queryCallback)
    return fetchedUrls
146 
147 
# #updateUrl applies urlFetch.urlUpdate to every url of the list that has a urlMd5
#
# The same urlFetch.urlUpdate object is reused for every url. Its urlMd5 and siteId are overwritten
# with the url's values, and it is then passed, as a one-element list, to uRLUpdateTask.process().
# When urlUpdate.status is not None it is also copied onto the in-memory URL object.
# Nothing happens for an empty list or when urlFetch.urlUpdate is None.
#
# @param urls incoming urls list
# @param urlFetch incoming UrlFetch object
# @param uRLUpdateTask incoming URLUpdateTask object
# @param queryCallback function for queries execution
# @return None
def updateUrl(self, urls, urlFetch, uRLUpdateTask, queryCallback):
    if len(urls) == 0:
        return
    template = urlFetch.urlUpdate
    if template is None:
        return
    for url in urls:
        if url.urlMd5 is None:
            continue
        template.urlMd5 = url.urlMd5
        template.siteId = url.siteId
        uRLUpdateTask.process([template], queryCallback)
        if template.status is not None:
            url.status = template.status
167 
168 
# #Generates the SQL request used by fillSiteList()
#
# The query is Constants.SELECT_SQL_TEMPLATE_SIMPLE filled with ("`Id`", "sites").
# Without sitesCriterions it gets " WHERE " plus the optional userId clause
# (Constants.CHECK_TABLE_SQL_ADDITION), joined by " AND " with `state`=Site.STATE_ACTIVE.
# With sitesCriterions the tail comes from generateCriterionSQL(). The userId clause is passed to
# it as the additional where condition only when that clause is not empty.
#
# @param sitesCriterions - income sitesCriterions field
# @param userId - income userId field
# @return query string
def fillSiteListSQLGenerate(self, sitesCriterions, userId=None):
    userClause = "" if userId is None else Constants.CHECK_TABLE_SQL_ADDITION % str(userId)
    query = Constants.SELECT_SQL_TEMPLATE_SIMPLE % ("`Id`", "sites")

    if sitesCriterions is not None:
        if userClause:
            return query + self.generateCriterionSQL(sitesCriterions, userClause)
        return query + self.generateCriterionSQL(sitesCriterions)

    stateClause = "`state`=%d" % dc.EventObjects.Site.STATE_ACTIVE
    whereClause = (userClause + " AND " + stateClause) if userClause else stateClause
    return query + " WHERE " + whereClause
197 
198 
# #get ids of the sites selected by the fillSiteListSQLGenerate() query
#
# Runs the generated query on the primary db.
# When incomeSiteList is empty or None, all returned ids are used, in db order.
# Otherwise only the ids of incomeSiteList that the query also returned are kept, in incomeSiteList order.
# A failed query (mdb.Error) is logged and treated as returning no rows.
#
# @param incomeSiteList - income (from event) site list; may be empty or None
# @param queryCallback function for queries execution
# @param sitesCriterions - income sitesCriterions field
# @param userId - income userId field
# @return list of site ids
def fillSiteList(self, incomeSiteList, queryCallback, sitesCriterions, userId=None):
    query = self.fillSiteListSQLGenerate(sitesCriterions, userId)
    logger.debug("!!! query: %s", str(query))
    res = None
    try:
        res = queryCallback(query, Constants.PRIMARY_DB_ID)
        logger.debug("!!! res: %s", str(res))
    except mdb.Error as err:  # pylint: disable=E1101
        # deliberate best-effort: log and continue as if no site matched
        logger.error("Error: %s", str(err))

    dbSiteIds = []
    if res is not None and hasattr(res, '__iter__'):
        dbSiteIds = [row[0] for row in res]

    # an empty or missing income list means "every site selected by the query"
    if not incomeSiteList:
        return dbSiteIds
    knownIds = set(dbSiteIds)
    return [siteId for siteId in incomeSiteList if siteId in knownIds]
234 
235 
# #static fillUrlObj method, builds a URL object from one URL-table row
#
# The URL is created from row["Site_Id"] and row["URL"] without normalization (NORM_NONE).
# Every Constants.URLTableDict entry (attribute name -> column name) whose column is in the row,
# and whose attribute exists on the URL object, is then copied over.
# Finally the date/time columns are read through Constants.readDataTimeField.
#
# @param row db select row (mapping keyed by column name) with URL fields
# @return URL object
@staticmethod
def fillUrlObj(row):
    urlObj = dc.EventObjects.URL(row["Site_Id"], row["URL"], normalizeMask=UrlNormalizator.NORM_NONE)
    for attrName, columnName in Constants.URLTableDict.items():
        if columnName in row and hasattr(urlObj, attrName):
            setattr(urlObj, attrName, row[columnName])
    # (URL attribute, db column) pairs of the date/time fields
    dateFields = (("UDate", "UDate"),
                  ("CDate", "CDate"),
                  ("lastModified", "LastModified"),
                  ("tcDate", "TcDate"),
                  ("pDate", "PDate"))
    for attrName, columnName in dateFields:
        setattr(urlObj, attrName, Constants.readDataTimeField(columnName, row))
    return urlObj
252 
253 
# #get URL objects from the per-site URL tables
#
# For every site in urlFetch.sitesList:
#  - if urlFetch.siteUpdate is set it is applied to the site first (updates the site TcDate so the
#    sites list rotates);
#  - the SELECT on the site's urls table is built from urlFetch.urlsCriterions, and results of the
#    CRITERION_SQL additional queries are substituted into their %name% placeholders;
#  - the query runs on the secondary db, and rows having "Site_Id" and "URL" become URL objects;
#  - a site that returned exactly self.quantMaxUrls rows is appended to self.localSiteList.
# Collecting stops once globalLen plus the collected count reaches a positive urlFetch.maxURLs.
#
# @param urlFetch instance of URLFetch object
# @param globalLen number of urls the caller has already collected (counts toward urlFetch.maxURLs)
# @param queryCallback function for queries execution
# @return list of URL objects
def getURLFromURLTable(self, urlFetch, globalLen, queryCallback):
    urls = []
    maxURLs = urlFetch.maxURLs
    # None, 0 (or a negative value) means "no limit"
    hasLimit = maxURLs is not None and maxURLs > 0
    criterions = urlFetch.urlsCriterions

    for siteId in urlFetch.sitesList:
        # Update site TcDate to push rotate sites list
        # NOTE(review): this also runs for sites visited after maxURLs has already been reached
        if urlFetch.siteUpdate is not None:
            urlFetch.siteUpdate.id = siteId
            self.siteUpdateTask.process(urlFetch.siteUpdate, queryCallback)
            logger.debug("Site %s updated", str(siteId))

        tableName = Constants.DC_URLS_TABLE_NAME_TEMPLATE % siteId
        # Make criterion
        additionQueryStr = self.generateCriterionSQL(criterions, None, siteId)
        # Execute additional SQLs for criterion; urlsCriterions may be None (no criterions given)
        additionalSQLs = {}
        if criterions is not None and criterions.get(urlFetch.CRITERION_SQL):
            additionalSQLs = self.execAdditionalSQLs(criterions[urlFetch.CRITERION_SQL], siteId, queryCallback)
        # Substitute values from additional SQLs results in criterion. The values are raw db scalars
        # (e.g. numbers), so they are formatted as text instead of letting str.replace() raise TypeError
        for sqlName, sqlValue in additionalSQLs.items():
            additionQueryStr = additionQueryStr.replace("%" + sqlName + "%", "%s" % (sqlValue,))
        # Finally add criterion to query
        query = Constants.SELECT_SQL_TEMPLATE_SIMPLE % ("*", tableName)
        if additionQueryStr:
            query += " " + additionQueryStr

        logger.debug('>>> getURLFromURLTable UrlFetchQuery: %s', str(query))
        # Execute query
        res = queryCallback(query, Constants.SECONDARY_DB_ID, Constants.EXEC_NAME)
        if not hasattr(res, '__iter__'):
            res = []
        logger.debug('>>> getURLFromURLTable len(res): %s', str(len(res)))

        # a full page means the site may have more urls; processProportial keeps such sites for its next pass
        if self.quantMaxUrls > 0 and len(res) == self.quantMaxUrls:
            self.localSiteList.append(siteId)

        for row in res:
            # ">=" also stops when the caller passed a count already beyond the limit
            if hasLimit and globalLen >= maxURLs:
                break
            if "Site_Id" in row and "URL" in row:
                urls.append(URLFetchTask.fillUrlObj(row))
                globalLen += 1

    logger.debug("Return urlsFetch count = %s", str(len(urls)))
    return urls
311 
312 
# #execute the additional criterion SQLs for one site
#
# In every SQL the %<Constants.SITE_ID_NAME>% placeholder is replaced with siteId, and the SQL runs
# on the primary db. The value stored for the key is the first column of the first returned row.
# It is the string "NULL" when the query returns no rows, a non-iterable result, or a NULL value.
#
# @param sqls dict {name: sql template}
# @param siteId site id substituted into the templates
# @param queryCallback function for queries execution
# @return dict {name: value or "NULL"}
def execAdditionalSQLs(self, sqls, siteId, queryCallback):
    placeholder = "%" + Constants.SITE_ID_NAME + "%"
    ret = {}
    for key in sqls:
        rows = queryCallback(sqls[key].replace(placeholder, str(siteId)), Constants.PRIMARY_DB_ID)
        if hasattr(rows, '__iter__') and len(rows) > 0 and rows[0][0] is not None:
            ret[key] = rows[0][0]
        else:
            ret[key] = "NULL"

    logger.debug("Additional SQLs: %s", str(ret))

    return ret
def fillSiteList(self, incomeSiteList, queryCallback, sitesCriterions, userId=None)
def fillCriterionLimits(self, urlFetch, offset)
def processProportial(self, urlFetch, uRLUpdateTask, queryCallback)
Definition: URLFetchTask.py:93
def processSimple(self, urlFetch, uRLUpdateTask, queryCallback)
def dbLock(self, mutexName, queryCallback, sleepTime=1, mutexLockTTL=Constants.DEFAULT_LOCK_TTL)
Definition: BaseTask.py:73
def execAdditionalSQLs(self, sqls, siteId, queryCallback)
def updateUrl(self, urls, urlFetch, uRLUpdateTask, queryCallback)
def process(self, urlFetches, queryCallback)
Definition: URLFetchTask.py:46
def getURLFromURLTable(self, urlFetch, globalLen, queryCallback)
def fillSiteListSQLGenerate(self, sitesCriterions, userId=None)
def dbUnlock(self, mutexName, queryCallback)
Definition: BaseTask.py:88
def __init__(self, keyValueStorageDir, rawDataDir, dBDataTask, dcSiteTemplate, keyValueDefaultFile, dcStatTemplate, dcLogTemplate, mutexLockTTL=Constants.DEFAULT_LOCK_TTL)
Definition: URLFetchTask.py:30
def generateCriterionSQL(self, criterions, additionWhere=None, siteId=None)
Definition: BaseTask.py:46