How to Consume RabbitMQ Messages in Kotlin

dimitrilc 1 Tallied Votes 2K Views Share

Introduction

In the last tutorial, we learned how to send messages to a RabbitMQ queue. In this tutorial, we will learn how to consume those messages in Kotlin.

Goals

At the end of the tutorial, you would have learned:

  1. How to use the RabbitMQ Java Client library to consume messages In Kotlin.

Tools Required

  1. A Kotlin IDE such as IntelliJ IDEA version 2022.2.1 (Community Edition).

Prerequisite Knowledge

  1. How to publish messages to RabbitMQ exchanges.
  2. Basic Kotlin

Project Setup

To follow along with the tutorial, perform the steps below:

  1. In IntelliJ, create a new Kotlin project using Gradle as the build system.
    Screen_Shot_2022-09-09_at_11.36.41_AM.png

  2. Add the required dependencies into the build.gradle.kts file. This is the RabbitMQ Java Client and its slf4j dependencies.

     // https://mvnrepository.com/artifact/com.rabbitmq/amqp-client
     implementation("com.rabbitmq:amqp-client:5.15.0")
    
     implementation("org.slf4j:slf4j-simple:2.0.0")
     implementation("org.slf4j:slf4j-api:2.0.0")

Creating A Consumer User

Similarly to how we created a dedicated user for publishing messages in the previous tutorial, let us create a new user for the purpose of consuming messages only.

  1. Log into the management interface.

  2. Go to the Admin tab.

  3. Select Add a user.

  4. Provide consumer as the username and password as the password.

  5. Hit Add user.
    Screen_Shot_2022-09-09_at_12.48.24_PM.png

  6. After the user is created, click on the username consumer.

  7. Give the consumer user read-only access to the virtual host /.

  8. Hit Set Permission.

Screen_Shot_2022-09-09_at_12.51.33_PM.png

The consumer user is now ready to consume messages in the next section

Basic Message Consumer

To create a consumer, you can follow the steps below:

  1. Create a Connection object using ConnectionFactory().

  2. Set the appropriate properties for the Connection object.

  3. Create a new Channel from the Connection object.

  4. Call basicConsume() on the Channel object, providing the queue name and an implementation of Consumer.

     import com.rabbitmq.client.*
    
     private const val USERNAME = "consumer"
     private const val PASSWORD = "password"
     private const val QUEUE = "my-queue"
    
     fun main() {
        val factory = ConnectionFactory()
            .apply {
                username = USERNAME
                password = PASSWORD
                host = ConnectionFactory.DEFAULT_HOST
                virtualHost = ConnectionFactory.DEFAULT_VHOST
                port = ConnectionFactory.DEFAULT_AMQP_PORT
            }
    
        val channel = factory
            .newConnection()
            .createChannel()
    
        channel.basicConsume(QUEUE, true, object : Consumer {
            override fun handleConsumeOk(consumerTag: String?) {
                consumerTag?.let {
                    println("$it has been registered as a callback")
                }
            }
    
            override fun handleCancelOk(consumerTag: String?) {
                //Perform cancellation tasks such as closing resources here
            }
    
            override fun handleCancel(consumerTag: String?) {
                //Perform cancellation tasks such as closing resources here
            }
    
            override fun handleShutdownSignal(consumerTag: String?, sig: ShutdownSignalException?) {
                sig?.let {
                    throw it
                }
            }
    
            override fun handleRecoverOk(consumerTag: String?) {
                // If connection issues, try to receive messages again
            }
    
            override fun handleDelivery(
                consumerTag: String?,
                envelope: Envelope?,
                properties: AMQP.BasicProperties?,
                body: ByteArray?
            ) {
                body?.let {
                    println((body.decodeToString()))
                }
            }
    
        })
    
     }

A few more important things to note in the code snippet above are:

  1. The version of basicConsume() used here included an autoAck argument, which means that an ack will be sent to the server after the message has been received. Our server will remove the message from the queue after it has been acked.
  2. You can check the properties argument in the handleDelivery() function to retrieve the custom headers and some other properties for the message.
  3. Depending on your use case, you can either wrap the channels and connection in use{} (same as Java try-with-resource) blocks or just leave it blocking the main thread (in this case). You can also have the consumer running permanently in a background thread or coroutine.

Summary

Congratulations, you have learned how to consume messages from RabbitMQ queues in this tutorial.