2010-12-08 22 views
1

J'utilise des extensions réactives pour appeler une méthode asynchrone et je souhaite mettre en cache le résultat et le renvoyer pour les appels suivants à la méthode.Créer un objet IObservable et renvoyer immédiatement le résultat d'une opération asynchrone en mémoire cache

Comment puis-je créer une instance Observable, la renvoyer et fournir les données (cacheResult) requises par l'abonnement?

public IObservable<Bar> GetBars(int pageIndex, int pageSize) 
{ 
    var @params = new object[] { pageIndex, pageSize }; 
    var cachedResult = _cache.Get(@params); 
    if (cachedResult != null) 
    { 
// How do I create a Observable instance and return the 'cacheResult'... 
return ... 
    } 

    var observable = new BaseObservable<Bar>(); 
    _components.WithSsoToken(_configuration.SsoToken) 
     .Get(@params) 
     .Select(Map) 
     .Subscribe(c => 
        { 
          _cache.Add(@params, c); 
          observable.Publish(c); 
          observable.Completed(); 
        }, exception => 
        { 
         observable.Failed(exception); 
         observable.Completed(); 
        }); 

     return observable; 
} 

Répondre

3

Je crois que vous cherchez Observable.Return:

return Observable.Return((Bar)cachedResult); 

Sur une note sans rapport avec:

  • Il n'y a pas besoin de retourner un BaseObservable<T>. Vous devez renvoyer un Subject<T> car il fait ce que votre implémentation est en train de faire mais est thread-safe (vous devez également appeler .AsObservable() sur la valeur de retour à il ne peut pas être renvoyé).
  • Vous utilisez Do pour ajouter la valeur à la cache:

var observable = new Subject<Bar>(); 
_components.WithSsoToken(_configuration.SsoToken) 
    .Get(@params) 
    .Select(Map) 
    .Subscribe(c => 
    { 
     _cache.Add(@params, c); 
     observable.OnNext(c); 
     observable.OnCompleted(); 
    }, exception => 
    { 
     observable.OnError(exception); 
    }); 

return observable.AsObservable(); 
+0

merci pour la réponse et toutes les infos – AwkwardCoder

2

Idéalement, j'ai écrit une classe qui fait de ce modèle pour vous, vérifier:

https://github.com/xpaulbettsx/ReactiveXaml/blob/master/ReactiveXaml/ObservableAsyncMRUCache.cs

var cache = new ObservableAsyncMRUCache<int, int>(
    x => Observable.Return(x*10).Delay(1000) /* Return an IObservable here */, 
    100 /*items to cache*/, 
    5 /* max in-flight, important for web calls */ 
    ); 

IObservable<int> futureResult = cache.AsyncGet(10); 

futureResult.Subscribe(Console.WriteLine); 
>>> 100 

Certaines choses délicates qu'il gère correctement:

  • Il met en cache les derniers articles n et jette des objets qui ne sont pas utilisés
  • Il assure un maximum de n éléments sont en cours d'exécution à la time - Si vous ne le faites pas, vous pouvez facilement générer des milliers d'appels Web si le cache est vide
  • Si vous demandez le même article deux fois de suite, la première demande initiera une demande, et la seconde l'appel attendra le premier au lieu de générer une requête identique, de sorte que vous ne finirez pas par interroger les données de manière redondante.