HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
ftest_DBTasksWrapper.py
Go to the documentation of this file.
1 '''
2 Created on Oct 28, 2014
3 
4 @author: scorp
5 '''
6 import unittest
7 import ppath # pylint: disable=W0611
8 from ppath import sys
9 import logging.config
10 import ConfigParser
11 import dc_crawler.DBTasksWrapper as DBTasksWrapper
12 import dc.EventObjects
13 from app.Utils import SQLExpression
14 
15 
16 class Test(unittest.TestCase):
17 
18  CFG_NAME = "./db-task2.ini"
19 
20 
21  def setUp(self):
22  cfgParser = ConfigParser.ConfigParser()
23  cfgParser.read(self.CFG_NAME)
24  logging.config.fileConfig(cfgParser.get("TasksManager", "log_cfg"))
25  self.wrapper = DBTasksWrapper.DBTasksWrapper(cfgParser)
26 
27 
28  def tearDown(self):
29  pass
30 
31 
32  def urlNew(self, params):
33  ret = 0
34  if params is not None:
35  if hasattr(params, '__iter__'):
36  urlsList = []
37  for param in params:
38  urlObject = dc.EventObjects.URL(param[0], param[1])
39  urlObject.type = param[2]
40  urlObject.urlMd5 = param[3]
41  urlObject.requestDelay = param[4]
42  urlObject.httpTimeout = param[5]
43  urlObject.httpMethod = param[6]
44  urlObject.parentMd5 = param[7]
45  urlObject.maxURLsFromPage = param[8]
46  urlObject.tcDate = SQLExpression("NOW()")
47  urlObject.UDate = SQLExpression("NOW()")
48  urlObject.depth = param[9]
49  urlObject.contentType = param[10]
50  urlObject.priority = param[11]
51  urlsList.append(urlObject)
52  ret = self.wrapper.urlNew(urlsList)
53  return ret
54 
55 
56  def insertNewSiteProperties(self, siteId, params):
57  if siteId is not None and hasattr(params, '__iter__') and len(params) > 0:
58  localSiteUpdate = dc.EventObjects.SiteUpdate(siteId)
59  for attr in localSiteUpdate.__dict__:
60  if hasattr(localSiteUpdate, attr):
61  setattr(localSiteUpdate, attr, None)
62  localSiteUpdate.updateType=dc.EventObjects.SiteUpdate.UPDATE_TYPE_APPEND
63  localSiteUpdate.id = siteId
64  localSiteUpdate.properties = []
65  for param in params:
66  newPropElem = {}
67  newPropElem["siteId"] = param[0]
68  newPropElem["name"] = param[1]
69  newPropElem["value"] = param[2]
70  localSiteUpdate.properties.append(newPropElem)
71  self.wrapper.siteNewOrUpdate(localSiteUpdate, stype=dc.EventObjects.SiteUpdate)
72 
73 
74  def test_CustomRequest(self):
75  newObject = dc.EventObjects.Site("http://ibm.com")
76  self.wrapper.siteNewOrUpdate(newObject)
77  selectStr ="SELECT COUNT(*) FROM urls_0b41d5052c7a52f5927ae7114cb288e9 " + \
78  "WHERE NOT (Status=4 AND Crawled=0 AND Processed=0)"
79  result = self.wrapper.customRequest(selectStr, "dc_urls")
80  if len(result) > 0 and len(result[0]) > 0:
81  print result[0][0]
82 
83  url_md5 = "0b41d5052c7a52f5927ae7114cb288e9"
84  siteId = "0b41d5052c7a52f5927ae7114cb288e9"
85  urlUpdateObj = dc.EventObjects.URLUpdate(siteId, url_md5, dc.EventObjects.URLStatus.URL_TYPE_MD5)
86  urlUpdateObj.tcDate = SQLExpression("NOW()")
87  urlUpdateObj.status = 2
88  urlUpdateObj.depth = 32
89  self.wrapper.urlUpdate(urlUpdateObj, "`State`=0")
90 
91  urlStatusObj = dc.EventObjects.URLStatus(siteId, url_md5)
92  result = self.wrapper.urlStatus(urlStatusObj, True)
93  if len(result) > 0 and type(result[0]) == dc.EventObjects.URL:
94  print "Depth == " + str(result[0].depth)
95 
96  params = []
97  params.append((siteId, "http://ibm1.com", 0, "32323232", 10, 10,
98  "GET", "0b41d5052c7a52f5927ae7114cb288e9", 1, 1, "test", 71))
99  params.append((siteId, "http://ibm2.com", 0, "55656565", 10, 10,
100  "GET", "0b41d5052c7a52f5927ae7114cb288e9", 1, 1, "text/xml", 71))
101  self.urlNew(params)
102 
103  props_params = []
104  props_params.append(("0b41d5052c7a52f5927ae7114cb288e9", "HTTP_POST_FORM_AAA", "AAA"))
105  props_params.append(("0b41d5052c7a52f5927ae7114cb288e9", "HTTP_POST_FORM_BBB", "BBB"))
106  self.insertNewSiteProperties(siteId, props_params)
107 
108 if __name__ == "__main__":
109 #import sys;sys.argv = ['', 'Test.testName']
110  unittest.main()
def insertNewSiteProperties(self, siteId, params)