17370845950

Infinispan分布式计数:解决并发登录用户统计问题

针对Infinispan缓存中并发用户登录计数存在的竞态条件问题,本文深入探讨了三种有效的同步策略:Infinispan计数器、事务机制和版本化操作。通过详细介绍其原理和应用场景,并提供示例代码,帮助开发者在分布式环境中实现准确、高效的用户数量统计,确保数据一致性。

分布式环境下的计数挑战

在分布式应用中,统计当前登录用户数是一个常见需求。如果使用infinispan缓存来存储用户总数,并采用“获取当前值 -> 增加1 -> 更新缓存”的简单逻辑,当多个用户几乎同时登录时,就会出现严重的竞态条件。例如:

  1. 用户A登录,从缓存中读取当前用户数为 N。
  2. 用户B登录,从缓存中读取当前用户数为 N。
  3. 用户A将 N+1 写回缓存。
  4. 用户B将 N+1 写回缓存。

最终缓存中的用户数是 N+1,而不是预期的 N+2,导致计数不准确。这是因为“读取-修改-写入”这一系列操作并非原子性的。为了解决这一问题,Infinispan提供了多种强大的同步机制。

解决方案一:Infinispan计数器 (Counters)

Infinispan提供了专门的分布式计数器API,能够以原子方式执行增量、减量等操作,从而避免竞态条件。这是处理分布式计数的首选和最有效的方法。

原理

Infinispan计数器是为分布式环境下的原子计数操作而设计的。它们可以是强一致性计数器(Strong Counter),保证每次操作都反映最新的全局状态;也可以是弱一致性计数器(Weak Counter),提供更高的吞吐量,但可能在短时间内存在数据传播延迟。

示例代码

以下是如何使用Infinispan的强一致性计数器来统计用户登录的示例:

import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.counter.api.CounterManager;
import org.infinispan.counter.api.StrongCounter;
import org.infinispan.counter.api.CounterConfiguration;
import org.infinispan.counter.api.CounterType;

public class UserLoginCounter {

    private RemoteCacheManager cacheManager;
    private StrongCounter userCounter;

    public UserLoginCounter(RemoteCacheManager cacheManager) {
        this.cacheManager = cacheManager;
        // 获取计数器管理器
        CounterManager counterManager = cacheManager.get
        // 定义计数器配置:强一致性,初始值为0
        CounterConfiguration config = CounterConfiguration.builder(CounterType.UNBOUNDED_STRONG).initialValue(0).build();
        // 获取或创建名为 "loggedInUsers" 的强一致性计数器
        counterManager.defineCounter("loggedInUsers", config);
        userCounter = counterManager.get  ("loggedInUsers").toStrongCounter();
    }

    /**
     * 用户登录时调用,原子性地增加用户计数。
     * @return 增加后的用户总数
     */
    public long userLoggedIn() {
        return userCounter.incrementAndGet().join(); // join() 用于阻塞等待结果
    }

    /**
     * 用户登出时调用,原子性地减少用户计数。
     * @return 减少后的用户总数
     */
    public long userLoggedOut() {
        return userCounter.decrementAndGet().join();
    }

    /**
     * 获取当前用户总数。
     * @return 当前用户总数
     */
    public long getCurrentUserCount() {
        return userCounter.getValue().join();
    }

    public static void main(String[] args) throws Exception {
        // 假设已经配置并启动了RemoteCacheManager
        // 例如:RemoteCacheManager cacheManager = new RemoteCacheManager(new ConfigurationBuilder().addServer().host("127.0.0.1").port(11222).build());
        // 实际应用中需要替换为您的Infinispan服务器配置
        RemoteCacheManager cacheManager = new RemoteCacheManager(); // 简化示例,实际需配置连接

        UserLoginCounter counterService = new UserLoginCounter(cacheManager);

        System.out.println("初始用户数: " + counterService.getCurrentUserCount());

        // 模拟多个线程同时登录
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                long count = counterService.userLoggedIn();
                System.out.println(Thread.currentThread().getName() + " 登录,当前用户数: " + count);
            }, "Thread-" + i).start();
        }

        // 等待所有线程完成
        Thread.sleep(1000);

        System.out.println("最终用户数: " + counterService.getCurrentUserCount());

        cacheManager.stop();
    }
}

注意事项

  • 强一致性 vs 弱一致性: 对于用户登录计数这种需要精确反映当前状态的场景,通常推荐使用强一致性计数器。弱一致性计数器适用于对最终一致性要求不高、但对吞吐量要求极高的场景。
  • 性能: Infinispan计数器是高度优化的,专门用于原子性地处理计数操作,通常比通用事务或版本化操作具有更好的性能。

解决方案二:Infinispan事务 (Transactions)

Infinispan支持事务,允许将一系列缓存操作封装成一个原子单元。在事务中,所有操作要么全部成功,要么全部失败,从而保证数据的一致性。

原理

Infinispan事务通常与Java Transaction API (JTA) 集成,通过两阶段提交协议确保分布式环境下的原子性、一致性、隔离性和持久性(ACID)。当用户登录时,可以在一个事务中完成“读取旧值 -> 计算新值 -> 更新新值”的操作。

示例代码

以下是如何使用JTA事务来统计用户登录的示例:

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;

import javax.transaction.SystemException;
import javax.transaction.UserTransaction;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;

public class UserLoginTransactional {

    private RemoteCache userCountCache;
    private UserTransaction userTransaction; // 实际应用中通过JNDI或DI获取

    public UserLoginTransactional(RemoteCacheManager cacheManager, UserTransaction ut) {
        // 获取或创建支持事务的缓存
        userCountCache = cacheManager.getCache("userCounts");
        this.userTransaction = ut; // 假设UserTransaction已注入或获取
    }

    /**
     * 用户登录时调用,通过事务原子性地增加用户计数。
     * @return 增加后的用户总数
     * @throws Exception 如果事务操作失败
     */
    public long userLoggedIn() throws Exception {
        long newCount = 0;
        try {
            userTransaction.begin(); // 开启事务

            Long currentCount = userCountCache.get("loggedInUsers");
            if (currentCount == null) {
                currentCount = 0L;
            }
            newCount = currentCount + 1;
            userCountCache.put("loggedInUsers", newCount);

            userTransaction.commit(); // 提交事务
        } catch (NotSupportedException | SystemException | RollbackException | HeuristicMixedException | HeuristicRollbackException e) {
            if (userTransaction != null) {
                try {
                    userTransaction.rollback(); // 事务回滚
                } catch (SystemException se) {
                    System.err.println("事务回滚失败: " + se.getMessage());
                }
            }
            throw new RuntimeException("用户登录计数事务失败", e);
        }
        return newCount;
    }

    /**
     * 获取当前用户总数。
     * @return 当前用户总数
     */
    public long getCurrentUserCount() {
        Long count = userCountCache.get("loggedInUsers");
        return count != null ? count : 0L;
    }

    public static void main(String[] args) throws Exception {
        // 假设已经配置并启动了RemoteCacheManager
        // RemoteCacheManager cacheManager = new RemoteCacheManager(new ConfigurationBuilder().addServer().host("127.0.0.1").port(11222).build());
        RemoteCacheManager cacheManager = new RemoteCacheManager(); // 简化示例

        // 实际应用中需要配置JTA事务管理器,这里使用一个模拟的UserTransaction
        UserTransaction mockUserTransaction = new MockUserTransaction(); // 模拟实现

        UserLoginTransactional transactionalService = new UserLoginTransactional(cacheManager, mockUserTransaction);

        System.out.println("初始用户数: " + transactionalService.getCurrentUserCount());

        // 模拟多个线程同时登录
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    long count = transactionalService.userLoggedIn();
                    System.out.println(Thread.currentThread().getName() + " 登录,当前用户数: " + count);
                } catch (Exception e) {
                    System.err.println(Thread.currentThread().getName() + " 登录失败: " + e.getMessage());
                }
            }, "Thread-" + i).start();
        }

        Thread.sleep(1000); // 等待所有线程完成

        System.out.println("最终用户数: " + transactionalService.getCurrentUserCount());

        cacheManager.stop();
    }

    // 模拟一个简单的UserTransaction实现,实际应用中会由应用服务器或事务管理器提供
    static class MockUserTransaction implements UserTransaction {
        private boolean active = false;

        @Override
        public void begin() throws NotSupportedException, SystemException {
            if (active) throw new NotSupportedException("Transaction already active");
            System.out.println("Transaction Begin");
            active = true;
        }

        @Override
        public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, IllegalStateException, SystemException {
            if (!active) throw new IllegalStateException("No transaction active");
            System.out.println("Transaction Commit");
            active = false;
        }

        @Override
        public void rollback() throws IllegalStateException, SecurityException, SystemException {
            if (!active) throw new IllegalStateException("No transaction active");
            System.out.println("Transaction Rollback");
            active = false;
        }

        @Override
        public void setRollbackOnly() throws IllegalStateException, SystemException { /* Not implemented */ }

        @Override
        public int getStatus() throws SystemException { return active ? 0 : 6; /* Active or NoTransaction */ }

        @Override
        public void setTransactionTimeout(int seconds) throws SystemException { /* Not implemented */ }
    }
}

注意事项

  • 开销: 事务引入了额外的协调和日志记录开销,通常比专用计数器更重。
  • 复杂性: 需要正确配置和管理JTA事务管理器。
  • 适用场景: 当你需要在一个原子操作中执行多个复杂的缓存修改,而不仅仅是简单的计数时,事务是更好的选择。

解决方案三:版本化操作 (Versioned Operations)

Infinispan的RemoteCache接口提供了支持版本化操作的方法,例如 replace(K key, V oldValue, V newValue)。这种方法利用了乐观锁的思想,只有当缓存中键的当前值与 oldValue 匹配时,才会成功更新为 newValue。

原理

客户端首先读取键的当前值,然后基于这个值计算出新的值。接着,尝试使用 replace(key, oldValue, newValue) 方法进行更新,其中 oldValue 就是之前读取到的值。如果 replace 返回 true,表示更新成功;如果返回 false,则说明在读取到 oldValue 到尝试更新之间,有其他线程修改了该键的值。此时,客户端需要重新读取最新值并重试。

示例代码

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;

public class UserLoginVersioned {

    private RemoteCache userCountCache;
    private static final int MAX_RETRIES = 5; // 最大重试次数

    public UserLoginVersioned(RemoteCacheManager cacheManager) {
        userCountCache = cacheManager.getCache("userCounts");
        // 确保初始值存在
        userCountCache.putIfAbsent("loggedInUsers", 0L);
    }

    /**
     * 用户登录时调用,通过版本化操作原子性地增加用户计数。
     * @return 增加后的用户总数
     * @throws RuntimeException 如果重试次数超过限制
     */
    public long userLoggedIn() {
        for (int i = 0; i < MAX_RETRIES; i++) {
            Long currentCount = userCountCache.get("loggedInUsers");
            if (currentCount == null) {
                currentCount = 0L; // 理论上putIfAbsent已处理,但作为防御性编程
            }
            long newCount = currentCount + 1;

            // 尝试原子性替换:如果当前缓存中的"loggedInUsers"值仍是currentCount,则更新为newCount
            boolean success = userCountCache.replace("loggedInUsers", currentCount, newCount);
            if (success) {
                return newCount;
            } else {
                System.out.println(Thread.currentThread().getName() + " 冲突,重试 " + (i + 1));
                // 冲突发生,其他线程已修改,需要重试
                // 可以在这里加入短暂的等待以避免活锁
                try {
                    Thread.sleep(10 * (i + 1)); // 指数退避
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("重试中断", e);
                }
            }
        }
        throw new RuntimeException("用户登录计数更新失败:达到最大重试次数");
    }

    /**
     * 获取当前用户总数。
     * @return 当前用户总数
     */
    public long getCurrentUserCount() {
        Long count = userCountCache.get("loggedInUsers");
        return count != null ? count : 0L;
    }

    public static void main(String[] args) throws Exception {
        // 假设已经配置并启动了RemoteCacheManager
        // RemoteCacheManager cacheManager = new RemoteCacheManager(new ConfigurationBuilder().addServer().host("127.0.0.1").port(11222).build());
        RemoteCacheManager cacheManager = new RemoteCacheManager(); // 简化示例

        UserLoginVersioned versionedService = new UserLoginVersioned(cacheManager);

        System.out.println("初始用户数: " + versionedService.getCurrentUserCount());

        // 模拟多个线程同时登录
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    long count = versionedService.userLoggedIn();
                    System.out.println(Thread.currentThread().getName() + " 登录,当前用户数: " + count);
                } catch (RuntimeException e) {
                    System.err.println(Thread.currentThread().getName() + " 登录失败: " + e.getMessage());
                }
            }, "Thread-" + i).start();
        }

        Thread.sleep(1000); // 等待所有线程完成

        System.out.println("最终用户数: " + versionedService.getCurrentUserCount());

        cacheManager.stop();
    }
}

注意事项

  • 重试逻辑: 客户端必须实现重试逻辑,以处理更新冲突。
  • 活锁风险: 如果并发冲突非常频繁,可能会导致大量重试,甚至活锁。
  • 适用场景: 适用于并发冲突不那么频繁的场景,或者当需要更细粒度的控制,不希望引入事务的全部开销时。

选择合适的同步策略

在选择Infinispan的同步策略时,应考虑以下因素:

  • 简单性与效率: 对于纯粹的计数需求,Infinispan计数器是最简单、最高效且推荐的解决方案。它们专为此类场景设计,提供了原子操作和优化的性能。
  • 原子性范围: 如果你需要将多个不同的缓存操作(例如,更新用户计数同时修改用户状态)作为一个原子单元执行,那么Infinispan事务是必要的。它提供了全面的ACID保证。
  • 并发模式: 版本化操作(乐观锁)适用于并发冲突相对较少,且不希望引入事务复杂性的场景。它将重试逻辑推给客户端,可能在极高并发下导致较多的重试开销。

总结

在分布式环境中,确保数据一致性是核心挑战之一。对于Infinispan缓存中的并发用户计数问题,直接的“读取-修改-写入”模式会导致数据不准确。Infinispan提供了强大的内置机制来解决这些问题:专为原子计数设计的Infinispan计数器、提供全面ACID保证的事务机制,以及基于乐观锁思想的版本化操作。开发者应根据具体的业务需求、性能考量和复杂性权衡,选择最适合的同步策略,以构建健壮、高效的分布式应用。