Oracle Service Bus, Implementing Aggregator pattern by use of Split-Join

One of the patterns described on the EAI patterns site is the so-called Aggregator pattern.

“Use a stateful filter, an Aggregator, to collect and store individual messages until a complete set of related messages has been received. Then, the Aggregator publishes a single message distilled from the individual messages”

We will be using this pattern in our next use case.

Use case

On the Oracle Service Bus we will implement the EmployeeService with an operation called ‘getEmployee’.
When we call this operation and supply the EmployeeNumber as input parameter, we want to receive the following set of information:

  • EmployeeDetails
  • AddressDetails
  • ContactDetails

In a real-life scenario we could have the following situation.

On the Oracle Service Bus we want to offer our clients a single interface for retrieving Employee information. The interface definition will define Employee details, addresses and contact details.

The process will receive the Employee Number and, by use of parallel invocation, it will:

  • send a request to system1 which will supply us the Employee details
  • send a request to system2 which will supply us the Employee addresses
  • send a request to system3 which will supply us the Employee contact details

Every system invocation is a synchronous call, so each of the three systems returns a response.
Once the last of the 3 responses has been received, the parallel invocation is done. In the process flow we then need to aggregate the responses and transform them into one response, which gets routed back to the client caller.

The client caller will not know what backend systems will get invoked to retrieve data. It will only receive one message response according to our defined interface on the service bus.
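
The flow described above can be sketched in plain Java, outside the Oracle Service Bus, to make the pattern concrete. This is only an illustration under assumptions: the class AggregatorSketch, the callSystem1, callSystem2 and callSystem3 methods and the EmployeeReply type are hypothetical stand-ins for the three backend systems and the aggregated response, and the strings they return are made up.

```java
import java.util.concurrent.CompletableFuture;

// Illustration only: hypothetical stand-ins, not Oracle Service Bus code.
final class AggregatorSketch {

    // The single aggregated message that goes back to the client caller.
    static final class EmployeeReply {
        final String employeeDetails;
        final String addressDetails;
        final String contactDetails;

        EmployeeReply(String employeeDetails, String addressDetails, String contactDetails) {
            this.employeeDetails = employeeDetails;
            this.addressDetails = addressDetails;
            this.contactDetails = contactDetails;
        }
    }

    // Assumed synchronous backend calls (system1, system2, system3).
    static String callSystem1(String employeeNumber) { return "details-for-" + employeeNumber; }
    static String callSystem2(String employeeNumber) { return "address-for-" + employeeNumber; }
    static String callSystem3(String employeeNumber) { return "contact-for-" + employeeNumber; }

    static EmployeeReply getEmployee(String employeeNumber) {
        // Start the three requests in parallel.
        CompletableFuture<String> details = CompletableFuture.supplyAsync(() -> callSystem1(employeeNumber));
        CompletableFuture<String> address = CompletableFuture.supplyAsync(() -> callSystem2(employeeNumber));
        CompletableFuture<String> contact = CompletableFuture.supplyAsync(() -> callSystem3(employeeNumber));

        // Block until the last of the three responses has arrived.
        CompletableFuture.allOf(details, address, contact).join();

        // Aggregate the three responses into one reply.
        return new EmployeeReply(details.join(), address.join(), contact.join());
    }

    public static void main(String[] args) {
        EmployeeReply reply = getEmployee("100");
        System.out.println(reply.employeeDetails + ", " + reply.addressDetails + ", " + reply.contactDetails);
    }
}
```

The caller of getEmployee only sees EmployeeReply; which backends were involved stays hidden, just as it does behind the service bus interface.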

Enough talk, let’s see what we need in the Oracle Service Bus to implement this.

Backend systems

In a real-world situation we could have 3 different backend systems which each supply different parts of the Employee data object. For our case we will use just 1 webservice with different operations, which will get invoked as different services.
Create a new Webservice project in Eclipse and add a new webservice to it.

package nl.xenta.services;

import nl.xenta.entities.Address;
import nl.xenta.entities.ContactDetail;
import nl.xenta.entities.Employee;

public interface EmployeeService {

	public Employee getEmployeeDetails(String employeeNumber);

	public Address getEmployeeAddress(String employeeNumber);

	public ContactDetail getContactDetail(String employeeNumber);

}

package nl.xenta.services;

import javax.jws.*;

import nl.xenta.entities.Address;
import nl.xenta.entities.Employee;
import nl.xenta.entities.ContactDetail;

@WebService
public class EmployeeServiceImpl implements EmployeeService {

	/* (non-Javadoc)
	 * @see nl.xenta.services.EmployeeService#getEmployeeDetails(java.lang.String)
	 */
	@WebMethod
	public Employee getEmployeeDetails(@WebParam(name="employeeNumber") String employeeNumber) {
		Employee emp = new Employee();
		emp.setFirstname("Eric");
		emp.setLastname("Elzinga");
		emp.setId(100);
		
		return emp;
	}
	
	/* (non-Javadoc)
	 * @see nl.xenta.services.EmployeeService#getEmployeeAddress(java.lang.String)
	 */
	@WebMethod
	public Address getEmployeeAddress(@WebParam(name="employeeNumber") String employeeNumber) {
		Address address = new Address();
		address.setCity("Utrecht");
		address.setCountry("The Netherlands");
		address.setPostalCode("1413JK");
		
		return address;
	}
	
	/* (non-Javadoc)
	 * @see nl.xenta.services.EmployeeService#getContactDetail(java.lang.String)
	 */
	@WebMethod
	public ContactDetail getContactDetail(@WebParam(name="employeeNumber") String employeeNumber) {		
		ContactDetail contact = new ContactDetail();
		contact.setType("mobile");
		contact.setValue("+31(6)24562772");		
				
		return contact;		
	}	
}

package nl.xenta.entities;

public class Address {
	
	private String street;
	private String postalCode;
	private String city;
	private String country;
	
	public String getStreet() {
		return street;
	}
	public void setStreet(String street) {
		this.street = street;
	}
	public String getPostalCode() {
		return postalCode;
	}
	public void setPostalCode(String postalCode) {
		this.postalCode = postalCode;
	}
	public String getCity() {
		return city;
	}
	public void setCity(String city) {
		this.city = city;
	}
	public String getCountry() {
		return country;
	}
	public void setCountry(String country) {
		this.country = country;
	}
}

package nl.xenta.entities;

public class ContactDetail {
	
	private String type;
	private String value;

	public String getType() {
		return type;
	}
	public void setType(String type) {
		this.type = type;
	}
	public String getValue() {
		return value;
	}
	public void setValue(String value) {
		this.value = value;
	}
}

package nl.xenta.entities;

public class Employee {
	
	private int id;
	private String firstname;
	private String lastname;
	
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public String getFirstname() {
		return firstname;
	}
	public void setFirstname(String firstname) {
		this.firstname = firstname;
	}
	public String getLastname() {
		return lastname;
	}
	public void setLastname(String lastname) {
		this.lastname = lastname;
	}
}

Deploy the service.
The simulated backend systems are now ready to supply data.

Oracle Service Bus resources

We need the following resources in the Oracle Service Bus to be able to implement the process.

  • wsdl + xsd of the deployed EmployeeService java webservice
  • business services for every third party service
  • split-join for the aggregation
  • business service representing the split-join
  • proxy service which routes to the split-join business service

Wsdl and xsd

Import the wsdl from the wsdl endpoint shown in the WebLogic Console.

Business services for every third party service

Since we have just 1 webservice representing all the third parties we don’t really need to create separate business services for every party. To simulate the real scenario we still create 3 separate business services, all based on the same wsdl, but in the routing we will just use different operations.
So create the following set of business services, all based on the same EmployeeService wsdl (which we imported in the previous step).

  • AddressDetails.biz
  • ContactDetails.biz
  • EmployeeDetails.biz

Split-join

Create a new wsdl for the interface of the split-join. This wsdl will only represent the getEmployee operation. Name the wsdl CRM_EmployeeService.wsdl.

<!-- Published by JAX-WS RI at http://jax-ws.dev.java.net. RI's version is Oracle JAX-WS 2.1.5. -->
<!-- Generated by JAX-WS RI at http://jax-ws.dev.java.net. RI's version is Oracle JAX-WS 2.1.5. -->
<definitions name="EmployeeServiceService" targetNamespace="http://services.xenta.nl/" xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/" xmlns:tns="http://services.xenta.nl/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns="http://schemas.xmlsoap.org/wsdl/">
	<types>
		<xsd:schema>
			<xsd:import namespace="http://services.xenta.nl/" schemaLocation="EmployeeService.xsd"/>
		</xsd:schema>
	</types>
	<message name="getEmployee">
		<part name="parameters" element="tns:getEmployee"/>
	</message>
	<message name="getEmployeeResponse">
		<part name="parameters" element="tns:getEmployeeResponse"/>
	</message>
	<portType name="CRM_EmployeeService">
		<operation name="getEmployee">
			<input message="tns:getEmployee"/>
			<output message="tns:getEmployeeResponse"/>
		</operation>
	</portType>
	<binding name="EmployeeServicePortBinding" type="tns:CRM_EmployeeService">
		<soap:binding style="document" transport="http://schemas.xmlsoap.org/soap/http"/>
		<operation name="getEmployee">
			<soap:operation soapAction=""/>
			<input>
				<soap:body use="literal"/>
			</input>
			<output>
				<soap:body use="literal"/>
			</output>
		</operation>
	</binding>
	<service name="EmployeeServiceService">
		<port name="EmployeeServicePort" binding="tns:EmployeeServicePortBinding">
			<soap:address location="http://192.168.178.28:7001/EmployeeService/EmployeeServiceService"/>
		</port>
	</service>
</definitions>

<?xml version="1.0" encoding="UTF-8"?>
<!-- Published by JAX-WS RI at http://jax-ws.dev.java.net. RI's version is Oracle JAX-WS 2.1.5. -->
<xs:schema xmlns:tns="http://services.xenta.nl/" xmlns:xs="http://www.w3.org/2001/XMLSchema" targetNamespace="http://services.xenta.nl/" version="1.0">
	<xs:element name="getEmployee" type="tns:getEmployee"/>
	<xs:element name="getEmployeeResponse" type="tns:getEmployeeResponse"/>
	<xs:complexType name="getEmployee">
		<xs:sequence>
			<xs:element name="employeeNumber" type="xs:string" minOccurs="0"/>
		</xs:sequence>
	</xs:complexType>
	<xs:complexType name="getEmployeeResponse">
		<xs:sequence>
			<xs:element name="EmployeeDetails" type="tns:employeeDetailType" minOccurs="0"/>
			<xs:element name="AddressDetails" type="tns:addressDetailType" minOccurs="0"/>
			<xs:element name="ContactDetails" type="tns:contactDetailType" minOccurs="0"/>
		</xs:sequence>
	</xs:complexType>
	<xs:complexType name="contactDetailType">
		<xs:sequence>
			<xs:element name="type" type="xs:string" minOccurs="0"/>
			<xs:element name="value" type="xs:string" minOccurs="0"/>
		</xs:sequence>
	</xs:complexType>
	<xs:complexType name="employeeDetailType">
		<xs:sequence>
			<xs:element name="firstname" type="xs:string" minOccurs="0"/>
			<xs:element name="id" type="xs:int"/>
			<xs:element name="lastname" type="xs:string" minOccurs="0"/>
		</xs:sequence>
	</xs:complexType>
	<xs:complexType name="addressDetailType">
		<xs:sequence>
			<xs:element name="city" type="xs:string" minOccurs="0"/>
			<xs:element name="country" type="xs:string" minOccurs="0"/>
			<xs:element name="postalCode" type="xs:string" minOccurs="0"/>
			<xs:element name="street" type="xs:string" minOccurs="0"/>
		</xs:sequence>
	</xs:complexType>
</xs:schema>

Create a new split join (File | New | Split-Join) and name it EmployeeSplitJoin.

By default it will generate for us the variables request and response.

Add a Parallel activity between Receive and Reply.
Add an extra Branch to the Parallel activity.
In every Scope, add an Invoke Service activity.
Select the first Invoke Service in the first Scope and configure it.
On the Operation tab, browse to the first business service, AddressDetails.

On the Input Variable tab, select for Message Variable the option Create Message Variable.

Name the variable EmployeeAddressRequest and define it as a Global Variable.

Do the same for the Output Variable tab and name the variable EmployeeAddressResponse.

In every Scope add an assign before every Invoke Service activity.

With these assigns we’re mapping the input parameter of our process (employeeNumber) to the input of our business service (backend service).

Repeat these steps (add scope, add assign, add invoke service, configure input/output of invoke service) for every invocation of our business services.
We should end up with three branches, one for every business service.

The parallel invocation of our backend services is ready. Now we need to aggregate the responses of all the services. Create a new xquery transformation which will receive 3 input parameters, one for every response of our backend services. The result of the transformation will be a message according to our proxy service interface (getEmployee).
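
To make the target of this transformation concrete, the mapping can be written out as a small Java sketch. This is not the xquery itself, only a plain-Java rendering of the same idea under assumptions: the class AggregatedResponseSketch and its element and aggregate methods are hypothetical, the three inputs are treated as ready-made XML fragments, and values are not XML-escaped. The element names and the namespace are taken from the xsd shown earlier.

```java
// Illustration only: a plain-Java rendering of the mapping, not XQuery.
final class AggregatedResponseSketch {

    // Wraps content in an element without a namespace prefix.
    static String element(String name, String content) {
        return "<" + name + ">" + content + "</" + name + ">";
    }

    // Combines the three backend payloads into one getEmployeeResponse message.
    static String aggregate(String employee, String address, String contact) {
        return "<ns:getEmployeeResponse xmlns:ns=\"http://services.xenta.nl/\">"
                + element("EmployeeDetails", employee)
                + element("AddressDetails", address)
                + element("ContactDetails", contact)
                + "</ns:getEmployeeResponse>";
    }

    public static void main(String[] args) {
        // Sample values as returned by the simulated backend service.
        String employee = element("firstname", "Eric") + element("id", "100") + element("lastname", "Elzinga");
        String address = element("city", "Utrecht") + element("country", "The Netherlands")
                + element("postalCode", "1413JK");
        String contact = element("type", "mobile") + element("value", "+31(6)24562772");

        System.out.println(aggregate(employee, address, contact));
    }
}
```

Each of the three parameters ends up in its own child element of the single response message, which is the whole job of the aggregation step.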


After the Parallel activity add an Assign. Click the expression, go to the XQuery Resources tab and browse to the ServicesToAggregatedResponse xquery. Bind every input parameter to the matching backend response variable (for example EmployeeAddressResponse).

For the Variable select response.parameters.

Our split-join part of the process is done. The process flow now consists of the Receive, the Parallel activity with its three branches, the aggregating Assign and the Reply.

To be able to call the split-join flow we need to generate a business service out of it. Right click on the EmployeeSplitJoin.flow and go to File | Oracle Service Bus | Generate Business Service, name it EmployeeSplitJoin.

ProxyService

Create a new http soap proxy service based on the CRM_EmployeeService wsdl. In the proxy service, route to the ‘EmployeeSplitJoin’ business service. Since the interface of our split-join is the same as that of our proxy service (same wsdl) we can just use the passthrough; no transformation is needed.
Deploy the project and test the service.

Conclusion

Instead of using the split-join we could also use several service callouts in our process. The downside of that approach is that the invocations are sequential: each one waits for the previous one to return its response. With the split-join approach we can invoke all the services in parallel and aggregate the results. For services which need to give a low response time and still need to retrieve data from multiple backend systems, sequential invocations are a poor fit; split-join would then be the approach to take.
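
The difference in response time can be made visible with a small Java sketch. It is an illustration under assumptions, not a measurement of the Oracle Service Bus: the class SequentialVersusParallel, the callBackend method and the 200 ms delay per backend are all made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration only: simulated backends with an assumed fixed latency.
final class SequentialVersusParallel {

    static final long BACKEND_DELAY_MILLIS = 200; // assumed latency per backend call
    static final List<String> BACKENDS = Arrays.asList("EmployeeDetails", "AddressDetails", "ContactDetails");

    // Hypothetical synchronous backend call: waits, then echoes its name.
    static String callBackend(String name) {
        try {
            Thread.sleep(BACKEND_DELAY_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name;
    }

    // Service-callout style: every call waits for the previous one to finish.
    static long sequentialMillis() {
        long start = System.nanoTime();
        for (String name : BACKENDS) {
            callBackend(name);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Split-join style: all calls are started first, then we wait for all of them.
    static long parallelMillis() {
        ExecutorService pool = Executors.newFixedThreadPool(BACKENDS.size());
        try {
            long start = System.nanoTime();
            List<CompletableFuture<String>> calls = new ArrayList<>();
            for (String name : BACKENDS) {
                calls.add(CompletableFuture.supplyAsync(() -> callBackend(name), pool));
            }
            for (CompletableFuture<String> call : calls) {
                call.join();
            }
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sequential: " + sequentialMillis() + " ms");
        System.out.println("parallel:   " + parallelMillis() + " ms");
    }
}
```

With these assumed delays the sequential variant takes roughly the sum of the three calls, while the parallel variant takes roughly as long as the slowest single call.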

Besides the static split-join I used for this case, we can also use the dynamic approach, in which we loop over recurring elements in the payload, process them in parallel and aggregate the results.
Edwin showed this approach in his blog.
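
As a conceptual sketch of the dynamic approach, the same idea looks like this in plain Java. It is not the Oracle Service Bus configuration, and the class DynamicSplitJoinSketch and its lookup method are hypothetical; the point is only that the number of parallel tasks follows the number of elements in the input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustration only: one parallel task per recurring element.
final class DynamicSplitJoinSketch {

    // Hypothetical per-element processing, e.g. one backend lookup per employee number.
    static String lookup(String employeeNumber) {
        return "employee-" + employeeNumber;
    }

    static List<String> processAll(List<String> employeeNumbers) {
        // Split: start one asynchronous task per element in the payload.
        List<CompletableFuture<String>> tasks = new ArrayList<>();
        for (String number : employeeNumbers) {
            tasks.add(CompletableFuture.supplyAsync(() -> lookup(number)));
        }

        // Join: collect the results in the original element order.
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> task : tasks) {
            results.add(task.join());
        }
        return results;
    }

    public static void main(String[] args) {
        // prints [employee-100, employee-101, employee-102]
        System.out.println(processAll(Arrays.asList("100", "101", "102")));
    }
}
```

In the static variant the three branches are fixed at design time; here the list may hold any number of elements and the aggregation still returns one combined result.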

download : OracleServiceBusAggregator


4 Responses to “Oracle Service Bus, Implementing Aggregator pattern by use of Split-Join”

Lucas Jellema | July 3rd, 2011 at 4:38 pm

Very clear explanation of the split-join and why you may want to use this parallel processing of independent operations whose results need to be aggregated together. Thanks Eric, this is very helpful.

Lucas

Hi Eric, nice article, it’s very useful for me. Can you please send the project jar file or artifacts?

Thanks
Mani

Do you have an example on Dynamic Split join. This was an excellent example.
