aboutsummaryrefslogtreecommitdiff
path: root/voctocore/experiments/failovertest2.py
blob: 588b52e676fe9cb2ade837d201552707bb64d392 (plain)
  1. #!/usr/bin/python3
  2. from os import path
  3. import threading
  4. import gi
  5. gi.require_version('Gst', '1.0')
  6. from gi.repository import GObject, Gst
# One-time library setup, executed at import time before any of the classes
# below create GStreamer elements: enable GObject threading support, then
# initialise GStreamer without passing any command-line arguments (None).
GObject.threads_init()
Gst.init(None)
  9. class FramegrabberSource(Gst.Bin):
  10. def __init__(self, uri):
  11. super().__init__()
  12. # Create elements
  13. self.http = Gst.ElementFactory.make('souphttpsrc', None)
  14. self.demux = Gst.ElementFactory.make('multipartdemux', None)
  15. self.parse = Gst.ElementFactory.make('jpegparse', None)
  16. self.dec = Gst.ElementFactory.make('jpegdec', None)
  17. # Add elements to Bin
  18. self.add(self.http)
  19. self.add(self.parse)
  20. self.add(self.demux)
  21. self.add(self.dec)
  22. # Set properties
  23. self.http.set_property('location', uri)
  24. self.http.set_property('timeout', 5)
  25. #self.http.set_property('is-live', True)
  26. #self.http.set_property('do-timestamp', True)
  27. # Connect signal handlers
  28. self.demux.connect('pad-added', self.on_demux_pad_added)
  29. # Link elements
  30. self.http.link(self.demux)
  31. # dynamic linked # self.demux.link(self.parse)
  32. self.parse.link(self.dec)
  33. # Add Ghost Pads
  34. self.add_pad(
  35. Gst.GhostPad.new('src', self.dec.get_static_pad('src'))
  36. )
  37. def on_demux_pad_added(self, element, pad):
  38. string = pad.query_caps(None).to_string()
  39. print('on_demux_pad_added():', string)
  40. if string.startswith('image/jpeg'):
  41. pad.link(self.parse.get_static_pad('sink'))
  42. class FailoverFilter(Gst.Bin):
  43. failcount = 1 # start initially failed
  44. watchdogcount = 1
  45. dead = False
  46. def __init__(self):
  47. super().__init__()
  48. self.queue = Gst.ElementFactory.make('queue', None)
  49. self.failsrc = Gst.ElementFactory.make('videotestsrc', None)
  50. self.capsfilter = Gst.ElementFactory.make('capsfilter', None)
  51. self.switch = Gst.ElementFactory.make('input-selector', None)
  52. # Add elements to Bin
  53. self.add(self.queue)
  54. self.add(self.failsrc)
  55. self.add(self.capsfilter)
  56. self.add(self.switch)
  57. # Request Pads
  58. self.goodpad = self.switch.get_request_pad('sink_%u')
  59. self.failpad = self.switch.get_request_pad('sink_%u')
  60. # Set properties
  61. self.switch.set_property('active-pad', self.failpad)
  62. self.capsfilter.set_property('caps', Gst.Caps.from_string('video/x-raw,format=I420,width=640,height=480'))
  63. # Connect signal handlers
  64. self.queue.connect('underrun', self.underrun)
  65. self.queue.connect('overrun', self.overrun)
  66. self.queue.connect('running', self.running)
  67. self.queue.connect('pushing', self.pushing)
  68. # Link elements
  69. self.queue.get_static_pad('src').link(self.goodpad)
  70. self.failsrc.link(self.capsfilter)
  71. self.capsfilter.get_static_pad('src').link(self.failpad)
  72. # Add Ghost Pads
  73. self.add_pad(
  74. Gst.GhostPad.new('sink', self.queue.get_static_pad('sink'))
  75. )
  76. self.add_pad(
  77. Gst.GhostPad.new('src', self.switch.get_static_pad('src'))
  78. )
  79. # Setup signals
  80. for signalname in ('watchdog', 'switch-to-fail', 'switch-to-good', 'dead', 'reanimate'):
  81. GObject.signal_new(
  82. signalname,
  83. self,
  84. GObject.SIGNAL_RUN_LAST,
  85. GObject.TYPE_NONE,
  86. []
  87. )
  88. self.connect('reanimate', self.reanimate)
  89. # Start watchdog
  90. self.watchdog()
  91. def reanimate(self, caller=None):
  92. print('reanimate()')
  93. self.failcount = 1
  94. self.watchdogcount = 1
  95. self.dead = False
  96. def watchdog(self):
  97. if self.dead:
  98. return
  99. self.watchdogtimer = threading.Timer(0.5, self.watchdog)
  100. self.watchdogtimer.start()
  101. self.emit('watchdog')
  102. if self.failcount == 0:
  103. self.watchdogcount = 0
  104. if self.switch.get_property('active-pad') != self.goodpad:
  105. print("watchdog switching to goodpad")
  106. self.switch.set_property('active-pad', self.goodpad)
  107. self.emit('switch-to-good')
  108. else:
  109. self.watchdogcount += 1
  110. if self.switch.get_property('active-pad') != self.failpad:
  111. print("watchdog switching to failpad")
  112. self.switch.set_property('active-pad', self.failpad)
  113. self.emit('switch-to-fail')
  114. print("self.watchdogcount=", self.watchdogcount)
  115. if self.watchdogcount > 20:
  116. print("watchdog giving up -> source is dead")
  117. self.dead = True
  118. self.emit('dead')
  119. def underrun(self, queue):
  120. level = queue.get_property('current-level-buffers')
  121. failbin = queue.get_parent()
  122. if level == 0:
  123. failbin.failcount += 1
  124. print('underrun(), level=', level, 'failcount=', failbin.failcount)
  125. def overrun(self, queue):
  126. level = queue.get_property('current-level-buffers')
  127. failbin = queue.get_parent()
  128. if level > 0:
  129. failbin.failcount = 0
  130. print('overrun(), level=', level, 'failcount=', failbin.failcount)
  131. def running(self, queue):
  132. level = queue.get_property('current-level-buffers')
  133. print('running(), level=', level)
  134. def pushing(self, queue):
  135. level = queue.get_property('current-level-buffers')
  136. failbin = queue.get_parent()
  137. if level == 0:
  138. failbin.failcount += 1
  139. print('pushing(), level=', level, 'failcount=', failbin.failcount)
  140. class VideomixerWithDisplay(Gst.Bin):
  141. def __init__(self):
  142. super().__init__()
  143. # Create elements
  144. self.secondsrc = Gst.ElementFactory.make('videotestsrc', None)
  145. self.mixer = Gst.ElementFactory.make('videomixer', None)
  146. self.q1 = Gst.ElementFactory.make('queue', None)
  147. self.q2 = Gst.ElementFactory.make('queue', None)
  148. self.display = Gst.ElementFactory.make('ximagesink', None)
  149. # Add elements to Bin
  150. self.add(self.secondsrc)
  151. self.add(self.mixer)
  152. self.add(self.display)
  153. self.add(self.q1)
  154. self.add(self.q2)
  155. # Set properties
  156. self.secondsrc.set_property('pattern', 'ball')
  157. # Request Pads
  158. self.firstpad = self.mixer.get_request_pad('sink_%u')
  159. self.secondpad = self.mixer.get_request_pad('sink_%u')
  160. # Set pad-properties
  161. self.secondpad.set_property('alpha', 0.7)
  162. self.secondpad.set_property('xpos', 50)
  163. self.secondpad.set_property('ypos', 50)
  164. # Link elements
  165. self.q1.get_static_pad('src').link(self.firstpad)
  166. self.q2.get_static_pad('src').link(self.secondpad)
  167. self.secondsrc.link(self.q2)
  168. self.mixer.link(self.display)
  169. # Add Ghost Pads
  170. self.add_pad(
  171. Gst.GhostPad.new('sink', self.q1.get_static_pad('sink'))
  172. )
  173. class Example:
  174. def __init__(self):
  175. self.mainloop = GObject.MainLoop()
  176. self.pipeline = Gst.Pipeline()
  177. self.bus = self.pipeline.get_bus()
  178. self.bus.add_signal_watch()
  179. self.bus.connect('message::eos', self.on_eos)
  180. self.bus.connect('message::error', self.on_error)
  181. # Create elements
  182. self.grabbersrc = FramegrabberSource(
  183. uri='http://beachcam.kdhnc.com/mjpg/video.mjpg?camera=1')
  184. self.failover = FailoverFilter()
  185. self.mixdisplay = VideomixerWithDisplay()
  186. # Add elements to pipeline
  187. self.pipeline.add(self.grabbersrc)
  188. self.pipeline.add(self.failover)
  189. self.pipeline.add(self.mixdisplay)
  190. self.grabbersrc.link(self.failover)
  191. self.failover.link(self.mixdisplay)
  192. self.failover.connect('dead', self.grabbersrc_is_dead)
  193. def grabbersrc_is_dead(self, failover):
  194. print("grabbersrc_is_dead", failover)
  195. self.pipeline.set_state(Gst.State.PAUSED)
  196. print(self.grabbersrc.unlink(self.failover))
  197. self.pipeline.remove(self.grabbersrc)
  198. self.grabbersrc = FramegrabberSource(
  199. uri='http://beachcam.kdhnc.com/mjpg/video.mjpg?camera=1')
  200. self.pipeline.add(self.grabbersrc)
  201. print(self.grabbersrc.link(self.failover))
  202. self.failover.reanimate()
  203. self.pipeline.set_state(Gst.State.PLAYING)
  204. def run(self):
  205. self.pipeline.set_state(Gst.State.PLAYING)
  206. self.mainloop.run()
  207. def kill(self):
  208. self.pipeline.set_state(Gst.State.NULL)
  209. self.mainloop.quit()
  210. def on_eos(self, bus, msg):
  211. print('on_eos()')
  212. #self.kill()
  213. def on_error(self, bus, msg):
  214. print('on_error():', msg.parse_error())
  215. #self.kill()
  216. example = Example()
  217. example.run()