您的位置:首页 > 博客中心 > 数据库 >

day10-python并发编程之多线程协程及MySQL

时间:2022-03-15 06:45

 

第1章 python并发编程之多线程

1.1 死锁现象与递归锁

1.1.1 死锁概念

进程也有死锁与递归锁,在进程那里忘记说了,放到这里一切说了额

 

所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁

1.1.2 博客实例

from threading import Thread,Lock

import time

mutexA=Lock()

mutexB=Lock()

 

class MyThread(Thread):

    def run(self):

        self.func1()

        self.func2()

    def func1(self):

        mutexA.acquire()

        print(‘\033[41m%s 拿到A锁\033[0m‘ %self.name)

 

        mutexB.acquire()

        print(‘\033[42m%s 拿到B锁\033[0m‘ %self.name)

        mutexB.release()

 

        mutexA.release()

 

    def func2(self):

        mutexB.acquire()

        print(‘\033[43m%s 拿到B锁\033[0m‘ %self.name)

        time.sleep(2)

 

        mutexA.acquire()

        print(‘\033[44m%s 拿到A锁\033[0m‘ %self.name)

        mutexA.release()

 

        mutexB.release()

 

if __name__ == ‘__main__‘:

    for i in range(10):

        t=MyThread()

        t.start()

 

‘‘‘

Thread-1 拿到A锁

Thread-1 拿到B锁

Thread-1 拿到B锁

Thread-2 拿到A锁

然后就卡住,死锁了

‘‘‘

 

 

1.1.3 课堂实例

from threading import Thread,Lock,RLock

import time

 

# mutexA=Lock()

# mutexB=Lock()

 

mutexA=mutexB=RLock()

 

class MyThread(Thread):

    def run(self):

        self.f1()

        self.f2()

 

    def f1(self):

        mutexA.acquire()

        print(‘%s 拿到了A锁‘ %self.name)

 

        mutexB.acquire()

        print(‘%s 拿到了B锁‘ % self.name)

        mutexB.release() #1

 

        mutexA.release() #0

 

    def f2(self):

        mutexB.acquire()

        print(‘%s 拿到了B锁‘ % self.name)

        time.sleep(0.1)

 

        mutexA.acquire()

        print(‘%s 拿到了A锁‘ % self.name)

        mutexA.release()

 

        mutexB.release()

 

 

 

if __name__ == ‘__main__‘:

    for i in range(10):

        t=MyThread()

        t.start()

 

1.1.4 死锁解决方法

解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

 

这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:

mutexA=mutexB=threading.RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止

 

1.2 信号量Semaphore

同进程的一样

 

Semaphore管理一个内置的计数器,

每当调用acquire()时内置计数器-1;

调用release() 时内置计数器+1;

计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

 

实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):

 

1.2.1 博客实例

from threading import Thread,Semaphore

import threading

import time

# def func():

#     if sm.acquire():

#         print (threading.currentThread().getName() + ‘ get semaphore‘)

#         time.sleep(2)

#         sm.release()

def func():

    sm.acquire()

    print(‘%s get sm‘ %threading.current_thread().getName())

    time.sleep(3)

    sm.release()

if __name__ == ‘__main__‘:

    sm=Semaphore(5)

    for i in range(23):

        t=Thread(target=func)

        t.start()

 

1.2.2 课堂实例

from threading import Thread,Semaphore,current_thread

import time,random

 

sm=Semaphore(5)

 

def task():

    with sm:

        print(‘%s is laing‘ %current_thread().getName())

        time.sleep(random.randint(1,3))

 

if __name__ == ‘__main__‘:

    for i in range(20):

        t=Thread(target=task)

        t.start()

 

 

与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程

 

互斥锁与信号量推荐博客:http://url.cn/5DMsS9r

 

1.3 Event事件

同进程的一样

 

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

1.3.1 event参数

event.isSet():返回event的状态值;

event.wait():如果 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():恢复event的状态值为False。

 

1.3.2 博客实例

 技术分享图片

 

例如,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作

 

from threading import Thread,Event

import threading

import time,random

def conn_mysql():

    count=1

    while not event.is_set():

        if count > 3:

            raise TimeoutError(‘链接超时‘)

        print(‘<%s>第%s次尝试链接‘ % (threading.current_thread().getName(), count))

        event.wait(0.5)

        count+=1

    print(‘<%s>链接成功‘ %threading.current_thread().getName())

 

 

def check_mysql():

    print(‘\033[45m[%s]正在检查mysql\033[0m‘ % threading.current_thread().getName())

    time.sleep(random.randint(2,4))

    event.set()

if __name__ == ‘__main__‘:

    event=Event()

    conn1=Thread(target=conn_mysql)

    conn2=Thread(target=conn_mysql)

    check=Thread(target=check_mysql)

 

    conn1.start()

    conn2.start()

    check.start()

 

 

1.3.3 课堂实例

from threading import Thread,Event,current_thread

import time

 

event=Event()

 

def check():

    print(‘checking MySQL...‘)

    time.sleep(5)

    event.set()

 

def conn():

    count=1

    while not event.is_set():

        if count > 3:

            raise TimeoutError(‘超时‘)

        print(‘%s try to connect MySQL time %s‘ %(current_thread().getName(),count))

        event.wait(2)

        count+=1

 

    print(‘%s connected MySQL‘ %current_thread().getName())

 

if __name__ == ‘__main__‘:

    t1=Thread(target=check)

    t2=Thread(target=conn)

    t3=Thread(target=conn)

    t4=Thread(target=conn)

 

 

    t1.start()

    t2.start()

    t3.start()

    t4.start()

 

 

 

1.4 定时器

定时器,指定n秒后执行某操作

1.4.1 实例

from threading import Timer

 

 

def hello():

    print("hello, world")

 

t = Timer(1, hello)

t.start()  # after 1 seconds, "hello, world" will be printed

 

1.4.2 验证码定时器实例

from threading import Timer

import random,time

 

class Code:

    def __init__(self):

        self.make_cache()

 

    def make_cache(self,interval=5):

        self.cache=self.make_code()

        print(self.cache)

        self.t=Timer(interval,self.make_cache)

        self.t.start()

 

    def make_code(self,n=4):

        res=‘‘

        for i in range(n):

            s1=str(random.randint(0,9))

            s2=chr(random.randint(65,90))

            res+=random.choice([s1,s2])

        return res

 

    def check(self):

        while True:

            inp=input(‘>>: ‘).strip()

            if inp.upper() ==  self.cache:

                print(‘验证成功‘,end=‘\n‘)

                self.t.cancel()

                break

 

 

if __name__ == ‘__main__‘:

    obj=Code()

    obj.check()

 

 

1.5 线程queue队列

queue队列 :使用import queue,用法与进程Queue一样

 

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

 

1.5.1 先进先出class queue.Queue(maxsize=0)

import queue

 

q=queue.Queue()

q.put(‘first‘)

q.put(‘second‘)

q.put(‘third‘)

 

print(q.get())

print(q.get())

print(q.get())

‘‘‘

结果(先进先出):

first

second

third

‘‘‘

 

1.5.2 堆栈(后进先出)class queue.LifoQueue(maxsize=0) #last in fisrt out

import queue

 

q=queue.LifoQueue()

q.put(‘first‘)

q.put(‘second‘)

q.put(‘third‘)

 

print(q.get())

print(q.get())

print(q.get())

‘‘‘

结果(后进先出):

third

second

first

‘‘‘

 

 

1.5.3 优先级队列class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

import queue

 

q=queue.PriorityQueue()

#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高

q.put((20,‘a‘))

q.put((10,‘b‘))

q.put((30,‘c‘))

 

print(q.get())

print(q.get())

print(q.get())

‘‘‘

结果(数字越小优先级越高,优先级高的优先出队):

(10, ‘b‘)

(20, ‘a‘)

(30, ‘c‘)

‘‘‘

 

1.5.4 课堂讲解实例

import queue

 

# q=queue.Queue(3) #队列:先进先出

#

# q.put(1)

# q.put(2)

# q.put(3)

# # q.put(4)

# # q.put_nowait(4)

# # q.put(4,block=False)

# q.put(4,block=True,timeout=3)

#

#

# # print(q.get())

# # print(q.get())

# # print(q.get())

 

# q=queue.LifoQueue(3) #堆栈:后进先出

# q.put(1)

# q.put(2)

# q.put(3)

#

# print(q.get())

# print(q.get())

# print(q.get())

 

q=queue.PriorityQueue(3) #优先级队列

q.put((10,‘a‘))

q.put((-3,‘b‘))

q.put((100,‘c‘))

 

print(q.get())

print(q.get())

print(q.get())

 

 

1.6 Python标准模块--concurrent.futures进程池与线程池

1.6.1 提交任务的两种方式:

#同步调用:提交完任务后,就在原地等待,等待任务执行完毕,拿到任务的返回值,才能继续下一行代码,导致程序串行执行

#异步调用+回调机制:提交完任务后,不在原地等待,任务一旦执行完毕就会触发回调函数的执行, 程序是并发执行

 

1.6.2 进程的执行状态:

#阻塞

#非阻塞

 

1.6.3 concurrent.futures

#1 介绍

concurrent.futures模块提供了高度封装的异步调用接口

ThreadPoolExecutor:线程池,提供异步调用

ProcessPoolExecutor: 进程池,提供异步调用

Both implement the same interface, which is defined by the abstract Executor class.

 

#2 基本方法

#submit(fn, *args, **kwargs)

异步提交任务

 

#map(func, *iterables, timeout=None, chunksize=1)

取代for循环submit的操作

 

#shutdown(wait=True)

相当于进程池的pool.close()+pool.join()操作

wait=True,等待池内所有任务执行完毕回收完资源后才继续

wait=False,立即返回,并不会等待池内的任务执行完毕

但不管wait参数为何值,整个程序都会等到所有任务执行完毕

submit和map必须在shutdown之前

 

#result(timeout=None)

取得结果

 

#add_done_callback(fn)

回调函数

 

1.6.4 相关实例

################ProcessPoolExecutor#############

#介绍

The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

 

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)

An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.

 

 

#用法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

 

import os,time,random

def task(n):

    print(‘%s is runing‘ %os.getpid())

    time.sleep(random.randint(1,3))

    return n**2

 

if __name__ == ‘__main__‘:

 

    executor=ProcessPoolExecutor(max_workers=3)

 

    futures=[]

    for i in range(11):

        future=executor.submit(task,i)

        futures.append(future)

    executor.shutdown(True)

    print(‘+++>‘)

    for future in futures:

        print(future.result())

 

 

#######################ThreadPoolExecutor######################

#介绍

ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.

class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix=‘‘)

An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

 

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.

 

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.

 

#用法

与ProcessPoolExecutor相同

 

 

###########################map的用法########################

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

 

import os,time,random

def task(n):

    print(‘%s is runing‘ %os.getpid())

    time.sleep(random.randint(1,3))

    return n**2

 

if __name__ == ‘__main__‘:

 

    executor=ThreadPoolExecutor(max_workers=3)

 

    # for i in range(11):

    #     future=executor.submit(task,i)

 

    executor.map(task,range(1,12)) #map取代了for+submit

 

 

#######################回调函数####################

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

from multiprocessing import Pool

import requests

import json

import os

 

def get_page(url):

    print(‘<进程%s> get %s‘ %(os.getpid(),url))

    respone=requests.get(url)

    if respone.status_code == 200:

        return {‘url‘:url,‘text‘:respone.text}

 

def parse_page(res):

    res=res.result()

    print(‘<进程%s> parse %s‘ %(os.getpid(),res[‘url‘]))

    parse_res=‘url:<%s> size:[%s]\n‘ %(res[‘url‘],len(res[‘text‘]))

    with open(‘db.txt‘,‘a‘) as f:

        f.write(parse_res)

 

 

if __name__ == ‘__main__‘:

    urls=[

        ‘https://www.baidu.com‘,

        ‘https://www.python.org‘,

        ‘https://www.openstack.org‘,

        ‘https://help.github.com/‘,

        ‘http://www.sina.com.cn/‘

    ]

 

    # p=Pool(3)

    # for url in urls:

    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)

    # p.close()

    # p.join()

 

    p=ProcessPoolExecutor(3)

    for url in urls:

        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果

 

 

1.6.5 进程池ThreadPoolExecutor

# #同步调用示例:

# # from multiprocessing import Pool

# from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

# import time,random,os

#

# def task(n):

#     print(‘%s is ruuning‘ %os.getpid())

#     time.sleep(random.randint(1,3))

#     return n**2

#

# def handle(res):

#     print(‘handle res %s‘ %res)

#

# if __name__ == ‘__main__‘:

#     #同步调用

#     pool=ProcessPoolExecutor(2)

#

#     for i in range(5):

#         res=pool.submit(task,i).result()

#         # print(res)

#         handle(res)

#

#     pool.shutdown(wait=True)

#     # pool.submit(task,33333)

#     print(‘主‘)

 

 

#异步调用示例:

# from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

# import time,random,os

#

# def task(n):

#     print(‘%s is ruuning‘ %os.getpid())

#     time.sleep(random.randint(1,3))

#     # res=n**2

#     # handle(res)

#     return n**2

#

# def handle(res):

#     res=res.result()

#     print(‘handle res %s‘ %res)

#

# if __name__ == ‘__main__‘:

#     #异步调用

#     pool=ProcessPoolExecutor(2)

#

#     for i in range(5):

#         obj=pool.submit(task,i)

#         obj.add_done_callback(handle) #handle(obj)

#

#     pool.shutdown(wait=True)

#     print(‘主‘)

 

 

1.6.6 线程池ProcessPoolExecutor

#线程池

from concurrent.futures import ThreadPoolExecutor

from threading import current_thread

import requests

import time

 

def get(url):

    print(‘%s GET %s‘ %(current_thread().getName(),url))

    response=requests.get(url)

    time.sleep(2)

    if response.status_code == 200:

        return {‘url‘:url,‘content‘:response.text}

 

def parse(res):

    res=res.result()

    print(‘parse:[%s] res:[%s]‘ %(res[‘url‘],len(res[‘content‘])))

 

 

if __name__ == ‘__main__‘:

    pool=ThreadPoolExecutor(2)

 

    urls=[

        ‘https://www.baidu.com‘,

        ‘https://www.python.org‘,

        ‘https://www.openstack.org‘,

        ‘https://www.openstack.org‘,

        ‘https://www.openstack.org‘,

        ‘https://www.openstack.org‘,

        ‘https://www.openstack.org‘,

        ‘https://www.openstack.org‘,

        ‘https://www.openstack.org‘,

        ‘https://www.openstack.org‘,

        ‘https://www.openstack.org‘,

        ‘https://www.openstack.org‘,

    ]

    for url in urls:

        pool.submit(get,url).add_done_callback(parse)

 

    pool.shutdown(wait=True)

 

 

第2章 python并发编程之协程

2.1 引子

本节的主题是基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发,为此我们需要先回顾下并发的本质:切换+保存状态

    cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞,另外一种情况是该任务计算的时间过长或有一个优先级更高的程序替代了它

 技术分享图片

 

ps:在介绍进程理论时,提及进程的三种执行状态,而线程才是执行单位,所以也可以将上图理解为线程的三种状态

    一:其中第二种情况并不能提升效率,只是为了让cpu能够雨露均沾,实现看起来所有任务都被“同时”执行的效果,如果多个任务都是纯计算的,这种切换反而会降低效率。为此我们可以基于yield来验证。yield本身就是一种在单线程下可以保存任务运行状态的方法,我们来简单复习一下:

#1 yiled可以保存状态,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级

#2 send可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换 

 

2.1.1 单纯地切换反而会降低运行效率

##############单纯地切换反而会降低运行效率######################

#串行执行

import time

def consumer(res):

    ‘‘‘任务1:接收数据,处理数据‘‘‘

    pass

 

def producer():

    ‘‘‘任务2:生产数据‘‘‘

    res=[]

    for i in range(10000000):

        res.append(i)

    return res

 

start=time.time()

#串行执行

res=producer()

consumer(res) #写成consumer(producer())会降低执行效率

stop=time.time()

print(stop-start) #1.5536692142486572

 

 

 

#基于yield并发执行

import time

def consumer():

    ‘‘‘任务1:接收数据,处理数据‘‘‘

    while True:

        x=yield

 

def producer():

    ‘‘‘任务2:生产数据‘‘‘

    g=consumer()

    next(g)

    for i in range(10000000):

        g.send(i)

 

start=time.time()

#基于yield保存状态,实现两个任务直接来回切换,即并发的效果

#PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.

producer()

 

stop=time.time()

print(stop-start) #2.0272178649902344

 

 

2.1.2 yield并不能实现遇到io切换

二:第一种情况的切换。在任务一遇到io情况下,切到任务二去执行,这样就可以利用任务一阻塞的时间完成任务二的计算,效率的提升就在于此。

################yield并不能实现遇到io切换####################

import time

def consumer():

    ‘‘‘任务1:接收数据,处理数据‘‘‘

    while True:

        x=yield

 

def producer():

    ‘‘‘任务2:生产数据‘‘‘

    g=consumer()

    next(g)

    for i in range(10000000):

        g.send(i)

        time.sleep(2)

 

start=time.time()

producer() #并发执行,但是任务producer遇到io就会阻塞住,并不会切到该线程内的其他任务去执行

 

stop=time.time()

print(stop-start)

 

对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另外一个任务去计算,这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被cpu执行的状态,相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,从而可以迷惑操作系统,让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给我们的线程。

 

    协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。为了实现它,我们需要找寻一种可以同时满足以下条件的解决方案:

#1. 可以控制多个任务之间的切换,切换之前将任务的状态保存下来,以便重新运行时,可以基于暂停的位置继续执行。

 

#2. 作为1的补充:可以检测io操作,在遇到io操作的情况下才发生切换

 

2.1.3 课堂实例

# 单纯地切换反而会降低运行效率

#串行执行

# import time

# def consumer(res):

#     ‘‘‘任务1:接收数据,处理数据‘‘‘

#     pass

#

# def producer():

#     ‘‘‘任务2:生产数据‘‘‘

#     res=[]

#     for i in range(10000000):

#         res.append(i)

#     return res

#

# start=time.time()

# #串行执行

# res=producer()

# consumer(res)

# stop=time.time()

# print(stop-start)

 

 

 

#基于yield并发执行

import time

def consumer():

    ‘‘‘任务1:接收数据,处理数据‘‘‘

    while True:

        print(‘consumer‘)

        x=yield

        time.sleep(100)

 

def producer():

    ‘‘‘任务2:生产数据‘‘‘

    g=consumer()

    next(g)

    for i in range(10000000):

        print(‘producer‘)

        g.send(i)

 

start=time.time()

#基于yield保存状态,实现两个任务直接来回切换,即并发的效果

#PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.

producer()

 

stop=time.time()

print(stop-start) #

 

 

 

2.2 协程介绍

2.2.1 协程概念

协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

2.2.2 强调

需要强调的是:

#1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)

#2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)

对比操作系统控制线程的切换,用户在单线程内控制协程的切换

2.2.3 优缺点

优点如下:

#1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级

#2. 单线程内就可以实现并发的效果,最大限度地利用cpu

缺点如下:

#1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程

#2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

2.2.4 总结

总结协程特点:

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里自己保存多个控制流的上下文栈
  4. 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))

 

2.3 Greenlet

2.3.1 博客实例

如果我们在单个线程内有20个任务,要想实现在多个任务之间切换,使用yield生成器的方式过于麻烦(需要先得到初始化一次的生成器,然后再调用send。。。非常麻烦),而使用greenlet模块可以非常简单地实现这20个任务直接的切换

#安装

pip3 install greenlet

 

 

from greenlet import greenlet

 

def eat(name):

    print(‘%s eat 1‘ %name)

    g2.switch(‘egon‘)

    print(‘%s eat 2‘ %name)

    g2.switch()

def play(name):

    print(‘%s play 1‘ %name)

    g1.switch()

    print(‘%s play 2‘ %name)

 

g1=greenlet(eat)

g2=greenlet(play)

 

g1.switch(‘egon‘)#可以在第一次switch时传入参数,以后都不需要

 

单纯的切换(在没有io的情况下或者没有重复开辟内存空间的操作),反而会降低程序的执行速度

 

#顺序执行

import time

def f1():

    res=1

    for i in range(100000000):

        res+=i

 

def f2():

    res=1

    for i in range(100000000):

        res*=i

 

start=time.time()

f1()

f2()

stop=time.time()

print(‘run time is %s‘ %(stop-start)) #10.985628366470337

 

#切换

from greenlet import greenlet

import time

def f1():

    res=1

    for i in range(100000000):

        res+=i

        g2.switch()

 

def f2():

    res=1

    for i in range(100000000):

        res*=i

        g1.switch()

 

start=time.time()

g1=greenlet(f1)

g2=greenlet(f2)

g1.switch()

stop=time.time()

print(‘run time is %s‘ %(stop-start)) # 52.763017892837524

 

greenlet只是提供了一种比generator更加便捷的切换方式,当切到一个任务执行时如果遇到io,那就原地阻塞,仍然是没有解决遇到IO自动切换来提升效率的问题。

 

单线程里的这20个任务的代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2。。。。如此,才能提高效率,这就用到了Gevent模块。

 

2.3.2 课堂实例

#pip3 install greenlet

from greenlet import greenlet

import time

 

def eat(name):

    print(‘%s eat 1‘ %name)

    time.sleep(1000)

    g2.switch(‘egon‘)

    print(‘%s eat 2‘ %name)

    g2.switch()

 

 

def play(name):

    print(‘%s play 1‘ % name)

    g1.switch()

    print(‘%s play 2‘ % name)

 

g1=greenlet(eat)

g2=greenlet(play)

 

g1.switch(‘egon‘)

 

 

2.4 Gevent介绍

#安装

pip3 install gevent

Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

2.4.1 用法

#用法

g1=gevent.spawn(func,1,,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的

g2=gevent.spawn(func2)

g1.join() #等待g1结束

g2.join() #等待g2结束

#或者上述两步合作一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值

 

2.4.2 遇到IO阻塞时会自动切换任务

import gevent

def eat(name):

    print(‘%s eat 1‘ %name)

    gevent.sleep(2)

    print(‘%s eat 2‘ %name)

 

def play(name):

    print(‘%s play 1‘ %name)

    gevent.sleep(1)

    print(‘%s play 2‘ %name)

 

 

g1=gevent.spawn(eat,‘egon‘)

g2=gevent.spawn(play,name=‘egon‘)

g1.join()

g2.join()

#或者gevent.joinall([g1,g2])

print(‘主‘)

 

上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞,

而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了

from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前

或者我们干脆记忆成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头。

from gevent import monkey;monkey.patch_all()

 

import gevent

import time

def eat():

    print(‘eat food 1‘)

    time.sleep(2)

    print(‘eat food 2‘)

 

def play():

    print(‘play 1‘)

    time.sleep(1)

    print(‘play 2‘)

 

g1=gevent.spawn(eat)

g2=gevent.spawn(play_phone)

gevent.joinall([g1,g2])

print(‘主‘)

 

我们可以用threading.current_thread().getName()来查看每个g1和g2,查看的结果为DummyThread-n,即假线程。

 

2.4.3 课堂实例

from gevent import monkey;monkey.patch_all()

import gevent

import time

 

def eat(name):

    print(‘%s eat 1‘ %name)

    # gevent.sleep(3)

    time.sleep(3)

    print(‘%s eat 2‘ %name)

 

 

def play(name):

    print(‘%s play 1‘ % name)

    # gevent.sleep(2)

    time.sleep(3)

    print(‘%s play 2‘ % name)

 

g1=gevent.spawn(eat,‘egon‘)

g2=gevent.spawn(play,‘alex‘)

# gevent.sleep(1)

 

# g1.join()

# g2.join()

gevent.joinall([g1,g2])

 

2.5 Gevent之同步与异步

from gevent import spawn,joinall,monkey;monkey.patch_all()

 

import time

def task(pid):

    """

    Some non-deterministic task

    """

    time.sleep(0.5)

    print(‘Task %s done‘ % pid)

 

 

def synchronous():

    for i in range(10):

        task(i)

 

def asynchronous():

    g_l=[spawn(task,i) for i in range(10)]

    joinall(g_l)

 

if __name__ == ‘__main__‘:

    print(‘Synchronous:‘)

    synchronous()

 

    print(‘Asynchronous:‘)

    asynchronous()

#上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。

 

2.6 Gevent之应用举例一

##########################协程应用:爬虫############################3

from gevent import monkey;monkey.patch_all()

import gevent

import requests

import time

 

def get_page(url):

    print(‘GET: %s‘ %url)

    response=requests.get(url)

    if response.status_code == 200:

        print(‘%d bytes received from %s‘ %(len(response.text),url))

 

 

start_time=time.time()

gevent.joinall([

    gevent.spawn(get_page,‘https://www.python.org/‘),

    gevent.spawn(get_page,‘https://www.yahoo.com/‘),

    gevent.spawn(get_page,‘https://github.com/‘),

])

stop_time=time.time()

print(‘run time is %s‘ %(stop_time-start_time))

 

 

2.7 Gevent之应用举例二

通过gevent实现单线程下的socket并发(from gevent import monkey;monkey.patch_all()一定要放到导入socket模块之前,否则gevent无法识别socket的阻塞)

2.7.1 服务端

####################服务端#####################

 

from gevent import monkey;monkey.patch_all()

from socket import *

import gevent

 

#如果不想用money.patch_all()打补丁,可以用gevent自带的socket

# from gevent import socket

# s=socket.socket()

 

def server(server_ip,port):

    s=socket(AF_INET,SOCK_STREAM)

    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)

    s.bind((server_ip,port))

    s.listen(5)

    while True:

        conn,addr=s.accept()

        gevent.spawn(talk,conn,addr)

 

def talk(conn,addr):

    try:

        while True:

            res=conn.recv(1024)

            print(‘client %s:%s msg: %s‘ %(addr[0],addr[1],res))

            conn.send(res.upper())

    except Exception as e:

        print(e)

    finally:

        conn.close()

 

if __name__ == ‘__main__‘:

    server(‘127.0.0.1‘,8080)

 

 

2.7.2 客户端

##################客户端###################

#_*_coding:utf-8_*_

__author__ = ‘Linhaifeng‘

 

from socket import *

 

client=socket(AF_INET,SOCK_STREAM)

client.connect((‘127.0.0.1‘,8080))

 

 

while True:

    msg=input(‘>>: ‘).strip()

    if not msg:continue

 

    client.send(msg.encode(‘utf-8‘))

    msg=client.recv(1024)

    print(msg.decode(‘utf-8‘))

 

 

2.7.3 多线程并发多个客户端

########################多线程并发多个客户端##########################

from threading import Thread

from socket import *

import threading

 

def client(server_ip,port):

    c=socket(AF_INET,SOCK_STREAM) #套接字对象一定要加到函数内,即局部名称空间内,放在函数外则被所有线程共享,则大家公用一个套接字对象,那么客户端端口永远一样了

    c.connect((server_ip,port))

 

    count=0

    while True:

        c.send((‘%s say hello %s‘ %(threading.current_thread().getName(),count)).encode(‘utf-8‘))

        msg=c.recv(1024)

        print(msg.decode(‘utf-8‘))

        count+=1

if __name__ == ‘__main__‘:

    for i in range(500):

        t=Thread(target=client,args=(‘127.0.0.1‘,8080))

        t.start()

 

 

2.8 课堂讲解练习

2.8.1 服务端

from gevent import monkey,spawn;monkey.patch_all()

from threading import current_thread

from socket import *

 

def comunicate(conn):

    print(‘子线程:%s‘ %current_thread().getName())

    while True:

        try:

            data=conn.recv(1024)

            if not data:break

            conn.send(data.upper())

        except ConnectionResetError:

            break

    conn.close()

 

def server(ip,port):

    print(‘主线程:%s‘ %current_thread().getName())

    server = socket(AF_INET, SOCK_STREAM)

    server.bind((ip,port))

    server.listen(5)

 

    while True:

        conn, addr = server.accept()

        print(addr)

        # comunicate(conn)

        # t=Thread(target=comunicate,args=(conn,))

        # t.start()

        spawn(comunicate,conn)

 

    server.close()

 

if __name__ == ‘__main__‘:

    g=spawn(server,‘127.0.0.1‘, 8081)

    g.join()

 

2.8.2 客户端

from socket import *

from threading import current_thread,Thread

 

def client():

    client=socket(AF_INET,SOCK_STREAM)

    client.connect((‘127.0.0.1‘,8081))

 

    while True:

        client.send((‘%s say hello‘ %current_thread().getName()).encode(‘utf-8‘))

        data=client.recv(1024)

        print(data.decode(‘utf-8‘))

 

    client.close()

 

if __name__ == ‘__main__‘:

    for i in range(500):

        t=Thread(target=client)

        t.start()

 

 

第3章 mysql一:初识数据库

3.1 数据库管理软件的由来

基于我们之前所学,数据要想永久保存,都是保存于文件中,毫无疑问,一个文件仅仅只能存在于某一台机器上。

如果我们暂且忽略直接基于文件来存取数据的效率问题,并且假设程序所有的组件都运行在一台机器上,那么用文件存取数据,并没有问题。

很不幸,这些假设都是你自己意淫出来的,上述假设存在以下几个问题。。。。。。

3.1.1 程序所有的组件就不可能运行在一台机器上

#因为这台机器一旦挂掉则意味着整个软件的崩溃,并且程序的执行效率依赖于承载它的硬件,而一台机器机器的性能总归是有限的,受限于目前的硬件水平,就一台机器的性能垂直进行扩展是有极限的。

 

#于是我们只能通过水平扩展来增强我们系统的整体性能,这就需要我们将程序的各个组件分布于多台机器去执行。

 

3.1.2 数据安全问题

#根据1的描述,我们将程序的各个组件分布到各台机器,但需知各组件仍然是一个整体,言外之意,所有组件的数据还是要共享的。但每台机器上的组件都只能操作本机的文件,这就导致了数据必然不一致。

 

#于是我们想到了将数据与应用程序分离:把文件存放于一台机器,然后将多台机器通过网络去访问这台机器上的文件(用socket实现),即共享这台机器上的文件,共享则意味着竞争,会发生数据不安全,需要加锁处理。。。。

 

3.1.3 并发

根据2的描述,我们必须写一个socket服务端来管理这台机器(数据库服务器)上的文件,然后写一个socket客户端,完成如下功能:

#1.远程连接(支持并发)

#2.打开文件

#3.读写(加锁)

#4.关闭文件

 

3.1.4 总结:

#我们在编写任何程序之前,都需要事先写好基于网络操作一台主机上文件的程序(socket服务端与客户端程序),于是有人将此类程序写成一个专门的处理软件,这就是mysql等数据库管理软件的由来,但mysql解决的不仅仅是数据共享的问题,还有查询效率,安全性等一系列问题,总之,把程序员从数据管理中解脱出来,专注于自己的程序逻辑的编写。

 

3.2 数据库概述

3.2.1 什么是数据(Data)

描述事物的符号记录称为数据,描述事物的符号既可以是数字,也可以是文字、图片,图像、声音、语言等,数据由多种表现形式,它们都可以经过数字化后存入计算机

在计算机中描述一个事物,就需要抽取这一事物的典型特征,组成一条记录,就相当于文件里的一行内容,如:

egon,male,18,1999,山东,计算机系,2017,oldboy

 

单纯的一条记录并没有任何意义,如果我们按逗号作为分隔,依次定义各个字段的意思,相当于定义表的标题

name,sex,age,birth,born_addr,major,entrance_time,school #字段

egon,male,18,1999,山东,计算机系,2017,oldboy #记录

这样我们就可以了解egon,性别为男,年龄18岁,出生于1999年,出生地为山东,2017年考入老男孩计算机系

 

3.2.2 什么是数据库(DataBase,简称DB)

数据库即存放数据的仓库,只不过这个仓库是在计算机存储设备上,而且数据是按一定的格式存放的

过去人们将数据存放在文件柜里,现在数据量庞大,已经不再适用

数据库是长期存放在计算机内、有组织、可共享的数据即可。

数据库中的数据按一定的数据模型组织、描述和储存,具有较小的冗余度、较高的数据独立性和易扩展性,并可为各种 用户共享

3.2.3 什么是数据库管理系统(DataBase Management System 简称DBMS)

在了解了Data与DB的概念后,如何科学地组织和存储数据,如何高效获取和维护数据成了关键

这就用到了一个系统软件---数据库管理系统

如MySQL、Oracle、SQLite、Access、MS SQL Server

mysql主要用于大型门户,例如搜狗、新浪等,它主要的优势就是开放源代码,因为开放源代码这个数据库是免费的,他现在是甲骨文公司的产品。

oracle主要用于银行、铁路、飞机场等。该数据库功能强大,软件费用高。也是甲骨文公司的产品。

sql server是微软公司的产品,主要应用于大中型企业,如联想、方正等。

 

3.2.4 数据库服务器、数据管理系统、数据库、表与记录的关系(重点理解!!!)

记录:1 刘海龙  324245234 22(多个字段的信息组成一条记录,即文件中的一行内容)

表:student,scholl,class_list(即文件)

数据库:oldboy_stu(即文件夹)

数据库管理系统:如mysql(是一个软件)

数据库服务器:一台计算机(对内存要求比较高)

总结:

    数据库服务器-:运行数据库管理软件

    数据库管理软件:管理-数据库

    数据库:即文件夹,用来组织文件/表

    表:即文件,用来存放多行内容/多条记录

 技术分享图片

 

 

3.3 mysql介绍

MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,目前属于 Oracle 旗下公司。MySQL 最流行的关系型数据库管理系统,在 WEB 应用方面MySQL是最好的 RDBMS (Relational Database Management System,关系数据库管理系统) 应用软件之一。

3.3.1 mysql是什么

#mysql就是一个基于socket编写的C/S架构的软件

#客户端软件

  mysql自带:如mysql命令,mysqldump命令等

  python模块:如pymysql

 

3.3.2 数据库管理软件分类

#分两大类:

  关系型:如sqllite,db2,oracle,access,sql server,MySQL,注意:sql语句通用

  非关系型:mongodb,redis,memcache

 

#可以简单的理解为:

    关系型数据库需要有表结构

    非关系型数据库是key-value存储的,没有表结构

 

3.4 下载安装

3.4.1 Linux版本

#二进制rpm包安装

yum -y install mysql-server mysql

 

########################源码安装mysql######################3

1.解压tar包

cd /software

tar -xzvf mysql-5.6.21-linux-glibc2.5-x86_64.tar.gz

mv mysql-5.6.21-linux-glibc2.5-x86_64 mysql-5.6.21

 

2.添加用户与组

groupadd mysql

useradd -r -g mysql mysql

chown -R mysql:mysql mysql-5.6.21

 

3.安装数据库

su mysql

cd mysql-5.6.21/scripts

./mysql_install_db --user=mysql --basedir=/software/mysql-5.6.21 --datadir=/software/mysql-5.6.21/data

 

4.配置文件

cd /software/mysql-5.6.21/support-files

cp my-default.cnf /etc/my.cnf

cp mysql.server /etc/init.d/mysql

vim /etc/init.d/mysql   #若mysql的安装目录是/usr/local/mysql,则可省略此步

修改文件中的两个变更值

basedir=/software/mysql-5.6.21

datadir=/software/mysql-5.6.21/data

 

5.配置环境变量

vim /etc/profile

export MYSQL_HOME="/software/mysql-5.6.21"

export PATH="$PATH:$MYSQL_HOME/bin"

source /etc/profile

 

6.添加自启动服务

chkconfig --add mysql

chkconfig mysql on

 

7.启动mysql

service mysql start

 

8.登录mysql及改密码与配置远程访问

mysqladmin -u root password ‘your_password‘     #修改root用户密码

mysql -u root -p     #登录mysql,需要输入密码

mysql>GRANT ALL PRIVILEGES ON *.* TO ‘root‘@‘%‘ IDENTIFIED BY ‘your_password‘ WITH GRANT OPTION;     #允许root用户远程访问

mysql>FLUSH PRIVILEGES;     #刷新权限

 

 

 

###########################源码安装mariadb####################3

1. 解压

tar zxvf  mariadb-5.5.31-linux-x86_64.tar.gz  

mv mariadb-5.5.31-linux-x86_64 /usr/local/mysql //必需这样,很多脚本或可执行程序都会直接访问这个目录

 

2. 权限

groupadd mysql             //增加 mysql 属组

useradd -g mysql mysql     //增加 mysql 用户 并归于mysql 属组

chown mysql:mysql -Rf  /usr/local/mysql    // 设置 mysql 目录的用户及用户组归属。

chmod +x -Rf /usr/local/mysql    //赐予可执行权限

 

3. 拷贝配置文件

cp /usr/local/mysql/support-files/my-medium.cnf /etc/my.cnf     //复制默认mysql配置 文件到/etc目录

 

4. 初始化

/usr/local/mysql/scripts/mysql_install_db --user=mysql          //初始化数据库

cp  /usr/local/mysql/support-files/mysql.server    /etc/init.d/mysql    //复制mysql服务程序 到系统目录

chkconfig  mysql on     //添加mysql 至系统服务并设置为开机启动

service  mysql  start  //启动mysql

 

5. 环境变量配置

vim /etc/profile   //编辑profile,将mysql的可执行路径加入系统PATH

export PATH=/usr/local/mysql/bin:$PATH

source /etc/profile  //使PATH生效。

 

6. 账号密码

mysqladmin -u root password ‘yourpassword‘ //设定root账号及密码

mysql -u root -p  //使用root用

热门排行

今日推荐

热门手游