MythTV  master
ringbuffer.cpp
Go to the documentation of this file.
1 // ANSI C headers
2 #include <cmath>
3 #include <cstdio>
4 #include <cstdlib>
5 #include <cerrno>
6 #include <chrono> // for milliseconds
7 #include <thread> // for sleep_for
8 
9 // POSIX C headers
10 #include <sys/types.h>
11 #include <sys/time.h>
12 #include <unistd.h>
13 #include <fcntl.h>
14 
15 // Qt headers
16 #include <QFile>
17 #include <QDateTime>
18 #include <QReadLocker>
19 
20 #include "threadedfilewriter.h"
21 #include "fileringbuffer.h"
22 #include "streamingringbuffer.h"
23 #include "mythmiscutil.h"
24 #include "dvdstream.h"
25 #include "livetvchain.h"
26 #include "mythcontext.h"
27 #include "ringbuffer.h"
28 #include "mythconfig.h"
29 #include "remotefile.h"
30 #include "compat.h"
31 #include "mythdate.h"
32 #include "mythtimer.h"
33 #include "mythlogging.h"
34 #include "DVD/dvdringbuffer.h"
35 #include "Bluray/bdringbuffer.h"
37 #include "mythcdrom.h"
38 
39 // about one second at 35mbit
40 #define BUFFER_SIZE_MINIMUM (4 * 1024 * 1024)
41 #define BUFFER_FACTOR_NETWORK 2
42 #define BUFFER_FACTOR_BITRATE 2
43 #define BUFFER_FACTOR_MATROSKA 2
44 
45 const int RingBuffer::kDefaultOpenTimeout = 2000; // ms
46 const int RingBuffer::kLiveTVOpenTimeout = 10000;
47 
48 #define CHUNK 32768 /* readblocksize increments */
49 
50 #define LOC QString("RingBuf(%1): ").arg(filename)
51 
53 QStringList RingBuffer::subExt;
54 QStringList RingBuffer::subExtNoCheck;
55 
56 extern "C" {
57 #include "libavformat/avformat.h"
58 }
60 
61 /*
62  Locking relations:
63  rwlock->poslock->rbrlock->rbwlock
64 
65  A child should never lock any of the parents without locking
66  the parent lock before the child lock.
67  void RingBuffer::Example1()
68  {
69  poslock.lockForWrite();
70  rwlock.lockForRead(); // error!
71  blah(); // <- does not implicitly acquire any locks
72  rwlock.unlock();
73  poslock.unlock();
74  }
75  void RingBuffer::Example2()
76  {
77  rwlock.lockForRead();
78  rbrlock.lockForWrite(); // ok!
79  blah(); // <- does not implicitly acquire any locks
80  rbrlock.unlock();
81  rwlock.unlock();
82  }
83 */
84 
113  const QString &xfilename, bool write,
114  bool usereadahead, int timeout_ms, bool stream_only)
115 {
116  QString lfilename = xfilename;
117  QString lower = lfilename.toLower();
118 
119  if (write)
120  return new FileRingBuffer(lfilename, write, usereadahead, timeout_ms);
121 
122  bool dvddir = false;
123  bool bddir = false;
124  bool httpurl = lower.startsWith("http://") || lower.startsWith("https://");
125  bool iptvurl =
126  lower.startsWith("rtp://") || lower.startsWith("tcp://") ||
127  lower.startsWith("udp://");
128  bool mythurl = lower.startsWith("myth://");
129  bool bdurl = lower.startsWith("bd:");
130  bool dvdurl = lower.startsWith("dvd:");
131  bool imgext = lower.endsWith(".img") || lower.endsWith(".iso");
132 
133  if (imgext)
134  {
135  switch (MythCDROM::inspectImage(lfilename))
136  {
137  case MythCDROM::kBluray:
138  bdurl = true;
139  break;
140 
141  case MythCDROM::kDVD:
142  dvdurl = true;
143  break;
144 
145  default:
146  break;
147  }
148  }
149 
150  if (httpurl || iptvurl)
151  {
152  if (!iptvurl && HLSRingBuffer::TestForHTTPLiveStreaming(lfilename))
153  {
154  return new HLSRingBuffer(lfilename);
155  }
156  return new StreamingRingBuffer(lfilename);
157  }
158  if (!stream_only && mythurl)
159  {
160  struct stat fileInfo;
161  if ((RemoteFile::Exists(lfilename, &fileInfo)) &&
162  (S_ISDIR(fileInfo.st_mode)))
163  {
164  if (RemoteFile::Exists(lfilename + "/VIDEO_TS"))
165  dvddir = true;
166  else if (RemoteFile::Exists(lfilename + "/BDMV"))
167  bddir = true;
168  }
169  }
170  else if (!stream_only && !mythurl)
171  {
172  if (QFile::exists(lfilename + "/VIDEO_TS"))
173  dvddir = true;
174  else if (QFile::exists(lfilename + "/BDMV"))
175  bddir = true;
176  }
177 
178  if (!stream_only && (dvdurl || dvddir))
179  {
180  if (lfilename.startsWith("dvd:")) // URI "dvd:" + path
181  lfilename.remove(0,4); // e.g. "dvd:/dev/dvd"
182 
183  if (!(mythurl || QFile::exists(lfilename)))
184  lfilename = "/dev/dvd";
185  LOG(VB_PLAYBACK, LOG_INFO, "Trying DVD at " + lfilename);
186 
187  return new DVDRingBuffer(lfilename);
188  }
189  else if (!stream_only && (bdurl || bddir))
190  {
191  if (lfilename.startsWith("bd:")) // URI "bd:" + path
192  lfilename.remove(0,3); // e.g. "bd:/videos/ET"
193 
194  if (!(mythurl || QFile::exists(lfilename)))
195  lfilename = "/dev/dvd";
196  LOG(VB_PLAYBACK, LOG_INFO, "Trying BD at " + lfilename);
197 
198  return new BDRingBuffer(lfilename);
199  }
200 
201  if (!mythurl && imgext && lfilename.startsWith("dvd:"))
202  {
203  LOG(VB_PLAYBACK, LOG_INFO, "DVD image at " + lfilename);
204  return new DVDStream(lfilename);
205  }
206  if (!mythurl && lower.endsWith(".vob") && lfilename.contains("/VIDEO_TS/"))
207  {
208  LOG(VB_PLAYBACK, LOG_INFO, "DVD VOB at " + lfilename);
209  DVDStream *s = new DVDStream(lfilename);
210  if (s && s->IsOpen())
211  return s;
212 
213  delete s;
214  }
215 
216  return new FileRingBuffer(
217  lfilename, write, usereadahead, timeout_ms);
218 }
219 
221  MThread("RingBuffer"),
222  type(rbtype),
223  readpos(0), writepos(0),
224  internalreadpos(0), ignorereadpos(-1),
225  rbrpos(0), rbwpos(0),
226  stopreads(false), safefilename(QString()),
227  filename(), subtitlefilename(),
228  tfw(nullptr), fd2(-1),
229  writemode(false), remotefile(nullptr),
230  bufferSize(BUFFER_SIZE_MINIMUM),
231  low_buffers(false),
232  fileismatroska(false), unknownbitrate(false),
233  startreadahead(false), readAheadBuffer(nullptr),
234  readaheadrunning(false), reallyrunning(false),
235  request_pause(false), paused(false),
236  ateof(false),
237  waitforwrite(false), beingwritten(false),
238  readsallowed(false), readsdesired(false),
239  recentseek(true),
240  setswitchtonext(false),
241  rawbitrate(8000), playspeed(1.0f),
242  fill_threshold(65536), fill_min(-1),
243  readblocksize(CHUNK), wanttoread(0),
244  numfailures(0), commserror(false),
245  oldfile(false), livetvchain(nullptr),
246  ignoreliveeof(false), readAdjust(0),
247  readOffset(0), readInternalMode(false),
248  bitrateMonitorEnabled(false),
249  bitrateInitialized(false)
250 {
251  {
252  QMutexLocker locker(&subExtLock);
253  if (subExt.empty())
254  {
255  // Possible subtitle file extensions '.srt', '.sub', '.txt'
256  // since #9294 also .ass and .ssa
257  subExt += ".ass";
258  subExt += ".srt";
259  subExt += ".ssa";
260  subExt += ".sub";
261  subExt += ".txt";
262 
263  // Extensions for which a subtitle file should not exist
265  subExtNoCheck += ".gif";
266  subExtNoCheck += ".png";
267  }
268  }
269 }
270 
271 #undef NDEBUG
272 #include <cassert>
273 
282 {
283  assert(!isRunning());
284  wait();
285 
286  delete [] readAheadBuffer;
287  readAheadBuffer = nullptr;
288 
289  if (tfw)
290  {
291  tfw->Flush();
292  delete tfw;
293  tfw = nullptr;
294  }
295 }
296 
/**
 * Resets the position bookkeeping and error state of the ring buffer.
 *
 * Takes rwlock and then poslock for writing (the order given in the
 * "Locking relations" note at the top of the file), clears the failure
 * counters and the switch-to-next flag, zeroes writepos, recomputes readpos,
 * wakes every thread waiting on generalWait and releases the locks in
 * reverse order.
 *
 * \param full          guards a statement that is not visible in this
 *                      listing (see the review note below).
 * \param toAdjust      when true readpos becomes (readpos - readAdjust)
 *                      instead of 0; a negative result is clamped to 0.
 * \param resetInternal guards a statement that is not visible in this
 *                      listing (see the review note below).
 */
300 void RingBuffer::Reset(bool full, bool toAdjust, bool resetInternal)
301 {
302  LOG(VB_FILE, LOG_INFO, LOC + QString("Reset(%1,%2,%3)")
303  .arg(full).arg(toAdjust).arg(resetInternal));
304 
305  rwlock.lockForWrite();
306  poslock.lockForWrite();
307 
308  numfailures = 0;
309  commserror = false;
310  setswitchtonext = false;
311 
312  writepos = 0;
313  readpos = (toAdjust) ? (readpos - readAdjust) : 0;
314 
      // A non-zero position after a reset is only reported, not treated as fatal.
315  if (readpos != 0)
316  {
317  LOG(VB_GENERAL, LOG_ERR, LOC +
318  QString("RingBuffer::Reset() nonzero readpos. toAdjust: %1 "
319  "readpos: %2 readAdjust: %3")
320  .arg(toAdjust).arg(readpos).arg(readAdjust));
321  }
322 
323  readAdjust = 0;
      // Never leave a negative read position behind.
324  readpos = (readpos < 0) ? 0 : readpos;
325 
326  if (full)
      // NOTE(review): the listing's numbering jumps 326 -> 328, so the
      // statement guarded by `if (full)` is missing from this view. As the
      // text stands, the next `if` becomes its body. Restore from upstream.
328 
329  if (resetInternal)
      // NOTE(review): numbering jumps 329 -> 331; the guarded statement is
      // missing here too. As written the wakeAll() below would become
      // conditional on both flags. Restore from upstream.
331 
332  generalWait.wakeAll();
333  poslock.unlock();
334  rwlock.unlock();
335 }
336 
343 {
344  LOG(VB_FILE, LOG_INFO, LOC +
345  QString("UpdateRawBitrate(%1Kb)").arg(raw_bitrate));
346 
347  // an audio only stream could be as low as 64Kb (DVB radio) and
348  // an MHEG only stream is likely to be reported as 0Kb
349  if (raw_bitrate < 64)
350  {
351  LOG(VB_FILE, LOG_INFO, LOC +
352  QString("Bitrate too low - setting to 64Kb"));
353  raw_bitrate = 64;
354  }
355  else if (raw_bitrate > 100000)
356  {
357  LOG(VB_FILE, LOG_INFO, LOC +
358  QString("Bitrate too high - setting to 100Mb"));
359  raw_bitrate = 100000;
360  }
361 
362  rwlock.lockForWrite();
363  rawbitrate = raw_bitrate;
365  bitrateInitialized = true;
366  rwlock.unlock();
367 }
368 
/**
 * Stores a new playback speed factor.
 *
 * The assignment to playspeed is made while holding rwlock for writing.
 *
 * \param play_speed new value for playspeed.
 */
373 void RingBuffer::UpdatePlaySpeed(float play_speed)
374 {
375  rwlock.lockForWrite();
376  playspeed = play_speed;
      // NOTE(review): the listing's numbering jumps 376 -> 378; a statement
      // executed under the lock appears to be missing from this view.
378  rwlock.unlock();
379 }
380 
/**
 * Records the two flags that CreateReadAheadBuffer-style sizing code reads
 * (unknownbitrate and fileismatroska are multiplied into the buffer size
 * elsewhere in this file).
 *
 * \param estbitrate stored in unknownbitrate.
 * \param matroska   stored in fileismatroska.
 */
386 void RingBuffer::SetBufferSizeFactors(bool estbitrate, bool matroska)
387 {
388  rwlock.lockForWrite();
389  unknownbitrate = estbitrate;
390  fileismatroska = matroska;
391  rwlock.unlock();
      // NOTE(review): the listing's numbering jumps 391 -> 393; a statement
      // that runs after the lock is released appears to be missing here.
393 }
394 
403 {
404  uint estbitrate = 0;
405 
406  readsallowed = false;
407  readsdesired = false;
408 
409  // loop without sleeping if the buffered data is less than this
410  fill_threshold = 7 * bufferSize / 8;
411 
412  const uint KB2 = 2*1024;
413  const uint KB4 = 4*1024;
414  const uint KB8 = 8*1024;
415  const uint KB16 = 16*1024;
416  const uint KB32 = 32*1024;
417  const uint KB64 = 64*1024;
418  const uint KB128 = 128*1024;
419  const uint KB256 = 256*1024;
420  const uint KB512 = 512*1024;
421 
422  estbitrate = (uint) max(abs(rawbitrate * playspeed),
423  0.5f * rawbitrate);
424  estbitrate = min(rawbitrate * 3, estbitrate);
425  int const rbs = (estbitrate > 18000) ? KB512 :
426  (estbitrate > 9000) ? KB256 :
427  (estbitrate > 5000) ? KB128 :
428  (estbitrate > 2500) ? KB64 :
429  (estbitrate > 1250) ? KB32 :
430  (estbitrate >= 500) ? KB16 :
431  (estbitrate > 250) ? KB8 :
432  (estbitrate > 125) ? KB4 : KB2;
433  if (rbs < CHUNK)
434  {
435  readblocksize = rbs;
436  }
437  else
438  {
440  }
441 
442  // minimum seconds of buffering before allowing read
443  float secs_min = 0.3;
444  // set the minimum buffering before allowing ffmpeg read
445  fill_min = (uint) ((estbitrate * 1000 * secs_min) * 0.125f);
446  // make this a multiple of ffmpeg block size..
447  if (fill_min >= CHUNK || rbs >= CHUNK)
448  {
449  if (low_buffers)
450  {
451  LOG(VB_GENERAL, LOG_INFO, LOC +
452  "Buffering optimisations disabled.");
453  }
454  low_buffers = false;
455  fill_min = ((fill_min / CHUNK) + 1) * CHUNK;
456  fill_min = min((uint)fill_min, bufferSize / 2);
457  }
458  else
459  {
460  low_buffers = true;
461  LOG(VB_GENERAL, LOG_WARNING, "Enabling buffering optimisations "
462  "for low bitrate stream.");
463  }
464 
465  LOG(VB_FILE, LOG_INFO, LOC +
466  QString("CalcReadAheadThresh(%1 Kb)\n\t\t\t -> "
467  "threshhold(%2 KB) min read(%3 KB) blk size(%4 KB)")
468  .arg(estbitrate).arg(fill_threshold/1024)
469  .arg(fill_min/1024).arg(readblocksize/1024));
470 }
471 
472 bool RingBuffer::IsNearEnd(double /*fps*/, uint vvf) const
473 {
474  QReadLocker lock(&rwlock);
475 
476  if (!ateof && !setswitchtonext)
477  {
478  // file is still being read, so can't be finished
479  return false;
480  }
481 
482  poslock.lockForRead();
483  long long rp = readpos;
484  long long sz = internalreadpos - readpos;
485  poslock.unlock();
486 
487  // telecom kilobytes (i.e. 1000 per k not 1024)
488  uint tmp = (uint) max(abs(rawbitrate * playspeed), 0.5f * rawbitrate);
489  uint kbits_per_sec = min(rawbitrate * 3, tmp);
490  if (kbits_per_sec == 0)
491  return false;
492 
493  double readahead_time = sz / (kbits_per_sec * (1000.0/8.0));
494 
495  bool near_end = readahead_time <= 1.5;
496  LOG(VB_PLAYBACK, LOG_INFO, LOC + "IsReallyNearEnd()" +
497  QString(" br(%1KB)").arg(kbits_per_sec/8) +
498  QString(" sz(%1KB)").arg(sz / 1000LL) +
499  QString(" vfl(%1)").arg(vvf) +
500  QString(" time(%1)").arg(readahead_time) +
501  QString(" rawbitrate(%1)").arg(rawbitrate) +
502  QString(" avail(%1)").arg(sz) +
503  QString(" internal_size(%1)").arg(internalreadpos) +
504  QString(" readposition(%1)").arg(rp) +
505  QString(" stopreads(%1)").arg(stopreads) +
506  QString(" paused(%1)").arg(paused) +
507  QString(" ne:%1").arg(near_end));
508 
509  return near_end;
510 }
511 
514 int RingBuffer::ReadBufFree(void) const
515 {
516  rbrlock.lockForRead();
517  rbwlock.lockForRead();
518  int ret = ((rbwpos >= rbrpos) ? rbrpos + bufferSize : rbrpos) - rbwpos - 1;
519  rbwlock.unlock();
520  rbrlock.unlock();
521  return ret;
522 }
523 
526 {
527  QReadLocker lock(&rwlock);
528 
529  return ReadBufAvail();
530 }
531 
532 long long RingBuffer::GetRealFileSize(void) const
533 {
534  {
535  QReadLocker lock(&rwlock);
536  if (readInternalMode)
537  {
538  return ReadBufAvail();
539  }
540  }
541 
542  return GetRealFileSizeInternal();
543 }
544 
/**
 * Moves the read position.
 *
 * In internal read mode only readpos is changed (under poslock); otherwise
 * the request is forwarded to SeekInternal(). Waiters on generalWait are
 * woken in both cases.
 *
 * \param pos      offset, interpreted according to whence.
 * \param whence   SEEK_SET, SEEK_CUR or SEEK_END.
 * \param has_lock true when the caller already holds rwlock; the function
 *                 then neither takes nor releases it.
 * \return readpos after the update in internal read mode, else the value
 *         returned by SeekInternal().
 */
545 long long RingBuffer::Seek(long long pos, int whence, bool has_lock)
546 {
547  LOG(VB_FILE, LOG_INFO, LOC + QString("Seek(%1,%2,%3)")
548  .arg(pos).arg((SEEK_SET==whence)?"SEEK_SET":
549  ((SEEK_CUR==whence)?"SEEK_CUR":"SEEK_END"))
550  .arg(has_lock?"locked":"unlocked"));
551 
552  if (!has_lock)
553  {
554  rwlock.lockForWrite();
555  }
556 
557  long long ret;
558 
559  if (readInternalMode)
560  {
561  poslock.lockForWrite();
562  // only valid for SEEK_SET & SEEK_CUR
      // NOTE(review): SEEK_END is nevertheless handled below, relative to
      // the bytes currently available in the read-ahead buffer.
563  switch (whence)
564  {
565  case SEEK_SET:
566  readpos = pos;
567  break;
568  case SEEK_CUR:
569  readpos += pos;
570  break;
571  case SEEK_END:
572  readpos = ReadBufAvail() - pos;
573  break;
574  }
      // NOTE(review): the listing's numbering jumps 574 -> 576; a statement
      // executed under poslock appears to be missing from this view.
576  poslock.unlock();
      // readpos is read here after poslock has been released.
577  ret = readpos;
578  }
579  else
580  {
581  ret = SeekInternal(pos, whence);
582  }
583 
584  generalWait.wakeAll();
585 
586  if (!has_lock)
587  {
588  rwlock.unlock();
589  }
590  return ret;
591 }
592 
594 {
595  QWriteLocker lock(&rwlock);
596  bool old = readInternalMode;
597 
598  if (mode == old)
599  {
600  return old;
601  }
602 
603  readInternalMode = mode;
604 
605  if (!mode)
606  {
607  // adjust real read position in ringbuffer
608  rbrlock.lockForWrite();
610  generalWait.wakeAll();
611  rbrlock.unlock();
612  // reset the read offset as we are exiting the internal read mode
613  readOffset = 0;
614  }
615 
616  LOG(VB_FILE, LOG_DEBUG, LOC +
617  QString("SetReadInternalMode: %1").arg(mode ? "on" : "off"));
618 
619  return old;
620 }
621 
625 {
626  rbrlock.lockForRead();
627  rbwlock.lockForRead();
628  int ret = (rbwpos >= rbrpos) ? rbwpos - rbrpos : bufferSize - rbrpos + rbwpos;
629  rbwlock.unlock();
630  rbrlock.unlock();
631  return ret;
632 }
633 
/**
 * Empties the read-ahead buffer and restarts its bookkeeping at a new
 * internal read position.
 *
 * Leaves internal read mode, zeroes both ring positions, clears the
 * EOF / reads-allowed / reads-desired / switch-to-next flags, marks a recent
 * seek and wakes all waiters on generalWait. Only rbrlock and rbwlock are
 * taken here; run() in this file calls it with rwlock and poslock already
 * write-locked.
 *
 * \param newinternal new value for internalreadpos.
 */
645 void RingBuffer::ResetReadAhead(long long newinternal)
646 {
647  LOG(VB_FILE, LOG_INFO, LOC +
648  QString("ResetReadAhead(internalreadpos = %1->%2)")
649  .arg(internalreadpos).arg(newinternal));
650 
651  readInternalMode = false;
652  readOffset = 0;
653 
654  rbrlock.lockForWrite();
655  rbwlock.lockForWrite();
656 
      // NOTE(review): the listing's numbering jumps 656 -> 658; a statement
      // executed under both locks appears to be missing from this view.
658 
659  rbrpos = 0;
660  rbwpos = 0;
661  internalreadpos = newinternal;
662  ateof = false;
663  readsallowed = false;
664  readsdesired = false;
665  recentseek = true;
666  setswitchtonext = false;
667 
668  generalWait.wakeAll();
669 
670  rbwlock.unlock();
671  rbrlock.unlock();
672 }
673 
689 {
690  bool do_start = true;
691 
692  rwlock.lockForWrite();
693  if (!startreadahead)
694  {
695  do_start = false;
696  }
697  else if (writemode)
698  {
699  LOG(VB_GENERAL, LOG_WARNING, LOC + "Not starting read ahead thread, "
700  "this is a write only RingBuffer");
701  do_start = false;
702  }
703  else if (readaheadrunning)
704  {
705  LOG(VB_GENERAL, LOG_WARNING, LOC + "Not starting read ahead thread, "
706  "already running");
707  do_start = false;
708  }
709 
710  if (!do_start)
711  {
712  rwlock.unlock();
713  return;
714  }
715 
716  StartReads();
717 
718  MThread::start();
719 
720  while (!readaheadrunning && !reallyrunning)
721  generalWait.wait(&rwlock);
722 
723  rwlock.unlock();
724 }
725 
730 {
731  while (isRunning())
732  {
733  rwlock.lockForWrite();
734  readaheadrunning = false;
735  StopReads();
736  generalWait.wakeAll();
737  rwlock.unlock();
738  MThread::wait(5000);
739  }
740 }
741 
747 {
748  LOG(VB_FILE, LOG_INFO, LOC + "StopReads()");
749  stopreads = true;
750  generalWait.wakeAll();
751 }
752 
758 {
759  LOG(VB_FILE, LOG_INFO, LOC + "StartReads()");
760  stopreads = false;
761  generalWait.wakeAll();
762 }
763 
769 {
770  LOG(VB_FILE, LOG_INFO, LOC + "Pause()");
771  StopReads();
772 
773  rwlock.lockForWrite();
774  request_pause = true;
775  rwlock.unlock();
776 }
777 
783 {
784  LOG(VB_FILE, LOG_INFO, LOC + "Unpause()");
785  StartReads();
786 
787  rwlock.lockForWrite();
788  request_pause = false;
789  generalWait.wakeAll();
790  rwlock.unlock();
791 }
792 
794 bool RingBuffer::isPaused(void) const
795 {
796  rwlock.lockForRead();
797  bool ret = !readaheadrunning || paused;
798  rwlock.unlock();
799  return ret;
800 }
801 
806 {
807  MythTimer t;
808  t.start();
809 
810  rwlock.lockForRead();
811  while (readaheadrunning && !paused && request_pause)
812  {
813  generalWait.wait(&rwlock, 1000);
814  if (readaheadrunning && !paused && request_pause && t.elapsed() > 1000)
815  {
816  LOG(VB_GENERAL, LOG_WARNING, LOC +
817  QString("Waited %1 ms for ringbuffer pause..")
818  .arg(t.elapsed()));
819  }
820  }
821  rwlock.unlock();
822 }
823 
825 {
826  const uint timeout = 500; // ms
827 
828  if (request_pause)
829  {
830  if (!paused)
831  {
832  rwlock.unlock();
833  rwlock.lockForWrite();
834 
835  if (request_pause)
836  {
837  paused = true;
838  generalWait.wakeAll();
839  }
840 
841  rwlock.unlock();
842  rwlock.lockForRead();
843  }
844 
846  generalWait.wait(&rwlock, timeout);
847  }
848 
849  if (!request_pause && paused)
850  {
851  rwlock.unlock();
852  rwlock.lockForWrite();
853 
854  if (!request_pause)
855  {
856  paused = false;
857  generalWait.wakeAll();
858  }
859 
860  rwlock.unlock();
861  rwlock.lockForRead();
862  }
863 
864  return request_pause || paused;
865 }
866 
868 {
869  rwlock.lockForWrite();
870  poslock.lockForWrite();
871 
872  uint oldsize = bufferSize;
873  uint newsize = BUFFER_SIZE_MINIMUM;
874  if (remotefile)
875  {
876  newsize *= BUFFER_FACTOR_NETWORK;
877  if (fileismatroska)
878  newsize *= BUFFER_FACTOR_MATROSKA;
879  if (unknownbitrate)
880  newsize *= BUFFER_FACTOR_BITRATE;
881  }
882 
883  // N.B. Don't try and make it smaller - bad things happen...
884  if (readAheadBuffer && oldsize >= newsize)
885  {
886  poslock.unlock();
887  rwlock.unlock();
888  return;
889  }
890 
891  bufferSize = newsize;
892  if (readAheadBuffer)
893  {
894  char* newbuffer = new char[bufferSize + 1024];
895  memcpy(newbuffer, readAheadBuffer + rbwpos, oldsize - rbwpos);
896  memcpy(newbuffer + (oldsize - rbwpos), readAheadBuffer, rbwpos);
897  delete [] readAheadBuffer;
898  readAheadBuffer = newbuffer;
899  rbrpos = (rbrpos > rbwpos) ? (rbrpos - rbwpos) :
900  (rbrpos + oldsize - rbwpos);
901  rbwpos = oldsize;
902  }
903  else
904  {
905  readAheadBuffer = new char[bufferSize + 1024];
906  }
908  poslock.unlock();
909  rwlock.unlock();
910 
911  LOG(VB_FILE, LOG_INFO, LOC + QString("Created readAheadBuffer: %1Mb")
912  .arg(newsize >> 20));
913 }
914 
/**
 * Body of the read-ahead thread.
 *
 * After initialising its state under rwlock/poslock, the thread loops while
 * readaheadrunning is set: it calls safe_read() to pull data into
 * readAheadBuffer at rbwpos, advances rbwpos/internalreadpos, adapts
 * readblocksize to the measured interval between reads, maintains the
 * readsallowed/readsdesired/ateof flags and yields to other threads between
 * iterations. On exit it frees readAheadBuffer and clears reallyrunning.
 *
 * rwlock is held for reading for most of the loop and is temporarily
 * released and re-taken for writing whenever shared flags are changed.
 *
 * NOTE(review): this listing has lost several lines inside the function
 * (the embedded numbering skips 927, 1021, 1027, 1139, 1161, 1175 and 1186).
 * Each gap is marked below; the text is not compilable until they are
 * restored from the upstream file.
 */
915 void RingBuffer::run(void)
916 {
917  RunProlog();
918 
919  // These variables are used to adjust the read block size
920  struct timeval lastread, now;
921  int readtimeavg = 300;
922  bool ignore_for_read_timing = true;
923  int eofreads = 0;
924 
925  gettimeofday(&lastread, nullptr); // this is just to keep gcc happy
926 
      // NOTE(review): listing line 927 is missing here.
928  rwlock.lockForWrite();
929  poslock.lockForWrite();
930  request_pause = false;
931  ResetReadAhead(0);
932  readaheadrunning = true;
933  reallyrunning = true;
      // Wakes waiters on generalWait (Start() in this file waits on it
      // until these flags are set).
934  generalWait.wakeAll();
935  poslock.unlock();
936  rwlock.unlock();
937 
938  // NOTE: this must loop at some point hold only
939  // a read lock on rwlock, so that other functions
940  // such as reset and seek can take priority.
941 
942  rwlock.lockForRead();
943 
944  LOG(VB_FILE, LOG_INFO, LOC +
945  QString("Initial readblocksize %1K & fill_min %2K")
946  .arg(readblocksize/1024).arg(fill_min/1024));
947 
948  while (readaheadrunning)
949  {
      // IsOpen() is called without rwlock held.
950  rwlock.unlock();
951  bool isopened = IsOpen();
952  rwlock.lockForRead();
953 
954  if (!isopened)
955  {
956  LOG(VB_FILE, LOG_WARNING, LOC +
957  QString("File not opened, terminating readahead thread"));
958  poslock.lockForWrite();
959  readaheadrunning = false;
960  generalWait.wakeAll();
961  poslock.unlock();
962  break;
963  }
964  if (PauseAndWait())
965  {
966  ignore_for_read_timing = true;
967  LOG(VB_FILE, LOG_DEBUG, LOC +
968  "run: PauseAndWait Not reading continuing");
969  continue;
970  }
971 
972  long long totfree = ReadBufFree();
973 
974  const uint KB32 = 32*1024;
975  const int KB512 = 512*1024;
976  // These are conditions where we don't want to go through
977  // the loop if they are true.
978  if (((totfree < KB32) && readsallowed) ||
979  (ignorereadpos >= 0) || commserror || stopreads)
980  {
981  ignore_for_read_timing |=
982  (ignorereadpos >= 0) || commserror || stopreads;
      // Shorter wait (50 ms instead of 1000 ms) while reads are stopped.
983  generalWait.wait(&rwlock, (stopreads) ? 50 : 1000);
984  LOG(VB_FILE, LOG_DEBUG, LOC +
985  QString("run: Not reading continuing: totfree(%1) "
986  "readsallowed(%2) ignorereadpos(%3) commserror(%4) "
987  "stopreads(%5)")
988  .arg(totfree).arg(readsallowed).arg(ignorereadpos)
989  .arg(commserror).arg(stopreads));
990  continue;
991  }
992 
993  // These are conditions where we want to sleep to allow
994  // other threads to do stuff.
995  if (setswitchtonext || (ateof && readsdesired))
996  {
997  ignore_for_read_timing = true;
998  generalWait.wait(&rwlock, 1000);
999  totfree = ReadBufFree();
1000  }
1001 
1002  int read_return = -1;
1003  if (totfree >= KB32 && !commserror &&
1004  !ateof && !setswitchtonext)
1005  {
1006  // limit the read size
1007  if (readblocksize > totfree)
1008  totfree = (totfree / KB32) * KB32; // must be multiple of 32KB
1009  else
1010  totfree = readblocksize;
1011 
1012  // adapt blocksize
1013  gettimeofday(&now, nullptr);
1014  if (!ignore_for_read_timing)
1015  {
1016  int readinterval = (now.tv_sec - lastread.tv_sec ) * 1000 +
1017  (now.tv_usec - lastread.tv_usec) / 1000;
      // Running average weighted 9:1 towards the previous value.
1018  readtimeavg = (readtimeavg * 9 + readinterval) / 10;
1019 
1020  if (readtimeavg < 150 &&
      // NOTE(review): listing line 1021 (part of this condition) is missing.
1022  readblocksize >= CHUNK /* low_buffers */ &&
1023  readblocksize <= KB512)
1024  {
1025  int old_block_size = readblocksize;
1026  readblocksize = 3 * readblocksize / 2;
      // NOTE(review): listing line 1027 is missing here.
1028  if (readblocksize > KB512)
1029  {
1030  readblocksize = KB512;
1031  }
1032  LOG(VB_FILE, LOG_INFO, LOC +
1033  QString("Avg read interval was %1 msec. "
1034  "%2K -> %3K block size")
1035  .arg(readtimeavg)
1036  .arg(old_block_size/1024)
1037  .arg(readblocksize/1024));
1038  readtimeavg = 225;
1039  }
1040  else if (readtimeavg > 300 && readblocksize > CHUNK)
1041  {
1042  readblocksize -= CHUNK;
1043  LOG(VB_FILE, LOG_INFO, LOC +
1044  QString("Avg read interval was %1 msec. "
1045  "%2K -> %3K block size")
1046  .arg(readtimeavg)
1047  .arg((readblocksize+CHUNK)/1024)
1048  .arg(readblocksize/1024));
1049  readtimeavg = 225;
1050  }
1051  }
1052  lastread = now;
1053 
1054  rbwlock.lockForRead();
      // Do not read past the physical end of the buffer; the write
      // position wraps via the modulo further below.
1055  if (rbwpos + totfree > bufferSize)
1056  {
1057  totfree = bufferSize - rbwpos;
1058  LOG(VB_FILE, LOG_DEBUG, LOC +
1059  "Shrinking read, near end of buffer");
1060  }
1061 
1062  if (internalreadpos == 0)
1063  {
1064  totfree = max(fill_min, readblocksize);
1065  LOG(VB_FILE, LOG_DEBUG, LOC +
1066  "Reading enough data to start playback");
1067  }
1068 
1069  LOG(VB_FILE, LOG_DEBUG, LOC +
1070  QString("safe_read(...@%1, %2) -- begin")
1071  .arg(rbwpos).arg(totfree));
1072 
1073  MythTimer sr_timer;
1074  sr_timer.start();
1075 
      // Remember where this read was aimed so a concurrent reset of
      // rbwpos can be detected after the read completes.
1076  int rbwposcopy = rbwpos;
1077 
1078  // FileRingBuffer::safe_read(RemoteFile*...) acquires poslock;
1079  // so we need to unlock this here to preserve locking order.
1080  rbwlock.unlock();
1081 
1082  read_return = safe_read(readAheadBuffer + rbwposcopy, totfree);
1083 
1084  int sr_elapsed = sr_timer.elapsed();
      // 1000000001 is used as the rate when no time elapsed.
1085  uint64_t bps = !sr_elapsed ? 1000000001 :
1086  (uint64_t)(((double)read_return * 8000.0) /
1087  (double)sr_elapsed);
1088  LOG(VB_FILE, LOG_DEBUG, LOC +
1089  QString("safe_read(...@%1, %2) -> %3, took %4 ms %5 avg %6 ms")
1090  .arg(rbwposcopy).arg(totfree).arg(read_return)
1091  .arg(sr_elapsed)
1092  .arg(QString("(%1Mbps)").arg((double)bps / 1000000.0))
1093  .arg(readtimeavg));
1094  UpdateStorageRate(bps);
1095 
1096  if (read_return >= 0)
1097  {
1098  poslock.lockForWrite();
1099  rbwlock.lockForWrite();
1100 
      // Only account for the data if rbwpos did not change while
      // the locks were released for the read.
1101  if (rbwposcopy == rbwpos)
1102  {
1103  internalreadpos += read_return;
1104  rbwpos = (rbwpos + read_return) % bufferSize;
1105  LOG(VB_FILE, LOG_DEBUG,
1106  LOC + QString("rbwpos += %1K requested %2K in read")
1107  .arg(read_return/1024,3).arg(totfree/1024,3));
1108  }
1109  numfailures = 0;
1110 
1111  rbwlock.unlock();
1112  poslock.unlock();
1113 
1114  LOG(VB_FILE, LOG_DEBUG, LOC +
1115  QString("total read so far: %1 bytes")
1116  .arg(internalreadpos));
1117  }
1118  }
1119  else
1120  {
1121  LOG(VB_FILE, LOG_DEBUG, LOC +
1122  QString("We are not reading anything "
1123  "(totfree: %1 commserror:%2 ateof:%3 "
1124  "setswitchtonext:%4")
1125  .arg(totfree).arg(commserror).arg(ateof).arg(setswitchtonext));
1126  }
1127 
1128  int used = bufferSize - ReadBufFree();
1129 
1130  bool reads_were_allowed = readsallowed;
1131 
      // A short or partial read must not feed the block-size adaptation.
1132  ignore_for_read_timing =
1133  ((totfree < readblocksize) || (read_return < totfree)) ? true : false;
1134 
      // Enter the write-locked section only when a shared flag needs to
      // change or an empty read has to be handled.
1135  if ((0 == read_return) || (numfailures > 5) ||
1136  (readsallowed != (used >= 1 || ateof ||
1137  setswitchtonext || commserror)) ||
1138  (readsdesired != (used >= fill_min || ateof ||
      // NOTE(review): listing line 1139 (the end of this condition) is missing.
1140  {
1141  // If readpos changes while the lock is released
1142  // we should not handle the 0 read_return now.
1143  long long old_readpos = readpos;
1144 
1145  rwlock.unlock();
1146  rwlock.lockForWrite();
1147 
1148  commserror |= (numfailures > 5);
1149 
1150  readsallowed = used >= 1 || ateof || setswitchtonext || commserror;
1151  readsdesired =
1152  used >= fill_min || ateof || setswitchtonext || commserror;
1153 
1154  if (0 == read_return && old_readpos == readpos)
1155  {
1156  eofreads++;
1157  if (eofreads >= 3 && readblocksize >= KB512)
1158  {
1159  // not reading anything
1160  readblocksize = CHUNK;
      // NOTE(review): listing line 1161 is missing here.
1162  }
1163 
1164  if (livetvchain)
1165  {
1166  if (!setswitchtonext && !ignoreliveeof &&
1167  livetvchain->HasNext())
1168  {
1169  // we receive new livetv chain element event
1170  // before we receive file closed for writing event
1171  // so don't need to test if file is closed for writing
1172  livetvchain->SwitchToNext(true);
1173  setswitchtonext = true;
1174  }
      // NOTE(review): listing line 1175 (the header of the next branch) is missing.
1176  {
1177  LOG(VB_FILE, LOG_DEBUG, LOC +
1178  QString("EOF encountered, but %1 still being written to")
1179  .arg(filename));
1180  // We reached EOF, but file still open for writing and
1181  // no next program in livetvchain
1182  // wait a little bit (60ms same wait as typical safe_read)
1183  generalWait.wait(&rwlock, 60);
1184  }
1185  }
      // NOTE(review): listing line 1186 (the header of the next branch) is missing.
1187  {
1188  LOG(VB_FILE, LOG_DEBUG, LOC +
1189  QString("EOF encountered, but %1 still being written to")
1190  .arg(filename));
1191  // We reached EOF, but file still open for writing,
1192  // typically active in-progress recording
1193  // wait a little bit (60ms same wait as typical safe_read)
1194  generalWait.wait(&rwlock, 60);
1195  beingwritten = true;
1196  }
1197  else
1198  {
1199  if (waitforwrite && !beingwritten)
1200  {
1201  LOG(VB_FILE, LOG_DEBUG, LOC +
1202  "Waiting for file to grow large enough to process.");
1203  generalWait.wait(&rwlock, 300);
1204  }
1205  else
1206  {
1207  LOG(VB_FILE, LOG_DEBUG,
1208  LOC + "setting ateof (read_return == 0)");
1209  ateof = true;
1210  }
1211  }
1212  }
1213 
      // Drop back to a read lock and refresh the fill level.
1214  rwlock.unlock();
1215  rwlock.lockForRead();
1216  used = bufferSize - ReadBufFree();
1217  }
1218  else
1219  {
1220  eofreads = 0;
1221  }
1222 
1223  LOG(VB_FILE, LOG_DEBUG, LOC + "@ end of read ahead loop");
1224 
1225  if (!readsallowed || commserror || ateof || setswitchtonext ||
1226  (wanttoread <= used && wanttoread > 0))
1227  {
1228  // To give other threads a good chance to handle these
1229  // conditions, even if they are only requesting a read lock
1230  // like us, yield (currently implemented with short usleep).
1231  generalWait.wakeAll();
1232  rwlock.unlock();
1233  std::this_thread::sleep_for(std::chrono::milliseconds(5));
1234  rwlock.lockForRead();
1235  }
1236  else
1237  {
1238  // yield if we have nothing to do...
1239  if (!request_pause && reads_were_allowed &&
1240  (used >= fill_threshold || ateof || setswitchtonext))
1241  {
1242  generalWait.wait(&rwlock, 50);
1243  }
1244  else if (readsallowed)
1245  { // if reads are allowed release the lock and yield so the
1246  // reader gets a chance to read before the buffer is full.
1247  generalWait.wakeAll();
1248  rwlock.unlock();
1249  std::this_thread::sleep_for(std::chrono::milliseconds(5));
1250  rwlock.lockForRead();
1251  }
1252  }
1253  }
1254 
1255  rwlock.unlock();
1256 
      // Tear-down: take every lock for writing, in the documented order,
      // before freeing the buffer and clearing the state flags.
1257  rwlock.lockForWrite();
1258  rbrlock.lockForWrite();
1259  rbwlock.lockForWrite();
1260 
1261  delete [] readAheadBuffer;
1262 
1263  readAheadBuffer = nullptr;
1264  rbrpos = 0;
1265  rbwpos = 0;
1266  reallyrunning = false;
1267  readsallowed = false;
1268  readsdesired = false;
1269 
1270  rbwlock.unlock();
1271  rbrlock.unlock();
1272  rwlock.unlock();
1273 
1274  LOG(VB_FILE, LOG_INFO, LOC + QString("Exiting readahead thread"));
1275 
1276  RunEpilog();
1277 }
1278 
1280 {
1281  rwlock.lockForWrite();
1282  poslock.lockForRead();
1284  long long ra = readAdjust;
1285  poslock.unlock();
1286  rwlock.unlock();
1287  return ra;
1288 }
1289 
1290 int RingBuffer::Peek(void *buf, int count)
1291 {
1292  int ret = ReadPriv(buf, count, true);
1293  if (ret != count)
1294  {
1295  LOG(VB_GENERAL, LOG_WARNING, LOC +
1296  QString("Peek() requested %1 bytes, but only returning %2")
1297  .arg(count).arg(ret));
1298  }
1299  return ret;
1300 }
1301 
1303 {
1304  // Wait up to 30000 ms for reads allowed (or readsdesired if post seek/open)
1305  bool &check = (recentseek || readInternalMode) ? readsdesired : readsallowed;
1306  recentseek = false;
1307  int timeout_ms = 30000;
1308  int count = 0;
1309  MythTimer t;
1310  t.start();
1311 
1312  while ((t.elapsed() < timeout_ms) && !check && !stopreads &&
1314  {
1315  generalWait.wait(&rwlock, clamp(timeout_ms - t.elapsed(), 10, 100));
1316  if (!check && t.elapsed() > 1000 && (count % 100) == 0)
1317  {
1318  LOG(VB_GENERAL, LOG_WARNING, LOC +
1319  "Taking too long to be allowed to read..");
1320  }
1321  count++;
1322  }
1323  if (t.elapsed() >= timeout_ms)
1324  {
1325  LOG(VB_GENERAL, LOG_ERR, LOC +
1326  QString("Took more than %1 seconds to be allowed to read, aborting.")
1327  .arg(timeout_ms / 1000));
1328  return false;
1329  }
1330  return check;
1331 }
1332 
1334 {
1335  int avail = ReadBufAvail();
1336  if (avail >= count)
1337  return avail;
1338 
1339  count = (ateof && avail < count) ? avail : count;
1340 
1341  if (livetvchain && setswitchtonext && avail < count)
1342  {
1343  return avail;
1344  }
1345 
1346  // Make sure that if the read ahead thread is sleeping and
1347  // it should be reading that we start reading right away.
1348  if ((avail < count) && !stopreads &&
1350  {
1351  generalWait.wakeAll();
1352  }
1353 
1354  MythTimer t;
1355  t.start();
1356  while ((avail < count) && !stopreads &&
1358  {
1359  wanttoread = count;
1360  generalWait.wait(&rwlock, clamp(timeout - t.elapsed(), 10, 250));
1361  avail = ReadBufAvail();
1362  if (ateof)
1363  break;
1364  if (low_buffers && avail >= fill_min)
1365  break;
1366  if (t.elapsed() > timeout)
1367  break;
1368  }
1369 
1370  wanttoread = 0;
1371 
1372  return avail;
1373 }
1374 
1375 int RingBuffer::ReadDirect(void *buf, int count, bool peek)
1376 {
1377  long long old_pos = 0;
1378  if (peek)
1379  {
1380  poslock.lockForRead();
1381  old_pos = (ignorereadpos >= 0) ? ignorereadpos : readpos;
1382  poslock.unlock();
1383  }
1384 
1385  MythTimer timer;
1386  timer.start();
1387  int ret = safe_read(buf, count);
1388  int elapsed = timer.elapsed();
1389  uint64_t bps = !elapsed ? 1000000001 :
1390  (uint64_t)(((float)ret * 8000.0f) / (float)elapsed);
1391  UpdateStorageRate(bps);
1392 
1393  poslock.lockForWrite();
1394  if (ignorereadpos >= 0 && ret > 0)
1395  {
1396  if (peek)
1397  {
1398  // seek should always succeed since we were at this position
1399  long long cur_pos = -1;
1400  if (remotefile)
1401  cur_pos = remotefile->Seek(old_pos, SEEK_SET);
1402  else if (fd2 >= 0)
1403  cur_pos = lseek64(fd2, old_pos, SEEK_SET);
1404  if (cur_pos < 0)
1405  {
1406  LOG(VB_FILE, LOG_ERR, LOC +
1407  "Seek failed repositioning to previous position");
1408  }
1409  }
1410  else
1411  {
1412  ignorereadpos += ret;
1413  }
1414  poslock.unlock();
1415  return ret;
1416  }
1417  poslock.unlock();
1418 
1419  if (peek && ret > 0)
1420  {
1421  if ((IsDVD() || IsBD()) && old_pos != 0)
1422  {
1423  LOG(VB_GENERAL, LOG_ERR, LOC +
1424  "DVD and Blu-Ray do not support arbitrary "
1425  "peeks except when read-ahead is enabled."
1426  "\n\t\t\tWill seek to beginning of video.");
1427  old_pos = 0;
1428  }
1429 
1430  long long new_pos = Seek(old_pos, SEEK_SET, true);
1431 
1432  if (new_pos != old_pos)
1433  {
1434  LOG(VB_GENERAL, LOG_ERR, LOC +
1435  QString("Peek() Failed to return from new "
1436  "position %1 to old position %2, now "
1437  "at position %3")
1438  .arg(old_pos - ret).arg(old_pos).arg(new_pos));
1439  }
1440  }
1441 
1442  return ret;
1443 }
1444 
1453 int RingBuffer::ReadPriv(void *buf, int count, bool peek)
1454 {
1455  QString loc_desc = QString("ReadPriv(..%1, %2)")
1456  .arg(count).arg(peek?"peek":"normal");
1457  LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc +
1458  QString(" @%1 -- begin").arg(rbrpos));
1459 
1460  rwlock.lockForRead();
1461  if (writemode)
1462  {
1463  LOG(VB_GENERAL, LOG_ERR, LOC + loc_desc +
1464  ": Attempt to read from a write only file");
1465  errno = EBADF;
1466  rwlock.unlock();
1467  return -1;
1468  }
1469 
1471  {
1472  rwlock.unlock();
1473  rwlock.lockForWrite();
1474  // we need a write lock so the read-ahead thread
1475  // can't start mucking with the read position.
1476  // If the read ahead thread was started while we
1477  // didn't hold the lock, we proceed with a normal
1478  // read from the buffer, otherwise we read directly.
1479  if (request_pause || stopreads ||
1480  !readaheadrunning || (ignorereadpos >= 0))
1481  {
1482  int ret = ReadDirect(buf, count, peek);
1483  LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc +
1484  QString(": ReadDirect checksum %1")
1485  .arg(qChecksum((char*)buf,count)));
1486  rwlock.unlock();
1487  return ret;
1488  }
1489  rwlock.unlock();
1490  rwlock.lockForRead();
1491  }
1492 
1493  if (!WaitForReadsAllowed())
1494  {
1495  LOG(VB_FILE, LOG_NOTICE, LOC + loc_desc + ": !WaitForReadsAllowed()");
1496  rwlock.unlock();
1497  stopreads = true; // this needs to be outside the lock
1498  rwlock.lockForWrite();
1499  wanttoread = 0;
1500  rwlock.unlock();
1501  return 0;
1502  }
1503 
1504  int avail = ReadBufAvail();
1506 
1507  // Wait up to 10000 ms for any data
1508  int timeout_ms = 10000;
1509  while (!readInternalMode && !ateof &&
1510  (t.elapsed() < timeout_ms) && readaheadrunning &&
1512  {
1513  avail = WaitForAvail(count, min(timeout_ms - t.elapsed(), 100));
1514  if (livetvchain && setswitchtonext && avail < count)
1515  {
1516  LOG(VB_GENERAL, LOG_INFO, LOC +
1517  "Checking to see if there's a new livetv program to switch to..");
1519  break;
1520  }
1521  if (avail > 0)
1522  break;
1523  }
1524  if (t.elapsed() > 6000)
1525  {
1526  LOG(VB_GENERAL, LOG_WARNING, LOC + loc_desc +
1527  QString(" -- waited %1 ms for avail(%2) > count(%3)")
1528  .arg(t.elapsed()).arg(avail).arg(count));
1529  }
1530 
1531  if (readInternalMode)
1532  {
1533  LOG(VB_FILE, LOG_DEBUG, LOC +
1534  QString("ReadPriv: %1 bytes available, %2 left")
1535  .arg(avail).arg(avail-readOffset));
1536  }
1537  count = min(avail - readOffset, count);
1538 
1539  if ((count <= 0) && (ateof || readInternalMode))
1540  {
1541  // If we're at the end of file return 0 bytes
1542  rwlock.unlock();
1543  return count;
1544  }
1545  else if (count <= 0)
1546  {
1547  // If we're not at the end of file but have no data
1548  // at this point time out and shutdown read ahead.
1549  LOG(VB_GENERAL, LOG_ERR, LOC + loc_desc +
1550  QString(" -- timed out waiting for data (%1 ms)")
1551  .arg(t.elapsed()));
1552 
1553  rwlock.unlock();
1554  stopreads = true; // this needs to be outside the lock
1555  rwlock.lockForWrite();
1556  ateof = true;
1557  wanttoread = 0;
1558  generalWait.wakeAll();
1559  rwlock.unlock();
1560  return count;
1561  }
1562 
1563  if (peek || readInternalMode)
1564  rbrlock.lockForRead();
1565  else
1566  rbrlock.lockForWrite();
1567 
1568  LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + " -- copying data");
1569 
1570  int rpos;
1571  if (rbrpos + readOffset > (int) bufferSize)
1572  {
1573  rpos = (rbrpos + readOffset) - bufferSize;
1574  }
1575  else
1576  {
1577  rpos = rbrpos + readOffset;
1578  }
1579  if (rpos + count > (int) bufferSize)
1580  {
1581  int firstsize = bufferSize - rpos;
1582  int secondsize = count - firstsize;
1583 
1584  memcpy(buf, readAheadBuffer + rpos, firstsize);
1585  memcpy((char *)buf + firstsize, readAheadBuffer, secondsize);
1586  }
1587  else
1588  {
1589  memcpy(buf, readAheadBuffer + rpos, count);
1590  }
1591  LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + QString(" -- checksum %1")
1592  .arg(qChecksum((char*)buf,count)));
1593 
1594  if (!peek)
1595  {
1596  if (readInternalMode)
1597  {
1598  readOffset += count;
1599  }
1600  else
1601  {
1602  rbrpos = (rbrpos + count) % bufferSize;
1603  generalWait.wakeAll();
1604  }
1605  }
1606  rbrlock.unlock();
1607  rwlock.unlock();
1608 
1609  return count;
1610 }
1611 
1620 int RingBuffer::Read(void *buf, int count)
1621 {
1622  int ret = ReadPriv(buf, count, false);
1623  if (ret > 0)
1624  {
1625  poslock.lockForWrite();
1626  readpos += ret;
1627  poslock.unlock();
1628  UpdateDecoderRate(ret);
1629  }
1630 
1631  return ret;
1632 }
1633 
1634 QString RingBuffer::BitrateToString(uint64_t rate, bool hz)
1635 {
1636  QString msg;
1637  float bitrate;
1638  int range = 0;
1639  if (rate < 1)
1640  {
1641  return "-";
1642  }
1643  else if (rate > 1000000000)
1644  {
1645  return QObject::tr(">1Gbps");
1646  }
1647  else if (rate >= 1000000)
1648  {
1649  msg = hz ? QObject::tr("%1MHz") : QObject::tr("%1Mbps");
1650  bitrate = (float)rate / (1000000.0f);
1651  range = hz ? 3 : 1;
1652  }
1653  else if (rate >= 1000)
1654  {
1655  msg = hz ? QObject::tr("%1kHz") : QObject::tr("%1kbps");
1656  bitrate = (float)rate / 1000.0f;
1657  range = hz ? 1 : 0;
1658  }
1659  else
1660  {
1661  msg = hz ? QObject::tr("%1Hz") : QObject::tr("%1bps");
1662  bitrate = (float)rate;
1663  }
1664  return msg.arg(bitrate, 0, 'f', range);
1665 }
1666 
1668 {
1670 }
1671 
1673 {
1675 }
1676 
1678 {
1679  if (type == kRingBuffer_DVD || type == kRingBuffer_BD)
1680  return "N/A";
1681 
1682  int avail = (rbwpos >= rbrpos) ? rbwpos - rbrpos : bufferSize - rbrpos + rbwpos;
1683  return QString("%1%").arg(lroundf((float)avail / (float)bufferSize * 100.0f));
1684 }
1685 
1686 uint64_t RingBuffer::UpdateDecoderRate(uint64_t latest)
1687 {
1688  if (!bitrateMonitorEnabled)
1689  return 0;
1690 
1691  // TODO use QDateTime once we've moved to Qt 4.7
1692  static QTime midnight = QTime(0, 0, 0);
1693  QTime now = QTime::currentTime();
1694  qint64 age = midnight.msecsTo(now);
1695  qint64 oldest = age - 1000;
1696 
1697  decoderReadLock.lock();
1698  if (latest)
1699  decoderReads.insert(age, latest);
1700 
1701  uint64_t total = 0;
1702  QMutableMapIterator<qint64,uint64_t> it(decoderReads);
1703  while (it.hasNext())
1704  {
1705  it.next();
1706  if (it.key() < oldest || it.key() > age)
1707  it.remove();
1708  else
1709  total += it.value();
1710  }
1711 
1712  uint64_t average = (uint64_t)((double)total * 8.0);
1713  decoderReadLock.unlock();
1714 
1715  LOG(VB_FILE, LOG_INFO, LOC + QString("Decoder read speed: %1 %2")
1716  .arg(average).arg(decoderReads.size()));
1717  return average;
1718 }
1719 
1720 uint64_t RingBuffer::UpdateStorageRate(uint64_t latest)
1721 {
1722  if (!bitrateMonitorEnabled)
1723  return 0;
1724 
1725  // TODO use QDateTime once we've moved to Qt 4.7
1726  static QTime midnight = QTime(0, 0, 0);
1727  QTime now = QTime::currentTime();
1728  qint64 age = midnight.msecsTo(now);
1729  qint64 oldest = age - 1000;
1730 
1731  storageReadLock.lock();
1732  if (latest)
1733  storageReads.insert(age, latest);
1734 
1735  uint64_t total = 0;
1736  QMutableMapIterator<qint64,uint64_t> it(storageReads);
1737  while (it.hasNext())
1738  {
1739  it.next();
1740  if (it.key() < oldest || it.key() > age)
1741  it.remove();
1742  else
1743  total += it.value();
1744  }
1745 
1746  int size = storageReads.size();
1747  storageReadLock.unlock();
1748 
1749  uint64_t average = size ? (uint64_t)(((double)total) / (double)size) : 0;
1750 
1751  LOG(VB_FILE, LOG_INFO, LOC + QString("Average storage read speed: %1 %2")
1752  .arg(average).arg(storageReads.size()));
1753  return average;
1754 }
1755 
1760 int RingBuffer::Write(const void *buf, uint count)
1761 {
1762  rwlock.lockForRead();
1763 
1764  if (!writemode)
1765  {
1766  LOG(VB_GENERAL, LOG_ERR, LOC + "Tried to write to a read only file.");
1767  rwlock.unlock();
1768  return -1;
1769  }
1770 
1771  if (!tfw && !remotefile)
1772  {
1773  rwlock.unlock();
1774  return -1;
1775  }
1776 
1777  int ret = -1;
1778  if (tfw)
1779  ret = tfw->Write(buf, count);
1780  else
1781  ret = remotefile->Write(buf, count);
1782 
1783  if (ret > 0)
1784  {
1785  poslock.lockForWrite();
1786  writepos += ret;
1787  poslock.unlock();
1788  }
1789 
1790  rwlock.unlock();
1791 
1792  return ret;
1793 }
1794 
1799 {
1800  rwlock.lockForRead();
1801  if (tfw)
1802  tfw->Sync();
1803  rwlock.unlock();
1804 }
1805 
1808 long long RingBuffer::WriterSeek(long long pos, int whence, bool has_lock)
1809 {
1810  long long ret = -1;
1811 
1812  if (!has_lock)
1813  rwlock.lockForRead();
1814 
1815  poslock.lockForWrite();
1816 
1817  if (tfw)
1818  {
1819  ret = tfw->Seek(pos, whence);
1820  writepos = ret;
1821  }
1822 
1823  poslock.unlock();
1824 
1825  if (!has_lock)
1826  rwlock.unlock();
1827 
1828  return ret;
1829 }
1830 
1835 {
1836  rwlock.lockForRead();
1837  if (tfw)
1838  tfw->Flush();
1839  rwlock.unlock();
1840 }
1841 
1846 {
1847  rwlock.lockForRead();
1848  if (tfw)
1849  tfw->SetWriteBufferMinWriteSize(newMinSize);
1850  rwlock.unlock();
1851 }
1852 
1857 {
1858  QReadLocker lock(&rwlock);
1859 
1860  if (tfw)
1861  return tfw->SetBlocking(block);
1862  return false;
1863 }
1864 
1880 void RingBuffer::SetOldFile(bool is_old)
1881 {
1882  LOG(VB_FILE, LOG_INFO, LOC + QString("SetOldFile(%1)").arg(is_old));
1883  rwlock.lockForWrite();
1884  oldfile = is_old;
1885  rwlock.unlock();
1886 }
1887 
1889 QString RingBuffer::GetFilename(void) const
1890 {
1891  rwlock.lockForRead();
1892  QString tmp = filename;
1893  rwlock.unlock();
1894  return tmp;
1895 }
1896 
1898 {
1899  rwlock.lockForRead();
1900  QString tmp = subtitlefilename;
1901  rwlock.unlock();
1902  return tmp;
1903 }
1904 
1905 QString RingBuffer::GetLastError(void) const
1906 {
1907  rwlock.lockForRead();
1908  QString tmp = lastError;
1909  rwlock.unlock();
1910  return tmp;
1911 }
1912 
1916 long long RingBuffer::GetWritePosition(void) const
1917 {
1918  poslock.lockForRead();
1919  long long ret = writepos;
1920  poslock.unlock();
1921  return ret;
1922 }
1923 
1928 bool RingBuffer::LiveMode(void) const
1929 {
1930  rwlock.lockForRead();
1931  bool ret = (livetvchain);
1932  rwlock.unlock();
1933  return ret;
1934 }
1935 
1941 {
1942  rwlock.lockForWrite();
1943  livetvchain = chain;
1944  rwlock.unlock();
1945 }
1946 
1948 void RingBuffer::IgnoreLiveEOF(bool ignore)
1949 {
1950  rwlock.lockForWrite();
1951  ignoreliveeof = ignore;
1952  rwlock.unlock();
1953 }
1954 
1955 const DVDRingBuffer *RingBuffer::DVD(void) const
1956 {
1957  return dynamic_cast<const DVDRingBuffer*>(this);
1958 }
1959 
1960 const BDRingBuffer *RingBuffer::BD(void) const
1961 {
1962  return dynamic_cast<const BDRingBuffer*>(this);
1963 }
1964 
1966 {
1967  return dynamic_cast<DVDRingBuffer*>(this);
1968 }
1969 
1971 {
1972  return dynamic_cast<BDRingBuffer*>(this);
1973 }
1974 
1976 {
1977  QMutexLocker lock(avcodeclock);
1978 
1980  {
1981  avformat_network_init();
1983  }
1984 }
1985 
1986 /* vim: set expandtab tabstop=4 shiftwidth=4: */
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:216
static const int kDefaultOpenTimeout
void start(QThread::Priority=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:295
def write(text, progress=True)
Definition: mythburn.py:279
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:46
void Pause(void)
Pauses the read-ahead thread.
Definition: ringbuffer.cpp:768
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:13
const DVDRingBuffer * DVD(void) const
int GetReadBufAvail() const
Returns number of bytes available for reading from buffer.
Definition: ringbuffer.cpp:525
static QString BitrateToString(uint64_t rate, bool hz=false)
VERBOSE_PREAMBLE Most true
Definition: verbosedefs.h:91
bool LiveMode(void) const
Returns true if this RingBuffer has been assigned a LiveTVChain.
void WriterFlush(void)
Calls ThreadedFileWriter::Flush(void)
QString GetLastError(void) const
#define BUFFER_FACTOR_BITRATE
Definition: ringbuffer.cpp:42
static bool TestForHTTPLiveStreaming(const QString &filename)
bool WriterSetBlocking(bool lock=true)
Calls ThreadedFileWriter::SetBlocking(bool)
bool wait(unsigned long time=ULONG_MAX)
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:312
uint64_t UpdateStorageRate(uint64_t latest=0)
void SetLiveMode(LiveTVChain *chain)
Assigns a LiveTVChain to this RingBuffer.
void StartReads(void)
????
Definition: ringbuffer.cpp:757
uint64_t UpdateDecoderRate(uint64_t latest=0)
QString GetFilename(void) const
Returns name of file used by this RingBuffer.
volatile bool recentseek
#define LOC
Definition: ringbuffer.cpp:50
unsigned int uint
Definition: compat.h:140
MythCoreContext * gCoreContext
This global variable contains the MythCoreContext instance for the app.
static RingBuffer * Create(const QString &xfilename, bool write, bool usereadahead=true, int timeout_ms=kDefaultOpenTimeout, bool stream_only=false)
Creates a RingBuffer instance.
Definition: ringbuffer.cpp:112
virtual long long SeekInternal(long long pos, int whence)=0
bool HasNext(void) const
QReadWriteLock rwlock
long long Seek(long long pos, int whence)
Seek to a position within stream; May be unsafe.
QString GetDecoderRate(void)
QString GetStorageRate(void)
volatile bool stopreads
static guint32 * tmp
Definition: goom_core.c:35
void CalcReadAheadThresh(void)
Calculates fill_min, fill_threshold, and readblocksize from the estimated effective bitrate of the st...
Definition: ringbuffer.cpp:402
long long SetAdjustFilesize(void)
void SetBufferSizeFactors(bool estbitrate, bool matroska)
Tells RingBuffer that the raw bitrate may be inaccurate and the underlying container is matroska,...
Definition: ringbuffer.cpp:386
QReadWriteLock rbrlock
int ReadPriv(void *buf, int count, bool peek)
When possible reads from the read-ahead buffer, otherwise reads directly from the device.
bool IsDVD(void) const
int Write(const void *data, uint count)
Writes data to the end of the write buffer.
long long WriterSeek(long long pos, int whence, bool has_lock=false)
Calls ThreadedFileWriter::Seek(long long,int).
void ReloadAll(const QStringList &data=QStringList())
bool PauseAndWait(void)
Definition: ringbuffer.cpp:824
VERBOSE_PREAMBLE false
Definition: verbosedefs.h:85
static bool gAVformat_net_initialised
#define BUFFER_SIZE_MINIMUM
Definition: ringbuffer.cpp:40
QReadWriteLock poslock
QString GetAvailableBuffer(void)
#define BUFFER_FACTOR_NETWORK
Definition: ringbuffer.cpp:41
static void AVFormatInitNetwork(void)
void Sync(void)
Calls ThreadedFileWriter::Sync(void)
void KillReadAheadThread(void)
Stops the read-ahead thread, and waits for it to stop.
Definition: ringbuffer.cpp:729
int ReadDirect(void *buf, int count, bool peek)
void IgnoreLiveEOF(bool ignore)
Tells RingBuffer whether to ignore the end-of-file.
unsigned char t
Definition: ParseText.cpp:340
virtual ~RingBuffer()=0
Deletes.
Definition: ringbuffer.cpp:281
bool IsOpen(void) const override
Returns true if open for either reading or writing.
Definition: dvdstream.cpp:159
void Flush(void)
Allow DiskLoop() to flush buffer completely ignoring low watermark.
bool isRunning(void) const
Definition: mthread.cpp:275
void SetOldFile(bool is_old)
Tell RingBuffer if this is an old file or not.
static const int kLiveTVOpenTimeout
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
Definition: ringbuffer.cpp:915
virtual bool IsOpen(void) const =0
Returns true if open for either reading or writing.
void SetWriteBufferMinWriteSize(uint newMinSize=kMinWriteSize)
Sets the minimum number of bytes to write to disk in a single write.
int ReadBufFree(void) const
Returns number of bytes available for reading into buffer.
Definition: ringbuffer.cpp:514
bool IsBD(void) const
int Peek(void *buf, int count)
float clamp(float val, float minimum, float maximum)
Definition: mythmiscutil.h:41
#define BUFFER_FACTOR_MATROSKA
Definition: ringbuffer.cpp:43
virtual int safe_read(void *data, uint sz)=0
void Start(void)
Starts the read-ahead thread.
Definition: ringbuffer.cpp:688
int elapsed(void) const
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:90
void WaitForPause(void)
Waits for Pause(void) to take effect.
Definition: ringbuffer.cpp:805
void StopReads(void)
????
Definition: ringbuffer.cpp:746
static bool Exists(const QString &url, struct stat *fileinfo)
Definition: remotefile.cpp:468
long long GetWritePosition(void) const
Returns how far into a ThreadedFileWriter file we have written.
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
static QStringList subExtNoCheck
void Unpause(void)
Unpauses the read-ahead thread.
Definition: ringbuffer.cpp:782
LiveTVChain * livetvchain
#define assert(x)
QMutex * avcodeclock
This global variable is used to makes certain calls to avlib threadsafe.
bool WaitForReadsAllowed(void)
ThreadedFileWriter * tfw
QString GetSubtitleFilename(void) const
static QMutex subExtLock
#define CHUNK
Definition: ringbuffer.cpp:48
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:203
QWaitCondition generalWait
Condition to signal that the read ahead thread is running.
QMap< qint64, uint64_t > storageReads
long long GetRealFileSize(void) const
Returns the size of the file we are reading/writing, or -1 if the query fails.
Definition: ringbuffer.cpp:532
void Reset(bool full=false, bool toAdjust=false, bool resetInternal=false)
Resets the read-ahead thread and our position in the file.
Definition: ringbuffer.cpp:300
int Read(void *buf, int count)
This is the public method for reading from a file, it calls the appropriate read method if the file i...
void Sync(void)
Flush data written to the file descriptor to disk.
bool SetReadInternalMode(bool mode)
Definition: ringbuffer.cpp:593
bool IsNearEnd(double fps, uint vvf) const
Definition: ringbuffer.cpp:472
static ImageType inspectImage(const QString &path)
Definition: mythcdrom.cpp:179
int Write(const void *buf, uint count)
Writes buffer to ThreadedFileWriter::Write(const void*,uint)
void SetWriteBufferMinWriteSize(int newMinSize)
Calls ThreadedFileWriter::SetWriteBufferMinWriteSize(int)
void ResetReadAhead(long long newinternal)
Restart the read-ahead thread at the 'newinternal' position.
Definition: ringbuffer.cpp:645
Implements a file/stream reader/writer.
long long Seek(long long pos, int whence, long long curpos=-1)
Definition: remotefile.cpp:766
QReadWriteLock rbwlock
bool IsRegisteredFileForWrite(const QString &file)
static QStringList subExt
void SwitchToNext(bool up)
Sets the recording to switch to.
void UpdateRawBitrate(uint raw_bitrate)
Set the raw bit rate, to allow RingBuffer adjust effective bitrate.
Definition: ringbuffer.cpp:342
QMap< qint64, uint64_t > decoderReads
long long Seek(long long pos, int whence, bool has_lock=false)
Seeks to a particular position in the file.
Definition: ringbuffer.cpp:545
const BDRingBuffer * BD(void) const
int Write(const void *data, int size)
Definition: remotefile.cpp:845
void UpdatePlaySpeed(float play_speed)
Set the play speed, to allow RingBuffer adjust effective bitrate.
Definition: ringbuffer.cpp:373
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47
Keeps track of recordings in a current LiveTV instance.
Definition: livetvchain.h:31
virtual long long GetRealFileSizeInternal(void) const
int WaitForAvail(int count, int timeout)
bool SetBlocking(bool block=true)
Set write blocking mode While in blocking mode, ThreadedFileWriter::Write will wait for buffers to be...
Stream content from a DVD image file.
Definition: dvdstream.h:20
void CreateReadAheadBuffer(void)
Definition: ringbuffer.cpp:867
bool isPaused(void) const
Returns false iff read-ahead is not running and read-ahead is not paused.
Definition: ringbuffer.cpp:794
RingBuffer(RingBufferType rbtype)
Definition: ringbuffer.cpp:220
int ReadBufAvail(void) const
Returns number of bytes available for reading from buffer.
Definition: ringbuffer.cpp:624