Мониторинг сообщений в RabbitMQ +16


Рассмотрим классическую схему построения конвейера сообщений в RabbitMQ состоящую из элементов Producer, Exchange, Queue и Consumer.



Задача состоит в том, что бы организовать мониторинг происходящего в очереди и не затронуть основное программное обеспечение (ПО), добавить гибкую возможность построения отчетов и при этом избежать дополнительных затрат. Итоговая конструкция должна позволять быстро строить отчеты для анализа потока данных на конвейере без использования основных мощностей серверов (что позволит избежать дополнительной нагрузки) и основного ПО. Подход должен быть легко переносимым на более сложную архитектуру.

Прежде всего организуем демонстрационный стенд, для этого внесем следующие изменения в работу конвейера:



Изначально для Exchange (faust) была задана следующая конфигурация, которая не меняется в рассматриваем примере при модификации:



Важным является настройка типа fanaut — которая позволяет создать две равноправные очереди и дублировать весь поток сообщений в новую очередь Statistics:





без какого бы то ни-было вмешательства в основной процесс в очереди Logs. Приступим к обработке потока сообщений. Прежде всего создаем таблицу на MS SQL сервере для хранения статистической информации. Вы можете использовать любой другой подход, например, сохранить сообщения в файл в xml формате или любой другой способ, в рассматриваемом примере выбран SQL сервер для того чтобы избежать дополнительного программирования

create table RabbitMsg(  
id int PRIMARY KEY IDENTITY(1000,1),  
[Message] nvarchar(1000) DEFAULT '',
RegDate datetime default GETDATE()) 

Как видно из SQL запроса это таблица с номером записи, некоторым текстом и датой вставки записи в таблицу.

Создадим клиента, который будет обращаться к RabbitMQ в очередь Statistics, забирать полученные данные и перекладывать их в таблицу RabbitMsg

using System; 
using RabbitMQ.Client; 
using RabbitMQ.Client.Events; 
using System.Text; 
using System.Data.SqlClient; 
  
namespace Getter 
{ 
    class Program 
    { 
        static void Main(string[] args) 
        { 
            var factory = new ConnectionFactory() { HostName = "192.168.1.241", Port = 30672, UserName = "robotics01", Password = "" }; 
            using (var connection = factory.CreateConnection()) 
            using (var channel = connection.CreateModel()) 
            { 
                channel.ExchangeDeclare(exchange: "faust", type: "fanout", durable: true); 
                 
                var queueName  = "Statistics"; 
                channel.QueueBind(queue: queueName, exchange: "faust", routingKey: ""); 
  
                Console.WriteLine(" [*] Waiting for logs."); 
  
                var consumer = new EventingBasicConsumer(channel); 
                consumer.Received += (model, ea) => 
                { 
                    var body = ea.Body; 
                    var message = Encoding.UTF8.GetString(body); 
                    Console.WriteLine(" [x] {0}", message); 
  
                    SqlConnection sqlconnection = new SqlConnection("Server=tcp:fastreportsql,1433;Initial Catalog=FastReportSQL;Persist Security Info=False;User ID=ufocombat;Password=;MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;"); 
                    sqlconnection.Open(); 
                    SqlCommand cmd = new SqlCommand($"INSERT INTO RabbitMsg(Message) VALUES (@msg)", sqlconnection); 
                    cmd.Parameters.AddWithValue("msg", message); 
  
                    cmd.ExecuteNonQuery(); 
                    sqlconnection.Close(); 
                }; 
                channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); 
  
                Console.WriteLine(" Press [enter] to exit."); 
                Console.ReadLine(); 
            } 
            Console.WriteLine("Hello World!"); 
        } 
    } 
} 

Посмотрим, как это работает в режиме реального времени


Тем временем на MS SQL Server



Построим отчет по данным очереди Statistics


Вот что получилось:



Заключение


В рассмотренном примере показано как быстро собрать статистику и даже построить отчет, который можно сохранить в PDF и отправить по почте по данным конвейера RabbitMQ и дополнительной очереди. Легко придумать примеры задач, когда информация собирается о каких-либо процессах и строятся отчеты без разработки серверной части. Учитывая, что FastReports предлагает open-source версию, то можно значительно сократить стоимость разработки без дополнительных затрат. Сам конвейер, также легко перенастраивается и может быть приспособлен для более сложных задач.




К сожалению, не доступен сервер mySQL