import org.apache.kafka.streams.kstream.KGroupedStream
import org.apache.kafka.streams.kstream.KStream
import org.springframework.stereotype.Component
import org.springframework.cloud.stream.annotation.StreamListener
/**
 * Stream processor that derives a per-key average from a stream of
 * `Double` values and publishes it to the `avg-rates` topic.
 */
@Component
class CurrencyProcessing {

    /**
     * Wires the averaging topology onto [input].
     *
     * Records are grouped by their existing key. Two tables are derived from
     * that grouping: the number of records seen per key and the running sum of
     * the values per key. The tables are joined as `sum / count`, and the
     * resulting table is converted back to a stream and written to the
     * `avg-rates` topic.
     *
     * NOTE(review): `@StreamListener` carries no binding target here and the
     * parameter has no binding annotation; confirm how the input is bound.
     * NOTE(review): no serdes are passed explicitly, so this presumably relies
     * on the application's default serde configuration; verify.
     * NOTE(review): sum and count are maintained as separate tables, so the
     * join may emit an intermediate value when only one side has been updated;
     * confirm consumers of `avg-rates` tolerate that.
     *
     * @param input stream of values keyed by a `String` key.
     */
    @StreamListener
    fun processCurrency(input: KStream<String, Double>) {
        val ratesPerKey: KGroupedStream<String, Double> = input.groupByKey()

        // Operators are declared in the same order as before (count, then
        // reduce) so the generated topology is unchanged.
        val occurrencesPerKey = ratesPerKey.count()
        val totalPerKey = ratesPerKey.reduce { accumulated, next -> accumulated + next }

        totalPerKey
            .join(occurrencesPerKey) { total, occurrences -> total / occurrences }
            .toStream()
            .to("avg-rates")
    }
}