HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
ftest_test_async_MsgSend.py
Go to the documentation of this file.
1 '''
2 Created on Mar 3, 2014
3 
4 @author: igor
5 '''
6 
7 from app.BaseServerManager import BaseServerManager
8 from app.PollerManager import PollerManager
9 from transport.ConnectionBuilderLight import ConnectionBuilderLight
10 from dtm.Constants import EVENT_TYPES
11 import transport.Consts
12 import random
13 import sys
14 
15 clientConnectName = "testConnect"
16 
17 
18 class TestRequest(object):
19 
20  def __init__(self, uid, body):
21  self.uid = uid
22  self.body = body
23 
24 
26 
27  def __init__(self, iterNumber, body):
28  super(Client, self).__init__()
29  self.iterNumber = iterNumber
30  self.setEventHandler(EVENT_TYPES.NEW_TASK, self.newTaskHandler)
31  # request id - response
32  self.messages = dict()
33  self.is_send = False
34  self.body = body
35 
36 
37  def newTaskHandler(self, event):
38  if event.eventType != EVENT_TYPES.NEW_TASK:
39  raise Exception("get wrong event type " + str(event.eventType))
40 
41  testRequest = event.eventObj
42  if testRequest.uid in self.messages:
43  if testRequest.body == self.messages[testRequest.uid]:
44  del self.messages[testRequest.uid]
45  self.iterNumber = self.iterNumber - 1
46  if self.iterNumber == 0 and len(self.messages) == 0:
47  self.exit_flag = True
48  print "EXIT " + self.body
49 
50 
51  def on_poll_timeout(self):
52  if not self.is_send:
53  self.sendRequest(self.body)
54  self.is_send = True
55 
56 
57  def sendRequest(self, body):
58  for iter in xrange(0, self.iterNumber):
59  testRequest = TestRequest(iter, body + str(iter))
60  event = self.eventBuilder.build(EVENT_TYPES.NEW_TASK, testRequest)
61  self.messages[testRequest.uid] = testRequest.body
62  self.connections[clientConnectName].send(event)
63 
64 
65 
67 
68 
69  def __init__(self, pollerManager, totalExpectRequest):
70  super(Server, self).__init__(pollerManager)
71  self.events = list()
72  self.totalExpectRequest = totalExpectRequest
73  self.setEventHandler(EVENT_TYPES.NEW_TASK, self.newTaskHandler)
74 
75 
76  def newTaskHandler(self, event):
77  if event.eventType != EVENT_TYPES.NEW_TASK:
78  raise Exception("get wrong event type " + str(event.eventType))
79 
80  self.events.append(event)
81  if len(self.events) == self.totalExpectRequest: #made random
82  while len(self.events) > 0:
83  index = random.randrange(0, len(self.events))
84  self.reply(self.events[index], self.events[index])
85  del self.events[index]
86 
87 
88 
89 if __name__ == "__main__":
90  connectBuilder = ConnectionBuilderLight()
91  pollerManager = PollerManager(connectBuilder.zmq_poller)
92 
93  serverConnectAdmin = connectBuilder.build(transport.Consts.SERVER_CONNECT, BaseServerManager.ADMIN_CONNECT_ENDPOINT)
94 
95  client_request = 500
96  client_number = 5
97  connect_endpoint = "test_server1"
98  tcp_protocol = False
99 
100  if len(sys.argv) > 1:
101  tcp_protocol = True
102  connect_endpoint = "127.0.0.1:8090"
103 
104  clients = list()
105 
106  server = Server(pollerManager, client_request * client_number)
107  serverConnect = None
108  if not tcp_protocol:
109  serverConnect = connectBuilder.build(transport.Consts.SERVER_CONNECT, connect_endpoint)
110  else:
111  serverConnect = connectBuilder.build(transport.Consts.SERVER_CONNECT, connect_endpoint, transport.Consts.TCP_TYPE)
112 
113  server.addConnection("server", serverConnect)
114  server.start()
115 
116  for index in xrange(0, client_number):
117  client = Client(client_request, "client" + str(index))
118  clientConnect = None
119  if not tcp_protocol:
120  clientConnect = connectBuilder.build(transport.Consts.CLIENT_CONNECT, connect_endpoint)
121  else:
122  clientConnect = connectBuilder.build(transport.Consts.CLIENT_CONNECT, connect_endpoint, transport.Consts.TCP_TYPE)
123 
124  client.addConnection(clientConnectName, clientConnect)
125  clients.append(client)
126  client.start()
127 
128  print "START"
129 
130  for client in clients:
131  client.join()
132 
133  server.exit_flag = True
134  server.join()
def reply(self, event, reply_event)
wrapper for sending event in reply for event
def setEventHandler(self, eventType, handler)
set event handler rewrite the current handler for eventType
This is app base class for management server connection end-points and parallel transport messages pr...
Class hides routines of bulding connection objects.
def send(self, connect_name, event)
send event
def __init__(self, pollerManager, totalExpectRequest)