使用pthread构建固定死锁场景

使用pthread中的条件变量和锁,构建了一个可以模拟死锁的脚本,能实现每个线程按顺序执行,从而形成固定死锁场景,方便测试死锁算法性能,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <time.h>
#include <stdbool.h>
#include "libpq-fe.h"
#include <unistd.h>

#define RING_TRANS_NUM 5
#define OTHER_TRANS_NUM 10

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t conds[RING_TRANS_NUM + OTHER_TRANS_NUM];
static int turn = 0; // 当前执行的线程编号

typedef struct Tag
{
int row_id;
int thread_id;
}Tag;

enum cn_port {
cn01 = 1,
cn02 = 2,
};

void free_and_end_trans(PGconn *conn, PGresult *res)
{
if(res != NULL)
{
PQclear(res);
}
PQfinish(conn);
pthread_exit(NULL);
}

void *other_thread_function(void *arg)
{
PGconn *conn = NULL;
PGresult *res = NULL;
char conn_info[100];
int cur_port = rand() % 2 + cn01;

snprintf(conn_info, sizeof(conn_info),
"dbname = postgres user = gaotianfu password = 129212351GTFgtf_ host = 127.0.0.1 port = %d", cur_port);

conn = PQconnectdb(conn_info);
if (PQstatus(conn) != CONNECTION_OK) {
free_and_end_trans(conn, res);
}

/* 开启事务 */
res = PQexec(conn, "BEGIN");
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
free_and_end_trans(conn, res);
}
PQclear(res);

Tag *tag = (Tag *)arg;

pthread_mutex_lock(&mutex);
while (turn != tag->thread_id) {
pthread_cond_wait(&conds[tag->thread_id], &mutex);
}

turn = turn == (RING_TRANS_NUM + OTHER_TRANS_NUM - 1) ? (RING_TRANS_NUM - 1) : turn + 1;
pthread_cond_signal(&conds[turn]); // 通知下一个线程
pthread_mutex_unlock(&mutex);

char update_query[100];
snprintf(update_query, sizeof(update_query), "UPDATE a SET str = '11' WHERE id = %d", tag->row_id);
free(tag);
res = PQexec(conn, update_query);
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
PQclear(res);
}

res = PQexec(conn, "END");
free_and_end_trans(conn, res);
return NULL;
}

void *ring_thread_function(void *arg) {
PGconn *conn = NULL;
PGresult *res = NULL;
char conn_info[100];
int cur_port = rand() % 2 + cn01;

snprintf(conn_info, sizeof(conn_info),
"dbname = xxx user = xxx password = xxx host = xxx port = %d", cur_port);

conn = PQconnectdb(conn_info);
if (PQstatus(conn) != CONNECTION_OK) {
free_and_end_trans(conn, res);
}

/* 开启事务 */
res = PQexec(conn, "BEGIN");
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
free_and_end_trans(conn, res);
}
PQclear(res);

Tag *tag = (Tag *)arg;

for(int i = 0; i < 2; i++)
{
pthread_mutex_lock(&mutex);
while (turn != tag->thread_id) {
pthread_cond_wait(&conds[tag->thread_id], &mutex);
}
// printf("%lu my_thread_id is %d, ture is %d ",pthread_self(), tag->thread_id, turn);

char update_query[100];
snprintf(update_query, sizeof(update_query), "UPDATE a SET str = '11' WHERE id = %d", tag->row_id);

if(i == 1)
{
turn = turn == (RING_TRANS_NUM - 2) ? RING_TRANS_NUM: turn + 1; // 倒数第二个 ring_thread 通知 other_thread
pthread_cond_signal(&conds[turn]);
pthread_mutex_unlock(&mutex);
printf("the %d time, %lu update %d, signal %d\n", i, pthread_self(), tag->row_id, turn);
}
res = PQexec(conn, update_query);
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
if((tag->row_id == RING_TRANS_NUM + 1) && (i == 1))
{
printf(" make deadlock\n");
}
else
{
printf(" %lu update %d\n", pthread_self(), tag->row_id);
}
PQclear(res);
}
if(i == 1)
{
free(tag);
}
if(i == 0)
{
tag->row_id = (tag->row_id == RING_TRANS_NUM) ? 1 : tag->row_id + 1;
turn = turn == (RING_TRANS_NUM - 1) ? 0 : (turn % RING_TRANS_NUM) + 1;
pthread_cond_signal(&conds[turn]); // 通知下一个线程
pthread_mutex_unlock(&mutex);
printf("the %d time, %lu , signal %d\n", i, pthread_self(), turn);
}
}

res = PQexec(conn, "END");
free_and_end_trans(conn, res);
return NULL;
}


int main(int argc, char *argv[]) {
for (int i = 0; i < RING_TRANS_NUM + OTHER_TRANS_NUM; ++i) {
pthread_cond_init(&conds[i], NULL); // 初始化条件变量
}

pthread_t *ring_thread;
ring_thread = (pthread_t *) malloc(sizeof(pthread_t) * RING_TRANS_NUM);

pthread_t *other_thread;
other_thread = (pthread_t *) malloc(sizeof(pthread_t) * OTHER_TRANS_NUM);

/* 形成依赖 */
for(int i = 0; i < RING_TRANS_NUM; ++i)
{
Tag *cur_tag = (Tag *) malloc(sizeof(Tag));
cur_tag->row_id = i + 1;
cur_tag->thread_id = i;
pthread_create(&ring_thread[i], NULL, ring_thread_function, (void *) cur_tag);
pthread_detach(ring_thread[i]);
}

/* 其他依赖 */
for(int i = 0; i < OTHER_TRANS_NUM; ++i)
{
Tag *cur_tag = (Tag *) malloc(sizeof(Tag));
cur_tag->row_id = 2; /* 只更新第 2 行 */
cur_tag->thread_id = i + RING_TRANS_NUM;
pthread_create(&other_thread[i], NULL, other_thread_function, (void *) cur_tag);
pthread_detach(other_thread[i]);
}

while (1) {
sleep(10);
}

return 0;
}

上面的代码实现的效果为,首先形成一个长依赖路径,路径上的每个事务都更新自己的数据行,例如第一个事务更新第一行数据,第二个事务更新第二行数据,第RING_TRANS_NUM个事务更新第RING_TRANS_NUM行数据,此时通知第一个事务所在的线程去更新第二行数据,第二个事务去更新第三行数据,直到第RING_TRANS_NUM - 1个事务更新第RING_TRANS_NUM行数据。这样就形成了长依赖路径。

然后添加其他依赖,这些依赖依附于上述长依赖路径,本程序中设置其他依赖都更新第二行数据。当这些其他依赖都执行完事务语句时,通知第RING_TRANS_NUM个事务所在的线程去更新第一行数据,最终形成了死锁环。