spring integration | Threads stuck in RUNNABLE for a long time when connecting via SFTP inbound adapters

We are using Spring Integration dynamic SFTP flows to ingest files from SFTP servers. The flow's Java config looks like this:

    from(Sftp.inboundAdapter(cachingSessionFactory,
                    // order local files by last-modified time, oldest first
                    (a, b) -> Long.compare(a.lastModified(), b.lastModified()))
                .preserveTimestamp(true)
                .remoteDirectory(job.getRemoteDirectory())
                .deleteRemoteFiles(job.getDeleteRemoteFiles())
                .filter(this.compositeRemoteFilter(job))
                .autoCreateLocalDirectory(true)
                .maxFetchSize(maxMessagesPerPoll)
                .localFilter(new LocalFileFilter(job))
                .localDirectory(localDirectory),
            e -> e.id("testComponent")
                .autoStartup(false) // not started automatically
                .poller(Pollers.cron(job.getPollingFreq(), job.timeZone())
                    .maxMessagesPerPoll(maxMessagesPerPoll)
                    .receiveTimeout(1000L)))
        .handle(UploadHandler)

We obtain the caching session factory dynamically through a delegate. Most of the time this works fine, but sometimes, after running for days, we observe some threads stuck in RUNNABLE. Our assumption was that if the JSch session got stuck in any way, it would eventually be released, since we have timeouts at both the session factory level and the poller.
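For context, here is a minimal JDK-only sketch of the kind of read timeout we are relying on. It assumes the session-level timeout ends up as a socket read timeout (SO_TIMEOUT); that is an assumption on our part, not something verified against the JSch source. The class and method names are hypothetical and this is not Spring Integration or JSch code. It shows a blocked socket read being released once the timeout elapses; a read on a plain java.io.FileInputStream has no comparable timeout setting.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical demo class: illustrates SO_TIMEOUT on a socket read.
final class SocketReadTimeoutDemo {

    /** Returns true if the blocked read was released by SO_TIMEOUT. */
    static boolean readTimesOut(int soTimeoutMillis) throws IOException {
        // Loopback server that accepts the connection but never writes anything,
        // so the client's read() has nothing to return.
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {
            client.setSoTimeout(soTimeoutMillis);
            try {
                client.getInputStream().read();
                return false; // data or end-of-stream arrived before the timeout
            } catch (SocketTimeoutException e) {
                return true;  // the read gave up after soTimeoutMillis
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("read timed out: " + readTimesOut(200));
    }
}
```

If the stream being read is not backed by a socket, a timeout configured this way would have nothing to act on.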

The dump for the thread looks something like this

java.io.FileInputStream.readBytes(Native Method)
java.io.FileInputStream.read(FileInputStream.java:255)
java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
java.io.BufferedInputStream.read(BufferedInputStream.java:265)
com.jcraft.jsch.IO.getByte(IO.java:73)
com.jcraft.jsch.Session.connect(Session.java:263)
com.jcraft.jsch.Session.connect(Session.java:183)
org.springframework.integration.sftp.session.SftpSession.connect(SftpSession.java:268)
org.springframework.integration.sftp.session.DefaultSftpSessionFactory.getSession(DefaultSftpSessionFactory.java:390)
custom.adapters.session.LogEnabledSftpSessionFactory.getSession(LogEnabledSftpSessionFactory.java:44)
custom.adapters.session.LogEnabledSftpSessionFactory.getSession(LogEnabledSftpSessionFactory.java:15)
org.springframework.integration.file.remote.session.CachingSessionFactory$1.createForPool(CachingSessionFactory.java:84)
org.springframework.integration.file.remote.session.CachingSessionFactory$1.createForPool(CachingSessionFactory.java:81)
org.springframework.integration.util.SimplePool.doGetItem(SimplePool.java:195)
org.springframework.integration.util.SimplePool.getItem(SimplePool.java:176)
org.springframework.integration.file.remote.session.CachingSessionFactory.getSession(CachingSessionFactory.java:135)
custom.integration.DelegatingLocatorBasedSessionFactory.getSession(DelegatingLocatorBasedSessionFactory.java:80)
custom.DelegatingLocatorBasedSessionFactory.getSession(DelegatingLocatorBasedSessionFactory.java:67)
org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:432)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:308)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:258)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:64)
org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:45)
org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:160)
org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:360)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$1934/1648215776.call(Unknown Source)
org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$2062/2127922639.run(Unknown Source)
org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
org.springframework.integration.util.ErrorHandlingTaskExecutor$$Lambda$2063/1949167295.run(Unknown Source)
org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$1935/1382748208.run(Unknown Source)
org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:67)
org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

Please let us know if we are missing anything here, or if there is some configuration on the Spring Integration side that would fix this. We are on Spring Integration 5.1.13.
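One generic workaround we are considering, independent of any Spring Integration setting, is to bound the blocking call ourselves. The sketch below uses only the JDK; `BoundedCall` and `callWithTimeout` are hypothetical names, and the task passed in would be whatever call can block (for example, obtaining a session). It is a sketch under those assumptions, not a fix confirmed by the framework.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: runs a task on a worker thread and stops waiting after a deadline.
final class BoundedCall {

    // Daemon threads, so a worker that stays blocked cannot keep the JVM alive.
    private static final ExecutorService WORKERS = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "bounded-call-worker");
        t.setDaemon(true);
        return t;
    });

    private BoundedCall() {
    }

    static <T> T callWithTimeout(Callable<T> task, long timeout, TimeUnit unit) throws Exception {
        Future<T> future = WORKERS.submit(task);
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            // Interrupts the worker. A thread blocked in a native read may ignore
            // the interrupt, but the calling thread is released either way.
            future.cancel(true);
            throw e;
        } catch (ExecutionException e) {
            // Surface the task's own exception when there is one.
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        }
    }
}
```

The trade-off is that the worker thread itself may remain blocked after the timeout; this only keeps the scheduler thread from being held indefinitely.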

Here is a heap dump view of a stuck thread:

"Name","Retained Size","Shallow Size","Level"
"java.lang.Thread [Thread, Stack Local] ""my-taskScheduler-42"" tid=348 [RUNNABLE]","54768","120","1"
"contextClassLoader  org.springframework.boot.loader.LaunchedURLClassLoader [Stack Local]","10324089","80","2"
"<local variable>  com.jcraft.jsch.Session [Stack Local]","21232","256","2"
"threadLocals  java.lang.ThreadLocal$ThreadLocalMap","15896","24","2"
"<local variable>  java.lang.UNIXProcess$ProcessPipeInputStream [Monitor Used, Stack Local]","8264","40","2"
"<local variable>  org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource [Stack Local]","4784","96","2"
"<local variable>  org.springframework.integration.endpoint.SourcePollingChannelAdapter [Stack Local]","2608","176","2"
"<local variable>  java.util.concurrent.ScheduledThreadPoolExecutor [Stack Local]","2392","80","2"
"<local variable>  org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer [Stack Local]","2168","64","2"
"<local variable>  org.springframework.integration.file.remote.RemoteFileTemplate [Stack Local]","824","64","2"
"<local variable>  org.springframework.integration.util.SimplePool [Stack Local]","744","56","2"
"group  java.lang.ThreadGroup","656","48","2"
"<local variable>  custom.adapters.session.LogEnabledSftpSessionFactory [Stack Local]","512","120","2"
"<local variable>  org.springframework.scheduling.concurrent.ReschedulingRunnable [Stack Local]","232","48","2"
"inheritableThreadLocals  java.lang.ThreadLocal$ThreadLocalMap","104","24","2"
"inheritedAccessControlContext  java.security.AccessControlContext","88","40","2"
"name  java.lang.String ""my-taskScheduler-42""","80","24","2"
"<local variable>  java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask [Stack Local]","72","72","2"
"<local variable>  org.springframework.integration.sftp.session.SftpSession [Stack Local]","56","32","2"
"<local variable>  java.util.concurrent.ThreadPoolExecutor$Worker [Stack Local]","48","48","2"
"target  java.util.concurrent.ThreadPoolExecutor$Worker [Stack Local]","48","48","2"
"<local variable>  org.springframework.integration.util.ErrorHandlingTaskExecutor [Stack Local]","40","24","2"
"<local variable>  org.springframework.integration.sftp.session.JSchSessionWrapper [Stack Local]","40","24","2"
"<local variable>  java.io.FileDescriptor [JNI Local]","32","32","2"
"<local variable>  org.springframework.integration.file.remote.session.CachingSessionFactory [Stack Local]","32","32","2"
"pool  org.springframework.integration.util.SimplePool [Stack Local]","744","56","3"
"sessionFactory  custom.adapters.session.LogEnabledSftpSessionFactory [Stack Local]","512","120","3"
"jsch  com.jcraft.jsch.JSch","736","32","4"
"proxy  custom.adapters.session.SftpProxyCommand","328","32","4"
"sessionConfig  java.util.Properties size = 2","176","48","4"
"sharedSessionLock  java.util.concurrent.locks.ReentrantReadWriteLock","120","24","4"
"host  java.lang.String ""sftp.server""","80","24","4"
"host  java.lang.String ""sftp.server""","80","24","4"
"<class>  custom.adapters.session.LogEnabledSftpSessionFactory org.springframework.boot.loader.LaunchedURLClassLoader","64","64","4"
"password  java.lang.String ""@#@#@#@#""","64","24","4"
"user  java.lang.String ""user""","64","24","4"
"user  java.lang.String ""user""","64","24","4"
"enableDaemonThread  java.lang.Boolean = false","16","16","4"
"serverAliveCountMax  java.lang.Integer = 4  0x00000004","16","16","4"
"serverAliveInterval  java.lang.Integer = 240,000  0x0003A980","16","16","4"
"timeout  java.lang.Integer = 120,000  0x0001D4C0","16","16","4"
"userInfoWrapper  org.springframework.integration.sftp.session.DefaultSftpSessionFactory$UserInfoWrapper ...
