Redis中处理处理没有ACK确认的Stream
文章来源:凌兰 时间:2025-03-11
Stream是1个只可逃添内乱容的数据规范。也便是道Stream这类数据典型,尔们对于他的加添操纵,只可是背Stream的开端逃添内乱容,没有能正在头部大概中央拔出内乱容。那逃添的是甚么内乱容呢?Stream中逃添的内乱容实在便是1个或者多个key-value pair。那些键值对于不用遵照相反的机关。每次逃添的键值对于皆能够没有共。比方第1次逃添name=hello的键值对于,第两次也能够造成逃添desc=word的键值对于。
对于Redis Stream的作品谦天飞,那里没有再反复,然则那些著作不过通知您怎样收新闻,支新闻,然则不道假使尔们的生意处置进程中呈现了题目,那些不处置的新闻,何如再次处置?正在之前新闻队伍时提过1个逝世疑队伍,Kafka 生产端生产沉试战逝世疑队伍 https://www.javacui.com/tool/686.html ,那末Redis中,怎样处置少少同常新闻呢?要认识底下的代码,起首要毗连以下少许饬令,为领会绝那个题目,尔也是盘查了民圆文档后,写出了底下处置代码。XADD key ID field string [field string ...]将指定的淌条件逃添到指定key的淌中。 即使key没有生存,行动运转那个饬令的反作用,将应用淌的条件主动成立key。1个条件是由1组键值对于构成的,它根基上是1个小的字典。 键值对于以用户给定的依次保存,而且读与淌的饬令(如XRANGE 大概 XREAD) 能够包管依照经由过程XADD加添的挨次前往。XADD是唯独能够背淌加添数据的Redis饬令,然则另有其余饬令, 比方XDEL战XTRIM,他们也许从淌中节略数据。XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]XREADGROUP饬令是XREAD饬令的特别版原,拥护泯灭者组。要是不泯灭者组,仅应用XREAD,全部客户端皆将得到全部抵达淌的条件。差异,假使应用带有XREADGROUP的花费者组,则能够创立没有共的客户端组去花费抵达给定淌的没有共的部门。比方,即使淌得到新的条件A,B战C,而且有二个泯灭者经由过程耗费者组读与淌,个中1个客户端将会获得比方,新闻A战C,别的1个客户端获得新闻B,等等,以此类推。XPENDING key group [start end count] [consumer]经由过程花费者组从淌中获得数据,而没有是确认那些数据,拥有创办待处置条件的成就。XACK饬令会当即从待处置条件列表(PEL)中移除待处置条款,由于一朝新闻被乐成处置,消耗者组便没有再须要追踪它并记取新闻确当前全部者。XRANGE key start end [COUNT count]此饬令前往淌中知足给定ID限度的条款。边界由最小战最年夜ID指定。全部ID正在指定的二个ID之间或者取个中1个ID相当(关开区间)的条件将会被前往。特别ID:- 战 +,特出ID-战+别离示意淌中大概的最小ID战最年夜IDXACK key group ID [ID ...]XACK饬令用于从淌的泯灭者组的待处置条件列表(简称PEL)中节略1条或者多条新闻。一朝耗费者乐成天处置完1条新闻,它应当移用XACK,如许那个新闻便没有会被再次处置, 且举动1个反作用,对于此新闻的PEL条件也会被肃清,从Redis效劳器开释内乱存。XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]该饬令用于办理淌数据构造干系的耗费者组。
那里应用Jedis停止编码尝试,支到新闻后不脚动ACK确认,用于演练处置此类数据,曲交望代码
packagecom.example.springboot;importcom.alibaba.fastjson.JSON;importcom.alibaba.fastjson.JSONObject;importredis.clients.jedis.Jedis;importredis.clients.jedis.StreamEntryID;importredis.clients.jedis.params.XAddParams;importredis.clients.jedis.params.XPendingParams;importredis.clients.jedis.params.XReadGroupParams;importredis.clients.jedis.resps.StreamEntry;importredis.clients.jedis.resps.StreamPendingEntry;importjava.time.LocalDateTime;importjava.util.HashMap;importjava.util.List;importjava.util.Map;importjava.util.stream.IntStream;/***应用jedis实行监闻stream新闻*/publicclassJedisStreamMQTest{privatestaticStringhost="127.0.0.1";privatestaticintport=6379;privatestaticStringpassword="l52u27lv1Jur";//Stream称号privatestaticStringstreamKeyName="Javacui";//组称呼privatestaticStringgroupName="g1";//消磨者称呼privatestaticString[]consumerNames={"c1","c2"};publicstaticvoidmain(String[]args){JedisStreamMQTesttest=newJedisStreamMQTest();//成立群组test.createGroup(streamKeyName,groupName);//写进尝试数据newThread(()->{Jedisjedis=newJedis(host,port);jedis.auth(password);while(true){try{Thread.sleep(1000L);Map<String,String>map=newHashMap<>();map.put("CurrentTime",LocalDateTime.now().toString());map.put("BlogUrl","www.javacui.com");map.put("BlogAuth","java小强");jedis.xadd(streamKeyName,map,XAddParams.xAddParams());}catch(Exceptione){e.printStackTrace();}}}).start();//处置营业数据test.listenerByGroup(streamKeyName,groupName,consumerNames);//处置已ACK的数据test.listenerNoAck(streamKeyName,groupName);}/***应用群组战泯灭者监闻,该监闻能够保证新闻没有会反复花费,原因每一个组每一个用户只会耗费1次新闻*应用群组战消磨者的观点读与,多个线程会反复生产数据*/privatevoidlistenerByGroup(StringkeyName,StringgroupName,String...consumerNames){Map<String,StreamEntryID>entryIDMap=newHashMap<>();entryIDMap.put(keyName,StreamEntryID.UNRECEIVED_ENTRY);//缔造二个线程用于演练IntStream.range(0,2).forEach(i->{newThread(()->{Jedisjedis=newJedis(host,port);jedis.auth(password);while(true){try{//底下的xreadGroup办法同等取redis中的xreadgroup饬令,将block窒塞年光设备为0显示从来壅闭晓得支到新闻,而后下面StreamEntryID配置为接纳最新值List<Map.Entry<String,List<StreamEntry>>>entries=jedis.xreadGroup(groupName,consumerNames[i],XReadGroupParams.xReadGroupParams().block(0),entryIDMap);if(null!=entries&&!entries.isEmpty()){for(Map.Entry<String,List<StreamEntry>>entry:entries){System.out.println("支到数据:"+JSON.toJSONString(entry));}}//那里不ACK,用于演练营业处置进程中形成同常后,由其余线程处置那些数据}catch(Exceptione){e.printStackTrace();}}}).start();});}/***用于处置仍旧读与然则不被ACK的数据*/privatevoidlistenerNoAck(StringkeyName,StringgroupName){newThread(()->{Jedisjedis=newJedis(host,port);jedis.auth(password);while(true){//经由过程损耗者组从淌中获得数据,而没有是确认那些数据,拥有创设待处置条件的效益//查问待ACK的条件,注重必需是有客户端XREAD过然则不ACK,那里与1条,会前往待处置全部数据的最前方1条List<StreamPendingEntry>list=jedis.xpending(keyName,groupName,XPendingParams.xPendingParams("-","+",1));if(null!=list&&!list.isEmpty()){//经由过程最前方1条数据的ID,而后查该ID到末了的全部待ACK数据,1次索取10条List<StreamEntry>streamEntryList=jedis.xrange(keyName,list.get(0).getID().getTime()+"","+",10);for(StreamEntryentry:streamEntryList){System.out.println("支到待处置数据:"+JSONObject.toJSONString(entry));//确认新闻,一朝损耗者乐成天处置完1条新闻,它应当挪用XACK,如许那个新闻便没有会被再次处置//且当作1个反作用,对于此新闻的PEL条款也会被肃清,从Redis效劳器开释内乱存jedis.xack(keyName,groupName,entry.getID());}}}}).start();}/***创设Group*/privatevoidcreateGroup(StringkeyName,StringgroupName){Jedisjedis=newJedis(host,port);jedis.auth(password);try{jedis.xgroupCreate(keyName,groupName,StreamEntryID.LAST_ENTRY,true);}catch(Exceptione){//那里拘捕同常的缘故是大概建树时群组仍旧生存}}}END
推举您浏览更多相关于“ 队伍streamredis新闻确认 ”的作品
文章推荐
Copyright © 2024-2025 燿动吧 – 知识分享,快乐你我,燿动青春 http://www.yaodong8.com All Rights Reserved 网站地图