Lightweight Workflow Like Execution Using Dexecutor

Dexecutor can easily be used for workflow-like use cases such as the one depicted in the following diagram.

dexecutor-workflow-example

A Dexecutor instance is created using DexecutorConfig, which in turn requires an ExecutionEngine and a TaskProvider. The default implementation of ExecutionEngine uses an ExecutorService, so let's create a Dexecutor instance first (source code can be found here):

private static ExecutorService buildExecutor() {
    return Executors.newFixedThreadPool(ThreadPoolUtil.ioIntesivePoolSize());
}

private Dexecutor<String, Boolean> buildDexecutor(final ExecutorService executorService) {
    DexecutorConfig<String, Boolean> config = new DexecutorConfig<>(executorService, new WorkFlowTaskProvider());
    return new DefaultDexecutor<>(config);
}

The TaskProvider comes into action when it is time to execute a task. For this example we will use a simple implementation, WorkFlowTaskProvider:

public class WorkFlowTaskProvider implements TaskProvider<String, Boolean> {

    private final Map<String, Task<String, Boolean>> tasks = new HashMap<String, Task<String, Boolean>>() {

        private static final long serialVersionUID = 1L;
        {
            put(TaskOne.NAME, new TaskOne());
            put(TaskTwo.NAME, new TaskTwo());
            put(TaskThree.NAME, new TaskThree());
            put(TaskFour.NAME, new TaskFour());
            put(TaskFive.NAME, new TaskFive());
            put(TaskSix.NAME, new TaskSix());
            put(TaskSeven.NAME, new TaskSeven());
        }
    };

    @Override
    public Task<String, Boolean> provideTask(final String id) {
        return this.tasks.get(id);
    }
}

For simplicity we have implemented a Task for each of the tasks (1..7); those can be found here. Most of the task implementations are the same, except for TaskTwo (if the result of task 2 is TRUE, tasks 3 and 4 are executed, otherwise task 5 is executed) and TaskFive (if task 5 is executed rather than skipped, task 6 is executed).

dexecutor-task

TaskFive (as well as TaskThree, TaskFour and TaskSix) overrides the shouldExecute() method to signal whether the task should be executed or skipped.

dexecutor-skipping-task-execution
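
The skip rules described above can also be modelled without Dexecutor. The following is a minimal, sequential sketch in plain Java; the class WorkflowSkipSketch, its rule map and the executed method are hypothetical stand-ins for illustration and are not Dexecutor's shouldExecute() API. The real library would additionally run independent tasks (such as tasks 3 and 4) in parallel.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical model of the workflow's skip rules; this is not Dexecutor code.
class WorkflowSkipSketch {

    // Returns the names of the tasks that run, in order, for a given TaskTwo result.
    static List<String> executed(boolean taskTwoResult) {
        // Each rule sees the results of the tasks that ran so far; skipped tasks are absent.
        Map<String, Predicate<Map<String, Boolean>>> rules = new LinkedHashMap<>();
        rules.put("TaskOne", done -> true);
        rules.put("TaskTwo", done -> true);
        rules.put("TaskThree", done -> Boolean.TRUE.equals(done.get("TaskTwo")));
        rules.put("TaskFour", done -> Boolean.TRUE.equals(done.get("TaskTwo")));
        rules.put("TaskFive", done -> Boolean.FALSE.equals(done.get("TaskTwo")));
        rules.put("TaskSix", done -> done.containsKey("TaskFive"));
        rules.put("TaskSeven", done -> true);

        Map<String, Boolean> results = new LinkedHashMap<>();
        for (Map.Entry<String, Predicate<Map<String, Boolean>>> rule : rules.entrySet()) {
            if (rule.getValue().test(results)) {
                // Every task returns true except TaskTwo, whose result is the input.
                boolean result = !rule.getKey().equals("TaskTwo") || taskTwoResult;
                results.put(rule.getKey(), result);
            }
        }
        return new ArrayList<>(results.keySet());
    }

    public static void main(String[] args) {
        System.out.println(executed(false));
        System.out.println(executed(true));
    }
}
```

Calling executed(false) yields tasks one, two, five, six and seven, while executed(true) yields tasks one, two, three, four and seven.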

The next step is to build the graph:

dexecutor-workflow-graph-building

If WorkFlowApplication is executed, the following output can be observed.

Output if TaskTwo result is false

Executing TaskOne , result : true
Executing TaskTwo , result : false
Executing TaskFive , result : true
Executing TaskSix , result : true
Executing TaskSeven , result : true

Output if TaskTwo result is true

Executing TaskOne , result : true
Executing TaskTwo , result : true
Executing TaskFour , result : true
Executing TaskThree , result : true
Executing TaskSeven , result : true

 


 

Take Migration Process To Next Level Using Dexecutor

You have a data migration process that updates the application from version X to X+1 by running migration scripts (each script consists of a sequence of instructions) sequentially, to bring the application to the desired state.

Problem

The synchronous process causes delays, leading to unproductive wait times and dissatisfied users. The script execution time needs to be reduced by running tasks in parallel wherever applicable, while still reaching the desired state.

Driving Forces

The following are the driving forces behind Dexecutor:

  • Supports parallel execution and can conditionally revert to sequential execution (provided such logic is supplied)
  • Ultra light (Version 1.1.1 is 44KB)
  • Ultra fast
  • Distributed Execution supported
  • Immediate/Scheduled Retry logic supported
  • Non-terminating behaviour supported
  • Conditionally skip the task execution

Solution

Incorporate Dexecutor into your script execution logic, and optionally distribute the execution using Infinispan, Hazelcast or Ignite. Here is a sample application that demonstrates this functionality; fork it and have fun 🙂

Dexecutor can easily be used in this case by adding algorithmic logic on top of it that builds the graph based on table names. Let's assume the following scripts:

Script 1 ==> operates on tables t1 and t2 and takes 5 minutes
Script 2 ==> operates on tables t1 and t3 and takes 5 minutes
Script 3 ==> operates on tables t2 and t4 and takes 5 minutes
Script 4 ==> operates on tables t5 and t6 and takes 5 minutes
Script 5 ==> operates on tables t5 and t7 and takes 5 minutes
Script 6 ==> operates on tables t6 and t8 and takes 5 minutes

Normally these scripts are executed sequentially as follows.

Script 1  5 minutes
  |
  V
Script 2  5 minutes
  |
  V
Script 3  5 minutes
  |
  V
Script 4  5 minutes
  |
  V
Script 5  5 minutes
  |
  V
Script 6  5 minutes

Total time 30 minutes 

In the sequential case, the total execution time would be 30 minutes. However, if we parallelize the script execution while making sure the scripts still run in the right order, we can save time, decreasing the total execution time to just 10 minutes.

       +----------+                       +----------+
       | Script 1 |                       | Script 4 |             ==> 5 minutes
  +----+----------+--+               +----+----------+-----+
  |                  |               |                     |
  |                  |               |                     |
+-----v----+   +-----v----+     +----v-----+        +------v---+
| Script 2 |   | Script 3 |     | Script 5 |        | Script 6 |   ==> 5 minutes
+----------+   +----------+     +----------+        +----------+

Total Time 10 minutes
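
The reasoning behind the two diagrams can be checked with a small sketch. It assumes a simple rule that is not taken from MigrationTasksExecutor: a script depends on every earlier script that touches at least one of the same tables. The class MigrationGraphSketch and its methods are hypothetical helpers written only for this illustration; they use plain Java collections rather than the Dexecutor API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper, not part of Dexecutor or of MigrationTasksExecutor.
class MigrationGraphSketch {

    // For every script, collect the earlier scripts that touch at least one of its tables.
    static Map<Integer, Set<Integer>> dependencies(Map<Integer, Set<String>> tablesByScript) {
        TreeMap<Integer, Set<String>> ordered = new TreeMap<>(tablesByScript);
        Map<Integer, Set<Integer>> deps = new TreeMap<>();
        for (Map.Entry<Integer, Set<String>> current : ordered.entrySet()) {
            Set<Integer> parents = new TreeSet<>();
            // headMap returns only the scripts with a strictly smaller number.
            for (Map.Entry<Integer, Set<String>> earlier : ordered.headMap(current.getKey()).entrySet()) {
                if (!Collections.disjoint(current.getValue(), earlier.getValue())) {
                    parents.add(earlier.getKey());
                }
            }
            deps.put(current.getKey(), parents);
        }
        return deps;
    }

    // Number of sequential waves; scripts in the same wave can run in parallel.
    static int waves(Map<Integer, Set<Integer>> deps) {
        Map<Integer, Integer> waveOf = new HashMap<>();
        int maxWave = 0;
        for (Map.Entry<Integer, Set<Integer>> entry : new TreeMap<>(deps).entrySet()) {
            int wave = 1;
            for (Integer parent : entry.getValue()) {
                wave = Math.max(wave, waveOf.get(parent) + 1);
            }
            waveOf.put(entry.getKey(), wave);
            maxWave = Math.max(maxWave, wave);
        }
        return maxWave;
    }

    // The six scripts from the example above.
    static Map<Integer, Set<String>> sampleScripts() {
        Map<Integer, Set<String>> scripts = new TreeMap<>();
        scripts.put(1, new HashSet<>(Arrays.asList("t1", "t2")));
        scripts.put(2, new HashSet<>(Arrays.asList("t1", "t3")));
        scripts.put(3, new HashSet<>(Arrays.asList("t2", "t4")));
        scripts.put(4, new HashSet<>(Arrays.asList("t5", "t6")));
        scripts.put(5, new HashSet<>(Arrays.asList("t5", "t7")));
        scripts.put(6, new HashSet<>(Arrays.asList("t6", "t8")));
        return scripts;
    }

    public static void main(String[] args) {
        Map<Integer, Set<Integer>> deps = dependencies(sampleScripts());
        System.out.println(deps);
        System.out.println("Total time: " + waves(deps) * 5 + " minutes");
    }
}
```

For the six scripts above, scripts 2 and 3 depend on script 1 and scripts 5 and 6 depend on script 4, which gives two waves of 5 minutes each, matching the 10 minutes in the diagram.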

Using Dexecutor, we just have to write the algorithm that builds the graph using the API exposed by Dexecutor; the rest is taken care of by Dexecutor. MigrationTasksExecutor implements that algorithm, considering the SQL statements in the migration scripts. Since the table names in the SQL play a crucial role in building the graph, we need an efficient, ultra-light and ultra-fast library to extract table names from SQL, hence we use sql-table-name-parser. Add the following dependency to your POM to use it:

<dependency>
    <groupId>com.github.mnadeem</groupId>
    <artifactId>sql-table-name-parser</artifactId>
    <version>0.0.2</version>
</dependency>

And of course, Dexecutor should be added as a dependency as well:

<dependency>
    <groupId>com.github.dexecutor</groupId>
    <artifactId>dexecutor-core</artifactId>
    <version>LATEST_VERSION</version>
</dependency>

The graph that would be built from the migration scripts is the following:

dexecutor-graph

 

As can be seen here, nodes base1, base3 and base4 run in parallel, and as soon as one of them finishes, its children are executed. For example, once node base1 finishes, its children base2 and app3-1 are executed, and so on.

Notice that for node app2-4 to start, app1-4 and app2-1 must finish; similarly, for node app3-2 to start, app3-1 and app2-4 must finish.

Just run this class to see how things proceed.

Conclusion

We can indeed run dependent and independent tasks in an easy and reliable way with Dexecutor.


Multi-Node Distributed Execution Using Hazelcast and Dexecutor

We will try to execute Dexecutor in distributed mode using Hazelcast. For the demo we will set up multiple Hazelcast nodes on a single machine.

Refer to Introducing Dexecutor for an introduction to Dexecutor and to understand the problem we will solve in a distributed fashion. In short:

We will distribute the execution of Dexecutor tasks across Hazelcast compute nodes on a single machine.

To do that, one of the nodes acts as master and submits the tasks to Hazelcast, to be executed by the other Hazelcast compute nodes using Dexecutor.

Here are the steps to do that:

Step 1: Add dexecutor-hazelcast dependency

<dependency>
    <groupId>com.github.dexecutor</groupId>
    <artifactId>dexecutor-hazelcast</artifactId>
    <version>LATEST_RELEASE</version>
</dependency>

Step 2: Get an Instance of Hazelcast IExecutorService from Hazelcast

 Config cfg = new Config();
 HazelcastInstance instance = Hazelcast.newHazelcastInstance(cfg);
 IExecutorService executorService = instance.getExecutorService("test");

Step 3 : Create Dexecutor using IExecutorService

if (isMaster) {
    DefaultDependentTasksExecutor<Integer, Integer> dexecutor = newTaskExecutor(executorService);

    buildGraph(dexecutor);
    dexecutor.execute(ExecutionConfig.TERMINATING);
}

private DefaultDependentTasksExecutor<Integer, Integer> newTaskExecutor(IExecutorService executorService) {
    DependentTasksExecutorConfig<Integer, Integer> config = new DependentTasksExecutorConfig<Integer, Integer>(
            new HazelcastExecutionEngine<Integer, Integer>(executorService), new SleepyTaskProvider());
    return new DefaultDependentTasksExecutor<Integer, Integer>(config);
}

private static class SleepyTaskProvider implements TaskProvider<Integer, Integer> {

    public Task<Integer, Integer> provideTask(final Integer id) {
        return new HazelcastTask(id);
    }
}

Step 4: Execution

Open three terminals and execute the following:

Terminal #1

 mvn test-compile exec:java -Djava.net.preferIPv4Stack=true -Dexec.mainClass="com.github.dexecutor.hazelcast.Node" -Dexec.classpathScope="test" -Dexec.args="s node-A"

Terminal #2

 mvn test-compile exec:java -Djava.net.preferIPv4Stack=true -Dexec.mainClass="com.github.dexecutor.hazelcast.Node" -Dexec.classpathScope="test" -Dexec.args="s node-B"

Terminal #3

 mvn test-compile exec:java -Djava.net.preferIPv4Stack=true -Dexec.mainClass="com.github.dexecutor.hazelcast.Node" -Dexec.classpathScope="test" -Dexec.args="m node-C"

Here is the execution:

dexecutor-hazelcast-execution

Here is the complete Node implementation.


Multi Node Distributed Execution Using Ignite And Dexecutor

We will try to execute Dexecutor in distributed mode using Apache Ignite. For the demo we will set up multiple Ignite nodes on a single machine.

Refer to Introducing Dexecutor for an introduction to Dexecutor and to understand the problem we will solve in a distributed fashion. In short:

We will distribute the execution of Dexecutor tasks across Apache Ignite nodes on a single machine.

To do that, one of the nodes acts as master and submits the tasks to Ignite, to be executed by the other Ignite compute nodes using Dexecutor.

Here are the steps to do that:

Step 1: Add dexecutor-ignite dependency

<dependency>
    <groupId>com.github.dexecutor</groupId>
    <artifactId>dexecutor-ignite</artifactId>
    <version>LATEST_RELEASE</version>
</dependency>

Step 2: Start Ignite

IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setGridName(nodeName);

Ignite ignite = Ignition.start(cfg);

Step 3 : Create Dexecutor using Ignite

if (isMaster) {
    DefaultDependentTasksExecutor<Integer, Integer> dexecutor = newTaskExecutor(ignite.compute());

    buildGraph(dexecutor);
    dexecutor.execute(ExecutionConfig.TERMINATING);
}

private DefaultDependentTasksExecutor<Integer, Integer> newTaskExecutor(final IgniteCompute igniteCompute) {
    DependentTasksExecutorConfig<Integer, Integer> config = new DependentTasksExecutorConfig<Integer, Integer>(
            new IgniteExecutionEngine<Integer, Integer>(igniteCompute), new SleepyTaskProvider());
    return new DefaultDependentTasksExecutor<Integer, Integer>(config);
}

Step 4: Execution

Open three terminals and execute the following:

Terminal #1

 mvn test-compile exec:java -Djava.net.preferIPv4Stack=true -Dexec.mainClass="com.github.dexecutor.ignite.Node" -Dexec.classpathScope="test" -Dexec.args="s node-A"

Terminal #2

 mvn test-compile exec:java -Djava.net.preferIPv4Stack=true -Dexec.mainClass="com.github.dexecutor.ignite.Node" -Dexec.classpathScope="test" -Dexec.args="s node-B"

Terminal #3

 mvn test-compile exec:java -Dexec.classpathScope="test" -Djava.net.preferIPv4Stack=true -Dexec.mainClass="com.github.dexecutor.ignite.Node" -Dexec.args="m node-C"

dexecutor-ignite-execution

Here is the Node Implementation.


Jackson Mixin To The Rescue

Often it is not possible to annotate classes with Jackson annotations simply for serialization/deserialization needs. There could be many reasons, for example:

  • Classes which need to be serialized/deserialized are third-party classes.
  • You don’t want Jackson to invade your code base everywhere.
  • You want a cleaner, more modular design.

The Jackson mixin feature helps solve the above problems easily. Let’s consider an example.

Let’s say you want to serialize/deserialize the following class (note that it does not have getters/setters):

 public class Address {

	private String city;
	private String state;

	public Address(String city, String state) {
		this.city = city;
		this.state = state;
	}

	@Override
	public String toString() {
		return "Address [city=" + city + ", state=" + state +  "]";
	}
}

If you try to serialize it, you will get the following error:

Exception in thread "main" com.fasterxml.jackson.databind.JsonMappingException: No serializer found for class com.some.package.Address and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) )

To solve the above issue, you would have to do the following:

  1. Add a default constructor
  2. Add a getter/setter for each property

However, that is not possible in many cases. Here we can use a Jackson mixin to get around the problem. To do that, we create a corresponding mixin class; as can be seen here, its constructor should match that of your source object, and you have to use Jackson annotations (@JsonCreator, @JsonProperty, etc.).

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public abstract class AddressMixin {

    @JsonCreator
    public AddressMixin(
            @JsonProperty("city") String city,
            @JsonProperty("state") String state) {
        System.out.println("Wont be called");
        
    }
}

You will still get the following exception:

Exception in thread "main" com.fasterxml.jackson.databind.JsonMappingException: No serializer found for class com.some.package.Address and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) )

It turns out that we have to tell Jackson to use reflection and access the fields directly:

mapper.setVisibility(mapper.getSerializationConfig()
        .getDefaultVisibilityChecker()
        .withFieldVisibility(JsonAutoDetect.Visibility.ANY)
        .withGetterVisibility(JsonAutoDetect.Visibility.NONE)
        .withSetterVisibility(JsonAutoDetect.Visibility.NONE)
        .withCreatorVisibility(JsonAutoDetect.Visibility.NONE));

Here is the test code:

import java.io.IOException;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonMixInTest {

    public static void main(String[] args) throws IOException {
        Address address = new Address("Hyderabad", "Telangana");

        ObjectMapper mapper = buildMapper();

        // Serialization works through field access alone.
        final String json = mapper.writeValueAsString(address);
        System.out.println(json);

        // The mixin supplies the @JsonCreator constructor needed for deserialization.
        mapper.addMixIn(Address.class, AddressMixin.class);

        final Address deserializedAddress = mapper.readValue(json, Address.class);
        System.out.println(deserializedAddress);
    }

    // Detect fields of any visibility; ignore getters, setters and auto-detected creators.
    private static ObjectMapper buildMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(mapper.getSerializationConfig()
                .getDefaultVisibilityChecker()
                .withFieldVisibility(JsonAutoDetect.Visibility.ANY)
                .withGetterVisibility(JsonAutoDetect.Visibility.NONE)
                .withSetterVisibility(JsonAutoDetect.Visibility.NONE)
                .withCreatorVisibility(JsonAutoDetect.Visibility.NONE));
        return mapper;
    }
}

Here is the output:

{"city":"Hyderabad","state":"Telangana"}
Address [city=Hyderabad, state=Telangana]


Running Multiple Zookeeper Instances On Single Windows Machine

This is indeed possible with Zookeeper Runner.

Running ZooKeeper on Windows is made so easy that even grandma can do it.

Step 1: copy three instances of ZookeeperRunner to your local drive

I have copied them as instance1, instance2 and instance3.

zookeeper-multi-instances

Step 2: copy required zookeeper jar files to [RUNNER_HOME]/lib

Note : you have to do it for all the instances.

zookeeper-runner-jars

Step 3: update the zoo config files

Note: you have to do it for all the instances.

Client ports:

  • instance1 = 2181
  • instance2 = 2182
  • instance3 = 2183

zookeeper-runner-zoo-config
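
As a rough illustration of what the configuration for instance1 might contain, here is a sketch in the standard zoo.cfg format. Only the client port comes from the list above; the dataDir path and the quorum and election ports are assumptions chosen so that the three instances do not collide on one machine, and the file name and location used by ZookeeperRunner may differ.

```properties
# instance1 (hypothetical path and quorum/election ports; adjust to your layout)
tickTime=2000
initLimit=10
syncLimit=5
dataDir=C:/zookeeper/instance1/data
clientPort=2181

# One line per ensemble member: server.<myid>=<host>:<quorumPort>:<electionPort>
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
```

The other two instances would use the same server lines but their own dataDir and clientPort (2182 and 2183). The number after "server." must match the content of that instance's myid file.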

Step 4: update the wrapper config

zookeeper-runner-wrapper-config

Step 5 : update the myid files

zookeeper-runner-myid

Step 6 : start Zookeeper instances

Note : you have to do it for all the instances.

zookeeper-runner-start

Here are the running instances.

zookeeper-runner-instance1

zookeeper-runner-instance2

zookeeper-runner-instance3

 

Multi Node Distributed Execution Using Infinispan and Dexecutor

We will try to execute Dexecutor in distributed mode using Infinispan. For the demo we will set up multiple Infinispan nodes on a single machine.

Refer to Introducing Dexecutor for an introduction to Dexecutor and to understand the problem we will solve in a distributed fashion. In short:

We will distribute the execution of Dexecutor tasks across Infinispan nodes on a single machine.

To do that, one of the nodes acts as master and submits the tasks to the DistributedExecutorService, to be executed by the other Infinispan worker nodes.

Step 1: Add dexecutor-infinispan dependency

 


<dependency>
    <groupId>com.github.dexecutor</groupId>
    <artifactId>dexecutor-infinispan</artifactId>
    <version>1.0.2</version>
</dependency>

 

Step 2: Add the default jgroups.xml

Step 3: Create the CacheManager

private DefaultCacheManager createCacheManagerProgrammatically(final String nodeName, final String cacheName) {
	DefaultCacheManager cacheManager = new DefaultCacheManager(globalConfiguration(nodeName), defaultConfiguration());
	cacheManager.defineConfiguration(cacheName, cacheConfiguration());
	return cacheManager;
}

private GlobalConfiguration globalConfiguration(String nodeName) {
	return GlobalConfigurationBuilder
				.defaultClusteredBuilder()
				.transport()
				.nodeName(nodeName)
				.addProperty("configurationFile", "jgroups.xml")
				.build();
}

private Configuration defaultConfiguration() {
	return new ConfigurationBuilder()
				.clustering()
				.cacheMode(CacheMode.REPL_SYNC)
				.build();
}

private Configuration cacheConfiguration() {
	return new ConfigurationBuilder()
				.clustering()
				.cacheMode(CacheMode.DIST_SYNC)
				.hash()
				.numOwners(2)
				.build();
}

Step 4 : Create Dexecutor instance using InfinispanExecutionEngine


EmbeddedCacheManager cacheManager = createCacheManagerProgrammatically(nodeName, cacheName);
final Cache<String, String> cache = cacheManager.getCache(cacheName);
DefaultExecutorService distributedExecutorService = new DefaultExecutorService(cache);
DefaultDependentTasksExecutor<Integer, Integer> dexecutor = newTaskExecutor(distributedExecutorService);

private DefaultDependentTasksExecutor<Integer, Integer> newTaskExecutor(final DistributedExecutorService executorService) {
	return new DefaultDependentTasksExecutor<Integer, Integer>(taskExecutorConfig(executorService));
}

private DependentTasksExecutorConfig<Integer, Integer> taskExecutorConfig(final DistributedExecutorService executorService) {
	return new DependentTasksExecutorConfig<Integer, Integer>(executionEngine(executorService), new SleepyTaskProvider());
}

private InfinispanExecutionEngine<Integer, Integer> executionEngine(final DistributedExecutorService executorService) {
	return new InfinispanExecutionEngine<Integer, Integer>(executorService);
}

Step 5: Only master should create tasks

if (isMaster) {
	DefaultExecutorService distributedExecutorService = new DefaultExecutorService(cache);
	DefaultDependentTasksExecutor<Integer, Integer> dexecutor = newTaskExecutor(distributedExecutorService);

	buildGraph(dexecutor);
	dexecutor.execute(ExecutionBehavior.TERMINATING);
}

Refer to the full code here.

Step 6: Run the Application

Terminal #1 : run as worker

mvn test-compile exec:java -Djava.net.preferIPv4Stack=true -Dexec.mainClass="com.github.dexecutor.infinispan.Node" -Dexec.classpathScope="test" -Dexec.args="s node-A"

Terminal #2: run as worker

mvn test-compile exec:java -Djava.net.preferIPv4Stack=true -Dexec.mainClass="com.github.dexecutor.infinispan.Node" -Dexec.classpathScope="test" -Dexec.args="s node-B"

 

Terminal #3 : run as master

mvn test-compile exec:java  -Dexec.classpathScope="test" -Djava.net.preferIPv4Stack=true -Dexec.mainClass="com.github.dexecutor.infinispan.Node" -Dexec.args="m node-C"

Here is the output:

dexecutor-multi-node-infinispan-single-machine
