Asynchronous responses in Rack


14 April 2012

Rack is an interface for Ruby components accepting and responding to HTTP requests synchronously. Though there is not an official Rack spec for asynchronous responses, Rack servers such as Rainbows! and Thin provide an asynchronous interface for responses that may happen at an unknown time in the future, such as in a web chat room.

The synchronous Rack interface is defined in its SPEC file. The essential part is an object, defined by the application, which is given to a Rack server. This object must implement call(env), where env contains information about the HTTP request. The method returns an array of [status, headers, body], which the Rack server sends along to the client.

class SynchronousApp
  def call(env)
    [200, {'Content-Type' => 'text/plain'}, ['Hello, World!']]
  end
end

With this synchronous interface, the entire body must be prepared immediately, or be otherwise quickly available, for example by reading from a file. A response that must be waited upon will tie up the process or thread executing the HTTP request. In a multi-process Rack server such as Unicorn, this will block the entire worker process, making it unavailable to serve other requests.
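
To make the blocking concrete, here is a minimal sketch that drives a slow synchronous app from a single loop. The serve_sequentially helper and the SlowSynchronousApp class are hypothetical stand-ins for one worker and one app, not part of Rack or any server; the env hash is a bare-bones assumption rather than a full Rack environment.

```ruby
# A slow synchronous app: nothing else can happen until call returns.
class SlowSynchronousApp
  def initialize(delay)
    @delay = delay
  end

  def call(env)
    sleep @delay  # simulate waiting for some event
    [200, {'Content-Type' => 'text/plain'}, ["Hello, #{env['PATH_INFO']}"]]
  end
end

# Hypothetical stand-in for a single worker handling requests one at a time.
def serve_sequentially(app, paths)
  paths.map do |path|
    env = {'REQUEST_METHOD' => 'GET', 'PATH_INFO' => path}
    status, _headers, body = app.call(env)
    chunks = []
    body.each { |part| chunks << part }  # the server iterates the body
    [status, chunks.join]
  end
end

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
results = serve_sequentially(SlowSynchronousApp.new(0.1), ['/a', '/b', '/c'])
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
puts results.inspect
puts format('served %d requests in %.2fs', results.size, elapsed)
```

Each request has to wait for the ones before it, so the total time grows with the number of waiting requests rather than staying close to a single delay.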

Asynchronous responses

Some Rack servers provide an alternate interface that allows the request to be suspended, unblocking the worker. At some later time, the request may be resumed, and the response sent to the client.

While there is not yet an async interface in the Rack specification, several Rack servers have implemented James Tucker’s async scheme. Rather than returning [status, headers, body], the app returns a status of -1, or throws the symbol :async. The server provides env['async.callback'] which the app saves and later calls with the usual [status, headers, body] to send the response.

Note: returning a status of -1 is illegal as far as Rack::Lint is concerned. throw :async is not flagged as an error.

class AsyncApp
  def call(env)
    Thread.new do
      sleep 5  # simulate waiting for some event
      response = [200, {'Content-Type' => 'text/plain'}, ['Hello, World!']]
      env['async.callback'].call response
    end

    [-1, {}, []]  # or throw :async
  end
end

In the example above, the request is suspended, nothing is sent back to the client, the connection remains open, and the client waits for a response. The app returns the special status, and the worker process is able to handle more HTTP requests (i.e. it is not blocked). Later, inside the thread, the full response is prepared and sent to the client.
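
The server side of this scheme can be sketched roughly as follows. This is a toy model under stated assumptions, not how Thin or Rainbows! are implemented: handle_request and DelayedHello are hypothetical names, and where a real server would return to its event loop and serve other requests, this sketch simply waits on a Queue so that the eventual response can be inspected.

```ruby
require 'thread'  # Queue; already loaded on modern Rubies

# Hypothetical stand-in for an async-capable server's request handling.
# Returns the response that the client would eventually receive.
def handle_request(app, env = {})
  delivered = Queue.new
  env['async.callback'] = ->(response) { delivered << response }

  # catch returns nil when the app throws :async without a value,
  # otherwise it returns whatever app.call returned.
  response = catch(:async) { app.call(env) }

  if response.nil? || response.first == -1
    # The request is suspended. A real server would go on to other work;
    # here we block until the app invokes the callback.
    delivered.pop
  else
    response  # ordinary synchronous response
  end
end

class DelayedHello
  def initialize(use_throw)
    @use_throw = use_throw
  end

  def call(env)
    Thread.new do
      sleep 0.1  # simulate waiting for some event
      env['async.callback'].call [200, {'Content-Type' => 'text/plain'}, ['Hello, World!']]
    end

    throw :async if @use_throw
    [-1, {}, []]
  end
end

puts handle_request(DelayedHello.new(true)).inspect
puts handle_request(DelayedHello.new(false)).inspect
```

Both ways of signalling (throwing :async or returning a status of -1) lead to the same path in this sketch: the return value of call is ignored and the real response arrives through the callback.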

Deferred or streaming response bodies

The example shows a one-shot cycle of request, wait, response. It is also possible to have several wait and response segments, so that the headers are sent to the client immediately, or the body trickles in slowly, without blocking the worker process.

When env['async.callback'] is called, async Rack servers send the status and headers to the client and then begin iterating through each part of the body with #each. After the last body part, the server must decide whether the connection to the client should be closed (the entire body has been provided) or remain open (more body parts will be provided later). The details of this decision are implementation-specific. For now, assume the connection is not closed.

To send additional body parts, env['async.callback'] cannot be called a second time, since the status code and headers have already been sent to the client and cannot be changed. Instead, the app takes advantage of the server’s iteration through the body with #each: the server calls body.each(&block), and the trick is to save the block for later use. This turns the iteration inside-out: rather than the server iterating through a body, the app takes control and sends each part of the body itself.

class DeferredBody
  def each(&block)
    # normally we'd yield each part of the body, but since
    # it isn't available yet, we save the block for later
    # (&block in the signature converts it to a Proc named block)
    @server_block = block
  end

  def send(data)
    # calling the saved &block has the same effect as
    # if we had yielded to it
    @server_block.call data
  end
end

class AsyncApp
  def call(env)
    Thread.new do
      sleep 5  # simulate waiting for some event
      body = DeferredBody.new
      response = [200, {'Content-Type' => 'text/plain'}, body]
      env['async.callback'].call response

      # at this point, the server may have sent the status and headers,
      # but no body parts yet, since the body is still empty

      body.send 'Hello, '
      sleep 5
      body.send 'World'
    end

    [-1, {}, []]  # or throw :async
  end
end

Note that the above won’t quite work because we haven’t signaled to the server that the body will be deferred and streamed in part by part.
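
The inverted iteration on its own can be tried without any server. In this minimal sketch the array named sent stands in for the client connection, and the block passed to each plays the role of the server; InvertedBody is a hypothetical name, and the method is called write rather than send here only to avoid shadowing Ruby's built-in Object#send.

```ruby
class InvertedBody
  def each(&block)
    # Keep the server's block instead of yielding to it right away.
    @server_block = block
    self
  end

  def write(data)
    # Same effect as if each had yielded this part.
    @server_block.call(data)
  end
end

sent = []                          # stands in for the client connection
body = InvertedBody.new
body.each { |part| sent << part }  # what a server does with the body

# Nothing has been sent yet; the app now pushes parts at its own pace.
body.write 'Hello, '
body.write 'World'

puts sent.join  # prints "Hello, World"
```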

Signaling deferred body with Thin and Rainbows::EventMachine

Thin is a single-threaded, EventMachine-based Rack server, as is Rainbows! when used with Rainbows::EventMachine. The first async example above won’t actually work within EventMachine. The call to env['async.callback'] must be made from the reactor thread (for reasons I don’t quite understand):

class AsyncApp
  def call(env)
    Thread.new do
      sleep 5  # simulate waiting for some event
      response = [200, {'Content-Type' => 'text/plain'}, ['Hello, World!']]
      EM::next_tick { env['async.callback'].call response }
    end

    [-1, {}, []]  # or throw :async
  end
end

In order to properly defer the body so that it may be sent part by part rather than all at once, these servers expect the body object to respond to #callback and #errback. The server calls these methods to register handlers that, when invoked, terminate the request and close the connection (unless the connection is persistent or pipelined, a detail that is not relevant here). It is up to the app to trigger the handlers after the entire body has been sent or after an error has occurred. The callback interface is that of EventMachine::Deferrable, which may simply be included in deferred body classes. The app may then call body.succeed once the entire body has been sent. For example, from Thin’s examples/async_app.ru:

class DeferrableBody
  include EventMachine::Deferrable

  def call(body)
    body.each do |chunk|
      @body_callback.call(chunk)
    end
  end

  def each &blk
    @body_callback = blk
  end
end

class AsyncApp
  def call(env)
    body = DeferrableBody.new
    EventMachine::next_tick {
      env['async.callback'].call [200, {'Content-Type' => 'text/plain'}, body]
    }

    EventMachine::add_timer(1) {
      body.call ["Cheers then!"]
      body.succeed       # calls Thin's callback which closes the request
    }

    [-1, {}, []]  # or throw :async
  end
end
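
The callback contract between body and server can be sketched without EventMachine. Everything here is an assumption made for illustration: MiniDeferrable is a hand-rolled, simplified stand-in that models only the callback, errback, succeed and fail subset of EventMachine::Deferrable, and FakeConnection is a hypothetical model of the server's side, not Thin's or Rainbows!'s actual code.

```ruby
# Simplified stand-in for EventMachine::Deferrable (illustration only).
module MiniDeferrable
  def callback(&block)
    (@callbacks ||= []) << block
  end

  def errback(&block)
    (@errbacks ||= []) << block
  end

  def succeed(*args)
    (@callbacks || []).each { |cb| cb.call(*args) }
  end

  def fail(*args)
    (@errbacks || []).each { |eb| eb.call(*args) }
  end
end

class StreamedBody
  include MiniDeferrable

  def each(&block)
    @body_callback = block
  end

  def write(chunk)
    @body_callback.call(chunk)
  end
end

# Hypothetical model of the server side of one connection.
class FakeConnection
  attr_reader :sent, :closed

  def initialize
    @sent = []
    @closed = false
  end

  def send_body(body)
    body.each { |chunk| @sent << chunk }
    if body.respond_to?(:callback) && body.respond_to?(:errback)
      # Deferred body: keep the connection open until the app says so.
      body.callback { @closed = true }
      body.errback  { @closed = true }
    else
      @closed = true  # ordinary body: done after iteration
    end
  end
end

conn = FakeConnection.new
body = StreamedBody.new
conn.send_body(body)
body.write 'Cheers '
body.write 'then!'
puts conn.closed   # the connection is still open here
body.succeed
puts conn.closed   # the registered callback has closed it
```

The point of the sketch is the ordering: the server registers its handlers when it first sees the body, and the connection stays open until the app calls succeed (or fail).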

Mizuno

Mizuno is a Rack server for JRuby. It uses the embeddable Jetty Java servlet container. Mizuno takes a slightly different approach to env['async.callback'] than Thin and Rainbows!. Multiple calls to env['async.callback'] are allowed. The status and headers of the second and subsequent calls are ignored. A call with both empty headers and an empty body signals the end of the response.

class AsyncApp
  def call(env)
    Thread.new do
      sleep 5  # simulate waiting for some event

      # first call sends the status and headers
      env['async.callback'].call [200, {'Content-Type' => 'text/plain'}, []]

      # status of second call is ignored; only the body is sent
      env['async.callback'].call [404, {}, ['Hello, ']]

      # additional body part is sent
      env['async.callback'].call [0, {}, ['World!']]

      # empty headers and body signal end of response
      env['async.callback'].call [0, {}, []]
    end

    throw :async
  end
end
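
The rules described above for repeated calls can be modelled with a small object that plays the part of env['async.callback']. This is a toy model of the behaviour as described in this section, not Mizuno's implementation; MultiCallResponse is a hypothetical name, and treating the first call as always carrying the status and headers (even when its body is empty) is an assumption.

```ruby
# Toy model of a callback that may be called several times.
class MultiCallResponse
  attr_reader :status, :headers, :chunks

  def initialize
    @chunks = []
    @finished = false
  end

  def finished?
    @finished
  end

  def call(response)
    return if @finished  # nothing more is accepted after the end signal

    status, headers, body = response
    parts = []
    body.each { |part| parts << part }

    if @status.nil?
      # First call: status and headers are taken from here.
      @status = status
      @headers = headers
    elsif headers.empty? && parts.empty?
      # Later call with empty headers and empty body: end of response.
      @finished = true
    end
    # On later calls the status and headers are ignored; only body parts count.
    @chunks.concat(parts)
  end
end

callback = MultiCallResponse.new
callback.call [200, {'Content-Type' => 'text/plain'}, []]
callback.call [404, {}, ['Hello, ']]
callback.call [0, {}, ['World!']]
callback.call [0, {}, []]

puts callback.status
puts callback.chunks.join
puts callback.finished?
```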