Merhabalar.

Bu yazıda bir Spring Boot uygulamasında Spring Integration kullanarak RabbitMQ ile entegrasyonun nasıl yapılacağını öğreneceğiz.

Bu yazıda RabbitMQ’ya bir gönderici bir de RabbitMQ’dan alıcı 2 projemiz olacak, Sender ve Receiver projelerimiz.

Sender projemizin kodlarına şuradan erişebilirsiniz: https://github.com/ilkgunel/SpringIntegrationRabbitMQSender

Reveiver projemizin kodlarına da şuradan erişebilirsiniz: https://github.com/ilkgunel/SpringIntegrationRabbitMQReceiver

Şimdi öncelikle Sender uygulamamızı inceleyelim:

Product.java

package com.ilkaygunel.domain;

import java.io.Serializable;

public class Product implements Serializable {
	char type;
	int price;

	public char getType() {
		return type;
	}

	public void setType(char type) {
		this.type = type;
	}

	public int getPrice() {
		return price;
	}

	public void setPrice(int price) {
		this.price = price;
	}
}

Product sınıfımız bir üst sınıf görevi üstlenecek. İçerisinde ürün tipini belirten bir type alanı ve fiyatını belirten bir price alanı yer alıyor. Devam eden kısımda da bu alanlara ait get-set metotları yer alıyor.

Book.java

package com.ilkaygunel.domain;

public class Book extends Product {
	public Book() {
		this.type = 'B';
		this.price = 100;
	}
}

Product sınıfımızı kalıtan Book sınıfımız tip olarak B’yi ve fiyat olarak da 100 değerini kullanacak şekilde yapılandırılıyor.

package com.ilkaygunel.domain;

public class Perfume extends Product {
	public Perfume() {
		this.type = 'P';
		this.price = 200;
	}
}

Product sınıfımızı kalıtan Perfume sınıfımız tip olarak P’yi ve fiyat olarak da 200 değerini kullanacak şekilde yapılandırılıyor.

LineItem.java

package com.ilkaygunel.domain;

import java.io.Serializable;

public class LineItem implements Serializable {
	private Product product;
	private int qty;

	public Product getProduct() {
		return product;
	}

	public void setProduct(Product product) {
		this.product = product;
	}

	public int getQty() {
		return qty;
	}

	public void setQty(int qty) {
		this.qty = qty;
	}
}

LineItem sınıfımız ürünlerimizin siparişi sırasında kullanacağımız bir ara sınıf olacak. İçerisinde ürünü simgeleyecek bir product ve miktarını simgeleyecek qty alanları yer alıyor.

Order.java

package com.ilkaygunel.domain;

import java.io.Serializable;
import java.util.List;


public class Order implements Serializable {
	private String id;
	private List<LineItem> items;

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public List<LineItem> getItems() {
		return items;
	}

	public void setItems(List<LineItem> items) {
		this.items = items;
	}
}

Verilecek siparişleri simgelemek için oluşturulmuş Order sınıfımız içerisinde items isminde LineImtem tipinde bir liste ve bir de sipariş id tutuyor.

SenderConfiguration.java

package com.ilkaygunel.springintegration.sender;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SenderConfiguration {

	static final String topicExchangeName = "ecommExchange";

	static final String queueName = "orders";

	@Bean
	Queue queue() {
		return new Queue(queueName, false);
	}

	@Bean
	TopicExchange exchange() {
		return new TopicExchange(topicExchangeName);
	}

	@Bean
	Binding binding(Queue queue, TopicExchange exchange) {
		return BindingBuilder.bind(queue).to(exchange).with("orders.#");
	}
}

SenderConfiguration sınıfımız uygulamamızın RabbitMQ’ya nasıl mesaj göndereceğini bildiriyor arkadaşlar. topicExchangeName ismi ile tanımlanmış değişkenin değeri RabbitMQ içerisinde bir exchange yani dönüşüm uygulanması için kullanılacak. queueName ismi ile tanımlanmış değişken de mesajların hangi kuyruğa iletileceğini bildiriyor. Bizim uygulamamız ecommExchange adındaki bir exchange üzerinden geçip orders isimindeki bir que’ya gönderilecek. binding(…) metoduna dikkat edersek orders.# şeklinde bir ifade görüyoruz. Bu kısım aslında routingKey anlamına geliyor arkadaşlar. Exchange üzerinde bu key ile que’ya yönlendirme yapılacak. # işareti bildiğimiz gibi number ifade etmek için kullanılıyor. Bu nedenle biz de routingKey orders.1, orders.2 gibi ifadeler kullanacağız.

RabbitMQ’da Publisher konumunda olan birisi bir mesaj gönderdiğinde RabbitMQ içerisindeki Exchange bu mesajı karşılar ve routingKey’e bakarak bunu que’ya gönderir.

Runner.java

package com.ilkaygunel.springintegration.sender;

import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

import com.ilkaygunel.domain.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class Runner implements CommandLineRunner {

	private final RabbitTemplate rabbitTemplate;

	public Runner(RabbitTemplate rabbitTemplate) {
		this.rabbitTemplate = rabbitTemplate;
	}

	@Override
	public void run(String... args) throws Exception {

		Product book = new Book();
		Product perfume = new Perfume();

		LineItem l1 = new LineItem();
		l1.setProduct(book);
		l1.setQty(2);

		System.out.println("Sending first order, two Books & three Perfumes");
		LineItem l2 = new LineItem();
		l2.setProduct(perfume);
		l2.setQty(3);

		ArrayList<LineItem> lineItems1 = new ArrayList<LineItem>();
		lineItems1.add(l1);
		lineItems1.add(l2);

		Order order1 = new Order();
		order1.setId("1001");
		order1.setItems(lineItems1);

		rabbitTemplate.convertAndSend(SenderConfiguration.topicExchangeName, "orders.1", order1);

		TimeUnit.SECONDS.sleep(5);

		System.out.println("Sending second order, five books & two Perfumes");
		LineItem l3 = new LineItem();
		l3.setProduct(book);
		l3.setQty(5);

		LineItem l4 = new LineItem();
		l4.setProduct(perfume);
		l4.setQty(2);

		ArrayList<LineItem> lineItems2 = new ArrayList<LineItem>();
		lineItems2.add(l3);
		lineItems2.add(l4);

		Order order2 = new Order();
		order2.setId("1002");
		order2.setItems(lineItems2);

		rabbitTemplate.convertAndSend(SenderConfiguration.topicExchangeName, "orders.2", order2);
	}
}

Runner sınıfı aslında bu örnek için oluşturduğumuz ve que’ya otomatik veri basması için kullanacağımız bir sınıf. CommandLineRunner’ı kalıtan bu sınıf içerisindeki run metodu uygulama başlatıldığında çalışacak que’ya veri gönderecek.

run(…) metodu içerisinde çeşitli ürünler oluşturulup bu ürünlerle LineItem nesneleri oluşturuluyor ve bu nesneler ile de siparişler oluşturuluyor. Yapılandırıcıdan gelen rabbitTemplate üzerinden convertAndSend(…) metodu çağırılarak da veriler RabbitMQ’ya gönderiliyor. convertAndSend metoduna parametre olarak exchangeName’i, routingKey’i ve sipariş nesnesini geçiyoruz. routingKey gördüğümüz gibi number’lı bir patterne sahip.

SenderApplication.java

package com.ilkaygunel.springintegration.sender;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SenderApplication {

	public static void main(String[] args) throws InterruptedException {
		SpringApplication.run(SenderApplication.class, args).close();
	}
}

SenderApplication sınıfımızın da yaptığı tek iş main metodu üzerinden uygulamayı ayağa kaldırmak.

Sender uygulaması ile işimiz bu kadar, şimdi Receiver uygulamasına bakalım.

RECEIVER APPLICATION

Reveiver uygulamamızda da Product, Book, Perfume, Order ve LineItem sınıfları aynen yer alıyor. Farklı olarak Invoice sınıfımız var.

Invoice.java

package com.ilkaygunel.domain;

import java.util.Date;

public class Invoice {
    private String id;
    private String order_id;
    private Date date;
    private Double value;
    private Double amount;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getOrder_id() {
        return order_id;
    }

    public void setOrder_id(String order_id) {
        this.order_id = order_id;
    }

    public Date getDate() {
        return date;
    }

    public void setDate(Date date) {
        this.date = date;
    }

    public Double getValue() {
        return value;
    }

    public void setValue(Double value) {
        this.value = value;
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }
}

Invoice sınıfımız aslında gelen siparişler için faturalama işi yapacağından içerisinde buna ait alanlar tutuyor. Bu alanların nasıl kullandığına ReceiverApplication sınıfında bakacağız.

ReceiverConfiguration.java

package com.ilkaygunel.springintegration;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

@Configuration
public class ReceiverConfiguration {

	@Bean
	public MessageChannel orderChannel() {
		return new DirectChannel();
	}

	@Bean
	public MessageChannel invoiceChannel() {
		return new DirectChannel();
	}

	@Bean
	public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
			@Qualifier("orderChannel") MessageChannel channel) {
		AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
		adapter.setOutputChannel(channel);
		return adapter;
	}

	@Bean
	public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
		container.setQueueNames("orders");
		return container;
	}
}

ReceiverConfiguration sınıfımız que’yu nasıl dinleyeceğimiz ve mesaj geldiğinde ne yapacağımıza dair tanımlamalarımız yer alıyor.

Receiver uygulamamız bizim Spring Integration’ın özelliklerinden yararlanacağımız bir uygulama olacak. Spring Integration içindeki channels, transformers ve serviceactivators’ı kullanarak burada işimi gerçekleştireceğiz.

Sınıfımız içerisinde bir orderChannel ve bir de invoiceChannel tanımı yapıyoruz. Bu channel’lar bizim que’dan gelen mesajı karşılamada ve fatura hazılarmada kullanacağımız channel’lardır. container(…) metodu ile tanımladığımız Bean bizim hangi que’yu dinleyeceğimizi tanımladığımız Bean’dir. container.setQueueNames(“orders”); ifadesi ile orders ismindeki que’yu dinlemeisni söylüyoruz. inbound(…) metodu ile de que’dan mesaj geldiğinde nasıl davranılacağını bildiriyoruz. Metoda parametre olarak geçilen listenerContainer que’yu dinleme görevini üstlenirken orderChannel‘a işaret eden channel ise mesajın iletileceği channel olma görevini üstleniyor.

ReceiverApplication.java

package com.ilkaygunel.springintegration;

import com.ilkaygunel.domain.Invoice;
import com.ilkaygunel.domain.LineItem;
import com.ilkaygunel.domain.Order;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

import java.util.Date;
import java.util.List;

@SpringBootApplication
public class ReceiverApplication {

	public static void main(String[] args) {
		new SpringApplicationBuilder(ReceiverApplication.class).web(WebApplicationType.NONE).run(args);
	}

	@Transformer(inputChannel = "orderChannel", outputChannel = "invoiceChannel")
	Invoice convertToInvoice(Order order) {

		System.out.println("*****************************");
		System.out.println("..... Received an order .....");
		System.out.println("Order id = " + order.getId());
		System.out.println("Generating Invoice ..........");
		Invoice invoice = new Invoice();
		invoice.setDate(new Date());
		invoice.setOrder_id(order.getId());

		Double value    = 0.0;
		Double amount   = 0.0;
		Double tax_rate = 0.0;
		Double tax      = 0.0;

		List<LineItem> lineItems = order.getItems();
		Double lineItemPrice;

		for (LineItem lineItem : lineItems) {
			lineItemPrice = (double) (lineItem.getProduct().getPrice() * lineItem.getQty());

			// Books are taxed at 5%
			if (lineItem.getProduct().getType() == 'B')
				tax_rate = 0.05;
			// Perfumes are taxed at 8%
			else if (lineItem.getProduct().getType() == 'P')
				tax_rate = 0.08;

			tax = lineItemPrice * tax_rate;

			value  += lineItemPrice;
			amount += lineItemPrice + tax;
		}
		invoice.setValue(value);
		invoice.setAmount(amount);

		return invoice;
	}

	@Bean
	@ServiceActivator(inputChannel = "invoiceChannel")
	public MessageHandler handler() {
		return new MessageHandler() {
			@Override
			public void handleMessage(Message<?> message) throws MessagingException {

				Invoice invoice = (Invoice) message.getPayload();
				System.out.println("Received Invoice ............");
				System.out.println("Order Id = " + invoice.getOrder_id());
				System.out.println("Value = " + invoice.getValue());
				System.out.println("Invoice Amount = " + invoice.getAmount());
			}
		};
	}
}

ReceiverApplication sınıfımız uygulamayı başlatacak main metodumuzu barındırıyor ve bunun yanında faturalandırma yapacak metodu da içinde barındırıyor.

Invoice dönüş tipine sahip convertToInvoice(…) metodu @Transformer notasyonu ile işaretli. Bu notasyona bir inputChannel ve bir de outputChannel parametreleri verilmiş. inputChannel ile denmek istenen ilgili channel’a veri girişi olduğunda burasının tetikleneceğidir. outputChannel ile denmek istenen de bu metodun işini bitirip döndürdüğü objenin gönderileceği channel’dır. Bu örneğimizde orderChannel’a veri gönderildiğinde bu metot devreye girecek, işlemler yapıp Invoice tipinde bir obje dönecek ve bu obje de outputChannel olarak tanımlanmış invoiceChannel’a iletilecektir. convertToInvoice metodu içerisinde Sender uygulamasından gelen Order’dan alanlar alınıyor, gelen adete göre vergili ve vergisiz fiyat hesaplaması yapılıyor, akabinde de Invoice objesine koyulup göndeirliyor.

Son olarak handler(…) metodu da bir @ServiceActivator notasyonuna sahip. Burada da yine bir inputChannel tanımı var. @Transformer ile aynı şekilde @ServiceActivator’da da bir channel’a veri geldiğinde tetiklenmesi sağlanabilir. Burada da invoiceChannel’a veri geldiğinde handler(…) metodunun tetiklenmesini sağlıyoruz.Metot içerisinde gelen Invoice objesindeki alanlar konsola yazdırılıyor.

DEMO

Şimdi ilk olarak Sender uygulamasındaki main metodun olduğu sınıf üzerinden uygulamamızı çalıştıralım.

Aşağıdaki gibi 2 metni konsolda görüyoruz:

Sending first order, two Books & three Perfumes Sending second order, five books & two Perfumes

http://localhost:15672/ adresinden orders que’sunu kontrol ettiğimde kuyrukta 2 adet verinin bekleidğini görüyorum.

Şimdi de Receiver uygulamasını ayağa kaldırıyorum ve konsolu kontrol ediyorum.

Receiver uygulamasının konsolunda da Sender uygulamasından gönderilen ve que’da bekleyen mesajların işlendiğini görüyorum.

Bu yazıda anlatacaklarım da bu kadar arkadaşlar. Başka bir yazıda görüşene kadar sağlıcakla kalın.