21. Конкурентност

21. Конкурентност

21. Конкурентност

8 януари 2017

Днес

Втори тест

Седма задача

Disclaimer

IO-bound vs. CPU-bound

Processes vs. Threads (Green & Native)

Паралелизация на многоядрени процесори

Нишки

GVL/GIL?

Импликации от съществуването на GVL

Shellout и GVL

Бъдещето на GVL

1. Паралелизация с процеси

Първият начин за паралелизация и фоново изпълнение на задачи, който ще разгледаме, е базиран на процеси.

Процеси в ОС

crash-course

fork

fork в Ruby

child_pid = fork

if child_pid
  puts 'This is the parent'
else
  puts 'This is the child speaking!'
end

puts 'This is executed in both the parent and the child.'

fork (2)

Има и друга версия, която се възползва от удобството на блоковете:

fork do
  puts 'This is the child'
end

puts 'This is the parent'

module Process

Всъщност, повечето неща са в модул Process:

Process.fork
Process.wait
Process.waitall
Process.waitpid

Process.wait

Process.wait чака някое от децата да приключи и връща pid-а му, а $? съдържа Process::Status.

fork { exit 99 } # => 22475
Process.wait     # => 22475
$?.exitstatus    # => 99

Process.wait2

Process.wait2 е сходно, но връща масив от pid и Process::Status:

fork { exit 99 } # => 22482
Process.wait2    # => [22482, #<Process::Status: pid 22482 exit 99>]

Process.waitpid

Process.waitpid(pid) чака дадено конкретно дете да приключи:

pid = fork do
  puts "Child: I'm starting..."
  sleep 1
  puts "Child: I'm done."
end

puts 'Parent: Child running. Waiting for it to complete.'
Process.waitpid(pid)
puts 'Parent: Child is done.'

Process.waitall

Process.waitall чака всички деца да приключат

fork { sleep 1; puts '1' }
fork { sleep 2; puts '2' }

puts '0'
Process.waitall
puts '3'

Process.exec

Process.exec заменя текущия процес с изпълнение на команда:

fork do
  exec 'date'
  puts 'Unreachable code'
end

Process.daemon

Process.daemon "откача" процеса от терминала и го пуска в background.

fork do
  Process.daemon
  loop do
    system 'echo Spam >> ~/spam'
    sleep 1
  end
end

Process.methods

2. Паралелизация с нишки

Втори начин за паралелизация и фоново изпълнение на задачи е употребата на нишки.

Thread.new

Създаването на нишка в Ruby е лесно:

thread = Thread.new do
  puts 'This is run in a separate new thread'
  sleep 1
  puts 'The thread is started immediatelly'
end

puts 'This is run in the main thread'

Thread#join

Процесът приключва, когато основната нишка приключи. Ако искате да изчакате някоя от създадените нишки да приключи преди процесът да излезе, ползвайте Thread#join.

thread = Thread.new do
  puts 'This is run in the thread'
  sleep 1
  puts 'The thread is started immediatelly'
end

puts 'This is run in the main thread'
thread.join

Thread#value

Thread#value блокира основната нишка, докато поднишката не приключи и върне последния оценен израз

thread = Thread.new do
  2 + 2
end

# Can be called multiple times, will block only on the first call.
thread.value # => 4
thread.value # => 4

Изключения

Ако една нишка предизвика изключение, то няма да убие интерпретатора. Вместо това, ще се появи в нишката, извикваща #value или #join.

thread = Thread.new do
  raise 'Oh noes!'
end

thread.join # => error: RuntimeError

Изключения (2)

Можете да промените последното с Thread.abort_on_exception.

Thread.abort_on_exception = true

Thread.methods

Thread#priority

Променливи (1)

Променливи, дефинирани в блока на нишката, са (очевидно) локални за нея:

thread = Thread.new { something = 1 }
thread.join

something # => error: NameError

Променливи (2)

Блокът на нишката вижда променливите отвън:

answer = 1
thread = Thread.new { answer = 2 }

thread.join
answer # => 2

Променливи (3)

Можете да подавате стойности на нишката през Thread.new

n = 10
thread = Thread.new(n) do |number|
  n      # => 20
  number # => 10
end
n = 20

thread.join

Променливи (4)

Всяка нишка функционира като хеш от символи. Така може да правите thread-local променливи – нещо като глобални променливи, но в рамките на една (текущата) нишка:

Thread.current[:x] = 10
thread = Thread.new do
  Thread.current[:x] # => nil
  Thread.current[:x] = 20
end
thread.join
Thread.current[:x]   # => 10

Thread-local променливи

Thread-local променливи (2)

Ето примерна имплементация на I18n.locale методите:

class I18n
  class << self
    def locale
      Thread.current[:locale]
    end

    def locale=(new_locale)
      Thread.current[:locale] = new_locale
    end
  end
end

Thread-safety

Конкурентният достъп до споделени ресурси създава проблеми, когато операциите по промяна на ресурс не са атомарни. Нека разгледаме един пример:

username = 'larodi'

50.times do
  Thread.new do
    unless User.username_taken?(username)
      User.create username: username
    end
  end
end

Какво става тук? Няколко нишки могат да изпълнят username_taken?, преди да се е стигнало до създаване на потребител и да решат, че няма проблем да създадат такъв, понеже потребителското име е свободно и хоп – дублирани данни.

Синхронизация на нишки

Mutex (1)

Mutex (2)

Примерна употреба:

user_creation_mutex = Mutex.new

username = 'larodi'

50.times do
  Thread.new do
    user_creation_mutex.lock
    unless User.username_taken?(username)
      User.create username: username
    end
    user_creation_mutex.unlock
  end
end

Mutex (3)

Има и по-удобна форма, приемаща блок:

user_creation_mutex = Mutex.new

username = 'larodi'

50.times do
  Thread.new do
    user_creation_mutex.synchronize do
      unless User.username_taken?(username)
        User.create username: username
      end
    end
  end
end

Обърнете внимание, че ако възникне изключение в блока, подаден на synchronize, mutex-ът ще бъде коректно отключен.

Писане на thread-safe код

Thread-safe структури от данни

Producer-consumer

Класически "конкуретен" проблем:

In computing, the producer–consumer problem (also known as the bounded-buffer problem) is a classic example of a multi-process synchronization problem. The problem describes two processes, the producer and the consumer, who share a common, fixed-size buffer used as a queue. The producer's job is to generate a piece of data, put it into the buffer and start again. At the same time, the consumer is consuming the data (i.e., removing it from the buffer) one piece at a time. The problem is to make sure that the producer won't try to add data into the buffer if it's full and that the consumer won't try to remove data from an empty buffer.

Queue

Queue - пример

queue = Queue.new

producer = Thread.new do
  loop { queue << 'LOOK MA, I AM PRODUCING STUFF!' }
end

consumer = Thread.new do
  loop { puts "Consumed #{queue.pop}" }
end

SizedQueue

Producer-consumer с три компонента

case study над реален пример

Producer-consumer с три компонента - реализация

Всяка виртуална машина стартира и прави следното в този ред:

Producer-consumer с три компонента - особености

Producer-consumer с три компонента - пример (1/4)

class ConvertCoordinator
  def initialize
    @convert_queue = SizedQueue.new(1)
    @upload_queue  = Queue.new
    @threads       = []
  end

  def run
    start_downloading_files
    start_converting_downloaded_files
    start_uploading_converted_files

    # Wait for all threads to finish
    @threads.each(&:join)
  end
end

ConvertCoordinator.new.run

Producer-consumer с три компонента - пример (2/4)

class ConvertCoordinator
  def start_downloading_files
    @threads << Thread.new do
      loop do
        # Fetch a file to convert and put it in the convert queue.
        # This will block if the convert queue is full.
        @convert_queue << "some data to convert"
      end
    end
  end
end

Producer-consumer с три компонента - пример (3/4)

class ConvertCoordinator
  def start_converting_downloaded_files
    @threads << Thread.new do
      loop do
        # Look for files to convert.
        # This will block if the convert queue is empty.
        file = @convert_queue.pop

        # Convert the file
        converted_file = file # system('ffmpeg ...')

        # Put the converted file it in the upload queue.
        @upload_queue << converted_file
      end
    end
  end
end

Producer-consumer с три компонента - пример (4/4)

class ConvertCoordinator
  def start_uploading_converted_files
    @threads << Thread.new do
      loop do
        # Look for converted files, ready to be uploaded.
        # This will block if the upload queue is empty.
        converted = @upload_queue.pop

        # Upload the file
      end
    end
  end
end

Имплементация на Queue

Имплементация на Queue - пример

Въпроси