MythTV  master
DeviceReadBuffer.cpp
Go to the documentation of this file.
1 #include <algorithm>
2 using namespace std;
3 
4 #include "DeviceReadBuffer.h"
5 #include "mythcorecontext.h"
6 #include "mythbaseutil.h"
7 #include "mythlogging.h"
8 #include "tspacket.h"
9 #include "mthread.h"
10 #include "compat.h"
11 
12 #ifndef _WIN32
13 #include <sys/poll.h>
14 #endif
15 
17 #define REPORT_RING_STATS 0
18 
19 #define LOC QString("DevRdB(%1): ").arg(videodevice)
20 
// NOTE(review): the line that opens this definition is absent from this
// listing; the initializer list suggests this is the DeviceReadBuffer
// constructor -- confirm against the header.
// Puts the object in an idle state: no stream fd (-1), no ring buffer
// allocated, reader loop not requested, statistics zeroed.
22  DeviceReaderCB *cb, bool use_poll, bool error_exit_on_poll_timeout)
23  : MThread("DeviceReadBuffer"),
24  videodevice(""), _stream_fd(-1),
25  readerCB(cb),
26 
27  // Data for managing the device ringbuffer
28  dorun(false),
29  eof(false), error(false),
30  request_pause(false), paused(false),
31  using_poll(use_poll),
32  poll_timeout_is_error(error_exit_on_poll_timeout),
33  max_poll_wait(2500 /*ms*/),
34 
35  size(0), used(0),
36  read_quanta(0), dev_buffer_count(1),
37  dev_read_size(0), readThreshold(0),
38 
39  buffer(nullptr), readPtr(nullptr),
40  writePtr(nullptr), endPtr(nullptr),
41 
42  // statistics
43  max_used(0), avg_used(0),
44  avg_buf_write_cnt(0), avg_buf_read_cnt(0),
45  avg_buf_sleep_cnt(0)
46 {
  // Mark both wake-pipe descriptors as closed (-1) with no flags recorded.
47  for (int i = 0; i < 2; i++)
48  {
49  wake_pipe[i] = -1;
50  wake_pipe_flags[i] = 0;
51  }
52 
  // On mingw builds polling is forced off, with a warning, because Poll()
  // is not implemented there.
53 #ifdef USING_MINGW
54 #warning mingw DeviceReadBuffer::Poll
55  if (using_poll)
56  {
57  LOG(VB_GENERAL, LOG_WARNING, LOC +
58  "mingw DeviceReadBuffer::Poll is not implemented");
59  using_poll = false;
60  }
61 #endif
62 }
63 
// NOTE(review): the signature line is absent from this listing; the body
// (Stop() followed by freeing the buffer) suggests the destructor -- confirm.
65 {
  // Stop the reader first so nothing is still using the buffer when freed.
66  Stop();
67  if (buffer)
68  {
69  delete[] buffer;
70  buffer = nullptr;
71  }
72 }
73 
74 bool DeviceReadBuffer::Setup(const QString &streamName, int streamfd,
75  uint readQuanta, uint deviceBufferSize,
76  uint deviceBufferCount)
77 {
78  QMutexLocker locker(&lock);
79 
80  if (buffer)
81  delete[] buffer;
82 
83  videodevice = streamName;
84  videodevice = videodevice.isNull() ? "" : videodevice;
85  _stream_fd = streamfd;
86 
87  // Setup device ringbuffer
88  eof = false;
89  error = false;
90  request_pause = false;
91  paused = false;
92 
93  read_quanta = (readQuanta) ? readQuanta : read_quanta;
94  dev_buffer_count = deviceBufferCount;
96  "HDRingbufferSize", static_cast<int>(50 * read_quanta)) * 1024;
97  used = 0;
98  dev_read_size = read_quanta * (using_poll ? 256 : 48);
99  dev_read_size = (deviceBufferSize) ?
100  min(dev_read_size, (size_t)deviceBufferSize) : dev_read_size;
101  readThreshold = read_quanta * 128;
102 
103  buffer = new (nothrow) unsigned char[size + dev_read_size];
104  readPtr = buffer;
105  writePtr = buffer;
106  endPtr = buffer + size;
107 
108  // Initialize buffer, if it exists
109  if (!buffer)
110  {
111  LOG(VB_GENERAL, LOG_ERR, LOC +
112  QString("Failed to allocate buffer of size %1 = %2 + %3")
113  .arg(size+dev_read_size).arg(size).arg(dev_read_size));
114  return false;
115  }
116  memset(buffer, 0xFF, size + read_quanta);
117 
118  // Initialize statistics
119  max_used = 0;
120  avg_used = 0;
121  avg_buf_write_cnt = 0;
122  avg_buf_read_cnt = 0;
123  avg_buf_sleep_cnt = 0;
124  lastReport.start();
125 
126  LOG(VB_RECORD, LOG_INFO, LOC + QString("buffer size %1 KB").arg(size/1024));
127 
128  return true;
129 }
130 
// NOTE(review): the signature line is absent from this listing; the log
// strings suggest this is DeviceReadBuffer::Start() -- confirm.
// (Re)starts the reader thread and waits until it reports running.
132 {
133  LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- begin");
134 
135  QMutexLocker locker(&lock);
  // If a reader is already running (or flagged to run), ask it to stop,
  // wake it, and wait for it to exit. The lock is released while waiting.
136  if (isRunning() || dorun)
137  {
138  dorun = false;
139  locker.unlock();
140  WakePoll();
141  wait();
142  locker.relock();
143  }
144 
145  dorun = true;
146  error = false;
147  eof = false;
148 
149  start();
150 
151  LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- middle");
152 
  // Block until the thread is running, re-checking at least every 100 ms.
153  while (dorun && !isRunning())
154  runWait.wait(locker.mutex(), 100);
155 
156  LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- end");
157 }
158 
159 void DeviceReadBuffer::Reset(const QString &streamName, int streamfd)
160 {
161  QMutexLocker locker(&lock);
162 
163  videodevice = streamName;
164  videodevice = videodevice.isNull() ? "" : videodevice;
165  _stream_fd = streamfd;
166 
167  used = 0;
168  readPtr = buffer;
169  writePtr = buffer;
170 
171  error = false;
172 }
173 
// NOTE(review): the signature line is absent from this listing; the log
// strings suggest this is DeviceReadBuffer::Stop() -- confirm.
// Asks the reader thread to stop and waits for it to exit.
175 {
176  LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- begin");
177  QMutexLocker locker(&lock);
178  if (isRunning() || dorun)
179  {
180  dorun = false;
  // Release the lock before waking and joining the thread.
181  locker.unlock();
182  WakePoll();
183  wait();
184  }
185  LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- end");
186 }
187 
// NOTE(review): signature line absent from this listing; presumably
// DeviceReadBuffer::SetRequestPause(bool req) -- confirm.
// Records the pause request under the lock, then calls WakePoll().
189 {
190  QMutexLocker locker(&lock);
191  request_pause = req;
192  WakePoll();
193 }
194 
// NOTE(review): signature line absent from this listing; presumably
// DeviceReadBuffer::SetPaused(bool val) -- confirm.
// Updates the paused flag and wakes whoever waits for that transition:
// pauseWait waiters when pausing, unpauseWait waiters when resuming.
196 {
197  QMutexLocker locker(&lock);
198  paused = val;
199  if (val)
200  pauseWait.wakeAll();
201  else
202  unpauseWait.wakeAll();
203 }
204 
205 // The WakePoll code is copied from MythSocketThread::WakeReadyReadThread()
// NOTE(review): signature line absent from this listing; presumably
// DeviceReadBuffer::WakePoll(void) const -- confirm.
// Writes one byte to the write end of the wake pipe, retrying while the
// thread is running and the write has not succeeded.
207 {
208  char buf[1];
209  buf[0] = '0';
210  ssize_t wret = 0;
211  while (isRunning() && (wret <= 0) && (wake_pipe[1] >= 0))
212  {
213  wret = ::write(wake_pipe[1], &buf, 1);
  // EAGAIN and EINTR are retried; any other failure closes the pipes.
214  if ((wret < 0) && (EAGAIN != errno) && (EINTR != errno))
215  {
216  LOG(VB_GENERAL, LOG_ERR, LOC + "WakePoll failed.");
217  ClosePipes();
218  break;
219  }
220  }
221 }
222 
// NOTE(review): signature line absent from this listing; presumably
// DeviceReadBuffer::ClosePipes(void) const -- confirm.
// Closes whichever wake-pipe ends are open and marks them as closed (-1).
224 {
225  for (uint i = 0; i < 2; i++)
226  {
227  if (wake_pipe[i] >= 0)
228  {
229  ::close(wake_pipe[i]);
230  wake_pipe[i] = -1;
231  wake_pipe_flags[i] = 0;
232  }
233  }
234 }
235 
// NOTE(review): signature line absent from this listing; presumably
// DeviceReadBuffer::IsPaused(void) const -- confirm.
// Returns the paused flag, read under the lock.
237 {
238  QMutexLocker locker(&lock);
239  return paused;
240 }
241 
// NOTE(review): signature line absent from this listing; presumably
// DeviceReadBuffer::WaitForPaused(unsigned long timeout) -- confirm.
// If not already paused, waits once on pauseWait for up to `timeout`
// (no re-check loop), then returns the current paused flag.
243 {
244  QMutexLocker locker(&lock);
245 
246  if (!paused)
247  pauseWait.wait(&lock, timeout);
248 
249  return paused;
250 }
251 
// NOTE(review): signature line absent from this listing; presumably
// DeviceReadBuffer::WaitForUnpause(unsigned long timeout) -- confirm.
// If currently paused, waits once on unpauseWait for up to `timeout`.
// Returns the paused flag itself, so false means the reader is unpaused.
253 {
254  QMutexLocker locker(&lock);
255 
256  if (paused)
257  unpauseWait.wait(&lock, timeout);
258 
259  return paused;
260 }
261 
// NOTE(review): signature line absent from this listing; presumably
// DeviceReadBuffer::IsPauseRequested(void) const -- confirm.
// Returns the request_pause flag, read under the lock.
263 {
264  QMutexLocker locker(&lock);
265  return request_pause;
266 }
267 
// NOTE(review): signature line absent from this listing; presumably
// DeviceReadBuffer::IsErrored(void) const -- confirm.
// Returns the error flag, read under the lock.
269 {
270  QMutexLocker locker(&lock);
271  return error;
272 }
273 
274 bool DeviceReadBuffer::IsEOF(void) const
275 {
276  QMutexLocker locker(&lock);
277  return eof;
278 }
279 
// NOTE(review): signature line absent from this listing; presumably
// DeviceReadBuffer::IsRunning(void) const -- confirm.
// Returns isRunning(), called while holding the lock.
281 {
282  QMutexLocker locker(&lock);
283  return isRunning();
284 }
285 
// NOTE(review): signature line absent from this listing; presumably
// DeviceReadBuffer::GetUnused(void) const -- confirm.
// Returns the free space in the ring (size - used), read under the lock.
287 {
288  QMutexLocker locker(&lock);
289  return size - used;
290 }
291 
// NOTE(review): signature line absent from this listing; presumably
// DeviceReadBuffer::GetUsed(void) const -- confirm.
// Returns the number of buffered bytes (used), read under the lock.
293 {
294  QMutexLocker locker(&lock);
295  return used;
296 }
297 
// NOTE(review): signature line absent from this listing; presumably
// DeviceReadBuffer::GetContiguousUnused(void) const -- confirm.
// Returns the distance from the write position to the nominal end of the
// ring (endPtr - writePtr), read under the lock.
299 {
300  QMutexLocker locker(&lock);
301  return endPtr - writePtr;
302 }
303 
// NOTE(review): signature line absent from this listing; presumably
// DeviceReadBuffer::IncrWritePointer(uint len) -- confirm.
// Accounts for `len` newly written bytes and wakes readers waiting on
// dataWait.
// NOTE(review): the embedded numbering skips 309 and 312-313, so statements
// (possibly the write-pointer wrap-around) may be missing from this listing.
305 {
306  QMutexLocker locker(&lock);
307  used += len;
308  writePtr += len;
310 #if REPORT_RING_STATS
311  max_used = max(used, max_used);
314 #endif
315  dataWait.wakeAll();
316 }
317 
// NOTE(review): signature line absent from this listing; presumably
// DeviceReadBuffer::IncrReadPointer(uint len) -- confirm.
// Accounts for `len` consumed bytes and advances the read position.
319 {
320  QMutexLocker locker(&lock);
321  used -= len;
322  readPtr += len;
  // Wrap to the start only when the read position lands exactly on endPtr.
323  readPtr = (readPtr == endPtr) ? buffer : readPtr;
  // NOTE(review): the embedded numbering skips 325; the statement inside
  // this #if is missing from this listing.
324 #if REPORT_RING_STATS
326 #endif
327 }
328 
// NOTE(review): signature line absent from this listing; the RunProlog()/
// RunEpilog() pair suggests this is DeviceReadBuffer::run() -- confirm.
// Reader loop: while dorun is set, reads from _stream_fd into the ring
// buffer at writePtr, honouring pause requests and error limits.
330 {
331  RunProlog();
332 
333  uint errcnt = 0;
334  uint cnt;
335  ssize_t len;
336  size_t read_size;
337  size_t unused;
338  size_t total;
  // Byte count per outer iteration below which the loop sleeps briefly.
339  size_t throttle = dev_read_size * dev_buffer_count / 2;
340 
  // Wake anyone blocked on runWait (Start() waits on it).
341  lock.lock();
342  runWait.wakeAll();
343  lock.unlock();
344 
  // NOTE(review): the embedded numbering skips 346, so the statement
  // controlled by this `if` is missing from this listing; as shown, the
  // `if` would govern the while loop. Restore the line before compiling.
345  if (using_poll)
347 
348  while (dorun)
349  {
350  if (!HandlePausing())
351  continue;
352 
353  if (!IsOpen())
354  {
355  usleep(5000);
356  continue;
357  }
358 
359  if (using_poll && !Poll())
360  continue;
361 
362  {
363  QMutexLocker locker(&lock);
364  if (error)
365  {
366  LOG(VB_RECORD, LOG_ERR, LOC + "fill_ringbuffer: error state");
367  break;
368  }
369  }
370 
371  /* Some device drivers segment their buffer into small pieces,
372  * So allow for the reading of multiple buffers */
373  for (cnt = 0, len = 0, total = 0;
374  dorun && len >= 0 && cnt < dev_buffer_count; ++cnt)
375  {
376  // Limit read size for faster return from read
377  unused = static_cast<size_t>(WaitForUnused(read_quanta));
378  read_size = min(dev_read_size, unused);
379 
380  // if read_size > 0 do the read...
381  if (read_size)
382  {
383  len = read(_stream_fd, writePtr, read_size);
384  if (!CheckForErrors(len, read_size, errcnt))
385  break;
  // A successful read resets the consecutive-error counter.
386  errcnt = 0;
387 
388  // if we wrote past the official end of the buffer,
389  // copy to start
390  if (writePtr + len > endPtr)
391  memcpy(buffer, endPtr, writePtr + len - endPtr);
392  IncrWritePointer(len);
393  total += len;
394  }
395  }
396  if (errcnt > 5)
397  break;
398 
399  // Slow down reading if not under load
400  if (errcnt == 0 && total < throttle)
401  usleep(1000);
402  }
403 
404  ClosePipes();
405 
  // Mark EOF and release every waiter so no caller stays blocked after the
  // loop has ended.
406  lock.lock();
407  eof = true;
408  runWait.wakeAll();
409  dataWait.wakeAll();
410  pauseWait.wakeAll();
411  unpauseWait.wakeAll();
412  lock.unlock();
413 
414  RunEpilog();
415 }
416 
// NOTE(review): signature line absent from this listing; presumably
// DeviceReadBuffer::HandlePausing() -- confirm.
// Returns false while a pause is requested (after marking the reader paused
// and sleeping 5 ms); returns true otherwise, clearing the paused flag if
// it was set.
418 {
419  if (IsPauseRequested())
420  {
421  SetPaused(true);
422 
  // NOTE(review): the embedded numbering skips 424, so the statement
  // guarded by this `if` is missing from this listing; as shown, the `if`
  // would govern the usleep() below.
423  if (readerCB)
425 
426  usleep(5000);
427  return false;
428  }
429  else if (IsPaused())
430  {
  // NOTE(review): the embedded numbering skips 431; a statement may be
  // missing here.
432  SetPaused(false);
433  }
434  return true;
435 }
436 
437 bool DeviceReadBuffer::Poll(void) const
438 {
439 #ifdef _WIN32
440 # ifdef _MSC_VER
441 # pragma message( "mingw DeviceReadBuffer::Poll" )
442 # else
443 # warning mingw DeviceReadBuffer::Poll
444 # endif
445  LOG(VB_GENERAL, LOG_ERR, LOC +
446  "mingw DeviceReadBuffer::Poll is not implemented");
447  return false;
448 #else
449  bool retval = true;
450  MythTimer timer;
451  timer.start();
452 
453  int poll_cnt = 1;
454  struct pollfd polls[2];
455  memset(polls, 0, sizeof(polls));
456 
457  polls[0].fd = _stream_fd;
458  polls[0].events = POLLIN | POLLPRI;
459  polls[0].revents = 0;
460 
461  if (wake_pipe[0] >= 0)
462  {
463  poll_cnt = 2;
464  polls[1].fd = wake_pipe[0];
465  polls[1].events = POLLIN;
466  polls[1].revents = 0;
467  }
468 
469  while (true)
470  {
471  polls[0].revents = 0;
472  polls[1].revents = 0;
473  poll_cnt = (wake_pipe[0] >= 0) ? poll_cnt : 1;
474 
475  int timeout = max_poll_wait;
476  if (1 == poll_cnt)
477  timeout = 10;
478  else if (poll_timeout_is_error)
479  // subtract a bit to allow processing time.
480  timeout = max((int)max_poll_wait - timer.elapsed() - 15, 10);
481 
482  int ret = poll(polls, poll_cnt, timeout);
483 
484  if (polls[0].revents & POLLHUP)
485  {
486  LOG(VB_GENERAL, LOG_ERR, LOC + "poll eof (POLLHUP)");
487  break;
488  }
489  else if (polls[0].revents & POLLNVAL)
490  {
491  LOG(VB_GENERAL, LOG_ERR, LOC + "poll error" + ENO);
492  error = true;
493  return true;
494  }
495 
496  if (!dorun || !IsOpen() || IsPauseRequested())
497  {
498  retval = false;
499  break; // are we supposed to pause, stop, etc.
500  }
501 
502  if (polls[0].revents & POLLPRI)
503  {
504  readerCB->PriorityEvent(polls[0].fd);
505  }
506 
507  if (polls[0].revents & POLLIN)
508  {
509  if (ret > 0)
510  break; // we have data to read :)
511  else if (ret < 0)
512  {
513  if ((EOVERFLOW == errno))
514  break; // we have an error to handle
515 
516  if ((EAGAIN == errno) || (EINTR == errno))
517  continue; // errors that tell you to try again
518 
519  usleep(2500 /*2.5 ms*/);
520  }
521  else // ret == 0
522  {
523  if (poll_timeout_is_error &&
524  (timer.elapsed() >= (int)max_poll_wait))
525  {
526  LOG(VB_GENERAL, LOG_ERR, LOC + "Poll giving up 1");
527  QMutexLocker locker(&lock);
528  error = true;
529  return true;
530  }
531  }
532  }
533 
534  // Clear out any pending pipe reads
535  if ((poll_cnt > 1) && (polls[1].revents & POLLIN))
536  {
537  char dummy[128];
538  int cnt = (wake_pipe_flags[0] & O_NONBLOCK) ? 128 : 1;
539  cnt = ::read(wake_pipe[0], dummy, cnt);
540  }
541 
542  if (poll_timeout_is_error && (timer.elapsed() >= (int)max_poll_wait))
543  {
544  LOG(VB_GENERAL, LOG_ERR, LOC + QString("Poll giving up after %1ms")
545  .arg(max_poll_wait));
546  QMutexLocker locker(&lock);
547  error = true;
548  return true;
549  }
550  }
551 
552  int e = timer.elapsed();
553  if (e > (int)max_poll_wait)
554  {
555  LOG(VB_GENERAL, LOG_WARNING, LOC +
556  QString("Poll took an unusually long time %1 ms")
557  .arg(timer.elapsed()));
558  }
559 
560  return retval;
561 #endif
562 }
563 
// NOTE(review): the line that opens this definition is absent from this
// listing; presumably bool DeviceReadBuffer::CheckForErrors( -- confirm.
// Classifies the result of a read(): returns true only for a positive
// length no larger than requested_len. Every other outcome returns false;
// errcnt is incremented on a bogus length, an unexpected read error or a
// zero-length read, and the error or eof flag is set once it exceeds 5.
565  ssize_t len, size_t requested_len, uint &errcnt)
566 {
  // A driver reporting more bytes than were asked for cannot be trusted.
567  if (len > (ssize_t)requested_len)
568  {
569  LOG(VB_GENERAL, LOG_ERR, LOC +
570  "Driver is returning bogus values on read");
571  if (++errcnt > 5)
572  {
573  LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors.");
574  QMutexLocker locker(&lock);
575  error = true;
576  }
577  return false;
578  }
579 
580 #ifdef _WIN32
581 # ifdef _MSC_VER
582 # pragma message( "mingw DeviceReadBuffer::CheckForErrors" )
583 # else
584 # warning mingw DeviceReadBuffer::CheckForErrors
585 # endif
586  LOG(VB_GENERAL, LOG_ERR, LOC +
587  "mingw DeviceReadBuffer::CheckForErrors is not implemented");
588  return false;
589 #else
590  if (len < 0)
591  {
  // EINTR, EAGAIN and EOVERFLOW are reported as failures but do not
  // increment errcnt.
592  if (EINTR == errno)
593  return false;
594  if (EAGAIN == errno)
595  {
596  usleep(2500);
597  return false;
598  }
599  if (EOVERFLOW == errno)
600  {
601  LOG(VB_GENERAL, LOG_ERR, LOC + "Driver buffers overflowed");
602  return false;
603  }
604 
605  LOG(VB_GENERAL, LOG_ERR, LOC +
606  QString("Problem reading fd(%1)").arg(_stream_fd) + ENO);
607 
608  if (++errcnt > 5)
609  {
610  LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors.");
611  QMutexLocker locker(&lock);
612  error = true;
613  return false;
614  }
615 
616  usleep(500);
617  return false;
618  }
619  else if (len == 0)
620  {
  // Zero-length reads are treated as EOF only after more than five
  // counted failures.
621  if (++errcnt > 5)
622  {
623  LOG(VB_GENERAL, LOG_ERR, LOC +
624  QString("End-Of-File? fd(%1)").arg(_stream_fd));
625 
626  lock.lock();
627  eof = true;
628  lock.unlock();
629 
630  return false;
631  }
632  usleep(500);
633  return false;
634  }
635  return true;
636 #endif
637 }
638 
645 uint DeviceReadBuffer::Read(unsigned char *buf, const uint count)
646 {
647  uint avail = WaitForUsed(min(count, (uint)readThreshold), 20);
648  size_t cnt = min(count, avail);
649 
650  if (!cnt)
651  return 0;
652 
653  if (readPtr + cnt > endPtr)
654  {
655  // Process as two pieces
656  size_t len = endPtr - readPtr;
657  if (len)
658  {
659  memcpy(buf, readPtr, len);
660  buf += len;
661  IncrReadPointer(len);
662  }
663  if (cnt > len)
664  {
665  len = cnt - len;
666  memcpy(buf, readPtr, len);
667  IncrReadPointer(len);
668  }
669  }
670  else
671  {
672  memcpy(buf, readPtr, cnt);
673  IncrReadPointer(cnt);
674  }
675 
676 #if REPORT_RING_STATS
677  ReportStats();
678 #endif
679 
680  return cnt;
681 }
682 
// NOTE(review): signature line absent from this listing; presumably
// DeviceReadBuffer::WaitForUnused(uint needed) const -- confirm.
// Returns the free space in the ring. When more than read_quanta bytes are
// free but fewer than `needed`, polls every 5 ms until enough is free.
// Returns 0 if a pause is requested, the device is closed, or dorun is
// cleared.
688 {
689  size_t unused = GetUnused();
690 
  // With read_quanta or fewer bytes free, the current value is returned
  // immediately without waiting.
691  if (unused > read_quanta)
692  {
693  while (unused < needed)
694  {
695  unused = GetUnused();
696  if (IsPauseRequested() || !IsOpen() || !dorun)
697  return 0;
698  usleep(5000);
699  }
700  if (IsPauseRequested() || !IsOpen() || !dorun)
701  return 0;
702  unused = GetUnused();
703  }
704 
705  return unused;
706 }
707 
// NOTE(review): signature line absent from this listing; presumably
// DeviceReadBuffer::WaitForUsed(uint needed, uint max_wait) const -- confirm.
// Waits until at least `needed` bytes are buffered, giving up once max_wait
// milliseconds have elapsed or when the thread is not running, a pause is
// requested, or the error/eof flag is set. Returns the number of buffered
// bytes seen last, which may be less than `needed`.
714 {
715  MythTimer timer;
716  timer.start();
717 
718  QMutexLocker locker(&lock);
719  size_t avail = used;
720  while ((needed > avail) && isRunning() &&
721  !request_pause && !error && !eof &&
722  (timer.elapsed() < (int)max_wait))
723  {
  // Wake on new data or after 10 ms, whichever comes first.
724  dataWait.wait(locker.mutex(), 10);
725  avail = used;
726  }
727  return avail;
728 }
729 
// NOTE(review): signature line absent from this listing; presumably
// DeviceReadBuffer::ReportStats() -- confirm.
// When REPORT_RING_STATS is enabled, logs ring-buffer statistics once
// lastReport exceeds 20 seconds, then zeroes the counters and restarts the
// timer. Compiles to an empty body otherwise.
731 {
732 #if REPORT_RING_STATS
733  static const int secs = 20;
734  static const double d1_s = 1.0 / secs;
735  if (lastReport.elapsed() > secs * 1000 /* msg every 20 seconds */)
736  {
737  QMutexLocker locker(&lock);
  // Scale factor turning byte counts into a percentage of the ring size.
738  double rsize = 100.0 / size;
739  QString msg = QString("fill avg(%1%) ").arg(avg_used*rsize,5,'f',2);
740  msg += QString("fill max(%1%) ").arg(max_used*rsize,5,'f',2);
741  msg += QString("writes/sec(%1) ").arg(avg_buf_write_cnt*d1_s);
742  msg += QString("reads/sec(%1) ").arg(avg_buf_read_cnt*d1_s);
743  msg += QString("sleeps/sec(%1)").arg(avg_buf_sleep_cnt*d1_s);
744 
745  avg_used = 0;
746  avg_buf_write_cnt = 0;
747  avg_buf_read_cnt = 0;
748  avg_buf_sleep_cnt = 0;
749  max_used = 0;
750  lastReport.start();
751 
752  LOG(VB_GENERAL, LOG_INFO, LOC + msg);
753  }
754 #endif
755 }
756 
757 /*
758  * vim:ts=4:sw=4:ai:et:si:sts=4
759  */
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:216
void start(QThread::Priority=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:295
def write(text, progress=True)
Definition: mythburn.py:279
bool IsRunning(void) const
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:46
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:13
#define O_NONBLOCK
Definition: mythmedia.cpp:25
bool IsEOF(void) const
QWaitCondition runWait
void ClosePipes(void) const
uint GetContiguousUnused(void) const
static void error(const char *str,...)
Definition: vbi.c:41
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
bool IsOpen(void) const
bool wait(unsigned long time=ULONG_MAX)
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:312
void WakePoll(void) const
unsigned int uint
Definition: compat.h:140
MythCoreContext * gCoreContext
This global variable contains the MythCoreContext instance for the app.
void IncrReadPointer(uint len)
QWaitCondition pauseWait
static void setup_pipe(int[2], long[2])
Definition: mythbaseutil.h:16
bool IsErrored(void) const
def read(device=None, features=[])
Definition: disc.py:35
unsigned char * writePtr
bool IsPaused(void) const
uint Read(unsigned char *buf, uint count)
Try to Read count bytes from into buffer.
#define LOC
VERBOSE_PREAMBLE false
Definition: verbosedefs.h:85
#define close
Definition: compat.h:16
unsigned char * buffer
virtual void ReaderPaused(int fd)=0
uint WaitForUsed(uint needed, uint max_wait) const
unsigned char * endPtr
unsigned char * readPtr
bool WaitForPaused(unsigned long timeout)
void SetRequestPause(bool request)
bool isRunning(void) const
Definition: mthread.cpp:275
uint WaitForUnused(uint needed) const
virtual void PriorityEvent(int fd)=0
bool WaitForUnpause(unsigned long timeout)
bool Poll(void) const
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:99
DeviceReaderCB * readerCB
int elapsed(void) const
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:90
void IncrWritePointer(uint len)
int GetNumSetting(const QString &key, int defaultval=0)
void Reset(const QString &streamName, int streamfd)
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
bool Setup(const QString &streamName, int streamfd, uint readQuanta=sizeof(TSPacket), uint deviceBufferSize=0, uint deviceBufferCount=1)
QWaitCondition unpauseWait
bool CheckForErrors(ssize_t read_len, size_t requested_len, uint &err_cnt)
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:203
uint GetUsed(void) const
DeviceReadBuffer(DeviceReaderCB *callback, bool use_poll=true, bool error_exit_on_poll_timeout=true)
uint GetUnused(void) const
static void usleep(unsigned long time)
Definition: mthread.cpp:349
volatile bool dorun
bool IsPauseRequested(void) const
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47
QWaitCondition dataWait