Connector Examples
This page shows complete connector implementations for reference.
Simple Text File Connector
A basic connector that indexes text files from a directory.
connector.py
from hoard.sdk import ConnectorV1, EntityInput, ChunkInput, DiscoverResult
from hoard.sdk import chunk_plain_text, compute_content_hash
from pathlib import Path
from typing import Iterator, Tuple, List
from datetime import datetime
import logging

logger = logging.getLogger(__name__)


class TextFilesConnector(ConnectorV1):
    """Index every ``*.txt`` file found recursively under a directory.

    Config keys read by this connector:
        path: directory to scan (required).
        chunk_max_tokens: forwarded to ``chunk_plain_text`` (default 400).
        chunk_overlap_tokens: forwarded to ``chunk_plain_text`` (default 50).
    """

    @property
    def name(self) -> str:
        return "text_files"

    @property
    def version(self) -> str:
        return "1.0.0"

    @property
    def source_name(self) -> str:
        return "text_files"

    def discover(self, config: dict) -> DiscoverResult:
        """Check the configured directory and count the text files in it.

        Returns a failed ``DiscoverResult`` when ``path`` is missing or empty,
        does not exist, or is not a directory; otherwise a successful result
        whose ``entity_count_estimate`` is the number of ``*.txt`` files.
        """
        raw_path = config.get("path")
        if not raw_path:
            # Path("") is the current directory, which always exists; without
            # this guard a missing setting would silently report on the CWD.
            return DiscoverResult(
                success=False,
                message="No path configured",
            )

        path = Path(raw_path).expanduser()
        if not path.exists():
            return DiscoverResult(
                success=False,
                message=f"Path not found: {path}",
            )
        if not path.is_dir():
            return DiscoverResult(
                success=False,
                message=f"Path is not a directory: {path}",
            )

        # Count lazily instead of building a list, and ignore directories
        # that merely happen to be named "*.txt".
        count = sum(1 for p in path.rglob("*.txt") if p.is_file())
        return DiscoverResult(
            success=True,
            message=f"Found {count} text files",
            entity_count_estimate=count,
        )

    def scan(self, config: dict) -> Iterator[Tuple[EntityInput, List[ChunkInput]]]:
        """Yield one ``(entity, chunks)`` pair per readable text file.

        Files that fail to process are logged and skipped. Because this is a
        generator, a missing ``path`` key raises ``KeyError`` on the first
        iteration rather than at call time.
        """
        path = Path(config["path"]).expanduser()
        max_tokens = config.get("chunk_max_tokens", 400)
        overlap = config.get("chunk_overlap_tokens", 50)

        for txt_file in path.rglob("*.txt"):
            if not txt_file.is_file():
                continue
            try:
                yield self._process_file(txt_file, max_tokens, overlap)
            except Exception as e:
                # Deliberately broad: one bad file must not abort the sync.
                logger.warning(f"Skipping {txt_file}: {e}")
                continue

    def _process_file(
        self, file_path: Path, max_tokens: int, overlap: int
    ) -> Tuple[EntityInput, List[ChunkInput]]:
        """Read one file as UTF-8 and build its entity and chunk inputs.

        Raises whatever ``read_text`` / ``stat`` raise (for example
        ``UnicodeDecodeError`` or ``OSError``); ``scan`` catches these.
        """
        content = file_path.read_text(encoding="utf-8")
        absolute = file_path.absolute()

        entity = EntityInput(
            source=self.source_name,
            source_id=str(absolute),
            entity_type="document",
            title=file_path.name,
            # as_uri() percent-encodes spaces and other reserved characters
            # and produces a correct file URI on Windows as well.
            uri=absolute.as_uri(),
            mime_type="text/plain",
            tags=["txt"],
            # NOTE(review): this is a naive local-time datetime; confirm
            # whether hoard expects a timezone-aware value.
            updated_at=datetime.fromtimestamp(file_path.stat().st_mtime),
            content_hash=compute_content_hash(content),
        )

        chunks = [
            ChunkInput(
                content=c.text,
                char_offset_start=c.start,
                char_offset_end=c.end,
            )
            for c in chunk_plain_text(content, max_tokens=max_tokens, overlap_tokens=overlap)
        ]

        return entity, chunks

manifest.yaml
name: text_files
version: "1.0.0"
description: "Import text files from a directory"
author: "community"
license: "MIT"

requires_hoard: ">=0.1.0"
requires_python: ">=3.11"

config_schema:
  type: object
  required: [path]
  properties:
    path:
      type: string
      description: "Directory containing text files"
    chunk_max_tokens:
      type: integer
      default: 400
    chunk_overlap_tokens:
      type: integer
      default: 50

entry_point: "connector:TextFilesConnector"

Markdown Connector
A connector for markdown files with basic metadata extraction.
connector.py
from hoard.sdk import ConnectorV1, EntityInput, ChunkInput, DiscoverResultfrom hoard.sdk import chunk_plain_text, compute_content_hashfrom pathlib import Pathfrom typing import Iterator, Tuple, Listfrom datetime import datetimeimport logging
logger = logging.getLogger(__name__)
class MarkdownConnector(ConnectorV1):
@property def name(self) -> str: return "markdown"
@property def version(self) -> str: return "1.0.0"
@property def source_name(self) -> str: return "markdown"
def discover(self, config: dict) -> DiscoverResult: path = Path(config.get("path", "")).expanduser()
if not path.exists(): return DiscoverResult( success=False, message=f"Path not found: {path}" )
md_files = list(path.rglob("*.md")) return DiscoverResult( success=True, message=f"Found {len(md_files)} markdown files", entity_count_estimate=len(md_files), )
def scan(self, config: dict) -> Iterator[Tuple[EntityInput, List[ChunkInput]]]: path = Path(config["path"]).expanduser() max_tokens = config.get("chunk_max_tokens", 400) overlap = config.get("chunk_overlap_tokens", 50)
for md_file in path.rglob("*.md"): try: yield self._process_file(md_file, max_tokens, overlap) except Exception as e: logger.warning(f"Skipping {md_file}: {e}") continue
def _process_file( self, file_path: Path, max_tokens: int, overlap: int ) -> Tuple[EntityInput, List[ChunkInput]]: content = file_path.read_text(encoding="utf-8")
# Extract title from first heading or filename title = self._extract_title(content, file_path)
entity = EntityInput( source=self.source_name, source_id=str(file_path.absolute()), entity_type="note", title=title, uri=f"file://{file_path.absolute()}", mime_type="text/markdown", tags=["markdown"], updated_at=datetime.fromtimestamp(file_path.stat().st_mtime), content_hash=compute_content_hash(content), )
chunks = [ ChunkInput( content=c.text, char_offset_start=c.start, char_offset_end=c.end, ) for c in chunk_plain_text(content, max_tokens=max_tokens, overlap_tokens=overlap) ]
return entity, chunks
def _extract_title(self, content: str, file_path: Path) -> str: """Extract title from first # heading or use filename.""" for line in content.split("\n")[:10]: if line.startswith("# "): return line[2:].strip() return file_path.stemExport Connector Pattern
For connectors that process exported files (ZIP archives).
from hoard.sdk import ConnectorV1, EntityInput, ChunkInput, DiscoverResultfrom hoard.sdk import chunk_plain_text, compute_content_hashfrom pathlib import Pathfrom typing import Iterator, Tuple, Listimport tempfileimport shutilimport zipfileimport logging
logger = logging.getLogger(__name__)
class ExportConnector(ConnectorV1):
def __init__(self): self.temp_dir = None
@property def name(self) -> str: return "my_export"
@property def version(self) -> str: return "1.0.0"
@property def source_name(self) -> str: return "my_export"
def discover(self, config: dict) -> DiscoverResult: export_path = Path(config.get("export_path", "")).expanduser()
if not export_path.exists(): return DiscoverResult( success=False, message=f"Export not found: {export_path}" )
# Count files in ZIP or directory if export_path.suffix == ".zip": with zipfile.ZipFile(export_path, 'r') as zf: count = sum(1 for n in zf.namelist() if n.endswith('.html')) else: count = len(list(export_path.rglob("*.html")))
return DiscoverResult( success=True, message=f"Found {count} files", entity_count_estimate=count, )
def scan(self, config: dict) -> Iterator[Tuple[EntityInput, List[ChunkInput]]]: export_path = Path(config["export_path"]).expanduser()
# Extract ZIP to temp directory if export_path.suffix == ".zip": self.temp_dir = tempfile.mkdtemp(prefix="hoard-export-") with zipfile.ZipFile(export_path, 'r') as zf: zf.extractall(self.temp_dir) scan_path = Path(self.temp_dir) else: scan_path = export_path
# Process each file for file_path in scan_path.rglob("*.html"): try: yield self._process_file(file_path) except Exception as e: logger.warning(f"Skipping {file_path}: {e}") continue
def _process_file(self, file_path: Path) -> Tuple[EntityInput, List[ChunkInput]]: content = file_path.read_text(encoding="utf-8")
# Extract text from HTML (implement your own logic) text = self._html_to_text(content)
entity = EntityInput( source=self.source_name, source_id=file_path.stem, # Use stable ID entity_type="page", title=file_path.stem, uri=f"file://{file_path.absolute()}", content_hash=compute_content_hash(text), )
chunks = [ ChunkInput( content=c.text, char_offset_start=c.start, char_offset_end=c.end, ) for c in chunk_plain_text(text, max_tokens=400) ]
return entity, chunks
def _html_to_text(self, html: str) -> str: """Extract text from HTML. Use BeautifulSoup if available.""" try: from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'html.parser') return soup.get_text(separator='\n', strip=True) except ImportError: # Fallback: strip tags naively import re text = re.sub(r'<[^>]+>', '', html) return text.strip()
def cleanup(self) -> None: """Clean up temp directory after sync.""" if self.temp_dir: shutil.rmtree(self.temp_dir, ignore_errors=True) self.temp_dir = NoneKey Patterns
Stable source_id
Always use IDs that don’t change when content is edited:
# Good: file path
source_id = str(file_path.absolute())

# Good: extracted UUID
source_id = extract_uuid_from_filename(file_path.name)

# Bad: content hash (changes on every edit!)
source_id = compute_content_hash(content)

Error Handling
Never let one bad file crash the whole sync:
for file in files:
    try:
        yield self._process_file(file)
    except Exception as e:
        logger.warning(f"Skipping {file}: {e}")
        continue  # Keep processing other files

Cleanup
Always clean up temp resources:
def cleanup(self) -> None:
    if self.temp_dir:
        shutil.rmtree(self.temp_dir, ignore_errors=True)
        self.temp_dir = None

Next Steps
- Connector Interface — Full interface spec
- Testing — Test your connector
- Manifest — Configure your connector