The previous post gave some background on why you should try out GCP Workflows. This post is more technical and shows how to use Infrastructure as Code to set up GCP Workflows with support for re-runs and backfills.

Solution Architecture

The requirements we have are:

  1. We want re-run capabilities (idempotency) and backfill capabilities. We will use a custom Cloud Run service built with FastAPI.
  2. We want to set up workflow orchestration and scheduling of batch jobs with Infrastructure as Code (IaC). We will use Pulumi.

Re-run capabilities and backfill capabilities

One of the limitations listed in GCP’s guide to choosing between Workflows and Composer for orchestration is that GCP Workflows lacks data processing features such as backfills or re-running DAGs.

Re-runs

In order to re-run an execution you need idempotency, i.e. given the same input, the output should be the same. In GCP Workflows, the main limitation is that an execution doesn’t have an immutable timestamp. Also, GCP Workflows doesn’t have a built-in scheduler; scheduling is the responsibility of Cloud Scheduler. Cloud Scheduler, on the other hand, only allows a static body to be defined, and that body is used as the input parameters in Workflows, so every scheduled execution receives identical input.

At Mathem we have solved this with a Cloud Run service (FastAPI) that receives the POST request from Cloud Scheduler, extracts the timestamp from the request header (X-CloudScheduler-ScheduleTime) and adds it to the body as an attribute (scheduleTime) before forwarding the request to Workflows.
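To make the header handling concrete, here is a minimal Python sketch of the body transformation, assuming the schedule time arrives as an RFC 3339 string and the timezone query parameter is an IANA name. The function names (parse_rfc3339, add_schedule_time) are hypothetical and this is not the workflows-runner code; the FastAPI route and the forwarding call are left out so the logic runs on its own.

```python
from datetime import datetime
from zoneinfo import ZoneInfo  # Python 3.9+; needs the IANA tz database


def parse_rfc3339(value: str) -> datetime:
    """Parse an RFC 3339 timestamp such as "2019-10-12T07:20:50.52Z"."""
    # strptime's %z accepts "Z" and "+00:00" (Python 3.7+); %f accepts 1-6 digits.
    for fmt in ("%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%dT%H:%M:%S%z"):
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise ValueError(f"not an RFC 3339 timestamp: {value!r}")


def add_schedule_time(body: dict, schedule_time_header: str, tz_name: str = "UTC") -> dict:
    """Return a copy of the Cloud Scheduler body with a scheduleTime attribute added.

    schedule_time_header is the raw X-CloudScheduler-ScheduleTime value and
    tz_name an IANA time zone name (e.g. from the ?timezone= query parameter).
    """
    scheduled = parse_rfc3339(schedule_time_header)
    # Convert the UTC instant to local time; ZoneInfo applies DST rules.
    local = scheduled.astimezone(ZoneInfo(tz_name))
    enriched = dict(body)  # copy, so the incoming body is left untouched
    enriched["scheduleTime"] = local.isoformat()
    return enriched


# Hypothetical FastAPI wiring (not shown): read the header from the request,
# call add_schedule_time, then send {"argument": json.dumps(enriched)} to the
# Workflows Executions API.
```

Because zoneinfo applies daylight saving rules, the header value from the curl example converted to Europe/Stockholm gives 2019-10-12T09:20:50.520000+02:00 (Stockholm is on summer time, UTC+2, on that date). Python's isoformat also prints six fractional digits, so the exact string may differ from what workflows-runner returns.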

curl -X POST "<service_url>/v1/projects/<my_project>/locations/europe-west4/workflows/workflow-1/executions?timezone=Europe/Stockholm" \
   -H "Authorization: Bearer $(gcloud auth print-identity-token)" \
   -H 'Content-Type: application/json' \
   -H 'X-CloudScheduler-ScheduleTime: 2019-10-12T07:20:50.52Z' \
   -d '{"firstName":"first"}'


=> {"firstName":"first", "scheduleTime":"2019-10-12T09:20:50.52+02:00"}

The application is called workflows-runner and is open source, available under Mathem’s open source account mhlabs on GitHub (where you will also find some nice open-source AWS projects).

This enables us to re-run a job in Workflows (for example, one that failed) with the same immutable scheduleTime as an input argument.
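Why does an immutable scheduleTime give idempotency? A hypothetical daily job illustrates it: if the job derives its data window from scheduleTime instead of the current clock, a re-run selects exactly the same data. The function below is an illustrative sketch, not part of workflows-runner.

```python
from datetime import datetime, timedelta
from typing import Tuple


def processing_window(schedule_time: str) -> Tuple[str, str]:
    """Hypothetical daily job: process the calendar day before scheduleTime.

    The window depends only on scheduleTime, never on datetime.now(), so
    re-running an execution with the same scheduleTime selects the same data.
    """
    scheduled = datetime.fromisoformat(schedule_time)
    # Midnight at the start of the scheduled day, keeping the input's UTC offset.
    day_end = scheduled.replace(hour=0, minute=0, second=0, microsecond=0)
    day_start = day_end - timedelta(days=1)
    return day_start.isoformat(), day_end.isoformat()
```

Calling processing_window twice with the same value returns the same window, whether the second call is a re-run minutes or weeks later. Keeping the fixed UTC offset of the input is a simplification around daylight saving changes.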

Backfills

When you create a new pipeline, it is common to want a historical backfill, i.e. running a number of executions with historical timestamps as input. To run such a series of executions, we added an endpoint (/range) to the workflows-runner application that accepts a body with start and stop datetimes and a step defined as a cron schedule, and returns the list of matching timestamps.
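The behaviour of the /range endpoint can be sketched in plain Python. This is a minimal stand-in, not the workflows-runner implementation: it assumes a standard five-field cron expression, treats start as inclusive and stop as exclusive, and uses a small hand-written cron matcher (the helper names are hypothetical) instead of a cron library, stepping minute by minute.

```python
from datetime import datetime, timedelta
from typing import List, Set


def parse_field(field: str, low: int, high: int) -> Set[int]:
    """Expand one cron field ("*", "5", "1,15", "1-5", "*/10") into its allowed values."""
    values: Set[int] = set()
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_text = part.split("/")
            step = int(step_text)
        if part == "*":
            first, last = low, high
        elif "-" in part:
            first_text, last_text = part.split("-")
            first, last = int(first_text), int(last_text)
        else:
            first = last = int(part)
        values.update(range(first, last + 1, step))
    return values


def parse_timestamp(value: str) -> datetime:
    # fromisoformat only accepts a trailing "Z" from Python 3.11 onwards.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def schedule_range(start: str, stop: str, step: str) -> List[str]:
    """Return every minute in [start, stop) that matches the five-field cron `step`.

    Simplification: all fields must match (classic cron ORs day-of-month and
    day-of-week when both are restricted), and names like "MON" are unsupported.
    """
    minute, hour, dom, month, dow = step.split()
    minutes = parse_field(minute, 0, 59)
    hours = parse_field(hour, 0, 23)
    days = parse_field(dom, 1, 31)
    months = parse_field(month, 1, 12)
    weekdays = {d % 7 for d in parse_field(dow, 0, 7)}  # cron allows 7 for Sunday

    begin, end = parse_timestamp(start), parse_timestamp(stop)
    # Cron fires on whole minutes, so round begin up to the next whole minute.
    current = begin.replace(second=0, microsecond=0)
    if current < begin:
        current += timedelta(minutes=1)

    matches: List[str] = []
    while current < end:
        cron_weekday = (current.weekday() + 1) % 7  # Python: Monday=0, cron: Sunday=0
        if (current.minute in minutes and current.hour in hours
                and current.day in days and current.month in months
                and cron_weekday in weekdays):
            matches.append(current.isoformat())
        current += timedelta(minutes=1)
    return matches
```

With the request body from the curl example that follows, schedule_range returns the same three 01:00 timestamps. Minute-by-minute stepping gets slow for multi-year ranges; a cron library that computes the next fire time directly would scale better.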

curl -X POST "<service_url>/range" \
   -H "Authorization: Bearer $(gcloud auth print-identity-token)" \
   -H 'Content-Type: application/json' \
   -H 'X-CloudScheduler-ScheduleTime: 2019-10-12T07:20:50.52Z' \
   -d '{"start":"2022-02-15T00:00:00Z", "stop":"2022-02-18T00:00:00Z", "step":"0 1 * * *"}'

=> ["2022-02-15T01:00:00+00:00","2022-02-16T01:00:00+00:00","2022-02-17T01:00:00+00:00"]

This list is then used in a GCP Workflows workflow that iterates over the timestamps and starts one execution of the workflow we want to backfill per timestamp.

This is what the backfill workflow looks like:

main:
    params: [input]
    steps:
    - getExecutionList:
        call: http.post
        args:
            url: https://workflows-runner-XXXXXXXX-ew.a.run.app/range
            auth:
                type: OIDC
                audience: https://workflows-runner-XXXXXXXX-ew.a.run.app
            body:
                start: ${input.start}
                stop: ${input.stop}
                step: ${input.step}
        result: scheduleTimes
    - iterateExecutions:
        for:
            value: scheduleTime
            in: ${scheduleTimes.body}
            steps:
                - updateScheduleTime:
                    assign:
                        - mymap: ${input.params}
                        - mymap.scheduleTime: ${scheduleTime}
                - workflowExecution:
                    call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.create
                    args:
                        parent: ${input.parent}
                        body:
                            argument: ${json.encode_to_string(mymap)}
    - returnOutput:
        return: ${scheduleTimes}
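To show what one pass of the for loop produces, here is a Python sketch that mirrors it, together with an example input for the backfill workflow. The key names match what the YAML reads from input; the values, including the placeholder parent, are assumptions for illustration, and execution_bodies is a hypothetical helper, not an API.

```python
import json
from typing import Any, Dict, List

# Example input for the backfill workflow; the keys match what the YAML reads
# (input.start, input.stop, input.step, input.parent, input.params).
backfill_input: Dict[str, Any] = {
    "start": "2022-02-15T00:00:00Z",
    "stop": "2022-02-18T00:00:00Z",
    "step": "0 1 * * *",
    # Executions API parent: projects/{project}/locations/{location}/workflows/{workflow}
    "parent": "projects/<my_project>/locations/europe-west4/workflows/workflow-1",
    # Static parameters passed to every backfilled execution.
    "params": {"firstName": "first"},
}


def execution_bodies(backfill: Dict[str, Any], schedule_times: List[str]) -> List[Dict[str, str]]:
    """Mirror the iterateExecutions loop: one executions.create body per scheduleTime."""
    bodies = []
    for schedule_time in schedule_times:
        # mymap: ${input.params} followed by mymap.scheduleTime: ${scheduleTime}
        argument = dict(backfill["params"])
        argument["scheduleTime"] = schedule_time
        # argument: ${json.encode_to_string(mymap)}; whitespace may differ from json.dumps
        bodies.append({"argument": json.dumps(argument)})
    return bodies
```

Each body is what the executions.create call receives for one timestamp: a copy of params with scheduleTime added, encoded as a JSON string in the argument field.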

Hopefully you find this useful for building lightweight workflow orchestration with re-run and backfill capabilities. In the next post I will cover how to set up batch pipelines (the workflows-runner application, Cloud Scheduler and Workflows) using Pulumi.