Skip to content

Connector Examples

This page shows complete connector implementations for reference.

Simple Text File Connector

A basic connector that indexes text files from a directory.

connector.py

from hoard.sdk import ConnectorV1, EntityInput, ChunkInput, DiscoverResult
from hoard.sdk import chunk_plain_text, compute_content_hash
from pathlib import Path
from typing import Iterator, Tuple, List
from datetime import datetime
import logging
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class TextFilesConnector(ConnectorV1):
    """Connector that indexes every ``*.txt`` file under a configured directory.

    Config keys read by this connector:
        path: Directory searched recursively for ``*.txt`` files (required).
        chunk_max_tokens: Forwarded to ``chunk_plain_text`` as ``max_tokens``
            (default 400).
        chunk_overlap_tokens: Forwarded to ``chunk_plain_text`` as
            ``overlap_tokens`` (default 50).
    """

    @property
    def name(self) -> str:
        """Connector name."""
        return "text_files"

    @property
    def version(self) -> str:
        """Connector version string."""
        return "1.0.0"

    @property
    def source_name(self) -> str:
        """Value written to ``EntityInput.source`` for every entity."""
        return "text_files"

    def discover(self, config: dict) -> DiscoverResult:
        """Validate the configured directory and count the text files in it.

        Returns:
            A failed ``DiscoverResult`` when ``path`` is missing/empty or does
            not exist; otherwise a successful one carrying the file count.
        """
        raw_path = config.get("path")
        if not raw_path:
            # Path("") is the current directory, which always exists; without
            # this guard a missing setting would report success on the cwd.
            return DiscoverResult(
                success=False,
                message="No path configured",
            )

        path = Path(raw_path).expanduser()
        if not path.exists():
            return DiscoverResult(
                success=False,
                message=f"Path not found: {path}",
            )

        # Count lazily instead of materialising the full list of paths.
        file_count = sum(1 for _ in path.rglob("*.txt"))
        return DiscoverResult(
            success=True,
            message=f"Found {file_count} text files",
            entity_count_estimate=file_count,
        )

    def scan(self, config: dict) -> Iterator[Tuple[EntityInput, List[ChunkInput]]]:
        """Yield one ``(entity, chunks)`` pair per readable ``*.txt`` file.

        Files that fail to process are logged and skipped so a single bad
        file does not abort the scan.

        Raises:
            KeyError: If ``config`` has no ``path`` entry.
        """
        path = Path(config["path"]).expanduser()
        max_tokens = config.get("chunk_max_tokens", 400)
        overlap = config.get("chunk_overlap_tokens", 50)

        for txt_file in path.rglob("*.txt"):
            # Only the processing step is guarded; yielding happens outside
            # the try so errors from the consumer are never swallowed here.
            try:
                item = self._process_file(txt_file, max_tokens, overlap)
            except Exception as e:
                logger.warning("Skipping %s: %s", txt_file, e)
                continue
            yield item

    def _process_file(
        self, file_path: Path, max_tokens: int, overlap: int
    ) -> Tuple[EntityInput, List[ChunkInput]]:
        """Read one UTF-8 text file and build its entity and chunk inputs."""
        content = file_path.read_text(encoding="utf-8")
        absolute_path = file_path.absolute()

        entity = EntityInput(
            source=self.source_name,
            # The absolute path does not change when the content is edited.
            source_id=str(absolute_path),
            entity_type="document",
            title=file_path.name,
            uri=f"file://{absolute_path}",
            mime_type="text/plain",
            tags=["txt"],
            # NOTE(review): this is a naive local-time datetime — confirm
            # that is what hoard expects for updated_at.
            updated_at=datetime.fromtimestamp(file_path.stat().st_mtime),
            content_hash=compute_content_hash(content),
        )
        chunks = [
            ChunkInput(
                content=c.text,
                char_offset_start=c.start,
                char_offset_end=c.end,
            )
            for c in chunk_plain_text(content, max_tokens=max_tokens, overlap_tokens=overlap)
        ]
        return entity, chunks

manifest.yaml

name: text_files
version: "1.0.0"
description: "Import text files from a directory"
author: "community"
license: "MIT"
requires_hoard: ">=0.1.0"
requires_python: ">=3.11"
# Schema for the connector's config; the keys match what connector.py reads.
config_schema:
  type: object
  required: [path]
  properties:
    path:
      type: string
      description: "Directory containing text files"
    chunk_max_tokens:
      type: integer
      default: 400
    chunk_overlap_tokens:
      type: integer
      default: 50
entry_point: "connector:TextFilesConnector"

Markdown Connector

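A connector for Markdown files with basic metadata extraction: the title is taken from the first `# ` heading within the first ten lines, falling back to the file name without its extension.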

connector.py

from hoard.sdk import ConnectorV1, EntityInput, ChunkInput, DiscoverResult
from hoard.sdk import chunk_plain_text, compute_content_hash
from pathlib import Path
from typing import Iterator, Tuple, List
from datetime import datetime
import logging
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class MarkdownConnector(ConnectorV1):
    """Connector that indexes every ``*.md`` file under a configured directory.

    Config keys read by this connector:
        path: Directory searched recursively for ``*.md`` files (required).
        chunk_max_tokens: Forwarded to ``chunk_plain_text`` as ``max_tokens``
            (default 400).
        chunk_overlap_tokens: Forwarded to ``chunk_plain_text`` as
            ``overlap_tokens`` (default 50).
    """

    @property
    def name(self) -> str:
        """Connector name."""
        return "markdown"

    @property
    def version(self) -> str:
        """Connector version string."""
        return "1.0.0"

    @property
    def source_name(self) -> str:
        """Value written to ``EntityInput.source`` for every entity."""
        return "markdown"

    def discover(self, config: dict) -> DiscoverResult:
        """Validate the configured directory and count the markdown files in it.

        Returns:
            A failed ``DiscoverResult`` when ``path`` is missing/empty or does
            not exist; otherwise a successful one carrying the file count.
        """
        raw_path = config.get("path")
        if not raw_path:
            # Path("") is the current directory, which always exists; without
            # this guard a missing setting would report success on the cwd.
            return DiscoverResult(
                success=False,
                message="No path configured",
            )

        path = Path(raw_path).expanduser()
        if not path.exists():
            return DiscoverResult(
                success=False,
                message=f"Path not found: {path}",
            )

        # Count lazily instead of materialising the full list of paths.
        file_count = sum(1 for _ in path.rglob("*.md"))
        return DiscoverResult(
            success=True,
            message=f"Found {file_count} markdown files",
            entity_count_estimate=file_count,
        )

    def scan(self, config: dict) -> Iterator[Tuple[EntityInput, List[ChunkInput]]]:
        """Yield one ``(entity, chunks)`` pair per readable ``*.md`` file.

        Files that fail to process are logged and skipped so a single bad
        file does not abort the scan.

        Raises:
            KeyError: If ``config`` has no ``path`` entry.
        """
        path = Path(config["path"]).expanduser()
        max_tokens = config.get("chunk_max_tokens", 400)
        overlap = config.get("chunk_overlap_tokens", 50)

        for md_file in path.rglob("*.md"):
            # Only the processing step is guarded; yielding happens outside
            # the try so errors from the consumer are never swallowed here.
            try:
                item = self._process_file(md_file, max_tokens, overlap)
            except Exception as e:
                logger.warning("Skipping %s: %s", md_file, e)
                continue
            yield item

    def _process_file(
        self, file_path: Path, max_tokens: int, overlap: int
    ) -> Tuple[EntityInput, List[ChunkInput]]:
        """Read one UTF-8 markdown file and build its entity and chunk inputs."""
        content = file_path.read_text(encoding="utf-8")
        absolute_path = file_path.absolute()

        entity = EntityInput(
            source=self.source_name,
            # The absolute path does not change when the content is edited.
            source_id=str(absolute_path),
            entity_type="note",
            # Title comes from the first heading, or the filename as fallback.
            title=self._extract_title(content, file_path),
            uri=f"file://{absolute_path}",
            mime_type="text/markdown",
            tags=["markdown"],
            # NOTE(review): this is a naive local-time datetime — confirm
            # that is what hoard expects for updated_at.
            updated_at=datetime.fromtimestamp(file_path.stat().st_mtime),
            content_hash=compute_content_hash(content),
        )
        chunks = [
            ChunkInput(
                content=c.text,
                char_offset_start=c.start,
                char_offset_end=c.end,
            )
            for c in chunk_plain_text(content, max_tokens=max_tokens, overlap_tokens=overlap)
        ]
        return entity, chunks

    def _extract_title(self, content: str, file_path: Path) -> str:
        """Return the first non-empty ``# `` heading, or the file stem.

        Only the first ten lines of ``content`` are inspected. A heading with
        no text after ``# `` is ignored rather than producing an empty title.
        """
        # maxsplit=10 avoids splitting the whole document for ten lines.
        for line in content.split("\n", 10)[:10]:
            if line.startswith("# "):
                heading = line[2:].strip()
                if heading:
                    return heading
        return file_path.stem

Export Connector Pattern

For connectors that process exported files, supplied either as a ZIP archive or as an already-extracted directory.

from hoard.sdk import ConnectorV1, EntityInput, ChunkInput, DiscoverResult
from hoard.sdk import chunk_plain_text, compute_content_hash
from pathlib import Path
from typing import Iterator, Tuple, List
import tempfile
import shutil
import zipfile
import logging
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ExportConnector(ConnectorV1):
    """Connector that indexes ``*.html`` files from an export.

    The export (config key ``export_path``) is either a ``.zip`` archive,
    which is unpacked into a temporary directory during ``scan``, or a
    directory that is searched in place. Call ``cleanup`` once the scan
    results have been consumed to remove the temporary directory.
    """

    def __init__(self):
        # Directory holding the unpacked ZIP; None when nothing needs cleanup.
        # NOTE(review): the base-class __init__ is not called here — confirm
        # ConnectorV1 does not require it.
        self.temp_dir = None

    @property
    def name(self) -> str:
        """Connector name."""
        return "my_export"

    @property
    def version(self) -> str:
        """Connector version string."""
        return "1.0.0"

    @property
    def source_name(self) -> str:
        """Value written to ``EntityInput.source`` for every entity."""
        return "my_export"

    @staticmethod
    def _is_zip(export_path: Path) -> bool:
        """Return True when the path has a ``.zip`` suffix (any letter case)."""
        return export_path.suffix.lower() == ".zip"

    def discover(self, config: dict) -> DiscoverResult:
        """Validate the configured export and count the HTML files in it.

        Returns:
            A failed ``DiscoverResult`` when ``export_path`` is missing/empty,
            does not exist, or is a ZIP that cannot be opened; otherwise a
            successful one carrying the file count.
        """
        raw_path = config.get("export_path")
        if not raw_path:
            # Path("") is the current directory, which always exists; without
            # this guard a missing setting would report success on the cwd.
            return DiscoverResult(
                success=False,
                message="No export_path configured",
            )

        export_path = Path(raw_path).expanduser()
        if not export_path.exists():
            return DiscoverResult(
                success=False,
                message=f"Export not found: {export_path}",
            )

        # Count files in ZIP or directory
        if self._is_zip(export_path):
            try:
                with zipfile.ZipFile(export_path, 'r') as zf:
                    count = sum(1 for n in zf.namelist() if n.endswith('.html'))
            except zipfile.BadZipFile:
                # Report a corrupt archive as a failed discovery, not a crash.
                return DiscoverResult(
                    success=False,
                    message=f"Not a valid ZIP archive: {export_path}",
                )
        else:
            count = sum(1 for _ in export_path.rglob("*.html"))

        return DiscoverResult(
            success=True,
            message=f"Found {count} files",
            entity_count_estimate=count,
        )

    def scan(self, config: dict) -> Iterator[Tuple[EntityInput, List[ChunkInput]]]:
        """Yield one ``(entity, chunks)`` pair per readable ``*.html`` file.

        Files that fail to process are logged and skipped so a single bad
        file does not abort the scan.

        Raises:
            KeyError: If ``config`` has no ``export_path`` entry.
        """
        export_path = Path(config["export_path"]).expanduser()

        # Extract ZIP to temp directory
        if self._is_zip(export_path):
            # Remove a directory left by an earlier scan before self.temp_dir
            # is overwritten; otherwise that directory would be leaked.
            self.cleanup()
            self.temp_dir = tempfile.mkdtemp(prefix="hoard-export-")
            with zipfile.ZipFile(export_path, 'r') as zf:
                zf.extractall(self.temp_dir)
            scan_path = Path(self.temp_dir)
        else:
            scan_path = export_path

        # Process each file
        for file_path in scan_path.rglob("*.html"):
            # Only the processing step is guarded; yielding happens outside
            # the try so errors from the consumer are never swallowed here.
            try:
                item = self._process_file(file_path, scan_path)
            except Exception as e:
                logger.warning("Skipping %s: %s", file_path, e)
                continue
            yield item

    def _process_file(
        self, file_path: Path, root: "Path | None" = None
    ) -> Tuple[EntityInput, List[ChunkInput]]:
        """Read one UTF-8 HTML file and build its entity and chunk inputs.

        Args:
            file_path: HTML file to read.
            root: Directory the scan started from. When given, ``source_id``
                is the file's path relative to ``root`` without its final
                suffix, so same-named files in different folders get distinct
                IDs. For a file directly under ``root`` this equals the stem.
                When omitted, the bare stem is used.

        Raises:
            ValueError: If ``root`` is given and ``file_path`` is not under it.
        """
        content = file_path.read_text(encoding="utf-8")
        # Extract text from HTML (implement your own logic)
        text = self._html_to_text(content)

        if root is None:
            source_id = file_path.stem
        else:
            source_id = file_path.relative_to(root).with_suffix("").as_posix()

        entity = EntityInput(
            source=self.source_name,
            source_id=source_id,  # Use stable ID
            entity_type="page",
            title=file_path.stem,
            uri=f"file://{file_path.absolute()}",
            content_hash=compute_content_hash(text),
        )
        chunks = [
            ChunkInput(
                content=c.text,
                char_offset_start=c.start,
                char_offset_end=c.end,
            )
            for c in chunk_plain_text(text, max_tokens=400)
        ]
        return entity, chunks

    def _html_to_text(self, html: str) -> str:
        """Extract text from HTML. Use BeautifulSoup if available."""
        # The try covers only the optional import, so an error raised while
        # parsing is not mistaken for a missing dependency.
        try:
            from bs4 import BeautifulSoup
        except ImportError:
            # Fallback: strip tags naively
            import re
            text = re.sub(r'<[^>]+>', '', html)
            return text.strip()

        soup = BeautifulSoup(html, 'html.parser')
        return soup.get_text(separator='\n', strip=True)

    def cleanup(self) -> None:
        """Clean up temp directory after sync."""
        if self.temp_dir:
            # ignore_errors=True keeps cleanup best-effort.
            shutil.rmtree(self.temp_dir, ignore_errors=True)
            self.temp_dir = None

Key Patterns

Stable source_id

Always use IDs that don’t change when content is edited:

# Good: absolute file path (editing the file's content does not change it)
source_id = str(file_path.absolute())
# Good: UUID extracted from the file name
source_id = extract_uuid_from_filename(file_path.name)
# Bad: content hash (changes on every edit!)
source_id = compute_content_hash(content)

Error Handling

Never let one bad file crash the whole sync:

for file in files:
try:
yield self._process_file(file)
except Exception as e:
logger.warning(f"Skipping {file}: {e}")
continue # Keep processing other files

Cleanup

Always clean up temp resources:

def cleanup(self) -> None:
    """Remove the temp directory, if one is set, and reset ``self.temp_dir``."""
    if self.temp_dir:
        # ignore_errors=True keeps cleanup best-effort.
        shutil.rmtree(self.temp_dir, ignore_errors=True)
        self.temp_dir = None

Next Steps