The outbox pattern is a proven and scalable software design pattern often used in a distributed environment to publish events reliably and enforce data consistency. This article presents an overview of the outbox pattern and examines how it can be implemented using Kafka and C# in ASP.NET Core applications.

If you're to work with the code examples discussed in this article, you need the following installed in your system:

  • Visual Studio 2022
  • .NET 8.0
  • ASP.NET Core 8.0 Runtime

If you don't already have Visual Studio 2022 installed on your computer, you can download it from here: https://visualstudio.microsoft.com/downloads/.

In this article, I'll examine the following points:

  • An overview of the outbox pattern and its benefits and downsides
  • An introduction to Kafka as a message broker
  • How the transaction outbox pattern works
  • How to implement the outbox pattern in ASP.NET Core, Kafka, and C#
  • How to implement the consumer application to consume messages stored in Kafka

A Real-World Use Case

Typically, microservices-based applications maintain their own data store to persist and retrieve data. This strategy helps improve the application's scalability and promotes agility in the deployment process. Additionally, these applications take advantage of messaging to facilitate asynchronous communication between the services. There are several messaging solutions for you to choose from such as Kafka, Azure Service Bus, RabbitMQ, etc.

Understanding the Problem

Consider the flow of a typical Order Processing application at a glance:

  1. The client application sends a request to the API to persist the data to the underlying database.
  2. The API validates the incoming request, executes the necessary business logic, and persists the data in the database, as shown in Figure 1.
Figure 1: The flow of a typical Order Processing application
Figure 1: The flow of a typical Order Processing application
  1. Once the data is stored in the database, the API calls a message broker to publish the event so that other services can consume it if needed, as shown in Figure 2.
Figure 2: The API calls the message broker to publish the event.
Figure 2: The API calls the message broker to publish the event.

What's wrong with this? Although the flow may appear straightforward, complications can arise. For instance, if the message broker is unavailable, there's an error in publishing the messages or a runtime exception occurs in the application. As shown in Figure 3, the ordering service and the data can become inconsistent.

Figure 3: An error in publishing messages
Figure 3: An error in publishing messages

Addressing this issue can be challenging, as writing code to solve it may lead to a violation of the Single Responsibility Principle, potentially requiring additional responsibilities for the OrderService. Here's where the transactional outbox pattern comes to the rescue.

An Overview of the Outbox Pattern

The outbox pattern is a technique used in microservices architectures to guarantee reliable communication between services. For this purpose, a temporary storage area is created in the database of a microservice, often referred to as an outbox table, in which the outgoing data changes are recorded. The data is serialized before being sent from the outbox table to other external systems or services. In distributed systems, duplicate writes occur when an operation must update a local database and notify other services.

The outbox design solves this problem by combining the publication of messages/events with the same database transaction that updates the local state - making both actions atomic and, therefore, complete or incomplete. In the outbox pattern, you won't be publishing an event to the message broker directly. Instead, a record will be inserted into a special database named Outbox. Incidentally, the Outbox database table is stored in the same database where the data of the application is being stored. Then an asynchronous process executes in the background and periodically checks for incoming messages. When it finds one, it publishes the message to a message broker (i.e., Apache Kafka, RabbitMQ, etc.), as shown in Figure 4.

Figure 4: Demonstrating the outbox pattern
Figure 4: Demonstrating the outbox pattern

Components of the Outbox Pattern

The outbox pattern is comprised of the following key components:

  • Events: At runtime, an application generates events that are stored in the Outbox table.
  • Outbox table: This data store is used to store the messages until they are processed by the publisher.
  • Publisher: This component is used to process messages from the outbox table and then dispatch them to the appropriate destinations

Benefits and Downsides

The outbox pattern is a proven technique widely used for enhancing data consistency and reliability in microservices architectures. However, like any architectural pattern, it has its benefits and downsides.

Benefits

The following are the benefits of the outbox pattern:

  • Transactional consistency: The outbox pattern ensures that any changes to the database residing locally and the publishing of events/messages are part of the same database transaction. This prevents data inconsistencies that might occur due to operation failures after the database changes are committed but before the events are published. By sending the messages as part of the same transaction, the outbox pattern maintains atomicity and consistency. Moreover, the outbox pattern enables events to be retried until they have been successfully processed.
  • Reliable message publishing: The outbox pattern ensures reliable message sending by taking advantage of a persistent storage mechanism where the messages or events are stored in the database, thereby preventing message loss due to failures in the event publishing mechanism or network issues. This persistent storage of messages coupled with asynchronous message sending helps decouple the message dispatch process from the application's response, thereby minimizing the chances of message loss and improving resiliency.
  • Scalability: The outbox pattern enhances scalability of an application by separating the concerns of event creation from event distribution. It enables an application to focus on processing incoming requests without interruption by offloading message transmission to a different process or service.
  • Performance: The outbox pattern improves performance by sending messages asynchronously, thereby processing the messages without any delays to enhance responsiveness and throughput. Storing the events in the same database transaction as the business operations minimizes the number of total transactions executed, which can enhance performance.

Downsides

Despite the benefits of the outbox pattern stated earlier, there are certain downsides as well:

  • Increased complexity: The outbox pattern can introduce additional complexity to your architecture, application design, and development. By implementing this pattern, you need to manage the outbox processor and verify whether the messages are recorded correctly, thereby making your application difficult to manage, maintain, and debug.
  • Latency: When using the outbox pattern, there might be a delay from the time an event occurs and when it's published to other services. It usually publishes events in batches at a regular time interval set by the outbox processor. However, this delay isn't recommended in situations where an immediate or real-time communication is needed.
  • Database coupling: Because the outbox pattern depends on the database to store events before they're published, there's an inherent coupling with the database, thereby making future changes more challenging.
  • Scalability: Although the outbox pattern ensures that microservices are not tightly coupled and can be independently scaled, scaling your application may still be challenging. If an outbox processor isn't working as expected or there's too much traffic, you might have to consider scaling your infrastructure for processing outboxes.
  • Resiliency: Although the outbox pattern ensures that events aren't lost, there's no built-in mechanism to handle failures while processing the events. You need to write your code to handle duplicate events, idempotency, retries, and failure recovery.
  • Operational overhead: Monitoring the outbox repository regularly and adopting the strategy to transmit messages and potential retries can be an additional overhead. Keeping an eye on the outbox storage, ensuring that the messages are delivered, and dealing with errors or retries may be challenging.
  • Integration challenges: You must take the necessary steps to ensure reliable message delivery. This may involve implementing additional configurations and dependencies when connecting to external systems like RabbitMQ and Apache Kafka.

How Does the Outbox Pattern Work?

Assume an order processing application in which an order service is used to handle orders placed by the customer. As soon as a new order is placed, the application must update the order database table and also notify the inventory service that the stock should be updated. The complete flow is illustrated in the sequence of steps of a typical order processing use case given below:

  1. Create Order: A customer uses the application to place a new order. Next, the Order Service picks up this order and processes it.
  2. Update Order database table: The Order Service stores the new order in the Order database table with information that includes the order details and the customer details together with the Order status.
  3. Create Stock Update message: Besides updating the Order database table, the Order Service creates a message to indicate that the Stock database table needs to be updated. This message is persisted in the Outbox database table within the context of the same transaction.
  4. Commit transaction: Next, the transaction that comprises updating the Order database table and insertion of the message into the Outbox database table is committed.
  5. Check Outbox database table periodically: A process named Message Relay executes in the background to check the Outbox database table for new messages.
  6. Dispatch message to Stock service: When the Message Relay detects a new message in the Outbox database table, it forwards the message to the Stock service for processing via the Message queue.
  7. Stock update: Upon receipt of a new message, the Stock service updates the Stock database table accordingly.

In the outbox pattern, when an event occurs, instead of a message being sent to a message broker such as Kafka, a new record that contains the details of the event is stored in the Outbox database table. It should be noted that typically the Outbox table is part of the same database and this all happens as part of the same transaction. Figure 5 illustrates how this all works together.

Figure 5: The Outbox Pattern in action
Figure 5: The Outbox Pattern in action

Key Terminologies

Here are the key terminologies you often come across when exploring the Outbox pattern:

  • Event log or the Outbox: Each microservice maintains an event log or an Outbox where the events that have to be published are first recorded to ensure that the events are never lost. Essentially, an Outbox is a database table to store events or messages before the events are published.
  • Transactional operation: For the purpose of ensuring that only successful business operations are captured in the event log, events are added to the log as part of the same transaction as the business operations that generated them.
  • Outbox processor: An outbox processor runs as a separate background task and scans the event log at regular intervals of time. It collects events without processing them and then publishes them to the event bus or message broker.
  • Message broker or event bus: A message broker or an event bus receives those events that have been published. The message broker or event bus provides durability and reliability and distributes these events to the subscribers or the other microservices.

Introduction to Kafka as a Message Broker

Apache Kafka is a distributed, high-performance, open source, scalable, and versatile stream-processing software. It's written in Java and Scala, and has become popular in recent times for building systems that are adept at handling massive volumes of data. Kafka is a messaging platform based on the publish/subscribe model. It comes with built-in features for replication, partitioning, fault tolerance, and improved throughput, compared to applications without message brokers.

Kafka is a good option for applications that need vast data processing capabilities. The main use of Kafka is in creating real-time streaming data pipelines. For this reason, Kafka includes stream processing and fault-tolerant storage functionalities, which enable storing and analyzing historical and real-time data. Figure 6 shows how Kafka fits in a typical implementation of the outbox pattern.

Figure 6: Using Kafka as the message broker in a typical Outbox pattern implementation
Figure 6: Using Kafka as the message broker in a typical Outbox pattern implementation

Getting Started with Apache Kafka

To get started with Apache Kafka, follow the steps outlined in the next few sections.

Download and Extract Kafka

You can download Apache Kafka from here: https://kafka.apache.org/downloads.

Alternatively, you can run the following command at the terminal to download Kafka in your computer:

wget https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz

Once Kafka has been downloaded, you should extract the zip archive by running the following command at the terminal window:

tar -xzf kafka_2.13-3.7.0.tgz

Start ZooKeeper

You need ZooKeeper to run Kafka on your computer. Once you've downloaded and extracted Apache Kafka to your computer, start ZooKeeper by running the following command at the terminal window:

cd kafka_2.13-3.7.0 bin/windows/zookeeper-server-start.bat 
  config/zookeeper.properties

Create a Topic

Now that ZooKeeper and Kafka are up and running in your computer, you can start creating topics. You can create a new topic by running the following command at the terminal window.

.\bin\windows\kafka-topics.bat --create --topic mynewtopic 
  --bootstrap-server localhost:9092

Display All Kafka Messages in a Topic

To display all messages in a particular topic, use the following command:

 .\bin\windows\kafka-console-consumer.bat --bootstrap-server 
  localhost:9092 --topic mynewtopic --from-beginning

Display all Topics

To list all topics, you can use the following command:

.\bin\windows\kafka-topics.bat --list --bootstrap-server 
localhost:9092

Start the Producer

You can run the following command at the terminal window to start a producer.

bin/windows/kafka-console-producer.bat --topic mynewtopic 
  --bootstrap-server localhost:9092

Start the Consumer

In Kafka, you need a consumer to read the messages produced by a producer. You can run the following command at the terminal window to read all messages sent by the producer as shown in Figure 7.

bin/windows/kafka-console-consumer.bat --topic mynewtopic --from-beginning 
  --bootstrap-server localhost:9092
Figure 7: The Kafka consumer in action
Figure 7: The Kafka consumer in action

Shut Down Zookeeper and Kafka

To shut down Zookeeper, use the zookeeper-server-stop.bat script, as shown below:

bin\windows\zookeeper-server-stop.bat

To shut down Kafka, you use the kafka-server-stop.bat script as shown below:

bin\windows\kafka-server-stop.bat

Implementing the Outbox Pattern in ASP.NET Core, Kafka, and C#

In this section, I'll examine how to implement the outbox pattern in an ASP.NET Core application using Kafka. In this example, you'll use the following interfaces and classes.

  • OutboxMessage: This is the model class.
  • IKafkaProducer: This is the interface for the KafkaProducer class.
  • IKafkaConsumer: This is the interface for the KafkaConsumer class.
  • IOrderService: This interface contains the declaration of methods supported by the OrderService.
  • IOutboxMessageRepository: This represents the interface for the OutboxMessageRepository class.
  • KafkaProducer: This class is used to send messages to Kafka.
  • KafkaConsumer: This class is used to consume messages in Kafka.
  • OrderService: The OrderService class implements the IOrderService interface and encapsulates the logic for invoking the appropriate methods of the OrderMessageRepository.
  • KafkaMessageProcessor: This is a background service that calls KafkaConsumer at regular intervals of time to consume messages residing in Kafka.
  • OutboxMessageProcessor: This is yet another background service used to invoke KafkaProducer to transmit messages to Kafka.
  • OutboxMessageRepository: This class contains methods to retrieve messages from the Outbox database table and also update a message as needed.
  • ApplicationDbContext: This class represents the data context that acts as a gateway to connect to the underlying database being used by the application.
  • OrderController: This represents the Order API that can be consumed by clients to retrieve existing orders or create new orders.

Create a New ASP.NET Core 8 Project in Visual Studio 2022

You can create a project in Visual Studio 2022 in several ways such as, from the Visual Studio 2022 Developer Command Prompt or by launching the Visual Studio 2022 IDE. When you launch Visual Studio 2022, you'll see the Start window. You can choose “Continue without code” to launch the main screen of the Visual Studio 2022 IDE.

Now that you know the basics, let's start setting up the project. To create a new ASP.NET Core 8 Project in Visual Studio 2022:

  1. Start the Visual Studio 2022 IDE.
  2. In the “Create a new project” window, select “ASP.NET Core Web API” and click Next to move on.
  3. Specify the project name as “Outbox_Pattern_Demo” and the path where it should be created in the Configure your new project window.
  4. If you want the solution file and project to be created in the same directory, you can optionally check the “Place solution and project in the same directory” checkbox. Click Next to move on.
  5. In the next screen, specify the target framework and authentication type as well. Ensure that the “Configure for HTTPS,” “Enable Docker Support,” “Do not use top-level statements,” and the “Enable OpenAPI support” checkboxes are unchecked because you won't use any of these in this example.
  6. Remember to leave the “Use controllers” checkbox checked because you won't use minimal API in this example.
  7. Click Create to complete the process.

A new ASP.NET Core Web API project is created. You'll use this project to implement the outbox pattern using Kafka in ASP.NET Core and C#.

Create the Outbox Database Table

First off, create the Outbox database table to store messages. To do this, create a new database called Outbox_Pattern_Demo using the database script given below:

CREATE DATABASE Outbox_Pattern_Demo;

Next, create a new database table called OutboxMessage using the following database script.

CREATE TABLE Outbox_Message
(
   Event_Id BIGINT IDENTITY PRIMARY KEY,
   Event_Payload NVARCHAR(MAX) NOT NULL,
   Event_Date DATETIME NOT NULL   
   DEFAULT CURRENT_TIMESTAMP,
   IsMessageDispatched BIT NOT NULL
);

Create another database table in the same database named Order using the following script.

CREATE TABLE [Order]
(
    Order_Id BIGINT IDENTITY PRIMARY KEY,
    Customer_Id INT NOT NULL,
    Order_Date DATETIME,
    Order_Amount MONEY NOT NULL
);

Install NuGet Package(s)

In this example, you'll take advantage of Entity Framework to interact with the database and perform CRUD (an acronym for Create Read Update Delete) operations. You'll also need a Kafka client package to produce and consume messages. To work with Apache Kafka, you'll use the Confluent.Kafka NuGet package. To install the required package into your project, right-click on the solution and then select Manage NuGet Packages for Solution.… Now search for the Microsoft.EntityFramework.SqlServer and Confluent.Kafka packages in the search box and install it. Alternatively, you can type the commands shown below at the NuGet Package Manager Command Prompt:

PM> Install-Package Microsoft.EntityFrameworkCore.SqlServer 
  Install-Package Confluent.Kafka

You can also install this package by executing the following commands at the Windows Shell:

dotnet add package Microsoft.EntityFrameworkCore.SqlServer
dotnet add package Confluent.Kafka

Create the OutboxMessage Class

Create a new C# class named OutboxMessage in a file named OutboxMessage.cs and write the following code in there.

public class OutboxMessage
{
  public int Event_Id { get; set; }
  public string Event_Payload { get; set; }
  public DateTime Event_Date { get; set; }
  public bool IsMessageDispatched { get; set; }
}

Create the Order Model Class

Create a new class named Order in a file having the same name with a .cs extension and write the following code in there:

public class Order
{
   public long Order_Id { get; set; }
   public int Customer_Id { get; set; }
   public DateTime Order_Date { get; set; }
   public decimal Amount { get; set; }
}

Create the Data Context

In Entity Framework Core (EF Core), a data context is a component used by an application to interact with the database and manage database connections, and to query and persist data in the database. You'll now create a data context class named CustomerDbContext. To create a data context, create a class that extends the DbContext class of EF Core, as shown below:

public class ApplicationDbContext : DbContext
{
    public ApplicationDbContext(DbContextOptions
      <ApplicationDbContext> options) : base(options)
    {
    }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);
    }

    public DbSet<OutboxMessage> OutboxEvents { get; set; }
    public DbSet<Order> Orders { get; set; }
}

You should specify the primary keys and the table names for your entities in the `OnModelCreating method, as shown in the code snippet given below:

protected override void 
OnModelCreating
(ModelBuilder modelBuilder)
{
    modelBuilder.Entity<Order>(entity =>
    {
        entity.ToTable("Order");
        entity.HasKey(e => e.Order_Id);
    });

    modelBuilder.Entity<OutboxMessage>(entity =>
    {
        entity.ToTable("OutboxMessage");
        entity.HasKey(e => e.Event_Id);
    });

    base.OnModelCreating(modelBuilder);
}

The complete source of the data context class is given in Listing 1.

Listing 1: The ApplicationDbContext class

using Microsoft.EntityFrameworkCore;

namespace Outbox_Pattern_Demo
{
    public class ApplicationDbContext : DbContext
    {
        public ApplicationDbContext(DbContextOptions
          <ApplicationDbContext> options) : base(options)
        {
        }

        protected override void OnModelCreating(
          ModelBuilder modelBuilder)
        {
            modelBuilder.Entity<Order>(entity =>
            {
                entity.ToTable("Order");
                entity.HasKey(e => e.Order_Id);
            });

            modelBuilder.Entity<OutboxMessage>(entity =>
            {
                entity.ToTable("OutboxMessage");
                entity.HasKey(e => e.Event_Id);
            });

            base.OnModelCreating(modelBuilder);
        }

        public DbSet<OutboxMessage>OutboxMessages { get; set; }
        public DbSet<Order>Orders { get; set; }
    }
}

Create the OrderService Class

Now, create a new class named OrderService in a file having the same name with a .cs extension. Write the following code in there:

public class OrderService : IOrderService
{       
        
}

The OrderService class illustrated in the code snippet below implements the methods of the IOrderService interface. Here’s how the IOrderService interface should look:

public interface IOrderService
{
   public Task<List<Order>> GetAllOrdersAsync();
   public Task<Order> GetOrderAsync(int Id);
   public Task CreateOrderAsync(Order order);
}

The OrderService class implements the methods of the IOrderService interface.

public async Task CreateOrderAsync(Order order)
{
using var transaction = _context.Database.BeginTransaction();
    try
    {
        _context.Orders.Add(order);
        await _context.SaveChangesAsync();

        var outboxMessage = new OutboxMessage
        {
            Event_Payload = JsonSerializer.Serialize(order),
              Event_Date = DateTime.Now, IsMessageDispatched = false
        };

        _context.OutboxMessages.Add(outboxMessage);

        await _context.SaveChangesAsync();
        await transaction.CommitAsync();
    }
    catch
    {
        await transaction.RollbackAsync();
        throw;
    }
}

The complete source code of the OrderService class is given in Listing 2.

Listing 2: The OrderService Class

namespace Outbox_Pattern_Demo
{
   using System.Text.Json;

    public class OrderService : IOrderService
    {
        private readonly ApplicationDbContext _context;

        public OrderService(ApplicationDbContext context)
        {
            _context = context;
        }

        public async Task<List<Order>> GetAllOrdersAsync()
        {
            return await Task.FromResult(_context.Orders.ToList<Order>());
        }

        public async Task<Order> GetOrderAsync(int Id)
        {
            return await Task.FromResult(
              _context.Orders.FirstOrDefault(x => x.Order_Id == Id));
        }

        public async Task CreateOrderAsync(Order order)
        {
            using var transaction = _context.Database.BeginTransaction();

            try
            {
                _context.Orders.Add(order);
                await _context.SaveChangesAsync();

                var outboxMessage = new OutboxMessage
                {
                    Event_Payload = JsonSerializer.Serialize(order),
                      Event_Date = DateTime.Now, IsMessageDispatched = false
                };

                _context.OutboxMessages.Add(outboxMessage);

                await _context.SaveChangesAsync();
                await transaction.CommitAsync();
            }
            catch
            {
                await transaction.RollbackAsync();
                throw;
            }
        }
    }
}

Create the OutboxMessageProcessor Background Service

You'll now create a service that runs in the background and checks for messages in the OutboxMessage database table. If one or more messages are found, it sends them to Kafka for further processing. To create a background service in .NET Core, you can take advantage of the IHostedService interface. You can create your background service class by implementing the IHostedService interface.

This interface contains two methods, StartAsync and StopAsync as shown below:

public interface IHostedService
{
    Task StartAsync (CancellationToken cancellationToken);
    Task StopAsync (CancellationToken cancellationToken);
}

You can also use the abstract helper class called BackgroundService that's available in .NET Core. Because this class already implements the IHostedService interface, you can create your background service class by extending this class in lieu of implementing the IWorkerService interface. The BackgroundService class is defined in the Microsoft.Extensions.Hosting namespace as shown below:

public abstract class BackgroundService : IHostedService, IDisposable
{
    public virtual void Dispose();
    public virtual Task StartAsync(CancellationToken cancellationToken);
    public virtual Task StopAsync(CancellationToken cancellationToken);
    protected abstract Task ExecuteAsync(CancellationToken stoppingToken);
}

The following code snippet shows how you can create your background service class in this way.

public class OutboxMessageProcessor : BackgroundService
{
   protected override async Task ExecuteAsync(CancellationToken 
     cancellationToken)
   {
    
   }
}

The PublishOutboxMessagesAsync method of the OutboxMessageProcessor class is responsible for sending the outbox messages to Kafka. The foreach loop iterates through all messages residing in the Outbox database table that are yet to be dispatched. Such messages are sent to Kafka by making a call to the SendMessageToKafkaAsync method of the KafkaProducer class.

private async Task PublishOutboxMessagesAsync
  (CancellationToken cancellationToken)
{
    try
    {
        using var scope = _scopeFactory.CreateScope();
        await using var _dbContext = scope.ServiceProvider.
          GetRequiredService<ApplicationDbContext>();

        List<OutboxMessage> messages = _dbContext.OutboxMessages.Where(om => 
          om.IsMessageDispatched != false).ToList();

        foreach (OutboxMessage outboxMessage in messages)
        {
            try
            {
                await _producer.SendMessageToKafkaAsync(outboxMessage);

                outboxMessage.IsMessageDispatched = true;
                outboxMessage.Event_Date = DateTime.UtcNow;

                _dbContext.OutboxMessages.Update(outboxMessage);
                await _dbContext.SaveChangesAsync();
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
        }
    }
    catch
    {
        throw;
    }

    await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
}

The complete source code of the OutboxMessageProcessor class is given in Listing 3.

Listing 3: The OutboxMessageProcessor class

namespace Outbox_Pattern_Demo
{
    public class OutboxMessageProcessor : BackgroundService
    {
        private readonly IServiceScopeFactory _scopeFactory;
        private readonly IKafkaProducer _producer;

        public OutboxMessageProcessor(IServiceScopeFactory scopeFactory, 
          IKafkaProducer producer)
        {
            _scopeFactory = scopeFactory;
            _producer = producer;
        }

        protected override async Task ExecuteAsync(CancellationToken 
          cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                await PublishOutboxMessagesAsync(cancellationToken);
            }
        }

        private async Task PublishOutboxMessagesAsync(CancellationToken 
          cancellationToken)
        {
            try
            {
                using var scope = _scopeFactory.CreateScope();
                await using var _dbContext = 
                  scope.ServiceProvider.GetRequiredService
                  <ApplicationDbContext>();

                List<OutboxMessage> messages = _dbContext.OutboxMessages.Where
                (om => om.IsMessageDispatched != false).ToList();

                foreach (OutboxMessage outboxMessage in messages)
                {
                    try
                    {
                        await _producer.SendMessageToKafkaAsync(outboxMessage);

                        outboxMessage.IsMessageDispatched = true;
                        outboxMessage.Event_Date = DateTime.UtcNow;

                        _dbContext.OutboxMessages.Update(outboxMessage);
                        await _dbContext.SaveChangesAsync();
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(e);
                    }
                }
            }
            catch
            {
                throw;
            }

            await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
        }
    }
}

Create the OutboxMessageRepository

Let’s now create the OutboxMessageRepository that will be used to get the messages from the Outbox database table and also update a message in the same table. The OutboxMessageRepository implements the IOutboxMessageRepository interface. The following code snippet shows the IOutboxMessageRepository interface:

namespace Outbox_Pattern_Demo
{
    public interface IOutboxMessageRepository
    {
        Task<IreadOnlyCollection<OutboxMessage>> GetUnsentMessagesAsync();
        Task<IreadOnlyCollection<OutboxMessage>> GetMessagesByIdsAsync
          (IEnumerable<int> ids);
        Task UpdateAsync(OutboxMessage message, bool status);
    };
}

The complete source code of the OutboxMessageRepository class is given in Listing 4.

Listing 4: The OutboxMessageRepository class

using System.Collections.ObjectModel;
using System.Data;

namespace Outbox_Pattern_Demo
{
    public class OutboxMessageRepository : IOutboxMessageRepository
    {
        private readonly ApplicationDbContext _context;
        private IreadOnlyCollection<OutboxMessage> _outboxMessages;

        public OutboxMessageRepository(ApplicationDbContext context)
        {
             _context = context;
        }

        public IreadOnlyCollection<OutboxMessage> OutboxMessages
        {
            get
            {
                return _outboxMessages ?? (_outboxMessages = new 
                  ReadOnlyCollection<OutboxMessage>
                    (_context.OutboxMessages.ToList()));
            }
        }

        public async Task<IreadOnlyCollection<OutboxMessage>> 
          GetUnsentMessagesAsync()
        {
            List<OutboxMessage>? unsentMessages = 
              _context.OutboxMessages.Where
              (e => e.IsMessageDispatched != true).ToList();
            ReadOnlyCollection<OutboxMessage>? result = new ReadOnlyCollection
              <OutboxMessage>(unsentMessages);
            return result;
        }

        public async Task<IreadOnlyCollection<OutboxMessage>> 
          GetMessagesByIdsAsync(IEnumerable<int> ids)
        {
            List<OutboxMessage>? orders = _context.OutboxMessages.ToList();
            var readOnlyOrders = new ReadOnlyCollection<OutboxMessage>(orders);
            return readOnlyOrders;
        }

        public async Task UpdateAsync(OutboxMessage message, bool status)
        {
            var entity = _context.OutboxMessages.FirstOrDefault
              (o => o.Event_Id == message.Event_Id);

            if (entity != null)
            {
                entity.Event_Id = message.Event_Id;
                entity.Event_Date = message.Event_Date;
                entity.Event_Payload = message.Event_Payload;
                entity.IsMessageDispatched = message.IsMessageDispatched;
                await _context.SaveChangesAsync();
            }
        }
    };
}

The OrderController Class

Next, create a new API controller class named OrderController and replace the generated code with the code given in Listing 5. The OrdersController class contains three action methods: GetOrder, GetOrders, and CreateOrder. The GetOrders action method returns a list of Order instances, and the GetOrder action method returns one Order based on the Order ID passed to the action method as a parameter.

Listing 5: The OrderController Class

using Microsoft.AspNetCore.Mvc;

namespace Outbox_Pattern_Demo.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class OrderController : ControllerBase
    {
        private IOrderService _orderService;
        public OrderController(IOrderService orderService)
        {
            _orderService = orderService;
        }

        [HttpGet("GetOrders")]
        public async Task<List<Order>> GetOrders()
        {
            return await _orderService.GetAllOrdersAsync();
        }

        [HttpGet("{id}")]
        public async Task<Order> GetOrder(int id)
        {
            return await _orderService.GetOrderAsync(id);
        }

        [HttpPost("CreateOrder")]
        public async Task<IActionResult>CreateOrder([FromBody] Order order)
        {
            await _orderService.CreateOrderAsync(order);
            return Ok();
        }
    }
}

The CreateOrder action method given below creates a new order record in the order database table.

[HttpPost("CreateOrder")]
public async Task<IActionResult> CreateOrder([FromBody] Order order)
{
    await _orderService.CreateOrderAsync(order);
    return Ok();
}

Send Messages to Apache Kafka

Create an interface named IKafkaProducer in a file named IKafkaProducerr.cs and write the following code in there:

public interface IKafkaProducer
{
   Task SendMessageToKafkaAsync(OutboxMessage message); 
};

Next, create a new class named KafkaProducer in a file having the same name with a .cs extension and write the code given in Listing 6 in there.

Listing 6: The KafkaProducer class

using Confluent.Kafka;
using System.Net;

namespace Outbox_Pattern_Demo
{
    public class KafkaProducer : IKafkaProducer
    {
        private readonly ProducerConfig _producerConfig;
        private readonly IOutboxRepository _outboxRepository;
        private readonly string topic = "test";

        public KafkaProducer(IOutboxRepository outboxRepository)
        {
            _outboxRepository = outboxRepository;

            _producerConfig = new ProducerConfig
            {
                BootstrapServers = "localhost:9092", 
                ClientId = Dns.GetHostName()
            };

            _outboxRepository = outboxRepository;
        }

        public async Task SendMessageToKafkaAsync(OutboxMessage message)
        {
            if (message == null)
            {
                throw new ArgumentNullException(nameof(message));
            }

            using var producer = new ProducerBuilder<Null, 
              string>(_producerConfig).Build();

            try
            {
                var result = await producer.ProduceAsync
                  (topic, new Message<Null, string>
                {
                    Value = message.EventPayload
                });

                if (result.Status == PersistenceStatus.Persisted)
                {
                    await _outboxRepository.UpdateAsync 
                      (message, OutboxMessageStatus.Sent);
                }
            }
            catch (Exception)
            {
                await _outboxRepository.UpdateAsync
                  (message, OutboxMessageStatus.New);
            }
        }
    };
}

In addition, you should register the instances of type ImessageRepository and IKafkaProducer with the service collection, as shown below:

builder.Services.AddScoped <IOutboxMessageRepository, 
  OutboxMessageRepository>();
builder.Services.AddScoped<IKafkaProducer, KafkaProducer>();

You should also register the hosted service in the Program.cs file, as shown in the code snippet given below:

services.AddSingleton<IHostedService, OutboxMessageProcessor>();

Sequence Diagram of the Complete Flow

The sequence diagram of the complete flow is shown in Figure 8.

Figure 8: A sequence diagram of the complete flow
Figure 8: A sequence diagram of the complete flow

Create the Message Consumer Application

Let's create a message consumer to consume the messages stored in Kafka. To do this, create another ASP.NET Core application by following the steps mentioned earlier in this article. Create a new interface named IKafkaConsumer and write the following code in there:

namespace Outbox_Pattern_Demo
{
    public interface IKafkaConsumer
    {
        public Task ConsumeMessagesAsync(CancellationToken cancellationToken);
    }
}

Next, create a new class named KafkaConsumer that implements the IKafkaConsumer interface, as shown in Listing 8. You now need a service that runs in the background and consumes the messages residing in Kafka. To do this, create a class named KafkaMessageProcessor that extends the BackgroundService class, as shown in Listing 9.

Listing 8: The KafkaConsumer class

using Confluent.Kafka;
using System.Text.Json;

namespace Outbox_Pattern_Demo
{
    public class KafkaConsumer : IKafkaConsumer
    {
        private readonly string topic = "test";
        private readonly string groupId = "test_group";
        private readonly string bootstrapServers = "localhost:9092";

        public async Task ConsumeMessagesAsync(CancellationToken 
          cancellationToken)
        {
            var config = new ConsumerConfig
            {
                GroupId = groupId,
                BootstrapServers = bootstrapServers,
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            try
            {
                using (var consumerBuilder = new ConsumerBuilder
                  <Ignore, string>(config).Build())
                {
                    consumerBuilder.Subscribe(topic);
                    var cancelToken = new CancellationTokenSource();

                    try
                    {
                        while (true)
                        {
                            var consumer = consumerBuilder.Consume
                              (cancelToken.Token);
                            var order = JsonSerializer.Deserialize
                              <Order>(consumer.Message.Value);
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        consumerBuilder.Close();
                    }
                }
            }
            catch
            {
                throw;
            }
        }
    }
}

Listing 9: The KafkaMessageProducer class

namespace Outbox_Pattern_Demo
{
    public class KafkaMessageProcessor : BackgroundService
    {
        private readonly IServiceScopeFactory _scopeFactory;
        private readonly IKafkaConsumer _consumer;

        public KafkaMessageProcessor(IServiceScopeFactory scopeFactory, 
          IKafkaConsumer consumer)
        {
            _scopeFactory = scopeFactory;
            _consumer = consumer;
        }

        protected override async Task ExecuteAsync
          (CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                await _consumer.ConsumeMessagesAsync(cancellationToken);
            }
        }
    }
}

Register the instance of type IKafkaConsumer as a scoped service and the instance of KafkaMessageProcessor as a singleton hosted service.

builder.Services.AddScoped<IKafkaConsumer, KafkaConsumer>();
builder.Services.AddSingleton<IHostedService, KafkaMessageProcessor>();

Figure 9 illustrates the complete flow visually.

Figure 9: The complete flow of the consumer application
Figure 9: The complete flow of the consumer application

Listing 10 shows the Program.cs file of the consumer application.

Listing 10: The Program.cs file (Consumer Application)

using Microsoft.EntityFrameworkCore;
using Outbox_Pattern_Client_Demo;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.

builder.Services.AddControllers();
builder.Services.AddScoped<IKafkaConsumer, KafkaConsumer>();
builder.Services.AddSingleton<IHostedService, KafkaMessageProcessor>();

var app = builder.Build();

// Configure the HTTP request pipeline.

app.UseAuthorization();
app.MapControllers();
app.Run();

Execute the Application

Run the producer and the consumer applications separately because they're part of different solutions. You should also set appropriate breakpoints in the source code of both applications so that you can debug them. To run the application, follow the steps outlined below:

  1. Start ZooKeeper in a terminal window.
  2. Start Kafka in another terminal window.
  3. Execute the producer application.
  4. Execute the consumer application.
  5. Launch the Postman Http Debugger tool.
  6. Send an HTTP POST request to the producer API using Postman.

Figure 10 shows how you can invoke the API endpoint in Postman.

Figure 10: Invoking the API endpoint in Postman
Figure 10: Invoking the API endpoint in Postman

Real-World Use Cases for the Outbox Pattern

The outbox pattern has several use cases in real-life: real-time notifications and data synchronization. The next two sections discuss these.

Real-Time Notifications

You might often want real-time notifications in your application to send instant alerts, messages, etc. You can use the outbox pattern to implement notifications in your application in real-time. These events can then be processed asynchronously, thereby triggering real-time notifications to the logged-in users of the application.

Data Synchronization

Real-time data synchronization is yet another use case where the outbox pattern fits in. For example, in an e-commerce application when a customer places an order, both the order and the stock database tables are updated simultaneously in the same transaction. Additionally, the order, product, and cart data must be synchronized across all devices in use using asynchronous processing.

Alternatives to the Transactional Outbox Pattern

Although the outbox pattern is a popular strategy for handling distributed transactions efficiently and ensuring reliable and consistent communication between microservices, there are certain downsides to using it as well. I've discussed them earlier in this article. The Two-Phase Commit and the Saga Pattern are two popular alternatives to this strategy.

Two-Phase Commit

This approach is used for performing an atomic transaction across multiple resources. Keep in mind that there are two phases in this strategy. While the transaction coordinator notifies all other resources about its desire to execute a transaction in the first phase, it instructs all resources to execute this operation in the next phase. The major drawback of such an approach is that if any resource fails or there are network issues, the whole transaction stops and locks are applied on resources.

Saga Pattern

This approach splits the transactions into multiple steps and completes each step as a separate transaction. This particular design is best suited for long-running transactions or those that take place between microservices. However, when you adopt this approach, it can be difficult to manage which transactions must be rolled back in the event of an error in your application. This is because the transaction timing and sequence for such transactions can be difficult to comprehend.

Where Do We Go from Here?

The outbox pattern, a reliable pattern for consistent event publishing in microservices-based applications, is a great way to make your microservices apps more stable and scalable. It helps separate services through which the events are successfully delivered and isolates business operations from publishing events. However, it also has certain downsides, which explains why you should understand these constraints before deciding on your application's design. I'll discuss more on the Outbox Pattern in a future article.