服务端

import socket
from threading import Thread

def communicate(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data:
                break
            conn.send(data.upper())
        except ConnectionError as e:
            print(str(e))
            break
    conn.close()

def server(ip, port):
    serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serv.bind((ip, port))
    serv.listen(5)

    while True:
        conn, addr = serv.accept()
        t = Thread(target=communicate, args=(conn,))
        t.start()

    serv.close()

if __name__ == '__main__':
    server("127.0.0.1", 8080)

客户端

import socket

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(("127.0.0.1", 8080))

while True:
    msg = input(">>:").strip()
    if not msg:
        continue
    client.send(msg.encode("utf-8"))
    data = client.recv(1024)
    print(data.decode("utf-8"))

client.close()

此时,客户端可以无限制的和服务端建立连接,并建立对应的线程,必须控制该线程的数量在机器可接受的范围内。

所以需要建立一个池子用来放线程,即:线程池

改进后的服务端,限制了线程个数为2个

import socket
# from threading import Thread
from concurrent.futures import ThreadPoolExecutor

def communicate(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data:
                break
            conn.send(data.upper())
        except ConnectionError as e:
            print(str(e))
            break
    conn.close()

def server(ip, port):
    serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serv.bind((ip, port))
    serv.listen(5)

    while True:
        conn, addr = serv.accept()
        # t = Thread(target=communicate, args=(conn,))
        # t.start()
        pool.submit(communicate, conn)

    serv.close()

if __name__ == '__main__':
    pool = ThreadPoolExecutor(2)
    server("127.0.0.1", 8080)

进程池和线程池

进程池案例

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os
import time
import random
from threading import currentThread

def task():
    print("name: %s pid: %s run" % (currentThread().getName(), os.getpid()))
    time.sleep(random.randint(1, 3))

if __name__ == '__main__':
    # 不指定默认就是CPU的核数, 从始至终都是那4个
    pool = ProcessPoolExecutor(4)
    for i in range(10):
        # 提交任务 => 异步调用
        pool.submit(task)

    # 相当于执行了join
    pool.shutdown(wait=True) # wait 默认为True

    print("主")

线程池案例

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os
import time
import random
from threading import currentThread

def task():
    print("name: %s pid: %s run" % (currentThread().getName(), os.getpid()))
    time.sleep(random.randint(1, 3))

if __name__ == '__main__':
    # 线程池的PID都是一样的
    pool = ThreadPoolExecutor(5)
    for i in range(10):
        # 提交任务 => 异步调用
        pool.submit(task)

    # 相当于执行了join
    pool.shutdown(wait=True)

    print("主")

异步调用和回调机制

同步调用

提交任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码;导致程序是串行执行,效率低下。

import time, random
from concurrent.futures import ThreadPoolExecutor

def la(name):
    print("%s is laing" % name)
    time.sleep(random.randint(3, 5))
    res = random.randint(7, 13) * "#"
    return {"name": name, "res": res}
    # weigh({"name": name, "res": res})

def weigh(shit):
    name = shit["name"]
    size = len(shit['res'])
    print("%s 拉了 <%s>kg" % (name, size))

if __name__ == '__main__':
    pool = ThreadPoolExecutor(13)
    shit1 = pool.submit(la, "wujie").result()
    weigh(shit1)
    shit2 = pool.submit(la, "lala").result()
    weigh(shit2)
    shit3 = pool.submit(la, "test").result()
    weigh(shit3)

异步调用

提交任务后,不在原地等待,任务执行完毕,再回调的时候会自动调用需要执行的方法

import time, random
from concurrent.futures import ThreadPoolExecutor

def la(name):
    print("%s is laing" % name)
    time.sleep(random.randint(3, 5))
    res = random.randint(7, 13) * "#"
    return {"name": name, "res": res}
    # weigh({"name": name, "res": res})

def weigh(shit):
    shit = shit.result()
    name = shit["name"]
    size = len(shit['res'])
    print("%s 拉了 <%s>kg" % (name, size))

if __name__ == '__main__':
    pool = ThreadPoolExecutor(13)

    # 异步方式:添加回调函数,执行完了自动执行weigh方法
    pool.submit(la, "wujie").add_done_callback(weigh)
    pool.submit(la, "lala").add_done_callback(weigh)
    pool.submit(la, "test").add_done_callback(weigh)
最后修改:2021 年 05 月 07 日 12 : 05 AM
如果觉得我的文章对你有用,请随意赞赏一点,喝杯咖啡也行