Python Tutorial
ECOM 561 - Information Theory and Coding | Spring 2026
๐ Table of Contents
๐ Introduction & Setup
This tutorial provides Python implementations for key concepts in Information Theory and Coding. Python offers excellent libraries for numerical computing and visualization, making it an ideal choice for understanding and implementing coding algorithms.
๐ Course Learning Outcome
CLO-6: Evaluate the performance of selected coding algorithms on some error-correction and coding techniques using Python.
Required Libraries
pip install numpy scipy matplotlib
import numpy as np
import matplotlib.pyplot as plt
from scipy import special
from collections import defaultdict
import heapq
# For nicer plots
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams['figure.figsize'] = [10, 6]
plt.rcParams['font.size'] = 12
Basic Setup Template
"""
ECOM 561 - Information Theory and Coding
Assignment/Project: [Title]
Student Name: [Your Name]
Student ID: [Your ID]
Date: [Date]
Description: [Brief description of what this code does]
"""
import numpy as np
import matplotlib.pyplot as plt
# ============================================
# Main Code Starts Here
# ============================================
def main():
"""Main function to run the analysis."""
# Your implementation goes here
pass
if __name__ == "__main__":
main()
๐ก Tip: Using Jupyter Notebooks
Consider using Jupyter Notebooks (pip install jupyter) for interactive development. You can run cells individually and see results immediately, which is great for learning and experimentation.
๐ Entropy & Information Measures CLO-1
๐ Theory Review
Self-Information: I(x) = -logโ(P(x)) bits
Entropy: H(X) = -ฮฃ P(xแตข) logโ(P(xแตข)) bits
Joint Entropy: H(X,Y) = -ฮฃแตข ฮฃโฑผ P(xแตข,yโฑผ) logโ(P(xแตข,yโฑผ))
Mutual Information: I(X;Y) = H(X) + H(Y) - H(X,Y)
Self-Information Calculator
import numpy as np
def self_information(p):
"""
Calculate self-information of an event.
Parameters:
p: probability of the event (0 < p <= 1)
Returns:
I: self-information in bits
"""
if p <= 0 or p > 1:
raise ValueError("Probability must be in range (0, 1]")
return -np.log2(p)
# Example Usage
if __name__ == "__main__":
# Event with probability 0.5 (fair coin flip)
p = 0.5
I = self_information(p)
print(f"Self-information for P={p:.2f}: {I:.4f} bits")
# Multiple events
probs = [0.9, 0.5, 0.1, 0.01]
for p in probs:
I = self_information(p)
print(f"P = {p:.2f} โ I = {I:.4f} bits")
Entropy Calculator
import numpy as np
import matplotlib.pyplot as plt
def calculate_entropy(P):
"""
Calculate entropy of a discrete source.
Parameters:
P: probability distribution (array-like, must sum to 1)
Returns:
H: entropy in bits
"""
P = np.array(P)
# Validate input
if not np.isclose(np.sum(P), 1.0):
raise ValueError(f"Probabilities must sum to 1. Sum = {np.sum(P):.6f}")
if np.any(P < 0):
raise ValueError("Probabilities cannot be negative")
# Calculate entropy (handle P=0 case)
# Use np.where to avoid log(0)
H = -np.sum(np.where(P > 0, P * np.log2(P), 0))
return H
def binary_entropy(p):
"""Calculate binary entropy function H(p)."""
if p == 0 or p == 1:
return 0
return calculate_entropy([p, 1-p])
# Plot binary entropy function
if __name__ == "__main__":
p_values = np.linspace(0.001, 0.999, 1000)
H_values = [binary_entropy(p) for p in p_values]
plt.figure(figsize=(10, 6))
plt.plot(p_values, H_values, 'b-', linewidth=2)
plt.xlabel('Probability p', fontsize=12)
plt.ylabel('H(p) [bits]', fontsize=12)
plt.title('Binary Entropy Function H(p) = -p logโ(p) - (1-p) logโ(1-p)', fontsize=14)
plt.grid(True, alpha=0.3)
plt.xlim([0, 1])
plt.ylim([0, 1.1])
# Mark maximum entropy point
plt.plot(0.5, 1, 'ro', markersize=10)
plt.annotate('Maximum at p=0.5', xy=(0.5, 1), xytext=(0.55, 0.95), fontsize=11)
plt.tight_layout()
plt.savefig('binary_entropy.png', dpi=150)
plt.show()
Joint and Conditional Entropy
import numpy as np
def joint_conditional_entropy(P_XY):
"""
Calculate all entropy measures from joint probability matrix.
Parameters:
P_XY: joint probability matrix P(X,Y) as 2D numpy array
Returns:
dict with H_XY, H_X, H_Y, H_X_given_Y, H_Y_given_X, I_XY
"""
P_XY = np.array(P_XY)
# Marginal distributions
P_X = np.sum(P_XY, axis=1) # Sum over Y (columns)
P_Y = np.sum(P_XY, axis=0) # Sum over X (rows)
# Individual entropies
H_X = calculate_entropy(P_X)
H_Y = calculate_entropy(P_Y)
# Joint entropy H(X,Y)
H_XY = -np.sum(np.where(P_XY > 0, P_XY * np.log2(P_XY), 0))
# Conditional entropies
H_X_given_Y = H_XY - H_Y # H(X|Y) = H(X,Y) - H(Y)
H_Y_given_X = H_XY - H_X # H(Y|X) = H(X,Y) - H(X)
# Mutual information
I_XY = H_X + H_Y - H_XY # I(X;Y) = H(X) + H(Y) - H(X,Y)
return {
'H_X': H_X,
'H_Y': H_Y,
'H_XY': H_XY,
'H_X_given_Y': H_X_given_Y,
'H_Y_given_X': H_Y_given_X,
'I_XY': I_XY
}
# Example: Binary Symmetric Channel Analysis
if __name__ == "__main__":
p = 0.1 # Crossover probability
# Joint probability matrix for BSC with uniform input
P_XY = np.array([
[0.5*(1-p), 0.5*p], # P(X=0,Y=0), P(X=0,Y=1)
[0.5*p, 0.5*(1-p)] # P(X=1,Y=0), P(X=1,Y=1)
])
results = joint_conditional_entropy(P_XY)
print(f"===== Binary Symmetric Channel Analysis (p={p:.2f}) =====")
print(f"H(X) = {results['H_X']:.4f} bits")
print(f"H(Y) = {results['H_Y']:.4f} bits")
print(f"H(X,Y) = {results['H_XY']:.4f} bits")
print(f"H(X|Y) = {results['H_X_given_Y']:.4f} bits")
print(f"H(Y|X) = {results['H_Y_given_X']:.4f} bits")
print(f"I(X;Y) = {results['I_XY']:.4f} bits")
โ Key Relationships to Remember
- Chain Rule: H(X,Y) = H(X) + H(Y|X) = H(Y) + H(X|Y)
- Conditioning Reduces Entropy: H(X|Y) โค H(X)
- Mutual Information: I(X;Y) = H(X) - H(X|Y) โฅ 0
- Maximum Entropy: Uniform distribution maximizes entropy
๐ฆ Source Coding Fundamentals CLO-2
๐ Source Coding Theorem
For a discrete memoryless source with entropy H(X), the average codeword length L satisfies:
H(X) โค L < H(X) + 1
Kraft Inequality Checker
import numpy as np
def kraft_inequality(lengths, r=2):
"""
Check if codeword lengths satisfy Kraft inequality.
Parameters:
lengths: list of codeword lengths
r: radix/alphabet size (default: 2 for binary)
Returns:
tuple: (K, is_valid) - Kraft sum and validity
"""
lengths = np.array(lengths)
K = np.sum(r ** (-lengths))
is_valid = K <= 1
print("===== Kraft Inequality Check =====")
print(f"Codeword lengths: {lengths}")
print(f"Radix r = {r}")
print(f"Kraft sum K = {K:.4f}")
if is_valid:
print("โ K โค 1: A prefix-free code EXISTS")
if K < 1:
print(" (Code can be improved - K < 1)")
else:
print("โ K > 1: NO prefix-free code exists")
return K, is_valid
# Examples
if __name__ == "__main__":
print("Example 1: Valid code")
kraft_inequality([1, 2, 3, 3])
print("\nExample 2: Invalid lengths")
kraft_inequality([1, 1, 2, 2])
print("\nExample 3: Optimal code (K = 1)")
kraft_inequality([2, 2, 2, 2])
Average Codeword Length
import numpy as np
def avg_codeword_length(P, lengths):
"""
Calculate average codeword length and efficiency.
Parameters:
P: probability distribution
lengths: codeword lengths for each symbol
Returns:
tuple: (L_avg, efficiency, redundancy)
"""
P = np.array(P)
lengths = np.array(lengths)
# Average codeword length
L_avg = np.sum(P * lengths)
# Entropy
H = calculate_entropy(P)
# Efficiency and redundancy
efficiency = (H / L_avg) * 100 # Percentage
redundancy = L_avg - H
print("===== Source Coding Analysis =====")
print(f"Source Entropy H(X) = {H:.4f} bits")
print(f"Average Length L = {L_avg:.4f} bits")
print(f"Coding Efficiency = {efficiency:.2f}%")
print(f"Redundancy = {redundancy:.4f} bits")
return L_avg, efficiency, redundancy
# Example
if __name__ == "__main__":
symbols = ['A', 'B', 'C', 'D']
P = [0.5, 0.25, 0.125, 0.125]
lengths = [1, 2, 3, 3]
codes = ['0', '10', '110', '111']
print("Symbol | Prob | Length | Code")
print("-------|-------|--------|------")
for s, p, l, c in zip(symbols, P, lengths, codes):
print(f" {s} | {p:.3f} | {l} | {c}")
print()
avg_codeword_length(P, lengths)
๐ณ Huffman Coding CLO-2
๐ Huffman Algorithm
Huffman coding produces an optimal prefix-free code:
- Sort symbols by probability (ascending)
- Combine two lowest probability symbols
- Repeat until single node remains
- Assign bits by traversing tree (0=left, 1=right)
Huffman Coding Implementation
import heapq
from collections import defaultdict
import numpy as np
class HuffmanNode:
"""Node for Huffman tree."""
def __init__(self, symbol=None, prob=0):
self.symbol = symbol
self.prob = prob
self.left = None
self.right = None
def __lt__(self, other):
return self.prob < other.prob
def huffman_encode(symbols, probabilities):
"""
Generate Huffman codes for given symbols.
Parameters:
symbols: list of symbol names
probabilities: list of probabilities
Returns:
dict: symbol -> code mapping
"""
# Create leaf nodes and add to heap
heap = []
for s, p in zip(symbols, probabilities):
node = HuffmanNode(s, p)
heapq.heappush(heap, node)
# Build Huffman tree
while len(heap) > 1:
# Pop two smallest nodes
left = heapq.heappop(heap)
right = heapq.heappop(heap)
# Create parent node
parent = HuffmanNode(prob=left.prob + right.prob)
parent.left = left
parent.right = right
heapq.heappush(heap, parent)
# Generate codes by traversing tree
root = heap[0]
codes = {}
def traverse(node, code=""):
if node.symbol is not None:
codes[node.symbol] = code if code else "0"
else:
if node.left:
traverse(node.left, code + "0")
if node.right:
traverse(node.right, code + "1")
traverse(root)
return codes
def huffman_analysis(symbols, probabilities):
"""Perform complete Huffman coding analysis."""
codes = huffman_encode(symbols, probabilities)
# Calculate statistics
lengths = [len(codes[s]) for s in symbols]
P = np.array(probabilities)
L_avg = np.sum(P * lengths)
H = -np.sum(P * np.log2(P))
efficiency = (H / L_avg) * 100
print("===== Huffman Coding Results =====")
print("Symbol | Prob | Length | Code")
print("-------|--------|--------|----------")
for s, p in zip(symbols, probabilities):
print(f" {s} | {p:.4f} | {len(codes[s])} | {codes[s]}")
print(f"\nEntropy H(X) = {H:.4f} bits")
print(f"Average Length L = {L_avg:.4f} bits")
print(f"Efficiency = {efficiency:.2f}%")
return codes
# Example
if __name__ == "__main__":
symbols = ['A', 'B', 'C', 'D', 'E']
probabilities = [0.4, 0.2, 0.2, 0.1, 0.1]
codes = huffman_analysis(symbols, probabilities)
Huffman Encoding/Decoding Functions
def huffman_encode_message(message, codes):
"""Encode a message using Huffman codes."""
return ''.join(codes[char] for char in message if char in codes)
def huffman_decode_message(bitstring, codes):
"""Decode a Huffman-encoded bitstring."""
# Create reverse lookup
reverse_codes = {v: k for k, v in codes.items()}
decoded = []
buffer = ""
for bit in bitstring:
buffer += bit
if buffer in reverse_codes:
decoded.append(reverse_codes[buffer])
buffer = ""
if buffer:
print(f"Warning: Incomplete codeword remaining: {buffer}")
return ''.join(decoded)
# Example
if __name__ == "__main__":
symbols = ['A', 'B', 'C', 'D']
probs = [0.5, 0.25, 0.125, 0.125]
codes = huffman_encode(symbols, probs)
message = "ABACADA"
print(f"Original message: {message}")
encoded = huffman_encode_message(message, codes)
print(f"Encoded: {encoded} ({len(encoded)} bits)")
decoded = huffman_decode_message(encoded, codes)
print(f"Decoded: {decoded}")
# Compression ratio
original_bits = len(message) * 2 # Fixed 2-bit encoding
compression = (1 - len(encoded)/original_bits) * 100
print(f"Compression: {original_bits} bits โ {len(encoded)} bits ({compression:.1f}% reduction)")
๐ Lempel-Ziv Coding CLO-2
๐ LZ78 Algorithm
Dictionary-based compression that builds a codebook dynamically:
- Start with empty dictionary
- Read input until finding longest match in dictionary
- Output (index, next_symbol) pair
- Add new phrase to dictionary
LZ78 Implementation
def lz78_encode(input_string):
"""
Encode a string using LZ78 algorithm.
Parameters:
input_string: string to encode
Returns:
tuple: (output pairs, dictionary)
"""
dictionary = {'': 0} # Empty string has index 0
output = []
i = 0
next_index = 1
print("===== LZ78 Encoding Process =====")
print("Step | Current | Match | Output | New Entry")
print("-----|---------|-------|--------|----------")
step = 0
while i < len(input_string):
# Find longest match
match = ''
match_idx = 0
for j in range(i, len(input_string) + 1):
substring = input_string[i:j]
if substring in dictionary:
match = substring
match_idx = dictionary[substring]
else:
break
# Get next symbol
next_pos = i + len(match)
next_symbol = input_string[next_pos] if next_pos < len(input_string) else ''
# Output pair
output.append((match_idx, next_symbol))
# Add new phrase to dictionary
if next_symbol:
new_phrase = match + next_symbol
dictionary[new_phrase] = next_index
next_index += 1
step += 1
current = input_string[i:next_pos + (1 if next_symbol else 0)]
dict_entries = [k for k in dictionary.keys() if k]
print(f" {step:2d} | {current:<5} | {match_idx} | ({match_idx},{next_symbol or 'ฮต'}) | {dict_entries}")
i = next_pos + (1 if next_symbol else 0)
return output, dictionary
def lz78_decode(encoded):
"""Decode LZ78 encoded data."""
dictionary = {0: ''}
decoded = ''
next_index = 1
for idx, symbol in encoded:
phrase = dictionary[idx] + symbol
decoded += phrase
if symbol:
dictionary[next_index] = phrase
next_index += 1
return decoded
# Example
if __name__ == "__main__":
input_string = "AABABAABAB"
print(f"Input: {input_string}\n")
encoded, dictionary = lz78_encode(input_string)
print(f"\nEncoded: {encoded}")
decoded = lz78_decode(encoded)
print(f"Decoded: {decoded}")
print(f"Match: {input_string == decoded}")
๐ก Channel Capacity CLO-3
๐ Shannon's Channel Capacity
Binary Symmetric Channel (BSC):
C = 1 - H(p) = 1 + p logโ(p) + (1-p) logโ(1-p)
AWGN Channel:
C = W logโ(1 + SNR) bits/second
Binary Symmetric Channel Capacity
import numpy as np
import matplotlib.pyplot as plt
def bsc_capacity(p):
"""
Calculate capacity of Binary Symmetric Channel.
Parameters:
p: crossover probability (error probability)
Returns:
C: channel capacity in bits per channel use
"""
if p == 0 or p == 1:
return 1.0 # Perfect channel
H_p = -p * np.log2(p) - (1-p) * np.log2(1-p)
return 1 - H_p
# Plot BSC Capacity
if __name__ == "__main__":
p_values = np.linspace(0.001, 0.5, 500)
C_values = [bsc_capacity(p) for p in p_values]
plt.figure(figsize=(10, 6))
plt.plot(p_values, C_values, 'b-', linewidth=2)
plt.xlabel('Crossover Probability p', fontsize=12)
plt.ylabel('Channel Capacity C (bits/use)', fontsize=12)
plt.title('Binary Symmetric Channel Capacity', fontsize=14)
plt.grid(True, alpha=0.3)
plt.xlim([0, 0.5])
plt.ylim([0, 1])
# Mark specific point
p_mark = 0.1
C_mark = bsc_capacity(p_mark)
plt.plot(p_mark, C_mark, 'ro', markersize=8)
plt.annotate(f'C({p_mark}) = {C_mark:.3f}',
xy=(p_mark, C_mark), xytext=(p_mark+0.05, C_mark+0.05))
plt.tight_layout()
plt.savefig('bsc_capacity.png', dpi=150)
plt.show()
AWGN Channel Capacity
import numpy as np
import matplotlib.pyplot as plt
def awgn_capacity(SNR_dB, W=1):
"""
Calculate capacity of AWGN channel.
Parameters:
SNR_dB: Signal-to-noise ratio in dB
W: Bandwidth in Hz (default: 1 for normalized)
Returns:
C: Channel capacity in bits/second
"""
SNR_linear = 10 ** (SNR_dB / 10)
return W * np.log2(1 + SNR_linear)
# Plot AWGN Capacity
if __name__ == "__main__":
SNR_dB = np.linspace(-10, 30, 500)
C_values = [awgn_capacity(snr) for snr in SNR_dB]
plt.figure(figsize=(10, 6))
plt.plot(SNR_dB, C_values, 'b-', linewidth=2)
plt.xlabel('SNR (dB)', fontsize=12)
plt.ylabel('Channel Capacity C (bits/s/Hz)', fontsize=12)
plt.title('AWGN Channel Capacity (Shannon Limit)', fontsize=14)
plt.grid(True, alpha=0.3)
# Mark reference points
for snr in [0, 10, 20]:
cap = awgn_capacity(snr)
plt.plot(snr, cap, 'ro', markersize=8)
plt.annotate(f'{cap:.2f}', xy=(snr, cap), xytext=(snr+1, cap+0.2))
plt.tight_layout()
plt.savefig('awgn_capacity.png', dpi=150)
plt.show()
# Print table
print("\n===== AWGN Channel Capacity Table =====")
print("SNR (dB) | SNR (linear) | Capacity (bits/s/Hz)")
print("---------|--------------|---------------------")
for snr in [-10, -5, 0, 5, 10, 15, 20, 25, 30]:
snr_lin = 10 ** (snr / 10)
cap = awgn_capacity(snr)
print(f" {snr:4d} | {snr_lin:7.2f} | {cap:.4f}")
๐ฒ Linear Block Codes CLO-4
๐ Linear Block Code Basics
An (n, k) linear block code encodes k information bits into n coded bits.
Generator Matrix G: c = m ร G
Parity-Check Matrix H: c ร HT = 0 for valid codewords
Syndrome: s = r ร HT (non-zero indicates errors)
Hamming (7,4) Code Implementation
import numpy as np
class HammingCode:
"""Hamming (7,4) Code implementation."""
def __init__(self):
# Generator matrix (systematic form)
self.G = np.array([
[1, 0, 0, 0, 1, 1, 0],
[0, 1, 0, 0, 1, 0, 1],
[0, 0, 1, 0, 0, 1, 1],
[0, 0, 0, 1, 1, 1, 1]
])
# Parity-check matrix
self.H = np.array([
[1, 1, 0, 1, 1, 0, 0],
[1, 0, 1, 1, 0, 1, 0],
[0, 1, 1, 1, 0, 0, 1]
])
self.n = 7 # Codeword length
self.k = 4 # Message length
def encode(self, message):
"""Encode a 4-bit message to 7-bit codeword."""
message = np.array(message)
codeword = np.dot(message, self.G) % 2
return codeword
def syndrome(self, received):
"""Calculate syndrome of received word."""
received = np.array(received)
s = np.dot(received, self.H.T) % 2
return s
def decode(self, received):
"""Decode received word with error correction."""
received = np.array(received).copy()
s = self.syndrome(received)
if np.all(s == 0):
print("No error detected.")
return received
# Find error position from syndrome
error_pos = int(''.join(map(str, s)), 2)
if error_pos > 0 and error_pos <= self.n:
print(f"Error detected at position {error_pos}, correcting...")
received[error_pos - 1] ^= 1
return received
def get_message(self, codeword):
"""Extract message bits from codeword (systematic)."""
return codeword[:self.k]
# Example usage
if __name__ == "__main__":
hamming = HammingCode()
print("===== Hamming (7,4) Code =====\n")
print("Generator Matrix G:")
print(hamming.G)
print("\nParity-Check Matrix H:")
print(hamming.H)
# Verify G ร H^T = 0
GHt = np.dot(hamming.G, hamming.H.T) % 2
print("\nVerification G ร H^T (should be zeros):")
print(GHt)
# Encode a message
message = [1, 0, 1, 1]
print(f"\nOriginal message: {message}")
codeword = hamming.encode(message)
print(f"Encoded codeword: {codeword}")
# Introduce error
error_pos = 3
received = codeword.copy()
received[error_pos - 1] ^= 1
print(f"\nReceived (error at pos {error_pos}): {received}")
# Decode
corrected = hamming.decode(received)
print(f"Corrected: {corrected}")
print(f"Matches original: {np.array_equal(corrected, codeword)}")
Generate All Codewords
import numpy as np
from itertools import product
def generate_all_codewords(hamming):
"""Generate all 16 codewords of Hamming (7,4) code."""
print("\n===== All 16 Codewords =====")
print("Message | Codeword | Weight")
print("--------|-------------|-------")
weights = []
for bits in product([0, 1], repeat=4):
message = list(bits)
codeword = hamming.encode(message)
weight = np.sum(codeword)
weights.append(weight)
msg_str = ''.join(map(str, message))
cw_str = ''.join(map(str, codeword))
print(f" {msg_str} | {cw_str} | {weight}")
# Minimum distance (excluding zero codeword)
d_min = min(w for w in weights if w > 0)
print(f"\nMinimum distance d_min = {d_min}")
print(f"Error correction capability: {(d_min-1)//2} error(s)")
if __name__ == "__main__":
hamming = HammingCode()
generate_all_codewords(hamming)
โป๏ธ Cyclic Codes CLO-4
๐ Cyclic Code Properties
A cyclic code is a linear block code where any cyclic shift of a codeword is also a codeword.
Generator Polynomial g(x): Divides xn + 1
CRC Implementation
import numpy as np
def crc_encode(data, generator):
"""
Generate CRC codeword.
Parameters:
data: input data bits (list)
generator: CRC generator polynomial coefficients
Returns:
tuple: (codeword, fcs)
"""
data = list(data)
generator = list(generator)
r = len(generator) - 1 # CRC bits
# Append r zeros to data
augmented = data + [0] * r
# Perform polynomial division (XOR-based)
temp = augmented.copy()
for i in range(len(data)):
if temp[i] == 1:
for j in range(len(generator)):
temp[i + j] ^= generator[j]
# FCS is the remainder
fcs = temp[-r:]
# Codeword = data + FCS
codeword = data + fcs
return codeword, fcs
def crc_check(received, generator):
"""Verify received codeword."""
received = list(received)
generator = list(generator)
r = len(generator) - 1
temp = received.copy()
for i in range(len(received) - r):
if temp[i] == 1:
for j in range(len(generator)):
temp[i + j] ^= generator[j]
remainder = temp[-r:]
return all(b == 0 for b in remainder)
# Example
if __name__ == "__main__":
data = [1, 1, 0, 1, 0, 1, 1]
generator = [1, 0, 0, 1, 1] # x^4 + x + 1
print("===== CRC Example =====")
print(f"Data: {''.join(map(str, data))}")
print(f"Generator (CRC-4): {''.join(map(str, generator))} (xโด + x + 1)")
codeword, fcs = crc_encode(data, generator)
print(f"FCS (CRC): {''.join(map(str, fcs))}")
print(f"Transmitted: {''.join(map(str, codeword))}")
# Verify
is_valid = crc_check(codeword, generator)
print(f"Verification (no error): {is_valid}")
# Introduce error
received_err = codeword.copy()
received_err[3] ^= 1
is_valid_err = crc_check(received_err, generator)
print(f"Verification (with error): {is_valid_err}")
๐ Convolutional Codes CLO-5
๐ Convolutional Code Parameters
Rate: R = k/n (k input bits, n output bits)
Constraint Length: K (memory + 1)
Generator Polynomials: Define connections to output
Rate 1/2 Convolutional Encoder
import numpy as np
class ConvolutionalEncoder:
"""Rate 1/2 Convolutional Encoder."""
def __init__(self, g1, g2, K):
"""
Initialize encoder.
Parameters:
g1, g2: generator polynomials (lists)
K: constraint length
"""
self.g1 = np.array(g1)
self.g2 = np.array(g2)
self.K = K
self.memory = K - 1
def encode(self, input_bits):
"""Encode input bits."""
input_bits = np.array(input_bits)
n_bits = len(input_bits)
# Initialize state
state = np.zeros(self.memory, dtype=int)
output = np.zeros(2 * n_bits, dtype=int)
for i, bit in enumerate(input_bits):
# Register contents
reg = np.concatenate([[bit], state])
# Calculate outputs
out1 = np.sum(reg * self.g1) % 2
out2 = np.sum(reg * self.g2) % 2
output[2*i] = out1
output[2*i + 1] = out2
# Shift register
state = np.concatenate([[bit], state[:-1]])
return output
def print_state_table(self):
"""Print state transition table."""
print("===== State Transition Table =====")
print("Current | Input | Next | Output")
print("--------|-------|-------|-------")
num_states = 2 ** self.memory
for s in range(num_states):
state = [(s >> i) & 1 for i in range(self.memory-1, -1, -1)]
for in_bit in [0, 1]:
reg = [in_bit] + state
out1 = sum(r * g for r, g in zip(reg, self.g1)) % 2
out2 = sum(r * g for r, g in zip(reg, self.g2)) % 2
next_state = [in_bit] + state[:-1]
next_dec = sum(b << (self.memory-1-i) for i, b in enumerate(next_state))
state_str = ''.join(map(str, state))
next_str = ''.join(map(str, next_state))
print(f" {state_str} | {in_bit} | {next_str} | {out1}{out2}")
# Example
if __name__ == "__main__":
# Standard rate 1/2, K=3 encoder
g1 = [1, 1, 1] # Octal 7
g2 = [1, 0, 1] # Octal 5
K = 3
encoder = ConvolutionalEncoder(g1, g2, K)
print("===== Rate 1/2, K=3 Convolutional Encoder =====")
print(f"Generator 1: {g1} (octal 7)")
print(f"Generator 2: {g2} (octal 5)")
input_bits = [1, 0, 1, 1, 0]
print(f"\nInput: {''.join(map(str, input_bits))}")
output = encoder.encode(input_bits)
print(f"Output: {''.join(map(str, output))}")
print()
encoder.print_state_table()
๐ฏ Viterbi Algorithm CLO-5
๐ Viterbi Decoding
Maximum likelihood sequence decoding algorithm:
- Initialize: Set initial path metrics
- Branch Metrics: Calculate Hamming distance
- Add-Compare-Select: Keep best path to each state
- Traceback: Find path with minimum metric
Viterbi Decoder Implementation
import numpy as np
class ViterbiDecoder:
"""Viterbi decoder for rate 1/2 convolutional codes."""
def __init__(self, g1, g2, K):
self.g1 = np.array(g1)
self.g2 = np.array(g2)
self.K = K
self.memory = K - 1
self.num_states = 2 ** self.memory
def _get_output(self, state, input_bit):
"""Get expected output for given state and input."""
state_bits = [(state >> i) & 1 for i in range(self.memory-1, -1, -1)]
reg = np.array([input_bit] + state_bits)
out1 = np.sum(reg * self.g1) % 2
out2 = np.sum(reg * self.g2) % 2
return out1, out2
def _get_next_state(self, state, input_bit):
"""Get next state for given current state and input."""
state_bits = [(state >> i) & 1 for i in range(self.memory-1, -1, -1)]
next_bits = [input_bit] + state_bits[:-1]
return sum(b << (self.memory-1-i) for i, b in enumerate(next_bits))
def decode(self, received):
"""Decode received sequence using Viterbi algorithm."""
received = np.array(received)
n_branches = len(received) // 2
# Initialize path metrics
path_metrics = np.full(self.num_states, np.inf)
path_metrics[0] = 0 # Start at state 0
# Store survivors
survivors = np.zeros((self.num_states, n_branches), dtype=int)
# Forward pass
for t in range(n_branches):
r1, r2 = received[2*t], received[2*t + 1]
new_metrics = np.full(self.num_states, np.inf)
for curr_state in range(self.num_states):
if path_metrics[curr_state] == np.inf:
continue
for in_bit in [0, 1]:
exp1, exp2 = self._get_output(curr_state, in_bit)
branch_metric = (r1 != exp1) + (r2 != exp2)
next_state = self._get_next_state(curr_state, in_bit)
total_metric = path_metrics[curr_state] + branch_metric
if total_metric < new_metrics[next_state]:
new_metrics[next_state] = total_metric
survivors[next_state, t] = curr_state * 2 + in_bit
path_metrics = new_metrics
# Traceback
final_state = np.argmin(path_metrics)
final_metric = path_metrics[final_state]
decoded = np.zeros(n_branches, dtype=int)
current_state = final_state
for t in range(n_branches - 1, -1, -1):
survivor_info = survivors[current_state, t]
prev_state = survivor_info // 2
input_bit = survivor_info % 2
decoded[t] = input_bit
current_state = prev_state
return decoded, int(final_metric)
# Example
if __name__ == "__main__":
g1 = [1, 1, 1]
g2 = [1, 0, 1]
K = 3
encoder = ConvolutionalEncoder(g1, g2, K)
decoder = ViterbiDecoder(g1, g2, K)
# Original message (with tail bits)
message = np.array([1, 0, 1, 1, 0, 0])
print("===== Viterbi Decoder Test =====")
print(f"Original message: {''.join(map(str, message))}")
# Encode
encoded = encoder.encode(message)
print(f"Encoded: {''.join(map(str, encoded))}")
# Add error
received = encoded.copy()
received[5] ^= 1 # Flip bit
print(f"Received (with error): {''.join(map(str, received))}")
# Decode
decoded, metric = decoder.decode(received)
print(f"Decoded: {''.join(map(str, decoded))}")
print(f"Path metric: {metric}")
print(f"Decoding successful: {np.array_equal(decoded, message)}")
๐ BER Performance Simulation CLO-6
Complete BER Simulation
import numpy as np
import matplotlib.pyplot as plt
from scipy.special import erfc
def ber_simulation():
"""Compare coded vs uncoded BER performance."""
# Parameters
g1 = [1, 1, 1]
g2 = [1, 0, 1]
K = 3
num_bits = 10000
EbN0_dB = np.arange(0, 10.5, 0.5)
num_trials = 5
encoder = ConvolutionalEncoder(g1, g2, K)
decoder = ViterbiDecoder(g1, g2, K)
# Results storage
ber_uncoded = np.zeros_like(EbN0_dB)
ber_coded = np.zeros_like(EbN0_dB)
print("===== BER Simulation =====")
for idx, snr_db in enumerate(EbN0_dB):
errors_uncoded = 0
errors_coded = 0
total_bits = 0
for trial in range(num_trials):
# Generate random bits
bits = np.random.randint(0, 2, num_bits)
bits_tail = np.concatenate([bits, np.zeros(K-1, dtype=int)])
# Encode
coded = encoder.encode(bits_tail)
# BPSK modulation (0 -> +1, 1 -> -1)
tx_uncoded = 2 * bits - 1
tx_coded = 2 * coded - 1
# Channel (AWGN)
EbN0_linear = 10 ** (snr_db / 10)
# Uncoded
noise_var_uncoded = 1 / (2 * EbN0_linear)
noise_uncoded = np.sqrt(noise_var_uncoded) * np.random.randn(len(tx_uncoded))
rx_uncoded = tx_uncoded + noise_uncoded
# Coded (rate 1/2)
noise_var_coded = 1 / (2 * EbN0_linear * 0.5)
noise_coded = np.sqrt(noise_var_coded) * np.random.randn(len(tx_coded))
rx_coded = tx_coded + noise_coded
# Hard decision
decoded_uncoded = (rx_uncoded < 0).astype(int)
received_hard = (rx_coded < 0).astype(int)
# Viterbi decoding
decoded_coded, _ = decoder.decode(received_hard)
# Count errors
errors_uncoded += np.sum(bits != decoded_uncoded)
errors_coded += np.sum(bits != decoded_coded[:num_bits])
total_bits += num_bits
ber_uncoded[idx] = errors_uncoded / total_bits
ber_coded[idx] = errors_coded / total_bits
print(f"Eb/N0 = {snr_db:4.1f} dB: BER_coded = {ber_coded[idx]:.2e}")
# Theoretical BER for uncoded BPSK
ber_theory = 0.5 * erfc(np.sqrt(10 ** (EbN0_dB / 10)))
# Plot results
plt.figure(figsize=(10, 7))
plt.semilogy(EbN0_dB, ber_theory, 'k-', linewidth=2, label='Uncoded BPSK (Theory)')
plt.semilogy(EbN0_dB, ber_uncoded, 'bo-', linewidth=1.5, markersize=6, label='Uncoded BPSK (Sim)')
plt.semilogy(EbN0_dB, ber_coded, 'rs-', linewidth=1.5, markersize=6, label=f'Conv Code R=1/2, K={K} (Sim)')
plt.xlabel('$E_b/N_0$ (dB)', fontsize=12)
plt.ylabel('Bit Error Rate (BER)', fontsize=12)
plt.title('BER Performance: Coded vs Uncoded BPSK', fontsize=14)
plt.legend(loc='lower left', fontsize=11)
plt.grid(True, which='both', alpha=0.3)
plt.xlim([min(EbN0_dB), max(EbN0_dB)])
plt.ylim([1e-6, 1])
plt.tight_layout()
plt.savefig('ber_comparison.png', dpi=150)
plt.show()
if __name__ == "__main__":
ber_simulation()
โ ๏ธ Simulation Tips
- Use enough bits for statistical significance (โฅ10,000)
- Run multiple trials and average results
- Account for code rate when calculating noise variance
- Use NumPy vectorization for speed
๐ก Tips & Best Practices
Python vs MATLAB Equivalents
| MATLAB | Python (NumPy) |
|---|---|
zeros(m, n) | np.zeros((m, n)) |
ones(m, n) | np.ones((m, n)) |
eye(n) | np.eye(n) |
A' (transpose) | A.T |
A * B (matrix) | np.dot(A, B) or A @ B |
A .* B (element) | A * B |
mod(A, 2) | A % 2 |
de2bi(n, k) | [(n >> i) & 1 for i in range(k-1,-1,-1)] |
bi2de(b) | int(''.join(map(str, b)), 2) |
Useful Libraries
NumPy
np.array()- Create arraysnp.dot()- Matrix multiplynp.sum()- Sum elementsnp.log2()- Log base 2
Matplotlib
plt.plot()- Line plotsplt.semilogy()- Log y-axisplt.xlabel()- Axis labelsplt.legend()- Add legend
SciPy
special.erfc()- Complementary errorsignal.convolve()- Convolutionlinalg.inv()- Matrix inverse
Built-in
heapq- Priority queuescollections- Defaultdictitertools- Combinatorics
โ Assignment Submission Checklist
- โ Header with name, ID, assignment number
- โ Code runs without errors
- โ All imports at the top
- โ Functions have docstrings
- โ Figures have titles, labels, and legends
- โ Results printed clearly with f-strings
- โ Code follows PEP 8 style guide
๐ Additional Resources
- NumPy Documentation: numpy.org/doc/
- Matplotlib: matplotlib.org/tutorials/
- SciPy: scipy.org
- Textbook: Simon Haykin, Communication Systems, 4th Ed.
- YouTube: Dr. Al Bataineh's lecture videos
Need Help?
Contact Dr. Al Bataineh during office hours (Tuesday/Thursday 11:00 AM - 12:00 PM)
Email: [email protected] | Office: F1-1175