HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
ftests.ftest_DBTasksWrapper.Test Class Reference

Public Member Functions

def setUp (self)
 
def tearDown (self)
 
def urlNew (self, params)
 
def insertNewSiteProperties (self, siteId, params)
 
def test_CustomRequest (self)
 

Public Attributes

 wrapper
 

Static Public Attributes

string CFG_NAME = "./db-task2.ini"
 

Detailed Description

Definition at line 16 of file ftest_DBTasksWrapper.py.

Member Function Documentation

◆ insertNewSiteProperties()

def ftests.ftest_DBTasksWrapper.Test.insertNewSiteProperties (   self,
  siteId,
  params 
)

Definition at line 56 of file ftest_DBTasksWrapper.py.

def insertNewSiteProperties(self, siteId, params):
  if siteId is not None and hasattr(params, '__iter__') and len(params) > 0:
    localSiteUpdate = dc.EventObjects.SiteUpdate(siteId)
    for attr in localSiteUpdate.__dict__:
      if hasattr(localSiteUpdate, attr):
        setattr(localSiteUpdate, attr, None)
    localSiteUpdate.updateType=dc.EventObjects.SiteUpdate.UPDATE_TYPE_APPEND
    localSiteUpdate.id = siteId
    localSiteUpdate.properties = []
    for param in params:
      newPropElem = {}
      newPropElem["siteId"] = param[0]
      newPropElem["name"] = param[1]
      newPropElem["value"] = param[2]
      localSiteUpdate.properties.append(newPropElem)
    self.wrapper.siteNewOrUpdate(localSiteUpdate, stype=dc.EventObjects.SiteUpdate)

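The property list built inside insertNewSiteProperties() can be illustrated in isolation. The following is a minimal sketch in Python 3, assuming each entry of params is a (siteId, name, value) tuple as in test_CustomRequest(); build_site_properties is a hypothetical helper and is not part of the HCE API.

```python
def build_site_properties(params):
    """Turn (siteId, name, value) tuples into property dicts.

    Hypothetical helper: it mirrors only the inner loop of
    insertNewSiteProperties() and does not touch the database wrapper.
    """
    properties = []
    for site_id, name, value in params:
        properties.append({"siteId": site_id, "name": name, "value": value})
    return properties


# Same sample values as the props_params list in test_CustomRequest().
props = build_site_properties([
    ("0b41d5052c7a52f5927ae7114cb288e9", "HTTP_POST_FORM_AAA", "AAA"),
    ("0b41d5052c7a52f5927ae7114cb288e9", "HTTP_POST_FORM_BBB", "BBB"),
])
```

Note that the hasattr(params, '__iter__') guard in the listing behaves differently across interpreter versions: a plain str has no __iter__ attribute in Python 2 but does in Python 3, so a port would probably need an explicit type check to keep rejecting strings.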

◆ setUp()

def ftests.ftest_DBTasksWrapper.Test.setUp (   self)

Definition at line 21 of file ftest_DBTasksWrapper.py.

def setUp(self):
  cfgParser = ConfigParser.ConfigParser()
  cfgParser.read(self.CFG_NAME)
  logging.config.fileConfig(cfgParser.get("TasksManager", "log_cfg"))
  self.wrapper = DBTasksWrapper.DBTasksWrapper(cfgParser)

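To show what setUp() expects from the configuration file, the sketch below reads the same section and option with the Python 3 configparser module (the listing itself uses the Python 2 ConfigParser module). The INI text and the ./log.ini value are placeholders, not the contents of the real ./db-task2.ini, and read_log_cfg is a hypothetical helper.

```python
import configparser

# Placeholder configuration: only the section name "TasksManager" and the
# option name "log_cfg" come from setUp(); the value is an assumption.
SAMPLE_INI = """
[TasksManager]
log_cfg = ./log.ini
"""


def read_log_cfg(ini_text):
    """Return the log_cfg option of the TasksManager section."""
    parser = configparser.ConfigParser()
    parser.read_string(ini_text)
    return parser.get("TasksManager", "log_cfg")


log_cfg_path = read_log_cfg(SAMPLE_INI)
```

ConfigParser.read() skips files it cannot open instead of raising, so a missing configuration file would show up only at the later get() call as a NoSectionError.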

◆ tearDown()

def ftests.ftest_DBTasksWrapper.Test.tearDown (   self)

Definition at line 28 of file ftest_DBTasksWrapper.py.

def tearDown(self):
  pass


◆ test_CustomRequest()

def ftests.ftest_DBTasksWrapper.Test.test_CustomRequest (   self)

Definition at line 74 of file ftest_DBTasksWrapper.py.

def test_CustomRequest(self):
  newObject = dc.EventObjects.Site("http://ibm.com")
  self.wrapper.siteNewOrUpdate(newObject)
  selectStr ="SELECT COUNT(*) FROM urls_0b41d5052c7a52f5927ae7114cb288e9 " + \
             "WHERE NOT (Status=4 AND Crawled=0 AND Processed=0)"
  result = self.wrapper.customRequest(selectStr, "dc_urls")
  if len(result) > 0 and len(result[0]) > 0:
    print result[0][0]

  url_md5 = "0b41d5052c7a52f5927ae7114cb288e9"
  siteId = "0b41d5052c7a52f5927ae7114cb288e9"
  urlUpdateObj = dc.EventObjects.URLUpdate(siteId, url_md5, dc.EventObjects.URLStatus.URL_TYPE_MD5)
  urlUpdateObj.tcDate = SQLExpression("NOW()")
  urlUpdateObj.status = 2
  urlUpdateObj.depth = 32
  self.wrapper.urlUpdate(urlUpdateObj, "`State`=0")

  urlStatusObj = dc.EventObjects.URLStatus(siteId, url_md5)
  result = self.wrapper.urlStatus(urlStatusObj, True)
  if len(result) > 0 and type(result[0]) == dc.EventObjects.URL:
    print "Depth == " + str(result[0].depth)

  params = []
  params.append((siteId, "http://ibm1.com", 0, "32323232", 10, 10,
                 "GET", "0b41d5052c7a52f5927ae7114cb288e9", 1, 1, "test", 71))
  params.append((siteId, "http://ibm2.com", 0, "55656565", 10, 10,
                 "GET", "0b41d5052c7a52f5927ae7114cb288e9", 1, 1, "text/xml", 71))
  self.urlNew(params)

  props_params = []
  props_params.append(("0b41d5052c7a52f5927ae7114cb288e9", "HTTP_POST_FORM_AAA", "AAA"))
  props_params.append(("0b41d5052c7a52f5927ae7114cb288e9", "HTTP_POST_FORM_BBB", "BBB"))
  self.insertNewSiteProperties(siteId, props_params)

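The first step of test_CustomRequest() builds a COUNT query and guards the result before printing it. The sketch below isolates those two pieces in Python 3. It assumes that the table name is the prefix urls_ followed by the site identifier, which is inferred only from the matching hash in the listing; build_count_query and first_cell are hypothetical helpers, and no database is contacted.

```python
def build_count_query(site_id):
    """Build the COUNT query used in test_CustomRequest().

    Assumption: the table name is "urls_" plus the site id, as suggested
    by the literal query string in the listing.
    """
    return ("SELECT COUNT(*) FROM urls_" + site_id + " "
            "WHERE NOT (Status=4 AND Crawled=0 AND Processed=0)")


def first_cell(result):
    """Return result[0][0] if it exists, otherwise None.

    Same guard as the listing: a non-empty result with a non-empty first row.
    """
    if len(result) > 0 and len(result[0]) > 0:
        return result[0][0]
    return None


query = build_count_query("0b41d5052c7a52f5927ae7114cb288e9")
```

Returning None for an empty result keeps the caller free to decide whether a missing count is an error, whereas the listing simply prints nothing in that case.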

◆ urlNew()

def ftests.ftest_DBTasksWrapper.Test.urlNew (   self,
  params 
)

Definition at line 32 of file ftest_DBTasksWrapper.py.

def urlNew(self, params):
  ret = 0
  if params is not None:
    if hasattr(params, '__iter__'):
      urlsList = []
      for param in params:
        urlObject = dc.EventObjects.URL(param[0], param[1])
        urlObject.type = param[2]
        urlObject.urlMd5 = param[3]
        urlObject.requestDelay = param[4]
        urlObject.httpTimeout = param[5]
        urlObject.httpMethod = param[6]
        urlObject.parentMd5 = param[7]
        urlObject.maxURLsFromPage = param[8]
        urlObject.tcDate = SQLExpression("NOW()")
        urlObject.UDate = SQLExpression("NOW()")
        urlObject.depth = param[9]
        urlObject.contentType = param[10]
        urlObject.priority = param[11]
        urlsList.append(urlObject)
      ret = self.wrapper.urlNew(urlsList)
  return ret

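urlNew() addresses each parameter tuple by position, which makes the expected layout hard to read. The sketch below names the twelve positions in Python 3. The field names for indices 2 to 11 are the attribute names assigned in the listing; the names siteId and url for the two constructor arguments are assumptions based on the values passed in test_CustomRequest(), and describe_url_param is a hypothetical helper.

```python
# Positional layout of one urlNew() parameter tuple. Indices 0 and 1 are the
# dc.EventObjects.URL constructor arguments (names assumed here); the rest
# follow the attribute assignments in the listing.
URL_FIELDS = (
    "siteId", "url", "type", "urlMd5", "requestDelay", "httpTimeout",
    "httpMethod", "parentMd5", "maxURLsFromPage", "depth",
    "contentType", "priority",
)


def describe_url_param(param):
    """Map one positional tuple to a dict keyed by field name."""
    if len(param) != len(URL_FIELDS):
        raise ValueError("expected %d values, got %d"
                         % (len(URL_FIELDS), len(param)))
    return dict(zip(URL_FIELDS, param))


# First sample tuple from test_CustomRequest().
record = describe_url_param((
    "0b41d5052c7a52f5927ae7114cb288e9", "http://ibm1.com", 0, "32323232",
    10, 10, "GET", "0b41d5052c7a52f5927ae7114cb288e9", 1, 1, "test", 71,
))
```

The length check is an addition of this sketch; the listing itself would raise an IndexError on a short tuple.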

Member Data Documentation

◆ CFG_NAME

string ftests.ftest_DBTasksWrapper.Test.CFG_NAME = "./db-task2.ini"
static

Definition at line 18 of file ftest_DBTasksWrapper.py.

◆ wrapper

ftests.ftest_DBTasksWrapper.Test.wrapper

Definition at line 25 of file ftest_DBTasksWrapper.py.


The documentation for this class was generated from the following file: