Java Design Demo -简单的队列-异步多任务队列(java android)

xiaoxiao2021-02-27  161

Demo代码下载地址

简单的单线程队列 —— 工作时遇到一台劣质打印机:给打印机发消息,它就会打印;但如果在它还在打印的时候再次发送打印消息,就会出现消息丢失。所以需要给上一个任务留出一些处理的间隔时间。

单线程的消息队列示例

package demo1;

import java.util.LinkedList;

/**
 * Single-threaded task queue demo.
 *
 * <p>Tasks handed to {@link #tastEvent(Runnable)} are executed one at a time, in
 * submission order, on a single worker thread. The worker is created when a task
 * is submitted while no worker exists, and it ends once the queue is empty.
 */
public class Main {

    /** The current worker thread, or {@code null} when none is alive. Guarded by {@code list}. */
    private static Thread thread;

    /** Pending tasks in FIFO order; also serves as the lock for {@code thread}. */
    private static LinkedList<Runnable> list = new LinkedList<Runnable>();

    /** Counter of completed demo tasks; only touched by the worker thread. */
    static int test = 0;

    /**
     * Submits 20 tasks that each sleep 500 ms and then print their sequence number
     * together with the time elapsed since the program started.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final long time = System.currentTimeMillis();
        for (int i = 0; i < 20; i++) {
            tastEvent(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("第" + (++test)
                            + ("个任务 耗时:" + (System.currentTimeMillis() - time)));
                }
            });
        }
    }

    /**
     * Appends a task to the queue and makes sure a worker thread exists to run it.
     *
     * <p>The enqueue and the "is there a worker?" decision happen under the same
     * lock, so a task can never be left in the queue without a worker, and two
     * concurrent callers can never start two workers.
     *
     * @param r the task to run on the worker thread
     */
    public static void tastEvent(Runnable r) {
        synchronized (list) {
            list.add(r);
            if (thread == null) {
                thread = new Thread(run);
                thread.start();
            }
        }
    }

    /**
     * Worker loop: takes one task at a time from the queue and runs it.
     *
     * <p>The lock is held only while dequeuing, never while a task is running, so
     * producers are not blocked by a long-running task. When the queue is found
     * empty the worker clears {@code thread} under the lock and exits.
     */
    static Runnable run = new Runnable() {

        @Override
        public void run() {
            while (true) {
                Runnable next;
                synchronized (list) {
                    next = list.poll();
                    if (next == null) {
                        // Queue drained: let the next submission start a fresh worker.
                        thread = null;
                        return;
                    }
                }
                try {
                    next.run();
                } catch (RuntimeException e) {
                    // A failing task must not kill the worker; otherwise "thread"
                    // would stay non-null and the remaining tasks would never run.
                    e.printStackTrace();
                }
            }
        }
    };
}

工作的时候遇到非常大的并发的情况,比如机器1秒只支持1000的并发,但是1秒接收了4000的并发。服务器就会崩掉。

最好将并发放到队列中,按1000的并发吞吐量来处理,这就是异步队列应用。

一个工程交给1个人做,需要3个月;交给2个人做,需要2个月;交给3个人做,需要1个半月……但如果交给100人、1000人去做,反而几年也完不成。

带上以上道理看待以下的代码

观察以下代码(复制到Android工程下运行):最后耗时约1600毫秒,而使用Android的AsyncTask类来改写这段代码只需要耗时约200毫秒。

[java] view plain copy print ? final long  timer=System.currentTimeMillis();  count=0;  final Handler h=new Handler();  for(int k=0;k<100;k++){  new Thread(){      @Override      public void run() {          // TODO Auto-generated method stub          try {              Thread.sleep(10);          } catch (InterruptedException e) {              // TODO Auto-generated catch block              e.printStackTrace();          }          h.post(new Runnable() {              @Override              public void run() {                  Toast.makeText(getApplicationContext(),” 耗时”+ (System.currentTimeMillis() - timer), 1).show();                  System.err.println(”编号”+(count++)+“线程消耗了”+(System.currentTimeMillis()-timer));              }          });      }  }.start();   final long timer=System.currentTimeMillis(); count=0; final Handler h=new Handler(); for(int k=0;k<100;k++){ new Thread(){ @Override public void run() { // TODO Auto-generated method stub try { Thread.sleep(10); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } h.post(new Runnable() { @Override public void run() { Toast.makeText(getApplicationContext()," 耗时"+ (System.currentTimeMillis() - timer), 1).show(); System.err.println("编号"+(count++)+"线程消耗了"+(System.currentTimeMillis()-timer)); } }); } }.start();

可见增加多线程不提高性能,反而因为系统在不同的线程之间切换降低效率。因此我们需要让线程有序执行任务

以下是异步多线程处理队列的demo

[java] view plain copy print ? package demo2;    import demo2.Task.OnFinishListen;    public class Main {        /**      * @param args      */      public static void main(String[] args) {          // TODO Auto-generated method stub                    Task.setThreadMaxNum(3);          for (int i = 0; i < 15; i++) {              new Task() {                    @Override                  public Object obtainData(Task task, Object parameter)                          throws Exception {                      // TODO Auto-generated method stub                      Thread.sleep(500);                      return task.taskID;                  }                }              .setOnFinishListen(new OnFinishListen() {                    @Override                  public void onFinish(Task task, Object data) {                      // TODO Auto-generated method stub                      System.err.println(”任务编号”+task.taskID+“任务完成”);                  }              })              .setTaskID(i)              .start();          }      }    }   package demo2; import demo2.Task.OnFinishListen; public class Main { /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub Task.setThreadMaxNum(3); for (int i = 0; i < 15; i++) { new Task() { @Override public Object obtainData(Task task, Object parameter) throws Exception { // TODO Auto-generated method stub Thread.sleep(500); return task.taskID; } } .setOnFinishListen(new OnFinishListen() { @Override public void onFinish(Task task, Object data) { // TODO Auto-generated method stub System.err.println("任务编号"+task.taskID+"任务完成"); } }) .setTaskID(i) .start(); } } }

package demo2;

import java.util.HashMap;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;

/**
 * A unit of work scheduled through {@link TaskQueue}.
 *
 * <p>Subclasses implement {@link #obtainData(Task, Object)}. Calling
 * {@link #start()} places the task in the waiting queue; the queue later runs it
 * via {@link #threadRun()} (new thread) or {@link #run()} (reusing the thread of
 * a task that just finished). Failed executions are retried up to
 * {@code tryAgainCount} times with {@code tryAgainTime} ms between attempts.
 *
 * @param <P> type of the request parameter
 * @param <R> type of the result
 */
public abstract class Task<P,R> implements Runnable, Observer, TaskAction<P,R> {

    /**
     * Sets the maximum number of concurrently running tasks.
     *
     * @param num requested limit; values below 1 become 1, values above 100 become 100
     */
    public static void setThreadMaxNum(int num) {
        TaskQueue.ThreadMaxNum = num < 1 ? 1 : num > 100 ? 100 : num;
    }

    /** Queue position of a task: {@code max} goes to the head, {@code min} to the tail. */
    public static enum TaskPriority {
        max, min;
    }

    /** Shared sentinel exception; when thrown by {@code obtainData} the retry loop stops at once. */
    protected final static Exception withoutException = new Exception(
            "The state is without");

    /** Singleton-name to task mapping for tasks that are queued or running. */
    private static final HashMap<String, Task> nameTasks = new HashMap<String, Task>();

    /**
     * Marks, per thread, that a {@link #run()} call is already draining the waiting
     * queue, so that nested {@code run()} calls made by the queue do not recurse.
     */
    private static final ThreadLocal<Boolean> DRAINING = new ThreadLocal<Boolean>();

    /** @return the singleton-name to task mapping (never {@code null}) */
    public static HashMap<String, Task> getNameTask() {
        return nameTasks;
    }

    /**
     * Sets the singleton name. While a task with the same name is registered, a
     * second task with that name is marked {@code without} when started.
     *
     * @param singletonName the name, or {@code null} for an unnamed task
     * @return this task
     */
    public Task<P,R> setSingletonName(String singletonName) {
        this.singletonName = singletonName;
        return this;
    }

    /** @return the singleton name, or {@code null} if none was set */
    public String getSingletonName() {
        return singletonName;
    }

    /** Called at the beginning of {@link Task#Execute()}. */
    public interface OnStartListen {
        void onStart(Task t);
    }

    /** Progress callback; this class only stores it and never invokes it. */
    public interface OnProgressListen {
        void onProgress(Task task, int progress, Object data);
    }

    /** Called with the result when a task that was not excluded finishes. */
    public static interface OnFinishListen<P,R> {
        void onFinish(Task<P,R> task, R data);
    }

    /** Queue-side callback invoked when execution starts. */
    public interface OnSystemStartListen {
        void onSystemStart(Task task);
    }

    /** Queue-side callback invoked when execution ends; {@code data} is the result or the exception. */
    public interface OnSystemFinishListen {
        void OnSystemFinish(Task t, Object data);
    }

    /** Request parameter passed to {@code cacheData} and {@code obtainData}. */
    protected P parameter;
    /** Listener for task start. */
    protected OnStartListen onStartListen;
    /** Listener for task progress. */
    protected OnProgressListen onProgressListen;
    /** Listener for task completion. */
    protected OnFinishListen<P,R> onFinishListen;
    /** Queue-side listener for the start of execution. */
    protected OnSystemStartListen onSystemStartListen;
    /** Queue-side listener for the end of execution. */
    protected OnSystemFinishListen onSystemFinishListen;
    /** Result of the last execution. */
    protected R result;
    /** Numeric identifier of the task; -1 until set. */
    protected int taskID = -1;
    /** Singleton name; see {@link #setSingletonName(String)}. */
    protected String singletonName;
    /** Arbitrary object attached by the caller. */
    protected Object tag;
    /** Thread created by {@link #threadRun()}; {@code null} if the task never got its own thread. */
    protected Thread thread;
    /** Maximum number of attempts made by {@link #Execute()}. */
    protected int tryAgainCount = 1;
    /** Pause between attempts, in milliseconds. */
    protected int tryAgainTime = 1000;
    /** Queue priority; low by default. */
    protected TaskPriority priority = TaskPriority.min;
    /** Lazily created key/value storage, see {@link #put(String, Object)}. */
    protected HashMap<String,Object> dataMap;

    protected Task() {
    }

    /** Life-cycle states of a task. */
    public static enum TaskStatus {
        // not yet handled, waiting, failed, finished, running, excluded
        untreated, wait, error, finsh, running, without;
    }

    /** Current state; written by update()/setWithout() and polled by the retry loop. */
    volatile TaskStatus status = TaskStatus.untreated;

    /** Marks the task as excluded: the queue drops it and finish listeners are not notified. */
    public void setWithout() {
        this.status = TaskStatus.without;
    }

    /** Same effect as {@link #setWithout()}. */
    public void remove() {
        this.status = TaskStatus.without;
    }

    public TaskPriority getPriority() {
        return priority;
    }

    public void setPriority(TaskPriority priority) {
        this.priority = priority;
    }

    /**
     * Enqueues this task according to its priority and asks the queue to start
     * waiting tasks.
     *
     * <p>A task with a singleton name that is already registered is marked
     * {@code without}. Unnamed tasks are not registered in the name map (the
     * original code registered them under a {@code null} key).
     */
    public void start() {
        if (this.priority == null)
            this.priority = TaskPriority.min;

        synchronized (TaskQueue.tasks_wait) {
            String name = getSingletonName();
            if (name != null) {
                if (Task.getNameTask().get(name) != null) {
                    this.setWithout();
                } else {
                    Task.getNameTask().put(name, this);
                }
            }

            switch (priority) {
            case min:
                TaskQueue.tasks_wait.remove(this);
                TaskQueue.tasks_wait.add(this);
                break;
            case max:
                TaskQueue.tasks_wait.remove(this);
                TaskQueue.tasks_wait.addFirst(this);
                break;
            default:
                break;
            }
            // Let the queue pick up waiting tasks.
            TaskQueue.serivesRun();
        }
    }

    /**
     * Sets the priority, marks the task as waiting and enqueues it.
     *
     * @param priority queue priority; {@code null} is treated as {@code min}
     */
    public void start(TaskPriority priority) {
        this.priority = priority;
        status = TaskStatus.wait;
        start();
    }

    /** Runs this task on a newly created thread. */
    final void threadRun() {
        thread = new Thread(this);
        thread.start();
    }

    /** Hook called by {@link #update(Observable, Object)} to abort a running execution; no-op by default. */
    public void shutDownExecute() {
    }

    /**
     * Returns cached data for the parameter; when non-null, {@code obtainData} is skipped.
     *
     * @param parameter the request parameter
     * @return the current {@code result} field by default
     */
    public R cacheData(P parameter) {
        return result;
    }

    /**
     * Executes the task: notifies the start listeners, obtains the data (with
     * retries), then notifies the finish listeners.
     *
     * @return the result
     * @throws Exception the exception of the last attempt if it failed
     */
    public final Object Execute() throws Exception {
        if (onStartListen != null)
            onStartListen.onStart(this);

        if (onSystemStartListen != null)
            onSystemStartListen.onSystemStart(this);
        status = TaskStatus.running;

        // Failure of the most recent attempt, if any.
        Exception exception = null;
        if ((result = cacheData(parameter)) == null) {
            for (int i = 0; i < tryAgainCount; i++) {
                try {
                    // Stop retrying once the task has been excluded.
                    if (status == TaskStatus.without) {
                        break;
                    }
                    exception = null;
                    result = obtainData(this, parameter);
                    System.out.println("result=" + result);
                    break;
                } catch (Exception e) {
                    if ((exception = e) == withoutException) {
                        break;
                    }
                    e.printStackTrace();
                    try {
                        Thread.sleep(tryAgainTime);
                    } catch (InterruptedException e1) {
                        // The interrupt flag is deliberately not re-set: run() goes on
                        // to execute further queued tasks on this same thread.
                        e1.printStackTrace();
                    }
                }
            }
        }
        if (exception != null) {
            throw exception;
        }

        // Excluded tasks finish silently towards the user-facing listener.
        if (status != TaskStatus.without) {
            if (onFinishListen != null) {
                onFinishListen.onFinish(this, result);
            }
        }
        if (onSystemFinishListen != null) {
            onSystemFinishListen.OnSystemFinish(this, result);
        }
        status = TaskStatus.finsh;
        return result;
    }

    /**
     * Produces the task's data.
     *
     * @param task      this task
     * @param parameter the request parameter
     * @return the result
     * @throws Exception on failure; triggers a retry unless it is {@code withoutException}
     */
    public abstract R obtainData(Task<P,R> task, P parameter) throws Exception;

    /**
     * Observer callback used to cancel the task: detaches from the observable,
     * calls {@link #shutDownExecute()}, marks the task excluded, interrupts the
     * task's own thread if it has one, and disables further retries.
     */
    @Override
    public void update(Observable observable, Object data) {
        observable.deleteObserver(this);
        this.shutDownExecute();
        this.setWithout();
        if (this.thread != null) {
            this.thread.interrupt();
        }
        this.tryAgainCount = 0;
    }

    /**
     * Executes the task and then keeps the current thread busy with waiting tasks.
     *
     * <p>On failure the status becomes {@code error}; the finish listener is
     * notified only if the task had not been excluded before the failure (the
     * original code tested the status after overwriting it, so the test was
     * always true).
     *
     * <p>{@code TaskQueue.notifyWaitingTask()} itself calls {@code run()} on each
     * task it picks up. To keep that from nesting one stack frame per task, only
     * the outermost {@code run()} on a thread drains the queue.
     */
    @Override
    public void run() {
        try {
            Execute();
        } catch (Exception e) {
            e.printStackTrace();
            boolean excluded = status == TaskStatus.without;
            status = TaskStatus.error;

            if (!excluded && onFinishListen != null) {
                onFinishListen.onFinish(this, result);
            }
            if (onSystemFinishListen != null) {
                onSystemFinishListen.OnSystemFinish(this, e);
            }
        }

        if (DRAINING.get() != null) {
            // Called from the drain loop of an outer run() on this thread.
            return;
        }
        DRAINING.set(Boolean.TRUE);
        try {
            TaskQueue.getRunnable().notifyWaitingTask();
        } finally {
            DRAINING.remove();
        }
    }

    public Object getTag() {
        return tag;
    }

    public Task setTag(Object tag) {
        this.tag = tag;
        return this;
    }

    public Thread getThread() {
        return thread;
    }

    public TaskStatus getStatus() {
        return status;
    }

    public Object getParameter() {
        return parameter;
    }

    public Task setParameter(P parameter) {
        this.parameter = parameter;
        return this;
    }

    public OnStartListen getOnStartListen() {
        return onStartListen;
    }

    public Task setOnStartListen(OnStartListen onStartListen) {
        this.onStartListen = onStartListen;
        return this;
    }

    public OnProgressListen getOnProgressListen() {
        return onProgressListen;
    }

    public Task setOnProgressListen(OnProgressListen onProgressListen) {
        this.onProgressListen = onProgressListen;
        return this;
    }

    public OnFinishListen getOnFinishListen() {
        return onFinishListen;
    }

    public Task setOnFinishListen(OnFinishListen onFinishListen) {
        this.onFinishListen = onFinishListen;
        return this;
    }

    public OnSystemStartListen getOnSystemStartListen() {
        return onSystemStartListen;
    }

    public OnSystemFinishListen getOnSystemFinishListen() {
        return onSystemFinishListen;
    }

    public void setOnSystemFinishListen(
            OnSystemFinishListen onSystemFinishListen) {
        this.onSystemFinishListen = onSystemFinishListen;
    }

    public int getTaskID() {
        return taskID;
    }

    public Task setTaskID(int taskID) {
        this.taskID = taskID;
        return this;
    }

    public Object getResult() {
        return result;
    }

    public int getTryAgainCount() {
        return tryAgainCount;
    }

    public Task setTryAgainCount(int tryAgainCount) {
        this.tryAgainCount = tryAgainCount;
        return this;
    }

    public int getTryAgainTime() {
        return tryAgainTime;
    }

    /**
     * Sets the pause between attempts. Made public to match the other setters;
     * it was private and therefore unusable by callers.
     *
     * @param tryAgainTime pause in milliseconds
     * @return this task
     */
    public Task setTryAgainTime(int tryAgainTime) {
        this.tryAgainTime = tryAgainTime;
        return this;
    }

    /**
     * Stores a value under a key in this task's data map.
     *
     * @return the previous value for the key, or {@code null}
     */
    public Object put(String key, Object value) {
        if (dataMap == null) {
            dataMap = new HashMap<String, Object>();
        }
        return dataMap.put(key, value);
    }

    /**
     * Looks up a value in this task's data map.
     *
     * @param key the key
     * @return the stored value, or {@code null} if absent
     */
    public Object get(String key) {
        return dataMap == null ? null : dataMap.get(key);
    }

    /**
     * Same as {@link #get(String)}; kept for existing callers.
     *
     * @param key   the key
     * @param value ignored
     * @return the stored value, or {@code null} if absent
     */
    public Object get(String key, Object value) {
        return get(key);
    }
}
package demo2;

import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Random;

import demo2.Task.OnSystemFinishListen;
import demo2.Task.TaskStatus;

/**
 * Scheduler that moves tasks from a waiting queue to a bounded set of running
 * tasks. At most {@code ThreadMaxNum} tasks are in the running set at a time.
 *
 * <p>Lock order used throughout this class: {@code tasks_wait} first, then
 * {@code tasks_running}.
 */
public class TaskQueue implements Runnable, OnSystemFinishListen {
    static String debug = "TaskQueue";

    /** Tasks waiting to be executed, head first. */
    @SuppressWarnings("unchecked")
    static LinkedList<Task> tasks_wait = new LinkedList<Task>();

    /** Exception type declared for queue errors; not thrown by this class. */
    public static class TaskQueueExpection extends Exception {
        TaskQueueExpection(String detailMessage) {
            super(detailMessage);
        }
    }

    /** Tasks currently being executed. */
    static ArrayList<Task> tasks_running = new ArrayList<Task>();

    /** When {@code false}, no further waiting task is handed out. */
    public static volatile boolean isRun = true;

    /** The single queue instance, also used as the system finish listener of every task. */
    private static TaskQueue runnable = new TaskQueue();

    /** Maximum number of tasks in the running set. */
    static int ThreadMaxNum = 1;

    /** @return the single queue instance */
    public static TaskQueue getRunnable() {
        return runnable;
    }

    /** Starts as many waiting tasks as the running limit allows, each on a new thread. */
    public static void serivesRun() {
        runnable.run();
    }

    /** @return the number of tasks currently in the running set */
    public static int getRunningTaskCount() {
        synchronized (TaskQueue.tasks_running) {
            return TaskQueue.tasks_running.size();
        }
    }

    /**
     * Sets the maximum number of concurrently running tasks.
     *
     * @param num requested limit; values below 1 become 1, values above 100 become 100
     */
    public static void setThreadMaxNum(int num) {
        TaskQueue.ThreadMaxNum = num < 1 ? 1 : num > 100 ? 100 : num;
    }

    /**
     * @return {@code true} if a task is waiting and the running set is below the limit
     */
    public static boolean taskRun() {
        synchronized (tasks_wait) {
            synchronized (tasks_running) {
                return !tasks_wait.isEmpty()
                        && tasks_running.size() < ThreadMaxNum;
            }
        }
    }

    /** Hands every task that may start now to a newly created thread. */
    public void run() {
        Task newTask;
        while ((newTask = getWaittingTask()) != null) {
            System.err.println("开启新线程处理一个新任务,ID:" + newTask.getTaskID());
            newTask.setOnSystemFinishListen(runnable);
            newTask.threadRun();
        }
    }

    /**
     * Runs every task that may start now synchronously on the calling thread,
     * via {@code Task.run()}, instead of creating new threads.
     */
    void notifyWaitingTask() {
        Task newTask;
        while ((newTask = getWaittingTask()) != null) {
            System.err.println("唤醒旧线程处理一个新任务,ID:" + newTask.getTaskID());
            newTask.setOnSystemFinishListen(runnable);
            newTask.run();
        }
    }

    /**
     * Takes the next runnable task from the waiting queue and registers it in the
     * running set.
     *
     * <p>The capacity check, the dequeue and the registration are done under both
     * locks as one step, so concurrent callers cannot exceed {@code ThreadMaxNum}.
     * Tasks whose status is {@code without} are dropped and never returned (the
     * original code could return the last dropped task once the queue was empty).
     *
     * @return the task to execute, or {@code null} if nothing may start now
     */
    private Task getWaittingTask() {
        synchronized (tasks_wait) {
            synchronized (tasks_running) {
                while (isRun && !tasks_wait.isEmpty()
                        && tasks_running.size() < ThreadMaxNum) {
                    Task t = tasks_wait.poll();
                    if (t == null || t.status == TaskStatus.without) {
                        // Parentheses are required: without them the concatenation
                        // binds first, the null test is always true, and a null
                        // task causes a NullPointerException.
                        System.out.println("任务取消 编号"
                                + (t != null ? String.valueOf(t.getTaskID()) : "空任务"));
                        continue;
                    }
                    tasks_running.add(t);
                    System.out.println("正在执行任务数" + tasks_running.size() + "/上限"
                            + ThreadMaxNum);
                    return t;
                }
            }
        }
        return null;
    }

    /**
     * Removes a finished task from the running set and releases its singleton name.
     *
     * @param t    the task that finished
     * @param data the task's result or the exception it failed with (unused here)
     */
    @Override
    public void OnSystemFinish(Task t, Object data) {
        synchronized (tasks_running) {
            tasks_running.remove(t);
            System.out.println("执行队列中移除任务taskid=" + t.taskID);
            System.out.println("正在执行任务数" + tasks_running.size() + "/上限"
                    + ThreadMaxNum);
        }
        // Task.start() mutates the name map while holding tasks_wait, so the same
        // lock is used here. It is taken after tasks_running has been released,
        // which keeps the lock order wait -> running intact.
        if (t.getSingletonName() != null) {
            synchronized (tasks_wait) {
                Task.getNameTask().remove(t.getSingletonName());
            }
        }
    }
}

转载请注明原文地址: https://www.6miu.com/read-12312.html

最新回复(0)