HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
Filters.py
Go to the documentation of this file.
1 """
2 Created on Mar 17, 2015
3 
4 @package: app
5 @author: scorp
6 @link: http://hierarchical-cluster-engine.com/
7 @copyright: Copyright © 2013-2014 IOIX Ukraine
8 @license: http://hierarchical-cluster-engine.com/license/
9 @since: 0.1
10 """
11 
12 import re
13 import copy
14 import time
15 import datetime
16 import dbi.EventObjects
17 from app.DateTimeType import DateTimeType
18 from app.Utils import varDump # pylint: disable=W0611
19 import app.Utils as Utils # pylint: disable=F0401
20 
21 
23 
24 
class Filters(object):
    """Container and evaluator for site filters.

    Filters are kept in self.filters as a list of records keyed by the FILTER_NAME_* constants. They are
    built from objects passed to the constructor and/or from rows of the `sites_filters` table, and are
    evaluated against a subject value by filterAll().

    NOTE(review): the module-level name `logger` is used by several methods but is defined outside this
    class - confirm it is initialized at module import time.
    """

    DB_NAME = "dc_sites"
    REPLACE_MARKER = '%'
    ACTION_MULTIPLE = -1
    FILTER_SPLIT_PATTERN = '\r\n'

    # Keys of a filter record (same names as returned by "SELECT * FROM `sites_filters`")
    FILTER_NAME_PATTERN = "Pattern"
    FILTER_NAME_SUBJECT = "Subject"
    FILTER_NAME_OP_CODE = "OperationCode"
    FILTER_NAME_STAGE = "Stage"
    FILTER_NAME_ACTION = "Action"
    FILTER_NAME_GROUP = "Group_Id"
    FILTER_NAME_STATE = "State"

    # Stages of filter applying
    STAGE_COLLECT_URLS = 0
    STAGE_BEFORE_DOM_PRE = 1
    STAGE_AFTER_DOM_PRE = 2
    STAGE_AFTER_DOM = 3
    STAGE_AFTER_PROCESSOR = 4
    STAGE_ALL = 5
    STAGE_COLLECT_URLS_PROTOCOLS = 6
    STAGE_BEFORE_PROCESSOR = 7
    STAGE_REDIRECT_URL = 8

    # Constants select stage for STAGE_BEFORE_DOM_PRE
    SELECT_SUBJECT_RAW_CONTENT = 'RAW_CONTENT'
    SELECT_SUBJECT_HEADERS_ALL = 'HEADERS_ALL'
    SELECT_SUBJECT_LAST_MODIFIED = 'LAST_MODIFIED'

    LOGIC_OR = 0
    LOGIC_AND = 1

    # Constants for operation codes, implemented in the Filters.comparing method body
    OC_RE = 0
    OC_EQ = 1
    OC_NOTEQ = 2
    OC_EQLESS = 3
    OC_EQMORE = 4
    OC_LESS = 5
    OC_MORE = 6
    OC_SQLE = 7

    # States of filters
    STATE_DISABLE = 0
    STATE_ENABLE = 1

    # Case conversion applied to macro names by macroReplace
    MACRO_CASE_ORIGINAL = 0
    MACRO_CASE_UPPER = 1
    MACRO_CASE_LOWER = 2

    # #Constructor
    #
    # @param filters - incoming filters (iterable of objects with pattern, subject, opCode, stage, action,
    #                  groupId and state attributes) or None; converted to internal records
    # @param dbTaskWrapper - db-task wrapper; when given together with siteId, filters are also read from DB
    # @param siteId - site Id used in DB requests
    # @param readMode - read mode (value of the `Mode` column in DB requests)
    # @param fields - dictionary values of support macro names ('PDATE' and other)
    # @param opCode - operation code used as a select condition, None means any
    # @param stage - stage of apply filter, also the default stage for filterAll
    # @param selectSubject - select subject use select from DB
    def __init__(self, filters, dbTaskWrapper=None, siteId=None, readMode=0, fields=None, opCode=None, stage=None,
                 selectSubject=None):
        self.patternCache = {}
        self.reFlags = re.M | re.U
        self.dbTaskWrapper = dbTaskWrapper
        self.siteId = siteId
        self.readMode = readMode
        self.fields = fields
        self.stage = stage
        self.filters = []

        if filters is not None:
            # instantiate self.filters directly from the incoming objects
            self.readFiltersFromDict(filters, opCode, stage, selectSubject)
        if dbTaskWrapper is not None and siteId is not None:
            # rows read from DB are appended to whatever was loaded above
            self.readFiltersFromDB(dbTaskWrapper, siteId, readMode, opCode, stage, selectSubject)

    # #Compile a single pattern and remember it in self.patternCache
    #
    # Patterns containing the macro marker are not cached because their final text is known only after
    # macro replacement. A pattern that is not a valid regular expression is skipped instead of aborting
    # the load of all filters.
    #
    # @param pattern - one pattern string (already split by FILTER_SPLIT_PATTERN)
    # @return - None
    def _cachePattern(self, pattern):
        if pattern.find(self.REPLACE_MARKER) != -1:
            return
        try:
            self.patternCache[pattern + str(self.reFlags)] = re.compile(pattern, self.reFlags)
        except re.error as err:
            logger.debug("Pattern '" + str(pattern) + "' not cached, invalid regular expression: " + str(err))

    # # Read filters from dict generates internal filters from incoming filters list
    #
    # Replaces the current content of self.filters. One record is produced per pattern line
    # (patterns are split by FILTER_SPLIT_PATTERN).
    #
    # @param filters - incoming filters as iterable of objects with pattern, subject, opCode, stage,
    #                  action, groupId and state attributes
    # @param opCode - operation code for select condition, None means any
    # @param stage - stage for select condition, None means any
    # @param selectSubject - select subject, checked only for filters of the STAGE_BEFORE_DOM_PRE stage
    # @return - None
    def readFiltersFromDict(self, filters, opCode=None, stage=None, selectSubject=None):
        self.filters = []
        for localFilter in filters:
            if opCode is not None and int(opCode) != int(localFilter.opCode):
                continue
            if stage is not None and int(stage) != int(localFilter.stage):
                continue

            if localFilter.state == self.STATE_DISABLE:
                logger.debug("Filter: '" + str(localFilter.pattern) + "' skipped as DISABLE")
                continue

            # A filter with an empty (or missing) subject is accepted for any selectSubject
            if int(localFilter.stage) == self.STAGE_BEFORE_DOM_PRE and selectSubject is not None and \
               localFilter.subject and selectSubject != localFilter.subject:
                logger.debug('!!!! Skipped !!!!! ')
                continue

            for pattern in localFilter.pattern.split(self.FILTER_SPLIT_PATTERN):
                self._cachePattern(pattern)
                self.filters.append({
                    self.FILTER_NAME_PATTERN: pattern,
                    self.FILTER_NAME_SUBJECT: localFilter.subject,
                    self.FILTER_NAME_OP_CODE: localFilter.opCode,
                    self.FILTER_NAME_STAGE: localFilter.stage,
                    self.FILTER_NAME_ACTION: localFilter.action,
                    self.FILTER_NAME_GROUP: localFilter.groupId,
                    self.FILTER_NAME_STATE: localFilter.state,
                })

    # #readFiltersFromDB read filters from DB (using SQLCustom request to db-task) for specific site
    #
    # Rows are appended to self.filters, one record per pattern line.
    #
    # @param dbTaskWrapper - db-task wrapper
    # @param siteId - site Id
    # @param readMode - read mode
    # @param opCode - operation code for select condition, None means any
    # @param stage - stage for select condition, None means any
    # @param selectSubject - select subject, applied only when stage is STAGE_BEFORE_DOM_PRE
    # @return - None
    def readFiltersFromDB(self, dbTaskWrapper, siteId, readMode, opCode=None, stage=None, selectSubject=None):
        # NOTE(review): the query is assembled by string interpolation; the values must come from a trusted
        # source because no escaping is applied here.
        query = "SELECT * FROM `sites_filters` WHERE `Mode`='%s' AND `State`='1' AND `Site_Id`='%s'" % \
                (str(readMode), str(siteId))

        if opCode is not None:
            query += (" AND `OperationCode`='%s'" % str(opCode))

        if stage is not None:
            query += (" AND `Stage`='%s'" % str(stage))
            # the subject condition makes sense only together with a known stage
            if selectSubject is not None and int(stage) == self.STAGE_BEFORE_DOM_PRE:
                query += (" AND `Subject`='%s'" % str(selectSubject))

        logger.debug(">>> Filter start SQL Req: " + str(query))
        affectDB = dbTaskWrapper.affect_db
        dbTaskWrapper.affect_db = True
        try:
            customResponse = dbTaskWrapper.customRequest(query, self.DB_NAME,
                                                         dbi.EventObjects.CustomRequest.SQL_BY_NAME)
        finally:
            # restore the caller's flag even if the request fails
            dbTaskWrapper.affect_db = affectDB
        logger.debug(">>> Filter end SQL Req: " + str(customResponse))

        if customResponse is not None:
            for row in customResponse:
                if row is None:
                    continue
                for pattern in row[self.FILTER_NAME_PATTERN].split(self.FILTER_SPLIT_PATTERN):
                    elem = copy.copy(row)
                    elem[self.FILTER_NAME_PATTERN] = pattern
                    self.filters.append(elem)

    # #macroReplace makes macroreplacement in incoming pattern string by string from values dict
    #
    # String values are inserted wrapped in single quotes, other values as str(value); entries whose
    # value is None are ignored.
    #
    # @param pattern - incoming pattern string
    # @param values - dict with incoming old substrings (values.keys) and new substrings (values.values),
    #                 None or empty dict leaves the pattern unchanged
    # @param marker - additional prefix+suffix for values.keys
    # @param case - MACRO_CASE_ORIGINAL - do not change name case, MACRO_CASE_UPPER - change to upper,
    #               MACRO_CASE_LOWER - change to lower
    # @return replacemented string
    def macroReplace(self, pattern, values, marker, case=MACRO_CASE_ORIGINAL):
        try:
            stringTypes = basestring  # Python 2: covers both str and unicode
        except NameError:
            stringTypes = str

        ret = pattern
        if not values:
            return ret

        for key in values:
            value = values[key]
            if value is None:
                continue
            if case == self.MACRO_CASE_UPPER:
                rkey = key.upper()
            elif case == self.MACRO_CASE_LOWER:
                rkey = key.lower()
            else:
                rkey = key
            if isinstance(value, stringTypes):
                replacement = "'" + str(value) + "'"
            else:
                replacement = str(value)
            ret = ret.replace(marker + rkey + marker, replacement)

        return ret

    # #comparing values comparing method
    #
    # Any exception raised while comparing is logged and turns the result into False.
    #
    # @param value1 - comparing value1 (subject)
    # @param value2 - comparing value2 (pattern)
    # @param OCType - operation type, one of the OC_* constants; unknown types give False
    # @return bool result of values comparing
    def comparing(self, value1, value2, OCType):
        intComparators = {
            self.OC_EQ: lambda left, right: left == right,
            self.OC_NOTEQ: lambda left, right: left != right,
            self.OC_EQLESS: lambda left, right: left <= right,
            self.OC_EQMORE: lambda left, right: left >= right,
            self.OC_LESS: lambda left, right: left < right,
            self.OC_MORE: lambda left, right: left > right,
        }
        ret = False
        try:
            if OCType == self.OC_RE:
                ret = re.search(value2, value1, self.reFlags) is not None
            elif OCType == self.OC_SQLE:
                ret = self.checkSqlExpression(value2, None, self.fields)  # value2 content value of 'Pattern'
            elif OCType in intComparators:
                ret = intComparators[OCType](int(value1), int(value2))
        except ValueError as exp:
            logger.debug(">>> Value error = " + str(exp))
        except Exception as err:  # deliberate best-effort: a broken filter must not stop processing
            logger.debug(">>> Common exception, OCType = " + str(OCType) + ", val1 = " + str(value1) + ", val2 = " \
                         + str(value2) + ", error: " + str(err))
        return ret

    # # Count filters with stage
    #
    # @param stage - stage of filters (compared with the stored value as is)
    # @return number of loaded filters with this stage
    def searchFiltersWithStage(self, stage):
        if self.filters is None:
            return 0
        return sum(1 for localFilter in self.filters if localFilter[self.FILTER_NAME_STAGE] == stage)

    # # Check exists any filters with stage
    #
    # @param stage - stage of filters (compared with the stored value as is)
    # @return True - if exist filter with this stage, otherwise False
    def isExistStage(self, stage):
        if self.filters is None:
            return False
        return any(localFilter[self.FILTER_NAME_STAGE] == stage for localFilter in self.filters)

    # # Check exists any filters with stage and operation code
    #
    # @param stage - stage of filters (compared as int)
    # @param opCode - operation code (compared as int)
    # @return True - if exist filter with this stage and operation code, otherwise False
    def isExist(self, stage, opCode):
        if self.filters is None:
            return False
        return any(int(localFilter[self.FILTER_NAME_STAGE]) == int(stage) and
                   int(localFilter[self.FILTER_NAME_OP_CODE]) == int(opCode)
                   for localFilter in self.filters)

    # #filterAll method applyes all filters and return result
    #
    # @param stage - current stage, available stages looks above; None means the stage given to constructor
    # @param value - replacement values (dict) for macro names in patterns
    # @param logic - LOGIC_OR: result has one 0/1 item per applied filter;
    #                LOGIC_AND: result is [True] or [False], evaluation stops at the first mismatch
    # @param subject - value used as comparing subject for filters whose own subject is empty or is one of
    #                  the SELECT_SUBJECT_* names; otherwise the filter's own subject is compared
    # @param excludeIncludeMode - None - means any, otherwise only filters with equal Action are applied
    # @return list of results as described for logic, empty list if no filter was applied
    def filterAll(self, stage, value, logic=LOGIC_AND, subject=None, excludeIncludeMode=None):
        if stage is None:
            stage = self.stage

        externalSubjects = ("", self.SELECT_SUBJECT_RAW_CONTENT, self.SELECT_SUBJECT_HEADERS_ALL,
                            self.SELECT_SUBJECT_LAST_MODIFIED)
        ret = []
        for localFilter in self.filters:
            if int(localFilter[self.FILTER_NAME_STATE]) != self.STATE_ENABLE:
                continue
            if not (stage == self.STAGE_ALL or localFilter[self.FILTER_NAME_STAGE] == self.STAGE_ALL or
                    localFilter[self.FILTER_NAME_STAGE] == stage):
                continue
            if excludeIncludeMode is not None and \
               int(excludeIncludeMode) != int(localFilter[self.FILTER_NAME_ACTION]):
                continue

            if localFilter[self.FILTER_NAME_PATTERN] is None:
                # nothing to compare with; skipped so that a pattern of a previous filter is never reused
                continue

            if subject is not None and localFilter[self.FILTER_NAME_SUBJECT] in externalSubjects:
                localSubject = subject
            else:
                localSubject = localFilter[self.FILTER_NAME_SUBJECT]

            # substitute macro names and time macros before comparing
            localPattern = self.macroReplace(localFilter[self.FILTER_NAME_PATTERN], value, self.REPLACE_MARKER)
            localPattern = self.getGmtTime(localPattern, logger)
            matched = self.comparing(localSubject, localPattern, localFilter[self.FILTER_NAME_OP_CODE])

            if logic == self.LOGIC_OR:
                ret.append(int(matched))
            elif logic == self.LOGIC_AND:
                if matched:
                    ret = [True]
                else:
                    ret = [False]
                    break

        return ret

    # Method found value in existing filters ([self.FILTER_NAME_ACTION] field)
    #
    # @param value - action value to look for
    # @return bool result of finding
    def isExistInActions(self, value):
        return any(localFilter is not None and localFilter[self.FILTER_NAME_ACTION] == value
                   for localFilter in self.filters)

    # #Get GMT time from macros as string
    #
    # Replaces every macro of the form %@NAME(+N)% or %@NAME(-N)%, where NAME is one of SHORTYEAR, YEAR,
    # MONTH, DAY, HOUR, MINUTE, SECOND and N is an offset in hours (1-2 digits), with the corresponding
    # component of the current GMT time shifted by N hours. On any error the error is logged and the
    # pattern is returned as it is at that moment.
    #
    # @param localPattern - input pattern as string
    # @param loggerIns - instance of logger
    # @return pattern with time macros replaced
    def getGmtTime(self, localPattern, loggerIns):
        try:
            formatCodes = {"SHORTYEAR": "y", "YEAR": "Y", "MONTH": "m", "DAY": "d", "HOUR": "H", "MINUTE": "M",
                           "SECOND": "S"}
            regex = re.compile(r"%@(SHORTYEAR|YEAR|MONTH|DAY|HOUR|MINUTE|SECOND)\(([+-]\d{1,2})\)%")
            now = time.time()  # one timestamp for all macros of the pattern

            for name, offset in regex.findall(localPattern):
                # plain arithmetic keeps the sign; timedelta(hours=-N).seconds is always non-negative
                shifted = time.gmtime(now + int(offset) * 3600)
                localPattern = localPattern.replace("%@" + name + "(" + offset + ")%",
                                                    time.strftime("%" + formatCodes[name], shifted))
        except Exception as err:
            loggerIns.error(str(err))

        return localPattern

    # #Check Sql expression operation
    #
    # Without a db-task wrapper the check always succeeds. With fields given, the pattern (after macro
    # replacement with upper-cased macro names) is executed as "SELECT <pattern>" and succeeds when the
    # first value of the first row is greater than zero. Without fields, the pattern (with PDATE macro
    # replaced by pubdate) is added as a condition to a `sites_filters` select and the check succeeds when
    # any row is returned.
    #
    # NOTE(review): SQL text is assembled from the pattern and macro values without escaping; patterns and
    # values must come from a trusted source.
    #
    # @param pattern - sql expression
    # @param pubdate - date extracted on crawler
    # @param fields - dictionary values of support macro names ('PDATE' and other)
    # @return True if success, otherwise False
    def checkSqlExpression(self, pattern, pubdate=None, fields=None):
        ret = False

        if self.dbTaskWrapper is None:
            ret = True
        elif fields is None:
            logger.debug('pattern: ' + str(pattern))
            logger.debug('pubdate: ' + str(pubdate))

            # stays None when pubdate is missing or unparsable; the condition is then not added
            localPattern = None
            if pubdate is not None:
                dt = DateTimeType.parse(pubdate, True, logger, False)
                if dt is not None:
                    # macroReplace wraps string values in quotes itself
                    dateStr = dt.strftime("%Y-%m-%d %H:%M:%S")
                    localPattern = self.macroReplace(pattern, {'PDATE': dateStr}, self.REPLACE_MARKER)
                    logger.debug('localPattern: ' + str(localPattern))

            sqlQuery = "SELECT * FROM `sites_filters` WHERE `Mode`='%s' AND `State`='1' AND `Site_Id`='%s'" % \
                       (str(self.readMode), str(self.siteId))
            if localPattern:
                sqlQuery += " AND " + localPattern
            logger.debug("sqlQuery: " + str(sqlQuery))

            customResponse = self.dbTaskWrapper.customRequest(sqlQuery, self.DB_NAME)
            logger.debug("customResponse: " + str(customResponse))
            if customResponse is not None:
                for elem in customResponse:
                    if elem is not None:
                        logger.debug("elem: " + str(elem))
                ret = bool(len(customResponse) > 0)
        else:
            pattern = self.macroReplace(pattern, fields, self.REPLACE_MARKER, case=self.MACRO_CASE_UPPER)
            sqlQuery = 'SELECT ' + pattern
            logger.debug("sqlQuery: " + str(sqlQuery))
            affectDB = self.dbTaskWrapper.affect_db
            self.dbTaskWrapper.affect_db = True
            try:
                customResponse = self.dbTaskWrapper.customRequest(sqlQuery, self.DB_NAME)
            finally:
                # restore the wrapper's flag even if the request fails
                self.dbTaskWrapper.affect_db = affectDB
            logger.debug("customResponse: " + str(customResponse))
            if customResponse is not None and len(customResponse) > 0 and len(customResponse[0]) > 0 and \
               int(customResponse[0][0]) > 0:
                ret = True

        return ret
string FILTER_NAME_PATTERN
Definition: Filters.py:32
string FILTER_NAME_ACTION
Definition: Filters.py:36
def readFiltersFromDict(self, filters, opCode=None, stage=None, selectSubject=None)
Definition: Filters.py:116
def isExistStage(self, stage)
Definition: Filters.py:285
def isExist(self, stage, opCode)
Definition: Filters.py:302
def getGmtTime(self, localPattern, loggerIns)
Definition: Filters.py:416
def __init__(self, filters, dbTaskWrapper=None, siteId=None, readMode=0, fields=None, opCode=None, stage=None, selectSubject=None)
Definition: Filters.py:87
string FILTER_NAME_SUBJECT
Definition: Filters.py:33
def readFiltersFromDB(self, dbTaskWrapper, siteId, readMode, opCode=None, stage=None, selectSubject=None)
Definition: Filters.py:164
def macroReplace(self, pattern, values, marker, case=MACRO_CASE_ORIGINAL)
Definition: Filters.py:206
string FILTER_NAME_STATE
Definition: Filters.py:38
def filterAll(self, stage, value, logic=LOGIC_AND, subject=None, excludeIncludeMode=None)
Definition: Filters.py:323
string SELECT_SUBJECT_LAST_MODIFIED
Definition: Filters.py:53
def comparing(self, value1, value2, OCType)
Definition: Filters.py:230
string FILTER_NAME_STAGE
Definition: Filters.py:35
int STAGE_BEFORE_DOM_PRE
Definition: Filters.py:41
def searchFiltersWithStage(self, stage)
Definition: Filters.py:272
string FILTER_NAME_GROUP
Definition: Filters.py:37
def isExistInActions(self, value)
Definition: Filters.py:402
def checkSqlExpression(self, pattern, pubdate=None, fields=None)
Definition: Filters.py:437
string FILTER_NAME_OP_CODE
Definition: Filters.py:34
string SELECT_SUBJECT_HEADERS_ALL
Definition: Filters.py:52
string SELECT_SUBJECT_RAW_CONTENT
Definition: Filters.py:51
string FILTER_SPLIT_PATTERN
Definition: Filters.py:30
string REPLACE_MARKER
Definition: Filters.py:28