root / trunk / pyopencoin / oc / transports.py

Revision 241, 19.0 kB (checked in by ocmathew, 4 years ago)

Clear some fixmes. Performs some tests. Fix an error with HANDSHAKE

  • Property svn:mime-type set to text/plain
  • Property svn:eol-style set to native
  • Property svn:executable set to *
Line 

"""A transport has a write method, to which the protocol sends new data. The
   transport sends off this data, and delivers the response (or new data
   without a prior 'write') to the protocol's newMessage method.

   This means that the protocol must be happy to be fed newMessages at any
   point in time.

   """
import socket

from messages import Message
11
class Transport:
    """Base class for every transport in this module.

    The protocol hands outgoing messages to write(); the transport hands
    incoming messages to the protocol through newMessage(). Subclasses
    override __init__, write and start as needed.
    """

    # Class-level default so that ``self.protocol`` always exists, even on
    # subclasses whose __init__ does not chain to this one.
    protocol = None

    def __init__(self):
        "Constructor. Override this"

    def setProtocol(self, protocol):
        "This sets the protocol instance that is used with this transport"
        self.protocol = protocol
        # Wire the link in both directions.
        protocol.setTransport(self)

    def write(self, message):
        "The protocol will write into this"

    def newMessage(self, message):
        """Deliver a freshly read message to the protocol.

        Falsy messages (None, empty) are dropped silently. Raises
        AttributeError if no protocol has been attached yet.
        """
        if not message:
            return
        if self.protocol is None:
            # Same exception type as before, but with a useful explanation.
            raise AttributeError(
                'transport has no protocol; call setProtocol() first')
        self.protocol.newMessage(message)

    def start(self):
        """start the transport"""
33
class SocketServerHandler:
    """Accept connections on a listening socket and serve them one by one.

    For every accepted connection a SocketServerTransport is created and
    passed to ``function``. Connections are handled sequentially: the next
    accept() only happens after ``function`` returns.
    """

    def __init__(self, addr, port, function):
        self.addr = addr
        self.port = port
        self.debug = False
        self.function = function

    def start(self):
        """Bind, listen and serve until ``self.runserver`` becomes false.

        The listening socket is always closed when this method returns or
        raises.
        """
        import socket

        self.runserver = True
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.bind((self.addr, self.port))
            s.listen(5)
            self.socket = s

            while self.runserver:
                incoming_sock, incoming_addr = self.socket.accept()
                sst = SocketServerTransport(incoming_sock, self.debug)

                # TODO: Add in ability to use select, or spawn off a thread,
                # or something

                self.function(sst)
        finally:
            # Do not leak the listening socket, whatever ended the loop.
            s.close()
58           
59
60
class SocketServerTransport(Transport):
    """Transport that serves one already-connected socket.

    No idea how to test this with a doctest

    So, please run testWalletServer.py then testClientServer.py in different
    shells

    """

    def __init__(self, sock, debug=False):
        self.conn = sock
        self.debug = debug

    def _findMessageEnd(self, text):
        """Return the index of the ']' that closes the first complete message
        in text, or -1 if text does not hold a complete message yet.

        Brackets inside double-quoted strings are ignored, and a backslash
        inside a string escapes the following character (so an escaped quote
        does not end the string).
        """
        depth = 0
        in_string = False
        escaped = False
        for index, char in enumerate(text):
            if in_string:
                if escaped:
                    escaped = False
                elif char == '\\':
                    escaped = True
                elif char == '"':
                    in_string = False
            elif char == '"':
                in_string = True
            elif char == '[':
                depth = depth + 1
            elif char == ']':
                depth = depth - 1
                if depth == 0:
                    return index
        return -1

    def start(self):
        """Read from the connection until it is closed, handing every
        complete message to newMessage().

        Returns when the peer closes the connection, when a socket error
        occurs, or when the connection is closed from our side (for example
        by write() once the protocol is done).
        """
        import socket

        pending = ''

        while self.conn:
            try:
                data = self.conn.recv(2048)
            except socket.error:
                self.close()
                return

            if not data:
                # An empty read means the peer has closed the connection.
                break

            pending = pending + data.replace('\r', '')

            # Deliver every complete message that is already buffered before
            # blocking in recv() again; several messages may arrive together.
            end = self._findMessageEnd(pending)
            while end >= 0 and self.conn:
                text = pending[:end + 1]
                pending = pending[end + 1:]
                m = Message(jsontext=text)
                if self.debug:
                    print(m)
                self.newMessage(m)
                end = self._findMessageEnd(pending)

        # No more data, the connection is closed. Close the socket
        self.close()

        # TODO: Somehow signal death. Remove ourselves from a select or kill
        # the thread?

    def write(self, message):
        """Send message to the peer and close the connection once the
        protocol reports that it is done."""
        import socket

        if self.debug:
            print(message)

        if message:
            try:
                # sendall() keeps writing until the whole message is out;
                # send() may stop after a partial write.
                self.conn.sendall(message.toJson())
            except socket.error:
                self.close()
                return

        if self.protocol.done:
            self.close()

    def close(self):
        """Close the connection. Calling it again is a no-op."""
        if self.conn is None:
            return
        if self.debug:
            print('Closing socket')
        self.conn.close()
        self.conn = None
161           
162
class SocketClientTransport(Transport):
    """
    >>> import entities
    >>> #w = entities.Wallet()
    >>> #sct = SocketClientTransport('copycan.org',12008)
    >>> #w.sendMoney(sct)

    """

    def __init__(self, addr, port):
        self.addr = addr
        self.port = port
        self.debug = 0
        self.socket = None
        # Text received from the server that has not been turned into a
        # message yet.
        self._pending = ''

    def start(self):
        """Open the TCP connection to (addr, port)."""
        import socket
        self._pending = ''
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect((self.addr, self.port))

    def _findMessageEnd(self, text):
        """Return the index of the ']' that closes the first complete message
        in text, or -1 if text does not hold a complete message yet.

        Brackets inside double-quoted strings are ignored, and a backslash
        inside a string escapes the following character.
        """
        depth = 0
        in_string = False
        escaped = False
        for index, char in enumerate(text):
            if in_string:
                if escaped:
                    escaped = False
                elif char == '\\':
                    escaped = True
                elif char == '"':
                    in_string = False
            elif char == '"':
                in_string = True
            elif char == '[':
                depth = depth + 1
            elif char == ']':
                depth = depth - 1
                if depth == 0:
                    return index
        return -1

    def _readMessage(self):
        """Return the JSON text of the next complete message, reading from
        the socket as needed.

        Returns None if the socket is closed, the peer closes the
        connection, or a socket error occurs before a message is complete.
        Text following the returned message is kept for the next call.
        """
        import socket

        pending = self._pending
        while True:
            end = self._findMessageEnd(pending)
            if end >= 0:
                self._pending = pending[end + 1:]
                return pending[:end + 1]

            if not self.socket:
                self._pending = pending
                return None

            try:
                data = self.socket.recv(2048)
            except socket.error:
                self.close()
                return None

            if not data:
                # Peer closed the connection before a full message arrived.
                self._pending = pending
                return None

            pending = pending + data.replace('\r', '')

    def write(self, message):
        """Send message, then wait for one reply and pass it to newMessage().

        If the protocol is done after sending, the socket is closed and no
        reply is awaited.
        """
        import socket

        if message:
            try:
                # sendall() keeps writing until the whole message is out.
                self.socket.sendall(message.toJson())
            except socket.error:
                self.close()
                return

        if self.protocol.done:
            self.close()
            return

        if self.debug:
            # getattr: message may be None here, which has no .type
            print('Message.type: %s' % getattr(message, 'type', None))

        text = self._readMessage()
        if text is None:
            return

        m = Message(jsontext=text)
        if self.debug:
            print(m)
        self.newMessage(m)

    def close(self):
        """Close the socket. Calling it again is a no-op."""
        if self.socket is None:
            return
        if self.debug:
            print('Closing socket')
        self.socket.close()
        self.socket = None
262
263
264       
265
class HTTPClientTransport(Transport):
    'doctest disabled while offline'
    # Only the one-line string above is the class docstring. The string below
    # is a plain expression statement, so doctest does not pick up its
    # example (which needs network access).
    """To use if the other side is reachable via http.

    >>> import urllib
    >>> t = HTTPClientTransport('https://opencoin.org/Members/jhb/testresponse')

    This is for testing only
    >>> t.messages = []
    >>> t.newMessage = t.messages.append

    Now write a message
    >>> m = Message('TestMessage',{'foo':'bar'})
    >>> t.write(m)

    Look what we've got
    >>> t.messages[0]
    <Message('TestResponse',{'everything': 'good'})>
    """

    def __init__(self, url):
        self.url = url

    def write(self, message):
        """POST the message as JSON to self.url and deliver the response body
        to newMessage() as a Message."""
        import urllib
        data = message.toJson()
        connection = urllib.urlopen(self.url, data)
        try:
            response = connection.read()
        finally:
            # Release the HTTP connection even if reading fails.
            connection.close()
        self.newMessage(Message(jsontext=response))
294
class SimpleTestTransport(Transport):
    """
    >>> from entities import Wallet
    >>> w = Wallet()
    >>> st = SimpleTestTransport()
    >>> w.sendMoney(st)
    >>> st.send('foo')
    <Message('sendMoney',[1, 2])>
    """

    def __init__(self):
        # Messages written by the protocol, oldest first.
        self.messages = []

    def write(self, message):
        "Buffer a message written by the protocol; falsy messages are dropped"
        if message:
            self.messages.append(message)

    def read(self):
        'return one buffered message. Returns None if there are not any'
        if not self.messages:
            return None
        return self.messages.pop(0)

    def send(self, type, data=None):
        """Feed Message(type, data) to the protocol, then return the oldest
        buffered message (or None), i.e. a shortcut for

          transport.newMessage(Message("foo",[somedata,..]))
          transport.read()

        written as

          transport.send("foo",[somedata,...])"""

        self.newMessage(Message(type, data))
        return self.read()
331
332
333################# These two are for real testing ######################
334
class ServerTest(Transport):
    """In-process test transport that is paired with another one.

    Everything written is logged, optionally printed, and then either
    forwarded straight to the paired transport (autocontinue) or held in
    ``buffer`` until next() is called.
    """

    def __init__(self, other=None, autocontinue=1, autoprint='message'):
        self.buffer = None
        self.autocontinue = autocontinue
        self.autoprint = autoprint
        self.log = []
        # Default label for log and print output; callers may overwrite it.
        self.nick = 'Server'

        if other:
            #Lets connect the two
            self.other = other
            other.other = self

    def write(self, message):
        """Log (and optionally print) message, then forward or buffer it.

        Messages of type 'finished' are logged but never forwarded.
        """
        if not message:
            return

        if self.autoprint == 'message':
            print('%s %s' % (self.nick, message))
        elif self.autoprint == 'json':
            print('%s: %s' % (self.nick, message.toJson()))

        self.log.append((self.nick, message))

        if message.type == 'finished':
            return

        if self.autocontinue:
            self.other.newMessage(message)
        else:
            self.buffer = message

    def next(self):
        "Forward the buffered message, if any, to the paired transport"
        if self.buffer:
            m = self.buffer
            self.buffer = None
            self.other.newMessage(m)

    def printlog(self):
        "Print every logged message as 'nick: json', one per line"
        print('\n'.join(['%s: %s' % (l[0], l[1].toJson()) for l in self.log]))
373           
374
class ClientTest(ServerTest):
    """
    >>> from entities import Wallet
    >>> client = Wallet()
    >>> server = Wallet()
    >>> t = ClientTest(server.receiveMoney,clientnick='walletA',servernick='walletB')
    >>> client.sendMoney(t)
    walletA <Message('sendMoney',[1, 2])>
    walletB <Message('Receipt',None)>
    walletA <Message('GOODBYE',None)>
    walletB <Message('GOODBYE',None)>



    """

    def __init__(self, callback, clientnick=None, servernick=None, autocontinue=1, autoprint='message', **kwargs):
        # Shared state (buffer, autocontinue, autoprint, log) comes from the
        # base class; no peer is connected until start().
        ServerTest.__init__(self, None, autocontinue, autoprint)
        self.callback = callback
        self.kwargs = kwargs
        self.nick = clientnick or 'Client'
        self.servernick = servernick or 'Server'

    def start(self):
        """Create the paired server-side transport and hand it to the
        callback together with the stored keyword arguments."""
        servertransport = ServerTest(self, autoprint=self.autoprint)
        servertransport.nick = self.servernick
        kwargs = self.kwargs
        self.callback(servertransport, **kwargs)
407
408   
class SocketClientTest(Transport):

    """
    >>> from entities import Wallet
    >>> client = Wallet()
    >>> server = Wallet()
    >>> t = SocketClientTest('127.0.0.1',3456,server.receiveMoney)
    >>> #client.sendMoney(t)


    """

    def __init__(self, addr, port, callback, **kwargs):
        self.kwargs = kwargs
        self.addr = addr
        self.port = port
        self.callback = callback
        self.debug = 1

    def start(self):
        """Known not to work; see the notes below."""
        # Okay. This doesn't work for a few reasons.
        # servertransport gets set up as the transport to use. The socket is
        # not opened at this time though.
        servertransport = SocketServerTransport(self.addr, self.port)

        # We try to connect to the socket. It fails because the socket isn't
        # open
        import socket
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect((self.addr, self.port))

        # We never get here.
        kwargs = self.kwargs
        self.callback(servertransport, **kwargs)

        # Okay. To fix this, we have to do a few things. ServerSocketTransport
        # is written using blocking sockets. And it never yields. That means
        # that once we start a ServerSocketTransport, or a SocketClientTest,
        # we never give back control for the other side. It works great with
        # tests using different python instances or threads. It doesn't work,
        # however, if we don't have those things set up to use.

        # Now, to fix this, you can stick a SocketServerTransport in its own
        # thread. Once you set it off, it should work fine. However, to make
        # sure you don't do anything weird, add a timeout to the socket on
        # listen and remove the loop, so it doesn't continue forever. And
        # also, you should probably throw in something to make sure the
        # thread is closed before you can continue, if python allows things
        # like that.

        # - Mathew

    def write(self, message):
        """Send message; unless it is of type 'finished', read one reply
        (a single recv of up to 2048 bytes) and pass it to newMessage()."""
        # sendall() keeps writing until the whole message is out; send() may
        # stop after a partial write.
        self.socket.sendall(message.toJson())
        if message.type == 'finished':
            self.socket.close()
            return

        data = self.socket.recv(2048)
        if self.debug:
            print(message)
        self.newMessage(Message(jsontext=data))
467
468
469
470######################### old stuff #############################################
471
class ServerTestTransport(Transport):
    """
    This is really really weird. We test two entities,
    they write to each other, and we can see the whole
    conversation.

    XXX need a way to inject somewhere....
    maybe if we had a servertransport.test(server.callback) method
    that would yield every so often...

    >>> from entities import Wallet
    >>> client = Wallet()
    >>> server = Wallet()
    >>> t = ServerTestTransport(client.sendMoney)
    >>> server.receiveMoney(t)
    Client <Message('sendMoney',[1, 2])>
    Server <Message('Receipt',None)>
    Client <Message('GOODBYE',None)>
    Server <Message('GOODBYE',None)>
    """

    def __init__(self, callback, **kwargs):
        self.callback = callback
        self.kwargs = kwargs
        # Printed lines from both sides of the conversation, in order.
        self.log = []

    def start(self):
        """Create the paired client transport and pass it to the callback
        together with the stored keyword arguments."""
        clienttransport = ClientTestTransport(self)
        kwargs = self.kwargs
        self.callback(clienttransport, **kwargs)

    def write(self, message):
        """Print and log message as 'Server ...', then forward it to the
        paired transport unless its type is 'finished'."""
        if not message:
            return
        l = 'Server %s' % message
        print(l)
        self.log.append(l)
        if message.type != 'finished':
            self.other.newMessage(message)

    def printlog(self):
        "Print the logged conversation, one line per message"
        print('\n'.join(self.log))
514
class ClientTestTransport(Transport):
    """Client-side counterpart of a paired test transport; it records its
    output in the paired transport's log."""

    def __init__(self, other=None):
        if other:
            #Lets connect the two
            self.other = other
            other.other = self

    def write(self, message):
        """Print message as 'Client ...', append that line to the paired
        transport's log, then forward the message to the paired transport
        unless its type is 'finished'."""
        if not message:
            return
        l = 'Client %s' % message
        print(l)
        self.other.log.append(l)
        if message.type != 'finished':
            self.other.newMessage(message)
531
532
class ServerTestTransport2(Transport):
    """

    >>> from entities import Wallet
    >>> client = Wallet()
    >>> server = Wallet()
    >>> t = ServerTestTransport2(client.sendMoney)
    >>> server.receiveMoney(t)
    >>> ct = t.clienttransport
    >>> m = ct.buffer
    >>> m
    <Message('sendMoney',[1, 2])>

    >>> t.newMessage(m)
    >>> m2 = t.buffer
    >>> m2
    <Message('Receipt',None)>

    XXX in the end the ServerTestTransport could be simplified - it
    does not really need the client on the other side.

    >>> w2 = Wallet()
    >>> tt = SimpleTestTransport()
    >>> w2.listen(tt)

    XXX All nice, but how do I get the listen method to switch after the handshake?
    """

    def __init__(self, callback):
        self.callback = callback
        # Last message written by the protocol; nothing is forwarded.
        self.buffer = None

    def start(self):
        "Create the paired client transport and pass it to the callback"
        self.clienttransport = ClientTestTransport2(self)
        self.callback(self.clienttransport)

    def write(self, message):
        "Store message in self.buffer; it is not forwarded to the other side"
        self.buffer = message
574
575
class ClientTestTransport2(Transport):
    """Client-side counterpart of a paired test transport; it only buffers
    the last message written to it."""

    def __init__(self, other=None):
        if other:
            #Lets connect the two
            self.other = other
            other.other = self
        self.buffer = None

    def write(self, message):
        "Store message in self.buffer; it is not forwarded to the other side"
        self.buffer = message
590
591
592
593#Server will call start on the transport
594#Should trigger client to start
595#client writes message
596
597
598
599
600
class TestingTransport(Transport):
    """
    # This class does nothing good right now, pretty much ignore it #

    Some tricking around to be able to test other transports. This one
    connects to another Transport, assuming that we manually read and write
    data, while the other side behaves like a 'real' transport.

    Note that on a TestingTransport you have the convenience method of
    'send'.

    >>> from entities import Wallet
    >>> w = Wallet()
    >>> ct = HTTPClientTransport('http://opencoin.org/testwallet')

    >>> tt = TestingTransport()
    >>> tt.connect(ct)

    >>> w.sendMoney(ct)
    >>> tt.read()
    <Message('sendMoney',[1, 2])>

    >>> tt.read()

    >>> #tt.send('foobar')
    <Message('PROTOCOL_ERROR','send again')>

    >>> tt.send('Receipt')
    <Message('GOODBYE',None)>

    >>> tt.send('Another receipt')

    """

    def __init__(self):
        # Messages captured from the other transport, oldest first.
        self.messages = []

    def connect(self, otherTransport):
        'This will hook up with another Transport, directly feeding into it'

        self.other = otherTransport

        #Just a hack, does nothing clever yet. Instead of replacing the write,
        #it should do something that actually uses it. No idea yet how to do
        # it
        self.other.write = self.newMessage

    def write(self, message):
        'directly pass on the message to the other transport'

        self.other.newMessage(message)

    def newMessage(self, message):
        'We store messages, so that we then can read them'
        self.messages.append(message)

    def read(self):
        'return one buffered message. Returns None if there are not any'
        if not self.messages:
            return None
        return self.messages.pop(0)

    def send(self, type, data=None):
        """a shortcut for doing

          transport.write(Message("foo",[somedata,..]))
          transport.read()

        instead just

          transport.send("foo",[somedata,...])"""

        self.write(Message(type, data))
        return self.read()
689
if __name__ == "__main__":
    # Run the doctests embedded in this module's docstrings.
    import doctest
    doctest.testmod(optionflags=doctest.ELLIPSIS)
Note: See TracBrowser for help on using the browser.