HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
ftests_db_in_memory.py
Go to the documentation of this file.
1 """
2 HCE project, Python bindings, Distributed Tasks Manager application.
3 In-memory SQLite functional test for the DBI database wrapper.
4 
5 @package: dtm
6 @file ftests_db_in_memory.py
7 @author Oleksii <developers.hce@gmail.com>
8 @link: http://hierarchical-cluster-engine.com/
9 @copyright: Copyright &copy; 2013-2014 IOIX Ukraine
10 @license: http://hierarchical-cluster-engine.com/license/
11 @since: 0.1
12 """
13 
14 
15 from sqlalchemy.sql import select
16 from sqlalchemy import delete
17 import sqlalchemy
18 from sqlalchemy.engine.reflection import Inspector
19 from sqlalchemy import create_engine, MetaData, Table
20 from sqlalchemy import schema, types
21 from sqlalchemy.pool import StaticPool
22 from sqlalchemy.pool import SingletonThreadPool
23 from sqlalchemy.orm import sessionmaker
24 
25 
26 
class DBI(object):
    """Common-use wrapper around a SQLAlchemy ORM session with a CRUD interface.

    Every operation resets the error state on entry.  The operations that
    trap exceptions roll the session back and record a numeric error code
    plus a message, readable through getErrorCode() and getErrorMsg().

    All database access goes through ``self.session`` / ``self.engine``; the
    class does not depend on any module-level instance.
    """

    def __init__(self, config_dic):
        """Create the engine, the schema and the working session.

        @param config_dic: dict that must contain the key "db_name"
          (a missing key raises KeyError).  Examples of database names:
            sqlite://               - in memory
            sqlite:///:memory:      - in memory
            sqlite:///del.db        - relative path to file
            sqlite:////tmp/del.db   - absolute path to file
        """
        # NOTE(review): the configured name is kept for reference only; the
        # engine below is hard-wired to in-memory SQLite, as it was before.
        # Presumably intentional for this in-memory test -- confirm.
        self.db_name = config_dic["db_name"]

        # The engine is stored on the instance so fetchAll() can open
        # connections without reaching for a global object.
        self.engine = create_engine('sqlite:///:memory:', echo=True)
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

        self.errorCode = 0
        self.errorMessage = "Ok"

    def _resetError(self):
        """Mark the operation in progress as successful so far."""
        self.errorCode = 0
        self.errorMessage = "Ok"

    def _fail(self, code, message):
        """Roll the session back, then record the error code and message."""
        self.session.rollback()
        self.errorCode = code
        self.errorMessage = message

    @staticmethod
    def _asSequence(tasks):
        """Return tasks itself if it has __len__, else wrap it in a list."""
        return tasks if hasattr(tasks, "__len__") else [tasks]

    def getErrorCode(self):
        """Return the error code of the last performed action (0 on success)."""
        return self.errorCode

    def getErrorMsg(self):
        """Return the error message of the last performed action."""
        return self.errorMessage

    def insert(self, task):
        """Insert an object into the database.

        @param task: mapped object to add to the session and commit
        @return: the same object that was passed in
        Exceptions raised by the commit propagate to the caller; the session
        is rolled back first so it stays usable afterwards.
        """
        self._resetError()
        self.session.add(task)
        try:
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise
        return task

    def fetch(self, obj, clause):
        """Fetch the first object matching a filter clause.

        @param obj: instance whose type selects the mapped class to query
        @param clause: filter expression handed to Query.filter()
        @return: one-element list holding the first match (None when nothing
          matches); empty list on error (error code 1)
        """
        self._resetError()
        fetched = []
        try:
            fetched.append(self.session.query(type(obj)).filter(clause).first())
        except Exception:
            self._fail(1, "Fetch error")
        return fetched

    def sql(self, obj, clause):
        """Fetch the first object produced by a custom statement.

        @param obj: instance whose type selects the mapped class to query
        @param clause: statement handed to Query.from_statement()
        @return: one-element list holding the first result; empty list on
          error (error code 2)
        """
        self._resetError()
        fetched = []
        try:
            fetched.append(
                self.session.query(type(obj)).from_statement(clause).first())
        except Exception:
            self._fail(2, "sql error")
        return fetched

    def fetchAll(self, tasks):
        """Fetch all rows for the type of each given object.

        @param tasks: a single object, or a sized collection of objects
        @return: list with one fetchall() row list per given object; on
          error (error code 3) the rows collected so far
        """
        self._resetError()
        fetched = []
        conn = None
        try:
            conn = self.engine.connect()
            for task in self._asSequence(tasks):
                fetched.append(conn.execute(select([type(task)])).fetchall())
        except Exception:
            self._fail(3, "fetch all error")
        finally:
            # Close the connection opened above on every path.
            if conn is not None:
                conn.close()
        return fetched

    def update(self, obj, clause):
        """Update the first object matching a filter clause.

        Every public attribute of obj (name not starting with "_") that has a
        truthy value is copied onto the matched row, then the session is
        committed.

        @param obj: instance carrying the new values; its type selects the
          mapped class to query
        @param clause: filter expression handed to Query.filter()
        @return: one-element list holding the updated object; empty list when
          nothing matched or on error (error code 4)
        """
        self._resetError()
        updated = []
        try:
            updated_task = self.session.query(type(obj)).filter(clause).first()
            if updated_task is not None:
                for attr in dir(obj):
                    if attr.startswith('_'):
                        continue
                    value = getattr(obj, attr)
                    if value:
                        setattr(updated_task, attr, value)
                updated.append(updated_task)
            self.session.commit()
        except Exception:
            self._fail(4, "update error")
        return updated

    def insertOnUpdate(self, tasks, clause):
        """Insert new objects, or update them if they already exist.

        @param tasks: a single object, or a sized collection of objects, each
          handed to Session.merge()
        @param clause: not used; kept for interface compatibility
        @return: the tasks argument, unchanged (error code 5 on error)
        """
        self._resetError()
        try:
            for task in self._asSequence(tasks):
                self.session.merge(task)
            self.session.commit()
        except Exception:
            self._fail(5, "insert on update error")
        return tasks

    def delete(self, obj, clause):
        """Delete the first object matching a filter clause.

        @param obj: instance whose type selects the mapped class to query
        @param clause: filter expression handed to Query.filter()
        @return: one-element list holding the deleted object; empty list when
          nothing matched or on error (error code 6)
        """
        self._resetError()
        deleted = []
        try:
            deleted_task = self.session.query(type(obj)).filter(clause).first()
            if deleted_task is not None:
                deleted.append(deleted_task)
                self.session.delete(deleted_task)
            self.session.commit()
        except Exception:
            self._fail(6, "delete error")
        return deleted

    def deleteAll(self, tasks):
        """Delete all objects of the given mapped type(s) from the database.

        @param tasks: a mapped class, a mapped instance, or a sized
          collection of either; every row of each referenced class is removed
        @return: an empty list, as before; kept for interface compatibility
          (error code 7 on error)
        """
        self._resetError()
        deleted = []
        try:
            for task in self._asSequence(tasks):
                # Accept either the mapped class itself or one of its
                # instances.
                model = task if isinstance(task, type) else type(task)
                self.session.query(model).delete()
            self.session.commit()
        except Exception:
            self._fail(7, "delete all error")
        return deleted
259 
260 
261 
262 
263 
264 
265 
266 from sqlalchemy.ext.declarative import declarative_base
267 Base = declarative_base()
268 from sqlalchemy import Column, Integer, String
269 
270 
273  __tablename__ = "tasks_data_table"
274  tableid = Column(Integer, primary_key=True)
275  id = Column(Integer, unique=True, index=True)
276  data = Column(String, unique=False, index=False)
277 
278 
279 
280 
281 
282 
283 
if __name__ == '__main__':
    # Smoke test: create the wrapper, insert one row, then list the table
    # ordered by id.
    conf = {'db_name': 'memory'}
    db = DBI(conf)
    # The row object has to exist before its fields are assigned.
    tdt = TasksDataTable()
    tdt.id = 11
    db.insert(tdt)
    for instance in db.session.query(TasksDataTable).order_by(TasksDataTable.id):
        # Single-argument form prints the same text under Python 2 and 3.
        print("%s %s" % (instance.id, instance.data))
def insertOnUpdate(self, tasks, clause)
insertOnUpdate insert new object or update object if exists in the table
def insert(self, task)
insert insert objects into the database
def sql(self, obj, clause)
sql fetch objects from the database
def getErrorCode(self)
getErrorCode: return the error code of the last performed action.
def __init__(self, config_dic)
constructor initialize fields
def deleteAll(self, tasks)
deleteAll delete all objects from the database
Class describes structures of task item used in TaskDataManager module.
Common use wrapper class to interact with the ORM databases Provide CRUD interface (create...
def update(self, obj, clause)
update update objects in the database
def getErrorMsg(self)
getErrorMsg: return the error message of the last performed action.
def fetch(self, obj, clause)
fetch fetch objects from the database
def delete(self, obj, clause)
delete delete objects from the database
def fetchAll(self, tasks)
fetchAll fetch all objects from the database