MythTV  master
fifowriter.cpp
Go to the documentation of this file.
1 #include <cstdio>
2 #include <cstdlib>
3 #include <unistd.h>
4 #include <fcntl.h>
5 #include <cassert>
6 #include <cerrno>
7 #include <sys/time.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <ctime>
11 #include <cmath>
12 
13 #include "fifowriter.h"
14 #include "compat.h"
15 #include "mythlogging.h"
16 
17 #include "mythconfig.h"
18 #if CONFIG_DARWIN
19  #include <sys/aio.h> // O_SYNC
20 #endif
21 
22 #include <iostream>
23 using namespace std;
24 
25 FIFOWriter::FIFOWriter(int count, bool sync) :
26  fifo_buf(nullptr),
27  fb_inptr(nullptr),
28  fb_outptr(nullptr),
29  fifothrds(nullptr),
30  fifo_lock(nullptr),
31  full_cond(nullptr),
32  empty_cond(nullptr),
33  filename(nullptr),
34  fbdesc(nullptr),
35  maxblksize(nullptr),
36  killwr(nullptr),
37  fbcount(nullptr),
38  fbmaxcount( nullptr),
39  num_fifos(count),
40  usesync(sync)
41 {
42  if (count <= 0)
43  return;
44 
45  fifo_buf = new struct fifo_buf *[count];
46  fb_inptr = new struct fifo_buf *[count];
47  fb_outptr = new struct fifo_buf *[count];
48  fifothrds = new FIFOThread[count];
49  fifo_lock = new QMutex[count];
50  full_cond = new QWaitCondition[count];
51  empty_cond = new QWaitCondition[count];
52  filename = new QString [count];
53  fbdesc = new QString [count];
54  maxblksize = new long[count];
55  killwr = new int[count];
56  fbcount = new int[count];
57  fbmaxcount = new int[count];
58 }
59 
61 {
62  if (num_fifos <= 0)
63  return;
64 
65  for (int i = 0; i < num_fifos; i++)
66  {
67  QMutexLocker flock(&fifo_lock[i]);
68  killwr[i] = 1;
69  empty_cond[i].wakeAll();
70  }
71 
72  for (int i = 0; i < num_fifos; i++)
73  {
74  fifothrds[i].wait();
75  }
76 
77  num_fifos = 0;
78 
79  delete [] maxblksize;
80  delete [] fifo_buf;
81  delete [] fb_inptr;
82  delete [] fb_outptr;
83  delete [] fifothrds;
84  delete [] full_cond;
85  delete [] empty_cond;
86  delete [] fifo_lock;
87  delete [] filename;
88  delete [] fbdesc;
89  delete [] killwr;
90  delete [] fbcount;
91  delete [] fbmaxcount;
92 }
93 
94 int FIFOWriter::FIFOInit(int id, QString desc, QString name, long size,
95  int num_bufs)
96 {
97  if (id < 0 || id >= num_fifos)
98  return false;
99 
100  QByteArray fname = name.toLatin1();
101  const char *aname = fname.constData();
102  if (mkfifo(aname, S_IREAD | S_IWRITE | S_IRGRP | S_IROTH) == -1)
103  {
104  LOG(VB_GENERAL, LOG_ERR, QString("Couldn't create fifo for file: '%1'")
105  .arg(name) + ENO);
106  return false;
107  }
108  LOG(VB_GENERAL, LOG_INFO, QString("Created %1 fifo: %2")
109  .arg(desc).arg(name));
110  maxblksize[id] = size;
111  filename[id] = name;
112  fbdesc[id] = desc;
113  killwr[id] = 0;
114  fbcount[id] = (usesync) ? 2 : num_bufs;
115  fbmaxcount[id] = 512;
116  fifo_buf[id] = new struct fifo_buf;
117  struct fifo_buf *fifoptr = fifo_buf[id];
118  for (int i = 0; i < fbcount[id]; i++)
119  {
120  fifoptr->data = new unsigned char[maxblksize[id]];
121  if (i == fbcount[id] - 1)
122  fifoptr->next = fifo_buf[id];
123  else
124  fifoptr->next = new struct fifo_buf;
125  fifoptr = fifoptr->next;
126  }
127  fb_inptr[id] = fifo_buf[id];
128  fb_outptr[id] = fifo_buf[id];
129 
130  fifothrds[id].SetParent(this);
131  fifothrds[id].SetId(id);
132  fifothrds[id].start();
133 
134  while (0 == killwr[id] && !fifothrds[id].isRunning())
135  usleep(1000);
136 
137  return fifothrds[id].isRunning();
138 }
139 
140 void FIFOThread::run(void)
141 {
142  RunProlog();
143  if (m_parent && m_id != -1)
145  RunEpilog();
146 }
147 
149 {
150  int fd = -1;
151 
152  QMutexLocker flock(&fifo_lock[id]);
153  while (true)
154  {
155  if ((fb_inptr[id] == fb_outptr[id]) && (0 == killwr[id]))
156  empty_cond[id].wait(flock.mutex());
157  flock.unlock();
158  if (killwr[id])
159  break;
160  if (fd < 0)
161  {
162  QByteArray fname = filename[id].toLatin1();
163  fd = open(fname.constData(), O_WRONLY| O_SYNC);
164  }
165  if (fd >= 0)
166  {
167  int written = 0;
168  while (written < fb_outptr[id]->blksize)
169  {
170  int ret = write(fd, fb_outptr[id]->data+written,
171  fb_outptr[id]->blksize-written);
172  if (ret < 0)
173  {
174  LOG(VB_GENERAL, LOG_ERR,
175  QString("FIFOW: write failed with %1")
176  .arg(strerror(errno)));
178  break;
179  }
180  else
181  {
182  written += ret;
183  }
184  }
185  }
186  flock.relock();
187  fb_outptr[id] = fb_outptr[id]->next;
188  full_cond[id].wakeAll();
189  }
190 
191  if (fd != -1)
192  close(fd);
193 
194  unlink(filename[id].toLocal8Bit().constData());
195 
196  while (fifo_buf[id]->next != fifo_buf[id])
197  {
198  struct fifo_buf *tmpfifo = fifo_buf[id]->next->next;
199  delete [] fifo_buf[id]->next->data;
200  delete fifo_buf[id]->next;
201  fifo_buf[id]->next = tmpfifo;
202  }
203  delete [] fifo_buf[id]->data;
204  delete fifo_buf[id];
205 }
206 
207 void FIFOWriter::FIFOWrite(int id, void *buffer, long blksize)
208 {
209  QMutexLocker flock(&fifo_lock[id]);
210  while (fb_inptr[id]->next == fb_outptr[id])
211  {
212  bool blocking = false;
213  if (!usesync)
214  {
215  for(int i = 0; i < num_fifos; i++)
216  {
217  if (i == id)
218  continue;
219  if (fb_inptr[i] == fb_outptr[i])
220  blocking = true;
221  }
222  }
223 
224  if (blocking && fbcount[id] < fbmaxcount[id])
225  {
226  struct fifo_buf *tmpfifo;
227  tmpfifo = fb_inptr[id]->next;
228  fb_inptr[id]->next = new struct fifo_buf;
229  fb_inptr[id]->next->data = new unsigned char[maxblksize[id]];
230  fb_inptr[id]->next->next = tmpfifo;
231  QString msg = QString("allocating additonal buffer for : %1(%2)")
232  .arg(fbdesc[id]).arg(++fbcount[id]);
233  LOG(VB_FILE, LOG_INFO, msg);
234  }
235  else
236  {
237  full_cond[id].wait(flock.mutex(), 1000);
238  }
239  }
240  if (blksize > maxblksize[id])
241  {
242  delete [] fb_inptr[id]->data;
243  fb_inptr[id]->data = new unsigned char[blksize];
244  }
245  memcpy(fb_inptr[id]->data,buffer,blksize);
246  fb_inptr[id]->blksize = blksize;
247  fb_inptr[id] = fb_inptr[id]->next;
248  empty_cond[id].wakeAll();
249 }
250 
252 {
253  int count = 0;
254  while (count < num_fifos)
255  {
256  count = 0;
257  for (int i = 0; i < num_fifos; i++)
258  {
259  QMutexLocker flock(&fifo_lock[i]);
260  if (fb_inptr[i] == fb_outptr[i])
261  {
262  killwr[i] = 1;
263  empty_cond[i].wakeAll();
264  count++;
265  }
266  }
267  usleep(1000);
268  }
269 }
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:216
#define mkfifo(path, mode)
Definition: compat.h:220
void start(QThread::Priority=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:295
def write(text, progress=True)
Definition: mythburn.py:279
FIFOWriter * m_parent
Definition: fifowriter.h:26
struct fifo_buf * next
Definition: fifowriter.h:47
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
Definition: fifowriter.cpp:140
#define O_SYNC
Definition: compat.h:218
bool wait(unsigned long time=ULONG_MAX)
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:312
int * fbmaxcount
Definition: fifowriter.h:60
int FIFOInit(int id, QString desc, QString name, long size, int num_bufs)
Definition: fifowriter.cpp:94
struct FIFOWriter::fifo_buf ** fb_inptr
struct FIFOWriter::fifo_buf ** fifo_buf
QWaitCondition * empty_cond
Definition: fifowriter.h:55
QString * fbdesc
Definition: fifowriter.h:57
static bool isRunning(const char *program)
Returns true if a program containing the specified string is running on this machine.
#define S_IROTH
Definition: compat.h:217
#define close
Definition: compat.h:16
unsigned char * data
Definition: fifowriter.h:48
struct FIFOWriter::fifo_buf ** fb_outptr
void FIFOWriteThread(int id)
Definition: fifowriter.cpp:148
~FIFOWriter(void)
Definition: fifowriter.cpp:60
bool isRunning(void) const
Definition: mthread.cpp:275
QMutex * fifo_lock
Definition: fifowriter.h:53
long * maxblksize
Definition: fifowriter.h:59
bool usesync
Definition: fifowriter.h:62
const char * name
Definition: ParseText.cpp:339
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:99
int num_fifos
Definition: fifowriter.h:61
#define S_IRGRP
Definition: compat.h:216
PictureAttribute next(PictureAttributeSupported supported, PictureAttribute attribute)
QString * filename
Definition: fifowriter.h:57
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:203
void FIFODrain(void)
Definition: fifowriter.cpp:251
int * fbcount
Definition: fifowriter.h:60
QWaitCondition * full_cond
Definition: fifowriter.h:54
void FIFOWrite(int id, void *buf, long size)
Definition: fifowriter.cpp:207
void SetParent(FIFOWriter *parent)
Definition: fifowriter.h:23
int * killwr
Definition: fifowriter.h:60
FIFOThread * fifothrds
Definition: fifowriter.h:52
FIFOWriter(int count, bool sync)
Definition: fifowriter.cpp:25
void SetId(int id)
Definition: fifowriter.h:22