Redis高级数据结构:超越String的Redis世界
Redis高级数据结构超越String的Redis世界引言Redis不仅仅是一个KV存储它提供了丰富的数据结构是现代应用架构中不可或缺的组件。深入理解Redis的数据结构能够帮助我们设计出更高效、更优雅的解决方案。本文将全面解析Redis的五大基础类型和五种扩展类型。一、String类型及其应用1.1 String基础操作package redis import ( context fmt time github.com/redis/go-redis/v9 ) type StringOperations struct { client *redis.Client } func NewStringOperations(client *redis.Client) *StringOperations { return StringOperations{client: client} } func (r *StringOperations) SetWithExpiry(ctx context.Context, key, value string, expiry time.Duration) error { return r.client.Set(ctx, key, value, expiry).Err() } func (r *StringOperations) Get(ctx context.Context, key string) (string, error) { return r.client.Get(ctx, key).Result() } func (r *StringOperations) MSet(ctx context.Context, values map[string]interface{}) error { return r.client.MSet(ctx, values).Err() } func (r *StringOperations) MGet(ctx context.Context, keys ...string) ([]interface{}, error) { return r.client.MGet(ctx, keys...).Result() } func (r *StringOperations) SetNX(ctx context.Context, key, value string, expiry time.Duration) (bool, error) { return r.client.SetNX(ctx, key, value, expiry).Result() }1.2 分布式锁实现package redis import ( context errors fmt time github.com/redis/go-redis/v9 ) var ( ErrLockNotAcquired errors.New(lock not acquired) ErrLockNotHeld errors.New(lock not held) ) type DistributedLock struct { client *redis.Client key string value string expiry time.Duration } func NewDistributedLock(client *redis.Client, key string, expiry time.Duration) *DistributedLock { return DistributedLock{ client: client, key: key, value: fmt.Sprintf(%d, time.Now().UnixNano()), expiry: expiry, } } func (dl *DistributedLock) Acquire(ctx context.Context) (bool, error) { result, err : dl.client.SetNX(ctx, dl.key, dl.value, dl.expiry).Result() if err ! nil { return false, err } return result, nil } func (dl *DistributedLock) Release(ctx context.Context) error { script : redis.NewScript( if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(del, KEYS[1]) else return 0 end ) result, err : script.Run(ctx, dl.client, []string{dl.key}, dl.value).Int64() if err ! nil { return err } if result 0 { return ErrLockNotHeld } return nil } func (dl *DistributedLock) Extend(ctx context.Context, expiry time.Duration) error { script : redis.NewScript( if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(pexpire, KEYS[1], ARGV[2]) else return 0 end ) result, err : script.Run(ctx, dl.client, []string{dl.key}, dl.value, expiry.Milliseconds()).Int64() if err ! nil { return err } if result 0 { return ErrLockNotHeld } return nil }二、Hash类型对象存储2.1 Hash基本操作package redis import ( context encoding/json time github.com/redis/go-redis/v9 ) type HashOperations struct { client *redis.Client } func NewHashOperations(client *redis.Client) *HashOperations { return HashOperations{client: client} } func (r *HashOperations) HSet(ctx context.Context, key string, values map[string]interface{}) error { return r.client.HSet(ctx, key, values).Err() } func (r *HashOperations) HGet(ctx context.Context, key, field string) (string, error) { return r.client.HGet(ctx, key, field).Result() } func (r *HashOperations) HGetAll(ctx context.Context, key string) (map[string]string, error) { return r.client.HGetAll(ctx, key).Result() } func (r *HashOperations) HIncrBy(ctx context.Context, key, field string, incr int64) (int64, error) { return r.client.HIncrBy(ctx, key, field, incr).Result() } func (r *HashOperations) HExists(ctx context.Context, key, field string) (bool, error) { return r.client.HExists(ctx, key, field).Result() }2.2 对象缓存实战package redis import ( context encoding/json fmt time github.com/redis/go-redis/v9 ) type UserCache struct { client *redis.Client prefix string } func NewUserCache(client *redis.Client) *UserCache { return UserCache{ client: client, prefix: user:, } } type User struct { ID int64 json:id Name string json:name Email string json:email Status string json:status CreatedAt int64 json:created_at } func (uc *UserCache) Get(ctx context.Context, userID int64) (*User, error) { key : fmt.Sprintf(%s%d, uc.prefix, userID) data, err : uc.client.HGetAll(ctx, key).Result() if err ! nil { return nil, err } if len(data) 0 { return nil, nil } user : User{ ID: userID, Name: data[name], Email: data[email], Status: data[status], CreatedAt: time.Now().Unix(), } return user, nil } func (uc *UserCache) Set(ctx context.Context, user *User, expiry time.Duration) error { key : fmt.Sprintf(%s%d, uc.prefix, user.ID) fields : map[string]interface{}{ name: user.Name, email: user.Email, status: user.Status, } pipe : uc.client.Pipeline() pipe.HSet(ctx, key, fields) pipe.Expire(ctx, key, expiry) _, err : pipe.Exec(ctx) return err } func (uc *UserCache) Delete(ctx context.Context, userID int64) error { key : fmt.Sprintf(%s%d, uc.prefix, userID) return uc.client.Del(ctx, key).Err() } func (uc *UserCache) UpdateField(ctx context.Context, userID int64, field string, value interface{}) error { key : fmt.Sprintf(%s%d, uc.prefix, userID) return uc.client.HSet(ctx, key, field, value).Err() } func (uc *UserCache) GetField(ctx context.Context, userID int64, field string) (string, error) { key : fmt.Sprintf(%s%d, uc.prefix, userID) return uc.client.HGet(ctx, key, field).Result() } func (uc *UserCache) IncrementCounter(ctx context.Context, userID int64, field string) (int64, error) { key : fmt.Sprintf(%s%d, uc.prefix, userID) return uc.client.HIncrBy(ctx, key, field, 1).Result() }三、List类型队列与栈3.1 阻塞队列实现package redis import ( context encoding/json time github.com/redis/go-redis/v9 ) type BlockingQueue struct { client *redis.Client name string } func NewBlockingQueue(client *redis.Client, name string) *BlockingQueue { return BlockingQueue{ client: client, name: name, } } func (bq *BlockingQueue) Enqueue(ctx context.Context, value interface{}) error { data, err : json.Marshal(value) if err ! nil { return err } return bq.client.LPush(ctx, bq.name, data).Err() } func (bq *BlockingQueue) Dequeue(ctx context.Context, timeout time.Duration) ([]byte, error) { result, err : bq.client.BRPop(ctx, timeout, bq.name).Result() if err ! nil { return nil, err } if len(result) 2 { return nil, nil } return []byte(result[1]), nil } func (bq *BlockingQueue) DequeueWithContext(ctx context.Context) ([]byte, error) { result, err : bq.client.BRPop(ctx, 0, bq.name).Result() if err ! nil { return nil, err } if len(result) 2 { return nil, nil } return []byte(result[1]), nil } func (bq *BlockingQueue) Length(ctx context.Context) (int64, error) { return bq.client.LLen(ctx, bq.name).Result() }3.2 延迟队列实现package redis import ( context encoding/json fmt time github.com/redis/go-redis/v9 ) type DelayedQueue struct { client *redis.Client name string zsetName string } func NewDelayedQueue(client *redis.Client, name string) *DelayedQueue { return DelayedQueue{ client: client, name: name, zsetName: name :delayed, } } type DelayedMessage struct { ID string Payload interface{} ExecuteAt time.Time } func (dq *DelayedQueue) Schedule(ctx context.Context, msg *DelayedMessage) error { data, err : json.Marshal(msg.Payload) if err ! nil { return err } score : float64(msg.ExecuteAt.Unix()) pipe : dq.client.Pipeline() pipe.ZAdd(ctx, dq.zsetName, redis.Z{ Score: score, Member: fmt.Sprintf(%s:%s, msg.ID, string(data)), }) pipe.LPush(ctx, dq.name, msg.ID) _, err pipe.Exec(ctx) return err } func (dq *DelayedQueue) Process(ctx context.Context, handler func(msg *DelayedMessage) error) error { now : float64(time.Now().Unix()) result, err : dq.client.ZPopMin(ctx, dq.zsetName, 1).Result() if err ! nil { return err } if len(result) 0 { return nil } msgID : result[0].Member.(string) payload, err : dq.client.LPop(ctx, dq.name).Result() if err ! nil { return err } msg : DelayedMessage{ ID: msgID, ExecuteAt: time.Unix(int64(result[0].Score), 0), } if err : json.Unmarshal([]byte(payload), msg.Payload); err ! nil { return err } return handler(msg) } func (dq *DelayedQueue) Cancel(ctx context.Context, msgID string) error { pattern : msgID :* result, err : dq.client.ZRangeByScore(ctx, dq.zsetName, redis.ZRangeBy{ Min: -inf, Max: inf, }).Result() if err ! nil { return err } for _, member : range result { if len(member) len(msgID)1 member[:len(msgID)] msgID { return dq.client.ZRem(ctx, dq.zsetName, member).Err() } } return nil }四、Set类型无序去重4.1 标签系统实现package redis import ( context fmt github.com/redis/go-redis/v9 ) type TagSystem struct { client *redis.Client } func NewTagSystem(client *redis.Client) *TagSystem { return TagSystem{client: client} } func (ts *TagSystem) AddTags(ctx context.Context, entityType string, entityID string, tags ...string) error { key : fmt.Sprintf(entity:%s:%s:tags, entityType, entityID) return ts.client.SAdd(ctx, key, tags).Err() } func (ts *TagSystem) RemoveTags(ctx context.Context, entityType string, entityID string, tags ...string) error { key : fmt.Sprintf(entity:%s:%s:tags, entityType, entityID) return ts.client.SRem(ctx, key, tags).Err() } func (ts *TagSystem) GetTags(ctx context.Context, entityType string, entityID string) ([]string, error) { key : fmt.Sprintf(entity:%s:%s:tags, entityType, entityID) return ts.client.SMembers(ctx, key).Result() } func (ts *TagSystem) HasTag(ctx context.Context, entityType string, entityID string, tag string) (bool, error) { key : fmt.Sprintf(entity:%s:%s:tags, entityType, entityID) return ts.client.SIsMember(ctx, key, tag).Result() } func (ts *TagSystem) GetEntitiesByTag(ctx context.Context, entityType string, tag string) ([]string, error) { key : fmt.Sprintf(tag:%s:%s:%s, entityType, tag, entities) return ts.client.SMembers(ctx, key).Result() } func (ts *TagSystem) AddEntityToTag(ctx context.Context, entityType string, entityID string, tag string) error { entityKey : fmt.Sprintf(entity:%s:%s:tags, entityType, entityID) tagKey : fmt.Sprintf(tag:%s:%s:%s, entityType, tag, entities) pipe : ts.client.Pipeline() pipe.SAdd(ctx, entityKey, tag) pipe.SAdd(ctx, tagKey, entityID) _, err : pipe.Exec(ctx) return err } func (ts *TagSystem) GetIntersection(ctx context.Context, entityType string, tags ...string) ([]string, error) { if len(tags) 0 { return nil, nil } keys : make([]string, len(tags)) for i, tag : range tags { keys[i] fmt.Sprintf(tag:%s:%s:%s, entityType, tag, entities) } return ts.client.SInter(ctx, keys...).Result() } func (ts *TagSystem) GetUnion(ctx context.Context, entityType string, tags ...string) ([]string, error) { if len(tags) 0 { return nil, nil } keys : make([]string, len(tags)) for i, tag : range tags { keys[i] fmt.Sprintf(tag:%s:%s:%s, entityType, tag, entities) } return ts.client.SUnion(ctx, keys...).Result() }五、ZSet类型有序集合5.1 排行榜实现package redis import ( context fmt github.com/redis/go-redis/v9 ) type Leaderboard struct { client *redis.Client key string } func NewLeaderboard(client *redis.Client, name string) *Leaderboard { return Leaderboard{ client: client, key: fmt.Sprintf(leaderboard:%s, name), } } func (l *Leaderboard) UpdateScore(ctx context.Context, member string, score float64) error { return l.client.ZAdd(ctx, l.key, redis.Z{ Score: score, Member: member, }).Err() } func (l *Leaderboard) IncrementScore(ctx context.Context, member string, increment float64) (float64, error) { return l.client.ZIncrBy(ctx, l.key, increment, member).Result() } func (l *Leaderboard) GetRank(ctx context.Context, member string) (int64, error) { rank, err : l.client.ZRevRank(ctx, l.key, member).Result() if err ! nil { return -1, err } return rank 1, nil } func (l *Leaderboard) GetScore(ctx context.Context, member string) (float64, error) { return l.client.ZScore(ctx, l.key, member).Result() } func (l *Leaderboard) GetTopN(ctx context.Context, n int64) ([]redis.Z, error) { return l.client.ZRevRangeWithScores(ctx, l.key, 0, n-1).Result() } func (l *Leaderboard) GetRange(ctx context.Context, start, stop int64) ([]redis.Z, error) { return l.client.ZRevRangeWithScores(ctx, l.key, start, stop).Result() } func (l *Leaderboard) GetRankedMembers(ctx context.Context, start, stop int64) ([]string, error) { return l.client.ZRevRange(ctx, l.key, start, stop).Result() } func (l *Leaderboard) RemoveMember(ctx context.Context, member string) error { return l.client.ZRem(ctx, l.key, member).Err() } func (l *Leaderboard) GetCount(ctx context.Context) (int64, error) { return l.client.ZCard(ctx, l.key).Result() } func (l *Leaderboard) GetMembersByScore(ctx context.Context, min, max float64) ([]redis.Z, error) { return l.client.ZRevRangeByScoreWithScores(ctx, l.key, redis.ZRangeBy{ Min: fmt.Sprintf(%f, min), Max: fmt.Sprintf(%f, max), }).Result() }六、Geospatial地理位置package redis import ( context fmt github.com/redis/go-redis/v9 ) type GeoOperations struct { client *redis.Client } func NewGeoOperations(client *redis.Client) *GeoOperations { return GeoOperations{client: client} } func (g *GeoOperations) AddLocation(ctx context.Context, key string, longitude, latitude float64, member string) error { return g.client.GeoAdd(ctx, key, redis.GeoLocation{ Name: member, Longitude: longitude, Latitude: latitude, }).Err() } func (g *GeoOperations) GetPosition(ctx context.Context, key string, member string) (*redis.GeoPos, error) { pos, err : g.client.GeoPos(ctx, key, member).Result() if err ! nil { return nil, err } if len(pos) 0 || pos[0] nil { return nil, fmt.Errorf(member not found) } return pos[0], nil } func (g *GeoOperations) GetDistance(ctx context.Context, key, member1, member2, unit string) (float64, error) { return g.client.GeoDist(ctx, key, member1, member2, unit).Result() } func (g *GeoOperations) SearchNearby(ctx context.Context, key string, longitude, latitude, radius float64, unit string, count int) ([]redis.GeoLocation, error) { return g.client.GeoSearchLocation(ctx, key, redis.GeoSearchLocationQuery{ GeoSearchQuery: redis.GeoSearchQuery{ Longitude: longitude, Latitude: latitude, Radius: radius, Unit: unit, WithCoord: true, WithDist: true, Count: count, Sort: ASC, }, }).Result() }七、总结Redis的丰富数据结构为开发者提供了强大的工具集String适用于简单的KV缓存、计数器、分布式锁Hash适用于对象存储适合字段级别的更新List适用于消息队列、任务队列、栈Set适用于标签系统、去重场景、交集运算ZSet适用于排行榜、有序事件、延迟队列Geospatial适用于附近的人、地理位置服务合理选择数据结构能够显著提升应用性能和代码可读性。