Skip to content
42 changes: 41 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ require (
git.torproject.org/pluggable-transports/snowflake.git/v2 v2.3.0
github.com/AlecAivazis/survey/v2 v2.3.5
github.com/alecthomas/kingpin v2.2.6+incompatible
github.com/anacrolix/dht/v2 v2.19.3-0.20221129012050-3231c919e67b
github.com/anacrolix/torrent v1.47.1-0.20221102120345-c63f7e1bd720
github.com/apex/log v1.9.0
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
github.com/cretz/bine v0.2.0
Expand Down Expand Up @@ -44,9 +46,48 @@ require (
)

require (
crawshaw.io/sqlite v0.3.3-0.20210127221821-98b1f83c5508 // indirect
github.com/RoaringBitmap/roaring v1.2.1 // indirect
github.com/ajwerner/btree v0.0.0-20211221152037-f427b3e689c0 // indirect
github.com/alecthomas/atomic v0.1.0-alpha2 // indirect
github.com/anacrolix/chansync v0.3.0 // indirect
github.com/anacrolix/envpprof v1.2.1 // indirect
github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60 // indirect
github.com/anacrolix/go-libutp v1.2.0 // indirect
github.com/anacrolix/log v0.13.2-0.20220711050817-613cb738ef30 // indirect
github.com/anacrolix/missinggo v1.3.0 // indirect
github.com/anacrolix/missinggo/perf v1.0.0 // indirect
github.com/anacrolix/missinggo/v2 v2.7.0 // indirect
github.com/anacrolix/mmsg v1.0.0 // indirect
github.com/anacrolix/multiless v0.3.0 // indirect
github.com/anacrolix/stm v0.4.0 // indirect
github.com/anacrolix/sync v0.4.0 // indirect
github.com/anacrolix/upnp v0.1.3-0.20220123035249-922794e51c96 // indirect
github.com/anacrolix/utp v0.1.0 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/benbjohnson/immutable v0.3.0 // indirect
github.com/bits-and-blooms/bitset v1.2.2 // indirect
github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/huandu/xstrings v1.3.2 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/lispad/go-generics-tools v1.1.0 // indirect
github.com/mattn/go-sqlite3 v1.14.14 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417 // indirect
github.com/segmentio/fasthash v1.0.3 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/tidwall/btree v1.3.1 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.opentelemetry.io/otel v1.8.0 // indirect
go.opentelemetry.io/otel/trace v1.8.0 // indirect
golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect
)

require (
Expand Down Expand Up @@ -84,7 +125,6 @@ require (
github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect
github.com/marten-seemann/qtls-go1-19 v0.1.0 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-sqlite3 v1.14.15 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
github.com/mroth/weightedrand v0.4.1 // indirect
Expand Down
168 changes: 161 additions & 7 deletions go.sum

Large diffs are not rendered by default.

Binary file not shown.
277 changes: 277 additions & 0 deletions internal/engine/experiment/bittorrent/bittorrent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
package bittorrent

import (
"bytes"
"context"
"fmt"
"github.com/pkg/errors"
"net"
"net/url"
"os"
"time"

"github.com/anacrolix/dht/v2"
"github.com/anacrolix/torrent"
"github.com/ooni/probe-cli/v3/internal/engine/experiment/urlgetter"
"github.com/ooni/probe-cli/v3/internal/measurexlite"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/tracex"
)

var (
// errNoInputProvided indicates no input was passed
errNoInputProvided = errors.New("no input provided")

// errInputIsNotAnURL indicates that input is not an URL
errInputIsNotAnURL = errors.New("input is not an URL")

// errInvalidScheme indicates that the scheme is invalid
// golint is stupid and does not let us end erorr with ":"
errInvalidScheme = errors.New("scheme must be magnet")
)

const (
testName = "bittorrent"
testVersion = "0.0.1"
)

// Config contains the experiment config.
type Config struct {
BootstrapNodes []*string
DisableDHTSecurity bool
}

type runtimeConfig struct {
magnet string
}

func config(input model.MeasurementTarget) (*runtimeConfig, error) {
if input == "" {
return nil, errNoInputProvided
}

parsed, err := url.Parse(string(input))
if err != nil {
return nil, fmt.Errorf("%w: %s", errInputIsNotAnURL, err.Error())
}
if parsed.Scheme != "magnet" {
return nil, errInvalidScheme
}

validConfig := runtimeConfig{
magnet: string(input),
}

return &validConfig, nil
}

// TestKeys contains the experiment results
type TestKeys struct {
// DNS queries when resolving trackers
Queries []*model.ArchivalDNSLookupResult `json:"queries"`
// Indicates any kind of failure
Failure string `json:"failure"`
// The total number of peers contacted about the requested magnet
PeersNum int `json:"peers_num"`
// The complete list of peers contacted
Peers []string `json:"peers"`
// The total number of bytes received by the client
TotalBytesRead int64 `json:"total_bytes_received"`
// The total number of bad pieces (failed verification) received by the client
TotalBadPieces int64 `json:"total_bad_pieces"`
}

func (tk *TestKeys) failure(err error) {
tk.Failure = *tracex.NewFailure(err)
}

// Measurer performs the measurement.
type Measurer struct {
// Config contains the experiment settings. If empty we
// will be using default settings.
Config Config

// Getter is an optional getter to be used for testing.
Getter urlgetter.MultiGetter
}

// ExperimentName implements ExperimentMeasurer.ExperimentName
func (m Measurer) ExperimentName() string {
return testName
}

// ExperimentVersion implements ExperimentMeasurer.ExperimentVersion
func (m Measurer) ExperimentVersion() string {
return testVersion
}

func torrentStats(torrent *torrent.Torrent, client *torrent.Client, tk *TestKeys) {
stats := torrent.Stats()
tk.PeersNum = len(tk.Peers)
tk.TotalBytesRead = stats.ConnStats.BytesRead.Int64()
tk.TotalBadPieces = stats.ConnStats.PiecesDirtiedBad.Int64()
}

func timeoutStats(torrent *torrent.Torrent, client *torrent.Client, tk *TestKeys) {
torrentStats(torrent, client, tk)
tk.Failure = "download_timeout"
}

// Run implements ExperimentMeasurer.Run
func (m Measurer) Run(ctx context.Context, args *model.ExperimentArgs) error {
//ctx context.Context, sess model.ExperimentSession,
//measurement *model.Measurement, callbacks model.ExperimentCallbacks,
//) error {
sess := args.Session
measurement := args.Measurement
log := sess.Logger()
trace := measurexlite.NewTrace(0, measurement.MeasurementStartTimeSaved)
resolver := trace.NewStdlibResolver(log)

config, err := config(measurement.Input)
if err != nil {
// Invalid input data, we don't even generate report
return err
}

tk := new(TestKeys)
measurement.TestKeys = tk

ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

tmpdir, err := os.MkdirTemp("", "ooni")
if err != nil {
log.Warnf(*tracex.NewFailure(err))
return nil
}
log.Infof("Using temporary directory %s", tmpdir)
defer os.RemoveAll(tmpdir)

conf := torrent.NewDefaultClientConfig()
conf.DataDir = tmpdir
conf.NoUpload = true
conf.ListenPort = 0
if len(m.Config.BootstrapNodes) != 0 {
// Explicit bootstrap nodes instead of default
// (used for testing via localhost peer)
conf.DhtStartingNodes = func(network string) dht.StartingNodesGetter {
return func() (addrs []dht.Addr, err error) {
for _, addrport := range m.Config.BootstrapNodes {
udpAddr, err := net.ResolveUDPAddr("udp", *addrport)
if err != nil {
return nil, err
}
addrs = append(addrs, dht.NewAddr(udpAddr))
}
return addrs, nil
}
}
}
// Try disable DHT security
if m.Config.DisableDHTSecurity {
conf.ConfigureAnacrolixDhtServer = func (dht *dht.ServerConfig) {
dht.NoSecurity = true
}
}

// Lookup tracker IPs via ooni utils
conf.LookupTrackerIp = func(u *url.URL) ([]net.IP, error) {

log.Infof("Resolving DNS for %s", u.Hostname())
resolvedAddrs, err := resolver.LookupHost(ctx, u.Hostname())
addrs := []net.IP{}
if err != nil {
return addrs, nil
}
log.Infof("Finished DNS for %s: %v", u.Hostname(), resolvedAddrs)
for _, addr := range resolvedAddrs {
addrs = append(addrs, net.ParseIP(addr))
}
tk.Queries = append(tk.Queries, trace.DNSLookupsFromRoundTrip()...)
return addrs, err
}

// We want to test Bittorrent connectivity, not HTTPS/websockets
conf.DisableWebtorrent = true
conf.DisableWebseeds = true

// Register new peers to the test keys
clientCallbacks := new(torrent.Callbacks)
clientCallbacks.NewPeer = append(clientCallbacks.NewPeer,
func(peer *torrent.Peer) {
log.Debugf("Found new peer: %s", peer.RemoteAddr.String())
tk.Peers = append(tk.Peers, peer.RemoteAddr.String())
},
)
conf.Callbacks = *clientCallbacks

client, err := torrent.NewClient(conf)
if err != nil {
log.Warnf(*tracex.NewFailure(err))
return nil
}
defer client.Close()

torrent, err := client.AddMagnet(config.magnet)
if err != nil {
log.Warnf(*tracex.NewFailure(err))
return nil
}

select {
case <-ctx.Done():
tk.Failure = "metainfo_timeout"
writer := bytes.NewBufferString("")
client.WriteStatus(writer)
log.Info(writer.String())

return nil
case <-torrent.GotInfo():
}

torrent.DownloadAll()

// Setup a new chan to know when the torrent is finished... allows to apply timeout
finished := make(chan bool)

go func() {
client.WaitAll()
finished <- true
}()
select {
case <-ctx.Done():
timeoutStats(torrent, client, tk)
tk.Failure = "timeout"
case <-finished:
torrentStats(torrent, client, tk)
writer := bytes.NewBufferString("")
client.WriteStatus(writer)
log.Info(writer.String())
}

return nil
}

// NewExperimentMeasurer creates a new ExperimentMeasurer.
func NewExperimentMeasurer(config Config) model.ExperimentMeasurer {
return &Measurer{Config: config}
}

// SummaryKeys contains summary keys for this experiment.
//
// Note that this structure is part of the ABI contract with ooniprobe
// therefore we should be careful when changing it.
type SummaryKeys struct {
IsAnomaly bool `json:"-"`
}

// GetSummaryKeys implements model.ExperimentMeasurer.GetSummaryKeys.
func (m Measurer) GetSummaryKeys(measurement *model.Measurement) (interface{}, error) {
sk := SummaryKeys{IsAnomaly: false}
_, ok := measurement.TestKeys.(*TestKeys)
if !ok {
return sk, errors.New("invalid test keys type")
}
return sk, nil
}
Loading