Software Architectures examples : Asynchronous Command Queue

There are some scenarios where we don’t need to wait for fast feedback when executing tasks. In some cases, we can just request the computation of that a task, a command, and at some moment in the future, we can receive a callback notification with the execution result.

To explore a solution for this posibilities, I decided to build this example by applying an asynchronous command queue.

An asynchronous world needs asynchronous solutions

This means that in many cases we need to build systems that are able to deal with asynchronous use cases, where no one is waiting for an immediate response.

A typical example is when buying online. Traditional webs created big transactions with many comprobations like payment information, stock, delivery addresses… and users had to wait mandatory for all validations.

But nowadays, when we think about improving the user experience, UX today’s applications tend to simplify and accelerate the process for the user. The same buy action button, trigger a process. And at some time, the user will be notified via an email or a notification with the process result about if it’s finished o rejected.

Under the hood, we can find that the original request has an HTTP 200 ok. But the dialogs between the servers are quite different from when we were used. Patterns like pooling or WebSockets are very common in the frontend.

Benefits

  • Concurrency disappears. Because now commands can be processed one by one, so potentially there are no race conditions between resources.
  • Less coupling and more flexibility because systems can now be integrated just with a command contract.
  • Improve client usage with faster requests, and less error management, at least in command publishing.
  • Possibility to define a command execution context and pipeline, to deal with stuff like transactions scopes or schedule.
  • Possibility to implement with retry policities.
  • Good integration with technologies like webhooks, webshockets, etc.
  • If command history is stored, there is a possibility to replay production loads in test environments replaying production commands.

Drawnbacks

  • Much more complexity, compared with previous architectures examples.
  • Harder learning curve than with other approaches.
  • More technologies are involved, more points of failure.
  • Usually need of a callback mechanism for the client to receive results and manage errors.

This only has sense in a couple of scenarios, where executions and notifications are needed. Each solution has its suitable problem.

Example

Following this aspnetcore example, consider the next code distribution:

This music application example is built on top of the previous Event Driven example. It’s recommended to understand previous examples first. In this case, we had included some projects like MyMusic.Application.Commands and MyMusic.Application.CommandsHandlers:

Commands

Class: MyMusic.Application.Commands.CreatePLayList

public class CreatePLayList : Command {
    public string PlayListName { get; }

    public CreatePLayList(string playListName) {
        PlayListName = playListName;
    }
}

Commands are just request for use cases executions in this case.

Commands subscription

Class: MyMusic.Configuration

private static void RegisterPlayListCommandProcessorsInTo(CommandQueuePort commandQueue, PlayListCommandProcessor playListCommandProcessor) {
    commandQueue.SetQueueSingleConsumer<CreatePLayList>(playListCommandProcessor.Process);
    commandQueue.SetQueueSingleConsumer<RenamePlaylist>(playListCommandProcessor.Process);
    commandQueue.SetQueueSingleConsumer<ChangePlayListImageUrl>(playListCommandProcessor.Process);
    commandQueue.SetQueueSingleConsumer<ArchivePlayList>(playListCommandProcessor.Process);
}

As you can see, for this example we had decided to create an in-memory command queue, found in AsynchronousCommandQueueInMemoryAdapter. In this part, we manually register the commands we are going to deal with in this application, and who is it going to process them.

Commands creations

Class: MyMusic.Controllers.PlaylistsController

[HttpPost]
public ActionResult CreatePlayList([FromBody]CreatePlayListRequest request) {
    commandQueue.Queue(new CreatePLayList(request.PlayListName));
    return Ok();
}

Commands are created directly in the delivery layer, in this case in the controller.

Commands processing

Class: MyMusic.CommandProcessors.PlayListCommandProcessor

public class PlayListCommandProcessor {
    private readonly PlayListCommandHandlerCreator playListCommandHandlerCreator;

    public PlayListCommandProcessor(PlayListCommandHandlerCreator playListCommandHandlerCreator) {
        this.playListCommandHandlerCreator = playListCommandHandlerCreator;
    }

    public Either<DomainError, CommandResult> Process(CreatePLayList command) {
        var commandHandler = playListCommandHandlerCreator.CreateCreatePlayListCommandHandler();
        return commandHandler.Handle(command);
    }
}

I decided to create this processor abstraction on top of handlers, the same with events in previous example, to deal with injections and to generate a flow as similar as possible to the traditional controller-services approach.

Commands handling

Class: MyMusic.Application.CommandHandlers.CreatePlayListCommandHandler

public class CreatePlayListCommandHandler {
    
    private readonly UniqueIdentifiersPort uniqueIdentifiers;
    private readonly PlayListPersistencePort playListPersistence;
    private readonly EventPublisherPort eventPublisher;

    public CreatePlayListCommandHandler(UniqueIdentifiersPort uniqueIdentifiers, PlayListPersistencePort playListPersistence, EventPublisherPort eventPublisher) {
        this.uniqueIdentifiers = uniqueIdentifiers;
        this.playListPersistence = playListPersistence;
        this.eventPublisher = eventPublisher;
    }

    public Either<DomainError, CommandResult> Handle(CreatePLayList command) {
        var newPlayListId = uniqueIdentifiers.GetNewUniqueIdentifier();
        var playList = PlayList.Create(newPlayListId, command.PlayListName);
        
        playListPersistence.Persist(playList);
        eventPublisher.Publish(playList.Events());
        return CommandResult.Success;
    }
    
}

Commands handlers are now use cases executions and are the substitution of services from previous examples.

Notice that each command now has the side effect of publishing an event.

Running the example with SignalR

Class: MyMusic.Application.EventHandlers.PlayListHasBeenCreatedEventHandler

public class PlayListHasBeenCreatedEventHandler {
    private readonly PlayListNotifierPort playListNotifier;
    private readonly WebsocketPort websocket;

    public PlayListHasBeenCreatedEventHandler(PlayListNotifierPort playListNotifier, WebsocketPort websocket) {
        this.playListNotifier = playListNotifier;
        this.websocket = websocket;
    }

    public async Task Handle(PlayListHasBeenCreated @event) {
        playListNotifier.NotifyPlayListHasBeenCreated(@event.PlayListId, @event.PlayListName);
        await websocket.PushMessageWithEventToAll(@event);
    }
}

And, as you can see, an event handlers is subscribed to that event, and we had added websocket.PushMessageWithEventToAll(@event)

Send messages to sockets

Class: MyMusic.Websockets.SignalRWebsocketAdapter

public class SignalRWebsocketAdapter : Hub, WebsocketPort {
    
    public async  Task PushMessageWithEventToAll(Event @event) {
        if (Clients != null){
            await Clients.All.SendAsync("AllMyMusicTarget", @event.GetType().Name,@event);
        }
    }

    public async Task PushMessageWithErrorToAll(string error, Command command) {
        if (Clients != null){
            await Clients.All.SendAsync("AllMyMusicTarget", error, command);
        }
    }
}

This implementation simply sends to all server socket connections all the events.

Receibe messages in the client

Code: Websocket-Web-Client/index.html

var connection = new signalR.HubConnectionBuilder()
        .withUrl("https://localhost:44375/MyMusicHub", {
            skipNegotiation: true,
            transport: signalR.HttpTransportType.WebSockets
        })
        .configureLogging(signalR.LogLevel.Debug)
        .build();
        
    connection.on("AllMyMusicTarget", function (user, message) {
        console.log("AllMyMusicTarget", user, message)
    });

    connection.start().then(function () {
        console.log("SignalR connection started...")
    }).catch(function (error) {
        return console.error(error.toString());
    });

In the javascript frontend client, we can see the socket connection opening with the server and the subscription to the AllMyMusicTarget messages.

We can serve this code in a local web server by executing npm install and npm run server.

Run execution

Then If we want to test the playlist creation endpoint, we only need to create a request like:

curl -X POST "https://localhost:44375/playlists" -H "accept: */*" -H "Content-Type: application/json" -d "{\"playListName\":\"a playlist\"}"

And then we can see, that the message has traveled from the server through the client in the browser network tab: