用tokio的异步通道实现一个并发的哈希表

原文约4300字,阅读约需11分钟。发表于:

把tokio-mini-redis键值数据处理这部分抽象出来成了一个并发的哈希表,可以稍微加深一下redis的理解。 思路上就是把每一个get/insert之类的操作抽象成枚举命令(Command),每次解析完消息后,就向核心协程发送命令,这个所谓的核心协程是单线程处理数据的,同时也发送一个回调通道,在tokio-mini-redis里用的是tokio官方的tokio::sync::oneshot一次性通道的发送端,等待核心进行有序的返回结果。 对于一个简单mini-redis来说,不考虑其他权限功能、消息解析、淘汰策略,单单只是数据的crud,每秒可以承受远超过百万次的操作,作为内存级别的缓存是完全够用的。 本文意在抽象为一个并发的数据结构,所以用泛型简单封装了一下,其中为了避免繁杂的生命周期标记,泛型里去掉了引用而采用克隆的形式进行数据传递,所以键值都受到克隆Clone、跨线程传输Sync、静态生命周期'static的约束,另外核心部分是哈希表,所以键必然会受到哈希和比较Hash + Eq的约束。 当然,tokio-mini-redis直接使用了bytes::Bytes作为存储类型,也不需要考虑复杂的泛型约束以及生命周期标注的问题。 方法实现上,主要实现了insert、get、remove、len等基础方法,同时也实现了元素快照items的方法,基本上可以说,只要实现了以上五种方法,就可以实现几乎所有标准库里的其他方法了。 代码如下: // 命令枚举,理论上每个命令都会对应一个结果 // 和标准库hashmap不同的是,所有命令都避开了引用形式 enum Command<K, V> { Insert { key: K, value: V }, InsertResult { result: Option<V> }, Get { key: K }, GetResult { result: Option<V> }, Remove { key: K }, RemoveResult { result: Option<V> }, Items, ItemsResult { result: Vec<(K, V)> }, Len, LenResult { len: usize }, } // 引入tokio的无限容量异步通道和一次性通道 use tokio::{ self, sync::{ mpsc::{ unbounded_channel, UnboundedSender }, oneshot::{ channel, Sender }, } }; // 核心数据结构,以通道发送端的形式来存储基础数据结构,整体需要实现Clone特性 #[derive(Clone)] struct ChanHashMap<K, V> { sender: UnboundedSender<(Command<K, V>, Sender<Command<K, V>>)> } // 哈希trait是需要单独引入的,其他trait已经自动引入可以直接使用 use std::collections::HashMap; use std::hash::Hash; // 为核心数据结构撰写方法,为泛型键值提供必要的约束 impl<K, V> ChanHashMap<K, V> where K: Hash + Eq + Clone + Send + 'static, V: Clone + Send + 'static { fn new() -> Self { // 初始化单线程的哈希表作为核心存储数据结构 let mut database = HashMap::new(); // 初始化公共的输入输出异步通道,让其他命令可以通过此通道传输到主协程 let (in_sender, mut in_receiver) = unbounded_channel::<(Command<K, V>, Sender<Command<K, V>>)>(); // 启动一个协程(单线程)处理命令 tokio::spawn(async move { // 其他协程通过传输命令的形式传进while循环中进行单线程处理 while let Some((cmd, out_sender)) = in_receiver.recv().await { match cmd { // 进行命令枚举匹配,并通过out_sender发送回原本的协程 Command::Insert { key, value } => { let result = database.insert(key, value); let _ = out_sender.send(Command::InsertResult{ result }); } Command::Get { key } => { let result = match database.get(&key) { // std的hashmap.get获得是值引用,在并发情景下改为克隆传输,能够适用于绝大部分基础数据类型 Some(value) => Some(value.clone()), None => None, }; let _ = out_sender.send(Command::GetResult{ result }); } Command::Remove { key } => { let result = database.remove(&key); let _ = out_sender.send(Command::RemoveResult{ result }); } Command::Items => { // 通过快照的形式获取全部键值对,并存成数组返回 let result = database.iter().map(|(k, v)| (k.clone(), v.clone())).collect::<Vec<_>>(); let _ = out_sender.send(Command::ItemsResult{ result }); } Command::Len => { let _ = out_sender.send(Command::LenResult{ len: database.len() }); } _ => todo!() } } }); // 把初始化的通道发送端装入基础数据结构中,方便多线程状态下克隆拷贝 Self { sender: in_sender } } async fn insert(&mut self, key: K, value: V) -> Option<V> { // 初始化一次性通道 let (sender, receiver) = channel(); // 将命令和一次性通道的发送端发送到处理数据的主协程之中 let _ = self.sender.send((Command::Insert { key, value }, sender)); // 一次性通道的接收端等待处理结果并输出结果 match receiver.await.unwrap() { Command::InsertResult{ result } => result, _ => unreachable!() } } async fn get(&mut self, key: K) -> Option<V> { let (sender, receiver) = channel(); let _ = self.sender.send((Command::Get { key }, sender)); match receiver.await.unwrap() { Command::GetResult{ result } => result, _ => unreachable!() } } async fn remove(&mut self, key: K) -> Option<V> { let (sender, receiver) = channel(); let _ = self.sender.send((Command::Remove { key }, sender)); match receiver.await.unwrap() { Command::RemoveResult{ result } => result, _ => unreachable!() } } async fn items(&mut self) -> Vec<(K, V)> { let (sender, receiver) = channel(); let _ = self.sender.send((Command::Items , sender)); match receiver.await.unwrap() { Command::ItemsResult{ result } => result, _ => unreachable!() } } async fn len(&mut self) -> usize { let (sender, receiver) = channel(); let _ = self.sender.send((Command::Len, sender)); match receiver.await.unwrap() { Command::LenResult{ len } => len, _ => unreachable!() } } } 单机百万次操作大约在370ms-500ms这样,如果将异步通道换成crossbeam推荐的async-channel性能大约还能优化20-30%,不过一次性通道却没有更好的替代品,第三方异步库的性能基本都不如tokio自带的oneshot。 放在redis这个情境中,加上tcp-io以及RESP协议消息解析的性能损耗,最终每秒操作次数大约在10^5数量级也是比较符合认知的。

tokio-mini-redis将键值数据处理抽象成一个并发的哈希表,实现了insert、get、remove、len等基础方法,可以承受百万次操作,每秒操作次数大约在10^5数量级。

相关推荐 去reddit讨论