Making a streaming API from scraped data using Clojure
I recently found myself having to play around with some stock
exchange data. The stock exchange in Nepal, unsurprisingly, doesn't
provide a data API so I had to scrape their website. The non-realtime
data isn't very interesting, just regular old scraping made a little
more tedious by the fact that whoever designed the website didn't
know how to use HTML id attributes.
Now, to the live trading data. For the live data, the website shows a ticker of stock prices, which I think is a really bad representation of the data. If you want to know the price ZXY was traded at, you have to wait till the end of the ticker. If the ZXY stock was all you were interested in, you'd still have to sit through the rest of the ticker. And to get the actual live data, you have to hit refresh. This is kind of okay on TV, but having to do this on a computer is terrible. Computers are more interactive than TV sets and should be treated as such. Bret Victor has given a great talk titled "Stop Drawing Dead Fish" that conveys this in a much more articulate way. The talk is about art, but I think having data represented on a ticker is like drawing dead fish.
So, I got around to thinking about how to build a better interface for
the live trading data. To do that, I first had to build a streaming
API which pushes stock prices as the trades happen. And doing that
wasn't all that complicated, thanks to clojure.data/diff, watches
and http-kit.
The first step is to pull in the page and scrape out the ticker data to get a map of the latest trades for each company like this:
{"ABC" {:price 100
        :volume 12}
 "FOO" {:price 432
        :volume 22}
 "BAR" {:price 94
        :volume 34}}
I used laser for the scraping. You could use that, and Enlive seems to be great too. I won't go into the details of the scraping.
Store this in an atom; let's call it current-prices. After, say,
5 seconds, when we scrape again, new trades will have happened and
the map we get will differ from the one above:
{"ABC" {:price 100
        :volume 12}
 "FOO" {:price 434   ;; this has changed
        :volume 300}
 "BAR" {:price 90    ;; this too
        :volume 25}}
Since we called our atom current-prices, it would be sensible to
reset! it now to hold the second, more recent map of trading data.
It's nice that we now have the trading data in a Clojure data
structure, but note that reset!-ing our atom is really just the
equivalent of refreshing our browser. We aren't done yet.
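The scrape-and-reset! step could be sketched roughly like this. It is a minimal sketch under assumptions, not the exact code I ran: scrape-fn stands in for whatever function does the scraping (hypothetical here), and the helper names refresh-prices! and start-polling! are made up for illustration.

```clojure
;; current-prices holds the latest trade for each company, keyed by symbol.
(def current-prices (atom {}))

(defn refresh-prices!
  "Calls scrape-fn, a hypothetical zero-argument function that returns a
  map like the ones above, and stores the result in current-prices."
  [scrape-fn]
  (reset! current-prices (scrape-fn)))

(defn start-polling!
  "Refreshes current-prices every interval-ms milliseconds on a background
  thread. Returns the future, so the loop can be stopped with future-cancel."
  [scrape-fn interval-ms]
  (future
    (loop []
      (refresh-prices! scrape-fn)
      (Thread/sleep (long interval-ms))
      (recur))))
```

With a real scraping function in hand, something like (start-polling! scrape-ticker 5000) would refresh the atom every 5 seconds.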
Now, Clojure comes with a handy function called diff which is in
the clojure.data namespace. Here's how it works:
(require 'clojure.data)

(clojure.data/diff {:a 42 :b "foo"}
                   {:a 43 :b "foo"})
;; ({:a 42} {:a 43} {:b "foo"})
The diff function tells how one data structure varies from another.
It returns three items. The first shows the key-value pairs that
exist in the first argument but not in the second; the second shows
the pairs that exist only in the second argument. And the third shows
the pairs that exist in both.
diff works on seqs too, but we won't bother with that right now.
Let's see what we get when we diff the older and newer versions of
our current-prices atom:
(clojure.data/diff
 {"ABC" {:price 100
         :volume 12}
  "FOO" {:price 432
         :volume 22}
  "BAR" {:price 94
         :volume 34}}
 {"ABC" {:price 100
         :volume 12}
  "FOO" {:price 434   ;; this has changed
         :volume 300}
  "BAR" {:price 90    ;; this too
         :volume 25}})
;; ({"FOO" {:price 432, :volume 22}, "BAR" {:price 94, :volume 34}}
;;  {"FOO" {:price 434, :volume 300}, "BAR" {:price 90, :volume 25}}
;;  {"ABC" {:price 100, :volume 12}})
Great. This is telling us that no trade happened for ABC. For FOO and BAR this is showing the older and newer trading data.
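One thing worth keeping in mind is that diff recurses into nested maps, so when only one field of a trade changes, only that field shows up in the result. Here's a small sketch with made-up numbers where the volume of FOO changes but the price does not:

```clojure
(require 'clojure.data)

;; Made-up data: only the volume of FOO changes, the price stays the same.
(def older {"FOO" {:price 432 :volume 22}})
(def newer {"FOO" {:price 432 :volume 300}})

(def changes (clojure.data/diff older newer))

(second changes)
;; {"FOO" {:volume 300}}   <- no :price key here

(nth changes 2)
;; {"FOO" {:price 432}}    <- the unchanged price ends up in the "both" part
```

So if you need the full trade for a company, it is safer to look it up in the newer map than to read it off the diff.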
Now, let's add a watch to our current-prices atom, so that whenever
we pull in new data, the watch function finds the stocks for which
new trades happened and pushes their prices to the appropriate
clients.
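;; clients is assumed to be an atom holding the open http-kit channels.
(add-watch current-prices :send
  (fn [_key _ref old-prices new-prices]
    ;; The watch key is bound to _key so that it does not shadow
    ;; clojure.core/key, which is called on each map entry below.
    (let [diff (clojure.data/diff old-prices new-prices)
          new-trades (second diff)]
      (doseq [client @clients
              trade new-trades]
        (send! client (str (key trade) " traded for " (:price (val trade))))))))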
Every time the current-prices atom is reset! or swap!-ed, the
function above gets called.
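Because the diff only contains the fields that changed, a trade where just the volume moved would arrive in the watch without a :price. One way around that, sketched below, is to use the diff only to find out which companies changed and then read the full trades from the new map. The helper names changed-trades and trade-message are made up for this example.

```clojure
(require 'clojure.data)

(defn changed-trades
  "Returns the full entries of new-prices for every symbol that shows up
  in the 'only in new' part of the diff against old-prices."
  [old-prices new-prices]
  (let [[_ only-in-new _] (clojure.data/diff old-prices new-prices)]
    (select-keys new-prices (keys only-in-new))))

(defn trade-message
  "Formats one [symbol trade] entry as the string sent to clients."
  [[sym trade]]
  (str sym " traded for " (:price trade)))

;; Example with made-up data where only the volume of FOO changes:
(map trade-message
     (changed-trades {"FOO" {:price 432 :volume 22}}
                     {"FOO" {:price 432 :volume 300}}))
;; ("FOO traded for 432")
```

Inside the watch function, the doseq would then loop over (changed-trades old new) instead of the second item of the diff.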
Here we're simply sending all our clients a string. In practice, you'd
probably pass JSON or EDN to only those clients who are interested in
a specific company. The send! function is from http-kit, which has a
unified API for WebSockets, HTTP long polling and streaming. I wrote
about using WebSockets with http-kit in a previous post.
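Sending EDN only to interested clients might look something like the sketch below. It is just one possible shape: subscriptions is a hypothetical map from a client (for example an http-kit channel) to the set of symbols it asked for, and messages-for-clients is a name I made up. The function is pure, so it can be tried at the REPL without any open connections.

```clojure
(defn messages-for-clients
  "subscriptions is a hypothetical map of client -> set of symbols.
  trades is a map of symbol -> trade. Returns a seq of [client edn-string]
  pairs, skipping clients that have nothing new to hear about."
  [subscriptions trades]
  (for [[client symbols] subscriptions
        :let [wanted (select-keys trades symbols)]
        :when (seq wanted)]
    [client (pr-str wanted)]))

;; Inside the watch function, with subscriptions kept in an atom, the
;; sending step could then be written as:
;;
;; (doseq [[client payload] (messages-for-clients @subscriptions new-trades)]
;;   (send! client payload))
```

pr-str gives an EDN string for plain maps like these. For JSON you would swap in a JSON library of your choice.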
And that’s it. We have now built a streaming API using just a watch
function and clojure.data/diff. I think that’s pretty cool.