並列処理を書いてみる
今年に入ってから全く趣味のプログラミングができていないのだが、副業としては幸い続いている。
今の案件では大量の商品データをCSVでダウンロードして、それらをRailsのレコードとして保存することでSQL操作を行いながらデータ整形やら分析やらのタスクをこなしている。
商品データには画像のURLがあるのだが、そもそも画像URLが設定されていない論外な商品データもある。そして画像URLそのものが404
を返すものが見受けられたのだ。
有効な商品全体だけでもなんと70万件以上もあるので、手作業でこれらのファイルをひとつひとつ確認するのは難しいだろう。
サムネイル画像になんとなくネズミのようなキャラを採用してみたのだけれども、今回Go言語は全く関係ないのであしからず。
🔍 もとになったコード
そして私はスクレイピングくらいであれば片手間で書くことはできるのだが、それまでの手法はループで1秒ごとに繰り返し行うものだ。
class ProductTask
def initialize
@products = Product.where(http_code: nil)
@progress = ProgressBar.create(total: @products.count)
end
def perform
@products.find_each do |product|
begin
response = HTTParty.head(product.image)
sleep 1
product.update(http_code: response.code)
rescue StandardError => e
puts "Error fetching image for product #{product.handle}: #{e.message}"
ensure
@progress.increment
end
end
end
end
このコードの実行時間を置き換えると:
なんと約194時間と26分。
つまり約8日と2時間で1週間ずっと回し続けてようやく完了するかもしれないようだ。
全くもって不可能とも言い切れないのだが、これでは今回の納期に間に合わせるのは難しいだろう。
そこで並列処理を書いてみようと思ったわけだ。
🔧 並列処理付きの改善コード
なんの変哲もないコードであるのだが、これが8日のスクリプトである。
ChatGPTを経由することで次のように仕上がった:
require 'httparty'
require 'thread'
require 'concurrent-ruby'
class ProductHttpCodeTask
THREAD_COUNT = 5 # DBのpoolサイズに合わせて調整(例: database.ymlでpool: 5)
def initialize
@products = Product.where(http_code: nil)
@progress = ProgressBar.create(total: @products.count)
@queue = Queue.new
@products.find_each { |product| @queue << product }
end
def perform
threads = []
THREAD_COUNT.times do
threads << Thread.new do
ActiveRecord::Base.connection_pool.with_connection do
while !@queue.empty?
product = @queue.pop(true) rescue nil
next unless product
begin
response = HTTParty.head(product.image)
sleep 1
product.update(http_code: response.code)
rescue StandardError => e
puts "Error fetching image for product #{product.handle}: #{e.message}"
ensure
@progress.increment
end
end
end
end
end
threads.each(&:join)
end
end
✅ ポイント
- 並列数はDBのpoolサイズ以下にする
- 各スレッドは自分でDB接続を管理する必要がある
- スレッド内で
ActiveRecord
を使う場合、明示的にActiveRecord::Base.connection_pool.with_connection
を使う
特にスレッドの概念を深く理解していなくても、ChatGPTにとってはこのようなコードは朝飯前のようだった。
実に頼もしいが、このコードはスレッドによって管理されたプログラムがキューに溜まっているコードがなくなるまで実行し続ける。
1秒ごとのウェイトを設けてはいるが、いざ動かしてログの様子を眺めていると感覚的にはほとんどウェイトを介さずにリクエストを送信しているようにも思える。
本来であればこれらのリクエストは5台のPCから分散して実行したいものでもあるが、今回はそこまで高度なトピックは扱わないでおく。
興味深いのはProgressBar
というgemはスレッド内でincrement
メソッドを実行しても動作に問題はなさそうだということだ。
これがいわゆるスレッドセーフという概念なのだろう。
⌚ 実行時間
肝心の実行時間ではあるが、HTTP通信を行わずに2,000件のレコードを使ってupdate
をしてみた。
実行したところ、完了まで6分45秒かかった。
- 2,000件 → 6分45秒(= 405秒)
- 全体件数:705,671件
① 単位あたりの処理時間(1件あたり)
2000 / 405 秒 = 0.2025 秒/件
② 全体の予測時間
705,671 × 0.2025 = 142,393.1775 秒
③ 時間・分に変換
142,393 / 60 ≈ 2,373.2 分
2,373.2 / 60 ≈ 39.55 時間
✅ 結論
およそ 39時間33分 かかると予想されます。
とはいえこれはおよそ0.2秒の操作を70万回繰り返した時間とほぼ同じ値であり、1/5秒はつまり0.2秒だと考えると単純にスレッドを使わずに0.2秒のループを設定すればよかっただけではないかとすら思えてしまう。
もちろん現実の処理がすべて計算通りに終わる保証はないし、あまりに高頻度でHTTP操作を行っているのだから本来200
を返すファイルがもっと遅いレスポンスを返したり、最悪のケースはそもそも通信を遮断されてしまう可能性すら考えうる。
結局のところ、およそ2日後あるいはそれよりも先か後に結果が出るだろう。
今回は実験的な要素が強くて実用性はないのかもしれないが、並列処理も今後は取り組んでいきたい。
そのためにもGo言語を今年こそ取り組むべきかもしれないが、それはまた別の機会にしようと思う。