HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
dbi.dbi.DBI Class Reference
Inheritance diagram for dbi.dbi.DBI:
Collaboration diagram for dbi.dbi.DBI:

Public Member Functions

def __init__ (self, config_dic)
 
def checkTables (self)
 
def getErrorCode (self)
 
def getErrorMsg (self)
 
def insert (self, tasks)
 
def fetch (self, obj, clause)
 
def sql (self, obj, clause)
 
def fetchAll (self, tasks)
 
def update (self, obj, clause)
 
def insertOnUpdate (self, tasks, clause)
 
def delete (self, obj, clause)
 
def deleteAll (self, tasks)
 
def sqlCustom (self, clause)
 

Static Public Member Functions

def makeQueryFromObj (obj, engine, session)
 

Public Attributes

 engine
 
 session
 
 errorCode
 
 errorMessage
 

Static Public Attributes

list MANDATORY_DBS
 

Detailed Description

Definition at line 37 of file dbi.py.

Constructor & Destructor Documentation

◆ __init__()

def dbi.dbi.DBI.__init__ (   self,
  config_dic 
)

Definition at line 47 of file dbi.py.

def __init__(self, config_dic):
    """Connect to the MySQL database described by ``config_dic``.

    Reads the "db_user", "db_pass", "db_host" and "db_name" entries of
    ``config_dic`` (plus "db_port" when present), assembles a
    ``mysql+mysqldb://`` engine URL from them, creates the engine, checks
    the mandatory tables via ``checkTables()`` and opens a session bound to
    the engine.  The error state is then initialised to the success values.

    @param config_dic - mapping holding the connection settings
    """
    credentials = (config_dic["db_user"], config_dic["db_pass"], config_dic["db_host"])
    url = "mysql+mysqldb://%s:%s@%s" % credentials
    if "db_port" in config_dic:
        url = url + ':' + str(config_dic["db_port"])
    url = url + '/' + config_dic["db_name"]

    self.engine = create_engine(url)
    # Validate the schema before any session is handed out.
    self.checkTables()
    self.session = sessionmaker(bind=self.engine)()

    self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
    self.errorMessage = CONSTANTS.DBI_SUCCESS_MSG
63 
64 
def __init__(self)
constructor
Definition: UIDGenerator.py:19

Member Function Documentation

◆ checkTables()

def dbi.dbi.DBI.checkTables (   self)

Definition at line 68 of file dbi.py.

def checkTables(self):
    """Verify that every table named in ``MANDATORY_DBS`` exists.

    The table names are read from the database behind ``self.engine``.
    Any failure (inspection error or missing table) is recorded in
    ``errorCode``/``errorMessage`` using the common error constants.

    @raise DBIErr when the inspection fails or a mandatory table is absent
    """
    try:
        insp = reflection.Inspector.from_engine(self.engine)
        avlTables = insp.get_table_names()
        for dbTable in self.MANDATORY_DBS:
            if dbTable not in avlTables:
                raise Exception(">>> %s table not present in database" % dbTable)
    except Exception as err:
        self.errorCode = CONSTANTS.DBI_COMMON_ERROR_CODE
        # str(err) instead of err.message: ``message`` is deprecated, is only
        # filled for single-argument exceptions and does not exist on
        # Python 3; the other methods of this class already use str(err).
        self.errorMessage = CONSTANTS.DBI_COMMON_ERROR_MSG + ": " + str(err)
        raise DBIErr(self.getErrorCode(), self.getErrorMsg())
79 
80 
Here is the call graph for this function:

◆ delete()

def dbi.dbi.DBI.delete (   self,
  obj,
  clause 
)

Definition at line 367 of file dbi.py.

def delete(self, obj, clause):
    """Delete the first row of ``type(obj)`` that matches ``clause``.

    @param obj - instance whose type selects the mapped class to query
    @param clause - textual filter expression passed through ``text()``
    @return list holding the deleted object, or an empty list when no row
            matched
    @raise DBIErr on any failure; the session is rolled back first when the
           failure is a SQLAlchemy error
    """
    with lock:
        self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
        removed = []
        try:
            query = self.session.query(type(obj)).filter(text(clause))
            victim = query.first()
            if victim:
                removed.append(victim)
                self.session.delete(victim)
                self.session.commit()
        except sqlalchemy.exc.SQLAlchemyError as err:
            # Database-level failure: undo pending work before reporting it.
            self.session.rollback()
            self.errorCode = CONSTANTS.DBI_DELETE_ERROR_CODE
            self.errorMessage = CONSTANTS.DBI_DELETE_ERROR_MSG + ": " + str(err)
            raise DBIErr(self.getErrorCode(), self.getErrorMsg())
        except Exception as err:
            self.errorCode = CONSTANTS.DBI_DELETE_ERROR_CODE
            self.errorMessage = CONSTANTS.DBI_DELETE_ERROR_MSG + ": " + str(err)
            raise DBIErr(self.getErrorCode(), self.getErrorMsg())

        return removed
390 
391 
392 # # #deleteAll
393 # # delete all objects from the database
394 # # @param tasks - object type for deleting
395 # # @return list of deleted objects
396 # # @error can be checked by the getError() method
397 # #
398 # def deleteAll(self, tasks):
399 # with lock:
400 # deleted = []
401 # self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
402 # try:
403 # conn = self.engine.connect()
404 # if hasattr(tasks, "__len__"):
405 # for task in tasks:
406 # conn.execute(type(task).delete())
407 # else:
408 # # conn.execute(type(tasks).delete())
409 # # self.session.query(delete(tasks))
410 # # self.session.delete(tasks)
411 # ress = self.session.query(tasks).filter(tasks.UThreads == "").first()
412 # # ress = conn.execute(select([type(tasks)])).fetchall()
413 # for res in ress:
414 # self.session.delete(res)
415 # self.session.commit()
416 # # Firstly, we check if exception was thrown by the DBI itself
417 # # If so we rollback DB
418 # except sqlalchemy.exc.SQLAlchemyError as err:
419 # self.session.rollback()
420 # self.errorCode = CONSTANTS.DBI_DELETE_ERROR_CODE
421 # self.errorMessage = CONSTANTS.DBI_DELETE_ERROR_MSG + ": " + err.message
422 # raise DBIErr(self.getErrorCode(), self.getErrorMsg())
423 # except Exception as err:
424 # self.errorCode = CONSTANTS.DBI_DELETE_ERROR_CODE
425 # self.errorMessage = CONSTANTS.DBI_DELETE_ERROR_MSG + ": " + err.message
426 # raise DBIErr(self.getErrorCode(), self.getErrorMsg())
427 # return deleted
428 
429 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ deleteAll()

def dbi.dbi.DBI.deleteAll (   self,
  tasks 
)

Definition at line 435 of file dbi.py.

def deleteAll(self, tasks):
    """Issue a table-wide delete for the type of every given task.

    ``tasks`` may be a single object or an iterable of objects.  Each
    delete is attempted independently on a dedicated connection; failures
    are remembered (the last message wins) and reported once all tasks
    have been processed.

    @param tasks - object, or iterable of objects, whose types are deleted
    @return list of deleted objects (nothing is appended to it here, so it
            is always empty)
    @raise DBIErr when the connection cannot be opened or any delete fails
    """
    with lock:
        deleted = []
        self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
        connection = None
        try:
            connection = self.engine.connect()

            if isinstance(tasks, collections.Iterable):
                taskList = tasks
            else:
                taskList = [tasks]

            # Set as soon as one task fails; the loop still tries the rest.
            hasFaults = False
            for task in taskList:
                try:
                    # NOTE(review): relies on type(task) exposing a delete()
                    # construct - confirm for the mapped classes in use.
                    connection.execute(type(task).delete())
                except sqlalchemy.exc.SQLAlchemyError as err:
                    self.session.rollback()
                    self.errorMessage = CONSTANTS.DBI_DELETE_ERROR_MSG + ": " + str(err)
                    hasFaults = True
                except Exception as err:
                    self.errorMessage = CONSTANTS.DBI_DELETE_ERROR_MSG + ": " + str(err)
                    hasFaults = True

            if hasFaults:
                self.errorCode = CONSTANTS.DBI_DELETE_ERROR_CODE
                raise DBIErr(self.getErrorCode(), self.getErrorMsg())

        except DBIErr:
            # Error state is already recorded; a bare raise keeps the traceback.
            raise
        except Exception as err:
            # Record the failure so getErrorCode()/getErrorMsg() agree with
            # the exception that is raised (e.g. when connect() fails).
            self.errorCode = CONSTANTS.DBI_DELETE_ERROR_CODE
            self.errorMessage = CONSTANTS.DBI_DELETE_ERROR_MSG + ": " + str(err)
            raise DBIErr(CONSTANTS.DBI_DELETE_ERROR_CODE, str(err))
        finally:
            if connection is not None:
                connection.close()

        return deleted
475 
476 
Here is the call graph for this function:

◆ fetch()

def dbi.dbi.DBI.fetch (   self,
  obj,
  clause 
)

Definition at line 148 of file dbi.py.

def fetch(self, obj, clause):
    """Fetch all rows of ``type(obj)`` that satisfy ``clause``.

    @param obj - instance whose type selects the mapped class to query
    @param clause - textual filter expression passed through ``text()``
    @return list of matching objects (empty when nothing matched)
    @raise DBIErr on any failure; the session is rolled back first when the
           failure is a SQLAlchemy error
    """
    with lock:
        self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
        found = []
        try:
            query = self.session.query(type(obj)).filter(text(clause))
            matches = query.all()
            if matches:
                found = matches
        except sqlalchemy.exc.SQLAlchemyError as err:
            # Database-level failure: reset the session before reporting it.
            self.session.rollback()
            self.errorCode = CONSTANTS.DBI_FETCH_ERROR_CODE
            self.errorMessage = CONSTANTS.DBI_FETCH_ERROR_MSG + ": " + str(err)
            raise DBIErr(self.getErrorCode(), self.getErrorMsg())
        except Exception as err:
            self.errorCode = CONSTANTS.DBI_FETCH_ERROR_CODE
            self.errorMessage = CONSTANTS.DBI_FETCH_ERROR_MSG + ": " + str(err)
            raise DBIErr(self.getErrorCode(), self.getErrorMsg())

        return found
169 
170 
Here is the call graph for this function:

◆ fetchAll()

def dbi.dbi.DBI.fetchAll (   self,
  tasks 
)

Definition at line 242 of file dbi.py.

def fetchAll(self, tasks):
    """Select every row for the type of each given task.

    ``tasks`` may be a single object or an iterable of objects.  Each
    select runs independently on a dedicated connection; failures are
    remembered (the last message wins) and reported once all tasks have
    been processed.

    @param tasks - object, or iterable of objects, whose types are selected
    @return list with one ``fetchall()`` result per successfully queried task
    @raise DBIErr when the connection cannot be opened or any select fails
    """
    with lock:
        fetched = []
        self.errorCode = CONSTANTS.DBI_SUCCESS_CODE

        connection = None
        try:
            connection = self.engine.connect()

            if isinstance(tasks, collections.Iterable):
                taskList = tasks
            else:
                taskList = [tasks]

            # Set as soon as one task fails; the loop still tries the rest.
            hasFaults = False
            for task in taskList:
                try:
                    fetched.append(connection.execute(select([type(task)])).fetchall())
                except sqlalchemy.exc.SQLAlchemyError as err:
                    self.session.rollback()
                    self.errorMessage = CONSTANTS.DBI_FETCH_ERROR_MSG + ": " + str(err)
                    hasFaults = True
                except Exception as err:
                    self.errorMessage = CONSTANTS.DBI_FETCH_ERROR_MSG + ": " + str(err)
                    hasFaults = True

            if hasFaults:
                self.errorCode = CONSTANTS.DBI_FETCH_ERROR_CODE
                raise DBIErr(self.getErrorCode(), self.getErrorMsg())

        except DBIErr:
            # Error state is already recorded; a bare raise keeps the traceback.
            raise
        except Exception as err:
            # Record the failure so getErrorCode()/getErrorMsg() agree with
            # the exception that is raised (e.g. when connect() fails).
            self.errorCode = CONSTANTS.DBI_FETCH_ERROR_CODE
            self.errorMessage = CONSTANTS.DBI_FETCH_ERROR_MSG + ": " + str(err)
            raise DBIErr(CONSTANTS.DBI_FETCH_ERROR_CODE, str(err))
        finally:
            if connection is not None:
                connection.close()

        return fetched
284 
285 
Here is the call graph for this function:

◆ getErrorCode()

def dbi.dbi.DBI.getErrorCode (   self)

Definition at line 86 of file dbi.py.

def getErrorCode(self):
    """Return the current value of ``self.errorCode``."""
    code = self.errorCode
    return code
88 
89 
Here is the caller graph for this function:

◆ getErrorMsg()

def dbi.dbi.DBI.getErrorMsg (   self)

Definition at line 95 of file dbi.py.

def getErrorMsg(self):
    """Return the current value of ``self.errorMessage``."""
    message = self.errorMessage
    return message
97 
98 
Here is the caller graph for this function:

◆ insert()

def dbi.dbi.DBI.insert (   self,
  tasks 
)

Definition at line 106 of file dbi.py.

def insert(self, tasks):
    """Add each given task to the session and commit it individually.

    ``tasks`` may be a single object or an iterable of objects.  A failure
    on one task does not stop the remaining ones; failures are remembered
    (the last message wins) and reported once all tasks have been tried.

    @param tasks - object, or iterable of objects, to insert
    @return the ``tasks`` argument, unchanged
    @raise DBIErr when at least one insert failed
    """
    with lock:
        self.errorCode = CONSTANTS.DBI_SUCCESS_CODE

        if isinstance(tasks, collections.Iterable):
            taskList = tasks
        else:
            taskList = [tasks]

        # Set as soon as one task fails; the loop still tries the rest.
        hasFaults = False

        for task in taskList:
            try:
                self.session.add(task)
                self.session.commit()
            except sqlalchemy.exc.SQLAlchemyError as err:
                logger.info("tasks: %s, type: %s", str(tasks), str(type(tasks)))
                self.errorMessage = CONSTANTS.DBI_INSERT_ERROR_MSG + ": " + str(err)
                # Database-level failure: discard the failed unit of work.
                self.session.rollback()
                hasFaults = True
            except Exception as err:
                logger.info("tasks: %s, type: %s", str(tasks), str(type(tasks)))
                self.errorMessage = CONSTANTS.DBI_INSERT_ERROR_MSG + ": " + str(err)
                hasFaults = True

        if hasFaults:
            self.errorCode = CONSTANTS.DBI_INSERT_ERROR_CODE
            raise DBIErr(self.getErrorCode(), self.getErrorMsg())

        return tasks
138 
139 
Here is the call graph for this function:
Here is the caller graph for this function:

◆ insertOnUpdate()

def dbi.dbi.DBI.insertOnUpdate (   self,
  tasks,
  clause 
)

Definition at line 330 of file dbi.py.

def insertOnUpdate(self, tasks, clause):  # pylint: disable=W0613
    """Merge each given task into the session and commit once at the end.

    ``tasks`` may be a single object or an iterable of objects.  The
    ``clause`` argument is accepted but not used.

    @param tasks - object, or iterable of objects, to merge
    @param clause - unused
    @return the ``tasks`` argument, unchanged
    @raise DBIErr on any failure; the session is rolled back first when the
           failure is a SQLAlchemy error
    """
    with lock:
        self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
        try:
            pending = tasks if isinstance(tasks, collections.Iterable) else [tasks]

            for item in pending:
                self.session.merge(item)

            self.session.commit()
        except sqlalchemy.exc.SQLAlchemyError as err:
            # Database-level failure: undo the merges before reporting it.
            self.session.rollback()
            self.errorCode = CONSTANTS.DBI_INSERT_ERROR_CODE
            self.errorMessage = CONSTANTS.DBI_INSERT_ERROR_MSG + ": " + str(err)
            raise DBIErr(self.getErrorCode(), self.getErrorMsg())
        except Exception as err:
            self.errorCode = CONSTANTS.DBI_INSERT_ERROR_CODE
            self.errorMessage = CONSTANTS.DBI_INSERT_ERROR_MSG + ": " + str(err)
            raise DBIErr(self.getErrorCode(), self.getErrorMsg())

        return tasks
358 
359 
Here is the call graph for this function:

◆ makeQueryFromObj()

def dbi.dbi.DBI.makeQueryFromObj (   obj,
  engine,
  session 
)
static

Definition at line 515 of file dbi.py.

def makeQueryFromObj(obj, engine, session):
    """Build a ``filter_by`` query from the column values set on ``obj``.

    The column names of ``obj.__tablename__`` are read from the database
    behind ``engine``; every column that has an entry in ``obj.__dict__``
    becomes an equality criterion.

    @param obj - object providing ``__tablename__`` and the criterion values
    @param engine - engine used to inspect the table's columns
    @param session - session on which the query is created
    @return the query, or None when ``obj`` carries no column values
    """
    inspector = reflection.Inspector.from_engine(engine)
    columnNames = [column["name"] for column in inspector.get_columns(obj.__tablename__)]
    values = obj.__dict__
    criteria = dict((name, values[name]) for name in columnNames if name in values)
    if not criteria:
        return None
    return session.query(type(obj)).filter_by(**criteria)
526 
527 
528 # #Class is used to inform about error of DBI object
529 #
530 #

◆ sql()

def dbi.dbi.DBI.sql (   self,
  obj,
  clause 
)

Definition at line 179 of file dbi.py.

def sql(self, obj, clause):
    """Run the raw statement ``clause`` and map its rows onto ``type(obj)``.

    @param obj - instance whose type selects the mapped class for the rows
    @param clause - complete SQL statement passed through ``text()``
    @return list of mapped objects (empty when the statement yields no rows)
    @raise DBIErr on any failure, with the clause appended to the message;
           the session is rolled back first when the failure is a
           SQLAlchemy error
    """
    with lock:
        self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
        found = []
        try:
            statement = text(clause)
            matches = self.session.query(type(obj)).from_statement(statement).all()
            if matches:
                found = matches
        except sqlalchemy.exc.SQLAlchemyError as err:
            # Database-level failure: reset the session before reporting it.
            self.session.rollback()
            self.errorCode = CONSTANTS.DBI_SQL_ERROR_CODE
            self.errorMessage = CONSTANTS.DBI_SQL_ERROR_MSG + ": " + str(err) + ", clause:\n" + str(clause)
            raise DBIErr(self.getErrorCode(), self.getErrorMsg())
        except Exception as err:
            self.errorCode = CONSTANTS.DBI_SQL_ERROR_CODE
            self.errorMessage = CONSTANTS.DBI_SQL_ERROR_MSG + ": " + str(err) + ", clause:\n" + str(clause)
            raise DBIErr(self.getErrorCode(), self.getErrorMsg())

        return found
200 
201 
202 # # #fetchAll
203 # # fetch all objects from the database
204 # #
205 # # @param tasks - object type for fetching
206 # # @return list of fetched objects
207 # # @error can be checked by the getError() method
208 # #
209 # def fetchAll(self, tasks):
210 # with lock:
211 # self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
212 # fetched = []
213 #
214 #
215 # try:
216 # conn = self.engine.connect()
217 # if hasattr(tasks, "__len__"):
218 # for task in tasks:
219 # fetched.append(conn.execute(select([type(task)])).fetchall())
220 # else:
221 # fetched.append(conn.execute(select([type(tasks)])).fetchall())
222 # # Firstly, we check if exception was thrown by the DBI itself
223 # # If so we rollback DB
224 # except sqlalchemy.exc.SQLAlchemyError as err:
225 # self.session.rollback()
226 # self.errorCode = CONSTANTS.DBI_FETCH_ERROR_CODE
227 # self.errorMessage = CONSTANTS.DBI_FETCH_ERROR_MSG + ": " + err.message
228 # raise DBIErr(self.getErrorCode(), self.getErrorMsg())
229 # except Exception as err:
230 # self.errorCode = CONSTANTS.DBI_FETCH_ERROR_CODE
231 # self.errorMessage = CONSTANTS.DBI_FETCH_ERROR_MSG + ": " + err.message
232 # raise DBIErr(self.getErrorCode(), self.getErrorMsg())
233 # return fetched
234 
235 
Here is the call graph for this function:

◆ sqlCustom()

def dbi.dbi.DBI.sqlCustom (   self,
  clause 
)

Definition at line 482 of file dbi.py.

def sqlCustom(self, clause):
    """Execute the raw statement ``clause`` on the session and commit.

    @param clause - complete SQL statement passed through ``text()``
    @return list of rows produced by the statement; an empty list when the
            statement returns no rows
    @raise DBIErr on any failure; the session is rolled back first when the
           failure is a SQLAlchemy error
    """
    with lock:
        self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
        customResponse = []
        try:
            result = self.session.execute(text(clause))
            # Only row-returning statements can be fetched.  Calling
            # fetchall() on the result of an INSERT/UPDATE/DELETE raises a
            # SQLAlchemy error, which would roll the statement back before
            # the commit below is reached.
            if result is not None and result.returns_rows:
                customResponse = result.fetchall()
            self.session.commit()
        except sqlalchemy.exc.SQLAlchemyError as err:
            self.session.rollback()
            self.errorCode = CONSTANTS.DBI_SQL_ERROR_CODE
            # str(err) instead of err.message: ``message`` is deprecated, may
            # be empty and does not exist on Python 3.
            self.errorMessage = CONSTANTS.DBI_SQL_ERROR_MSG + ": " + str(err)
            raise DBIErr(self.getErrorCode(), self.getErrorMsg())
        except Exception as err:
            self.errorCode = CONSTANTS.DBI_SQL_ERROR_CODE
            self.errorMessage = CONSTANTS.DBI_SQL_ERROR_MSG + ": " + str(err)
            raise DBIErr(self.getErrorCode(), self.getErrorMsg())

        return customResponse
505 
506 
def execute(self, commands, nodes)
The execute method runs the incoming commands on the nodes and keeps the result in the responses and responsesDicts fields...
Definition: NodeManager.py:63
Here is the call graph for this function:

◆ update()

def dbi.dbi.DBI.update (   self,
  obj,
  clause 
)

Definition at line 294 of file dbi.py.

def update(self, obj, clause):
    """Copy the set attributes of ``obj`` onto the first row matching ``clause``.

    Every attribute of ``obj`` whose name does not start with an underscore
    and whose value is not None is assigned to the matched row, which is
    then committed.

    @param obj - instance supplying the mapped class and the new values
    @param clause - textual filter expression passed through ``text()``
    @return list holding the updated object, or an empty list when no row
            matched
    @raise DBIErr on any failure; the session is rolled back first when the
           failure is a SQLAlchemy error
    """
    with lock:
        self.errorCode = CONSTANTS.DBI_SUCCESS_CODE
        changed = []
        try:
            target = self.session.query(type(obj)).filter(text(clause)).first()
            if target:
                # A leading '_' also covers dunder names, so one test suffices.
                names = [name for name in dir(obj)
                         if not name.startswith('_') and getattr(obj, name) is not None]
                for name in names:
                    setattr(target, name, getattr(obj, name))
                changed.append(target)
                self.session.commit()
        except sqlalchemy.exc.SQLAlchemyError as err:
            # Database-level failure: undo the pending changes before reporting.
            self.session.rollback()
            self.errorCode = CONSTANTS.DBI_UPDATE_ERROR_CODE
            self.errorMessage = CONSTANTS.DBI_UPDATE_ERROR_MSG + ": " + str(err)
            raise DBIErr(self.getErrorCode(), self.getErrorMsg())
        except Exception as err:
            self.errorCode = CONSTANTS.DBI_UPDATE_ERROR_CODE
            self.errorMessage = CONSTANTS.DBI_UPDATE_ERROR_MSG + ": " + str(err)
            raise DBIErr(self.getErrorCode(), self.getErrorMsg())

        return changed
321 
322 
Here is the call graph for this function:

Member Data Documentation

◆ engine

dbi.dbi.DBI.engine

Definition at line 56 of file dbi.py.

◆ errorCode

dbi.dbi.DBI.errorCode

Definition at line 61 of file dbi.py.

◆ errorMessage

dbi.dbi.DBI.errorMessage

Definition at line 62 of file dbi.py.

◆ MANDATORY_DBS

list dbi.dbi.DBI.MANDATORY_DBS
static
Initial value:
= ["ee_responses_table", "resources_table", "scheduler_task_scheme", "task_back_log_scheme",
"task_log_scheme", "tasks_data_table"]

Definition at line 39 of file dbi.py.

◆ session

dbi.dbi.DBI.session

Definition at line 59 of file dbi.py.


The documentation for this class was generated from the following file: