长连线貌似是一个很高深莫测的知识,但是只要你做直播、IM、游戏、弹幕里面的任何一种,或者是你的app想要实时的接收某些讯息,你就会要接触到长连线技术。本文主要教你如何在客户端如何使用Socket实现长连线。
Socket背景知识
要做长连线的话,是不能用http协议来做的,因为http协议已经是应用层协议了,并且http协议是无状态的,而我们要做长连线,肯定是需要在应用层封装自己的业务,所以就需要基于TCP协议来做,而基于TCP协议的话,就要用到Socket了。
Socket是java针对tcp层通讯封装的一套网络方案TCP协议我们知道,是基于ip(或者域名)和埠对指定机器进行的点对点访问,他的连线成功有两个条件,就是对方ip可以到达和埠是开放的Socket能帮完成TCP三次握手,而应用层的头部资讯需要自己去解析,也就是说,自己要制定好协议,并且要去解析bytehttp也有长连线。在http1.0的时候,使用的是短连线,也就是说,每次请求一次资料,都要重新建立连线。但是从http1.1之后,我们看到头部会有一个
Connection:keep-alive
这个表示tcp连线建立之后不会马上销毁,而是储存一段时间,在这段时间内如果需要请求改网站的其他资料,都是使用这个连线来完成传输的。
Socket使用方式
Socket看上去不是很好用,因为他是基于java.io来实现的,你要直接跟InputStream和OutputStream打交道,也就是直接跟byte[]打交道,所以用起来并不是这么友好。
下面通过一个简单的例子,往一台服务器发 这一串字节,服务器也返回相同的字节流,上程式码:
@Test
public void testSocket() throws Exception {
logger.debug("start");
Socket socket = new Socket();
socket.connect(address);
byte[] output = new byte[]{(byte) 1, (byte) 0, (byte) 0, (byte) 0, (byte) 0};
socket.getOutputStream().write(output);
byte[] input = new byte[64];
int readByte = socket.getInputStream().read(input);
logger.debug("readByte " + readByte);
for (int i = 0; i logger.debug("read [" + i + "]:" + input[i]);
}
socket.close();
}
输出:
11:40:40.326 [main] DEBUG com.roy.test.SocketTest - start
11:40:40.345 [main] DEBUG com.roy.test.SocketTest - readByte 5
11:40:40.345 [main] DEBUG com.roy.test.SocketTest - read 1
11:40:40.345 [main] DEBUG com.roy.test.SocketTest - read 0
11:40:40.345 [main] DEBUG com.roy.test.SocketTest - read 0
11:40:40.345 [main] DEBUG com.roy.test.SocketTest - read 0
11:40:40.345 [main] DEBUG com.roy.test.SocketTest - read 0
看出来写起来还是比较麻烦的,主要就是InputStream, OutputStream 和byte[]使用起来太不方便了。
SocketChannel blocking
Socket为了优化自己的封装和并发效能,推出了nio包下面的SocketChannel,这个相比于Socket的好处就是并发效能的提高和封装的优化了。
SocketChannel有两种方式——阻塞和非阻塞的,阻塞的用法和Socket差不多,都是在read和write的时候会阻塞执行绪,下面用一段程式码来实现相同的功能。
@Test
public void testSocketChannelBlock() throws Exception {
final SocketChannel channel = SocketChannel.open(address);
ByteBuffer output = ByteBuffer.allocate(5);
output.put((byte) 1);
output.putInt(0);
output.flip();
channel.write(output);
logger.debug("write complete, start read");
ByteBuffer input = ByteBuffer.allocate(5);
int readByte = channel.read(input);
logger.debug("readByte " + readByte);
input.flip();
if (readByte == -1) {
logger.debug("readByte == -1, return!");
return;
}
for (int i = 0; i logger.debug("read [" + i + "]:" + input.get());
}
}
log 输出:
23:24:34.684 [main] DEBUG com.dz.test.SocketTest - write complete, start read
23:24:34.901 [main] DEBUG com.dz.test.SocketTest - readByte 5
23:24:34.901 [main] DEBUG com.dz.test.SocketTest - read [0]:1
23:24:34.901 [main] DEBUG com.dz.test.SocketTest - read [1]:0
23:24:34.901 [main] DEBUG com.dz.test.SocketTest - read [2]:0
23:24:34.901 [main] DEBUG com.dz.test.SocketTest - read [3]:0
23:24:34.901 [main] DEBUG com.dz.test.SocketTest - read [4]:0
从上面的。封装优化主要体现在ByteBuffer,IntBuffer这一系列类的封装——因为是网络相关的,所以这里用到的主要是ByteBuffer。
ByteBuffer和byte[]最大的区别,就是ByteBuffer可以很方便的读取int, long等资料型别,他提供了getInt(), getInt(int offset)这样的方法,这种方法主要用在识别头部资料部分,因为头部资料一般都是由多种资料型别组成,比方说表示资料格式的contentType:String,表示长度的length:int等等,这些就是getInt()这样的方法主要的应用场景,而byte[]如果要取int,String相对来说就要复杂一些了,这是java.nio相比于java.io优势的一点。
这里需要说明一个比较坑的点,就是ByteBuffer.flip()这个方法,这个方法的作用主要是重置索引,在write()之前和read()之后呼叫,否则会因为索引不对,导致你的资料写不进去,读不出来。
ByteBuffer是一个功能强大的类,因为本文主要是讲Socket和SocketChannel,所以在这里就不做过多描述。具体ByteBuffer的详细介绍,可以参考:Java NIO系列教程(三) Buffer
而nio相比于io最大的优势还是在于并发效能,因为nio里面的n代表的就是non-blocking的意思,上面那个读取资料的程式码也相对老旧,一般我们如果要用SocketChannel,都是用non-blocking的方式来实现的,而如果要用non-blocking模式,首先要介绍的就是Selector。
Selector
我们知道,传统io是阻塞的,也就是说,一个执行绪只能处理一个io流,也就是一个Socket。有了Selector之后,一个执行绪就能处理多个SocketChannel。
Selector的原理是,他能接受多个SocketChannel,然后不断的遍历每一个Channel的状态,如果有Channel已经ready了,他就能通过他自身提供的方法,通知到执行绪,让执行绪去处理对应的业务。流程图如下:

Selector工作流程
更加详细的介绍,可以参考Java NIO系列教程(六) Selector
Netty对nio这一套有比较好的封装,里面就涉及到了Selector,具体可以参考Netty入门教程
Selector有三个重要方法,在这里对几个重要的点做一下补充说明:
selector.register(SocketChannel channel, int ops),该方法的第二个引数,代表这个SocketChannel向Selector注册哪些操作,如果这些操作一旦在这个channel上发生,Selector.select()的返回值会>0。register方法是阻塞的,他和isBlocking(), configureBlocking()共享同一个锁regLock。而这个方法的第二个引数,有四种取值:SelectionKey.OP_CONNECTSelectionKey.OP_ACCEPTSelectionKey.OP_READSelectionKey.OP_WRITE如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连线起来,如下:int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
selector.selectNow()是Selector的重要方法之一,类似的方法还有select()和select(int timeout),三个方法的含义是一样的,都是看现在有没有selectedKey是ready状态,返回的已经ready的selectedKey数目。不同的是,select()和select(int timeout)方法是阻塞的,而selectNow()是非阻塞的,不管有没有ready的selectedKey,他都会立即返回,所以如果你的执行绪不仅仅只做读操作、且写操作的实时性不要求这么实时的话,可以将读写都放在一个执行绪里面进行,读的话就用selectNow()。selector.selectedKeys(),返回已经ready的selectedKey,通过selectedKey可以拿到对应的SocketChannel,然后进行对应的读或者写操作。 需要注意的是,这个方法执行完成之后,如果不删除key,那么下一次呼叫selectedKeys(),会发现上一次的key还会在里面,因为Selector是不会帮你自动清理你处理过的key的,所以需要在处理结束之后清理掉已经处理的selectedKey。SocketChannel non-blocking
介绍完Selector之后,我们就可以介绍SocketChannel的非阻塞模式了,这里我对照程式码来讲SocketChannel非阻塞模式的使用方式:
@Test
public void testSocketChannelConcurrent() throws Exception {
final SocketChannel channel = SocketChannel.open(address);
Selector selector = Selector.open();
// 设定channel为非阻塞模式
channel.configureBlocking(false);
// 想Selector注册read讯息,一旦可以read了,就会通知到SocketChannel
channel.register(selector, SelectionKey.OP_READ);
ByteBuffer output = ByteBuffer.allocate(5);
output.put((byte) 1);
output.putInt(0);
// 写完资料之后记得呼叫flip(),否则索引不对,资料会写不进去
output.flip();
channel.write(output);
logger.debug("write complete, start read");
while (true) {
// selectNow()是立即返回结果的
if (selector.selectNow() > 0) {
// 拿到所有已经Ready的keys,通过keys可以拿到对应的channel
Set selectionKeys = selector.selectedKeys();
logger.debug("start iterator keys, size:{}", selectionKeys.size());
Iterator keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isReadable()) {
logger.debug("key.isReadable(), startRead");
ByteBuffer input = ByteBuffer.allocate(64);
// 通过key.channel()可以拿到channel,需要进行下一步操作的话需要做一下强制转换
SocketChannel keyChannel = (SocketChannel) key.channel();
int readByte = keyChannel.read(input);
logger.debug("readByte " + readByte);
// 同样,read完成之后记得flip(),因为这里的read相当于往buffer里面写了资料进去,否则读不出资料
input.flip();
if (readByte == -1) {
logger.debug("readByte == -1, return!");
return;
}
for (int i = 0; i logger.debug("read [" + i + "]:" + input.get());
}
}
// 遍历结束之后记得删除key,否则key会一直存在于selector.selectedKeys()
keyIterator.remove();
}
break;
} else {
logger.debug("sleep(1000)");
Thread.sleep(1000);
}
}
}
认真阅读以上的程式码,你可以知道通过SocketChannel做非阻塞读取的一个基本流程。
上文提到的flip()方法,可以看到每次写完资料,准备读之前,都要呼叫它,为什么要这么做呢?先看一下他的源代码:
public final Buffer flip() {
limit = position;
position = 0;
mark = -1; // mark是个标记位,通过mark()和reset()方法使用,不展开讲
return this;
}
从这里可以看出,呼叫flip()之后,limit(也就是size)被锁定了,就是你读了的资料的size,position从0开始,后面读的时候就可以从0开始读了。如果不呼叫position=size,再怎么读都是空资料,这就是flip()的作用。
以上就是如果使用java来实现一个长连线的客户端,希望对您能有帮助!





























