Redis 源码分析客户端数据结构(client)
数据结构
Redis 客户端数据结构如下所示: (src/server.h/client
)
typedef struct client {
uint64_t id; /* Client incremental unique ID. */
connection *conn;
int resp; /* RESP protocol version. Can be 2 or 3. */
redisDb *db; /* Pointer to currently SELECTed DB. */
robj *name; /* As set by CLIENT SETNAME. */
sds querybuf; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
sds pending_querybuf; /* If this client is flagged as master, this buffer
represents the yet not applied portion of the
replication stream that we are receiving from
the master. */
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
int original_argc; /* Num of arguments of original command if arguments were rewritten. */
robj **original_argv; /* Arguments of original command if arguments were rewritten. */
size_t argv_len_sum; /* Sum of lengths of objects in argv list. */
struct redisCommand *cmd, *lastcmd; /* Last command executed. */
user *user; /* User associated with this connection. If the
user is set to NULL the connection can do
anything (admin). */
int reqtype; /* Request protocol type: PROTO_REQ_* */
int multibulklen; /* Number of multi bulk arguments left to read. */
long bulklen; /* Length of bulk argument in multi bulk request. */
list *reply; /* List of reply objects to send to the client. */
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
size_t sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
time_t ctime; /* Client creation time. */
long duration; /* Current command duration. Used for measuring latency of blocking/non-blocking cmds */
time_t lastinteraction; /* Time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time;
uint64_t flags; /* Client flags: CLIENT_* macros. */
int authenticated; /* Needed when the default user requires auth. */
int replstate; /* Replication state if this is a slave. */
int repl_put_online_on_ack; /* Install slave write handler on first ACK. */
int repldbfd; /* Replication DB file descriptor. */
off_t repldboff; /* Replication DB file offset. */
off_t repldbsize; /* Replication DB file size. */
sds replpreamble; /* Replication DB preamble. */
long long read_reploff; /* Read replication offset if this is a master. */
long long reploff; /* Applied replication offset if this is a master. */
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
long long repl_ack_time;/* Replication ack time, if this is a slave. */
long long repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
copying this slave output buffer
should use. */
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
int slave_listening_port; /* As configured with: REPLCONF listening-port */
char *slave_addr; /* Optionally given by REPLCONF ip-address */
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
multiState mstate; /* MULTI/EXEC state */
int btype; /* Type of blocking op if CLIENT_BLOCKED. */
blockingState bpop; /* blocking state */
long long woff; /* Last write global replication offset. */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
sds peerid; /* Cached peer ID. */
sds sockname; /* Cached connection target address. */
listNode *client_list_node; /* list node in client list */
listNode *paused_list_node; /* list node within the pause list */
RedisModuleUserChangedFunc auth_callback; /* Module callback to execute
* when the authenticated user
* changes. */
void *auth_callback_privdata; /* Private data that is passed when the auth
* changed callback is executed. Opaque for
* Redis Core. */
void *auth_module; /* The module that owns the callback, which is used
* to disconnect the client if the module is
* unloaded for cleanup. Opaque for Redis Core.*/
/* If this client is in tracking mode and this field is non zero,
* invalidation messages for keys fetched by this client will be send to
* the specified client ID. */
uint64_t client_tracking_redirection;
rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
subscribed to in BCAST mode, in the
context of client side caching. */
/* In clientsCronTrackClientsMemUsage() we track the memory usage of
* each client and add it to the sum of all the clients of a given type,
* however we need to remember what was the old contribution of each
* client, and in which categoty the client was, in order to remove it
* before adding it the new value. */
uint64_t client_cron_last_memory_usage;
int client_cron_last_memory_type;
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
} client;
Redis 目前的功能非常强大,所以客户端也是有非常多的字段,我们开发者需要掌握的核心字段解析。
字段名称 | 描述 | 备注 |
---|---|---|
fd | -1: 表示伪客户单,不需要进行网络通讯(如 lua 脚本和 loadDataFormartDisk-> createFakeClient)>0: 表示正常通网络通讯,见 client list 命令 | |
flags | 客户端标识,以 CLIENT 开头的宏,常见的 CLIENT_SLAVE, 表示 slave; CLENT_MASTER 表示客户端为 MASTER , CLIETN_BLOCKED 表示客户端处理等待操作(执行某些阻塞操作);CLIENT_MULTI 表示可兑换正在处理事务 CLIENT_FORCE_AOF表示强制执行 AOP(pubsub 时,所有指令都会 aof ) | |
querybuf | 输入缓冲区,用户保存发送的命令,其缓冲区 PROTO_MAX_QUERYBUF_LEN 最大限制 1G, 超过进行关闭(readQueryFromClient) | |
buf、bufpos | 每个客户单都有两个缓冲区,一个是固定的,一个是动态的,buf 为固定输出缓冲区,大小为 PROTD_REPLY_CHUNK_BUTES 16k 通常用来回复端的内容, 如 ok 等, bufpos 用来记录 buf 实际长度,大于 0 比啊是使用 buf (见 writeToClient) | |
reply、reply_bytes | list 类型,动态输出缓冲区 、reply_bytes 表示回链表的总长度 | |
db | redisDb 列欣,表示当前所选的数据库 | |
time 相关 | ctime 创建时间,lastinteraction 最后一次交互时间,用来计算超时;obuf_soft_limit_reached_time 删除缓冲区最后一次达到软限制的时间。(见checkClientOutputBUfferLimits) | |
blockingState | 记录客户端阻塞相关的状态,包括超时时间,引起阻塞的 key 等 | |
mstate | multState 记录当前客户端的事务状态 |
配置文件
-
timeout
,客户端超时事件,默认为 0 ,设置 config set timeout xx -
tcp-keepalive
, 设置 keepalived -
maxclients
, 最大连接数,默认 10000 -
tcp_backlog
, tcp 套接字队列长度,默认为 511 , 见/proc/system/net/core/somaxconn
-
client-output-buffer-limit
,设置缓冲区限制,格式为 client-output-buffer-limitclient-output-buffer-limit
normal 0 0 0client-output-buffer-limit
replica 236mb 64mb 60client-output-buffer-limit
pubs 32mb 8mb 60其中
class
表示客户端类型,超过thread limit
客户端会立即关闭,超过soft limit
并且持续soft seconds
后在挂壁。class 取值:
nornal -> normal clients including MONITOR clients
slave -> slave clents
pubsub -> client subscribed to at least one pubsub chennel or pattern
客户端命令
"CACHING (YES|NO)",
" Enable/disable tracking of the keys for next command in OPTIN/OPTOUT modes.",
"GETREDIR",
" Return the client ID we are redirecting to when tracking is enabled.",
"GETNAME",
" Return the name of the current connection.",
"ID",
" Return the ID of the current connection.",
"INFO",
" Return information about the current client connection.",
"KILL <ip:port>",
" Kill connection made from <ip:port>.",
"KILL <option> <value> [<option> <value> [...]]",
" Kill connections. Options are:",
" * ADDR (<ip:port>|<unixsocket>:0)",
" Kill connections made from the specified address",
" * LADDR (<ip:port>|<unixsocket>:0)",
" Kill connections made to specified local address",
" * TYPE (normal|master|replica|pubsub)",
" Kill connections by type.",
" * USER <username>",
" Kill connections authenticated by <username>.",
" * SKIPME (YES|NO)",
" Skip killing current connection (default: yes).",
"LIST [options ...]",
" Return information about client connections. Options:",
" * TYPE (NORMAL|MASTER|REPLICA|PUBSUB)",
" Return clients of specified type.",
"UNPAUSE",
" Stop the current client pause, resuming traffic.",
"PAUSE <timeout> [WRITE|ALL]",
" Suspend all, or just write, clients for <timout> milliseconds.",
"REPLY (ON|OFF|SKIP)",
" Control the replies sent to the current connection.",
"SETNAME <name>",
" Assign the name <name> to the current connection.",
"UNBLOCK <clientid> [TIMEOUT|ERROR]",
" Unblock the specified blocked client.",
"TRACKING (ON|OFF) [REDIRECT <id>] [BCAST] [PREFIX <prefix> [...]]",
" [OPTIN] [OPTOUT]",
" Control server assisted client side caching.",
"TRACKINGINFO",
" Report tracking status for the current connection."
1、client list 查看客户端的信息
127.0.0.1:6379> client list
id=4 addr=127.0.0.1:61349 fd=8 name= age=4 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=32742 argv-mem=10 obl=0 oll=0 omem=0 tot-mem=62490 events=r cmd=client user=default
flags 表示客户端类型 N-表示普通客户端, M 表示 master
-
obl 表示固定缓冲区的长度
-
oll 表示动态缓冲区长度
-
omeom 代表使用的字节数
-
events 表示事件类型(r/w)
-
cmd 记录最后一次执行的命令
具体计算过程,见函数 catClientInfoString
2、info clients 查看所有的客户端
127.0.0.1:6379> info clients
# Clients
connected_clients:1
client_recent_max_input_buffer:16
client_recent_max_output_buffer:0
blocked_clients:0
tracking_clients:0
clients_in_timeout_table:0
connected_clients
: 代表当前 Redis 节点的客户端连接数,需要重点监控,一旦超过 maxclients , 新的客户单连接会被拒绝。client_longest_ouput_list
: 当前所有输出缓冲区中队列对象个数的最大值。client_biggest_input_buf
: 当前所有输入缓冲区中占用的最大容量。blocked_clients
: 正在执行阻塞命令(例如:blpop
、brpop
、brpoplpush
) 的客户端个数
客户端关闭
1、调用 client kill 命令;
2、不符合规范的命令;
3、客户端超时间;
4、输入缓冲区超过阈值 1G、受参数 RedisServer
的 clent_max_querybuf_len
控制。
由于redis是单线程的,所以他不可能一直循环来检测客户端。其中客户端超时检测是在serverCron
的clientsCron
中进行,serverCron
是一个周期函数,每100ms执行一次。server.hz
表示serverCron
函数的调用频率,默认为10。 clientsCron函数中为了每秒钟都能循环一次所有客户端,所以每次循环次数为iterations = numclients/server.hz
。如果客户端多的话,可能会导致redis主线程的阻塞。因此,5.0引入了动态hz,见配置文件dynamic-hz,默认打开。
参考资料
- 《Redis 设计与实现》黄健宏
转载自:https://juejin.cn/post/7069290229928034341