1 # Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details.
4 This is a simple implementation of a Flash RTMP server to accept connections and stream requests. The module is organized as follows:
5 1. The FlashServer class is the main class to provide the server abstraction. It uses the multitask module for co-operative multitasking.
6 It also uses the App abstract class to implement the applications.
7 2. The Server class implements a simple server to receive new Client connections and inform the FlashServer application. The Client class
8 derived from Protocol implements the RTMP client functions. The Protocol class implements the base RTMP protocol parsing. A Client contains
9 various streams from the client, represented using the Stream class.
10 3. The Message, Header and Command represent RTMP message, header and command respectively. The FLV class implements functions to perform read
11 and write of FLV file format.
14 Typically an application can launch this server as follows:
17 To know the command line options use the -h option:
20 To start the server with a different directory for recording and playing FLV files from, use the following command.
21 $ python rtmp.py -r some-other-directory/
22 Note the terminal '/' in the directory name. Without this, it is just used as a prefix in FLV file names.
24 A test client is available in testClient directory, and can be compiled using Flex Builder. Alternatively, you can use the SWF file to launch
25 from testClient/bin-debug after starting the server. Once you have launched the client in the browser, you can connect to
26 local host by clicking on 'connect' button. Then click on publish button to publish a stream. Open another browser with
27 same URL and first connect then play the same stream name. If everything works fine you should be able to see the video
28 from first browser to the second browser. Similarly, in the first browser, if you check the record box before publishing,
29 it will create a new FLV file for the recorded stream. You can close the publishing stream and play the recorded stream to
30 see your recording. Note that due to initial delay in timestamp (in case publish was clicked much later than connect),
31 your played video will start appearing after some initial delay.
34 If an application wants to use this module as a library, it can launch the server as follows:
35 >>> agent = FlashServer() # a new RTMP server instance
36 >>> agent.root = 'flvs/' # set the document root to be 'flvs' directory. Default is current './' directory.
37 >>> agent.start() # start the server
38 >>> multitask.run() # this is needed somewhere in the application to actually start the co-operative multitasking.
41 If an application wants to specify a different application other than the default App, it can subclass it and supply the application by
42 setting the server's apps property. The following example shows how to define "myapp" which invokes a 'connected()' method on client when
43 the client connects to the server.
45 class MyApp(App): # a new MyApp extends the default App in rtmp module.
46 def __init__(self): # constructor just invokes base class constructor
48 def onConnect(self, client, *args):
49 result = App.onConnect(self, client, *args) # invoke base class method first
50 def invokeAdded(self, client): # define a method to invoke 'connected("some-arg")' on Flash client
51 client.call('connected', 'some-arg')
53 multitask.add(invokeAdded(self, client)) # need to invoke later so that connection is established before callback
55 agent.apps = dict({'myapp': MyApp, 'someapp': MyApp, '*': App})
57 Now the client can connect to rtmp://server/myapp or rtmp://server/someapp and will get connected to this MyApp application.
58 If the client doesn't define "function connected(arg:String):void" in the NetConnection.client object then the server will
59 throw an exception and display the error message.
63 import os
, sys
, time
, struct
, socket
, traceback
, multitask
64 import threading
, Queue
66 from string
import strip
68 from BaseHTTPServer
import BaseHTTPRequestHandler
69 from SimpleAppHTTPServer
import SimpleAppHTTPRequestHandler
70 from cStringIO
import StringIO
71 from cookielib
import parse_ns_headers
, CookieJar
72 from Cookie
import SimpleCookie
80 class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler
):
83 self
.connection
= self
.request
84 self
.rfile
= StringIO()
85 self
.wfile
= StringIO()
93 def get_full_url(self
):
def get_header(self, hdr, default):
    '''Return the value of request header *hdr*, or *default*.

    The lookup is delegated to ``self.headers.getheader``.
    '''
    request_headers = self.headers
    return request_headers.getheader(hdr, default)
def add_cookies(self):
    '''Send one "Set-Cookie" header for every entry in ``self.response_cookies``.

    Each entry's ``OutputString()`` result is used as the header value.
    '''
    for _cookie_name, morsel in self.response_cookies.items():
        self.send_header("Set-Cookie", morsel.OutputString())
104 def process_cookies(headers
, remote
, cookie_key
="Cookie", add_sess
=True):
105 ch
= headers
.getheaders(cookie_key
)
106 print "messageReceived cookieheaders=", '; '.join(ch
)
113 c
= filter(lambda x
: x
, c
)
116 response_cookies
= SimpleCookie()
118 print "found cookie", repr(c
)
119 name
, value
= c
.split("=")
120 response_cookies
[name
] = value
121 #response_cookies[name]['path'] = "/"
122 #response_cookies[name]['domain'] = remote[0]
123 #response_cookies[name]['version'] = 0
124 if name
== "session":
125 response_cookies
[name
]['expires'] = 50000
128 return response_cookies
130 response_cookies
['session'] = uuid
.uuid4().hex
131 response_cookies
['session']['expires'] = 50000
132 #response_cookies['session']['path'] = '/'
133 #response_cookies['session']['domain'] = remote[0]
134 #response_cookies['session']['version'] = 0
135 return response_cookies
class ConnectionClosed(Exception):
    '''Raised when the client closed the connection.

    Derives from Exception so that it is a proper exception type: it can be
    raised and caught on every Python version, not only as a Python 2
    old-style class.
    '''
140 class SockStream(object):
141 '''A class that represents a socket as a stream'''
def __init__(self, sock):
    '''Wrap *sock* with an empty read buffer and zeroed byte counters.'''
    self.sock = sock
    self.buffer = ''  # data received (or pushed back) but not yet consumed
    self.bytesWritten = 0
    self.bytesRead = 0
147 self
.sock
.shutdown(1) # can't just close it.
153 nl
= self
.buffer.find("\n")
154 if nl
>= 0: # do not have newline in buffer
155 data
, self
.buffer = self
.buffer[:nl
+1], self
.buffer[nl
+1:]
156 raise StopIteration(data
)
157 data
= (yield multitask
.recv(self
.sock
, 4096)) # read more from socket
158 if not data
: raise ConnectionClosed
159 if _debug
: print 'socket.read[%d] %r'%(len(data
), data
)
160 self
.bytesRead
+= len(data
)
162 except StopIteration: raise
163 except: raise ConnectionClosed
# anything else is treated as connection closed.
165 def read(self
, count
):
168 if len(self
.buffer) >= count
: # do not have data in buffer
169 data
, self
.buffer = self
.buffer[:count
], self
.buffer[count
:]
170 raise StopIteration(data
)
171 data
= (yield multitask
.recv(self
.sock
, 4096)) # read more from socket
172 if not data
: raise ConnectionClosed
173 # if _debug: print 'socket.read[%d] %r'%(len(data), data)
174 self
.bytesRead
+= len(data
)
176 except StopIteration: raise
177 except: raise ConnectionClosed
# anything else is treated as connection closed.
def unread(self, data):
    '''Push *data* back onto the front of the read buffer.'''
    pending = self.buffer
    self.buffer = data + pending
def write(self, data):
    '''Generator that sends *data* over the socket in pieces of at most 4096 bytes.

    ``self.bytesWritten`` is advanced by the length of each piece before the
    piece is handed to ``multitask.send``. Any error raised while sending is
    reported to the caller as ConnectionClosed.
    '''
    for offset in range(0, len(data), 4096):  # write in 4K chunks each time
        chunk = data[offset:offset + 4096]
        self.bytesWritten += len(chunk)
        if _debug:
            print('socket.write[%d] %r' % (len(chunk), chunk))
        try:
            yield multitask.send(self.sock, chunk)
        except:  # deliberately broad: every failure is treated as a closed connection
            raise ConnectionClosed
        if _debug:
            print('socket.written')
192 class Header(object):
193 FULL
, MESSAGE
, TIME
, SEPARATOR
, MASK
= 0x00, 0x40, 0x80, 0xC0, 0xC0
def __init__(self, channel=0, time=0, size=None, type=None, streamId=0):
    '''Store the header fields and precompute the channel prefix bytes.

    ``hdrdata`` holds the encoded channel id:
      * channel < 64: a single byte holding the channel;
      * channel < 320: a zero byte followed by (channel - 64);
      * otherwise: a one byte followed by the low byte and then the high
        byte of (channel - 64).

    ``extendedtime`` always starts at 0.
    '''
    self.channel = channel
    self.time = time
    self.size = size
    self.type = type
    self.streamId = streamId
    self.extendedtime = 0
    if channel < 64:
        self.hdrdata = chr(channel)
    elif channel < (64 + 256):
        self.hdrdata = '\x00' + chr(channel - 64)
    else:
        # divmod keeps the quotient an integer even when "/" means true
        # division, so chr() never receives a float.
        high, low = divmod(channel - 64, 256)
        self.hdrdata = '\x01' + chr(low) + chr(high)
202 def _appendExtendedTimestamp(self
, data
):
203 if self
.time
== 0xFFFFFF:
204 data
+= struct
.pack('>I', self
.extendedtime
)
def toBytes(self, control):
    '''Serialize this header for the given *control* value.

    *control* is OR-ed into the first byte of ``hdrdata``. Fields are then
    appended cumulatively depending on *control*:
      * Header.SEPARATOR: nothing more;
      * Header.TIME: the low 3 bytes of the time;
      * Header.MESSAGE: time, the low 3 bytes of the size, and the type byte;
      * anything else: all of the above plus the 4-byte stream id.
    The result is always passed through ``_appendExtendedTimestamp``.
    '''
    encoded = chr(ord(self.hdrdata[0]) | control) + self.hdrdata[1:]
    if control != Header.SEPARATOR:
        encoded += struct.pack('>I', self.time & 0xFFFFFF)[1:]  # add time
        if control != Header.TIME:
            encoded += struct.pack('>I', self.size)[1:]  # size
            encoded += chr(self.type)  # type
            if control != Header.MESSAGE:
                encoded += struct.pack('<I', self.streamId)  # add streamId
    return self._appendExtendedTimestamp(encoded)
222 return ("<Header channel=%r time=%r size=%r type=%r (0x%02x) streamId=%r>"
223 % (self
.channel
, self
.time
, self
.size
, self
.type, self
.type or 0, self
.streamId
))
225 class Message(object):
226 # message types: RPC3, DATA3,and SHAREDOBJECT3 are used with AMF3
227 RPC
, RPC3
, DATA
, DATA3
, SHAREDOBJ
, SHAREDOBJ3
, AUDIO
, VIDEO
, ACK
, CHUNK_SIZE
= \
228 0x14, 0x11, 0x12, 0x0F, 0x13, 0x10, 0x08, 0x09, 0x03, 0x01
230 def __init__(self
, hdr
=None, data
=''):
231 if hdr
is None: hdr
= Header()
232 self
.header
, self
.data
= hdr
, data
234 # define properties type, streamId and time to access self.header.(property)
235 for p
in ['type', 'streamId', 'time']:
236 exec 'def _g%s(self): return self.header.%s'%(p
, p
)
237 exec 'def _s%s(self, %s): self.header.%s = %s'%(p
, p
, p
, p
)
238 exec '%s = property(fget=_g%s, fset=_s%s)'%(p
, p
, p
)
240 def size(self
): return len(self
.data
)
243 return ("<Message header=%r data=%r>"% (self
.header
, self
.data
))
245 class Protocol(object):
248 DEFAULT_CHUNK_SIZE
= 128
250 PROTOCOL_CHANNEL_ID
= 2
def __init__(self, sock):
    '''Create the per-connection protocol state around *sock*.'''
    self.stream = SockStream(sock)
    self.lastReadHeaders = {}  # indexed by channelId
    self.incompletePackets = {}  # indexed by channelId
    default_chunk = Protocol.DEFAULT_CHUNK_SIZE
    self.readChunkSize = default_chunk
    self.writeChunkSize = default_chunk
    self.lastWriteHeaders = {}  # indexed by streamId
    self.nextChannelId = Protocol.MIN_CHANNEL_ID
    self.writeLock = threading.Lock()
    self.writeQueue = Queue.Queue()  # outgoing messages added by writeMessage()
262 def messageReceived(self
, msg
):
265 def protocolMessage(self
, msg
):
266 if msg
.type == Message
.ACK
: # respond to ACK requests
268 response
.type, response
.data
= msg
.type, msg
.data
269 self
.writeMessage(response
)
270 elif msg
.type == Message
.CHUNK_SIZE
:
271 self
.readChunkSize
= struct
.unpack('>L', msg
.data
)[0]
273 def connectionClosed(self
):
278 yield self
.parseRequests() # parse http requests
279 except ConnectionClosed
:
280 yield self
.connectionClosed()
281 if _debug
: print 'parse connection closed'
def writeMessage(self, message):
    '''Put *message* on ``self.writeQueue`` for later sending.'''
    outgoing = self.writeQueue
    outgoing.put(message)
def parseHandshake(self):
    '''Generator that performs the rtmp handshake by echoing the peer's data.

    It reads PING_SIZE + 1 bytes (version and first ping) and writes them
    back, then reads PING_SIZE bytes (second ping) and writes those back.
    '''
    for expected in (Protocol.PING_SIZE + 1, Protocol.PING_SIZE):
        received = (yield self.stream.read(expected))
        yield self.stream.write(received)
293 def parseRequests(self
):
294 '''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
295 self
.hr
= MultitaskHTTPRequestHandler(self
.stream
, self
.remote
, None)
296 self
.hr
.close_connection
= 1
297 self
.cookies
= CookieJar()
301 # prepare reading the request so that when it's handed
302 # over to "standard" HTTPRequestHandler, the data's already
304 print "parseRequests"
305 raw_requestline
= (yield self
.stream
.readline())
306 if _debug
: print "parseRequests, line", raw_requestline
307 if not raw_requestline
:
308 raise ConnectionClosed
312 line
= (yield self
.stream
.readline())
314 if line
in ['\n', '\r\n']:
316 except StopIteration:
317 if _debug
: print "parseRequests, stopiter"
321 print 'parseRequests', \
322 (traceback
and traceback
.print_exc() or None)
324 raise ConnectionClosed
326 self
.hr
.raw_requestline
= raw_requestline
327 self
.hr
.rfile
.write(data
)
328 print "parseRequests write after"
329 self
.hr
.rfile
.seek(0)
330 print "parseRequests seek after"
332 if not self
.hr
.parse_request():
333 raise ConnectionClosed
334 print "parseRequests parse_req after"
336 yield self
.messageReceived(self
.hr
)
339 print 'messageReceived', \
340 (traceback
and traceback
.print_exc() or None)
343 '''Writes messages to stream'''
345 while self
.writeQueue
.empty(): (yield multitask
.sleep(0.01))
346 data
= self
.writeQueue
.get() # TODO this should be used using multitask.Queue and remove previous wait.
347 if _debug
: print "write to stream", repr(data
)
349 # just in case TCP socket is not closed, close it.
351 print "stream closing"
353 yield self
.stream
.close()
354 print "stream closed"
359 yield self
.stream
.write(data
)
360 except ConnectionClosed
:
361 yield self
.connectionClosed()
363 print traceback
.print_exc()
365 class Command(object):
366 ''' Class for command / data messages'''
def __init__(self, type=Message.RPC, name=None, id=None, cmdData=None, args=[]):
    '''Create a new command from the given type, name, id, cmdData and args.

    *args* is copied with a slice, so the shared default list is never
    mutated through this instance.
    '''
    args_copy = args[:]
    self.type = type
    self.name = name
    self.id = id
    self.cmdData = cmdData
    self.args = args_copy
372 return ("<Command type=%r name=%r id=%r data=%r args=%r>" % (self
.type, self
.name
, self
.id, self
.cmdData
, self
.args
))
def setArg(self, arg):
    '''Append *arg* to this command's argument list.'''
    arguments = self.args
    arguments.append(arg)
def getArg(self, index):
    '''Return the argument stored at position *index*.'''
    arguments = self.args
    return arguments[index]
381 def fromMessage(cls
, message
):
382 ''' initialize from a parsed RTMP message'''
383 assert (message
.type in [Message
.RPC
, Message
.RPC3
, Message
.DATA
, Message
.DATA3
])
385 length
= len(message
.data
)
386 if length
== 0: raise ValueError('zero length message data')
388 if message
.type == Message
.RPC3
or message
.type == Message
.DATA3
:
389 assert message
.data
[0] == '\x00' # must be 0 in AMD3
390 data
= message
.data
[1:]
394 amfReader
= amf
.AMF0(data
)
397 inst
.name
= amfReader
.read() # first field is command name
400 if message
.type == Message
.RPC
:
401 inst
.id = amfReader
.read() # second field *may* be message id
402 inst
.cmdData
= amfReader
.read() # third is command data
405 inst
.args
= [] # others are optional
407 inst
.args
.append(amfReader
.read())
416 output
= amf
.BytesIO()
417 amfWriter
= amf
.AMF0(output
)
418 amfWriter
.write(self
.name
)
419 if msg
.type == Message
.RPC
or msg
.type == Message
.RPC3
:
420 amfWriter
.write(self
.id)
421 amfWriter
.write(self
.cmdData
)
422 for arg
in self
.args
:
425 #hexdump.hexdump(output)
427 if msg
.type == Message
.RPC3
or msg
.type == Message
.DATA3
:
428 data
= '\x00' + output
.read()
435 def getfilename(path
, name
, root
):
436 '''return the file name for the given stream. The name is derived as root/scope/name.flv where scope is
437 the the path present in the path variable.'''
438 ignore
, ignore
, scope
= path
.partition('/')
439 if scope
: scope
= scope
+ '/'
440 result
= root
+ scope
+ name
+ '.flv'
441 if _debug
: print 'filename=', result
444 class Stream(object):
445 '''The stream object that is used for RTMP stream.'''
447 def __init__(self
, client
):
448 self
.client
, self
.id, self
.name
= client
, 0, ''
449 self
.recordfile
= self
.playfile
= None # so that it doesn't complain about missing attribute
450 self
.queue
= multitask
.Queue()
451 self
._name
= 'Stream[' + str(Stream
.count
) + ']'; Stream
.count
+= 1
452 if _debug
: print self
, 'created'
455 if _debug
: print self
, 'closing'
456 if self
.recordfile
is not None: self
.recordfile
.close(); self
.recordfile
= None
457 if self
.playfile
is not None: self
.playfile
.close(); self
.playfile
= None
458 self
.client
= None # to clear the reference
465 '''Generator to receive new Message on this stream, or None if stream is closed.'''
466 return self
.queue
.get()
469 '''Method to send a Message or Command on this stream.'''
470 if isinstance(msg
, Command
):
471 msg
= msg
.toMessage()
472 msg
.streamId
= self
.id
473 # if _debug: print self,'send'
474 if self
.client
is not None: self
.client
.writeMessage(msg
)
476 class Client(Protocol
):
477 '''The client object represents a single connected client to the server.'''
478 def __init__(self
, sock
, server
, remote
):
479 Protocol
.__init
__(self
, sock
)
485 self
._nextStreamId
= 1
486 self
.objectEncoding
= 0.0
487 self
.queue
= multitask
.Queue() # receive queue used by application
488 multitask
.add(self
.parse())
489 multitask
.add(self
.write())
492 '''Generator to receive new Message (msg, arg) on this stream, or (None,None) if stream is closed.'''
493 return self
.queue
.get()
def connectionClosed(self):
    '''Called when the client drops the connection.

    Generator. It queues None through writeMessage() and then puts
    (None, None) on the receive queue.
    '''
    # The original line was a bare string expression, so the trace never printed.
    if _debug:
        print('Client.connectionClosed')
    self.writeMessage(None)
    yield self.queue.put((None, None))
501 def messageReceived(self
, msg
):
502 if _debug
: print 'messageReceived cmd=', msg
.command
, msg
.path
503 msg
.response_cookies
= process_cookies(msg
.headers
, self
.remote
)
505 if msg
.headers
.has_key('content-length'):
506 max_chunk_size
= 10*1024*1024
507 size_remaining
= int(msg
.headers
["content-length"])
509 while size_remaining
:
510 chunk_size
= min(size_remaining
, max_chunk_size
)
511 data
= (yield self
.stream
.read(chunk_size
))
513 size_remaining
-= len(L
[-1])
515 pos
= msg
.rfile
.tell()
516 msg
.rfile
.write(''.join(L
))
519 yield self
.server
.queue
.put((self
, msg
)) # new connection
522 '''Method to accept an incoming client.'''
524 response
.id, response
.name
, response
.type = 1, '_result', Message
.RPC
525 if _debug
: print 'Client.accept() objectEncoding=', self
.objectEncoding
526 response
.setArg(dict(level
='status', code
='NetConnection.Connect.Success',
527 description
='Connection succeeded.', details
=None,
528 objectEncoding
=self
.objectEncoding
))
529 self
.writeMessage(response
.toMessage())
531 def rejectConnection(self
, reason
=''):
532 '''Method to reject an incoming client.'''
534 response
.id, response
.name
, response
.type = 1, '_error', Message
.RPC
535 response
.setArg(dict(level
='status', code
='NetConnection.Connect.Rejected',
536 description
=reason
, details
=None))
537 self
.writeMessage(response
.toMessage())
539 def call(self
, method
, *args
):
540 '''Call a (callback) method on the client.'''
542 cmd
.id, cmd
.name
, cmd
.type = self
._nextCallId
, method
, (self
.objectEncoding
== 0.0 and Message
.RPC
or Message
.RPC3
)
543 cmd
.args
, cmd
.cmdData
= args
, None
544 self
._nextCallId
+= 1
545 if _debug
: print 'Client.call method=', method
, 'args=', args
, ' msg=', cmd
.toMessage()
546 self
.writeMessage(cmd
.toMessage())
548 def createStream(self
):
549 ''' Create a stream on the server side'''
550 stream
= Stream(self
)
551 stream
.id = self
._nextStreamId
552 self
.streams
[stream
.id] = stream
553 self
._nextStreamId
+= 1
557 class Server(object):
558 '''A RTMP server listens for incoming connections and informs the app.'''
559 def __init__(self
, sock
):
560 '''Create an RTMP server on the given bound TCP socket. The server will terminate
561 when the socket is disconnected, or some other error occurs in listening.'''
563 self
.queue
= multitask
.Queue() # queue to receive incoming client connections
564 multitask
.add(self
.run())
567 '''Generator to wait for incoming client connections on this server and return
568 (client, args) or (None, None) if the socket is closed or some error.'''
569 return self
.queue
.get()
574 sock
, remote
= (yield multitask
.accept(self
.sock
)) # receive client TCP
576 if _debug
: print 'rtmp.Server accept(sock) returned None.'
578 if _debug
: print 'connection received from', remote
579 sock
.setsockopt(socket
.IPPROTO_TCP
, socket
.TCP_NODELAY
, 1) # make it non-block
580 client
= Client(sock
, self
, remote
)
582 if _debug
: print 'rtmp.Server exception ', (sys
and sys
.exc_info() or None)
583 traceback
.print_exc()
586 try: self
.sock
.close(); self
.sock
= None
589 yield self
.queue
.put((None, None))
592 class BaseApp(object):
593 '''An application instance containing any number of streams. Except for constructor all methods are generators.'''
596 self
.name
= str(self
.__class
__.__name
__) + '[' + str(App
.count
) + ']'; App
.count
+= 1
597 self
.players
, self
.publishers
, self
._clients
= {}, {}, [] # Streams indexed by stream name, and list of clients
598 if _debug
: print self
.name
, 'created'
600 if _debug
: print self
.name
, 'destroyed'
603 '''everytime this property is accessed it returns a new list of clients connected to this instance.'''
604 return self
._clients
[1:] if self
._clients
is not None else []
606 class App(BaseApp
, SimpleAppHTTPRequestHandler
):
609 class HTTPServer(object):
610 '''A RTMP server to record and stream Flash video.'''
612 '''Construct a new HttpServer. It initializes the local members.'''
613 self
.sock
= self
.server
= None
614 self
.apps
= dict({'*': App
}) # supported applications: * means any as in {'*': App}
615 self
.clients
= dict() # list of clients indexed by scope. First item in list is app instance.
618 def start(self
, host
='0.0.0.0', port
=8080):
619 '''This should be used to start listening for RTMP connections on the given port, which defaults to 8080.'''
620 if _debug
: print 'start', host
, port
622 sock
= self
.sock
= socket
.socket(type=socket
.SOCK_STREAM
)
623 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
624 sock
.bind((host
, port
))
625 if _debug
: print 'listening on ', sock
.getsockname()
627 server
= self
.server
= Server(sock
) # start rtmp server on that socket
628 multitask
.add(self
.serverlistener())
631 if _debug
: print 'stopping Flash server'
632 if self
.server
and self
.sock
:
633 try: self
.sock
.close(); self
.sock
= None
637 def serverlistener(self
):
638 '''Server listener (generator). It accepts all connections and invokes client listener'''
640 while True: # main loop to receive new connections on the server
641 client
, msg
= (yield self
.server
.recv()) # receive an incoming client connection.
642 # TODO: we should reject non-localhost client connections.
643 if not client
: # if the server aborted abnormally,
644 break # hence close the listener.
645 if _debug
: print 'client connection received', client
, msg
646 # if client.objectEncoding != 0 and client.objectEncoding != 3:
647 if client
.objectEncoding
!= 0:
648 yield client
.rejectConnection(reason
='Unsupported encoding ' + str(client
.objectEncoding
) + '. Please use NetConnection.defaultObjectEncoding=ObjectEncoding.AMF0')
649 yield client
.connectionClosed()
651 print "cookies", str(msg
.response_cookies
)
652 session
= msg
.response_cookies
['session'].value
654 print "serverlistener", name
655 if '*' not in self
.apps
and name
not in self
.apps
:
656 yield client
.rejectConnection(reason
='Application not found: ' + name
)
657 else: # create application instance as needed and add in our list
658 if _debug
: print 'name=', name
, 'name in apps', str(name
in self
.apps
)
659 app
= self
.apps
[name
] if name
in self
.apps
else self
.apps
['*'] # application class
660 print "clients", self
.clients
.keys()
661 if session
in self
.clients
:
662 inst
= self
.clients
[session
][0]
665 msg
.server
= inst
# whew! just in time!
667 methodname
= "on%s" % msg
.command
668 print methodname
, dir(inst
)
669 method
= getattr(inst
, methodname
, None)
670 result
= method(client
, msg
)
671 close_connection
= msg
.close_connection
672 print "close connection", close_connection
674 if _debug
: traceback
.print_exc()
675 yield client
.rejectConnection(reason
='Exception on %s' % methodname
)
677 if result
is True or result
is None:
678 if session
not in self
.clients
:
679 self
.clients
[session
] = [inst
]; inst
._clients
=self
.clients
[session
]
682 data
= msg
.wfile
.read()
685 yield client
.writeMessage(data
)
688 print 'close_connection requested'
690 yield client
.connectionClosed()
691 except ConnectionClosed
:
693 print 'close_connection done'
696 print "result", result
697 yield client
.rejectConnection(reason
='Rejected in onConnect')
698 except StopIteration: raise
701 print 'serverlistener exception', \
702 (sys
and sys
.exc_info() or None)
703 traceback
.print_exc()
705 def clientlistener(self
, client
):
706 '''Client listener (generator). It receives a command and invokes client handler, or receives a new stream and invokes streamlistener.'''
709 msg
, arg
= (yield client
.recv()) # receive new message from client
710 if not msg
: # if the client disconnected,
711 if _debug
: print 'connection closed from client'
712 break # come out of listening loop.
713 if msg
== 'command': # handle a new command
714 multitask
.add(self
.clienthandler(client
, arg
))
715 elif msg
== 'stream': # a new stream is created, handle the stream.
717 multitask
.add(self
.streamlistener(arg
))
718 except StopIteration: raise
720 if _debug
: print 'clientlistener exception', (sys
and sys
.exc_info() or None)
721 traceback
.print_exc()
723 # client is disconnected, clear our state for application instance.
724 if _debug
: print 'cleaning up client', client
.path
726 if client
.path
in self
.clients
:
727 inst
= self
.clients
[client
.path
][0]
728 self
.clients
[client
.path
].remove(client
)
729 for stream
in client
.streams
.values(): # for all streams of this client
730 self
.closehandler(stream
)
731 client
.streams
.clear() # and clear the collection of streams
732 if client
.path
in self
.clients
and len(self
.clients
[client
.path
]) == 1: # no more clients left, delete the instance.
733 if _debug
: print 'removing the application instance'
734 inst
= self
.clients
[client
.path
][0]
736 del self
.clients
[client
.path
]
737 if inst
is not None: inst
.onDisconnect(client
)
739 def closehandler(self
, stream
):
740 '''A stream is closed explicitly when a closeStream command is received from given client.'''
741 if stream
.client
is not None:
742 inst
= self
.clients
[stream
.client
.path
][0]
743 if stream
.name
in inst
.publishers
and inst
.publishers
[stream
.name
] == stream
: # clear the published stream
744 inst
.onClose(stream
.client
, stream
)
745 del inst
.publishers
[stream
.name
]
746 if stream
.name
in inst
.players
and stream
in inst
.players
[stream
.name
]:
747 inst
.onStop(stream
.client
, stream
)
748 inst
.players
[stream
.name
].remove(stream
)
749 if len(inst
.players
[stream
.name
]) == 0:
750 del inst
.players
[stream
.name
]
753 def clienthandler(self
, client
, cmd
):
754 '''A generator to handle a single command on the client.'''
755 inst
= self
.clients
[client
.path
][0]
757 if cmd
.name
== '_error':
758 if hasattr(inst
, 'onStatus'):
759 result
= inst
.onStatus(client
, cmd
.args
[0])
760 elif cmd
.name
== '_result':
761 if hasattr(inst
, 'onResult'):
762 result
= inst
.onResult(client
, cmd
.args
[0])
764 res
, code
, args
= Command(), '_result', dict()
765 try: result
= inst
.onCommand(client
, cmd
.name
, *cmd
.args
)
767 if _debug
: print 'Client.call exception', (sys
and sys
.exc_info() or None)
768 code
, args
= '_error', dict()
769 res
.id, res
.name
, res
.type = cmd
.id, code
, (client
.objectEncoding
== 0.0 and Message
.RPC
or Message
.RPC3
)
770 res
.args
, res
.cmdData
= args
, None
771 if _debug
: print 'Client.call method=', code
, 'args=', args
, ' msg=', res
.toMessage()
772 client
.writeMessage(res
.toMessage())
773 # TODO return result to caller
776 def streamlistener(self
, stream
):
777 '''Stream listener (generator). It receives stream message and invokes streamhandler.'''
778 stream
.recordfile
= None # so that it doesn't complain about missing attribute
780 msg
= (yield stream
.recv())
782 if _debug
: print 'stream closed'
783 self
.closehandler(stream
)
786 multitask
.add(self
.streamhandler(stream
, msg
))
788 def streamhandler(self
, stream
, message
):
789 '''A generator to handle a single message on the stream.'''
791 if message
.type == Message
.RPC
:
792 cmd
= Command
.fromMessage(message
)
793 if _debug
: print 'streamhandler received cmd=', cmd
794 if cmd
.name
== 'publish':
795 yield self
.publishhandler(stream
, cmd
)
796 elif cmd
.name
== 'play':
797 yield self
.playhandler(stream
, cmd
)
798 elif cmd
.name
== 'closeStream':
799 self
.closehandler(stream
)
800 else: # audio or video message
801 yield self
.mediahandler(stream
, message
)
802 except GeneratorExit
: pass
803 except StopIteration: pass
805 if _debug
: print 'exception in streamhandler', (sys
and sys
.exc_info())
807 def publishhandler(self
, stream
, cmd
):
808 '''A new stream is published. Store the information in the application instance.'''
810 stream
.mode
= 'live' if len(cmd
.args
) < 2 else cmd
.args
[1] # live, record, append
811 stream
.name
= cmd
.args
[0]
812 if _debug
: print 'publishing stream=', stream
.name
, 'mode=', stream
.mode
813 inst
= self
.clients
[stream
.client
.path
][0]
814 if (stream
.name
in inst
.publishers
):
815 raise ValueError, 'Stream name already in use'
816 inst
.publishers
[stream
.name
] = stream
# store the client for publisher
817 result
= inst
.onPublish(stream
.client
, stream
)
819 if stream
.mode
== 'record' or stream
.mode
== 'append':
820 stream
.recordfile
= FLV().open(getfilename(stream
.client
.path
, stream
.name
, self
.root
), stream
.mode
)
821 response
= Command(name
='onStatus', id=cmd
.id, args
=[dict(level
='status', code
='NetStream.Publish.Start', description
='', details
=None)])
822 yield stream
.send(response
)
823 except ValueError, E
: # some error occurred. inform the app.
824 if _debug
: print 'error in publishing stream', str(E
)
825 response
= Command(name
='onStatus', id=cmd
.id, args
=[dict(level
='error',code
='NetStream.Publish.BadName',description
=str(E
),details
=None)])
826 yield stream
.send(response
)
828 def playhandler(self
, stream
, cmd
):
829 '''A new stream is being played. Just updated the players list with this stream.'''
830 inst
= self
.clients
[stream
.client
.path
][0]
831 name
= stream
.name
= cmd
.args
[0] # store the stream's name
832 start
= cmd
.args
[1] if len(cmd
.args
) >= 2 else -2
833 if name
not in inst
.players
:
834 inst
.players
[name
] = [] # initialize the players for this stream name
835 if stream
not in inst
.players
[name
]: # store the stream as players of this name
836 inst
.players
[name
].append(stream
)
837 path
= getfilename(stream
.client
.path
, stream
.name
, self
.root
)
838 if os
.path
.exists(path
):
839 stream
.playfile
= FLV().open(path
)
840 multitask
.add(stream
.playfile
.reader(stream
))
841 if _debug
: print 'playing stream=', name
, 'start=', start
842 result
= inst
.onPlay(stream
.client
, stream
)
843 response
= Command(name
='onStatus', id=cmd
.id, args
=[dict(level
='status',code
='NetStream.Play.Start', description
=stream
.name
, details
=None)])
844 yield stream
.send(response
)
846 def mediahandler(self
, stream
, message
):
847 '''Handle incoming media on the stream, by sending to other stream in this application instance.'''
848 if stream
.client
is not None:
849 inst
= self
.clients
[stream
.client
.path
][0]
850 result
= inst
.onPublishData(stream
.client
, stream
, message
)
852 client
= stream
.client
853 for s
in (inst
.players
.get(stream
.name
, [])):
854 # if _debug: print 'D', stream.name, s.name
855 result
= inst
.onPlayData(s
.client
, s
, message
)
857 yield s
.send(message
)
858 if stream
.recordfile
is not None:
859 stream
.recordfile
.write(message
)
861 # The main routine to start, run and stop the service
862 if __name__
== '__main__':
864 from optparse
import OptionParser
865 parser
= OptionParser()
866 parser
.add_option('-i', '--host', dest
='host', default
='0.0.0.0', help="listening IP address. Default '0.0.0.0'")
867 parser
.add_option('-p', '--port', dest
='port', default
=8080, type="int", help='listening port number. Default 8080')
868 parser
.add_option('-r', '--root', dest
='root', default
='./', help="document root directory. Default './'")
869 parser
.add_option('-d', '--verbose', dest
='verbose', default
=False, action
='store_true', help='enable debug trace')
870 (options
, args
) = parser
.parse_args()
872 _debug
= options
.verbose
874 if _debug
: print time
.asctime(), 'Options - %s:%d' % (options
.host
, options
.port
)
876 agent
.root
= options
.root
877 agent
.start(options
.host
, options
.port
)
878 if _debug
: print time
.asctime(), 'Flash Server Starts - %s:%d' % (options
.host
, options
.port
)
880 except KeyboardInterrupt:
881 print traceback
.print_exc()
885 print traceback
.print_exc()
886 if _debug
: print time
.asctime(), 'Flash Server Stops'