Dec 20, 2017

Dynamic SFTP Connection Factory for Spring Integration

I recently had the opportunity to dive into a Spring Integration project that presented an interesting challenge: creating an outbound SFTP connection factory at runtime based on Spring Batch jobs.

In a simple outbound SFTP integration, the Spring Integration configuration is quite succinct. You need to define three things: the MessageChannel that the SFTP adapter listens to, the SessionFactory that holds the SFTP connection details, and the IntegrationFlow that defines the path a message takes as it is processed. Below is an example of a simple integration flow for a single, static SFTP outbound connection:

@Bean
MessageChannel outboundSftpChannel(){
    new DirectChannel()
}	
 
@Bean
SessionFactory sftpSessionFactory(){
    def sessionFactory = new DefaultSftpSessionFactory(false)
    sessionFactory.host = 'host'
    sessionFactory.port = 1234
    sessionFactory.user = 'user'
    // ...
 
    sessionFactory
}
 
@Bean
IntegrationFlow sftpOutboundFlow(SessionFactory sftpSessionFactory){
    IntegrationFlows.from('outboundSftpChannel')
        .handle(Sftp.outboundAdapter(sftpSessionFactory)
            .remoteDirectory('/tmp/'))
        .get()
}

This works well for a single SFTP connection, but what if we wanted to define multiple connections, each with a unique host/username? Well, Spring Integration provides the DelegatingSessionFactory class to do that.

The DelegatingSessionFactory contains a SessionFactoryLocator that finds the correct SessionFactory based on a thread key, which is set when the message is written to the MessageChannel. This means you can wire up several SFTP connections in code or read them from your configuration, place each in its own session factory, and have the proper SessionFactory create an SFTP connection as the message flows through the defined pipeline. Setting that up is also quite simple. With just a few adjustments to our previous configuration, we can put the DelegatingSessionFactory to use.

@Bean
MessageChannel outboundSftpChannel(){
    new DirectChannel()
}	
 
@Bean
DelegatingSessionFactory delegatingSessionFactory(){
    def firstSessionFactory = new DefaultSftpSessionFactory(false)
    firstSessionFactory.host = 'host'
    firstSessionFactory.port = 1234
    //...

    def secondSessionFactory = new DefaultSftpSessionFactory(false)
    secondSessionFactory.host = 'hosttwo'
    secondSessionFactory.port = 1234
    //...

    def defaultSessionFactory = new DefaultSftpSessionFactory(false)
    defaultSessionFactory.host = 'default'
    defaultSessionFactory.port = 1234
    //...

    def sessionFactoryMap = [0: firstSessionFactory, 1: secondSessionFactory]

    new DelegatingSessionFactory(sessionFactoryMap, defaultSessionFactory)
}
 
@Bean
IntegrationFlow sftpOutboundFlow(DelegatingSessionFactory delegatingSessionFactory){
    IntegrationFlows.from('outboundSftpChannel')
        .handle(Sftp.outboundAdapter(delegatingSessionFactory)
            .remoteDirectory('/tmp/'))
        .get()
}
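
The configuration above does not show where the thread key comes from. As a minimal sketch, assuming messages are sent from application code and that outboundSftpChannel stays a DirectChannel, the key could be set just before the send and cleared afterwards. The KeyedSftpSender class and its send method are hypothetical names used for illustration; setThreadKey and clearThreadKey are methods on DelegatingSessionFactory.

```groovy
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.integration.file.remote.session.DelegatingSessionFactory
import org.springframework.messaging.MessageChannel
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component

// Hypothetical helper: the class and method names are illustrative only.
@Component
class KeyedSftpSender {

    private final DelegatingSessionFactory delegatingSessionFactory
    private final MessageChannel outboundSftpChannel

    KeyedSftpSender(DelegatingSessionFactory delegatingSessionFactory,
                    @Qualifier('outboundSftpChannel') MessageChannel outboundSftpChannel) {
        this.delegatingSessionFactory = delegatingSessionFactory
        this.outboundSftpChannel = outboundSftpChannel
    }

    void send(Object sessionFactoryKey, File file) {
        // Bind the key to the current thread; the delegating factory reads it
        // when the outbound adapter asks for a session.
        delegatingSessionFactory.setThreadKey(sessionFactoryKey)
        try {
            // A DirectChannel delivers on the calling thread, so the adapter
            // runs on the thread that holds the key.
            outboundSftpChannel.send(MessageBuilder.withPayload(file).build())
        } finally {
            // Always clear the key so a reused thread does not carry it over.
            delegatingSessionFactory.clearThreadKey()
        }
    }
}
```

With the map above, calling send(1, someFile) would route the upload through secondSessionFactory, and a key that is not in the map would fall back to defaultSessionFactory. If the channel hands the message to another thread (an ExecutorChannel or QueueChannel, for example), the key would have to be set on that thread instead.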

With the DelegatingSessionFactory bean above, I have specified three session factories for my SFTP endpoints: two registered under keys and one default. As long as we specify the appropriate thread key, the proper SFTP connection will be initiated, and our single SFTP adapter can now handle multiple endpoints. This works well for any process that has a long and stable life, but what if the process that feeds the outboundSftpChannel is more dynamic? What if there is a business use case for adding, changing, or removing SFTP connections at runtime? How can we solve for that?

There are many ways the DelegatingSessionFactory and its factory locator can be updated at runtime. The default locator implementation even provides public methods to do so. Add a connection, run the process that invokes this outbound SFTP channel, and you’re done.

I didn’t do that.

The methods on the DefaultSessionFactoryLocator weren’t quite dynamic enough. The SFTP process required the new connection to be registered before any message for that connection was sent down the pipeline. I hated the thought of having to add more configuration to facilitate what is supposed to be a dynamic process. Why ask users to remember to set up the new connection in the configuration store (database, file, etc.) and then invoke whatever endpoint would need to be designed to call the appropriate methods? We should let the application control its own process.
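
For reference, the manual registration step that the default locator expects might look roughly like the sketch below. The hosts, ports, and keys are placeholders, and thirdSessionFactory is a hypothetical new connection. addSessionFactory and removeSessionFactory are the public methods on DefaultSessionFactoryLocator.

```groovy
import org.springframework.integration.file.remote.session.DefaultSessionFactoryLocator
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory

// Placeholder starting state: one known connection plus a fallback.
def firstSessionFactory = new DefaultSftpSessionFactory(false)
firstSessionFactory.host = 'host'
firstSessionFactory.port = 1234

def defaultSessionFactory = new DefaultSftpSessionFactory(false)
defaultSessionFactory.host = 'default'
defaultSessionFactory.port = 1234

def locator = new DefaultSessionFactoryLocator([0: firstSessionFactory], defaultSessionFactory)

// Something (an admin endpoint, a scheduled job, ...) has to run this
// before the first message for key 2 is sent.
def thirdSessionFactory = new DefaultSftpSessionFactory(false)
thirdSessionFactory.host = 'hostthree' // hypothetical host
thirdSessionFactory.port = 1234
locator.addSessionFactory(2, thirdSessionFactory)

// Removing a connection is the same kind of explicit call.
locator.removeSessionFactory(0)
```

After these calls, key 2 resolves to thirdSessionFactory, while key 0 and any other unregistered key resolve to the default factory. The drawback described above is visible here: nothing in the message flow itself triggers the addSessionFactory call.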

I injected a custom factory locator that is functionally similar to the default session factory locator, with one key difference: instead of requiring a separate, manual process to register a new session factory with the locator, it instantiates a new session factory whenever the right one isn’t already in its factory storage.

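import org.springframework.integration.file.remote.session.SessionFactory
import org.springframework.integration.file.remote.session.SessionFactoryLocator
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory
import org.springframework.stereotype.Component

@Component
class ExampleRuntimeSessionFactoryLocator implements SessionFactoryLocator {

    // Session factories created so far, indexed by thread key
    private final Map sessionFactoryMap = [:]

    @Override
    SessionFactory getSessionFactory(Object key) {
        def sessionFactory = sessionFactoryMap[key]

        // Unknown key: build the factory on demand and remember it
        if (!sessionFactory){
            sessionFactory = generateSessionFactory(key as Long)
            sessionFactoryMap[key] = sessionFactory
        }

        sessionFactory
    }

    // Build the factory for the given key; the connection details are elided here
    private DefaultSftpSessionFactory generateSessionFactory(Long key){
        new DefaultSftpSessionFactory(
                host: 'host',
                port: 1234,
                //...
        )
    }
}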

And to wire it up in the Spring Integration Configuration, a small change is required to the DelegatingSessionFactory.

@Bean
DelegatingSessionFactory delegatingSessionFactory(ExampleRuntimeSessionFactoryLocator runtimeSessionFactoryLocator){
    new DelegatingSessionFactory(runtimeSessionFactoryLocator)
}

And with that, anytime a message is sent on the SFTP outbound channel, the application will automatically wire up any relevant session factories for use.

If you want to see an example project with all of these pieces in place with the addition of looking up the SFTP connection information from a database table, I have uploaded the example code to my GitHub.

About the Author

Chris Tosspon

Sr. Consultant

Chris is a developer passionate about solving complex problems. With his knowledge of cryptography and advanced hardware attacks, he pursued the life of a hardware hacker before turning his attention to software development and engineering.

Chris builds applications using Spring, Java, Groovy, and has experience with most of the popular Java testing frameworks, e.g. Spock, TestNG, JUnit.

One thought on “Dynamic SFTP Connection Factory for Spring Integration”

  1. Andy says:

    Great article. It helped me a lot. Earlier I had no idea how to accomplish this, but after seeing it, it is a big relief. Keep up the good work. Also, I have a question: what is the difference between DefaultSftpSessionFactory and DelegatingSessionFactory? I tried researching this but did not find much help.

    1. I’m glad this post helped you.

      As to your question, the DefaultSftpSessionFactory is for managing sessions of a single SFTP endpoint. So if your application is only required to pull from or send to a single SFTP source, the DefaultSftpSessionFactory would be the logical choice.

      The DelegatingSessionFactory is for managing sessions for multiple SFTP endpoints and seems to be intended primarily for outbound SFTP endpoints. As in the code above, the DelegatingSessionFactory is populated with multiple DefaultSftpSessionFactory instances so that at runtime the application can pick which SFTP server to communicate with.

  2. Dora says:

    What a big relief I felt when I found your article! It is amazing. But I have a question about the demo you provided. Could I also change “remoteDirectory” at runtime?

    1. Thanks for the comment!

      Spring Integration has many ways to choose which remote directory an outbound SFTP connection writes to. To specify the destination folder at runtime, replace the remoteDirectory builder method with the remoteDirectoryExpression builder method and use a SpEL expression to determine the directory location. This expression is evaluated for each message in the integration flow.

      For example, a simple usage would have one of the steps of the integration flow put a value in a specific message header, and then have the remoteDirectoryExpression read that header. Combined with the examples above, each message would then control both its SFTP session and its destination directory.
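
      A minimal sketch of that header-based approach, assuming the Spring Integration 5.x Java DSL used in the article. The header name remoteDirectory and the class name DynamicDirectoryFlowConfig are hypothetical choices for illustration.

      ```groovy
      import org.springframework.context.annotation.Bean
      import org.springframework.context.annotation.Configuration
      import org.springframework.integration.dsl.IntegrationFlow
      import org.springframework.integration.dsl.IntegrationFlows
      import org.springframework.integration.file.remote.session.DelegatingSessionFactory
      import org.springframework.integration.sftp.dsl.Sftp

      @Configuration
      class DynamicDirectoryFlowConfig {

          @Bean
          IntegrationFlow sftpOutboundFlow(DelegatingSessionFactory delegatingSessionFactory) {
              IntegrationFlows.from('outboundSftpChannel')
                  .handle(Sftp.outboundAdapter(delegatingSessionFactory)
                      // SpEL evaluated per message; 'remoteDirectory' is a
                      // hypothetical header set by an upstream step
                      .remoteDirectoryExpression("headers['remoteDirectory']"))
                  .get()
          }
      }
      ```

      An upstream step could then populate the header with something like MessageBuilder.withPayload(file).setHeader('remoteDirectory', '/tmp/reports').build() before the message reaches outboundSftpChannel.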

      1. Dora says:

        Thanks for your advice! I really appreciate it. I’ll try it.

        1. Dora says:

          Hi Chris, your article and your advice have helped me a lot! Recently I have been trying to use a thread pool to upload files to the SFTP server, but it always throws an error in getSession(). Do you know why? Is it because of using DelegatingSessionFactory? I also asked my colleagues, but they have no idea. I’m sorry to bother you again.
          “org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.MessagingException: Failed to obtain pooled item; nested exception is java.lang.IllegalStateException: failed to create SFTP Session”
