Redis与Java – 数据结构

Redis(REmote DIctionary Server) is an open source (BSD licensed), in-memory data structure store, used as databasecache andmessage broker. It supports data structures such as stringshasheslistssetssorted sets with range queries, bitmaps, hyperloglogs and geospatial indexes with radius queries. Redis has built-in replicationLua scriptingLRU evictiontransactions and different levels of on-disk persistence, and provides high availability via Redis Sentinel and automatic partitioning with Redis Cluster.

Redis是一个开源、高性能、基于内存数据结构的Key-Value缓存/存储系统. 提供多种键值数据类型(String, Hash, List, Set, Sorted Set)来适应不同场景下的存储需求.同时Redis的诸多高级功能可以胜任消息队列 、任务队列数据库等不同的角色(主页: redis.io, 中文: redis.cn, 命令: redisfans.com ).

为什么使用 Redis及其产品定位


Install

Redis没有其他外部依赖, 编译安装过程非常简单.

  • 编译安装
    • wget http://download.redis.io/releases/redis-3.0.5.tar.gz
    • make(32位机器:make 32bit)
    • make test
    • make PREFIX=${redis-path} install
      安装完成后,在${redis-path}/bin/下生成如下二进制文件:
工具 描述
redis-server 服务端
redis-cli 客户端
redis-benchmark Redis性能测试工具
redis-check-aof AOF文件修复工具
redis-check-dump RDB文件检测工具
redis-sentinel Sentinel服务器(仅在2.8之后)
  • 配置
    cp ${redis-3.0.5}/redis.conf ${redis-path}

    注: 使Redis以后台进程的形式运行:
    编辑redis.conf配置文件,设置daemonize yes.

  • 启动
    ${redis-path}/bin/redis-server ./redis.conf
  • 连接
    ${redis-path}/bin/redis-cli连接服务器

    • - h: 指定server地址
    • - p: 指定server端口

基础命令

查询

  • KEYS pattern 查询key
    Redis支持通配符格式: *, ? ,[]:
* 通配任意多个字符
? 通配单个字符
[] 通配括号内的某1个字符
\x 转意符
  • RANDOMKEY 返回一个随机存在的key
  • EXISTS key 判断key是否存在
  • TYPE key 返回key存储类型

更新

  • SET key value 设置一对key-value
  • DEL key [key...] 删除key

    注: 返回真正删除的key数量, 且DEL并不支持通配符.

  • RENAME[NX] key new_key 重命名

    NX: not exists new_key不存在才对key重命名.

  • move key DB 移动key到另外一个DB

    一个Redis进程默认打开16个DB,编号0~15(可在redis.conf中配置,默认为0),使用SELECT n可在多个DB间跳转.


有效期

  • TTL/PTTL key 查询key有效期(以秒/毫秒为单位,默认-1永久有效)

    对于不存在的key,返回-2; 对于已过期/永久有效的key,都返回-1

  • EXPIRE/PEXPIRE key n 设置key有效期
  • PERSIST key 指定永久有效

Strings

字符串Strings是Redis最基本的数据类型,它能存储任何形式的字符串,如用户邮箱/JSON化的对象甚至是一张图片(二进制数据).一个字符串允许存储的最大容量为512MB.
字符串类型也是其他4种数据类型的基础,其他数据类型和字符串的区别从某种角度来说只是组织字符串的形式不同.


常用命令

1. 存/取

SEX key value [EX/PX] [NX/XX]
GET key
    # EX/PX: 设置有效时间 [秒/毫秒].
    # NX/XX: key存在与否

2. 增/减

INCR key    # 指定的key的值加1,并返回加1后的值
DECR key
    ## 1: 当key不存在时, 新建`<key, 0>`再执行`INCR`;
    ## 2: INCR/DECR的范围为64位有符号整数;
    ## 3: Redis包括`INCR`在内的所有命令保证是原子操作(可以不用考虑竞态条件).

实践

<code>存储文章(使用用Jedis) </code>

我们使用**Jedis**客户端连接Redis并存储文章数据(关于本篇博客实践部分的详细场景讲解,可以参考[Redis入门指南][5]一书,在此就不再赘述,下同).

  • 使用Jedis需要在pom.xml中添加如下依赖:
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>${jedis.version}</version>
</dependency>
  • applicationContext.xml

     使用Spring来管理Reids的连接.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans.xsd


http://www.springframework.org/schema/context


http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- 批量扫描@Component -->
    <context:component-scan base-package="com.fq.redis"/>

    <bean id="jedis" class="redis.clients.jedis.Jedis">
        <constructor-arg name="host" value="aliyun"/>
    </bean>
</beans>
  • DO: Articles文章
/**
 * @author jifang
 * @since 16/3/4 下午5:38.
 */
@Message
public class Articles {

    private String title;

    private String content;

    private String author;

    private Date time;

    // ...
}
  • DAO
public interface ArticlesDAO {

    /* 文章 */
    Long putArticles(Articles articles);

    Articles getArticles(Long postID);
}
@Repository
public class ArticlesDAOImpl implements ArticlesDAO {

    private static final String POSTS_ID = "posts:id";

    private static final String POSTS_DATA = "posts:%s:data";

    @Autowired
    private Jedis jedis;

    @Override
    public Long putArticles(Articles articles) {
        Long id = jedis.incr(POSTS_ID);

        String key = String.format(POSTS_DATA, id);
        // 序列化value
        MessagePack pack = new MessagePack();
        byte[] value;
        try {
            value = pack.write(articles);
        } catch (IOException e) {
            value = new byte[0];
        }

        String result = jedis.set(key.getBytes(), value);
        if (!result.equals("OK")) {
            id = -1L;
            jedis.decr(POSTS_ID);
        }
        return id;
    }

    @Override
    public Articles getArticles(Long id) {
        String key = String.format(POSTS_DATA, id);
        byte[] value = jedis.get(key.getBytes());
        // 反序列化
        MessagePack message = new MessagePack();
        try {
            return message.read(value, Articles.class);
        } catch (IOException e) {
            return new Articles();
        }
    }
}

上面代码使用了SpringMessagePack的部分功能,因此需要在pom.xml中添加如下依赖:

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-beans</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-expression</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-test</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.msgpack</groupId>
    <artifactId>msgpack</artifactId>
    <version>${msgpack.version}</version>
</dependency>

拓展

功能 关键词
增/减指定整数 INCREBY/DECY key number
增加指定浮点数 INCREBYFLOAT key number
尾部追加 APPEND key value
获取字符串长度 STRLEN key
同时设置多个键值 MSET key value [key value ...]
同时获得多个键值 MGET key [key ...]
返回旧值并设置新值 GETSET key value
位操作 GETBIT/SETBIT/BITCOUNT/BITOP

Hash

散列Hash类型的键值是一种字典结构, 其存储了字段(filed)和字段值(value)的映射.
但value只能是字符串,不支持其他数据类型, 且一个Hash类型Key键可以包含至多232-1个字段.


常用命令

1. 存取

HSET key field value
HGET key field
HMSET key field value [field value ...]
HMGET key field [value ...]
HGETALL key

HSET不区分插入还是更新,当key不存在时,HSET会自动建立并插入.插入返回1, 更新返回0.

2. 更新

HEXISTS key field           # 判断key下的filed是否存在
HSETNX key field value      # 当field不存在时赋值
HINCRBY key field number    # 增加数字
HDEL key field [field]      # 删除field.

实践

<code>添加存储文章缩略词 </code>

前面使用String存储整篇文章实际上有一个弊端, 如只需要更新文章标题,需要将篇文章都做更新然后存入Redis,费时费力.因此我们更推荐使用Hash来存储文章数据:

这样即使需要为文章新添加字段, 也只需为该Hash再添加一新key即可, 比如<slug, 文章缩略名>.

  • DAO
@Repository
public class ArticlesDAOImpl implements ArticlesDAO {

    private static final String POSTS_ID = "posts:id";

    private static final String POSTS_DATA = "posts:%s";

    @Autowired
    private Jedis jedis;

    @Override
    public Long putArticles(Articles articles) {
        Long id = jedis.incr(POSTS_ID);

        String key = String.format(POSTS_DATA, id);

        Map<String, String> map = new HashMap<>();
        map.put("title", articles.getTitle());
        map.put("content", articles.getContent());
        map.put("author", articles.getAuthor());
        map.put("time", String.valueOf(articles.getTime().getTime()));

        String result = jedis.hmset(key, map);
        if (!result.equals("OK")) {
            id = -1L;
            jedis.decr(POSTS_ID);
        }

        return id;
    }

    @Override
    public Articles getArticles(Long id) {
        String key = String.format(POSTS_DATA, id);
        Map<String, String> map = jedis.hgetAll(key);
        Date time = new Date(Long.valueOf(map.get("time")));

        return new Articles(map.get("title"), map.get("content"), map.get("author"), time);
    }
}

拓展

功能 关键词
值获取字段名 HKEYS key
只获取字段值 HVALS key
获取字段数量 HLEN key

注: 除了Hash, Redis的其他数据类型同样不支持类型嵌套, 如集合类型的每个元素只能是字符串, 不能是另一个集合或Hash等.


List

列表List可以存储一个有序的字符串列表, 其内部使用双向链表实现, 所以向列表两端插入/删除元素的时间复杂度为O(1),而且越接近两端的元素速度就越快.
不过使用链表的代价是通过索引访问元素较慢(详细可参考博客双向循环链表的设计与实现). 一个列表类型最多能容纳232-1个元素.


常用命令

1. 两端压入/弹出

LPUSH/LPUSHX key value [key value ...]
LPOP key
RPUSH/RPUSHX key value [key value ...]
RPOP key

2. 查询

LLEN key
LRANGE key start stop

LLEN命令的时间复杂度为O(1): Reids会保存链表长度, 不必每次遍历统计.

3. 删除

LREM key count value        # count>0:表头删除; count<0:表尾删除; count=0:全部删除
LTRIM key start stop        # 只保留[start,stop)内值

实践

<code>存储文章评论列表 </code>

考虑到评论时需要存储评论的全部数据(姓名/联系方式/内容/时间等),所以适合将一条评论的各个元素序列化为String之后作为列表的元素存储:

  • DO: Comment
@Message
public class Comment {

    private String author;

    private String email;

    private Date time;

    private String content;

    // ...
}
  • DAO
@Repository
public class CommentDAOImpl implements CommentDAO {

    @Autowired
    private Jedis jedis;

    private static final String POSTS_COMMENTS = "posts:%s:comments";

    @Override
    public void addComment(Long id, Comment comment) {
        MessagePack pack = new MessagePack();
        String key = String.format(POSTS_COMMENTS, id);
        byte[] value;
        try {
            value = pack.write(comment);
        } catch (IOException e) {
            value = new byte[0];
        }

        jedis.lpush(key.getBytes(), value);
    }

    @Override
    public List<Comment> getComments(Long id) {
        String key = String.format(POSTS_COMMENTS, id);

        List<byte[]> list = jedis.lrange(key.getBytes(), 0, -1);
        List<Comment> comments = new ArrayList<>(list.size());
        MessagePack pack = new MessagePack();
        for (byte[] item : list) {
            try {
                comments.add(pack.read(item, Comment.class));
            } catch (IOException ignored) {
            }
        }

        return comments;
    }
}

拓展

功能 关键词
获得指定索引元素值 LINDEX key index
设置指定索引元素值 LSET key index value
插入元素 LINSERT key BEFORE|AFTER pivoit value
将元素从一个列表转入另一个列表 RPOPLPUSH source destination
等待[弹出/转移][头/尾]元素 BLPOP/BRPOP/BRPOPLPUSH

RPOPLPUSH是一个很有意思的命令: 先执行RPOP, 再执行LPUSH, 先从source列表右边中弹出一个元素, 然后将其加入destination左边, 并返回这个元素值, 整个过程是原子的.

根据这一特性可将List作为循环队列使用:sourcedestination相同,RPOPLPUSH不断地将队尾的元素移到队首.好处在于在执行过程中仍可不断向队列中加入新元素,且允许多个客户端同时处理队列.


Set

集合Set内的元素是无序且唯一的,一个集合最多可以存储232-1个字符串.集合类型的常用操作是插入/删除/判断是否存在, 由于集合在Redis内部是使用值为空的HashTable实现, 所以这些操作的时间复杂度为O(1), 另外, Set最方便的还是多个集合之间还可以进行并/交/差的运算.


常用命令

1. 增/删

SADD key member [member ...]        #同一个member只会保存第一个
SREM key member [member ...]

2. 查找

SMEMBERS key            # 获得集合中所有的元素
SISMEMBER key           # 判断是否在集合中

3. 集合间运算

SDIF key [key ...]      # 差集
SINTER key [key ...]    # 交集
SUNION key [key ...]    # 并集

实践

<code>1. 存储文章标签 </code>

考虑到一个文章的所有标签都是互不相同的, 且对标签的保存顺序并没有特殊的要求, 因此Set比较适用:

@Repository
public class TagDAOImpl implements TagDAO {

    private static final String POSTS_TAGS = "posts:%s:tags";

    @Autowired
    private Jedis jedis;

    @Override
    public void addTag(Long id, String... tags) {
        String key = String.format(POSTS_TAGS, id);
        jedis.sadd(key, tags);
    }

    @Override
    public void rmTag(Long id, String... tags) {
        String key = String.format(POSTS_TAGS, id);
        jedis.srem(key, tags);
    }

    @Override
    public Set<String> getTags(Long id) {
        String key = String.format(POSTS_TAGS, id);
        return jedis.smembers(key);
    }
}
<code>2. 通过标签搜索文章: 列出某个(或同属于某几个)标签下的所有文章. </code>

在提出这样的需求之后, 前面的posts:[ID]:tags 文章维度的存储结构就不适用了, 因此借鉴索引倒排的思想, 我们使用tags:[tag]:posts这种标签维度的数据结构:

在这种结构下, 根据标签搜索文章就变得不费吹灰之力, 而Set自带交/并/补的支持, 使得多标签文章搜索有也变得十分简单:

@Repository
public class TagDAOImpl implements TagDAO {

    private static final String POSTS_TAGS = "posts:%s:tags";

    private static final String TAGS_POSTS = "tags:%s:posts";

    @Autowired
    private Jedis jedis;

    @Autowired
    private ArticlesDAO aDAO;

    @Override
    public void addTag(Long id, String... tags) {
        String key = String.format(POSTS_TAGS, id);
        if (jedis.sadd(key, tags) != 0L) {
            // 倒排插入
            for (String tag : tags) {
                String rKey = String.format(TAGS_POSTS, tag);
                jedis.sadd(rKey, String.valueOf(id));
            }
        }
    }

    @Override
    public void rmTag(Long id, String... tags) {
        String key = String.format(POSTS_TAGS, id);
        if (jedis.srem(key, tags) != 0L) {
            // 倒排删除
            for (String tag : tags) {
                String rKey = String.format(TAGS_POSTS, tag);
                jedis.srem(rKey, String.valueOf(id));
            }
        }
    }

    @Override
    public Set<String> getTags(Long id) {
        String key = String.format(POSTS_TAGS, id);
        return jedis.smembers(key);
    }

    @Override
    public List<Articles> getArticlesByTag(String tag) {
        // 需要首先由 tags:%s:posts 查出文章ID 列表
        String rKey = String.format(TAGS_POSTS, tag);
        Set<String> ids = jedis.smembers(rKey);
        return idToArticles(ids);
    }

    @Override
    public List<Articles> getArticlesByTagInter(String... tags) {
        String[] keys = new String[tags.length];
        for (int i = 0; i < tags.length; ++i) {
            keys[i] = String.format(TAGS_POSTS, tags[i]);
        }
        Set<String> ids = jedis.sinter(keys);
        return idToArticles(ids);
    }

    @Override
    public List<Articles> getArticlesByTagUnion(String... tags) {
        String[] keys = new String[tags.length];
        for (int i = 0; i < tags.length; ++i) {
            keys[i] = String.format(TAGS_POSTS, tags[i]);
        }
        Set<String> ids = jedis.sunion(keys);
        return idToArticles(ids);
    }

    private List<Articles> idToArticles(Set<String> ids) {
        List<Articles> articles = new ArrayList<>();
        for (String id : ids) {
            articles.add(aDAO.getArticles(Long.valueOf(id)));
        }
        return articles;
    }
}

拓展

功能 关键词
获得集合中元素数 SCARD key
集合运算并将结果存储 SDIFFSTORE/SINTERSTORE/SUNIONSTORE destination key [key ...]
随机获得集合中的元素 SRANDMEMBER key [count]
随机弹出集合中的一个元素 SPOP key

Sorted-Sets

有序集合Sorted-SetsSet基础上为每个元素都关联了一个分数[score],这使得我们不仅可以完成插入/删除和判断元素是否存在等操作,还能够获得与score有关的操作(如score最高/最低的前N个元素、指定score范围内的元素).Sorted-Sets具有以下特点:
1) 虽然集合中元素唯一, 但score可以相同.
2) 内部基于HashTableSkipList实现,因此即使读取中间部分的数据速度也很快(O(log(N))).
3) 可以通过更改元素score值来元素顺序(与List不同).


常用命令

1. 增/删/改

ZADD key score member [score member ...]
        # score还可以是双精度浮点数(+inf/-inf分别代表正无穷/负无穷), 相同元素会覆盖前面的score.
ZREM key member [member ...]
ZREMRANGEBYRANK key start stop
        # 按排名范围删除[start, stop]范围内元素.
ZREMRANGEBYSCORE key start stop
        # 按分数范围删除
ZINCRBY key increment member
        # 增加某个元素的score

2. 查询

ZSCORE key member                       #获得元素分数
ZRANGE key start stop [WITHSCORES]      #获得排名在[start, stop]范围内的元素列表(从小到大, 从0开始)
ZREVRANGE key start stop [WITHSCORES]   # (从大到小)
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]     
        # 获得指定分数范围内的元素(如果不希望包含端点值, 可在分数前加'(').
ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count] 
        # 分数从大到小, 且注意min/max颠倒.

实践

<code>实现文章按点击量排序 </code>

要按照文章的点击量排序, 就必须再额外使用一个Sorted-Set类型来实现, 文章ID为元素,以该文章点击量为元素分数.

@Repository
public class BrowseDAOImpl implements BrowseDAO {

    private static final String POSTS_BROWSE = "posts:page.browse";

    @Autowired
    private Jedis jedis;

    @Autowired
    private ArticlesDAO aDAO;

    @Override
    public void addABrowse(Long id) {
        long score = 1L;
        jedis.zincrby(POSTS_BROWSE, score, String.valueOf(id));
    }

    @Override
    public List<Articles> getArticlesByBrowseOrder(Long start, Long end, boolean reverse) {

        Set<String> ids;
        if (!reverse) {
            ids = jedis.zrange(POSTS_BROWSE, start, end);
        } else {
            ids = jedis.zrevrange(POSTS_BROWSE, start, end);
        }

        return idToArticles(ids);
    }

    private List<Articles> idToArticles(Set<String> ids) {
        List<Articles> articles = new ArrayList<>();
        for (String id : ids) {
            articles.add(aDAO.getArticles(Long.valueOf(id)));
        }
        return articles;
    }
}

拓展

功能 关键词
获得集合中的元素数目 ZCARD key
获得指定分数范围内的元素个数 ZCOUNT key min max
获得元素排名 ZRANK/ZREVRANK key member
  • ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]
    ZINTERSTORE用来计算多个Sorted-Set的交集并将结果存储在destination, 返回值为destination中的元素个数.

    • AGGREGATE:
      destination中元素的分数由AGGREGATE参数决定:SUM(和/默认), MIN(最小值), MAX(最大值)
    • WEIGHTS
      通过WEIGHTS参数设置每个集合的权重,在参与运算时元素的分数会乘上该集合的权重.
  • ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]
    用法类似

实战微博

Key设计技巧

参考以往RDBMS的设计经验:
1. 将表名转换为key前缀, 如user:.
2. 第2段放置用于区分key的字段, 对应于RDBMS中的主键, 如user:[uid]:.
3. 第3段放置要存储的列名, 如user:[uid]:email.


需求

<code>微博MiBlog要实现的功能需求: 1. 用户模块: 注册、登录、新用户列表; 2. 关系模块: 关注、取消关注、已关注列表、粉丝列表、共同关注列表; 3. 微博模块: 发微博、删微博、已发微博列表、已关注人的微博列表、微博动态流(所有微博列表). </code>

设计与实现

1. 用户模块

用户模块数据分3个Key存储: 用户ID由user:count自增生成(String), 用户email与id映射关系由user:email.to.id存储(Hash), 用户真实数据由user:[id]:data存储(Hash):

  • User(domain)
public class User {

    private Long id;

    private String email;

    private String nickname;

    private String password;

    private Long time;

    // ...
}
  • UserDAO
@Repository
public class UserDAOImpl implements UserDAO {

    @Autowired
    private Jedis redis;

    @Override
    public long register(User user) {
        long id = -1;

        // 当前email没有注册过
        if (!redis.hexists(Constant.USER_EMAIL_TO_ID, user.getEmail())) {
            // 为用户生成id
            id = redis.incr(Constant.USER_COUNT);
            // 插入email -> id 对应关系
            redis.hset(Constant.USER_EMAIL_TO_ID, user.getEmail(), String.valueOf(id));

            Map<String, String> map = new HashMap<>();
            map.put(Constant.EMAIL, user.getEmail());
            map.put(Constant.PASSWORD, PasswordUtil.encode(user.getPassword()));
            map.put(Constant.NICKNAME, user.getNickname());
            map.put(Constant.REGIST_TIME, String.valueOf(System.currentTimeMillis()));

            // 写入user:[id]:data
            String key = String.format(Constant.USER_ID_DATA, id);
            redis.hmset(key, map);
        }

        return id;
    }

    @Override
    public boolean login(String email, String password) {
        String id = redis.hget(Constant.USER_EMAIL_TO_ID, email);
        if (!Strings.isNullOrEmpty(id)) {

            String key = String.format(Constant.USER_ID_DATA, id);
            Map<String, String> map = redis.hgetAll(key);

            return PasswordUtil.checkEqual(password, map.get(Constant.PASSWORD));
        }

        return false;
    }

    @Override
    public long getUserId(String email) {
        String id = redis.hget(Constant.USER_EMAIL_TO_ID, email);
        if (!Strings.isNullOrEmpty(id)) {
            return Long.valueOf(id);
        }
        return -1;
    }

    @Override
    public User getUser(long id) {
        String key = String.format(Constant.USER_ID_DATA, id);
        Map<String, String> map = redis.hgetAll(key);
        return Util.mapToSimpleObject(map, User.class);
    }

    @Override
    public List<Long> newUserList(int limit) {
        Long maxId = Long.valueOf((redis.get(Constant.USER_COUNT)));
        Long minId = maxId - (limit - 1);
        if (minId < 1) {
            minId = 1L;
        }

        List<Long> ids = new ArrayList<>((int) (maxId - minId + 1));
        for (Long i = maxId; i >= minId; --i) {
            ids.add(i);
        }
        return ids;
    }
}
  • UserService
@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private UserDAO dao;

    @Override
    public long register(String email, String nickname, String password) {
        return dao.register(new User(null, email, nickname, password, null));
    }

    @Override
    public boolean login(String email, String password) {
        return dao.login(email, password);
    }

    @Override
    public List<User> newUserList(int limit) {
        List<Long> ids = dao.newUserList(limit);
        List<User> users = new ArrayList<>(ids.size());
        for (Long id : ids) {
            users.add(dao.getUser(id));
        }
        return users;
    }
}

2. 关系模块

关系模块数据由2个Key存储: 关注relation:following:[id]存储(Set), 被关注relation:follower:[id]存储(Set):
这样存的优势是既可以快速的查询关注列表, 也可以快速的查询粉丝列表, 而且还可以基于Redis对Set的支持, 做共同关注功能.

  • Relation(domain)
public class Relation {

    private long from;

    private long to;

    // ...
}
  • RelationDAO
@Repository
public class RelationDAOImpl implements RelationDAO {

    @Autowired
    private Jedis redis;

    @Override
    public boolean follow(Relation relation) {
        if (relation.getFrom() != relation.getTo()) {

            // 主动关注
            String following = String.format(Constant.RELATION_FOLLOWING, relation.getFrom());
            Long result1 = redis.sadd(following, String.valueOf(relation.getTo()));

            // 被动被关注
            String follower = String.format(Constant.RELATION_FOLLOWER, relation.getTo());
            Long result2 = redis.sadd(follower, String.valueOf(relation.getFrom()));

            return result1 == 1L && result2 == 1L;
        }

        return false;
    }

    @Override
    public boolean unfollow(Relation relation) {
        if (relation.getFrom() != relation.getTo()) {

            // 取消主动关注
            String following = String.format(Constant.RELATION_FOLLOWING, relation.getFrom());
            Long result1 = redis.srem(following, String.valueOf(relation.getTo()));

            // 取消被动关注
            String follower = String.format(Constant.RELATION_FOLLOWER, relation.getTo());
            Long result2 = redis.srem(follower, String.valueOf(relation.getFrom()));

            return result1 == 1L && result2 == 1L;
        }

        return false;
    }

    @Override
    public List<Long> getFollowings(long id) {
        String following = String.format(Constant.RELATION_FOLLOWING, id);
        Set<String> members = redis.smembers(following);
        return stringToLong(members);
    }

    @Override
    public List<Long> getFollowers(long id) {
        String following = String.format(Constant.RELATION_FOLLOWER, id);
        Set<String> members = redis.smembers(following);
        return stringToLong(members);
    }

    @Override
    public List<Long> withFollowings(long... ids) {
        String[] keys = new String[ids.length];
        for (int i = 0; i < ids.length; ++i) {
            keys[i] = String.format(Constant.RELATION_FOLLOWING, ids[i]);
        }
        Set<String> sids = redis.sinter(keys);
        return stringToLong(sids);
    }

    private List<Long> stringToLong(Set<String> sets) {
        List<Long> list = new ArrayList<>(sets.size());
        for (String set : sets) {
            list.add(Long.valueOf(set));
        }
        return list;
    }
}
  • RelationService
@Service
public class RelationServiceImpl implements RelationService {

    @Autowired
    private RelationDAO rDAO;

    @Autowired
    private UserDAO uDAO;

    @Override
    public boolean follow(long from, long to) {
        return rDAO.follow(new Relation(from, to));
    }

    @Override
    public boolean unfollow(long from, long to) {
        return rDAO.unfollow(new Relation(from, to));
    }

    @Override
    public List<User> getFollowings(long id) {
        List<Long> ids = rDAO.getFollowings(id);
        return idToUser(ids);
    }

    @Override
    public List<User> getFollowers(long id) {
        List<Long> ids = rDAO.getFollowers(id);
        return idToUser(ids);
    }

    @Override
    public List<User> withFollowings(long... ids) {
        return idToUser(rDAO.withFollowings(ids));
    }

    private List<User> idToUser(List<Long> ids) {
        List<User> users = new ArrayList<>();
        for (Long id : ids) {
            users.add(uDAO.getUser(id));
        }
        return users;
    }
}

3. 微博模块

发微博功能我们采用推模式实现: 为每个用户建立一个信箱List, 存储关注的人发的微博, 因此每个用户在发微博时都需要获取自己的粉丝列表, 然后为每个粉丝推送一条微博数据(考虑到一个用户关注的人过多, 因此实际开发中只存最新1000条即可).
由此微博模块数据由4个Key存储: 微博ID由miblog:count自增生成(String), 微博真实数据由miblog:[id]:data存储(Hash), 自己发的微博由miblog:[uid]:my存储(List), 推送给粉丝的微博由miblog:[uid]:flow存储(List):

采用推模式的优势是用户在查看微博时响应迅速, 而且还可实现针对不同用户做定向推荐, 但带来的成本是部分数据冗余以及用户发微博逻辑较复杂导致时间开销较大.因此还可以考虑使用拉模式实现,拉模式节省了发微博的时间陈本, 但用户读取微博的速度会降低, 而且很难做定向推荐.因此在实际开发中最好推拉相结合(详细可参考微博feed系统的推(push)模式和拉(pull)模式和时间分区拉模式架构探讨).

  • MiBlog(domain)
public class MiBlog {

    private Long author;

    private String content;

    private Long time;

    // ...
}
  • MiBlogDAO
@Repository
public class MiBlogDAOImpl implements MiBlogDAO {

    @Autowired
    private Jedis redis;

    @Autowired
    private RelationDAO relationDAO;

    @Override
    public long publish(MiBlog miBlog) {
        // 获得微博ID
        long id = redis.incr(Constant.MI_BLOG_COUNT);

        // 插入微博数据
        Map<String, String> map = new HashMap<>();
        map.put(Constant.AUTHOR, String.valueOf(miBlog.getAuthor()));
        map.put(Constant.TIME, String.valueOf(System.currentTimeMillis()));
        map.put(Constant.CONTENT, miBlog.getContent());
        String dataKey = String.format(Constant.MI_BLOG_DATA, id);
        redis.hmset(dataKey, map);

        // 插入到当前用户已发表微博
        String myKey = String.format(Constant.MI_BLOG_MY, miBlog.getAuthor());
        redis.lpush(myKey, String.valueOf(id));

        // 为每一个自己的粉丝推送微博消息
        // 获得所有粉丝
        List<Long> followers = relationDAO.getFollowers(miBlog.getAuthor());
        for (Long follower : followers) {
            String key = String.format(Constant.MI_BLOG_FLOW, follower);
            redis.lpush(key, String.valueOf(id));
        }

        return id;
    }

    @Override
    public boolean unpublish(long uid, long id) {
        String sId = String.valueOf(id);
        String myKey = String.format(Constant.MI_BLOG_MY, uid);
        // 确实是uid发布的微博
        if (redis.lrem(myKey, 1L, sId) == 1L) {
            // 删除所有粉丝微博
            List<Long> followers = relationDAO.getFollowers(uid);
            for (Long follower : followers) {
                String flowKey = String.format(Constant.MI_BLOG_FLOW, follower);
                redis.lrem(flowKey, 1L, sId);
            }

            // 删除微博数据
            String dataKey = String.format(Constant.MI_BLOG_DATA, id);
            redis.del(dataKey);

            return true;
        }
        return false;
    }

    @Override
    public MiBlog getBlog(long id) {
        String key = String.format(Constant.MI_BLOG_DATA, id);
        Map<String, String> map = redis.hgetAll(key);
        return Util.mapToSimpleObject(map, MiBlog.class);
    }

    @Override
    public List<Long> getMyBlog(long uid) {
        String key = String.format(Constant.MI_BLOG_MY, uid);
        List<String> sids = redis.lrange(key, 0, -1);
        return CollectionUtil.stringToLong(sids);
    }

    @Override
    public List<Long> getFollowingBlog(long uid) {
        String key = String.format(Constant.MI_BLOG_FLOW, uid);
        List<String> sids = redis.lrange(key, 0, -1);
        return CollectionUtil.stringToLong(sids);
    }

    @Override
    public List<Long> getBlogFlow(long uid) {
        List<Long> myList = this.getMyBlog(uid);
        List<Long> flowList = this.getFollowingBlog(uid);
        int myEndIndex = 0;
        for (; myEndIndex < myList.size(); ++myEndIndex) {
            Long my = myList.get(myEndIndex);

            boolean isEnd = true;
            for (int i = 0; i < flowList.size(); ++i) {
                long flow = flowList.get(i);
                if (my > flow) {
                    flowList.add(i, my);
                    isEnd = false;
                    break;
                }
            }
            if (isEnd)
                break;
        }

        // 将所有my < flow的元素填充
        flowList.addAll(myList.subList(myEndIndex, myList.size()));

        return flowList;
    }
}
  • MiBlogService
@Service
public class MiBlogServiceImpl implements MiBlogService {

    @Autowired
    private MiBlogDAO miBlogDAO;

    @Override
    public long publish(long author, String content) {
        return miBlogDAO.publish(new MiBlog(author, content, null));
    }

    @Override
    public boolean unpublish(long uid, long id) {
        return miBlogDAO.unpublish(uid, id);
    }

    @Override
    public List<MiBlog> getMyBlog(long uid) {
        List<Long> ids = miBlogDAO.getMyBlog(uid);
        return idToBlog(ids);
    }

    @Override
    public List<MiBlog> getFollowingBlog(long uid) {
        List<Long> ids = miBlogDAO.getFollowingBlog(uid);
        return idToBlog(ids);
    }

    @Override
    public List<MiBlog> getBlogFlow(long uid) {
        List<Long> ids = miBlogDAO.getBlogFlow(uid);
        return idToBlog(ids);
    }

    private List<MiBlog> idToBlog(List<Long> ids) {
        List<MiBlog> blogs = new ArrayList<>();
        for (Long id : ids) {
            blogs.add(miBlogDAO.getBlog(id));
        }
        return blogs;
    }
}

限于篇幅, 在这儿只列出了最核心的代码, 详细代码可参考Git: 翡青/MiBlog


参考&扩展
微博关系服务与Redis的故事
Twitter如何使用Redis提高可伸缩性
为什么不能用memcached存储Session
Redis Geo: Redis新增位置查询功能
Redis开源文档《Redis设计与实现》发布


相关文章

发表评论

Comment form

(*) 表示必填项

还没有评论。

跳到底部
返回顶部