HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
dc.ClientInterfaceService.ClientInterfaceService Class Reference
Inheritance diagram for dc.ClientInterfaceService.ClientInterfaceService:
Collaboration diagram for dc.ClientInterfaceService.ClientInterfaceService:

Public Member Functions

def __init__ (self, configParser, connectBuilderLight)
 
def onSitesManagerRoute (self, event)
 
def onBatchTasksManagerRealTimeRoute (self, event)
 
def onDCClientRoute (self, event)
 
def registerEvent (self, event)
 
def getRequestEvent (self, event)
 
def unregisterEvent (self, event)
 
def mergeResultsData (self, eventType, eventObj, eventCookie)
 
def sortURLContentResults (self, itemObject, criterion)
 
def sortURLFetchResults (self, itemObject, criterion)
 
def mergeResultsURLContent (self, newItemsList, clientResponseItem)
 
def mergeResultsSiteStatus (self, mergedSite, clientResponseItem)
 
def mergeResultsSiteFields (self, siteToMerge, siteMergeWith)
 
def mergeResultsSiteFind (self, newItemsList, clientResponseItem)
 
def mergeResultsURLFetch (self, newItemsList, clientResponseItem)
 
def mergeResultsGeneralResponse (self, mergedGeneralResponse, clientResponseItem)
 
def mergeResultsBatch (self, newItemsList, clientResponseItem)
 
def getUnixTimeFromString (self, buf, dateFormat='%Y-%m-%d %H:%M:%S', valueType=0)
 
- Public Member Functions inherited from app.BaseServerManager.BaseServerManager
def __init__ (self, poller_manager=None, admin_connection=None, conectionLightBuilder=None, exceptionForward=False, dumpStatVars=True)
 constructor More...
 
def addConnection (self, name, connection)
 
def setEventHandler (self, eventType, handler)
 set event handler rewrite the current handler for eventType More...
 
def send (self, connect_name, event)
 send event More...
 
def reply (self, event, reply_event)
 wrapper for sending event in reply for event More...
 
def poll (self)
 poll function polling connections receive as multipart msg, the second argument is pickled pyobj More...
 
def process (self, event)
 process event call the event handler method that was set by user or on_unhandled_event method if not set More...
 
def run (self)
 
def is_connection_registered (self, name)
 check whether a connection was registered in an instance of the BaseServerManager object More...
 
def on_poll_timeout (self)
 function is called every time a ConnectionTimeout exception arrives More...
 
def on_unhandled_event (self, event)
 function is called every time an event arrives for which no handler is set for its event.eventType More...
 
def build_poller_list (self)
 
def clear_poller (self)
 
def onAdminState (self, event)
 onAdminState event handler process admin SHUTDOWN command More...
 
def onAdminFetchStatData (self, event)
 onAdminState event handler process admin command More...
 
def onAdminSuspend (self, event)
 onAdminState event handler process admin command More...
 
def getStatDataFields (self, fields)
 getStatDataFields returns stat data from storage More...
 
def getSystemStat (self)
 getSystemStat returns stat data for system indicators: RAMV, RAMR and CPU More...
 
def getConfigVarsFields (self, fields)
 getConfigVarsFields returns config vars from storage More...
 
def onAdminGetConfigVars (self, event)
 onAdminGetConfigVars event handler process getConfigVars admin command, fill and return config vars array from internal storage More...
 
def onAdminSetConfigVars (self, event)
 onAdminSetConfigVars event handler process setConfigVars admin command More...
 
def setConfigVars (self, setConfigVars)
 processSetConfigVars sets config vars in storage More...
 
def sendAdminReadyEvent (self)
 send ready event to notify adminInterfaceService More...
 
def createLogMsg (self, event)
 form string message from event object More...
 
def initStatFields (self, connect_name)
 add record in statFields More...
 
def updateStatField (self, field_name, value, operation=STAT_FIELDS_OPERATION_ADD)
 update values of stat field - default sum More...
 
def processSpecialConfigVars (self, name, value)
 send ready event to notify adminInterfaceService More...
 
def getLogLevel (self)
 Get log level from first of existing loggers. More...
 
def setLogLevel (self, level)
 Set log level for all loggers. More...
 
def saveStatVarsDump (self)
 Save stat vars in json file. More...
 
def loadStatVarsDump (self)
 Load stat vars from json file. More...
 
def getStatVarsDumpFileName (self)
 Get stat vars file name. More...
 
def createDBIDict (self, configParser)
 

Public Attributes

 sitesManager
 
 batchTasksManagerRealTime
 
 processEvents
 
- Public Attributes inherited from app.BaseServerManager.BaseServerManager
 dumpStatVars
 
 poller_manager
 
 eventBuilder
 
 exit_flag
 
 pollTimeout
 
 connections
 
 event_handlers
 
 statFields
 stat fields container More...
 
 configVars
 
 exceptionForward
 

Static Public Attributes

string CONFIG_SECTION = "ClientInterfaceService"
 
string CONFIG_SERVER_HOST = "serverHost"
 
string CONFIG_SERVER_PORT = "serverPort"
 
string CONFIG_SITES_MANAGER = "clientSitesManager"
 
string SERVER_CONNECTION_NAME = "server"
 
string CONNECTION_PREFIX = "Connection"
 
string CONFIG_BATCH_TASKS_MANAGER_REALTIME = "clientBatchTasksManagerRealTime"
 
string CONFIG_DRCE_NODES = "DRCENodes"
 
- Static Public Attributes inherited from app.BaseServerManager.BaseServerManager
string ADMIN_CONNECT_ENDPOINT = "Admin"
 
string ADMIN_CONNECT_CLIENT = "Admin"
 
int POLL_TIMEOUT_DEFAULT = 3000
 
int STAT_FIELDS_OPERATION_ADD = 0
 
int STAT_FIELDS_OPERATION_SUB = 1
 
int STAT_FIELDS_OPERATION_SET = 2
 
int STAT_FIELDS_OPERATION_INIT = 3
 
string POLL_TIMEOUT_CONFIG_VAR_NAME = "POLL_TIMEOUT"
 
string LOG_LEVEL_CONFIG_VAR_NAME = "LOG_LEVEL"
 
string STAT_DUMPS_DEFAULT_DIR = "/tmp/"
 
string STAT_DUMPS_DEFAULT_NAME = "%APP_NAME%_%CLASS_NAME%_stat_vars.dump"
 
dictionary LOGGERS_NAMES = {APP_CONSTS.LOGGER_NAME, "dc", "dtm", "root", ""}
 

Detailed Description

Definition at line 29 of file ClientInterfaceService.py.

Constructor & Destructor Documentation

◆ __init__()

def dc.ClientInterfaceService.ClientInterfaceService.__init__ (   self,
  configParser,
  connectBuilderLight 
)
Constructor

Definition at line 45 of file ClientInterfaceService.py.

def __init__(self, configParser, connectBuilderLight):
    '''
    Constructor.

    Reads the service options from configParser, builds the server connection and
    the two client connections with connectBuilderLight, registers them and binds
    the handlers of all routed event types.

    @param configParser config parser object, read through get(section, option)
    @param connectBuilderLight connection builder object, used through build()
    '''
    BaseServerManager.__init__(self)

    section = self.CONFIG_SECTION
    serverHost = configParser.get(section, self.CONFIG_SERVER_HOST)
    serverPort = configParser.get(section, self.CONFIG_SERVER_PORT)
    server = serverHost + ":" + str(serverPort)
    self.sitesManager = configParser.get(section, self.CONFIG_SITES_MANAGER)
    self.batchTasksManagerRealTime = configParser.get(section, self.CONFIG_BATCH_TASKS_MANAGER_REALTIME)

    # The DRCE nodes option is taken from the application section, 1 is used when the option is absent
    try:
        drceNodes = configParser.get(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME, self.CONFIG_DRCE_NODES)
    except ConfigParser.NoOptionError:
        drceNodes = 1
    self.configVars[self.CONFIG_DRCE_NODES] = drceNodes

    # Build all connections first, then register each one under "<name>Connection"
    serverConnection = connectBuilderLight.build(consts.SERVER_CONNECT, server, consts.TCP_TYPE)
    sitesManagerConnection = connectBuilderLight.build(consts.CLIENT_CONNECT, self.sitesManager)
    batchTasksManagerRealTimeConnection = connectBuilderLight.build(consts.CLIENT_CONNECT,
                                                                    self.batchTasksManagerRealTime)
    namedConnections = (
        (self.SERVER_CONNECTION_NAME, serverConnection),
        (str(self.sitesManager), sitesManagerConnection),
        (str(self.batchTasksManagerRealTime), batchTasksManagerRealTimeConnection),
    )
    for connectionName, connection in namedConnections:
        self.addConnection(connectionName + self.CONNECTION_PREFIX, connection)

    # Request events handled by onSitesManagerRoute
    sitesManagerRequests = (
        EVENT_TYPES.SITE_NEW,
        EVENT_TYPES.SITE_UPDATE,
        EVENT_TYPES.SITE_STATUS,
        EVENT_TYPES.SITE_DELETE,
        EVENT_TYPES.SITE_CLEANUP,
        EVENT_TYPES.SITE_FIND,
        EVENT_TYPES.URL_NEW,
        EVENT_TYPES.URL_STATUS,
        EVENT_TYPES.URL_UPDATE,
        EVENT_TYPES.URL_FETCH,
        EVENT_TYPES.URL_DELETE,
        EVENT_TYPES.URL_CLEANUP,
        EVENT_TYPES.URL_CONTENT,
        EVENT_TYPES.SQL_CUSTOM,
        EVENT_TYPES.URL_PUT,
        EVENT_TYPES.URL_HISTORY,
        EVENT_TYPES.URL_STATS,
        EVENT_TYPES.PROXY_NEW,
        EVENT_TYPES.PROXY_UPDATE,
        EVENT_TYPES.PROXY_DELETE,
        EVENT_TYPES.PROXY_STATUS,
        EVENT_TYPES.PROXY_FIND,
        EVENT_TYPES.ATTR_SET,
        EVENT_TYPES.ATTR_UPDATE,
        EVENT_TYPES.ATTR_DELETE,
        EVENT_TYPES.ATTR_FETCH,
    )
    for requestType in sitesManagerRequests:
        self.setEventHandler(requestType, self.onSitesManagerRoute)

    # Response events handled by onDCClientRoute
    clientResponses = (
        EVENT_TYPES.SITE_NEW_RESPONSE,
        EVENT_TYPES.SITE_UPDATE_RESPONSE,
        EVENT_TYPES.SITE_STATUS_RESPONSE,
        EVENT_TYPES.SITE_DELETE_RESPONSE,
        EVENT_TYPES.SITE_CLEANUP_RESPONSE,
        EVENT_TYPES.SITE_FIND_RESPONSE,
        EVENT_TYPES.URL_NEW_RESPONSE,
        EVENT_TYPES.URL_STATUS_RESPONSE,
        EVENT_TYPES.URL_UPDATE_RESPONSE,
        EVENT_TYPES.URL_FETCH_RESPONSE,
        EVENT_TYPES.URL_DELETE_RESPONSE,
        EVENT_TYPES.URL_CLEANUP_RESPONSE,
        EVENT_TYPES.URL_CONTENT_RESPONSE,
        EVENT_TYPES.SQL_CUSTOM_RESPONSE,
        EVENT_TYPES.URL_PUT_RESPONSE,
        EVENT_TYPES.URL_HISTORY_RESPONSE,
        EVENT_TYPES.URL_STATS_RESPONSE,
        EVENT_TYPES.PROXY_NEW_RESPONSE,
        EVENT_TYPES.PROXY_UPDATE_RESPONSE,
        EVENT_TYPES.PROXY_DELETE_RESPONSE,
        EVENT_TYPES.PROXY_STATUS_RESPONSE,
        EVENT_TYPES.PROXY_FIND_RESPONSE,
        EVENT_TYPES.ATTR_SET_RESPONSE,
        EVENT_TYPES.ATTR_UPDATE_RESPONSE,
        EVENT_TYPES.ATTR_DELETE_RESPONSE,
        EVENT_TYPES.ATTR_FETCH_RESPONSE,
    )
    for responseType in clientResponses:
        self.setEventHandler(responseType, self.onDCClientRoute)

    # Batch request has its own route, batch response is handled like the other responses
    self.setEventHandler(EVENT_TYPES.BATCH, self.onBatchTasksManagerRealTimeRoute)
    self.setEventHandler(EVENT_TYPES.BATCH_RESPONSE, self.onDCClientRoute)

    # map of incoming event, which are in processing
    # event.uid => event without eventObj field
    self.processEvents = dict()
139 
140 
141 
def __init__(self)
constructor
Definition: UIDGenerator.py:19

Member Function Documentation

◆ getRequestEvent()

def dc.ClientInterfaceService.ClientInterfaceService.getRequestEvent (   self,
  event 
)

Definition at line 204 of file ClientInterfaceService.py.

def getRequestEvent(self, event):
    '''
    Return the entry stored in self.processEvents under the uid of the given event.

    @param event event object, only its uid attribute is read
    @return value stored in self.processEvents for event.uid; the lookup error
            of the container is not handled here when the uid is absent
    '''
    requestUid = event.uid
    return self.processEvents[requestUid]
206 
207 
208 
Here is the caller graph for this function:

◆ getUnixTimeFromString()

def dc.ClientInterfaceService.ClientInterfaceService.getUnixTimeFromString (   self,
  buf,
  dateFormat = '%Y-%m-%d %H:%M:%S',
  valueType = 0 
)

Definition at line 749 of file ClientInterfaceService.py.

def getUnixTimeFromString(self, buf, dateFormat='%Y-%m-%d %H:%M:%S', valueType=0):
    '''
    Convert a date string to a Unix time stamp using time.strptime() and time.mktime().

    @param buf value to parse, converted with str() before parsing
    @param dateFormat strptime() format of the date string
    @param valueType 0 - the result is cast to int, any other value - left as returned by mktime()
    @return the time stamp, or 0 if any step of the conversion raised (the error is logged)
    '''
    stamp = 0

    try:
        parsedTime = time.strptime(str(buf), dateFormat)
        stamp = time.mktime(parsedTime)
        if valueType == 0:
            stamp = int(stamp)
    except Exception as err:
        logger.error("Error get date from: `%s` with format: `%s` : %s", str(buf), dateFormat, str(err))

    return stamp
760 
761 
Here is the caller graph for this function:

◆ mergeResultsBatch()

def dc.ClientInterfaceService.ClientInterfaceService.mergeResultsBatch (   self,
  newItemsList,
  clientResponseItem 
)

Definition at line 715 of file ClientInterfaceService.py.

def mergeResultsBatch(self, newItemsList, clientResponseItem):
    '''
    Append the items of one batch response item to the accumulated results list.

    @param newItemsList accumulated list, None on the first call (a new list is created)
    @param clientResponseItem response item; its itemObject is expected to be a list and
                              its host is copied to each appended item
    @return the accumulated list; unchanged when itemObject is not a list (an error is logged)
    '''
    batchItems = clientResponseItem.itemObject

    # First call for this response, start with an empty accumulator
    if newItemsList is None:
        newItemsList = []

    if not isinstance(batchItems, list):
        # Object is null or wrong type
        logger.error("Wrong type of clientResponseItem.itemObject\n" + Utils.varDump(batchItems))
        return newItemsList

    for batchItem in batchItems:
        if batchItem is None:
            logger.debug("Rejected urlContentResponse")
            continue
        # Mark the item with the host it came from and collect it
        batchItem.host = clientResponseItem.host
        newItemsList.append(batchItem)
        logger.debug("Added urlContentResponse:\n" + Utils.varDump(batchItem))

    return newItemsList
741 
742 
Here is the caller graph for this function:

◆ mergeResultsData()

def dc.ClientInterfaceService.ClientInterfaceService.mergeResultsData (   self,
  eventType,
  eventObj,
  eventCookie 
)

Definition at line 223 of file ClientInterfaceService.py.

def mergeResultsData(self, eventType, eventObj, eventCookie):
    '''
    Merge the items of a multi-item ClientResponse into its first item.

    Nothing is changed when eventObj is not an EventObjects.ClientResponse, when it holds
    at most one item, or when its first item is not an EventObjects.ClientResponseItem.
    Otherwise every item with a non-None itemObject is passed to the merge method selected
    by eventType, and host, port, node, time, errorCode and errorMessage of these items
    are joined with ";". If a merged itemObject was produced, itemsList is reduced to the
    first item carrying the merged values. For URL_CONTENT_RESPONSE and URL_FETCH_RESPONSE
    the merged result is sorted when eventCookie is a dict with a non-blank first element
    under EventObjects.URLFetch.CRITERION_ORDER.

    @param eventType type of the response event, selects the merge method
    @param eventObj response object to merge
    @param eventCookie cookie of the request event, may define the sort criterion
    @return eventObj, merged in place when the conditions above are met
    '''
    if not isinstance(eventObj, EventObjects.ClientResponse):
        logger.error("Wrong eventObj type " + str(type(eventObj)) + " expected EventObjects.ClientResponse\n" + \
                     Utils.varDump(eventObj))
        return eventObj

    # If number of client response items is not greater than one, leave untouched
    if len(eventObj.itemsList) <= 1:
        return eventObj

    if not isinstance(eventObj.itemsList[0], EventObjects.ClientResponseItem):
        # Wrong type
        logger.error("Wrong eventObj.itemsList[0] type " + str(type(eventObj.itemsList[0])) + \
                     " expected EventObjects.ClientResponseItem\n" + Utils.varDump(eventObj.itemsList[0]))
        return eventObj

    # Result types that need no special processing of fields
    generalResponseTypes = {EVENT_TYPES.SITE_NEW_RESPONSE, EVENT_TYPES.SITE_UPDATE_RESPONSE,
                            EVENT_TYPES.SITE_DELETE_RESPONSE, EVENT_TYPES.SITE_CLEANUP_RESPONSE,
                            EVENT_TYPES.URL_NEW_RESPONSE, EVENT_TYPES.URL_UPDATE_RESPONSE,
                            EVENT_TYPES.URL_DELETE_RESPONSE, EVENT_TYPES.URL_CLEANUP_RESPONSE}

    newItemObject = None
    hosts = []
    ports = []
    nodes = []
    times = []
    errorMessages = []
    errorCodes = []
    logger.debug("Merging, response items: %s", str(len(eventObj.itemsList)))
    mergedCounter = 0

    # Cycle response items
    for clientResponseItem in eventObj.itemsList:
        if clientResponseItem.itemObject is None:
            # Object is null or wrong type
            logger.error("The clientResponseItem.itemObject is None!")
            continue

        mergedCounter += 1
        logger.debug("clientResponseItem:\n" + Utils.varDump(clientResponseItem))
        hosts.append(clientResponseItem.host)
        ports.append(str(clientResponseItem.port))
        nodes.append(clientResponseItem.node)
        times.append(str(clientResponseItem.time))
        # Error messages are separated with ";" like all other merged fields
        errorMessages.append(clientResponseItem.errorMessage)
        errorCodes.append(str(clientResponseItem.errorCode))

        if eventType == EVENT_TYPES.SITE_FIND_RESPONSE:
            # Merge SITE_FIND operation results response
            newItemObject = self.mergeResultsSiteFind(newItemObject, clientResponseItem)
        elif eventType in (EVENT_TYPES.URL_FETCH_RESPONSE, EVENT_TYPES.URL_STATUS_RESPONSE):
            # Merge URL_FETCH and URL_STATUS operation results response
            newItemObject = self.mergeResultsURLFetch(newItemObject, clientResponseItem)
        elif eventType == EVENT_TYPES.URL_CONTENT_RESPONSE:
            # Merge URL_CONTENT operation results response
            newItemObject = self.mergeResultsURLContent(newItemObject, clientResponseItem)
        elif eventType == EVENT_TYPES.SITE_STATUS_RESPONSE:
            # Merge SITE_STATUS operation results response
            newItemObject = self.mergeResultsSiteStatus(newItemObject, clientResponseItem)
        elif eventType == EVENT_TYPES.BATCH_RESPONSE:
            # Merge BATCH operation results response
            newItemObject = self.mergeResultsBatch(newItemObject, clientResponseItem)
        elif eventType in generalResponseTypes:
            # Merge another result types that no need special processing of fields but check and leave just one
            newItemObject = self.mergeResultsGeneralResponse(newItemObject, clientResponseItem)

    if newItemObject is None:
        logger.debug("No items collected while merge procedure, merge skipped.")
        return eventObj

    # Replace items list with the first item filled with the merged values
    mergedItem = eventObj.itemsList[0]
    eventObj.itemsList = [mergedItem]
    mergedItem.errorCode = ";".join(errorCodes).rstrip(";")
    mergedItem.errorMessage = ";".join(errorMessages).rstrip(";")
    mergedItem.itemObject = newItemObject
    mergedItem.host = ";".join(hosts).rstrip(";")
    mergedItem.port = ";".join(ports).rstrip(";")
    mergedItem.node = ";".join(nodes).rstrip(";")
    mergedItem.time = ";".join(times).rstrip(";")

    itemsNumber = len(newItemObject) if isinstance(newItemObject, list) else 1
    logger.debug("Merged with " + str(mergedCounter) + " response objects, " + \
                 str(itemsNumber) + " merged items.")

    # Only URL_CONTENT and URL_FETCH results can be sorted; the cookie is not inspected for other types
    sorter = None
    sortedName = ""
    if eventType == EVENT_TYPES.URL_CONTENT_RESPONSE:
        sorter = self.sortURLContentResults
        sortedName = "URL_CONTENT"
    elif eventType == EVENT_TYPES.URL_FETCH_RESPONSE:
        sorter = self.sortURLFetchResults
        sortedName = "URL_FETCH"

    criterion = None
    if sorter is not None and isinstance(eventCookie, dict) and \
       EventObjects.URLFetch.CRITERION_ORDER in eventCookie and \
       len(eventCookie[EventObjects.URLFetch.CRITERION_ORDER]) > 0 and \
       eventCookie[EventObjects.URLFetch.CRITERION_ORDER][0].strip() != '':
        # TODO: now use only criterion of the first item of request items list
        criterion = eventCookie[EventObjects.URLFetch.CRITERION_ORDER][0]

    if criterion is not None:
        mergedItem.itemObject = sorter(newItemObject, criterion)
        logger.debug("%s results sorted by the: %s", sortedName, str(criterion))
    else:
        logger.debug("Sort conditions are not satisfied, results not sorted.")

    return eventObj
329 
330 
331 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ mergeResultsGeneralResponse()

def dc.ClientInterfaceService.ClientInterfaceService.mergeResultsGeneralResponse (   self,
  mergedGeneralResponse,
  clientResponseItem 
)

Definition at line 684 of file ClientInterfaceService.py.

def mergeResultsGeneralResponse(self, mergedGeneralResponse, clientResponseItem):
    '''
    Merge the GeneralResponse object of one response item into the accumulated one.

    @param mergedGeneralResponse accumulated GeneralResponse object, None on the first call
    @param clientResponseItem response item; its itemObject is expected to be
                              a dtm.EventObjects.GeneralResponse instance
    @return the accumulated object; unchanged when itemObject has a wrong type (an error is logged)
    '''
    nodeResponse = clientResponseItem.itemObject

    if not isinstance(nodeResponse, dtm.EventObjects.GeneralResponse):
        # Object is null or wrong type
        logger.error("Wrong type of currentGeneralResponse object: " + str(type(nodeResponse)) + \
                     ", dtm.GeneralResponse expected\n" + Utils.varDump(nodeResponse))
        return mergedGeneralResponse

    if mergedGeneralResponse is None:
        # First call: the current object becomes the accumulator and gets the host of its item
        nodeResponse.host = clientResponseItem.host
        logger.debug("Merge init GeneralResponse object:\n" + Utils.varDump(nodeResponse))
        return nodeResponse

    logger.debug("Merge with GeneralResponse object:\n" + Utils.varDump(nodeResponse))
    # Error fields are joined with ";", statuses are accumulated
    mergedGeneralResponse.errorCode = ";".join([str(mergedGeneralResponse.errorCode),
                                                str(nodeResponse.errorCode)])
    mergedGeneralResponse.errorMessage = ";".join([str(mergedGeneralResponse.errorMessage),
                                                   str(nodeResponse.errorMessage)])
    mergedGeneralResponse.statuses.extend(nodeResponse.statuses)

    return mergedGeneralResponse
707 
708 
709 
GeneralResponse event object, represents general state response for multipurpose usage.
Here is the caller graph for this function:

◆ mergeResultsSiteFields()

def dc.ClientInterfaceService.ClientInterfaceService.mergeResultsSiteFields (   self,
  siteToMerge,
  siteMergeWith 
)

Definition at line 556 of file ClientInterfaceService.py.

def mergeResultsSiteFields(self, siteToMerge, siteMergeWith):
    '''
    Merge the statistic fields of siteMergeWith into siteToMerge.

    Counters are summed, errorMask is OR-ed, iterations takes the maximum, avgSpeed and
    avgSpeedCounter take the minimum. The limits maxURLs, maxResources and maxErrors are
    summed only when the DRCE nodes number from self.configVars is greater than one.

    @param siteToMerge site object that accumulates the values, modified in place
    @param siteMergeWith site object whose values are merged in
    @return siteToMerge
    '''
    # Set merged values for correspondent fields of the Site objects
    siteToMerge.resources += siteMergeWith.resources
    siteToMerge.contents += siteMergeWith.contents
    siteToMerge.collectedURLs += siteMergeWith.collectedURLs
    siteToMerge.newURLs += siteMergeWith.newURLs
    siteToMerge.deletedURLs += siteMergeWith.deletedURLs
    siteToMerge.iterations = max(siteToMerge.iterations, siteMergeWith.iterations)
    siteToMerge.errors += siteMergeWith.errors
    siteToMerge.errorMask |= siteMergeWith.errorMask
    siteToMerge.size += siteMergeWith.size
    siteToMerge.avgSpeed = min(siteToMerge.avgSpeed, siteMergeWith.avgSpeed)
    siteToMerge.avgSpeedCounter = min(siteToMerge.avgSpeedCounter, siteMergeWith.avgSpeedCounter)

    # The value may be a string when it was read from a config file, so it is converted before
    # the numeric comparison; a value that is not a number is treated as the single node default
    try:
        drceNodes = int(self.configVars[self.CONFIG_DRCE_NODES])
    except (TypeError, ValueError):
        drceNodes = 1

    if drceNodes > 1:
        siteToMerge.maxURLs += siteMergeWith.maxURLs
        siteToMerge.maxResources += siteMergeWith.maxResources
        siteToMerge.maxErrors += siteMergeWith.maxErrors

    return siteToMerge
576 
577 
578 
Here is the caller graph for this function:

◆ mergeResultsSiteFind()

def dc.ClientInterfaceService.ClientInterfaceService.mergeResultsSiteFind (   self,
  newItemsList,
  clientResponseItem 
)

Definition at line 584 of file ClientInterfaceService.py.

def mergeResultsSiteFind(self, newItemsList, clientResponseItem):
    '''
    Merge the Site objects of one SITE_FIND response item into the accumulated list.

    A site whose id is already present in the list is merged into the present one with
    mergeResultsSiteFields(); a site with a new id is appended. In both cases the host
    of the response item is set on the resulting list entry.

    @param newItemsList accumulated list of sites, None on the first call (a new list is created)
    @param clientResponseItem response item; its itemObject is expected to be a list of sites
    @return the accumulated list; unchanged when itemObject is not a list (an error is logged)
    '''
    foundSites = clientResponseItem.itemObject

    # First call for this response, start with an empty accumulator
    if newItemsList is None:
        newItemsList = []

    if not isinstance(foundSites, list):
        # Object is null or wrong type
        logger.error("Wrong type of clientResponseItem.itemObject\n" + Utils.varDump(foundSites))
        return newItemsList

    for site in foundSites:
        for knownSite in newItemsList:
            if knownSite.id == site.id:
                # Merge site's data into the already collected one
                knownSite = self.mergeResultsSiteFields(knownSite, site)
                knownSite.host = clientResponseItem.host
                break
        else:
            # No site with this id collected yet, add it to the list
            site.host = clientResponseItem.host
            newItemsList.append(site)
            logger.debug("Added Site:\n" + Utils.varDump(site))

    return newItemsList
613 
614 
615 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ mergeResultsSiteStatus()

def dc.ClientInterfaceService.ClientInterfaceService.mergeResultsSiteStatus (   self,
  mergedSite,
  clientResponseItem 
)

Definition at line 529 of file ClientInterfaceService.py.

def mergeResultsSiteStatus(self, mergedSite, clientResponseItem):
    '''
    Merge the Site object of one SITE_STATUS response item into the accumulated one.

    @param mergedSite accumulated Site object, None on the first call
    @param clientResponseItem response item; its itemObject is expected to be an EventObjects.Site
    @return the accumulated Site with host set from the response item; unchanged when
            itemObject has a wrong type (an error is logged)
    '''
    nodeSite = clientResponseItem.itemObject

    if not isinstance(nodeSite, EventObjects.Site):
        # Object is null or wrong type
        logger.error("Wrong type of currentSite object: " + str(type(nodeSite)) + ", Site expected\n" + \
                     Utils.varDump(nodeSite))
        return mergedSite

    if mergedSite is None:
        # First call: the current Site object becomes the accumulator
        mergedSite = nodeSite
    else:
        logger.debug("Merge with Site object:\n" + Utils.varDump(nodeSite))
        # Merge site's data
        mergedSite = self.mergeResultsSiteFields(mergedSite, nodeSite)

    mergedSite.host = clientResponseItem.host

    return mergedSite
548 
549 
550 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ mergeResultsURLContent()

def dc.ClientInterfaceService.ClientInterfaceService.mergeResultsURLContent (   self,
  newItemsList,
  clientResponseItem 
)

Definition at line 456 of file ClientInterfaceService.py.

def mergeResultsURLContent(self, newItemsList, clientResponseItem):
    '''
    Merge the URLContentResponse objects of one response item into the accumulated list.

    Candidates with empty rawContents and processedContents are skipped. A candidate that
    matches an already collected item of the same site (by urlMd5, or by a non-empty
    rawContentMd5 or contentURLMd5) replaces this item in place when it is better, otherwise
    it is dropped. A candidate without a match is appended with the host of the response item.

    @param newItemsList accumulated list, None on the first call (a new list is created)
    @param clientResponseItem response item; its itemObject is expected to be a list
    @return the accumulated list; unchanged when itemObject is not a list (an error is logged)
    '''
    incoming = clientResponseItem.itemObject

    # First call for this response, start with an empty accumulator
    if newItemsList is None:
        newItemsList = []

    if not isinstance(incoming, list):
        # Object is null or wrong type
        logger.error("Wrong type of clientResponseItem.itemObject\n" + Utils.varDump(incoming))
        return newItemsList

    # Values of the md5 fields that are treated as not set
    emptyMd5 = ('', '""')

    for candidate in incoming:
        if len(candidate.rawContents) == 0 and len(candidate.processedContents) == 0:
            logger.debug("Empty contents lists in urlContentResponse:\n" + Utils.varDump(candidate))
            continue

        matched = False
        for position, existing in enumerate(newItemsList):
            if existing.siteId != candidate.siteId:
                continue
            # Same item by urlMd5, or by the rawContentMd5 or the contentURLMd5 if they are set
            duplicate = existing.urlMd5 == candidate.urlMd5 or \
                (candidate.rawContentMd5 not in emptyMd5 and \
                 existing.rawContentMd5 == candidate.rawContentMd5) or \
                (candidate.contentURLMd5 not in emptyMd5 and \
                 existing.contentURLMd5 == candidate.contentURLMd5)
            if not duplicate:
                continue

            matched = True
            existingFields = existing.dbFields
            candidateFields = candidate.dbFields
            existingFields.setdefault('Batch_Id', 0)
            candidateFields.setdefault('Batch_Id', 0)
            # Is both items have crawled and processed but some one is older by UDate (experimental).
            existingUDate = 0
            if 'UDate' in existingFields:
                existingUDate = self.getUnixTimeFromString(existingFields['UDate'])
            candidateUDate = 0
            if 'UDate' in candidateFields:
                candidateUDate = self.getUnixTimeFromString(candidateFields['UDate'])

            # Candidate is better if it has a batch while the existing has not, or its status
            # is greater, or its UDate is newer
            candidateIsBetter = \
                (int(existingFields['Batch_Id']) == 0 and int(candidateFields['Batch_Id']) != 0) or \
                (candidateFields['Status'] > existingFields['Status']) or \
                (candidateUDate > existingUDate)
            if candidateIsBetter:
                # Item already exists and candidate is better and replaces it
                logger.debug("Already exists itemObject:\n" + Utils.varDump(existing.dbFields) + \
                             "\nand replaced urlContentResponse:\n" + Utils.varDump(candidate.dbFields))
                candidate.host = clientResponseItem.host
                newItemsList[position] = candidate
            else:
                # Item already exists and better than candidate
                logger.debug("Already exists urlContentResponse:\n" + Utils.varDump(candidate))
            break

        # NOTE(review): a matched candidate is not appended after the scan, whether it
        # replaced the existing entry or not - confirm against the original indentation
        if matched:
            logger.debug("Rejected urlContentResponse")
        else:
            # Add content to list
            candidate.host = clientResponseItem.host
            newItemsList.append(candidate)
            logger.debug("Added urlContentResponse:\n" + Utils.varDump(candidate))

    return newItemsList
521 
522 
523 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ mergeResultsURLFetch()

def dc.ClientInterfaceService.ClientInterfaceService.mergeResultsURLFetch (   self,
  newItemsList,
  clientResponseItem 
)

Definition at line 621 of file ClientInterfaceService.py.

def mergeResultsURLFetch(self, newItemsList, clientResponseItem):
  """Merge one host's URLFetch response into the accumulated list of URL objects.

  Each URL object in clientResponseItem.itemObject is matched against newItemsList by urlMd5. An unmatched
  URL is appended; a matched one replaces the stored object only when it wins the status/crawled/UDate
  comparison below. Every URL that gets stored has its host attribute set to clientResponseItem.host.

  @param newItemsList list accumulated so far, or None on the first call (a new list is created)
  @param clientResponseItem response item exposing host and itemObject (expected to be a list of URL objects)
  @return the merged list; when itemObject is not a list an error is logged and the list is returned as is
  """
  itemsList = clientResponseItem.itemObject

  # Take len() only when the object supports it: a null itemObject must reach the error branch below
  # instead of raising TypeError while the log arguments are being built.
  itemsCount = len(itemsList) if hasattr(itemsList, '__len__') else 0
  logger.debug("Merging object of URLFEtch, host: %s, items: %s", str(clientResponseItem.host), str(itemsCount))
  # If this is first item in results list, init the accumulator with an empty list
  if newItemsList is None:
    newItemsList = []

  replacements = 0
  insertions = 0

  if isinstance(itemsList, list):
    # urlMd5 -> position of its first occurrence in newItemsList; replaces a linear rescan per candidate URL
    positions = {}
    for i, addedURL in enumerate(newItemsList):
      positions.setdefault(addedURL.urlMd5, i)

    for url in itemsList:
      i = positions.get(url.urlMd5)
      if i is None:
        # Not seen yet: add the URL to the list
        url.host = clientResponseItem.host
        newItemsList.append(url)
        positions[url.urlMd5] = len(newItemsList) - 1
        logger.debug("Added URL: %s", str(url.urlMd5))
        insertions += 1
        continue

      addedURL = newItemsList[i]
      logger.debug("URL found in list: %s", url.urlMd5)
      # For case if both are valid - to get newer (experimental)
      addedURLUDate = self.getUnixTimeFromString(addedURL.UDate)
      urUDate = self.getUnixTimeFromString(url.UDate)
      # Replace with the URL object with higher status value and not migrated from another host
      if (addedURL.status <= url.status and (url.crawled > 0 and url.processed > 0 and url.batchId > 0)) or\
         (addedURL.status == url.status and addedURL.crawled == 0 and url.crawled > 0) or\
         (addedURL.status == url.status and addedURL.crawled > 0 and url.crawled > 0 and urUDate > addedURLUDate):
        logger.debug("URL replaced in list with best fields values: %s, old.status=%s, new.status=%s, " + \
                     "old.crawled=%s, new.crawled=%s, old.processed=%s, new.processed=%s, old.batchId=%s, " + \
                     "new.batchId=%s, old.UDate=%s, new.UDate=%s",
                     url.urlMd5, str(addedURL.status), str(url.status), str(addedURL.crawled), str(url.crawled),
                     str(addedURL.processed), str(url.processed), str(addedURL.batchId), str(url.batchId),
                     str(addedURLUDate), str(urUDate))
        url.host = clientResponseItem.host
        newItemsList[i] = url
        replacements += 1
      else:
        logger.debug("URL not replaced cause conditions not matched, old url:\n%s\ncandidate url:\n%s",
                     str(addedURL), str(url))
  else:
    # Object is null or wrong type
    logger.error("Wrong type of clientResponseItem.itemObject\n%s", Utils.varDump(itemsList))

  logger.debug("Merging object of URLFEtch finished, replacements: %s, insertions: %s", str(replacements),
               str(insertions))

  return newItemsList
676 
677 
678 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ onBatchTasksManagerRealTimeRoute()

def dc.ClientInterfaceService.ClientInterfaceService.onBatchTasksManagerRealTimeRoute (   self,
  event 
)

Definition at line 160 of file ClientInterfaceService.py.

def onBatchTasksManagerRealTimeRoute(self, event):
  """Forward an incoming event to the real-time batch tasks manager connection.

  After the event is sent it is passed to registerEvent(). Any exception is logged and not re-raised.

  @param event the event object to route
  """
  try:
    logger.debug("Received event: " + Utils.varDump(event))
    # Connection name is the configured manager name followed by the common connection suffix
    connectionName = str(self.batchTasksManagerRealTime) + self.CONNECTION_PREFIX
    self.send(connectionName, event)
    self.registerEvent(event)
  except KeyError as err:
    logger.error(err.message)
  except Exception as err:
    logger.error("Error `%s`", str(err))
169 
170 
171 
Here is the call graph for this function:

◆ onDCClientRoute()

def dc.ClientInterfaceService.ClientInterfaceService.onDCClientRoute (   self,
  event 
)

Definition at line 175 of file ClientInterfaceService.py.

def onDCClientRoute(self, event):
  """Send a response event back to the client that issued the matching request.

  The response payload is merged via mergeResultsData() unless event.cookie is a dict that carries
  DC_CONSTANTS.MERGE_PARAM_NAME with a false value. A KeyError raised along the way is logged and swallowed.

  @param event the response event to deliver
  """
  try:
    request_event = self.getRequestEvent(event)
    cookie = event.cookie
    # Merging is skipped only on an explicit, falsy merge flag in a dict cookie
    mergeDisabled = isinstance(cookie, dict) and \
                    DC_CONSTANTS.MERGE_PARAM_NAME in cookie and \
                    not cookie[DC_CONSTANTS.MERGE_PARAM_NAME]
    if mergeDisabled:
      logger.debug("No merge results specified in Event.cookie!")
    else:
      logger.debug("Results merge try by default.")
      event.eventObj = self.mergeResultsData(event.eventType, event.eventObj, event.cookie)
    # NOTE(review): the reply is assumed to be sent for both merge modes - confirm against the original file
    self.reply(request_event, event)
    logger.debug("Results sent to DCC.")
    self.unregisterEvent(request_event)
  except KeyError as err:
    logger.error(err.message)
189 
190 
191 
Here is the call graph for this function:

◆ onSitesManagerRoute()

def dc.ClientInterfaceService.ClientInterfaceService.onSitesManagerRoute (   self,
  event 
)

Definition at line 145 of file ClientInterfaceService.py.

def onSitesManagerRoute(self, event):
  """Forward an incoming event to the sites manager connection.

  After the event is sent it is passed to registerEvent(). Any exception is logged and not re-raised.

  @param event the event object to route
  """
  try:
    logger.debug("Received event: " + Utils.varDump(event))
    # Connection name is the configured manager name followed by the common connection suffix
    connectionName = str(self.sitesManager) + self.CONNECTION_PREFIX
    self.send(connectionName, event)
    self.registerEvent(event)
  except KeyError as err:
    logger.error(err.message)
  except Exception as err:
    logger.error("Error `%s`", str(err))
154 
155 
156 
Here is the call graph for this function:

◆ registerEvent()

def dc.ClientInterfaceService.ClientInterfaceService.registerEvent (   self,
  event 
)

Definition at line 195 of file ClientInterfaceService.py.

def registerEvent(self, event):
  """Remember an event in self.processEvents under its uid.

  The event's eventObj is reset to None, so only the event envelope is kept.

  @param event the event to store; must expose uid and eventObj
  """
  pending = self.processEvents
  event.eventObj = None
  pending[event.uid] = event
198 
199 
200 
Here is the caller graph for this function:

◆ sortURLContentResults()

def dc.ClientInterfaceService.ClientInterfaceService.sortURLContentResults (   self,
  itemObject,
  criterion 
)

Definition at line 337 of file ClientInterfaceService.py.

def sortURLContentResults(self, itemObject, criterion):
  """Return the URLContent result items sorted by the first criterion of an ORDER BY-like string.

  criterion looks like "<field>[ <direction>][,<further criteria>]". Only the first comma-separated
  criterion is used. The direction is ascending when it is omitted or equals 'ASC'; any other token means
  descending. Known fields are PDate, CDate, UDate, TcDate, Size, Depth, TagsCount and ContentURLMd5; an
  unknown or empty field falls back to ordering by the raw CDate value.

  @param itemObject iterable of items exposing a dbFields dict
  @param criterion sort criterion string
  @return a new sorted list, or itemObject unchanged when anything fails (the error is logged)
  """
  ret = itemObject

  try:
    # Split the FIRST criterion only; splitting the whole string again would glue ",<next field>" onto the
    # direction token and flip the sort order for multi-criterion input.
    tokens = criterion.split(',')[0].split()
    field = tokens[0] if tokens else 'CDate'
    descending = len(tokens) > 1 and tokens[1] != 'ASC'

    if field in ('PDate', 'CDate', 'UDate', 'TcDate'):
      def orderKey(item):
        value = item.dbFields[field]
        if field == 'PDate' and value is None:
          # Items without a publication date sort before all dated ones
          return -1
        # Numeric epoch seconds so that keys compare as numbers, not as text
        # NOTE(review): '%s' is a platform-specific strftime directive - confirm target platforms
        return int(parse(value).strftime('%s'))
    elif field in ('Size', 'Depth', 'TagsCount', 'ContentURLMd5'):
      def orderKey(item):
        return item.dbFields[field]
    else:
      def orderKey(item):
        return item.dbFields['CDate']

    ret = sorted(ret, key=orderKey, reverse=descending)

  except Exception as err:
    logger.error("Exception: '%s', criterion: '%s'", str(err), str(criterion))

  return ret
391 
392 
Here is the caller graph for this function:

◆ sortURLFetchResults()

def dc.ClientInterfaceService.ClientInterfaceService.sortURLFetchResults (   self,
  itemObject,
  criterion 
)

Definition at line 398 of file ClientInterfaceService.py.

def sortURLFetchResults(self, itemObject, criterion):
  """Return the URLFetch result items sorted by the first criterion of an ORDER BY-like string.

  criterion looks like "<field>[ <direction>][,<further criteria>]". Only the first comma-separated
  criterion is used. The direction is ascending when it is omitted or equals 'ASC'; any other token means
  descending. Known fields are PDate, CDate, UDate, TcDate, Size, Depth, TagsCount and ContentURLMd5; an
  unknown or empty field falls back to ordering by the raw CDate attribute.

  @param itemObject iterable of URL objects
  @param criterion sort criterion string
  @return a new sorted list, or itemObject unchanged when anything fails (the error is logged)
  """
  ret = itemObject

  try:
    # Split the FIRST criterion only; splitting the whole string again would glue ",<next field>" onto the
    # direction token and flip the sort order for multi-criterion input.
    tokens = criterion.split(',')[0].split()
    field = tokens[0] if tokens else 'CDate'
    descending = len(tokens) > 1 and tokens[1] != 'ASC'

    # Criterion field name -> attribute name on the URL object
    dateAttrs = {'PDate': 'pDate', 'CDate': 'CDate', 'UDate': 'UDate', 'TcDate': 'tcDate'}
    plainAttrs = {'Size': 'size', 'Depth': 'depth', 'TagsCount': 'tagsCount', 'ContentURLMd5': 'contentURLMd5'}

    if field in dateAttrs:
      def orderKey(item):
        value = getattr(item, dateAttrs[field])
        if field == 'PDate' and value is None:
          # Same rule as sortURLContentResults: undated items sort first instead of aborting the sort
          return -1
        # Numeric epoch seconds so that keys compare as numbers, not as text
        # NOTE(review): '%s' is a platform-specific strftime directive - confirm target platforms
        return int(parse(value).strftime('%s'))
    elif field in plainAttrs:
      def orderKey(item):
        return getattr(item, plainAttrs[field])
    else:
      def orderKey(item):
        return item.CDate

    ret = sorted(ret, key=orderKey, reverse=descending)
  except Exception as err:
    logger.error("Exception: %s", str(err))

  return ret
448 
449 
450 
Here is the caller graph for this function:

◆ unregisterEvent()

def dc.ClientInterfaceService.ClientInterfaceService.unregisterEvent (   self,
  event 
)

Definition at line 212 of file ClientInterfaceService.py.

def unregisterEvent(self, event):
  """Drop the entry stored for event.uid from self.processEvents.

  @param event the event whose uid identifies the entry to remove
  @raise KeyError when no entry is stored for that uid
  """
  self.processEvents.pop(event.uid)
214 
215 
216 
Here is the caller graph for this function:

Member Data Documentation

◆ batchTasksManagerRealTime

dc.ClientInterfaceService.ClientInterfaceService.batchTasksManagerRealTime

Definition at line 55 of file ClientInterfaceService.py.

◆ CONFIG_BATCH_TASKS_MANAGER_REALTIME

string dc.ClientInterfaceService.ClientInterfaceService.CONFIG_BATCH_TASKS_MANAGER_REALTIME = "clientBatchTasksManagerRealTime"
static

Definition at line 37 of file ClientInterfaceService.py.

◆ CONFIG_DRCE_NODES

string dc.ClientInterfaceService.ClientInterfaceService.CONFIG_DRCE_NODES = "DRCENodes"
static

Definition at line 38 of file ClientInterfaceService.py.

◆ CONFIG_SECTION

string dc.ClientInterfaceService.ClientInterfaceService.CONFIG_SECTION = "ClientInterfaceService"
static

Definition at line 31 of file ClientInterfaceService.py.

◆ CONFIG_SERVER_HOST

string dc.ClientInterfaceService.ClientInterfaceService.CONFIG_SERVER_HOST = "serverHost"
static

Definition at line 32 of file ClientInterfaceService.py.

◆ CONFIG_SERVER_PORT

string dc.ClientInterfaceService.ClientInterfaceService.CONFIG_SERVER_PORT = "serverPort"
static

Definition at line 33 of file ClientInterfaceService.py.

◆ CONFIG_SITES_MANAGER

string dc.ClientInterfaceService.ClientInterfaceService.CONFIG_SITES_MANAGER = "clientSitesManager"
static

Definition at line 34 of file ClientInterfaceService.py.

◆ CONNECTION_PREFIX

string dc.ClientInterfaceService.ClientInterfaceService.CONNECTION_PREFIX = "Connection"
static

Definition at line 36 of file ClientInterfaceService.py.

◆ processEvents

dc.ClientInterfaceService.ClientInterfaceService.processEvents

Definition at line 138 of file ClientInterfaceService.py.

◆ SERVER_CONNECTION_NAME

string dc.ClientInterfaceService.ClientInterfaceService.SERVER_CONNECTION_NAME = "server"
static

Definition at line 35 of file ClientInterfaceService.py.

◆ sitesManager

dc.ClientInterfaceService.ClientInterfaceService.sitesManager

Definition at line 54 of file ClientInterfaceService.py.


The documentation for this class was generated from the following file: