# Custom Jobs
Carium allows advanced customization through custom jobs. Custom jobs are Python scripts that can be executed in response to specific events in the system, on a schedule, or manually. Using custom jobs, you can automate tasks, integrate with external systems, and extend the functionality of Carium.

Code can be a simple single-file script or a complex multi-file project fully integrated with the Carium SDK. Carium runs custom jobs in a secure, isolated environment, ensuring that they don't interfere with the operation of the overall system.
A basic custom job is a Python class inheriting from `carium.Job` that implements the `on_event` method. The `on_event` method is called whenever the custom job is triggered by an event. Here is an example of a simple custom job that prints the event payload:
```python
import carium


class MyJob(carium.Job):
    def on_event(self, event: dict):
        print("I got an event:", event)
```
The event payload will be written to standard output and stored in the Carium log store.
## Executors

Each custom job is executed by an executor. An executor is a runtime environment, owned by an organization, that runs custom jobs. It consists of a runtime version (including a vended set of libraries), a memory limit, and a timeout.

| Configuration Item | Description |
|---|---|
| Name | The name of the executor. Unique within the organization. |
| Runtime Version | The version of Python that the custom job runs on. Can be a specific version, `$STABLE`, or `$LATEST`. |
| Memory Limit | The maximum amount of memory that the custom job can use. |
| Timeout | The maximum amount of time that a custom job can run before being terminated. |
Carium provides a default executor that runs custom jobs using the latest `$STABLE` runtime version with a memory limit of 128MB and a timeout of 600 seconds.
Executors can be created using the `/pylon/v1/customjob-executors` API suite or the `crm py-jobexec` CLI.
For example, to create an executor that runs custom jobs using the latest runtime version with a memory limit of 128MB:
```bash
crm py-jobexec create my_executor_name --runtime-version='$LATEST' --memory-limit=128
```
Once an executor is created, jobs can be assigned to it using the `--executor` flag of the `crm py-jobspec create|update` CLI:
```bash
crm py-jobspec update my_custom_job_name --executor=my_executor_name
```
## Job Configuration

Custom jobs are configured using a `py-jobspec` object. A `py-jobspec` object contains the following configuration.

| Core Parameters | |
|---|---|
| Name | The unique name of the custom job. |
| Organization ID | The organization that the custom job belongs to. |
| Executor | The executor that the custom job runs on. |
| Code | The Python code that the custom job executes. |
| Class Name | The name of the Python class in the uploaded code. |

| Event Triggers | |
|---|---|
| Object | The object that the event is associated with. |
| Event | The event that triggers the custom job. |
| Filter Args | Additional filter arguments to match the event, specified as a MongoDB `$match` query. This allows refining the event trigger to a subset of events (see the sketch below). |

| Schedule Triggers | |
|---|---|
| Schedule RRULE | The recurrence rule for the schedule (see the sketch below). |
| Schedule Start | The start time of the scheduler. |
| Schedule Timezone | The timezone of the scheduler. |

| Security | |
|---|---|
| Allow Participant | Whether the custom job can be triggered by a participant. This allows writing trusted code and an argument schema, and then having end users trigger the job. |
| Allow Provider | Whether the custom job can be triggered by a provider. Similar to "Allow Participant". |
| Run As | The user ID that the custom job runs as. You must have write access to the user to change this setting. |

| Miscellaneous | |
|---|---|
| Log Limit | How many invocation logs to retain. |
| Log Level | The log level of the custom job. |
| Runs Per Minute | The maximum number of times the custom job can run per minute. |
| Monitored | Whether an alert should be generated if the custom job fails. |
| Allow Continuation | Whether the custom job can be continued after a failure. |
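To make the trigger configuration concrete, here is a sketch of what the two less obvious fields might contain. The `attributes.status` field is hypothetical; the actual event shape depends on the object and event you subscribe to.

```python
# Filter Args: a MongoDB-style $match query that narrows the trigger.
# Hypothetical example: only react to appointments marked as confirmed.
filter_args = {"attributes.status": "confirmed"}

# Schedule RRULE: a standard iCalendar recurrence rule, e.g. every weekday.
# The time of day comes from Schedule Start, interpreted in Schedule Timezone.
schedule_rrule = "FREQ=WEEKLY;BYDAY=MO,TU,WE,TH,FR"
```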
## Event Hooks

Custom jobs can be triggered by a wide range of event hooks. A full list of events can be found here.
For example, if we want to create a todo whenever a new appointment is created, we can subscribe to the `appointment:create` event:
```python
import carium
from brewmaster.objects.caredb import TodoEntry


class MakeATodoJob(carium.Job):
    def on_event(self, event):
        first_name = event["attributes"]["individual"]["first-name"]
        indv_id = event["attributes"]["individual"]["id"]
        seeing = event["attributes"]["practitioner"]["display-name"]
        # Create a todo for the patient referenced by the appointment.
        TodoEntry.create({
            "text": f"Prepare for appointment with {first_name}",
            "notes": f"The pt will see {seeing}",
            "organization-id": event["organization-id"],
            "references": [{"type": "individual", "id": indv_id}],
        })
```
Next, we subscribe our existing job to the event:
```bash
crm py-jobspec update my_custom_job_name --object=appointment --event=create
```
Now, whenever a new appointment is created, the custom job will be executed and our code will create a new todo.
## Secret Management

Custom jobs support injecting secrets at runtime through the use of `customjob-secrets`. Secrets are stored in the Carium secret store and can be referenced in custom jobs by name. The Carium secret store is an encrypted, secure storage for sensitive information such as API keys, passwords, and other secrets. Using the secret store ensures that sensitive information isn't stored in the custom job code, configuration files, or git repositories.
You can use either the `/v1/customjob-secrets` API suite or the `crm py-jobsecret` CLI to manage secrets.
```bash
# Create the secret value
$ crm py-jobsecret create the_secret_name --json-value '{"secret-key": "secret-value"}'
Secret b515841f-886d-4c13-a8b8-9321f81228b5 created

# Attach the secret to an existing custom job
$ crm py-jobspec add-secret the_job_name injection_key --secret-name the_secret_name
```
The secret value is now injected into every execution of the custom job under the secret key `injection_key`. The custom job code can access the secret value using `self.secrets['injection_key']`.
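For illustration, a minimal sketch of job code reading the secret injected above (`MySecretJob` is a hypothetical name; the `secret-key` field comes from the JSON value created earlier):

```python
import carium


class MySecretJob(carium.Job):
    def on_event(self, event):
        # The injected value is the JSON object uploaded with `py-jobsecret create`,
        # available under the injection key chosen with `py-jobspec add-secret`.
        secret = self.secrets["injection_key"]
        api_key = secret["secret-key"]
        # Use api_key to authenticate outbound calls; avoid printing secret
        # values, since stdout is captured in the log store.
```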
## Storage

Custom jobs can read and write values to a dedicated key-value store. This allows custom jobs to store and retrieve data between executions, enabling stateful processing. You can interact with the custom job storage system using the `/pylon/v1/customjob-storages` API suite, the `crm py-jobspec *-storage` CLI, or the `self.storage` object in custom job code.
Here is an example where every time the custom job is executed, it increments a counter stored in the key-value store:
```python
import carium


class MyJob(carium.Job):
    def on_event(self, event):
        # Read the current counter (0 on the first run) and increment it.
        value = self.storage.get("my-counter", default=0)
        self.storage.set("my-counter", value + 1)
```
## Logging and Metrics
Custom jobs can write to standard output and standard error. Both streams are captured and stored in the Carium log store. Each invocation of a custom job is associated with a unique execution ID called the "custom job log." Metrics and logs are stored on the custom job log.
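As a quick illustration (a minimal sketch; `NoisyJob` is a hypothetical name), anything a job writes to either stream appears in the corresponding section of its custom job log:

```python
import sys

import carium


class NoisyJob(carium.Job):
    def on_event(self, event):
        # Captured from stdout; shown under "Stdout" in the custom job log.
        print("processing event")
        # Captured from stderr; shown under "Stderr" in the custom job log.
        print("something looks off", file=sys.stderr)
```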
You can filter through job logs for a given custom job with the `/pylon/v1/customjob-logs` API. Specific logs can be retrieved using `crm py-jobspec log`:
```bash
crm py-jobspec log my_custom_job_name 71e6b7b4-57ac-47a9-a1e5-8e0b17a9faa4
# Id      : 71e6b7b4-57ac-47a9-a1e5-8e0b17a9faa4
# Created : 2024-06-24 22:58:02
# Duration: 1.101896
# Status  : success
# Input   :
# {
#   "example-input": "example-value"
# }
# Result  : n/a
# Stdout  :
# I printed this string to stdout!
#
# Stderr  :
#
#
# # Runtime Statistics: {"stdout.size": 55, "stderr.size": 0}
```
Viewing the logs of a custom job can be useful for debugging and monitoring the execution of custom jobs.
## Queues

Custom jobs support execution queues, allowing you to control the rate, order, and parallelism in which custom job invocations are executed. Queues are useful for managing large numbers of custom jobs, ensuring that they're executed in a controlled manner. You can interact with the custom job queue system using the `/pylon/v1/customjob-queues` API suite or the `crm py-jobspec *-queue` CLI.
For example, suppose that we want to onboard 1,000 patients into the system. However, we want to limit the number of onboarding jobs that are executed at any given time to 1. We can create a queue with a concurrency of 1 and attach it to the onboarding custom job:
```bash
crm py-jobspec create-queue my_onboarding_jobspec the_queue_name --worker-count 1

names=("Alice" "Bob" "Charlie" "David" "Eve")  # ... 1,000 names
for name in "${names[@]}"; do
  # Double quotes so $name expands inside the JSON payload.
  crm py-jobspec create-queue-entry the_queue_name "{\"name\": \"$name\"}"
done
```
The onboarding custom job will be executed for each patient serially.
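A minimal sketch of what the onboarding job's entry point might look like, assuming each queue-entry JSON payload is delivered to the job as its event argument (`OnboardPatientJob` is a hypothetical name):

```python
import carium


class OnboardPatientJob(carium.Job):
    def on_event(self, event):
        # Each queue entry created above arrives as {"name": "..."}.
        name = event["name"]
        print(f"Onboarding patient: {name}")
```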
## SFTP Integration
Custom jobs can be configured to read, write, or react to file events from the Carium hosted SFTP server.
## Event Deduplication (Debouncing)
Custom jobs can be configured to deduplicate executions, ensuring that only one instance of a given event is processed. This is commonly referred to as debouncing.
By default, custom jobs deduplicate executions based on the hash of the entire event payload over a window of 60 seconds. This means that if an identical event is received within 60 seconds of the previous event, the custom job will not be executed. Instead, the execution will exit early with a status of `dropped-dup`.
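Conceptually, the check behaves like the following sketch (not Carium's actual implementation): hash the payload, and drop the execution if the same hash was seen within the window.

```python
import hashlib
import json
import time

_seen: dict[str, float] = {}  # payload hash -> last execution time


def is_duplicate(event: dict, window: float = 60.0) -> bool:
    """Return True if an identical payload already ran within the window."""
    key = hashlib.sha256(json.dumps(event, sort_keys=True).encode()).hexdigest()
    now = time.time()
    if key in _seen and now - _seen[key] < window:
        return True  # would exit early with status dropped-dup
    _seen[key] = now
    return False
```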