- Wiki
- Release Notes
- Upgrade Guide
- Description
- Dev Setup
- Usage
- Configuration
- Contribution Guidelines
- License
- Changelog
Ziggurat is a framework built to simplify stream processing on Kafka. It can be used to create a full-fledged Clojure app that reads and processes messages from Kafka. Ziggurat abstracts the following features:
- Reading messages from Kafka
- Retrying failed messages via RabbitMQ
- Setting up an HTTP server
Refer to the Concepts file to understand the concepts referred to in this document.
- Ziggurat HTTP Server
- Toggle Streams on a Running Actor
- Middlewares in Ziggurat
- Consuming and Publishing Messages to Kafka
- Connecting RabbitMQ and using channels
- Configuration and Config Description
- To read about all concepts, please refer to the Concepts file
- Install Clojure: `brew install clojure`
- Install Leiningen: `brew install leiningen`
- Run docker-compose: `docker-compose up`. This starts:
  - Kafka on localhost:9092
  - ZooKeeper on localhost:2181
  - RabbitMQ on localhost:5672
- Run tests: `make test`
- Run `make setup-cluster`. This clears up the volume and starts:
  - 3 Kafka brokers on localhost:9091, localhost:9092, and localhost:9093
  - ZooKeeper on localhost:2181
  - RabbitMQ on localhost:5672
- Run `make test-cluster`. This uses `config.test.cluster.edn` instead of `config.test.edn`.
Add this to your `project.clj`:

`[tech.gojek/ziggurat "4.11.1"]`

_Please refer to [clojars](https://clojars.org/tech.gojek/ziggurat) for the latest stable version._
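For orientation, a minimal `project.clj` with the dependency in place might look like the sketch below. The project name `my-stream-app`, its version, and the Clojure version are placeholders and are not taken from Ziggurat's documentation; only the Ziggurat coordinate comes from the line above.

```clojure
;; project.clj (sketch; everything except the Ziggurat coordinate is a placeholder)
(defproject my-stream-app "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.10.3"]      ; assumed Clojure version
                 [tech.gojek/ziggurat "4.11.1"]])    ; coordinate from this README
```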
To start a stream (a thread that reads messages from Kafka), add this to your core namespace.
```clojure
;; The middleware alias is assumed to point at ziggurat.middleware.default.
(require '[ziggurat.init :as ziggurat]
         '[ziggurat.middleware.default :as middleware])

(defn start-fn []
  ;; your logic that runs at startup goes here
  )

(defn stop-fn []
  ;; your logic that runs at shutdown goes here
  )

(defn main-fn
  [{:keys [message metadata] :as message-payload}]
  (println message)
  :success)

;; ProtoClass is the fully qualified name of the Java class used to deserialize the message.
(def handler-fn
  (-> main-fn
      (middleware/protobuf->hash ProtoClass :stream-id)))

(ziggurat/main start-fn stop-fn {:stream-id {:handler-fn handler-fn}})
```
NOTE: This example assumes that the message is serialized in Protobuf format.

Please refer to the Middleware section to understand `handler-fn`.
- `main-fn` is the function applied to every message read from the Kafka stream.
- `main-fn` takes a map as its argument, with two keys:
  - `message` - the byte array (`byte[]`) received from Kafka.
  - `metadata`
    - `topic` - the topic from which the Kafka message was consumed.
    - `timestamp` - the ingestion timestamp in Kafka.
    - `partition` - the partition from which the message was consumed.
    - `rabbitmq-retry-count` - the number of retries done by RabbitMQ for the given message.
- `main-fn` returns a keyword, which can be any of the following:
  - `:success` - The message was processed successfully and the stream should continue to the next message.
  - `:retry` - The message failed to be processed and should be retried via RabbitMQ.
  - `:dead-letter` - The message is not retried and is pushed directly to the dead letter queue.
  - `:skip` - The message should be skipped without reporting a failure or retrying. Same as `:success`, except that a different metric is published to track skipped messages.
- `start-fn` runs at application startup and can be used to initialize connections to databases, HTTP clients, thread pools, etc.
- `stop-fn` runs at shutdown and facilitates graceful shutdown, for example releasing DB connections or shutting down HTTP servers.
- Ziggurat can read from multiple streams and apply the same or different functions to their messages. `:stream-id` is a unique identifier per stream and must be included in the `config.edn` file. All configs, queues, and metrics are namespaced under this id.

```clojure
(ziggurat/main start-fn stop-fn {:stream-id-1 {:handler-fn handler-fn-1}
                                 :stream-id-2 {:handler-fn handler-fn-2}})
```
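To make the return keywords described above concrete, here is a minimal sketch of a `main-fn` that picks one per message. It assumes the message has already been turned into a Clojure map by a middleware such as `protobuf->hash`. The `:amount` and `:test?` fields, the `save-order!` helper, and the limit of 3 retries are hypothetical and exist only for illustration.

```clojure
(def max-retries 3) ; hypothetical limit, for illustration only

(defn save-order!
  "Hypothetical stand-in for real business logic.
  Throws when the order has no :amount."
  [order]
  (when-not (:amount order)
    (throw (ex-info "missing amount" {:order order})))
  order)

(defn main-fn
  [{:keys [message metadata]}]
  ;; Treat a missing retry count as zero.
  (let [retries (or (:rabbitmq-retry-count metadata) 0)]
    (cond
      ;; Nothing useful to process: skip without reporting a failure.
      (or (nil? message) (:test? message)) :skip

      :else
      (try
        (save-order! message)
        :success
        (catch Exception _
          ;; Retry a few times, then give up and dead-letter the message.
          (if (< retries max-retries)
            :retry
            :dead-letter))))))
```

Because this function depends only on the payload map, it can be tried in a REPL without Kafka or RabbitMQ running, for example `(main-fn {:message {:amount 10} :metadata {}})`.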
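Ziggurat also sets up an HTTP server. To serve your own routes, pass them as the fourth argument to `ziggurat/main`:

```clojure
;; The middleware alias is assumed to point at ziggurat.middleware.default.
(require '[ziggurat.init :as ziggurat]
         '[ziggurat.middleware.default :as middleware])

(defn start-fn []
  ;; your logic that runs at startup goes here
  )

(defn stop-fn []
  ;; your logic that runs at shutdown goes here
  )

;; get-resource is a placeholder for your own function; it is not defined here.
(defn api-handler [_request]
  {:status  200
   :headers {"Content-Type" "application/json"}
   :body    (get-resource)})

(def routes [["v1/resources" {:get api-handler}]])

(defn main-fn
  [{:keys [message metadata] :as message-payload}]
  (println message)
  :success)

(def handler-fn
  (-> main-fn
      (middleware/protobuf->hash ProtoClass :stream-id)))

(ziggurat/main start-fn stop-fn {:stream-id {:handler-fn handler-fn}} routes)
```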
NOTE: this example assumes that the message is serialized in Protobuf format
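The `api-handler` in the example calls `get-resource`, which the snippet does not define. As a sketch only, assuming handlers are ordinary Ring-style functions from a request map to a response map (as the example's return value suggests), a stand-in `get-resource` and a quick check could look like this. The resource contents and the request map are made up.

```clojure
(defn get-resource
  "Hypothetical stand-in: returns a hard-coded JSON string
  instead of querying a real store."
  []
  "{\"resources\": [\"a\", \"b\"]}")

(defn api-handler [_request]
  {:status  200
   :headers {"Content-Type" "application/json"}
   :body    (get-resource)})

;; The handler can be called directly, without starting Ziggurat or an HTTP server.
(def response (api-handler {:request-method :get :uri "/v1/resources"}))
```

Keeping the handler a plain function of the request makes it easy to unit test separately from the stream-processing code.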
- Sentry has been deprecated since version 4.6.3.
- For dev setup and contributions, please refer to CONTRIBUTING.md
Copyright 2018, GO-JEK Tech <http://gojek.tech>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.