用python从0实现一个redis
简介
Redis是一种高性能的内存数据库,广泛应用于缓存、消息队列、计数器等场景。它支持多种数据结构,如字符串、哈希、列表、集合、有序集合等,并提供了丰富的操作命令。
本文将介绍如何从零开始实现一个简化版的Redis服务器。通过这个实现,我们将深入理解Redis的内部工作原理,并学习如何使用Python构建一个高性能的网络服务器。
我们的实现目标是支持以下功能:
-
基本的键值对操作(SET、GET)
-
事务(MULTI、EXEC、DISCARD)
-
发布订阅(SUBSCRIBE、UNSUBSCRIBE、PUBLISH)
-
主从复制(SLAVEOF、SYNC、PSYNC)
-
数据持久化(RDB快照、AOF日志)
环境准备
在开始实现之前,我们需要准备好开发环境。本文使用Python 3.7+进行开发,并依赖以下库:
-
asyncio:用于异步I/O和事件循环的标准库
-
logging:用于日志记录的标准库
代码实现
服务器的整体架构
我们的Redis服务器由两个主要组件组成:协议处理器(ProtocolHandler)和Redis服务器(RedisServer)。
协议处理器负责与客户端进行通信,解析客户端发送的命令,并将命令转发给Redis服务器执行。Redis服务器负责维护数据库状态,处理命令,并将结果返回给协议处理器。
协议处理器 ProtocolHandler
协议处理器的主要职责是处理Redis协议。它接收客户端发送的数据,解析命令和参数,并将命令转发给Redis服务器执行。
初始化
class ProtocolHandler:def init(self, server):
self.server = server
self.buffer = ''
在初始化时,协议处理器接收一个RedisServer实例作为参数,并初始化一个空的缓冲区(buffer)用于存储接收到的数据。
数据接收和解析
def data_received(self, data):
logger.debug(f'Received data: {data}')
self.buffer += data.decode()
if '\n' in self.buffer:
lines = self.buffer.split('\n')
self.buffer = lines.pop()
for line in lines:
command, *args = line.split()
response = self.server.handle_command(command, *args)
self.server.transport.write(self.encode_response(response))
当接收到客户端发送的数据时,协议处理器将数据追加到缓冲区中。如果缓冲区中包含完整的命令(以'\n'结尾),则将缓冲区按行分割,并依次处理每个命令。
对于每个命令,协议处理器将其拆分为命令名和参数,并调用Redis服务器的handle_command方法进行处理。处理结果通过encode_response方法编码为Redis协议格式,并发送回客户端。
响应编码
def encode_response(self, response):
logger.debug(f'Sending response: {response}')
if response is None:
return '$-1\r\n'.encode()
elif isinstance(response, str):
return f'+{response}\r\n'.encode()
elif isinstance(response, int):
return f':{response}\r\n'.encode()
elif isinstance(response, bytes):
return f'${len(response)}\r\n{response.decode()}\r\n'.encode()
elif isinstance(response, list):
return f'*{len(response)}\r\n'.encode() + b''.join(self.encode_response(item) for item in response)
elif isinstance(response, Exception):
return f'-{response}\r\n'.encode()
else:
raise ValueError(f'Invalid response type: {type(response)}')
encode_response方法将命令的处理结果编码为Redis协议格式。根据响应的类型,它会生成相应的Redis协议响应:
-
如果响应为None,返回空批量字符串响应。
-
如果响应为字符串,返回简单字符串响应。
-
如果响应为整数,返回整数响应。
-
如果响应为字节串,返回批量字符串响应。
-
如果响应为列表,返回数组响应,并递归编码列表中的每个元素。
-
如果响应为异常,返回错误响应。
Redis服务器 RedisServer
Redis服务器是整个服务器的核心组件,负责维护数据库状态,处理命令,并与客户端进行交互。
初始化
class RedisServer:def init(self, host='127.0.0.1', port=6379, password=None, db=0, rdb_file='dump.rdb', aof_file='appendonly.aof', replication_id=None):print('begin init')
self.host = host
self.port = port
self.password = password
self.db = db
self.data = {}
self.expires = {}
self.watching = {}
self.pubsub_channels = {}
self.pubsub_patterns = {}
self.rdb_file = rdb_file
self.aof_file = aof_file
self.aof_buffer = []
self.replication_id = replication_id or str(int(time.time()))
self.replication_offset = 0
self.slaves = set()
self.transport = None
self.load_data()
在初始化时,Redis服务器设置了服务器的主机、��口、密码、数据库编号等配置。它还初始化了一些重要的数据结构,如data(存储键值对)、expires(存储键的过期时间)、pubsub_channels(存储发布订阅的频道)等。
此外,服务器还设置了RDB快照文件和AOF日志文件的路径,以及复制相关的属性(replication_id和replication_offset)。
最后,服务器调用load_data方法从RDB快照或AOF日志中加载数据。
客户端连接处理
async def handle_client(self, reader, writer):
self.transport = writer
protocol = ProtocolHandler(self)
logger.info('Client connected')
try:
while True:
data = await reader.read(1024)
if not data:
break
protocol.data_received(data)
except ConnectionResetError:
pass
finally:
writer.close()
await writer.wait_closed()
logger.info('Client disconnected')
当有新的客户端连接到服务器时,handle_client方法会被调用。它创建一个协议处理器实例,并进入一个循环,不断从客户端读取数据并交给协议处理器处理。
如果客户端关闭连接或发生连接重置错误,循环会退出,并关闭与客户端的连接。
服务器启动和运行
def run(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
coro = asyncio.start_server(self.handle_client, self.host, self.port)
server = loop.run_until_complete(coro)
logger.info(f'Serving on {server.sockets[0].getsockname()}')
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
run方法是服务器的入口点。它创建一个新的事件循环,并使用asyncio.start_server启动服务器,监听指定的主机和端口。
服务器启动后,事件循环会一直运行,直到发生键盘中断(Ctrl+C)。最后,服务器关闭,事件循环停止。
命令处理
def handle_command(self, command, *args):
print(f'Handling command: {command} {args}')
logger.debug(f'Executing command: {command} {args}')
if not hasattr(self, f'handle_{command.lower()}'):
logger.error(f'Unknown command: {command}')
raise Exception(f'ERR unknown command `{command}`')
result = getattr(self, f'handle_{command.lower()}')(*args)
logger.debug(f'Command result: {result}')
return result
handle_command方法是命令处理的入口点。它接收命令名和参数,并根据命令名调用相应的处理方法(handle_xxx)。
如果命令不存在,会抛出一个异常。命令处理的结果会被返回给协议处理器。
基本的键值对操作
SET命令
def handle_set(self, key, value, *args):
# 处理 SET 命令,支持 EX、PX、NX、XX 等选项
ex = None
px = None
nx = False
xx = False
i = 0
while i < len(args):
if args[i] == 'EX':
ex = int(args[i+1])
i += 2
elif args[i] == 'PX':
px = int(args[i+1])
i += 2
elif args[i] == 'NX':
nx = True
i += 1
elif args[i] == 'XX':
xx = True
i += 1
else:
raise Exception(f'ERR syntax error')
if nx and xx:
raise Exception(f'ERR syntax error')
if nx and key in self.data:
return None
if xx and key not in self.data:
return None
self.data[key] = value
if ex is not None:
self.expires[key] = time.time() + ex
elif px is not None:
self.expires[key] = time.time() + px / 1000
self.aof_buffer.append(f'SET {key} {value} {" ".join(args)}\n')
self.publish('__keyspace@0__:' + key, 'set')
return 'OK'
SET命令用于设置一个键值对。它支持以下选项:
-
EX:设置键的过期时间,单位为秒。
-
PX:设置键的过期时间,单位为毫秒。
-
NX:只有当键不存在时才设置值。
-
XX:只有当键已经存在时才设置值。
如果设置了过期时间,键会在指定的时间后自动过期。SET命令的结果会追加到AOF缓冲区中,用于AOF持久化。同时,它还会向keyspace@0:key频道发布一个'set'消息,用于键空间通知。
SET key value [EX seconds] [PX milliseconds] [NX|XX]
SET mykey myvalue
SET mykey myvalue EX 10
SET mykey myvalue PX 5000 NX
GET命令
def handle_get(self, key): //处理 GET 命令,返回键对应的值
return self.data.get(key)
示例
GET mykey
事务相关命令
MULTI命令
def handle_multi(self):
self.transaction = []return 'OK'
MULTI命令用于标记事务块的开始。它会将当前客户端的状态设置为事务状态,并初始化一个空的事务队列(transaction)。
EXEC命令
def handle_exec(self):
if not hasattr(self, 'transaction'):
raise Exception('ERR EXEC without MULTI')
result = []
for command, *args in self.transaction:
try:
result.append(self.handle_command(command, *args))
except Exception as e:
result.append(e)
self.transaction = []
return result
EXEC命令用于执行事务队列中的所有命令。它会依次执行事务队列中的每个命令,并将执行结果存储在一个列表中。如果执行过程中发生异常,异常也会被添加到结果列表中。
执行完所有命令后,事务队列会被清空,事务状态结束。
DISCARD命令
def handle_discard(self):
if not hasattr(self, 'transaction'):
raise Exception('ERR DISCARD without MULTI')
self.transaction = []
return 'OK'
DISCARD命令用于取消事务。它会清空事务队列,并将客户端的状态恢复为非事务状态。
发布订阅相关命令
SUBSCRIBE命令
def handle_subscribe(self, *channels):
for channel in channels:
if channel not in self.pubsub_channels:
self.pubsub_channels[channel] = set()
self.pubsub_channels[channel].add(self.transport)
return ['subscribe', len(channels), list(channels)]
SUBSCRIBE命令用于订阅一个或多个频道。它会将当前客户端的连接(transport)添加到指定频道的订阅者集合中。
示例
SUBSCRIBE channel [channel ...]
SUBSCRIBE mychannel
SUBSCRIBE channel1 channel2
UNSUBSCRIBE命令
def handle_unsubscribe(self, *channels):
if not channels:
channels = list(self.pubsub_channels.keys())
for channel in channels:
if channel in self.pubsub_channels:
self.pubsub_channels[channel].discard(self.transport)
if not self.pubsub_channels[channel]:
del self.pubsub_channels[channel]
return ['unsubscribe', len(channels), list(channels)]
UNSUBSCRIBE命令用于取消订阅一个或多个频道。它会将当前客户端的连接从指定频道的订阅者集合中移除。如果没有指定频道,则取消订阅所有频道。
示例
UNSUBSCRIBE [channel [channel ...]]
UNSUBSCRIBE mychannel
UNSUBSCRIBE
PSUBSCRIBE命令
def handle_psubscribe(self, *patterns):
for pattern in patterns:
if pattern not in self.pubsub_patterns:
self.pubsub_patterns[pattern] = set()
self.pubsub_patterns[pattern].add(self.transport)
return ['psubscribe', len(patterns), list(patterns)]
PSUBSCRIBE命令用于订阅一个或多个模式。它会将当前客户端的连接添加到指定模式的订阅者集合中。
PSUBSCRIBE pattern [pattern ...]
PSUBSCRIBE my*
PSUBSCRIBE pattern1 pattern2
PUNSUBSCRIBE命令
def handle_punsubscribe(self, *patterns):
if not patterns:
patterns = list(self.pubsub_patterns.keys())
for pattern in patterns:
if pattern in self.pubsub_patterns:
self.pubsub_patterns[pattern].discard(self.transport)
if not self.pubsub_patterns[pattern]:
del self.pubsub_patterns[pattern]
return ['punsubscribe', len(patterns), list(patterns)]
PUNSUBSCRIBE命令用于取消订阅一个或多个模式。它会将当前客户端的连接从指定模式的订阅者集合中移除。如果没有指定模式,则取消订阅所有模式。
示例:
PUNSUBSCRIBE [pattern [pattern ...]]
PUNSUBSCRIBE my*
PUNSUBSCRIBE
PUBLISH命令
def handle_publish(self, channel, message):
self.publish(channel, message)
return len(self.pubsub_channels.get(channel, set()))
def publish(self, channel, message):
for transport in self.pubsub_channels.get(channel, set()):
transport.write(self.encode_response(['message', channel, message]))
for pattern, transports in self.pubsub_patterns.items():
if self.match_pattern(pattern, channel):
for transport in transports:
transport.write(self.encode_response(['pmessage', pattern, channel, message]))
PUBLISH命令用于向指定的频道发布消息。它会将消息发送给订阅该频道的所有客户端。
publish方法实现了消息的发布逻辑。它会遍历频道的订阅者集合,将消息发送给每个订阅者。同时,它还会检查模式的匹配情况,将消息发送给匹配的模式订阅者。
match_pattern方法使用fnmatch模块实现了简单的模式匹配功能,支持*和?通配符。
主从复制相关命令
SLAVEOF命令
def handle_slaveof(self, host, port):if host == 'no' and port == 'one':
self.slaveof = Noneelse:
self.slaveof = (host, int(port))return 'OK'
SLAVEOF命令用于将当前服务器设置为另一个服务器的从服务器。它接收主服务器的主机和端口作为参数,并将slaveof属性设置为对应的值。
特殊地,SLAVEOF no one命令用于将从服务器转变为主服务器。
示例:
SLAVEOF host port
SLAVEOF 127.0.0.1 6380
SLAVEOF no one
SYNC命令
def handle_sync(self):
if not self.slaveof:
raise Exception('ERR not a slave')
self.transport.write(self.encode_response(['fullresync', self.replication_id, self.replication_offset]))
self.send_snapshot()
SYNC命令由从服务器发送给主服务器,用于请求完整的数据同步。当从服务器接收到SYNC命令时,它会向主服务器发送一个FULLRESYNC命令,并携带当前的复制ID和复制偏移量。
然后,从服务器会调用send_snapshot方法,将当前的数据快照发送给主服务器。
PSYNC命令
def handle_psync(self, replication_id, replication_offset):
if not self.slaveof:
raise Exception('ERR not a slave')
if replication_id == self.replication_id:
self.transport.write(self.encode_response(['continue', self.replication_offset]))
self.send_backlog(int(replication_offset))
else:
self.transport.write(self.encode_response(['fullresync', self.replication_id, self.replication_offset]))
self.send_snapshot()
PSYNC命令由从服务器发送给主服务器,用于请求部分重同步。当从服务器接收到PSYNC命令时,它会比较接收到的复制ID和自身的复制ID。
如果复制ID相同,说明从服务器只需要接收部分增量数据即可。主服务器会向从服务器发送一个CONTINUE响应,并调用send_backlog方法,将从指定偏移量开始的增量数据发送给从服务器。
如果复制ID不同,说明需要进行完整重同步。主服务器会向从服务器发送一个FULLRESYNC响应,并调用send_snapshot方法,将完整的数据快照发送给从服务器。
send_snapshot方法用于发送RDB快照数据,send_backlog方法用于发送AOF增量数据。
示例:
PSYNC replication_id replication_offset
数据持久化
RDB快照
def send_snapshot(self):
# 发送 RDB 快照
self.transport.write(self.encode_response(['rdb', self.rdb_file]))
with open(self.rdb_file, 'rb') as f:
while True:
data = f.read(1024)
if not data:
break
self.transport.write(data)
send_snapshot方法用于发送RDB快照数据。它首先向客户端发送一个RDB响应,指示快照文件的路径。然后,它打开快照文件,分块读取数据,并将数据发送给客户端。
AOF日志
从 RDB 或 AOF 文件中加载数据:
def load_data(self):
# 从 RDB 或 AOF 文件中加载数据
if os.path.exists(self.aof_file):
self.load_aof()
elif os.path.exists(self.rdb_file):
with open(self.rdb_file, 'rb') as f:
self.data = pickle.load(f)
def load_aof(self):
# 从 AOF 文件中加载数据
with open(self.aof_file, 'r') as f:
for line in f:
command, *args = line.split()
self.handle_command(command, *args)
async def bgrewriteaof(self):
# 在后台重写 AOF 文件
with open(f'{self.aof_file}.temp', 'w') as f:
for key, value in self.data.items():
f.write(f'SET {key} {value}\n')
os.rename(f'{self.aof_file}.temp', self.aof_file)
self.aof_buffer = []
def flush_aof(self):
# 将 AOF 缓冲区中的命令写入 AOF 文件
with open(self.aof_file, 'a') as f:
f.writelines(self.aof_buffer)
self.aof_buffer = []
服务器启动时,会优先尝试从AOF文件中加载数据。如果AOF文件不存在,则尝试从RDB快照文件中加载数据。
load_aof方法用于从AOF文件中加载数据。它逐行读取AOF文件,并将每一行解析为命令和参数,然后调用handle_command方法执行命令,以恢复数据库状态。
bgrewriteaof方法用于在后台重写AOF文件。它会创建一个新的AOF文件,并将当前数据库中的所有键值对写入新文件。写入完成后,新文件会替换旧的AOF文件,AOF缓冲区也会被清空。
flush_aof方法用于将AOF缓冲区中的命令写入AOF文件。它会打开AOF文件,将缓冲区中的命令追加到文件末尾,然后清空缓冲区。
其他命令
PING命令
def handle_ping(self, *args):if len(args) > 1:raise Exception('ERR wrong number of arguments for ping command')if len(args) == 0:return 'PONG'else:return args[0]
PING命令用于检查服务器是否正在运行,并测量服务器的响应时间。如果没有给定参数,服务器返回PONG。如果给定了参数,服务器返回相同的参数。
ECHO命令
def handle_echo(self, message):return message
ECHO命令用于打印给定的消息。服务器直接返回接收到的消息。
运行和测试
要运行Redis服务器,只需执行以下代码:
if name == '__main__':
logging.basicConfig(level=logging.INFO)
server = RedisServer()print(f'Server: {server}')
server.run()
服务器会在默认的主机(127.0.0.1)和端口(6379)上启动,并等待客户端连接。
你可以使用任何Redis客户端(如redis-cli)与服务器进行交互,测试各项功能。以下是一些测试案例:
通过这些测试,你可以验证服务器是否正确实现了各项功能。
流程图
完整代码
总结和展望
本文介绍了如何使用Python从零开始实现一个简化版的Redis服务器。我们实现了基本的键值对操作、事务、发布订阅、主从复制等功能,并支持RDB快照和AOF日志持久化。
通过这个实现,我们深入理解了Redis的内部工作原理,学习了如何使用asyncio构建高性能的网络服务器。
当然,这只是一个简化版的实现,还有许多功能和优化没有涉及,例如:
-
更多的数据结构支持(如列表、集合、有序集合等)
-
更完善的事务机制(如WATCH命令、CAS支持等)
-
更高效的持久化策略(如RDB-AOF混合持久化)
-
更健壮的主从复制和哨兵机制
-
集群支持和分片机制
-
更丰富的客户端协议和命令支持
这些都是进一步改进和扩展的方向。通过不断完善和优化,我们可以构建一个功能更加强大的redis库。
转载自:https://juejin.cn/post/7392115722867589170