A simple workflow framework written in Go
[TODO insert image here]
Backend is responsible for persisting workflow state, including tasks, events, workflow runtime metadata.
In this example, we will use PSQL as backend.
First, start PSQL server locally
docker compose -f docker/docker-compose-psql.yaml up -d
Then, init a backend instance that connect to PSQL server
const (
DbHost = "localhost"
DbPort = 5432
DbName = "postgres"
DbUser = "user"
DbPassword = "123456"
)
func InitPSQLBackend(logger *zap.Logger) (backend.Backend, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, err
}
db, err := psql.Connect(DbHost, DbPort, DbUser, DbPassword, DbName, nil)
if err != nil {
return nil, err
}
err = psql.PrepareDB(db) // auto-create table if not exists
if err != nil {
return nil, err
}
dataConverter := dataconverter.NewJsonDataConverter()
be := psql.NewPSQLBackend(hostname, 5*time.Minute, dataConverter, db, logger)
return be, nil
}
be, err := examples.InitPSQLBackend(logger)
Activity is a function that used to implement service calls, I/O operation, long running operations, or costly actions which are not prefer to be re-executed
type PaymentInput struct {
Value int64
}
func PaymentActivity(ctx context.Context, input *PaymentInput) (*Void, error) {
r := rand.Intn(100)
if r < 30 { // 30% of failure
return &Void{}, nil
} else {
return nil, errors.New("payment failed")
}
}
note #1: code inside activity must be non-deterministic, which is when running the activity twice, it always yields the same result
note #2: if you experience an unexpected error while executing activity,
just call panic(...)
, the activity will be retried later
Workflow is the orchestration of activities
type SubscriptionWorkflowInput struct {
TotalAmount int64
Cycles int
CycleDuration time.Duration
}
type SubscriptionWorkflowOutput struct {
Paid int64
Overdue int64
}
func SubscriptionWorkflow(ctx context.Context, input *SubscriptionWorkflowInput) (*SubscriptionWorkflowOutput, error) {
startTimestamp := workflow.GetWorkflowExecutionStartedTimestamp(ctx)
paymentAmounts := calculatePaymentCycles(input.TotalAmount, input.Cycles)
paymentTimings := calculatePaymentTimings(startTimestamp, input.Cycles, input.CycleDuration)
//
var paid int64 = 0
var overdue int64 = 0
currentCycle := 0
for {
workflow.SetVar(ctx, "paid", paid)
workflow.SetVar(ctx, "overdue", overdue)
workflow.SetVar(ctx, "currentCycle", currentCycle)
if currentCycle >= input.Cycles {
break
}
currentCycleAmount := paymentAmounts[currentCycle]
amountToPay := currentCycleAmount + overdue
workflow.SetVar(ctx, "amountToPay", amountToPay)
workflow.WaitUntil(ctx, time.UnixMilli(paymentTimings[currentCycle]))
_, err := workflow.CallActivity(ctx, PaymentActivity, &PaymentInput{Value: amountToPay}).Await()
if err != nil {
overdue += paymentAmounts[currentCycle]
workflow.SetVar(ctx, fmt.Sprintf("cycle_%d_err", currentCycle), err.Error())
} else {
paid += amountToPay
overdue = 0
workflow.SetVar(ctx, fmt.Sprintf("cycle_%d_paid_amount", currentCycle), amountToPay)
}
workflow.SetVar(ctx, "amountToPay", 0)
workflow.SetVar(ctx, fmt.Sprintf("cycle_%d_completed_at", currentCycle), workflow.GetCurrentTimestamp(ctx))
currentCycle += 1
}
return &SubscriptionWorkflowOutput{
Paid: paid,
Overdue: overdue,
}, nil
}
note #1: DO NOT put any costly operations (IO operations, external service calls, etc.) on workflow code, put them in activity code instead
Workers, including ActivityWorker
and WorkflowWorker
are responsible for executing activity and workflow codes
aw, err := worker.NewActivityWorkersBuilder().
WithName("demo activity worker").
WithBackend(be).
WithLogger(logger).
RegisterActivities(
PaymentActivity,
).
WithActivityWorkerOpts(
activity_worker.WithTaskProcessorMaxBackoffInterval(1 * time.Minute),
).
Build()
ww, err := worker.NewWorkflowWorkersBuilder().
WithName("demo workflow worker").
WithBackend(be).
WithLogger(logger).
RegisterWorkflows(
SubscriptionWorkflow,
).Build()
Putting all pieces together, we can implement our worker program
func main() {
ctx := context.Background()
logger, err := examples.GetLogger()
if err != nil {
panic(err)
}
be, err := examples.InitPSQLBackend(logger)
if err != nil {
panic(err)
}
aw, err := worker.NewActivityWorkersBuilder().
WithName("demo activity worker").
WithBackend(be).
WithLogger(logger).
RegisterActivities(
PaymentActivity,
).
WithActivityWorkerOpts(
activity_worker.WithTaskProcessorMaxBackoffInterval(1 * time.Minute),
).
Build()
if err != nil {
panic(err)
}
ww, err := worker.NewWorkflowWorkersBuilder().
WithName("demo workflow worker").
WithBackend(be).
WithLogger(logger).
RegisterWorkflows(
SubscriptionWorkflow,
).Build()
if err != nil {
panic(err)
}
aw.Start(ctx)
defer aw.Stop(ctx)
ww.Start(ctx)
defer ww.Stop(ctx)
//
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
}
After having our worker instance running, we can write codes to start workflows and wait for their result
To schedule a workflow, call ScheduleWorkflow
, and then fill out the necessary parameters to start the workflow
err := client.ScheduleWorkflow(ctx, be, SubscriptionWorkflow, &SubscriptionWorkflowInput{
TotalAmount: totalAmount,
Cycles: cycles,
}, client.WorkflowScheduleOptions{
WorkflowID: workflowID,
Version: "1",
})
First, to debug a running workflow, we have to put several runtime variables inside the workflow.
We will use method SetVar[T any](ctx context.Context, name string, value T)
which is used to modify a runtime variable.
After that, we will use the WorkflowDebugger to debug the current runtime state by getting those variables out.
dbg := debug.NewWorkflowDebugger(be)
vars, err := dbg.QueryUserDefinedVars(SubscriptionWorkflow, workflowID)
if err != nil {
panic(err)
}
PrettyPrint(vars)
To wait for a workflow execution to complete and get its result, call AwaitWorkflowResult
method
workflowResult, workflowErr, err := client.AwaitWorkflowResult(ctx, be, SubscriptionWorkflow, workflowID)
all above code were taken from subscription_with_debug example
Run build.sh
to perform build, unit test, integration test and e2e test
See examples