MythTV  master
threadedfilewriter.cpp
Go to the documentation of this file.
1 // C++ headers
2 #include <cerrno>
3 #include <csignal>
4 #include <cstdio>
5 #include <cstdlib>
6 #include <cstring>
7 #include <fcntl.h>
8 #include <sys/stat.h>
9 #include <sys/types.h>
10 #include <unistd.h>
11 
12 // Qt headers
13 #include <QString>
14 
15 // MythTV headers
16 #include "threadedfilewriter.h"
17 #include "mythlogging.h"
18 #include "mythcorecontext.h"
19 
20 #include "mythtimer.h"
21 #include "compat.h"
22 #include "mythdate.h"
23 
24 #define LOC QString("TFW(%1:%2): ").arg(filename).arg(fd)
25 
28 {
29  RunProlog();
30  m_parent->DiskLoop();
31  RunEpilog();
32 }
33 
36 {
37  RunProlog();
38  m_parent->SyncLoop();
39  RunEpilog();
40 }
41 
42 const uint ThreadedFileWriter::kMaxBufferSize = 8 * 1024 * 1024;
43 const uint ThreadedFileWriter::kMinWriteSize = 64 * 1024;
44 const uint ThreadedFileWriter::kMaxBlockSize = 1 * 1024 * 1024;
45 
61  int pflags, mode_t pmode) :
62  // file stuff
63  filename(fname), flags(pflags),
64  mode(pmode), fd(-1),
65  // state
66  flush(false), in_dtor(false),
67  ignore_writes(false), tfw_min_write_size(kMinWriteSize),
68  totalBufferUse(0),
69  // threads
70  writeThread(nullptr), syncThread(nullptr),
71  m_warned(false), m_blocking(false),
72  m_registered(false)
73 {
74 }
75 
82 bool ThreadedFileWriter::ReOpen(QString newFilename)
83 {
84  Flush();
85 
86  buflock.lock();
87 
88  if (fd >= 0)
89  {
90  close(fd);
91  fd = -1;
92  }
93 
94  if (m_registered)
95  {
97  }
98 
99  if (!newFilename.isEmpty())
100  filename = newFilename;
101 
102  buflock.unlock();
103 
104  return Open();
105 }
106 
112 {
113  ignore_writes = false;
114 
115  if (filename == "-")
116  fd = fileno(stdout);
117  else
118  {
119  QByteArray fname = filename.toLocal8Bit();
120  fd = open(fname.constData(), flags, mode);
121  }
122 
123  if (fd < 0)
124  {
125  LOG(VB_GENERAL, LOG_ERR, LOC +
126  QString("Opening file '%1'.").arg(filename) + ENO);
127  return false;
128  }
129 
131  m_registered = true;
132 
133  LOG(VB_FILE, LOG_INFO, LOC + "Open() successful");
134 
135 #ifdef _WIN32
136  _setmode(fd, _O_BINARY);
137 #endif
138  if (!writeThread)
139  {
140  writeThread = new TFWWriteThread(this);
141  writeThread->start();
142  }
143 
144  if (!syncThread)
145  {
146  syncThread = new TFWSyncThread(this);
147  syncThread->start();
148  }
149 
150  return true;
151 }
152 
157 {
158  Flush();
159 
160  { /* tell child threads to exit */
161  QMutexLocker locker(&buflock);
162  in_dtor = true;
163  bufferSyncWait.wakeAll();
164  bufferHasData.wakeAll();
165  }
166 
167  if (writeThread)
168  {
169  writeThread->wait();
170  delete writeThread;
171  writeThread = nullptr;
172  }
173 
174  while (!writeBuffers.empty())
175  {
176  delete writeBuffers.front();
177  writeBuffers.pop_front();
178  }
179 
180  while (!emptyBuffers.empty())
181  {
182  delete emptyBuffers.front();
183  emptyBuffers.pop_front();
184  }
185 
186  if (syncThread)
187  {
188  syncThread->wait();
189  delete syncThread;
190  syncThread = nullptr;
191  }
192 
193  if (fd >= 0)
194  {
195  close(fd);
196  fd = -1;
197  }
198 
200  m_registered = false;
201 }
202 
209 int ThreadedFileWriter::Write(const void *data, uint count)
210 {
211  if (count == 0)
212  return 0;
213 
214  QMutexLocker locker(&buflock);
215 
216  if (ignore_writes)
217  return -1;
218 
219  uint written = 0;
220  uint left = count;
221 
222  while (written < count)
223  {
224  uint towrite = (left > kMaxBlockSize) ? kMaxBlockSize : left;
225 
226  if ((totalBufferUse + towrite) > (kMaxBufferSize * (m_blocking ? 1 : 8)))
227  {
228  if (!m_blocking)
229  {
230  LOG(VB_GENERAL, LOG_ERR, LOC +
231  "Maximum buffer size exceeded."
232  "\n\t\t\tfile will be truncated, no further writing "
233  "will be done."
234  "\n\t\t\tThis generally indicates your disk performance "
235  "\n\t\t\tis insufficient to deal with the number of on-going "
236  "\n\t\t\trecordings, or you have a disk failure.");
237  ignore_writes = true;
238  return -1;
239  }
240  if (!m_warned)
241  {
242  LOG(VB_GENERAL, LOG_WARNING, LOC +
243  "Maximum buffer size exceeded."
244  "\n\t\t\tThis generally indicates your disk performance "
245  "\n\t\t\tis insufficient or you have a disk failure.");
246  m_warned = true;
247  }
248  // wait until some was written to disk, and try again
249  if (!bufferWasFreed.wait(locker.mutex(), 1000))
250  {
251  LOG(VB_GENERAL, LOG_DEBUG, LOC +
252  QString("Taking a long time waiting to write.. "
253  "buffer size %1 (needing %2, %3 to go)")
254  .arg(totalBufferUse).arg(towrite)
255  .arg(towrite-(kMaxBufferSize-totalBufferUse)));
256  }
257  continue;
258  }
259 
260  TFWBuffer *buf = nullptr;
261 
262  if (!writeBuffers.empty() &&
263  (writeBuffers.back()->data.size() + towrite) < kMinWriteSize)
264  {
265  buf = writeBuffers.back();
266  writeBuffers.pop_back();
267  }
268  else
269  {
270  if (!emptyBuffers.empty())
271  {
272  buf = emptyBuffers.front();
273  emptyBuffers.pop_front();
274  buf->data.clear();
275  }
276  else
277  {
278  buf = new TFWBuffer();
279  }
280  }
281 
282  totalBufferUse += towrite;
283 
284  const char *cdata = (const char*) data + written;
285  buf->data.insert(buf->data.end(), cdata, cdata+towrite);
286  buf->lastUsed = MythDate::current();
287 
288  writeBuffers.push_back(buf);
289 
290  if ((writeBuffers.size() > 1) || (buf->data.size() >= kMinWriteSize))
291  {
292  bufferHasData.wakeAll();
293  }
294 
295  written += towrite;
296  left -= towrite;
297  }
298 
299  LOG(VB_FILE, LOG_DEBUG, LOC + QString("Write(*, %1) total %2 cnt %3")
300  .arg(count,4).arg(totalBufferUse).arg(writeBuffers.size()));
301 
302  return count;
303 }
304 
316 long long ThreadedFileWriter::Seek(long long pos, int whence)
317 {
318  QMutexLocker locker(&buflock);
319  flush = true;
320  while (!writeBuffers.empty())
321  {
322  bufferHasData.wakeAll();
323  if (!bufferEmpty.wait(locker.mutex(), 2000))
324  {
325  LOG(VB_GENERAL, LOG_WARNING, LOC +
326  QString("Taking a long time to flush.. buffer size %1")
327  .arg(totalBufferUse));
328  }
329  }
330  flush = false;
331  return lseek(fd, pos, whence);
332 }
333 
338 {
339  QMutexLocker locker(&buflock);
340  flush = true;
341  while (!writeBuffers.empty())
342  {
343  bufferHasData.wakeAll();
344  if (!bufferEmpty.wait(locker.mutex(), 2000))
345  {
346  LOG(VB_GENERAL, LOG_WARNING, LOC +
347  QString("Taking a long time to flush.. buffer size %1")
348  .arg(totalBufferUse));
349  }
350  }
351  flush = false;
352 }
353 
375 {
376  if (fd >= 0)
377  {
378 #if defined(_POSIX_SYNCHRONIZED_IO) && _POSIX_SYNCHRONIZED_IO > 0
379  // fdatasync tries to avoid updating metadata, but will in
380  // practice always update metadata if any data is written
381  // as the file will usually have grown.
382  fdatasync(fd);
383 #else
384  fsync(fd);
385 #endif
386  }
387 }
388 
394 {
395  QMutexLocker locker(&buflock);
396  if (newMinSize > 0)
397  tfw_min_write_size = newMinSize;
398  bufferHasData.wakeAll();
399 }
400 
405 {
406  QMutexLocker locker(&buflock);
407  while (!in_dtor)
408  {
409  locker.unlock();
410 
411  Sync();
412 
413  locker.relock();
414 
416  {
417  // we aren't going to write to the disk anymore, so can de-register
419  m_registered = false;
420  }
421  bufferSyncWait.wait(&buflock, 1000);
422  }
423 }
424 
429 {
430 #ifndef _WIN32
431  // don't exit program if file gets larger than quota limit..
432  signal(SIGXFSZ, SIG_IGN);
433 #endif
434 
435  QMutexLocker locker(&buflock);
436 
437  // Even if the bytes buffered is less than the minimum write
438  // size we do want to write to the OS buffers periodically.
439  // This timer makes sure we do.
440  MythTimer minWriteTimer, lastRegisterTimer;
441  minWriteTimer.start();
442  lastRegisterTimer.start();
443 
444  uint64_t total_written = 0LL;
445 
446  while (!in_dtor)
447  {
448  if (ignore_writes)
449  {
450  while (!writeBuffers.empty())
451  {
452  delete writeBuffers.front();
453  writeBuffers.pop_front();
454  }
455  while (!emptyBuffers.empty())
456  {
457  delete emptyBuffers.front();
458  emptyBuffers.pop_front();
459  }
460  bufferEmpty.wakeAll();
461  bufferHasData.wait(locker.mutex());
462  continue;
463  }
464 
465  if (writeBuffers.empty())
466  {
467  bufferEmpty.wakeAll();
468  bufferHasData.wait(locker.mutex(), 1000);
470  continue;
471  }
472 
473  int mwte = minWriteTimer.elapsed();
474  if (!flush && (mwte < 250) && (totalBufferUse < kMinWriteSize))
475  {
476  bufferHasData.wait(locker.mutex(), 250 - mwte);
478  continue;
479  }
480 
481  if (fd == -1)
482  {
483  bufferHasData.wait(locker.mutex(), 200);
485  continue;
486  }
487 
488  TFWBuffer *buf = writeBuffers.front();
489  writeBuffers.pop_front();
490  totalBufferUse -= buf->data.size();
491  bufferWasFreed.wakeAll();
492  minWriteTimer.start();
493 
495 
496  const void *data = &(buf->data[0]);
497  uint sz = buf->data.size();
498 
499  bool write_ok = true;
500  uint tot = 0;
501  uint errcnt = 0;
502 
503  LOG(VB_FILE, LOG_DEBUG, LOC + QString("write(%1) cnt %2 total %3")
504  .arg(sz).arg(writeBuffers.size())
505  .arg(totalBufferUse));
506 
507  MythTimer writeTimer;
508  writeTimer.start();
509 
510  while ((tot < sz) && !in_dtor)
511  {
512  locker.unlock();
513 
514  int ret = write(fd, (char *)data + tot, sz - tot);
515 
516  if (ret < 0)
517  {
518  if (errno == EAGAIN)
519  {
520  LOG(VB_GENERAL, LOG_WARNING, LOC + "Got EAGAIN.");
521  }
522  else
523  {
524  errcnt++;
525  LOG(VB_GENERAL, LOG_ERR, LOC + "File I/O " +
526  QString(" errcnt: %1").arg(errcnt) + ENO);
527  }
528 
529  if ((errcnt >= 3) || (ENOSPC == errno) || (EFBIG == errno))
530  {
531  locker.relock();
532  write_ok = false;
533  break;
534  }
535  }
536  else
537  {
538  tot += ret;
539  total_written += ret;
540  LOG(VB_FILE, LOG_DEBUG, LOC +
541  QString("total written so far: %1 bytes")
542  .arg(total_written));
543  }
544 
545  locker.relock();
546 
547  if ((tot < sz) && !in_dtor)
548  bufferHasData.wait(locker.mutex(), 50);
549  }
550 
552 
553  if (lastRegisterTimer.elapsed() >= 10000)
554  {
555  gCoreContext->RegisterFileForWrite(filename, total_written);
556  m_registered = true;
557  lastRegisterTimer.restart();
558  }
559 
560  buf->lastUsed = MythDate::current();
561  emptyBuffers.push_back(buf);
562 
563  if (writeTimer.elapsed() > 1000)
564  {
565  LOG(VB_GENERAL, LOG_WARNING, LOC +
566  QString("write(%1) cnt %2 total %3 -- took a long time, %4 ms")
567  .arg(sz).arg(writeBuffers.size())
568  .arg(totalBufferUse).arg(writeTimer.elapsed()));
569  }
570 
571  if (!write_ok && ((EFBIG == errno) || (ENOSPC == errno)))
572  {
573  QString msg;
574  switch (errno)
575  {
576  case EFBIG:
577  msg =
578  "Maximum file size exceeded by '%1'"
579  "\n\t\t\t"
580  "You must either change the process ulimits, configure"
581  "\n\t\t\t"
582  "your operating system with \"Large File\" support, "
583  "or use"
584  "\n\t\t\t"
585  "a filesystem which supports 64-bit or 128-bit files."
586  "\n\t\t\t"
587  "HINT: FAT32 is a 32-bit filesystem.";
588  break;
589  case ENOSPC:
590  msg =
591  "No space left on the device for file '%1'"
592  "\n\t\t\t"
593  "file will be truncated, no further writing "
594  "will be done.";
595  break;
596  }
597 
598  LOG(VB_GENERAL, LOG_ERR, LOC + msg.arg(filename));
599  ignore_writes = true;
600  }
601  }
602 }
603 
605 {
606  QDateTime cur = MythDate::current();
607  QDateTime cur_m_60 = cur.addSecs(-60);
608 
609  QList<TFWBuffer*>::iterator it = emptyBuffers.begin();
610  while (it != emptyBuffers.end())
611  {
612  if (((*it)->lastUsed < cur_m_60) ||
613  ((*it)->data.capacity() > 3 * (*it)->data.size() &&
614  (*it)->data.capacity() > 64 * 1024))
615  {
616  delete *it;
617  it = emptyBuffers.erase(it);
618  continue;
619  }
620  ++it;
621  }
622 }
623 
632 {
633  bool old = m_blocking;
634  m_blocking = block;
635  return old;
636 }
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:216
#define fsync(FD)
Definition: compat.h:136
ThreadedFileWriter * m_parent
void start(QThread::Priority=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:295
def write(text, progress=True)
Definition: mythburn.py:279
int restart(void)
Returns milliseconds elapsed since last start() or restart() and resets the count.
Definition: mythtimer.cpp:62
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:13
ThreadedFileWriter(const QString &fname, int flags, mode_t mode)
Creates a threaded file writer.
QWaitCondition bufferSyncWait
QList< TFWBuffer * > emptyBuffers
void RegisterFileForWrite(const QString &file, uint64_t size=0LL)
friend class TFWWriteThread
QWaitCondition bufferEmpty
bool wait(unsigned long time=ULONG_MAX)
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:312
friend class TFWSyncThread
TFWSyncThread * syncThread
#define lseek
unsigned int uint
Definition: compat.h:140
MythCoreContext * gCoreContext
This global variable contains the MythCoreContext instance for the app.
long long Seek(long long pos, int whence)
Seek to a position within stream; May be unsafe.
int Write(const void *data, uint count)
Writes data to the end of the write buffer.
VERBOSE_PREAMBLE false
Definition: verbosedefs.h:85
#define close
Definition: compat.h:16
QList< TFWBuffer * > writeBuffers
TFWWriteThread * writeThread
QDateTime current(bool stripped)
Returns current Date and Time in UTC.
Definition: mythdate.cpp:10
void Flush(void)
Allow DiskLoop() to flush buffer completely ignoring low watermark.
void SetWriteBufferMinWriteSize(uint newMinSize=kMinWriteSize)
Sets the minumum number of bytes to write to disk in a single write.
void run(void) override
Runs ThreadedFileWriter::DiskLoop(void)
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:99
void UnregisterFileForWrite(const QString &file)
void DiskLoop(void)
The thread run method that actually calls writes to disk.
ThreadedFileWriter * m_parent
int elapsed(void) const
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:90
~ThreadedFileWriter()
Commits all writes and closes the file.
bool Open(void)
Opens the file we will be writing to.
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
void run(void) override
Runs ThreadedFileWriter::SyncLoop(void)
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:203
static const uint kMaxBlockSize
Maximum block size to write at a time.
static MythSystemLegacyIOHandler * writeThread
void Sync(void)
Flush data written to the file descriptor to disk.
static const uint kMinWriteSize
Minimum to write to disk in a single write, when not flushing buffer.
bool ReOpen(QString newFilename="")
Reopens the file we are writing to or opens a new file.
QWaitCondition bufferWasFreed
#define LOC
void SyncLoop(void)
The thread run method that calls Sync(void).
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47
bool SetBlocking(bool block=true)
Set write blocking mode While in blocking mode, ThreadedFileWriter::Write will wait for buffers to be...
static const uint kMaxBufferSize
QWaitCondition bufferHasData