Python编程之核心特性与常用库

本文记录Python语言的一些重要特性及常用库用法...

0x01 核心特性

1. 列表、元组与字典

对比

Python中列表、元组和字典都是序列类型。

  • 列表和元组是有序的,字典是无序的。列表和元组都可以包含任意类型的数据,而字典中的元素必须是键值对。

  • 列表是可变的,可以被修改。元组是不可变的,不能被修改。字典也是可变的。

在Python中,列表用方括号[]表示,元组用圆括号()表示,字典用花括号{}表示

1
2
3
my_list = [1, 2, 3]
my_tuple = (4, 5, 6)
my_dict = {'a': 1, 'b': 2, 'c': 3}

适用场景

在Python中,列表、元组和字典都有自己的用途:

  • 列表是一种通用的数据类型,适用于各种情况。它可以用来存储任意类型的数据,并且支持修改。
  • 元组是一种不可变的数据类型,适用于需要保存不可变的数据的场景。例如,如果要保存一个人的名字和年龄,就可以使用元组来存储这些信息。
  • 字典是一种用于存储键值对数据的数据类型。它的键必须是唯一的,并且可以用于快速查找和修改数据。例如,如果要保存一组用户信息,就可以使用字典来存储这些信息。

总之,应该根据实际情况来选择使用哪种数据类型。例如,如果需要保存一组不可变的数据,那么元组是最好的选择;如果需要保存一组键值对,那么字典是最好的选择。

操作实现

列表是有序的对象集合,可以通过编号访问其中的对象。它们用方括号括起来,并用逗号分隔其中的元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 创建一个列表
my_list = [1, 2, 3]

# 访问列表中的元素
print(my_list[0]) # 输出 1

# 修改列表中的元素
my_list[1] = 4

# 在列表末尾添加元素
my_list.append(5)

# 在列表中插入元素
my_list.insert(1, 6)

# 从列表中删除元素
del my_list[2]

元组与列表类似,也是有序的对象集合。但是与列表不同,元组是不可变的,一旦创建,就不能修改。它们用圆括号括起来,并用逗号分隔其中的元素

1
2
3
4
5
6
7
8
9
# 创建一个元组
my_tuple = (1, 2, 3)

# 访问元组中的元素
print(my_tuple[0]) # 输出 1

# 元组是不可变的,因此无法修改元素
# 例如,下面的代码将会抛出 TypeError 异常
my_tuple[1] = 4

字典是一种映射类型,它将对象映射到键上,通过键来访问对象。它们用大括号括起来,并用冒号分隔键和值,多个键值对用逗号分隔

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 创建一个字典
my_dict = {"key1": 1, "key2": 2, "key3": 3}

# 访问字典中的元素
print(my_dict["key1"]) # 输出 1

# 修改字典中的元素
my_dict["key2"] = 4

# 在字典中添加元素
my_dict["key4"] = 5

# 删除字典中的元素
del my_dict["key3"]

字典的键必须是不可变的类型,例如字符串或数字。但是值可以是任意类型,包括可变类型,例如列表

1
2
3
4
5
6
7
8
9
10
11
# 创建一个字典,其中值是列表
my_dict = {"key1": [1, 2, 3], "key2": [4, 5, 6]}

# 访问字典中的元素
print(my_dict["key1"]) # 输出 [1, 2, 3]

# 修改字典中的元素,例如添加元素到列表中
my_dict["key1"].append(4)

# 删除字典中的元素
del my_dict["key2"]

2. 迭代器与生成器

迭代器和生成器都可以用来遍历序列中的元素。迭代器是一种对象,它能够用来遍历序列中的元素,但它不能用来修改序列中的元素。相反,生成器是一种特殊的迭代器,它可以用来生成新的值,而不是遍历已有的值。

使用迭代器可以让我们更容易地遍历序列中的元素,而使用生成器则可以让我们更容易地生成新的值。

使用迭代器来遍历一个列表:

1
2
3
4
5
6
7
8
9
my_list = [1, 2, 3]

# 创建迭代器
it = iter(my_list)

# 通过迭代器遍历列表中的元素
print(next(it)) # 输出 1
print(next(it)) # 输出 2
print(next(it)) # 输出 3

使用生成器来生成斐波那契数列中的元素:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def fibonacci():
a, b = 0, 1
while True:
yield a
a, b = b, a + b

# 创建生成器
fib = fibonacci()

# 通过生成器生成斐波那契数列中的元素
print(next(fib)) # 输出 0
print(next(fib)) # 输出 1
print(next(fib)) # 输出 1
print(next(fib)) # 输出 2

3. Python装饰器

Python 装饰器是一个高阶函数,它接受一个函数作为输入并返回一个新的函数作为输出。允许在不改变原函数的基础上,添加额外的功能,有助于减少重复代码

定义一个装饰器来记录一个函数被调用的次数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def call_counter(func):
def helper(*args, **kwargs):
helper.calls += 1
return func(*args, **kwargs)
helper.calls = 0
return helper

@call_counter
def succ(x):
return x + 1

print(succ.calls) # 0
for i in range(10):
succ(i)
print(succ.calls) # 10

在这个例子中,我们定义了一个 call_counter 装饰器,它接受一个函数作为输入并返回一个新的函数,这个新函数会记录它被调用的次数。我们通过使用 @call_counter 语法来将这个装饰器应用于函数 succ。

通过使用装饰器,我们能够在不改变 succ 函数本身的情况下添加一个计数功能。这种技术在很多场景下都很有用,包括缓存、权限控制、日志记录等等

实现一个计时器,对任何函数的执行时间计时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import time

def timer(func):
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
end = time.time()
print(f"{func.__name__} took {end - start} seconds to run.")
return result
return wrapper

@timer
def say_hello():
time.sleep(1)
print("hello!")

say_hello() # prints: "hello!" and "say_hello took 1.0005841255187988 seconds to run."

在这个示例中,timer 函数是一个装饰器,它接受一个函数作为参数并返回一个新函数,该函数在调用原函数之前计算运行时间,并在运行完成后打印出来。我们可以使用 @timer 语法糖来调用装饰器,这样就不用手动调用 timer 函数

4. 多进程、多线程与协程

Python有三种主要的并发模型:多进程、多线程和协程。

多进程和多线程都可以用来实现并发执行多个任务。进程是操作系统中的一个基本执行单元,拥有独立的内存空间和系统资源,并且可以并行执行多个进程。线程是进程的一个执行流,拥有独立的执行栈和局部变量,并且可以并行执行多个线程。

协程也可以用来实现并发,但与进程和线程不同,它是一种用户态的轻量级线程,可以由用户控制执行流程。协程通过切换不同的协程来实现并发,因此可以在单个线程中模拟多个线程的并发执行。

a. 对比

  • 多进程是指拥有多个执行流的程序。每个执行流都是一个进程,并且每个进程都有自己的独立内存空间。多进程的程序的开销比较大,因为每个进程都有自己的独立内存空间,所以消耗的内存比较多。

  • 多线程是指拥有多个执行流的程序。每个执行流都是一个线程,所有线程都共享同一个进程内存空间。多线程程序相比多进程程序,内存开销更小,因为所有线程共享同一个进程内存空间。但是由于线程之间会相互影响,所以多线程程序可能比较难理解和实现。多线程模型适用于需要多个执行流来共享数据的场景。例如有一个程序需要同时执行多个任务,并且这些任务之间需要互相通信,这时候可以使用多线程来实现。

  • 协程是一种轻量级的并发模型。它的工作方式类似于多线程,但是它更加轻量级,可以在不同的线程之间切换。协程比较适合用于需要在多个线程之间频繁切换的场景,因为它比较轻量级,开销比较小。但是由于协程模型是用户态的,它的执行需要由用户自己来控制。因此,如果用户不能正确的控制协程的执行,可能会导致协程的饥饿,即某个协程一直占用 CPU 资源而没有让出,从而导致其他协程无法执行。

b. 适用场景

通常来说,如果任务需要独立执行且消耗大量资源,则可以使用多进程来实现并发;如果任务之间有依赖关系或者需要共享数据,则可以使用多线程来实现并发;如果任务执行时间较短且需要高效的切换,则可以使用协程来实现并发。

  • 多进程模型适用于需要独立内存空间的场景,例如需要同时执行多个不相关的任务。
  • 多线程模型适用于需要多个执行流来共享数据的场景。例如有一个程序需要同时执行多个任务,并且这些任务之间需要互相通信,这时候可以使用多线程来实现。
  • 协程模型适用于需要在多个线程之间频繁切换的场景。例如一个程序中有多个任务,每个任务都需要多次切换到其他任务中去执行,这时候可以使用协程来实现。

c. 代码实现

多进程

在 Python 中,使用 multiprocessing 模块中的 Process 类或 Pool 类可以在多个 CPU 核心上运行多个进程,使用 concurrent.futures 模块中的 ProcessPoolExecutor 类或 ThreadPoolExecutor 类可以在单个 CPU 核心上运行多个进程

1)使用 multiprocessing 模块中的 Process 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Process

def foo(i):
"""
A function that prints the number passed to it.
"""
print(f"Starting process {i}")

if __name__ == '__main__':
# Create a list to store the processes we want to run.
processes = []
# Start 5 processes, each running the foo() function with a different number.
for i in range(5):
# Create the Process object, passing the foo() function and the number we want to print.
p = Process(target=foo, args=(i,))
# Add the process to the list of processes.
processes.append(p)
# Start the process.
p.start()

2)使用 multiprocessing 模块中的 Pool 类

1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Pool

def foo(i):
"""
A function that prints the number passed to it.
"""
print(f"Starting process {i}")

if __name__ == '__main__':
# Create a Pool object with 5 worker processes.
pool = Pool(5)
# Apply the foo() function to each number in the range 0-4.
pool.map(foo, range(5))

3)使用 concurrent.futures 模块中的 ProcessPoolExecutor 类

1
2
3
4
5
6
7
8
9
10
11
12
13
from concurrent.futures import ProcessPoolExecutor

def foo(i):
"""
A function that prints the number passed to it.
"""
print(f"Starting process {i}")

if __name__ == '__main__':
# Create a ProcessPoolExecutor object with 5 worker processes.
with ProcessPoolExecutor(max_workers=5) as executor:
# Apply the foo() function to each number in the range 0-4.
executor.map(foo, range(5))

4)使用 concurrent.futures 模块中的 ThreadPoolExecutor 类

1
2
3
4
5
6
7
8
9
10
11
12
13
from concurrent.futures import ThreadPoolExecutor

def foo(i):
"""
A function that prints the number passed to it.
"""
print(f"Starting process {i}")

if __name__ == '__main__':
# Create a ThreadPoolExecutor object with 5 worker processes.
with ThreadPoolExecutor(max_workers=5) as executor:
# Apply the foo() function to each number in the range 0-4.
executor.map(foo, range(5))

多线程

在 Python 中,使用 threading 模块中的 Thread 类或装饰器可以在单个 CPU 核心上运行多个线程,而使用 concurrent.futures 模块中的 ThreadPoolExecutor 类或 ProcessPoolExecutor 类可以在多个 CPU 核心上运行多个线程。

1)使用 threading 模块中的 Thread 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from threading import Thread

def foo(i):
"""
A function that prints the number passed to it.
"""
print(f"Starting thread {i}")

if __name__ == '__main__':
# Create a list to store the threads we want to run.
threads = []
# Start 5 threads, each running the foo() function with a different number.
for i in range(5):
# Create the Thread object, passing the foo() function and the number we want to print.
t = Thread(target=foo, args=(i,))
# Add the thread to the list of threads.
threads.append(t)
# Start the thread.
t.start()

2)使用 threading 模块中的 Thread 类和装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from threading import Thread

@Thread
def foo(i):
"""
A function that prints the number passed to it.
"""
print(f"Starting thread {i}")

if __name__ == '__main__':
# Create a list to store the threads we want to run.
threads = []
# Start 5 threads, each running the foo() function with a different number.
for i in range(5):
# Call the foo() function with the number we want to print.
# This automatically creates a Thread object and starts it.
t = foo(i)
# Add the thread to the list of threads.
threads.append(t)

3)使用 concurrent.futures 模块中的 ThreadPoolExecutor 类

1
2
3
4
5
6
7
8
9
10
11
12
13
from concurrent.futures import ThreadPoolExecutor

def foo(i):
"""
A function that prints the number passed to it.
"""
print(f"Starting thread {i}")

if __name__ == '__main__':
# Create a ThreadPoolExecutor object with 5 worker threads.
with ThreadPoolExecutor(max_workers=5) as executor:
# Apply the foo() function to each number in the range 0-4.
executor.map(foo, range(5))

4)使用 concurrent.futures 模块中的 ProcessPoolExecutor 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from concurrent.futures import ProcessPoolExecutor

# 定义一个需要多线程处理的函数
def some_function(arg):
# do something with arg
return result

# 使用 ProcessPoolExecutor 类来创建一个进程池
with ProcessPoolExecutor() as executor:
# 使用 map 函数来将 some_function 应用到一个可迭代对象中的每一个元素上
results = executor.map(some_function, iterable_object)
# 迭代处理结果
for result in results:
# do something with result

上面的代码会创建一个进程池,然后使用 map 函数来并行地执行 some_function 函数,并处理结果,使用 ProcessPoolExecutor 类的优点是它能够自动管理进程池

协程

协程是一种用于在单线程中支持并发编程的方法,在 Python 中,协程可以通过四种方式来实现:

  • 使用生成器函数:通过使用 Python 的 yield 关键字实现。
  • 使用 asyncio 库:通过使用 async 和 await 关键字实现。
  • 使用 Trio 库:通过使用 async 和 await 关键字实现。
  • 使用 Trollius 库:通过使用 @coroutine 装饰器实现
  1. 使用生成器函数
1
2
3
4
5
6
7
8
9
10
11
12
13
def my_coroutine():
while True:
received = yield
print('Received:', received)

# 创建协程对象
coro = my_coroutine()

# 启动协程
next(coro)

# 向协程发送数据
coro.send('Hello!')

2)使用 asyncio 库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

async def my_coroutine():
while True:
received = await asyncio.sleep(0.1)
print('Received:', received)

# 创建协程对象
coro = my_coroutine()

# 启动协程
asyncio.run(coro)

# 向协程发送数据
coro.send('Hello!')

3)使用 Trio 库:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import trio

async def my_coroutine():
while True:
received = await trio.sleep(0.1)
print('Received:', received)

# 创建协程对象
coro = my_coroutine()

# 启动协程
trio.run(coro)

# 向协程发送数据
coro.send('Hello!')

4)使用 Trollius 库:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import trollius

@trollius.coroutine
def my_coroutine():
while True:
received = yield trollius.From(trollius.sleep(0.1))
print('Received:', received)

# 创建协程对象
coro = my_coroutine()

# 启动协程
trollius.get_event_loop().run_until_complete(coro)

# 向协程发送数据
coro.send('Hello!')

0x02 实用类库

1. 系统信息

a. os模块

Python的os模块提供了与操作系统交互的功能,可用于执行文件和目录操作。下面是一些常用的文件/目录方法:

  • os.getcwd():返回当前工作目录。
  • os.chdir(path):更改当前工作目录。
  • os.listdir(path):返回指定目录下的文件和目录列表。
  • os.mkdir(path[, mode]):创建一个新目录。
  • os.rmdir(path):删除指定目录。
  • os.rename(src, dst):将文件或目录重命名。
  • os.remove(path):删除文件。
  • os.path.exists(path):检查给定路径是否存在。
  • os.path.isdir(path):检查给定路径是否为目录。
  • os.path.isfile(path):检查给定路径是否为文件。

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import os

# 获取当前工作目录
current_dir = os.getcwd()
print(f'当前工作目录为:{current_dir}')

# 更改工作目录
os.chdir('/tmp')
print(f'当前工作目录为:{os.getcwd()}')

# 列出指定目录下的文件和目录
files = os.listdir('/tmp')
print(f'目录/tmp下的文件和目录:{files}')

# 创建新目录
os.mkdir('/tmp/test_dir')
print(f'目录/tmp下的文件和目录:{os.listdir("/tmp")}')

# 删除新目录
os.rmdir('/tmp/test_dir')
print(f'目录/tmp下的文件和目录:{os.listdir("/tmp")}')

# 检查路径是否存在
path = '/tmp/test_dir'
print(f'{path}是否存在:{os.path.exists(path)}')

# 检查路径是否为目录
print(f'{path}是否为目录:{os.path.isdir(path)}')

# 检查路径是否为文件
print(f'{path}是否为文件:{os.path.isfile(path)}')

b. psutil库

psutil 是一个跨平台的库,可以用来监控系统的 CPU、内存、磁盘、网络等信息。

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import psutil

def get_system_info():
# 获取CPU信息
cpu_count = psutil.cpu_count() # CPU逻辑数量
cpu_percent = psutil.cpu_percent() # CPU使用率

# 获取内存信息
memory = psutil.virtual_memory() # 内存信息
memory_total = memory.total # 总内存
memory_used = memory.used # 已使用内存
memory_free = memory.free # 空闲内存
memory_percent = memory.percent # 内存使用率

# 获取磁盘信息
disk = psutil.disk_partitions() # 磁盘分区信息
disk_total = psutil.disk_usage('/').total # 总磁盘大小
disk_used = psutil.disk_usage('/').used # 已使用磁盘大小
disk_free = psutil.disk_usage('/').free # 空闲磁盘大小
disk_percent = psutil.disk_usage('/').percent # 磁盘使用率

# 获取网络信息
net = psutil.net_io_counters() # 网络信息
net_sent = net.bytes_sent # 发送的总字节数
net_recv = net.bytes_recv # 接收的总字节数

# 获取进程信息
process = psutil.Process() # 当前进程信息
process_name = process.name() # 进程名称
process_pid = process.pid # 进程ID
process_cpu_percent = process.cpu_percent() # 进程CPU使用率
process_memory_percent = process.memory_percent() # 进程内存使用率

# 获取系统用户信息
users = psutil.users() # 系统用户信息

# 获取系统启动时间
boot_time = psutil.boot_time() # 系统启动时间

获取全部网络及进程信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import psutil
import json

def get_network_and_process_info():
net_conns = psutil.net_connections()
proc_info = {}
for proc in psutil.process_iter():
proc_info[proc.pid] = {
"name": proc.name(),
"cpu_times": proc.cpu_times(),
# 这里还可以添加其他有关进程的信息
}
return {
"network_connections": net_conns,
"processes": proc_info
}


info = get_network_and_process_info()

info = get_network_and_process_info()

json_str = json.dumps(info, indent=2)
print(json_str)

2. 正则表达式

Python 中使用正则表达式通常是通过 re 模块来实现,它提供了一组用于处理正则表达式的函数。下面是一些常用的 re 模块函数:

  • re.search(pattern, string, flags=0):在给定字符串中搜索匹配正则表达式的第一个位置。返回一个 Match 对象,如果没有匹配返回 None。
  • re.match(pattern, string, flags=0):从字符串的开头开始匹配正则表达式。返回一个 Match 对象,如果没有匹配返回 None。
  • re.fullmatch(pattern, string, flags=0):匹配整个字符串。返回一个 Match 对象,如果没有匹配返回 None。
  • re.findall(pattern, string, flags=0):查找字符串中所有匹配正则表达式的位置。返回一个字符串列表,如果没有匹配返回空列表。
  • re.finditer(pattern, string, flags=0):查找字符串中所有匹配正则表达式的位置,返回一个迭代器。
  • re.sub(pattern, repl, string, count=0, flags=0):查找并替换字符串中匹配正则表达式的部分。返回替换后的字符串。

代码示例

1)匹配 email 地址:

1
2
3
4
5
6
7
8
9
10
import re

# 定义要匹配的字符串
s = "my email address is [email protected]"

# 使用正则表达式查找 email 地址
email = re.search(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", s)

# 输出匹配的 email 地址
print(email.group())

2)替换字符串中的文本:

1
2
3
4
5
6
7
8
9
10
import re

# 定义要匹配的字符串
s = "The quick brown fox jumps over the lazy dog."

# 使用正则表达式替换文本
new_s = re.sub(r"\b(the)\b", "a", s)

# 输出替换后的字符串
print(new_s)

3)提取 HTML 标签中的属性:

1
2
3
4
5
6
7
8
9
10
import re

# 定义要匹配的字符串
s = '<a href="https://www.example.com" class="link">Example</a>'

# 使用正则表达式提取属性
attr = re.search(r'href="(.+?)" class="(.+?)"', s)

# 输出结果
print(attr.groups())

输出结果:

1
('https://www.example.com', 'link')

4)提取日期:

1
2
3
4
5
6
7
8
9
10
import re

# 定义要匹配的字符串
s = "Today is December 07, 2022."

# 使用正则表达式提取日期
date = re.search(r"\b(?P<month>\w+)\s+(?P<day>\d+),\s+(?P<year>\d+)\b", s)

# 输出结果
print(date.groups())

5)提取文本中的所有 URL:

1
2
3
4
5
6
7
8
9
10
import re

# 定义要匹配的字符串
s = "Here are some URLs: https://www.example.com, https://www.google.com, https://www.python.org"

# 使用正则表达式提取所有 URL
urls = re.findall(r"https?://[\w.-]+", s)

# 输出结果
print(urls)

3. 文件读写

常用读写文件的方式主要包括以下两种:

  • 使用 open() 函数打开文件并通过 .read() 或 .write() 方法来读写文件
  • 使用 with open() 语句块来管理文件对象,推荐用法

使用 open() 函数打开文件并读写文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 打开文件
f = open('myfile.txt', 'w')

# 向文件中写入字符串
f.write('Hello World!')

# 关闭文件
f.close()

# 打开文件
f = open('myfile.txt', 'r')

# 读取文件中的内容
content = f.read()

# 关闭文件
f.close()

print(content) # 输出: Hello World!

使用 with open() 语句块来管理文件对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 使用 with open() 语句块来打开文件
with open('myfile.txt', 'w') as f:
# 向文件中写入字符串
f.write('Hello World!')

# 文件会在 with open() 语句块结束时自动关闭

# 使用 with open() 语句块来打开文件
with open('myfile.txt', 'r') as f:
# 读取文件中的内容
content = f.read()

# 文件会在 with open() 语句块结束时自动关闭

print(content) # 输出: Hello World!

4. 数据处理

a. json格式处理

内置json模块

在 Python 中,可以使用内置的 json 模块来处理 JSON 格式的数据。常用的 json 模块函数:

  • json.loads(s):将 JSON 格式的字符串解码为 Python 字典。
  • json.dumps(obj):将 Python 对象编码为 JSON 格式的字符串。
  • json.load(fp, *, cls=None, object_hook=None, parse_float=None, parse_int=None, parse_constant=None, object_pairs_hook=None, **kw):从文件对象 fp 中读取 JSON 数据并转换为 Python 字典。
  • json.dump(obj, fp, *, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, cls=None, indent=None, separators=None, default=None, sort_keys=False, **kw):将 Python 对象 obj 编码为 JSON 格式并写入文件对象 fp。

1)将JSON 数据解码为 Python 字典,使用 json.loads() 函数

1
2
3
4
5
6
7
8
9
10
11
12
import json

# 假设我们有一个包含 JSON 数据的字符串
json_string = '{"name": "John Doe", "age": 35, "city": "New York"}'

# 使用 json.loads() 函数可以将 JSON 数据解码为 Python 字典
json_data = json.loads(json_string)

# 现在我们可以像访问字典中的值一样访问 JSON 数据
print(json_data["name"]) # 输出 "John Doe"
print(json_data["age"]) # 输出 35
print(json_data["city"]) # 输出 "New York"

2) 将Python 字典编码为 JSON 数据,使用 json.dumps() 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import json

# 假设我们有一个 Python 字典
data = {
"name": "John Doe",
"age": 35,
"city": "New York"
}

# 使用 json.dumps() 函数可以将字典编码为 JSON 数据
json_string = json.dumps(data)

# 现在,json_string 变量中包含了编码后的 JSON 数据
# 可以将它保存到文件中,或者在网络上传输
print(json_string)

# 输出 '{"name": "John Doe", "age": 35, "city": "New York"}'

3)使用 json 模块读取和写入 JSON 数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import json

# 读取 JSON 格式的数据
with open("data.json") as fp:
data = json.load(fp)

# 使用字典访问 JSON 数据
print(data["name"]) # 输出 "John Doe"
print(data["age"]) # 输出 35
print(data["city"]) # 输出 "New York"

# 将 Python 字典编码为 JSON 格式
json_string = json.dumps(data)

# 将 JSON 数据写入文件
with open("data2.json", "w") as fp:
json.dump(json_string, fp)

第三方库

  • ujson:提供了与 json 模块相似的 API,但是比 json 模块快得多。
  • simplejson:一个简单易用的 JSON 库,提供了与 json 模块相似的 API,推荐。
  • rapidjson:一个高效的 JSON 库,可以用于快速解析和生成 JSON 数据。
  • ijson:一个轻量级的库,用于迭代处理 JSON 数据。
  • orjson:一个快速且高效的库,用于解析和生成 JSON 数据

simplejson

simplejson是一个第三方库,与Python内置的json模块功能相似,但速度更快、支持更多的Python数据类型和更好的Unicode支持。

使用pip安装程序:

1
pip install simplejson

使用simplejson库读写JSON文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import simplejson as json

# 读取JSON文件
with open('data.json', 'r') as f:
data = json.load(f)

# 访问字典中的值
print(data['key1']) # 输出:value1

# 修改字典
data['key1'] = 'new_value'

# 写入JSON文件
with open('data.json', 'w') as f:
json.dump(data, f)

b. xml格式处理

XML(Extensible Markup Language,可扩展标记语言)是一种用于存储和传输数据的标准格式,python常用如下三种方法处理xml数据:

  • SAX(Simple API for XML):一种流式解析XML数据的模式,通过事件驱动的方式来解析XML数据,即当解析器发现XML文档中的特定事件时,会触发对应的事件处理函数。这种方式的优点是高效率,因为它只需要一次遍历XML文档,就可以处理完所有的数据。缺点是代码实现较为复杂,需要编写多个事件处理函数。

  • DOM(Document Object Model):一种将XML文档转换为树形结构的方式。它使用一种递归的方法来遍历XML文档的所有节点,并将它们作为树的结点进行存储。这种方式的优点是使用简单,因为它提供了一系列的方法来操作树形结构。缺点是它需要将整个XML文档都加载到内存中,并解析完成后才能处理数据,因此它不太适用于处理大型XML文档。

  • ElementTree:提供了一种简单的方法来解析XML数据,并将其转换为树形结构。这种方式的优点是代码实现简单,易于理解。缺点是它不支持流式解析,只能处理完整的XML文档。

使用SAX处理XML数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import xml.sax

# 定义一个事件处理器类,继承自xml.sax.ContentHandler类
class BookHandler(xml.sax.ContentHandler):
# 元素开始事件处理
def startElement(self, name, attrs):
# 处理<book>元素的category属性
if name == 'book':
print(attrs['category'])

# 元素结束事件处理
def endElement(self, name):
# 处理<title>元素的文本内容
if name == 'title':
print(self.title)
# 处理<year>元素的文本内容
elif name == 'year':
print(self.year)

# 元素文本内容事件处理
def characters(self, data):
# 记录当前处理的元素名称
self.curr_elem = name
# 处理<title>元素的文本内容
if self.curr_elem == 'title':
self.title = data
# 处理<year>元素的文本内容
elif self.curr_elem == 'year':
self.year = data

# 创建一个解析器
parser = xml.sax.make_parser()
# 设置事件处理器
parser.setContentHandler(BookHandler())
# 解析XML数据
parser.parse('books.xml')

使用DOM处理XML数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import xml.dom.minidom

# 解析XML数据
dom = xml.dom.minidom.parse('books.xml')

# 获取XML文档的根节点
root = dom.documentElement

# 迭代XML数据
for book in root.childNodes:
# 只处理元素节点
if book.nodeType == book.ELEMENT_NODE:
# 获取<title>元素
title = book.getElementsByTagName('title')[0]
# 获取<author>元素
author = book.getElementsByTagName('author')[0]
# 获取<year>元素
year = book.getElementsByTagName('year')[0]
# 输出<title>、<author>、<year>元素的文本内容
print(title.firstChild.data, author.firstChild.data, year.firstChild.data)

# 以上代码会解析XML文档中的元素的文本内容,并输出到控制台

使用xml.etree.ElementTree库处理XML数据

1
2
3
4
5
6
7
8
9
10
11
12
from xml.etree import ElementTree

# 解析XML文件
tree = ElementTree.parse("data.xml")

# 获取XML文件的根元素
root = tree.getroot()

# 遍历XML文件中的所有元素
for elem in root.iter():
# 输出元素名和元素属性
print(elem.tag, elem.attrib)

第三方库-lxml

lxml是一个Python的XML处理库,可以用来解析、操作和生成XML数据。它可以方便地与Python的其它库(如BeautifulSoup)集成使用。lxml库还提供了XPath和XSLT的支持,可以用来快速查找XML文件中的特定元素。

安装lxml库:

1
pip install lxml

使用lxml库解析XML文件:

1
2
3
4
5
6
7
8
9
10
11
12
from lxml import etree

# 解析XML文件
tree = etree.parse("data.xml")

# 获取XML文件的根元素
root = tree.getroot()

# 遍历XML文件中的所有元素
for elem in root.iter():
# 输出元素名和元素属性
print(elem.tag, elem.attrib)

c. CSV格式处理

Pandas是一个用于数据操作和分析的Python库。它主要用于处理和分析表格型数据,例如电子表格和数据库。Pandas提供了大量的函数和方法来让我们能够完成常见的数据分析任务,例如过滤、转换、聚合、合并等等。

处理CSV文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import pandas as pd

# 读取CSV文件
df = pd.read_csv('my_data.csv')

# 查看前5行数据
print(df.head())

# 统计每个列的均值
print(df.mean())

# 筛选出某列值大于某个阈值的行
df = df[df['my_column'] > my_threshold]

# 将表格按某列分组,并计算每组的和
df = df.groupby('my_column')['my_other_column'].sum()

d. Excel格式处理

最常用的库之一是 openpyxl,可以用来读取、写入和处理 Excel 文件

安装openpyxl 库

1
pip install openpyxl

读取一个 Excel 文件中的数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 导入 openpyxl 库
import openpyxl

# 读取 Excel 文件
wb = openpyxl.load_workbook('my_file.xlsx')

# 获取活动工作表
ws = wb.active

# 遍历工作表中的每一行
for row in ws.rows:
# 遍历每一行中的每一列
for cell in row:
# 输出单元格的值
print(cell.value)

# 读取 my_file.xlsx 文件中的数据,并将每个单元格的值打印出来

写入数据到 Excel 文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 导入 openpyxl 第三方库
import openpyxl

# 创建一个工作簿
workbook = openpyxl.Workbook()

# 获取第一个工作表
worksheet = workbook.active

# 写入数据到单元格
worksheet['A1'] = 'Foo'
worksheet['B1'] = 'Bar'

# 保存工作簿
workbook.save('example.xlsx')

5. 日志处理

logging模块

Python 标准库中的 logging 模块提供了一个统一的接口用于记录日志。它可以将日志信息输出到文件、控制台或其他外部存储,并且支持对日志进行级别分类和过滤。

使用 logging 模块记录日志的基本步骤:

1
2
3
4
5
6
1) 创建一个 Logger 对象。
2) 设置日志级别。
3) 设置日志格式。
4) 创建一个 Handler 对象,用于将日志输出到指定的目的地。
5) 将 Handler 对象添加到 Logger 对象中。
6) 使用 Logger 对象记录日志。

使用 logging 模块将日志输出到文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import logging

# 创建一个 Logger 对象
logger = logging.getLogger(__name__)

# 设置日志级别
logger.setLevel(logging.INFO)

# 创建一个 Handler 对象,用于将日志输出到文件
file_handler = logging.FileHandler('app.log')

# 设置日志格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)

# 将 Handler 添加到 Logger 中
logger.addHandler(file_handler)

# 记录日志
logger.debug('This is a debug message.')
logger.info('This is an info message.')
logger.warning('This is a warning message.')
logger.error('This is an error message.')
logger.critical('This is a critical message.')

6. 数据库操作

a. MySQL

在Python中可以通过多种库来访问MySQL数据库。常用的MySQL库包括:

  • MySQLdb: 这是Python中最流行的MySQL数据库访问库,它提供了一种简单的方法来执行SQL语句并处理结果。
  • PyMySQL: 这是一个用于Python的免费、开源的MySQL数据库访问库,具有与MySQLdb相同的接口,可以轻松地替换MySQLdb。
  • PyODBC: 这是一个Python数据库接口库,可以用于访问多种数据库,包括MySQL。它提供了一个通用的接口,可以使用标准的ODBC API访问数据库。
  • pymysql-sa: 这是一个基于SQLAlchemy的MySQL驱动程序,可以与SQLAlchemy一起使用以方便地访问MySQL数据库。

PyMySQL

PyMySQL是一个用于Python的免费、开源的MySQL数据库访问库,具有与MySQLdb相同的接口,可以轻松地替换MySQLdb

1)使用PyMySQL访问MySQL数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 导入PyMySQL模块
import pymysql

# 连接到MySQL数据库
conn = pymysql.connect(host='localhost', user='username', password='password', database='mydb')

# 创建一个游标(cursor)
cursor = conn.cursor()

# 执行一条SQL语句,插入一条记录
cursor.execute("INSERT INTO users (name, email) VALUES ('John Doe', '[email protected]')")

# 提交事务
conn.commit()

# 关闭游标和连接
cursor.close()
conn.close()

2)使用PyMySQL查询数据库中所有用户记录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 导入PyMySQL模块
import pymysql

# 连接到MySQL数据库
conn = pymysql.connect(host='localhost', user='username', password='password', database='mydb')

# 创建一个游标(cursor)
cursor = conn.cursor()

# 执行一条SQL语句,查询所有用户记录
cursor.execute("SELECT * FROM users")

# 获取查询结果
result = cursor.fetchall()

# 打印查询结果
for row in result:
print(row)

# 关闭游标和连接
cursor.close()
conn.close()

7. 网络编程

a. 套接字编程

socket 模块提供了标准的 BSD Sockets API,用于创建基于流的套接字。通过使用 socket 模块,可以实现客户端和服务器端程序之间的通信

使用 socket 模块实现一个简单的客户端和服务器端程序:

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 导入 socket 模块
import socket

# 创建一个 socket 对象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 获取本地主机名
host = socket.gethostname()

# 设置端口号
port = 9999

# 连接服务,指定主机和端口
s.connect((host, port))

# 接收小于 1024 字节的数据
msg = s.recv(1024)

s.close()

print(msg.decode("utf-8"))

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 导入 socket 模块
import socket

# 创建一个 socket 对象
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 获取本地主机名
host = socket.gethostname()

# 设置端口号
port = 9999

# 绑定端口
serversocket.bind((host, port))

# 设置最大连接数,超过后排队
serversocket.listen(5)

while True:
# 建立客户端连接
clientsocket, addr = serversocket.accept()

print("连接地址: %s" % str(addr))

msg = "欢迎访问编程网站!" + "\r\n"
clientsocket.send(msg.encode("utf-8"))
clientsocket.close()

使用 socket 模块创建简单的 UDP 客户端/服务器程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 客户端
import socket

# 创建一个 socket 对象
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# 定义服务器地址
host = "localhost"
port = 9999

# 发送消息
msg = "Hello World!"
s.sendto(msg.encode("utf-8"), (host, port))

# 接收数据
data, addr = s.recvfrom(1024)

s.close()

print("Received from server: %s" % data.decode("utf-8"))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 服务器
import socket

# 创建一个 socket 对象
serversocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# 绑定端口
serversocket.bind(("localhost", 9999))

while True:
# 接收数据
data, addr = serversocket.recvfrom(1024)

print("Received from client: %s" % data.decode("utf-8"))

# 发送数据
serversocket.sendto(data, addr)

b. HTTP库

Python requests库是一个用于发送网络请求的第三方 Python 库。它允许你发送各种不同类型的 HTTP 请求,包括 GET、POST、PUT 和 DELETE。

requests 方法

方法 描述
delete(url, args) 发送 DELETE 请求到指定 url
get(url, params, args) 发送 GET 请求到指定 url
head(url, args) 发送 HEAD 请求到指定 url
patch(url, data, args) 发送 PATCH 请求到指定 url
post(url, data, json, args) 发送 POST 请求到指定 url
put(url, data, args) 发送 PUT 请求到指定 url
request(method, url, args) 向指定的 url 发送指定的请求方法

安装 requests 库:

1
pip install requests

代码示例

1)发送一个 GET 请求

1
2
3
4
5
6
7
8
9
10
import requests

# 发送 GET 请求
response = requests.get('https://www.example.com')

# 检查响应状态码
if response.status_code == 200:
print('Success!')
else:
print('An error occurred.')

2)发送一个带有查询参数的 GET 请求

1
2
3
4
5
6
7
8
9
10
import requests

# 发送 GET 请求
response = requests.get('https://www.example.com/search', params={'q': 'python'})

# 检查响应状态码
if response.status_code == 200:
print('Success!')
else:
print('An error occurred.')

3)发送一个带有自定义头部的 GET 请求

1
2
3
4
5
6
7
8
9
10
import requests

# 发送 GET 请求
response = requests.get('https://www.example.com', headers={'X-Custom-Header': 'MyValue'})

# 检查响应状态码
if response.status_code == 200:
print('Success!')
else:
print('An error occurred.')

4)从响应中提取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import requests

# 发送 GET 请求
response = requests.get('https://www.example.com')

# 处理响应数据
if response.status_code == 200:
# 获取响应文本
response_text = response.text
print('Response text:')
print(response_text)

# 获取响应头信息
response_headers = response.headers
print('Response headers:')
print(response_headers)

# 获取响应 Cookies
response_cookies = response.cookies
print('Response cookies:')
print(response_cookies)

else:
print('An error occurred.')

5)发送一个 POST 请求

1
2
3
4
5
6
7
8
9
10
import requests

# 发送 POST 请求
response = requests.post('https://www.example.com/login', data={'username': 'john', 'password': '1234'})

# 检查响应状态码
if response.status_code == 200:
print('Success!')
else:
print('An error occurred.')

c. 电子邮件

在 Python 中,常用的处理电子邮件的库有:

  • email:提供了一组用于处理复杂电子邮件消息的模块。
  • smtplib:提供了一个实现 Simple Mail Transfer Protocol (SMTP) 的客户端,推荐使用。
  • imaplib:提供了一个实现 Internet Message Access Protocol (IMAP) 的客户端。

发送邮件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import smtplib
from email.message import EmailMessage

def send_email():
# 创建 SMTP 客户端对象
smtp_client = smtplib.SMTP('smtp.example.com')

# 登录邮件服务器
smtp_client.login('[email protected]', 'password')

# 创建邮件对象
msg = EmailMessage()
msg.set_content('Hello, this is a test email')
msg['Subject'] = 'Test Email'
msg['From'] = '[email protected]'
msg['To'] = '[email protected]'

# 发送邮件
smtp_client.sendmail('[email protected]', ['[email protected]'], msg.as_string())

# 断开连接
smtp_client.quit()

if __name__ == '__main__':
send_email()

发送带有附件的邮件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import smtplib
from email.message import EmailMessage
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders

def send_email():
# 创建 SMTP 客户端对象
smtp_client = smtplib.SMTP('smtp.example.com')

# 登录邮件服务器
smtp_client.login('[email protected]', 'password')

# 创建邮件对象
msg = MIMEMultipart()
msg['Subject'] = 'Test Email with Attachment'
msg['From'] = '[email protected]'
msg['To'] = '[email protected]'

# 添加附件
with open('attachment.txt', 'rb') as f:
part = MIMEBase('application', 'octet-stream')
part.set_payload(f.read())
encoders.encode_base64(part)
part.add_header('Content-Disposition',
'attachment; filename="attachment.txt"')
msg.attach(part)

# 发送邮件
smtp_client.sendmail('[email protected]', ['[email protected]'], msg.as_string())

# 断开连接
smtp_client.quit()

if __name__ == '__main__':
send_email()

向多个用户发送带有 HTML 表格的 HTML 格式邮件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import smtplib
from email.message import EmailMessage

def send_email():
# 创建 SMTP 客户端对象
smtp_client = smtplib.SMTP('smtp.example.com')

# 登录邮件服务器
smtp_client.login('[email protected]', 'password')

# 创建邮件对象
msg = EmailMessage()
msg['Subject'] = 'Test HTML Email with Table'
msg['From'] = '[email protected]'
msg['To'] = '[email protected], [email protected]'

# 设置邮件正文为 HTML 格式
msg.add_alternative("""
<html>
<body>
<h1>Hello, this is a test HTML email</h1>
<table>
<tr>
<th>Name</th>
<th>Age</th>
</tr>
<tr>
<td>Alice</td>
<td>20</td>
</tr>
<tr>
<td>Bob</td>
<td>22</td>
</tr>
</table>
</body>
</html>
""", subtype='html')

# 发送邮件
smtp_client.sendmail('[email protected]', ['[email protected]', '[email protected]'], msg.as_string())

# 断开连接
smtp_client.quit()

if __name__ == '__main__':
send_email()

0xFF Reference