MythTV  master
streamhandler.cpp
Go to the documentation of this file.
1 // -*- Mode: c++ -*-
2 
3 // MythTV headers
4 #include "streamhandler.h"
5 #include "threadedfilewriter.h"
6 
7 #ifndef O_LARGEFILE
8 #define O_LARGEFILE 0
9 #endif
10 
11 #define LOC QString("SH[%1](%2): ").arg(_inputid).arg(_device)
12 
13 StreamHandler::StreamHandler(const QString &device, int inputid)
14  : MThread("StreamHandler")
15  , _device(device)
16  , _inputid(inputid)
17  , _needs_buffering(false)
18  , _allow_section_reader(false)
19  , _running_desired(false)
20  , _error(false)
21  , _running(false)
22  , _using_buffering(false)
23  , _using_section_reader(false)
24  , _pid_lock(QMutex::Recursive)
25  , _open_pid_filters(0)
26  , _mpts_tfw(nullptr)
27  , _listener_lock(QMutex::Recursive)
28 {
29 }
30 
32 {
33  QMutexLocker locker(&_add_rm_lock);
34 
35  {
36  QMutexLocker locker2(&_listener_lock);
37  if (!_stream_data_list.empty())
38  {
39  LOG(VB_GENERAL, LOG_ERR, LOC +
40  "dtor & _stream_data_list not empty");
41  }
42  }
43 
44  // This should never be triggered.. just to be safe..
45  if (_running)
46  Stop();
47 }
48 
50  bool allow_section_reader,
51  bool needs_buffering,
52  QString output_file)
53 {
54  QMutexLocker locker(&_add_rm_lock);
55 
56  LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- begin")
57  .arg((uint64_t)data,0,16));
58  if (!data)
59  {
60  LOG(VB_GENERAL, LOG_ERR, LOC +
61  QString("AddListener(0x%1) -- null data")
62  .arg((uint64_t)data,0,16));
63  return;
64  }
65 
66  _listener_lock.lock();
67 
68  LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- locked")
69  .arg((uint64_t)data,0,16));
70 
71  if (_stream_data_list.empty())
72  {
73  QMutexLocker locker2(&_start_stop_lock);
74  _allow_section_reader = allow_section_reader;
75  _needs_buffering = needs_buffering;
76  }
77  else
78  {
79  QMutexLocker locker2(&_start_stop_lock);
80  _allow_section_reader &= allow_section_reader;
81  _needs_buffering |= needs_buffering;
82  }
83 
84  _stream_data_list[data] = output_file;
85 
86  _listener_lock.unlock();
87 
88  Start();
89 
90  LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- end")
91  .arg((uint64_t)data,0,16));
92 }
93 
95 {
96  QMutexLocker locker(&_add_rm_lock);
97 
98  LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- begin")
99  .arg((uint64_t)data,0,16));
100  if (!data)
101  {
102  LOG(VB_GENERAL, LOG_ERR, LOC +
103  QString("RemoveListener(0x%1) -- null data")
104  .arg((uint64_t)data,0,16));
105  return;
106  }
107 
108  _listener_lock.lock();
109 
110  LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- locked")
111  .arg((uint64_t)data,0,16));
112 
113  StreamDataList::iterator it = _stream_data_list.find(data);
114 
115  if (it != _stream_data_list.end())
116  {
117  if (!(*it).isEmpty())
119  _stream_data_list.erase(it);
120  }
121 
122  _listener_lock.unlock();
123 
124  if (_stream_data_list.empty())
125  Stop();
126 
127  LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- end")
128  .arg((uint64_t)data,0,16));
129 }
130 
132 {
133  QMutexLocker locker(&_start_stop_lock);
134 
135  if (_running)
136  {
139  {
140  LOG(VB_RECORD, LOG_INFO, LOC + "Restarting StreamHandler");
141  SetRunningDesired(false);
142  locker.unlock();
143  wait();
144  locker.relock();
145  }
146  }
147 
148  if (_running)
149  return;
150 
151  _eit_pids.clear();
152 
153  _error = false;
154  SetRunningDesired(true);
155  MThread::start();
156 
157  while (!_running && !_error && _running_desired)
159 
160  if (_error)
161  {
162  LOG(VB_GENERAL, LOG_ERR, LOC + "Start failed");
163  SetRunningDesired(false);
164  }
165 }
166 
168 {
169  LOG(VB_RECORD, LOG_DEBUG, LOC + "Stopping");
170  SetRunningDesired(false);
171  wait();
172  LOG(VB_RECORD, LOG_DEBUG, LOC + "Stopped");
173 }
174 
175 bool StreamHandler::IsRunning(void) const
176 {
177  // This used to use QMutexLocker, but that sometimes left the
178  // mutex locked on exit, so...
179  _start_stop_lock.lock();
180  bool r = _running;
181  _start_stop_lock.unlock();
182  return r;
183 }
184 
185 void StreamHandler::SetRunning(bool is_running,
186  bool is_using_buffering,
187  bool is_using_section_reader)
188 {
189  QMutexLocker locker(&_start_stop_lock);
190  _running = is_running;
191  _using_buffering = is_using_buffering;
192  _using_section_reader = is_using_section_reader;
193  _running_state_changed.wakeAll();
194 }
195 
197 {
198  _running_desired = desired;
199  if (!desired)
200  MThread::exit(0);
201 }
202 
204 {
205 #ifdef DEBUG_PID_FILTERS
206  LOG(VB_RECORD, LOG_DEBUG, LOC + QString("AddPIDFilter(0x%1)")
207  .arg(info->_pid, 0, 16));
208 #endif // DEBUG_PID_FILTERS
209 
210  QMutexLocker writing_locker(&_pid_lock);
211  _pid_info[info->_pid] = info;
212 
214 
215  return true;
216 }
217 
219 {
220 #ifdef DEBUG_PID_FILTERS
221  LOG(VB_RECORD, LOG_DEBUG, LOC +
222  QString("RemovePIDFilter(0x%1)").arg(pid, 0, 16));
223 #endif // DEBUG_PID_FILTERS
224 
225  QMutexLocker write_locker(&_pid_lock);
226 
227  PIDInfoMap::iterator it = _pid_info.find(pid);
228  if (it == _pid_info.end())
229  return false;
230 
231  PIDInfo *tmp = *it;
232  _pid_info.erase(it);
233 
234  bool ok = true;
235  if (tmp->IsOpen())
236  {
237  ok = tmp->Close(_device);
239 
241  }
242 
243  delete tmp;
244 
245  return ok;
246 }
247 
249 {
250  QMutexLocker write_locker(&_pid_lock);
251 
252 #ifdef DEBUG_PID_FILTERS
253  LOG(VB_RECORD, LOG_DEBUG, LOC + "RemoveAllPIDFilters()");
254 #endif // DEBUG_PID_FILTERS
255 
256  vector<int> del_pids;
257  PIDInfoMap::iterator it = _pid_info.begin();
258  for (; it != _pid_info.end(); ++it)
259  del_pids.push_back(it.key());
260 
261  bool ok = true;
262  vector<int>::iterator dit = del_pids.begin();
263  for (; dit != del_pids.end(); ++dit)
264  ok &= RemovePIDFilter(*dit);
265 
266  return UpdateFilters() && ok;
267 }
268 
270 {
271  vector<uint> add_eit, del_eit;
272 
273  QMutexLocker read_locker(&_listener_lock);
274 
275  StreamDataList::const_iterator it1 = _stream_data_list.begin();
276  for (; it1 != _stream_data_list.end(); ++it1)
277  {
278  MPEGStreamData *sd = it1.key();
279  if (sd->HasEITPIDChanges(_eit_pids) &&
280  sd->GetEITPIDChanges(_eit_pids, add_eit, del_eit))
281  {
282  for (uint i = 0; i < del_eit.size(); i++)
283  {
284  uint_vec_t::iterator it2;
285  it2 = find(_eit_pids.begin(), _eit_pids.end(), del_eit[i]);
286  if (it2 != _eit_pids.end())
287  _eit_pids.erase(it2);
288  sd->RemoveListeningPID(del_eit[i]);
289  }
290 
291  for (uint i = 0; i < add_eit.size(); i++)
292  {
293  _eit_pids.push_back(add_eit[i]);
294  sd->AddListeningPID(add_eit[i]);
295  }
296  }
297  }
298 }
299 
301 {
303 
304  pid_map_t pids;
305 
306  {
307  QMutexLocker read_locker(&_listener_lock);
308  StreamDataList::const_iterator it = _stream_data_list.begin();
309  for (; it != _stream_data_list.end(); ++it)
310  it.key()->GetPIDs(pids);
311  }
312 
313  QMap<uint, PIDInfo*> add_pids;
314  vector<uint> del_pids;
315 
316  {
317  QMutexLocker read_locker(&_pid_lock);
318 
319  // PIDs that need to be added..
320  pid_map_t::const_iterator lit = pids.constBegin();
321  for (; lit != pids.constEnd(); ++lit)
322  {
323  if (*lit && (_pid_info.find(lit.key()) == _pid_info.end()))
324  {
325  add_pids[lit.key()] = CreatePIDInfo(
326  lit.key(), StreamID::PrivSec, 0);
327  }
328  }
329 
330  // PIDs that need to be removed..
331  PIDInfoMap::const_iterator fit = _pid_info.begin();
332  for (; fit != _pid_info.end(); ++fit)
333  {
334  bool in_pids = pids.find(fit.key()) != pids.end();
335  if (!in_pids)
336  del_pids.push_back(fit.key());
337  }
338  }
339 
340  // Remove PIDs
341  bool ok = true;
342  vector<uint>::iterator dit = del_pids.begin();
343  for (; dit != del_pids.end(); ++dit)
344  ok &= RemovePIDFilter(*dit);
345 
346  // Add PIDs
347  QMap<uint, PIDInfo*>::iterator ait = add_pids.begin();
348  for (; ait != add_pids.end(); ++ait)
349  ok &= AddPIDFilter(*ait);
350 
351  // Cycle filters if it's been a while
352  if (_cycle_timer.isRunning() && (_cycle_timer.elapsed() > 1000))
354 
355  return ok;
356 }
357 
359 {
360  QMutexLocker reading_locker(&_listener_lock);
361 
363 
364  StreamDataList::const_iterator it = _stream_data_list.begin();
365  for (; it != _stream_data_list.end(); ++it)
366  tmp = max(tmp, it.key()->GetPIDPriority(pid));
367 
368  return tmp;
369 }
370 
371 void StreamHandler::WriteMPTS(unsigned char * buffer, uint len)
372 {
373  if (_mpts_tfw == nullptr)
374  return;
375  _mpts_tfw->Write(buffer, len);
376 }
377 
378 bool StreamHandler::AddNamedOutputFile(const QString &file)
379 {
380 #if !defined( USING_MINGW ) && !defined( _MSC_VER )
381  QMutexLocker lk(&_mpts_lock);
382 
383  _mpts_files.insert(file);
384  QString fn = QString("%1.raw").arg(file);
385 
386  if (_mpts_files.size() == 1)
387  {
388  _mpts_base_file = fn;
389  _mpts_tfw = new ThreadedFileWriter(fn,
390  O_WRONLY|O_TRUNC|O_CREAT|O_LARGEFILE,
391  0644);
392  if (!_mpts_tfw->Open())
393  {
394  delete _mpts_tfw;
395  _mpts_tfw = nullptr;
396  return false;
397  }
398  LOG(VB_RECORD, LOG_INFO, LOC +
399  QString("Opened '%1'").arg(_mpts_base_file));
400  }
401  else
402  {
403  if (link(_mpts_base_file.toLocal8Bit(), fn.toLocal8Bit()) < 0)
404  {
405  LOG(VB_GENERAL, LOG_ERR, LOC +
406  QString("Failed to link '%1' to '%2'")
407  .arg(_mpts_base_file)
408  .arg(fn) +
409  ENO);
410  }
411  else
412  {
413  LOG(VB_RECORD, LOG_INFO, LOC +
414  QString("linked '%1' to '%2'")
415  .arg(_mpts_base_file)
416  .arg(fn));
417  }
418  }
419 
420 #endif // !defined( USING_MINGW ) && !defined( _MSC_VER )
421  return true;
422 }
423 
424 void StreamHandler::RemoveNamedOutputFile(const QString &file)
425 {
426 #if !defined( USING_MINGW ) && !defined( _MSC_VER )
427  QMutexLocker lk(&_mpts_lock);
428 
429  QSet<QString>::iterator it = _mpts_files.find(file);
430  if (it != _mpts_files.end())
431  {
432  _mpts_files.erase(it);
433  if (_mpts_files.isEmpty())
434  {
435  delete _mpts_tfw;
436  _mpts_tfw = nullptr;
437  }
438  }
439 #endif // !defined( USING_MINGW ) && !defined( _MSC_VER )
440 }
void start(QThread::Priority=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:295
#define O_LARGEFILE
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:46
bool _using_section_reader
virtual PIDInfo * CreatePIDInfo(uint pid, uint stream_type, int pes_type)
Definition: streamhandler.h:97
bool UpdateFiltersFromStreamData(void)
static pid_list_t::iterator find(const PIDInfoMap &map, pid_list_t &list, pid_list_t::iterator begin, pid_list_t::iterator end, bool find_open)
QMap< uint, PIDPriority > pid_map_t
bool wait(unsigned long time=ULONG_MAX)
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:312
virtual void RemoveListener(MPEGStreamData *data)
volatile bool _error
ISO 13818-1 private tables & ITU H.222.0.
Definition: mpegtables.h:143
unsigned int uint
Definition: compat.h:140
static guint32 * tmp
Definition: goom_core.c:35
bool isRunning(void) const
Returns true if start() or restart() has been called at least once since construction and since any c...
Definition: mythtimer.cpp:134
unsigned char r
Definition: ParseText.cpp:340
virtual bool UpdateFilters(void)
Definition: streamhandler.h:88
virtual void RemoveListeningPID(uint pid)
bool _allow_section_reader
virtual void RemoveNamedOutputFile(const QString &filename)
Called with _listener_lock locked just before removing old output file.
int Write(const void *data, uint count)
Writes data to the end of the write buffer.
vector< uint > _eit_pids
void SetRunning(bool running, bool using_buffering, bool using_section_reader)
void Stop(void)
VERBOSE_PREAMBLE false
Definition: verbosedefs.h:85
void Start(void)
#define LOC
MythTimer _cycle_timer
bool RemovePIDFilter(uint pid)
QSet< QString > _mpts_files
QMutex _listener_lock
void exit(int retcode=0)
Use this to exit from the thread if you are using a Qt event loop.
Definition: mthread.cpp:290
volatile bool _running_desired
QMutex _add_rm_lock
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:99
int elapsed(void) const
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:90
virtual void AddListeningPID(uint pid, PIDPriority priority=kPIDPriorityNormal)
PIDPriority
StreamDataList _stream_data_list
bool Open(void)
Opens the file we will be writing to.
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
virtual void SetRunningDesired(bool desired)
At minimum this sets _running_desired, this may also send signals to anything that might be blocking ...
virtual void AddListener(MPEGStreamData *data, bool allow_section_reader=false, bool needs_drb=false, QString output_file=QString())
bool AddPIDFilter(PIDInfo *info)
virtual void CycleFiltersByPriority()
Definition: streamhandler.h:89
This class supports the writing of recordings to disk.
QWaitCondition _running_state_changed
uint _open_pid_filters
void WriteMPTS(unsigned char *buffer, uint len)
Write out a copy of the raw MPTS.
virtual bool HasEITPIDChanges(const uint_vec_t &) const
ThreadedFileWriter * _mpts_tfw
PIDInfoMap _pid_info
QString _mpts_base_file
virtual bool GetEITPIDChanges(const uint_vec_t &, uint_vec_t &, uint_vec_t &) const
void UpdateListeningForEIT(void)
uint _pid
Definition: streamhandler.h:40
virtual bool AddNamedOutputFile(const QString &filename)
Called with _listener_lock locked just after adding new output file.
Encapsulates data about MPEG stream and emits events for each table.
bool IsRunning(void) const
bool RemoveAllPIDFilters(void)
StreamHandler(const QString &device, int inputid)
QMutex _start_stop_lock
PIDPriority GetPIDPriority(uint pid) const