Blog Archive

Thursday, February 26, 2009

Multicast trivia

In multicast IP addresses, the low 28 bits vary (the high four are always 1110). But in Ethernet, only 23 bits are available for multicast. So layer 2 devices throw out the high 5 bits of the lower 28 of a multicast address, which obviously causes over-subscription on LANs. If you subscribe to 224.1.1.1, you'll also get 224.129.1.1, 225.1.1.1, and so on through 239.129.1.1.

Therefore! Kids, always vary the low byte of a multicast address.

Saturday, February 14, 2009

-lkse or -lthr?

A token-passing test using two threads. The threads use mutexes and condition variables to signal each other.

Machine: 3 GHz quad core, running FreeBSD 7.1; not that the number of cores matters, since only one process is running at a time.

Using -lkse, a roundtrip between two threads is 14us.
Using -lthr, it's 1.2us.

-lkse is the N:M threading library (N user threads map onto M kernel threads).
-lthr is the 1:1 threading library (1 kernel thread per 1 user thread).

On 2.3ghz quad core running Windows XP 64: 14usec / roundtrip.

Draw your conclusions. Source code below



// general utilities

#include <iostream>
#include <cstdio> // cerr,cin
#include <string>
#include <sys/time.h>

// prlog(xxx): writes xxx followed by a newline character to cout.
// The expansion is a bare << chain (no parentheses, no trailing semicolon),
// so xxx may itself contain further << operands; the caller supplies the ';'.
#define prlog(xxx) cout << xxx << "\n"
using namespace std;

// Counting-loop shorthands. Each expands to a `for` header that declares the
// int loop variable `i`; the loop body follows the macro invocation.
// The forward variants evaluate `max` once and cache it in a second
// variable named <i>lim (built with token pasting).
// frep_: i runs upward over 0 .. max-1.
#define frep_(i, max) for (int i = 0, i##lim=(max); i < i##lim; i++) // forward
// frep1_: i runs upward over 1 .. max-1.
#define frep1_(i, max) for (int i = 1, i##lim=(max); i < i##lim; i++)
// rrep_: i runs downward over max-1 .. 0 (test i>0, then decrement, then body).
#define rrep_(i, max) for (int i = (max); i-->0;)
// rrep1_: i runs downward over max-1 .. 1 (decrement first, then test i>0).
#define rrep1_(i, max) for (int i = (max); --i>0;)

// Prints one benchmark result line to cout.
//   s - label printed at the start of the line
//   d - measured rate, in samples per second
// The line shows the rate followed, in parentheses, by the time per sample
// (1/d) expressed in the largest unit that keeps the figure at or above 1:
// nanoseconds, microseconds, milliseconds or seconds.
inline void PrintPerfSample(const char* s, double d) {
    const double secs_per_sample = 1 / d;

    // Choose the multiplier and the matching unit suffix.
    double scale;
    const char* unit;
    if (secs_per_sample < 0.000001) {
        scale = 1000000000.0;
        unit = " nanosec per sample";
    } else if (secs_per_sample < 0.001) {
        scale = 1000000.0;
        unit = " microsec per sample";
    } else if (secs_per_sample < 1) {
        scale = 1000;
        unit = " msec per sample";
    } else {
        scale = 1;
        unit = " secs per sample";
    }

    std::cout << s << ": " << d << " samples per sec ("
              << secs_per_sample * scale << unit << ")\n";
}

// DO_PERF_TEST(name, action): times `action` and reports the rate through
// PrintPerfSample(name, samples_per_second).
//
// Steps:
//   1. Warm-up: run `action` 10 times and time it with GetAbsTimer().
//   2. Derive a batch size (loop_quant) from the warm-up rate, clamped to
//      the range 1000..100000.
//   3. Take one back-to-back pair of timer readings as an estimate of the
//      cost of a single timer call.
//   4. Run batches of loop_quant actions until at least 2 seconds have
//      elapsed, counting batches and timer reads.
//   5. Report total actions / (elapsed - estimated timer overhead).
//
// `action` is pasted textually and executed many times, so it must be safe
// to repeat. The expansion is a braced block whose locals (base_time,
// warmup_elapsed, loop_quant, ...) are visible to `action`.
// Only /* */ comments are used inside the body: a // comment would swallow
// the line-continuation backslash.
#define DO_PERF_TEST(name,action) \
{\
    double base_time = GetAbsTimer();\
    frep_(loop_index,10) {\
        action;\
    }\
    /* Clamp while still in floating point: a zero or tiny elapsed time */\
    /* would otherwise give inf or a value too large to convert to int. */\
    double warmup_elapsed = GetAbsTimer()-base_time;\
    int loop_quant = 100000;\
    if (warmup_elapsed > 0 && 10/warmup_elapsed < 100000) loop_quant = int(10/warmup_elapsed);\
    if (loop_quant<1000) loop_quant=1000;\
    base_time = GetAbsTimer();\
    double timer_sample_overhead = GetAbsTimer()-base_time;\
    int timer_sampled=2;\
    int number_of_loops=0;\
    do {\
        frep_(loop_index,loop_quant) {\
            action;\
        }\
        number_of_loops++;\
        timer_sampled++;\
    } while ((GetAbsTimer()-base_time) < 2);\
    PrintPerfSample(name,double(number_of_loops)*double(loop_quant)/(GetAbsTimer()-base_time-timer_sample_overhead*(timer_sampled+1)));\
}

// Returns the current CLOCK_REALTIME reading as a double: whole seconds plus
// the nanosecond field scaled to fractional seconds.
// Terminates the process with exit status 1 if clock_gettime() fails.
double GetAbsTimer() {
    struct timespec now;
    if (clock_gettime(CLOCK_REALTIME, &now) != 0) {
        exit(1);
    }
    const double fraction = now.tv_nsec * 1e-9;
    return now.tv_sec + fraction;
}

// threading utilities

#include <pthread.h>

// Manual-reset event built from a pthread mutex and condition variable.
//
//   Set()   - marks the event as triggered and signals one waiter.
//   Reset() - clears the triggered flag.
//   Wait()  - blocks until the event is triggered; returns immediately if it
//             already is. Does not clear the flag.
//   Test()  - returns the current state without blocking on the condition.
//
// Every access to `triggered` is made with `mutex` held.
// The pthread objects are initialised in place by the constructor, so an
// instance should be shared by reference or pointer rather than copied.
struct SyncEvent {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    bool triggered;  // guarded by `mutex`

    SyncEvent() {
        pthread_mutex_init(&mutex, 0);
        pthread_cond_init(&cond, 0);
        triggered = false;
    }
    ~SyncEvent() {
        pthread_mutex_destroy(&mutex);
        pthread_cond_destroy(&cond);
    }
    void Set() {
        pthread_mutex_lock(&mutex);
        triggered = true;
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
    }
    void Reset() {
        pthread_mutex_lock(&mutex);
        // Clear the flag; storing `true` here would leave the event set
        // forever, so a later Wait() would never block.
        triggered = false;
        pthread_mutex_unlock(&mutex);
    }
    void Wait() {
        pthread_mutex_lock(&mutex);
        // Loop: pthread_cond_wait may return without the flag being set.
        while (!triggered) {
            pthread_cond_wait(&cond, &mutex);
        }
        pthread_mutex_unlock(&mutex);
    }
    bool Test() {
        // Read under the lock so the read does not race with Set()/Reset()
        // running on another thread.
        pthread_mutex_lock(&mutex);
        const bool state = triggered;
        pthread_mutex_unlock(&mutex);
        return state;
    }
};


// Base class for an object that runs its DoWork() on a new pthread.
//
// Start() hands `this` to pthread_create(). The new thread runs ThreadFunc,
// which calls DoWork() and then deletes the object, so instances must be
// allocated with `new` and must not be used by the creator once the work may
// have finished. This class neither joins nor detaches the thread.
struct WorkThread {
    pthread_t thread;

    // Work to perform on the spawned thread; supplied by the derived class.
    virtual void DoWork() = 0;

    WorkThread() : thread(0) {
    }

    virtual ~WorkThread() {
    }

    // pthread entry point: `arg` is the WorkThread passed by Start().
    static void* ThreadFunc(void* arg) {
        WorkThread* self = static_cast<WorkThread*>(arg);
        self->DoWork();
        delete self;
        return NULL;
    }

    // Spawns the thread. The object deletes itself after DoWork() returns.
    // If creation fails, a message is logged and nothing else happens.
    void Start() {
        const int rc = pthread_create(&thread, NULL, &ThreadFunc, static_cast<void*>(this));
        if (rc != 0) {
            prlog("thread creation failed");
        }
    }
};

// actual code

// Single-slot signalling channel backed by one SyncEvent.
// It carries no payload; only the "has data" state is passed.
struct FakePipe {
    SyncEvent has_data;

    FakePipe() {
    }

    // Marks the pipe as holding data by setting the event.
    void Write() {
        has_data.Set();
    }

    // Waits for the event to be set, then calls Reset() on it.
    void Read() {
        has_data.Wait();
        has_data.Reset();
    }
};

// Benchmark driver thread. It sends on pipe _1, receives on pipe _2, and
// times that round trip with DO_PERF_TEST.
// _3 is written when the measurement is over; _exit is written last.
struct Thread1 :WorkThread {
FakePipe &_1,&_2,&_3,&_exit;
// Binds the four pipes by reference and immediately launches the thread
// through Start().
Thread1(FakePipe &p1,FakePipe &p2, FakePipe &p3, FakePipe &exit):_1(p1),_2(p2),_3(p3), _exit(exit) {
Start();
}

void DoWork() {
// One untimed round trip before the measurement begins.
_1.Write();
_2.Read();
// Timed section: each sample is one _1.Write() followed by one _2.Read().
// The result is reported under the label "Messaging".
DO_PERF_TEST("Messaging",
{
_1.Write();
_2.Read();
}
);
// Shutdown: raise _3 first, then write _1 once more so that a peer blocked
// in _1.Read() wakes up, and finally signal completion on _exit.
_3.Write();
_1.Write();
_exit.Write();
}
};
// Echo thread: until pipe _3 is flagged, it repeatedly reads pipe _1 and
// answers by writing pipe _2. It writes _exit once the loop ends.
struct Thread2 : WorkThread {
    FakePipe &_1, &_2, &_3, &_exit;

    // Binds the four pipes by reference and immediately launches the thread
    // through Start().
    Thread2(FakePipe &p1, FakePipe &p2, FakePipe &p3, FakePipe &exit)
        : _1(p1), _2(p2), _3(p3), _exit(exit) {
        Start();
    }

    void DoWork() {
        for (;;) {
            // _3 is checked before every read, never while blocked in Read().
            if (_3.has_data.Test()) {
                break;
            }
            _1.Read();
            _2.Write();
        }
        _exit.Write();
    }
};

int main() {
FakePipe _1,_2,_3,_exit1,_exit2;

new Thread2(_1,_2,_3,_exit1);
new Thread1(_1,_2,_3,_exit2);

_exit1.Read();
_exit2.Read();
}