Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
427 views
in Technique[技术] by (71.8m points)

go - RTMP Server forward to ffmpeg

I'm looking into for a solution to accept rtmp connections and after demux forward to ffmpeg pipe. Here is the solution I tried, I just put the part of Handeling Connection since its a long code based on: https://github.com/gwuhaolin/livego

func (s *Server) handleConn(conn *core.Conn) error {
    if err := conn.HandshakeServer(); err != nil {
        conn.Close()
        log.Println("handleConn HandshakeServer err: ", err)
        return err
    }
    connServer := core.NewConnServer(conn)

    if err := connServer.ReadMsg(); err != nil {
        conn.Close()
        log.Println("handleConn read msg err: ", err)
        return err
    }
    appname, name, url := connServer.GetInfo()
        if connServer.IsPublisher() {
        //todo: check token if is valid
        connServer.PublishInfo.Name = appname
        reader := NewVirReader(connServer)
        s.handler.HandleReader(reader)
        log.Println("new publisher: %+v", reader.Info())

        if s.getter != nil {
            writeType := reflect.TypeOf(s.getter)
            log.Println("handleConn:writeType=%v", writeType)
            writer := s.getter.GetWriter(reader.Info())
            s.handler.HandleWriter(writer)
        }
        flvWriter := new(flvBus.Bus)
        flvWriter.SetBackend(q)
        s.handler.HandleWriter(flvWriter.GetWriter(reader.Info()))
       } else {
        writer := NewVirWriter(connServer)
        log.Println("new player: %+v", writer.Info())
        s.handler.HandleWriter(writer)
    }

    return nil
}

then on flv bus: assume that ctx is the pipe0 of ffmpeg.

package flvBus

import (
    "github.com/gwuhaolin/livego/av"
    "github.com/gwuhaolin/livego/protocol/amf"
    "github.com/gwuhaolin/livego/utils/pio"
    "github.com/gwuhaolin/livego/utils/uid"
    "time"
)

var (
    flvHeader = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09}
)

const (
    headerLen = 11
)

type FLVWriter struct {
    Uid string
    av.RWBaser
    app, title, url string
    buf             []byte
    closed          chan struct{}
    ctx             Writer
}

func NewFLVWriter(app, title, url string, ctx Writer) *FLVWriter {
    ret := &FLVWriter{
        Uid:     uid.NewId(),
        app:     app,
        title:   title,
        url:     url,
        ctx:     ctx,
        RWBaser: av.NewRWBaser(time.Second * 10),
        closed:  make(chan struct{}),
        buf:     make([]byte, headerLen),
    }

    ret.ctx.Write(flvHeader)
    pio.PutI32BE(ret.buf[:4], 0)
    ret.ctx.Write(ret.buf[:4])
    return ret
}

func (writer *FLVWriter) Write(p *av.Packet) error {
    writer.RWBaser.SetPreTime()
    h := writer.buf[:headerLen]

    typeID := av.TAG_VIDEO
    if !p.IsVideo {
        if p.IsMetadata {
            var err error
            typeID = av.TAG_SCRIPTDATAAMF0
            p.Data, err = amf.MetaDataReform(p.Data, amf.DEL)
            if err != nil {
                return err
            }
        } else {
            typeID = av.TAG_AUDIO
        }
    }
    dataLen := len(p.Data)
    timestamp := p.TimeStamp
    timestamp += writer.BaseTimeStamp()
    writer.RWBaser.RecTimeStamp(timestamp, uint32(typeID))

    preDataLen := dataLen + headerLen
    timestampbase := timestamp & 0xffffff
    timestampExt := timestamp >> 24 & 0xff

    pio.PutU8(h[0:1], uint8(typeID))
    pio.PutI24BE(h[1:4], int32(dataLen))
    pio.PutI24BE(h[4:7], int32(timestampbase))
    pio.PutU8(h[7:8], uint8(timestampExt))

    if _, err := writer.ctx.Write(h); err != nil {
        return err
    }

    if _, err := writer.ctx.Write(p.Data); err != nil {
        return err
    }

    pio.PutI32BE(h[:4], int32(preDataLen))
    if _, err := writer.ctx.Write(h[:4]); err != nil {
        return err
    }

    return nil
}

func (writer *FLVWriter) Wait() {
    select {
    case <-writer.closed:
        return
    }
}

func (writer *FLVWriter) Close(error) {
    //writer.ctx.Close()
    close(writer.closed)
}

func (writer *FLVWriter) Info() (ret av.Info) {
    ret.UID = writer.Uid
    ret.URL = writer.url
    ret.Key = writer.app + "/" + writer.title
    return
}

but at the end it return me pipe:0: Invalid data found when processing input.

How can I forward captured frames to ffmpeg? Also any other solutions are welcome.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)
等待大神答复

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...