Describe the issue
After upgrading spring-cloud-stream with the Apache Kafka binder to 5.0.2, we encounter the following exception for output-only bindings:
2026-06-18T10:47:17.816+02:00 ERROR 34446 --- [demo] [ scheduling-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:441)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$1(AbstractPollingEndpoint.java:357)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:64)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:88)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:62)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$0(AbstractPollingEndpoint.java:350)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:94)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:545)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:328)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:309)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1090)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:614)
at java.base/java.lang.Thread.run(Thread.java:1474)
Caused by: java.lang.ClassCastException: class org.springframework.cloud.stream.messaging.DirectWithAttributesChannel$$SpringCGLIB$$0 cannot be cast to class java.util.function.Supplier (org.springframework.cloud.stream.messaging.DirectWithAttributesChannel$$SpringCGLIB$$0 is in unnamed module of loader 'app'; java.util.function.Supplier is in module java.base of loader 'bootstrap')
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:826)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:656)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.get(SimpleFunctionRegistry.java:664)
at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.get(PartitionAwareFunctionWrapper.java:111)
at org.springframework.integration.dsl.IntegrationFlow$1.doReceive(IntegrationFlow.java:256)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:139)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:255)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:459)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:421)
... 13 more
While analyzing the issue with AI assistance, we came across commit f042adb, which references issue #3166. In that commit the isFunctionExist() guard was commented out:
// 5.0.1
return this.isFunctionExist() ? this.functionDefinition : null;
// 5.0.2
//return this.isFunctionExist() ? this.functionDefinition : null;
return this.functionDefinition;
That non-null name now reaches the supplier-binding initializer, which is not guarded by isFunctionExist():
// FunctionConfiguration.supplierInitializer(), ~line 193
FunctionInvocationWrapper functionWrapper =
        functionCatalog.lookup(proxyFactory.getFunctionDefinition()); // lookup("testOutput")
if (functionWrapper != null && functionWrapper.isSupplier()) {
    // ... build polling IntegrationFlow.fromSupplier(...) ...
}
The if check above passes, a SourcePollingChannelAdapter is built, and the first poll runs ((Supplier) channel).get() (SimpleFunctionRegistry:826), which throws the ClassCastException.
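To make the suspected mechanism easier to follow, here is a minimal plain-Java sketch of it. It is only a model of our reading of the stack trace, not the actual Spring Cloud Stream or Spring Cloud Function code. The class name, FakeOutputChannel, demoBeans(), pollOnce() and pollOutcome() are all hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Simplified, hypothetical model of the failure; not the real library code.
public class OutputOnlyBindingCastSketch {

    // Hypothetical stand-in for the output channel bean registered under the binding name.
    static final class FakeOutputChannel {
    }

    // Hypothetical stand-in for a catalog that resolves a name to whatever bean carries it.
    static Map<String, Object> demoBeans() {
        Map<String, Object> beans = new HashMap<>();
        beans.put("testOutput", new FakeOutputChannel());
        Supplier<String> realSupplier = () -> "payload";
        beans.put("realSupplier", realSupplier);
        return beans;
    }

    // Models the first poll: the looked-up target is cast to Supplier without a type check.
    static Object pollOnce(Map<String, Object> beans, String functionDefinition) {
        Object target = beans.get(functionDefinition);
        return ((Supplier<?>) target).get();
    }

    // Returns "ClassCastException" if one poll fails with that exception, otherwise "none".
    static String pollOutcome(Map<String, Object> beans, String functionDefinition) {
        try {
            pollOnce(beans, functionDefinition);
            return "none";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        Map<String, Object> beans = demoBeans();
        System.out.println(pollOutcome(beans, "testOutput"));
        System.out.println(pollOutcome(beans, "realSupplier"));
    }
}
```

In this model the poll fails only for the name that resolves to the channel object, which matches what we see for the output-only binding.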
To Reproduce
Steps to reproduce the behavior:
- Set up a simple demo project with spring-cloud-stream 5.0.2 and the Apache Kafka binder
- Configure an output-only binding:
spring:
  cloud:
    stream:
      output-bindings: testOutput
      bindings:
        testOutput:
          destination: test-topic
          content-type: application/json
      binders:
        kafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: >
                        localhost:9092
                      auto-create-topics: true
                      configuration:
                        security.protocol: PLAINTEXT
- Start the application repeatedly. On some starts it boots normally; on most others it throws the ClassCastException shown above.
Version of the framework
5.0.2
Expected behavior
An output-only binding that has no backing Supplier/Function/Consumer bean must never be resolved to a function or treated as a Supplier. No SourcePollingChannelAdapter should be created for it; the binding should remain a plain output destination driven by StreamBridge, exactly as in 5.0.1, and no ClassCastException should occur.
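As a rough illustration of the behavior we expect, the plain-Java sketch below only treats a name as a supplier binding when the resolved object really is a Supplier. It is a hypothetical model, not a proposed patch: the class name, FakeOutputChannel and resolveSupplier() are made up for this example, and the real fix may well belong in the isFunctionExist() check instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical model of the expected guard; not the real library code.
public class OutputOnlyBindingGuardSketch {

    // Hypothetical stand-in for the output channel bean of an output-only binding.
    static final class FakeOutputChannel {
    }

    // Yields a supplier only when the name resolves to an actual Supplier;
    // a null name, an unknown name, or a channel object all yield an empty result.
    static Optional<Supplier<?>> resolveSupplier(Map<String, Object> beans, String definition) {
        Object target = (definition == null) ? null : beans.get(definition);
        if (target instanceof Supplier) {
            Supplier<?> supplier = (Supplier<?>) target;
            return Optional.of(supplier);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, Object> beans = new HashMap<>();
        beans.put("testOutput", new FakeOutputChannel());
        Supplier<String> realSupplier = () -> "payload";
        beans.put("realSupplier", realSupplier);

        // No polling source should be created for the output-only binding.
        System.out.println(resolveSupplier(beans, "testOutput").isPresent());
        // A genuine Supplier bean is still picked up.
        System.out.println(resolveSupplier(beans, "realSupplier").isPresent());
    }
}
```

With a check of this kind, the output-only binding is never polled and stays available for StreamBridge, while genuine Supplier beans keep working.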