睡眠基础数据落库
我们在物联网睡眠类项目中,用户会有这样的需求:用户使用睡眠类设备在晚上睡眠之后,第二天会早上会查看睡眠报告,以此来判断自己的睡眠质量。这样的需求是很常见的,那么面对这样的业务需求,我们如何设计实现呢?
面对上面的业务需求,我们规划设计实现。
首先晚上睡眠设备肯定是会不断上报数据的,所以我们需要记录基础睡眠数据
。然后使用定时调度
,针对基础睡眠数据来定时在早上9点生产最终的睡眠报告
。
一、基础数据:用户每天的睡眠状态落库
首先,我们来模拟用户每天的睡眠基础数据(睡眠状态数据) 发送到Kafka,然后报告系统消费落库。
1.1 用户每天的睡眠状态发送到Kafka
@Resource
private KafkaProducer kafkaProducer;
/**
* 大数据平台从算法平台识别出睡眠状态数据,然后发送睡眠状态数据到Kafka
*/
@Test
public void sendSleepStatus() {
// 模拟用户每天的睡眠状态数据
List<SleepStatusVO> sleepStatusVOList = Lists.newArrayList();
for(int i = 2; i <= 4; i++) {
SleepStatusVO sleepStatusVO = new SleepStatusVO();
sleepStatusVO.setStartTime(DateUtil.parse(String.format("2024-05-26 12:%d0:00", i)));
sleepStatusVO.setStatus(i);
sleepStatusVO.setUserId(1);
sleepStatusVO.setDay("2024-05-26");
sleepStatusVOList.add(sleepStatusVO);
}
// 睡眠状态数据发送到Kafka
log.info("用户每天的睡眠状态数据发送到Kafka,sleepStatusVOList: {}", JSON.toJSONString(sleepStatusVOList));
kafkaProducer.send(sleepStatusVOList, "sleep_status_data_topic");
}
然后,我们有一个独立的报告系统服务,订阅消费用户每天的睡眠状态数据,并落库。保存这个睡眠状态数据的目的是便于后面定时调度系统来基于睡眠状态数据来生成最终的睡眠报告。
先添加spring-kafka依赖
<!--kafka依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
订阅用户每天的睡眠状态数据,保存到user_day_data表中。
首先创建表,然后创建mapper、model实体类。手动创建这些类,太麻烦了,mybatisx插件可以自动生成mapper、model。
安装MyBatisX插件
填写数据库连接信息
选择要生成mapper、model的表,右键选择生成
设置包名称、实体类名称
如此,便可自动生成mapper、model等。
1.2 报告系统消费保存用户每天的睡眠状态数据
@Component
@Slf4j
public class KafkaConsumer {
@Resource
private UserDayDataMapper userDayDataMapper;
/**
* 从topic订阅接收、处理睡眠状态数据
* @param record
* @param ack
* @param topic
*/
@KafkaListener(topics = "sleep_status_data_topic", groupId = "sleep_status_data_group")
public void handle(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("订阅接收、处理睡眠状态数据:{}", JSON.toJSONString(msg));
List<SleepStatusVO> sleepStatusVOList = JSON.parseArray((String) msg, SleepStatusVO.class);
// 用户某天的睡眠状态数据,保存到user_day_data表中
UserDayData userDayData = new UserDayData();
userDayData.setUserId(sleepStatusVOList.get(0).getUserId());
userDayData.setDay(sleepStatusVOList.get(0).getDay());
List statusList = sleepStatusVOList.stream().map(s->{
Map<String, Object> statusMap = new HashMap<>(2);
statusMap.put("status", s.getStatus());
statusMap.put("startTime", DateUtil.formatDateTime(s.getStartTime()));
return statusMap;
}).collect(Collectors.toList());
userDayData.setStatus(JSON.toJSONString(statusList));
Date date = new Date();
userDayData.setCreateTime(date);
userDayData.setUpdateTime(date);
log.info("保存用户每天的睡眠状态数据,userDayData: {}", JSON.toJSONString(userDayData));
userDayDataMapper.saveUserDayStatus(userDayData);
ack.acknowledge();
}
}
}
用户每天的睡眠状态数据,转为json格式保存到user_day_data表的status字段。整合在一张表中,因为到时候查询用户某一天的睡眠状态,也是以天+用户为维度,可以把相同维度的数据整合在一起,一次性存储到一张表中,避免多次写入IO性能开销。同时也避免多表存储造成join查询带来性能开销。
二、基础数据:用户每天的心率、呼吸率均值落库
除了上面的用户每天睡眠状态这样的基础数据需要落库保存之外,还有用户每天的每5分钟的心率、呼吸率均值数据,需要落库保存。这样的数据的特点:
- 频繁写入
- 数据量大
- 写多读少
- 属于时序数据
适合使用时序数据库存储,例如TDengine来存储。
用户每天的每5分钟的心率、呼吸率均值数据由大数据平台计算出来之后,首先是要发送到Kafka,避免直接落库造成数据库压力过大,然后睡眠报告系统接收保存到TDengine。睡眠报告系统部署3个节点,保证水平扩展性和高可用性。TDengine部署3节点集群,保证高可用和水平扩展。
2.1 发送用户心率、呼吸率到Kafka
所以首先发送用户每天的每5分钟的心率、呼吸率均值数据到Kafka。
/**
* 发送用户每天的每5分钟的心率、呼吸率均值数据到Kafka
*/
@Test
public void send5minHeartRate() {
// 所以首先发送用户每天的每5分钟的心率、呼吸率均值数据到Kafka。
User5minAvgDataVO user5minAvgDataVO = new User5minAvgDataVO();
user5minAvgDataVO.setUserId(1);
user5minAvgDataVO.setDay("2024-05-29");
user5minAvgDataVO.setHeartRate(30);
user5minAvgDataVO.setBreathRate(60);
kafkaProducer.send(JSON.toJSONString(user5minAvgDataVO), "user_5min_avg_data_topic");
}
2.2 睡眠报告系统订阅保存心率呼吸率数据
然后,睡眠报告系统从Kafka订阅接收保存到TDengine。
2.2.1 配置双数据源
注意:这里需要配置双数据源。项目中涉及到mysql和TDengine两种类型的数据库。当然需要先引入必要的依赖。
<!--引入druid数据源-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.9</version>
</dependency>
<!--引入动态数据源依赖-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>3.5.1</version>
</dependency>
<!--taos-jdbc驱动-->
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.2.5</version>
</dependency>
spring:
datasource:
type: com.alibaba.druid.pool.DruidDataSource
dynamic:
primary: master # 默认数据源
datasource:
master:
url: jdbc:mysql://192.168.56.200:3306/learn?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false
username: root
password: password
driver-class-name: com.mysql.jdbc.Driver
# TDengine数据源
taosd:
driver-class-name: com.taosdata.jdbc.TSDBDriver
# 连接第一数据节点
url: jdbc:TAOS://192.168.56.200:6030/test_db?timezone=Asia/Beijing&charset=UTF-8
username: root
password: taosdata
druid:
initialSize: 5
minIdle: 5
maxActive: 200
maxWait: 60000
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: SELECT 1 FROM DUAL
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: false
filters: stat,wall,log4j2
# 通过connectProperties属性来打开mergeSql功能;慢SQL记录
connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=500
注意:启动类中,排除掉DruidDataSourceAutoConfigure,不然会报错:Failed to determine a suitable driver class
/**
* 报告系统
* 注意:排除掉DruidDataSourceAutoConfigure,不然会报错:Failed to determine a suitable driver class
* @author
*/
@SpringBootApplication(exclude = DruidDataSourceAutoConfigure.class)
@MapperScan(value = "org.example.report.mapper")
public class ReportApplication {
public static void main( String[] args ) {
SpringApplication.run(ReportApplication.class);
}
}
2.2.2 编写mapper
上面的准备工作做好之后,接下来我们编写操作TDengine数据表的mapper。这个mapper跟操作mysql表的mapper类似。
数据写入TDengine数据表之前,需要先创建超级表。下面是创建超级表sql。
create stable if not exists st_device_base_data (data_time TIMESTAMP, heart_rate int, breath_rate int, create_time TIMESTAMP) TAGS (user_id int, day binary(64));
设计超级表结构
字段 | 类型 | 描述 |
---|---|---|
data_time | TIMESTAMP | 数据生成时间 |
heart_rate | int | 心率 |
breath_rate | int | 呼吸率 |
create_time | TIMESTAMP | 数据保存到表的时间 |
标签
字段 | 类型 | 描述 |
---|---|---|
user_id | int | 用户id |
day | binary(64) | 数据日期,例如:2024-06-03 |
表设计好之后,可以编写mapper来保存设备基础数据(心率、呼吸率)。
/**
* @description: 用户(设备)每天的每5分钟的基础数据均值(心率、呼吸率均值)mapper
* @author:xg
* @date: 2024/4/5
* @Copyright:
*/
@Mapper
@DS("taosd")
public interface DeviceBaseDataMapper extends BaseMapper<DeviceBaseDataPO> {
/**
* 保存用户基础数据
* @param deviceBaseDataPO
*/
void save(DeviceBaseDataPO deviceBaseDataPO);
}
表对应的model类
/**
* @description: 用户(设备)每天的基础数据(心率、呼吸率)
* @author:xg
* @date: 2024/4/5
* @Copyright:
*/
@Data
public class DeviceBaseDataPO {
/**
* 数据产生时间
*/
private Timestamp dataTime;
/**
* 心率
*/
private Integer heartRate;
/**
* 呼吸率
*/
private Integer breathRate;
/**
* 数据保存到表的时间
*/
private Timestamp createTime;
}
xml中的sql语句。
<insert id="save" parameterType="org.example.report.model.DeviceBaseDataPO">
insert into tb_device_base_data_#{macAdd}
using st_device_base_data tags(#{userId}, #{day})
values (#{dataTime}, #{heartRate}, #{breathRate}, #{createTime})
</insert>
2.2.3 订阅心率呼吸率数据保存到TDengine
最终,我们的消费者逻辑:从topic订阅接收 "用户每天的每5分钟的心率、呼吸率均值数据" 保存到TDengine
@KafkaListener(topics = "user_5min_avg_data_topic", groupId = "user_5min_avg_data_group")
public void handle(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("用户每天的每5分钟的心率、呼吸率均值数据:{}", JSON.toJSONString(msg));
User5minAvgDataVO user5minAvgDataVO = JSON.parseObject(JSON.toJSONString(msg), User5minAvgDataVO.class);
// 用户每天的每5分钟的心率、呼吸率均值数据,保存到TDengine数据库的device_base_data表中
log.info("保存用户每天的每5分钟的心率、呼吸率均值数据,user5minAvgDataVO: {}", JSON.toJSONString(user5minAvgDataVO));
DeviceBaseDataPO deviceBaseDataPO = new DeviceBaseDataPO();
BeanUtils.copyProperties(user5minAvgDataVO, deviceBaseDataPO);
// 设置数据保存到表的时间
Timestamp creatTime = Timestamp.valueOf(DateUtil.formatDateTime(new Date()));
log.info("creatTime: {}", creatTime);
deviceBaseDataPO.setCreateTime(creatTime);
deviceBaseDataMapper.save(deviceBaseDataPO);
ack.acknowledge();
}
}
最后,我们验证下模拟发送设备每5分钟心率呼吸率数据到kafka,然后订阅保存心率呼吸率数据到TDengine数据表。
/**
* 发送用户(设备)每天的每5分钟的心率、呼吸率均值数据到Kafka
*/
@Test
public void send5minHeartRate() {
// 所以首先发送用户每天的每5分钟的心率、呼吸率均值数据到Kafka。
User5minAvgDataVO user5minAvgDataVO = new User5minAvgDataVO();
user5minAvgDataVO.setUserId(1);
user5minAvgDataVO.setDay("2024-05-29");
user5minAvgDataVO.setHeartRate(30);
user5minAvgDataVO.setBreathRate(60);
user5minAvgDataVO.setMacAdd("m001");
user5minAvgDataVO.setDataTime(new Date());
kafkaProducer.send(JSON.toJSONString(user5minAvgDataVO), "user_5min_avg_data_topic");
}
转载自:https://juejin.cn/post/7379865729024540684