HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
URLNewTask.py
Go to the documentation of this file.
1 '''
2 @package: dc
3 @author igor
4 @link: http://hierarchical-cluster-engine.com/
5 @copyright: Copyright © 2013-2014 IOIX Ukraine
6 @license: http://hierarchical-cluster-engine.com/license/
7 @since: 0.1
8 '''
9 import hashlib
10 from dtm.EventObjects import GeneralResponse
11 from dc_db.BaseTask import BaseTask
12 from dc_db.URLUpdateTask import URLUpdateTask
13 from dc_db.AttrSetTask import AttrSetTask
14 from dc_db.FieldRecalculator import FieldRecalculator
15 import dc_db.Constants as Constants
16 import dc.EventObjects
17 import app.Exceptions
18 from app.Utils import varDump
19 import app.Utils as Utils # pylint: disable=F0401
20 
21 logger = Utils.MPLogger().getLogger()
22 
23 # #process list of URLs
25 
26  CODE_GOOD_INSERT = 0  # status set by urlInsertWithGoodSietId() after addURL() succeeds
27  CODE_BAD_INSERT = 1  # status that process() starts from before any insert attempt
28  CODE_ALREADY_EXIST = 2  # status set by urlInsertWithGoodSietId() when selectURL() finds the URL
29 
30  # #constructor
31  #
32  def __init__(self, keyValueStorageDir, rawDataDir, dBDataTask, siteTask=None):
  # Initializes the base task, stores the optional site task and builds the nested URLUpdateTask.
  #
  # @param keyValueStorageDir forwarded unchanged to the URLUpdateTask constructor
  # @param rawDataDir forwarded unchanged to the URLUpdateTask constructor
  # @param dBDataTask forwarded unchanged to the URLUpdateTask constructor
  # @param siteTask object used by newSiteCreate(); None makes newSiteCreate() raise
33  super(URLNewTask, self).__init__()
34  self.siteTask = siteTask
  # NOTE(review): this listing jumps from line 34 to line 36. self.recalculator is used by
  # urlInsertWithGoodSietId() but is not assigned in the visible lines - presumably the missing
  # line creates it (FieldRecalculator is imported); confirm against the original file.
  # urlMd5 holds the MD5 of the URL handled by the latest selectURL() call
36  self.urlMd5 = None
37  self.urlUpdateTask = URLUpdateTask(keyValueStorageDir, rawDataDir, dBDataTask)
38 
39 
40  # Method creates a new site, using the SiteTask class
41  #
42  # @param initUrl base site url
43  # @param queryCallback function for queries execution
def newSiteCreate(self, initUrl, queryCallback):
    """Create a new site for ``initUrl`` through the injected site task.

    @param initUrl base site url, wrapped into a dc.EventObjects.Site object
    @param queryCallback function for queries execution, handed to siteTask.process()
    @raise Exception when this object was constructed without a siteTask
    """
    siteTask = self.siteTask
    if siteTask is not None:
        newSite = dc.EventObjects.Site(initUrl)
        siteTask.process(newSite, queryCallback)
        return
    raise Exception(">>> URLNew.siteTask object is None!")
49 
50 
51  # Method fills the fields of the URL object that are still None from the matching row of the "sites" database table
52  #
53  # @param urlObj instance of URL object
54  # @param queryCallback function for queries execution
def fillSiteRelatedFields(self, urlObj, queryCallback):
    """Copy site-level defaults into the URL object fields that are still None.

    Selects `RequestDelay`, `HTTPTimeout` and `URLType` from the `sites` row
    whose id equals ``urlObj.siteId`` and uses them for ``requestDelay``,
    ``httpTimeout`` and ``type`` respectively; fields already set are kept.

    @param urlObj instance of URL object, updated in place
    @param queryCallback function for queries execution
    @return True when the query returned at least one row, otherwise False
    """
    siteQuery = "SELECT `RequestDelay`, `HTTPTimeout`, `URLType` FROM `sites` WHERE id = '%s'" % urlObj.siteId
    rows = queryCallback(siteQuery, Constants.PRIMARY_DB_ID)
    if not hasattr(rows, '__iter__') or len(rows) == 0:
        return False
    # Column order follows the SELECT list above.
    for column, fieldName in enumerate(("requestDelay", "httpTimeout", "type")):
        if getattr(urlObj, fieldName) is None:
            setattr(urlObj, fieldName, rows[0][column])
    return True
68 
69 
70  # Try to extract siteId from MySQL database, based on incoming url field
71  #
72  # @param url - URL.URL field of incoming object
73  # @param queryCallback function for queries execution
def resolveSiteIdByURL(self, url, queryCallback):
    """Look up the site id stored in the primary database for the given URL.

    @param url - URL.URL field of incoming object
    @param queryCallback function for queries execution
    @return first column of the first result row, or None when nothing was found
    """
    # NOTE(review): the url is interpolated into the SQL text as-is; confirm that callers
    # escape it, or that queryCallback offers a parameterized form.
    rows = queryCallback(Constants.SELECT_SITE_ID_BY_URL % url, Constants.PRIMARY_DB_ID)
    siteId = rows[0][0] if hasattr(rows, '__iter__') and len(rows) > 0 else None
    logger.debug(">>> Site_Id By URL = %s", str(siteId))
    return siteId
82 
83 
def calcSiteIdByUrl(self, url):
    """Calculate the site id as the MD5 hex digest of the URL's domain url.

    The value returned by Utils.UrlParser.generateDomainUrl() gets a trailing
    '/' appended when it is non-empty and does not already end with one.

    @param url url string to derive the site id from
    @return hex digest string of the normalized domain url
    """
    domainUrl = Utils.UrlParser.generateDomainUrl(url)
    needsSlash = domainUrl is not None and len(domainUrl) > 0 and domainUrl[-1] != '/'
    if needsSlash:
        domainUrl = domainUrl + '/'
    return hashlib.md5(domainUrl).hexdigest()
90 
91 
92  # Method resolves the siteId of the URL object according to its siteSelect mode, using the "sites" database table
93  #
94  # @param urlObj instance of URL object
95  # @param queryCallback function for queries execution
def siteTableOperation(self, urlObj, queryCallback):
    """Resolve ``urlObj.siteId`` according to ``urlObj.siteSelect``.

    SITE_SELECT_TYPE_EXPLICIT: an empty or unknown siteId is replaced with "0".
    SITE_SELECT_TYPE_AUTO: the site id is calculated from the url; an existing
    site is reused (and its defaults copied into urlObj), otherwise a new site
    is created through newSiteCreate().
    SITE_SELECT_TYPE_QUALIFY_URL: siteId becomes "0" when the calculated site
    does not exist.
    SITE_SELECT_TYPE_NONE: an exception is raised when the calculated site
    does not exist.

    @param urlObj instance of URL object, updated in place
    @param queryCallback function for queries execution
    @raise Exception on an unknown siteSelect value, on a missing canonic url
           in AUTO mode, or on a missing site in NONE mode
    """
    if urlObj.siteSelect == dc.EventObjects.URL.SITE_SELECT_TYPE_EXPLICIT:
        if urlObj.siteId == "" or not self.isSiteExist(urlObj.siteId, queryCallback):
            urlObj.siteId = "0"
    elif urlObj.siteSelect == dc.EventObjects.URL.SITE_SELECT_TYPE_AUTO:
        try:
            canonicUrl = Utils.UrlParser.generateDomainUrl(urlObj.url)
            if canonicUrl is not None and len(canonicUrl) > 0 and canonicUrl[-1] != '/':
                canonicUrl += '/'
            localSiteId = self.calcSiteIdByUrl(urlObj.url)
            logger.debug(">>> S_NEW_ID=" + str(localSiteId))
            if self.isSiteExist(localSiteId, queryCallback):
                urlObj.siteId = localSiteId
                self.fillSiteRelatedFields(urlObj, queryCallback)
            elif canonicUrl is not None:
                self.newSiteCreate(canonicUrl, queryCallback)
                urlObj.siteId = localSiteId
            else:
                raise Exception(">>> canonicUrl is None !!!")
        # NOTE(review): the except line is missing from this listing; the exception type is
        # reconstructed from the log message and the `import app.Exceptions` statement -
        # confirm against the original file.
        except app.Exceptions.UrlParseException:
            logger.debug(">>> UrlParseException")
    elif urlObj.siteSelect == dc.EventObjects.URL.SITE_SELECT_TYPE_QUALIFY_URL:
        localSiteId = self.calcSiteIdByUrl(urlObj.url)
        if not self.isSiteExist(localSiteId, queryCallback):
            urlObj.siteId = "0"
    elif urlObj.siteSelect == dc.EventObjects.URL.SITE_SELECT_TYPE_NONE:
        localSiteId = self.calcSiteIdByUrl(urlObj.url)
        if not self.isSiteExist(localSiteId, queryCallback):
            # The exception used to be constructed without `raise`, which silently ignored
            # the missing site.
            raise Exception(">>> urlObj operation can't find siteId")
    else:
        raise Exception(">>> urlObj.siteSelect field has wrong value - %s" % str(urlObj.siteSelect))
127 
128 
129  # #make all necessary actions to add new URLs into mysql db
130  #
131  # @param urls list of URL objects
132  # @param queryCallback function for queries execution
133  # @return generalResponse instance of GeneralResponse object
def process(self, urls, queryCallback):
    """Make all necessary actions to add new URLs into the mysql db.

    For every URL the site id is resolved (looked up by url, verified, or
    derived by siteTableOperation()), then the URL is inserted through
    urlInsertWithGoodSietId(). Exactly one status code per URL is appended
    to the response, in input order.

    @param urls list of URL objects
    @param queryCallback function for queries execution
    @return generalResponse instance of GeneralResponse object
    """
    ret = GeneralResponse()
    for url in urls:
        # Start every URL from CODE_BAD_INSERT so that a failure never reports the status
        # left over from the previous URL.
        status = URLNewTask.CODE_BAD_INSERT
        isRelatedSite = False
        if url.siteId is None and url.siteSelect != dc.EventObjects.URL.SITE_SELECT_TYPE_EXPLICIT:
            url.siteId = self.resolveSiteIdByURL(url.url, queryCallback)
        if self.isSiteExist(url.siteId, queryCallback):
            isRelatedSite = self.fillSiteRelatedFields(url, queryCallback)
        # Logger-side formatting: a None url no longer raises outside the try block and
        # aborts the whole batch.
        logger.debug(">>> Url New main = %s", url.url)
        try:
            if not isRelatedSite:
                logger.debug(">>> Url New before = %s", url.url)
                self.siteTableOperation(url, queryCallback)
                # Log the site id that was just resolved (the url itself was logged here before).
                logger.debug(">>> Site_Id By URL = %s", str(url.siteId))
            logger.debug(">>> Url New after = %s", url.url)
            if url.siteId is not None and url.siteId != "":
                status = self.urlInsertWithGoodSietId(url, status, queryCallback)
        except Exception as excp:
            # Best effort per URL: the failure is logged and reported through the status code.
            logger.debug(">>> Url New operation exception = %s", str(excp))
        ret.statuses.append(status)
    return ret
157 
158 
159  # #inserts the URL if it is not stored yet and runs the related update operations; called by process() once siteId is resolved
160  #
161  # @param urlObj - incoming URL object
162  # @param statusInit incoming init value for return status code
163  # @param queryCallback function for queries execution
164  # @return new status code value
165  def urlInsertWithGoodSietId(self, urlObj, statusInit, queryCallback):
  # Checks with selectURL() whether the URL is already stored and calls addURL() when it is not.
  # The result is CODE_GOOD_INSERT after a successful addURL(), CODE_ALREADY_EXIST when
  # selectURL() reported an existing row, and statusInit otherwise.
  # NOTE(review): indentation is lost in this listing, so it cannot be told from here whether
  # the urlUpdate block and the second attributesSet() call belong to the `else` branch or run
  # for every URL - confirm against the original file before restructuring.
166  ret = statusInit
167  if not self.selectURL(urlObj, queryCallback):
168  if self.addURL(urlObj, queryCallback):
169  ret = URLNewTask.CODE_GOOD_INSERT
170  if urlObj.attributes is not None and len(urlObj.attributes) > 0:
171  self.attributesSet(urlObj.attributes, queryCallback)
172  else:
173  ret = URLNewTask.CODE_ALREADY_EXIST
  # the optional nested update request is passed to the URLUpdateTask as a one-element list
174  if urlObj.urlUpdate is not None:
175  logger.debug(">>> Url New Start Internal urlUpdate")
176  self.urlUpdateTask.process([urlObj.urlUpdate], queryCallback)
177 
178  if urlObj.attributes is not None and len(urlObj.attributes) > 0:
179  self.attributesSet(urlObj.attributes, queryCallback)
180 
  # NOTE(review): self.recalculator is not assigned in the visible part of __init__ - confirm
  # where it is created.
181  self.recalculator.commonRecalc(urlObj.siteId, queryCallback)
  # urlPut is looked up in the instance __dict__, so only an attribute set on the object
  # itself triggers the put operation
182  if "urlPut" in urlObj.__dict__ and urlObj.urlPut is not None:
183  self.urlUpdateTask.urlPutOperation(urlObj, urlObj.urlPut, queryCallback)
184  return ret
185 
186 
187  # #checks whether the URL already exists in the urls table of its site
188  #
189  # @param urlObject instance of URL object
190  # @param queryCallback function for queries execution
def selectURL(self, urlObject, queryCallback):
    """Check whether the URL is already present in the `urls_<siteId>` table.

    Side effect: ``self.urlMd5`` is set to ``urlObject.urlMd5`` when that is
    given, otherwise to the MD5 hex digest of ``urlObject.url``.

    @param urlObject instance of URL object
    @param queryCallback function for queries execution
    @return True when the COUNT(*) query reports at least one row, otherwise False
    """
    self.urlMd5 = None
    knownMd5 = urlObject.urlMd5
    self.urlMd5 = knownMd5 if knownMd5 is not None else hashlib.md5(urlObject.url).hexdigest()
    checkQuery = "SELECT COUNT(*) FROM `urls_%s` WHERE `URLMd5` = '%s'" % (urlObject.siteId, self.urlMd5)
    rows = queryCallback(checkQuery, Constants.SECONDARY_DB_ID)
    return bool(hasattr(rows, '__iter__') and len(rows) > 0 and len(rows[0]) > 0 and rows[0][0] > 0)
204 
205 
206  # #inserts new url
207  #
208  # @param urlObject instance of URL object
209  # @param queryCallback function for queries execution
def addURL(self, urlObject, queryCallback):
    """Insert the URL object as a new row into the urls table of its site.

    The statistic log is updated first (with the MD5 stored by the latest
    selectURL() call), whether or not the insert is executed afterwards.

    @param urlObject instance of URL object
    @param queryCallback function for queries execution
    @return True when an INSERT query was built and executed, False when the
            fields/values string was None or empty
    """
    self.statisticLogUpdate(urlObject, self.urlMd5, urlObject.siteId, urlObject.status, queryCallback, True)
    fields, values = Constants.getFieldsValuesTuple(urlObject, Constants.URLTableDict)
    fieldValueString = Constants.createFieldsValuesString(fields, values)
    if fieldValueString is None or fieldValueString == "":
        return False
    tableName = Constants.DC_URLS_TABLE_NAME_TEMPLATE % urlObject.siteId
    insertQuery = Constants.INSERT_COMMON_TEMPLATE % (tableName, fieldValueString)
    logger.debug(str(insertQuery))
    queryCallback(insertQuery, Constants.SECONDARY_DB_ID, Constants.EXEC_NAME, True)
    return True
223 
224 
225  # #inserts new Attributes
226  #
227  # @param attributes list of Attributes objects
228  # @param queryCallback function for queries execution
def attributesSet(self, attributes, queryCallback):
    """Store the given attributes through a fresh AttrSetTask and log the outcome.

    @param attributes list of Attributes objects
    @param queryCallback function for queries execution
    """
    attributesCount = len(attributes)
    logger.debug(">>> Add Attributes (len) == " + str(attributesCount))
    result = AttrSetTask().process(attributes, queryCallback)
    resultDump = varDump(result)
    logger.debug(">>> Add Attributes (res) == " + resultDump)
def isSiteExist(self, siteId, queryCallback, userId=None)
Definition: BaseTask.py:29
def __init__(self, keyValueStorageDir, rawDataDir, dBDataTask, siteTask=None)
Definition: URLNewTask.py:32
def calcSiteIdByUrl(self, url)
Definition: URLNewTask.py:84
def newSiteCreate(self, initUrl, queryCallback)
Definition: URLNewTask.py:44
GeneralResponse event object, represents general state response for multipurpose usage.
def urlInsertWithGoodSietId(self, urlObj, statusInit, queryCallback)
Definition: URLNewTask.py:165
def siteTableOperation(self, urlObj, queryCallback)
Definition: URLNewTask.py:96
def addURL(self, urlObject, queryCallback)
Definition: URLNewTask.py:210
def statisticLogUpdate(self, localObj, urlMd5, siteId, status, queryCallback, isInsert=False)
Definition: BaseTask.py:154
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
def process(self, urls, queryCallback)
Definition: URLNewTask.py:134
def fillSiteRelatedFields(self, urlObj, queryCallback)
Definition: URLNewTask.py:55
def selectURL(self, urlObject, queryCallback)
Definition: URLNewTask.py:191
def resolveSiteIdByURL(self, url, queryCallback)
Definition: URLNewTask.py:74
def attributesSet(self, attributes, queryCallback)
Definition: URLNewTask.py:229