Apache Ignite基本概念及最佳实践(二)——实战篇
一、前言
Apache Ignite系列:
- Apache Ignite基本概念及最佳实践(一)——概念篇
- Apache Ignite基本概念及最佳实践(二)——实战篇
- Apache Ignite基本概念及最佳实践(三)——监测篇
- Apache Ignite基本概念及最佳实践(四)——问答篇
二、Demo实战
0.实体类
下面列出用到的实体类,省略get, set和toString方法
/**
 * School-class entity stored as a cache value.
 * Getters, setters and toString are omitted.
 */
public class Class {
    /** Class identifier; annotated as an indexed SQL query field. */
    @QuerySqlField(index = true)
    private int id;

    /** Class name; annotated as a SQL query field. */
    @QuerySqlField
    private String name;

    /** Level; carries no {@code @QuerySqlField} annotation. */
    private int level;

    /**
     * Creates a class entity. The constructor does not assign {@code level}.
     *
     * @param id   class identifier
     * @param name class name
     */
    public Class(int id, String name) {
        this.name = name;
        this.id = id;
    }
}
/**
 * Student entity stored as a cache value.
 * Getters, setters and toString are omitted.
 */
public class Student {
    /** Identifier of the class the student belongs to; annotated as an indexed SQL query field. */
    @QuerySqlField(index = true)
    private int classId;

    /** Student name; annotated as a SQL query field. */
    @QuerySqlField
    private String name;

    /** Student age; annotated as a SQL query field. */
    @QuerySqlField
    private int age;

    /**
     * Creates a student entity.
     *
     * @param classId identifier of the class the student belongs to
     * @param name    student name
     * @param age     student age
     */
    public Student(int classId, String name, int age) {
        this.age = age;
        this.name = name;
        this.classId = classId;
    }
}
1.连接ignite节点
有两种连接方式,Client连接和Thin Client连接
/**
 * Client connection: starts a node in client mode that discovers the cluster
 * through a static IP finder.
 *
 * @return the started Ignite instance
 */
private static Ignite getIgnite() {
    // Address (ip:port) of the remote node to connect to; the default port is 47500.
    TcpDiscoveryVmIpFinder addressFinder = new TcpDiscoveryVmIpFinder();
    addressFinder.setAddresses(Arrays.asList("127.0.0.1:47500"));

    TcpDiscoverySpi discovery = new TcpDiscoverySpi();
    discovery.setIpFinder(addressFinder);
    // Port of the node being started; when not configured, ports are tried upwards from 47500.
    discovery.setLocalPort(47501);

    IgniteConfiguration nodeCfg = new IgniteConfiguration();
    // Whether the node being started is a client node; the default is false, i.e. a server node.
    nodeCfg.setClientMode(true);
    nodeCfg.setDiscoverySpi(discovery);

    return Ignition.start(nodeCfg);
}
/**
 * Thin Client connection: opens a thin client against a single address with
 * partition awareness enabled.
 *
 * @return the started thin client
 */
private static IgniteClient getIgniteClient() {
    ClientConfiguration thinCfg = new ClientConfiguration();
    // The default thin client port is 10800.
    thinCfg.setAddresses("127.0.0.1:10800");
    thinCfg.setPartitionAwarenessEnabled(true);
    return Ignition.startClient(thinCfg);
}
2.创建缓存
创建最基本的缓存配置
- 缓存名称
- CacheMode
- 配置索引(注意,Thin Client连接是不支持配置索引的)
/**
 * Client cache: gets or creates the "myCache" cache through a regular client node.
 *
 * @param ignite node used to obtain the cache
 * @return the String-to-String cache named "myCache"
 */
private static IgniteCache<String, String> getCache(Ignite ignite) {
    // Cache name is passed to the constructor; mode and indexed types follow.
    CacheConfiguration<String, String> cacheCfg = new CacheConfiguration<String, String>("myCache")
        .setCacheMode(CacheMode.PARTITIONED)             // cache mode
        .setIndexedTypes(String.class, String.class);    // index configuration
    return ignite.getOrCreateCache(cacheCfg);
}
/**
 * Thin client cache: gets or creates the "myCache" cache through a thin client.
 * No indexed types are configured here.
 *
 * @param ignite thin client used to obtain the cache
 * @return the String-to-String cache named "myCache"
 */
private static ClientCache<String, String> getCache(IgniteClient ignite) {
    ClientCacheConfiguration thinCacheCfg = new ClientCacheConfiguration()
        .setName("myCache")                      // cache name
        .setCacheMode(CacheMode.PARTITIONED);    // cache mode
    return ignite.getOrCreateCache(thinCacheCfg);
}
3.key-value缓存
简单的key-value存储,使用Thin Client连接即可,具体api如下
- put, get方法:存取k-v键值对
- getAndPutIfAbsent():如果不存在则存入新值,返回值为旧值
- putIfAbsent():如果不存在则存入新值,返回是否插入成功
- getAndReplace():获取旧值,存新值
- replace():替换值
- remove():删除值
/**
 * Walks through the basic key-value operations of a thin client cache and
 * prints the result of each call.
 *
 * @param igniteClient thin client used to obtain the "myCache" cache
 */
private static void test(IgniteClient igniteClient) {
    ClientCache<String, String> cache = getCache(igniteClient);

    // put
    cache.put("testKey", "testValue");

    // get
    String value = cache.get("testKey");
    System.out.println(value);

    // getAndPutIfAbsent: "testKey" was stored above, so the call returns the existing value.
    // The label names the key that is actually used (it previously said "myKey").
    String oldValue = cache.getAndPutIfAbsent("testKey", "newKey");
    System.out.println("testKey getAndPutIfAbsent:" + oldValue);

    // putIfAbsent
    boolean success = cache.putIfAbsent("myKey", "myValue");
    System.out.println("myKey putIfAbsent:" + success);

    // getAndReplace
    oldValue = cache.getAndReplace("myKey", "newValue");
    System.out.println("myKey replace:" + oldValue);

    // replace
    success = cache.replace("myKey", "value");
    System.out.println("myKey replace:" + success);

    // remove
    success = cache.remove("myKey");
    System.out.println("myKey remove:" + success);
}
4.sql查询
- SqlQuery:查询后返回整个对象,2.8版本后已经不建议使用了
- SqlFieldsQuery:返回部分属性或一个计算后的值
- 跨缓存查询:Ignite每个缓存cache都有自己的名字,会被作为每一个table的schema,如果连接其他缓存的表,需要加上该缓存的schema
/**
 * SqlQuery: selects whole Student entries whose age is greater than 0 and
 * prints each key and value.
 *
 * @param stuCache cache holding Student values keyed by Integer
 */
private static void testSqlQuery(IgniteCache<Integer, Student> stuCache) {
    SqlQuery<Integer, Student> ageFilter = new SqlQuery<>(Student.class, "age > 0");
    // getAll() fetches every result into a list.
    List<Cache.Entry<Integer, Student>> matches = stuCache.query(ageFilter).getAll();
    for (int i = 0; i < matches.size(); i++) {
        Cache.Entry<Integer, Student> match = matches.get(i);
        System.out.println("key:" + match.getKey());
        System.out.println("value:" + match.getValue());
    }
}
/**
 * SqlFieldsQuery: selects a computed column (classId and name joined by "----")
 * for every student and prints it.
 *
 * @param stuCache cache holding Student values keyed by Integer
 */
private static void testSqlFieldQuery(IgniteCache<Integer, Student> stuCache) {
    SqlFieldsQuery sql = new SqlFieldsQuery("select concat(classId, '----', name) from STUDENT");
    // The cursor is closed by try-with-resources so its resources are released
    // even if printing a row throws.
    try (QueryCursor<List<?>> cursor = stuCache.query(sql)) {
        for (List<?> row : cursor) {
            System.out.println("学生信息:" + row.get(0));
        }
    }
}
/**
 * Cross-cache query: joins CLASS with STUDENT.STUDENT on the class id and
 * prints one line per matching pair.
 *
 * @param classCache cache holding Class values keyed by Integer; the query runs against it
 */
private static void testDataFromTwoCache(IgniteCache<Integer, Class> classCache) {
    // The STUDENT table is qualified with its schema because it is queried from another cache.
    SqlFieldsQuery sql = new SqlFieldsQuery("select concat(cla.id, '----', cla.name) as clainfo," +
        "concat(stu.name, '----', stu.age) as stuinfo " +
        "from CLASS as cla, STUDENT.STUDENT as stu " +
        "where cla.id = stu.classId");
    // The cursor is closed by try-with-resources so its resources are released
    // even if printing a row throws.
    try (QueryCursor<List<?>> cursor = classCache.query(sql)) {
        for (List<?> row : cursor) {
            System.out.println("班级信息:" + row.get(0) + ",学生信息:" + row.get(1));
        }
    }
}
5.DML语句
- DML INSERT
- DML SELECT
- DML UPDATE
- DML DELETE
/**
 * Runs INSERT, SELECT, UPDATE and DELETE statements against the STUDENT table
 * and prints the selected rows and the first result row of UPDATE and DELETE.
 *
 * @param stuCache cache holding Student values keyed by Integer
 */
private static void testDML(IgniteCache<Integer, Student> stuCache) {
    // DML INSERT; getAll() drains the returned cursor.
    stuCache.query(new SqlFieldsQuery("insert into STUDENT(_key,classId,name,age) values(?,?,?,?)")
        .setArgs(3, 9, "小红", 12)).getAll();

    // DML SELECT. The closing parenthesis of concat() now sits before FROM;
    // it previously enclosed the FROM/WHERE clauses, which is not valid SQL.
    SqlFieldsQuery sql = new SqlFieldsQuery("select concat(classId, '----', name) from STUDENT where classId = ?")
        .setArgs(9);
    try (QueryCursor<List<?>> cursor = stuCache.query(sql)) {
        for (List<?> row : cursor) {
            System.out.println("学生信息:" + row.get(0));
        }
    }

    // DML UPDATE; the first result row is printed as the affected-row count.
    FieldsQueryCursor<List<?>> update = stuCache.query(
        new SqlFieldsQuery("update STUDENT set name = ? where _key = ?").setArgs("小粉", 3));
    System.out.println("update影响行数:" + update.getAll().get(0));

    // DML DELETE; the first result row is printed as the affected-row count.
    FieldsQueryCursor<List<?>> delete = stuCache.query(
        new SqlFieldsQuery("delete from STUDENT where _key = ?").setArgs(3));
    System.out.println("delete影响行数:" + delete.getAll().get(0));
}
6.分布式关联查询
启动多个节点时,需要设置开启分布式关联查询,否则join操作只会搜索本节点的数据,导致漏数据
- sqlFieldQuery.setDistributedJoins(true);
7.索引分组
查询条件复杂时使用多字段索引加快查询的速度(类似于Mysql的联合索引)
- @QuerySqlField(groups={"A"})
- 每次查询只能使用一个索引,因此合理使用组索引可以改善我们的查询速度
8.事务
- 开启事务:cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
- 创建事务:Ignition.ignite().transactions().txStart(TransactionConcurrency, TransactionIsolation)
- 事务并发模式:TransactionConcurrency可以是OPTIMISTIC或PESSIMISTIC
- 事务隔离级别:TransactionIsolation可以是READ_COMMITTED,REPEATABLE_READ和SERIALIZABLE
- 事务结束:最后需要使用commit()来提交修改,或通过rollback()来回滚修改
/**
 * Demonstrates two overlapping OPTIMISTIC / SERIALIZABLE transactions that
 * read and write the same key.
 *
 * Transaction 1 starts first and holds its transaction open for 5 seconds
 * before committing; transaction 2 starts 2 seconds later and commits
 * immediately, so it commits while transaction 1 is still open.
 *
 * NOTE(review): with optimistic serializable transactions the later commit is
 * expected to be rejected because the key changed in between — confirm against
 * the Ignite transaction documentation. The failure is not caught here and
 * surfaces as an uncaught exception of the worker thread.
 *
 * @throws Exception if the calling thread is interrupted while waiting, or if
 *                   starting the node or creating the cache fails
 */
private static void TestTransaction() throws Exception {
    final Ignite ignite = getIgnite();

    // Typed configuration instead of the raw type, matching the cache's key/value types.
    CacheConfiguration<String, String> cacheCfg = new CacheConfiguration<>();
    cacheCfg.setName("default");
    cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
    cacheCfg.setBackups(1);

    final IgniteCache<String, String> cache = ignite.getOrCreateCache(cacheCfg);
    cache.remove("MyKey");

    new Thread(new Runnable() {
        @Override
        public void run() {
            runTransaction(ignite, cache, "Transaction 1", "MyValue 1", 5 * 1000L);
        }
    }).start();

    // Give transaction 1 a head start so the two transactions overlap.
    Thread.sleep(2 * 1000L);

    new Thread(new Runnable() {
        @Override
        public void run() {
            runTransaction(ignite, cache, "Transaction 2", "MyValue 2", 0L);
        }
    }).start();
}

/**
 * Runs one read-then-write transaction on "MyKey" and logs its progress.
 *
 * @param ignite     node whose transaction facade is used
 * @param cache      transactional cache to operate on
 * @param label      prefix for every log line, e.g. "Transaction 1"
 * @param newValue   value written to "MyKey"
 * @param holdMillis time to keep the transaction open before committing; 0 for none
 */
private static void runTransaction(Ignite ignite, IgniteCache<String, String> cache,
                                   String label, String newValue, long holdMillis) {
    System.out.println(label + ": begin");
    try (Transaction tx = ignite.transactions().txStart(
            TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE)) {
        // The read is kept although its result is unused: the demo reads the key
        // inside the transaction before writing it.
        cache.get("MyKey");
        cache.put("MyKey", newValue);

        if (holdMillis > 0) {
            try {
                Thread.sleep(holdMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and leave without committing;
                // closing the transaction discards the uncommitted change.
                Thread.currentThread().interrupt();
                System.out.println(label + ": interrupted before commit");
                return;
            }
        }

        System.out.println(label + ": before commit, " + cache.get("MyKey"));
        tx.commit();
        System.out.println(label + ": after commit, " + cache.get("MyKey"));
    }
    System.out.println(label + ": end");
}
9.集成oracle
- 在Ignite中创建一个新的缓存,以存储从Oracle同步的数据
- 在Ignite中创建一个数据源,并将其配置为连接到Oracle的数据库
- 在Ignite中创建一个CacheStore的实现类,用于将数据从Oracle同步到缓存中
- 实现CacheStore接口的loadCache方法,该方法将从Oracle数据库中读取数据并将其存储到缓存中
- 将CacheStore实现类配置为Ignite缓存的write-through模式,这样可以确保每次向缓存写入数据时都会自动将数据同步到Oracle数据库
// Create the Ignite cache configuration
CacheConfiguration<Long, MyData> cacheConfig = new CacheConfiguration<>("myCache");
cacheConfig.setCacheMode(CacheMode.PARTITIONED);
cacheConfig.setIndexedTypes(Long.class, MyData.class);

// Connection properties, built as a plain Properties object. Double-brace
// initialization would create an anonymous Properties subclass for no benefit.
Properties connectionProps = new Properties();
connectionProps.setProperty("user", "myUser");
connectionProps.setProperty("password", "myPassword");
connectionProps.setProperty("url", "jdbc:oracle:thin:@//localhost:1521/myDatabase");

// Create the data source configuration
// NOTE(review): DataSourceConfig is not declared in this snippet — confirm which library provides it.
DataSourceConfig dsCfg = new DataSourceConfig();
dsCfg.setDataSourceClassName("oracle.jdbc.pool.OracleDataSource");
dsCfg.setConnectionProperties(connectionProps);
// 创建CacheStore实现类
class OracleCacheStore extends CacheStoreAdapter<Long, MyData> {
private final DataSource dataSource;
public OracleCacheStore(DataSource dataSource) {
this.dataSource = dataSource;
}
/**
 * Loads the row with the given id from myTable and maps it to a MyData.
 *
 * @param key id of the row to load
 * @return the loaded value, or {@code null} when no row has that id
 * @throws CacheLoaderException if the database access fails
 */
@Override
public MyData load(Long key) throws CacheLoaderException {
    try (Connection conn = dataSource.getConnection();
         PreparedStatement stmt = conn.prepareStatement("SELECT * FROM myTable WHERE id = ?")) {
        stmt.setLong(1, key);
        // The result set is managed explicitly so it is closed as soon as the row is read.
        try (ResultSet rs = stmt.executeQuery()) {
            if (!rs.next()) {
                return null;
            }
            MyData data = new MyData();
            data.setId(rs.getLong("id"));
            data.setName(rs.getString("name"));
            return data;
        }
    } catch (SQLException e) {
        throw new CacheLoaderException("Failed to load data from Oracle", e);
    }
}
/**
 * Writes the entry to myTable, inserting a new row or updating the existing
 * row with the same id.
 *
 * A cache writer is invoked both when an entry is created and when it is
 * updated, so a plain INSERT would fail on the second write of a key. MERGE
 * handles both cases in one statement.
 *
 * @param entry cache entry to persist; its key is the row id
 * @throws CacheWriterException if the database access fails
 */
@Override
public void write(Cache.Entry<? extends Long, ? extends MyData> entry) throws CacheWriterException {
    String upsertSql =
        "MERGE INTO myTable t " +
        "USING (SELECT ? AS id, ? AS name FROM dual) s " +
        "ON (t.id = s.id) " +
        "WHEN MATCHED THEN UPDATE SET t.name = s.name " +
        "WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.id, s.name)";
    try (Connection conn = dataSource.getConnection();
         PreparedStatement stmt = conn.prepareStatement(upsertSql)) {
        stmt.setLong(1, entry.getKey());
        stmt.setString(2, entry.getValue().getName());
        stmt.executeUpdate();
    } catch (SQLException e) {
        throw new CacheWriterException("Failed to write data to Oracle", e);
    }
}
@Override
public void delete(Object key) throws CacheWriterException {
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement("DELETE FROM myTable WHERE id = ?")) {
stmt.setLong(1, (Long)key);
stmt.executeUpdate();
} catch (SQLException e) {
throw new CacheWriterException("
四、总结
Ignite的SQL语法和MySQL、Oracle一致,底层同样使用的B+树索引,使用起来非常方便。后续会继续更新RestAPI部分内容
五、参考资料
转载自:https://juejin.cn/post/7250080519068827709