Library for accessing a DynamoDB storage.
Add following into paket.references
Alma.DynamoDB
TableName is part of the configuration since it should be paired with specific credentials (and its policies)
open Alma.DynamoDB
let configuration = {
Credentials = AccessKey {
Key = "..."
Secret = "..."
}
TableName = TableName.Instance {
Domain = Domain "domain"
Context = Context "context"
Purpose = Purpose "purpose"
Version = Version "version"
}
}
let dynamoDB = DynamoDB.connect configurationopen Alma.DynamoDB
open Feather.ErrorHandling
type ItemDTO = {
[<Attribute.HashKey>] PrimaryKey: string
[<Attribute.RangeKey>] SecondaryKey: string
OtherAttribute: string
}
asyncResult {
let! itemKey =
{
PrimaryKey = "Movie"
SecondaryKey = "Lord of the Rings"
OtherAttribute = "Trilogy"
}
|> DynamoDB.putItem dynamoDB
return itemKey
}open Alma.DynamoDB
asyncResult {
let! lotrMovie =
{
HashKey = HashKey "Movie"
RangeKey = RangeKey "Lord of the Rings"
}
|> DynamoDB.getItem dynamoDB
return lotrMovie
}open Alma.DynamoDB
asyncResult {
let! movies =
HashKey "Movie"
|> DynamoDB.getItems dynamoDB (fun item -> item.PrimaryKey)
return movies
}The CheckpointStore module provides a DynamoDB-backed implementation for storing and retrieving checkpoints (such as Kafka consumer checkpoints).
open Alma.DynamoDB
open Alma.DynamoDB.Checkpoint
open Alma.DynamoDB
open Alma.DynamoDB
open Feather.ErrorHandling
// Configure CheckpointStore with appropriate table and credentials
let checkpointInstance =
Create.Instance (Domain "domain"; Context "compressorCheckpoint"; Purpose "currentTier"; Version "stable")
|> fun instance -> InstanceWithSidecar (instance, SidecarSuffix "v1")
let checkpointConfiguration =
CheckpointStore.configuration checkpointInstance Credentials.ServiceAccount
let dynamoDB = CheckpointStore.connect checkpointConfigurationlet getCheckpoint (dynamoDB: DynamoDB) (currentApplication: Instance): GetCheckpoint =
fun groupId topicPartition -> asyncResult {
let consumerInstance = ConsumerInstance (currentApplication |> Instance.concat "-")
let! key = Checkpoint.key topicPartition groupId
let checkpoint = Checkpoint key
let! offsetValue = CheckpointStore.retrieveCheckpoint dynamoDB consumerInstance checkpoint
return {
TopicPartition = topicPartition
Offset = offsetValue |> Option.map Offset
}
}let storeCheckpoint (dynamoDB: DynamoDB) (currentApplication: Instance) (groupId: GroupId) ({ TopicPartition = topicPartition; Offset = (Offset offset) }: TopicPartitionOffset) =
asyncResult {
let consumerInstance = ConsumerInstance (currentApplication |> Instance.concat "-")
let! key = Checkpoint.key topicPartition groupId
let checkpoint = Checkpoint key
do! CheckpointStore.storeCheckpoint dynamoDB consumerInstance checkpoint offset
}- Increment version in
DynamoDB.fsproj - Update
CHANGELOG.md - Commit new version and tag it
./build.sh build./build.sh -t tests