Conversation
kyle-sammons left a comment:
Great first pass! Since this is in draft mode, I added comments more intended to spark discussion than to lead you to a specific architecture/implementation. Genuinely curious to hear your thoughts on all of them.
There were a few additional points that I couldn't find a great place to add a comment for:
- What are the implications of a static schema with our Indexers? Right now it looks like this code doesn't touch them, but they are also a core component of this whole pipeline. Would we still need to restart them when we deploy a new schema change (currently we do that to force the Indexer to stop using whatever dynamic type it's using, and to use the newly set one to reduce type collisions)?
- With type collisions in mind, what should we do if someone changes the type of a field but still sends us logs using the previous type? Obviously under a dynamic schema we should do what we're currently doing, but under a strict schema, does it make sense to drop them? Or should we just mark them as failed like we currently do?
- I think we have an endpoint somewhere to return the schema, which I think currently does some hacky stuff to figure out what the dynamic schema is. Under a static schema, could we simplify that logic? Could we do that in a way that doesn't cause weird issues during the transition period from dynamic -> static schemas?
```java
public class SchemaMetadataStore extends AstraMetadataStore<SchemaMetadata> {
  public static final String SCHEMA_METADATA_STORE_PATH = "/schema";
```
I'm curious if you've thought through the pros and cons of this approach. As it's currently written, when a schema file is changed, that change must be deployed to the Manager nodes for a given cluster. The Manager then reads the new changes and updates this store, which then propagates the change out to all Preprocessors via listeners.
Seeing as a deploy and a hard restart is already required to get the changes out, why not just swap it so that the Preprocessors are what need to be deployed and they're the ones that read the schema file from disk? Wouldn't that greatly simplify the architecture/approach?
Also interested to hear your thoughts on how Indexers and their schemas would get implicated here, since right now a deploy requires deploying both the Indexers and the Preprocessors to get new schema changes. It looks like this would still require that?
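To make the propagation path being discussed concrete, here's a minimal sketch of a store that fans schema updates out to registered listeners. The class and method names are illustrative only, not the actual `SchemaMetadataStore` API:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch of the propagation path: the Manager writes a new schema
// into a shared store, and every Preprocessor that registered a listener is notified.
public class SchemaStoreSketch {
  private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
  private volatile String schema;

  // Preprocessor side: subscribe to schema changes.
  public void addListener(Consumer<String> listener) {
    listeners.add(listener);
  }

  // Manager side: called after reading the newly deployed schema file.
  public void publish(String newSchema) {
    this.schema = newSchema;
    listeners.forEach(l -> l.accept(newSchema)); // fan out to Preprocessors
  }

  public String current() {
    return schema;
  }
}
```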
Good callout. My original thinking was that distributing the schema via ZK/etcd would give us atomic propagation, where all preprocessors pick up the change nearly instantly, which avoids a window where preprocessors are inconsistent during a deploy.
Thinking about it more, it seems that the system already tolerates brief inconsistency for every other config change? And schema changes are infrequent. Therefore, the added complexity of a new metadata store, service, and Manager write logic doesn't seem worth it for that benefit.
Thoughts? My original approach had the preprocessors reading the schema files directly, so I'm more than happy to go back to that (and it'd greatly simplify things).
Ahh, I see. Yeah, I agree that this version would certainly reduce the window of schema inconsistency, but I don't think it'd eliminate it. In a large enough cluster there will always be some Preprocessors that pick up the update a bit faster than others, for one reason or another.
In general I agree that the simpler approach is probably better, especially if there aren't serious downsides to it.
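For reference, the simpler alternative under discussion (each Preprocessor reading a schema file from its own disk at startup) might look roughly like this. The file format and names here are assumptions for illustration, not the actual Astra code:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: parse a simple "field=type" schema file that each
// Preprocessor would read from local disk at startup (e.g. via Files.readAllLines),
// instead of receiving the schema through a ZK/etcd-backed metadata store.
public class LocalSchemaLoader {
  public static Map<String, String> parse(List<String> lines) {
    Map<String, String> schema = new LinkedHashMap<>();
    for (String line : lines) {
      line = line.trim();
      if (line.isEmpty() || line.startsWith("#")) continue; // skip blanks and comments
      String[] parts = line.split("=", 2);
      if (parts.length < 2) continue; // ignore malformed lines
      schema.put(parts[0].trim(), parts[1].trim());
    }
    return schema;
  }
}
```

Under this approach a schema change is just a config change riding along with a normal Preprocessor deploy, which matches the "deploy and restart is already required" observation above.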
Yeah, I'm thinking now that we'd need to restart our Indexers for them to pick up a schema change. Any concerns about this?
I think it makes sense to drop them; otherwise it doesn't seem like a "static" schema if we allow logs with the old type through.
Good callout. I looked, and the `_mapping` endpoint currently discovers the schema dynamically by scanning Lucene chunks across nodes. With a static schema I'm thinking we could simplify that to return the schema directly, and merge it with the dynamic results during the transition period. I believe the existing dynamic scan would continue to work even once we enforce static schemas, so perhaps we could address this in a follow-up PR where we refactor it?
Nope, no concerns on my end if we just need to roll the Indexers like we currently do
I had a thought about this the other day, actually. If someone changes the schema, there would be some transitional period where we are serving data with the old schema, until eventually that data ages out. If we swap to only returning the static schema for the `_mapping` endpoint, the schema we return wouldn't match that older data until it ages out.
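The transition-period behavior being discussed could look something like this sketch, which merges the static schema with whatever the dynamic scan still reports for older data. The class name and precedence choice are assumptions, not the actual `_mapping` implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: during the dynamic -> static transition, the _mapping
// response could be the union of the static schema and the dynamically
// discovered one, with the static definition winning on conflicting fields.
public class MappingMerge {
  public static Map<String, String> merge(
      Map<String, String> staticSchema, Map<String, String> dynamicSchema) {
    // Start with fields discovered from older data still being served.
    Map<String, String> merged = new HashMap<>(dynamicSchema);
    // Static schema takes precedence for overlapping field names.
    merged.putAll(staticSchema);
    return merged;
  }
}
```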
kyle-sammons left a comment:
Single NIT, but otherwise this LGTM! Great job!
Summary
Enforce a static schema at the preprocessor level, dropping fields that are not defined in the schema:
- When `SchemaMode` is set to `SCHEMA_MODE_DROP_UNKNOWN`, drop all fields in incoming messages that are not in the schema
- Adds a `SchemaMetadataStore`, which the manager populates upon startup
- Adds a `DatasetSchemaService`, which reads the schema once upon preprocessor start, and will listen for / update when the schema changes in the manager

Requirements
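The `SCHEMA_MODE_DROP_UNKNOWN` behavior from the summary can be sketched as follows. This is a hypothetical illustration of dropping unknown fields, not the actual Astra preprocessor code:

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of SCHEMA_MODE_DROP_UNKNOWN: filter an incoming message's
// fields against the set of field names defined in the static schema.
public class DropUnknownFields {
  // Keep only the fields whose names appear in the schema; everything else is dropped.
  public static Map<String, Object> apply(Map<String, Object> message, Set<String> schemaFields) {
    return message.entrySet().stream()
        .filter(e -> schemaFields.contains(e.getKey()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }
}
```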
SchemaModeis set toSCHEMA_MODE_DROP_UNKNOWN, drop all fields in incoming messages that are not in the schemaSchemaMetadataStore, which the manager populates upon startupDatasetSchemaServicewhich reads the schema once upon preprocessor start, and will listen / update when schema changes in managerRequirements