spark遇到的死锁

xiaoxiao2021-02-28  92

数据库基础知识: http://blog.csdn.net/luyaran/article/details/53502917

 

死锁:如果需要“修改”一条数据,首先数据库管理系统会在上面加锁,以保证在同一时间只有一个事务能进行修改操作。锁定(Locking)发生在当一个事务获得对某一资源的“锁”时,这时,其他的事务就不能更改这个资源了,这种机制的存在是为了保证数据一致性。

 错误: Causedby:java.sql.BatchUpdateException: Deadlock found when trying to get lock;tryrestarting transaction 

 

产生死锁的两个必要条件:

1.使用事物

2.where 条件后面 一个是索引,一个不是索引

 

场景一:

sparkStreaming 在foreachportion(相当于多线程) 里批量执行下面语句

updatebi_publish_back_cost set main_man='' where pkg_code='204022' andperformance_time='2017-05-22'

 

解决方法:

1.删掉其中的一个索引就可以了

2.把两个单独索引改为一个整体索引 (最终解决方法)

3.不使用事物

 

其他错误的尝试:dobatch时,每  100 条execute一次,还是锁住

 

场景二:

sparkStreaming 在foreachportion(相当于多线程) 里批量执行下面语句

INSERT INTObi_gamepublic_base_day_kpi(parent_game_id,child_game_id,medium_channel,ad_site_channel,pkg_code,publish_date,new_regi_device_num)VALUES (?,?,?,?,?,?,?) ON DUPLICATE KEY updateos=?,group_id=?,medium_account=?,promotion_channel=?,promotion_mode=?,head_people=?,new_regi_device_num=new_regi_device_num+ VALUES(new_regi_device_num)

产生死锁的原因: child_game_id,medium_channel,ad_site_channel,pkg_code中  child_game_id 和  pkg_code 为 unique key ,执行这样的语句,更新的时候 unique key  就相当于where,后续原因如场景一

解决方法:

1.把5个字段设置为一个整体索引

2.不使用事物插入(最终解决方法)

 

死锁解决方法总结:

1.多线程 批量更新数据库的时候,where后面的限制条件要么是同一索引,要不都不是索引,不要一般是索引,一半不是索引

2.不要使用 多线程事物插入

 

===多线程死锁

public class DeadLocker {

public static void main(String[] args) {

DeadLock t1 = new DeadLock();

t1.setName("线程1");

DeadLock t2 = new DeadLock();

t2.setName("线程2");

t1.flag = true;

t2.flag = false;

t1.start();

t2.start();

}

}

 

class DeadLock extends Thread {

 

public boolean flag = true;

static Object o1 = new Object();// 定义两个公共资源

static Object o2 = new Object();

 

@Override

public void run() {

// TODO Auto-generated method stub

if (flag) {

System.out.println(Thread.currentThread().getName() + "等待o1");

synchronized (o1) {

System.out.println(Thread.currentThread().getName() + "占用o1,等待o2");

synchronized (o2) {

System.out.println(Thread.currentThread().getName() + "占用o2");

}

System.out.println(Thread.currentThread().getName() + "释放o2");

}

System.out.println(Thread.currentThread().getName() + "释放o1");

} else {

System.out.println(Thread.currentThread().getName() + "等待o2");

synchronized (o2) {

System.out.println(Thread.currentThread().getName() + "占用o2,等待o1");

synchronized (o1) {

System.out.println(Thread.currentThread().getName() + "占用o1");

}

System.out.println(Thread.currentThread().getName() + "释放o1");

}

System.out.println(Thread.currentThread().getName() + "释放o2");

}

}

}

 

 

 

 

 

 

 

转载请注明原文地址: https://www.6miu.com/read-55730.html

最新回复(0)