Skip to main content

Custom Jobs

Carium allows advanced customization through custom jobs. Custom jobs are custom Python scripts that can be executed in response to specific events in the system, on a schedule, or manually. Using custom jobs, you can automate tasks, integrate with external systems, and extend the functionality of Carium.

Code can be simple single-file scripts or complex multi-file projects fully-integrated with the Carium SDK. Carium runs custom jobs in a secure, isolated environment, ensuring that they don't interfere with the operation of the overall system.

A basic custom job is a Python class inheriting from carium.Job that implements the on_event method. The on_event method is called whenever the custom job is triggered by an event. Here is an example of a simple custom job that prints the event payload.

import carium

class MyJob(carium.Job):
def on_event(self, event: dict):
print("I got an event:", event)

The event payload will be written to standard output and stored in the Carium log store.

Executors

Each custom job is executed by an executor. An executor is a runtime environment owned by an organization and that runs custom jobs. It consists of a runtime version (including a vended set of libraries), a memory limit, and a timeout.

Configuration ItemDescription
NameThe name of the executor. Unique within the organization.
Runtime VersionThe version of Python that the custom job runs on. Can be a version, $STABLE, or $LATEST.
Memory LimitThe maximum amount of memory that the custom job can use.
TimeoutThe maximum amount of time that the custom jobs can run before being terminated.

Carium provides a default executor that runs custom jobs using the latest $STABLE runtime version with a memory limit of 128MB and a timeout of 600 seconds.

Executors can be created using the /pylon/v1/customjob-executors API suite or the crm py-jobexec CLI. For example, to create an executor that runs custom jobs using the latest runtime version with a memory limit of 128MB:

crm py-jobexec create my_executor_name --runtime-version='$LATEST' --memory-limit=128

Once an executor is created, jobs can be assigned to it using the crm py-jobspec create|update CLI --executor flag.

crm py-jobspec update my_custom_job_name --executor=my_executor_name

Job Configuration

Custom jobs are configured using a py-jobspec object. A py-jobspec object contains the following configuration.

Core Parameters
NameThe unique name of the custom job.
Organization IDThe organization that the custom job belongs to.
ExecutorThe executor that the custom job runs on.
CodeThe Python code that the custom job executes.
Class NamePython class name in uploaded code
Event Triggers
ObjectThe object that the event is associated with.
EventThe event that triggers the custom job.
Filter Args

Additional filter arguments to match the event specified as a MongoDB $match query. This allows refining the event trigger to a subset of events.

Schedule Triggers
Schedule RRULEThe recurrence rule for the schedule.
Schedule StartThe start time of the scheduler.
Schedule TimezoneThe timezone of the scheduler.
Security
Allow Participant

Whether the custom job can be triggered by a participant. This allows writing trusted code and an argument schema, and then having end-users trigger the job.

Allow ProviderWhether the custom job can be triggered by a provider. Similar to "allow participant."
Run AsThe user ID that the custom job runs as. You must have write access to the user to change this setting.
Miscellaneous
Log LimitHow many invocation logs to retain.
Log LevelThe log level of the custom job.
Runs Per MinuteThe maximum number of times the custom job can run per minute.
MonitoredWhether an alert should be generated if the custom job fails.
Allow ContinuationWhether the custom job can be continued after a failure.

Event Hooks

Custom jobs support a wide range of event hooks that can be used to trigger custom jobs. A full list of events can be found here.

For example, if we want to create a todo whenever a new appointment is created, we can subscribe to the appointment:create event:

from brewmaster.objects.caredb import TodoEntry

class MakeATodoJob(carium.Job):
def on_event(self, event):
first_name = event["attributes"]["individual"]["first-name"]
indv_id = event["attributes"]["individual"]["id"]
seeing = event["attributes"]["practitioner"]["display-name"]

TodoEntry.create({
"text": f"Prepare for appointment with {first_name}",
"notes": f"The pt will see {seeing}",
"organization-id": event["organization-id"],
"references": [{"type": "individual", "id": indv_id}],
})

Next, we subscribe our existing job to the event:

crm py-jobspec update my_custom_job_name --object=appointment --event=create

Now, whenever a new appointment is created, the custom job will be executed and our code will create a new todo.

Secret Management

Custom jobs support injecting secrets into custom job at runtime through the use of customjob-secrets. Secrets are stored in the Carium secret store and can be referenced in custom jobs by name. The Carium secret store is an encrypted, secure storage for sensitive information such as API keys, passwords, and other secrets. Using the secret store ensures that sensitive information isn't stored in the custom job code, configuration files, or git repositories.

You can use either the /v1/customjob-secrets API suite or crm py-jobsecret CLI to manage secrets.

# Create the secret value
$ crm py-jobsecret create the_secret_name --json-value '{"secret-key": "secret-value"}'
Secret b515841f-886d-4c13-a8b8-9321f81228b5 created

# Attach the secret to an existing custom job
$ crm py-jobspec add-secret the_job_name injection_key --secret-name the_secret_name

The secret value is now injected into every execution of the custom job under the secret key injection_key. The custom job code can access the secret value using self.secrets['injection_key'].

Storage

Custom jobs can read and write values to a dedicated key-value store. This allows custom jobs to store and retrieve data between executions, enabling stateful processing. You can interact with the Custom job storage system using the /pylon/v1/customjob-storages API suite, the crm py-jobspec *-storage CLI, or the self.storage object in custom job code.

Here is an example where every time the custom job is executed, it increments a counter stored in the key-value store:

class MyJob(carium.Job):
def on_event(self, event):
value = self.storage.get("my-counter", default=0)
self.storage.set("my-counter", value + 1)

Logging and Metrics

Custom jobs can write to standard output and standard error. Both streams are captured and stored in the Carium log store. Each invocation of a custom job is associated with a unique execution ID called the "custom job log." Metrics and logs are stored on the custom job log.

You can filter through job logs for a given custom job with the /pylon/v1/customjob-logs API. Specific logs can be retrieved using crm py-jobspec log,

crm py-jobspec log my_custom_job_name 71e6b7b4-57ac-47a9-a1e5-8e0b17a9faa4
# Id : 71e6b7b4-57ac-47a9-a1e5-8e0b17a9faa4
# Created : 2024-06-24 22:58:02
# Duration: 1.101896
# Status : success
# Input :
# {
# "example-input": "example-value"
# }
# Result : n/a
# Stdout :
# I printed this string to stdout!
#
# Stderr :
#
#
# # Runtime Statistics: {"stdout.size": 55, "stderr.size": 0}

Viewing the logs of a custom job can be useful for debugging and monitoring the execution of custom jobs.

Queues

Custom jobs support execution queues, allowing you to control the rate, order, and parallelism in which custom jobs invocations are executed. Queues are useful for managing large numbers of custom jobs, ensuring that they're executed in a controlled manner. You can interact with the Custom job queue system using the /pylon/v1/customjob-queues API suite or crm py-jobspec *-queue CLI.

For example, suppose that we want to onboard 1,000 patients into the system. However, we want to limit the number of onboarding jobs that are executed at any given time to 1. We can create a queue with a concurrency of 1 and attach it to the onboarding custom job:

crm py-jobspec create-queue my_onboarding_jobspec the_queue_name --worker-count 1

names=("Alice" "Bob" "Charlie" "David" "Eve") # ... 1,000 names
for name in "${names[@]}"; do
crm py-jobspec create-queue-entry the_queue_name '{"name": "$name"}'
done

The onboarding custom job will be executed for each patient serially.

SFTP Integration

Custom jobs can be configured to read, write, or react to file events from the Carium hosted SFTP server.

Event Deduplication (Debouncing)

Custom jobs can be configured to deduplicate executions, ensuring that only one instance of an event instance is processed. This is commonly referred to as debouncing.

By default, custom jobs deduplicate executions based on the hash of the entire event payload over a window of 60 seconds. This means that if an identical event is received within 60 seconds of the previous event, the custom job will not be executed. Instead, the execution will exit early with a status of dropped-dup.