android网络业务的封装与调度

12年前

手机客户端程序由于网络宽带的约束,尤其在GPRS网络环境下,大数据量的网络交互很大程度上降低应用的响应,影响用户体验。比如,如果做一个手机网盘客户端,在后台上传文件时(大数据量的交互),获取文件列表(命令类的交互)这个过程就显得太别慢。而我们的要求是希望这些命令类操作能尽快得到响应。

 

通常,在手机客户端,我们设计一个网络操作的管理器,来统一管理这些需要联网的操作。

具体做法是把网络操作封装成一个Command(或者说是Task),管理器实现特定的调度规则来调度运行这些Task。

这样做的好处至少有三:

一. 用Command封装了网络操作,使得这些操作与上传的业务分离,解除了强耦合。

二. 可以根据网络情况来确定来采用不同的调度规则,提高用户体验。

三. 重用,这些Task和TaskManager的代码在别的手机应用上基本上能照搬过去。

四. 扩展,当应用需要扩展新的业务时,只有扩展一个新的Command(或者说是Task),接受调度即可,易于扩展。

 

例子:

还是以上文提到的微盘为例,可以概括我们对管理器的设计要求有:

在Wifi网络环境下:

一:各种网络操作可以并行运行。

在GPRS网络环境下:

二:支持优先级抢占调度,命令类操作的优先级比数据传输类的优先级高,当命令类的Task(获取文件列表)提交后,打断数据传输的Task(如上传,下载),等命令类的任务运行完毕,再接着运行数据类任务(断点上传,下载)。

二:同一个优先级的任务可以并行运行,如多个命令一起在网络上传输。

 

实现思路:

TaskManager

1. TaskManager开辟一个后台线程进行调度工作。

2. 由于要支持多个优先级的抢占调度,我们需要两个队列来维护运行中的Task和等待中的Task。

3. 由于Task的调度是基于优先级的,我们可以使用优先级队列,运行队列采用PriorityQueue,等待队列使用PriorityBlockingQueue,当没有网络业务需要运行时,调度线程阻塞挂起,避免空转。

4. TaskManager设计为单一实例(单一模式)。

5. 每个Task被调度运行时,该Task被从等待队列移动运行队列,当Task执行完毕时,从运行队列删除,唤醒调度线程进行新的调度。

下面是简单的设计代码

public final class TaskEngine implements Runnable{

private PriorityQueue<Task> runningQueue;//运行的task队列

private PriorityBlockingQueue<Task> readyQueue;//就绪的task队列,准备接受调度的task列表

private final AtomicLong taskIdProducer = new AtomicLong(1);//Task Id生成器

private Object sheduleLock = new Object();//同步锁

private static TaskEngine instance;

public long addTask(BusinessObject bo){
  Task task = new Task(bo);
  long newTaskId = taskIdProducer.incrementAndGet();
  task.setTaskId(newTaskId);
  if(this.isWifiNetWork()){ //WIFI网络
  synchronized(sheduleLock){
  runningQueue.add(task);
  }
  new Thread(task).start();
  }else{ //GPRS网络
  if(readyQueue.offer(task)){ //task入就绪队列
  final ReentrantLock lock = this.lock;
  lock.lock();
  try{
  needSchedule.signal(); //唤醒调度线程重新调度
  }finally{
  lock.unlock();}
  }
}
return newTaskId;
}

public final void run(){//task调度逻辑
....
....
}

//挂起调度线程 当不需要调度时
private void waitUntilNeedSchedule() throws InterruptedException
{
.....
}
}

Task:

1. 对要执行的网络操作的封装。

2. Task执行完毕时,发Task结束信号,唤醒调度线程进行新的调度

3. Task要实现Comparable接口,才能让TaskManager的两个队列自动其包含的Task排序。

下面是简单的设计代码

public class Task implements Runnable,Comparable<Task>{

private long taskId;

private BusinessObject bo;//封装网络操作的业务对象的抽象父类,
//它封装了具体的Command,保证了业务扩展中Task的接口不变

@Override
public void run() {
  this.onTaskStart();
  this.bo.execute();
  this.onTaskEnd();
}

private voidonTaskStart()
{...}

public int getPriority()
{...}

public void setPriority(intpriority)
{...}

@Override
public int compareTo(Task object1) {
return this.getPriority()>object1.getPriority()?-1:1;
}
} 

 

小注意事项:

Android对PriorityQueue的实现和Jdk中的实现有点不一样。

(PriorityQueue.java在android中的代码路径是usr\dalvik\libcore\luni\src\main\java\java\util)

PriorityQueue.remove(Object o) ;//从PriorityQueue中删除一个元素。

对于完成这个删除操作android和jdk都是分两个过程实现,一,找出待删除元素的索引index,二,删除index所在元素。

在JDK中,是通过调用元素的equals方法来找到待删除元素的索引,

private int indexOf(Object o) {
if (o != null) {
for (int i = 0; i < size; i++)
if (o.equals(queue[i]))
return i;
}
return -1;
}

在android中,是间接调用元素的compareTo方法判断结果是否为0来找到待删除元素的索引,

int targetIndex;
for (targetIndex = 0; targetIndex < size; targetIndex++) {
if (0 == this.compare((E) o, elements[targetIndex])) {
break;
}
}

private int compare(E o1, E o2) {
if (null != comparator) {
return comparator.compare(o1, o2);
}
return ((Comparable<? super E>) o1).compareTo(o2);
}

所以为了Task能在执行完毕时从PriorityQueue找到这个Task并删除之,需要在compareTo方法里在优先级相等时

返回0。

@Override
public int compareTo(Task object1) {
if(this.getPriority()==object1.getPriority())
return 0;
return this.getPriority()>object1.getPriority()?-1:1;
}

当是这样运行PriorityQueue.remove(Object o) 逻辑上只能删除PriorityQueue里第一个优先级与被删除的元素

优先级相等的元素(可能是待删除的元素也可能不是),有误删的可能,需要做如下修改:

@Override
public int compareTo(Task object1) {
if(this.getPriority()==object1.getPriority() && this.equals(object1))
return 0;
return this.getPriority()>object1.getPriority()?-1:1;
}

这样才能正确执行remove(Object o),删除指定的对象o。

 

个人觉得android这样设计使得remove(Object o)复杂化了,不然JDK中那么简洁。语义上也不是那么好懂了,

因为按通常理解,equals是判断两个对象是否相等,compareTo可能是对象的某个属性的比较(类别数据库中的order by),

而现在执行PriorityQueue.remove(Object o),这个对象o明明在容器PriorityQueue中,却删除不了,除非去翻看android中PriorityQueue的实现代码,然后重写compareTo这个方法。这样的API使用时比较容易出错,应该不符号良好的API设计规范

吧。

完整的代码实现如下:

* @类名:TaskEngine    * @创建:baiyingjun (devXiaobai@gmail.com)    * @创建日期:2011-7-7    * @说明:task调度引擎    ***************************************************/   public final class TaskEngine implements Runnable{          private static final String TAG=Log.makeTag(TaskEngine.class);              private PriorityQueue<Task> runningQueue;//运行的task队列              private PriorityBlockingQueue<Task> readyQueue;//就绪的task队列,准备接受调度的task列表              private final AtomicLong taskIdProducer = new AtomicLong(1);              private Object sheduleLock = new Object();//调度锁              private static TaskEngine instance;              private final ReentrantLock lock = new ReentrantLock(true);               private final Condition needSchedule = lock.newCondition();              private Task currentTask;//准备接受调度的task              private Context mAppContext;              /**        * add BusinessObject to taskEngine   */       public long addTask(BusinessObject bo) throws NetworkNotConnectException{           Task task = new Task(bo);           long newTaskId = taskIdProducer.incrementAndGet();           task.setTaskId(newTaskId);           if(Log.DBG){               Log.d(TAG, "Add task with task id "+newTaskId+", priority "+task.getPriority());           }                      if(this.isWifiNetWork()){ //WIFI网络               synchronized(sheduleLock){                   runningQueue.add(task);               }               new Thread(task).start();           }else{ //GPRS网络               if(readyQueue.offer(task)){ //task入就绪队列                   if(Log.DBG)                       Log.d(TAG, "add task " +task.bo.methodName+" "+task.taskId+" to ready queue");                   final ReentrantLock lock = this.lock;                   lock.lock();                   try{                       needSchedule.signal(); //唤醒调度线程重新调度                   }finally{                       lock.unlock();                   }               }               //schedule();           }           return newTaskId;       }              private TaskEngine(Context context){           mAppContext = context;           runningQueue = new PriorityQueue<Task>();           readyQueue = new PriorityBlockingQueue<Task>();           new Thread(this).start();           Log.i(TAG, "shedule thread working");       }              public synchronized static TaskEngine getInstance(Context context){           Context appContext = context.getApplicationContext();           if(instance==null || instance.mAppContext!=appContext){               instance=new TaskEngine(appContext);           }           return instance;       }              protected boolean isWifiNetWork() throws NetworkNotConnectException{           return NetworkManager.isWIFINetWork(mAppContext);       }              /**        * task调度逻辑   */       public final void run(){           Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);           while(true){               try {                   if(this.isWifiNetWork()){                       Task task = this.readyQueue.take();                       if(task !=null){                           synchronized(sheduleLock){                               runningQueue.add(task);                           }                           new Thread(task).start();                       }                   }                   else{//非wifi网络   //空就绪队列,空运行队列,等待直到有任务到来                       if(this.readyQueue.size()==0 && runningQueue.size()==0){                           currentTask=readyQueue.take();                           synchronized(sheduleLock){                               runningQueue.add(currentTask);                           }                           new Thread(currentTask).start();                       }                       //抢占式调度(就绪队列非空,运行队列优先级比就绪队列优先级低)                       else if(readyQueue.size()>0 &&                                this.readyQueue.element().getPriority()>=this.getMaxPriority()){                           currentTask = readyQueue.take();                           if(currentTask.getPriority()>this.getMaxPriority()){//暂停低优先级的任务运行                               synchronized(sheduleLock){                                   for(int i=0;i<runningQueue.size();i++){                                       Task toStopTask =runningQueue.remove();                                       //因为任务调度,将低优先级的任务暂时给冻结起来                                       toStopTask.setState(Task.STATE_FROST);                                       readyQueue.add(toStopTask);                                   }                               }                           }                           //运行被调度的任务                           runningQueue.add(currentTask);                           new Thread(currentTask).start();                       }else {//等高优先级的任务运行完毕                           waitUntilNeedSchedule();                       }                   }                              }catch (InterruptedException e) {                   Log.e(TAG, "Schedule error "+e.getMessage());               } catch (NetworkNotConnectException e) {                   // TODO Auto-generated catch block                   e.printStackTrace();               }           }       }          /*        * 等待,直到就绪队列里的最高优先级比当前运行优先级高,或者就绪队列为空时等待运行队列运行完毕   */       private void waitUntilNeedSchedule() throws InterruptedException{            final ReentrantLock lock = this.lock;            lock.lockInterruptibly();               try {                   try{                       while ((readyQueue.size()>0                                && readyQueue.element().getPriority()<getMaxPriority())//等高优先级的任务运行完毕                               || (readyQueue.size()==0 && runningQueue.size()>0) ){//或者等运行队列运行完毕                           if(Log.DBG)                               Log.d(TAG, "waiting sheduling........");                           needSchedule.await();                       }                   }                   catch (InterruptedException ie) {                       needSchedule.signal(); // propagate to non-interrupted thread                       throw ie;                   }               } finally {                   lock.unlock();               }       }              /**        * Hand the specified task ,such as pause,delete and so on   */       public boolean handTask(long taskId,int handType) {           Log.i(TAG, "set task`s state with taskId "+taskId);           synchronized(this.sheduleLock){               //如果在运行队列里,取消该任务               Iterator<Task> runningItor= this.runningQueue.iterator();               while(runningItor.hasNext()){                   Task task = runningItor.next();                   boolean b = task.equals(this);                   if(task.getTaskId()==taskId){                       runningQueue.remove(task);                       task.setState(handType);                       Log.i(TAG, "set runningQueue taskId = "+taskId + " state " + handType);                       return true;                   }               }               //如果在就绪队列里,删除               Iterator<Task> readyItor= this.readyQueue.iterator();               while(readyItor.hasNext()){                   Task task = readyItor.next();                   if(task.getTaskId()==taskId){   //                    readyQueue.remove(task);                       task.setState(handType);                       Log.i(TAG, "set readyQueue taskId = "+taskId + " state " + handType);                       return true;                   }               }               return false;           }       }       /***        * 获取运行队列任务的最高优先级   */       private int getMaxPriority(){           if(this.runningQueue==null || this.runningQueue.size()==0)               return -1;           else{               return this.runningQueue.element().getPriority();           }       }                  /***************************************************        * @类名:Task        * @创建:baiyingjun (devXiaobai@gmail.com)        * @创建日期:2011-7-7        * @说明:业务对象的包装成可运行实体        ***************************************************/       public class Task implements Runnable,Comparable<Task>{              //运行           public static final int STATE_INIT = -1;           //运行           public static final int STATE_RUN = 0;           //停止           public static final int STATE_STOP = 1;           //暂停           public static final int STATE_PAUSE = 2;           //取消           public static final int STATE_CANCLE = 3;           //冻结,如果在GPRS下,因为线程调度的时候低优先级的被放readyqueue里的时候,要把这个任务暂时给“冻结”起来           public static final int STATE_FROST = 4;                      private long taskId;                      private BusinessObject bo;                      public Task(){                          }                      public Task(BusinessObject bo){               this.bo=bo;           }                      @Override           public void run() {               this.onTaskStart();               this.bo.execute();               this.onTaskEnd();           }              private void onTaskStart(){               this.bo.setmState(STATE_RUN);           }                      public long getTaskId() {               return taskId;           }              public void setTaskId(long taskId) {               this.taskId = taskId;           }              public int getPriority() {               return this.bo.getPriority();           }              public void setPriority(int priority) {               this.bo.setPriority(priority);           }              /*             * compare task priority   */           @Override           public int compareTo(Task object1) {               if(this.getPriority()==object1.getPriority()&& this.equals(object1))                   return 0;               return this.getPriority()>object1.getPriority()?-1:1;           }                      public void setState(int state){//设置当前运行的task的state               this.bo.setmState(state);               Log.d(TAG, "Set task "+this.bo.methodName+" "+this.taskId + " state " + state);           }                      private void onTaskEnd(){//运行完毕后从taskengine运行队列里删除               if(Log.DBG){                   Log.d(TAG, "task "+this.bo.methodName+" "+taskId+" End");               }               if(this.bo.getmState() == STATE_FROST)//因为调度停止了该业务                   return;                              final ReentrantLock lock = TaskEngine.this.lock;               lock.lock();               try{                   boolean removed = runningQueue.remove(this); //remove from running queue                   assert removed;                   if(Log.DBG)                       Log.d(TAG, this.bo.methodName+" "+this.taskId+" remove from runningQueue");                   needSchedule.signal(); //唤醒调度线程重新调度               }finally{                   lock.unlock();               }           }              @Override           public boolean equals(Object o) {               // TODO Auto-generated method stub               if(this==o){                   return true;               }               if(o instanceof Task){                   return taskId==((Task)o).taskId;               }               return false;           }       }          }

 

拾漏补遗:

1.补充最初的设计类图(可能与代码不太一致,但能说明问题)android网络业务的封装与调度
2. BusinessObject的实现代码:

/***************************************************    * @类名:BusinessObject    * @创建:baiyingjun (devXiaobai@gmail.com)    * @创建日期:2011-7-6    * @说明:抽象的业务,扩展的网络业务要继承这个类并实现抽象方法execute()    ***************************************************/   public abstract class BusinessObject {              public static final int LOWEST_PRIORITY=1;//最低优先级              public static final int LOW_PRIORITY=2;//低优先级              public static final int NORMAL_PRIORITY=3;//正常优先级              public static final int HIGH_PRIORITY=4;//高优先级              public static final int HIGHEST_PRIORITY=5;//最高优先级              protected BusinessListener listnener;//运行业务的回调              protected Context mContext;              private long taskId;              protected Map<String,Object> params;//运行业务需要的参数              private int priority;              public int getPriority() {           return priority;       }          public void setPriority(int priority) {           this.priority = priority;       }          //设置回调       public void addBusinessListener(BusinessListener listnener){           this.listnener=listnener;       }              public long doBusiness() throws NetworkNotConnectException{           taskId= TaskEngine.getInstance(mContext).addTask(this);           return taskId;       }              public abstract void execute();   }
转自:http://www.cnblogs.com/devxiaobai/archive/2011/12/14/taskEngine.html