2009-08-12 15 views
1

Je suis en cours d'exécution dans un problème où la lecture d'un HttpResponseStream échoue parce que le StreamReader que j'enroulant autour de lit plus rapidement que le flux de réponse obtient la réponse réelle. Je récupérer un fichier de taille raisonnable petit (environ 60k), mais le Parser qui traite la réponse en un objet réel échoue parce qu'il frappe un caractère inattendu (code 65535), qui par expérience, je sais être le caractère produit lorsque vous lisez un StreamReader et il n'y a pas d'autres caractères disponibles.lecture d'un HttpResponseStream échoue

Pour mémoire, je sais que le contenu étant retourné est valide et analyser correctement car la défaillance se produit à différents points dans le fichier chaque fois que je lance le code. C'est la ligne parser.Load() dans la suite où elle échoue.

est-il un moyen d'assurer que je l'ai lu tout le contenu avant d'essayer de l'analyser à court de copier le flux de réponse dans un MemoryStream ou une chaîne puis de le traiter?

/// <summary> 
    /// Makes a Query where the expected Result is an RDF Graph ie. CONSTRUCT and DESCRIBE Queries 
    /// </summary> 
    /// <param name="sparqlQuery">SPARQL Query String</param> 
    /// <returns>RDF Graph</returns> 
    public Graph QueryWithResultGraph(String sparqlQuery) 
    { 
     try 
     { 
      //Build the Query URI 
      StringBuilder queryUri = new StringBuilder(); 
      queryUri.Append(this._endpoint.ToString()); 
      queryUri.Append("?query="); 
      queryUri.Append(Uri.EscapeDataString(sparqlQuery)); 

      if (!this._defaultGraphUri.Equals(String.Empty)) 
      { 
       queryUri.Append("&default-graph-uri="); 
       queryUri.Append(Uri.EscapeUriString(this._defaultGraphUri)); 
      } 

      //Make the Query via HTTP 
      HttpWebResponse httpResponse = this.DoQuery(new Uri(queryUri.ToString()),false); 

      //Set up an Empty Graph ready 
      Graph g = new Graph(); 
      g.BaseURI = this._endpoint; 

      //Parse into a Graph based on Content Type 
      String ctype = httpResponse.ContentType; 
      IRDFReader parser = MIMETypesHelper.GetParser(ctype); 
      parser.Load(g, new StreamReader(httpResponse.GetResponseStream())); 

      return g; 
     } 
     catch (UriFormatException uriEx) 
     { 
      //URI Format Invalid 
      throw new Exception("The format of the URI was invalid", uriEx); 
     } 
     catch (WebException webEx) 
     { 
      //Some sort of HTTP Error occurred 
      throw new Exception("A HTTP Error occurred", webEx); 
     } 
     catch (RDFException) 
     { 
      //Some problem with the RDF or Parsing thereof 
      throw; 
     } 
     catch (Exception) 
     { 
      //Other Exception 
      throw; 
     } 
    } 

    /// <summary> 
    /// Internal Helper Method which executes the HTTP Requests against the SPARQL Endpoint 
    /// </summary> 
    /// <param name="target">URI to make Request to</param> 
    /// <param name="sparqlOnly">Indicates if only SPARQL Result Sets should be accepted</param> 
    /// <returns>HTTP Response</returns> 
    private HttpWebResponse DoQuery(Uri target, bool sparqlOnly) 
    { 
     //Expect errors in this function to be handled by the calling function 

     //Set-up the Request 
     HttpWebRequest httpRequest; 
     HttpWebResponse httpResponse; 
     httpRequest = (HttpWebRequest)WebRequest.Create(target); 

     //Use HTTP GET/POST according to user set preference 
     if (!sparqlOnly) 
     { 
      httpRequest.Accept = MIMETypesHelper.HTTPAcceptHeader(); 
      //For the time being drop the application/json as this doesn't play nice with Virtuoso 
      httpRequest.Accept = httpRequest.Accept.Replace("," + MIMETypesHelper.JSON[0], String.Empty); 
     } 
     else 
     { 
      httpRequest.Accept = MIMETypesHelper.HTTPSPARQLAcceptHeader(); 
     } 
     httpRequest.Method = this._httpMode; 
     httpRequest.Timeout = this._timeout; 

     //HTTP Debugging 
     if (Options.HTTPDebugging) 
     { 
      Tools.HTTPDebugRequest(httpRequest); 
     } 

     httpResponse = (HttpWebResponse)httpRequest.GetResponse(); 

     //HTTP Debugging 
     if (Options.HTTPDebugging) 
     { 
      Tools.HTTPDebugResponse(httpResponse); 
     } 

     return httpResponse; 
    } 

Modifier

Pour clarifier ce que je l'ai déjà dit c'est pas un bug dans le Parser, c'est une question de la StreamReader lecture plus rapide que le flux de réponse fournit des données. Je peux contourner ce problème en procédant comme suit, mais je voudrais des suggestions de solutions meilleures ou plus élégantes:

  //Parse into a Graph based on Content Type 
      String ctype = httpResponse.ContentType; 
      IRDFReader parser = MIMETypesHelper.GetParser(ctype); 
      Stream response = httpResponse.GetResponseStream(); 
      MemoryStream temp = new MemoryStream(); 
      Tools.StreamCopy(response, temp); 
      response.Close(); 
      temp.Seek(0, SeekOrigin.Begin); 
      parser.Load(g, new StreamReader(temp)); 

Modifier 2

classe BlockingStreamReader selon la suggestion de Eamon:

/// <summary> 
/// A wrapper to a Stream which does all its Read() and Peek() calls using ReadBlock() to handle slow underlying streams (eg Network Streams) 
/// </summary> 
public sealed class BlockingStreamReader : StreamReader 
{ 
    private bool _peeked = false; 
    private int _peekChar = -1; 

    public BlockingStreamReader(StreamReader reader) : base(reader.BaseStream) { } 

    public BlockingStreamReader(Stream stream) : base(stream) { } 

    public override int Read() 
    { 
     if (this._peeked) 
     { 
      this._peeked = false; 
      return this._peekChar; 
     } 
     else 
     { 
      if (this.EndOfStream) return -1; 

      char[] cs = new char[1]; 
      base.ReadBlock(cs, 0, 1); 

      return cs[0]; 
     } 
    } 

    public override int Peek() 
    { 
     if (this._peeked) 
     { 
      return this._peekChar; 
     } 
     else 
     { 
      if (this.EndOfStream) return -1; 

      this._peeked = true; 

      char[] cs = new char[1]; 
      base.ReadBlock(cs, 0, 1); 

      this._peekChar = cs[0]; 
      return this._peekChar; 
     } 
    } 

    public new bool EndOfStream 
    { 
     get 
     { 
      return (base.EndOfStream && !this._peeked); 
     } 
    } 
} 

Modifier 3

Voici une solutio beaucoup améliorée n qui peut envelopper n'importe quel TextReader et fournir une propriété EndOfStream. Il utilise un tampon interne qui est rempli en utilisant ReadBlock() sur le TextReader enveloppé. Toutes les méthodes de lecture() du lecteur peut l'être définie en utilisant ce tampon, la taille de la mémoire tampon est configurable:

/// <summary> 
/// The BlockingTextReader is an implementation of a <see cref="TextReader">TextReader</see> designed to wrap other readers which may or may not have high latency. 
/// </summary> 
/// <remarks> 
/// <para> 
/// This is designed to avoid premature detection of end of input when the input has high latency and the consumer tries to read from the input faster than it can return data. All methods are defined by using an internal buffer which is filled using the <see cref="TextReader.ReadBlock">ReadBlock()</see> method of the underlying <see cref="TextReader">TextReader</see> 
/// </para> 
/// </remarks> 
public sealed class BlockingTextReader : TextReader 
{ 
    private char[] _buffer; 
    private int _pos = -1; 
    private int _bufferAmount = -1; 
    private bool _finished = false; 
    private TextReader _reader; 

    public const int DefaultBufferSize = 1024; 

    public BlockingTextReader(TextReader reader, int bufferSize) 
    { 
     if (reader == null) throw new ArgumentNullException("reader", "Cannot read from a null TextReader"); 
     if (bufferSize < 1) throw new ArgumentException("bufferSize must be >= 1", "bufferSize"); 
     this._reader = reader; 
     this._buffer = new char[bufferSize]; 
    } 

    public BlockingTextReader(TextReader reader) 
     : this(reader, DefaultBufferSize) { } 

    public BlockingTextReader(Stream input, int bufferSize) 
     : this(new StreamReader(input), bufferSize) { } 

    public BlockingTextReader(Stream input) 
     : this(new StreamReader(input)) { } 

    private void FillBuffer() 
    { 
     this._pos = -1; 
     if (this._finished) 
     { 
      this._bufferAmount = 0; 
     } 
     else 
     { 
      this._bufferAmount = this._reader.ReadBlock(this._buffer, 0, this._buffer.Length); 
      if (this._bufferAmount == 0 || this._bufferAmount < this._buffer.Length) this._finished = true; 
     } 
    } 

    public override int ReadBlock(char[] buffer, int index, int count) 
    { 
     if (count == 0) return 0; 
     if (buffer == null) throw new ArgumentNullException("buffer"); 
     if (index < 0) throw new ArgumentException("index", "Index must be >= 0"); 
     if (count < 0) throw new ArgumentException("count", "Count must be >= 0"); 
     if ((buffer.Length - index) < count) throw new ArgumentException("Buffer too small"); 

     if (this._bufferAmount == -1 || this._pos >= this._bufferAmount) 
     { 
      if (!this._finished) 
      { 
       this.FillBuffer(); 
       if (this.EndOfStream) return 0; 
      } 
      else 
      { 
       return 0; 
      } 
     } 

     this._pos = Math.Max(0, this._pos); 
     if (count <= this._bufferAmount - this._pos) 
     { 
      //If we have sufficient things buffered to fufill the request just copy the relevant stuff across 
      Array.Copy(this._buffer, this._pos, buffer, index, count); 
      this._pos += count; 
      return count; 
     } 
     else 
     { 
      int copied = 0; 
      while (copied < count) 
      { 
       int available = this._bufferAmount - this._pos; 
       if (count < copied + available) 
       { 
        //We can finish fufilling this request this round 
        int toCopy = Math.Min(available, count - copied); 
        Array.Copy(this._buffer, this._pos, buffer, index + copied, toCopy); 
        copied += toCopy; 
        this._pos += toCopy; 
        return copied; 
       } 
       else 
       { 
        //Copy everything we currently have available 
        Array.Copy(this._buffer, this._pos, buffer, index + copied, available); 
        copied += available; 
        this._pos = this._bufferAmount; 

        if (!this._finished) 
        { 
         //If we haven't reached the end of the input refill our buffer and continue 
         this.FillBuffer(); 
         if (this.EndOfStream) return copied; 
         this._pos = 0; 
        } 
        else 
        { 
         //Otherwise we have reached the end of the input so just return what we've managed to copy 
         return copied; 
        } 
       } 
      } 
      return copied; 
     } 
    } 

    public override int Read(char[] buffer, int index, int count) 
    { 
     return this.ReadBlock(buffer, index, count); 
    } 

    public override int Read() 
    { 
     if (this._bufferAmount == -1 || this._pos >= this._bufferAmount - 1) 
     { 
      if (!this._finished) 
      { 
       this.FillBuffer(); 
       if (this.EndOfStream) return -1; 
      } 
      else 
      { 
       return -1; 
      } 
     } 

     this._pos++; 
     return (int)this._buffer[this._pos]; 
    } 

    public override int Peek() 
    { 
     if (this._bufferAmount == -1 || this._pos >= this._bufferAmount - 1) 
     { 
      if (!this._finished) 
      { 
       this.FillBuffer(); 
       if (this.EndOfStream) return -1; 
      } 
      else 
      { 
       return -1; 
      } 
     } 

     return (int)this._buffer[this._pos + 1]; 
    } 

    public bool EndOfStream 
    { 
     get 
     { 
      return this._finished && (this._pos >= this._bufferAmount - 1); 
     } 
    } 

    public override void Close() 
    { 
     this._reader.Close(); 
    } 

    protected override void Dispose(bool disposing) 
    { 
     this.Close(); 
     this._reader.Dispose(); 
     base.Dispose(disposing); 
    } 
} 
+0

Ainsi, neuf ans après son introduction, vous êtes la première personne au monde à trouver que 'StreamReader' lit plus vite que le 'Stream' qu'il est censé lire, n'est-ce pas? –

+0

Non, je viens de me demander si quelqu'un avait des solutions plus élégantes que ce qui précède – RobV

+0

Solutions à quoi? Le 'StreamReader' ne lit pas plus vite que le' Stream'. –

Répondre

1

Sans connaître les détails de l'analyseur que vous utilisez, je ne peux que deviner le bug, mais il y a Êtes-vous conscient du fait que les Streams et les TextReaders peuvent lire moins d'octets/caractères que ce qui est requis?

Etes-vous conscient du fait que Streams et TextReaders peuvent lire moins d'octets/caractères que nécessaire?

En particulier, TextReader.Read (char [] tampon, int index, int count) de docs disent:

Valeur de retour

Type: System .. :: Int32.

Le nombre de caractères qui ont été lus. Le nombre sera inférieur à ou égal à compter, selon que les données sont disponibles dans le flux. Cette méthode renvoie zéro si elle est appelée lorsqu'il ne reste plus de caractères à lire.

Emphasis mine.

Par exemple, si vous appelez lecteur.Lire (tampon, 0, 100) ne peut pas supposer que 100 caractères ont été lus.

Éditer: Il est très probable que l'analyseur assume cela; et ceci explique votre comportement observé: si vous mettez complètement en cache le flux dans un MemoryStream, il y aura toujours suffisamment de caractères pour remplir la requête - mais si vous ne le faites pas, l'analyseur recevra moins de caractères que prévu à des moments imprévisibles. le flux sous-jacent est "lent".

Edit2: Vous pouvez corriger votre bug en remplaçant toutes les instances de TextReader.Read() dans l'analyseur avec TextReader.ReadBlock().

+0

Je le savais, je ne suis pas sûr que cela compte en tant que bogue dans StreamReader, il semble que ce soit plutôt la façon dont il se comporte lorsque le flux sous-jacent peut être lent. L'analyseur n'est pas le problème, si j'utilise le second fragment de code (ajouté à la question d'origine) qui lit le flux entier avant de l'analyser analyse correctement – RobV

+0

Ce _is_ un bogue dans l'analyseur avec une très forte probabilité. C'est par conception que si le flux sous-jacent est "lent", streamreader renvoie moins de caractères que demandé. L'utilisation d'un flux de mémoire en tant que flux sous-jacent permet à streamreader de toujours renvoyer le nombre de caractères complet - en travaillant autour du bogue dans l'analyseur. –

+0

L'analyseur utilise un tokeniser sous-jacent qui lit caractère par caractère en utilisant la méthode Read() donc vous avez probablement raison, je vais tester la chose ReadBlock() et accepter votre réponse si cela s'avère pour résoudre le problème – RobV

0

Pour soutenir un scénario de lecture de blocage, plutôt que le sous-classement StreamReader, vous pouvez sous-classe TextReader: cela évite des problèmes avec EndOfStream, et cela signifie que vous pouvez faire tout blocage du lecteur - pas seulement StreamReader s:

public sealed class BlockingReader : TextReader 
{ 
    bool hasPeeked; 
    int peekChar; 
    readonly TextReader reader; 

    public BlockingReader(TextReader reader) { this.reader = reader; } 

    public override int Read() 
    { 
     if (!hasPeeked) 
      return reader.Read(); 
     hasPeeked = false; 
     return peekChar; 
    } 

    public override int Peek() 
    { 
     if (!hasPeeked) 
     { 
      peekChar = reader.Read(); 
      hasPeeked = true; 
     } 
     return peekChar; 
    } 

    public override int Read(char[] buffer, int index, int count) 
    { 
     if (buffer == null) 
      throw new ArgumentNullException("buffer"); 
     if (index < 0) 
      throw new ArgumentOutOfRangeException("index"); 
     if (count < 0) 
      throw new ArgumentOutOfRangeException("count"); 
     if ((buffer.Length - index) < count) 
      throw new ArgumentException("Buffer too small"); 

     int peekCharsRead = 0; 
     if (hasPeeked) 
     { 
      buffer[index] = (char)peekChar; 
      hasPeeked = false; 
      index++; 
      count--; 
      peekCharsRead++; 
     } 

     return peekCharsRead + reader.ReadBlock(buffer, index, count); 
    } 

    protected override void Dispose(bool disposing) 
    { 
     try 
     { 
      if (disposing) 
       reader.Dispose(); 
     } 
     finally 
     { 
      base.Dispose(disposing); 
     } 
    } 
}