HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
SitesManager.py
Go to the documentation of this file.
1 '''
2 HCE project, Python bindings, Distributed Crawler application.
3 SitesManager object and related classes definitions.
4 
5 @package: dc
6 @author bgv bgv.hce@gmail.com
7 @link: http://hierarchical-cluster-engine.com/
8 @copyright: Copyright © 2013-2016 IOIX Ukraine
9 @license: http://hierarchical-cluster-engine.com/license/
10 @since: 0.1
11 '''
12 
13 
14 import time
15 import ctypes
16 import logging
17 import zlib
18 import threading
19 import json
20 import ConfigParser
21 
22 
23 # import pickle
24 try:
25  import cPickle as pickle
26 except ImportError:
27  import pickle
28 
29 from app.BaseServerManager import BaseServerManager
30 import app.Utils as Utils # pylint: disable=F0401
31 import app.Consts as APP_CONSTS
32 from dc import EventObjects
33 from dc.Constants import EVENT_TYPES
34 import dc.Constants as DC_CONSTS
35 from drce.CommandConvertor import CommandConvertor
36 from drce.Commands import Session
37 from drce.Commands import TaskExecuteRequest
38 from drce.Commands import TaskExecuteStruct
39 from drce.DRCEManager import ConnectionTimeout, TransportInternalErr, CommandExecutorErr
40 from drce.DRCEManager import DRCEManager
41 from drce.DRCEManager import HostParams
42 from transport.ConnectionBuilderLight import ConnectionBuilderLight
43 from transport.IDGenerator import IDGenerator
44 from transport.UIDGenerator import UIDGenerator
45 import transport.Consts as TRANSPORT_CONSTS
46 from dbi.EventObjects import CustomRequest
47 
48 # Logger initialization
49 logger = Utils.MPLogger().getLogger()
50 lock = threading.Lock()
51 
52 
53 # #The SitesManager class is the main interface between the DC application and the distributed sites database.
54 #
55 # This object is a main interface of DC application and DRCE cluster that operates with distributed DB units
56 # It supports complete DRCE protocol requests and process responses from DRCE cluster.
57 #
59 
# NOTE(review): documentation-scrape listing; the enclosing "class SitesManager(BaseServerManager)"
# header (embedded line 58) is not visible in this view — these are class-level constants.
# Reducer TTL passed to drceManager.process() alongside the request timeout — presumably msec, TODO confirm.
60  DRCE_REDUCER_TTL = 3000000
# Per-site property names that override the global re-crawl configuration (read via
# EventObjects.Site.getFromProperties in processRecrawling).
61  SITE_PROPERTIES_RECRAWL_WHERE_NAME = "RECRAWL_WHERE"
62  SITE_PROPERTIES_RECRAWL_DELETE_WHERE_NAME = "RECRAWL_DELETE_WHERE"
63  SITE_PROPERTIES_RECRAWL_DELETE_NAME = "RECRAWL_DELETE"
64  SITE_PROPERTIES_RECRAWL_OPTIMIZE_NAME = "RECRAWL_OPTIMIZE"
65 
# Per-site properties for the adaptive re-crawl period recalculation.
66  SITE_PROPERTIES_RECRAWL_PERIOD_MODE_NAME = "RECRAWL_PERIOD_MODE"
67  SITE_PROPERTIES_RECRAWL_PERIOD_MIN_NAME = "RECRAWL_PERIOD_MIN"
68  SITE_PROPERTIES_RECRAWL_PERIOD_MAX_NAME = "RECRAWL_PERIOD_MAX"
69  SITE_PROPERTIES_RECRAWL_PERIOD_STEP_NAME = "RECRAWL_PERIOD_STEP"
# Name prefix for re-crawl worker threads forked in on_poll_timeout().
70  SITE_RECRAWL_THREAD_NAME_PREFIX = 'ReCrawl_'
71 
72  # Configuration settings options names
73  CONFIG_SERVER = "server"
74  CONFIG_DRCE_HOST = "DRCEHost"
75  CONFIG_DRCE_PORT = "DRCEPort"
76  CONFIG_DRCE_TIMEOUT = "DRCETimeout"
77  CONFIG_DRCE_DB_APP_NAME = "DRCEDBAppName"
78  CONFIG_RECRAWL_SITES_MAX = "RecrawlSiteMax"
79  CONFIG_RECRAWL_SITES_ITER_PERIOD = "RecrawlSiteIterationPeriod"
80  CONFIG_RECRAWL_SITES_PERIOD_MODE = "RecrawlSitePeriodMode"
81  CONFIG_RECRAWL_SITES_PERIOD_MIN = "RecrawlSitePeriodMin"
82  CONFIG_RECRAWL_SITES_PERIOD_MAX = "RecrawlSitePeriodMax"
83  CONFIG_RECRAWL_SITES_PERIOD_STEP = "RecrawlSitePeriodStep"
84  CONFIG_RECRAWL_SITES_RECRAWL_DATE_EXP = "RecrawlSiteRecrawlDateExpression"
85  CONFIG_RECRAWL_SITES_SELECT_CRITERION = "RecrawlSiteSelectCriterion"
86  CONFIG_RECRAWL_SITES_SELECT_ORDER = "RecrawlSiteSelectOrder"
87  CONFIG_RECRAWL_SITES_MAX_THREADS = "RecrawlSiteMaxThreads"
88  CONFIG_RECRAWL_SITES_LOCK_STATE = "RecrawlSiteLockState"
89  CONFIG_RECRAWL_SITES_OPTIMIZE = "RecrawlSiteOptimize"
90  CONFIG_RECRAWL_SITES_DRCE_TIMEOUT = "RecrawlSiteDRCETimeout"
91  CONFIG_RECRAWL_SITES_MODE = "RecrawlSiteMode"
92  CONFIG_RECRAWL_DELAY_BEFORE = "RecrawlDelayBefore"
93  CONFIG_RECRAWL_DELAY_AFTER = "RecrawlDelayAfter"
94  CONFIG_POLLING_TIMEOUT = "PollingTimeout"
# NOTE(review): "Recraw...Updatel" / "Recraw" spellings below are the actual ini option
# names read from configuration — do not "fix" them without migrating the config files.
95  CONFIG_DEFAULT_RECRAWL_UPDATE_CRITERION = "DefaultRecrawUpdatelCriterion"
96  CONFIG_DEFAULT_RECRAWL_DELETE_OLD = "DefaultRecrawDeleteOld"
97  CONFIG_DEFAULT_RECRAWL_DELETE_OLD_CRITERION = "DefaultRecrawDeleteOldCriterion"
98  CONFIG_DRCE_ROUTE = "DRCERoute"
99  CONFIG_PURGE_METHOD = "PurgeMethod"
100  CONFIG_DRCE_NODES = "DRCENodes"
101  CONFIG_COMMON_COMMANDS_THREADING_MODE = "CommonCommandsThreadingMode"
102 
# Ini option holding a JSON map of "host:port:timeout" -> [event types], see getDRCEConnectionParamsFromPool().
103  DRCE_CONNECTIONS_POOL = "DRCEConnectionsPool"
# Threading modes for common command handling in onEventsHandler().
104  COMMON_COMMANDS_THREADING_SIMPLE = 0
105  COMMON_COMMANDS_THREADING_MULTI = 1
106  COMMON_COMMANDS_THREAD_NAME_PREFIX = 'Common_'
107 
108  # #constructor
109  # initialize fields
110  #
111  # @param configParser config parser object
112  # @param connectBuilderLight connection builder light
113  #
# #Constructor: reads configuration, builds the server connection, registers event
# handlers, and initializes DRCE API state and statistics counters.
#
# @param configParser ConfigParser instance with the application's ini settings
# @param connectionBuilderLight connection builder light; a default ConnectionBuilderLight() is created when None
#
# NOTE(review): this listing has dropped lines (embedded numbering jumps); the NOTEs
# below mark each gap — verify against the original source file before editing logic.
114  def __init__(self, configParser, connectionBuilderLight=None):
115  super(SitesManager, self).__init__()
116 
117  # Sites re-crawl counter name for stat variables
118  self.updateStatField(DC_CONSTS.SITES_RECRAWL_COUNTER_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
119  # Sites re-crawl sites updated counter name for stat variables
120  self.updateStatField(DC_CONSTS.SITES_RECRAWL_UPDATED_COUNTER_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
121  # Sites re-crawl sites deleted counter name for stat variables
122  self.updateStatField(DC_CONSTS.SITES_RECRAWL_DELETED_COUNTER_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
123  # Sites DRCE requests counter name for stat variables
124  self.updateStatField(DC_CONSTS.SITES_DRCE_COUNTER_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
125 
126  # Instantiate the connection builder light if not set
127  if connectionBuilderLight is None:
128  connectionBuilderLight = ConnectionBuilderLight()
129 
# Config section name equals the class name ("SitesManager").
130  className = self.__class__.__name__
131 
132  # Get configuration settings
133  self.serverName = configParser.get(className, self.CONFIG_SERVER)
134  self.DRCEDBAppName = configParser.get(className, self.CONFIG_DRCE_DB_APP_NAME)
135 
136  # Create connections and raise bind or connect actions for correspondent connection type
137  serverConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, self.serverName)
138 
139  # Add connections to the polling set
140  self.addConnection(self.serverName, serverConnection)
141 
142  # Set handlers for all events of SITE and URL operations
# Maps each request event type to its response event type; also used to register onEventsHandler below.
143  self.eventTypes = {EVENT_TYPES.SITE_NEW:EVENT_TYPES.SITE_NEW_RESPONSE,
144  EVENT_TYPES.SITE_UPDATE:EVENT_TYPES.SITE_UPDATE_RESPONSE,
145  EVENT_TYPES.SITE_STATUS:EVENT_TYPES.SITE_STATUS_RESPONSE,
146  EVENT_TYPES.SITE_DELETE:EVENT_TYPES.SITE_DELETE_RESPONSE,
147  EVENT_TYPES.SITE_CLEANUP:EVENT_TYPES.SITE_CLEANUP_RESPONSE,
148  EVENT_TYPES.SITE_FIND:EVENT_TYPES.SITE_FIND_RESPONSE,
149  EVENT_TYPES.URL_NEW:EVENT_TYPES.URL_NEW_RESPONSE,
150  EVENT_TYPES.URL_STATUS:EVENT_TYPES.URL_STATUS_RESPONSE,
151  EVENT_TYPES.URL_UPDATE:EVENT_TYPES.URL_UPDATE_RESPONSE,
152  EVENT_TYPES.URL_DELETE:EVENT_TYPES.URL_DELETE_RESPONSE,
153  EVENT_TYPES.URL_FETCH:EVENT_TYPES.URL_FETCH_RESPONSE,
154  EVENT_TYPES.URL_CLEANUP:EVENT_TYPES.URL_CLEANUP_RESPONSE,
155  EVENT_TYPES.URL_CONTENT:EVENT_TYPES.URL_CONTENT_RESPONSE,
156  EVENT_TYPES.SQL_CUSTOM:EVENT_TYPES.SQL_CUSTOM_RESPONSE,
157  EVENT_TYPES.URL_PUT:EVENT_TYPES.URL_PUT_RESPONSE,
158  EVENT_TYPES.URL_HISTORY:EVENT_TYPES.URL_HISTORY_RESPONSE,
159  EVENT_TYPES.URL_STATS:EVENT_TYPES.URL_STATS_RESPONSE,
160  EVENT_TYPES.PROXY_NEW:EVENT_TYPES.PROXY_NEW_RESPONSE,
161  EVENT_TYPES.PROXY_UPDATE:EVENT_TYPES.PROXY_UPDATE_RESPONSE,
162  EVENT_TYPES.PROXY_DELETE:EVENT_TYPES.PROXY_DELETE_RESPONSE,
163  EVENT_TYPES.PROXY_STATUS:EVENT_TYPES.PROXY_STATUS_RESPONSE,
164  EVENT_TYPES.PROXY_FIND:EVENT_TYPES.PROXY_FIND_RESPONSE,
165  EVENT_TYPES.ATTR_SET:EVENT_TYPES.ATTR_SET_RESPONSE,
166  EVENT_TYPES.ATTR_UPDATE:EVENT_TYPES.ATTR_UPDATE_RESPONSE,
167  EVENT_TYPES.ATTR_DELETE:EVENT_TYPES.ATTR_DELETE_RESPONSE,
168  EVENT_TYPES.ATTR_FETCH:EVENT_TYPES.ATTR_FETCH_RESPONSE}
169  for ret in self.eventTypes:
170  self.setEventHandler(ret, self.onEventsHandler)
171 
172  # Initialize DRCE API
173  self.configVars[self.CONFIG_DRCE_TIMEOUT] = configParser.getint(className, self.CONFIG_DRCE_TIMEOUT)
174  self.configVars[self.CONFIG_DRCE_ROUTE] = configParser.get(className, self.CONFIG_DRCE_ROUTE)
175  try:
176  self.configVars[self.CONFIG_DRCE_NODES] = configParser.get(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME,
177  self.CONFIG_DRCE_NODES)
# Optional setting: default to a single DRCE node when absent from the ini file.
178  except ConfigParser.NoOptionError:
179  self.configVars[self.CONFIG_DRCE_NODES] = 1
180  self.drceHost = configParser.get(className, self.CONFIG_DRCE_HOST)
181  self.drcePort = configParser.get(className, self.CONFIG_DRCE_PORT)
182  hostParams = HostParams(self.drceHost, self.drcePort)
# NOTE(review): numbering jumps 182 -> 184; the line creating self.drceManager
# (presumably "self.drceManager = DRCEManager()") is missing from this listing.
184  self.drceManager.activate_host(hostParams)
187 
188  # Init config vars storage for auto re-crawl
189  self.configVars[self.CONFIG_RECRAWL_SITES_MAX] = configParser.getint(className, self.CONFIG_RECRAWL_SITES_MAX)
# NOTE(review): for each pair below only the even-numbered continuation line survived
# the scrape; the odd-numbered line with the "self.configVars[...] = \" assignment
# target is missing — verify against the original file.
191  configParser.getint(className, self.CONFIG_RECRAWL_SITES_ITER_PERIOD)
193  configParser.get(className, self.CONFIG_RECRAWL_SITES_RECRAWL_DATE_EXP)
195  configParser.get(className, self.CONFIG_RECRAWL_SITES_SELECT_CRITERION)
197  configParser.get(className, self.CONFIG_RECRAWL_SITES_SELECT_ORDER)
199  configParser.getint(className, self.CONFIG_RECRAWL_SITES_LOCK_STATE)
201  configParser.getint(className, self.CONFIG_RECRAWL_SITES_OPTIMIZE)
203  configParser.getint(className, self.CONFIG_RECRAWL_SITES_DRCE_TIMEOUT)
204  self.configVars[self.CONFIG_RECRAWL_DELAY_BEFORE] = configParser.getint(className, self.CONFIG_RECRAWL_DELAY_BEFORE)
205  self.configVars[self.CONFIG_RECRAWL_DELAY_AFTER] = configParser.getint(className, self.CONFIG_RECRAWL_DELAY_AFTER)
206  self.processRecrawlLastTs = time.time()
207 
208  # Set connections poll timeout, defines period of HCE cluster monitoring cycle, msec
209  self.configVars[self.POLL_TIMEOUT_CONFIG_VAR_NAME] = configParser.getint(className, self.CONFIG_POLLING_TIMEOUT)
210 
211  # Init default re-crawl update criterion
# NOTE(review): same truncation pattern — assignment-target lines 212, 216, 219, 222, 225 missing below.
213  configParser.get(className, self.CONFIG_DEFAULT_RECRAWL_UPDATE_CRITERION)
214 
215  # Init default re-crawl delete old URLs operation
217  configParser.getint(className, self.CONFIG_DEFAULT_RECRAWL_DELETE_OLD)
218  # Init default re-crawl delete old URLs operation's criterion
220  configParser.get(className, self.CONFIG_DEFAULT_RECRAWL_DELETE_OLD_CRITERION)
221  # Init default re-crawl threads max number
223  configParser.getint(className, self.CONFIG_RECRAWL_SITES_MAX_THREADS)
224  # Init re-crawl mode
226  configParser.getint(className, self.CONFIG_RECRAWL_SITES_MODE)
227 
228  # Init sites re-crawl queue
# NOTE(review): numbering jumps 229 -> 230; the initialization of the cross-thread
# queue (self.recrawlSiteslQueue, used by processRecrawling) appears to be missing.
230  # Init sites re-crawl threads counter
231  self.updateStatField(DC_CONSTS.RECRAWL_THREADS_COUNTER_QUEUE_NAME, 0, self.STAT_FIELDS_OPERATION_SET)
232  # Init sites re-crawl threads counter
233  self.updateStatField(DC_CONSTS.RECRAWL_SITES_QUEUE_NAME, 0, self.STAT_FIELDS_OPERATION_SET)
234  # Init sites re-crawl threads created counter
235  self.updateStatField(DC_CONSTS.RECRAWL_THREADS_CREATED_COUNTER_NAME, 0, self.STAT_FIELDS_OPERATION_SET)
236 
237  # Purge algorithm init
238  self.configVars[self.CONFIG_PURGE_METHOD] = configParser.getint(className, self.CONFIG_PURGE_METHOD)
239 
240  # Recrawl period mode
# NOTE(review): assignment-target lines 241, 243, 245, 247 missing below (same truncation pattern).
242  configParser.getint(className, self.CONFIG_RECRAWL_SITES_PERIOD_MODE)
244  configParser.getint(className, self.CONFIG_RECRAWL_SITES_PERIOD_MIN)
246  configParser.getint(className, self.CONFIG_RECRAWL_SITES_PERIOD_MAX)
248  configParser.getint(className, self.CONFIG_RECRAWL_SITES_PERIOD_STEP)
249 
250  # Init common operations variables
251  # Init common threads created counter
252  self.updateStatField(DC_CONSTS.COMMON_THREADS_CREATED_COUNTER_NAME, 0, self.STAT_FIELDS_OPERATION_SET)
253  # Init sites re-crawl threads counter
254  self.updateStatField(DC_CONSTS.COMMON_THREADS_COUNTER_QUEUE_NAME, 0, self.STAT_FIELDS_OPERATION_SET)
255  # Common operations counter name for stat variables
256  self.updateStatField(DC_CONSTS.COMMON_OPERATIONS_COUNTER_NAME, 0, self.STAT_FIELDS_OPERATION_INIT)
257 
258  # Init DRCE connections pool and event types / commands assignment
# Pool is a JSON map of "host:port:timeout" -> [event types]; None when unset or malformed.
259  try:
260  self.drceConnectionsPool = json.loads(configParser.get(className, self.DRCE_CONNECTIONS_POOL))
261  except ConfigParser.NoOptionError:
262  self.drceConnectionsPool = None
263  except Exception as err:
264  logger.error("Error de-serialize json of connection parameters for DRCE connections pool: %s", err)
265  self.drceConnectionsPool = None
266 
267  # Init common commands threading model
268  try:
# NOTE(review): lines 269 and 272 missing — the assignment target and the NoOptionError
# default value for the threading-mode setting are not visible in this listing.
270  configParser.getint(className, self.CONFIG_COMMON_COMMANDS_THREADING_MODE)
271  except ConfigParser.NoOptionError:
273 
274 
275 
276  # #Events wait timeout handler, for timeout state of the connections polling. Executes periodical check of DTM tasks
277  # and initiate the main crawling iteration cycle
278  #
# #Events wait timeout handler, called on timeout of the connections polling loop.
# When periodic re-crawl is enabled and due (mode == 1), forks a processRecrawling
# worker thread named "ReCrawl_<n>"; otherwise logs why nothing was started.
# IOError is deliberately swallowed; other exceptions are logged via ExceptionLog.
279  def on_poll_timeout(self):
280  logger.debug("Periodic iteration started.")
281  try:
282  # Process periodic re-crawling
283  if self.configVars[self.CONFIG_RECRAWL_SITES_ITER_PERIOD] > 0 and\
284  time.time() - self.processRecrawlLastTs > self.configVars[self.CONFIG_RECRAWL_SITES_ITER_PERIOD]:
285  if self.configVars[self.CONFIG_RECRAWL_SITES_MODE] == 1:
286  logger.debug("Now time to try to perform re-crawl, interval %s",
# NOTE(review): numbering jumps 286 -> 289; the logger.debug() argument line and the
# "if <max threads> >" half of the condition ending at line 289 are missing here.
289  self.statFields[DC_CONSTS.RECRAWL_THREADS_COUNTER_QUEUE_NAME]:
290  self.processRecrawlLastTs = time.time()
291  logger.info("Forking new recrawl thread")
292  self.updateStatField(DC_CONSTS.RECRAWL_THREADS_CREATED_COUNTER_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
# The worker receives the logging module to obtain its own logger (see processRecrawling).
293  t1 = threading.Thread(target=self.processRecrawling, args=(logging,))
294  t1.setName(self.SITE_RECRAWL_THREAD_NAME_PREFIX + \
295  str(self.statFields[DC_CONSTS.RECRAWL_THREADS_CREATED_COUNTER_NAME]))
296  t1.start()
297  logger.info("New recrawl thread forked")
298  else:
299  # Max threads limit reached
300  logger.debug("Max recrawl threads limit reached %s",
# NOTE(review): numbering jumps 300 -> 302; the logger.debug() argument line is missing.
302  else:
303  logger.debug("Re-crawl disabled!")
# Ignore I/O errors silently (best-effort periodic processing).
304  except IOError as e:
305  del e
306  except Exception as err:
307  Utils.ExceptionLog.handler(logger, err, "Exception:")
308 
309  logger.debug("Periodic iteration finished.")
311 
312 
313  # #onEventsHandler event handler for all requests
314  #
315  # @param event instance of Event object
# #onEventsHandler event handler for all registered request event types.
# Dispatches to eventsHandlerTS either directly (simple mode) or on a new
# "Common_<n>" thread (multi mode), then runs one on_poll_timeout() iteration.
#
# @param event instance of Event object
316  def onEventsHandler(self, event):
317  try:
# NOTE(review): numbering jumps 317 -> 319; the condition selecting simple vs. multi
# threading mode (presumably testing CONFIG_COMMON_COMMANDS_THREADING_MODE) is missing.
319  logger.info("Common command in simple mode")
320  # Call thread-safe handler direct way as simple single m-type hce-node cluster
321  self.eventsHandlerTS(event, logging)
322  else:
323  # Call thread-safe handler multi-thread way as multi m-type hce-node cluster
324  logger.info("Forking new common commands thread")
325  self.updateStatField(DC_CONSTS.COMMON_THREADS_CREATED_COUNTER_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
326  t2 = threading.Thread(target=self.eventsHandlerTS, args=(event, logging,))
327  t2.setName(self.COMMON_COMMANDS_THREAD_NAME_PREFIX + \
328  str(self.statFields[DC_CONSTS.COMMON_THREADS_CREATED_COUNTER_NAME]))
329  t2.start()
330  logger.info("New common commands thread forked")
# IOError deliberately swallowed; other exceptions logged via ExceptionLog.
331  except IOError as e:
332  del e
333  except Exception as err:
334  Utils.ExceptionLog.handler(logger, err, "Exception:")
335 
# Run one periodic iteration immediately after handling the event.
336  self.on_poll_timeout()
338 
339 
340  # #eventsHandlerThread thread-safe event handler for all requests
341  #
342  # @param event instance of Event object
343  # @param loggingObj
# #eventsHandlerTS thread-safe event handler for all requests.
# Converts the event into a DRCE request, sends it to the DRCE cluster, builds a
# reply event from the ClientResponse, copies URL_CONTENT/URL_FETCH order criterions
# into the event cookie, and sends the reply. Shared state (stat fields, event
# builder, reply) is guarded by the module-level lock; the DRCE round-trip itself
# runs outside the lock so requests may overlap.
#
# @param event instance of Event object
# @param loggingObj the logging module, used to re-bind the global logger in this thread
344  def eventsHandlerTS(self, event, loggingObj):
345  global logger # pylint: disable=W0603
346 
347  lock.acquire()
348  logger = loggingObj.getLogger(DC_CONSTS.LOGGER_NAME)
# NOTE(review): embedded line 349 missing from the listing (possibly a blank line).
350  self.updateStatField(DC_CONSTS.COMMON_THREADS_COUNTER_QUEUE_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
351  self.updateStatField(DC_CONSTS.COMMON_OPERATIONS_COUNTER_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
352  # Fix some fields values (limits) of event object using the nodes number if >1
353  if self.configVars[self.CONFIG_DRCE_NODES] > 1:
354  self.fixFields(event, self.configVars[self.CONFIG_DRCE_NODES])
355  logger.debug("Request event:\n" + Utils.varDump(event))
356  # Prepare DRCE request
357  drceRequest = self.prepareDRCERequest(event.eventType, event.eventObj)
# NOTE(review): numbering jumps 357 -> 359; the "if" line choosing between a pooled
# temporary connection and the persistent one is missing here — the surviving else
# branch (361-364) falls back to the persistent connection with default timeout.
359  connectionParams, timeout = self.getDRCEConnectionParamsFromPool(event.eventType)
360  persistentConnection = False
361  else:
362  connectionParams = None
363  timeout = -1
364  persistentConnection = True
365  lock.release()
366 
367  # Send DRCE request
368  clientResponseObj = self.processDRCERequest(drceRequest, persistentConnection, timeout, connectionParams)
369  logger.debug("Response ClientResponseObj:\n" + Utils.varDump(clientResponseObj))
370 
371  lock.acquire()
372  # Prepare reply event
373  replyEvent = self.eventBuilder.build(self.eventTypes[event.eventType], clientResponseObj)
374 
375  # Append source event cookies because they will be copied to reply event
# For URL_CONTENT requests: collect each item's ORDER criterion (or "") into the cookie.
376  if event.eventType == EVENT_TYPES.URL_CONTENT:
377  if event.cookie is None:
378  event.cookie = {}
379  if isinstance(event.cookie, dict):
380  event.cookie[EventObjects.URLFetch.CRITERION_ORDER] = []
381  for urlContentRequestItem in event.eventObj:
382  if urlContentRequestItem.urlFetch is not None and\
383  EventObjects.URLFetch.CRITERION_ORDER in urlContentRequestItem.urlFetch.urlsCriterions:
384  event.cookie[EventObjects.URLFetch.CRITERION_ORDER].append(
385  urlContentRequestItem.urlFetch.urlsCriterions[EventObjects.URLFetch.CRITERION_ORDER]) # pylint: disable=C0330
386  else:
387  event.cookie[EventObjects.URLFetch.CRITERION_ORDER].append("")
# Reset the cookie to None when nothing was collected.
388  if len(event.cookie) == 0:
389  event.cookie = None
390 
# Same cookie handling for URL_FETCH requests (criterions live directly on the item).
391  if event.eventType == EVENT_TYPES.URL_FETCH:
392  if event.cookie is None:
393  event.cookie = {}
394  if isinstance(event.cookie, dict):
395  event.cookie[EventObjects.URLFetch.CRITERION_ORDER] = []
396  for urlFetchRequestItem in event.eventObj:
397  if EventObjects.URLFetch.CRITERION_ORDER in urlFetchRequestItem.urlsCriterions:
398  event.cookie[EventObjects.URLFetch.CRITERION_ORDER].append(
399  urlFetchRequestItem.urlsCriterions[EventObjects.URLFetch.CRITERION_ORDER]) # pylint: disable=C0330
400  else:
401  event.cookie[EventObjects.URLFetch.CRITERION_ORDER].append("")
402  if len(event.cookie) == 0:
403  event.cookie = None
404 
405  # Send reply
406  self.reply(event, replyEvent)
407  logger.info("Reply sent")
408  lock.release()
409 
410  lock.acquire()
# NOTE(review): embedded line 411 missing from the listing (likely a comment line).
412  self.updateStatField(DC_CONSTS.COMMON_THREADS_COUNTER_QUEUE_NAME, 1, self.STAT_FIELDS_OPERATION_SUB)
413  lock.release()
414 
415 
416 
417  # #Get connection ((host, port), timeout) tuple of tuples by the event type or tuple (None, -1)
418  #
419  # @param eventType event type from Constants
420  # @return the tuple of tuples ((host, port), timeout) or tuple (None, -1)
def getDRCEConnectionParamsFromPool(self, eventType):
  # #Look up DRCE connection parameters assigned to an event type in the connections pool.
  #
  # Pool keys are "host:port:timeout" strings mapped to lists of event types; the first
  # well-formed key containing the event type wins, malformed keys are logged and skipped.
  #
  # @param eventType event type from Constants
  # @return the tuple of tuples ((host, port), timeout) or tuple (None, -1)
  connectionOptions = (None, -1)
  pool = self.drceConnectionsPool
  if pool is not None:
    try:
      for poolKey in pool:
        if eventType not in pool[poolKey]:
          continue
        fields = poolKey.split(':')
        if len(fields) > 2:
          connectionOptions = ((fields[0], int(fields[1])), int(fields[2]))
          logger.info("Connection options found for event %s: %s", str(eventType), str(connectionOptions))
          break
        # Malformed key: report it and keep scanning the remaining pool entries.
        logger.error("Wrong items number 'host:port:timeout' in DRCE connections pool key: %s", str(poolKey))
    except Exception as err:
      logger.error("Error get DRCE connection parameters, possible wrong ini value for DRCE connections pool: %s\n%s",
                   err, str(pool))
  return connectionOptions
440 
441 
442 
443  # #Prepare DRCE request
444  #
445  # @param eventType event type from Constants
446  # @param eventObj instance of Event object
447  # @return the TaskExecuteStruct object instance
def prepareDRCERequest(self, eventType, eventObj):
  # #Prepare DRCE request
  #
  # Wraps the event into a DRCESyncTasksCover, pickles it as the task input, and
  # targets the configured DRCE DB application in synchronous session mode.
  #
  # @param eventType event type from Constants
  # @param eventObj instance of Event object
  # @return the TaskExecuteStruct object instance
  tasksCover = DC_CONSTS.DRCESyncTasksCover(eventType, eventObj)
  executeStruct = TaskExecuteStruct()
  executeStruct.command = self.DRCEDBAppName
  executeStruct.session = Session(Session.TMODE_SYNC)
  executeStruct.input = pickle.dumps(tasksCover)
  logger.debug("DRCE taskExecuteStruct:\n" + Utils.varDump(executeStruct))
  return executeStruct
459 
460 
461 
462  # #Request action processor for DRCE DB cluster
463  #
464  # @param taskExecuteStruct object
465  # @param persistentDCREConnection use persistent single connection to the DRCE or new per each request, boolean
466  # @param connectionParams - tuple (host, port) for additional m-type hce-node cluster
# #Request action processor for DRCE DB cluster.
# Generates a 32-bit task id, sends the request via sendToDRCERouter, and converts
# the DRCE response into an EventObjects.ClientResponse with one ClientResponseItem
# per cluster node. The module-level lock guards id generation and config access.
#
# @param taskExecuteStruct object with the prepared DRCE task (see prepareDRCERequest)
# @param persistentDCREConnection use persistent single connection to the DRCE or new per each request, boolean
# @param timeout DRCE timeout, msec; -1 means use the configured default (resolved in sendToDRCERouter)
# @param connectionParams - tuple (host, port) for additional m-type hce-node cluster
# @return EventObjects.ClientResponse instance (never None; errors are encoded in its fields)
467  def processDRCERequest(self, taskExecuteStruct, persistentDCREConnection=True, timeout=-1, connectionParams=None):
468  lock.acquire()
469  # Create DRCE TaskExecuteRequest object
470  idGenerator = IDGenerator()
# Unsigned 32-bit task id: crc32 of a fresh connection uid seeded with the current time.
471  taskId = ctypes.c_uint32(zlib.crc32(idGenerator.get_connection_uid(), int(time.time()))).value
472  taskExecuteRequest = TaskExecuteRequest(taskId)
# Empty route string means no explicit routing.
473  if self.configVars[self.CONFIG_DRCE_ROUTE] != "":
474  taskExecuteRequest.route = self.configVars[self.CONFIG_DRCE_ROUTE]
475  # Set taskExecuteRequest fields
476  taskExecuteRequest.data = taskExecuteStruct
477  lock.release()
478 
479  logger.info("Sending sync task id:" + str(taskId) + " to DRCE router!")
480  # Send request to DRCE Cluster router
481  response = self.sendToDRCERouter(taskExecuteRequest, persistentDCREConnection, timeout, connectionParams)
482  logger.info("Received response on sync task from DRCE router!")
483  logger.debug("Response: %s", Utils.varDump(response))
484 
485  # Create new client response object
486  clientResponse = EventObjects.ClientResponse()
487  # Check response returned
# None response means transport failure or timeout (see sendToDRCERouter error handling).
488  if response is None:
489  clientResponse.errorCode = EventObjects.ClientResponse.STATUS_ERROR_NONE
490  clientResponse.errorMessage = "Response error, None returned from DRCE, possible timeout " + \
491  str(self.configVars[self.CONFIG_DRCE_TIMEOUT]) + " msec!"
492  logger.error(clientResponse.errorMessage)
493  else:
494  if len(response.items) == 0:
495  clientResponse.errorCode = EventObjects.ClientResponse.STATUS_ERROR_EMPTY_LIST
496  clientResponse.errorMessage = "Response error, empty list returned from DRCE, possible no one node in cluster!"
497  logger.error(clientResponse.errorMessage)
498  else:
# One response item per cluster node; each is converted independently.
499  for item in response.items:
500  # New ClientResponseItem object
501  clientResponseItem = EventObjects.ClientResponseItem(None)
502  # If some error in response item or cli application exit status
503  if item.error_code > 0 or item.exit_status > 0:
504  clientResponseItem.errorCode = clientResponseItem.STATUS_ERROR_DRCE
505  clientResponseItem.errorMessage = "Response item error error_message=" + item.error_message + \
506  ", error_code=" + str(item.error_code) + \
507  ", exit_status=" + str(item.exit_status) + \
508  ", stderror=" + str(item.stderror)
509  logger.error(clientResponseItem.errorMessage)
510  else:
511  # Try to restore serialized response object from dump
# NOTE(review): pickle.loads on data received from the cluster — trusted-transport
# assumption; unsafe if the DRCE channel can carry untrusted input.
512  try:
513  drceSyncTasksCover = pickle.loads(item.stdout)
514  clientResponseItem.itemObject = drceSyncTasksCover.eventObject
515  except Exception as e:
516  clientResponseItem.errorCode = EventObjects.ClientResponseItem.STATUS_ERROR_RESTORE_OBJECT
# NOTE(review): e.message is Python 2-only; would need str(e) under Python 3.
517  clientResponseItem.errorMessage = EventObjects.ClientResponseItem.MSG_ERROR_RESTORE_OBJECT + "\n" + \
518  str(e.message) + "\nstdout=" + str(item.stdout) + \
519  ", stderror=" + str(item.stderror)
520  logger.error(clientResponseItem.errorMessage)
521  # Set all information fields in any case
522  clientResponseItem.id = item.id
523  clientResponseItem.host = item.host
524  clientResponseItem.port = item.port
525  clientResponseItem.node = item.node
526  clientResponseItem.time = item.time
527  # Add ClientResponseItem object
528  clientResponse.itemsList.append(clientResponseItem)
529 
530  return clientResponse
531 
532 
533 
534  # #Send request to the DRCE router over the transport router connection
535  #
536  # @param messageBody body of the message
537  # @param persistentDCREConnection use persistent DRCE connection or new
538  # @param connectionParams - tuple (host, port) for additional m-type hce-node cluster
539  # @return EEResponseData object instance
# #Send a request to the DRCE router, using either the persistent manager connection
# or a temporary per-request DRCEManager (regular host or pooled connectionParams).
#
# @param request the TaskExecuteRequest to send
# @param persistentDCREConnection use persistent DRCE connection or new
# @param timeout DRCE timeout, msec; -1 resolves to the configured CONFIG_DRCE_TIMEOUT
# @param connectionParams - tuple (host, port) for additional m-type hce-node cluster
# @return EEResponseData object instance, or None on transport/executor error
540  def sendToDRCERouter(self, request, persistentDCREConnection=True, timeout=-1, connectionParams=None):
# Lock guards the shared stat counter and persistent drceManager setup.
# NOTE(review): the lock is not released via try/finally — an exception between
# acquire() and release() would leave it held; verify this is acceptable upstream.
541  lock.acquire()
542  if timeout == -1:
543  timeout = int(self.configVars[self.CONFIG_DRCE_TIMEOUT])
544  self.updateStatField(DC_CONSTS.SITES_DRCE_COUNTER_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
545  if persistentDCREConnection:
546  logger.info("DRCE router sending via persistent connection with timeout=%s", str(timeout))
547  drceManager = self.drceManager
548  else:
# Temporary manager: connect to the regular host or to the pooled (host, port).
549  drceManager = DRCEManager()
550  if connectionParams is None:
551  drceManager.activate_host(HostParams(self.drceHost, self.drcePort))
552  logger.info("DRCE router sending via temporary connection with timeout=" + str(timeout) + \
553  ", and regular host:" + str(self.drceHost) + ", port:" + str(self.drcePort))
554  else:
555  logger.info("DRCE router sending via temporary connection with timeout=" + str(timeout) + \
556  ", and DRCE connections pool host:" + str(connectionParams[0]) + \
557  ", port:" + str(connectionParams[1]))
558  drceManager.activate_host(HostParams(connectionParams[0], int(connectionParams[1])))
559  lock.release()
560 
561  # Try to execute request
# Any failure maps to a None response; callers (processDRCERequest) treat None as timeout/transport error.
562  try:
563  response = drceManager.process(request, timeout, self.DRCE_REDUCER_TTL)
564  except (ConnectionTimeout, TransportInternalErr, CommandExecutorErr) as err:
565  response = None
# NOTE(review): err.message is Python 2-only; would need str(err) under Python 3.
566  logger.error("DRCE router transport send error : " + str(err.message))
567  except Exception as err:
568  response = None
569  logger.error("DRCE router common error : " + str(err.message))
570 
571  logger.info("DRCE router sent!")
572 
# Tear down the temporary connection; the persistent one stays active.
573  if not persistentDCREConnection:
574  lock.acquire()
575  drceManager.clear_host()
576  lock.release()
577 
578  return response
579 
580 
581 
582  # #Process periodic auto re-crawling with new thread as method function
583  # @param logging object instance
584  #
585  #
586  def processRecrawling(self, loggingObj):
587  try:
588  global logger # pylint: disable=W0603
589  lock.acquire()
590  logger = loggingObj.getLogger(DC_CONSTS.LOGGER_NAME)
591  self.updateStatField(DC_CONSTS.RECRAWL_THREADS_COUNTER_QUEUE_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
592  self.updateStatField(DC_CONSTS.SITES_RECRAWL_COUNTER_NAME, 1, self.STAT_FIELDS_OPERATION_ADD)
593  lock.release()
594  logger.info("RECRAWL_THREAD_STARTED")
595  # Set timeout in msecs
596  timeout = self.configVars[self.CONFIG_RECRAWL_SITES_DRCE_TIMEOUT] * 1000
597 
598  # Find sites that need to be re-crawled
599  crit = {EventObjects.SiteFind.CRITERION_WHERE: self.configVars[self.CONFIG_RECRAWL_SITES_SELECT_CRITERION],
600  EventObjects.SiteFind.CRITERION_ORDER: self.configVars[self.CONFIG_RECRAWL_SITES_SELECT_ORDER],
601  EventObjects.SiteFind.CRITERION_LIMIT: str(self.configVars[self.CONFIG_RECRAWL_SITES_MAX])}
602  siteFind = EventObjects.SiteFind(None, crit)
603  logger.debug("Send DRCE request SITE_FIND")
604  clientResponse = self.processDRCERequest(self.prepareDRCERequest(EVENT_TYPES.SITE_FIND, siteFind), False, timeout)
605  logger.debug("clientResponse:" + Utils.varDump(clientResponse))
606 
607  lock.acquire()
608  # Process results and create unique sites dict
609  sites = self.getSitesFromClientResponseItems(clientResponse.itemsList)
610  lock.release()
611 
612  sitesQueue = {}
613  for siteId in sites.keys():
614  if siteId in self.recrawlSiteslQueue:
615  # Site is already in progress of recrawl by some thread
616  logger.debug("Site %s is already in progress of recrawl by some thread", str(siteId))
617  continue
618 
619  lock.acquire()
620  # Push site item in to the cross-threading queue
621  t1 = time.time()
622  self.recrawlSiteslQueue[str(siteId)] = {"siteObj":sites[siteId], "time":t1}
623  # Push Site item in to the local queue
624  sitesQueue[str(siteId)] = {"time":t1}
625  # Refresh stat queue size
626  self.updateStatField(DC_CONSTS.RECRAWL_SITES_QUEUE_NAME, len(self.recrawlSiteslQueue),
628  lock.release()
629 
630  urlUpdateList = []
631  urlDeleteList = []
632  logger.debug("Site selected for recrawl, site[" + str(siteId) + "]:\n" + Utils.varDump(sites[siteId]))
633  # Save prev. site state to restore after re-crawl will be finished
634  sitePrevState = sites[siteId].state
635  # Update site including lock by state
636  siteUpdate = EventObjects.SiteUpdate(siteId)
637  siteUpdate.iterations = EventObjects.SQLExpression("Iterations+1")
638  siteUpdate.tcDate = EventObjects.SQLExpression("Now()")
639  siteUpdate.uDate = siteUpdate.tcDate
640  if sites[siteId].recrawlPeriod > 0 and self.configVars[self.CONFIG_RECRAWL_SITES_RECRAWL_DATE_EXP] != "":
641  # Set next re-crawl date if periodic re-crawl
642  siteUpdate.recrawlDate = \
644  else:
645  # Disable re-crawl
646  siteUpdate.recrawlDate = EventObjects.SQLExpression("NULL")
647  if self.configVars[self.CONFIG_RECRAWL_SITES_LOCK_STATE] != "":
648  # Set state to lock site
649  siteUpdate.state = int(self.configVars[self.CONFIG_RECRAWL_SITES_LOCK_STATE])
650  else:
651  logger.debug("Site is not locked due empty string value of configuration parameter %s",
653  logger.debug("Update site request including lock state if configured, id=%s", str(siteId))
654  # Update site fields
655  clientResponse = self.processDRCERequest(self.prepareDRCERequest(EVENT_TYPES.SITE_UPDATE, siteUpdate), False,
656  timeout)
657  logger.debug("Update site request done, id=%s", str(siteId))
658  # Check responses on errors
659  lock.acquire()
660  self.logGeneralResponseResults(clientResponse)
661  lock.release()
662 
663  if self.configVars[self.CONFIG_RECRAWL_DELAY_BEFORE] > 0:
664  # Delay after set site state SUSPENDED
665  logger.debug("Delay before, %s sec...", str(self.configVars[self.CONFIG_RECRAWL_DELAY_BEFORE]))
666  time.sleep(self.configVars[self.CONFIG_RECRAWL_DELAY_BEFORE])
667 
668  # Prepare URLs update
669  urlUpdate = EventObjects.URLUpdate(siteId, "")
670  urlUpdate.url = None
671  urlUpdate.urlMd5 = None
672  urlUpdate.status = EventObjects.URLUpdate.STATUS_NEW
673  crit = EventObjects.Site.getFromProperties(sites[siteId].properties, self.SITE_PROPERTIES_RECRAWL_WHERE_NAME)
674  if crit is None or crit == "":
676  logger.debug("Default update criterion: " + str(crit))
677  else:
678  logger.debug("Custom site update criterion: " + str(crit))
679  urlUpdate.criterions = {EventObjects.URLFetch.CRITERION_WHERE : crit}
680  urlUpdateList.append(urlUpdate)
681  # Delete old URLs from prev. re-crawling iteration
684  siteDelOld = EventObjects.Site.getFromProperties(sites[siteId].properties,
686  if siteDelOld is None or siteDelOld != "0":
687  siteSqlExpression = EventObjects.Site.getFromProperties(sites[siteId].properties,
689  logger.debug("Site expression: " + str(siteSqlExpression))
690  if siteSqlExpression is not None and siteSqlExpression != "":
691  sqlExpression = siteSqlExpression
692  logger.debug("Custom expression set: " + str(sqlExpression))
693  else:
694  logger.debug("Site delete old: " + str(siteDelOld))
695  urlDelete = EventObjects.URLDelete(siteId, None, EventObjects.URLStatus.URL_TYPE_MD5,
696  {EventObjects.URLFetch.CRITERION_WHERE:sqlExpression},
697  reason=EventObjects.URLDelete.REASON_RECRAWL)
698  urlDelete.urlType = None
699  urlDelete.delayedType = self.configVars[self.CONFIG_PURGE_METHOD]
700  logger.debug("Old URLs delete due re-crawl: " + Utils.varDump(urlDelete))
701  urlDeleteList.append(urlDelete)
702  logger.debug("URLDelete request, id=%s", str(siteId))
703  # Delete old URLs from prev. re-crawling
704  clientResponse = self.processDRCERequest(self.prepareDRCERequest(EVENT_TYPES.URL_DELETE, urlDeleteList),
705  False, timeout)
706  logger.debug("URLDelete request done, id=%s", str(siteId))
707  lock.acquire()
708  self.updateStatField(DC_CONSTS.SITES_RECRAWL_DELETED_COUNTER_NAME, len(urlDeleteList),
710  # Check responses on errors
711  self.logGeneralResponseResults(clientResponse)
712  lock.release()
713 
714  # Optimize urls table
715  optimize = EventObjects.Site.getFromProperties(sites[siteId].properties,
717  if (int(self.configVars[self.CONFIG_RECRAWL_SITES_OPTIMIZE]) == 1 and optimize is None) or int(optimize) == 1:
718  sqlQuery = "OPTIMIZE TABLE `urls_" + str(siteId) + "`"
719  logger.debug("CustomRequest query: %s", sqlQuery)
720  customRequest = CustomRequest(1, sqlQuery, "dc_urls")
721  # OPTIMIZE TABLE request
722  customResponse = self.processDRCERequest(self.prepareDRCERequest(EVENT_TYPES.SQL_CUSTOM, customRequest),
723  False, timeout)
724  logger.debug("CustomRequest request done, id=%s, customResponse:\n%s", str(siteId),
725  Utils.varDump(customResponse))
726 
727  # Auto tune RecrawlPeriod
728  recrawlPeriod = self.recalculateRecrawlPeriod(sites[siteId])
729  if recrawlPeriod is not None:
730  siteUpdate.recrawlPeriod = recrawlPeriod
731 
732  # Update URLs to push them to start crawling
733  clientResponse = self.processDRCERequest(self.prepareDRCERequest(EVENT_TYPES.URL_UPDATE, urlUpdateList), False,
734  timeout)
735  lock.acquire()
736  self.updateStatField(DC_CONSTS.SITES_RECRAWL_UPDATED_COUNTER_NAME, len(urlUpdateList),
738  # Check responses on errors
739  self.logGeneralResponseResults(clientResponse)
740  lock.release()
741 
742  if self.configVars[self.CONFIG_RECRAWL_DELAY_AFTER] > 0:
743  # Delay after re-crawl processing done
744  logger.debug("Delay after, %s sec...", str(self.configVars[self.CONFIG_RECRAWL_DELAY_AFTER]))
745  time.sleep(self.configVars[self.CONFIG_RECRAWL_DELAY_AFTER])
746 
747  # Return site to previous state, i.e. unlock
748  if self.configVars[self.CONFIG_RECRAWL_SITES_LOCK_STATE] != "":
749  # Set state to unlock site
750  siteUpdate.state = sitePrevState
751  siteUpdate.iterations = None
752  siteUpdate.tcDate = EventObjects.SQLExpression("Now()")
753  siteUpdate.uDate = None
754  siteUpdate.recrawlDate = None
755  logger.debug("Unlock site request, id=%s", str(siteId))
756  # Update site fields
757  clientResponse = self.processDRCERequest(self.prepareDRCERequest(EVENT_TYPES.SITE_UPDATE, siteUpdate), False,
758  timeout)
759  logger.debug("Unlock site request done, id=%s", str(siteId))
760  # Check responses on errors
761  lock.acquire()
762  self.logGeneralResponseResults(clientResponse)
763  lock.release()
764  else:
765  logger.debug("Site is not unlocked due empty string value of configuration parameter %s",
767  # lock.acquire()
768  # self.totalTime = self.totalTime + (time.time() - t)
769  # self.updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_TIME_AVG_NAME,
770  # str(self.totalTime / float(1 + self.statFields[DC_CONSTS.BATCHES_CRAWL_COUNTER_TOTAL_NAME])),
771  # self.STAT_FIELDS_OPERATION_SET)
772  # lock.release()
773  except Exception as err:
774  lock.acquire()
775  logger.error("Recrawl thread exception:" + str(err))
776  lock.release()
777  except: # pylint: disable=W0702
778  lock.acquire()
779  logger.error("Recrawl thread unknown exception!")
780  lock.release()
781 
782  lock.acquire()
783  # Remove processed sites from global the cross-threading queue
784  tmpQueue = {}
785  for siteId in self.recrawlSiteslQueue.keys():
786  if siteId not in sitesQueue:
787  # Accumulate only items that is not processed by this thread
788  tmpQueue[str(siteId)] = self.recrawlSiteslQueue[str(siteId)]
789  # Replace current queue with accumulated queue
790  self.recrawlSiteslQueue = tmpQueue
791  # Decrement of counter of threads
792  self.updateStatField(DC_CONSTS.RECRAWL_THREADS_COUNTER_QUEUE_NAME, 1, self.STAT_FIELDS_OPERATION_SUB)
793  # Refresh stat queue size
794  self.updateStatField(DC_CONSTS.RECRAWL_SITES_QUEUE_NAME, len(self.recrawlSiteslQueue),
796  logger.info("RECRAWL_THREAD_FINISHED")
797  lock.release()
798 
799 
800 
801  # #Get sites dict {SiteId:Site} from the ClientResponse object items
802  #
803  # @param clientResponseItems The list of ClientResponseItem objects
def getSitesFromClientResponseItems(self, clientResponseItems):
  """Build a dict {str(siteId): Site} from the ClientResponse items.

  Items with a non-OK error code are logged and skipped; when the same
  site arrives from several nodes its counter fields are summed into the
  first instance seen.

  @param clientResponseItems: list of ClientResponseItem objects
  @return: dict of unique Site objects keyed by str(site.id)
  """
  uniqueSitesDic = {}
  uniqueCount = 0
  totalCount = 0

  for responseItem in clientResponseItems:
    if responseItem.errorCode != EventObjects.ClientResponseItem.STATUS_OK:
      logger.debug("ClientResponseItem error: " + str(responseItem.errorCode) + " : " + responseItem.errorMessage)
      continue
    if not isinstance(responseItem.itemObject, list):
      logger.error("Wrong object type in the ClientResponseItem.itemObject: " + str(type(responseItem.itemObject)) + \
                   " but 'list' expected")
      continue
    for siteObject in responseItem.itemObject:
      totalCount = totalCount + 1
      if not isinstance(siteObject, EventObjects.Site):
        logger.error("Wrong object type in the itemObject.item: " + str(type(siteObject)) + \
                     " but 'Site' expected")
        continue
      siteKey = str(siteObject.id)
      accumulated = uniqueSitesDic.get(siteKey)
      if accumulated is None:
        # First time this site is seen
        uniqueSitesDic[siteKey] = siteObject
        uniqueCount = uniqueCount + 1
      else:
        # Same site reported by another node -- sum the counter fields
        accumulated.newURLs = accumulated.newURLs + siteObject.newURLs
        accumulated.collectedURLs = accumulated.collectedURLs + siteObject.collectedURLs
        accumulated.deletedURLs = accumulated.deletedURLs + siteObject.deletedURLs
        accumulated.contents = accumulated.contents + siteObject.contents
        accumulated.resources = accumulated.resources + siteObject.resources

  logger.debug("Unique sites: " + str(uniqueCount) + ", total sites: " + str(totalCount))

  return uniqueSitesDic
838 
839 
840 
841  # #Recalculates the RecrawlPeriod value for the site if auto tune up is ON
842  #
843  # @param siteObj The Site object
def recalculateRecrawlPeriod(self, siteObj):
  """Recalculate the site's RecrawlPeriod when per-site auto tune mode is ON.

  The period grows by `step` (up to `maxv`) while the site still has
  un-crawled or un-processed URLs, and shrinks by `step` (down to `minv`)
  otherwise.

  @param siteObj: Site object; its newURLs/resources counters and current
                  recrawlPeriod drive the adjustment
  @return: the new recrawl period value, or None when mode is OFF or the
           period is already at its min/max bound
  """
  recrawlPeriod = None

  # Init per site settings: each tuning parameter is read from the site's
  # properties; a missing property presumably falls back to a configuration
  # default -- the default-assignment lines are missing from this dump.
  mode = EventObjects.Site.getFromProperties(siteObj.properties, self.SITE_PROPERTIES_RECRAWL_PERIOD_MODE_NAME)
  if mode is None:
    # NOTE(review): the assignment of the configuration default for `mode`
    # (original line 850) is missing from this dump -- restore from VCS
  else:
    mode = int(mode)
  minv = EventObjects.Site.getFromProperties(siteObj.properties, self.SITE_PROPERTIES_RECRAWL_PERIOD_MIN_NAME)
  if minv is None:
    # NOTE(review): the default assignment for `minv` (original line 855)
    # is missing from this dump -- restore from VCS
  else:
    minv = int(minv)
  maxv = EventObjects.Site.getFromProperties(siteObj.properties, self.SITE_PROPERTIES_RECRAWL_PERIOD_MAX_NAME)
  if maxv is None:
    # NOTE(review): the default assignment for `maxv` (original line 860)
    # is missing from this dump -- restore from VCS
  else:
    maxv = int(maxv)
  step = EventObjects.Site.getFromProperties(siteObj.properties, self.SITE_PROPERTIES_RECRAWL_PERIOD_STEP_NAME)
  if step is None:
    # NOTE(review): the default assignment for `step` (original line 865)
    # is missing from this dump -- restore from VCS
  else:
    step = int(step)

  logger.debug("RecrawlPeriod auto recalculate, siteId:%s, mode:%s, minv:%s, maxv:%s, step:%s", str(siteObj.id),
               str(mode), str(minv), str(maxv), str(step))

  # If recrawl period recalculation mode is ON
  if mode > 0:
    logger.debug("RecrawlPeriod auto recalculate is ON, siteId:%s, current value:%s",
                 str(siteObj.id), str(siteObj.recrawlPeriod))
    # If not all URLS in state NEW are crawled or processed (siteObj.resources - means crawled but not processed)
    if siteObj.newURLs > 0 or siteObj.resources > 0:
      # If maximum value of period is not reached
      if siteObj.recrawlPeriod < maxv:
        # Increase period on step value
        recrawlPeriod = siteObj.recrawlPeriod + step
      else:
        logger.debug("Max value of RecrawlPeriod reached:%s", str(maxv))
    else:
      # If minimum value of period is not reached
      if siteObj.recrawlPeriod > minv:
        # Decrease the period on step value
        recrawlPeriod = siteObj.recrawlPeriod - step
      else:
        logger.debug("Min value of RecrawlPeriod reached:%s", str(minv))
    logger.debug("New RecrawlPeriod value for site %s is:%s", str(siteObj.id), str(recrawlPeriod))

  return recrawlPeriod
894 
895 
896 
897  # #Check the ClientResponse object with GeneralResponse objects in items list, log errors
898  #
899  # @param ClientResponse object
def logGeneralResponseResults(self, clientResponse):
  """Check a ClientResponse and its items list, logging every error found.

  @param clientResponse: ClientResponse object to inspect; wrong types are
                         reported as errors as well
  """
  # Guard: anything but a ClientResponse is itself an error
  if not isinstance(clientResponse, EventObjects.ClientResponse):
    logger.error("Wrong type: " + str(type(clientResponse)) + ", expected ClientResponse\n" + \
                 Utils.varDump(clientResponse))
    return

  if clientResponse.errorCode > 0:
    logger.error("clientResponse.errorCode:" + str(clientResponse.errorCode) + ":" + clientResponse.errorMessage)

  for responseItem in clientResponse.itemsList:
    if not isinstance(responseItem, EventObjects.ClientResponseItem):
      logger.error("Wrong type: " + str(type(responseItem)) + ", expected ClientResponseItem\n" + \
                   Utils.varDump(responseItem))
    elif responseItem.errorCode != EventObjects.ClientResponseItem.STATUS_OK:
      logger.error("ClientResponseItem error: " + str(responseItem.errorCode) + " : " + \
                   responseItem.errorMessage + "\n" + Utils.varDump(responseItem))
915 
916 
917 
918  # #Fix fields of the event object depends of operation type to reflect distributed structure if nodes>1
919  #
920  # @param event object
921  # @param nodes number
def fixFields(self, event, nodes):
  """Fix limit fields of the event object to reflect the distributed
  structure when nodes > 1.

  Applies only to SITE_NEW events carrying a Site and SITE_UPDATE events
  carrying a SiteUpdate: each of maxURLs/maxResources/maxErrors is divided
  by the nodes number with rounding up (see fixField). Any failure is
  logged and swallowed, leaving the event unchanged.

  @param event: event object exposing eventType and eventObj attributes
  @param nodes: number of cluster nodes the limits are split across
  """
  try:
    if (event.eventType == DC_CONSTS.EVENT_TYPES.SITE_NEW and isinstance(event.eventObj, EventObjects.Site)) or \
       (event.eventType == DC_CONSTS.EVENT_TYPES.SITE_UPDATE and isinstance(event.eventObj, EventObjects.SiteUpdate)):
      # Per-node share of each global limit field
      fieldsList = ["maxURLs", "maxResources", "maxErrors"]
      for fieldName in fieldsList:
        setattr(event.eventObj, fieldName, self.fixField(getattr(event.eventObj, fieldName), nodes, fieldName))
  except Exception as e:  # `as` form, consistent with the rest of the file
    logger.error("Error %s", str(e))
931 
932 
933 
934  # #Fix field value with numeric divide with rounding to greater value
935  #
936  # @param value of the field
937  # @param divider value
938  # @param comment for log
def fixField(self, value, divider, comment):
  """Divide a numeric field value by `divider`, rounding up, minimum 1.

  Used to split a global per-site limit into a per-node share: the result
  is ceil(value / divider) and never less than 1 for a positive value, so
  the per-node shares always cover the whole limit.

  @param value: field value to fix; None is returned unchanged
  @param divider: number of parts to split into (must not be 0)
  @param comment: field name, used only in the debug log message
  @return: the per-node share as int, or `value` unchanged when it is None
  @raise ZeroDivisionError: when divider is 0
  @raise ValueError: when value or divider is not convertible to int
  """
  if value is not None:
    d = int(divider)
    v = int(value)

    # Explicit floor division; v and d are already ints, so the former
    # int(int(v) / int(d)) casts were redundant, and `//` keeps the same
    # semantics under Python 3 as well
    ret = v // d

    # Any positive value must yield at least one unit per node
    if ret < 1 and v > 0:
      ret = 1

    # Round up: if the shares do not cover the whole value, add one
    if ret * d < v:
      ret = ret + 1

    logger.debug("Initial value of field `%s` from %s was fixed to %s, divider %s", comment, str(value), str(ret),
                 str(divider))
  else:
    ret = value

  return ret
958 
def processRecrawling(self, loggingObj)
def reply(self, event, reply_event)
wrapper for sending event in reply for event
def fixFields(self, event, nodes)
def __init__(self, configParser, connectionBuilderLight=None)
def prepareDRCERequest(self, eventType, eventObj)
string CONFIG_DEFAULT_RECRAWL_DELETE_OLD_CRITERION
Definition: SitesManager.py:97
def updateStatField(self, field_name, value, operation=STAT_FIELDS_OPERATION_ADD)
update values of stat field - default sum
def onEventsHandler(self, event)
wrapper for TaskExecuteStruct
Definition: Commands.py:87
string SITE_PROPERTIES_RECRAWL_PERIOD_MODE_NAME
Definition: SitesManager.py:66
def setEventHandler(self, eventType, handler)
set event handler rewrite the current handler for eventType
def addConnection(self, name, connection)
This is app base class for management server connection end-points and parallel transport messages pr...
def getSitesFromClientResponseItems(self, clientResponseItems)
string CONFIG_DEFAULT_RECRAWL_UPDATE_CRITERION
Definition: SitesManager.py:95
wrapper for task request
Definition: Commands.py:158
string SITE_PROPERTIES_RECRAWL_PERIOD_STEP_NAME
Definition: SitesManager.py:69
wrapper for Session fields array of execute task
Definition: Commands.py:32
def on_poll_timeout(self)
function will call every time when ConnectionTimeout exception arrive
def fixField(self, value, divider, comment)
def eventsHandlerTS(self, event, loggingObj)
string SITE_PROPERTIES_RECRAWL_PERIOD_MIN_NAME
Definition: SitesManager.py:67
UIDGenerator is used to generate unique message id.
Definition: UIDGenerator.py:14
def sendToDRCERouter(self, request, persistentDCREConnection=True, timeout=-1, connectionParams=None)
Class hides routines of bulding connection objects.
string CONFIG_RECRAWL_SITES_RECRAWL_DATE_EXP
Definition: SitesManager.py:84
def getDRCEConnectionParamsFromPool(self, eventType)
string SITE_PROPERTIES_RECRAWL_OPTIMIZE_NAME
Definition: SitesManager.py:64
string SITE_PROPERTIES_RECRAWL_DELETE_WHERE_NAME
Definition: SitesManager.py:62
string SITE_PROPERTIES_RECRAWL_PERIOD_MAX_NAME
Definition: SitesManager.py:68
def logGeneralResponseResults(self, clientResponse)
string CONFIG_RECRAWL_SITES_SELECT_CRITERION
Definition: SitesManager.py:85
IDGenerator is used to generate unique id for connections.
Definition: IDGenerator.py:15
def recalculateRecrawlPeriod(self, siteObj)
Convertor which used to convert Task*Reques to json and TaskResponse from json.
def processDRCERequest(self, taskExecuteStruct, persistentDCREConnection=True, timeout=-1, connectionParams=None)