缓存穿透bitmap
❶ SpringBoot+Redis BitMap 实现签到与统计功能
欢迎来到技术小站,我们将持续分享优质的技术文章。
在项目开发中,签到与统计功能的应用非常普遍。通过签到,我们能够为用户提供奖励,以此激励用户保持活跃,深入使用平台。
实现签到功能,Redis中的BitMap是一个高效的选择。相比于MySQL,BitMap能够以极小的空间实现相同的功能。
BitMap的基本语法与指令如下:
Redis提供了一系列的BitMap操作指令,用于实现签到功能。
为了实现签到功能,我们将年和月作为BitMap的键,并将其存储在BitMap中。每次签到,就将对应的位从0变为1,表示用户已签到。
接下来,我们将通过SpringBoot整合Redis,实现签到功能。
为了实现签到功能,我们需要构建一个接口,用于将当前用户当天的签到信息保存至Redis中。
此外,我们还需要实现签到统计功能,包括连续签到天数、本月内的所有签到数据以及连续签到的次数等。
我们利用BitMap的特点,通过查询BitMap的值,实现对连续签到天数的统计。
同时,我们还需要关注缓存穿透问题的解决方案,通过将数据库的数据对应的id写入到一个list集合中,可以有效防止缓存穿透。
最后,通过整合SpringBoot与Redis,我们能够实现高效的签到与统计功能,提高用户体验,促进用户活跃。
技术改变世界,希望这些知识能为你的项目带来价值。
❷ 详解布隆过滤器的原理和实现
为什么需要布隆过滤器想象一下遇到下面的场景你会如何处理:
手机号是否重复注册
用户是否参与过某秒杀活动
伪造请求大量 id 查询不存在的记录,此时缓存未命中,如何避免缓存穿透
针对以上问题常规做法是:查询数据库,数据库硬扛,如果压力并不大可以使用此方法,保持简单即可。
改进做法:用 list/set/tree 维护一个元素集合,判断元素是否在集合内,时间复杂度或空间复杂度会比较高。如果是微服务的话可以用 redis 中的 list/set 数据结构, 数据规模非常大此方案的内存容量要求可能会非常高。
这些场景有个共同点,可以将问题抽象为:如何高效判断一个元素不在集合中? 那么有没有一种更好方案能达到时间复杂度和空间复杂双优呢?
有!布隆过滤器。
什么是布隆过滤器布隆过滤器(英语:Bloom Filter)是 1970 年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中,它的优点是空间效率和查询时间都远远超过一般的算法。
工作原理
布隆过滤器的原理是,当一个元素被加入集合时,通过 K 个散列函数将这个元素映射成一个位数组中的 K 个点(offset),把它们置为 1。检索时,我们只要看看这些点是不是都是 1 就(大约)知道集合中有没有它了:如果这些点有任何一个 0,则被检元素一定不在;如果都是 1,则被检元素很可能在。这就是布隆过滤器的基本思想。
简单来说就是准备一个长度为 m 的位数组并初始化所有元素为 0,用 k 个散列函数对元素进行 k 次散列运算跟 len(m)取余得到 k 个位置并将 m 中对应位置设置为 1。
布隆过滤器优缺点优点:
空间占用极小,因为本身不存储数据而是用比特位表示数据是否存在,某种程度有保密的效果。
插入与查询时间复杂度均为 O(k),常数级别,k 表示散列函数执行次数。
散列函数之间可以相互独立,可以在硬件指令层加速计算。
缺点:
误差(假阳性率)。
无法删除。
误差(假阳性率)
布隆过滤器可以 100% 判断元素不在集合中,但是当元素在集合中时可能存在误判,因为当元素非常多时散列函数产生的 k 位点可能会重复。 维基网络有关于假阳性率的数学推导(见文末链接)这里我们直接给结论(实际上是我没看懂...),假设:
位数组长度 m
散列函数个数 k
预期元素数量 n
期望误差ε
在创建布隆过滤器时我们为了找到合适的 m 和 k ,可以根据预期元素数量 n 与 ε 来推导出最合适的 m 与 k 。
java 中 Guava, Redisson 实现布隆过滤器估算最优 m 和 k 采用的就是此算法:
//计算哈希次数@(longn,longm){//(m/n)*log(2),butavoidtruncationetodivision!returnMath.max(1,(int)Math.round((double)m/n*Math.log(2)));}//计算位数组长度@(longn,doublep){if(p==0){p=Double.MIN_VALUE;}return(long)(-n*Math.log(p)/(Math.log(2)*Math.log(2)));}无法删除
位数组中的某些 k 点是多个元素重复使用的,假如我们将其中一个元素的 k 点全部置为 0 则直接就会影响其他元素。 这导致我们在使用布隆过滤器时无法处理元素被删除的场景。
可以通过定时重建的方式清除脏数据。假如是通过 redis 来实现的话重建时不要直接删除原有的 key,而是先生成好新的再通过 rename 命令即可,再删除旧数据即可。
go-zero 中的 bloom filter 源码分析core/bloom/bloom.go 一个布隆过滤器具备两个核心属性:
位数组:
散列函数
go-zero实现的bloom filter中位数组采用的是Redis.bitmap,既然采用的是 redis 自然就支持分布式场景,散列函数采用的是MurmurHash3
Redis.bitmap 为什么可以作为位数组呢?
Redis 中的并没有单独的 bitmap 数据结构,底层使用的是动态字符串(SDS)实现,而 Redis 中的字符串实际都是以二进制存储的。 a 的ASCII码是 97,转换为二进制是:01100001,如果我们要将其转换为b只需要进一位即可:01100010。下面通过Redis.setbit实现这个操作:
set foo a OK get foo "a" setbit foo 6 1 0 setbit foo 7 0 1 get foo "b"
bitmap 底层使用的动态字符串可以实现动态扩容,当 offset 到高位时其他位置 bitmap 将会自动补 0,最大支持 2^32-1 长度的位数组(占用内存 512M),需要注意的是分配大内存会阻塞Redis进程。 根据上面的算法原理可以知道实现布隆过滤器主要做三件事情:
k 次散列函数计算出 k 个位点。
插入时将位数组中 k 个位点的值设置为 1。
查询时根据 1 的计算结果判断 k 位点是否全部为 1,否则表示该元素一定不存在。
下面来看看go-zero 是如何实现的:
对象定义
//表示经过多少散列函数计算//固定14次maps=14type(//定义布隆过滤器结构体Filterstruct{bitsuintbitSetbitSetProvider}//位数组操作接口定义bitSetProviderinterface{check([]uint)(bool,error)set([]uint)error})位数组操作接口实现
首先需要理解两段 lua 脚本:
//ARGV:偏移量offset数组//KYES[1]:setbit操作的key//全部设置为1setScript=`for_,offsetinipairs(ARGV)doredis.call("setbit",KEYS[1],offset,1)end`//ARGV:偏移量offset数组//KYES[1]:setbit操作的key//检查是否全部为1testScript=`for_,offsetinipairs(ARGV)doiftonumber(redis.call("getbit",KEYS[1],offset))==`为什么一定要用 lua 脚本呢? 因为需要保证整个操作是原子性执行的。
//redis位数组typeredisBitSetstruct{store*redis.Clientkeystringbitsuint}//检查偏移量offset数组是否全部为1//是:元素可能存在//否:元素一定不存在func(r*redisBitSet)check(offsets[]uint)(bool,error){args,err:=r.buildOffsetArgs(offsets)iferr!=nil{returnfalse,err}//执行脚本resp,err:=r.store.Eval(testScript,[]string{r.key},args)//这里需要注意一下,底层使用的go-redis//redis.Nil表示key不存在的情况需特殊判断iferr==redis.Nil{returnfalse,nil}elseiferr!=nil{returnfalse,err}exists,ok:=resp.(int64)if!ok{returnfalse,nil}returnexists==1,nil}//将k位点全部设置为1func(r*redisBitSet)set(offsets[]uint)error{args,err:=r.buildOffsetArgs(offsets)iferr!=nil{returnerr}_,err=r.store.Eval(setScript,[]string{r.key},args)//底层使用的是go-redis,redis.Nil表示操作的key不存在//需要针对key不存在的情况特殊判断iferr==redis.Nil{returnnil}elseiferr!=nil{returnerr}returnnil}//构建偏移量offset字符串数组,因为go-redis执行lua脚本时参数定义为[]stringy//因此需要转换一下func(r*redisBitSet)buildOffsetArgs(offsets[]uint)([]string,error){varargs[]stringfor_,offset:=rangeoffsets{ifoffset>=r.bits{returnnil,ErrTooLargeOffset}args=append(args,strconv.FormatUint(uint64(offset),10))}returnargs,nil}//删除func(r*redisBitSet)del()error{_,err:=r.store.Del(r.key)returnerr}//自动过期func(r*redisBitSet)expire(secondsint)error{returnr.store.Expire(r.key,seconds)}funcnewRedisBitSet(store*redis.Client,keystring,bitsuint)*redisBitSet{return&redisBitSet{store:store,key:key,bits:bits,}}到这里位数组操作就全部实现了,接下来看下如何通过 k 个散列函数计算出 k 个位点
k 次散列计算出 k 个位点
//k次散列计算出k个offsetfunc(f*Filter)getLocations(data[]byte)[]uint{//创建指定容量的切片locations:=make([]uint,maps)//maps表示k值,作者定义为了常量:14fori:=uint(0);i<maps;i++{//哈希计算,使用的是"MurmurHash3"算法,并每次追加一个固定的i字节进行计算hashValue:=hash.Hash(append(data,byte(i)))//取下标offsetlocations[i]=uint(hashValue%uint64(f.bits))}returnlocations}插入与查询
添加与查询实现就非常简单了,组合一下上面的函数就行。
//添加元素func(f*Filter)Add(data[]byte)error{locations:=f.getLocations(data)returnf.bitSet.set(locations)}//检查是否存在func(f*Filter)Exists(data[]byte)(bool,error){locations:=f.getLocations(data)isSet,err:=f.bitSet.check(locations)iferr!=nil{returnfalse,err}if!isSet{returnfalse,nil}returntrue,nil}改进建议整体实现非常简洁高效,那么有没有改进的空间呢?
个人认为还是有的,上面提到过自动计算最优 m 与 k 的数学公式,如果创建参数改为:
预期总数量expectedInsertions
期望误差falseProbability
就更好了,虽然作者注释里特别提到了误差说明,但是实际上作为很多开发者对位数组长度并不敏感,无法直观知道 bits 传多少预期误差会是多少。
//NewcreateaFilter,storeisthebackedredis,keyisthekeyforthebloomfilter,//bitsishowmanybitswillbeused,.//bestpractices://elements-meanshowmanyactualelements//whenmaps=14,formula:0.7*(bits/maps),bits=20*elements,theerrorrateis0.000067<1e-4//fordetailederrorratetable,seehttp://pages.cs.wisc.e/~cao/papers/summary-cache/node8.htmlfuncNew(store*redis.Redis,keystring,bitsuint)*Filter{return&Filter{bits:bits,bitSet:newRedisBitSet(store,key,bits),}}//expectedInsertions-预期总数量//falseProbability-预期误差//这里也可以改为option模式不会破坏原有的兼容性funcNewFilter(store*redis.Redis,keystring,expectedInsertionsuint,falseProbabilityfloat64)*Filter{bits:=optimalNumOfBits(expectedInsertions,falseProbability)k:=optimalNumOfHashFunctions(bits,expectedInsertions)return&Filter{bits:bits,bitSet:newRedisBitSet(store,key,bits),k:k,}}//计算最优哈希次数funcoptimalNumOfHashFunctions(m,nuint)uint{returnuint(math.Round(float64(m)/float64(n)*math.Log(2)))}//计算最优数组长度funcoptimalNumOfBits(nuint,pfloat64)uint{returnuint(float64(-n)*math.Log(p)/(math.Log(2)*math.Log(2)))}回到问题如何预防非法 id 导致缓存穿透?
由于 id 不存在导致请求无法命中缓存流量直接打到数据库,同时数据库也不存在该记录导致无法写入缓存,高并发场景这无疑会极大增加数据库压力。 解决方案有两种:
采用布隆过滤器
数据写入数据库时需同步写入布隆过滤器,同时如果存在脏数据场景(比如:删除)则需要定时重建布隆过滤器,使用 redis 作为存储时不可以直接删除 bloom.key,可以采用 rename key 的方式更新 bloom
缓存与数据库同时无法命中时向缓存写入一个过期时间较短的空值。
资料布隆过滤器(Bloom Filter)原理及 Guava 中的具体实现
布隆过滤器-维基网络
Redis.setbit
项目地址https://github.com/zeromicro/go-zero
欢迎使用 go-zero 并 star 支持我们!
微信交流群关注‘微服务实践’公众号并点击 交流群 获取社区群二维码。
❸ 缓存击穿互斥锁 设置锁的失效时间
设置锁的失效时间是自己设置的,它的过期时间会很短,最长不超过五分钟
缓存穿透是指查询一个一定不存在的数据
由于缓存是不命中时被动写的,
并且出于容错考虑,如果从存储层查不到数据则不写入缓存,
这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。
在流量大时,可能DB就挂掉了,
要是有人利用不存在的key频繁攻击我们的应用,这就是漏洞。
最常见的则是采用布隆过滤器
将所有可能存在的数据哈希到一个足够大的bitmap中,
一个一定不存在的数据会被 这个bitmap拦截掉,
从而避免了对底层存储系统的查询压力。
另外也有一个更为简单粗暴的方法
如果一个查询返回的数据为空(不管是数 据不存在,还是系统故障),
我们仍然把这个空结果进行缓存,但它的过期时间会很短,最长不超过五分钟