2010-02-15 8 views
4

Dans un cadre que je développe, l'utilisateur peut choisir d'exécuter une certaine tâche chronophage en tâche de fond tout en faisant autre chose. Cette tâche calcule une série de résultats. En un moment donné, quand il/elle a besoin des résultats de la tâche de fond, il est acceptable d'attendre un peu plus de temps jusqu'à ce que:Comment faire un calcul temporel en Java et être capable de calculer tous les résultats jusqu'à maintenant, même quand le budget-temps se termine (time-out)?

  • a) une temporisation se produit (dans ce cas, l'utilisateur souhaite obtenir tous les résultats calculés jusqu'à présent, s'ils existent); ou

  • b) un nombre maximum ou les résultats calculés est atteint (fin normale),

selon la première éventualité. Le gotcha est: Même si un délai d'attente se produit, l'utilisateur veut toujours les résultats calculés jusqu'à présent.

J'ai essayé de le faire en utilisant Future<V>.get(long timeout, TimeUnit unit) et une Callable<V> classe dérivée de, mais il arrive que lorsqu'un TimeoutException se produit, cela signifie généralement la tâche a été prématurément terminé, donc pas de résultats sont disponibles. J'ai donc dû ajouter une méthode getPartialResults() (voir DiscoveryTask ci-dessous) et j'ai peur que cette utilisation soit trop contre-intuitive pour les utilisateurs potentiels.

invocation de découverte:

public Set<ResourceId> discover(Integer max, long timeout, TimeUnit unit) 
    throws DiscoveryException 
{ 
    DiscoveryTask task = new DiscoveryTask(max); 
    Future<Set<ResourceId>> future = taskExec.submit(task); 
    doSomethingElse(); 
    try { 
     return future.get(timeout, unit); 
    } catch (CancellationException e) { 
     LOG.debug("Discovery cancelled.", e); 
    } catch (ExecutionException e) { 
     throw new DiscoveryException("Discovery failed to execute.", e); 
    } catch (InterruptedException e) { 
     LOG.debug("Discovery interrupted.", e); 
    } catch (TimeoutException e) { 
     LOG.debug("Discovery time-out."); 
    } catch (Exception e) { 
     throw new DiscoveryException("Discovery failed unexpectedly.", e); 
    } finally { 
     // Harmless if task already completed 
     future.cancel(true); // interrupt if running 
    } 
    return task.getPartialResults(); // Give me what you have so far! 
} 

réalisation de découverte:

public class DiscoveryTask extends Callable<Set<ResourceId>> 
     implements DiscoveryListener 
{ 
    private final DiscoveryService discoveryService; 
    private final Set<ResourceId> results; 
    private final CountDownLatch doneSignal; 
    private final MaximumLimit counter; 
    //... 
    public DiscoveryTask(Integer maximum) { 
     this.discoveryService = ...; 
     this.results = Collections.synchronizedSet(new HashSet<ResourceId>()); 
     this.doneSignal = new CountDownLatch(1); 
     this.counter = new MaximumLimit(maximum); 
     //... 
    } 

    /** 
    * Gets the partial results even if the task was canceled or timed-out. 
    * 
    * @return The results discovered until now. 
    */ 
    public Set<ResourceId> getPartialResults() { 
     Set<ResourceId> partialResults = new HashSet<ResourceId>(); 
     synchronized (results) { 
      partialResults.addAll(results); 
     } 
     return Collections.unmodifiableSet(partialResults); 
    } 

    public Set<ResourceId> call() throws Exception { 
     try { 
      discoveryService.addDiscoveryListener(this); 
      discoveryService.getRemoteResources(); 
      // Wait... 
      doneSignal.await(); 
     } catch (InterruptedException consumed) { 
      LOG.debug("Discovery was interrupted."); 
     } catch (Exception e) { 
      throw new Exception(e); 
     } finally { 
      discoveryService.removeDiscoveryListener(this); 
     } 
     LOG.debug("Discovered {} resource(s).", results.size()); 
     return Collections.unmodifiableSet(results); 
    } 

    // DiscoveryListener interface 
    @Override 
    public void discoveryEvent(DiscoveryEvent de) { 
     if (counter.wasLimitReached()) { 
      LOG.debug("Ignored discovery event {}. " 
       + "Maximum limit of wanted resources was reached.", de); 
      return; 
     } 
     if (doneSignal.getCount() == 0) { 
      LOG.debug("Ignored discovery event {}. " 
       + "Discovery of resources was interrupted.", de); 
      return; 
     } 
     addToResults(de.getResourceId()); 
    } 

    private void addToResults(ResourceId id) { 
     if (counter.incrementUntilLimitReached()) { 
      results.add(id); 
     } else { 
      LOG.debug("Ignored resource {}. Maximum limit reached.",id); 
      doneSignal.countDown(); 
     } 
    } 
} 

Dans le chapitre 6 du livre Java Concurrency dans la pratique de Brian Goetz et al, les auteurs montrent une solution pour une problème connexe, mais dans ce cas tous les résultats peuvent être calculés en parallèle, ce qui n'est pas mon cas. Pour être précis, mes résultats dépendent de sources externes, donc je n'ai aucun contrôle sur quand ils viennent. Mon utilisateur définit le nombre maximum de résultats souhaités avant d'invoquer l'exécution de la tâche, et une limite de temps maximum qu'elle a accepté d'attendre après qu'elle soit prête à obtenir les résultats.

C'est OK pour vous? Le feriez-vous différemment? Y a-t-il une meilleure approche?

+0

Avez-vous besoin d'informations supplémentaires concernant la solution suggérée? – yawn

Répondre

2

Passez un délai (plus court) à la tâche elle-même et faites-la revenir prématurément lorsqu'elle atteint ce "délai d'attente". Le type de résultat peut alors avoir un drapeau indiquant si le résultat est parfait ou non:

Future<Result> future = exec.submit(new Task(timeout*.9)); 
//if you get no result here then the task misbehaved, 
//i.e didn't obey the soft timeout. 
//This should be treated as a bug 
Result result = future.get(timeout); 
if (result.completedInTime()) { 
    doSomethingWith(result.getData());   
} else { 
    doSomethingElseWith(result.getData());  
} 
0

Si resulte individuelle vient « pas cher », puis faire votre routine générer des résultats et les ajouter à une file d'attente de sortie et voir combien de temps est gauche et l'a comparé à l'heure où les résultats actuels ont pris. Ne calculez le résultat suivant que si vous le pouvez dans le temps. Cela vous permettra de rester mono-thread, ce qui tend généralement à rendre votre code plus simple et donc moins sujette aux erreurs.

+0

Eh bien, je ne peux pas dire quand le prochain résultat sera disponible. C'est un système distribué, et les résultats sont basés sur les réponses des autres machines du réseau. Ils peuvent me répondre ou pas, le réseau peut échouer, etc. Le calcul basé sur leur réponse est, cependant, bon marché. – msugar

0

Quelque chose comme ceci:

public static class Consumer<T> { 

    private BlockingQueue<T> queue; 
    private T lastElement; 

    public Consumer(BlockingQueue<T> queue, T lastElement) { 
     this.queue = queue; 
     this.lastElement = lastElement; 
    } 

    public Collection<T> acquireResults(long timeout, TimeUnit timeoutTimeUnit) throws InterruptedException { 

     LinkedList<T> result = new LinkedList<T>(); 
     queue.drainTo(result); 

     if (result.getLast() == lastElement) 
      return result; 
     else 
      result.add(queue.poll(timeout, timeoutTimeUnit)); 

     result.removeLast(); 

     return result; 

    } 

} 

public static void main(String[] args) throws InterruptedException { 

    String lastElement = "_lastElement"; 

    BlockingQueue<String> queue = new LinkedBlockingQueue<String>(); 
    Consumer<String> consumer = new Consumer<String>(queue, lastElement); 

    for (int i=0; i<100; i++) 
     queue.put(UUID.randomUUID().toString()); 

    System.out.println(consumer.acquireResults(5, TimeUnit.SECONDS)); 

    queue.put("foo"); 
    queue.put("bar"); 

    queue.put(lastElement); 

    System.out.println(consumer.acquireResults(5, TimeUnit.SECONDS)); 

} 

Vous voudrez probablement utiliser une enveloppe de résultat pour indiquer que certains résultat est en effet le dernier résultat au lieu d'utiliser une valeur magique.