BEP XET: Xet Protocol Extension for Content-Defined Chunking and Deduplication

Overview

The Xet Protocol Extension (BEP XET) is a BitTorrent protocol extension that enables content-defined chunking (CDC) and cross-torrent deduplication through a peer-to-peer Content Addressable Storage (CAS) system. This extension turns BitTorrent into an updatable peer-to-peer file system optimized for collaboration and efficient data sharing.

Rationale

The Xet protocol extension addresses key limitations of traditional BitTorrent:

  1. Fixed Piece Sizes: Traditional BitTorrent uses fixed piece sizes, leading to inefficient redistribution when files are modified. CDC adapts to content boundaries.

  2. No Cross-Torrent Deduplication: Each torrent is independent, even if sharing identical content. Xet enables chunk-level deduplication across torrents.

  3. Centralized Storage: Traditional CAS systems require external services. Xet builds CAS directly into the BitTorrent network using DHT and trackers.

  4. Inefficient Updates: Updating a shared file requires redistributing the entire file. Xet only redistributes changed chunks.

By combining CDC, deduplication, and P2P CAS, Xet turns BitTorrent into an updatable peer-to-peer file system optimized for collaboration.
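The fixed-piece limitation in point 1 can be seen in a few lines. The sketch below is illustrative only and is not part of ccBitTorrent: `fixed_size_hashes` is a hypothetical helper, and SHA-256 stands in for whatever piece hash a client uses. It hashes 16 KiB fixed-size pieces before and after a single byte is inserted at the front of the data.

```python
import hashlib
import random


def fixed_size_hashes(data: bytes, piece_size: int) -> set[bytes]:
    """Hash every fixed-size piece of data (the last piece may be shorter)."""
    return {
        hashlib.sha256(data[i : i + piece_size]).digest()
        for i in range(0, len(data), piece_size)
    }


# 256 KiB of reproducible sample data (hypothetical test input).
original = random.Random(0).randbytes(256 * 1024)
# One byte inserted at the front shifts every later piece boundary.
edited = b"!" + original

before = fixed_size_hashes(original, 16 * 1024)
after = fixed_size_hashes(edited, 16 * 1024)
shared = len(before & after)
print(f"{shared} of {len(before)} pieces survive the edit")
```

Because every piece boundary moves by one byte, none of the original piece hashes reappear, so a fixed-piece scheme would have to redistribute the whole file. Content-defined boundaries are meant to avoid exactly this.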

Key Features

  • Content-Defined Chunking (CDC): Gearhash-based file segmentation into variable-sized chunks (8 KB to 128 KB, 16 KB default target)
  • Cross-Torrent Deduplication: Chunk-level deduplication across multiple torrents
  • Peer-to-Peer CAS: Decentralized Content Addressable Storage using DHT and trackers
  • Merkle Tree Verification: BLAKE3-256 hashing with SHA-256 fallback for integrity
  • Xorb Format: Efficient storage format for grouping multiple chunks
  • Shard Format: Metadata storage for file information and CAS data
  • LZ4 Compression: Optional compression for Xorb data
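
As a rough illustration of the hashing and Merkle verification features, the sketch below hashes chunks with BLAKE3 when the third-party `blake3` package is installed and falls back to SHA-256 otherwise. The tree layout (binary, last node duplicated on odd levels) is an assumption made for this example; the layout ccBitTorrent actually uses is not described on this page, and `chunk_hash` and `merkle_root` are hypothetical names.

```python
import hashlib

try:
    from blake3 import blake3  # optional third-party package

    def chunk_hash(data: bytes) -> bytes:
        """Return a 32-byte BLAKE3 digest."""
        return blake3(data).digest()

except ImportError:

    def chunk_hash(data: bytes) -> bytes:
        """Return a 32-byte SHA-256 digest (fallback)."""
        return hashlib.sha256(data).digest()


def merkle_root(leaves: list[bytes]) -> bytes:
    """Fold leaf hashes pairwise into one root (assumed layout)."""
    if not leaves:
        return chunk_hash(b"")
    level = list(leaves)
    while len(level) > 1:
        if len(level) % 2:
            level.append(level[-1])  # duplicate the last node on odd levels
        level = [
            chunk_hash(level[i] + level[i + 1]) for i in range(0, len(level), 2)
        ]
    return level[0]


leaves = [chunk_hash(c) for c in (b"a", b"b", b"c")]
root = merkle_root(leaves)
```

Changing any chunk changes its leaf hash and therefore the root, which is what makes a root usable as a compact version identifier.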

Use Cases

1. Collaborative File Sharing

Xet enables efficient collaboration by:

  • Deduplication: Shared files across multiple torrents share the same chunks
  • Fast Updates: Only changed chunks need to be redistributed
  • Version Control: Track file versions through Merkle tree roots

2. Large File Distribution

For large files or datasets:

  • Content-Defined Chunking: Content-based boundaries reduce chunk redistribution on edits
  • Parallel Downloads: Download chunks from multiple peers simultaneously
  • Resume Capability: Track individual chunks for reliable resume

3. Peer-to-Peer File System

Transform BitTorrent into a P2P file system:

  • CAS Integration: Chunks stored in DHT for global availability
  • Metadata Storage: Shards provide file system metadata
  • Fast Lookups: Direct chunk access via hash eliminates the need for a full torrent download

Implementation Status

The Xet protocol extension is fully implemented in ccBitTorrent:

  • ✅ Content-Defined Chunking (Gearhash CDC)
  • ✅ BLAKE3-256 hashing with SHA-256 fallback
  • ✅ SQLite deduplication cache
  • ✅ DHT integration (BEP 44)
  • ✅ Tracker integration
  • ✅ Xorb and Shard formats
  • ✅ Merkle tree computation
  • ✅ BitTorrent protocol extension (BEP 10)
  • ✅ CLI integration
  • ✅ Configuration management

Configuration

CLI Commands

# Enable Xet protocol
ccbt xet enable

# Show Xet status
ccbt xet status

# Show deduplication statistics
ccbt xet stats

# Clean up unused chunks
ccbt xet cleanup --max-age-days 30

Enable Xet Protocol

Configure Xet support in ccbt.toml:

[disk]
# Xet Protocol Configuration
xet_enabled = false                        # Enable Xet protocol
xet_chunk_min_size = 8192                  # Minimum chunk size (bytes)
xet_chunk_max_size = 131072                # Maximum chunk size (bytes)
xet_chunk_target_size = 16384              # Target chunk size (bytes)
xet_deduplication_enabled = true           # Enable chunk-level deduplication
xet_cache_db_path = "data/xet_cache.db"    # SQLite cache database path
xet_chunk_store_path = "data/xet_chunks"   # Chunk storage directory
xet_use_p2p_cas = true                     # Use P2P Content Addressable Storage
xet_compression_enabled = true             # Enable LZ4 compression for Xorb data

Protocol Specification

Message Types

The Xet extension defines four message types:

  1. CHUNK_REQUEST (0x01): Request a specific chunk by hash
  2. CHUNK_RESPONSE (0x02): Response containing chunk data
  3. CHUNK_NOT_FOUND (0x03): Peer does not have the requested chunk
  4. CHUNK_ERROR (0x04): Error occurred while retrieving chunk

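Message Format

The layouts below follow the encoders and decoders in ccbt/extensions/xet.py: every message starts with a 1-byte message type and a 4-byte request ID. All integers are big-endian.

CHUNK_REQUEST
Offset  Size  Description
0       1     Message type (0x01)
1       4     Request ID
5       32    Chunk hash (BLAKE3-256 or SHA-256)

CHUNK_RESPONSE
Offset  Size  Description
0       1     Message type (0x02)
1       4     Request ID
5       4     Chunk data length
9       N     Chunk data

CHUNK_NOT_FOUND
Offset  Size  Description
0       1     Message type (0x03)
1       4     Request ID

CHUNK_ERROR
Offset  Size  Description
0       1     Message type (0x04)
1       4     Request ID
5       4     Error code (0 = generic error)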
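
The following round trip is a minimal sketch of this framing as used by encode_chunk_request and encode_chunk_response in ccbt/extensions/xet.py: a 1-byte message type, a 4-byte big-endian request ID, then the payload. The function names here are standalone stand-ins for illustration, not the library's methods.

```python
import struct

CHUNK_REQUEST = 0x01
CHUNK_RESPONSE = 0x02


def encode_request(request_id: int, chunk_hash: bytes) -> bytes:
    """<type:1><request_id:4><chunk_hash:32>, network byte order."""
    if len(chunk_hash) != 32:
        raise ValueError("chunk hash must be 32 bytes")
    return struct.pack("!BI", CHUNK_REQUEST, request_id) + chunk_hash


def decode_request(data: bytes) -> tuple[int, bytes]:
    if len(data) < 37:
        raise ValueError("request too short")
    msg_type, request_id = struct.unpack("!BI", data[:5])
    if msg_type != CHUNK_REQUEST:
        raise ValueError("not a chunk request")
    return request_id, data[5:37]


def encode_response(request_id: int, chunk_data: bytes) -> bytes:
    """<type:1><request_id:4><length:4><chunk_data:N>."""
    header = struct.pack("!BII", CHUNK_RESPONSE, request_id, len(chunk_data))
    return header + chunk_data


def decode_response(data: bytes) -> tuple[int, bytes]:
    if len(data) < 9:
        raise ValueError("response too short")
    msg_type, request_id, size = struct.unpack("!BII", data[:9])
    if msg_type != CHUNK_RESPONSE or len(data) < 9 + size:
        raise ValueError("malformed chunk response")
    return request_id, data[9 : 9 + size]


request = encode_request(7, bytes(32))
response = encode_response(7, b"chunk bytes")
```

The `!` prefix selects network byte order with no padding, so the request header is exactly 5 bytes and the response header exactly 9.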

Extension Handshake

The Xet extension follows the BEP 10 (Extension Protocol) handshake:

  1. Client sends the BEP 10 extended handshake advertising its Xet extension ID
  2. Server responds with Xet extension ID and message ID mapping
  3. Messages are sent using the assigned extension message ID
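
In BEP 10, extended messages use BitTorrent message ID 20, and the handshake is extended message ID 0 carrying a bencoded dictionary whose `m` key maps extension names to locally chosen message IDs. The sketch below builds such a handshake frame. The extension name `xet` and the ID `3` are assumptions chosen for illustration, and `bencode` is a minimal hypothetical encoder rather than ccBitTorrent's own.

```python
import struct


def bencode(value) -> bytes:
    """Minimal bencoder for ints, strings, bytes, lists and dicts."""
    if isinstance(value, int):
        return b"i%de" % value
    if isinstance(value, str):
        value = value.encode()
    if isinstance(value, bytes):
        return b"%d:%s" % (len(value), value)
    if isinstance(value, list):
        return b"l" + b"".join(bencode(v) for v in value) + b"e"
    if isinstance(value, dict):
        # Bencoded dictionaries must have their keys sorted as raw bytes.
        items = sorted(
            (k.encode() if isinstance(k, str) else k, v) for k, v in value.items()
        )
        return b"d" + b"".join(bencode(k) + bencode(v) for k, v in items) + b"e"
    raise TypeError(f"cannot bencode {type(value).__name__}")


EXTENDED = 20  # BitTorrent message ID reserved for BEP 10
HANDSHAKE = 0  # extended message ID of the handshake itself

handshake = {"m": {"xet": 3}}  # "xet" and 3 are illustrative assumptions
payload = bytes([EXTENDED, HANDSHAKE]) + bencode(handshake)
frame = struct.pack("!I", len(payload)) + payload  # 4-byte length prefix
```

After the handshake, a peer that received `{"m": {"xet": 3}}` would send Xet messages to this client as extended message ID 3.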

Chunk Discovery

Chunks are discovered through multiple mechanisms:

  1. DHT (BEP 44): Store and retrieve chunk metadata using DHT
  2. Trackers: Announce chunk availability to trackers
  3. Peer Exchange: Exchange chunk availability information with peers
  4. Torrent Metadata: Extract chunk hashes from torrent Xet metadata

Architecture

Core Components

1. Protocol Extension (ccbt/extensions/xet.py)

The Xet extension implements BEP 10 (Extension Protocol) messages for chunk requests and responses.

Xet Protocol Extension implementation.

Initialize Xet Extension.

Source code in ccbt/extensions/xet.py
def __init__(self):
    """Initialize Xet Extension."""
    self.pending_requests: dict[
        tuple[str, int], XetChunkRequest
    ] = {}  # (peer_id, request_id) -> request
    self.request_counter = 0
    self.chunk_provider: Callable[[bytes], bytes | None] | None = None

decode_chunk_request(data: bytes) -> tuple[int, bytes]

Decode chunk request message.

Parameters:

Name  Type   Description              Default
data  bytes  Encoded request message  required

Returns:

Type               Description
tuple[int, bytes]  Tuple of (request_id, chunk_hash)

Raises:

Type        Description
ValueError  If message is invalid

Source code in ccbt/extensions/xet.py
def decode_chunk_request(self, data: bytes) -> tuple[int, bytes]:
    """Decode chunk request message.

    Args:
        data: Encoded request message

    Returns:
        Tuple of (request_id, chunk_hash)

    Raises:
        ValueError: If message is invalid

    """
    if len(data) < 37:  # 1 byte type + 4 bytes request_id + 32 bytes hash
        msg = "Invalid Xet chunk request message"
        raise ValueError(msg)

    message_type, request_id = struct.unpack("!BI", data[:5])
    if message_type != XetMessageType.CHUNK_REQUEST:
        msg = "Invalid message type for chunk request"
        raise ValueError(msg)

    chunk_hash = data[5:37]
    if len(chunk_hash) != 32:
        msg = "Invalid chunk hash length"
        raise ValueError(msg)

    return request_id, chunk_hash

decode_chunk_response(data: bytes) -> tuple[int, bytes]

Decode chunk response message.

Parameters:

Name  Type   Description               Default
data  bytes  Encoded response message  required

Returns:

Type               Description
tuple[int, bytes]  Tuple of (request_id, chunk_data)

Raises:

Type        Description
ValueError  If message is invalid

Source code in ccbt/extensions/xet.py
def decode_chunk_response(self, data: bytes) -> tuple[int, bytes]:
    """Decode chunk response message.

    Args:
        data: Encoded response message

    Returns:
        Tuple of (request_id, chunk_data)

    Raises:
        ValueError: If message is invalid

    """
    if len(data) < 9:  # 1 byte type + 4 bytes request_id + 4 bytes size
        msg = "Invalid Xet chunk response message"
        raise ValueError(msg)

    message_type, request_id, chunk_size = struct.unpack("!BII", data[:9])
    if message_type != XetMessageType.CHUNK_RESPONSE:
        msg = "Invalid message type for chunk response"
        raise ValueError(msg)

    if len(data) < 9 + chunk_size:
        msg = "Incomplete chunk data in response"
        raise ValueError(msg)

    chunk_data = data[9 : 9 + chunk_size]
    return request_id, chunk_data

decode_handshake(data: dict[str, Any]) -> bool

Decode Xet extension handshake data.

Parameters:

Name  Type            Description                          Default
data  dict[str, Any]  Extension handshake data dictionary  required

Returns:

Type  Description
bool  True if peer supports Xet extension

Source code in ccbt/extensions/xet.py
def decode_handshake(self, data: dict[str, Any]) -> bool:
    """Decode Xet extension handshake data.

    Args:
        data: Extension handshake data dictionary

    Returns:
        True if peer supports Xet extension

    """
    xet_data = data.get("xet", {})
    if isinstance(xet_data, dict):
        return xet_data.get("supports_chunk_requests", False)
    return False

encode_chunk_error(request_id: int, error_code: int = 0) -> bytes

Encode chunk error message.

Parameters:

Name        Type  Description                     Default
request_id  int   Request ID                      required
error_code  int   Error code (0 = generic error)  0

Returns:

Type   Description
bytes  Encoded error message

Source code in ccbt/extensions/xet.py
def encode_chunk_error(self, request_id: int, error_code: int = 0) -> bytes:
    """Encode chunk error message.

    Args:
        request_id: Request ID
        error_code: Error code (0 = generic error)

    Returns:
        Encoded error message

    """
    # Pack: <message_type><request_id><error_code>
    return struct.pack("!BII", XetMessageType.CHUNK_ERROR, request_id, error_code)

encode_chunk_not_found(request_id: int) -> bytes

Encode chunk not found message.

Parameters:

Name        Type  Description  Default
request_id  int   Request ID   required

Returns:

Type   Description
bytes  Encoded not found message

Source code in ccbt/extensions/xet.py
def encode_chunk_not_found(self, request_id: int) -> bytes:
    """Encode chunk not found message.

    Args:
        request_id: Request ID

    Returns:
        Encoded not found message

    """
    # Pack: <message_type><request_id>
    return struct.pack("!BI", XetMessageType.CHUNK_NOT_FOUND, request_id)

encode_chunk_request(chunk_hash: bytes) -> bytes

Encode chunk request message.

Parameters:

Name        Type   Description         Default
chunk_hash  bytes  32-byte chunk hash  required

Returns:

Type   Description
bytes  Encoded request message

Source code in ccbt/extensions/xet.py
def encode_chunk_request(self, chunk_hash: bytes) -> bytes:
    """Encode chunk request message.

    Args:
        chunk_hash: 32-byte chunk hash

    Returns:
        Encoded request message

    """
    if len(chunk_hash) != 32:
        msg = f"Chunk hash must be 32 bytes, got {len(chunk_hash)}"
        raise ValueError(msg)

    self.request_counter += 1
    request_id = self.request_counter

    # Pack: <message_type><request_id><chunk_hash>
    return struct.pack("!BI", XetMessageType.CHUNK_REQUEST, request_id) + chunk_hash

encode_chunk_response(request_id: int, chunk_data: bytes) -> bytes

Encode chunk response message.

Parameters:

Name        Type   Description               Default
request_id  int    Request ID to respond to  required
chunk_data  bytes  Chunk data bytes          required

Returns:

Type   Description
bytes  Encoded response message

Source code in ccbt/extensions/xet.py
def encode_chunk_response(self, request_id: int, chunk_data: bytes) -> bytes:
    """Encode chunk response message.

    Args:
        request_id: Request ID to respond to
        chunk_data: Chunk data bytes

    Returns:
        Encoded response message

    """
    # Pack: <message_type><request_id><chunk_size><chunk_data>
    return (
        struct.pack(
            "!BII",
            XetMessageType.CHUNK_RESPONSE,
            request_id,
            len(chunk_data),
        )
        + chunk_data
    )

encode_handshake() -> dict[str, Any]

Encode Xet extension handshake data.

Returns:

Type            Description
dict[str, Any]  Dictionary containing Xet extension capabilities

Source code in ccbt/extensions/xet.py
def encode_handshake(self) -> dict[str, Any]:
    """Encode Xet extension handshake data.

    Returns:
        Dictionary containing Xet extension capabilities

    """
    return {
        "xet": {
            "version": "1.0",
            "supports_chunk_requests": True,
            "supports_p2p_cas": True,
        }
    }

get_capabilities() -> dict[str, Any]

Get Xet extension capabilities.

Returns:

Type            Description
dict[str, Any]  Capabilities dictionary

Source code in ccbt/extensions/xet.py
def get_capabilities(self) -> dict[str, Any]:
    """Get Xet extension capabilities.

    Returns:
        Capabilities dictionary

    """
    return {
        "supports_chunk_requests": True,
        "supports_p2p_cas": True,
        "version": "1.0",
        "pending_requests": len(self.pending_requests),
    }

handle_chunk_request(peer_id: str, request_id: int, chunk_hash: bytes) -> bytes async

Handle chunk request from peer.

Parameters:

Name        Type   Description         Default
peer_id     str    Peer identifier     required
request_id  int    Request ID          required
chunk_hash  bytes  32-byte chunk hash  required

Returns:

Type   Description
bytes  Response message (chunk data, not found, or error)

Source code in ccbt/extensions/xet.py
async def handle_chunk_request(
    self, peer_id: str, request_id: int, chunk_hash: bytes
) -> bytes:
    """Handle chunk request from peer.

    Args:
        peer_id: Peer identifier
        request_id: Request ID
        chunk_hash: 32-byte chunk hash

    Returns:
        Response message (chunk data, not found, or error)

    """
    # Store request
    self.pending_requests[(peer_id, request_id)] = XetChunkRequest(
        chunk_hash=chunk_hash,
        request_id=request_id,
        timestamp=time.time(),
    )

    # Try to get chunk from provider
    if self.chunk_provider:
        try:
            chunk_data = self.chunk_provider(chunk_hash)
            if chunk_data is not None:
                # Emit event
                await emit_event(
                    Event(
                        event_type=EventType.XET_CHUNK_PROVIDED.value,
                        data={
                            "peer_id": peer_id,
                            "request_id": request_id,
                            "chunk_hash": chunk_hash.hex(),
                            "chunk_size": len(chunk_data),
                            "timestamp": time.time(),
                        },
                    ),
                )
                return self.encode_chunk_response(request_id, chunk_data)
        except Exception as e:
            logger.warning(
                "Error providing chunk %s: %s",
                chunk_hash.hex()[:16],
                e,
            )
            # Emit event
            await emit_event(
                Event(
                    event_type=EventType.XET_CHUNK_ERROR.value,
                    data={
                        "peer_id": peer_id,
                        "request_id": request_id,
                        "chunk_hash": chunk_hash.hex(),
                        "error": str(e),
                        "timestamp": time.time(),
                    },
                ),
            )
            return self.encode_chunk_error(request_id, 1)

    # Chunk not found
    await emit_event(
        Event(
            event_type=EventType.XET_CHUNK_NOT_FOUND.value,
            data={
                "peer_id": peer_id,
                "request_id": request_id,
                "chunk_hash": chunk_hash.hex(),
                "timestamp": time.time(),
            },
        ),
    )
    return self.encode_chunk_not_found(request_id)

handle_chunk_response(peer_id: str, request_id: int, chunk_data: bytes) -> None async

Handle chunk response from peer.

Parameters:

Name        Type   Description       Default
peer_id     str    Peer identifier   required
request_id  int    Request ID        required
chunk_data  bytes  Chunk data bytes  required

Source code in ccbt/extensions/xet.py
async def handle_chunk_response(
    self, peer_id: str, request_id: int, chunk_data: bytes
) -> None:
    """Handle chunk response from peer.

    Args:
        peer_id: Peer identifier
        request_id: Request ID
        chunk_data: Chunk data bytes

    """
    # Remove from pending requests
    key = (peer_id, request_id)
    if key in self.pending_requests:
        request = self.pending_requests.pop(key)
        # Emit event
        await emit_event(
            Event(
                event_type=EventType.XET_CHUNK_RECEIVED.value,
                data={
                    "peer_id": peer_id,
                    "request_id": request_id,
                    "chunk_hash": request.chunk_hash.hex(),
                    "chunk_size": len(chunk_data),
                    "timestamp": time.time(),
                },
            ),
        )

set_chunk_provider(provider: Callable[[bytes], bytes | None]) -> None

Set function to provide chunks by hash.

Parameters:

Name      Type                             Description                                                                                      Default
provider  Callable[[bytes], bytes | None]  Callable that takes chunk_hash (32 bytes) and returns chunk data bytes or None if not available  required

Source code in ccbt/extensions/xet.py
def set_chunk_provider(self, provider: Callable[[bytes], bytes | None]) -> None:
    """Set function to provide chunks by hash.

    Args:
        provider: Callable that takes chunk_hash (32 bytes) and returns
                 chunk data bytes or None if not available

    """
    self.chunk_provider = provider
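
A provider is any callable with the signature above. The sketch below is one possible in-memory provider, written for illustration: `make_memory_provider` is a hypothetical helper, and SHA-256 is assumed as the chunk hash. It returns `None` for unknown hashes and raises if the stored bytes no longer match their hash; per handle_chunk_request above, `None` leads to CHUNK_NOT_FOUND and an exception leads to CHUNK_ERROR.

```python
import hashlib
from typing import Callable, Optional


def make_memory_provider(
    chunks: dict[bytes, bytes],
) -> Callable[[bytes], Optional[bytes]]:
    """Build a provider backed by a dict of chunk_hash -> chunk data."""

    def provider(chunk_hash: bytes) -> Optional[bytes]:
        data = chunks.get(chunk_hash)
        if data is None:
            return None  # unknown chunk
        if hashlib.sha256(data).digest() != chunk_hash:
            raise ValueError("stored chunk does not match its hash")
        return data

    return provider


payload = b"hello xet"
store = {hashlib.sha256(payload).digest(): payload}
provider = make_memory_provider(store)
```

The resulting callable would be passed to `set_chunk_provider(provider)` on an instance of the extension class documented above.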

Message Types:

```python
from enum import IntEnum


class XetMessageType(IntEnum):
    """Xet Extension message types."""

    CHUNK_REQUEST = 0x01  # Request chunk by hash
    CHUNK_RESPONSE = 0x02  # Response with chunk data
    CHUNK_NOT_FOUND = 0x03  # Chunk not available
    CHUNK_ERROR = 0x04  # Error retrieving chunk
```

Key Methods:

  • encode_chunk_request(): ccbt/extensions/xet.py:89 - Encode chunk request message with request ID
  • decode_chunk_request(): ccbt/extensions/xet.py:108 - Decode chunk request message
  • encode_chunk_response(): ccbt/extensions/xet.py:136 - Encode chunk response with data
  • handle_chunk_request(): ccbt/extensions/xet.py:210 - Handle incoming chunk request from peer
  • handle_chunk_response(): ccbt/extensions/xet.py:284 - Handle chunk response from peer

Extension Handshake:

  • encode_handshake(): ccbt/extensions/xet.py:61 - Encode Xet extension capabilities
  • decode_handshake(): ccbt/extensions/xet.py:75 - Decode peer's Xet extension capabilities

2. Content-Defined Chunking (ccbt/storage/xet_chunking.py)

Gearhash CDC algorithm for intelligent file segmentation with variable-sized chunks based on content patterns.

Content-defined chunking using Gearhash algorithm.

The Gearhash algorithm uses a rolling hash with a precomputed gear table to find content-defined chunk boundaries. This ensures that similar content in different files will produce the same chunk boundaries, enabling cross-file deduplication.

Attributes:

Name         Description
target_size  Target average chunk size (default: 16 KB)
gear_table   Precomputed 256-element gear table for rolling hash

Initialize chunker with target chunk size.

Parameters:

Name         Type  Description                                                                                              Default
target_size  int   Target average chunk size in bytes (default: 16 KB). Must be between MIN_CHUNK_SIZE and MAX_CHUNK_SIZE.  TARGET_CHUNK_SIZE

Source code in ccbt/storage/xet_chunking.py
def __init__(self, target_size: int = TARGET_CHUNK_SIZE):
    """Initialize chunker with target chunk size.

    Args:
        target_size: Target average chunk size in bytes (default: 16 KB)
                    Must be between MIN_CHUNK_SIZE and MAX_CHUNK_SIZE

    """
    if target_size < MIN_CHUNK_SIZE or target_size > MAX_CHUNK_SIZE:
        msg = f"Target size must be between {MIN_CHUNK_SIZE} and {MAX_CHUNK_SIZE} bytes"
        raise ValueError(msg)

    self.target_size = target_size
    self.gear_table = self._init_gear_table()

chunk_buffer(data: bytes) -> list[bytes]

Chunk data using Gearhash CDC.

This method processes the input data and finds content-defined chunk boundaries using the Gearhash rolling hash algorithm. Chunks will be between MIN_CHUNK_SIZE and MAX_CHUNK_SIZE bytes, with an average size close to target_size.

Parameters:

Name  Type   Description          Default
data  bytes  Input data to chunk  required

Returns:

Type         Description
list[bytes]  List of chunks, each between MIN_CHUNK_SIZE and MAX_CHUNK_SIZE bytes

Source code in ccbt/storage/xet_chunking.py
def chunk_buffer(self, data: bytes) -> list[bytes]:
    """Chunk data using Gearhash CDC.

    This method processes the input data and finds content-defined chunk
    boundaries using the Gearhash rolling hash algorithm. Chunks will
    be between MIN_CHUNK_SIZE and MAX_CHUNK_SIZE bytes, with an average
    size close to target_size.

    Args:
        data: Input data to chunk

    Returns:
        List of chunks, each between MIN_CHUNK_SIZE and MAX_CHUNK_SIZE bytes

    """
    if len(data) == 0:
        return []  # pragma: no cover - Empty data edge case, tested in test_chunk_buffer_empty

    chunks = []
    data_len = len(data)
    pos = 0

    # Minimum mask: ensures chunks are at least MIN_CHUNK_SIZE
    # Maximum mask: ensures chunks are at most MAX_CHUNK_SIZE
    # Target mask: controls average chunk size
    min_mask = (1 << (32 - MIN_CHUNK_SIZE.bit_length())) - 1
    max_mask = (1 << (32 - MAX_CHUNK_SIZE.bit_length())) - 1
    target_mask = (1 << (32 - self.target_size.bit_length())) - 1

    while pos < data_len:
        # Calculate chunk end position
        chunk_end = self._find_chunk_boundary(
            data, pos, min_mask, max_mask, target_mask
        )

        # Extract chunk
        chunk = data[pos:chunk_end]
        chunks.append(chunk)

        pos = chunk_end

    return chunks
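
Since `_find_chunk_boundary` is not shown on this page, the following self-contained sketch illustrates the general gear-hash technique rather than ccBitTorrent's exact implementation. The gear table contents, the 64-bit hash width, and the choice of a 14-bit mask over the high bits are all assumptions made for this example.

```python
import hashlib
import random

MIN_SIZE, MAX_SIZE = 8 * 1024, 128 * 1024
# Test the 14 highest bits of a 64-bit hash: once MIN_SIZE is reached,
# a boundary is expected roughly every 16 KiB of input.
MASK = ((1 << 14) - 1) << 50

# Illustrative gear table: one fixed pseudo-random 64-bit value per byte value.
GEAR = [
    int.from_bytes(hashlib.sha256(bytes([b])).digest()[:8], "big")
    for b in range(256)
]


def find_boundary(data: bytes, start: int) -> int:
    """Return the end offset of the chunk that begins at start."""
    end = min(start + MAX_SIZE, len(data))
    h = 0
    for i in range(start, end):
        # Gear rolling hash: shift out old bytes, mix in the new one.
        h = ((h << 1) + GEAR[data[i]]) & 0xFFFFFFFFFFFFFFFF
        if i + 1 - start >= MIN_SIZE and (h & MASK) == 0:
            return i + 1
    return end  # forced cut at MAX_SIZE or at the end of the data


def chunk(data: bytes) -> list[bytes]:
    chunks, pos = [], 0
    while pos < len(data):
        cut = find_boundary(data, pos)
        chunks.append(data[pos:cut])
        pos = cut
    return chunks


data = random.Random(1).randbytes(300 * 1024)
pieces = chunk(data)
```

Because each left shift pushes older bytes out of the 64-bit state, a cut decision depends only on the most recent bytes, so boundaries can realign after a local edit instead of shifting for the rest of the file.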

chunk_file(file_path: str, chunk_size_hint: int = 1024 * 1024) -> Iterator[bytes]

Chunk a file using Gearhash CDC.

This method reads a file in chunks and applies CDC chunking, yielding content-defined chunks as they are found.

Parameters:

Name             Type  Description                                Default
file_path        str   Path to file to chunk                      required
chunk_size_hint  int   Hint for read buffer size (default: 1 MB)  1024 * 1024

Yields:

Type   Description
bytes  Content-defined chunks (bytes)

Source code in ccbt/storage/xet_chunking.py
def chunk_file(
    self, file_path: str, chunk_size_hint: int = 1024 * 1024
) -> Iterator[
    bytes
]:  # pragma: no cover - File chunking wrapper, tested in test_chunk_file_with_custom_hint
    """Chunk a file using Gearhash CDC.

    This method reads a file in chunks and applies CDC chunking,
    yielding content-defined chunks as they are found.

    Args:
        file_path: Path to file to chunk
        chunk_size_hint: Hint for read buffer size (default: 1 MB)

    Yields:
        Content-defined chunks (bytes)

    """
    with open(file_path, "rb") as f:
        buffer = b""

        while True:
            # Read next chunk of file
            chunk = f.read(chunk_size_hint)
            if not chunk:
                break

            # Add to buffer
            buffer += chunk

            # Process buffer in chunks
            while len(buffer) >= MIN_CHUNK_SIZE:
                # Find boundary
                boundary = self._find_chunk_boundary(
                    buffer,
                    0,
                    (1 << (32 - MIN_CHUNK_SIZE.bit_length())) - 1,
                    (1 << (32 - MAX_CHUNK_SIZE.bit_length())) - 1,
                    (1 << (32 - self.target_size.bit_length())) - 1,
                )

                # Yield chunk
                chunk_data = buffer[:boundary]
                yield chunk_data

                # Remove processed data from buffer
                buffer = buffer[boundary:]

        # Process remaining buffer
        if buffer:
            yield buffer

chunk_stream(stream: Iterator[bytes]) -> Iterator[bytes]

Chunk a stream of data using Gearhash CDC.

Parameters:

Name    Type             Description                     Default
stream  Iterator[bytes]  Iterator yielding bytes chunks  required

Yields:

Type   Description
bytes  Content-defined chunks (bytes)

Source code in ccbt/storage/xet_chunking.py
def chunk_stream(self, stream: Iterator[bytes]) -> Iterator[bytes]:
    """Chunk a stream of data using Gearhash CDC.

    Args:
        stream: Iterator yielding bytes chunks

    Yields:
        Content-defined chunks (bytes)

    """
    buffer = b""

    for chunk in stream:
        buffer += chunk

        # Process buffer in chunks
        while (
            len(buffer) >= MIN_CHUNK_SIZE
        ):  # pragma: no cover - Stream chunking loop, tested via test_chunk_stream_with_remaining_buffer
            # Find boundary
            boundary = self._find_chunk_boundary(
                buffer,
                0,
                (1 << (32 - MIN_CHUNK_SIZE.bit_length())) - 1,
                (1 << (32 - MAX_CHUNK_SIZE.bit_length())) - 1,
                (1 << (32 - self.target_size.bit_length())) - 1,
            )  # pragma: no cover - Same context

            # Yield chunk
            chunk_data = buffer[:boundary]  # pragma: no cover - Same context
            yield chunk_data  # pragma: no cover - Same context

            # Remove processed data from buffer
            buffer = buffer[boundary:]  # pragma: no cover - Same context

    # Process remaining buffer
    if buffer:
        yield buffer  # pragma: no cover - Remaining buffer handling tested in test_chunk_stream_with_remaining_buffer

Constants:

  • MIN_CHUNK_SIZE: ccbt/storage/xet_chunking.py:21 - 8 KB minimum chunk size
  • MAX_CHUNK_SIZE: ccbt/storage/xet_chunking.py:22 - 128 KB maximum chunk size
  • TARGET_CHUNK_SIZE: ccbt/storage/xet_chunking.py:23 - 16 KB default target chunk size
  • WINDOW_SIZE: ccbt/storage/xet_chunking.py:24 - 48 bytes rolling hash window

Key Methods:

  • chunk_buffer(): ccbt/storage/xet_chunking.py:210 - Chunk data using Gearhash CDC algorithm
  • _find_chunk_boundary(): ccbt/storage/xet_chunking.py:242 - Find content-defined chunk boundary using rolling hash
  • _init_gear_table(): ccbt/storage/xet_chunking.py:54 - Initialize precomputed gear table for rolling hash

Algorithm: The Gearhash algorithm uses a rolling hash with a precomputed 256-element gear table to find content-defined boundaries. This ensures similar content in different files produces the same chunk boundaries, enabling cross-file deduplication.

3. Deduplication Cache (ccbt/storage/xet_deduplication.py)

SQLite-based local deduplication cache with DHT integration for chunk-level deduplication.

Chunk-level deduplication manager.

Manages local deduplication cache using SQLite and provides integration with DHT for global chunk discovery.

Attributes:

Name              Description
cache_path        Path to SQLite cache database
chunk_store_path  Directory where chunks are physically stored
db                SQLite database connection
dht_client        Optional DHT client for global chunk discovery

Initialize deduplication with local cache.

Parameters:

Name           Type        Description                                              Default
cache_db_path  Path | str  Path to SQLite cache database file                       required
dht_client     Any | None  Optional DHT client instance for global chunk discovery  None

Source code in ccbt/storage/xet_deduplication.py
def __init__(
    self,
    cache_db_path: Path | str,
    dht_client: Any | None = None,  # type: ignore[assignment]
):
    """Initialize deduplication with local cache.

    Args:
        cache_db_path: Path to SQLite cache database file
        dht_client: Optional DHT client instance for global chunk discovery

    """
    self.cache_path = Path(cache_db_path)
    self.chunk_store_path = self.cache_path.parent / "xet_chunks"
    self.chunk_store_path.mkdir(parents=True, exist_ok=True)

    self.db = self._init_database()
    self.dht_client = dht_client
    self.logger = logging.getLogger(__name__)

check_chunk_exists(chunk_hash: bytes) -> Path | None async

Check if chunk exists locally.

Queries the database for the chunk hash and updates the last_accessed timestamp if found.

Parameters:

Name        Type   Description         Default
chunk_hash  bytes  32-byte chunk hash  required

Returns:

Type         Description
Path | None  Path to stored chunk if exists, None otherwise

Source code in ccbt/storage/xet_deduplication.py
async def check_chunk_exists(self, chunk_hash: bytes) -> Path | None:
    """Check if chunk exists locally.

    Queries the database for the chunk hash and updates the
    last_accessed timestamp if found.

    Args:
        chunk_hash: 32-byte chunk hash

    Returns:
        Path to stored chunk if exists, None otherwise

    """
    cursor = self.db.execute(
        "SELECT storage_path FROM chunks WHERE hash = ?",
        (chunk_hash,),
    )
    row = cursor.fetchone()
    if row:
        # Update last accessed timestamp
        self.db.execute(
            "UPDATE chunks SET last_accessed = ? WHERE hash = ?",
            (time.time(), chunk_hash),
        )
        self.db.commit()
        return Path(row[0])
    return None

cleanup_unused_chunks(max_age_seconds: int = 30 * 24 * 60 * 60) -> int async

Remove chunks that haven't been accessed recently.

Parameters:

Name             Type  Description                                               Default
max_age_seconds  int   Maximum age in seconds before chunk is considered unused  30 * 24 * 60 * 60

Returns:

Type  Description
int   Number of chunks removed

Source code in ccbt/storage/xet_deduplication.py
async def cleanup_unused_chunks(
    self,
    max_age_seconds: int = 30 * 24 * 60 * 60,  # 30 days
) -> int:
    """Remove chunks that haven't been accessed recently.

    Args:
        max_age_seconds: Maximum age in seconds before chunk is considered unused

    Returns:
        Number of chunks removed

    """
    cutoff_time = time.time() - max_age_seconds

    cursor = self.db.execute(
        """SELECT hash, storage_path FROM chunks
           WHERE last_accessed < ? AND ref_count <= 1""",
        (cutoff_time,),
    )

    removed_count = 0
    for row in cursor.fetchall():
        chunk_hash = row[0]
        storage_path = row[1]

        try:
            Path(storage_path).unlink()
            self.db.execute("DELETE FROM chunks WHERE hash = ?", (chunk_hash,))
            removed_count += 1
        except OSError as e:
            self.logger.warning(
                "Failed to remove unused chunk %s: %s",
                storage_path,
                e,
            )

    self.db.commit()
    self.logger.info("Cleaned up %d unused chunks", removed_count)
    return removed_count

close() -> None

Close database connection.

Source code in ccbt/storage/xet_deduplication.py
def close(self) -> None:
    """Close database connection."""
    if self.db:
        self.db.close()

get_cache_stats() -> dict

Get statistics about the deduplication cache.

Returns:

Type  Description
dict  Dictionary with cache statistics

Source code in ccbt/storage/xet_deduplication.py
def get_cache_stats(self) -> dict:
    """Get statistics about the deduplication cache.

    Returns:
        Dictionary with cache statistics

    """
    cursor = self.db.execute(
        """SELECT
            COUNT(*) as total_chunks,
            SUM(size) as total_size,
            SUM(ref_count) as total_refs,
            AVG(size) as avg_size
           FROM chunks"""
    )
    row = cursor.fetchone()

    return {
        "total_chunks": row[0] or 0,
        "total_size": row[1] or 0,
        "total_refs": row[2] or 0,
        "avg_size": row[3] or 0,
    }

get_chunk_info(chunk_hash: bytes) -> dict | None

Get information about a stored chunk.

Parameters:

Name        Type   Description         Default
chunk_hash  bytes  32-byte chunk hash  required

Returns:

Type         Description
dict | None  Dictionary with chunk information or None if not found

Source code in ccbt/storage/xet_deduplication.py
def get_chunk_info(self, chunk_hash: bytes) -> dict | None:
    """Get information about a stored chunk.

    Args:
        chunk_hash: 32-byte chunk hash

    Returns:
        Dictionary with chunk information or None if not found

    """
    cursor = self.db.execute(
        """SELECT hash, size, storage_path, ref_count, created_at, last_accessed
           FROM chunks WHERE hash = ?""",
        (chunk_hash,),
    )
    row = cursor.fetchone()
    if row:
        return {
            "hash": row[0],
            "size": row[1],
            "storage_path": row[2],
            "ref_count": row[3],
            "created_at": row[4],
            "last_accessed": row[5],
        }
    return None

query_dht_for_chunk(chunk_hash: bytes) -> PeerInfo | None async

Query DHT for peers that have this chunk.

Uses existing DHT infrastructure to find peers that have the specified chunk. This enables global deduplication across the peer network.

The method:

  1. Converts 32-byte chunk hash to 20-byte DHT key (using SHA-1)
  2. Queries DHT using BEP 44 get_data() method
  3. Parses returned value to extract peer information
  4. Returns PeerInfo if found

Parameters:

Name        Type   Description         Default
chunk_hash  bytes  32-byte chunk hash  required

Returns:

Type             Description
PeerInfo | None  PeerInfo if found, None otherwise

Source code in ccbt/storage/xet_deduplication.py
async def query_dht_for_chunk(self, chunk_hash: bytes) -> PeerInfo | None:
    """Query DHT for peers that have this chunk.

    Uses existing DHT infrastructure to find peers that have
    the specified chunk. This enables global deduplication
    across the peer network.

    The method:
    1. Converts 32-byte chunk hash to 20-byte DHT key (using SHA-1)
    2. Queries DHT using BEP 44 get_data() method
    3. Parses returned value to extract peer information
    4. Returns PeerInfo if found

    Args:
        chunk_hash: 32-byte chunk hash

    Returns:
        PeerInfo if found, None otherwise

    """
    if len(chunk_hash) != 32:
        self.logger.warning(
            "Invalid chunk hash length: expected 32 bytes, got %d",
            len(chunk_hash),
        )
        return None

    if not self.dht_client:
        self.logger.debug("DHT client not available for chunk query")
        return None

    try:
        # Convert 32-byte chunk hash to 20-byte DHT key
        # Use SHA-1 of the chunk hash to ensure proper DHT distribution
        dht_key = hashlib.sha1(chunk_hash, usedforsecurity=False).digest()

        self.logger.debug(
            "Querying DHT for chunk %s (DHT key: %s)",
            chunk_hash.hex()[:16],
            dht_key.hex()[:16],
        )

        # Try get_data() first (BEP 44)
        if hasattr(self.dht_client, "get_data"):
            try:
                value = await self.dht_client.get_data(dht_key)

                if value:
                    # Value is a bencoded dictionary, decode it
                    peer_info = self._extract_peer_from_dht_value(value)
                    if peer_info:
                        self.logger.debug(
                            "Found peer %s for chunk %s via DHT get_data",
                            peer_info,
                            chunk_hash.hex()[:16],
                        )
                        return peer_info
            except Exception as e:
                self.logger.debug(
                    "DHT get_data failed for chunk %s: %s",
                    chunk_hash.hex()[:16],
                    e,
                )

        # Try get_peers() as fallback (uses info_hash lookup)
        # Note: This treats the chunk hash as an info_hash
        if hasattr(self.dht_client, "get_peers"):
            try:
                # Use first 20 bytes of chunk hash as info_hash
                info_hash = chunk_hash[:20]
                peers = await self.dht_client.get_peers(info_hash, max_peers=1)

                # Extract first peer
                if (
                    peers
                    and isinstance(peers[0], (list, tuple))
                    and len(peers[0]) >= 2
                ):
                    ip, port = peers[0][0], peers[0][1]
                    peer_info = PeerInfo(ip=str(ip), port=int(port))
                    self.logger.debug(
                        "Found peer %s for chunk %s via DHT get_peers",
                        peer_info,
                        chunk_hash.hex()[:16],
                    )
                    return peer_info
            except Exception as e:  # pragma: no cover - DHT get_peers exception handling, defensive error path
                self.logger.debug(
                    "DHT get_peers failed for chunk %s: %s",
                    chunk_hash.hex()[:16],
                    e,
                )  # pragma: no cover - Same context

        self.logger.debug(
            "No peers found in DHT for chunk %s",
            chunk_hash.hex()[:16],
        )
        return None

    except (
        Exception
    ) as e:  # pragma: no cover - DHT query exception handling, defensive error path
        self.logger.warning(
            "Error querying DHT for chunk %s: %s",
            chunk_hash.hex()[:16],
            e,
        )  # pragma: no cover - Same context
        return None  # pragma: no cover - Same context
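
The key derivation in the listing above can be tried in isolation. This is a small sketch under stated assumptions: `dht_key_for_chunk` and `info_hash_fallback` are hypothetical helper names, and SHA-256 stands in for the real chunk hash purely to produce 32 bytes. It also makes visible that, as the listing reads, the get_data() path and the get_peers() fallback look up two different 20-byte keys.

```python
import hashlib


def dht_key_for_chunk(chunk_hash: bytes) -> bytes:
    """Map a 32-byte chunk hash onto the 20-byte DHT key space via SHA-1."""
    if len(chunk_hash) != 32:
        raise ValueError(f"expected 32 bytes, got {len(chunk_hash)}")
    # usedforsecurity=False (Python 3.9+): SHA-1 is used for key
    # distribution only, not for integrity.
    return hashlib.sha1(chunk_hash, usedforsecurity=False).digest()


def info_hash_fallback(chunk_hash: bytes) -> bytes:
    """Key used by the get_peers() fallback: the first 20 bytes, unhashed."""
    return chunk_hash[:20]


# Stand-in 32-byte hash; the real system would supply the chunk's own hash.
chunk_hash = hashlib.sha256(b"example chunk").digest()

key = dht_key_for_chunk(chunk_hash)
fallback = info_hash_fallback(chunk_hash)
```

Because the two keys differ, a peer that announces under one scheme would presumably need to be queried under the same scheme to be found.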

remove_chunk_reference(chunk_hash: bytes) -> bool

Remove a reference to a chunk.

Decrements the reference count. If ref_count reaches zero, the chunk file is deleted.

Parameters:

Name        Type   Description         Default
chunk_hash  bytes  32-byte chunk hash  required

Returns:

Type  Description
bool  True if chunk was removed, False otherwise

Source code in ccbt/storage/xet_deduplication.py
def remove_chunk_reference(self, chunk_hash: bytes) -> bool:
    """Remove a reference to a chunk.

    Decrements the reference count. If ref_count reaches zero,
    the chunk file is deleted.

    Args:
        chunk_hash: 32-byte chunk hash

    Returns:
        True if chunk was removed, False otherwise

    """
    # Get current ref count
    cursor = self.db.execute(
        "SELECT ref_count, storage_path FROM chunks WHERE hash = ?",
        (chunk_hash,),
    )
    row = cursor.fetchone()
    if not row:
        return False

    ref_count = row[0]
    storage_path = row[1]

    if ref_count <= 1:
        # Remove chunk file and database entry
        try:
            Path(storage_path).unlink()
        except OSError as e:
            self.logger.warning(
                "Failed to remove chunk file %s: %s",
                storage_path,
                e,
            )

        self.db.execute("DELETE FROM chunks WHERE hash = ?", (chunk_hash,))
        self.db.commit()
        return True
    # Decrement ref count
    self.db.execute(
        "UPDATE chunks SET ref_count = ref_count - 1 WHERE hash = ?",
        (chunk_hash,),
    )
    self.db.commit()
    return False
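
The return value of remove_chunk_reference() is easy to misread: it reports whether the chunk itself was removed, not whether a reference was released. The sketch below mirrors that control flow with a plain dictionary in place of the database; `release` is a hypothetical name, and no files are touched.

```python
def release(refs: dict[bytes, int], chunk_hash: bytes) -> bool:
    """Drop one reference; return True only when the chunk entry is removed."""
    count = refs.get(chunk_hash)
    if count is None:
        return False          # unknown chunk: nothing to remove
    if count <= 1:
        del refs[chunk_hash]  # last reference: remove the entry entirely
        return True
    refs[chunk_hash] = count - 1
    return False              # still referenced elsewhere


chunk = b"\x01" * 32
refs = {chunk: 2}

# Two holders release the chunk, then a third, spurious release arrives.
results = [release(refs, chunk) for _ in range(3)]
```

Only the second call reports a removal; the first merely decrements, and the third finds nothing to release.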

store_chunk(chunk_hash: bytes, chunk_data: bytes) -> Path async

Store chunk with deduplication.

Checks if chunk already exists. If it does, increments reference count. Otherwise, stores the chunk physically and creates a database entry.

Parameters:

Name        Type   Description          Default
chunk_hash  bytes  32-byte chunk hash   required
chunk_data  bytes  Chunk data to store  required

Returns:

Type  Description
Path  Path to stored chunk (may be existing or new)

Source code in ccbt/storage/xet_deduplication.py
async def store_chunk(
    self,
    chunk_hash: bytes,
    chunk_data: bytes,
) -> Path:
    """Store chunk with deduplication.

    Checks if chunk already exists. If it does, increments
    reference count. Otherwise, stores the chunk physically
    and creates a database entry.

    Args:
        chunk_hash: 32-byte chunk hash
        chunk_data: Chunk data to store

    Returns:
        Path to stored chunk (may be existing or new)

    """
    # Check if already exists
    existing = await self.check_chunk_exists(chunk_hash)
    if existing:
        # Increment reference count
        self.db.execute(
            "UPDATE chunks SET ref_count = ref_count + 1 WHERE hash = ?",
            (chunk_hash,),
        )
        self.db.commit()
        self.logger.debug(
            "Chunk %s already exists, incremented ref count",
            chunk_hash.hex()[:16],
        )
        return existing

    # Store new chunk
    storage_file = self.chunk_store_path / chunk_hash.hex()
    storage_file.write_bytes(chunk_data)

    # Update database
    current_time = time.time()
    self.db.execute(
        """INSERT INTO chunks (hash, size, storage_path, created_at, last_accessed)
           VALUES (?, ?, ?, ?, ?)""",
        (
            chunk_hash,
            len(chunk_data),
            str(storage_file),
            current_time,
            current_time,
        ),
    )
    self.db.commit()

    self.logger.debug(
        "Stored new chunk %s (%d bytes)",
        chunk_hash.hex()[:16],
        len(chunk_data),
    )

    return storage_file

Database Schema:

  • chunks table: ccbt/storage/xet_deduplication.py:65 - Stores chunk hash, size, storage path, reference count, timestamps
  • Indexes: ccbt/storage/xet_deduplication.py:75 - On size and last_accessed for efficient queries

Key Methods:

  • check_chunk_exists(): ccbt/storage/xet_deduplication.py:85 - Check if chunk exists locally and update access time
  • store_chunk(): ccbt/storage/xet_deduplication.py:112 - Store chunk with deduplication (increments ref_count if exists)
  • get_chunk_path(): ccbt/storage/xet_deduplication.py:165 - Get local storage path for chunk
  • cleanup_unused_chunks(): ccbt/storage/xet_deduplication.py:201 - Remove chunks with ref_count <= 1 that have not been accessed within max_age_days

Features:

  • Reference counting: Tracks how many torrents/files reference each chunk
  • Automatic cleanup: Removes unused chunks based on access time
  • Physical storage: Chunks stored in xet_chunks/ directory with hash as filename
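
To make the reference-counting and physical-storage behaviour concrete, here is a self-contained sketch that stores the same chunk twice and ends up with one file and a ref_count of 2. The table definition is an assumption reconstructed from the columns named above (including `DEFAULT 1` for ref_count), `open_store` and `store` are hypothetical helpers, and SHA-256 stands in for the real chunk hash.

```python
import hashlib
import sqlite3
import tempfile
import time
from pathlib import Path


def open_store() -> sqlite3.Connection:
    """Create an in-memory index with an assumed version of the chunks table."""
    db = sqlite3.connect(":memory:")
    db.execute(
        """CREATE TABLE chunks (
               hash BLOB PRIMARY KEY,
               size INTEGER NOT NULL,
               storage_path TEXT NOT NULL,
               ref_count INTEGER NOT NULL DEFAULT 1,
               created_at REAL NOT NULL,
               last_accessed REAL NOT NULL
           )"""
    )
    return db


def store(db: sqlite3.Connection, root: Path, chunk_hash: bytes, data: bytes) -> Path:
    """Write the chunk once; later calls only bump its reference count."""
    row = db.execute(
        "SELECT storage_path FROM chunks WHERE hash = ?", (chunk_hash,)
    ).fetchone()
    if row:
        db.execute(
            "UPDATE chunks SET ref_count = ref_count + 1 WHERE hash = ?",
            (chunk_hash,),
        )
        db.commit()
        return Path(row[0])

    path = root / chunk_hash.hex()  # hash as filename
    path.write_bytes(data)
    now = time.time()
    db.execute(
        "INSERT INTO chunks (hash, size, storage_path, created_at, last_accessed) "
        "VALUES (?, ?, ?, ?, ?)",
        (chunk_hash, len(data), str(path), now, now),
    )
    db.commit()
    return path


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    db = open_store()
    data = b"bytes shared by two torrents"
    digest = hashlib.sha256(data).digest()  # stand-in for the chunk hash

    first = store(db, root, digest, data)
    second = store(db, root, digest, data)

    ref_count = db.execute(
        "SELECT ref_count FROM chunks WHERE hash = ?", (digest,)
    ).fetchone()[0]
    files_on_disk = len(list(root.iterdir()))
    db.close()
```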

4. Peer-to-Peer CAS (ccbt/discovery/xet_cas.py)

DHT and tracker-based chunk discovery and exchange for decentralized Content Addressable Storage.

Peer-to-peer Content Addressable Storage client.

Uses DHT and trackers for chunk discovery instead of HuggingFace CAS. This enables distributed chunk storage and retrieval without external dependencies.

Attributes:

Name          Type              Description
dht                             DHT client instance
tracker                         Optional tracker client instance
local_chunks  dict[bytes, str]  Dictionary mapping chunk hash to local storage path

Initialize P2P CAS with DHT and tracker clients.

Parameters:

Name            Type        Description                                                  Default
dht_client      Any | None  DHT client instance (will be obtained from session if None)  None
tracker_client  Any | None  Optional tracker client instance                             None
key_manager     Any         Optional Ed25519KeyManager for signing chunks                None
Source code in ccbt/discovery/xet_cas.py
def __init__(
    self,
    dht_client: Any | None = None,  # type: ignore[assignment]
    tracker_client: Any | None = None,  # type: ignore[assignment]
    key_manager: Any = None,  # Ed25519KeyManager
):
    """Initialize P2P CAS with DHT and tracker clients.

    Args:
        dht_client: DHT client instance (will be obtained from session if None)
        tracker_client: Optional tracker client instance
        key_manager: Optional Ed25519KeyManager for signing chunks

    """
    self.dht = dht_client
    self.tracker = tracker_client
    self.key_manager = key_manager
    self.local_chunks: dict[bytes, str] = {}  # hash -> local path
    self.logger = logging.getLogger(__name__)

announce_chunk(chunk_hash: bytes) -> None async

Announce chunk availability to DHT/trackers.

Stores chunk metadata in DHT (BEP 44) and announces to tracker if configured. Other peers can discover this chunk via hash lookup.

Parameters:

Name        Type   Description         Default
chunk_hash  bytes  32-byte chunk hash  required
Source code in ccbt/discovery/xet_cas.py
async def announce_chunk(self, chunk_hash: bytes) -> None:
    """Announce chunk availability to DHT/trackers.

    Stores chunk metadata in DHT (BEP 44) and announces to tracker
    if configured. Other peers can discover this chunk via hash lookup.

    Args:
        chunk_hash: 32-byte chunk hash

    """
    if len(chunk_hash) != 32:
        msg = f"Chunk hash must be 32 bytes, got {len(chunk_hash)}"
        raise ValueError(msg)

    # Store chunk hash in DHT (BEP 44: storing arbitrary data)
    if self.dht:
        try:
            # Store chunk metadata in DHT
            # Format: {"type": "xet_chunk", "available": True}
            # (Ed25519 key and signature fields are added below when signing)
            metadata = {
                "type": "xet_chunk",
                "available": True,
            }

            # Sign chunk metadata with Ed25519 if key_manager available
            if self.key_manager:
                try:
                    # Create metadata bytes for signing
                    import json

                    metadata_bytes = json.dumps(metadata, sort_keys=True).encode()
                    signature = self.key_manager.sign_message(metadata_bytes)
                    public_key = self.key_manager.get_public_key_bytes()

                    metadata["ed25519_public_key"] = public_key.hex()
                    metadata["ed25519_signature"] = signature.hex()
                    self.logger.debug(
                        "Signed chunk announcement with Ed25519: %s",
                        chunk_hash.hex()[:16],
                    )
                except Exception as e:
                    self.logger.warning("Failed to sign chunk announcement: %s", e)

            # Use DHT store method if available
            if hasattr(self.dht, "store"):
                await self.dht.store(chunk_hash, metadata)
            elif hasattr(
                self.dht, "store_chunk_hash"
            ):  # pragma: no cover - Alternative DHT storage method path
                await self.dht.store_chunk_hash(
                    chunk_hash, metadata
                )  # pragma: no cover - Same context
            else:
                self.logger.warning(
                    "DHT client does not support chunk storage",
                )  # pragma: no cover - DHT client without storage methods, defensive warning path

            self.logger.debug(
                "Announced chunk %s to DHT",
                chunk_hash.hex()[:16],
            )
        except Exception as e:  # pragma: no cover - DHT announcement exception handling, defensive error path
            self.logger.warning(
                "Failed to announce chunk to DHT: %s",
                e,
            )  # pragma: no cover - Same context

    # Also announce to tracker if configured
    if self.tracker:
        try:
            if hasattr(self.tracker, "announce_chunk"):
                await self.tracker.announce_chunk(chunk_hash)
            self.logger.debug(
                "Announced chunk %s to tracker",
                chunk_hash.hex()[:16],
            )
        except Exception as e:
            self.logger.warning(
                "Failed to announce chunk to tracker: %s",
                e,
            )
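
The signing step in announce_chunk() serializes the metadata with `sort_keys=True` before the signature fields are added, so a verifier would have to strip those fields and re-serialize the same way to recover the signed bytes. The sketch below illustrates that round trip only. Python's standard library has no Ed25519, so HMAC-SHA256 is swapped in as a stand-in signer; `canonical_bytes`, `sign`, `verify` and `SECRET` are all hypothetical and are not part of the project.

```python
import hashlib
import hmac
import json

# Fields added after signing; they must be excluded when re-deriving the bytes.
SIGNATURE_FIELDS = ("ed25519_public_key", "ed25519_signature")

SECRET = b"demo-key"  # stand-in secret for the HMAC substitute


def canonical_bytes(metadata: dict) -> bytes:
    """Deterministic serialization of the metadata without signature fields."""
    unsigned = {k: v for k, v in metadata.items() if k not in SIGNATURE_FIELDS}
    return json.dumps(unsigned, sort_keys=True).encode()


def sign(message: bytes) -> bytes:
    """Stand-in for an Ed25519 signature: HMAC-SHA256 over the message."""
    return hmac.new(SECRET, message, hashlib.sha256).digest()


def verify(message: bytes, signature: bytes) -> bool:
    return hmac.compare_digest(sign(message), signature)


metadata = {"type": "xet_chunk", "available": True}
signed_bytes = canonical_bytes(metadata)
# Stored under the listing's field name, although this demo value is an HMAC.
metadata["ed25519_signature"] = sign(signed_bytes).hex()

# Receiver side: strip signature fields, re-serialize, then check.
valid = verify(
    canonical_bytes(metadata), bytes.fromhex(metadata["ed25519_signature"])
)

tampered = dict(metadata, available=False)
tampered_valid = verify(
    canonical_bytes(tampered), bytes.fromhex(tampered["ed25519_signature"])
)
```

Note that in the listing the signature covers only the metadata dictionary; the chunk hash itself is not part of the signed bytes.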

download_chunk(chunk_hash: bytes, peer: PeerInfo, torrent_data: dict[str, Any] | None = None, connection_manager: Any | None = None) -> bytes async

Download chunk from peer using BitTorrent protocol extension.

Uses BEP 10 extension protocol with Xet extension for chunk requests.

Parameters:

Name                Type                   Description                                     Default
chunk_hash          bytes                  32-byte chunk hash                              required
peer                PeerInfo               Peer that has the chunk                         required
torrent_data        dict[str, Any] | None  Torrent data for connection (required)          None
connection_manager  Any | None             AsyncPeerConnectionManager instance (optional)  None

Returns:

Type   Description
bytes  Chunk data bytes

Raises:

Type                 Description
ValueError           If download fails
NotImplementedError  If extension protocol not available

Source code in ccbt/discovery/xet_cas.py
async def download_chunk(
    self,
    chunk_hash: bytes,
    peer: PeerInfo,
    torrent_data: dict[str, Any] | None = None,
    connection_manager: Any | None = None,  # type: ignore[assignment]
) -> bytes:
    """Download chunk from peer using BitTorrent protocol extension.

    Uses BEP 10 extension protocol with Xet extension for chunk requests.

    Args:
        chunk_hash: 32-byte chunk hash
        peer: Peer that has the chunk
        torrent_data: Torrent data for connection (required)
        connection_manager: AsyncPeerConnectionManager instance (optional)

    Returns:
        Chunk data bytes

    Raises:
        ValueError: If download fails
        NotImplementedError: If extension protocol not available

    """
    if len(chunk_hash) != 32:
        msg = f"Chunk hash must be 32 bytes, got {len(chunk_hash)}"
        raise ValueError(msg)

    if not torrent_data:
        msg = "torrent_data is required for chunk download"
        raise ValueError(msg)

    # Get extension manager and Xet extension
    from ccbt.extensions.manager import get_extension_manager

    extension_manager = get_extension_manager()
    extension_protocol = extension_manager.get_extension("protocol")
    xet_ext = extension_manager.get_extension("xet")

    if not extension_protocol:  # pragma: no cover - Extension protocol unavailable path, tested in integration tests
        msg = "Extension protocol not available"
        raise NotImplementedError(msg)  # pragma: no cover - Same context

    if not xet_ext:  # pragma: no cover - Xet extension not registered path, tested in integration tests
        msg = "Xet extension not registered"
        raise NotImplementedError(msg)  # pragma: no cover - Same context

    # Get or create connection to peer
    connection = None
    connection_created = False  # Track if we created the connection
    try:
        # Try to get existing connection from connection manager first
        if connection_manager:
            peer_key = str(peer)
            async with connection_manager.connection_lock:
                connection = connection_manager.connections.get(peer_key)

        # If no connection, establish one with handshake
        if not connection:  # pragma: no cover - New connection establishment path, tested in integration tests
            self.logger.debug(
                "No existing connection to peer %s, establishing new connection",
                peer,
            )  # pragma: no cover - Same context
            connection = await self._establish_peer_connection(
                peer, torrent_data
            )  # pragma: no cover - Same context
            connection_created = True  # pragma: no cover - Same context

        # Check if peer supports Xet extension
        # Note: For newly created connections, extension handshake may not have
        # completed yet. We'll try the request anyway and handle errors.
        peer_id = str(peer)
        peer_supports_xet = extension_protocol.peer_supports_extension(
            peer_id, "xet"
        )

        if not peer_supports_xet:
            # For new connections, extension handshake may not be complete
            # Try sending request anyway - if peer doesn't support it, we'll get an error
            if (
                connection_created
            ):  # pragma: no cover - New connection extension handshake timing path
                self.logger.debug(
                    "Extension handshake may not be complete for new connection to %s, "
                    "attempting chunk request anyway",
                    peer,
                )  # pragma: no cover - Same context
            else:
                msg = f"Peer {peer} does not support Xet extension"
                raise ValueError(msg)

        # Get Xet extension message ID
        xet_ext_info = extension_protocol.get_extension_info("xet")
        if (
            not xet_ext_info
        ):  # pragma: no cover - Extension info validation, defensive check
            msg = "Xet extension not registered in protocol"
            raise ValueError(msg)  # pragma: no cover - Same context

        xet_message_id = xet_ext_info.message_id

        # Encode chunk request
        request_payload = xet_ext.encode_chunk_request(chunk_hash)

        # Send extension message
        if (
            not connection or not connection.writer
        ):  # pragma: no cover - Connection state validation, defensive check
            msg = f"Connection to peer {peer} not available"
            raise ValueError(msg)  # pragma: no cover - Same context

        # Encode as BitTorrent extension message (message ID 20)
        # Note: encode_extension_message is called but result not used directly
        # as we send the message through the connection
        extension_protocol.encode_extension_message(xet_message_id, request_payload)

        # Send message: <length><message_id_20><extension_id><payload>
        # ExtensionProtocol.encode_extension_message already includes length + message_id
        # But we need to send it as BitTorrent message type 20
        from ccbt.protocols.bittorrent_v2 import _send_extension_message

        sent = await _send_extension_message(
            connection, xet_message_id, request_payload
        )

        if (
            not sent
        ):  # pragma: no cover - Message send failure path, defensive error handling
            msg = f"Failed to send chunk request to peer {peer}"
            raise ValueError(msg)  # pragma: no cover - Same context

        self.logger.debug(
            "Sent chunk request for %s to peer %s",
            chunk_hash.hex()[:16],
            peer,
        )

        # Receive response
        from ccbt.protocols.bittorrent_v2 import _receive_extension_message

        response = await _receive_extension_message(connection, timeout=30.0)

        if not response:  # pragma: no cover - No response timeout path, tested in integration tests
            msg = f"No response from peer {peer} for chunk request"
            raise ValueError(msg)  # pragma: no cover - Same context

        extension_id, response_payload = response

        # Verify it's from Xet extension
        if (
            extension_id != xet_message_id
        ):  # pragma: no cover - Protocol validation error path, defensive check
            msg = (
                f"Unexpected extension ID in response: "
                f"expected {xet_message_id}, got {extension_id}"
            )
            raise ValueError(msg)  # pragma: no cover - Same context

        # Decode response
        if (
            len(response_payload) < 1
        ):  # pragma: no cover - Invalid payload validation, defensive check
            msg = "Invalid response payload"
            raise ValueError(msg)  # pragma: no cover - Same context

        message_type = response_payload[0]

        if message_type == 0x02:  # CHUNK_RESPONSE
            _request_id, chunk_data = xet_ext.decode_chunk_response(
                response_payload
            )

            # Verify chunk hash
            from ccbt.storage.xet_hashing import XetHasher

            hasher = XetHasher()
            computed_hash = hasher.compute_chunk_hash(chunk_data)

            if computed_hash != chunk_hash:
                msg = (
                    f"Chunk hash mismatch: expected {chunk_hash.hex()[:16]}, "
                    f"got {computed_hash.hex()[:16]}"
                )
                raise ValueError(msg)

            self.logger.debug(
                "Successfully downloaded chunk %s (%d bytes) from peer %s",
                chunk_hash.hex()[:16],
                len(chunk_data),
                peer,
            )

            return chunk_data

        if message_type == 0x03:  # CHUNK_NOT_FOUND
            msg = f"Chunk {chunk_hash.hex()[:16]} not found on peer {peer}"
            raise ValueError(msg)

        if message_type == 0x04:  # CHUNK_ERROR
            msg = f"Error retrieving chunk {chunk_hash.hex()[:16]} from peer {peer}"
            raise ValueError(msg)

        # pragma: no cover - Unknown message type error path, defensive protocol validation
        msg = f"Unknown response message type: {message_type:02x}"  # pragma: no cover - Same context
        raise ValueError(msg)  # pragma: no cover - Same context

    except Exception as e:
        self.logger.exception(
            "Failed to download chunk %s from peer %s",
            chunk_hash.hex()[:16],
            peer,
        )
        error_msg = f"Failed to download chunk: {e}"
        raise ValueError(error_msg) from e

    finally:
        # Clean up connection if we created it
        if (
            connection_created and connection
        ):  # pragma: no cover - Connection cleanup in finally block, defensive cleanup
            try:  # pragma: no cover - Same context
                if connection.writer:  # pragma: no cover - Same context
                    connection.writer.close()  # pragma: no cover - Same context
                    await (
                        connection.writer.wait_closed()
                    )  # pragma: no cover - Same context
                self.logger.debug(
                    "Closed temporary connection to peer %s",
                    peer,
                )  # pragma: no cover - Same context
            except Exception as cleanup_error:  # pragma: no cover - Connection cleanup exception handling, defensive error path
                self.logger.debug(
                    "Error closing connection to peer %s: %s",
                    peer,
                    cleanup_error,
                )  # pragma: no cover - Same context
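
The wire layout mentioned in the comments above, `<length><message_id_20><extension_id><payload>`, follows BEP 10: a 4-byte big-endian length prefix, the extended message ID 20, a one-byte negotiated extension ID, then the payload. The sketch below frames and parses such a message and classifies the first payload byte using the response codes that appear in the listing. `frame_extended` and `parse_extended` are hypothetical helpers, and the extension ID 3 is an arbitrary example, since real IDs are negotiated in the extension handshake.

```python
import struct

EXTENDED_MESSAGE_ID = 20  # BEP 10 extended message type

# Response codes as they appear in the download_chunk() listing.
RESPONSE_TYPES = {
    0x02: "CHUNK_RESPONSE",
    0x03: "CHUNK_NOT_FOUND",
    0x04: "CHUNK_ERROR",
}


def frame_extended(ext_id: int, payload: bytes) -> bytes:
    """Build <4-byte length><20><ext_id><payload>."""
    body = bytes([EXTENDED_MESSAGE_ID, ext_id]) + payload
    return struct.pack(">I", len(body)) + body


def parse_extended(frame: bytes) -> tuple[int, bytes]:
    """Return (ext_id, payload) from one complete extended message."""
    if len(frame) < 4:
        raise ValueError("frame shorter than length prefix")
    (length,) = struct.unpack(">I", frame[:4])
    body = frame[4 : 4 + length]
    if len(body) != length or length < 2 or body[0] != EXTENDED_MESSAGE_ID:
        raise ValueError("not a complete extended message")
    return body[1], body[2:]


# A peer answering "chunk not found" on hypothetical extension ID 3.
frame = frame_extended(3, b"\x03")
ext_id, payload = parse_extended(frame)
kind = RESPONSE_TYPES.get(payload[0], "UNKNOWN")
```

The layout of the payload beyond its first byte is left to the Xet extension's own encode and decode methods and is not modelled here.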

find_chunk_peers(chunk_hash: bytes) -> list[PeerInfo] async

Find peers that have a specific chunk.

Queries DHT and tracker (if configured) to find peers that can provide the requested chunk.

Parameters:

Name        Type   Description         Default
chunk_hash  bytes  32-byte chunk hash  required

Returns:

Type            Description
list[PeerInfo]  List of peers that can provide this chunk

Source code in ccbt/discovery/xet_cas.py
async def find_chunk_peers(self, chunk_hash: bytes) -> list[PeerInfo]:
    """Find peers that have a specific chunk.

    Queries DHT and tracker (if configured) to find peers that can
    provide the requested chunk.

    Args:
        chunk_hash: 32-byte chunk hash

    Returns:
        List of peers that can provide this chunk

    """
    if len(chunk_hash) != 32:
        msg = f"Chunk hash must be 32 bytes, got {len(chunk_hash)}"
        raise ValueError(msg)

    peers = []

    # Query DHT for chunk
    if self.dht:
        try:
            dht_results = []

            # Try different DHT methods
            if hasattr(self.dht, "get_chunk_peers"):
                dht_results = await self.dht.get_chunk_peers(chunk_hash)
            elif hasattr(
                self.dht, "get_peers"
            ):  # pragma: no cover - Standard DHT get_peers method path (less common than get_chunk_peers)
                # Standard DHT get_peers method
                dht_results = await self.dht.get_peers(
                    chunk_hash
                )  # pragma: no cover - Same context
            elif hasattr(
                self.dht, "find_value"
            ):  # pragma: no cover - Alternative DHT method path, less common
                # BEP 44 find_value method
                value = await self.dht.find_value(
                    chunk_hash
                )  # pragma: no cover - Same context
                if value:  # pragma: no cover - Same context
                    # Extract peer info from value
                    peer_info = self._extract_peer_from_dht_value(
                        value
                    )  # pragma: no cover - Same context
                    if peer_info:  # pragma: no cover - Same context
                        dht_results = [peer_info]  # pragma: no cover - Same context

            # Extract peer info from DHT results
            for result in dht_results:
                if isinstance(result, PeerInfo):
                    peers.append(result)
                else:  # pragma: no cover - DHT result format conversion path
                    peer_info = self._extract_peer_from_dht(
                        result
                    )  # pragma: no cover - Same context
                    if peer_info:  # pragma: no cover - Same context
                        peers.append(peer_info)  # pragma: no cover - Same context

            self.logger.debug(
                "Found %d peers for chunk %s via DHT",
                len(peers),
                chunk_hash.hex()[:16],
            )
        except Exception as e:
            self.logger.warning(
                "Failed to query DHT for chunk: %s",
                e,
            )

    # Query tracker if available
    if self.tracker:
        try:
            tracker_peers = []
            if hasattr(self.tracker, "get_chunk_peers"):
                tracker_peers = await self.tracker.get_chunk_peers(chunk_hash)
                peers.extend(tracker_peers)

            # Log only the tracker's contribution, not the running total
            self.logger.debug(
                "Found %d peers for chunk %s via tracker",
                len(tracker_peers),
                chunk_hash.hex()[:16],
            )
        except Exception as e:
            self.logger.warning(
                "Failed to query tracker for chunk: %s",
                e,
            )

    # Remove duplicates
    return self._deduplicate_peers(peers)
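
The body of `_deduplicate_peers()` is not shown in this section. One plausible shape, offered only as a sketch, is an order-preserving filter keyed on (ip, port), so that a peer reported by both the DHT and the tracker appears once. The `PeerInfo` dataclass below is a hypothetical stand-in with just the two fields used in the listings.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PeerInfo:
    """Hypothetical stand-in for the project's PeerInfo."""
    ip: str
    port: int


def deduplicate_peers(peers: list[PeerInfo]) -> list[PeerInfo]:
    """Keep the first occurrence of each (ip, port), preserving order."""
    seen: set[tuple[str, int]] = set()
    unique: list[PeerInfo] = []
    for peer in peers:
        key = (peer.ip, peer.port)
        if key not in seen:
            seen.add(key)
            unique.append(peer)
    return unique


dht_peers = [PeerInfo("10.0.0.1", 6881), PeerInfo("10.0.0.2", 6881)]
tracker_peers = [PeerInfo("10.0.0.2", 6881), PeerInfo("10.0.0.3", 51413)]

unique = deduplicate_peers(dht_peers + tracker_peers)
```

Preserving order keeps DHT results ahead of tracker results, matching the order in which find_chunk_peers() collects them.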

get_local_chunk_path(chunk_hash: bytes) -> str | None

Get local path for a chunk if available.

Parameters:

Name        Type   Description         Default
chunk_hash  bytes  32-byte chunk hash  required

Returns:

Type        Description
str | None  Local path if available, None otherwise

Source code in ccbt/discovery/xet_cas.py
def get_local_chunk_path(self, chunk_hash: bytes) -> str | None:
    """Get local path for a chunk if available.

    Args:
        chunk_hash: 32-byte chunk hash

    Returns:
        Local path if available, None otherwise

    """
    return self.local_chunks.get(chunk_hash)

register_local_chunk(chunk_hash: bytes, local_path: str) -> None

Register a locally stored chunk.

Parameters:

Name        Type   Description               Default
chunk_hash  bytes  32-byte chunk hash        required
local_path  str    Path to local chunk file  required
Source code in ccbt/discovery/xet_cas.py
def register_local_chunk(self, chunk_hash: bytes, local_path: str) -> None:
    """Register a locally stored chunk.

    Args:
        chunk_hash: 32-byte chunk hash
        local_path: Path to local chunk file

    """
    if len(chunk_hash) != 32:
        msg = f"Chunk hash must be 32 bytes, got {len(chunk_hash)}"
        raise ValueError(msg)

    self.local_chunks[chunk_hash] = local_path
    self.logger.debug(
        "Registered local chunk %s at %s",
        chunk_hash.hex()[:16],
        local_path,
    )

Key Methods:

  • announce_chunk(): ccbt/discovery/xet_cas.py:50 - Announce chunk availability to DHT (BEP 44) and trackers
  • find_chunk_peers(): ccbt/discovery/xet_cas.py:112 - Find peers that have a specific chunk via DHT and tracker queries
  • request_chunk_from_peer(): ccbt/discovery/xet_cas.py:200 - Request chunk from a specific peer using Xet extension protocol

DHT Integration:

  • Uses BEP 44 (Storing arbitrary data in the DHT) to store chunk metadata
  • Chunk metadata format: ccbt/discovery/xet_cas.py:68 - {"type": "xet_chunk", "available": True}
  • Supports multiple DHT methods: store(), store_chunk_hash(), get_chunk_peers(), get_peers(), find_value()

Tracker Integration:

  • Announces chunks to trackers using first 20 bytes of chunk hash as info_hash
  • Enables tracker-based peer discovery for chunks

Storage Formats

Xorb Format

Xorbs group multiple chunks for efficient storage and retrieval.

Xorb (XOR of blocks) format handler.

Groups multiple chunks into a single xorb for efficient storage. Each xorb can contain multiple chunks up to MAX_XORB_SIZE.

Attributes:

Name        Type                       Description
chunks      list[tuple[bytes, bytes]]  List of (hash, data) tuples
total_size                             Total size of all chunks in bytes

Initialize empty xorb.

Source code in ccbt/storage/xet_xorb.py
def __init__(self):
    """Initialize empty xorb."""
    self.chunks: list[tuple[bytes, bytes]] = []  # (hash, data) pairs
    self.total_size = 0

add_chunk(chunk_hash: bytes, chunk_data: bytes) -> bool

Add chunk to xorb.

Parameters:

Name Type Description Default
chunk_hash bytes

32-byte chunk hash

required
chunk_data bytes

Chunk data bytes

required

Returns:

Type Description
bool

True if added, False if xorb would exceed MAX_XORB_SIZE

Source code in ccbt/storage/xet_xorb.py
def add_chunk(self, chunk_hash: bytes, chunk_data: bytes) -> bool:
    """Add chunk to xorb.

    Args:
        chunk_hash: 32-byte chunk hash
        chunk_data: Chunk data bytes

    Returns:
        True if added, False if xorb would exceed MAX_XORB_SIZE

    """
    chunk_size = len(chunk_data)
    if self.total_size + chunk_size > MAX_XORB_SIZE:
        return False

    if len(chunk_hash) != 32:
        msg = f"Chunk hash must be 32 bytes, got {len(chunk_hash)}"
        raise ValueError(msg)

    self.chunks.append((chunk_hash, chunk_data))
    self.total_size += chunk_size
    return True
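
Because add_chunk() returns False instead of raising when the size limit would be exceeded, callers are expected to start a new xorb and retry. The sketch below shows that pattern with a stand-in class, MiniXorb, that mirrors the add_chunk contract shown above. MiniXorb, pack_chunks, and the deliberately small 64 KiB limit are assumptions made for the demonstration.

```python
import hashlib

MAX_XORB_SIZE = 64 * 1024  # Tiny demo limit; the document states 64 MiB for the real constant.


class MiniXorb:
    """Stand-in with the same add_chunk contract as the Xorb class above."""

    def __init__(self) -> None:
        self.chunks: list[tuple[bytes, bytes]] = []
        self.total_size = 0

    def add_chunk(self, chunk_hash: bytes, chunk_data: bytes) -> bool:
        if len(chunk_hash) != 32:
            raise ValueError(f"Chunk hash must be 32 bytes, got {len(chunk_hash)}")
        if self.total_size + len(chunk_data) > MAX_XORB_SIZE:
            return False
        self.chunks.append((chunk_hash, chunk_data))
        self.total_size += len(chunk_data)
        return True


def pack_chunks(chunks: list[tuple[bytes, bytes]]) -> list[MiniXorb]:
    """Fill xorbs in order, opening a new one whenever add_chunk returns False."""
    xorbs = [MiniXorb()]
    for chunk_hash, chunk_data in chunks:
        if not xorbs[-1].add_chunk(chunk_hash, chunk_data):
            xorbs.append(MiniXorb())
            if not xorbs[-1].add_chunk(chunk_hash, chunk_data):
                raise ValueError("Chunk is larger than MAX_XORB_SIZE")
    return xorbs


# Five 30 KiB chunks: two fit under the 64 KiB limit, a third does not.
data = [bytes([i]) * (30 * 1024) for i in range(5)]
xorbs = pack_chunks([(hashlib.sha256(d).digest(), d) for d in data])
print([len(x.chunks) for x in xorbs])
```

The second add_chunk call in pack_chunks guards against a single chunk that can never fit, which would otherwise be dropped silently.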

clear() -> None

Clear all chunks from xorb.

Source code in ccbt/storage/xet_xorb.py
def clear(self) -> None:
    """Clear all chunks from xorb."""
    self.chunks.clear()
    self.total_size = 0

deserialize(data: bytes) -> Xorb staticmethod

Deserialize xorb from binary format.

Parameters:

Name Type Description Default
data bytes

Serialized xorb data

required

Returns:

Type Description
Xorb

Xorb instance

Raises:

Type Description
ValueError

If data is invalid or format is incorrect

Source code in ccbt/storage/xet_xorb.py
@staticmethod
def deserialize(data: bytes) -> Xorb:
    """Deserialize xorb from binary format.

    Args:
        data: Serialized xorb data

    Returns:
        Xorb instance

    Raises:
        ValueError: If data is invalid or format is incorrect

    """
    if len(data) < XORB_HEADER_SIZE:
        msg = f"Xorb data too short: {len(data)} bytes (minimum {XORB_HEADER_SIZE})"
        raise ValueError(msg)

    # Parse header
    header_data = data[:XORB_HEADER_SIZE]
    magic_bytes = header_data[:4]
    version = header_data[4]
    flags = header_data[5]
    _reserved = header_data[6:16]

    if magic_bytes != XORB_MAGIC:
        msg = f"Invalid xorb magic: expected {XORB_MAGIC.hex()}, got {magic_bytes.hex()}"
        raise ValueError(msg)

    if version != XORB_VERSION:
        msg = f"Unsupported xorb version: {version} (expected {XORB_VERSION})"
        raise ValueError(msg)

    compress = (flags & FLAG_COMPRESSED) != 0
    if compress and not HAS_LZ4:
        msg = "Xorb is compressed but LZ4 is not available"
        raise ValueError(msg)

    # Parse chunk count
    offset = XORB_HEADER_SIZE
    if (
        len(data) < offset + 4
    ):  # pragma: no cover - Data validation error path, tested in test_deserialize_short_data
        msg = "Xorb data too short for chunk count"
        raise ValueError(msg)  # pragma: no cover - Same context

    chunk_count = struct.unpack("I", data[offset : offset + 4])[0]
    offset += 4

    # Validate chunk count (reasonable limit)
    if chunk_count > 1000000:  # Sanity check
        msg = f"Invalid chunk count: {chunk_count} (too large)"
        raise ValueError(
            msg
        )  # pragma: no cover - Defensive check for corrupted data, unlikely in normal operation

    # Parse chunks
    xorb = Xorb()
    for i in range(chunk_count):
        # Parse hash (32 bytes)
        if (
            len(data) < offset + 32
        ):  # pragma: no cover - Data validation error path, defensive check for corrupted data
            msg = f"Xorb data too short for chunk {i} hash"
            raise ValueError(msg)  # pragma: no cover - Same context

        chunk_hash = data[offset : offset + 32]
        offset += 32

        # Parse uncompressed size (4 bytes)
        if (
            len(data) < offset + 4
        ):  # pragma: no cover - Data validation error path, defensive check for corrupted data
            msg = f"Xorb data too short for chunk {i} uncompressed size"
            raise ValueError(msg)  # pragma: no cover - Same context

        uncompressed_size = struct.unpack("I", data[offset : offset + 4])[0]
        offset += 4

        # Parse compressed size (4 bytes)
        if (
            len(data) < offset + 4
        ):  # pragma: no cover - Data validation error path, defensive check for corrupted data
            msg = f"Xorb data too short for chunk {i} compressed size"
            raise ValueError(msg)  # pragma: no cover - Same context

        compressed_size = struct.unpack("I", data[offset : offset + 4])[0]
        offset += 4

        # Determine actual data size
        if compressed_size > 0:
            # Data is compressed
            if (
                not HAS_LZ4 or lz4 is None
            ):  # pragma: no cover - LZ4 unavailable check for compressed chunks, tested via monkeypatch
                msg = f"Chunk {i} is compressed but LZ4 is not available"
                raise ValueError(msg)  # pragma: no cover - Same context

            data_size = compressed_size
            if (
                len(data) < offset + data_size
            ):  # pragma: no cover - Data validation error path, defensive check for corrupted data
                msg = f"Xorb data too short for chunk {i} compressed data"
                raise ValueError(msg)  # pragma: no cover - Same context

            compressed_data = data[offset : offset + data_size]
            offset += data_size

            # Decompress
            try:
                chunk_data = lz4.frame.decompress(compressed_data)
                if (
                    len(chunk_data) != uncompressed_size
                ):  # pragma: no cover - Decompression size validation, defensive check for corrupted data
                    msg = (
                        f"Chunk {i} decompressed size mismatch: "
                        f"expected {uncompressed_size}, got {len(chunk_data)}"
                    )
                    raise ValueError(msg)  # pragma: no cover - Same context
            except Exception as e:
                msg = f"Failed to decompress chunk {i}: {e}"
                raise ValueError(msg) from e
        else:
            # Data is uncompressed
            data_size = uncompressed_size
            if (
                len(data) < offset + data_size
            ):  # pragma: no cover - Data validation error path, defensive check for corrupted data
                msg = f"Xorb data too short for chunk {i} data"
                raise ValueError(msg)  # pragma: no cover - Same context

            chunk_data = data[offset : offset + data_size]
            offset += data_size

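        # add_chunk returns False when MAX_XORB_SIZE would be exceeded;
        # fail loudly rather than silently dropping the chunk.
        if not xorb.add_chunk(chunk_hash, chunk_data):
            msg = f"Xorb exceeds MAX_XORB_SIZE at chunk {i}"
            raise ValueError(msg)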

    # Parse metadata (total size)
    if (
        len(data) < offset + 8
    ):  # pragma: no cover - Data validation error path, defensive check for corrupted data
        msg = "Xorb data too short for metadata"
        raise ValueError(msg)  # pragma: no cover - Same context

    total_size = struct.unpack("Q", data[offset : offset + 8])[0]
    if total_size != xorb.total_size:
        logger.warning(
            "Xorb metadata total_size mismatch: expected %d, got %d",
            xorb.total_size,
            total_size,
        )

    return xorb

get_chunk_by_hash(chunk_hash: bytes) -> bytes | None

Get chunk data by hash.

Parameters:

Name Type Description Default
chunk_hash bytes

32-byte chunk hash

required

Returns:

Type Description
bytes | None

Chunk data if found, None otherwise

Source code in ccbt/storage/xet_xorb.py
def get_chunk_by_hash(self, chunk_hash: bytes) -> bytes | None:
    """Get chunk data by hash.

    Args:
        chunk_hash: 32-byte chunk hash

    Returns:
        Chunk data if found, None otherwise

    """
    for hash_bytes, chunk_data in self.chunks:
        if hash_bytes == chunk_hash:
            return chunk_data
    return None

get_chunk_count() -> int

Get number of chunks in xorb.

Returns:

Type Description
int

Number of chunks

Source code in ccbt/storage/xet_xorb.py
def get_chunk_count(self) -> int:
    """Get number of chunks in xorb.

    Returns:
        Number of chunks

    """
    return len(self.chunks)

get_compressed_size(compress: bool = True) -> int

Get the size of the serialized xorb, with or without compression.

Parameters:

Name Type Description Default
compress bool

Whether to calculate compressed size

True

Returns:

Type Description
int

Size in bytes

Source code in ccbt/storage/xet_xorb.py
def get_compressed_size(self, compress: bool = True) -> int:
    """Get size of xorb when serialized, with or without compression.

    Args:
        compress: Whether to calculate compressed size

    Returns:
        Size in bytes

    """
    return len(self.serialize(compress=compress))

get_compression_ratio() -> float

Get compression ratio if compression is enabled.

Returns:

Type Description
float

Compression ratio (compressed_size / uncompressed_size). Returns 1.0 if compression is not available or not beneficial

Source code in ccbt/storage/xet_xorb.py
def get_compression_ratio(self) -> float:
    """Get compression ratio if compression is enabled.

    Returns:
        Compression ratio (compressed_size / uncompressed_size)
        Returns 1.0 if compression is not available or not beneficial

    """
    if not HAS_LZ4:
        return 1.0  # pragma: no cover - LZ4 unavailable path tested via monkeypatch in tests

    uncompressed_size = len(self.serialize(compress=False))
    compressed_size = self.get_compressed_size(compress=True)

    if uncompressed_size == 0:
        return 1.0  # pragma: no cover - Empty xorb edge case, tested in test_get_compression_ratio_empty

    return compressed_size / uncompressed_size
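
The ratio is compressed size divided by uncompressed size, so values below 1.0 mean compression saved space. The sketch below illustrates the calculation only. It swaps in zlib from the standard library for LZ4 so that it runs without extra dependencies, and the compression_ratio helper is hypothetical.

```python
import zlib


def compression_ratio(data: bytes) -> float:
    """Return compressed_size / uncompressed_size, or 1.0 for empty input."""
    if not data:
        return 1.0
    # zlib stands in for LZ4 here; the ratio is computed the same way.
    return len(zlib.compress(data)) / len(data)


repetitive = b"xet" * 10_000
print(f"{compression_ratio(repetitive):.4f}")
```

Highly repetitive input yields a ratio close to zero, while already-compressed or random input tends to land at or slightly above 1.0 because of framing overhead.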

get_total_size() -> int

Get total size of all chunks.

Returns:

Type Description
int

Total size in bytes

Source code in ccbt/storage/xet_xorb.py
def get_total_size(self) -> int:
    """Get total size of all chunks.

    Returns:
        Total size in bytes

    """
    return self.total_size

get_xorb_hash() -> bytes

Compute xorb hash for deduplication.

Returns the hash of the serialized xorb data, which can be used to identify identical xorbs for deduplication.

Returns:

Type Description
bytes

32-byte hash (BLAKE3-256 or SHA-256)

Source code in ccbt/storage/xet_xorb.py
def get_xorb_hash(self) -> bytes:
    """Compute xorb hash for deduplication.

    Returns the hash of the serialized xorb data, which can be used
    to identify identical xorbs for deduplication.

    Returns:
        32-byte hash (BLAKE3-256 or SHA-256)

    """
    from ccbt.storage.xet_hashing import XetHasher

    serialized = self.serialize(compress=False)  # Use uncompressed for hash
    return XetHasher.compute_chunk_hash(serialized)
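
Hashing the uncompressed serialization means the identifier does not depend on whether a xorb is stored compressed. A content-addressed store built on such a hash could look like the following minimal sketch. DedupStore is a hypothetical class, and SHA-256 stands in for XetHasher.compute_chunk_hash.

```python
import hashlib


class DedupStore:
    """Content-addressed store: identical blobs are kept only once (hypothetical)."""

    def __init__(self) -> None:
        self.blobs: dict[bytes, bytes] = {}

    def put(self, blob: bytes) -> tuple[bytes, bool]:
        """Store a blob and return (hash, is_new)."""
        key = hashlib.sha256(blob).digest()  # Stand-in for the xorb hash
        is_new = key not in self.blobs
        if is_new:
            self.blobs[key] = blob
        return key, is_new


store = DedupStore()
first_key, first_new = store.put(b"serialized xorb bytes")
second_key, second_new = store.put(b"serialized xorb bytes")
print(first_new, second_new, len(store.blobs))
```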

is_full() -> bool

Check if xorb is full (total size has reached MAX_XORB_SIZE).

Returns:

Type Description
bool

True if full, False otherwise

Source code in ccbt/storage/xet_xorb.py
def is_full(self) -> bool:
    """Check if xorb is full (total size has reached MAX_XORB_SIZE).

    Returns:
        True if full, False otherwise

    """
    return self.total_size >= MAX_XORB_SIZE

serialize(compress: bool = False) -> bytes

Serialize xorb to binary format.

Format:

[Header: 16 bytes]
  • Magic: 4 bytes ("XORB")
  • Version: 1 byte
  • Flags: 1 byte (compression flag, reserved bits)
  • Reserved: 10 bytes

[Chunk count: 4 bytes (uint32)]

[Chunk entries: variable]
  • For each chunk:
      • Hash: 32 bytes
      • Size: 4 bytes (uint32, uncompressed size)
      • Compressed size: 4 bytes (uint32, 0 if not compressed)
      • Data: variable (compressed if flags indicate)

[Metadata: variable]
  • Total size: 8 bytes (uint64, uncompressed)

Parameters:

Name Type Description Default
compress bool

Whether to compress chunk data with LZ4

False

Returns:

Type Description
bytes

Serialized xorb data

Source code in ccbt/storage/xet_xorb.py
def serialize(self, compress: bool = False) -> bytes:
    """Serialize xorb to binary format.

    Format:
    [Header: 16 bytes]
    - Magic: 4 bytes ("XORB")
    - Version: 1 byte
    - Flags: 1 byte (compression flag, reserved bits)
    - Reserved: 10 bytes

    [Chunk count: 4 bytes (uint32)]

    [Chunk entries: variable]
    - For each chunk:
      - Hash: 32 bytes
      - Size: 4 bytes (uint32, uncompressed size)
      - Compressed size: 4 bytes (uint32, 0 if not compressed)
      - Data: variable (compressed if flags indicate)

    [Metadata: variable]
    - Total size: 8 bytes (uint64, uncompressed)

    Args:
        compress: Whether to compress chunk data with LZ4

    Returns:
        Serialized xorb data

    """
    # Build header
    header = self._serialize_header(compress)

    # Build chunk data (with optional compression)
    chunk_data = self._serialize_chunks(compress)

    # Build metadata
    metadata = struct.pack("Q", self.total_size)  # uint64 (uncompressed size)

    return header + chunk_data + metadata

Format Specification:

  • Header: ccbt/storage/xet_xorb.py:123 - 16 bytes (magic 0x24687531, version, flags, reserved)
  • Chunk count: ccbt/storage/xet_xorb.py:149 - 4 bytes (uint32, little-endian)
  • Chunk entries: ccbt/storage/xet_xorb.py:140 - Variable (hash, sizes, data for each chunk)
  • Metadata: ccbt/storage/xet_xorb.py:119 - 8 bytes (total uncompressed size as uint64)
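
To make the layout concrete, here is a minimal round-trip sketch for the uncompressed case. It is not the ccbt serializer. It assumes explicit little-endian integers throughout, the b"XORB" magic given in the serialize() docstring (the constants list a different integer magic, so the real value may differ), and version 1. The pack_xorb and unpack_xorb helpers are hypothetical.

```python
import hashlib
import struct

MAGIC = b"XORB"  # Assumed from the serialize() docstring; the real constant may differ.
VERSION = 1      # Assumed version number.
HEADER = struct.Struct("<4sBB10s")  # magic, version, flags, reserved = 16 bytes


def pack_xorb(chunks: list[tuple[bytes, bytes]]) -> bytes:
    """Serialize (hash, data) pairs without compression."""
    parts = [HEADER.pack(MAGIC, VERSION, 0, b"\x00" * 10), struct.pack("<I", len(chunks))]
    total = 0
    for chunk_hash, data in chunks:
        # Entry: hash, uncompressed size, compressed size (0 = stored raw), data
        parts.append(chunk_hash + struct.pack("<II", len(data), 0) + data)
        total += len(data)
    parts.append(struct.pack("<Q", total))  # Metadata: total uncompressed size
    return b"".join(parts)


def unpack_xorb(blob: bytes) -> list[tuple[bytes, bytes]]:
    """Parse the layout written by pack_xorb."""
    magic, version, _flags, _reserved = HEADER.unpack_from(blob, 0)
    if magic != MAGIC or version != VERSION:
        raise ValueError("Not a xorb in the assumed layout")
    (count,) = struct.unpack_from("<I", blob, HEADER.size)
    offset = HEADER.size + 4
    chunks = []
    for _ in range(count):
        chunk_hash = blob[offset : offset + 32]
        size, compressed_size = struct.unpack_from("<II", blob, offset + 32)
        if compressed_size != 0:
            raise ValueError("Compressed chunks are not handled in this sketch")
        offset += 40
        chunks.append((chunk_hash, blob[offset : offset + size]))
        offset += size
    (total,) = struct.unpack_from("<Q", blob, offset)
    if total != sum(len(data) for _, data in chunks):
        raise ValueError("Total size mismatch")
    return chunks


original = [(hashlib.sha256(d).digest(), d) for d in (b"alpha", b"beta" * 100)]
blob = pack_xorb(original)
print(len(blob), unpack_xorb(blob) == original)
```

Each chunk entry carries 40 bytes of overhead (32-byte hash plus two uint32 sizes), on top of 28 fixed bytes for the header, chunk count, and metadata.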

Constants:

  • MAX_XORB_SIZE: ccbt/storage/xet_xorb.py:35 - 64 MiB maximum xorb size
  • XORB_MAGIC_INT: ccbt/storage/xet_xorb.py:36 - 0x24687531 magic number
  • FLAG_COMPRESSED: ccbt/storage/xet_xorb.py:42 - LZ4 compression flag

Key Methods:

  • add_chunk(): ccbt/storage/xet_xorb.py:62 - Add chunk to xorb (returns False if the chunk would push the xorb past MAX_XORB_SIZE)
  • serialize(): ccbt/storage/xet_xorb.py:84 - Serialize xorb to binary format with optional LZ4 compression
  • deserialize(): ccbt/storage/xet_xorb.py:200 - Deserialize xorb from binary format with automatic decompression

Compression:

  • Optional LZ4 compression: ccbt/storage/xet_xorb.py:132 - Compresses chunk data if compress=True and LZ4 is available
  • Automatic detection: ccbt/storage/xet_xorb.py:22 - Falls back gracefully if LZ4 is not installed
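
The optional-dependency pattern described above can be sketched as follows. This is an illustration under stated assumptions rather than the module's actual code: maybe_compress and restore are hypothetical helpers, and the convention that a compressed size of 0 means "stored raw" is taken from the format description.

```python
try:
    import lz4.frame  # Optional dependency (the "lz4" package)

    HAS_LZ4 = True
except ImportError:
    HAS_LZ4 = False


def maybe_compress(data: bytes, compress: bool = True) -> tuple[bytes, int]:
    """Return (stored_bytes, compressed_size); compressed_size is 0 when stored raw."""
    if compress and HAS_LZ4:
        packed = lz4.frame.compress(data)
        return packed, len(packed)
    return data, 0


def restore(stored: bytes, compressed_size: int) -> bytes:
    """Invert maybe_compress, failing clearly if LZ4 is needed but missing."""
    if compressed_size > 0:
        if not HAS_LZ4:
            raise ValueError("Data is compressed but LZ4 is not available")
        return lz4.frame.decompress(stored)
    return stored


payload = b"abc" * 1000
stored, compressed_size = maybe_compress(payload)
print(HAS_LZ4, compressed_size, restore(stored, compressed_size) == payload)
```

The round trip succeeds whether or not LZ4 is installed; only the stored size differs.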

Shard Format

Shards store file metadata and CAS information for efficient file system operations.

Shard format handler for metadata storage.

Shards group file metadata and CAS information for efficient retrieval. Each shard contains:

  • File information (paths, sizes, hashes)
  • Xorb references
  • Chunk hashes
  • HMAC for integrity verification

Attributes:

Name Type Description
files list[dict]

List of file metadata dictionaries

xorbs list[bytes]

List of xorb hashes

chunks list[bytes]

List of chunk hashes

Initialize empty shard.

Source code in ccbt/storage/xet_shard.py
def __init__(self):
    """Initialize empty shard."""
    self.files: list[dict] = []  # File metadata
    self.xorbs: list[bytes] = []  # Xorb hashes (32 bytes each)
    self.chunks: list[bytes] = []  # Chunk hashes (32 bytes each)

add_chunk_hash(chunk_hash: bytes) -> None

Add a chunk hash to the shard.

Parameters:

Name Type Description Default
chunk_hash bytes

32-byte chunk hash

required
Source code in ccbt/storage/xet_shard.py
def add_chunk_hash(self, chunk_hash: bytes) -> None:
    """Add a chunk hash to the shard.

    Args:
        chunk_hash: 32-byte chunk hash

    """
    if len(chunk_hash) != 32:
        msg = f"Chunk hash must be 32 bytes, got {len(chunk_hash)}"
        raise ValueError(msg)

    if chunk_hash not in self.chunks:
        self.chunks.append(chunk_hash)

add_file_info(file_path: str, file_hash: bytes, xorb_refs: list[bytes], total_size: int) -> None

Add file information to shard.

Parameters:

Name Type Description Default
file_path str

Path to the file

required
file_hash bytes

32-byte Merkle root hash of the file

required
xorb_refs list[bytes]

List of 32-byte xorb hashes that contain this file's chunks

required
total_size int

Total file size in bytes

required
Source code in ccbt/storage/xet_shard.py
def add_file_info(
    self,
    file_path: str,
    file_hash: bytes,
    xorb_refs: list[bytes],
    total_size: int,
) -> None:
    """Add file information to shard.

    Args:
        file_path: Path to the file
        file_hash: 32-byte Merkle root hash of the file
        xorb_refs: List of 32-byte xorb hashes that contain this file's chunks
        total_size: Total file size in bytes

    """
    if len(file_hash) != 32:
        msg = f"File hash must be 32 bytes, got {len(file_hash)}"
        raise ValueError(msg)

    for xorb_hash in xorb_refs:
        if len(xorb_hash) != 32:
            msg = f"Xorb hash must be 32 bytes, got {len(xorb_hash)}"
            raise ValueError(msg)

    self.files.append(
        {
            "path": file_path,
            "hash": file_hash,
            "xorbs": xorb_refs,
            "size": total_size,
        }
    )

add_xorb_hash(xorb_hash: bytes) -> None

Add a xorb hash to the shard.

Parameters:

Name Type Description Default
xorb_hash bytes

32-byte xorb hash

required
Source code in ccbt/storage/xet_shard.py
def add_xorb_hash(self, xorb_hash: bytes) -> None:
    """Add a xorb hash to the shard.

    Args:
        xorb_hash: 32-byte xorb hash

    """
    if len(xorb_hash) != 32:
        msg = f"Xorb hash must be 32 bytes, got {len(xorb_hash)}"
        raise ValueError(msg)

    if xorb_hash not in self.xorbs:
        self.xorbs.append(xorb_hash)

deserialize(data: bytes, hmac_key: bytes | None = None) -> XetShard staticmethod

Deserialize shard from binary format.

Parameters:

Name Type Description Default
data bytes

Serialized shard data

required
hmac_key bytes | None

Optional HMAC key for verification

None

Returns:

Type Description
XetShard

XetShard instance

Raises:

Type Description
ValueError

If data is invalid or HMAC verification fails

Source code in ccbt/storage/xet_shard.py
@staticmethod
def deserialize(data: bytes, hmac_key: bytes | None = None) -> XetShard:
    """Deserialize shard from binary format.

    Args:
        data: Serialized shard data
        hmac_key: Optional HMAC key for verification

    Returns:
        XetShard instance

    Raises:
        ValueError: If data is invalid or HMAC verification fails

    """
    if len(data) < SHARD_HEADER_SIZE:
        msg = (
            f"Shard data too short: {len(data)} bytes (minimum {SHARD_HEADER_SIZE})"
        )
        raise ValueError(msg)

    # Parse header
    header_data = data[:SHARD_HEADER_SIZE]
    (
        magic,
        version,
        flags,
        _reserved1,
        file_count,
        xorb_count,
        chunk_count,
        _reserved2,
    ) = struct.unpack("4sBB2sIII4s", header_data)

    if magic != SHARD_MAGIC:
        msg = f"Invalid shard magic: expected {SHARD_MAGIC}, got {magic}"
        raise ValueError(msg)

    if version != SHARD_VERSION:
        msg = f"Unsupported shard version: {version} (expected {SHARD_VERSION})"
        raise ValueError(msg)

    has_hmac = (flags & 0x01) != 0

    # Verify HMAC if present
    if has_hmac and hmac_key:
        if (
            len(data) < HMAC_SIZE
        ):  # pragma: no cover - Defensive check for corrupted data, tested in test_deserialize_short_data
            msg = "Shard data too short for HMAC"  # pragma: no cover - Same context
            raise ValueError(msg)  # pragma: no cover - Same context

        payload = data[:-HMAC_SIZE]
        expected_hmac = data[-HMAC_SIZE:]
        actual_hmac = hmac.new(hmac_key, payload, hashlib.sha256).digest()

        if not hmac.compare_digest(expected_hmac, actual_hmac):
            msg = "Shard HMAC verification failed"
            raise ValueError(msg)

    # Parse file info section
    offset = SHARD_HEADER_SIZE
    shard = XetShard()

    for _ in range(file_count):
        # Parse path
        if len(data) < offset + 4:
            msg = "Shard data too short for path length"
            raise ValueError(msg)

        path_len = struct.unpack("I", data[offset : offset + 4])[0]
        offset += 4

        if len(data) < offset + path_len:
            msg = "Shard data too short for path"
            raise ValueError(msg)

        file_path = data[offset : offset + path_len].decode("utf-8")
        offset += path_len

        # Parse hash
        if len(data) < offset + 32:
            msg = "Shard data too short for file hash"
            raise ValueError(msg)

        file_hash = data[offset : offset + 32]
        offset += 32

        # Parse size
        if len(data) < offset + 8:
            msg = "Shard data too short for file size"
            raise ValueError(msg)

        file_size = struct.unpack("Q", data[offset : offset + 8])[0]
        offset += 8

        # Parse xorb refs
        if len(data) < offset + 4:
            msg = "Shard data too short for xorb count"
            raise ValueError(msg)

        xorb_ref_count = struct.unpack("I", data[offset : offset + 4])[0]
        offset += 4

        xorb_refs = []
        for _ in range(xorb_ref_count):
            if len(data) < offset + 32:
                msg = "Shard data too short for xorb ref"
                raise ValueError(msg)

            xorb_ref = data[offset : offset + 32]
            offset += 32
            xorb_refs.append(xorb_ref)

        shard.add_file_info(file_path, file_hash, xorb_refs, file_size)

    # Parse CAS info section
    # Xorb hashes
    for _ in range(xorb_count):
        if len(data) < offset + 32:
            msg = "Shard data too short for xorb hash"
            raise ValueError(msg)

        xorb_hash = data[offset : offset + 32]
        offset += 32
        shard.add_xorb_hash(xorb_hash)

    # Chunk hashes
    for _ in range(chunk_count):
        if len(data) < offset + 32:
            msg = "Shard data too short for chunk hash"
            raise ValueError(msg)

        chunk_hash = data[offset : offset + 32]
        offset += 32
        shard.add_chunk_hash(chunk_hash)

    return shard

get_file_by_path(file_path: str) -> dict | None

Get file information by path.

Parameters:

Name Type Description Default
file_path str

Path to file

required

Returns:

Type Description
dict | None

File info dictionary if found, None otherwise

Source code in ccbt/storage/xet_shard.py
def get_file_by_path(self, file_path: str) -> dict | None:
    """Get file information by path.

    Args:
        file_path: Path to file

    Returns:
        File info dictionary if found, None otherwise

    """
    for file_info in self.files:
        if file_info["path"] == file_path:
            return file_info
    return None

get_file_count() -> int

Get number of files in shard.

Returns:

Type Description
int

Number of files

Source code in ccbt/storage/xet_shard.py
def get_file_count(self) -> int:
    """Get number of files in shard.

    Returns:
        Number of files

    """
    return len(self.files)

serialize(hmac_key: bytes | None = None) -> bytes

Serialize shard to binary format with optional HMAC.

Format:

[Header: 24 bytes]
  • Magic: 4 bytes ("SHAR")
  • Version: 1 byte
  • Flags: 1 byte (HMAC flag, reserved bits)
  • Reserved: 2 bytes
  • File count: 4 bytes (uint32)
  • Xorb count: 4 bytes (uint32)
  • Chunk count: 4 bytes (uint32)
  • Reserved: 4 bytes

[File Info Section: variable]
  • For each file:
      • Path length: 4 bytes (uint32)
      • Path: variable (UTF-8)
      • Hash: 32 bytes
      • Size: 8 bytes (uint64)
      • Xorb count: 4 bytes (uint32)
      • Xorb refs: variable (32 bytes each)

[CAS Info Section: variable]
  • Xorb hashes: variable (32 bytes each)
  • Chunk hashes: variable (32 bytes each)

[Footer with HMAC: variable]
  • HMAC: 32 bytes (if key provided)

Parameters:

Name Type Description Default
hmac_key bytes | None

Optional HMAC key for integrity verification

None

Returns:

Type Description
bytes

Serialized shard data

Source code in ccbt/storage/xet_shard.py
def serialize(self, hmac_key: bytes | None = None) -> bytes:
    """Serialize shard to binary format with optional HMAC.

    Format:
    [Header: 24 bytes]
    - Magic: 4 bytes ("SHAR")
    - Version: 1 byte
    - Flags: 1 byte (HMAC flag, reserved bits)
    - Reserved: 2 bytes
    - File count: 4 bytes (uint32)
    - Xorb count: 4 bytes (uint32)
    - Chunk count: 4 bytes (uint32)
    - Reserved: 4 bytes

    [File Info Section: variable]
    - For each file:
      - Path length: 4 bytes (uint32)
      - Path: variable (UTF-8)
      - Hash: 32 bytes
      - Size: 8 bytes (uint64)
      - Xorb count: 4 bytes (uint32)
      - Xorb refs: variable (32 bytes each)

    [CAS Info Section: variable]
    - Xorb hashes: variable (32 bytes each)
    - Chunk hashes: variable (32 bytes each)

    [Footer with HMAC: variable]
    - HMAC: 32 bytes (if key provided)

    Args:
        hmac_key: Optional HMAC key for integrity verification

    Returns:
        Serialized shard data

    """
    # Build header
    header = self._serialize_header(hmac_key is not None)

    # Build file info section
    file_info = self._serialize_file_info()

    # Build CAS info section
    cas_info = self._serialize_cas_info()

    # Build footer with HMAC
    footer = self._serialize_footer(hmac_key, header + file_info + cas_info)

    return header + file_info + cas_info + footer

Format Specification:

  • Header: ccbt/storage/xet_shard.py:142 - 24 bytes (magic "SHAR", version, flags, file/xorb/chunk counts)
  • File Info Section: ccbt/storage/xet_shard.py:145 - Variable (path, hash, size, xorb refs for each file)
  • CAS Info Section: ccbt/storage/xet_shard.py:148 - Variable (xorb hashes, chunk hashes)
  • HMAC Footer: ccbt/storage/xet_shard.py:150 - 32 bytes (HMAC-SHA256 if key provided)
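
The header and file-entry layouts can be sketched with the struct module. This is a minimal illustration, not the ccbt serializer. The field order, the b"SHAR" magic, version 1, and the 0x01 HMAC flag come from the listings above, while the explicit little-endian byte order and the helper names are assumptions.

```python
import struct

# magic, version, flags, reserved, file/xorb/chunk counts, reserved = 24 bytes
SHARD_HEADER = struct.Struct("<4sBB2sIII4s")
FLAG_HMAC = 0x01


def pack_shard_header(file_count: int, xorb_count: int, chunk_count: int, has_hmac: bool) -> bytes:
    """Build the fixed-size shard header (little-endian is an assumption)."""
    flags = FLAG_HMAC if has_hmac else 0
    return SHARD_HEADER.pack(
        b"SHAR", 1, flags, b"\x00" * 2, file_count, xorb_count, chunk_count, b"\x00" * 4
    )


def pack_file_entry(path: str, file_hash: bytes, size: int, xorb_refs: list[bytes]) -> bytes:
    """Build one file info entry: path, hash, size, then the xorb references."""
    encoded_path = path.encode("utf-8")
    return (
        struct.pack("<I", len(encoded_path))
        + encoded_path
        + file_hash
        + struct.pack("<Q", size)
        + struct.pack("<I", len(xorb_refs))
        + b"".join(xorb_refs)
    )


header = pack_shard_header(file_count=1, xorb_count=2, chunk_count=5, has_hmac=True)
entry = pack_file_entry("docs/readme.md", b"\x11" * 32, 4096, [b"\x22" * 32, b"\x33" * 32])
print(len(header), len(entry))
```

Unpacking the header with the same Struct returns the three counts, which tell a reader how many variable-length entries and fixed-size hashes follow.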

Constants:

  • SHARD_MAGIC: ccbt/storage/xet_shard.py:19 - b"SHAR" magic bytes
  • SHARD_VERSION: ccbt/storage/xet_shard.py:20 - Format version 1
  • HMAC_SIZE: ccbt/storage/xet_shard.py:22 - 32 bytes for HMAC-SHA256

Key Methods:

  • add_file_info(): ccbt/storage/xet_shard.py:47 - Add file metadata with xorb references
  • add_chunk_hash(): ccbt/storage/xet_shard.py:80 - Add chunk hash to shard
  • add_xorb_hash(): ccbt/storage/xet_shard.py:93 - Add xorb hash to shard
  • serialize(): ccbt/storage/xet_shard.py:106 - Serialize shard to binary format with optional HMAC
  • deserialize(): ccbt/storage/xet_shard.py:201 - Deserialize shard from binary format with HMAC verification

Integrity:

  • HMAC verification: ccbt/storage/xet_shard.py:170 - Optional HMAC-SHA256 for shard integrity
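
The footer scheme amounts to appending an HMAC-SHA256 tag over everything that precedes it and checking that tag in constant time on load. The sketch below shows the idea with the standard library. The seal and open_sealed helper names are hypothetical.

```python
import hashlib
import hmac

HMAC_SIZE = 32  # HMAC-SHA256 digest length


def seal(payload: bytes, key: bytes) -> bytes:
    """Append an HMAC-SHA256 tag computed over the payload."""
    return payload + hmac.new(key, payload, hashlib.sha256).digest()


def open_sealed(data: bytes, key: bytes) -> bytes:
    """Verify the trailing tag and return the payload without it."""
    if len(data) < HMAC_SIZE:
        raise ValueError("Data too short for HMAC")
    payload, tag = data[:-HMAC_SIZE], data[-HMAC_SIZE:]
    expected = hmac.new(key, payload, hashlib.sha256).digest()
    # compare_digest avoids leaking how many leading bytes matched.
    if not hmac.compare_digest(tag, expected):
        raise ValueError("HMAC verification failed")
    return payload


key = b"shared-secret"
sealed = seal(b"header+file_info+cas_info", key)
print(open_sealed(sealed, key))
```

Flipping any byte of the payload or the tag makes open_sealed raise, as does verifying with a different key.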

Merkle Tree Computation

File integrity is verified with Merkle trees built from chunk hashes, so a single 32-byte root hash covers the whole file.

Xet protocol hashing functions.

Provides BLAKE3-256 hashing for chunks and Merkle tree construction for file-level hashing. Falls back to SHA-256 if blake3 is not available.

build_merkle_tree(chunks: list[bytes]) -> bytes staticmethod

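Build Merkle tree from chunk data.

Hashes each chunk, then constructs a binary Merkle tree bottom-up from the chunk hashes. Each level pairs adjacent hashes and hashes each pair until a single root hash remains. When a level has an odd number of hashes, the last hash is paired with itself.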

Parameters:

Name Type Description Default
chunks list[bytes]

List of chunk data (not hashes - will be hashed)

required

Returns:

Type Description
bytes

32-byte root hash (Merkle tree root)

Source code in ccbt/storage/xet_hashing.py
@staticmethod
def build_merkle_tree(chunks: list[bytes]) -> bytes:
    """Build Merkle tree from chunk data.

    Hashes each chunk, then constructs a binary Merkle tree bottom-up
    from the chunk hashes. Each level pairs adjacent hashes and hashes
    each pair until a single root hash remains.

    Args:
        chunks: List of chunk data (not hashes - will be hashed)

    Returns:
        32-byte root hash (Merkle tree root)

    """
    if not chunks:
        return b"\x00" * XetHasher.HASH_SIZE

    # Compute chunk hashes
    hashes = [XetHasher.compute_chunk_hash(chunk) for chunk in chunks]

    # Build binary tree bottom-up
    while len(hashes) > 1:
        next_level = []
        for i in range(0, len(hashes), 2):
            if i + 1 < len(hashes):
                # Pair hashes: combine and hash
                combined = hashes[i] + hashes[i + 1]
                next_level.append(XetHasher.compute_chunk_hash(combined))
            else:
                # Odd number of hashes: pair the last hash with itself
                combined = hashes[i] + hashes[i]
                next_level.append(XetHasher.compute_chunk_hash(combined))
        hashes = next_level

    return hashes[0]
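
A worked three-chunk case makes the odd-node rule concrete: the third leaf is paired with itself, so the root is H(H(h0 + h1) + H(h2 + h2)). The sketch below reproduces that computation. SHA-256 stands in for XetHasher.compute_chunk_hash, and the h and merkle_root helpers are hypothetical.

```python
import hashlib


def h(data: bytes) -> bytes:
    """Stand-in for XetHasher.compute_chunk_hash (SHA-256 here)."""
    return hashlib.sha256(data).digest()


def merkle_root(leaf_hashes: list[bytes]) -> bytes:
    """Reduce leaf hashes pairwise, duplicating the last hash on odd levels."""
    if not leaf_hashes:
        return b"\x00" * 32
    level = list(leaf_hashes)
    while len(level) > 1:
        if len(level) % 2:
            level.append(level[-1])  # Pair the odd node with itself
        level = [h(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
    return level[0]


chunks = [b"chunk-a", b"chunk-b", b"chunk-c"]
leaves = [h(chunk) for chunk in chunks]
by_hand = h(h(leaves[0] + leaves[1]) + h(leaves[2] + leaves[2]))
print(merkle_root(leaves) == by_hand)
```

A file with a single chunk has that chunk's hash as its root, because the pairing loop never runs.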

build_merkle_tree_from_hashes(chunk_hashes: list[bytes]) -> bytes staticmethod

Build Merkle tree from existing chunk hashes.

This variant takes pre-computed chunk hashes instead of chunk data. Useful when you already have the hashes and don't need to recompute them.

Parameters:

Name Type Description Default
chunk_hashes list[bytes]

List of 32-byte chunk hashes

required

Returns:

Type Description
bytes

32-byte root hash (Merkle tree root)

Source code in ccbt/storage/xet_hashing.py
@staticmethod
def build_merkle_tree_from_hashes(chunk_hashes: list[bytes]) -> bytes:
    """Build Merkle tree from existing chunk hashes.

    This variant takes pre-computed chunk hashes instead of chunk data.
    Useful when you already have the hashes and don't need to recompute them.

    Args:
        chunk_hashes: List of 32-byte chunk hashes

    Returns:
        32-byte root hash (Merkle tree root)

    """
    if not chunk_hashes:
        return (
            b"\x00" * XetHasher.HASH_SIZE
        )  # pragma: no cover - Empty hash list tested in test_build_merkle_tree_empty

    # Validate hash sizes
    for h in chunk_hashes:
        if len(h) != XetHasher.HASH_SIZE:
            msg = f"Invalid hash size: expected {XetHasher.HASH_SIZE}, got {len(h)}"
            raise ValueError(msg)

    # Build binary tree bottom-up
    hashes = list(chunk_hashes)
    while len(hashes) > 1:
        next_level = []
        for i in range(0, len(hashes), 2):
            if i + 1 < len(hashes):
                # Pair hashes
                combined = hashes[i] + hashes[i + 1]
                next_level.append(XetHasher.compute_chunk_hash(combined))
            else:  # pragma: no cover - Odd number handling tested in test_build_merkle_tree_three
                # Odd number, duplicate for pairing
                combined = hashes[i] + hashes[i]  # pragma: no cover - Same context
                next_level.append(
                    XetHasher.compute_chunk_hash(combined)
                )  # pragma: no cover - Same context
        hashes = next_level

    return hashes[0]

compute_chunk_hash(chunk_data: bytes) -> bytes staticmethod

Compute BLAKE3-256 hash for a chunk.

Uses BLAKE3 if available for better performance, otherwise falls back to SHA-256 for compatibility.

Parameters:

Name Type Description Default
chunk_data bytes

Chunk data to hash

required

Returns:

Type Description
bytes

32-byte hash (BLAKE3-256 or SHA-256)

Source code in ccbt/storage/xet_hashing.py
@staticmethod
def compute_chunk_hash(chunk_data: bytes) -> bytes:
    """Compute BLAKE3-256 hash for a chunk.

    Uses BLAKE3 if available for better performance, otherwise
    falls back to SHA-256 for compatibility.

    Args:
        chunk_data: Chunk data to hash

    Returns:
        32-byte hash (BLAKE3-256 or SHA-256)

    """
    if HAS_BLAKE3:
        return blake3.blake3(chunk_data).digest()
    # Fallback to SHA-256 (protocol-compatible)
    return hashlib.sha256(
        chunk_data
    ).digest()  # pragma: no cover - Fallback tested via monkeypatch in tests
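
The fallback relies on an optional import that is resolved once at module load. A minimal sketch of that pattern follows. The compute_hash and backend_name helpers are hypothetical. Both backends return 32 bytes, but they produce different digests for the same input, so hashes are only comparable between peers that use the same backend.

```python
import hashlib

try:
    import blake3  # Optional dependency (the "blake3" package)

    HAS_BLAKE3 = True
except ImportError:
    HAS_BLAKE3 = False


def compute_hash(data: bytes) -> bytes:
    """Return a 32-byte digest, preferring BLAKE3 when it is installed."""
    if HAS_BLAKE3:
        return blake3.blake3(data).digest()
    return hashlib.sha256(data).digest()


def backend_name() -> str:
    """Report which hash backend is active."""
    return "blake3" if HAS_BLAKE3 else "sha256"


digest = compute_hash(b"example chunk")
print(backend_name(), len(digest))
```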

compute_xorb_hash(xorb_data: bytes) -> bytes staticmethod

Compute hash for xorb data.

Xorbs are collections of chunks stored together. This method computes the hash of the xorb data.

Parameters:

Name Type Description Default
xorb_data bytes

Xorb data to hash

required

Returns:

Type Description
bytes

32-byte hash

Source code in ccbt/storage/xet_hashing.py
@staticmethod
def compute_xorb_hash(xorb_data: bytes) -> bytes:
    """Compute hash for xorb data.

    Xorbs are collections of chunks stored together. This method
    computes the hash of the xorb data.

    Args:
        xorb_data: Xorb data to hash

    Returns:
        32-byte hash

    """
    return XetHasher.compute_chunk_hash(xorb_data)

hash_file_incremental(file_path: str, chunk_callback: Callable[[bytes], None] | None = None) -> bytes staticmethod

Compute file hash incrementally by reading and hashing chunks.

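This method reads the file in 1 MB blocks and feeds each block to a single running hash, so memory use stays bounded regardless of file size. The result is a flat hash of the file's bytes, not a Merkle root over content-defined chunks.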

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| file_path | str | Path to file to hash | required |
| chunk_callback | Callable[[bytes], None] \| None | Optional callback function called with each chunk | None |

Returns:

| Type | Description |
| --- | --- |
| bytes | 32-byte file hash |

Source code in ccbt/storage/xet_hashing.py
@staticmethod
def hash_file_incremental(
    file_path: str,
    chunk_callback: Callable[[bytes], None] | None = None,
) -> bytes:
    """Compute file hash incrementally by reading and hashing chunks.

    This method reads a file in chunks and computes the hash incrementally,
    which is memory-efficient for large files.

    Args:
        file_path: Path to file to hash
        chunk_callback: Optional callback function called with each chunk

    Returns:
        32-byte file hash

    """
    if HAS_BLAKE3:
        # BLAKE3 supports incremental hashing
        hasher = blake3.blake3()
        with open(file_path, "rb") as f:
            while True:
                chunk = f.read(1024 * 1024)  # Read 1 MB at a time
                if not chunk:
                    break
                hasher.update(chunk)
                if chunk_callback:
                    chunk_callback(chunk)
        return hasher.digest()
    # SHA-256 fallback
    hasher = (
        hashlib.sha256()
    )  # pragma: no cover - SHA-256 fallback tested via monkeypatch in tests
    with open(file_path, "rb") as f:  # pragma: no cover - Same context
        while True:  # pragma: no cover - Same context
            chunk = f.read(
                1024 * 1024
            )  # Read 1 MB at a time  # pragma: no cover - Same context
            if not chunk:  # pragma: no cover - Same context
                break  # pragma: no cover - Same context
            hasher.update(chunk)  # pragma: no cover - Same context
            if chunk_callback:  # pragma: no cover - Same context
                chunk_callback(chunk)  # pragma: no cover - Same context
    return hasher.digest()  # pragma: no cover - Same context

verify_chunk_hash(chunk_data: bytes, expected_hash: bytes) -> bool staticmethod

Verify chunk data against expected hash.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| chunk_data | bytes | Chunk data to verify | required |
| expected_hash | bytes | Expected hash (32 bytes) | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if hash matches, False otherwise |

Source code in ccbt/storage/xet_hashing.py
@staticmethod
def verify_chunk_hash(chunk_data: bytes, expected_hash: bytes) -> bool:
    """Verify chunk data against expected hash.

    Args:
        chunk_data: Chunk data to verify
        expected_hash: Expected hash (32 bytes)

    Returns:
        True if hash matches, False otherwise

    """
    if len(expected_hash) != XetHasher.HASH_SIZE:
        return False

    actual_hash = XetHasher.compute_chunk_hash(chunk_data)
    return actual_hash == expected_hash

Hash Functions:

- compute_chunk_hash(): ccbt/storage/xet_hashing.py:43 - Compute BLAKE3-256 hash for a chunk (falls back to SHA-256)
- compute_xorb_hash(): ccbt/storage/xet_hashing.py:63 - Compute hash for xorb data
- verify_chunk_hash(): ccbt/storage/xet_hashing.py:158 - Verify chunk data against expected hash
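A minimal sketch of the compute-then-verify round trip, assuming SHA-256 as a stand-in for the documented BLAKE3/SHA-256 choice so that it runs with the standard library alone. The functions mirror the documented names but are standalone reimplementations, not the XetHasher methods themselves, and the use of `hmac.compare_digest` is an assumption of this sketch (the listed source compares with `==`).

```python
import hashlib
import hmac

HASH_SIZE = 32  # digest length in bytes, matching the documented HASH_SIZE


def compute_chunk_hash(chunk_data: bytes) -> bytes:
    """Stand-in for XetHasher.compute_chunk_hash (SHA-256 branch only)."""
    return hashlib.sha256(chunk_data).digest()


def verify_chunk_hash(chunk_data: bytes, expected_hash: bytes) -> bool:
    """Return True only if expected_hash is 32 bytes and matches the chunk digest."""
    if len(expected_hash) != HASH_SIZE:
        # A wrong-length hash can never match, so reject it before hashing.
        return False
    # Assumption: constant-time comparison; the listed source uses ==.
    return hmac.compare_digest(compute_chunk_hash(chunk_data), expected_hash)


chunk = b"example chunk payload"
digest = compute_chunk_hash(chunk)

ok = verify_chunk_hash(chunk, digest)               # untouched data
tampered = verify_chunk_hash(chunk + b"!", digest)  # data modified after hashing
truncated = verify_chunk_hash(chunk, digest[:16])   # hash of the wrong length
```

In this sketch only the untouched chunk verifies; modified data and a truncated hash are both rejected, the latter without hashing the chunk at all.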

Merkle Tree Construction:

- build_merkle_tree(): ccbt/storage/xet_hashing.py:78 - Build Merkle tree from chunk data (hashes chunks first)
- build_merkle_tree_from_hashes(): ccbt/storage/xet_hashing.py:115 - Build Merkle tree from pre-computed chunk hashes

Algorithm: The Merkle tree is built bottom-up by pairing hashes at each level:

1. Start with the chunk hashes (leaf nodes).
2. Pair adjacent hashes and hash each concatenated pair.
3. If a level has an odd number of hashes, pair the last hash with itself.
4. Repeat until a single root hash remains.
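The steps above can be sketched as a short standalone function. This is an illustration under stated assumptions rather than the library's code: `node_hash` and `merkle_root` are hypothetical names, SHA-256 stands in for the chunk hash so the example needs only the standard library, and rejecting empty input is a choice made here because the excerpt does not show how that case is handled.

```python
from __future__ import annotations

import hashlib


def node_hash(data: bytes) -> bytes:
    """Hash helper for this sketch (SHA-256 stand-in for compute_chunk_hash)."""
    return hashlib.sha256(data).digest()


def merkle_root(leaf_hashes: list[bytes]) -> bytes:
    """Reduce leaf hashes to one root by pairwise hashing, level by level."""
    if not leaf_hashes:
        # Assumption: empty input is rejected; the excerpt does not cover it.
        raise ValueError("at least one leaf hash is required")
    level = list(leaf_hashes)
    while len(level) > 1:
        next_level = []
        for i in range(0, len(level), 2):
            left = level[i]
            # With an odd count, the last hash is paired with itself.
            right = level[i + 1] if i + 1 < len(level) else left
            next_level.append(node_hash(left + right))
        level = next_level
    return level[0]


# Three leaves: (a, b) are paired, and c is paired with itself.
a, b, c = (node_hash(x) for x in (b"chunk-a", b"chunk-b", b"chunk-c"))
root_three = merkle_root([a, b, c])
```

With three leaves the root equals `node_hash(node_hash(a + b) + node_hash(c + c))`, and a single leaf is returned unchanged as its own root.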

Incremental Hashing:

- hash_file_incremental(): ccbt/storage/xet_hashing.py:175 - Compute file hash incrementally for memory efficiency
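As an illustration of the incremental pattern, the sketch below hashes a temporary file in 1 MB reads and uses the callback to record the size of each block. It assumes SHA-256 throughout to stay dependency-free, and `BLOCK_SIZE` plus the standalone `hash_file_incremental` are names local to this example rather than the library's API.

```python
import hashlib
import os
import tempfile

BLOCK_SIZE = 1024 * 1024  # 1 MB per read, as in the listed source


def hash_file_incremental(file_path, chunk_callback=None) -> bytes:
    """Hash a file block by block so only one block is held in memory."""
    hasher = hashlib.sha256()  # assumption: SHA-256 only, for portability
    with open(file_path, "rb") as f:
        while True:
            block = f.read(BLOCK_SIZE)
            if not block:
                break
            hasher.update(block)
            if chunk_callback:
                chunk_callback(block)
    return hasher.digest()


payload = b"xet" * 1_000_000  # 3,000,000 bytes, so three reads are needed
block_sizes = []

# Write the payload to a temporary file, hash it, then clean up.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(payload)
    tmp_path = tmp.name
try:
    file_hash = hash_file_incremental(
        tmp_path, lambda block: block_sizes.append(len(block))
    )
finally:
    os.remove(tmp_path)
```

The incremental digest is identical to hashing the whole payload in one call, and the callback sees two full 1 MB blocks followed by the 902,848-byte remainder.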

Hash Size:

- HASH_SIZE: ccbt/storage/xet_hashing.py:40 - 32 bytes for BLAKE3-256 or SHA-256

BLAKE3 Support:

- Automatic detection: ccbt/storage/xet_hashing.py:21 - Uses BLAKE3 if available, falls back to SHA-256
- Performance: BLAKE3 is generally faster than SHA-256, and the difference is most noticeable on large files
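One common way to implement this kind of detection is a guarded import that sets a module-level flag. The sketch below assumes that pattern; the excerpt references `HAS_BLAKE3` but does not show how it is set, so the `try`/`except` block here is a guess at the shape, not the module's actual code.

```python
import hashlib

try:
    import blake3  # optional third-party package
    HAS_BLAKE3 = True
except ImportError:
    blake3 = None
    HAS_BLAKE3 = False


def compute_chunk_hash(chunk_data: bytes) -> bytes:
    """Return a 32-byte digest, preferring BLAKE3 when the package is installed."""
    if HAS_BLAKE3:
        return blake3.blake3(chunk_data).digest()
    return hashlib.sha256(chunk_data).digest()


backend = "BLAKE3" if HAS_BLAKE3 else "SHA-256"
digest = compute_chunk_hash(b"hello xet")
print(f"{backend}: {digest.hex()}")
```

Both backends return 32 bytes, but BLAKE3 and SHA-256 produce different digests for the same input, so a hash computed under one backend will not verify under the other.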

References