Overview

Whether you are building a completely new system composed of many discrete API applications, each with a clearly defined area of responsibility, or trying to assemble a collaboration channel between a heterogeneous set of totally unrelated API applications, you need a way to orchestrate interactions between these applications.

So a workflow is effectively an orchestration.

You want a workflow because it gives you a way to decompose a complex series of operations into a sequence of discrete tasks with a state machine.

The sequence of tasks is more complex than just a series. The tasks of course can fail, and so you need to deal with timeouts, retries, "stuck" flows, etc.

You can define a workflow and its tasks using an arbitrarily complex language. Or you can keep it simple by making some assumptions:

  • Code is the definition language.
  • Tasks are independent and can be used in different workflows.
  • The only way of communication between tasks is the workflow. Tasks can add, remove or modify properties of the workflow.
  • If a task requires a specific property of the workflow to be present, the task can fail, or re-schedule itself within the workflow, or ...
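
These assumptions can be illustrated with a minimal sketch. The two task bodies below never call each other; the second only reads a property that the first left on the shared object. The task names and the runInOrder helper are hypothetical and exist only for this illustration; they are not part of node-workflow.

```javascript
// Two independent tasks that communicate only through the shared job object.
var fetchUser = {
  name: 'Fetch user',
  body: function (job, cb) {
    job.user = { login: 'alice' }; // add a property for later tasks
    return cb(null);
  }
};

var greetUser = {
  name: 'Greet user',
  body: function (job, cb) {
    if (!job.user) {
      // The required property is missing, so this task chooses to fail.
      return cb('user was not defined');
    }
    job.greeting = 'Hello, ' + job.user.login;
    return cb(null);
  }
};

// Hypothetical helper: run task bodies in order, stop at the first error.
function runInOrder(tasks, job, done) {
  var i = 0;
  (function next(err) {
    if (err || i === tasks.length) {
      return done(err || null, job);
    }
    return tasks[i++].body(job, next);
  }());
}
```

Because greetUser depends only on job.user, it could be reused in any other workflow whose earlier tasks set that property.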

The system needs to be designed with failures in mind. Tasks can fail and, as a consequence, workflows may fail. You may want to recover from a task failure, or from a whole workflow failure.

The system itself may also fail due to any unexpected reason.

node-workflow

This package provides a way to define re-usable workflows using JSON and run concrete jobs with specific targets and parameters based on such workflows.

Terminology

  • Task: A single discrete operation, such as Send Email.
  • Workflow: An abstract definition of a sequence of Tasks, including transition conditions, and branches.
  • Job: An instance of a Workflow containing all the required information to execute itself and, eventually, a target.

System components

  • A workflow and task factory, where you can create tasks, workflows and queue jobs, using NodeJS.
  • Alongside the factory, Workflow API will allow creation of tasks, workflows and jobs through a REST API using any other language, with JSON as the payload.
  • A Status API, where you can check the status of a given job, get information about failures, ...
  • Job runners. You can have as many runners as you want. Runners can live everywhere. Once a runner "picks" a job, that job is flagged with the runner identifier - locked - so no other runner attempts to execute it. One runner can be composed of multiple associated processes to execute jobs.

The factory talks to a persistence layer, and saves workflows and tasks. Also, once a Job is created, it's saved with all the required information, including the tasks' code at the moment of job creation. This avoids any potential problems resulting from underlying modifications of tasks once a Job has been queued.

A runner itself may crash while a job is being executed, leaving the job in an inappropriate "running" status. When a runner starts, there shouldn't be any job flagged with its identifier, let alone one in a running status. If there is, the first thing the runner must do on restart is pick up any job in such an invalid state and cancel it.
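
A minimal sketch of that restart check follows, using a plain array as a stand-in for the persistence layer. The field names runner_id and uuid and the 'canceled' status value are assumptions made for the example, not names confirmed by this document.

```javascript
// Hypothetical in-memory stand-in for the persistence layer.
// Cancels every job still flagged with this runner's identifier and
// left in "running" status, and returns the identifiers of those jobs.
function cancelStaleJobs(jobs, runnerId) {
  var canceled = [];
  jobs.forEach(function (job) {
    if (job.runner_id === runnerId && job.status === 'running') {
      job.status = 'canceled'; // assumed status name
      job.runner_id = null;    // release the lock
      canceled.push(job.uuid);
    }
  });
  return canceled;
}
```

A runner would call something like this once at boot, before it starts picking queued jobs.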

Task properties

  • A name,
  • code to be executed,
  • timeout,
  • number of retries, when applicable,
  • a fall-back task to be executed when the task fails.

    {
      name: 'A Task',
      timeout: 30,
      retry: 2,
      body: function(job, cb) {
        if (!job.foo) {
          job.foo = true;
          return cb('Foo was not defined');
        }
        return cb(null);
      },
      fallback: function (err, job, cb) {
        job.the_err = err;
        return cb(null);
      }
    }
    

Note that a task's timeout shouldn't be larger than the workflow timeout, though nothing breaks if it is: a task whose execution exceeds the workflow timeout is failed with a 'workflow timeout' error.

Workflow properties

  • A name.
  • The 'chain' of tasks to be executed.
  • A global timeout.
  • Alternate error branch.

    factory.workflow({
      name: 'Sample Workflow',
      chain: [aTask, anotherTask, aTaskWhichWillFail],
      timeout: 300,
      onError: [aFallbackTask]
    }, function(err, workflow) {
      if (err) {
        throw(err);
      }
      return workflow;
    });
    

Job properties

While a Workflow is something "abstract", a Job may "operate" over a concrete target. For example, you can use a REST URI as the target of the job, or an LDAP DN, or whatever you need to make sure that the same job will not be queued twice.

When jobs are created and queued, they will check if another job with the same target (and the same parameters) exists and, if that's the case, the job creation will fail.

Obviously, there are some cases where you may want the same job to be queued for exactly the same target; for example, a POST to a given URI to create a new collection element. That's why a job's parameters are checked along with its target when deciding whether to allow the creation and queueing of a new job.

If a job has failed for a given target and parameters, you may want to create a new one after some time. This is perfectly possible, since the uniqueness checks are made only against "running" or "queued" jobs, not against "finished" jobs, whatever their result.
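
The uniqueness rule described above can be sketched as a single predicate. This is only an illustration: it assumes jobs are plain objects with target, params and status properties, and it compares parameters through their JSON text, which is sensitive to key order.

```javascript
// Returns true when an active job (queued or running) already exists
// with the same target and the same parameters as the candidate.
// Finished jobs never block the creation of a new job.
function isDuplicate(existingJobs, candidate) {
  var params = JSON.stringify(candidate.params || {});
  return existingJobs.some(function (job) {
    var active = job.status === 'queued' || job.status === 'running';
    return active &&
      job.target === candidate.target &&
      JSON.stringify(job.params || {}) === params;
  });
}
```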

The same as the workflow, plus:

  • Results for each one of the tasks (we need to decide about tasks results format)
  • The job target, when given.
  • The job parameters, when necessary.
  • The job status (something like "queued", "running", "finished" may work). Note that a job is running while a task is being executed. It's perfectly possible to change job status to "queued" once a task has been completed, and leave the job there to be picked by a runner at some later moment.
  • When to run the job, in case we want to delay its execution.
  • Any additional property a task may want to save with the job to be used by a subsequent task.

    factory.job({
      workflow: aWorkflow,
      exec_after: '2012-01-03T12:54:05.788Z'
    }, function(err, job) {
      if (err) {
        return callback(err);
      }
      aJob = job;
      callback(null, job);
    });
    

Workflow API and REST API

You can create workflows and jobs either by using the provided REST API(s), or by embedding this module's API into your own system(s). The former will be easier to get up and running, but you should use the latter when:

  • You want to use the Workflow API in a (node.js) application that is not the bundled REST API.
  • You want to use a different backend storage system, or otherwise change the assumptions of the bundled REST API.

The package also provides a binary file to run the WorkflowAPI using the same configuration file we pass to our WorkflowRunner:

./bin/workflow-api path/to/config.json

See REST API docs for the details of available end-points.

Workflow Runners

In order to execute the jobs, at least one WorkflowRunner must be up and ready to take jobs. An arbitrary number of runners can be used on any set of hosts; configuration must match.

An example WorkflowRunner is provided with the package and can be started with:

./bin/workflow-runner path/to/config.json

The system design requires that workflow runners can live anywhere: as many as needed, all of them reporting their health periodically.

All runners will periodically query the backend for information about other runners and, in case they detect one of those runners has been inactive for a configurable period of time, they will check for stale jobs associated with the inactive runner and cancel those jobs.
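
A minimal sketch of the inactivity check follows. It uses the threshold given in the Runner configuration section of this document (three times run_interval); the shape of the runners map, with identifiers as keys and last-activity timestamps in milliseconds as values, is an assumption made for the example.

```javascript
// Returns the identifiers of runners that have not reported activity
// within 3 * runInterval seconds.
// `runners` maps runner identifier -> last activity (ms since epoch).
function inactiveRunners(runners, now, runInterval) {
  var threshold = 3 * runInterval * 1000; // run_interval is in seconds
  return Object.keys(runners).filter(function (id) {
    return now - runners[id] > threshold;
  });
}
```

The jobs still locked by any identifier returned here are the stale jobs that the detecting runner would then cancel.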

The first thing a runner does when it boots is register itself on the backend (which is the same as reporting its health). At a configurable interval, a runner will try to pick queued jobs and execute them. Runners will report activity at this same interval.

Every runner must have a unique identifier, which can be passed at the runner's initialization (otherwise it will be auto-generated the first time the runner is created and saved for future runs).

Runners will spawn child processes, one process per job. Max number of child processes is also configurable.

How runners pick and execute jobs

A runner will query the backend for queued jobs (exactly as many as it has available child processes to spawn). Once the runner gets a set of these queued jobs, it will try to obtain an exclusive lock on each job before processing it. When a job is locked by a runner, it will not be found by other runners searching for queued jobs.

Once the runner has the exclusive lock over the job, it'll change job status from queued to running, and begin executing the associated tasks.
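
The pick-and-lock sequence can be modelled in memory as shown below. This is a sketch only: the runner_id field is an assumed name, and a real backend would have to perform the check and the lock as one atomic operation so that two runners can never lock the same job.

```javascript
// Picks at most `freeSlots` queued, unlocked jobs for the given runner.
// Locking a job flags it with the runner identifier and moves it from
// "queued" to "running", which hides it from other runners.
function pickJobs(jobs, runnerId, freeSlots) {
  var picked = [];
  jobs.forEach(function (job) {
    if (picked.length >= freeSlots) { return; }
    if (job.status !== 'queued' || job.runner_id) { return; }
    job.runner_id = runnerId; // exclusive lock (assumed field name)
    job.status = 'running';
    picked.push(job);
  });
  return picked;
}
```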

In order to execute the job, the runner will spawn a child process, and pass it all the information about the job. (Child processes don't have access to the backend, just to the job, which must be a JSON object).

Note that everything must be executed within the acceptable amount of time provided for the job. If this time expires, the job execution will fail and the onerror branch, if given, will be executed.

Task execution:

The workflow runner will then try to execute the job's chain of tasks, in order. For every task, the runner will try to execute the task body, using the NodeJS VM API. Every task will get the job and a callback as arguments. A task should call the callback once it's completed.

// A task body:
function(job, cb) {
  // Task stuff here:
  cb(null);
}

If a task succeeds, it will call the callback without error:

cb(null);

Otherwise, the task should call the callback with an error message:

cb('whatever the error reason');

These error messages will be available to the task's fallback function, allowing it to decide whether it can recover the task from a failure.

It's also possible to set a specific timeout for every task execution.

If a task fails, or if the task timeout is reached, the runner will check whether the number of retries for the task has been exceeded and, if not, it'll try to execute the task again.

Once the max number of retries for a task has been reached, the runner will check if the task has a fallback and, when that's the case, it'll call it with the error which caused the failure, as follows:

task.fallback(error, job, cb);

The same logic as for task bodies applies to fallbacks.
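
The retry and fallback sequence can be sketched as follows. The sketch assumes that retry counts the total number of attempts, and it leaves timeouts out to stay short; the runTask name is hypothetical and not part of node-workflow.

```javascript
// Runs a task body, retrying on failure. Once the attempts are
// exhausted, the fallback (if any) gets the last error and decides
// whether the task can still be considered successful.
function runTask(task, job, done) {
  var attempts = 0;
  var max = task.retry || 1; // assumption: total number of attempts

  function attempt() {
    attempts += 1;
    task.body(job, function (err) {
      if (!err) { return done(null); }
      if (attempts < max) { return attempt(); }
      if (typeof task.fallback === 'function') {
        return task.fallback(err, job, done);
      }
      return done(err);
    });
  }

  attempt();
}
```

With the sample task shown under Task properties, the first attempt fails because job.foo is missing, and the second attempt succeeds, so the fallback is never called.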

Note that tasks run sandboxed. Only the node modules we specify to the runner at initialization time, along with the setTimeout, clearTimeout, setInterval and clearInterval global functions, will be available to task body and fallback functions. (This is configurable.)
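
The effect of the sandbox can be seen with Node's vm module directly. The following is a minimal sketch, not the runner's actual code; it assumes the task body is available as source text, and the hasTimers and hasRequire properties exist only for the demonstration.

```javascript
var vm = require('vm');

// Assumption: the task body is stored as source text with the job.
var bodySource = 'function (job, cb) {' +
  '  job.hasTimers = typeof setTimeout === "function";' +
  '  job.hasRequire = typeof require !== "undefined";' +
  '  cb(null);' +
  '}';

// Only what is listed here is visible to the task code.
var sandbox = {
  setTimeout: setTimeout,
  clearTimeout: clearTimeout,
  setInterval: setInterval,
  clearInterval: clearInterval
};

// Wrapping the source in parentheses turns it into an expression
// whose value is the function itself.
var body = vm.runInNewContext('(' + bodySource + ')', sandbox);

var job = {};
var error;
body(job, function (err) { error = err; });
```

After the call, job.hasTimers is true and job.hasRequire is false: the timers were passed in explicitly, while require was not.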

All the task results will be saved, in order, in the job's chain_results property. For every task, the results will be something like:

{
  error: '',
  results: 'OK'
}

or, for a task which failed:

{
  error: 'Whatever the error reason',
  results: ''
}

If the task fails because its fallback failed, or because the task doesn't have a fallback, the job's onerror chain will be invoked if present.

The logic to execute the job's onerror chain is exactly the same as described here for the main chain of tasks.
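
Putting the pieces together, the main chain and the error branch might be driven as in the sketch below. Retries, fallbacks and timeouts are left out for brevity. The helper names, the job's onerror property and the onerror_results property are assumptions made for the example.

```javascript
// Runs a chain in order, appending one entry per executed task to
// `results`, and stops at the first task that reports an error.
function runChainWithResults(chain, job, results, done) {
  var i = 0;
  (function next() {
    if (i === chain.length) { return done(null); }
    var task = chain[i++];
    return task.body(job, function (err) {
      results.push({ error: err || '', results: err ? '' : 'OK' });
      return err ? done(err) : next();
    });
  }());
}

// Runs the main chain and, if it fails, the onerror chain.
function runJob(job, done) {
  job.chain_results = [];
  job.onerror_results = []; // hypothetical property name
  runChainWithResults(job.chain, job, job.chain_results, function (err) {
    if (!err || !job.onerror) { return done(err); }
    // The error branch runs with the same logic as the main chain.
    return runChainWithResults(job.onerror, job, job.onerror_results,
      function () { return done(err); });
  });
}
```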

Once the job is finished, either successfully or right after a failure, or when a task tells the runner to re-queue the job for whatever reason, the child process running the job reports the results to the runner. The runner then saves those results to the backend, either finishing the job or re-queueing it so that another runner can fetch it and continue its execution later.

Configuration options

The following is an example configuration for node-workflow, with all the relevant sections included:

{ 
  backend: {
    module: '../lib/workflow-redis-backend',
    opts: {
      port: 6379,
      host: '127.0.0.1',
      db: 14 
    } 
  },
  api: {
    port: 8080
  },
  runner: {
    identifier: 'cd925eef-93fb-4bfe-a820-2aaedf9fc006',
    forks: 10,
    run_interval: 60,
    sandbox: {
      restify: 'restify',
      uuid: 'node-uuid',
      util: 'util'
    } 
  },
  logger: {
    streams: [ {
      level: 'info',
      stream: process.stdout
    }, {
      level: 'debug',
      path: './some-file.log'
    }]
  } 
}

Backend

Backend configuration has two main sections: module and opts. module value must be something we can use to load the backend module of choice by issuing require(backend.module). It doesn't matter if it's a node.js npm module or a relative path to the module. The included module must be a class which inherits from WorkflowBackend.

Then, the opts section is anything required by the chosen backend constructor to be initialized:

var BackendModule = require(config.backend.module);
var backend = new BackendModule(config.backend.opts);

Both the API and the Runner will communicate with the backend using the configuration provided in this section.

API

Anything you want to pass to restify.createServer. restify's server logger configuration will be the same as the one you provide in the logger section.

Note you can pass either a port number or a socket path here too.

Runner

Configuration for workflow runners. None of the options on this section is required.

  • identifier: Unique identifier for a runner. If none is given, the first time a runner is started it will generate a UUID and store it in a file named workflow-runner, which will be reused on subsequent boots.
  • forks: Number of child processes to be forked by the runner, which will also match the max number of jobs to run in parallel. (10 if nothing given).
  • run_interval: Integer. Time in seconds between runner cycles; on each cycle the runner reports that it is active, searches for new queued jobs to process, and searches for stale jobs from other runners. Three times this value is also the threshold for deciding that a runner is inactive. That is, if a given runner hasn't reported that it's healthy within the last 3 * run_interval seconds, other runners will consider it inactive and cancel all of its associated jobs. (If nothing is given, run_interval defaults to 120 seconds.)
  • sandbox: node.js modules we want to make available for the VM where we run our tasks. Note that the key for each member will be the way you can refer to such object from your tasks body and fallback functions, while the value is what the system will use to require the module. For example: {uuid: 'node-uuid'}. Remember that, by default, only global node timers will be available for tasks if nothing is given.
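
The mapping from the sandbox section to the object exposed to tasks can be sketched like this. The buildSandbox helper is hypothetical; it only illustrates the key/value convention described above, using the built-in util module so that the example runs without installing anything.

```javascript
// Builds the object exposed to task code. Keys are the names tasks
// will use; values are what gets passed to require().
function buildSandbox(modules) {
  // The global timers are always available.
  var sandbox = {
    setTimeout: setTimeout,
    clearTimeout: clearTimeout,
    setInterval: setInterval,
    clearInterval: clearInterval
  };
  Object.keys(modules || {}).forEach(function (name) {
    sandbox[name] = require(modules[name]);
  });
  return sandbox;
}

var taskSandbox = buildSandbox({ util: 'util' });
```

With the configuration {uuid: 'node-uuid'}, the same loop would make the node-uuid module available to tasks under the name uuid.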

Logger

Any streams you want to pass to node-bunyan, used by both the Runners and the REST API.

Demo

The workflow-example repository contains everything needed to illustrate:

  • An example config file - config.json.sample which should be renamed to config.json and modified to properly match your local environment.

Remember that, in order to process any job the workflow-runner needs to be initialized pointing to the aforementioned configuration file:

./node_modules/.bin/workflow-runner config.json

Also, in order to be able to run the API based example mentioned below, the workflow-api HTTP server needs to be up and running too:

./node_modules/.bin/workflow-api config.json

Contents for the other files within the workflow-example repository are:

  • An example of how to use node-workflow as a node module in order to create workflows, queue jobs and wait for the results. See module.js.
  • Also, an example of how to achieve the same goal using the Workflow API instead of the node module. See api.js.
  • Both examples share the same workflow definition, contained at the file shared-workflow.js. The beginning of the aforementioned files can be useful to understand the differences when trying to create a workflow using these different approaches.
  • Finally, this directory also contains a file node.js which does exactly the same thing as the workflow/job does - create and star a gist using your GitHub username and password - but straight from NodeJS. This file is useful for understanding the differences between writing code to be executed by NodeJS directly, and using it to create workflows and the associated tasks. Remember that code within tasks runs sandboxed using Node's VM API, and that tasks are totally independent, each one running in its own child process.

Also, see example.js for more options when defining workflows and the different possibilities for tasks fallbacks, retries, timeouts ...

LICENSE

The MIT License (MIT) Copyright (c) 2012 Pedro Palazón Candel

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
