Showing error 1215

User: Jiri Slaby
Error type: Double Unlock
Error type description: Some lock is unlocked twice unintentionally in a sequence
File location: net/tipc/link.c
Line in file: 604
Project: Linux Kernel
Project version: 2.6.28
Tools: Stanse (1.2)
Entered: 2012-04-30 10:52:00 UTC


Source:

   1/*
   2 * net/tipc/link.c: TIPC link code
   3 *
   4 * Copyright (c) 1996-2007, Ericsson AB
   5 * Copyright (c) 2004-2007, Wind River Systems
   6 * All rights reserved.
   7 *
   8 * Redistribution and use in source and binary forms, with or without
   9 * modification, are permitted provided that the following conditions are met:
  10 *
  11 * 1. Redistributions of source code must retain the above copyright
  12 *    notice, this list of conditions and the following disclaimer.
  13 * 2. Redistributions in binary form must reproduce the above copyright
  14 *    notice, this list of conditions and the following disclaimer in the
  15 *    documentation and/or other materials provided with the distribution.
  16 * 3. Neither the names of the copyright holders nor the names of its
  17 *    contributors may be used to endorse or promote products derived from
  18 *    this software without specific prior written permission.
  19 *
  20 * Alternatively, this software may be distributed under the terms of the
  21 * GNU General Public License ("GPL") version 2 as published by the Free
  22 * Software Foundation.
  23 *
  24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34 * POSSIBILITY OF SUCH DAMAGE.
  35 */
  36
  37#include "core.h"
  38#include "dbg.h"
  39#include "link.h"
  40#include "net.h"
  41#include "node.h"
  42#include "port.h"
  43#include "addr.h"
  44#include "node_subscr.h"
  45#include "name_distr.h"
  46#include "bearer.h"
  47#include "name_table.h"
  48#include "discover.h"
  49#include "config.h"
  50#include "bcast.h"
  51
  52
  53/*
  54 * Out-of-range value for link session numbers
  55 */
  56
  57#define INVALID_SESSION 0x10000
  58
  59/*
  60 * Limit for deferred reception queue:
  61 */
  62
  63#define DEF_QUEUE_LIMIT 256u
  64
  65/*
  66 * Link state events:
  67 */
  68
  69#define  STARTING_EVT    856384768        /* link processing trigger */
  70#define  TRAFFIC_MSG_EVT 560815u        /* rx'd ??? */
  71#define  TIMEOUT_EVT     560817u        /* link timer expired */
  72
  73/*
  74 * The following two 'message types' are really just implementation
  75 * data conveniently stored in the message header.
  76 * They must not be considered part of the protocol.
  77 */
  78#define OPEN_MSG   0
  79#define CLOSED_MSG 1
  80
  81/*
  82 * State value stored in 'exp_msg_count'
  83 */
  84
  85#define START_CHANGEOVER 100000u
  86
  87/**
  88 * struct link_name - deconstructed link name
  89 * @addr_local: network address of node at this end
  90 * @if_local: name of interface at this end
  91 * @addr_peer: network address of node at far end
  92 * @if_peer: name of interface at far end
  93 */
  94
  95struct link_name {
  96        u32 addr_local;
  97        char if_local[TIPC_MAX_IF_NAME];
  98        u32 addr_peer;
  99        char if_peer[TIPC_MAX_IF_NAME];
 100};
 101
 102#if 0
 103
 104/* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */
 105
 106/**
 107 * struct link_event - link up/down event notification
 108 */
 109
 110struct link_event {
 111        u32 addr;
 112        int up;
 113        void (*fcn)(u32, char *, int);
 114        char name[TIPC_MAX_LINK_NAME];
 115};
 116
 117#endif
 118
 119static void link_handle_out_of_seq_msg(struct link *l_ptr,
 120                                       struct sk_buff *buf);
 121static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf);
 122static int  link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf);
 123static void link_set_supervision_props(struct link *l_ptr, u32 tolerance);
 124static int  link_send_sections_long(struct port *sender,
 125                                    struct iovec const *msg_sect,
 126                                    u32 num_sect, u32 destnode);
 127static void link_check_defragm_bufs(struct link *l_ptr);
 128static void link_state_event(struct link *l_ptr, u32 event);
 129static void link_reset_statistics(struct link *l_ptr);
 130static void link_print(struct link *l_ptr, struct print_buf *buf,
 131                       const char *str);
 132
 133/*
 134 * Debugging code used by link routines only
 135 *
 136 * When debugging link problems on a system that has multiple links,
 137 * the standard TIPC debugging routines may not be useful since they
 138 * allow the output from multiple links to be intermixed.  For this reason
 139 * routines of the form "dbg_link_XXX()" have been created that will capture
 140 * debug info into a link's personal print buffer, which can then be dumped
 141 * into the TIPC system log (TIPC_LOG) upon request.
 142 *
 143 * To enable per-link debugging, use LINK_LOG_BUF_SIZE to specify the size
 144 * of the print buffer used by each link.  If LINK_LOG_BUF_SIZE is set to 0,
 145 * the dbg_link_XXX() routines simply send their output to the standard
 146 * debug print buffer (DBG_OUTPUT), if it has been defined; this can be useful
 147 * when there is only a single link in the system being debugged.
 148 *
 149 * Notes:
 150 * - When enabled, LINK_LOG_BUF_SIZE should be set to at least TIPC_PB_MIN_SIZE
 151 * - "l_ptr" must be valid when using dbg_link_XXX() macros
 152 */
 153
 154#define LINK_LOG_BUF_SIZE 0
 155
 156#define dbg_link(fmt, arg...) \
 157        do { \
 158                if (LINK_LOG_BUF_SIZE) \
 159                        tipc_printf(&l_ptr->print_buf, fmt, ## arg); \
 160        } while (0)
 161#define dbg_link_msg(msg, txt) \
 162        do { \
 163                if (LINK_LOG_BUF_SIZE) \
 164                        tipc_msg_dbg(&l_ptr->print_buf, msg, txt); \
 165        } while (0)
 166#define dbg_link_state(txt) \
 167        do { \
 168                if (LINK_LOG_BUF_SIZE) \
 169                        link_print(l_ptr, &l_ptr->print_buf, txt); \
 170        } while (0)
 171#define dbg_link_dump() do { \
 172        if (LINK_LOG_BUF_SIZE) { \
 173                tipc_printf(LOG, "\n\nDumping link <%s>:\n", l_ptr->name); \
 174                tipc_printbuf_move(LOG, &l_ptr->print_buf); \
 175        } \
 176} while (0)
 177
 178static void dbg_print_link(struct link *l_ptr, const char *str)
 179{
 180        if (DBG_OUTPUT != TIPC_NULL)
 181                link_print(l_ptr, DBG_OUTPUT, str);
 182}
 183
 184static void dbg_print_buf_chain(struct sk_buff *root_buf)
 185{
 186        if (DBG_OUTPUT != TIPC_NULL) {
 187                struct sk_buff *buf = root_buf;
 188
 189                while (buf) {
 190                        msg_dbg(buf_msg(buf), "In chain: ");
 191                        buf = buf->next;
 192                }
 193        }
 194}
 195
 196/*
 197 *  Simple link routines
 198 */
 199
 200static unsigned int align(unsigned int i)
 201{
 202        return (i + 3) & ~3u;
 203}
 204
 205static int link_working_working(struct link *l_ptr)
 206{
 207        return (l_ptr->state == WORKING_WORKING);
 208}
 209
 210static int link_working_unknown(struct link *l_ptr)
 211{
 212        return (l_ptr->state == WORKING_UNKNOWN);
 213}
 214
 215static int link_reset_unknown(struct link *l_ptr)
 216{
 217        return (l_ptr->state == RESET_UNKNOWN);
 218}
 219
 220static int link_reset_reset(struct link *l_ptr)
 221{
 222        return (l_ptr->state == RESET_RESET);
 223}
 224
 225static int link_blocked(struct link *l_ptr)
 226{
 227        return (l_ptr->exp_msg_count || l_ptr->blocked);
 228}
 229
 230static int link_congested(struct link *l_ptr)
 231{
 232        return (l_ptr->out_queue_size >= l_ptr->queue_limit[0]);
 233}
 234
 235static u32 link_max_pkt(struct link *l_ptr)
 236{
 237        return l_ptr->max_pkt;
 238}
 239
 240static void link_init_max_pkt(struct link *l_ptr)
 241{
 242        u32 max_pkt;
 243
 244        max_pkt = (l_ptr->b_ptr->publ.mtu & ~3);
 245        if (max_pkt > MAX_MSG_SIZE)
 246                max_pkt = MAX_MSG_SIZE;
 247
 248        l_ptr->max_pkt_target = max_pkt;
 249        if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
 250                l_ptr->max_pkt = l_ptr->max_pkt_target;
 251        else
 252                l_ptr->max_pkt = MAX_PKT_DEFAULT;
 253
 254        l_ptr->max_pkt_probes = 0;
 255}
 256
 257static u32 link_next_sent(struct link *l_ptr)
 258{
 259        if (l_ptr->next_out)
 260                return msg_seqno(buf_msg(l_ptr->next_out));
 261        return mod(l_ptr->next_out_no);
 262}
 263
 264static u32 link_last_sent(struct link *l_ptr)
 265{
 266        return mod(link_next_sent(l_ptr) - 1);
 267}
 268
 269/*
 270 *  Simple non-static link routines (i.e. referenced outside this file)
 271 */
 272
 273int tipc_link_is_up(struct link *l_ptr)
 274{
 275        if (!l_ptr)
 276                return 0;
 277        return (link_working_working(l_ptr) || link_working_unknown(l_ptr));
 278}
 279
 280int tipc_link_is_active(struct link *l_ptr)
 281{
 282        return ((l_ptr->owner->active_links[0] == l_ptr) ||
 283                (l_ptr->owner->active_links[1] == l_ptr));
 284}
 285
 286/**
 287 * link_name_validate - validate & (optionally) deconstruct link name
 288 * @name - ptr to link name string
 289 * @name_parts - ptr to area for link name components (or NULL if not needed)
 290 *
 291 * Returns 1 if link name is valid, otherwise 0.
 292 */
 293
 294static int link_name_validate(const char *name, struct link_name *name_parts)
 295{
 296        char name_copy[TIPC_MAX_LINK_NAME];
 297        char *addr_local;
 298        char *if_local;
 299        char *addr_peer;
 300        char *if_peer;
 301        char dummy;
 302        u32 z_local, c_local, n_local;
 303        u32 z_peer, c_peer, n_peer;
 304        u32 if_local_len;
 305        u32 if_peer_len;
 306
 307        /* copy link name & ensure length is OK */
 308
 309        name_copy[TIPC_MAX_LINK_NAME - 1] = 0;
 310        /* need above in case non-Posix strncpy() doesn't pad with nulls */
 311        strncpy(name_copy, name, TIPC_MAX_LINK_NAME);
 312        if (name_copy[TIPC_MAX_LINK_NAME - 1] != 0)
 313                return 0;
 314
 315        /* ensure all component parts of link name are present */
 316
 317        addr_local = name_copy;
 318        if ((if_local = strchr(addr_local, ':')) == NULL)
 319                return 0;
 320        *(if_local++) = 0;
 321        if ((addr_peer = strchr(if_local, '-')) == NULL)
 322                return 0;
 323        *(addr_peer++) = 0;
 324        if_local_len = addr_peer - if_local;
 325        if ((if_peer = strchr(addr_peer, ':')) == NULL)
 326                return 0;
 327        *(if_peer++) = 0;
 328        if_peer_len = strlen(if_peer) + 1;
 329
 330        /* validate component parts of link name */
 331
 332        if ((sscanf(addr_local, "%u.%u.%u%c",
 333                    &z_local, &c_local, &n_local, &dummy) != 3) ||
 334            (sscanf(addr_peer, "%u.%u.%u%c",
 335                    &z_peer, &c_peer, &n_peer, &dummy) != 3) ||
 336            (z_local > 255) || (c_local > 4095) || (n_local > 4095) ||
 337            (z_peer  > 255) || (c_peer  > 4095) || (n_peer  > 4095) ||
 338            (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) ||
 339            (if_peer_len  <= 1) || (if_peer_len  > TIPC_MAX_IF_NAME) ||
 340            (strspn(if_local, tipc_alphabet) != (if_local_len - 1)) ||
 341            (strspn(if_peer, tipc_alphabet) != (if_peer_len - 1)))
 342                return 0;
 343
 344        /* return link name components, if necessary */
 345
 346        if (name_parts) {
 347                name_parts->addr_local = tipc_addr(z_local, c_local, n_local);
 348                strcpy(name_parts->if_local, if_local);
 349                name_parts->addr_peer = tipc_addr(z_peer, c_peer, n_peer);
 350                strcpy(name_parts->if_peer, if_peer);
 351        }
 352        return 1;
 353}
 354
 355/**
 356 * link_timeout - handle expiration of link timer
 357 * @l_ptr: pointer to link
 358 *
 359 * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict
 360 * with tipc_link_delete().  (There is no risk that the node will be deleted by
 361 * another thread because tipc_link_delete() always cancels the link timer before
 362 * tipc_node_delete() is called.)
 363 */
 364
 365static void link_timeout(struct link *l_ptr)
 366{
 367        tipc_node_lock(l_ptr->owner);
 368
 369        /* update counters used in statistical profiling of send traffic */
 370
 371        l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
 372        l_ptr->stats.queue_sz_counts++;
 373
 374        if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
 375                l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
 376
 377        if (l_ptr->first_out) {
 378                struct tipc_msg *msg = buf_msg(l_ptr->first_out);
 379                u32 length = msg_size(msg);
 380
 381                if ((msg_user(msg) == MSG_FRAGMENTER)
 382                    && (msg_type(msg) == FIRST_FRAGMENT)) {
 383                        length = msg_size(msg_get_wrapped(msg));
 384                }
 385                if (length) {
 386                        l_ptr->stats.msg_lengths_total += length;
 387                        l_ptr->stats.msg_length_counts++;
 388                        if (length <= 64)
 389                                l_ptr->stats.msg_length_profile[0]++;
 390                        else if (length <= 256)
 391                                l_ptr->stats.msg_length_profile[1]++;
 392                        else if (length <= 1024)
 393                                l_ptr->stats.msg_length_profile[2]++;
 394                        else if (length <= 4096)
 395                                l_ptr->stats.msg_length_profile[3]++;
 396                        else if (length <= 16384)
 397                                l_ptr->stats.msg_length_profile[4]++;
 398                        else if (length <= 32768)
 399                                l_ptr->stats.msg_length_profile[5]++;
 400                        else
 401                                l_ptr->stats.msg_length_profile[6]++;
 402                }
 403        }
 404
 405        /* do all other link processing performed on a periodic basis */
 406
 407        link_check_defragm_bufs(l_ptr);
 408
 409        link_state_event(l_ptr, TIMEOUT_EVT);
 410
 411        if (l_ptr->next_out)
 412                tipc_link_push_queue(l_ptr);
 413
 414        tipc_node_unlock(l_ptr->owner);
 415}
 416
 417static void link_set_timer(struct link *l_ptr, u32 time)
 418{
 419        k_start_timer(&l_ptr->timer, time);
 420}
 421
 422/**
 423 * tipc_link_create - create a new link
 424 * @b_ptr: pointer to associated bearer
 425 * @peer: network address of node at other end of link
 426 * @media_addr: media address to use when sending messages over link
 427 *
 428 * Returns pointer to link.
 429 */
 430
 431struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
 432                              const struct tipc_media_addr *media_addr)
 433{
 434        struct link *l_ptr;
 435        struct tipc_msg *msg;
 436        char *if_name;
 437
 438        l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
 439        if (!l_ptr) {
 440                warn("Link creation failed, no memory\n");
 441                return NULL;
 442        }
 443
 444        if (LINK_LOG_BUF_SIZE) {
 445                char *pb = kmalloc(LINK_LOG_BUF_SIZE, GFP_ATOMIC);
 446
 447                if (!pb) {
 448                        kfree(l_ptr);
 449                        warn("Link creation failed, no memory for print buffer\n");
 450                        return NULL;
 451                }
 452                tipc_printbuf_init(&l_ptr->print_buf, pb, LINK_LOG_BUF_SIZE);
 453        }
 454
 455        l_ptr->addr = peer;
 456        if_name = strchr(b_ptr->publ.name, ':') + 1;
 457        sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:",
 458                tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
 459                tipc_node(tipc_own_addr),
 460                if_name,
 461                tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
 462                /* note: peer i/f is appended to link name by reset/activate */
 463        memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
 464        l_ptr->checkpoint = 1;
 465        l_ptr->b_ptr = b_ptr;
 466        link_set_supervision_props(l_ptr, b_ptr->media->tolerance);
 467        l_ptr->state = RESET_UNKNOWN;
 468
 469        l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
 470        msg = l_ptr->pmsg;
 471        msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr);
 472        msg_set_size(msg, sizeof(l_ptr->proto_msg));
 473        msg_set_session(msg, (tipc_random & 0xffff));
 474        msg_set_bearer_id(msg, b_ptr->identity);
 475        strcpy((char *)msg_data(msg), if_name);
 476
 477        l_ptr->priority = b_ptr->priority;
 478        tipc_link_set_queue_limits(l_ptr, b_ptr->media->window);
 479
 480        link_init_max_pkt(l_ptr);
 481
 482        l_ptr->next_out_no = 1;
 483        INIT_LIST_HEAD(&l_ptr->waiting_ports);
 484
 485        link_reset_statistics(l_ptr);
 486
 487        l_ptr->owner = tipc_node_attach_link(l_ptr);
 488        if (!l_ptr->owner) {
 489                if (LINK_LOG_BUF_SIZE)
 490                        kfree(l_ptr->print_buf.buf);
 491                kfree(l_ptr);
 492                return NULL;
 493        }
 494
 495        k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
 496        list_add_tail(&l_ptr->link_list, &b_ptr->links);
 497        tipc_k_signal((Handler)tipc_link_start, (unsigned long)l_ptr);
 498
 499        dbg("tipc_link_create(): tolerance = %u,cont intv = %u, abort_limit = %u\n",
 500            l_ptr->tolerance, l_ptr->continuity_interval, l_ptr->abort_limit);
 501
 502        return l_ptr;
 503}
 504
 505/**
 506 * tipc_link_delete - delete a link
 507 * @l_ptr: pointer to link
 508 *
 509 * Note: 'tipc_net_lock' is write_locked, bearer is locked.
 510 * This routine must not grab the node lock until after link timer cancellation
 511 * to avoid a potential deadlock situation.
 512 */
 513
 514void tipc_link_delete(struct link *l_ptr)
 515{
 516        if (!l_ptr) {
 517                err("Attempt to delete non-existent link\n");
 518                return;
 519        }
 520
 521        dbg("tipc_link_delete()\n");
 522
 523        k_cancel_timer(&l_ptr->timer);
 524
 525        tipc_node_lock(l_ptr->owner);
 526        tipc_link_reset(l_ptr);
 527        tipc_node_detach_link(l_ptr->owner, l_ptr);
 528        tipc_link_stop(l_ptr);
 529        list_del_init(&l_ptr->link_list);
 530        if (LINK_LOG_BUF_SIZE)
 531                kfree(l_ptr->print_buf.buf);
 532        tipc_node_unlock(l_ptr->owner);
 533        k_term_timer(&l_ptr->timer);
 534        kfree(l_ptr);
 535}
 536
 537void tipc_link_start(struct link *l_ptr)
 538{
 539        dbg("tipc_link_start %x\n", l_ptr);
 540        link_state_event(l_ptr, STARTING_EVT);
 541}
 542
 543/**
 544 * link_schedule_port - schedule port for deferred sending
 545 * @l_ptr: pointer to link
 546 * @origport: reference to sending port
 547 * @sz: amount of data to be sent
 548 *
 549 * Schedules port for renewed sending of messages after link congestion
 550 * has abated.
 551 */
 552
 553static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz)
 554{
 555        struct port *p_ptr;
 556
 557        spin_lock_bh(&tipc_port_list_lock);
 558        p_ptr = tipc_port_lock(origport);
 559        if (p_ptr) {
 560                if (!p_ptr->wakeup)
 561                        goto exit;
 562                if (!list_empty(&p_ptr->wait_list))
 563                        goto exit;
 564                p_ptr->congested_link = l_ptr;
 565                p_ptr->publ.congested = 1;
 566                p_ptr->waiting_pkts = 1 + ((sz - 1) / link_max_pkt(l_ptr));
 567                list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
 568                l_ptr->stats.link_congs++;
 569exit:
 570                tipc_port_unlock(p_ptr);
 571        }
 572        spin_unlock_bh(&tipc_port_list_lock);
 573        return -ELINKCONG;
 574}
 575
 576void tipc_link_wakeup_ports(struct link *l_ptr, int all)
 577{
 578        struct port *p_ptr;
 579        struct port *temp_p_ptr;
 580        int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
 581
 582        if (all)
 583                win = 100000;
 584        if (win <= 0)
 585                return;
 586        if (!spin_trylock_bh(&tipc_port_list_lock))
 587                return;
 588        if (link_congested(l_ptr))
 589                goto exit;
 590        list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
 591                                 wait_list) {
 592                if (win <= 0)
 593                        break;
 594                list_del_init(&p_ptr->wait_list);
 595                p_ptr->congested_link = NULL;
 596                spin_lock_bh(p_ptr->publ.lock);
 597                p_ptr->publ.congested = 0;
 598                p_ptr->wakeup(&p_ptr->publ);
 599                win -= p_ptr->waiting_pkts;
 600                spin_unlock_bh(p_ptr->publ.lock);
 601        }
 602
 603exit:
 604        spin_unlock_bh(&tipc_port_list_lock);
 605}
 606
 607/**
 608 * link_release_outqueue - purge link's outbound message queue
 609 * @l_ptr: pointer to link
 610 */
 611
 612static void link_release_outqueue(struct link *l_ptr)
 613{
 614        struct sk_buff *buf = l_ptr->first_out;
 615        struct sk_buff *next;
 616
 617        while (buf) {
 618                next = buf->next;
 619                buf_discard(buf);
 620                buf = next;
 621        }
 622        l_ptr->first_out = NULL;
 623        l_ptr->out_queue_size = 0;
 624}
 625
 626/**
 627 * tipc_link_reset_fragments - purge link's inbound message fragments queue
 628 * @l_ptr: pointer to link
 629 */
 630
 631void tipc_link_reset_fragments(struct link *l_ptr)
 632{
 633        struct sk_buff *buf = l_ptr->defragm_buf;
 634        struct sk_buff *next;
 635
 636        while (buf) {
 637                next = buf->next;
 638                buf_discard(buf);
 639                buf = next;
 640        }
 641        l_ptr->defragm_buf = NULL;
 642}
 643
 644/**
 645 * tipc_link_stop - purge all inbound and outbound messages associated with link
 646 * @l_ptr: pointer to link
 647 */
 648
 649void tipc_link_stop(struct link *l_ptr)
 650{
 651        struct sk_buff *buf;
 652        struct sk_buff *next;
 653
 654        buf = l_ptr->oldest_deferred_in;
 655        while (buf) {
 656                next = buf->next;
 657                buf_discard(buf);
 658                buf = next;
 659        }
 660
 661        buf = l_ptr->first_out;
 662        while (buf) {
 663                next = buf->next;
 664                buf_discard(buf);
 665                buf = next;
 666        }
 667
 668        tipc_link_reset_fragments(l_ptr);
 669
 670        buf_discard(l_ptr->proto_msg_queue);
 671        l_ptr->proto_msg_queue = NULL;
 672}
 673
 674#if 0
 675
 676/* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */
 677
 678static void link_recv_event(struct link_event *ev)
 679{
 680        ev->fcn(ev->addr, ev->name, ev->up);
 681        kfree(ev);
 682}
 683
 684static void link_send_event(void (*fcn)(u32 a, char *n, int up),
 685                            struct link *l_ptr, int up)
 686{
 687        struct link_event *ev;
 688
 689        ev = kmalloc(sizeof(*ev), GFP_ATOMIC);
 690        if (!ev) {
 691                warn("Link event allocation failure\n");
 692                return;
 693        }
 694        ev->addr = l_ptr->addr;
 695        ev->up = up;
 696        ev->fcn = fcn;
 697        memcpy(ev->name, l_ptr->name, TIPC_MAX_LINK_NAME);
 698        tipc_k_signal((Handler)link_recv_event, (unsigned long)ev);
 699}
 700
 701#else
 702
 703#define link_send_event(fcn, l_ptr, up) do { } while (0)
 704
 705#endif
 706
 707void tipc_link_reset(struct link *l_ptr)
 708{
 709        struct sk_buff *buf;
 710        u32 prev_state = l_ptr->state;
 711        u32 checkpoint = l_ptr->next_in_no;
 712        int was_active_link = tipc_link_is_active(l_ptr);
 713
 714        msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
 715
 716        /* Link is down, accept any session */
 717        l_ptr->peer_session = INVALID_SESSION;
 718
 719        /* Prepare for max packet size negotiation */
 720        link_init_max_pkt(l_ptr);
 721
 722        l_ptr->state = RESET_UNKNOWN;
 723        dbg_link_state("Resetting Link\n");
 724
 725        if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
 726                return;
 727
 728        tipc_node_link_down(l_ptr->owner, l_ptr);
 729        tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
 730#if 0
 731        tipc_printf(TIPC_CONS, "\nReset link <%s>\n", l_ptr->name);
 732        dbg_link_dump();
 733#endif
 734        if (was_active_link && tipc_node_has_active_links(l_ptr->owner) &&
 735            l_ptr->owner->permit_changeover) {
 736                l_ptr->reset_checkpoint = checkpoint;
 737                l_ptr->exp_msg_count = START_CHANGEOVER;
 738        }
 739
 740        /* Clean up all queues: */
 741
 742        link_release_outqueue(l_ptr);
 743        buf_discard(l_ptr->proto_msg_queue);
 744        l_ptr->proto_msg_queue = NULL;
 745        buf = l_ptr->oldest_deferred_in;
 746        while (buf) {
 747                struct sk_buff *next = buf->next;
 748                buf_discard(buf);
 749                buf = next;
 750        }
 751        if (!list_empty(&l_ptr->waiting_ports))
 752                tipc_link_wakeup_ports(l_ptr, 1);
 753
 754        l_ptr->retransm_queue_head = 0;
 755        l_ptr->retransm_queue_size = 0;
 756        l_ptr->last_out = NULL;
 757        l_ptr->first_out = NULL;
 758        l_ptr->next_out = NULL;
 759        l_ptr->unacked_window = 0;
 760        l_ptr->checkpoint = 1;
 761        l_ptr->next_out_no = 1;
 762        l_ptr->deferred_inqueue_sz = 0;
 763        l_ptr->oldest_deferred_in = NULL;
 764        l_ptr->newest_deferred_in = NULL;
 765        l_ptr->fsm_msg_cnt = 0;
 766        l_ptr->stale_count = 0;
 767        link_reset_statistics(l_ptr);
 768
 769        link_send_event(tipc_cfg_link_event, l_ptr, 0);
 770        if (!in_own_cluster(l_ptr->addr))
 771                link_send_event(tipc_disc_link_event, l_ptr, 0);
 772}
 773
 774
 775static void link_activate(struct link *l_ptr)
 776{
 777        l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
 778        tipc_node_link_up(l_ptr->owner, l_ptr);
 779        tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr);
 780        link_send_event(tipc_cfg_link_event, l_ptr, 1);
 781        if (!in_own_cluster(l_ptr->addr))
 782                link_send_event(tipc_disc_link_event, l_ptr, 1);
 783}
 784
 785/**
 786 * link_state_event - link finite state machine
 787 * @l_ptr: pointer to link
 788 * @event: state machine event to process
 789 */
 790
 791static void link_state_event(struct link *l_ptr, unsigned event)
 792{
 793        struct link *other;
 794        u32 cont_intv = l_ptr->continuity_interval;
 795
 796        if (!l_ptr->started && (event != STARTING_EVT))
 797                return;                /* Not yet. */
 798
 799        if (link_blocked(l_ptr)) {
 800                if (event == TIMEOUT_EVT) {
 801                        link_set_timer(l_ptr, cont_intv);
 802                }
 803                return;          /* Changeover going on */
 804        }
 805        dbg_link("STATE_EV: <%s> ", l_ptr->name);
 806
 807        switch (l_ptr->state) {
 808        case WORKING_WORKING:
 809                dbg_link("WW/");
 810                switch (event) {
 811                case TRAFFIC_MSG_EVT:
 812                        dbg_link("TRF-");
 813                        /* fall through */
 814                case ACTIVATE_MSG:
 815                        dbg_link("ACT\n");
 816                        break;
 817                case TIMEOUT_EVT:
 818                        dbg_link("TIM ");
 819                        if (l_ptr->next_in_no != l_ptr->checkpoint) {
 820                                l_ptr->checkpoint = l_ptr->next_in_no;
 821                                if (tipc_bclink_acks_missing(l_ptr->owner)) {
 822                                        tipc_link_send_proto_msg(l_ptr, STATE_MSG,
 823                                                                 0, 0, 0, 0, 0);
 824                                        l_ptr->fsm_msg_cnt++;
 825                                } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
 826                                        tipc_link_send_proto_msg(l_ptr, STATE_MSG,
 827                                                                 1, 0, 0, 0, 0);
 828                                        l_ptr->fsm_msg_cnt++;
 829                                }
 830                                link_set_timer(l_ptr, cont_intv);
 831                                break;
 832                        }
 833                        dbg_link(" -> WU\n");
 834                        l_ptr->state = WORKING_UNKNOWN;
 835                        l_ptr->fsm_msg_cnt = 0;
 836                        tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
 837                        l_ptr->fsm_msg_cnt++;
 838                        link_set_timer(l_ptr, cont_intv / 4);
 839                        break;
 840                case RESET_MSG:
 841                        dbg_link("RES -> RR\n");
 842                        info("Resetting link <%s>, requested by peer\n",
 843                             l_ptr->name);
 844                        tipc_link_reset(l_ptr);
 845                        l_ptr->state = RESET_RESET;
 846                        l_ptr->fsm_msg_cnt = 0;
 847                        tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
 848                        l_ptr->fsm_msg_cnt++;
 849                        link_set_timer(l_ptr, cont_intv);
 850                        break;
 851                default:
 852                        err("Unknown link event %u in WW state\n", event);
 853                }
 854                break;
 855        case WORKING_UNKNOWN:
 856                dbg_link("WU/");
 857                switch (event) {
 858                case TRAFFIC_MSG_EVT:
 859                        dbg_link("TRF-");
 860                case ACTIVATE_MSG:
 861                        dbg_link("ACT -> WW\n");
 862                        l_ptr->state = WORKING_WORKING;
 863                        l_ptr->fsm_msg_cnt = 0;
 864                        link_set_timer(l_ptr, cont_intv);
 865                        break;
 866                case RESET_MSG:
 867                        dbg_link("RES -> RR\n");
 868                        info("Resetting link <%s>, requested by peer "
 869                             "while probing\n", l_ptr->name);
 870                        tipc_link_reset(l_ptr);
 871                        l_ptr->state = RESET_RESET;
 872                        l_ptr->fsm_msg_cnt = 0;
 873                        tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
 874                        l_ptr->fsm_msg_cnt++;
 875                        link_set_timer(l_ptr, cont_intv);
 876                        break;
 877                case TIMEOUT_EVT:
 878                        dbg_link("TIM ");
 879                        if (l_ptr->next_in_no != l_ptr->checkpoint) {
 880                                dbg_link("-> WW \n");
 881                                l_ptr->state = WORKING_WORKING;
 882                                l_ptr->fsm_msg_cnt = 0;
 883                                l_ptr->checkpoint = l_ptr->next_in_no;
 884                                if (tipc_bclink_acks_missing(l_ptr->owner)) {
 885                                        tipc_link_send_proto_msg(l_ptr, STATE_MSG,
 886                                                                 0, 0, 0, 0, 0);
 887                                        l_ptr->fsm_msg_cnt++;
 888                                }
 889                                link_set_timer(l_ptr, cont_intv);
 890                        } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
 891                                dbg_link("Probing %u/%u,timer = %u ms)\n",
 892                                         l_ptr->fsm_msg_cnt, l_ptr->abort_limit,
 893                                         cont_intv / 4);
 894                                tipc_link_send_proto_msg(l_ptr, STATE_MSG,
 895                                                         1, 0, 0, 0, 0);
 896                                l_ptr->fsm_msg_cnt++;
 897                                link_set_timer(l_ptr, cont_intv / 4);
 898                        } else {        /* Link has failed */
 899                                dbg_link("-> RU (%u probes unanswered)\n",
 900                                         l_ptr->fsm_msg_cnt);
 901                                warn("Resetting link <%s>, peer not responding\n",
 902                                     l_ptr->name);
 903                                tipc_link_reset(l_ptr);
 904                                l_ptr->state = RESET_UNKNOWN;
 905                                l_ptr->fsm_msg_cnt = 0;
 906                                tipc_link_send_proto_msg(l_ptr, RESET_MSG,
 907                                                         0, 0, 0, 0, 0);
 908                                l_ptr->fsm_msg_cnt++;
 909                                link_set_timer(l_ptr, cont_intv);
 910                        }
 911                        break;
 912                default:
 913                        err("Unknown link event %u in WU state\n", event);
 914                }
 915                break;
 916        case RESET_UNKNOWN:
 917                dbg_link("RU/");
 918                switch (event) {
 919                case TRAFFIC_MSG_EVT:
 920                        dbg_link("TRF-\n");
 921                        break;
 922                case ACTIVATE_MSG:
 923                        other = l_ptr->owner->active_links[0];
 924                        if (other && link_working_unknown(other)) {
 925                                dbg_link("ACT\n");
 926                                break;
 927                        }
 928                        dbg_link("ACT -> WW\n");
 929                        l_ptr->state = WORKING_WORKING;
 930                        l_ptr->fsm_msg_cnt = 0;
 931                        link_activate(l_ptr);
 932                        tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
 933                        l_ptr->fsm_msg_cnt++;
 934                        link_set_timer(l_ptr, cont_intv);
 935                        break;
 936                case RESET_MSG:
 937                        dbg_link("RES \n");
 938                        dbg_link(" -> RR\n");
 939                        l_ptr->state = RESET_RESET;
 940                        l_ptr->fsm_msg_cnt = 0;
 941                        tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0);
 942                        l_ptr->fsm_msg_cnt++;
 943                        link_set_timer(l_ptr, cont_intv);
 944                        break;
 945                case STARTING_EVT:
 946                        dbg_link("START-");
 947                        l_ptr->started = 1;
 948                        /* fall through */
 949                case TIMEOUT_EVT:
 950                        dbg_link("TIM \n");
 951                        tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
 952                        l_ptr->fsm_msg_cnt++;
 953                        link_set_timer(l_ptr, cont_intv);
 954                        break;
 955                default:
 956                        err("Unknown link event %u in RU state\n", event);
 957                }
 958                break;
 959        case RESET_RESET:
 960                dbg_link("RR/ ");
 961                switch (event) {
 962                case TRAFFIC_MSG_EVT:
 963                        dbg_link("TRF-");
 964                        /* fall through */
 965                case ACTIVATE_MSG:
 966                        other = l_ptr->owner->active_links[0];
 967                        if (other && link_working_unknown(other)) {
 968                                dbg_link("ACT\n");
 969                                break;
 970                        }
 971                        dbg_link("ACT -> WW\n");
 972                        l_ptr->state = WORKING_WORKING;
 973                        l_ptr->fsm_msg_cnt = 0;
 974                        link_activate(l_ptr);
 975                        tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
 976                        l_ptr->fsm_msg_cnt++;
 977                        link_set_timer(l_ptr, cont_intv);
 978                        break;
 979                case RESET_MSG:
 980                        dbg_link("RES\n");
 981                        break;
 982                case TIMEOUT_EVT:
 983                        dbg_link("TIM\n");
 984                        tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
 985                        l_ptr->fsm_msg_cnt++;
 986                        link_set_timer(l_ptr, cont_intv);
 987                        dbg_link("fsm_msg_cnt %u\n", l_ptr->fsm_msg_cnt);
 988                        break;
 989                default:
 990                        err("Unknown link event %u in RR state\n", event);
 991                }
 992                break;
 993        default:
 994                err("Unknown link state %u/%u\n", l_ptr->state, event);
 995        }
 996}
 997
 998/*
 999 * link_bundle_buf(): Append contents of a buffer to
1000 * the tail of an existing one.
1001 */
1002
1003static int link_bundle_buf(struct link *l_ptr,
1004                           struct sk_buff *bundler,
1005                           struct sk_buff *buf)
1006{
1007        struct tipc_msg *bundler_msg = buf_msg(bundler);
1008        struct tipc_msg *msg = buf_msg(buf);
1009        u32 size = msg_size(msg);
1010        u32 bundle_size = msg_size(bundler_msg);
1011        u32 to_pos = align(bundle_size);
1012        u32 pad = to_pos - bundle_size;
1013
1014        if (msg_user(bundler_msg) != MSG_BUNDLER)
1015                return 0;
1016        if (msg_type(bundler_msg) != OPEN_MSG)
1017                return 0;
1018        if (skb_tailroom(bundler) < (pad + size))
1019                return 0;
1020        if (link_max_pkt(l_ptr) < (to_pos + size))
1021                return 0;
1022
1023        skb_put(bundler, pad + size);
1024        skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
1025        msg_set_size(bundler_msg, to_pos + size);
1026        msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
1027        dbg("Packed msg # %u(%u octets) into pos %u in buf(#%u)\n",
1028            msg_msgcnt(bundler_msg), size, to_pos, msg_seqno(bundler_msg));
1029        msg_dbg(msg, "PACKD:");
1030        buf_discard(buf);
1031        l_ptr->stats.sent_bundled++;
1032        return 1;
1033}
1034
1035static void link_add_to_outqueue(struct link *l_ptr,
1036                                 struct sk_buff *buf,
1037                                 struct tipc_msg *msg)
1038{
1039        u32 ack = mod(l_ptr->next_in_no - 1);
1040        u32 seqno = mod(l_ptr->next_out_no++);
1041
1042        msg_set_word(msg, 2, ((ack << 16) | seqno));
1043        msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1044        buf->next = NULL;
1045        if (l_ptr->first_out) {
1046                l_ptr->last_out->next = buf;
1047                l_ptr->last_out = buf;
1048        } else
1049                l_ptr->first_out = l_ptr->last_out = buf;
1050        l_ptr->out_queue_size++;
1051}
1052
1053/*
1054 * tipc_link_send_buf() is the 'full path' for messages, called from
1055 * inside TIPC when the 'fast path' in tipc_send_buf
1056 * has failed, and from link_send()
1057 */
1058
1059int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
1060{
1061        struct tipc_msg *msg = buf_msg(buf);
1062        u32 size = msg_size(msg);
1063        u32 dsz = msg_data_sz(msg);
1064        u32 queue_size = l_ptr->out_queue_size;
1065        u32 imp = msg_tot_importance(msg);
1066        u32 queue_limit = l_ptr->queue_limit[imp];
1067        u32 max_packet = link_max_pkt(l_ptr);
1068
1069        msg_set_prevnode(msg, tipc_own_addr);        /* If routed message */
1070
1071        /* Match msg importance against queue limits: */
1072
1073        if (unlikely(queue_size >= queue_limit)) {
1074                if (imp <= TIPC_CRITICAL_IMPORTANCE) {
1075                        return link_schedule_port(l_ptr, msg_origport(msg),
1076                                                  size);
1077                }
1078                msg_dbg(msg, "TIPC: Congestion, throwing away\n");
1079                buf_discard(buf);
1080                if (imp > CONN_MANAGER) {
1081                        warn("Resetting link <%s>, send queue full", l_ptr->name);
1082                        tipc_link_reset(l_ptr);
1083                }
1084                return dsz;
1085        }
1086
1087        /* Fragmentation needed ? */
1088
1089        if (size > max_packet)
1090                return tipc_link_send_long_buf(l_ptr, buf);
1091
1092        /* Packet can be queued or sent: */
1093
1094        if (queue_size > l_ptr->stats.max_queue_sz)
1095                l_ptr->stats.max_queue_sz = queue_size;
1096
1097        if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
1098                   !link_congested(l_ptr))) {
1099                link_add_to_outqueue(l_ptr, buf, msg);
1100
1101                if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) {
1102                        l_ptr->unacked_window = 0;
1103                } else {
1104                        tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1105                        l_ptr->stats.bearer_congs++;
1106                        l_ptr->next_out = buf;
1107                }
1108                return dsz;
1109        }
1110        /* Congestion: can message be bundled ?: */
1111
1112        if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
1113            (msg_user(msg) != MSG_FRAGMENTER)) {
1114
1115                /* Try adding message to an existing bundle */
1116
1117                if (l_ptr->next_out &&
1118                    link_bundle_buf(l_ptr, l_ptr->last_out, buf)) {
1119                        tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
1120                        return dsz;
1121                }
1122
1123                /* Try creating a new bundle */
1124
1125                if (size <= max_packet * 2 / 3) {
1126                        struct sk_buff *bundler = buf_acquire(max_packet);
1127                        struct tipc_msg bundler_hdr;
1128
1129                        if (bundler) {
1130                                msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
1131                                         INT_H_SIZE, l_ptr->addr);
1132                                skb_copy_to_linear_data(bundler, &bundler_hdr,
1133                                                        INT_H_SIZE);
1134                                skb_trim(bundler, INT_H_SIZE);
1135                                link_bundle_buf(l_ptr, bundler, buf);
1136                                buf = bundler;
1137                                msg = buf_msg(buf);
1138                                l_ptr->stats.sent_bundles++;
1139                        }
1140                }
1141        }
1142        if (!l_ptr->next_out)
1143                l_ptr->next_out = buf;
1144        link_add_to_outqueue(l_ptr, buf, msg);
1145        tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
1146        return dsz;
1147}
1148
1149/*
1150 * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has
1151 * not been selected yet, and the the owner node is not locked
1152 * Called by TIPC internal users, e.g. the name distributor
1153 */
1154
1155int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
1156{
1157        struct link *l_ptr;
1158        struct tipc_node *n_ptr;
1159        int res = -ELINKCONG;
1160
1161        read_lock_bh(&tipc_net_lock);
1162        n_ptr = tipc_node_select(dest, selector);
1163        if (n_ptr) {
1164                tipc_node_lock(n_ptr);
1165                l_ptr = n_ptr->active_links[selector & 1];
1166                if (l_ptr) {
1167                        dbg("tipc_link_send: found link %x for dest %x\n", l_ptr, dest);
1168                        res = tipc_link_send_buf(l_ptr, buf);
1169                } else {
1170                        dbg("Attempt to send msg to unreachable node:\n");
1171                        msg_dbg(buf_msg(buf),">>>");
1172                        buf_discard(buf);
1173                }
1174                tipc_node_unlock(n_ptr);
1175        } else {
1176                dbg("Attempt to send msg to unknown node:\n");
1177                msg_dbg(buf_msg(buf),">>>");
1178                buf_discard(buf);
1179        }
1180        read_unlock_bh(&tipc_net_lock);
1181        return res;
1182}
1183
1184/*
1185 * link_send_buf_fast: Entry for data messages where the
1186 * destination link is known and the header is complete,
1187 * inclusive total message length. Very time critical.
1188 * Link is locked. Returns user data length.
1189 */
1190
1191static int link_send_buf_fast(struct link *l_ptr, struct sk_buff *buf,
1192                              u32 *used_max_pkt)
1193{
1194        struct tipc_msg *msg = buf_msg(buf);
1195        int res = msg_data_sz(msg);
1196
1197        if (likely(!link_congested(l_ptr))) {
1198                if (likely(msg_size(msg) <= link_max_pkt(l_ptr))) {
1199                        if (likely(list_empty(&l_ptr->b_ptr->cong_links))) {
1200                                link_add_to_outqueue(l_ptr, buf, msg);
1201                                if (likely(tipc_bearer_send(l_ptr->b_ptr, buf,
1202                                                            &l_ptr->media_addr))) {
1203                                        l_ptr->unacked_window = 0;
1204                                        msg_dbg(msg,"SENT_FAST:");
1205                                        return res;
1206                                }
1207                                dbg("failed sent fast...\n");
1208                                tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1209                                l_ptr->stats.bearer_congs++;
1210                                l_ptr->next_out = buf;
1211                                return res;
1212                        }
1213                }
1214                else
1215                        *used_max_pkt = link_max_pkt(l_ptr);
1216        }
1217        return tipc_link_send_buf(l_ptr, buf);  /* All other cases */
1218}
1219
1220/*
1221 * tipc_send_buf_fast: Entry for data messages where the
1222 * destination node is known and the header is complete,
1223 * inclusive total message length.
1224 * Returns user data length.
1225 */
1226int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
1227{
1228        struct link *l_ptr;
1229        struct tipc_node *n_ptr;
1230        int res;
1231        u32 selector = msg_origport(buf_msg(buf)) & 1;
1232        u32 dummy;
1233
1234        if (destnode == tipc_own_addr)
1235                return tipc_port_recv_msg(buf);
1236
1237        read_lock_bh(&tipc_net_lock);
1238        n_ptr = tipc_node_select(destnode, selector);
1239        if (likely(n_ptr)) {
1240                tipc_node_lock(n_ptr);
1241                l_ptr = n_ptr->active_links[selector];
1242                dbg("send_fast: buf %x selected %x, destnode = %x\n",
1243                    buf, l_ptr, destnode);
1244                if (likely(l_ptr)) {
1245                        res = link_send_buf_fast(l_ptr, buf, &dummy);
1246                        tipc_node_unlock(n_ptr);
1247                        read_unlock_bh(&tipc_net_lock);
1248                        return res;
1249                }
1250                tipc_node_unlock(n_ptr);
1251        }
1252        read_unlock_bh(&tipc_net_lock);
1253        res = msg_data_sz(buf_msg(buf));
1254        tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1255        return res;
1256}
1257
1258
1259/*
1260 * tipc_link_send_sections_fast: Entry for messages where the
1261 * destination processor is known and the header is complete,
1262 * except for total message length.
1263 * Returns user data length or errno.
1264 */
1265int tipc_link_send_sections_fast(struct port *sender,
1266                                 struct iovec const *msg_sect,
1267                                 const u32 num_sect,
1268                                 u32 destaddr)
1269{
1270        struct tipc_msg *hdr = &sender->publ.phdr;
1271        struct link *l_ptr;
1272        struct sk_buff *buf;
1273        struct tipc_node *node;
1274        int res;
1275        u32 selector = msg_origport(hdr) & 1;
1276
1277again:
1278        /*
1279         * Try building message using port's max_pkt hint.
1280         * (Must not hold any locks while building message.)
1281         */
1282
1283        res = msg_build(hdr, msg_sect, num_sect, sender->publ.max_pkt,
1284                        !sender->user_port, &buf);
1285
1286        read_lock_bh(&tipc_net_lock);
1287        node = tipc_node_select(destaddr, selector);
1288        if (likely(node)) {
1289                tipc_node_lock(node);
1290                l_ptr = node->active_links[selector];
1291                if (likely(l_ptr)) {
1292                        if (likely(buf)) {
1293                                res = link_send_buf_fast(l_ptr, buf,
1294                                                         &sender->publ.max_pkt);
1295                                if (unlikely(res < 0))
1296                                        buf_discard(buf);
1297exit:
1298                                tipc_node_unlock(node);
1299                                read_unlock_bh(&tipc_net_lock);
1300                                return res;
1301                        }
1302
1303                        /* Exit if build request was invalid */
1304
1305                        if (unlikely(res < 0))
1306                                goto exit;
1307
1308                        /* Exit if link (or bearer) is congested */
1309
1310                        if (link_congested(l_ptr) ||
1311                            !list_empty(&l_ptr->b_ptr->cong_links)) {
1312                                res = link_schedule_port(l_ptr,
1313                                                         sender->publ.ref, res);
1314                                goto exit;
1315                        }
1316
1317                        /*
1318                         * Message size exceeds max_pkt hint; update hint,
1319                         * then re-try fast path or fragment the message
1320                         */
1321
1322                        sender->publ.max_pkt = link_max_pkt(l_ptr);
1323                        tipc_node_unlock(node);
1324                        read_unlock_bh(&tipc_net_lock);
1325
1326
1327                        if ((msg_hdr_sz(hdr) + res) <= sender->publ.max_pkt)
1328                                goto again;
1329
1330                        return link_send_sections_long(sender, msg_sect,
1331                                                       num_sect, destaddr);
1332                }
1333                tipc_node_unlock(node);
1334        }
1335        read_unlock_bh(&tipc_net_lock);
1336
1337        /* Couldn't find a link to the destination node */
1338
1339        if (buf)
1340                return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1341        if (res >= 0)
1342                return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1343                                                 TIPC_ERR_NO_NODE);
1344        return res;
1345}
1346
1347/*
1348 * link_send_sections_long(): Entry for long messages where the
1349 * destination node is known and the header is complete,
1350 * inclusive total message length.
1351 * Link and bearer congestion status have been checked to be ok,
1352 * and are ignored if they change.
1353 *
1354 * Note that fragments do not use the full link MTU so that they won't have
1355 * to undergo refragmentation if link changeover causes them to be sent
1356 * over another link with an additional tunnel header added as prefix.
1357 * (Refragmentation will still occur if the other link has a smaller MTU.)
1358 *
1359 * Returns user data length or errno.
1360 */
1361static int link_send_sections_long(struct port *sender,
1362                                   struct iovec const *msg_sect,
1363                                   u32 num_sect,
1364                                   u32 destaddr)
1365{
1366        struct link *l_ptr;
1367        struct tipc_node *node;
1368        struct tipc_msg *hdr = &sender->publ.phdr;
1369        u32 dsz = msg_data_sz(hdr);
1370        u32 max_pkt,fragm_sz,rest;
1371        struct tipc_msg fragm_hdr;
1372        struct sk_buff *buf,*buf_chain,*prev;
1373        u32 fragm_crs,fragm_rest,hsz,sect_rest;
1374        const unchar *sect_crs;
1375        int curr_sect;
1376        u32 fragm_no;
1377
1378again:
1379        fragm_no = 1;
1380        max_pkt = sender->publ.max_pkt - INT_H_SIZE;
1381                /* leave room for tunnel header in case of link changeover */
1382        fragm_sz = max_pkt - INT_H_SIZE;
1383                /* leave room for fragmentation header in each fragment */
1384        rest = dsz;
1385        fragm_crs = 0;
1386        fragm_rest = 0;
1387        sect_rest = 0;
1388        sect_crs = NULL;
1389        curr_sect = -1;
1390
1391        /* Prepare reusable fragment header: */
1392
1393        msg_dbg(hdr, ">FRAGMENTING>");
1394        msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
1395                 INT_H_SIZE, msg_destnode(hdr));
1396        msg_set_link_selector(&fragm_hdr, sender->publ.ref);
1397        msg_set_size(&fragm_hdr, max_pkt);
1398        msg_set_fragm_no(&fragm_hdr, 1);
1399
1400        /* Prepare header of first fragment: */
1401
1402        buf_chain = buf = buf_acquire(max_pkt);
1403        if (!buf)
1404                return -ENOMEM;
1405        buf->next = NULL;
1406        skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1407        hsz = msg_hdr_sz(hdr);
1408        skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
1409        msg_dbg(buf_msg(buf), ">BUILD>");
1410
1411        /* Chop up message: */
1412
1413        fragm_crs = INT_H_SIZE + hsz;
1414        fragm_rest = fragm_sz - hsz;
1415
1416        do {                /* For all sections */
1417                u32 sz;
1418
1419                if (!sect_rest) {
1420                        sect_rest = msg_sect[++curr_sect].iov_len;
1421                        sect_crs = (const unchar *)msg_sect[curr_sect].iov_base;
1422                }
1423
1424                if (sect_rest < fragm_rest)
1425                        sz = sect_rest;
1426                else
1427                        sz = fragm_rest;
1428
1429                if (likely(!sender->user_port)) {
1430                        if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
1431error:
1432                                for (; buf_chain; buf_chain = buf) {
1433                                        buf = buf_chain->next;
1434                                        buf_discard(buf_chain);
1435                                }
1436                                return -EFAULT;
1437                        }
1438                } else
1439                        skb_copy_to_linear_data_offset(buf, fragm_crs,
1440                                                       sect_crs, sz);
1441                sect_crs += sz;
1442                sect_rest -= sz;
1443                fragm_crs += sz;
1444                fragm_rest -= sz;
1445                rest -= sz;
1446
1447                if (!fragm_rest && rest) {
1448
1449                        /* Initiate new fragment: */
1450                        if (rest <= fragm_sz) {
1451                                fragm_sz = rest;
1452                                msg_set_type(&fragm_hdr,LAST_FRAGMENT);
1453                        } else {
1454                                msg_set_type(&fragm_hdr, FRAGMENT);
1455                        }
1456                        msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
1457                        msg_set_fragm_no(&fragm_hdr, ++fragm_no);
1458                        prev = buf;
1459                        buf = buf_acquire(fragm_sz + INT_H_SIZE);
1460                        if (!buf)
1461                                goto error;
1462
1463                        buf->next = NULL;
1464                        prev->next = buf;
1465                        skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1466                        fragm_crs = INT_H_SIZE;
1467                        fragm_rest = fragm_sz;
1468                        msg_dbg(buf_msg(buf),"  >BUILD>");
1469                }
1470        }
1471        while (rest > 0);
1472
1473        /*
1474         * Now we have a buffer chain. Select a link and check
1475         * that packet size is still OK
1476         */
1477        node = tipc_node_select(destaddr, sender->publ.ref & 1);
1478        if (likely(node)) {
1479                tipc_node_lock(node);
1480                l_ptr = node->active_links[sender->publ.ref & 1];
1481                if (!l_ptr) {
1482                        tipc_node_unlock(node);
1483                        goto reject;
1484                }
1485                if (link_max_pkt(l_ptr) < max_pkt) {
1486                        sender->publ.max_pkt = link_max_pkt(l_ptr);
1487                        tipc_node_unlock(node);
1488                        for (; buf_chain; buf_chain = buf) {
1489                                buf = buf_chain->next;
1490                                buf_discard(buf_chain);
1491                        }
1492                        goto again;
1493                }
1494        } else {
1495reject:
1496                for (; buf_chain; buf_chain = buf) {
1497                        buf = buf_chain->next;
1498                        buf_discard(buf_chain);
1499                }
1500                return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1501                                                 TIPC_ERR_NO_NODE);
1502        }
1503
1504        /* Append whole chain to send queue: */
1505
1506        buf = buf_chain;
1507        l_ptr->long_msg_seq_no = mod(l_ptr->long_msg_seq_no + 1);
1508        if (!l_ptr->next_out)
1509                l_ptr->next_out = buf_chain;
1510        l_ptr->stats.sent_fragmented++;
1511        while (buf) {
1512                struct sk_buff *next = buf->next;
1513                struct tipc_msg *msg = buf_msg(buf);
1514
1515                l_ptr->stats.sent_fragments++;
1516                msg_set_long_msgno(msg, l_ptr->long_msg_seq_no);
1517                link_add_to_outqueue(l_ptr, buf, msg);
1518                msg_dbg(msg, ">ADD>");
1519                buf = next;
1520        }
1521
1522        /* Send it, if possible: */
1523
1524        tipc_link_push_queue(l_ptr);
1525        tipc_node_unlock(node);
1526        return dsz;
1527}
1528
1529/*
1530 * tipc_link_push_packet: Push one unsent packet to the media
1531 */
1532u32 tipc_link_push_packet(struct link *l_ptr)
1533{
1534        struct sk_buff *buf = l_ptr->first_out;
1535        u32 r_q_size = l_ptr->retransm_queue_size;
1536        u32 r_q_head = l_ptr->retransm_queue_head;
1537
1538        /* Step to position where retransmission failed, if any,    */
1539        /* consider that buffers may have been released in meantime */
1540
1541        if (r_q_size && buf) {
1542                u32 last = lesser(mod(r_q_head + r_q_size),
1543                                  link_last_sent(l_ptr));
1544                u32 first = msg_seqno(buf_msg(buf));
1545
1546                while (buf && less(first, r_q_head)) {
1547                        first = mod(first + 1);
1548                        buf = buf->next;
1549                }
1550                l_ptr->retransm_queue_head = r_q_head = first;
1551                l_ptr->retransm_queue_size = r_q_size = mod(last - first);
1552        }
1553
1554        /* Continue retransmission now, if there is anything: */
1555
1556        if (r_q_size && buf && !skb_cloned(buf)) {
1557                msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1558                msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
1559                if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1560                        msg_dbg(buf_msg(buf), ">DEF-RETR>");
1561                        l_ptr->retransm_queue_head = mod(++r_q_head);
1562                        l_ptr->retransm_queue_size = --r_q_size;
1563                        l_ptr->stats.retransmitted++;
1564                        return 0;
1565                } else {
1566                        l_ptr->stats.bearer_congs++;
1567                        msg_dbg(buf_msg(buf), "|>DEF-RETR>");
1568                        return PUSH_FAILED;
1569                }
1570        }
1571
1572        /* Send deferred protocol message, if any: */
1573
1574        buf = l_ptr->proto_msg_queue;
1575        if (buf) {
1576                msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1577                msg_set_bcast_ack(buf_msg(buf),l_ptr->owner->bclink.last_in);
1578                if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1579                        msg_dbg(buf_msg(buf), ">DEF-PROT>");
1580                        l_ptr->unacked_window = 0;
1581                        buf_discard(buf);
1582                        l_ptr->proto_msg_queue = NULL;
1583                        return 0;
1584                } else {
1585                        msg_dbg(buf_msg(buf), "|>DEF-PROT>");
1586                        l_ptr->stats.bearer_congs++;
1587                        return PUSH_FAILED;
1588                }
1589        }
1590
1591        /* Send one deferred data message, if send window not full: */
1592
1593        buf = l_ptr->next_out;
1594        if (buf) {
1595                struct tipc_msg *msg = buf_msg(buf);
1596                u32 next = msg_seqno(msg);
1597                u32 first = msg_seqno(buf_msg(l_ptr->first_out));
1598
1599                if (mod(next - first) < l_ptr->queue_limit[0]) {
1600                        msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1601                        msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1602                        if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1603                                if (msg_user(msg) == MSG_BUNDLER)
1604                                        msg_set_type(msg, CLOSED_MSG);
1605                                msg_dbg(msg, ">PUSH-DATA>");
1606                                l_ptr->next_out = buf->next;
1607                                return 0;
1608                        } else {
1609                                msg_dbg(msg, "|PUSH-DATA|");
1610                                l_ptr->stats.bearer_congs++;
1611                                return PUSH_FAILED;
1612                        }
1613                }
1614        }
1615        return PUSH_FINISHED;
1616}
1617
1618/*
1619 * push_queue(): push out the unsent messages of a link where
1620 *               congestion has abated. Node is locked
1621 */
1622void tipc_link_push_queue(struct link *l_ptr)
1623{
1624        u32 res;
1625
1626        if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr))
1627                return;
1628
1629        do {
1630                res = tipc_link_push_packet(l_ptr);
1631        } while (!res);
1632
1633        if (res == PUSH_FAILED)
1634                tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1635}
1636
1637static void link_reset_all(unsigned long addr)
1638{
1639        struct tipc_node *n_ptr;
1640        char addr_string[16];
1641        u32 i;
1642
1643        read_lock_bh(&tipc_net_lock);
1644        n_ptr = tipc_node_find((u32)addr);
1645        if (!n_ptr) {
1646                read_unlock_bh(&tipc_net_lock);
1647                return;        /* node no longer exists */
1648        }
1649
1650        tipc_node_lock(n_ptr);
1651
1652        warn("Resetting all links to %s\n",
1653             addr_string_fill(addr_string, n_ptr->addr));
1654
1655        for (i = 0; i < MAX_BEARERS; i++) {
1656                if (n_ptr->links[i]) {
1657                        link_print(n_ptr->links[i], TIPC_OUTPUT,
1658                                   "Resetting link\n");
1659                        tipc_link_reset(n_ptr->links[i]);
1660                }
1661        }
1662
1663        tipc_node_unlock(n_ptr);
1664        read_unlock_bh(&tipc_net_lock);
1665}
1666
1667static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf)
1668{
1669        struct tipc_msg *msg = buf_msg(buf);
1670
1671        warn("Retransmission failure on link <%s>\n", l_ptr->name);
1672        tipc_msg_dbg(TIPC_OUTPUT, msg, ">RETR-FAIL>");
1673
1674        if (l_ptr->addr) {
1675
1676                /* Handle failure on standard link */
1677
1678                link_print(l_ptr, TIPC_OUTPUT, "Resetting link\n");
1679                tipc_link_reset(l_ptr);
1680
1681        } else {
1682
1683                /* Handle failure on broadcast link */
1684
1685                struct tipc_node *n_ptr;
1686                char addr_string[16];
1687
1688                tipc_printf(TIPC_OUTPUT, "Msg seq number: %u,  ", msg_seqno(msg));
1689                tipc_printf(TIPC_OUTPUT, "Outstanding acks: %lu\n",
1690                                     (unsigned long) TIPC_SKB_CB(buf)->handle);
1691
1692                n_ptr = l_ptr->owner->next;
1693                tipc_node_lock(n_ptr);
1694
1695                addr_string_fill(addr_string, n_ptr->addr);
1696                tipc_printf(TIPC_OUTPUT, "Multicast link info for %s\n", addr_string);
1697                tipc_printf(TIPC_OUTPUT, "Supported: %d,  ", n_ptr->bclink.supported);
1698                tipc_printf(TIPC_OUTPUT, "Acked: %u\n", n_ptr->bclink.acked);
1699                tipc_printf(TIPC_OUTPUT, "Last in: %u,  ", n_ptr->bclink.last_in);
1700                tipc_printf(TIPC_OUTPUT, "Gap after: %u,  ", n_ptr->bclink.gap_after);
1701                tipc_printf(TIPC_OUTPUT, "Gap to: %u\n", n_ptr->bclink.gap_to);
1702                tipc_printf(TIPC_OUTPUT, "Nack sync: %u\n\n", n_ptr->bclink.nack_sync);
1703
1704                tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
1705
1706                tipc_node_unlock(n_ptr);
1707
1708                l_ptr->stale_count = 0;
1709        }
1710}
1711
1712void tipc_link_retransmit(struct link *l_ptr, struct sk_buff *buf,
1713                          u32 retransmits)
1714{
1715        struct tipc_msg *msg;
1716
1717        if (!buf)
1718                return;
1719
1720        msg = buf_msg(buf);
1721
1722        dbg("Retransmitting %u in link %x\n", retransmits, l_ptr);
1723
1724        if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
1725                if (!skb_cloned(buf)) {
1726                        msg_dbg(msg, ">NO_RETR->BCONG>");
1727                        dbg_print_link(l_ptr, "   ");
1728                        l_ptr->retransm_queue_head = msg_seqno(msg);
1729                        l_ptr->retransm_queue_size = retransmits;
1730                        return;
1731                } else {
1732                        /* Don't retransmit if driver already has the buffer */
1733                }
1734        } else {
1735                /* Detect repeated retransmit failures on uncongested bearer */
1736
1737                if (l_ptr->last_retransmitted == msg_seqno(msg)) {
1738                        if (++l_ptr->stale_count > 100) {
1739                                link_retransmit_failure(l_ptr, buf);
1740                                return;
1741                        }
1742                } else {
1743                        l_ptr->last_retransmitted = msg_seqno(msg);
1744                        l_ptr->stale_count = 1;
1745                }
1746        }
1747
1748        while (retransmits && (buf != l_ptr->next_out) && buf && !skb_cloned(buf)) {
1749                msg = buf_msg(buf);
1750                msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1751                msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1752                if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1753                        msg_dbg(buf_msg(buf), ">RETR>");
1754                        buf = buf->next;
1755                        retransmits--;
1756                        l_ptr->stats.retransmitted++;
1757                } else {
1758                        tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1759                        l_ptr->stats.bearer_congs++;
1760                        l_ptr->retransm_queue_head = msg_seqno(buf_msg(buf));
1761                        l_ptr->retransm_queue_size = retransmits;
1762                        return;
1763                }
1764        }
1765
1766        l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
1767}
1768
1769/**
1770 * link_insert_deferred_queue - insert deferred messages back into receive chain
1771 */
1772
1773static struct sk_buff *link_insert_deferred_queue(struct link *l_ptr,
1774                                                  struct sk_buff *buf)
1775{
1776        u32 seq_no;
1777
1778        if (l_ptr->oldest_deferred_in == NULL)
1779                return buf;
1780
1781        seq_no = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
1782        if (seq_no == mod(l_ptr->next_in_no)) {
1783                l_ptr->newest_deferred_in->next = buf;
1784                buf = l_ptr->oldest_deferred_in;
1785                l_ptr->oldest_deferred_in = NULL;
1786                l_ptr->deferred_inqueue_sz = 0;
1787        }
1788        return buf;
1789}
1790
1791/**
1792 * link_recv_buf_validate - validate basic format of received message
1793 *
1794 * This routine ensures a TIPC message has an acceptable header, and at least
1795 * as much data as the header indicates it should.  The routine also ensures
1796 * that the entire message header is stored in the main fragment of the message
1797 * buffer, to simplify future access to message header fields.
1798 *
1799 * Note: Having extra info present in the message header or data areas is OK.
1800 * TIPC will ignore the excess, under the assumption that it is optional info
1801 * introduced by a later release of the protocol.
1802 */
1803
1804static int link_recv_buf_validate(struct sk_buff *buf)
1805{
1806        static u32 min_data_hdr_size[8] = {
1807                SHORT_H_SIZE, MCAST_H_SIZE, LONG_H_SIZE, DIR_MSG_H_SIZE,
1808                MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
1809                };
1810
1811        struct tipc_msg *msg;
1812        u32 tipc_hdr[2];
1813        u32 size;
1814        u32 hdr_size;
1815        u32 min_hdr_size;
1816
1817        if (unlikely(buf->len < MIN_H_SIZE))
1818                return 0;
1819
1820        msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
1821        if (msg == NULL)
1822                return 0;
1823
1824        if (unlikely(msg_version(msg) != TIPC_VERSION))
1825                return 0;
1826
1827        size = msg_size(msg);
1828        hdr_size = msg_hdr_sz(msg);
1829        min_hdr_size = msg_isdata(msg) ?
1830                min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;
1831
1832        if (unlikely((hdr_size < min_hdr_size) ||
1833                     (size < hdr_size) ||
1834                     (buf->len < size) ||
1835                     (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
1836                return 0;
1837
1838        return pskb_may_pull(buf, hdr_size);
1839}
1840
/*
 * tipc_recv_msg(): process a chain of buffers received on a bearer.
 * @head:   first buffer of a chain linked through ->next
 * @tb_ptr: bearer the buffers arrived on (used as a struct bearer below)
 *
 * The whole chain is processed with tipc_net_lock read-held.  Each buffer
 * is validated and linearized.  Non-sequenced messages go to the discovery
 * or broadcast-link receive routine.  Everything else is processed on the
 * unicast link selected by the sender's node (msg_prevnode) and this
 * bearer's identity: acknowledged outbound buffers are released, pending
 * output is pushed, and the message is then delivered, deferred, routed or
 * discarded depending on link state and sequence number.
 *
 * The node lock is taken once the node is found and is released on every
 * path before the buffer is handed to an upper-layer receive routine or
 * before reaching the 'cont' discard label.
 */
1841void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
1842{
1843        read_lock_bh(&tipc_net_lock);
1844        while (head) {
1845                struct bearer *b_ptr = (struct bearer *)tb_ptr;
1846                struct tipc_node *n_ptr;
1847                struct link *l_ptr;
1848                struct sk_buff *crs;
1849                struct sk_buff *buf = head;
1850                struct tipc_msg *msg;
1851                u32 seq_no;
1852                u32 ackd;
1853                u32 released = 0;
1854                int type;
1855
                    /* Detach the current buffer; 'head' is what remains */
1856                head = head->next;
1857
1858                /* Ensure message is well-formed */
1859
1860                if (unlikely(!link_recv_buf_validate(buf)))
1861                        goto cont;
1862
1863                /* Ensure message data is a single contiguous unit */
1864
1865                if (unlikely(buf_linearize(buf))) {
1866                        goto cont;
1867                }
1868
1869                /* Handle arrival of a non-unicast link message */
1870
1871                msg = buf_msg(buf);
1872
1873                if (unlikely(msg_non_seq(msg))) {
1874                        if (msg_user(msg) ==  LINK_CONFIG)
1875                                tipc_disc_recv_msg(buf, b_ptr);
1876                        else
1877                                tipc_bclink_recv_pkt(buf);
                            /*
                             * buf is not discarded on this path; presumably
                             * the callee took ownership - TODO confirm.
                             */
1878                        continue;
1879                }
1880
                    /* Drop non-short messages addressed to another node */
1881                if (unlikely(!msg_short(msg) &&
1882                             (msg_destnode(msg) != tipc_own_addr)))
1883                        goto cont;
1884
1885                /* Locate unicast link endpoint that should handle message */
1886
1887                n_ptr = tipc_node_find(msg_prevnode(msg));
1888                if (unlikely(!n_ptr))
1889                        goto cont;
1890                tipc_node_lock(n_ptr);
1891
1892                l_ptr = n_ptr->links[b_ptr->identity];
1893                if (unlikely(!l_ptr)) {
1894                        tipc_node_unlock(n_ptr);
1895                        goto cont;
1896                }
1897
1898                /* Validate message sequence number info */
1899
1900                seq_no = msg_seqno(msg);
1901                ackd = msg_ack(msg);
1902
1903                /* Release acked messages */
1904
1905                if (less(n_ptr->bclink.acked, msg_bcast_ack(msg))) {
1906                        if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported)
1907                                tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1908                }
1909
                    /*
                     * Walk the already-sent part of the out queue (up to
                     * next_out) and free every buffer the peer has acked.
                     */
1910                crs = l_ptr->first_out;
1911                while ((crs != l_ptr->next_out) &&
1912                       less_eq(msg_seqno(buf_msg(crs)), ackd)) {
1913                        struct sk_buff *next = crs->next;
1914
1915                        buf_discard(crs);
1916                        crs = next;
1917                        released++;
1918                }
1919                if (released) {
1920                        l_ptr->first_out = crs;
1921                        l_ptr->out_queue_size -= released;
1922                }
1923
1924                /* Try sending any messages link endpoint has pending */
1925
1926                if (unlikely(l_ptr->next_out))
1927                        tipc_link_push_queue(l_ptr);
1928                if (unlikely(!list_empty(&l_ptr->waiting_ports)))
1929                        tipc_link_wakeup_ports(l_ptr, 0);
                    /* Send an explicit ack once enough packets are unacked */
1930                if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
1931                        l_ptr->stats.sent_acks++;
1932                        tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1933                }
1934
1935                /* Now (finally!) process the incoming message */
1936
    /*
     * protocol_check is re-entered from the CHANGEOVER_PROTOCOL case after
     * l_ptr/buf/msg/seq_no have been replaced.
     */
1937protocol_check:
1938                if (likely(link_working_working(l_ptr))) {
1939                        if (likely(seq_no == mod(l_ptr->next_in_no))) {
1940                                l_ptr->next_in_no++;
1941                                if (unlikely(l_ptr->oldest_deferred_in))
1942                                        head = link_insert_deferred_queue(l_ptr,
1943                                                                          head);
1944                                if (likely(msg_is_dest(msg, tipc_own_addr))) {
    /*
     * deliver is re-entered when a fragment or changeover message has
     * produced a new buf/msg to hand upward.  In every case below the node
     * lock is dropped before the receive routine is called.
     */
1945deliver:
1946                                        if (likely(msg_isdata(msg))) {
1947                                                tipc_node_unlock(n_ptr);
1948                                                tipc_port_recv_msg(buf);
1949                                                continue;
1950                                        }
1951                                        switch (msg_user(msg)) {
1952                                        case MSG_BUNDLER:
1953                                                l_ptr->stats.recv_bundles++;
1954                                                l_ptr->stats.recv_bundled +=
1955                                                        msg_msgcnt(msg);
1956                                                tipc_node_unlock(n_ptr);
1957                                                tipc_link_recv_bundle(buf);
1958                                                continue;
1959                                        case ROUTE_DISTRIBUTOR:
1960                                                tipc_node_unlock(n_ptr);
1961                                                tipc_cltr_recv_routing_table(buf);
1962                                                continue;
1963                                        case NAME_DISTRIBUTOR:
1964                                                tipc_node_unlock(n_ptr);
1965                                                tipc_named_recv(buf);
1966                                                continue;
1967                                        case CONN_MANAGER:
1968                                                tipc_node_unlock(n_ptr);
1969                                                tipc_port_recv_proto_msg(buf);
1970                                                continue;
1971                                        case MSG_FRAGMENTER:
1972                                                l_ptr->stats.recv_fragments++;
1973                                                if (tipc_link_recv_fragment(&l_ptr->defragm_buf,
1974                                                                            &buf, &msg)) {
1975                                                        l_ptr->stats.recv_fragmented++;
1976                                                        goto deliver;
1977                                                }
                                                    /*
                                                     * NOTE(review): on a zero return
                                                     * control falls to
                                                     * tipc_net_route_msg(buf) below;
                                                     * buf was passed by address, so
                                                     * its value here depends on
                                                     * tipc_link_recv_fragment() -
                                                     * confirm it is safe to route.
                                                     */
1978                                                break;
1979                                        case CHANGEOVER_PROTOCOL:
1980                                                type = msg_type(msg);
1981                                                if (link_recv_changeover_msg(&l_ptr, &buf)) {
                                                            /*
                                                             * l_ptr and buf were passed by
                                                             * address and may now differ;
                                                             * reload the derived values.
                                                             */
1982                                                        msg = buf_msg(buf);
1983                                                        seq_no = msg_seqno(msg);
1984                                                        if (type == ORIGINAL_MSG)
1985                                                                goto deliver;
1986                                                        goto protocol_check;
1987                                                }
1988                                                break;
1989                                        }
1990                                }
                                    /* Not consumed above: pass to routing */
1991                                tipc_node_unlock(n_ptr);
1992                                tipc_net_route_msg(buf);
1993                                continue;
1994                        }
                            /* Working link, but unexpected sequence number */
1995                        link_handle_out_of_seq_msg(l_ptr, buf);
1996                        head = link_insert_deferred_queue(l_ptr, head);
1997                        tipc_node_unlock(n_ptr);
1998                        continue;
1999                }
2000
                    /* Link is not in the working-working state from here on */
2001                if (msg_user(msg) == LINK_PROTOCOL) {
2002                        link_recv_proto_msg(l_ptr, buf);
2003                        head = link_insert_deferred_queue(l_ptr, head);
2004                        tipc_node_unlock(n_ptr);
2005                        continue;
2006                }
2007                msg_dbg(msg,"NSEQ<REC<");
2008                link_state_event(l_ptr, TRAFFIC_MSG_EVT);
2009
                    /* The traffic event may have moved the link to working */
2010                if (link_working_working(l_ptr)) {
2011                        /* Re-insert in front of queue */
2012                        msg_dbg(msg,"RECV-REINS:");
2013                        buf->next = head;
2014                        head = buf;
2015                        tipc_node_unlock(n_ptr);
2016                        continue;
2017                }
2018                tipc_node_unlock(n_ptr);
    /* Common discard path; the node lock is never held when we get here */
2019cont:
2020                buf_discard(buf);
2021        }
2022        read_unlock_bh(&tipc_net_lock);
2023}
2024
2025/*
2026 * link_defer_buf(): Sort a received out-of-sequence packet
2027 *                   into the deferred reception queue.
2028 * Returns the increase of the queue length,i.e. 0 or 1
2029 */
2030
2031u32 tipc_link_defer_pkt(struct sk_buff **head,
2032                        struct sk_buff **tail,
2033                        struct sk_buff *buf)
2034{
2035        struct sk_buff *prev = NULL;
2036        struct sk_buff *crs = *head;
2037        u32 seq_no = msg_seqno(buf_msg(buf));
2038
2039        buf->next = NULL;
2040
2041        /* Empty queue ? */
2042        if (*head == NULL) {
2043                *head = *tail = buf;
2044                return 1;
2045        }
2046
2047        /* Last ? */
2048        if (less(msg_seqno(buf_msg(*tail)), seq_no)) {
2049                (*tail)->next = buf;
2050                *tail = buf;
2051                return 1;
2052        }
2053
2054        /* Scan through queue and sort it in */
2055        do {
2056                struct tipc_msg *msg = buf_msg(crs);
2057
2058                if (less(seq_no, msg_seqno(msg))) {
2059                        buf->next = crs;
2060                        if (prev)
2061                                prev->next = buf;
2062                        else
2063                                *head = buf;
2064                        return 1;
2065                }
2066                if (seq_no == msg_seqno(msg)) {
2067                        break;
2068                }
2069                prev = crs;
2070                crs = crs->next;
2071        }
2072        while (crs);
2073
2074        /* Message is a duplicate of an existing message */
2075
2076        buf_discard(buf);
2077        return 0;
2078}
2079
2080/**
2081 * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
2082 */
2083
2084static void link_handle_out_of_seq_msg(struct link *l_ptr,
2085                                       struct sk_buff *buf)
2086{
2087        u32 seq_no = msg_seqno(buf_msg(buf));
2088
2089        if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
2090                link_recv_proto_msg(l_ptr, buf);
2091                return;
2092        }
2093
2094        dbg("rx OOS msg: seq_no %u, expecting %u (%u)\n",
2095            seq_no, mod(l_ptr->next_in_no), l_ptr->next_in_no);
2096
2097        /* Record OOS packet arrival (force mismatch on next timeout) */
2098
2099        l_ptr->checkpoint--;
2100
2101        /*
2102         * Discard packet if a duplicate; otherwise add it to deferred queue
2103         * and notify peer of gap as per protocol specification
2104         */
2105
2106        if (less(seq_no, mod(l_ptr->next_in_no))) {
2107                l_ptr->stats.duplicates++;
2108                buf_discard(buf);
2109                return;
2110        }
2111
2112        if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
2113                                &l_ptr->newest_deferred_in, buf)) {
2114                l_ptr->deferred_inqueue_sz++;
2115                l_ptr->stats.deferred_recv++;
2116                if ((l_ptr->deferred_inqueue_sz % 16) == 1)
2117                        tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
2118        } else
2119                l_ptr->stats.duplicates++;
2120}
2121
2122/*
2123 * Send protocol message to the other endpoint.
2124 */
/*
 * tipc_link_send_proto_msg(): build a link protocol message in the link's
 * preformatted header (l_ptr->pmsg) and send or queue it.
 * @l_ptr:     sending link
 * @msg_typ:   STATE_MSG, or RESET_MSG / ACTIVATE_MSG (the else branch)
 * @probe_msg: non-zero to set the probe flag (STATE_MSG only)
 * @gap:       sequence gap to report; replaced by a locally computed value
 *             when the link has deferred inbound packets (STATE_MSG only)
 * @tolerance: link tolerance to advertise (STATE_MSG only)
 * @priority:  link priority to advertise (STATE_MSG only; see note below)
 * @ack_mtu:   max packet size to advertise (STATE_MSG only)
 *
 * Nothing is sent if the link is blocked, or if a STATE_MSG is requested
 * while the link is not up.  If the bearer is congested the message is
 * stored in l_ptr->proto_msg_queue for a later send; otherwise it is sent
 * at once, and kept in proto_msg_queue only if that send fails.
 */
2125void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
2126                              u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
2127{
2128        struct sk_buff *buf = NULL;
2129        struct tipc_msg *msg = l_ptr->pmsg;
            /* Note: this local shadows the msg_size() accessor in this scope */
2130        u32 msg_size = sizeof(l_ptr->proto_msg);
2131
2132        if (link_blocked(l_ptr))
2133                return;
2134        msg_set_type(msg, msg_typ);
2135        msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
2136        msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
2137        msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
2138
2139        if (msg_typ == STATE_MSG) {
2140                u32 next_sent = mod(l_ptr->next_out_no);
2141
2142                if (!tipc_link_is_up(l_ptr))
2143                        return;
                    /* With unsent packets queued, report the first of them */
2144                if (l_ptr->next_out)
2145                        next_sent = msg_seqno(buf_msg(l_ptr->next_out));
2146                msg_set_next_sent(msg, next_sent);
                    /*
                     * Deferred inbound packets override the caller's gap
                     * with the distance to the oldest deferred packet.
                     */
2147                if (l_ptr->oldest_deferred_in) {
2148                        u32 rec = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
2149                        gap = mod(rec - mod(l_ptr->next_in_no));
2150                }
2151                msg_set_seq_gap(msg, gap);
2152                if (gap)
2153                        l_ptr->stats.sent_nacks++;
2154                msg_set_link_tolerance(msg, tolerance);
                    /*
                     * NOTE(review): the value written here from 'priority' is
                     * overwritten with l_ptr->priority further down, after the
                     * redundant-link flag is set.
                     */
2155                msg_set_linkprio(msg, priority);
2156                msg_set_max_pkt(msg, ack_mtu);
2157                msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
2158                msg_set_probe(msg, probe_msg != 0);
2159                if (probe_msg) {
2160                        u32 mtu = l_ptr->max_pkt;
2161
                            /*
                             * While max_pkt is below its target, the probe is
                             * sized roughly halfway between the two (rounded
                             * to a multiple of 4).  When max_pkt_probes has
                             * reached 10, the target is lowered to just below
                             * the size computed here and the size is
                             * recomputed.
                             */
2162                        if ((mtu < l_ptr->max_pkt_target) &&
2163                            link_working_working(l_ptr) &&
2164                            l_ptr->fsm_msg_cnt) {
2165                                msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
2166                                if (l_ptr->max_pkt_probes == 10) {
2167                                        l_ptr->max_pkt_target = (msg_size - 4);
2168                                        l_ptr->max_pkt_probes = 0;
2169                                        msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
2170                                }
2171                                l_ptr->max_pkt_probes++;
2172                        }
2173
2174                        l_ptr->stats.sent_probes++;
2175                }
2176                l_ptr->stats.sent_states++;
2177        } else {                /* RESET_MSG or ACTIVATE_MSG */
2178                msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
2179                msg_set_seq_gap(msg, 0);
2180                msg_set_next_sent(msg, 1);
2181                msg_set_link_tolerance(msg, l_ptr->tolerance);
2182                msg_set_linkprio(msg, l_ptr->priority);
2183                msg_set_max_pkt(msg, l_ptr->max_pkt_target);
2184        }
2185
2186        if (tipc_node_has_redundant_links(l_ptr->owner)) {
2187                msg_set_redundant_link(msg);
2188        } else {
2189                msg_clear_redundant_link(msg);
2190        }
            /* Applies to every message type; supersedes any value set above */
2191        msg_set_linkprio(msg, l_ptr->priority);
2192
2193        /* Ensure sequence number will not fit : */
2194
2195        msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
2196
2197        /* Congestion? */
2198
2199        if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
                    /*
                     * Keep one deferred protocol message per link: reuse the
                     * queued buffer if present, else allocate one, and
                     * overwrite it with the header just built.  If the
                     * allocation fails the message is silently dropped.
                     */
2200                if (!l_ptr->proto_msg_queue) {
2201                        l_ptr->proto_msg_queue =
2202                                buf_acquire(sizeof(l_ptr->proto_msg));
2203                }
2204                buf = l_ptr->proto_msg_queue;
2205                if (!buf)
2206                        return;
2207                skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
2208                return;
2209        }
2210        msg_set_timestamp(msg, jiffies_to_msecs(jiffies));
2211
2212        /* Message can be sent */
2213
2214        msg_dbg(msg, ">>");
2215
2216        buf = buf_acquire(msg_size);
2217        if (!buf)
2218                return;
2219
            /*
             * NOTE(review): msg_size may exceed sizeof(l_ptr->proto_msg) for
             * MTU probes, yet only sizeof(l_ptr->proto_msg) bytes are copied.
             * Whether the remaining bytes are initialized depends on
             * buf_acquire() - confirm they cannot expose stale memory.
             */
2220        skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
2221        msg_set_size(buf_msg(buf), msg_size);
2222
2223        if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
2224                l_ptr->unacked_window = 0;
2225                buf_discard(buf);
2226                return;
2227        }
2228
2229        /* New congestion */
            /* Send was refused: schedule the link and keep buf for later */
2230        tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
2231        l_ptr->proto_msg_queue = buf;
2232        l_ptr->stats.bearer_congs++;
2233}
2234
2235/*
2236 * Receive protocol message :
2237 * Note that network plane id propagates through the network, and may
2238 * change at any time. The node with lowest address rules
2239 */
2240
2241static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
2242{
2243        u32 rec_gap = 0;
2244        u32 max_pkt_info;
2245        u32 max_pkt_ack;
2246        u32 msg_tol;
2247        struct tipc_msg *msg = buf_msg(buf);
2248
2249        dbg("AT(%u):", jiffies_to_msecs(jiffies));
2250        msg_dbg(msg, "<<");
2251        if (link_blocked(l_ptr))
2252                goto exit;
2253
2254        /* record unnumbered packet arrival (force mismatch on next timeout) */
2255
2256        l_ptr->checkpoint--;
2257
2258        if (l_ptr->b_ptr->net_plane != msg_net_plane(msg))
2259                if (tipc_own_addr > msg_prevnode(msg))
2260                        l_ptr->b_ptr->net_plane = msg_net_plane(msg);
2261
2262        l_ptr->owner->permit_changeover = msg_redundant_link(msg);
2263
2264        switch (msg_type(msg)) {
2265
2266        case RESET_MSG:
2267                if (!link_working_unknown(l_ptr) &&
2268                    (l_ptr->peer_session != INVALID_SESSION)) {
2269                        if (msg_session(msg) == l_ptr->peer_session) {
2270                                dbg("Duplicate RESET: %u<->%u\n",
2271                                    msg_session(msg), l_ptr->peer_session);
2272                                break; /* duplicate: ignore */
2273                        }
2274                }
2275                /* fall thru' */
2276        case ACTIVATE_MSG:
2277                /* Update link settings according other endpoint's values */
2278
2279                strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
2280
2281                if ((msg_tol = msg_link_tolerance(msg)) &&
2282                    (msg_tol > l_ptr->tolerance))
2283                        link_set_supervision_props(l_ptr, msg_tol);
2284
2285                if (msg_linkprio(msg) > l_ptr->priority)
2286                        l_ptr->priority = msg_linkprio(msg);
2287
2288                max_pkt_info = msg_max_pkt(msg);
2289                if (max_pkt_info) {
2290                        if (max_pkt_info < l_ptr->max_pkt_target)
2291                                l_ptr->max_pkt_target = max_pkt_info;
2292                        if (l_ptr->max_pkt > l_ptr->max_pkt_target)
2293                                l_ptr->max_pkt = l_ptr->max_pkt_target;
2294                } else {
2295                        l_ptr->max_pkt = l_ptr->max_pkt_target;
2296                }
2297                l_ptr->owner->bclink.supported = (max_pkt_info != 0);
2298
2299                link_state_event(l_ptr, msg_type(msg));
2300
2301                l_ptr->peer_session = msg_session(msg);
2302                l_ptr->peer_bearer_id = msg_bearer_id(msg);
2303
2304                /* Synchronize broadcast sequence numbers */
2305                if (!tipc_node_has_redundant_links(l_ptr->owner)) {
2306                        l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
2307                }
2308                break;
2309        case STATE_MSG:
2310
2311                if ((msg_tol = msg_link_tolerance(msg)))
2312                        link_set_supervision_props(l_ptr, msg_tol);
2313
2314                if (msg_linkprio(msg) &&
2315                    (msg_linkprio(msg) != l_ptr->priority)) {
2316                        warn("Resetting link <%s>, priority change %u->%u\n",
2317                             l_ptr->name, l_ptr->priority, msg_linkprio(msg));
2318                        l_ptr->priority = msg_linkprio(msg);
2319                        tipc_link_reset(l_ptr); /* Enforce change to take effect */
2320                        break;
2321                }
2322                link_state_event(l_ptr, TRAFFIC_MSG_EVT);
2323                l_ptr->stats.recv_states++;
2324                if (link_reset_unknown(l_ptr))
2325                        break;
2326
2327                if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
2328                        rec_gap = mod(msg_next_sent(msg) -
2329                                      mod(l_ptr->next_in_no));
2330                }
2331
2332                max_pkt_ack = msg_max_pkt(msg);
2333                if (max_pkt_ack > l_ptr->max_pkt) {
2334                        dbg("Link <%s> updated MTU %u -> %u\n",
2335                            l_ptr->name, l_ptr->max_pkt, max_pkt_ack);
2336                        l_ptr->max_pkt = max_pkt_ack;
2337                        l_ptr->max_pkt_probes = 0;
2338                }
2339
2340                max_pkt_ack = 0;
2341                if (msg_probe(msg)) {
2342                        l_ptr->stats.recv_probes++;
2343                        if (msg_size(msg) > sizeof(l_ptr->proto_msg)) {
2344                                max_pkt_ack = msg_size(msg);
2345                        }
2346                }
2347
2348                /* Protocol message before retransmits, reduce loss risk */
2349
2350                tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg));
2351
2352                if (rec_gap || (msg_probe(msg))) {
2353                        tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2354                                                 0, rec_gap, 0, 0, max_pkt_ack);
2355                }
2356                if (msg_seq_gap(msg)) {
2357                        msg_dbg(msg, "With Gap:");
2358                        l_ptr->stats.recv_nacks++;
2359                        tipc_link_retransmit(l_ptr, l_ptr->first_out,
2360                                             msg_seq_gap(msg));
2361                }
2362                break;
2363        default:
2364                msg_dbg(buf_msg(buf), "<DISCARDING UNKNOWN<");
2365        }
2366exit:
2367        buf_discard(buf);
2368}
2369
2370
2371/*
2372 * tipc_link_tunnel(): Send one message via a link belonging to
2373 * another bearer. Owner node is locked.
2374 */
2375void tipc_link_tunnel(struct link *l_ptr,
2376                      struct tipc_msg *tunnel_hdr,
2377                      struct tipc_msg  *msg,
2378                      u32 selector)
2379{
2380        struct link *tunnel;
2381        struct sk_buff *buf;
2382        u32 length = msg_size(msg);
2383
2384        tunnel = l_ptr->owner->active_links[selector & 1];
2385        if (!tipc_link_is_up(tunnel)) {
2386                warn("Link changeover error, "
2387                     "tunnel link no longer available\n");
2388                return;
2389        }
2390        msg_set_size(tunnel_hdr, length + INT_H_SIZE);
2391        buf = buf_acquire(length + INT_H_SIZE);
2392        if (!buf) {
2393                warn("Link changeover error, "
2394                     "unable to send tunnel msg\n");
2395                return;
2396        }
2397        skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
2398        skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
2399        dbg("%c->%c:", l_ptr->b_ptr->net_plane, tunnel->b_ptr->net_plane);
2400        msg_dbg(buf_msg(buf), ">SEND>");
2401        tipc_link_send_buf(tunnel, buf);
2402}
2403
2404
2405
2406/*
2407 * changeover(): Send whole message queue via the remaining link
2408 *               Owner node is locked.
2409 */
2410
2411void tipc_link_changeover(struct link *l_ptr)
2412{
2413        u32 msgcount = l_ptr->out_queue_size;
2414        struct sk_buff *crs = l_ptr->first_out;
2415        struct link *tunnel = l_ptr->owner->active_links[0];
2416        struct tipc_msg tunnel_hdr;
2417        int split_bundles;
2418
2419        if (!tunnel)
2420                return;
2421
2422        if (!l_ptr->owner->permit_changeover) {
2423                warn("Link changeover error, "
2424                     "peer did not permit changeover\n");
2425                return;
2426        }
2427
2428        msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2429                 ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
2430        msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2431        msg_set_msgcnt(&tunnel_hdr, msgcount);
2432        dbg("Link changeover requires %u tunnel messages\n", msgcount);
2433
2434        if (!l_ptr->first_out) {
2435                struct sk_buff *buf;
2436
2437                buf = buf_acquire(INT_H_SIZE);
2438                if (buf) {
2439                        skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
2440                        msg_set_size(&tunnel_hdr, INT_H_SIZE);
2441                        dbg("%c->%c:", l_ptr->b_ptr->net_plane,
2442                            tunnel->b_ptr->net_plane);
2443                        msg_dbg(&tunnel_hdr, "EMPTY>SEND>");
2444                        tipc_link_send_buf(tunnel, buf);
2445                } else {
2446                        warn("Link changeover error, "
2447                             "unable to send changeover msg\n");
2448                }
2449                return;
2450        }
2451
2452        split_bundles = (l_ptr->owner->active_links[0] !=
2453                         l_ptr->owner->active_links[1]);
2454
2455        while (crs) {
2456                struct tipc_msg *msg = buf_msg(crs);
2457
2458                if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
2459                        struct tipc_msg *m = msg_get_wrapped(msg);
2460                        unchar* pos = (unchar*)m;
2461
2462                        msgcount = msg_msgcnt(msg);
2463                        while (msgcount--) {
2464                                msg_set_seqno(m,msg_seqno(msg));
2465                                tipc_link_tunnel(l_ptr, &tunnel_hdr, m,
2466                                                 msg_link_selector(m));
2467                                pos += align(msg_size(m));
2468                                m = (struct tipc_msg *)pos;
2469                        }
2470                } else {
2471                        tipc_link_tunnel(l_ptr, &tunnel_hdr, msg,
2472                                         msg_link_selector(msg));
2473                }
2474                crs = crs->next;
2475        }
2476}
2477
2478void tipc_link_send_duplicate(struct link *l_ptr, struct link *tunnel)
2479{
2480        struct sk_buff *iter;
2481        struct tipc_msg tunnel_hdr;
2482
2483        msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2484                 DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
2485        msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
2486        msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2487        iter = l_ptr->first_out;
2488        while (iter) {
2489                struct sk_buff *outbuf;
2490                struct tipc_msg *msg = buf_msg(iter);
2491                u32 length = msg_size(msg);
2492
2493                if (msg_user(msg) == MSG_BUNDLER)
2494                        msg_set_type(msg, CLOSED_MSG);
2495                msg_set_ack(msg, mod(l_ptr->next_in_no - 1));        /* Update */
2496                msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
2497                msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
2498                outbuf = buf_acquire(length + INT_H_SIZE);
2499                if (outbuf == NULL) {
2500                        warn("Link changeover error, "
2501                             "unable to send duplicate msg\n");
2502                        return;
2503                }
2504                skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
2505                skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
2506                                               length);
2507                dbg("%c->%c:", l_ptr->b_ptr->net_plane,
2508                    tunnel->b_ptr->net_plane);
2509                msg_dbg(buf_msg(outbuf), ">SEND>");
2510                tipc_link_send_buf(tunnel, outbuf);
2511                if (!tipc_link_is_up(l_ptr))
2512                        return;
2513                iter = iter->next;
2514        }
2515}
2516
2517
2518
2519/**
2520 * buf_extract - extracts embedded TIPC message from another message
2521 * @skb: encapsulating message buffer
2522 * @from_pos: offset to extract from
2523 *
2524 * Returns a new message buffer containing an embedded message.  The
2525 * encapsulating message itself is left unchanged.
2526 */
2527
2528static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
2529{
2530        struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
2531        u32 size = msg_size(msg);
2532        struct sk_buff *eb;
2533
2534        eb = buf_acquire(size);
2535        if (eb)
2536                skb_copy_to_linear_data(eb, msg, size);
2537        return eb;
2538}
2539
2540/*
2541 *  link_recv_changeover_msg(): Receive tunneled packet sent
2542 *  via other link. Node is locked. Return extracted buffer.
2543 */
2544
2545static int link_recv_changeover_msg(struct link **l_ptr,
2546                                    struct sk_buff **buf)
2547{
2548        struct sk_buff *tunnel_buf = *buf;
2549        struct link *dest_link;
2550        struct tipc_msg *msg;
2551        struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf);
2552        u32 msg_typ = msg_type(tunnel_msg);
2553        u32 msg_count = msg_msgcnt(tunnel_msg);
2554
2555        dest_link = (*l_ptr)->owner->links[msg_bearer_id(tunnel_msg)];
2556        if (!dest_link) {
2557                msg_dbg(tunnel_msg, "NOLINK/<REC<");
2558                goto exit;
2559        }
2560        if (dest_link == *l_ptr) {
2561                err("Unexpected changeover message on link <%s>\n",
2562                    (*l_ptr)->name);
2563                goto exit;
2564        }
2565        dbg("%c<-%c:", dest_link->b_ptr->net_plane,
2566            (*l_ptr)->b_ptr->net_plane);
2567        *l_ptr = dest_link;
2568        msg = msg_get_wrapped(tunnel_msg);
2569
2570        if (msg_typ == DUPLICATE_MSG) {
2571                if (less(msg_seqno(msg), mod(dest_link->next_in_no))) {
2572                        msg_dbg(tunnel_msg, "DROP/<REC<");
2573                        goto exit;
2574                }
2575                *buf = buf_extract(tunnel_buf,INT_H_SIZE);
2576                if (*buf == NULL) {
2577                        warn("Link changeover error, duplicate msg dropped\n");
2578                        goto exit;
2579                }
2580                msg_dbg(tunnel_msg, "TNL<REC<");
2581                buf_discard(tunnel_buf);
2582                return 1;
2583        }
2584
2585        /* First original message ?: */
2586
2587        if (tipc_link_is_up(dest_link)) {
2588                msg_dbg(tunnel_msg, "UP/FIRST/<REC<");
2589                info("Resetting link <%s>, changeover initiated by peer\n",
2590                     dest_link->name);
2591                tipc_link_reset(dest_link);
2592                dest_link->exp_msg_count = msg_count;
2593                dbg("Expecting %u tunnelled messages\n", msg_count);
2594                if (!msg_count)
2595                        goto exit;
2596        } else if (dest_link->exp_msg_count == START_CHANGEOVER) {
2597                msg_dbg(tunnel_msg, "BLK/FIRST/<REC<");
2598                dest_link->exp_msg_count = msg_count;
2599                dbg("Expecting %u tunnelled messages\n", msg_count);
2600                if (!msg_count)
2601                        goto exit;
2602        }
2603
2604        /* Receive original message */
2605
2606        if (dest_link->exp_msg_count == 0) {
2607                warn("Link switchover error, "
2608                     "got too many tunnelled messages\n");
2609                msg_dbg(tunnel_msg, "OVERDUE/DROP/<REC<");
2610                dbg_print_link(dest_link, "LINK:");
2611                goto exit;
2612        }
2613        dest_link->exp_msg_count--;
2614        if (less(msg_seqno(msg), dest_link->reset_checkpoint)) {
2615                msg_dbg(tunnel_msg, "DROP/DUPL/<REC<");
2616                goto exit;
2617        } else {
2618                *buf = buf_extract(tunnel_buf, INT_H_SIZE);
2619                if (*buf != NULL) {
2620                        msg_dbg(tunnel_msg, "TNL<REC<");
2621                        buf_discard(tunnel_buf);
2622                        return 1;
2623                } else {
2624                        warn("Link changeover error, original msg dropped\n");
2625                }
2626        }
2627exit:
2628        *buf = NULL;
2629        buf_discard(tunnel_buf);
2630        return 0;
2631}
2632
2633/*
2634 *  Bundler functionality:
2635 */
2636void tipc_link_recv_bundle(struct sk_buff *buf)
2637{
2638        u32 msgcount = msg_msgcnt(buf_msg(buf));
2639        u32 pos = INT_H_SIZE;
2640        struct sk_buff *obuf;
2641
2642        msg_dbg(buf_msg(buf), "<BNDL<: ");
2643        while (msgcount--) {
2644                obuf = buf_extract(buf, pos);
2645                if (obuf == NULL) {
2646                        warn("Link unable to unbundle message(s)\n");
2647                        break;
2648                }
2649                pos += align(msg_size(buf_msg(obuf)));
2650                msg_dbg(buf_msg(obuf), "     /");
2651                tipc_net_route_msg(obuf);
2652        }
2653        buf_discard(buf);
2654}
2655
2656/*
2657 *  Fragmentation/defragmentation:
2658 */
2659
2660
2661/*
2662 * tipc_link_send_long_buf: Entry for buffers needing fragmentation.
2663 * The buffer is complete, inclusive total message length.
2664 * Returns user data length.
2665 */
2666int tipc_link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
2667{
2668        struct tipc_msg *inmsg = buf_msg(buf);
2669        struct tipc_msg fragm_hdr;
2670        u32 insize = msg_size(inmsg);
2671        u32 dsz = msg_data_sz(inmsg);
2672        unchar *crs = buf->data;
2673        u32 rest = insize;
2674        u32 pack_sz = link_max_pkt(l_ptr);
2675        u32 fragm_sz = pack_sz - INT_H_SIZE;
2676        u32 fragm_no = 1;
2677        u32 destaddr;
2678
2679        if (msg_short(inmsg))
2680                destaddr = l_ptr->addr;
2681        else
2682                destaddr = msg_destnode(inmsg);
2683
2684        if (msg_routed(inmsg))
2685                msg_set_prevnode(inmsg, tipc_own_addr);
2686
2687        /* Prepare reusable fragment header: */
2688
2689        msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
2690                 INT_H_SIZE, destaddr);
2691        msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg));
2692        msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++));
2693        msg_set_fragm_no(&fragm_hdr, fragm_no);
2694        l_ptr->stats.sent_fragmented++;
2695
2696        /* Chop up message: */
2697
2698        while (rest > 0) {
2699                struct sk_buff *fragm;
2700
2701                if (rest <= fragm_sz) {
2702                        fragm_sz = rest;
2703                        msg_set_type(&fragm_hdr, LAST_FRAGMENT);
2704                }
2705                fragm = buf_acquire(fragm_sz + INT_H_SIZE);
2706                if (fragm == NULL) {
2707                        warn("Link unable to fragment message\n");
2708                        dsz = -ENOMEM;
2709                        goto exit;
2710                }
2711                msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
2712                skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
2713                skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
2714                                               fragm_sz);
2715                /*  Send queued messages first, if any: */
2716
2717                l_ptr->stats.sent_fragments++;
2718                tipc_link_send_buf(l_ptr, fragm);
2719                if (!tipc_link_is_up(l_ptr))
2720                        return dsz;
2721                msg_set_fragm_no(&fragm_hdr, ++fragm_no);
2722                rest -= fragm_sz;
2723                crs += fragm_sz;
2724                msg_set_type(&fragm_hdr, FRAGMENT);
2725        }
2726exit:
2727        buf_discard(buf);
2728        return dsz;
2729}
2730
2731/*
2732 * A pending message being re-assembled must store certain values
2733 * to handle subsequent fragments correctly. The following functions
2734 * help storing these values in unused, available fields in the
2735 * pending message. This makes dynamic memory allocation unnecessary.
2736 */
2737
2738static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
2739{
2740        msg_set_seqno(buf_msg(buf), seqno);
2741}
2742
2743static u32 get_fragm_size(struct sk_buff *buf)
2744{
2745        return msg_ack(buf_msg(buf));
2746}
2747
2748static void set_fragm_size(struct sk_buff *buf, u32 sz)
2749{
2750        msg_set_ack(buf_msg(buf), sz);
2751}
2752
2753static u32 get_expected_frags(struct sk_buff *buf)
2754{
2755        return msg_bcast_ack(buf_msg(buf));
2756}
2757
2758static void set_expected_frags(struct sk_buff *buf, u32 exp)
2759{
2760        msg_set_bcast_ack(buf_msg(buf), exp);
2761}
2762
2763static u32 get_timer_cnt(struct sk_buff *buf)
2764{
2765        return msg_reroute_cnt(buf_msg(buf));
2766}
2767
/* Bump the age counter kept in the pending buffer's reroute count field */
static void incr_timer_cnt(struct sk_buff *buf)
{
	struct tipc_msg *hdr = buf_msg(buf);

	msg_incr_reroute_cnt(hdr);
}
2772
2773/*
2774 * tipc_link_recv_fragment(): Called with node lock on. Returns
2775 * the reassembled buffer if message is complete.
2776 */
2777int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
2778                            struct tipc_msg **m)
2779{
2780        struct sk_buff *prev = NULL;
2781        struct sk_buff *fbuf = *fb;
2782        struct tipc_msg *fragm = buf_msg(fbuf);
2783        struct sk_buff *pbuf = *pending;
2784        u32 long_msg_seq_no = msg_long_msgno(fragm);
2785
2786        *fb = NULL;
2787        msg_dbg(fragm,"FRG<REC<");
2788
2789        /* Is there an incomplete message waiting for this fragment? */
2790
2791        while (pbuf && ((msg_seqno(buf_msg(pbuf)) != long_msg_seq_no)
2792                        || (msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) {
2793                prev = pbuf;
2794                pbuf = pbuf->next;
2795        }
2796
2797        if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
2798                struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
2799                u32 msg_sz = msg_size(imsg);
2800                u32 fragm_sz = msg_data_sz(fragm);
2801                u32 exp_fragm_cnt = msg_sz/fragm_sz + !!(msg_sz % fragm_sz);
2802                u32 max =  TIPC_MAX_USER_MSG_SIZE + LONG_H_SIZE;
2803                if (msg_type(imsg) == TIPC_MCAST_MSG)
2804                        max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
2805                if (msg_size(imsg) > max) {
2806                        msg_dbg(fragm,"<REC<Oversized: ");
2807                        buf_discard(fbuf);
2808                        return 0;
2809                }
2810                pbuf = buf_acquire(msg_size(imsg));
2811                if (pbuf != NULL) {
2812                        pbuf->next = *pending;
2813                        *pending = pbuf;
2814                        skb_copy_to_linear_data(pbuf, imsg,
2815                                                msg_data_sz(fragm));
2816                        /*  Prepare buffer for subsequent fragments. */
2817
2818                        set_long_msg_seqno(pbuf, long_msg_seq_no);
2819                        set_fragm_size(pbuf,fragm_sz);
2820                        set_expected_frags(pbuf,exp_fragm_cnt - 1);
2821                } else {
2822                        warn("Link unable to reassemble fragmented message\n");
2823                }
2824                buf_discard(fbuf);
2825                return 0;
2826        } else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
2827                u32 dsz = msg_data_sz(fragm);
2828                u32 fsz = get_fragm_size(pbuf);
2829                u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
2830                u32 exp_frags = get_expected_frags(pbuf) - 1;
2831                skb_copy_to_linear_data_offset(pbuf, crs,
2832                                               msg_data(fragm), dsz);
2833                buf_discard(fbuf);
2834
2835                /* Is message complete? */
2836
2837                if (exp_frags == 0) {
2838                        if (prev)
2839                                prev->next = pbuf->next;
2840                        else
2841                                *pending = pbuf->next;
2842                        msg_reset_reroute_cnt(buf_msg(pbuf));
2843                        *fb = pbuf;
2844                        *m = buf_msg(pbuf);
2845                        return 1;
2846                }
2847                set_expected_frags(pbuf,exp_frags);
2848                return 0;
2849        }
2850        dbg(" Discarding orphan fragment %x\n",fbuf);
2851        msg_dbg(fragm,"ORPHAN:");
2852        dbg("Pending long buffers:\n");
2853        dbg_print_buf_chain(*pending);
2854        buf_discard(fbuf);
2855        return 0;
2856}
2857
2858/**
2859 * link_check_defragm_bufs - flush stale incoming message fragments
2860 * @l_ptr: pointer to link
2861 */
2862
2863static void link_check_defragm_bufs(struct link *l_ptr)
2864{
2865        struct sk_buff *prev = NULL;
2866        struct sk_buff *next = NULL;
2867        struct sk_buff *buf = l_ptr->defragm_buf;
2868
2869        if (!buf)
2870                return;
2871        if (!link_working_working(l_ptr))
2872                return;
2873        while (buf) {
2874                u32 cnt = get_timer_cnt(buf);
2875
2876                next = buf->next;
2877                if (cnt < 4) {
2878                        incr_timer_cnt(buf);
2879                        prev = buf;
2880                } else {
2881                        dbg(" Discarding incomplete long buffer\n");
2882                        msg_dbg(buf_msg(buf), "LONG:");
2883                        dbg_print_link(l_ptr, "curr:");
2884                        dbg("Pending long buffers:\n");
2885                        dbg_print_buf_chain(l_ptr->defragm_buf);
2886                        if (prev)
2887                                prev->next = buf->next;
2888                        else
2889                                l_ptr->defragm_buf = buf->next;
2890                        buf_discard(buf);
2891                }
2892                buf = next;
2893        }
2894}
2895
2896
2897
2898static void link_set_supervision_props(struct link *l_ptr, u32 tolerance)
2899{
2900        l_ptr->tolerance = tolerance;
2901        l_ptr->continuity_interval =
2902                ((tolerance / 4) > 500) ? 500 : tolerance / 4;
2903        l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4);
2904}
2905
2906
2907void tipc_link_set_queue_limits(struct link *l_ptr, u32 window)
2908{
2909        /* Data messages from this node, inclusive FIRST_FRAGM */
2910        l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
2911        l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
2912        l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
2913        l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
2914        /* Transiting data messages,inclusive FIRST_FRAGM */
2915        l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
2916        l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
2917        l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
2918        l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
2919        l_ptr->queue_limit[CONN_MANAGER] = 1200;
2920        l_ptr->queue_limit[ROUTE_DISTRIBUTOR] = 1200;
2921        l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
2922        l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
2923        /* FRAGMENT and LAST_FRAGMENT packets */
2924        l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
2925}
2926
2927/**
2928 * link_find_link - locate link by name
2929 * @name - ptr to link name string
2930 * @node - ptr to area to be filled with ptr to associated node
2931 *
2932 * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;
2933 * this also prevents link deletion.
2934 *
2935 * Returns pointer to link (or 0 if invalid link name).
2936 */
2937
2938static struct link *link_find_link(const char *name, struct tipc_node **node)
2939{
2940        struct link_name link_name_parts;
2941        struct bearer *b_ptr;
2942        struct link *l_ptr;
2943
2944        if (!link_name_validate(name, &link_name_parts))
2945                return NULL;
2946
2947        b_ptr = tipc_bearer_find_interface(link_name_parts.if_local);
2948        if (!b_ptr)
2949                return NULL;
2950
2951        *node = tipc_node_find(link_name_parts.addr_peer);
2952        if (!*node)
2953                return NULL;
2954
2955        l_ptr = (*node)->links[b_ptr->identity];
2956        if (!l_ptr || strcmp(l_ptr->name, name))
2957                return NULL;
2958
2959        return l_ptr;
2960}
2961
2962struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space,
2963                                     u16 cmd)
2964{
2965        struct tipc_link_config *args;
2966        u32 new_value;
2967        struct link *l_ptr;
2968        struct tipc_node *node;
2969        int res;
2970
2971        if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
2972                return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2973
2974        args = (struct tipc_link_config *)TLV_DATA(req_tlv_area);
2975        new_value = ntohl(args->value);
2976
2977        if (!strcmp(args->name, tipc_bclink_name)) {
2978                if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
2979                    (tipc_bclink_set_queue_limits(new_value) == 0))
2980                        return tipc_cfg_reply_none();
2981                return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
2982                                                   " (cannot change setting on broadcast link)");
2983        }
2984
2985        read_lock_bh(&tipc_net_lock);
2986        l_ptr = link_find_link(args->name, &node);
2987        if (!l_ptr) {
2988                read_unlock_bh(&tipc_net_lock);
2989                return tipc_cfg_reply_error_string("link not found");
2990        }
2991
2992        tipc_node_lock(node);
2993        res = -EINVAL;
2994        switch (cmd) {
2995        case TIPC_CMD_SET_LINK_TOL:
2996                if ((new_value >= TIPC_MIN_LINK_TOL) &&
2997                    (new_value <= TIPC_MAX_LINK_TOL)) {
2998                        link_set_supervision_props(l_ptr, new_value);
2999                        tipc_link_send_proto_msg(l_ptr, STATE_MSG,
3000                                                 0, 0, new_value, 0, 0);
3001                        res = 0;
3002                }
3003                break;
3004        case TIPC_CMD_SET_LINK_PRI:
3005                if ((new_value >= TIPC_MIN_LINK_PRI) &&
3006                    (new_value <= TIPC_MAX_LINK_PRI)) {
3007                        l_ptr->priority = new_value;
3008                        tipc_link_send_proto_msg(l_ptr, STATE_MSG,
3009                                                 0, 0, 0, new_value, 0);
3010                        res = 0;
3011                }
3012                break;
3013        case TIPC_CMD_SET_LINK_WINDOW:
3014                if ((new_value >= TIPC_MIN_LINK_WIN) &&
3015                    (new_value <= TIPC_MAX_LINK_WIN)) {
3016                        tipc_link_set_queue_limits(l_ptr, new_value);
3017                        res = 0;
3018                }
3019                break;
3020        }
3021        tipc_node_unlock(node);
3022
3023        read_unlock_bh(&tipc_net_lock);
3024        if (res)
3025                return tipc_cfg_reply_error_string("cannot change link setting");
3026
3027        return tipc_cfg_reply_none();
3028}
3029
3030/**
3031 * link_reset_statistics - reset link statistics
3032 * @l_ptr: pointer to link
3033 */
3034
3035static void link_reset_statistics(struct link *l_ptr)
3036{
3037        memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
3038        l_ptr->stats.sent_info = l_ptr->next_out_no;
3039        l_ptr->stats.recv_info = l_ptr->next_in_no;
3040}
3041
3042struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space)
3043{
3044        char *link_name;
3045        struct link *l_ptr;
3046        struct tipc_node *node;
3047
3048        if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
3049                return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
3050
3051        link_name = (char *)TLV_DATA(req_tlv_area);
3052        if (!strcmp(link_name, tipc_bclink_name)) {
3053                if (tipc_bclink_reset_stats())
3054                        return tipc_cfg_reply_error_string("link not found");
3055                return tipc_cfg_reply_none();
3056        }
3057
3058        read_lock_bh(&tipc_net_lock);
3059        l_ptr = link_find_link(link_name, &node);
3060        if (!l_ptr) {
3061                read_unlock_bh(&tipc_net_lock);
3062                return tipc_cfg_reply_error_string("link not found");
3063        }
3064
3065        tipc_node_lock(node);
3066        link_reset_statistics(l_ptr);
3067        tipc_node_unlock(node);
3068        read_unlock_bh(&tipc_net_lock);
3069        return tipc_cfg_reply_none();
3070}
3071
/**
 * percent - convert count to a percentage of total (rounding up or down)
 * @count: part
 * @total: whole
 *
 * Returns @count as a percentage of @total, rounded to the nearest integer,
 * or 0 when @total is 0 (instead of dividing by zero).
 *
 * NOTE(review): the intermediate count * 100 is computed in 32 bits and
 * wraps for very large counts; left unchanged here.
 */

static u32 percent(u32 count, u32 total)
{
	if (total == 0)
		return 0;

	/* adding total / 2 before dividing rounds to nearest */
	return (count * 100 + (total / 2)) / total;
}
3080
3081/**
3082 * tipc_link_stats - print link statistics
3083 * @name: link name
3084 * @buf: print buffer area
3085 * @buf_size: size of print buffer area
3086 *
3087 * Returns length of print buffer data string (or 0 if error)
3088 */
3089
3090static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
3091{
3092        struct print_buf pb;
3093        struct link *l_ptr;
3094        struct tipc_node *node;
3095        char *status;
3096        u32 profile_total = 0;
3097
3098        if (!strcmp(name, tipc_bclink_name))
3099                return tipc_bclink_stats(buf, buf_size);
3100
3101        tipc_printbuf_init(&pb, buf, buf_size);
3102
3103        read_lock_bh(&tipc_net_lock);
3104        l_ptr = link_find_link(name, &node);
3105        if (!l_ptr) {
3106                read_unlock_bh(&tipc_net_lock);
3107                return 0;
3108        }
3109        tipc_node_lock(node);
3110
3111        if (tipc_link_is_active(l_ptr))
3112                status = "ACTIVE";
3113        else if (tipc_link_is_up(l_ptr))
3114                status = "STANDBY";
3115        else
3116                status = "DEFUNCT";
3117        tipc_printf(&pb, "Link <%s>\n"
3118                         "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
3119                         "  Window:%u packets\n",
3120                    l_ptr->name, status, link_max_pkt(l_ptr),
3121                    l_ptr->priority, l_ptr->tolerance, l_ptr->queue_limit[0]);
3122        tipc_printf(&pb, "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
3123                    l_ptr->next_in_no - l_ptr->stats.recv_info,
3124                    l_ptr->stats.recv_fragments,
3125                    l_ptr->stats.recv_fragmented,
3126                    l_ptr->stats.recv_bundles,
3127                    l_ptr->stats.recv_bundled);
3128        tipc_printf(&pb, "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
3129                    l_ptr->next_out_no - l_ptr->stats.sent_info,
3130                    l_ptr->stats.sent_fragments,
3131                    l_ptr->stats.sent_fragmented,
3132                    l_ptr->stats.sent_bundles,
3133                    l_ptr->stats.sent_bundled);
3134        profile_total = l_ptr->stats.msg_length_counts;
3135        if (!profile_total)
3136                profile_total = 1;
3137        tipc_printf(&pb, "  TX profile sample:%u packets  average:%u octets\n"
3138                         "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
3139                         "-16354:%u%% -32768:%u%% -66000:%u%%\n",
3140                    l_ptr->stats.msg_length_counts,
3141                    l_ptr->stats.msg_lengths_total / profile_total,
3142                    percent(l_ptr->stats.msg_length_profile[0], profile_total),
3143                    percent(l_ptr->stats.msg_length_profile[1], profile_total),
3144                    percent(l_ptr->stats.msg_length_profile[2], profile_total),
3145                    percent(l_ptr->stats.msg_length_profile[3], profile_total),
3146                    percent(l_ptr->stats.msg_length_profile[4], profile_total),
3147                    percent(l_ptr->stats.msg_length_profile[5], profile_total),
3148                    percent(l_ptr->stats.msg_length_profile[6], profile_total));
3149        tipc_printf(&pb, "  RX states:%u probes:%u naks:%u defs:%u dups:%u\n",
3150                    l_ptr->stats.recv_states,
3151                    l_ptr->stats.recv_probes,
3152                    l_ptr->stats.recv_nacks,
3153                    l_ptr->stats.deferred_recv,
3154                    l_ptr->stats.duplicates);
3155        tipc_printf(&pb, "  TX states:%u probes:%u naks:%u acks:%u dups:%u\n",
3156                    l_ptr->stats.sent_states,
3157                    l_ptr->stats.sent_probes,
3158                    l_ptr->stats.sent_nacks,
3159                    l_ptr->stats.sent_acks,
3160                    l_ptr->stats.retransmitted);
3161        tipc_printf(&pb, "  Congestion bearer:%u link:%u  Send queue max:%u avg:%u\n",
3162                    l_ptr->stats.bearer_congs,
3163                    l_ptr->stats.link_congs,
3164                    l_ptr->stats.max_queue_sz,
3165                    l_ptr->stats.queue_sz_counts
3166                    ? (l_ptr->stats.accu_queue_sz / l_ptr->stats.queue_sz_counts)
3167                    : 0);
3168
3169        tipc_node_unlock(node);
3170        read_unlock_bh(&tipc_net_lock);
3171        return tipc_printbuf_validate(&pb);
3172}
3173
3174#define MAX_LINK_STATS_INFO 2000
3175
3176struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space)
3177{
3178        struct sk_buff *buf;
3179        struct tlv_desc *rep_tlv;
3180        int str_len;
3181
3182        if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
3183                return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
3184
3185        buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_LINK_STATS_INFO));
3186        if (!buf)
3187                return NULL;
3188
3189        rep_tlv = (struct tlv_desc *)buf->data;
3190
3191        str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
3192                                  (char *)TLV_DATA(rep_tlv), MAX_LINK_STATS_INFO);
3193        if (!str_len) {
3194                buf_discard(buf);
3195                return tipc_cfg_reply_error_string("link not found");
3196        }
3197
3198        skb_put(buf, TLV_SPACE(str_len));
3199        TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
3200
3201        return buf;
3202}
3203
#if 0
/*
 * link_control - apply a control operation to the link identified by @name
 * @name: link name, resolved to a node address and bearer id
 * @op: TIPC_REMOVE_LINK, TIPC_CMD_BLOCK_LINK or TIPC_CMD_UNBLOCK_LINK
 * @val: not used by this function
 *
 * This function is currently compiled out.
 *
 * Returns 0 if the link was found (whatever the value of @op), otherwise
 * -EINVAL.
 */
int link_control(const char *name, u32 op, u32 val)
{
        struct tipc_node *node;
        struct link *l_ptr;
        u32 bearer_id;
        u32 addr;
        int res = -EINVAL;

        addr = link_name2addr(name, &bearer_id);
        read_lock_bh(&tipc_net_lock);

        node = tipc_node_find(addr);
        if (!node)
                goto exit;

        tipc_node_lock(node);
        l_ptr = node->links[bearer_id];
        if (l_ptr) {
                if (op == TIPC_REMOVE_LINK) {
                        /* Deletion is done while holding the bearer's lock */
                        struct bearer *b_ptr = l_ptr->b_ptr;

                        spin_lock_bh(&b_ptr->publ.lock);
                        tipc_link_delete(l_ptr);
                        spin_unlock_bh(&b_ptr->publ.lock);
                }
                if (op == TIPC_CMD_BLOCK_LINK) {
                        tipc_link_reset(l_ptr);
                        l_ptr->blocked = 1;
                }
                if (op == TIPC_CMD_UNBLOCK_LINK)
                        l_ptr->blocked = 0;
                res = 0;
        }
        tipc_node_unlock(node);
exit:
        read_unlock_bh(&tipc_net_lock);
        return res;
}
#endif
3241
3242/**
3243 * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination
3244 * @dest: network address of destination node
3245 * @selector: used to select from set of active links
3246 *
3247 * If no active link can be found, uses default maximum packet size.
3248 */
3249
3250u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
3251{
3252        struct tipc_node *n_ptr;
3253        struct link *l_ptr;
3254        u32 res = MAX_PKT_DEFAULT;
3255
3256        if (dest == tipc_own_addr)
3257                return MAX_MSG_SIZE;
3258
3259        read_lock_bh(&tipc_net_lock);
3260        n_ptr = tipc_node_select(dest, selector);
3261        if (n_ptr) {
3262                tipc_node_lock(n_ptr);
3263                l_ptr = n_ptr->active_links[selector & 1];
3264                if (l_ptr)
3265                        res = link_max_pkt(l_ptr);
3266                tipc_node_unlock(n_ptr);
3267        }
3268        read_unlock_bh(&tipc_net_lock);
3269        return res;
3270}
3271
#if 0
/*
 * link_dump_rec_queue - log every buffer on a link's deferred reception queue
 * @l_ptr: link whose queue, starting at oldest_deferred_in, is walked
 *
 * This function is currently compiled out.
 *
 * The walk stops early if a buffer's data pointer equals 0x0000a3a3.
 * NOTE(review): that value is presumably a marker for an invalid or released
 * buffer; its origin is not visible here - confirm.
 */
static void link_dump_rec_queue(struct link *l_ptr)
{
        struct sk_buff *crs;

        if (!l_ptr->oldest_deferred_in) {
                info("Reception queue empty\n");
                return;
        }

        info("Contents of Reception queue:\n");
        for (crs = l_ptr->oldest_deferred_in; crs; crs = crs->next) {
                if (crs->data == (void *)0x0000a3a3) {
                        /* A pointer must be printed with %p, not %x */
                        info("buffer %p invalid\n", crs);
                        return;
                }
                msg_dbg(buf_msg(crs), "In rec queue: \n");
        }
}
#endif
3293
3294static void link_dump_send_queue(struct link *l_ptr)
3295{
3296        if (l_ptr->next_out) {
3297                info("\nContents of unsent queue:\n");
3298                dbg_print_buf_chain(l_ptr->next_out);
3299        }
3300        info("\nContents of send queue:\n");
3301        if (l_ptr->first_out) {
3302                dbg_print_buf_chain(l_ptr->first_out);
3303        }
3304        info("Empty send queue\n");
3305}
3306
3307static void link_print(struct link *l_ptr, struct print_buf *buf,
3308                       const char *str)
3309{
3310        tipc_printf(buf, str);
3311        if (link_reset_reset(l_ptr) || link_reset_unknown(l_ptr))
3312                return;
3313        tipc_printf(buf, "Link %x<%s>:",
3314                    l_ptr->addr, l_ptr->b_ptr->publ.name);
3315        tipc_printf(buf, ": NXO(%u):", mod(l_ptr->next_out_no));
3316        tipc_printf(buf, "NXI(%u):", mod(l_ptr->next_in_no));
3317        tipc_printf(buf, "SQUE");
3318        if (l_ptr->first_out) {
3319                tipc_printf(buf, "[%u..", msg_seqno(buf_msg(l_ptr->first_out)));
3320                if (l_ptr->next_out)
3321                        tipc_printf(buf, "%u..",
3322                                    msg_seqno(buf_msg(l_ptr->next_out)));
3323                tipc_printf(buf, "%u]",
3324                            msg_seqno(buf_msg
3325                                      (l_ptr->last_out)), l_ptr->out_queue_size);
3326                if ((mod(msg_seqno(buf_msg(l_ptr->last_out)) -
3327                         msg_seqno(buf_msg(l_ptr->first_out)))
3328                     != (l_ptr->out_queue_size - 1))
3329                    || (l_ptr->last_out->next != NULL)) {
3330                        tipc_printf(buf, "\nSend queue inconsistency\n");
3331                        tipc_printf(buf, "first_out= %x ", l_ptr->first_out);
3332                        tipc_printf(buf, "next_out= %x ", l_ptr->next_out);
3333                        tipc_printf(buf, "last_out= %x ", l_ptr->last_out);
3334                        link_dump_send_queue(l_ptr);
3335                }
3336        } else
3337                tipc_printf(buf, "[]");
3338        tipc_printf(buf, "SQSIZ(%u)", l_ptr->out_queue_size);
3339        if (l_ptr->oldest_deferred_in) {
3340                u32 o = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
3341                u32 n = msg_seqno(buf_msg(l_ptr->newest_deferred_in));
3342                tipc_printf(buf, ":RQUE[%u..%u]", o, n);
3343                if (l_ptr->deferred_inqueue_sz != mod((n + 1) - o)) {
3344                        tipc_printf(buf, ":RQSIZ(%u)",
3345                                    l_ptr->deferred_inqueue_sz);
3346                }
3347        }
3348        if (link_working_unknown(l_ptr))
3349                tipc_printf(buf, ":WU");
3350        if (link_reset_reset(l_ptr))
3351                tipc_printf(buf, ":RR");
3352        if (link_reset_unknown(l_ptr))
3353                tipc_printf(buf, ":RU");
3354        if (link_working_working(l_ptr))
3355                tipc_printf(buf, ":WW");
3356        tipc_printf(buf, "\n");
3357}
3358