|
import org.apache.kafka.streams.kstream.KGroupedStream |
|
import org.apache.kafka.streams.kstream.KStream |
|
import org.springframework.stereotype.Component |
|
import org.springframework.cloud.stream.annotation.StreamListener |
|
|
|
@Component |
|
class CurrencyProcessing { |
|
|
|
@StreamListener |
|
fun processCurrency(input: KStream<String, Double>) { |
|
|
|
val groupByKey: KGroupedStream<String, Double> = input.groupByKey() |
|
|
|
val countKTable = groupByKey.count() |
|
val sumKTable = groupByKey.reduce { value1, value2 -> value1 + value2 } |
|
|
|
val avgRate = sumKTable.join(countKTable) { sum, count -> sum / count } |
|
avgRate.toStream().to("avg-rates") |
|
} |
|
} |