system
Първият начин за паралелизация и фоново изпълнение на задачи, който ще разгледаме, е базиран на процеси.
fork
съществува от зората на операционните системи и е системен примитив
fork
child_pid = fork
if child_pid
puts 'This is the parent'
else
puts 'This is the child speaking!'
end
puts 'This is executed in both the parent and the child.'
Има и друга версия, която се възползва от удобството на блоковете:
fork do
puts 'This is the child'
end
puts 'This is the parent'
Всъщност, повечето неща са в модул Process
:
Process.fork
Process.wait
Process.waitall
Process.waitpid
Process.wait
чака някое от децата да приключи и връща pid-а му,
а $?
съдържа Process::Status
.
fork { exit 99 } # => 22475
Process.wait # => 22475
$?.exitstatus # => 99
Process.wait2
е сходно, но връща масив от pid и Process::Status
:
fork { exit 99 } # => 22482
Process.wait2 # => [22482, #<Process::Status: pid 22482 exit 99>]
Process.waitpid(pid)
чака дадено конкретно дете да приключи:
pid = fork do
puts "Child: I'm starting..."
sleep 1
puts "Child: I'm done."
end
puts 'Parent: Child running. Waiting for it to complete.'
Process.waitpid(pid)
puts 'Parent: Child is done.'
Process.waitall
чака всички деца да приключат
fork { sleep 1; puts '1' }
fork { sleep 2; puts '2' }
puts '0'
Process.waitall
puts '3'
Process.exec
заменя текущия процес с изпълнение на команда:
fork do
exec 'date'
puts 'Unreachable code'
end
Process.daemon
"откача" процеса от терминала и го пуска в background.
fork do
Process.daemon
loop do
system 'echo Spam >> ~/spam'
sleep 1
end
end
Process.pid
връща process id на текущия процес
Process.ppid
връща parent process id
getpgid
, gid
, setpgid
, uid
и т.н.
spawn
е швейцарско ножче за пускане на процеси
fork
не е наличен на всички ОС и за някои цели се препоръчва spawn
Втори начин за паралелизация и фоново изпълнение на задачи е употребата на нишки.
Създаването на нишка в Ruby е лесно:
thread = Thread.new do
puts 'This is run in a separate new thread'
sleep 1
puts 'The thread is started immediatelly'
end
puts 'This is run in the main thread'
Процесът приключва, когато основната нишка приключи. Ако искате да изчакате
някоя от създадените нишки да приключи преди процесът да излезе, ползвайте
Thread#join
.
thread = Thread.new do
puts 'This is run in the thread'
sleep 1
puts 'The thread is started immediatelly'
end
puts 'This is run in the main thread'
thread.join
Thread#value
блокира основната нишка, докато поднишката не приключи и върне последния
оценен израз
thread = Thread.new do
2 + 2
end
# Can be called multiple times, will block only on the first call.
thread.value # => 4
thread.value # => 4
Ако една нишка предизвика изключение, то няма да убие интерпретатора. Вместо това,
ще се появи в нишката, извикваща #value
или #join
.
thread = Thread.new do
raise 'Oh noes!'
end
thread.join # => error: RuntimeError
Можете да промените последното с Thread.abort_on_exception
.
Thread.abort_on_exception = true
Thread.main
връща основната нишка
Thread.current
връща текущата нишка
Thread.list
връща всички нишкиThread#priority
и
Thread#priority=
Променливи, дефинирани в блока на нишката, са (очевидно) локални за нея:
thread = Thread.new { something = 1 }
thread.join
something # => error: NameError
Блокът на нишката вижда променливите отвън:
answer = 1
thread = Thread.new { answer = 2 }
thread.join
answer # => 2
Можете да подавате стойности на нишката през Thread.new
n = 10
thread = Thread.new(n) do |number|
n # => 20
number # => 10
end
n = 20
thread.join
Всяка нишка функционира като хеш от символи. Така може да правите thread-local променливи – нещо като глобални променливи, но в рамките на една (текущата) нишка:
Thread.current[:x] = 10
thread = Thread.new do
Thread.current[:x] # => nil
Thread.current[:x] = 20
end
thread.join
Thread.current[:x] # => 10
i18n
I18n.locale
и може да се променя с I18n.locale=(new_locale)
I18n.locale
вътрешно ползва Thread.current
Ето примерна имплементация на I18n.locale
методите:
class I18n
class << self
def locale
Thread.current[:locale]
end
def locale=(new_locale)
Thread.current[:locale] = new_locale
end
end
end
Конкурентният достъп до споделени ресурси създава проблеми, когато операциите по промяна на ресурс не са атомарни. Нека разгледаме един пример:
username = 'larodi'
50.times do
Thread.new do
unless User.username_taken?(username)
User.create username: username
end
end
end
Какво става тук? Няколко нишки могат да изпълнят username_taken?
,
преди да се е стигнало до създаване на потребител и да решат, че няма проблем да
създадат такъв, понеже потребителското име е свободно и хоп – дублирани данни.
lock
и unlock
lock
: Ако mutex-ът е отключен го заключва и продължава нататък
lock
: Ако mutex-ът е заключен, приспива нишката докато се отключи, след това прави горното
unlock
: Отключва mutex-а
locked?
, try_lock
и други
Примерна употреба:
user_creation_mutex = Mutex.new
username = 'larodi'
50.times do
Thread.new do
user_creation_mutex.lock
unless User.username_taken?(username)
User.create username: username
end
user_creation_mutex.unlock
end
end
Има и по-удобна форма, приемаща блок:
user_creation_mutex = Mutex.new
username = 'larodi'
50.times do
Thread.new do
user_creation_mutex.synchronize do
unless User.username_taken?(username)
User.create username: username
end
end
end
end
Обърнете внимание, че ако възникне изключение в блока, подаден на synchronize
, mutex-ът ще бъде коректно отключен.
Класически "конкуретен" проблем:
In computing, the producer–consumer problem (also known as the bounded-buffer problem) is a classic example of a multi-process synchronization problem. The problem describes two processes, the producer and the consumer, who share a common, fixed-size buffer used as a queue. The producer's job is to generate a piece of data, put it into the buffer and start again. At the same time, the consumer is consuming the data (i.e., removing it from the buffer) one piece at a time. The problem is to make sure that the producer won't try to add data into the buffer if it's full and that the consumer won't try to remove data from an empty buffer.
Queue
push
и pop
pop
ще "блокира" нишката, която го извиква, ако опашката е празна
queue = Queue.new
producer = Thread.new do
loop { queue << 'LOOK MA, I AM PRODUCING STUFF!' }
end
consumer = Thread.new do
loop { puts "Consumed #{queue.pop}" }
end
Queue
, но с лимит в размера
push
ще блокира, ако опашката е пълнаQueue
и SizedQueue
Всяка виртуална машина стартира и прави следното в този ред:
SizedQueue
- да ограничим размера на буфера, който изтеглямеclass ConvertCoordinator
def initialize
@convert_queue = SizedQueue.new(1)
@upload_queue = Queue.new
@threads = []
end
def run
start_downloading_files
start_converting_downloaded_files
start_uploading_converted_files
# Wait for all threads to finish
@threads.each(&:join)
end
end
ConvertCoordinator.new.run
class ConvertCoordinator
def start_downloading_files
@threads << Thread.new do
loop do
# Fetch a file to convert and put it in the convert queue.
# This will block if the convert queue is full.
@convert_queue << "some data to convert"
end
end
end
end
class ConvertCoordinator
def start_converting_downloaded_files
@threads << Thread.new do
loop do
# Look for files to convert.
# This will block if the convert queue is empty.
file = @convert_queue.pop
# Convert the file
converted_file = file # system('ffmpeg ...')
# Put the converted file it in the upload queue.
@upload_queue << converted_file
end
end
end
end
class ConvertCoordinator
def start_uploading_converted_files
@threads << Thread.new do
loop do
# Look for converted files, ready to be uploaded.
# This will block if the upload queue is empty.
converted = @upload_queue.pop
# Upload the file
end
end
end
end
Queue
и SizedQueue
в Ruby могат да се имплементират чрез примитиви на по-ниско ниво:
Monitor
, MonitorMixin
и ConditionVariable
Queue
, която се използва за "басейн" от връзки към базата данни (connection pool)
Monitor
, MonitorMixin
и ConditionVariable