< Summary

Information
Class: Gateway.Apis.Amqp.RegistrationExtensions.QueueConsumerFactory
Assembly: Gateway
File(s): /home/runner/work/dotnet-microservice/dotnet-microservice/Gateway/Apis/Amqp/RegistrationExtensions/QueueConsumerFactory.cs
Tag: 34_11887803474
Line coverage
0%
Covered lines: 0
Uncovered lines: 54
Coverable lines: 54
Total lines: 98
Line coverage: 0%
Branch coverage
0%
Covered branches: 0
Total branches: 10
Branch coverage: 0%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Coverage history

Coverage history 0 25 50 75 100

Metrics

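| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .cctor() | 100% | 2 | 1 | 0% |
| CreateQueueConsumer(...) | 100% | 2 | 1 | 0% |
| CreateActivity(...) | 100% | 2 | 1 | 0% |
| ExtractTraceContextFromBasicProperties(...) | 0% | 6 | 2 | 0% |
| AddMessagingTags(...) | 0% | 72 | 8 | 0% |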

File(s)

/home/runner/work/dotnet-microservice/dotnet-microservice/Gateway/Apis/Amqp/RegistrationExtensions/QueueConsumerFactory.cs

#LineLine coverage
 1using System.Diagnostics;
 2using System.Text;
 3using System.Text.Json;
 4using Microsoft.AspNetCore.Http.Json;
 5using Microsoft.Extensions.Options;
 6using OpenTelemetry;
 7using OpenTelemetry.Context.Propagation;
 8using RabbitMQ.Client;
 9using RabbitMQ.Client.Events;
 10
 11namespace Gateway.Apis.Amqp.RegistrationExtensions
 12{
 13    public static class QueueConsumerFactory
 14    {
 015        private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
 016        private static readonly ActivitySource ActivitySource = new(nameof(QueueConsumerFactory));
 17
 18        public static void CreateQueueConsumer<TMessage>(
 19            IServiceProvider service,
 20            IModel channel,
 21            string queueName
 22        )
 23        {
 024            channel.QueueDeclare(
 025                queue: queueName,
 026                durable: false,
 027                exclusive: false,
 028                autoDelete: false,
 029                arguments: null
 030            );
 031            var consumer = new AsyncEventingBasicConsumer(channel);
 032            consumer.Received += async (model, ea) =>
 033            {
 034                using (Activity? activity = CreateActivity(queueName, ea))
 035                {
 036                    using var serviceScope = service.CreateAsyncScope();
 037                    var jsonOptions = serviceScope
 038                        .ServiceProvider
 039                        .GetRequiredService<IOptions<JsonOptions>>();
 040                    var messageHandler = serviceScope
 041                        .ServiceProvider
 042                        .GetRequiredService<IConsumer<TMessage>>();
 043                    var body = ea.Body.ToArray();
 044                    var message = JsonSerializer.Deserialize<TMessage>(
 045                        ea.Body.ToArray(),
 046                        jsonOptions.Value.SerializerOptions
 047                    );
 048                    await messageHandler.RunAsync(message!, CancellationToken.None);
 049                }
 050            };
 51
 052            channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
 053        }
 54
 55        private static Activity? CreateActivity(string queueName, BasicDeliverEventArgs ea)
 56        {
 057            var parentContext = Propagator.Extract(
 058                default,
 059                ea.BasicProperties,
 060                ExtractTraceContextFromBasicProperties
 061            );
 062            Baggage.Current = parentContext.Baggage;
 063            var activityName = $"{ea.RoutingKey} receive";
 064            var activity = ActivitySource.StartActivity(
 065                activityName,
 066                ActivityKind.Consumer,
 067                parentContext.ActivityContext
 068            );
 069            AddMessagingTags(activity, queueName, ea.Exchange);
 070            return activity;
 71        }
 72
 73        private static IEnumerable<string> ExtractTraceContextFromBasicProperties(
 74            IBasicProperties props,
 75            string key
 76        )
 77        {
 078            if (props.Headers.TryGetValue(key, out var value))
 79            {
 080                var bytes = value as byte[];
 081                return [Encoding.UTF8.GetString(bytes!)];
 82            }
 083            return [];
 84        }
 85
 86        private static void AddMessagingTags(
 87            Activity? activity,
 88            string queueName,
 89            string exchangeName
 90        )
 91        {
 092            activity?.SetTag("messaging.system", "rabbitmq");
 093            activity?.SetTag("messaging.destination_kind", "queue");
 094            activity?.SetTag("messaging.destination", queueName);
 095            activity?.SetTag("messaging.rabbitmq.routing_key", exchangeName);
 096        }
 97    }
 98}