Quitada ブログ HAX

Hatena Blog でも Quitada ブログ

Spring IO 基盤技術の一つの Reactor を弄ってみた〜 TCP Server 編

前回に続き、Spring IO 基盤技術の一つの Reactor を弄ってみた。

Spring in Action: Covers Spring 4

Spring in Action: Covers Spring 4

今回は、TCP Server のサンプルを作ってみました。まー、TCP の口を開けて、そこからだらだらとストリームチックに流れてくるデータに対して非同期にイベントハンドリングを行うので、前回の単一プロセス内で自分でイベントだして自分で反応するサンプルより、より具体的なユースケースが思いつきやすいですね。

というわけで、動かしてみたので quitada メモを列挙します。

★事前準備

前回、弄ってみた時と同様、disruptor、slf4j、log4j が必要です。あと、TCP サーバー機能をつけるため、Reactor の TCP Server モジュール(reactor-tcp-1.1.0.BUILD-SNAPSHOT.jar とか)と、デフォルトでは Netty のライブラリが必要ですね(netty-all-4.0.17.Final.jar とか)。プラガブルなようで、Netty 的なものであれば何でも対応するようです。

とりあえず quitada 的には、以下の jar を IDE のビルドパスに入れておきました。

  • reactor-core-1.1.0.BUILD-SNAPSHOT.jar
  • reactor-tcp-1.1.0.BUILD-SNAPSHOT.jar
  • disruptor-3.2.0.jar
  • slf4j-log4j12-1.7.6.jar
  • slf4j-api-1.7.6.jar
  • log4j-1.2.17.jar
  • netty-all-4.0.17.Final.jar

今度は、Maven プロジェクトとかで作ってライブラリーダウンロードの自動化してみますかね…。

★サンプルコード

まーこれは、Reactor の TCP Server の Wiki ページ(こちら)のサンプルをそのままパクった感じです。本ブログエントリーに添付します(ReactorTcpServer.java)。
ReactorTcpServer.java 直

で、Wiki ページ見て思ったのが「これ、どう動作確認するの?」

ということで、当初は TCP Client も作らなきゃならないのかなと思ってたら、ポイントはコーデックでした。Reactor におけるコーディックは、TCP Serever があけている口からだらだらと流れてくる TCP のデータの区切りのルールを設定するようなものです。つまり、何をもって、1 つのデータを受信した!と判断するか、その判断する基準ですね。デフォルトのコーデックは、以下のサンプルにあるように LINE_FEED_CODEC という組み込みのものです(ちなみに、見ての通り、ホスト名 localhost で、ポート番号が 15151 でリッスンするよう設定しているのがわかります)。

TcpServer<String, String> server = new TcpServerSpec<String, String>(NettyTcpServer.class)
	.env(env)
	.dispatcher(Environment.RING_BUFFER)
	.listen("localhost", 15151)
	.codec(StandardCodecs.LINE_FEED_CODEC)

これは、エンターコードをデータの区切りとして一つのデータとして受信する、といったコーデックです。なので、このサンプルの動作確認をするためのクライアントとしては、telent がてっとり早い、ということになります。まずは、サンプルアプリを実行して、次にコンソールから以下のように telent コマンドにて接続してみます。

$ telent localhost 15151

そうすると、サンプルアプリケーションを起動したコンソールに以下のメッセージがでてきます。

[ReactorTcpServer] New connection is detected...

これは、以下のコンシューマーのロジック(無名クラスで実装)で反応したってことですね。

TcpServer<String, String> server = new TcpServerSpec<String, String>(NettyTcpServer.class)
           :
	.consume(new Consumer<TcpConnection<String, String>>() { // Consumer を作成(コネクション用)
		public void accept(final TcpConnection<String, String> conn) {
			writeToStdout("New connection is detected...");

で、telent を実行したコンソールから、適当な文字列を入力して、エンターキーを押してみてください。すると、サンプルアプリケーションを実行したコンソールで、以下のように反応すると思います。

[ReactorTcpServer] Got data...:<何か入力した文字列>

これは、以下のコンシューマーのロジック(前出のロジックからさらにネストした無名クラスで実装)で反応したってことですね。

TcpServer<String, String> server = new TcpServerSpec<String, String>(NettyTcpServer.class)
           :
	conn.in().consume(new Consumer<String>() { // Consumer を作成(データインジェスチョン用)
		public void accept(String line) {
			// handle line feed data
			writeToStdout("Got data...:" + line);

おもしろいのは、以下の部分のコードにあるように、TCP Server から接続クライアントに対して、逆にメッセージを送り返すことも可能なんですね。

conn.send("Your data is discarded.");

なので、telent から何か文字列入れてエンターを押すとすぐに、"Your data is discarded." とか TCP Server から返ってきたメッセージが出力されると思います。

まー、地味なデモですが、何かに使えそうなイメージはついたのではないか(少なくとも自分は)と思います。

他システムからデータを送り込むには、TCP ClientAPI 使って TCP Server と連携すればいいのかなという印象。

今後 Spring Framework ベースで Reactor プログラミングしてみたい。Spring Boot とかで使えるかな…。