audio_demo/voice_call.cpp
2025-02-14 08:58:27 +08:00

1391 lines
50 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <iostream>
#include <unistd.h>
#include <cmath>
#include "timing.h"
#include "log/logger.h"
#include "common.h"
#include <modules/audio_processing/include/audio_processing.h>
#include <modules/audio_processing/include/config.h>
#include "alsa_dev.h"
#include "agcm.h"
#include "rnnoise_plugin.h"
#include <cmath>
using namespace toolkit;
// Device-side audio format: mono, 32 kHz, processed in 10 ms periods.
#define MIX_AUDIO_CHANNELS 1
#define MIX_AUDIO_RATE 32000
#define MIX_AUDIO_SAMPLES (10 * MIX_AUDIO_RATE / 1000)
// Internal processing rate fed to WebRTC APM (48 kHz, also 10 ms periods).
#define MIX_OUTPUT_RATE 48000
#define MIX_OUTPUT_SAMPLES (10 * MIX_OUTPUT_RATE / 1000)
// One queued chunk of PCM exchanged between producer and consumer threads.
// `data` is malloc'd by the producer and freed by whoever drains it fully.
struct audio_buf_t
{
uint8_t* data; // heap-allocated PCM payload
int index; // read offset into data, in bytes (how much is already consumed)
int size; // total payload size, in bytes
};
/**
 * Read or write the system mixer volume via ALSA's simple mixer API.
 *
 * @param volume in/out volume as a percentage (0-100). When is_set is true
 *               the value is applied to the mixer element; otherwise the
 *               current level is read back into *volume.
 * @param is_set true = apply *volume, false = query into *volume.
 *
 * Fix: the mixer handle was previously leaked when snd_mixer_find_selem
 * failed (early return without snd_mixer_close); snd_mixer_open is now
 * also checked before the handle is used.
 */
static void alsa_volume(long *volume, bool is_set)
{
    static const char *alsa_dev = "default";
    static const char *elem_name = "Master";
    long min, max;
    long left, right;
    snd_mixer_t *handle;
    snd_mixer_selem_id_t *sid;
    snd_mixer_elem_t *elem;
    if (snd_mixer_open(&handle, 0) < 0) {
        printf("snd_mixer_open failed.\n");
        return ;
    }
    snd_mixer_attach(handle, alsa_dev);
    snd_mixer_selem_register(handle, NULL, NULL);
    snd_mixer_load(handle);
    // alloca-backed id object: released automatically with this stack frame.
    snd_mixer_selem_id_alloca(&sid);
    snd_mixer_selem_id_set_index(sid, 0);
    snd_mixer_selem_id_set_name(sid, elem_name);
#if 0
    // Debug aid: enumerate all simple mixer elements with index/name.
    elem = snd_mixer_first_elem(handle);
    while (elem) {
        snd_mixer_selem_get_id(elem, sid);
        printf("Index: %d, Name: %s\n", snd_mixer_selem_id_get_index(sid), snd_mixer_selem_id_get_name(sid));
        elem = snd_mixer_elem_next(elem);
    }
#endif
    // Locate the configured simple element ("Master").
    elem = snd_mixer_find_selem(handle, sid);
    if (!elem) {
        printf("snd_mixer_find_selem failed.\n");
        snd_mixer_close(handle); // BUG FIX: handle was leaked on this path
        return ;
    }
    // "Digital" is treated as a capture control; anything else as playback.
    if (std::string(elem_name) == "Digital") {
        snd_mixer_selem_get_capture_volume_range(elem, &min, &max);
        snd_mixer_selem_get_capture_volume(elem, SND_MIXER_SCHN_FRONT_LEFT, &left);
        snd_mixer_selem_get_capture_volume(elem, SND_MIXER_SCHN_FRONT_RIGHT, &right);
    } else {
        snd_mixer_selem_get_playback_volume_range(elem, &min, &max);
        snd_mixer_selem_get_playback_volume(elem, SND_MIXER_SCHN_FRONT_LEFT, &left);
        snd_mixer_selem_get_playback_volume(elem, SND_MIXER_SCHN_FRONT_RIGHT, &right);
    }
    if (is_set) {
        // Map the 0-100 percentage onto the element's native range.
        if (std::string(elem_name) == "Digital") {
            snd_mixer_selem_set_capture_volume_all(elem, (float)(max - min)/100.0 * (float)*volume);
        } else {
            snd_mixer_selem_set_playback_volume_all(elem, (float)(max - min)/100.0 * (float)*volume);
        }
    } else {
        // Report the louder channel as a percentage of the native range.
        *volume = (float)std::max(left, right)/ (float)(max - min) * 100;
    }
    snd_mixer_close(handle);
}
// Mute or restore the speaker by driving the master volume to zero or back
// to the last audible level remembered across calls.
static void speaker_enable(bool enable)
{
    static long saved_level = 0; // last audible volume we observed (percent)
    static long muted_level = 0; // constant zero used when muting
    long current;
    alsa_volume(&current, false);
    // Refresh the cached level when we have none yet, or the mixer is
    // currently reporting an audible volume.
    if (saved_level == 0 || current > 0) {
        alsa_volume(&saved_level, false);
    }
    alsa_volume(enable ? &saved_level : &muted_level, true);
}
// Build the baseline WebRTC AudioProcessing configuration shared by the
// three APM instances (NS / AGC / AEC). main() copies this and toggles
// individual features per instance before ApplyConfig().
webrtc::AudioProcessing::Config webrtcConfigInit()
{
webrtc::AudioProcessing::Config apmConfig;
apmConfig.pipeline.maximum_internal_processing_rate = MIX_AUDIO_RATE;
apmConfig.pipeline.multi_channel_capture = MIX_AUDIO_CHANNELS > 1 ? true : false;
apmConfig.pipeline.multi_channel_render = MIX_AUDIO_CHANNELS > 1 ? true : false;
//PreAmplifier (disabled)
apmConfig.pre_amplifier.enabled = false;
apmConfig.pre_amplifier.fixed_gain_factor = 0.7f;
//HighPassFilter
apmConfig.high_pass_filter.enabled = true;
apmConfig.high_pass_filter.apply_in_full_band = false;
//EchoCanceller (mobile-mode AECM)
apmConfig.echo_canceller.enabled = true;
apmConfig.echo_canceller.mobile_mode = true;
apmConfig.echo_canceller.export_linear_aec_output = false;
apmConfig.echo_canceller.enforce_high_pass_filtering = false;
//NoiseSuppression
apmConfig.noise_suppression.enabled = true;
apmConfig.noise_suppression.level = webrtc::AudioProcessing::Config::NoiseSuppression::kVeryHigh;
apmConfig.noise_suppression.analyze_linear_aec_output_when_available = false;
//TransientSuppression (disabled)
apmConfig.transient_suppression.enabled = false;
//VoiceDetection
apmConfig.voice_detection.enabled = true;
//GainController1 (disabled; GainController2 is used instead)
apmConfig.gain_controller1.enabled = false;
// kAdaptiveAnalog: adaptive analog mode
// kAdaptiveDigital: adaptive digital gain mode
// kFixedDigital: fixed digital gain mode
apmConfig.gain_controller1.mode = webrtc::AudioProcessing::Config::GainController1::kFixedDigital;
apmConfig.gain_controller1.target_level_dbfs = 6; // target level
apmConfig.gain_controller1.compression_gain_db = 60; // gain capability
apmConfig.gain_controller1.enable_limiter = true; // limiter on/off
apmConfig.gain_controller1.analog_level_minimum = 0;
apmConfig.gain_controller1.analog_level_maximum = 255;
apmConfig.gain_controller1.analog_gain_controller.enabled = false;
// apmConfig.gain_controller1.analog_gain_controller.startup_min_volume = webrtc::kAgcStartupMinVolume;
apmConfig.gain_controller1.analog_gain_controller.startup_min_volume = 0;
apmConfig.gain_controller1.analog_gain_controller.clipped_level_min = 0;
apmConfig.gain_controller1.analog_gain_controller.enable_agc2_level_estimator = false;
apmConfig.gain_controller1.analog_gain_controller.enable_digital_adaptive = false;
//GainController2 (active digital AGC)
apmConfig.gain_controller2.enabled = true;
apmConfig.gain_controller2.fixed_digital.gain_db = 10.4f;
apmConfig.gain_controller2.adaptive_digital.enabled = true;
apmConfig.gain_controller2.adaptive_digital.vad_probability_attack = 1.f;
apmConfig.gain_controller2.adaptive_digital.level_estimator = webrtc::AudioProcessing::Config::GainController2::kRms;
apmConfig.gain_controller2.adaptive_digital.level_estimator_adjacent_speech_frames_threshold = 1;
apmConfig.gain_controller2.adaptive_digital.use_saturation_protector = true;
apmConfig.gain_controller2.adaptive_digital.initial_saturation_margin_db = 20.f;
apmConfig.gain_controller2.adaptive_digital.extra_saturation_margin_db = 2.f;
apmConfig.gain_controller2.adaptive_digital.gain_applier_adjacent_speech_frames_threshold = 1;
apmConfig.gain_controller2.adaptive_digital.max_gain_change_db_per_second = 3.f;
apmConfig.gain_controller2.adaptive_digital.max_output_noise_level_dbfs = -50.f;
//ResidualEchoDetector (disabled)
apmConfig.residual_echo_detector.enabled = false;
//LevelEstimation (disabled)
apmConfig.level_estimation.enabled = false;
return apmConfig;
}
// FFmpeg state for one RTMP leg (either the push/publish or the pull/play
// direction). Filled by pushInit()/pullInit(), released by the *Destory()
// counterparts.
struct RtmpConfig {
char url[1024]; // rtmp:// endpoint
AVFormatContext *formatCtx; // muxer (push) or demuxer (pull) context
AVStream *stream; // audio stream inside formatCtx (owned by formatCtx)
AVCodecContext *codecCtx; // AAC encoder (push) or decoder (pull)
SwrContext *swrCtx; // resampler between device format and codec format
};
// Per-direction call state shared between main() and the worker threads.
// main() zero-initializes it with memset (all members are pointers/PODs)
// and wires up the shared APM instances and ALSA config.
struct CallContext {
RtmpConfig rtmp; // RTMP push or pull pipeline
std::thread *rtmp_thread; // push_thread / pull_thread worker
std::thread *alsa_thread; // optional capture/play worker (currently unused)
std::mutex *mutex; // guards `list`
std::vector<audio_buf_t> *list; // PCM queue between producer and consumer
webrtc::AudioProcessing *apm_ns; // noise-suppression-only APM
webrtc::AudioProcessing *apm_agc; // gain-control-only APM
webrtc::AudioProcessing *apm_aec; // echo-cancellation APM
webrtc::StreamConfig *rtc_stream_config; // shared stream layout for APM calls
alsa::AlsaDev *alsa; // set by the worker that owns the device, else null
alsa::Config *alsa_config; // device parameters (rate/channels/format)
LegacyAgc *agcModule; // legacy AGC instance (shared by both contexts)
// rnnoise
RnNoiseCommonPlugin *rnnoise; // rnnoise denoiser (allocated in audio_thread)
bool rnnoise_enable; // toggled at runtime via the "-e" console command
float vadThreshold;// (0, 1)
uint32_t vadGracePeriodBlocks;// (0, 20)
uint32_t retroactiveVADGraceBlocks;// 0
//
bool running; // worker loop flag; NOTE(review): plain bool shared across threads — consider std::atomic<bool>
};
// Timestamps (from gettimeofday(), microseconds) used to estimate the
// render-to-capture delay passed to the AEC via set_stream_delay_ms().
static int64_t t_analyze = 0;
static int64_t t_render = 0;
static int64_t t_capture = 0;
static int64_t t_process = 0;
// Which side of the call this process is: 0 = A, 1 = B (set from argv[1]).
static int who = -1;
// RTMP pipeline setup/teardown and worker-thread entry points.
int pushInit(RtmpConfig *config);
void pushDestory(RtmpConfig *config);
int pullInit(RtmpConfig *config);
void pullDestory(RtmpConfig *config);
void audio_thread(CallContext *pushCTx, CallContext *pullCtx);
void capture_thread(CallContext *ctx);
void push_thread(CallContext *ctx);
void play_thread(CallContext *ctx);
void pull_thread(CallContext *ctx);
// Combined capture+playback worker (started by the "s" console command).
std::thread *audio_thread_ptr = nullptr;
// Entry point. argv[1] chooses the call side (0 = A, 1 = B), which swaps the
// push/pull RTMP URLs so two instances of this demo can talk to each other.
// After setting up logging, three WebRTC APM instances, the legacy AGC and
// the ALSA configuration, it runs an interactive stdin command loop:
//   q/quit  stop workers and exit
//   s       start audio_thread + push_thread
//   r       start pull_thread
//   -e N    enable/disable rnnoise   -t N  VAD threshold (%)   -p N  VAD grace
int main(int argc, char *argv[])
{
if (argc < 2) {
fprintf(stderr, "usage: %s who(0 or 1) \n", argv[0]);
return -1;
}
std::string push_a_url = "rtmp://192.168.15.248:1935/live/A";
std::string push_b_url = "rtmp://192.168.15.248:1935/live/B";
// Initialize the logging system (console + file, async writer).
Logger::Instance().add(std::make_shared<ConsoleChannel> ());
Logger::Instance().add(std::make_shared<FileChannel>());
Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
// WebRTC init: three separate APM instances, each specialized below.
webrtc::AudioProcessing *apm_ns = webrtc::AudioProcessingBuilder().Create();
webrtc::AudioProcessing *apm_agc = webrtc::AudioProcessingBuilder().Create();
webrtc::AudioProcessing *apm_aec = webrtc::AudioProcessingBuilder().Create();
webrtc::AudioProcessing::Config config;
// ns: noise suppression only (no AEC, no AGC2)
config = webrtcConfigInit();
config.echo_canceller.enabled = false;
config.gain_controller2.enabled = false;
apm_ns->ApplyConfig(config);
apm_ns->Initialize();
// agc: gain control only (no NS, no AEC)
config = webrtcConfigInit();
config.noise_suppression.enabled = false;
config.echo_canceller.enabled = false;
apm_agc->ApplyConfig(config);
apm_agc->Initialize();
// aec: echo cancellation + NS (no AGC2)
config = webrtcConfigInit();
config.noise_suppression.enabled = true;
config.gain_controller2.enabled = false;
apm_aec->ApplyConfig(config);
apm_aec->Initialize();
apm_aec->set_stream_analog_level(200);
// Shared stream layout used for every ProcessStream call: mono @ 48 kHz.
webrtc::StreamConfig streamConfig;
streamConfig.set_has_keyboard(false);
streamConfig.set_num_channels(MIX_AUDIO_CHANNELS);
streamConfig.set_sample_rate_hz(MIX_OUTPUT_RATE);
long vol = 89;
// apm->set_stream_analog_level(vol);
// Apply an initial system volume of 89%.
alsa_volume(&vol, true);
PrintI("webrtc params: {\n%s\n}\n", config.ToString().c_str());
// agc module (legacy AGC, adaptive digital mode)
LegacyAgc *agcInst = (LegacyAgc *)agc_init(MIX_AUDIO_RATE, kAgcModeAdaptiveDigital,
9,
3);
// rnnoise
float vad_threshold = 0.92;
// ALSA device parameters: mono S16LE @ 32 kHz, 10 ms periods.
alsa::Config alsaConfig;
sprintf(alsaConfig.device, "default");
alsaConfig.period_time = MIX_AUDIO_SAMPLES * 1000000 / MIX_AUDIO_RATE;
alsaConfig.buffer_time = 5 * alsaConfig.period_time;
alsaConfig.channels = MIX_AUDIO_CHANNELS;
alsaConfig.format = SND_PCM_FORMAT_S16_LE;
alsaConfig.rate = MIX_AUDIO_RATE;
// Per-direction contexts (memset is safe: members are pointers/PODs only).
CallContext pushCtx, pullCtx;
memset(&pushCtx, 0, sizeof(pushCtx));
memset(&pullCtx, 0, sizeof(pullCtx));
who = atoi(argv[1]);
if (0 == who) {
// a --> b
strcpy(pushCtx.rtmp.url, push_a_url.data());
strcpy(pullCtx.rtmp.url, push_b_url.data());
} else {
// b --> a
strcpy(pushCtx.rtmp.url, push_b_url.data());
strcpy(pullCtx.rtmp.url, push_a_url.data());
}
pushCtx.mutex = new std::mutex;
pushCtx.list = new std::vector<audio_buf_t>();
pushCtx.apm_ns = apm_ns;
pushCtx.apm_agc = apm_agc;
pushCtx.apm_aec = apm_aec;
pushCtx.rtc_stream_config = &streamConfig;
pushCtx.alsa_config = &alsaConfig;
pushCtx.vadThreshold = vad_threshold;
pushCtx.vadGracePeriodBlocks = 0;
pushCtx.retroactiveVADGraceBlocks = 0;
pullCtx.mutex = new std::mutex;
pullCtx.list = new std::vector<audio_buf_t>();
pullCtx.apm_ns = apm_ns;
pullCtx.apm_agc = apm_agc;
pullCtx.apm_aec = apm_aec;
pullCtx.rtc_stream_config = &streamConfig;
pullCtx.alsa_config = &alsaConfig;
pullCtx.vadThreshold = vad_threshold;
pullCtx.vadGracePeriodBlocks = 0;
pullCtx.retroactiveVADGraceBlocks = 0;
pullCtx.agcModule = agcInst;
pushCtx.agcModule = agcInst;
InfoL << "call start, caller: " << (who == 0 ? "A" : "B");
/*
char c;
bool quit = false;
while ((c = getchar()) != EOF && !quit)
{
switch (c)
{
case 'q': {
InfoL << "app quit";
quit = true;
pushCtx.running = false;
pullCtx.running = false;
if (audio_thread_ptr && audio_thread_ptr->joinable())
audio_thread_ptr->join();
if (pushCtx.rtmp_thread && pushCtx.rtmp_thread->joinable())
pushCtx.rtmp_thread->join();
pullCtx.running = false;
if (pullCtx.rtmp_thread && pullCtx.rtmp_thread->joinable())
pullCtx.rtmp_thread->join();
break;
}
case 's': {
InfoL << "start push: " << pushCtx.rtmp.url;
pushCtx.running = true;
audio_thread_ptr = new std::thread(audio_thread, &pushCtx, &pullCtx);
// pushCtx.alsa_thread = new std::thread(capture_thread, &pushCtx);
pushCtx.rtmp_thread = new std::thread(push_thread, &pushCtx);
break;
}
case 'r': {
InfoL << "start pull: " << pullCtx.rtmp.url;
pullCtx.running = true;
// pullCtx.alsa_thread = new std::thread(play_thread, &pullCtx);
pullCtx.rtmp_thread = new std::thread(pull_thread, &pullCtx);
break;
}
}
}
*/
// Interactive console loop.
// NOTE(review): `running` is a plain bool written here and read by the
// workers — a benign-looking data race; std::atomic<bool> would be safer.
std::string input_str;
while (getline(std::cin, input_str)) {
if (input_str == std::string("quit") ||
input_str == std::string("q")) {
InfoL << "app quit";
// quit = true;
pushCtx.running = false;
pullCtx.running = false;
if (audio_thread_ptr && audio_thread_ptr->joinable())
audio_thread_ptr->join();
if (pushCtx.rtmp_thread && pushCtx.rtmp_thread->joinable())
pushCtx.rtmp_thread->join();
pullCtx.running = false;
if (pullCtx.rtmp_thread && pullCtx.rtmp_thread->joinable())
pullCtx.rtmp_thread->join();
break;
}
else if (input_str == std::string("s")) {
InfoL << "start push: " << pushCtx.rtmp.url;
pushCtx.running = true;
audio_thread_ptr = new std::thread(audio_thread, &pushCtx, &pullCtx);
// pushCtx.alsa_thread = new std::thread(capture_thread, &pushCtx);
pushCtx.rtmp_thread = new std::thread(push_thread, &pushCtx);
}
else if (input_str == std::string("r")) {
InfoL << "start pull: " << pullCtx.rtmp.url;
pullCtx.running = true;
// pullCtx.alsa_thread = new std::thread(play_thread, &pullCtx);
pullCtx.rtmp_thread = new std::thread(pull_thread, &pullCtx);
}
else if (input_str.find("-e") == 0) {
// "-e N": toggle rnnoise on (N > 0) or off.
std::string enable_str = input_str.substr(2, input_str.size() - 2);
trim(enable_str);
pushCtx.rnnoise_enable = atoi(enable_str.c_str()) > 0 ? true : false;
InfoL << "rnnoise enable: " << pushCtx.rnnoise_enable;
}
else if (input_str.find("-t") == 0) {
// "-t N": VAD threshold as a percentage, clamped to [0, 0.99].
std::string th_str = input_str.substr(2, input_str.size() - 2);
trim(th_str);
int threshold = atoi(th_str.c_str());
pushCtx.vadThreshold = std::max(std::min(threshold / 100.f, 0.99f), 0.f);
InfoL << "VAD Threshold(%): " << pushCtx.vadThreshold;
}
else if (input_str.find("-p") == 0) {
// "-p N": VAD grace period in blocks, clamped to [0, 20].
std::string period_str = input_str.substr(2, input_str.size() - 2);
trim(period_str);
int period = atoi(period_str.c_str());
pushCtx.vadGracePeriodBlocks = std::max(std::min(period, 20), 0);
InfoL << "VAD Grace Period (ms): " << pushCtx.vadGracePeriodBlocks;
}
}
InfoL << "call end";
// Tear down the APM instances.
// NOTE(review): Initialize() before delete looks like a reset-to-default;
// its purpose here is unclear — verify whether it is actually needed.
if (apm_ns) {
apm_ns->Initialize();
delete apm_ns;
}
if (apm_agc) {
apm_agc->Initialize();
delete apm_agc;
}
if (apm_aec) {
apm_aec->Initialize();
delete apm_aec;
}
// Drain and free any PCM chunks left in the queues.
for (auto buf: *pushCtx.list) {
free(buf.data);
}
pushCtx.list->clear();
delete pushCtx.list;
for (auto buf: *pullCtx.list) {
free(buf.data);
}
pullCtx.list->clear();
delete pullCtx.list;
if (pushCtx.rtmp_thread) delete pushCtx.rtmp_thread;
if (pushCtx.alsa_thread) delete pushCtx.alsa_thread;
delete pushCtx.mutex;
if (pullCtx.rtmp_thread) delete pullCtx.rtmp_thread;
if (pullCtx.alsa_thread) delete pullCtx.alsa_thread;
delete pullCtx.mutex;
delete audio_thread_ptr;
return 0;
}
/**
 * Create the RTMP publishing pipeline: FLV muxer + AAC encoder + resampler
 * (device S16 @48 kHz -> planar float @44.1 kHz).
 *
 * On success fills config->{formatCtx, stream, codecCtx, swrCtx} and
 * returns 0; on failure releases everything it allocated and returns -1.
 *
 * Fixes over the previous revision:
 *  - missing encoder used `return -1` directly, leaking afctx/avio -> goto fail
 *  - `opts` dictionary was leaked on failure paths after it was populated
 *  - the "open codec" message used a "{}" placeholder in a printf-style format
 *  - swr_init() result is now checked
 */
int pushInit(RtmpConfig *config)
{
    if (nullptr == strstr(config->url, "rtmp://")) {
        PrintE("url error, url: %s\n", config->url);
        return -1;
    }
    AVCodec *codec = nullptr;
    AVCodecContext *codecCtx = nullptr;
    AVFormatContext *afctx = nullptr;
    AVCodecParameters *codecPar = nullptr;
    SwrContext *swrCtx = nullptr;
    AVStream *audio_st = nullptr;
    AVDictionary *opts = nullptr;
    int ret;
    // Open the FLV/RTMP output.
    ret = avformat_alloc_output_context2(&afctx, nullptr, "flv", config->url);
    if (ret < 0) {
        PrintE("open output failed.\n");
        goto fail;
    }
    if ( !(afctx->oformat->flags & AVFMT_NOFILE) ) {
        ret = avio_open(&afctx->pb, config->url, AVIO_FLAG_WRITE);
        if (ret < 0) {
            PrintE("avio_open failed.\n");
            goto fail;
        }
    }
    // Create the audio stream.
    audio_st = avformat_new_stream(afctx, codec);
    if (!audio_st) {
        PrintE("alloc new audio stream failed.\n");
        goto fail;
    }
    // Encoder parameters: mono AAC @ 44.1 kHz, 128 kbit/s, planar float.
    codecPar = afctx->streams[audio_st->index]->codecpar;
    codecPar->codec_id = AV_CODEC_ID_AAC;
    codecPar->codec_type = AVMEDIA_TYPE_AUDIO;
    codecPar->codec_tag = 0;
    codecPar->bit_rate = 128 * 1024;
    codecPar->sample_rate = 44100;
    codecPar->channel_layout = av_get_default_channel_layout(MIX_AUDIO_CHANNELS);
    codecPar->channels = av_get_channel_layout_nb_channels(codecPar->channel_layout);
    codecPar->format = AV_SAMPLE_FMT_FLTP;
    // Encoder init.
    codec = avcodec_find_encoder(codecPar->codec_id);
    if (!codec) {
        PrintE("find codec aac failed.\n");
        goto fail; // BUG FIX: was `return -1`, leaking afctx and the avio handle
    }
    codecCtx = avcodec_alloc_context3(codec);
    if (!codecCtx) {
        PrintE("alloc codec context failed.\n");
        goto fail;
    }
    ret = avcodec_parameters_to_context(codecCtx, codecPar);
    if (ret < 0) {
        PrintE("copt codec params failed.\n");
        goto fail;
    }
    // Disable buffering for low latency.
    av_dict_set(&opts, "fflags", "nobuffer", AV_DICT_MATCH_CASE);
    // av_dict_set(&opts, "rtmp_live", "1", AV_DICT_MATCH_CASE);
    // Open the encoder.
    ret = avcodec_open2(codecCtx, codec, &opts);
    if (ret < 0) {
        // BUG FIX: format used "{}" with a printf-style logger
        PrintE("open codec %d failed.\n", codec->id);
        goto fail;
    }
    audio_st->codecpar->codec_tag = 0;
    // Release the options dictionary (success path).
    av_dict_free(&opts);
    // Dump the output stream layout.
    av_dump_format(afctx, 0, config->url, 1);
    // Resampler init: device S16 @ 48 kHz -> encoder format.
    swrCtx = swr_alloc_set_opts(nullptr,
                                // output
                                codecCtx->channel_layout,
                                codecCtx->sample_fmt,
                                codecCtx->sample_rate,
                                // input
                                av_get_default_channel_layout(MIX_AUDIO_CHANNELS),
                                AV_SAMPLE_FMT_S16,
                                MIX_OUTPUT_RATE,
                                0, nullptr);
    if (!swrCtx) {
        PrintE("swr_alloc_set_opts failed.\n");
        goto fail;
    }
    ret = swr_init(swrCtx); // BUG FIX: result was previously ignored
    if (ret < 0) {
        PrintE("swr_init failed.\n");
        goto fail;
    }
    config->codecCtx = codecCtx;
    config->formatCtx = afctx;
    config->stream = audio_st;
    config->swrCtx = swrCtx;
    PrintI("rtmp push init ok.\n");
    return 0;
fail:
    av_dict_free(&opts); // BUG FIX: opts was leaked on failure paths
    if (afctx) {
        if (afctx->pb)
            avio_close(afctx->pb);
        avformat_free_context(afctx);
    }
    if (codecCtx) {
        avcodec_close(codecCtx);
        avcodec_free_context(&codecCtx);
    }
    if (swrCtx) {
        swr_close(swrCtx);
        swr_free(&swrCtx);
    }
    return -1;
}
/**
 * Tear down the push pipeline created by pushInit().
 *
 * Fix over the previous revision: pointers are now cleared after release,
 * so stale handles cannot be reused and a second call is a safe no-op.
 */
void pushDestory(RtmpConfig *config) {
    if (config->formatCtx) {
        if (config->formatCtx->pb)
            avio_close(config->formatCtx->pb);
        avformat_free_context(config->formatCtx);
        config->formatCtx = nullptr;
        config->stream = nullptr; // owned by formatCtx; would dangle otherwise
    }
    if (config->codecCtx) {
        avcodec_close(config->codecCtx);
        avcodec_free_context(&config->codecCtx); // also nulls the pointer
    }
    if (config->swrCtx) {
        swr_close(config->swrCtx);
        swr_free(&config->swrCtx); // also nulls the pointer
    }
}
// Create the RTMP playback pipeline: demuxer + decoder + resampler
// (stream format -> mono S16 @ 48 kHz). On success fills
// config->{formatCtx, stream, codecCtx, swrCtx} and returns 0; on failure
// cleans up and returns -1.
int pullInit(RtmpConfig *config)
{
if (nullptr == strstr(config->url, "rtmp://")) {
LogE("url error, url: %s\n", config->url);
return -1;
}
int ret = 0;
int scan_all_pmts_set = 0;
int st_index = -1;
AVDictionary *format_opts = nullptr;
AVFormatContext *ic = nullptr;
AVCodecParameters *codecPar = nullptr;
AVCodec *codec = nullptr;
AVCodecContext *codecCtx = nullptr;
SwrContext *swrCtx = nullptr;
ic = avformat_alloc_context();
if (!ic) {
// NOTE(review): throwing from a worker-thread path would terminate the
// process if uncaught — verify callers expect this.
throw(std::runtime_error("avformat_alloc_context failed."));
}
// ffplay-style workaround: scan all MPEG-TS PMTs unless caller overrode it.
if (!av_dict_get(format_opts, "scan_all_pmts", NULL, AV_DICT_MATCH_CASE)) {
av_dict_set(&format_opts, "scan_all_pmts", "1", AV_DICT_DONT_OVERWRITE);
scan_all_pmts_set = 1;
}
// Disable buffering for low latency.
av_dict_set(&format_opts, "fflags", "nobuffer", AV_DICT_MATCH_CASE);
// Cap the number of bytes used for stream probing.
av_dict_set(&format_opts, "probesize", "10000", AV_DICT_MATCH_CASE);
retry:
// Open the input stream.
// NOTE(review): the retry loop below can spin indefinitely with no delay
// while the remote stream has no audio — consider a sleep/retry limit.
ret = avformat_open_input(&ic, config->url, nullptr, &format_opts);
if (ret < 0) {
LogE("avformat_open_input failed.\n");
goto fail;
}
if (scan_all_pmts_set)
av_dict_set(&format_opts, "scan_all_pmts", nullptr, AV_DICT_MATCH_CASE);
av_format_inject_global_side_data(ic);
ret = avformat_find_stream_info(ic, nullptr);
if (ret < 0) {
// LOG(ERROR) << url << ": could not find codec parameters";
LogE("{} : could not find codec parameters\n", config->url);
goto fail;
}
if (ic->pb)
ic->pb->eof_reached = 0;
// Dump input stream parameters.
av_dump_format(ic, 0, config->url, 0);
st_index = av_find_best_stream(ic, AVMEDIA_TYPE_AUDIO, -1, -1, nullptr, 0);
if (st_index >= 0) {
//
config->stream = ic->streams[st_index];
}
else {
LogW("find audio stream failed, try again.\n");
avformat_close_input(&ic);
goto retry;
}
// Decoder init.
codecPar = config->stream->codecpar;
codec = avcodec_find_decoder(codecPar->codec_id);
if (!codec) {
LogE("find codec failed.\n");
goto fail;
}
codecCtx = avcodec_alloc_context3(codec);
if (!codecCtx) {
LogE("avcodec_alloc_context3 failed.\n");
goto fail;
}
ret = avcodec_parameters_to_context(codecCtx, codecPar);
if (ret < 0) {
LogE("avcodec_parameters_to_context\n");
goto fail;
}
codecCtx->time_base = config->stream->time_base;
// Open the decoder.
if (avcodec_open2(codecCtx, codec, nullptr) < 0){
LogE("avcodec_open2 failed\n");
goto fail;
}
// Resampler init: stream format -> mono S16 @ 48 kHz for local processing.
swrCtx = swr_alloc_set_opts(nullptr,
// output
av_get_default_channel_layout(MIX_AUDIO_CHANNELS),
AV_SAMPLE_FMT_S16,
MIX_OUTPUT_RATE,
// input
codecCtx->channel_layout,
codecCtx->sample_fmt,
codecCtx->sample_rate,
0, nullptr);
if (!swrCtx) {
LogE("swr_alloc_set_opts failed.\n");
goto fail;
}
swr_init(swrCtx);
config->formatCtx = ic;
config->codecCtx = codecCtx;
config->swrCtx = swrCtx;
config->stream->discard = AVDISCARD_DEFAULT;
av_dict_free(&format_opts);
return 0;
fail:
if (format_opts)
av_dict_free(&format_opts);
if (ic)
avformat_close_input(&ic);
if (codecCtx) {
avcodec_close(codecCtx);
avcodec_free_context(&codecCtx);
}
if (swrCtx) {
swr_close(swrCtx);
swr_free(&swrCtx);
}
return -1;
}
/**
 * Tear down the pull pipeline created by pullInit().
 *
 * Fixes over the previous revision: avformat_close_input() already frees the
 * demuxer context AND nulls config->formatCtx, so the old follow-up
 * avformat_free_context(config->formatCtx) was always invoked with nullptr
 * (a dead call) and has been dropped. Remaining pointers are released via
 * the FFmpeg helpers that null them, making a second call a safe no-op.
 */
void pullDestory(RtmpConfig *config)
{
    if (config->formatCtx) {
        avformat_close_input(&config->formatCtx); // frees and nulls formatCtx
        config->stream = nullptr; // was owned by the demuxer context
    }
    if (config->codecCtx) {
        avcodec_close(config->codecCtx);
        avcodec_free_context(&config->codecCtx); // also nulls the pointer
    }
    if (config->swrCtx) {
        swr_close(config->swrCtx);
        swr_free(&config->swrCtx); // also nulls the pointer
    }
}
// Combined playback + capture worker. Each loop iteration:
//   1. drains decoded far-end PCM from pullCtx->list into outBuffer,
//      feeds it to the AEC as the render (reverse) stream, optionally runs
//      rnnoise, downsamples 48k->32k and writes it to the ALSA playback dev;
//   2. reads one capture period from ALSA, upsamples 32k->48k, runs AGC and
//      AEC on it, and queues the processed chunk into pushCtx->list for the
//      RTMP pusher.
// The speaker stays muted until far-end audio has been flowing for ~1 s to
// avoid an initial echo burst.
// NOTE(review): agc_before_fp/agc_after_fp are opened but never fclosed, and
// inputFrame/outputFrame/swrCtx/playSwr/play_buffer/rnnoise/in/out are never
// released — leaks on every thread exit; verify whether this is acceptable
// for the demo.
void audio_thread(CallContext *pushCTx, CallContext *pullCtx)
{
pthread_setname_np(audio_thread_ptr->native_handle(), "audio_thread");
// Sound-card init (capture side).
alsa::AlsaDev usbCaptureDev;
if (usbCaptureDev.applyConfig(*pushCTx->alsa_config) < 0) {
PrintE("alsa config failed.\n");
return ;
}
if (usbCaptureDev.init(SND_PCM_STREAM_CAPTURE) < 0) {
PrintE("alsa init failed.\n");
return ;
}
PrintI("alsa init: %s\n", usbCaptureDev.configToString());
pushCTx->alsa = &usbCaptureDev;
uint8_t *capData = nullptr;
int buffer_size = usbCaptureDev.getFrames() * usbCaptureDev.getFrameSize();
capData = (uint8_t *)malloc(buffer_size);
assert(capData);
// Sound-card init (playback side).
alsa::AlsaDev usbPlaybackDev;
if (usbPlaybackDev.applyConfig(*pullCtx->alsa_config) < 0) {
PrintE("alsa config failed.\n");
return ;
}
// PrintI("alsa before init: %s\n", usbPlaybackDev.configToString());
if (usbPlaybackDev.init(SND_PCM_STREAM_PLAYBACK) < 0) {
PrintE("alsa init failed.\n");
return ;
}
PrintI("alsa init: %s\n", usbPlaybackDev.configToString());
pullCtx->alsa = &usbPlaybackDev;
// outBuffer accumulates one 10 ms render frame at the 48 kHz processing rate.
int sampleSize = 0;
int outSize = MIX_OUTPUT_SAMPLES * MIX_AUDIO_CHANNELS * sizeof(int16_t);
uint8_t *outBuffer = (uint8_t *)calloc(outSize, sizeof(uint8_t));
// rnnoise
pushCTx->rnnoise = new RnNoiseCommonPlugin(MIX_AUDIO_CHANNELS);
pushCTx->rnnoise->init();
pushCTx->rnnoise_enable = false;
pushCTx->vadThreshold = 0.92;
// Per-channel float scratch buffers for rnnoise (10 ms @ 48 kHz each).
std::vector<float *> in;
std::vector<float *> out;
for (int ch = 0; ch < MIX_AUDIO_CHANNELS; ++ch) {
in.push_back(new float[10 * MIX_OUTPUT_RATE / 1000]);
out.push_back(new float[10 * MIX_OUTPUT_RATE / 1000]);
}
// Capture resampler frames: device rate (32 kHz) -> processing rate (48 kHz).
AVFrame *inputFrame = av_frame_alloc();
{
inputFrame->format = AV_SAMPLE_FMT_S16;
inputFrame->channels = MIX_AUDIO_CHANNELS;
inputFrame->channel_layout = av_get_default_channel_layout(MIX_AUDIO_CHANNELS);
inputFrame->sample_rate = MIX_AUDIO_RATE;
inputFrame->nb_samples = MIX_AUDIO_SAMPLES;
int size = av_samples_get_buffer_size(nullptr,
inputFrame->channels, inputFrame->nb_samples, (AVSampleFormat)inputFrame->format, 1);
uint8_t *buffer = (uint8_t *)av_malloc(size);
avcodec_fill_audio_frame(inputFrame, inputFrame->channels, (AVSampleFormat)inputFrame->format,
(const uint8_t*)buffer, size, 1);
InfoL << "input frame samples: " << inputFrame->nb_samples
<< ", buffer_size: " << size;
}
AVFrame *outputFrame = av_frame_alloc();
{
outputFrame->format = AV_SAMPLE_FMT_S16;
outputFrame->channels = MIX_AUDIO_CHANNELS;
outputFrame->channel_layout = av_get_default_channel_layout(MIX_AUDIO_CHANNELS);
outputFrame->sample_rate = MIX_OUTPUT_RATE;
outputFrame->nb_samples = MIX_OUTPUT_SAMPLES;
int output_bz = av_samples_get_buffer_size(NULL, outputFrame->channels, outputFrame->nb_samples, (AVSampleFormat)outputFrame->format, 0);
uint8_t *samples_data = (uint8_t *)av_malloc(output_bz);
avcodec_fill_audio_frame(outputFrame, outputFrame->channels, (AVSampleFormat)outputFrame->format, samples_data, output_bz, 0);
InfoL << "output frame samples: " << outputFrame->nb_samples
<< ", buffer_size: " << output_bz;
}
SwrContext *swrCtx = swr_alloc_set_opts(nullptr,
outputFrame->channel_layout,
(AVSampleFormat)outputFrame->format,
outputFrame->sample_rate,
inputFrame->channel_layout,
(AVSampleFormat)inputFrame->format,
inputFrame->sample_rate,
0, nullptr);
if (!swrCtx) {
PrintE("swr_alloc_set_opts failed.\n");
return ;
}
swr_init(swrCtx);
int output_size = av_samples_get_buffer_size(NULL, outputFrame->channels, outputFrame->nb_samples, (AVSampleFormat)outputFrame->format, 0);
uint8_t *output_buf = outputFrame->data[0];
// Playback resampler: 48 kHz processed audio back down to the 32 kHz device.
// NOTE(review): playSwr is not checked for nullptr before swr_init.
SwrContext *playSwr = swr_alloc_set_opts(nullptr,
av_get_default_channel_layout(MIX_AUDIO_CHANNELS),
AV_SAMPLE_FMT_S16,
MIX_AUDIO_RATE,
av_get_default_channel_layout(MIX_AUDIO_CHANNELS),
AV_SAMPLE_FMT_S16,
MIX_OUTPUT_RATE,
0, nullptr);
swr_init(playSwr);
int play_size = av_samples_get_buffer_size(nullptr, MIX_AUDIO_CHANNELS, MIX_AUDIO_SAMPLES, AV_SAMPLE_FMT_S16, 0);
uint8_t* play_buffer = (uint8_t *)av_malloc(play_size);
// FILE *cap_fp = fopen(who == 0 ? "/root/aec_cap_a.pcm" : "/root/aec_cap_b.pcm", "wb");
// FILE *play_fp = fopen(who == 0 ? "/root/aec_play_a.pcm" : "/root/aec_play_b.pcm", "wb");
// FILE *agc_fp = fopen(who == 0 ? "/root/agc_a.pcm" : "/root/agc_b.pcm", "wb");
// FILE *ns_before = fopen("/root/ns_before.pcm", "wb");
// FILE *ns_after = fopen("/root/ns_after.pcm", "wb");
long volume = 0;
alsa_volume(&volume, false);
// Keep the speaker muted until far-end audio has flowed for ~1 s.
int64_t spk_time_out = 1000000;
int64_t curTime = 0;
speaker_enable(false);
FILE *agc_before_fp = fopen("/root/agc_before.pcm", "wb");
FILE *agc_after_fp = fopen("/root/agc_after.pcm", "wb");
while (pushCTx->running || pullCtx->running)
{
// Fill outBuffer with one frame's worth of decoded far-end audio.
if (sampleSize <= 0) sampleSize = outSize;
while (sampleSize > 0)
{
if (!pullCtx->running) break;
if (pullCtx->list->size() <= 0) {
break;
}
std::unique_lock<std::mutex> lck(*pullCtx->mutex);
auto data = pullCtx->list->begin();
// Consume at most what this chunk still holds.
int readSize = sampleSize < (data->size - data->index) ? sampleSize : (data->size - data->index);
memcpy(outBuffer + outSize - sampleSize, data->data + data->index, readSize);
sampleSize -= readSize;
data->index += readSize;
if (data->index >= data->size) {
free(data->data);
pullCtx->list->erase(pullCtx->list->begin());
}
}
if (sampleSize <= 0) {
// A full render frame is ready: start the speaker-unmute timer.
if (!curTime) curTime = gettimeofday();
if (spk_time_out > 0 && gettimeofday() - curTime > spk_time_out) {
speaker_enable(true);
spk_time_out = 0;
InfoL << "------------------------------- enable speaker";
}
// Audio processing: feed far-end audio to the AEC as the render stream.
{
// ctx->apm->ProcessStream((int16_t *)outBuffer, *ctx->rtc_stream_config, *ctx->rtc_stream_config, (int16_t *)outBuffer);
t_analyze = gettimeofday();
// pullCtx->apm_agc->ProcessReverseStream((int16_t *)outBuffer, *pullCtx->rtc_stream_config, *pullCtx->rtc_stream_config, (int16_t *)outBuffer);
// pullCtx->apm_ns->ProcessReverseStream((int16_t *)outBuffer, *pullCtx->rtc_stream_config, *pullCtx->rtc_stream_config, (int16_t *)outBuffer);
pullCtx->apm_aec->ProcessReverseStream((int16_t *)outBuffer, *pullCtx->rtc_stream_config, *pullCtx->rtc_stream_config, (int16_t *)outBuffer);
}
// Stream delay estimate from the four loop timestamps (microseconds).
{
int64_t delay = t_render - t_analyze + t_process - t_capture;
// pullCtx->apm_agc->set_stream_delay_ms(delay / 1000);
// pullCtx->apm_ns->set_stream_delay_ms(delay / 1000);
pullCtx->apm_aec->set_stream_delay_ms(delay / 1000);
InfoL << "set_stream_delay_ms: " << pullCtx->apm_aec->stream_delay_ms();
}
// rnnoise noise suppression (optional, toggled via console "-e").
if (pushCTx->rnnoise_enable) {
int16_t *ptr = (int16_t *)outBuffer;
int nb_samples = outputFrame->nb_samples;
// De-interleave S16 into per-channel float buffers.
for(int i = 0; i < nb_samples; ++i) {
for (int ch = 0; ch < MIX_AUDIO_CHANNELS; ++ch) {
in[ch][i] = ptr[i * MIX_AUDIO_CHANNELS + ch];
}
}
const float *input[] = {in[0]};
float *output[] = {out[0]};
for (int i = 0; i < nb_samples; ++i)
for (int ch = 0; ch < MIX_AUDIO_CHANNELS; ++ch)
output[ch][i] = input[ch][i];
pushCTx->rnnoise->process(input, output, nb_samples,
pushCTx->vadThreshold, pushCTx->vadGracePeriodBlocks, pushCTx->retroactiveVADGraceBlocks);
// for (int ch = 0; ch < MIX_AUDIO_CHANNELS; ++ch)
// rnnoise_process_frame(rnn, output[ch], input[ch]);
// PrintI("rnnoise process: vadThreshold=%lf, vadGracePeriodBlocks=%d, retroactiveVADGraceBlocks=%d\n",
// ctx->vadThreshold, ctx->vadGracePeriodBlocks, ctx->retroactiveVADGraceBlocks);
// Re-interleave back into the S16 buffer.
for (int i = 0; i < nb_samples; ++i)
for (int ch = 0; ch < MIX_AUDIO_CHANNELS; ++ch)
ptr[i * MIX_AUDIO_CHANNELS + ch] = output[ch][i];
}
// Resample 48 kHz processed audio down to the 32 kHz playback device.
{
const uint8_t* in[1] = {outBuffer};
uint8_t *out[1] = {play_buffer};
int len = swr_convert(playSwr, out, MIX_AUDIO_SAMPLES, in, MIX_OUTPUT_SAMPLES);
int total = len;
while (len > 0) {
len = swr_convert(playSwr, out, MIX_AUDIO_SAMPLES, nullptr, 0);
total += len;
}
if (total != MIX_AUDIO_SAMPLES)
WarnL << "play swr convert size: " << total << " != " << MIX_AUDIO_SAMPLES;
}
t_render = gettimeofday();
usbPlaybackDev.write(play_buffer, play_size);
}
// Capture one period from the microphone.
t_capture = gettimeofday();
size_t read_size = usbCaptureDev.read(capData, buffer_size);
// PrintI("alsa read %d\n", read_size);
if (read_size <= 0) {
msleep(1);
continue;
}
// Resample captured audio 32 kHz -> 48 kHz into outputFrame/output_buf.
memcpy(inputFrame->data[0], capData, buffer_size);
{
const uint8_t** in = (const uint8_t**)inputFrame->data;
uint8_t **out = outputFrame->data;
int len2, out_data_size;
len2 = swr_convert(swrCtx, out, outputFrame->nb_samples, in, inputFrame->nb_samples);
if (len2 < 0) {
printf("swr_convert failed. \n");
break;
}
int out_size = len2;
while (len2 > 0) {
len2 = swr_convert(swrCtx, out, outputFrame->nb_samples, nullptr, 0);
out_size += len2;
}
}
// Gain processing on the captured (near-end) audio.
{
pushCTx->apm_agc->ProcessStream((int16_t *)output_buf, *pushCTx->rtc_stream_config, *pushCTx->rtc_stream_config, (int16_t *)output_buf);
}
// Noise suppression (currently disabled; AEC instance also runs NS).
{
// pushCTx->apm_ns->ProcessStream((int16_t *)output_buf, *pushCTx->rtc_stream_config, *pushCTx->rtc_stream_config, (int16_t *)output_buf);
}
#if 0
if (pullCtx->apm->recommended_stream_analog_level() != volume)
alsa_volume(&volume, true);
pushCTx->apm->set_stream_analog_level(volume);
if (pushCTx->apm->GetStatistics().voice_detected) {
alsa::vol_scaler_run((int16_t *)capData, buffer_size/2, 100);
InfoL << "---- voice_detected";
} else {
alsa::vol_scaler_run((int16_t *)capData, buffer_size/2, 1);
}
#endif
#if 0
{
alsa::vol_scaler_run((int16_t *)capData, buffer_size/2, 100);
uint8_t agcBuf[buffer_size];
int samples = 160;
int times = usbCaptureDev.getFrames() * 2 / samples;
int16_t *in = (int16_t *)capData;
int16_t *out = (int16_t *)agcBuf;
for (int i = 0; i < times; ++i) {
agc_process(pushCTx->agcModule, in + i*samples, out + i*samples, MIX_AUDIO_RATE, samples);
}
memcpy(capData, agcBuf, buffer_size);
}
#endif
// fwrite(output_buf, 1, output_size, agc_before_fp);
// Echo cancellation on the near-end (capture) stream.
{
t_process = gettimeofday();
// pushCTx->apm_agc->ProcessStream((int16_t *)capData, *pushCTx->rtc_stream_config, *pushCTx->rtc_stream_config, (int16_t *)capData);
// alsa::vol_scaler_run((int16_t *)capData, buffer_size/2, 100);
// pushCTx->apm_agc->ProcessStream((int16_t *)capData, *pushCTx->rtc_stream_config, *pushCTx->rtc_stream_config, (int16_t *)capData);
// pushCTx->apm_ns->ProcessStream((int16_t *)capData, *pushCTx->rtc_stream_config, *pushCTx->rtc_stream_config, (int16_t *)capData);
pushCTx->apm_aec->ProcessStream((int16_t *)output_buf, *pushCTx->rtc_stream_config, *pushCTx->rtc_stream_config, (int16_t *)output_buf);
// pushCTx->apm_ns->ProcessStream((int16_t *)capData, *pushCTx->rtc_stream_config, *pushCTx->rtc_stream_config, (int16_t *)capData);
// alsa::vol_scaler_run((int16_t *)capData, buffer_size/2, 100);
}
// fwrite(output_buf, 1, output_size, agc_after_fp);
// Queue a private copy of the processed chunk for the RTMP pusher.
{
uint8_t *buffer = (uint8_t *)malloc(output_size);
memcpy(buffer, output_buf, output_size);
std::unique_lock<std::mutex> lck(*pushCTx->mutex);
audio_buf_t out;
out.data = buffer;
out.index = 0;
out.size = output_size;
pushCTx->list->emplace_back(out);
}
// volume = pullCtx->apm->recommended_stream_analog_level();
}
usbPlaybackDev.destory();
free(outBuffer);
pullCtx->alsa = nullptr;
usbCaptureDev.destory();
if (capData) free(capData);
pushCTx->alsa = nullptr;
}
// Standalone microphone-capture loop (currently unused — audio_thread handles
// both directions). Reads PCM periods from ALSA, runs them through the AEC
// processor in place, and appends a copy of each processed chunk to
// ctx->list for the RTMP pusher.
void capture_thread(CallContext *ctx)
{
    // Bring up the capture device from the shared ALSA configuration.
    alsa::AlsaDev micDev;
    if (micDev.applyConfig(*ctx->alsa_config) < 0) {
        PrintE("alsa config failed.\n");
        return ;
    }
    if (micDev.init(SND_PCM_STREAM_CAPTURE) < 0) {
        PrintE("alsa init failed.\n");
        return ;
    }
    PrintI("alsa init: %s\n", micDev.configToString());
    ctx->alsa = &micDev;
    const int chunkBytes = micDev.getFrames() * micDev.getFrameSize();
    uint8_t *pcm = (uint8_t *)malloc(chunkBytes);
    assert(pcm);
    while (ctx->running) {
        // Capture one period from the device.
        t_capture = gettimeofday();
        size_t got = micDev.read(pcm, chunkBytes);
        // PrintI("alsa read %d\n", got);
        if (got <= 0) {
            msleep(1);
            continue;
        }
        // Echo/noise processing in place, serialized with the consumer.
        {
            std::unique_lock<std::mutex> guard(*ctx->mutex);
            t_process = gettimeofday();
            ctx->apm_aec->ProcessStream((int16_t *)pcm, *ctx->rtc_stream_config, *ctx->rtc_stream_config, (int16_t *)pcm);
        }
        // Hand a private copy of the processed chunk to the shared queue.
        {
            uint8_t *copy = (uint8_t *)malloc(chunkBytes);
            memcpy(copy, pcm, chunkBytes);
            std::unique_lock<std::mutex> guard(*ctx->mutex);
            audio_buf_t chunk;
            chunk.data = copy;
            chunk.index = 0;
            chunk.size = chunkBytes;
            ctx->list->emplace_back(chunk);
        }
    }
    micDev.destory();
    if (pcm) free(pcm);
    ctx->alsa = nullptr;
}
void push_thread(CallContext *ctx)
{
    // Push thread: drains processed capture audio from ctx->list, resamples
    // it to the encoder's format, encodes it, and interleave-writes the
    // packets to the RTMP output opened by pushInit().
    pthread_setname_np(ctx->rtmp_thread->native_handle(), "push_thread");
    RtmpConfig rtmp;
    if (pushInit(&ctx->rtmp) < 0) {
        return ;
    }
    memcpy(&rtmp, &ctx->rtmp, sizeof(rtmp));
    AVRational av;      // time base of the raw input samples: 1/sample_rate
    int64_t pts = 0;
    AVPacket *pkt = av_packet_alloc();
    int ret;
    av.den = rtmp.codecCtx->sample_rate;
    av.num = 1;
    // Input frame: interleaved S16 audio in the mixer output format.
    AVFrame *inputFrame = av_frame_alloc();
    {
        inputFrame->sample_rate = MIX_OUTPUT_RATE;
        inputFrame->format = AV_SAMPLE_FMT_S16;
        inputFrame->channels = MIX_AUDIO_CHANNELS;
        // Scale the classic 1024-sample AAC frame from 44.1 kHz to our rate.
        inputFrame->nb_samples = 1024 * MIX_OUTPUT_RATE / 44100;
        inputFrame->channel_layout = av_get_default_channel_layout(MIX_AUDIO_CHANNELS);
        int size = av_samples_get_buffer_size(nullptr,
                inputFrame->channels, inputFrame->nb_samples, (AVSampleFormat)inputFrame->format, 1);
        uint8_t *buffer = (uint8_t *)av_malloc(size);
        avcodec_fill_audio_frame(inputFrame, inputFrame->channels, (AVSampleFormat)inputFrame->format,
                                 (const uint8_t*)buffer, size, 1);
    }
    // Output frame: whatever format/layout/rate the encoder expects.
    AVFrame *outputFrame = av_frame_alloc();
    {
        outputFrame->format = rtmp.codecCtx->sample_fmt;
        outputFrame->channel_layout = rtmp.codecCtx->channel_layout;
        outputFrame->sample_rate = rtmp.codecCtx->sample_rate;
        outputFrame->nb_samples = rtmp.codecCtx->frame_size;
        outputFrame->channels = rtmp.codecCtx->channels;
        int output_bz = av_samples_get_buffer_size(NULL, outputFrame->channels, outputFrame->nb_samples, (AVSampleFormat)outputFrame->format, 0);
        uint8_t *samples_data = (uint8_t *)av_malloc(output_bz);
        avcodec_fill_audio_frame(outputFrame, outputFrame->channels, (AVSampleFormat)outputFrame->format, samples_data, output_bz, 0);
    }
    // 写入帧头 (write the container header)
    ret = avformat_write_header(rtmp.formatCtx, nullptr);
    if (ret < 0) {
        PrintE("avformat_write_header failed.\n");
        // BUGFIX: the old code returned here and leaked the packet, both
        // frames (and their av_malloc'd buffers) and the rtmp context.
        av_packet_free(&pkt);
        av_frame_free(&inputFrame);
        av_frame_free(&outputFrame);
        pushDestory(&ctx->rtmp);
        memset(&rtmp, 0, sizeof(rtmp));
        return ;
    }
    int frames = 0;  // samples still missing to fill one inputFrame
    while (ctx->running) {
        if (frames <= 0) frames = inputFrame->nb_samples;
        // Fill inputFrame from the queued capture buffers.
        while (frames > 0)
        {
            std::unique_lock<std::mutex> lck(*ctx->mutex);
            // BUGFIX: check emptiness while holding the lock; the old code
            // read ctx->list->size() unlocked, racing with the producer.
            if (ctx->list->empty()) break;
            auto nsData = ctx->list->begin();
            int needSize = frames * sizeof(int16_t) * inputFrame->channels;
            int readSize = (nsData->size - nsData->index) >= needSize ? needSize : (nsData->size - nsData->index);
            memcpy(inputFrame->data[0] + (inputFrame->nb_samples - frames)*sizeof(int16_t)*inputFrame->channels, nsData->data + nsData->index, readSize);
            frames -= readSize/(sizeof(int16_t) * inputFrame->channels);
            nsData->index += readSize;
            if (nsData->index >= nsData->size) {
                free(nsData->data);
                ctx->list->erase(ctx->list->begin());
            }
        }
        if (frames > 0) {
            msleep(20);  // not enough captured audio yet; wait for the producer
            continue;
        }
        // 重采样 (resample into the encoder format)
        {
            const uint8_t** in = (const uint8_t**)inputFrame->data;
            uint8_t **out = outputFrame->data;
            int len2;
            len2 = swr_convert(rtmp.swrCtx, out, outputFrame->nb_samples, in, inputFrame->nb_samples);
            if (len2 < 0) {
                printf("swr_convert failed. \n");
                break;
            }
            // NOTE(review): this drain loop writes flushed samples over the
            // data just converted into `out` instead of appending after it —
            // confirm the original intent before changing the drain logic.
            while (len2 > 0)
                len2 = swr_convert(rtmp.swrCtx, out, outputFrame->nb_samples, nullptr, 0);
        }
        // 推流到远端 (encode and push to the remote peer)
        // BUGFIX: `pts > INT64_MAX` can never be true for an int64_t, so the
        // old guard was dead code; reset well before the counter can overflow.
        if (pts > INT64_MAX / 2) pts = 0;
        outputFrame->pts = pts;
        pts += av_rescale_q(outputFrame->nb_samples, av, rtmp.codecCtx->time_base);
        ret = avcodec_send_frame(rtmp.codecCtx, outputFrame);
        if (ret < 0) {
            PrintE("avcodec_send_frame failed: %d\n", ret);
            break;
        }
        while (ret >= 0) {
            ret = avcodec_receive_packet(rtmp.codecCtx, pkt);
            if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
                break;  // encoder needs more input / is flushed
            } else if (ret < 0) {
                fprintf(stderr, "Error during encoding\n");
                break;
            }
            // 将数据包时间戳从编码器时间基转换到流时间基
            // (rescale packet timestamps from codec to stream time base)
            pkt->stream_index = rtmp.stream->index;
            av_packet_rescale_ts(pkt, rtmp.codecCtx->time_base, rtmp.stream->time_base);
            pkt->duration = av_rescale_q(pkt->duration, rtmp.codecCtx->time_base, rtmp.stream->time_base);
            // 写入数据包到输出媒体文件 (hand the packet to the muxer)
            ret = av_interleaved_write_frame(rtmp.formatCtx, pkt);
            if (ret < 0) {
                fprintf(stderr, "Error while writing audio frame\n");
                break;
            }
            // 释放数据包 (drop our reference to the packet payload)
            av_packet_unref(pkt);
        }
    }
    if (ctx->running)
        ctx->running = false;  // signal the sibling threads to stop too
    // 写入帧尾 (write the container trailer)
    av_write_trailer(rtmp.formatCtx);
    // 释放线程资源 (release thread-local resources)
    av_packet_free(&pkt);
    av_frame_free(&inputFrame);
    av_frame_free(&outputFrame);
    pushDestory(&ctx->rtmp);
    memset(&rtmp, 0, sizeof(rtmp));
}
void play_thread(CallContext *ctx)
{
    // Playback thread: pulls decoded audio from ctx->list, writes it to the
    // ALSA playback device, and feeds the rendered signal into the WebRTC
    // AEC reverse stream so the capture side can cancel the echo.
    alsa::AlsaDev usbPlaybackDev;
    if (usbPlaybackDev.applyConfig(*ctx->alsa_config) < 0) {
        PrintE("alsa config failed.\n");
        return ;
    }
    if (usbPlaybackDev.init(SND_PCM_STREAM_PLAYBACK) < 0) {
        PrintE("alsa init failed.\n");
        return ;
    }
    PrintI("alsa init: %s\n", usbPlaybackDev.configToString());
    ctx->alsa = &usbPlaybackDev;
    // One playback chunk: 10 ms of S16 audio at the mixer rate/channels.
    int outSize = MIX_AUDIO_SAMPLES * MIX_AUDIO_CHANNELS * sizeof(int16_t);
    int sampleSize = 0;  // bytes still missing to complete the current chunk
    uint8_t *outBuffer = (uint8_t *)calloc(outSize, sizeof(uint8_t));
    while (ctx->running) {
        // 获取 MIX_INPUT_SAMPLES 长度的解码音频填充到outBuffer中
        // (fill outBuffer with one full chunk of decoded audio)
        if (sampleSize <= 0) sampleSize = outSize;
        while (sampleSize > 0)
        {
            if (!ctx->running) break;
            std::unique_lock<std::mutex> lck(*ctx->mutex);
            // BUGFIX: emptiness is now checked under the lock; the old code
            // read ctx->list->size() before locking, racing with the producer.
            if (ctx->list->empty()) break;
            auto data = ctx->list->begin();
            int readSize = sampleSize < (data->size - data->index) ? sampleSize : (data->size - data->index);
            memcpy(outBuffer + outSize - sampleSize, data->data + data->index, readSize);
            sampleSize -= readSize;
            data->index += readSize;
            if (data->index >= data->size) {
                free(data->data);
                ctx->list->erase(ctx->list->begin());
            }
        }
        if (sampleSize <= 0) {
            t_render = gettimeofday();
            usbPlaybackDev.write(outBuffer, outSize);
            // 音频处理 (feed the far-end/reference signal to the AEC)
            {
                std::unique_lock<std::mutex> lck(*ctx->mutex);
                t_analyze = gettimeofday();
                ctx->apm_aec->ProcessReverseStream((int16_t *)outBuffer, *ctx->rtc_stream_config, *ctx->rtc_stream_config, (int16_t *)outBuffer);
                int64_t delay = t_render - t_analyze + t_process - t_capture;
                // NOTE(review): `delay` is computed but never used — the delay
                // reported to the AEC is hard-coded to 0. Confirm the units of
                // gettimeofday() before wiring `delay` into this call.
                ctx->apm_aec->set_stream_delay_ms(0);
                InfoL << "set_stream_delay_ms: " << ctx->apm_aec->stream_delay_ms();
            }
        } else {
            // BUGFIX: the old code busy-waited at 100% CPU while the decode
            // queue was empty; yield briefly before polling again.
            msleep(1);
        }
    }
    usbPlaybackDev.destory();
    free(outBuffer);
    ctx->alsa = nullptr;
}
void pull_thread(CallContext *ctx)
{
    // Pull thread: reads the remote RTMP audio stream, decodes it, resamples
    // the decoded frames to S16 at the mixer channel count, and queues the
    // PCM on ctx->list for the playback thread.
    pthread_setname_np(ctx->rtmp_thread->native_handle(), "pull_thread");
    int ret;
    RtmpConfig rtmp;
    if (pullInit(&ctx->rtmp) < 0) {
        return ;
    }
    memcpy(&rtmp, &ctx->rtmp, sizeof(rtmp));
    AVPacket *pkt = av_packet_alloc();
    AVFrame *outputFrame = av_frame_alloc();
    int maxBuffSize = 1024 * 4 * 2;  // resample scratch buffer size in bytes
    uint8_t *swrBuffer = (uint8_t *)calloc(maxBuffSize, sizeof(uint8_t));
    while (ctx->running) {
        ret = av_read_frame(rtmp.formatCtx, pkt);
        if (ret < 0) {
            // BUGFIX: the old code ignored read errors entirely and spun on
            // av_read_frame at 100% CPU after EOF/disconnect. Retry transient
            // EAGAIN with a short sleep; stop on EOF or fatal errors.
            if (ret == AVERROR(EAGAIN)) {
                av_usleep(10*1000);
                continue;
            }
            break;
        }
        if (pkt->stream_index == rtmp.stream->index) {
            ret = avcodec_send_packet(rtmp.codecCtx, pkt);
            if (ret == AVERROR(EAGAIN)) {
                // NOTE(review): the packet is dropped here instead of being
                // resent after draining the decoder — confirm this is intended.
                LogW("send packet again.\n");
                av_packet_unref(pkt);  // BUGFIX: don't leak the packet on this path
                av_usleep(10*1000);
                continue;
            }
            else if (ret < 0) {
                LogE("send packet error ret={}\n", ret);
                break;
            }
            while ( avcodec_receive_frame(rtmp.codecCtx, outputFrame) >= 0 ) {
                // Convert the decoded frame to interleaved S16 in swrBuffer.
                int outSamples = swr_convert(rtmp.swrCtx, &swrBuffer, maxBuffSize/(sizeof(int16_t) * MIX_AUDIO_CHANNELS),
                                             (uint8_t const **) (outputFrame->data), outputFrame->nb_samples);
                // BUGFIX: a negative swr_convert result would have produced a
                // negative size and a huge calloc; skip failed conversions.
                if (outSamples <= 0) continue;
                // BUGFIX: `size` was declared twice (inner shadowing outer);
                // compute it once.
                int size = outSamples * MIX_AUDIO_CHANNELS * av_get_bytes_per_sample(AV_SAMPLE_FMT_S16);
                {
                    uint8_t *buffer = (uint8_t *)calloc(size, sizeof(uint8_t));
                    memcpy(buffer, swrBuffer, size);
                    std::unique_lock<std::mutex> lck(*ctx->mutex);
                    audio_buf_t out;
                    out.data = buffer;
                    out.index = 0;
                    out.size = size;
                    ctx->list->emplace_back(out);
                }
            }
            av_frame_unref(outputFrame);
        }
        av_packet_unref(pkt);
    }
    if (ctx->running)
        ctx->running = false;  // signal the sibling threads to shut down
    pullDestory(&ctx->rtmp);
    memset(&rtmp, 0, sizeof(rtmp));
    av_packet_free(&pkt);
    av_frame_free(&outputFrame);
    free(swrBuffer);
}