media: dvb: mpq: Use kthread instead of workqueue
demux used single-threaded workqueue to process TS packets
notified from the HW. Workqueue implementation was changed
so that all work scheduled to workqueues are submitted
to same kworker threads, this result on having TS packet
processing not to be done on demux own thread and compete
with other work scheduled by other drivers. Moved to separate
thread dedicated only for demux.
Change-Id: Ia8b96543f26428a0a12809d34c27849f900cc45e
Signed-off-by: Hamad Kadmany <hkadmany@codeaurora.org>
Signed-off-by: Neha Pandey <nehap@codeaurora.org>
diff --git a/Documentation/dvb/qcom-mpq.txt b/Documentation/dvb/qcom-mpq.txt
index 28f5d39..1196da0 100644
--- a/Documentation/dvb/qcom-mpq.txt
+++ b/Documentation/dvb/qcom-mpq.txt
@@ -123,17 +123,15 @@
Background Processing
---------------------
-When demux receives notifications from underlying HW drivers about new
-data, it schedules work to a single-threaded workqueue to process the
-notification.
+Demux allocates a kernel thread for each live-input to process
+the TS packets notified from the HW for specific input. There
+are two such inputs (TSIF0 and TSIF1), both can be processed in
+parallel by two seperate threads.
The processing is the action of demuxing of the new data; it may sleep
as it locks against the demux data-structure that may be accessed by
user-space in the meanwhile.
-A single threaded workqueue exists for each live input (TSIF0 or TSIF1)
-to process the inputs in parallel.
-
Dependencies
------------
The demux driver depends on the following kernel drivers and subsystems:
diff --git a/drivers/media/dvb/mpq/demux/mpq_dmx_plugin_common.h b/drivers/media/dvb/mpq/demux/mpq_dmx_plugin_common.h
index 3500eda..e9987c2 100644
--- a/drivers/media/dvb/mpq/demux/mpq_dmx_plugin_common.h
+++ b/drivers/media/dvb/mpq/demux/mpq_dmx_plugin_common.h
@@ -30,7 +30,7 @@
/**
* TSIF alias name length
*/
-#define TSIF_NAME_LENGTH 10
+#define TSIF_NAME_LENGTH 20
#define MPQ_MAX_FOUND_PATTERNS 5
diff --git a/drivers/media/dvb/mpq/demux/mpq_dmx_plugin_tsif.c b/drivers/media/dvb/mpq/demux/mpq_dmx_plugin_tsif.c
index 2e783f6..bbf9d0a 100644
--- a/drivers/media/dvb/mpq/demux/mpq_dmx_plugin_tsif.c
+++ b/drivers/media/dvb/mpq/demux/mpq_dmx_plugin_tsif.c
@@ -13,7 +13,7 @@
#include <linux/init.h>
#include <linux/module.h>
#include <linux/tsif_api.h>
-#include <linux/workqueue.h>
+#include <linux/kthread.h>
#include <linux/moduleparam.h>
#include "mpq_dvb_debug.h"
#include "mpq_dmx_plugin_common.h"
@@ -38,17 +38,8 @@
static int tsif_mode = DMX_TSIF_DRIVER_MODE_DEF;
static int clock_inv;
module_param(threshold, int, S_IRUGO);
-module_param(tsif_mode, int, S_IRUGO);
-module_param(clock_inv, int, S_IRUGO);
-
-/*
- * Work scheduled each time TSIF notifies dmx
- * of new TS packet
- */
-struct tsif_work {
- struct work_struct work;
- int tsif_id;
-};
+module_param(tsif_mode, int, S_IRUGO | S_IWUSR);
+module_param(clock_inv, int, S_IRUGO | S_IWUSR);
/*
@@ -78,11 +69,12 @@
{
/* Information for each TSIF input processing */
struct {
- /* work used to submit to workqueue for processing */
- struct tsif_work work;
+ /* thread processing TS packets from TSIF */
+ struct task_struct *thread;
+ wait_queue_head_t wait_queue;
- /* workqueue that processes TS packets from specific TSIF */
- struct workqueue_struct *workqueue;
+ /* Counter for data notifications from TSIF */
+ atomic_t data_cnt;
/* TSIF alias */
char name[TSIF_NAME_LENGTH];
@@ -103,94 +95,72 @@
/**
- * Worker function that processes the TS packets notified by the TSIF driver.
+ * Demux thread function handling data from specific TSIF.
*
- * @worker: the executed work
+ * @arg: TSIF number
*/
-static void mpq_dmx_tsif_work(struct work_struct *worker)
+static int mpq_dmx_tsif_thread(void *arg)
{
- struct tsif_work *tsif_work =
- container_of(worker, struct tsif_work, work);
struct mpq_demux *mpq_demux;
struct tsif_driver_info *tsif_driver;
size_t packets = 0;
- int tsif = tsif_work->tsif_id;
+ int tsif = (int)arg;
+ int ret;
- mpq_demux = mpq_dmx_tsif_info.tsif[tsif].mpq_demux;
- tsif_driver = &(mpq_dmx_tsif_info.tsif[tsif].tsif_driver);
+ do {
+ ret = wait_event_interruptible(
+ mpq_dmx_tsif_info.tsif[tsif].wait_queue,
+ (atomic_read(
+ &mpq_dmx_tsif_info.tsif[tsif].data_cnt) != 0) ||
+ kthread_should_stop());
- MPQ_DVB_DBG_PRINT(
- "%s executed, tsif = %d\n",
- __func__,
- tsif);
+ if ((ret < 0) || kthread_should_stop()) {
+ MPQ_DVB_DBG_PRINT("%s: exit\n", __func__);
+ break;
+ }
- if (mutex_lock_interruptible(&mpq_dmx_tsif_info.tsif[tsif].mutex))
- return;
+ if (mutex_lock_interruptible(
+ &mpq_dmx_tsif_info.tsif[tsif].mutex))
+ return -ERESTARTSYS;
- /* Check if driver handler is still valid */
- if (tsif_driver->tsif_handler == NULL) {
- mutex_unlock(&mpq_dmx_tsif_info.tsif[tsif].mutex);
- MPQ_DVB_ERR_PRINT("%s: tsif_driver->tsif_handler is NULL!\n",
+ tsif_driver = &(mpq_dmx_tsif_info.tsif[tsif].tsif_driver);
+ mpq_demux = mpq_dmx_tsif_info.tsif[tsif].mpq_demux;
+
+ /* Check if driver handler is still valid */
+ if (tsif_driver->tsif_handler == NULL) {
+ mutex_unlock(&mpq_dmx_tsif_info.tsif[tsif].mutex);
+ MPQ_DVB_DBG_PRINT(
+ "%s: tsif was detached\n",
__func__);
- return;
- }
+ continue;
+ }
- tsif_get_state(tsif_driver->tsif_handler, &(tsif_driver->ri),
- &(tsif_driver->wi), &(tsif_driver->state));
+ tsif_get_state(
+ tsif_driver->tsif_handler, &(tsif_driver->ri),
+ &(tsif_driver->wi), &(tsif_driver->state));
- if ((tsif_driver->wi == tsif_driver->ri) ||
- (tsif_driver->state == tsif_state_stopped) ||
- (tsif_driver->state == tsif_state_error)) {
+ if ((tsif_driver->wi == tsif_driver->ri) ||
+ (tsif_driver->state == tsif_state_stopped) ||
+ (tsif_driver->state == tsif_state_error)) {
- mpq_demux->hw_notification_size = 0;
+ mpq_demux->hw_notification_size = 0;
- mutex_unlock(&mpq_dmx_tsif_info.tsif[tsif].mutex);
+ mutex_unlock(&mpq_dmx_tsif_info.tsif[tsif].mutex);
- MPQ_DVB_ERR_PRINT(
- "%s: invalid TSIF state (%d), wi = (%d), ri = (%d)\n",
- __func__,
- tsif_driver->state, tsif_driver->wi, tsif_driver->ri);
- return;
- }
+ MPQ_DVB_DBG_PRINT(
+ "%s: TSIF invalid state %d, %d, %d\n",
+ __func__,
+ tsif_driver->state,
+ tsif_driver->wi,
+ tsif_driver->ri);
+ continue;
+ }
- if (tsif_driver->wi > tsif_driver->ri) {
- packets = (tsif_driver->wi - tsif_driver->ri);
- mpq_demux->hw_notification_size = packets;
+ atomic_dec(&mpq_dmx_tsif_info.tsif[tsif].data_cnt);
- dvb_dmx_swfilter_format(
- &mpq_demux->demux,
- (tsif_driver->data_buffer +
- (tsif_driver->ri * TSIF_PKT_SIZE)),
- (packets * TSIF_PKT_SIZE),
- DMX_TSP_FORMAT_192_TAIL);
-
- tsif_driver->ri =
- (tsif_driver->ri + packets) % tsif_driver->buffer_size;
-
- tsif_reclaim_packets(tsif_driver->tsif_handler,
- tsif_driver->ri);
- } else {
- /*
- * wi < ri, means wraparound on cyclic buffer.
- * Handle in two stages.
- */
- packets = (tsif_driver->buffer_size - tsif_driver->ri);
- mpq_demux->hw_notification_size = packets;
-
- dvb_dmx_swfilter_format(
- &mpq_demux->demux,
- (tsif_driver->data_buffer +
- (tsif_driver->ri * TSIF_PKT_SIZE)),
- (packets * TSIF_PKT_SIZE),
- DMX_TSP_FORMAT_192_TAIL);
-
- /* tsif_driver->ri should be 0 after this */
- tsif_driver->ri =
- (tsif_driver->ri + packets) % tsif_driver->buffer_size;
-
- packets = tsif_driver->wi;
- if (packets > 0) {
- mpq_demux->hw_notification_size += packets;
+ if (tsif_driver->wi > tsif_driver->ri) {
+ packets = (tsif_driver->wi - tsif_driver->ri);
+ mpq_demux->hw_notification_size = packets;
dvb_dmx_swfilter_format(
&mpq_demux->demux,
@@ -202,13 +172,55 @@
tsif_driver->ri =
(tsif_driver->ri + packets) %
tsif_driver->buffer_size;
+
+ tsif_reclaim_packets(
+ tsif_driver->tsif_handler,
+ tsif_driver->ri);
+ } else {
+ /*
+ * wi < ri, means wraparound on cyclic buffer.
+ * Handle in two stages.
+ */
+ packets = (tsif_driver->buffer_size - tsif_driver->ri);
+ mpq_demux->hw_notification_size = packets;
+
+ dvb_dmx_swfilter_format(
+ &mpq_demux->demux,
+ (tsif_driver->data_buffer +
+ (tsif_driver->ri * TSIF_PKT_SIZE)),
+ (packets * TSIF_PKT_SIZE),
+ DMX_TSP_FORMAT_192_TAIL);
+
+ /* tsif_driver->ri should be 0 after this */
+ tsif_driver->ri =
+ (tsif_driver->ri + packets) %
+ tsif_driver->buffer_size;
+
+ packets = tsif_driver->wi;
+ if (packets > 0) {
+ mpq_demux->hw_notification_size += packets;
+
+ dvb_dmx_swfilter_format(
+ &mpq_demux->demux,
+ (tsif_driver->data_buffer +
+ (tsif_driver->ri * TSIF_PKT_SIZE)),
+ (packets * TSIF_PKT_SIZE),
+ DMX_TSP_FORMAT_192_TAIL);
+
+ tsif_driver->ri =
+ (tsif_driver->ri + packets) %
+ tsif_driver->buffer_size;
+ }
+
+ tsif_reclaim_packets(
+ tsif_driver->tsif_handler,
+ tsif_driver->ri);
}
- tsif_reclaim_packets(tsif_driver->tsif_handler,
- tsif_driver->ri);
- }
+ mutex_unlock(&mpq_dmx_tsif_info.tsif[tsif].mutex);
+ } while (1);
- mutex_unlock(&mpq_dmx_tsif_info.tsif[tsif].mutex);
+ return 0;
}
@@ -220,7 +232,6 @@
static void mpq_tsif_callback(void *user)
{
int tsif = (int)user;
- struct work_struct *work;
struct mpq_demux *mpq_demux;
MPQ_DVB_DBG_PRINT("%s executed, tsif = %d\n", __func__, tsif);
@@ -229,11 +240,8 @@
mpq_demux = mpq_dmx_tsif_info.tsif[tsif].mpq_demux;
mpq_dmx_update_hw_statistics(mpq_demux);
- work = &mpq_dmx_tsif_info.tsif[tsif].work.work;
-
- /* Scheudle a new work to demux workqueue */
- if (!work_pending(work))
- queue_work(mpq_dmx_tsif_info.tsif[tsif].workqueue, work);
+ atomic_inc(&mpq_dmx_tsif_info.tsif[tsif].data_cnt);
+ wake_up(&mpq_dmx_tsif_info.tsif[tsif].wait_queue);
}
@@ -376,20 +384,10 @@
tsif_driver = &(mpq_dmx_tsif_info.tsif[tsif].tsif_driver);
tsif_stop(tsif_driver->tsif_handler);
tsif_detach(tsif_driver->tsif_handler);
- /*
- * temporarily release mutex and flush the work queue
- * before setting tsif_handler to NULL
- */
- mutex_unlock(&mpq_dmx_tsif_info.tsif[tsif].mutex);
- flush_workqueue(mpq_dmx_tsif_info.tsif[tsif].workqueue);
- /* re-acquire mutex */
- if (mutex_lock_interruptible(
- &mpq_dmx_tsif_info.tsif[tsif].mutex))
- return -ERESTARTSYS;
-
tsif_driver->tsif_handler = NULL;
tsif_driver->data_buffer = NULL;
tsif_driver->buffer_size = 0;
+ atomic_set(&mpq_dmx_tsif_info.tsif[tsif].data_cnt, 0);
mpq_dmx_tsif_info.tsif[tsif].mpq_demux = NULL;
}
@@ -708,31 +706,28 @@
}
for (i = 0; i < TSIF_COUNT; i++) {
- mpq_dmx_tsif_info.tsif[i].work.tsif_id = i;
-
- INIT_WORK(&mpq_dmx_tsif_info.tsif[i].work.work,
- mpq_dmx_tsif_work);
-
snprintf(mpq_dmx_tsif_info.tsif[i].name,
TSIF_NAME_LENGTH,
- "tsif_%d",
+ "dmx_tsif%d",
i);
- mpq_dmx_tsif_info.tsif[i].workqueue =
- create_singlethread_workqueue(
+ atomic_set(&mpq_dmx_tsif_info.tsif[i].data_cnt, 0);
+ init_waitqueue_head(&mpq_dmx_tsif_info.tsif[i].wait_queue);
+ mpq_dmx_tsif_info.tsif[i].thread =
+ kthread_run(
+ mpq_dmx_tsif_thread, (void *)i,
mpq_dmx_tsif_info.tsif[i].name);
- if (mpq_dmx_tsif_info.tsif[i].workqueue == NULL) {
+ if (IS_ERR(mpq_dmx_tsif_info.tsif[i].thread)) {
int j;
for (j = 0; j < i; j++) {
- destroy_workqueue(
- mpq_dmx_tsif_info.tsif[j].workqueue);
+ kthread_stop(mpq_dmx_tsif_info.tsif[j].thread);
mutex_destroy(&mpq_dmx_tsif_info.tsif[j].mutex);
}
MPQ_DVB_ERR_PRINT(
- "%s: create_singlethread_workqueue failed\n",
+ "%s: kthread_run failed\n",
__func__);
return -ENOMEM;
@@ -753,7 +748,7 @@
ret);
for (i = 0; i < TSIF_COUNT; i++) {
- destroy_workqueue(mpq_dmx_tsif_info.tsif[i].workqueue);
+ kthread_stop(mpq_dmx_tsif_info.tsif[i].thread);
mutex_destroy(&mpq_dmx_tsif_info.tsif[i].mutex);
}
}
@@ -781,16 +776,13 @@
if (tsif_driver->tsif_handler)
tsif_stop(tsif_driver->tsif_handler);
}
+
/* Detach from TSIF driver to avoid further notifications. */
if (tsif_driver->tsif_handler)
tsif_detach(tsif_driver->tsif_handler);
- /* release mutex to allow work queue to finish scheduled work */
mutex_unlock(&mpq_dmx_tsif_info.tsif[i].mutex);
- /* flush the work queue and destroy it */
- flush_workqueue(mpq_dmx_tsif_info.tsif[i].workqueue);
- destroy_workqueue(mpq_dmx_tsif_info.tsif[i].workqueue);
-
+ kthread_stop(mpq_dmx_tsif_info.tsif[i].thread);
mutex_destroy(&mpq_dmx_tsif_info.tsif[i].mutex);
}
diff --git a/drivers/media/dvb/mpq/demux/mpq_dmx_plugin_tspp_v1.c b/drivers/media/dvb/mpq/demux/mpq_dmx_plugin_tspp_v1.c
index 360d96a..f5c01e1 100644
--- a/drivers/media/dvb/mpq/demux/mpq_dmx_plugin_tspp_v1.c
+++ b/drivers/media/dvb/mpq/demux/mpq_dmx_plugin_tspp_v1.c
@@ -12,12 +12,11 @@
#include <linux/init.h>
#include <linux/module.h>
-#include <linux/workqueue.h>
+#include <linux/kthread.h>
#include <mach/msm_tspp.h>
#include "mpq_dvb_debug.h"
#include "mpq_dmx_plugin_common.h"
-
#define TSIF_COUNT 2
#define TSPP_MAX_PID_FILTER_NUM 16
@@ -28,6 +27,7 @@
/* For each TSIF we allocate two pipes, one for PES and one for sections */
#define TSPP_PES_CHANNEL 0
#define TSPP_SECTION_CHANNEL 1
+#define TSPP_CHANNEL_COUNT 2
/* the channel_id set to TSPP driver based on TSIF number and channel type */
#define TSPP_CHANNEL_ID(tsif, ch) ((tsif << 1) + ch)
@@ -84,18 +84,10 @@
static int clock_inv;
static int tsif_mode = 2;
static int allocation_mode = MPQ_DMX_TSPP_INTERNAL_ALLOC;
-module_param(tsif_mode, int, S_IRUGO);
-module_param(clock_inv, int, S_IRUGO);
+module_param(tsif_mode, int, S_IRUGO | S_IWUSR);
+module_param(clock_inv, int, S_IRUGO | S_IWUSR);
module_param(allocation_mode, int, S_IRUGO);
-/*
- * Work scheduled each time TSPP notifies dmx
- * of new TS packet in some channel
- */
-struct tspp_work {
- struct work_struct work;
- int channel_id;
-};
/* The following structure hold singelton information
* required for dmx implementation on top of TSPP.
@@ -111,8 +103,8 @@
*/
int pes_channel_ref;
- /* work used to submit to workqueue to process pes channel */
- struct tspp_work pes_work;
+ /* Counter for data notifications on PES pipe */
+ atomic_t pes_data_cnt;
/* ION handle used for TSPP data buffer allocation */
struct ion_handle *pes_mem_heap_handle;
@@ -130,8 +122,8 @@
*/
int section_channel_ref;
- /* work used to submit to workqueue to process pes channel */
- struct tspp_work section_work;
+ /* Counter for data notifications on section pipe */
+ atomic_t section_data_cnt;
/* ION handle used for TSPP data buffer allocation */
struct ion_handle *section_mem_heap_handle;
@@ -151,8 +143,9 @@
int ref_count;
} filters[TSPP_MAX_PID_FILTER_NUM];
- /* workqueue that processes TS packets from specific TSIF */
- struct workqueue_struct *workqueue;
+ /* thread processing TS packets from TSPP */
+ struct task_struct *thread;
+ wait_queue_head_t wait_queue;
/* TSIF alias */
char name[TSIF_NAME_LENGTH];
@@ -274,55 +267,93 @@
}
/**
- * Worker function that processes the TS packets notified by TSPP.
+ * Demux thread function handling data from specific TSIF.
*
- * @worker: the executed work
+ * @arg: TSIF number
*/
-static void mpq_dmx_tspp_work(struct work_struct *worker)
+static int mpq_dmx_tspp_thread(void *arg)
{
- struct tspp_work *tspp_work =
- container_of(worker, struct tspp_work, work);
+ int tsif = (int)arg;
struct mpq_demux *mpq_demux;
- int channel_id = tspp_work->channel_id;
- int tsif = TSPP_GET_TSIF_NUM(channel_id);
const struct tspp_data_descriptor *tspp_data_desc;
+ atomic_t *data_cnt;
int ref_count;
+ int ret;
+ int i;
- mpq_demux = mpq_dmx_tspp_info.tsif[tsif].mpq_demux;
+ do {
+ ret = wait_event_interruptible(
+ mpq_dmx_tspp_info.tsif[tsif].wait_queue,
+ (atomic_read(
+ &mpq_dmx_tspp_info.tsif[tsif].pes_data_cnt)) ||
+ (atomic_read(
+ &mpq_dmx_tspp_info.tsif[tsif].section_data_cnt)) ||
+ kthread_should_stop());
- /* Lock against the TSPP filters data-structure */
- if (mutex_lock_interruptible(&mpq_dmx_tspp_info.tsif[tsif].mutex))
- return;
+ if ((ret < 0) || kthread_should_stop()) {
+ MPQ_DVB_ERR_PRINT("%s: exit\n", __func__);
+ break;
+ }
- /* Make sure channel is still active */
- if (TSPP_IS_PES_CHANNEL(channel_id))
- ref_count = mpq_dmx_tspp_info.tsif[tsif].pes_channel_ref;
- else
- ref_count = mpq_dmx_tspp_info.tsif[tsif].section_channel_ref;
+ /* Lock against the TSPP filters data-structure */
+ if (mutex_lock_interruptible(
+ &mpq_dmx_tspp_info.tsif[tsif].mutex))
+ return -ERESTARTSYS;
- if (ref_count == 0) {
+ for (i = 0; i < TSPP_CHANNEL_COUNT; i++) {
+ int channel_id = TSPP_CHANNEL_ID(tsif, i);
+
+ if (TSPP_IS_PES_CHANNEL(channel_id)) {
+ ref_count =
+ mpq_dmx_tspp_info.tsif[tsif].pes_channel_ref;
+ data_cnt =
+ &mpq_dmx_tspp_info.tsif[tsif].pes_data_cnt;
+ } else {
+ ref_count =
+ mpq_dmx_tspp_info.tsif[tsif].
+ section_channel_ref;
+ data_cnt =
+ &mpq_dmx_tspp_info.tsif[tsif].section_data_cnt;
+ }
+
+ /* Make sure channel is still active */
+ if (ref_count == 0)
+ continue;
+
+ atomic_dec(data_cnt);
+
+ mpq_demux = mpq_dmx_tspp_info.tsif[tsif].mpq_demux;
+ mpq_demux->hw_notification_size = 0;
+
+ /*
+ * Go through all filled descriptors
+ * and perform demuxing on them
+ */
+ while ((tspp_data_desc =
+ tspp_get_buffer(0, channel_id)) != NULL) {
+ mpq_demux->hw_notification_size +=
+ (tspp_data_desc->size /
+ TSPP_RAW_TTS_SIZE);
+
+ dvb_dmx_swfilter_format(
+ &mpq_demux->demux,
+ tspp_data_desc->virt_base,
+ tspp_data_desc->size,
+ DMX_TSP_FORMAT_192_TAIL);
+
+ /*
+ * Notify TSPP that the buffer
+ * is no longer needed
+ */
+ tspp_release_buffer(0,
+ channel_id, tspp_data_desc->id);
+ }
+ }
+
mutex_unlock(&mpq_dmx_tspp_info.tsif[tsif].mutex);
- return;
- }
+ } while (1);
- mpq_demux->hw_notification_size = 0;
-
- /* Go through all filled descriptors and perform demuxing on them */
- while ((tspp_data_desc = tspp_get_buffer(0, channel_id)) != NULL) {
- mpq_demux->hw_notification_size +=
- (tspp_data_desc->size / TSPP_RAW_TTS_SIZE);
-
- dvb_dmx_swfilter_format(
- &mpq_demux->demux,
- tspp_data_desc->virt_base,
- tspp_data_desc->size,
- DMX_TSP_FORMAT_192_TAIL);
-
- /* Notify TSPP that the buffer is no longer needed */
- tspp_release_buffer(0, channel_id, tspp_data_desc->id);
- }
-
- mutex_unlock(&mpq_dmx_tspp_info.tsif[tsif].mutex);
+ return 0;
}
/**
@@ -334,7 +365,6 @@
static void mpq_tspp_callback(int channel_id, void *user)
{
int tsif = (int)user;
- struct work_struct *work;
struct mpq_demux *mpq_demux;
/* Save statistics on TSPP notifications */
@@ -342,13 +372,11 @@
mpq_dmx_update_hw_statistics(mpq_demux);
if (TSPP_IS_PES_CHANNEL(channel_id))
- work = &mpq_dmx_tspp_info.tsif[tsif].pes_work.work;
+ atomic_inc(&mpq_dmx_tspp_info.tsif[tsif].pes_data_cnt);
else
- work = &mpq_dmx_tspp_info.tsif[tsif].section_work.work;
+ atomic_inc(&mpq_dmx_tspp_info.tsif[tsif].section_data_cnt);
- /* Scheudle a new work to demux workqueue */
- if (!work_pending(work))
- queue_work(mpq_dmx_tspp_info.tsif[tsif].workqueue, work);
+ wake_up(&mpq_dmx_tspp_info.tsif[tsif].wait_queue);
}
/**
@@ -586,6 +614,7 @@
int tsif;
int ret;
int channel_id;
+ atomic_t *data_cnt;
int *channel_ref_count;
struct tspp_filter tspp_filter;
struct mpq_demux *mpq_demux = feed->demux->priv;
@@ -613,10 +642,12 @@
channel_id = TSPP_CHANNEL_ID(tsif, TSPP_PES_CHANNEL);
channel_ref_count =
&mpq_dmx_tspp_info.tsif[tsif].pes_channel_ref;
+ data_cnt = &mpq_dmx_tspp_info.tsif[tsif].pes_data_cnt;
} else {
channel_id = TSPP_CHANNEL_ID(tsif, TSPP_SECTION_CHANNEL);
channel_ref_count =
&mpq_dmx_tspp_info.tsif[tsif].section_channel_ref;
+ data_cnt = &mpq_dmx_tspp_info.tsif[tsif].section_data_cnt;
}
/* check if required TSPP pipe is already allocated or not */
@@ -677,6 +708,7 @@
tspp_unregister_notification(0, channel_id);
tspp_close_channel(0, channel_id);
tspp_close_stream(0, channel_id);
+ atomic_set(data_cnt, 0);
}
mutex_unlock(&mpq_dmx_tspp_info.tsif[tsif].mutex);
@@ -1079,24 +1111,14 @@
mpq_dmx_tspp_info.tsif[i].pes_mem_heap_handle = NULL;
mpq_dmx_tspp_info.tsif[i].pes_mem_heap_virt_base = NULL;
mpq_dmx_tspp_info.tsif[i].pes_mem_heap_phys_base = 0;
-
- mpq_dmx_tspp_info.tsif[i].pes_work.channel_id =
- TSPP_CHANNEL_ID(i, TSPP_PES_CHANNEL);
-
- INIT_WORK(&mpq_dmx_tspp_info.tsif[i].pes_work.work,
- mpq_dmx_tspp_work);
+ atomic_set(&mpq_dmx_tspp_info.tsif[i].pes_data_cnt, 0);
mpq_dmx_tspp_info.tsif[i].section_channel_ref = 0;
mpq_dmx_tspp_info.tsif[i].section_index = 0;
mpq_dmx_tspp_info.tsif[i].section_mem_heap_handle = NULL;
mpq_dmx_tspp_info.tsif[i].section_mem_heap_virt_base = NULL;
mpq_dmx_tspp_info.tsif[i].section_mem_heap_phys_base = 0;
-
- mpq_dmx_tspp_info.tsif[i].section_work.channel_id =
- TSPP_CHANNEL_ID(i, TSPP_SECTION_CHANNEL);
-
- INIT_WORK(&mpq_dmx_tspp_info.tsif[i].section_work.work,
- mpq_dmx_tspp_work);
+ atomic_set(&mpq_dmx_tspp_info.tsif[i].section_data_cnt, 0);
for (j = 0; j < TSPP_MAX_PID_FILTER_NUM; j++) {
mpq_dmx_tspp_info.tsif[i].filters[j].pid = -1;
@@ -1105,22 +1127,23 @@
snprintf(mpq_dmx_tspp_info.tsif[i].name,
TSIF_NAME_LENGTH,
- "tsif_%d",
+ "dmx_tsif%d",
i);
- mpq_dmx_tspp_info.tsif[i].workqueue =
- create_singlethread_workqueue(
+ init_waitqueue_head(&mpq_dmx_tspp_info.tsif[i].wait_queue);
+ mpq_dmx_tspp_info.tsif[i].thread =
+ kthread_run(
+ mpq_dmx_tspp_thread, (void *)i,
mpq_dmx_tspp_info.tsif[i].name);
- if (mpq_dmx_tspp_info.tsif[i].workqueue == NULL) {
+ if (IS_ERR(mpq_dmx_tspp_info.tsif[i].thread)) {
for (j = 0; j < i; j++) {
- destroy_workqueue(
- mpq_dmx_tspp_info.tsif[j].workqueue);
-
+ kthread_stop(mpq_dmx_tspp_info.tsif[j].thread);
mutex_destroy(&mpq_dmx_tspp_info.tsif[j].mutex);
}
+
MPQ_DVB_ERR_PRINT(
- "%s: create_singlethread_workqueue failed\n",
+ "%s: kthread_run failed\n",
__func__);
return -ENOMEM;
@@ -1138,7 +1161,7 @@
ret);
for (i = 0; i < TSIF_COUNT; i++) {
- destroy_workqueue(mpq_dmx_tspp_info.tsif[i].workqueue);
+ kthread_stop(mpq_dmx_tspp_info.tsif[i].thread);
mutex_destroy(&mpq_dmx_tspp_info.tsif[i].mutex);
}
}
@@ -1179,8 +1202,7 @@
mpq_dmx_tsif_ion_cleanup(i);
mutex_unlock(&mpq_dmx_tspp_info.tsif[i].mutex);
- flush_workqueue(mpq_dmx_tspp_info.tsif[i].workqueue);
- destroy_workqueue(mpq_dmx_tspp_info.tsif[i].workqueue);
+ kthread_stop(mpq_dmx_tspp_info.tsif[i].thread);
mutex_destroy(&mpq_dmx_tspp_info.tsif[i].mutex);
}