brod

Lets you produce messages to the Kafka distributed publish/subscribe messaging service

brod Ranking & Summary


  • License: MIT/X Consortium Lic...
  • Price: FREE
  • Publisher Name: Datadog, Inc.
  • Publisher web site: http://datadoghq.com



brod Description

brod is a Python module that lets you produce messages to the Kafka distributed publish/subscribe messaging service. It started as a fork of pykafka (https://github.com/dsully/pykafka), but became a total rewrite as we needed to add many features.

It's named after Max Brod, Franz Kafka's friend and supporter.

Installation

    easy_install brod

Note: the zc.zk package depends on the Python ZooKeeper bindings, which are not included during its installation. They can be installed with:

    easy_install zc-zookeeper-static

See the zc.zk documentation for more information (http://pypi.python.org/pypi/zc.zk/0.5).

Usage

Sending a simple message

    import brod

    kafka = brod.Kafka(host='localhost')
    kafka.produce("test-topic", "Hello World")

Sending a sequence of messages

    import brod

    kafka = brod.Kafka(host='localhost')
    # Example messages; any sequence of strings is passed the same way.
    kafka.produce("test-topic", ["Hello", "World"])

Consuming messages one by one

    import brod

    kafka = brod.Kafka(host='localhost')
    # Fetch from the client instance, starting at the beginning of the topic.
    for offset, message in kafka.fetch("test-topic", offset=0):
        print message

Using a ZooKeeper-based consumer

    from brod.zk import ZKConsumer

    consumer = ZKConsumer('zk_host:2181', 'my_consumer_group', 'my_topic', autocommit=True)

    # Polls forever
    for msg_set in consumer.poll(poll_interval=1):
        for offset, msg in msg_set:
            print offset, msg_set.broker_partition, msg

Nonblocking Tornado client support

    import tornado.ioloop
    import tornado.web
    from tornado.options import parse_command_line

    from brod import LATEST_OFFSET
    from brod.nonblocking import KafkaTornado

    class MainHandler(tornado.web.RequestHandler):

        def initialize(self, kafka, topic):
            self.kafka = kafka
            self.topic = topic

        def post(self):
            # Publish the posted form value to the topic.
            data = self.get_argument('data')
            self.kafka.produce(self.topic, data)

        @tornado.web.asynchronous
        def get(self):
            self.kafka.offsets(self.topic, LATEST_OFFSET, max_offsets=2, callback=self._on_offset)

        def _on_offset(self, offsets):
            # Get the second to latest offset (assumes ascending order)
            offset = offsets[0]
            self.kafka.fetch(self.topic, offset, callback=self._on_fetch)

        def _on_fetch(self, messages):
            for offset, message in messages:
                self.write("{0}: {1}".format(offset, message))
            self.finish()

    kafka = KafkaTornado()
    # Route and topic name are illustrative; initialize() receives this dict.
    application = tornado.web.Application([
        (r"/", MainHandler, {'kafka': kafka, 'topic': 'test-topic'}),
    ])

    if __name__ == "__main__":
        parse_command_line()
        application.listen(8888)
        tornado.ioloop.IOLoop.instance().start()
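
The "sending a sequence of messages" usage can be combined with a small buffering layer so that many individual messages go out in fewer produce calls. The following is a minimal sketch, not part of brod: BatchingProducer and RecordingProducer are hypothetical names, and the only thing assumed about the real client is the produce(topic, messages) call shape shown in the usage section. A stand-in producer is used so the example runs without a Kafka broker.

```python
class BatchingProducer(object):
    """Buffer messages and hand them to a producer as one sequence.

    `producer` is any object with a produce(topic, messages) method.
    This wrapper is hypothetical and not part of brod.
    """

    def __init__(self, producer, topic, batch_size=100):
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self.producer = producer
        self.topic = topic
        self.batch_size = batch_size
        self._buffer = []

    def send(self, message):
        """Queue one message and flush once the buffer is full."""
        self._buffer.append(message)
        if len(self._buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        """Send all buffered messages in a single produce() call, if any."""
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self.producer.produce(self.topic, batch)


class RecordingProducer(object):
    """Stand-in producer that records calls instead of contacting a broker."""

    def __init__(self):
        self.calls = []

    def produce(self, topic, messages):
        self.calls.append((topic, list(messages)))


fake = RecordingProducer()
batcher = BatchingProducer(fake, "test-topic", batch_size=2)
for text in ["a", "b", "c"]:
    batcher.send(text)
batcher.flush()  # push out the final, partially filled batch
```

To try this against a real broker, an instance such as brod.Kafka(host='localhost') could be passed in place of RecordingProducer, assuming its produce method accepts a list as the usage section suggests. Calling flush() before shutdown matters, since anything still buffered is otherwise never sent.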

