Thread

Threads are Ruby's implementation of a concurrent programming model.

Programs that require multiple threads of execution are perfect candidates for Ruby’s Thread class.

For example, we can create a new thread separate from the main thread’s execution using ::new.

thr = Thread.new { puts "What's the big deal" }

Then we are able to pause the execution of the main thread and allow our new thread to finish, using #join:

thr.join # blocks until thr finishes; by then "What's the big deal" has been printed

If we don’t call thr.join before the main thread terminates, then all other threads including thr will be killed.

Alternatively, you can use an array for handling multiple threads at once, like in the following example:

threads = []
threads << Thread.new { puts "What's the big deal" }
threads << Thread.new { 3.times { puts "Threads are fun!" } }

After creating a few threads, we wait for each of them to finish in turn:

threads.each { |thr| thr.join }

To retrieve the value of the last expression evaluated in a thread's block, use #value:

thr = Thread.new { sleep 1; "Useful value" }
thr.value #=> "Useful value"

Thread initialization

In order to create new threads, Ruby provides ::new, ::start, and ::fork. A block must be provided with each of these methods, otherwise a ThreadError will be raised.

When subclassing the Thread class, note that ::start and ::fork do not invoke the initialize method of your subclass. ::new does invoke it, so be sure to call super in your initialize method.
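As a rough illustration of these rules, the sketch below shows ::new raising ThreadError when called without a block, and a subclass whose initialize forwards the block with super. The class name LabeledThread and its label attribute are invented for this example.

```ruby
# Calling Thread.new without a block raises ThreadError.
def new_without_block_error
  Thread.new
  nil
rescue ThreadError => e
  e.class
end

# Hypothetical subclass: stores a label, then hands the block to Thread#initialize.
class LabeledThread < Thread
  attr_reader :label

  def initialize(label, &block)
    @label = label
    super(&block) # required, otherwise the thread is never set up
  end
end

via_new   = LabeledThread.new("worker") { :done }
via_start = LabeledThread.start { :done } # skips LabeledThread#initialize

via_new.join
via_start.join
p new_without_block_error # ThreadError
p via_new.label           # "worker"
p via_start.label         # nil
```

Because ::start bypasses initialize, any state a subclass sets up there is simply missing on threads created that way.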

Thread termination

Ruby provides several ways to terminate a thread.

The class method ::kill terminates a given thread:

thr = Thread.new { sleep }
Thread.kill(thr) # sends exit() to thr

Alternatively, you can use the instance method #exit, or any of its aliases #kill or #terminate.

thr.exit
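The following sketch, which is only one possible way to check this, kills a sleeping thread and confirms that it is dead afterwards and that its ensure clause still ran. The method name kill_worker and the Queue used as a start signal are assumptions made for the example.

```ruby
def kill_worker
  cleaned_up = false
  started = Queue.new

  worker = Thread.new do
    begin
      started << true # tell the caller we are inside the begin block
      sleep           # sleep forever until killed
    ensure
      cleaned_up = true # ensure clauses run when a thread is killed
    end
  end

  started.pop  # wait until the worker is running
  worker.kill  # same effect as worker.exit or Thread.kill(worker)
  worker.join  # wait for termination to complete

  [worker.alive?, cleaned_up]
end

p kill_worker # [false, true]
```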

Thread status

Ruby provides a few instance methods for querying the state of a given thread. To get a string describing a thread’s state, use #status:

thr = Thread.new { sleep }
thr.status # => "sleep"
thr.exit
thr.status # => false

You can also use #alive? to tell if the thread is running or sleeping, and #stop? to tell if the thread is dead or sleeping.
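A small sketch of the three query methods side by side is given here. The helper name observe_states is hypothetical, and the polling loop is just one simple way to wait until the thread is really asleep before inspecting it.

```ruby
def observe_states
  states = {}
  thr = Thread.new { sleep }

  Thread.pass until thr.status == "sleep" # wait until thr is actually asleep
  states[:sleeping] = [thr.status, thr.alive?, thr.stop?]

  thr.exit
  thr.join # make sure thr has finished terminating
  states[:dead] = [thr.status, thr.alive?, thr.stop?]

  states
end

p observe_states
# {:sleeping=>["sleep", true, true], :dead=>[false, false, true]} (inspect format varies by Ruby version)
```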

Thread variables and scope

Since threads are created with blocks, the same variable-scope rules apply as for other Ruby blocks. Any local variables created within the block are accessible only to that thread.
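To make the scoping rule concrete, here is a minimal sketch. The method and variable names are illustrative only. A variable defined outside the block is shared through the closure, while one first assigned inside the block stays private to it.

```ruby
def scope_demo
  shared = 0

  thr = Thread.new(10) do |step|  # arguments to Thread.new become block arguments
    private_total = step * 2      # local to the block, invisible outside
    shared += private_total       # closes over the outer variable
  end
  thr.join

  [shared, binding.local_variable_defined?(:private_total)]
end

p scope_demo # [20, false]
```

Variables shared through the closure, like shared above, are visible to every thread that captures them, so concurrent writes to them need synchronization.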

Fiber-local vs. Thread-local

Each fiber has its own bucket for Thread#[] storage. When you set a new fiber-local it is only accessible within this Fiber. To illustrate:

Thread.new {
  Thread.current[:foo] = "bar"
  Fiber.new {
    p Thread.current[:foo] # => nil
  }.resume
}.join

This example uses #[] for getting and #[]= for setting fiber-locals. You can also use #keys to list the fiber-locals for a given thread and #key? to check if a fiber-local exists.

Thread-locals, by contrast, are accessible within the entire scope of the thread. Consider the following example:
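The sketch below exercises #key? and #keys alongside #[]. The helper name fiber_local_demo is an assumption; it runs in a fresh thread so that the fiber-locals of the calling thread are left untouched.

```ruby
def fiber_local_demo
  Thread.new do
    Thread.current[:foo] = "bar"

    # A new fiber starts with its own, empty fiber-local storage.
    seen_in_fiber = Fiber.new { Thread.current.key?(:foo) }.resume

    [
      Thread.current[:foo],
      Thread.current.key?(:foo),
      Thread.current.keys.include?(:foo),
      seen_in_fiber
    ]
  end.value
end

p fiber_local_demo # ["bar", true, true, false]
```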

Thread.new{
  Thread.current.thread_variable_set(:foo, 1)
  p Thread.current.thread_variable_get(:foo) # => 1
  Fiber.new{
    Thread.current.thread_variable_set(:foo, 2)
    p Thread.current.thread_variable_get(:foo) # => 2
  }.resume
  p Thread.current.thread_variable_get(:foo)   # => 2
}.join

You can see that the thread-local :foo carried over into the fiber and was changed to 2 by the end of the thread.

This example makes use of #thread_variable_set to create new thread-locals, and #thread_variable_get to reference them.

There is also #thread_variables to list all thread-locals, and #thread_variable? to check if a given thread-local exists.
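As a brief sketch of these two query methods (the helper name thread_local_demo is made up), the code below also shows that thread-locals and fiber-locals are stored separately: a name set with #thread_variable_set is not reported by #key?.

```ruby
def thread_local_demo
  Thread.new do
    Thread.current.thread_variable_set(:foo, 1)

    # Thread-locals are visible from any fiber running on this thread.
    seen_in_fiber = Fiber.new { Thread.current.thread_variable?(:foo) }.resume

    [
      Thread.current.thread_variables,
      seen_in_fiber,
      Thread.current.key?(:foo) # fiber-local lookup, a separate store
    ]
  end.value
end

p thread_local_demo # [[:foo], true, false]
```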

Exception handling

When an unhandled exception is raised inside a thread, the thread terminates. By default, this exception will not propagate to other threads. The exception is stored, and when another thread calls #value or #join, the exception will be re-raised in that thread.

t = Thread.new{ raise 'something went wrong' }
t.value #=> RuntimeError: something went wrong
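One way to deal with the re-raised exception is to rescue it around the #join call, as sketched here. The method name failure_message is hypothetical, and the report_on_exception setting assumes Ruby 2.5 or later; it only silences the message that would otherwise be printed to standard error when the thread dies.

```ruby
def failure_message
  thr = Thread.new do
    Thread.current.report_on_exception = false # assumption: Ruby 2.5+
    raise ArgumentError, "something went wrong"
  end

  thr.join # the stored exception is re-raised here, in the calling thread
  nil
rescue ArgumentError => e
  e.message
end

p failure_message # "something went wrong"
```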

An exception can be raised from outside the thread using the Thread#raise instance method, which takes the same parameters as Kernel#raise.
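The sketch below uses Thread#raise to interrupt a sleeping thread, which rescues the exception and returns a value. The method name interrupt_sleeper and the Queue used as a readiness signal are assumptions for the example.

```ruby
def interrupt_sleeper
  ready = Queue.new

  thr = Thread.new do
    begin
      ready << :started # signal from inside the begin block
      sleep             # would sleep forever without the interrupt
    rescue RuntimeError => e
      "rescued: #{e.message}"
    end
  end

  ready.pop            # wait until thr is inside the begin block
  thr.raise("wake up") # like Kernel#raise: a String raises RuntimeError
  thr.value
end

p interrupt_sleeper # "rescued: wake up"
```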

Setting Thread.abort_on_exception = true, Thread#abort_on_exception = true, or $DEBUG = true will cause a subsequent unhandled exception raised in a thread to be automatically re-raised in the main thread.

The class method ::handle_interrupt lets you control when asynchronous exceptions, such as those sent by Thread#raise, are delivered to a thread.
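A hedged sketch of ::handle_interrupt follows. It assumes the :never timing, which defers delivery of the matching exception until the block has finished. The method name deferred_interrupt and the two queues used for coordination are invented for this example.

```ruby
def deferred_interrupt
  log = []
  entered = Queue.new
  proceed = Queue.new

  thr = Thread.new do
    begin
      Thread.handle_interrupt(RuntimeError => :never) do
        entered << true
        proceed.pop # blocks, but the pending RuntimeError stays deferred
        log << :critical_section_done
      end
      # The deferred exception is delivered once the block above exits.
    rescue RuntimeError
      log << :interrupted
    end
  end

  entered.pop       # thr is now inside the protected block
  thr.raise("stop") # queued, not delivered yet
  proceed << true   # let the protected block finish
  thr.join
  log
end

p deferred_interrupt # [:critical_section_done, :interrupted]
```

Masking interrupts like this is mainly useful around short sections that must not be left half-finished, such as releasing a resource.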

Scheduling

Ruby provides a few ways to support scheduling threads in your program.

The first is the class method ::stop, which puts the currently running thread to sleep and schedules the execution of another thread.

Once a thread is asleep, you can use the instance method #wakeup to mark your thread as eligible for scheduling.

You can also try ::pass, which attempts to pass execution to another thread; whether the running thread actually switches depends on the OS. The same goes for #priority, which lets you hint to the thread scheduler which threads should take precedence when passing execution. This method is also dependent on the OS and may be ignored on some platforms.
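The interplay of ::stop and #wakeup can be sketched as follows. The method name stop_and_wakeup is hypothetical, and polling #status with ::pass is just one simple way to wait until the other thread has gone to sleep.

```ruby
def stop_and_wakeup
  events = []

  thr = Thread.new do
    events << :before_stop
    Thread.stop # put this thread to sleep
    events << :after_wakeup
  end

  Thread.pass until thr.status == "sleep" # give thr a chance to reach Thread.stop
  events << :main_sees_sleeping

  thr.wakeup # mark thr as eligible for scheduling again
  thr.join
  events
end

p stop_and_wakeup # [:before_stop, :main_sees_sleeping, :after_wakeup]
```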

Thread Reference

Fiber

Fibers are primitives for implementing lightweight cooperative concurrency in Ruby. Basically they are a means of creating code blocks that can be paused and resumed, much like threads. The main difference is that they are never preempted and that the scheduling must be done by the programmer and not the VM.

Unlike stackless lightweight concurrency models, each fiber comes with a stack. This enables the fiber to be paused from deeply nested function calls within the fiber block. See the ruby(1) manpage to configure the size of the fiber stack(s).
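To illustrate pausing from nested calls, the sketch below walks a small tree recursively and calls Fiber.yield from inside the recursion. The helper names walk and collect_visits, and the hash-based tree layout, are assumptions made for this example.

```ruby
# Recursive helper: yields from however deep the recursion currently is.
def walk(node, depth = 0)
  Fiber.yield [depth, node[:name]]
  node.fetch(:children, []).each { |child| walk(child, depth + 1) }
end

def collect_visits(tree)
  walker = Fiber.new do
    walk(tree)
    nil # final value of the block signals that the walk is over
  end

  visited = []
  while (entry = walker.resume)
    visited << entry
  end
  visited
end

tree = {
  name: "root",
  children: [
    { name: "a", children: [{ name: "a1" }] },
    { name: "b" }
  ]
}

p collect_visits(tree) # [[0, "root"], [1, "a"], [2, "a1"], [1, "b"]]
```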

When a fiber is created it will not run automatically. Rather it must be explicitly asked to run using the Fiber#resume method. The code running inside the fiber can give up control by calling Fiber.yield, in which case it yields control back to the caller (the caller of Fiber#resume).

Upon yielding or termination the Fiber returns the value of the last executed expression.

For instance:

fiber = Fiber.new do
  Fiber.yield 1
  2
end

puts fiber.resume
puts fiber.resume
puts fiber.resume

produces

1
2
FiberError: dead fiber called

The Fiber#resume method accepts an arbitrary number of parameters. If it is the first call to #resume, they will be passed as block arguments. Otherwise they will be the return value of the call to Fiber.yield.

Example:

fiber = Fiber.new do |first|
  second = Fiber.yield first + 2
end

puts fiber.resume 10
puts fiber.resume 1_000_000
puts fiber.resume "The fiber will be dead before I can cause trouble"

produces

12
1000000
FiberError: dead fiber called

Fiber Reference

ThreadGroup

ThreadGroup provides a means of keeping track of a number of threads as a group.

A given Thread object can only belong to one ThreadGroup at a time; adding a thread to a new group will remove it from any previous group.

Newly created threads belong to the same group as the thread from which they were created.
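A minimal sketch of group membership is shown here. The method name group_demo is invented, and the Queue only keeps the thread alive while its group is inspected.

```ruby
def group_demo
  group = ThreadGroup.new
  gate = Queue.new
  thr = Thread.new { gate.pop } # stays alive until released

  # A new thread starts in its creator's group.
  inherited = thr.group == Thread.current.group

  group.add(thr) # moves thr out of its previous group
  moved = thr.group == group &&
          group.list.include?(thr) &&
          !Thread.current.group.list.include?(thr)

  gate << :go
  thr.join
  [inherited, moved]
end

p group_demo # [true, true]
```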

ThreadGroup Reference

Mutex

Mutex implements a simple semaphore that can be used to coordinate access to shared data from multiple concurrent threads.

Example:

semaphore = Mutex.new

a = Thread.new {
  semaphore.synchronize {
    # access shared resource
  }
}

b = Thread.new {
  semaphore.synchronize {
    # access shared resource
  }
}
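As a concrete version of the pattern above, this sketch has several threads increment one shared counter under a Mutex. The method name count_with_mutex and its keyword parameters are assumptions for the example.

```ruby
def count_with_mutex(threads: 4, increments: 1_000)
  lock = Mutex.new
  counter = 0

  workers = Array.new(threads) do
    Thread.new do
      increments.times do
        lock.synchronize { counter += 1 } # only one thread updates at a time
      end
    end
  end

  workers.each(&:join)
  counter
end

p count_with_mutex # 4000
```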

Mutex Reference

ConditionVariable

ConditionVariable objects augment class Mutex. Using condition variables, it is possible to suspend while in the middle of a critical section until a resource becomes available.

Example:

mutex = Mutex.new
resource = ConditionVariable.new
available = false # guards against a signal sent before 'a' starts waiting

a = Thread.new {
   mutex.synchronize {
     # Thread 'a' now needs the resource
     resource.wait(mutex) until available
     # 'a' can now have the resource
   }
}

b = Thread.new {
   mutex.synchronize {
     # Thread 'b' has finished using the resource
     available = true
     resource.signal
   }
}

ConditionVariable Reference

Queue

The Queue class implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class implements all the required locking semantics.

The class implements a FIFO (first in, first out) queue. In a FIFO queue, the first tasks added are the first retrieved.

Example:

queue = Queue.new

producer = Thread.new do
  5.times do |i|
     sleep rand(i) # simulate expense
     queue << i
     puts "#{i} produced"
  end
end

consumer = Thread.new do
  5.times do |i|
     value = queue.pop
     sleep rand(i/2) # simulate expense
     puts "consumed #{value}"
  end
end

consumer.join
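Building on the producer and consumer above, the following sketch distributes work to a small pool of threads through one Queue and collects results through another. The method name square_all is hypothetical, and the example assumes Ruby 2.3 or later for Queue#close, after which #pop returns nil once the queue is drained.

```ruby
def square_all(numbers, workers: 3)
  jobs = Queue.new
  results = Queue.new

  numbers.each { |n| jobs << n }
  jobs.close # assumption: Ruby 2.3+; pop returns nil when closed and empty

  pool = Array.new(workers) do
    Thread.new do
      while (n = jobs.pop)
        results << n * n
      end
    end
  end
  pool.each(&:join)

  # Workers finish in no particular order, so sort before returning.
  Array.new(results.size) { results.pop }.sort
end

p square_all([1, 2, 3, 4]) # [1, 4, 9, 16]
```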

Queue Reference

SizedQueue

This class represents queues with a specified maximum capacity. The push operation blocks when the queue is full.

See Queue for an example of how a SizedQueue works.
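For a sketch specific to SizedQueue, the code below shows a non-blocking push failing on a full queue, and a producer that is throttled by the capacity while a consumer drains it. The method names try_push and bounded_transfer are made up; the example assumes Ruby 2.3 or later for #close and that no item is nil or false, since the consumer loop stops on the first falsy value.

```ruby
# Non-blocking push: returns false instead of waiting when the queue is full.
def try_push(queue, item)
  queue.push(item, true) # second argument is non_block
  true
rescue ThreadError
  false
end

def bounded_transfer(items, capacity: 2)
  queue = SizedQueue.new(capacity)

  producer = Thread.new do
    items.each { |item| queue << item } # blocks while the queue is full
    queue.close                         # assumption: Ruby 2.3+
  end

  received = []
  while (item = queue.pop) # nil once the queue is closed and empty
    received << item
  end
  producer.join
  received
end

small = SizedQueue.new(1)
p try_push(small, :first)              # true
p try_push(small, :second)             # false, the queue is full
p bounded_transfer([1, 2, 3, 4, 5])    # [1, 2, 3, 4, 5]
```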

SizedQueue Reference