HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
DBTasksWrapper.py
Go to the documentation of this file.
1 '''
2 @package: dc
3 @author scorp
4 @link: http://hierarchical-cluster-engine.com/
5 @copyright: Copyright © 2013-2014 IOIX Ukraine
6 @license: http://hierarchical-cluster-engine.com/license/
7 @since: 0.1
8 '''
9 
10 from dc_db.TasksManager import TasksManager as DBTasksManager
11 from dc_db.FieldRecalculator import FieldRecalculator as DBFieldRecalculator
12 import dc.Constants
13 import dc.EventObjects
14 import dbi.EventObjects
15 from app.Utils import varDump
16 import app.Utils as Utils # pylint: disable=F0401
17 
18 
# Module-level logger used by every DBTasksWrapper method below for error/debug reporting;
# obtained from the project's app.Utils.MPLogger factory.
logger = Utils.MPLogger().getLogger()
20 
21 
# #class DBTasksWrapper - wrapper for common db-task operations
#
# Each operation wraps its request object(s) into a dc.Constants.DRCESyncTasksCover,
# executes it through the db-task manager (self.dbTask.process) and validates/unpacks
# the response, logging an error when the response has an unexpected type.
# Methods that check self.affect_db skip the db call entirely when it is False and
# return their default result; process, urlContent, putURLContent and putAttributes
# do not check it.
class DBTasksWrapper(object):

    # #DBTasksWrapper's constructor
    #
    # @param cfgParser - initialization config, passed to the db-task manager
    def __init__(self, cfgParser):
        self.dbTask = DBTasksManager(cfgParser)
        self.fieldRecalculator = DBFieldRecalculator()
        # request id stamped on custom SQL requests; customRequest checks that the
        # response carries the same rid
        self.rid = 0
        # switch: when False, the gated operations below do not touch the db
        self.affect_db = True

    # #Simple wrapper for the dbTask.process method
    #
    # @param drceObject - request object (e.g. a DRCESyncTasksCover) to execute
    # @return whatever dbTask.process returns
    def process(self, drceObject):
        return self.dbTask.process(drceObject)

    # #Recalculates common fields for a set of sites
    #
    # Sends a single FIELD_RECALCULATE request holding one FieldRecalculatorObj per
    # site; the response is not inspected.
    # @param sites - iterable of values accepted by dc.EventObjects.FieldRecalculatorObj
    def fieldsRecalculating(self, sites):
        if self.affect_db:
            fieldRecalculatingObjList = [dc.EventObjects.FieldRecalculatorObj(site) for site in sites]
            drceObject = dc.Constants.DRCESyncTasksCover(dc.Constants.EVENT_TYPES.FIELD_RECALCULATE,
                                                         fieldRecalculatingObjList)
            # the response is intentionally ignored
            self.dbTask.process(drceObject)

    # #Updates collected-URLs data for one site
    #
    # Delegates to FieldRecalculator.updateCollectedURLs, passing dbTask.executeQuery
    # as the query executor.
    # @param siteId - site identifier
    def collectedURLsRecalculating(self, siteId):
        if self.affect_db:
            self.fieldRecalculator.updateCollectedURLs(siteId, self.dbTask.executeQuery)

    # #customRequest wrapper for dbTask.SQLCustom task
    #
    # @param query - custom query string or list of queries
    # @param dbName - db name
    # @param includeFieldsNames - value assigned to CustomRequest.includeFieldsNames
    #                             (result row format flag)
    # @return sql response result on success, otherwise None (the reason is logged)
    def customRequest(self, query, dbName, includeFieldsNames=dbi.EventObjects.CustomRequest.SQL_BY_INDEX):
        ret = None
        if self.affect_db:
            customObject = dbi.EventObjects.CustomRequest(self.rid, query, dbName)
            customObject.includeFieldsNames = includeFieldsNames
            drceObject = dc.Constants.DRCESyncTasksCover(dc.Constants.EVENT_TYPES.SQL_CUSTOM, customObject)
            retDRCE = self.dbTask.process(drceObject)
            if retDRCE.eventType != dc.Constants.EVENT_TYPES.SQL_CUSTOM_RESPONSE:
                logger.error("SQL_CUSTOM_RESPONSE >>> Wrong response type")
            elif retDRCE.eventObject.rid != self.rid:
                logger.error("SQL_CUSTOM_RESPONSE >>> Wrong response rid")
            elif retDRCE.eventObject.errString is not None:
                # pass errString as a logging arg: string concatenation would raise
                # TypeError if errString is not a str
                logger.error("SQL_CUSTOM_RESPONSE >>> Resonse error = %s", retDRCE.eventObject.errString)
            else:
                ret = retDRCE.eventObject.result
        return ret

    # #URL update operation
    #
    # @param urlUpdateObject - a single URL update object or a list of them
    # @param criterionsWere - optional CRITERION_WHERE value applied to every object
    # @param criterionsLimit - optional CRITERION_LIMIT value applied to every object
    # @param criterionsOrder - optional CRITERION_ORDER value applied to every object
    # @return number of truthy statuses in the URL_UPDATE_RESPONSE, 0 otherwise
    def urlUpdate(self, urlUpdateObject, criterionsWere=None, criterionsLimit=None, criterionsOrder=None):
        ret = 0
        if self.affect_db:
            urlUpdateObjectList = urlUpdateObject if isinstance(urlUpdateObject, list) else [urlUpdateObject]

            criterions = {}
            if criterionsWere is not None:
                criterions[dc.EventObjects.URLFetch.CRITERION_WHERE] = criterionsWere
            if criterionsLimit is not None:
                criterions[dc.EventObjects.URLFetch.CRITERION_LIMIT] = criterionsLimit
            if criterionsOrder is not None:
                criterions[dc.EventObjects.URLFetch.CRITERION_ORDER] = criterionsOrder
            if criterions:
                # Apply to every update object; the old code set the attribute on the
                # argument itself, which fails with AttributeError when it is a list.
                # Each object gets its own dict so later mutation of one doesn't leak.
                for obj in urlUpdateObjectList:
                    obj.criterions = dict(criterions)

            drceObject = dc.Constants.DRCESyncTasksCover(dc.Constants.EVENT_TYPES.URL_UPDATE, urlUpdateObjectList)
            retDRCE = self.dbTask.process(drceObject)
            if retDRCE.eventType == dc.Constants.EVENT_TYPES.URL_UPDATE_RESPONSE:
                statuses = retDRCE.eventObject.statuses
                if hasattr(statuses, '__iter__') and len(statuses) > 0 and statuses[0] is False:
                    logger.error("URL_UPDATE_RESPONSE >>> Operation failure, look db-task log")
                ret = len([i for i in statuses if i])
            else:
                logger.error("URL_UPDATE_RESPONSE >>> Wrong response type")
        return ret

    # #URL status operation
    #
    # @param urlStatusObject - URL status request object
    # @param useMd5Resolving - when True, sets urlStatusObject.urlType to URL_TYPE_MD5
    #                          (mutates the passed object)
    # @return response eventObject on success, otherwise []
    def urlStatus(self, urlStatusObject, useMd5Resolving=False):
        ret = []
        if self.affect_db:
            if useMd5Resolving:
                urlStatusObject.urlType = dc.EventObjects.URLStatus.URL_TYPE_MD5
            drceObject = dc.Constants.DRCESyncTasksCover(dc.Constants.EVENT_TYPES.URL_STATUS, [urlStatusObject])
            retDRCE = self.dbTask.process(drceObject)
            if retDRCE.eventType == dc.Constants.EVENT_TYPES.URL_STATUS_RESPONSE:
                ret = retDRCE.eventObject
            else:
                logger.error("URL_STATUS_RESPONSE >>> Wrong response type")
        return ret

    # #URL content operation (not gated by affect_db)
    #
    # @param items - URL content request items, passed as-is to the request cover
    # @return response eventObject on success, otherwise None
    def urlContent(self, items):
        ret = None
        drceSyncTasksCoverObj = dc.Constants.DRCESyncTasksCover(dc.Constants.EVENT_TYPES.URL_CONTENT, items)
        retDRCE = self.dbTask.process(drceSyncTasksCoverObj)
        if retDRCE.eventType == dc.Constants.EVENT_TYPES.URL_CONTENT_RESPONSE:
            ret = retDRCE.eventObject
        else:
            logger.error("URL_CONTENT_RESPONSE >>> Wrong response type")
        return ret

    # #URL put operation (not gated by affect_db)
    #
    # Each response object is dumped at debug level; a wrong response type is logged once.
    # @param urlPut_list - URL put request objects
    # @return always None
    def putURLContent(self, urlPut_list):
        drceSyncTasksCoverObj = dc.Constants.DRCESyncTasksCover(dc.Constants.EVENT_TYPES.URL_PUT, urlPut_list)
        responseDRCESyncTasksCover = self.dbTask.process(drceSyncTasksCoverObj)
        if responseDRCESyncTasksCover.eventType == dc.Constants.EVENT_TYPES.URL_PUT_RESPONSE:
            for obj in responseDRCESyncTasksCover.eventObject:
                logger.debug("URL_PUT_RESPONSE: %s", varDump(obj))
        else:
            # previously this error was checked and logged a second time
            logger.error("URL_PUT_RESPONSE >>> Wrong response type")
        return None

    # #URL new operation
    #
    # @param urlNewObject - a single URL new object or a list of them
    # @return number of statuses equal to 0 in the URL_NEW_RESPONSE, 0 otherwise
    def urlNew(self, urlNewObject):
        ret = 0
        if self.affect_db:
            urlNewList = urlNewObject if isinstance(urlNewObject, list) else [urlNewObject]
            drceObject = dc.Constants.DRCESyncTasksCover(dc.Constants.EVENT_TYPES.URL_NEW, urlNewList)
            retDRCE = self.dbTask.process(drceObject)
            if retDRCE.eventType == dc.Constants.EVENT_TYPES.URL_NEW_RESPONSE:
                ret = len([i for i in retDRCE.eventObject.statuses if i == 0])
            else:
                logger.error("URL_NEW_RESPONSE >>> Wrong response type")
        return ret

    # #Site new or site update operation
    #
    # @param siteObject - ready request object; if None, one is built as stype(siteId)
    # @param properties - assigned to the built object's properties (only when built here)
    # @param filters - assigned to the built object's filters (only when built here)
    # @param siteId - site id used to build the request object when siteObject is None
    # @param stype - dc.EventObjects.Site (SITE_NEW) or dc.EventObjects.SiteUpdate (SITE_UPDATE)
    # @return response statuses on success, otherwise []
    def siteNewOrUpdate(self, siteObject, properties=None, filters=None, siteId=None, stype=dc.EventObjects.SiteUpdate):
        ret = []
        if self.affect_db:
            if stype == dc.EventObjects.Site:
                reqType = dc.Constants.EVENT_TYPES.SITE_NEW
                respType = dc.Constants.EVENT_TYPES.SITE_NEW_RESPONSE
            elif stype == dc.EventObjects.SiteUpdate:
                reqType = dc.Constants.EVENT_TYPES.SITE_UPDATE
                respType = dc.Constants.EVENT_TYPES.SITE_UPDATE_RESPONSE
            else:
                # previously fell through with reqType/respType unbound (UnboundLocalError)
                logger.error("SITE_NEW_UPDATE >>> Unsupported stype: %s", stype)
                return ret

            if siteObject is None and siteId is not None:
                siteObject = stype(siteId)
                if properties is not None:
                    siteObject.properties = properties
                if filters is not None:
                    siteObject.filters = filters

            if siteObject is not None:
                drceObject = dc.Constants.DRCESyncTasksCover(reqType, siteObject)
                retDRCE = self.dbTask.process(drceObject)
                logger.debug("SITE_NEW_UPDATE_RESPONSE retDRCE: %s", varDump(retDRCE))
                if retDRCE.eventType == respType:
                    ret = retDRCE.eventObject.statuses
                else:
                    logger.error("SITE_NEW_UPDATE_RESPONSE >>> Wrong response type")
            else:
                logger.error("SITE_NEW_UPDATE >>> siteObject is None!")
        return ret

    # #Site status operation
    #
    # @param siteStatusObject - site status request object
    # @return response eventObject on success, otherwise None
    def siteStatus(self, siteStatusObject):
        ret = None
        if self.affect_db:
            if siteStatusObject is not None:
                drceObject = dc.Constants.DRCESyncTasksCover(dc.Constants.EVENT_TYPES.SITE_STATUS, siteStatusObject)
                retDRCE = self.dbTask.process(drceObject)
                if retDRCE.eventType == dc.Constants.EVENT_TYPES.SITE_STATUS_RESPONSE:
                    ret = retDRCE.eventObject
                else:
                    logger.error("SITE_STATUS_RESPONSE >>> Wrong response type")
            else:
                logger.error("SITE_STATUS >>> Not enough incoming data")
        return ret

    # # Proxy status operation
    #
    # @param proxyStatusObject - proxy object or a list of them
    # @return response eventObject on success, otherwise None
    def proxyStatus(self, proxyStatusObject):
        ret = None
        if self.affect_db:
            if proxyStatusObject is None:
                logger.error("PROXY_STATUS: proxyObject is None!")
            else:
                proxyStatusObjectList = proxyStatusObject if isinstance(proxyStatusObject, list) \
                    else [proxyStatusObject]
                drceObject = dc.Constants.DRCESyncTasksCover(dc.Constants.EVENT_TYPES.PROXY_STATUS, proxyStatusObjectList)
                retDRCE = self.dbTask.process(drceObject)
                if retDRCE.eventType == dc.Constants.EVENT_TYPES.PROXY_STATUS_RESPONSE:
                    ret = retDRCE.eventObject
                else:
                    logger.error("PROXY_STATUS_RESPONSE: Wrong type of response object!")
        return ret

    # # Proxy update operation
    #
    # @param proxyUpdateObject - proxy update object or a list of them
    # @return response eventObject on success, otherwise None
    def proxyUpdate(self, proxyUpdateObject):
        ret = None
        if self.affect_db:
            if proxyUpdateObject is not None:
                proxyUpdateObjectList = proxyUpdateObject if isinstance(proxyUpdateObject, list) \
                    else [proxyUpdateObject]
                drceObject = dc.Constants.DRCESyncTasksCover(dc.Constants.EVENT_TYPES.PROXY_UPDATE, proxyUpdateObjectList)
                retDRCE = self.dbTask.process(drceObject)
                if retDRCE.eventType == dc.Constants.EVENT_TYPES.PROXY_UPDATE_RESPONSE:
                    ret = retDRCE.eventObject
                else:
                    logger.error("PROXY_UPDATE_RESPONSE >>> Wrong response type")
            else:
                logger.error("PROXY_UPDATE >>> Not enough incoming data")
        return ret

    # # Put attributes operation (not gated by affect_db)
    #
    # @param attributes - a single attribute object or a list of them
    # @return - None
    def putAttributes(self, attributes):
        attributesList = attributes if isinstance(attributes, list) else [attributes]
        drceSyncTasksCoverObj = dc.Constants.DRCESyncTasksCover(dc.Constants.EVENT_TYPES.ATTR_SET, attributesList)
        responseDRCESyncTasksCover = self.dbTask.process(drceSyncTasksCoverObj)
        if responseDRCESyncTasksCover.eventType != dc.Constants.EVENT_TYPES.ATTR_SET_RESPONSE:
            logger.error("Operation 'ATTR_SET' has error: Wrong response type")
def urlUpdate(self, urlUpdateObject, criterionsWere=None, criterionsLimit=None, criterionsOrder=None)
def siteNewOrUpdate(self, siteObject, properties=None, filters=None, siteId=None, stype=dc.EventObjects.SiteUpdate)
def customRequest(self, query, dbName, includeFieldsNames=dbi.EventObjects.CustomRequest.SQL_BY_INDEX)
def siteStatus(self, siteStatusObject)
def proxyStatus(self, proxyStatusObject)
def urlStatus(self, urlStatusObject, useMd5Resolving=False)
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
def proxyUpdate(self, proxyUpdateObject)