清单 6. Cometd 前端发布消息
dojox.cometd.publish("/service/echo", { msg: msg });
|
这里的所谓“发布消息”就是向后端发送消息,用于前端主动向后端推送。
这里的第一个参数是发送消息的渠道标识(channel),这种“channel”共有三种类型:
1. 元渠道(meta channels):示例“/meta/connect”(通常以“/meta/”为开头)。元渠道主要不是用来消息传输,而是用于客户端监听,如握手连 接或者网络连接等等的错误。通常我们会在客户端调用“addListener()”来开启监听元渠道,它可以在握手连接的建立之前就开启监听,而且这种消 息监听是同步的。
2. 服务渠道(service channels):示例“/service/connect”(通常以“/service/”为开头)。它主要用于私有消息通讯,主要是一对一的通讯。 通常我们会在客户端调用“subscribe()”来订阅服务渠道消息。服务渠道只有等握手连接建立好后才能开启,而且它是异步通讯的。
3. 普通渠道(normal channels):示例“/foo/bar”(无限制)。这种渠道没有什么限制,主要用于广播消息,即:多个客户端订阅了一个服务,该服务可以通过普通渠道进行消息广播。
渠道是通信的基础模式,我们可以根据需要选择相应的渠道模式。
第二个参数则是消息对象,这里的“msg”则是消息内容。
有一点要注意:这里的“publish”是基于 Bayeux 协议的,采用的异步消息传输机制,所以它是在服务端(Bayeux 服务器)收到消息之前就返回的。所以 publish 的返回并不代表服务端收到你 publish 的消息了。
Dojo 的 Cometd 还支持批量发送消息,通过这个接口可以有效地避免不必要的网络消息传输的浪费:
清单 7. Cometd 前端批量发布消息
// 方法 1
cometd.batch(function()
{
cometd.publish('/channel1', { product: 'foo' });
cometd.publish('/channel2', { notificationType: 'all' });
cometd.publish('/channel3', { update: false });
});
// 方法 2
cometd.startBatch()
cometd.publish('/channel1', { product: 'foo' });
cometd.publish('/channel2', { notificationType: 'all' });
cometd.publish('/channel3', { update: false });
cometd.endBatch()
|
上述两种方案都可以实现消息的批量发送,推荐使用方法 1。
接下来我们看看服务端的消息推送:
清单 8. Cometd 前端订阅消息
dojox.cometd.subscribe("/service/echo",echoRpcReturn);
function echoRpcReturn(msg){
dojo.byId("responses").innerHTML += msg;
}
|
这里所谓的“订阅消息”,其实就是接收服务端推送的消息,是后端主动向前端推送。这也是服务端推送的精华所在,同样也是很简单的一行代码。
这里我们看到了一个熟悉的方法 --- “subscribe”,之前我们已经介绍过了,它主要用于订阅服务渠道私有消息,这里就是它用法的一个示例。对应的服务端 Service 向对应的前端订阅者推送消息,这里就是通过“echo”渠道向前端推送消息,他会回调“echoRpcReturn”方法,并传入推送的消息作为实参。对 于后端的每次推送,都会调用前端的“echoRpcReturn”方法。
Dojo 的 Cometd 工具包之后端
Dojo 的 Cometd 工具包的后端实现是基于 Java 和 Jetty 组件的,通过 Dojo 的服务端 Cometd 组件,我们同样能极其迅速的构建 Cometd 框架。我们需要做的仅仅是加入我们的业务逻辑代码即可。
先来看看 web.xml 的配置参数:
清单 9. 基本配置参数(web.xml)
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
version="2.5">
<servlet>
<servlet-name>cometd</servlet-name>
<servlet-class>
org.cometd.server.continuation.ContinuationCometdServlet
</servlet-class>
<init-param>
<param-name>timeout</param-name>
<param-value>60000</param-value>
</init-param>
</servlet>
<servlet-mapping>
<servlet-name>cometd</servlet-name>
<url-pattern>/cometd/*</url-pattern>
</servlet-mapping>
<filter>
<filter-name>cross-origin</filter-name>
<filter-class>org.eclipse.jetty.servlets.CrossOriginFilter</filter-class>
</filter>
<filter-mapping>
<filter-name>cross-origin</filter-name>
<url-pattern>/cometd/*</url-pattern>
</filter-mapping>
</web-app>
|
这里我们先来看看“ContinuationCometdServlet”,这个 Servlet 主要用于解释 Bayeux 协议,所以关于它的配置是必须的。基于“ContinuationCometdServlet”的其他配置参数还有很多,如:
Timeout:长轮询的过期时间。如果超过这个时间还没有客户端消息,服务端会推送一个空消息。
Interval:轮询间隔时间。客户端结束前一个请求到发送下一个请求之间的间隔时间。
maxInterval:服务端最长等待时间。即:建立连接时,如果超过这个时间仍没有接到一个新的长轮询连接请求,服务端就会认为该客户端无效或者关闭了。
logLevel:日志级别。“0 = warn, 1 = info, 2 = debug”。
以上是主要的配置参数,其余的配置参数还有很多,这里不一一介绍,有需要的读者可以查阅 Dojo 的帮助文档。另外,最后几行我们还配置了一个“cross-origin”,对应着“CrossOriginFilter”类,他用于支持跨域的 JavaScript 请求,如果您的项目中要支持跨域的服务器推送,请加入该配置。
接下来我们再来看看一些高级配置参数:
清单 10. 高级配置参数(web.xml)
<servlet>
<servlet-name>cometd</servlet-name>
<servlet-class>org.cometd.java.annotation.AnnotationCometdServlet</servlet-class>
<init-param>
<param-name>logLevel</param-name>
<param-value>1</param-value>
</init-param>
<init-param>
<param-name>services</param-name>
<param-value>org.cometd.examples.ChatService</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>cometd</servlet-name>
<url-pattern>/cometd/*</url-pattern>
</servlet-mapping>
<servlet>
<servlet-name>cometdDemo</servlet-name>
<servlet-class>org.cometd.examples.CometdDemoServlet</servlet-class>
<load-on-startup>2</load-on-startup>
</servlet>
|
这里我们主要要注意三个地方:
1. “CometdDemoServlet”:它是用于启动服务端 Cometd 框架的 Servlet,我们在后面会介绍。由于他配置了“load-on-startup”参数,所以在服务容器启动的时候,我们的 Cometd 服务端就已经搭建好了,之后我们会着重介绍他的“init”方法中的行为。
2. “AnnotationCometdServlet”:这个 Servlet 配置在这里表示了我们在服务端代码是基于 annotation 的。这是一个非常实用的 Servlet,通过这个 Servlet,你会发现,我们要做的事情仅仅是定义几个 Service 类,实现其中的几个方法即可。连很多调用 Cometd 框架 API 接口的代码都省去了。
3. “ChatService”:这里声明了一个 Service 类,他的用途是处理服务渠道的消息。这里声明的作用等同于代码中的“processor.process(new ChatService())”。
配置完成后,我们接下来可以看看代码了。通过以上的配置之后,你会发现,我们接下来要写的代码非常简单精炼:
清单 11. 服务类初始化 init
public void init() throws ServletException
{
final BayeuxServerImpl bayeux =
(BayeuxServerImpl)getServletContext().getAttribute(BayeuxServer.ATTRIBUTE);
if (bayeux==null)
throw new UnavailableException("No BayeuxServer!");
.................
// 创建扩展点
bayeux.addExtension(new TimesyncExtension());
bayeux.addExtension(new AcknowledgedMessagesExtension());
// 设定握手连接权限
bayeux.getChannel(ServerChannel.META_HANDSHAKE).addAuthorizer(
GrantAuthorizer.GRANT_PUBLISH);
// 启动服务渠道
ServerAnnotationProcessor processor = new ServerAnnotationProcessor(bayeux);
processor.process(new EchoRPC());
processor.process(new Monitor());
//processor.process(new ChatService());
bayeux.createIfAbsent("/foo/bar/baz",new ConfigurableServerChannel.Initializer()
{
public void configureChannel(ConfigurableServerChannel channel)
{
channel.setPersistent(true);
}
});
if (bayeux.getLogger().isDebugEnabled())
System.err.println(bayeux.dump());
.................
}
|
这里我们介绍三个知识点:
1. Extension:Extension 是一个函数,它会在消息发出之前或者收到之后被调用,专门用来修改消息内容,例如加入一些特殊属性(这些属性多在消息的 ext 属性中)。注意,这些属性大多是应用无关的,如记录长轮询的次数等等。这里的“TimesyncExtension”和“AcknowledgedMessagesExtension”是两个比较常用的 Extension:
-
1) “Timesync Extension”用于计算客户端事件和服务端时间的偏差。客户端需要同时引入“dojox.cometd.timesync”类,该 Extension 使得客户端和服务端在每次握手或者连接的时候能够互相交换各自的时钟信息,这也是的客户端可以很精确的计算出他与服务端时钟的偏移量。消息格式如下:
{ext:{timesync:{tc:12345567890,ts:1234567900,p:123,a:3},...},...}
TC:客户端发消息的时间(距离 1970 年 1 月号的时长,单位为毫秒)
TS:服务端收到消息的时间
-
2) “Acknowledge Extension”用于提供可靠的顺序消息机制。一旦加入了“Acknowledge Extension”,服务端会阻截非长轮询的客户端请求,这样会使你的服务器更加的高效。注意:客户端需要同时引入“dojox.cometd.ack”类与其协同工作。
2. Authorizer:设定握手连接权限,这里设定值为“GrantAuthorizer.GRANT_PUBLISH”,表示允许所有客户端建立握手连接。
3. Process Service:启动服务渠道“processor.process(new EchoRPC())”。通过这些服务渠道类,我们可以启动服务渠道处理客户端请求。这是我们服务端推送技术的关键所在,我们的业务逻辑代码也是主要放在 这些服务渠道类里面。
接下来我们来看看这些服务渠道类的具体实现:
清单 12. Echo Service 实现
@Service("echo")
public static class EchoRPC
{
@Session
private ServerSession _session;
@SuppressWarnings("unused")
@Configure("/service/echo")
private void configureEcho(ConfigurableServerChannel channel)
{
channel.addAuthorizer(GrantAuthorizer.GRANT_SUBSCRIBE_PUBLISH);
}
@Listener("/service/echo")
public void doEcho(ServerSession session, ServerMessage message)
{
Map<String,Object> data = message.getDataAsMap();
Log.info("ECHO from "+session+" "+data);
for(int i = 0; i < 50; i++){
session.deliver(_session, message.getChannel(), data, null);
}
}
}
|
我们可以在“configureEcho”里面设定该服务渠道支持的权限。我们主要来看看“doEcho”方法,它被标识为“@Listener("/service/echo")”,所以它可以用于像客户端推送服务渠道为“echo”的消息,我们之前客户端代码示例里面的如 下代码:“dojox.cometd.subscribe("/service/echo",echoRpcReturn)”就是专门用于处理这里服务渠 道推送的消息,消息推送通过“deliver”方法,推送的消息信息放在“data”实参中。
再来看看 Monitor 类:
清单 13. Monitor Service 实现
@Service("monitor")
public static class Monitor
{
@Listener("/meta/subscribe")
public void monitorSubscribe(ServerSession session, ServerMessage message)
{
Log.info("Monitored Subscribe from "+session+" for "
+message.get(Message.SUBSCRIPTION_FIELD));
}
@Listener("/meta/unsubscribe")
public void monitorUnsubscribe(ServerSession session, ServerMessage message)
{
Log.info("Monitored Unsubscribe from "+session+" for "
+message.get(Message.SUBSCRIPTION_FIELD));
}
@Listener("/meta/*")
public void monitorMeta(ServerSession session, ServerMessage message)
{
if (Log.isDebugEnabled())
Log.debug(message.toString());
}
}
|
Monitor 渠道类与之前的 Echo 服务渠道类比较类似,不过它主要用于处理 meta 渠道,与业务逻辑无关。
最后,我们来看看被注释掉的“ChatService”类,他也可以通过“processor.process(new ChatService())”来启用,但是我们这里用了一个更为简单的方法:直接配置在 web.xml 文件中:
清单 14. ChatService 的配置
<servlet>
...............
<init-param>
<param-name>services</param-name>
<param-value>org.cometd.examples.ChatService</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
|
细心的读者可能在之前的代码示例中已经看到,这里就是通过配置的方式加载服务渠道类。参考以下具体实现的代码:
清单 15. ChatService 实现
@Service("chat")
public class ChatService
{
..........................................
@Listener("/service/members")
public void handleMembership(ServerSession client, ServerMessage message)
{
Map<String, Object> data = message.getDataAsMap();
final String room = ((String)data.get("room")).substring("/chat/".length());
Map<String, String> roomMembers = _members.get(room);
if (roomMembers == null)
{
Map<String, String> new_room = new ConcurrentHashMap<String, String>();
roomMembers = _members.putIfAbsent(room, new_room);
if (roomMembers == null) roomMembers = new_room;
}
final Map<String, String> members = roomMembers;
String userName = (String)data.get("user");
members.put(userName, client.getId());
client.addListener(new ServerSession.RemoveListener()
{
public void removed(ServerSession session, boolean timeout)
{
members.values().remove(session.getId());
broadcastMembers(room,members.keySet());
}
});
broadcastMembers(room,members.keySet());
}
private void broadcastMembers(String room, Set<String> members)
{
// Broadcast the new members list
ClientSessionChannel channel =
_session.getLocalSession().getChannel("/members/"+room);
channel.publish(members);
}
..........................................
@Listener("/service/privatechat")
protected void privateChat(ServerSession client, ServerMessage message)
{
Map<String,Object> data = message.getDataAsMap();
String room = ((String)data.get("room")).substring("/chat/".length());
Map<String, String> membersMap = _members.get(room);
if (membersMap==null)
{
Map<String,String>new_room=new ConcurrentHashMap<String, String>();
membersMap=_members.putIfAbsent(room,new_room);
if (membersMap==null)
membersMap=new_room;
}
String[] peerNames = ((String)data.get("peer")).split(",");
ArrayList<ServerSession> peers = new ArrayList<ServerSession>(peerNames.length);
.................
}
}
|
以上是摘录部分 ChatService 实现代码,它主要是实现一个在线的聊天室,包括公开发言和私有(1 对 1)聊天等等功能,它的实现方式与之前的 Echo 和 Monitor 类似,这里不做详述,有兴趣的读者可以参考一下他的实现,来构造自己的服务器推送应用。
服务器推送技术之比较
其实有很多种方式实现服务器推送,它们各有各的优缺点:
- 传统轮询:此方法是利用 HTML 里面 meta 标签的刷新功能,在一定时间间隔后进行页面的转载,以此循环往复。它的最大缺点就是页面刷性给人带来的体验很差,而且服务器的压力也会比较大。
- Ajax 轮询:异步响应机制,即通过不间断的客户端 Ajax 请求,去发现服务端的变化。这种方式由于是客户端主动连接的,所以会有一定程度的延时,并且服务器的压力也不小。
- 长连接:这也是我们之前所介绍的一种方式。由于它是利用客户端的现有连接实现服务器主动向客户端推送信息,所以延时的情况很少,并且由于服务端的可操控性 使得服务器的压力也迅速减小。其实这种技术还有其他的实现方式,通过 Iframe,在页面上嵌入一个隐藏帧(Iframe),将其“src”属性指向一个长连接的请求,这样一来,服务端就能够源源不断的向客户端发送数据。 这种方式的不足就在于:它会造成浏览器的进度栏一直显示没有加载完成,当然我们可以通过 Google 的一个称为“htmlfile”的 ActiveX 控件解决,但是毕竟他需要安装 ActiveX 控件,对于终端用户也是不合适的。
- 套接字:可以利用 Flash 的 XMLSocket 类或者 Java 的 Applet 来建立 Socket 连接,实现全双工的服务器推送,然后通过 Flash 或者 Applet 与 JavaScript 通信的接口来实现最终的数据推送。但是这种方式需要 Flash 或者 JVM 的支持,同样不太合适于终端用户。
- HTML5 的 WebSocket:这种方式其实与套接字一样,但是这里需要单独强调一下:它是不需要用户而外安装任何插件的。HTML5 提供了一个 WebSocket 的 JavaScript 接口,可以直接与服务端建立 Socket 连接,实现全双工通信,这种方式的服务器推送就是完全意义上的服务器推送了,没有半点模拟的成分,只是现阶段支持 HTML5 的浏览器并不多,而且一般老版本的各种浏览器基本都不支持。不过 HTML5 是一套非常好的标准,在将来,当 HTML5 流行起来以后将是我们实现服务器推送技术的不二选择。
结束语
这篇文章介绍了 Dojo 中的服务器推送 Cometd 工具包。基于服务器推送的理念,介绍了 Bayeux 协议的核心思想,并结合一个简单示例介绍了服务器推送的基本实现。随后,本着快速建立服务器推送框架的想法,介绍了 Dojo 的 Cometd 工具包,并分别从客户端接口和服务端接口两个方面分别介绍了 Dojo 的服务器推送框架的搭建和实现原理。最后,通过一些简单的示例展示了基于服务端推送的业务逻辑的具体实现。服务端推送技术具有很强的实用性,希望广大读者 在开发自己的项目的过程中多关注一下,以尽可能多的完善自己的 Web 应用。 |