Versioning - Python SDK
The definition code of a Temporal Workflow must be deterministic because Temporal uses event sourcing to reconstruct the Workflow state by replaying the saved Event History against the Workflow Definition code. This means that any incompatible update to the Workflow Definition code could cause a non-determinism error if not handled correctly.
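To make the replay constraint concrete, here is a minimal toy model. It is not Temporal's implementation: `replay`, `NonDeterminismError`, and the command tuples are hypothetical names invented for this sketch. It re-runs a workflow function and compares each command it issues, in order, against a recorded history.

```python
from typing import Callable, List, Tuple

Command = Tuple[str, str]  # (kind, name), e.g. ("activity", "pre_patch_activity")


class NonDeterminismError(Exception):
    """Raised when replayed code diverges from the recorded history (toy model)."""


def replay(workflow_fn: Callable[[Callable[[str], None]], None],
           history: List[Command]) -> None:
    """Re-run workflow_fn and check each command against the recorded history."""
    position = 0

    def schedule_activity(name: str) -> None:
        nonlocal position
        command = ("activity", name)
        if position < len(history) and history[position] != command:
            raise NonDeterminismError(
                f"history has {history[position]}, code produced {command}"
            )
        position += 1

    workflow_fn(schedule_activity)


def old_definition(schedule_activity: Callable[[str], None]) -> None:
    schedule_activity("pre_patch_activity")


def new_definition(schedule_activity: Callable[[str], None]) -> None:
    schedule_activity("post_patch_activity")


# History recorded by a Workflow that started on the old definition.
recorded = [("activity", "pre_patch_activity")]

replay(old_definition, recorded)  # same commands, so replay succeeds

try:
    replay(new_definition, recorded)
    diverged = False
except NonDeterminismError:
    diverged = True  # the incompatible change is detected during replay
```

In this sketch the new definition diverges at the first command, which is the situation the rest of this page shows how to avoid.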
Introduction to Versioning
Because we design for potentially long running Workflows at scale, versioning with Temporal works differently. We explain more in this optional 30 minute introduction: https://www.youtube.com/watch?v=kkP899WxgzY
How to use the Python SDK Patching API
In principle, the Python SDK's patching mechanism operates similarly to other SDKs in a "feature-flag" fashion. However, the "versioning" API now uses the concept of "patching in" code.
To understand this, you can break it down into three steps, which reflect three stages of migration:
- Running pre_patch_activity code while concurrently patching in post_patch_activity.
- Running post_patch_activity code with deprecation markers for my-patch patches.
- Running only the post_patch_activity code.
Let's walk through this process in sequence.
Suppose you have an initial Workflow version called pre_patch_activity:
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from activities import pre_patch_activity

# ...
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> None:
        self._result = await workflow.execute_activity(
            pre_patch_activity,
            schedule_to_close_timeout=timedelta(minutes=5),
        )
Now, you want to update your code to run post_patch_activity instead. This represents your desired end state.
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from activities import post_patch_activity

# ...
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> None:
        self._result = await workflow.execute_activity(
            post_patch_activity,
            schedule_to_close_timeout=timedelta(minutes=5),
        )
Problem: You cannot deploy post_patch_activity directly until you're certain there are no more running Workflows created using the pre_patch_activity code; otherwise, you are likely to cause a non-determinism error.
Instead, you'll need to deploy post_patch_activity and use the patched function to determine which version of the code to execute.
Implementing patching involves three steps:
- Use patched to patch in new code and run it alongside the old code.
- Remove the old code and apply deprecate_patch.
- Once you're confident that all old Workflows have finished executing, remove deprecate_patch.
# ...
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> None:
        self._result = await workflow.execute_activity(
            post_patch_activity,
            schedule_to_close_timeout=timedelta(minutes=5),
        )
Patching in new code
Using patched inserts a marker into the Workflow History.
During replay, if a Worker encounters a history with that marker, it will fail the Workflow Task when the Workflow code doesn't produce the same patch marker (in this case, my-patch). This ensures you can safely deploy code from post_patch_activity as a "feature flag" alongside the original version (pre_patch_activity).
# ...
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> None:
        if workflow.patched("my-patch"):
            # Workflows that reach this point on the new code take this branch.
            self._result = await workflow.execute_activity(
                post_patch_activity,
                schedule_to_close_timeout=timedelta(minutes=5),
            )
        else:
            # Workflows whose history predates the patch keep the old path.
            self._result = await workflow.execute_activity(
                pre_patch_activity,
                schedule_to_close_timeout=timedelta(minutes=5),
            )
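The branching above can be pictured with a small toy model. This is a sketch under a stated assumption, not the temporalio implementation: it assumes patched returns True on first execution (recording the marker) and, during replay, returns True only when the marker is already in the history. `ToyWorkflowContext` and `choose_activity` are hypothetical names.

```python
from typing import List, Set


class ToyWorkflowContext:
    """Toy stand-in for a Workflow execution; not the temporalio API."""

    def __init__(self, history_markers: Set[str], replaying: bool) -> None:
        self.history_markers = set(history_markers)
        self.replaying = replaying
        self.emitted: List[str] = []

    def patched(self, patch_id: str) -> bool:
        if self.replaying and patch_id not in self.history_markers:
            # History predates the patch: stay on the old code path.
            return False
        # First execution, or the history already has the marker:
        # record the marker and take the new code path.
        self.emitted.append(patch_id)
        return True


def choose_activity(ctx: ToyWorkflowContext) -> str:
    if ctx.patched("my-patch"):
        return "post_patch_activity"
    return "pre_patch_activity"


new_run = ToyWorkflowContext(history_markers=set(), replaying=False)
old_replay = ToyWorkflowContext(history_markers=set(), replaying=True)
patched_replay = ToyWorkflowContext(history_markers={"my-patch"}, replaying=True)

new_choice = choose_activity(new_run)
old_choice = choose_activity(old_replay)
patched_choice = choose_activity(patched_replay)
```

Under that assumption, a Workflow started before the patch keeps replaying the old branch, while new executions and already-patched histories take the new one.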
Understanding deprecated Patches in the Python SDK
After ensuring that all Workflows started with pre_patch_activity code have finished, you can deprecate the patch.
Once you're confident that your Workflows are no longer running the pre-patch code paths, you can deploy your code with deprecate_patch().
These Workers will be running the most up-to-date version of the Workflow code, which no longer requires the patch.
The deprecate_patch() function works similarly to the patched() function by recording a marker in the Workflow history.
Unlike the marker recorded by patched(), this marker does not fail replay when Workflow code does not emit it.
Deprecated patches serve as a bridge between the pre-patch code paths and the post-patch code paths, and are useful for avoiding errors resulting from patched code paths in your Workflow history.
# ...
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> None:
        workflow.deprecate_patch("my-patch")
        self._result = await workflow.execute_activity(
            post_patch_activity,
            schedule_to_close_timeout=timedelta(minutes=5),
        )
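The difference between the two kinds of marker can be summarized in a few lines. The sketch below encodes only the two marker rules stated on this page, for a single patch ID. `Marker` and `replay_accepts` are hypothetical names, and the check ignores everything else that replay verifies (for example, that the Activity commands still match).

```python
from enum import Enum


class Marker(Enum):
    PATCHED = "patched"        # recorded by patched()
    DEPRECATED = "deprecated"  # recorded by deprecate_patch()


def replay_accepts(history_marker: Marker, code_emits_marker: bool) -> bool:
    """Toy check: does the marker in history tolerate what the code does?"""
    if history_marker is Marker.PATCHED:
        # A patched() marker in history must be reproduced by the code.
        return code_emits_marker
    # A deprecate_patch() marker may be missing from the code.
    return True


# Code with no patch calls replaying each kind of history.
final_code_on_patched_history = replay_accepts(Marker.PATCHED, False)
final_code_on_deprecated_history = replay_accepts(Marker.DEPRECATED, False)
```

This is why the final, marker-free code is only safe once the remaining histories carry deprecated markers or are no longer replayed at all.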
Safe Deployment of post_patch_activity
Once you're sure that you will no longer need to Query or Replay any of your pre-patch Workflows, you can then safely deploy Workers that no longer use either the patched() or deprecate_patch() calls:
# ...
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> None:
        self._result = await workflow.execute_activity(
            post_patch_activity,
            schedule_to_close_timeout=timedelta(minutes=5),
        )
How to use Worker Versioning in Python
Worker Versioning is currently in Pre-release.
See the Pre-release README for more information.
A Build ID corresponds to a deployment. If you don't already have one, we recommend a hash of the code--such as a Git SHA--combined with a human-readable timestamp. To use Worker Versioning, you need to pass a Build ID to your Python Worker and opt in to Worker Versioning.
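One possible way to build such an identifier is sketched below. The helper name `make_build_id` and the timestamp-then-SHA layout are assumptions chosen for illustration; any format that uniquely identifies a deployment would do.

```python
from datetime import datetime, timezone


def make_build_id(git_sha: str, built_at: datetime) -> str:
    """Combine a UTC timestamp with the first 7 characters of a Git SHA."""
    # Pass a timezone-aware datetime; a naive one is treated as local time.
    stamp = built_at.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return f"{stamp}-{git_sha[:7]}"


example_build_id = make_build_id(
    "1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d7e8f9a0b",  # made-up SHA for the example
    datetime(2024, 5, 1, 12, 0, 0, tzinfo=timezone.utc),
)
```

Putting the timestamp first keeps Build IDs sortable by build time, while the SHA suffix ties each one back to a specific commit.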
Assign a Build ID to your Worker and opt in to Worker Versioning
You should understand assignment rules before completing this step. See the Worker Versioning Pre-release README for more information.
To enable Worker Versioning for your Worker, assign the Build ID--perhaps from an environment variable--and turn it on.
# ...
worker = Worker(
    task_queue="your_task_queue_name",
    build_id=build_id,
    use_worker_versioning=True,
    # ... register workflows & activities, etc
)
# ...
Importantly, when you start this Worker, it won't receive any tasks until you set up assignment rules.
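If the Build ID comes from an environment variable, it can help to fail fast when the variable is missing rather than start a Worker with an empty Build ID. A minimal sketch follows; the helper `build_id_from_env` and the variable name `WORKER_BUILD_ID` are assumptions, not part of the SDK.

```python
import os
from typing import Mapping


def build_id_from_env(env: Mapping[str, str] = os.environ,
                      variable: str = "WORKER_BUILD_ID") -> str:
    """Return the Build ID from the environment, failing fast if it is unset."""
    build_id = env.get(variable, "").strip()
    if not build_id:
        raise RuntimeError(f"{variable} must be set before starting the Worker")
    return build_id


# A plain dict stands in for the real environment in this example.
build_id = build_id_from_env({"WORKER_BUILD_ID": "20240501T120000Z-1a2b3c4"})
```

The resulting string is what you would pass as the Worker's build_id argument.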
Specify versions for Activities, Child Workflows, and Continue-as-New Workflows
Python support for this feature is under construction!
By default, Activities, Child Workflows, and Continue-as-New Workflows are run on the build of the workflow that created them if they are also configured to run on the same Task Queue. When configured to run on a separate Task Queue, they will default to using the current assignment rules.
If you want to override this behavior, you can specify your intent via the versioning_intent argument available on the methods you use to invoke these commands.
For example, if you want an Activity to use the latest assignment rules rather than inheriting from its parent:
# ...
await workflow.execute_activity(
    say_hello,
    "hi",
    versioning_intent=VersioningIntent.USE_ASSIGNMENT_RULES,
    start_to_close_timeout=timedelta(seconds=5),
)
# ...
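The default routing described in this section can be written out as a small decision function. This is a toy model of the prose only: `ToyIntent` and `resolve_routing` are hypothetical and are not the SDK's VersioningIntent enum or any server logic.

```python
from enum import Enum


class ToyIntent(Enum):
    UNSPECIFIED = "unspecified"                    # no versioning_intent given
    USE_ASSIGNMENT_RULES = "use_assignment_rules"  # explicit override


def resolve_routing(intent: ToyIntent, same_task_queue: bool) -> str:
    """Return which build the new Activity or Child Workflow is routed to."""
    if intent is ToyIntent.USE_ASSIGNMENT_RULES:
        return "assignment_rules"
    # Default: inherit the parent's build only on the same Task Queue.
    return "parent_build" if same_task_queue else "assignment_rules"


default_same_queue = resolve_routing(ToyIntent.UNSPECIFIED, same_task_queue=True)
default_other_queue = resolve_routing(ToyIntent.UNSPECIFIED, same_task_queue=False)
override_same_queue = resolve_routing(ToyIntent.USE_ASSIGNMENT_RULES, same_task_queue=True)
```

In this model the override only changes the outcome on the same Task Queue, because a separate Task Queue already defaults to the assignment rules.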
Tell the Task Queue about your Worker's Build ID (Deprecated)
This section is for a previous Worker Versioning API that is deprecated and will go away at some point. Please redirect your attention to Worker Versioning.
Now you can use the SDK (or the Temporal CLI) to tell the Task Queue about your Worker's Build ID. You might want to do this as part of your CI deployment process.
# ...
await client.update_worker_build_id_compatibility(
    "your_task_queue_name", BuildIdOpAddNewDefault("deadbeef")
)
This code adds the deadbeef Build ID to the Task Queue as the sole version in a new version set, which becomes the default for the queue.
New Workflows execute on Workers with this Build ID, and existing ones will continue to be processed by appropriately compatible Workers.
If, instead, you want to add the Build ID to an existing compatible set, you can do this:
# ...
await client.update_worker_build_id_compatibility(
    "your_task_queue_name",
    BuildIdOpAddNewCompatible("deadbeef", "some-existing-build-id"),
)
This code adds deadbeef to the existing compatible set containing some-existing-build-id and marks it as the new default Build ID for that set.
You can also promote an existing Build ID in a set to be the default for that set:
# ...
await client.update_worker_build_id_compatibility(
    "your_task_queue_name", BuildIdOpPromoteBuildIdWithinSet("deadbeef")
)
You can also promote an entire set to become the default set for the queue. New Workflows will start using that set's default build.
# ...
await client.update_worker_build_id_compatibility(
    "your_task_queue_name", BuildIdOpPromoteSetByBuildId("deadbeef")
)
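The four operations above can be pictured with a toy in-memory model of version sets. This sketch is not the server's data structure: `ToyVersionSets` is a hypothetical class, and the convention that the last set is the queue default and the last Build ID in a set is that set's default is an assumption made for the example.

```python
from typing import List


class ToyVersionSets:
    """Toy model of a Task Queue's compatible version sets."""

    def __init__(self) -> None:
        # Last set is the queue default; last ID in a set is the set default.
        self.sets: List[List[str]] = []

    def _index_of(self, build_id: str) -> int:
        for index, version_set in enumerate(self.sets):
            if build_id in version_set:
                return index
        raise KeyError(build_id)

    def add_new_default(self, build_id: str) -> None:
        # New set containing only this Build ID becomes the queue default.
        self.sets.append([build_id])

    def add_new_compatible(self, build_id: str, existing_build_id: str) -> None:
        # Join the existing set and become that set's default.
        self.sets[self._index_of(existing_build_id)].append(build_id)

    def promote_build_id_within_set(self, build_id: str) -> None:
        version_set = self.sets[self._index_of(build_id)]
        version_set.remove(build_id)
        version_set.append(build_id)

    def promote_set_by_build_id(self, build_id: str) -> None:
        self.sets.append(self.sets.pop(self._index_of(build_id)))

    def default_build_id(self) -> str:
        return self.sets[-1][-1]


queue = ToyVersionSets()
queue.add_new_default("1.0")
queue.add_new_compatible("1.1", "1.0")
queue.add_new_default("2.0")
queue.promote_build_id_within_set("1.0")
queue.promote_set_by_build_id("1.1")
```

After these calls the set containing 1.1 and 1.0 is the queue default again, and 1.0 is the default build within it.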