手把手教你写一个极简版Jedis
创始人
2024-06-01 11:13:35
0

目录

  • 前言
  • 版本V1:基础版
  • 版本V2:缓冲区、命令封装
  • 版本V3:支持多种Redis命令
  • 版本V4:异常处理、断线重连、连接池
  • 总结

前言

Jedis是Redis官方推荐的Jedis客户端,在业界使用非常广泛。通过阅读Jedis源码,可以掌握如何设计和实现一个轻量级、带连接池的Redis客户端。我常常说,学习源码最好的方法就是以自己的思考重新写一遍,这样有助于把学到的知识固化成实际经验。本文会带着你从零开始设计一个极简版的Jedis,从支持最简单的GET/SET命令出发,逐步添加缓冲区、多种命令、断线重连、连接池等多种功能。本文的源代码放在Github上(mini-jedis),感兴趣的读者可以自行取阅。

版本V1:基础版

首先实现一个最简单的GET命令。我们知道Jedis的IO模式是BIO,它发送和接收数据依赖输入输出流的阻塞式读写。所以我们先为Jedis创建一个socket:

	// Jedis.javapublic Jedis(String host, int port) throws UnknownHostException, IOException {socket = new Socket();socket.setReuseAddress(true);socket.setKeepAlive(true); // Will monitor the TCP connection is validsocket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to ensure timely delivery of datasocket.setSoLinger(true, 0); // Control calls close () method, the underlying socket is closed immediatelysocket.connect(new InetSocketAddress(host, port), 2000);socket.setSoTimeout(2000);}

注意socket有多项TCP参数要设置(参考Jedis源码),设置后的效果如下:

  1. REUSE_ADDRESS:Jedis客户端断开连接处于TIMED_WAIT状态时,重连可以复用之前的端口。
  2. SO_KEEPALIVE:当长时间没有消息收发时,也能保证连接不断开。
  3. TCP_NODELAY:禁用nagle算法,保证发送数据的实时性。
  4. SO_LINGER:当调用close()方法时,连接立刻关闭,而不是通常那样等待缓冲区中的剩余数据发送完。

建立好socket后,我们再来编写与Redis服务器通信的逻辑。在写之前,我们有必要了解一下Redis的通信协议:RESP协议(Redis Serialization Protocol)

RESP协议是一种简单的应用层协议,它的特点是:实现精简、容易阅读。以GET命令为例,我们来看一个最简单的RESP协议例子:

// GET消息:发送
*2
$3
GET
$4
key1// GET消息:接收
$6
value1

以上消息中有一些特定符号,它们的意义总结如下:

意义开头字符示例解释
简单字符串++OK\r\n未知长度的字符串,如SET命令的返回
错误--Error message\r\nRedis服务器返回的报错
整型::0\r\n整数,如LLEN命令的返回
批量字符串$$3\r\nGET\r\n已知长度的字符串,$后面的数字表示字符串长度
数组**2\r\n数组,每个元素都属于其他基本类型,*后面的数字表示数组长度

根据这里的符号定义,可以解释上面的GET消息:

  1. 发送:由包含2个元素的数组构成,第一个元素是长度为3的字符串”GET“,第二个元素是长度为4的字符串”key1“。这两个元素分别是命令名字和命令参数。
  2. 接收:仅包含一个长度为6的字符串”value1“。

清楚了GET消息的协议构成,很容易写出它的实现代码:

	// Jedis.javapublic String get(String key) throws IOException {StringBuilder sb = new StringBuilder();sb.append("*2").append("\r\n").append("$3").append("\r\n").append("GET").append("\r\n").append("$").append(key.length()).append("\r\n").append(key).append("\r\n");socket.getOutputStream().write(sb.toString().getBytes());InputStream is = socket.getInputStream();byte[] b = new byte[1024];int len = is.read(b);String ret = new String(b, 0, len);return ret;}

同样地,我们编写SET消息的实现方法,并写好它们的测试例子:

// Jedis.java
public static void main(String args[]) throws UnknownHostException, IOException {Jedis jedis = new Jedis("localhost", 6379);System.out.print(jedis.set("key1", "value1"));System.out.print(jedis.get("key1"));
}

这样就得到了包含最简单两个命令GET/SET的基础版代码。

版本V2:缓冲区、命令封装

V1版本的实现虽然能跑得通,但是它有两个问题:

  1. 性能不佳。问题出在socket.getOutputStream().write(sb.toString().getBytes());,短短一行代码包含三次字节数组的拷贝:toString()第一次,getBytes()第二次,write()第三次。对于追求性能的我们这种情况肯定不能忍,必须找到一种方法降低字节数组的拷贝次数。
  2. 对复杂情况的处理不当。在前面的代码处理中,我们分配了一个1024字节的数组来从输入流读取数据。在通常的情况下,这个长度是够用的。但是在特殊情况下,返回的数据长度可能大于1024,这时我们需要多次读取数据并拼接起来。

解决上述问题的方法就是使用缓冲区来处理数据的读写。有了缓冲区,我们可以在恰当的时候直接将数据拷贝到socket的输出流中,仅需一次数据拷贝,而不会再有StringBuilder -> String -> byte array -> socket输出流的三次拷贝过程。同时,当从输入流读取较长数据时,我们可以使用缓冲区多次读取,再将每次读取结果加以拼接。设计缓冲区,也是许多网络通信底层应用提升性能的一种基本思路。

先来看输出缓冲区的设计。我们设计一个缓冲区的包装类RedisOutputStream,它同时还包含从socket中获取的输出流:

// RedisOutputStream.java
public class RedisOutputStream extends FilterOutputStream {protected final byte[] buf;// 下一个写入的位置protected int count;private final static int BUFFER_SIZE = 8192;

上面代码显示:我们分配了一个大小为8192字节的缓冲区,并用变量count监控下一个写入的位置。为了演示缓冲区的用法,我们以两个方法write(byte b)flush()为例。前者用来向缓冲区写入一个字节,当缓冲区已满时,调用后者把当前缓冲区中的数据写入socket输出流:

// RedisOutputStream.javapublic void write(final byte b) throws IOException {if (count == buf.length) {flushBuffer();}buf[count++] = b;}public void flushBuffer() throws IOException {if (count > 0) {out.write(buf, 0, count);count = 0;}}

输入缓冲区的设计与输出缓冲区类似,不同点是有两个监控变量:count代表当前读到的位置,limit代表缓冲区中有用数据的边界。读者要是熟悉Java NIO的Buffer类的API,对比里面的类似概念,会更容易理解。

// RedisInputStream.java
public class RedisInputStream extends FilterInputStream {protected final byte[] buf;// count: 当前读到的位置// limit: 缓冲区有用数据的边界protected int count, limit;private static final int BUFFER_SIZE = 8192;

同样我们用两个方法来演示输入缓冲区的用法。readByte()用来从缓冲区中读取一个字节,读取前会先调用ensureFill()方法。这个方法用来确保缓冲区中有可读的数据,否则会从socket输入流中读取一次数据到缓冲区,并重置countlimit两个字段。

// RedisInputStream.javapublic byte readByte() throws IOException {ensureFill();return buf[count++];}private void ensureFill() throws IOException {if (count >= limit) {limit = in.read(buf);count = 0;if (limit == -1) {throw new RuntimeException("Unexpected end of stream.");}}}

完成了缓冲区代码的编写。接下来另一件重要的事情是优化命令处理的代码,加以封装,增加可读性。之前的代码几近于裸写,输出输入数据的逻辑都糅合在一起,当支持的命令增多时,代码可读性肯定会很低。

还是以GET命令为例。我们先创建CommandArguments类用于封装命令的名字及参数,再将发送命令的逻辑放到sendCommand(CommandArguments args)方法中,将处理接收数据的逻辑放到processReply()方法中。注意sendCommand方法调用后,会马上调用flushBuffer()将当前缓冲区中的数据立即写入到socket输出流中。

// Jedis.javapublic String get(String key) throws IOException {CommandArguments args = new CommandArguments();args.add(Command.GET.name());args.add(key);sendCommand(args);os.flushBuffer();String resp = processReply();return resp;}

从下面的实现细节可以看出,原来发送数据用到的StringBuilder不复存在,所有数据都是直接写入类型为RedisOutputStreamosprocessReply() 方法中处理了输入数据以+开头和以$开头的两种情况。

// Jedis.javapublic void sendCommand(CommandArguments args) throws IOException {os.write(Protocol.ASTERISK);os.writeIntCrLf(args.size());for (String arg : args) {os.write(Protocol.DOLLAR);byte[] argBytes = arg.getBytes();os.writeIntCrLf(argBytes.length);os.write(argBytes);os.writeCrLf();}}public String processReply() throws IOException {final byte b = is.readByte();switch (b) {case Protocol.PLUS: // 以+开头return processStatusCodeReply();case Protocol.DOLLAR: // 以$开头return processBulkReply();default:throw new RuntimeException("can't parse reply");}}private String processStatusCodeReply() throws IOException {return is.readLine();}

其他实现细节在此不再赘述。至此,我们在版本V2中使用缓冲区优化了读写性能,并将命令处理的代码适当封装,进一步提升了可读性。

版本V3:支持多种Redis命令

在前面的版本我们支持了GET/SET两种命令。实际上Redis支持的命令成百上千,都支持显然是不现实的。但我们可以选择几个有代表性的命令,力求涵盖RESP命令中出现的所有数据类型。为此我们增加RPUSHLRANGE两个命令,这里出现的新数据类型在于:RPUSH返回的是整数,而LRANGE返回的是多个字符串组成的数组。

扩展processReply()方法,使得它能处理整数和数组类型的返回:

// Jedis.javapublic Object processReply() throws IOException {final byte b = is.readByte();switch (b) {case Protocol.PLUS:return processStatusCodeReply();case Protocol.DOLLAR:return processBulkReply();case Protocol.COLON:return processInteger();case Protocol.ASTERISK:return processMultiBulkReply();default:throw new RuntimeException("can't parse reply");}}

在以下的实现细节中,可以看到:processMultiBulkReply()方法会根据读取到的数组元素个数,递归调用processReply()。注意这里的返回类型是List而不是List,因为数组元素可能有多种,不一定是字符串:

// Jedis.javaprivate Long processInteger() throws IOException {return is.readLongCrLf();}private List processMultiBulkReply() throws IOException {final int num = is.readIntCrLf();if (num == -1) {return null;}final List ret = new ArrayList<>(num);for (int i = 0; i < num; i++) {ret.add(processReply());}return ret;}
 

我们可以为测试代码加入 RPUSHLRANGE命令的调用:

// Jedis.javapublic static void main(String args[]) throws UnknownHostException, IOException {Jedis jedis = new Jedis("localhost", 6379);System.out.println(jedis.set("key1", "value1"));System.out.println(jedis.get("key1"));System.out.println(jedis.rpush("lkey1", "lvalue1", "lvalue2", "lvalue3"));System.out.println(jedis.lrange("lkey1", 1, 2));}

版本V4:异常处理、断线重连、连接池

代码写到这里,你会发现一个棘手的问题:通信相关的底层方法会抛出IOException等大量checked exception。当数量不多时,可以一层层抛出来直到最上层。但是数量多时这么写很麻烦,而且也不是一种良好的编码风格。我们需要找到一种方法统一解决这个问题。

通常的做法是在代码的某一层次捕捉checked exception,并将其转换为unchecked exception。为方便处理,我们可以为这些unchecked exceptions创建一种新的异常类型。例如我们创建JedisConnectionException类,用来表示数据读写时抛出的异常。当捕捉到这种异常后,我们就可以根据实际情况,决定继续抛出异常,还是从异常中恢复。以下是将IOException转换成JedisConnectionException的代码示例:

// RedisInputStream.javaprivate void ensureFill() {try {if (count >= limit) {limit = in.read(buf);count = 0;if (limit == -1) {throw new JedisConnectionException("Unexpected end of stream.");}}} catch (IOException e) {throw new JedisConnectionException(e);}}

除了异常处理,另一个需要解决的问题是断线重连。当我们读写数据时,可能会因为网络原因连接断开,这时需要一个机制能恢复网络连接。如下代码,sendCommand方法中会先调用ensureConnect(),保证连接可用;如果当前连接已断开,那么会立即重连:

// Jedis.javapublic void sendCommand(CommandArguments args) {try {ensureConnect();sendCommand0(args);} catch (JedisConnectionException e) {broken = true;throw e;}}public void ensureConnect() {if (!isConnected()) {connect();}}

我们前面说过,Jedis的IO模型是BIO。BIO的缺点是它是阻塞的,如果我们有很多Redis客户端请求,那么一个命令如果有延迟被阻塞,会影响到后面的所有命令。联想到JDBC访问数据库也是BIO的,但是通常会使用连接池,好处是可以并发发送消息,降低消息阻塞的影响;而且连接可以复用,因为毕竟创建一个连接的代价是很大的。

同样的,Jedis也支持连接池。Jedis底层使用了commons-pool2包,这个包实现了一个通用的对象池,不过它比较多用于实现连接池。要使用它实现连接池,关键点在于实现JedisPoolJedisFactory两个类。

JedisPool继承了GenericObjectPool类,它定义了从池中借出和归还连接所需的逻辑。注意归还连接时,如果遇到异常,那么直接丢弃掉这个连接:

// JedisPool.java
public class JedisPool extends GenericObjectPool {...public Jedis getResource() throws Exception {Jedis jedis = super.borrowObject();jedis.setDataSource(this);return jedis;}public void returnResource(final Jedis resource) {if (resource != null) {try {super.returnObject(resource);} catch (Exception e) {this.returnBrokenResource(resource);}}}public void returnBrokenResource(final Jedis resource) {try {super.invalidateObject(resource);} catch (Exception e) {throw new JedisException(e);}}
}

JedisFactory继承了PooledObjectFactory类,它定义了创建和销毁对象(也就是连接)的逻辑:

// jedisFactory.java
public class JedisFactory implements PooledObjectFactory {...@Overridepublic void destroyObject(PooledObject p) throws Exception {Jedis jedis = p.getObject();if (jedis.isConnected()) {jedis.close();}}@Overridepublic PooledObject makeObject() throws Exception {Jedis jedis = null;try {jedis = new Jedis(host, port);jedis.connect();return new DefaultPooledObject<>(jedis);} catch (Exception e) {if (e instanceof IOException) {if (jedis != null) {jedis.close();}}throw e;}}
}

在上面销毁对象的代码中调用了Jedis.close(),它的实现如下。如果这个连接绑定到了某个连接池,那么不会真的关闭,而是归还到连接池中。注意我们增加了一个字段broken用于记录连接是否已经损坏,当平时读写数据时如发生异常就会将这个字段置为true,然后关闭连接时直接丢弃,不再放入池中。

// Jedis.java@Overridepublic void close() {if (dataSource != null) {JedisPool pool = dataSource;dataSource = null;if (broken) {pool.returnBrokenResource(this);} else {pool.returnResource(this);}} else {closeConnection();}}

最后为JedisPool编写测试代码如下:

	public static void main(String args[]) throws Exception {try (JedisPool pool = new JedisPool("localhost", 6379)) {try (Jedis jedis = pool.getResource()) {System.out.println(jedis.set("key1", "value1"));System.out.println(jedis.get("key1"));System.out.println(jedis.rpush("lkey1", "lvalue1", "lvalue2", "lvalue3"));System.out.println(jedis.lrange("lkey1", 1, 2));}}}

总结

经过多次迭代,我们从零开始实现了一个简单的Jedis客户端,支持多种命令、缓冲区、断线重连以及连接池。这里主要想带给大家一个逐步深入的过程和设计思路的取舍。再复杂的系统,只要自己实际设计过一遍,就能懂得作者的核心思路所在,再看源码就容易多了。当然Jedis的功能还远不止这些,包括multipipelinecluster等等。如果以后有时间,笔者会撰写进一步的后续文章,敬请期待。

相关内容

热门资讯

常用商务英语口语   商务英语是以适应职场生活的语言要求为目的,内容涉及到商务活动的方方面面。下面是小编收集的常用商务...
六年级上册英语第一单元练习题   一、根据要求写单词。  1.dry(反义词)__________________  2.writ...
复活节英文怎么说 复活节英文怎么说?复活节的英语翻译是什么?复活节:Easter;"Easter,anniversar...
2008年北京奥运会主题曲 2008年北京奥运会(第29届夏季奥林匹克运动会),2008年8月8日到2008年8月24日在中华人...
英语道歉信 英语道歉信15篇  在日常生活中,道歉信的使用频率越来越高,通过道歉信,我们可以更好地解释事情发生的...
六年级英语专题训练(连词成句... 六年级英语专题训练(连词成句30题)  1. have,playhouse,many,I,toy,i...
上班迟到情况说明英语   每个人都或多或少的迟到过那么几次,因为各种原因,可能生病,可能因为交通堵车,可能是因为天气冷,有...
小学英语教学论文 小学英语教学论文范文  引导语:英语教育一直都是每个家长所器重的,那么有关小学英语教学论文要怎么写呢...
英语口语学习必看的方法技巧 英语口语学习必看的方法技巧如何才能说流利的英语? 说外语时,我们主要应做到四件事:理解、回答、提问、...
四级英语作文选:Birth ... 四级英语作文范文选:Birth controlSince the Chinese Governmen...
金融专业英语面试自我介绍 金融专业英语面试自我介绍3篇  金融专业的学生面试时,面试官要求用英语做自我介绍该怎么说。下面是小编...
我的李老师走了四年级英语日记... 我的李老师走了四年级英语日记带翻译  我上了五个学期的小学却换了六任老师,李老师是带我们班最长的语文...
小学三年级英语日记带翻译捡玉... 小学三年级英语日记带翻译捡玉米  今天,我和妈妈去外婆家,外婆家有刚剥的`玉米棒上带有玉米籽,好大的...
七年级英语优秀教学设计 七年级英语优秀教学设计  作为一位兢兢业业的人民教师,常常要写一份优秀的教学设计,教学设计是把教学原...
我的英语老师作文 我的英语老师作文(通用21篇)  在日常生活或是工作学习中,大家都有写作文的经历,对作文很是熟悉吧,...
英语老师教学经验总结 英语老师教学经验总结(通用19篇)  总结是指社会团体、企业单位和个人对某一阶段的学习、工作或其完成...
初一英语暑假作业答案 初一英语暑假作业答案  英语练习一(基础训练)第一题1.D2.H3.E4.F5.I6.A7.J8.C...
大学生的英语演讲稿 大学生的英语演讲稿范文(精选10篇)  使用正确的写作思路书写演讲稿会更加事半功倍。在现实社会中,越...
VOA美国之音英语学习网址 VOA美国之音英语学习推荐网址 美国之音网站已经成为语言学习最重要的资源站点,在互联网上还有若干网站...
商务英语期末试卷 Part I Term Translation (20%)Section A: Translate ...