Skip to content

Custom event handlers

You can create a custom event handler to listen to events from Lime CRM, both the built in core events, and events that you define and publish yourself from a custom endpoint or a custom limeobject.

To add a custom event handler to your package, run the following from the root of your package directory:

lime-project generate event-handler

There are two approaches to writing event handlers in Lime CRM. Both use the same register_event_handlers plugin hook function, but differ in how message acknowledgment is handled. We recommend the auto-acking approach for new development as it's simpler and automatically handles message acknowledgment.

Version requirement

The auto-acking event handler approach is available in lime-crm v2.1135.0 or later.

This approach uses the @inject decorator with a LimeApplication argument and automatically acknowledges messages for you:

import logging
import os
import requests
import typing
from lime_application import LimeApplication
from lime_core.dependencies import inject
from lime_event_handler import EventHandlerRepository

logger = logging.getLogger(__name__)


@inject
def company_renamed(body: typing.Mapping, application: LimeApplication):
    """Handle company rename events with automatic message acknowledgment"""
    logger.info(f"Received message: {body}")

    if "name" in body["original_values"]:
        logger.info("Company renamed, calling external API")

        # Use environment variable with fallback
        api_base_url = os.getenv("API_BASE_URL", "https://some.api")

        data = {
            "old_name": body["original_values"]["name"],
            "new_name": body["values"]["name"],
        }

        try:
            requests.post(f"{api_base_url}/company_renamed", json=data, timeout=30)
            logger.info("Successfully notified external API about company rename")
        except Exception as e:
            logger.error(f"Failed to rename company: {e}")


def register_event_handlers(
    repository: EventHandlerRepository, config: typing.Mapping
):
    repository.register_event_handler(
        handler_func=company_renamed,
        key="core.limeobject.company.update.v1",
        queue_name="company_renamed",
        auto_ack=True)  # Automatically acknowledge messages after handler completes

The auto_ack=True parameter enables automatic message acknowledgment. When set to True, Lime CRM will automatically acknowledge the message after your handler function completes, regardless of whether it succeeds or raises an exception. This simplifies error handling since you don't need to manually call message.ack() in your code.

Manual acknowledgment event handler

For cases where you need more control over message acknowledgment, you can use this approach:

import logging
import os
import requests
import typing
from lime_event_handler import EventHandlerRepository, EventHandlerWorker

logger = logging.getLogger(__name__)


def company_renamed(worker: EventHandlerWorker, body: typing.Mapping, message):
    """Handle company rename events with manual message acknowledgment"""
    logger.info(f"Received message: {body}")

    try:
        if "name" in body["original_values"]:
            logger.info("Company renamed, calling external API")

            # Use environment variable with fallback
            api_base_url = os.getenv("API_BASE_URL", "https://some.api")

            data = dict(
                old_name=body["original_values"]["name"],
                new_name=body["values"]["name"])
            requests.post(f"{api_base_url}/company_renamed", json=data, timeout=30)
            logger.info("Successfully notified external API about company rename")
    except Exception as e:
        logger.error(f"Failed to rename company: {e}")
    finally:
        # Always acknowledge the message, even if processing failed
        message.ack()


def register_event_handlers(
    repository: EventHandlerRepository, config: typing.Mapping
):
    repository.register_event_handler(
        handler_func=company_renamed,
        key="core.limeobject.company.update.v1",
        queue_name="company_renamed")

Blocking operations in event handlers

Be careful with blocking operations like HTTP requests in event handlers. When an event handler is processing a message (such as making an HTTP request), other messages in that handler's queue must wait. This is why the timeout parameter is crucial for requests calls - it prevents handlers from blocking indefinitely and holding up message processing. See Making external requests for details.

Choosing the right approach

  • Use the auto-acking approach for most event handlers. It's simpler, less error-prone, and automatically handles message acknowledgment.
  • Use the manual acknowledgment approach only when you need fine-grained control over when messages are acknowledged, such as when implementing complex error handling or transactional logic.

Testing event handlers

You can write unit tests for your event handlers using the drain_events fixture from lime_test. This fixture allows you to publish events and verify that your handlers process them correctly.

Basic test setup

import pytest
from lime_event_handler import EventHandlerRepository
from limepkg_example.event_handlers import register_event_handlers

pytestmark = pytest.mark.usefixtures("event_handler_service_locator")


def test_company_renamed_event_handler(
    lime_app,
    event_handler_repository: EventHandlerRepository,
    drain_events,
):
    # Register your event handlers
    register_event_handlers(event_handler_repository, config={})

    # Create test data
    company_event = {
        "values": {
            "id": 123,
            "name": "New Company Name"
        },
        "original_values": {
            "name": "Old Company Name"
        }
    }

    # Use drain_events context to capture and process events
    with drain_events(lime_app, event_handler_repository):
        lime_app.publish("core.limeobject.company.update.v1", company_event)

    # Your assertions here - check that external API was called, etc.

Testing with pytest-httpserver

For event handlers that make HTTP requests (like our company_renamed example), use pytest-httpserver to mock the external API:

import json
import os
import pytest
import requests
from pytest_httpserver import HTTPServer
from werkzeug import Request, Response
from lime_event_handler import EventHandlerRepository
from limepkg_example.event_handlers import register_event_handlers

pytestmark = pytest.mark.usefixtures("event_handler_service_locator")


def test_company_renamed_calls_external_api(
    lime_app,
    event_handler_repository: EventHandlerRepository,
    drain_events,
    httpserver: HTTPServer,
    monkeypatch,
):
    # Set the API base URL environment variable to point to test server
    monkeypatch.setenv("API_BASE_URL", httpserver.url_for(""))

    # Mock the API endpoint
    requests_received = []

    def handler(request: Request):
        requests_received.append({
            "body": json.loads(request.data),
            "headers": dict(request.headers)
        })
        return Response("OK", status=200)

    httpserver.expect_request("/company_renamed", method="POST").respond_with_handler(handler)

    # Register event handlers
    register_event_handlers(event_handler_repository, config={})

    # Create test event data
    company_event = {
        "values": {
            "id": 123,
            "name": "Acme Corporation"
        },
        "original_values": {
            "name": "Acme Inc"
        }
    }

    # Process the event
    with drain_events(lime_app, event_handler_repository):
        lime_app.publish("core.limeobject.company.update.v1", company_event)

    # Verify the external API was called correctly
    assert len(requests_received) == 1
    request_data = requests_received[0]["body"]
    assert request_data["old_name"] == "Acme Inc"
    assert request_data["new_name"] == "Acme Corporation"


def test_company_renamed_ignores_non_name_changes(
    lime_app,
    event_handler_repository: EventHandlerRepository,
    drain_events,
):
    """Test that events without name changes don't trigger API calls"""
    register_event_handlers(event_handler_repository, config={})

    # Event with no name change
    company_event = {
        "values": {
            "id": 123,
            "name": "Acme Inc",
            "phone": "+1-555-0199"
        },
        "original_values": {
            "phone": "+1-555-0123"
        }
    }

    # Process the event - should not make any external calls
    with drain_events(lime_app, event_handler_repository):
        lime_app.publish("core.limeobject.company.update.v1", company_event)

    # Test passes if no exceptions are raised (no API calls attempted)

Testing error handling

Test how your event handlers behave when external services fail:

def test_company_renamed_handles_api_error(
    lime_app,
    event_handler_repository: EventHandlerRepository,
    drain_events,
    httpserver: HTTPServer,
    monkeypatch,
    caplog,
):
    # Set the API base URL environment variable to point to test server
    monkeypatch.setenv("API_BASE_URL", httpserver.url_for(""))

    # Make the API return an error
    httpserver.expect_request("/company_renamed", method="POST").respond_with_data(
        "Internal Server Error", status=500
    )

    register_event_handlers(event_handler_repository, config={})

    company_event = {
        "values": {"id": 123, "name": "New Name"},
        "original_values": {"name": "Old Name"}
    }

    with caplog.at_level("ERROR"):
        with drain_events(lime_app, event_handler_repository):
            lime_app.publish("core.limeobject.company.update.v1", company_event)

    # Verify error was logged but event was still acknowledged
    assert "Failed to rename company" in caplog.text

Summary

You now have the tools to create robust event handlers for Lime CRM! Here's what we've covered:

  • Auto-acking event handlers are the recommended approach for new development - they're simpler and handle message acknowledgment automatically
  • Manual acknowledgment gives you fine-grained control when needed, with proper error handling using try-except-finally
  • Comprehensive testing using drain_events and pytest-httpserver ensures your handlers work correctly

Integration testing is crucial

While unit tests are essential for validating your event handler logic, always remember to do integration testing using a real RabbitMQ broker. Unit tests with drain_events are great for development and CI, but only integration tests can catch issues with message serialization, network failures, queue durability, and other real-world scenarios that happen in production environments.

Start with the auto-acking approach, write comprehensive tests, and you'll have reliable event handlers that can handle the complexities of distributed systems!