For a customer I was thinking about a solution to loosely couple some systems to be integrated with a content management system, hippo cms in this case. I remembered I had a demo for a few times by Iwein Fuld and by Roel Derksen. A few weeks a go, Allard showed a nice demo with spring integration and sending an email. Based on these experiences I think that spring integration is a very nice product to solve my problem.

In this post I am showing the steps to create an application that reads an rss feed, transforms the data into our domain model, send the message to a business service. Of course I use spring integration in this step by step tutorial.

Of course the code is available online, check the references.

Very basic spring integration introduction

Giving a spring-integration introduction is very hard. You need to understand some basic concepts, the best way is to read the reference manual. With a bit of springframework knowledge, it should not be to hard. For now I focus on the elements I use in the sample.

  • Message – used to send data
  • Channel – Used to transport messages
  • ChannelAdapter – Used to put data from some external source on a channel (file, jms, rss)
  • Transformer – Used to transform one message into another to loosely couple sender and receiver
  • Service activator – Used to call a business service with contents from a channel

Those are the most important components, of course you need some code in a project, some spring configuration and to make life easier maven configuration.

Maven configuration

To make life easier, we use maven to get the dependencies. I needed three type of dependencies: spring-integration, logging and Rome for rss. You might need some additional repositories. I suggest to use a maven repository like artifactory to prevent this repetitive work.

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>${spring.integration.version}</version>
</dependency>
<dependency>
    <groupId>rome</groupId>
    <artifactId>rome</artifactId>
    <version>1.0</version>
</dependency>
<dependency>
    <groupId>rome</groupId>
    <artifactId>rome-fetcher</artifactId>
    <version>1.0</version>
</dependency>

As for the repositories, this is the one you might need.

http://download.java.net/maven/2

The Bootstrapper

To be able to run the integration component, we use a bootstrapper. This bootstrapper initializes the spring application context and waits for input so it can keep running. The following code block shows the main method.

public class Bootstrap {
    public static void main(String[] args) throws IOException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                new String[]{"integration-config.xml"});
        new Scanner(System.in).nextLine();
    }    
}

Step 1 – getting to know spring integration

In this post I am going to expand the sample step by step. This is the first step. In this step we are going to configure an inbound-channel-adapter, that needs to read an rss feed. Reading the feed is not implemented in this step. We just return a Message. The channel-adapter polls the feed reader every 5 seconds. The results are placed in the input channel. To be able to read from the input channel, we use a service-activator. This service activator only logs the message at the moment. The spring configuration now looks like the following code block.

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:si="http://www.springframework.org/schema/integration"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:tool="http://www.springframework.org/schema/tool"
       xmlns:lang="http://www.springframework.org/schema/lang"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans  http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.5.xsd
           http://www.springframework.org/schema/tool http://www.springframework.org/schema/tool/spring-tool-2.5.xsd
           http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-2.5.xsd">
    <si:channel id="inputRssFeedChannel"/>
    <si:inbound-channel-adapter channel="inputRssFeedChannel" ref="rssReader">
        <si:poller max-messages-per-poll="1">
            <si:interval-trigger interval="5" time-unit="SECONDS"/>
        </si:poller>
    </si:inbound-channel-adapter>
    <si:service-activator input-channel="inputRssFeedChannel" ref="rssFeedMessageHandler"/>
    <!-- Other beans-->
    <bean id="rssReader" class="nl.gridshore.samples.si.RssReader"/>
    <bean id="rssFeedMessageHandler" class="nl.gridshore.samples.si.RssFeedMessageHandler"/>
</beans>

Some things to notice from this source code are:

  • The inbound-channel-adapter uses poller with an interval-trigger, there are cron possibilities as well. Refer to the spring integration documentation for more info.
  • The rssReader is implementing the spring MessageSource interface, this is not necessary. You can use every bean, you do need to provide the name of the method in such a case.

The important parts of the two beans that are used are in the next code block.

public class RssReader implements MessageSource {
    private static Logger logger = LoggerFactory.getLogger(RssReader.class);
    public Message<String> receive() {
        logger.debug("readRssFeed method is called");
        return MessageBuilder.withPayload("test").setHeader("feedid", "thefeed").build();
    }
}

public class RssFeedMessageHandler {
    private static Logger logger = LoggerFactory.getLogger(RssFeedMessageHandler.class);
    public void handleMessage(Message<String> message) {
        logger.debug("At {} I received a message with feedid {} and payload {}",new String[] {
                new Date(message.getHeaders().getTimestamp()).toString(),
                message.getHeaders().get("feedid",String.class),
                message.getPayload()});
    }
}

The output of the program than is:

INFO  - ClassPathXmlApplicationContext   - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@3bf8bd0d: display name [org.springframework.context.support.ClassPathXmlApplicationContext@3bf8bd0d]; startup date [Sat Mar 28 17:23:43 CET 2009]; root of context hierarchy
INFO  - XmlBeanDefinitionReader          - Loading XML bean definitions from class path resource [integration-config.xml]
INFO  - ClassPathXmlApplicationContext   - Bean factory for application context [org.springframework.context.support.ClassPathXmlApplicationContext@3bf8bd0d]: org.springframework.beans.factory.support.DefaultListableBeanFactory@5d1d20d3
INFO  - DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
INFO  - DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default SimpleTaskScheduler will be created.
INFO  - ThreadPoolTaskExecutor           - Initializing ThreadPoolExecutor
INFO  - MessageMappingMethodInvoker      - Failed to find any valid Message-handling methods with annotation [interface org.springframework.integration.annotation.ServiceActivator] on target class [class nl.gridshore.samples.si.RssFeedMessageHandler]. Method-resolution will be applied to all eligible methods.
INFO  - EventDrivenConsumer              - started org.springframework.integration.config.ConsumerEndpointFactoryBean#0
INFO  - DefaultListableBeanFactory       - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@5d1d20d3: defining beans [org.springframework.integration.internalDefaultConfiguringBeanFactoryPostProcessor,inputRssFeedChannel,org.springframework.integration.scheduling.IntervalTrigger#0,org.springframework.integration.scheduling.PollerMetadata#0,org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0,org.springframework.integration.handler.ServiceActivatingHandler#0,org.springframework.integration.config.ConsumerEndpointFactoryBean#0,rssReader,rssFeedMessageHandler,errorChannel,org.springframework.integration.handler.LoggingHandler#0,org.springframework.integration.endpoint.EventDrivenConsumer#0,org.springframework.integration.channel.MessagePublishingErrorHandler#0,taskScheduler]; root of factory hierarchy
INFO  - SourcePollingChannelAdapter      - started org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0
INFO  - EventDrivenConsumer              - started org.springframework.integration.endpoint.EventDrivenConsumer#0
INFO  - SimpleTaskScheduler              - started org.springframework.integration.scheduling.SimpleTaskScheduler@194d4313
DEBUG - RssReader                        - readRssFeed method is called
DEBUG - RssFeedMessageHandler            - At Sat Mar 28 17:23:44 CET 2009 I received a message with feedid thefeed and payload test
DEBUG - RssReader                        - readRssFeed method is called
DEBUG - RssFeedMessageHandler            - At Sat Mar 28 17:23:49 CET 2009 I received a message with feedid thefeed and payload test
DEBUG - RssReader                        - readRssFeed method is called
DEBUG - RssFeedMessageHandler            - At Sat Mar 28 17:23:54 CET 2009 I received a message with feedid thefeed and payload test
DEBUG - RssReader                        - readRssFeed method is called
DEBUG - RssFeedMessageHandler            - At Sat Mar 28 17:23:59 CET 2009 I received a message with feedid thefeed and payload test
DEBUG - RssReader                        - readRssFeed method is called
DEBUG - RssFeedMessageHandler            - At Sat Mar 28 17:24:04 CET 2009 I received a message with feedid thefeed and payload test

Step 2 – implement the feed reader

Reading rss feeds is not the main and most important part of this post. Still we need it so I will discuss it briefly. Main thing to mention is that I am going to use Rome and Rome fetcher to obtain the feeds. I am used to work with Rome and I like the addition of the caching that the fetching project introduces. The changes so far are not very big. I changed the payload from a string to com.sun.syndication.feed.synd.SyndFeed. Of course I also had to make some changes to the RssReader class. The class now becomes what you can see in the next block of code.

public class RssReader implements MessageSource, InitializingBean {
    private static Logger logger = LoggerFactory.getLogger(RssReader.class);
    private FeedFetcherCache feedInfoCache;
    private FeedFetcher feedFetcher;
    private FetcherListener fetcherListener;

    public Message<SyndFeed> receive() {
        logger.debug("readRssFeed method is called");
        SyndFeed feed = obtainFeedItems();
        return MessageBuilder.withPayload(feed)
                .setHeader("feedid", "gridshore").build();

    }

    private SyndFeed obtainFeedItems() {
        SyndFeed feed = null;
        try {
            feed = feedFetcher.retrieveFeed(new URL("https://www.gridshore.nl/feed/"));
        } catch (IOException e) {
            logger.error("IO Problem while retrieving feed", e);
        } catch (FeedException e) {
            logger.error("Feed Problem while retrieving feed", e);
        } catch (FetcherException e) {
            logger.error("Fetcher Problem while retrieving feed", e);
        }
        return feed;
    }

    public void afterPropertiesSet() throws Exception {
        feedInfoCache = HashMapFeedInfoCache.getInstance();
        feedFetcher = new HttpURLFeedFetcher(feedInfoCache);
        if (fetcherListener != null) {
            feedFetcher.addFetcherEventListener(fetcherListener);
        }
    }

    public void setFetcherListener(FetcherListener fetcherListener) {
        this.fetcherListener = fetcherListener;
    }
}

Step 3 – Add a transformer

At the moment we have a message handler, that is a service-activtor. This message handler takes the message with the SyndFeed as a Payload. We want a more generic handler that takes a NewsItem as input. Therefore we need a transformer that takes the SyndFeed and transforms it into our own NewsItem. The SyndFeed does not have the most obvious structure, a java 5 version with generics would be a lot easier. But when you get used to it, it is not that bad and you can request all sorts of data from it. Let’s start with the xml configuration. We need to add a channel, change the input channel for the service-activator and of course add the transformer.

    <si:channel id="inputRssFeedChannel"/>
    <si:channel id="inputNewsItemChannel"/>

    <si:inbound-channel-adapter channel="inputRssFeedChannel" ref="rssReader">
        <si:poller max-messages-per-poll="1">
            <si:interval-trigger interval="10" time-unit="SECONDS"/>
        </si:poller>
    </si:inbound-channel-adapter>

    <si:transformer input-channel="inputRssFeedChannel" output-channel="inputNewsItemChannel"
                    ref="syndFeedNewsItemTransformer"/>

    <si:service-activator input-channel="inputNewsItemChannel" ref="rssFeedMessageHandler"/>

A very basic implementation of the transformer is following. Beware, this is not the best spring bean implementation ever.

public class SyndFeedToNewsItemTransformer {
    private Logger logger = LoggerFactory.getLogger(SyndFeedToNewsItemTransformer.class);

    public Message<List<NewsItem>> transform(Message<SyndFeed> syndFeedMessage) {
        logger.debug("Received a feed from the blog {}",syndFeedMessage.getPayload().getTitle());

        SyndFeed syndFeed = syndFeedMessage.getPayload();

        List<NewsItem> newsItems = new ArrayList<NewsItem>();
        List syndFeedItems = syndFeed.getEntries();
        for (Object syndFeedEntry:syndFeedItems) {
            SyndEntry syndEntry = (SyndEntry)syndFeedEntry;
            String title = syndEntry.getTitle();
            String author = syndEntry.getAuthor();
            String description = syndEntry.getDescription().getValue();
            SyndContent syndContent = (SyndContent)syndEntry.getContents().get(0);
            String content = syndContent.getValue();
            // a lot of other information is possible
            newsItems.add(new NewsItem(title,description,content));
        }
        Message<List<NewsItem>> newMessage = MessageBuilder.withPayload(newsItems)
                .copyHeaders(syndFeedMessage.getHeaders()).build();
        return newMessage;
    }
}

Conclusion

We now have a basic rss feed reader that is completely decoupled from the business service that actually stores the results of the feed. Of course this is not finished, you need to make up you mind about the information from the feed to store. You need to make the configuration of the feed more flexible. You also need to handle the updates of the feed. I did show the event mechanism of the Rome framework.

I hope this post helped in getting more understanding of the spring integration framework.

References

  • http://static.springframework.org/spring-integration/
  • https://rome.dev.java.net/
  • http://code.google.com/p/gridshore/source/browse/#svn/trunk/springintegration
Using spring integration for rss reading
Tagged on: