読者です 読者をやめる 読者になる 読者になる

テストステ論

高テス協会会長が, テストステロンに関する情報をお届けします.

(akashic report) 進捗報告: Multipart Uploadをサポートしました

私が現在, 熱心に開発しているakashic-storageは, finagle/finch上に作られたFS-S3互換レイヤーです. 2月末のリリースを目指しています.
finchというフレームワークは素晴らしいのですが, 使っているアプリケーションはあまりありません. また, Scalaで書かれたストレージソフトウェアというものもあまり存在しないため, 価値のあるソフトウェアです.

GitHub - akiradeveloper/akashic-storage: Akashic records implementation in Scala

Multipart UploadはS3の目玉機能の一つです.

サーバに巨大なファイルを送るとき, 巨大なファイルを一気に送ることが出来ないこともありますし, そもそも, ネットワークが不安定な場合, 途中まで送ったのに中断されてしまいまた最初からやり直しということになりかねません. そのために, 巨大なファイルを細切れにして失敗するにしてもリスクを下げるというのがMultipart Uploadのもたらす効果の一つです.

もう一つは性能です. 送りたいファイルの細切れを並列にリクエストすることが可能で, これにより性能向上が出来ます.

今回, akashic-storageはMultipart Uploadを実装しました. 以下のテストをパスします.

  test("multipart upload (lowlevel)") { p =>
    import p._

    client.createBucket("myb")

    createLargeFile(LARGE_FILE_PATH)
    val upFile = LARGE_FILE_PATH.toFile

    val initReq = new InitiateMultipartUploadRequest("myb", "myobj")
    val initRes = client.initiateMultipartUpload(initReq)
    assert(initRes.getBucketName === "myb")
    assert(initRes.getKey === "myobj")
    assert(initRes.getUploadId.length > 0)

    import scala.collection.mutable
    val partEtags = mutable.ListBuffer[PartETag]()
    val contentLength = upFile.length

    var i = 1
    var filePos: Long = 0
    while (filePos < contentLength) {
      val partSize = Math.min(contentLength - filePos, 5 * 1024 * 1024)

      val uploadReq = new UploadPartRequest()
        .withBucketName("myb")
        .withKey("myobj")
        .withUploadId(initRes.getUploadId())
        .withPartNumber(i)
        .withFileOffset(filePos)
        .withFile(upFile)
        .withPartSize(partSize)

      val res = client.uploadPart(uploadReq)
      assert(res.getETag.size > 0)
      assert(res.getPartNumber === i)
      assert(res.getPartETag.getETag.size > 0)
      assert(res.getPartETag.getPartNumber === i)

      partEtags.add(res.getPartETag)

      i += 1
      filePos += partSize
    }

    val compReq = new CompleteMultipartUploadRequest(
      "myb",
      "myobj",
      initRes.getUploadId(),
      partEtags
    )
    val compRes = client.completeMultipartUpload(compReq)
    assert(compRes.getETag.size > 0)
    assert(compRes.getBucketName === "myb")
    assert(compRes.getKey === "myobj")
    assert(compRes.getVersionId === "null")
    assert(compRes.getLocation === s"http://${server.address}/myb/myobj")

    val obj = client.getObject("myb", "myobj")
    checkFileContent(obj, upFile)
  }

  test("multipart upload (highlevel)") { p =>
    import p._

    client.createBucket("myb")

    createLargeFile(LARGE_FILE_PATH)
    val upFile = LARGE_FILE_PATH.toFile

    val tmUp = new TransferManager(client)
    val upload = tmUp.upload("myb", "myobj", upFile)
    upload.waitForCompletion()
    tmUp.shutdownNow(false) // shutdown s3 client = false

    val obj = client.getObject("myb", "myobj")
    checkFileContent(obj, upFile)

    val tmDown = new TransferManager(client)
    val downFile = Paths.get("/tmp/akashic-storage-test-large-file-download").toFile
    val download = tmDown.download(new GetObjectRequest("myb", "myobj"), downFile)
    download.waitForCompletion()
    tmDown.shutdownNow(false)

    assert(IOUtils.contentEquals(
      Files.newInputStream(upFile.toPath),
      Files.newInputStream(downFile.toPath)
    ))
  }