锁的定义DistributedLock
package pkg1; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.Transaction; import redis.clients.jedis.exceptions.JedisException; import java.util.List; import java.util.UUID; import com.sun.xml.internal.fastinfoset.algorithm.IEEE754FloatingPointEncodingAlgorithm; public class DistributedLock { private final JedisPool jedisPool; // 数据库连接池 public DistributedLock(JedisPool jedisPool) { this.jedisPool = jedisPool; } /** * 加锁 * @param locaName 锁的name * @param acquireTimeout 最大请求时间 * @param timeout 锁的生存时间 * @return lockId 锁标识 */ public String lock(String lockName, long acquireTimeout, long timeout) { Jedis conn = jedisPool.getResource(); // 连接对象 String lockId = UUID.randomUUID().toString(); // 锁的ID long end = System.currentTimeMillis() + acquireTimeout; // 请求锁的截止时间,超过这个时间则放弃请求锁 while (System.currentTimeMillis() < end) { // 一直尝试获取锁,除非超时或已经获取到 if (conn.setnx(lockName, lockId) == 1) { // 如果当前名称的锁没有被获取,获取锁,并设置生存时间 conn.expire(lockName, (int)(timeout/1000)); jedisPool.returnResource(conn); return lockId; // 获取锁成功,返回当前锁对应的id } if (conn.ttl(lockName) == -1) { // 如果当前lockName已存在,但没有被设置过生存时间,则设置生存时间 conn.expire(lockName, (int)(timeout/1000)); } } // 获取锁因超时失败,则释放连接,并返回null的lockId jedisPool.returnResource(conn); return null; } /** * 释放锁 * @param lockName 锁的name * @param lockId 释放锁的标识 * @return 是否释放锁成功 */ public boolean unLock(String lockName, String lockId) { Jedis conn = jedisPool.getResource(); boolean flag = false; List<Object> results = null; while (true) { // 没有释放锁,或释放失败,则不断尝试释放 conn.watch(lockName); // 监视lockName,准备开始事务 if (lockId.equals(conn.get(lockName))) { // 判断是否是该锁 Transaction transaction = conn.multi(); // transaction.del(lockName); results = transaction.exec(); if (results == null) { continue; } else { flag = true; } } conn.unwatch(); break; } jedisPool.returnResource(conn); return flag; } }消费线程
package pkg1; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class Service { private static JedisPool pool = null; static { JedisPoolConfig config = new JedisPoolConfig(); // 设置最大连接数 config.setMaxActive(200); // 设置最大空闲数 config.setMaxIdle(8); // 设置最大等待时间 config.setMaxWait(1000 * 100); // 在borrow一个jedis实例时,是否需要验证,若为true,则所有jedis实例均是可用的 config.setTestOnBorrow(true); pool = new JedisPool(config, "127.0.0.1", 6379, 3000); } DistributedLock lock = new DistributedLock(pool); int n = 500; public void seckill() { // 返回锁的value值,供释放锁时候进行判断 String lockId = lock.lock("resource", 5000, 1000); System.out.println(Thread.currentThread().getName() + "获得了锁"); System.out.println(--n); lock.unLock("resource", lockId); } }多线程竞争
package pkg1; public class MyThread extends Thread { private Service service; public MyThread(Service service) { this.service = service; } @Override public void run() { service.seckill(); } }测试部分
package pkg1; public class Test { public static void main(String[] args) { Service service = new Service(); for (int i = 0; i < 50; i++) { MyThread thread = new MyThread(service); thread.start(); } } }