Real-Time vs. Scheduled Computation

Scheduled computation handles the periodic refresh of the home-page recommended articles.
Real-time computation handles like and collection counts.
Typical stream-computation scenarios
- Log analysis: analyze a site's user access logs in real time to compute traffic, user profiles, retention rates, and similar metrics as the data arrives.
- Live dashboards: real-time views of registrations, orders, purchases, revenue, and so on.
- Real-time transit data: continuously update each bus's position and estimate how long until it reaches a stop.
- Real-time article-hotness scoring: Toutiao-style score calculation, where user behavior updates an article's score in real time and higher-scoring articles are recommended more.
Kafka Streams

Source processor (Source Processor): the entry point of the topology. It reads the records produced upstream (there can be several sources) and feeds them into the stream for processing; after the stream aggregates the data, the result is written downstream to a target topic.
Example: counting words

- Add the dependency

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
```
- Write the stream-processing code
```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;

public class KafkaStreamQuickStart {

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.140.102:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamProcessor(streamsBuilder);

        KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), prop);
        streams.start();
    }

    /**
     * Build the topology: split each message into words, count each word
     * within a 10-second window, and write the counts to the output topic.
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        KStream<String, String> stream = streamsBuilder.stream("xiaolin0333-topic-input");
        stream.flatMapValues(value -> Arrays.asList(value.split(" ")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                .count()
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key().toString(), value.toString()))
                .to("xiaolin0333-topic-out");
    }
}
```
- Test
  - Have a producer send several messages to the topic xiaolin0333-topic-input.
  - Have a consumer subscribe to the topic xiaolin0333-topic-out.

Through stream processing, the producer's multiple messages are aggregated into per-word counts and delivered to the consumer's output.
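The windowed count can be illustrated in plain Java. This is a minimal sketch of what a single 10-second window computes over the messages it receives; `WordCountSketch` and `countWords` are hypothetical names for illustration, not part of the Kafka Streams API:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {

    // Split each message on spaces, group by word, and count occurrences,
    // mirroring what flatMapValues + groupBy + count does within one window.
    static Map<String, Long> countWords(List<String> messages) {
        Map<String, Long> counts = new TreeMap<>();
        for (String msg : messages) {
            for (String word : msg.split(" ")) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }
}
```

For example, the two messages "hello kafka" and "hello stream" in the same window yield hello=2, kafka=1, stream=1, and each pair is forwarded to the output topic as one record.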
Spring Boot integration with Kafka Streams
- Add the configuration class
```java
@Data
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix = "kafka")
public class KafkaStreamConfig {

    private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;

    private String hosts;
    private String group;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup() + "_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup() + "_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}
```
```yaml
kafka:
  hosts: 192.168.140.102:9092
  group: ${spring.application.name}
```
- Define the stream bean in a configuration class:
```java
@Configuration
@Slf4j
public class KafkaStreamHelloListener {

    @Bean
    public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<String, String> stream = streamsBuilder.stream("xiaolin0333-topic-input");
        stream.flatMapValues(value -> Arrays.asList(value.split(" ")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                .count()
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key().toString(), value.toString()))
                .to("xiaolin0333-topic-out");
        return stream;
    }
}
```
Application: real-time hot-article scoring
When a user likes, reads, comments on, or collects an article, a message is sent to Kafka Streams, which updates the counts in the database, recomputes the article's score, and refreshes the cached data for the corresponding channel as well as the recommended hot-article data.
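Based on how it is used in the code below, the behavior message POJO can be sketched roughly as follows. The field names are inferred from the setters used in this section; the actual class (typically annotated with Lombok's @Data) may differ:

```java
public class UpdateArticleMess {

    // the kind of user behavior being reported
    public enum UpdateArticleType { COLLECTION, COMMENT, LIKES, VIEWS }

    private Long articleId;          // which article the behavior applies to
    private UpdateArticleType type;  // which counter to change
    private Integer add;             // the delta, e.g. +1 for a single view

    public Long getArticleId() { return articleId; }
    public void setArticleId(Long articleId) { this.articleId = articleId; }
    public UpdateArticleType getType() { return type; }
    public void setType(UpdateArticleType type) { this.type = type; }
    public Integer getAdd() { return add; }
    public void setAdd(Integer add) { this.add = add; }
}
```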

- Send a message from the like/read/collect/comment business code
```java
UpdateArticleMess mess = new UpdateArticleMess();
mess.setArticleId(dto.getArticleId());
mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
mess.setAdd(1);
kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));
```
- Receive the messages in real time with Kafka Streams
```java
@Configuration
@Slf4j
public class HotArticleStreamHandler {

    @Bean
    public KStream<String, String> kStream(StreamsBuilder builder) {
        KStream<String, String> stream = builder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
        stream.map((key, value) -> {
                    // re-key by article id; the value becomes "TYPE:delta"
                    UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
                    return new KeyValue<>(mess.getArticleId().toString(),
                            mess.getType().name() + ":" + mess.getAdd());
                })
                .groupBy((key, value) -> key)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                .aggregate(new Initializer<String>() {
                    @Override
                    public String apply() {
                        return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                    }
                }, new Aggregator<String, String, String>() {
                    @Override
                    public String apply(String key, String value, String aggValue) {
                        if (StringUtils.isBlank(value)) {
                            return aggValue;
                        }
                        // parse the current aggregate
                        String[] aggAry = aggValue.split(",");
                        int col = 0, com = 0, like = 0, views = 0;
                        for (String agg : aggAry) {
                            String[] split = agg.split(":");
                            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])) {
                                case COLLECTION: col = Integer.parseInt(split[1]); break;
                                case COMMENT: com = Integer.parseInt(split[1]); break;
                                case LIKES: like = Integer.parseInt(split[1]); break;
                                case VIEWS: views = Integer.parseInt(split[1]); break;
                            }
                        }
                        // apply the incoming delta
                        String[] valAry = value.split(":");
                        switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])) {
                            case COLLECTION: col += Integer.parseInt(valAry[1]); break;
                            case COMMENT: com += Integer.parseInt(valAry[1]); break;
                            case LIKES: like += Integer.parseInt(valAry[1]); break;
                            case VIEWS: views += Integer.parseInt(valAry[1]); break;
                        }
                        String format = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d",
                                col, com, like, views);
                        System.out.println("article id: " + key);
                        System.out.println("aggregate within the current window: " + format);
                        return format;
                    }
                }, Materialized.as("hot-article-stream-count-001"))
                .toStream()
                .map((key, value) ->
                        new KeyValue<>(key.key().toString(), formatObj(key.key().toString(), value)))
                .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);
        return stream;
    }

    /**
     * Convert the aggregated "COLLECTION:x,COMMENT:x,LIKES:x,VIEWS:x" string
     * into an ArticleVisitStreamMess JSON payload.
     */
    public String formatObj(String articleId, String value) {
        ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
        mess.setArticleId(Long.valueOf(articleId));
        String[] valueArr = value.split(",");
        for (String val : valueArr) {
            String[] valAry = val.split(":");
            switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])) {
                case COLLECTION: mess.setCollect(Integer.parseInt(valAry[1])); break;
                case COMMENT: mess.setComment(Integer.parseInt(valAry[1])); break;
                case LIKES: mess.setLike(Integer.parseInt(valAry[1])); break;
                case VIEWS: mess.setView(Integer.parseInt(valAry[1])); break;
            }
        }
        log.info("aggregated message result: {}", JSON.toJSONString(mess));
        return JSON.toJSONString(mess);
    }
}
```
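The accumulator logic inside `aggregate` can be isolated and checked without Kafka. This is a minimal sketch (a hypothetical `apply` helper, using the same "TYPE:count" string format as the handler above) of applying one behavior delta to the window's aggregate string:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AggregateSketch {

    // Apply one "TYPE:delta" update to the comma-separated aggregate string,
    // e.g. apply("LIKES:2", "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0").
    static String apply(String value, String aggValue) {
        // parse the current aggregate, preserving field order
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String part : aggValue.split(",")) {
            String[] kv = part.split(":");
            counts.put(kv[0], Integer.parseInt(kv[1]));
        }
        // add the incoming delta to the matching counter
        String[] kv = value.split(":");
        counts.merge(kv[0], Integer.parseInt(kv[1]), Integer::sum);
        // serialize back to the same string format
        StringBuilder sb = new StringBuilder();
        counts.forEach((k, v) -> sb.append(k).append(':').append(v).append(','));
        sb.setLength(sb.length() - 1);
        return sb.toString();
    }
}
```

Within one 10-second window, Kafka Streams calls the aggregator once per incoming record for the same article id, threading the aggregate string through each call, so the final string holds the window's total deltas per behavior type.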
- Add a listener that recomputes the article's score and updates the database and cache

Listener code:
```java
@Component
public class ArticleIncrHandleListener {

    @Autowired
    private ApArticleService apArticleService;

    @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
    public void onMessage(String message) {
        if (StringUtils.isNotBlank(message)) {
            ArticleVisitStreamMess articleVisitStreamMess =
                    JSON.parseObject(message, ArticleVisitStreamMess.class);
            apArticleService.updateScore(articleVisitStreamMess);
        }
    }
}
```
Service code:
```java
@Override
public void updateScore(ArticleVisitStreamMess mess) {
    // 1. add the deltas to the article's stored counts
    ApArticle apArticle = updateArticle(mess);
    // 2. recompute the score; triple it for freshly updated articles
    int score = computeScore(apArticle);
    score *= 3;
    // 3. refresh the cache for the article's channel and for the default (recommend) channel
    replaceDataToRedis(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId(), apArticle, score);
    replaceDataToRedis(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG, apArticle, score);
}

/** Add the incoming deltas to the article's stored behavior counts. */
private ApArticle updateArticle(ArticleVisitStreamMess mess) {
    ApArticle article = getById(mess.getArticleId());
    article.setCollection((article.getCollection() == null ? 0 : article.getCollection()) + mess.getCollect());
    article.setComment((article.getComment() == null ? 0 : article.getComment()) + mess.getComment());
    article.setLikes((article.getLikes() == null ? 0 : article.getLikes()) + mess.getLike());
    article.setViews((article.getViews() == null ? 0 : article.getViews()) + mess.getView());
    updateById(article);
    return article;
}

/** Weighted sum: likes, comments, and collections are weighted; views count as-is. */
private int computeScore(ApArticle apArticle) {
    int score = 0;
    if (apArticle.getLikes() != null) {
        score += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
    }
    if (apArticle.getViews() != null) {
        score += apArticle.getViews();
    }
    if (apArticle.getComment() != null) {
        score += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
    }
    if (apArticle.getCollection() != null) {
        score += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
    }
    return score;
}

/**
 * Update the cached hot-article list: if the article is already present, just
 * update its score; otherwise insert it, evicting the lowest-scoring entry
 * when the list is full (30 entries).
 */
private void replaceDataToRedis(String key, ApArticle apArticle, int score) {
    String articleListStr = cacheService.get(key);
    if (StringUtils.isNotBlank(articleListStr)) {
        List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);
        boolean flag = true;
        // if the article is already in the list, update its score in place
        for (HotArticleVo hotArticleVo : hotArticleVoList) {
            if (hotArticleVo.getId().equals(apArticle.getId())) {
                hotArticleVo.setScore(score);
                flag = false;
                break;
            }
        }
        if (flag && hotArticleVoList.size() >= 30) {
            // list is full: replace the lowest-scoring entry if this score beats it
            hotArticleVoList = hotArticleVoList.stream()
                    .sorted(Comparator.comparing(HotArticleVo::getScore).reversed())
                    .collect(Collectors.toList());
            HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
            if (lastHot.getScore() < score) {
                hotArticleVoList.remove(hotArticleVoList.size() - 1);
                HotArticleVo hot = new HotArticleVo();
                BeanUtils.copyProperties(apArticle, hot);
                hot.setScore(score);
                hotArticleVoList.add(hot);
            }
        } else if (flag) {
            // list has room: just add the article
            // (guarding on flag avoids inserting a duplicate when the score
            // was already updated in place above)
            HotArticleVo hot = new HotArticleVo();
            BeanUtils.copyProperties(apArticle, hot);
            hot.setScore(score);
            hotArticleVoList.add(hot);
        }
        hotArticleVoList = hotArticleVoList.stream()
                .sorted(Comparator.comparing(HotArticleVo::getScore).reversed())
                .collect(Collectors.toList());
        cacheService.set(key, JSON.toJSONString(hotArticleVoList));
    }
}
```
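The scoring formula in computeScore is a weighted sum: views count once each, while likes, comments, and collections are multiplied by their respective weights from ArticleConstants. A minimal sketch with hypothetical weight values (the real constants in ArticleConstants may differ):

```java
public class ScoreSketch {

    // Hypothetical weights for illustration only; the real values
    // live in ArticleConstants.
    static final int LIKE_WEIGHT = 3;
    static final int COMMENT_WEIGHT = 5;
    static final int COLLECTION_WEIGHT = 8;

    // views are unweighted; every other behavior is scaled by its weight
    static int computeScore(int likes, int views, int comments, int collections) {
        return likes * LIKE_WEIGHT
                + views
                + comments * COMMENT_WEIGHT
                + collections * COLLECTION_WEIGHT;
    }
}
```

With these assumed weights, an article with 2 likes, 10 views, 1 comment, and 1 collection scores 2*3 + 10 + 1*5 + 1*8 = 29, which updateScore then triples before writing it to the cache.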