Pull Diagnostics

This implements the pull-model of diagnostics.

This is a fairly new addition to LSP (v3.17), so not all clients will support this.

Instead of the server broadcasting updates whenever it feels like, the client explicitly requests diagnostics for a particular document (textDocument/diagnostic) or for the entire workspace (workspace/diagnostic). This approach helps guide the server to perform work that’s most relevant to the client.

This server scans a document for sums e.g. 1 + 2 = 3, highlighting any that are either missing answers (warnings) or incorrect (errors).

import logging
import re

from lsprotocol import types

from pygls.cli import start_server
from pygls.lsp.server import LanguageServer
from pygls.workspace import TextDocument

# Matches a line of the form "<int> + <int> = [<int>]".
#   group(1): left operand, group(2): right operand,
#   group(3): the written answer, or None when nothing follows the "=".
# Trailing whitespace is allowed after the answer so that lines ending in
# "\r\n" (or with stray spaces) are still recognised; a bare "$" only
# tolerates a single trailing "\n".
ADDITION = re.compile(r"^\s*(\d+)\s*\+\s*(\d+)\s*=\s*(\d+)?\s*$")


class PullDiagnosticServer(LanguageServer):
    """Language server demonstrating "pull-model" diagnostics.

    Diagnostics are computed by :meth:`parse` and cached per document in
    ``self.diagnostics`` so that request handlers can answer from the cache.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Maps document URI -> (document version, list of diagnostics), as
        # recorded by the last ``parse`` call that stored a result.
        self.diagnostics = {}

    def parse(self, document: TextDocument):
        """Scan ``document`` for sums and cache the diagnostics found.

        Every line matching ``ADDITION`` is checked: a sum without an answer
        yields a warning, a sum with the wrong answer yields an error, and a
        correct sum yields nothing.

        The cache entry for the document is written the first time the
        document is parsed and afterwards only when the list of diagnostics
        differs from the cached one, so the stored version keeps identifying
        the last edit that actually changed the diagnostics.

        Args:
            document: The text document to scan.
        """
        diagnostics = []

        for idx, line in enumerate(document.lines):
            match = ADDITION.match(line)
            if match is None:
                continue

            left, right, actual_answer = match.groups()

            if actual_answer is None:
                message = "Missing answer"
                severity = types.DiagnosticSeverity.Warning
            elif int(actual_answer) == int(left) + int(right):
                # Correct sum, nothing to report.
                continue
            else:
                message = f"Incorrect answer: {actual_answer}"
                severity = types.DiagnosticSeverity.Error

            # Strip whatever line terminator is present instead of assuming
            # exactly one character: a final line may have none (which would
            # cut off its last character) and CRLF lines have two.
            end_character = len(line.rstrip("\r\n"))

            diagnostics.append(
                types.Diagnostic(
                    message=message,
                    severity=severity,
                    range=types.Range(
                        start=types.Position(line=idx, character=0),
                        end=types.Position(line=idx, character=end_character),
                    ),
                )
            )

        # Record the first result for a document even when it is empty, so a
        # clean document is known to the cache; afterwards only update when
        # the list has changed.
        cached = self.diagnostics.get(document.uri)
        if cached is None or cached[1] != diagnostics:
            self.diagnostics[document.uri] = (document.version, diagnostics)

        # logging.info("%s", self.diagnostics)


# Module-level server instance; the arguments are forwarded unchanged to the
# LanguageServer constructor (presumably the server's name and version).
server = PullDiagnosticServer("diagnostic-server", "v1")


@server.feature(types.TEXT_DOCUMENT_DID_OPEN)
def did_open(ls: PullDiagnosticServer, params: types.DidOpenTextDocumentParams):
    """Compute diagnostics for a document as soon as it is opened."""
    opened_uri = params.text_document.uri
    ls.parse(ls.workspace.get_text_document(opened_uri))


@server.feature(types.TEXT_DOCUMENT_DID_CHANGE)
def did_change(ls: PullDiagnosticServer, params: types.DidChangeTextDocumentParams):
    """Re-parse a document whenever it is changed.

    Args:
        ls: The running server, whose diagnostics cache is refreshed.
        params: The change notification; only the document URI is used here,
            the text itself is read from the server's workspace.
    """
    doc = ls.workspace.get_text_document(params.text_document.uri)
    ls.parse(doc)


@server.feature(
    types.TEXT_DOCUMENT_DIAGNOSTIC,
    types.DiagnosticOptions(
        identifier="pull-diagnostics",
        inter_file_dependencies=False,
        workspace_diagnostics=True,
    ),
)
def document_diagnostic(
    ls: PullDiagnosticServer, params: types.DocumentDiagnosticParams
):
    """Return diagnostics for the requested document.

    The result id is ``"<uri>@<version>"`` built from the cached entry. When
    the client sends back that same id as ``previous_result_id`` an
    "unchanged" report is returned instead of repeating the items.

    Args:
        ls: The running server holding the diagnostics cache.
        params: The request, naming the document and optionally the result id
            the client received last time.

    Returns:
        An unchanged report when the client's previous result id still
        matches, otherwise a full report. A document that has no cache entry
        gets an empty full report rather than ``None``, so the handler always
        answers with a report object.
    """
    # logging.info("%s", params)

    uri = params.text_document.uri
    cached = ls.diagnostics.get(uri)
    if cached is None:
        # Nothing has been parsed for this document; report "no diagnostics"
        # without a result id so the next request gets a fresh full report.
        return types.FullDocumentDiagnosticReport(items=[])

    version, diagnostics = cached
    result_id = f"{uri}@{version}"

    if result_id == params.previous_result_id:
        return types.UnchangedDocumentDiagnosticReport(result_id=result_id)

    return types.FullDocumentDiagnosticReport(items=diagnostics, result_id=result_id)


@server.feature(types.WORKSPACE_DIAGNOSTIC)
def workspace_diagnostic(
    ls: PullDiagnosticServer, params: types.WorkspaceDiagnosticParams
):
    """Return diagnostics for every document in the server's cache.

    Each cached document contributes one report. If the client already holds
    the document's current result id (``"<uri>@<version>"``) an "unchanged"
    report is sent, otherwise a full report with the diagnostics.

    Args:
        ls: The running server holding the diagnostics cache.
        params: The request, carrying the result ids the client already has.

    Returns:
        A workspace report with one item per cached document.
    """
    # logging.info("%s", params)
    items = []
    previous_ids = {result.value for result in params.previous_result_ids}

    for uri, (version, diagnostics) in ls.diagnostics.items():
        result_id = f"{uri}@{version}"
        if result_id in previous_ids:
            items.append(
                types.WorkspaceUnchangedDocumentDiagnosticReport(
                    uri=uri, result_id=result_id, version=version
                )
            )
        else:
            items.append(
                types.WorkspaceFullDocumentDiagnosticReport(
                    uri=uri,
                    version=version,
                    items=diagnostics,
                    # Hand the id to the client so it can send it back in
                    # ``previous_result_ids``; without it the "unchanged"
                    # branch above can never match for workspace pulls.
                    result_id=result_id,
                )
            )

    return types.WorkspaceDiagnosticReport(items=items)


if __name__ == "__main__":
    # Log at INFO level, printing only the message text of each record.
    logging.basicConfig(level=logging.INFO, format="%(message)s")
    # Hand the server instance to the launcher imported from pygls.cli.
    start_server(server)