HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
Go to the documentation of this file.
1 """
2 @package: dc
3 @file ProxyResolver.py
4 @author Scorp <developers.hce@gmail.com>
5 @link: http://hierarchical-cluster-engine.com/
6 @copyright: Copyright &copy; 2013-2014 IOIX Ukraine
7 @license: http://hierarchical-cluster-engine.com/license/
8 @since: 0.1
9 """
11 import re
12 import os
13 import json
14 import sys
15 import time
16 # import copy
18 import dbi.EventObjects
19 from app.Utils import ExceptionLog
20 from app.Utils import UrlParser
21 from app.Utils import varDump
22 import app.Utils as Utils # pylint: disable=F0401
23 import dc_db.Constants as DB_CONSTS
# Module-level logger obtained from Utils.MPLogger(); the methods of ProxyResolver below log through it.
25 logger = Utils.MPLogger().getLogger()
28 # # ProxyResolver Class, implements http proxy resolving functionality
29 #
# Class-level constants of the HTTP proxy resolver.
# NOTE(review): this text looks like an extract of a documentation page - every line carries a leading
# listing number and some source lines are not visible here (for example the definitions of
# SOURCE_PROPERTY and DEFAULT_PORT that the methods below read). Confirm against the real file.
30 class ProxyResolver(object):
# Value of the "source" setting for which __init__ loads the proxy list with readSQLProxy().
33  SOURCE_SQL = 1
# Selects the enabled (`State` = 1) proxies of one site plus the wildcard ('*') rows, ordered by priority.
# NOTE(review): the three statements below are filled in with the % operator (see readSQLProxy/addFault),
# so the substituted values are not escaped here.
35  PROXY_SQL_QUERY = ("SELECT * FROM `sites_proxy` WHERE `State` = 1 AND (`Site_Id` = '%s' OR `Site_Id` = '*')" +
36  " ORDER BY `Priority`")
# Stores a new faults counter for one (site, host) row.
38  PROXY_SQL_UPDATE_FAULTS_QUERY = "UPDATE `sites_proxy` SET `Faults`= %s WHERE `Site_Id` = '%s' AND `Host` = '%s'"
# Disables one (site, host) row.
39  PROXY_SQL_DISABLE_QUERY = "UPDATE `sites_proxy` SET `State`= 0 WHERE `Site_Id` = '%s' AND `Host` = '%s'"
# Database name passed to dbWrapper.customRequest() together with the statements above.
41  PROXY_SQL_DB = "dc_sites"
# Period names; checkLimits() pairs the i-th configured limit with LIMITS[i] and builds the
# "<PERIOD>_START_POINT" / "<PERIOD>_FREQ" statistic keys from them.
43  LIMITS = ["MIN", "HOUR", "DAY", "MONTH", "YEAR"]
# Period lengths in seconds, index-aligned with LIMITS. The MONTH and YEAR values are rounded:
# 30 days would be 2592000 s and 365 days 31536000 s.
44  SECONDS_MULTI = [60, 3600, 86400, 2590000, 31100000]
# File name suffix that __init__ looks for in / appends to the index file path.
47  JSON_SUFF = ".json"
55  # #Class constructor
56  #
57  # @param siteProperties - incoming site properties dict
58  # @param dbWrapper - sql db access wrapper
59  # @param siteId - current site id
60  # @param url - current resource url
# Constructor: collects the proxy settings of one site.
#
# A fixed proxy is taken from HTTP_PROXY_HOST / HTTP_PROXY_PORT when both properties are present;
# otherwise the USER_PROXY property is decoded as JSON. The JSON keys read below are "source",
# "proxies", "status_update_empty_proxy_list", "status_update_no_available_proxy",
# "status_update_tries_limit", "raw_content_check" and "file_path".
# An exception raised inside the outer try is handed to ExceptionLog.handler.
#
# NOTE(review): the original indentation is not visible in this listing, so the nesting of the
# statements cannot be confirmed from here; each comment below describes only the statement it precedes.
# NOTE(review): the listing numbers jump (71 -> 75 -> 80 and 141 -> 144), so some statements appear to be
# missing - presumably the initial values of the status-update and raw-content-check attributes and
# the loading of internalIndexes. Confirm against the real file.
61  def __init__(self, siteProperties, dbWrapper, siteId, url=None):
62  self.dbWrapper = dbWrapper
63  self.siteId = siteId
64  self.source = self.SOURCE_PROPERTY
65  self.proxyTuple = None
66  self.proxyStruct = None
67  self.internalIndexes = {}
# Domain of the current url as returned by UrlParser.getDomain(), or None when no url was given.
68  self.domain = UrlParser.getDomain(url) if url is not None else None
69  self.indexFileName = None
70  proxyJson = None
71  # status update possible variables
75  # raw content check variables
80  try:
81  if siteProperties is not None:
82  if "HTTP_PROXY_HOST" in siteProperties and "HTTP_PROXY_PORT" in siteProperties:
# Fixed proxy given directly as a (host, port) pair; getProxy() returns it unchanged.
83  self.proxyTuple = (siteProperties["HTTP_PROXY_HOST"], siteProperties["HTTP_PROXY_PORT"])
84  elif "USER_PROXY" in siteProperties:
85  try:
86  proxyJson = json.loads(siteProperties["USER_PROXY"])
87  except Exception as excp:
88  ExceptionLog.handler(logger, excp, ">>> Bad json in USER_PROXY property: " + \
89  str(siteProperties["USER_PROXY"]))
91  # extract 'source' property
92  if proxyJson is not None and "source" in proxyJson:
93  self.source = int(proxyJson["source"])
# NOTE(review): proxyJson can still be None at this point (for example when USER_PROXY failed to
# decode); '"proxies" in proxyJson' then raises TypeError, which ends up in the outer except below.
95  if self.source == self.SOURCE_PROPERTY:
96  if "proxies" in proxyJson:
97  self.proxyStruct = proxyJson["proxies"]
98  elif self.source == self.SOURCE_SQL:
99  self.proxyStruct = self.readSQLProxy(dbWrapper, siteId)
101  logger.debug('>>> self.proxyStruct: ' + str(self.proxyStruct))
103  # set status update different variables
# Each status_update_* value is accepted only when its int() form lies within
# [STATUS_UPDATE_MIN_ALLOWED_VALUE, STATUS_UPDATE_MAX_ALLOWED_VALUE].
104  if proxyJson is not None and "status_update_empty_proxy_list" in proxyJson and \
105  int(proxyJson["status_update_empty_proxy_list"]) >= self.STATUS_UPDATE_MIN_ALLOWED_VALUE and \
106  int(proxyJson["status_update_empty_proxy_list"]) <= self.STATUS_UPDATE_MAX_ALLOWED_VALUE:
107  self.statusUpdateEmptyProxyList = int(proxyJson["status_update_empty_proxy_list"])
109  if proxyJson is not None and "status_update_no_available_proxy" in proxyJson and \
110  int(proxyJson["status_update_no_available_proxy"]) >= self.STATUS_UPDATE_MIN_ALLOWED_VALUE and \
111  int(proxyJson["status_update_no_available_proxy"]) <= self.STATUS_UPDATE_MAX_ALLOWED_VALUE:
112  self.statusUpdateNoAvailableProxy = int(proxyJson["status_update_no_available_proxy"])
114  if proxyJson is not None and "status_update_tries_limit" in proxyJson and \
115  int(proxyJson["status_update_tries_limit"]) >= self.STATUS_UPDATE_MIN_ALLOWED_VALUE and \
116  int(proxyJson["status_update_tries_limit"]) <= self.STATUS_UPDATE_MAX_ALLOWED_VALUE:
117  self.statusUpdateTriesLimit = int(proxyJson["status_update_tries_limit"])
119  # set raw content check different variables
# "raw_content_check" may carry "patterns" (read later by checkPattern), "rotate" and "faults".
# NOTE(review): the error message below suggests the else belongs to the "patterns" test - confirm.
120  if proxyJson is not None and "raw_content_check" in proxyJson:
121  rawContentCheck = proxyJson["raw_content_check"]
122  if "patterns" in rawContentCheck:
123  self.rawContentCheckPatterns = rawContentCheck["patterns"]
124  if "rotate" in rawContentCheck:
125  self.rawContentCheckRotate = int(rawContentCheck["rotate"])
126  if "faults" in rawContentCheck:
127  self.rawContentCheckFaults = int(rawContentCheck["faults"])
128  else:
129  logger.error("Mandatory parameter 'patterns' for 'raw_content_check' not found")
# Index file name: "file_path" is used as is when the first occurrence of ".json" is at its very end;
# otherwise (and only when siteId is set) it is extended to "<file_path>/<siteId>.json".
131  if self.proxyTuple is None and proxyJson is not None:
132  if "file_path" in proxyJson:
133  self.indexFileName = proxyJson["file_path"]
134  if siteId is not None and not (self.indexFileName.find(self.JSON_SUFF) != -1 and \
135  self.indexFileName.find(self.JSON_SUFF) == len(self.indexFileName) - len(self.JSON_SUFF)):
136  if self.indexFileName[-1] != '/':
137  self.indexFileName += '/'
138  self.indexFileName += siteId
139  self.indexFileName += self.JSON_SUFF
141  # logger.debug('>>> self.indexFileName: ' + str(self.indexFileName))
144  # logger.debug('>>> self.proxyStruct: ' + str(self.proxyStruct))
145  # logger.debug('>>> self.internalIndexes: ' + str(self.internalIndexes))
# Entries of internalIndexes are merged into the proxyStruct entries that have the same key.
147  if self.proxyStruct is not None:
148  for key in self.proxyStruct.keys():
149  if key in self.internalIndexes:
150  self.proxyStruct[key].update(self.internalIndexes[key])
152  # for key in self.internalIndexes:
153  # if key not in self.proxyStruct:
154  # self.proxyStruct.update({key:self.internalIndexes[key]})
155  # logger.debug('>>> self.proxyStruct2: ' + str(self.proxyStruct))
158  except Exception as err:
159  ExceptionLog.handler(logger, err, ">>> ProxyResolver exception", (), \
160  {ExceptionLog.LEVEL_NAME_ERROR:ExceptionLog.LEVEL_VALUE_DEBUG})
163  # #Method readIndexFile reads from file and return param index structure
164  #
165  # @param fileName - incoming file name
166  # @return json structure, just readed from file
@staticmethod
def readIndexFile(fileName):
  """Load the stored index structure from a JSON file.

  @param fileName - path of the index file
  @return the decoded JSON content; an empty dict when fileName is not an existing
          regular file or when reading/decoding fails (the error is passed to
          ExceptionLog.handler)
  """
  if not os.path.isfile(fileName):
    return {}

  index = {}
  try:
    # The context manager closes the handle on both the success and the error path.
    with open(fileName, "r") as indexFile:
      content = indexFile.read()
      logger.debug('>>> readIndexFile fileData: ' + str(content) + ' length: ' + str(len(content)) + ' bytes.')
      index = json.loads(str(content))
  except Exception as excp:
    ExceptionLog.handler(logger, excp, ">>> readIndexFile error, file name = " + str(fileName))
  return index
189  # #Method saveIndexInFile saves the index structure into the file
190  #
191  # @param fileName - incoming file name
192  # @param jsonData - output data in json format for save to file
193  # @return None
@staticmethod
def saveIndexInFile(fileName, jsonData):
  """Serialize the index structure to JSON and write it to a file.

  Nothing is written when jsonData is None or empty. Any error (serialization,
  opening, writing) is passed to ExceptionLog.handler instead of being raised.

  @param fileName - path of the file to (over)write
  @param jsonData - structure to store; must be serializable by json.dumps
  @return None
  """
  if jsonData is not None and len(jsonData) > 0:
    try:
      # Serialize first, so a serialization error cannot truncate an existing file.
      fileData = json.dumps(jsonData)
      # 'with' closes the handle even when write() fails; the previous code leaked it in that case.
      with open(fileName, "w") as fd:
        fd.write(fileData)
    except Exception as excp:
      ExceptionLog.handler(logger, excp, ">>> saveIndexInFile error, file name = " + str(fileName))
209 # # #Method fieldsToLower return dict, which is copy of elem with lowercase keys
210 # #
211 # # @param elem incoming dict
212 # # @return lowcase dict
213 # def fieldsToLower(self, elem):
214 # ret = {}
215 # for key in elem:
216 # ret[key.lower()] = elem[key]
217 # return ret
220  # #Method fieldsToObjectName return dict, which is copy of elem with names using in objects
221  #
222  # @param inDict - incoming dict
223  # @return lowcase dict
def fieldsToObjectName(self, inDict):
  """Return a copy of inDict whose keys are replaced by object field names.

  For every key of inDict the first entry of DB_CONSTS.ProxyTableDict whose value
  equals that key supplies the new name; keys without such an entry are dropped.

  @param inDict - incoming dict
  @return dict with the renamed keys
  """
  namePairs = DB_CONSTS.ProxyTableDict.items()
  renamed = {}
  for column in inDict:
    candidates = [objectName for objectName, source in namePairs if source == column]
    if candidates:
      # Only the first match counts, as with the early exit of a nested search loop.
      renamed[candidates[0]] = inDict[column]
  return renamed
236  # #Method readSQLProxy, reads proxy data from SQL storage
237  #
238  # @param dbWrapper - sql db acces wrapper
239  # @param siteId - current site id
240  # @return just fetched proxy data
def readSQLProxy(self, dbWrapper, siteId):
  """Read the proxy records of a site from the SQL storage.

  Every returned row is renamed with fieldsToObjectName() and stored under its
  "host" value. The "limits" and "domains" fields are decoded from JSON (a false
  "limits" becomes [], an empty-string "domains" becomes ['*']); "cDate" and
  "uDate" are converted with str(). A field that fails to convert is left as
  read and the error is passed to ExceptionLog.handler.

  @param dbWrapper - sql db access wrapper (its affect_db flag is forced to True for the request)
  @param siteId - current site id
  @return dict of the fetched proxy records, keyed by host
  """
  ret = {}
  saveDBMode = dbWrapper.affect_db
  dbWrapper.affect_db = True
  try:
    # NOTE(review): siteId is interpolated into the SQL text without escaping; if it can ever
    # originate from untrusted input, the request must be escaped or parameterized.
    result = dbWrapper.customRequest(self.PROXY_SQL_QUERY % siteId, self.PROXY_SQL_DB,
                                     dbi.EventObjects.CustomRequest.SQL_BY_NAME)
  finally:
    # Restore the caller's mode even when the request raises.
    dbWrapper.affect_db = saveDBMode

  # Field name -> conversion applied to a non-None raw value.
  converters = (
      ("limits", lambda raw: json.loads(raw) if raw else []),
      ("domains", lambda raw: ['*'] if raw == "" else json.loads(raw)),
      ("cDate", str),
      ("uDate", str),
  )

  if result is not None and len(result) > 0:
    for elem in result:
      proxy = self.fieldsToObjectName(elem)
      host = proxy["host"]
      ret[host] = proxy
      for fieldName, convert in converters:
        try:
          if fieldName in proxy and proxy[fieldName] is not None:
            proxy[fieldName] = convert(proxy[fieldName])
        except Exception as err:
          # The host is taken from the renamed record; the raw row is keyed by column names
          # (presumably `Host`), so looking up "host" in it reported "None".
          ExceptionLog.handler(logger, err, ">>> Wrong json in '" + fieldName + "': " + \
                               varDump(proxy[fieldName]) + ", host = " + str(host))

  return ret
293 # # #Method updateProxyByFreqAndLimits, updates (if available) freq statistic in common proxy structure
294 # #
295 # def updateProxyByFreqAndLimits(self, proxyName):
296 # logger.debug('>>> updateProxyByFreqAndLimits enter...')
297 # # logger.debug('>>> self.proxyStruct: ' + str(self.proxyStruct))
298 # logger.debug('>>> self.internalIndexes: ' + str(self.internalIndexes))
299 # if self.proxyStruct is not None:
300 # for key in self.proxyStruct:
301 # if key == proxyName:
302 # if key in self.internalIndexes:
303 # logger.debug('>>> if key in self.internalIndexes')
304 # if "freq" in self.internalIndexes[key]:
305 # logger.debug('>>> if "freq" in self.internalIndexes[key]')
306 # self.proxyStruct[key]["freq"] = self.internalIndexes[key]["freq"]
307 # else:
308 # logger.debug('>>> else1')
309 # if "freq" in self.proxyStruct[key]:
310 # logger.debug('>>> if "freq" in self.proxyStruct[key]')
311 # self.proxyStruct[key]["freq"] += 1
312 # else:
313 # logger.debug('>>> else2')
314 # self.proxyStruct[key].update({"freq":1})
315 #
316 # if "limits_stat" in self.internalIndexes[key]:
317 # self.proxyStruct[key]["limits_stat"] = copy.deepcopy(self.internalIndexes[key]["limits_stat"])
318 # else:
319 # self.proxyStruct[key].update({"limits_stat":{}})
320 # else:
321 # if "freq" in self.proxyStruct[key]:
322 # logger.debug('>>> if "freq" in self.proxyStruct[key]')
323 # self.proxyStruct[key]["freq"] += 1
324 # else:
325 # logger.debug('>>> else3')
326 # self.proxyStruct[key]["freq"] = 1 # sys.maxint
327 #
328 # if not "limits_stat" in self.proxyStruct[key]:
329 # self.proxyStruct[key]["limits_stat"] = {}
330 #
331 # # logger.debug('>>> self.proxyStruct: ' + str(self.proxyStruct))
334  # #Method checkLimits, checks freq limits for current proxy
335  #
336  # @param key - proxy key
337  # @return bool value - is available proxy by limits or not
# Checks the per-period usage limits of one proxy.
#
# The i-th element of proxyStruct[key]["limits"] is paired with LIMITS[i] and SECONDS_MULTI[i];
# elements beyond len(LIMITS) are ignored. The counters are read from proxyStruct[key]["limits_stat"]
# under the keys "<PERIOD>_START_POINT" and "<PERIOD>_FREQ".
# Returns True when the proxy has no "limits", or no non-empty dict "limits_stat", or when no
# counter has reached its limit; returns False at the first counter that is >= its limit.
# Side effect: a period that has lasted at least SECONDS_MULTI[i] seconds gets a new start point
# (the current time) and its counter is reset to 0.
# NOTE(review): the original indentation is not visible in this listing; whether the "limit > 0"
# test is nested under the "_START_POINT" test cannot be confirmed from here.
338  def checkLimits(self, key):
339  ret = True
340  if key in self.proxyStruct and "limits" in self.proxyStruct[key] and \
341  self.proxyStruct[key]["limits"] is not None and \
342  "limits_stat" in self.proxyStruct[key] and \
343  isinstance(self.proxyStruct[key]["limits_stat"], dict) and \
344  len(self.proxyStruct[key]["limits_stat"]) > 0:
# index walks LIMITS / SECONDS_MULTI in step with the configured limits.
345  index = 0
346  curTimeStamp = int(time.time())
347  for limit in self.proxyStruct[key]["limits"]:
348  if index >= len(self.LIMITS):
349  break
350  if self.LIMITS[index] + "_START_POINT" in self.proxyStruct[key]["limits_stat"]:
# Period elapsed: start a new one and clear its counter.
351  if curTimeStamp - self.proxyStruct[key]["limits_stat"][self.LIMITS[index] + "_START_POINT"] >= \
352  self.SECONDS_MULTI[index]:
353  self.proxyStruct[key]["limits_stat"][self.LIMITS[index] + "_START_POINT"] = curTimeStamp
354  self.proxyStruct[key]["limits_stat"][self.LIMITS[index] + "_FREQ"] = 0
# A limit that is not greater than 0 is never enforced.
355  if limit > 0 and self.LIMITS[index] + "_FREQ" in self.proxyStruct[key]["limits_stat"] and \
356  self.proxyStruct[key]["limits_stat"][self.LIMITS[index] + "_FREQ"] >= limit:
357  ret = False
358  break
359  index += 1
360  return ret
363  # #Method checkDomains, checks available domains for current proxy
364  #
365  # @param key - proxy key
366  # @return bool value - is available proxy by domain or not
def checkDomains(self, key):
  """Tell whether the proxy may be used for the domain of the current url.

  The proxy is accepted when no domain is known, when the proxy is not in
  proxyStruct, when it has no (or a None) "domains" entry, or when that entry
  contains the current domain or the wildcard '*'.

  @param key - proxy key
  @return bool value - is the proxy available for the domain or not
  """
  if self.domain is None or key not in self.proxyStruct:
    return True

  proxy = self.proxyStruct[key]
  if "domains" not in proxy or proxy["domains"] is None:
    return True

  allowed = proxy["domains"]
  return self.domain in allowed or '*' in allowed
376  # #Method commonIncrementLimits, method increments freq fields in common type container
377  #
378  # @param container - incoming container
379  # @param key - proxy key
def commonIncrementLimits(self, container, key):
  """Increment the usage counters of one proxy inside the given container.

  A missing proxy is added with "freq" 1 and empty "limits_stat". For an existing
  one "freq" is incremented (or set to 1) and every "<PERIOD>_FREQ" counter that
  is already present in a non-empty "limits_stat" is incremented; a missing or
  empty "limits_stat" is replaced by an empty dict.

  @param container - incoming container (dict keyed by proxy key), modified in place
  @param key - proxy key
  """
  logger.debug('>>> commonIncrementLimits enter...')
  if key not in container:
    container[key] = {"host":key, "freq":1, "limits_stat":{}}
    return

  entry = container[key]
  if "freq" in entry:
    logger.debug('>>> container[key]["freq"] += 1')
    entry["freq"] += 1
  else:
    logger.debug('>>> container[key].update({"freq":1})')
    entry["freq"] = 1

  if "limits_stat" in entry and len(entry["limits_stat"]) > 0:
    stat = entry["limits_stat"]
    for period in self.LIMITS:
      counterName = period + "_FREQ"
      if counterName in stat:
        stat[counterName] += 1
  else:
    logger.debug('>>> container[key].update({"limits_stat":{}})')
    entry["limits_stat"] = {}
402  # #Method incrementLimits increments freq fields in the proxyStruct container
403  #
404  # @param key - proxy key
def incrementLimits(self, key):
  """Increment the usage counters of the proxy stored in proxyStruct.

  @param key - proxy key
  """
  target = self.proxyStruct
  self.commonIncrementLimits(target, key)
409  # #Method fillProxyTuple - fills and returns proxy tuple
410  #
411  # @param elem - incoming proxy data in raw (string format)
412  # @return resolver proxy (as pair tuple (host, port)) or None
def fillProxyTuple(self, elem):
  """Build the (host, port) pair for one proxy record.

  elem["host"] is split on ':'; the first part is the host and the second part,
  when present, the port (kept as a string). Without a port part DEFAULT_PORT is used.

  @param elem - proxy record with a "host" entry in "host" or "host:port" form
  @return proxy as pair tuple (host, port)
  """
  parts = elem["host"].split(':')
  port = parts[1] if len(parts) > 1 else self.DEFAULT_PORT
  return (parts[0], port)
422  # #Method getProxy - public method which implements main proxy resolver functionality
423  #
424  # @param previousProxy - previously returned proxy
425  # @return resolver proxy (as pair tuple (host, port)) or None
def getProxy(self, previousProxy=None):
  """Resolve the proxy to use for the next request.

  A fixed proxy (proxyTuple) is returned as is. Otherwise the first entry of
  proxyStruct that passes checkLimits() and checkDomains() is chosen, its counters
  are incremented and, when an index file name is set, proxyStruct is saved.

  On a first request (previousProxy is None), when every entry carries a
  "priority", candidates are tried in ascending (priority, freq) order, a missing
  "freq" counting as 0. The previous code tested '"priority" in self.proxyStruct',
  which looks at the host keys, so the ordered branch was effectively never taken;
  its sort key also dropped the priority weighting for entries without "freq" and
  relied on the Python-2-only sys.maxint.

  @param previousProxy - previously returned proxy
  @return resolved proxy (as pair tuple (host, port)) or None
  """
  logger.debug('>>> getProxy enter...')
  ret = None
  saveIndexFile = False
  if self.proxyTuple is not None:
    ret = self.proxyTuple
  elif self.proxyStruct is not None:
    logger.debug('>>> elif self.proxyStruct is not None')
    candidates = list(self.proxyStruct.values())
    if previousProxy is None and all("priority" in elem for elem in candidates):
      logger.debug('>>> previousProxy is None')
      # A tuple key orders by priority first and usage frequency second, without big-number arithmetic.
      candidates.sort(key=lambda x: (x["priority"], x["freq"] if "freq" in x else 0))
    else:
      logger.debug('>>> else')

    for elem in candidates:
      # Evaluate each check once and reuse the results for logging and for the decision.
      limitsOk = self.checkLimits(elem["host"])
      domainsOk = self.checkDomains(elem["host"])
      logger.debug('>>> self.checkLimits: ' + str(bool(limitsOk)))
      logger.debug('>>> self.checkDomains: ' + str(bool(domainsOk)))
      if limitsOk and domainsOk:
        ret = self.fillProxyTuple(elem)
        self.incrementLimits(elem["host"])
        saveIndexFile = True
        break

  logger.debug('>>> self.indexFileName: ' + str(self.indexFileName))
  logger.debug('>>> saveIndexFile: ' + str(saveIndexFile))

  if saveIndexFile and self.indexFileName is not None:
    self.saveIndexInFile(self.indexFileName, self.proxyStruct)

  logger.debug('>>> getProxy leave... ret: ' + str(ret))
  return ret
469  # #getTriesCount class's static method, parse and returns tries_count value from USER_PROXY property
470  #
471  # @param siteProperties - incomig sites property dict
472  # @return tries_count value or None
@staticmethod
def getTriesCount(siteProperties):
  """Parse the "tries_count" value from the USER_PROXY site property.

  @param siteProperties - incoming site properties dict; None is accepted, as in the constructor
  @return tries_count as int, or None when siteProperties is None, has no USER_PROXY,
          USER_PROXY has no "tries_count", or decoding/conversion fails (the error is
          passed to ExceptionLog.handler)
  """
  ret = None
  # The constructor tolerates siteProperties=None; a membership test on None would raise TypeError here.
  if siteProperties is not None and "USER_PROXY" in siteProperties:
    try:
      proxyJson = json.loads(siteProperties["USER_PROXY"])
      if "tries_count" in proxyJson:
        ret = int(proxyJson["tries_count"])
    except Exception as excp:
      ExceptionLog.handler(logger, excp, ">>> Bad json in USER_PROXY property: " + str(siteProperties["USER_PROXY"]))
  return ret
486  # #Add fault proxy
487  #
488  # @param proxyName - proxy host name
489  # @param incrementSize - value for increment of faults counter
490  # @return - None
def addFault(self, proxyName, incrementSize=1):
  """Register faults for a proxy and disable it when its limit is reached.

  Nothing happens unless the proxy is in proxyStruct and carries both "faultsMax"
  and "faults". Otherwise "faults" grows by incrementSize; when "faultsMax" is
  positive and reached, "state" is set to 0. With the SQL source the same changes
  are sent to the database. Finally proxyStruct is saved to the index file, if one
  is configured.

  @param proxyName - proxy host name (key of proxyStruct)
  @param incrementSize - value for increment of faults counter
  @return - None
  """
  logger.debug('addFault enter ... proxyName: ' + str(proxyName))

  # proxyStruct is None when no proxy list was loaded; a membership test on None would raise TypeError.
  if self.proxyStruct is None or proxyName not in self.proxyStruct:
    return
  proxy = self.proxyStruct[proxyName]
  if "faultsMax" not in proxy or "faults" not in proxy:
    return

  def runUpdate(query):
    # Runs one update statement with affect_db forced on; the caller's mode is restored even on error.
    saveDBMode = self.dbWrapper.affect_db
    self.dbWrapper.affect_db = True
    try:
      result = self.dbWrapper.customRequest(query, self.PROXY_SQL_DB,
                                            dbi.EventObjects.CustomRequest.SQL_BY_NAME)
    finally:
      self.dbWrapper.affect_db = saveDBMode
    logger.debug('customRequest result: ' + varDump(result))

  faultsMax = int(proxy["faultsMax"])
  faults = int(proxy["faults"]) + incrementSize
  proxy.update({"faults":faults})

  # NOTE(review): both statements match `Site_Id` = self.siteId only, while PROXY_SQL_QUERY also loads
  # rows with `Site_Id` = '*'; such rows are not updated here. The values are interpolated unescaped.
  if self.source == self.SOURCE_SQL:
    runUpdate(self.PROXY_SQL_UPDATE_FAULTS_QUERY % (faults, self.siteId, proxyName))

  if faultsMax > 0 and faults >= faultsMax:
    proxy.update({"state":0})
    if self.source == self.SOURCE_SQL:
      runUpdate(self.PROXY_SQL_DISABLE_QUERY % (self.siteId, proxyName))

  # Same guard as in getProxy(): without a configured index file there is nothing to write to.
  if self.indexFileName is not None:
    self.saveIndexInFile(self.indexFileName, self.proxyStruct)
523  # # Check is empty proxies list
524  #
525  # @param - None
526  # @return True in case of empty list, otherwise False
528  # variable for result
529  ret = False
530  if self.proxyStruct is None or len(self.proxyStruct) == 0:
531  ret = True
533  return ret
536  # # Check raw content use regular expression patterns
537  #
538  # @param rawContent - raw content for check
539  # @return boolean flag - True if raw content has match or False otherwise
def checkPattern(self, rawContent):
  """Check raw content against the configured regular expression patterns.

  @param rawContent - raw content for check
  @return True when rawContent is None, when rawContentCheckPatterns is not a list,
          or when at least one pattern is found in rawContent (re.M | re.U);
          False when the list holds no matching pattern
  """
  if rawContent is None:
    return True

  patterns = self.rawContentCheckPatterns
  if not isinstance(patterns, list):
    return True

  searchFlags = re.M | re.U
  # any() stops at the first pattern that matches.
  return any(re.search(pattern, rawContent, searchFlags) is not None for pattern in patterns)
def readSQLProxy(self, dbWrapper, siteId)
def addFault(self, proxyName, incrementSize=1)
def saveIndexInFile(fileName, jsonData)
def __init__(self, siteProperties, dbWrapper, siteId, url=None)
def getProxy(self, previousProxy=None)
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
def commonIncrementLimits(self, container, key)