Skip to content

alma-oss/fdynamodb

Repository files navigation

F-DynamoDB

NuGet NuGet Downloads Tests

Library for accessing a DynamoDB storage.

Install

Add the following to paket.references:

Alma.DynamoDB

Use

Connect to a table

TableName is part of the configuration because a table should be paired with specific credentials (and their policies).

open Alma.DynamoDB

// Connection settings: the credentials and the table they are paired with
// are configured together.
let configuration = {
    Credentials = AccessKey { Key = "..."; Secret = "..." }
    TableName =
        TableName.Instance {
            Domain = Domain "domain"
            Context = Context "context"
            Purpose = Purpose "purpose"
            Version = Version "version"
        }
}

// Connection passed to the DynamoDB operations (e.g. putItem, getItem).
let dynamoDB = DynamoDB.connect configuration

Put item to DynamoDB

open Alma.DynamoDB
open Feather.ErrorHandling

/// Example item stored in the table.
type ItemDTO = {
    /// Field marked as the hash key of the item.
    [<Attribute.HashKey>]
    PrimaryKey: string

    /// Field marked as the range key of the item.
    [<Attribute.RangeKey>]
    SecondaryKey: string

    /// Regular attribute carrying no key marker.
    OtherAttribute: string
}

asyncResult {
    // Item to store.
    let movie = {
        PrimaryKey = "Movie"
        SecondaryKey = "Lord of the Rings"
        OtherAttribute = "Trilogy"
    }

    let! itemKey = movie |> DynamoDB.putItem dynamoDB

    return itemKey
}

Get item from DynamoDB

open Alma.DynamoDB

asyncResult {
    // Key of the item to fetch: hash key plus range key.
    let movieKey = { HashKey = HashKey "Movie"; RangeKey = RangeKey "Lord of the Rings" }

    let! lotrMovie = movieKey |> DynamoDB.getItem dynamoDB

    return lotrMovie
}

Get items by a hashKey from DynamoDB

open Alma.DynamoDB

asyncResult {
    // Hash key shared by the items to fetch.
    let moviesKey = HashKey "Movie"

    let! movies = moviesKey |> DynamoDB.getItems dynamoDB (fun item -> item.PrimaryKey)

    return movies
}

Checkpoint

The CheckpointStore module provides a DynamoDB-backed implementation for storing and retrieving checkpoints (such as Kafka consumer checkpoints).

Setup for Kafka Checkpoints

open Alma.DynamoDB
open Alma.DynamoDB.Checkpoint
open Alma.DynamoDB
open Alma.DynamoDB
open Feather.ErrorHandling

// Configure CheckpointStore with appropriate table and credentials
let checkpointInstance =
    // The four parts are passed as one tuple, so they are separated by commas.
    // With `;` the parenthesised expression is a sequence that evaluates to only
    // its last value (Version "stable").
    // NOTE(review): assumes Create.Instance takes a single tuple argument — confirm
    // against the service-identification API.
    let instance =
        Create.Instance (Domain "domain", Context "compressorCheckpoint", Purpose "currentTier", Version "stable")

    // Pair the instance with the sidecar suffix.
    InstanceWithSidecar (instance, SidecarSuffix "v1")

// Store configuration built from the checkpoint instance and the
// service-account credentials.
let checkpointConfiguration =
    Credentials.ServiceAccount
    |> CheckpointStore.configuration checkpointInstance

// Connection used by the CheckpointStore operations.
let dynamoDB =
    checkpointConfiguration
    |> CheckpointStore.connect

Implementing GetCheckpoint for Kafka

/// Builds a GetCheckpoint function that looks up the stored offset for a
/// consumer group and topic partition via CheckpointStore.retrieveCheckpoint.
/// The returned Offset is present only when the store returned a value.
let getCheckpoint (dynamoDB: DynamoDB) (currentApplication: Instance): GetCheckpoint =
    fun groupId topicPartition ->
        asyncResult {
            // The consumer instance is the application instance joined with "-".
            let consumerInstance =
                currentApplication
                |> Instance.concat "-"
                |> ConsumerInstance

            let! key = Checkpoint.key topicPartition groupId

            let! offsetValue =
                Checkpoint key
                |> CheckpointStore.retrieveCheckpoint dynamoDB consumerInstance

            return { TopicPartition = topicPartition; Offset = offsetValue |> Option.map Offset }
        }

Storing Checkpoints

/// Stores the offset of a topic partition as the checkpoint for the given
/// consumer group via CheckpointStore.storeCheckpoint.
let storeCheckpoint
    (dynamoDB: DynamoDB)
    (currentApplication: Instance)
    (groupId: GroupId)
    ({ TopicPartition = topicPartition; Offset = (Offset offset) }: TopicPartitionOffset)
    =
    asyncResult {
        // The consumer instance is the application instance joined with "-".
        let consumerInstance =
            currentApplication
            |> Instance.concat "-"
            |> ConsumerInstance

        let! key = Checkpoint.key topicPartition groupId

        do! CheckpointStore.storeCheckpoint dynamoDB consumerInstance (Checkpoint key) offset
    }

Release

  1. Increment version in DynamoDB.fsproj
  2. Update CHANGELOG.md
  3. Commit new version and tag it

Development

Requirements

Build

./build.sh build

Tests

./build.sh -t tests

About

Library for accessing a DynamoDB storage.

Resources

License

Stars

Watchers

Forks

Packages

No packages published