| import org.apache.kafka.streams.kstream.KGroupedStream | |
| import org.apache.kafka.streams.kstream.KStream | |
| import org.springframework.stereotype.Component | |
| import org.springframework.cloud.stream.annotation.StreamListener | |
/**
 * Kafka Streams processor that publishes a per-key average of the incoming values.
 *
 * Registered as a Spring bean through [Component].
 */
@Component
class CurrencyProcessing {

    /**
     * Builds the averaging topology on top of [input].
     *
     * Records are grouped by their existing key. For each key a record count and a
     * running sum of the values are maintained; the two tables are joined as
     * `sum / count`, and the joined table is converted back to a stream and written
     * to the `avg-rates` topic.
     *
     * No serdes are passed explicitly anywhere in this method, so serialization is
     * presumably driven by configuration outside this file — TODO confirm.
     *
     * NOTE(review): `@StreamListener` carries no binding name and [input] has no
     * binding annotation in this file — confirm how the input binding is resolved.
     *
     * @param input stream of `Double` values keyed by `String`; presumably exchange
     * rates keyed by currency code — verify against the producer.
     */
    @StreamListener
    fun processCurrency(input: KStream<String, Double>) {
        val ratesPerKey: KGroupedStream<String, Double> = input.groupByKey()

        // Number of records seen for each key.
        val occurrences = ratesPerKey.count()
        // Running sum of the values for each key.
        val totals = ratesPerKey.reduce { runningTotal, rate -> runningTotal + rate }

        // Average = sum / count, emitted to the output topic.
        totals
            .join(occurrences) { total, seen -> total / seen }
            .toStream()
            .to("avg-rates")
    }
}