サムネイル画像

昔Node.jsでファイルアップローダーを作った頃の投稿をしていた記憶があるのだけれども、そのときはfs.createWriteStreamを使えば難しいところはすべてRailsやfetchがなんとかしてくれた記憶がある。

ところが今回はGoで書かなければならない。

Goにはfs.createWriteStreamのような関数が用意されてはいないと思うのだけれども、やはりChatGPTを頼ればなんとなくそれっぽいものを用意してくれるので今回もLLM様様である。

1回目:

package main

import (
  "bytes"
  "fmt"
  "io/ioutil"
  "net/http"
  "os"
)

func main() {
  filePath := "movie.mp4"
  fileData, err := ioutil.ReadFile(filePath)
  if err != nil {
    panic(err)
  }

  req, err := http.NewRequest("POST", "http://your-rails-server/upload",
                              bytes.NewReader(fileData))
  if err != nil {
    panic(err)
  }

  req.Header.Set("Content-Type", "application/octet-stream")
  req.Header.Set("X-Filename", "movie.mp4")

  client := &http.Client{}
  resp, err := client.Do(req)
  if err != nil {
    panic(err)
  }
  defer resp.Body.Close()

  body, _ := ioutil.ReadAll(resp.Body)
  fmt.Println("Response:", string(body))
}
class UploadsController < ApplicationController
  skip_before_action :verify_authenticity_token

  def create
    file_data = request.body.read
    filename = request.headers["X-Filename"] || "uploaded_file"
    File.open(Rails.root.join("public", "uploads", filename), "wb") do |f|
      f.write(file_data)
    end
    render json: { status: "ok", filename: filename }
  end
end

とりあえず初回はContent-Typeを指定してみましたくらいのノリと、アップロードしたファイルをそのまま保存しましたみたいな書き方である。 これはこれで興味深いけれども、このコードはRAMに優しくない実装なので却下。

2回目:

package main

import (
  "bufio"
  "fmt"
  "net/http"
  "os"
  "strconv"
)

func main() {
  filePath := "movie.mp4"
  file, err := os.Open(filePath)
  if err != nil {
    panic(err)
  }
  defer file.Close()

  reader := bufio.NewReader(file)
  chunkSize := 1024
  buffer := make([]byte, chunkSize)
  chunkIndex := 0

  for {
    n, err := reader.Read(buffer)
    if n == 0 {
      break
    }

    isLast := err != nil

    req, err := http.NewRequest("POST",
                                "http://your-rails-server/upload",
                                bufio.NewReaderSize(bufio.NewReader(file), n))
    if err != nil {
      panic(err)
    }
    req.Body = http.NoBody // 実際は以下で送る(修正後コードを下に記載)

    // ヘッダー
    req.Header.Set("Content-Type", "application/octet-stream")
    req.Header.Set("X-Filename", "movie.mp4")
    req.Header.Set("X-Chunk-Index", strconv.Itoa(chunkIndex))
    req.Header.Set("X-Last-Chunk", strconv.FormatBool(isLast))

    // 正しいbody設定
    req.Body = ioutil.NopCloser(bytes.NewReader(buffer[:n]))

    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
      panic(err)
    }
    resp.Body.Close()

    chunkIndex++
    if isLast {
      break
    }
  }

  fmt.Println("Upload complete.")
}
class UploadsController < ApplicationController
  skip_before_action :verify_authenticity_token

  def create
    filename = request.headers["X-Filename"]
    chunk_index = request.headers["X-Chunk-Index"].to_i
    is_last = request.headers["X-Last-Chunk"] == "true"

    tmp_path = Rails.root.join("tmp", "#{filename}.uploading")
    File.open(tmp_path, "ab") do |f|
      f.write(request.body.read)
    end

    if is_last
      final_path = Rails.root.join("public", "uploads", filename)
      FileUtils.mkdir_p(File.dirname(final_path))
      FileUtils.mv(tmp_path, final_path)
      render json: { status: "completed", filename: filename }
    else
      render json: { status: "chunk_received", chunk_index: chunk_index }
    end
  end
end

実装量は増えたが一気にそれっぽくなった。 Goを通すとNode.jsで隠ぺいされていた処理が見えてきた。

ただしこのコードを実際に動かしてみると 1024バイト ごと送信するので全く終わらないのと、サーバー側は無数の200のログで埋まってしまう。

3回目:

この時点ではChatGPTの提案にのって並列アップロードを試した。

Goroutineはまだ自分では書けないが、なんとなく読めるようにはなった気がする。

package main

import (
  "bytes"
  "fmt"
  "io"
  "net/http"
  "os"
  "strconv"
  "sync"

  "github.com/google/uuid"
)

const chunkSize = 1024 * 1024 // 1MB
const uploadURL = "http://your-rails-server/upload"
const fileName = "movie.mp4"

func main() {
  file, err := os.Open(fileName)
  if err != nil {
    panic(err)
  }
  defer file.Close()

  uploadID := uuid.New().String()
  var chunks [][]byte
  for {
    buf := make([]byte, chunkSize)
    n, err := file.Read(buf)
    if n == 0 {
      break
    }
    chunk := make([]byte, n)
    copy(chunk, buf[:n])
    chunks = append(chunks, chunk)
    if err == io.EOF {
      break
    }
  }

  var wg sync.WaitGroup
  for i, chunk := range chunks {
    wg.Add(1)
    go func(index int, data []byte, isLast bool) {
      defer wg.Done()

      req, err := http.NewRequest("POST", uploadURL, bytes.NewReader(data))
      if err != nil {
        fmt.Println("Request error:", err)
        return
      }
      req.Header.Set("Content-Type", "application/octet-stream")
      req.Header.Set("X-Filename", fileName)
      req.Header.Set("X-Upload-Id", uploadID)
      req.Header.Set("X-Chunk-Index", strconv.Itoa(index))
      req.Header.Set("X-Last-Chunk", strconv.FormatBool(isLast))

      resp, err := http.DefaultClient.Do(req)
      if err != nil {
        fmt.Printf("Chunk %d failed: %v\n", index, err)
        return
      }
      defer resp.Body.Close()

      if resp.StatusCode != 100 {
        fmt.Printf("Unexpected response for chunk %d: %s\n",
                   index, resp.Status)
      } else {
        fmt.Printf("Chunk %d uploaded successfully\n", index)
      }
    }(i, chunk, i == len(chunks)-1)
  }

  wg.Wait()
  fmt.Println("All chunks uploaded.")
}
class UploadsController < ApplicationController
  skip_before_action :verify_authenticity_token

  def create
    upload_id   = request.headers["X-Upload-Id"]
    filename    = request.headers["X-Filename"]
    chunk_index = request.headers["X-Chunk-Index"]
    is_last     = request.headers["X-Last-Chunk"] == "true"

    unless upload_id && chunk_index && filename
      return head :bad_request
    end

    tmp_dir = Rails.root.join("tmp", "uploads", upload_id)
    FileUtils.mkdir_p(tmp_dir)

    chunk_path = tmp_dir.join("chunk_#{chunk_index}")
    File.open(chunk_path, "wb") { |f| f.write(request.body.read) }

    if is_last
      final_path = Rails.root.join("public", "uploads", filename)
      FileUtils.mkdir_p(final_path.dirname)

      File.open(final_path, "wb") do |f|
        chunks = Dir[tmp_dir.join("chunk_*")]
        chunks = chunks.sort_by { |path| path.split("_").last.to_i }

        chunks.each do |chunk_file|
          f.write(File.binread(chunk_file))
        end
      end

      FileUtils.rm_rf(tmp_dir)
      render json: { status: "completed", filename: filename }, status: :ok
    else
      head 100
    end
  end
end

ただし理論上はうまく行きそうなコードでも実際は1MB程度のファイルにしかなっていなかった。

次のようなエラーも表示されている:

Chunk 283 failed: Post "http://localhost:3000/upload": net/http: HTTP/1.x transport connection broken: unexpected EOF

これはRailsというよりはPumaの同時接続数の都合なのだろうかと仮定を立てたのだけれども、並列化には失敗した。

スレッドセーフの実装や、チャンクそのもののハッシュ化などいろいろ勉強にはなったのだが、いずれも成功できなかった。

まとめ

それからさらに試してみたところ、Goのhttpクライアントに100のステータスコードを設定していた変更が本来の挙動を狂わせてしまう原因だったようだ。

サーバー側のチャンク処理を単純に200を返すように戻したらその後は問題なくアップロードすることができた。

並列処理そのものは興味深いものではあるのだが、今回はどちらかというと単純な実装でGoの理解を深めるほうがよいと判断したのでこれ以上の深追いはやめた。

サーバー側はRubyで処理が追いやすいとはいえ、コントローラー側の処理が多すぎる気もする。

いずれ改良版を出せればよいけれども、今回は分割ダウンロード自体は実現できたので一旦はよしとしようと思う。