Skip to content

LeekJay/dify-dataset-sdk

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

10 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Dify Dataset SDK

用于管理 Dify 知识库的 Python SDK。

特性

  • 📚 模块化设计: 按功能分离的客户端模块(datasets、documents、segments、tags、metadata、models)
  • 🔐 安全认证: 基于 API Key 的安全认证
  • 📄 文档管理: 支持文本和文件上传创建文档
  • 🗂️ 数据集操作: 完整的知识库 CRUD 操作
  • ✂️ 文档块管理: 精细化的文档块(chunks)和子块管理
  • 🏷️ 标签系统: 知识库标签管理
  • 🧾 元数据管理: 自定义元数据字段与文档元数据管理
  • 🔍 高级检索: 支持语义搜索、全文搜索、混合搜索
  • 🔒 类型安全: 完整的类型提示和 Pydantic 模型
  • ⚠️ 异常处理: 完善的错误处理机制

安装

pip install dify-dataset-sdk

快速开始

from dify_dataset_sdk import DifyDatasetClient

# 初始化客户端
client = DifyDatasetClient(api_key="your-api-key")

# 创建知识库
dataset = client.datasets.create(name="我的知识库")

# 添加文档
doc = client.documents.create_by_text(
    dataset_id=dataset.id,
    name="示例文档",
    text="这是文档内容..."
)

# 检索知识库
results = client.datasets.retrieve(dataset_id=dataset.id, query="搜索关键词")

客户端结构

SDK 采用模块化设计,通过统一的 DifyDatasetClient 入口访问各功能模块:

client = DifyDatasetClient(api_key="your-api-key")

client.datasets    # 数据集管理
client.documents   # 文档管理
client.segments    # 文档块管理
client.tags        # 标签管理
client.metadata    # 元数据管理
client.models      # 嵌入模型查询

API 参考

初始化客户端

from dify_dataset_sdk import DifyDatasetClient

client = DifyDatasetClient(
    api_key="your-api-key",           # API密钥(必需)
    base_url="https://api.dify.ai",   # API地址(可选)
    timeout=30.0                       # 超时时间(可选)
)

# 支持上下文管理器
with DifyDatasetClient(api_key="your-api-key") as client:
    dataset = client.datasets.create(name="test")

数据集管理 (client.datasets)

创建数据集

dataset = client.datasets.create(
    name="知识库名称",                    # 必需
    description="知识库描述",             # 可选
    indexing_technique="high_quality",   # 可选: "high_quality" | "economy"
    permission="only_me",                # 可选: "only_me" | "all_team_members" | "partial_members"
    embedding_model="text-embedding-3-small",      # 可选
    embedding_model_provider="openai",             # 可选
)

获取数据集列表

result = client.datasets.list(
    keyword="搜索关键词",    # 可选
    tag_ids=["tag-id"],     # 可选
    page=1,                  # 可选,默认1
    limit=20,                # 可选,默认20
)

for dataset in result.data:
    print(f"{dataset['id']}: {dataset['name']}")

获取数据集详情

dataset = client.datasets.get(dataset_id="dataset-id")
print(dataset.name)

更新数据集

dataset = client.datasets.update(
    dataset_id="dataset-id",
    name="新名称",
    description="新描述",
)

删除数据集

client.datasets.delete(dataset_id="dataset-id")

知识库检索

from dify_dataset_sdk import RetrievalModel

# 基本检索
results = client.datasets.retrieve(
    dataset_id="dataset-id",
    query="搜索内容"
)

# 高级检索配置
retrieval_model = RetrievalModel(
    search_method="hybrid_search",  # "hybrid_search" | "semantic_search" | "full_text_search" | "keyword_search"
    reranking_enable=True,
    top_k=10,
    score_threshold_enabled=True,
    score_threshold=0.5,
)

results = client.datasets.retrieve(
    dataset_id="dataset-id",
    query="搜索内容",
    retrieval_model=retrieval_model
)

for record in results.records:
    print(f"内容: {record['content']}")
    print(f"分数: {record['score']}")

文档管理 (client.documents)

通过文本创建文档

from dify_dataset_sdk import ProcessRule

response = client.documents.create_by_text(
    dataset_id="dataset-id",
    name="文档名称",
    text="文档内容...",
    indexing_technique="high_quality",   # 可选
    doc_form="text_model",               # 可选: "text_model" | "hierarchical_model" | "qa_model"
    doc_language="zh",                   # 可选,Q&A模式使用
)

print(f"文档ID: {response.document.id}")
print(f"批次ID: {response.batch}")

通过文件创建文档

response = client.documents.create_by_file(
    dataset_id="dataset-id",
    file_path="/path/to/file.pdf",
    indexing_technique="high_quality",
)

获取文档列表

result = client.documents.list(
    dataset_id="dataset-id",
    keyword="搜索关键词",    # 可选
    page=1,
    limit=20,
)

获取文档详情

document = client.documents.get(
    dataset_id="dataset-id",
    document_id="document-id",
    metadata="all",  # "all" | "only" | "without"
)

更新文档(文本)

response = client.documents.update_by_text(
    dataset_id="dataset-id",
    document_id="document-id",
    name="新名称",
    text="新内容",
)

更新文档(文件)

response = client.documents.update_by_file(
    dataset_id="dataset-id",
    document_id="document-id",
    file_path="/path/to/new_file.pdf",
)

删除文档

client.documents.delete(
    dataset_id="dataset-id",
    document_id="document-id"
)

获取索引状态

status = client.documents.get_indexing_status(
    dataset_id="dataset-id",
    batch="batch-id"  # 创建文档时返回的batch
)

for item in status.data:
    print(f"状态: {item.indexing_status}")
    print(f"进度: {item.completed_segments}/{item.total_segments}")

批量更新文档状态

client.documents.batch_update_status(
    dataset_id="dataset-id",
    action="enable",  # "enable" | "disable" | "archive" | "un_archive"
    document_ids=["doc-id-1", "doc-id-2"]
)

文档块管理 (client.segments)

创建文档块

response = client.segments.create(
    dataset_id="dataset-id",
    document_id="document-id",
    segments=[
        {
            "content": "块内容",
            "keywords": ["关键词1", "关键词2"],
            "answer": "答案内容(Q&A模式)"  # 可选
        }
    ]
)

获取文档块列表

response = client.segments.list(
    dataset_id="dataset-id",
    document_id="document-id",
    keyword="搜索关键词",    # 可选
    status="completed",      # 可选
    page=1,
    limit=20,
)

for segment in response.data:
    print(f"{segment.id}: {segment.content[:50]}...")

获取文档块详情

segment = client.segments.get(
    dataset_id="dataset-id",
    document_id="document-id",
    segment_id="segment-id"
)

更新文档块

client.segments.update(
    dataset_id="dataset-id",
    document_id="document-id",
    segment_id="segment-id",
    segment_data={
        "content": "更新后的内容",
        "keywords": ["新关键词"],
        "enabled": True,
    }
)

删除文档块

client.segments.delete(
    dataset_id="dataset-id",
    document_id="document-id",
    segment_id="segment-id"
)

子块操作(分层模式)

# 创建子块
client.segments.create_child_chunk(
    dataset_id="dataset-id",
    document_id="document-id",
    segment_id="segment-id",
    content="子块内容"
)

# 获取子块列表
response = client.segments.list_child_chunks(
    dataset_id="dataset-id",
    document_id="document-id",
    segment_id="segment-id",
)

# 更新子块
client.segments.update_child_chunk(
    dataset_id="dataset-id",
    document_id="document-id",
    segment_id="segment-id",
    child_chunk_id="child-chunk-id",
    content="更新后的子块内容"
)

# 删除子块
client.segments.delete_child_chunk(
    dataset_id="dataset-id",
    document_id="document-id",
    segment_id="segment-id",
    child_chunk_id="child-chunk-id"
)

标签管理 (client.tags)

知识库标签

# 创建标签
tag = client.tags.create(name="重要")

# 获取所有标签
tags = client.tags.list()

# 更新标签
tag = client.tags.update(tag_id="tag-id", name="非常重要")

# 删除标签
client.tags.delete(tag_id="tag-id")

# 绑定标签到数据集
client.tags.bind_to_dataset(
    dataset_id="dataset-id",
    tag_ids=["tag-id-1", "tag-id-2"]
)

# 解绑标签
client.tags.unbind_from_dataset(
    dataset_id="dataset-id",
    tag_id="tag-id"
)

# 获取数据集的标签
tags = client.tags.get_dataset_tags(dataset_id="dataset-id")

元数据管理 (client.metadata)

# 创建元数据字段
metadata = client.metadata.create(
    dataset_id="dataset-id",
    field_type="string",  # "string" | "number" | "time"
    name="作者"
)

# 获取元数据字段列表
response = client.metadata.list(dataset_id="dataset-id")

# 更新元数据字段
metadata = client.metadata.update(
    dataset_id="dataset-id",
    metadata_id="metadata-id",
    name="新字段名"
)

# 删除元数据字段
client.metadata.delete(
    dataset_id="dataset-id",
    metadata_id="metadata-id"
)

# 启用/禁用内置元数据
client.metadata.toggle_built_in(
    dataset_id="dataset-id",
    action="enable"  # "enable" | "disable"
)

# 更新文档元数据值
client.metadata.update_document_metadata(
    dataset_id="dataset-id",
    operation_data=[
        {
            "document_id": "doc-id",
            "metadata_list": [
                {"id": "metadata-id", "value": "值", "name": "字段名"}
            ]
        }
    ]
)

嵌入模型 (client.models)

# 获取可用的嵌入模型列表
response = client.models.list_embedding_models()

for provider in response.data:
    print(f"提供商: {provider['provider']}")
    for model in provider.get('models', []):
        print(f"  - {model['model']}")

异常处理

from dify_dataset_sdk import (
    DifyError,
    DifyAPIError,
    DifyAuthenticationError,
    DifyValidationError,
    DifyNotFoundError,
    DifyConflictError,
    DifyServerError,
    DifyConnectionError,
    DifyTimeoutError,
)

try:
    dataset = client.datasets.get(dataset_id="invalid-id")
except DifyNotFoundError as e:
    print(f"资源未找到: {e.message}")
except DifyAuthenticationError as e:
    print(f"认证失败: {e.message}")
except DifyValidationError as e:
    print(f"参数错误: {e.message}, 错误码: {e.error_code}")
except DifyAPIError as e:
    print(f"API错误: {e.message}, HTTP状态码: {e.status_code}")
except DifyConnectionError as e:
    print(f"连接错误: {e.message}")
except DifyTimeoutError as e:
    print(f"请求超时: {e.message}")

完整示例

from dify_dataset_sdk import DifyDatasetClient, RetrievalModel

def main():
    # 初始化客户端
    with DifyDatasetClient(api_key="your-api-key") as client:
        # 1. 创建知识库
        dataset = client.datasets.create(
            name="产品文档库",
            description="存储产品相关文档"
        )
        print(f"创建知识库: {dataset.id}")

        # 2. 添加文档
        doc_response = client.documents.create_by_text(
            dataset_id=dataset.id,
            name="产品介绍",
            text="""
            这是一款智能助手产品。
            主要功能包括:
            1. 自然语言理解
            2. 知识库检索
            3. 多轮对话
            """
        )
        print(f"创建文档: {doc_response.document.id}")

        # 3. 等待索引完成(实际使用中应轮询检查)
        import time
        time.sleep(5)

        # 4. 检索知识库
        results = client.datasets.retrieve(
            dataset_id=dataset.id,
            query="产品有哪些功能",
            retrieval_model=RetrievalModel(
                search_method="hybrid_search",
                top_k=3
            )
        )

        print("\n检索结果:")
        for record in results.records:
            print(f"- {record['content'][:100]}...")
            print(f"  分数: {record['score']}")

        # 5. 添加标签
        tag = client.tags.create(name="产品")
        client.tags.bind_to_dataset(dataset.id, tag_ids=[tag.id])
        print(f"\n已添加标签: {tag.name}")

if __name__ == "__main__":
    main()

SDK 结构

dify_dataset_sdk/
├── __init__.py           # 统一导出入口
├── _base.py              # HTTP 客户端基类
├── _exceptions.py        # 异常类定义
├── client.py             # DifyClient 主入口
├── datasets/             # 数据集模块
│   ├── client.py         # DatasetsClient
│   └── models.py         # Dataset 相关模型
├── documents/            # 文档模块
│   ├── client.py         # DocumentsClient
│   └── models.py         # Document 相关模型
├── segments/             # 文档块模块
│   ├── client.py         # SegmentsClient
│   └── models.py         # Segment/ChildChunk 相关模型
├── tags/                 # 标签模块
│   ├── client.py         # TagsClient
│   └── models.py         # Tag 相关模型
├── metadata/             # 元数据模块
│   ├── client.py         # MetadataClient
│   └── models.py         # Metadata 相关模型
└── models_api/           # 嵌入模型模块
    ├── client.py         # ModelsClient
    └── models.py         # EmbeddingModel 相关模型

从 v0.3.0 迁移到 v0.4.0

主要变更

v0.4.0 对 SDK 进行了模块化重构,API 调用方式有较大变化:

  1. 客户端类名不变:仍然使用 DifyDatasetClient
  2. 方法调用方式变更:从扁平方法调用改为模块化调用
  3. 导入路径变更:统一从 dify_dataset_sdk 导入

客户端初始化

# v0.3.0 和 v0.4.0 初始化方式相同
from dify_dataset_sdk import DifyDatasetClient

client = DifyDatasetClient(api_key="your-api-key")

API 映射表

数据集操作

v0.3.0 方法 v0.4.0 方法
client.create_dataset(name=...) client.datasets.create(name=...)
client.list_datasets(...) client.datasets.list(...)
client.get_dataset(dataset_id) client.datasets.get(dataset_id)
client.update_dataset(dataset_id, ...) client.datasets.update(dataset_id, ...)
client.delete_dataset(dataset_id) client.datasets.delete(dataset_id)
client.retrieve(dataset_id, query) client.datasets.retrieve(dataset_id, query)

文档操作

v0.3.0 方法 v0.4.0 方法
client.create_document_by_text(...) client.documents.create_by_text(...)
client.create_document_by_file(...) client.documents.create_by_file(...)
client.list_documents(dataset_id, ...) client.documents.list(dataset_id, ...)
client.get_document(dataset_id, document_id) client.documents.get(dataset_id, document_id)
client.update_document_by_text(...) client.documents.update_by_text(...)
client.update_document_by_file(...) client.documents.update_by_file(...)
client.delete_document(dataset_id, document_id) client.documents.delete(dataset_id, document_id)
client.get_document_indexing_status(...) client.documents.get_indexing_status(...)
client.batch_update_document_status(...) client.documents.batch_update_status(...)

文档块操作

v0.3.0 方法 v0.4.0 方法
client.create_segments(...) client.segments.create(...)
client.list_segments(...) client.segments.list(...)
client.get_segment(...) client.segments.get(...)
client.update_segment(...) client.segments.update(...)
client.delete_segment(...) client.segments.delete(...)
client.create_child_chunk(...) client.segments.create_child_chunk(...)
client.list_child_chunks(...) client.segments.list_child_chunks(...)
client.update_child_chunk(...) client.segments.update_child_chunk(...)
client.delete_child_chunk(...) client.segments.delete_child_chunk(...)

标签操作

v0.3.0 方法 v0.4.0 方法
client.create_knowledge_tag(...) client.tags.create(...)
client.list_knowledge_tags() client.tags.list()
client.update_knowledge_tag(...) client.tags.update(...)
client.delete_knowledge_tag(...) client.tags.delete(...)
client.bind_dataset_to_tag(...) client.tags.bind_to_dataset(...)
client.unbind_dataset_from_tag(...) client.tags.unbind_from_dataset(...)
client.get_dataset_tags(...) client.tags.get_dataset_tags(...)

元数据操作

v0.3.0 方法 v0.4.0 方法
client.create_metadata_field(...) client.metadata.create(...)
client.list_metadata_fields(...) client.metadata.list(...)
client.update_metadata_field(...) client.metadata.update(...)
client.delete_metadata_field(...) client.metadata.delete(...)
client.toggle_built_in_metadata_field(...) client.metadata.toggle_built_in(...)
client.update_document_metadata(...) client.metadata.update_document_metadata(...)

模型操作

v0.3.0 方法 v0.4.0 方法
client.list_embedding_models() client.models.list_embedding_models()

完整迁移示例

# ============ v0.3.0 代码 ============
from dify_dataset_sdk import DifyDatasetClient

client = DifyDatasetClient(api_key="your-api-key")

# 创建知识库
dataset = client.create_dataset(name="我的知识库")

# 添加文档
doc = client.create_document_by_text(
    dataset_id=dataset.id,
    name="示例文档",
    text="文档内容"
)

# 创建标签
tag = client.create_knowledge_tag(name="重要")
client.bind_dataset_to_tag(dataset_id=dataset.id, tag_ids=[tag.id])

# 检索
results = client.retrieve(dataset_id=dataset.id, query="搜索关键词")


# ============ v0.4.0 代码 ============
from dify_dataset_sdk import DifyDatasetClient

client = DifyDatasetClient(api_key="your-api-key")

# 创建知识库
dataset = client.datasets.create(name="我的知识库")

# 添加文档
doc = client.documents.create_by_text(
    dataset_id=dataset.id,
    name="示例文档",
    text="文档内容"
)

# 创建标签
tag = client.tags.create(name="重要")
client.tags.bind_to_dataset(dataset_id=dataset.id, tag_ids=[tag.id])

# 检索
results = client.datasets.retrieve(dataset_id=dataset.id, query="搜索关键词")

模型和异常类

模型和异常类的导入保持不变:

from dify_dataset_sdk import (
    # 模型类
    Dataset,
    Document,
    Segment,
    RetrievalModel,
    ProcessRule,
    KnowledgeTag,
    Metadata,
    # 异常类
    DifyError,
    DifyAPIError,
    DifyAuthenticationError,
    DifyNotFoundError,
    DifyValidationError,
    DifyServerError,
    DifyConnectionError,
    DifyTimeoutError,
)

版本信息

  • 当前版本: 0.5.0
  • Python 支持: >= 3.8.1
  • 依赖: httpx, pydantic

更新日志

v0.5.0

  • 破坏性变更: tagsmetadata 拆分为独立模块
  • 新增模块: client.metadata 专用于元数据相关操作

v0.4.0

  • 重构: 采用模块化架构,按功能拆分客户端
  • 新 API: 使用 DifyDatasetClient 入口访问各子模块(datasets, documents, segments, tags, metadata, models)
  • 改进: 简化方法命名(如 create_datasetdatasets.create

v0.3.0

  • 初始版本,支持完整的 Dify Knowledge Base API

License

MIT License

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages