简介

OpenGemini时序数据库的对象关系映射(ORM)框架,适用于2.x版本

版本

| 版本号 | 支持功能 | 升级日期 |
| --- | --- | --- |
| 0.0.4 | OpenGeminiDao自定义SQL | 2024/1/5 |
| 0.0.3 | Spring配置使用单连接操作数据库;使用InfluxDB写入数据;支持批量写入;支持time设置 | 2023/11/27 |
| 0.0.2 | OpenGeminiService通用接口 | 2023/11/17 |

配置

依赖仓库

<!-- Maven repository entry for the internal "yth-repository" (place inside <repositories> in pom.xml). -->
<!-- NOTE(review): presumably this is where the opengemini-common artifact is hosted; confirm. -->
<repository>
    <id>yth-repository</id>
    <name>yth-repository</name>
    <url>http://10.1.1.43:8081/repository/yth-repository/</url>
    <!-- Release artifacts are enabled; updatePolicy "never" means Maven does not re-check for updates. -->
    <releases>
        <enabled>true</enabled>
        <updatePolicy>never</updatePolicy>
    </releases>
    <!-- Snapshot artifacts are enabled with the same "never" update policy. -->
    <snapshots>
        <enabled>true</enabled>
        <updatePolicy>never</updatePolicy>
    </snapshots>
</repository>

pom依赖

<!-- The ORM library itself; 0.0.4 is the newest version listed in the version history of this document. -->
<dependency>
   <groupId>com.yth.tsdb</groupId>
   <artifactId>opengemini-common</artifactId>
   <version>0.0.4</version>
</dependency>

yml配置

# Connection settings under spring.influx.
# NOTE(review): presumably consumed by the InfluxDB client the library uses to reach openGemini; confirm.
spring:
  influx:
    # HTTP endpoint of the database server
    url: http://10.1.1.127:8086
    # Credentials used for the connection (sample values)
    user: admin
    password: password

组件扫描

@ComponentScan(basePackages = {"com.yth.tsdb"})

OpenGeminiService

OpenGeminiService是基于InfluxDB与InfluxDBMapper做的对象关系映射的通用接口封装

Entity

  • @Measurement的name属性对应数据库中的表名(measurement)
  • 字段需要使用@Column注解修饰,name对应表字段名;tag=true表示该字段为tag,相当于MySQL中的索引
  • OpenGemini支持的字段类型:String、Double、Integer、Boolean、Long
import com.yth.tsdb.opengemini.entity.OpenGeminiEntity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;

/**
 * Sample entity mapped to the {@code example} measurement (the table name given by
 * {@code @Measurement}).
 *
 * <p>Every persisted field carries {@code @Column}; {@code name} is the column name and
 * {@code tag = true} marks the column as a tag, which this document compares to an index
 * in MySQL. Field types are limited to String, Double, Integer, Boolean and Long.
 *
 * <p>Field order matters: {@code @AllArgsConstructor} generates its parameters in
 * declaration order.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode(callSuper = true)
@Measurement(name = "example")
public class ExampleEntity extends OpenGeminiEntity {

    /** Tag column {@code code}. */
    @Column(name = "code", tag = true)
    private String code;
    
    /** Tag column {@code env_code}. */
    @Column(name = "env_code", tag = true)
    private String envCode;

    /** Field column {@code max_value}. */
    @Column(name = "max_value")
    private Double maxValue;

    /** Field column {@code min_value}. */
    @Column(name = "min_value")
    private Integer minValue;

    /** Field column {@code flag}. */
    @Column(name = "flag")
    private Boolean flag;

    /**
     * Field column {@code collect_time}.
     * NOTE(review): the sample tests fill this from DateTimeUtil.localDateTimeToNanosecond,
     * so it is presumably an epoch value in nanoseconds; confirm.
     */
    @Column(name = "collect_time")
    private Long collectTime;

}

Service

  • 实体类对应的Service接口,需继承OpenGeminiService接口
  • 对标内核层的Service, 应只进行数据操作,不进行业务处理
import cn.yth.base.repository.entity.ExampleEntity;
import com.yth.tsdb.opengemini.service.OpenGeminiService;

/**
 * Service interface for {@link ExampleEntity}.
 *
 * <p>Declares no methods of its own; all operations come from
 * {@code OpenGeminiService<ExampleEntity>}. Per this document's guidance, such services
 * should perform data operations only and contain no business logic.
 */
public interface ExampleService extends OpenGeminiService<ExampleEntity> {

}

ServiceImpl

  • Service接口的实现类,需继承OpenGeminiServiceImpl
import cn.yth.base.repository.entity.ExampleEntity;
import cn.yth.base.repository.service.ExampleService;
import com.yth.tsdb.opengemini.service.impl.OpenGeminiServiceImpl;
import org.springframework.stereotype.Service;

/**
 * Spring-managed implementation of {@link ExampleService}.
 *
 * <p>Adds no code of its own; behaviour is inherited from
 * {@code OpenGeminiServiceImpl<ExampleEntity>}.
 */
@Service
public class ExampleServiceImpl extends OpenGeminiServiceImpl<ExampleEntity> implements ExampleService {

}

测试类

import cn.yth.base.repository.entity.ExampleEntity;
import cn.yth.base.repository.service.ExampleService;
import cn.yth.common.base.PageResponse;
import com.alibaba.fastjson.JSON;
import com.yth.tsdb.opengemini.util.DateTimeUtil;
import org.influxdb.dto.Query;
import org.influxdb.querybuilder.WhereQueryImpl;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.*;

/**
 * Usage examples for {@link ExampleService} against the {@code test} database:
 * single and batch writes, plain / sorted / nested list queries, and paged queries.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class OpenGeminiServiceTest {

    @Resource
    private ExampleService exampleService;

    /** Writes a single point, setting both the collect_time field and the point time explicitly. */
    @Test
    public void testSaveOne() {
        ExampleEntity example = new ExampleEntity();
        example.setCode("openGemini");
        example.setEnvCode("dev");
        example.setMinValue(50);
        example.setMaxValue(66.12);
        example.setFlag(true);
        LocalDateTime now = LocalDateTime.now();
        example.setCollectTime(DateTimeUtil.localDateTimeToNanosecond(now));
        // Optional: when time is not set, it is written as CTT time by default.
        example.setTime(DateTimeUtil.localDateTimeToInstant(now));
        exampleService.save("test", example);
    }

    /** Batch insert: writes three points in one call. */
    @Test
    public void testSaveBatch() {
        ExampleEntity example1 = new ExampleEntity();
        example1.setCode("influxdb");
        example1.setEnvCode("dev");
        example1.setMinValue(100);
        example1.setMaxValue(183.1);
        example1.setFlag(true);

        ExampleEntity example2 = new ExampleEntity();
        example2.setCode("influxdb");
        example2.setEnvCode("prod");
        example2.setMinValue(100);
        example2.setMaxValue(158.99);
        example2.setFlag(true);

        ExampleEntity example3 = new ExampleEntity();
        example3.setCode("influxdb");
        example3.setEnvCode("test");
        example3.setMinValue(1000);
        example3.setMaxValue(1586.99);
        example3.setFlag(true);

        List<ExampleEntity> exampleList = new ArrayList<>();

        exampleList.add(example1);
        exampleList.add(example2);
        exampleList.add(example3);
        exampleService.saveBatch("test", exampleList);
    }

    /** Lists every row of the measurement in the {@code test} database. */
    @Test
    public void testList() {
        List<ExampleEntity> exampleList = exampleService.list("test");
        exampleList.forEach(System.out::println);
    }

    /** Filtered list query with a two-key sort. */
    @Test
    public void testSortList() {
        WhereQueryImpl<?> whereQuery = exampleService.buildWhereQuery("test")
                .and(eq("code", "influxdb"));

        Sort sort = Sort.by(Sort.Direction.DESC, "min_value").and(
                Sort.by(Sort.Direction.ASC, "collect_time")
        );

        /*
         * Sorted list query, equivalent to:
         * select * from example where code = 'influxdb' order by min_value desc,collect_time asc
         */
        List<ExampleEntity> exampleList = exampleService.list(whereQuery, sort);
        exampleList.forEach(System.out::println);
    }

    /** List query whose WHERE clause contains a parenthesised (nested) OR group. */
    @Test
    public void testNestedList() {
        String measurementName = exampleService.getMeasurementName();
        Query query = select().from("test", measurementName)
                .where(eq("code", "influxdb"))
                .andNested()
                .and(gt("max_value", 666.21))
                .or(lt("max_value", 123.45))
                .close();

        /*
         * Nested query, equivalent to:
         * SELECT * FROM example WHERE code = 'influxdb' AND (max_value > 666.21 OR max_value < 123.45);
         */
        List<ExampleEntity> exampleList = exampleService.list(query);
        exampleList.forEach(System.out::println);
    }

    /** List query that applies paging parameters but returns a plain list. */
    @Test
    public void testPageList() {
        // Page index is zero-based: the first page is 0.
        Pageable pageable = PageRequest.of(0, 2,
                Sort.by(Sort.Direction.DESC, "collect_time"));

        /*
         * List query using paging parameters, equivalent to:
         * SELECT * FROM example WHERE code = 'influxdb' ORDER BY collect_time desc LIMIT 2
         */
        WhereQueryImpl<?> whereQuery = exampleService.buildWhereQuery("test").and(eq("code", "influxdb"));
        List<ExampleEntity> exampleList = exampleService.list(pageable, whereQuery);
        exampleList.forEach(System.out::println);
    }

    /** Sorted, paged query over the last three hours, converted into a PageResponse. */
    @Test
    public void testPage() {
        // Read the clock once so both bounds derive from the same instant
        // and the window is exactly three hours wide.
        LocalDateTime windowEnd = LocalDateTime.now();
        LocalDateTime windowStart = windowEnd.minusHours(3);

        Pageable pageable = PageRequest.of(0, 10,
                Sort.by(Sort.Direction.DESC, "collect_time"));

        /*
         * Note: so that query results match the time shown in client tools, time is written
         * as CTT (UTC+8) by default; this can be overridden.
         * A time predicate accepts either a string or a timestamp argument:
         *      string:    the yyyy-MM-dd HH:mm:ss text corresponding to the written time
         *      timestamp: LocalDateTime converts to a UTC timestamp by default, so if time was
         *                 written as CTT the value must be shifted by +8 hours to match
         *
         * Both forms are used below for demonstration. Sorted page query, equivalent to:
         * SELECT * FROM example WHERE code = 'openGemini'
         * AND time >= '2024-01-04 11:08:05' AND time < '2024-01-04 14:08:05'
         * AND time >= 1704366485542000000 AND time < 1704377285542000000
         * ORDER BY collect_time desc LIMIT 10
         */
        Page<ExampleEntity> pageResult = exampleService.page(pageable, exampleService.buildWhereQuery("test")
                .and(eq("code", "openGemini"))
                .and(gte("time", DateTimeUtil.localDatetimeFormatNorm(windowStart)))
                .and(lt("time", DateTimeUtil.localDatetimeFormatNorm(windowEnd)))
                .and(gte("time", DateTimeUtil.localDateTimePlusHours8ToNanosecond(windowStart)))
                .and(lt("time", DateTimeUtil.localDateTimePlusHours8ToNanosecond(windowEnd)))
        );

        Pageable page = pageResult.getPageable();
        List<ExampleEntity> resultList = pageResult.getContent();

        // Convert to the project's page response type.
        // NOTE(review): getPageNumber() is zero-based; confirm PageResponse expects that.
        PageResponse<ExampleEntity> pageResponse = new PageResponse<>(200, "OK", resultList,
                page.getPageNumber(), page.getPageSize(),
                pageResult.getTotalElements());

        System.out.println(JSON.toJSONString(pageResponse));
    }

}

OpenGeminiDao

  • OpenGeminiDao是基于InfluxDB做的自定义SQL支持,继承该接口后, 可使用自定义SQL功能, 执行器会对结果集做对象关系映射

启动扫描配置

  • 启动类上添加@OpenGeminiDaoScan注解,扫描需要被代理的DAO
  • 因与Mybatis Mapper放同一个包下会导致Mybatis扫描异常,故命名为Dao,Dao包与mybatis的mapper包放在同层级即可
@OpenGeminiDaoScan(scanLocation = {"cn.yth.base.repository.dao"})

Dto

import lombok.Data;
import org.influxdb.annotation.Column;

/**
 * Result holder for the custom queries declared on ExampleDao.
 * NOTE(review): fields without {@code @Column} are presumably matched to result columns
 * by field name; confirm against the library's mapping rules.
 */
@Data
public class ExampleDto {

    /** Value of the {@code code} column, when the query returns it. */
    private String code;
    
    /** Mapped explicitly to the {@code env_code} column. */
    @Column(name = "env_code")
    private String envCode;

    /** Receives the aggregate that the sample queries alias as {@code amount}. */
    private Double amount;

}

Dao

  • 实体类对应的Dao,对应Mybatis的mapper,需继承OpenGeminiDao
  • OpenGeminiDao已进行了insert、insertBatch的封装,在此写查询接口即可
  • 所有的方法设计遵循不跨库的原则,方法首参数必须为@Param修饰的参数database
import cn.yth.base.model.dto.ExampleDto;
import cn.yth.base.repository.entity.ExampleEntity;
import com.yth.tsdb.opengemini.annotation.Param;
import com.yth.tsdb.opengemini.annotation.Select;
import com.yth.tsdb.opengemini.dao.OpenGeminiDao;
import java.util.List;

/**
 * Custom-SQL DAO for {@link ExampleEntity}.
 *
 * <p>Insert and batch insert are already provided by {@code OpenGeminiDao}; only query
 * methods are declared here. Every method takes the target database as its first
 * argument, annotated {@code @Param("database")}, so that a call never spans databases.
 */
public interface ExampleDao extends OpenGeminiDao<ExampleEntity> {

    /**
     * Selects all rows whose {@code code} equals the given value.
     *
     * @param database database to query
     * @param code     value substituted for the {@code #{code}} placeholder
     * @return matching rows mapped to entities
     */
    @Select("select * from example where code = #{code}")
    List<ExampleEntity> selectByCode(
            @Param("database") String database,
            @Param("code") String code);

    /**
     * Counts {@code min_value} per {@code code}, ordered by the count descending.
     *
     * @param database database to query
     * @return one result per group, with the count in {@code amount}
     */
    @Select("select count(min_value) as amount from example group by code order by amount desc")
    List<ExampleDto> groupByCode(
            @Param("database") String database);

    /**
     * Computes the mean of {@code max_value} over the whole measurement.
     *
     * @param database database to query
     * @return a single result with the mean in {@code amount}
     */
    @Select("select mean(max_value) as amount from example")
    ExampleDto selectMean(
            @Param("database") String database);

    /**
     * Computes the mean of {@code min_value} per ({@code code}, {@code env_code}) group.
     *
     * @param database database to query
     * @return one result per group, with the mean in {@code amount}
     */
    @Select("select mean(min_value) as amount from example group by code, env_code")
    List<ExampleDto> selectMeanGroupCode(
            @Param("database") String database);

    /**
     * Sums {@code min_value} over the whole measurement.
     *
     * @param database database to query
     * @return a single result with the sum in {@code amount}
     */
    @Select("select sum(min_value) as amount from example")
    ExampleDto selectSum(
            @Param("database") String database);

    /**
     * Sums {@code max_value} per {@code code}.
     *
     * @param database database to query
     * @return one result per group, with the sum in {@code amount}
     */
    @Select("select sum(max_value) as amount from example group by code")
    List<ExampleDto> selectSumGroup(
            @Param("database") String database);

    /**
     * Selects {@code code} together with {@code top(max_value, 3)}.
     *
     * @param database database to query
     * @return result rows with the selected value in {@code amount}
     */
    @Select("select code, top(max_value, 3) as amount from example")
    List<ExampleDto> selectTop(
            @Param("database") String database);

    /**
     * Selects {@code code} together with {@code BOTTOM(min_value, 3)}.
     *
     * @param database database to query
     * @return result rows with the selected value in {@code amount}
     */
    @Select("select code, BOTTOM(min_value, 3) as amount from example")
    List<ExampleDto> selectBottom(
            @Param("database") String database);

}

测试类

import cn.yth.base.model.dto.ExampleDto;
import cn.yth.base.repository.dao.ExampleDao;
import cn.yth.base.repository.entity.ExampleEntity;
import cn.yth.base.util.DateTimeUtil;
import com.alibaba.fastjson.JSON;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

/**
 * Usage examples for {@link ExampleDao}: the inherited insert / insertBatch operations
 * and each custom query, all run against the {@code test} database.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class OpenGeminiDaoTest {

    /** Database every example in this class reads from and writes to. */
    private static final String DATABASE = "test";

    @Resource
    private ExampleDao exampleDao;

    /** Inserts one point and stamps its collect_time field with the current time. */
    @Test
    public void testInsertOne() {
        ExampleEntity point = newExample("dev", 50, 66.12, true);
        point.setCollectTime(DateTimeUtil.localDateTimeToNanosecond(LocalDateTime.now()));
        exampleDao.insert(DATABASE, point);
    }

    /** Inserts three points (dev, prod, test) in a single batch call. */
    @Test
    public void testInsertBatch() {
        List<ExampleEntity> batch = new ArrayList<>();
        batch.add(newExample("dev", 60, 123.45, true));
        batch.add(newExample("prod", 100, 666.21, false));
        batch.add(newExample("test", 100, 666.21, false));
        exampleDao.insertBatch(DATABASE, batch);
    }

    /** Parameterised select by code. */
    @Test
    public void testSelect() {
        printAll(exampleDao.selectByCode(DATABASE, "other"));
    }

    /** Count grouped by code. */
    @Test
    public void testGroup() {
        printAll(exampleDao.groupByCode(DATABASE));
    }

    /** Mean over the whole measurement. */
    @Test
    public void testMean() {
        ExampleDto mean = exampleDao.selectMean(DATABASE);
        System.out.println(JSON.toJSONString(mean));
    }

    /** Mean grouped by code and env_code. */
    @Test
    public void testMeanGroup() {
        printAll(exampleDao.selectMeanGroupCode(DATABASE));
    }

    /** Sum over the whole measurement. */
    @Test
    public void testSum() {
        ExampleDto sum = exampleDao.selectSum(DATABASE);
        System.out.println(JSON.toJSONString(sum));
    }

    /** Sum grouped by code. */
    @Test
    public void testSumGroup() {
        printAll(exampleDao.selectSumGroup(DATABASE));
    }

    /** top(max_value, 3) query. */
    @Test
    public void testTop() {
        printAll(exampleDao.selectTop(DATABASE));
    }

    /** BOTTOM(min_value, 3) query. */
    @Test
    public void testBottom() {
        printAll(exampleDao.selectBottom(DATABASE));
    }

    /** Builds an entity with code "influxdb" and the given remaining values; collect_time is left unset. */
    private static ExampleEntity newExample(String envCode, int minValue, double maxValue, boolean flag) {
        ExampleEntity entity = new ExampleEntity();
        entity.setCode("influxdb");
        entity.setEnvCode(envCode);
        entity.setMinValue(minValue);
        entity.setMaxValue(maxValue);
        entity.setFlag(flag);
        return entity;
    }

    /** Prints each row on its own line. */
    private static void printAll(List<?> rows) {
        for (Object row : rows) {
            System.out.println(row);
        }
    }

}

注意事项

时间

  • 为了查询结果与工具上展示的time值正确, time的默认写入为CTT时间, UTC+8, 可自行覆盖
  • time的查询可以使用字符串与时间戳入参, 区分为:
    • 字符串: 可以使用写入time对应的 yyyy-MM-dd HH:mm:ss 字符串查询
    • 时间戳: LocalDateTime默认转换的时间戳为UTC, 如time写入了CTT时间,则查询需要+8小时才能正确查询到
  • 可使用DateTimeUtil转换需要的时间格式