MythTV  master
asistreamhandler.cpp
Go to the documentation of this file.
1 // -*- Mode: c++ -*-
2 
3 // POSIX headers
4 #include <fcntl.h>
5 #include <unistd.h>
6 #ifndef _WIN32
7 #include <sys/select.h>
8 #include <sys/ioctl.h>
9 #endif
10 
11 // Qt headers
12 #include <QString>
13 #include <QFile>
14 
15 // MythTV headers
16 #include "asistreamhandler.h"
17 #include "asichannel.h"
18 #include "dtvsignalmonitor.h"
19 #include "streamlisteners.h"
20 #include "mpegstreamdata.h"
21 #include "cardutil.h"
22 
23 // DVEO ASI headers
24 #include <dveo/asi.h>
25 #include <dveo/master.h>
26 
// Log prefix built from this handler's input id and device name.
27 #define LOC QString("ASISH[%1](%2): ").arg(_inputid).arg(_device)
28 
// Cache of live handlers and their reference counts; Get() keys both maps
// by device name.
29 QMap<QString,ASIStreamHandler*> ASIStreamHandler::_handlers;
30 QMap<QString,uint> ASIStreamHandler::_handlers_refcnt;
32 
33 ASIStreamHandler *ASIStreamHandler::Get(const QString &devname,
34  int inputid)
35 {
36  QMutexLocker locker(&_handlers_lock);
37 
38  QString devkey = devname;
39 
40  QMap<QString,ASIStreamHandler*>::iterator it = _handlers.find(devkey);
41 
42  if (it == _handlers.end())
43  {
44  ASIStreamHandler *newhandler = new ASIStreamHandler(devname, inputid);
45  newhandler->Open();
46  _handlers[devkey] = newhandler;
47  _handlers_refcnt[devkey] = 1;
48 
49  LOG(VB_RECORD, LOG_INFO,
50  QString("ASISH[%1]: Creating new stream handler %2 for %3")
51  .arg(inputid).arg(devkey).arg(devname));
52  }
53  else
54  {
55  _handlers_refcnt[devkey]++;
56  uint rcount = _handlers_refcnt[devkey];
57  LOG(VB_RECORD, LOG_INFO,
58  QString("ASISH[%1]: Using existing stream handler %2 for %3")
59  .arg(inputid).arg(devkey)
60  .arg(devname) + QString(" (%1 in use)").arg(rcount));
61  }
62 
63  return _handlers[devkey];
64 }
65 
66 void ASIStreamHandler::Return(ASIStreamHandler * & ref, int inputid)
67 {
68  QMutexLocker locker(&_handlers_lock);
69 
70  QString devname = ref->_device;
71 
72  QMap<QString,uint>::iterator rit = _handlers_refcnt.find(devname);
73  if (rit == _handlers_refcnt.end())
74  return;
75 
76  QMap<QString,ASIStreamHandler*>::iterator it = _handlers.find(devname);
77 
78  if (*rit > 1)
79  {
80  ref = nullptr;
81  (*rit)--;
82  return;
83  }
84 
85  if ((it != _handlers.end()) && (*it == ref))
86  {
87  LOG(VB_RECORD, LOG_INFO, QString("ASISH[%1]: Closing handler for %2")
88  .arg(inputid).arg(devname));
89  ref->Close();
90  delete *it;
91  _handlers.erase(it);
92  }
93  else
94  {
95  LOG(VB_GENERAL, LOG_ERR,
96  QString("ASISH[%1] Error: Couldn't find handler for %2")
97  .arg(inputid).arg(devname));
98  }
99 
100  _handlers_refcnt.erase(rit);
101  ref = nullptr;
102 }
103 
104 ASIStreamHandler::ASIStreamHandler(const QString &device, int inputid)
105  : StreamHandler(device, inputid)
106  , _device_num(-1), _buf_size(-1), _fd(-1)
107  , _packet_size(TSPacket::kSize)
108  , _clock_source(kASIInternalClock)
110  , _drb(nullptr)
111 {
112  setObjectName("ASISH");
113 }
114 
// NOTE(review): the signature line for this body is missing from the listing
// (numbering jumps 113 -> 116); the parameter name `cs` suggests
// ASIStreamHandler::SetClockSource(ASIClockSource cs) -- confirm upstream.
//
// Stores the requested clock source in _clock_source. Per the TODO below the
// value is not applied to the device until it is next opened.
116 {
117  _clock_source = cs;
118  // TODO we should make it possible to set this immediately
119  // not wait for the next open
120 }
121 
// NOTE(review): the signature line for this body is missing from the listing
// (numbering jumps 121 -> 123); the parameter name `m` suggests
// ASIStreamHandler::SetRXMode(ASIRXMode m) -- confirm upstream.
//
// Stores the requested receive mode in _rx_mode. Per the TODO below the
// value is not applied to the device until it is next opened.
123 {
124  _rx_mode = m;
125  // TODO we should make it possible to set this immediately
126  // not wait for the next open
127 }
128 
// NOTE(review): the signature line for this body is missing from the listing
// (numbering jumps 128 -> 130); the parameter name `desired` suggests
// ASIStreamHandler::SetRunningDesired(bool desired) -- confirm upstream.
//
// When a read buffer is published in _drb and the handler is being switched
// from "running desired" to "not desired", stop that buffer.
130 {
131  if (_drb && _running_desired && !desired)
132  _drb->Stop();
// NOTE(review): listing line 133 is absent here; nothing visible in this body
// updates _running_desired itself -- presumably the lost line does (e.g. via
// the base class). Confirm upstream.
134 }
135 
// NOTE(review): the signature line for this body is missing from the listing
// (numbering jumps 135 -> 137); the "run(): begin" log text and the
// RunProlog()/RunEpilog() calls suggest ASIStreamHandler::run() -- confirm
// upstream.
//
// Opens the device, reads it through a DeviceReadBuffer into a local staging
// buffer and passes the data to every entry of _stream_data_list and to
// WriteMPTS() until _running_desired is cleared or _error is set, then tears
// everything down again.
137 {
138  RunProlog();
139 
140  LOG(VB_RECORD, LOG_INFO, LOC + "run(): begin");
141 
142  if (!Open())
143  {
144  LOG(VB_GENERAL, LOG_ERR, LOC + QString("Failed to open device %1 : %2")
145  .arg(_device).arg(strerror(errno)));
146  _error = true;
// NOTE(review): this path returns without calling RunEpilog(), unlike the two
// failure paths below -- looks like an oversight; confirm.
147  return;
148  }
149 
// Device read buffer; owned by this function and deleted on every exit path
// that follows.
150  DeviceReadBuffer *drb = new DeviceReadBuffer(this, true, false);
151  bool ok = drb->Setup(_device, _fd, _packet_size, _buf_size,
152  _num_buffers / 4);
153  if (!ok)
154  {
155  LOG(VB_GENERAL, LOG_ERR, LOC + "Failed to allocate DRB buffer");
156  delete drb;
157  drb = nullptr;
158  Close();
159  _error = true;
160  RunEpilog();
161  return;
162  }
163 
// Staging buffer with room for 15000 packets of _packet_size bytes.
164  uint buffer_size = _packet_size * 15000;
165  unsigned char *buffer = new unsigned char[buffer_size];
// NOTE(review): a plain new[] expression reports failure by throwing, so this
// null check is presumably never taken (unless the build uses a non-throwing
// allocator) -- confirm.
166  if (!buffer)
167  {
168  LOG(VB_GENERAL, LOG_ERR, LOC + "Failed to allocate buffer");
169  delete drb;
170  drb = nullptr;
171  Close();
172  _error = true;
173  RunEpilog();
174  return;
175  }
176  memset(buffer, 0, buffer_size);
177 
178  SetRunning(true, true, false);
179 
180  drb->Start();
181 
// Publish the read buffer in _drb under _start_stop_lock.
182  {
183  QMutexLocker locker(&_start_stop_lock);
184  _drb = drb;
185  }
186 
// remainder: number of bytes at the front of buffer carried over from the
// previous pass; new data is read in right behind them.
187  int remainder = 0;
188  while (_running_desired && !_error)
189  {
// NOTE(review): listing line 190 is absent here.
191 
192  ssize_t len = 0;
193 
194  len = drb->Read(
195  &(buffer[remainder]), buffer_size - remainder);
196 
197  if (!_running_desired)
198  break;
199 
200  // Check for DRB errors
201  if (drb->IsErrored())
202  {
203  LOG(VB_GENERAL, LOG_ERR, LOC + "Device error detected");
204  _error = true;
205  }
206 
207  if (drb->IsEOF())
208  {
209  LOG(VB_GENERAL, LOG_ERR, LOC + "Device EOF detected");
210  _error = true;
211  }
212 
// From here on len counts carried-over plus newly read bytes.
213  len += remainder;
214 
215  if (len < 10) // 10 bytes = 4 bytes TS header + 6 bytes PES header
216  {
217  remainder = len;
218  continue;
219  }
220 
// If the listener lock is busy, keep everything buffered and try again on the
// next pass instead of blocking.
221  if (!_listener_lock.tryLock())
222  {
223  remainder = len;
224  continue;
225  }
226 
// NOTE(review): remainder is not updated on this path, so the next Read()
// writes at the old offset -- the bytes just read appear to be dropped while
// there are no listeners. Confirm that this is intended.
227  if (_stream_data_list.empty())
228  {
229  _listener_lock.unlock();
230  continue;
231  }
232 
// Every listener processes the same buffer; each ProcessData() result
// overwrites remainder, so only the last listener's value is used below.
233  StreamDataList::const_iterator sit = _stream_data_list.begin();
234  for (; sit != _stream_data_list.end(); ++sit)
235  remainder = sit.key()->ProcessData(buffer, len);
236 
237  WriteMPTS(buffer, len - remainder);
238 
239  _listener_lock.unlock();
240 
// Move the unconsumed tail to the front of the buffer for the next pass.
241  if (remainder > 0 && (len > remainder)) // leftover bytes
242  memmove(buffer, &(buffer[len - remainder]), remainder);
243  }
244  LOG(VB_RECORD, LOG_INFO, LOC + "run(): " + "shutdown");
245 
// NOTE(review): listing line 246 is absent here.
247 
// Withdraw the published read buffer before stopping and deleting it.
248  {
249  QMutexLocker locker(&_start_stop_lock);
250  _drb = nullptr;
251  }
252 
253  if (drb->IsRunning())
254  drb->Stop();
255 
256  delete drb;
257  delete[] buffer;
258  Close();
259 
260  LOG(VB_RECORD, LOG_INFO, LOC + "run(): " + "end");
261 
262  SetRunning(false, true, false);
263  RunEpilog();
264 }
265 
// NOTE(review): the signature line for this body is missing from the listing
// (numbering jumps 264 -> 267); callers elsewhere in this file test the
// result of Open() as a boolean, so this is presumably
// bool ASIStreamHandler::Open(void) -- confirm upstream. Several interior
// listing lines are absent as well (272, 279, 286, 293, 323-324, 327,
// 330-332); they are marked below.
//
// Returns true immediately when a descriptor is already open. Otherwise
// validates the device parameters, opens _device read-only, queries the
// receive capabilities and returns whether a descriptor is held.
267 {
268  if (_fd >= 0)
269  return true;
270 
271  QString error;
// NOTE(review): listing line 272 is absent; nothing visible assigns
// _device_num or fills `error` before this check.
273  if (_device_num < 0)
274  {
275  LOG(VB_GENERAL, LOG_ERR, LOC + error);
276  return false;
277  }
278 
// NOTE(review): listing line 279 is absent; nothing visible assigns _buf_size
// before this check.
280  if (_buf_size <= 0)
281  {
282  LOG(VB_GENERAL, LOG_ERR, LOC + error);
283  return false;
284  }
285 
// NOTE(review): listing line 286 is absent; nothing visible assigns
// _num_buffers before this check.
287  if (_num_buffers <= 0)
288  {
289  LOG(VB_GENERAL, LOG_ERR, LOC + error);
290  return false;
291  }
292 
// NOTE(review): listing line 293 (presumably the condition guarding this
// block) is absent; as shown here the block would run unconditionally.
294  {
295  LOG(VB_GENERAL, LOG_ERR, LOC + "Failed to set RX Mode: " + error);
296  return false;
297  }
298 
299  // actually open the device
300  _fd = open(_device.toLocal8Bit().constData(), O_RDONLY, 0);
301  if (_fd < 0)
302  {
303  LOG(VB_GENERAL, LOG_ERR, LOC +
304  QString("Failed to open '%1'").arg(_device) + ENO);
305  return false;
306  }
307 
308  // get the rx capabilities
// cap is only written by the ioctl; the visible code never reads it.
309  unsigned int cap;
310  if (ioctl(_fd, ASI_IOC_RXGETCAP, &cap) < 0)
311  {
312  LOG(VB_GENERAL, LOG_ERR, LOC +
313  QString("Failed to query capabilities '%1'").arg(_device) + ENO);
314  Close();
315  return false;
316  }
317  // TODO? do stuff with capabilities..
318 
319  // we need to handle 188 & 204 byte packets..
// NOTE(review): the case bodies (listing lines 323-324, 327, 330-332) are
// absent; as shown every case only breaks.
320  switch (_rx_mode)
321  {
322  case kASIRXRawMode:
325  break;
326  case kASIRXSyncOn204:
328  break;
329  case kASIRXSyncOn188:
333  break;
334  }
335 
336  // pid counter?
337 
338  return _fd >= 0;
339 }
340 
// NOTE(review): the signature line for this body is missing from the listing
// (numbering jumps 340 -> 342); callers elsewhere in this file invoke
// Close() without using a result, so this is presumably
// void ASIStreamHandler::Close(void) -- confirm upstream.
//
// Closes the device descriptor if one is open and resets _fd to -1; calling
// it with no open descriptor does nothing.
342 {
343  if (_fd >= 0)
344  {
345  close(_fd);
346  _fd = -1;
347  }
348 }
349 
// NOTE(review): the signature line for this body is missing from the listing
// (numbering jumps 349 -> 351); the parameter name `fd` suggests
// void ASIStreamHandler::PriorityEvent(int fd) -- confirm upstream.
//
// Fetches the pending receive events for fd with ASI_IOC_RXGETEVENTS and
// logs one message per event bit that is set. Nothing besides logging is
// done with the events.
351 {
352  int val;
// NOTE(review): the message logged below says "Failed to open device"
// although the failing call is the ASI_IOC_RXGETEVENTS ioctl -- the wording
// looks copied from elsewhere; consider correcting it.
353  if(ioctl(fd, ASI_IOC_RXGETEVENTS, &val) < 0)
354  {
355  LOG(VB_GENERAL, LOG_ERR, LOC + QString("Failed to open device %1: ")
356  .arg(_device) + ENO);
357  //TODO: Handle error
358  return;
359  }
// Overruns and loss of sync are logged at LOG_ERR; carrier, sync acquisition
// and data status changes at LOG_NOTICE.
360  if(val & ASI_EVENT_RX_BUFFER)
361  LOG(VB_RECORD, LOG_ERR, LOC +
362  QString("Driver receive buffer queue overrun detected %1")
363  .arg(_device));
364  if(val & ASI_EVENT_RX_FIFO)
365  LOG(VB_RECORD, LOG_ERR, LOC +
366  QString("Driver receive FIFO overrun detected %1")
367  .arg(_device));
368  if(val & ASI_EVENT_RX_CARRIER)
369  LOG(VB_RECORD, LOG_NOTICE, LOC +
370  QString("Carrier Status change detected %1")
371  .arg(_device));
372  if(val & ASI_EVENT_RX_LOS)
373  LOG(VB_RECORD, LOG_ERR, LOC +
374  QString("Loss of Packet Sync detected %1")
375  .arg(_device));
376  if(val & ASI_EVENT_RX_AOS)
377  LOG(VB_RECORD, LOG_NOTICE, LOC +
378  QString("Acquisition of Sync detected %1")
379  .arg(_device));
380  if(val & ASI_EVENT_RX_DATA)
381  LOG(VB_RECORD, LOG_NOTICE, LOC +
382  QString("Receive data status change detected %1")
383  .arg(_device));
384 }
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:216
Used to access the data of a Transport Stream packet.
Definition: tspacket.h:127
bool IsRunning(void) const
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
static uint GetASIBufferSize(uint device_num, QString *error=nullptr)
bool IsEOF(void) const
static QMap< QString, uint > _handlers_refcnt
static void error(const char *str,...)
Definition: vbi.c:41
bool UpdateFiltersFromStreamData(void)
volatile bool _error
void SetClockSource(ASIClockSource cs)
unsigned int uint
Definition: compat.h:140
static uint GetASINumBuffers(uint device_num, QString *error=nullptr)
void setObjectName(const QString &name)
Definition: mthread.cpp:250
bool IsErrored(void) const
uint Read(unsigned char *buf, uint count)
Try to Read count bytes from into buffer.
void SetRunning(bool running, bool using_buffering, bool using_section_reader)
#define close
Definition: compat.h:16
void PriorityEvent(int fd) override
QMutex _listener_lock
ASIClockSource _clock_source
volatile bool _running_desired
ASIRXMode
void SetRXMode(ASIRXMode m)
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:99
StreamDataList _stream_data_list
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
DeviceReadBuffer * _drb
static const unsigned int kSize
Definition: tspacket.h:181
static int GetASIDeviceNumber(const QString &device, QString *error=nullptr)
bool Setup(const QString &streamName, int streamfd, uint readQuanta=sizeof(TSPacket), uint deviceBufferSize=0, uint deviceBufferCount=1)
ASIStreamHandler(const QString &, int inputid)
virtual void SetRunningDesired(bool desired)
At minimum this sets _running_desired, this may also send signals to anything that might be blocking ...
static QMutex _handlers_lock
#define LOC
static bool SetASIMode(uint device_num, uint mode, QString *error=nullptr)
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:203
Buffers reads from device files.
static QMap< QString, ASIStreamHandler * > _handlers
void WriteMPTS(unsigned char *buffer, uint len)
Write out a copy of the raw MPTS.
ASIClockSource
static void Return(ASIStreamHandler *&ref, int inputid)
static const unsigned int kDVBEmissionSize
Definition: tspacket.h:183
void SetRunningDesired(bool desired) override
At minimum this sets _running_desired, this may also send signals to anything that might be blocking ...
bool RemoveAllPIDFilters(void)
static ASIStreamHandler * Get(const QString &devicename, int inputid)
QMutex _start_stop_lock