《Redis Cookbook》

Warning

本项目正在进行中,如果有任何条目是空白或者不完全,都是可以理解的。

目录(以第一个字的中文拼音排序):

h

缓存(cache)

应用

缓存用于保存需要大量计算的操作结果,或者需要快速访问的数据。

比如一个数量庞大的排序,一个耗时的搜索,网站主页内容,等等。

定义

最简单的缓存实现一般只有两个操作,一个是set,另一个是get

更新缓存有两种常用策略:

  1. 手动更新
  2. 自动过期

通常我们可以在数据更新的时候手动更新缓存,比如每当有新数据插入的时候,我们就执行一次set,让新缓存覆盖旧缓存。

另一方面,我们可以用expire操作给缓存设定一个过期时间,当缓存过期时它被自动删除,然后在每次查找数据的时候查看缓存是否存在,如果缓存不存在则重建缓存,并为缓存设定过期时间,然后返回结果;如果缓存存在,则直接取出缓存里的结果并返回。

缓存的删除操作delete通常是可有可无的,其一是因为你可以直接用set覆盖旧缓存,其二是可以让缓存自动过期,所以通常不需要删除缓存,只是偶尔在调试的时候会用上。

实现

缓存可以用Redis的string_structhash_struct来实现。

字符串实现的优点是可以为每个缓存分别设置过期时间,缺点是比哈希表实现占用更多的空间。

而在哈希表实现中,每个哈希表只能共享同一个过期时间(也即是,放在同一个哈希表中的所有缓存会同时过期)。但是你可以利用这一特点为缓存分类,比如你可以将所有排序操作的缓存放到名为sort_cache的哈希表中,而将所有搜索操作的缓存放到名为search_cache的哈希表中,然后分别为sort_cachesearch_cache设置不同的过期时间。

并且哈希表实现比字符串实现更节省空间。

See also

我们通常不对缓存的数量进行限制,如果你需要限制缓存的数量(比如只允许最多100个缓存),请参考日志(log)

如果你需要实现一些复杂的缓存算法,比如Most Recently Used(MRU)Least Recently Used(LRU)请使用sorted_set_struct

关于哈希表比字符串更节约空间的讨论,请参考Redis官方的Memory optimization文档

字符串实现

# file: ./h/cache/string_implement.py

from redis import Redis

def set(name, value, ttl=None, client=Redis()):
    if ttl:
        client.setex(name, value, ttl)
    else:
        client.set(name, value)

def get(name, client=Redis()):
    return client.get(name)

def delete(name, client=Redis()):
    client.delete(name)


# test:
if __name__ == "__main__":

    from time import sleep

    key = 'phone'
    value = '10086'
    expire_time = 3

    set(key, value)
    assert get(key) == value 

    delete(key)
    assert get(key) == None 

    set(key, value, expire_time)
    assert get(key) == value 

    sleep(expire_time * 2)
    assert get(key) == None 

哈希表实现

哈希表实现比字符串实现提供更多功能,因此也相对复杂一些。

我们用category参数给缓存分类,并增加expire操作来设置整个哈希表的过期时间,ttl函数返回哈希表的剩余生存时间,size则返回给定类型的分类缓存的数量。

# file: ./h/cache/hash_implement.py

from redis import Redis

def set(category, name, value, client=Redis()):
    client.hset(category, name, value)

def get(category, name, client=Redis()):
    return client.hget(category, name)

def delete(category, name, client=Redis()):
    client.hdel(category, name)

def expire(category, ttl, client=Redis()):
    client.expire(category, ttl)

def ttl(category, client=Redis()):
    return client.ttl(category)

def size(category, client=Redis()):
    return client.hlen(category)


# test:
if __name__ == "__main__":

    from time import sleep

    category = 'greet'
    key = 'morning'
    value = 'good morning!'
    expire_time = 3

    set(category, key, value)
    assert get(category, key) == value
    assert size(category) == 1

    delete(category, key)
    assert get(category, key) == None
    assert size(category) == 0

    set(category, key,value)
    expire(category, expire_time)
    assert ttl(category) != None

    sleep(expire_time * 2)
    assert get(category, key) == None

实例:用Python装饰器为函数加上缓存

Python中有一个方便好用的特性,就是它的装饰器(decorator)机制,可以无缝地为特定的函数加上新的功能。

我们可以将装饰器、函数和我们的缓存实现三者集合起来,为指定的函数提供方便且通用的缓存机制。

比如现在有一个函数search,这个搜索非常耗时,所以我们想给它加上个缓存,我们利用装饰器cache,为search加上缓存机制。

@cache
def search(key):
    pass

这样,search函数就会在每次执行时查找缓存,如果缓存不命中,就执行一次搜索,将结果保存到缓存并返回。如果搜索命中,则直接返回缓存作为结果。

以下就是cache装饰器的实现方法:

# file: ./h/cache/example.py

from functools import wraps
from string_implement import set, get

def make_unique_id(function, args, kwargs):
    return function.__name__ + repr(args) + repr(kwargs)

def cache(function):
    @wraps(function)
    def _(*args, **kwargs):
        id = make_unique_id(function, *args, **kwargs)
        cache = get(id)

        if cache:
            return cache
        else:
            result = function(*args, **kwargs)
            set(result)
            return result
    return _

J

计数器(counter)

应用

计数器一般用于访问量、下载量、投票数等各种计数用途,和自增唯一id(autoincrementing unique identifier)不同的是,计数器的值不但可以被增加,还可以被清零(比如发现有作弊行为),或者被减少(比如部分计数无效),所以计数器生成的值也不是唯一的。

定义

一个计数器,至少应该拥有以下四个操作:

  1. 增加数值
  2. 减少数值
  3. 清零
  4. 查看当前数值

实现

计数器可以用以下两种方式实现:

  1. String类函数,INCRINCRBYDECRDECRBY,还有GETSET
  2. Hash类函数,HINCRBYHSETHGET

String实现

# coding:utf-8

# file: ./j/counter/string_implement.py

from redis import Redis

INITIAL_VALUE = 0

class Counter:
    
    def __init__(self, name, client=Redis()):
        self.name = name
        self.client = client

    def incr(self, increment=1):
        # redis-py 用 incr 代替 incrby,所以可以指定增量
        value = self.client.incr(self.name, increment)
        return int(value)

    def decr(self, decrement=1):
        # redis-py 用 decr 代替 decrby,所以可以指定减量
        value = self.client.decr(self.name, decrement)
        return int(value)

    def set(self, value):
        self.client.set(self.name, value)

    def get(self):
        value = self.client.get(self.name)
        return INITIAL_VALUE if value is None else int(value)

    def reset(self):
        self.set(INITIAL_VALUE)

Hash实现

Hash实现和String实现稍有不同,Hash实现还需提供一个key作Hash的键。另外,Hash只有HINCRBY而没有HDECRBY命令,但是我们可以通过代码0-decrement将负数作为“增量”,传入HINCRBY命令,来达到做减法的效果。

# file: ./j/counter/hash_implement.py

from redis import Redis

INITIALI_VALUE = 0
KEY = 'counter'

class Counter:
    
    def __init__(self, field, client=Redis(), key=KEY):
        self.key = key
        self.field = field
        self.client = client

    def incr(self, increment=1):
        value = self.client.hincrby(self.key, self.field, increment)
        return int(value)

    def decr(self, decrement=1):
        value = self.client.hincrby(self.key, self.field, 0-decrement)
        return int(value)

    def set(self, value):
        self.client.hset(self.key, self.field, value)

    def get(self):
        value = self.client.hget(self.key, self.field)
        return INITIALI_VALUE if value is None else int(value)

    def reset(self):
        self.set(INITIALI_VALUE)

r

日志(log)

应用

适用于各种单纯记录用途,要求按写入先后排序,但不必严格按时间排序的文本,比如事件日志、服务器日志、博客文章,等等。

See also

如果你需要按时间作参数排序或处理你的条目,可以使用时间线(timeline)

定义

一个日志对象通常需要写入、读出以及截断操作。

写入操作append只是单纯将条目追加到日志后面,读出操作read可以指定一个区间(比如最新100条,最旧200条等),或是用一个迭代器逐条读取。

length操作查看日志的条目数量。

截断功能keep接受一个数值,用来保证日志文件不会太大,比如只保留最新100条,只保留最新500条,等等。有时候还需要清空整个日志,所以clear方法也是有必要的。

条目的位置一般来说是无关紧要的,比如你通常不需要知道第101个条目的内容,所以不需要下标索引类(index)方法。

实现

日志可以用Redis的list_struct来实现,LPUSHRPUSH实现写入,LRANGE实现读出,LTRIM负责截断。

因为list_struct结构的特性,新条目默认总是被追加到末尾(最左边或最右边),所以你不需要向日志提供时间值作参数,尽管时间值常常是日志内容的一部分。

在这个例子中,我们将最新的日志条目追加到列表最左边(使用LPUSH),而旧的日志条目则被“挤”到右边的。

# coding:utf-8

# file: ./r/log/list_implement.py

from redis import Redis

# Redis列表的边界下标
LEFTMOST = 0
RIGHTMOST = -1

class Log:
    
    def __init__(self, name, client=Redis()):
        self.name = name
        self.client = client

    def append(self, content):
        return self.client.lpush(self.name, content)

    def read(self, start=LEFTMOST, stop=RIGHTMOST):
        return self.client.lrange(self.name, start, stop)

    def length(self):
        return self.client.llen(self.name)

    def clear(self):
        # 因为del是Python的保留字
        # 所以redis-py用delete代替del命令
        self.client.delete(self.name)

    def keep(self, size):
        # 只保留log[0:size-1]范围内的条目
        self.client.ltrim(self.name, LEFTMOST, size-1)

s

时间线(timeline)

应用

在有些应用程序中,数据通常带有一个时间值,程序以时间为单位操作数据(可以是一个时间范围,或者是单独的一个时间点):比如求一个博客在6月至8月的所有日志,或者是今天写博客的文章数。

还有一些微博客,只显示最近两天的内容,等等。

定义

一个时间线对象最少有两个属性

  1. 时间值
  2. 数据内容

针对时间线对象的操作通常都是一些范围型的操作:比如求某个时间点起到另一个时间点内的所有数据,或者是统计某个时间点起到另一个时间点内的数据数目,等等。

实现

在Redis中我们可以用sorted_set_struct表示时间线。

当增加一个新条目时,我们将条目内容作为有序集元素的member参数,使用当前时间的UnixTime格式,作为有序集元素的score值。

比如一条在2011年8月22日时47分7秒发出的信息,会被储存为:

ZADD tweet 1313981227.681918 "hello my friend"

其中tweet是时间线的key1313981227.681918UnixTime格式的UTC时间,而"hello my friend"则是条目内容。

这样一来,就可以用ZREVRANGE进行按条目数读取(比如读出最新10条数据),使用ZREVRANGEBYSCORE进行时间范围型的读取操作(比如读出2011年8月20日到2011年8月22日的所有数据),用ZREM对单条数据进行删除,用ZREMRANGEBYRANKZREMRANGEBYSCORE进行时间范围型的删除操作。

# coding: utf-8

# file: ./s/timeline/sorted_set_implement.py

from redis import Redis
from time import time

# Redis有序集边界
LEFTMOST = 0
RIGHTMOST = -1

class Timeline:

    def __init__(self, name, client=Redis()):
        self.name = name
        self.client = client

    def append(self, content):
        # time()函数生成当前时间的unixtime值
        self.client.zadd(self.name, score=time(), value=content)

    def range(self, start=LEFTMOST, stop=RIGHTMOST, display_time=False):
        # 使用zrevrange命令,读出最新的数据
        return self.client.zrevrange(self.name, start, stop, withscores=display_time)

    def range_between_time(self, min, max, display_time=False):
        # min和max也必须是unixtime值
        # 注意zrevrangebyscore命令参数的摆放是max先而min后
        return self.client.zrevrangebyscore(self.name, max, min, withscores=display_time)

    def delete(self, content):
        return self.client.zrem(self.name, content)

    def delete_between_time(self, min, max):
        # min和max也必须是unixtime值
        return self.client.zremrangebyscore(self.name, min=min, max=max)

    def length(self):
        return self.client.zcard(self.name)

锁(Lock)

应用

定义

实现

t

TAG系统

tag在互联网应用里尤其多见,如果以传统的关系型数据库来设计有点不伦不类。我们以查找书的例子来看看redis在这方面的优势。

关系型数据库的设计

两张表,一张book的明细,一张tag表,表示每本书的tag ,一本书可以有多个tag

mysql> select * from book;
+------+-------------------------------+----------------+
| id   | name                          | author         |
+------+-------------------------------+----------------+
|    1 | The Ruby Programming Language | Mark Pilgrim   |
|    1 | Ruby on rail                  | David Flanagan |
|    1 | Programming Erlang            | Joe Armstrong  |
+------+-------------------------------+----------------+
mysql> select * from tag;
+---------+---------+
| tagname | book_id |
+---------+---------+
| ruby    |       1 |
| ruby    |       2 |
| web     |       2 |
| erlang  |       3 |
+---------+---------+

假如有如此需求,查找即是ruby又是web方面的书籍,如果以关系型数据库会怎么处理?

select b.name, b.author  from tag t1, tag t2, book b
where t1.tagname = 'web' and t2.tagname = 'ruby' and t1.book_id = t2.book_id and b.id = t1.book_id

tag表自关联2次再与book关联,这个sql还是比较复杂的,如果要求即ruby,但不是web方面的书籍呢?

关系型数据其实并不太适合这些集合操作。

REDIS的设计

首先book的数据肯定要存储的,和上面一样。

SET book:1:name     ”The Ruby Programming Language”
SET book:2:name     ”Ruby on rail”
SET book:3:name     ”Programming Erlang”

SET book:1:author     ”Mark Pilgrim”
SET book:2:author     ”David Flanagan”
SET book:3:author     ”Joe Armstrong”

tag表我们使用集合来存储数据,因为集合擅长求交集、并集

SADD tag:ruby 1
SADD tag:ruby 2
SADD tag:web 2
SADD tag:erlang 3

那么,即属于ruby又属于web的书?

inter_list = redis.sinter("tag.web", "tag:ruby")

即属于ruby,但不属于web的书?

inter_list = redis.sdiff("tag.ruby", "tag:web")

属于ruby和属于web的书的合集?

inter_list = redis.sunion("tag.ruby", "tag:web")

简单到不行阿。

从以上2个例子可以看出在某些场景里,关系型数据库是不太适合的,你可能能够设计出满足需求的系统,但总是感觉的怪怪的,有种生搬硬套的感觉。

尤其登录系统这个例子,频繁的为业务建立索引。放在一个复杂的系统里,ddl(创建索引)有可能改变执行计划。导致其它的sql采用不同的执行计划,业务复杂的老系统,这个问题是很难预估的,sql千奇百怪。要求DBA对这个系统里所有的sql都了解,这点太难了。这个问题在oracle里尤其严重,每个DBA估计都碰到过。对于MySQL这类系统,ddl又不方便(虽然现在有online ddl的方法)。碰到大表,DBA凌晨爬起来在业务低峰期操作,这事我没少干过。而这种需求放到redis里就很好处理,DBA仅仅对容量进行预估即可。

未来的OLTP系统应该是kv和关系型的紧密结合。

x

序列化(Serialization)

y

用户登录系统

记录用户登录信息的一个系统, 我们简化业务后只留下一张表。

关系型数据库的设计

mysql> select * from login;
+---------+----------------+-------------+---------------------+
| user_id | name           | login_times | last_login_time     |
+---------+----------------+-------------+---------------------+
|       1 | ken thompson   |           5 | 2011-01-01 00:00:00 |
|       2 | dennis ritchie |           1 | 2011-02-01 00:00:00 |
|       3 | Joe Armstrong  |           2 | 2011-03-01 00:00:00 |
+---------+----------------+-------------+---------------------+

user_id表的主键,name表示用户名,login_times表示该用户的登录次数,每次用户登录后,login_times会自增,而last_login_time更新为当前时间。

REDIS的设计

关系型数据转化为KV数据库,我的方法如下:

key 表名:主键值:列名

value 列值

一般使用冒号做分割符,这是不成文的规矩。比如在php-admin for redis系统里,就是默认以冒号分割,于是user:1user:2等key会分成一组。于是以上的关系数据转化成kv数据后记录如下:

SET login:1:login_times 5
SET login:2:login_times 1
SET login:3:login_times 2

SET login:1:last_login_time 2011-1-1
SET login:2:last_login_time 2011-2-1
SET login:3:last_login_time 2011-3-1

SET login:1:name ”ken thompson“
SET login:2:name “dennis ritchie”
SET login:3:name ”Joe Armstrong“

这样在已知主键的情况下,通过GETset就可以获得或者修改用户的登录次数和最后登录时间和姓名。

一般用户是无法知道自己的id的,只知道自己的用户名,所以还必须有一个从nameid的映射关系,这里的设计与上面的有所不同。

SET "login:ken thompson:id"      1
SET "login:dennis ritchie:id"    2
SET "login: Joe Armstrong:id"    3

这样每次用户登录的时候业务逻辑如下(python版),r是redis对象,name是已经获知的用户名。

#获得用户的id
uid = r.get("login:%s:id" % name)

#自增用户的登录次数
ret = r.incr("login:%s:login_times" % uid)

#更新该用户的最后登录时间
ret = r.set("login:%s:last_login_time" % uid, datetime.datetime.now())

如果需求仅仅是已知id,更新或者获取某个用户的最后登录时间,登录次数,关系型和kv数据库无啥区别:一个通过btree pk,一个通过hash,效果都很好。

假设有如下需求,查找最近登录的N个用户。开发人员看看,还是比较简单的,一个sql搞定:

select * from login order by last_login_time desc limit N

DBA了解需求后,考虑到以后表如果比较大,所以在last_login_time上建个索引。执行计划从索引leafblock 的最右边开始访问N条记录,再回表N次,效果很好。

过了两天,又来一个需求,需要知道登录次数最多的人是谁。同样的关系型如何处理?DEV说简单:

select * from login order by login_times desc limit N

DBA一看,又要在login_time上建立一个索引。有没有觉得有点问题呢,表上每个字段上都有素引。

关系型数据库的数据存储的的不灵活是问题的源头,数据仅有一种储存方法,那就是按行排列的堆表。统一的数据结构意味着你必须使用索引来改变sql的访问路径来快速访问某个列的,而访问路径的增加又意味着你必须使用统计信息来辅助,于是一大堆的问题就出现了。

没有索引,没有统计计划,没有执行计划,这就是kv数据库。

redis里如何满足以上的需求呢?对于求最新的N条数据的需求,链表的后进后出的特点非常适合。我们在上面的登录代码之后添加一段代码,维护一个登录的链表,控制他的长度,使得里面永远保存的是最近的N个登录用户。

#把当前登录人添加到链表里
ret = r.lpush("login:last_login_times", uid)

#保持链表只有N位
ret = redis.ltrim("login:last_login_times", 0, N-1)

这样需要获得最新登录人的id,如下的代码即可:

last_login_list = r.lrange("login:last_login_times", 0, N-1)

另外,求登录次数最多的人,对于排序,积分榜这类需求,sorted_set_struct非常的适合,我们把用户和登录次数统一存储在一个sorted_set_struct里。

ZADD login:login_times 5 1
ZADD login:login_times 1 2
ZADD login:login_times 2 3

这样假如某个用户登录,额外维护一个sorted_set_struct,代码如下:

#对该用户的登录次数自增1
ret = r.zincrby("login:login_times", 1, uid)

那么如何获得登录次数最多的用户呢?逆序排列取的排名第N的用户即可:

ret = r.zrevrange("login:login_times", 0, N-1)

可以看出,DEV需要添加2行代码,而DBA不需要考虑索引什么的。

z

自增唯一id(autoincrementing unique identifier)

应用

自增唯一id最常见的应用就是作为关系型数据库的主键,因为主键必须确保每个数据项都有唯一id。

它也可以在不支持自增唯一id的数据库中(比如MongoDB)用来替代唯一id(uniqueidentifier,通常是一个哈希值),为用户提供更好的URL:比如将/topic/4e491e229f328b0cd900010d修改为/topic/10086

定义

一个自增唯一id对象最重要的是保证值的唯一性,要做到这一点,自增id的自增incr操作必须是一个原子操作,它应该能在一个原子时间内完成以下两件事:

  1. 增加id值
  2. 返回当前id值

并且它也没有减法decr和清零reset等操作,因为这些操作破坏了唯一性。

get操作一般只用于内部检查,比如观察值是否溢出,但在一般情况下,自增唯一id对象应该只有一个incr操作。

See also

如果你需要一个非唯一的,带incrresetdecr等操作的计数对象,请参考计数器(counter)

实现

自增唯一id可以用以下两种方式实现:

  1. String类函数: INCRGET
  2. Hash类函数:HINCRBYHGET

String实现

# file: ./z/auid/string_implement.py

from redis import Redis

INITIAL_VALUE = 0

class Auid:
    
    def __init__(self, name, client=Redis()):
        self.name = name
        self.client = client

    def incr(self):
        value = self.client.incr(self.name)
        return int(value)

    def get(self):
        value = self.client.get(self.name)
        return INITIAL_VALUE if value is None else int(value)

Hash实现

# file: ./z/auid/hash_implement.py

from redis import Redis

INITIA_VALUE = 0
INCREMENT = 1
KEY = 'auid'

class Auid:

    def __init__(self, name, client=Redis(), key=KEY):
        self.key = key
        self.name = name
        self.client = client

    def incr(self):
        value = self.client.hincrby(self.key, self.name, INCREMENT)
        return int(value)

    def get(self):
        value = self.client.hget(self.key, self.name)
        return INITIA_VALUE if value is None else int(value)

关于Redis cookbook

本项目的目标是展示Redis数据库的各种用法。

贡献本项目

如果你有任何关于Redis的新“菜式”,或者对现有“菜式”有更好的实现方法和见解,你可以通过发表issue或者直接提交代码来贡献本项目。

本项目正文是使用sphinx格式写成的,关于sphinx格式,具体可以参考sphinx的官方网站。不熟悉sphinx格式的朋友,可以直接联系本人,通过邮件或其他方式提交你的菜谱(转换格式之类的苦差就交给我吧)。

本项目菜谱实现主要使用Python语言,客户端使用redis-py,但也欢迎其他任何编程语言。

为了保持程序的正确性和严谨性,请在提交代码之前测试,并在提交代码时连测试一并提交。

贡献者名单

hotteran ruan撰写了“TAG系统”和“用户登录系统”两个菜式。