Multi Node Distributed Execution Using Infinispan and Dexecutor

We will try to execute Dexecutor in a distributed mode using Infinispan. For the demo we would be setting up multiple infinispan nodes on single machine.

Refer Introducing Dexecutor, to get an introduction on Dexecutor  and to understand the problem we would solve in a distribute fashion. In short:

We would be distributing the execution of dexecutor tasks on Infinispan nodes in a single machine.

To do that one of the nodes would act as master and submit the tasks to DistributedExecutorService to be executed by other infinispan worker nodes.

Step 1: Add dexecutor-infinispan dependency

 


<dependency>
  <groupId>com.github.dexecutor</groupId>
  <artifactId>dexecutor-core</artifactId>
 <version>1.0.2</version>
</dependency>

 

Step 2: Add the default jgroups.xml

Step 3: Create the CacheManager

private DefaultCacheManager createCacheManagerProgrammatically(final String nodeName, final String cacheName) {
	DefaultCacheManager cacheManager = new DefaultCacheManager(globalConfiguration(nodeName), defaultConfiguration());
	cacheManager.defineConfiguration(cacheName, cacheConfiguration());
	return cacheManager;
}

private GlobalConfiguration globalConfiguration(String nodeName) {
	return GlobalConfigurationBuilder
				.defaultClusteredBuilder()
				.transport()
				.nodeName(nodeName)
				.addProperty("configurationFile", "jgroups.xml")
				.build();
}

private Configuration defaultConfiguration() {
	return new ConfigurationBuilder()
				.clustering()
				.cacheMode(CacheMode.REPL_SYNC)
				.build();
}

private Configuration cacheConfiguration() {
	return new ConfigurationBuilder()
				.clustering()
				.cacheMode(CacheMode.DIST_SYNC)
				.hash()
				.numOwners(2)
				.build();
}

Step 4 : Create Dexecutor instance using InfinispanExecutionEngine


EmbeddedCacheManager cacheManager = createCacheManagerProgrammatically(nodeName, cacheName);
final Cache<String, String> cache = cacheManager.getCache(cacheName);
DefaultExecutorService distributedExecutorService = new DefaultExecutorService(cache);
DefaultDependentTasksExecutor<Integer, Integer> dexecutor = newTaskExecutor(distributedExecutorService);

private DefaultDependentTasksExecutor<Integer, Integer> newTaskExecutor(final DistributedExecutorService executorService) {
	return new DefaultDependentTasksExecutor<Integer, Integer>(taskExecutorConfig(executorService));
}

private DependentTasksExecutorConfig<Integer, Integer> taskExecutorConfig(final DistributedExecutorService executorService) {
	return new DependentTasksExecutorConfig<Integer, Integer>(executionEngine(executorService), new SleepyTaskProvider());
}

private InfinispanExecutionEngine<Integer, Integer> executionEngine(final DistributedExecutorService executorService) {
	return new InfinispanExecutionEngine<Integer, Integer>(executorService);
}

Step 5: Only master should create tasks

if (isMaster) {
	DefaultExecutorService distributedExecutorService = new DefaultExecutorService(cache);
	DefaultDependentTasksExecutor<Integer, Integer> dexecutor = newTaskExecutor(distributedExecutorService);

	buildGraph(dexecutor);
	dexecutor.execute(ExecutionBehavior.TERMINATING);
}

Refer the full code here

Step 4: Run the Application

Terminal #1 : run as worker

mvn test-compile exec:java -Djava.net.preferIPv4Stack=true -Dexec.mainClass="com.github.dexecutor.infinispan.Node" -Dexec.classpathScope="test" -Dexec.args="s node-A"

Terminal #2: run as worker

mvn test-compile exec:java -Djava.net.preferIPv4Stack=true -Dexec.mainClass="com.github.dexecutor.infinispan.Node" -Dexec.classpathScope="test" -Dexec.args="s node-B"

 

Terminal #3 : run as master

mvn test-compile exec:java  -Dexec.classpathScope="test" -Djava.net.preferIPv4Stack=true -Dexec.mainClass="com.github.dexecutor.infinispan.Node" -Dexec.args="m node-C"

Here is the output:

dexecutor-multi-node-infinispan-single-machine

References