Usage of Sidekiq middleware

May 31, 2018 | David Bourguignon | 6-minute read

At Drivy, we use a lot of background jobs, called from service objects, API calls, cron, etc.
A time came when we needed to add some context data across several of these code layers.

For instance, we have some context data we need to keep for auditing reasons. This data can originate from several points in the application: maybe from some part of the web application, from the mobile app, or from a service object.

We tried to find a way to keep this new context data through all code layers and jobs without having to resort to adding context data arguments everywhere.

We decided to store this data in Thread.current, which makes it available to any code running on the current thread.

CAVEAT: Using this kind of global data in this way is usually considered to be bad practice. I will not discuss it here, but you can look at this discussion for more detail.
We use global data with caution, in a limited scope, and only after having really thought about it. All interactions with the global data are tightly contained in service objects to limit the risk of using the data outside of its intended scope.

module ProcessContext
  module_function

  def reset
    self.attributes = {}
    attributes
  end

  def attributes
    Thread.current["process_context"] || reset
  end

  def attributes=(new_attributes)
    Thread.current["process_context"] = new_attributes
  end
end
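
As a quick illustration of the thread-local behaviour, here is a minimal sketch. It repeats the module above so that it runs on its own, and the "origin" key is a hypothetical attribute chosen for the example. The context set on the main thread is not visible from a newly created thread, which starts with an empty context.

```ruby
# Same ProcessContext as above, repeated so the snippet runs on its own.
module ProcessContext
  module_function

  def reset
    self.attributes = {}
    attributes
  end

  def attributes
    Thread.current["process_context"] || reset
  end

  def attributes=(new_attributes)
    Thread.current["process_context"] = new_attributes
  end
end

# "origin" is a hypothetical key, used for illustration only.
ProcessContext.attributes = { "origin" => "web" }

# A new thread has no stored context yet, so it gets a fresh empty Hash.
seen_in_other_thread = Thread.new { ProcessContext.attributes }.value

# The main thread still sees the value it stored.
seen_in_main_thread = ProcessContext.attributes
```

This isolation is what makes the approach safe within one thread, and it is also why the data does not follow a job to another thread on its own.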

It works well, up to the point where we delegate some of this processing to background jobs. The jobs run on a different thread (possibly even on a different machine), so the data stored in Thread.current does not follow them.

We use Sidekiq to manage our jobs. Sidekiq works in the following way (a simplified version):

  • The client side enqueues a job into a Redis database;
  • On the server side the workers:
    • read the database to pick a job from the queue;
    • run it.

Conveniently, Sidekiq provides a way to add some code around job processing, on the client side, the server side or both. So we used these middlewares to propagate the context information from the client (our Rails application) to the Sidekiq server.
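
To give an idea of what "code around job processing" means, here is a toy sketch of a middleware chain. TinyChain, Stamp and Trace are hypothetical names invented for this example, and the code is a deliberately simplified model, not Sidekiq's actual implementation. Each middleware receives the job and yields to the next one, and the innermost yield runs the job itself.

```ruby
# Hypothetical, heavily simplified chain: not Sidekiq's real code.
class TinyChain
  def initialize
    @middlewares = []
  end

  def add(klass)
    @middlewares << klass
  end

  # Calls each middleware in order; the innermost yield runs the given block.
  def invoke(job, &final)
    remaining = @middlewares.map(&:new)
    step = lambda do
      if remaining.empty?
        final.call
      else
        remaining.shift.call(job, &step)
      end
    end
    step.call
  end
end

# Adds a piece of metadata to the job before it runs.
class Stamp
  def call(job)
    job["stamped"] = true
    yield
  end
end

# Records what happens before and after the job runs.
class Trace
  def call(job)
    job["trace"] << "before"
    yield
    job["trace"] << "after"
  end
end

chain = TinyChain.new
chain.add Stamp
chain.add Trace

job = { "trace" => [] }
chain.invoke(job) { job["trace"] << "perform" }
```

After the call, job["trace"] holds the three steps in order, which shows that a middleware can act both before and after the wrapped work.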

Client side

The Sidekiq middleware client API is:

class Drivy::MyClientMiddleware
  def call(worker_class, job, queue, redis_pool)
    # custom code
    yield
    # custom code
  end
end

You add it to the Sidekiq configuration in this way:

# config/initializers/sidekiq.rb
Sidekiq.configure_client do |config|
  config.client_middleware do |chain|
    chain.add Drivy::MyClientMiddleware
  end
end

Note: You may also want to register this client middleware in the server configuration; see below.

In our case, we want to enrich the job with some metadata. Sidekiq lets us add information to the job, which will then be available on the server side:

module Drivy::Sidekiq::Middleware::Client

  class AddProcessContext
    def call(_, job, _, _)
      process_context(job)
      yield
    end

    private

    def process_context(job)
      if ProcessContext.attributes.present?
        job['process_context'] = ProcessContext.attributes.to_json
      end
    rescue => e
      # Log/notify error as we do not want to fail the job in this case
      puts e
    end
  end
end

We only need the job argument here. It’s basically a regular Hash, to which we add our own information. Be careful to store only data that can be serialised to JSON.
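
The JSON constraint is easy to overlook, so here is a small sketch of what a round trip does to a context Hash. The keys and values are hypothetical. Symbols do not survive serialisation: both symbol keys and symbol values come back as strings.

```ruby
require "json"

# Hypothetical context with symbol keys and a symbol value.
context = { user_id: 42, source: :mobile_app }

# Simulate the trip through the queue: serialise, then parse again.
restored = JSON.parse(context.to_json)

# Symbol keys and the symbol value are now strings.
keys_are_strings = restored.keys.all? { |key| key.is_a?(String) }
```

In practice, this means the code reading the context on the server side should expect string keys, whatever was stored on the client side.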

Server side

The Sidekiq middleware server API is:

class Drivy::MyServerMiddleware
  def call(worker, job, queue)
    # custom code
    yield
    # custom code
  end
end

You add it to the Sidekiq configuration in this way:

# config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
  config.server_middleware do |chain|
    chain.add Drivy::MyServerMiddleware
  end
end

In our usage, we need to retrieve the metadata from the job and set it in the current process:

module Drivy::Sidekiq::Middleware::Server
  class AddProcessContext

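    def call(_, job, _)
      process_metadata(job)
      yield
    ensure
      # Always clean up, even if the job raises, as the thread may be reused
      reset_metadata
    end

    private

    def process_metadata(job)
      if job['process_context']
        # The client middleware stored the context as a JSON string
        ProcessContext.attributes = JSON.parse(job['process_context'])
      end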
    rescue => e
      # Log/notify error as we do not want to fail the job in this case
      puts e
    end

    def reset_metadata
      ProcessContext.reset
    end
  end
end

We simply restore the data from the serialised version.

Each middleware is executed in the same thread as the job itself, so we know the context data will be available to the job's Ruby code.

A word of caution

Thread reuse

Sidekiq reuses threads across jobs, so we must be very careful to clean up our ProcessContext after each job to ensure we do not pollute the context of other jobs.
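
The risk is highest when a job raises, because any cleanup placed after the yield is then skipped. The sketch below simulates this without Sidekiq: DemoContext, LeakyMiddleware and SafeMiddleware are hypothetical stand-ins written for the example, and both "jobs" run on the same thread, as they could on a reused Sidekiq thread.

```ruby
# Hypothetical stand-in for a thread-local context store.
module DemoContext
  module_function

  def attributes
    Thread.current["demo_context"] ||= {}
  end

  def attributes=(value)
    Thread.current["demo_context"] = value
  end

  def reset
    self.attributes = {}
  end
end

# Resets after the yield: the reset is skipped when the job raises.
class LeakyMiddleware
  def call(job)
    DemoContext.attributes = job["process_context"] if job["process_context"]
    yield
    DemoContext.reset
  end
end

# Resets in an ensure clause: the reset runs whether or not the job raises.
class SafeMiddleware
  def call(job)
    DemoContext.attributes = job["process_context"] if job["process_context"]
    yield
  ensure
    DemoContext.reset
  end
end

# Runs a failing job with context, then a job without context on the same
# thread, and returns the context seen by the second job.
def context_seen_by_second_job(middleware)
  DemoContext.reset
  begin
    middleware.call({ "process_context" => { "origin" => "web" } }) { raise "job failed" }
  rescue RuntimeError
    # The failure is handled elsewhere; the thread moves on to the next job.
  end
  seen = nil
  middleware.call({}) { seen = DemoContext.attributes.dup }
  seen
end

leaky = context_seen_by_second_job(LeakyMiddleware.new)
safe = context_seen_by_second_job(SafeMiddleware.new)
```

With the leaky version, the second job sees the context of the failed job; with the ensure-based version, it sees an empty context.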

Client middleware on the server side

Jobs running on the server can themselves enqueue other jobs, and so act as a client. In this case, you’ll want to add the client middleware to the server configuration as well:

# config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
  config.client_middleware do |chain|
    chain.add Drivy::MyClientMiddleware
  end
  config.server_middleware do |chain|
    chain.add Drivy::MyServerMiddleware
  end
end

Conclusion

Middlewares are a useful tool; we mainly use them for logging and monitoring. You can find some interesting plugins using middleware on the Sidekiq Wiki.

And again, do not use global state if you can avoid it.
