-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathQueryTransformation.scala
68 lines (57 loc) · 2.38 KB
/
QueryTransformation.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package io.sqooba.oss.chronosExamples
import zio._
import zio.test._
import java.time.Instant
import zio.test.Assertion._
import scala.concurrent.duration._
import io.sqooba.oss.utils.Utils._
import io.sqooba.oss.utils.ChronosRunnable
import io.sqooba.oss.chronos.{Chronos, ChronosEntityId, Query}
import io.sqooba.oss.timeseries.entity.TsLabel
import io.sqooba.oss.timeseries.immutable.EmptyTimeSeries
import io.sqooba.oss.timeseries.TimeSeries
import org.junit.runner.RunWith
@RunWith(classOf[zio.test.junit.ZTestJUnitRunner])
class QueryTransformation extends ChronosRunnable {
val spec: ChronosRunnable = suite("VictoriaMetrics Integration")(
testM("Grouping and transforming a query should add a new timeseries") {
val start = Instant.parse("2020-12-12T00:00:00.000Z")
val end = start.plusSeconds(5.minutes.toSeconds)
val label = TsLabel("cpu")
val step = 10.seconds
final case class Workstation(id: Long) extends ChronosEntityId {
override def tags: Map[String, String] =
Map("type" -> "workstation", "id" -> id.toString)
}
final case class Room(name: String, workstations: Seq[Workstation]) extends ChronosEntityId {
override def tags: Map[String, String] =
Map("type" -> "room", "name" -> name)
}
val office = Room("office", (1L to 10).map(Workstation.apply))
val workStationQueries = office.workstations
.map(_.buildTsId(label))
.map(tsId => Query.fromTsId(tsId, start, end, step = Some(step)))
val groupedQueries = Query.group(workStationQueries)
val transformedQueries = groupedQueries.transform(
office.buildTsId(label),
start,
end,
step = step
) {
case (ir, _) =>
office.workstations
.map(_.buildTsId(label))
.map(tsid => ir.getByTsId(tsid))
.collect { case Some(ts) => ts }
.foldLeft(EmptyTimeSeries: TimeSeries[Double])(
_.plus(_, strict = false)
)
}
val fakeDataPoints = ZIO.foreachPar_(office.workstations)(ws => insertFakePercentage(start, end, Map("__name__" -> label.value) ++ ws.tags, step))
val queries = fakeDataPoints *> Chronos.query(query = transformedQueries)
for {
result <- queries
} yield assert(result.map)(isNonEmpty) && assert(result.getByTsId(office.buildTsId(label)))(isSome)
}
)
}