Definition at line 16 of file ftest_DBTasksWrapper.py.
◆ insertNewSiteProperties()
def ftests.ftest_DBTasksWrapper.Test.insertNewSiteProperties(self, siteId, params)
Definition at line 56 of file ftest_DBTasksWrapper.py.
56 def insertNewSiteProperties(self, siteId, params):
57 if siteId is not None and hasattr(params, '__iter__') and len(params) > 0:
59 for attr in localSiteUpdate.__dict__:
60 if hasattr(localSiteUpdate, attr):
61 setattr(localSiteUpdate, attr, None)
62 localSiteUpdate.updateType=dc.EventObjects.SiteUpdate.UPDATE_TYPE_APPEND
63 localSiteUpdate.id = siteId
64 localSiteUpdate.properties = []
67 newPropElem["siteId"] = param[0]
68 newPropElem["name"] = param[1]
69 newPropElem["value"] = param[2]
70 localSiteUpdate.properties.append(newPropElem)
◆ setUp()
def ftests.ftest_DBTasksWrapper.Test.setUp(self)
Definition at line 21 of file ftest_DBTasksWrapper.py.
22 cfgParser = ConfigParser.ConfigParser()
23 cfgParser.read(self.CFG_NAME)
24 logging.config.fileConfig(cfgParser.get("TasksManager", "log_cfg"))
25 self.wrapper = DBTasksWrapper.DBTasksWrapper(cfgParser)
◆ tearDown()
def ftests.ftest_DBTasksWrapper.Test.tearDown(self)
◆ test_CustomRequest()
def ftests.ftest_DBTasksWrapper.Test.test_CustomRequest(self)
Definition at line 74 of file ftest_DBTasksWrapper.py.
74 def test_CustomRequest(self):
76 self.wrapper.siteNewOrUpdate(newObject)
77 selectStr = "SELECT COUNT(*) FROM urls_0b41d5052c7a52f5927ae7114cb288e9 " + \
78 "WHERE NOT (Status=4 AND Crawled=0 AND Processed=0)"
79 result = self.wrapper.customRequest(selectStr, "dc_urls")
80 if len(result) > 0 and len(result[0]) > 0:
83 url_md5 = "0b41d5052c7a52f5927ae7114cb288e9"
84 siteId = "0b41d5052c7a52f5927ae7114cb288e9"
86 urlUpdateObj.tcDate = SQLExpression("NOW()")
87 urlUpdateObj.status = 2
88 urlUpdateObj.depth = 32
89 self.wrapper.urlUpdate(urlUpdateObj, "`State`=0")
92 result = self.wrapper.urlStatus(urlStatusObj, True)
94 print "Depth == " + str(result[0].depth)
97 params.append((siteId, "http://ibm1.com", 0, "32323232", 10, 10,
98 "GET", "0b41d5052c7a52f5927ae7114cb288e9", 1, 1, "test", 71))
99 params.append((siteId, "http://ibm2.com", 0, "55656565", 10, 10,
100 "GET", "0b41d5052c7a52f5927ae7114cb288e9", 1, 1, "text/xml", 71))
104 props_params.append(("0b41d5052c7a52f5927ae7114cb288e9", "HTTP_POST_FORM_AAA", "AAA"))
105 props_params.append(("0b41d5052c7a52f5927ae7114cb288e9", "HTTP_POST_FORM_BBB", "BBB"))
106 self.insertNewSiteProperties(siteId, props_params)
◆ urlNew()
def ftests.ftest_DBTasksWrapper.Test.urlNew(self, params)
Definition at line 32 of file ftest_DBTasksWrapper.py.
32 def urlNew(self, params):
34 if params is not None:
35 if hasattr(params, '__iter__'):
39 urlObject.type = param[2]
40 urlObject.urlMd5 = param[3]
41 urlObject.requestDelay = param[4]
42 urlObject.httpTimeout = param[5]
43 urlObject.httpMethod = param[6]
44 urlObject.parentMd5 = param[7]
45 urlObject.maxURLsFromPage = param[8]
46 urlObject.tcDate = SQLExpression("NOW()")
47 urlObject.UDate = SQLExpression("NOW()")
48 urlObject.depth = param[9]
49 urlObject.contentType = param[10]
50 urlObject.priority = param[11]
51 urlsList.append(urlObject)
52 ret = self.wrapper.urlNew(urlsList)
◆ CFG_NAME
string ftests.ftest_DBTasksWrapper.Test.CFG_NAME = "./db-task2.ini" [static]
◆ wrapper
ftests.ftest_DBTasksWrapper.Test.wrapper
The documentation for this class was generated from the following file: