文章目录
前言
系统定制开发最近有一项任务,系统定制开发将视频中目标检测的结系统定制开发果传输给前端。系统定制开发这个任务很好实现,按照实际,系统定制开发对每一帧的视频进行检系统定制开发测然后返回图像流在前系统定制开发端进行展示。系统定制开发然而上头要求不返回,系统定制开发要的是返回检测结果。系统定制开发听到这项任务就纳闷,系统定制开发理论上只需要返回数据流,系统定制开发但是又感觉哪里有点说不出奇怪,系统定制开发于是写下这篇文章梳理整个视频流返回。本篇博客主要参考:和。代码请参考:。
Streaming
在中用到流的主要有以下两个应用场景:
- large response
在返回的数据块较大的时候,使用流的方式生成并返回,是一种较好的解决方案。当然你也可以将返回响应写在磁盘中,然后返回一个文件flask.send_file()
。但是这种情况会增加额外的I/O开销。 - 实时数据传输
实时数据传输,如视频或者语音传输,可以使用流传输。
Flask实现流
Flask通过使用generator functions
为流响应提供支持,一个generator function
如下所示:
def gen(): yield 1 yield 2 yield 3
- 1
- 2
- 3
- 4
通过上面简单理解了生成器,接下来下面的实例显示了如何使用流来处理生成大型数据报表并返回:
from flask import Response, render_templatefrom app.models import Stockdef generate_stock_table(): yield render_template('stock_header.html') for stock in Stock.query.all(): yield render_template('stock_row.html', stock=stock) yield render_template('stock_footer.html')@app.route('/stock-table')def stock_table(): return Response(generate_stock_table())
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
在这个例子当中,返回流的响应路由需要返回一个由使用生成器函数初始化对象的Response
,然后Flask
负责调用生成器将结果分块发送给客户端。这样做的好处是程序当中需要生成一个很大的数据块,而通过流传输,响应返回请求不会随着你的块变大而变大。
流除了能够将将数据块大的进行分块之外,还能提供Multipart Responses
。在这一方面最主要的应用场景是视频流或者音频流的返回播放。在这当中,流的一个有趣用途是让每个块替换页面的前一个块,这使得流能够在浏览器窗口中“播放”。Multipart/Response
由一个包含多部分内容类型之一的标头组成,紧接着是边界标记的分割部分,每个部分都有自己的特定内容类型。以下是Multipart
视频流的结构:
HTTP/1.1 200 OKContent-Type: multipart/x-mixed-replace; boundary=frame--frameContent-Type: image/jpeg<jpeg data here>--frameContent-Type: image/jpeg<jpeg data here>...
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
如上所述,头的Content-Type
设置为multipart/x-mixed-replace
以及定义bouondary。然后包括每一帧数据,以--
为前缀,并在各自行中添加边界字符串以及Content-type
的标头,每个部分都可以选择包含一个Content-Length
,说明有效payload的字节长度。
在了解完上面基础知识后,接下来就构建实时视频流服务器。原理比较简单,或者视频中的每一帧然后以流的方式通过Multipart/Response
返回给客户端。
构建实时视频流
一个简单的FlaskWeb程序,提供Motion JPEG流,注意Motion JPEG应用广泛。这种方法延迟低,但质量不是最好的,因为 JPEG 压缩对于运动视频不是很有效。
从摄像机中获取视频帧:
from time import timeclass Camera(object): def __init__(self): self.frames = [open(f + '.jpg', 'rb').read() for f in ['1', '2', '3']] def get_frame(self): return self.frames[int(time()) % 3]
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
上面一部分代码是实例,针对没有摄像头设备的进行调试,通过在读取工程下面的图像来构建图像流。
#!/usr/bin/env pythonfrom flask import Flask, render_template, Responsefrom camera import Cameraapp = Flask(__name__)@app.route('/')def index(): return render_template('index.html')def gen(camera): while True: frame = camera.get_frame() yield (b'--frame\r' b'Content-Type: image/jpeg\r\
' + frame + b'\r')@app.route('/video_feed')def video_feed(): return Response(gen(Camera()), mimetype='multipart/x-mixed-replace; boundary=frame')if __name__ == '__main__': app.run(host='0.0.0.0', debug=True)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
此应用程序定义了一个Camera
负责提供帧序列的类。前端HTML内容:
<html> <head> <title>Video Streaming Demonstration</title> </head> <body> <h1>Video Streaming Demonstration</h1> <img src="{{ url_for('video_feed') }}"> </body></html>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
video_feed
路由中调用gen
生成器函数,该函数调用```Camera``类来获取视频流。整个流程都是比较简单。然而使用流也有一些限制,当Flask应用程序处理常规请求的时候,请求周期很短。Web Worker接受请求,调用处理函数并最终返回响应给客户端。当客户端接收到的是流,那么需要在流传输期间,客户端要保持连接。另一方面当客户端断开连接的时候,服务端可能也在一直给客户端提供服务,难以关闭流的传输,同时该服务只能提供给相同Web Worker数的客户端。有一些方法能够克服以上的问题,那就是使用协程或者多线程。接下来看看如何对上面的程序进行优化。
视频流优化
上面的视频流程序主要存在两个问题一是如何结束传输数据流,二是如何单个服务给多个客户端提供服务。
首先对于第一个问题,原理是记录最后一次响应的时间戳,如果最后一次响应时间戳与当前时间戳相差大于阈值(可以设定为十秒,但是不能过小,否则会导致无法正常请求)。下面是优化的代码:
- 定义
Camera
基类:
class BaseCamera(object): thread = None # background thread that reads frames from camera frame = None # current frame is stored here by background thread last_access = 0 # time of last client access to the camera # ... @staticmethod def frames(): """Generator that returns frames from the camera.""" raise RuntimeError('Must be implemented by subclasses.') @classmethod def _thread(cls): """Camera background thread.""" print('Starting camera thread.') frames_iterator = cls.frames() for frame in frames_iterator: BaseCamera.frame = frame # if there hasn't been any clients asking for frames in # the last 10 seconds then stop the thread if time.time() - BaseCamera.last_access > 10: frames_iterator.close() print('Stopping camera thread due to inactivity.') break BaseCamera.thread = None
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 继承
BaseCamera
的Camera
类:
class Camera(BaseCamera): """An emulated camera implementation that streams a repeated sequence of files 1.jpg, 2.jpg and 3.jpg at a rate of one frame per second.""" imgs = [open(f + '.jpg', 'rb').read() for f in ['1', '2', '3']] @staticmethod def frames(): while True: time.sleep(1) yield Camera.imgs[int(time.time()) % 3]
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
接着对于第二个问题,针对多客户端请求方式性能提升问题,可以使用多线程的方式进行处理,另一方面在测试中发现服务器消耗了大量的 CPU。原因是后台线程捕获帧和将这些帧提供给客户端的生成器之间没有同步。两者都尽可能快地运行,而不考虑对方的速度。
所以需要有一种机制,生成器只向客户端传递原始帧,如果生成器内部的传递循环比相机线程的帧速率快,那么生成器应该等到有新的帧可用,以便它自己调整速度以匹配相机速率。另一方面,如果传递循环的运行速度比相机线程慢,那么它在处理帧时永远不会落后,而是应该跳过帧以始终传递最新的帧。解决方案是让相机线程在新帧可用时向正在运行的生成器发出信号。然后,生成器可以在发送下一帧之前等待信号时阻塞。
为了避免在生成器中添加事件处理逻辑,实现一个自定义事件类,它使用调用者的线程 id 为每个客户端线程自动创建和管理单独的事件。
class CameraEvent(object): """An Event-like class that signals all active clients when a new frame is available. """ def __init__(self): self.events = {} def wait(self): """Invoked from each client's thread to wait for the next frame.""" ident = get_ident() if ident not in self.events: # this is a new client # add an entry for it in the self.events dict # each entry has two elements, a threading.Event() and a timestamp self.events[ident] = [threading.Event(), time.time()] return self.events[ident][0].wait() def set(self): """Invoked by the camera thread when a new frame is available.""" now = time.time() remove = None for ident, event in self.events.items(): if not event[0].isSet(): # if this client's event is not set, then set it # also update the last set timestamp to now event[0].set() event[1] = now else: # if the client's event is already set, it means the client # did not process a previous frame # if the event stays set for more than 5 seconds, then assume # the client is gone and remove it if now - event[1] > 5: remove = ident if remove: del self.events[remove] def clear(self): """Invoked from each client's thread after a frame was processed.""" self.events[get_ident()][0].clear()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
class BaseCamera(object): # ... event = CameraEvent() # ... def get_frame(self): """Return the current camera frame.""" BaseCamera.last_access = time.time() # wait for a signal from the camera thread BaseCamera.event.wait() BaseCamera.event.clear() return BaseCamera.frame @classmethod def _thread(cls): # ... for frame in frames_iterator: BaseCamera.frame = frame BaseCamera.event.set() # send signal to clients # ...
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
整体代码:
base_camera.py
import timeimport threadingtry: from greenlet import getcurrent as get_identexcept ImportError: try: from thread import get_ident except ImportError: from _thread import get_identclass CameraEvent(object): """An Event-like class that signals all active clients when a new frame is available. """ def __init__(self): self.events = {} def wait(self): """Invoked from each client's thread to wait for the next frame.""" ident = get_ident() if ident not in self.events: # this is a new client # add an entry for it in the self.events dict # each entry has two elements, a threading.Event() and a timestamp self.events[ident] = [threading.Event(), time.time()] return self.events[ident][0].wait() def set(self): """Invoked by the camera thread when a new frame is available.""" now = time.time() remove = None for ident, event in self.events.items(): if not event[0].isSet(): # if this client's event is not set, then set it # also update the last set timestamp to now event[0].set() event[1] = now else: # if the client's event is already set, it means the client # did not process a previous frame # if the event stays set for more than 5 seconds, then assume # the client is gone and remove it if now - event[1] > 5: remove = ident if remove: del self.events[remove] def clear(self): """Invoked from each client's thread after a frame was processed.""" self.events[get_ident()][0].clear()class BaseCamera(object): thread = None # background thread that reads frames from camera frame = None # current frame is stored here by background thread last_access = 0 # time of last client access to the camera event = CameraEvent() def __init__(self): """Start the background camera thread if it isn't running yet.""" if BaseCamera.thread is None: BaseCamera.last_access = time.time() # start background frame thread BaseCamera.thread = threading.Thread(target=self._thread) BaseCamera.thread.start() # wait until first frame is available BaseCamera.event.wait() def get_frame(self): """Return the current camera frame.""" BaseCamera.last_access = time.time() # wait for a signal from the camera thread BaseCamera.event.wait() BaseCamera.event.clear() return BaseCamera.frame @staticmethod def frames(): """"Generator that returns frames from the camera.""" raise RuntimeError('Must be implemented by subclasses.') @classmethod def _thread(cls): """Camera background thread.""" print('Starting camera thread.') frames_iterator = cls.frames() for frame in frames_iterator: BaseCamera.frame = frame BaseCamera.event.set() # send signal to clients time.sleep(0) # if there hasn't been any clients asking for frames in # the last 10 seconds then stop the thread if time.time() - BaseCamera.last_access > 10: frames_iterator.close() print('Stopping camera thread due to inactivity.') break BaseCamera.thread = None
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
camera.py
import osimport cv2from base_camera import BaseCameraclass Camera(BaseCamera): video_source = 0 def __init__(self): if os.environ.get('OPENCV_CAMERA_SOURCE'): Camera.set_video_source(int(os.environ['OPENCV_CAMERA_SOURCE'])) super(Camera, self).__init__() @staticmethod def set_video_source(source): Camera.video_source = source @staticmethod def frames(): camera = cv2.VideoCapture(Camera.video_source) if not camera.isOpened(): raise RuntimeError('Could not start camera.') while True: # read current frame _, img = camera.read() # encode as a jpeg image and return it yield cv2.imencode('.jpg', img)[1].tobytes()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
app.py
#!/usr/bin/env pythonfrom importlib import import_moduleimport osfrom flask import Flask, render_template, Response# import camera driverif os.environ.get('CAMERA'): Camera = import_module('camera_' + os.environ['CAMERA']).Cameraelse: from camera import Camera# Raspberry Pi camera module (requires picamera package)# from camera_pi import Cameraapp = Flask(__name__)@app.route('/')def index(): """Video streaming home page.""" return render_template('index.html')def gen(camera): """Video streaming generator function.""" yield b'--frame\r' while True: frame = camera.get_frame() yield b'Content-Type: image/jpeg\r\
' + frame + b'\r--frame\r'@app.route('/video_feed')def video_feed(): """Video streaming route. Put this in the src attribute of an img tag.""" return Response(gen(Camera()), mimetype='multipart/x-mixed-replace; boundary=frame')if __name__ == '__main__': app.run(host='0.0.0.0', threaded=True)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
具体代码可以参考我的github: