在分布式应用中,统计当前登录用户数是一个常见需求。如果使用infinispan缓存来存储用户总数,并采用“获取当前值 -> 增加1 -> 更新缓存”的简单逻辑,当多个用户几乎同时登录时,就会出现严重的竞态条件。例如:
最终缓存中的用户数是 N+1,而不是预期的 N+2,导致计数不准确。这是因为“读取-修改-写入”这一系列操作并非原子性的。为了解决这一问题,Infinispan提供了多种强大的同步机制。
Infinispan提供了专门的分布式计数器API,能够以原子方式执行增量、减量等操作,从而避免竞态条件。这是处理分布式计数的首选和最有效的方法。
Infinispan计数器是为分布式环境下的原子计数操作而设计的。它们可以是强一致性计数器(Strong Counter),保证每次操作都反映最新的全局状态;也可以是弱一致性计数器(Weak Counter),提供更高的吞吐量,但可能在短时间内存在数据传播延迟。
以下是如何使用Infinispan的强一致性计数器来统计用户登录的示例:
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.counter.api.CounterManager;
import org.infinispan.counter.api.StrongCounter;
import org.infinispan.counter.api.CounterConfiguration;
import org.infinispan.counter.api.CounterType;
public class UserLoginCounter {
private RemoteCacheManager cacheManager;
private StrongCounter userCounter;
public UserLoginCounter(RemoteCacheManager cacheManager) {
this.cacheManager = cacheManager;
// 获取计数器管理器
CounterManager counterManager = cacheManager.get
// 定义计数器配置:强一致性,初始值为0
CounterConfiguration config = CounterConfiguration.builder(CounterType.UNBOUNDED_STRONG).initialValue(0).build();
// 获取或创建名为 "loggedInUsers" 的强一致性计数器
counterManager.defineCounter("loggedInUsers", config);
userCounter = counterManager.get ("loggedInUsers").toStrongCounter();
}
/**
* 用户登录时调用,原子性地增加用户计数。
* @return 增加后的用户总数
*/
public long userLoggedIn() {
return userCounter.incrementAndGet().join(); // join() 用于阻塞等待结果
}
/**
* 用户登出时调用,原子性地减少用户计数。
* @return 减少后的用户总数
*/
public long userLoggedOut() {
return userCounter.decrementAndGet().join();
}
/**
* 获取当前用户总数。
* @return 当前用户总数
*/
public long getCurrentUserCount() {
return userCounter.getValue().join();
}
public static void main(String[] args) throws Exception {
// 假设已经配置并启动了RemoteCacheManager
// 例如:RemoteCacheManager cacheManager = new RemoteCacheManager(new ConfigurationBuilder().addServer().host("127.0.0.1").port(11222).build());
// 实际应用中需要替换为您的Infinispan服务器配置
RemoteCacheManager cacheManager = new RemoteCacheManager(); // 简化示例,实际需配置连接
UserLoginCounter counterService = new UserLoginCounter(cacheManager);
System.out.println("初始用户数: " + counterService.getCurrentUserCount());
// 模拟多个线程同时登录
for (int i = 0; i < 5; i++) {
new Thread(() -> {
long count = counterService.userLoggedIn();
System.out.println(Thread.currentThread().getName() + " 登录,当前用户数: " + count);
}, "Thread-" + i).start();
}
// 等待所有线程完成
Thread.sleep(1000);
System.out.println("最终用户数: " + counterService.getCurrentUserCount());
cacheManager.stop();
}
}Infinispan支持事务,允许将一系列缓存操作封装成一个原子单元。在事务中,所有操作要么全部成功,要么全部失败,从而保证数据的一致性。
Infinispan事务通常与Java Transaction API (JTA) 集成,通过两阶段提交协议确保分布式环境下的原子性、一致性、隔离性和持久性(ACID)。当用户登录时,可以在一个事务中完成“读取旧值 -> 计算新值 -> 更新新值”的操作。
以下是如何使用JTA事务来统计用户登录的示例:
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
public class UserLoginTransactional {
private RemoteCache userCountCache;
private UserTransaction userTransaction; // 实际应用中通过JNDI或DI获取
public UserLoginTransactional(RemoteCacheManager cacheManager, UserTransaction ut) {
// 获取或创建支持事务的缓存
userCountCache = cacheManager.getCache("userCounts");
this.userTransaction = ut; // 假设UserTransaction已注入或获取
}
/**
* 用户登录时调用,通过事务原子性地增加用户计数。
* @return 增加后的用户总数
* @throws Exception 如果事务操作失败
*/
public long userLoggedIn() throws Exception {
long newCount = 0;
try {
userTransaction.begin(); // 开启事务
Long currentCount = userCountCache.get("loggedInUsers");
if (currentCount == null) {
currentCount = 0L;
}
newCount = currentCount + 1;
userCountCache.put("loggedInUsers", newCount);
userTransaction.commit(); // 提交事务
} catch (NotSupportedException | SystemException | RollbackException | HeuristicMixedException | HeuristicRollbackException e) {
if (userTransaction != null) {
try {
userTransaction.rollback(); // 事务回滚
} catch (SystemException se) {
System.err.println("事务回滚失败: " + se.getMessage());
}
}
throw new RuntimeException("用户登录计数事务失败", e);
}
return newCount;
}
/**
* 获取当前用户总数。
* @return 当前用户总数
*/
public long getCurrentUserCount() {
Long count = userCountCache.get("loggedInUsers");
return count != null ? count : 0L;
}
public static void main(String[] args) throws Exception {
// 假设已经配置并启动了RemoteCacheManager
// RemoteCacheManager cacheManager = new RemoteCacheManager(new ConfigurationBuilder().addServer().host("127.0.0.1").port(11222).build());
RemoteCacheManager cacheManager = new RemoteCacheManager(); // 简化示例
// 实际应用中需要配置JTA事务管理器,这里使用一个模拟的UserTransaction
UserTransaction mockUserTransaction = new MockUserTransaction(); // 模拟实现
UserLoginTransactional transactionalService = new UserLoginTransactional(cacheManager, mockUserTransaction);
System.out.println("初始用户数: " + transactionalService.getCurrentUserCount());
// 模拟多个线程同时登录
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
long count = transactionalService.userLoggedIn();
System.out.println(Thread.currentThread().getName() + " 登录,当前用户数: " + count);
} catch (Exception e) {
System.err.println(Thread.currentThread().getName() + " 登录失败: " + e.getMessage());
}
}, "Thread-" + i).start();
}
Thread.sleep(1000); // 等待所有线程完成
System.out.println("最终用户数: " + transactionalService.getCurrentUserCount());
cacheManager.stop();
}
// 模拟一个简单的UserTransaction实现,实际应用中会由应用服务器或事务管理器提供
static class MockUserTransaction implements UserTransaction {
private boolean active = false;
@Override
public void begin() throws NotSupportedException, SystemException {
if (active) throw new NotSupportedException("Transaction already active");
System.out.println("Transaction Begin");
active = true;
}
@Override
public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, IllegalStateException, SystemException {
if (!active) throw new IllegalStateException("No transaction active");
System.out.println("Transaction Commit");
active = false;
}
@Override
public void rollback() throws IllegalStateException, SecurityException, SystemException {
if (!active) throw new IllegalStateException("No transaction active");
System.out.println("Transaction Rollback");
active = false;
}
@Override
public void setRollbackOnly() throws IllegalStateException, SystemException { /* Not implemented */ }
@Override
public int getStatus() throws SystemException { return active ? 0 : 6; /* Active or NoTransaction */ }
@Override
public void setTransactionTimeout(int seconds) throws SystemException { /* Not implemented */ }
}
} Infinispan的RemoteCache接口提供了支持版本化操作的方法,例如 replace(K key, V oldValue, V newValue)。这种方法利用了乐观锁的思想,只有当缓存中键的当前值与 oldValue 匹配时,才会成功更新为 newValue。
客户端首先读取键的当前值,然后基于这个值计算出新的值。接着,尝试使用 replace(key, oldValue, newValue) 方法进行更新,其中 oldValue 就是之前读取到的值。如果 replace 返回 true,表示更新成功;如果返回 false,则说明在读取到 oldValue 到尝试更新之间,有其他线程修改了该键的值。此时,客户端需要重新读取最新值并重试。
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
public class UserLoginVersioned {
private RemoteCache userCountCache;
private static final int MAX_RETRIES = 5; // 最大重试次数
public UserLoginVersioned(RemoteCacheManager cacheManager) {
userCountCache = cacheManager.getCache("userCounts");
// 确保初始值存在
userCountCache.putIfAbsent("loggedInUsers", 0L);
}
/**
* 用户登录时调用,通过版本化操作原子性地增加用户计数。
* @return 增加后的用户总数
* @throws RuntimeException 如果重试次数超过限制
*/
public long userLoggedIn() {
for (int i = 0; i < MAX_RETRIES; i++) {
Long currentCount = userCountCache.get("loggedInUsers");
if (currentCount == null) {
currentCount = 0L; // 理论上putIfAbsent已处理,但作为防御性编程
}
long newCount = currentCount + 1;
// 尝试原子性替换:如果当前缓存中的"loggedInUsers"值仍是currentCount,则更新为newCount
boolean success = userCountCache.replace("loggedInUsers", currentCount, newCount);
if (success) {
return newCount;
} else {
System.out.println(Thread.currentThread().getName() + " 冲突,重试 " + (i + 1));
// 冲突发生,其他线程已修改,需要重试
// 可以在这里加入短暂的等待以避免活锁
try {
Thread.sleep(10 * (i + 1)); // 指数退避
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("重试中断", e);
}
}
}
throw new RuntimeException("用户登录计数更新失败:达到最大重试次数");
}
/**
* 获取当前用户总数。
* @return 当前用户总数
*/
public long getCurrentUserCount() {
Long count = userCountCache.get("loggedInUsers");
return count != null ? count : 0L;
}
public static void main(String[] args) throws Exception {
// 假设已经配置并启动了RemoteCacheManager
// RemoteCacheManager cacheManager = new RemoteCacheManager(new ConfigurationBuilder().addServer().host("127.0.0.1").port(11222).build());
RemoteCacheManager cacheManager = new RemoteCacheManager(); // 简化示例
UserLoginVersioned versionedService = new UserLoginVersioned(cacheManager);
System.out.println("初始用户数: " + versionedService.getCurrentUserCount());
// 模拟多个线程同时登录
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
long count = versionedService.userLoggedIn();
System.out.println(Thread.currentThread().getName() + " 登录,当前用户数: " + count);
} catch (RuntimeException e) {
System.err.println(Thread.currentThread().getName() + " 登录失败: " + e.getMessage());
}
}, "Thread-" + i).start();
}
Thread.sleep(1000); // 等待所有线程完成
System.out.println("最终用户数: " + versionedService.getCurrentUserCount());
cacheManager.stop();
}
} 在选择Infinispan的同步策略时,应考虑以下因素:
在分布式环境中,确保数据一致性是核心挑战之一。对于Infinispan缓存中的并发用户计数问题,直接的“读取-修改-写入”模式会导致数据不准确。Infinispan提供了强大的内置机制来解决这些问题:专为原子计数设计的Infinispan计数器、提供全面ACID保证的事务机制,以及基于乐观锁思想的版本化操作。开发者应根据具体的业务需求、性能考量和复杂性权衡,选择最适合的同步策略,以构建健壮、高效的分布式应用。