HCE Project: Python-language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings (version 2.0.0-chaika)
Hierarchical Cluster Engine Python language binding
dc_db.URLFetchTask.URLFetchTask Class Reference
Inheritance diagram for dc_db.URLFetchTask.URLFetchTask:
Collaboration diagram for dc_db.URLFetchTask.URLFetchTask:

Public Member Functions

def __init__ (self, keyValueStorageDir, rawDataDir, dBDataTask, dcSiteTemplate, keyValueDefaultFile, dcStatTemplate, dcLogTemplate, mutexLockTTL=Constants.DEFAULT_LOCK_TTL)
 
def process (self, urlFetches, queryCallback)
 
def processProportial (self, urlFetch, uRLUpdateTask, queryCallback)
 
def fillCriterionLimits (self, urlFetch, offset)
 
def processSimple (self, urlFetch, uRLUpdateTask, queryCallback)
 
def updateUrl (self, urls, urlFetch, uRLUpdateTask, queryCallback)
 
def fillSiteListSQLGenerate (self, sitesCriterions, userId=None)
 
def fillSiteList (self, incomeSiteList, queryCallback, sitesCriterions, userId=None)
 
def getURLFromURLTable (self, urlFetch, globalLen, queryCallback)
 
def execAdditionalSQLs (self, sqls, siteId, queryCallback)
 
- Public Member Functions inherited from dc_db.BaseTask.BaseTask
def isSiteExist (self, siteId, queryCallback, userId=None)
 
def generateCriterionSQL (self, criterions, additionWhere=None, siteId=None)
 
def fetchByCriterions (self, criterions, queryCallback)
 
def dbLock (self, mutexName, queryCallback, sleepTime=1, mutexLockTTL=Constants.DEFAULT_LOCK_TTL)
 
def dbUnlock (self, mutexName, queryCallback)
 
def createUrlsInsertQuery (self, siteId, localKeys, localValues)
 
def copyUrlsToDcUrls (self, siteId, queryCallback)
 
def statisticLogUpdate (self, localObj, urlMd5, siteId, status, queryCallback, isInsert=False)
 
def calculateMd5FormUrl (self, url, urlType, useNormilize=False)
 

Static Public Member Functions

def fillUrlObj (row)
 
- Static Public Member Functions inherited from dc_db.BaseTask.BaseTask
def readValueFromSiteProp (siteId, propName, queryCallback, urlMd5=None)
 

Public Attributes

 quantMaxUrls
 
 localSiteList
 
 mutexLockTTL
 
 uRLUpdateTask
 
 siteUpdateTask
 

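Detailed Description

URLFetchTask serves URLFetch requests. For each request it does the following:
- If the request sets isLocking, it takes the FETCH_LOCK_NAME database mutex for the duration of the work.
- It resolves the list of sites to read (fillSiteList).
- It selects URL rows from each site's URLs table, using either the simple (default) or the proportional algorithm.
- It optionally applies the request's URL update to every selected URL.
- It loads the URL attributes named by the request (all attributes, '*', by default).

Definition at line 24 of file URLFetchTask.py.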

Constructor & Destructor Documentation

◆ __init__()

def dc_db.URLFetchTask.URLFetchTask.__init__ (   self,
  keyValueStorageDir,
  rawDataDir,
  dBDataTask,
  dcSiteTemplate,
  keyValueDefaultFile,
  dcStatTemplate,
  dcLogTemplate,
  mutexLockTTL = Constants.DEFAULT_LOCK_TTL 
)

Definition at line 30 of file URLFetchTask.py.

30  dcStatTemplate, dcLogTemplate, mutexLockTTL=Constants.DEFAULT_LOCK_TTL):
31  super(URLFetchTask, self).__init__()
32  self.quantMaxUrls = 0
33  self.localSiteList = []
34  self.mutexLockTTL = mutexLockTTL
35  self.uRLUpdateTask = URLUpdateTask(keyValueStorageDir, rawDataDir, dBDataTask)
36  self.siteUpdateTask = SiteUpdateTask(dcSiteTemplate, keyValueDefaultFile, keyValueStorageDir, rawDataDir,
37  dBDataTask, dcStatTemplate, dcLogTemplate, None)
38 
39 
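(Doxygen cross-reference tooltip for the `__init__` token in the listing above: it links to the constructor `__init__(self)` defined at UIDGenerator.py:19, which belongs to an unrelated class. The `super(URLFetchTask, self).__init__()` call actually invokes the next constructor in the MRO, presumably that of dc_db.BaseTask.BaseTask.)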

Member Function Documentation

◆ execAdditionalSQLs()

def dc_db.URLFetchTask.URLFetchTask.execAdditionalSQLs (   self,
  sqls,
  siteId,
  queryCallback 
)

Definition at line 318 of file URLFetchTask.py.

def execAdditionalSQLs(self, sqls, siteId, queryCallback):
    """Run the criterion sub-queries and collect their scalar results.

    Every "%<Constants.SITE_ID_NAME>%" placeholder in each query text is replaced with siteId
    before the query is executed on the primary DB.

    @param sqls - dict mapping a placeholder name to an SQL query text
    @param siteId - site id substituted into the queries
    @param queryCallback - callable used to execute SQL: queryCallback(query, dbId)
    @return dict mapping each key of sqls to the first column of the first result row, or the
            string "NULL" when the result is not iterable, has no rows, has an empty first row,
            or the value is None
    """
    ret = {}
    placeholder = "%" + Constants.SITE_ID_NAME + "%"
    for key, sql in sqls.items():
        r = queryCallback(sql.replace(placeholder, str(siteId)), Constants.PRIMARY_DB_ID)
        # Check each level before indexing so an empty result/row cannot raise IndexError.
        if hasattr(r, '__iter__') and len(r) > 0 and len(r[0]) > 0 and r[0][0] is not None:
            ret[key] = r[0][0]
        else:
            # Text substituted verbatim into the criterion SQL by the caller
            ret[key] = "NULL"

    logger.debug("Additional SQLs: %s", str(ret))

    return ret
Here is the caller graph for this function:

◆ fillCriterionLimits()

def dc_db.URLFetchTask.URLFetchTask.fillCriterionLimits (   self,
  urlFetch,
  offset 
)

Definition at line 126 of file URLFetchTask.py.

def fillCriterionLimits(self, urlFetch, offset):
    """Store [offset, self.quantMaxUrls] as the limit criterion of urlFetch.

    When urlFetch has no criterions yet, an empty dict is created for it first; any other
    criterions already present are left untouched.

    @param urlFetch - URLFetch request object; its urlsCriterions is updated in place
    @param offset - first element of the limit pair (row offset)
    """
    if urlFetch.urlsCriterions is None:
        urlFetch.urlsCriterions = dict()
    limitKey = dc.EventObjects.URLFetch.CRITERION_LIMIT
    urlFetch.urlsCriterions[limitKey] = [offset, self.quantMaxUrls]
130 
131 
Here is the caller graph for this function:

◆ fillSiteList()

def dc_db.URLFetchTask.URLFetchTask.fillSiteList (   self,
  incomeSiteList,
  queryCallback,
  sitesCriterions,
  userId = None 
)

Definition at line 206 of file URLFetchTask.py.

def fillSiteList(self, incomeSiteList, queryCallback, sitesCriterions, userId=None):
    """Resolve the list of site ids to fetch URLs from.

    The sites table is queried with the SQL built by fillSiteListSQLGenerate(sitesCriterions,
    userId). A database error (mdb.Error) is logged and treated as an empty result.

    @param incomeSiteList - requested site ids; an empty list means "all sites the query returns"
    @param queryCallback - callable used to execute SQL: queryCallback(query, dbId)
    @param sitesCriterions - site criterions passed to fillSiteListSQLGenerate
    @param userId - optional user id passed to fillSiteListSQLGenerate
    @return when incomeSiteList is empty: every site id returned by the query, in query order;
            otherwise the elements of incomeSiteList, in their order, that the query also returned
    """
    query = self.fillSiteListSQLGenerate(sitesCriterions, userId)
    logger.debug("!!! query: %s", str(query))
    rows = None
    try:
        rows = queryCallback(query, Constants.PRIMARY_DB_ID)
        logger.debug("!!! res: %s", str(rows))
    except mdb.Error as err:  # pylint: disable=E1101
        logger.error("Error: %s", str(err))

    dbSiteIds = []
    if rows is not None and hasattr(rows, '__iter__'):
        dbSiteIds = [row[0] for row in rows]

    if len(incomeSiteList) == 0:
        return dbSiteIds

    knownSiteIds = set(dbSiteIds)
    return [site for site in incomeSiteList if site in knownSiteIds]
234 
235 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ fillSiteListSQLGenerate()

def dc_db.URLFetchTask.URLFetchTask.fillSiteListSQLGenerate (   self,
  sitesCriterions,
  userId = None 
)

Definition at line 174 of file URLFetchTask.py.

def fillSiteListSQLGenerate(self, sitesCriterions, userId=None):
    """Build the query selecting site ids (`Id`) from the `sites` table.

    Without sitesCriterions, the query is restricted to sites whose `state` equals
    dc.EventObjects.Site.STATE_ACTIVE. This is ANDed with the user restriction when userId is
    given. With sitesCriterions, the text produced by generateCriterionSQL is appended, and the
    user restriction (if any) is passed to it as an additional WHERE condition.

    @param sitesCriterions - site criterions, or None for "active sites only"
    @param userId - optional user id, formatted into Constants.CHECK_TABLE_SQL_ADDITION
    @return SQL query string
    """
    whereClause = ""
    if userId is not None:
        whereClause = Constants.CHECK_TABLE_SQL_ADDITION % str(userId)

    if sitesCriterions is None:
        stateClause = "`state`=%d" % dc.EventObjects.Site.STATE_ACTIVE
        if whereClause != "":
            whereClause = whereClause + " AND " + stateClause
        else:
            whereClause = stateClause
        suffix = " WHERE " + whereClause
    elif whereClause == "":
        suffix = self.generateCriterionSQL(sitesCriterions)
    else:
        suffix = self.generateCriterionSQL(sitesCriterions, whereClause)

    return (Constants.SELECT_SQL_TEMPLATE_SIMPLE % ("`Id`", "sites")) + suffix
197 
198 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ fillUrlObj()

def dc_db.URLFetchTask.URLFetchTask.fillUrlObj (   row)
static

Definition at line 241 of file URLFetchTask.py.

def fillUrlObj(row):
    """Build a dc.EventObjects.URL object from one row of a site's URLs table.

    @param row - dict-like DB row; the "Site_Id" and "URL" columns are required
    @return URL object. Every attribute listed in Constants.URLTableDict is copied from its column
            when both the attribute and the column exist. The date attributes are then set via
            Constants.readDataTimeField.
    """
    url = dc.EventObjects.URL(row["Site_Id"], row["URL"], normalizeMask=UrlNormalizator.NORM_NONE)
    for attrName, columnName in Constants.URLTableDict.items():
        if hasattr(url, attrName) and columnName in row:
            setattr(url, attrName, row[columnName])
    # These assignments run last, so they take precedence over anything the loop above copied.
    dateFields = (("UDate", "UDate"), ("CDate", "CDate"), ("lastModified", "LastModified"),
                  ("tcDate", "TcDate"), ("pDate", "PDate"))
    for attrName, columnName in dateFields:
        setattr(url, attrName, Constants.readDataTimeField(columnName, row))
    return url
252 
253 

◆ getURLFromURLTable()

def dc_db.URLFetchTask.URLFetchTask.getURLFromURLTable (   self,
  urlFetch,
  globalLen,
  queryCallback 
)

Definition at line 259 of file URLFetchTask.py.

def getURLFromURLTable(self, urlFetch, globalLen, queryCallback):
    """Select URL objects from the URLs tables of the sites in urlFetch.sitesList.

    For every site:
    - urlFetch.siteUpdate (if any) is applied to the site.
    - A SELECT * query on the site's URLs table is built from urlFetch.urlsCriterions. Any
      %NAME% placeholders are replaced with the results of the CRITERION_SQL sub-queries.
    - The query runs on the secondary DB.
    - Each row holding "Site_Id" and "URL" is converted with fillUrlObj.

    @param urlFetch - URLFetch request (sitesList, urlsCriterions, siteUpdate, maxURLs are read)
    @param globalLen - number of URLs the caller already collected; counts toward urlFetch.maxURLs
    @param queryCallback - callable used to execute SQL
    @return list of URL objects. As a side effect, sites whose result size equals
            self.quantMaxUrls are appended to self.localSiteList.
    """
    urls = []
    maxURLs = urlFetch.maxURLs
    # A positive maxURLs caps the total (globalLen included); None, 0 or negative means no cap.
    hasLimit = maxURLs is not None and maxURLs > 0
    criterions = urlFetch.urlsCriterions

    for siteId in urlFetch.sitesList:
        if hasLimit and globalLen >= maxURLs:
            # Cap reached: do not TcDate-update or query sites that cannot contribute any more.
            break

        # Update site TcDate to push rotate sites list
        if urlFetch.siteUpdate is not None:
            urlFetch.siteUpdate.id = siteId
            self.siteUpdateTask.process(urlFetch.siteUpdate, queryCallback)
            logger.debug("Site %s updated", str(siteId))

        tableName = Constants.DC_URLS_TABLE_NAME_TEMPLATE % siteId
        # Make criterion
        additionQueryStr = self.generateCriterionSQL(criterions, None, siteId)
        # urlsCriterions may be None (processSimple never fills it): guard the membership test.
        if criterions is not None and urlFetch.CRITERION_SQL in criterions and \
                len(criterions[urlFetch.CRITERION_SQL]) > 0:
            additionalSQLs = self.execAdditionalSQLs(criterions[urlFetch.CRITERION_SQL], siteId, queryCallback)
        else:
            additionalSQLs = {}
        # Substitute the sub-query results into the criterion. The values may be raw DB values
        # (int, datetime, ...), so format them as text; "%s" % keeps unicode values intact on Py2.
        for name, value in additionalSQLs.items():
            additionQueryStr = additionQueryStr.replace("%" + name + "%", "%s" % (value,))
        # Finally add criterion to query
        query = Constants.SELECT_SQL_TEMPLATE_SIMPLE % ("*", tableName)
        if len(additionQueryStr) > 0:
            query += " " + additionQueryStr

        logger.debug('>>> getURLFromURLTable UrlFetchQuery: %s', str(query))
        res = queryCallback(query, Constants.SECONDARY_DB_ID, Constants.EXEC_NAME)
        if not hasattr(res, '__iter__'):
            res = []
        logger.debug('>>> getURLFromURLTable len(res): %s', str(len(res)))

        # A site that returned a full quota may hold more rows for the next proportional round.
        if self.quantMaxUrls > 0 and len(res) == self.quantMaxUrls:
            self.localSiteList.append(siteId)

        for row in res:
            if hasLimit and globalLen >= maxURLs:
                break
            if "Site_Id" in row and "URL" in row:
                urls.append(URLFetchTask.fillUrlObj(row))
                globalLen += 1

    logger.debug("Return urlsFetch count = %s", str(len(urls)))
    return urls
311 
312 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ process()

def dc_db.URLFetchTask.URLFetchTask.process (   self,
  urlFetches,
  queryCallback 
)

Definition at line 46 of file URLFetchTask.py.

def process(self, urlFetches, queryCallback):
    """Serve a list of URLFetch requests.

    Each request is handled as follows:
    - If urlFetch.isLocking is set, the Constants.FETCH_LOCK_NAME DB mutex is taken.
    - URLs are selected with the simple (default) or the proportional algorithm. Any other
      algorithm value selects nothing.
    - For every URL of that request, the attributes named by urlFetch.attributeNames are loaded
      ('*' when the request has no such attribute).
    - The mutex is released when the request is finished, on success and on error alike.

    @param urlFetches - iterable of URLFetch request objects
    @param queryCallback - callable used to execute SQL
    @return list of URL objects of all requests, in request order
    @raise re-raises any error after logging it (the mutex is released first)
    """
    ret = []
    for urlFetch in urlFetches:
        needToLock = urlFetch.isLocking
        try:
            if needToLock:
                self.dbLock(Constants.FETCH_LOCK_NAME, queryCallback, urlFetch.lockIterationTimeout,
                            self.mutexLockTTL)
            # @todo add more complex case
            if urlFetch.algorithm is None or urlFetch.algorithm == dc.EventObjects.URLFetch.DEFAULT_ALGORITHM:
                fetched = self.processSimple(urlFetch, self.uRLUpdateTask, queryCallback)
            elif urlFetch.algorithm == dc.EventObjects.URLFetch.PROPORTIONAL_ALGORITHM:
                fetched = self.processProportial(urlFetch, self.uRLUpdateTask, queryCallback)
            else:
                fetched = []

            # Load attributes only for this request's URLs and with this request's attribute names.
            # URLs of earlier requests were already handled in their own iteration.
            attributeNames = urlFetch.attributeNames if hasattr(urlFetch, 'attributeNames') else ['*']
            for url in fetched:
                url.attributes = AttrFetchTask.fetchUrlsAttributesByNames(url.siteId,
                                                                          url.urlMd5,
                                                                          queryCallback,
                                                                          attributeNames)
            ret.extend(fetched)
        # mdb.Error must be caught before Exception. The DB-API Error class subclasses
        # Exception, so listing it after `except Exception` would make it unreachable.
        except mdb.Error as err:  # pylint: disable=E1101
            logger.error('mdb.Error: %s', str(err))
            raise
        except Exception as err:
            logger.error('Exception: %s', str(err))
            raise
        except BaseException:
            logger.error('Unknown type error')
            raise
        finally:
            # Single release point for the mutex: runs once on success and once on error.
            if needToLock:
                self.dbUnlock(Constants.FETCH_LOCK_NAME, queryCallback)
    return ret
85 
86 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ processProportial()

def dc_db.URLFetchTask.URLFetchTask.processProportial (   self,
  urlFetch,
  uRLUpdateTask,
  queryCallback 
)

Definition at line 93 of file URLFetchTask.py.

def processProportial(self, urlFetch, uRLUpdateTask, queryCallback):
    """Select up to urlFetch.maxURLs URLs, spread evenly over the resolved sites.

    Each site gets a per-round quota of maxURLs // number_of_sites rows (at least 1). The quota
    is stored in self.quantMaxUrls. Only the sites that returned a full quota in a round are
    queried again in the next round, with the limit offset advanced by one quota. Rounds stop
    when maxURLs URLs are collected or no site is left. The optional URL update is then applied
    to the collected URLs. Nothing is selected when maxURLs is None or not positive.

    @param urlFetch - URLFetch request; its sitesList and urlsCriterions are modified as a side effect
    @param uRLUpdateTask - task used by updateUrl to apply urlFetch.urlUpdate
    @param queryCallback - callable used to execute SQL
    @return list of URL objects
    """
    ret = []
    offset = 0
    if urlFetch.maxURLs is not None and urlFetch.maxURLs > 0:
        urlFetch.sitesList = self.fillSiteList(urlFetch.sitesList, queryCallback, urlFetch.sitesCriterions)
        if len(urlFetch.sitesList) > 0:
            # Floor division keeps the quota (the limit row count) an integer under Python 3 as
            # well; on Python 2 this is identical to the former "/".
            self.quantMaxUrls = max(urlFetch.maxURLs // len(urlFetch.sitesList), 1)

            while len(ret) < urlFetch.maxURLs and len(urlFetch.sitesList) > 0:
                self.fillCriterionLimits(urlFetch, offset)
                self.localSiteList = []
                limit = urlFetch.urlsCriterions[dc.EventObjects.URLFetch.CRITERION_LIMIT]
                logger.debug(">>> Debug-1 = %s %s %s %s",
                             str(limit[0]),
                             str(limit[1]),
                             str(urlFetch.maxURLs),
                             str(self.quantMaxUrls))
                ret.extend(self.getURLFromURLTable(urlFetch, len(ret), queryCallback))
                # getURLFromURLTable collected in self.localSiteList the sites that filled their quota
                urlFetch.sitesList = self.localSiteList
                offset += self.quantMaxUrls
        else:
            logger.debug(">>> UrlFetch proportional >>> Empty SiteList")
    self.updateUrl(ret, urlFetch, uRLUpdateTask, queryCallback)
    return ret
120 
121 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ processSimple()

def dc_db.URLFetchTask.URLFetchTask.processSimple (   self,
  urlFetch,
  uRLUpdateTask,
  queryCallback 
)

Definition at line 138 of file URLFetchTask.py.

def processSimple(self, urlFetch, uRLUpdateTask, queryCallback):
    """Select URLs with the default (simple) algorithm.

    Steps:
    - Replace urlFetch.sitesList with the site list resolved by fillSiteList.
    - Select URLs from those sites with getURLFromURLTable, starting from a count of 0.
    - Apply the optional URL update to the selected URLs.

    @param urlFetch - URLFetch request; its sitesList is replaced as a side effect
    @param uRLUpdateTask - task used by updateUrl to apply urlFetch.urlUpdate
    @param queryCallback - callable used to execute SQL
    @return list of URL objects
    """
    resolvedSites = self.fillSiteList(urlFetch.sitesList, queryCallback, urlFetch.sitesCriterions)
    urlFetch.sitesList = resolvedSites
    fetchedUrls = self.getURLFromURLTable(urlFetch, 0, queryCallback)
    self.updateUrl(fetchedUrls, urlFetch, uRLUpdateTask, queryCallback)
    return fetchedUrls
146 
147 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ updateUrl()

def dc_db.URLFetchTask.URLFetchTask.updateUrl (   self,
  urls,
  urlFetch,
  uRLUpdateTask,
  queryCallback 
)

Definition at line 155 of file URLFetchTask.py.

def updateUrl(self, urls, urlFetch, uRLUpdateTask, queryCallback):
    """Apply urlFetch.urlUpdate to every URL in urls that has an urlMd5.

    The single urlFetch.urlUpdate object is reused: its urlMd5 and siteId are overwritten for
    each URL before it is passed, wrapped in a one-element list, to uRLUpdateTask.process. When
    the update carries a status, that status is also copied onto the in-memory URL object.

    @param urls - list of URL objects (urlMd5, siteId, status are used)
    @param urlFetch - URLFetch request holding the optional urlUpdate
    @param uRLUpdateTask - task whose process(list, queryCallback) performs the update
    @param queryCallback - callable used to execute SQL
    """
    if len(urls) == 0 or urlFetch.urlUpdate is None:
        return
    for localURL in urls:
        if localURL.urlMd5 is None:
            continue
        update = urlFetch.urlUpdate
        update.urlMd5 = localURL.urlMd5
        update.siteId = localURL.siteId
        uRLUpdateTask.process([update], queryCallback)
        if urlFetch.urlUpdate.status is not None:
            localURL.status = urlFetch.urlUpdate.status
167 
168 
Here is the caller graph for this function:

Member Data Documentation

◆ localSiteList

dc_db.URLFetchTask.URLFetchTask.localSiteList

Definition at line 33 of file URLFetchTask.py.

◆ mutexLockTTL

dc_db.URLFetchTask.URLFetchTask.mutexLockTTL

Definition at line 34 of file URLFetchTask.py.

◆ quantMaxUrls

dc_db.URLFetchTask.URLFetchTask.quantMaxUrls

Definition at line 32 of file URLFetchTask.py.

◆ siteUpdateTask

dc_db.URLFetchTask.URLFetchTask.siteUpdateTask

Definition at line 36 of file URLFetchTask.py.

◆ uRLUpdateTask

dc_db.URLFetchTask.URLFetchTask.uRLUpdateTask

Definition at line 35 of file URLFetchTask.py.


The documentation for this class was generated from the following file: