Streaming Data to HTTP using Akka Streams with Exponential Backoff on 429 Too Many Requests

HTTP/REST is probably the most used protocol to exchange data between different services, especially in today’s microservice world.

Before I even start, let me make it clear that this project/example builds on this blog post from Colin Breck: https://blog.colinbreck.com/backoff-and-retry-error-handling-for-akka-streams/.

I stumbled on his post while working on an integration project, but I wanted to build on it to include a couple more features. I also wanted to put together a little GitHub project that can be used to play with different features and settings, while making the example a little more general.

Working on integration projects using Akka Streams, I was looking for a way to stream potentially large amounts of data over HTTP, while slowing down when the server starts to complain.

Now, the ‘proper’ way that a server tells the client to slow down is by sending a 429 Too Many Requests code.

In this case, the behavior we want is exponential backoff: keep retrying (up to a maximum number of times), but wait an exponentially increasing amount of time between attempts (for instance, doubling it on every retry).

This is well explained in Colin’s blog post at https://blog.colinbreck.com/backoff-and-retry-error-handling-for-akka-streams/ : the idea is to build a small stream for each request and retry it on failure using Akka Streams’ built-in exponential backoff.

At first I was skeptical, since constructing a stream for every request seemed expensive; however, in my tests (run against a local HTTP server) each successful request took only a few milliseconds, including parsing the response.

One improvement I needed, however, was the ability to select which responses should be retried and which ones should just cause the stream to fail altogether.
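Here is a rough sketch of the main stream (the full version in the GitHub repo linked at the end also tracks how many times each id was attempted; Model, getResponse and handleResponse are shown further down, and the system/port names are just the ones used in these sketches):

import java.util.UUID
import scala.concurrent.Future

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Sink, Source}

implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
import system.dispatcher

// Shared kill switch: lets handleResponse abort the whole stream on fatal errors.
val killSwitch = KillSwitches.shared("http-stream")

val sum: Future[Int] =
  Source(1 to 10)
    .map(_ => Model.Request(UUID.randomUUID().toString)) // post a random UUID per event
    .via(killSwitch.flow)
    .mapAsync(parallelism = 4)(getResponse)              // each request gets its own retryable mini-stream
    .map(_.value)                                        // the test server returns 1 on success
    .runWith(Sink.fold(0)(_ + _))                        // materialized value: should be 10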

In the above code, we create the stream, posting simple data (a random UUID string) and receiving a simple response (a number). For the sake of simplicity, the server returns a 1 on success, so in the stream we calculate the sum of all the ones. The materialized value of the stream must then be equal to the number of events the source sends, in this case 10, no matter how many times individual requests were retried.

This is the model for our request and our response.
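A minimal sketch of what the model might look like (the exception names match the ones that show up in the logs below; the exact definitions are in the repo):

object Model {
  // What we post: a random UUID string.
  case class Request(id: String)

  // What we get back: the id plus a number (the test server returns 1 on success).
  case class Response(id: String, value: Int)

  // Thrown on a 429 so that the surrounding RestartSource retries with backoff.
  case object TooManyRequestsException extends Exception("429 Too Many Requests")

  // Thrown on a response we consider fatal (e.g. a 500) to terminate the whole stream.
  case object DatabaseUnexpectedException extends Exception("Unexpected server error")

  // Thrown once a single request has exhausted its retries.
  case class StreamFailedAfterMaxRetriesExceptionForId(id: String)
    extends Exception(s"Request $id failed after the maximum number of retries")
}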

The magic is done by the getResponse function, which creates a mini-stream with backoff, as described in Colin’s blog post:
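A minimal sketch of it (the local URL, the pool value and the exact backoff settings are assumptions, and the implicit system, materializer and dispatcher from the stream above are assumed to be in scope; see the repo for the real code):

import scala.concurrent.Future
import scala.concurrent.duration._

import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpEntity, HttpMethods, HttpRequest}
import akka.stream.scaladsl.{RestartSource, Sink, Source}

// A single pool handles connections, keep-alive, etc. for all requests.
val pool = Http().superPool[String]()

def getResponse(request: Model.Request): Future[Model.Response] =
  RestartSource
    .withBackoff(
      minBackoff = 10.milliseconds, // initial retry delay
      maxBackoff = 30.seconds,      // cap for the exponential growth
      randomFactor = 0.2,           // add some jitter
      maxRestarts = 10              // retry at most 10 times before giving up
    ) { () =>
      Source
        .single(
          (HttpRequest(
            method = HttpMethods.POST,
            uri = "http://localhost:8080/",
            entity = HttpEntity(request.id)), request.id))
        .via(pool)                          // emits (Try[HttpResponse], id)
        .mapAsync(parallelism = 1)(handleResponse)
    }
    .runWith(Sink.head)
    // Once the retries are exhausted the future fails; rethrow it as our own error.
    .recoverWith {
      case _ => Future.failed(Model.StreamFailedAfterMaxRetriesExceptionForId(request.id))
    }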

Now, there is a lot going on here. We’re connecting to our own test server, posting a UUID. We are using a superPool, which handles HTTP connections, keep-alive, etc., for us (see https://doc.akka.io/docs/akka-http/current/client-side/request-level.html#flow-based-variant).

We set all the parameters for exponential backoff (see https://doc.akka.io/docs/akka/2.5/stream/operators/RestartSource/withBackoff.html). The request will be tried a maximum of 10 times before failing. We catch the failure and rethrow it as our own error.

Now, the last part is the handleResponse function. You’ll likely need to change this function to fit your needs, since it depends on how the server you’re dealing with behaves. Some servers will actually rate-limit your requests by sending back 429 errors, but others may not be sophisticated enough and will just start failing with 500s.

It also depends on your requirements: there may be a condition (like a 500) that means you’re done streaming. The code below shows how to handle the two situations differently, applying exponential backoff on 429 and terminating the stream on 500:
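A sketch along those lines (the status-code choices and the shared kill switch are the ones described in this post; adapt them to your server):

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.Materializer

def handleResponse(in: (Try[HttpResponse], String))(
    implicit mat: Materializer, ec: ExecutionContext): Future[Model.Response] = in match {

  // We could not even connect: fail this mini-stream so the backoff retries it.
  case (Failure(exception), _) =>
    Future.failed(exception)

  // The server is rate limiting us: discard the entity and retry with backoff.
  case (Success(response), _) if response.status == StatusCodes.TooManyRequests =>
    response.entity.discardBytes()
    Future.failed(Model.TooManyRequestsException)

  // A fatal condition: abort the main stream through the kill switch and fail.
  case (Success(response), _) if response.status == StatusCodes.InternalServerError =>
    response.entity.discardBytes()
    killSwitch.abort(Model.DatabaseUnexpectedException)
    Future.failed(Model.DatabaseUnexpectedException)

  // Success: parse the body (the test server returns "1") into our response model.
  case (Success(response), id) =>
    Unmarshal(response.entity).to[String].map(s => Model.Response(id, s.trim.toInt))
}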

It’s worth noting that exponential backoff also happens when you can’t connect to the server at all: we first handle the Try, which is a Failure if the connection itself fails, and then handle the different status codes within a Success, which means we got a response from the server, but the server may be returning an error.

To test it, we can first create a server that randomly returns either OK or Too Many Requests:
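A sketch of such a test server (the port and the error probability are assumptions; the repo’s version lets you switch the error code to InternalServerError, which we’ll use further down):

import scala.util.Random

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer

object TestServer extends App {
  implicit val system = ActorSystem("TestServer")
  implicit val materializer = ActorMaterializer()

  val errorProbability = 0.3                   // raise this to see more errors
  val errorCode = StatusCodes.TooManyRequests  // switch to InternalServerError to kill the stream

  val route = post {
    if (Random.nextDouble() < errorProbability) {
      println("Return error")
      complete(errorCode -> "slow down")
    } else {
      println("Return ok")
      complete("1")                            // the client sums these 1s
    }
  }

  println("Starting HTTP server")
  Http().bindAndHandle(route, "localhost", 8080)
}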

You’ll see output like the following:

Return ok
Return ok
Return error
Return ok
[WARN] [03/12/2019 13:12:25.770] [QuickStart-akka.actor.default-dispatcher-9] [RestartWithBackoffSource(akka://QuickStart)] Restarting graph due to failure. stack_trace:
com.nuvola_tech.akka.streams.Model$TooManyRequestsException$
....

Return ok
(Sum : 10,
Map(6ab726e5-78c9-44ed-830b-c94dff66c7c3 -> 1,
f931f6fa-7917-48ad-bb58-5a5ec53f65ee -> 1,
d3ece91a-2b5e-4868-bc43-f60ef1d657dd -> 2,
7e943c2f-fc08-44fa-8667-be1b0e7253d7 -> 1,
5496e659-9083-49cd-8864-88af82e49c20 -> 1,
0dc9a0ab-e0ac-45e9-be26-a1bfdf8dd5ce -> 2,
64486cfc-a1ac-4663-aed5-51bd547765bb -> 1,
b39521a2-7592-4508-bf6f-81a7cedd33a5 -> 1,
33e8e8ed-c7f9-49e4-bd1c-e6ce83e2150c -> 1,
56b6547b-a635-48cb-98aa-004de4abff65 -> 1))
Execution time avg: 98: List(46, 3, 3, 371, 3, 3, 4, 351)

all terminated

The (edited) output above shows that two tries failed and were retried. This is also reflected in the execution times. The sum is correct (10).

If we set the error probability to 1, the stream will fail after retrying a request up to the maximum number of times, with com.nuvola_tech.akka.streams.Model$StreamFailedAfterMaxRetriesExceptionForId.

Lastly, if you set the test server to return InternalServerError, the stream will terminate at the first error:

Starting HTTP server
Return ok
Return ok
Return ok
Return error
[WARN] [03/12/2019 13:20:31.399] [QuickStart-akka.actor.default-dispatcher-10] [RestartWithBackoffSource(akka://QuickStart)] Restarting graph due to failure. stack_trace:
com.nuvola_tech.akka.streams.Model$DatabaseUnexpectedException
Return ok
(Sum : 4,
Map(85966d9d-bbb3-4560-a06f-784cc20f043f -> 1,
42503ccb-b194-4d21-a831-f92a9ca21e47 -> 1,
5a83b3ef-4816-4b77-8058-df7ee90c9bc1 -> 2,
5d716bcd-42c9-4a76-9503-b662d10d1455 -> 1))
Execution time avg: 337: List(351, 324)

Note that the sum is now 4, because only 4 calls were successful, but also that the stream managed to retry one request before the kill switch (which works in parallel) terminated it. This was OK in my case.

You can play with the code on github here: https://github.com/nuvolatech/akka-streams-http-example

Basic Authorization and htaccess-style authentication on the Play Framework and Silhouette

Silhouette is probably the best library to implement authentication and authorization within the Play Framework.

Git repo here : https://github.com/rcongiu/play-silhouette-basic-auth

It is very powerful, as you can manage a common identity from multiple providers, so you can have users logging into your site through Google, Facebook, JWT, and many other methods.

It also allows you to fine-tune both authentication (that a user has valid credentials) and authorization (that, after being authenticated, the user also has the right permissions to access a particular resource).

Its API is very elegant: you can just change the type of your controller Action to a SecuredAction.
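For example, a secured controller might look roughly like this (a sketch, assuming an Env type called DefaultEnv and a User identity like the ones we set up below):

import javax.inject.Inject

import com.mohiva.play.silhouette.api.Silhouette
import play.api.mvc.{AbstractController, ControllerComponents}
import utils.auth.DefaultEnv

class ApplicationController @Inject()(
    cc: ControllerComponents,
    silhouette: Silhouette[DefaultEnv]) extends AbstractController(cc) {

  // SecuredAction only runs the block if the request carries valid credentials;
  // the authenticated User is available as request.identity.
  def index = silhouette.SecuredAction { implicit request =>
    Ok(s"Hello, ${request.identity.username}")
  }
}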

It is, however, a pretty sizable framework, and it can be daunting for a beginner. It needs you to set up identities, often in a database, and you have to build your own user-credential management.

Sometimes, however, you may just want very simple authentication, for example when prototyping or writing a proof of concept (POC). You may also be tasked with replacing an application running on Apache or NGINX that uses an htpasswd password file.

I looked around for an example of implementing Basic Authentication with Play and was pretty surprised that I couldn’t quite find one.
Let me be clear here: Basic Authentication is not the best idea for security, but sometimes you just need something that does the job of protecting your app with a password, and you don’t have the time to deal with full-blown user management.
As a side note, if you’re working with containers, you could use an nginx reverse proxy to manage authentication, but sometimes you can’t do that. In my case, the Play application had specific logic to execute on authentication failure, so I couldn’t just delegate it to nginx.
Or, as I just said, you may simply want to be backward compatible with an older app that uses an htpasswd file.

In this case, you can use the code here as a template.

Setting up the Dependency Injection and the Environment

Silhouette relies heavily on DI to configure its objects. First of all, you have to configure an Environment, where you declare what your user model is and what is going to authenticate the user.

From Silhouette’s documentation, an environment may look like:
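Something along these lines (in the documentation the authenticator is typically a CookieAuthenticator; here I already use the DummyAuthenticator discussed below, and DefaultEnv is just the name used in these sketches):

// utils/auth/DefaultEnv.scala: ties together the identity type and the authenticator type
package utils.auth

import com.mohiva.play.silhouette.api.Env
import com.mohiva.play.silhouette.impl.authenticators.DummyAuthenticator

trait DefaultEnv extends Env {
  type I = User               // our identity
  type A = DummyAuthenticator // no state kept between requests (Basic Auth)
}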

We need our User definition. It depends on your case: the User class will hold all the relevant user information you want passed to the controller.
In this example, I will keep it minimal and only keep the username there. In more complex cases you may want the date of last login, the email, the name, etc.

Our definition is again the simplest; this is our utils/auth/User.scala:
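A sketch of it (the real file is in the repo):

// utils/auth/User.scala
package utils.auth

import com.mohiva.play.silhouette.api.Identity

// The identity we pass to controllers: just the username in this minimal example.
case class User(username: String) extends Identity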

And in the Module file, where we define all the DI and build the environment, we have:
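The essential part of Module.scala might look roughly like this (a sketch under several assumptions: the Environment factory, DummyAuthenticatorService and BasicAuthProvider come from Silhouette, while the bindings for the password DAO, the hashers, BasicAuthProvider’s own dependencies and Silhouette itself are omitted; see the repo’s Module.scala for the real thing):

import javax.inject.Singleton

import scala.concurrent.ExecutionContext.Implicits.global

import com.google.inject.{AbstractModule, Provides}
import com.mohiva.play.silhouette.api.{Environment, EventBus}
import com.mohiva.play.silhouette.impl.authenticators.DummyAuthenticatorService
import com.mohiva.play.silhouette.impl.providers.BasicAuthProvider
import utils.auth.{ConfigUserServiceImpl, DefaultEnv, UserService}

class Module extends AbstractModule {

  override def configure(): Unit = {
    bind(classOf[UserService]).to(classOf[ConfigUserServiceImpl])
    // The bindings for the password DAO (htpasswd or config based), the password
    // hashers and Silhouette itself also go here; see the repo.
  }

  // Build the Silhouette environment: our UserService, a DummyAuthenticatorService
  // (no state between requests) and the BasicAuthProvider as the request provider.
  @Provides @Singleton
  def provideEnvironment(
      userService: UserService,
      basicAuthProvider: BasicAuthProvider,
      eventBus: EventBus): Environment[DefaultEnv] =
    Environment[DefaultEnv](
      userService,
      new DummyAuthenticatorService,
      Seq(basicAuthProvider),
      eventBus
    )
}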

This is actually what does most of the magic. A few comments:

  1. We use @Provides for the method that builds the environment.
  2. We use a simple UserService that creates the User object from the LoginInfo, using the username that was entered (see the sketch after this list).
  3. We create both an abstract UserService trait and an implementation, ConfigUserServiceImpl.
  4. We use DummyAuthenticator for Basic Authorization, because an Authenticator is only needed when some state is saved between two HTTP requests (cookie, session), while in Basic Authentication every request is authenticated through a request provider (the BasicAuthProvider).
  5. The request provider is passed into the environment and dynamically injected.
  6. The password could be stored anywhere. I wrote two classes to retrieve it, one from an htpasswd file and one from the Play config; you bind one or the other in Module.scala.
  7. You also have to pick the hashers, since the password is usually stored hashed. This is also done in Module.scala.
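A sketch of the UserService mentioned in point 2 (the real code is in the repo):

// utils/auth/UserService.scala
package utils.auth

import scala.concurrent.Future

import com.mohiva.play.silhouette.api.LoginInfo
import com.mohiva.play.silhouette.api.services.IdentityService

trait UserService extends IdentityService[User]

class ConfigUserServiceImpl extends UserService {
  // Build the User straight from the LoginInfo: the provider key is the username entered.
  override def retrieve(loginInfo: LoginInfo): Future[Option[User]] =
    Future.successful(Some(User(loginInfo.providerKey)))
}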

Using htpasswd

You can use Apache’s htpasswd to generate a password file to be used with these classes:
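For example (the file name and user are placeholders; -c creates the file and -B forces bcrypt, the only scheme supported here):

htpasswd -cB .htpasswd myuser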

If you’re using htpasswd, you have to bind the htpasswd-backed password implementation in Module.scala and configure where the htpasswd file is via the Play configuration setting security.htpasswd.file.

That class reads the htpasswd file, retrieves the user’s hashed password, and compares it to the hash of the supplied password.
Note that only bcrypt is supported (no MD5 or crypt).

Hashing, or not hashing

Sometimes you want the password stored in cleartext. It is insecure, but it may be fine for a simple prototype; in that case, use the cleartext option instead of the bcrypt one (see Module.scala in the repo).

I can’t stress enough how insecure it is to use a cleartext password, but sometimes you may be using a combination of other systems and it may be your only choice. I list it here as a last resort kind of tool.

Hope you enjoyed this article about implementing the simplest yet quickest kind of authentication on a Play app.
If you want to integrate it quickly, check it out from GitHub, plug in the auth directory, and add the bindings in Module.scala.
Git repo here : https://github.com/rcongiu/play-silhouette-basic-auth

Schema Evolution with Hive and Parquet using partitioned views

One cool feature of Parquet is that it supports schema evolution. But let’s take a step back and discuss what schema evolution means.

As every DBA knows, data definitions can change over time: we may want to add a new column, remove one that is obsolete, or do more complex things, for instance break down one column into multiple columns, like splitting a string address “1234 Spring Road, Springfield, MA” into its components (number, street, city, state).

Schema changes can, unfortunately, break some queries that users may run. If, for instance, we change the type or name of a column, any query that uses that column will fail.

It’s worth noticing that if we add a new column, old queries will still work. Existing records will have NULL as the value for the new column, unless we populate it ourselves.

Continue reading “Schema Evolution with Hive and Parquet using partitioned views”