Workflow message passing - TypeScript SDK
A Workflow can act like a stateful web service that receives messages: Queries, Signals, and Updates. The Workflow implementation defines these endpoints via handler methods that can react to incoming messages and return values. Temporal Clients use messages to read Workflow state and control its execution. See Workflow message passing for a general overview of this topic. This page introduces these features for the Temporal Typescript SDK.
Write message handlersβ
The code that follows is part of a working message-passing sample.
Follow these guidelines when writing your message handlers:
- Define a message type as a global variable using
defineQuery
,defineSignal
, ordefineUpdate
. This is what your client code will use to send a message to the workflow. - Message handlers are defined by calling
workflow.setHandler
in your Workflow function. - The parameters and return values of handlers and the main Workflow function must be serializable.
- Prefer using a single object over multiple input parameters. A single object allows you to add fields without changing the signature.
Query handlersβ
A Query is a synchronous operation that retrieves state from a Workflow Execution:
export enum Language {
ARABIC = 'ARABIC',
CHINESE = 'CHINESE',
ENGLISH = 'ENGLISH',
FRENCH = 'FRENCH',
HINDI = 'HINDI',
PORTUGUESE = 'PORTUGUESE',
SPANISH = 'SPANISH',
}
interface GetLanguagesInput {
includeUnsupported: boolean;
}
// π Use the object returned by defineQuery to set the query handler in
// Workflow code, and when sending the Query in Client code.
export const getLanguages = wf.defineQuery<Language[], [GetLanguagesInput]>('getLanguages');
export async function greetingWorkflow(): Promise<string> {
const greetings: Partial<Record<Language, string>> = {
[Language.CHINESE]: 'δ½ ε₯½οΌδΈη',
[Language.ENGLISH]: 'Hello, world',
};
wf.setHandler(getLanguages, (input: GetLanguagesInput): Language[] => {
// π A Query handler returns a value: it must not mutate the Workflow state
// and can't perform async operations.
if (input.includeUnsupported) {
return Object.values(Language);
} else {
return Object.keys(greetings) as Language[];
}
});
...
}
-
A Query handler cannot be
async
. You can't perform async operations like executing an Activity in a Query handler. -
setHandler
can takeQueryHandlerOptions
(such asdescription
) as described in the API reference docs forworkflow.setHandler
.
Signal handlersβ
A Signal is an asynchronous message sent to a running Workflow Execution to change its state and control its flow:
// π Use the object returned by defineSignal to set the Signal handler in
// Workflow code, and to send the Signal from Client code.
export const approve = wf.defineSignal<[ApproveInput]>('approve');
export async function greetingWorkflow(): Promise<string> {
let approvedForRelease = false;
let approverName: string | undefined;
wf.setHandler(approve, (input) => {
// π A Signal handler mutates the Workflow state but cannot return a value.
approvedForRelease = true;
approverName = input.name;
});
...
}
...
-
The handler cannot return a value. The response is sent immediately from the server, without waiting for the Workflow to process the Signal.
-
Signal (and Update) handlers can be
async
. This allows you to use Activities, Child Workflows, durableworkflow.sleep
Timers,workflow.condition
conditions, and more. See Async handlers and Workflow message passing for guidelines on safely using async Signal and Update handlers. -
setHandler
can takeSignalHandlerOptions
(such asdescription
andunfinishedPolicy
) as described in the API reference docs forworkflow.setHandler
.
Update handlers and validatorsβ
An Update is a trackable synchronous request sent to a running Workflow Execution. It can change the Workflow state, control its flow, and return a result. The sender must wait until the Worker accepts or rejects the Update. The sender may wait further to receive a returned value or an exception if something goes wrong:
// π Use the object returned by defineUpdate to set the Update handler in
// Workflow code, and to send Updates from Client code.
export const setLanguage = wf.defineUpdate<Language, [Language]>('setLanguage');
export async function greetingWorkflow(): Promise<string> {
const greetings: Partial<Record<Language, string>> = {
[Language.CHINESE]: 'δ½ ε₯½οΌδΈη',
[Language.ENGLISH]: 'Hello, world',
};
let language = Language.ENGLISH;
wf.setHandler(
setLanguage,
(newLanguage: Language) => {
// π An Update handler can mutate the Workflow state and return a value.
const previousLanguage = language;
language = newLanguage;
return previousLanguage;
},
{
validator: (newLanguage: Language) => {
// π Update validators are optional
if (!(newLanguage in greetings)) {
throw new Error(`${newLanguage} is not supported`);
}
},
}
);
...
}
-
setHandler
can takeUpdateHandlerOptions
(such asvalidator
,description
andunfinishedPolicy
) as described in the API reference docs forworkflow.setHandler
. -
About validators:
- Use validators to reject an Update before it is written to History. Validators are always optional. If you don't need to reject Updates, you don't need a validator.
- To set a validator, pass the validator function in
UpdateHandlerOptions
when callingworkflow.setHandler
. The validator must be a non-async function that accepts the same argument types as the handler and returnsvoid
.
-
Accepting and rejecting Updates with validators:
- To reject an Update, throw an error of any type in the validator.
- Without a validator, Updates are always accepted.
-
Validators and Event History:
- The
WorkflowExecutionUpdateAccepted
event is written into History whether the acceptance was automatic or due to a validator function not throwing an error. - When a Validator throws an error, the Update is rejected and
WorkflowExecutionUpdateAccepted
won't be added to the Event History. The caller receives an "Update failed" error.
- The
-
Use
workflow.currentUpdateInfo
to obtain information about the current Update. This includes the Update ID, which can be useful for deduplication when using Continue-As-New: see Ensuring your messages are processed exactly once. -
Update (and Signal) handlers can be
async
, letting them use Activities, Child Workflows, durableworkflow.sleep
Timers,workflow.condition
conditions, and more. See Async handlers and Workflow message passing for safe usage guidelines.
Send messagesβ
To send Queries, Signals, or Updates, you call methods on a WorkflowHandle object:
-
Use
client.workflow.start
and return its handle. -
Use
client.workflow.getHandle
to retrieve a Workflow handle by its Workflow Id.
For example:
const handle = await client.workflow.start(greetingWorkflow, {
taskQueue: 'my-task-queue',
args: [myArg],
workflowId: 'my-workflow-id',
});
To check the argument types required when sending messages -- and the return type for Queries and Updates -- refer to the corresponding handler method in the Workflow Definition.
Send a Queryβ
Use WorkflowHandle.query
to send a Query to a Workflow Execution:
const supportedLanguages = await handle.query(getLanguages, { includeUnsupported: false });
-
Sending a Query doesnβt add events to a Workflow's Event History.
-
You can send Queries to closed Workflow Executions within a Namespace's Workflow retention period. This includes Workflows that have completed, failed, or timed out. Querying terminated Workflows is not safe and, therefore, not supported.
-
A Worker must be online and polling the Task Queue to process a Query.
Send a Signalβ
You can send a Signal to a Workflow Execution from a Temporal Client or from another Workflow Execution. However, you can only send Signals to Workflow Executions that havenβt closed.
Send a Signal from a Clientβ
Use WorkflowHandle.signal to send a Signal:
await handle.signal(greetingWorkflow.approve, { name: 'me' });
-
The call returns when the server accepts the Signal; it does not wait for the Signal to be delivered to the Workflow Execution.
-
The WorkflowExecutionSignaled Event appears in the Workflow's Event History.
Send a Signal from a Workflowβ
A Workflow can send a Signal to another Workflow, in which case it's called an External Signal.
Use getExternalWorkflowHandle
:
import { getExternalWorkflowHandle } from '@temporalio/workflow';
import { joinSignal } from './other-workflow';
export async function yourWorkflowThatSignals() {
const handle = getExternalWorkflowHandle('workflow-id-123');
await handle.signal(joinSignal, { userId: 'user-1', groupId: 'group-1' });
}
When an External Signal is sent:
- A SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History.
- A WorkflowExecutionSignaled Event appears in the recipient's Event History.
The getExternalWorkflowHandle
method helps ensure that Workflows remain deterministic.
Recall that one aspect of deterministic Workflows means not directly making network calls from the Workflow.
This means that developers cannot use a Temporal Client directly within the Workflow code to send Signals or start other Workflows.
Instead, to communicate between Workflows, we use getExternalWorkflowHandle
to both ensure that Workflows remain deterministic and also that these interactions are recorded as Events in the Workflow's Event History.
Signal-With-Startβ
Signal-With-Start allows a Client to send a Signal to a Workflow Execution, starting the Execution if it is not already running.
Use Client.workflow.signalWithStart
:
import { Client } from '@temporalio/client';
import { joinSignal, yourWorkflow } from './workflows';
const client = new Client();
await client.workflow.signalWithStart(yourWorkflow, {
workflowId: 'workflow-id-123',
taskQueue: 'my-taskqueue',
args: [{ foo: 1 }],
signal: joinSignal,
signalArgs: [{ userId: 'user-1', groupId: 'group-1' }],
});
Signal-With-Start is limited to Client use. It cannot be called from a Workflow.
Send an Updateβ
An Update is a synchronous, blocking call that can change Workflow state, control its flow, and return a result.
A client sending an Update must wait until the Server delivers the Update to a Worker. Workers must be available and responsive. If you need a response as soon as the Server receives the request, use a Signal instead. Also note that you can't send Updates to other Workflow Executions or perform an Update equivalent of Signal-With-Start.
WorkflowExecutionUpdateAccepted
is added to the Event History when the Worker confirms that the Update passed validation.WorkflowExecutionUpdateCompleted
is added to the Event History when the Worker confirms that the Update has finished.
To send an Update to a Workflow Execution, you can:
-
Call
WorkflowHandle.executeUpdate
and wait for the Update to complete. This code fetches an Update result:let previousLanguage = await handle.executeUpdate(setLanguage, {
args: [Language.CHINESE],
}); -
Send
WorkflowHandle.startUpdate
to receive anWorkflowUpdateHandle
as soon as the Update is accepted or rejected.- Use this
UpdateHandle
later to fetch your results. async
Update handlers normally perform long-running asynchronous operations, such as calling an Activity.startUpdate
only waits until the Worker has accepted or rejected the Update, not until all asynchronous operations are complete.
For example:
const updateHandle = await handle.startUpdate(setLanguage, {
args: [Language.ENGLISH],
waitForStage: WorkflowUpdateStage.ACCEPTED,
});
previousLanguage = await updateHandle.result();For more details, see the "Async handlers" section.
- Use this
To obtain an Update handle, you can:
- Use
WorkflowHandle.startUpdate
to start an Update and return the handle, as shown in the preceding example. - Use
getUpdateHandle
to fetch a handle for an in-progress Update using the Update ID.
In real-world development, sometimes you may be unable to import message type objects defined by defineQuery
, defineSignal
, or defineUpdate
.
When you don't have access to the Workflow Definition or it isn't written in Typescript, you can still use APIs that aren't type-safe, and dynamic method invocation.
Pass message type names instead of message type objects to:
client.workflow.start
WorkflowHandle.query
- WorkflowHandle.signal
WorkflowHandle.executeUpdate
WorkflowHandle.startUpdate
Pass Workflow IDs to these APIs to get Workflow handles:
Message handler patternsβ
This section covers common write operations, such as Signal and Update handlers. It doesn't apply to pure read operations, like Queries or Update Validators.
For additional information, see Inject work into the main Workflow, Ensuring your messages are processed exactly once, and this sample demonstrating safe async
message handling.
Use async handlersβ
Signal and Update handlers can be async
functions.
Using async
allows you to use await
with Activities, Child Workflows, durable workflow.sleep
Timers, workflow.condition
conditions, etc.
This expands the possibilities for what can be done by a handler but it also means that handler executions and your main Workflow method are all running concurrently, with switching occurring between them at await
calls.
It's essential to understand the things that could go wrong in order to use async
handlers safely.
See Workflow message passing for guidance on safe usage of async Signal and Update handlers, the Safe message handlers sample, and the Controlling handler concurrency and Waiting for message handlers to finish sections below.
The following code executes an Activity that makes a network call to a remote service.
It modifies the Update handler from earlier on this page, turning it into an async
function:
// π Use the objects returned by defineUpdate to set the Update handler in
// Workflow code, and to send Updates from Client code.
export const setLanguageUsingActivity = wf.defineUpdate<Language, [Language]>('setLanguageUsingActivity');
export async function greetingWorkflow(): Promise<string> {
const greetings: Partial<Record<Language, string>> = {
[Language.CHINESE]: 'δ½ ε₯½οΌδΈη',
[Language.ENGLISH]: 'Hello, world',
};
let language = Language.ENGLISH;
const lock = new Mutex();
wf.setHandler(setLanguageUsingActivity, async (newLanguage) => {
// π An Update handler can mutate the Workflow state and return a value.
// π Since this update handler is async, it can execute an activity.
if (!(newLanguage in greetings)) {
// π Do the following with the lock held to ensure that multiple calls to set_language are processed in order.
await lock.runExclusive(async () => {
if (!(newLanguage in greetings)) {
const greeting = await callGreetingService(newLanguage);
if (!greeting) {
// π An update validator cannot be async, so cannot be used to check that the remote
// call_greeting_service supports the requested language. Raising ApplicationError
// will fail the Update, but the WorkflowExecutionUpdateAccepted event will still be
// added to history.
throw new wf.ApplicationFailure(`${newLanguage} is not supported by the greeting service`);
}
greetings[newLanguage] = greeting;
}
});
}
const previousLanguage = language;
language = newLanguage;
return previousLanguage;
});
...
}
After updating the code to use async
, your Update handler can schedule an Activity and await the result.
Although an async
Signal handler can also execute an Activity, using an Update handler allows the client to receive a result or error once the Activity completes.
This lets your client track the progress of asynchronous work performed by the Update's Activities, Child Workflows, etc.
Add wait conditions to blockβ
Sometimes, async
Signal or Update handlers need to meet certain conditions before they should continue.
You can use workflow.condition
to prevent the code from proceeding until a condition is true.
You specify the condition by passing a function that returns true
or false
.
This is an important feature that helps you control your handler logic.
Here are three important use cases for workflow.condition
:
- Waiting for a Signal or Update to arrive
- Waiting in a handler until it is appropriate to continue.
- Waiting in the main Workflow until all active handlers have finished.
Wait for a Signal or Update to arriveβ
It's common to use workflow.condition
to wait for a particular Signal or Update to be sent by a Client:
export async function greetingWorkflow(): Promise<string> {
let approvedForRelease = false;
let approverName: string | undefined;
wf.setHandler(approve, (input) => {
approvedForRelease = true;
approverName = input.name;
});
...
await wf.condition(() => approvedForRelease);
...
}
Use wait conditions in handlersβ
It's common to use a Workflow wait condition in a handler.
For example, suppose your Workflow has a mutable variable readyForUpdateToExecute
that indicates whether your Update handler should be allowed to start executing.
You can use workflow.condition
in the handler to make the handler pause until the condition is met:
let readyForUpdateToExecute = false;
wf.setHandler(myUpdate, async (input: MyUpdateInput): Promise<MyUpdateOutput> => {
await wf.condition(() => readyForUpdateToExecute);
...
});
Remember: handlers can execute before the main Workflow method starts.
You can also use wait conditions anywhere else in the handler to wait for a specific condition to become true. This allows you to write handlers that pause at multiple points, each time waiting for a required condition to become true.
Ensure your handlers finish before the Workflow completesβ
Workflow wait conditions can ensure your handler completes before a Workflow finishes.
When your Workflow uses async
Signal or Update handlers, your main Workflow method can return or Continue-as-New while a handler is still waiting on an async task, such as an Activity result.
The Workflow completing may interrupt the handler before it finishes crucial work and cause client errors when trying retrieve Update results.
Use workflow.condition
and allHandlersFinished
to address this problem and allow your Workflow to end smoothly:
export async function myWorkflow(): Promise<MyWorkflowOutput> {
await wf.condition(wf.allHandlersFinished);
return workflowOutput;
}
By default, your Worker will log a warning when you allow a Workflow Execution to finish with unfinished handler executions.
You can silence these warnings on a per-handler basis by setting the unfinishedPolicy
in SignalHandlerOptions
or UpdateHandlerOptions
when calling workflow.setHandler
See Finishing handlers before the Workflow completes for more information.
Use a lock to prevent concurrent handler executionβ
Concurrent processes can interact in unpredictable ways. Incorrectly written concurrent message-passing code may not work correctly when multiple handler instances run simultaneously. Here's an example of a pathological case:
export async function myWorkflow(): Promise<MyWorkflowOutput> {
let x = 0;
let y = 0;
wf.setHandler(mySignal, async () => {
const data = await myActivity();
x = data.x;
// ππ Bug!! If multiple instances of this handler are executing
// concurrently, then there may be times when the Workflow has x from one
// Activity execution and y from another.
await wf.sleep(500); // or await anything else
y = data.y;
});
...
}
Coordinating access using a lock (also known as a mutex) corrects this code. Locking makes sure that only one handler instance can execute a specific section of code at any given time:
import { Mutex } from 'async-mutex'; // https://github.com/DirtyHairy/async-mutex
...
export async function myWorkflow(): Promise<MyWorkflowOutput> {
let x = 0;
let y = 0;
const lock = new Mutex();
wf.setHandler(mySignal, async () => {
await lock.runExclusive(async () => {
const data = await myActivity();
x = data.x;
// β
OK: node's event loop may switch now to a different handler
// execution, or to the main workflow function, but no other execution of
// this handler can run until this execution finishes.
await wf.sleep(500); // or await anything else
y = data.y;
});
});
return {
name: 'hello',
};
}
Message handler troubleshootingβ
When sending a Signal, Update, or Query to a Workflow, your Client might encounter the following errors:
-
The client can't contact the server: You'll receive a
client.ServiceError
on which thecause.code
attribute is gRPC status code 14UNAVAILABLE
(after some retries). -
The workflow does not exist: You'll receive an
common.WorkflowNotFoundError
error.
Problems when sending a Signalβ
When using Signal, the two errors described above are the only errors that will result from your requests.
For Queries and Updates, the client waits for a response from the Worker and therefore additional errors may occur during the handler Execution by the Worker.
Problems when sending an Updateβ
When working with Updates, you may encounter these problems:
-
No Workflow Workers are polling the Task Queue: Your request will be retried by the SDK Client indefinitely.
-
Update failed: You'll receive a
client.WorkflowUpdateFailedError
exception. There are two ways this can happen:-
The Update was rejected by an Update validator defined in the Workflow alongside the Update handler.
-
The Update failed after having been accepted.
Update failures are like Workflow failures. Issues that cause a Workflow failure in the main method also cause Update failures in the Update handler. These might include:
- A failed Child Workflow
- A failed Activity (if the Activity retries have been set to a finite number)
- The Workflow author raising
ApplicationFailure
-
-
The handler caused the Workflow Task to fail: A Workflow Task Failure causes the server to retry Workflow Tasks indefinitely. What happens to your Update request depends on its stage:
- If the request hasn't been accepted by the server, you receive a
client.ServiceError
on which thecause.code
attribute is gRPC status code 9FAILED_PRECONDITION
(after some retries). - If the request has been accepted, it is durable.
Once the Workflow is healthy again after a code deploy, use an
WorkflowUpdateHandle
to fetch the Update result.
- If the request hasn't been accepted by the server, you receive a
-
The Workflow finished while the Update handler execution was in progress: You'll receive a
client.ServiceError
on which thecause.code
attribute is gRPC status code 5NOT_FOUND
. This happens if the Workflow finished while the Update handler execution was in progress, for example because-
The Workflow was canceled or failed.
-
The Workflow completed normally or continued-as-new and the Workflow author did not wait for handlers to be finished.
-
Problems when sending a Queryβ
When working with Queries, you may encounter these errors:
-
There is no Workflow Worker polling the Task Queue: You'll receive a
client.ServiceError
on which thecause.code
attribute is gRPC status code 9FAILED_PRECONDITION
. -
Query failed: You'll receive a
client.QueryNotRegisteredError
exception if something goes wrong during a Query. Any error in a Query handler will trigger this error. This differs from Signal and Update requests, where errors can lead to Workflow Task Failure instead. -
The handler caused the Workflow Task to fail. This would happen, for example, if the Query handler blocks the thread for too long without yielding.
Define Signals and Queries statically or dynamicallyβ
- Handlers for both Signals and Queries can take arguments, which can be used inside
setHandler
logic. - Only Signal Handlers can mutate state, and only Query Handlers can return values.
Define Signals and Queries staticallyβ
If you know the name of your Signals and Queries upfront, we recommend declaring them outside the Workflow Definition.
signals-queries/src/workflows.ts
import * as wf from '@temporalio/workflow';
export const unblockSignal = wf.defineSignal('unblock');
export const isBlockedQuery = wf.defineQuery<boolean>('isBlocked');
export async function unblockOrCancel(): Promise<void> {
let isBlocked = true;
wf.setHandler(unblockSignal, () => void (isBlocked = false));
wf.setHandler(isBlockedQuery, () => isBlocked);
wf.log.info('Blocked');
try {
await wf.condition(() => !isBlocked);
wf.log.info('Unblocked');
} catch (err) {
if (err instanceof wf.CancelledFailure) {
wf.log.info('Cancelled');
}
throw err;
}
}
This technique helps provide type safety because you can export the type signature of the Signal or Query to be called by the Client.
Define Signals and Queries dynamicallyβ
For more flexible use cases, you might want a dynamic Signal (such as a generated ID). You can handle it in two ways:
- Avoid making it dynamic by collapsing all Signals into one handler and move the ID to the payload.
- Actually make the Signal name dynamic by inlining the Signal definition per handler.
import * as wf from '@temporalio/workflow';
// "fat handler" solution
wf.setHandler(`genericSignal`, (payload) => {
switch (payload.taskId) {
case taskAId:
// do task A things
break;
case taskBId:
// do task B things
break;
default:
throw new Error('Unexpected task.');
}
});
// "inline definition" solution
wf.setHandler(wf.defineSignal(`task-${taskAId}`), (payload) => {
/* do task A things */
});
wf.setHandler(wf.defineSignal(`task-${taskBId}`), (payload) => {
/* do task B things */
});
// utility "inline definition" helper
const inlineSignal = (signalName, handler) =>
wf.setHandler(wf.defineSignal(signalName), handler);
inlineSignal(`task-${taskBId}`, (payload) => {
/* do task B things */
});
API Design FAQs
Why not "new Signal" and "new Query"?
The semantic of defineSignal
and defineQuery
is intentional.
They return Signal and Query definitions, not unique instances of Signals and Queries themselves
The following is their entire source code:
/**
* Define a signal method for a Workflow.
*/
export function defineSignal<Args extends any[] = []>(
name: string,
): SignalDefinition<Args> {
return {
type: 'signal',
name,
};
}
/**
* Define a query method for a Workflow.
*/
export function defineQuery<Ret, Args extends any[] = []>(
name: string,
): QueryDefinition<Ret, Args> {
return {
type: 'query',
name,
};
}
Signals and Queries are instantiated only in setHandler
and are specific to particular Workflow Executions.
These distinctions might seem minor, but they model how Temporal works under the hood, because Signals and Queries are messages identified by "just strings" and don't have meaning independent of the Workflow having a listener to handle them. This will be clearer if you refer to the Client-side APIs.
Why setHandler and not OTHER_API?
We named it setHandler
instead of subscribe
because a Signal or Query can have only one "handler" at a time, whereas subscribe
could imply an Observable with multiple consumers and is a higher-level construct.
wf.setHandler(MySignal, handlerFn1);
wf.setHandler(MySignal, handlerFn2); // replaces handlerFn1
If you are familiar with RxJS, you are free to wrap your Signals and Queries into Observables if you want, or you could dynamically reassign the listener based on your business logic or Workflow state.