Alpakka で AWS SQS 接続 (Scala)

f:id:o8yamazakiy:20210302152657p:plain

はじめに

こんにちは、オープンエイトの山崎です。

今回は、Akka Streams で Alpakka を使って AWS SQS と接続しデータを送受信する方法について説明します。また本記事内では AWS SQS をローカル環境でシミュレートするために ElasticMQ を使用します。ElasticMQ のセットアップについてもあわせて簡単に説明します。記事執筆時の環境は以下の通りです。

  • Scala 2.13.5
  • sbt 1.4.7
  • Akka 2.6.13
  • Alpakka SQS 2.0.2
  • ElasticMQ 1.1.0

オープンエイトでは、SNS 配信効果測定サービス Insight BRAIN において内部のジョブ管理に SQS を利用していますが、その接続処理において実際に Akka Streams と Alpakka を使用しています。

Alpakka

Alpakka とは

Akka Streams で様々なリソースと接続するためのコンポーネント群を提供するプロジェクトが Alpakka です。Akka の公式プロジェクトとして開発・提供されています。たとえばローカルのテキストファイルをストリーム処理する Source や、処理結果を S3 に保存する Flow など、様々な入力ソースや出力先を Akka Streams のコンポーネントとして汎用的に使えるかたちにしてくれています。

Alpakka Documentation
https://doc.akka.io/docs/alpakka/current/

外部サービスとの連携処理が簡単に構築できてしまうのでうまく Alpakka を活用できれば開発効率の向上が見込めるでしょう。

今回はその中から AWS SQS 用のモジュールを使用します。

AWS SQS • Alpakka Documentation
https://doc.akka.io/docs/alpakka/current/sqs.html

ちなみに「Alpakka」という名前は動物のアルパカから取っているものと思われます。アルパカは英語の綴りだと「Alpaca」ですがフィンランドノルウェーでは「Alpakka」と表記するようです。「Akka」という単語が含まれているためこの名前が採用されたのかなと思うのですが、ドキュメントでもそのあたりは特に説明されてなさそうなので正確なところはわかりませんね…。

セットアップ

Alpakka は接続対象ごとにモジュールが分かれているので必要なものだけを選んで導入します。たとえば今回は SQS モジュールが必要なので libraryDependencies を以下のように記述します。

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream" % "2.6.13",
  "com.lightbend.akka" %% "akka-stream-alpakka-sqs" % "2.0.2"
)

実際のコードについては後述します。

AWS SQS

AWS SQS とは

Amazon SQS(サーバーレスアプリのためのメッセージキューサービス)| AWS
https://aws.amazon.com/jp/sqs/

SQS (Simple Queue Service) は AWS が提供するマネージド型のメッセージキューサービスです。AWS では MQ、Kinesis、MSK など複数のメッセージキュー系サービスが提供されていますが、名前の通り最もシンプルな機能を提供しているのが SQS です。プロプライエタリなサービスであり AWS にロックインされる点はネックですが、低コストで利用できる上に単純なユースケースなら大体 SQS でカバーできるのではと思います。

ElasticMQ のセットアップ

SQS は AWS のサービスなので、開発用にローカル環境で動作させるといったことができないのですが、SQS 互換 API を提供する ElasticMQ というアプリケーションを利用することで擬似的にローカル環境で SQS を動作させることができます。

softwaremill/elasticmq: In-memory message queue with an Amazon SQS-compatible interface. Runs stand-alone or embedded.
https://github.com/softwaremill/elasticmq

キューのデータはインメモリで扱われるため永続化されず ElasticMQ サービスを終了すると消えてしまいます。なのであくまでも開発用のツールという位置づけで捉えておくのがよさそうです。

ElasticMQ は Alpakka SQS のテストでも利用されているようです。我々も開発環境で利用していますが、これまでのところ実際の SQS と挙動が異なって困ったというようなことは起きていません。

ちなみに ElasticMQ 自体は Scala 製です。ただし Docker イメージでも提供されているため特に Scala を意識せずに利用できます。Jar 単体で実行したりライブラリとしてアプリケーションに組み込んだり様々な利用方法が提供されていますが、今回は Docker イメージを利用することにします。

docker-compose.yml を作成し以下のように記述します。

version: '3'

services:
  elasticmq:
    image: softwaremill/elasticmq
    container_name: elasticmq_test
    ports:
      - 9324:9324
    volumes:
      - ./elasticmq.conf:/opt/elasticmq.conf

設定ファイル elasticmq.conf を以下のように作成します。今回は test-queue.fifo という名前で FIFO キューを作成します。

include classpath("application.conf")

aws {
  accountId = queue
}

queues {
  "test-queue.fifo" {
    defaultVisibilityTimeout = 10 seconds
    fifo = true
    contentBasedDeduplication = true
  }
}

上記ファイルが用意できたら docker-compose コマンドでコンテナを起動します。

docker-compose up -d

コンテナが起動すると test-queue.fifo というキューが自動的に作成され localhost:9324 で接続可能な状態になります。そのエンドポイントを指定すれば AWS CLI などを使って SQS として扱うことが可能です。

# AWS CLI でキューをリストしてみる
aws sqs list-queues --endpoint-url 'http://localhost:9324'
{
    "QueueUrls": [
        "http://localhost:9324/queue/test-queue.fifo"
    ]
}

ちなみに、おそらくバグだと思うのですが、現行バージョンだと接続 URL の AWS アカウントの部分は queue という文字列でないとうまく動作しないようです。たとえば elasticmq.confaws.accountId = aws01 のように記述すれば http://localhost:9324/aws01/test-queue.fifo で接続できそうに思えるのですが手元の環境では正しく動作しませんでした。

Alpakka で AWS SQS 接続

準備が整ったので実際に Alpakka で SQS (ElasticMQ) に接続してみます。

まず最初に SqsAsyncClient を生成して SQS 接続情報を設定します。

import java.net.URI
import akka.actor.ActorSystem
import akka.stream.alpakka.sqs.SqsSourceSettings
import com.github.matsluni.akkahttpspi.AkkaHttpClient
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsAsyncClient

val endpoint = "http://localhost:9324"
implicit val actorSystem = ActorSystem("example")

implicit val sqsClient = SqsAsyncClient
  .builder()
  .credentialsProvider(
    StaticCredentialsProvider.create(
      AwsBasicCredentials.create("AK", "SK")  // (1)
    )
  )
  .endpointOverride(URI.create(endpoint))     // (2)
  .region(Region.AP_NORTHEAST_1)
  .httpClient(AkkaHttpClient.builder()
    .withActorSystem(actorSystem).build())
  .build()

(1) 今回は本物の SQS ではなくローカルの ElasticMQ への接続なのでアクセスキーとシークレットキーは適当な値を記述しておきます。

(2) endpointOverride でエンドポイントを指定することで接続先を AWS ではなくローカルに向けることができます。

Subscribe

SQS からメッセージをストリーム取得するには SqsSource を使用します。このとき先程生成した SqsAsyncClientimplicit で参照されます。この SqsSource はその名の通り Akka Streams の Source なので Flow や Sink とつないでデータフローを構築できます。

val queueUrl = endpoint + "/queue/test-queue.fifo"
val settings = SqsSourceSettings()

SqsSource(queueUrl, settings)
  .runWith(Sink.foreach { message: Message =>
    val body = message.body
    println(s"received: ${body}")
  })

設定等に問題がなければこれで SQS と接続された状態になります。試しにこのアプリケーションを実行した状態で AWS CLI を使ってメッセージを送信してみます。グループ ID が必要になるのですがとりあえず適当な値で大丈夫です。

# AWS CLI で "hello" というメッセージを送信してみる
aws sqs send-message \
    --queue-url "http://localhost:9324/queue/test-queue.fifo" \
    --endpoint-url "http://localhost:9324" \
    --message-group-id "x" \
    --message-body "hello"

成功すると Scala アプリケーション側のコンソールに received: hello と表示されるはずです。

ちなみにこのままだと SQS 上のメッセージは消えずに残り続けます。なので少し時間が経つとまた同じメッセージを受信してしまいます。一度受信した処理済みのメッセージを削除するには ACK メッセージを送ってメッセージを削除する必要があります。その場合、以下のように SqsAckFlowSqsAckSink を使用します。

import akka.stream.alpakka.sqs.MessageAction
import akka.stream.alpakka.sqs.SqsAckResult

SqsSource(queueUrl, settings)
  .map(MessageAction.Delete(_))
  .via(SqsAckFlow(queueUrl))
  .runWith(Sink.foreach { res: SqsAckResult =>
    val body = res.messageAction.message.body
    println(s"received: ${body}")
  })

これで一度受信したメッセージは SQS 上から削除されます。

Publish

SQS にメッセージを送信する場合は SqsPublishSinkSqsPublishFlow を使います。

import akka.stream.alpakka.sqs.scaladsl.SqsPublishSink
import akka.stream.scaladsl.Source
import software.amazon.awssdk.services.sqs.model.SendMessageRequest

val queueUrl = endpoint + "/queue/test-queue.fifo"

val message =
  SendMessageRequest.builder
    .messageBody("hello")
    .messageGroupId("x")
    .build

Source.single(message)
  .runWith(SqsPublishSink.messageSink(queueUrl))

これを実行すると SQS にメッセージが 1 件送信されます。

AWS CLI で確認することができます。

# AWS CLI でキューからメッセージを取得してみる
aws sqs receive-message \
    --queue-url "http://localhost:9324/queue/test-queue.fifo" \
    --endpoint-url "http://localhost:9324"
{
    "Messages": [
        {
            "MessageId": "5be5527a-ec4e-4f3f-8ad2-67a713938673",
            "ReceiptHandle": "5be5527a-ec4e-4f3f-8ad2-67a713938673#2406b392-60b7-4019-b3b9-4a62620bdbcc",
            "MD5OfBody": "5d41402abc4b2a76b9719d911017c592",
            "Body": "hello"
        }
    ]
}

さいごに

Akka Streams はロジックを Source、Flow、Sink といったかたちでモジュール化して扱うことができるためモジュールの組み合わせでデータフローを構築できる点が非常に強力だと感じています。Alpakka は SQS 接続以外にも様々なモジュールを提供しているので、要件がマッチすれば外部サービスとのデータ連携部分は Alpakka で手軽に構築してしまうことができます。その分ビジネスロジックの開発に集中できるので、うまく活用することで開発効率の向上が図れるのではないでしょうか。

ということで、今回は以上となります。非常に駆け足の内容ではありますが本記事が Akka Streams 活用の一助となれば幸いです。最後までお読みいただきありがとうございました。

Scalaスケーラブルプログラミング第3版

Scalaスケーラブルプログラミング第3版