Publishing Avro to Kafka in R

I’ve written a lot about Kafka and how easy it is to use, but that ease depends on your language. The generally recommended way to interact with Kafka is via the librdkafka C/C++ library, and while many popular programming languages have bindings built on it, R does not have a modern implementation: the one package available dates from 2015 and targets Kafka 0.8. There is another option though: Confluent’s REST Proxy provides an HTTP interface for producing and consuming messages.

So this weekend, inspired by discussions at Berlin Buzzwords, I wrote some code to use the Kafka REST API in R. My R skills aren’t particularly good, but it works for me.

Setup

This time I’m developing against the Landoop Kafka Development Environment, which provides all of the community Confluent packages in one Docker image, along with some nice web interfaces. I’m also using mitmproxy, which makes it a bit easier to debug bad client requests.

I’m a big fan of tmux, so each of the following commands is running in a separate pane.

docker run --rm --net=host landoop/fast-data-dev
mitmproxy -R http://localhost:8082

It takes a minute or so for the processes in the Docker container to settle, and longer the first time while the image downloads. After that you’ll be able to open http://localhost:3030 and see everything running and the health checks passing.
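
If you’d rather script that wait than watch the web UI, a small polling loop against the REST Proxy does the job. This is just a sketch of mine, assuming you hit the proxy directly on its default port of 8082 and using the httr library, which is introduced properly below:

library(httr)

# Poll the REST Proxy until it answers, giving up after about a minute
wait_for_proxy <- function(uri="http://localhost:8082/topics", attempts=30) {
  for (i in 1:attempts) {
    response <- tryCatch(GET(uri), error=function(e) NULL)
    if (!is.null(response) && status_code(response) == 200) {
      return(invisible(TRUE))
    }
    Sys.sleep(2)
  }
  stop("REST Proxy did not become ready in time")
}
wait_for_proxy()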

The Code

First we need a couple of libraries: httr to make the HTTP requests and rjson to encode and decode JSON.

library(httr)
library(rjson)

It’s now quite simple to make an HTTP request to the Kafka REST Proxy in order to get a list of topics that are available. This should show us that everything is working.

# This would ordinarily be on port 8082, but we're going through mitmproxy here for debugging
proxy_uri <- "http://localhost:8080"

# Get a list of available topics
get_topics <- function() {
  uri <- paste(proxy_uri, "topics", sep="/")
  http_topics <- GET(uri)
  stop_for_status(http_topics)
  return(fromJSON(content(http_topics, "text")))
}
topics <- get_topics()
print(paste("There are", length(topics), "topics"))
print(topics)

You should see a list of topics, and if you’re going through mitmproxy you should also see the HTTP request appear there.

Producing messages to Kafka is a little harder: we need to build up some JSON that defines both the schema and a record.

# Produce one Avro message
# This looks complicated, but really the hardest part was working out that you need to call toJSON on value_schema, since the schema has to be embedded as a JSON string
generatedbody <- list(
  value_schema=toJSON(list(
    type="record",
    name="User",
    fields=list(
      list(
        name="name",
        type="string"
      )
    )
  )),
  records=list(
    list(
      value=list(
        name="testUser"
      )
    )
  )
)

Which generates the following JSON (shown here with a second record added, to illustrate that records is an array):

"{\"value_schema\":\"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"User\\\",\\\"fields\\\":[{\\\"name\\\":\\\"name\\\",\\\"type\\\":\\\"string\\\"}]}\",\"records\":[{\"value\":{\"name\":\"testUser\"}},{\"value\":{\"name\":\"anotherTestUser\"}}]}"

As a side note, this shows that you can send multiple records in a single HTTP request. That does work for a few small records; however, it makes debugging a lot more complicated and means that a single bad value will fail the whole request.
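
If you do want to batch, the records element is just an R list, so you can build it from a vector with lapply. A quick sketch (names_to_send and batchbody are just illustrative names of mine):

# Build a request body containing several records in one go
names_to_send <- c("testUser", "anotherTestUser")
batchbody <- list(
  value_schema=toJSON(list(
    type="record",
    name="User",
    fields=list(
      list(
        name="name",
        type="string"
      )
    )
  )),
  records=lapply(names_to_send, function(n) list(value=list(name=n)))
)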

Now let’s actually send this to Kafka:

# actually send the request
response <- POST(url=paste(proxy_uri, "topics", "avrotest", sep="/"),
                 content_type("application/vnd.kafka.avro.v2+json"),
                 accept("application/vnd.kafka.v2+json"),
                 body=toJSON(generatedbody),
                 encode='json'
)
stop_for_status(response)
content(response, "text")
if (is.null(fromJSON(content(response, "text"))$offsets[[1]]$error)){
  print("OK")
}

You should get back a good response, and be able to see it in mitmproxy if you’re using that. You should also be able to see the message in the web interface.
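
One caveat: the check above only inspects the first entry in offsets. The v2 produce response returns one offsets entry per record, each with its own error and error_code fields, so if you batch records it’s worth scanning all of them. A small helper sketch (the function name is mine):

# Scan every offset in the response and stop on the first reported error;
# rjson turns a JSON null into NULL, so is.null catches the success case
check_offsets <- function(response) {
  parsed <- fromJSON(content(response, "text"))
  for (offset in parsed$offsets) {
    if (!is.null(offset$error)) {
      stop(paste("Record failed:", offset$error))
    }
  }
  print("OK")
}
check_offsets(response)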

Now that wasn’t too bad for a single static message. But it’s more useful to be able to send records from a data.frame. This is where my lack of R skills causes me a problem. The following code works, but it isn’t terribly elegant.

First of all, let’s build up some JSON to describe the cars data.frame:

# publish a data.frame to Kafka
# cars is an inbuilt data.frame to test with
df <- head(cars, n=2) # limit to just two records to make testing easier
fields <- list() # start with an empty list of fields
field_types <- lapply(df, typeof) # work out the type of the fields (typeof gives R storage types; for cars both columns are "double", which happens to be a valid Avro type too)
# make an array of fields like [{name="name", type="type"}]
for(f in colnames(df)) {
  fields <- append(fields, list(list(name=f, type=field_types[[f]])))
}
value_schema <- 
  list(
    type="record",
    name="Car", # @TODO turn this into a function and accept a name for the record
    fields=fields
  )
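
That @TODO is easy enough to sketch out as a function. Note that R’s typeof values don’t map one-to-one onto Avro types (R’s "integer" is Avro’s "int", for instance), so a small lookup table is safer than passing typeof output straight through. This is my rough attempt at it, not a finished helper:

# Build an Avro record schema (as an R list) from a data.frame,
# mapping R storage types onto Avro primitive types
build_value_schema <- function(df, record_name) {
  avro_types <- c(double="double", integer="int", character="string", logical="boolean")
  fields <- lapply(colnames(df), function(f) {
    list(name=f, type=avro_types[[typeof(df[[f]])]])
  })
  list(type="record", name=record_name, fields=fields)
}
value_schema <- build_value_schema(df, "Car")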

And then let’s loop through that data.frame, sending an HTTP request for each row:

# For each car, generate and send an HTTP request to the REST api
for(car in 1:nrow(df)) {
  dfbody <- list(
    value_schema=toJSON(value_schema),
    records=list(list(value=list(speed=df[car, "speed"], dist=df[car, "dist"]))) # @TODO working out how to dynamically generate the column list here is breaking my head (see the sketch after this loop)
  )
  print(toJSON(dfbody))
  dfresponse <- POST(url=paste(proxy_uri, "topics", "avrocars", sep="/"),
                     content_type("application/vnd.kafka.avro.v2+json"),
                     accept("application/vnd.kafka.v2+json"),
                     body=toJSON(dfbody),
                     encode='json'
  )
  stop_for_status(dfresponse)
  content(dfresponse, "text")
  if (is.null(fromJSON(content(dfresponse, "text"))$offsets[[1]]$error)){
    print("OK")
  }
}
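
On that second @TODO: as.list on a single data.frame row returns a named list keyed by column, which rjson serialises to exactly the object we need, so the hard-coded column list can be replaced. A sketch of the body construction, which I’ve only tried against cars:

# Convert row `car` into a named list like list(speed=..., dist=...)
# so the columns no longer need to be written out by hand
dfbody <- list(
  value_schema=toJSON(value_schema),
  records=list(list(value=as.list(df[car, ])))
)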

Again, we can see multiple requests going through mitmproxy (hopefully without errors) and view the data in the web UI.

This code is by no means a finished product, but it is something I can take to our data science team and say “Look what I did!”