Palermo is a job processing system for the JVM inspired by Resque, backed by RabbitMQ and written in Clojure.
The library is just a thin layer on top of RabbitMQ with the following features:
- Defining jobs
- Defining job queues
- Defining workers
- Serialisation/deserialisation of jobs
- Queue management functionality
Palermo also includes a web front-end to manage the system that can be run as a standalone Jetty application.
If you plan to use Palermo as a library, you can grab the latest release from the Clojars Maven repository.
If you want to use Palermo to run the web frontend or to start standalone workers from the command line, the easiest way is to clone the project and build an executable standalone jar using Leiningen:
palermo $ lein uberjar
The following sections review the main functionality offered by Palermo using code snippets in Clojure and Java.
Palermo implements a certain usage pattern (a job processing system) on top of RabbitMQ. To work, it needs to connect to a RabbitMQ exchange where all the job queues will be created. Parameters like the RabbitMQ server, port, username, password, virtual host and exchange name can be passed to the initialisation function.
The following samples show how to establish this connection.
(use 'palermo.core)
; Default values:
; host: localhost
; port: 5672
; username: guest
; password: guest
; vhost /
; exchange palermo
(def palermo-server (palermo {:host rabbit-host}))
// overloaded constructor
palermo.PalermoServer palermoServer = new palermo.PalermoServer(host, port, exchange);
Jobs in Palermo are classes with a default constructor that implement the palermo.job.PalermoJob interface.
The interface defines a single method, process, that will receive the arguments for the job sent to the working queue.
In the following example we will define a job that will just make the worker thread sleep for an interval of time before printing a message on standard output.
In Clojure, the macro defpalermojob can be used to define a Palermo job that needs to be compiled into the right Java class.
(ns palermotests
  (:use palermo.core))
(defpalermojob SleepyJob
  (process [job timeout]
    (println "SLEEPING...")
    (Thread/sleep timeout)
    (println "BACK!")))
The defpalermojob macro generates a new Java class using the provided name when the code is compiled.
Note that Clojure namespaces are not loaded in the code of the generated Java class, so they must be required inside the body of the job's process function.
For instance, in the following example job, we are using a MongoDB library to save a timestamp in the database. For this to work, the body of the job class must explicitly require the Clojure namespace so that the MongoDB functions are correctly bound.
Without that line, java.lang.IllegalStateException: Attempting to call unbound fn errors would be generated.
(ns lampedusa.jobs
  (:use [palermo.core]
        [somnium.congomongo]))
(def conn
  (make-connection "lampedusa"
                   :host "127.0.0.1"
                   :port 27017))
(defpalermojob LampedusaClojureJob
  (process [j arg]
    ; requiring the Clojure namespace
    (require 'lampedusa.jobs)
    (let [local conn
          now (java.util.Date.)
          sdf (java.text.SimpleDateFormat. "yyyy-MM-dd'T'HH:mm'Z'")]
      (with-mongo local
        (insert! :times {:iso8601 (.format sdf now)
                         ; getTime returns milliseconds; Unix time is in seconds
                         :unix (quot (.getTime now) 1000)
                         :date now})))))
The following code is the equivalent Java implementation. It is important to define a default constructor without arguments for Palermo to instantiate the job class correctly.
package palermotests;
import palermo.job.PalermoJob;
public class SleepyJob implements PalermoJob {
    public SleepyJob() {}
    @Override
    public void process(Object arguments) throws Exception {
        int timeout = (Integer) arguments;
        System.out.println("SLEEPING...");
        try {
            Thread.sleep(timeout);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("BACK!");
    }
}
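Because process receives a single object, a job that needs several values can take them in a container. The following is a minimal sketch of that idea, not code from Palermo: the PalermoJobShape interface is a hypothetical local copy of the interface shape described above, declared only so the example compiles without the Palermo jar, and GreetingJob with its "name" and "times" keys is invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical local copy of the shape of palermo.job.PalermoJob,
// declared here only so the sketch compiles on its own.
interface PalermoJobShape {
    void process(Object arguments) throws Exception;
}

class GreetingJob implements PalermoJobShape {
    static String lastMessage; // kept only so the result can be inspected

    // Default constructor without arguments, as required for job classes.
    public GreetingJob() {}

    @Override
    public void process(Object arguments) throws Exception {
        // The single argument object is expected to be a map of named values.
        if (!(arguments instanceof Map)) {
            throw new IllegalArgumentException("expected a Map of arguments");
        }
        Map<?, ?> args = (Map<?, ?>) arguments;
        String name = (String) args.get("name");
        int times = (Integer) args.get("times");
        StringBuilder message = new StringBuilder();
        for (int i = 0; i < times; i++) {
            message.append("hello ").append(name).append('\n');
        }
        lastMessage = message.toString();
        System.out.print(lastMessage);
    }

    public static void main(String[] argv) throws Exception {
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("name", "palermo");
        arguments.put("times", 2);
        new GreetingJob().process(arguments);
    }
}
```

With the real library, the class would implement palermo.job.PalermoJob instead of the local stand-in.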
Once we have opened a Palermo connection to RabbitMQ, we can enqueue jobs into the system that will be picked and processed by workers.
In order to enqueue a job we need to define a tuple with three components:
- The type of the job
- The queue where the job will be inserted
- The arguments for the job
The following code shows how a job can be enqueued in Clojure:
(let [queue-name "tests"
      timeout-millis 5000] ; Thread/sleep expects milliseconds
  (enqueue palermo-server queue-name
           palermotests.SleepyJob
           timeout-millis))
In the same way, using the Java interface:
String queueName = "tests";
Integer timeoutMillis = 5000; // Thread.sleep expects milliseconds
palermoServer.enqueue(queueName,
                      palermotests.SleepyJob.class,
                      timeoutMillis);
When a job is enqueued, the arguments passed to the enqueue function are serialised and sent to the Palermo job queue.
When a worker picks the job from the queue, Palermo will deserialise the argument, instantiate the job class and invoke the process method of the job, passing the deserialised object as the argument in the invocation.
JBoss serialisation is the standard mechanism used by Palermo to serialise and deserialise job arguments. In order for the enqueuing/processing mechanism of Palermo to work, certain constraints must be taken into account:
- The job class must be available in the class path of the publisher and in the class path of the worker picking the job.
- Job classes must have a default constructor without arguments.
- Argument objects need to be supported by JBoss serialisation: basic objects, containers and POJOs with a default constructor are supported.
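The round trip described above can be sketched in plain Java. This is only an illustration under stated assumptions, not Palermo's implementation: it uses the JDK's built-in java.io serialisation as a stand-in for JBoss serialisation, declares a local Job interface that mirrors the shape of palermo.job.PalermoJob, and the names JobLifecycleSketch, EchoJob, serialise and runJob are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Hypothetical stand-in mirroring the shape of palermo.job.PalermoJob.
interface Job {
    void process(Object arguments) throws Exception;
}

// Example job; the no-argument constructor lets a worker instantiate it reflectively.
class EchoJob implements Job {
    static Object lastSeen; // records the argument so the round trip can be inspected

    public EchoJob() {}

    @Override
    public void process(Object arguments) {
        lastSeen = arguments;
    }
}

class JobLifecycleSketch {
    // Publisher side: turn the argument into bytes (JDK serialisation as a stand-in).
    static byte[] serialise(Object argument) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(argument);
        }
        return bytes.toByteArray();
    }

    // Worker side: rebuild the argument, instantiate the job by class name and run it.
    static void runJob(String jobClassName, byte[] payload) throws Exception {
        Object argument;
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            argument = in.readObject();
        }
        // Throws NoSuchMethodException if the class has no default constructor,
        // and ClassNotFoundException if the class is not on the worker's class path.
        Job job = (Job) Class.forName(jobClassName).getDeclaredConstructor().newInstance();
        job.process(argument);
    }

    public static void main(String[] args) throws Exception {
        byte[] payload = serialise(5000);
        runJob(EchoJob.class.getName(), payload);
        System.out.println(EchoJob.lastSeen);
    }
}
```

The two exceptions noted in runJob correspond to the first two constraints in the list: the class has to be loadable on both sides, and it has to be constructible without arguments.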
Workers are threads managed by Palermo that pick jobs from Palermo's job queues and process them with the provided arguments. Workers can connect to multiple queues; if multiple workers are connected to the same queue, jobs are distributed among them in a round-robin fashion.
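As an illustration of the round-robin distribution mentioned above, the following sketch assigns jobs to workers in turn. It only simulates the dispatch order in memory and does not talk to RabbitMQ; the class name, method name and worker identifiers are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RoundRobinSketch {
    // Assigns each job to the next worker in turn, wrapping around at the end of the list.
    static Map<String, List<String>> distribute(List<String> workers, List<String> jobs) {
        if (workers.isEmpty()) {
            throw new IllegalArgumentException("at least one worker is required");
        }
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String worker : workers) {
            assignment.put(worker, new ArrayList<>());
        }
        for (int i = 0; i < jobs.size(); i++) {
            String worker = workers.get(i % workers.size());
            assignment.get(worker).add(jobs.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> workers = List.of("worker-1", "worker-2", "worker-3");
        List<String> jobs = List.of("job-1", "job-2", "job-3", "job-4", "job-5", "job-6");
        // prints {worker-1=[job-1, job-4], worker-2=[job-2, job-5], worker-3=[job-3, job-6]}
        System.out.println(distribute(workers, jobs));
    }
}
```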
To start a worker programmatically, the following Clojure and Java code can be used:
(start-worker palermo-server ["high" "low" "tests"])
String[] queues = {"high", "low", "tests"};
palermoServer.startWorker(queues);
Workers can also be stopped, provided the identifier of the worker is known.
The identifier can be retrieved using the workers function or method.
; stopping all the workers
(doseq [worker-id (workers palermo-server)]
  (stop-worker palermo-server worker-id))
// stopping all the workers
for (String workerId : palermoServer.workers())
    palermoServer.stopWorker(workerId);
Runtime information about the status of the job queues or the connected workers can be obtained with a small set of Clojure functions and Java methods provided by the Palermo connection.
; summary information about workers, pending jobs
; and jobs being processed per queue
(queues-info palermo-server)
; information about the workers
(workers-info palermo-server)
; enumeration of the worker identifiers in the system
(workers palermo-server)
; enumeration of the jobs pending in a particular queue
(jobs-in-queue palermo-server "tests")
// summary information about workers, pending jobs
// and jobs being processed per queue
HashMap info = palermoServer.getQueuesInfo();
// information about the workers
HashMap workers = palermoServer.getWorkersInfo();
// enumeration of the worker identifiers in the system
String[] workerIds = palermoServer.workers();
// enumeration of the jobs pending in a particular queue
ArrayList jobs = palermoServer.getQueueJobs("tests");
All the jobs in a queue can also be removed in a single invocation using the purge-queue function (purgeQueue in Java).
(purge-queue palermo-server "tests")
palermoServer.purgeQueue("tests");
If a worker throws an exception during the processing of a job, the job will be re-enqueued into a special queue named failed.
The jobs in this queue can be inspected using the queue inspection functions, and individual failed jobs or all the failed jobs can be re-enqueued programmatically.
;; retrying all the failed jobs 1 by 1
(doseq [failed-job (jobs-in-queue palermo-server "failed")]
  (let [failed-job-id (-> failed-job :metadata :message-id)]
    (retry-failed-job palermo-server failed-job-id)))
// retrying all the failed jobs 1 by 1
ArrayList<HashMap> jobs = palermoServer.getQueueJobs("failed");
for (HashMap job : jobs) {
    HashMap metadata = (HashMap) job.get("metadata");
    String messageId = (String) metadata.get("message-id");
    palermoServer.retryJob(messageId);
}
All the failed jobs can also be re-enqueued in a single invocation using the retry-all function.
(retry-all palermo-server)
palermoServer.retryAllFailedJobs();
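The failure handling described in this section can be pictured with an in-memory sketch: a job that throws is moved to a separate queue and can later be moved back for another attempt. The two Deque objects, the Task interface and the method names are hypothetical stand-ins for RabbitMQ queues and Palermo's worker loop; this is not Palermo's implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class FailureQueueSketch {
    // Hypothetical unit of work; stands in for a job plus its arguments.
    interface Task {
        void run() throws Exception;
    }

    final Deque<Task> pending = new ArrayDeque<>();
    final Deque<Task> failed = new ArrayDeque<>();

    // Processes every pending task once; a task that throws is parked in the failed queue.
    int processAll() {
        int succeeded = 0;
        while (!pending.isEmpty()) {
            Task task = pending.poll();
            try {
                task.run();
                succeeded++;
            } catch (Exception e) {
                failed.add(task);
            }
        }
        return succeeded;
    }

    // Moves every failed task back to the pending queue so it is attempted again.
    void retryAll() {
        while (!failed.isEmpty()) {
            pending.add(failed.poll());
        }
    }

    public static void main(String[] args) {
        FailureQueueSketch queues = new FailureQueueSketch();
        int[] attempts = {0};
        // This task fails on its first attempt and succeeds on the second.
        queues.pending.add(() -> {
            if (attempts[0]++ == 0) {
                throw new IllegalStateException("first attempt fails");
            }
        });
        // prints "0 succeeded, 1 failed"
        System.out.println(queues.processAll() + " succeeded, " + queues.failed.size() + " failed");
        queues.retryAll();
        // prints "1 succeeded, 0 failed"
        System.out.println(queues.processAll() + " succeeded, " + queues.failed.size() + " failed");
    }
}
```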
Palermo includes a command line client that can be used to start a worker that will automatically start processing jobs from the Palermo RabbitMQ connection configured according to the parameters passed in the command line.
The list of possible command line options is shown in the following snippet.
$ java -jar palermo-standalone.jar
usage: worker
-exchange <EXCHANGE> RabitMQ exchange for Palermo, defaults to palermo
-host <HOST> Host of the RabbitMQ server, defaults to localhost
-password <PASSWORD> Password for the RabbitMQ server, defaults to
guest
-port <PORT> Port of the RabbitMQ server, defaults to 5672
-queues <QUEUES> Comma separated list of queues this worker will
connect to, defaults to jobs
-username <USERNAME> Username for the RabbitMQ server, defaults to
guest
-vhost <VHOST> Virtual Host for the RabbitMQ server, defaults to /
In order for the worker to be able to process the jobs, the jobs class files must be available in the class path of the worker.
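The -queues option takes a comma separated list, while the programmatic startWorker call shown earlier takes an array of queue names. How the launcher converts one into the other is not documented here; the following is a minimal sketch of one plausible conversion. The class and method names are hypothetical, and trimming whitespace around names is an assumption.

```java
import java.util.Arrays;

class QueuesOptionSketch {
    // Splits a comma separated -queues value into queue names.
    // Falls back to the documented default queue "jobs" when no value is given.
    static String[] parseQueues(String option) {
        if (option == null || option.trim().isEmpty()) {
            return new String[] {"jobs"};
        }
        return Arrays.stream(option.split(","))
                .map(String::trim)                 // assumption: surrounding spaces are ignored
                .filter(name -> !name.isEmpty())   // assumption: empty entries are skipped
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        // prints [high, low, tests]
        System.out.println(Arrays.toString(parseQueues("high,low,tests")));
        // prints [jobs]
        System.out.println(Arrays.toString(parseQueues(null)));
    }
}
```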
Palermo includes a web interface that can be used to manage the queues and perform common operations like retrying failed jobs.
To start the web frontend, a command line utility is provided. These are all the options available to the command line launcher, accessible through the palermo.cli class or through the executable jar that is built when invoking lein uberjar.
usage: web
-exchange <EXCHANGE> RabitMQ exchange for Palermo, defaults to palermo
-host <HOST> Host of the RabbitMQ server, defaults to localhost
-password <PASSWORD> Password for the RabbitMQ server, defaults to
guest
-port <PORT> Port of the RabbitMQ server, defaults to 5672
-username <USERNAME> Username for the RabbitMQ server, defaults to
guest
-vhost <VHOST> Virtual Host for the RabbitMQ server, defaults to /
-webport <WEBPORT> Port where the Palermo web interface will be
waiting for connections, defaults to 3000
Copyright © 2014 Antonio Garrote
Distributed under the Eclipse Public License, either version 1.0 or (at your option) any later version.