知识清单
基础语法和概念
变量和数据类型
- 基本类型:整数、浮点数、字符串、布尔值
- 复合类型:列表、元组、集合、字典
操作符
- 算术操作符:
+
,-
,*
,/
,//
,%
,**
- 比较操作符:
==
,!=
,>
,<
,>=
,<=
- 逻辑操作符:
and
,or
,not
- 身份操作符:
is
,is not
- 成员操作符:
in
,not in
控制流
- 条件语句:
if
,elif
,else
- 循环:
for
,while
- 循环控制:
break
,continue
,pass
函数
- 定义和调用函数
- 参数和返回值
- 默认参数、关键字参数
- 可变参数 *args & **kwargs
- 匿名函数(lambda 表达式)
- 命名空间和作用域 Namespace and Scope: 在 Python 中,命名空间(Namespace)和作用域(Scope)是两个基本的概念,它们与变量的可见性和生命周期密切相关。
- 闭包 Closure
命名空间 Namespace
命名空间是从名称到对象的一个映射。在 Python 中,不同的命名空间由不同的生命周期管理。例如,函数内的局部命名空间在函数被调用时创建,函数返回或抛出异常时被删除。而全局命名空间在模块被加载时创建,在解释器被关闭时删除。Python 的命名空间可以是嵌套的,允许重用名称而不引起名称冲突。
Python 中的几种命名空间包括
- 局部命名空间: 特定于当前函数或类方法。如果在函数内部声明变量,它们将位于这个局部命名空间内。
- 全局命名空间: 特定于当前模块。在模块级别声明的变量或函数会放入这个全局命名空间。
- 内置命名空间: 包含 Python 的内置对象,比如函数 print()和异常 Exception 等,是全局可访问的。
作用域 Scope
作用域是 Python 程序中可以直接访问命名空间的文本区域。作用域决定了在哪个区域内你可以直接访问一个命名空间中的名称。
Python 中的作用域分为四种,有时被称为 LEGB 规则
- L (Local): 最先被搜索的是局部作用域,也就是函数或方法内部的作用域。
- E (Enclosing): 如果在局部作用域里找不到,会继续搜索包围它的函数的局部作用域,这样的情况通常出现在嵌套函数中。
- G (Global): 如果在局部作用域和外围作用域都找不到,会在当前模块的全局命名空间中搜索。
- B (Built-in): 如果其他作用域都没有找到,最后会在内置命名空间中搜索。
示例 1-访问变量
这个例子展示了如何在不同作用域内访问和修改变量。
# 全局命名空间
x = "global x"
def outer_function():
# 外围作用域
x = "outer x"
def inner_function():
# 局部作用域
x = "inner x"
print(x)
inner_function()
print(x)
outer_function()
print(x)
# === Output ===
# inner x
# outer x
# global x
示例 2-修改作用域变量的值
在此例中,inner
函数通过 nonlocal
关键字修改了 outer
函数的局部变量 x
如果你想在函数内部修改全局作用域的变量,你可以使用 global
关键字。对于嵌套函数中的外围变量,可以使用 nonlocal
关键字。
def outer():
x = "outer x"
def inner():
nonlocal x
x = "inner x"
print("inner:", x)
inner()
print("outer:", x)
outer()
# === Output ===
# inner: inner x
# outer: inner x
了解作用域和命名空间不仅有助于编写结构化和模块化的代码,也是理解 Python 闭包和装饰器等高级概念的基础。
闭包 Closure
闭包是一种特殊的函数结构。它是一个函数内部定义的函数,并且这个内部函数引用了外部函数的局部变量。闭包的特点是即使外部函数已经执行完毕,内部函数仍然能够访问外部函数的局部变量。
特点
- 在一个函数内部定义另一个函数。
- 内部函数引用外部函数的变量。
- 外部函数返回内部函数,这样内部函数可以在外部函数执行完毕后仍然访问其变量。
示例
def outer_function(msg):
message = msg
def inner_function():
print(message)
return inner_function
my_func = outer_function("Hello")
my_func() # 输出 "Hello"
用途
- 数据封装和隐藏: 闭包允许你隐藏数据,只通过闭包内定义的函数来访问它。这是一种封装数据的方法,可以防止外部代码直接访问这些数据。
def make_counter():
count = 0
def counter():
nonlocal count
count += 1
return count
return counter
counter = make_counter()
print(counter()) # 输出 1
print(counter()) # 输出 2
- 保持状态: 闭包可以保持状态。即使外部函数已经执行完毕,闭包内的函数仍然记住外部函数的局部变量的状态。
def make_multiplier(x):
def multiplier(n):
return x * n
return multiplier
double = make_multiplier(2)
triple = make_multiplier(3)
print(double(5)) # 输出 10
print(triple(5)) # 输出 15
- 实现装饰器: 闭包是实现装饰器的关键机制。装饰器可以增加函数的额外功能,而不修改其原始定义。
- 延迟计算(Currying): 闭包允许实现函数的部分应用,即参数的延迟绑定,允许我们延迟计算。
def add(x, y):
return x + y
def make_adder(x):
def adder(y):
return add(x, y)
return adder
add_10 = make_adder(10)
print(add_10(5)) # 输出 15
- 回调函数: 闭包可用于创建回调函数,这在异步编程或事件驱动编程中特别有用。
def bubble_sort(arr, callback=None):
n = len(arr)
for i in range(n):
for j in range(0, n-i-1):
if arr[j] > arr[j+1]:
arr[j], arr[j+1] = arr[j+1], arr[j]
if callback: # 如果存在回调函数,调用它
callback(arr)
def print_callback(arr):
print("Current state of array:", arr)
# 定义数组和回调函数
arr = [64, 34, 25, 12, 22, 11, 90]
# 使用回调函数调用冒泡排序
bubble_sort(arr, print_callback)
好处
- 减少全局变量: 使用闭包可以避免定义全局变量,减少全局变量的使用。
- 增加可读性和维护性: 闭包让你能够将相关的功能组合在一起,增加代码的可读性和维护性。
模块和包
- 导入模块
- 创建自己的模块和包: 创建
__init__.py
文件 - Python 标准库
Python 标准库
Python 的标准库提供了许多有用的模块和函数,它们是 Python 安装的一部分,因此无需额外安装即可使用。
以下是一些常见和重要的 Python 标准库模块的概述:
- 文本处理
- re: 正则表达式操作。
- difflib: 比较序列的帮助库。
- textwrap: 格式化文本段落以适应屏幕宽度。
- 数据结构
- datetime: 处理日期和时间的功能。
- collections: 高性能容器数据类型。
- heapq: 堆队列算法,例如优先队列。
- bisect: 数组二分查找算法。
- array: 高效的数值数组。
- 文件和目录访问
- os: 与操作系统功能交互。
- os.path: 常用路径名操作。
- glob: 文件路径名的模式匹配。
- shutil: 高级文件操作,包括复制和删除。
- 数据持久性
- pickle: 对象序列化。
- json: 读写 JSON 数据。
- sqlite3: SQLite 数据库接口。
- 数据压缩和归档
- zlib: 与 gzip 兼容的压缩。
- gzip: 支持 gzip 文件的工具。
- tarfile: 读写 tar 归档文件。
- 数学
- math: 数学函数。
- random: 生成伪随机数。
- statistics: 数值数据的统计函数。
- 网络通信
- socket: 底层网络接口。
- http: HTTP 模块。
- urllib: URL 处理模块。
- ftplib: FTP 协议客户端。
- smtplib: 发送电子邮件。
- 并发执行
- threading: 基于线程的并行。
- multiprocessing: 基于进程的并行。
- concurrent: 启动并发任务的库。
- subprocess: 运行新进程。
- 用户界面
- tkinter: Tcl/Tk 的 Python 接口,用于创建图形用户界面。
- 开发工具
- unittest: 单元测试框架。
- pdb: Python 调试器。
- timeit: 测量小代码片段的执行时间。
- logging: 记录日志。
- 调试和性能
- cProfile 和 profile: 性能分析。
- time: 时间访问和转换。
- 系统管理
- sys: 访问与 Python 解释器密切相关的变量和函数。
- argparse: 命令行选项、参数和子命令解析器。
高级主题
面向对象编程
装饰器
在 Python 中,装饰器是一种非常强大而且有用的工具,它允许程序员在不修改原始函数代码的情况下,给函数添加额外的功能。装饰器实际上是一个函数,它接受一个函数作为参数并返回一个新的函数。
使用动机
装饰器的主要用途之一是在代码运行时动态地添加功能。你可能想要在函数执行前后打印日志、检查权限、缓存结果等,而不想在每个函数中重复这些代码。装饰器使得你能够把这些常见的逻辑抽象出来,并且可以轻松地重用。
定义装饰器
一个装饰器本身是一个函数,它定义了要附加到原始函数上的包装行为。下面是一个简单的装饰器例子
def my_decorator(func):
def wrapper():
print("Something is happening before the function is called.")
func()
print("Something is happening after the function is called.")
return wrapper
def say_hello():
print("Hello!")
# 使用装饰器
say_hello = my_decorator(say_hello)
当你调用 say_hello()时,它不仅仅只执行 say_hello 函数体内的代码,还会执行 wrapper 函数中的代码。
使用 @ 语法
在 Python 中,有一种更方便的方法来应用装饰器,即使用@符号(语法糖)
@my_decorator
def say_hello():
print("Hello!")
这个@my_decorator 的用法和上面的 say_hello = my_decorator(say_hello)有相同的效果。
装饰器带参数
装饰器也可以接收参数,这样可以提供更灵活的行为。为了实现这一点,你需要再定义一个外层函数来接受这些参数
def decorator_with_args(number):
def my_decorator(func):
def wrapper(*args, **kwargs):
print("Decorator args:", number)
return func(*args, **kwargs)
return wrapper
return my_decorator
@decorator_with_args(42)
def say_hello():
print("Hello!")
在这个例子中,decorator_with_args
就是一个接受参数并返回一个装饰器的函数。
使用 functools.wraps
当你使用装饰器时,原始函数的某些元数据可能会丢失(例如函数的名称、文档字符串等)。为了保留这些元数据,你可以使用 functools
模块中的 wraps
装饰器
from functools import wraps
def my_decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# ...
return func(*args, **kwargs)
return wrapper
使用 wraps
后,装饰后的函数将保留原始函数的元数据。
总结
装饰器是一种使用其他函数来修改函数或类方法的函数。它们允许你在不改变函数源代码的情况下添加新的行为。在 Python 中,装饰器被广泛用于日志记录、性能测试、事务处理、缓存等多种场景。
迭代器和生成器
迭代器 Iterator
迭代器是一个更通用的概念,任何实现了迭代协议(__iter__() 和 __next__() 方法)的对象都是迭代器。
迭代器是遵循迭代协议的对象,即它们包含以下两个方法
- __iter__():返回迭代器对象本身,这使得迭代器同时也是可迭代的。
- __next__():返回迭代器的下一个元素。如果没有元素可以返回,它会抛出 StopIteration 异常。
迭代器允许你一次处理序列中的一个元素,而不是一次性将所有元素加载到内存中。这就是为什么迭代器特别适用于处理大数据集或无限序列。
在 Python 中,你可以使用内置的 iter()函数将可迭代对象(如列表,元组,字符串等)转换为迭代器,然后使用 next()函数来逐个获取元素。
生成器 Generator
生成器是一种特殊的迭代器,它通过函数中的 yield 语句动态产生值,而不需要实现 __iter__() 和 __next__() 方法,因为生成器函数或生成器表达式自动支持这些方法。
生成器是一种简化版的迭代器。它们是通过在函数中使用 yield 语句来构建的。当你调用包含 yield 的函数时,函数的运行状态会在 yield 处中断,并返回一个值。当再次调用 next()时,函数会从上次中断的位置继续执行,直到再次遇到 yield。
生成器可以是生成器函数或生成器表达式
- 生成器函数: 使用 def 关键字定义,并在函数体内使用 yield 关键字。
- 生成器表达式: 类似于列表推导式,但使用圆括号而不是方括号。
生成器函数的一个例子
def my_generator():
yield 1
yield 2
yield 3
gen = my_generator()
print(next(gen)) # 输出 1
print(next(gen)) # 输出 2
print(next(gen)) # 输出 3
# print(next(gen)) # 抛出 StopIteration
生成器表达式的一个例子
gen_expr = (x * x for x in range(3))
for num in gen_expr:
print(num) # 输出 0, 1, 4
上下文管理器 和 with 语句
在 Python 中,上下文管理器是一种实现特定协议的对象,它定义了在某个操作的前后需要执行的代码。这个协议包括两个魔术方法:__enter__() 和 __exit__()。上下文管理器通常与 with 语句一起使用,来简化资源的管理(如文件操作、网络连接、锁的获取和释放等),确保资源的正确获取和释放。
上下文管理器
上下文管理器的主要职责是管理资源的使用周期,确保即使在发生错误或异常的情况下,资源也能被正确地释放。
主要方法
__enter__(self)
: 进入上下文管理器时执行的方法。通常返回资源对象。__exit__(self, exc_type, exc_val, exc_tb)
: 退出上下文时执行的方法。用于处理资源的释放以及异常处理。如果方法处理了异常,则应该返回 True 以防止异常继续传播。
class MyContextManager:
def __enter__(self):
print("进入上下文")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print("退出上下文")
if exc_type:
print(f"异常发生: {exc_val}")
return True # 处理了异常
# 使用示例
with MyContextManager() as manager:
print("执行代码块")
raise ValueError("出错了!")
with 语句
with
语句用于包裹上下文管理器的代码块。它会在进入该块之前调用上下文管理器的 __enter__
方法,并在退出时调用 __exit__
方法。
优点
- 自动管理资源: 确保在代码块执行完毕后,不管有没有发生异常,资源都被正确释放。
- 提高代码可读性: 使得资源管理相关的代码更加集中和明确。
- 减少错误: 自动处理资源的获取和释放,减少了因遗漏释放资源而造成的错误。
使用场景
- 文件操作: 自动关闭文件。
- 网络连接: 自动关闭连接。
- 锁的获取和释放。
文件操作: 自动关闭文件。
在这个文件操作的例子中,open 函数返回一个文件对象,它是一个上下文管理器。使用 with 语句,文件在离开代码块时自动关闭,即使在写入时发生异常也是如此。
with open('example.txt', 'w') as file:
file.write('Hello, world!')
# 文件在这里已经被正确关闭
网络连接: 自动关闭连接。
假设我们使用 Python 的 socket 库来建立一个网络连接。我们可以创建一个上下文管理器来确保连接在使用后被正确关闭。
import socket
class NetworkConnection:
def __init__(self, host, port):
self.host = host
self.port = port
self.connection = None
def __enter__(self):
self.connection = socket.create_connection((self.host, self.port))
return self.connection
def __exit__(self, exc_type, exc_val, exc_tb):
if self.connection:
self.connection.close()
print("连接已关闭")
# 示例: 简单版
with NetworkConnection("www.example.com", 80) as conn:
conn.sendall(b"GET / HTTP/1.0\r\n\r\n")
print(conn.recv(1024)) # 接收数据的调用,其中 1024 指定了最大接收数据的字节数。
# 示例:处理超时和多次接收
with NetworkConnection("www.example.com", 80) as conn:
conn.sendall(b"GET / HTTP/1.0\r\n\r\n")
response = b""
while True:
try:
data = conn.recv(1024)
if not data: # 没有更多数据
break
response += data
except socket.timeout:
# 处理超时情况
print("读取超时,当前已接收的数据:", response.decode('utf-8'))
break
print(response.decode('utf-8'))
在这个示例中,NetworkConnection 类创建了一个到指定主机和端口的网络连接。当离开 with 语句块时,无论是否发生异常,__exit__
方法都会被调用,并关闭网络连接。
使用 socket 进行网络通信时,可以通过 recv 方法的返回值来判断是否还有更多数据可读。当 recv 方法返回一个空字节串(b''
)时,这通常表示对方已经正常关闭了连接,没有更多数据发送给你。在这种情况下,你可以安全地认为数据接收已经完成,并退出接收循环。
在 Python 中,b''
表示一个空的字节串(byte string)。b 前缀表明随后的字符串是一个字节串(byte string)而非普通的文本字符串(text string)。这是 Python 中处理二进制数据和文本数据的区分方式。
在处理像文件 IO、网络通信等涉及二进制数据的场景时,使用字节串是必要的。字节串提供了一种更精确地控制数据的方式,确保数据不会被自动地或错误地解释为文本
锁的获取和释放。
在多线程编程中,我们经常需要获取和释放锁来保护共享资源。我们可以通过上下文管理器简化这一过程。
from threading import Lock
class ThreadSafeLock:
def __init__(self):
self.lock = Lock()
def __enter__(self):
self.lock.acquire()
print("锁已获取")
def __exit__(self, exc_type, exc_val, exc_tb):
self.lock.release()
print("锁已释放")
# 使用示例
lock = ThreadSafeLock()
with lock:
# 在这里执行需要同步的操作
pass
在这个示例中,ThreadSafeLock 类使用 Python 标准库中的 Lock 对象。通过实现 __enter__
和 __exit__
方法,它在进入和退出 with 语句时自动获取和释放锁。
并发和并行
- 线程: threading 模块 (并发)
- 异步编程 asyncio (并发)
- 进程: multiprocessing 模块 (并行)
在 Python 的面向对象编程中,理解并发 (Concurrency) 和并行 (Parallelism) 是重要的,尤其是在设计能够高效处理多任务的应用程序时。虽然这两个术语经常一起使用,但它们指的是不同的概念:
并发 Concurrency (线程 threading 模块)
并发是指程序能够处理多个任务的能力。在并发模型中,任务在同一时间点开始,但不一定同时进行。它们可能交替执行,让出处理器时间给其他任务。并发更多关注的是结构上的多任务处理。
- 并发在单核处理器上: 即使在单核处理器上,也可以通过任务间的快速切换来实现并发。例如,一个任务在等待 IO 操作时,处理器可以切换到另一个任务。
- Python 中的并发: Python 提供了多种并发编程的选项,如线程(threading 模块)和协程(asyncio 模块)。
创建线程
import threading
def print_numbers():
for i in range(1, 6):
print(i)
def print_letters():
for letter in 'abcde':
print(letter)
thread1 = threading.Thread(target=print_numbers)
thread2 = threading.Thread(target=print_letters)
# start(): 启动一个线程。
thread1.start()
thread2.start()
# join(): 等待线程结束。
thread1.join()
thread2.join()
在你的例子中,thread1.join()
和 thread2.join()
被调用后,主线程将等待这两个线程分别完成它们的执行。这意味着主线程会等待 print_numbers
和 print_letters
函数的执行完毕,然后程序才继续向下执行。
获取线程结果
Python 的 threading
模块不直接支持获取线程的返回值。要获取线程的返回结果,你通常需要使用某种形式的共享数据结构,例如队列或全局变量。传递参数到线程中可以通过 args
参数实现。
import threading
import time
def worker(num, result):
"""线程工作函数"""
print(f"Worker: {num}")
if num == 2:
time.sleep(1)
result.append(num)
# 存储结果
result = []
# 创建线程
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i, result))
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
print(result) # 输出结果
# === Worker 的顺序不一定, 会有变化 ===
# Worker: 0
# Worker: 1
# Worker: 3
# Worker: 2
# Worker: 4
# === 最终结果一直不会变 ===
# [0, 1, 3, 4, 2]
处理错误情况
在使用 threading 模块时,可以在线程函数内部直接使用try...except
块来捕获和处理异常。例如:
import threading
def thread_function(name):
try:
# 这里是可能引发异常的代码
if name == "error":
raise ValueError("错误示例")
print(f"线程 {name} 正在运行")
except Exception as e:
print(f"线程 {name} 发生错误: {e}")
# 创建线程
thread = threading.Thread(target=thread_function, args=("error",))
thread.start()
thread.join()
并发 Concurrency (单线程 asyncio 模块)
asyncio
是 Python 的一个库,用于编写单线程的并发代码,通过协程(coroutine),事件循环(event loop),和 IO 多路复用(I/O multiplexing)。asyncio 通常用于网络和 I/O 密集型应用。
创建协程
import asyncio
# 定义一个异步函数
async def my_coroutine(task_number):
print(f"协程{task_number}开始运行")
await asyncio.sleep(1) # 模拟 I/O 操作
print(f"协程{task_number}结束运行")
return f"结果来自协程{task_number}"
# 创建一个主函数用于运行协程
async def main():
tasks = [my_coroutine(i) for i in range(5)] # 创建多个协程
completed, pending = await asyncio.wait(tasks) # 并发执行协程
for task in completed: # 打印每个协程的结果
print(task.result())
# 运行主函数
asyncio.run(main())
获取协程结果
asyncio.gather
用于并发运行多个协程,并收集它们的返回值。
import asyncio
async def my_coroutine(task_number):
await asyncio.sleep(1) # 模拟 I/O 操作
return f"结果来自协程{task_number}"
async def main():
tasks = []
for item in range(1, 4):
tasks.append(my_coroutine(item))
results = await asyncio.gather(*tasks)
print(results) # 打印所有协程的返回值
asyncio.run(main())
处理错误情况
在协程中处理异常与同步代码类似,可以使用 try...except
:
import asyncio
async def my_coroutine(task_number):
if task_number == 2:
raise ValueError("协程2发生错误")
await asyncio.sleep(1)
return f"结果来自协程{task_number}"
async def main():
tasks = [my_coroutine(i) for i in range(4)]
results = []
for task in asyncio.as_completed(tasks):
try:
result = await task
results.append(result)
except ValueError as e:
print(e)
print(results)
asyncio.run(main())
并行 Parallelism (进程)
并行是指多个任务同时进行。在多核处理器上,不同的任务可以在不同的核上同时运行。并行更多关注的是执行上的多任务处理。
- 并行在多核处理器上: 并行通常在具有多个处理核心的系统上实现,每个核心同时执行不同的任务。
- Python 中的并行: 可以通过多进程(multiprocessing 模块)来实现并行。
创建进程
from multiprocessing import Process
def print_numbers():
for i in range(1, 6):
print(i)
def print_letters():
for letter in 'abcde':
print(letter)
process1 = Process(target=print_numbers)
process2 = Process(target=print_letters)
# start(): 启动一个单独的进程。
process1.start()
process2.start()
# join(): 等待进程结束。
process1.join()
process2.join()
在这个例子中,process1.join()
和 process2.join()
被调用后,主进程将等待这两个进程分别完成它们的执行。这确保了 print_numbers
和 print_letters
函数的执行完全结束后,程序的其他部分才会继续执行。
获取线程结果
Python 的 multiprocessing
模块提供了 Pool
类,它可以用来并行地运行一组任务,并收集它们的输出。传递参数到进程中可以通过 args
参数实现,或者通过 Pool
类的 map
方法传递可迭代的参数。
from multiprocessing import Pool
import time
def worker(num):
"""进程工作函数"""
print(f"Worker: {num}")
if num == 2:
time.sleep(1)
print(f"Worker {num} finished")
return num
if __name__ == "__main__":
# 创建进程池
with Pool(5) as p:
results = p.map(worker, range(5))
print(results)
# === 输出结果 results 一直不变,非常稳定,顺序也是不变,由于我们使用的是 map 方法,所以结果的顺序也能够保证 ===
# Worker: 0
# Worker 0 finished
# Worker: 1
# Worker 1 finished
# Worker: 3
# Worker 3 finished
# Worker: 2
# Worker: 4
# Worker 4 finished
# Worker 2 finished
# [0, 1, 2, 3, 4]
在这个示例中,Pool.map
类似于内置的 map
函数,但它将任务分配给进程池中的进程执行,并收集返回结果。
处理错误情况
在 multiprocessing 中,由于每个进程都在自己的内存空间中运行,所以你不能直接在主进程中捕获子进程的异常。但是,你可以在子进程内部捕获异常,并将错误信息通过 multiprocessing.Queue
等方式传回主进程。
import multiprocessing
def process_function(name, error_queue):
try:
# 这里是可能引发异常的代码
if name == "error":
raise ValueError("错误示例")
print(f"进程 {name} 正在运行")
except Exception as e:
error_queue.put(e)
# 创建进程
error_queue = multiprocessing.Queue()
process = multiprocessing.Process(target=process_function, args=("error", error_queue))
process.start()
process.join()
# 检查错误
if not error_queue.empty():
error = error_queue.get()
print(f"进程发生错误: {error}")
并发 vs 并行
- 并发 是关于处理多个任务的结构,它可以在单核或多核处理器上实现。它涉及任务的切换和分配。
- 并行 是关于任务的同时执行,通常在多核处理器上实现。
在 Python 中,由于全局解释器锁(GIL)的存在,标准 CPython 解释器在同一时刻只能执行一个线程。因此,对于计算密集型任务,使用多线程并不会提高性能;相反,应该使用多进程来实现真正的并行计算。对于 IO 密集型任务,线程和协程都是很好的选择,因为它们可以在等待 IO 时让出处理器给其他任务。
锁机制
数据
数据处理
- 文件操作: 读取、写入、文件对象。
- 读取和写入文件
- 文件对象的方法
- 使用
with
语句管理文件
- JSON、XML、CSV 处理: 数据解析和生成。
- 字符串操作
- 字符串方法
- 格式化字符串
- 正则表达式: 字符串匹配、模式搜索。
数据科学
- NumPy: 数组操作、数学函数。
- Pandas: 数据分析、DataFrame 操作。
- Matplotlib: 数据可视化。
开发
开发模式
- 行为驱动开发(BDD)
- 测试驱动开发(TDD)
Web 开发
错误和异常处理
- try-except 块
- 引发异常(raise)
- finally 块
- 自定义异常
- 异常链
自定义异常
要创建自定义异常,你可以简单地继承 Exception
类或其任何子类。例如:
class MyCustomError(Exception):
"""一个带有自定义消息和额外数据的自定义异常类"""
def __init__(self, message, extra_data):
super().__init__(message) # 调用基类的构造器来设置消息
self.extra_data = extra_data # 添加额外的数据属性
# 在代码中使用自定义异常
def my_function():
raise MyCustomError("发生了某种特定错误!", extra_data={"id": 42, "reason": "特殊原因"})
try:
my_function()
except MyCustomError as e:
print(f"捕获到自定义异常:{e}")
print(f"额外数据:{e.extra_data}")
在这个例子中,MyCustomError
是一个从 Exception
类继承的自定义异常类。使用 raise
语句引发此异常,然后在 try...except
块中捕获它。
异常链
在 Python 3 中,当在一个 except
块内引发新的异常时,你可以使用 from
关键字来链接到原始异常。这称为异常链。异常链对于调试和记录错误原因非常有用,因为它保留了完整的异常上下文。
def function1():
raise ValueError("值错误")
def function2():
try:
function1()
except ValueError as e:
raise RuntimeError("运行时错误") from e
try:
function2()
except RuntimeError as e:
print(f"捕获到异常:{e}")
print(f"原始异常:{e.__cause__}")
在这个例子中,function1
引发了一个 ValueError
异常,然后在 function2
中被捕获。在 except
块内,使用 raise...from...
语句引发了一个 RuntimeError
异常,并将原始的 ValueError
异常链接为其原因。最后,在主程序中捕获 RuntimeError
时,你也可以访问其原始异常 ValueError
。
异常链提供了一种透明的方式来追踪异常的原因,特别是在复杂的应用程序中,它们有助于理解错误发生的上下文。
网络编程
- 套接字编程: Sockets -> link
- HTTP 请求和响应: Request, Response
测试
- 单元测试: Unit testing, unittest、pytest。
- 集成测试: Integration testing
- 接口测试: API testing
在 Python 中,单元测试、集成测试和接口测试是确保软件质量的重要手段。这些测试类型各自聚焦于软件开发的不同层面和阶段。
单元测试 Unit testing
单元测试聚焦于程序的最小可测试部分,通常是单个函数或方法。
- 目的: 验证每个部分是否如预期那样正确执行。
- 特点: 快速执行,无需依赖外部系统(如数据库、网络等)。
- 工具: Python 中常用的单元测试框架有
unittest
和pytest
。
import unittest
def add(a, b):
return a + b
class TestAddFunction(unittest.TestCase):
def test_add(self):
self.assertEqual(add(2, 3), 5)
if __name__ == '__main__':
unittest.main()
集成测试 Integration testing
集成测试检查不同模块或服务之间的交互是否按预期工作。
- 目的: 验证多个组件间的接口和交互。
- 特点: 涉及多个模块或系统的协作,检查数据流和控制流。
- 场景: 例如,测试数据库连接、外部依赖服务的调用等。
在集成测试中,你可能会测试实际的数据库连接或与其他服务的交互,而这通常需要设置测试环境。
示例概述
- 数据库模块: 用于简单的数据存取。
- 业务逻辑模块: 处理数据,并依赖于数据库模块。
- 集成测试: 测试这两个模块的集成是否如预期工作。
示例代码
# database.py
class Database:
def __init__(self):
self.data = {}
def insert(self, key, value):
self.data[key] = value
def retrieve(self, key):
return self.data.get(key, None)
# logic.py
class BusinessLogic:
def __init__(self, database):
self.database = database
def process_data(self, key, value):
if not self.database.retrieve(key):
self.database.insert(key, value)
return True
return False
# test_integration.py
import unittest
from database import Database
from logic import BusinessLogic
class TestDatabaseLogicIntegration(unittest.TestCase):
def test_integration(self):
db = Database()
logic = BusinessLogic(db)
self.assertTrue(logic.process_data("key1", "value1"))
self.assertEqual(db.retrieve("key1"), "value1")
self.assertFalse(logic.process_data("key1", "value2")) # 已存在的键不应重复插入
if __name__ == "__main__":
unittest.main()
接口测试 Interface testing
接口测试主要针对软件系统的外部接口,如 REST API、图形用户界面等。
- 目的: 验证系统的外部接口是否满足规范和需求。
- 特点: 测试与外部系统的交互,例如 API 请求和响应。
- 工具: 对于 API 测试,可以使用 requests 库结合测试框架进行测试。
import requests
import unittest
class TestAPI(unittest.TestCase):
def test_api_response(self):
response = requests.get("http://example.com/api")
self.assertEqual(response.status_code, 200)
if __name__ == '__main__':
unittest.main()
总结
- 单元测试: 主要用于测试程序的最小单元,如函数或类。
- 集成测试: 关注不同模块或系统组件之间的交互。
- 接口测试: 针对的是系统的外部接口,确保它们按照预期与外界交互。
在软件开发的不同阶段应用这些测试,有助于提早发现错误,确保软件质量,并促进持续集成和持续部署。
调试技巧
- 调试技巧: 使用调试器、日志记录。
使用调试器 Debugger
调试器允许你逐步执行代码,检查在运行时变量的状态,评估表达式,设置断点,等等。Python 提供了几种不同的调试器,例如内置的 pdb(Python Debugger),以及像 ipdb(基于 IPython 的更高级调试器)等第三方工具。
pdb 是 Python 的官方标准调试器,无需安装即可使用。
python -m pdb myscript.py
import pdb
def my_function(a, b):
result = a / b
return result
a, b = 5, 0
pdb.set_trace() # 设置断点
print(my_function(a, b))
在这个示例中,pdb.set_trace() 会在执行到这一行时暂停,允许你检查和修改变量,单步执行或继续执行程序。
常用命令
c
: 继续执行直到遇到下一个断点。n
: 执行下一行代码。s
: 进入函数。q
: 退出调试器。
日志记录 Logging
日志记录是另一种调试和监控程序行为的方法。与打印语句相比,日志记录更加灵活和强大,允许你指定消息的严重性级别,并且可以轻松地将日志信息导向不同的目的地(例如控制台、文件等)。
使用 logging 模块
Python 的 logging 模块提供了灵活的日志记录系统。
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logging.debug('程序开始运行')
def my_function(a, b):
try:
result = a / b
except ZeroDivisionError:
logging.error("除数不能为零")
return None
return result
a, b = 5, 0
result = my_function(a, b)
logging.debug(f"结果: {result}")
在这个示例中,我们设置了日志记录的基本配置,并在关键点使用日志记录来跟踪程序的执行和捕获异常。
日志级别
DEBUG
: 用于小规模的调试信息。INFO
: 用于常规信息。WARNING
: 表示有问题的情况可能发生。ERROR
: 严重问题,程序某些功能没有正常运行。CRITICAL
: 严重错误,可能导致程序崩溃。
总结
- 调试器 是交互式的,非常适合逐步执行代码和检查变量状态。
- 日志记录 提供了一种在运行时记录程序执行信息的方法,非常适合追踪和记录程序的行为。
版本控制
- Git 基础
虚拟环境和包管理
- 虚拟环境: venv、pipenv。
- 软件包管理: pip、PyPI。
- requirements.txt
代码风格和规范
- 代码静态分析工具: Pylint, 它主要用于检查 Python 代码中的错误,强制执行一致的编码标准,以及查找代码中可能的问题。
- 代码风格和规范: PEP 8, Black。