The Flink program runs without errors, but no data is written to Druid (the datasource cannot be queried in Druid). What is wrong?
version:
<scala.binary.version>2.11</scala.binary.version>
<kafka.version>2.2.0</kafka.version>
<flink.version>1.10.0</flink.version>
flink-tranquility version:
io.druid
tranquility-flink_2.11
0.8.3
apache druid version: 0.19.0
flink code:
event case class:
/**
 * One input record processed by the Flink job.
 *
 * @param id   numeric identifier of the record
 * @param name name carried by the record
 * @param age  age carried by the record
 * @param ts   event time; `SimpleEventBeamFactory` passes it to `new DateTime(...)`,
 *             so it is presumably epoch milliseconds (the sample rows look like
 *             millisecond timestamps) -- confirm against the producer
 */
case class Event(
  id: Int,
  name: String,
  age: Int,
  ts: Long
)
SimpleEventBeamFactory.scala
`/**
 * Builds the Tranquility Beam that ships [[Event]] records to the Druid
 * datasource "john_flink".
 *
 * @param conf supplies the ZooKeeper connect string, the Druid discovery path and
 *             the indexing-service name (read through getDruidZKHost,
 *             getDruidDiscoveryPath and getDruidIndexService)
 */
class SimpleEventBeamFactory(conf: ConfigureUtil) extends BeamFactory[Event] {

  /** The beam is created on first access; creating it starts a Curator client. */
  lazy val makeBeam: Beam[Event] = {
    // Tranquility uses ZooKeeper (through the Curator framework) for coordination.
    val curator = CuratorFrameworkFactory.newClient(
      conf.getDruidZKHost(),
      new BoundedExponentialBackoffRetry(100, 3000, 5)
    )
    curator.start()

    val dimensions = IndexedSeq("id", "name", "age")

    // LongSumAggregatorFactory takes (name, fieldName): the output metric is
    // "ts_sum" and it is computed from the event's "ts" field. The arguments were
    // previously swapped, which made the aggregator read a field named "ts_sum"
    // that Event does not have.
    val aggregators = Seq(new LongSumAggregatorFactory("ts_sum", "ts"))
    val isRollup = true

    DruidBeams
      // The timestamper converts the event's epoch-millisecond `ts` into a Joda DateTime.
      .builder((simpleEvent: Event) => new DateTime(simpleEvent.ts))
      .curator(curator)
      .discoveryPath(conf.getDruidDiscoveryPath())
      .location(DruidLocation.create(conf.getDruidIndexService(), "john_flink"))
      // Tell Druid's parser where the timestamp lives in the serialized event.
      // Without this the builder's default spec is used, which (per the Tranquility
      // docs) expects a column called "timestamp" in ISO format, while Event carries
      // epoch millis in "ts". The third argument (missing-value default) is a Java
      // API parameter and is intentionally null.
      .timestampSpec(new io.druid.data.input.impl.TimestampSpec("ts", "millis", null))
      .rollup(
        DruidRollup(
          SpecificDruidDimensions(dimensions),
          aggregators,
          QueryGranularities.MINUTE,
          isRollup
        )
      )
      .tuning(
        ClusteredBeamTuning(
          segmentGranularity = Granularity.HOUR,
          // NOTE(review): Tranquility discards events whose timestamp falls outside
          // this window around the current time, so replaying historical rows
          // (such as the sample data in this post) will not produce any segments --
          // confirm against the Tranquility documentation for windowPeriod.
          windowPeriod = new Period("PT10M"),
          partitions = 1,
          replicants = 1
        )
      )
      .buildBeam()
  }
}`
config:
application.name: dsp-report
profiles.active: @profileActive@
druid.zk.host: 192.168.200.78:2181
druid.index.service: druid/overlord
druid.discovery.path: /druid/discovery
input event data:
1,john1,18,1603179767000
2,john2,19,1603179768000
3,john3,11,1603179769000
2,john2,19,1603179770000
2,john2,19,1603179771000
2,john2,19,1603179772000
2,john2,19,1603617127000
2,john2,19,1603617127400
2,john2,19,1603617127500
2,john2,19,1603617127600
1,john1,18,1603789927000
2,john2,19,1603789927000
3,john3,11,1603789927000
2,john2,19,1603789927000
2,john2,19,1603789927000
2,john2,19,1603789927000
2,john2,19,1603789927000
2,john2,19,1603789927000
2,john2,19,1603789927000
2,john2,19,1603789927000
The Flink program runs without errors, but no data is written to Druid (the datasource cannot be queried in Druid). What is wrong?
version:
<scala.binary.version>2.11</scala.binary.version>
<kafka.version>2.2.0</kafka.version>
<flink.version>1.10.0</flink.version>
flink-tranquility version:
io.druid
tranquility-flink_2.11
0.8.3
apache druid version: 0.19.0
flink code:
event case class:
case class Event(id: Int, name: String, age: Int, ts: Long)
SimpleEventBeamFactory.scala
`class SimpleEventBeamFactory(conf: ConfigureUtil) extends BeamFactory[Event] {
// Declares the lazily-initialized beam for Event records.
// NOTE(review): the initializer block is empty in this copy of the listing. An
// empty block evaluates to Unit, so as written this does not type-check as
// Beam[Event]; the body appears to have been dropped from this copy.
lazy val makeBeam: Beam[Event] = {
}
}`
config:
application.name: dsp-report
profiles.active: @profileActive@
druid.zk.host: 192.168.200.78:2181
druid.index.service: druid/overlord
druid.discovery.path: /druid/discovery
input event data:
1,john1,18,1603179767000
2,john2,19,1603179768000
3,john3,11,1603179769000
2,john2,19,1603179770000
2,john2,19,1603179771000
2,john2,19,1603179772000
2,john2,19,1603617127000
2,john2,19,1603617127400
2,john2,19,1603617127500
2,john2,19,1603617127600
1,john1,18,1603789927000
2,john2,19,1603789927000
3,john3,11,1603789927000
2,john2,19,1603789927000
2,john2,19,1603789927000
2,john2,19,1603789927000
2,john2,19,1603789927000
2,john2,19,1603789927000
2,john2,19,1603789927000
2,john2,19,1603789927000