Jedis是Redis官方推荐的Jedis客户端,在业界使用非常广泛。通过阅读Jedis源码,可以掌握如何设计和实现一个轻量级、带连接池的Redis客户端。我常常说,学习源码最好的方法就是以自己的思考重新写一遍,这样有助于把学到的知识固化成实际经验。本文会带着你从零开始设计一个极简版的Jedis,从支持最简单的GET/SET
命令出发,逐步添加缓冲区、多种命令、断线重连、连接池等多种功能。本文的源代码放在Github上(mini-jedis),感兴趣的读者可以自行取阅。
首先实现一个最简单的GET
命令。我们知道Jedis的IO模式是BIO,它发送和接收数据依赖输入输出流的阻塞式读写。所以我们先为Jedis创建一个socket:
// Jedis.javapublic Jedis(String host, int port) throws UnknownHostException, IOException {socket = new Socket();socket.setReuseAddress(true);socket.setKeepAlive(true); // Will monitor the TCP connection is validsocket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to ensure timely delivery of datasocket.setSoLinger(true, 0); // Control calls close () method, the underlying socket is closed immediatelysocket.connect(new InetSocketAddress(host, port), 2000);socket.setSoTimeout(2000);}
注意socket有多项TCP参数要设置(参考Jedis源码),设置后的效果如下:
REUSE_ADDRESS
:Jedis客户端断开连接处于TIMED_WAIT
状态时,重连可以复用之前的端口。SO_KEEPALIVE
:当长时间没有消息收发时,也能保证连接不断开。TCP_NODELAY
:禁用nagle算法,保证发送数据的实时性。SO_LINGER
:当调用close()方法时,连接立刻关闭,而不是通常那样等待缓冲区中的剩余数据发送完。建立好socket后,我们再来编写与Redis服务器通信的逻辑。在写之前,我们有必要了解一下Redis的通信协议:RESP协议(Redis Serialization Protocol)。
RESP协议是一种简单的应用层协议,它的特点是:实现精简、容易阅读。以GET命令为例,我们来看一个最简单的RESP协议例子:
// GET消息:发送
*2
$3
GET
$4
key1// GET消息:接收
$6
value1
以上消息中有一些特定符号,它们的意义总结如下:
意义 | 开头字符 | 示例 | 解释 |
---|---|---|---|
简单字符串 | + | +OK\r\n | 未知长度的字符串,如SET命令的返回 |
错误 | - | -Error message\r\n | Redis服务器返回的报错 |
整型 | : | :0\r\n | 整数,如LLEN命令的返回 |
批量字符串 | $ | $3\r\nGET\r\n | 已知长度的字符串,$后面的数字表示字符串长度 |
数组 | * | *2\r\n | 数组,每个元素都属于其他基本类型,*后面的数字表示数组长度 |
根据这里的符号定义,可以解释上面的GET消息:
清楚了GET消息的协议构成,很容易写出它的实现代码:
// Jedis.javapublic String get(String key) throws IOException {StringBuilder sb = new StringBuilder();sb.append("*2").append("\r\n").append("$3").append("\r\n").append("GET").append("\r\n").append("$").append(key.length()).append("\r\n").append(key).append("\r\n");socket.getOutputStream().write(sb.toString().getBytes());InputStream is = socket.getInputStream();byte[] b = new byte[1024];int len = is.read(b);String ret = new String(b, 0, len);return ret;}
同样地,我们编写SET消息的实现方法,并写好它们的测试例子:
// Jedis.java
public static void main(String args[]) throws UnknownHostException, IOException {Jedis jedis = new Jedis("localhost", 6379);System.out.print(jedis.set("key1", "value1"));System.out.print(jedis.get("key1"));
}
这样就得到了包含最简单两个命令GET/SET
的基础版代码。
V1版本的实现虽然能跑得通,但是它有两个问题:
socket.getOutputStream().write(sb.toString().getBytes());
,短短一行代码包含三次字节数组的拷贝:toString()第一次,getBytes()第二次,write()第三次。对于追求性能的我们这种情况肯定不能忍,必须找到一种方法降低字节数组的拷贝次数。解决上述问题的方法就是使用缓冲区来处理数据的读写。有了缓冲区,我们可以在恰当的时候直接将数据拷贝到socket的输出流中,仅需一次数据拷贝,而不会再有StringBuilder -> String -> byte array -> socket输出流的三次拷贝过程。同时,当从输入流读取较长数据时,我们可以使用缓冲区多次读取,再将每次读取结果加以拼接。设计缓冲区,也是许多网络通信底层应用提升性能的一种基本思路。
先来看输出缓冲区的设计。我们设计一个缓冲区的包装类RedisOutputStream
,它同时还包含从socket中获取的输出流:
// RedisOutputStream.java
public class RedisOutputStream extends FilterOutputStream {protected final byte[] buf;// 下一个写入的位置protected int count;private final static int BUFFER_SIZE = 8192;
上面代码显示:我们分配了一个大小为8192字节的缓冲区,并用变量count
监控下一个写入的位置。为了演示缓冲区的用法,我们以两个方法write(byte b)
和flush()
为例。前者用来向缓冲区写入一个字节,当缓冲区已满时,调用后者把当前缓冲区中的数据写入socket输出流:
// RedisOutputStream.javapublic void write(final byte b) throws IOException {if (count == buf.length) {flushBuffer();}buf[count++] = b;}public void flushBuffer() throws IOException {if (count > 0) {out.write(buf, 0, count);count = 0;}}
输入缓冲区的设计与输出缓冲区类似,不同点是有两个监控变量:count
代表当前读到的位置,limit
代表缓冲区中有用数据的边界。读者要是熟悉Java NIO的Buffer
类的API,对比里面的类似概念,会更容易理解。
// RedisInputStream.java
public class RedisInputStream extends FilterInputStream {protected final byte[] buf;// count: 当前读到的位置// limit: 缓冲区有用数据的边界protected int count, limit;private static final int BUFFER_SIZE = 8192;
同样我们用两个方法来演示输入缓冲区的用法。readByte()
用来从缓冲区中读取一个字节,读取前会先调用ensureFill()
方法。这个方法用来确保缓冲区中有可读的数据,否则会从socket输入流中读取一次数据到缓冲区,并重置count
和limit
两个字段。
// RedisInputStream.javapublic byte readByte() throws IOException {ensureFill();return buf[count++];}private void ensureFill() throws IOException {if (count >= limit) {limit = in.read(buf);count = 0;if (limit == -1) {throw new RuntimeException("Unexpected end of stream.");}}}
完成了缓冲区代码的编写。接下来另一件重要的事情是优化命令处理的代码,加以封装,增加可读性。之前的代码几近于裸写,输出输入数据的逻辑都糅合在一起,当支持的命令增多时,代码可读性肯定会很低。
还是以GET
命令为例。我们先创建CommandArguments
类用于封装命令的名字及参数,再将发送命令的逻辑放到sendCommand(CommandArguments args)
方法中,将处理接收数据的逻辑放到processReply()
方法中。注意sendCommand
方法调用后,会马上调用flushBuffer()
将当前缓冲区中的数据立即写入到socket输出流中。
// Jedis.javapublic String get(String key) throws IOException {CommandArguments args = new CommandArguments();args.add(Command.GET.name());args.add(key);sendCommand(args);os.flushBuffer();String resp = processReply();return resp;}
从下面的实现细节可以看出,原来发送数据用到的StringBuilder
不复存在,所有数据都是直接写入类型为RedisOutputStream
的os
。processReply()
方法中处理了输入数据以+
开头和以$
开头的两种情况。
// Jedis.javapublic void sendCommand(CommandArguments args) throws IOException {os.write(Protocol.ASTERISK);os.writeIntCrLf(args.size());for (String arg : args) {os.write(Protocol.DOLLAR);byte[] argBytes = arg.getBytes();os.writeIntCrLf(argBytes.length);os.write(argBytes);os.writeCrLf();}}public String processReply() throws IOException {final byte b = is.readByte();switch (b) {case Protocol.PLUS: // 以+开头return processStatusCodeReply();case Protocol.DOLLAR: // 以$开头return processBulkReply();default:throw new RuntimeException("can't parse reply");}}private String processStatusCodeReply() throws IOException {return is.readLine();}
其他实现细节在此不再赘述。至此,我们在版本V2中使用缓冲区优化了读写性能,并将命令处理的代码适当封装,进一步提升了可读性。
在前面的版本我们支持了GET/SET
两种命令。实际上Redis支持的命令成百上千,都支持显然是不现实的。但我们可以选择几个有代表性的命令,力求涵盖RESP命令中出现的所有数据类型。为此我们增加RPUSH
和LRANGE
两个命令,这里出现的新数据类型在于:RPUSH
返回的是整数,而LRANGE
返回的是多个字符串组成的数组。
扩展processReply()
方法,使得它能处理整数和数组类型的返回:
// Jedis.javapublic Object processReply() throws IOException {final byte b = is.readByte();switch (b) {case Protocol.PLUS:return processStatusCodeReply();case Protocol.DOLLAR:return processBulkReply();case Protocol.COLON:return processInteger();case Protocol.ASTERISK:return processMultiBulkReply();default:throw new RuntimeException("can't parse reply");}}
在以下的实现细节中,可以看到:processMultiBulkReply()
方法会根据读取到的数组元素个数,递归调用processReply()
。注意这里的返回类型是List
而不是List
,因为数组元素可能有多种,不一定是字符串:
// Jedis.javaprivate Long processInteger() throws IOException {return is.readLongCrLf();}private List
我们可以为测试代码加入 RPUSH
和LRANGE
命令的调用:
// Jedis.javapublic static void main(String args[]) throws UnknownHostException, IOException {Jedis jedis = new Jedis("localhost", 6379);System.out.println(jedis.set("key1", "value1"));System.out.println(jedis.get("key1"));System.out.println(jedis.rpush("lkey1", "lvalue1", "lvalue2", "lvalue3"));System.out.println(jedis.lrange("lkey1", 1, 2));}
代码写到这里,你会发现一个棘手的问题:通信相关的底层方法会抛出IOException
等大量checked exception。当数量不多时,可以一层层抛出来直到最上层。但是数量多时这么写很麻烦,而且也不是一种良好的编码风格。我们需要找到一种方法统一解决这个问题。
通常的做法是在代码的某一层次捕捉checked exception,并将其转换为unchecked exception。为方便处理,我们可以为这些unchecked exceptions创建一种新的异常类型。例如我们创建JedisConnectionException
类,用来表示数据读写时抛出的异常。当捕捉到这种异常后,我们就可以根据实际情况,决定继续抛出异常,还是从异常中恢复。以下是将IOException
转换成JedisConnectionException
的代码示例:
// RedisInputStream.javaprivate void ensureFill() {try {if (count >= limit) {limit = in.read(buf);count = 0;if (limit == -1) {throw new JedisConnectionException("Unexpected end of stream.");}}} catch (IOException e) {throw new JedisConnectionException(e);}}
除了异常处理,另一个需要解决的问题是断线重连。当我们读写数据时,可能会因为网络原因连接断开,这时需要一个机制能恢复网络连接。如下代码,sendCommand
方法中会先调用ensureConnect()
,保证连接可用;如果当前连接已断开,那么会立即重连:
// Jedis.javapublic void sendCommand(CommandArguments args) {try {ensureConnect();sendCommand0(args);} catch (JedisConnectionException e) {broken = true;throw e;}}public void ensureConnect() {if (!isConnected()) {connect();}}
我们前面说过,Jedis的IO模型是BIO。BIO的缺点是它是阻塞的,如果我们有很多Redis客户端请求,那么一个命令如果有延迟被阻塞,会影响到后面的所有命令。联想到JDBC访问数据库也是BIO的,但是通常会使用连接池,好处是可以并发发送消息,降低消息阻塞的影响;而且连接可以复用,因为毕竟创建一个连接的代价是很大的。
同样的,Jedis也支持连接池。Jedis底层使用了commons-pool2
包,这个包实现了一个通用的对象池,不过它比较多用于实现连接池。要使用它实现连接池,关键点在于实现JedisPool
和JedisFactory
两个类。
JedisPool
继承了GenericObjectPool
类,它定义了从池中借出和归还连接所需的逻辑。注意归还连接时,如果遇到异常,那么直接丢弃掉这个连接:
// JedisPool.java
public class JedisPool extends GenericObjectPool {...public Jedis getResource() throws Exception {Jedis jedis = super.borrowObject();jedis.setDataSource(this);return jedis;}public void returnResource(final Jedis resource) {if (resource != null) {try {super.returnObject(resource);} catch (Exception e) {this.returnBrokenResource(resource);}}}public void returnBrokenResource(final Jedis resource) {try {super.invalidateObject(resource);} catch (Exception e) {throw new JedisException(e);}}
}
JedisFactory
继承了PooledObjectFactory
类,它定义了创建和销毁对象(也就是连接)的逻辑:
// jedisFactory.java
public class JedisFactory implements PooledObjectFactory {...@Overridepublic void destroyObject(PooledObject p) throws Exception {Jedis jedis = p.getObject();if (jedis.isConnected()) {jedis.close();}}@Overridepublic PooledObject makeObject() throws Exception {Jedis jedis = null;try {jedis = new Jedis(host, port);jedis.connect();return new DefaultPooledObject<>(jedis);} catch (Exception e) {if (e instanceof IOException) {if (jedis != null) {jedis.close();}}throw e;}}
}
在上面销毁对象的代码中调用了Jedis.close()
,它的实现如下。如果这个连接绑定到了某个连接池,那么不会真的关闭,而是归还到连接池中。注意我们增加了一个字段broken
用于记录连接是否已经损坏,当平时读写数据时如发生异常就会将这个字段置为true
,然后关闭连接时直接丢弃,不再放入池中。
// Jedis.java@Overridepublic void close() {if (dataSource != null) {JedisPool pool = dataSource;dataSource = null;if (broken) {pool.returnBrokenResource(this);} else {pool.returnResource(this);}} else {closeConnection();}}
最后为JedisPool
编写测试代码如下:
public static void main(String args[]) throws Exception {try (JedisPool pool = new JedisPool("localhost", 6379)) {try (Jedis jedis = pool.getResource()) {System.out.println(jedis.set("key1", "value1"));System.out.println(jedis.get("key1"));System.out.println(jedis.rpush("lkey1", "lvalue1", "lvalue2", "lvalue3"));System.out.println(jedis.lrange("lkey1", 1, 2));}}}
经过多次迭代,我们从零开始实现了一个简单的Jedis客户端,支持多种命令、缓冲区、断线重连以及连接池。这里主要想带给大家一个逐步深入的过程和设计思路的取舍。再复杂的系统,只要自己实际设计过一遍,就能懂得作者的核心思路所在,再看源码就容易多了。当然Jedis的功能还远不止这些,包括multi
、pipeline
、cluster
等等。如果以后有时间,笔者会撰写进一步的后续文章,敬请期待。