RabbitMQ: Sending Scheduled/Delayed Messages Using Spring Boot

Prerequisites

Start RabbitMQ with the delayed message exchange plugin (rabbitmq_delayed_message_exchange) enabled; a later section in this document shows how to build a Docker image with the plugin baked in.

Create Project

Create a Spring Boot project with the following dependencies:

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

		<dependency>
		    <groupId>com.fasterxml.jackson.datatype</groupId>
		    <artifactId>jackson-datatype-jsr310</artifactId>
		</dependency>

application.properties

# Rabbit
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

rabbitmq.flight.dg.exchange=otel_flight_direct
rabbitmq.flight.received.routingkey=flight.event.received
rabbitmq.flight.received.queue=flight.received.queue

RabbitMQConfig.java

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

	private static final String EXCHANGE_TYPE_DELAY = "x-delayed-message";
	private static final String EXCHANGE_TYPE_DELAY_ARGUMENT = "x-delayed-type";
	private static final String EXCHANGE_TYPE_DELAY_ARGUMENT_DIRECT = "direct";

	@Value("${rabbitmq.flight.dg.exchange}")
	private String exchange;

	@Value("${rabbitmq.flight.received.queue}")
	private String flightQueue;
	@Value("${rabbitmq.flight.received.routingkey}")
	private String flightRoutingKey;

	@Bean
	CustomExchange directExchange() {
		Map<String, Object> args = new HashMap<>();
		args.put(EXCHANGE_TYPE_DELAY_ARGUMENT, EXCHANGE_TYPE_DELAY_ARGUMENT_DIRECT);
		return new CustomExchange(exchange, EXCHANGE_TYPE_DELAY, true, false, args);
	}

	@Bean(name = "flightReceived")
	Queue flightReceivedQueue() {
		return new Queue(flightQueue, true);
	}

	@Bean
	Binding flightReceivedBinding(@Qualifier("flightReceived") Queue queue, CustomExchange exchange) {
		return BindingBuilder
				.bind(queue)
				.to(exchange)
				.with(flightRoutingKey)
				.noargs();
	}

	@Bean
	RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
	    final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
	    rabbitTemplate.setMessageConverter(converter);
	    return rabbitTemplate;
	}

	@Bean
	Jackson2JsonMessageConverter producerJackson2MessageConverter() {
	    return new Jackson2JsonMessageConverter();
	}
}
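
If you would rather not annotate every date field (as Flight does below), an alternative is to register the JSR-310 module on the converter's ObjectMapper. A minimal sketch, replacing the converter bean above:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

	@Bean
	Jackson2JsonMessageConverter producerJackson2MessageConverter() {
		// JavaTimeModule comes from the jackson-datatype-jsr310 dependency added earlier
		ObjectMapper mapper = new ObjectMapper();
		mapper.registerModule(new JavaTimeModule());
		return new Jackson2JsonMessageConverter(mapper);
	}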

Flight.java

import java.time.LocalDateTime;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;

public class Flight {

	private String origin;
	private String destination;
	private String airline;
	
	@JsonSerialize(using = LocalDateTimeSerializer.class)
	@JsonDeserialize(using = LocalDateTimeDeserializer.class)
	private LocalDateTime departureTime;

	public String getOrigin() {
		return origin;
	}

	public void setOrigin(String origin) {
		this.origin = origin;
	}

	public String getDestination() {
		return destination;
	}

	public void setDestination(String destination) {
		this.destination = destination;
	}

	public String getAirline() {
		return airline;
	}

	public void setAirline(String airline) {
		this.airline = airline;
	}

	public LocalDateTime getDepartureTime() {
		return departureTime;
	}

	public void setDepartureTime(LocalDateTime departureTime) {
		this.departureTime = departureTime;
	}

	@Override
	public String toString() {
		return "Flight [origin=" + origin + ", destination=" + destination + ", airline=" + airline + ", departureTime="
				+ departureTime + "]";
	}	
}

FlightReceivedProducer.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class FlightReceivedProducer {

	private static final Logger LOGGER = LoggerFactory.getLogger(FlightReceivedProducer.class);

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@Value("${rabbitmq.flight.dg.exchange}")
	private String exchangeName;

	@Value("${rabbitmq.flight.received.routingkey}")
	private String routingKey;

	public void sendMessage(Flight flight) {
		try {
			rabbitTemplate.convertAndSend(exchangeName, routingKey, flight, new MessagePostProcessor() {
				
				@Override
				public Message postProcessMessage(Message message) throws AmqpException {
					message.getMessageProperties().setDelay(1000);
					return message;
				}
			});
			LOGGER.debug("Sent message for flight");
		} catch (Exception e) {
			LOGGER.error("Unable to send Message ", e);
		}
	}
}

FlightMQConsumer.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FlightMQConsumer {

	private static final Logger LOGGER = LoggerFactory.getLogger(FlightMQConsumer.class);

	@RabbitListener(queues = "#{'${rabbitmq.flight.received.queue}'}")
	public void consumeMessage(Flight flight, Message message) {
		try {
			LOGGER.trace("Message received, after delay of {} : {}", message.getMessageProperties().getReceivedDelay(), flight);

		} catch (Exception e) {
			LOGGER.error("Unnable to process the Message", e);
		}
	}

}
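
If you prefer to bind the delay as a method parameter instead of inspecting the Message, Spring AMQP also exposes it as a message header. A sketch, assuming your Spring AMQP version provides the AmqpHeaders.RECEIVED_DELAY constant:

import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;

	@RabbitListener(queues = "#{'${rabbitmq.flight.received.queue}'}")
	public void consumeMessage(Flight flight, @Header(name = AmqpHeaders.RECEIVED_DELAY, required = false) Integer delay) {
		LOGGER.trace("Message received after delay of {} : {}", delay, flight);
	}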

RabbitDelayMessageApplication.java

import java.time.LocalDateTime;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.mnadeem.app.flight.messaging.Flight;
import com.mnadeem.app.flight.messaging.FlightReceivedProducer;

@SpringBootApplication
public class RabbitDelayMessageApplication implements CommandLineRunner {
	
	@Autowired
	private FlightReceivedProducer producer;

	public static void main(String[] args) {
		SpringApplication.run(RabbitDelayMessageApplication.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		producer.sendMessage(flight());		
	}

	private Flight flight() {
		Flight flight = new Flight();
		flight.setAirline("1");
		flight.setDepartureTime(LocalDateTime.now());
		flight.setDestination("d");
		flight.setOrigin("o");
		return flight;
	}
}

Run the Application

The message is delivered to the exchange immediately; it is routed to the bound queue only after the configured delay elapses.
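
The fixed 1000 ms delay in the producer is just for demonstration. A sketch of scheduling delivery at an absolute time instead (sendMessageAt is a hypothetical helper added to FlightReceivedProducer; note the delay header is a 32-bit int, so very long delays must be capped):

import java.time.Duration;
import java.time.LocalDateTime;

	public void sendMessageAt(Flight flight, LocalDateTime deliverAt) {
		long delayMs = Math.max(0, Duration.between(LocalDateTime.now(), deliverAt).toMillis());
		rabbitTemplate.convertAndSend(exchangeName, routingKey, flight, message -> {
			// the x-delay header is a signed 32-bit value, cap it accordingly
			message.getMessageProperties().setDelay((int) Math.min(delayMs, Integer.MAX_VALUE));
			return message;
		});
	}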

Exchange Definition

{
            "name": "otel_flight_direct",
            "vhost": "/",
            "type": "x-delayed-message",
            "durable": true,
            "auto_delete": false,
            "internal": false,
            "arguments": {
                "x-delayed-type": "direct"
            }
        }

Adding Another Plugin To Rabbit Management Docker Image

Suppose you are using the RabbitMQ management image (rabbitmq:3.8.27-management) and you get the following error while enabling rabbitmq_delayed_message_exchange:

{:plugins_not_found, [:rabbitmq_delayed_message_exchange]}

The command that triggers it:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Prerequisites

Create a folder containing the Dockerfile, enabled_plugins, definitions.json, and rabbitmq.conf files described below.

Dockerfile

FROM rabbitmq:3.8.27-management

RUN apt-get -o Acquire::Check-Date=false update && apt-get install -y curl

RUN curl -L https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.17/rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez > $RABBITMQ_HOME/plugins/rabbitmq_delayed_message_exchange-3.8.17.ez

RUN chown rabbitmq:rabbitmq $RABBITMQ_HOME/plugins/rabbitmq_delayed_message_exchange-3.8.17.ez

# Enabling at build time is not needed; the plugin is enabled via the enabled_plugins file below
#RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange

enabled_plugins

[rabbitmq_delayed_message_exchange,rabbitmq_management,rabbitmq_prometheus].

definitions.json

{
   "users": [
    {
      "name": "admin",
      "password": "admin",
      "tags": "administrator"
    }
  ],
  "vhosts": [
    {
      "name": "/"
    }
  ],
  "policies": [
    {
      "vhost": "/",
      "name": "ha",
      "pattern": "",
      "apply-to": "all",
      "definition": {
        "ha-mode": "all",
        "ha-sync-batch-size": 256,
        "ha-sync-mode": "automatic"
      },
      "priority": 0
    }
  ],
  "permissions": [
    {
      "user": "admin",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    }
  ],
  "queues": [
    {
      "name": "job-import.triggered.queue",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": {}
    }
  ],
  "exchanges": [
    {
      "name": "lob-proj-dx",
      "vhost": "/",
      "type": "direct",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    }
  ],
  "bindings": [
    {
      "source": "lob-proj-dx",
      "vhost": "/",
      "destination": "job-import.triggered.queue",
      "destination_type": "queue",
      "routing_key": "job-import.event.triggered",
      "arguments": {}
    }
  ]
}

rabbitmq.conf

auth_mechanisms.1 = PLAIN
auth_mechanisms.2 = AMQPLAIN
loopback_users.guest = false
listeners.tcp.default = 5672
#default_pass = admin
#default_user = admin
hipe_compile = false
#management.listener.port = 15672
#management.listener.ssl = false
management.tcp.port = 15672
management.load_definitions = /etc/rabbitmq/definitions.json
#default_pass = admin
#default_user = admin

Build Custom Docker image

docker build -t rabbitmq:3.8.27-md .
docker image ls

Run the custom Docker Image

docker run --restart=always -d -p 5672:5672 -p 15672:15672 --mount type=bind,source=E:\docker\rabbit\data,target=/var/lib/rabbitmq/ --mount type=bind,source=E:\docker\rabbit\etc,target=/etc/rabbitmq/ --name rabbitmq --hostname my-rabbit rabbitmq:3.8.27-md

Validate that the plugin is enabled

docker exec -it 9b0bffed911e bash
rabbitmq-plugins list

Start Options

Directly Running

Using the stock image (note that enabling the delayed message plugin here fails with the plugins_not_found error above):

docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:3.8.27-management

Using the custom image built above, where the plugin ships in the plugins directory (if it was not listed in enabled_plugins, enable it inside the container):

docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:3.8.27-md

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Docker Compose

version: "3.8"

services:
  rabbitmq:
    build: .
    #image: rabbitmq:3.7.28-management
    #container_name: rabbitmq
    volumes:
        - ./etc/:/etc/rabbitmq/
        - ./data/:/var/lib/rabbitmq/
        - ./logs/:/var/log/rabbitmq/
    environment:
        RABBITMQ_ERLANG_COOKIE: ${RABBITMQ_ERLANG_COOKIE:-secret_cookie}
        RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER:-admin}
        RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS:-admin}
    ports:
        - 5672:5672    #amqp
        - 15672:15672  #http
        - 15692:15692  #prometheus

Either

docker-compose up

Or

docker swarm init
docker stack deploy --compose-file docker-compose.yaml rabbit-delay
docker stack services rabbit-delay
docker stack rm rabbit-delay

HashiCorp Vault And NodeJs Application

node-vault

.env

NODE_TLS_REJECT_UNAUTHORIZED=0
SKIP_PREFLIGHT_CHECK=true

VAULT_SKIP_VERIFY=true
VAULT_ENDPOINT=https://enterprise.serer.org.com
VAULT_ROLE_ID=g1e85bfa-cwea-10b0-a91d-e505t255c6c0
VAULT_SECRET_ID=cd4cd268-dde4-1664-0e85-7a298555fd04

Asynchronous (promise-chaining) version

const vault = require('node-vault')({
    apiVersion: 'v1',
    endpoint: process.env.VAULT_ENDPOINT,
    namespace: 'NS/PROD',
})

vault
    .approleLogin({
        role_id: process.env.VAULT_ROLE_ID,
        secret_id: process.env.VAULT_SECRET_ID,
    })
    .then((result) => {
        console.log('Start Reading', result)
        vault.token = result.auth.client_token

        vault
            .read('kv/data/mysql/webapp')
            .then((result) => {
                console.log('Read webapp', result)
            })
            .catch((err) => console.error('webapp error', err))

        vault
            .read('ad/creds/app_prod')
            .then((result) => {
                console.log('Read app_prod', result)
            })
            .catch((err) =>
                console.error('app_proderror', JSON.stringify(err))
            )
    })
    .catch((err) => console.error('Final error', JSON.stringify(err)))

const readVault = async (key) => {
    const { data } = await vault.read(key)
    return data
}

module.exports = {
    readVault,
}

Synchronous-style (async/await) version


const vault = require('node-vault')({
    apiVersion: 'v1',
    endpoint: process.env.VAULT_ENDPOINT,
    namespace: 'NS/PROD',
})

const init = async () => {
  const result = await vault.approleLogin({
    role_id: process.env.VAULT_ROLE_ID,
    secret_id: process.env.VAULT_SECRET_ID,
  });

  vault.token = result.auth.client_token; // Add token to vault object for subsequent requests.

  await sampleRuns(); // This is for testing purpose only, remove it
};

const readVault = async (key) => {
    const { data } = await vault.read(key);
    return data;
}

const sampleRuns = async () => {
    const db = await readVault("kv/data/mysql/webapp");

    const databaseName = db.data.db_name;
    const username = db.data.username;
    const password = db.data.password;
  
    console.log({
      databaseName,
      username,
      password,
    });
  
    const nas = await readVault("ad/creds/app_prod");
    const currentPassword = nas.current_password;
    const lastPassword = nas.last_password;
    const nasUsername = nas.username;
  
    console.log({
      nasUsername,
      currentPassword,
      lastPassword,
    });
}

init();

module.exports = {
    readVault,
}


hashi-vault-js

.env

NODE_TLS_REJECT_UNAUTHORIZED=0
SKIP_PREFLIGHT_CHECK=true

VAULT_SKIP_VERIFY=true
VAULT_ENDPOINT=https://enterprise.serer.org.com/v1
VAULT_ROLE_ID=g1e85bfa-cwea-10b0-a91d-e505t255c6c0
VAULT_SECRET_ID=cd4cd268-dde4-1664-0e85-7a298555fd04

Asynchronous version

const Vault = require('hashi-vault-js');

const vault = new Vault( {
    https: false,
    baseUrl: process.env.VAULT_ENDPOINT,
    //rootPath: 'PROD',
    timeout: 2000,
    proxy: false,
    // Only for Vault Enterprise
    namespace: 'NS/PROD'
});

let token;

vault.healthCheck().then(status => {
    if (!status.sealed) {
        console.log('Vault Status: ', status)
        vault.loginWithAppRole(process.env.VAULT_ROLE_ID, process.env.VAULT_SECRET_ID)
        .then(data => {
            token = data.client_token
            console.log('server token ', token)

            vault
                // args: token, secret path, secret version (null = latest), KV mount point
                .readKVSecret(token, "mysql/webapp", null, 'kv')
                .then(data => {
                    console.log('webapp ', data)
                }).catch((error) => {
                    console.log('webapp error ', error)
                });

        }).catch((error) => {
            console.log('Login error ', error)
        });
    }
}).catch((error) => {
    console.log('Health Check error ', error)
})

RabbitMQ Up & Running in Kubernetes 1.20+

rbac.yaml

---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: rabbitmq
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: rabbitmq
rules:
- apiGroups: [""]
  resources: ["endpoints"]
  verbs: ["get"]
#- apiGroups: [""]
#  resources: ["events"]
#  verbs: ["create"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: rabbitmq
subjects:
- kind: ServiceAccount
  name: rabbitmq
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: rabbitmq

configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: rabbitmq-config
data:
  RMQ_ADMIN_USER: admin
  enabled_plugins: |
    [rabbitmq_peer_discovery_k8s, rabbitmq_management, rabbitmq_prometheus].
  rabbitmq.conf: |
    ## Clustering
    #cluster_formation.peer_discovery_backend = k8s
    cluster_formation.peer_discovery_backend = rabbit_peer_discovery_k8s    
    cluster_formation.k8s.host = <Your Cluster host>
    cluster_formation.k8s.port = 443
    #cluster_formation.k8s.host = kubernetes.default.svc.cluster.local
    cluster_formation.k8s.address_type = hostname
    cluster_formation.k8s.service_name = rabbitmq-headless
    cluster_partition_handling = autoheal

    #cluster_formation.k8s.hostname_suffix = .${CLUSTER_NAME}.${NAMESPACE}.svc.cluster.local
    #cluster_formation.node_cleanup.interval = 10
    #cluster_formation.node_cleanup.only_log_warning = true
    
    ## queue master locator
    queue_master_locator=min-masters
    loopback_users.guest = false

    auth_mechanisms.1 = PLAIN
    auth_mechanisms.2 = AMQPLAIN

    ## set max memory available to MQ
    #vm_memory_high_watermark.absolute = 1GB
    vm_memory_high_watermark.absolute = 900MB
    ## load definitions file
    management.load_definitions = /etc/rabbitmq/definitions.json

    management.path_prefix = /mqadmin

definitions.json

{
  "users": [
    {
      "name": "proj_mq_dev",
      "password": "<PWD>",
      "tags": ""
    },
    {
      "name": "admin",
      "password": "<PWD>",
      "tags": "administrator"
    }
  ],
  "vhosts": [
    {
      "name": "/"
    }
  ],
  "policies": [
    {
      "vhost": "/",
      "name": "ha",
      "pattern": "",
      "apply-to": "all",
      "definition": {
        "ha-mode": "all",
        "ha-sync-batch-size": 256,
        "ha-sync-mode": "automatic"
      },
      "priority": 0
    }
  ],
  "permissions": [
    {
      "user": "proj_mq_dev",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "admin",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    }
  ],
  "queues": [
    {
      "name": "lookup-import.triggered.queue",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": {}
    }
  ],
  "exchanges": [
    {
      "name": "lob-proj-dx",
      "vhost": "/",
      "type": "direct",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    }
  ],
  "bindings": [
    {
      "source": "lob-proj-dx",
      "vhost": "/",
      "destination": "lookup-import.triggered.queue",
      "destination_type": "queue",
      "routing_key": "lookup-import.event.triggered",
      "arguments": {}
    }
  ]
}

secrets.yaml

apiVersion: v1
kind: Secret
metadata:
  name: rabbitmq-secrets
type: Opaque
data:
  RMQ_ERLANG_COOKIE: Wm1GclpWOXdZWA==
  definitions.json: >-
    <Base 64 encoded definitions.json>


client-service-ci.yaml

kind: Service
apiVersion: v1
metadata:
  name: rabbitmq-client
  labels:
    app: rabbitmq
spec:
  type: ClusterIP
  ports:
   - name: http
     protocol: TCP
     port: 15672
     targetPort: management
   - name: prometheus
     protocol: TCP
     port: 15692
     targetPort: prometheus
   - name: amqp
     protocol: TCP
     port: 5672
     targetPort: amqp
  selector:
    app: rabbitmq

client-service-lb.yaml

kind: Service
apiVersion: v1
metadata:
  name: rabbitmq-client
  labels:
    app: rabbitmq
    type: LoadBalancer
spec:
  type: LoadBalancer
  sessionAffinity: None
  loadBalancerIP: <External IP Address>
  externalTrafficPolicy: Cluster
  ports:
   - name: http
     protocol: TCP
     port: 15672
     targetPort: management
   - name: prometheus
     protocol: TCP
     port: 15692
     targetPort: prometheus
   - name: amqp
     protocol: TCP
     port: 5672
     targetPort: amqp
  selector:
    app: rabbitmq

statefulset.yaml

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rabbitmq
spec:
  selector:
    matchLabels:
      app: "rabbitmq"
  # headless service that gives network identity to the RMQ nodes and enables them to cluster.
  # It must exist before the StatefulSet (its manifest is not shown in this section); pods get
  # DNS names of the form <pod-name>.rabbitmq-headless.<namespace>.svc.cluster.local.
  serviceName: rabbitmq-headless
  replicas: 1
  revisionHistoryLimit: 2
  volumeClaimTemplates:
  - metadata:
      name: rabbitmq-data
    spec:
      storageClassName: nas-thin
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: "6Gi"
  template:
    metadata:
      name: rabbitmq
      labels:
        app: rabbitmq
      annotations:
        prometheus.io/scrape: "true"
        #prometheus.io/port: "9090"
        prometheus.io/path: "/metrics"
    spec:
      initContainers:
      # Since k8s 1.9.4, config maps mount read-only volumes. Since the Docker image also writes to the config file,
      # the file must be mounted as read-write. We use init containers to copy from the config map read-only
      # path, to a read-write path
      - name: "rabbitmq-config"
        image: busybox:1.32.0
        volumeMounts:
        - name: rabbitmq-config
          mountPath: /tmp/rabbitmq
        - name: rabbitmq-config-rw
          mountPath: /etc/rabbitmq
        - name: mq-secret-def
          mountPath: /tmp/rabbitsec
        command:
        - sh
        - -c
        # the newline is needed since the Docker image entrypoint scripts appends to the config file
        - cp /tmp/rabbitmq/rabbitmq.conf /etc/rabbitmq/rabbitmq.conf && echo '' >> /etc/rabbitmq/rabbitmq.conf;
          cp /tmp/rabbitmq/enabled_plugins /etc/rabbitmq/enabled_plugins;
          cp /tmp/rabbitsec/definitions.json /etc/rabbitmq/definitions.json
        securityContext:
          runAsUser: 1001
          runAsGroup: 53134
          allowPrivilegeEscalation: false
          capabilities:
            drop:
            - KILL
            - MKNOD
            - SYS_CHROOT
        resources:
            requests:
              memory: 64Mi
              cpu: 60m
            limits:
              memory: 64Mi
              cpu: 60m
      volumes:
      - name: rabbitmq-config
        configMap:
          name: rabbitmq-config
          optional: false
          items:
          - key: enabled_plugins
            path: "enabled_plugins"
          - key: rabbitmq.conf
            path: "rabbitmq.conf"
      - name: mq-secret-def
        secret:
          secretName: rabbitmq-secrets
          items:
            - key: definitions.json
              path: definitions.json
      # read-write volume into which to copy the rabbitmq.conf and enabled_plugins files
      # this is needed since the docker image writes to the rabbitmq.conf file
      # and Kubernetes Config Maps are mounted as read-only since Kubernetes 1.9.4
      - name: rabbitmq-config-rw
        emptyDir:
          sizeLimit: 1Mi
      - name: rabbitmq-data
        persistentVolumeClaim:
          claimName: rabbitmq-data
      serviceAccountName: rabbitmq
      containers:
      - name: rabbitmq
        # Community Docker Image
        image: rabbitmq:3.8-management
        volumeMounts:
        # mounting rabbitmq.conf and enabled_plugins
        # this should have writeable access, this might be a problem
        - name: rabbitmq-config-rw
          mountPath: "/etc/rabbitmq"
          # mountPath: "/etc/rabbitmq/conf.d/"
        # rabbitmq data directory
        - name: rabbitmq-data
          mountPath: "/var/lib/rabbitmq/mnesia"
        env:
        - name: RABBITMQ_DEFAULT_USER
          value: "admin"
        - name: RABBITMQ_ERLANG_COOKIE
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secrets
              key: RMQ_ERLANG_COOKIE
        ports:
        - name: amqp
          containerPort: 5672
          protocol: TCP
        - name: management
          containerPort: 15672
          protocol: TCP
        - name: prometheus
          containerPort: 15692
          protocol: TCP
        - name: epmd
          containerPort: 4369
          protocol: TCP
        resources:
            requests:
              memory: 1Gi
              cpu: '2'
            limits:
              memory: 1Gi
              cpu: '2'
        livenessProbe:
          exec:
            # This is just an example. There is no "one true health check" but rather
            # several rabbitmq-diagnostics commands that can be combined to form increasingly comprehensive
            # and intrusive health checks.
            # Learn more at https://www.rabbitmq.com/monitoring.html#health-checks.
            #
            # Stage 2 check:
            command: ["rabbitmq-diagnostics", "status"]
          initialDelaySeconds: 120
          # See https://www.rabbitmq.com/monitoring.html for monitoring frequency recommendations.
          periodSeconds: 60
          timeoutSeconds: 15
        readinessProbe: # probe to know when RMQ is ready to accept traffic
          exec:
            # This is just an example. There is no "one true health check" but rather
            # several rabbitmq-diagnostics commands that can be combined to form increasingly comprehensive
            # and intrusive health checks.
            # Learn more at https://www.rabbitmq.com/monitoring.html#health-checks.
            #
            # Stage 1 check:
            command: ["rabbitmq-diagnostics", "ping"]
          initialDelaySeconds: 20
          periodSeconds: 60
          timeoutSeconds: 10
        # The Docker image runs as the `rabbitmq` user (uid 999 in the stock image)
        # and writes to the `rabbitmq.conf` file. The security context is needed
        # since the image needs permission to write to this file. Without it,
        # `rabbitmq.conf` is owned by root and inaccessible to the `rabbitmq` user.
        securityContext:
          runAsUser: 1001
          runAsGroup: 53134
          allowPrivilegeEscalation: false
          capabilities:
            drop:
            - KILL
            - MKNOD
            - SYS_CHROOT 

Running Preconfigured RabbitMQ Server Using Docker In Windows

Create two folders: data and etc.

enabled_plugins

[rabbitmq_management,rabbitmq_prometheus].

rabbitmq.conf

auth_mechanisms.1 = PLAIN
auth_mechanisms.2 = AMQPLAIN
loopback_users.guest = false
listeners.tcp.default = 5672
#default_pass = admin
#default_user = admin
hipe_compile = false
#management.listener.port = 15672
#management.listener.ssl = false
management.tcp.port = 15672
management.load_definitions = /etc/rabbitmq/definitions.json
#default_pass = admin
#default_user = admin

definitions.json

{
   "users": [
    {
      "name": "admin",
      "password": "admin",
      "tags": "administrator"
    }
  ],
  "vhosts": [
    {
      "name": "/"
    }
  ],
  "policies": [
    {
      "vhost": "/",
      "name": "ha",
      "pattern": "",
      "apply-to": "all",
      "definition": {
        "ha-mode": "all",
        "ha-sync-batch-size": 256,
        "ha-sync-mode": "automatic"
      },
      "priority": 0
    }
  ],
  "permissions": [
    {
      "user": "admin",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    }
  ],
  "queues": [
    {
      "name": "job-import.triggered.queue",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": {}
    }
  ],
  "exchanges": [
    {
      "name": "lob-proj-dx",
      "vhost": "/",
      "type": "direct",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    }
  ],
  "bindings": [
    {
      "source": "lob-proj-dx",
      "vhost": "/",
      "destination": "job-import.triggered.queue",
      "destination_type": "queue",
      "routing_key": "job-import.event.triggered",
      "arguments": {}
    }
  ]
}

docker run --restart=always \
-d \
-p 5672:5672 \
-p 15672:15672  \
--mount type=bind,source=E:\docker\rabbit\data,target=/var/lib/rabbitmq/ \
--mount type=bind,source=E:\docker\rabbit\etc,target=/etc/rabbitmq/ \
--name rabbitmq \
--hostname my-rabbit \
rabbitmq:3.7.28-management

Data is persisted across container restarts because /var/lib/rabbitmq is bind-mounted to the host.

Running MySQL

docker run --name my-mysql -e MYSQL_ROOT_PASSWORD=Admin@123 -d -p 3306:3306 mysql

Spring Boot Data JPA Multiple Data Sources

application.properties

# Data sources
abc.datasource.url=jdbc:mysql://${GFR_DB_HOST}:${GFR_DB_PORT}/abc
abc.datasource.driverClassName=com.mysql.cj.jdbc.Driver
abc.datasource.username=${GFR_DB_USER}
abc.datasource.password=${GFR_DB_PASS}

# Hibernate props
abc.hibernate.hbm2ddl.auto=update
abc.hibernate.dialect=org.hibernate.dialect.MySQL5InnoDBDialect
abc.hibernate.show_sql=true

### Connection Pool Details
#abc.datasource.hikari.connectionTimeout=20000
#abc.datasource.hikari.maximumPoolSize=5
#abc.datasource.hikari.connection-test-query=SELECT 1
#abc.datasource.hikari.minimum-idle=5 
#abc.datasource.hikari.maximum-pool-size=20
#abc.datasource.hikari.idle-timeout=600000 
#abc.datasource.hikari.max-lifetime=1800000 
#abc.datasource.hikari.auto-commit=true
#abc.datasource.hikari.poolName=SpringBoot-HikariCP

Repository Config

import java.util.HashMap;
import java.util.Map;

import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.data.envers.repository.config.EnableEnversRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.context.annotation.Primary;

import com.zaxxer.hikari.HikariDataSource;

@Configuration
@EnableEnversRepositories(basePackages = {"com.org.lob.abc.membrer.repository"},
		entityManagerFactoryRef = "abcEntityManagerFactory",
		transactionManagerRef = "abcTransactionManager")
public class ABCRepositoryConfig {
	@Autowired
	private Environment env;

    @Primary
	@Bean(name = "abcDataSourceProperties")
	@ConfigurationProperties("abc.datasource")
	DataSourceProperties dataSourceProperties() {
		return new DataSourceProperties();
	}
	
    @Primary
	@Bean(name = "abcDataSource")
	DataSource dataSource(@Qualifier("abcDataSourceProperties") DataSourceProperties dataSourceProperties) {
		return dataSourceProperties
				.initializeDataSourceBuilder()
				.type(HikariDataSource.class)
				.build();
	}
	
    @Primary
	@Bean(name = "abcEntityManagerFactory")
	LocalContainerEntityManagerFactoryBean entityManagerFactory(EntityManagerFactoryBuilder builder, @Qualifier("abcDataSource") DataSource dataSource) {
		return builder
				.dataSource(dataSource)
				.packages("com.org.lob.abc.membrer.repository")
				.persistenceUnit("abc")
				.properties(abcJpaProperties())
				.build();
	}

	private Map<String, String> abcJpaProperties() {
		Map<String, String> jpaProperties = new HashMap<>();

		jpaProperties.put("hibernate.dialect", env.getProperty("abc.hibernate.dialect"));
		jpaProperties.put("hibernate.hbm2ddl.auto", env.getProperty("abc.hibernate.hbm2ddl.auto"));
		jpaProperties.put("hibernate.show_sql", env.getProperty("abc.hibernate.show_sql"));

		return jpaProperties;
	}
	
    @Primary
	@Bean(name = "abcTransactionManager")
	PlatformTransactionManager transactionManager(@Qualifier("abcEntityManagerFactory") EntityManagerFactory entityManagerFactory) {
		return new JpaTransactionManager(entityManagerFactory);
	}

}

You can add another data source configuration the same way:

# Data sources
pbm.datasource.url=jdbc:mysql://${GFR_DB_HOST}:${GFR_DB_PORT}/${GFR_DB_NAME}
pbm.datasource.driverClassName=com.mysql.cj.jdbc.Driver
pbm.datasource.username=${GFR_DB_USER}
pbm.datasource.password=${GFR_DB_PASS}

# Hibernate props
pbm.hibernate.hbm2ddl.auto=update
pbm.hibernate.dialect=org.hibernate.dialect.MySQL5InnoDBDialect
pbm.hibernate.show_sql=true

Repository Config

import java.util.HashMap;
import java.util.Map;

import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.data.domain.AuditorAware;
import org.springframework.data.envers.repository.config.EnableEnversRepositories;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;

import com.zaxxer.hikari.HikariDataSource;

@Configuration
@EnableJpaAuditing
@EnableEnversRepositories(basePackages = {"com.org.lob.pbm.member.repository"},
		entityManagerFactoryRef = "pbmEntityManagerFactory",
		transactionManagerRef = "pbmTransactionManager")
public class PBMRepositoryConfig {

	@Autowired
	private Environment env;

	
	@Bean(name = "pbmDataSourceProperties")
	@ConfigurationProperties("pbm.datasource")
	DataSourceProperties dataSourceProperties() {
		return new DataSourceProperties();
	}
	
	
	@Bean(name = "pbmDataSource")
	DataSource dataSource(@Qualifier("pbmDataSourceProperties") DataSourceProperties dataSourceProperties) {
		return dataSourceProperties
				.initializeDataSourceBuilder()
				.type(HikariDataSource.class)
				.build();
	}
	
	
	@Bean(name = "pbmEntityManagerFactory")
	LocalContainerEntityManagerFactoryBean entityManagerFactory(EntityManagerFactoryBuilder builder, @Qualifier("pbmDataSource") DataSource dataSource) {
		return builder
				.dataSource(dataSource)
				.packages("com.org.lob.pbm.member.repository")
				.persistenceUnit("pbm")
				.properties(pbmJpaProperties())
				.build();
	}

	private Map<String, String> pbmJpaProperties() {
		Map<String, String> jpaProperties = new HashMap<>();

		jpaProperties.put("hibernate.dialect", env.getProperty("pbm.hibernate.dialect"));
		jpaProperties.put("hibernate.hbm2ddl.auto", env.getProperty("pbm.hibernate.hbm2ddl.auto"));
		jpaProperties.put("hibernate.show_sql", env.getProperty("pbm.hibernate.show_sql"));
		//jpaProperties.put("hibernate.naming.physical-strategy", env.getProperty("org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy"));

		return jpaProperties;
	}
	
	
	@Bean(name = "pbmTransactionManager")
	PlatformTransactionManager transactionManager(@Qualifier("pbmEntityManagerFactory") EntityManagerFactory entityManagerFactory) {
		return new JpaTransactionManager(entityManagerFactory);
	}

	@Bean
	AuditorAware<String> auditorProvider() {
		return new RequestAttributeAuditorAware();
	}
}
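
With both configurations in place, a repository binds to a data source purely by the package it lives in. A sketch, where Member and MemberRepository are hypothetical types under the scanned pbm package:

package com.org.lob.pbm.member.repository;

import org.springframework.data.jpa.repository.JpaRepository;

public interface MemberRepository extends JpaRepository<Member, Long> {
}

Service methods touching this data source should name the matching transaction manager explicitly:

	@Transactional(transactionManager = "pbmTransactionManager")
	public Member save(Member member) {
		return memberRepository.save(member);
	}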

Spring Boot With H2 Database

Generate a project from https://start.spring.io/ with the JPA and H2 dependencies.

In Memory

spring.datasource.url=jdbc:h2:mem:testdb
#spring.datasource.url=jdbc:h2:file:~/test
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=password
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect

# Enabling H2 Console
spring.h2.console.enabled=true
# Custom H2 Console URL
#spring.h2.console.path=/h2-console
# Whether to enable trace output.
#spring.h2.console.settings.trace=false
# Whether to enable remote access.
#spring.h2.console.settings.web-allow-others=false

http://localhost:8080/h2-console

File DB

#spring.datasource.url=jdbc:h2:mem:testdb
# AUTO_SERVER=TRUE allows other processes (e.g. DBeaver) to connect to the same file database
spring.datasource.url=jdbc:h2:file:~/test;DB_CLOSE_ON_EXIT=FALSE;AUTO_SERVER=TRUE
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=password
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect

Connect with DBeaver

Edit the driver settings so the JDBC URL, username, and password match the values above; AUTO_SERVER=TRUE lets DBeaver attach while the application is running.

Other related properties

spring.jpa.show-sql=true
spring.jpa.generate-ddl=true
spring.jpa.hibernate.ddl-auto=update
#spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect
#spring.jpa.properties.hibernate.type=trace
#spring.jpa.properties.hibernate.show_sql=true
#spring.jpa.properties.hibernate.format_sql=true
#spring.jpa.properties.hibernate.use_sql_comments=false

## default connection pool
spring.datasource.hikari.connectionTimeout=20000
spring.datasource.hikari.maximumPoolSize=5
#spring.datasource.hikari.connection-test-query=SELECT 1
#spring.datasource.hikari.minimum-idle=5 
#spring.datasource.hikari.maximum-pool-size=20
#spring.datasource.hikari.idle-timeout=600000 
#spring.datasource.hikari.max-lifetime=1800000 
#spring.datasource.hikari.auto-commit=true
#spring.datasource.hikari.poolName=SpringBoot-HikariCP
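
To smoke-test either setup, a minimal entity and repository can be used (Note and NoteRepository are illustrative names, each in its own file):

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;

@Entity
public class Note {

	@Id
	@GeneratedValue
	private Long id;

	private String text;

	// getters and setters omitted
}

import org.springframework.data.jpa.repository.JpaRepository;

public interface NoteRepository extends JpaRepository<Note, Long> {
}

With spring.jpa.hibernate.ddl-auto=update the NOTE table is created at startup and can be inspected from the H2 console.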

Spring Boot Caching With Ehcache3

Dependencies

Add the following to your pom

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-cache</artifactId>
		</dependency>
		<dependency>
		    <groupId>org.ehcache</groupId>
		    <artifactId>ehcache</artifactId>
		</dependency>
		<dependency>
		    <groupId>javax.cache</groupId>
		    <artifactId>cache-api</artifactId>
		</dependency>

Cache configuration

Keep the following ehcache.xml under src/main/resources

<?xml version="1.0" encoding="UTF-8"?>
<config xmlns='http://www.ehcache.org/v3'>

	<persistence directory="${java.io.tmpdir}" />

	<!-- Default cache template -->
	<cache-template name="default">
		<expiry>
			<tti unit="hours">4</tti>
			<!-- <ttl unit="minutes">2</ttl> -->
		</expiry>
		<listeners>
			<listener>
				<class>com.org.lob.support.LoggingTaskCacheListener</class>
				<event-firing-mode>ASYNCHRONOUS</event-firing-mode>
				<event-ordering-mode>UNORDERED</event-ordering-mode>
				<events-to-fire-on>CREATED</events-to-fire-on>
				<events-to-fire-on>EXPIRED</events-to-fire-on>
				<events-to-fire-on>REMOVED</events-to-fire-on>
				<events-to-fire-on>UPDATED</events-to-fire-on>
			</listener>
		</listeners>
		<resources>
			<heap unit="MB">10</heap>
			<offheap unit="MB">50</offheap>
			<disk persistent="true" unit="GB">1</disk>
		</resources>
		<!-- 
		<heap-store-settings>
			<max-object-graph-size>2000</max-object-graph-size>
			<max-object-size unit="kB">5</max-object-size>
		</heap-store-settings>
		-->
	</cache-template>

	<!-- Cache configurations -->
	<cache alias="books" uses-template="default" >
		<key-type>java.lang.String</key-type>
		<value-type>com.org.lob.project.repository.entity.Book</value-type>		
	</cache>

	<cache alias="files" uses-template="default" >
		<key-type>java.lang.String</key-type>
		<value-type>java.lang.String</value-type>		
	</cache>

</config>
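
The listener class referenced in the template above is application code, not part of Ehcache. A minimal sketch of it, implementing Ehcache's CacheEventListener:

package com.org.lob.support;

import org.ehcache.event.CacheEvent;
import org.ehcache.event.CacheEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingTaskCacheListener implements CacheEventListener<Object, Object> {

	private static final Logger LOGGER = LoggerFactory.getLogger(LoggingTaskCacheListener.class);

	@Override
	public void onEvent(CacheEvent<?, ?> event) {
		LOGGER.info("Cache event {} for key {}", event.getType(), event.getKey());
	}
}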

Spring Boot Config

Add the following to your application.properties

# Cache
spring.cache.jcache.config=classpath:ehcache.xml

Add the following Spring Boot configuration class

import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableCaching
public class CacheConfig {
	
}

Start Caching

import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Component;

import com.org.lob.project.repository.entity.Book;

@Component
@CacheConfig(cacheNames = "books")
public class SimpleBookRepository implements BookRepository {
	
	@Cacheable
	@Override
	public Book getByIsbn(String isbn) {
		simulateSlowService();
		return new Book(isbn, "Some book");
	}

	// Don't do this at home
	private void simulateSlowService() {
		try {
			long time = 3000L;
			Thread.sleep(time);
		} catch (InterruptedException e) {
			throw new IllegalStateException(e);
		}
	}
}

import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.stereotype.Component;
import org.springframework.util.FileCopyUtils;

@Component
@CacheConfig(cacheNames = "files")
public class SimpleFileRepository implements FileRepository {

	private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFileRepository.class);

	@Override
	@Cacheable
	public String load(String project) {
		return asString(new FileSystemResource(project));
	}

	@Override
	@CachePut
	public String reLoad(String project) {
		return asString(new FileSystemResource(project));
	}

	private String asString(Resource resource) {
        try (Reader reader = new InputStreamReader(resource.getInputStream(), UTF_8)) {
            return FileCopyUtils.copyToString(reader);
        } catch (IOException e) {
        	LOGGER.error("Error Proessing ", e);
            throw new UncheckedIOException(e);
        }
    }
}
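
To drop entries explicitly, for example when the underlying file is deleted, a method like the following could be added to SimpleFileRepository (evict is an illustrative name; the cache name comes from the class-level @CacheConfig, and org.springframework.cache.annotation.CacheEvict must be imported):

	@CacheEvict
	public void evict(String project) {
		// removes the entry keyed by 'project' from the "files" cache
	}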

Source Code

Download the source code from GitHub.

Add Performance Monitor Aspect To Spring Boot Application

Add Aspect

package com.lob.proj.config;

import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.aop.Advisor;
import org.springframework.aop.aspectj.AspectJExpressionPointcut;
import org.springframework.aop.interceptor.PerformanceMonitorInterceptor;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;

@Configuration
@EnableAspectJAutoProxy
@Aspect
public class AspectConfig {

	@Pointcut("execution(public * com.lob.proj.api.CustomerApi.*(..))")
	public void monitor() {
	}

	@Bean
	PerformanceMonitorInterceptor performanceMonitorInterceptor() {
		// false = log under this interceptor's own logger; true would use the invoked class's logger
		return new PerformanceMonitorInterceptor(false);
	}

	@Bean
	Advisor performanceMonitorAdvisor(PerformanceMonitorInterceptor performanceMonitorInterceptor) {
		AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
		pointcut.setExpression("com.lob.proj.config.AspectConfig.monitor()");
		return new DefaultPointcutAdvisor(pointcut, performanceMonitorInterceptor);
	}
}

Add the following logging level; the interceptor logs at TRACE:

logging.level.org.springframework.aop.interceptor.PerformanceMonitorInterceptor=TRACE

Kubernetes CronJob To Delete Expired Files

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: delete-older-data-files
spec:
  schedule: "0 * * * *" # at the top of every hour
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 1
  jobTemplate:
    spec:
      ttlSecondsAfterFinished: 3600
      template:
        spec:
          securityContext: 
            fsGroup: 53134
            runAsUser: 1001
          containers:
          - name: delete-older-data-files
            image: docker.repo1.uhc.com/busybox:1.32.0
            command: ["/bin/sh", "-c"]
            args:
              - echo starting;
                find /app/data/ -mindepth 1 -type f -mmin +120 -print -delete;
                find /app/data/imports -mindepth 1 -empty -type d -mmin +120 -print -delete;
                find /app/data/exports -mindepth 1 -empty -type d -mmin +120 -print -delete;
                echo done;
            volumeMounts:
            - name: app-data-volume
              mountPath: "/app/data"
          restartPolicy: OnFailure
          volumes:
          - name: app-data-volume
            persistentVolumeClaim:
              claimName: app-data

The same cleanup as a one-off command, deleting anything older than five days:

find /path/to/directory/ -mindepth 1 -mtime +5 -delete
