1. ホーム
  2. Java

WatchDog + Redisによる分散ロック

2022-02-19 14:20:54
<パス

コードの説明。
watchDog機構は、主にRedis上の商品キー（ロック）の有効期限を定期的に延長するためのものです。業務処理に時間がかかってロックが先に期限切れとなり、処理完了時に他のスレッドが取得したロックを誤って削除してしまうことを防止します。
lua スクリプトは、redis のコマンドをアトミックに操作するために使用されます。なお、redisがブロックされないように、luaスクリプトには複雑なロジックを書かないようにしてください。

/**
     * Thread factory for the watchdog scheduler. Threads are named
     * {@code watchDog-0}, {@code watchDog-1}, ... so individual workers can be
     * told apart in thread dumps and logs.
     *
     * NOTE(review): assumes {@code ThreadFactoryBuilder} is Guava's
     * (com.google.common.util.concurrent), whose terminal method is
     * {@code build()} — confirm against this file's imports.
     */
    private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("watchDog-%d").build();
	
	/**
	 * Shared scheduler on which the periodic watchdog renewal tasks run.
	 * Built with a core pool size of 10 and threads from {@code THREAD_FACTORY}.
	 */
    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE =
            new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);

    /**
     * Registry of running watchdog tasks: maps a lock holder's client ID to the
     * {@link ScheduledFuture} returned when its renewal task was scheduled, so
     * that the task can later be looked up and cancelled.
     */
    private static final ConcurrentHashMap<String, ScheduledFuture> CONCURRENT_HASH_MAP =
            new ConcurrentHashMap<>(16);


 @GetMapping("/normal")
    public String normalRedisLock() throws InterruptedException {
        // set the corresponding UUID for each thread
        String productId = "product_huawei_p30";
        String stock = "stock";
        String clientId = UUID.randomUUID().toString();
        try {
            //If the thread is already locked, dead loop waiting to release the lock
            while (true){
                Integer stockNum = Integer.parseInt(stringRedisTemplate.opsForValue().get(stock));
                if(stockNum <= 0){
                    return "Item is sold out";
                }
                // Thread locking, for 10 seconds, set to the corresponding client ID
                Boolean setIfAbsent = stringRedisTemplate.opsForValue().setIfAbsent(productId, clientId, 10, TimeUnit.SECONDS);
                if(Objects.nonNull(setIfAbsent) && setIfAbsent){
                    break;
                }
            }
            System.out.println("---------------------------------- start deducting inventory ----------------------------------");
            /**
             * watchdog mechanism, the purpose is to cause the lock to be released early when the thread business processing time is too long, resulting in the erroneous release of another thread's lock when processing is complete
             */
            WatchDogThread watchDogThread = new WatchDogThread(productId,clientId,stringRedisTemplate,CONCURRENT_HASH_MAP,SCHEDULED_EXECUTOR_ SERVICE);
            ScheduledFuture<? > scheduledFuture = SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(watchDogThread, 1, 5, TimeUnit.SECONDS);
            /**
             * Use ConcurrentHaspMap to store the watchDog task and stop the specified watchDog task.
             */
            CONCURRENT_HASH_MAP.put(clientId,scheduledFuture);
            // Execute the business logic
            int stockNum = Integer.parseInt(stringRedisTemplate.opsForValue().get(stock));
            if(stockNum > 0){
                /*System.out.println("Simulation of business processing time is too long, watchdog renewal mechanism ..... ");
                Thread.sleep(20000);*/
                stringRedisTemplate.opsForValue().set(stock,String.valueOf(stockNum-1));
                System.out.println("Deducted stock successfully stock number is: "+stringRedisTemplate.opsForValue().get(stock));
            }else {
                System.out.println("Inventory deduction failed 。。。。 ");
            }
        } catch (Exception e) {
            /**
             * When an exception is thrown, the watchdog thread corresponding to the client ID is obtained and the watchdog mechanism is stopped
             */
            ScheduledFuture scheduledFuture = CONCURRENT_HASH_MAP.get(clientId);
            if(scheduledFuture ! = null){
                System.out.println("Exception message, remove watchdog thread. ");
                scheduledFuture.cancel(true);
                CONCURRENT_HASH_MAP.remove(clientId);
            }
        } finally {
            //release the lock
            stringRedisTemplate.delete(productId);
            System.out.println("---------------------------------- business execution completed ----------------------------------");
        }
        return "";

    }


WatchDogの実装メカニズム

public class WatchDogThread implements Runnable {

    private String productId;
    private String clientId;
    private StringRedisTemplate stringRedisTemplate;
    private ConcurrentHashMap<String, ScheduledFuture> cacheMap;
    // Get the reference to the thread pool
    private ScheduledExecutorService scheduledExecutorService;

    /**
     * lua script, purpose atomic operation, get the commodity lock if equal to the current client ID, execute the lock renewal
     */
    private static final String SCRIPT = "if redis.call('get',KEYS[1]) == ARGV[1] then" +
            " local ttl = tonumber(redis.call('ttl',KEYS[1]));" +
            " redis.call('expire',KEYS[1],ttl+ARGV[2]) return redis.call('ttl',KEYS[1]) end";

    public WatchDogThread(String productId,String clientId, StringRedisTemplate stringRedisTemplate, ConcurrentHashMap<String, ScheduledFuture> concurrentHashMap,ScheduledExecutorService scheduledExecutorService) {
        this.clientId = clientId;
        this.productId = productId;
        this.stringRedisTemplate = stringRedisTemplate;
        this.cacheMap = concurrentHashMap;
        this.scheduledExecutorService = scheduledExecutorService;
    }

    @Override
    public void run() {
        String lock = stringRedisTemplate.opsForValue().get(productId);
        try {
            // If the lock is null or the lock obtained is not equal to the current client ID, then just stop the watchdog
            if (StringUtils.isEmpty(lock) || !clientId.equals(lock)) {
                ScheduledFuture scheduledFuture = cacheMap.get(clientId);
                if (scheduledFuture ! = null) {
                    System.out.println("Inventory deduction complete, closing the door dog. ");
                    scheduledFuture.cancel(true);
                    cacheMap.remove(clientId);
                }
                return;
            }
            System.out.println("Execute renewal task ID:"+lock);
            //Execute the lua script to perform the lock renewal atomically
            stringRedisTemplate.execute(new DefaultRedisScript(SCRIPT, Long.class), Collections.singletonList(productId),clientId,"10") ;
            Long expire = stringRedisTemplate.getExpire(productId, TimeUnit.SECONDS);
            System.out.println("time after renewal;"+expire);
        } catch (Exception e) {
            System.out.println("watchdog execution failed"+e.getMessage());
            /**
             * If the watchDog execution renewal task has an exception, directly set the 30 seconds expiration time to prevent the key value from failing, resulting in accidental deletion
             */
            this.stringRedisTemplate.expire(productId,30,TimeUnit.SECONDS);
            /*WatchDogThread watchDogThread = new WatchDogThread(productId,clientId,stringRedisTemplate,this.cacheMap,this. scheduledExecutorService);
            this.scheduledExecutorService.scheduleAtFixedRate(watchDogThread, 1, 5, TimeUnit.SECONDS);*/
        }
    }
}



感想です。

同時実行数が非常に多い場合は、商品在庫を複数に分割し、redis-cluster構成であればコンシステントハッシュ法によって分割した在庫を異なるRedisノードに割り当てて保存することで、並行処理性能を高めることができます。