My TransientFaultHandling utility classes for DocumentDB

Keluro makes extensive use of DocumentDB for data persistence. However, its scaling capabilities come at a price: your queries or commands may exceed the amount of request units (RUs) you are granted. In that case you will receive a 429 error, “Request rate too large”, surfaced as a DocumentClientException if you use the .NET SDK. It is then your responsibility to implement retry policies that avoid such failures by waiting the proper amount of time before retrying.

Edit: see the comments below. Release v1.8.0 of the .NET SDK provides settings for these retry policies.
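Going by the description in that comment, the built-in retry behaviour could be configured roughly as follows. This is only a sketch: the two property names are the ones quoted in the comment, the values are the defaults it mentions, and the factory class name, endpointUrl and authorizationKey are placeholders of mine.

```csharp
using System;
using Microsoft.Azure.Documents.Client;

// Hypothetical helper, not part of the SDK or of the classes described in this post.
public static class ThrottlingAwareClientFactory
{
    public static DocumentClient Create(string endpointUrl, string authorizationKey)
    {
        var connectionPolicy = new ConnectionPolicy();

        // Number of retries after a 429 before the SDK gives up
        // (9 retries means 10 attempts in total, per the comment below).
        connectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;

        // Upper bound, in seconds, on the cumulated wait for one request.
        connectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;

        return new DocumentClient(new Uri(endpointUrl), authorizationKey, connectionPolicy);
    }
}
```

With these settings the SDK retries throttled requests by itself, so the wrapper classes below mostly remain useful for logging and for abstracting the client behind an interface.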

Microsoft provides some samples showing how to handle this 429 “Request rate too large” error, but they only cover commands, such as inserting or deleting a document; there is no sample showing how to implement retry policies for common queries. A NuGet package is also available, “Microsoft.Azure.Documents.Client.TransientFaultHandling”, but even though integrating it takes the blink of an eye, it has no logging capabilities. In my case it did not really solve my RU problem; I even doubt that I got it to work, and the code is not open source. So I decided to integrate the ideas from the samples into my own utility classes on top of the DocumentDB .NET SDK.

The idea is similar to the “TransientFaultHandling” package: wrap the DocumentClient inside another class that is exposed only through an interface. In any case, it is a good thing to abstract the DocumentClient behind an interface for testability purposes. In our case this interface is named IDocumentClientWrapped.

public interface IDocumentClientWrapped : IDisposable
{
    /* Command-like methods */
    Task DeleteDocumentAsync(Uri uri);

    Task CreateDocumentAsync(Uri uri, object obj);

    /* Query-like methods */
    IRetryQueryable<T> CreateDocumentQuery<T>(Uri documentCollectionUri, FeedOptions feedOptions = null);

    IRetryQueryable<T> CreateDocumentQuery<T>(Uri documentCollectionUri, SqlQuerySpec sqlSpec);

    /* Copy here all the other public method signatures from DocumentClient, but return IRetryQueryable instead of IOrderedQueryable or IQueryable */
}

Instead of returning an IQueryable<T> instance as DocumentClient would do, we return an IRetryQueryable<T>. This latter type, whose definition follows, is also a wrapper around the IQueryable<T> instance returned by the DocumentDB client. However, it explicitly retries when the enumeration fails because of a 429 “Request rate too large” exception raised by the database engine, DocumentDB in our case.

public interface IRetryQueryable<T>
{
    IRetryQueryable<T> Where(Expression<Func<T, bool>> predicate);

    IDocumentQuery<T> AsDocumentQuery();

    IEnumerable<T> AsRetryEnumerable();

    IRetryQueryable<TResult> SelectMany<TResult>(Expression<Func<T, IEnumerable<TResult>>> predicate);

    IRetryQueryable<TResult> Select<TResult>(Expression<Func<T, TResult>> predicate);
}

In this interface we only expose the extension methods that are actually supported by the “real” IQueryable<T> instance returned by DocumentDB: Select, SelectMany, Where, etc. For example, at the time of writing GroupBy is not supported; you would get a runtime exception if you used it directly on the IQueryable<T> instance returned by DocumentClient.

Now look at how we use these interfaces in the calling code.

IDocumentClientWrapped client = /* instantiation */;
IRetryQueryable<MyType> query = client.CreateDocumentQuery<MyType>(/* get the usual URI using the UriFactory */);
IEnumerable<string> results = query.Where(c => c.Member1 == "SomeValue").Select(c => c.StringMember2).AsRetryEnumerable();
/* manipulate your in-memory enumeration as you wish */

Independently of the retry policy classes and the “Request rate too large” errors, let me emphasize that LINQ can be tricky here. Indeed, the piece of code above is completely different from this one:

IDocumentClientWrapped client = /* instantiation */;
IRetryQueryable<MyType> query = client.CreateDocumentQuery<MyType>(/* get the usual URI using the UriFactory */);
IEnumerable<string> results = query.AsRetryEnumerable().Where(c => c.Member1 == "SomeValue").Select(c => c.StringMember2);
/* manipulate your in-memory enumeration as you wish */

In this case the Where constraint is performed “in memory” by your .NET application server, which means that you have fetched all the data from DocumentDB to your app server. If MyType contains a lot of data, all of it is transferred from DocumentDB to your application server, and if the Where constraint filters out a lot of documents you will probably have a bottleneck.
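The difference comes from which Where overload the compiler binds to. Here is a small sketch that uses a plain in-memory array instead of DocumentDB (the class and method names are mine, for illustration only): before the switch to IEnumerable<T>, Where is recorded in an expression tree that a query provider can translate; after the switch, it is an ordinary delegate that runs in your process.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

// Illustration only: an array stands in for the DocumentDB query provider.
public static class LinqBindingDemo
{
    // True when the sequence is still an IQueryable<T> whose outermost
    // expression is a Where call, i.e. something a provider could translate.
    public static bool IsTranslatableWhere<T>(IEnumerable<T> sequence)
    {
        var queryable = sequence as IQueryable<T>;
        if (queryable == null)
        {
            return false;
        }
        var call = queryable.Expression as MethodCallExpression;
        return call != null && call.Method.Name == "Where";
    }

    public static void Demo()
    {
        IQueryable<int> source = new[] { 1, 2, 3, 4 }.AsQueryable();

        // Binds to Queryable.Where: the filter is kept as an expression tree.
        IQueryable<int> beforeSwitch = source.Where(x => x > 2);

        // Binds to Enumerable.Where: the filter is a compiled delegate.
        IEnumerable<int> afterSwitch = source.AsEnumerable().Where(x => x > 2);

        Console.WriteLine(IsTranslatableWhere(beforeSwitch)); // True
        Console.WriteLine(IsTranslatableWhere(afterSwitch));  // False

        // Both produce the same items; only where the filtering happens differs.
        Console.WriteLine(beforeSwitch.ToList().SequenceEqual(afterSwitch)); // True
    }
}
```

The results are identical in both cases, which is exactly why this mistake is easy to miss: only the amount of data leaving the database changes.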

Let us get back to our problem. Now that we have seen that having a retry policy for a query only means calling AsRetryEnumerable() instead of AsEnumerable(), let us jump to the implementation of those classes.

The idea is to use an IEnumerator that “retries”, together with two utility methods: ExecuteWithRetry and ExecuteWithRetriesAsync. The former is for basic synchronous calls, while the latter is for the async/await context. Most of this code is verbose because it is only a wrapping implementation. I hope it will be helpful to others.

public class DocumentClientWrapped : IDocumentClientWrapped
{
    private class RetriableEnumerable<T> : IEnumerable<T>
    {
        private readonly IEnumerable<T> _t;
        public RetriableEnumerable(IEnumerable<T> t)
        {
            _t = t;
        }

        public IEnumerator<T> GetEnumerator()
        {
            return new RetriableEnumerator<T>(ExecuteWithRetry(() => _t.GetEnumerator()));
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return this.GetEnumerator();
        }
    }

    private class RetriableEnumerator<T> : IEnumerator<T>
    {
        private IEnumerator<T> _t;
        public RetriableEnumerator(IEnumerator<T> t)
        {
            _t = t;
        }

        public T Current
        {
            get
            {
                return ExecuteWithRetry(()=> _t.Current);
            }
        }

        object IEnumerator.Current
        {
            get
            {
                return ExecuteWithRetry(() => _t.Current);
            }
        }

        public void Dispose()
        {
            _t.Dispose();
        }

        public bool MoveNext()
        {
            return ExecuteWithRetry(() => _t.MoveNext());
        }

        public void Reset()
        {
            _t.Reset();
        }
    }

    private class RetryQueryable<T> : IRetryQueryable<T>
    {
        private readonly IQueryable<T> _queryable;
        public RetryQueryable(IQueryable<T> queryable)
        {
            _queryable = queryable;
        }

        public IDocumentQuery<T> AsDocumentQuery()
        {
            return this._queryable.AsDocumentQuery();
        }

        public IEnumerable<T> AsRetryEnumerable()
        {
            return new RetriableEnumerable<T>(this._queryable.AsEnumerable());
        }

        public IRetryQueryable<T> Where(Expression<Func<T, bool>> predicate)
        {
            var queryable = this._queryable.Where(predicate);
            return new RetryQueryable<T>(queryable);
        }

        public IRetryQueryable<TResult> SelectMany<TResult>(Expression<Func<T, IEnumerable<TResult>>> predicate)
        {
            var queryable = this._queryable.SelectMany(predicate);
            return new RetryQueryable<TResult>(queryable);
        }

        public IRetryQueryable<TResult> Select<TResult>(Expression<Func<T, TResult>> predicate)
        {
            var queryable = this._queryable.Select(predicate);
            return new RetryQueryable<TResult>(queryable);
        }
    }


    private const int MaxRetryCount = 10;

    private static async Task<V> ExecuteWithRetriesAsync<V>(Func<Task<V>> function)
    {
        TimeSpan sleepTime = TimeSpan.FromSeconds(1.0);
        int count = 0;
        while (true)
        {

            try
            {
                return await function();
            }
            catch (DocumentClientException de)
            {
                if ((int)de.StatusCode != 429)
                {
                    throw;
                }
                if (++count > MaxRetryCount)
                {
                    throw new MaxRetryException(de, count);
                }
                /* count was already incremented above; log the delay that will actually be used */
                sleepTime = de.RetryAfter;
                Trace.TraceInformation("DocumentDB async retry count: {0} - retry in: {1} - RUs: {2}", count, sleepTime.TotalMilliseconds, de.RequestCharge);
            }
            catch (AggregateException ae)
            {
                if (!(ae.InnerException is DocumentClientException))
                {
                    throw;
                }

                DocumentClientException de = (DocumentClientException)ae.InnerException;
                if ((int)de.StatusCode != 429)
                {
                    throw;
                }
                if (++count > MaxRetryCount)
                {
                    throw new MaxRetryException(de, count);
                }
                /* count was already incremented above; log the delay that will actually be used */
                sleepTime = de.RetryAfter;
                Trace.TraceInformation("DocumentDB async retry count: {0} - retry in: {1} - RUs: {2}", count, sleepTime.TotalMilliseconds, de.RequestCharge);
            }
            await Task.Delay(sleepTime);
        }
    }

    private static V ExecuteWithRetry<V>(Func<V> function)
    {
        TimeSpan sleepTime = TimeSpan.FromSeconds(1.0);
        int count = 0;
        while (true)
        {
            try
            {
                return function();
            }
            catch (DocumentClientException de)
            {
                if ((int)de.StatusCode != 429)
                {
                    throw;
                }
                if (++count > MaxRetryCount)
                {
                    throw new MaxRetryException(de, count);
                }
                /* count was already incremented above; log the delay that will actually be used */
                sleepTime = de.RetryAfter;
                Trace.TraceInformation("DocumentDB sync retry count: {0} - retry in: {1} - RUs: {2}", count, sleepTime.TotalMilliseconds, de.RequestCharge);
            }
            catch (AggregateException ae)
            {
                if (!(ae.InnerException is DocumentClientException))
                {
                    throw;
                }

                DocumentClientException de = (DocumentClientException)ae.InnerException;
                if ((int)de.StatusCode != 429)
                {
                    throw;
                }
                if (++count > MaxRetryCount)
                {
                    throw new MaxRetryException(de, count);
                }
                /* count was already incremented above; log the delay that will actually be used */
                sleepTime = de.RetryAfter;
                Trace.TraceInformation("DocumentDB sync retry count: {0} - retry in: {1} - RUs: {2}", count, sleepTime.TotalMilliseconds, de.RequestCharge);
            }
            Thread.Sleep(sleepTime);
        }
    }

    private readonly DocumentClient _client;
    public DocumentClientWrapped(string endpointUrl, string authorizationKey)
    {
        _client = new DocumentClient(new Uri(endpointUrl), authorizationKey);
    }

    public async Task CreateDocumentAsync(Uri documentCollectionUri, object obj)
    {
        ResourceResponse<Document> response = await ExecuteWithRetriesAsync(()=> _client.CreateDocumentAsync(documentCollectionUri, obj));
        LogResponse<Document>("CreateDocumentAsync", response);/* Do something with the response if you want to use it */
    }

    public async Task DeleteDocumentAsync(Uri documentUri)
    {
        ResourceResponse<Document> response = await ExecuteWithRetriesAsync( () => _client.DeleteDocumentAsync(documentUri));
        LogResponse<Document>("DeleteDocumentAsync", response); /* Do something with the response if you want to use it */
    }

    public async Task ReplaceDocumentAsync(Uri documentUri, object document)
    {
        var response = await ExecuteWithRetriesAsync(() => _client.ReplaceDocumentAsync(documentUri, document));
        LogResponse<Document>("ReplaceDocumentAsync", response);
    }

    public IRetryQueryable<T> CreateDocumentQuery<T>(Uri documentCollectionUri, SqlQuerySpec sqlSpec)
    {
        var queryable = _client.CreateDocumentQuery<T>(documentCollectionUri, sqlSpec);
        return new RetryQueryable<T>(queryable);
    }

    public IRetryQueryable<T> CreateDocumentQuery<T>(Uri documentCollectionUri, FeedOptions feedOptions = null)
    {
        var queryable = _client.CreateDocumentQuery<T>(documentCollectionUri, feedOptions);
        return new RetryQueryable<T>(queryable);
    }

    /* Required by IDisposable, which IDocumentClientWrapped extends */
    public void Dispose()
    {
        _client.Dispose();
    }
}
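
The class above throws a MaxRetryException that is not shown in this post. Here is a minimal sketch of what it could look like, assuming it only needs to carry the last throttling exception and the number of attempts; the message text and the RetryCount property are my own choices.

```csharp
using System;

// Hypothetical definition: the post uses MaxRetryException without showing it.
public class MaxRetryException : Exception
{
    private readonly int _retryCount;

    // lastError is the final 429 exception; retryCount is the number of attempts made.
    public MaxRetryException(Exception lastError, int retryCount)
        : base(string.Format("Giving up after {0} throttled attempts.", retryCount), lastError)
    {
        _retryCount = retryCount;
    }

    public int RetryCount
    {
        get { return _retryCount; }
    }
}
```

Calling code can then catch MaxRetryException specifically and read InnerException to get the RetryAfter and RequestCharge values of the last failure.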

4 thoughts on “My TransientFaultHandling utility classes for DocumentDB”

  1. Rajesh Nagpal

    Hi Benoit,

    Glad to hear that you use DocumentDB as your NoSQL data store. I went through your post and wanted to mention that we have built-in support for basic retry mechanism for 429s for all requests(query as well as commands) starting .NET SDK 1.8.0 release of DocumentDB.

    This is done by exposing a RetryOptions property on the ConnectionPolicy instance that gets passed to DocumentClient constructor. By default, all requests will be retried 9 times(so that you have 10 attempts for each request) and it will use the retryAfter response header to determine how much to wait between each request. There is a max wait time set to 30 sec for each request after which it will throw. Both these values(MaxRetryAttemptsOnThrottledRequests and MaxRetryWaitTimeInSeconds) can be overridden on the RetryOptions instance.

    Please give it a try and let us know if you have any feedback to further improve the retry mechanism.

    We will look into the tracing issue that you mentioned to see if there are better ways to expose the retry related information for each request.

    I’m from the DocumentDB Engineering team and feel free to reach us for any .NET SDK related issues in DocumentDB at https://github.com/Azure/azure-documentdb-dotnet/issues

    Regards,
    Rajesh

    1. benoitpatra Post author

      Hi Rajesh,
      Thank you very much for your insights. Indeed, my .NET SDK package has an update waiting: version 1.8.0. I will definitely have a look at this! Maybe you should mark the NuGet package “Microsoft.Azure.Documents.Client.TransientFaultHandling” as deprecated.

      1. Rajesh Nagpal

        Hi Benoit,

        The TransientFaultHandling package does some more things, like retrying on some transient errors that the SDK doesn’t support yet. Eventually, we plan to deprecate it once we have parity.

        Keep me posted on your experience of using the built-in retry policy in the .NET SDK.

        Regards,
        Rajesh

        1. benoitpatra Post author

          I have upgraded to .NET SDK 1.8.0 and set the retry policy options in my pre-production environment. It seems to work well, but I have not conducted any measurements yet.
