HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
dbi.py
Go to the documentation of this file.
1 """
2 HCE project, Python bindings, Distributed Tasks Manager application.
3 Event objects definitions.
4 
5 @package: dtm
6 @file dbi.py
7 @author Oleksii <developers.hce@gmail.com>
8 @link: http://hierarchical-cluster-engine.com/
9 @copyright: Copyright &copy; 2013-2014 IOIX Ukraine
10 @license: http://hierarchical-cluster-engine.com/license/
11 @since: 0.1
12 """
13 
14 import collections
15 from threading import Lock
16 import sqlalchemy
17 from sqlalchemy.sql import select
18 from sqlalchemy import create_engine
19 from sqlalchemy.orm import sessionmaker
20 from sqlalchemy.engine import reflection
21 
22 from sqlalchemy import text
23 
24 import Constants as CONSTANTS # pylint: disable=W0403
25 import app.Utils as Utils
26 
27 # Logger initialization
28 logger = Utils.MPLogger().getLogger()
29 
30 # instantiate global database object
31 lock = Lock()
32 
33 
34 # #Common use wrapper class to interact with the ORM databases
35 # Provide CRUD interface (create, read, update, delete) entries
36 #
37 class DBI(object):
38 
39  MANDATORY_DBS = ["ee_responses_table", "resources_table", "scheduler_task_scheme", "task_back_log_scheme",
40  "task_log_scheme", "tasks_data_table"]
41 
42  # #constructor
43  # initialize fields
44  #
45  # @param config_dic dict object instance from config section
46  #
47  def __init__(self, config_dic):
48  # db_name = config_dic["db_name"]
49  engineString = "mysql+mysqldb://%s:%s@%s"
50  engineString = engineString % (config_dic["db_user"], config_dic["db_pass"], config_dic["db_host"])
51  if "db_port" in config_dic:
52  engineString += ':' + str(config_dic["db_port"])
53  engineString += '/' + config_dic["db_name"]
54 
55  # self.engine = create_engine("mysql+mysqldb://hce:hce12345@localhost:3306/dtm")
56  self.engine = create_engine(engineString)
57  self.checkTables()
58  Session = sessionmaker(bind=self.engine)
59  self.session = Session()
60 
61  self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
62  self.errorMessage = CONSTANTS.DBI_SUCCESS_MSG
63 
64 
65  # #checkTables
66  # checkTables checks present of mandatory table in db
67  #
68  def checkTables(self):
69  try:
70  insp = reflection.Inspector.from_engine(self.engine)
71  avlTables = insp.get_table_names()
72  for dbTable in self.MANDATORY_DBS:
73  if dbTable not in avlTables:
74  raise Exception(">>> %s table not present in database" % dbTable)
75  except Exception as err:
76  self.errorCode = CONSTANTS.DBI_COMMON_ERROR_CODE
77  self.errorMessage = CONSTANTS.DBI_COMMON_ERROR_MSG + ": " + err.message
78  raise DBIErr(self.getErrorCode(), self.getErrorMsg())
79 
80 
81  # #get error code
82  # return error code last performed action (e.g. insert, update, etc.)
83  #
84  # @return error code last performed action (e.g. insert, update, etc.)
85  #
86  def getErrorCode(self):
87  return self.errorCode
88 
89 
90  # #get error message
91  # return error message last performed action (e.g. insert, update, etc.)
92  #
93  # @return error message last performed action (e.g. insert, update, etc.)
94  #
95  def getErrorMsg(self):
96  return self.errorMessage
97 
98 
99  # #insert
100  # insert objects into the database
101  #
102  # @param tasks - list of objects (not mandatory the same type) for insertion
103  # @return the same list of objects
104  # @error can be checked by the getError() method
105  #
106  def insert(self, tasks):
107  with lock:
108  self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
109 
110  taskList = []
111  if isinstance(tasks, collections.Iterable):
112  taskList = tasks
113  else:
114  taskList.append(tasks)
115 
116  # boolean flag exist faults
117  hasFaults = False
118 
119  for task in taskList:
120  try:
121  self.session.add(task)
122  self.session.commit()
123  except sqlalchemy.exc.SQLAlchemyError, err:
124  logger.info("tasks: %s, type: %s", str(tasks), str(type(tasks)))
125  self.errorMessage = CONSTANTS.DBI_INSERT_ERROR_MSG + ": " + str(err)
126  self.session.rollback()
127  hasFaults = True
128  except Exception, err:
129  logger.info("tasks: %s, type: %s", str(tasks), str(type(tasks)))
130  self.errorMessage = CONSTANTS.DBI_INSERT_ERROR_MSG + ": " + str(err)
131  hasFaults = True
132 
133  if hasFaults:
134  self.errorCode = CONSTANTS.DBI_INSERT_ERROR_CODE
135  raise DBIErr(self.getErrorCode(), self.getErrorMsg())
136 
137  return tasks
138 
139 
140  # #fetch
141  # fetch objects from the database
142  #
143  # @param obj - list of objects (not mandatory the same type) for fetching
144  # @param clause - sql clause for fetching the object from table
145  # @return list of fetched objects
146  # @error can be checked by the getError() method
147  #
148  def fetch(self, obj, clause):
149  with lock:
150  self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
151  fetched = []
152  try:
153  rows = self.session.query(type(obj)).filter(text(clause)).all()
154  if len(rows):
155  fetched = rows
156  # Firstly, we check if exception was thrown by the DBI itself
157  # If so we rollback DB
158  except sqlalchemy.exc.SQLAlchemyError as err:
159  self.session.rollback()
160  self.errorCode = CONSTANTS.DBI_FETCH_ERROR_CODE
161  self.errorMessage = CONSTANTS.DBI_FETCH_ERROR_MSG + ": " + str(err)
162  raise DBIErr(self.getErrorCode(), self.getErrorMsg())
163  except Exception as err:
164  self.errorCode = CONSTANTS.DBI_FETCH_ERROR_CODE
165  self.errorMessage = CONSTANTS.DBI_FETCH_ERROR_MSG + ": " + str(err)
166  raise DBIErr(self.getErrorCode(), self.getErrorMsg())
167 
168  return fetched
169 
170 
171  # #sql
172  # fetch objects from the database
173  #
174  # @param obj - list of objects (not mandatory the same type) for fetching
175  # @param clause - sql clause for fetching the object from table
176  # @return list of fetched objects
177  # @error can be checked by the getError() method
178  #
179  def sql(self, obj, clause):
180  with lock:
181  self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
182  fetched = []
183  try:
184  rows = self.session.query(type(obj)).from_statement(text(clause)).all()
185  if len(rows):
186  fetched = rows
187  # Firstly, we check if exception was thrown by the DBI itself
188  # If so we rollback DB
189  except sqlalchemy.exc.SQLAlchemyError as err:
190  self.session.rollback()
191  self.errorCode = CONSTANTS.DBI_SQL_ERROR_CODE
192  self.errorMessage = CONSTANTS.DBI_SQL_ERROR_MSG + ": " + str(err) + ", clause:\n" + str(clause)
193  raise DBIErr(self.getErrorCode(), self.getErrorMsg())
194  except Exception as err:
195  self.errorCode = CONSTANTS.DBI_SQL_ERROR_CODE
196  self.errorMessage = CONSTANTS.DBI_SQL_ERROR_MSG + ": " + str(err) + ", clause:\n" + str(clause)
197  raise DBIErr(self.getErrorCode(), self.getErrorMsg())
198 
199  return fetched
200 
201 
202 # # #fetchAll
203 # # fetch all objects from the database
204 # #
205 # # @param tasks - object type for fetching
206 # # @return list of fetched objects
207 # # @error can be checked by the getError() method
208 # #
209 # def fetchAll(self, tasks):
210 # with lock:
211 # self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
212 # fetched = []
213 #
214 #
215 # try:
216 # conn = self.engine.connect()
217 # if hasattr(tasks, "__len__"):
218 # for task in tasks:
219 # fetched.append(conn.execute(select([type(task)])).fetchall())
220 # else:
221 # fetched.append(conn.execute(select([type(tasks)])).fetchall())
222 # # Firstly, we check if exception was thrown by the DBI itself
223 # # If so we rollback DB
224 # except sqlalchemy.exc.SQLAlchemyError as err:
225 # self.session.rollback()
226 # self.errorCode = CONSTANTS.DBI_FETCH_ERROR_CODE
227 # self.errorMessage = CONSTANTS.DBI_FETCH_ERROR_MSG + ": " + err.message
228 # raise DBIErr(self.getErrorCode(), self.getErrorMsg())
229 # except Exception as err:
230 # self.errorCode = CONSTANTS.DBI_FETCH_ERROR_CODE
231 # self.errorMessage = CONSTANTS.DBI_FETCH_ERROR_MSG + ": " + err.message
232 # raise DBIErr(self.getErrorCode(), self.getErrorMsg())
233 # return fetched
234 
235 
236  # fetch all objects from the database
237  #
238  # @param tasks - object type for fetching
239  # @return list of fetched objects
240  # @error can be checked by the getError() method
241  #
242  def fetchAll(self, tasks):
243  with lock:
244  # variable for result
245  fetched = []
246  self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
247 
248  connection = None
249  try:
250  connection = self.engine.connect()
251 
252  taskList = []
253  if isinstance(tasks, collections.Iterable):
254  taskList = tasks
255  else:
256  taskList.append(tasks)
257 
258  # boolean flag exist faults
259  hasFaults = False
260  for task in taskList:
261  try:
262  fetched.append(connection.execute(select([type(task)])).fetchall())
263  except sqlalchemy.exc.SQLAlchemyError, err:
264  self.session.rollback()
265  self.errorMessage = CONSTANTS.DBI_FETCH_ERROR_MSG + ": " + str(err)
266  hasFaults = True
267  except Exception, err:
268  self.errorMessage = CONSTANTS.DBI_FETCH_ERROR_MSG + ": " + str(err)
269  hasFaults = True
270 
271  if hasFaults:
272  self.errorCode = CONSTANTS.DBI_FETCH_ERROR_CODE
273  raise DBIErr(self.getErrorCode(), self.getErrorMsg())
274 
275  except DBIErr, e:
276  raise e
277  except Exception, e:
278  raise DBIErr(CONSTANTS.DBI_FETCH_ERROR_CODE, str(e))
279  finally:
280  if connection is not None:
281  connection.close()
282 
283  return fetched
284 
285 
286  # #update
287  # update objects in the database
288  #
289  # @param tasks - list of objects (not mandatory the same type) for updating
290  # @param clause - sql clause for updating the object in the table
291  # @return list of updated objects
292  # @error can be checked by the getError() method
293  #
294  def update(self, obj, clause):
295  with lock:
296  self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
297  updated = []
298  try:
299  updated_task = self.session.query(type(obj)).filter(text(clause)).first()
300  if updated_task:
301  attributes = [attr for attr in dir(obj) if not attr.startswith('__') \
302  and not attr.startswith('_') and getattr(obj, attr) is not None]
303  # attributes = [attr for attr in dir(obj) if not attr.startswith('__') and not attr.startswith('_')]
304  for attr in attributes:
305  setattr(updated_task, attr, getattr(obj, attr))
306  updated.append(updated_task)
307  self.session.commit()
308  # Firstly, we check if exception was thrown by the DBI itself
309  # If so we rollback DB
310  except sqlalchemy.exc.SQLAlchemyError as err:
311  self.session.rollback()
312  self.errorCode = CONSTANTS.DBI_UPDATE_ERROR_CODE
313  self.errorMessage = CONSTANTS.DBI_UPDATE_ERROR_MSG + ": " + str(err)
314  raise DBIErr(self.getErrorCode(), self.getErrorMsg())
315  except Exception as err:
316  self.errorCode = CONSTANTS.DBI_UPDATE_ERROR_CODE
317  self.errorMessage = CONSTANTS.DBI_UPDATE_ERROR_MSG + ": " + str(err)
318  raise DBIErr(self.getErrorCode(), self.getErrorMsg())
319 
320  return updated
321 
322 
323  # #insertOnUpdate
324  # insert new object or update object if exists in the table
325  # @param tasks - list of objects (not mandatory the same type) for updating
326  # @param clause - (deprecated) sql clause for updating the object in the table
327  # @return list of updated objects
328  # @error can be checked by the getError() method
329  #
330  def insertOnUpdate(self, tasks, clause): # pylint: disable=W0613
331  with lock:
332  self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
333  try:
334 
335  taskList = []
336  if isinstance(tasks, collections.Iterable):
337  taskList = tasks
338  else:
339  taskList.append(tasks)
340 
341  for task in taskList:
342  self.session.merge(task)
343 
344  self.session.commit()
345  # Firstly, we check if exception was thrown by the DBI itself
346  # If so we rollback DB
347  except sqlalchemy.exc.SQLAlchemyError as err:
348  self.session.rollback()
349  self.errorCode = CONSTANTS.DBI_INSERT_ERROR_CODE
350  self.errorMessage = CONSTANTS.DBI_INSERT_ERROR_MSG + ": " + str(err)
351  raise DBIErr(self.getErrorCode(), self.getErrorMsg())
352  except Exception as err:
353  self.errorCode = CONSTANTS.DBI_INSERT_ERROR_CODE
354  self.errorMessage = CONSTANTS.DBI_INSERT_ERROR_MSG + ": " + str(err)
355  raise DBIErr(self.getErrorCode(), self.getErrorMsg())
356 
357  return tasks
358 
359 
360  # #delete
361  # delete objects from the database
362  # @param obj- list of objects (not mandatory the same type) for deleting
363  # @param clause - sql clause for deleting the object from the table
364  # @return list of deleted objects
365  # @error can be checked by the getError() method
366  #
367  def delete(self, obj, clause):
368  with lock:
369  self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
370  deleted = []
371  try:
372  deleted_task = self.session.query(type(obj)).filter(text(clause)).first()
373  if deleted_task:
374  deleted.append(deleted_task)
375  self.session.delete(deleted_task)
376  self.session.commit()
377  # Firstly, we check if exception was thrown by the DBI itself
378  # If so we rollback DB
379  except sqlalchemy.exc.SQLAlchemyError as err:
380  self.session.rollback()
381  self.errorCode = CONSTANTS.DBI_DELETE_ERROR_CODE
382  self.errorMessage = CONSTANTS.DBI_DELETE_ERROR_MSG + ": " + str(err)
383  raise DBIErr(self.getErrorCode(), self.getErrorMsg())
384  except Exception as err:
385  self.errorCode = CONSTANTS.DBI_DELETE_ERROR_CODE
386  self.errorMessage = CONSTANTS.DBI_DELETE_ERROR_MSG + ": " + str(err)
387  raise DBIErr(self.getErrorCode(), self.getErrorMsg())
388 
389  return deleted
390 
391 
392 # # #deleteAll
393 # # delete all objects from the database
394 # # @param tasks - object type for deleting
395 # # @return list of deleted objects
396 # # @error can be checked by the getError() method
397 # #
398 # def deleteAll(self, tasks):
399 # with lock:
400 # deleted = []
401 # self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
402 # try:
403 # conn = self.engine.connect()
404 # if hasattr(tasks, "__len__"):
405 # for task in tasks:
406 # conn.execute(type(task).delete())
407 # else:
408 # # conn.execute(type(tasks).delete())
409 # # self.session.query(delete(tasks))
410 # # self.session.delete(tasks)
411 # ress = self.session.query(tasks).filter(tasks.UThreads == "").first()
412 # # ress = conn.execute(select([type(tasks)])).fetchall()
413 # for res in ress:
414 # self.session.delete(res)
415 # self.session.commit()
416 # # Firstly, we check if exception was thrown by the DBI itself
417 # # If so we rollback DB
418 # except sqlalchemy.exc.SQLAlchemyError as err:
419 # self.session.rollback()
420 # self.errorCode = CONSTANTS.DBI_DELETE_ERROR_CODE
421 # self.errorMessage = CONSTANTS.DBI_DELETE_ERROR_MSG + ": " + err.message
422 # raise DBIErr(self.getErrorCode(), self.getErrorMsg())
423 # except Exception as err:
424 # self.errorCode = CONSTANTS.DBI_DELETE_ERROR_CODE
425 # self.errorMessage = CONSTANTS.DBI_DELETE_ERROR_MSG + ": " + err.message
426 # raise DBIErr(self.getErrorCode(), self.getErrorMsg())
427 # return deleted
428 
429 
430  # delete all objects from the database
431  # @param tasks - object type for deleting
432  # @return list of deleted objects
433  # @error can be checked by the getError() method
434  #
435  def deleteAll(self, tasks):
436  with lock:
437  deleted = []
438  self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
439  connection = None
440  try:
441  connection = self.engine.connect()
442 
443  taskList = []
444  if isinstance(tasks, collections.Iterable):
445  taskList = tasks
446  else:
447  taskList.append(tasks)
448 
449  # boolean flag exist faults
450  hasFaults = False
451  for task in taskList:
452  try:
453  connection.execute(type(task).delete())
454  except sqlalchemy.exc.SQLAlchemyError, err:
455  self.session.rollback()
456  self.errorMessage = CONSTANTS.DBI_DELETE_ERROR_MSG + ": " + str(err)
457  hasFaults = True
458  except Exception, err:
459  self.errorMessage = CONSTANTS.DBI_DELETE_ERROR_MSG + ": " + str(err)
460  hasFaults = True
461 
462  if hasFaults:
463  self.errorCode = CONSTANTS.DBI_DELETE_ERROR_CODE
464  raise DBIErr(self.getErrorCode(), self.getErrorMsg())
465 
466  except DBIErr, e:
467  raise e
468  except Exception, e:
469  raise DBIErr(CONSTANTS.DBI_DELETE_ERROR_CODE, str(e))
470  finally:
471  if connection is not None:
472  connection.close()
473 
474  return deleted
475 
476 
477  # #sqlCustom
478  # sqlCustom method makes SQL Custom request and returns result
479  # @param clause - sql clause for deleting the object from the table
480  # @return list of result objects
481  #
482  def sqlCustom(self, clause):
483  with lock:
484  self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
485  customResponse = []
486  try:
487  sqlText = text(clause)
488  result = self.session.execute(sqlText)
489  if result is not None:
490  customResponse = result.fetchall()
491  self.session.commit()
492  # Firstly, we check if exception was thrown by the DBI itself
493  # If so we rollback DB
494  except sqlalchemy.exc.SQLAlchemyError as err:
495  self.session.rollback()
496  self.errorCode = CONSTANTS.DBI_SQL_ERROR_CODE
497  self.errorMessage = CONSTANTS.DBI_SQL_ERROR_MSG + ": " + err.message
498  raise DBIErr(self.getErrorCode(), self.getErrorMsg())
499  except Exception as err:
500  self.errorCode = CONSTANTS.DBI_SQL_ERROR_CODE
501  self.errorMessage = CONSTANTS.DBI_SQL_ERROR_MSG + ": " + err.message
502  raise DBIErr(self.getErrorCode(), self.getErrorMsg())
503 
504  return customResponse
505 
506 
507  # #makeQueryFromObj
508  # makeQueryFromObj static method creates and returns sqlalchemy query, based on incoming object
509  # @param obj incoming object, which represent table schema
510  # @param engine incoming db engine
511  # @param session incoming sqlalchemy session
512  # @return instance of sqlalchemy query
513  #
514  @staticmethod
515  def makeQueryFromObj(obj, engine, session):
516  ret = None
517  criterions = {}
518  insp = reflection.Inspector.from_engine(engine)
519  localColumns = insp.get_columns(obj.__tablename__)
520  for columName in [elem["name"] for elem in localColumns]:
521  if columName in obj.__dict__:
522  criterions[columName] = obj.__dict__[columName]
523  if len(criterions) > 0:
524  ret = session.query(type(obj)).filter_by(**criterions)
525  return ret
526 
527 
528 # #Class is used to inform about error of DBI object
529 #
530 #
531 class DBIErr(Exception):
532 
533  def __init__(self, errCode, message):
534  Exception.__init__(self, message)
535  self.errCode = errCode
errorMessage
Definition: dbi.py:62
engine
Definition: dbi.py:56
def __init__(self, errCode, message)
Definition: dbi.py:533
def insertOnUpdate(self, tasks, clause)
Definition: dbi.py:330
def fetch(self, obj, clause)
Definition: dbi.py:148
list MANDATORY_DBS
Definition: dbi.py:39
def execute(self, commands, nodes)
execute method execute incoming commands on nodes, keepts reult in responses and responsesDicts field...
Definition: NodeManager.py:63
errorCode
Definition: dbi.py:61
def deleteAll(self, tasks)
Definition: dbi.py:435
def makeQueryFromObj(obj, engine, session)
Definition: dbi.py:515
def checkTables(self)
Definition: dbi.py:68
def update(self, obj, clause)
Definition: dbi.py:294
def delete(self, obj, clause)
Definition: dbi.py:367
session
Definition: dbi.py:59
def sqlCustom(self, clause)
Definition: dbi.py:482
def getErrorMsg(self)
Definition: dbi.py:95
def insert(self, tasks)
Definition: dbi.py:106
def getErrorCode(self)
Definition: dbi.py:86
def sql(self, obj, clause)
Definition: dbi.py:179
def fetchAll(self, tasks)
Definition: dbi.py:242
def __init__(self, config_dic)
Definition: dbi.py:47