Secure applications and infrastructure for Toronto's financial and government sectors
Duration: 16-20 weeks (self-paced)
Level: Intermediate to Advanced
Prerequisites: Networking fundamentals, Linux/Windows administration, basic programming
#!/usr/bin/env python3
"""
Toronto Banking Security Assessment Tool
Compliance with Canadian banking regulations
"""
import json
import socket
import ssl
import subprocess
import time
from datetime import datetime, timedelta

import requests
class TorontoBankingSecurityAssessment:
    """Run external security checks against one host and collect the results.

    Every ``check_*`` method performs a single probe, appends a result dict to
    ``self.results["checks"]`` and also returns it.  A probe that cannot be
    completed (network failure, invalid certificate, ...) is recorded with
    status ``"ERROR"`` instead of raising, so one failing check never aborts
    the rest of a run.
    """

    # A certificate with this many days (or fewer) left fails the TLS check.
    CERT_EXPIRY_THRESHOLD_DAYS = 30
    # Oldest TLS protocol version this tool accepts, as (major, minor).
    MIN_TLS_VERSION = (1, 2)
    # A response taking this many seconds or longer is flagged for review.
    SLOW_RESPONSE_SECONDS = 2.0
    # Timeout, in seconds, applied to every network operation.
    REQUEST_TIMEOUT = 10

    # Phrases searched for (case-insensitively) in the privacy policy text.
    PIPEDA_INDICATORS = (
        "personal information protection",
        "electronic documents act",
        "pipeda",
        "privacy officer",
        "data retention policy",
        "cross-border data transfer",
        "consent to collect",
        "right to access",
        "right to correction",
    )

    # Response headers the header check expects, with a short description.
    REQUIRED_HEADERS = {
        "strict-transport-security": "HSTS protection",
        "x-frame-options": "Clickjacking protection",
        "x-content-type-options": "MIME type sniffing protection",
        "x-xss-protection": "XSS protection",
        "content-security-policy": "CSP protection",
        "referrer-policy": "Referrer policy",
        "permissions-policy": "Feature policy",
    }

    # Response headers whose presence suggests a CDN sits in front of the host.
    CDN_HEADERS = (
        "cf-ray",  # Cloudflare
        "x-amz-cf-id",  # AWS CloudFront
        "x-azure-ref",  # Azure CDN
        "x-cache",  # Generic CDN
    )

    def __init__(self, target_host, compliance_level="OSFI"):
        """Prepare an assessment of *target_host*.

        Args:
            target_host: Host name to probe (no scheme, no path).
            compliance_level: Label stored in the report as the framework
                the assessment is run against.
        """
        self.target_host = target_host
        self.compliance_level = compliance_level
        self.results = {
            "assessment_date": datetime.now().isoformat(),
            "target": target_host,
            "compliance_framework": compliance_level,
            "checks": [],
        }

    def _record(self, check_result):
        """Append *check_result* to the collected checks and return it."""
        self.results["checks"].append(check_result)
        return check_result

    def _record_error(self, check_name, exc):
        """Record a check that could not be completed because of *exc*."""
        return self._record(
            {"check_name": check_name, "status": "ERROR", "error": str(exc)}
        )

    @classmethod
    def _meets_min_tls(cls, version):
        """Return True if *version* (e.g. ``"TLSv1.3"``) is new enough.

        The version is parsed numerically; anything that is not of the form
        ``TLSv<major>[.<minor>]`` (``"SSLv3"``, ``"TLSv1/SSLv3"``, ``None``)
        is treated as too old.
        """
        if not version or not version.startswith("TLSv"):
            return False
        try:
            parsed = tuple(int(part) for part in version[4:].split("."))
        except ValueError:
            return False
        return parsed >= cls.MIN_TLS_VERSION

    def check_ssl_configuration(self):
        """Check the certificate lifetime and TLS version offered on port 443.

        Returns:
            The recorded result dict.  Status is "PASS" only when the
            certificate has more than ``CERT_EXPIRY_THRESHOLD_DAYS`` days left
            and the negotiated protocol meets ``MIN_TLS_VERSION``; otherwise
            "FAIL", or "ERROR" if the probe itself failed.
        """
        check_name = "SSL/TLS Configuration"
        try:
            context = ssl.create_default_context()
            address = (self.target_host, 443)
            with socket.create_connection(address, timeout=self.REQUEST_TIMEOUT) as sock:
                with context.wrap_socket(sock, server_hostname=self.target_host) as ssock:
                    cert = ssock.getpeercert()
                    cipher = ssock.cipher()
                    # version() is the protocol actually negotiated; cipher[1]
                    # only names the protocol that defines the cipher suite.
                    negotiated = ssock.version()

            # notAfter is expressed in GMT, so compare epoch seconds instead
            # of mixing it with a naive local-time datetime.
            expiry_ts = ssl.cert_time_to_seconds(cert["notAfter"])
            days_until_expiry = int((expiry_ts - time.time()) // 86400)

            protocol = negotiated or (cipher[1] if cipher else None)
            osfi_compliant = self._meets_min_tls(protocol)
            certificate_ok = days_until_expiry > self.CERT_EXPIRY_THRESHOLD_DAYS

            check_result = {
                "check_name": check_name,
                "status": "PASS" if certificate_ok and osfi_compliant else "FAIL",
                "details": {
                    "certificate_expiry_days": days_until_expiry,
                    "cipher_suite": cipher[0] if cipher else "Unknown",
                    "protocol_version": protocol or "Unknown",
                    # cipher[2] is the cipher's secret-bit count, not the
                    # size of the certificate's public key.
                    "key_size": cipher[2] if cipher else "Unknown",
                    "osfi_compliant": osfi_compliant,
                },
                "recommendations": self._get_ssl_recommendations(
                    cipher, days_until_expiry, protocol
                ),
            }
        except Exception as e:
            # Deliberately broad: any probe failure becomes an ERROR entry in
            # the report rather than aborting the whole assessment.
            return self._record_error(check_name, e)
        return self._record(check_result)

    def check_canadian_privacy_compliance(self, privacy_policy_url):
        """Score a privacy policy page by the PIPEDA-related phrases it contains.

        This is a keyword heuristic only: the score is the percentage of
        ``PIPEDA_INDICATORS`` found in the page text.

        Args:
            privacy_policy_url: Full URL of the privacy policy page.

        Returns:
            The recorded result dict: "PASS" at a score of 80 or more,
            "REVIEW" below that, "ERROR" if the page could not be fetched.
        """
        check_name = "PIPEDA Compliance Assessment"
        try:
            response = requests.get(privacy_policy_url, timeout=self.REQUEST_TIMEOUT)
            # An HTTP error page is not a privacy policy; surface it as ERROR
            # instead of scoring its text.
            response.raise_for_status()
            content = response.text.lower()
        except Exception as e:
            return self._record_error(check_name, e)

        found_indicators = [i for i in self.PIPEDA_INDICATORS if i in content]
        missing_indicators = [i for i in self.PIPEDA_INDICATORS if i not in content]
        compliance_score = len(found_indicators) / len(self.PIPEDA_INDICATORS) * 100

        return self._record(
            {
                "check_name": check_name,
                "status": "PASS" if compliance_score >= 80 else "REVIEW",
                "details": {
                    "compliance_score": round(compliance_score, 2),
                    "found_indicators": found_indicators,
                    "missing_indicators": missing_indicators,
                },
                "recommendations": self._get_privacy_recommendations(compliance_score),
            }
        )

    def check_security_headers(self):
        """Check which of ``REQUIRED_HEADERS`` the host's HTTPS response sets.

        Returns:
            The recorded result dict: "PASS" when at least 85% of the
            required headers are present, "REVIEW" otherwise, "ERROR" if the
            request failed.
        """
        check_name = "Security Headers Assessment"
        try:
            response = requests.get(
                f"https://{self.target_host}", timeout=self.REQUEST_TIMEOUT
            )
        except Exception as e:
            return self._record_error(check_name, e)

        # requests exposes headers through a case-insensitive mapping, so the
        # lower-case names can be looked up directly.
        headers = response.headers
        found_headers = {}
        missing_headers = {}
        for header, description in self.REQUIRED_HEADERS.items():
            if header in headers:
                found_headers[header] = headers[header]
            else:
                missing_headers[header] = description

        security_score = len(found_headers) / len(self.REQUIRED_HEADERS) * 100
        return self._record(
            {
                "check_name": check_name,
                "status": "PASS" if security_score >= 85 else "REVIEW",
                "details": {
                    "security_score": round(security_score, 2),
                    "found_headers": found_headers,
                    "missing_headers": missing_headers,
                    "osfi_requirement": "Grade A security headers required",
                },
                "recommendations": self._get_header_recommendations(missing_headers),
            }
        )

    def check_cdn_security(self):
        """Look for CDN marker headers and time one HTTPS request to the host.

        The response time is measured from wherever this tool runs; it only
        reflects Toronto latency when the tool is run from Toronto.

        Returns:
            The recorded result dict: "PASS" when a CDN header was seen and
            the request took under ``SLOW_RESPONSE_SECONDS``, "REVIEW"
            otherwise, "ERROR" if the request failed.
        """
        check_name = "CDN and DDoS Protection"
        try:
            # One request serves both purposes: it is timed with a monotonic
            # clock and its headers are inspected afterwards.
            started = time.perf_counter()
            response = requests.get(
                f"https://{self.target_host}", timeout=self.REQUEST_TIMEOUT
            )
            response_time = time.perf_counter() - started
        except Exception as e:
            return self._record_error(check_name, e)

        detected_cdns = [
            {"header": header, "value": response.headers[header]}
            for header in self.CDN_HEADERS
            if header in response.headers
        ]

        if response_time < 1.0:
            performance = "Good"
        elif response_time < self.SLOW_RESPONSE_SECONDS:
            performance = "Acceptable"
        else:
            performance = "Poor"

        fast_enough = response_time < self.SLOW_RESPONSE_SECONDS
        return self._record(
            {
                "check_name": check_name,
                "status": "PASS" if detected_cdns and fast_enough else "REVIEW",
                "details": {
                    "detected_cdns": detected_cdns,
                    "response_time_seconds": round(response_time, 3),
                    "toronto_performance": performance,
                },
                "recommendations": self._get_cdn_recommendations(
                    detected_cdns, response_time
                ),
            }
        )

    def generate_compliance_report(self):
        """Return the collected results plus a summary section.

        The compliance percentage is the share of recorded checks whose
        status is "PASS" (0 when no check has been run); "REVIEW", "FAIL"
        and "ERROR" entries all count against it.
        """
        checks = self.results["checks"]
        total_checks = len(checks)
        passed_checks = sum(1 for check in checks if check["status"] == "PASS")
        compliance_percentage = (
            passed_checks / total_checks * 100 if total_checks > 0 else 0
        )

        return {
            **self.results,
            "summary": {
                "total_checks": total_checks,
                "passed_checks": passed_checks,
                "compliance_percentage": round(compliance_percentage, 2),
                "overall_status": self._get_overall_status(compliance_percentage),
                "next_assessment": (datetime.now() + timedelta(days=90)).isoformat(),
            },
            "canadian_compliance_notes": self._get_canadian_compliance_notes(),
        }

    def _get_ssl_recommendations(self, cipher, days_until_expiry, protocol_version=None):
        """Return remediation advice for the TLS check.

        Args:
            cipher: The ``SSLSocket.cipher()`` tuple, or None.
            days_until_expiry: Whole days until the certificate expires.
            protocol_version: Negotiated protocol name; when omitted, the
                protocol named in *cipher* is used instead.
        """
        recommendations = []
        # Same threshold as the PASS/FAIL decision, so every expiry-related
        # FAIL comes with a recommendation.
        if days_until_expiry <= self.CERT_EXPIRY_THRESHOLD_DAYS:
            recommendations.append("Renew SSL certificate immediately")
        version = protocol_version or (cipher[1] if cipher else None)
        if version and not self._meets_min_tls(version):
            recommendations.append("Upgrade to TLS 1.2 or higher for OSFI compliance")
        return recommendations

    def _get_privacy_recommendations(self, score):
        """Return advice for the privacy check given its percentage *score*."""
        if score < 80:
            return [
                "Review privacy policy for PIPEDA compliance",
                "Consult with Canadian privacy lawyer",
            ]
        return ["Maintain current privacy policy standards"]

    def _get_header_recommendations(self, missing):
        """Return one "Implement ..." line per entry of the *missing* mapping."""
        return [f"Implement {header}: {desc}" for header, desc in missing.items()]

    def _get_cdn_recommendations(self, cdns, response_time):
        """Return advice for the CDN check.

        Args:
            cdns: Detected CDN header entries (empty when none were seen).
            response_time: Measured response time in seconds.
        """
        recommendations = []
        if not cdns:
            recommendations.append("Implement CDN for improved Toronto area performance")
        # ">=" mirrors the status logic, which only passes strictly below the
        # threshold.
        if response_time >= self.SLOW_RESPONSE_SECONDS:
            recommendations.append("Optimize server response time for Canadian users")
        return recommendations

    def _get_overall_status(self, percentage):
        """Map a compliance *percentage* to the report's overall status label."""
        if percentage >= 90:
            return "EXCELLENT - OSFI Ready"
        if percentage >= 80:
            return "GOOD - Minor improvements needed"
        if percentage >= 70:
            return "ACCEPTABLE - Review required"
        return "POOR - Immediate action required"

    def _get_canadian_compliance_notes(self):
        """Return the fixed list of notes appended to every report."""
        return [
            "Assessment based on OSFI cybersecurity guidelines",
            "PIPEDA compliance verification included",
            "Next review recommended in 90 days",
            "Contact Canadian cyber security experts for detailed analysis",
        ]
# Usage example
if __name__ == "__main__":
    # Assess a sample Toronto banking host against the OSFI framework.
    assessor = TorontoBankingSecurityAssessment("secure-bank.toronto.ca", "OSFI")

    # Each check records its own result on the assessor, in call order.
    assessor.check_ssl_configuration()
    assessor.check_canadian_privacy_compliance("https://secure-bank.toronto.ca/privacy")
    assessor.check_security_headers()
    assessor.check_cdn_security()

    # Build the summary report from everything recorded above.
    report = assessor.generate_compliance_report()

    # Persist the report as pretty-printed JSON in a date-stamped file.
    output_path = f"security_assessment_{datetime.now().strftime('%Y%m%d')}.json"
    serialized_report = json.dumps(report, indent=2)
    with open(output_path, "w") as report_file:
        report_file.write(serialized_report)

    print("Security assessment completed for Toronto banking application")
    print(f"Overall compliance: {report['summary']['overall_status']}")
Toronto Healthcare Security Audit - Perform a comprehensive security assessment of a healthcare application handling Ontario patient data with PHIPA compliance requirements.
# Automated penetration testing for Toronto e-commerce
import requests
import subprocess
import json
from urllib.parse import urljoin, urlparse
class TorontoEcommercePenTest:
    """Run a few basic, heuristic web checks against an e-commerce site.

    Findings are appended to ``self.vulnerabilities`` as dicts.  The checks
    are simple signature matches and can produce false positives; only run
    them against systems you are authorised to test.
    """

    # Inputs sent to the search endpoint to provoke database error messages.
    SQL_PAYLOADS = ("'", "1' OR '1'='1", "'; DROP TABLE users; --")
    # Substrings (lower-case) that are taken as a sign of a leaked SQL error.
    SQL_ERROR_MARKERS = ("sql", "mysql", "postgresql", "oracle", "syntax error")
    # Marker script whose unescaped reflection indicates an XSS weakness.
    XSS_PAYLOAD = "<script>alert('XSS')</script>"

    def __init__(self, target_url, timeout=10):
        """Create a tester for *target_url*.

        Args:
            target_url: Base URL of the site under test.
            timeout: Seconds to wait for each HTTP request.
        """
        self.target_url = target_url
        self.timeout = timeout
        self.session = requests.Session()
        self.vulnerabilities = []

    def _search(self, payload):
        """Send *payload* as the ``q`` parameter of the site's search page."""
        # Passing the payload through ``params`` lets requests URL-encode it,
        # so quotes, spaces and semicolons reach the server intact.
        return self.session.get(
            f"{self.target_url.rstrip('/')}/search",
            params={"q": payload},
            timeout=self.timeout,
        )

    def test_sql_injection(self):
        """Test for SQL injection vulnerabilities.

        Sends each entry of ``SQL_PAYLOADS`` to the search endpoint and
        records a finding whenever the response text contains one of
        ``SQL_ERROR_MARKERS``.
        """
        for payload in self.SQL_PAYLOADS:
            response = self._search(payload)
            page_text = response.text.lower()
            if any(marker in page_text for marker in self.SQL_ERROR_MARKERS):
                self.vulnerabilities.append({
                    "type": "SQL Injection",
                    "severity": "High",
                    "url": response.url,
                    "payload": payload,
                })

    def test_xss(self):
        """Test for Cross-Site Scripting vulnerabilities.

        Sends ``XSS_PAYLOAD`` to the search endpoint and records a finding
        when the response echoes it back unescaped.
        """
        # The payload must be non-empty: an empty string is a substring of
        # every response and would flag every page as vulnerable.
        xss_payload = self.XSS_PAYLOAD
        response = self._search(xss_payload)
        if xss_payload in response.text:
            self.vulnerabilities.append({
                "type": "Cross-Site Scripting (XSS)",
                "severity": "Medium",
                "url": response.url,
                "payload": xss_payload,
            })

    def check_canadian_compliance(self):
        """Check for Canadian e-commerce compliance issues.

        Requests a fixed set of pages and records a "Compliance Issue"
        finding for each one that answers with HTTP 404.
        """
        compliance_checks = {
            "privacy_policy": "/privacy",
            "terms_service": "/terms",
            "canadian_pricing": "/checkout",
            "french_language": "/fr",
        }
        for check, path in compliance_checks.items():
            test_url = urljoin(self.target_url, path)
            response = self.session.get(test_url, timeout=self.timeout)
            if response.status_code == 404:
                self.vulnerabilities.append({
                    "type": "Compliance Issue",
                    "severity": "Medium",
                    "issue": f"Missing {check.replace('_', ' ').title()}",
                    "url": test_url,
                })
# Usage: run every check in order against the sample shop, then summarise.
pen_tester = TorontoEcommercePenTest("https://shop.toronto.ca")
for run_check in (
    pen_tester.test_sql_injection,
    pen_tester.test_xss,
    pen_tester.check_canadian_compliance,
):
    run_check()
print(f"Found {len(pen_tester.vulnerabilities)} vulnerabilities")
Government Portal Security Assessment - Conduct authorized penetration testing of a Toronto municipal web portal with a focus on citizen data protection.
# Azure security configuration check for Toronto deployments
from azure.identity import DefaultAzureCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.security import SecurityCenter
class TorontoAzureSecurityAudit:
    """Audit an Azure subscription for data-residency compliance."""

    # Regions accepted by default by the data-residency check.
    CANADIAN_REGIONS = ("Canada Central", "Canada East")

    def __init__(self, subscription_id):
        """Create the Azure credential and security client.

        Args:
            subscription_id: Identifier of the Azure subscription to audit.
        """
        self.credential = DefaultAzureCredential()
        self.subscription_id = subscription_id
        self.security_client = SecurityCenter(
            credential=self.credential,
            subscription_id=subscription_id
        )

    @staticmethod
    def _normalize_region(region):
        """Return *region* lower-cased with spaces removed ("" for None)."""
        return (region or "").lower().replace(" ", "")

    @classmethod
    def _residency_violations(cls, resources, allowed_regions):
        """Return a violation entry for each resource outside *allowed_regions*.

        Both the allowed regions and each resource's location are normalised
        before comparison, so "Canada Central" and "canadacentral" match.
        """
        # Built once, not once per resource.
        allowed = {cls._normalize_region(region) for region in allowed_regions}
        return [
            {
                "resource": resource.name,
                "location": resource.location,
                "compliance_issue": "Data residency violation",
            }
            for resource in resources
            if cls._normalize_region(resource.location) not in allowed
        ]

    def check_canadian_data_residency(self, allowed_regions=None):
        """Ensure all resources are in Canadian regions.

        Args:
            allowed_regions: Region names to accept; defaults to
                ``CANADIAN_REGIONS``.

        Returns:
            A list of dicts, one per resource whose location is not an
            allowed region.  Resources without a location are reported too.
        """
        if allowed_regions is None:
            allowed_regions = self.CANADIAN_REGIONS
        resource_client = ResourceManagementClient(
            self.credential, self.subscription_id
        )
        return self._residency_violations(
            resource_client.resources.list(), allowed_regions
        )

    def assess_storage_encryption(self):
        """Check storage account encryption for PIPEDA compliance.

        Not implemented yet: performs no checks and returns None, which must
        not be read as "no findings".
        """
        # TODO: implement the storage encryption assessment.
        return None

    def verify_network_security(self):
        """Verify network security groups and firewall rules.

        Not implemented yet: performs no checks and returns None, which must
        not be read as "no findings".
        """
        # TODO: implement the network security verification.
        return None
# Usage for Toronto municipal Azure environment: audit the subscription and
# report how many resources sit outside the allowed regions.
auditor = TorontoAzureSecurityAudit(
    "toronto-municipal-subscription-id"
)
residency_violations = auditor.check_canadian_data_residency()
print(
    f"Found {len(residency_violations)} data residency violations"
)
Multi-Cloud Security Posture Assessment - Evaluate security configurations across AWS, Azure, and GCP for a Toronto fintech company with regulatory requirements.
# Toronto Incident Response Playbook
#
# Layout: regulatory requirements, an escalation matrix keyed by severity
# level, external contacts, and a compliance checklist to walk through
# during a breach.
incident_response:
  canadian_requirements:
    # NOTE(review): confirm this deadline against the current PIPEDA breach
    # reporting rules before relying on it; the figure may be an internal
    # target and not a statutory one.
    pipeda_notification: "72 hours for significant breaches"
    provincial_requirements:
      # NOTE(review): verify the PHIPA timing with Ontario counsel.
      ontario: "PHIPA notification within 24 hours"
      # NOTE(review): Bill 64 is presumably what is now cited as Quebec's
      # Law 25 - confirm the current name.
      quebec: "Bill 64 compliance required"
    federal_reporting:
      cyber_centre: "Report to Canadian Centre for Cyber Security"
      privacy_commissioner: "Report to Privacy Commissioner of Canada"

  # Response time, responding team and people to notify for each severity.
  escalation_matrix:
    level_1_low:
      response_time: "4 hours"
      team: ["SOC Analyst", "IT Support"]
      notification: ["IT Manager"]
    level_2_medium:
      response_time: "2 hours"
      team: ["Security Engineer", "Network Admin", "Legal"]
      notification: ["CISO", "Privacy Officer"]
    level_3_high:
      response_time: "1 hour"
      team: ["Incident Commander", "Forensics Expert", "Legal Counsel"]
      notification: ["CEO", "Board", "External Authorities"]
    level_4_critical:
      response_time: "30 minutes"
      team: ["Full Response Team", "External Consultants", "PR Team"]
      notification: ["All Stakeholders", "Media Relations", "Regulators"]

  # Placeholder numbers; replace with real contacts before use.
  toronto_contacts:
    legal: "Toronto Privacy Law Firm: +1-416-XXX-XXXX"
    forensics: "Canadian Digital Forensics: +1-416-XXX-XXXX"
    cyber_insurance: "Cyber Policy Provider: +1-416-XXX-XXXX"
    public_relations: "Crisis Communications: +1-416-XXX-XXXX"

  canadian_compliance_checklist:
    # NOTE(review): same caveat as pipeda_notification - confirm the
    # 72-hour figure.
    - "Notify Privacy Commissioner within 72 hours"
    - "Document all affected personal information"
    - "Assess cross-border data implications"
    - "Prepare breach notification for individuals"
    - "Coordinate with provincial authorities if required"
    - "Engage Canadian legal counsel"
    - "Review cyber insurance coverage"
Toronto Hospital Breach Simulation - Conduct a tabletop exercise simulating a healthcare data breach with PHIPA compliance requirements and multi-stakeholder coordination.
Join the cybersecurity professionals protecting Canada's financial capital and critical infrastructure.