亚洲视频二区_亚洲欧洲日本天天堂在线观看_日韩一区二区在线观看_中文字幕不卡一区

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.430618.com 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

多級(jí)緩存在微服務(wù)的架構(gòu)設(shè)計(jì)中可謂隨處可見(jiàn),多級(jí)緩存作為提升系統(tǒng)高并發(fā)的常規(guī)手段,在各類大中小型的系統(tǒng)設(shè)計(jì)中都有體現(xiàn);

下圖是一張簡(jiǎn)單的服務(wù)端多級(jí)緩存設(shè)計(jì)示意圖,多級(jí)緩存的常用解決方案,像ehcache + redis,或caffeine + springcache等,即利用JVM內(nèi)存緩存 + redis緩存配合;


 

一、緩存一致性問(wèn)題

多級(jí)緩存帶來(lái)的好處是顯著的,一定程度上可以應(yīng)對(duì)較高的并發(fā),但隨之帶來(lái)了一個(gè)比較大的問(wèn)題就是緩存一致性問(wèn)題;

我們知道,JVM緩存屬于進(jìn)程級(jí)的緩存,和當(dāng)前服務(wù)實(shí)例是綁定的,而redis緩存可以作為分布式緩存,通常JVM緩存的是那些生命周期較短的熱點(diǎn)查詢數(shù)據(jù),即過(guò)期時(shí)間不會(huì)太久,而redis緩存相對(duì)來(lái)說(shuō),過(guò)期時(shí)間相對(duì)長(zhǎng)一點(diǎn),JVM緩存通常作為服務(wù)端扛壓的第一道屏障,如果設(shè)置的過(guò)期時(shí)間太長(zhǎng),將會(huì)對(duì)JVM內(nèi)存的開(kāi)銷非常大,所以一般作為短頻使用;

設(shè)想這么一個(gè)場(chǎng)景,服務(wù)A采用多實(shí)例部署,這里假設(shè)部署了兩個(gè)節(jié)點(diǎn),首次根據(jù)ID查詢一個(gè)用戶信息的對(duì)象數(shù)據(jù)將會(huì)同時(shí)被JVM緩存,同時(shí)也會(huì)被redis緩存,下一次過(guò)來(lái)同樣參數(shù)的請(qǐng)求時(shí),首先走JVM緩存,查到了直接返回,否則走redis緩存;

上面是一個(gè)正常的關(guān)于緩存存取的過(guò)程,問(wèn)題是,JVM緩存是同進(jìn)程綁定的,如果第一個(gè)節(jié)點(diǎn)的數(shù)據(jù)發(fā)生了變更,比如刪除了,對(duì)于redis緩存來(lái)說(shuō),可以做到動(dòng)態(tài)刷緩存的效果,但是redis緩存和本地緩存之間并沒(méi)有一種強(qiáng)同步的機(jī)制確保兩者的緩存保持一致;

甚至來(lái)說(shuō),第一個(gè)節(jié)點(diǎn)與第二個(gè)節(jié)點(diǎn)之間,兩者是無(wú)狀態(tài)的,當(dāng)?shù)谝粋€(gè)節(jié)點(diǎn)上面的數(shù)據(jù)被刪除時(shí),假如此刻并發(fā)的查詢請(qǐng)求到達(dá)第二個(gè)節(jié)點(diǎn),JVM緩存查詢到必然是上一次緩存的數(shù)據(jù);

于是,我們的問(wèn)題就是,在多級(jí)緩存模式下,如何解決緩存一致性的問(wèn)題呢?

二、一個(gè)簡(jiǎn)單的案例

基于之前的一篇springcache 詳細(xì)使用和spring boot 二級(jí)緩存案例基礎(chǔ)上我們進(jìn)行案例演示和改造;

在案例中,我們提供了幾個(gè)核心的接口:

 

  • 根據(jù)用戶ID查詢用戶,并緩存到redis;
  • 根據(jù)用戶ID查詢用戶,并緩存到JVM,這里采用caffeine;
  • 根據(jù)用戶ID刪除用戶;
1、緩存核心配置類import com.fasterxml.jackson.annotation.JsonAutoDetect;import com.fasterxml.jackson.annotation.JsonInclude;import com.fasterxml.jackson.annotation.JsonTypeInfo;import com.fasterxml.jackson.annotation.PropertyAccessor;import com.fasterxml.jackson.databind.MApperFeature;import com.fasterxml.jackson.databind.ObjectMapper;import com.fasterxml.jackson.databind.SerializationFeature;import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;import com.fasterxml.jackson.datatype.jsr310.JAVATimeModule;import com.github.benmanes.caffeine.cache.Caffeine;import org.springframework.cache.CacheManager;import org.springframework.cache.annotation.CachingConfigurerSupport;import org.springframework.cache.caffeine.CaffeineCacheManager;import org.springframework.cache.interceptor.KeyGenerator;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Primary;import org.springframework.data.redis.cache.RedisCacheConfiguration;import org.springframework.data.redis.cache.RedisCacheManager;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;import org.springframework.data.redis.serializer.RedisSerializationContext;import org.springframework.data.redis.serializer.StringRedisSerializer;import org.springframework.util.StringUtils;import java.lang.reflect.Method;import java.time.Duration;import java.util.concurrent.TimeUnit;@Configurationpublic class RedisConfig extends CachingConfigurerSupport {@Beanpublic RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) {RedisTemplate template = new RedisTemplate<>();template.setConnectionFactory(connectionFactory);//使用Jackson2JsonRedisSerializer來(lái)序列化和反序列化redis的value值Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper mapper = new ObjectMapper();mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(mapper);template.setValueSerializer(jackson2JsonRedisSerializer);//使用StringRedisSerializer來(lái)序列化和反序列化redis的key值template.setKeySerializer(new StringRedisSerializer());template.afterPropertiesSet();return template;* 分鐘級(jí)別* @param connectionFactory* @return@Bean("cacheManagerMinutes")public RedisCacheManager cacheManagerMinutes(RedisConnectionFactory connectionFactory){RedisCacheConfiguration configuration = instanceConfig(3 * 60L);return RedisCacheManager.builder(connectionFactory).cacheDefaults(configuration).transactionAware().build();* 小時(shí)級(jí)別* @param connectionFactory* @return@Bean("cacheManagerHour")@Primarypublic RedisCacheManager cacheManagerHour(RedisConnectionFactory connectionFactory){RedisCacheConfiguration configuration = instanceConfig(3600L);return RedisCacheManager.builder(connectionFactory).cacheDefaults(configuration).transactionAware().build();* 天級(jí)別* @param connectionFactory* @return@Bean("cacheManagerDay")public RedisCacheManager cacheManagerDay(RedisConnectionFactory connectionFactory){RedisCacheConfiguration configuration = instanceConfig(3600 * 24L);;return RedisCacheManager.builder(connectionFactory).cacheDefaults(configuration).transactionAware().build();* 正常時(shí)間的本地緩存@Bean("caffeineCacheManager")public CacheManager caffeineCacheManager() {CaffeineCacheManager cacheManager = new CaffeineCacheManager();cacheManager.setCaffeine(Caffeine.newBuilder().expireAfterWrite(50, TimeUnit.SECONDS).initialCapacity(256).maximumSize(10000));return cacheManager;private RedisCacheConfiguration instanceConfig(long ttl){Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper = new ObjectMapper();objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);objectMapper.registerModule(new JavaTimeModule());objectMapper.configure(MapperFeature.USE_ANNOTATIONS,false);//只針對(duì)非空的值進(jìn)行序列化objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);//將類型序列化到屬性的json字符串objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,ObjectMapper.DefaultTyping.NON_FINAL,JsonTypeInfo.As.PROPERTY);jackson2JsonRedisSerializer.setObjectMapper(objectMapper);return RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofSeconds(ttl)).disableCachingNullValues().serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer));* 自定義key生成策略* @return@Bean("defaultSpringKeyGenerator")public KeyGenerator defaultSpringKeyGenerator(){return new KeyGenerator() {@Overridepublic Object generate(Object o, Method method, Object... objects) {String key = o.getClass().getSimpleName() + "_"+ method.getName() +"_"+ StringUtils.arrayToDelimitedString(objects,"_");System.out.println("key :" + key);return key;}2、配置文件開(kāi)啟使用 springcachespring:redis:host: localhostport: 6379cache:type: redis3、幾個(gè)核心接口1)根據(jù)用戶ID獲取用戶@GetMapping("/getById")public DbUser getById(String id){return dbUserService.getById(id);@Override@Cacheable(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")public DbUser getById(String id) {System.out.println("首次查詢走數(shù)據(jù)庫(kù)");DbUser dbUser = dbUserMapper.getByUserId(id);return dbUser;2)根據(jù)用戶ID查詢用戶,并緩存到JVM;@GetMapping("/getByIdFromCaffeine")public DbUser getByIdFromCaffeine(String id){return dbUserService.getByIdFromCaffeine(id);@Override@Cacheable(value = {"dbUser"},key = "#root.args[0]",cacheManager = "caffeineCacheManager")public DbUser getByIdFromCaffeine(String id) {System.out.println("查詢數(shù)據(jù)庫(kù)");DbUser dbUser = dbUserMapper.getByUserId(id);System.out.println("第一次走緩存");return dbUser;3)根據(jù)用戶ID刪除用戶;@GetMapping("/deleteById")public String deleteById(String id){return dbUserService.deleteById(id);@Override@CacheEvict(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")public String deleteById(String id) {dbUserMapper.deleteByUserId(id);return "delete success";4、功能測(cè)試

 

首先在數(shù)據(jù)庫(kù)的 db_user 表準(zhǔn)備一條測(cè)試數(shù)據(jù)

分別調(diào)用查詢用戶接口

1、 http://localhost:8083/getById?id=1 ;

多次刷新接口,sql只輸出了一次


 

2、 http://localhost:8083/getByIdFromCaffeine?id=1


 

多次刷新接口,sql只輸出了一次


 

從上面的結(jié)果可以看到,我們模擬了查詢數(shù)據(jù)分別緩存到了JVM內(nèi)存和redis的效果,接下來(lái),刪除當(dāng)前這條數(shù)據(jù),執(zhí)行下面的接口

3、 http://localhost:8083/deleteById?id=1


 


 

再次調(diào)用第一個(gè)查詢用戶的接口,無(wú)返回?cái)?shù)據(jù),表明redis中緩存的結(jié)果被清理了,這是我們使用了springcache后,通過(guò) CacheEvict 這個(gè)注解,會(huì)自動(dòng)幫我們管理redis中的緩存;

但這時(shí),再次調(diào)用查詢JVM緩存的接口,發(fā)現(xiàn)仍然可以從本地緩存中得到數(shù)據(jù)

4、 http://localhost:8083/getByIdFromCaffeine?id=1


 

基于上面的測(cè)試結(jié)果,可以看到,緩存一致性的問(wèn)題就產(chǎn)生了,這里我故意將本地緩存的時(shí)間調(diào)整的長(zhǎng)了一點(diǎn),實(shí)際開(kāi)發(fā)過(guò)程中,建議本地緩存的時(shí)間一般不要超過(guò)1分鐘;

三、解決方案一:清理redis緩存,同步清理本地緩存1、增加一個(gè)本地緩存操作的工具類import com.congge.config.SpringContextHolder;import org.springframework.cache.Cache;import org.springframework.cache.CacheManager;import java.util.Objects;public class CaffeineCacheUtils {private static CacheManager cm;static {cm = SpringContextHolder.getBean("caffeineCacheManager");* 添加緩存* @param cacheName 緩存名稱* @param key 緩存key* @param value 緩存值public static void put(String cacheName, String key, Object value) {Cache cache = cm.getCache(cacheName);cache.put(key, value);* 獲取緩存* @param cacheName 緩存名稱* @param key 緩存key* @returnpublic static Object get(String cacheName, String key) {Cache cache = cm.getCache(cacheName);if (cache == null) {return null;return Objects.requireNonNull(cache.get(key)).get();* 獲取緩存(字符串)* @param cacheName 緩存名稱* @param key 緩存key* @returnpublic static String getString(String cacheName, String key) {Cache cache = cm.getCache(cacheName);if (cache == null) {return null;Cache.ValueWrapper wrapper = cache.get(key);if (wrapper == null) {return null;return Objects.requireNonNull(wrapper.get()).toString();* 獲取緩存(泛型)* @param cacheName 緩存名稱* @param key 緩存key* @param clazz 緩存類* @param 返回值泛型* @returnpublic static T get(String cacheName, String key, Class clazz) {Cache cache = cm.getCache(cacheName);if (cache == null) {return null;Cache.ValueWrapper wrapper = cache.get(key);if (wrapper == null) {return null;return (T) wrapper.get();* 清理緩存* @param cacheName 緩存名稱* @param key 緩存keypublic static void evict(String cacheName, String key) {Cache cache = cm.getCache(cacheName);System.out.println(cache.getName());if (cache != null) {cache.evict(key);2、刪除用戶接口中同步清理本地緩存

只需改造下刪除接口的服務(wù)實(shí)現(xiàn)方法即可

private CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils();* 刪除,同時(shí)需要?jiǎng)h除相關(guān)的key* @param id* @return@Override@CacheEvict(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")public String deleteById(String id) {dbUserMapper.deleteByUserId(id);caffeineCacheUtils.evict("dbUser",id);return "delete success";3、方案優(yōu)缺點(diǎn)優(yōu)點(diǎn)

  • 操作簡(jiǎn)便;
  • 只要參數(shù)傳入正確,就可以確保緩存一致性;
  • 適合單機(jī)模式下使用
缺點(diǎn)
  • 代碼產(chǎn)生了一定的耦合性;
  • 不適合分布式環(huán)境使用;
  • 需要手動(dòng)管理key的相關(guān)參數(shù);
四、解決方案二:使用zookeeper 實(shí)現(xiàn)緩存同步

 

對(duì)zookeeper有所了解和使用的同學(xué),應(yīng)該對(duì)zk的節(jié)點(diǎn)管理不陌生,zk作為一款分布式協(xié)調(diào)中間件,在很多分布式場(chǎng)景都有著廣泛的使用,比如實(shí)現(xiàn)集群選舉,分布式鎖,節(jié)點(diǎn)管理等等,利用zk的節(jié)點(diǎn)屬性,可以很好的解決這個(gè)問(wèn)題;

使用zk的解決思路

  • 查詢用戶接口中,注冊(cè)一個(gè)節(jié)點(diǎn),節(jié)點(diǎn)命名最好和緩存的key保持一致;
  • 刪除接口中,手動(dòng)觸zk的節(jié)點(diǎn)刪除;
  • zk監(jiān)聽(tīng)到刪除節(jié)點(diǎn)的事件變化時(shí),同步清理本地緩存;
1、zk客戶端依賴com.101teczkclient0.10org.slf4jslf4j-log4j122、提供一個(gè)zk節(jié)點(diǎn)操作工具類import org.I0Itec.zkclient.IZkDataListener;import org.I0Itec.zkclient.ZkClient;import org.I0Itec.zkclient.serialize.SerializableSerializer;import org.Apache.zookeeper.CreateMode;public class ZkUtils {private ZkClient zkClient = null;private String node;/*public static void main(String[] args) {ZkUtils zkUtils = new ZkUtils();zkUtils.createNode(node);zkUtils.nodeExist(node);zkUtils.deleteNode(node);public ZkUtils(String node) {zkClient = new ZkClient("localhost:2181", 60000 * 30, 60000, new SerializableSerializer());//監(jiān)聽(tīng)節(jié)點(diǎn)變化//需要通過(guò)java修改zookeeper數(shù)據(jù),才能監(jiān)聽(tīng)到zkClient.subscribeDataChanges("/" + node, new IZkDataListener() {//節(jié)點(diǎn)數(shù)據(jù)變化時(shí)觸發(fā)@Overridepublic void handleDataChange(String s, Object o) throws Exception {System.out.println("change Node: " + s);System.out.println("change data: " + o);//節(jié)點(diǎn)數(shù)據(jù)刪除時(shí)觸發(fā)@Overridepublic void handleDataDeleted(String s) throws Exception {System.out.println("delete Node: " + s);* 創(chuàng)建zk節(jié)點(diǎn)* @param nodepublic void createNode(String node) {//創(chuàng)建持久節(jié)點(diǎn)String node1 = zkClient.create("/" + node, node, CreateMode.PERSISTENT);System.out.println(node1);* 修改zk節(jié)點(diǎn)數(shù)據(jù)* @param node* @param datapublic void writeNodeData(String node, String data) {zkClient.writeData("/" + node, 233);* 查詢zk節(jié)點(diǎn)* @param nodepublic boolean nodeExist(String node) {boolean exists = zkClient.exists("/" + node);return exists;* 查詢節(jié)點(diǎn)數(shù)據(jù)* @param node* @returnpublic String findNodeData(String node) {Object data = zkClient.readData("/" + node);System.out.println(data);return data.toString();* 刪除節(jié)點(diǎn)* @param nodepublic void deleteNode(String node) {boolean b2 = zkClient.deleteRecursive("/" + node);System.out.println(b2);3、查詢用戶接口,注冊(cè)緩存的key對(duì)應(yīng)的z-node節(jié)點(diǎn)@Override@Cacheable(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")public DbUser getById(String id) {System.out.println("首次查詢走數(shù)據(jù)庫(kù)");DbUser dbUser = dbUserMapper.getByUserId(id);//FIXME 將緩存注冊(cè)到節(jié)點(diǎn)registerCacheNode(id);return dbUser;public void registerCacheNode(String id){String node = "user:" + id;ZkUtils zkUtils = new ZkUtils(node);zkUtils.createNode(node);4、刪除用戶接口添加刪除zk節(jié)點(diǎn)邏輯@Override@CacheEvict(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")public String deleteById(String id) {dbUserMapper.deleteByUserId(id);//刪除 z-node 節(jié)點(diǎn)String node = "user:" + id;ZkUtils zkUtils = new ZkUtils(node);zkUtils.deleteNode(node);return "delete success";5、改造zk監(jiān)聽(tīng)邏輯,同步移除本地緩存private CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils();public ZkUtils(String node) {zkClient = new ZkClient("localhost:2181", 60000 * 30, 60000, new SerializableSerializer());//監(jiān)聽(tīng)節(jié)點(diǎn)變化//需要通過(guò)java修改zookeeper數(shù)據(jù),才能監(jiān)聽(tīng)到zkClient.subscribeDataChanges("/" + node, new IZkDataListener() {//節(jié)點(diǎn)數(shù)據(jù)變化時(shí)觸發(fā)@Overridepublic void handleDataChange(String s, Object o) throws Exception {System.out.println("change Node: " + s);System.out.println("change data: " + o);//節(jié)點(diǎn)數(shù)據(jù)刪除時(shí)觸發(fā)@Overridepublic void handleDataDeleted(String s) throws Exception {System.out.println("delete Node: " + s);caffeineCacheUtils.evict("dbUser","1");6、測(cè)試1、啟動(dòng)服務(wù)后,按照上面的測(cè)試步驟,分別調(diào)用2個(gè)查詢接口

 

通過(guò)控制臺(tái)輸出結(jié)果,可以看到節(jié)點(diǎn)數(shù)據(jù)注冊(cè)到zk中


 


 

2、調(diào)用刪除接口

此時(shí)zk的監(jiān)聽(tīng)邏輯中監(jiān)聽(tīng)到了節(jié)點(diǎn)數(shù)據(jù)變更的事件,在變更的邏輯中,我們將同步刪除本地緩存的數(shù)據(jù);


 

再次調(diào)用時(shí)發(fā)現(xiàn)緩存已經(jīng)被清理


 

通過(guò)上面的操作演示,實(shí)現(xiàn)了基于zk的節(jié)點(diǎn)注冊(cè)與事件監(jiān)聽(tīng)機(jī)制實(shí)現(xiàn)緩存一致性的問(wèn)題處理;

五、解決方案三:使用redis事件訂閱與發(fā)布機(jī)制實(shí)現(xiàn)緩存同步

Redis 發(fā)布訂閱 (pub/sub) 是一種消息通信模式:發(fā)送者 (pub) 發(fā)送消息,訂閱者 (sub) 接收消息。

這種模式很像消息隊(duì)列的實(shí)現(xiàn)機(jī)制,服務(wù)端發(fā)布消息到topic,客戶端監(jiān)聽(tīng)topic的消息,并做自身的業(yè)務(wù)處理;

只不過(guò)在redis這里,不叫topic,而是channel,下面來(lái)看一個(gè)簡(jiǎn)單的redis實(shí)現(xiàn)的發(fā)布訂閱使用

1、導(dǎo)入依賴org.springframework.bootspring-boot-starter-data-redis2、自定義 RedisMessageListener

該類的功能和消息中間件中的監(jiān)聽(tīng)邏輯很相似

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.connection.Message;import org.springframework.data.redis.connection.MessageListener;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Component;@Componentpublic class RedisMessageListener implements MessageListener {@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {// 獲取消息byte[] messageBody = message.getBody();// 使用值序列化器轉(zhuǎn)換Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);// 獲取監(jiān)聽(tīng)的頻道byte[] channelByte = message.getChannel();// 使用字符串序列化器轉(zhuǎn)換Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);// 渠道名稱轉(zhuǎn)換String patternStr = new String(pattern);System.out.println(patternStr);System.out.println("---頻道---: " + channel);System.out.println("---消息內(nèi)容---: " + msg);3、自定義 RedisSubConfig

該類用于配置特定的channel,即監(jiān)聽(tīng)來(lái)自哪些channel的消息

import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.listener.ChannelTopic;import org.springframework.data.redis.listener.RedisMessageListenerContainer;@Configurationpublic class RedisSubConfig {@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory factory, RedisMessageListener listener) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(factory);//訂閱頻道redis.news 和 redis.life 這個(gè)container 可以添加多個(gè) messageListenercontainer.addMessageListener(listener, new ChannelTopic("redis.life"));container.addMessageListener(listener, new ChannelTopic("redis.news"));return container;4、最后編寫(xiě)一個(gè)接口做測(cè)試@GetMapping("/testPublish")public void testPublish(){dbUserService.testPublish();@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic void testPublish() {redisTemplate.convertAndSend("redis.life", "aaa");redisTemplate.convertAndSend("redis.news", "bbb");

調(diào)用下接口,可以看到控制臺(tái)輸出如下信息


 

通過(guò)上面的演示,快速了解了一下redis的這種發(fā)布訂閱模式的功能使用,下面就來(lái)使用這種方式來(lái)解決緩存一致性問(wèn)題;

5、刪除用戶接口中向redis channel 推送消息@Override@CacheEvict(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")public String deleteById(String id) {dbUserMapper.deleteByUserId(id);redisTemplate.convertAndSend("redis.user", id);return "delete success";6、RedisMessageListener 改造

添加刪除本地緩存邏輯

@Overridepublic void onMessage(Message message, byte[] pattern) {CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils();// 獲取消息byte[] messageBody = message.getBody();// 使用值序列化器轉(zhuǎn)換Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);// 獲取監(jiān)聽(tīng)的頻道byte[] channelByte = message.getChannel();// 使用字符串序列化器轉(zhuǎn)換Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);// 渠道名稱轉(zhuǎn)換String patternStr = new String(pattern);System.out.println(patternStr);System.out.println("---頻道---: " + channel);System.out.println("---消息內(nèi)容---: " + msg);caffeineCacheUtils.evict("dbUser",patternStr);7、模擬測(cè)試

啟動(dòng)服務(wù)后,直接調(diào)用刪除用戶接口,可以看到,監(jiān)聽(tīng)邏輯中收到了一條消息,然后調(diào)用本地緩存工具類刪除本地緩存即可


 


 

六、解決方案四:使用消息隊(duì)列實(shí)現(xiàn)緩存同步

了解了redis發(fā)布訂閱這種方式實(shí)現(xiàn)原理后,如果再更換為消息中間件來(lái)實(shí)現(xiàn)就不難理解了,其實(shí)現(xiàn)的大致思路如下:

 

  1. 調(diào)用刪除接口刪除用戶;
  2. 向特定的隊(duì)列推送一條刪除消息;
  3. 在消息監(jiān)聽(tīng)邏輯中接收消息,并清理本地緩存

 

以rabbitmq為例,其核心實(shí)現(xiàn)如下:

@RabbitHandlerpublic void process(String msg) {System.out.println("topicMessageReceiver 接收到了消息 : " +msg);//執(zhí)行本地緩存的刪除操作

關(guān)于rabbitmq的相關(guān)實(shí)現(xiàn)感興趣的同學(xué)可以參考:rabbbitmq 技術(shù)全解

七、總結(jié)

關(guān)于后3三種的實(shí)現(xiàn),不僅可以解決緩存一致性問(wèn)題,同時(shí)適用于分布式應(yīng)用的場(chǎng)景,算是比較通用的解決方案,但這樣一來(lái),引入了第三方組件,也增加了系統(tǒng)整體的復(fù)雜性,這一點(diǎn)需要在架構(gòu)設(shè)計(jì)中進(jìn)行綜合考量,結(jié)合小編本人的一些實(shí)踐經(jīng)驗(yàn),比較推薦使用redis的發(fā)布訂閱模式,這種方式簡(jiǎn)單高效,同時(shí)兼顧了避免引入更多的外部組件,可酌情參考。

分享到:
標(biāo)簽:springboot
用戶無(wú)頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過(guò)答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定