Using Kafka on OSX with an F# producer and consumer

This is just a quick note on installing Kafka on OSX and how to produce and consume messages with an F# client.

Installation

The easiest way to install Apache Kafka on OSX is probably with Homebrew.

Open your favorite terminal and issue the following command:

brew install kafka

This will also install Zookeeper which Kafka has a strong dependency on.

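Does it work?

You can test the installation from the command line with the following few steps. The two servers, the console producer, and the console consumer all run in the foreground, so start each of them in its own terminal: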

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

kafka-server-start /usr/local/etc/kafka/server.properties

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic

kafka-console-producer --broker-list localhost:9092 --topic testTopic

kafka-console-consumer --zookeeper localhost:2181 --topic testTopic --from-beginning

kafka-topics --zookeeper 127.0.0.1:2181 --delete --topic testTopic

Your path must include Java for this to work. Hopefully the messages that you type into the producer command line end up being printed on the consumer command line.

Here is a simple producer that just pumps out messages quite fast:

open KafkaNet
open KafkaNet.Model
open KafkaNet.Protocol
open System
open System.Reactive
open System.Reactive.Linq

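// Fire-and-forget send: the task returned by SendMessageAsync is already running,
// so wrapping it in an Async that is never started adds nothing. The result is discarded.
let produceWith (producer : Producer) topic messages =
    producer.SendMessageAsync(topic, messages)
    |> ignore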

[<EntryPoint>]
let main argv =
    let topic = "testTopic"
    let router =  new BrokerRouter(new KafkaOptions(new Uri("http://localhost:9092")))
    use producer = new Producer(router)

    let createAndSendMessage = (fun x -> [ new Message(x) ] |> produceWith producer topic)

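    // An interval of zero emits ticks as fast as the scheduler allows.
    use subscription =
        Observable.Interval(TimeSpan.FromSeconds(0.))
        |> Observable.subscribe(fun s -> createAndSendMessage (s.ToString()))

    // Keep producing until a key is pressed. The `use` bindings dispose the
    // subscription and the producer on exit, so no explicit Dispose() is needed.
    Console.ReadKey() |> ignore
    0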
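
The producer above discards the task returned by SendMessageAsync, so a failed send goes unnoticed. The following is a minimal sketch of the difference between discarding and waiting. It does not use kafka-net at all: `sendAsync` is a hypothetical stand-in for `producer.SendMessageAsync`, and the "number of messages" it returns is invented for the illustration.

```fsharp
open System.Threading.Tasks

// Hypothetical stand-in for producer.SendMessageAsync: completes after a
// short delay and returns how many messages it was given.
let sendAsync (messages : string list) : Task<int> =
    async {
        do! Async.Sleep 50
        return List.length messages
    }
    |> Async.StartAsTask

// Fire-and-forget, as in the producer above: the task runs, but its result
// (and any exception it raises) is never observed.
let sendAndForget (messages : string list) =
    sendAsync messages |> ignore

// Blocking variant: waits for the task and hands its result to the caller.
let sendAndWait (messages : string list) =
    sendAsync messages
    |> Async.AwaitTask
    |> Async.RunSynchronously

let acknowledged = sendAndWait [ "a"; "b"; "c" ]
printfn "acknowledged %d messages" acknowledged
```

Waiting on every send would slow the producer down considerably, so which variant fits depends on whether throughput or delivery feedback matters more.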

The corresponding consumer looks like this. Please note that the call to Consume() is blocking, but turning it into an observable sequence is one way to work around this.

open KafkaNet
open KafkaNet.Model
open KafkaNet.Protocol
open System
open System.Reactive.Linq

let printMessage (message: Message) =
    let encoding = new System.Text.UTF8Encoding()
    message.Value
    |> encoding.GetString
    |> printfn "%s"  

[<EntryPoint>]
let main argv =
    let topic = "testTopic"
    let router =  new BrokerRouter(new KafkaOptions(new Uri("http://localhost:9092")))

    let consumer = new KafkaNet.Consumer(new ConsumerOptions(topic, router))
    consumer.SetOffsetPosition(new OffsetPosition(0, 0L))

    let observable = consumer.Consume()
                        |> Observable.ToObservable
                        |> Observable.subscribe(fun m -> printMessage m)
    0
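
To illustrate the idea of wrapping a blocking sequence in an observable, here is a small sketch that uses only FSharp.Core and a background thread. It is not how Rx or kafka-net do it: `blockingMessages` is a hypothetical stand-in for `consumer.Consume()`, and `toBackgroundObservable` is a helper name made up for this example.

```fsharp
open System
open System.Threading

// Hypothetical stand-in for consumer.Consume(): a lazy sequence that blocks
// briefly before yielding each message.
let blockingMessages : seq<string> =
    seq {
        for i in 1 .. 3 do
            Thread.Sleep 20
            yield sprintf "message %d" i
    }

// Enumerate a blocking sequence on a background thread and publish each item
// through an F# event, which is also an IObservable. Returns the observable
// together with a function that starts the worker thread.
let toBackgroundObservable (source : seq<'T>) =
    let event = Event<'T>()
    let start () =
        let worker = Thread(ThreadStart(fun () -> source |> Seq.iter event.Trigger))
        worker.IsBackground <- true
        worker.Start()
        worker
    (event.Publish :> IObservable<'T>), start

let received = ResizeArray<string>()
let messages, start = toBackgroundObservable blockingMessages
let subscription = messages |> Observable.subscribe (fun m -> received.Add m)

let worker = start ()
// The calling thread is free at this point. Join is only here so the demo
// can inspect what arrived before it exits.
worker.Join()
subscription.Dispose()
received |> Seq.iter (printfn "%s")
```

Subscribing before starting the worker matters here, because an event does not replay items to late subscribers.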

Please note

I do not consider the code snippets above production ready. There are a lot of configurations to take into account when deploying Kafka to production. Furthermore, kafka-net does not seem to be actively developed for the time being. The current version was built for Kafka version 0.8.0, but the parts that I have played around with also work for version 0.9.0.

Kafka is considered to be very fast, and one way it obtains low latency is by design choice: it is the responsibility of each consumer to keep track of how far into a message partition it has handled messages. Kafka supports the consumer by offering a way to commit offset positions, but this feature is not supported by kafka-net.
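
Since the offset commit feature is not available, a consumer has to do its own bookkeeping. Below is a rough sketch of what that could look like, under the assumption that a plain text file is an acceptable store. Nothing in it talks to Kafka or kafka-net; the `Offsets` type, the helper functions, and the file location are all hypothetical.

```fsharp
open System.IO

// Next offset to read, keyed by partition id.
type Offsets = Map<int, int64>

// Record that the message at `offset` in `partition` has been handled.
let advance (partition : int) (offset : int64) (offsets : Offsets) : Offsets =
    let next = offset + 1L
    match Map.tryFind partition offsets with
    | Some current when current >= next -> offsets   // never move backwards
    | _ -> Map.add partition next offsets

// Persist as "partition=offset" lines so a restarted consumer can resume.
let save (path : string) (offsets : Offsets) =
    let lines =
        offsets
        |> Map.toSeq
        |> Seq.map (fun (p, o) -> sprintf "%d=%d" p o)
    File.WriteAllLines(path, lines)

// Read the file back, or start from an empty map if it does not exist yet.
let load (path : string) : Offsets =
    if File.Exists path then
        File.ReadAllLines path
        |> Array.map (fun line ->
            let parts = line.Split '='
            int parts.[0], int64 parts.[1])
        |> Map.ofArray
    else
        Map.empty

// Hypothetical location for the offsets file.
let path = Path.Combine(Path.GetTempPath(), "testTopic.offsets")

let offsets =
    Map.empty
    |> advance 0 0L
    |> advance 0 1L
    |> advance 1 41L

save path offsets
let restored = load path
printfn "%A" restored
```

On startup, each restored pair could then be passed to the consumer as `new OffsetPosition(partition, offset)` via `SetOffsetPosition`, in the same way the consumer above starts partition 0 at offset 0.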

Conclusion

The Apache Kafka website lists four DotNet clients for Kafka, one of which is kafka-net, used in the examples above.

Compared to the official Java API, all four projects seem to be missing quite a lot of functionality. If you are using Kafka from DotNet in production I would very much like to hear about your experience. Kafka itself appears to be an attractive choice for messaging infrastructure, but the support from DotNet is rather limited.




Creative Commons License
Content of this blog by Carsten Jørgensen is licensed under a Creative Commons Attribution 4.0 International License.