From b7bdb1126d59dc99f9b5a3ac040f76d9fb5ad11e Mon Sep 17 00:00:00 2001 From: lijing03 <1481281891@qq.com> Date: Thu, 4 Jun 2026 16:56:27 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AE=B0=E5=BD=95=E5=88=86=E5=B8=83=E5=BC=8F?= =?UTF-8?q?=E9=94=81=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/repositories/DistributedLockRepo.java | 23 +++++ .../api/GspDistributedLockEntity.java | 39 ++++++++ caf-lock/caf-lock-redis/pom.xml | 4 + .../redis/DistributedLockFactoryImpl.java | 91 ++++++++++++++++- .../redis/DistributedLocksFactory.java | 98 ++++++++++++++++++- 5 files changed, 247 insertions(+), 8 deletions(-) create mode 100644 caf-lock/caf-lock-api/src/main/java/io/iec/edp/caf/lock/service/api/repositories/DistributedLockRepo.java create mode 100644 caf-lock/caf-lock-api/src/main/java/io/iec/edp/caf/lockservice/api/GspDistributedLockEntity.java diff --git a/caf-lock/caf-lock-api/src/main/java/io/iec/edp/caf/lock/service/api/repositories/DistributedLockRepo.java b/caf-lock/caf-lock-api/src/main/java/io/iec/edp/caf/lock/service/api/repositories/DistributedLockRepo.java new file mode 100644 index 000000000..fb9ee0ded --- /dev/null +++ b/caf-lock/caf-lock-api/src/main/java/io/iec/edp/caf/lock/service/api/repositories/DistributedLockRepo.java @@ -0,0 +1,23 @@ +package io.iec.edp.caf.lock.service.api.repositories; + +import io.iec.edp.caf.lockservice.api.GspDistributedLockEntity; + +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + + +import java.time.OffsetDateTime; + +public interface DistributedLockRepo extends JpaRepository { + + + @Modifying + @Query("delete from GspDistributedLockEntity b where b.lockedTime <= :lockedTime") + @Transactional(propagation= Propagation.REQUIRES_NEW) + void deleteByLockedTime(@Param("lockedTime") OffsetDateTime lockedTime); + +} diff --git a/caf-lock/caf-lock-api/src/main/java/io/iec/edp/caf/lockservice/api/GspDistributedLockEntity.java b/caf-lock/caf-lock-api/src/main/java/io/iec/edp/caf/lockservice/api/GspDistributedLockEntity.java new file mode 100644 index 000000000..7df5de626 --- /dev/null +++ b/caf-lock/caf-lock-api/src/main/java/io/iec/edp/caf/lockservice/api/GspDistributedLockEntity.java @@ -0,0 +1,39 @@ +package io.iec.edp.caf.lockservice.api; + +import lombok.Data; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Table; +import java.time.OffsetDateTime; + + +@Entity +@Table(name = "gspdistributedlock") +@Data +public class GspDistributedLockEntity { + + @Id + private String id; + + private String ip; + + @Column(name = "threadname") + private String threadName; + + @Column(name = "resourcekey") + private String resourceKey; + + @Column(name = "lockedtime") + private OffsetDateTime lockedTime; + + @Column(name = "expirytime") + private OffsetDateTime expiryTime; + + @Column(name = "waittime") + private OffsetDateTime waitTime; + + private String description; + +} diff --git a/caf-lock/caf-lock-redis/pom.xml b/caf-lock/caf-lock-redis/pom.xml index 556e7dddb..d735a1ef9 100644 --- a/caf-lock/caf-lock-redis/pom.xml +++ b/caf-lock/caf-lock-redis/pom.xml @@ -40,6 +40,10 @@ io.iec.edp caf-commons-serialization + + io.iec.edp + caf-msu-api + org.redisson diff --git a/caf-lock/caf-lock-redis/src/main/java/io/iec/edp/caf/lock/service/redis/DistributedLockFactoryImpl.java b/caf-lock/caf-lock-redis/src/main/java/io/iec/edp/caf/lock/service/redis/DistributedLockFactoryImpl.java index bd35227de..a2d4fd5a8 100644 --- a/caf-lock/caf-lock-redis/src/main/java/io/iec/edp/caf/lock/service/redis/DistributedLockFactoryImpl.java +++ b/caf-lock/caf-lock-redis/src/main/java/io/iec/edp/caf/lock/service/redis/DistributedLockFactoryImpl.java @@ -18,10 +18,14 @@ package io.iec.edp.caf.lock.service.redis; import io.iec.edp.caf.commons.exception.ExceptionLevel; +import io.iec.edp.caf.commons.runtime.CafEnvironment; import io.iec.edp.caf.lock.service.api.api.DistributedLock; import io.iec.edp.caf.lock.service.api.api.DistributedLockFactory; import io.iec.edp.caf.lock.service.api.exception.DLErrorDefinition; import io.iec.edp.caf.lock.service.api.exception.DistributedLockException; +import io.iec.edp.caf.lock.service.api.repositories.DistributedLockRepo; +import io.iec.edp.caf.lockservice.api.GspDistributedLockEntity; +import io.iec.edp.caf.msu.api.entity.MsuProperties; import lombok.extern.slf4j.Slf4j; import org.redisson.Redisson; import org.redisson.api.RLock; @@ -30,9 +34,12 @@ import org.redisson.config.Config; import java.text.SimpleDateFormat; import java.time.Duration; +import java.time.OffsetDateTime; import java.util.Date; import java.util.List; -import java.util.concurrent.TimeUnit; +import java.util.UUID; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; /** * 分布式锁创建服务实现类 @@ -44,7 +51,34 @@ public class DistributedLockFactoryImpl implements DistributedLockFactory,AutoCl // @Autowired串 private static RedissonClient redisson; private static Object lockObj=new Object(); + private MsuProperties configuration; private Config redissonConfig; + private DistributedLockRepo distributedLockRepo; + + private static final ExecutorService asyncExecutor = new ThreadPoolExecutor( + 16, + 128, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(100), + new ThreadFactory() { + private AtomicInteger id = new AtomicInteger(0); + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setName("lock-record-async-" + id.addAndGet(1)); + return thread; + } + }, + //自定义线程池策略,防止线程池报错影响加锁功能 + (r, executor) -> { + log.error("Thread pool is full, discarding async lock record task. This does not affect locking functionality. Pool size: {}, Active threads: {}, Queue size: {}", + executor.getPoolSize(), + executor.getActiveCount(), + executor.getQueue().size()); + } + ); + private static RedissonClient getClient(Config redissonConfig) { if(redisson!=null){ @@ -63,8 +97,10 @@ public class DistributedLockFactoryImpl implements DistributedLockFactory,AutoCl * 构造函数 * @param redissonConfig */ - public DistributedLockFactoryImpl(Config redissonConfig){ + public DistributedLockFactoryImpl(Config redissonConfig,MsuProperties configuration,DistributedLockRepo distributedLockRepo){ this.redissonConfig = redissonConfig; + this.configuration=configuration; + this.distributedLockRepo=distributedLockRepo; } /** @@ -86,23 +122,25 @@ public class DistributedLockFactoryImpl implements DistributedLockFactory,AutoCl SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); log.debug("addLock: "+cost+"ms。"+" resource: "+resource+"。" +" currentThread: "+Thread.currentThread().getId()+"。"+" currentTime: "+simpleDateFormat.format(new Date())); } + saveLockRecordAsync(resource, null,expiryTime,"Execute Create Lock"); return new DistributedLockImpl(resource, redLock); } @Override - public DistributedLock createMultiLock(String resoucePrefix, List dataIds, Duration expiryTime){ + public DistributedLock createMultiLock(String resourcePrefix, List dataIds, Duration expiryTime){ DistributedLock distributedLock=null; if(dataIds!=null&&dataIds.size()>0){ RLock[] rLocks=new RLock[dataIds.size()]; RedissonClient redissonClient= getClient(this.redissonConfig); for(int i=0;i { + try { + String ip = CafEnvironment.getLocalIp(configuration.getIp().getPrefix()); + OffsetDateTime now = OffsetDateTime.now(); + GspDistributedLockEntity entity = new GspDistributedLockEntity(); + entity.setId(UUID.randomUUID().toString()); + entity.setIp(ip); + entity.setThreadName(threadName); + entity.setResourceKey(resourceKey); + entity.setLockedTime(now); + if (waitTime!=null){ + entity.setWaitTime(now.plus(waitTime)); + } + if( expiryTime!=null){ + entity.setExpiryTime(now.plus(expiryTime)); + } + entity.setDescription(description); + distributedLockRepo.save(entity); + } catch (Exception e) { + // 异步任务内部异常不应影响主流程,仅记录日志 + log.error("Failed to save distributed lock record asynchronously for resource: {}", resourceKey, e); + } + }); + } catch (Exception e) { + log.error("Unexpected error submitting async task for resource: {}", resourceKey, e); + } + } } diff --git a/caf-lock/caf-lock-redis/src/main/java/io/iec/edp/caf/lock/service/redis/DistributedLocksFactory.java b/caf-lock/caf-lock-redis/src/main/java/io/iec/edp/caf/lock/service/redis/DistributedLocksFactory.java index 56b565c66..e5cb2b87d 100644 --- a/caf-lock/caf-lock-redis/src/main/java/io/iec/edp/caf/lock/service/redis/DistributedLocksFactory.java +++ b/caf-lock/caf-lock-redis/src/main/java/io/iec/edp/caf/lock/service/redis/DistributedLocksFactory.java @@ -18,11 +18,15 @@ package io.iec.edp.caf.lock.service.redis; import io.iec.edp.caf.commons.exception.ExceptionLevel; +import io.iec.edp.caf.commons.runtime.CafEnvironment; import io.iec.edp.caf.lock.service.api.api.DistributedLock; import io.iec.edp.caf.lock.service.api.exception.DLErrorDefinition; import io.iec.edp.caf.lock.service.api.exception.DistributedLockException; +import io.iec.edp.caf.lock.service.api.repositories.DistributedLockRepo; +import io.iec.edp.caf.lockservice.api.GspDistributedLockEntity; import io.iec.edp.caf.lockservice.api.IDistributedLock; import io.iec.edp.caf.lockservice.api.IDistributedLockFactory; +import io.iec.edp.caf.msu.api.entity.MsuProperties; import lombok.extern.slf4j.Slf4j; import org.redisson.Redisson; import org.redisson.api.RLock; @@ -30,7 +34,10 @@ import org.redisson.api.RedissonClient; import org.redisson.config.Config; import java.time.Duration; -import java.util.concurrent.TimeUnit; +import java.time.OffsetDateTime; +import java.util.UUID; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; /** * 分布式锁创建服务实现类 @@ -38,22 +45,51 @@ import java.util.concurrent.TimeUnit; * @date 2019/8/21 14:22 * */ +@Slf4j @Deprecated //todo 被com.inspur.edp.cdp.coderule.runtime.server.lock.DistributedLockService依赖 -public class DistributedLocksFactory implements IDistributedLockFactory { +public class DistributedLocksFactory implements IDistributedLockFactory,AutoCloseable{ // @Autowired串 private static RedissonClient redisson; private static Object lockObj=new Object(); private Config redissonConfig; + private MsuProperties configuration; + private DistributedLockRepo distributedLockRepo; + + private static final ExecutorService asyncExecutor = new ThreadPoolExecutor( + 16, + 128, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(100), + new ThreadFactory() { + private AtomicInteger id = new AtomicInteger(0); + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setName("lock-record-async-" + id.addAndGet(1)); + return thread; + } + }, + //自定义线程池策略,防止线程池报错影响加锁功能 + (r, executor) -> { + log.error("Thread pool is full, discarding async lock record task. Pool size: {}, Active threads: {}, Queue size: {}", + executor.getPoolSize(), + executor.getActiveCount(), + executor.getQueue().size()); + } + ); /** * 构造函数 * * @param redissonConfig */ - public DistributedLocksFactory(Config redissonConfig) { + public DistributedLocksFactory(Config redissonConfig,MsuProperties configuration,DistributedLockRepo distributedLockRepo) { this.redissonConfig = redissonConfig; + this.configuration=configuration; + this.distributedLockRepo=distributedLockRepo; } private static RedissonClient getClient(Config redissonConfig) @@ -84,7 +120,7 @@ public class DistributedLocksFactory implements IDistributedLockFactory { public IDistributedLock createLock(String resource, Duration expiryTime) { RLock redLock = getClient(this.redissonConfig).getLock(resource); redLock.lock(expiryTime.getSeconds(), TimeUnit.SECONDS); -// log.error("分布式锁加锁操作DistributedLocksFactory+createLock方法1,数据为"+resource,new Exception()); + saveLockRecordAsync(resource, null,expiryTime,"Execute Create Lock"); return new DistributedLockOld(resource, redLock); } @@ -114,4 +150,58 @@ public class DistributedLocksFactory implements IDistributedLockFactory { throw new DistributedLockException("pfcomm", DLErrorDefinition.AddLock_IsAcquired_Error, null, e, ExceptionLevel.Error, false); } } + /** + * 关闭相关资源 + * @throws Exception + */ + @Override + public void close() throws Exception { + if(this.redisson!=null) + this.redisson.shutdown(); + + // 关闭线程池 + if (asyncExecutor != null && !asyncExecutor.isTerminated()) { + asyncExecutor.shutdown(); + try { + // 等待现有任务完成,最多等待60秒 + if (!asyncExecutor.awaitTermination(60, TimeUnit.SECONDS)) { + asyncExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + asyncExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } + + private void saveLockRecordAsync(String resourceKey,Duration waitTime,Duration expiryTime, String description) { + try { + String threadName=Thread.currentThread().getName(); + asyncExecutor.execute(() -> { + try { + String ip = CafEnvironment.getLocalIp(configuration.getIp().getPrefix()); + OffsetDateTime now = OffsetDateTime.now(); + GspDistributedLockEntity entity = new GspDistributedLockEntity(); + entity.setId(UUID.randomUUID().toString()); + entity.setIp(ip); + entity.setThreadName(threadName); + entity.setResourceKey(resourceKey); + entity.setLockedTime(now); + if (waitTime!=null){ + entity.setWaitTime(now.plus(waitTime)); + } + if( expiryTime!=null){ + entity.setExpiryTime(now.plus(expiryTime)); + } + entity.setDescription(description); + distributedLockRepo.save(entity); + } catch (Exception e) { + // 异步任务内部异常不应影响主流程,仅记录日志 + log.error("Failed to save distributed lock record asynchronously for resource: {}", resourceKey, e); + } + }); + } catch (Exception e) { + log.error("Unexpected error submitting async task for resource: {}", resourceKey, e); + } + } } -- Gitee