# Copyright (C) The Dojo Foundation 2006-2007
# All rights reserved.
#
# Distributed under the terms of the BSD License
import twisted
import twisted.web2
from twisted.web2 import http, resource, channel, stream, server, static, http_headers, responsecode
from twisted.python import log
from twisted.internet import reactor
from twisted.application import service, strports
from path import path
import re
import os
import md5
import time
import simplejson
import string
import base64
import types
import weakref
"""
The cometd modules provides a twisted.web2.resource.Resource endpoint which
acts as a generalized multi-endpoint event router that speaks the cometd JSON
message format and protocol on the wire.
"""
# constants and configuration
verbose = True
# auto-generated configuration
tmp = md5.new()
tmp.update(str(time.ctime()))
mimeBoundary = tmp.hexdigest()
errorHttpCode = 406
# FIXME: implement advices!
# FIXME: need to implement resource constraints
# FIXME: these should probably be pulled from a file or directory containing
# files and read at server startup. Hard-coding them here is fine for
# development but just won't do for deployment.
ConnectionTypes = {
"iframe": {
"closeOnDelivery": False,
"preamble": """
cometd: The Long Tail of Comet
""",
"envelope": """
""" + (" " * 2048), # double this if it's not working
"keepalive": "
" + (" " * 2048),
"signoff": """
""",
# this tunnelInit is borrowed from Dojo
"tunnelInit": """
cometd: The Long Tail of Comet
cometd: The Long Tail of Comet
""",
"contentType": "text/html"
},
"callback-polling": {
# NOTE: the "callback-polling" method can be used via ScriptSrcIO for
# x-domain polling
"closeOnDelivery": True,
"preamble": "",
# "envelope": "cometd.deliver(%s);",
"envelope": "(%s)",
"keepalive": " ",
"signoff": "",
"tunnelInit": "",
"contentType": "text/javascript"
},
"long-polling": {
"closeOnDelivery": True,
"preamble": "",
"envelope": "%s",
"keepalive": "",
"signoff": "",
"tunnelInit": "",
"contentType": "text/plain"
},
"ie-message-block": {
"closeOnDelivery": False,
"preamble": """
""",
"envelope": "",
"keepalive": "",
"signoff": """
""",
"tunnelInit": "",
"contentType": "text/xml"
},
"mime-message-block": {
"closeOnDelivery": False,
"preamble": "--"+mimeBoundary+"\r\n",
"envelope": """Content-Type: text/plain\r\n\r\n
%s
\r\n
--"""+mimeBoundary+"\r\n",
"signoff": "\n--"+mimeBoundary+"--\n",
"tunnelInit": "",
"contentType": "multipart/x-mixed-replace; boundary=%s" % (mimeBoundary,)
},
# failed experiment
# "preamble": "",
# "envelope": "\n--"+mimeBoundary+"""
#Content-Type: text/plain\n\n
#%s
#
#""",
# --"""+mimeBoundary+"\n",
# "text-stream": {
# "closeOnDelivery": False,
# "preamble": "--"+mimeBoundary+"\r\n",
# "envelope": """\r\n%s\r\n--"""+mimeBoundary+"\r\n"+(1024*" "),
# "signoff": "\r\n--"+mimeBoundary+"--\r\n",
# "tunnelInit": "",
# "contentType": "text/plain"
# },
# "flash": {
# "closeOnDelivery": False,
# "preamble": "",
# "envelope": "",
# "signoff": "",
# "tunnelInit": "",
# "contentType": "text/xml"
# }
}
# need to specify http-polling for entirely disconnected clients!
SupportedConnectionTypes = [
"callback-polling", "long-polling",
"mime-message-block", "iframe",
# "ie-message-block", # doesn't really work?
]
def getIdStr(length=32):
if os.__dict__.has_key("urandom"): # python 2.4
return base64.encodestring(os.urandom(length))[:-1]
else: # python 2.3 on unix-like systems
return base64.encodestring(open("/dev/urandom").read(length))[:-1]
def getTimestamp():
# YYYY-MM-DDThh:mm:ss.ss
# FIXME: need to add accurate sub-second time-stamp data!
# NOTE: see http://feedparser.org/docs/date-parsing.html for background
# FIXME: should we add a "Z" at the end for RFC 3339 compat?
return time.strftime("%Y-%m-%dT%H:%M:%S.00", time.gmtime())
class Connection:
"""
The cometd Connecton class is responsible for a logical connection between
a client and a server. This is *NOT* implemented as a
twisted.internet.protocol.Protocol subclass due to the seeming magicness of
Protocol instances and the Factories that create/use them. Instead, the
Connection class manages one (or more) instances of stream.ProducerStream
instances which constitute output buffers for the connection.
The Connection class knows how to set up and tear down streams, register
and unregister with a connection registry, and communicate with the event
router.
"""
# an incrementing ID basis for connections
counter = 0
def __init__(self, request, message, client):
"""
Initializing a cometd Connection object creates a "stream" object
that all delivered events are passed down to. Encapsulating this allows
our Connection to handle multiple requests over its lifetime.
"""
# should be set or re-set depending on what type of request is coming in
self.client = client
self.endpointId = 0
# we fall back to polling if otherwise unspecified
self.connectionType = message["connectionType"]
if verbose:
log.msg("****************************************************")
log.msg("connectionType:", self.connectionType);
log.msg("****************************************************")
self.ctypeProps = ConnectionTypes[self.connectionType]
self.contentType = self.ctypeProps["contentType"]
self.backlog = []
self.stream = stream.ProducerStream()
self.id = getIdStr()
self.jsonp = False
self.jsonpCallback = None
self.initFromRequest(request, message)
def initFromRequest(self, request, message):
# if the /connect endpoint is our originator, we need to return with
# the correct header and initialization message
resp = {
"channel": "/meta/connect",
"successful": True,
"error": self.client.lastError,
"authToken": self.client.authToken,
"clientId": self.client.id,
"advice": {
"reconnect": "retry"
}
}
if verbose: log.msg(self.ctypeProps["preamble"])
self.stream.write(self.ctypeProps["preamble"])
if "jsonp" in request.args: # FIXME: hack!
self.jsonp = True
self.jsonpCallback = request.args["jsonp"][0]
self.deliver(resp)
def deliver(self, message=None):
# should this be using twisted.internet.reactor.callLater(seconds,
# callback) to actually preform the writes?
delivered = False
if message is not None:
self.backlog.append(message)
if not self.stream.closed:
if len(self.backlog):
delivered = True
# log.msg(
# self.ctypeProps["envelope"] % (
# simplejson.dumps(self.backlog[0]),
# )
# )
if self.jsonp: # FIXME: hack!
self.stream.write(self.jsonpCallback+"(")
self.stream.write(
self.ctypeProps["envelope"] % (
simplejson.dumps(self.backlog),
)
)
self.backlog = []
if self.jsonp:
self.stream.write(");")
if self.ctypeProps["closeOnDelivery"] and delivered is not False:
self.stream.finish()
def reopen(self, request, message):
if self.stream.closed:
if "jsonp" in request.args: # FIXME: hack!
self.jsonp = True
self.jsonpCallback = request.args["jsonp"][0]
self.stream = stream.ProducerStream()
self.deliver()
class Client:
def __init__(self, id=None, authSuccessful=False, authToken=None, lastError=""):
self.connection = None
self.id = id
if not self.id:
self.id = getIdStr()
self.authSuccessful = authSuccessful
self.authToken = authToken
self.lastError = lastError
def setConnection(self, conn):
self.connection = conn
def __del__(self):
del self.connection
def buildResponse(data, code=200, type="text/html", headers={}):
respStream = None
if isinstance(data, str):
respStream = stream.MemoryStream(data)
else:
respStream = data
parts = type.split("/", 1)
thead = http_headers.Headers()
thead.addRawHeader("Content-Type", type)
for name in headers:
thead.addRawHeader(name, headers[name])
return http.Response(code, stream=respStream, headers=thead)
class cometd(resource.PostableResource):
version = 1.0
minimumVersion = 1.0
def __init__(self, credChecker=None):
self.credChecker = credChecker
self.clients = {}
self.subscriptions = { }
# "__cometd_subscribers": {}
# FIXME: we need to implement client culling. Choices are between keeping a
# list of update times and associated clients, but that requires a
# per-message, per-client update to the state to remove the clients from
# the "below the line" state.
############################################################################
# UTILITY METHODS
############################################################################
############################################################################
# resource.Resource METHODS
############################################################################
def locateChild(self, request, segments):
# when we're reached, switch immediately to render mode
if verbose:
log.msg(request)
log.msg(segments)
return (self, server.StopTraversal)
def render(self, request):
"""
parse the request, dispatching it to the event router as necessaray and
returning errors where appropriate
"""
if verbose: log.msg("----------------------- render -----------------------------")
messages = None
resp = []
# we'll get called as the result of a post or get
if verbose: log.msg(request.args)
# if we get a tunnelInit request in the form of:
# http://blah.endpoint.com/cometd/?tunnelInit=iframe&domain=endpoint.com
# just pass back a MemoryStream that has the right junk
if request.args.has_key("tunnelInit") and \
ConnectionTypes.has_key(request.args["tunnelInit"][0]):
resp = ConnectionTypes[request.args["tunnelInit"][0]]["tunnelInit"]
# log.msg(resp)
# FIXME: should we be getting the content type from the
# ConnectionTypes object?
return buildResponse(resp)
# otherwise if we got a "message" parameter, deserialize it
if request.args.has_key("message"):
try:
# log.msg(request.args["message"][0])
messages = simplejson.loads(request.args["message"][0])
except ValueError:
if verbose: log.msg("message parsing error")
return buildResponse("message not valid JSON", errorHttpCode, "text/plain")
else:
return buildResponse("no message provided. Please pass a message parameter to cometd", 400)
ctr = 0
if verbose: log.msg("messages:", type(messages), ":", simplejson.dumps(messages))
while len(messages):
if verbose: log.msg(len(messages))
m = messages.pop(0)
if verbose:
log.msg("tmp message:", type(m), ":", simplejson.dumps(m))
if not isinstance(m, dict):
continue
"""
if isinstance(m, types.StringTypes):
log.msg(m)
break
if isinstance(m, types.NoneType):
continue
"""
if not m.has_key("channel"):
resp.append({"error":"invalid message passed"})
break
# continue
chan = m["channel"]
if chan == "/meta/handshake" and ctr == 0:
# looks like we'll need to create a Connection
return self.initHandshake(request, m)
elif chan == "/meta/connect" and ctr == 0:
# finish connection initialization!
return self.connect(request, m)
elif chan == "/meta/reconnect" and ctr == 0: # FIXME: legacy!
return self.connect(request, m)
elif chan == "/meta/subscribe":
resp.append(self.subscribe(request, m))
elif chan == "/meta/unsubscribe":
resp.append(self.unsubscribe(request, m))
else:
# otherwise we're publishing. Route the message to listeners
resp.append(self.route(request, m))
# FIXME: implement /meta/ping and /meta/status !!
ctr += 1
# FIXME:
# need to determine here if/how we should be delivering back on
# an open connection channel if one was pre-existing
return buildResponse(simplejson.dumps(resp), type="text/plain")
############################################################################
# PROTOCOL METHODS
############################################################################
def initHandshake(self, request, message):
# handle initial auth and create a Connection that subsequent
# /meta/connect messages can talk to
# tell the client what we can and can't support:
# FIXME: is there a way to keep from re-defining/copying this data
# structure?
resp = {
"channel": "/meta/handshake",
"version": self.version,
"minimumVersion": self.minimumVersion,
"supportedConnectionTypes": SupportedConnectionTypes
}
client = self.checkHandshakeAuth(request, message)
resp["clientId"] = client.id
resp["successful"] = client.authSuccessful
resp["authToken"] = client.authToken
resp["error"] = client.lastError
if message["id"] is not None:
resp["id"] = message["id"]
rstr = simplejson.dumps([ resp ])
if verbose: log.msg("initHandshake response:", rstr)
# accomidation for JSONP handshakes
if "jsonp" in request.args:
rstr = request.args["jsonp"][0]+"("+rstr+");"
return buildResponse(rstr, type="text/plain")
# FIXME: should we look into using twisted.cred here to handle auth types?
def checkHandshakeAuth(self, request, message):
(success, token, error) = self.checkCredentials(request, message)
# FIXME: mst suggests using something like md5(clientIP, localMAC, time) instead
if success:
client = Client(authSuccessful=success, authToken=token, lastError=error)
self.clients[client.id] = client
return client
def checkCredentials(self, request, message):
# return's a tuple with the form:
# (success, token, error)
#
# FIXME: plug in auth check here!
if self.credChecker is not None:
return self.credChecker.checkCredentials(request, message)
return (True, None, None)
def _checkClient(self, request, message):
if "clientId" not in message or \
message["clientId"] not in self.clients:
return False
else:
return True
def _sanityCheckConnection(self, request, message):
isSane = True
errorResp = None
error = ""
# sanity check the connection request
if "connectionType" not in message or \
message["connectionType"] not in SupportedConnectionTypes:
isSane = False
error = "invalid connectionType requested"
elif not self._checkClient(request, message):
isSane = False
error = "invalid clientId provided"
# log.msg(message["clientId"])
if not isSane:
resp = simplejson.dumps({ "error": str(error) })
errorResp = buildResponse(resp, 500, "text/plain")
return (isSane, errorResp)
def connect(self, request, message):
"""
Create a new connection object for the client if one does not already
exist. Otherwise, handles wait state for existing connection.
"""
(isSane, errorResp) = self._sanityCheckConnection(request, message)
if not isSane: return errorResp
clientId = message["clientId"]
client = self.clients[clientId]
( client.authSuccessful,
client.authToken,
client.lastError ) = self.checkCredentials(request, message)
if not client.authSuccessful:
# auth failure, nuke the client from the list
del self.clients[clientId]
resp = simplejson.dumps({ "error": client.error })
if verbose:
log.msg(resp)
return buildResponse(resp, 500, "text/plain")
if client.connection is not None:
# if verbose:
# log.msg("from a reconnect!")
client.connection.reopen(request, message)
else:
# if the request is sane and valid, set up a new Connection object
# which will initiate the response and manage it from here on out
conn = Connection(request, message, client)
client.setConnection(conn)
return buildResponse(client.connection.stream, type=client.connection.contentType)
def subscribe(self, request, message):
# get the client and the channel here
# self._subscribe()
if not self._checkClient(request, message):
# FIXME: we should probably send advice here instead of just raw failure
resp = { "error": "invalid clientId provided" }
if verbose:
log.msg(simplejson.dumps(resp))
return resp
client = self.clients[message["clientId"]]
if verbose: log.msg(client)
# FIXME: need to switch to using globbing subscribe!
# self._subscribe(client, message["subscription"])
self._globbing_subscribe(client, message["subscription"])
# FIXME: hoist template object to top level to avoid redef
resp = {
"channel": "/meta/subscribe",
"subscription": message["channel"],
"successful": True
# "authToken": "SOME_NONCE"
}
if message["id"] is not None:
resp["id"] = message["id"]
# FIXME: should we be calling client.deliver and having *that* dispatch
# down to the correct connection object?
client.connection.deliver(resp)
# return { "successful": True }
return resp
def unsubscribe(self, request, message):
if not self._checkClient(request, message):
resp = { "error": "invalid clientId provided" }
return resp
client = self.clients[message["clientId"]]
if verbose: log.msg(client)
self._globbing_unsubscribe(client, message["subscription"])
# FIXME: hoist template object to top level to avoid redef
resp = {
"channel": "/meta/unsubscribe",
"subscription": message["channel"],
"successful": True
}
if message["id"] is not None:
resp["id"] = message["id"]
# log.msg(simplejson.dumps(resp))
client.connection.deliver(resp)
# return { "successful": True }
return resp
def route(self, request, message):
return self._globbing_route(request, message)
def _globbing_unsubscribe(self, client, chan):
"remove a subscription"
cparts = chan.split("/")[1:]
if verbose: log.msg(cparts)
root = self.subscriptions
for part in cparts: # create parts of the topic tree that don't yet exist
if not part in root:
return
root = root[part]
del root["__cometd_subscribers"][client.id]
def _globbing_subscribe(self, client, chan):
"set up a subscription"
# the channels data structure is a tree, relying on the speed of Python
# dictionary lookups to quickly return a list of interested clients.
# NOTE: while we do "glob", it's only on terminal path components
cparts = chan.split("/")[1:]
if verbose: log.msg(cparts)
root = self.subscriptions
for part in cparts: # create parts of the topic tree that don't yet exist
if not part in root:
if verbose:
log.msg("creating part: ", part)
root[part] = { "__cometd_subscribers": None }
# root[part] = weakref.WeakValueDictionary()
root[part]["__cometd_subscribers"] = weakref.WeakValueDictionary()
root = root[part]
root["__cometd_subscribers"][client.id] = client
def _globbing_route(self, request, message):
"""
Event routing and delivery. The guts of cometd.
"""
success = {
"successful": True,
"channel": message["channel"]
}
if message["id"] is not None:
success["id"] = message["id"]
cparts = message["channel"].split("/")[1:]
if verbose: log.msg(cparts)
root = self.subscriptions
# FIXME: need to implement the "**" glob
for part in cparts: # FIXME: is iteration order garunteed?
if "*" in root:
# log.msg("delivering to wildcard subscribers")
subs = root["*"]["__cometd_subscribers"]
for client in subs:
if verbose: log.msg(client)
# FIXME: check for "openness"?
subs[client].connection.deliver(message)
if not part in root:
if verbose: log.msg("no part:", part, "matches for delivery")
return success
root = root[part]
subs = root["__cometd_subscribers"]
for client in subs:
subs[client].connection.deliver(message)
return success
# FIXME: aggregate ACKs!
###########################################################################
# old code, unused now
###########################################################################
def _subscribe(self, client, chan):
"set up a subscription"
# the channels data structure is a flat map, with each value being map of subsribed clients
# dictionary lookups to quickly return a list of interested clients.
# NOTE: we are not currently supporting the "*" glob operator in channels
cparts = chan.split("/")[1:]
if verbose: log.msg(cparts)
root = self.subscriptions
if not chan in root:
if verbose:
log.msg("creating channel: ", chan)
root[chan] = weakref.WeakValueDictionary()
root[chan][client.id] = client
def _flat_route(self, request, message):
"""
Event routing and delivery. The guts of cometd.
"""
if verbose: log.msg(message["channel"])
root = self.subscriptions
if root.has_key(message["channel"]):
subs = root[message["channel"]]
for client in subs:
subs[client].connection.deliver(message)
return { "successful": True }
# vim:ts=4:noet: