likes
comments
collection
share

Apache Ignite基本概念及最佳实践(二)——实战篇

作者站长头像
站长
· 阅读数 63

一、前言

这是系列的第二篇文章,第一篇讲解的是Ignite基础,如下

二、Demo实战

0.实体类

下面列出用到的实体类,省略get, set和toString方法

public class Class {
    @QuerySqlField(index = true)
    private int id;

    @QuerySqlField
    private String name;

    private int level;

    public Class(int id, String name) {
        this.id = id;
        this.name = name;
    }
}
public class Student {
    @QuerySqlField(index = true)
    private int classId;

    @QuerySqlField
    private String name;

    @QuerySqlField
    private int age;

    public Student(int classId, String name, int age) {
        this.classId = classId;
        this.name = name;
        this.age = age;
    }
}

1.连接ignite节点

有两种连接方式,Client连接Thin Client连接

// Client连接
private static Ignite getIgnite() {
    TcpDiscoverySpi spi = new TcpDiscoverySpi();
    TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
    ipFinder.setAddresses(Arrays.asList("127.0.0.1:47500"));// 要连接的远程节点ip+端口,默认是47500
    spi.setLocalPort(47501);// 当前启动节点的端口号,不配置则从47500往后顺延
    spi.setIpFinder(ipFinder);
    IgniteConfiguration cfg = new IgniteConfiguration();
    cfg.setDiscoverySpi(spi);
    cfg.setClientMode(true);// 当前启动节点是否为Client节点,默认是false,即server节点
    Ignite ignite = Ignition.start(cfg);
    return ignite;
}

// Thin Client连接
private static IgniteClient getIgniteClient() {
    ClientConfiguration cfg = new ClientConfiguration()
            .setAddresses("127.0.0.1:10800")// Thin Client默认连接端口号是10800
            .setPartitionAwarenessEnabled(true);
    IgniteClient client = Ignition.startClient(cfg);
    return client;
}

2.创建缓存

创建最基本的缓存配置

  • 缓存名称
  • CacheMode
  • 配置索引(注意,Thin Client连接是不支持配置索引的)
// client cache
private static IgniteCache<String, String> getCache(Ignite ignite) {
    CacheConfiguration<String, String> cfg = new CacheConfiguration<>();
    cfg.setName("myCache");// 缓存名称
    cfg.setCacheMode(CacheMode.PARTITIONED);// cache模式
    cfg.setIndexedTypes(String.class, String.class);// 配置索引
    IgniteCache<String, String> cache = ignite.getOrCreateCache(cfg);
    return cache;
}

// thin client cache
private static ClientCache<String, String> getCache(IgniteClient ignite) {
    ClientCacheConfiguration classCfg = new ClientCacheConfiguration();
    classCfg.setName("myCache");// 缓存名称
    classCfg.setCacheMode(CacheMode.PARTITIONED);// cache模式
    ClientCache<String, String> cache = ignite.getOrCreateCache(classCfg);
    return cache;
}

3.key-value缓存

简单的key-value存储,使用Thin Client连接即可,具体api如下

  • put, get方法:存取k-v键值对
  • getAndPutIfAbsent():如果不存在则存入新值,返回值为旧值
  • putIfAbsent():如果不存在则存入心智,返回是否插入成功
  • getAndReplace():获取旧值,存新值
  • replace():替换值
  • remove():删除值
private static void test(IgniteClient igniteClient) {
    ClientCache<String, String> cache = getCache(igniteClient);
    // put
    cache.put("testKey", "testValue");
    // get
    String value = cache.get("testKey");
    System.out.println(value);
    // getAndPutIfAbsent
    String oldValue = cache.getAndPutIfAbsent("testKey", "newKey");
    System.out.println("myKey:" + oldValue);
    // putIfAbsent
    boolean success = cache.putIfAbsent("myKey", "myValue");
    System.out.println("myKey:" + success);
    // getAndReplace
    oldValue = cache.getAndReplace("myKey", "newValue");
    System.out.println("myKey replace:" + oldValue);
    // replace
    success = cache.replace("myKey", "value");
    System.out.println("myKey replace:" + success);
    // remove
    success = cache.remove("myKey");
    System.out.println("myKey remove:" + success);
}

3.sql查询

  • SqlQuery:查询后返回整个对象,2.8版本后已经不建议使用了
  • SqlFieldQuery:返回部分属性或一个计算后的值
  • 跨缓存查询:Ignite每个缓存cache都有自己的名字,会被作为每一个table的schema,如果连接其他缓存的表,需要加上该缓存的schema
// SqlQuery
private static void testSqlQuery(IgniteCache<Integer, Student> stuCache) {
    SqlQuery<Integer, Student> sqlQuery = new SqlQuery<>(Student.class, "age > 0");
    QueryCursor<Cache.Entry<Integer, Student>> cursor1 = stuCache.query(sqlQuery);
    List<Cache.Entry<Integer, Student>> all = cursor1.getAll();
    for (Cache.Entry<Integer, Student> entry : all) {
        System.out.println("key:" + entry.getKey());
        System.out.println("value:" + entry.getValue());
    }
}
// SqlFieldQuery
private static void testSqlFieldQuery(IgniteCache<Integer, Student> stuCache) {
    SqlFieldsQuery sql = new SqlFieldsQuery("select concat(classId, '----', name) from STUDENT");
    QueryCursor<List<?>> cursor = stuCache.query(sql);
    for (List<?> row : cursor) {
        System.out.println("学生信息:" + row.get(0));
    }
}
// select from two cache
private static void testDataFromTwoCache(IgniteCache<Integer, Class> classCache) {
    SqlFieldsQuery sql = new SqlFieldsQuery("select concat(cla.id, '----', cla.name) as clainfo," +
            "concat(stu.name, '----', stu.age) as stuinfo " +
            "from CLASS as cla, STUDENT.STUDENT as stu " +
            "where cla.id = stu.classId");
    QueryCursor<List<?>> cursor = classCache.query(sql);
    for (List<?> row : cursor) {
        System.out.println("班级信息:" + row.get(0) + ",学生信息:" + row.get(1));
    }
}

4.DML语句

  • DML INSERT
  • DML SELECT
  • DML UPDATE
  • DML DELETE
private static void testDML(IgniteCache<Integer, Student> stuCache) {
    // DML INSERT
    stuCache.query(new SqlFieldsQuery("insert into STUDENT(_key,classId,name,age) values(?,?,?,?)").setArgs(3, 9, "小红", 12));
    // DML SELECT
    SqlFieldsQuery sql = new SqlFieldsQuery("select concat(classId, '----', name from STUDENT where classId = ?)").setArgs(9);
    QueryCursor<List<?>> cursor = stuCache.query(sql);
    for (List<?> row : cursor) {
        System.out.println("学生信息:" + row.get(0));
    }
    // DML UPDATE
    FieldsQueryCursor<List<?>> update = stuCache.query(new SqlFieldsQuery("update STUDENT set name = ? where _key = ?").setArgs("小粉", 3));
    System.out.println("update影响行数:" + update.getAll().get(0));
    // DML DELETE
    FieldsQueryCursor<List<?>> delete = stuCache.query(new SqlFieldsQuery("delete from STUDENT where _key = ?").setArgs(3));
    System.out.println("delete影响行数:" + delete.getAll().get(0));
}

5.分布式关联查询

启动多个节点时,需要设置开启分布式关联查询,否则join操作只会搜索本节点的数据,导致漏数据

  • sqlFieldQuery.setDistributedJoins(true);

6.索引分组

查询条件复杂时使用多字段索引加快查询的速度(类似于Mysql的联合索引)

  • @QuerySqlField(groups={"A"})
  • 每次查询只能用一个索引,因此组索引可以限制改善我们的查询速度

7.事务

  • 开启事务:cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
  • 创建事务:Ignition.ignite().transactions().txStart(TransactionConcurrency, TransactionIsolation)
  • 事务并发模式:TransactionConcurrency可以是OPTIMISTIC或PESSIMISTIC
  • 事务隔离级别:TransactionIsolation可以是READ-COMMITTED,REPEATABLE_READ和SERIALIZABLE
  • 事务结束:最后需要使用commit()来提交修改,或通过rollback()来回滚修改
private static void TestTransaction() throws Exception {
    final Ignite ignite = getIgnite();
    CacheConfiguration cacheCfg = new CacheConfiguration();
    cacheCfg.setName("default");
    cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
    cacheCfg.setBackups(1);
    final IgniteCache<String, String> cache = ignite.getOrCreateCache(cacheCfg);

    cache.remove("MyKey");

    new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("Transaction 1: begin");
            try (Transaction tx = Ignition.ignite().transactions().txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE)) {
                String value = cache.get("MyKey");
                cache.put("MyKey", "MyValue 1");
                try {
                    Thread.currentThread().sleep(5 * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Transaction 1: before commit, " + cache.get("MyKey"));
                tx.commit();
                System.out.println("Transaction 1: after commit, " + cache.get("MyKey"));
            }
            System.out.println("Transaction 1: end");
        }
    }).start();
    
    Thread.currentThread().sleep(2 * 1000);
    
    new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("Transaction 2: begin");
            try (Transaction tx = Ignition.ignite().transactions().txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE)) {
                String value = cache.get("MyKey");
                cache.put("MyKey", "MyValue 2");
                System.out.println("Transaction 2: before commit, " + cache.get("MyKey"));
                tx.commit();
                System.out.println("Transaction 2: after commit, " + cache.get("MyKey"));
            }
            System.out.println("Transaction 2: end");
        }
    }).start();
}

8.集成oracle

  • 在Ignite中创建一个新的缓存,以存储从Oracle同步的数据
  • 在Ignite中创建一个数据源,并将其配置为连接到Oracle的数据库
  • 在Ignite中创建一个CacheStore的实现类,用于将数据从Oracle同步到缓存中
  • 实现CacheStore接口的loadCache方法,该方法将从Oracle数据库中读取数据并将其存储到缓存中
  • 将CacheStore实现类配置为Ignite缓存的write-through模式,这样可以确保每次向缓存写入数据时都会自动将数据同步到Oracle数据库
// 创建Ignite缓存配置
CacheConfiguration<Long, MyData> cacheConfig = new CacheConfiguration<>("myCache");
cacheConfig.setCacheMode(CacheMode.PARTITIONED);
cacheConfig.setIndexedTypes(Long.class, MyData.class);

// 创建数据源配置
DataSourceConfig dsCfg = new DataSourceConfig();
dsCfg.setDataSourceClassName("oracle.jdbc.pool.OracleDataSource");
dsCfg.setConnectionProperties(new Properties() {{
    setProperty("user", "myUser");
    setProperty("password", "myPassword");
    setProperty("url", "jdbc:oracle:thin:@//localhost:1521/myDatabase");
}});

// 创建CacheStore实现类
class OracleCacheStore extends CacheStoreAdapter<Long, MyData> {
    private final DataSource dataSource;

    public OracleCacheStore(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    public MyData load(Long key) throws CacheLoaderException {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement("SELECT * FROM myTable WHERE id = ?")) {
            stmt.setLong(1, key);
            ResultSet rs = stmt.executeQuery();
            if (rs.next()) {
                MyData data = new MyData();
                data.setId(rs.getLong("id"));
                data.setName(rs.getString("name"));
                return data;
            } else {
                return null;
            }
        } catch (SQLException e) {
            throw new CacheLoaderException("Failed to load data from Oracle", e);
        }
    }

    @Override
    public void write(Cache.Entry<? extends Long, ? extends MyData> entry) throws CacheWriterException {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement("INSERT INTO myTable (id, name) VALUES (?, ?)")) {
            stmt.setLong(1, entry.getKey());
            stmt.setString(2, entry.getValue().getName());
            stmt.executeUpdate();
        } catch (SQLException e) {
            throw new CacheWriterException("Failed to write data to Oracle", e);
        }
    }

    @Override
    public void delete(Object key) throws CacheWriterException {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement("DELETE FROM myTable WHERE id = ?")) {
            stmt.setLong(1, (Long)key);
            stmt.executeUpdate();
        } catch (SQLException e) {
            throw new CacheWriterException("



四、总结

Ignite的sql语法和mysql、Oracle一致,底层同样使用的B+树索引,使用起来非常方便。 后续会继续更新RestAPI部分内容

五、参考资料

转载自:https://juejin.cn/post/7250080519068827709
评论
请登录