VMware 社のデータグリッド製品である vFabric GemFire に、Function Execution という機能があります。GemFire で任意のロジックを実行する機能なんですが、データベース的な観点でみるとストアードプロシージャのようなもの、分散ノードに対して並列実行するという観点では Hadoop みたいなことができます。詳しくは、英語ですが以下のドキュメントを参照してください。
ただいざ実際に Function Execution 機能を使って、何かサンプルアプリを作ってみるとなると、細かいところで色々と分からなくて、しばらく試行錯誤を繰り返して、ようやく動くものができたので公開したいと思います。
まずは Function Execution を使うモチベーションを想定します。
GemFire に格納されているデータ群に対してなんらかのデータ処理を行いたい。GemFire はスケールアウト型のデータストアなので、その特定を生かし、パーティションリージョンにてデータ群を各ノードに分割して均等に保持した上で、各ノードのデータ群に対してデータ処理を一斉かつ並列的に実行すれば、ノードを追加すればするほど処理時間が短縮できてうれしい。というか、そうしたい。
ま、Hadoop でやりたいことの GemFire 版ということですね。実行イメージ図としては、以下な感じ(画像をクリックして、オリジナルサイズを表示をクリックすると原寸大のサイズで表示します)。
本題に入る前に、環境構築。キャッシュサーバーを 3 つ起動します。キャッシュサーバーの数やマシンの数が多くして並列度を高めた方が効果が見えやすいですが、同一分散システム内で評価ライセンスでで起動できるキャッシュサーバー数は 3 なんで…。ま、cacheserver コマンド使って適当に起動するとよいでしょう。
さて本題です。上述の図の番号順にサンプルコードを元に説明します。
(1)クライアント
さて最初はクライアントから。そしていきなり、サンプル Java ソースコード(MyLikeFunctionClient.java)とサンプル cache.xml ファイル(PoolClient.xml)を添付します。
MyLikeFunctionClient.java
PoolClient.xml
PoolClient.xml には特別な設定はしてないです。動かす時は、環境にあわせて適宜変えてください。
MyLikeFunctionClient.java で注目するところは、まずは以下の部分です。
MyLikeFunctionService function = new MyLikeFunctionService();
FunctionService.registerFunction(function);
MyLikeFunctionService というクラスは、Function Execution の機能を使ってメンバーで実行するロジックを記述したファンクション実装クラスです。これは自分で実装します。サンプルコードは後ほど掲載します。で、それをインスタンス化して、FunctionService.registerFunction() という関数で登録してやります。
次のポイントは、以下の部分です。
Execution execution = FunctionService.onRegion(testRegion)
.withArgs(likeValue)
.withCollector(new MyArrayListResultCollector());
これは、実装したロジックを Function Execution の機能を使ってキックするところです。onRegion([リージョン]) を設定しているので指定リージョンにたいするロジックの実行、withArgs([引数]) は実行ロジックに渡したい引数、withCollector([リザルトコレクター]) は結果取得ロジックを記述したオブジェクト指定です。結果取得ロジッククラス(この例では、MyArrayListResultCollector)はこれも自身で記述します。後ほどサンプルコードを掲載します。
クライアントロジック最後のポイントは以下です。
ResultCollector rc = execution.execute(function); List result = (List) rc.getResult();
1 行目は結果取得を行っています。結果は、ResultCollector オブジェクトに結果が格納されます。対象メンバーから全ての結果がそろうまで、この部分で待ちが入ります。次の行は、ResultCollector オブジェクトに格納したものを、java.util.List に格納して、普通の Java アプリケーションで扱いやすいよう変換してやります。ResultCollector はあくまでもインターフェースで、その実装クラスがちょっと前にでてきた、結果取得ロジッククラスです。ここでは、実装した getResult() メソッドが、java.util.ArrayList 型を返すように実装している(サンプルコードは後述)ので、java.util.List にキャストしています。
(2)ファンクション実装クラス - キャッシュで実行するロジックを記述
さて次はキャッシュプロセス(本件の例ではキャッシュサーバー)で、ファンクション実装クラス(Function Execution 機能を使って実行するロジックを実装)についてです。ま、ここが Function Execution の肝ですね。以下がサンプルコードと、キャッシュサーバーの設定ファイル(xml)です。
MyLikeFunctionService.java
Partitioned.xml
あと、本ソースコードのロジックで使用しているドメインオブジェクトのソースコード(リージョンにつっこんである、バリューですね)も添付します。
MyData.java
ついでに、ターゲットなるリージョンにサンプルを動かすためのダミーのデータを put するサンプルも添付します(上述のクライアントのサンプルと同じ xml ファイルを使用してリージョン設定を行っています)。
MyDataPutter.java
まず、MyLikeFunctionService.java ですが、クラス定義の行が以下のようになっています。
public class MyLikeFunctionService extends FunctionAdapter implements Declarable {
FunctionAdapter クラスを継承するのは Function Execution 使う際のお約束ですね。何か他のクラスを継承する必要があって、FunctionAdapter を継承できない場合は、Function インターフェスを実装しましょう。あと、Declarable を実装しているのは、Partitioned.xml の以下の部分からファンクション実装クラスを読み込むためです。
<function-service> <function> <class-name>quitada.MyLikeFunctionService</class-name> </function> </function-service>
ファンクション実装クラスでは、execute メソッドに実行するロジックと、Function 呼び出し元に結果を返すロジックをいれてやります。サンプルで重要なところは、「パーティションリージョンにおける各ローカルリージョンのプライマリーバケットのみを取得して、それに対してクエリーをかける」という部分です。何も考えずにやると、各メンバーが、それぞれリージョン全体のデータを取得してクエリーをかけるという無駄に同じことをやってしまって並列処理の意味がなくなるのです。さて、その工夫はサンプルでは以下のように行っています。
RegionFunctionContext context = (RegionFunctionContext) fc; : Region localEntries = PartitionRegionHelper.getLocalDataForContext(context);
PartitionRegionHelper を使ってやるんですね。なお、PartitionRegionHelper のメソッドに、getLocalPrimaryData というメソッドがあり、名前からして上述の目的「〜各ローカルリージョンのプライマリーバケットのみを取得〜」を達成できそうな感じですが、やってみた限りでは期待した動作はしないようです。
次に着目するポイントは、Function 呼び出し元への結果返却方法です。特に、今回の例のようにクエリーした結果が複数ある場合です。
例えば、結果が 10 個あるとしましょう(本サンプルの場合だと、クエリーの結果セットのサイズが 10)。9 個目までは、以下のように sendResult メソッドを使って送ります。
fc.getResultSender().sendResult(myData.get(j));
10 個目、つまり最後の 1 個を送る際は「これが最後ですよー」ということもあわせてお知らせする必要があるので、lastResult メソッドを使って送ります。
fc.getResultSender().lastResult(myData.get(listSize-1));
処理的には、このメソッドを実行した時点でファンクション実装クラスの execution メソッドの処理から抜けます。
あと、上述の目的達成のため、以下のようにファンクション実装クラスの optimizeForWrite の返値を true にしてやる必要があるみたいですが、本サンプルに関しては false でも動作の違いは結果だけみると感じられないです。
public boolean optimizeForWrite() { return true; }
#ファンクション実行中にリージョンへの書き込みが発生するようなロジックの場合に、感じられるのかな?ちょっと不明。
(3)リザルトコレクター
最後は、結果を集めてリクエスト元に返すリザルトコレクターの実装です。これも自前で実装します。まずはサンプルコードを添付します。
MyArrayListResultCollector.java
このクラスは、本サンプルのリクエスト元であるクライアントコードにもでてきましたね。よほど特殊なことをしようとしない限り、このサンプを若干修正して使うようなイメージになると思います。
ポイントは、以下 ResultCollector インターフェース実装時の型変数の宣言部分ですかね。
public class MyArrayListResultCollector implements ResultCollector<Struct,ArrayList> { final ArrayList<Struct> result = new ArrayList<Struct>();
これは、ファンクション実行結果の一つ一つが Struct 型(クエリー実行結果なので)で、結果セットは ArrayList 型ということです。これを決めてしまえば残りのソースコードの型をこれにあわせてやればよいわけですね。
なお、ResultCollector インターフェースの型変数は、GemFire 6.6.1 までは Serializable を実装している必要がありました(java doc 参照)。GemFire 6.6.2 はそういった制限はなくなりました(同じく java doc 参照)。ArrayList はさておき、Struct は Serializable でないので、本サンプルは GemFire 6.6.2 以降でないと動きません。ぐはぁ。6.6.1 で動くようにするには、ファンクション実装クラスで返却する結果を Serializable なオブジェクトでラップしてやらないといけないですね。ややめんどい。
最後に…
ちなみにこのサンプルコードは、ファンクション実行の結果、何か例外が発生したり、結果の数がゼロだったりしたら、ただただ null を返すだけです。実アプリで使うとしたらそこらへんも考慮する必要があるでしょう。対応策として、例えば、例外が発生した場合は特殊なオブジェクトを結果として返して、そこらへんのハンドリングをリザルトコレクターでやるとか、といったことが考えられます。