Strapper real-time deployment log

Archie To

This real-time deployment log feature was implemented by following a tutorial on Django Channels.

All of the code below can be found in the Strapper GitLab repository

Setup

Install daphne and channels:

$ pip install -U daphne channels["daphne"]

In app_starter/app_starter/settings.py:

INSTALLED_APPS = [
    "daphne",
    ...,
]

ASGI_APPLICATION = "app_starter.asgi.application"

Configure ASGI app

Since wsgi doesn’t support long-lived connections, we will have to serve our app as an asgi app. In app_starter/app_starter/asgi.py:


"""
ASGI config for app_starter project.

It exposes the ASGI callable as a module-level variable named ``application``.

For more information on this file, see
https://docs.djangoproject.com/en/4.1/howto/deployment/asgi/
"""

import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.core.asgi import get_asgi_application

from normal_users.routing import websocket_urlpatterns

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app_starter.settings')
# Initialize Django ASGI application early to ensure the AppRegistry
# is populated before importing code that may import ORM models.
django_asgi_app = get_asgi_application()

application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": AllowedHostsOriginValidator(
        AuthMiddlewareStack(URLRouter(websocket_urlpatterns))
    ),
})

Here we are using ProtocolTypeRouter as the root application, which defines what asgi application to serve depending on the protocol type:

  • If the protocol is http, the Django app is served
  • If the protocol is websocket, there are multiple stages:
    • First we ensure that the websocket connection is from an allowed host (set in settings.py).
    • Next, we use AuthMiddlewareStack to authenticate if the current user has access to this websocket exactly like how Django does it for the views.
    • Lastly, we use URLRouter to define which application to serve based on the url of the websocket. The specifics of what application to serve is defined in normal_users/routing.py

Define consumers

Each consumer is basically an asgi app that does a couple things:

  • Structure your code as a series of functions to be called whenever an event happens, rather than making you write an event loop.
  • Allow you to write synchronous or async code, and deal with handoffs and threading for you.

Strapper consumers are defined in normal_users/consumers.py:

import json

from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer

class DeployConsumer(WebsocketConsumer):
    def connect(self):
        self.identifier = self.scope["url_route"]["kwargs"]["identifier"]
        self.app_group_name = f"deploy_{self.identifier}"

        # Join app group
        async_to_sync(self.channel_layer.group_add)(
            self.app_group_name, self.channel_name
        )

        # Accept connection from Javascript client
        self.accept()

    # Have to keep argument 'close_code' as it is how the disconnect() method of
    # class WebSocketConsumer is defined
    # pylint: disable-next=unused-argument
    def disconnect(self, close_code):
        # Leave app group
        async_to_sync(self.channel_layer.group_discard)(
            self.app_group_name, self.channel_name
        )

    # Listen for messages from consumers (including itself)
    # Messages are received only from ApplicationDeployment and StopApplicationDeploymentView
    # and only when there is a change in the log file of the application
    # This function is currently not used
    def receive(self, log_file_content):
        log_file_content_json = json.loads(log_file_content)
        print(f'log file content: {log_file_content_json}')
        content = log_file_content_json["content"]

        # Send content to app group
        async_to_sync(self.channel_layer.group_send)(
            self.room_group_name, {"type": "log_file_content", "content": content}
        )

    # Listen for log_file_content events
    # Messages are received only from ApplicationDeployment and StopApplicationDeploymentView
    # and only when there is a change in the log file of the application
    def log_file_content(self, event):
        content = event["content"]

        # Send message to WebSocket
        self.send(text_data=json.dumps({"content": content}))

To understand the code above, we need to understand the concept of channel layer.

A channel layer is a kind of communication system. It allows multiple consumer instances to talk with each other, and with other parts of Django. A channel layer provides the following abstractions:

  • A channel is a mailbox where messages can be sent to. Each channel has a name. Anyone who has the name of a channel can send a message to the channel.
  • A group is a group of related channels. A group has a name. Anyone who has the name of a group can add/remove a channel to the group by name and send a message to all channels in the group. It is not possible to enumerate what channels are in a particular group.

Every consumer instance has an automatically generated unique channel name, and so can be communicated with via a channel layer.

So, at a high level, when a consumer recevies a websocket connection from a deploy log page, it adds its own channel into a group for that app. The consumer identifies the requested app by reading the url of the websocket. This channel listens for log_file_content events. Upon the triggering of this event, the consumer sends the message of the event back to the client.

In settings.py:

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels.layers.InMemoryChannelLayer"
    }
}

This code sets the backend that stores channel layers as in-memory, which I believe it’s the RAM. However, in the doc, this is not recommended for production. The production-ready backend is channels_redis. In the case of Strapper, since we don’t have a Redis storage running, we have to settle with an in-memory storage.

Setup routes for consumers

For each websocket URL, we define a handling consumer in normal_users/routing.py:

# pylint:
from django.urls import path

from . import consumers

websocket_urlpatterns = [
    path('ws/deploy/<slug:identifier>', consumers.DeployConsumer.as_asgi()),
]

Think of this as how Django views are served based on an URL. Every connection to the websocket with URL ws/deploy/<slug:identifier> will be handled by a DeployConsumer instance we created above. We call the as_asgi() classmethod in order to get an ASGI application that will instantiate an instance of our consumer for each user-connection.

Connect the deployment browser to a web socket

Everything displayed for application deployment logs are written in deploy_log.html:

{% extends "base.html" %}
{% block title %}Deploy log{% endblock %}
{% block content %}
<div style="margin-top:100px;">
    <div style="width: 90%;" class="mx-auto">
        <h1 id="deploy-heading"></h1>
        <p>Status: <strong>{{ status }}</strong></h1>
        <div class="log-container">
            <p id="deploy-log">{{ log |safe }}</p>
        </div>
    </div>
</div>
<script>
    // Get app id from the current URL
    const appId = window.location.href.split('/').at(-2);

    $('#deploy-heading').text(`Deployment log for ${appId}`);

    // Connect with the consumer in Django
    var ws_scheme = window.location.protocol == "https:" ? "wss" : "ws";
    const deploySocket = new WebSocket(
        `${ws_scheme}://${window.location.host}/ws/deploy/${appId}`
    );

    // Receive and render a new ouput everytime there is a change in the log file
    const deployLog = document.querySelector('#deploy-log');

    deploySocket.onmessage = e => {
        $('#deploy-log').empty();
        $('.ansi2html-content').remove();
        const data = JSON.parse(e.data);
        deployLog.innerHTML = (data.content + '\n');
    };

    // Return an error if the socket is closed unexpectedly
    deploySocket.onclose = e => {
        console.error('Deploy socket is closed');
    };

    // Closing the WebSocket before closing the browser
    window.onbeforeunload = () => {
        deploySocket.close();
    }
</script>
{% endblock %}

Let’s focus on the <script> tag content. We are connecting each app deploy log browser window to a consumer through a websocket with the set up URL. Now, everytime this socket receives a message from the consumer, it will update the log with the message content, which is the updated log.

Send updated logs to consumers

So where are the messages for the updated logs sent from to the consumers? They are sent from Django views, specically the ApplicationDeploymentView and StopApplicationDeploymentView. When these views are requested, they call poll_log_file(). The function is written as follows:

def poll_log_file(identifier, deployment):
    # Get the channel layer app group based on passed in identifier
    layer = get_channel_layer()
    app_group_name = f"deploy_{identifier}"
    # Add this thread to app channel layer group
    async_to_sync(layer.group_add)(
        app_group_name, f'poll_log_file_{identifier}'
    )

    # Get the log file of the deployment
    log_file = deployment.log_file

    # Construct converter to convert ANSI to HTML
    conv = Ansi2HTMLConverter()

    i = 0
    cached_stamp = os.stat(log_file).st_mtime
    # Time out updating log after 20s without any changes to the log file
    # Polling for output file content changes every 1 second
    while i < 20:
        stamp = os.stat(log_file).st_mtime
        if stamp != cached_stamp:
            cached_stamp = stamp
            # Send current log output to the front end
            with open(log_file, 'r', encoding='utf-8') as log_file_stream:
                output = log_file_stream.read()
            async_to_sync(layer.group_send)(app_group_name, {
                'type': 'log_file_content',
                'content': conv.convert(output)
            })
            # Reset time-out timer every time the file changes
            i = 0
        time.sleep(1)
        i += 1

    # Discard this thread from app channel layer group
    async_to_sync(layer.group_discard)(
        app_group_name, f'poll_log_file_{identifier}'
    )

Basically, this function polls the deployment log file for the requested app every second. If the log changes, the function will broadcast the message to the app group as type “log_file_content”, which triggers the log_file_content method of the consumers for the app. If there is no change in the file for 20 seconds, the function will remove itself from the group and exit.

Summary

To conclude, each deploy log window is connected to a consumer through a websocket. Each consumer listens for log_file_content events. If the event is triggered, the consumer sends the event message back to the window client. When applications are deployed, stopped and destroyed, a function is called to poll the deploy log file every second and broadcast the updated version, if there are changes, to the app group, which contains all app consumer channels. This function exits if there is no change in the log file after 20 seconds.