๐Ÿ”’ Protected Content

This Python tutorial is for ECOM 561 students only. Please enter the password to continue.

โŒ Incorrect password. Please try again.

Python Tutorial

ECOM 561 - Information Theory and Coding | Spring 2026

๐Ÿš€ Introduction & Setup

This tutorial provides Python implementations for key concepts in Information Theory and Coding. Python offers excellent libraries for numerical computing and visualization, making it an ideal choice for understanding and implementing coding algorithms.

๐Ÿ“‹ Course Learning Outcome

CLO-6: Evaluate the performance of selected coding algorithms on some error-correction and coding techniques using Python.

Required Libraries

pip install numpy scipy matplotlib
Import Libraries
import numpy as np
import matplotlib.pyplot as plt
from scipy import special
from collections import defaultdict
import heapq

# For nicer plots
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams['figure.figsize'] = [10, 6]
plt.rcParams['font.size'] = 12

Basic Setup Template

script_template.py
"""
ECOM 561 - Information Theory and Coding
Assignment/Project: [Title]
Student Name: [Your Name]
Student ID: [Your ID]
Date: [Date]
Description: [Brief description of what this code does]
"""

import numpy as np
import matplotlib.pyplot as plt

# ============================================
# Main Code Starts Here
# ============================================

def main():
    """Main function to run the analysis."""
    # Your implementation goes here
    pass

if __name__ == "__main__":
    main()

๐Ÿ’ก Tip: Using Jupyter Notebooks

Consider using Jupyter Notebooks (pip install jupyter) for interactive development. You can run cells individually and see results immediately, which is great for learning and experimentation.

๐Ÿ“Š Entropy & Information Measures CLO-1

๐Ÿ“ Theory Review

Self-Information: I(x) = -logโ‚‚(P(x)) bits

Entropy: H(X) = -ฮฃ P(xแตข) logโ‚‚(P(xแตข)) bits

Joint Entropy: H(X,Y) = -ฮฃแตข ฮฃโฑผ P(xแตข,yโฑผ) logโ‚‚(P(xแตข,yโฑผ))

Mutual Information: I(X;Y) = H(X) + H(Y) - H(X,Y)

Self-Information Calculator

self_information.py
import numpy as np

def self_information(p):
    """
    Calculate self-information of an event.
    
    Parameters:
        p: probability of the event (0 < p <= 1)
    
    Returns:
        I: self-information in bits
    """
    if p <= 0 or p > 1:
        raise ValueError("Probability must be in range (0, 1]")
    
    return -np.log2(p)

# Example Usage
if __name__ == "__main__":
    # Event with probability 0.5 (fair coin flip)
    p = 0.5
    I = self_information(p)
    print(f"Self-information for P={p:.2f}: {I:.4f} bits")
    
    # Multiple events
    probs = [0.9, 0.5, 0.1, 0.01]
    for p in probs:
        I = self_information(p)
        print(f"P = {p:.2f} โ†’ I = {I:.4f} bits")

Entropy Calculator

entropy.py
import numpy as np
import matplotlib.pyplot as plt

def calculate_entropy(P):
    """
    Calculate entropy of a discrete source.
    
    Parameters:
        P: probability distribution (array-like, must sum to 1)
    
    Returns:
        H: entropy in bits
    """
    P = np.array(P)
    
    # Validate input
    if not np.isclose(np.sum(P), 1.0):
        raise ValueError(f"Probabilities must sum to 1. Sum = {np.sum(P):.6f}")
    
    if np.any(P < 0):
        raise ValueError("Probabilities cannot be negative")
    
    # Calculate entropy (handle P=0 case)
    # Use np.where to avoid log(0)
    H = -np.sum(np.where(P > 0, P * np.log2(P), 0))
    
    return H

def binary_entropy(p):
    """Calculate binary entropy function H(p)."""
    if p == 0 or p == 1:
        return 0
    return calculate_entropy([p, 1-p])

# Plot binary entropy function
if __name__ == "__main__":
    p_values = np.linspace(0.001, 0.999, 1000)
    H_values = [binary_entropy(p) for p in p_values]
    
    plt.figure(figsize=(10, 6))
    plt.plot(p_values, H_values, 'b-', linewidth=2)
    plt.xlabel('Probability p', fontsize=12)
    plt.ylabel('H(p) [bits]', fontsize=12)
    plt.title('Binary Entropy Function H(p) = -p logโ‚‚(p) - (1-p) logโ‚‚(1-p)', fontsize=14)
    plt.grid(True, alpha=0.3)
    plt.xlim([0, 1])
    plt.ylim([0, 1.1])
    
    # Mark maximum entropy point
    plt.plot(0.5, 1, 'ro', markersize=10)
    plt.annotate('Maximum at p=0.5', xy=(0.5, 1), xytext=(0.55, 0.95), fontsize=11)
    
    plt.tight_layout()
    plt.savefig('binary_entropy.png', dpi=150)
    plt.show()

Joint and Conditional Entropy

joint_entropy.py
import numpy as np

def joint_conditional_entropy(P_XY):
    """
    Calculate all entropy measures from joint probability matrix.
    
    Parameters:
        P_XY: joint probability matrix P(X,Y) as 2D numpy array
    
    Returns:
        dict with H_XY, H_X, H_Y, H_X_given_Y, H_Y_given_X, I_XY
    """
    P_XY = np.array(P_XY)
    
    # Marginal distributions
    P_X = np.sum(P_XY, axis=1)  # Sum over Y (columns)
    P_Y = np.sum(P_XY, axis=0)  # Sum over X (rows)
    
    # Individual entropies
    H_X = calculate_entropy(P_X)
    H_Y = calculate_entropy(P_Y)
    
    # Joint entropy H(X,Y)
    H_XY = -np.sum(np.where(P_XY > 0, P_XY * np.log2(P_XY), 0))
    
    # Conditional entropies
    H_X_given_Y = H_XY - H_Y  # H(X|Y) = H(X,Y) - H(Y)
    H_Y_given_X = H_XY - H_X  # H(Y|X) = H(X,Y) - H(X)
    
    # Mutual information
    I_XY = H_X + H_Y - H_XY   # I(X;Y) = H(X) + H(Y) - H(X,Y)
    
    return {
        'H_X': H_X,
        'H_Y': H_Y,
        'H_XY': H_XY,
        'H_X_given_Y': H_X_given_Y,
        'H_Y_given_X': H_Y_given_X,
        'I_XY': I_XY
    }

# Example: Binary Symmetric Channel Analysis
if __name__ == "__main__":
    p = 0.1  # Crossover probability
    
    # Joint probability matrix for BSC with uniform input
    P_XY = np.array([
        [0.5*(1-p), 0.5*p],      # P(X=0,Y=0), P(X=0,Y=1)
        [0.5*p,     0.5*(1-p)]   # P(X=1,Y=0), P(X=1,Y=1)
    ])
    
    results = joint_conditional_entropy(P_XY)
    
    print(f"===== Binary Symmetric Channel Analysis (p={p:.2f}) =====")
    print(f"H(X) = {results['H_X']:.4f} bits")
    print(f"H(Y) = {results['H_Y']:.4f} bits")
    print(f"H(X,Y) = {results['H_XY']:.4f} bits")
    print(f"H(X|Y) = {results['H_X_given_Y']:.4f} bits")
    print(f"H(Y|X) = {results['H_Y_given_X']:.4f} bits")
    print(f"I(X;Y) = {results['I_XY']:.4f} bits")

โœ… Key Relationships to Remember

  • Chain Rule: H(X,Y) = H(X) + H(Y|X) = H(Y) + H(X|Y)
  • Conditioning Reduces Entropy: H(X|Y) โ‰ค H(X)
  • Mutual Information: I(X;Y) = H(X) - H(X|Y) โ‰ฅ 0
  • Maximum Entropy: Uniform distribution maximizes entropy

๐Ÿ“ฆ Source Coding Fundamentals CLO-2

๐Ÿ“ Source Coding Theorem

For a discrete memoryless source with entropy H(X), the average codeword length L satisfies:

H(X) โ‰ค L < H(X) + 1

Kraft Inequality Checker

kraft_inequality.py
import numpy as np

def kraft_inequality(lengths, r=2):
    """
    Check if codeword lengths satisfy Kraft inequality.
    
    Parameters:
        lengths: list of codeword lengths
        r: radix/alphabet size (default: 2 for binary)
    
    Returns:
        tuple: (K, is_valid) - Kraft sum and validity
    """
    lengths = np.array(lengths)
    K = np.sum(r ** (-lengths))
    is_valid = K <= 1
    
    print("===== Kraft Inequality Check =====")
    print(f"Codeword lengths: {lengths}")
    print(f"Radix r = {r}")
    print(f"Kraft sum K = {K:.4f}")
    
    if is_valid:
        print("โœ“ K โ‰ค 1: A prefix-free code EXISTS")
        if K < 1:
            print("  (Code can be improved - K < 1)")
    else:
        print("โœ— K > 1: NO prefix-free code exists")
    
    return K, is_valid

# Examples
if __name__ == "__main__":
    print("Example 1: Valid code")
    kraft_inequality([1, 2, 3, 3])
    
    print("\nExample 2: Invalid lengths")
    kraft_inequality([1, 1, 2, 2])
    
    print("\nExample 3: Optimal code (K = 1)")
    kraft_inequality([2, 2, 2, 2])

Average Codeword Length

avg_length.py
import numpy as np

def avg_codeword_length(P, lengths):
    """
    Calculate average codeword length and efficiency.
    
    Parameters:
        P: probability distribution
        lengths: codeword lengths for each symbol
    
    Returns:
        tuple: (L_avg, efficiency, redundancy)
    """
    P = np.array(P)
    lengths = np.array(lengths)
    
    # Average codeword length
    L_avg = np.sum(P * lengths)
    
    # Entropy
    H = calculate_entropy(P)
    
    # Efficiency and redundancy
    efficiency = (H / L_avg) * 100  # Percentage
    redundancy = L_avg - H
    
    print("===== Source Coding Analysis =====")
    print(f"Source Entropy H(X) = {H:.4f} bits")
    print(f"Average Length L = {L_avg:.4f} bits")
    print(f"Coding Efficiency = {efficiency:.2f}%")
    print(f"Redundancy = {redundancy:.4f} bits")
    
    return L_avg, efficiency, redundancy

# Example
if __name__ == "__main__":
    symbols = ['A', 'B', 'C', 'D']
    P = [0.5, 0.25, 0.125, 0.125]
    lengths = [1, 2, 3, 3]
    codes = ['0', '10', '110', '111']
    
    print("Symbol | Prob  | Length | Code")
    print("-------|-------|--------|------")
    for s, p, l, c in zip(symbols, P, lengths, codes):
        print(f"  {s}    | {p:.3f} |   {l}    | {c}")
    print()
    
    avg_codeword_length(P, lengths)

๐ŸŒณ Huffman Coding CLO-2

๐Ÿ“ Huffman Algorithm

Huffman coding produces an optimal prefix-free code:

  1. Sort symbols by probability (ascending)
  2. Combine two lowest probability symbols
  3. Repeat until single node remains
  4. Assign bits by traversing tree (0=left, 1=right)

Huffman Coding Implementation

huffman.py
import heapq
from collections import defaultdict
import numpy as np

class HuffmanNode:
    """Node for Huffman tree."""
    def __init__(self, symbol=None, prob=0):
        self.symbol = symbol
        self.prob = prob
        self.left = None
        self.right = None
    
    def __lt__(self, other):
        return self.prob < other.prob

def huffman_encode(symbols, probabilities):
    """
    Generate Huffman codes for given symbols.
    
    Parameters:
        symbols: list of symbol names
        probabilities: list of probabilities
    
    Returns:
        dict: symbol -> code mapping
    """
    # Create leaf nodes and add to heap
    heap = []
    for s, p in zip(symbols, probabilities):
        node = HuffmanNode(s, p)
        heapq.heappush(heap, node)
    
    # Build Huffman tree
    while len(heap) > 1:
        # Pop two smallest nodes
        left = heapq.heappop(heap)
        right = heapq.heappop(heap)
        
        # Create parent node
        parent = HuffmanNode(prob=left.prob + right.prob)
        parent.left = left
        parent.right = right
        
        heapq.heappush(heap, parent)
    
    # Generate codes by traversing tree
    root = heap[0]
    codes = {}
    
    def traverse(node, code=""):
        if node.symbol is not None:
            codes[node.symbol] = code if code else "0"
        else:
            if node.left:
                traverse(node.left, code + "0")
            if node.right:
                traverse(node.right, code + "1")
    
    traverse(root)
    return codes

def huffman_analysis(symbols, probabilities):
    """Perform complete Huffman coding analysis."""
    codes = huffman_encode(symbols, probabilities)
    
    # Calculate statistics
    lengths = [len(codes[s]) for s in symbols]
    P = np.array(probabilities)
    L_avg = np.sum(P * lengths)
    H = -np.sum(P * np.log2(P))
    efficiency = (H / L_avg) * 100
    
    print("===== Huffman Coding Results =====")
    print("Symbol | Prob   | Length | Code")
    print("-------|--------|--------|----------")
    for s, p in zip(symbols, probabilities):
        print(f"  {s}    | {p:.4f} |   {len(codes[s])}    | {codes[s]}")
    
    print(f"\nEntropy H(X) = {H:.4f} bits")
    print(f"Average Length L = {L_avg:.4f} bits")
    print(f"Efficiency = {efficiency:.2f}%")
    
    return codes

# Example
if __name__ == "__main__":
    symbols = ['A', 'B', 'C', 'D', 'E']
    probabilities = [0.4, 0.2, 0.2, 0.1, 0.1]
    
    codes = huffman_analysis(symbols, probabilities)

Huffman Encoding/Decoding Functions

huffman_codec.py
def huffman_encode_message(message, codes):
    """Encode a message using Huffman codes."""
    return ''.join(codes[char] for char in message if char in codes)

def huffman_decode_message(bitstring, codes):
    """Decode a Huffman-encoded bitstring."""
    # Create reverse lookup
    reverse_codes = {v: k for k, v in codes.items()}
    
    decoded = []
    buffer = ""
    
    for bit in bitstring:
        buffer += bit
        if buffer in reverse_codes:
            decoded.append(reverse_codes[buffer])
            buffer = ""
    
    if buffer:
        print(f"Warning: Incomplete codeword remaining: {buffer}")
    
    return ''.join(decoded)

# Example
if __name__ == "__main__":
    symbols = ['A', 'B', 'C', 'D']
    probs = [0.5, 0.25, 0.125, 0.125]
    codes = huffman_encode(symbols, probs)
    
    message = "ABACADA"
    print(f"Original message: {message}")
    
    encoded = huffman_encode_message(message, codes)
    print(f"Encoded: {encoded} ({len(encoded)} bits)")
    
    decoded = huffman_decode_message(encoded, codes)
    print(f"Decoded: {decoded}")
    
    # Compression ratio
    original_bits = len(message) * 2  # Fixed 2-bit encoding
    compression = (1 - len(encoded)/original_bits) * 100
    print(f"Compression: {original_bits} bits โ†’ {len(encoded)} bits ({compression:.1f}% reduction)")

๐Ÿ”„ Lempel-Ziv Coding CLO-2

๐Ÿ“ LZ78 Algorithm

Dictionary-based compression that builds a codebook dynamically:

  1. Start with empty dictionary
  2. Read input until finding longest match in dictionary
  3. Output (index, next_symbol) pair
  4. Add new phrase to dictionary

LZ78 Implementation

lz78.py
def lz78_encode(input_string):
    """
    Encode a string using LZ78 algorithm.
    
    Parameters:
        input_string: string to encode
    
    Returns:
        tuple: (output pairs, dictionary)
    """
    dictionary = {'': 0}  # Empty string has index 0
    output = []
    i = 0
    next_index = 1
    
    print("===== LZ78 Encoding Process =====")
    print("Step | Current | Match | Output | New Entry")
    print("-----|---------|-------|--------|----------")
    
    step = 0
    while i < len(input_string):
        # Find longest match
        match = ''
        match_idx = 0
        
        for j in range(i, len(input_string) + 1):
            substring = input_string[i:j]
            if substring in dictionary:
                match = substring
                match_idx = dictionary[substring]
            else:
                break
        
        # Get next symbol
        next_pos = i + len(match)
        next_symbol = input_string[next_pos] if next_pos < len(input_string) else ''
        
        # Output pair
        output.append((match_idx, next_symbol))
        
        # Add new phrase to dictionary
        if next_symbol:
            new_phrase = match + next_symbol
            dictionary[new_phrase] = next_index
            next_index += 1
        
        step += 1
        current = input_string[i:next_pos + (1 if next_symbol else 0)]
        dict_entries = [k for k in dictionary.keys() if k]
        print(f" {step:2d}  |   {current:<5} |   {match_idx}   | ({match_idx},{next_symbol or 'ฮต'}) | {dict_entries}")
        
        i = next_pos + (1 if next_symbol else 0)
    
    return output, dictionary

def lz78_decode(encoded):
    """Decode LZ78 encoded data."""
    dictionary = {0: ''}
    decoded = ''
    next_index = 1
    
    for idx, symbol in encoded:
        phrase = dictionary[idx] + symbol
        decoded += phrase
        
        if symbol:
            dictionary[next_index] = phrase
            next_index += 1
    
    return decoded

# Example
if __name__ == "__main__":
    input_string = "AABABAABAB"
    print(f"Input: {input_string}\n")
    
    encoded, dictionary = lz78_encode(input_string)
    
    print(f"\nEncoded: {encoded}")
    
    decoded = lz78_decode(encoded)
    print(f"Decoded: {decoded}")
    print(f"Match: {input_string == decoded}")

๐Ÿ“ก Channel Capacity CLO-3

๐Ÿ“ Shannon's Channel Capacity

Binary Symmetric Channel (BSC):

C = 1 - H(p) = 1 + p logโ‚‚(p) + (1-p) logโ‚‚(1-p)

AWGN Channel:

C = W logโ‚‚(1 + SNR) bits/second

Binary Symmetric Channel Capacity

bsc_capacity.py
import numpy as np
import matplotlib.pyplot as plt

def bsc_capacity(p):
    """
    Calculate capacity of Binary Symmetric Channel.
    
    Parameters:
        p: crossover probability (error probability)
    
    Returns:
        C: channel capacity in bits per channel use
    """
    if p == 0 or p == 1:
        return 1.0  # Perfect channel
    
    H_p = -p * np.log2(p) - (1-p) * np.log2(1-p)
    return 1 - H_p

# Plot BSC Capacity
if __name__ == "__main__":
    p_values = np.linspace(0.001, 0.5, 500)
    C_values = [bsc_capacity(p) for p in p_values]
    
    plt.figure(figsize=(10, 6))
    plt.plot(p_values, C_values, 'b-', linewidth=2)
    plt.xlabel('Crossover Probability p', fontsize=12)
    plt.ylabel('Channel Capacity C (bits/use)', fontsize=12)
    plt.title('Binary Symmetric Channel Capacity', fontsize=14)
    plt.grid(True, alpha=0.3)
    plt.xlim([0, 0.5])
    plt.ylim([0, 1])
    
    # Mark specific point
    p_mark = 0.1
    C_mark = bsc_capacity(p_mark)
    plt.plot(p_mark, C_mark, 'ro', markersize=8)
    plt.annotate(f'C({p_mark}) = {C_mark:.3f}', 
                 xy=(p_mark, C_mark), xytext=(p_mark+0.05, C_mark+0.05))
    
    plt.tight_layout()
    plt.savefig('bsc_capacity.png', dpi=150)
    plt.show()

AWGN Channel Capacity

awgn_capacity.py
import numpy as np
import matplotlib.pyplot as plt

def awgn_capacity(SNR_dB, W=1):
    """
    Calculate capacity of AWGN channel.
    
    Parameters:
        SNR_dB: Signal-to-noise ratio in dB
        W: Bandwidth in Hz (default: 1 for normalized)
    
    Returns:
        C: Channel capacity in bits/second
    """
    SNR_linear = 10 ** (SNR_dB / 10)
    return W * np.log2(1 + SNR_linear)

# Plot AWGN Capacity
if __name__ == "__main__":
    SNR_dB = np.linspace(-10, 30, 500)
    C_values = [awgn_capacity(snr) for snr in SNR_dB]
    
    plt.figure(figsize=(10, 6))
    plt.plot(SNR_dB, C_values, 'b-', linewidth=2)
    plt.xlabel('SNR (dB)', fontsize=12)
    plt.ylabel('Channel Capacity C (bits/s/Hz)', fontsize=12)
    plt.title('AWGN Channel Capacity (Shannon Limit)', fontsize=14)
    plt.grid(True, alpha=0.3)
    
    # Mark reference points
    for snr in [0, 10, 20]:
        cap = awgn_capacity(snr)
        plt.plot(snr, cap, 'ro', markersize=8)
        plt.annotate(f'{cap:.2f}', xy=(snr, cap), xytext=(snr+1, cap+0.2))
    
    plt.tight_layout()
    plt.savefig('awgn_capacity.png', dpi=150)
    plt.show()
    
    # Print table
    print("\n===== AWGN Channel Capacity Table =====")
    print("SNR (dB) | SNR (linear) | Capacity (bits/s/Hz)")
    print("---------|--------------|---------------------")
    for snr in [-10, -5, 0, 5, 10, 15, 20, 25, 30]:
        snr_lin = 10 ** (snr / 10)
        cap = awgn_capacity(snr)
        print(f"  {snr:4d}   |    {snr_lin:7.2f}   |       {cap:.4f}")

๐Ÿ”ฒ Linear Block Codes CLO-4

๐Ÿ“ Linear Block Code Basics

An (n, k) linear block code encodes k information bits into n coded bits.

Generator Matrix G: c = m ร— G

Parity-Check Matrix H: c ร— HT = 0 for valid codewords

Syndrome: s = r ร— HT (non-zero indicates errors)

Hamming (7,4) Code Implementation

hamming_74.py
import numpy as np

class HammingCode:
    """Hamming (7,4) Code implementation."""
    
    def __init__(self):
        # Generator matrix (systematic form)
        self.G = np.array([
            [1, 0, 0, 0, 1, 1, 0],
            [0, 1, 0, 0, 1, 0, 1],
            [0, 0, 1, 0, 0, 1, 1],
            [0, 0, 0, 1, 1, 1, 1]
        ])
        
        # Parity-check matrix
        self.H = np.array([
            [1, 1, 0, 1, 1, 0, 0],
            [1, 0, 1, 1, 0, 1, 0],
            [0, 1, 1, 1, 0, 0, 1]
        ])
        
        self.n = 7  # Codeword length
        self.k = 4  # Message length
    
    def encode(self, message):
        """Encode a 4-bit message to 7-bit codeword."""
        message = np.array(message)
        codeword = np.dot(message, self.G) % 2
        return codeword
    
    def syndrome(self, received):
        """Calculate syndrome of received word."""
        received = np.array(received)
        s = np.dot(received, self.H.T) % 2
        return s
    
    def decode(self, received):
        """Decode received word with error correction."""
        received = np.array(received).copy()
        s = self.syndrome(received)
        
        if np.all(s == 0):
            print("No error detected.")
            return received
        
        # Find error position from syndrome
        error_pos = int(''.join(map(str, s)), 2)
        
        if error_pos > 0 and error_pos <= self.n:
            print(f"Error detected at position {error_pos}, correcting...")
            received[error_pos - 1] ^= 1
        
        return received
    
    def get_message(self, codeword):
        """Extract message bits from codeword (systematic)."""
        return codeword[:self.k]

# Example usage
if __name__ == "__main__":
    hamming = HammingCode()
    
    print("===== Hamming (7,4) Code =====\n")
    print("Generator Matrix G:")
    print(hamming.G)
    print("\nParity-Check Matrix H:")
    print(hamming.H)
    
    # Verify G ร— H^T = 0
    GHt = np.dot(hamming.G, hamming.H.T) % 2
    print("\nVerification G ร— H^T (should be zeros):")
    print(GHt)
    
    # Encode a message
    message = [1, 0, 1, 1]
    print(f"\nOriginal message: {message}")
    
    codeword = hamming.encode(message)
    print(f"Encoded codeword: {codeword}")
    
    # Introduce error
    error_pos = 3
    received = codeword.copy()
    received[error_pos - 1] ^= 1
    print(f"\nReceived (error at pos {error_pos}): {received}")
    
    # Decode
    corrected = hamming.decode(received)
    print(f"Corrected: {corrected}")
    print(f"Matches original: {np.array_equal(corrected, codeword)}")

Generate All Codewords

all_codewords.py
import numpy as np
from itertools import product

def generate_all_codewords(hamming):
    """Generate all 16 codewords of Hamming (7,4) code."""
    print("\n===== All 16 Codewords =====")
    print("Message | Codeword    | Weight")
    print("--------|-------------|-------")
    
    weights = []
    for bits in product([0, 1], repeat=4):
        message = list(bits)
        codeword = hamming.encode(message)
        weight = np.sum(codeword)
        weights.append(weight)
        
        msg_str = ''.join(map(str, message))
        cw_str = ''.join(map(str, codeword))
        print(f" {msg_str}   | {cw_str}   |   {weight}")
    
    # Minimum distance (excluding zero codeword)
    d_min = min(w for w in weights if w > 0)
    print(f"\nMinimum distance d_min = {d_min}")
    print(f"Error correction capability: {(d_min-1)//2} error(s)")

if __name__ == "__main__":
    hamming = HammingCode()
    generate_all_codewords(hamming)

โ™ป๏ธ Cyclic Codes CLO-4

๐Ÿ“ Cyclic Code Properties

A cyclic code is a linear block code where any cyclic shift of a codeword is also a codeword.

Generator Polynomial g(x): Divides xn + 1

CRC Implementation

crc.py
import numpy as np

def crc_encode(data, generator):
    """
    Generate CRC codeword.
    
    Parameters:
        data: input data bits (list)
        generator: CRC generator polynomial coefficients
    
    Returns:
        tuple: (codeword, fcs)
    """
    data = list(data)
    generator = list(generator)
    r = len(generator) - 1  # CRC bits
    
    # Append r zeros to data
    augmented = data + [0] * r
    
    # Perform polynomial division (XOR-based)
    temp = augmented.copy()
    for i in range(len(data)):
        if temp[i] == 1:
            for j in range(len(generator)):
                temp[i + j] ^= generator[j]
    
    # FCS is the remainder
    fcs = temp[-r:]
    
    # Codeword = data + FCS
    codeword = data + fcs
    
    return codeword, fcs

def crc_check(received, generator):
    """Verify received codeword."""
    received = list(received)
    generator = list(generator)
    r = len(generator) - 1
    
    temp = received.copy()
    for i in range(len(received) - r):
        if temp[i] == 1:
            for j in range(len(generator)):
                temp[i + j] ^= generator[j]
    
    remainder = temp[-r:]
    return all(b == 0 for b in remainder)

# Example
if __name__ == "__main__":
    data = [1, 1, 0, 1, 0, 1, 1]
    generator = [1, 0, 0, 1, 1]  # x^4 + x + 1
    
    print("===== CRC Example =====")
    print(f"Data: {''.join(map(str, data))}")
    print(f"Generator (CRC-4): {''.join(map(str, generator))} (xโด + x + 1)")
    
    codeword, fcs = crc_encode(data, generator)
    print(f"FCS (CRC): {''.join(map(str, fcs))}")
    print(f"Transmitted: {''.join(map(str, codeword))}")
    
    # Verify
    is_valid = crc_check(codeword, generator)
    print(f"Verification (no error): {is_valid}")
    
    # Introduce error
    received_err = codeword.copy()
    received_err[3] ^= 1
    is_valid_err = crc_check(received_err, generator)
    print(f"Verification (with error): {is_valid_err}")

๐Ÿ”— Convolutional Codes CLO-5

๐Ÿ“ Convolutional Code Parameters

Rate: R = k/n (k input bits, n output bits)

Constraint Length: K (memory + 1)

Generator Polynomials: Define connections to output

Rate 1/2 Convolutional Encoder

conv_encoder.py
import numpy as np

class ConvolutionalEncoder:
    """Rate 1/2 Convolutional Encoder."""
    
    def __init__(self, g1, g2, K):
        """
        Initialize encoder.
        
        Parameters:
            g1, g2: generator polynomials (lists)
            K: constraint length
        """
        self.g1 = np.array(g1)
        self.g2 = np.array(g2)
        self.K = K
        self.memory = K - 1
    
    def encode(self, input_bits):
        """Encode input bits."""
        input_bits = np.array(input_bits)
        n_bits = len(input_bits)
        
        # Initialize state
        state = np.zeros(self.memory, dtype=int)
        output = np.zeros(2 * n_bits, dtype=int)
        
        for i, bit in enumerate(input_bits):
            # Register contents
            reg = np.concatenate([[bit], state])
            
            # Calculate outputs
            out1 = np.sum(reg * self.g1) % 2
            out2 = np.sum(reg * self.g2) % 2
            
            output[2*i] = out1
            output[2*i + 1] = out2
            
            # Shift register
            state = np.concatenate([[bit], state[:-1]])
        
        return output
    
    def print_state_table(self):
        """Print state transition table."""
        print("===== State Transition Table =====")
        print("Current | Input | Next  | Output")
        print("--------|-------|-------|-------")
        
        num_states = 2 ** self.memory
        
        for s in range(num_states):
            state = [(s >> i) & 1 for i in range(self.memory-1, -1, -1)]
            
            for in_bit in [0, 1]:
                reg = [in_bit] + state
                
                out1 = sum(r * g for r, g in zip(reg, self.g1)) % 2
                out2 = sum(r * g for r, g in zip(reg, self.g2)) % 2
                
                next_state = [in_bit] + state[:-1]
                next_dec = sum(b << (self.memory-1-i) for i, b in enumerate(next_state))
                
                state_str = ''.join(map(str, state))
                next_str = ''.join(map(str, next_state))
                print(f"   {state_str}   |   {in_bit}   |  {next_str}  |   {out1}{out2}")

# Example
if __name__ == "__main__":
    # Standard rate 1/2, K=3 encoder
    g1 = [1, 1, 1]  # Octal 7
    g2 = [1, 0, 1]  # Octal 5
    K = 3
    
    encoder = ConvolutionalEncoder(g1, g2, K)
    
    print("===== Rate 1/2, K=3 Convolutional Encoder =====")
    print(f"Generator 1: {g1} (octal 7)")
    print(f"Generator 2: {g2} (octal 5)")
    
    input_bits = [1, 0, 1, 1, 0]
    print(f"\nInput: {''.join(map(str, input_bits))}")
    
    output = encoder.encode(input_bits)
    print(f"Output: {''.join(map(str, output))}")
    
    print()
    encoder.print_state_table()

๐ŸŽฏ Viterbi Algorithm CLO-5

๐Ÿ“ Viterbi Decoding

Maximum likelihood sequence decoding algorithm:

  1. Initialize: Set initial path metrics
  2. Branch Metrics: Calculate Hamming distance
  3. Add-Compare-Select: Keep best path to each state
  4. Traceback: Find path with minimum metric

Viterbi Decoder Implementation

viterbi.py
import numpy as np

class ViterbiDecoder:
    """Viterbi decoder for rate 1/2 convolutional codes."""
    
    def __init__(self, g1, g2, K):
        self.g1 = np.array(g1)
        self.g2 = np.array(g2)
        self.K = K
        self.memory = K - 1
        self.num_states = 2 ** self.memory
    
    def _get_output(self, state, input_bit):
        """Get expected output for given state and input."""
        state_bits = [(state >> i) & 1 for i in range(self.memory-1, -1, -1)]
        reg = np.array([input_bit] + state_bits)
        out1 = np.sum(reg * self.g1) % 2
        out2 = np.sum(reg * self.g2) % 2
        return out1, out2
    
    def _get_next_state(self, state, input_bit):
        """Get next state for given current state and input."""
        state_bits = [(state >> i) & 1 for i in range(self.memory-1, -1, -1)]
        next_bits = [input_bit] + state_bits[:-1]
        return sum(b << (self.memory-1-i) for i, b in enumerate(next_bits))
    
    def decode(self, received):
        """Decode received sequence using Viterbi algorithm."""
        received = np.array(received)
        n_branches = len(received) // 2
        
        # Initialize path metrics
        path_metrics = np.full(self.num_states, np.inf)
        path_metrics[0] = 0  # Start at state 0
        
        # Store survivors
        survivors = np.zeros((self.num_states, n_branches), dtype=int)
        
        # Forward pass
        for t in range(n_branches):
            r1, r2 = received[2*t], received[2*t + 1]
            new_metrics = np.full(self.num_states, np.inf)
            
            for curr_state in range(self.num_states):
                if path_metrics[curr_state] == np.inf:
                    continue
                
                for in_bit in [0, 1]:
                    exp1, exp2 = self._get_output(curr_state, in_bit)
                    branch_metric = (r1 != exp1) + (r2 != exp2)
                    
                    next_state = self._get_next_state(curr_state, in_bit)
                    total_metric = path_metrics[curr_state] + branch_metric
                    
                    if total_metric < new_metrics[next_state]:
                        new_metrics[next_state] = total_metric
                        survivors[next_state, t] = curr_state * 2 + in_bit
            
            path_metrics = new_metrics
        
        # Traceback
        final_state = np.argmin(path_metrics)
        final_metric = path_metrics[final_state]
        
        decoded = np.zeros(n_branches, dtype=int)
        current_state = final_state
        
        for t in range(n_branches - 1, -1, -1):
            survivor_info = survivors[current_state, t]
            prev_state = survivor_info // 2
            input_bit = survivor_info % 2
            
            decoded[t] = input_bit
            current_state = prev_state
        
        return decoded, int(final_metric)

# Example
if __name__ == "__main__":
    g1 = [1, 1, 1]
    g2 = [1, 0, 1]
    K = 3
    
    encoder = ConvolutionalEncoder(g1, g2, K)
    decoder = ViterbiDecoder(g1, g2, K)
    
    # Original message (with tail bits)
    message = np.array([1, 0, 1, 1, 0, 0])
    
    print("===== Viterbi Decoder Test =====")
    print(f"Original message: {''.join(map(str, message))}")
    
    # Encode
    encoded = encoder.encode(message)
    print(f"Encoded: {''.join(map(str, encoded))}")
    
    # Add error
    received = encoded.copy()
    received[5] ^= 1  # Flip bit
    print(f"Received (with error): {''.join(map(str, received))}")
    
    # Decode
    decoded, metric = decoder.decode(received)
    print(f"Decoded: {''.join(map(str, decoded))}")
    print(f"Path metric: {metric}")
    print(f"Decoding successful: {np.array_equal(decoded, message)}")

๐Ÿ“ˆ BER Performance Simulation CLO-6

Complete BER Simulation

ber_simulation.py
import numpy as np
import matplotlib.pyplot as plt
from scipy.special import erfc

def ber_simulation():
    """Compare coded vs uncoded BER performance."""
    
    # Parameters
    g1 = [1, 1, 1]
    g2 = [1, 0, 1]
    K = 3
    
    num_bits = 10000
    EbN0_dB = np.arange(0, 10.5, 0.5)
    num_trials = 5
    
    encoder = ConvolutionalEncoder(g1, g2, K)
    decoder = ViterbiDecoder(g1, g2, K)
    
    # Results storage
    ber_uncoded = np.zeros_like(EbN0_dB)
    ber_coded = np.zeros_like(EbN0_dB)
    
    print("===== BER Simulation =====")
    
    for idx, snr_db in enumerate(EbN0_dB):
        errors_uncoded = 0
        errors_coded = 0
        total_bits = 0
        
        for trial in range(num_trials):
            # Generate random bits
            bits = np.random.randint(0, 2, num_bits)
            bits_tail = np.concatenate([bits, np.zeros(K-1, dtype=int)])
            
            # Encode
            coded = encoder.encode(bits_tail)
            
            # BPSK modulation (0 -> +1, 1 -> -1)
            tx_uncoded = 2 * bits - 1
            tx_coded = 2 * coded - 1
            
            # Channel (AWGN)
            EbN0_linear = 10 ** (snr_db / 10)
            
            # Uncoded
            noise_var_uncoded = 1 / (2 * EbN0_linear)
            noise_uncoded = np.sqrt(noise_var_uncoded) * np.random.randn(len(tx_uncoded))
            rx_uncoded = tx_uncoded + noise_uncoded
            
            # Coded (rate 1/2)
            noise_var_coded = 1 / (2 * EbN0_linear * 0.5)
            noise_coded = np.sqrt(noise_var_coded) * np.random.randn(len(tx_coded))
            rx_coded = tx_coded + noise_coded
            
            # Hard decision
            decoded_uncoded = (rx_uncoded < 0).astype(int)
            received_hard = (rx_coded < 0).astype(int)
            
            # Viterbi decoding
            decoded_coded, _ = decoder.decode(received_hard)
            
            # Count errors
            errors_uncoded += np.sum(bits != decoded_uncoded)
            errors_coded += np.sum(bits != decoded_coded[:num_bits])
            total_bits += num_bits
        
        ber_uncoded[idx] = errors_uncoded / total_bits
        ber_coded[idx] = errors_coded / total_bits
        
        print(f"Eb/N0 = {snr_db:4.1f} dB: BER_coded = {ber_coded[idx]:.2e}")
    
    # Theoretical BER for uncoded BPSK
    ber_theory = 0.5 * erfc(np.sqrt(10 ** (EbN0_dB / 10)))
    
    # Plot results
    plt.figure(figsize=(10, 7))
    plt.semilogy(EbN0_dB, ber_theory, 'k-', linewidth=2, label='Uncoded BPSK (Theory)')
    plt.semilogy(EbN0_dB, ber_uncoded, 'bo-', linewidth=1.5, markersize=6, label='Uncoded BPSK (Sim)')
    plt.semilogy(EbN0_dB, ber_coded, 'rs-', linewidth=1.5, markersize=6, label=f'Conv Code R=1/2, K={K} (Sim)')
    
    plt.xlabel('$E_b/N_0$ (dB)', fontsize=12)
    plt.ylabel('Bit Error Rate (BER)', fontsize=12)
    plt.title('BER Performance: Coded vs Uncoded BPSK', fontsize=14)
    plt.legend(loc='lower left', fontsize=11)
    plt.grid(True, which='both', alpha=0.3)
    plt.xlim([min(EbN0_dB), max(EbN0_dB)])
    plt.ylim([1e-6, 1])
    
    plt.tight_layout()
    plt.savefig('ber_comparison.png', dpi=150)
    plt.show()

if __name__ == "__main__":
    ber_simulation()

โš ๏ธ Simulation Tips

  • Use enough bits for statistical significance (โ‰ฅ10,000)
  • Run multiple trials and average results
  • Account for code rate when calculating noise variance
  • Use NumPy vectorization for speed

๐Ÿ’ก Tips & Best Practices

Python vs MATLAB Equivalents

MATLABPython (NumPy)
zeros(m, n)np.zeros((m, n))
ones(m, n)np.ones((m, n))
eye(n)np.eye(n)
A' (transpose)A.T
A * B (matrix)np.dot(A, B) or A @ B
A .* B (element)A * B
mod(A, 2)A % 2
de2bi(n, k)[(n >> i) & 1 for i in range(k-1,-1,-1)]
bi2de(b)int(''.join(map(str, b)), 2)

Useful Libraries

NumPy

  • np.array() - Create arrays
  • np.dot() - Matrix multiply
  • np.sum() - Sum elements
  • np.log2() - Log base 2

Matplotlib

  • plt.plot() - Line plots
  • plt.semilogy() - Log y-axis
  • plt.xlabel() - Axis labels
  • plt.legend() - Add legend

SciPy

  • special.erfc() - Complementary error
  • signal.convolve() - Convolution
  • linalg.inv() - Matrix inverse

Built-in

  • heapq - Priority queues
  • collections - Defaultdict
  • itertools - Combinatorics

โœ… Assignment Submission Checklist

  • โœ“ Header with name, ID, assignment number
  • โœ“ Code runs without errors
  • โœ“ All imports at the top
  • โœ“ Functions have docstrings
  • โœ“ Figures have titles, labels, and legends
  • โœ“ Results printed clearly with f-strings
  • โœ“ Code follows PEP 8 style guide

๐Ÿ“š Additional Resources

Need Help?

Contact Dr. Al Bataineh during office hours (Tuesday/Thursday 11:00 AM - 12:00 PM)

Email: [email protected] | Office: F1-1175

โ† Back to ECOM 561 Course Page